summaryrefslogtreecommitdiff
path: root/reg.go
blob: 494163be31cde4fdca76f565e3d20c83491b9167 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package tanja

// Registration ties a pattern to a receiver, which is either a *Session or a
// *Link. The owning Node is reached through that receiver (see node()).
type Registration struct {
	// NOTE(review): pat looks like the tuple pattern to match and id like an
	// identifier for this registration; willReply is not referenced by the
	// methods in this file. Confirm all three against the code that uses them.
	// active is consulted by reg() so that a registration that is already
	// active is not handed to the node a second time.
	pat       Tuple
	id        int32
	active    bool
	willReply bool

	// This is either a *Session or a *Link
	recv interface{}

	// If recv is a *Session, then this is either:
	//   chan<- *Message: Application-handled channel
	//   func(*Message): Session-dispatched callback
	//   nil: Not registered yet
	// unreg() type-asserts against exactly these types, so the dynamic type
	// stored here has to match one of them precisely.
	data interface{}
}

// node returns the Node that owns this registration, taken from the receiver.
// The receiver must be a *Session or a *Link; any other value makes the
// *Link type assertion panic.
func (r *Registration) node() (n *Node) {
	if sess, isSession := r.recv.(*Session); isSession {
		return sess.node
	}
	return r.recv.(*Link).node
}

// Close removes this registration from its node. The node lock is acquired
// for the duration of the call.
func (r *Registration) Close() {
	n := r.node()
	n.lock.Lock()
	// Deferred so the node lock is released even if unregOne panics.
	defer n.lock.Unlock()
	n.unregOne(r)
}

// reg registers r with its node unless it is already active. For a session
// registration whose data is a callback, the session's callback reference
// count is bumped as well. Both steps happen under the node lock.
//
// NOTE(review): r.active is read before the node lock is taken; whether that
// is safe depends on where active is written — confirm against Node.reg.
func (r *Registration) reg() {
	if r.active {
		return
	}
	n := r.node()
	n.lock.Lock()
	// Deferred so the node lock is released even if registration panics.
	defer n.lock.Unlock()
	n.reg(r)
	if s, ok := r.recv.(*Session); ok {
		if _, isCallback := r.data.(func(*Message)); isCallback {
			s.refCalls()
		}
	}
}

// unreg is invoked by the node when active goes to false; the caller holds
// the node lock. Only registrations owned by a *Session need any work here.
func (r *Registration) unreg() {
	sess, isSession := r.recv.(*Session)
	if !isSession {
		return
	}
	switch target := r.data.(type) {
	case chan<- *Message:
		// Channel receiver: hand the channel to the dispatcher, which is
		// responsible for closing it.
		sess.disp <- target
	case func(*Message):
		// Wake the dispatcher first: it might be trying to send to the
		// session channel, which the next call may close.
		sess.disp <- nil
		// One callback registration less; this lets the session close that
		// channel once none are left.
		sess.unrefCalls()
	}
}

// send delivers tuple t, with its return path, to the receiver. The caller
// holds the node lock. A *Link receiver gets the tuple forwarded through
// nodeSend; a *Session receiver gets a Message (holding a copy of *t) queued
// on its dispatcher channel.
func (r *Registration) send(t *Tuple, path *ReturnPath) {
	sess, isSession := r.recv.(*Session)
	if !isSession {
		r.recv.(*Link).nodeSend(t, path)
		return
	}
	sess.disp <- &Message{*t, path, r}
}

// Chan creates an unbuffered channel for this registration, registers it with
// the node, and returns the receiving end to the application.
//
// Specific to session-registrations.
// TODO? Calling Chan() after Callback() or the other way around will go wrong,
//   because s.(un)refCalls isn't called.
func (r *Registration) Chan() <-chan *Message {
	ch := make(chan *Message)
	// Store the send-only view of the channel. unreg() recognises a channel
	// receiver with the assertion r.data.(chan<- *Message), and a type
	// assertion to a non-interface type only succeeds when the dynamic type
	// is identical; a plain chan *Message would never match, so the
	// dispatcher would never be asked to close the channel.
	r.data = (chan<- *Message)(ch)
	r.reg()
	return ch
}

// Callback installs cb as the receiver for this registration and then
// registers it with the node.
//
// Also specific to session-registrations
func (r *Registration) Callback(cb func(*Message)) {
	// data has to be set before reg(), because reg() inspects it to decide
	// whether the session's callback count must be incremented.
	r.data = cb
	r.reg()
}