package Tanja;

use strict;
use warnings;

# Just make sure it's loaded. It doesn't export anything.
use AnyEvent ();


# Args: [ pattern ], [ tuple ]
# Returns: 1 on match, 0 otherwise
# TODO: this match() implementation is for experimental purposes only, it
# should actually follow some established semantics to interoperate properly.
# undef acts as a wildcard, both in the pattern and in the tuple.
sub match {
	my($pattern, $tuple) = @_;

	# A pattern with more elements than the tuple never matches; a shorter
	# pattern only constrains the leading elements of the tuple.
	return 0 if @$pattern > @$tuple;

	for my $idx (0 .. $#$pattern) {
		my($want, $got) = ($pattern->[$idx], $tuple->[$idx]);
		next unless defined $want and defined $got;
		return 0 if $want ne $got;
	}
	return 1;
}


package Tanja::Server;

use strict;
use warnings;


# Create a new server (representing a "network").
# Fields:
#   lastid => most recently handed out registration ID
#   pat    => { id => [ pattern, callback ] } (also used directly by Tanja::Link)
#   lnk    => links interested in (un)registrations (also used directly by Tanja::Link)
sub new {
	my $class = shift;
	my %state = (
		lastid => 1,
		pat    => {},
		lnk    => {},
	);
	return bless \%state, $class;
}


# Create a new session on this server
sub session {
	my($s) = @_;
	return Tanja::Session->new($s);
}


# Link with another server via an AnyEvent::Handle stream.
# All arguments (including the server itself) are handed to Tanja::Link->new.
sub link {
	my $s = shift;
	return Tanja::Link->new($s, @_);
}


# Send a tuple to the network.
# Arguments: [ tuple ], $return_cb, $special_arg
#   $return_cb is wrapped in a Tanja::ReturnPath; it is called with a [ tuple ]
#   for every reply and without arguments once the return path is released.
#   $special_arg is handed to the callbacks of matching registrations on this
#   server only, it will not be routed.
# Every matching registration is called as $cb->([ tuple ], $return_path,
# $special_arg) from an AnyEvent::postpone block, i.e. not before send() returns.
# TODO: Some way to indicate that the session isn't interested in replies anymore?
sub send {
	my($s, $tuple, $return_cb, $special) = @_;
	my $path = Tanja::ReturnPath->_new($return_cb);
	for my $entry (grep { Tanja::match($_->[0], $tuple) } values %{$s->{pat}}) {
		AnyEvent::postpone { $entry->[1]->($tuple, $path, $special) };
	}
}


# Store a [ pattern, callback ] pair under a fresh ID, announce it to every
# link in {lnk} and return the ID.
sub _register {
	my($s, $pat, $cb) = @_;

	# Explicitly wrap around long before 2^31, to avoid having Perl turn the
	# ID into a float. IDs that are still in use are skipped.
	my $id = $s->{lastid};
	do {
		$id++;
		$id = 1 if $id >= 1<<30;
	} while($s->{pat}{$id});

	$s->{pat}{$id} = [ $pat, $cb ];
	$s->{lastid} = $id;

	for my $link (values %{$s->{lnk}}) {
		$link->_srv_reg($id, $pat, $cb);
	}
	return $id;
}


# Remove a registration. Links are only notified when the ID actually existed.
sub _unregister {
	my($s, $id) = @_;
	if(delete $s->{pat}{$id}) {
		for my $link (values %{$s->{lnk}}) {
			$link->_srv_unreg($id);
		}
	}
}


package Tanja::Session;

use strict;
use warnings;


# Create a new session (usually called by $serv->session)
sub new {
	my($class, $server) = @_;
	my %self = (
		server => $server,
		pat    => {},
	);
	return bless \%self, $class;
}


# Register for a pattern
# Args: [ pattern ], callback
# Returns: $id, for use with unreg()
# Callback:
#   Args: whatever the server passes to registrations, see Tanja::Server::send
# The callback is wrapped so that it is not invoked anymore once the ID is
# gone from this session's {pat}.
sub reg {
	my($s, $pat, $cb) = @_;
	my $id;
	my $guarded = sub { $s->{pat}{$id} && $cb->(@_) };
	$id = $s->{server}->_register($pat, $guarded);
	$s->{pat}{$id} = 1;
	return $id;
}


