summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2014-10-24 13:58:35 +0200
committerYorhel <git@yorhel.nl>2014-10-24 13:58:35 +0200
commit6619ab4d30627be5da3591b1c1fe81b684d1d8c8 (patch)
tree59fcaa6102cca78cf691d5695c4d62ab86b7e9d6 /lib
parent46ce00029e3c6f00f801ec0dc9be84db864df6df (diff)
Multi::Anime: Converted to use AnyEvent
Diffstat (limited to 'lib')
-rw-r--r--lib/Multi/Anime.pm348
1 files changed, 176 insertions, 172 deletions
diff --git a/lib/Multi/Anime.pm b/lib/Multi/Anime.pm
index c31e7e8c..13d3b6f1 100644
--- a/lib/Multi/Anime.pm
+++ b/lib/Multi/Anime.pm
@@ -7,12 +7,11 @@ package Multi::Anime;
use strict;
use warnings;
-use POE 'Wheel::UDP', 'Filter::Stream';
-use Socket 'inet_ntoa';
-use Time::HiRes 'time';
+use Multi::Core;
+use AnyEvent::Socket;
+use AnyEvent::Util;
-sub TIMEOUT () { 100 } # not part of the API
sub LOGIN_ACCEPTED () { 200 }
sub LOGIN_ACCEPTED_NEW_VER () { 201 }
sub ANIME () { 230 }
@@ -26,230 +25,235 @@ sub ANIDB_OUT_OF_SERVICE () { 601 }
sub SERVER_BUSY () { 602 }
my @handled_codes = (
- TIMEOUT, LOGIN_ACCEPTED, LOGIN_ACCEPTED_NEW_VER, ANIME, NO_SUCH_ANIME, NOT_LOGGED_IN,
+ LOGIN_ACCEPTED, LOGIN_ACCEPTED_NEW_VER, ANIME, NO_SUCH_ANIME, NOT_LOGGED_IN,
LOGIN_FIRST,CLIENT_BANNED, INVALID_SESSION, BANNED, ANIDB_OUT_OF_SERVICE, SERVER_BUSY
);
+my %O = (
+ apihost => 'api.anidb.net',
+ apiport => 9000,
+ # AniDB UDP API options
+ client => 'multi',
+ clientver => 1,
+ # Misc settings
+ msgdelay => 10,
+ timeout => 30,
+ timeoutdelay => 0.4, # $delay = $msgdelay ** (1 + $tm*$timeoutdelay)
+ maxtimeoutdelay => 2*3600,
+ check_delay => 3600,
+ cachetime => '3 months',
+);
-sub spawn {
- my $p = shift;
- my %o = @_;
-
- die "No AniDB user/pass configured!" if !$o{user} || !$o{pass};
-
- my $addr = delete($o{PeerAddr}) || 'api.anidb.info';
- $addr = gethostbyname($addr) or die "Couldn't resolve domain";
- $addr = inet_ntoa($addr);
-
- POE::Session->create(
- package_states => [
- $p => [qw| _start shutdown check_anime fetch_anime nextcmd receivepacket |],
- ],
- heap => {
- # POE::Wheels::UDP options
- LocalAddr => '0.0.0.0',
- LocalPort => 9000,
- PeerAddr => $addr,
- PeerPort => 9000,
- # AniDB UDP API options
- client => 'multi',
- clientver => 1,
- # Misc settings
- msgdelay => 10,
- timeout => 30,
- timeoutdelay => 0.4, # $delay = $msgdelay ^ (1 + $tm*$timeoutdelay)
- maxtimeoutdelay => 2*3600, # two hours
- check_delay => 3600, # one hour
- cachetime => '1 month',
-
- %o,
- w => undef,
- s => '', # session key, '' = not logged in
- tm => 0, # number of repeated timeouts
- lm => 0, # timestamp of last outgoing message, 0=no running msg
- aid => 0, # anime ID of the last sent ANIME command
- tag => int(rand()*50000),
- # anime types as returned by AniDB (lowercased)
- anime_types => {
- 'unknown' => undef, # NULL
- 'tv series' => 'tv',
- 'ova' => 'ova',
- 'movie' => 'mov',
- 'other' => 'oth',
- 'web' => 'web',
- 'tv special' => 'spe',
- 'music video' => 'mv',
- },
- },
- );
-}
+my %C = (
+ sock => undef,
+ tw => undef,# timer guard
+ s => '', # session key, '' = not logged in
+ tm => 0, # number of repeated timeouts
+ lm => 0, # timestamp of last outgoing message
+ aid => 0, # anime ID of the last sent ANIME command
+ tag => int(rand()*50000),
+ # anime types as returned by AniDB (lowercased)
+ anime_types => {
+ 'unknown' => undef, # NULL
+ 'tv series' => 'tv',
+ 'ova' => 'ova',
+ 'movie' => 'mov',
+ 'other' => 'oth',
+ 'web' => 'web',
+ 'tv special' => 'spe',
+ 'music video' => 'mv',
+ },
+);
-sub _start {
- $_[KERNEL]->alias_set('anime');
- $_[KERNEL]->sig(shutdown => 'shutdown');
- # listen for 'anime' notifies
- $_[KERNEL]->post(pg => listen => anime => 'check_anime');
+sub run {
+ shift;
+ %O = (%O, @_);
+ die "No AniDB user/pass configured!" if !$O{user} || !$O{pass};
- # init the UDP 'connection'
- $_[HEAP]{w} = POE::Wheel::UDP->new(
- (map { $_ => $_[HEAP]{$_} } qw| LocalAddr LocalPort PeerAddr PeerPort |),
- InputEvent => 'receivepacket',
- Filter => POE::Filter::Stream->new(),
- );
+ AnyEvent::Socket::resolve_sockaddr $O{apihost}, $O{apiport}, 'udp', 0, undef, sub {
+ my($fam, $type, $proto, $saddr) = @{$_[0]};
+ socket $C{sock}, $fam, $type, $proto or die "Can't create UDP socket: $!";
+ connect $C{sock}, $saddr or die "Can't connect() UDP socket: $!";
+ fh_nonblocking $C{sock}, 1;
- # look for something to do
- $_[KERNEL]->yield('check_anime');
-}
+ my($p, $h) = AnyEvent::Socket::unpack_sockaddr($saddr);
+ AE::log info => sprintf "AniDB API client started, communicating with %s:%d", format_address($h), $p;
+ push_watcher pg->listen(anime => on_notify => \&check_anime);
+ push_watcher schedule 0, $O{check_delay}, \&check_anime;
+ push_watcher AE::io $C{sock}, 0, \&receivemsg;
-sub shutdown {
- undef $_[HEAP]{w};
- $_[KERNEL]->post(pg => unlisten => 'anime');
- $_[KERNEL]->delay('check_anime');
- $_[KERNEL]->delay('nextcmd');
- $_[KERNEL]->delay('receivepacket');
- $_[KERNEL]->alias_remove('anime');
+ check_anime();
+ };
}
-sub check_anime {
- return if $_[HEAP]{aid};
- $_[KERNEL]->delay('check_anime');
- $_[KERNEL]->post(pg => query => 'SELECT id FROM anime WHERE lastfetch IS NULL OR lastfetch < NOW() - ?::interval LIMIT 1',
- [ $_[HEAP]{cachetime} ], 'fetch_anime');
+sub unload {
+ undef $C{tw};
}
-sub fetch_anime { # num, res
- # nothing to do, check again later
- return $_[KERNEL]->delay('check_anime', $_[HEAP]{check_delay}) if $_[ARG0] == 0;
-
- # otherwise, fetch info (if we aren't doing so already)
- return if $_[HEAP]{aid};
- $_[HEAP]{aid} = $_[ARG1][0]{id};
- $_[KERNEL]->yield('nextcmd');
+sub check_anime {
+ return if $C{aid};
+ pg_cmd 'SELECT id FROM anime WHERE lastfetch IS NULL OR lastfetch < NOW() - $1::interval LIMIT 1', [ $O{cachetime} ], sub {
+ my $res = shift;
+ return if pg_expect $res, 1 or $C{aid} or !$res->rows;
+ $C{aid} = $res->value(0,0);
+ nextcmd();
+ };
}
sub nextcmd {
- my %cmd;
- # not logged in, get a session
- if(!$_[HEAP]{s}) {
- %cmd = (
+ return if $C{tw}; # don't send a command if we're waiting for a reply or timeout.
+ return if !$C{aid}; # don't send a command if we've got nothing to fetch...
+
+ my %cmd = !$C{s} ?
+ ( # not logged in, get a session
command => 'AUTH',
- user => $_[HEAP]{user},
- pass => $_[HEAP]{pass},
+ user => $O{user},
+ pass => $O{pass},
protover => 3,
- client => $_[HEAP]{client},
- clientver => $_[HEAP]{clientver},
+ client => $O{client},
+ clientver => $O{clientver},
enc => 'UTF-8',
- );
- }
- # logged in, get anime
- else {
- %cmd = (
+ ) : ( # logged in, get anime
command => 'ANIME',
- aid => $_[HEAP]{aid},
+ aid => $C{aid},
acode => 3973121, # aid, ANN id, NFO id, year, type, romaji, kanji
);
- }
- # send command
+ # XXX: We don't have a writability watcher, but since we're only ever sending
+ # out one packet at a time, I assume (or rather, hope) that the kernel buffer
+ # always has space for it. If not, the timeout code will retry the command
+ # anyway.
+ my $cmd = fmtcmd(%cmd);
+ AE::log debug => "Sending command: $cmd";
+ my $n = syswrite $C{sock}, fmtcmd(%cmd);
+ AE::log warn => sprintf "Didn't write command: only sent %d of %d bytes: %s", $n, length($cmd), $! if $n != length($cmd);
+
+ $C{tw} = AE::timer $O{timeout}, 0, \&handletimeout;
+ $C{lm} = AE::now;
+}
+
+
+sub fmtcmd {
+ my %cmd = @_;
my $cmd = delete $cmd{command};
- $cmd{tag} = ++$_[HEAP]{tag};
- $cmd{s} = $_[HEAP]{s} if $_[HEAP]{s};
- $cmd .= ' '.join('&', map {
+ $cmd{tag} = ++$C{tag};
+ $cmd{s} = $C{s} if $C{s};
+ return $cmd.' '.join('&', map {
$cmd{$_} =~ s/&/&amp;/g;
$cmd{$_} =~ s/\r?\n/<br \/>/g;
$_.'='.$cmd{$_}
} keys %cmd);
- $_[HEAP]{w}->put({ payload => [ $cmd ]});
-
- $_[KERNEL]->delay(receivepacket => $_[HEAP]{timeout}, { payload => [ $_[HEAP]{tag}.' 100 TIMEOUT' ] });
- $_[HEAP]{lm} = time;
}
-sub receivepacket { # input, wheelid
- # parse message
- my @r = split /\n/, $_[ARG0]{payload}[0];
- my($tag, $code, $msg) = ($1, $2, $3) if $r[0] =~ /^([0-9]+) ([0-9]+) (.+)$/;
- my $time = time-$_[HEAP]{lm};
+sub receivemsg {
+ my $buf = '';
+ my $n = sysread $C{sock}, $buf, 4096;
+ return AE::log warn => "sysread() failed: $!" if $n < 0;
- # tag incorrect, ignore message
- return $_[KERNEL]->call(core => log => 'Ignoring incorrect tag of message: %s', $r[0])
- if !$tag || $tag != $_[HEAP]{tag};
+ my $time = AE::now-$C{lm};
+ AE::log debug => sprintf "Received message in %.2fs: %s", $time, $buf;
- # unhandled code, ignore as well
- return $_[KERNEL]->call(core => log => 'Ignoring unhandled code %d (%s)', $code, $msg)
+ my @r = split /\n/, $buf;
+ my($tag, $code, $msg) = ($1, $2, $3) if $r[0] =~ /^([0-9]+) ([0-9]+) (.+)$/;
+
+ return AE::log warn => "Ignoring message due to incorrect tag: $buf"
+ if !$tag || $tag != $C{tag};
+ return AE::log warn => "Ignoring message with unknown code: $buf"
if !grep $_ == $code, @handled_codes;
- # at this point, we have a message we can handle, so disable the timeout
- $_[KERNEL]->delay('receivepacket');
- $_[HEAP]{lm} = 0;
-
- # received a timeout of some sorts, try again later
- if($code == TIMEOUT || $code == CLIENT_BANNED || $code == BANNED || $code == ANIDB_OUT_OF_SERVICE || $code == SERVER_BUSY) {
- $_[HEAP]{tm}++;
- my $delay = $_[HEAP]{msgdelay}**(1 + $_[HEAP]{tm}*$_[HEAP]{timeoutdelay});
- $delay = $_[HEAP]{maxtimeoutdelay} if $delay > $_[HEAP]{maxtimeoutdelay};
- $_[KERNEL]->call(core => log => 'Reply timed out, delaying %.0fs.', $delay);
- return $_[KERNEL]->delay(nextcmd => $delay);
+ # Now we have a message we can handle, reset timer
+ undef $C{tw};
+
+ # Consider some codes to be equivalent to a timeout
+ if($code == CLIENT_BANNED || $code == BANNED || $code == ANIDB_OUT_OF_SERVICE || $code == SERVER_BUSY) {
+ # Might want to look into these...
+ AE::log warn => "AniDB doesn't seem to like me: $buf" if $code == CLIENT_BANNED || $code == BANNED;
+ handletimeout();
+ return;
}
- # message wasn't a timeout, reset timeout counter
- $_[HEAP]{tm} = 0;
+ handlemsg($tag, $code, $msg, @r);
+}
+
+
+sub handlemsg {
+ my($tag, $code, $msg, @r) = @_;
+ my $f;
# our session isn't valid, discard it and call nextcmd to get a new one
if($code == NOT_LOGGED_IN || $code == LOGIN_FIRST || $code == INVALID_SESSION) {
- $_[HEAP]{s} = '';
- $_[KERNEL]->call(core => log => 'Our session was invalid, logging in again...');
- return $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay});
+ $C{s} = '';
+ $f = \&nextcmd;
+ AE::log info => 'Our session was invalid, logging in again...';
}
# we received a session ID, call nextcmd again to fetch anime info
- if($code == LOGIN_ACCEPTED || $code == LOGIN_ACCEPTED_NEW_VER) {
- $_[HEAP]{s} = $1 if $msg =~ /^\s*([a-zA-Z0-9]{4,8}) /;
- $_[KERNEL]->call(core => log => 'Successfully logged in to AniDB in %.2fs.', $time);
- return $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay});
+ elsif($code == LOGIN_ACCEPTED || $code == LOGIN_ACCEPTED_NEW_VER) {
+ $C{s} = $1 if $msg =~ /^\s*([a-zA-Z0-9]{4,8}) /;
+ $f = \&nextcmd;
+ AE::log info => 'Successfully logged in to AniDB.';
}
# we now know something about the anime we requested, update DB
- if($code == NO_SUCH_ANIME) {
- $_[KERNEL]->call(core => log => 'ERROR: No anime found with id = %d', $_[HEAP]{aid});
- $_[KERNEL]->post(pg => do => 'UPDATE anime SET lastfetch = NOW() WHERE id = ?', [ $_[HEAP]{aid} ]);
+ elsif($code == NO_SUCH_ANIME) {
+ AE::log info => "No anime found with id = $C{aid}";
+ pg_cmd 'UPDATE anime SET lastfetch = NOW() WHERE id = ?',
+ [ $C{aid} ], sub { pg_expect $_[0], 0 };
+ $f = \&check_anime;
+ $C{aid} = 0;
+
} else {
- # aid, ANN id, NFO id, year, type, romaji, kanji
- my @col = split(/\|/, $r[1], 7);
- for (@col) {
- $_ =~ s/<br \/>/\n/g;
- $_ =~ s/`/'/g;
- }
- $col[1] = undef if !$col[1];
- $col[2] = undef if !$col[2] || $col[2] =~ /^0,/;
- $col[3] = $col[3] =~ /^([0-9]+)/ ? $1 : undef;
- $col[4] = $_[HEAP]{anime_types}{ lc($col[4]) };
- $col[5] = undef if !$col[5];
- $col[6] = undef if !$col[6];
- $_[KERNEL]->post(pg => do => 'UPDATE anime
- SET id = ?, ann_id = ?, nfo_id = ?, year = ?, type = ?,
- title_romaji = ?,title_kanji = ?, lastfetch = NOW()
- WHERE id = ?',
- [ @col, $_[HEAP]{aid} ]
- );
- $_[KERNEL]->call(core => log => 'Fetched anime info for a%d in %.2fs', $_[HEAP]{aid}, $time);
- $_[KERNEL]->call(core => log => 'ERROR: a%d doesn\'t have a title or year!', $_[HEAP]{aid})
- if !$col[3] || !$col[5];
+ update_anime($r[1]);
+ $f = \&check_anime;
+ $C{aid} = 0;
}
- # this anime is handled, check for more
- $_[HEAP]{aid} = 0;
- $_[KERNEL]->delay(check_anime => $_[HEAP]{msgdelay});
+ $C{tw} = AE::timer $O{msgdelay}, 0, sub { undef $C{tw}; $f->() };
}
-1;
+sub update_anime {
+ my $r = shift;
+
+ # aid, ANN id, NFO id, year, type, romaji, kanji
+ my @col = split(/\|/, $r, 7);
+ for(@col) {
+ $_ =~ s/<br \/>/\n/g;
+ $_ =~ s/`/'/g;
+ }
+ $col[1] = undef if !$col[1];
+ $col[2] = undef if !$col[2] || $col[2] =~ /^0,/;
+ $col[3] = $col[3] =~ /^([0-9]+)/ ? $1 : undef;
+ $col[4] = $O{anime_types}{ lc($col[4]) };
+ $col[5] = undef if !$col[5];
+ $col[6] = undef if !$col[6];
+
+ pg_cmd 'UPDATE anime
+ SET id = $1, ann_id = $2, nfo_id = $3, year = $4, type = $5,
+ title_romaji = $6, title_kanji = $7, lastfetch = NOW()
+ WHERE id = $8',
+ [ @col, $C{aid} ],
+ sub { pg_expect $_[0], 0 };
+ AE::log info => "Fetched anime info for a$C{aid}";
+ AE::log warn => "a$C{aid} doesn't have a title or year!"
+ if !$col[3] || !$col[5];
+}
+
+sub handletimeout {
+ $C{tm}++;
+ my $delay = $O{msgdelay}**(1 + $C{tm}*$O{timeoutdelay});
+ $delay = $O{maxtimeoutdelay} if $delay > $O{maxtimeoutdelay};
+ AE::log info => 'Reply timed out, delaying %.0fs.', $delay;
+ $C{tw} = AE::timer $delay, 0, sub { undef $C{tw}; nextcmd() };
+}
+
+1;