diff options
-rw-r--r-- | Tanja.pm | 104 |
1 files changed, 95 insertions, 9 deletions
@@ -37,6 +37,7 @@ sub new { return bless({ lastid => 1, pat => {}, # Note: Also used directly by Tanja::Link + lnk => {}, # Same note. }, shift); } @@ -54,14 +55,15 @@ sub link { # Send a tuple to the network. -# Arguments: [ tuple ], $return_cb->([tuple] or ()) +# Arguments: [ tuple ], $return_cb->([tuple] or undef, $special_arg), $special_arg +# $special_arg is local only within the server, it will not be routed. # TODO: Some way to indicate that the session isn't interested in replies anymore? sub send { - my($s, $t, $cb) = @_; + my($s, $t, $cb, $sa) = @_; my $ret = Tanja::ReturnPath->_new($cb); for my $reg (grep Tanja::match($_->[0], $t), values %{$s->{pat}}) { AnyEvent::postpone { - $reg->[1]->($t, $ret); + $reg->[1]->($t, $ret, $sa); } } } @@ -71,16 +73,19 @@ sub _register { my($s, $pat, $cb) = @_; my $id = $s->{lastid}; # Explicitely wrap around long before 2^31, to avoid having Perl turn the ID into a float. - ++$id >= 1<<30 and $id = 1 while $s->{pat}{$id}; + do { ++$id >= 1<<30 and $id = 1 } while $s->{pat}{$id}; $s->{pat}{$id} = [ $pat, $cb ]; $s->{lastid} = $id; + $_->_srv_reg($id, $pat, $cb) for(values %{$s->{lnk}}); return $id; } sub _unregister { my($s, $id) = @_; - delete $s->{pat}{$id}; + if(delete $s->{pat}{$id}) { + $_->_srv_unreg($id) for(values %{$s->{lnk}}); + } } @@ -196,7 +201,12 @@ sub new { hdl => $handle, init => $init, err => $err, + reg => {}, + ret => {}, + lastret => 1, }, $own; + $s->{tup} = $s->_tuple; + $serv->{lnk}{$s} = $s; $handle->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) }); # TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either) $handle->wbuf_max(5*1024*1024); # This would be really bad. @@ -207,6 +217,7 @@ sub new { sub _err { my($s, $m) = @_; + delete $_[0]{serv}{$_[0]}; $s->{hdl}->destroy; $s->{err} && $s->{err}->($m); } @@ -231,15 +242,87 @@ sub _handshake { # Reply with our handshake if we hadn't sent it yet. !$s->{init} && $s->{hdl}->push_write("ver,1.0 ser,json\012"); - # Handshake complete, send out initial register messages + # Handshake complete, send out initial register messages and start receiving stuff $s->{hdl}->push_write('Tanja::Link::JSON' => register => $_+0, $s->{serv}{pat}{$_}[0]) for (keys %{$s->{serv}{pat}}); - - # TODO: start regular message exchange. + $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) }); }); } +# Generates the subroutine used for registering patterns with the server. +sub _tuple { + my $s = shift; + sub { # tuple, ret, sa + + # Don't route back tuples that we received from this link + return if $_[2] && ref($_[2]) eq 'Tanja::Link' && $_[2] == $s; + + # Register return-path, if there is one + my $id = $_[1]->null ? 0 : $s->{lastret}; + if($id) { + do { ++$id >= 1<<30 and $id = 1 } while $s->{ret}{$id}; + $s->{ret}{$id} = $_[1]; + $s->{lastret} = $id; + } + + $s->{hdl}->push_write('Tanja::Link::JSON' => tuple => $id+0, $_[0]); + } +} + + +# Someone registered a pattern with the server +sub _srv_reg { + my($s, $id, $pat, $cb) = @_; + $cb != $s->{tup} && $s->{hdl}->push_write('Tanja::Link::JSON' => register => $id+0, $pat); +} + + +# Someone unregistered a pattern with the server +sub _srv_unreg { + my($s, $id) = @_; + # Note that this also sends out unregister messages for patterns that the + # connected party just unregistered. This shouldn't be a problem, though. + $s->{hdl}->push_write('Tanja::Link::JSON' => unregister => $id+0); +} + + +sub _recv { + my($s, $hdl, $cmd, @arg) = @_; + + # Not very helpful error message... + return $s->_err("Invalid message") if !$cmd; + + # register + if($cmd eq 'register') { + $s->{pat}{$arg[0]} = $s->{serv}->_register($arg[1], $s->{tup}); + + # unregister + } elsif($cmd eq 'unregister') { + $s->{serv}->_unregister($s->{pat}{$arg[0]}); + delete $s->{pat}{$arg[0]}; + + # tuple + } elsif($cmd eq 'tuple') { + $s->{serv}->send($arg[1], !$arg[0] ? undef : sub { + $s->{hdl}->push_write('Tanja::Link::JSON' => $_[0]?'response':'close', $arg[0]+0, $_[0]?$_[0]:()); + }, $s); + + # response + } elsif($cmd eq 'response') { + $s->{ret}{$arg[0]}->reply($arg[1]) if $s->{ret}{$arg[0]}; + + # close + } elsif($cmd eq 'close') { + delete $s->{ret}{$arg[0]}; + } + + # and receive next message + $s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) }); +} + + sub close { + delete $_[0]{serv}{$_[0]}; $_[0]->{hdl}->push_shutdown; } @@ -263,17 +346,20 @@ sub anyevent_read_type { return $cb->($_[0], undef) || 1 if !$d || ref($d) ne 'ARRAY'; my $num = shift @$d; return $cb->($_[0], undef) || 1 if !$num || $num !~ /^\d+$/ || $num > @num_to_cmd; + # TODO: argument validation $cb->($_[0], $num_to_cmd[$num], @$d); 1 } } + sub anyevent_write_type { my(undef, $cmd, @args) = @_; - encode_json([$cmd_to_num{$cmd}, @args])."\n"; + encode_json([$cmd_to_num{$cmd}, @args])."\012"; } 1; # vim:noet:sw=4:ts=4 + |