summaryrefslogtreecommitdiff
path: root/lib/Multi/Core.pm
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2014-10-22 13:05:11 +0200
committerYorhel <git@yorhel.nl>2014-10-22 13:05:11 +0200
commit4166a6a8813fbd074c41e6d456f4eef08d86cf01 (patch)
tree6e7615e0a143da569d9121f84f5908b0ecac18da /lib/Multi/Core.pm
parentbc07a24f87de5da7f12907d1486099766b7fa27b (diff)
Multi: Add pg_cmd function for more robust error handling
Diffstat (limited to 'lib/Multi/Core.pm')
-rw-r--r--lib/Multi/Core.pm43
1 files changed, 39 insertions, 4 deletions
diff --git a/lib/Multi/Core.pm b/lib/Multi/Core.pm
index 5bfa2a19..ac58403c 100644
--- a/lib/Multi/Core.pm
+++ b/lib/Multi/Core.pm
@@ -15,7 +15,7 @@ use DBI;
use POSIX 'setsid', 'pause', 'SIGUSR1';
use Exporter 'import';
-our @EXPORT = qw|pg pg_expect schedule push_watcher|;
+our @EXPORT = qw|pg pg_cmd pg_expect schedule push_watcher|;
my $PG;
@@ -76,6 +76,7 @@ sub load_pg {
my %vars = split /[,=]/, $dsn[4];
$PG = AnyEvent::Pg::Pool->new(
{%vars, user => $db[1], password => $db[2], host => 'localhost'},
+ timeout => 0, # Some maintenance queries can take a while to run...
on_error => sub { die "Lost connection to PostgreSQL\n"; },
on_connect_error => sub { die "Lost connection to PostgreSQL\n"; },
);
@@ -151,15 +152,49 @@ sub schedule {
# Logs any unexpected results and returns 0 if the expectations were met.
sub pg_expect {
my($res, $exp) = @_;
- return 0 if !$exp && $res->status == PGRES_COMMAND_OK;
- return 0 if $exp && $res->status == PGRES_TUPLES_OK;
- AE::log alert => $res->errorMessage
+ return 0 if !$exp && $res && $res->status == PGRES_COMMAND_OK;
+ return 0 if $exp && $res && $res->status == PGRES_TUPLES_OK;
+ AE::log alert => !$res
+ ? sprintf 'AnyEvent::Pg error at %s:%d', (caller)[0,2] : $res->errorMessage
? sprintf 'SQL error at %s:%d: %s', (caller)[0,2], $res->errorMessage
: sprintf 'Unexpected status at %s:%d: %s', (caller)[0,2], $res->statusMessage;
return 1;
}
+# Wrapper around pg->push_query().
+# Args: $query, \@args, sub {}
+# The sub will be called on either on_error or on_done, and has two args: The
+# result and the running time. Only a single on_result is expected. The result
+# argument is undef on error.
+# Unlike most AE watchers, this function does not return a watcher object and
+# can not be cancelled.
+sub pg_cmd {
+ my($q, $a, $s) = @_;
+ my $r;
+ my $w; $w = pg->push_query(
+ query => $q,
+ $a ? (args => $a) : (),
+ on_error => sub {
+ undef $w;
+ $s->(undef, 0);
+ },
+ on_result => sub {
+ if($r) {
+ AE::log warn => "Received more than one result for query: $q";
+ $s->(undef, 0);
+ } else {
+ $r = $_[2];
+ }
+ },
+ on_done => sub {
+ undef $w;
+ $s->($r, $_[2]);
+ },
+ );
+}
+
+
# Tiny class for forwarding output for STDERR/STDOUT to the log file using tie().
package Multi::Core::STDIO;