summaryrefslogtreecommitdiff
path: root/Tanja.pm
diff options
context:
space:
mode:
Diffstat (limited to 'Tanja.pm')
-rw-r--r--Tanja.pm34
1 files changed, 21 insertions, 13 deletions
diff --git a/Tanja.pm b/Tanja.pm
index ae04aaa..58b2ac6 100644
--- a/Tanja.pm
+++ b/Tanja.pm
@@ -187,33 +187,39 @@ use Errno 'EPIPE';
use Carp 'croak';
-# Args: Tanja::Server, AnyEvent::Handle, %options
+# Args: Tanja::Server, %options
# Options:
# sync => bool. True to fetch the other servers' patterns. (Default true)
+# handle => AnyEvent::Handle. Equivalent to setting write_handle and read_handle to the same thing.
+# write_handle => AnyEvent::Handle. Used for writing.
+# read_handle => AnyEvent::Handle. Used for reading.
# on_error => $cb->($message). Called when an error occurs, $message = undef for disconnect.
# on_ready => $cb->(). Called when the link is "ready". (i.e. if you send tuples they will be forwarded)
# on_write => $cb->(message => args). Called when queueing a (non-handshake) message for sending.
# on_read => $cb->(message => args). Called when a (non-handshake) message has been received.
sub new {
- my($own, $serv, $handle, %o) = @_;
+ my($own, $serv, %o) = @_;
require AnyEvent::Handle;
croak 'No JSON module available.' if !Tanja::Link::JSON->init;
my $s = bless {
sync => 1,
%o,
serv => $serv,
- hdl => $handle,
reg => {},
ret => {},
lastret => 1,
lasttup => [],
}, $own;
+ $s->{write_handle} ||= $s->{handle};
+ $s->{read_handle} ||= $s->{handle};
+ croak 'No read or write handle set.' if !$s->{write_handle} || !$s->{read_handle};
$s->{tup} = $s->_tuple;
- $handle->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
+ $s->{write_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
+ $s->{read_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
# TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either)
- $handle->wbuf_max(5*1024*1024); # This would be really bad.
- $handle->rbuf_max(5*1024*1024);
+ $s->{write_handle}->wbuf_max(5*1024*1024); # This would be really bad.
+ $s->{read_handle}->rbuf_max(5*1024*1024);
$s->_handshake;
return $s;
}
@@ -229,20 +235,22 @@ sub _cleanup {
sub _err {
my($s, $m) = @_;
$s->_cleanup;
- $s->{hdl}->destroy;
+ $s->{read_handle}->destroy;
+ $s->{write_handle}->destroy;
$s->{on_error} && $s->{on_error}->($m);
}
sub close {
$_[0]->_cleanup;
- $_[0]->{hdl}->push_shutdown;
+ $_[0]->{write_handle}->push_shutdown;
+ $_[0]->{read_handle}->push_shutdown;
}
sub _write {
my $s = shift;
- $s->{hdl}->push_write('Tanja::Link::JSON' => @_);
+ $s->{write_handle}->push_write('Tanja::Link::JSON' => @_);
$s->{on_write} && $s->{on_write}->(@_);
}
@@ -251,10 +259,10 @@ sub _handshake {
my $s = shift;
# Send our handshake message
- $s->{hdl}->push_write("ver,1.0 seri,json sero,json\012");
+ $s->{write_handle}->push_write("ver,1.0 seri,json sero,json\012");
# Receive handshake from other party
- $s->{hdl}->push_read(line => "\012" => sub {
+ $s->{read_handle}->push_read(line => "\012" => sub {
my $ver = ($_[1] =~ /\bver,([^ ]+)\b/) && $1;
my $seri = ($_[1] =~ /\bseri,([^ ]+)\b/) && $1;
my $sero = ($_[1] =~ /\bsero,([^ ]+)\b/) && $1;
@@ -266,7 +274,7 @@ sub _handshake {
or !grep $_ eq 'json', split /,/, $sero;
# Handshake complete, start receiving stuff
- $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
+ $s->{read_handle}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
# Setup tuple forwarding
if($s->{sync}) {
@@ -374,7 +382,7 @@ sub _recv {
}
# and receive next message
- $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
+ $s->{read_handle}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
}