summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-02-23 09:58:40 +0100
committerYorhel <git@yorhel.nl>2012-02-23 09:58:40 +0100
commitc330a85ef90437801044969d8485cdfc2f69fb5e (patch)
tree2e33e12bc1587f13b99a88409e5b2433d57e1e47
parent1a5213104b9ac47eb15cdc6c3858cdacdff8fe0a (diff)
go: Moved to go/ dir and s/server/node/g
-rw-r--r--go/matching.go (renamed from matching.go)0
-rw-r--r--go/matching_test.go (renamed from matching_test.go)0
-rw-r--r--go/node.go (renamed from server.go)70
-rw-r--r--go/proto.go (renamed from proto.go)38
-rw-r--r--go/proto_gob.go (renamed from proto_gob.go)0
-rw-r--r--go/proto_test.go (renamed from proto_test.go)4
-rw-r--r--go/session.go (renamed from session.go)40
-rw-r--r--go/session_test.go (renamed from session_test.go)48
8 files changed, 101 insertions, 99 deletions
diff --git a/matching.go b/go/matching.go
index b319842..b319842 100644
--- a/matching.go
+++ b/go/matching.go
diff --git a/matching_test.go b/go/matching_test.go
index f01aaf4..f01aaf4 100644
--- a/matching_test.go
+++ b/go/matching_test.go
diff --git a/server.go b/go/node.go
index 36968fa..7e8f810 100644
--- a/server.go
+++ b/go/node.go
@@ -2,67 +2,67 @@ package tanja
type PatternReg int32
-type Server struct {
+type Node struct {
ref chan<- int
- reg chan<- *serverRegister
+ reg chan<- *nodeRegister
unreg chan<- PatternReg
- snd chan<- *serverSend
- hook chan<- *serverHook
- unhook chan<- *serverHook // must be same pointer as sent with hook
+ snd chan<- *nodeSend
+ hook chan<- *nodeHook
+ unhook chan<- *nodeHook // must be same pointer as sent with hook
}
-func ServerCreate() *Server {
+func NodeCreate() *Node {
ref := make(chan int)
- reg := make(chan *serverRegister)
+ reg := make(chan *nodeRegister)
unreg := make(chan PatternReg)
- snd := make(chan *serverSend)
- hook := make(chan *serverHook)
- unhook := make(chan *serverHook)
- go serverRouter(ref, reg, unreg, snd, hook, unhook)
- return &Server{ref, reg, unreg, snd, hook, unhook}
+ snd := make(chan *nodeSend)
+ hook := make(chan *nodeHook)
+ unhook := make(chan *nodeHook)
+ go nodeRouter(ref, reg, unreg, snd, hook, unhook)
+ return &Node{ref, reg, unreg, snd, hook, unhook}
}
-func (s *Server) Send(t ...interface{}) {
- s.snd <- &serverSend{Tuple(t), nil}
+func (s *Node) Send(t ...interface{}) {
+ s.snd <- &nodeSend{Tuple(t), nil}
}
-func (s *Server) Request(t ...interface{}) <-chan Tuple {
+func (s *Node) Request(t ...interface{}) <-chan Tuple {
ch := make(chan Tuple, 100) // TODO: make sure this is buffered
- s.snd <- &serverSend{Tuple(t), ch}
+ s.snd <- &nodeSend{Tuple(t), ch}
return ch
}
-func (s *Server) Close() {
+func (s *Node) Close() {
s.ref <- -1
}
-type serverSend struct {
+type nodeSend struct {
t Tuple
ch chan<- Tuple
}
-type serverRecv struct {
+type nodeRecv struct {
t Tuple
ch chan<- Tuple
id PatternReg
data interface{}
}
-type serverPattern struct {
- recipient chan<- *serverRecv
+type nodePattern struct {
+ recipient chan<- *nodeRecv
willReply bool
id PatternReg
data interface{}
}
-type serverRegister struct {
- dat *serverPattern
+type nodeRegister struct {
+ dat *nodePattern
pat Pattern
reply chan<- PatternReg
}
-type serverHook struct {
- reg chan<- *serverRegister
+type nodeHook struct {
+ reg chan<- *nodeRegister
unreg chan<- PatternReg
list bool // whether or not to send a list of all current patterns to reg
}
@@ -75,12 +75,12 @@ func (i *PatternReg) inc() {
}
// Manages a Matcher object and routes messages to sessions
-func serverRouter(ref <-chan int, reg <-chan *serverRegister, unreg <-chan PatternReg, snd <-chan *serverSend, hook, unhook <-chan *serverHook) {
+func nodeRouter(ref <-chan int, reg <-chan *nodeRegister, unreg <-chan PatternReg, snd <-chan *nodeSend, hook, unhook <-chan *nodeHook) {
p := make(map[PatternReg]*MatcherItem)
lastId := PatternReg(1)
m := MatcherCreate()
- h := make(map[*serverHook]bool)
- clients := 1 // The initial client is the *Server object itself
+ h := make(map[*nodeHook]bool)
+ clients := 1 // The initial client is the *Node object itself
for clients > 0 {
select {
case n := <-ref:
@@ -106,12 +106,12 @@ func serverRouter(ref <-chan int, reg <-chan *serverRegister, unreg <-chan Patte
delete(p, n)
}
case n := <-snd:
- serverDeliver(m, n)
+ nodeDeliver(m, n)
case n := <-hook:
h[n] = true
if n.list {
for _, v := range p {
- n.reg <- &serverRegister{v.Data.(*serverPattern), v.Pattern, nil}
+ n.reg <- &nodeRegister{v.Data.(*nodePattern), v.Pattern, nil}
}
}
case n := <-unhook:
@@ -120,12 +120,12 @@ func serverRouter(ref <-chan int, reg <-chan *serverRegister, unreg <-chan Patte
}
}
-func serverDeliver(m *Matcher, n *serverSend) {
+func nodeDeliver(m *Matcher, n *nodeSend) {
var chout chan Tuple
chnum := 0
m.With(n.t, func(i *MatcherItem, t Tuple) bool {
- p := i.Data.(*serverPattern)
+ p := i.Data.(*nodePattern)
// Create a reply channel if there is one that will reply
var ch chan Tuple
@@ -144,21 +144,21 @@ func serverDeliver(m *Matcher, n *serverSend) {
}(ch, chout)
}
- p.recipient <- &serverRecv{t, ch, p.id, p.data}
+ p.recipient <- &nodeRecv{t, ch, p.id, p.data}
return true
})
// Buffer the reply channel and make sure to close it when all sessions have
// closed their channel (i.e. sent a 'nil')
if chnum > 0 {
- go serverBufReply(chout, n.ch, chnum)
+ go nodeBufReply(chout, n.ch, chnum)
} else if n.ch != nil {
close(n.ch)
}
}
// TODO: BUFFER!
-func serverBufReply(in chan Tuple, out chan<- Tuple, num int) {
+func nodeBufReply(in chan Tuple, out chan<- Tuple, num int) {
for i := range in {
if i == nil {
num--
diff --git a/proto.go b/go/proto.go
index 4e64237..74a8161 100644
--- a/proto.go
+++ b/go/proto.go
@@ -1,9 +1,11 @@
+// TODO: Rename "proto" -> "link" and update with latest protocol.
+
/* Usage:
-serv = ServerCreate()
+node = NodeCreate()
// The passed net.Conn will be closed automatically.
-conn = serv.ProtoLink(net.Conn, false)
+conn = node.ProtoLink(net.Conn, false)
// Blocks
err := conn.Run()
// In some other goroutine, if necessary
@@ -12,7 +14,7 @@ conn.Close()
// Listener can be implemented on top of that:
if l, _ := net.ListenUnix("unix", "/tmp/blah.sock"); l != nil {
for c, _ := l.Accept(); c != nil {
- conn := serv.ProtoLink(c, true)
+ conn := node.ProtoLink(c, true)
go conn.Run()
}
}
@@ -75,7 +77,7 @@ var protoSerList = [...]protoSerItem{
}
type ProtoConn struct {
- s *Server
+ s *Node
c net.Conn
r *bufio.Reader
serv bool
@@ -84,7 +86,7 @@ type ProtoConn struct {
io protoSerIO
}
-func (s *Server) ProtoLink(c net.Conn, serv bool) *ProtoConn {
+func (s *Node) ProtoLink(c net.Conn, serv bool) *ProtoConn {
r := bufio.NewReader(c)
cl := make(chan int)
return &ProtoConn{s, c, r, serv, cl, 0, nil}
@@ -97,7 +99,7 @@ func (p *ProtoConn) Run() error {
return err
}
- // Make sure the server stays alive while we're running
+ // Make sure the node stays alive while we're running
p.s.ref <- 1
defer func() { p.s.ref <- -1 }()
@@ -155,7 +157,7 @@ func (p *ProtoConn) handshake() error {
var err error = nil
if p.serv {
- err = p.handshakeServer()
+ err = p.handshakeNode()
} else {
err = p.handshakeClient()
}
@@ -218,7 +220,7 @@ func protoGetArgPrefix(a []string, p string) string {
return ""
}
-func (p *ProtoConn) handshakeServer() error {
+func (p *ProtoConn) handshakeNode() error {
// Send our hello
hello := []byte("ver,1.0 ser,")
for _, i := range protoSerList {
@@ -246,7 +248,7 @@ func (p *ProtoConn) handshakeServer() error {
}
func (p *ProtoConn) handshakeClient() error {
- // Receive and parse server hello
+ // Receive and parse node hello
msg, err := p.r.ReadString('\n')
if err != nil {
return err
@@ -304,10 +306,10 @@ func (p *ProtoConn) reader(out chan<- interface{}, err chan<- error) {
type routerOwnId *int
func (p *ProtoConn) routerPatterns(self routerOwnId, out chan<- interface{}) {
- // Register pattern registration hook with the server
- s_reg := make(chan *serverRegister, 2)
+ // Register pattern registration hook with the node
+ s_reg := make(chan *nodeRegister, 2)
s_unreg := make(chan PatternReg, 2)
- s_hook := &serverHook{s_reg, s_unreg, true}
+ s_hook := &nodeHook{s_reg, s_unreg, true}
p.s.hook <- s_hook
running := true
@@ -339,7 +341,7 @@ func (p *ProtoConn) routerPatterns(self routerOwnId, out chan<- interface{}) {
}
}
-func (p *ProtoConn) routerTuples(tuples <-chan *serverRecv, out chan<- interface{}) {
+func (p *ProtoConn) routerTuples(tuples <-chan *nodeRecv, out chan<- interface{}) {
running := true
for running {
select {
@@ -363,7 +365,7 @@ func (p *ProtoConn) routerTuples(tuples <-chan *serverRecv, out chan<- interface
}
}
-func (p *ProtoConn) routerMessages(self routerOwnId, in <-chan interface{}, tuples chan<- *serverRecv) {
+func (p *ProtoConn) routerMessages(self routerOwnId, in <-chan interface{}, tuples chan<- *nodeRecv) {
// Patterns that the connected party has registered for
// (remote ID -> local ID)
reg := make(map[PatternReg]PatternReg)
@@ -372,7 +374,7 @@ func (p *ProtoConn) routerMessages(self routerOwnId, in <-chan interface{}, tupl
switch m := msg.(type) {
case *protoRegister:
ret := make(chan PatternReg)
- p.s.reg <- &serverRegister{&serverPattern{tuples, true, 0, self}, m.Pattern, ret}
+ p.s.reg <- &nodeRegister{&nodePattern{tuples, true, 0, self}, m.Pattern, ret}
reg[m.Pid] = <-ret
case *protoUnregister:
if n, e := reg[m.Pid]; e {
@@ -400,11 +402,11 @@ func (p *ProtoConn) router(in <-chan interface{}, out chan<- interface{}) {
var ownid int
// Channel for incoming tuples (may be buffered)
- tuples := make(chan *serverRecv, 2)
+ tuples := make(chan *nodeRecv, 2)
- // Route (un)register actions from the server to the network
+ // Route (un)register actions from the node to the network
go p.routerPatterns(routerOwnId(&ownid), out)
- // Route tuples from the server to the network
+ // Route tuples from the node to the network
go p.routerTuples(tuples, out)
// Handle incoming messages
diff --git a/proto_gob.go b/go/proto_gob.go
index afa4264..afa4264 100644
--- a/proto_gob.go
+++ b/go/proto_gob.go
diff --git a/proto_test.go b/go/proto_test.go
index db67500..d53aa4a 100644
--- a/proto_test.go
+++ b/go/proto_test.go
@@ -4,8 +4,8 @@ import "testing"
import "net"
func Testdebug(t *testing.T) {
- ss := ServerCreate()
- sc := ServerCreate()
+ ss := NodeCreate()
+ sc := NodeCreate()
serv, cli := net.Pipe()
cs := ss.ProtoLink(serv, true)
cc := sc.ProtoLink(cli, false)
diff --git a/session.go b/go/session.go
index 63e1606..616a78e 100644
--- a/session.go
+++ b/go/session.go
@@ -8,11 +8,11 @@ import "container/list"
// returns.
// TODO: Warn/error somehow when the buffer exceeds some maximum?
// TODO: This function can be optimized. Probably.
-func sessionRecvQueue(r <-chan *serverRecv, s chan<- *serverRecv, p <-chan PatternReg) {
+func sessionRecvQueue(r <-chan *nodeRecv, s chan<- *nodeRecv, p <-chan PatternReg) {
buf := list.New()
rem := func(n PatternReg) {
for i := buf.Front(); i != nil; {
- re := i.Value.(*serverRecv)
+ re := i.Value.(*nodeRecv)
if re.id == n {
// Make sure to close a reply channel if there is one
if re.ch != nil {
@@ -28,12 +28,12 @@ func sessionRecvQueue(r <-chan *serverRecv, s chan<- *serverRecv, p <-chan Patte
}
running := true
for running {
- var send chan<- *serverRecv
- var msg *serverRecv
+ var send chan<- *nodeRecv
+ var msg *nodeRecv
n := buf.Front()
if n != nil {
send = s
- msg = n.Value.(*serverRecv)
+ msg = n.Value.(*nodeRecv)
}
select {
case send <- msg:
@@ -51,7 +51,7 @@ func sessionRecvQueue(r <-chan *serverRecv, s chan<- *serverRecv, p <-chan Patte
// Close all reply channels for buffered messages, these will be discarded
// and the session will not reply anymore.
for i := buf.Front(); i != nil; i = i.Next() {
- re := i.Value.(*serverRecv)
+ re := i.Value.(*nodeRecv)
if re.ch != nil {
close(re.ch)
}
@@ -60,37 +60,37 @@ func sessionRecvQueue(r <-chan *serverRecv, s chan<- *serverRecv, p <-chan Patte
}
type Session struct {
- serv *Server
+ node *Node
patterns map[PatternReg]bool
- recipient chan<- *serverRecv
- reader <-chan *serverRecv
+ recipient chan<- *nodeRecv
+ reader <-chan *nodeRecv
purge chan<- PatternReg
}
-func (s *Server) Session() *Session {
+func (s *Node) Session() *Session {
// Buffering the recipient channel gives a ~5% performance improvement, but
// doesn't affect correctness. A small buffer appears to be enough.
- recipient := make(chan *serverRecv, 5)
+ recipient := make(chan *nodeRecv, 5)
// Reader and buffer channels must be synchronous.
- reader := make(chan *serverRecv)
+ reader := make(chan *nodeRecv)
purge := make(chan PatternReg)
se := &Session{s, make(map[PatternReg]bool), recipient, reader, purge}
- se.serv.ref <- 1
+ se.node.ref <- 1
go sessionRecvQueue(recipient, reader, purge)
return se
}
func (s *Session) Send(t ...interface{}) {
- s.serv.Send(t...)
+ s.node.Send(t...)
}
func (s *Session) Request(t ...interface{}) <-chan Tuple {
- return s.serv.Request(t...)
+ return s.node.Request(t...)
}
func (s *Session) registerRaw(willReply bool, f interface{}, p Pattern) PatternReg {
ret := make(chan PatternReg)
- s.serv.reg <- &serverRegister{&serverPattern{s.recipient, willReply, 0, f}, p, ret}
+ s.node.reg <- &nodeRegister{&nodePattern{s.recipient, willReply, 0, f}, p, ret}
r := <-ret
s.patterns[r] = true
return r
@@ -106,13 +106,13 @@ func (s *Session) Response(f func(Tuple, chan<- Tuple) bool, p ...interface{}) P
func (s *Session) Unregister(id PatternReg) {
if s.patterns[id] {
- s.serv.unreg <- id
+ s.node.unreg <- id
s.purge <- id
delete(s.patterns, id)
}
}
-func (s *Session) dispatch(msg *serverRecv) {
+func (s *Session) dispatch(msg *nodeRecv) {
keep := true
switch f := msg.data.(type) {
case func(Tuple) bool:
@@ -145,9 +145,9 @@ func (s *Session) Run() {
func (s *Session) Close() {
for i, _ := range s.patterns {
- s.serv.unreg <- i
+ s.node.unreg <- i
}
s.patterns = nil
close(s.purge)
- s.serv.ref <- -1
+ s.node.ref <- -1
}
diff --git a/session_test.go b/go/session_test.go
index 9773378..ca55cf0 100644
--- a/session_test.go
+++ b/go/session_test.go
@@ -4,16 +4,16 @@ import "testing"
import "reflect"
func TestSess(tst *testing.T) {
- serv := ServerCreate()
- if serv == nil {
- tst.Errorf("ServerCreate() returned nil")
+ node := NodeCreate()
+ if node == nil {
+ tst.Errorf("NodeCreate() returned nil")
}
done := make(chan int)
go func() {
next := 0
havequit := false
- ses := serv.Session()
+ ses := node.Session()
if ses == nil {
tst.Errorf("Session()(1) returned nil")
}
@@ -56,14 +56,14 @@ func TestSess(tst *testing.T) {
<-done
go func() {
- ses := serv.Session()
+ ses := node.Session()
if ses == nil {
tst.Errorf("Session()(2) returned nil")
}
for i := 0; i < 10; i++ {
ses.Send("test", i)
}
- serv.Send("quit") // serv should also work
+ node.Send("quit") // node should also work
ses.Send("last")
ses.Close()
done <- 1
@@ -71,7 +71,7 @@ func TestSess(tst *testing.T) {
<-done
<-done
- serv.Close()
+ node.Close()
}
// Receive a tuple sent from the same session, from a different goroutine,
@@ -79,12 +79,12 @@ func TestSess(tst *testing.T) {
// that the function doesn't deadlock.
func TestSessOwn(t *testing.T) {
wait := make(chan int)
- serv := ServerCreate()
- ses := serv.Session()
+ node := NodeCreate()
+ ses := node.Session()
- // We don't use the serv object directly anymore, but it should still stay
+ // We don't use the node object directly anymore, but it should still stay
// alive. (Deadlock occurs if it doesn't)
- serv.Close()
+ node.Close()
ses.Register(func(t Tuple) bool {
switch t[0].(int) {
@@ -108,9 +108,9 @@ func TestSessOwn(t *testing.T) {
// A simple request-response test
func TestSessRequest(tst *testing.T) {
done := make(chan int)
- serv := ServerCreate()
+ node := NodeCreate()
- sa := serv.Session()
+ sa := node.Session()
sa.Response(func(t Tuple, c chan<- Tuple) bool {
c <- Tuple{"a", t[0], 1}
c <- Tuple{"a", t[0], 2}
@@ -124,7 +124,7 @@ func TestSessRequest(tst *testing.T) {
}()
sb_have := false
- sb := serv.Session()
+ sb := node.Session()
sb.Response(func(t Tuple, c chan<- Tuple) bool {
c <- Tuple{"b", t[0], 1}
c <- Tuple{"b", t[0], 2}
@@ -142,7 +142,7 @@ func TestSessRequest(tst *testing.T) {
done <- 1
}()
- ses := serv.Session()
+ ses := node.Session()
// Should be able to handle multiple requests simultaneously.
zero := ses.Request()
one := ses.Request(1)
@@ -200,15 +200,15 @@ func TestSessRequest(tst *testing.T) {
<-done
<-done
- serv.Close()
+ node.Close()
}
// TODO: test with several (temporary) sessions
// A simple benchmark with only a single session and Pattern
func BenchmarkSessSingle(b *testing.B) {
- serv := ServerCreate()
- ses := serv.Session()
+ node := NodeCreate()
+ ses := node.Session()
ses.Register(func(t Tuple) bool {
if t[0].(int) == 0 {
ses.Close()
@@ -219,15 +219,15 @@ func BenchmarkSessSingle(b *testing.B) {
})
ses.Send(b.N - 1)
ses.Run()
- serv.Close()
+ node.Close()
}
// Similar to BechmarkSessSingle, but with two sessions
func BenchmarkSessDouble(b *testing.B) {
- serv := ServerCreate()
+ node := NodeCreate()
done := make(chan int)
- sa := serv.Session()
+ sa := node.Session()
sa.Register(func(t Tuple) bool {
if t[0].(int) == 0 {
sa.Close()
@@ -239,14 +239,14 @@ func BenchmarkSessDouble(b *testing.B) {
done <- 1
}()
- sb := serv.Session()
+ sb := node.Session()
for i := b.N - 1; i >= 0; i-- {
sb.Send(i)
}
sb.Close()
<-done
- serv.Close()
+ node.Close()
}
/*
@@ -269,5 +269,5 @@ Some time afterwards (possibly from another session):
The panic() may not be reached. When "foo" is received by session A, the "bar"
Tuple may have (and with a high probability, has) already been processed by the
-server, at which time session A had no registration for "bar".
+node, at which time session A had no registration for "bar".
*/