package tanja type Session struct { node *Node disp chan<- interface{} // either a *Message (queued) or a chan<-*Message (closed) calls int // number of active callback registrations callch chan *Message rets map[*ReturnPath]bool } type sesQueue struct { next, prev *sesQueue msg *Message } func (s *Session) dispatcher(ch <-chan interface{}) { var head *sesQueue var tail *sesQueue rem := func() { if tail.prev != nil { tail.prev.next = nil } else { head = nil } tail = tail.prev } L: for { // Remove any inactive messages from the tail for tail != nil && !tail.msg.reg.active { tail.msg.Close() rem() } // Get latest message var snd chan<- *Message var msg *Message if tail != nil { msg = tail.msg if c, ok := msg.reg.data.(chan<- *Message); ok { snd = c } else { snd = msg.reg.recv.(*Session).callch } } // Send/receive select { case m, ok := <-ch: if !ok { break L } else if msg, ok := m.(*Message); ok { q := &sesQueue{head, nil, msg} if head != nil { head.prev = q } else { tail = q } head = q } else if ch, ok := m.(chan<- *Message); ok { close(ch) } case snd <- msg: rem() } } // If the queue isn't empty, make sure to close the messages before exiting. for n := tail; n != nil; n = n.prev { n.msg.Close() } } func (n *Node) Session() *Session { s := &Session{} s.rets = make(map[*ReturnPath]bool) s.node = n ch := make(chan interface{}) s.disp = ch go s.dispatcher(ch) return s } func (s *Session) Registert(willReply bool, p Tuple) *Registration { r := &Registration{} r.recv = s r.pat = p r.willReply = willReply return r } func (s *Session) Register(willReply bool, p ...interface{}) *Registration { return s.Registert(willReply, Tup(p...)) } func (s *Session) Close() { s.node.lock.Lock() s.node.unregMatch(func(r *Registration) bool { return r.recv == s }) s.node.lock.Unlock() close(s.disp) for ret := range s.rets { ret.Close() } } // Must be called when the node lock is held func (s *Session) refCalls() { if s.calls == 0 { s.callch = make(chan *Message) } s.calls++ } // Must be called when the node lock is held func (s *Session) unrefCalls() { s.calls-- if s.calls == 0 { // It shouldn't be possible for the dispatcher to write to it at this // point. No callback patterns are currently registered, and the // dispatcher is synchronously notified when something is unregistered. close(s.callch) s.callch = nil } } func (s *Session) Chan() <-chan *Message { // Ensure that we won't return nil when s.callch is modified in some other // goroutine. s.node.lock.Lock() c := s.callch s.node.lock.Unlock() if c == nil { c = make(chan *Message) close(c) } return c } func (s *Session) Run() { for msg := range s.Chan() { msg.Dispatch() } } func (s *Session) Sendt(wantReply bool, t Tuple) *ReturnPath { s.node.lock.Lock() var r *ReturnPath if wantReply { r = newReturnPathSession(s) s.rets[r] = true } s.node.send(t, r, nil) s.node.lock.Unlock() return r } func (s *Session) Send(wantReply bool, t ...interface{}) *ReturnPath { return s.Sendt(wantReply, Tup(t...)) }