summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorYorhel <git@yorhel.nl>2009-05-17 13:45:08 +0200
committerYorhel <git@yorhel.nl>2009-05-17 13:45:08 +0200
commit2eaee749b530098dd9a904609434919bb121b04a (patch)
tree8a953eb0d63b9ec8fe09ce652774122d59c82418 /lib
Initial commit
Diffstat (limited to 'lib')
-rw-r--r--lib/POE/Component/Pg.pm273
1 files changed, 273 insertions, 0 deletions
diff --git a/lib/POE/Component/Pg.pm b/lib/POE/Component/Pg.pm
new file mode 100644
index 0000000..adb06cb
--- /dev/null
+++ b/lib/POE/Component/Pg.pm
@@ -0,0 +1,273 @@
+
+
+package POE::Component::Pg;
+
+use strict;
+use POE;
+use DBI;
+use DBD::Pg ':async';
+use Data::Dumper 'Dumper';
+
+our $VERSION = '0.1';
+
+# General TODO list:
+# - Object interface
+# - Argument validation
+# - Test suite
+# - Documentation
+# - Transactions?
+# - Timeouts?
+
+
+sub QACT { 0 }
+sub QQUERY { 1 }
+sub QPARAM { 2 }
+sub QSESID { 3 }
+sub QEVENT { 4 }
+sub QARG { 5 }
+
+
+sub spawn {
+ my($class, %args) = @_;
+ return POE::Session->create(
+ package_states => [ $class => [qw|
+ _start register unregister connect listen query do addqueue shutdown process_queue dbi_canread dbi_canwrite
+ |]],
+ heap => {
+ %args,
+ dbi => undef,
+ events => [], # [ [ sess_id, local_event, remote_event ], .. ]
+ queue => [],
+ listen => [], # [ [ sess_id, pgsql_event, remote_event ], .. ]
+ state => 0, # 0=idle, 1=write, 2=read
+ shutdown => 0,
+ },
+ );
+}
+
+
+# non-POE helper function
+sub sendevent {
+ my($obj, $event, @arg) = @_;
+ $obj->[KERNEL]->post($_->[0], $_->[2], @arg)
+ for (grep $_->[1] eq $event, @{$obj->[HEAP]{events}});
+}
+
+
+sub _start {
+ $_[KERNEL]->alias_set($_[HEAP]{alias});
+}
+
+
+# arguments: event => callback, event => ..
+# events supported: error, connect
+sub register {
+ my $id = $_[SENDER]->ID;
+ my %events = @_[ARG0..$#_];
+ for my $e (keys %events) {
+ my($r) = grep $_->[0] == $id && $_->[1] eq $e, @{$_[HEAP]{events}};
+ if(!$r) {
+ push @{$_[HEAP]{events}}, [ $_[SENDER]->ID, $e, $events{$e} ];
+ $_[KERNEL]->refcount_increment($_[SENDER]->ID, 'P:C:PG');
+ } else {
+ $r->[2] = $events{$e};
+ }
+ }
+}
+
+
+# arguments: array of event names, no arguments to unregister all
+sub unregister {
+ my $id = $_[SENDER]->ID;
+ my @u = @_[ARG0..$#_];
+ $_[HEAP]{events} = [ grep {
+ my $n = $_->[1];
+ my $rm = $id == $_->[0] && (!@u || grep $n eq $_, @u);
+ $_[KERNEL]->refcount_decrement($id, 'P:C:PG') if $rm;
+ !$rm;
+ } @{$_[HEAP]{events}} ];
+}
+
+
+# WARNING: can block
+sub connect {
+ my %args = (
+ dsn => $_[HEAP]{dsn},
+ user => $_[HEAP]{user},
+ password => $_[HEAP]{password},
+ ref($_[ARG0]) eq 'HASH' ? %{$_[ARG0]} : ()
+ );
+
+ # (re)connect
+ eval {
+ $_[HEAP]{dbi}->disconnect if $_[HEAP]{dbi};
+ $_[HEAP]{dbi} = DBI->connect(
+ @args{qw|dsn user password|},
+ { RaiseError => 1, PrintError => 0, AutoCommit => 1, pg_enable_utf8 => 1 },
+ );
+ };
+ if($@) {
+ $_[HEAP]{dbi} = undef;
+ return sendevent \@_, 'error', 'connect', $@;
+ }
+
+ # put PgSQL's socket into POE's event loop, this enables us
+ # to receive NOTIFY events and SQL results without polling
+ open $_[HEAP]{fh}, '<&=', $_[HEAP]{dbi}->{pg_socket};
+ $_[KERNEL]->select_read($_[HEAP]{fh}, 'dbi_canread');
+ $_[KERNEL]->select_write($_[HEAP]{fh}, 'dbi_canwrite');
+ $_[KERNEL]->select_pause_write($_[HEAP]{fh});
+ $_[KERNEL]->call($_[SESSION], 'process_queue');
+ sendevent(\@_, 'connect') if $_[HEAP]{dbi};
+}
+
+
+sub listen {
+ my $id = $_[SENDER]->ID;
+ my %listen = @_[ARG0..$#_];
+ for my $e (keys %listen) {
+ my($r) = grep $_->[0] == $id && $_->[1] eq $e, @{$_[HEAP]{listen}};
+ if(!$r) {
+ push @{$_[HEAP]{listen}}, [ $_[SENDER]->ID, $e, $listen{$e} ];
+ $_[KERNEL]->refcount_increment($_[SENDER]->ID, 'P:C:PG');
+ } else {
+ $r->[2] = $listen{$e};
+ }
+ $_[KERNEL]->call($_[SESSION], 'do', query => "LISTEN $e");
+ }
+}
+
+
+sub unlisten {
+ my $id = $_[SENDER]->ID;
+ my @u = @_[ARG0..$#_];
+ $_[HEAP]{listen} = [ grep {
+ my $n = $_->[1];
+ my $rm = $id == $_->[0] && (!@u || grep $n eq $_, @u);
+ $_[KERNEL]->refcount_decrement($id, 'P:C:PG') if $rm;
+ !$rm;
+ } @{$_[HEAP]{listen}} ];
+ $_[KERNEL]->call($_[SESSION], 'do', query => "UNLISTEN $_->[1]")
+ for (grep { my $i = $_; !grep $_->[1] eq $i, @{$_[HEAP]{listen}} } @u);
+}
+
+
+# Arguments: hash of: query, params, event, args
+sub query {
+ $_[KERNEL]->call($_[SESSION], 'addqueue', @_[ARG0..$#_], sender => $_[SENDER]->ID, action => 'query');
+}
+
+
+# same as query, but doesn't try to fetch the rows
+sub do {
+ $_[KERNEL]->call($_[SESSION], 'addqueue', @_[ARG0..$#_], sender => $_[SENDER]->ID, action => 'do');
+}
+
+
+sub addqueue {
+ my %opt = @_[ARG0..$#_];
+
+ # add query to the queue
+ push @{$_[HEAP]{queue}}, [ $opt{action}, $opt{query}, $opt{params}, $opt{sender}, $opt{event}, $opt{args} ];
+ $_[KERNEL]->refcount_increment($opt{sender}, 'P:C:PG');
+
+ # if there's no query in progress, initiate query
+ if($_[HEAP]{state} == 0) {
+ $_[HEAP]{state} = 1;
+ $_[KERNEL]->select_resume_write($_[HEAP]{fh});
+ }
+}
+
+
+sub shutdown {
+ $_[HEAP]{shutdown} = 1;
+
+ return if @{$_[HEAP]{queue}} && ($_[ARG0]||'') ne 'NOW';
+
+ $_[KERNEL]->alias_remove($_[HEAP]{alias});
+ if($_[HEAP]{dbi}) {
+ $_[KERNEL]->select_read($_[HEAP]{fh});
+ $_[KERNEL]->select_write($_[HEAP]{fh});
+ $_[HEAP]{dbi}->disconnect;
+ $_[HEAP]{dbi} = undef;
+ }
+
+ $_[KERNEL]->refcount_decrement($_->[0], 'P:C:PG') for (@{$_[HEAP]{events}});
+ $_[KERNEL]->refcount_decrement($_->[0], 'P:C:PG') for (@{$_[HEAP]{listen}});
+ $_[KERNEL]->refcount_decrement($_->[QSESID], 'P:C:PG') for (@{$_[HEAP]{queue}});
+ $_[HEAP]{events} = $_[HEAP]{queue} = $_[HEAP]{listen} = [];
+}
+
+
+sub process_queue {
+ if(@{$_[HEAP]{queue}}) {
+ $_[HEAP]{state} = 1;
+ $_[KERNEL]->select_resume_write($_[HEAP]{fh});
+ } else {
+ $_[HEAP]{state} = 0;
+ $_[KERNEL]->call($_[SESSION], 'shutdown') if $_[HEAP]{shutdown};
+ }
+}
+
+
+sub dbi_canread {
+ # check for query results
+ if($_[HEAP]{state} == 2 && $_[HEAP]{dbi}->pg_ready) {
+ my $item = shift @{$_[HEAP]{queue}};
+
+ # fetch results
+ my($num, $res);
+ eval {
+ $num = $_[HEAP]{q}->pg_result();
+ $res = $_[HEAP]{q}->fetchall_arrayref({}) if $item->[QACT] eq 'query';
+ };
+ $_[HEAP]{q} = undef;
+
+ # send event
+ if($@) {
+ sendevent \@_, 'error', $item->[QACT], $@, $item->[QQUERY], $item->[QPARAM], $item->[QSESID], $item->[QARG];
+ } elsif($item->[QEVENT]) {
+ $_[KERNEL]->post($item->[QSESID], $item->[QEVENT], $num, $res, $item->[QARG] ? $item->[QARG] : ());
+ }
+ $_[KERNEL]->refcount_decrement($item->[QSESID], 'P:C:PG');
+
+ # execute next query in the queue, if any
+ $_[KERNEL]->call($_[SESSION], 'process_queue');
+ }
+
+ # check for any notifications
+ while(my $not = $_[HEAP]{dbi}->pg_notifies) {
+ $_[KERNEL]->post($_->[0], $_->[2], @$not)
+ for (grep $_->[1] eq $not->[0], @{$_[HEAP]{listen}});
+ }
+}
+
+
+sub dbi_canwrite {
+ # execute topmost query in the queue
+ my $item = $_[HEAP]{queue}[0];
+ eval {
+ $_[HEAP]{q} = $_[HEAP]{dbi}->prepare($item->[QQUERY], { pg_async => PG_ASYNC });
+ $_[HEAP]{q}->execute($item->[QPARAM] && ref($item->[QPARAM]) eq 'ARRAY' ? @{$item->[QPARAM]} : ());
+ };
+
+ # send error or enter read state
+ if($@) {
+ sendevent \@_, 'error', $item->[QACT], $@, $item->[QQUERY], $item->[QPARAM], $item->[QSESID], $item->[QARG];
+ $_[KERNEL]->call($_[SESSION], 'process_queue');
+ } else {
+ $_[HEAP]{state} = 2;
+ $_[KERNEL]->select_pause_write($_[HEAP]{fh});
+ }
+}
+
+
+1;
+
+__END__
+
+=head1 NAME
+
+POE::Component::Pg - Truely asynchronous interface to PostgreSQL
+
+