diff options
author | Yorhel <git@yorhel.nl> | 2009-05-17 13:45:08 +0200 |
---|---|---|
committer | Yorhel <git@yorhel.nl> | 2009-05-17 13:45:08 +0200 |
commit | 2eaee749b530098dd9a904609434919bb121b04a (patch) | |
tree | 8a953eb0d63b9ec8fe09ce652774122d59c82418 |
Initial commit
-rw-r--r-- | .gitignore | 5 | ||||
-rwxr-xr-x | Build.PL | 11 | ||||
-rw-r--r-- | lib/POE/Component/Pg.pm | 273 |
3 files changed, 289 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a952957 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +Build +MANIFEST* +META.yml +_build/ +Makefile.PL diff --git a/Build.PL b/Build.PL new file mode 100755 index 0000000..43ffda9 --- /dev/null +++ b/Build.PL @@ -0,0 +1,11 @@ +#!/usr/bin/perl + +use Module::Build; + +Module::Build->new( + module_name => 'POE::Component::Pg', + license => 'perl', + create_makefile_pl => 'passthrough', + dist_author => 'Yoran Heling <yorhel@cpan.org>', +)->create_build_script; + diff --git a/lib/POE/Component/Pg.pm b/lib/POE/Component/Pg.pm new file mode 100644 index 0000000..adb06cb --- /dev/null +++ b/lib/POE/Component/Pg.pm @@ -0,0 +1,273 @@ + + +package POE::Component::Pg; + +use strict; +use POE; +use DBI; +use DBD::Pg ':async'; +use Data::Dumper 'Dumper'; + +our $VERSION = '0.1'; + +# General TODO list: +# - Object interface +# - Argument validation +# - Test suite +# - Documentation +# - Transactions? +# - Timeouts? + + +sub QACT { 0 } +sub QQUERY { 1 } +sub QPARAM { 2 } +sub QSESID { 3 } +sub QEVENT { 4 } +sub QARG { 5 } + + +sub spawn { + my($class, %args) = @_; + return POE::Session->create( + package_states => [ $class => [qw| + _start register unregister connect listen query do addqueue shutdown process_queue dbi_canread dbi_canwrite + |]], + heap => { + %args, + dbi => undef, + events => [], # [ [ sess_id, local_event, remote_event ], .. ] + queue => [], + listen => [], # [ [ sess_id, pgsql_event, remote_event ], .. ] + state => 0, # 0=idle, 1=write, 2=read + shutdown => 0, + }, + ); +} + + +# non-POE helper function +sub sendevent { + my($obj, $event, @arg) = @_; + $obj->[KERNEL]->post($_->[0], $_->[2], @arg) + for (grep $_->[1] eq $event, @{$obj->[HEAP]{events}}); +} + + +sub _start { + $_[KERNEL]->alias_set($_[HEAP]{alias}); +} + + +# arguments: event => callback, event => .. +# events supported: error, connect +sub register { + my $id = $_[SENDER]->ID; + my %events = @_[ARG0..$#_]; + for my $e (keys %events) { + my($r) = grep $_->[0] == $id && $_->[1] eq $e, @{$_[HEAP]{events}}; + if(!$r) { + push @{$_[HEAP]{events}}, [ $_[SENDER]->ID, $e, $events{$e} ]; + $_[KERNEL]->refcount_increment($_[SENDER]->ID, 'P:C:PG'); + } else { + $r->[2] = $events{$e}; + } + } +} + + +# arguments: array of event names, no arguments to unregister all +sub unregister { + my $id = $_[SENDER]->ID; + my @u = @_[ARG0..$#_]; + $_[HEAP]{events} = [ grep { + my $n = $_->[1]; + my $rm = $id == $_->[0] && (!@u || grep $n eq $_, @u); + $_[KERNEL]->refcount_decrement($id, 'P:C:PG') if $rm; + !$rm; + } @{$_[HEAP]{events}} ]; +} + + +# WARNING: can block +sub connect { + my %args = ( + dsn => $_[HEAP]{dsn}, + user => $_[HEAP]{user}, + password => $_[HEAP]{password}, + ref($_[ARG0]) eq 'HASH' ? %{$_[ARG0]} : () + ); + + # (re)connect + eval { + $_[HEAP]{dbi}->disconnect if $_[HEAP]{dbi}; + $_[HEAP]{dbi} = DBI->connect( + @args{qw|dsn user password|}, + { RaiseError => 1, PrintError => 0, AutoCommit => 1, pg_enable_utf8 => 1 }, + ); + }; + if($@) { + $_[HEAP]{dbi} = undef; + return sendevent \@_, 'error', 'connect', $@; + } + + # put PgSQL's socket into POE's event loop, this enables us + # to receive NOTIFY events and SQL results without polling + open $_[HEAP]{fh}, '<&=', $_[HEAP]{dbi}->{pg_socket}; + $_[KERNEL]->select_read($_[HEAP]{fh}, 'dbi_canread'); + $_[KERNEL]->select_write($_[HEAP]{fh}, 'dbi_canwrite'); + $_[KERNEL]->select_pause_write($_[HEAP]{fh}); + $_[KERNEL]->call($_[SESSION], 'process_queue'); + sendevent(\@_, 'connect') if $_[HEAP]{dbi}; +} + + +sub listen { + my $id = $_[SENDER]->ID; + my %listen = @_[ARG0..$#_]; + for my $e (keys %listen) { + my($r) = grep $_->[0] == $id && $_->[1] eq $e, @{$_[HEAP]{listen}}; + if(!$r) { + push @{$_[HEAP]{listen}}, [ $_[SENDER]->ID, $e, $listen{$e} ]; + $_[KERNEL]->refcount_increment($_[SENDER]->ID, 'P:C:PG'); + } else { + $r->[2] = $listen{$e}; + } + $_[KERNEL]->call($_[SESSION], 'do', query => "LISTEN $e"); + } +} + + +sub unlisten { + my $id = $_[SENDER]->ID; + my @u = @_[ARG0..$#_]; + $_[HEAP]{listen} = [ grep { + my $n = $_->[1]; + my $rm = $id == $_->[0] && (!@u || grep $n eq $_, @u); + $_[KERNEL]->refcount_decrement($id, 'P:C:PG') if $rm; + !$rm; + } @{$_[HEAP]{listen}} ]; + $_[KERNEL]->call($_[SESSION], 'do', query => "UNLISTEN $_->[1]") + for (grep { my $i = $_; !grep $_->[1] eq $i, @{$_[HEAP]{listen}} } @u); +} + + +# Arguments: hash of: query, params, event, args +sub query { + $_[KERNEL]->call($_[SESSION], 'addqueue', @_[ARG0..$#_], sender => $_[SENDER]->ID, action => 'query'); +} + + +# same as query, but doesn't try to fetch the rows +sub do { + $_[KERNEL]->call($_[SESSION], 'addqueue', @_[ARG0..$#_], sender => $_[SENDER]->ID, action => 'do'); +} + + +sub addqueue { + my %opt = @_[ARG0..$#_]; + + # add query to the queue + push @{$_[HEAP]{queue}}, [ $opt{action}, $opt{query}, $opt{params}, $opt{sender}, $opt{event}, $opt{args} ]; + $_[KERNEL]->refcount_increment($opt{sender}, 'P:C:PG'); + + # if there's no query in progress, initiate query + if($_[HEAP]{state} == 0) { + $_[HEAP]{state} = 1; + $_[KERNEL]->select_resume_write($_[HEAP]{fh}); + } +} + + +sub shutdown { + $_[HEAP]{shutdown} = 1; + + return if @{$_[HEAP]{queue}} && ($_[ARG0]||'') ne 'NOW'; + + $_[KERNEL]->alias_remove($_[HEAP]{alias}); + if($_[HEAP]{dbi}) { + $_[KERNEL]->select_read($_[HEAP]{fh}); + $_[KERNEL]->select_write($_[HEAP]{fh}); + $_[HEAP]{dbi}->disconnect; + $_[HEAP]{dbi} = undef; + } + + $_[KERNEL]->refcount_decrement($_->[0], 'P:C:PG') for (@{$_[HEAP]{events}}); + $_[KERNEL]->refcount_decrement($_->[0], 'P:C:PG') for (@{$_[HEAP]{listen}}); + $_[KERNEL]->refcount_decrement($_->[QSESID], 'P:C:PG') for (@{$_[HEAP]{queue}}); + $_[HEAP]{events} = $_[HEAP]{queue} = $_[HEAP]{listen} = []; +} + + +sub process_queue { + if(@{$_[HEAP]{queue}}) { + $_[HEAP]{state} = 1; + $_[KERNEL]->select_resume_write($_[HEAP]{fh}); + } else { + $_[HEAP]{state} = 0; + $_[KERNEL]->call($_[SESSION], 'shutdown') if $_[HEAP]{shutdown}; + } +} + + +sub dbi_canread { + # check for query results + if($_[HEAP]{state} == 2 && $_[HEAP]{dbi}->pg_ready) { + my $item = shift @{$_[HEAP]{queue}}; + + # fetch results + my($num, $res); + eval { + $num = $_[HEAP]{q}->pg_result(); + $res = $_[HEAP]{q}->fetchall_arrayref({}) if $item->[QACT] eq 'query'; + }; + $_[HEAP]{q} = undef; + + # send event + if($@) { + sendevent \@_, 'error', $item->[QACT], $@, $item->[QQUERY], $item->[QPARAM], $item->[QSESID], $item->[QARG]; + } elsif($item->[QEVENT]) { + $_[KERNEL]->post($item->[QSESID], $item->[QEVENT], $num, $res, $item->[QARG] ? $item->[QARG] : ()); + } + $_[KERNEL]->refcount_decrement($item->[QSESID], 'P:C:PG'); + + # execute next query in the queue, if any + $_[KERNEL]->call($_[SESSION], 'process_queue'); + } + + # check for any notifications + while(my $not = $_[HEAP]{dbi}->pg_notifies) { + $_[KERNEL]->post($_->[0], $_->[2], @$not) + for (grep $_->[1] eq $not->[0], @{$_[HEAP]{listen}}); + } +} + + +sub dbi_canwrite { + # execute topmost query in the queue + my $item = $_[HEAP]{queue}[0]; + eval { + $_[HEAP]{q} = $_[HEAP]{dbi}->prepare($item->[QQUERY], { pg_async => PG_ASYNC }); + $_[HEAP]{q}->execute($item->[QPARAM] && ref($item->[QPARAM]) eq 'ARRAY' ? @{$item->[QPARAM]} : ()); + }; + + # send error or enter read state + if($@) { + sendevent \@_, 'error', $item->[QACT], $@, $item->[QQUERY], $item->[QPARAM], $item->[QSESID], $item->[QARG]; + $_[KERNEL]->call($_[SESSION], 'process_queue'); + } else { + $_[HEAP]{state} = 2; + $_[KERNEL]->select_pause_write($_[HEAP]{fh}); + } +} + + +1; + +__END__ + +=head1 NAME + +POE::Component::Pg - Truely asynchronous interface to PostgreSQL + + |