From 6619ab4d30627be5da3591b1c1fe81b684d1d8c8 Mon Sep 17 00:00:00 2001 From: Yorhel Date: Fri, 24 Oct 2014 13:58:35 +0200 Subject: Multi::Anime: Converted to use AnyEvent --- lib/Multi/Anime.pm | 348 +++++++++++++++++++++++++++-------------------------- 1 file changed, 176 insertions(+), 172 deletions(-) (limited to 'lib') diff --git a/lib/Multi/Anime.pm b/lib/Multi/Anime.pm index c31e7e8c..13d3b6f1 100644 --- a/lib/Multi/Anime.pm +++ b/lib/Multi/Anime.pm @@ -7,12 +7,11 @@ package Multi::Anime; use strict; use warnings; -use POE 'Wheel::UDP', 'Filter::Stream'; -use Socket 'inet_ntoa'; -use Time::HiRes 'time'; +use Multi::Core; +use AnyEvent::Socket; +use AnyEvent::Util; -sub TIMEOUT () { 100 } # not part of the API sub LOGIN_ACCEPTED () { 200 } sub LOGIN_ACCEPTED_NEW_VER () { 201 } sub ANIME () { 230 } @@ -26,230 +25,235 @@ sub ANIDB_OUT_OF_SERVICE () { 601 } sub SERVER_BUSY () { 602 } my @handled_codes = ( - TIMEOUT, LOGIN_ACCEPTED, LOGIN_ACCEPTED_NEW_VER, ANIME, NO_SUCH_ANIME, NOT_LOGGED_IN, + LOGIN_ACCEPTED, LOGIN_ACCEPTED_NEW_VER, ANIME, NO_SUCH_ANIME, NOT_LOGGED_IN, LOGIN_FIRST,CLIENT_BANNED, INVALID_SESSION, BANNED, ANIDB_OUT_OF_SERVICE, SERVER_BUSY ); +my %O = ( + apihost => 'api.anidb.net', + apiport => 9000, + # AniDB UDP API options + client => 'multi', + clientver => 1, + # Misc settings + msgdelay => 10, + timeout => 30, + timeoutdelay => 0.4, # $delay = $msgdelay ** (1 + $tm*$timeoutdelay) + maxtimeoutdelay => 2*3600, + check_delay => 3600, + cachetime => '3 months', +); -sub spawn { - my $p = shift; - my %o = @_; - - die "No AniDB user/pass configured!" if !$o{user} || !$o{pass}; - - my $addr = delete($o{PeerAddr}) || 'api.anidb.info'; - $addr = gethostbyname($addr) or die "Couldn't resolve domain"; - $addr = inet_ntoa($addr); - - POE::Session->create( - package_states => [ - $p => [qw| _start shutdown check_anime fetch_anime nextcmd receivepacket |], - ], - heap => { - # POE::Wheels::UDP options - LocalAddr => '0.0.0.0', - LocalPort => 9000, - PeerAddr => $addr, - PeerPort => 9000, - # AniDB UDP API options - client => 'multi', - clientver => 1, - # Misc settings - msgdelay => 10, - timeout => 30, - timeoutdelay => 0.4, # $delay = $msgdelay ^ (1 + $tm*$timeoutdelay) - maxtimeoutdelay => 2*3600, # two hours - check_delay => 3600, # one hour - cachetime => '1 month', - - %o, - w => undef, - s => '', # session key, '' = not logged in - tm => 0, # number of repeated timeouts - lm => 0, # timestamp of last outgoing message, 0=no running msg - aid => 0, # anime ID of the last sent ANIME command - tag => int(rand()*50000), - # anime types as returned by AniDB (lowercased) - anime_types => { - 'unknown' => undef, # NULL - 'tv series' => 'tv', - 'ova' => 'ova', - 'movie' => 'mov', - 'other' => 'oth', - 'web' => 'web', - 'tv special' => 'spe', - 'music video' => 'mv', - }, - }, - ); -} +my %C = ( + sock => undef, + tw => undef,# timer guard + s => '', # session key, '' = not logged in + tm => 0, # number of repeated timeouts + lm => 0, # timestamp of last outgoing message + aid => 0, # anime ID of the last sent ANIME command + tag => int(rand()*50000), + # anime types as returned by AniDB (lowercased) + anime_types => { + 'unknown' => undef, # NULL + 'tv series' => 'tv', + 'ova' => 'ova', + 'movie' => 'mov', + 'other' => 'oth', + 'web' => 'web', + 'tv special' => 'spe', + 'music video' => 'mv', + }, +); -sub _start { - $_[KERNEL]->alias_set('anime'); - $_[KERNEL]->sig(shutdown => 'shutdown'); - # listen for 'anime' notifies - $_[KERNEL]->post(pg => listen => anime => 'check_anime'); +sub run { + shift; + %O = (%O, @_); + die "No AniDB user/pass configured!" if !$O{user} || !$O{pass}; - # init the UDP 'connection' - $_[HEAP]{w} = POE::Wheel::UDP->new( - (map { $_ => $_[HEAP]{$_} } qw| LocalAddr LocalPort PeerAddr PeerPort |), - InputEvent => 'receivepacket', - Filter => POE::Filter::Stream->new(), - ); + AnyEvent::Socket::resolve_sockaddr $O{apihost}, $O{apiport}, 'udp', 0, undef, sub { + my($fam, $type, $proto, $saddr) = @{$_[0]}; + socket $C{sock}, $fam, $type, $proto or die "Can't create UDP socket: $!"; + connect $C{sock}, $saddr or die "Can't connect() UDP socket: $!"; + fh_nonblocking $C{sock}, 1; - # look for something to do - $_[KERNEL]->yield('check_anime'); -} + my($p, $h) = AnyEvent::Socket::unpack_sockaddr($saddr); + AE::log info => sprintf "AniDB API client started, communicating with %s:%d", format_address($h), $p; + push_watcher pg->listen(anime => on_notify => \&check_anime); + push_watcher schedule 0, $O{check_delay}, \&check_anime; + push_watcher AE::io $C{sock}, 0, \&receivemsg; -sub shutdown { - undef $_[HEAP]{w}; - $_[KERNEL]->post(pg => unlisten => 'anime'); - $_[KERNEL]->delay('check_anime'); - $_[KERNEL]->delay('nextcmd'); - $_[KERNEL]->delay('receivepacket'); - $_[KERNEL]->alias_remove('anime'); + check_anime(); + }; } -sub check_anime { - return if $_[HEAP]{aid}; - $_[KERNEL]->delay('check_anime'); - $_[KERNEL]->post(pg => query => 'SELECT id FROM anime WHERE lastfetch IS NULL OR lastfetch < NOW() - ?::interval LIMIT 1', - [ $_[HEAP]{cachetime} ], 'fetch_anime'); +sub unload { + undef $C{tw}; } -sub fetch_anime { # num, res - # nothing to do, check again later - return $_[KERNEL]->delay('check_anime', $_[HEAP]{check_delay}) if $_[ARG0] == 0; - - # otherwise, fetch info (if we aren't doing so already) - return if $_[HEAP]{aid}; - $_[HEAP]{aid} = $_[ARG1][0]{id}; - $_[KERNEL]->yield('nextcmd'); +sub check_anime { + return if $C{aid}; + pg_cmd 'SELECT id FROM anime WHERE lastfetch IS NULL OR lastfetch < NOW() - $1::interval LIMIT 1', [ $O{cachetime} ], sub { + my $res = shift; + return if pg_expect $res, 1 or $C{aid} or !$res->rows; + $C{aid} = $res->value(0,0); + nextcmd(); + }; } sub nextcmd { - my %cmd; - # not logged in, get a session - if(!$_[HEAP]{s}) { - %cmd = ( + return if $C{tw}; # don't send a command if we're waiting for a reply or timeout. + return if !$C{aid}; # don't send a command if we've got nothing to fetch... + + my %cmd = !$C{s} ? + ( # not logged in, get a session command => 'AUTH', - user => $_[HEAP]{user}, - pass => $_[HEAP]{pass}, + user => $O{user}, + pass => $O{pass}, protover => 3, - client => $_[HEAP]{client}, - clientver => $_[HEAP]{clientver}, + client => $O{client}, + clientver => $O{clientver}, enc => 'UTF-8', - ); - } - # logged in, get anime - else { - %cmd = ( + ) : ( # logged in, get anime command => 'ANIME', - aid => $_[HEAP]{aid}, + aid => $C{aid}, acode => 3973121, # aid, ANN id, NFO id, year, type, romaji, kanji ); - } - # send command + # XXX: We don't have a writability watcher, but since we're only ever sending + # out one packet at a time, I assume (or rather, hope) that the kernel buffer + # always has space for it. If not, the timeout code will retry the command + # anyway. + my $cmd = fmtcmd(%cmd); + AE::log debug => "Sending command: $cmd"; + my $n = syswrite $C{sock}, fmtcmd(%cmd); + AE::log warn => sprintf "Didn't write command: only sent %d of %d bytes: %s", $n, length($cmd), $! if $n != length($cmd); + + $C{tw} = AE::timer $O{timeout}, 0, \&handletimeout; + $C{lm} = AE::now; +} + + +sub fmtcmd { + my %cmd = @_; my $cmd = delete $cmd{command}; - $cmd{tag} = ++$_[HEAP]{tag}; - $cmd{s} = $_[HEAP]{s} if $_[HEAP]{s}; - $cmd .= ' '.join('&', map { + $cmd{tag} = ++$C{tag}; + $cmd{s} = $C{s} if $C{s}; + return $cmd.' '.join('&', map { $cmd{$_} =~ s/&/&/g; $cmd{$_} =~ s/\r?\n/
/g; $_.'='.$cmd{$_} } keys %cmd); - $_[HEAP]{w}->put({ payload => [ $cmd ]}); - - $_[KERNEL]->delay(receivepacket => $_[HEAP]{timeout}, { payload => [ $_[HEAP]{tag}.' 100 TIMEOUT' ] }); - $_[HEAP]{lm} = time; } -sub receivepacket { # input, wheelid - # parse message - my @r = split /\n/, $_[ARG0]{payload}[0]; - my($tag, $code, $msg) = ($1, $2, $3) if $r[0] =~ /^([0-9]+) ([0-9]+) (.+)$/; - my $time = time-$_[HEAP]{lm}; +sub receivemsg { + my $buf = ''; + my $n = sysread $C{sock}, $buf, 4096; + return AE::log warn => "sysread() failed: $!" if $n < 0; - # tag incorrect, ignore message - return $_[KERNEL]->call(core => log => 'Ignoring incorrect tag of message: %s', $r[0]) - if !$tag || $tag != $_[HEAP]{tag}; + my $time = AE::now-$C{lm}; + AE::log debug => sprintf "Received message in %.2fs: %s", $time, $buf; - # unhandled code, ignore as well - return $_[KERNEL]->call(core => log => 'Ignoring unhandled code %d (%s)', $code, $msg) + my @r = split /\n/, $buf; + my($tag, $code, $msg) = ($1, $2, $3) if $r[0] =~ /^([0-9]+) ([0-9]+) (.+)$/; + + return AE::log warn => "Ignoring message due to incorrect tag: $buf" + if !$tag || $tag != $C{tag}; + return AE::log warn => "Ignoring message with unknown code: $buf" if !grep $_ == $code, @handled_codes; - # at this point, we have a message we can handle, so disable the timeout - $_[KERNEL]->delay('receivepacket'); - $_[HEAP]{lm} = 0; - - # received a timeout of some sorts, try again later - if($code == TIMEOUT || $code == CLIENT_BANNED || $code == BANNED || $code == ANIDB_OUT_OF_SERVICE || $code == SERVER_BUSY) { - $_[HEAP]{tm}++; - my $delay = $_[HEAP]{msgdelay}**(1 + $_[HEAP]{tm}*$_[HEAP]{timeoutdelay}); - $delay = $_[HEAP]{maxtimeoutdelay} if $delay > $_[HEAP]{maxtimeoutdelay}; - $_[KERNEL]->call(core => log => 'Reply timed out, delaying %.0fs.', $delay); - return $_[KERNEL]->delay(nextcmd => $delay); + # Now we have a message we can handle, reset timer + undef $C{tw}; + + # Consider some codes to be equivalent to a timeout + if($code == CLIENT_BANNED || $code == BANNED || $code == ANIDB_OUT_OF_SERVICE || $code == SERVER_BUSY) { + # Might want to look into these... + AE::log warn => "AniDB doesn't seem to like me: $buf" if $code == CLIENT_BANNED || $code == BANNED; + handletimeout(); + return; } - # message wasn't a timeout, reset timeout counter - $_[HEAP]{tm} = 0; + handlemsg($tag, $code, $msg, @r); +} + + +sub handlemsg { + my($tag, $code, $msg, @r) = @_; + my $f; # our session isn't valid, discard it and call nextcmd to get a new one if($code == NOT_LOGGED_IN || $code == LOGIN_FIRST || $code == INVALID_SESSION) { - $_[HEAP]{s} = ''; - $_[KERNEL]->call(core => log => 'Our session was invalid, logging in again...'); - return $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay}); + $C{s} = ''; + $f = \&nextcmd; + AE::log info => 'Our session was invalid, logging in again...'; } # we received a session ID, call nextcmd again to fetch anime info - if($code == LOGIN_ACCEPTED || $code == LOGIN_ACCEPTED_NEW_VER) { - $_[HEAP]{s} = $1 if $msg =~ /^\s*([a-zA-Z0-9]{4,8}) /; - $_[KERNEL]->call(core => log => 'Successfully logged in to AniDB in %.2fs.', $time); - return $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay}); + elsif($code == LOGIN_ACCEPTED || $code == LOGIN_ACCEPTED_NEW_VER) { + $C{s} = $1 if $msg =~ /^\s*([a-zA-Z0-9]{4,8}) /; + $f = \&nextcmd; + AE::log info => 'Successfully logged in to AniDB.'; } # we now know something about the anime we requested, update DB - if($code == NO_SUCH_ANIME) { - $_[KERNEL]->call(core => log => 'ERROR: No anime found with id = %d', $_[HEAP]{aid}); - $_[KERNEL]->post(pg => do => 'UPDATE anime SET lastfetch = NOW() WHERE id = ?', [ $_[HEAP]{aid} ]); + elsif($code == NO_SUCH_ANIME) { + AE::log info => "No anime found with id = $C{aid}"; + pg_cmd 'UPDATE anime SET lastfetch = NOW() WHERE id = ?', + [ $C{aid} ], sub { pg_expect $_[0], 0 }; + $f = \&check_anime; + $C{aid} = 0; + } else { - # aid, ANN id, NFO id, year, type, romaji, kanji - my @col = split(/\|/, $r[1], 7); - for (@col) { - $_ =~ s/
/\n/g; - $_ =~ s/`/'/g; - } - $col[1] = undef if !$col[1]; - $col[2] = undef if !$col[2] || $col[2] =~ /^0,/; - $col[3] = $col[3] =~ /^([0-9]+)/ ? $1 : undef; - $col[4] = $_[HEAP]{anime_types}{ lc($col[4]) }; - $col[5] = undef if !$col[5]; - $col[6] = undef if !$col[6]; - $_[KERNEL]->post(pg => do => 'UPDATE anime - SET id = ?, ann_id = ?, nfo_id = ?, year = ?, type = ?, - title_romaji = ?,title_kanji = ?, lastfetch = NOW() - WHERE id = ?', - [ @col, $_[HEAP]{aid} ] - ); - $_[KERNEL]->call(core => log => 'Fetched anime info for a%d in %.2fs', $_[HEAP]{aid}, $time); - $_[KERNEL]->call(core => log => 'ERROR: a%d doesn\'t have a title or year!', $_[HEAP]{aid}) - if !$col[3] || !$col[5]; + update_anime($r[1]); + $f = \&check_anime; + $C{aid} = 0; } - # this anime is handled, check for more - $_[HEAP]{aid} = 0; - $_[KERNEL]->delay(check_anime => $_[HEAP]{msgdelay}); + $C{tw} = AE::timer $O{msgdelay}, 0, sub { undef $C{tw}; $f->() }; } -1; +sub update_anime { + my $r = shift; + + # aid, ANN id, NFO id, year, type, romaji, kanji + my @col = split(/\|/, $r, 7); + for(@col) { + $_ =~ s/
/\n/g; + $_ =~ s/`/'/g; + } + $col[1] = undef if !$col[1]; + $col[2] = undef if !$col[2] || $col[2] =~ /^0,/; + $col[3] = $col[3] =~ /^([0-9]+)/ ? $1 : undef; + $col[4] = $O{anime_types}{ lc($col[4]) }; + $col[5] = undef if !$col[5]; + $col[6] = undef if !$col[6]; + + pg_cmd 'UPDATE anime + SET id = $1, ann_id = $2, nfo_id = $3, year = $4, type = $5, + title_romaji = $6, title_kanji = $7, lastfetch = NOW() + WHERE id = $8', + [ @col, $C{aid} ], + sub { pg_expect $_[0], 0 }; + AE::log info => "Fetched anime info for a$C{aid}"; + AE::log warn => "a$C{aid} doesn't have a title or year!" + if !$col[3] || !$col[5]; +} + +sub handletimeout { + $C{tm}++; + my $delay = $O{msgdelay}**(1 + $C{tm}*$O{timeoutdelay}); + $delay = $O{maxtimeoutdelay} if $delay > $O{maxtimeoutdelay}; + AE::log info => 'Reply timed out, delaying %.0fs.', $delay; + $C{tw} = AE::timer $delay, 0, sub { undef $C{tw}; nextcmd() }; +} + +1; -- cgit v1.2.3