summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--link.go35
1 files changed, 24 insertions, 11 deletions
diff --git a/link.go b/link.go
index 4cc7f2a..40a2987 100644
--- a/link.go
+++ b/link.go
@@ -228,6 +228,21 @@ func (l *Link) reader() {
}
}
+func (l *Link) shutdown() {
+ l.node.lock.Lock()
+ if !l.Closed {
+ delete(l.node.links, l)
+ for _, r := range l.regs {
+ l.node.unregOne(r)
+ }
+ for _, r := range l.paths {
+ r.unref()
+ }
+ l.Closed = true
+ }
+ l.node.lock.Unlock()
+}
+
// The returned channel will get exactly a single message when the link is
// closed. Either nil after a normal Close() or an error otherwise.
func (l *Link) Start() <-chan error {
@@ -246,17 +261,7 @@ func (l *Link) Start() <-chan error {
case e = <-l.wr.ErrorCh():
}
- l.node.lock.Lock()
- delete(l.node.links, l)
- for _, r := range l.regs {
- l.node.unregOne(r)
- }
- for _, r := range l.paths {
- r.unref()
- }
- l.Closed = true
- l.node.lock.Unlock()
-
+ l.shutdown()
l.io.Close() // should wake up the reader (with an error)
uerr <- e
}()
@@ -267,6 +272,14 @@ func (l *Link) Start() <-chan error {
return uerr
}
+// Makes sure any pending messages are flushed and then closes the link.
func (l *Link) Close() {
+ l.shutdown()
+ <-l.wr.FlushCh()
+ l.err <- nil
+}
+
+// Closes the link without flushing any outstanding data.
+func (l *Link) ForceClose() {
l.err <- nil
}