diff options
author | Yorhel <git@yorhel.nl> | 2012-02-23 09:58:40 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-02-23 09:58:40 +0100 |
commit | c330a85ef90437801044969d8485cdfc2f69fb5e (patch) | |
tree | 2e33e12bc1587f13b99a88409e5b2433d57e1e47 | |
parent | 1a5213104b9ac47eb15cdc6c3858cdacdff8fe0a (diff) |
go: Moved to go/ dir and s/server/node/g
-rw-r--r-- | go/matching.go (renamed from matching.go) | 0 | ||||
-rw-r--r-- | go/matching_test.go (renamed from matching_test.go) | 0 | ||||
-rw-r--r-- | go/node.go (renamed from server.go) | 70 | ||||
-rw-r--r-- | go/proto.go (renamed from proto.go) | 38 | ||||
-rw-r--r-- | go/proto_gob.go (renamed from proto_gob.go) | 0 | ||||
-rw-r--r-- | go/proto_test.go (renamed from proto_test.go) | 4 | ||||
-rw-r--r-- | go/session.go (renamed from session.go) | 40 | ||||
-rw-r--r-- | go/session_test.go (renamed from session_test.go) | 48 |
8 files changed, 101 insertions, 99 deletions
diff --git a/matching.go b/go/matching.go index b319842..b319842 100644 --- a/matching.go +++ b/go/matching.go diff --git a/matching_test.go b/go/matching_test.go index f01aaf4..f01aaf4 100644 --- a/matching_test.go +++ b/go/matching_test.go @@ -2,67 +2,67 @@ package tanja type PatternReg int32 -type Server struct { +type Node struct { ref chan<- int - reg chan<- *serverRegister + reg chan<- *nodeRegister unreg chan<- PatternReg - snd chan<- *serverSend - hook chan<- *serverHook - unhook chan<- *serverHook // must be same pointer as sent with hook + snd chan<- *nodeSend + hook chan<- *nodeHook + unhook chan<- *nodeHook // must be same pointer as sent with hook } -func ServerCreate() *Server { +func NodeCreate() *Node { ref := make(chan int) - reg := make(chan *serverRegister) + reg := make(chan *nodeRegister) unreg := make(chan PatternReg) - snd := make(chan *serverSend) - hook := make(chan *serverHook) - unhook := make(chan *serverHook) - go serverRouter(ref, reg, unreg, snd, hook, unhook) - return &Server{ref, reg, unreg, snd, hook, unhook} + snd := make(chan *nodeSend) + hook := make(chan *nodeHook) + unhook := make(chan *nodeHook) + go nodeRouter(ref, reg, unreg, snd, hook, unhook) + return &Node{ref, reg, unreg, snd, hook, unhook} } -func (s *Server) Send(t ...interface{}) { - s.snd <- &serverSend{Tuple(t), nil} +func (s *Node) Send(t ...interface{}) { + s.snd <- &nodeSend{Tuple(t), nil} } -func (s *Server) Request(t ...interface{}) <-chan Tuple { +func (s *Node) Request(t ...interface{}) <-chan Tuple { ch := make(chan Tuple, 100) // TODO: make sure this is buffered - s.snd <- &serverSend{Tuple(t), ch} + s.snd <- &nodeSend{Tuple(t), ch} return ch } -func (s *Server) Close() { +func (s *Node) Close() { s.ref <- -1 } -type serverSend struct { +type nodeSend struct { t Tuple ch chan<- Tuple } -type serverRecv struct { +type nodeRecv struct { t Tuple ch chan<- Tuple id PatternReg data interface{} } -type serverPattern struct { - recipient chan<- *serverRecv +type nodePattern struct { + recipient chan<- *nodeRecv willReply bool id PatternReg data interface{} } -type serverRegister struct { - dat *serverPattern +type nodeRegister struct { + dat *nodePattern pat Pattern reply chan<- PatternReg } -type serverHook struct { - reg chan<- *serverRegister +type nodeHook struct { + reg chan<- *nodeRegister unreg chan<- PatternReg list bool // whether or not to send a list of all current patterns to reg } @@ -75,12 +75,12 @@ func (i *PatternReg) inc() { } // Manages a Matcher object and routes messages to sessions -func serverRouter(ref <-chan int, reg <-chan *serverRegister, unreg <-chan PatternReg, snd <-chan *serverSend, hook, unhook <-chan *serverHook) { +func nodeRouter(ref <-chan int, reg <-chan *nodeRegister, unreg <-chan PatternReg, snd <-chan *nodeSend, hook, unhook <-chan *nodeHook) { p := make(map[PatternReg]*MatcherItem) lastId := PatternReg(1) m := MatcherCreate() - h := make(map[*serverHook]bool) - clients := 1 // The initial client is the *Server object itself + h := make(map[*nodeHook]bool) + clients := 1 // The initial client is the *Node object itself for clients > 0 { select { case n := <-ref: @@ -106,12 +106,12 @@ func serverRouter(ref <-chan int, reg <-chan *serverRegister, unreg <-chan Patte delete(p, n) } case n := <-snd: - serverDeliver(m, n) + nodeDeliver(m, n) case n := <-hook: h[n] = true if n.list { for _, v := range p { - n.reg <- &serverRegister{v.Data.(*serverPattern), v.Pattern, nil} + n.reg <- &nodeRegister{v.Data.(*nodePattern), v.Pattern, nil} } } case n := <-unhook: @@ -120,12 +120,12 @@ func serverRouter(ref <-chan int, reg <-chan *serverRegister, unreg <-chan Patte } } -func serverDeliver(m *Matcher, n *serverSend) { +func nodeDeliver(m *Matcher, n *nodeSend) { var chout chan Tuple chnum := 0 m.With(n.t, func(i *MatcherItem, t Tuple) bool { - p := i.Data.(*serverPattern) + p := i.Data.(*nodePattern) // Create a reply channel if there is one that will reply var ch chan Tuple @@ -144,21 +144,21 @@ func serverDeliver(m *Matcher, n *serverSend) { }(ch, chout) } - p.recipient <- &serverRecv{t, ch, p.id, p.data} + p.recipient <- &nodeRecv{t, ch, p.id, p.data} return true }) // Buffer the reply channel and make sure to close it when all sessions have // closed their channel (i.e. sent a 'nil') if chnum > 0 { - go serverBufReply(chout, n.ch, chnum) + go nodeBufReply(chout, n.ch, chnum) } else if n.ch != nil { close(n.ch) } } // TODO: BUFFER! -func serverBufReply(in chan Tuple, out chan<- Tuple, num int) { +func nodeBufReply(in chan Tuple, out chan<- Tuple, num int) { for i := range in { if i == nil { num-- @@ -1,9 +1,11 @@ +// TODO: Rename "proto" -> "link" and update with latest protocol. + /* Usage: -serv = ServerCreate() +node = NodeCreate() // The passed net.Conn will be closed automatically. -conn = serv.ProtoLink(net.Conn, false) +conn = node.ProtoLink(net.Conn, false) // Blocks err := conn.Run() // In some other goroutine, if necessary @@ -12,7 +14,7 @@ conn.Close() // Listener can be implemented on top of that: if l, _ := net.ListenUnix("unix", "/tmp/blah.sock"); l != nil { for c, _ := l.Accept(); c != nil { - conn := serv.ProtoLink(c, true) + conn := node.ProtoLink(c, true) go conn.Run() } } @@ -75,7 +77,7 @@ var protoSerList = [...]protoSerItem{ } type ProtoConn struct { - s *Server + s *Node c net.Conn r *bufio.Reader serv bool @@ -84,7 +86,7 @@ type ProtoConn struct { io protoSerIO } -func (s *Server) ProtoLink(c net.Conn, serv bool) *ProtoConn { +func (s *Node) ProtoLink(c net.Conn, serv bool) *ProtoConn { r := bufio.NewReader(c) cl := make(chan int) return &ProtoConn{s, c, r, serv, cl, 0, nil} @@ -97,7 +99,7 @@ func (p *ProtoConn) Run() error { return err } - // Make sure the server stays alive while we're running + // Make sure the node stays alive while we're running p.s.ref <- 1 defer func() { p.s.ref <- -1 }() @@ -155,7 +157,7 @@ func (p *ProtoConn) handshake() error { var err error = nil if p.serv { - err = p.handshakeServer() + err = p.handshakeNode() } else { err = p.handshakeClient() } @@ -218,7 +220,7 @@ func protoGetArgPrefix(a []string, p string) string { return "" } -func (p *ProtoConn) handshakeServer() error { +func (p *ProtoConn) handshakeNode() error { // Send our hello hello := []byte("ver,1.0 ser,") for _, i := range protoSerList { @@ -246,7 +248,7 @@ func (p *ProtoConn) handshakeServer() error { } func (p *ProtoConn) handshakeClient() error { - // Receive and parse server hello + // Receive and parse node hello msg, err := p.r.ReadString('\n') if err != nil { return err @@ -304,10 +306,10 @@ func (p *ProtoConn) reader(out chan<- interface{}, err chan<- error) { type routerOwnId *int func (p *ProtoConn) routerPatterns(self routerOwnId, out chan<- interface{}) { - // Register pattern registration hook with the server - s_reg := make(chan *serverRegister, 2) + // Register pattern registration hook with the node + s_reg := make(chan *nodeRegister, 2) s_unreg := make(chan PatternReg, 2) - s_hook := &serverHook{s_reg, s_unreg, true} + s_hook := &nodeHook{s_reg, s_unreg, true} p.s.hook <- s_hook running := true @@ -339,7 +341,7 @@ func (p *ProtoConn) routerPatterns(self routerOwnId, out chan<- interface{}) { } } -func (p *ProtoConn) routerTuples(tuples <-chan *serverRecv, out chan<- interface{}) { +func (p *ProtoConn) routerTuples(tuples <-chan *nodeRecv, out chan<- interface{}) { running := true for running { select { @@ -363,7 +365,7 @@ func (p *ProtoConn) routerTuples(tuples <-chan *serverRecv, out chan<- interface } } -func (p *ProtoConn) routerMessages(self routerOwnId, in <-chan interface{}, tuples chan<- *serverRecv) { +func (p *ProtoConn) routerMessages(self routerOwnId, in <-chan interface{}, tuples chan<- *nodeRecv) { // Patterns that the connected party has registered for // (remote ID -> local ID) reg := make(map[PatternReg]PatternReg) @@ -372,7 +374,7 @@ func (p *ProtoConn) routerMessages(self routerOwnId, in <-chan interface{}, tupl switch m := msg.(type) { case *protoRegister: ret := make(chan PatternReg) - p.s.reg <- &serverRegister{&serverPattern{tuples, true, 0, self}, m.Pattern, ret} + p.s.reg <- &nodeRegister{&nodePattern{tuples, true, 0, self}, m.Pattern, ret} reg[m.Pid] = <-ret case *protoUnregister: if n, e := reg[m.Pid]; e { @@ -400,11 +402,11 @@ func (p *ProtoConn) router(in <-chan interface{}, out chan<- interface{}) { var ownid int // Channel for incoming tuples (may be buffered) - tuples := make(chan *serverRecv, 2) + tuples := make(chan *nodeRecv, 2) - // Route (un)register actions from the server to the network + // Route (un)register actions from the node to the network go p.routerPatterns(routerOwnId(&ownid), out) - // Route tuples from the server to the network + // Route tuples from the node to the network go p.routerTuples(tuples, out) // Handle incoming messages diff --git a/proto_gob.go b/go/proto_gob.go index afa4264..afa4264 100644 --- a/proto_gob.go +++ b/go/proto_gob.go diff --git a/proto_test.go b/go/proto_test.go index db67500..d53aa4a 100644 --- a/proto_test.go +++ b/go/proto_test.go @@ -4,8 +4,8 @@ import "testing" import "net" func Testdebug(t *testing.T) { - ss := ServerCreate() - sc := ServerCreate() + ss := NodeCreate() + sc := NodeCreate() serv, cli := net.Pipe() cs := ss.ProtoLink(serv, true) cc := sc.ProtoLink(cli, false) diff --git a/session.go b/go/session.go index 63e1606..616a78e 100644 --- a/session.go +++ b/go/session.go @@ -8,11 +8,11 @@ import "container/list" // returns. // TODO: Warn/error somehow when the buffer exceeds some maximum? // TODO: This function can be optimized. Probably. -func sessionRecvQueue(r <-chan *serverRecv, s chan<- *serverRecv, p <-chan PatternReg) { +func sessionRecvQueue(r <-chan *nodeRecv, s chan<- *nodeRecv, p <-chan PatternReg) { buf := list.New() rem := func(n PatternReg) { for i := buf.Front(); i != nil; { - re := i.Value.(*serverRecv) + re := i.Value.(*nodeRecv) if re.id == n { // Make sure to close a reply channel if there is one if re.ch != nil { @@ -28,12 +28,12 @@ func sessionRecvQueue(r <-chan *serverRecv, s chan<- *serverRecv, p <-chan Patte } running := true for running { - var send chan<- *serverRecv - var msg *serverRecv + var send chan<- *nodeRecv + var msg *nodeRecv n := buf.Front() if n != nil { send = s - msg = n.Value.(*serverRecv) + msg = n.Value.(*nodeRecv) } select { case send <- msg: @@ -51,7 +51,7 @@ func sessionRecvQueue(r <-chan *serverRecv, s chan<- *serverRecv, p <-chan Patte // Close all reply channels for buffered messages, these will be discarded // and the session will not reply anymore. for i := buf.Front(); i != nil; i = i.Next() { - re := i.Value.(*serverRecv) + re := i.Value.(*nodeRecv) if re.ch != nil { close(re.ch) } @@ -60,37 +60,37 @@ func sessionRecvQueue(r <-chan *serverRecv, s chan<- *serverRecv, p <-chan Patte } type Session struct { - serv *Server + node *Node patterns map[PatternReg]bool - recipient chan<- *serverRecv - reader <-chan *serverRecv + recipient chan<- *nodeRecv + reader <-chan *nodeRecv purge chan<- PatternReg } -func (s *Server) Session() *Session { +func (s *Node) Session() *Session { // Buffering the recipient channel gives a ~5% performance improvement, but // doesn't affect correctness. A small buffer appears to be enough. - recipient := make(chan *serverRecv, 5) + recipient := make(chan *nodeRecv, 5) // Reader and buffer channels must be synchronous. - reader := make(chan *serverRecv) + reader := make(chan *nodeRecv) purge := make(chan PatternReg) se := &Session{s, make(map[PatternReg]bool), recipient, reader, purge} - se.serv.ref <- 1 + se.node.ref <- 1 go sessionRecvQueue(recipient, reader, purge) return se } func (s *Session) Send(t ...interface{}) { - s.serv.Send(t...) + s.node.Send(t...) } func (s *Session) Request(t ...interface{}) <-chan Tuple { - return s.serv.Request(t...) + return s.node.Request(t...) } func (s *Session) registerRaw(willReply bool, f interface{}, p Pattern) PatternReg { ret := make(chan PatternReg) - s.serv.reg <- &serverRegister{&serverPattern{s.recipient, willReply, 0, f}, p, ret} + s.node.reg <- &nodeRegister{&nodePattern{s.recipient, willReply, 0, f}, p, ret} r := <-ret s.patterns[r] = true return r @@ -106,13 +106,13 @@ func (s *Session) Response(f func(Tuple, chan<- Tuple) bool, p ...interface{}) P func (s *Session) Unregister(id PatternReg) { if s.patterns[id] { - s.serv.unreg <- id + s.node.unreg <- id s.purge <- id delete(s.patterns, id) } } -func (s *Session) dispatch(msg *serverRecv) { +func (s *Session) dispatch(msg *nodeRecv) { keep := true switch f := msg.data.(type) { case func(Tuple) bool: @@ -145,9 +145,9 @@ func (s *Session) Run() { func (s *Session) Close() { for i, _ := range s.patterns { - s.serv.unreg <- i + s.node.unreg <- i } s.patterns = nil close(s.purge) - s.serv.ref <- -1 + s.node.ref <- -1 } diff --git a/session_test.go b/go/session_test.go index 9773378..ca55cf0 100644 --- a/session_test.go +++ b/go/session_test.go @@ -4,16 +4,16 @@ import "testing" import "reflect" func TestSess(tst *testing.T) { - serv := ServerCreate() - if serv == nil { - tst.Errorf("ServerCreate() returned nil") + node := NodeCreate() + if node == nil { + tst.Errorf("NodeCreate() returned nil") } done := make(chan int) go func() { next := 0 havequit := false - ses := serv.Session() + ses := node.Session() if ses == nil { tst.Errorf("Session()(1) returned nil") } @@ -56,14 +56,14 @@ func TestSess(tst *testing.T) { <-done go func() { - ses := serv.Session() + ses := node.Session() if ses == nil { tst.Errorf("Session()(2) returned nil") } for i := 0; i < 10; i++ { ses.Send("test", i) } - serv.Send("quit") // serv should also work + node.Send("quit") // node should also work ses.Send("last") ses.Close() done <- 1 @@ -71,7 +71,7 @@ func TestSess(tst *testing.T) { <-done <-done - serv.Close() + node.Close() } // Receive a tuple sent from the same session, from a different goroutine, @@ -79,12 +79,12 @@ func TestSess(tst *testing.T) { // that the function doesn't deadlock. func TestSessOwn(t *testing.T) { wait := make(chan int) - serv := ServerCreate() - ses := serv.Session() + node := NodeCreate() + ses := node.Session() - // We don't use the serv object directly anymore, but it should still stay + // We don't use the node object directly anymore, but it should still stay // alive. (Deadlock occurs if it doesn't) - serv.Close() + node.Close() ses.Register(func(t Tuple) bool { switch t[0].(int) { @@ -108,9 +108,9 @@ func TestSessOwn(t *testing.T) { // A simple request-response test func TestSessRequest(tst *testing.T) { done := make(chan int) - serv := ServerCreate() + node := NodeCreate() - sa := serv.Session() + sa := node.Session() sa.Response(func(t Tuple, c chan<- Tuple) bool { c <- Tuple{"a", t[0], 1} c <- Tuple{"a", t[0], 2} @@ -124,7 +124,7 @@ func TestSessRequest(tst *testing.T) { }() sb_have := false - sb := serv.Session() + sb := node.Session() sb.Response(func(t Tuple, c chan<- Tuple) bool { c <- Tuple{"b", t[0], 1} c <- Tuple{"b", t[0], 2} @@ -142,7 +142,7 @@ func TestSessRequest(tst *testing.T) { done <- 1 }() - ses := serv.Session() + ses := node.Session() // Should be able to handle multiple requests simultaneously. zero := ses.Request() one := ses.Request(1) @@ -200,15 +200,15 @@ func TestSessRequest(tst *testing.T) { <-done <-done - serv.Close() + node.Close() } // TODO: test with several (temporary) sessions // A simple benchmark with only a single session and Pattern func BenchmarkSessSingle(b *testing.B) { - serv := ServerCreate() - ses := serv.Session() + node := NodeCreate() + ses := node.Session() ses.Register(func(t Tuple) bool { if t[0].(int) == 0 { ses.Close() @@ -219,15 +219,15 @@ func BenchmarkSessSingle(b *testing.B) { }) ses.Send(b.N - 1) ses.Run() - serv.Close() + node.Close() } // Similar to BechmarkSessSingle, but with two sessions func BenchmarkSessDouble(b *testing.B) { - serv := ServerCreate() + node := NodeCreate() done := make(chan int) - sa := serv.Session() + sa := node.Session() sa.Register(func(t Tuple) bool { if t[0].(int) == 0 { sa.Close() @@ -239,14 +239,14 @@ func BenchmarkSessDouble(b *testing.B) { done <- 1 }() - sb := serv.Session() + sb := node.Session() for i := b.N - 1; i >= 0; i-- { sb.Send(i) } sb.Close() <-done - serv.Close() + node.Close() } /* @@ -269,5 +269,5 @@ Some time afterwards (possibly from another session): The panic() may not be reached. When "foo" is received by session A, the "bar" Tuple may have (and with a high probability, has) already been processed by the -server, at which time session A had no registration for "bar". +node, at which time session A had no registration for "bar". */ |