1#  You may distribute under the terms of either the GNU General Public License
2#  or the Artistic License (the same terms as Perl itself)
3#
4#  (C) Paul Evans, 2008-2019 -- leonerd@leonerd.org.uk
5
6package Net::Async::HTTP::Connection;
7
8use strict;
9use warnings;
10
11our $VERSION = '0.48';
12
13use Carp;
14
15use base qw( IO::Async::Stream );
16IO::Async::Stream->VERSION( '0.59' ); # ->write( ..., on_write )
17
18use Net::Async::HTTP::StallTimer;
19
20use HTTP::Response;
21
# Line terminator used when composing requests and parsing responses
my $CRLF = "\x0d\x0a"; # More portable than \r\n

use Struct::Dumb;

# Per-request state record; one is pushed onto the connection's request queue
# for every call to ->request
struct RequestContext => [
   "req",         # the request object being performed
   "on_read",     # CODE ref currently responsible for consuming the read buffer
   "stall_timer", # stall timer for this request, or undef if none was asked for
   "resp_header", # parsed response header, filled in once it has arrived
   "resp_bytes",  # running count of response bytes consumed so far
   "on_done",     # optional CODE ref invoked with this record on completion
   "is_done",     # true once the response has been fully handled
   "f",           # the Future handed back to the caller of ->request
], named_constructor => 1;

# Detect whether HTTP::Message properly trims whitespace in header values. If
# it doesn't, we have to deploy a workaround to fix them up.
#   https://rt.cpan.org/Ticket/Display.html?id=75224
use constant HTTP_MESSAGE_TRIMS_LWS =>
   HTTP::Message->parse( "Name:   value  " )->header( "Name" ) eq "value";
