summaryrefslogtreecommitdiff
path: root/lib/Multi/Core.pm
blob: 1eea66ca1dacc5e98f1ab4c008206711cd65780f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128

#
#  Multi::Core  -  handles logging and the main command queue
#

package Multi::Core;

use strict;
use warnings;
use POE;
use Storable 'freeze', 'thaw';
use IPC::ShareLite ':lock';
use Time::HiRes 'time', 'gettimeofday', 'tv_interval'; # overload time()


# Creates the POE session for this package. Every event listed below is
# handled by the sub of the same name in the package spawn() was called on.
# The heap starts out with an empty command queue, an empty list of
# registered commands, the "running" flag cleared and no start time.
sub spawn {
  my($class) = @_;
  my @states = qw| _start register fetch queue execute finish log cmd_exit |;
  my %heap = (queue => [], cmds => [], running => 0, starttime => 0);
  POE::Session->create(
    package_states => [ $class => \@states ],
    heap => \%heap,
  );
}


# Session start-up: claims the 'core' alias, registers the built-in
# exit/reload command, queues every command line argument that does not
# start with a dash and starts the fetch heartbeat unless
# $Multi::DAEMONIZE is 1.
sub _start {
  my $kernel = $_[KERNEL];
  $kernel->alias_set('core');
  $kernel->call(core => register => qr/^(exit|reload)$/, 'cmd_exit');
  foreach my $cmd (grep { !/^-/ } @ARGV) {
    $kernel->yield(queue => $cmd);
  }
  $kernel->yield(fetch => time) unless $Multi::DAEMONIZE == 1;
}


# Registers a command handler.
#   ARG0: regex a command has to match to be handled
#   ARG1: name of the state to post the command to; it is posted to the
#         session that sent this event
# The registration is also written to the log at debug level (3).
sub register { # regex, state
  my($kernel, $heap, $sender, $regex, $state) = @_[KERNEL, HEAP, SENDER, ARG0, ARG1];
  push @{$heap->{cmds}}, [ $regex, $sender, $state ];

  # NOT PORTABLE: this digs into the internals of the sender's session
  # object to find a name for the log line (presumably the package that
  # implements the calling state - verify against POE::Session).
  my $pkg = $sender->[2]{$_[CALLER_STATE]}[0];
  $pkg =~ s/^Multi:://;
  $kernel->call(core => log => 3, "Command '%s' handled by %s::%s", $regex, $pkg, $state);
}


# Heartbeat: moves commands from the shared memory segment into the queue.
#   ARG0: time at which this fetch was scheduled
# The segment is expected to hold either an empty string or a Storable
# image of an array of commands. Logs a warning when the event arrives more
# than 3 seconds after it was scheduled, and schedules the next fetch one
# second from now as long as $Multi::DAEMONIZE is 0.
sub fetch { # lastfetch
  my($kernel, $lastfetch) = @_[KERNEL, ARG0];

  my $s = IPC::ShareLite->new(-key => $VNDB::SHMKEY,-create => 1, -destroy => 0);
  # Reading and clearing have to happen under one exclusive lock. Reading
  # under a shared lock and switching to an exclusive one afterwards leaves
  # a window in which a writer can store commands that the clearing
  # store('') below would then wipe out.
  $s->lock(LOCK_EX);
  my $l = $s->fetch();
  if($l) {
    # thaw() may croak on data that is not a valid Storable image; don't
    # let that take the heartbeat down with it.
    my $cmds = eval { thaw($l) };
    if(ref($cmds) eq 'ARRAY') {
      $kernel->yield(queue => $_) for(@$cmds);
    } else {
      $kernel->call(core => log => 1, 'Discarding unreadable command data from shared memory');
    }
    # Clear the segment in both cases, otherwise unreadable data would be
    # re-read on every heartbeat.
    $s->store('');
  }
  $s->unlock;
  undef $s;

  $kernel->call(core => log => 1, 'Heartbeat took %.2fs, possible block', time-$lastfetch)
    if time > $lastfetch+3;
  $kernel->delay(fetch => 1, time) if $Multi::DAEMONIZE == 0;
}


# Appends a command to the queue.
#   ARG0: the command string
# When no command is marked as running, the new command is scheduled for
# execution right away and the running flag is set.
sub queue { # cmd
  my($kernel, $heap, $cmd) = @_[KERNEL, HEAP, ARG0];
  my $pending = $heap->{queue};
  push @$pending, $cmd;
  $kernel->call(core => log => 3, "Queuing '%s'. Queue size: %d", $cmd, scalar @$pending);

  return if $heap->{running};
  $kernel->yield(execute => $cmd);
  $heap->{running} = 1;
}


