summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-30 20:15:50 +0200
committerYorhel <git@yorhel.nl>2012-03-30 20:16:07 +0200
commit09a47c039175644234aee887d8b3aef1be525531 (patch)
treeb18fac09a14e9e3c315a50ba2dcc06ed9242139e
parent1e36c6d184fdebedbd897bb06593e7f733a33a28 (diff)
Implemented return path handling
The test is just a simple one, there's no doubt some more bugs.
-rw-r--r--msg.go140
-rw-r--r--node.go13
-rw-r--r--ses.go14
-rw-r--r--tanja_test.go24
4 files changed, 179 insertions, 12 deletions
diff --git a/msg.go b/msg.go
index da8599d..2454368 100644
--- a/msg.go
+++ b/msg.go
@@ -1,5 +1,9 @@
package tanja
+import (
+ . "sync/atomic"
+)
+
type Message struct {
Tup Tuple
ret *ReturnPath
@@ -8,7 +12,13 @@ type Message struct {
// dual
type ReturnPath struct {
- ref int
+ num int32
+ // TODO: These are specific to sessions, links probably require some other
+ // destination handling.
+ ses *Session
+ disp chan<- Tuple
+ ctrl chan<- bool
+ recv <-chan Tuple
}
func (m *Message) Dispatch() {
@@ -17,10 +27,130 @@ func (m *Message) Dispatch() {
}
}
-func (*Message) Reply(t Tuple) {
- // TODO
+func (m *Message) Reply(t Tuple) {
+ m.ret.reply(t)
+}
+
+func (m *Message) Close() {
+ if m.ret != nil {
+ m.ret.ses.node.lock.Lock()
+ m.ret.unref()
+ m.ret.ses.node.lock.Unlock()
+ }
+}
+
+func newReturnPathSession(s *Session) (r *ReturnPath) {
+ r = &ReturnPath{num: 0}
+ // disp and recv may be buffered for performance, ctrl must be buffered to
+ // allow multiple calls to Close().
+ disp := make(chan Tuple, 10)
+ recv := make(chan Tuple, 10)
+ ctrl := make(chan bool, 10)
+ r.disp = disp
+ r.recv = recv
+ r.ses = s
+ r.ctrl = ctrl
+ go r.dispatcher(disp, ctrl, recv)
+ return r
+}
+
+type retQueue struct {
+ next, prev *retQueue
+ tup Tuple
+}
+
+// Makes sure that the return path is always asynchronous. After 'nil' is
+// received on the in channel, the dispatcher will not attempt to write more
+// tuples to the out channel and will just discard everything it receives on
+// in.
+func (r *ReturnPath) dispatcher(in <-chan Tuple, ctrl <-chan bool, out chan<- Tuple) {
+ var head *retQueue
+ var tail *retQueue
+L:
+ for tail != nil || in != nil {
+ // Check if we have something queued
+ var snd chan<- Tuple
+ var tup Tuple
+ if tail != nil {
+ tup = tail.tup
+ snd = out
+ }
+ // Send/receive
+ select {
+ case <-ctrl:
+ break L
+ case t, ok := <-in:
+ if !ok {
+ in = nil
+ } else {
+ q := &retQueue{head, nil, t}
+ if head != nil {
+ head.prev = q
+ } else {
+ tail = q
+ }
+ head = q
+ }
+ case snd <- tup:
+ if tail.prev != nil {
+ tail.prev.next = nil
+ } else {
+ head = nil
+ }
+ tail = tail.prev
+ }
+ }
+
+ // Free stuff as soon as we can
+ close(out)
+ head = nil
+ tail = nil
+
+ // Now just keep reading stuff until in has been closed
+ for {
+ if _, ok := <-in; !ok {
+ break
+ }
+ }
}
-func (*Message) Close() {
- // TODO
+func (r *ReturnPath) reply(t Tuple) {
+ if r != nil {
+ // TODO: This is specific to sessions
+ r.disp <- t
+ }
+}
+
+func (r *ReturnPath) ref() {
+ if r != nil {
+ AddInt32(&r.num, 1)
+ }
+}
+
+// Node lock is held when this is called.
+func (r *ReturnPath) unref() {
+ if r != nil && AddInt32(&r.num, -1) == 0 {
+ // TODO: This is specific to sessions
+ delete(r.ses.rets, r)
+ close(r.disp)
+ }
+}
+
+// Used by a session to indicate that it doesn't want to receive more replies.
+func (r *ReturnPath) Close() {
+ if r != nil {
+ r.ctrl <- true
+ }
+}
+
+// Also specific to sessions
+func (r *ReturnPath) Chan() (ch <-chan Tuple) {
+ if r == nil {
+ c := make(chan Tuple)
+ close(c)
+ ch = c
+ } else {
+ ch = r.recv
+ }
+ return ch
}
diff --git a/node.go b/node.go
index fdb75a3..b79de0c 100644
--- a/node.go
+++ b/node.go
@@ -47,12 +47,19 @@ func (n *Node) reg(r *Registration) {
func (n *Node) send(t Tuple, path *ReturnPath) {
// TODO: Some way of preventing that a message is routed back to the link
// it came from.
- n.lock.Lock()
- defer n.lock.Unlock()
for _, r := range n.regs {
if t.Match(r.pat) {
+ var p *ReturnPath
+ if r.willReply {
+ p = path
+ p.ref()
+ }
// Pass a reference to the Tuple to allow duplicate detection.
- r.send(&t, path)
+ r.send(&t, p)
}
}
+
+ // Make sure to send path closed notification if there's nobody to reply.
+ path.ref()
+ path.unref()
}
diff --git a/ses.go b/ses.go
index 2ef4ae0..92f809d 100644
--- a/ses.go
+++ b/ses.go
@@ -5,6 +5,7 @@ type Session struct {
disp chan<- interface{} // either a *Message (queued) or a chan<-*Message (closed)
calls int // number of active callback registrations
callch chan *Message
+ rets map[*ReturnPath]bool
}
type sesQueue struct {
@@ -70,6 +71,7 @@ L:
func (n *Node) Session() *Session {
s := &Session{}
+ s.rets = make(map[*ReturnPath]bool)
s.node = n
ch := make(chan interface{})
s.disp = ch
@@ -92,7 +94,10 @@ func (s *Session) Close() {
})
s.node.lock.Unlock()
close(s.disp)
- // TODO: close any incoming returnpaths
+
+ for ret := range s.rets {
+ ret.Close()
+ }
}
// Must be called when the node lock is held
@@ -135,8 +140,13 @@ func (s *Session) Run() {
}
func (s *Session) Send(t Tuple, wantReply bool) *ReturnPath {
- // TODO: fix a return path
+ s.node.lock.Lock()
var r *ReturnPath
+ if wantReply {
+ r = newReturnPathSession(s)
+ s.rets[r] = true
+ }
s.node.send(t, r)
+ s.node.lock.Unlock()
return r
}
diff --git a/tanja_test.go b/tanja_test.go
index 02603f7..5b3f7eb 100644
--- a/tanja_test.go
+++ b/tanja_test.go
@@ -19,6 +19,13 @@ func TestSess(tst *testing.T) {
}
reg := ses.Register(Tup("test"), false)
reg.Callback(func(m *Message) {
+ // Calling some arbitrary reply methods shouldn't matter at all if
+ // the pattern is registered with !willReply.
+ if next == 1 {
+ m.Close()
+ } else if next == 2 {
+ m.Reply(Tup("blah"))
+ }
// Tuples must be received in order
if !reflect.DeepEqual(m.Tup, Tup("test", next)) {
tst.Errorf("Unexpected Tuple: %v", m.Tup)
@@ -31,7 +38,10 @@ func TestSess(tst *testing.T) {
reg.Close()
}
})
- ses.Register(Tup("quit"), false).Callback(func(m *Message) {
+ ses.Register(Tup("quit"), true).Callback(func(m *Message) {
+ m.Reply(Tup("reply", int64(0)))
+ m.Reply(Tup("reply", int64(1)))
+ m.Close()
havequit = true
ses.Close()
})
@@ -62,9 +72,19 @@ func TestSess(tst *testing.T) {
for i := int64(0); i < 10; i++ {
ses.Send(Tup("test", i), false)
}
- ses.Send(Tup("quit"), false)
+ ret := ses.Send(Tup("quit"), true)
ses.Send(Tup("last"), false)
+ replies := int64(0)
+ for t := range ret.Chan() {
+ if !reflect.DeepEqual(t, Tup("reply", replies)) {
+ tst.Errorf("Unexpected reply: %v", t)
+ }
+ replies++
+ }
ses.Close()
+ if replies != 2 {
+ tst.Errorf("Replies != 2 (= %d)", replies)
+ }
done <- 1
}()