From 0a3ebf61cb7ea608f440029c2f138d97fb5a102a Mon Sep 17 00:00:00 2001 From: Yorhel Date: Sat, 18 Jul 2009 15:22:01 +0200 Subject: Rewrote Multi::Anime No shared memory! Asynchronous SQL! yay! --- lib/Multi/Anime.pm | 266 +++++++++++++++++++++-------------------------------- 1 file changed, 104 insertions(+), 162 deletions(-) (limited to 'lib/Multi') diff --git a/lib/Multi/Anime.pm b/lib/Multi/Anime.pm index 1215a4a1..b1d5ed3e 100644 --- a/lib/Multi/Anime.pm +++ b/lib/Multi/Anime.pm @@ -8,7 +8,6 @@ package Multi::Anime; use strict; use warnings; use POE 'Wheel::UDP', 'Filter::Stream'; -use Tie::ShareLite ':lock'; use Socket 'inet_ntoa'; use Time::HiRes 'time'; @@ -26,48 +25,45 @@ sub BANNED () { 555 } sub ANIDB_OUT_OF_SERVICE () { 601 } sub SERVER_BUSY () { 602 } -my @expected_codes = ( TIMEOUT, LOGIN_ACCEPTED, LOGIN_ACCEPTED_NEW_VER, ANIME, NO_SUCH_ANIME, NOT_LOGGED_IN, LOGIN_FIRST, INVALID_SESSION ); +my @handled_codes = ( + TIMEOUT, LOGIN_ACCEPTED, LOGIN_ACCEPTED_NEW_VER, ANIME, NO_SUCH_ANIME, NOT_LOGGED_IN, + LOGIN_FIRST,CLIENT_BANNED, INVALID_SESSION, BANNED, ANIDB_OUT_OF_SERVICE, SERVER_BUSY +); -sub spawn { - # The 'anime' command doesn't actually do anything, it just - # adds IDs to process to the internal queue, which is seperate - # from the global processing queue. - # This module -only- fetches anime information in daemon mode! - # Calling the anime command with an ID as argument will force - # the information to be refreshed. This is not recommended, - # just use 'anime' for normal usage. +sub spawn { my $p = shift; + my %o = @_; + + die "No AniDB user/pass configured!" if !$o{user} || !$o{pass}; + + my $addr = delete($o{PeerAddr}) || 'api.anidb.info'; + $addr = gethostbyname($addr) or die "Couldn't resolve domain"; + $addr = inet_ntoa($addr); + POE::Session->create( package_states => [ - $p => [qw| _start shutdown cmd_anime nextcmd receivepacket updateanime |], + $p => [qw| _start shutdown check_anime fetch_anime nextcmd receivepacket |], ], heap => { - # POE::Wheels::UDP options + # POE::Wheels::UDP options LocalAddr => '0.0.0.0', LocalPort => 9000, - PeerAddr => do { - if(!$Multi::DAEMONIZE) { - my $a = gethostbyname('api.anidb.info'); - die "ERROR: Couldn't resolve domain" if !defined $a; - inet_ntoa($a); - } else { - 0; - } - }, + PeerAddr => $addr, PeerPort => 9000, - # AniDB UDP API options + # AniDB UDP API options client => 'multi', clientver => 1, - # Misc settings + # Misc settings msgdelay => 10, timeout => 30, timeoutdelay => 0.4, # $delay = $msgdelay ^ (1 + $tm*$timeoutdelay) maxtimeoutdelay => 2*3600, # two hours - cachetime => 30*24*3600, # one month + check_delay => 3600, # one hour + cachetime => '1 month', - @_, + %o, w => undef, s => '', # session key, '' = not logged in tm => 0, # number of repeated timeouts @@ -81,79 +77,50 @@ sub spawn { sub _start { $_[KERNEL]->alias_set('anime'); - $_[KERNEL]->call(core => register => qr/^anime(?: ([0-9]+))?$/, 'cmd_anime'); - - # check for anime twice a day - $_[KERNEL]->post(core => addcron => '0 0,12 * * *', 'anime'); - $_[KERNEL]->sig('shutdown' => 'shutdown'); - - if(!$Multi::DAEMONIZE) { - # init the UDP 'connection' - $_[HEAP]{w} = POE::Wheel::UDP->new( - (map { $_ => $_[HEAP]{$_} } qw| LocalAddr LocalPort PeerAddr PeerPort |), - InputEvent => 'receivepacket', - Filter => POE::Filter::Stream->new(), - ); + $_[KERNEL]->sig(shutdown => 'shutdown'); - # start executing commands - $_[KERNEL]->delay(nextcmd => 0); #$_[HEAP]{msgdelay}); - } + # init the UDP 'connection' + $_[HEAP]{w} = POE::Wheel::UDP->new( + (map { $_ => $_[HEAP]{$_} } qw| LocalAddr LocalPort PeerAddr PeerPort |), + InputEvent => 'receivepacket', + Filter => POE::Filter::Stream->new(), + ); + + # look for something to do + $_[KERNEL]->yield('check_anime'); } sub shutdown { undef $_[HEAP]{w}; + $_[KERNEL]->delay('check_anime'); $_[KERNEL]->delay('nextcmd'); $_[KERNEL]->delay('receivepacket'); } -sub cmd_anime { # cmd, arg - my @push; - if(!$_[ARG1]) { - # only animes we have never fetched, or haven't been updated for a month - my $q = $Multi::SQL->prepare(q| - SELECT id - FROM anime - WHERE lastfetch < ? - AND lastfetch <> -1|); - $q->execute(int(time-$_[HEAP]{cachetime})); - push @push, map $_->[0], @{$q->fetchall_arrayref([])}; - $_[KERNEL]->call(core => log => 2, 'All anime info is up-to-date!') if !@push; - } else { - push @push, $_[ARG1]; - } +sub check_anime { + return if $_[HEAP]{aid}; + $_[KERNEL]->delay('check_anime'); + $_[KERNEL]->post(pg => query => 'SELECT id FROM anime WHERE lastfetch IS NULL OR lastfetch < NOW() - ?::interval LIMIT 1', + [ $_[HEAP]{cachetime} ], 'fetch_anime'); +} - if(@push) { - my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666; - $s->lock(LOCK_EX); - my @q = $s{anime} ? @{$s{anime}} : (); - push @q, grep { - my $ia = $_; - !(scalar grep $ia == $_, @q) - } @push; - $s{anime} = \@q; - $s->unlock(); - } - $_[KERNEL]->post(core => finish => $_[ARG0]); +sub fetch_anime { # num, res + # nothing to do, check again later + return $_[KERNEL]->delay('check_anime', $_[HEAP]{check_delay}) if !$_[ARG0] == 0; + + # otherwise, fetch info (if we aren't doing so already) + return if $_[HEAP]{aid}; + $_[HEAP]{aid} = $_[ARG1][0]{id}; + $_[KERNEL]->yield('nextcmd'); } sub nextcmd { - return if $_[HEAP]{lm}; - - my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666; - my @q = $s{anime} ? @{$s{anime}} : (); - undef $s; - - if(!@q) { # nothing to do... - $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay}); - return; - } my %cmd; - - # not logged in, get a session + # not logged in, get a session if(!$_[HEAP]{s}) { %cmd = ( command => 'AUTH', @@ -164,21 +131,19 @@ sub nextcmd { clientver => $_[HEAP]{clientver}, enc => 'UTF-8', ); - $_[KERNEL]->call(core => log => 3, 'Authenticating with AniDB...'); + $_[KERNEL]->call(core => log => 'Authenticating with AniDB...'); } - - # logged in, get anime + # logged in, get anime else { - $_[HEAP]{aid} = $q[0]; %cmd = ( command => 'ANIME', - aid => $q[0], + aid => $_[HEAP]{aid}, acode => 3973121, # aid, ANN id, NFO id, year, type, romaji, kanji ); - $_[KERNEL]->call(core => log => 3, 'Fetching info for a%d', $q[0]); + $_[KERNEL]->call(core => log => 'Fetching info for a%d', $_[HEAP]{aid}); } - # send command + # send command my $cmd = delete $cmd{command}; $cmd{tag} = ++$_[HEAP]{tag}; $cmd{s} = $_[HEAP]{s} if $_[HEAP]{s}; @@ -189,111 +154,88 @@ sub nextcmd { } keys %cmd); $_[HEAP]{w}->put({ payload => [ $cmd ]}); - #$_[KERNEL]->call(core => log => 3, '> %s', $cmd); $_[KERNEL]->delay(receivepacket => $_[HEAP]{timeout}, { payload => [ $_[HEAP]{tag}.' 100 TIMEOUT' ] }); $_[HEAP]{lm} = time; } sub receivepacket { # input, wheelid - $_[KERNEL]->delay('receivepacket'); # disable the timeout + # parse message my @r = split /\n/, $_[ARG0]{payload}[0]; - my $delay = $_[HEAP]{msgdelay}; - my($tag, $code, $msg) = ($1, $2, $3) if $r[0] =~ /^([0-9]+) ([0-9]+) (.+)$/; - if(!grep $_ == $code, @expected_codes) { - $_[KERNEL]->call(core => log => 1, "Received an unexpected reply after %.2fs!\n < %s", - time-$_[HEAP]{lm}, join("\n < ", @r)); - } else { - $_[KERNEL]->call(core => log => 3, 'Received from AniDB after %.2fs: %d %s', - time-$_[HEAP]{lm}, $code, $msg); - } + # log + $_[KERNEL]->call(core => log => 'Received from AniDB after %.2fs: %d %s', + time-$_[HEAP]{lm}, $code, $msg); - # just handle anime data, even if the tag is not correct - if($code == ANIME) { - $_[KERNEL]->yield(updateanime => $_[HEAP]{aid}, $r[1]); - } + # tag incorrect, ignore message + return $_[KERNEL]->call(core => log => 'Ignoring incorrect tag') + if $tag != $_[HEAP]{tag}; - # tag incorrect, ignore message - if($tag != $_[HEAP]{tag}) { - $_[KERNEL]->call(core => log => 3, 'Ignoring incorrect tag') if $code != ANIME; - return; - } + # unhandled code, ignore as well + return $_[KERNEL]->call(core => log => 'Ignoring unhandled code') + if !grep $_ == $code, @handled_codes; + + # at this point, we have a message we can handle, so disable the timeout + $_[KERNEL]->delay('receivepacket'); + $_[HEAP]{lm} = 0; - # try again later + # received a timeout of some sorts, try again later if($code == TIMEOUT || $code == CLIENT_BANNED || $code == BANNED || $code == ANIDB_OUT_OF_SERVICE || $code == SERVER_BUSY) { $_[HEAP]{tm}++; - $delay = $_[HEAP]{msgdelay}**(1 + $_[HEAP]{tm}*$_[HEAP]{timeoutdelay}); + my $delay = $_[HEAP]{msgdelay}**(1 + $_[HEAP]{tm}*$_[HEAP]{timeoutdelay}); $delay = $_[HEAP]{maxtimeoutdelay} if $delay > $_[HEAP]{maxtimeoutdelay}; - $_[KERNEL]->call(core => log => 1, 'Delaying %.0fs.', $delay); + $_[KERNEL]->call(core => log => 'Delaying %.0fs.', $delay); + return $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay}); } - # oops, wrong id - if($code == NO_SUCH_ANIME) { - $_[KERNEL]->yield(updateanime => $_[HEAP]{aid}, 'notfound'); - } + # message wasn't a timeout, reset timeout counter + $_[HEAP]{tm} = 0; - # ok, we have a session now - if($code == LOGIN_ACCEPTED || $code == LOGIN_ACCEPTED_NEW_VER) { - $_[HEAP]{s} = $1 if $msg =~ /^\s*([a-zA-Z0-9]{4,8}) /; - } - - # oops, we should've logged in, get a new session + # our session isn't valid, discard it and call nextcmd to get a new one if($code == NOT_LOGGED_IN || $code == LOGIN_FIRST || $code == INVALID_SESSION) { $_[HEAP]{s} = ''; + return $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay}); } - $_[HEAP]{lm} = $_[HEAP]{aid} = 0; - $_[HEAP]{tm} = 0 if $delay == $_[HEAP]{msgdelay}; - $_[KERNEL]->delay(nextcmd => $delay); -} - - -sub updateanime { # aid, data|'notfound' - # aid, ANN id, NFO id, year, type, romaji, kanji, lastfetch - my @col = $_[ARG1] eq 'notfound' - ? ($_[ARG0], 0, 0, 0, 0, '', '', -1) - : (split(/\|/, $_[ARG1], 7), int time); + # we received a session ID, call nextcmd again to fetch anime info + if($code == LOGIN_ACCEPTED || $code == LOGIN_ACCEPTED_NEW_VER) { + $_[HEAP]{s} = $1 if $msg =~ /^\s*([a-zA-Z0-9]{4,8}) /; + return $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay}); + } - if($col[7] > 0) { + # we now know something about the anime we requested, update DB + if($code == NO_SUCH_ANIME) { + $_[KERNEL]->call(core => log => 'ERROR: No anime found with id = %d', $_[HEAP]{aid}); + $_[KERNEL]->post(pg => do => 'UPDATE anime SET lastfetch = NOW() WHERE id = ?', [ $_[HEAP]{aid} ]); + } else { + # aid, ANN id, NFO id, year, type, romaji, kanji + my @col = split(/\|/, $r[1], 7); for (@col) { $_ =~ s/
/\n/g; $_ =~ s/`/'/g; } + $col[1] = undef if !$col[1]; + $col[2] = undef if !$col[2] || $col[2] =~ /^0,/; $col[3] = $1 if $col[3] =~ /^([0-9]+)/; # remove multi-year stuff - for(0..$#{$VNDB::S{anime_types}}) { - $col[4] = $_ if lc($VNDB::S{anime_types}[$_][1]) eq lc($col[4]); - } - $col[4] = 0 if $col[4] !~ /^[0-9]+$/; - $col[2] = '' if $col[2] =~ /^0,/; + $col[3] = undef if !$col[3]; + $col[4] = (grep lc($VNDB::S{anime_types}[$_]) eq lc($col[4]), 0..$#{$VNDB::S{anime_types}})[0]; + $col[5] = undef if !$col[5]; + $col[6] = undef if !$col[6]; + $_[KERNEL]->post(pg => do => 'UPDATE anime + SET id = ?, ann_id = ?, nfo_id = ?, year = ?, type = ?, + title_romaji = ?,title_kanji = ?, lastfetch = NOW() + WHERE id = ?', + [ @col, $_[HEAP]{aid} ] + ); + $_[KERNEL]->call(core => log => 'Updated anime info for a%d', $_[HEAP]{aid}); + $_[KERNEL]->call(core => log => 'ERROR: a%d doesn\'t have a title or year!', $_[HEAP]{aid}) + if !$col[3] || !$col[5]; } - # try to UPDATE first - my $r = $Multi::SQL->do(q| - UPDATE anime - SET id = ?, ann_id = ?, nfo_id = ?, year = ?, type = ?, - title_romaji = ?, title_kanji = ?, lastfetch = ? - WHERE id = ?|, - undef, @col, $col[0]); - - # fall back to INSERT when nothing was updated - $Multi::SQL->do(q| - INSERT INTO anime - (id, ann_id, nfo_id, year, type, title_romaji, title_kanji, lastfetch) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)|, - undef, @col) if $r < 1; - - # remove from queue - my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666; - $s->lock(LOCK_EX); - my @q = grep $_ != $_[ARG0], ($s{anime} ? @{$s{anime}} : ()); - $s{anime} = \@q; - $s->unlock(); - - $col[7] > 0 - ? $_[KERNEL]->post(core => log => 2, 'Updated anime info for a%d', $col[0]) - : $_[KERNEL]->post(core => log => 1, 'Anime a%d not found!', $col[0]); + # this anime is handled, check for more + $_[HEAP]{aid} = 0; + $_[KERNEL]->delay(check_anime => $_[HEAP]{msgdelay}); } -- cgit v1.2.3