diff options
-rw-r--r-- | link.go | 86 | ||||
-rw-r--r-- | linkmsg.go | 6 |
2 files changed, 21 insertions, 71 deletions
@@ -1,6 +1,7 @@ package tanja import ( + "blicky.net/asyncwr" "bufio" "errors" "io" @@ -15,15 +16,12 @@ type Link struct { Closed bool node *Node io io.ReadWriteCloser + wr *asyncwr.Writer err chan<- error regs map[int32]*Registration paths map[int32]*ReturnPath lastId int32 lastTup *Tuple - // It doesn't really matter if there are a few sends to l.wr after closing, - // but there certainly shouldn't be too many in order to avoid having the - // send deadlock when the buffer is ful. So check Closed before sending. - wr chan<- interface{} } func (n *Node) Link(io io.ReadWriteCloser) (l *Link) { @@ -33,6 +31,7 @@ func (n *Node) Link(io io.ReadWriteCloser) (l *Link) { l.ReadyCh = ready l.readyWr = ready l.io = io + l.wr = asyncwr.New(io, 1024*1024) // TODO: make configurable l.node = n l.regs = make(map[int32]*Registration) l.paths = make(map[int32]*ReturnPath) @@ -40,57 +39,6 @@ func (n *Node) Link(io io.ReadWriteCloser) (l *Link) { return l } -// TODO: Timeouts -func (l *Link) writer(in <-chan interface{}) { - buf := make([]byte, 1024) - flush := make(chan int, 2) - flushing := false - - for { - select { - case msg := <-in: - if msg == nil { - return - } - if data, ok := msg.([]byte); ok { - buf = append(buf, data...) - } else { - buf = linkMsgFmt(msg, buf) - } - // TODO: check buffer size and report error - case wr := <-flush: - flushing = false - // TODO: as a performance optimisation, it's possible to only use - // copy() occasionally, and just slice the underlying buffer in - // other cases. - if wr > 0 { - i := copy(buf, buf[wr:]) - buf = buf[:i] - } - } - - // Initiate a flush if we have data - if !flushing && len(buf) > 0 { - flushing = true - size := len(buf) - // Don't give too many data to a single Write(), otherwise it - // may block for too long without giving feedback any on how - // many data we've written so far. - if size > 10240 { - size = 10240 - } - go func(data []byte) { - w, e := l.io.Write(data) - if e != nil { - l.err <- e - } else { - flush <- w - } - }(buf[:size]) - } - } -} - func (l *Link) handleHandshake(buf []byte) (err error) { strl := strings.Split(string(buf), " ") hasver := false @@ -113,7 +61,7 @@ func (l *Link) handleHandshake(buf []byte) (err error) { err = errors.New("Invalid handshake message.") } else { if l.Sync { - l.wr <- linkMsgPatternsync(true) + linkMsgWrite(l.wr, linkMsgPatternsync(true)) } else { l.node.lock.Lock() r := &Registration{pat: Tup(), active: true, willReply: true, recv: l} @@ -131,14 +79,14 @@ func (l *Link) handleHandshake(buf []byte) (err error) { // Called while the node lock is held func (l *Link) nodeUnreg(id int32) { if !l.Closed { - l.wr <- linkMsgUnregister(id) + linkMsgWrite(l.wr, linkMsgUnregister(id)) } } // Called while the node lock is held. func (l *Link) nodeReg(r *Registration) { if !l.Closed && r.recv != l { - l.wr <- &linkMsgRegister{r.id, r.pat} + linkMsgWrite(l.wr, &linkMsgRegister{r.id, r.pat}) } } @@ -159,7 +107,7 @@ func (l *Link) nodeSend(t *Tuple, p *ReturnPath) { l.paths[m.tid] = p } - l.wr <- m + linkMsgWrite(l.wr, m) l.lastTup = t } else { p.unref() @@ -169,14 +117,14 @@ func (l *Link) nodeSend(t *Tuple, p *ReturnPath) { // Node lock may or may not be held when this function is called func (l *Link) pathReply(r *ReturnPath, t Tuple) { if !l.Closed { - l.wr <- &linkMsgReply{r.id, t} + linkMsgWrite(l.wr, &linkMsgReply{r.id, t}) } } // Node lock is held when this is called. func (l *Link) pathClose(r *ReturnPath) { if !l.Closed { - l.wr <- linkMsgClose(r.id) + linkMsgWrite(l.wr, linkMsgClose(r.id)) } } @@ -187,9 +135,9 @@ func (l *Link) handleMessage(msg interface{}) error { if bool(m) && !l.Closed { l.node.links[l] = true for k, v := range l.node.regs { - l.wr <- &linkMsgRegister{k, v.pat} + linkMsgWrite(l.wr, &linkMsgRegister{k, v.pat}) } - l.wr <- linkMsgRegdone{} + linkMsgWrite(l.wr, linkMsgRegdone{}) } else { delete(l.node.links, l) } @@ -284,18 +232,19 @@ func (l *Link) reader() { // closed. Either nil after a normal Close() or an error otherwise. func (l *Link) Start() <-chan error { err := make(chan error, 5) // Must be buffered to allow some extra errors (which will be ignored) - wr := make(chan interface{}, 5) uerr := make(chan error) l.err = err - l.wr = wr - go l.writer(wr) go l.reader() // A control goroutine to shut things down. This allows multiple writes to // l.err, but only the first is handled. go func() { - e := <-err + var e error + select { + case e = <-err: + case e = <-l.wr.ErrorCh(): + } l.node.lock.Lock() delete(l.node.links, l) @@ -308,13 +257,12 @@ func (l *Link) Start() <-chan error { l.Closed = true l.node.lock.Unlock() - l.wr <- nil // tell the writer to shut down l.io.Close() // should wake up the reader (with an error) uerr <- e }() // Send handshake - l.wr <- []byte("ver,1.0 seri,json sero,json\n") + l.wr.Write([]byte("ver,1.0 seri,json sero,json\n")) return uerr } @@ -4,6 +4,7 @@ import ( "bufio" "encoding/json" "errors" + "io" . "strconv" ) @@ -143,8 +144,9 @@ func linkMsgRead(rd *bufio.Reader) (msg interface{}, err error) { return } -func linkMsgFmt(msg interface{}, buf []byte) []byte { +func linkMsgWrite(wr io.Writer, msg interface{}) (int, error) { // Somewhat low-level, but oh well + var buf []byte switch m := msg.(type) { case linkMsgPatternsync: if m { @@ -184,5 +186,5 @@ func linkMsgFmt(msg interface{}, buf []byte) []byte { buf = append(buf, []byte(FormatInt(int64(m), 10))...) buf = append(buf, []byte("]\n")...) } - return buf + return wr.Write(buf) } |