summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rpc.go136
-rw-r--r--rpc_test.go125
-rw-r--r--ses.go8
3 files changed, 267 insertions, 2 deletions
diff --git a/rpc.go b/rpc.go
new file mode 100644
index 0000000..a31fdc3
--- /dev/null
+++ b/rpc.go
@@ -0,0 +1,136 @@
+package tanja
+
+import "reflect"
+
+// Registers a pattern for each of the methods of 'obj'. The method will be
+// called from ses.Dispatch() (and thus ses.Run() if you use that) whenever a
+// tuple has been received for the pattern.
+//
+// The function f() is called for each method with as its argument the name of
+// that method. The function is expected to return the pattern to be
+// registered. If nil is returned instead, no pattern will be registered for
+// the method.
+//
+// The method type must conform to the following rules:
+// - It should be exported (name must start with an uppercase letter)
+// - It should not have any out parameters (return values)
+// - It may have any number of arguments, as long as their types are allowed.
+// - The method may not be variadic
+// - The first argument may be of type *tanja.Message
+// - Any other arguments must be of a type accepted by El()
+//
+// Methods that do not conform to these rules can not be registered and f()
+// should return nil for them (Bad Things happen if it doesn't).
+//
+// If the first argument is of type *tanja.Message, the corresponding message
+// is passed to the method. The method is then also registered with willReply
+// set to true, so it MUST eventually call Message.Close().
+// If the first argument is not of type *tanja.Message, the method will have no
+// way of replying to the incoming tuple and is thus registered with willReply
+// set to false.
+//
+// Any remaining arguments are used to capture (and in a limited sense,
+// validate) elements from the remainder of the received tuple. The 'remainder'
+// being any elements after the pattern returned by f(). If no matching element
+// exist in the received tuple or if the element can not be converted into the
+// type that the method accepts, the method will not be called.
+//
+// Returns the number of patters registered.
+func (s *Session) RegRPC(obj interface{}, f func(string) Tuple) int {
+ objv := reflect.ValueOf(obj)
+ objt := objv.Type()
+ num := 0
+ for i := 0; i < objt.NumMethod(); i++ {
+ mm := objt.Method(i)
+ // get pattern
+ pat := f(mm.Name)
+ if pat == nil {
+ continue
+ }
+ // Determine how many elements to capture, and pad that many nil's to
+ // the pattern. (Ensures we only match tuples with enough elements)
+ mv := objv.Method(i)
+ mt := mv.Type()
+ offset := len(pat)
+ willReply := false
+ j := 0
+ if mt.NumIn() > 0 && mt.In(0) == reflect.TypeOf((*Message)(nil)) {
+ willReply = true
+ j++
+ }
+ for ; j < mt.NumIn(); j++ {
+ pat = append(pat, El(nil))
+ }
+ // Generate callback and register
+ s.register(willReply, pat).Callback(regRPCGenCallback(mt, mv, offset, willReply))
+ num++
+ }
+ return num
+}
+
+// Should be mostly the reverse of El(). Returns the zero value if the type is
+// not supported or the element could not be converted.
+func regRPCValue(at reflect.Type, el Element) (val reflect.Value) {
+ // Matching should be on 'at' itself rather than it's kind, since the type
+ // should really be a basic one. But I suspect that this is faster, as
+ // reflect.TypeOf() calls are probably not done at compile-time.
+ switch at.Kind() {
+ case reflect.Struct:
+ if at == reflect.TypeOf(Element{nil}) {
+ val = reflect.ValueOf(el)
+ }
+ case reflect.String:
+ if v, ok := el.IsString(); ok {
+ val = reflect.New(at).Elem()
+ val.SetString(v)
+ }
+ case reflect.Bool:
+ val = reflect.ValueOf(el.Bool())
+ case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+ if v, ok := el.IsInt(); ok {
+ val = reflect.New(at).Elem()
+ val.SetInt(v)
+ }
+ case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
+ if v, ok := el.IsInt(); ok && v >= 0 {
+ val = reflect.New(at).Elem()
+ val.SetUint(uint64(v))
+ }
+ case reflect.Float32, reflect.Float64:
+ if v, ok := el.IsFloat(); ok {
+ val = reflect.New(at).Elem()
+ val.SetFloat(v)
+ }
+ case reflect.Slice:
+ if v, ok := el.E.([]Element); ok && at == reflect.TypeOf(el.E) {
+ val = reflect.ValueOf(v)
+ }
+ case reflect.Map:
+ if v, ok := el.E.(map[string]Element); ok && at == reflect.TypeOf(el.E) {
+ val = reflect.ValueOf(v)
+ }
+ }
+ return
+}
+
+func regRPCGenCallback(mt reflect.Type, mv reflect.Value, offset int, willReply bool) func(*Message) {
+ return func(msg *Message) {
+ args := make([]reflect.Value, mt.NumIn())
+ arg := 0
+ if willReply {
+ args[0] = reflect.ValueOf(msg)
+ arg++
+ }
+ off := offset
+ for arg < mt.NumIn() {
+ args[arg] = regRPCValue(mt.In(arg), msg.Tup[off])
+ if !args[arg].IsValid() {
+ msg.Close()
+ return
+ }
+ off++
+ arg++
+ }
+ mv.Call(args)
+ }
+}
diff --git a/rpc_test.go b/rpc_test.go
new file mode 100644
index 0000000..1a06b9e
--- /dev/null
+++ b/rpc_test.go
@@ -0,0 +1,125 @@
+package tanja
+
+import (
+ "reflect"
+ "strings"
+ "testing"
+)
+
+type sT struct {
+ tst *testing.T
+ ses *Session
+ thiscalled int
+ replycalled int
+ closecalled int
+}
+
+var testmap map[string]Element = map[string]Element{
+ "a": El("val"),
+ "stttr": El(nil),
+}
+
+var testslice []Element = []Element{El(0), El("")}
+
+func (t *sT) ExportThis(a float32, b uint8, c string, d bool, e map[string]Element, f []Element) {
+ if a != 10.5 {
+ t.tst.Errorf("a == %v != 10.5", a)
+ }
+ if b != 42 {
+ t.tst.Errorf("b == %v != 42", a)
+ }
+ if c != "str" {
+ t.tst.Errorf("c == %v != str", c)
+ }
+ if !d {
+ t.tst.Errorf("d == %v != true", d)
+ }
+ if !reflect.DeepEqual(e, testmap) {
+ t.tst.Errorf("e == %v != %v", e, testmap)
+ }
+ if !reflect.DeepEqual(f, testslice) {
+ t.tst.Errorf("f == %v != %v", f, testslice)
+ }
+ t.thiscalled++
+}
+
+func (t *sT) ExportReply(m *Message, s string) {
+ if s != "10" {
+ t.tst.Errorf("ExportReply string argument is '%s', expected '%s'", s, "argument")
+ }
+ if len(m.Tup) != 5 || m.Tup[4].String() != "extra" {
+ t.tst.Errorf("ExportReply invalid tuple length or missing extra element")
+ }
+ m.Reply(len(s))
+ m.Close()
+ t.replycalled++
+}
+
+func (t *sT) ExportClose(s string) {
+ t.closecalled++
+ if s == "really!" {
+ t.ses.Close()
+ }
+}
+
+func (t *sT) DontExport() {
+ t.tst.Error("DontExport() called.")
+}
+
+func TestRPC(tst *testing.T) {
+ node := NewNode()
+ ses := node.Session()
+
+ obj := &sT{tst: tst, ses: ses}
+ n := ses.RegRPC(obj, func(s string) Tuple {
+ if strings.HasPrefix(s, "Export") {
+ return Tup("prefix", 1, strings.ToLower(s[6:]))
+ }
+ return nil
+ })
+ if n != 3 {
+ tst.Fatalf("Number of exported methods is %d, expected %d", n, 3)
+ }
+
+ go func() {
+ // These shouldn't match anything
+ ses.Send(false)
+ ses.Send(false, "")
+ ses.Send(false, "Prefix", 1, "close", 1)
+ ses.Send(false, "prefix", 2, "close", 1)
+ ses.Send(false, "prefix", 1, "Close", 1)
+
+ // Try replying
+ r := ses.Send(true, "prefix", 1, nil, 10, "extra")
+ if t := <-r.Chan(); !reflect.DeepEqual(t, Tup(2)) || <-r.Chan() != nil {
+ tst.Error("Received invalid reply")
+ }
+ r.Close()
+
+ // Should match and close immediately.
+ r = ses.Send(true, "prefix", 1, "close", 10.2)
+ if <-r.Chan() != nil {
+ tst.Error("Close did not immediately close the return path")
+ }
+ r.Close()
+
+ // Testing various types
+ ses.Send(false, "prefix", 1, "this", 10.5, "42", "str", "some_true_value", testmap, testslice)
+
+ // Actual close
+ ses.Send(false, "prefix", 1, "close", "really!")
+ // Shouldn't be received
+ ses.Send(false, "prefix", 1, nil, "str")
+ }()
+
+ ses.Run()
+ if obj.thiscalled != 1 {
+ tst.Errorf("This is called %d times, expected %d", obj.thiscalled, 1)
+ }
+ if obj.closecalled != 3 {
+ tst.Errorf("Close is called %d times, expected %d", obj.closecalled, 3)
+ }
+ if obj.replycalled != 1 {
+ tst.Errorf("Close is called %d times, expected %d", obj.replycalled, 1)
+ }
+}
diff --git a/ses.go b/ses.go
index 0f9e7b4..537b1a4 100644
--- a/ses.go
+++ b/ses.go
@@ -79,14 +79,18 @@ func (n *Node) Session() *Session {
return s
}
-func (s *Session) Register(willReply bool, p ...interface{}) *Registration {
+func (s *Session) register(willReply bool, p Tuple) *Registration {
r := &Registration{}
r.recv = s
- r.pat = Tup(p...)
+ r.pat = p
r.willReply = willReply
return r
}
+func (s *Session) Register(willReply bool, p ...interface{}) *Registration {
+ return s.register(willReply, Tup(p...))
+}
+
func (s *Session) Close() {
s.node.lock.Lock()
s.node.unregMatch(func(r *Registration) bool {