summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/globster/globster.go6
-rw-r--r--src/globster/hub/hub.go12
-rw-r--r--src/globster/userlist/list.go260
-rw-r--r--src/globster/userlist/list_test.go58
4 files changed, 329 insertions, 7 deletions
diff --git a/src/globster/globster.go b/src/globster/globster.go
index 19d5b56..6dde5d0 100644
--- a/src/globster/globster.go
+++ b/src/globster/globster.go
@@ -2,8 +2,9 @@ package main
import (
"flag"
- "go.blicky.net/tanja"
"globster/hub"
+ "globster/userlist"
+ "go.blicky.net/tanja"
"log"
"net"
)
@@ -47,6 +48,9 @@ func main() {
hub := hub.New(n)
go hub.Run()
+ userlist := userlist.New(n)
+ go userlist.Run()
+
if err := listen(n, *path); err != nil {
log.Fatalf("Error listening on UNIX socket: %s", err)
}
diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go
index fb7b377..5bf7f79 100644
--- a/src/globster/hub/hub.go
+++ b/src/globster/hub/hub.go
@@ -3,20 +3,20 @@ package hub
import (
"globster/adc"
"go.blicky.net/tanja"
- "time"
"net"
+ "time"
)
// TODO: Do the connect() and write()s in a separate goroutine.
type hub struct {
- name string
- ses *tanja.Session
- conn net.Conn
+ name string
+ ses *tanja.Session
+ conn net.Conn
}
func newHub(node *tanja.Node, name string) *hub {
- s := &hub{name:name, ses:node.Session()}
+ s := &hub{name: name, ses: node.Session()}
s.ses.RegRPC(s, func(s string) tanja.Tuple {
if s[0] != 'T' {
@@ -45,7 +45,7 @@ func (s *hub) Tconnect(addr string) {
s.ses.Send(false, "hub", s.name, "Disconnected", e.Error())
} else {
s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String())
- adc.NewWriter(s.conn).Write(&adc.Message{Header:adc.SUP.SetType('I')})
+ adc.NewWriter(s.conn).Write(&adc.Message{Header: adc.SUP.SetType('I')})
}
}
diff --git a/src/globster/userlist/list.go b/src/globster/userlist/list.go
new file mode 100644
index 0000000..012b1e2
--- /dev/null
+++ b/src/globster/userlist/list.go
@@ -0,0 +1,260 @@
+package userlist
+
+import "go.blicky.net/tanja"
+
+// Note: Using a map for each user is not very memory-efficient, but it's not
+// much typing overhead and saves some conversions to and from tuple Elements.
+// This is mostly a toy implementation of the user list, it'll really need a
+// more memory-efficient rewrite when used in real-life situations and large
+// hubs. Ideally, this entire package is implemented in C to use the most
+// efficient packing possible while keeping things fast.
+type user map[string]tanja.Element
+
+var elNil = tanja.El(nil)
+var fieldDefs map[string]tanja.Element = map[string]tanja.Element{
+ "Name": tanja.El(""),
+ "CID": tanja.El(""),
+ "TID": elNil, // set on creation
+ "GID": elNil, // virtual
+ "Desc": tanja.El(""),
+ "EMail": tanja.El(""),
+ "Client": tanja.El(""),
+ "Upload": tanja.El(""),
+ "Download": tanja.El(0),
+ "ShareSize": tanja.El(0),
+ "ShareFiles": tanja.El(0),
+ "KeyPrint": tanja.El(""),
+ "IP4": tanja.El(""),
+ "Slots": tanja.El(0),
+ "AutoSlots": tanja.El(0),
+ "HubsOp": tanja.El(0),
+ "HubsReg": tanja.El(0),
+ "HubsNorm": tanja.El(0),
+}
+
+// Assumes the field exists
+func (u user) get(field string) tanja.Element {
+ // GID is the only "virtual" field
+ if field == "GID" {
+ if _, ok := u["CID"]; !ok {
+ return u["TID"]
+ } else {
+ return u["CID"]
+ }
+ }
+ return u[field]
+}
+
+func (u user) getFields(fields []string) tanja.Tuple {
+ if len(fields) == 0 {
+ // BUG: This doesn't include the 'GID' field
+ return tanja.Tup(map[string]tanja.Element(u))
+ }
+ r := make(map[string]tanja.Element, len(fields))
+ for _, v := range fields {
+ r[v] = u.get(v)
+ }
+ return tanja.Tup(r)
+}
+
+// TODO: Some more validation than only existance of the field would be nice
+func (u *user) set(field string, val tanja.Element) bool {
+ if fieldDefs[field] != elNil {
+ return false
+ }
+ (*u)[field] = val
+ return true
+}
+
+func newUser(tid string) *user {
+ u := make(user, len(fieldDefs))
+ for k, v := range fieldDefs {
+ if v != elNil {
+ u[k] = v
+ }
+ }
+ u["TID"] = tanja.El(tid)
+ return &u
+}
+
+type hub struct {
+ ses *tanja.Session
+ name string
+ userTID map[string]*user // lookup by TID
+ userGID map[string]*user // lookup by GID (only if GID!=TID)
+ size int64
+}
+
+func newHub(node *tanja.Node, name string) *hub {
+ s := &hub{node.Session(), name, make(map[string]*user), make(map[string]*user), 0}
+ s.ses.RegRPC(s, func(s string) tanja.Tuple {
+ if s[0] != 'T' {
+ return nil
+ }
+ return tanja.Tup("userlist", name, s[1:])
+ })
+ go s.ses.Run()
+ return s
+}
+
+func normalizeFields(fin []tanja.Element) []string {
+ fout := make([]string, 0, len(fin))
+ for _, f := range fin {
+ str := f.String()
+ if str == "" {
+ return nil
+ }
+ if _, ok := fieldDefs[str]; !ok {
+ return nil
+ }
+ fout = append(fout, str)
+ }
+ return fout
+}
+
+func (s *hub) Tget(m *tanja.Message, id tanja.Element, fields ...tanja.Element) {
+ defer m.Close()
+ flds := normalizeFields(fields)
+ if flds == nil {
+ return
+ }
+ if id.WC() {
+ for _, u := range s.userTID {
+ m.Replyt(u.getFields(flds))
+ }
+ } else if ids, ok := id.IsString(); ok {
+ if u := s.userTID[ids]; u != nil {
+ m.Replyt(u.getFields(flds))
+ }
+ }
+}
+
+func (s *hub) Tgetid(m *tanja.Message, id string, fields ...tanja.Element) {
+ defer m.Close()
+ flds := normalizeFields(fields)
+ if flds == nil {
+ return
+ }
+ var u *user
+ if u = s.userGID[id]; u == nil {
+ if u = s.userTID[id]; u == nil {
+ return
+ }
+ }
+ m.Replyt(u.getFields(flds))
+}
+
+func (s *hub) Tset(id string, data map[string]tanja.Element) {
+ // Get or create user entry
+ u := s.userTID[id]
+ added := false
+ if u == nil {
+ u = newUser(id)
+ s.userTID[id] = u
+ added = true
+ }
+ old := make(map[string]tanja.Element)
+ new := make(map[string]tanja.Element)
+
+ for k, v := range data {
+ if fieldDefs[k] == elNil {
+ continue
+ }
+ // All elements are scalar types and must be representable as a string,
+ // this makes for easy comparison.
+ olde := (*u)[k]
+ oldv := olde.String()
+ newv := v.String()
+ if oldv == newv {
+ continue
+ }
+ // Actual change, handle it
+ old[k] = olde
+ new[k] = v
+ (*u)[k] = v
+ // CID change, make sure to update the userGID table
+ if k == "CID" {
+ if oldv != "" {
+ delete(s.userGID, oldv)
+ }
+ if newv != "" {
+ s.userGID[newv] = u
+ }
+ }
+ // ShareSize change, make sure to update size statistic
+ if k == "ShareSize" {
+ s.size += v.Int() - olde.Int()
+ }
+ }
+
+ if added {
+ s.ses.Send(false, "userlist", s.name, "Added", new)
+ } else if len(new) > 0 {
+ s.ses.Send(false, "userlist", s.name, "Changed", old, new)
+ }
+}
+
+func (s *hub) Tdel(id tanja.Element) {
+ if id.WC() {
+ for _, u := range s.userTID {
+ s.ses.Send(false, "userlist", s.name, "Del", (*u)["TID"])
+ }
+ s.userTID = make(map[string]*user)
+ s.userGID = make(map[string]*user)
+ s.size = 0
+
+ } else if ids, ok := id.IsString(); ok {
+ if u := s.userTID[ids]; u != nil {
+ delete(s.userTID, ids)
+ if v := (*u)["CID"].String(); v != "" {
+ delete(s.userGID, v)
+ }
+ s.size -= (*u)["ShareSize"].Int()
+ s.ses.Send(false, "userlist", s.name, "Del", ids)
+ }
+ }
+}
+
+func (s *hub) Tstats(m *tanja.Message) {
+ m.Reply(len(s.userTID), s.size)
+ m.Close()
+}
+
+type UserList struct {
+ ses *tanja.Session
+ hubs map[string]*hub
+}
+
+func New(node *tanja.Node) *UserList {
+ s := &UserList{node.Session(), make(map[string]*hub)}
+
+ // Magic redirector. Captures all tuples for ["userlist", *] and, if no
+ // session for that hub exists yet, creates one and forwards the tuple and
+ // any replies. If the hub session already exists the tuple is ignored.
+ s.ses.Register(true, "userlist", nil).Callback(func(m *tanja.Message) {
+ hub := m.Tup[1].String()
+ if hub == "" || s.hubs[hub] != nil {
+ m.Close()
+ return
+ }
+ s.hubs[hub] = newHub(node, hub)
+ r := s.ses.Sendt(true, m.Tup)
+ go func() {
+ for t := range r.Chan() {
+ m.Replyt(t)
+ }
+ r.Close()
+ m.Close()
+ }()
+ })
+
+ // TODO: periodically remove empty hubs - they are semantically equivalent
+ // to not existing in memory anyway.
+
+ return s
+}
+
+func (s *UserList) Run() {
+ s.ses.Run()
+ // TODO: Closing
+}
diff --git a/src/globster/userlist/list_test.go b/src/globster/userlist/list_test.go
new file mode 100644
index 0000000..cc49836
--- /dev/null
+++ b/src/globster/userlist/list_test.go
@@ -0,0 +1,58 @@
+package userlist
+
+// TODO: Also test the following:
+// - Notifications
+// - Multiple userlists
+// - More fields
+
+import (
+ "go.blicky.net/tanja"
+ "reflect"
+ "testing"
+)
+
+func retExpect(t *testing.T, id string, r *tanja.ReturnPath, ret ...tanja.Tuple) {
+ defer r.Close()
+ i := 0
+ for v := range r.Chan() {
+ if i >= len(ret) {
+ t.Errorf("%s: Received reply too many: %#v", id, v)
+ return
+ }
+ if !reflect.DeepEqual(v, ret[i]) {
+ t.Errorf("%s: Incorrect reply(%d): %#v", id, i, v)
+ }
+ i++
+ }
+ if i != len(ret) {
+ t.Errorf("%s: Unexpected number of replies, got %d expected %d", id, i, len(ret))
+ }
+}
+
+func TestList(t *testing.T) {
+ n := tanja.NewNode()
+ l := New(n)
+ go l.Run()
+
+ s := n.Session()
+
+ retExpect(t, "empty1", s.Send(true, "userlist"))
+ retExpect(t, "empty2", s.Send(true, "userlist", 1))
+ retExpect(t, "empty3", s.Send(true, "userlist", 1, "get"))
+ retExpect(t, "empty4", s.Send(true, "userlist", 1, "get", nil))
+ retExpect(t, "set1", s.Send(true, "userlist", 1, "set", 1, tanja.Map("Name", "1")))
+ retExpect(t, "stats1", s.Send(true, "userlist", 1, "stats"), tanja.Tup(1, 0))
+ retExpect(t, "set2", s.Send(true, "userlist", 1, "set", 2, tanja.Map("CID", "ABC", "ShareSize", 1023)))
+ retExpect(t, "stats2", s.Send(true, "userlist", 1, "stats"), tanja.Tup(2, 1023))
+ retExpect(t, "get1", s.Send(true, "userlist", 1, "get", 1, "Name"), tanja.Tup(tanja.Map("Name", "1")))
+ retExpect(t, "set3", s.Send(true, "userlist", 1, "set", 2, tanja.Map("ShareSize", 1020)))
+ retExpect(t, "stats3", s.Send(true, "userlist", 1, "stats"), tanja.Tup(2, 1020))
+ retExpect(t, "set4", s.Send(true, "userlist", 1, "set", 1, tanja.Map("ShareSize", 10)))
+ retExpect(t, "stats4", s.Send(true, "userlist", 1, "stats"), tanja.Tup(2, 1030))
+ retExpect(t, "getid1", s.Send(true, "userlist", 1, "getid", "ABC", "ShareSize"), tanja.Tup(tanja.Map("ShareSize", 1020)))
+ retExpect(t, "get5", s.Send(true, "userlist", 1, "get", nil, "ShareSize"),
+ tanja.Tup(tanja.Map("ShareSize", 10)), // Order is actually undefined
+ tanja.Tup(tanja.Map("ShareSize", 1020)))
+ retExpect(t, "del", s.Send(true, "userlist", 1, "del", nil))
+ retExpect(t, "stats5", s.Send(true, "userlist", 1, "stats"), tanja.Tup(0, 0))
+}