From 1849ccbbc46fbcc57abaa150a691c80a3bd4a668 Mon Sep 17 00:00:00 2001 From: Yorhel Date: Sun, 8 Apr 2012 21:03:40 +0200 Subject: Added RegRPC() method for convenient RPC-like registering --- rpc.go | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ rpc_test.go | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ ses.go | 8 +++- 3 files changed, 267 insertions(+), 2 deletions(-) create mode 100644 rpc.go create mode 100644 rpc_test.go diff --git a/rpc.go b/rpc.go new file mode 100644 index 0000000..a31fdc3 --- /dev/null +++ b/rpc.go @@ -0,0 +1,136 @@ +package tanja + +import "reflect" + +// Registers a pattern for each of the methods of 'obj'. The method will be +// called from ses.Dispatch() (and thus ses.Run() if you use that) whenever a +// tuple has been received for the pattern. +// +// The function f() is called for each method with as its argument the name of +// that method. The function is expected to return the pattern to be +// registered. If nil is returned instead, no pattern will be registered for +// the method. +// +// The method type must conform to the following rules: +// - It should be exported (name must start with an uppercase letter) +// - It should not have any out parameters (return values) +// - It may have any number of arguments, as long as their types are allowed. +// - The method may not be variadic +// - The first argument may be of type *tanja.Message +// - Any other arguments must be of a type accepted by El() +// +// Methods that do not conform to these rules can not be registered and f() +// should return nil for them (Bad Things happen if it doesn't). +// +// If the first argument is of type *tanja.Message, the corresponding message +// is passed to the method. The method is then also registered with willReply +// set to true, so it MUST eventually call Message.Close(). +// If the first argument is not of type *tanja.Message, the method will have no +// way of replying to the incoming tuple and is thus registered with willReply +// set to false. +// +// Any remaining arguments are used to capture (and in a limited sense, +// validate) elements from the remainder of the received tuple. The 'remainder' +// being any elements after the pattern returned by f(). If no matching element +// exist in the received tuple or if the element can not be converted into the +// type that the method accepts, the method will not be called. +// +// Returns the number of patters registered. +func (s *Session) RegRPC(obj interface{}, f func(string) Tuple) int { + objv := reflect.ValueOf(obj) + objt := objv.Type() + num := 0 + for i := 0; i < objt.NumMethod(); i++ { + mm := objt.Method(i) + // get pattern + pat := f(mm.Name) + if pat == nil { + continue + } + // Determine how many elements to capture, and pad that many nil's to + // the pattern. (Ensures we only match tuples with enough elements) + mv := objv.Method(i) + mt := mv.Type() + offset := len(pat) + willReply := false + j := 0 + if mt.NumIn() > 0 && mt.In(0) == reflect.TypeOf((*Message)(nil)) { + willReply = true + j++ + } + for ; j < mt.NumIn(); j++ { + pat = append(pat, El(nil)) + } + // Generate callback and register + s.register(willReply, pat).Callback(regRPCGenCallback(mt, mv, offset, willReply)) + num++ + } + return num +} + +// Should be mostly the reverse of El(). Returns the zero value if the type is +// not supported or the element could not be converted. +func regRPCValue(at reflect.Type, el Element) (val reflect.Value) { + // Matching should be on 'at' itself rather than it's kind, since the type + // should really be a basic one. But I suspect that this is faster, as + // reflect.TypeOf() calls are probably not done at compile-time. + switch at.Kind() { + case reflect.Struct: + if at == reflect.TypeOf(Element{nil}) { + val = reflect.ValueOf(el) + } + case reflect.String: + if v, ok := el.IsString(); ok { + val = reflect.New(at).Elem() + val.SetString(v) + } + case reflect.Bool: + val = reflect.ValueOf(el.Bool()) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + if v, ok := el.IsInt(); ok { + val = reflect.New(at).Elem() + val.SetInt(v) + } + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + if v, ok := el.IsInt(); ok && v >= 0 { + val = reflect.New(at).Elem() + val.SetUint(uint64(v)) + } + case reflect.Float32, reflect.Float64: + if v, ok := el.IsFloat(); ok { + val = reflect.New(at).Elem() + val.SetFloat(v) + } + case reflect.Slice: + if v, ok := el.E.([]Element); ok && at == reflect.TypeOf(el.E) { + val = reflect.ValueOf(v) + } + case reflect.Map: + if v, ok := el.E.(map[string]Element); ok && at == reflect.TypeOf(el.E) { + val = reflect.ValueOf(v) + } + } + return +} + +func regRPCGenCallback(mt reflect.Type, mv reflect.Value, offset int, willReply bool) func(*Message) { + return func(msg *Message) { + args := make([]reflect.Value, mt.NumIn()) + arg := 0 + if willReply { + args[0] = reflect.ValueOf(msg) + arg++ + } + off := offset + for arg < mt.NumIn() { + args[arg] = regRPCValue(mt.In(arg), msg.Tup[off]) + if !args[arg].IsValid() { + msg.Close() + return + } + off++ + arg++ + } + mv.Call(args) + } +} diff --git a/rpc_test.go b/rpc_test.go new file mode 100644 index 0000000..1a06b9e --- /dev/null +++ b/rpc_test.go @@ -0,0 +1,125 @@ +package tanja + +import ( + "reflect" + "strings" + "testing" +) + +type sT struct { + tst *testing.T + ses *Session + thiscalled int + replycalled int + closecalled int +} + +var testmap map[string]Element = map[string]Element{ + "a": El("val"), + "stttr": El(nil), +} + +var testslice []Element = []Element{El(0), El("")} + +func (t *sT) ExportThis(a float32, b uint8, c string, d bool, e map[string]Element, f []Element) { + if a != 10.5 { + t.tst.Errorf("a == %v != 10.5", a) + } + if b != 42 { + t.tst.Errorf("b == %v != 42", a) + } + if c != "str" { + t.tst.Errorf("c == %v != str", c) + } + if !d { + t.tst.Errorf("d == %v != true", d) + } + if !reflect.DeepEqual(e, testmap) { + t.tst.Errorf("e == %v != %v", e, testmap) + } + if !reflect.DeepEqual(f, testslice) { + t.tst.Errorf("f == %v != %v", f, testslice) + } + t.thiscalled++ +} + +func (t *sT) ExportReply(m *Message, s string) { + if s != "10" { + t.tst.Errorf("ExportReply string argument is '%s', expected '%s'", s, "argument") + } + if len(m.Tup) != 5 || m.Tup[4].String() != "extra" { + t.tst.Errorf("ExportReply invalid tuple length or missing extra element") + } + m.Reply(len(s)) + m.Close() + t.replycalled++ +} + +func (t *sT) ExportClose(s string) { + t.closecalled++ + if s == "really!" { + t.ses.Close() + } +} + +func (t *sT) DontExport() { + t.tst.Error("DontExport() called.") +} + +func TestRPC(tst *testing.T) { + node := NewNode() + ses := node.Session() + + obj := &sT{tst: tst, ses: ses} + n := ses.RegRPC(obj, func(s string) Tuple { + if strings.HasPrefix(s, "Export") { + return Tup("prefix", 1, strings.ToLower(s[6:])) + } + return nil + }) + if n != 3 { + tst.Fatalf("Number of exported methods is %d, expected %d", n, 3) + } + + go func() { + // These shouldn't match anything + ses.Send(false) + ses.Send(false, "") + ses.Send(false, "Prefix", 1, "close", 1) + ses.Send(false, "prefix", 2, "close", 1) + ses.Send(false, "prefix", 1, "Close", 1) + + // Try replying + r := ses.Send(true, "prefix", 1, nil, 10, "extra") + if t := <-r.Chan(); !reflect.DeepEqual(t, Tup(2)) || <-r.Chan() != nil { + tst.Error("Received invalid reply") + } + r.Close() + + // Should match and close immediately. + r = ses.Send(true, "prefix", 1, "close", 10.2) + if <-r.Chan() != nil { + tst.Error("Close did not immediately close the return path") + } + r.Close() + + // Testing various types + ses.Send(false, "prefix", 1, "this", 10.5, "42", "str", "some_true_value", testmap, testslice) + + // Actual close + ses.Send(false, "prefix", 1, "close", "really!") + // Shouldn't be received + ses.Send(false, "prefix", 1, nil, "str") + }() + + ses.Run() + if obj.thiscalled != 1 { + tst.Errorf("This is called %d times, expected %d", obj.thiscalled, 1) + } + if obj.closecalled != 3 { + tst.Errorf("Close is called %d times, expected %d", obj.closecalled, 3) + } + if obj.replycalled != 1 { + tst.Errorf("Close is called %d times, expected %d", obj.replycalled, 1) + } +} diff --git a/ses.go b/ses.go index 0f9e7b4..537b1a4 100644 --- a/ses.go +++ b/ses.go @@ -79,14 +79,18 @@ func (n *Node) Session() *Session { return s } -func (s *Session) Register(willReply bool, p ...interface{}) *Registration { +func (s *Session) register(willReply bool, p Tuple) *Registration { r := &Registration{} r.recv = s - r.pat = Tup(p...) + r.pat = p r.willReply = willReply return r } +func (s *Session) Register(willReply bool, p ...interface{}) *Registration { + return s.register(willReply, Tup(p...)) +} + func (s *Session) Close() { s.node.lock.Lock() s.node.unregMatch(func(r *Registration) bool { -- cgit v1.2.3