-rw-r--r--	link.go		86
-rw-r--r--	linkmsg.go	 6
2 files changed, 21 insertions, 71 deletions
diff --git a/link.go b/link.go
index 49d7b8e..4cc7f2a 100644
--- a/link.go
+++ b/link.go
@@ -1,6 +1,7 @@
package tanja
import (
+ "blicky.net/asyncwr"
"bufio"
"errors"
"io"
@@ -15,15 +16,12 @@ type Link struct {
Closed bool
node *Node
io io.ReadWriteCloser
+ wr *asyncwr.Writer
err chan<- error
regs map[int32]*Registration
paths map[int32]*ReturnPath
lastId int32
lastTup *Tuple
- // It doesn't really matter if there are a few sends to l.wr after closing,
- // but there shouldn't be too many either, or a send could deadlock when
- // the buffer is full. So check Closed before sending.
- wr chan<- interface{}
}
func (n *Node) Link(io io.ReadWriteCloser) (l *Link) {
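The hand-rolled writer goroutine removed further below is replaced by an asyncwr.Writer created with asyncwr.New(io, 1024*1024). Judging only from the call sites in this diff, that writer exposes Write() plus an ErrorCh() that reports the first failed background flush. The following is a minimal, self-contained sketch of that pattern, not the actual asyncwr package (which takes a byte limit where this sketch uses a message queue):

	package main

	import (
		"bytes"
		"fmt"
		"io"
	)

	// asyncWriter illustrates the pattern only; it is not the real
	// blicky.net/asyncwr implementation.
	type asyncWriter struct {
		ch  chan []byte
		err chan error
	}

	func newAsyncWriter(w io.Writer, queue int) *asyncWriter {
		a := &asyncWriter{ch: make(chan []byte, queue), err: make(chan error, 1)}
		go func() {
			// Drain the queue in the background; report the first write
			// error and stop, mirroring how Start() selects on ErrorCh().
			for p := range a.ch {
				if _, e := w.Write(p); e != nil {
					a.err <- e
					return
				}
			}
		}()
		return a
	}

	// Write queues a copy of p; the caller never blocks on the underlying
	// writer, only on a full queue.
	func (a *asyncWriter) Write(p []byte) (int, error) {
		a.ch <- append([]byte(nil), p...)
		return len(p), nil
	}

	func (a *asyncWriter) ErrorCh() <-chan error { return a.err }

	func main() {
		var out bytes.Buffer
		w := newAsyncWriter(&out, 16)
		w.Write([]byte("ver,1.0 seri,json sero,json\n"))
		fmt.Println("handshake queued") // the flush happens asynchronously
	}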
@@ -33,6 +31,7 @@ func (n *Node) Link(io io.ReadWriteCloser) (l *Link) {
l.ReadyCh = ready
l.readyWr = ready
l.io = io
+ l.wr = asyncwr.New(io, 1024*1024) // TODO: make configurable
l.node = n
l.regs = make(map[int32]*Registration)
l.paths = make(map[int32]*ReturnPath)
@@ -40,57 +39,6 @@ func (n *Node) Link(io io.ReadWriteCloser) (l *Link) {
return l
}
-// TODO: Timeouts
-func (l *Link) writer(in <-chan interface{}) {
- buf := make([]byte, 1024)
- flush := make(chan int, 2)
- flushing := false
-
- for {
- select {
- case msg := <-in:
- if msg == nil {
- return
- }
- if data, ok := msg.([]byte); ok {
- buf = append(buf, data...)
- } else {
- buf = linkMsgFmt(msg, buf)
- }
- // TODO: check buffer size and report error
- case wr := <-flush:
- flushing = false
- // TODO: as a performance optimisation, it's possible to only use
- // copy() occasionally, and just slice the underlying buffer in
- // other cases.
- if wr > 0 {
- i := copy(buf, buf[wr:])
- buf = buf[:i]
- }
- }
-
- // Initiate a flush if we have data
- if !flushing && len(buf) > 0 {
- flushing = true
- size := len(buf)
- // Don't give too much data to a single Write(), otherwise it
- // may block for too long without giving any feedback on how
- // much data we've written so far.
- if size > 10240 {
- size = 10240
- }
- go func(data []byte) {
- w, e := l.io.Write(data)
- if e != nil {
- l.err <- e
- } else {
- flush <- w
- }
- }(buf[:size])
- }
- }
-}
-
func (l *Link) handleHandshake(buf []byte) (err error) {
strl := strings.Split(string(buf), " ")
hasver := false
@@ -113,7 +61,7 @@ func (l *Link) handleHandshake(buf []byte) (err error) {
err = errors.New("Invalid handshake message.")
} else {
if l.Sync {
- l.wr <- linkMsgPatternsync(true)
+ linkMsgWrite(l.wr, linkMsgPatternsync(true))
} else {
l.node.lock.Lock()
r := &Registration{pat: Tup(), active: true, willReply: true, recv: l}
@@ -131,14 +79,14 @@ func (l *Link) handleHandshake(buf []byte) (err error) {
// Called while the node lock is held
func (l *Link) nodeUnreg(id int32) {
if !l.Closed {
- l.wr <- linkMsgUnregister(id)
+ linkMsgWrite(l.wr, linkMsgUnregister(id))
}
}
// Called while the node lock is held.
func (l *Link) nodeReg(r *Registration) {
if !l.Closed && r.recv != l {
- l.wr <- &linkMsgRegister{r.id, r.pat}
+ linkMsgWrite(l.wr, &linkMsgRegister{r.id, r.pat})
}
}
@@ -159,7 +107,7 @@ func (l *Link) nodeSend(t *Tuple, p *ReturnPath) {
l.paths[m.tid] = p
}
- l.wr <- m
+ linkMsgWrite(l.wr, m)
l.lastTup = t
} else {
p.unref()
@@ -169,14 +117,14 @@ func (l *Link) nodeSend(t *Tuple, p *ReturnPath) {
// Node lock may or may not be held when this function is called
func (l *Link) pathReply(r *ReturnPath, t Tuple) {
if !l.Closed {
- l.wr <- &linkMsgReply{r.id, t}
+ linkMsgWrite(l.wr, &linkMsgReply{r.id, t})
}
}
// Node lock is held when this is called.
func (l *Link) pathClose(r *ReturnPath) {
if !l.Closed {
- l.wr <- linkMsgClose(r.id)
+ linkMsgWrite(l.wr, linkMsgClose(r.id))
}
}
@@ -187,9 +135,9 @@ func (l *Link) handleMessage(msg interface{}) error {
if bool(m) && !l.Closed {
l.node.links[l] = true
for k, v := range l.node.regs {
- l.wr <- &linkMsgRegister{k, v.pat}
+ linkMsgWrite(l.wr, &linkMsgRegister{k, v.pat})
}
- l.wr <- linkMsgRegdone{}
+ linkMsgWrite(l.wr, linkMsgRegdone{})
} else {
delete(l.node.links, l)
}
@@ -284,18 +232,19 @@ func (l *Link) reader() {
// closed. Either nil after a normal Close() or an error otherwise.
func (l *Link) Start() <-chan error {
err := make(chan error, 5) // Must be buffered to allow some extra errors (which will be ignored)
- wr := make(chan interface{}, 5)
uerr := make(chan error)
l.err = err
- l.wr = wr
- go l.writer(wr)
go l.reader()
// A control goroutine to shut things down. This allows multiple writes to
// l.err, but only the first is handled.
go func() {
- e := <-err
+ var e error
+ select {
+ case e = <-err:
+ case e = <-l.wr.ErrorCh():
+ }
l.node.lock.Lock()
delete(l.node.links, l)
@@ -308,13 +257,12 @@ func (l *Link) Start() <-chan error {
l.Closed = true
l.node.lock.Unlock()
- l.wr <- nil // tell the writer to shut down
l.io.Close() // should wake up the reader (with an error)
uerr <- e
}()
// Send handshake
- l.wr <- []byte("ver,1.0 seri,json sero,json\n")
+ l.wr.Write([]byte("ver,1.0 seri,json sero,json\n"))
return uerr
}
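Start() now reports both reader errors and asyncwr's write errors on the same shutdown channel. A hypothetical caller might look like the sketch below; the import path, the NewNode() constructor and the dial address are placeholders, while Node.Link() and the error-channel contract (nil after a normal Close(), an error otherwise) come from the code above.

	package main

	import (
		"log"
		"net"

		"blicky.net/tanja" // assumed import path
	)

	func main() {
		node := tanja.NewNode() // hypothetical constructor name

		conn, err := net.Dial("tcp", "example.net:2934") // placeholder address
		if err != nil {
			log.Fatal(err)
		}

		l := node.Link(conn)

		// The returned channel delivers nil after a normal Close() or the
		// first reader/writer error otherwise; block until the link ends.
		if e := <-l.Start(); e != nil {
			log.Printf("link closed with error: %v", e)
		} else {
			log.Print("link closed cleanly")
		}
	}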
diff --git a/linkmsg.go b/linkmsg.go
index 97ed141..0c384a3 100644
--- a/linkmsg.go
+++ b/linkmsg.go
@@ -4,6 +4,7 @@ import (
"bufio"
"encoding/json"
"errors"
+ "io"
. "strconv"
)
@@ -143,8 +144,9 @@ func linkMsgRead(rd *bufio.Reader) (msg interface{}, err error) {
return
}
-func linkMsgFmt(msg interface{}, buf []byte) []byte {
+func linkMsgWrite(wr io.Writer, msg interface{}) (int, error) {
// Somewhat low-level, but oh well
+ var buf []byte
switch m := msg.(type) {
case linkMsgPatternsync:
if m {
@@ -184,5 +186,5 @@ func linkMsgFmt(msg interface{}, buf []byte) []byte {
buf = append(buf, []byte(FormatInt(int64(m), 10))...)
buf = append(buf, []byte("]\n")...)
}
- return buf
+ return wr.Write(buf)
}
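With linkMsgFmt() folded into linkMsgWrite(), serialization writes straight into any io.Writer and buffering becomes the writer's concern. A quick way to exercise the new signature in isolation is a test against a bytes.Buffer; linkMsgClose(7) is assumed to be one of the int32-backed message types handled in the switch above.

	package tanja

	import (
		"bytes"
		"testing"
	)

	// Sketch of a test for the new signature: bytes.Buffer stands in for
	// the asyncwr.Writer a Link would normally pass in.
	func TestLinkMsgWriteSketch(t *testing.T) {
		var buf bytes.Buffer
		if _, err := linkMsgWrite(&buf, linkMsgClose(7)); err != nil {
			t.Fatal(err)
		}
		t.Logf("wire bytes: %q", buf.String())
	}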