diff options
author | Yorhel <git@yorhel.nl> | 2012-03-30 20:15:50 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-03-30 20:16:07 +0200 |
commit | 09a47c039175644234aee887d8b3aef1be525531 (patch) | |
tree | b18fac09a14e9e3c315a50ba2dcc06ed9242139e | |
parent | 1e36c6d184fdebedbd897bb06593e7f733a33a28 (diff) |
Implemented return path handling
The test is just a simple one, there's no doubt some more bugs.
-rw-r--r-- | msg.go | 140 | ||||
-rw-r--r-- | node.go | 13 | ||||
-rw-r--r-- | ses.go | 14 | ||||
-rw-r--r-- | tanja_test.go | 24 |
4 files changed, 179 insertions, 12 deletions
@@ -1,5 +1,9 @@ package tanja +import ( + . "sync/atomic" +) + type Message struct { Tup Tuple ret *ReturnPath @@ -8,7 +12,13 @@ type Message struct { // dual type ReturnPath struct { - ref int + num int32 + // TODO: These are specific to sessions, links probably require some other + // destination handling. + ses *Session + disp chan<- Tuple + ctrl chan<- bool + recv <-chan Tuple } func (m *Message) Dispatch() { @@ -17,10 +27,130 @@ func (m *Message) Dispatch() { } } -func (*Message) Reply(t Tuple) { - // TODO +func (m *Message) Reply(t Tuple) { + m.ret.reply(t) +} + +func (m *Message) Close() { + if m.ret != nil { + m.ret.ses.node.lock.Lock() + m.ret.unref() + m.ret.ses.node.lock.Unlock() + } +} + +func newReturnPathSession(s *Session) (r *ReturnPath) { + r = &ReturnPath{num: 0} + // disp and recv may be buffered for performance, ctrl must be buffered to + // allow multiple calls to Close(). + disp := make(chan Tuple, 10) + recv := make(chan Tuple, 10) + ctrl := make(chan bool, 10) + r.disp = disp + r.recv = recv + r.ses = s + r.ctrl = ctrl + go r.dispatcher(disp, ctrl, recv) + return r +} + +type retQueue struct { + next, prev *retQueue + tup Tuple +} + +// Makes sure that the return path is always asynchronous. After 'nil' is +// received on the in channel, the dispatcher will not attempt to write more +// tuples to the out channel and will just discard everything it receives on +// in. +func (r *ReturnPath) dispatcher(in <-chan Tuple, ctrl <-chan bool, out chan<- Tuple) { + var head *retQueue + var tail *retQueue +L: + for tail != nil || in != nil { + // Check if we have something queued + var snd chan<- Tuple + var tup Tuple + if tail != nil { + tup = tail.tup + snd = out + } + // Send/receive + select { + case <-ctrl: + break L + case t, ok := <-in: + if !ok { + in = nil + } else { + q := &retQueue{head, nil, t} + if head != nil { + head.prev = q + } else { + tail = q + } + head = q + } + case snd <- tup: + if tail.prev != nil { + tail.prev.next = nil + } else { + head = nil + } + tail = tail.prev + } + } + + // Free stuff as soon as we can + close(out) + head = nil + tail = nil + + // Now just keep reading stuff until in has been closed + for { + if _, ok := <-in; !ok { + break + } + } } -func (*Message) Close() { - // TODO +func (r *ReturnPath) reply(t Tuple) { + if r != nil { + // TODO: This is specific to sessions + r.disp <- t + } +} + +func (r *ReturnPath) ref() { + if r != nil { + AddInt32(&r.num, 1) + } +} + +// Node lock is held when this is called. +func (r *ReturnPath) unref() { + if r != nil && AddInt32(&r.num, -1) == 0 { + // TODO: This is specific to sessions + delete(r.ses.rets, r) + close(r.disp) + } +} + +// Used by a session to indicate that it doesn't want to receive more replies. +func (r *ReturnPath) Close() { + if r != nil { + r.ctrl <- true + } +} + +// Also specific to sessions +func (r *ReturnPath) Chan() (ch <-chan Tuple) { + if r == nil { + c := make(chan Tuple) + close(c) + ch = c + } else { + ch = r.recv + } + return ch } @@ -47,12 +47,19 @@ func (n *Node) reg(r *Registration) { func (n *Node) send(t Tuple, path *ReturnPath) { // TODO: Some way of preventing that a message is routed back to the link // it came from. - n.lock.Lock() - defer n.lock.Unlock() for _, r := range n.regs { if t.Match(r.pat) { + var p *ReturnPath + if r.willReply { + p = path + p.ref() + } // Pass a reference to the Tuple to allow duplicate detection. - r.send(&t, path) + r.send(&t, p) } } + + // Make sure to send path closed notification if there's nobody to reply. + path.ref() + path.unref() } @@ -5,6 +5,7 @@ type Session struct { disp chan<- interface{} // either a *Message (queued) or a chan<-*Message (closed) calls int // number of active callback registrations callch chan *Message + rets map[*ReturnPath]bool } type sesQueue struct { @@ -70,6 +71,7 @@ L: func (n *Node) Session() *Session { s := &Session{} + s.rets = make(map[*ReturnPath]bool) s.node = n ch := make(chan interface{}) s.disp = ch @@ -92,7 +94,10 @@ func (s *Session) Close() { }) s.node.lock.Unlock() close(s.disp) - // TODO: close any incoming returnpaths + + for ret := range s.rets { + ret.Close() + } } // Must be called when the node lock is held @@ -135,8 +140,13 @@ func (s *Session) Run() { } func (s *Session) Send(t Tuple, wantReply bool) *ReturnPath { - // TODO: fix a return path + s.node.lock.Lock() var r *ReturnPath + if wantReply { + r = newReturnPathSession(s) + s.rets[r] = true + } s.node.send(t, r) + s.node.lock.Unlock() return r } diff --git a/tanja_test.go b/tanja_test.go index 02603f7..5b3f7eb 100644 --- a/tanja_test.go +++ b/tanja_test.go @@ -19,6 +19,13 @@ func TestSess(tst *testing.T) { } reg := ses.Register(Tup("test"), false) reg.Callback(func(m *Message) { + // Calling some arbitrary reply methods shouldn't matter at all if + // the pattern is registered with !willReply. + if next == 1 { + m.Close() + } else if next == 2 { + m.Reply(Tup("blah")) + } // Tuples must be received in order if !reflect.DeepEqual(m.Tup, Tup("test", next)) { tst.Errorf("Unexpected Tuple: %v", m.Tup) @@ -31,7 +38,10 @@ func TestSess(tst *testing.T) { reg.Close() } }) - ses.Register(Tup("quit"), false).Callback(func(m *Message) { + ses.Register(Tup("quit"), true).Callback(func(m *Message) { + m.Reply(Tup("reply", int64(0))) + m.Reply(Tup("reply", int64(1))) + m.Close() havequit = true ses.Close() }) @@ -62,9 +72,19 @@ func TestSess(tst *testing.T) { for i := int64(0); i < 10; i++ { ses.Send(Tup("test", i), false) } - ses.Send(Tup("quit"), false) + ret := ses.Send(Tup("quit"), true) ses.Send(Tup("last"), false) + replies := int64(0) + for t := range ret.Chan() { + if !reflect.DeepEqual(t, Tup("reply", replies)) { + tst.Errorf("Unexpected reply: %v", t) + } + replies++ + } ses.Close() + if replies != 2 { + tst.Errorf("Replies != 2 (= %d)", replies) + } done <- 1 }() |