#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2008-2019 -- leonerd@leonerd.org.uk

package Net::Async::HTTP::Connection;

use strict;
use warnings;

our $VERSION = '0.48';

use Carp;

use base qw( IO::Async::Stream );
IO::Async::Stream->VERSION( '0.59' ); # ->write( ..., on_write )

use Net::Async::HTTP::StallTimer;

use HTTP::Response;

my $CRLF = "\x0d\x0a"; # More portable than \r\n

# Per-request state kept in $self->{request_queue}:
#   req         - the HTTP::Request being performed
#   on_read     - CODE ref currently responsible for consuming response bytes
#   stall_timer - optional Net::Async::HTTP::StallTimer for this request
#   resp_header - parsed response header, once received
#   resp_bytes  - running count of response bytes consumed
#   on_done     - optional callback invoked with the context on completion
#   is_done     - true once the response has been fully handled
#   f           - the Future returned to the caller of ->request
use Struct::Dumb;
struct RequestContext => [qw( req on_read stall_timer resp_header resp_bytes on_done is_done f )],
   named_constructor => 1;

# Detect whether HTTP::Message properly trims whitespace in header values. If
# it doesn't, we have to deploy a workaround to fix them up.
#   https://rt.cpan.org/Ticket/Display.html?id=75224
use constant HTTP_MESSAGE_TRIMS_LWS => HTTP::Message->parse( "Name: value " )->header("Name") eq "value";

=head1 NAME

C<Net::Async::HTTP::Connection> - HTTP client protocol handler

=head1 DESCRIPTION

This class provides a connection to a single HTTP server, and is used
internally by L<Net::Async::HTTP>. It is not intended for general use.

=cut

# Object initialiser: defers to the superclass and starts the in-flight
# request counter at zero.
sub _init
{
   my $self = shift;
   $self->SUPER::_init( @_ );

   $self->{requests_in_flight} = 0;
}

# Stores the connection-specific parameters on the object and passes the
# remainder up to the superclass. A supplied on_closed handler is wrapped so
# that every outstanding request is failed, and the ready queue dropped,
# before the caller's handler runs.
# Croaks if max_in_flight has never been given (zero is acceptable).
sub configure
{
   my $self = shift;
   my %params = @_;

   foreach (qw( pipeline max_in_flight ready_queue decode_content is_proxy )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
   }

   if( my $on_closed = $params{on_closed} ) {
      $params{on_closed} = sub {
         my $self = shift;

         $self->debug_printf( "CLOSED in-flight=$self->{requests_in_flight}" );

         $self->error_all( "Connection closed" );

         undef $self->{ready_queue};
         $on_closed->( $self );
      };
   }

   croak "max_in_flight parameter required, may be zero" unless defined $self->{max_in_flight};

   $self->SUPER::configure( %params );
}

# True when another request may be written before earlier responses arrive:
# pipelining must be enabled, the server must have been seen to support it,
# and the in-flight count must be below max_in_flight (a false max_in_flight
# imposes no limit).
sub should_pipeline
{
   my $self = shift;
   return $self->{pipeline} &&
          $self->{can_pipeline} &&
          ( !$self->{max_in_flight} || $self->{requests_in_flight} < $self->{max_in_flight} );
}

# Connects the underlying stream socket, passing %args through to the
# superclass. Dies if called in void context, since the result of the
# superclass call must not be discarded.
sub connect
{
   my $self = shift;
   my %args = @_;

   $self->debug_printf( "CONNECT $args{host}:$args{service}" );

   defined wantarray or die "VOID ->connect";

   $self->SUPER::connect(
      socktype => "stream",
      %args
   )->on_done( sub {
      $self->debug_printf( "CONNECTED" );
   });
}

