diff options
author | Yorhel <git@yorhel.nl> | 2012-04-15 16:07:09 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-04-15 16:07:09 +0200 |
commit | 8a698dbcb6dd15c4581c8b145b333c2a4f2eb740 (patch) | |
tree | a9a9f7c8d95bfbeb47a40234a0aae8054990953b | |
parent | cdb96d903fdd65f98eb1ccfe278b752cd89404d9 (diff) |
hub,adc: Added framework for handling incoming ADC messages
-rw-r--r-- | src/globster/adc/handle.go | 64 | ||||
-rw-r--r-- | src/globster/adc/parse.go | 3 | ||||
-rw-r--r-- | src/globster/adc/types.go | 78 | ||||
-rw-r--r-- | src/globster/hub/hub.go | 46 |
4 files changed, 166 insertions, 25 deletions
diff --git a/src/globster/adc/handle.go b/src/globster/adc/handle.go new file mode 100644 index 0000000..669b4c5 --- /dev/null +++ b/src/globster/adc/handle.go @@ -0,0 +1,64 @@ +package adc + +import "errors" + +var ( + ErrInvalidState = errors.New("Received message in invalid state") + ErrInvalidType = errors.New("Message with invalid type") + ErrInvalidArg = errors.New("Invalid message") +) + +type HubHandler interface { + AdcSUP(ad, rm []string) error + AdcSID(string) error +} + +// Assumes the message has been received from a hub. This function performs +// validation on the message and, if valid, calls the respective method in +// Handler. +func HandleHub(m *Message, state State, h HubHandler) error { + if m == nil { + return nil + } + nfo := cmdList[m.Header.Command()] + if state != InvalidState && (state&nfo.states) == 0 { + return ErrInvalidState + } + hastype := false + for _, t := range []byte(nfo.htypes) { + if t == m.Header.Type() { + hastype = true + break + } + } + if !hastype { + return ErrInvalidType + } + + switch m.Header.Command() { + case SUP: + f := func(a []Arg) []string { + r := make([]string, 0, len(a)) + for i, v := range a { + if v.FourCC() == 0 { + return nil + } + r[i] = string(v) + } + return r + } + ad, rm := f(m.NamedArgs[[2]byte{'A', 'D'}]), f(m.NamedArgs[[2]byte{'R', 'M'}]) + if ad == nil || rm == nil { + return ErrInvalidArg + } + return h.AdcSUP(ad, rm) + + case SID: + if !m.PosArgs[0].IsSID() { + return ErrInvalidArg + } + return h.AdcSID(string(m.PosArgs[0])) + + } + return nil +} diff --git a/src/globster/adc/parse.go b/src/globster/adc/parse.go index 192d95a..12b5dd7 100644 --- a/src/globster/adc/parse.go +++ b/src/globster/adc/parse.go @@ -62,7 +62,7 @@ func Parse(buf []byte) (*Message, error) { } m.Header = NewFourCC(buf) t := m.Header.Type() - n, ok := cmdArgNum[m.Header.Command()] + nfo, ok := cmdList[m.Header.Command()] if !ok { return nil, errors.New("Invalid command.") } @@ -104,6 +104,7 @@ func Parse(buf []byte) (*Message, error) { } // Positional arguments + n := nfo.args for n > 0 { if len(buf) < 1 || buf[0] != ' ' { return nil, errors.New("Invalid or missing positional argument.") diff --git a/src/globster/adc/types.go b/src/globster/adc/types.go index ecea43f..83f9491 100644 --- a/src/globster/adc/types.go +++ b/src/globster/adc/types.go @@ -6,6 +6,9 @@ type FourCC int32 type Arg string +// Protocol states +type State int + // TODO: feature broadcasts type Message struct { Header FourCC @@ -34,23 +37,38 @@ const ( SUP FourCC = 0x2A535550 ) -// List of all valid commands and their number of fixed positional arguments. -var cmdArgNum map[FourCC]int = map[FourCC]int{ - CTM: 3, - GET: 4, - GFI: 2, - GPA: 1, - INF: 0, - MSG: 1, - PAS: 1, - QUI: 1, - RCM: 2, - RES: 0, - SCH: 0, - SID: 1, - SND: 4, - STA: 2, - SUP: 0, +const ( + InvalidState State = (1 << iota) + PROTOCOL + IDENTIFY + VERIFY + NORMAL + DATA +) + +type cmdInfo struct { + args int // Number of fixed positional arguments + states State // OR'ed states in which this command is allowed + htypes string // allowed message types when received from a hub +} + +// List of all known commands +var cmdList = map[FourCC]cmdInfo{ + CTM: {3, NORMAL, "DEF"}, + GET: {4, NORMAL, ""}, // htype = "I" for the BLOM extension + GFI: {2, NORMAL, ""}, + GPA: {1, VERIFY, "I"}, + INF: {0, IDENTIFY | NORMAL, "IBDEF"}, // (doesn't make much sense with htype=D/E/F, but it's allowed) + MSG: {1, NORMAL, "IBDEF"}, + PAS: {1, VERIFY, ""}, + QUI: {1, IDENTIFY | VERIFY | NORMAL, "I"}, + RCM: {2, NORMAL, "DEF"}, + RES: {0, NORMAL, "D"}, + SCH: {0, NORMAL, "BDEF"}, + SID: {1, PROTOCOL, "I"}, + SND: {4, NORMAL, ""}, // htype = "I" for the BLOM extension + STA: {2, PROTOCOL | IDENTIFY | VERIFY | NORMAL | DATA, "IBDEF"}, + SUP: {0, NORMAL | PROTOCOL, "I"}, } // len(s) must be >= 4 @@ -78,6 +96,24 @@ func (f FourCC) String() string { return string(f.Format(nil)) } +func NewMessage(t byte, m FourCC) *Message { + return &Message{Header: m.SetType(t)} +} + +func (m *Message) AddPos(s string) *Message { + m.PosArgs = append(m.PosArgs, Arg(s)) + return m +} + +func (m *Message) AddNamed(n, s string) *Message { + if m.NamedArgs == nil { + m.NamedArgs = make(map[[2]byte][]Arg) + } + b := [2]byte{n[0], n[1]} + m.NamedArgs[b] = append(m.NamedArgs[b], Arg(s)) + return m +} + // Returns the first found value of the named argument, or an empty string if not found. func (m *Message) Arg(a [2]byte) Arg { if l := m.NamedArgs[a]; l != nil && len(l) > 0 { @@ -104,6 +140,14 @@ func isSID(b []byte) bool { return len(b) >= 4 && isBase32(b[0]) && isBase32(b[1]) && isBase32(b[2]) && isBase32(b[3]) } +// Returns 0 if arg is not a valid FourCC +func (a Arg) FourCC() FourCC { + if len(a) == 4 && isAlphaNum(a[0]) && isAlphaNum(a[1]) && isAlphaNum(a[2]) && isAlphaNum(a[3]) { + return NewFourCC([]byte(a)) + } + return FourCC(0) +} + func (a Arg) IsSID() bool { return len(a) == 4 && isSID([]byte(a)) } diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go index ecb75cd..6ee6237 100644 --- a/src/globster/hub/hub.go +++ b/src/globster/hub/hub.go @@ -3,6 +3,7 @@ package hub import ( "blicky.net/asyncwr" "blicky.net/tanja" + "errors" "globster/adc" "net" "time" @@ -19,6 +20,9 @@ type hub struct { // otherwise nil. Only one value is written to the channel, a net.Conn on // success or an error otherwise. connCh <-chan interface{} + // ADC info (only useful if conn != nil) + state adc.State + mysid string } func newHub(node *tanja.Node, name string) *hub { @@ -44,8 +48,8 @@ func newHub(node *tanja.Node, name string) *hub { case m := <-s.rd: if m == nil { s.rd = nil - } else { - s.handleMessage(m) + } else if err := adc.HandleHub(m, s.state, s); err != nil { + s.ses.Send(false, "hub", s.name, "disconnect", err.Error()) } case c := <-s.connCh: s.connected(c) @@ -82,9 +86,10 @@ func (s *hub) Tconnect(addr string) { // notification is sent immediately, but the return path is only closed after // the disconnect has been completed. func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { - s.ses.Sendt(false, append(tanja.Tup("hub", s.name, "Disconnected"), r...)) + broadcast := false if s.connCh != nil { + broadcast = true // If we're in the connecting state, set connCh to nil so we won't be // getting its result. In case the connection was successful, make sure // to immediately disconnect again. @@ -97,6 +102,7 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { m.Close() } else if s.conn != nil { + broadcast = true // If we're connected, close the connection. go func(c net.Conn) { c.Close() @@ -116,6 +122,10 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { } else { m.Close() } + + if broadcast { + s.ses.Sendt(false, append(tanja.Tup("hub", s.name, "Disconnected"), r...)) + } } func (s *hub) connected(r interface{}) { @@ -139,18 +149,40 @@ func (s *hub) connected(r interface{}) { if err != nil { s.ses.Send(false, "hub", s.name, "disconnect", err.Error()) break - } else { + } else if m != nil { ch <- m } } close(ch) }(adc.NewReader(s.conn, 100*1024)) - // Start handshake - s.wr.Write(&adc.Message{Header: adc.SUP.SetType('I')}) + // Initiate handshake + s.state = adc.PROTOCOL + s.wr.Write(adc.NewMessage('H', adc.SUP).AddNamed("AD", "BASE").AddNamed("AD", "TIGR")) +} + +func (s *hub) AdcSUP(ad []string, rm []string) error { + if s.state != adc.PROTOCOL || len(rm) > 0 { + return errors.New("Dynamic changing of protocol features is not supported") + } + hasbase, hastigr := false, false + for _, v := range ad { + if string(v) == "BASE" { + hasbase = true + } else if string(v) == "TIGR" { + hastigr = true + } + } + if !hasbase || !hastigr { + return errors.New("Hub does not support BASE or TIGR") + } + return nil } -func (s *hub) handleMessage(m *adc.Message) { +func (s *hub) AdcSID(sid string) error { + s.mysid = sid + s.state = adc.IDENTIFY + return nil } func (s *hub) close() { |