summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-04-10 21:30:45 +0200
committerYorhel <git@yorhel.nl>2012-04-10 21:30:45 +0200
commit62f4d5b3bda688a7099a99b9d84b76b831f5ead8 (patch)
treeac845f2729344f7f905e4f3dcb5e509235439a09
parent675245c2da4c59ce48eb2da0ad0c9e317ad478b2 (diff)
hub: Use asynchronous connect
This allows a connect to be cancelled by sending a 'disconnect', which immediately puts the hub into disconnected state again.
-rw-r--r--src/globster/hub/hub.go68
1 files changed, 58 insertions, 10 deletions
diff --git a/src/globster/hub/hub.go b/src/globster/hub/hub.go
index 5bf7f79..da032e5 100644
--- a/src/globster/hub/hub.go
+++ b/src/globster/hub/hub.go
@@ -12,7 +12,12 @@ import (
type hub struct {
name string
ses *tanja.Session
+ // Only set in the connected state, nil otherwise.
conn net.Conn
+ // Channel for the asynchronous connect. Only set in the connecting state,
+ // otherwise nil. Only one value is written to the channel, a net.Conn on
+ // success or an error otherwise.
+ connCh <-chan interface{}
}
func newHub(node *tanja.Node, name string) *hub {
@@ -26,7 +31,21 @@ func newHub(node *tanja.Node, name string) *hub {
})
s.ses.Send(false, "hub", name, "Created")
- go s.ses.Run()
+
+ go func() {
+ for {
+ select {
+ case m, ok := <-s.ses.Chan():
+ if !ok {
+ return
+ }
+ m.Dispatch()
+ case c := <-s.connCh:
+ s.connected(c)
+ }
+ }
+ s.ses.Run()
+ }()
return s
}
@@ -36,30 +55,59 @@ func (s *hub) Texists(m *tanja.Message) {
}
func (s *hub) Tconnect(addr string) {
- if s.conn != nil {
+ if s.conn != nil || s.connCh != nil {
return
}
- var e error
- s.conn, e = net.DialTimeout("tcp", addr, 30*time.Second)
- if e != nil {
- s.ses.Send(false, "hub", s.name, "Disconnected", e.Error())
- } else {
- s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String())
- adc.NewWriter(s.conn).Write(&adc.Message{Header: adc.SUP.SetType('I')})
- }
+ ch := make(chan interface{}, 2)
+ s.connCh = ch
+ go func() {
+ c, e := net.DialTimeout("tcp", addr, 30*time.Second)
+ if e != nil {
+ ch <- e
+ } else {
+ ch <- c
+ }
+ }()
}
func (s *hub) Tdisconnect(m *tanja.Message, r ...tanja.Element) {
+ // If we're in the connecting state, set connCh to nil so we won't be
+ // getting its result. In case the connection was successful, make sure to
+ // immediately disconnect again.
+ if s.connCh != nil {
+ go func(ch <-chan interface{}) {
+ if c, ok := (<-ch).(net.Conn); ok {
+ c.Close()
+ }
+ }(s.connCh)
+ s.connCh = nil
+ s.ses.Sendt(false, append(tanja.Tup("hub", s.name, "Disconnected"), r...))
+ }
+
+ // If we're connected, properly disconnect. (TODO: async?)
if s.conn != nil {
s.conn.Close()
s.conn = nil
s.ses.Sendt(false, append(tanja.Tup("hub", s.name, "Disconnected"), r...))
}
+
// Only close the return-path after disconnect, to allow users to wait for
// successful disconnect.
m.Close()
}
+func (s *hub) connected(r interface{}) {
+ s.connCh = nil
+ switch v := r.(type) {
+ case net.Conn:
+ s.conn = v
+ s.ses.Send(false, "hub", s.name, "Connected", s.conn.RemoteAddr().String())
+ adc.NewWriter(s.conn).Write(&adc.Message{Header: adc.SUP.SetType('I')})
+ case error:
+ s.ses.Send(false, "hub", s.name, "Disconnected", v.Error())
+ }
+}
+
func (s *hub) close() {
r := s.ses.Send(true, "hub", s.name, "disconnect")
<-r.Chan()