diff options
-rw-r--r-- | link.go | 35 |
1 files changed, 24 insertions, 11 deletions
@@ -228,6 +228,21 @@ func (l *Link) reader() { } } +func (l *Link) shutdown() { + l.node.lock.Lock() + if !l.Closed { + delete(l.node.links, l) + for _, r := range l.regs { + l.node.unregOne(r) + } + for _, r := range l.paths { + r.unref() + } + l.Closed = true + } + l.node.lock.Unlock() +} + // The returned channel will get exactly a single message when the link is // closed. Either nil after a normal Close() or an error otherwise. func (l *Link) Start() <-chan error { @@ -246,17 +261,7 @@ func (l *Link) Start() <-chan error { case e = <-l.wr.ErrorCh(): } - l.node.lock.Lock() - delete(l.node.links, l) - for _, r := range l.regs { - l.node.unregOne(r) - } - for _, r := range l.paths { - r.unref() - } - l.Closed = true - l.node.lock.Unlock() - + l.shutdown() l.io.Close() // should wake up the reader (with an error) uerr <- e }() @@ -267,6 +272,14 @@ func (l *Link) Start() <-chan error { return uerr } +// Makes sure any pending messages are flushed and then closes the link. func (l *Link) Close() { + l.shutdown() + <-l.wr.FlushCh() + l.err <- nil +} + +// Closes the link without flushing any outstanding data. +func (l *Link) ForceClose() { l.err <- nil } |