# Hands this connection to waiters on the ready queue. When pipelining is
# possible, waiters are served for as long as should_pipeline allows;
# otherwise, if the connection is idle, only the first non-cancelled waiter
# is served. Any connection attempt still pending for a served waiter is
# cancelled.
sub ready
{
   my $self = shift;

   my $queue = $self->{ready_queue} or return;

   if( $self->should_pipeline ) {
      $self->debug_printf( "READY pipelined" );
      while( @$queue && $self->should_pipeline ) {
         my $ready = shift @$queue;
         my $f = $ready->future;
         next if $f->is_cancelled;

         $ready->connecting and $ready->connecting->cancel;

         $f->done( $self );
      }
   }
   elsif( @$queue and $self->is_idle ) {
      $self->debug_printf( "READY non-pipelined" );
      while( @$queue ) {
         my $ready = shift @$queue;
         my $f = $ready->future;
         next if $f->is_cancelled;

         $ready->connecting and $ready->connecting->cancel;

         $f->done( $self );

         # Only one request at a time when not pipelining
         last;
      }
   }
   else {
      $self->debug_printf( "READY cannot-run queue=%d idle=%s",
         scalar @$queue, $self->is_idle ? "Y" : "N");
   }
}

# True when no requests are in flight on this connection.
sub is_idle
{
   my $self = shift;
   return $self->{requests_in_flight} == 0;
}

# Stream read handler. Dispatches incoming bytes to the on_read handler of
# the request at the head of the queue. The per-request handler returns:
#   a defined non-reference - returned as-is to the stream
#   a CODE reference        - becomes that request's new handler; this
#                             method then returns 1
#   undef                   - the request is finished (it must have been
#                             marked done) and is removed from the queue
sub on_read
{
   my $self = shift;
   my ( $buffref, $closed ) = @_;

   while( my $head = $self->{request_queue}[0]) {
      # Discard contexts that were already completed
      shift @{ $self->{request_queue} } and next if $head->is_done;

      $head->stall_timer->reset if $head->stall_timer;

      my $ret = $head->on_read->( $self, $buffref, $closed, $head );

      if( defined $ret ) {
         return $ret if !ref $ret;

         $head->on_read = $ret;
         return 1;
      }

      $head->is_done or die "ARGH: undef return without being marked done";

      shift @{ $self->{request_queue} };
      return 1 if !$closed and length $$buffref;
      return;
   }

   # Reinvoked after switch back to baseline, but may be idle again
   return if $closed or !length $$buffref;

   $self->invoke_error( "Spurious on_read of connection while idle",
      http_connection => read => $$buffref );
   $$buffref = "";
}

# Stream write-EOF handler: fails every outstanding request.
sub on_write_eof
{
   my $self = shift;
   $self->error_all( "Connection closed", http => undef, undef );
}

# Empties the request queue, failing with the given arguments the Future of
# each request that is neither marked done nor already ready.
sub error_all
{
   my $self = shift;

   while( my $head = shift @{ $self->{request_queue} } ) {
      $head->f->fail( @_ ) unless $head->is_done or $head->f->is_ready;
   }
}

