1package Protocol::HTTP2::Stream; 2use strict; 3use warnings; 4use Protocol::HTTP2::Constants qw(:states :endpoints :settings :frame_types 5 :limits :errors); 6use Protocol::HTTP2::HeaderCompression qw( headers_decode ); 7use Protocol::HTTP2::Trace qw(tracer); 8 9# Streams related part of Protocol::HTTP2::Conntection 10 11# Autogen properties 12{ 13 no strict 'refs'; 14 for my $prop ( 15 qw(promised_sid headers pp_headers header_block trailer 16 trailer_headers length blocked_data weight end reset) 17 ) 18 { 19 *{ __PACKAGE__ . '::stream_' . $prop } = sub { 20 return 21 !exists $_[0]->{streams}->{ $_[1] } ? undef 22 : @_ == 2 ? $_[0]->{streams}->{ $_[1] }->{$prop} 23 : ( $_[0]->{streams}->{ $_[1] }->{$prop} = $_[2] ); 24 } 25 } 26} 27 28sub new_stream { 29 my $self = shift; 30 return undef if $self->goaway; 31 32 $self->{last_stream} += 2 33 if exists $self->{streams}->{ $self->{type} == CLIENT ? 1 : 2 }; 34 $self->{streams}->{ $self->{last_stream} } = { 35 'state' => IDLE, 36 'weight' => DEFAULT_WEIGHT, 37 'stream_dep' => 0, 38 'fcw_recv' => $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE), 39 'fcw_send' => $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE), 40 }; 41 return $self->{last_stream}; 42} 43 44sub new_peer_stream { 45 my $self = shift; 46 my $stream_id = shift; 47 if ( $stream_id < $self->{last_peer_stream} 48 || ( $stream_id % 2 ) == ( $self->{type} == CLIENT ) ? 1 : 0 49 || $self->goaway ) 50 { 51 tracer->error("Peer send invalid stream id: $stream_id\n"); 52 $self->error(PROTOCOL_ERROR); 53 return undef; 54 } 55 $self->{last_peer_stream} = $stream_id; 56 if ( $self->dec_setting(SETTINGS_MAX_CONCURRENT_STREAMS) <= 57 $self->{active_peer_streams} ) 58 { 59 tracer->warning("SETTINGS_MAX_CONCURRENT_STREAMS exceeded\n"); 60 $self->stream_error( $stream_id, REFUSED_STREAM ); 61 return undef; 62 } 63 $self->{active_peer_streams}++; 64 tracer->debug("Active streams: $self->{active_peer_streams}"); 65 $self->{streams}->{$stream_id} = { 66 'state' => IDLE, 67 'weight' => DEFAULT_WEIGHT, 68 'stream_dep' => 0, 69 'fcw_recv' => $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE), 70 'fcw_send' => $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE), 71 }; 72 $self->{on_new_peer_stream}->($stream_id) 73 if exists $self->{on_new_peer_stream}; 74 75 return $self->{last_peer_stream}; 76} 77 78sub stream { 79 my ( $self, $stream_id ) = @_; 80 return undef unless exists $self->{streams}->{$stream_id}; 81 82 $self->{streams}->{$stream_id}; 83} 84 85# stream_state ( $self, $stream_id, $new_state?, $pending? ) 86 87sub stream_state { 88 my $self = shift; 89 my $stream_id = shift; 90 return undef unless exists $self->{streams}->{$stream_id}; 91 my $s = $self->{streams}->{$stream_id}; 92 93 if (@_) { 94 my ( $new_state, $pending ) = @_; 95 96 if ($pending) { 97 $self->stream_pending_state( $stream_id, $new_state ); 98 } 99 else { 100 $self->{on_change_state}->( $stream_id, $s->{state}, $new_state ) 101 if exists $self->{on_change_state}; 102 103 $s->{state} = $new_state; 104 105 # Exec callbacks for new state 106 if ( exists $s->{cb} && exists $s->{cb}->{ $s->{state} } ) { 107 for my $cb ( @{ $s->{cb}->{ $s->{state} } } ) { 108 $cb->(); 109 } 110 } 111 112 # Cleanup 113 if ( $new_state == CLOSED ) { 114 $self->{active_peer_streams}-- 115 if $self->{active_peer_streams} 116 && ( ( $stream_id % 2 ) ^ ( $self->{type} == CLIENT ) ); 117 tracer->info( 118 "Active streams: $self->{active_peer_streams} $stream_id"); 119 for my $key ( keys %$s ) { 120 next if grep { $key eq $_ } ( 121 qw(state weight stream_dep 122 fcw_recv fcw_send reset) 123 ); 124 delete $s->{$key}; 125 } 126 } 127 } 128 } 129 130 $s->{state}; 131} 132 133sub stream_pending_state { 134 my $self = shift; 135 my $stream_id = shift; 136 return undef unless exists $self->{streams}->{$stream_id}; 137 my $s = $self->{streams}->{$stream_id}; 138 if (@_) { 139 $s->{pending_state} = shift; 140 $self->{pending_stream} = 141 defined $s->{pending_state} ? $stream_id : undef; 142 } 143 $s->{pending_state}; 144} 145 146sub stream_cb { 147 my ( $self, $stream_id, $state, $cb ) = @_; 148 149 return undef unless exists $self->{streams}->{$stream_id}; 150 151 push @{ $self->{streams}->{$stream_id}->{cb}->{$state} }, $cb; 152} 153 154sub stream_frame_cb { 155 my ( $self, $stream_id, $frame, $cb ) = @_; 156 157 return undef unless exists $self->{streams}->{$stream_id}; 158 159 push @{ $self->{streams}->{$stream_id}->{frame_cb}->{$frame} }, $cb; 160} 161 162sub stream_data { 163 my $self = shift; 164 my $stream_id = shift; 165 return undef unless exists $self->{streams}->{$stream_id}; 166 my $s = $self->{streams}->{$stream_id}; 167 168 if (@_) { 169 170 # Exec callbacks for data 171 if ( exists $s->{frame_cb} && exists $s->{frame_cb}->{&DATA} ) { 172 for my $cb ( @{ $s->{frame_cb}->{&DATA} } ) { 173 $cb->( $_[0] ); 174 } 175 } 176 else { 177 $s->{data} .= shift; 178 } 179 } 180 181 $s->{data}; 182} 183 184sub stream_headers_done { 185 my $self = shift; 186 my $stream_id = shift; 187 return undef unless exists $self->{streams}->{$stream_id}; 188 my $s = $self->{streams}->{$stream_id}; 189 190 my $res = 191 headers_decode( $self, \$s->{header_block}, 0, 192 length $s->{header_block}, $stream_id ); 193 194 tracer->debug("Headers done for stream $stream_id\n"); 195 196 return undef unless defined $res; 197 198 # Clear header_block 199 $s->{header_block} = ''; 200 201 my $eh = $self->decode_context->{emitted_headers}; 202 my $is_response = $self->{type} == CLIENT && !$s->{promised_sid}; 203 my $is_trailer = !!$self->stream_trailer($stream_id); 204 205 return undef 206 unless $self->validate_headers( $eh, $stream_id, $is_response ); 207 208 if ( $s->{promised_sid} ) { 209 $self->{streams}->{ $s->{promised_sid} }->{pp_headers} = $eh; 210 } 211 elsif ($is_trailer) { 212 $self->stream_trailer_headers( $stream_id, $eh ); 213 } 214 else { 215 $s->{headers} = $eh; 216 } 217 218 # Exec callbacks for headers 219 if ( exists $s->{frame_cb} && exists $s->{frame_cb}->{&HEADERS} ) { 220 for my $cb ( @{ $s->{frame_cb}->{&HEADERS} } ) { 221 $cb->($eh); 222 } 223 } 224 225 # Clear emitted headers 226 $self->decode_context->{emitted_headers} = []; 227 228 return 1; 229} 230 231sub validate_headers { 232 my ( $self, $headers, $stream_id, $is_response ) = @_; 233 my $pseudo_flag = 1; 234 my %pseudo_hash = (); 235 my @h = $is_response ? (qw(:status)) : ( 236 qw(:method :scheme :authority 237 :path) 238 ); 239 240 # Trailer headers ? 241 if ( my $t = $self->stream_trailer($stream_id) ) { 242 for my $i ( 0 .. @$headers / 2 - 1 ) { 243 my ( $h, $v ) = ( $headers->[ $i * 2 ], $headers->[ $i * 2 + 1 ] ); 244 if ( !exists $t->{$h} ) { 245 tracer->warning( 246 "header <$h> doesn't listed in the trailer header"); 247 $self->stream_error( $stream_id, PROTOCOL_ERROR ); 248 return undef; 249 } 250 } 251 return 1; 252 } 253 254 for my $i ( 0 .. @$headers / 2 - 1 ) { 255 my ( $h, $v ) = ( $headers->[ $i * 2 ], $headers->[ $i * 2 + 1 ] ); 256 if ( $h =~ /^\:/ ) { 257 if ( !$pseudo_flag ) { 258 tracer->warning( 259 "pseudo-header <$h> appears after a regular header"); 260 $self->stream_error( $stream_id, PROTOCOL_ERROR ); 261 return undef; 262 } 263 elsif ( !grep { $_ eq $h } @h ) { 264 tracer->warning("invalid pseudo-header <$h>"); 265 $self->stream_error( $stream_id, PROTOCOL_ERROR ); 266 return undef; 267 } 268 elsif ( exists $pseudo_hash{$h} ) { 269 tracer->warning("repeated pseudo-header <$h>"); 270 $self->stream_error( $stream_id, PROTOCOL_ERROR ); 271 return undef; 272 } 273 274 $pseudo_hash{$h} = $v; 275 next; 276 } 277 278 $pseudo_flag = 0 if $pseudo_flag; 279 280 if ( $h eq 'connection' ) { 281 tracer->warning("connection header is not valid in http/2"); 282 $self->stream_error( $stream_id, PROTOCOL_ERROR ); 283 return undef; 284 } 285 elsif ( $h eq 'te' && $v ne 'trailers' ) { 286 tracer->warning("TE header can contain only value 'trailers'"); 287 $self->stream_error( $stream_id, PROTOCOL_ERROR ); 288 return undef; 289 } 290 elsif ( $h eq 'content-length' ) { 291 $self->stream_length( $stream_id, $v ); 292 } 293 elsif ( $h eq 'trailer' ) { 294 my %th = map { $_ => 1 } split /\s*,\s*/, lc($v); 295 if ( 296 grep { exists $th{$_} } ( 297 qw(transfer-encoding content-length host authentication 298 cache-control expect max-forwards pragma range te 299 content-encoding content-type content-range trailer) 300 ) 301 ) 302 { 303 tracer->warning("trailer header contain forbidden headers"); 304 $self->stream_error( $stream_id, PROTOCOL_ERROR ); 305 return undef; 306 } 307 $self->stream_trailer( $stream_id, {%th} ); 308 } 309 } 310 311 for my $h (@h) { 312 next if exists $pseudo_hash{$h}; 313 314 tracer->warning("missed mandatory pseudo-header $h"); 315 $self->stream_error( $stream_id, PROTOCOL_ERROR ); 316 return undef; 317 } 318 319 1; 320} 321 322# RST_STREAM for stream errors 323sub stream_error { 324 my ( $self, $stream_id, $error ) = @_; 325 $self->enqueue( RST_STREAM, 0, $stream_id, $error ); 326} 327 328# Flow control windown of stream 329sub _stream_fcw { 330 my $dir = shift; 331 my $self = shift; 332 my $stream_id = shift; 333 return undef unless exists $self->{streams}->{$stream_id}; 334 my $s = $self->{streams}->{$stream_id}; 335 336 if (@_) { 337 $s->{$dir} += shift; 338 tracer->debug( "Stream $stream_id $dir now is " . $s->{$dir} . "\n" ); 339 } 340 $s->{$dir}; 341} 342 343sub stream_fcw_send { 344 _stream_fcw( 'fcw_send', @_ ); 345} 346 347sub stream_fcw_recv { 348 _stream_fcw( 'fcw_recv', @_ ); 349} 350 351sub stream_fcw_update { 352 my ( $self, $stream_id ) = @_; 353 354 # TODO: check size of data of stream in memory 355 my $size = $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE); 356 tracer->debug("update fcw recv of stream $stream_id with $size b.\n"); 357 $self->stream_fcw_recv( $stream_id, $size ); 358 $self->enqueue( WINDOW_UPDATE, 0, $stream_id, $size ); 359} 360 361sub stream_send_blocked { 362 my ( $self, $stream_id ) = @_; 363 my $s = $self->{streams}->{$stream_id} or return undef; 364 365 if ( length( $s->{blocked_data} ) 366 && $self->stream_fcw_send($stream_id) > 0 ) 367 { 368 $self->send_data($stream_id); 369 } 370} 371 372sub stream_reprio { 373 my ( $self, $stream_id, $exclusive, $stream_dep ) = @_; 374 return undef 375 unless exists $self->{streams}->{$stream_id} 376 && ( $stream_dep == 0 || exists $self->{streams}->{$stream_dep} ) 377 && $stream_id != $stream_dep; 378 my $s = $self->{streams}; 379 380 if ( $s->{$stream_id}->{stream_dep} != $stream_dep ) { 381 382 # check if new stream_dep is stream child 383 if ( $stream_dep != 0 ) { 384 my $sid = $stream_dep; 385 while ( $sid = $s->{$sid}->{stream_dep} ) { 386 next unless $sid == $stream_id; 387 388 # Child take my stream dep 389 $s->{$stream_dep}->{stream_dep} = 390 $s->{$stream_id}->{stream_dep}; 391 last; 392 } 393 } 394 395 # Set new stream dep 396 $s->{$stream_id}->{stream_dep} = $stream_dep; 397 } 398 399 if ($exclusive) { 400 401 # move all siblings to childs 402 for my $sid ( keys %$s ) { 403 next 404 if $s->{$sid}->{stream_dep} != $stream_dep 405 || $sid == $stream_id; 406 407 $s->{$sid}->{stream_dep} = $stream_id; 408 } 409 } 410 411 return 1; 412} 413 4141; 415