summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Tanja.pm104
1 files changed, 95 insertions, 9 deletions
diff --git a/Tanja.pm b/Tanja.pm
index 0d1915a..c3d5094 100644
--- a/Tanja.pm
+++ b/Tanja.pm
@@ -37,6 +37,7 @@ sub new {
return bless({
lastid => 1,
pat => {}, # Note: Also used directly by Tanja::Link
+ lnk => {}, # Same note.
}, shift);
}
@@ -54,14 +55,15 @@ sub link {
# Send a tuple to the network.
-# Arguments: [ tuple ], $return_cb->([tuple] or ())
+# Arguments: [ tuple ], $return_cb->([tuple] or undef, $special_arg), $special_arg
+# $special_arg is local only within the server, it will not be routed.
# TODO: Some way to indicate that the session isn't interested in replies anymore?
sub send {
- my($s, $t, $cb) = @_;
+ my($s, $t, $cb, $sa) = @_;
my $ret = Tanja::ReturnPath->_new($cb);
for my $reg (grep Tanja::match($_->[0], $t), values %{$s->{pat}}) {
AnyEvent::postpone {
- $reg->[1]->($t, $ret);
+ $reg->[1]->($t, $ret, $sa);
}
}
}
@@ -71,16 +73,19 @@ sub _register {
my($s, $pat, $cb) = @_;
my $id = $s->{lastid};
# Explicitely wrap around long before 2^31, to avoid having Perl turn the ID into a float.
- ++$id >= 1<<30 and $id = 1 while $s->{pat}{$id};
+ do { ++$id >= 1<<30 and $id = 1 } while $s->{pat}{$id};
$s->{pat}{$id} = [ $pat, $cb ];
$s->{lastid} = $id;
+ $_->_srv_reg($id, $pat, $cb) for(values %{$s->{lnk}});
return $id;
}
sub _unregister {
my($s, $id) = @_;
- delete $s->{pat}{$id};
+ if(delete $s->{pat}{$id}) {
+ $_->_srv_unreg($id) for(values %{$s->{lnk}});
+ }
}
@@ -196,7 +201,12 @@ sub new {
hdl => $handle,
init => $init,
err => $err,
+ reg => {},
+ ret => {},
+ lastret => 1,
}, $own;
+ $s->{tup} = $s->_tuple;
+ $serv->{lnk}{$s} = $s;
$handle->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
# TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either)
$handle->wbuf_max(5*1024*1024); # This would be really bad.
@@ -207,6 +217,7 @@ sub new {
sub _err {
my($s, $m) = @_;
+ delete $_[0]{serv}{$_[0]};
$s->{hdl}->destroy;
$s->{err} && $s->{err}->($m);
}
@@ -231,15 +242,87 @@ sub _handshake {
# Reply with our handshake if we hadn't sent it yet.
!$s->{init} && $s->{hdl}->push_write("ver,1.0 ser,json\012");
- # Handshake complete, send out initial register messages
+ # Handshake complete, send out initial register messages and start receiving stuff
$s->{hdl}->push_write('Tanja::Link::JSON' => register => $_+0, $s->{serv}{pat}{$_}[0]) for (keys %{$s->{serv}{pat}});
-
- # TODO: start regular message exchange.
+ $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
});
}
+# Generates the subroutine used for registering patterns with the server.
+sub _tuple {
+ my $s = shift;
+ sub { # tuple, ret, sa
+
+ # Don't route back tuples that we received from this link
+ return if $_[2] && ref($_[2]) eq 'Tanja::Link' && $_[2] == $s;
+
+ # Register return-path, if there is one
+ my $id = $_[1]->null ? 0 : $s->{lastret};
+ if($id) {
+ do { ++$id >= 1<<30 and $id = 1 } while $s->{ret}{$id};
+ $s->{ret}{$id} = $_[1];
+ $s->{lastret} = $id;
+ }
+
+ $s->{hdl}->push_write('Tanja::Link::JSON' => tuple => $id+0, $_[0]);
+ }
+}
+
+
+# Someone registered a pattern with the server
+sub _srv_reg {
+ my($s, $id, $pat, $cb) = @_;
+ $cb != $s->{tup} && $s->{hdl}->push_write('Tanja::Link::JSON' => register => $id+0, $pat);
+}
+
+
+# Someone unregistered a pattern with the server
+sub _srv_unreg {
+ my($s, $id) = @_;
+ # Note that this also sends out unregister messages for patterns that the
+ # connected party just unregistered. This shouldn't be a problem, though.
+ $s->{hdl}->push_write('Tanja::Link::JSON' => unregister => $id+0);
+}
+
+
+sub _recv {
+ my($s, $hdl, $cmd, @arg) = @_;
+
+ # Not very helpful error message...
+ return $s->_err("Invalid message") if !$cmd;
+
+ # register
+ if($cmd eq 'register') {
+ $s->{pat}{$arg[0]} = $s->{serv}->_register($arg[1], $s->{tup});
+
+ # unregister
+ } elsif($cmd eq 'unregister') {
+ $s->{serv}->_unregister($s->{pat}{$arg[0]});
+ delete $s->{pat}{$arg[0]};
+
+ # tuple
+ } elsif($cmd eq 'tuple') {
+ $s->{serv}->send($arg[1], !$arg[0] ? undef : sub {
+ $s->{hdl}->push_write('Tanja::Link::JSON' => $_[0]?'response':'close', $arg[0]+0, $_[0]?$_[0]:());
+ }, $s);
+
+ # response
+ } elsif($cmd eq 'response') {
+ $s->{ret}{$arg[0]}->reply($arg[1]) if $s->{ret}{$arg[0]};
+
+ # close
+ } elsif($cmd eq 'close') {
+ delete $s->{ret}{$arg[0]};
+ }
+
+ # and receive next message
+ $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
+}
+
+
sub close {
+ delete $_[0]{serv}{$_[0]};
$_[0]->{hdl}->push_shutdown;
}
@@ -263,17 +346,20 @@ sub anyevent_read_type {
return $cb->($_[0], undef) || 1 if !$d || ref($d) ne 'ARRAY';
my $num = shift @$d;
return $cb->($_[0], undef) || 1 if !$num || $num !~ /^\d+$/ || $num > @num_to_cmd;
+ # TODO: argument validation
$cb->($_[0], $num_to_cmd[$num], @$d);
1
}
}
+
sub anyevent_write_type {
my(undef, $cmd, @args) = @_;
- encode_json([$cmd_to_num{$cmd}, @args])."\n";
+ encode_json([$cmd_to_num{$cmd}, @args])."\012";
}
1;
# vim:noet:sw=4:ts=4
+