summaryrefslogtreecommitdiff
path: root/lib/Multi/Core.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Multi/Core.pm')
-rw-r--r--lib/Multi/Core.pm138
1 files changed, 37 insertions, 101 deletions
diff --git a/lib/Multi/Core.pm b/lib/Multi/Core.pm
index 7b2ae24b..0e17f894 100644
--- a/lib/Multi/Core.pm
+++ b/lib/Multi/Core.pm
@@ -7,134 +7,70 @@ package Multi::Core;
use strict;
use warnings;
-use POE 'Component::Cron';
-use Tie::ShareLite ':lock';
-use Time::HiRes 'time', 'gettimeofday', 'tv_interval'; # overload time()
-use DateTime::Event::Cron; # bug in PoCo::Cron (rt #35422, fixed in 0.019)
+use POE;
+use POE::Component::Pg;
+use DBI;
-sub spawn {
+sub run {
my $p = shift;
+
+ # spawn our SQL handling session
+ my @db = @{$VNDB::O{db_login}};
+ my(@dsn) = DBI->parse_dsn($db[0]);
+ $dsn[2] = ($dsn[2]?',':'').'pg_enable_utf8=>1';
+ $db[0] = "$dsn[0]:$dsn[1]($dsn[2]):$dsn[4]";
+ POE::Component::Pg->spawn(alias => 'pg', dsn => $db[0], user => $db[1], password => $db[2]);
+
+ # spawn the core session (which only handles logging at this point)
POE::Session->create(
package_states => [
- $p => [qw| _start register addcron heartbeat queue prepare execute finish log cmd_exit |],
+ $p => [qw| _start log pg_error |],
],
- heap => { cron => [], cmds => [], running => 0, starttime => 0 },
);
-}
-
-
-sub _start {
- $_[KERNEL]->alias_set('core');
- $_[KERNEL]->call(core => register => qr/^(exit|reload)$/, 'cmd_exit');
- $_[KERNEL]->yield(queue => $_) for (grep !/^-/, @ARGV);
- $_[KERNEL]->yield(heartbeat => time) if $Multi::DAEMONIZE != 1;
- $_[KERNEL]->yield('prepare');
-}
-
-
-sub register { # regex, state
- push @{$_[HEAP]{cmds}}, [ $_[ARG0], $_[SENDER], $_[ARG1] ];
- (my $p = $_[SENDER][2]{$_[CALLER_STATE]}[0]) =~ s/^Multi:://; # NOT PORTABLE
- $_[KERNEL]->call(core => log => 3, "Command '%s' handled by %s::%s", $_[ARG0], $p, $_[ARG1]);
-}
-
-
-sub addcron { # cronline, cmd
- return if $Multi::DAEMONIZE; # no cronjobs when we aren't a daemon!
- push @{$_[HEAP]{cron}}, POE::Component::Cron->from_cron($_[ARG0], $_[SESSION], queue => $_[ARG1]);
- $_[KERNEL]->call(core => log => 3, "Added cron: %s %s", $_[ARG0], $_[ARG1]);
-}
-
-
-sub heartbeat { # last beat
- $_[KERNEL]->yield('prepare');
- $_[KERNEL]->call(core => log => 1, 'Heartbeat took %.2fs, possible block', time-$_[ARG0])
- if time > $_[ARG0]+3;
- $_[KERNEL]->delay(heartbeat => 1, time) if $Multi::DAEMONIZE == 0;
-}
-
-
-sub queue { # cmd
- my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666;
- $s->lock(LOCK_EX);
- my @q = ( ($s{queue} ? @{$s{queue}} : ()), $_[ARG0] );
- $s{queue} = \@q;
- $s->unlock();
-
- $_[KERNEL]->call(core => log => 3, "Queuing '%s'.", $_[ARG0]);
- $_[KERNEL]->yield('prepare');
-}
-
-sub prepare { # determines whether to execute a new cmd
- return if $Multi::STOP || $_[HEAP]{running};
-
- my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666;
- $s->lock(LOCK_SH);
- if($s{queue} && @{$s{queue}}) {
- $_[KERNEL]->yield(execute => $s{queue}[0]);
- $_[HEAP]{running} = 1;
+ # dynamically load and spawn modules
+ for (keys %{$VNDB::M{modules}}) {
+ my($mod, $args) = ($_, $VNDB::M{modules}{$_});
+ next if !$args || ref($args) ne 'HASH';
+ require "Multi/$mod.pm";
+ # I'm surprised the strict pagma isn't complaining about this
+ "Multi::$mod"->spawn(%$args);
}
- $s->unlock();
-}
+ # log warnings
+ $SIG{__WARN__} = sub {(local$_=shift)=~s/\r?\n//;$poe_kernel->call(core=>log=>'__WARN__: '.$_)};
-sub execute { # cmd
- $_[HEAP]{starttime} = [ gettimeofday ];
- my $cmd = (grep { $_[ARG0] =~ /$_->[0]/ } @{$_[HEAP]{cmds}})[0];
- if(!$cmd) {
- $_[KERNEL]->call(core => log => 1, 'Unknown cmd: %s', $_[ARG0]);
- $_[KERNEL]->yield(finish => $_[ARG0]);
- return;
- }
- $_[KERNEL]->call(core => log => 2, 'Executing cmd: %s', $_[ARG0]);
- $_[ARG0] =~ /$cmd->[0]/; # determine arguments (see perlvar for the magic)
- my @arg = $#- ? map { substr $_[ARG0], $-[$_], $+[$_]-$-[$_] } 1..$#- : ();
- $_[KERNEL]->post($cmd->[1] => $cmd->[2], $_[ARG0], @arg);
+ $poe_kernel->run();
}
-sub finish { # cmd
- $_[HEAP]{running} = 0;
- $_[KERNEL]->call(core => log => 2, "Unqueuing '%s' after %.2fs.",
- $_[ARG0], tv_interval($_[HEAP]{starttime}));
-
- my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666;
- $s->lock(LOCK_EX);
- my @q = grep { $_ ne $_[ARG0] } $s{queue} ? @{$s{queue}} : ();
- $s{queue} = \@q;
- $s->unlock();
-
- $_[KERNEL]->yield('prepare');
+sub _start {
+ $_[KERNEL]->alias_set('core');
+ $_[KERNEL]->post(pg => register => error => 'pg_error');
+ $_[KERNEL]->post(pg => 'connect');
+ $_[KERNEL]->call(core => log => 'Starting Multi '.$VNDB::S{version});
}
sub log { # level, msg
- return if $_[ARG0] > $VNDB::M{log_level};
-
(my $p = eval { $_[SENDER][2]{$_[CALLER_STATE]}[0] } || '') =~ s/^Multi:://;
- my $msg = sprintf '(%s) %s::%s: %s',
- (qw|WRN ACT DBG|)[$_[ARG0]-1], $p, $_[CALLER_STATE],
- $_[ARG2] ? sprintf($_[ARG1], @_[ARG2..$#_]) : $_[ARG1];
+ my $msg = sprintf '%s::%s: %s', $p, $_[CALLER_STATE],
+ $_[ARG1] ? sprintf($_[ARG0], @_[ARG1..$#_]) : $_[ARG0];
open(my $F, '>>', $VNDB::M{log_dir}.'/multi.log');
- printf $F "[%s] %s\n", scalar localtime, $msg;
+ printf "[%s] %s\n", scalar localtime, $msg;
close $F;
}
-sub cmd_exit {
- $Multi::STOP = $_[ARG0] eq 'reload' ? 2 : 1;
- $_[KERNEL]->call(core => finish => $_[ARG0]);
- $_[KERNEL]->call(core => log => 2, 'Exiting...');
-
- $_[KERNEL]->delay('heartbeat'); # stop the heartbeats
- $_->delete() for (@{$_[HEAP]{cron}}); # stop scheduling cron jobs
- $_[KERNEL]->signal($_[KERNEL], 'shutdown'); # Broadcast to other sessions
+sub pg_error { # ARG: command, errmsg, [ query, params, orig_session, event-args ]
+ my $s = $_[ARG2] ? sprintf ' (Session: %s, Query: "%s", Params: %s, Args: %s)',
+ join(', ', $_[KERNEL]->alias_list($_[ARG4])), $_[ARG2],
+ join(', ', $_[ARG3] ? map qq|"$_"|, @{$_[ARG3]} : '[none]'), $_[ARG5] : '';
+ $_[KERNEL]->call(core => log => 'SQL Error for command %s: %s %s', $_[ARG0], $_[ARG1], $s);
}
1;
-