# Unregister a single pattern
sub unreg {
	my($s, $id) = @_;
	$s->{server}->_unregister($id);
	return delete $s->{pat}{$id};
}


# For convenience. Same as ->reg(), but unregisters automatically after one call.
sub reg_once {
	my($s, $pat, $cb) = @_;
	my $id;
	my $once = sub {
		$s->unreg($id);
		$cb->(@_)
	};
	$id = $s->reg($pat, $once);
	return $id;
}


# Send a tuple through the server this session belongs to.
# Takes the same arguments as Tanja::Server::send.
sub send {
	my($s, @args) = @_;
	return $s->{server}->send(@args);
}


# "Close" the session (simply unregisters all its patterns)
sub close {
	my $s = shift;
	for my $id (keys %{$s->{pat}}) {
		$s->{server}->_unregister($id);
	}
	$s->{pat} = {};
}


package Tanja::ReturnPath;

use strict;
use warnings;

# A return path is a blessed reference to the (possibly undef) reply callback.

sub _new {
	my($class, $cb) = @_;
	return bless \$cb, $class;
}


# True when there is no callback, i.e. nobody is listening for replies.
sub null {
	my $self = shift;
	return !$$self;
}


# Deliver a reply tuple to the callback (if any) from a postponed block.
sub reply {
	my($self, $tuple) = @_;
	$$self and AnyEvent::postpone { $$self->($tuple); };
}


# When the last reference to the path goes away, the callback is called one
# final time without arguments.
sub DESTROY {
	my $self = shift;
	my $final = $$self;
	$final and AnyEvent::postpone { $final->() };
	undef $$self;
}


package Tanja::Link;

use strict;
use warnings;
use Errno 'EPIPE';
use Carp 'croak';


# Args: Tanja::Server, AnyEvent::Handle, %options
# Options:
#   sync      => bool. True to fetch the other servers' patterns. (Default true)
#   on_error  => $cb->($message). Called when an error occurs, $message = undef for disconnect.
#   on_ready  => $cb->(). Called when the link is "ready". (i.e.
#   if you send tuples they will be forwarded)
#   on_write  => $cb->(message => args). Called when queueing a (non-handshake) message for sending.
#   on_read   => $cb->(message => args). Called when a (non-handshake) message has been received.
# Returns: the link object. Croaks when no JSON module could be loaded.
sub new {
	my($own, $serv, $handle, %o) = @_;

	require AnyEvent::Handle;
	croak 'No JSON module available.' if !Tanja::Link::JSON->init;

	my $s = bless {
		sync => 1,
		%o,
		serv => $serv,
		hdl => $handle,
		reg => {},
		# remote pattern ID => ID of our registration with the local server.
		# Initialised here so that _cleanup() and _recv() never have to
		# dereference an undefined value.
		pat => {},
		# return-path ID => Tanja::ReturnPath waiting for replies from the peer
		ret => {},
		lastret => 1,
		lasttup => [],
	}, $own;
	$s->{tup} = $s->_tuple;

	# EPIPE is reported to on_error as a plain disconnect (undef message).
	$handle->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
	# TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either)
	$handle->wbuf_max(5*1024*1024); # This would be really bad.
	$handle->rbuf_max(5*1024*1024);
	$s->_handshake;
	return $s;
}


# Drop everything this link registered with the local server and stop
# receiving (un)registration notifications from it.
sub _cleanup {
	my $s = shift;
	$s->{serv}->_unregister($_) for (values %{$s->{pat}});
	$s->{pat} = {};
	delete $s->{serv}{lnk}{$s};
}


# Fatal error: clean up, destroy the handle and tell on_error about it.
sub _err {
	my($s, $m) = @_;
	$s->_cleanup;
	$s->{hdl}->destroy;
	$s->{on_error} && $s->{on_error}->($m);
}


# Close the link: clean up and shut down the write side of the handle.
sub close {
	$_[0]->_cleanup;
	$_[0]->{hdl}->push_shutdown;
}


# Queue a message (command name followed by its arguments) for sending and
# report it to on_write.
sub _write {
	my $s = shift;
	$s->{hdl}->push_write('Tanja::Link::JSON' => @_);
	$s->{on_write} && $s->{on_write}->(@_);
}


