summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2012-02-18 14:03:22 +0100
committerYorhel <git@yorhel.nl>2012-02-18 14:24:53 +0100
commit52aa1ecfa8b4bf8a7217d02e6fb6fbe3801043cb (patch)
treedf6e70bbbff9287a8194a714bac51c031669836a
parent17fde575fd77113ef93ed65df57391585ed3b6c3 (diff)
Implemented full link support
Yay, this is the first complete and working implementation of the link protocol! Not really final yet, though, but at least something to experiment with.
-rw-r--r--Tanja.pm104
1 files changed, 95 insertions, 9 deletions
diff --git a/Tanja.pm b/Tanja.pm
index 0d1915a..c3d5094 100644
--- a/Tanja.pm
+++ b/Tanja.pm
@@ -37,6 +37,7 @@ sub new {
return bless({
lastid => 1,
pat => {}, # Note: Also used directly by Tanja::Link
+ lnk => {}, # Same note.
}, shift);
}
@@ -54,14 +55,15 @@ sub link {
# Send a tuple to the network.
-# Arguments: [ tuple ], $return_cb->([tuple] or ())
+# Arguments: [ tuple ], $return_cb->([tuple] or undef, $special_arg), $special_arg
+# $special_arg is local only within the server, it will not be routed.
# TODO: Some way to indicate that the session isn't interested in replies anymore?
sub send {
- my($s, $t, $cb) = @_;
+ my($s, $t, $cb, $sa) = @_;
my $ret = Tanja::ReturnPath->_new($cb);
for my $reg (grep Tanja::match($_->[0], $t), values %{$s->{pat}}) {
AnyEvent::postpone {
- $reg->[1]->($t, $ret);
+ $reg->[1]->($t, $ret, $sa);
}
}
}
@@ -71,16 +73,19 @@ sub _register {
my($s, $pat, $cb) = @_;
my $id = $s->{lastid};
# Explicitely wrap around long before 2^31, to avoid having Perl turn the ID into a float.
- ++$id >= 1<<30 and $id = 1 while $s->{pat}{$id};
+ do { ++$id >= 1<<30 and $id = 1 } while $s->{pat}{$id};
$s->{pat}{$id} = [ $pat, $cb ];
$s->{lastid} = $id;
+ $_->_srv_reg($id, $pat, $cb) for(values %{$s->{lnk}});
return $id;
}
sub _unregister {
my($s, $id) = @_;
- delete $s->{pat}{$id};
+ if(delete $s->{pat}{$id}) {
+ $_->_srv_unreg($id) for(values %{$s->{lnk}});
+ }
}
@@ -196,7 +201,12 @@ sub new {
hdl => $handle,
init => $init,
err => $err,
+ reg => {},
+ ret => {},
+ lastret => 1,
}, $own;
+ $s->{tup} = $s->_tuple;
+ $serv->{lnk}{$s} = $s;
$handle->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
# TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either)
$handle->wbuf_max(5*1024*1024); # This would be really bad.
@@ -207,6 +217,7 @@ sub new {
sub _err {
my($s, $m) = @_;
+ delete $_[0]{serv}{$_[0]};
$s->{hdl}->destroy;
$s->{err} && $s->{err}->($m);
}
@@ -231,15 +242,87 @@ sub _handshake {
# Reply with our handshake if we hadn't sent it yet.
!$s->{init} && $s->{hdl}->push_write("ver,1.0 ser,json\012");
- # Handshake complete, send out initial register messages
+ # Handshake complete, send out initial register messages and start receiving stuff
$s->{hdl}->push_write('Tanja::Link::JSON' => register => $_+0, $s->{serv}{pat}{$_}[0]) for (keys %{$s->{serv}{pat}});
-
- # TODO: start regular message exchange.
+ $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
});
}
+# Generates the subroutine used for registering patterns with the server.
+sub _tuple {
+ my $s = shift;
+ sub { # tuple, ret, sa
+
+ # Don't route back tuples that we received from this link
+ return if $_[2] && ref($_[2]) eq 'Tanja::Link' && $_[2] == $s;
+
+ # Register return-path, if there is one
+ my $id = $_[1]->null ? 0 : $s->{lastret};
+ if($id) {
+ do { ++$id >= 1<<30 and $id = 1 } while $s->{ret}{$id};
+ $s->{ret}{$id} = $_[1];
+ $s->{lastret} = $id;
+ }
+
+ $s->{hdl}->push_write('Tanja::Link::JSON' => tuple => $id+0, $_[0]);
+ }
+}
+
+
+# Someone registered a pattern with the server
+sub _srv_reg {
+ my($s, $id, $pat, $cb) = @_;
+ $cb != $s->{tup} && $s->{hdl}->push_write('Tanja::Link::JSON' => register => $id+0, $pat);
+}
+
+
+# Someone unregistered a pattern with the server
+sub _srv_unreg {
+ my($s, $id) = @_;
+ # Note that this also sends out unregister messages for patterns that the
+ # connected party just unregistered. This shouldn't be a problem, though.
+ $s->{hdl}->push_write('Tanja::Link::JSON' => unregister => $id+0);
+}
+
+
+sub _recv {
+ my($s, $hdl, $cmd, @arg) = @_;
+
+ # Not very helpful error message...
+ return $s->_err("Invalid message") if !$cmd;
+
+ # register
+ if($cmd eq 'register') {
+ $s->{pat}{$arg[0]} = $s->{serv}->_register($arg[1], $s->{tup});
+
+ # unregister
+ } elsif($cmd eq 'unregister') {
+ $s->{serv}->_unregister($s->{pat}{$arg[0]});
+ delete $s->{pat}{$arg[0]};
+
+ # tuple
+ } elsif($cmd eq 'tuple') {
+ $s->{serv}->send($arg[1], !$arg[0] ? undef : sub {
+ $s->{hdl}->push_write('Tanja::Link::JSON' => $_[0]?'response':'close', $arg[0]+0, $_[0]?$_[0]:());
+ }, $s);
+
+ # response
+ } elsif($cmd eq 'response') {
+ $s->{ret}{$arg[0]}->reply($arg[1]) if $s->{ret}{$arg[0]};
+
+ # close
+ } elsif($cmd eq 'close') {
+ delete $s->{ret}{$arg[0]};
+ }
+
+ # and receive next message
+ $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
+}
+
+
sub close {
+ delete $_[0]{serv}{$_[0]};
$_[0]->{hdl}->push_shutdown;
}
@@ -263,17 +346,20 @@ sub anyevent_read_type {
return $cb->($_[0], undef) || 1 if !$d || ref($d) ne 'ARRAY';
my $num = shift @$d;
return $cb->($_[0], undef) || 1 if !$num || $num !~ /^\d+$/ || $num > @num_to_cmd;
+ # TODO: argument validation
$cb->($_[0], $num_to_cmd[$num], @$d);
1
}
}
+
sub anyevent_write_type {
my(undef, $cmd, @args) = @_;
- encode_json([$cmd_to_num{$cmd}, @args])."\n";
+ encode_json([$cmd_to_num{$cmd}, @args])."\012";
}
1;
# vim:noet:sw=4:ts=4
+