summaryrefslogtreecommitdiff
path: root/src/globster/hub/hub.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/globster/hub/hub.go')
-rw-r--r--src/globster/hub/hub.go59
1 files changed, 49 insertions, 10 deletions
diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go
index f17c2e7..ecb75cd 100644
--- a/src/globster/hub/hub.go
+++ b/src/globster/hub/hub.go
@@ -1,19 +1,20 @@
package hub
import (
- "globster/adc"
+ "blicky.net/asyncwr"
"blicky.net/tanja"
+ "globster/adc"
"net"
"time"
)
-// TODO: Do write()s in a separate goroutine.
-
type hub struct {
name string
ses *tanja.Session
// Only set in the connected state, nil otherwise.
conn net.Conn
+ wr *adc.Writer
+ rd <-chan *adc.Message
// Channel for the asynchronous connect. Only set in the connecting state,
// otherwise nil. Only one value is written to the channel, a net.Conn on
// success or an error otherwise.
@@ -40,6 +41,12 @@ func newHub(node *tanja.Node, name string) *hub {
return
}
m.Dispatch()
+ case m := <-s.rd:
+ if m == nil {
+ s.rd = nil
+ } else {
+ s.handleMessage(m)
+ }
case c := <-s.connCh:
s.connected(c)
}
@@ -96,6 +103,15 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) {
m.Close()
}(s.conn)
s.conn = nil
+ s.wr = nil
+ // Also drain the reader channel (to make sure the reader goroutine doesn't deadlock)
+ if s.rd != nil {
+ go func(ch <-chan *adc.Message) {
+ for _ = range ch {
+ }
+ }(s.rd)
+ s.rd = nil
+ }
} else {
m.Close()
@@ -104,14 +120,37 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) {
func (s *hub) connected(r interface{}) {
s.connCh = nil
- switch v := r.(type) {
- case net.Conn:
- s.conn = v
- s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String())
- adc.NewWriter(s.conn).Write(&adc.Message{Header: adc.SUP.SetType('I')})
- case error:
- s.ses.Send(false, "hub", s.name, "Disconnected", v.Error())
+ if e, ok := r.(error); ok {
+ s.ses.Send(false, "hub", s.name, "Disconnected", e.Error())
+ return
}
+
+ // Successful connect
+ s.conn = r.(net.Conn)
+ s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String())
+ s.wr = adc.NewWriter(asyncwr.New(s.conn, 1024*1024))
+
+ // Start reader goroutine
+ ch := make(chan *adc.Message, 10)
+ s.rd = ch
+ go func(rd *adc.Reader) {
+ for {
+ m, err := rd.Read()
+ if err != nil {
+ s.ses.Send(false, "hub", s.name, "disconnect", err.Error())
+ break
+ } else {
+ ch <- m
+ }
+ }
+ close(ch)
+ }(adc.NewReader(s.conn, 100*1024))
+
+ // Start handshake
+ s.wr.Write(&adc.Message{Header: adc.SUP.SetType('I')})
+}
+
+func (s *hub) handleMessage(m *adc.Message) {
}
func (s *hub) close() {