# Exchange the one-line handshake and, when the peer's line is acceptable,
# start reading messages and set up tuple forwarding.
sub _handshake {
	my $s = shift;

	# Send our handshake message
	$s->{hdl}->push_write("ver,1.0 seri,json sero,json\012");

	# Receive handshake from other party
	$s->{hdl}->push_read(line => "\012" => sub {
		my $ver  = ($_[1] =~ /\bver,([^ ]+)\b/) && $1;
		my $seri = ($_[1] =~ /\bseri,([^ ]+)\b/) && $1;
		my $sero = ($_[1] =~ /\bsero,([^ ]+)\b/) && $1;

		# Validate. It is important to stop here on failure: _err() has
		# already cleaned up and destroyed the handle, so carrying on would
		# register a pattern for a dead link and report it as "ready".
		return $s->_err("Invalid handshake: $_[1]")
			if !$ver or !$seri or !$sero
			or !grep /^1\./, split /,/, $ver
			or !grep $_ eq 'json', split /,/, $seri
			or !grep $_ eq 'json', split /,/, $sero;

		# Handshake complete, start receiving stuff
		$s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });

		# Setup tuple forwarding
		if($s->{sync}) {
			$s->_write(patternsync => 1);
		} else {
			# Without pattern sync we simply forward everything.
			$s->{pat}{_} = $s->{serv}->_register([], $s->{tup});
			$s->{on_ready} && $s->{on_ready}->();
			delete $s->{on_ready};
		}
	});
}


# Generates the subroutine used for registering patterns with the server.
# The returned sub is called as $cb->([ tuple ], $return_path, $special_arg)
# and forwards the tuple to the peer.
sub _tuple {
	my $s = shift;
	sub { # tuple, ret, sa
		# Don't route back tuples that we received from this link
		return if $_[2] && ref($_[2]) eq 'Tanja::Link' && $_[2] == $s;

		# Only send a tuple once even if there have been multiple
		# registrations. (This relies on the property that the Server does not
		# make a copy of the tuple when dispatching the callbacks.)
		return if $_[0] == $s->{lasttup};
		$s->{lasttup} = $_[0];

		# Register return-path, if there is one. ID 0 tells the peer that
		# nobody is interested in replies.
		my $id = $_[1]->null ? 0 : $s->{lastret};
		if($id) {
			do { ++$id >= 1<<30 and $id = 1 } while $s->{ret}{$id};
			$s->{ret}{$id} = $_[1];
			$s->{lastret} = $id;
		}

		$s->_write(tuple => $id+0, $_[0]);
	}
}


# Someone registered a pattern with the server.
# Registrations made by this link itself (recognisable by their callback)
# are not sent back to the peer.
sub _srv_reg {
	my($s, $id, $pat, $cb) = @_;
	$cb != $s->{tup} && $s->_write(register => $id+0, $pat);
}


# Someone unregistered a pattern with the server
sub _srv_unreg {
	my($s, $id) = @_;
	# Note that this also sends out unregister messages for patterns that the
	# connected party just unregistered. This shouldn't be a problem, though.
	$s->_write(unregister => $id+0);
}


