summaryrefslogtreecommitdiff
path: root/Tanja.pm
diff options
context:
space:
mode:
Diffstat (limited to 'Tanja.pm')
-rw-r--r--Tanja.pm126
1 files changed, 95 insertions, 31 deletions
diff --git a/Tanja.pm b/Tanja.pm
index 181f513..6dd29ab 100644
--- a/Tanja.pm
+++ b/Tanja.pm
@@ -187,38 +187,65 @@ use Errno 'EPIPE';
use Carp 'croak';
-# Args: Tanja::Server, $server||!$client, AnyEvent::Handle, $error->($message)
-# Automatically initiates handshake and stuff. Error callback will also be
-# called when the remote side simply disconnected, $message is in that case
-# undef.
+# Args: Tanja::Server, AnyEvent::Handle, %options
+# Options:
+# init => bool. True if the other party connected to us, false otherwise. (Default false)
+# sync => bool. True to fetch the other servers' patterns. (Default true)
+# on_error => $cb->($message). Called when an error occurs, $message = undef for disconnect.
+# on_ready => $cb->(). Called when the link is "ready". (i.e. if you send tuples they will be forwarded)
+# on_write => $cb->(message => args). Called when queueing a (non-handshake) message for sending.
+# on_read => $cb->(message => args). Called when a (non-handshake) message has been received.
sub new {
- my($own, $serv, $init, $handle, $err) = @_;
+ my($own, $serv, $handle, %o) = @_;
require AnyEvent::Handle;
croak 'No JSON module available.' if !Tanja::Link::JSON->init;
my $s = bless {
+ init => 0,
+ sync => 1,
+ %o,
serv => $serv,
hdl => $handle,
- init => $init,
- err => $err,
reg => {},
ret => {},
lastret => 1,
+ lasttup => [],
}, $own;
$s->{tup} = $s->_tuple;
- $serv->{lnk}{$s} = $s;
$handle->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
+
# TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either)
$handle->wbuf_max(5*1024*1024); # This would be really bad.
+ $handle->rbuf_max(5*1024*1024);
$s->_handshake;
return $s;
}
+sub _cleanup {
+ my $s = shift;
+ $s->{serv}->_unregister($_) for (values %{$s->{pat}});
+ $s->{pat} = {};
+ delete $s->{serv}{lnk}{$s};
+}
+
sub _err {
my($s, $m) = @_;
- delete $_[0]{serv}{$_[0]};
+ $s->_cleanup;
$s->{hdl}->destroy;
- $s->{err} && $s->{err}->($m);
+ $s->{on_error} && $s->{on_error}->($m);
+}
+
+
+sub close {
+ $_[0]->_cleanup;
+ $_[0]->{hdl}->push_shutdown;
+}
+
+
+sub _write {
+ my $s = shift;
+ $s->{hdl}->push_write('Tanja::Link::JSON' => @_);
+ $s->{on_write} && $s->{on_write}->(@_);
}
@@ -241,9 +268,17 @@ sub _handshake {
# Reply with our handshake if we hadn't sent it yet.
!$s->{init} && $s->{hdl}->push_write("ver,1.0 ser,json\012");
- # Handshake complete, send out initial register messages and start receiving stuff
- $s->{hdl}->push_write('Tanja::Link::JSON' => register => $_+0, $s->{serv}{pat}{$_}[0]) for (keys %{$s->{serv}{pat}});
+ # Handshake complete, start receiving stuff
$s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
+
+ # Setup tuple forwarding
+ if($s->{sync}) {
+ $s->_write(patternsync => 1);
+ } else {
+ $s->{pat}{_} = $s->{serv}->_register([], $s->{tup});
+ $s->{on_ready} && $s->{on_ready}->();
+ delete $s->{on_ready};
+ }
});
}
@@ -256,6 +291,12 @@ sub _tuple {
# Don't route back tuples that we received from this link
return if $_[2] && ref($_[2]) eq 'Tanja::Link' && $_[2] == $s;
+ # Only send a tuple once even if there have been multiple
+ # registrations. (This relies on the property that the Server does not
+ # make a copy of the tuple when dispatching the callbacks.)
+ return if $_[0] == $s->{lasttup};
+ $s->{lasttup} = $_[0];
+
# Register return-path, if there is one
my $id = $_[1]->null ? 0 : $s->{lastret};
if($id) {
@@ -264,7 +305,7 @@ sub _tuple {
$s->{lastret} = $id;
}
- $s->{hdl}->push_write('Tanja::Link::JSON' => tuple => $id+0, $_[0]);
+ $s->_write(tuple => $id+0, $_[0]);
}
}
@@ -272,7 +313,7 @@ sub _tuple {
# Someone registered a pattern with the server
sub _srv_reg {
my($s, $id, $pat, $cb) = @_;
- $cb != $s->{tup} && $s->{hdl}->push_write('Tanja::Link::JSON' => register => $id+0, $pat);
+ $cb != $s->{tup} && $s->_write(register => $id+0, $pat);
}
@@ -281,7 +322,7 @@ sub _srv_unreg {
my($s, $id) = @_;
# Note that this also sends out unregister messages for patterns that the
# connected party just unregistered. This shouldn't be a problem, though.
- $s->{hdl}->push_write('Tanja::Link::JSON' => unregister => $id+0);
+ $s->_write(unregister => $id+0);
}
@@ -291,19 +332,39 @@ sub _recv {
# Not very helpful error message...
return $s->_err("Invalid message") if !$cmd;
+ $s->{on_read} && $s->{on_read}->($cmd, @arg);
+
+ # patternsync
+ if($cmd eq 'patternsync') {
+ if($arg[0]) {
+ $s->{serv}{lnk}{$s} = $s;
+ $s->{serv}{pat}{$_}[1] != $s->{tup} && $s->_write(register => $_+0, $s->{serv}{pat}{$_}[0])
+ for (keys %{$s->{serv}{pat}});
+ $s->_write('regdone');
+ } else {
+ delete $s->{serv}{lnk}{$s};
+ }
+
# register
- if($cmd eq 'register') {
- $s->{pat}{$arg[0]} = $s->{serv}->_register($arg[1], $s->{tup});
+ } elsif($cmd eq 'register') {
+ $s->{pat}{$arg[0]} = $s->{serv}->_register($arg[1], $s->{tup}) if $s->{sync};
+
+ # regdone
+ } elsif($cmd eq 'regdone') {
+ $s->{on_ready} && $s->{on_ready}->();
+ delete $s->{on_ready};
# unregister
} elsif($cmd eq 'unregister') {
- $s->{serv}->_unregister($s->{pat}{$arg[0]});
- delete $s->{pat}{$arg[0]};
+ if($s->{sync}) {
+ $s->{serv}->_unregister($s->{pat}{$arg[0]});
+ delete $s->{pat}{$arg[0]};
+ }
# tuple
} elsif($cmd eq 'tuple') {
$s->{serv}->send($arg[1], !$arg[0] ? undef : sub {
- $s->{hdl}->push_write('Tanja::Link::JSON' => $_[0]?'reply':'close', $arg[0]+0, $_[0]?$_[0]:());
+ $s->_write($_[0]?'reply':'close', $arg[0]+0, $_[0]?$_[0]:());
}, $s);
# reply
@@ -320,26 +381,29 @@ sub _recv {
}
-sub close {
- delete $_[0]{serv}{$_[0]};
- $_[0]->{hdl}->push_shutdown;
-}
-
-
# JSON serialization format. Requires either JSON or JSON::XS
package Tanja::Link::JSON;
use strict;
use warnings;
-my @num_to_cmd = ('', qw|register unregister tuple reply close|);
+my @num_to_cmd = ('', qw|patternsync register regdone unregister tuple reply close|);
my %cmd_to_num = map +($num_to_cmd[$_], $_), keys @num_to_cmd;
-my $json;
+my($json, $true, $false);
sub init {
- $json = eval { require JSON::XS; JSON::XS->new->utf8 }
- || eval { require JSON; JSON->new->utf8 };
+ $json = eval {
+ require JSON::XS;
+ $true = JSON::XS::true();
+ $false = JSON::XS::false();
+ JSON::XS->new->utf8
+ } || eval {
+ require JSON;
+ $true = JSON::true();
+ $false = JSON::false();
+ JSON->new->utf8
+ };
!!$json;
}
@@ -360,7 +424,7 @@ sub anyevent_read_type {
sub anyevent_write_type {
my(undef, $cmd, @args) = @_;
- $json->encode([$cmd_to_num{$cmd}, @args])."\012";
+ $json->encode([$cmd_to_num{$cmd}, $cmd eq 'patternsync' ? ($args[0]?$true:$false) : @args])."\012";
}