# Writes an HTTP request to the connection and queues a context to receive
# its response. Takes named arguments:
#   request           - HTTP::Request to send (required)
#   on_header         - CODE invoked with the response header; must return a
#                       CODE that receives body chunks (required)
#   request_body      - optional body to write after the request's own content
#   expect_continue   - if true, add "Expect: 100-continue" and hold back
#                       request_body until a 1xx response arrives
#   stall_timeout     - optional delay for a stall timer on this request
#   on_body_write     - optional CODE told the cumulative body bytes written
#   on_done           - optional CODE invoked with the request context
#   previous_response - optional response to attach as ->previous
# Returns a Future for the response. Cancelling it closes the connection.
sub request
{
   my $self = shift;
   my %args = @_;

   my $on_header = $args{on_header} or croak "Expected 'on_header' as a CODE ref";

   my $req = $args{request};
   ref $req and $req->isa( "HTTP::Request" ) or croak "Expected 'request' as a HTTP::Request reference";

   $self->debug_printf( "REQUEST %s %s", $req->method, $req->uri );

   my $request_body = $args{request_body};
   my $expect_continue = !!$args{expect_continue};

   my $method = $req->method;

   if( $method eq "POST" or $method eq "PUT" or length $req->content ) {
      $req->init_header( "Content-Length", length $req->content );
   }

   if( $expect_continue ) {
      $req->init_header( "Expect", "100-continue" );
   }

   if( $self->{decode_content} ) {
      #$req->init_header( "Accept-Encoding", Net::Async::HTTP->can_decode )
      $req->init_header( "Accept-Encoding", "gzip" );
   }

   my $f = $self->loop->new_future
      ->set_label( "$method " . $req->uri );

   # TODO: Cancelling a request Future shouldn't necessarily close the socket
   #   if we haven't even started writing the request yet. But we can't know
   #   that currently.
   $f->on_cancel( sub {
      $self->debug_printf( "CLOSE on_cancel" );
      $self->close_now;
   });

   my $stall_timer;
   if( $args{stall_timeout} ) {
      $stall_timer = Net::Async::HTTP::StallTimer->new(
         delay => $args{stall_timeout},
         future => $f,
      );
      $self->add_child( $stall_timer );
      # Don't start it yet

      # Once the request completes in any way, detach the timer and clear the
      # lexical so the closures below stop touching it
      my $remove_timer = sub {
         $self->remove_child( $stall_timer ) if $stall_timer;
         undef $stall_timer;
      };

      $f->on_ready( $remove_timer );
   }

   push @{ $self->{request_queue} }, my $ctx = RequestContext(
      req         => $req,
      on_read     => undef, # will be set later
      stall_timer => $stall_timer,
      resp_header => undef, # will be set later
      resp_bytes  => 0,
      on_done     => $args{on_done},
      is_done     => 0,
      f           => $f,
   );

   my $on_body_write;
   if( $stall_timer or $args{on_body_write} ) {
      my $inner_on_body_write = $args{on_body_write};
      my $written = 0;
      $on_body_write = sub {
         $stall_timer->reset if $stall_timer;
         $inner_on_body_write->( $written += $_[1] ) if $inner_on_body_write;
      };
   }

   my $write_request_body = defined $request_body ? sub {
      my ( $self ) = @_;
      $self->write( $request_body,
         on_write => $on_body_write
      );
   } : undef;

   # Unless the request method is CONNECT, or we are connecting to a
   # non-transparent proxy, the URL is not allowed to contain
   # an authority; only path
   # Take a copy of the headers since we'll be hacking them up
   my $headers = $req->headers->clone;
   my $path;
   if( $method eq "CONNECT" ) {
      $path = $req->uri->as_string;
   }
   else {
      my $uri = $req->uri;
      if ( $self->{is_proxy} ) {
         $path = $uri->as_string;
      }
      else {
         $path = $uri->path_query;
         $path = "/$path" unless $path =~ m{^/};
      }
      my $authority = $uri->authority;
      if( defined $authority and
          my ( $user, $pass, $host ) = $authority =~ m/^(.*?):(.*)@(.*)$/ ) {
         $headers->init_header( Host => $host );
         $headers->authorization_basic( $user, $pass );
      }
      else {
         $headers->init_header( Host => $authority );
      }
   }

   my $protocol = $req->protocol || "HTTP/1.1";
   my @headers = ( "$method $path $protocol" );
   $headers->scan( sub {
      my ( $name, $value ) = @_;
      $name =~ s/^://; # non-canonical header
      push @headers, "$name: $value";
   } );

   $stall_timer->start if $stall_timer;
   $stall_timer->reason = "writing request" if $stall_timer;

   my $on_header_write = $stall_timer ? sub { $stall_timer->reset } : undef;

   $self->write( join( $CRLF, @headers ) .
                 $CRLF . $CRLF,
      on_write => $on_header_write );

   $self->write( $req->content,
      on_write => $on_body_write ) if length $req->content;
   $write_request_body->( $self ) if $write_request_body and !$expect_continue;

   $self->write( "", on_flush => sub {
      return unless $stall_timer; # test again in case it was cancelled in the meantime
      $stall_timer->reset;
      $stall_timer->reason = "waiting for response";
   }) if $stall_timer;

   $self->{requests_in_flight}++;

   $ctx->on_read = $self->_mk_on_read_header(
      $args{previous_response}, $expect_continue ? $write_request_body : undef, $on_header
   );

   return $f;
}