# Handle one decoded message from the peer and queue the read for the next one.
# Args: $handle, $command_name (undef if the message could not be decoded), @args
# Anything malformed is a fatal link error.
sub _recv {
	my($s, $hdl, $cmd, @arg) = @_;

	# Not very helpful error message...
	return $s->_err("Invalid message") if !$cmd;

	$s->{on_read} && $s->{on_read}->($cmd, @arg);

	# patternsync
	if($cmd eq 'patternsync') {
		if($arg[0]) {
			# The peer wants our patterns: send the current ones (except
			# those we registered on its behalf) and keep it updated.
			$s->{serv}{lnk}{$s} = $s;
			for my $id (keys %{$s->{serv}{pat}}) {
				my $reg = $s->{serv}{pat}{$id};
				$s->_write(register => $id+0, $reg->[0]) if $reg->[1] != $s->{tup};
			}
			$s->_write('regdone');
		} else {
			delete $s->{serv}{lnk}{$s};
		}

	# register
	} elsif($cmd eq 'register') {
		# The pattern ends up in Tanja::match(), which needs an array.
		return $s->_err("Invalid message") if !defined $arg[0] || ref($arg[1]) ne 'ARRAY';
		$s->{pat}{$arg[0]} = $s->{serv}->_register($arg[1], $s->{tup}) if $s->{sync};

	# regdone
	} elsif($cmd eq 'regdone') {
		$s->{on_ready} && $s->{on_ready}->();
		delete $s->{on_ready};

	# unregister
	} elsif($cmd eq 'unregister') {
		if($s->{sync} && defined $arg[0]) {
			# The peer echoes every unregister, including those for IDs we
			# never got a register for. Silently ignore unknown IDs.
			my $id = delete $s->{pat}{$arg[0]};
			$s->{serv}->_unregister($id) if defined $id;
		}

	# tuple
	} elsif($cmd eq 'tuple') {
		return $s->_err("Invalid message") if ref($arg[1]) ne 'ARRAY';
		# A false return-path ID means the peer does not want replies.
		$s->{serv}->send($arg[1], !$arg[0] ? undef : sub {
			$s->_write($_[0]?'reply':'close', $arg[0]+0, $_[0]?$_[0]:());
		}, $s);

	# reply
	} elsif($cmd eq 'reply') {
		$s->{ret}{$arg[0]}->reply($arg[1]) if defined $arg[0] && $s->{ret}{$arg[0]};

	# close
	} elsif($cmd eq 'close') {
		delete $s->{ret}{$arg[0]} if defined $arg[0];
	}

	# and receive next message
	$s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
}


# JSON serialization format.
# Requires one of JSON::XS, JSON or JSON::PP (tried in that order).
package Tanja::Link::JSON;

use strict;
use warnings;

# Wire format: one JSON array per line, [ command number, args... ].
# The command number is the index into @num_to_cmd; 0 is not a valid command.
my @num_to_cmd = ('', qw|patternsync register regdone unregister tuple reply close|);
my %cmd_to_num = map +($num_to_cmd[$_], $_), keys @num_to_cmd;
my($json, $true, $false);


# Load a JSON implementation and set up the encoder/decoder.
# Returns: true if a JSON module is available, false otherwise.
sub init {
	$json = eval {
		require JSON::XS;
		$true = JSON::XS::true();
		$false = JSON::XS::false();
		JSON::XS->new->utf8
	} || eval {
		require JSON;
		$true = JSON::true();
		$false = JSON::false();
		JSON->new->utf8
	} || eval {
		# Last resort: the pure-Perl implementation that ships with Perl.
		require JSON::PP;
		$true = JSON::PP::true();
		$false = JSON::PP::false();
		JSON::PP->new->utf8
	};
	!!$json;
}


# AnyEvent::Handle read type.
# Args: $handle, $cb
# Returns a parser that consumes one line from $handle->{rbuf} per call and
# calls $cb->($handle, $command_name, @args), or $cb->($handle, undef) when
# the line is not a valid message. The parser returns nothing as long as no
# complete line is available and a true value once a line has been consumed.
sub anyevent_read_type {
	my $cb = $_[1];
	sub {
		$_[0]{rbuf} =~ s/^([^\012]*)\012// or return;
		my $line = $1;
		my $d = eval { $json->decode($line); };
		return $cb->($_[0], undef) || 1 if !$d || ref($d) ne 'ARRAY';

		# The command must be a plain positive integer that indexes an
		# existing entry of @num_to_cmd. References (e.g. JSON booleans) and
		# strings with trailing garbage are rejected as well.
		my $num = shift @$d;
		return $cb->($_[0], undef) || 1
			if !$num || ref($num) || $num !~ /\A\d+\z/ || $num > $#num_to_cmd;

		# TODO: argument validation
		$cb->($_[0], $num_to_cmd[$num], @$d);
		1
	}
}


# AnyEvent::Handle write type.
# Args: $handle (unused), $command_name, @args
# Returns the encoded message, terminated by a newline. The argument of
# 'patternsync' is sent as a JSON boolean.
sub anyevent_write_type {
	my(undef, $cmd, @args) = @_;
	$json->encode([$cmd_to_num{$cmd}, $cmd eq 'patternsync' ? ($args[0]?$true:$false) : @args])."\012";
}


1;

# vim:noet:sw=4:ts=4