summaryrefslogtreecommitdiff
path: root/lib/Multi/Anime.pm
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2009-07-18 15:22:01 +0200
committerYorhel <git@yorhel.nl>2009-07-18 15:22:01 +0200
commit0a3ebf61cb7ea608f440029c2f138d97fb5a102a (patch)
treeb52f3de2204d5dff99dca77efba5f5fcffa6cac2 /lib/Multi/Anime.pm
parent8299ff15d3c1940377cb830c0ac4f3585cf228cf (diff)
Rewrote Multi::Anime
No shared memory! Asynchronous SQL! yay!
Diffstat (limited to 'lib/Multi/Anime.pm')
-rw-r--r--lib/Multi/Anime.pm266
1 files changed, 104 insertions, 162 deletions
diff --git a/lib/Multi/Anime.pm b/lib/Multi/Anime.pm
index 1215a4a1..b1d5ed3e 100644
--- a/lib/Multi/Anime.pm
+++ b/lib/Multi/Anime.pm
@@ -8,7 +8,6 @@ package Multi::Anime;
use strict;
use warnings;
use POE 'Wheel::UDP', 'Filter::Stream';
-use Tie::ShareLite ':lock';
use Socket 'inet_ntoa';
use Time::HiRes 'time';
@@ -26,48 +25,45 @@ sub BANNED () { 555 }
sub ANIDB_OUT_OF_SERVICE () { 601 }
sub SERVER_BUSY () { 602 }
-my @expected_codes = ( TIMEOUT, LOGIN_ACCEPTED, LOGIN_ACCEPTED_NEW_VER, ANIME, NO_SUCH_ANIME, NOT_LOGGED_IN, LOGIN_FIRST, INVALID_SESSION );
+my @handled_codes = (
+ TIMEOUT, LOGIN_ACCEPTED, LOGIN_ACCEPTED_NEW_VER, ANIME, NO_SUCH_ANIME, NOT_LOGGED_IN,
+ LOGIN_FIRST,CLIENT_BANNED, INVALID_SESSION, BANNED, ANIDB_OUT_OF_SERVICE, SERVER_BUSY
+);
-sub spawn {
- # The 'anime' command doesn't actually do anything, it just
- # adds IDs to process to the internal queue, which is seperate
- # from the global processing queue.
- # This module -only- fetches anime information in daemon mode!
- # Calling the anime command with an ID as argument will force
- # the information to be refreshed. This is not recommended,
- # just use 'anime' for normal usage.
+sub spawn {
my $p = shift;
+ my %o = @_;
+
+ die "No AniDB user/pass configured!" if !$o{user} || !$o{pass};
+
+ my $addr = delete($o{PeerAddr}) || 'api.anidb.info';
+ $addr = gethostbyname($addr) or die "Couldn't resolve domain";
+ $addr = inet_ntoa($addr);
+
POE::Session->create(
package_states => [
- $p => [qw| _start shutdown cmd_anime nextcmd receivepacket updateanime |],
+ $p => [qw| _start shutdown check_anime fetch_anime nextcmd receivepacket |],
],
heap => {
- # POE::Wheels::UDP options
+ # POE::Wheels::UDP options
LocalAddr => '0.0.0.0',
LocalPort => 9000,
- PeerAddr => do {
- if(!$Multi::DAEMONIZE) {
- my $a = gethostbyname('api.anidb.info');
- die "ERROR: Couldn't resolve domain" if !defined $a;
- inet_ntoa($a);
- } else {
- 0;
- }
- },
+ PeerAddr => $addr,
PeerPort => 9000,
- # AniDB UDP API options
+ # AniDB UDP API options
client => 'multi',
clientver => 1,
- # Misc settings
+ # Misc settings
msgdelay => 10,
timeout => 30,
timeoutdelay => 0.4, # $delay = $msgdelay ^ (1 + $tm*$timeoutdelay)
maxtimeoutdelay => 2*3600, # two hours
- cachetime => 30*24*3600, # one month
+ check_delay => 3600, # one hour
+ cachetime => '1 month',
- @_,
+ %o,
w => undef,
s => '', # session key, '' = not logged in
tm => 0, # number of repeated timeouts
@@ -81,79 +77,50 @@ sub spawn {
sub _start {
$_[KERNEL]->alias_set('anime');
- $_[KERNEL]->call(core => register => qr/^anime(?: ([0-9]+))?$/, 'cmd_anime');
-
- # check for anime twice a day
- $_[KERNEL]->post(core => addcron => '0 0,12 * * *', 'anime');
- $_[KERNEL]->sig('shutdown' => 'shutdown');
-
- if(!$Multi::DAEMONIZE) {
- # init the UDP 'connection'
- $_[HEAP]{w} = POE::Wheel::UDP->new(
- (map { $_ => $_[HEAP]{$_} } qw| LocalAddr LocalPort PeerAddr PeerPort |),
- InputEvent => 'receivepacket',
- Filter => POE::Filter::Stream->new(),
- );
+ $_[KERNEL]->sig(shutdown => 'shutdown');
- # start executing commands
- $_[KERNEL]->delay(nextcmd => 0); #$_[HEAP]{msgdelay});
- }
+ # init the UDP 'connection'
+ $_[HEAP]{w} = POE::Wheel::UDP->new(
+ (map { $_ => $_[HEAP]{$_} } qw| LocalAddr LocalPort PeerAddr PeerPort |),
+ InputEvent => 'receivepacket',
+ Filter => POE::Filter::Stream->new(),
+ );
+
+ # look for something to do
+ $_[KERNEL]->yield('check_anime');
}
sub shutdown {
undef $_[HEAP]{w};
+ $_[KERNEL]->delay('check_anime');
$_[KERNEL]->delay('nextcmd');
$_[KERNEL]->delay('receivepacket');
}
-sub cmd_anime { # cmd, arg
- my @push;
- if(!$_[ARG1]) {
- # only animes we have never fetched, or haven't been updated for a month
- my $q = $Multi::SQL->prepare(q|
- SELECT id
- FROM anime
- WHERE lastfetch < ?
- AND lastfetch <> -1|);
- $q->execute(int(time-$_[HEAP]{cachetime}));
- push @push, map $_->[0], @{$q->fetchall_arrayref([])};
- $_[KERNEL]->call(core => log => 2, 'All anime info is up-to-date!') if !@push;
- } else {
- push @push, $_[ARG1];
- }
+sub check_anime {
+ return if $_[HEAP]{aid};
+ $_[KERNEL]->delay('check_anime');
+ $_[KERNEL]->post(pg => query => 'SELECT id FROM anime WHERE lastfetch IS NULL OR lastfetch < NOW() - ?::interval LIMIT 1',
+ [ $_[HEAP]{cachetime} ], 'fetch_anime');
+}
- if(@push) {
- my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666;
- $s->lock(LOCK_EX);
- my @q = $s{anime} ? @{$s{anime}} : ();
- push @q, grep {
- my $ia = $_;
- !(scalar grep $ia == $_, @q)
- } @push;
- $s{anime} = \@q;
- $s->unlock();
- }
- $_[KERNEL]->post(core => finish => $_[ARG0]);
+sub fetch_anime { # num, res
+ # nothing to do, check again later
+ return $_[KERNEL]->delay('check_anime', $_[HEAP]{check_delay}) if !$_[ARG0] == 0;
+
+ # otherwise, fetch info (if we aren't doing so already)
+ return if $_[HEAP]{aid};
+ $_[HEAP]{aid} = $_[ARG1][0]{id};
+ $_[KERNEL]->yield('nextcmd');
}
sub nextcmd {
- return if $_[HEAP]{lm};
-
- my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666;
- my @q = $s{anime} ? @{$s{anime}} : ();
- undef $s;
-
- if(!@q) { # nothing to do...
- $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay});
- return;
- }
my %cmd;
-
- # not logged in, get a session
+ # not logged in, get a session
if(!$_[HEAP]{s}) {
%cmd = (
command => 'AUTH',
@@ -164,21 +131,19 @@ sub nextcmd {
clientver => $_[HEAP]{clientver},
enc => 'UTF-8',
);
- $_[KERNEL]->call(core => log => 3, 'Authenticating with AniDB...');
+ $_[KERNEL]->call(core => log => 'Authenticating with AniDB...');
}
-
- # logged in, get anime
+ # logged in, get anime
else {
- $_[HEAP]{aid} = $q[0];
%cmd = (
command => 'ANIME',
- aid => $q[0],
+ aid => $_[HEAP]{aid},
acode => 3973121, # aid, ANN id, NFO id, year, type, romaji, kanji
);
- $_[KERNEL]->call(core => log => 3, 'Fetching info for a%d', $q[0]);
+ $_[KERNEL]->call(core => log => 'Fetching info for a%d', $_[HEAP]{aid});
}
- # send command
+ # send command
my $cmd = delete $cmd{command};
$cmd{tag} = ++$_[HEAP]{tag};
$cmd{s} = $_[HEAP]{s} if $_[HEAP]{s};
@@ -189,111 +154,88 @@ sub nextcmd {
} keys %cmd);
$_[HEAP]{w}->put({ payload => [ $cmd ]});
- #$_[KERNEL]->call(core => log => 3, '> %s', $cmd);
$_[KERNEL]->delay(receivepacket => $_[HEAP]{timeout}, { payload => [ $_[HEAP]{tag}.' 100 TIMEOUT' ] });
$_[HEAP]{lm} = time;
}
sub receivepacket { # input, wheelid
- $_[KERNEL]->delay('receivepacket'); # disable the timeout
+ # parse message
my @r = split /\n/, $_[ARG0]{payload}[0];
- my $delay = $_[HEAP]{msgdelay};
-
my($tag, $code, $msg) = ($1, $2, $3) if $r[0] =~ /^([0-9]+) ([0-9]+) (.+)$/;
- if(!grep $_ == $code, @expected_codes) {
- $_[KERNEL]->call(core => log => 1, "Received an unexpected reply after %.2fs!\n < %s",
- time-$_[HEAP]{lm}, join("\n < ", @r));
- } else {
- $_[KERNEL]->call(core => log => 3, 'Received from AniDB after %.2fs: %d %s',
- time-$_[HEAP]{lm}, $code, $msg);
- }
+ # log
+ $_[KERNEL]->call(core => log => 'Received from AniDB after %.2fs: %d %s',
+ time-$_[HEAP]{lm}, $code, $msg);
- # just handle anime data, even if the tag is not correct
- if($code == ANIME) {
- $_[KERNEL]->yield(updateanime => $_[HEAP]{aid}, $r[1]);
- }
+ # tag incorrect, ignore message
+ return $_[KERNEL]->call(core => log => 'Ignoring incorrect tag')
+ if $tag != $_[HEAP]{tag};
- # tag incorrect, ignore message
- if($tag != $_[HEAP]{tag}) {
- $_[KERNEL]->call(core => log => 3, 'Ignoring incorrect tag') if $code != ANIME;
- return;
- }
+ # unhandled code, ignore as well
+ return $_[KERNEL]->call(core => log => 'Ignoring unhandled code')
+ if !grep $_ == $code, @handled_codes;
+
+ # at this point, we have a message we can handle, so disable the timeout
+ $_[KERNEL]->delay('receivepacket');
+ $_[HEAP]{lm} = 0;
- # try again later
+ # received a timeout of some sorts, try again later
if($code == TIMEOUT || $code == CLIENT_BANNED || $code == BANNED || $code == ANIDB_OUT_OF_SERVICE || $code == SERVER_BUSY) {
$_[HEAP]{tm}++;
- $delay = $_[HEAP]{msgdelay}**(1 + $_[HEAP]{tm}*$_[HEAP]{timeoutdelay});
+ my $delay = $_[HEAP]{msgdelay}**(1 + $_[HEAP]{tm}*$_[HEAP]{timeoutdelay});
$delay = $_[HEAP]{maxtimeoutdelay} if $delay > $_[HEAP]{maxtimeoutdelay};
- $_[KERNEL]->call(core => log => 1, 'Delaying %.0fs.', $delay);
+ $_[KERNEL]->call(core => log => 'Delaying %.0fs.', $delay);
+ return $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay});
}
- # oops, wrong id
- if($code == NO_SUCH_ANIME) {
- $_[KERNEL]->yield(updateanime => $_[HEAP]{aid}, 'notfound');
- }
+ # message wasn't a timeout, reset timeout counter
+ $_[HEAP]{tm} = 0;
- # ok, we have a session now
- if($code == LOGIN_ACCEPTED || $code == LOGIN_ACCEPTED_NEW_VER) {
- $_[HEAP]{s} = $1 if $msg =~ /^\s*([a-zA-Z0-9]{4,8}) /;
- }
-
- # oops, we should've logged in, get a new session
+ # our session isn't valid, discard it and call nextcmd to get a new one
if($code == NOT_LOGGED_IN || $code == LOGIN_FIRST || $code == INVALID_SESSION) {
$_[HEAP]{s} = '';
+ return $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay});
}
- $_[HEAP]{lm} = $_[HEAP]{aid} = 0;
- $_[HEAP]{tm} = 0 if $delay == $_[HEAP]{msgdelay};
- $_[KERNEL]->delay(nextcmd => $delay);
-}
-
-
-sub updateanime { # aid, data|'notfound'
- # aid, ANN id, NFO id, year, type, romaji, kanji, lastfetch
- my @col = $_[ARG1] eq 'notfound'
- ? ($_[ARG0], 0, 0, 0, 0, '', '', -1)
- : (split(/\|/, $_[ARG1], 7), int time);
+ # we received a session ID, call nextcmd again to fetch anime info
+ if($code == LOGIN_ACCEPTED || $code == LOGIN_ACCEPTED_NEW_VER) {
+ $_[HEAP]{s} = $1 if $msg =~ /^\s*([a-zA-Z0-9]{4,8}) /;
+ return $_[KERNEL]->delay(nextcmd => $_[HEAP]{msgdelay});
+ }
- if($col[7] > 0) {
+ # we now know something about the anime we requested, update DB
+ if($code == NO_SUCH_ANIME) {
+ $_[KERNEL]->call(core => log => 'ERROR: No anime found with id = %d', $_[HEAP]{aid});
+ $_[KERNEL]->post(pg => do => 'UPDATE anime SET lastfetch = NOW() WHERE id = ?', [ $_[HEAP]{aid} ]);
+ } else {
+ # aid, ANN id, NFO id, year, type, romaji, kanji
+ my @col = split(/\|/, $r[1], 7);
for (@col) {
$_ =~ s/<br \/>/\n/g;
$_ =~ s/`/'/g;
}
+ $col[1] = undef if !$col[1];
+ $col[2] = undef if !$col[2] || $col[2] =~ /^0,/;
$col[3] = $1 if $col[3] =~ /^([0-9]+)/; # remove multi-year stuff
- for(0..$#{$VNDB::S{anime_types}}) {
- $col[4] = $_ if lc($VNDB::S{anime_types}[$_][1]) eq lc($col[4]);
- }
- $col[4] = 0 if $col[4] !~ /^[0-9]+$/;
- $col[2] = '' if $col[2] =~ /^0,/;
+ $col[3] = undef if !$col[3];
+ $col[4] = (grep lc($VNDB::S{anime_types}[$_]) eq lc($col[4]), 0..$#{$VNDB::S{anime_types}})[0];
+ $col[5] = undef if !$col[5];
+ $col[6] = undef if !$col[6];
+ $_[KERNEL]->post(pg => do => 'UPDATE anime
+ SET id = ?, ann_id = ?, nfo_id = ?, year = ?, type = ?,
+ title_romaji = ?,title_kanji = ?, lastfetch = NOW()
+ WHERE id = ?',
+ [ @col, $_[HEAP]{aid} ]
+ );
+ $_[KERNEL]->call(core => log => 'Updated anime info for a%d', $_[HEAP]{aid});
+ $_[KERNEL]->call(core => log => 'ERROR: a%d doesn\'t have a title or year!', $_[HEAP]{aid})
+ if !$col[3] || !$col[5];
}
- # try to UPDATE first
- my $r = $Multi::SQL->do(q|
- UPDATE anime
- SET id = ?, ann_id = ?, nfo_id = ?, year = ?, type = ?,
- title_romaji = ?, title_kanji = ?, lastfetch = ?
- WHERE id = ?|,
- undef, @col, $col[0]);
-
- # fall back to INSERT when nothing was updated
- $Multi::SQL->do(q|
- INSERT INTO anime
- (id, ann_id, nfo_id, year, type, title_romaji, title_kanji, lastfetch)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?)|,
- undef, @col) if $r < 1;
-
- # remove from queue
- my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666;
- $s->lock(LOCK_EX);
- my @q = grep $_ != $_[ARG0], ($s{anime} ? @{$s{anime}} : ());
- $s{anime} = \@q;
- $s->unlock();
-
- $col[7] > 0
- ? $_[KERNEL]->post(core => log => 2, 'Updated anime info for a%d', $col[0])
- : $_[KERNEL]->post(core => log => 1, 'Anime a%d not found!', $col[0]);
+ # this anime is handled, check for more
+ $_[HEAP]{aid} = 0;
+ $_[KERNEL]->delay(check_anime => $_[HEAP]{msgdelay});
}