diff options
author | Yorhel <git@yorhel.nl> | 2012-03-31 17:58:58 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-03-31 17:58:58 +0200 |
commit | d09103ce98ea843c6eea5ec2decdc34b29185824 (patch) | |
tree | 7095f80ce0ebdc30c160d498ddc58816b993b34e | |
parent | 13fc4f3c933bf666f5efa62ed743d5afc0050675 (diff) |
Started on a link implementation
Still a lot of TODO's left.
-rw-r--r-- | link.go | 136 |
1 files changed, 136 insertions, 0 deletions
@@ -0,0 +1,136 @@ +package tanja + +// TODO: Abstract out the JSON encoding/decoding to allow for other +// serialisation formats. + +import ( + "bufio" + "errors" + "io" +) + +type Link struct { + Sync bool + Ready bool + ReadyCh <-chan bool + readyWr chan<- bool + node *Node + io io.ReadWriteCloser + err chan<- error + wr chan<- interface{} +} + +func (n *Node) Link(io io.ReadWriteCloser) (l *Link) { + l = &Link{} + l.Sync = true + ready := make(chan bool, 1) + l.ReadyCh = ready + l.readyWr = ready + l.io = io + l.node = n + return l +} + +func (l *Link) formatMessage(msg interface{}) []byte { + return nil + // TODO +} + +// TODO: Timeouts +func (l *Link) writer(in <-chan interface{}) { + // Send handshake + if _, e := l.io.Write([]byte("ver,1.0 seri,json sero,json\n")); e != nil { + l.err <- e + return + } + // Request remote patterns when sync is enabled + if l.Sync { + if _, e := l.io.Write([]byte("[1,true]\n")); e != nil { + l.err <- e + return + } + } else { + l.Ready = true + l.readyWr <- true + } + // TODO: add an unbounded (to some extent) buffer to avoid blocking sends to l.wr. + for msg := range in { + if msg == nil { + return + } + if _, e := l.io.Write(l.formatMessage(msg)); e != nil { + l.err <- e + return + } + } +} + +func (l *Link) readHandshake(buf []byte) error { + // TODO + return nil +} + +func (l *Link) parseMessage(buf []byte) (interface{}, error) { + // TODO + return nil, nil +} + +func (l *Link) handleMessage(msg interface{}) error { + // TODO + return nil +} + +// TODO: There should be a timeout on the handshake (+ remote pattern syncing) +func (l *Link) reader() { + // Make sure we've got a buffered reader with a large enough buffer to fit + // any message. This isn't the most elegant approach, but should do the + // trick. + rd := bufio.NewReaderSize(l.io, 1*1024*1024) + handshaked := false + + for { + var m interface{} + line, prefix, err := rd.ReadLine() + if prefix || err != nil { + if err == nil { + err = errors.New("Read buffer overflow.") + } + } else if !handshaked { + err = l.readHandshake(line) + } else if m, err = l.parseMessage(line); err == nil { + err = l.handleMessage(m) + } + + if err != nil { + l.err <- err + return + } + } +} + +// The returned channel will get exactly a single message when the link is +// closed. Either nil after a normal Close() or an error otherwise. +func (l *Link) Start() <-chan error { + err := make(chan error, 5) // Must be buffered to allow some extra errors (which will be ignored) + wr := make(chan interface{}, 5) + uerr := make(chan error) + l.err = err + l.wr = wr + + go l.writer(wr) + go l.reader() + + // A control goroutine to shut things down. This allows multiple writes to + // l.err, but only the first is handled. + go func() { + e := <-err + l.wr <- nil // tell the writer to shut down + l.io.Close() // should wake up the reader (with an error) + uerr <- e + }() + return uerr +} + +func (l *Link) Close() { + l.err <- nil +} |