package Tanja;

use strict;
use warnings;

# Just make sure it's loaded. It doesn't export anything.
use AnyEvent ();


# Args: [ pattern ], [ tuple ]
# Returns: a true value on match, a false value otherwise
# A pattern matches when it has no more elements than the tuple and every
# pattern element is string-equal to the tuple element at the same index.
# undef is used as wildcard (in either the tuple or the pattern).
# TODO: this match() implementation is for experimental purposes only, it
# should actually follow some established semantics to interoperate properly.
sub match {
	my($p, $t) = @_;
	# A pattern that is longer than the tuple can never match.
	return 0 if @$p > @$t;
	for my $i (0..$#$p) {
		next if !defined $p->[$i] or !defined $t->[$i];
		return 0 if $p->[$i] ne $t->[$i];
	}
	return 1;
}


package Tanja::Server;

use strict;
use warnings;


# Create a new server (representing a "network")
sub new {
	return bless({
		lastid => 1,  # Most recently assigned registration ID
		pat => {},    # ID => [ pattern, callback ]. Note: Also used directly by Tanja::Link
		lnk => {},    # Active links. Same note.
	}, shift);
}


# Create a new session
sub session {
	return Tanja::Session->new(shift);
}


# Link with another server via an AnyEvent::Handle stream
# Arguments are passed on to Tanja::Link->new(), with the server as first argument.
sub link {
	return Tanja::Link->new(@_);
}


# Send a tuple to the network.
# Arguments: [ tuple ], $return_cb->([tuple] or undef, $special_arg), $special_arg
# $special_arg is local only within the server, it will not be routed.
# The callback of every matching registration is scheduled with
# AnyEvent::postpone and receives ([ tuple ], $return_path, $special_arg).
# TODO: Some way to indicate that the session isn't interested in replies anymore?
sub send {
	my($s, $t, $cb, $sa) = @_;
	# A single return path is shared by all receivers of this tuple.
	my $ret = Tanja::ReturnPath->_new($cb);
	for my $reg (grep Tanja::match($_->[0], $t), values %{$s->{pat}}) {
		AnyEvent::postpone { $reg->[1]->($t, $ret, $sa); };
	}
}


# Register a pattern and its callback with the server, and tell all links about it.
# Args: [ pattern ], callback
# Returns: the ID of the new registration
sub _register {
	my($s, $pat, $cb) = @_;
	my $id = $s->{lastid};
	# Explicitly wrap around long before 2^31, to avoid having Perl turn the ID into a float.
	# IDs that are still in use are skipped.
	do { ++$id >= 1<<30 and $id = 1 } while $s->{pat}{$id};
	$s->{pat}{$id} = [ $pat, $cb ];
	$s->{lastid} = $id;
	$_->_srv_reg($id, $pat, $cb) for(values %{$s->{lnk}});
	return $id;
}


# Remove a registration. Links are only notified if the ID was actually registered.
sub _unregister {
	my($s, $id) = @_;
	if(delete $s->{pat}{$id}) {
		$_->_srv_unreg($id) for(values %{$s->{lnk}});
	}
}


package Tanja::Session;

use strict;
use warnings;


# Create a new session (usually called by $serv->session)
sub new {
	my($own, $serv) = @_;
	return bless({
		server => $serv,
		pat => {},  # IDs of the patterns registered through this session
	}, $own);
}


# Register for a pattern
# Args: [ pattern ], callback
# Returns: $id, for use with unreg()
# Callback:
#   Args: [ tuple ], $return_path
sub reg {
	my($s, $pat, $cb) = @_;
	my $id;
	# Callbacks are called asynchronously, so one may still be queued after the
	# pattern has been unregistered. The wrapper drops such late calls.
	$id = $s->{server}->_register($pat, sub { $s->{pat}{$id} && $cb->(@_) });
	$s->{pat}{$id} = 1;
	return $id;
}


# Unregister a single pattern
# Only IDs that were returned by this session's reg() / reg_once() are
# accepted. Anything else (including undef) is ignored, so that a session can
# never remove a registration that belongs to another session or to a link.
sub unreg {
	my($s, $id) = @_;
	return if !defined $id || !exists $s->{pat}{$id};
	$s->{server}->_unregister($id);
	return delete $s->{pat}{$id};
}


# For convenience. Same as ->reg(), but unregisters automatically after one call.
sub reg_once {
	my($s, $pat, $cb) = @_;
	my $id;
	$id = $s->reg($pat, sub { $s->unreg($id); $cb->(@_) });
	return $id;
}


# Send a tuple. Arguments are the same as for Tanja::Server::send().
sub send {
	my $s = shift;
	$s->{server}->send(@_);
}


# "Close" the session (simply unregisters all its patterns)
sub close {
	my $s = shift;
	$s->{server}->_unregister($_) for (keys %{$s->{pat}});
	$s->{pat} = {};
}


