package POE::Component::Pg; use strict; use POE; our $VERSION = '0.1'; # --------------------- # C O N S T R U C T O R sub spawn { my($class, %args) = @_; my $self = bless { %args, dbi => undef, events => [], # [ [ sess_id, local_event, remote_event ], .. ] queue => [], listen => [], # [ [ sess_id, pgsql_event, remote_event ], .. ] state => 0, # 0=idle, 1=write, 2=read shutdown => 0, }, $class; my $s = POE::Session->create( package_states => [ $class.'::STATES' => [qw| _start register unregister connect listen query do addqueue shutdown process_queue dbi_canread dbi_canwrite |]], heap => $self, ) or return undef; return $self; } # ------------- # M E T H O D S sub dbi_handle { return shift->{dbi}; } sub session_id { return shift->{session_id}; } sub yield { $poe_kernel->post(shift->session_id, @_); } sub call { $poe_kernel->call(shift->session_id, @_); } # --------- # I N P U T package POE::Component::Pg::STATES; use POE; use DBI; use DBD::Pg ':async'; sub QACT { 0 } sub QSESID { 1 } sub QQUERY { 2 } sub QPARAM { 3 } sub QEVENT { 4 } sub QARG { 5 } # non-POE helper function sub sendevent { my($obj, $event, @arg) = @_; $obj->[KERNEL]->post($_->[0], $_->[2], @arg) for (grep $_->[1] eq $event, @{$obj->[HEAP]{events}}); } sub _start { $_[KERNEL]->alias_set($_[HEAP]{alias}); $_[HEAP]{session_id} = $_[SESSION]->ID(); } # arguments: event => callback, event => .. # events supported: error, connect sub register { my $id = $_[SENDER]->ID; my %events = @_[ARG0..$#_]; for my $e (keys %events) { my($r) = grep $_->[0] == $id && $_->[1] eq $e, @{$_[HEAP]{events}}; if(!$r) { push @{$_[HEAP]{events}}, [ $_[SENDER]->ID, $e, $events{$e} ]; $_[KERNEL]->refcount_increment($_[SENDER]->ID, 'P:C:PG'); } else { $r->[2] = $events{$e}; } } } # arguments: array of event names, no arguments to unregister all sub unregister { my $id = $_[SENDER]->ID; my @u = @_[ARG0..$#_]; $_[HEAP]{events} = [ grep { my $n = $_->[1]; my $rm = $id == $_->[0] && (!@u || grep $n eq $_, @u); $_[KERNEL]->refcount_decrement($id, 'P:C:PG') if $rm; !$rm; } @{$_[HEAP]{events}} ]; } # WARNING: can block sub connect { my %args = ( dsn => $_[HEAP]{dsn}, user => $_[HEAP]{user}, password => $_[HEAP]{password}, ref($_[ARG0]) eq 'HASH' ? %{$_[ARG0]} : () ); # (re)connect eval { $_[HEAP]{dbi}->disconnect if $_[HEAP]{dbi}; $_[HEAP]{dbi} = DBI->connect( @args{qw|dsn user password|}, { RaiseError => 1, PrintError => 0, AutoCommit => 1 }, ); }; if($@) { $_[HEAP]{dbi} = undef; return sendevent \@_, 'error', 'connect', $@; } # put PgSQL's socket into POE's event loop, this enables us # to receive NOTIFY events and SQL results without polling open $_[HEAP]{fh}, '<&=', $_[HEAP]{dbi}->{pg_socket}; $_[KERNEL]->select_read($_[HEAP]{fh}, 'dbi_canread'); $_[KERNEL]->select_write($_[HEAP]{fh}, 'dbi_canwrite'); $_[KERNEL]->select_pause_write($_[HEAP]{fh}); $_[KERNEL]->call($_[SESSION], 'process_queue'); sendevent(\@_, 'connect') if $_[HEAP]{dbi}; } sub listen { my $id = $_[SENDER]->ID; my %listen = @_[ARG0..$#_]; for my $e (keys %listen) { my($r) = grep $_->[0] == $id && $_->[1] eq $e, @{$_[HEAP]{listen}}; if(!$r) { push @{$_[HEAP]{listen}}, [ $id, $e, $listen{$e} ]; $_[KERNEL]->refcount_increment($id, 'P:C:PG'); } else { $r->[2] = $listen{$e}; } $_[KERNEL]->call($_[SESSION], 'do', "LISTEN $e"); } } sub unlisten { my $id = $_[SENDER]->ID; my @u = @_[ARG0..$#_]; $_[HEAP]{listen} = [ grep { my $n = $_->[1]; my $rm = $id == $_->[0] && (!@u || grep $n eq $_, @u); $_[KERNEL]->refcount_decrement($id, 'P:C:PG') if $rm; !$rm; } @{$_[HEAP]{listen}} ]; $_[KERNEL]->call($_[SESSION], 'do', "UNLISTEN $_->[1]") for (grep { my $i = $_; !grep $_->[1] eq $i, @{$_[HEAP]{listen}} } @u); } # Arguments: query, params, event, args sub query { $_[KERNEL]->call($_[SESSION], 'addqueue', [ 'query', $_[SENDER]->ID, @_[ARG0..$#_] ]); } # same as query, but doesn't try to fetch the rows sub do { $_[KERNEL]->call($_[SESSION], 'addqueue', [ 'do', $_[SENDER]->ID, @_[ARG0..$#_] ]); } sub addqueue { # add query to the queue push @{$_[HEAP]{queue}}, $_[ARG0]; $_[KERNEL]->refcount_increment($_[ARG0][QSESID], 'P:C:PG'); # if there's no query in progress, initiate query if($_[HEAP]{state} == 0) { $_[HEAP]{state} = 1; $_[KERNEL]->select_resume_write($_[HEAP]{fh}); } } sub shutdown { $_[HEAP]{shutdown} = 1; return if @{$_[HEAP]{queue}} && ($_[ARG0]||'') ne 'NOW'; $_[KERNEL]->alias_remove($_[HEAP]{alias}); if($_[HEAP]{dbi}) { $_[KERNEL]->select_read($_[HEAP]{fh}); $_[KERNEL]->select_write($_[HEAP]{fh}); $_[HEAP]{dbi}->disconnect; $_[HEAP]{dbi} = undef; } $_[KERNEL]->refcount_decrement($_->[0], 'P:C:PG') for (@{$_[HEAP]{events}}); $_[KERNEL]->refcount_decrement($_->[0], 'P:C:PG') for (@{$_[HEAP]{listen}}); $_[KERNEL]->refcount_decrement($_->[QSESID], 'P:C:PG') for (@{$_[HEAP]{queue}}); $_[HEAP]{events} = $_[HEAP]{queue} = $_[HEAP]{listen} = []; } sub process_queue { if(@{$_[HEAP]{queue}}) { $_[HEAP]{state} = 1; $_[KERNEL]->select_resume_write($_[HEAP]{fh}); } else { $_[HEAP]{state} = 0; $_[KERNEL]->call($_[SESSION], 'shutdown') if $_[HEAP]{shutdown}; } } sub dbi_canread { # check for query results if($_[HEAP]{state} == 2 && $_[HEAP]{dbi}->pg_ready) { my $item = shift @{$_[HEAP]{queue}}; # fetch results my($num, $res); eval { $num = $_[HEAP]{q}->pg_result(); $res = $_[HEAP]{q}->fetchall_arrayref({}) if $item->[QACT] eq 'query'; }; $_[HEAP]{q} = undef; # send event if($@) { sendevent \@_, 'error', $item->[QACT], $@, $item->[QQUERY], $item->[QPARAM], $item->[QSESID], $item->[QARG]; } elsif($item->[QEVENT]) { $_[KERNEL]->post($item->[QSESID], $item->[QEVENT], $num, $res, exists $item->[QARG] ? $item->[QARG] : ()); } $_[KERNEL]->refcount_decrement($item->[QSESID], 'P:C:PG'); # execute next query in the queue, if any $_[KERNEL]->call($_[SESSION], 'process_queue'); } # check for any notifications while(my $not = $_[HEAP]{dbi}->pg_notifies) { $_[KERNEL]->post($_->[0], $_->[2], @$not) for (grep $_->[1] eq $not->[0], @{$_[HEAP]{listen}}); } } sub dbi_canwrite { # execute topmost query in the queue my $item = $_[HEAP]{queue}[0]; eval { $_[HEAP]{q} = $_[HEAP]{dbi}->prepare($item->[QQUERY], { pg_async => PG_ASYNC }); $_[HEAP]{q}->execute($item->[QPARAM] && ref($item->[QPARAM]) eq 'ARRAY' ? @{$item->[QPARAM]} : ()); }; # send error or enter read state if($@) { sendevent \@_, 'error', $item->[QACT], $@, $item->[QQUERY], $item->[QPARAM], $item->[QSESID], $item->[QARG]; $_[KERNEL]->call($_[SESSION], 'process_queue'); } else { $_[HEAP]{state} = 2; $_[KERNEL]->select_pause_write($_[HEAP]{fh}); } } 1; __END__ =head1 NAME POE::Component::Pg - Truly asynchronous interface to PostgreSQL