summaryrefslogtreecommitdiff
path: root/ses.go
blob: 537b1a4033ec51b5b3602fa6e5b31a0c414a43a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package tanja

// Session is a handle on a Node, created by Node.Session. Patterns are
// registered and tuples are sent through it, and every session owns one
// dispatcher goroutine that is fed through disp and stopped by Close.
type Session struct {
	node   *Node
	disp   chan<- interface{} // either a *Message (queued) or a chan<-*Message (closed)
	calls  int                // number of active callback registrations
	// callch is allocated by refCalls when calls leaves zero, and closed and
	// reset to nil by unrefCalls when calls returns to zero. Chan hands it out.
	callch chan *Message
	// rets holds the ReturnPaths created by Send when a reply was requested;
	// Close closes every one of them.
	rets   map[*ReturnPath]bool
}

// sesQueue is one entry in the dispatcher's doubly linked queue of pending
// messages. The dispatcher links new entries in at its head and delivers from
// its tail, so next leads towards the tail (older entries) and prev leads
// towards the head (newer entries).
//
// The dispatcher builds entries with an unkeyed literal, so the field order
// must not change.
type sesQueue struct {
	next, prev *sesQueue
	msg        *Message
}

// dispatcher is the session's delivery loop; Node.Session starts it in its own
// goroutine. It buffers incoming messages in a linked queue and hands the
// oldest one to its destination whenever that destination is ready.
//
// Values arriving on ch are handled by type:
//   - *Message is appended to the queue;
//   - chan<- *Message is closed immediately;
//   - anything else is ignored.
//
// When ch itself is closed the loop ends and every message still queued is
// closed without being delivered.
func (s *Session) dispatcher(ch <-chan interface{}) {
	// Fresh messages are linked in at newest; deliveries are taken from
	// oldest. An entry's prev pointer leads towards newer entries and its next
	// pointer towards older ones.
	var newest, oldest *sesQueue

	// dropOldest unlinks the entry at the delivery end of the queue. It must
	// only be called while the queue is non-empty.
	dropOldest := func() {
		following := oldest.prev
		if following == nil {
			newest = nil
		} else {
			following.next = nil
		}
		oldest = following
	}

loop:
	for {
		// Messages whose registration is no longer active are closed and
		// discarded instead of being delivered.
		for oldest != nil && !oldest.msg.reg.active {
			oldest.msg.Close()
			dropOldest()
		}

		// Work out what to deliver and where. With an empty queue both stay
		// nil; a send on a nil channel is never ready, so the select below
		// then only waits for input.
		var (
			pending *Message
			target  chan<- *Message
		)
		if oldest != nil {
			pending = oldest.msg
			if direct, isChan := pending.reg.data.(chan<- *Message); isChan {
				// The registration carries its own delivery channel.
				target = direct
			} else {
				// Otherwise deliver on the receiving session's channel.
				target = pending.reg.recv.(*Session).callch
			}
		}

		select {
		case in, open := <-ch:
			if !open {
				break loop
			}
			switch v := in.(type) {
			case *Message:
				entry := &sesQueue{next: newest, msg: v}
				if newest == nil {
					// The queue was empty, so this entry is also the oldest.
					oldest = entry
				} else {
					newest.prev = entry
				}
				newest = entry
			case chan<- *Message:
				close(v)
			}
		case target <- pending:
			dropOldest()
		}
	}

	// Close whatever is still queued, walking from the oldest entry towards
	// the newest.
	for ; oldest != nil; oldest = oldest.prev {
		oldest.msg.Close()
	}
}

// Session creates a new Session bound to node n and starts its dispatcher
// goroutine. The dispatcher reads from the channel stored in the session's
// disp field and keeps running until that channel is closed.
func (n *Node) Session() *Session {
	queue := make(chan interface{})
	ses := &Session{
		node: n,
		disp: queue,
		rets: make(map[*ReturnPath]bool),
	}
	go ses.dispatcher(queue)
	return ses
}

// register builds a Registration for pattern p with this session as its
// receiver. willReply is stored on the registration unchanged. The
// registration is only constructed and returned; this function does not add
// it to the node.
func (s *Session) register(willReply bool, p Tuple) *Registration {
	reg := new(Registration)
	reg.willReply = willReply
	reg.pat = p
	reg.recv = s
	return reg
}

// Register is the variadic form of register: the elements of p are packed
// into a tuple with Tup, which then serves as the registration's pattern.
func (s *Session) Register(willReply bool, p ...interface{}) *Registration {
	pattern := Tup(p...)
	return s.register(willReply, pattern)
}

// Close shuts the session down in three steps: it unregisters every
// registration whose receiver is this session (under the node lock), closes
// the dispatcher's input channel so the dispatcher goroutine exits, and then
// closes each return path recorded in s.rets.
func (s *Session) Close() {
	ownedBySession := func(r *Registration) bool {
		return r.recv == s
	}

	s.node.lock.Lock()
	s.node.unregMatch(ownedBySession)
	s.node.lock.Unlock()

	close(s.disp)

	for ret := range s.rets {
		ret.Close()
	}
}

// refCalls counts one more callback registration. The first reference
// allocates the channel on which callback messages are delivered.
//
// Must be called when the node lock is held.
func (s *Session) refCalls() {
	first := s.calls == 0
	s.calls++
	if first {
		s.callch = make(chan *Message)
	}
}

// unrefCalls counts one callback registration fewer. When the last one goes
// away the callback channel is closed and the field is reset to nil.
//
// Must be called when the node lock is held.
func (s *Session) unrefCalls() {
	s.calls--
	if s.calls != 0 {
		return
	}
	// The dispatcher should not be able to write to the channel at this
	// point: no callback patterns remain registered, and the dispatcher is
	// notified synchronously whenever something is unregistered.
	close(s.callch)
	s.callch = nil
}

// Chan returns the channel on which callback messages for this session are
// delivered. The result is never nil: when the session currently has no
// callback channel, a freshly created and already closed channel is returned
// instead, so a receive or range on it finishes immediately.
func (s *Session) Chan() <-chan *Message {
	// Read the field under the node lock, because another goroutine may be
	// replacing or clearing s.callch at the same time.
	s.node.lock.Lock()
	current := s.callch
	s.node.lock.Unlock()

	if current != nil {
		return current
	}
	closed := make(chan *Message)
	close(closed)
	return closed
}

// Run dispatches every message received from the channel returned by Chan and
// returns once that channel is closed. Chan is consulted a single time, when
// Run starts.
func (s *Session) Run() {
	incoming := s.Chan()
	for {
		msg, open := <-incoming
		if !open {
			return
		}
		msg.Dispatch()
	}
}

// Send packs t into a tuple and sends it through the node while holding the
// node lock. When wantReply is true, a return path tied to this session is
// created, recorded in s.rets and returned; otherwise the result is nil.
func (s *Session) Send(wantReply bool, t ...interface{}) *ReturnPath {
	var path *ReturnPath

	s.node.lock.Lock()
	if wantReply {
		path = newReturnPathSession(s)
		s.rets[path] = true
	}
	tuple := Tup(t...)
	s.node.send(tuple, path, nil)
	s.node.lock.Unlock()

	return path
}