# A return path wraps the reply callback that was given to send(). The
# callback is called with a tuple for each reply(), and once more without
# arguments when the return path object is destroyed, to signal that no more
# replies will follow.
package Tanja::ReturnPath;

use strict;
use warnings;


# Args: $callback or undef. The object is a blessed reference to the callback.
sub _new {
	my($own, $cb) = @_;
	return bless \$cb, $own;
}


# Returns true if the sender of the tuple did not provide a reply callback.
sub null {
	my $cb = shift;
	return !$$cb;
}


# Send a reply tuple back to the sender (does nothing without a reply callback).
sub reply {
	my($cb, $t) = @_;
	# The closure keeps the object itself alive until the reply has been
	# delivered, so the notification from DESTROY is always queued after it.
	$$cb && AnyEvent::postpone { $$cb->($t); };
}


# Tell the sender that no more replies will follow.
sub DESTROY {
	my $cb = shift;
	my $c = $$cb;
	$c && AnyEvent::postpone { $c->() };
	undef $$cb;
}


package Tanja::Link;

use strict;
use warnings;
use Errno 'EPIPE';
use Carp 'croak';


# Args: Tanja::Server, $server||!$client, AnyEvent::Handle, $error->($message)
# Automatically initiates handshake and stuff. Error callback will also be
# called when the remote side simply disconnected, $message is in that case
# undef.
# Croaks if no JSON module can be loaded.
sub new {
	my($own, $serv, $init, $handle, $err) = @_;
	require AnyEvent::Handle;
	croak('No JSON module available.') if !Tanja::Link::JSON->init;

	my $s = bless {
		serv => $serv,
		hdl  => $handle,
		init => $init,   # true if we have to send our handshake first
		err  => $err,
		pat  => {},      # remote pattern ID => ID of our registration with the server
		ret  => {},      # return-path ID => Tanja::ReturnPath waiting for remote replies
		lastret => 1,
	}, $own;
	$s->{tup} = $s->_tuple;
	$serv->{lnk}{$s} = $s;

	$handle->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
	# TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either)
	$handle->wbuf_max(5*1024*1024); # This would be really bad.
	$s->_handshake;
	return $s;
}


# Detach the link from the server: remove it from the server's link table and
# unregister every pattern that was registered on behalf of the remote side.
# Safe to call more than once.
sub _cleanup {
	my $s = shift;
	delete $s->{serv}{lnk}{$s};
	my @ids = values %{$s->{pat} || {}};
	$s->{pat} = {};
	$s->{serv}->_unregister($_) for @ids;
}


# Tear down the link after an error or disconnect and call the error callback.
# Args: $message, or undef if the remote side simply disconnected
sub _err {
	my($s, $m) = @_;
	$s->_cleanup;
	# No reply can arrive anymore. Dropping our references to the return paths
	# allows them to be destroyed, which notifies the senders.
	$s->{ret} = {};
	$s->{hdl}->destroy;
	$s->{err} && $s->{err}->($m);
}


# Exchange handshake lines with the other party and, on success, send out the
# initial register messages and start receiving messages.
sub _handshake {
	my $s = shift;

	# If we should initiate the handshake, do so
	$s->{init} && $s->{hdl}->push_write("ver,1.0 ser,json\012");

	# Receive handshake from other party
	$s->{hdl}->push_read(line => "\012" => sub {
		my $ver = ($_[1] =~ /\bver,([^ ]+)\b/) && $1;
		my $ser = ($_[1] =~ /\bser,([^ ]+)\b/) && $1;

		# Validate. Stop right here on failure: _err() destroys the handle, so
		# nothing may be written to or read from it afterwards.
		return $s->_err("Invalid handshake: $_[1]") if !$ver || !$ser
			|| ($s->{init} ? $ver ne '1.0' : !grep /^1\./, split /,/, $ver)
			|| ($s->{init} ? $ser ne 'json' : !grep $_ eq 'json', split /,/, $ser);

		# Reply with our handshake if we hadn't sent it yet.
		!$s->{init} && $s->{hdl}->push_write("ver,1.0 ser,json\012");

		# Handshake complete, send out initial register messages and start receiving stuff
		$s->{hdl}->push_write('Tanja::Link::JSON' => register => $_+0, $s->{serv}{pat}{$_}[0])
			for (keys %{$s->{serv}{pat}});
		$s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
	});
}


# Generates the subroutine used for registering patterns with the server.
# The same subroutine is used for all patterns of this link, which allows
# _srv_reg() to recognize registrations made by the link itself.
sub _tuple {
	my $s = shift;
	sub { # tuple, ret, sa
		# Don't route back tuples that we received from this link
		return if $_[2] && ref($_[2]) eq 'Tanja::Link' && $_[2] == $s;
		# Register return-path, if there is one. ID 0 means "no replies wanted".
		my $id = $_[1]->null ? 0 : $s->{lastret};
		if($id) {
			# Same wrap-around scheme as used for the registration IDs of the server
			do { ++$id >= 1<<30 and $id = 1 } while $s->{ret}{$id};
			$s->{ret}{$id} = $_[1];
			$s->{lastret} = $id;
		}
		$s->{hdl}->push_write('Tanja::Link::JSON' => tuple => $id+0, $_[0]);
	}
}


