1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
package tanja
type Registration struct {
pat Tuple
id int32
active bool
willReply bool
// This is either a *Session or a *Link
recv interface{}
// If obj is a *Session, then this is either:
// chan<- *Message: Application-handled channel
// func(*Message): Session-dispatched callback
// nil: Not registered yet
data interface{}
}
func (r *Registration) node() (n *Node) {
if s, ok := r.recv.(*Session); ok {
n = s.node
} else {
n = r.recv.(*Link).node
}
return
}
func (r *Registration) Close() {
n := r.node()
n.lock.Lock()
n.unregOne(r)
n.lock.Unlock()
}
func (r *Registration) reg() {
if !r.active {
n := r.node()
n.lock.Lock()
n.reg(r)
if s, ok := r.recv.(*Session); ok {
if _, ok := r.data.(func(*Message)); ok {
s.refCalls()
}
}
n.lock.Unlock()
}
}
// Called by the node if active goes to false. The node lock is held while this
// function is called.
func (r *Registration) unreg() {
if s, ok := r.recv.(*Session); ok {
if ch, ok := r.data.(chan<- *Message); ok {
// The receiver is a channel, tell the dispatcher to close it
s.disp <- ch
} else if _, ok := r.data.(func(*Message)); ok {
// Just wake up the dispatcher, as it might be trying to send to
// the session channel, which may be closed in the next call.
s.disp <- nil
// If the receiver is a callback, notify the session that there is
// one callback-registration less. (So it can close that channel if
// there are none left)
s.unrefCalls()
}
}
}
// Called while the node lock is held.
func (r *Registration) send(t *Tuple, path *ReturnPath) {
if s, ok := r.recv.(*Session); ok {
s.disp <- &Message{*t, path, r}
} else {
r.recv.(*Link).nodeSend(t, path)
}
}
// Specific to session-registrations
// TODO? Calling Chan() after Callback() or the other way around will go wrong,
// because s.(un)refCalls isn't called.
func (r *Registration) Chan() <-chan *Message {
ch := make(chan *Message)
r.data = ch
r.reg()
return ch
}
// Also specific to session-registrations
func (r *Registration) Callback(cb func(*Message)) {
r.data = cb
r.reg()
}
|