diff options
Diffstat (limited to 'src/globster/hub/hub.go')
-rw-r--r-- | src/globster/hub/hub.go | 59 |
1 files changed, 49 insertions, 10 deletions
diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go index f17c2e7..ecb75cd 100644 --- a/src/globster/hub/hub.go +++ b/src/globster/hub/hub.go @@ -1,19 +1,20 @@ package hub import ( - "globster/adc" + "blicky.net/asyncwr" "blicky.net/tanja" + "globster/adc" "net" "time" ) -// TODO: Do write()s in a separate goroutine. - type hub struct { name string ses *tanja.Session // Only set in the connected state, nil otherwise. conn net.Conn + wr *adc.Writer + rd <-chan *adc.Message // Channel for the asynchronous connect. Only set in the connecting state, // otherwise nil. Only one value is written to the channel, a net.Conn on // success or an error otherwise. @@ -40,6 +41,12 @@ func newHub(node *tanja.Node, name string) *hub { return } m.Dispatch() + case m := <-s.rd: + if m == nil { + s.rd = nil + } else { + s.handleMessage(m) + } case c := <-s.connCh: s.connected(c) } @@ -96,6 +103,15 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { m.Close() }(s.conn) s.conn = nil + s.wr = nil + // Also drain the reader channel (to make sure the reader goroutine doesn't deadlock) + if s.rd != nil { + go func(ch <-chan *adc.Message) { + for _ = range ch { + } + }(s.rd) + s.rd = nil + } } else { m.Close() @@ -104,14 +120,37 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { func (s *hub) connected(r interface{}) { s.connCh = nil - switch v := r.(type) { - case net.Conn: - s.conn = v - s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String()) - adc.NewWriter(s.conn).Write(&adc.Message{Header: adc.SUP.SetType('I')}) - case error: - s.ses.Send(false, "hub", s.name, "Disconnected", v.Error()) + if e, ok := r.(error); ok { + s.ses.Send(false, "hub", s.name, "Disconnected", e.Error()) + return } + + // Successful connect + s.conn = r.(net.Conn) + s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String()) + s.wr = adc.NewWriter(asyncwr.New(s.conn, 1024*1024)) + + // Start reader goroutine + ch := make(chan *adc.Message, 10) + s.rd = ch + go func(rd *adc.Reader) { + for { + m, err := rd.Read() + if err != nil { + s.ses.Send(false, "hub", s.name, "disconnect", err.Error()) + break + } else { + ch <- m + } + } + close(ch) + }(adc.NewReader(s.conn, 100*1024)) + + // Start handshake + s.wr.Write(&adc.Message{Header: adc.SUP.SetType('I')}) +} + +func (s *hub) handleMessage(m *adc.Message) { } func (s *hub) close() { |