# Someone registered a pattern with the server
# Registrations made by this link itself are not sent back to the remote side.
sub _srv_reg {
	my($s, $id, $pat, $cb) = @_;
	$cb != $s->{tup} && $s->{hdl}->push_write('Tanja::Link::JSON' => register => $id+0, $pat);
}


# Someone unregistered a pattern with the server
sub _srv_unreg {
	my($s, $id) = @_;
	# Note that this also sends out unregister messages for patterns that the
	# connected party just unregistered. This shouldn't be a problem, though.
	$s->{hdl}->push_write('Tanja::Link::JSON' => unregister => $id+0);
}


# Handle a message received from the remote side.
# Args: AnyEvent::Handle, $command (undef for an invalid message), @arguments
sub _recv {
	my($s, $hdl, $cmd, @arg) = @_;
	my($id, $data) = @arg;

	# Not very helpful error message...
	return $s->_err("Invalid message") if !$cmd;

	# The messages come from the network, so do some basic validation. The ID
	# has to be a plain scalar; only the tuple message may leave it out (no
	# return path). Patterns and tuples have to be arrays, Tanja::match()
	# would die on anything else.
	return $s->_err("Invalid message") if ref($id) || (!defined $id && $cmd ne 'tuple');
	return $s->_err("Invalid message")
		if ($cmd eq 'register' || $cmd eq 'tuple') && ref($data) ne 'ARRAY';

	# register
	if($cmd eq 'register') {
		# Replace an earlier registration with the same ID instead of leaking it
		$s->{serv}->_unregister(delete $s->{pat}{$id}) if exists $s->{pat}{$id};
		$s->{pat}{$id} = $s->{serv}->_register($data, $s->{tup});

	# unregister (IDs that were never registered are ignored)
	} elsif($cmd eq 'unregister') {
		$s->{serv}->_unregister(delete $s->{pat}{$id}) if exists $s->{pat}{$id};

	# tuple
	} elsif($cmd eq 'tuple') {
		# The return callback forwards replies, and the final "no more
		# replies" notification, to the remote side.
		$s->{serv}->send($data, !$id ? undef : sub {
			$s->{hdl}->push_write('Tanja::Link::JSON' => $_[0]?'reply':'close', $id+0, $_[0]?$_[0]:());
		}, $s);

	# reply
	} elsif($cmd eq 'reply') {
		$s->{ret}{$id}->reply($data) if $s->{ret}{$id};

	# close
	} elsif($cmd eq 'close') {
		delete $s->{ret}{$id};
	}

	# and receive next message
	$s->{hdl}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
}


# Detach the link from the server and shut down the stream after any pending
# data has been written.
sub close {
	my $s = shift;
	$s->_cleanup;
	$s->{hdl}->push_shutdown;
}


# JSON serialization format.
# Requires JSON::XS or JSON; falls back to the core JSON::PP module.
package Tanja::Link::JSON;

use strict;
use warnings;

# Each message is a JSON array on a line of its own: [ command number, arguments.. ].
# Number 0 is not a valid command.
my @num_to_cmd = ('', qw|register unregister tuple reply close|);
my %cmd_to_num = map +($num_to_cmd[$_], $_), 0..$#num_to_cmd;

my $json;


# Load a JSON module and create the (de)serializer object.
# Returns: true if a JSON module is available, false otherwise
sub init {
	$json ||= eval { require JSON::XS; JSON::XS->new->utf8 }
		|| eval { require JSON; JSON->new->utf8 }
		|| eval { require JSON::PP; JSON::PP->new->utf8 };
	!!$json;
}


# Read type for AnyEvent::Handle->push_read().
# The callback receives ($handle, $command, @arguments), or ($handle, undef)
# when a line could not be decoded into a valid message.
sub anyevent_read_type {
	my $cb = $_[1];
	sub {
		# Wait until a complete line is available
		$_[0]{rbuf} =~ s/^([^\012]*)\012// or return;
		my $d = eval { $json->decode($1); };
		return $cb->($_[0], undef) || 1 if !$d || ref($d) ne 'ARRAY';
		my $num = shift @$d;
		# The command has to be a plain number in the range 1..$#num_to_cmd.
		return $cb->($_[0], undef) || 1
			if !$num || ref($num) || $num !~ /^\d+$/ || $num > $#num_to_cmd;
		# TODO: argument validation
		$cb->($_[0], $num_to_cmd[$num], @$d);
		1
	}
}


# Write type for AnyEvent::Handle->push_write().
# Args: $handle, $command, @arguments
# Returns: the encoded message, including the terminating newline
sub anyevent_write_type {
	my(undef, $cmd, @args) = @_;
	$json->encode([$cmd_to_num{$cmd}, @args])."\012";
}


1;

# vim:noet:sw=4:ts=4