32
33=head1 NAME
34
35C<Net::Async::HTTP::Connection> - HTTP client protocol handler
36
37=head1 DESCRIPTION
38
39This class provides a connection to a single HTTP server, and is used
40internally by L<Net::Async::HTTP>. It is not intended for general use.
41
42=cut
43
# Initialises a new instance: lets the parent class do its own setup with the
# same arguments, then starts the in-flight request counter at zero.
sub _init
{
   my ( $self, @args ) = @_;

   $self->SUPER::_init( @args );

   $self->{requests_in_flight} = 0;
}
51
# Applies configuration parameters.
#
# The keys pipeline, max_in_flight, ready_queue, decode_content and is_proxy
# are consumed here and stored on the object; everything else is passed on to
# the parent class.
#
# If an on_closed handler is given it is wrapped, so that when the connection
# closes every outstanding request is failed and the ready queue is dropped
# before the caller's handler runs.
#
# Croaks unless max_in_flight has been defined (zero is allowed).
sub configure
{
   my $self = shift;
   my %params = @_;

   foreach my $key (qw( pipeline max_in_flight ready_queue decode_content is_proxy )) {
      $self->{$key} = delete $params{$key} if exists $params{$key};
   }

   if( my $on_closed = $params{on_closed} ) {
      $params{on_closed} = sub {
         my $self = shift;

         $self->debug_printf( "CLOSED in-flight=$self->{requests_in_flight}" );

         # Use the same failure shape as on_write_eof so that callers see an
         # 'http' category failure regardless of how the close was noticed
         $self->error_all( "Connection closed", http => undef, undef );

         undef $self->{ready_queue};
         $on_closed->( $self );
      };
   }

   croak "max_in_flight parameter required, may be zero" unless defined $self->{max_in_flight};

   $self->SUPER::configure( %params );
}
78
# Returns true if another request may be written to this connection while
# earlier ones are still outstanding: pipelining must be enabled, the
# can_pipeline flag must be set, and the in-flight count must be below
# max_in_flight (a false max_in_flight means no limit).
sub should_pipeline
{
   my ( $self ) = @_;

   return $self->{pipeline}     unless $self->{pipeline};
   return $self->{can_pipeline} unless $self->{can_pipeline};

   my $limit = $self->{max_in_flight};
   return !$limit || $self->{requests_in_flight} < $limit;
}
86
# Starts connecting, forcing a stream socket type unless the caller's
# arguments override it, and returns the resulting Future. Dies if called in
# void context, since the Future would otherwise be lost.
sub connect
{
   my ( $self, %args ) = @_;

   $self->debug_printf( "CONNECT $args{host}:$args{service}" );

   die "VOID ->connect" unless defined wantarray;

   my $connect_f = $self->SUPER::connect(
      socktype => "stream",
      %args
   );

   return $connect_f->on_done( sub {
      $self->debug_printf( "CONNECTED" );
   });
}
103
# Hands this connection to waiters on the ready queue, if it can currently
# accept work. When pipelining is possible, waiters are served for as long as
# should_pipeline stays true; otherwise a single waiter is served, and only
# if the connection is idle. Cancelled waiters are discarded.
sub ready
{
   my ( $self ) = @_;

   my $queue = $self->{ready_queue};
   return unless $queue;

   # Takes the next waiter off the queue; returns true if it was given the
   # connection, false if it had been cancelled
   my $serve_next = sub {
      my $waiter = shift @$queue;
      my $future = $waiter->future;
      return 0 if $future->is_cancelled;

      # No need to keep opening another connection on its behalf
      if( $waiter->connecting ) {
         $waiter->connecting->cancel;
      }

      $future->done( $self );
      return 1;
   };

   if( $self->should_pipeline ) {
      $self->debug_printf( "READY pipelined" );
      $serve_next->() while @$queue && $self->should_pipeline;
   }
   elsif( @$queue and $self->is_idle ) {
      $self->debug_printf( "READY non-pipelined" );
      while( @$queue ) {
         last if $serve_next->();
      }
   }
   else {
      $self->debug_printf( "READY cannot-run queue=%d idle=%s",
         scalar @$queue, $self->is_idle ? "Y" : "N");
   }
}
140
# Returns true when no requests are currently in flight on this connection.
sub is_idle
{
   my ( $self ) = @_;

   my $in_flight = $self->{requests_in_flight};
   return $in_flight == 0;
}
146
# Read handler for the stream. Dispatches buffered data to the reader of the
# request at the head of the request queue.
#
# A reader may return:
#   a CODE ref  - installed as that request's new reader; we ask to be called
#                 again
#   a plain value - returned as-is
#   undef       - the request must by now be marked done; it is removed from
#                 the queue, and we ask to be called again only if data
#                 remains and the connection has not closed
#
# Data arriving while no request is queued is reported through invoke_error
# and discarded.
sub on_read
{
   my ( $self, $buffref, $closed ) = @_;

   while( my $ctx = $self->{request_queue}[0] ) {
      # Discard entries that finished while not at the head of the queue
      if( $ctx->is_done ) {
         shift @{ $self->{request_queue} };
         next;
      }

      if( $ctx->stall_timer ) {
         $ctx->stall_timer->reset;
      }

      my $reader = $ctx->on_read;
      my $next   = $reader->( $self, $buffref, $closed, $ctx );

      unless( defined $next ) {
         die "ARGH: undef return without being marked done" unless $ctx->is_done;

         shift @{ $self->{request_queue} };
         return 1 if !$closed and length $$buffref;
         return;
      }

      return $next unless ref $next;

      $ctx->on_read = $next;
      return 1;
   }

   # Reinvoked after switch back to baseline, but may be idle again
   return if $closed or !length $$buffref;

   $self->invoke_error( "Spurious on_read of connection while idle",
      http_connection => read => $$buffref );
   $$buffref = "";
}
180
# Write-side EOF handler: fails every queued request with an 'http' category
# "Connection closed" failure.
sub on_write_eof
{
   my ( $self ) = @_;

   my @failure = ( "Connection closed", http => undef, undef );
   $self->error_all( @failure );
}
186
# Empties the request queue, failing the Future of every request that is
# neither marked done nor already complete. The arguments are passed straight
# through to each Future's ->fail.
sub error_all
{
   my ( $self, @failure ) = @_;

   while( my $ctx = shift @{ $self->{request_queue} } ) {
      next if $ctx->is_done;

      my $f = $ctx->f;
      next if $f->is_ready;

      $f->fail( @failure );
   }
}
195
# Queues a request on this connection and writes it out.
#
# Named arguments used here:
#   request           - the HTTP::Request to perform (required)
#   on_header         - CODE ref passed to the response header reader
#                       (required)
#   request_body      - optional extra body content, written after the
#                       request's own content
#   expect_continue   - if true, send "Expect: 100-continue" and leave
#                       request_body for the header reader to send
#   stall_timeout     - if true, delay for a stall timer covering the request
#   on_body_write     - optional CODE ref given the running total of body
#                       bytes written
#   on_done           - optional CODE ref stored in the request context
#   previous_response - optional response passed to the header reader
#
# Returns a Future for the request. Cancelling it closes the connection.
sub request
{
   my ( $self, %args ) = @_;

   my $on_header = $args{on_header};
   croak "Expected 'on_header' as a CODE ref" unless $on_header;

   my $req = $args{request};
   croak "Expected 'request' as a HTTP::Request reference"
      unless ref $req and $req->isa( "HTTP::Request" );

   $self->debug_printf( "REQUEST %s %s", $req->method, $req->uri );

   my $request_body    = $args{request_body};
   my $expect_continue = !!$args{expect_continue};

   my $method = $req->method;

   # Fill in headers the caller has not set for themselves
   $req->init_header( "Content-Length", length $req->content )
      if $method eq "POST" or $method eq "PUT" or length $req->content;

   $req->init_header( "Expect", "100-continue" ) if $expect_continue;

   #$req->init_header( "Accept-Encoding", Net::Async::HTTP->can_decode )
   $req->init_header( "Accept-Encoding", "gzip" ) if $self->{decode_content};

   my $f = $self->loop->new_future;
   $f->set_label( "$method " . $req->uri );

   # TODO: Cancelling a request Future shouldn't necessarily close the socket
   # if we haven't even started writing the request yet. But we can't know
   # that currently.
   $f->on_cancel( sub {
      $self->debug_printf( "CLOSE on_cancel" );
      $self->close_now;
   });

   my $stall_timer;
   if( my $delay = $args{stall_timeout} ) {
      $stall_timer = Net::Async::HTTP::StallTimer->new(
         delay => $delay,
         future => $f,
      );
      $self->add_child( $stall_timer );
      # Don't start it yet

      # Once the request is over, detach the timer and clear the variable so
      # that the callbacks below can tell it has gone
      $f->on_ready( sub {
         $self->remove_child( $stall_timer ) if $stall_timer;
         undef $stall_timer;
      } );
   }

   my $ctx = RequestContext(
      req         => $req,
      on_read     => undef, # will be set later
      stall_timer => $stall_timer,
      resp_header => undef, # will be set later
      resp_bytes  => 0,
      on_done     => $args{on_done},
      is_done     => 0,
      f           => $f,
   );
   push @{ $self->{request_queue} }, $ctx;

   my $on_body_write;
   if( $stall_timer or $args{on_body_write} ) {
      my $user_on_body_write = $args{on_body_write};
      my $total_written = 0;
      $on_body_write = sub {
         $stall_timer->reset if $stall_timer;
         $user_on_body_write->( $total_written += $_[1] ) if $user_on_body_write;
      };
   }

   my $write_request_body;
   if( defined $request_body ) {
      $write_request_body = sub {
         my ( $conn ) = @_;
         $conn->write( $request_body,
            on_write => $on_body_write
         );
      };
   }

   # Unless the request method is CONNECT, or we are connecting to a
   # non-transparent proxy, the URL is not allowed to contain
   # an authority; only path
   # Take a copy of the headers since we'll be hacking them up
   my $headers = $req->headers->clone;
   my $path;
   if( $method eq "CONNECT" ) {
      $path = $req->uri->as_string;
   }
   else {
      my $uri = $req->uri;

      if( $self->{is_proxy} ) {
         $path = $uri->as_string;
      }
      else {
         $path = $uri->path_query;
         $path = "/$path" if $path !~ m{^/};
      }

      # user:pass@host authorities are turned into a Host header plus basic
      # authorization
      my $authority = $uri->authority;
      my @userinfo = defined $authority ? $authority =~ m/^(.*?):(.*)@(.*)$/ : ();
      if( @userinfo ) {
         my ( $user, $pass, $host ) = @userinfo;
         $headers->init_header( Host => $host );
         $headers->authorization_basic( $user, $pass );
      }
      else {
         $headers->init_header( Host => $authority );
      }
   }

   my $protocol = $req->protocol || "HTTP/1.1";
   my @lines = ( "$method $path $protocol" );
   $headers->scan( sub {
      my ( $name, $value ) = @_;
      $name =~ s/^://; # non-canonical header
      push @lines, "$name: $value";
   } );

   if( $stall_timer ) {
      $stall_timer->start;
      $stall_timer->reason = "writing request";
   }

   my $on_header_write = $stall_timer ? sub { $stall_timer->reset } : undef;

   my $head_bytes = join( $CRLF, @lines ) . $CRLF . $CRLF;
   $self->write( $head_bytes, on_write => $on_header_write );

   if( length $req->content ) {
      $self->write( $req->content,
                    on_write => $on_body_write );
   }

   # With Expect: 100-continue the extra body waits for the server's go-ahead
   if( $write_request_body and !$expect_continue ) {
      $write_request_body->( $self );
   }

   if( $stall_timer ) {
      $self->write( "", on_flush => sub {
         return unless $stall_timer; # test again in case it was cancelled in the meantime
         $stall_timer->reset;
         $stall_timer->reason = "waiting for response";
      });
   }

   $self->{requests_in_flight}++;

   $ctx->on_read = $self->_mk_on_read_header(
      $args{previous_response}, $expect_continue ? $write_request_body : undef, $on_header
   );

   return $f;
}
346
# Builds the reader that handles the status line and header block of a
# response.
#
#   $previous_response  - optional earlier response, attached to the new one
#                         via ->previous
#   $write_request_body - optional CODE ref, invoked with the connection when
#                         a 1xx response arrives
#   $on_header          - CODE ref invoked with the parsed header; its return
#                         value is used as the body chunk callback
#
# The returned CODE ref is called as ( $self, $buffref, $closed, $ctx ) and
# returns 0 when it needs more data (or has failed the request), 1 to be
# called again, a CODE ref to install as the body reader, or undef once the
# response is complete.
sub _mk_on_read_header
{
   shift; # $self
   my ( $previous_response, $write_request_body, $on_header ) = @_;

   sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $req         = $ctx->req;
      my $stall_timer = $ctx->stall_timer;
      my $f           = $ctx->f;

      if( $stall_timer ) {
         $stall_timer->reason = "receiving response header";
         $stall_timer->reset;
      }

      if( length $$buffref >= 4 and $$buffref !~ m/^HTTP/ ) {
         $self->debug_printf( "ERROR fail" );
         $f->fail( "Did not receive HTTP response from server", http => undef, $req ) unless $f->is_cancelled;
         $self->close_now;
         # Stop here; carrying on would try to parse the junk as a header and
         # could attempt to complete the already-failed Future a second time
         return 0;
      }

      unless( $$buffref =~ s/^(.*?$CRLF$CRLF)//s ) {
         if( $closed ) {
            $self->debug_printf( "ERROR closed" );
            $f->fail( "Connection closed while awaiting header", http => undef, $req ) unless $f->is_cancelled;
            $self->close_now;
         }
         return 0;
      }

      $ctx->resp_bytes += $+[0];

      my $header = HTTP::Response->parse( $1 );
      # HTTP::Response doesn't strip the \rs from this
      ( my $status_line = $header->status_line ) =~ s/\r$//;

      $ctx->resp_header = $header;

      unless( HTTP_MESSAGE_TRIMS_LWS ) {
         my @headers;
         $header->scan( sub {
            my ( $name, $value ) = @_;
            s/^\s+//, s/\s+$// for $value;
            push @headers, $name => $value;
         } );
         $header->header( @headers ) if @headers;
      }

      my $protocol = $header->protocol;
      if( $protocol =~ m{^HTTP/1\.(\d+)$} and $1 >= 1 ) {
         $self->{can_pipeline} = 1;
      }

      if( $header->code =~ m/^1/ ) { # 1xx is not a final response
         $self->debug_printf( "HEADER [provisional] %s", $status_line );
         $write_request_body->( $self ) if $write_request_body;
         return 1;
      }

      $header->request( $req );
      $header->previous( $previous_response ) if $previous_response;

      $self->debug_printf( "HEADER %s", $status_line );

      my $on_body_chunk = $on_header->( $header );

      my $code = $header->code;

      my $content_encoding = $header->header( "Content-Encoding" );

      # If the body can be decoded here, hide the encoding from the caller
      # but keep a record of what it was
      my $decoder;
      if( $content_encoding and
          $decoder = Net::Async::HTTP->can_decode( $content_encoding ) ) {
         $header->init_header( "X-Original-Content-Encoding" => $header->remove_header( "Content-Encoding" ) );
      }

      # can_pipeline is set for HTTP/1.1 or above; presume it can keep-alive if set
      my $connection_close = lc( $header->header( "Connection" ) || ( $self->{can_pipeline} ? "keep-alive" : "close" ) )
                              eq "close";

      if( $connection_close ) {
         $self->{max_in_flight} = 1;
      }
      elsif( defined( my $keep_alive = lc( $header->header("Keep-Alive") || "" ) ) ) {
         my ( $max ) = ( $keep_alive =~ m/max=(\d+)/ );
         $self->{max_in_flight} = $max if $max && $max < $self->{max_in_flight};
      }

      # Called by the body readers with each piece of body content; returns
      # true to continue, or undef if decoding failed
      my $on_more = sub {
         my ( $chunk ) = @_;

         if( $decoder and not eval { $chunk = $decoder->( $chunk ); 1 } ) {
            $self->debug_printf( "ERROR decode failed" );
            $f->fail( "Decode error $@", http => undef, $req );
            $self->close;
            return undef;
         }

         $on_body_chunk->( $chunk );

         return 1;
      };
      # Called by the body readers once the body is complete; always returns
      # undef so the request is taken off the queue
      my $on_done = sub {
         my ( $ctx ) = @_;

         $ctx->is_done++;

         # TODO: IO::Async probably ought to do this. We need to fire the
         # on_closed event _before_ calling on_body_chunk, to clear the
         # connection cache in case another request comes - e.g. HEAD->GET
         $self->close if $connection_close;

         my $final;
         if( $decoder and not eval { $final = $decoder->(); 1 } ) {
            $self->debug_printf( "ERROR decode failed" );
            $f->fail( "Decode error $@", http => undef, $req );
            $self->close;
            return undef;
         }

         $on_body_chunk->( $final ) if defined $final and length $final;

         my $response = $on_body_chunk->();
         my $e = eval { $f->done( $response ) unless $f->is_cancelled; 1 } ? undef : $@;

         $ctx->on_done->( $ctx ) if $ctx->on_done;

         $self->{requests_in_flight}--;
         $self->debug_printf( "DONE remaining in-flight=$self->{requests_in_flight}" );
         $self->ready;

         if( defined $e ) {
            chomp $e;
            $self->invoke_error( $e, perl => );
            # This might not return, if it top-level croaks
         }

         return undef; # Finished
      };

      # RFC 2616 says "HEAD" does not have a body, nor do any 1xx codes, nor
      # 204 (No Content) nor 304 (Not Modified)
      if( $req->method eq "HEAD" or $code =~ m/^1..$/ or $code eq "204" or $code eq "304" ) {
         $self->debug_printf( "BODY done [none]" );
         return $on_done->( $ctx );
      }

      my $transfer_encoding = $header->header( "Transfer-Encoding" );
      my $content_length    = $header->content_length;

      # Transfer-coding names are case-insensitive
      if( defined $transfer_encoding and lc( $transfer_encoding ) eq "chunked" ) {
         $self->debug_printf( "BODY chunks" );

         $stall_timer->reason = "receiving body chunks" if $stall_timer;
         return $self->_mk_on_read_chunked( $on_more, $on_done );
      }
      elsif( defined $content_length ) {
         $self->debug_printf( "BODY length $content_length" );

         if( $content_length == 0 ) {
            $self->debug_printf( "BODY done [length=0]" );
            return $on_done->( $ctx );
         }

         $stall_timer->reason = "receiving body" if $stall_timer;
         return $self->_mk_on_read_length( $content_length, $on_more, $on_done );
      }
      else {
         $self->debug_printf( "BODY until EOF" );

         $stall_timer->reason = "receiving body until EOF" if $stall_timer;
         return $self->_mk_on_read_until_eof( $on_more, $on_done );
      }
   };
}
524
# Builds the reader for a chunked response body.
#
#   $on_more - CODE ref called with the content of each chunk
#   $on_done - CODE ref passed on to the trailer reader
#
# The returned CODE ref is called as ( $self, $buffref, $closed, $ctx ). It
# alternates between reading a chunk header line and reading that chunk's
# content. It returns 0 when it needs more data (or has failed the request),
# 1 to be called again, whatever $on_more returns after a chunk has been
# delivered, or the trailer reader once the zero-length final chunk is seen.
sub _mk_on_read_chunked
{
   shift; # $self
   my ( $on_more, $on_done ) = @_;

   # Length of the chunk whose content is awaited; undef while a chunk header
   # is expected
   my $chunk_length;

   sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $req = $ctx->req;
      my $f   = $ctx->f;

      if( !defined $chunk_length and $$buffref =~ s/^(.*?)$CRLF// ) {
         my $header = $1;
         $ctx->resp_bytes += $+[0];

         # Chunk header
         unless( $header =~ s/^([A-Fa-f0-9]+).*// ) {
            $f->fail( "Corrupted chunk header", http => undef, $req ) unless $f->is_cancelled;
            $self->close_now;
            return 0;
         }

         $chunk_length = hex( $1 );
         return 1 if $chunk_length;

         return $self->_mk_on_read_chunk_trailer( $req, $on_more, $on_done, $f );
      }

      # Chunk is followed by a CRLF, which isn't counted in the length;
      if( defined $chunk_length and length( $$buffref ) >= $chunk_length + 2 ) {
         # Chunk body
         my $chunk = substr( $$buffref, 0, $chunk_length, "" );
         $ctx->resp_bytes += length $chunk;

         unless( $$buffref =~ s/^$CRLF// ) {
            $self->debug_printf( "ERROR chunk without CRLF" );
            $f->fail( "Chunk of size $chunk_length wasn't followed by CRLF", http => undef, $req ) unless $f->is_cancelled;
            # The framing is broken, so nothing further on this connection
            # can be trusted: close at once, as for a corrupted chunk header,
            # and do not deliver the chunk
            $self->close_now;
            return 0;
         }

         $ctx->resp_bytes += $+[0];

         undef $chunk_length;

         return $on_more->( $chunk );
      }

      if( $closed ) {
         $self->debug_printf( "ERROR closed" );
         $f->fail( "Connection closed while awaiting chunk", http => undef, $req ) unless $f->is_cancelled;
      }
      return 0;
   };
}
581
# Builds the reader for the trailer lines that follow the final chunk of a
# chunked body. The first positional argument after the invocant is ignored.
#
# The returned CODE ref is called as ( $self, $buffref, $closed, $ctx ). It
# consumes one line per call, returning 1 after a non-empty line, 0 when no
# complete line is available, and the result of $on_done once the empty line
# ending the trailer has been read.
sub _mk_on_read_chunk_trailer
{
   my ( undef, undef, $on_more, $on_done ) = @_;

   my $trailer = "";

   return sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $req = $ctx->req;
      my $f   = $ctx->f;

      if( $closed ) {
         $self->debug_printf( "ERROR closed" );
         $f->fail( "Connection closed while awaiting chunk trailer", http => undef, $req ) unless $f->is_cancelled;
      }

      return 0 unless $$buffref =~ s/^(.*)$CRLF//;

      my $line = $1;
      $trailer .= $line;
      $ctx->resp_bytes += $+[0];

      # A non-empty line means more trailer lines may follow
      return 1 if length $line;

      # TODO: Actually use the trailer

      $self->debug_printf( "BODY done [chunked]" );
      return $on_done->( $ctx );
   };
}
612
# Builds the reader for a body whose size is known in advance.
#
#   $content_length - number of body bytes expected
#   $on_more        - CODE ref called with each piece of content; a false
#                     return aborts the reader
#   $on_done        - CODE ref called with the request context once all the
#                     expected bytes have been seen
#
# The returned CODE ref is called as ( $self, $buffref, $closed, $ctx ). It
# returns 0 while more content is awaited, undef if $on_more returned false,
# or the result of $on_done.
sub _mk_on_read_length
{
   my ( undef, $remaining, $on_more, $on_done ) = @_;

   return sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $req = $ctx->req;
      my $f   = $ctx->f;

      # This will truncate it if the server provided too much
      my $piece = substr( $$buffref, 0, $remaining, "" );
      my $taken = length $piece;
      $remaining -= $taken;
      $ctx->resp_bytes += $taken;

      $on_more->( $piece ) or return undef;

      if( $remaining == 0 ) {
         $self->debug_printf( "BODY done [length]" );
         return $on_done->( $ctx );
      }

      if( $closed ) {
         $self->debug_printf( "ERROR closed" );
         $f->fail( "Connection closed while awaiting body", http => undef, $req ) unless $f->is_cancelled;
      }
      return 0;
   };
}
643
# Builds the reader for a body that is delimited by the connection closing.
#
#   $on_more - CODE ref called with each piece of content; a false return
#              aborts the reader
#   $on_done - CODE ref called with the request context once the connection
#              has closed
#
# The returned CODE ref is called as ( $self, $buffref, $closed, $ctx ). It
# takes everything in the buffer on each call and returns 0 until $closed is
# true, undef if $on_more returned false, or the result of $on_done.
sub _mk_on_read_until_eof
{
   my ( undef, $on_more, $on_done ) = @_;

   return sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      # Take the whole buffer
      ( my $content, $$buffref ) = ( $$buffref, "" );
      $ctx->resp_bytes += length $content;

      $on_more->( $content ) or return undef;

      if( $closed ) {
         $self->debug_printf( "BODY done [eof]" );
         return $on_done->( $ctx );
      }

      return 0;
   };
}
664
665=head1 AUTHOR
666
667Paul Evans <leonerd@leonerd.org.uk>
668
669=cut
670
6710x55AA;
672