package tanja

import (
	"blicky.net/asyncwr"
	"bufio"
	"errors"
	"io"
	"strings"
)

type Link struct {
	Sync    bool        // mirror remote registrations instead of forwarding all tuples
	Ready   bool
	ReadyCh <-chan bool // receives a single true once the link is ready for use
	readyWr chan<- bool
	Closed  bool
	node    *Node
	io      io.ReadWriteCloser
	wr      *asyncwr.Writer
	err     chan<- error
	regs    map[int32]*Registration // registrations received from the remote node, by remote id
	paths   map[int32]*ReturnPath   // outstanding return paths, by tuple id
	lastId  int32
	lastTup *Tuple
}

// Link creates a new Link on this node over the given transport. The link is
// not active until Start() has been called.
func (n *Node) Link(io io.ReadWriteCloser) (l *Link) {
	l = &Link{}
	l.Sync = true
	ready := make(chan bool, 1)
	l.ReadyCh = ready
	l.readyWr = ready
	l.io = io
	l.wr = asyncwr.New(io, 1024*1024) // TODO: make configurable
	l.node = n
	l.regs = make(map[int32]*Registration)
	l.paths = make(map[int32]*ReturnPath)
	l.lastId = 1
	return l
}

// handleHandshake validates the remote handshake line. On success it either
// requests a pattern sync from the remote node (l.Sync) or registers a
// catch-all pattern so that every tuple is forwarded over the link.
func (l *Link) handleHandshake(buf []byte) (err error) {
	strl := strings.Split(string(buf), " ")
	hasver := false
	hasseri := false
	hassero := false
	for _, arg := range strl {
		if strings.HasPrefix(arg, "ver,") && strings.Contains(arg, ",1.") {
			hasver = true
		}
		// TODO: These checks aren't too good, would also match "json-whatever"
		if strings.HasPrefix(arg, "seri,") && strings.Contains(arg, ",json") {
			hasseri = true
		}
		if strings.HasPrefix(arg, "sero,") && strings.Contains(arg, ",json") {
			hassero = true
		}
	}
	if !hasver || !hasseri || !hassero {
		err = errors.New("Invalid handshake message.")
	} else {
		if l.Sync {
			linkMsgWrite(l.wr, linkMsgPatternsync(true))
		} else {
			l.node.lock.Lock()
			r := &Registration{pat: Tup(), active: true, willReply: true, recv: l}
			l.node.reg(r)
			l.regs[0] = r
			l.node.lock.Unlock()
			l.Ready = true
			l.readyWr <- true
			close(l.readyWr)
		}
	}
	return
}

// Called while the node lock is held.
func (l *Link) nodeUnreg(id int32) {
	if !l.Closed {
		linkMsgWrite(l.wr, linkMsgUnregister(id))
	}
}

// Called while the node lock is held.
func (l *Link) nodeReg(r *Registration) {
	if !l.Closed && r.recv != l {
		linkMsgWrite(l.wr, &linkMsgRegister{r.id, r.pat})
	}
}

// Called while the node lock is held.
func (l *Link) nodeSend(t *Tuple, p *ReturnPath) {
	if !l.Closed && l.lastTup != t {
		m := &linkMsgTuple{0, *t}
		if p != nil {
			// Find an unused tuple id for the return path.
			old := l.lastId
			for old == l.lastId || l.paths[l.lastId] != nil {
				l.lastId++
				if l.lastId < 0 {
					l.lastId = 1
				}
			}
			m.tid = l.lastId
			l.paths[m.tid] = p
		}
		linkMsgWrite(l.wr, m)
		l.lastTup = t
	} else if p != nil {
		p.unref()
	}
}

// Node lock may or may not be held when this function is called.
func (l *Link) pathReply(r *ReturnPath, t Tuple) {
	if !l.Closed {
		linkMsgWrite(l.wr, &linkMsgReply{r.id, t})
	}
}

// Node lock is held when this is called.
func (l *Link) pathClose(r *ReturnPath) {
	if !l.Closed {
		linkMsgWrite(l.wr, linkMsgClose(r.id))
	}
}

// handleMessage processes a single decoded protocol message.
func (l *Link) handleMessage(msg interface{}) error {
	switch m := msg.(type) {
	case linkMsgPatternsync:
		l.node.lock.Lock()
		if bool(m) && !l.Closed {
			// The remote node wants our registrations: send them all, followed
			// by a "done" marker, and keep it updated from now on.
			l.node.links[l] = true
			for k, v := range l.node.regs {
				linkMsgWrite(l.wr, &linkMsgRegister{k, v.pat})
			}
			linkMsgWrite(l.wr, linkMsgRegdone{})
		} else {
			delete(l.node.links, l)
		}
		l.node.lock.Unlock()
	case *linkMsgRegister:
		l.node.lock.Lock()
		if l.Sync && !l.Closed {
			if r := l.regs[m.pid]; r != nil {
				l.node.unregOne(r)
			}
			r := &Registration{pat: m.pat, active: true, willReply: true, recv: l}
			l.node.reg(r)
			l.regs[m.pid] = r
		}
		l.node.lock.Unlock()
	case linkMsgRegdone:
		if !l.Ready {
			l.Ready = true
			l.readyWr <- true
			close(l.readyWr)
		}
	case linkMsgUnregister:
		l.node.lock.Lock()
		if l.Sync && !l.Closed {
			if r := l.regs[int32(m)]; r != nil {
				l.node.unregOne(r)
				delete(l.regs, int32(m))
			}
		}
		l.node.lock.Unlock()
	case *linkMsgTuple:
		l.node.lock.Lock()
		var r *ReturnPath
		if m.tid != 0 {
			r = &ReturnPath{obj: l, id: m.tid}
		}
		l.node.send(m.tup, r, l)
		l.node.lock.Unlock()
	case *linkMsgReply:
		// l.paths is protected by the node lock. (This lock does protect a lot...)
		l.node.lock.Lock()
		if r := l.paths[m.tid]; r != nil {
			r.reply(m.tup)
		}
		l.node.lock.Unlock()
	case linkMsgClose:
		l.node.lock.Lock()
		if r := l.paths[int32(m)]; r != nil {
			r.unref()
		}
		l.node.lock.Unlock()
	}
	return nil
}

// TODO: There should be a timeout on the handshake (+ remote pattern syncing)
func (l *Link) reader() {
	// Make sure we've got a buffered reader with a large enough buffer to fit
	// any message. This isn't the most elegant approach, but should do the
	// trick.
	rd := bufio.NewReaderSize(l.io, 1*1024*1024)

	// Handshake
	line, prefix, err := rd.ReadLine()
	if prefix && err == nil {
		err = errors.New("Read buffer overflow.")
	} else if err == nil {
		err = l.handleHandshake(line)
	}

	// Message loop
	for err == nil {
		var m interface{}
		if m, err = linkMsgRead(rd); err == nil {
			err = l.handleMessage(m)
		}
	}

	l.err <- err
}

// shutdown removes the link and everything registered through it from the
// node. It is safe to call more than once.
func (l *Link) shutdown() {
	l.node.lock.Lock()
	if !l.Closed {
		delete(l.node.links, l)
		for _, r := range l.regs {
			l.node.unregOne(r)
		}
		for _, r := range l.paths {
			r.unref()
		}
		l.Closed = true
	}
	l.node.lock.Unlock()
}

// Start sends the handshake and starts processing messages. The returned
// channel will get exactly one message when the link is closed: nil after a
// normal Close(), or the error that caused the link to shut down otherwise.
func (l *Link) Start() <-chan error {
	err := make(chan error, 5) // Must be buffered to allow some extra errors (which will be ignored)
	uerr := make(chan error)
	l.err = err
	go l.reader()

	// A control goroutine to shut things down. This allows multiple writes to
	// l.err, but only the first is handled.
	go func() {
		var e error
		select {
		case e = <-err:
		case e = <-l.wr.ErrorCh():
		}
		l.shutdown()
		l.io.Close() // should wake up the reader (with an error)
		uerr <- e
	}()

	// Send handshake
	l.wr.Write([]byte("ver,1.0 seri,json sero,json\n"))
	return uerr
}

// Close makes sure any pending messages are flushed and then closes the link.
func (l *Link) Close() {
	l.shutdown()
	<-l.wr.FlushCh()
	l.err <- nil
}

// ForceClose closes the link without flushing any outstanding data.
func (l *Link) ForceClose() {
	l.err <- nil
}
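// Usage sketch (illustrative only; the Node value, the dialled address and the
// error handling are assumptions by the editor, not part of this file's API):
//
//	conn, _ := net.Dial("tcp", "example.com:1234") // hypothetical transport
//	l := node.Link(conn)
//	errCh := l.Start() // sends the handshake and starts the reader
//	<-l.ReadyCh        // optionally wait for the remote pattern sync
//	// ... exchange tuples through the node as usual ...
//	l.Close()          // flush pending messages and shut the link down
//	err := <-errCh     // nil after Close(), non-nil on a transport error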