i have a web crawler using POE::Component::Client::HTTP that has been
working fine to suck down non-small binary files (pdf, ps, tar.gz,
etc.). i tried to eek out a little more performance by using IO::AIO,
but now about 5% of the downloads are corrupt. they are the correct
size as indicated by the Content-Length header, but the contents are
invalid data. so i'm guessing that the response data is sometimes
being written out of order.
i know PerlBal does something similar with IO::AIO, so i'm doubting
that aio threads are messing the order. can anybody see a heisenbug
in my code below? thanks.
sub ua_make_request {
my ($kernel, $heap, $session) = @_[ KERNEL, HEAP, SESSION ];
return if $heap->{is_shutdown};
if ( @{ $heap->{ids} } < $MAX_PARALLEL * 2 ) {
$kernel->yield('dbi_get_urls');
}
my $url= shift @{ $heap->{urls} };
# Check later.
unless ($url) {
$kernel->delay_set( ua_make_request => 60 );
return;
}
# construct HTTP::Request and set $file and $path
...
aio_mkdir( $path, 0777, sub {
my $ret = $_[0];
if ( $ret and not $!{EEXIST} ) {
printf STDERR "%s: $!\n", $path;
return $kernel->post( $session, ua_cleanup_request =>
$req, $ret );
}
my $file = sprintf "%s.tmp", $heap->{req}{$req}{file};
aio_open( $file, O_WRONLY|O_CREAT|O_TRUNC, 0644, sub {
unless ( $heap->{req}{$req}{fh} = $_[0] ) {
printf STDERR "%s: $!\n", $file;
return $kernel->post( $session, ua_cleanup_request =>
$req, -1 );
}
$kernel->post(
ua => request => ua_got_response => $req => "$req",
undef,
$proxy->proxy
);
});
});
}
sub ua_got_response {
my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
my ($req, $res, $data) = ( $_[ARG0]->[0], @{ $_[ARG1] } );
# Might have previously cancelled the request.
my $href = $heap->{req}{$req} or return;
if ($res->is_error) {
...
}
elsif (defined $data) {
my ($fh, $file, $got) = @$href{qw(fh file got)};
my $len = do { use bytes; length $data };
$href->{is_writing} = 1;
# Only need to do these tests on the initial response.
if ($got == 0) {
...
}
aio_write( $fh, $got, $len, $data, 0, sub {
my $put = $_[0];
if ($put < 0) {
printf STDERR "Failed to write to %s: $!\n", $file;
$kernel->call( $session, ua_cleanup_request => $req,
-1 );
}
elsif ($put != $len) {
printf STDERR "Only wrote %d / %d bytes for %s\n",
$put, $len,
$file;
$kernel->call( $session, ua_cleanup_request => $req,
-1 );
}
else {
$href->{got} += $len;
$href->{is_writing} = 0;
$href->{response} = $res;
# Call after writing each chunk to avoid a race
condition when
# the last chunk is scheduled to be written, but the
last empty
# response is received before the writing is finished.
$kernel->yield( ua_finish_response => $res );
}
});
}
else {
$href->{is_done} = 1;
$kernel->yield( ua_finish_response => $res );
}
}
sub ua_finish_response {
my ($kernel, $heap, $res) = @_[ KERNEL, HEAP, ARG0 ];
my ($req, $ret) = ( $res->request, -1 );
my $href = $heap->{req}{$req};
return if not $href->{is_done} or $href->{is_writing};
# Prevent race condition.
$href->{is_writing} = 1;
if ($res->is_error) {
...
}
else {
# rename the file
}
}
|