package tanja import ( . "sync/atomic" ) type Message struct { Tup Tuple ret *ReturnPath reg *Registration } type ReturnPath struct { num int32 // This is either a *Session or a *Link obj interface{} // These are only used if the above is a *Session disp chan<- Tuple ctrl chan<- bool recv <-chan Tuple // And this is for links id int32 } func (m *Message) Dispatch() { if f, ok := m.reg.data.(func(*Message)); ok { f(m) } } func (m *Message) Replyt(t Tuple) { m.ret.reply(t) } func (m *Message) Reply(t ...interface{}) { m.ret.reply(Tup(t...)) } func (m *Message) Close() { if m.ret != nil { var n *Node if s, ok := m.ret.obj.(*Session); ok { n = s.node } else { n = m.ret.obj.(*Link).node } n.lock.Lock() m.ret.unref() n.lock.Unlock() } } func newReturnPathSession(s *Session) (r *ReturnPath) { r = &ReturnPath{} // disp and recv may be buffered for performance, ctrl must be buffered to // allow multiple calls to Close(). disp := make(chan Tuple, 10) recv := make(chan Tuple, 10) ctrl := make(chan bool, 10) r.disp = disp r.recv = recv r.obj = s r.ctrl = ctrl go r.dispatcher(disp, ctrl, recv) return r } type retQueue struct { next, prev *retQueue tup Tuple } // Makes sure that the return path is always asynchronous. After 'nil' is // received on the in channel, the dispatcher will not attempt to write more // tuples to the out channel and will just discard everything it receives on // in. func (r *ReturnPath) dispatcher(in <-chan Tuple, ctrl <-chan bool, out chan<- Tuple) { var head *retQueue var tail *retQueue L: for tail != nil || in != nil { // Check if we have something queued var snd chan<- Tuple var tup Tuple if tail != nil { tup = tail.tup snd = out } // Send/receive select { case <-ctrl: break L case t, ok := <-in: if !ok { in = nil } else { q := &retQueue{head, nil, t} if head != nil { head.prev = q } else { tail = q } head = q } case snd <- tup: if tail.prev != nil { tail.prev.next = nil } else { head = nil } tail = tail.prev } } // Free stuff as soon as we can close(out) head = nil tail = nil // Now just keep reading stuff until in has been closed for { if _, ok := <-in; !ok { break } } } // Node lock may or may not be held when this function is called. func (r *ReturnPath) reply(t Tuple) { if r != nil { if _, ok := r.obj.(*Session); ok { r.disp <- t } else { r.obj.(*Link).pathReply(r, t) } } } func (r *ReturnPath) ref() { if r != nil { AddInt32(&r.num, 1) } } // Node lock is held when this is called. func (r *ReturnPath) unref() { if r != nil && AddInt32(&r.num, -1) == 0 { if s, ok := r.obj.(*Session); ok { delete(s.rets, r) close(r.disp) } else { r.obj.(*Link).pathClose(r) } } } // Used by a session to indicate that it doesn't want to receive more replies. func (r *ReturnPath) Close() { if r != nil { r.ctrl <- true } } // Also specific to sessions func (r *ReturnPath) Chan() (ch <-chan Tuple) { if r == nil { c := make(chan Tuple) close(c) ch = c } else { ch = r.recv } return ch }