# Dispatches a command to the first registered handler whose regex matches.
#   ARG0: the command string
# The handler state receives the full command followed by the text of each
# capture group of its regex. A command nobody registered for is logged as
# a warning and finished immediately. Also records the start time that
# finish() uses to report the duration.
sub execute { # cmd
  my($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];
  $heap->{starttime} = [ gettimeofday ];

  # every entry is [ regex, session, state ], see register()
  my($handler) = grep { $input =~ /$_->[0]/ } @{$heap->{cmds}};
  unless($handler) {
    $kernel->call(core => log => 1, 'Unknown cmd: %s', $input);
    $kernel->yield(finish => $input);
    return;
  }
  my($regex, $session, $state) = @$handler;

  $kernel->call(core => log => 2, 'Executing cmd: %s', $input);

  # Match again in this scope so that @- and @+ (see perlvar) describe the
  # chosen regex, then cut the capture groups out of the command by offset.
  $input =~ /$regex/;
  my $last = $#-;
  my @arg;
  for my $n (1..$last) {
    push @arg, substr($input, $-[$n], $+[$n] - $-[$n]);
  }
  $kernel->post($session => $state, $input, @arg);
}


# Marks a command as done and moves on to the next one.
#   ARG0: the command that finished
#   ARG1: optional; when true, the next queued command is not started
# Note that every queue entry equal to the finished command is removed,
# not just the first one.
sub finish { # cmd [, stop ]
  my($kernel, $heap, $cmd, $stop) = @_[KERNEL, HEAP, ARG0, ARG1];
  $heap->{running} = 0;

  my @remaining = grep { $_ ne $cmd } @{$heap->{queue}};
  $heap->{queue} = \@remaining;
  $kernel->call(core => log => 2, "Unqueuing '%s' after %.2fs. Queue size: %d",
    $cmd, tv_interval($heap->{starttime}), scalar @remaining);

  return if $stop || !@remaining;
  $kernel->yield(execute => $remaining[0]);
  $heap->{running} = 1;
}


# Writes a log line to standard output.
#   ARG0: level; 1, 2 and 3 are printed as WRN, ACT and DBG. Messages with
#         a level above $Multi::LOGLVL are dropped.
#   ARG1: the message, or a sprintf format when more arguments follow
#   ARG2..: optional sprintf arguments
# The line is prefixed with the local time, the level and the name of the
# calling package and state.
sub log { # level, msg [, sprintf args ]
  my($level, $fmt) = @_[ARG0, ARG1];
  return if $level > $Multi::LOGLVL; 
  #open(my $F, \&STDOUT); #'>>', $Multi::LOGDIR.'/multi.log');
  (my $p = $_[SENDER][2]{$_[CALLER_STATE]}[0]) =~ s/^Multi:://; # NOT PORTABLE

  # Format whenever arguments were passed, not only when the first one is
  # true: a first argument of 0 or '' would otherwise leave the raw format
  # string with its %-placeholders in the log. A message without arguments
  # is printed verbatim, so it may safely contain '%'.
  my @args = @_[ARG2..$#_];
  my $msg = @args ? sprintf($fmt, @args) : $fmt;

  printf "[%s] (%s) %s::%s: %s\n", scalar localtime,
    (qw|WRN ACT DBG|)[$level-1], $p, $_[CALLER_STATE], $msg;
  #close $F;
}


# Handler for the 'exit' and 'reload' commands.
#   ARG0: the command string; 'reload' additionally sets $Multi::RESTART
# Finishes the command without starting the next one, saves the commands
# that have not been executed yet to shared memory and shuts down this
# session and, through the 'shutdown' signal, the others.
sub cmd_exit {
  my($kernel, $heap, $cmd) = @_[KERNEL, HEAP, ARG0];
  $Multi::RESTART = 1 if $cmd eq 'reload';
  $kernel->call(core => finish => $cmd, 1);
  $kernel->call(core => log => 2, 'Exiting...');

  # Use the same segment that fetch() reads from, so the saved queue can be
  # picked up again. Falls back to the previously hard-coded key when
  # $VNDB::SHMKEY is not set.
  my $key = defined $VNDB::SHMKEY ? $VNDB::SHMKEY : 'VNDB';
  my $s = IPC::ShareLite->new(-key => $key,-create => 1, -destroy => 0);
  $s->lock(LOCK_EX);

  # Commands that were written to the segment but not fetched yet must not
  # be overwritten: append them to the queue that is being saved.
  my $raw = $s->fetch();
  my $unfetched = $raw ? eval { thaw($raw) } : undef;
  my @save = @{$heap->{queue}};
  push @save, @$unfetched if ref($unfetched) eq 'ARRAY';

  $s->store(freeze(\@save));
  $s->unlock();
  undef $s;

  $kernel->delay('fetch');                 # This'll make the current session stop
  $kernel->signal($kernel, 'shutdown');    # Broadcast to other sessions
}


1;