summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-02-22 11:30:31 +0100
committerYorhel <git@yorhel.nl>2012-02-22 11:30:31 +0100
commit13c2cfbd986d124d8762fd62414395ea8ed28f86 (patch)
treee0ee96ac527cd9f1e4737f8d9fff071db6b7edcf
parent6a066068c70336d205dfcd1adc9e0c68ba34ffc0 (diff)
perl: Allow a different AnyEvent::Handle for reading and writing
This allows two unidirectional pipes (e.g. STDIN/STDOUT) to be used for communication. There's no obvious or efficient way to combine two pipes into a single AnyEvent::Handle.
-rw-r--r--perl/Tanja.pm34
-rwxr-xr-xperl/test.pl8
2 files changed, 27 insertions, 15 deletions
diff --git a/perl/Tanja.pm b/perl/Tanja.pm
index ae04aaa..58b2ac6 100644
--- a/perl/Tanja.pm
+++ b/perl/Tanja.pm
@@ -187,33 +187,39 @@ use Errno 'EPIPE';
use Carp 'croak';
-# Args: Tanja::Server, AnyEvent::Handle, %options
+# Args: Tanja::Server, %options
# Options:
# sync => bool. True to fetch the other servers' patterns. (Default true)
+# handle => AnyEvent::Handle. Equivalent to setting write_handle and read_handle to the same thing.
+# write_handle => AnyEvent::Handle. Used for writing.
+# read_handle => AnyEvent::Handle. Used for reading.
# on_error => $cb->($message). Called when an error occurs, $message = undef for disconnect.
# on_ready => $cb->(). Called when the link is "ready". (i.e. if you send tuples they will be forwarded)
# on_write => $cb->(message => args). Called when queueing a (non-handshake) message for sending.
# on_read => $cb->(message => args). Called when a (non-handshake) message has been received.
sub new {
- my($own, $serv, $handle, %o) = @_;
+ my($own, $serv, %o) = @_;
require AnyEvent::Handle;
croak 'No JSON module available.' if !Tanja::Link::JSON->init;
my $s = bless {
sync => 1,
%o,
serv => $serv,
- hdl => $handle,
reg => {},
ret => {},
lastret => 1,
lasttup => [],
}, $own;
+ $s->{write_handle} ||= $s->{handle};
+ $s->{read_handle} ||= $s->{handle};
+ croak 'No read or write handle set.' if !$s->{write_handle} || !$s->{read_handle};
$s->{tup} = $s->_tuple;
- $handle->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
+ $s->{write_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
+ $s->{read_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
# TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either)
- $handle->wbuf_max(5*1024*1024); # This would be really bad.
- $handle->rbuf_max(5*1024*1024);
+ $s->{write_handle}->wbuf_max(5*1024*1024); # This would be really bad.
+ $s->{read_handle}->rbuf_max(5*1024*1024);
$s->_handshake;
return $s;
}
@@ -229,20 +235,22 @@ sub _cleanup {
sub _err {
my($s, $m) = @_;
$s->_cleanup;
- $s->{hdl}->destroy;
+ $s->{read_handle}->destroy;
+ $s->{write_handle}->destroy;
$s->{on_error} && $s->{on_error}->($m);
}
sub close {
$_[0]->_cleanup;
- $_[0]->{hdl}->push_shutdown;
+ $_[0]->{write_handle}->push_shutdown;
+ $_[0]->{read_handle}->push_shutdown;
}
sub _write {
my $s = shift;
- $s->{hdl}->push_write('Tanja::Link::JSON' => @_);
+ $s->{write_handle}->push_write('Tanja::Link::JSON' => @_);
$s->{on_write} && $s->{on_write}->(@_);
}
@@ -251,10 +259,10 @@ sub _handshake {
my $s = shift;
# Send our handshake message
- $s->{hdl}->push_write("ver,1.0 seri,json sero,json\012");
+ $s->{write_handle}->push_write("ver,1.0 seri,json sero,json\012");
# Receive handshake from other party
- $s->{hdl}->push_read(line => "\012" => sub {
+ $s->{read_handle}->push_read(line => "\012" => sub {
my $ver = ($_[1] =~ /\bver,([^ ]+)\b/) && $1;
my $seri = ($_[1] =~ /\bseri,([^ ]+)\b/) && $1;
my $sero = ($_[1] =~ /\bsero,([^ ]+)\b/) && $1;
@@ -266,7 +274,7 @@ sub _handshake {
or !grep $_ eq 'json', split /,/, $sero;
# Handshake complete, start receiving stuff
- $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
+ $s->{read_handle}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
# Setup tuple forwarding
if($s->{sync}) {
@@ -374,7 +382,7 @@ sub _recv {
}
# and receive next message
- $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
+ $s->{read_handle}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
}
diff --git a/perl/test.pl b/perl/test.pl
index bed74e0..25470b2 100755
--- a/perl/test.pl
+++ b/perl/test.pl
@@ -117,11 +117,15 @@ sub t_double {
my $done = AnyEvent->condvar;
$done->begin;
$done->begin;
- $sa->link(AnyEvent::Handle->new(fh => $socka), sync => $f&1,
+ $sa->link(
+ handle => AnyEvent::Handle->new(fh => $socka),
+ sync => $f&1,
on_ready => sub { $done->end },
$DEBUG ? (on_write => sub { note 'A: ',Data::Dumper::Dumper(\@_) }) : (),
);
- $sb->link(AnyEvent::Handle->new(fh => $sockb), sync => $f&2,
+ $sb->link(
+ handle => AnyEvent::Handle->new(fh => $sockb),
+ sync => $f&2,
on_ready => sub { $done->end },
$DEBUG ? (on_write => sub { note 'B: ',Data::Dumper::Dumper(\@_) }) : (),
);