diff options
author | Yorhel <git@yorhel.nl> | 2012-02-04 16:08:31 +0100 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-02-04 16:08:31 +0100 |
commit | e1954e1054ecd1e98a61381eef9398be32a1290e (patch) | |
tree | 78393a697b7ea444c01d3fd388348e06223b292a | |
parent | 9104163cf286b7a7d30679a62ddf3f8d7c0e7d2f (diff) |
Continued on proto.go
But there's still a lot of TODO's left.
-rw-r--r-- | matching.go | 6 | ||||
-rw-r--r-- | proto.go | 149 | ||||
-rw-r--r-- | server.go | 3 |
3 files changed, 129 insertions, 29 deletions
diff --git a/matching.go b/matching.go index 6a3d751..b319842 100644 --- a/matching.go +++ b/matching.go @@ -144,12 +144,14 @@ func (m *Matcher) Remove(i *MatcherItem) { } } -func (m *Matcher) With(t Tuple, f func(*MatcherItem, Tuple)) int { +func (m *Matcher) With(t Tuple, f func(*MatcherItem, Tuple) bool) int { num := 0 for n := range *m { if nt := t.Match((*m)[n].Pattern); nt != nil { num++ - f((*m)[n], nt) + if !f((*m)[n], nt) { + return num + } } } return num @@ -37,26 +37,26 @@ type Error struct { func (err *Error) Error() string { return err.ErrorString } type protoRegister struct { - pid int + pid PatternReg pattern Pattern } type protoUnregister struct { - pid int + pid PatternReg } type protoTuple struct { - tid int + tid PatternReg tuple Tuple } type protoResponse struct { - tid int + tid PatternReg pattern Pattern } type protoClose struct { - tid int + tid PatternReg } type protoSerIO interface { @@ -75,7 +75,7 @@ var protoSerList = [...]protoSerItem{ } type ProtoConn struct { - s *Session + s *Server c net.Conn r *bufio.Reader serv bool @@ -86,27 +86,59 @@ type ProtoConn struct { func (s *Server) ProtoLink(c net.Conn, serv bool) *ProtoConn { r := bufio.NewReader(c) - cl := make(chan int, 5) - return &ProtoConn{s.Session(), c, r, serv, cl, 0, nil} + cl := make(chan int) + return &ProtoConn{s, c, r, serv, cl, 0, nil} } func (p *ProtoConn) Run() error { err := p.handshake() if err != nil { p.Close() + return err } - return err - // TODO: do the rest as well + + // Make sure the server stays alive while we're running + p.s.ref <- 1 + defer func() { p.s.ref <- -1 }() + + // Spawn I/O helpers + ech := make(chan error, 2) + rd := make(chan interface{}) + wr := make(chan interface{}, 10) // TODO: INFINITE BUFFER! + go p.reader(rd, ech) + go p.writer(wr, ech) + + // Run the router + p.router(rd, wr) + + // Get the first I/O error and return that + select { + case err = <-ech: + return err + default: + return nil + } + return nil } func (p *ProtoConn) Close() { - // This should wake up the reader goroutine. - // The writer goroutine will also wake up if it is in a Write() call. - p.c.Close() - // The writer goroutine may block on receiving Tuples via a channel, wake - // that up as well. - p.closed <- 1 - p.s.Close() + // Close() may be called multiple times, handle that. + select { + case <-p.closed: + return + default: + // This should wake up the reader goroutine. + // The writer goroutine will also wake up if it is in a Write() call. + p.c.Close() + // The writer/router goroutine may block in a select, wake those up as + // well. This should also prevent a second call to Close() from doing + // anything. + close(p.closed) + } + // Note that there is a race condition in the above code when two + // goroutines call Close() at the same time. I suspect that this situation + // only happens *after* one Close() has already been called, so we should + // be safe. } // -- @@ -236,18 +268,83 @@ func (p *ProtoConn) handshakeClient() error { return err } -// -- +// --- -// Writer +// I/O handlers +// TODO: timeouts -func (p *ProtoConn) writer() error { - return nil +func (p *ProtoConn) writer(in <-chan interface{}, err chan<- error) { + select { + case <-p.closed: + return + case msg := <-in: + if e := p.io.send(msg); e != nil { + p.Close() + err <- e + return + } + } } -// -- - -// Reader +func (p *ProtoConn) reader(out chan<- interface{}, err chan<- error) { + var r interface{} + var e error + for r, e = p.io.receive(); e == nil; { + out <- r + } + p.Close() + err <- e +} -func (p *ProtoConn) reader() error { - return nil +// --- + +// The router + +func (p *ProtoConn) router(in <-chan interface{}, out chan<- interface{}) { + // Register catch-all pattern with the server + tuples := make(chan *serverRecv) + tuples_ch := make(chan PatternReg, 2) + p.s.reg <- &serverRegister{&serverPattern{tuples, true, 0, nil}, Pattern{}, tuples_ch} + tuples_id := <-tuples_ch + defer func() { p.s.unreg <- tuples_id }() + + // Patterns that the connected party has registered for + reg_p := make(map[PatternReg]*MatcherItem) + reg_m := MatcherCreate() + + select { + case <-p.closed: // Connection has been closed for whatever reason. + return + case msg := <-in: // Message received. + switch m := msg.(type) { + case *protoRegister: + if n, e := reg_p[m.pid]; e { + reg_m.Remove(n) + } + reg_p[m.pid] = reg_m.Add(m.pattern, nil) + case *protoUnregister: + if n, e := reg_p[m.pid]; e { + reg_m.Remove(n) + delete(reg_p, m.pid) + } + case *protoTuple: + // TODO: return-path for request-tuples + p.s.Send(m.tuple) + case *protoResponse: + // TODO + case *protoClose: + // TODO + } + case recv := <-tuples: // Incoming tuples from the server + // TODO: provide return-path for request-tuples + // TODO: don't route back tuples that we received with a *protoTuple. + if reg_m.With(recv.t, func(*MatcherItem, Tuple) bool { return false }) > 0 { + out <- &protoTuple{0, recv.t} + if recv.ch != nil { + close(recv.ch) + } + } + //case <-reg_unreg: // A pattern has been (un)registered with the server + // TODO: The catch-all patterns registered by this (and other) ProtoLink instances shouldn't be forwarded. + } } @@ -97,7 +97,7 @@ func serverDeliver(m *Matcher, n *serverSend) { var chout chan Tuple chnum := 0 - m.With(n.t, func(i *MatcherItem, t Tuple) { + m.With(n.t, func(i *MatcherItem, t Tuple) bool { p := i.Data.(*serverPattern) // Create a reply channel if there is one that will reply @@ -118,6 +118,7 @@ func serverDeliver(m *Matcher, n *serverSend) { } p.recipient <- &serverRecv{t, ch, p.id, p.data} + return true }) // Buffer the reply channel and make sure to close it when all sessions have |