summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-02-19 20:38:44 +0100
committerYorhel <git@yorhel.nl>2012-02-19 20:38:44 +0100
commit1732917f143c2d8eaa3820910a803602cfa8c5a9 (patch)
tree0fd4260fc728eb6a313d9364dd9fa21a15128b34
parentdbefc77cec73d88bed04f10eb6a20f0e926781e2 (diff)
Implemented new proto + fixed many bugs; doc: Updated proto/JSON
-rw-r--r--Tanja.pm126
-rwxr-xr-xtest.pl47
2 files changed, 131 insertions, 42 deletions
diff --git a/Tanja.pm b/Tanja.pm
index 181f513..6dd29ab 100644
--- a/Tanja.pm
+++ b/Tanja.pm
@@ -187,38 +187,65 @@ use Errno 'EPIPE';
use Carp 'croak';
-# Args: Tanja::Server, $server||!$client, AnyEvent::Handle, $error->($message)
-# Automatically initiates handshake and stuff. Error callback will also be
-# called when the remote side simply disconnected, $message is in that case
-# undef.
+# Args: Tanja::Server, AnyEvent::Handle, %options
+# Options:
+# init => bool. True if the other party connected to us, false otherwise. (Default false)
+# sync => bool. True to fetch the other servers' patterns. (Default true)
+# on_error => $cb->($message). Called when an error occurs, $message = undef for disconnect.
+# on_ready => $cb->(). Called when the link is "ready". (i.e. if you send tuples they will be forwarded)
+# on_write => $cb->(message => args). Called when queueing a (non-handshake) message for sending.
+# on_read => $cb->(message => args). Called when a (non-handshake) message has been received.
sub new {
- my($own, $serv, $init, $handle, $err) = @_;
+ my($own, $serv, $handle, %o) = @_;
require AnyEvent::Handle;
croak 'No JSON module available.' if !Tanja::Link::JSON->init;
my $s = bless {
+ init => 0,
+ sync => 1,
+ %o,
serv => $serv,
hdl => $handle,
- init => $init,
- err => $err,
reg => {},
ret => {},
lastret => 1,
+ lasttup => [],
}, $own;
$s->{tup} = $s->_tuple;
- $serv->{lnk}{$s} = $s;
$handle->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
+
# TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either)
$handle->wbuf_max(5*1024*1024); # This would be really bad.
+ $handle->rbuf_max(5*1024*1024);
$s->_handshake;
return $s;
}
+sub _cleanup {
+ my $s = shift;
+ $s->{serv}->_unregister($_) for (values %{$s->{pat}});
+ $s->{pat} = {};
+ delete $s->{serv}{lnk}{$s};
+}
+
sub _err {
my($s, $m) = @_;
- delete $_[0]{serv}{$_[0]};
+ $s->_cleanup;
$s->{hdl}->destroy;
- $s->{err} && $s->{err}->($m);
+ $s->{on_error} && $s->{on_error}->($m);
+}
+
+
+sub close {
+ $_[0]->_cleanup;
+ $_[0]->{hdl}->push_shutdown;
+}
+
+
+sub _write {
+ my $s = shift;
+ $s->{hdl}->push_write('Tanja::Link::JSON' => @_);
+ $s->{on_write} && $s->{on_write}->(@_);
}
@@ -241,9 +268,17 @@ sub _handshake {
# Reply with our handshake if we hadn't sent it yet.
!$s->{init} && $s->{hdl}->push_write("ver,1.0 ser,json\012");
- # Handshake complete, send out initial register messages and start receiving stuff
- $s->{hdl}->push_write('Tanja::Link::JSON' => register => $_+0, $s->{serv}{pat}{$_}[0]) for (keys %{$s->{serv}{pat}});
+ # Handshake complete, start receiving stuff
$s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
+
+ # Setup tuple forwarding
+ if($s->{sync}) {
+ $s->_write(patternsync => 1);
+ } else {
+ $s->{pat}{_} = $s->{serv}->_register([], $s->{tup});
+ $s->{on_ready} && $s->{on_ready}->();
+ delete $s->{on_ready};
+ }
});
}
@@ -256,6 +291,12 @@ sub _tuple {
# Don't route back tuples that we received from this link
return if $_[2] && ref($_[2]) eq 'Tanja::Link' && $_[2] == $s;
+ # Only send a tuple once even if there have been multiple
+ # registrations. (This relies on the property that the Server does not
+ # make a copy of the tuple when dispatching the callbacks.)
+ return if $_[0] == $s->{lasttup};
+ $s->{lasttup} = $_[0];
+
# Register return-path, if there is one
my $id = $_[1]->null ? 0 : $s->{lastret};
if($id) {
@@ -264,7 +305,7 @@ sub _tuple {
$s->{lastret} = $id;
}
- $s->{hdl}->push_write('Tanja::Link::JSON' => tuple => $id+0, $_[0]);
+ $s->_write(tuple => $id+0, $_[0]);
}
}
@@ -272,7 +313,7 @@ sub _tuple {
# Someone registered a pattern with the server
sub _srv_reg {
my($s, $id, $pat, $cb) = @_;
- $cb != $s->{tup} && $s->{hdl}->push_write('Tanja::Link::JSON' => register => $id+0, $pat);
+ $cb != $s->{tup} && $s->_write(register => $id+0, $pat);
}
@@ -281,7 +322,7 @@ sub _srv_unreg {
my($s, $id) = @_;
# Note that this also sends out unregister messages for patterns that the
# connected party just unregistered. This shouldn't be a problem, though.
- $s->{hdl}->push_write('Tanja::Link::JSON' => unregister => $id+0);
+ $s->_write(unregister => $id+0);
}
@@ -291,19 +332,39 @@ sub _recv {
# Not very helpful error message...
return $s->_err("Invalid message") if !$cmd;
+ $s->{on_read} && $s->{on_read}->($cmd, @arg);
+
+ # patternsync
+ if($cmd eq 'patternsync') {
+ if($arg[0]) {
+ $s->{serv}{lnk}{$s} = $s;
+ $s->{serv}{pat}{$_}[1] != $s->{tup} && $s->_write(register => $_+0, $s->{serv}{pat}{$_}[0])
+ for (keys %{$s->{serv}{pat}});
+ $s->_write('regdone');
+ } else {
+ delete $s->{serv}{lnk}{$s};
+ }
+
# register
- if($cmd eq 'register') {
- $s->{pat}{$arg[0]} = $s->{serv}->_register($arg[1], $s->{tup});
+ } elsif($cmd eq 'register') {
+ $s->{pat}{$arg[0]} = $s->{serv}->_register($arg[1], $s->{tup}) if $s->{sync};
+
+ # regdone
+ } elsif($cmd eq 'regdone') {
+ $s->{on_ready} && $s->{on_ready}->();
+ delete $s->{on_ready};
# unregister
} elsif($cmd eq 'unregister') {
- $s->{serv}->_unregister($s->{pat}{$arg[0]});
- delete $s->{pat}{$arg[0]};
+ if($s->{sync}) {
+ $s->{serv}->_unregister($s->{pat}{$arg[0]});
+ delete $s->{pat}{$arg[0]};
+ }
# tuple
} elsif($cmd eq 'tuple') {
$s->{serv}->send($arg[1], !$arg[0] ? undef : sub {
- $s->{hdl}->push_write('Tanja::Link::JSON' => $_[0]?'reply':'close', $arg[0]+0, $_[0]?$_[0]:());
+ $s->_write($_[0]?'reply':'close', $arg[0]+0, $_[0]?$_[0]:());
}, $s);
# reply
@@ -320,26 +381,29 @@ sub _recv {
}
-sub close {
- delete $_[0]{serv}{$_[0]};
- $_[0]->{hdl}->push_shutdown;
-}
-
-
# JSON serialization format. Requires either JSON or JSON::XS
package Tanja::Link::JSON;
use strict;
use warnings;
-my @num_to_cmd = ('', qw|register unregister tuple reply close|);
+my @num_to_cmd = ('', qw|patternsync register regdone unregister tuple reply close|);
my %cmd_to_num = map +($num_to_cmd[$_], $_), keys @num_to_cmd;
-my $json;
+my($json, $true, $false);
sub init {
- $json = eval { require JSON::XS; JSON::XS->new->utf8 }
- || eval { require JSON; JSON->new->utf8 };
+ $json = eval {
+ require JSON::XS;
+ $true = JSON::XS::true();
+ $false = JSON::XS::false();
+ JSON::XS->new->utf8
+ } || eval {
+ require JSON;
+ $true = JSON::true();
+ $false = JSON::false();
+ JSON->new->utf8
+ };
!!$json;
}
@@ -360,7 +424,7 @@ sub anyevent_read_type {
sub anyevent_write_type {
my(undef, $cmd, @args) = @_;
- $json->encode([$cmd_to_num{$cmd}, @args])."\012";
+ $json->encode([$cmd_to_num{$cmd}, $cmd eq 'patternsync' ? ($args[0]?$true:$false) : @args])."\012";
}
diff --git a/test.pl b/test.pl
index d6487a3..3d86748 100755
--- a/test.pl
+++ b/test.pl
@@ -7,6 +7,14 @@ use AnyEvent;
use AnyEvent::Handle;
use Socket;
+my $DEBUG = 0;
+if($DEBUG) {
+ no warnings 'once';
+ require Data::Dumper;
+ $Data::Dumper::Indent = 0;
+ $Data::Dumper::Terse = 1;
+}
+
use_ok 'Tanja';
@@ -61,25 +69,30 @@ ok !Tanja::match([2], [1]);
# Simple double-session test with return-path
sub t_double {
- my($sa, $sb) = @_;
+ my($sa, $sb, $link) = @_;
my $a = $sa->session;
my $b = $sb->session;
my $done = AnyEvent->condvar;
+ my $msgn = 0;
$a->reg(["msg"], sub {
my($t, $r) = @_;
+ ok !$msgn++;
isa_ok $r, 'Tanja::ReturnPath';
ok !$r->null;
$r->reply(['b', 9]);
is_deeply $t, ["msg", 'a'];
$a->send(["b"]);
});
+ my $bn = 0;
$b->reg(["b"], sub {
my($t, $r) = @_;
+ ok !$bn++;
isa_ok $r, 'Tanja::ReturnPath';
ok $r->null;
is_deeply $t, ["b"];
$done->send;
});
+ $link && $link->();
my $n = 0;
$b->send(["msg", 'a'], sub {
!$n++ ? is_deeply $_[0], ['b', 9] : ok !@_;
@@ -90,19 +103,31 @@ sub t_double {
{ # same server
my $s = Tanja::Server->new;
+ note 'Same server';
t_double($s, $s);
}
-TODO: { # different servers, linked
- local $TODO = 'Fundamental flaw in the protocol';
-
- my $sa = Tanja::Server->new;
- my $sb = Tanja::Server->new;
- socketpair my $socka, my $sockb, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
- $sa->link(0, AnyEvent::Handle->new(fh => $socka), sub { });
- $sb->link(1, AnyEvent::Handle->new(fh => $sockb), sub { });
- # Currently hangs
- #t_double($sa, $sb);
+{ # different servers, linked. (With various combinations of the 'sync' flag)
+ for my $f (0..3) {
+ note "Linked servers, $f";
+ my $sa = Tanja::Server->new;
+ my $sb = Tanja::Server->new;
+ t_double($sa, $sb, sub {
+ socketpair my $socka, my $sockb, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
+ my $done = AnyEvent->condvar;
+ $done->begin;
+ $done->begin;
+ $sa->link(AnyEvent::Handle->new(fh => $socka), sync => $f&1,
+ on_ready => sub { $done->end },
+ $DEBUG ? (on_write => sub { note 'A: ',Data::Dumper::Dumper(\@_) }) : (),
+ );
+ $sb->link(AnyEvent::Handle->new(fh => $sockb), sync => $f&2, init => 1,
+ on_ready => sub { $done->end },
+ $DEBUG ? (on_write => sub { note 'B: ',Data::Dumper::Dumper(\@_) }) : (),
+ );
+ $done->recv;
+ });
+ }
}