diff options
author | Yorhel <git@yorhel.nl> | 2012-04-09 10:38:10 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-04-09 10:38:10 +0200 |
commit | 8289946e8a349b09fa56ef21252ba5f8edc03b47 (patch) | |
tree | 0e15520237700446c7e57251cc2b7a3fa19e54bd |
Initial commit of a WIP DC client based on Tanja
-rw-r--r-- | .gitignore | 4 | ||||
-rw-r--r-- | Makefile | 31 | ||||
-rw-r--r-- | src/globster/adc/adc_test.go | 116 | ||||
-rw-r--r-- | src/globster/adc/fmt.go | 68 | ||||
-rw-r--r-- | src/globster/adc/parse.go | 156 | ||||
-rw-r--r-- | src/globster/adc/types.go | 118 | ||||
-rw-r--r-- | src/globster/globster.go | 53 | ||||
-rw-r--r-- | src/globster/hub/hub.go | 70 | ||||
-rw-r--r-- | src/globster/hub/manager.go | 74 |
9 files changed, 690 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4a1c242 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +globster +!src/globster +src/go.blicky.net/ +pkg/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..38aa6ac --- /dev/null +++ b/Makefile @@ -0,0 +1,31 @@ +.PHONY: all fmt install test clean distclean play + +PWD=$(shell pwd) +GOPATH:=${PWD}:${GOPATH} +OLDBIN:=${GOBIN} +GOBIN:=${PWD} + +all: + go get -v globster/... + +fmt: + go fmt globster/... + +install: + GOBIN="${OLDBIN}" go install globster + +test: + go test globster/... + +clean: + rm -rf globster pkg + +distclean: clean + rm -rf src/go.blicky.net + +# Yorhel's playground. +play: all + @./globster -l sock &echo $$!>pid;\ + perl -I../tanja/perl ../tanja/perl/tanja-cli.pl -vc $$PWD/sock;\ + kill `cat pid`;\ + rm -f sock pid diff --git a/src/globster/adc/adc_test.go b/src/globster/adc/adc_test.go new file mode 100644 index 0000000..f733b43 --- /dev/null +++ b/src/globster/adc/adc_test.go @@ -0,0 +1,116 @@ +package adc + +import "testing" + +type argv struct { + input string + output string + rem string + haserr bool +} + +func TestParseArg(t *testing.T) { + l := []argv{ + {"", "", "", false}, + {" 1", "", " 1", false}, + {"1", "1", "", false}, + {"1\n", "1", "\n", false}, + {"a b", "a", " b", false}, + {"a\\\\b", "a\\b", "", false}, + {"\\n\\\\\\s", "\n\\ ", "", false}, + {"\\", "", "\\", true}, + {"\\ ", "", "\\ ", true}, + {"some\\", "", "\\", true}, + {"\\e", "", "\\e", true}, + {"\xff", "", "", true}, + {"\xc9a", "", "", true}, + } + for _, v := range l { + out, rem, err := ParseArg([]byte(v.input)) + if string(out) != v.output { + t.Errorf("Unexpected output: got '%s', expected '%s'", out, v.output) + } + if string(rem) != v.rem { + t.Errorf("Unexpected remainder: got '%s', expected '%s'", rem, v.rem) + } + if (err != nil) != v.haserr { + t.Errorf("Unexpected error: got '%s', expected %v", err, v.haserr) + } + } +} + +type parsev struct { + input string + haserr bool +} + +func TestParseFmt(t *testing.T) { + l := []parsev{ + {"", true}, + {"\n", false}, + {"HSUP\n", false}, + {" HSUP\n", true}, + {"\nHSUP\n", true}, + {"HSUP\n", false}, + {"HSuP\n", true}, + {"NSUP\n", true}, + {"HSUP ", true}, + {"HSUP", true}, + {"HUNK\n", true}, + {"HSTA 0 blah\n", false}, + {"HSTA 0\n", true}, + {"HSTA 0 blah \n", true}, + {"HSTA blah\n", false}, + {"HSTA \n", false}, + {"HSTA 0 \n", false}, + {"HSUP AD0 AD1\n", false}, + {"HSUP AD AD AD\n", false}, + {"HSUP 3D\n", true}, + {"HSUP A B\n", true}, + {"HSUP AD1\n", true}, + {"BSUP 2344\n", false}, + {"BSUP 2344\n", true}, + {"BSUP 2304\n", true}, + {"BSUP 23043\n", true}, + {"BSUP 234 \n", true}, + {"BSUP 233\n", true}, + {"BSTA 2334 0 1\n", false}, + {"BSTA 2334 \n", false}, + {"BSUP\n", true}, + {"BSUP ABCD\n", false}, + {"BSUP ABcD\n", true}, + {"BSUP abcd\n", true}, + {"DSUP ABCD \n", true}, + {"DSUP ABCD 2304D\n", true}, + {"DSUP ABCD 2304\n", true}, + {"DSUP ABCD 239\n", true}, + {"DSUP ABCD 239 \n", true}, + {"DSUP ABCD 2344\n", true}, + {"DSUP ABCD 2344\n", false}, + {"DSTA ABCD 2344 0 1\n", false}, + {"DSTA ABCD 2344 \n", false}, + {"DSTA ABCD 2344 \n", true}, + {"DSTA ABCD 2344 \n", true}, + {"USUP\n", true}, + {"USUP \n", true}, + {"USUP A\n", false}, + {"USUP ABCDEFGHIJKLMNOPQRSTUVWXYZ234567\n", false}, + {"USUP 0\n", true}, + {"USTA 0 \n", true}, + {"USTA 0 a b\n", true}, + {"HSTA a\"'\\\\\n", false}, + {"HINF RFa\"'\\\\\n", false}, + } + for _, v := range l { + msg, err := Parse([]byte(v.input)) + if (err != nil) != v.haserr { + t.Errorf("Unexpected error: got '%s' for '%s'", err, v.input) + } + if msg != nil { + out := msg.Format(nil) + if string(out) != v.input { + t.Errorf("Unexpected output: got '%s', expected '%s'", out, v.input) + } + } + } +} diff --git a/src/globster/adc/fmt.go b/src/globster/adc/fmt.go new file mode 100644 index 0000000..39360d5 --- /dev/null +++ b/src/globster/adc/fmt.go @@ -0,0 +1,68 @@ +package adc + +import "io" + +func (a Arg) Format(b []byte) []byte { + // Walk over the raw bytes, Unicode characters (and anything else) is + // simply passed through unmodified. + for i := 0; i < len(a); i++ { + switch a[i] { + case ' ': + b = append(b, []byte{'\\', 's'}...) + case '\n': + b = append(b, []byte{'\\', 'n'}...) + case '\\': + b = append(b, []byte{'\\', '\\'}...) + default: + b = append(b, a[i]) + } + } + return b +} + +// TODO: features +func (m *Message) Format(b []byte) []byte { + if m == nil { + return append(b, '\n') + } + b = m.Header.Format(b) + if m.Src != 0 { + b = append(b, ' ') + b = m.Src.Format(b) + } + if m.Dest != 0 { + b = append(b, ' ') + b = m.Dest.Format(b) + } + if len(m.CID) > 0 { + b = append(b, ' ') + b = append(b, []byte(m.CID)...) + } + for _, v := range m.PosArgs { + b = append(b, ' ') + b = v.Format(b) + } + for k, l := range m.NamedArgs { + for _, v := range l { + b = append(b, []byte{' ', k[0], k[1]}...) + b = v.Format(b) + } + } + return append(b, 0x0a) +} + +func (m *Message) String() string { + return string(m.Format(nil)) +} + +type Writer struct { + wr io.Writer +} + +func NewWriter(wr io.Writer) *Writer { + return &Writer{wr} +} + +func (wr *Writer) Write(m *Message) (int, error) { + return wr.wr.Write(m.Format(nil)) +} diff --git a/src/globster/adc/parse.go b/src/globster/adc/parse.go new file mode 100644 index 0000000..192d95a --- /dev/null +++ b/src/globster/adc/parse.go @@ -0,0 +1,156 @@ +package adc + +import ( + "bufio" + "errors" + "io" + "unicode/utf8" +) + +func ParseArg(b []byte) (Arg, []byte, error) { + var res []byte + for len(b) > 0 { + if b[0] == ' ' || b[0] == '\n' { + break + } + if b[0] == '\\' { + if len(b) < 2 { + return "", b, errors.New("Unexpected end of message after escape.") + } + if b[1] == '\\' { + res = append(res, '\\') + } else if b[1] == 'n' { + res = append(res, '\n') + } else if b[1] == 's' { + res = append(res, ' ') + } else { + return "", b, errors.New("Invalid escape character.") + } + b = b[2:] + } else { + res = append(res, b[0]) + b = b[1:] + } + } + if !utf8.Valid(res) { + return "", b, errors.New("Invalid UTF-8 sequence.") + } + return Arg(res), b, nil +} + +// TODO: features +// Note: Can return nil, nil if the message is empty (which is a valid ADC message). +func Parse(buf []byte) (*Message, error) { + if len(buf) == 1 && buf[0] == '\n' { + return nil, nil + } + m := &Message{} + if len(buf) < 4 { + return nil, errors.New("Invalid command.") + } + if buf[len(buf)-1] != '\n' { + return nil, errors.New("Message does not end with a newline.") + } + buf = buf[:len(buf)-1] + + // Header + if buf[0] != 'B' && buf[0] != 'C' && buf[0] != 'I' && buf[0] != 'H' && buf[0] != 'D' && buf[0] != 'E' && buf[0] != 'F' && buf[0] != 'U' { + return nil, errors.New("Invalid command type.") + } + if !isAlphaNum(buf[1]) || !isAlphaNum(buf[2]) || !isAlphaNum(buf[3]) { + return nil, errors.New("Invalid command.") + } + m.Header = NewFourCC(buf) + t := m.Header.Type() + n, ok := cmdArgNum[m.Header.Command()] + if !ok { + return nil, errors.New("Invalid command.") + } + buf = buf[4:] + + // Src + if t == 'B' || t == 'D' || t == 'E' || t == 'F' { + if len(buf) < 5 || buf[0] != ' ' || !isSID(buf[1:]) { + return nil, errors.New("Invalid SID argument.") + } + m.Src = NewFourCC(buf[1:]) + buf = buf[5:] + } + + // Dest + if t == 'D' || t == 'E' { + if len(buf) < 5 || buf[0] != ' ' || !isSID(buf[1:]) { + return nil, errors.New("Invalid SID argument.") + } + m.Dest = NewFourCC(buf[1:]) + buf = buf[5:] + } + + // CID + if t == 'U' { + if len(buf) < 2 || buf[0] != ' ' { + return nil, errors.New("Invalid CID argument.") + } + buf = buf[1:] + var cid []byte + for len(buf) > 0 && buf[0] != ' ' { + if !isBase32(buf[0]) { + return nil, errors.New("Invalid CID argument.") + } + cid = append(cid, buf[0]) + buf = buf[1:] + } + m.CID = string(cid) + } + + // Positional arguments + for n > 0 { + if len(buf) < 1 || buf[0] != ' ' { + return nil, errors.New("Invalid or missing positional argument.") + } + var s Arg + var e error + s, buf, e = ParseArg(buf[1:]) + if e != nil { + return nil, errors.New("Invalid positional argument: " + e.Error()) + } + m.PosArgs = append(m.PosArgs, s) + n-- + } + + // Named arguments + for len(buf) > 0 { + if len(buf) < 3 || buf[0] != ' ' || !isAlpha(buf[1]) || !isAlphaNum(buf[2]) { + return nil, errors.New("Invalid named argument.") + } + if m.NamedArgs == nil { + m.NamedArgs = make(map[[2]byte][]Arg) + } + name := [2]byte{buf[1], buf[2]} + var val Arg + var e error + val, buf, e = ParseArg(buf[3:]) + if e != nil { + return nil, errors.New("Invalid named argument: " + e.Error()) + } + m.NamedArgs[name] = append(m.NamedArgs[name], val) + } + + return m, nil +} + +type Reader struct { + rd *bufio.Reader +} + +func NewReader(rd io.Reader, maxSize int) *Reader { + return &Reader{bufio.NewReaderSize(rd, maxSize)} +} + +func (rd *Reader) Read() (*Message, error) { + l, e := rd.rd.ReadString('\n') + if e != nil { + return nil, e + } + return Parse([]byte(l)) +} diff --git a/src/globster/adc/types.go b/src/globster/adc/types.go new file mode 100644 index 0000000..ecea43f --- /dev/null +++ b/src/globster/adc/types.go @@ -0,0 +1,118 @@ +package adc + +// Works for both command_type+command_name and encoded_cid. +// For command names without type, the first byte of the FourCC = '*'. +type FourCC int32 + +type Arg string + +// TODO: feature broadcasts +type Message struct { + Header FourCC + Src FourCC // 0 if not present + Dest FourCC // 0 if not present + CID string // if Header.Type() == 'U', unmodified base32 string otherwise empty + PosArgs []Arg + NamedArgs map[[2]byte][]Arg // may be nil +} + +const ( + CTM FourCC = 0x2a43544d + GET FourCC = 0x2a474554 + GFI FourCC = 0x2a474749 + GPA FourCC = 0x2a475041 + INF FourCC = 0x2a494e46 + MSG FourCC = 0x2a4d5347 + PAS FourCC = 0x2a504153 + QUI FourCC = 0x2a515549 + RCM FourCC = 0x2a52434d + RES FourCC = 0x2a524553 + SCH FourCC = 0x2a534348 + SID FourCC = 0x2a534944 + SND FourCC = 0x2a534e44 + STA FourCC = 0x2a535441 + SUP FourCC = 0x2A535550 +) + +// List of all valid commands and their number of fixed positional arguments. +var cmdArgNum map[FourCC]int = map[FourCC]int{ + CTM: 3, + GET: 4, + GFI: 2, + GPA: 1, + INF: 0, + MSG: 1, + PAS: 1, + QUI: 1, + RCM: 2, + RES: 0, + SCH: 0, + SID: 1, + SND: 4, + STA: 2, + SUP: 0, +} + +// len(s) must be >= 4 +func NewFourCC(s []byte) FourCC { + return FourCC(int32(s[0])<<24 + int32(s[1])<<16 + int32(s[2])<<8 + int32(s[3])) +} + +func (f FourCC) Type() byte { + return byte(f >> 24) +} + +func (f FourCC) SetType(t byte) FourCC { + return (f & 0xFFFFFF) | (FourCC(t) << 24) +} + +func (f FourCC) Command() FourCC { + return f.SetType('*') +} + +func (f FourCC) Format(b []byte) []byte { + return append(b, []byte{byte(f >> 24), byte(f >> 16), byte(f >> 8), byte(f)}...) +} + +func (f FourCC) String() string { + return string(f.Format(nil)) +} + +// Returns the first found value of the named argument, or an empty string if not found. +func (m *Message) Arg(a [2]byte) Arg { + if l := m.NamedArgs[a]; l != nil && len(l) > 0 { + return l[0] + } + return "" +} + +// Some handy validation functions. Used by parse.go, but some would also be +// used in applications to perform further validation on incoming messages. +func isAlpha(b byte) bool { + return b >= 'A' && b <= 'Z' +} + +func isAlphaNum(b byte) bool { + return isAlpha(b) || (b >= '0' && b <= '9') +} + +func isBase32(b byte) bool { + return isAlpha(b) || (b >= '2' && b <= '7') +} + +func isSID(b []byte) bool { + return len(b) >= 4 && isBase32(b[0]) && isBase32(b[1]) && isBase32(b[2]) && isBase32(b[3]) +} + +func (a Arg) IsSID() bool { + return len(a) == 4 && isSID([]byte(a)) +} + +func (a Arg) IsBase32() bool { + for i := 0; i < len(a); i++ { + if !isBase32(a[i]) { + return false + } + } + return len(a) > 0 +} diff --git a/src/globster/globster.go b/src/globster/globster.go new file mode 100644 index 0000000..19d5b56 --- /dev/null +++ b/src/globster/globster.go @@ -0,0 +1,53 @@ +package main + +import ( + "flag" + "go.blicky.net/tanja" + "globster/hub" + "log" + "net" +) + +func listen(node *tanja.Node, path string) (e error) { + var addr *net.UnixAddr + if addr, e = net.ResolveUnixAddr("unix", path); e != nil { + return + } + var lst *net.UnixListener + if lst, e = net.ListenUnix("unix", addr); e != nil { + return + } + i := 0 + for { + var conn net.Conn + if conn, e = lst.Accept(); e != nil { + return + } + i++ + log.Printf("%03d: Incoming connection on UNIX socket.", i) + lnk := node.Link(conn) + go func(i int) { + err := <-lnk.Start() + if err == nil { + log.Printf("%03d: Disconnected.", i) + } else { + log.Printf("%03d: %s", i, err) + } + }(i) + } + return +} + +func main() { + path := flag.String("l", "/tmp/globster.sock", "Path to the UNIX listen socket.") + flag.Parse() + + n := tanja.NewNode() + + hub := hub.New(n) + go hub.Run() + + if err := listen(n, *path); err != nil { + log.Fatalf("Error listening on UNIX socket: %s", err) + } +} diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go new file mode 100644 index 0000000..fb7b377 --- /dev/null +++ b/src/globster/hub/hub.go @@ -0,0 +1,70 @@ +package hub + +import ( + "globster/adc" + "go.blicky.net/tanja" + "time" + "net" +) + +// TODO: Do the connect() and write()s in a separate goroutine. + +type hub struct { + name string + ses *tanja.Session + conn net.Conn +} + +func newHub(node *tanja.Node, name string) *hub { + s := &hub{name:name, ses:node.Session()} + + s.ses.RegRPC(s, func(s string) tanja.Tuple { + if s[0] != 'T' { + return nil + } + return tanja.Tup("hub", name, s[1:]) + }) + + s.ses.Send(false, "hub", name, "Created") + go s.ses.Run() + return s +} + +func (s *hub) Texists(m *tanja.Message) { + m.Reply(s.name) + m.Close() +} + +func (s *hub) Tconnect(addr string) { + if s.conn != nil { + return + } + var e error + s.conn, e = net.DialTimeout("tcp", addr, 30*time.Second) + if e != nil { + s.ses.Send(false, "hub", s.name, "Disconnected", e.Error()) + } else { + s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String()) + adc.NewWriter(s.conn).Write(&adc.Message{Header:adc.SUP.SetType('I')}) + } +} + +func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { + if s.conn != nil { + s.conn.Close() + s.conn = nil + s.ses.Sendt(false, append(tanja.Tup("hub", s.name, "Disconnected"), r...)) + } + // Only close the return-path after disconnect, to allow users to wait for + // successful disconnect. + m.Close() +} + +func (s *hub) close() { + r := s.ses.Send(true, "hub", s.name, "disconnect") + <-r.Chan() + r.Close() + + s.ses.Close() + s.ses.Send(false, "hub", s.name, "Closed") +} diff --git a/src/globster/hub/manager.go b/src/globster/hub/manager.go new file mode 100644 index 0000000..385d29e --- /dev/null +++ b/src/globster/hub/manager.go @@ -0,0 +1,74 @@ +package hub + +import ( + "go.blicky.net/tanja" +) + +type Manager struct { + ses *tanja.Session + node *tanja.Node + closed chan bool + hubs map[string]*hub +} + +func New(node *tanja.Node) *Manager { + s := &Manager{} + s.ses = node.Session() + s.closed = make(chan bool) + s.hubs = make(map[string]*hub) + s.node = node + + s.ses.Register(true, "hub", nil, "new").Callback(func(t *tanja.Message) { s.tNew(t) }) + s.ses.Register(false, "hub", nil, "close").Callback(func(t *tanja.Message) { s.tClose(t) }) + + return s +} + +func (s *Manager) tNew(m *tanja.Message) { + n := m.Tup[1].String() + if s.hubs[n] != nil { + m.Reply(0) + } else if n != "" { + s.hubs[n] = newHub(s.node, n) + m.Reply(1) + } + m.Close() +} + +func (s *Manager) tClose(m *tanja.Message) { + if m.Tup[1].WC() { // Close all + for _, h := range s.hubs { + go h.close() + } + s.hubs = make(map[string]*hub) + } else { // Close one + n := m.Tup[1].String() + if h := s.hubs[n]; h != nil { + delete(s.hubs, n) + go h.close() + } + } +} + +func (s *Manager) Run() { + s.ses.Run() + + // Close all hubs (in parallel) + ch := make(chan bool, 10) + for _, h := range s.hubs { + go func() { + h.close() + ch <- true + }() + } + for i := len(s.hubs); i > 0; i-- { + <-ch + } + + close(s.closed) +} + +func (s *Manager) Close() { + s.ses.Close() + <-s.closed +} |