From 958580125aa74b825a3198f184719fa28e82b82f Mon Sep 17 00:00:00 2001 From: Yorhel Date: Thu, 22 Mar 2012 19:36:41 +0100 Subject: Moved Perl implementation to a separate repository --- perl/Tanja.pm | 472 ---------------------------------------------------------- perl/test.pl | 141 ------------------ 2 files changed, 613 deletions(-) delete mode 100644 perl/Tanja.pm delete mode 100755 perl/test.pl diff --git a/perl/Tanja.pm b/perl/Tanja.pm deleted file mode 100644 index a0873f8..0000000 --- a/perl/Tanja.pm +++ /dev/null @@ -1,472 +0,0 @@ -package Tanja; - -use strict; -use warnings; - -# Just make sure it's loaded. It doesn't export anything. -use AnyEvent (); - - -# Args: [ pattern ], [ tuple ] -# Returns: a true value on match, a false value otherwise -# TODO: this match() implementation is for experimental purposes only, it -# should actually follow some established semantics to interoperate properly. -# undef is used as wildcard (in either the tuple or the pattern). -sub match { - my($p, $t) = @_; - return 0 if @$p > @$t; - for my $i (keys @$p) { - next if !defined $p->[$i] or !defined $t->[$i]; - return 0 if $p->[$i] ne $t->[$i]; - } - return 1; -} - - -package Tanja::Node; - -use strict; -use warnings; - - -# Create a new Node -sub new { - return bless({ - lastid => 1, - pat => {}, # Note: Also used directly by Tanja::Link - lnk => {}, # Same note. - }, shift); -} - - -# Create a new session -sub session { - return Tanja::Session->new(shift); -} - - -# Link with a remote node via an AnyEvent::Handle stream -sub link { - return Tanja::Link->new(@_); -} - - -# Send a tuple to the network. -# Arguments: [ tuple ], $return_cb->([tuple] or undef, $special_arg), $special_arg -# $special_arg is local only within the node, it will not be routed. -# TODO: Some way to indicate that the session isn't interested in replies anymore? -sub send { - my($s, $t, $cb, $sa) = @_; - my $ret = Tanja::ReturnPath->_new($cb); - for my $reg (grep Tanja::match($_->[0], $t), values %{$s->{pat}}) { - AnyEvent::postpone { - $reg->[1]->($t, $ret, $sa, \$reg->[2]); - } - } -} - - -sub _register { - my($s, $pat, $cb) = @_; - my $id = $s->{lastid}; - # Explicitely wrap around long before 2^31, to avoid having Perl turn the ID into a float. - do { ++$id >= 1<<30 and $id = 1 } while $s->{pat}{$id}; - $s->{pat}{$id} = [ $pat, $cb, 1 ]; - $s->{lastid} = $id; - $_->_srv_reg($id, $pat, $cb) for(values %{$s->{lnk}}); - return $id; -} - - -sub _unregister { - my($s, $id) = @_; - if($s->{pat}{$id}) { - $s->{pat}{$id}[2] = 0; - delete $s->{pat}{$id}; - $_->_srv_unreg($id) for(values %{$s->{lnk}}); - } -} - - -package Tanja::Session; - -use strict; -use warnings; - - -# Create a new session (usually called by $node->session) -sub new { - my($own, $node) = @_; - return bless({ - node => $node, - pat => {}, - }, $own); -} - - -# Register for a pattern -# Args: [ pattern ], callback -# Returns: $id, for use with unreg() -# Callback: -# Args: [ tuple ], $return_path -sub reg { - my($s, $pat, $cb) = @_; - my $id; - $id = $s->{node}->_register($pat, sub { ${$_[3]} && $cb->($_[0], $_[1]) }); - $s->{pat}{$id} = 1; - return $id; -} - - -# Unregister a single pattern -sub unreg { - my($s, $id) = @_; - $s->{node}->_unregister($id); - delete $s->{pat}{$id}; -} - - -# For convenience. Same as ->reg(), but unregisters automatically after one call. -sub reg_once { - my($s, $pat, $cb) = @_; - my $id; - $id = $s->reg($pat, sub { $s->unreg($id); $cb->(@_) }); - return $id; -} - - -sub send { - my $s = shift; - $s->{node}->send(@_); -} - - -# "Close" the session (simply unregisters all its patterns) -sub close { - my $s = shift; - $s->{node}->_unregister($_) for (keys %{$s->{pat}}); - $s->{pat} = {}; -} - - -package Tanja::ReturnPath; - -use strict; -use warnings; - - -sub _new { - my($own, $cb) = @_; - return bless \$cb, $own; -} - - -sub null { - my $cb = shift; - return !$$cb; -} - - -sub reply { - my($cb, $t) = @_; - $$cb && AnyEvent::postpone { $$cb->($t); }; -} - - -sub DESTROY { - my $cb = shift; - my $c = $$cb; - $c && AnyEvent::postpone { $c->() }; - undef $$cb; -} - - -package Tanja::Link; - -use strict; -use warnings; -use Errno 'EPIPE'; -use Carp 'croak'; - - -# Args: Tanja::Node, %options -# Options: -# sync => bool. True to fetch the pattern list of the remote node. (Default true) -# handle => AnyEvent::Handle. Equivalent to setting write_handle and read_handle to the same thing. -# write_handle => AnyEvent::Handle. Used for writing. -# read_handle => AnyEvent::Handle. Used for reading. -# on_error => $cb->($message). Called when an error occurs, $message = undef for disconnect. -# on_ready => $cb->(). Called when the link is "ready". (i.e. if you send tuples they will be forwarded) -# on_write => $cb->(message => args). Called when queueing a (non-handshake) message for sending. -# on_read => $cb->(message => args). Called when a (non-handshake) message has been received. -# formats => [ name => 'module', ... ], -sub new { - my($own, $node, %o) = @_; - require AnyEvent::Handle; - my $s = bless { - sync => 1, - formats => [ - json => 'Tanja::Link::JSON', - ], - %o, - node => $node, - reg => {}, - ret => {}, - lastret => 1, - lasttup => [], - fmti => undef, - fmto => undef, - }, $own; - - $s->{write_handle} ||= $s->{handle}; - $s->{read_handle} ||= $s->{handle}; - croak 'No read or write handle set.' if !$s->{write_handle} || !$s->{read_handle}; - - # Init all formats and weed out those that can't be used. - my %fmts; - my @fmts; - for (grep $_%2==0, keys @{$s->{formats}}) { - my($n, $m) = ($s->{formats}[$_], $s->{formats}[$_+1]); - next if $fmts{$n}++; - next if !eval "$m->init()"; - push @fmts, $n, $m; - } - $s->{formats} = \@fmts; - croak 'Can\'t communicate without serialization formats.' if !@{$s->{formats}}; - - $s->{tup} = $s->_tuple; - $s->{write_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) }); - $s->{read_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) }); - - # TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either) - $s->{write_handle}->wbuf_max(5*1024*1024); # This would be really bad. - $s->{read_handle}->rbuf_max(5*1024*1024); - $s->_handshake; - return $s; -} - - -sub _cleanup { - my $s = shift; - $s->{node}->_unregister($_) for (values %{$s->{pat}}); - $s->{pat} = {}; - delete $s->{node}{lnk}{$s}; -} - -sub _err { - my($s, $m) = @_; - $s->_cleanup; - $s->{read_handle}->destroy; - $s->{write_handle}->destroy; - $s->{on_error} && $s->{on_error}->($m); -} - - -sub close { - $_[0]->_cleanup; - $_[0]->{write_handle}->push_shutdown; - $_[0]->{read_handle}->push_shutdown; -} - - -sub _write { - my $s = shift; - $s->{write_handle}->push_write($s->{fmto} => @_); - $s->{on_write} && $s->{on_write}->(@_); -} - - -sub _handshake { - my $s = shift; - - # Send our handshake message - my @own = map $_%2 == 0 ? $s->{formats}[$_] : (), keys @{$s->{formats}}; - my %own = map $_%2 == 0 ? ($s->{formats}[$_], $s->{formats}[$_+1]) : (), keys @{$s->{formats}}; - my $own = join ',', @own; - $s->{write_handle}->push_write("ver,1.0 seri,$own sero,$own\012"); - - # Receive handshake from other party - $s->{read_handle}->push_read(line => "\012" => sub { - my $ver = ($_[1] =~ /\bver,([^ ]+)\b/) && $1; - my $seri = ($_[1] =~ /\bseri,([^ ]+)\b/) && $1; - my $sero = ($_[1] =~ /\bsero,([^ ]+)\b/) && $1; - - # Validate - return $s->_err("Invalid handshake: $_[1]") if !$ver or !$seri or !$sero - or !grep /^1\./, split /,/, $ver - or !grep $own{$_}, split /,/, $seri - or !grep $own{$_}, split /,/, $sero; - - # Figure out fmti and fmto - $s->{fmto} = (map $own{$_}, split /,/, $seri)[0]; - my @sero = split /,/, $sero; - for my $i (@own) { - if(grep $_ eq $i, @sero) { - $s->{fmti} = $own{$i}; - last; - } - } - - # Handshake complete, start receiving stuff - $s->{read_handle}->push_read($s->{fmti} => sub { $s->_recv(@_) }); - - # Setup tuple forwarding - if($s->{sync}) { - $s->_write(patternsync => 1); - } else { - $s->{pat}{_} = $s->{node}->_register([], $s->{tup}); - $s->{on_ready} && $s->{on_ready}->(); - delete $s->{on_ready}; - } - }); -} - - -# Generates the subroutine used for registering patterns with the local node. -sub _tuple { - my $s = shift; - sub { # tuple, ret, sa - - # Don't route back tuples that we received from this link - return if $_[2] && ref($_[2]) eq 'Tanja::Link' && $_[2] == $s; - - # Only send a tuple once even if there have been multiple - # registrations. (This relies on the property that the Node does not - # make a copy of the tuple when dispatching the callbacks.) - return if $_[0] == $s->{lasttup}; - $s->{lasttup} = $_[0]; - - # Register return-path, if there is one - my $id = $_[1]->null ? 0 : $s->{lastret}; - if($id) { - do { ++$id >= 1<<30 and $id = 1 } while $s->{ret}{$id}; - $s->{ret}{$id} = $_[1]; - $s->{lastret} = $id; - } - - $s->_write(tuple => $id+0, $_[0]); - } -} - - -# Someone registered a pattern with the local node -sub _srv_reg { - my($s, $id, $pat, $cb) = @_; - $cb != $s->{tup} && $s->_write(register => $id+0, $pat); -} - - -# Someone unregistered a pattern with the local node -sub _srv_unreg { - my($s, $id) = @_; - # Note that this also sends out unregister messages for patterns that the - # connected party just unregistered. This shouldn't be a problem, though. - $s->_write(unregister => $id+0); -} - - -sub _recv { - my($s, $hdl, $cmd, @arg) = @_; - - # Not very helpful error message... - return $s->_err("Invalid message") if !$cmd; - - $s->{on_read} && $s->{on_read}->($cmd, @arg); - - # patternsync - if($cmd eq 'patternsync') { - if($arg[0]) { - $s->{node}{lnk}{$s} = $s; - $s->{node}{pat}{$_}[1] != $s->{tup} && $s->_write(register => $_+0, $s->{node}{pat}{$_}[0]) - for (keys %{$s->{node}{pat}}); - $s->_write('regdone'); - } else { - delete $s->{node}{lnk}{$s}; - } - - # register - } elsif($cmd eq 'register') { - $s->{pat}{$arg[0]} = $s->{node}->_register($arg[1], $s->{tup}) if $s->{sync}; - - # regdone - } elsif($cmd eq 'regdone') { - $s->{on_ready} && $s->{on_ready}->(); - delete $s->{on_ready}; - - # unregister - } elsif($cmd eq 'unregister') { - if($s->{sync}) { - $s->{node}->_unregister($s->{pat}{$arg[0]}); - delete $s->{pat}{$arg[0]}; - } - - # tuple - } elsif($cmd eq 'tuple') { - $s->{node}->send($arg[1], !$arg[0] ? undef : sub { - $s->_write($_[0]?'reply':'close', $arg[0]+0, $_[0]?$_[0]:()); - }, $s); - - # reply - } elsif($cmd eq 'reply') { - $s->{ret}{$arg[0]}->reply($arg[1]) if $s->{ret}{$arg[0]}; - - # close - } elsif($cmd eq 'close') { - delete $s->{ret}{$arg[0]}; - } - - # and receive next message - $s->{read_handle}->push_read($s->{fmti} => sub { $s->_recv(@_) }); -} - - -# JSON serialization format. Requires either JSON or JSON::XS -package Tanja::Link::JSON; - -use strict; -use warnings; - -my @num_to_cmd = ('', qw|patternsync register regdone unregister tuple reply close|); -my %cmd_to_num = map +($num_to_cmd[$_], $_), keys @num_to_cmd; -my($json, $true, $false); - - -sub init { - $json = eval { - require JSON::XS; - $true = JSON::XS::true(); - $false = JSON::XS::false(); - JSON::XS->new->utf8 - } || eval { - require JSON; - $true = JSON::true(); - $false = JSON::false(); - JSON->new->utf8 - }; - !!$json; -} - -sub anyevent_read_type { - my $cb = $_[1]; - sub { - $_[0]{rbuf} =~ s/^([^\012]*)\012// or return; - my $d = eval { $json->decode($1); }; - return $cb->($_[0], undef) || 1 if !$d || ref($d) ne 'ARRAY'; - my $num = shift @$d; - return $cb->($_[0], undef) || 1 if !$num || $num !~ /^\d+$/ || $num > @num_to_cmd; - # TODO: argument validation - $cb->($_[0], $num_to_cmd[$num], @$d); - 1 - } -} - - -sub anyevent_write_type { - my(undef, $cmd, @args) = @_; - $json->encode([$cmd_to_num{$cmd}, $cmd eq 'patternsync' ? ($args[0]?$true:$false) : @args])."\012"; -} - - -1; - -# vim:noet:sw=4:ts=4 diff --git a/perl/test.pl b/perl/test.pl deleted file mode 100755 index b0f6272..0000000 --- a/perl/test.pl +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/perl - -use strict; -use warnings; -use Test::More; -use AnyEvent; -use AnyEvent::Handle; -use Socket; - -my $DEBUG = 0; -if($DEBUG) { - no warnings 'once'; - require Data::Dumper; - $Data::Dumper::Indent = 0; - $Data::Dumper::Terse = 1; -} - -use_ok 'Tanja'; - - -# Simple matching tests. -# (Doesn't test a whole lot, since the semantics aren't final yet anyway). -ok Tanja::match([], []); -ok Tanja::match([], [1, 2, 3]); -ok Tanja::match([], [undef]); -ok !Tanja::match([undef], []); -ok Tanja::match([undef], [1]); -ok Tanja::match([1], [1]); -ok Tanja::match([1], [1, "b"]); -ok Tanja::match([1], [undef, 3]); -ok !Tanja::match([2], [1]); - - -# Simple single-session test -{ - my $node = Tanja::Node->new; - isa_ok $node, 'Tanja::Node'; - my $ses = $node->session; - isa_ok $ses, 'Tanja::Session'; - my $done = AnyEvent->condvar; - my $n = 0; - my $n2 = 0; - $ses->reg([], sub { - my($t, $r) = @_; - isa_ok $r, 'Tanja::ReturnPath'; - ok $r->null; - is_deeply $t, [$n]; - ok $n <= 5; - if(++$n == 5) { - $ses->close; - $done->send; - } - }); - $ses->reg_once([undef], sub { - my($t, $r) = @_; - isa_ok $r, 'Tanja::ReturnPath'; - ok $r->null; - is_deeply $t, [0]; - $n2++; - }); - $ses->send([$_]) for (0..10); - is $n, 0; # Make sure that ->send() doesn't run the callbacks. The event system should. - is $n2, 0; - $done->recv; - is $n, 5; - is $n2, 1; -} - - -# Simple double-session test with return-path -sub t_double { - my($sa, $sb, $link) = @_; - my $a = $sa->session; - my $b = $sb->session; - my $done = AnyEvent->condvar; - my $msgn = 0; - $a->reg(["msg"], sub { - my($t, $r) = @_; - ok !$msgn++; - isa_ok $r, 'Tanja::ReturnPath'; - ok !$r->null; - $r->reply(['b', 9]); - is_deeply $t, ["msg", 'a']; - $a->send(["b"]); - }); - my $bn = 0; - $b->reg(["b"], sub { - my($t, $r) = @_; - ok !$bn++; - isa_ok $r, 'Tanja::ReturnPath'; - ok $r->null; - is_deeply $t, ["b"]; - $done->send; - }); - $link && $link->(); - my $n = 0; - $b->send(["msg", 'a'], sub { - !$n++ ? is_deeply $_[0], ['b', 9] : ok !@_; - }); - $done->recv; - is $n, 2; -} - -{ # same node - my $s = Tanja::Node->new; - note 'Same Node'; - t_double($s, $s); -} - -{ # different nodes, linked. (With various combinations of the 'sync' flag) - for my $f (0..3) { - note "Linked nodes, $f"; - my $sa = Tanja::Node->new; - my $sb = Tanja::Node->new; - t_double($sa, $sb, sub { - socketpair my $socka, my $sockb, AF_UNIX, SOCK_STREAM, PF_UNSPEC; - my $done = AnyEvent->condvar; - $done->begin; - $done->begin; - $sa->link( - handle => AnyEvent::Handle->new(fh => $socka), - sync => $f&1, - on_ready => sub { $done->end }, - $DEBUG ? (on_write => sub { note 'A: ',Data::Dumper::Dumper(\@_) }) : (), - ); - $sb->link( - handle => AnyEvent::Handle->new(fh => $sockb), - sync => $f&2, - on_ready => sub { $done->end }, - $DEBUG ? (on_write => sub { note 'B: ',Data::Dumper::Dumper(\@_) }) : (), - ); - $done->recv; - }); - } -} - - - -done_testing(); - -# vim:noet:sw=4:ts=4 -- cgit v1.2.3