summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-03-30 10:13:40 +0200
committerYorhel <git@yorhel.nl>2012-03-30 10:13:40 +0200
commit163fb3d7fbdb17a209518e7499a9538c6e7a917f (patch)
tree6de602433c54da986ce2d5a1e46bf2aba9fc6d89
Initial commit of my 3rd attempt at a Tanja implementation for Go
My previous attempts were miserable failures. This one is modelled after my C implementation, so I hope it won't fail again. Note the use of Mutexes instead of channels in some places, this makes things easier for me as it more resembles the C implementation, and should also be faster than an implementation completely based on channels.
-rw-r--r--COPYING20
-rw-r--r--README16
-rw-r--r--msg.go26
-rw-r--r--node.go57
-rw-r--r--reg.go86
-rw-r--r--ses.go142
-rw-r--r--tanja_test.go73
-rw-r--r--tuples.go130
8 files changed, 550 insertions, 0 deletions
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..251b5a4
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,20 @@
+Copyright (c) 2012 Yoran Heling
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/README b/README
new file mode 100644
index 0000000..8422ef9
--- /dev/null
+++ b/README
@@ -0,0 +1,16 @@
+Tanja implementation in Go
+==========================
+
+INTRODUCTION
+
+ This project implements an interface to the Tanja communication system in Go.
+
+ An introduction to the basic concepts and ideas can be found in the following
+ article:
+
+ http://dev.yorhel.nl/doc/commvis
+
+
+CONTACT
+
+ projects@yorhel.nl
diff --git a/msg.go b/msg.go
new file mode 100644
index 0000000..f73e35d
--- /dev/null
+++ b/msg.go
@@ -0,0 +1,26 @@
+package tanja
+
+type Message struct {
+ Tup *Tuple
+ ret *ReturnPath
+ reg *Registration
+}
+
+// dual
+type ReturnPath struct {
+ ref int
+}
+
+func (m *Message) Dispatch() {
+ if f, ok := m.reg.data.(func(*Message)); ok {
+ f(m)
+ }
+}
+
+func (*Message) Reply(t *Tuple) {
+ // TODO
+}
+
+func (*Message) Close() {
+ // TODO
+}
diff --git a/node.go b/node.go
new file mode 100644
index 0000000..0e1d8b4
--- /dev/null
+++ b/node.go
@@ -0,0 +1,57 @@
+package tanja
+
+import (
+ . "sync"
+)
+
+type Node struct {
+ lock Mutex
+ lastId int32
+ regs map[int32]*Registration
+ // TODO: list of links that are interested in registration messages
+}
+
+func NewNode() *Node {
+ return &Node{regs: make(map[int32]*Registration)}
+}
+
+func (n *Node) unregOne(id int32, r *Registration) {
+ // TODO: notify links
+ r.active = false
+ r.unreg()
+ delete(n.regs, id)
+}
+
+func (n *Node) unregMatch(f func(r *Registration) bool) {
+ for id, r := range n.regs {
+ if f(r) {
+ n.unregOne(id, r)
+ }
+ }
+}
+
+func (n *Node) reg(r *Registration) {
+ old := n.lastId
+ for old == n.lastId || n.regs[n.lastId] != nil {
+ n.lastId++
+ if n.lastId < 0 {
+ n.lastId = 1
+ }
+ }
+ r.id = n.lastId
+ n.regs[n.lastId] = r
+ r.active = true
+ // TODO: notify links
+}
+
+func (n *Node) send(t *Tuple, path *ReturnPath) {
+ // TODO: Some way of preventing that a message is routed back to the link
+ // it came from.
+ n.lock.Lock()
+ defer n.lock.Unlock()
+ for _, r := range n.regs {
+ if t.Match(*r.pat) {
+ r.send(t, path)
+ }
+ }
+}
diff --git a/reg.go b/reg.go
new file mode 100644
index 0000000..cfcaa88
--- /dev/null
+++ b/reg.go
@@ -0,0 +1,86 @@
+package tanja
+
+type Registration struct {
+ pat *Tuple
+ id int32
+ active bool
+ willReply bool
+
+ // This is either a *Session or a *Link
+ recv interface{}
+
+ // If obj is a *Session, then this is either:
+ // chan<- *Message: Application-handled channel
+ // func(*Message): Session-dispatched callback
+ // nil: Not registered yet
+ data interface{}
+}
+
+func (r *Registration) node() *Node {
+ // TODO: also handle *Link
+ return r.recv.(*Session).node
+}
+
+func (r *Registration) Close() {
+ n := r.node()
+ n.lock.Lock()
+ n.unregOne(r.id, r)
+ n.lock.Unlock()
+}
+
+func (r *Registration) reg() {
+ if !r.active {
+ n := r.node()
+ n.lock.Lock()
+ n.reg(r)
+ if s, ok := r.recv.(*Session); ok {
+ if _, ok := r.data.(func(*Message)); ok {
+ s.refCalls()
+ }
+ }
+ n.lock.Unlock()
+ }
+}
+
+// Called by the node if active goes to false. The node lock is held while this
+// function is called.
+func (r *Registration) unreg() {
+ if s, ok := r.recv.(*Session); ok {
+ if ch, ok := r.data.(chan<- *Message); ok {
+ // The receiver is a channel, tell the dispatcher to close it
+ s.disp <- ch
+ } else if _, ok := r.data.(func(*Message)); ok {
+ // Just wake up the dispatcher, as it might be trying to send to
+ // the session channel, which may be closed in the next call.
+ s.disp <- nil
+ // If the receiver is a callback, notify the session that there is
+ // one callback-registration less. (So it can close that channel if
+ // there are none left)
+ s.unrefCalls()
+ }
+ }
+}
+
+// Called while the node lock is held.
+func (r *Registration) send(t *Tuple, path *ReturnPath) {
+ if s, ok := r.recv.(*Session); ok {
+ s.disp <- &Message{t, path, r}
+ }
+ // TODO: links
+}
+
+// Specific to session-registrations
+// TODO? Calling Chan() after Callback() or the other way around will go wrong,
+// because s.(un)refCalls isn't called.
+func (r *Registration) Chan() <-chan *Message {
+ ch := make(chan *Message)
+ r.data = ch
+ r.reg()
+ return ch
+}
+
+// Also specific to session-registrations
+func (r *Registration) Callback(cb func(*Message)) {
+ r.data = cb
+ r.reg()
+}
diff --git a/ses.go b/ses.go
new file mode 100644
index 0000000..160a037
--- /dev/null
+++ b/ses.go
@@ -0,0 +1,142 @@
+package tanja
+
+type Session struct {
+ node *Node
+ disp chan<- interface{} // either a *Message (queued) or a chan<-*Message (closed)
+ calls int // number of active callback registrations
+ callch chan *Message
+}
+
+type sesQueue struct {
+ next, prev *sesQueue
+ msg *Message
+}
+
+func (s *Session) dispatcher(ch <-chan interface{}) {
+ var head *sesQueue
+ var tail *sesQueue
+ rem := func() {
+ if tail.prev != nil {
+ tail.prev.next = nil
+ } else {
+ head = nil
+ }
+ tail = tail.prev
+ }
+L:
+ for {
+ // Remove any inactive messages from the tail
+ for tail != nil && !tail.msg.reg.active {
+ tail.msg.Close()
+ rem()
+ }
+ // Get latest message
+ var snd chan<- *Message
+ var msg *Message
+ if tail != nil {
+ msg = tail.msg
+ if c, ok := msg.reg.data.(chan<- *Message); ok {
+ snd = c
+ } else {
+ snd = msg.reg.recv.(*Session).callch
+ }
+ }
+ // Send/receive
+ select {
+ case m, ok := <-ch:
+ if !ok {
+ break L
+ } else if msg, ok := m.(*Message); ok {
+ q := &sesQueue{head, nil, msg}
+ if head != nil {
+ head.prev = q
+ } else {
+ tail = q
+ }
+ head = q
+ } else if ch, ok := m.(chan<- *Message); ok {
+ close(ch)
+ }
+ case snd <- msg:
+ rem()
+ }
+ }
+
+ // If the queue isn't empty, make sure to close the messages before exiting.
+ for n := tail; n != nil; n = n.prev {
+ n.msg.Close()
+ }
+}
+
+func (n *Node) Session() *Session {
+ s := &Session{}
+ s.node = n
+ ch := make(chan interface{})
+ s.disp = ch
+ go s.dispatcher(ch)
+ return s
+}
+
+func (s *Session) Register(p *Tuple, willReply bool) *Registration {
+ r := &Registration{}
+ r.recv = s
+ r.pat = p
+ r.willReply = willReply
+ return r
+}
+
+func (s *Session) Close() {
+ s.node.lock.Lock()
+ s.node.unregMatch(func(r *Registration) bool {
+ return r.recv == s
+ })
+ s.node.lock.Unlock()
+ close(s.disp)
+ // TODO: close any incoming returnpaths
+}
+
+// Must be called when the node lock is held
+func (s *Session) refCalls() {
+ if s.calls == 0 {
+ s.callch = make(chan *Message)
+ }
+ s.calls++
+}
+
+// Must be called when the node lock is held
+func (s *Session) unrefCalls() {
+ s.calls--
+ if s.calls == 0 {
+ // It shouldn't be possible for the dispatcher to write to it at this
+ // point. No callback patterns are currently registered, and the
+ // dispatcher is synchronously notified when something is unregistered.
+ close(s.callch)
+ s.callch = nil
+ }
+}
+
+func (s *Session) Chan() <-chan *Message {
+ // Ensure that we won't return nil when s.callch is modified in some other
+ // goroutine.
+ s.node.lock.Lock()
+ c := s.callch
+ s.node.lock.Unlock()
+ if c == nil {
+ c = make(chan *Message)
+ close(c)
+ }
+ return c
+}
+
+func (s *Session) Run() {
+ for msg := range s.Chan() {
+ msg.Dispatch()
+ }
+}
+
+func (s *Session) Send(t *Tuple, wantReply bool) *ReturnPath {
+ // TODO: fix a return path
+ var r *ReturnPath
+ s.node.send(t, r)
+ return r
+}
diff --git a/tanja_test.go b/tanja_test.go
new file mode 100644
index 0000000..02603f7
--- /dev/null
+++ b/tanja_test.go
@@ -0,0 +1,73 @@
+package tanja
+
+import "testing"
+import "reflect"
+
+func TestSess(tst *testing.T) {
+ node := NewNode()
+ if node == nil {
+ tst.Errorf("NewNode() returned nil")
+ }
+ done := make(chan int)
+
+ go func() {
+ next := int64(0)
+ havequit := false
+ ses := node.Session()
+ if ses == nil {
+ tst.Errorf("Session()(1) returned nil")
+ }
+ reg := ses.Register(Tup("test"), false)
+ reg.Callback(func(m *Message) {
+ // Tuples must be received in order
+ if !reflect.DeepEqual(m.Tup, Tup("test", next)) {
+ tst.Errorf("Unexpected Tuple: %v", m.Tup)
+ }
+ // Returning false should prevent this callback from being called again
+ next++
+ if next > 6 {
+ tst.Errorf("Tuple after unregistering: %v", m.Tup)
+ } else if next == 6 {
+ reg.Close()
+ }
+ })
+ ses.Register(Tup("quit"), false).Callback(func(m *Message) {
+ havequit = true
+ ses.Close()
+ })
+ ses.Register(Tup("last"), false).Callback(func(m *Message) {
+ tst.Errorf("Tuple after closing: %v", m.Tup)
+ })
+ done <- 1
+ ses.Run()
+ // next should be 6
+ if next < 6 {
+ tst.Errorf("Not all test Tuples have been received (next=%d)", next)
+ }
+ // Run shouldn't return before Close()
+ if !havequit {
+ tst.Errorf("Run() stopped before Close()")
+ }
+ done <- 1
+ }()
+
+ // Make sure the stuff is registered before spawning the sending session.
+ <-done
+
+ go func() {
+ ses := node.Session()
+ if ses == nil {
+ tst.Errorf("Session()(2) returned nil")
+ }
+ for i := int64(0); i < 10; i++ {
+ ses.Send(Tup("test", i), false)
+ }
+ ses.Send(Tup("quit"), false)
+ ses.Send(Tup("last"), false)
+ ses.Close()
+ done <- 1
+ }()
+
+ <-done
+ <-done
+}
diff --git a/tuples.go b/tuples.go
new file mode 100644
index 0000000..de128be
--- /dev/null
+++ b/tuples.go
@@ -0,0 +1,130 @@
+package tanja
+
+import (
+ . "strconv"
+)
+
+type Element struct {
+ // This is either nil, int64, string, float64, []Element or map[string]Element
+ E interface{}
+}
+
+type Tuple []Element
+
+func (e Element) IsInt() (bool, int64) {
+ switch v := e.E.(type) {
+ case int64:
+ return true, v
+ case string:
+ n, r := ParseInt(v, 10, 64)
+ if r != nil {
+ return false, 0
+ } else {
+ return true, n
+ }
+ case float64:
+ return true, int64(v)
+ }
+ return false, 0
+}
+
+// Just returns 0 on error
+func (e Element) Int() int64 {
+ _, i := e.IsInt()
+ return i
+}
+
+// Returns empty string on error
+func (e Element) String() string {
+ switch v := e.E.(type) {
+ case int64:
+ return FormatInt(v, 10)
+ case string:
+ return v
+ case float64:
+ return FormatFloat(v, 'g', -1, 64)
+ }
+ return ""
+}
+
+// Returns 0 on error
+func (e Element) Float() float64 {
+ switch v := e.E.(type) {
+ case int64:
+ return float64(v)
+ case string:
+ n, r := ParseFloat(v, 64)
+ if r != nil {
+ return 0
+ } else {
+ return n
+ }
+ case float64:
+ return v
+ }
+ return 0
+}
+
+// Whether this is a wildcard or not
+func (e Element) WC() bool {
+ return e.E == nil
+}
+
+// Returns an empty slice if this is no array
+func (e Element) Slice() []Element {
+ if v, ok := e.E.([]Element); ok {
+ return v
+ }
+ return []Element{}
+}
+
+// Returns a nil map if this is no map.
+func (e Element) Map() map[string]Element {
+ if v, ok := e.E.(map[string]Element); ok {
+ return v
+ }
+ return nil
+}
+
+func (a Element) Match(b Element) bool {
+ // Wildcards always match
+ if a.WC() || b.WC() {
+ return true
+ }
+ // Fuzzy int matching
+ if ca, ia := a.IsInt(); ca {
+ if cb, ib := b.IsInt(); cb && ia == ib {
+ return true
+ }
+ }
+ // Exact matching (works fine for strings)
+ return a.E == b.E
+}
+
+// Same as Element{e}
+// TODO: Also allow other int/float (and perhaps even composite types) and
+// cast/convert those to valid elements?
+func El(e interface{}) Element {
+ return Element{e}
+}
+
+func (t Tuple) Match(p Tuple) bool {
+ if len(t) < len(p) {
+ return false
+ }
+ for i := range p {
+ if !t[i].Match(p[i]) {
+ return false
+ }
+ }
+ return true
+}
+
+// Same as &Tuple{El(a), El(b), ..}
+func Tup(t ...interface{}) *Tuple {
+ r := make(Tuple, len(t))
+ for i, v := range t {
+ r[i] = El(v)
+ }
+ return &r
+}