diff options
Diffstat (limited to 'Tanja.pm')
-rw-r--r-- | Tanja.pm | 34 |
1 files changed, 21 insertions, 13 deletions
@@ -187,33 +187,39 @@ use Errno 'EPIPE'; use Carp 'croak'; -# Args: Tanja::Server, AnyEvent::Handle, %options +# Args: Tanja::Server, %options # Options: # sync => bool. True to fetch the other servers' patterns. (Default true) +# handle => AnyEvent::Handle. Equivalent to setting write_handle and read_handle to the same thing. +# write_handle => AnyEvent::Handle. Used for writing. +# read_handle => AnyEvent::Handle. Used for reading. # on_error => $cb->($message). Called when an error occurs, $message = undef for disconnect. # on_ready => $cb->(). Called when the link is "ready". (i.e. if you send tuples they will be forwarded) # on_write => $cb->(message => args). Called when queueing a (non-handshake) message for sending. # on_read => $cb->(message => args). Called when a (non-handshake) message has been received. sub new { - my($own, $serv, $handle, %o) = @_; + my($own, $serv, %o) = @_; require AnyEvent::Handle; croak 'No JSON module available.' if !Tanja::Link::JSON->init; my $s = bless { sync => 1, %o, serv => $serv, - hdl => $handle, reg => {}, ret => {}, lastret => 1, lasttup => [], }, $own; + $s->{write_handle} ||= $s->{handle}; + $s->{read_handle} ||= $s->{handle}; + croak 'No read or write handle set.' if !$s->{write_handle} || !$s->{read_handle}; $s->{tup} = $s->_tuple; - $handle->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) }); + $s->{write_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) }); + $s->{read_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) }); # TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either) - $handle->wbuf_max(5*1024*1024); # This would be really bad. - $handle->rbuf_max(5*1024*1024); + $s->{write_handle}->wbuf_max(5*1024*1024); # This would be really bad. + $s->{read_handle}->rbuf_max(5*1024*1024); $s->_handshake; return $s; } @@ -229,20 +235,22 @@ sub _cleanup { sub _err { my($s, $m) = @_; $s->_cleanup; - $s->{hdl}->destroy; + $s->{read_handle}->destroy; + $s->{write_handle}->destroy; $s->{on_error} && $s->{on_error}->($m); } sub close { $_[0]->_cleanup; - $_[0]->{hdl}->push_shutdown; + $_[0]->{write_handle}->push_shutdown; + $_[0]->{read_handle}->push_shutdown; } sub _write { my $s = shift; - $s->{hdl}->push_write('Tanja::Link::JSON' => @_); + $s->{write_handle}->push_write('Tanja::Link::JSON' => @_); $s->{on_write} && $s->{on_write}->(@_); } @@ -251,10 +259,10 @@ sub _handshake { my $s = shift; # Send our handshake message - $s->{hdl}->push_write("ver,1.0 seri,json sero,json\012"); + $s->{write_handle}->push_write("ver,1.0 seri,json sero,json\012"); # Receive handshake from other party - $s->{hdl}->push_read(line => "\012" => sub { + $s->{read_handle}->push_read(line => "\012" => sub { my $ver = ($_[1] =~ /\bver,([^ ]+)\b/) && $1; my $seri = ($_[1] =~ /\bseri,([^ ]+)\b/) && $1; my $sero = ($_[1] =~ /\bsero,([^ ]+)\b/) && $1; @@ -266,7 +274,7 @@ sub _handshake { or !grep $_ eq 'json', split /,/, $sero; # Handshake complete, start receiving stuff - $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) }); + $s->{read_handle}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) }); # Setup tuple forwarding if($s->{sync}) { @@ -374,7 +382,7 @@ sub _recv { } # and receive next message - $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) }); + $s->{read_handle}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) }); } |