diff options
author | Yorhel <git@yorhel.nl> | 2012-04-14 19:06:45 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2012-04-14 19:07:31 +0200 |
commit | cdb96d903fdd65f98eb1ccfe278b752cd89404d9 (patch) | |
tree | dc018ecb059605d8b7cdb3c0852f7716e0f6faa3 | |
parent | f4af5fc86471865a13d911ff160fd44fb764fce5 (diff) |
hub.go: Added async read/write framework
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | src/globster/globster.go | 2 | ||||
-rw-r--r-- | src/globster/hub/hub.go | 59 |
3 files changed, 51 insertions, 12 deletions
@@ -26,6 +26,6 @@ distclean: clean # Yorhel's playground. play: all @./globster -l sock &echo $$!>pid;\ - perl -I../tanja/perl ../tanja/perl/tanja-cli.pl -vc $$PWD/sock;\ + PERL_ANYEVENT_MODEL=Perl perl -I../tanja/perl ../tanja/perl/tanja-cli.pl -vc $$PWD/sock;\ kill `cat pid`;\ rm -f sock pid diff --git a/src/globster/globster.go b/src/globster/globster.go index b511e5b..c604f33 100644 --- a/src/globster/globster.go +++ b/src/globster/globster.go @@ -1,10 +1,10 @@ package main import ( + "blicky.net/tanja" "flag" "globster/hub" "globster/userlist" - "blicky.net/tanja" "log" "net" ) diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go index f17c2e7..ecb75cd 100644 --- a/src/globster/hub/hub.go +++ b/src/globster/hub/hub.go @@ -1,19 +1,20 @@ package hub import ( - "globster/adc" + "blicky.net/asyncwr" "blicky.net/tanja" + "globster/adc" "net" "time" ) -// TODO: Do write()s in a separate goroutine. - type hub struct { name string ses *tanja.Session // Only set in the connected state, nil otherwise. conn net.Conn + wr *adc.Writer + rd <-chan *adc.Message // Channel for the asynchronous connect. Only set in the connecting state, // otherwise nil. Only one value is written to the channel, a net.Conn on // success or an error otherwise. @@ -40,6 +41,12 @@ func newHub(node *tanja.Node, name string) *hub { return } m.Dispatch() + case m := <-s.rd: + if m == nil { + s.rd = nil + } else { + s.handleMessage(m) + } case c := <-s.connCh: s.connected(c) } @@ -96,6 +103,15 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { m.Close() }(s.conn) s.conn = nil + s.wr = nil + // Also drain the reader channel (to make sure the reader goroutine doesn't deadlock) + if s.rd != nil { + go func(ch <-chan *adc.Message) { + for _ = range ch { + } + }(s.rd) + s.rd = nil + } } else { m.Close() @@ -104,14 +120,37 @@ func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) { func (s *hub) connected(r interface{}) { s.connCh = nil - switch v := r.(type) { - case net.Conn: - s.conn = v - s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String()) - adc.NewWriter(s.conn).Write(&adc.Message{Header: adc.SUP.SetType('I')}) - case error: - s.ses.Send(false, "hub", s.name, "Disconnected", v.Error()) + if e, ok := r.(error); ok { + s.ses.Send(false, "hub", s.name, "Disconnected", e.Error()) + return } + + // Successful connect + s.conn = r.(net.Conn) + s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String()) + s.wr = adc.NewWriter(asyncwr.New(s.conn, 1024*1024)) + + // Start reader goroutine + ch := make(chan *adc.Message, 10) + s.rd = ch + go func(rd *adc.Reader) { + for { + m, err := rd.Read() + if err != nil { + s.ses.Send(false, "hub", s.name, "disconnect", err.Error()) + break + } else { + ch <- m + } + } + close(ch) + }(adc.NewReader(s.conn, 100*1024)) + + // Start handshake + s.wr.Write(&adc.Message{Header: adc.SUP.SetType('I')}) +} + +func (s *hub) handleMessage(m *adc.Message) { } func (s *hub) close() { |