Sunday, 29 September 2013

How to do asynchronous WWW::Mechanize using AnyEvent

I've done a fair amount of research on this topic and, while there are
some related questions out there, I'm having a hard time understanding
how to do async programming properly with AnyEvent and WWW::Mechanize.
I'm trying to stick with Mechanize because it has a clean interface and
built-in functions for the things I need (getting all the images on a
page, for example). If there is no reliable way to do what I want, I'll
start looking at AnyEvent::HTTP, but I figured I'd ask first before
moving in that direction.
I'm a newbie to AnyEvent programming but have done a fair amount of Perl,
and of JavaScript/jQuery async calls with callbacks. Those make a lot of
sense to me, but it's just not clicking for me with AnyEvent + Mech.
Here is the code I'm working on, which pulls URLs from an upstream queue.
Given a URL, I want to do one get that finds all the images on the page,
and then asynchronously grab all of those images.
So pseudo-code would look something like this:
grab url from queue
get page
get all img url links
do many async calls on the img urls (store the imgs for example in a backend)
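To make the pseudo-code concrete, here is a rough, untested sketch of the shape I have in mind. It assumes Coro is installed, since (as far as I can tell) LWP::Protocol::AnyEvent::http is meant to be used from Coro threads. The names image_urls and fetch_page_images are hypothetical helpers of my own, not part of any module, and the "store in a backend" step is left out.

```perl
use strict;
use warnings;
use Coro;                            # provides async {} and ->join
use Coro::AnyEvent;                  # loaded explicitly so waits suspend only the current thread
use LWP::Protocol::AnyEvent::http;   # LWP HTTP backend that cooperates with the event loop
use WWW::Mechanize;

# Hypothetical helper: absolute, de-duplicated image URLs from a page
# that the given Mechanize object has already fetched.
sub image_urls {
    my ($mech) = @_;
    my %seen;
    return grep { !$seen{$_}++ } map { $_->url_abs->as_string } $mech->images;
}

# Hypothetical helper: fetch one page, then fetch each of its images in
# its own Coro thread. Returns one [url, status, byte count] per image.
sub fetch_page_images {
    my ($page_url) = @_;

    my $mech = WWW::Mechanize->new( autocheck => 0 );
    $mech->get($page_url);
    return unless $mech->success;

    # Start every image request before waiting on any of them.
    my @threads = map {
        my $url = $_;
        async {
            my $ua  = WWW::Mechanize->new( autocheck => 0 );
            my $res = $ua->get($url);
            return [ $url, $res->code, length( $res->content // '' ) ];
        };
    } image_urls($mech);

    # join waits for each thread and hands back its return value.
    return map { $_->join } @threads;
}
```

The point of the sketch is only the ordering: all the async blocks are created first and joined afterwards, instead of joining each one right after creating it.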
I've read (after researching the errors I got) that I cannot block in an
AnyEvent callback. How do I structure my program to do the async calls
without blocking?
I'm using LWP::Protocol::AnyEvent::http, which I don't fully understand
but which, from what I've gathered, magically makes LWP non-blocking?
The worker gets created like:
my $worker = Worker->new(upstream_job_url => URI->new('tcp://127.0.0.1:5555'),
run_on_create => 1);
The async part is sub _recv_msg, which calls _proc_msg.
I already have an AnyEvent loop watching the ZeroMQ socket, as per the
ZeroMQ Perl binding docs...
Any help much appreciated!
Code:
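package Worker;
use 5.12.0;
use Moose;
use AnyEvent;
use Coro;    # provides the async {} block and ->join used in _proc_msg
use LWP::Protocol::AnyEvent::http;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw/ZMQ_PUSH ZMQ_PULL ZMQ_POLLIN ZMQ_FD/;
use JSON;
use URI;     # used for the URI-typed attributes and URI->new below
use WWW::Mechanize;
use Carp;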
has 'max_children' => (
    is       => 'rw',
    isa      => 'Int',
    required => 1,
    default  => sub { 0 }
);
has 'upstream_job_url' => (
    is       => 'rw',
    isa      => 'URI',
    required => 1,
);
has ['uri','sink_url'] => (
    is       => 'rw',
    isa      => 'URI',
    required => 0,
);
has 'run_on_create' => (
    is       => 'rw',
    isa      => 'Bool',
    required => 1,
    default  => sub { 1 }
);
has '_receiver' => (
    is       => 'rw',
    isa      => 'ZMQ::LibZMQ3::Socket',
    required => 0
);
sub BUILD {
    my $self = shift;
    $self->start if $self->run_on_create;
}
sub start
{
    my $self = shift;
    $self->_init_zmq();
    # Watch the ZeroMQ socket's file descriptor for readability.
    my $fh = zmq_getsockopt( $self->_receiver, ZMQ_FD );
    my $w; $w = AnyEvent->io(
        fh   => $fh,
        poll => "r",
        cb   => sub { $self->_recv_msg },
    );
    # Run the event loop; this never returns.
    AnyEvent->condvar->recv;
}
sub _init_zmq
{
    my $self = shift;
    my $c = zmq_init() or die "zmq_init: $!\n";
    my $recv = zmq_socket($c, ZMQ_PULL) or die "zmq_socket: $!\n";
    if( zmq_connect($recv, $self->upstream_job_url) != 0 ) {
        croak "zmq_connect: $!\n";
    }
    $self->_receiver($recv);
}
sub _recv_msg
{
    my $self = shift;
    # zmq_recvmsg is called without flags here, so it blocks until a message arrives.
    while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
        my $msg = JSON::from_json($message, {utf8 => 1});
        $self->uri(URI->new($msg->{url}));
        $self->_proc_msg;
    }
}
sub _proc_msg
{
    my $self = shift;
    my $c = async {
        my $ua = WWW::Mechanize->new;
        $ua->protocols_allowed(['http']);
        print "$$ processing " . $self->uri->as_string . "... ";
        $ua->get($self->uri->as_string);
        if ($ua->success()) {
            say $ua->status . " OK";
        } else {
            say $ua->status . " NOT OK";
        }
    };
    # join waits for the thread to finish before returning.
    $c->join;
}
1;
