diff options
Diffstat (limited to 'src/globster/hub/hub.go')
-rw-r--r-- | src/globster/hub/hub.go | 46 |
1 files changed, 39 insertions, 7 deletions
diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go index ecb75cd..6ee6237 100644 --- a/src/globster/hub/hub.go +++ b/src/globster/hub/hub.go @@ -3,6 +3,7 @@ package hub import ( "blicky.net/asyncwr" "blicky.net/tanja" + "errors" "globster/adc" "net" "time" @@ -19,6 +20,9 @@ type hub struct { // otherwise nil. Only one value is written to the channel, a net.Conn on // success or an error otherwise. connCh <-chan interface{} + // ADC info (only useful if conn != nil) + state adc.State + mysid string } func newHub(node *tanja.Node, name string) *hub { @@ -44,8 +48,8 @@ func newHub(node *tanja.Node, name string) *hub { case m := <-s.rd: if m == nil { s.rd = nil - } else { - s.handleMessage(m) + } else if err := adc.HandleHub(m, s.state, s); err != nil { + s.ses.Send(false, "hub", s.name, "disconnect", err.Error()) } case c := <-s.connCh: s.connected(c) @@ -82,9 +86,10 @@ func (s *hub) Tconnect(addr string) { // notification is sent immediately, but the return path is only closed after // the disconnect has been completed. func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { - s.ses.Sendt(false, append(tanja.Tup("hub", s.name, "Disconnected"), r...)) + broadcast := false if s.connCh != nil { + broadcast = true // If we're in the connecting state, set connCh to nil so we won't be // getting its result. In case the connection was successful, make sure // to immediately disconnect again. @@ -97,6 +102,7 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { m.Close() } else if s.conn != nil { + broadcast = true // If we're connected, close the connection. go func(c net.Conn) { c.Close() @@ -116,6 +122,10 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { } else { m.Close() } + + if broadcast { + s.ses.Sendt(false, append(tanja.Tup("hub", s.name, "Disconnected"), r...)) + } } func (s *hub) connected(r interface{}) { @@ -139,18 +149,40 @@ func (s *hub) connected(r interface{}) { if err != nil { s.ses.Send(false, "hub", s.name, "disconnect", err.Error()) break - } else { + } else if m != nil { ch <- m } } close(ch) }(adc.NewReader(s.conn, 100*1024)) - // Start handshake - s.wr.Write(&adc.Message{Header: adc.SUP.SetType('I')}) + // Initiate handshake + s.state = adc.PROTOCOL + s.wr.Write(adc.NewMessage('H', adc.SUP).AddNamed("AD", "BASE").AddNamed("AD", "TIGR")) +} + +func (s *hub) AdcSUP(ad []string, rm []string) error { + if s.state != adc.PROTOCOL || len(rm) > 0 { + return errors.New("Dynamic changing of protocol features is not supported") + } + hasbase, hastigr := false, false + for _, v := range ad { + if string(v) == "BASE" { + hasbase = true + } else if string(v) == "TIGR" { + hastigr = true + } + } + if !hasbase || !hastigr { + return errors.New("Hub does not support BASE or TIGR") + } + return nil } -func (s *hub) handleMessage(m *adc.Message) { +func (s *hub) AdcSID(sid string) error { + s.mysid = sid + s.state = adc.IDENTIFY + return nil } func (s *hub) close() { |