# Builds the per-request on_read handler that parses the response header.
#   $previous_response  - optional response to attach as ->previous
#   $write_request_body - optional CODE invoked when a 1xx response arrives
#   $on_header          - CODE invoked with the parsed header; returns the
#                         body-chunk receiver
# The handler returns 0 while more data is needed or after a fatal error,
# 1 after a provisional (1xx) response, a body-reading handler when a body
# is expected, or undef once a body-less response has been completed.
sub _mk_on_read_header
{
   shift; # $self
   my ( $previous_response, $write_request_body, $on_header ) = @_;

   sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $req         = $ctx->req;
      my $stall_timer = $ctx->stall_timer;
      my $f           = $ctx->f;

      if( $stall_timer ) {
         $stall_timer->reason = "receiving response header";
         $stall_timer->reset;
      }

      if( length $$buffref >= 4 and $$buffref !~ m/^HTTP/ ) {
         $self->debug_printf( "ERROR fail" );
         $f->fail( "Did not receive HTTP response from server", http => undef, $req ) unless $f->is_cancelled;
         $self->close_now;
         # The request has been failed and the connection closed; stop here
         # rather than parsing the garbage as if it were a response header
         return 0;
      }

      unless( $$buffref =~ s/^(.*?$CRLF$CRLF)//s ) {
         if( $closed ) {
            $self->debug_printf( "ERROR closed" );
            $f->fail( "Connection closed while awaiting header", http => undef, $req ) unless $f->is_cancelled;
            $self->close_now;
         }
         return 0;
      }

      $ctx->resp_bytes += $+[0];

      my $header = HTTP::Response->parse( $1 );
      # HTTP::Response doesn't strip the \rs from this
      ( my $status_line = $header->status_line ) =~ s/\r$//;

      $ctx->resp_header = $header;

      unless( HTTP_MESSAGE_TRIMS_LWS ) {
         my @headers;
         $header->scan( sub {
            my ( $name, $value ) = @_;
            s/^\s+//, s/\s+$// for $value;
            push @headers, $name => $value;
         } );
         $header->header( @headers ) if @headers;
      }

      my $protocol = $header->protocol;
      if( $protocol =~ m{^HTTP/1\.(\d+)$} and $1 >= 1 ) {
         $self->{can_pipeline} = 1;
      }

      if( $header->code =~ m/^1/ ) { # 1xx is not a final response
         $self->debug_printf( "HEADER [provisional] %s", $status_line );
         $write_request_body->( $self ) if $write_request_body;
         return 1;
      }

      $header->request( $req );
      $header->previous( $previous_response ) if $previous_response;

      $self->debug_printf( "HEADER %s", $status_line );

      my $on_body_chunk = $on_header->( $header );

      my $code = $header->code;

      my $content_encoding = $header->header( "Content-Encoding" );

      my $decoder;
      if( $content_encoding and
          $decoder = Net::Async::HTTP->can_decode( $content_encoding ) ) {
         $header->init_header( "X-Original-Content-Encoding" => $header->remove_header( "Content-Encoding" ) );
      }

      # can_pipeline is set for HTTP/1.1 or above; presume it can keep-alive if set
      my $connection_close = lc( $header->header( "Connection" ) || ( $self->{can_pipeline} ? "keep-alive" : "close" ) )
                              eq "close";

      if( $connection_close ) {
         $self->{max_in_flight} = 1;
      }
      elsif( defined( my $keep_alive = lc( $header->header("Keep-Alive") || "" ) ) ) {
         my ( $max ) = ( $keep_alive =~ m/max=(\d+)/ );
         $self->{max_in_flight} = $max if $max && $max < $self->{max_in_flight};
      }

      # Receives each piece of body content, decoding it first if required.
      # Returns true on success, or undef after failing the request.
      my $on_more = sub {
         my ( $chunk ) = @_;

         if( $decoder and not eval { $chunk = $decoder->( $chunk ); 1 } ) {
            $self->debug_printf( "ERROR decode failed" );
            $f->fail( "Decode error $@", http => undef, $req );
            $self->close;
            return undef;
         }

         $on_body_chunk->( $chunk );

         return 1;
      };

      # Completes the request: flushes the decoder, obtains the final
      # response from the chunk receiver and resolves the Future. Always
      # returns undef, which tells on_read that this request is finished.
      my $on_done = sub {
         my ( $ctx ) = @_;

         $ctx->is_done++;

         # TODO: IO::Async probably ought to do this. We need to fire the
         #   on_closed event _before_ calling on_body_chunk, to clear the
         #   connection cache in case another request comes - e.g. HEAD->GET
         $self->close if $connection_close;

         my $final;
         if( $decoder and not eval { $final = $decoder->(); 1 } ) {
            $self->debug_printf( "ERROR decode failed" );
            $f->fail( "Decode error $@", http => undef, $req );
            $self->close;
            return undef;
         }

         $on_body_chunk->( $final ) if defined $final and length $final;

         my $response = $on_body_chunk->();
         my $e = eval { $f->done( $response ) unless $f->is_cancelled; 1 } ? undef : $@;

         $ctx->on_done->( $ctx ) if $ctx->on_done;

         $self->{requests_in_flight}--;
         $self->debug_printf( "DONE remaining in-flight=$self->{requests_in_flight}" );
         $self->ready;

         if( defined $e ) {
            chomp $e;
            $self->invoke_error( $e, perl => );
            # This might not return, if it top-level croaks
         }

         return undef; # Finished
      };

      # RFC 2616 says "HEAD" does not have a body, nor do any 1xx codes, nor
      # 204 (No Content) nor 304 (Not Modified)
      if( $req->method eq "HEAD" or $code =~ m/^1..$/ or $code eq "204" or $code eq "304" ) {
         $self->debug_printf( "BODY done [none]" );
         return $on_done->( $ctx );
      }

      my $transfer_encoding = $header->header( "Transfer-Encoding" );
      my $content_length    = $header->content_length;

      if( defined $transfer_encoding and $transfer_encoding eq "chunked" ) {
         $self->debug_printf( "BODY chunks" );

         $stall_timer->reason = "receiving body chunks" if $stall_timer;
         return $self->_mk_on_read_chunked( $on_more, $on_done );
      }
      elsif( defined $content_length ) {
         $self->debug_printf( "BODY length $content_length" );

         if( $content_length == 0 ) {
            $self->debug_printf( "BODY done [length=0]" );
            return $on_done->( $ctx );
         }

         $stall_timer->reason = "receiving body" if $stall_timer;
         return $self->_mk_on_read_length( $content_length, $on_more, $on_done );
      }
      else {
         $self->debug_printf( "BODY until EOF" );

         $stall_timer->reason = "receiving body until EOF" if $stall_timer;
         return $self->_mk_on_read_until_eof( $on_more, $on_done );
      }
   };
}

