diff options
author | Yorhel <git@yorhel.nl> | 2012-04-10 16:41:51 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-04-10 16:41:51 +0200 |
commit | 56251e336627d1ef64d29dfd93f178f7f857ba6d (patch) | |
tree | 3a48547306fea8ca17d193548b4a55f644d10d23 | |
parent | 7da6230ae0a3fbd003170639f3918fa77b98944a (diff) |
Added (a crappy but functional) userlist session
This also handles our user info for hubs, to avoid keeping a separate
"this user" entity in the hub session and associated tuples for managing
that.
-rw-r--r-- | src/globster/globster.go | 6 | ||||
-rw-r--r-- | src/globster/hub/hub.go | 12 | ||||
-rw-r--r-- | src/globster/userlist/list.go | 260 | ||||
-rw-r--r-- | src/globster/userlist/list_test.go | 58 |
4 files changed, 329 insertions, 7 deletions
diff --git a/src/globster/globster.go b/src/globster/globster.go index 19d5b56..6dde5d0 100644 --- a/src/globster/globster.go +++ b/src/globster/globster.go @@ -2,8 +2,9 @@ package main import ( "flag" - "go.blicky.net/tanja" "globster/hub" + "globster/userlist" + "go.blicky.net/tanja" "log" "net" ) @@ -47,6 +48,9 @@ func main() { hub := hub.New(n) go hub.Run() + userlist := userlist.New(n) + go userlist.Run() + if err := listen(n, *path); err != nil { log.Fatalf("Error listening on UNIX socket: %s", err) } diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go index fb7b377..5bf7f79 100644 --- a/src/globster/hub/hub.go +++ b/src/globster/hub/hub.go @@ -3,20 +3,20 @@ package hub import ( "globster/adc" "go.blicky.net/tanja" - "time" "net" + "time" ) // TODO: Do the connect() and write()s in a separate goroutine. type hub struct { - name string - ses *tanja.Session - conn net.Conn + name string + ses *tanja.Session + conn net.Conn } func newHub(node *tanja.Node, name string) *hub { - s := &hub{name:name, ses:node.Session()} + s := &hub{name: name, ses: node.Session()} s.ses.RegRPC(s, func(s string) tanja.Tuple { if s[0] != 'T' { @@ -45,7 +45,7 @@ func (s *hub) Tconnect(addr string) { s.ses.Send(false, "hub", s.name, "Disconnected", e.Error()) } else { s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String()) - adc.NewWriter(s.conn).Write(&adc.Message{Header:adc.SUP.SetType('I')}) + adc.NewWriter(s.conn).Write(&adc.Message{Header: adc.SUP.SetType('I')}) } } diff --git a/src/globster/userlist/list.go b/src/globster/userlist/list.go new file mode 100644 index 0000000..012b1e2 --- /dev/null +++ b/src/globster/userlist/list.go @@ -0,0 +1,260 @@ +package userlist + +import "go.blicky.net/tanja" + +// Note: Using a map for each user is not very memory-efficient, but it's not +// much typing overhead and saves some conversions to and from tuple Elements. +// This is mostly a toy implementation of the user list, it'll really need a +// more memory-efficient rewrite when used in real-life situations and large +// hubs. Ideally, this entire package is implemented in C to use the most +// efficient packing possible while keeping things fast. +type user map[string]tanja.Element + +var elNil = tanja.El(nil) +var fieldDefs map[string]tanja.Element = map[string]tanja.Element{ + "Name": tanja.El(""), + "CID": tanja.El(""), + "TID": elNil, // set on creation + "GID": elNil, // virtual + "Desc": tanja.El(""), + "EMail": tanja.El(""), + "Client": tanja.El(""), + "Upload": tanja.El(""), + "Download": tanja.El(0), + "ShareSize": tanja.El(0), + "ShareFiles": tanja.El(0), + "KeyPrint": tanja.El(""), + "IP4": tanja.El(""), + "Slots": tanja.El(0), + "AutoSlots": tanja.El(0), + "HubsOp": tanja.El(0), + "HubsReg": tanja.El(0), + "HubsNorm": tanja.El(0), +} + +// Assumes the field exists +func (u user) get(field string) tanja.Element { + // GID is the only "virtual" field + if field == "GID" { + if _, ok := u["CID"]; !ok { + return u["TID"] + } else { + return u["CID"] + } + } + return u[field] +} + +func (u user) getFields(fields []string) tanja.Tuple { + if len(fields) == 0 { + // BUG: This doesn't include the 'GID' field + return tanja.Tup(map[string]tanja.Element(u)) + } + r := make(map[string]tanja.Element, len(fields)) + for _, v := range fields { + r[v] = u.get(v) + } + return tanja.Tup(r) +} + +// TODO: Some more validation than only existance of the field would be nice +func (u *user) set(field string, val tanja.Element) bool { + if fieldDefs[field] != elNil { + return false + } + (*u)[field] = val + return true +} + +func newUser(tid string) *user { + u := make(user, len(fieldDefs)) + for k, v := range fieldDefs { + if v != elNil { + u[k] = v + } + } + u["TID"] = tanja.El(tid) + return &u +} + +type hub struct { + ses *tanja.Session + name string + userTID map[string]*user // lookup by TID + userGID map[string]*user // lookup by GID (only if GID!=TID) + size int64 +} + +func newHub(node *tanja.Node, name string) *hub { + s := &hub{node.Session(), name, make(map[string]*user), make(map[string]*user), 0} + s.ses.RegRPC(s, func(s string) tanja.Tuple { + if s[0] != 'T' { + return nil + } + return tanja.Tup("userlist", name, s[1:]) + }) + go s.ses.Run() + return s +} + +func normalizeFields(fin []tanja.Element) []string { + fout := make([]string, 0, len(fin)) + for _, f := range fin { + str := f.String() + if str == "" { + return nil + } + if _, ok := fieldDefs[str]; !ok { + return nil + } + fout = append(fout, str) + } + return fout +} + +func (s *hub) Tget(m *tanja.Message, id tanja.Element, fields ...tanja.Element) { + defer m.Close() + flds := normalizeFields(fields) + if flds == nil { + return + } + if id.WC() { + for _, u := range s.userTID { + m.Replyt(u.getFields(flds)) + } + } else if ids, ok := id.IsString(); ok { + if u := s.userTID[ids]; u != nil { + m.Replyt(u.getFields(flds)) + } + } +} + +func (s *hub) Tgetid(m *tanja.Message, id string, fields ...tanja.Element) { + defer m.Close() + flds := normalizeFields(fields) + if flds == nil { + return + } + var u *user + if u = s.userGID[id]; u == nil { + if u = s.userTID[id]; u == nil { + return + } + } + m.Replyt(u.getFields(flds)) +} + +func (s *hub) Tset(id string, data map[string]tanja.Element) { + // Get or create user entry + u := s.userTID[id] + added := false + if u == nil { + u = newUser(id) + s.userTID[id] = u + added = true + } + old := make(map[string]tanja.Element) + new := make(map[string]tanja.Element) + + for k, v := range data { + if fieldDefs[k] == elNil { + continue + } + // All elements are scalar types and must be representable as a string, + // this makes for easy comparison. + olde := (*u)[k] + oldv := olde.String() + newv := v.String() + if oldv == newv { + continue + } + // Actual change, handle it + old[k] = olde + new[k] = v + (*u)[k] = v + // CID change, make sure to update the userGID table + if k == "CID" { + if oldv != "" { + delete(s.userGID, oldv) + } + if newv != "" { + s.userGID[newv] = u + } + } + // ShareSize change, make sure to update size statistic + if k == "ShareSize" { + s.size += v.Int() - olde.Int() + } + } + + if added { + s.ses.Send(false, "userlist", s.name, "Added", new) + } else if len(new) > 0 { + s.ses.Send(false, "userlist", s.name, "Changed", old, new) + } +} + +func (s *hub) Tdel(id tanja.Element) { + if id.WC() { + for _, u := range s.userTID { + s.ses.Send(false, "userlist", s.name, "Del", (*u)["TID"]) + } + s.userTID = make(map[string]*user) + s.userGID = make(map[string]*user) + s.size = 0 + + } else if ids, ok := id.IsString(); ok { + if u := s.userTID[ids]; u != nil { + delete(s.userTID, ids) + if v := (*u)["CID"].String(); v != "" { + delete(s.userGID, v) + } + s.size -= (*u)["ShareSize"].Int() + s.ses.Send(false, "userlist", s.name, "Del", ids) + } + } +} + +func (s *hub) Tstats(m *tanja.Message) { + m.Reply(len(s.userTID), s.size) + m.Close() +} + +type UserList struct { + ses *tanja.Session + hubs map[string]*hub +} + +func New(node *tanja.Node) *UserList { + s := &UserList{node.Session(), make(map[string]*hub)} + + // Magic redirector. Captures all tuples for ["userlist", *] and, if no + // session for that hub exists yet, creates one and forwards the tuple and + // any replies. If the hub session already exists the tuple is ignored. + s.ses.Register(true, "userlist", nil).Callback(func(m *tanja.Message) { + hub := m.Tup[1].String() + if hub == "" || s.hubs[hub] != nil { + m.Close() + return + } + s.hubs[hub] = newHub(node, hub) + r := s.ses.Sendt(true, m.Tup) + go func() { + for t := range r.Chan() { + m.Replyt(t) + } + r.Close() + m.Close() + }() + }) + + // TODO: periodically remove empty hubs - they are semantically equivalent + // to not existing in memory anyway. + + return s +} + +func (s *UserList) Run() { + s.ses.Run() + // TODO: Closing +} diff --git a/src/globster/userlist/list_test.go b/src/globster/userlist/list_test.go new file mode 100644 index 0000000..cc49836 --- /dev/null +++ b/src/globster/userlist/list_test.go @@ -0,0 +1,58 @@ +package userlist + +// TODO: Also test the following: +// - Notifications +// - Multiple userlists +// - More fields + +import ( + "go.blicky.net/tanja" + "reflect" + "testing" +) + +func retExpect(t *testing.T, id string, r *tanja.ReturnPath, ret ...tanja.Tuple) { + defer r.Close() + i := 0 + for v := range r.Chan() { + if i >= len(ret) { + t.Errorf("%s: Received reply too many: %#v", id, v) + return + } + if !reflect.DeepEqual(v, ret[i]) { + t.Errorf("%s: Incorrect reply(%d): %#v", id, i, v) + } + i++ + } + if i != len(ret) { + t.Errorf("%s: Unexpected number of replies, got %d expected %d", id, i, len(ret)) + } +} + +func TestList(t *testing.T) { + n := tanja.NewNode() + l := New(n) + go l.Run() + + s := n.Session() + + retExpect(t, "empty1", s.Send(true, "userlist")) + retExpect(t, "empty2", s.Send(true, "userlist", 1)) + retExpect(t, "empty3", s.Send(true, "userlist", 1, "get")) + retExpect(t, "empty4", s.Send(true, "userlist", 1, "get", nil)) + retExpect(t, "set1", s.Send(true, "userlist", 1, "set", 1, tanja.Map("Name", "1"))) + retExpect(t, "stats1", s.Send(true, "userlist", 1, "stats"), tanja.Tup(1, 0)) + retExpect(t, "set2", s.Send(true, "userlist", 1, "set", 2, tanja.Map("CID", "ABC", "ShareSize", 1023))) + retExpect(t, "stats2", s.Send(true, "userlist", 1, "stats"), tanja.Tup(2, 1023)) + retExpect(t, "get1", s.Send(true, "userlist", 1, "get", 1, "Name"), tanja.Tup(tanja.Map("Name", "1"))) + retExpect(t, "set3", s.Send(true, "userlist", 1, "set", 2, tanja.Map("ShareSize", 1020))) + retExpect(t, "stats3", s.Send(true, "userlist", 1, "stats"), tanja.Tup(2, 1020)) + retExpect(t, "set4", s.Send(true, "userlist", 1, "set", 1, tanja.Map("ShareSize", 10))) + retExpect(t, "stats4", s.Send(true, "userlist", 1, "stats"), tanja.Tup(2, 1030)) + retExpect(t, "getid1", s.Send(true, "userlist", 1, "getid", "ABC", "ShareSize"), tanja.Tup(tanja.Map("ShareSize", 1020))) + retExpect(t, "get5", s.Send(true, "userlist", 1, "get", nil, "ShareSize"), + tanja.Tup(tanja.Map("ShareSize", 10)), // Order is actually undefined + tanja.Tup(tanja.Map("ShareSize", 1020))) + retExpect(t, "del", s.Send(true, "userlist", 1, "del", nil)) + retExpect(t, "stats5", s.Send(true, "userlist", 1, "stats"), tanja.Tup(0, 0)) +} |