summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-04-09 10:38:10 +0200
committerYorhel <git@yorhel.nl>2012-04-09 10:38:10 +0200
commit8289946e8a349b09fa56ef21252ba5f8edc03b47 (patch)
tree0e15520237700446c7e57251cc2b7a3fa19e54bd
Initial commit of a WIP DC client based on Tanja
-rw-r--r--.gitignore4
-rw-r--r--Makefile31
-rw-r--r--src/globster/adc/adc_test.go116
-rw-r--r--src/globster/adc/fmt.go68
-rw-r--r--src/globster/adc/parse.go156
-rw-r--r--src/globster/adc/types.go118
-rw-r--r--src/globster/globster.go53
-rw-r--r--src/globster/hub/hub.go70
-rw-r--r--src/globster/hub/manager.go74
9 files changed, 690 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4a1c242
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+globster
+!src/globster
+src/go.blicky.net/
+pkg/
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..38aa6ac
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,31 @@
+.PHONY: all fmt install test clean distclean play
+
+PWD=$(shell pwd)
+GOPATH:=${PWD}:${GOPATH}
+OLDBIN:=${GOBIN}
+GOBIN:=${PWD}
+
+all:
+ go get -v globster/...
+
+fmt:
+ go fmt globster/...
+
+install:
+ GOBIN="${OLDBIN}" go install globster
+
+test:
+ go test globster/...
+
+clean:
+ rm -rf globster pkg
+
+distclean: clean
+ rm -rf src/go.blicky.net
+
+# Yorhel's playground.
+play: all
+ @./globster -l sock &echo $$!>pid;\
+ perl -I../tanja/perl ../tanja/perl/tanja-cli.pl -vc $$PWD/sock;\
+ kill `cat pid`;\
+ rm -f sock pid
diff --git a/src/globster/adc/adc_test.go b/src/globster/adc/adc_test.go
new file mode 100644
index 0000000..f733b43
--- /dev/null
+++ b/src/globster/adc/adc_test.go
@@ -0,0 +1,116 @@
+package adc
+
+import "testing"
+
+type argv struct {
+ input string
+ output string
+ rem string
+ haserr bool
+}
+
+func TestParseArg(t *testing.T) {
+ l := []argv{
+ {"", "", "", false},
+ {" 1", "", " 1", false},
+ {"1", "1", "", false},
+ {"1\n", "1", "\n", false},
+ {"a b", "a", " b", false},
+ {"a\\\\b", "a\\b", "", false},
+ {"\\n\\\\\\s", "\n\\ ", "", false},
+ {"\\", "", "\\", true},
+ {"\\ ", "", "\\ ", true},
+ {"some\\", "", "\\", true},
+ {"\\e", "", "\\e", true},
+ {"\xff", "", "", true},
+ {"\xc9a", "", "", true},
+ }
+ for _, v := range l {
+ out, rem, err := ParseArg([]byte(v.input))
+ if string(out) != v.output {
+ t.Errorf("Unexpected output: got '%s', expected '%s'", out, v.output)
+ }
+ if string(rem) != v.rem {
+ t.Errorf("Unexpected remainder: got '%s', expected '%s'", rem, v.rem)
+ }
+ if (err != nil) != v.haserr {
+ t.Errorf("Unexpected error: got '%s', expected %v", err, v.haserr)
+ }
+ }
+}
+
+type parsev struct {
+ input string
+ haserr bool
+}
+
+func TestParseFmt(t *testing.T) {
+ l := []parsev{
+ {"", true},
+ {"\n", false},
+ {"HSUP\n", false},
+ {" HSUP\n", true},
+ {"\nHSUP\n", true},
+ {"HSUP\n", false},
+ {"HSuP\n", true},
+ {"NSUP\n", true},
+ {"HSUP ", true},
+ {"HSUP", true},
+ {"HUNK\n", true},
+ {"HSTA 0 blah\n", false},
+ {"HSTA 0\n", true},
+ {"HSTA 0 blah \n", true},
+ {"HSTA blah\n", false},
+ {"HSTA \n", false},
+ {"HSTA 0 \n", false},
+ {"HSUP AD0 AD1\n", false},
+ {"HSUP AD AD AD\n", false},
+ {"HSUP 3D\n", true},
+ {"HSUP A B\n", true},
+ {"HSUP AD1\n", true},
+ {"BSUP 2344\n", false},
+ {"BSUP 2344\n", true},
+ {"BSUP 2304\n", true},
+ {"BSUP 23043\n", true},
+ {"BSUP 234 \n", true},
+ {"BSUP 233\n", true},
+ {"BSTA 2334 0 1\n", false},
+ {"BSTA 2334 \n", false},
+ {"BSUP\n", true},
+ {"BSUP ABCD\n", false},
+ {"BSUP ABcD\n", true},
+ {"BSUP abcd\n", true},
+ {"DSUP ABCD \n", true},
+ {"DSUP ABCD 2304D\n", true},
+ {"DSUP ABCD 2304\n", true},
+ {"DSUP ABCD 239\n", true},
+ {"DSUP ABCD 239 \n", true},
+ {"DSUP ABCD 2344\n", true},
+ {"DSUP ABCD 2344\n", false},
+ {"DSTA ABCD 2344 0 1\n", false},
+ {"DSTA ABCD 2344 \n", false},
+ {"DSTA ABCD 2344 \n", true},
+ {"DSTA ABCD 2344 \n", true},
+ {"USUP\n", true},
+ {"USUP \n", true},
+ {"USUP A\n", false},
+ {"USUP ABCDEFGHIJKLMNOPQRSTUVWXYZ234567\n", false},
+ {"USUP 0\n", true},
+ {"USTA 0 \n", true},
+ {"USTA 0 a b\n", true},
+ {"HSTA a\"'\\\\\n", false},
+ {"HINF RFa\"'\\\\\n", false},
+ }
+ for _, v := range l {
+ msg, err := Parse([]byte(v.input))
+ if (err != nil) != v.haserr {
+ t.Errorf("Unexpected error: got '%s' for '%s'", err, v.input)
+ }
+ if msg != nil {
+ out := msg.Format(nil)
+ if string(out) != v.input {
+ t.Errorf("Unexpected output: got '%s', expected '%s'", out, v.input)
+ }
+ }
+ }
+}
diff --git a/src/globster/adc/fmt.go b/src/globster/adc/fmt.go
new file mode 100644
index 0000000..39360d5
--- /dev/null
+++ b/src/globster/adc/fmt.go
@@ -0,0 +1,68 @@
+package adc
+
+import "io"
+
+func (a Arg) Format(b []byte) []byte {
+ // Walk over the raw bytes, Unicode characters (and anything else) is
+ // simply passed through unmodified.
+ for i := 0; i < len(a); i++ {
+ switch a[i] {
+ case ' ':
+ b = append(b, []byte{'\\', 's'}...)
+ case '\n':
+ b = append(b, []byte{'\\', 'n'}...)
+ case '\\':
+ b = append(b, []byte{'\\', '\\'}...)
+ default:
+ b = append(b, a[i])
+ }
+ }
+ return b
+}
+
+// TODO: features
+func (m *Message) Format(b []byte) []byte {
+ if m == nil {
+ return append(b, '\n')
+ }
+ b = m.Header.Format(b)
+ if m.Src != 0 {
+ b = append(b, ' ')
+ b = m.Src.Format(b)
+ }
+ if m.Dest != 0 {
+ b = append(b, ' ')
+ b = m.Dest.Format(b)
+ }
+ if len(m.CID) > 0 {
+ b = append(b, ' ')
+ b = append(b, []byte(m.CID)...)
+ }
+ for _, v := range m.PosArgs {
+ b = append(b, ' ')
+ b = v.Format(b)
+ }
+ for k, l := range m.NamedArgs {
+ for _, v := range l {
+ b = append(b, []byte{' ', k[0], k[1]}...)
+ b = v.Format(b)
+ }
+ }
+ return append(b, 0x0a)
+}
+
+func (m *Message) String() string {
+ return string(m.Format(nil))
+}
+
+type Writer struct {
+ wr io.Writer
+}
+
+func NewWriter(wr io.Writer) *Writer {
+ return &Writer{wr}
+}
+
+func (wr *Writer) Write(m *Message) (int, error) {
+ return wr.wr.Write(m.Format(nil))
+}
diff --git a/src/globster/adc/parse.go b/src/globster/adc/parse.go
new file mode 100644
index 0000000..192d95a
--- /dev/null
+++ b/src/globster/adc/parse.go
@@ -0,0 +1,156 @@
+package adc
+
+import (
+ "bufio"
+ "errors"
+ "io"
+ "unicode/utf8"
+)
+
+func ParseArg(b []byte) (Arg, []byte, error) {
+ var res []byte
+ for len(b) > 0 {
+ if b[0] == ' ' || b[0] == '\n' {
+ break
+ }
+ if b[0] == '\\' {
+ if len(b) < 2 {
+ return "", b, errors.New("Unexpected end of message after escape.")
+ }
+ if b[1] == '\\' {
+ res = append(res, '\\')
+ } else if b[1] == 'n' {
+ res = append(res, '\n')
+ } else if b[1] == 's' {
+ res = append(res, ' ')
+ } else {
+ return "", b, errors.New("Invalid escape character.")
+ }
+ b = b[2:]
+ } else {
+ res = append(res, b[0])
+ b = b[1:]
+ }
+ }
+ if !utf8.Valid(res) {
+ return "", b, errors.New("Invalid UTF-8 sequence.")
+ }
+ return Arg(res), b, nil
+}
+
+// TODO: features
+// Note: Can return nil, nil if the message is empty (which is a valid ADC message).
+func Parse(buf []byte) (*Message, error) {
+ if len(buf) == 1 && buf[0] == '\n' {
+ return nil, nil
+ }
+ m := &Message{}
+ if len(buf) < 4 {
+ return nil, errors.New("Invalid command.")
+ }
+ if buf[len(buf)-1] != '\n' {
+ return nil, errors.New("Message does not end with a newline.")
+ }
+ buf = buf[:len(buf)-1]
+
+ // Header
+ if buf[0] != 'B' && buf[0] != 'C' && buf[0] != 'I' && buf[0] != 'H' && buf[0] != 'D' && buf[0] != 'E' && buf[0] != 'F' && buf[0] != 'U' {
+ return nil, errors.New("Invalid command type.")
+ }
+ if !isAlphaNum(buf[1]) || !isAlphaNum(buf[2]) || !isAlphaNum(buf[3]) {
+ return nil, errors.New("Invalid command.")
+ }
+ m.Header = NewFourCC(buf)
+ t := m.Header.Type()
+ n, ok := cmdArgNum[m.Header.Command()]
+ if !ok {
+ return nil, errors.New("Invalid command.")
+ }
+ buf = buf[4:]
+
+ // Src
+ if t == 'B' || t == 'D' || t == 'E' || t == 'F' {
+ if len(buf) < 5 || buf[0] != ' ' || !isSID(buf[1:]) {
+ return nil, errors.New("Invalid SID argument.")
+ }
+ m.Src = NewFourCC(buf[1:])
+ buf = buf[5:]
+ }
+
+ // Dest
+ if t == 'D' || t == 'E' {
+ if len(buf) < 5 || buf[0] != ' ' || !isSID(buf[1:]) {
+ return nil, errors.New("Invalid SID argument.")
+ }
+ m.Dest = NewFourCC(buf[1:])
+ buf = buf[5:]
+ }
+
+ // CID
+ if t == 'U' {
+ if len(buf) < 2 || buf[0] != ' ' {
+ return nil, errors.New("Invalid CID argument.")
+ }
+ buf = buf[1:]
+ var cid []byte
+ for len(buf) > 0 && buf[0] != ' ' {
+ if !isBase32(buf[0]) {
+ return nil, errors.New("Invalid CID argument.")
+ }
+ cid = append(cid, buf[0])
+ buf = buf[1:]
+ }
+ m.CID = string(cid)
+ }
+
+ // Positional arguments
+ for n > 0 {
+ if len(buf) < 1 || buf[0] != ' ' {
+ return nil, errors.New("Invalid or missing positional argument.")
+ }
+ var s Arg
+ var e error
+ s, buf, e = ParseArg(buf[1:])
+ if e != nil {
+ return nil, errors.New("Invalid positional argument: " + e.Error())
+ }
+ m.PosArgs = append(m.PosArgs, s)
+ n--
+ }
+
+ // Named arguments
+ for len(buf) > 0 {
+ if len(buf) < 3 || buf[0] != ' ' || !isAlpha(buf[1]) || !isAlphaNum(buf[2]) {
+ return nil, errors.New("Invalid named argument.")
+ }
+ if m.NamedArgs == nil {
+ m.NamedArgs = make(map[[2]byte][]Arg)
+ }
+ name := [2]byte{buf[1], buf[2]}
+ var val Arg
+ var e error
+ val, buf, e = ParseArg(buf[3:])
+ if e != nil {
+ return nil, errors.New("Invalid named argument: " + e.Error())
+ }
+ m.NamedArgs[name] = append(m.NamedArgs[name], val)
+ }
+
+ return m, nil
+}
+
+type Reader struct {
+ rd *bufio.Reader
+}
+
+func NewReader(rd io.Reader, maxSize int) *Reader {
+ return &Reader{bufio.NewReaderSize(rd, maxSize)}
+}
+
+func (rd *Reader) Read() (*Message, error) {
+ l, e := rd.rd.ReadString('\n')
+ if e != nil {
+ return nil, e
+ }
+ return Parse([]byte(l))
+}
diff --git a/src/globster/adc/types.go b/src/globster/adc/types.go
new file mode 100644
index 0000000..ecea43f
--- /dev/null
+++ b/src/globster/adc/types.go
@@ -0,0 +1,118 @@
+package adc
+
+// Works for both command_type+command_name and encoded_cid.
+// For command names without type, the first byte of the FourCC = '*'.
+type FourCC int32
+
+type Arg string
+
+// TODO: feature broadcasts
+type Message struct {
+ Header FourCC
+ Src FourCC // 0 if not present
+ Dest FourCC // 0 if not present
+ CID string // if Header.Type() == 'U', unmodified base32 string otherwise empty
+ PosArgs []Arg
+ NamedArgs map[[2]byte][]Arg // may be nil
+}
+
+const (
+ CTM FourCC = 0x2a43544d
+ GET FourCC = 0x2a474554
+ GFI FourCC = 0x2a474749
+ GPA FourCC = 0x2a475041
+ INF FourCC = 0x2a494e46
+ MSG FourCC = 0x2a4d5347
+ PAS FourCC = 0x2a504153
+ QUI FourCC = 0x2a515549
+ RCM FourCC = 0x2a52434d
+ RES FourCC = 0x2a524553
+ SCH FourCC = 0x2a534348
+ SID FourCC = 0x2a534944
+ SND FourCC = 0x2a534e44
+ STA FourCC = 0x2a535441
+ SUP FourCC = 0x2A535550
+)
+
+// List of all valid commands and their number of fixed positional arguments.
+var cmdArgNum map[FourCC]int = map[FourCC]int{
+ CTM: 3,
+ GET: 4,
+ GFI: 2,
+ GPA: 1,
+ INF: 0,
+ MSG: 1,
+ PAS: 1,
+ QUI: 1,
+ RCM: 2,
+ RES: 0,
+ SCH: 0,
+ SID: 1,
+ SND: 4,
+ STA: 2,
+ SUP: 0,
+}
+
+// len(s) must be >= 4
+func NewFourCC(s []byte) FourCC {
+ return FourCC(int32(s[0])<<24 + int32(s[1])<<16 + int32(s[2])<<8 + int32(s[3]))
+}
+
+func (f FourCC) Type() byte {
+ return byte(f >> 24)
+}
+
+func (f FourCC) SetType(t byte) FourCC {
+ return (f & 0xFFFFFF) | (FourCC(t) << 24)
+}
+
+func (f FourCC) Command() FourCC {
+ return f.SetType('*')
+}
+
+func (f FourCC) Format(b []byte) []byte {
+ return append(b, []byte{byte(f >> 24), byte(f >> 16), byte(f >> 8), byte(f)}...)
+}
+
+func (f FourCC) String() string {
+ return string(f.Format(nil))
+}
+
+// Returns the first found value of the named argument, or an empty string if not found.
+func (m *Message) Arg(a [2]byte) Arg {
+ if l := m.NamedArgs[a]; l != nil && len(l) > 0 {
+ return l[0]
+ }
+ return ""
+}
+
+// Some handy validation functions. Used by parse.go, but some would also be
+// used in applications to perform further validation on incoming messages.
+func isAlpha(b byte) bool {
+ return b >= 'A' && b <= 'Z'
+}
+
+func isAlphaNum(b byte) bool {
+ return isAlpha(b) || (b >= '0' && b <= '9')
+}
+
+func isBase32(b byte) bool {
+ return isAlpha(b) || (b >= '2' && b <= '7')
+}
+
+func isSID(b []byte) bool {
+ return len(b) >= 4 && isBase32(b[0]) && isBase32(b[1]) && isBase32(b[2]) && isBase32(b[3])
+}
+
+func (a Arg) IsSID() bool {
+ return len(a) == 4 && isSID([]byte(a))
+}
+
+func (a Arg) IsBase32() bool {
+ for i := 0; i < len(a); i++ {
+ if !isBase32(a[i]) {
+ return false
+ }
+ }
+ return len(a) > 0
+}
diff --git a/src/globster/globster.go b/src/globster/globster.go
new file mode 100644
index 0000000..19d5b56
--- /dev/null
+++ b/src/globster/globster.go
@@ -0,0 +1,53 @@
+package main
+
+import (
+ "flag"
+ "go.blicky.net/tanja"
+ "globster/hub"
+ "log"
+ "net"
+)
+
+func listen(node *tanja.Node, path string) (e error) {
+ var addr *net.UnixAddr
+ if addr, e = net.ResolveUnixAddr("unix", path); e != nil {
+ return
+ }
+ var lst *net.UnixListener
+ if lst, e = net.ListenUnix("unix", addr); e != nil {
+ return
+ }
+ i := 0
+ for {
+ var conn net.Conn
+ if conn, e = lst.Accept(); e != nil {
+ return
+ }
+ i++
+ log.Printf("%03d: Incoming connection on UNIX socket.", i)
+ lnk := node.Link(conn)
+ go func(i int) {
+ err := <-lnk.Start()
+ if err == nil {
+ log.Printf("%03d: Disconnected.", i)
+ } else {
+ log.Printf("%03d: %s", i, err)
+ }
+ }(i)
+ }
+ return
+}
+
+func main() {
+ path := flag.String("l", "/tmp/globster.sock", "Path to the UNIX listen socket.")
+ flag.Parse()
+
+ n := tanja.NewNode()
+
+ hub := hub.New(n)
+ go hub.Run()
+
+ if err := listen(n, *path); err != nil {
+ log.Fatalf("Error listening on UNIX socket: %s", err)
+ }
+}
diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go
new file mode 100644
index 0000000..fb7b377
--- /dev/null
+++ b/src/globster/hub/hub.go
@@ -0,0 +1,70 @@
+package hub
+
+import (
+ "globster/adc"
+ "go.blicky.net/tanja"
+ "time"
+ "net"
+)
+
+// TODO: Do the connect() and write()s in a separate goroutine.
+
+type hub struct {
+ name string
+ ses *tanja.Session
+ conn net.Conn
+}
+
+func newHub(node *tanja.Node, name string) *hub {
+ s := &hub{name:name, ses:node.Session()}
+
+ s.ses.RegRPC(s, func(s string) tanja.Tuple {
+ if s[0] != 'T' {
+ return nil
+ }
+ return tanja.Tup("hub", name, s[1:])
+ })
+
+ s.ses.Send(false, "hub", name, "Created")
+ go s.ses.Run()
+ return s
+}
+
+func (s *hub) Texists(m *tanja.Message) {
+ m.Reply(s.name)
+ m.Close()
+}
+
+func (s *hub) Tconnect(addr string) {
+ if s.conn != nil {
+ return
+ }
+ var e error
+ s.conn, e = net.DialTimeout("tcp", addr, 30*time.Second)
+ if e != nil {
+ s.ses.Send(false, "hub", s.name, "Disconnected", e.Error())
+ } else {
+ s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String())
+ adc.NewWriter(s.conn).Write(&adc.Message{Header:adc.SUP.SetType('I')})
+ }
+}
+
+func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) {
+ if s.conn != nil {
+ s.conn.Close()
+ s.conn = nil
+ s.ses.Sendt(false, append(tanja.Tup("hub", s.name, "Disconnected"), r...))
+ }
+ // Only close the return-path after disconnect, to allow users to wait for
+ // successful disconnect.
+ m.Close()
+}
+
+func (s *hub) close() {
+ r := s.ses.Send(true, "hub", s.name, "disconnect")
+ <-r.Chan()
+ r.Close()
+
+ s.ses.Close()
+ s.ses.Send(false, "hub", s.name, "Closed")
+}
diff --git a/src/globster/hub/manager.go b/src/globster/hub/manager.go
new file mode 100644
index 0000000..385d29e
--- /dev/null
+++ b/src/globster/hub/manager.go
@@ -0,0 +1,74 @@
+package hub
+
+import (
+ "go.blicky.net/tanja"
+)
+
+type Manager struct {
+ ses *tanja.Session
+ node *tanja.Node
+ closed chan bool
+ hubs map[string]*hub
+}
+
+func New(node *tanja.Node) *Manager {
+ s := &Manager{}
+ s.ses = node.Session()
+ s.closed = make(chan bool)
+ s.hubs = make(map[string]*hub)
+ s.node = node
+
+ s.ses.Register(true, "hub", nil, "new").Callback(func(t *tanja.Message) { s.tNew(t) })
+ s.ses.Register(false, "hub", nil, "close").Callback(func(t *tanja.Message) { s.tClose(t) })
+
+ return s
+}
+
+func (s *Manager) tNew(m *tanja.Message) {
+ n := m.Tup[1].String()
+ if s.hubs[n] != nil {
+ m.Reply(0)
+ } else if n != "" {
+ s.hubs[n] = newHub(s.node, n)
+ m.Reply(1)
+ }
+ m.Close()
+}
+
+func (s *Manager) tClose(m *tanja.Message) {
+ if m.Tup[1].WC() { // Close all
+ for _, h := range s.hubs {
+ go h.close()
+ }
+ s.hubs = make(map[string]*hub)
+ } else { // Close one
+ n := m.Tup[1].String()
+ if h := s.hubs[n]; h != nil {
+ delete(s.hubs, n)
+ go h.close()
+ }
+ }
+}
+
+func (s *Manager) Run() {
+ s.ses.Run()
+
+ // Close all hubs (in parallel)
+ ch := make(chan bool, 10)
+ for _, h := range s.hubs {
+ go func() {
+ h.close()
+ ch <- true
+ }()
+ }
+ for i := len(s.hubs); i > 0; i-- {
+ <-ch
+ }
+
+ close(s.closed)
+}
+
+func (s *Manager) Close() {
+ s.ses.Close()
+ <-s.closed
+}