package Tanja; use strict; use warnings; # Just make sure it's loaded. It doesn't export anything. use AnyEvent (); # Args: [ pattern ], [ tuple ] # Returns: a true value on match, a false value otherwise # TODO: this match() implementation is for experimental purposes only, it # should actually follow some established semantics to interoperate properly. # undef is used as wildcard (in either the tuple or the pattern). sub match { my($p, $t) = @_; return 0 if @$p > @$t; for my $i (keys @$p) { next if !defined $p->[$i] or !defined $t->[$i]; return 0 if $p->[$i] ne $t->[$i]; } return 1; } package Tanja::Node; use strict; use warnings; # Create a new Node sub new { return bless({ lastid => 1, pat => {}, # Note: Also used directly by Tanja::Link lnk => {}, # Same note. }, shift); } # Create a new session sub session { return Tanja::Session->new(shift); } # Link with a remote node via an AnyEvent::Handle stream sub link { return Tanja::Link->new(@_); } # Send a tuple to the network. # Arguments: [ tuple ], $return_cb->([tuple] or undef, $special_arg), $special_arg # $special_arg is local only within the node, it will not be routed. # TODO: Some way to indicate that the session isn't interested in replies anymore? sub send { my($s, $t, $cb, $sa) = @_; my $ret = Tanja::ReturnPath->_new($cb); for my $reg (grep Tanja::match($_->[0], $t), values %{$s->{pat}}) { AnyEvent::postpone { $reg->[1]->($t, $ret, $sa, \$reg->[2]); } } } sub _register { my($s, $pat, $cb) = @_; my $id = $s->{lastid}; # Explicitely wrap around long before 2^31, to avoid having Perl turn the ID into a float. do { ++$id >= 1<<30 and $id = 1 } while $s->{pat}{$id}; $s->{pat}{$id} = [ $pat, $cb, 1 ]; $s->{lastid} = $id; $_->_srv_reg($id, $pat, $cb) for(values %{$s->{lnk}}); return $id; } sub _unregister { my($s, $id) = @_; if($s->{pat}{$id}) { $s->{pat}{$id}[2] = 0; delete $s->{pat}{$id}; $_->_srv_unreg($id) for(values %{$s->{lnk}}); } } package Tanja::Session; use strict; use warnings; # Create a new session (usually called by $node->session) sub new { my($own, $node) = @_; return bless({ node => $node, pat => {}, }, $own); } # Register for a pattern # Args: [ pattern ], callback # Returns: $id, for use with unreg() # Callback: # Args: [ tuple ], $return_path sub reg { my($s, $pat, $cb) = @_; my $id; $id = $s->{node}->_register($pat, sub { ${$_[3]} && $cb->($_[0], $_[1]) }); $s->{pat}{$id} = 1; return $id; } # Unregister a single pattern sub unreg { my($s, $id) = @_; $s->{node}->_unregister($id); delete $s->{pat}{$id}; } # For convenience. Same as ->reg(), but unregisters automatically after one call. sub reg_once { my($s, $pat, $cb) = @_; my $id; $id = $s->reg($pat, sub { $s->unreg($id); $cb->(@_) }); return $id; } sub send { my $s = shift; $s->{node}->send(@_); } # "Close" the session (simply unregisters all its patterns) sub close { my $s = shift; $s->{node}->_unregister($_) for (keys %{$s->{pat}}); $s->{pat} = {}; } package Tanja::ReturnPath; use strict; use warnings; sub _new { my($own, $cb) = @_; return bless \$cb, $own; } sub null { my $cb = shift; return !$$cb; } sub reply { my($cb, $t) = @_; $$cb && AnyEvent::postpone { $$cb->($t); }; } sub DESTROY { my $cb = shift; my $c = $$cb; $c && AnyEvent::postpone { $c->() }; undef $$cb; } package Tanja::Link; use strict; use warnings; use Errno 'EPIPE'; use Carp 'croak'; # Args: Tanja::Node, %options # Options: # sync => bool. True to fetch the pattern list of the remote node. (Default true) # handle => AnyEvent::Handle. Equivalent to setting write_handle and read_handle to the same thing. # write_handle => AnyEvent::Handle. Used for writing. # read_handle => AnyEvent::Handle. Used for reading. # on_error => $cb->($message). Called when an error occurs, $message = undef for disconnect. # on_ready => $cb->(). Called when the link is "ready". (i.e. if you send tuples they will be forwarded) # on_write => $cb->(message => args). Called when queueing a (non-handshake) message for sending. # on_read => $cb->(message => args). Called when a (non-handshake) message has been received. # formats => [ name => 'module', ... ], sub new { my($own, $node, %o) = @_; require AnyEvent::Handle; my $s = bless { sync => 1, formats => [ json => 'Tanja::Link::JSON', ], %o, node => $node, reg => {}, ret => {}, lastret => 1, lasttup => [], fmti => undef, fmto => undef, }, $own; $s->{write_handle} ||= $s->{handle}; $s->{read_handle} ||= $s->{handle}; croak 'No read or write handle set.' if !$s->{write_handle} || !$s->{read_handle}; # Init all formats and weed out those that can't be used. my %fmts; my @fmts; for (grep $_%2==0, keys @{$s->{formats}}) { my($n, $m) = ($s->{formats}[$_], $s->{formats}[$_+1]); next if $fmts{$n}++; next if !eval "$m->init()"; push @fmts, $n, $m; } $s->{formats} = \@fmts; croak 'Can\'t communicate without serialization formats.' if !@{$s->{formats}}; $s->{tup} = $s->_tuple; $s->{write_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) }); $s->{read_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) }); # TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either) $s->{write_handle}->wbuf_max(5*1024*1024); # This would be really bad. $s->{read_handle}->rbuf_max(5*1024*1024); $s->_handshake; return $s; } sub _cleanup { my $s = shift; $s->{node}->_unregister($_) for (values %{$s->{pat}}); $s->{pat} = {}; delete $s->{node}{lnk}{$s}; } sub _err { my($s, $m) = @_; $s->_cleanup; $s->{read_handle}->destroy; $s->{write_handle}->destroy; $s->{on_error} && $s->{on_error}->($m); } sub close { $_[0]->_cleanup; $_[0]->{write_handle}->push_shutdown; $_[0]->{read_handle}->push_shutdown; } sub _write { my $s = shift; $s->{write_handle}->push_write($s->{fmto} => @_); $s->{on_write} && $s->{on_write}->(@_); } sub _handshake { my $s = shift; # Send our handshake message my @own = map $_%2 == 0 ? $s->{formats}[$_] : (), keys @{$s->{formats}}; my %own = map $_%2 == 0 ? ($s->{formats}[$_], $s->{formats}[$_+1]) : (), keys @{$s->{formats}}; my $own = join ',', @own; $s->{write_handle}->push_write("ver,1.0 seri,$own sero,$own\012"); # Receive handshake from other party $s->{read_handle}->push_read(line => "\012" => sub { my $ver = ($_[1] =~ /\bver,([^ ]+)\b/) && $1; my $seri = ($_[1] =~ /\bseri,([^ ]+)\b/) && $1; my $sero = ($_[1] =~ /\bsero,([^ ]+)\b/) && $1; # Validate return $s->_err("Invalid handshake: $_[1]") if !$ver or !$seri or !$sero or !grep /^1\./, split /,/, $ver or !grep $own{$_}, split /,/, $seri or !grep $own{$_}, split /,/, $sero; # Figure out fmti and fmto $s->{fmto} = (map $own{$_}, split /,/, $seri)[0]; my @sero = split /,/, $sero; for my $i (@own) { if(grep $_ eq $i, @sero) { $s->{fmti} = $own{$i}; last; } } # Handshake complete, start receiving stuff $s->{read_handle}->push_read($s->{fmti} => sub { $s->_recv(@_) }); # Setup tuple forwarding if($s->{sync}) { $s->_write(patternsync => 1); } else { $s->{pat}{_} = $s->{node}->_register([], $s->{tup}); $s->{on_ready} && $s->{on_ready}->(); delete $s->{on_ready}; } }); } # Generates the subroutine used for registering patterns with the local node. sub _tuple { my $s = shift; sub { # tuple, ret, sa # Don't route back tuples that we received from this link return if $_[2] && ref($_[2]) eq 'Tanja::Link' && $_[2] == $s; # Only send a tuple once even if there have been multiple # registrations. (This relies on the property that the Node does not # make a copy of the tuple when dispatching the callbacks.) return if $_[0] == $s->{lasttup}; $s->{lasttup} = $_[0]; # Register return-path, if there is one my $id = $_[1]->null ? 0 : $s->{lastret}; if($id) { do { ++$id >= 1<<30 and $id = 1 } while $s->{ret}{$id}; $s->{ret}{$id} = $_[1]; $s->{lastret} = $id; } $s->_write(tuple => $id+0, $_[0]); } } # Someone registered a pattern with the local node sub _srv_reg { my($s, $id, $pat, $cb) = @_; $cb != $s->{tup} && $s->_write(register => $id+0, $pat); } # Someone unregistered a pattern with the local node sub _srv_unreg { my($s, $id) = @_; # Note that this also sends out unregister messages for patterns that the # connected party just unregistered. This shouldn't be a problem, though. $s->_write(unregister => $id+0); } sub _recv { my($s, $hdl, $cmd, @arg) = @_; # Not very helpful error message... return $s->_err("Invalid message") if !$cmd; $s->{on_read} && $s->{on_read}->($cmd, @arg); # patternsync if($cmd eq 'patternsync') { if($arg[0]) { $s->{node}{lnk}{$s} = $s; $s->{node}{pat}{$_}[1] != $s->{tup} && $s->_write(register => $_+0, $s->{node}{pat}{$_}[0]) for (keys %{$s->{node}{pat}}); $s->_write('regdone'); } else { delete $s->{node}{lnk}{$s}; } # register } elsif($cmd eq 'register') { $s->{pat}{$arg[0]} = $s->{node}->_register($arg[1], $s->{tup}) if $s->{sync}; # regdone } elsif($cmd eq 'regdone') { $s->{on_ready} && $s->{on_ready}->(); delete $s->{on_ready}; # unregister } elsif($cmd eq 'unregister') { if($s->{sync}) { $s->{node}->_unregister($s->{pat}{$arg[0]}); delete $s->{pat}{$arg[0]}; } # tuple } elsif($cmd eq 'tuple') { $s->{node}->send($arg[1], !$arg[0] ? undef : sub { $s->_write($_[0]?'reply':'close', $arg[0]+0, $_[0]?$_[0]:()); }, $s); # reply } elsif($cmd eq 'reply') { $s->{ret}{$arg[0]}->reply($arg[1]) if $s->{ret}{$arg[0]}; # close } elsif($cmd eq 'close') { delete $s->{ret}{$arg[0]}; } # and receive next message $s->{read_handle}->push_read($s->{fmti} => sub { $s->_recv(@_) }); } # JSON serialization format. Requires either JSON or JSON::XS package Tanja::Link::JSON; use strict; use warnings; my @num_to_cmd = ('', qw|patternsync register regdone unregister tuple reply close|); my %cmd_to_num = map +($num_to_cmd[$_], $_), keys @num_to_cmd; my($json, $true, $false); sub init { $json = eval { require JSON::XS; $true = JSON::XS::true(); $false = JSON::XS::false(); JSON::XS->new->utf8 } || eval { require JSON; $true = JSON::true(); $false = JSON::false(); JSON->new->utf8 }; !!$json; } sub anyevent_read_type { my $cb = $_[1]; sub { $_[0]{rbuf} =~ s/^([^\012]*)\012// or return; my $d = eval { $json->decode($1); }; return $cb->($_[0], undef) || 1 if !$d || ref($d) ne 'ARRAY'; my $num = shift @$d; return $cb->($_[0], undef) || 1 if !$num || $num !~ /^\d+$/ || $num > @num_to_cmd; # TODO: argument validation $cb->($_[0], $num_to_cmd[$num], @$d); 1 } } sub anyevent_write_type { my(undef, $cmd, @args) = @_; $json->encode([$cmd_to_num{$cmd}, $cmd eq 'patternsync' ? ($args[0]?$true:$false) : @args])."\012"; } 1; # vim:noet:sw=4:ts=4