diff options
author | yorhel <yorhel@1fe2e327-d9db-4752-bcf7-ef0cb4a1748b> | 2008-04-27 06:41:52 +0000 |
---|---|---|
committer | yorhel <yorhel@1fe2e327-d9db-4752-bcf7-ef0cb4a1748b> | 2008-04-27 06:41:52 +0000 |
commit | 04c26425699a3e01d0e2abe52ad9f763396cd940 (patch) | |
tree | 4923a840ac27da9bc9bc322dec51d0504834dfe5 /lib/Multi | |
parent | a9e444bb39f5c356b66ef247b4539937daa25503 (diff) |
Made Multis queue persistent in shared memory
git-svn-id: svn://vndb.org/vndb@8 1fe2e327-d9db-4752-bcf7-ef0cb4a1748b
Diffstat (limited to 'lib/Multi')
-rw-r--r-- | lib/Multi/Core.pm | 82 | ||||
-rw-r--r-- | lib/Multi/IRC.pm | 2 |
2 files changed, 42 insertions, 42 deletions
diff --git a/lib/Multi/Core.pm b/lib/Multi/Core.pm index 6f3b452c..6f94f860 100644 --- a/lib/Multi/Core.pm +++ b/lib/Multi/Core.pm @@ -8,8 +8,7 @@ package Multi::Core; use strict; use warnings; use POE 'Component::Cron'; -use Storable 'freeze', 'thaw'; -use IPC::ShareLite ':lock'; +use Tie::ShareLite ':lock'; use Time::HiRes 'time', 'gettimeofday', 'tv_interval'; # overload time() use DateTime::Event::Cron; # bug in PoCo::Cron @@ -18,9 +17,9 @@ sub spawn { my $p = shift; POE::Session->create( package_states => [ - $p => [qw| _start register addcron fetch queue execute finish log cmd_exit |], + $p => [qw| _start register addcron heartbeat queue prepare execute finish log cmd_exit |], ], - heap => { cron => [], queue => [], cmds => [], running => 0, starttime => 0 }, + heap => { cron => [], cmds => [], running => 0, starttime => 0 }, ); } @@ -29,7 +28,8 @@ sub _start { $_[KERNEL]->alias_set('core'); $_[KERNEL]->call(core => register => qr/^(exit|reload)$/, 'cmd_exit'); $_[KERNEL]->yield(queue => $_) for (grep !/^-/, @ARGV); - $_[KERNEL]->yield(fetch => time) if $Multi::DAEMONIZE != 1; + $_[KERNEL]->yield(heartbeat => time) if $Multi::DAEMONIZE != 1; + $_[KERNEL]->yield('prepare'); } @@ -47,32 +47,36 @@ sub addcron { # cronline, cmd } -sub fetch { # lastfetch - my $s = IPC::ShareLite->new(-key => $VNDB::SHMKEY,-create => 1, -destroy => 0); - $s->lock(LOCK_SH); - my $l = $s->fetch(); - if($l) { - my $cmds = thaw($l); - $_[KERNEL]->yield(queue => $_) for(@$cmds); - $s->lock(LOCK_EX); - $s->store(''); - } - $s->unlock; - undef $s; - +sub heartbeat { # last beat + $_[KERNEL]->yield('prepare'); $_[KERNEL]->call(core => log => 1, 'Heartbeat took %.2fs, possible block', time-$_[ARG0]) if time > $_[ARG0]+3; - $_[KERNEL]->delay(fetch => 1, time) if $Multi::DAEMONIZE == 0; + $_[KERNEL]->delay(heartbeat => 1, time) if $Multi::DAEMONIZE == 0; } sub queue { # cmd - push @{$_[HEAP]{queue}}, $_[ARG0]; - $_[KERNEL]->call(core => log => 3, "Queuing '%s'. Queue size: %d", $_[ARG0], scalar @{$_[HEAP]{queue}}); - if(!$_[HEAP]{running}) { - $_[KERNEL]->yield(execute => $_[ARG0]); + my $s = tie my %s, 'Tie::ShareLite', @VNDB::SHMOPTS; + $s->lock(LOCK_EX); + my @q = ( ($s{queue} ? @{$s{queue}} : ()), $_[ARG0] ); + $s{queue} = \@q; + $s->unlock(); + + $_[KERNEL]->call(core => log => 3, "Queuing '%s'.", $_[ARG0]); + $_[KERNEL]->yield('prepare'); +} + + +sub prepare { # determines whether to execute a new cmd + return if $Multi::STOP || $_[HEAP]{running}; + + my $s = tie my %s, 'Tie::ShareLite', @VNDB::SHMOPTS; + $s->lock(LOCK_SH); + if($s{queue} && @{$s{queue}}) { + $_[KERNEL]->yield(execute => $s{queue}[0]); $_[HEAP]{running} = 1; } + $s->unlock(); } @@ -91,15 +95,18 @@ sub execute { # cmd } -sub finish { # cmd [, stop ] +sub finish { # cmd $_[HEAP]{running} = 0; - $_[HEAP]{queue} = [ grep { $_ ne $_[ARG0] } @{$_[HEAP]{queue}} ]; - $_[KERNEL]->call(core => log => 2, "Unqueuing '%s' after %.2fs. Queue size: %d", - $_[ARG0], tv_interval($_[HEAP]{starttime}), scalar @{$_[HEAP]{queue}}); - if(@{$_[HEAP]{queue}} && !$_[ARG1]) { - $_[KERNEL]->yield(execute => $_[HEAP]{queue}[0]); - $_[HEAP]{running} = 1; - } + $_[KERNEL]->call(core => log => 2, "Unqueuing '%s' after %.2fs.", + $_[ARG0], tv_interval($_[HEAP]{starttime})); + + my $s = tie my %s, 'Tie::ShareLite', @VNDB::SHMOPTS; + $s->lock(LOCK_EX); + my @q = grep { $_ ne $_[ARG0] } $s{queue} ? @{$s{queue}} : (); + $s{queue} = \@q; + $s->unlock(); + + $_[KERNEL]->yield('prepare'); } @@ -121,18 +128,11 @@ sub log { # level, msg sub cmd_exit { - $Multi::RESTART = 1 if $_[ARG0] eq 'reload'; - $_[KERNEL]->call(core => finish => $_[ARG0], 1); + $Multi::STOP = $_[ARG0] eq 'reload' ? 2 : 1; + $_[KERNEL]->call(core => finish => $_[ARG0]); $_[KERNEL]->call(core => log => 2, 'Exiting...'); - - my $s = IPC::ShareLite->new(-key => 'VNDB',-create => 1, -destroy => 0); - $s->lock(LOCK_EX); - $s->store(freeze($_[HEAP]->{queue})); - $s->unlock(); - undef $s; - - $_[KERNEL]->delay('fetch'); # stop fetching + $_[KERNEL]->delay('heartbeat'); # stop the heartbeats $_->delete() for (@{$_[HEAP]{cron}}); # stop scheduling cron jobs $_[KERNEL]->signal($_[KERNEL], 'shutdown'); # Broadcast to other sessions } diff --git a/lib/Multi/IRC.pm b/lib/Multi/IRC.pm index 75300419..4643a8fb 100644 --- a/lib/Multi/IRC.pm +++ b/lib/Multi/IRC.pm @@ -85,7 +85,7 @@ sub irc_001 { sub irc_public { if($_[ARG2] =~ /^!info/) { $_[KERNEL]->post(circ => privmsg => $_[ARG1][0], - 'Hello, I am HMX-12 Multi v'.$Multi::VERSION.' made by the great Yorhel! (Please ask Ayo for more info)'); + 'Hello, I am HMX-12 Multi v'.$VNDB::VERSION.' made by the great Yorhel! (Please ask Ayo for more info)'); } else { $_[KERNEL]->call(irc => vndbid => $_[ARG1][0], $_[ARG2]); } |