summaryrefslogtreecommitdiff
path: root/perl/Tanja.pm
diff options
context:
space:
mode:
Diffstat (limited to 'perl/Tanja.pm')
-rw-r--r--perl/Tanja.pm472
1 files changed, 0 insertions, 472 deletions
diff --git a/perl/Tanja.pm b/perl/Tanja.pm
deleted file mode 100644
index a0873f8..0000000
--- a/perl/Tanja.pm
+++ /dev/null
@@ -1,472 +0,0 @@
-package Tanja;
-
-use strict;
-use warnings;
-
-# Just make sure it's loaded. It doesn't export anything.
-use AnyEvent ();
-
-
-# Args: [ pattern ], [ tuple ]
-# Returns: a true value on match, a false value otherwise
-# TODO: this match() implementation is for experimental purposes only, it
-# should actually follow some established semantics to interoperate properly.
-# undef is used as wildcard (in either the tuple or the pattern).
-sub match {
- my($p, $t) = @_;
- return 0 if @$p > @$t;
- for my $i (keys @$p) {
- next if !defined $p->[$i] or !defined $t->[$i];
- return 0 if $p->[$i] ne $t->[$i];
- }
- return 1;
-}
-
-
-package Tanja::Node;
-
-use strict;
-use warnings;
-
-
-# Create a new Node
-sub new {
- return bless({
- lastid => 1,
- pat => {}, # Note: Also used directly by Tanja::Link
- lnk => {}, # Same note.
- }, shift);
-}
-
-
-# Create a new session
-sub session {
- return Tanja::Session->new(shift);
-}
-
-
-# Link with a remote node via an AnyEvent::Handle stream
-sub link {
- return Tanja::Link->new(@_);
-}
-
-
-# Send a tuple to the network.
-# Arguments: [ tuple ], $return_cb->([tuple] or undef, $special_arg), $special_arg
-# $special_arg is local only within the node, it will not be routed.
-# TODO: Some way to indicate that the session isn't interested in replies anymore?
-sub send {
- my($s, $t, $cb, $sa) = @_;
- my $ret = Tanja::ReturnPath->_new($cb);
- for my $reg (grep Tanja::match($_->[0], $t), values %{$s->{pat}}) {
- AnyEvent::postpone {
- $reg->[1]->($t, $ret, $sa, \$reg->[2]);
- }
- }
-}
-
-
-sub _register {
- my($s, $pat, $cb) = @_;
- my $id = $s->{lastid};
- # Explicitely wrap around long before 2^31, to avoid having Perl turn the ID into a float.
- do { ++$id >= 1<<30 and $id = 1 } while $s->{pat}{$id};
- $s->{pat}{$id} = [ $pat, $cb, 1 ];
- $s->{lastid} = $id;
- $_->_srv_reg($id, $pat, $cb) for(values %{$s->{lnk}});
- return $id;
-}
-
-
-sub _unregister {
- my($s, $id) = @_;
- if($s->{pat}{$id}) {
- $s->{pat}{$id}[2] = 0;
- delete $s->{pat}{$id};
- $_->_srv_unreg($id) for(values %{$s->{lnk}});
- }
-}
-
-
-package Tanja::Session;
-
-use strict;
-use warnings;
-
-
-# Create a new session (usually called by $node->session)
-sub new {
- my($own, $node) = @_;
- return bless({
- node => $node,
- pat => {},
- }, $own);
-}
-
-
-# Register for a pattern
-# Args: [ pattern ], callback
-# Returns: $id, for use with unreg()
-# Callback:
-# Args: [ tuple ], $return_path
-sub reg {
- my($s, $pat, $cb) = @_;
- my $id;
- $id = $s->{node}->_register($pat, sub { ${$_[3]} && $cb->($_[0], $_[1]) });
- $s->{pat}{$id} = 1;
- return $id;
-}
-
-
-# Unregister a single pattern
-sub unreg {
- my($s, $id) = @_;
- $s->{node}->_unregister($id);
- delete $s->{pat}{$id};
-}
-
-
-# For convenience. Same as ->reg(), but unregisters automatically after one call.
-sub reg_once {
- my($s, $pat, $cb) = @_;
- my $id;
- $id = $s->reg($pat, sub { $s->unreg($id); $cb->(@_) });
- return $id;
-}
-
-
-sub send {
- my $s = shift;
- $s->{node}->send(@_);
-}
-
-
-# "Close" the session (simply unregisters all its patterns)
-sub close {
- my $s = shift;
- $s->{node}->_unregister($_) for (keys %{$s->{pat}});
- $s->{pat} = {};
-}
-
-
-package Tanja::ReturnPath;
-
-use strict;
-use warnings;
-
-
-sub _new {
- my($own, $cb) = @_;
- return bless \$cb, $own;
-}
-
-
-sub null {
- my $cb = shift;
- return !$$cb;
-}
-
-
-sub reply {
- my($cb, $t) = @_;
- $$cb && AnyEvent::postpone { $$cb->($t); };
-}
-
-
-sub DESTROY {
- my $cb = shift;
- my $c = $$cb;
- $c && AnyEvent::postpone { $c->() };
- undef $$cb;
-}
-
-
-package Tanja::Link;
-
-use strict;
-use warnings;
-use Errno 'EPIPE';
-use Carp 'croak';
-
-
-# Args: Tanja::Node, %options
-# Options:
-# sync => bool. True to fetch the pattern list of the remote node. (Default true)
-# handle => AnyEvent::Handle. Equivalent to setting write_handle and read_handle to the same thing.
-# write_handle => AnyEvent::Handle. Used for writing.
-# read_handle => AnyEvent::Handle. Used for reading.
-# on_error => $cb->($message). Called when an error occurs, $message = undef for disconnect.
-# on_ready => $cb->(). Called when the link is "ready". (i.e. if you send tuples they will be forwarded)
-# on_write => $cb->(message => args). Called when queueing a (non-handshake) message for sending.
-# on_read => $cb->(message => args). Called when a (non-handshake) message has been received.
-# formats => [ name => 'module', ... ],
-sub new {
- my($own, $node, %o) = @_;
- require AnyEvent::Handle;
- my $s = bless {
- sync => 1,
- formats => [
- json => 'Tanja::Link::JSON',
- ],
- %o,
- node => $node,
- reg => {},
- ret => {},
- lastret => 1,
- lasttup => [],
- fmti => undef,
- fmto => undef,
- }, $own;
-
- $s->{write_handle} ||= $s->{handle};
- $s->{read_handle} ||= $s->{handle};
- croak 'No read or write handle set.' if !$s->{write_handle} || !$s->{read_handle};
-
- # Init all formats and weed out those that can't be used.
- my %fmts;
- my @fmts;
- for (grep $_%2==0, keys @{$s->{formats}}) {
- my($n, $m) = ($s->{formats}[$_], $s->{formats}[$_+1]);
- next if $fmts{$n}++;
- next if !eval "$m->init()";
- push @fmts, $n, $m;
- }
- $s->{formats} = \@fmts;
- croak 'Can\'t communicate without serialization formats.' if !@{$s->{formats}};
-
- $s->{tup} = $s->_tuple;
- $s->{write_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
- $s->{read_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
-
- # TODO: Better to have a timeout on writes. (Although this buffer limit isn't bad either)
- $s->{write_handle}->wbuf_max(5*1024*1024); # This would be really bad.
- $s->{read_handle}->rbuf_max(5*1024*1024);
- $s->_handshake;
- return $s;
-}
-
-
-sub _cleanup {
- my $s = shift;
- $s->{node}->_unregister($_) for (values %{$s->{pat}});
- $s->{pat} = {};
- delete $s->{node}{lnk}{$s};
-}
-
-sub _err {
- my($s, $m) = @_;
- $s->_cleanup;
- $s->{read_handle}->destroy;
- $s->{write_handle}->destroy;
- $s->{on_error} && $s->{on_error}->($m);
-}
-
-
-sub close {
- $_[0]->_cleanup;
- $_[0]->{write_handle}->push_shutdown;
- $_[0]->{read_handle}->push_shutdown;
-}
-
-
-sub _write {
- my $s = shift;
- $s->{write_handle}->push_write($s->{fmto} => @_);
- $s->{on_write} && $s->{on_write}->(@_);
-}
-
-
-sub _handshake {
- my $s = shift;
-
- # Send our handshake message
- my @own = map $_%2 == 0 ? $s->{formats}[$_] : (), keys @{$s->{formats}};
- my %own = map $_%2 == 0 ? ($s->{formats}[$_], $s->{formats}[$_+1]) : (), keys @{$s->{formats}};
- my $own = join ',', @own;
- $s->{write_handle}->push_write("ver,1.0 seri,$own sero,$own\012");
-
- # Receive handshake from other party
- $s->{read_handle}->push_read(line => "\012" => sub {
- my $ver = ($_[1] =~ /\bver,([^ ]+)\b/) && $1;
- my $seri = ($_[1] =~ /\bseri,([^ ]+)\b/) && $1;
- my $sero = ($_[1] =~ /\bsero,([^ ]+)\b/) && $1;
-
- # Validate
- return $s->_err("Invalid handshake: $_[1]") if !$ver or !$seri or !$sero
- or !grep /^1\./, split /,/, $ver
- or !grep $own{$_}, split /,/, $seri
- or !grep $own{$_}, split /,/, $sero;
-
- # Figure out fmti and fmto
- $s->{fmto} = (map $own{$_}, split /,/, $seri)[0];
- my @sero = split /,/, $sero;
- for my $i (@own) {
- if(grep $_ eq $i, @sero) {
- $s->{fmti} = $own{$i};
- last;
- }
- }
-
- # Handshake complete, start receiving stuff
- $s->{read_handle}->push_read($s->{fmti} => sub { $s->_recv(@_) });
-
- # Setup tuple forwarding
- if($s->{sync}) {
- $s->_write(patternsync => 1);
- } else {
- $s->{pat}{_} = $s->{node}->_register([], $s->{tup});
- $s->{on_ready} && $s->{on_ready}->();
- delete $s->{on_ready};
- }
- });
-}
-
-
-# Generates the subroutine used for registering patterns with the local node.
-sub _tuple {
- my $s = shift;
- sub { # tuple, ret, sa
-
- # Don't route back tuples that we received from this link
- return if $_[2] && ref($_[2]) eq 'Tanja::Link' && $_[2] == $s;
-
- # Only send a tuple once even if there have been multiple
- # registrations. (This relies on the property that the Node does not
- # make a copy of the tuple when dispatching the callbacks.)
- return if $_[0] == $s->{lasttup};
- $s->{lasttup} = $_[0];
-
- # Register return-path, if there is one
- my $id = $_[1]->null ? 0 : $s->{lastret};
- if($id) {
- do { ++$id >= 1<<30 and $id = 1 } while $s->{ret}{$id};
- $s->{ret}{$id} = $_[1];
- $s->{lastret} = $id;
- }
-
- $s->_write(tuple => $id+0, $_[0]);
- }
-}
-
-
-# Someone registered a pattern with the local node
-sub _srv_reg {
- my($s, $id, $pat, $cb) = @_;
- $cb != $s->{tup} && $s->_write(register => $id+0, $pat);
-}
-
-
-# Someone unregistered a pattern with the local node
-sub _srv_unreg {
- my($s, $id) = @_;
- # Note that this also sends out unregister messages for patterns that the
- # connected party just unregistered. This shouldn't be a problem, though.
- $s->_write(unregister => $id+0);
-}
-
-
-sub _recv {
- my($s, $hdl, $cmd, @arg) = @_;
-
- # Not very helpful error message...
- return $s->_err("Invalid message") if !$cmd;
-
- $s->{on_read} && $s->{on_read}->($cmd, @arg);
-
- # patternsync
- if($cmd eq 'patternsync') {
- if($arg[0]) {
- $s->{node}{lnk}{$s} = $s;
- $s->{node}{pat}{$_}[1] != $s->{tup} && $s->_write(register => $_+0, $s->{node}{pat}{$_}[0])
- for (keys %{$s->{node}{pat}});
- $s->_write('regdone');
- } else {
- delete $s->{node}{lnk}{$s};
- }
-
- # register
- } elsif($cmd eq 'register') {
- $s->{pat}{$arg[0]} = $s->{node}->_register($arg[1], $s->{tup}) if $s->{sync};
-
- # regdone
- } elsif($cmd eq 'regdone') {
- $s->{on_ready} && $s->{on_ready}->();
- delete $s->{on_ready};
-
- # unregister
- } elsif($cmd eq 'unregister') {
- if($s->{sync}) {
- $s->{node}->_unregister($s->{pat}{$arg[0]});
- delete $s->{pat}{$arg[0]};
- }
-
- # tuple
- } elsif($cmd eq 'tuple') {
- $s->{node}->send($arg[1], !$arg[0] ? undef : sub {
- $s->_write($_[0]?'reply':'close', $arg[0]+0, $_[0]?$_[0]:());
- }, $s);
-
- # reply
- } elsif($cmd eq 'reply') {
- $s->{ret}{$arg[0]}->reply($arg[1]) if $s->{ret}{$arg[0]};
-
- # close
- } elsif($cmd eq 'close') {
- delete $s->{ret}{$arg[0]};
- }
-
- # and receive next message
- $s->{read_handle}->push_read($s->{fmti} => sub { $s->_recv(@_) });
-}
-
-
-# JSON serialization format. Requires either JSON or JSON::XS
-package Tanja::Link::JSON;
-
-use strict;
-use warnings;
-
-my @num_to_cmd = ('', qw|patternsync register regdone unregister tuple reply close|);
-my %cmd_to_num = map +($num_to_cmd[$_], $_), keys @num_to_cmd;
-my($json, $true, $false);
-
-
-sub init {
- $json = eval {
- require JSON::XS;
- $true = JSON::XS::true();
- $false = JSON::XS::false();
- JSON::XS->new->utf8
- } || eval {
- require JSON;
- $true = JSON::true();
- $false = JSON::false();
- JSON->new->utf8
- };
- !!$json;
-}
-
-sub anyevent_read_type {
- my $cb = $_[1];
- sub {
- $_[0]{rbuf} =~ s/^([^\012]*)\012// or return;
- my $d = eval { $json->decode($1); };
- return $cb->($_[0], undef) || 1 if !$d || ref($d) ne 'ARRAY';
- my $num = shift @$d;
- return $cb->($_[0], undef) || 1 if !$num || $num !~ /^\d+$/ || $num > @num_to_cmd;
- # TODO: argument validation
- $cb->($_[0], $num_to_cmd[$num], @$d);
- 1
- }
-}
-
-
-sub anyevent_write_type {
- my(undef, $cmd, @args) = @_;
- $json->encode([$cmd_to_num{$cmd}, $cmd eq 'patternsync' ? ($args[0]?$true:$false) : @args])."\012";
-}
-
-
-1;
-
-# vim:noet:sw=4:ts=4