diff options
author | Yorhel <git@yorhel.nl> | 2009-07-16 16:25:40 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2009-07-16 16:25:40 +0200 |
commit | 42734f7fb1b500c09745d45708630803a82ae2ad (patch) | |
tree | 9792b9efc521c7297005e16502645352d69c0eee /lib/Multi/Core.pm | |
parent | 9a07126464f4ca05f33e7375883ada66abe7dcf4 (diff) |
Made a start on the Multi-rewrite
Started on multi.pl and Multi::Core, the main differences:
- Uses POE::Component::Pg now (get it from http://g.blicky.net/poco-pg.git/)
- Doesn't use shared memory anymore
- No 'commands' anymore, every session has to handle its own events
(communication goes either through POE itself, or the PostgreSQL DB)
- No weird Cron stuff anymore
All other Multi modules will have to be updated/rewritten to reflect
these changes. None of them will work at the moment.
Diffstat (limited to 'lib/Multi/Core.pm')
-rw-r--r-- | lib/Multi/Core.pm | 138 |
1 files changed, 37 insertions, 101 deletions
diff --git a/lib/Multi/Core.pm b/lib/Multi/Core.pm index 7b2ae24b..0e17f894 100644 --- a/lib/Multi/Core.pm +++ b/lib/Multi/Core.pm @@ -7,134 +7,70 @@ package Multi::Core; use strict; use warnings; -use POE 'Component::Cron'; -use Tie::ShareLite ':lock'; -use Time::HiRes 'time', 'gettimeofday', 'tv_interval'; # overload time() -use DateTime::Event::Cron; # bug in PoCo::Cron (rt #35422, fixed in 0.019) +use POE; +use POE::Component::Pg; +use DBI; -sub spawn { +sub run { my $p = shift; + + # spawn our SQL handling session + my @db = @{$VNDB::O{db_login}}; + my(@dsn) = DBI->parse_dsn($db[0]); + $dsn[2] = ($dsn[2]?',':'').'pg_enable_utf8=>1'; + $db[0] = "$dsn[0]:$dsn[1]($dsn[2]):$dsn[4]"; + POE::Component::Pg->spawn(alias => 'pg', dsn => $db[0], user => $db[1], password => $db[2]); + + # spawn the core session (which only handles logging at this point) POE::Session->create( package_states => [ - $p => [qw| _start register addcron heartbeat queue prepare execute finish log cmd_exit |], + $p => [qw| _start log pg_error |], ], - heap => { cron => [], cmds => [], running => 0, starttime => 0 }, ); -} - - -sub _start { - $_[KERNEL]->alias_set('core'); - $_[KERNEL]->call(core => register => qr/^(exit|reload)$/, 'cmd_exit'); - $_[KERNEL]->yield(queue => $_) for (grep !/^-/, @ARGV); - $_[KERNEL]->yield(heartbeat => time) if $Multi::DAEMONIZE != 1; - $_[KERNEL]->yield('prepare'); -} - - -sub register { # regex, state - push @{$_[HEAP]{cmds}}, [ $_[ARG0], $_[SENDER], $_[ARG1] ]; - (my $p = $_[SENDER][2]{$_[CALLER_STATE]}[0]) =~ s/^Multi:://; # NOT PORTABLE - $_[KERNEL]->call(core => log => 3, "Command '%s' handled by %s::%s", $_[ARG0], $p, $_[ARG1]); -} - - -sub addcron { # cronline, cmd - return if $Multi::DAEMONIZE; # no cronjobs when we aren't a daemon! - push @{$_[HEAP]{cron}}, POE::Component::Cron->from_cron($_[ARG0], $_[SESSION], queue => $_[ARG1]); - $_[KERNEL]->call(core => log => 3, "Added cron: %s %s", $_[ARG0], $_[ARG1]); -} - - -sub heartbeat { # last beat - $_[KERNEL]->yield('prepare'); - $_[KERNEL]->call(core => log => 1, 'Heartbeat took %.2fs, possible block', time-$_[ARG0]) - if time > $_[ARG0]+3; - $_[KERNEL]->delay(heartbeat => 1, time) if $Multi::DAEMONIZE == 0; -} - - -sub queue { # cmd - my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666; - $s->lock(LOCK_EX); - my @q = ( ($s{queue} ? @{$s{queue}} : ()), $_[ARG0] ); - $s{queue} = \@q; - $s->unlock(); - - $_[KERNEL]->call(core => log => 3, "Queuing '%s'.", $_[ARG0]); - $_[KERNEL]->yield('prepare'); -} - -sub prepare { # determines whether to execute a new cmd - return if $Multi::STOP || $_[HEAP]{running}; - - my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666; - $s->lock(LOCK_SH); - if($s{queue} && @{$s{queue}}) { - $_[KERNEL]->yield(execute => $s{queue}[0]); - $_[HEAP]{running} = 1; + # dynamically load and spawn modules + for (keys %{$VNDB::M{modules}}) { + my($mod, $args) = ($_, $VNDB::M{modules}{$_}); + next if !$args || ref($args) ne 'HASH'; + require "Multi/$mod.pm"; + # I'm surprised the strict pagma isn't complaining about this + "Multi::$mod"->spawn(%$args); } - $s->unlock(); -} + # log warnings + $SIG{__WARN__} = sub {(local$_=shift)=~s/\r?\n//;$poe_kernel->call(core=>log=>'__WARN__: '.$_)}; -sub execute { # cmd - $_[HEAP]{starttime} = [ gettimeofday ]; - my $cmd = (grep { $_[ARG0] =~ /$_->[0]/ } @{$_[HEAP]{cmds}})[0]; - if(!$cmd) { - $_[KERNEL]->call(core => log => 1, 'Unknown cmd: %s', $_[ARG0]); - $_[KERNEL]->yield(finish => $_[ARG0]); - return; - } - $_[KERNEL]->call(core => log => 2, 'Executing cmd: %s', $_[ARG0]); - $_[ARG0] =~ /$cmd->[0]/; # determine arguments (see perlvar for the magic) - my @arg = $#- ? map { substr $_[ARG0], $-[$_], $+[$_]-$-[$_] } 1..$#- : (); - $_[KERNEL]->post($cmd->[1] => $cmd->[2], $_[ARG0], @arg); + $poe_kernel->run(); } -sub finish { # cmd - $_[HEAP]{running} = 0; - $_[KERNEL]->call(core => log => 2, "Unqueuing '%s' after %.2fs.", - $_[ARG0], tv_interval($_[HEAP]{starttime})); - - my $s = tie my %s, 'Tie::ShareLite', -key => $VNDB::S{sharedmem_key}, -create => 'yes', -destroy => 'no', -mode => 0666; - $s->lock(LOCK_EX); - my @q = grep { $_ ne $_[ARG0] } $s{queue} ? @{$s{queue}} : (); - $s{queue} = \@q; - $s->unlock(); - - $_[KERNEL]->yield('prepare'); +sub _start { + $_[KERNEL]->alias_set('core'); + $_[KERNEL]->post(pg => register => error => 'pg_error'); + $_[KERNEL]->post(pg => 'connect'); + $_[KERNEL]->call(core => log => 'Starting Multi '.$VNDB::S{version}); } sub log { # level, msg - return if $_[ARG0] > $VNDB::M{log_level}; - (my $p = eval { $_[SENDER][2]{$_[CALLER_STATE]}[0] } || '') =~ s/^Multi:://; - my $msg = sprintf '(%s) %s::%s: %s', - (qw|WRN ACT DBG|)[$_[ARG0]-1], $p, $_[CALLER_STATE], - $_[ARG2] ? sprintf($_[ARG1], @_[ARG2..$#_]) : $_[ARG1]; + my $msg = sprintf '%s::%s: %s', $p, $_[CALLER_STATE], + $_[ARG1] ? sprintf($_[ARG0], @_[ARG1..$#_]) : $_[ARG0]; open(my $F, '>>', $VNDB::M{log_dir}.'/multi.log'); - printf $F "[%s] %s\n", scalar localtime, $msg; + printf "[%s] %s\n", scalar localtime, $msg; close $F; } -sub cmd_exit { - $Multi::STOP = $_[ARG0] eq 'reload' ? 2 : 1; - $_[KERNEL]->call(core => finish => $_[ARG0]); - $_[KERNEL]->call(core => log => 2, 'Exiting...'); - - $_[KERNEL]->delay('heartbeat'); # stop the heartbeats - $_->delete() for (@{$_[HEAP]{cron}}); # stop scheduling cron jobs - $_[KERNEL]->signal($_[KERNEL], 'shutdown'); # Broadcast to other sessions +sub pg_error { # ARG: command, errmsg, [ query, params, orig_session, event-args ] + my $s = $_[ARG2] ? sprintf ' (Session: %s, Query: "%s", Params: %s, Args: %s)', + join(', ', $_[KERNEL]->alias_list($_[ARG4])), $_[ARG2], + join(', ', $_[ARG3] ? map qq|"$_"|, @{$_[ARG3]} : '[none]'), $_[ARG5] : ''; + $_[KERNEL]->call(core => log => 'SQL Error for command %s: %s %s', $_[ARG0], $_[ARG1], $s); } 1; - |