summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-31 17:58:58 +0200
committerYorhel <git@yorhel.nl>2012-03-31 17:58:58 +0200
commitd09103ce98ea843c6eea5ec2decdc34b29185824 (patch)
tree7095f80ce0ebdc30c160d498ddc58816b993b34e
parent13fc4f3c933bf666f5efa62ed743d5afc0050675 (diff)
Started on a link implementation
Still a lot of TODO's left.
-rw-r--r--link.go136
1 files changed, 136 insertions, 0 deletions
diff --git a/link.go b/link.go
new file mode 100644
index 0000000..303bec2
--- /dev/null
+++ b/link.go
@@ -0,0 +1,136 @@
+package tanja
+
+// TODO: Abstract out the JSON encoding/decoding to allow for other
+// serialisation formats.
+
+import (
+ "bufio"
+ "errors"
+ "io"
+)
+
+type Link struct {
+ Sync bool
+ Ready bool
+ ReadyCh <-chan bool
+ readyWr chan<- bool
+ node *Node
+ io io.ReadWriteCloser
+ err chan<- error
+ wr chan<- interface{}
+}
+
+func (n *Node) Link(io io.ReadWriteCloser) (l *Link) {
+ l = &Link{}
+ l.Sync = true
+ ready := make(chan bool, 1)
+ l.ReadyCh = ready
+ l.readyWr = ready
+ l.io = io
+ l.node = n
+ return l
+}
+
+func (l *Link) formatMessage(msg interface{}) []byte {
+ return nil
+ // TODO
+}
+
+// TODO: Timeouts
+func (l *Link) writer(in <-chan interface{}) {
+ // Send handshake
+ if _, e := l.io.Write([]byte("ver,1.0 seri,json sero,json\n")); e != nil {
+ l.err <- e
+ return
+ }
+ // Request remote patterns when sync is enabled
+ if l.Sync {
+ if _, e := l.io.Write([]byte("[1,true]\n")); e != nil {
+ l.err <- e
+ return
+ }
+ } else {
+ l.Ready = true
+ l.readyWr <- true
+ }
+ // TODO: add an unbounded (to some extent) buffer to avoid blocking sends to l.wr.
+ for msg := range in {
+ if msg == nil {
+ return
+ }
+ if _, e := l.io.Write(l.formatMessage(msg)); e != nil {
+ l.err <- e
+ return
+ }
+ }
+}
+
+func (l *Link) readHandshake(buf []byte) error {
+ // TODO
+ return nil
+}
+
+func (l *Link) parseMessage(buf []byte) (interface{}, error) {
+ // TODO
+ return nil, nil
+}
+
+func (l *Link) handleMessage(msg interface{}) error {
+ // TODO
+ return nil
+}
+
+// TODO: There should be a timeout on the handshake (+ remote pattern syncing)
+func (l *Link) reader() {
+ // Make sure we've got a buffered reader with a large enough buffer to fit
+ // any message. This isn't the most elegant approach, but should do the
+ // trick.
+ rd := bufio.NewReaderSize(l.io, 1*1024*1024)
+ handshaked := false
+
+ for {
+ var m interface{}
+ line, prefix, err := rd.ReadLine()
+ if prefix || err != nil {
+ if err == nil {
+ err = errors.New("Read buffer overflow.")
+ }
+ } else if !handshaked {
+ err = l.readHandshake(line)
+ } else if m, err = l.parseMessage(line); err == nil {
+ err = l.handleMessage(m)
+ }
+
+ if err != nil {
+ l.err <- err
+ return
+ }
+ }
+}
+
+// The returned channel will get exactly a single message when the link is
+// closed. Either nil after a normal Close() or an error otherwise.
+func (l *Link) Start() <-chan error {
+ err := make(chan error, 5) // Must be buffered to allow some extra errors (which will be ignored)
+ wr := make(chan interface{}, 5)
+ uerr := make(chan error)
+ l.err = err
+ l.wr = wr
+
+ go l.writer(wr)
+ go l.reader()
+
+ // A control goroutine to shut things down. This allows multiple writes to
+ // l.err, but only the first is handled.
+ go func() {
+ e := <-err
+ l.wr <- nil // tell the writer to shut down
+ l.io.Close() // should wake up the reader (with an error)
+ uerr <- e
+ }()
+ return uerr
+}
+
+func (l *Link) Close() {
+ l.err <- nil
+}