summaryrefslogtreecommitdiff
path: root/Tanja.pm
diff options
context:
space:
mode:
Diffstat (limited to 'Tanja.pm')
-rw-r--r--Tanja.pm48
1 files changed, 40 insertions, 8 deletions
diff --git a/Tanja.pm b/Tanja.pm
index 58b2ac6..fa68c06 100644
--- a/Tanja.pm
+++ b/Tanja.pm
@@ -197,22 +197,41 @@ use Carp 'croak';
# on_ready => $cb->(). Called when the link is "ready". (i.e. if you send tuples they will be forwarded)
# on_write => $cb->(message => args). Called when queueing a (non-handshake) message for sending.
# on_read => $cb->(message => args). Called when a (non-handshake) message has been received.
+# formats => [ name => 'module', ... ],
sub new {
my($own, $serv, %o) = @_;
require AnyEvent::Handle;
- croak 'No JSON module available.' if !Tanja::Link::JSON->init;
my $s = bless {
sync => 1,
+ formats => [
+ json => 'Tanja::Link::JSON',
+ ],
%o,
serv => $serv,
reg => {},
ret => {},
lastret => 1,
lasttup => [],
+ fmti => undef,
+ fmto => undef,
}, $own;
+
$s->{write_handle} ||= $s->{handle};
$s->{read_handle} ||= $s->{handle};
croak 'No read or write handle set.' if !$s->{write_handle} || !$s->{read_handle};
+
+ # Init all formats and weed out those that can't be used.
+ my %fmts;
+ my @fmts;
+ for (grep $_%2==0, keys @{$s->{formats}}) {
+ my($n, $m) = ($s->{formats}[$_], $s->{formats}[$_+1]);
+ next if $fmts{$n}++;
+ next if !eval "$m->init()";
+ push @fmts, $n, $m;
+ }
+ $s->{formats} = \@fmts;
+ croak 'Can\'t communicate without serialization formats.' if !@{$s->{formats}};
+
$s->{tup} = $s->_tuple;
$s->{write_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
$s->{read_handle}->on_error(sub { $s->_err($! == EPIPE ? undef : $_[2]) });
@@ -250,7 +269,7 @@ sub close {
sub _write {
my $s = shift;
- $s->{write_handle}->push_write('Tanja::Link::JSON' => @_);
+ $s->{write_handle}->push_write($s->{fmto} => @_);
$s->{on_write} && $s->{on_write}->(@_);
}
@@ -259,7 +278,10 @@ sub _handshake {
my $s = shift;
# Send our handshake message
- $s->{write_handle}->push_write("ver,1.0 seri,json sero,json\012");
+ my @own = map $_%2 == 0 ? $s->{formats}[$_] : (), keys @{$s->{formats}};
+ my %own = map $_%2 == 0 ? ($s->{formats}[$_], $s->{formats}[$_+1]) : (), keys @{$s->{formats}};
+ my $own = join ',', @own;
+ $s->{write_handle}->push_write("ver,1.0 seri,$own sero,$own\012");
# Receive handshake from other party
$s->{read_handle}->push_read(line => "\012" => sub {
@@ -268,13 +290,23 @@ sub _handshake {
my $sero = ($_[1] =~ /\bsero,([^ ]+)\b/) && $1;
# Validate
- $s->_err("Invalid handshake: $_[1]") if !$ver or !$seri or !$sero
+ return $s->_err("Invalid handshake: $_[1]") if !$ver or !$seri or !$sero
or !grep /^1\./, split /,/, $ver
- or !grep $_ eq 'json', split /,/, $seri
- or !grep $_ eq 'json', split /,/, $sero;
+ or !grep $own{$_}, split /,/, $seri
+ or !grep $own{$_}, split /,/, $sero;
+
+ # Figure out fmti and fmto
+ $s->{fmto} = (map $own{$_}, split /,/, $seri)[0];
+ my @sero = split /,/, $sero;
+ for my $i (@own) {
+ if(grep $_ eq $i, @sero) {
+ $s->{fmti} = $own{$i};
+ last;
+ }
+ }
# Handshake complete, start receiving stuff
- $s->{read_handle}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
+ $s->{read_handle}->push_read($s->{fmti} => sub { $s->_recv(@_) });
# Setup tuple forwarding
if($s->{sync}) {
@@ -382,7 +414,7 @@ sub _recv {
}
# and receive next message
- $s->{read_handle}->push_read('Tanja::Link::JSON' => sub { $s->_recv(@_) });
+ $s->{read_handle}->push_read($s->{fmti} => sub { $s->_recv(@_) });
}