summaryrefslogtreecommitdiff
path: root/lib/Multi
diff options
context:
space:
mode:
authoryorhel <yorhel@1fe2e327-d9db-4752-bcf7-ef0cb4a1748b>2008-04-27 06:41:52 +0000
committeryorhel <yorhel@1fe2e327-d9db-4752-bcf7-ef0cb4a1748b>2008-04-27 06:41:52 +0000
commit04c26425699a3e01d0e2abe52ad9f763396cd940 (patch)
tree4923a840ac27da9bc9bc322dec51d0504834dfe5 /lib/Multi
parenta9e444bb39f5c356b66ef247b4539937daa25503 (diff)
Made Multis queue persistent in shared memory
git-svn-id: svn://vndb.org/vndb@8 1fe2e327-d9db-4752-bcf7-ef0cb4a1748b
Diffstat (limited to 'lib/Multi')
-rw-r--r--lib/Multi/Core.pm82
-rw-r--r--lib/Multi/IRC.pm2
2 files changed, 42 insertions, 42 deletions
diff --git a/lib/Multi/Core.pm b/lib/Multi/Core.pm
index 6f3b452c..6f94f860 100644
--- a/lib/Multi/Core.pm
+++ b/lib/Multi/Core.pm
@@ -8,8 +8,7 @@ package Multi::Core;
use strict;
use warnings;
use POE 'Component::Cron';
-use Storable 'freeze', 'thaw';
-use IPC::ShareLite ':lock';
+use Tie::ShareLite ':lock';
use Time::HiRes 'time', 'gettimeofday', 'tv_interval'; # overload time()
use DateTime::Event::Cron; # bug in PoCo::Cron
@@ -18,9 +17,9 @@ sub spawn {
my $p = shift;
POE::Session->create(
package_states => [
- $p => [qw| _start register addcron fetch queue execute finish log cmd_exit |],
+ $p => [qw| _start register addcron heartbeat queue prepare execute finish log cmd_exit |],
],
- heap => { cron => [], queue => [], cmds => [], running => 0, starttime => 0 },
+ heap => { cron => [], cmds => [], running => 0, starttime => 0 },
);
}
@@ -29,7 +28,8 @@ sub _start {
$_[KERNEL]->alias_set('core');
$_[KERNEL]->call(core => register => qr/^(exit|reload)$/, 'cmd_exit');
$_[KERNEL]->yield(queue => $_) for (grep !/^-/, @ARGV);
- $_[KERNEL]->yield(fetch => time) if $Multi::DAEMONIZE != 1;
+ $_[KERNEL]->yield(heartbeat => time) if $Multi::DAEMONIZE != 1;
+ $_[KERNEL]->yield('prepare');
}
@@ -47,32 +47,36 @@ sub addcron { # cronline, cmd
}
-sub fetch { # lastfetch
- my $s = IPC::ShareLite->new(-key => $VNDB::SHMKEY,-create => 1, -destroy => 0);
- $s->lock(LOCK_SH);
- my $l = $s->fetch();
- if($l) {
- my $cmds = thaw($l);
- $_[KERNEL]->yield(queue => $_) for(@$cmds);
- $s->lock(LOCK_EX);
- $s->store('');
- }
- $s->unlock;
- undef $s;
-
+sub heartbeat { # last beat
+ $_[KERNEL]->yield('prepare');
$_[KERNEL]->call(core => log => 1, 'Heartbeat took %.2fs, possible block', time-$_[ARG0])
if time > $_[ARG0]+3;
- $_[KERNEL]->delay(fetch => 1, time) if $Multi::DAEMONIZE == 0;
+ $_[KERNEL]->delay(heartbeat => 1, time) if $Multi::DAEMONIZE == 0;
}
sub queue { # cmd
- push @{$_[HEAP]{queue}}, $_[ARG0];
- $_[KERNEL]->call(core => log => 3, "Queuing '%s'. Queue size: %d", $_[ARG0], scalar @{$_[HEAP]{queue}});
- if(!$_[HEAP]{running}) {
- $_[KERNEL]->yield(execute => $_[ARG0]);
+ my $s = tie my %s, 'Tie::ShareLite', @VNDB::SHMOPTS;
+ $s->lock(LOCK_EX);
+ my @q = ( ($s{queue} ? @{$s{queue}} : ()), $_[ARG0] );
+ $s{queue} = \@q;
+ $s->unlock();
+
+ $_[KERNEL]->call(core => log => 3, "Queuing '%s'.", $_[ARG0]);
+ $_[KERNEL]->yield('prepare');
+}
+
+
+sub prepare { # determines whether to execute a new cmd
+ return if $Multi::STOP || $_[HEAP]{running};
+
+ my $s = tie my %s, 'Tie::ShareLite', @VNDB::SHMOPTS;
+ $s->lock(LOCK_SH);
+ if($s{queue} && @{$s{queue}}) {
+ $_[KERNEL]->yield(execute => $s{queue}[0]);
$_[HEAP]{running} = 1;
}
+ $s->unlock();
}
@@ -91,15 +95,18 @@ sub execute { # cmd
}
-sub finish { # cmd [, stop ]
+sub finish { # cmd
$_[HEAP]{running} = 0;
- $_[HEAP]{queue} = [ grep { $_ ne $_[ARG0] } @{$_[HEAP]{queue}} ];
- $_[KERNEL]->call(core => log => 2, "Unqueuing '%s' after %.2fs. Queue size: %d",
- $_[ARG0], tv_interval($_[HEAP]{starttime}), scalar @{$_[HEAP]{queue}});
- if(@{$_[HEAP]{queue}} && !$_[ARG1]) {
- $_[KERNEL]->yield(execute => $_[HEAP]{queue}[0]);
- $_[HEAP]{running} = 1;
- }
+ $_[KERNEL]->call(core => log => 2, "Unqueuing '%s' after %.2fs.",
+ $_[ARG0], tv_interval($_[HEAP]{starttime}));
+
+ my $s = tie my %s, 'Tie::ShareLite', @VNDB::SHMOPTS;
+ $s->lock(LOCK_EX);
+ my @q = grep { $_ ne $_[ARG0] } $s{queue} ? @{$s{queue}} : ();
+ $s{queue} = \@q;
+ $s->unlock();
+
+ $_[KERNEL]->yield('prepare');
}
@@ -121,18 +128,11 @@ sub log { # level, msg
sub cmd_exit {
- $Multi::RESTART = 1 if $_[ARG0] eq 'reload';
- $_[KERNEL]->call(core => finish => $_[ARG0], 1);
+ $Multi::STOP = $_[ARG0] eq 'reload' ? 2 : 1;
+ $_[KERNEL]->call(core => finish => $_[ARG0]);
$_[KERNEL]->call(core => log => 2, 'Exiting...');
-
- my $s = IPC::ShareLite->new(-key => 'VNDB',-create => 1, -destroy => 0);
- $s->lock(LOCK_EX);
- $s->store(freeze($_[HEAP]->{queue}));
- $s->unlock();
- undef $s;
-
- $_[KERNEL]->delay('fetch'); # stop fetching
+ $_[KERNEL]->delay('heartbeat'); # stop the heartbeats
$_->delete() for (@{$_[HEAP]{cron}}); # stop scheduling cron jobs
$_[KERNEL]->signal($_[KERNEL], 'shutdown'); # Broadcast to other sessions
}
diff --git a/lib/Multi/IRC.pm b/lib/Multi/IRC.pm
index 75300419..4643a8fb 100644
--- a/lib/Multi/IRC.pm
+++ b/lib/Multi/IRC.pm
@@ -85,7 +85,7 @@ sub irc_001 {
sub irc_public {
if($_[ARG2] =~ /^!info/) {
$_[KERNEL]->post(circ => privmsg => $_[ARG1][0],
- 'Hello, I am HMX-12 Multi v'.$Multi::VERSION.' made by the great Yorhel! (Please ask Ayo for more info)');
+ 'Hello, I am HMX-12 Multi v'.$VNDB::VERSION.' made by the great Yorhel! (Please ask Ayo for more info)');
} else {
$_[KERNEL]->call(irc => vndbid => $_[ARG1][0], $_[ARG2]);
}