summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-02-04 16:08:31 +0100
committerYorhel <git@yorhel.nl>2012-02-04 16:08:31 +0100
commite1954e1054ecd1e98a61381eef9398be32a1290e (patch)
tree78393a697b7ea444c01d3fd388348e06223b292a
parent9104163cf286b7a7d30679a62ddf3f8d7c0e7d2f (diff)
Continued on proto.go
But there's still a lot of TODO's left.
-rw-r--r--matching.go6
-rw-r--r--proto.go149
-rw-r--r--server.go3
3 files changed, 129 insertions, 29 deletions
diff --git a/matching.go b/matching.go
index 6a3d751..b319842 100644
--- a/matching.go
+++ b/matching.go
@@ -144,12 +144,14 @@ func (m *Matcher) Remove(i *MatcherItem) {
}
}
-func (m *Matcher) With(t Tuple, f func(*MatcherItem, Tuple)) int {
+func (m *Matcher) With(t Tuple, f func(*MatcherItem, Tuple) bool) int {
num := 0
for n := range *m {
if nt := t.Match((*m)[n].Pattern); nt != nil {
num++
- f((*m)[n], nt)
+ if !f((*m)[n], nt) {
+ return num
+ }
}
}
return num
diff --git a/proto.go b/proto.go
index f52b500..76979db 100644
--- a/proto.go
+++ b/proto.go
@@ -37,26 +37,26 @@ type Error struct {
func (err *Error) Error() string { return err.ErrorString }
type protoRegister struct {
- pid int
+ pid PatternReg
pattern Pattern
}
type protoUnregister struct {
- pid int
+ pid PatternReg
}
type protoTuple struct {
- tid int
+ tid PatternReg
tuple Tuple
}
type protoResponse struct {
- tid int
+ tid PatternReg
pattern Pattern
}
type protoClose struct {
- tid int
+ tid PatternReg
}
type protoSerIO interface {
@@ -75,7 +75,7 @@ var protoSerList = [...]protoSerItem{
}
type ProtoConn struct {
- s *Session
+ s *Server
c net.Conn
r *bufio.Reader
serv bool
@@ -86,27 +86,59 @@ type ProtoConn struct {
func (s *Server) ProtoLink(c net.Conn, serv bool) *ProtoConn {
r := bufio.NewReader(c)
- cl := make(chan int, 5)
- return &ProtoConn{s.Session(), c, r, serv, cl, 0, nil}
+ cl := make(chan int)
+ return &ProtoConn{s, c, r, serv, cl, 0, nil}
}
func (p *ProtoConn) Run() error {
err := p.handshake()
if err != nil {
p.Close()
+ return err
}
- return err
- // TODO: do the rest as well
+
+ // Make sure the server stays alive while we're running
+ p.s.ref <- 1
+ defer func() { p.s.ref <- -1 }()
+
+ // Spawn I/O helpers
+ ech := make(chan error, 2)
+ rd := make(chan interface{})
+ wr := make(chan interface{}, 10) // TODO: INFINITE BUFFER!
+ go p.reader(rd, ech)
+ go p.writer(wr, ech)
+
+ // Run the router
+ p.router(rd, wr)
+
+ // Get the first I/O error and return that
+ select {
+ case err = <-ech:
+ return err
+ default:
+ return nil
+ }
+ return nil
}
func (p *ProtoConn) Close() {
- // This should wake up the reader goroutine.
- // The writer goroutine will also wake up if it is in a Write() call.
- p.c.Close()
- // The writer goroutine may block on receiving Tuples via a channel, wake
- // that up as well.
- p.closed <- 1
- p.s.Close()
+ // Close() may be called multiple times, handle that.
+ select {
+ case <-p.closed:
+ return
+ default:
+ // This should wake up the reader goroutine.
+ // The writer goroutine will also wake up if it is in a Write() call.
+ p.c.Close()
+ // The writer/router goroutine may block in a select, wake those up as
+ // well. This should also prevent a second call to Close() from doing
+ // anything.
+ close(p.closed)
+ }
+ // Note that there is a race condition in the above code when two
+ // goroutines call Close() at the same time. I suspect that this situation
+ // only happens *after* one Close() has already been called, so we should
+ // be safe.
}
// --
@@ -236,18 +268,83 @@ func (p *ProtoConn) handshakeClient() error {
return err
}
-// --
+// ---
-// Writer
+// I/O handlers
+// TODO: timeouts
-func (p *ProtoConn) writer() error {
- return nil
+func (p *ProtoConn) writer(in <-chan interface{}, err chan<- error) {
+ select {
+ case <-p.closed:
+ return
+ case msg := <-in:
+ if e := p.io.send(msg); e != nil {
+ p.Close()
+ err <- e
+ return
+ }
+ }
}
-// --
-
-// Reader
+func (p *ProtoConn) reader(out chan<- interface{}, err chan<- error) {
+ var r interface{}
+ var e error
+ for r, e = p.io.receive(); e == nil; {
+ out <- r
+ }
+ p.Close()
+ err <- e
+}
-func (p *ProtoConn) reader() error {
- return nil
+// ---
+
+// The router
+
+func (p *ProtoConn) router(in <-chan interface{}, out chan<- interface{}) {
+ // Register catch-all pattern with the server
+ tuples := make(chan *serverRecv)
+ tuples_ch := make(chan PatternReg, 2)
+ p.s.reg <- &serverRegister{&serverPattern{tuples, true, 0, nil}, Pattern{}, tuples_ch}
+ tuples_id := <-tuples_ch
+ defer func() { p.s.unreg <- tuples_id }()
+
+ // Patterns that the connected party has registered for
+ reg_p := make(map[PatternReg]*MatcherItem)
+ reg_m := MatcherCreate()
+
+ select {
+ case <-p.closed: // Connection has been closed for whatever reason.
+ return
+ case msg := <-in: // Message received.
+ switch m := msg.(type) {
+ case *protoRegister:
+ if n, e := reg_p[m.pid]; e {
+ reg_m.Remove(n)
+ }
+ reg_p[m.pid] = reg_m.Add(m.pattern, nil)
+ case *protoUnregister:
+ if n, e := reg_p[m.pid]; e {
+ reg_m.Remove(n)
+ delete(reg_p, m.pid)
+ }
+ case *protoTuple:
+ // TODO: return-path for request-tuples
+ p.s.Send(m.tuple)
+ case *protoResponse:
+ // TODO
+ case *protoClose:
+ // TODO
+ }
+ case recv := <-tuples: // Incoming tuples from the server
+ // TODO: provide return-path for request-tuples
+ // TODO: don't route back tuples that we received with a *protoTuple.
+ if reg_m.With(recv.t, func(*MatcherItem, Tuple) bool { return false }) > 0 {
+ out <- &protoTuple{0, recv.t}
+ if recv.ch != nil {
+ close(recv.ch)
+ }
+ }
+ //case <-reg_unreg: // A pattern has been (un)registered with the server
+ // TODO: The catch-all patterns registered by this (and other) ProtoLink instances shouldn't be forwarded.
+ }
}
diff --git a/server.go b/server.go
index 14d441f..9aa9ed3 100644
--- a/server.go
+++ b/server.go
@@ -97,7 +97,7 @@ func serverDeliver(m *Matcher, n *serverSend) {
var chout chan Tuple
chnum := 0
- m.With(n.t, func(i *MatcherItem, t Tuple) {
+ m.With(n.t, func(i *MatcherItem, t Tuple) bool {
p := i.Data.(*serverPattern)
// Create a reply channel if there is one that will reply
@@ -118,6 +118,7 @@ func serverDeliver(m *Matcher, n *serverSend) {
}
p.recipient <- &serverRecv{t, ch, p.id, p.data}
+ return true
})
// Buffer the reply channel and make sure to close it when all sessions have