summaryrefslogtreecommitdiff
path: root/src/globster/hub/hub.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/globster/hub/hub.go')
-rw-r--r--src/globster/hub/hub.go46
1 files changed, 39 insertions, 7 deletions
diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go
index ecb75cd..6ee6237 100644
--- a/src/globster/hub/hub.go
+++ b/src/globster/hub/hub.go
@@ -3,6 +3,7 @@ package hub
import (
"blicky.net/asyncwr"
"blicky.net/tanja"
+ "errors"
"globster/adc"
"net"
"time"
@@ -19,6 +20,9 @@ type hub struct {
// otherwise nil. Only one value is written to the channel, a net.Conn on
// success or an error otherwise.
connCh <-chan interface{}
+ // ADC info (only useful if conn != nil)
+ state adc.State
+ mysid string
}
func newHub(node *tanja.Node, name string) *hub {
@@ -44,8 +48,8 @@ func newHub(node *tanja.Node, name string) *hub {
case m := <-s.rd:
if m == nil {
s.rd = nil
- } else {
- s.handleMessage(m)
+ } else if err := adc.HandleHub(m, s.state, s); err != nil {
+ s.ses.Send(false, "hub", s.name, "disconnect", err.Error())
}
case c := <-s.connCh:
s.connected(c)
@@ -82,9 +86,10 @@ func (s *hub) Tconnect(addr string) {
// notification is sent immediately, but the return path is only closed after
// the disconnect has been completed.
func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) {
- s.ses.Sendt(false, append(tanja.Tup("hub", s.name, "Disconnected"), r...))
+ broadcast := false
if s.connCh != nil {
+ broadcast = true
// If we're in the connecting state, set connCh to nil so we won't be
// getting its result. In case the connection was successful, make sure
// to immediately disconnect again.
@@ -97,6 +102,7 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) {
m.Close()
} else if s.conn != nil {
+ broadcast = true
// If we're connected, close the connection.
go func(c net.Conn) {
c.Close()
@@ -116,6 +122,10 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) {
} else {
m.Close()
}
+
+ if broadcast {
+ s.ses.Sendt(false, append(tanja.Tup("hub", s.name, "Disconnected"), r...))
+ }
}
func (s *hub) connected(r interface{}) {
@@ -139,18 +149,40 @@ func (s *hub) connected(r interface{}) {
if err != nil {
s.ses.Send(false, "hub", s.name, "disconnect", err.Error())
break
- } else {
+ } else if m != nil {
ch <- m
}
}
close(ch)
}(adc.NewReader(s.conn, 100*1024))
- // Start handshake
- s.wr.Write(&adc.Message{Header: adc.SUP.SetType('I')})
+ // Initiate handshake
+ s.state = adc.PROTOCOL
+ s.wr.Write(adc.NewMessage('H', adc.SUP).AddNamed("AD", "BASE").AddNamed("AD", "TIGR"))
+}
+
+func (s *hub) AdcSUP(ad []string, rm []string) error {
+ if s.state != adc.PROTOCOL || len(rm) > 0 {
+ return errors.New("Dynamic changing of protocol features is not supported")
+ }
+ hasbase, hastigr := false, false
+ for _, v := range ad {
+ if string(v) == "BASE" {
+ hasbase = true
+ } else if string(v) == "TIGR" {
+ hastigr = true
+ }
+ }
+ if !hasbase || !hastigr {
+ return errors.New("Hub does not support BASE or TIGR")
+ }
+ return nil
}
-func (s *hub) handleMessage(m *adc.Message) {
+func (s *hub) AdcSID(sid string) error {
+ s.mysid = sid
+ s.state = adc.IDENTIFY
+ return nil
}
func (s *hub) close() {