summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2009-07-16 16:25:40 +0200
committerYorhel <git@yorhel.nl>2009-07-16 16:25:40 +0200
commit42734f7fb1b500c09745d45708630803a82ae2ad (patch)
tree9792b9efc521c7297005e16502645352d69c0eee
parent9a07126464f4ca05f33e7375883ada66abe7dcf4 (diff)
Made a start on the Multi-rewrite
Started on multi.pl and Multi::Core, the main differences: - Uses POE::Component::Pg now (get it from http://g.blicky.net/poco-pg.git/) - Doesn't use shared memory anymore - No 'commands' anymore, every session has to handle its own events (communication goes either through POE itself, or the PostgreSQL DB) - No weird Cron stuff anymore All other Multi modules will have to be updated/rewritten to reflect these changes. None of them will work at the moment.
-rw-r--r--data/global.pl1
-rw-r--r--lib/Multi/Core.pm138
-rwxr-xr-xutil/multi.pl57
3 files changed, 38 insertions, 158 deletions
diff --git a/data/global.pl b/data/global.pl
index 7f3ab7a2..de7307cd 100644
--- a/data/global.pl
+++ b/data/global.pl
@@ -268,7 +268,6 @@ our %S = (%S,
# Multi-specific options (Multi also uses some options in %S and %O)
our %M = (
log_dir => $ROOT.'/data/log',
- log_level => 3, # 3: dbg, 2: wrn, 1: err
modules => {
RG => {},
Image => {},
diff --git a/lib/Multi/Core.pm b/lib/Multi/Core.pm
index 7b2ae24b..0e17f894 100644
--- a/lib/Multi/Core.pm
+++ b/lib/Multi/Core.pm
@@ -7,134 +7,70 @@ package Multi::Core;
use strict;
use warnings;
-use POE 'Component::Cron';
-use Tie::ShareLite ':lock';
-use Time::HiRes 'time', 'gettimeofday', 'tv_interval'; # overload time()
-use DateTime::Event::Cron; # bug in PoCo::Cron (rt #35422, fixed in 0.019)
+use POE;
+use POE::Component::Pg;
+use DBI;
-sub spawn {
+sub run {
my $p = shift;
+
+ # spawn our SQL handling session
+ my @db = @{$VNDB::O{db_login}};
+ my(@dsn) = DBI->parse_dsn($db[0]);
+ $dsn[2] = ($dsn[2]?',':'').'pg_enable_utf8=>1';
+ $db[0] = "$dsn[0]:$dsn[1]($dsn[2]):$dsn[4]";
+ POE::Component::Pg->spawn(alias => 'pg', dsn => $db[0], user => $db[1], password => $db[2]);
+
+ # spawn the core session (which only handles logging at this point)
POE::Session->create(
package_states => [
- $p => [qw| _start register addcron heartbeat queue prepare execute finish log cmd_exit |],
+ $p => [qw| _start log pg_error |],
],
- heap => { cron => [], cmds => [], running => 0, starttime => 0 },
);
-}
-
-
-sub _start {
- $_[KERNEL]->alias_set('core');
- $_[KERNEL]->call(core => register => qr/^(exit|reload)$/, 'cmd_exit');
- $_[KERNEL]->yield(queue => $_) for (grep !/^-/, @ARGV);
- $_[KERNEL]->yield(heartbeat => time) if $Multi::DAEMONIZE != 1;
- $_[KERNEL]->yield('prepare');
-}
-
-
-sub register { # regex, state
- push @{$_[HEAP]{cmds}}, [ $_[ARG0], $_[SENDER], $_[ARG1] ];
- (my $p = $_[SENDER][2]{$_[CALLER_STATE]}[0]) =~ s/^Multi:://; # NOT PORTABLE
- $_[KERNEL]->call(core => log => 3, "Command '%s' handled by %s::%s", $_[ARG0], $p, $_[ARG1]);
-}
-
-
-sub addcron { # cronline, cmd
- return if $Multi::DAEMONIZE; # no cronjobs when we aren't a daemon!
- push @{$_[HEAP]{cron}}, POE::Component::Cron->from_cron($_[ARG0], $_[SESSION], queue => $_[ARG1]);
- $_[KERNEL]->call(core => log => 3, "Added cron: %s %s", $_[ARG0], $_[ARG1]);
-}
-
-
-sub heartbeat { # last beat
- $_[KERNEL]->yield('prepare');
- $_[KERNEL]->call(core => log => 1, 'Heartbeat took %.2fs, possible block', time-$_[ARG0])
- if time > $_[ARG0]+3;
- $_[KERNEL]->delay(heartbeat => 1, time) if $Multi::DAEMONIZE == 0;
-}
-
-
-sub queue { # cmd
- my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666;
- $s->lock(LOCK_EX);
- my @q = ( ($s{queue} ? @{$s{queue}} : ()), $_[ARG0] );
- $s{queue} = \@q;
- $s->unlock();
-
- $_[KERNEL]->call(core => log => 3, "Queuing '%s'.", $_[ARG0]);
- $_[KERNEL]->yield('prepare');
-}
-
-sub prepare { # determines whether to execute a new cmd
- return if $Multi::STOP || $_[HEAP]{running};
-
- my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666;
- $s->lock(LOCK_SH);
- if($s{queue} && @{$s{queue}}) {
- $_[KERNEL]->yield(execute => $s{queue}[0]);
- $_[HEAP]{running} = 1;
+ # dynamically load and spawn modules
+ for (keys %{$VNDB::M{modules}}) {
+ my($mod, $args) = ($_, $VNDB::M{modules}{$_});
+ next if !$args || ref($args) ne 'HASH';
+ require "Multi/$mod.pm";
+ # I'm surprised the strict pagma isn't complaining about this
+ "Multi::$mod"->spawn(%$args);
}
- $s->unlock();
-}
+ # log warnings
+ $SIG{__WARN__} = sub {(local$_=shift)=~s/\r?\n//;$poe_kernel->call(core=>log=>'__WARN__: '.$_)};
-sub execute { # cmd
- $_[HEAP]{starttime} = [ gettimeofday ];
- my $cmd = (grep { $_[ARG0] =~ /$_->[0]/ } @{$_[HEAP]{cmds}})[0];
- if(!$cmd) {
- $_[KERNEL]->call(core => log => 1, 'Unknown cmd: %s', $_[ARG0]);
- $_[KERNEL]->yield(finish => $_[ARG0]);
- return;
- }
- $_[KERNEL]->call(core => log => 2, 'Executing cmd: %s', $_[ARG0]);
- $_[ARG0] =~ /$cmd->[0]/; # determine arguments (see perlvar for the magic)
- my @arg = $#- ? map { substr $_[ARG0], $-[$_], $+[$_]-$-[$_] } 1..$#- : ();
- $_[KERNEL]->post($cmd->[1] => $cmd->[2], $_[ARG0], @arg);
+ $poe_kernel->run();
}
-sub finish { # cmd
- $_[HEAP]{running} = 0;
- $_[KERNEL]->call(core => log => 2, "Unqueuing '%s' after %.2fs.",
- $_[ARG0], tv_interval($_[HEAP]{starttime}));
-
- my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666;
- $s->lock(LOCK_EX);
- my @q = grep { $_ ne $_[ARG0] } $s{queue} ? @{$s{queue}} : ();
- $s{queue} = \@q;
- $s->unlock();
-
- $_[KERNEL]->yield('prepare');
+sub _start {
+ $_[KERNEL]->alias_set('core');
+ $_[KERNEL]->post(pg => register => error => 'pg_error');
+ $_[KERNEL]->post(pg => 'connect');
+ $_[KERNEL]->call(core => log => 'Starting Multi '.$VNDB::S{version});
}
sub log { # level, msg
- return if $_[ARG0] > $VNDB::M{log_level};
-
(my $p = eval { $_[SENDER][2]{$_[CALLER_STATE]}[0] } || '') =~ s/^Multi:://;
- my $msg = sprintf '(%s) %s::%s: %s',
- (qw|WRN ACT DBG|)[$_[ARG0]-1], $p, $_[CALLER_STATE],
- $_[ARG2] ? sprintf($_[ARG1], @_[ARG2..$#_]) : $_[ARG1];
+ my $msg = sprintf '%s::%s: %s', $p, $_[CALLER_STATE],
+ $_[ARG1] ? sprintf($_[ARG0], @_[ARG1..$#_]) : $_[ARG0];
open(my $F, '>>', $VNDB::M{log_dir}.'/multi.log');
- printf $F "[%s] %s\n", scalar localtime, $msg;
+ printf "[%s] %s\n", scalar localtime, $msg;
close $F;
}
-sub cmd_exit {
- $Multi::STOP = $_[ARG0] eq 'reload' ? 2 : 1;
- $_[KERNEL]->call(core => finish => $_[ARG0]);
- $_[KERNEL]->call(core => log => 2, 'Exiting...');
-
- $_[KERNEL]->delay('heartbeat'); # stop the heartbeats
- $_->delete() for (@{$_[HEAP]{cron}}); # stop scheduling cron jobs
- $_[KERNEL]->signal($_[KERNEL], 'shutdown'); # Broadcast to other sessions
+sub pg_error { # ARG: command, errmsg, [ query, params, orig_session, event-args ]
+ my $s = $_[ARG2] ? sprintf ' (Session: %s, Query: "%s", Params: %s, Args: %s)',
+ join(', ', $_[KERNEL]->alias_list($_[ARG4])), $_[ARG2],
+ join(', ', $_[ARG3] ? map qq|"$_"|, @{$_[ARG3]} : '[none]'), $_[ARG5] : '';
+ $_[KERNEL]->call(core => log => 'SQL Error for command %s: %s %s', $_[ARG0], $_[ARG1], $s);
}
1;
-
diff --git a/util/multi.pl b/util/multi.pl
index e234534d..010951a2 100755
--- a/util/multi.pl
+++ b/util/multi.pl
@@ -1,36 +1,17 @@
#!/usr/bin/perl
-# Usage:
-# ./multi.pl [-c] [-s] [-a] [cmd1] [cmd2] ..
-# -c Do not daemonize, just execute the commands specified
-# on the command line and exit.
-# -s Same as -c, but also execute commands in the shared
-# memory processing queue.
-# -a Don't do anything, just add the commands specified on
-# the command line to the shared memory processing queue.
-
#
# Multi - core namespace for initialisation and global variables
#
-# NOTE: in case of errors, clearing the shared memory might work:
-# $ ipcrm -S 0x42444e56 -M 0x42444e56
-
package Multi;
use strict;
use warnings;
-no warnings 'once';
-use Tie::ShareLite ':lock';
-use Time::HiRes;
-use POE;
-use DBI;
use Cwd 'abs_path';
-# loading & initialization
-
our $ROOT;
BEGIN { ($ROOT = abs_path $0) =~ s{/util/multi\.pl$}{}; *VNDB::ROOT = \$ROOT }
use lib $VNDB::ROOT.'/lib';
@@ -38,42 +19,6 @@ use lib $VNDB::ROOT.'/lib';
use Multi::Core;
require $VNDB::ROOT.'/data/global.pl';
-our $STOP = 0;
-our $DAEMONIZE = (grep /^-c$/, @ARGV) ? 1 : (grep /^-s$/, @ARGV) ? 2 : 0;
-
-
-
-# only add commands with the -a argument
-
-if(grep /^-a$/, @ARGV) {
- my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666;
- $s->lock(LOCK_EX);
- my @q = ( ($s{queue} ? @{$s{queue}} : ()), (grep !/^-/, @ARGV) );
- $s{queue} = \@q;
- $s->unlock();
- exit;
-}
-
-# one shared pgsql connection for all sessions
-our $SQL = DBI->connect(@{$VNDB::O{db_login}},
- { PrintError => 1, RaiseError => 0, AutoCommit => 1, pg_enable_utf8 => 1 });
-
-
-Multi::Core->spawn();
-
-# dynamically load and spawn modules
-for (keys %{$VNDB::M{modules}}) {
- my($mod, $args) = ($_, $VNDB::M{modules}{$_});
- next if !$args || ref($args) ne 'HASH';
- require "Multi/$mod.pm";
- # I'm surprised the strict pagma isn't complaining about this
- "Multi::$mod"->spawn(%$args);
-}
-
-$SIG{__WARN__} = sub {(local$_=shift)=~s/\r?\n//;$poe_kernel->call(core=>log=>1,'__WARN__: '.$_)};
-
-$poe_kernel->run();
-exec $0, grep /^-/, @ARGV if $STOP == 2;
-
+Multi::Core->run();