diff options
author | Yorhel <git@yorhel.nl> | 2012-03-30 10:13:40 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-03-30 10:13:40 +0200 |
commit | 163fb3d7fbdb17a209518e7499a9538c6e7a917f (patch) | |
tree | 6de602433c54da986ce2d5a1e46bf2aba9fc6d89 |
Initial commit of my 3rd attempt at a Tanja implementation for Go
My previous attempts were miserable failures. This one is modelled after
my C implementation, so I hope it won't fail again.
Note the use of Mutexes instead of channels in some places, this makes
things easier for me as it more resembles the C implementation, and
should also be faster than an implementation completely based on
channels.
-rw-r--r-- | COPYING | 20 | ||||
-rw-r--r-- | README | 16 | ||||
-rw-r--r-- | msg.go | 26 | ||||
-rw-r--r-- | node.go | 57 | ||||
-rw-r--r-- | reg.go | 86 | ||||
-rw-r--r-- | ses.go | 142 | ||||
-rw-r--r-- | tanja_test.go | 73 | ||||
-rw-r--r-- | tuples.go | 130 |
8 files changed, 550 insertions, 0 deletions
@@ -0,0 +1,20 @@ +Copyright (c) 2012 Yoran Heling + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be included +in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -0,0 +1,16 @@ +Tanja implementation in Go +========================== + +INTRODUCTION + + This project implements an interface to the Tanja communication system in Go. + + An introduction to the basic concepts and ideas can be found in the following + article: + + http://dev.yorhel.nl/doc/commvis + + +CONTACT + + projects@yorhel.nl @@ -0,0 +1,26 @@ +package tanja + +type Message struct { + Tup *Tuple + ret *ReturnPath + reg *Registration +} + +// dual +type ReturnPath struct { + ref int +} + +func (m *Message) Dispatch() { + if f, ok := m.reg.data.(func(*Message)); ok { + f(m) + } +} + +func (*Message) Reply(t *Tuple) { + // TODO +} + +func (*Message) Close() { + // TODO +} @@ -0,0 +1,57 @@ +package tanja + +import ( + . "sync" +) + +type Node struct { + lock Mutex + lastId int32 + regs map[int32]*Registration + // TODO: list of links that are interested in registration messages +} + +func NewNode() *Node { + return &Node{regs: make(map[int32]*Registration)} +} + +func (n *Node) unregOne(id int32, r *Registration) { + // TODO: notify links + r.active = false + r.unreg() + delete(n.regs, id) +} + +func (n *Node) unregMatch(f func(r *Registration) bool) { + for id, r := range n.regs { + if f(r) { + n.unregOne(id, r) + } + } +} + +func (n *Node) reg(r *Registration) { + old := n.lastId + for old == n.lastId || n.regs[n.lastId] != nil { + n.lastId++ + if n.lastId < 0 { + n.lastId = 1 + } + } + r.id = n.lastId + n.regs[n.lastId] = r + r.active = true + // TODO: notify links +} + +func (n *Node) send(t *Tuple, path *ReturnPath) { + // TODO: Some way of preventing that a message is routed back to the link + // it came from. + n.lock.Lock() + defer n.lock.Unlock() + for _, r := range n.regs { + if t.Match(*r.pat) { + r.send(t, path) + } + } +} @@ -0,0 +1,86 @@ +package tanja + +type Registration struct { + pat *Tuple + id int32 + active bool + willReply bool + + // This is either a *Session or a *Link + recv interface{} + + // If obj is a *Session, then this is either: + // chan<- *Message: Application-handled channel + // func(*Message): Session-dispatched callback + // nil: Not registered yet + data interface{} +} + +func (r *Registration) node() *Node { + // TODO: also handle *Link + return r.recv.(*Session).node +} + +func (r *Registration) Close() { + n := r.node() + n.lock.Lock() + n.unregOne(r.id, r) + n.lock.Unlock() +} + +func (r *Registration) reg() { + if !r.active { + n := r.node() + n.lock.Lock() + n.reg(r) + if s, ok := r.recv.(*Session); ok { + if _, ok := r.data.(func(*Message)); ok { + s.refCalls() + } + } + n.lock.Unlock() + } +} + +// Called by the node if active goes to false. The node lock is held while this +// function is called. +func (r *Registration) unreg() { + if s, ok := r.recv.(*Session); ok { + if ch, ok := r.data.(chan<- *Message); ok { + // The receiver is a channel, tell the dispatcher to close it + s.disp <- ch + } else if _, ok := r.data.(func(*Message)); ok { + // Just wake up the dispatcher, as it might be trying to send to + // the session channel, which may be closed in the next call. + s.disp <- nil + // If the receiver is a callback, notify the session that there is + // one callback-registration less. (So it can close that channel if + // there are none left) + s.unrefCalls() + } + } +} + +// Called while the node lock is held. +func (r *Registration) send(t *Tuple, path *ReturnPath) { + if s, ok := r.recv.(*Session); ok { + s.disp <- &Message{t, path, r} + } + // TODO: links +} + +// Specific to session-registrations +// TODO? Calling Chan() after Callback() or the other way around will go wrong, +// because s.(un)refCalls isn't called. +func (r *Registration) Chan() <-chan *Message { + ch := make(chan *Message) + r.data = ch + r.reg() + return ch +} + +// Also specific to session-registrations +func (r *Registration) Callback(cb func(*Message)) { + r.data = cb + r.reg() +} @@ -0,0 +1,142 @@ +package tanja + +type Session struct { + node *Node + disp chan<- interface{} // either a *Message (queued) or a chan<-*Message (closed) + calls int // number of active callback registrations + callch chan *Message +} + +type sesQueue struct { + next, prev *sesQueue + msg *Message +} + +func (s *Session) dispatcher(ch <-chan interface{}) { + var head *sesQueue + var tail *sesQueue + rem := func() { + if tail.prev != nil { + tail.prev.next = nil + } else { + head = nil + } + tail = tail.prev + } +L: + for { + // Remove any inactive messages from the tail + for tail != nil && !tail.msg.reg.active { + tail.msg.Close() + rem() + } + // Get latest message + var snd chan<- *Message + var msg *Message + if tail != nil { + msg = tail.msg + if c, ok := msg.reg.data.(chan<- *Message); ok { + snd = c + } else { + snd = msg.reg.recv.(*Session).callch + } + } + // Send/receive + select { + case m, ok := <-ch: + if !ok { + break L + } else if msg, ok := m.(*Message); ok { + q := &sesQueue{head, nil, msg} + if head != nil { + head.prev = q + } else { + tail = q + } + head = q + } else if ch, ok := m.(chan<- *Message); ok { + close(ch) + } + case snd <- msg: + rem() + } + } + + // If the queue isn't empty, make sure to close the messages before exiting. + for n := tail; n != nil; n = n.prev { + n.msg.Close() + } +} + +func (n *Node) Session() *Session { + s := &Session{} + s.node = n + ch := make(chan interface{}) + s.disp = ch + go s.dispatcher(ch) + return s +} + +func (s *Session) Register(p *Tuple, willReply bool) *Registration { + r := &Registration{} + r.recv = s + r.pat = p + r.willReply = willReply + return r +} + +func (s *Session) Close() { + s.node.lock.Lock() + s.node.unregMatch(func(r *Registration) bool { + return r.recv == s + }) + s.node.lock.Unlock() + close(s.disp) + // TODO: close any incoming returnpaths +} + +// Must be called when the node lock is held +func (s *Session) refCalls() { + if s.calls == 0 { + s.callch = make(chan *Message) + } + s.calls++ +} + +// Must be called when the node lock is held +func (s *Session) unrefCalls() { + s.calls-- + if s.calls == 0 { + // It shouldn't be possible for the dispatcher to write to it at this + // point. No callback patterns are currently registered, and the + // dispatcher is synchronously notified when something is unregistered. + close(s.callch) + s.callch = nil + } +} + +func (s *Session) Chan() <-chan *Message { + // Ensure that we won't return nil when s.callch is modified in some other + // goroutine. + s.node.lock.Lock() + c := s.callch + s.node.lock.Unlock() + if c == nil { + c = make(chan *Message) + close(c) + } + return c +} + +func (s *Session) Run() { + for msg := range s.Chan() { + msg.Dispatch() + } +} + +func (s *Session) Send(t *Tuple, wantReply bool) *ReturnPath { + // TODO: fix a return path + var r *ReturnPath + s.node.send(t, r) + return r +} diff --git a/tanja_test.go b/tanja_test.go new file mode 100644 index 0000000..02603f7 --- /dev/null +++ b/tanja_test.go @@ -0,0 +1,73 @@ +package tanja + +import "testing" +import "reflect" + +func TestSess(tst *testing.T) { + node := NewNode() + if node == nil { + tst.Errorf("NewNode() returned nil") + } + done := make(chan int) + + go func() { + next := int64(0) + havequit := false + ses := node.Session() + if ses == nil { + tst.Errorf("Session()(1) returned nil") + } + reg := ses.Register(Tup("test"), false) + reg.Callback(func(m *Message) { + // Tuples must be received in order + if !reflect.DeepEqual(m.Tup, Tup("test", next)) { + tst.Errorf("Unexpected Tuple: %v", m.Tup) + } + // Returning false should prevent this callback from being called again + next++ + if next > 6 { + tst.Errorf("Tuple after unregistering: %v", m.Tup) + } else if next == 6 { + reg.Close() + } + }) + ses.Register(Tup("quit"), false).Callback(func(m *Message) { + havequit = true + ses.Close() + }) + ses.Register(Tup("last"), false).Callback(func(m *Message) { + tst.Errorf("Tuple after closing: %v", m.Tup) + }) + done <- 1 + ses.Run() + // next should be 6 + if next < 6 { + tst.Errorf("Not all test Tuples have been received (next=%d)", next) + } + // Run shouldn't return before Close() + if !havequit { + tst.Errorf("Run() stopped before Close()") + } + done <- 1 + }() + + // Make sure the stuff is registered before spawning the sending session. + <-done + + go func() { + ses := node.Session() + if ses == nil { + tst.Errorf("Session()(2) returned nil") + } + for i := int64(0); i < 10; i++ { + ses.Send(Tup("test", i), false) + } + ses.Send(Tup("quit"), false) + ses.Send(Tup("last"), false) + ses.Close() + done <- 1 + }() + + <-done + <-done +} diff --git a/tuples.go b/tuples.go new file mode 100644 index 0000000..de128be --- /dev/null +++ b/tuples.go @@ -0,0 +1,130 @@ +package tanja + +import ( + . "strconv" +) + +type Element struct { + // This is either nil, int64, string, float64, []Element or map[string]Element + E interface{} +} + +type Tuple []Element + +func (e Element) IsInt() (bool, int64) { + switch v := e.E.(type) { + case int64: + return true, v + case string: + n, r := ParseInt(v, 10, 64) + if r != nil { + return false, 0 + } else { + return true, n + } + case float64: + return true, int64(v) + } + return false, 0 +} + +// Just returns 0 on error +func (e Element) Int() int64 { + _, i := e.IsInt() + return i +} + +// Returns empty string on error +func (e Element) String() string { + switch v := e.E.(type) { + case int64: + return FormatInt(v, 10) + case string: + return v + case float64: + return FormatFloat(v, 'g', -1, 64) + } + return "" +} + +// Returns 0 on error +func (e Element) Float() float64 { + switch v := e.E.(type) { + case int64: + return float64(v) + case string: + n, r := ParseFloat(v, 64) + if r != nil { + return 0 + } else { + return n + } + case float64: + return v + } + return 0 +} + +// Whether this is a wildcard or not +func (e Element) WC() bool { + return e.E == nil +} + +// Returns an empty slice if this is no array +func (e Element) Slice() []Element { + if v, ok := e.E.([]Element); ok { + return v + } + return []Element{} +} + +// Returns a nil map if this is no map. +func (e Element) Map() map[string]Element { + if v, ok := e.E.(map[string]Element); ok { + return v + } + return nil +} + +func (a Element) Match(b Element) bool { + // Wildcards always match + if a.WC() || b.WC() { + return true + } + // Fuzzy int matching + if ca, ia := a.IsInt(); ca { + if cb, ib := b.IsInt(); cb && ia == ib { + return true + } + } + // Exact matching (works fine for strings) + return a.E == b.E +} + +// Same as Element{e} +// TODO: Also allow other int/float (and perhaps even composite types) and +// cast/convert those to valid elements? +func El(e interface{}) Element { + return Element{e} +} + +func (t Tuple) Match(p Tuple) bool { + if len(t) < len(p) { + return false + } + for i := range p { + if !t[i].Match(p[i]) { + return false + } + } + return true +} + +// Same as &Tuple{El(a), El(b), ..} +func Tup(t ...interface{}) *Tuple { + r := make(Tuple, len(t)) + for i, v := range t { + r[i] = El(v) + } + return &r +} |