# Builds the on_read handler for a body sent with chunked transfer encoding.
# Alternates between reading a chunk-size line and reading that many bytes
# plus the CRLF that follows them; a zero-size chunk hands over to the
# trailer reader.
sub _mk_on_read_chunked
{
   shift; # $self
   my ( $on_more, $on_done ) = @_;

   # undef while waiting for a chunk header; otherwise the size of the chunk
   # body currently awaited
   my $chunk_length;

   sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $req = $ctx->req;
      my $f   = $ctx->f;

      if( !defined $chunk_length and $$buffref =~ s/^(.*?)$CRLF// ) {
         my $header = $1;
         $ctx->resp_bytes += $+[0];

         # Chunk header
         unless( $header =~ s/^([A-Fa-f0-9]+).*// ) {
            $f->fail( "Corrupted chunk header", http => undef, $req ) unless $f->is_cancelled;
            $self->close_now;
            return 0;
         }

         $chunk_length = hex( $1 );
         return 1 if $chunk_length;

         return $self->_mk_on_read_chunk_trailer( $req, $on_more, $on_done, $f );
      }

      # Chunk is followed by a CRLF, which isn't counted in the length;
      if( defined $chunk_length and length( $$buffref ) >= $chunk_length + 2 ) {
         # Chunk body
         my $chunk = substr( $$buffref, 0, $chunk_length, "" );
         $ctx->resp_bytes += length $chunk;

         unless( $$buffref =~ s/^$CRLF// ) {
            $self->debug_printf( "ERROR chunk without CRLF" );
            $f->fail( "Chunk of size $chunk_length wasn't followed by CRLF", http => undef, $req ) unless $f->is_cancelled;
            $self->close;
            # Stop here: the request has failed, so the chunk must not be
            # delivered, and $+[0] below would refer to a stale match
            return 0;
         }

         $ctx->resp_bytes += $+[0];

         undef $chunk_length;

         return $on_more->( $chunk );
      }

      if( $closed ) {
         $self->debug_printf( "ERROR closed" );
         $f->fail( "Connection closed while awaiting chunk", http => undef, $req ) unless $f->is_cancelled;
      }
      return 0;
   };
}

