summaryrefslogtreecommitdiff
path: root/lib/Multi/Core.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Multi/Core.pm')
-rw-r--r--lib/Multi/Core.pm128
1 files changed, 128 insertions, 0 deletions
diff --git a/lib/Multi/Core.pm b/lib/Multi/Core.pm
new file mode 100644
index 00000000..1eea66ca
--- /dev/null
+++ b/lib/Multi/Core.pm
@@ -0,0 +1,128 @@
+
+#
+# Multi::Core - handles logging and the main command queue
+#
+
+package Multi::Core;
+
+use strict;
+use warnings;
+use POE;
+use Storable 'freeze', 'thaw';
+use IPC::ShareLite ':lock';
+use Time::HiRes 'time', 'gettimeofday', 'tv_interval'; # overload time()
+
+
+sub spawn {
+ my $p = shift;
+ POE::Session->create(
+ package_states => [
+ $p => [qw| _start register fetch queue execute finish log cmd_exit |],
+ ],
+ heap => { queue => [], cmds => [], running => 0, starttime => 0 },
+ );
+}
+
+
+sub _start {
+ $_[KERNEL]->alias_set('core');
+ $_[KERNEL]->call(core => register => qr/^(exit|reload)$/, 'cmd_exit');
+ $_[KERNEL]->yield(queue => $_) for (grep !/^-/, @ARGV);
+ $_[KERNEL]->yield(fetch => time) if $Multi::DAEMONIZE != 1;
+}
+
+
+sub register { # regex, state
+ push @{$_[HEAP]{cmds}}, [ $_[ARG0], $_[SENDER], $_[ARG1] ];
+ (my $p = $_[SENDER][2]{$_[CALLER_STATE]}[0]) =~ s/^Multi:://; # NOT PORTABLE
+ $_[KERNEL]->call(core => log => 3, "Command '%s' handled by %s::%s", $_[ARG0], $p, $_[ARG1]);
+}
+
+
+sub fetch { # lastfetch
+ my $s = IPC::ShareLite->new(-key => $VNDB::SHMKEY,-create => 1, -destroy => 0);
+ $s->lock(LOCK_SH);
+ my $l = $s->fetch();
+ if($l) {
+ my $cmds = thaw($l);
+ $_[KERNEL]->yield(queue => $_) for(@$cmds);
+ $s->lock(LOCK_EX);
+ $s->store('');
+ }
+ $s->unlock;
+ undef $s;
+
+ $_[KERNEL]->call(core => log => 1, 'Heartbeat took %.2fs, possible block', time-$_[ARG0])
+ if time > $_[ARG0]+3;
+ $_[KERNEL]->delay(fetch => 1, time) if $Multi::DAEMONIZE == 0;
+}
+
+
+sub queue { # cmd
+ push @{$_[HEAP]{queue}}, $_[ARG0];
+ $_[KERNEL]->call(core => log => 3, "Queuing '%s'. Queue size: %d", $_[ARG0], scalar @{$_[HEAP]{queue}});
+ if(!$_[HEAP]{running}) {
+ $_[KERNEL]->yield(execute => $_[ARG0]);
+ $_[HEAP]{running} = 1;
+ }
+}
+
+
+sub execute { # cmd
+ $_[HEAP]{starttime} = [ gettimeofday ];
+ my $cmd = (grep { $_[ARG0] =~ /$_->[0]/ } @{$_[HEAP]{cmds}})[0];
+ if(!$cmd) {
+ $_[KERNEL]->call(core => log => 1, 'Unknown cmd: %s', $_[ARG0]);
+ $_[KERNEL]->yield(finish => $_[ARG0]);
+ return;
+ }
+ $_[KERNEL]->call(core => log => 2, 'Executing cmd: %s', $_[ARG0]);
+ $_[ARG0] =~ /$cmd->[0]/; # determine arguments (see perlvar for the magic)
+ my @arg = $#- ? map { substr $_[ARG0], $-[$_], $+[$_]-$-[$_] } 1..$#- : ();
+ $_[KERNEL]->post($cmd->[1] => $cmd->[2], $_[ARG0], @arg);
+}
+
+
+sub finish { # cmd [, stop ]
+ $_[HEAP]{running} = 0;
+ $_[HEAP]{queue} = [ grep { $_ ne $_[ARG0] } @{$_[HEAP]{queue}} ];
+ $_[KERNEL]->call(core => log => 2, "Unqueuing '%s' after %.2fs. Queue size: %d",
+ $_[ARG0], tv_interval($_[HEAP]{starttime}), scalar @{$_[HEAP]{queue}});
+ if(@{$_[HEAP]{queue}} && !$_[ARG1]) {
+ $_[KERNEL]->yield(execute => $_[HEAP]{queue}[0]);
+ $_[HEAP]{running} = 1;
+ }
+}
+
+
+sub log { # level, msg
+ return if $_[ARG0] > $Multi::LOGLVL;
+ #open(my $F, \&STDOUT); #'>>', $Multi::LOGDIR.'/multi.log');
+ (my $p = $_[SENDER][2]{$_[CALLER_STATE]}[0]) =~ s/^Multi:://; # NOT PORTABLE
+ printf "[%s] (%s) %s::%s: %s\n", scalar localtime,
+ (qw|WRN ACT DBG|)[$_[ARG0]-1], $p, $_[CALLER_STATE],
+ $_[ARG2] ? sprintf($_[ARG1], @_[ARG2..$#_]) : $_[ARG1];
+ #close $F;
+}
+
+
+sub cmd_exit {
+ $Multi::RESTART = 1 if $_[ARG0] eq 'reload';
+ $_[KERNEL]->call(core => finish => $_[ARG0], 1);
+ $_[KERNEL]->call(core => log => 2, 'Exiting...');
+
+
+ my $s = IPC::ShareLite->new(-key => 'VNDB',-create => 1, -destroy => 0);
+ $s->lock(LOCK_EX);
+ $s->store(freeze($_[HEAP]->{queue}));
+ $s->unlock();
+ undef $s;
+
+ $_[KERNEL]->delay('fetch'); # This'll make the current session stop
+ $_[KERNEL]->signal($_[KERNEL], 'shutdown'); # Broadcast to other sessions
+}
+
+
+1;
+
+