# Builds the on_read handler that consumes the trailer lines following the
# final zero-length chunk. Lines are accumulated until an empty one is seen,
# at which point the request is completed. The first positional argument is
# accepted but ignored.
sub _mk_on_read_chunk_trailer
{
   shift; # $self
   my ( undef, $on_more, $on_done ) = @_;

   my $trailer = "";

   sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $req = $ctx->req;
      my $f   = $ctx->f;

      unless( $$buffref =~ s/^(.*)$CRLF// ) {
         # Only an error if the connection closed before a complete line
         # arrived; a line already in the buffer is still processed below
         if( $closed ) {
            $self->debug_printf( "ERROR closed" );
            $f->fail( "Connection closed while awaiting chunk trailer", http => undef, $req ) unless $f->is_cancelled;
         }
         return 0;
      }

      $trailer .= $1;
      $ctx->resp_bytes += $+[0];

      # A non-empty line is a trailer field; keep reading
      return 1 if length $1;

      # TODO: Actually use the trailer

      $self->debug_printf( "BODY done [chunked]" );
      return $on_done->( $ctx );
   };
}

# Builds the on_read handler for a body of known Content-Length. Consumes at
# most the outstanding number of bytes on each call and completes the request
# once the count reaches zero.
sub _mk_on_read_length
{
   shift; # $self
   my ( $content_length, $on_more, $on_done ) = @_;

   sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $req = $ctx->req;
      my $f   = $ctx->f;

      # This will truncate it if the server provided too much
      my $content = substr( $$buffref, 0, $content_length, "" );
      $content_length -= length $content;
      $ctx->resp_bytes += length $content;

      return undef unless $on_more->( $content );

      if( $content_length == 0 ) {
         $self->debug_printf( "BODY done [length]" );
         return $on_done->( $ctx );
      }

      if( $closed ) {
         $self->debug_printf( "ERROR closed" );
         $f->fail( "Connection closed while awaiting body", http => undef, $req ) unless $f->is_cancelled;
      }
      return 0;
   };
}

# Builds the on_read handler for a body delimited only by the connection
# closing. Everything in the buffer is passed on as content; the request is
# completed once the stream reports it is closed.
sub _mk_on_read_until_eof
{
   shift; # $self
   my ( $on_more, $on_done ) = @_;

   sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $content = $$buffref;
      $$buffref = "";
      $ctx->resp_bytes += length $content;

      return undef unless $on_more->( $content );

      return 0 unless $closed;

      $self->debug_printf( "BODY done [eof]" );
      return $on_done->( $ctx );
   };
}

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;