1package Protocol::HTTP2::Stream;
2use strict;
3use warnings;
4use Protocol::HTTP2::Constants qw(:states :endpoints :settings :frame_types
5  :limits :errors);
6use Protocol::HTTP2::HeaderCompression qw( headers_decode );
7use Protocol::HTTP2::Trace qw(tracer);
8
# Stream-related part of Protocol::HTTP2::Connection
10
11# Autogen properties
{
    # Generate one accessor per per-stream property, named stream_<prop>:
    #   $con->stream_<prop>($stream_id)          - read the stored value
    #   $con->stream_<prop>($stream_id, $value)  - store $value, return it
    # Every accessor returns undef when $stream_id is not a known stream.
    no strict 'refs';
    my @stream_props = qw(
      promised_sid headers pp_headers header_block trailer
      trailer_headers length blocked_data weight end reset
    );
    for my $prop (@stream_props) {
        my $accessor = __PACKAGE__ . '::stream_' . $prop;
        *{$accessor} = sub {
            return undef unless exists $_[0]->{streams}->{ $_[1] };
            my $stream = $_[0]->{streams}->{ $_[1] };

            # Exactly two arguments (self, stream id) means "read".
            return $stream->{$prop} if @_ == 2;
            return $stream->{$prop} = $_[2];
        };
    }
}
27
# Register a new locally-initiated stream in the IDLE state.
#
# Returns the id of the new stream, or undef when $self->goaway is true.
# The id counter only advances (by 2) once the first local stream
# (id 1 for a client, id 2 otherwise) already exists; last_stream is
# presumably initialised to that first id elsewhere - TODO confirm.
sub new_stream {
    my ($self) = @_;
    return undef if $self->goaway;

    my $first_id = $self->{type} == CLIENT ? 1 : 2;
    $self->{last_stream} += 2 if exists $self->{streams}->{$first_id};

    # Flow-control windows start from the current INITIAL_WINDOW_SIZE
    # settings of the decoding and encoding sides.
    my %stream = (
        state      => IDLE,
        weight     => DEFAULT_WEIGHT,
        stream_dep => 0,
        fcw_recv   => $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE),
        fcw_send   => $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE),
    );

    my $stream_id = $self->{last_stream};
    $self->{streams}->{$stream_id} = {%stream};
    return $stream_id;
}
43
# Register a stream opened by the peer.
#
#   $stream_id - id announced by the peer
#
# Returns $stream_id on success. Returns undef after:
#   * raising a connection-level PROTOCOL_ERROR when the id is lower than
#     the last accepted peer id, has the parity reserved for our own
#     streams, or $self->goaway is true;
#   * queueing a REFUSED_STREAM stream error when the number of active peer
#     streams has reached the decoder's SETTINGS_MAX_CONCURRENT_STREAMS.
# On success the on_new_peer_stream callback (if set) is invoked with the id.
sub new_peer_stream {
    my $self      = shift;
    my $stream_id = shift;

    # Streams we open ourselves are odd for a client and even otherwise, so
    # a peer-initiated id must never carry that parity. The ternary is kept
    # in its own expression: written inline without parentheses it binds
    # looser than '||' and swallows the whole condition.
    my $own_parity = $self->{type} == CLIENT ? 1 : 0;

    # NOTE(review): an id equal to last_peer_stream is accepted here, while
    # RFC 7540 section 5.1.1 asks for strictly increasing ids - confirm that
    # callers never pass an id that was already opened.
    if (   $stream_id < $self->{last_peer_stream}
        || ( $stream_id % 2 ) == $own_parity
        || $self->goaway )
    {
        tracer->error("Peer send invalid stream id: $stream_id\n");
        $self->error(PROTOCOL_ERROR);
        return undef;
    }
    $self->{last_peer_stream} = $stream_id;

    # Refuse the stream (but remember its id) once the concurrency limit
    # is reached.
    if ( $self->dec_setting(SETTINGS_MAX_CONCURRENT_STREAMS) <=
        $self->{active_peer_streams} )
    {
        tracer->warning("SETTINGS_MAX_CONCURRENT_STREAMS exceeded\n");
        $self->stream_error( $stream_id, REFUSED_STREAM );
        return undef;
    }

    $self->{active_peer_streams}++;
    tracer->debug("Active streams: $self->{active_peer_streams}");
    $self->{streams}->{$stream_id} = {
        'state'      => IDLE,
        'weight'     => DEFAULT_WEIGHT,
        'stream_dep' => 0,
        'fcw_recv'   => $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE),
        'fcw_send'   => $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE),
    };
    $self->{on_new_peer_stream}->($stream_id)
      if exists $self->{on_new_peer_stream};

    return $self->{last_peer_stream};
}
77
# Return the internal record (hash ref) of stream $stream_id, or undef when
# no such stream is known.
sub stream {
    my ( $self, $stream_id ) = @_;

    return exists $self->{streams}->{$stream_id}
      ? $self->{streams}->{$stream_id}
      : undef;
}
84
# stream_state ( $self, $stream_id, $new_state?, $pending? )
#
# Get or set the state of a stream. Returns undef for an unknown stream,
# otherwise the stream's state after the call.
#
# With $new_state and a true $pending, the state is only recorded through
# stream_pending_state and the current state is left untouched.
# With $new_state alone: the on_change_state callback (if any) is called
# with ( $stream_id, $old_state, $new_state ), the state is stored, the
# callbacks registered for that state are run and, when the new state is
# CLOSED, the stream record is trimmed.
sub stream_state {
    my ( $self, $stream_id, @args ) = @_;
    return undef unless exists $self->{streams}->{$stream_id};
    my $stream = $self->{streams}->{$stream_id};

    # Plain read
    return $stream->{state} unless @args;

    my ( $new_state, $pending ) = @args;
    if ($pending) {
        $self->stream_pending_state( $stream_id, $new_state );
        return $stream->{state};
    }

    $self->{on_change_state}->( $stream_id, $stream->{state}, $new_state )
      if exists $self->{on_change_state};

    $stream->{state} = $new_state;

    # Run the callbacks registered for the state just entered
    if (   exists $stream->{cb}
        && exists $stream->{cb}->{ $stream->{state} } )
    {
        for my $handler ( @{ $stream->{cb}->{ $stream->{state} } } ) {
            $handler->();
        }
    }

    # Cleanup of a closed stream
    if ( $new_state == CLOSED ) {

        # A stream belongs to the peer when its parity differs from the
        # parity of our own streams (odd for a client).
        my $is_peer_stream =
          ( $stream_id % 2 ) ^ ( $self->{type} == CLIENT );
        $self->{active_peer_streams}--
          if $self->{active_peer_streams} && $is_peer_stream;
        tracer->info(
            "Active streams: $self->{active_peer_streams} $stream_id");

        # Keep only the bookkeeping fields; drop everything else.
        my %keep = map { $_ => 1 }
          qw(state weight stream_dep fcw_recv fcw_send reset);
        delete @{$stream}{ grep { !$keep{$_} } keys %$stream };
    }

    return $stream->{state};
}
132
# Get or set the pending state of a stream.
#
# When a value is passed (undef included) it is stored as the stream's
# pending_state, and the connection's pending_stream is set to $stream_id
# for a defined value or to undef otherwise.
# Returns the stream's pending state, or undef for an unknown stream.
sub stream_pending_state {
    my ( $self, $stream_id, @value ) = @_;
    return undef unless exists $self->{streams}->{$stream_id};
    my $stream = $self->{streams}->{$stream_id};

    if (@value) {
        my $pending = $value[0];
        $stream->{pending_state} = $pending;
        $self->{pending_stream}  = defined $pending ? $stream_id : undef;
    }

    return $stream->{pending_state};
}
145
# Register callback $cb to be run when stream $stream_id enters $state.
# Returns the number of callbacks now registered for that state, or undef
# when the stream is unknown.
sub stream_cb {
    my ( $self, $stream_id, $state, $cb ) = @_;

    if ( exists $self->{streams}->{$stream_id} ) {
        my $stream = $self->{streams}->{$stream_id};
        return push @{ $stream->{cb}->{$state} }, $cb;
    }

    return undef;
}
153
# Register callback $cb for frames of type $frame on stream $stream_id.
# Returns the number of callbacks now registered for that frame type, or
# undef when the stream is unknown.
sub stream_frame_cb {
    my ( $self, $stream_id, $frame, $cb ) = @_;

    if ( exists $self->{streams}->{$stream_id} ) {
        my $stream = $self->{streams}->{$stream_id};
        return push @{ $stream->{frame_cb}->{$frame} }, $cb;
    }

    return undef;
}
161
# Read the buffered data of a stream or feed it a new chunk.
#
# When a chunk is passed and DATA frame callbacks are registered for the
# stream, every callback receives the chunk and nothing is buffered;
# without such callbacks the chunk is appended to the stream's buffer.
# Returns the buffered data, or undef for an unknown stream.
sub stream_data {
    my $self      = shift;
    my $stream_id = shift;
    return undef unless exists $self->{streams}->{$stream_id};
    my $stream = $self->{streams}->{$stream_id};

    # Plain read
    return $stream->{data} unless @_;

    my $has_data_cb =
      exists $stream->{frame_cb} && exists $stream->{frame_cb}->{&DATA};

    if ($has_data_cb) {

        # Exec callbacks for data
        for my $handler ( @{ $stream->{frame_cb}->{&DATA} } ) {
            $handler->( $_[0] );
        }
    }
    else {
        $stream->{data} .= shift;
    }

    return $stream->{data};
}
183
# Finish processing of a complete header block of a stream.
#
# Decodes the accumulated header_block with headers_decode, validates the
# emitted header list and stores it as one of:
#   * pp_headers of the promised stream, when this stream has a promised_sid;
#   * trailer headers, when a trailer was already announced for the stream;
#   * headers of the stream otherwise.
# HEADERS frame callbacks registered for the stream are then called with the
# header list.
#
# Returns 1 on success; undef when the stream is unknown, decoding returns
# undef or validation fails.
sub stream_headers_done {
    my $self      = shift;
    my $stream_id = shift;
    return undef unless exists $self->{streams}->{$stream_id};
    my $s = $self->{streams}->{$stream_id};

    my $res =
      headers_decode( $self, \$s->{header_block}, 0,
        length $s->{header_block}, $stream_id );

    tracer->debug("Headers done for stream $stream_id\n");

    return undef unless defined $res;

    # Clear header_block
    $s->{header_block} = '';

    my $eh          = $self->decode_context->{emitted_headers};
    my $is_response = $self->{type} == CLIENT && !$s->{promised_sid};

    # Must be evaluated before validation: validate_headers may register a
    # trailer for this very header block.
    my $is_trailer = !!$self->stream_trailer($stream_id);

    unless ( $self->validate_headers( $eh, $stream_id, $is_response ) ) {

        # Drop the rejected list as the success path does below, so it
        # cannot be carried over into the next header block decoded on this
        # connection.
        $self->decode_context->{emitted_headers} = [];
        return undef;
    }

    if ( $s->{promised_sid} ) {
        $self->{streams}->{ $s->{promised_sid} }->{pp_headers} = $eh;
    }
    elsif ($is_trailer) {
        $self->stream_trailer_headers( $stream_id, $eh );
    }
    else {
        $s->{headers} = $eh;
    }

    # Exec callbacks for headers
    if ( exists $s->{frame_cb} && exists $s->{frame_cb}->{&HEADERS} ) {
        for my $cb ( @{ $s->{frame_cb}->{&HEADERS} } ) {
            $cb->($eh);
        }
    }

    # Clear emitted headers
    $self->decode_context->{emitted_headers} = [];

    return 1;
}
230
# Validate a decoded header list of a stream.
#
#   $headers     - array ref of alternating header names and values
#   $stream_id   - stream the headers belong to
#   $is_response - true: the only (and mandatory) pseudo-header is :status;
#                  false: :method, :scheme, :authority and :path are required
#
# When a trailer was already announced for the stream, the list is treated
# as trailer headers and only checked against the announced names.
#
# Side effects: a content-length value is stored with stream_length, and an
# accepted 'trailer' header is stored (as a hash of lower-cased names) with
# stream_trailer.
#
# Returns 1 when the list is acceptable. Otherwise logs a warning, queues a
# PROTOCOL_ERROR stream error and returns undef.
sub validate_headers {
    my ( $self, $headers, $stream_id, $is_response ) = @_;
    my $pseudo_flag = 1;     # true while only pseudo-headers were seen
    my %pseudo_hash = ();
    my @h =
      $is_response
      ? (qw(:status))
      : (qw(:method :scheme :authority :path));

    # Trailer headers ?
    if ( my $t = $self->stream_trailer($stream_id) ) {
        for my $i ( 0 .. @$headers / 2 - 1 ) {
            my $h = $headers->[ $i * 2 ];
            next if exists $t->{$h};

            tracer->warning(
                "header <$h> doesn't listed in the trailer header");
            $self->stream_error( $stream_id, PROTOCOL_ERROR );
            return undef;
        }
        return 1;
    }

    for my $i ( 0 .. @$headers / 2 - 1 ) {
        my ( $h, $v ) = ( $headers->[ $i * 2 ], $headers->[ $i * 2 + 1 ] );
        if ( $h =~ /^\:/ ) {
            if ( !$pseudo_flag ) {
                tracer->warning(
                    "pseudo-header <$h> appears after a regular header");
                $self->stream_error( $stream_id, PROTOCOL_ERROR );
                return undef;
            }
            elsif ( !grep { $_ eq $h } @h ) {
                tracer->warning("invalid pseudo-header <$h>");
                $self->stream_error( $stream_id, PROTOCOL_ERROR );
                return undef;
            }
            elsif ( exists $pseudo_hash{$h} ) {
                tracer->warning("repeated pseudo-header <$h>");
                $self->stream_error( $stream_id, PROTOCOL_ERROR );
                return undef;
            }

            $pseudo_hash{$h} = $v;
            next;
        }

        # First regular header ends the pseudo-header section
        $pseudo_flag = 0;

        if ( $h eq 'connection' ) {
            tracer->warning("connection header is not valid in http/2");
            $self->stream_error( $stream_id, PROTOCOL_ERROR );
            return undef;
        }
        elsif ( $h eq 'te' && $v ne 'trailers' ) {
            tracer->warning("TE header can contain only value 'trailers'");
            $self->stream_error( $stream_id, PROTOCOL_ERROR );
            return undef;
        }
        elsif ( $h eq 'content-length' ) {
            $self->stream_length( $stream_id, $v );
        }
        elsif ( $h eq 'trailer' ) {
            my %th = map { $_ => 1 } split /\s*,\s*/, lc($v);

            # Fields that must not be sent as trailers (RFC 7230, 4.1.2).
            # "authentication" is the RFC's name for a category, not a
            # header; it is kept for backward compatibility next to the
            # actual authentication fields (RFC 7235, RFC 6265).
            if (
                grep { exists $th{$_} } (
                    qw(transfer-encoding content-length host authentication
                      authorization proxy-authorization proxy-authenticate
                      www-authenticate cookie set-cookie
                      cache-control expect max-forwards pragma range te
                      content-encoding content-type content-range trailer)
                )
              )
            {
                tracer->warning("trailer header contain forbidden headers");
                $self->stream_error( $stream_id, PROTOCOL_ERROR );
                return undef;
            }
            $self->stream_trailer( $stream_id, {%th} );
        }
    }

    # Every mandatory pseudo-header has to be present
    for my $h (@h) {
        next if exists $pseudo_hash{$h};

        tracer->warning("missed mandatory pseudo-header $h");
        $self->stream_error( $stream_id, PROTOCOL_ERROR );
        return undef;
    }

    1;
}
321
# Report a stream error: queue a RST_STREAM frame (flags 0) carrying $error
# for stream $stream_id. Returns whatever enqueue returns.
sub stream_error {
    my $self = shift;
    my ( $stream_id, $error ) = @_;

    return $self->enqueue( RST_STREAM, 0, $stream_id, $error );
}
327
# Flow control window of a stream.
#
#   $dir - which window to use: 'fcw_send' or 'fcw_recv'
#
# With a trailing delta argument the window is adjusted by that amount
# first. Returns the current window value, or undef for an unknown stream.
sub _stream_fcw {
    my ( $dir, $self, $stream_id, @delta ) = @_;
    return undef unless exists $self->{streams}->{$stream_id};
    my $stream = $self->{streams}->{$stream_id};

    if (@delta) {
        $stream->{$dir} += $delta[0];
        tracer->debug("Stream $stream_id $dir now is $stream->{$dir}\n");
    }

    return $stream->{$dir};
}
342
# Get (or adjust by a delta) the send flow control window of a stream.
# Arguments after the invocant: $stream_id, optional delta.
sub stream_fcw_send {
    return _stream_fcw( fcw_send => @_ );
}
346
# Get (or adjust by a delta) the receive flow control window of a stream.
# Arguments after the invocant: $stream_id, optional delta.
sub stream_fcw_recv {
    return _stream_fcw( fcw_recv => @_ );
}
350
# Enlarge the receive window of a stream by the decoder's
# SETTINGS_INITIAL_WINDOW_SIZE and queue the matching WINDOW_UPDATE frame.
# Returns whatever enqueue returns.
sub stream_fcw_update {
    my $self      = shift;
    my $stream_id = shift;

    # TODO: check size of data of stream  in memory
    my $increment = $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE);
    tracer->debug(
        "update fcw recv of stream $stream_id with $increment b.\n");

    $self->stream_fcw_recv( $stream_id, $increment );
    return $self->enqueue( WINDOW_UPDATE, 0, $stream_id, $increment );
}
360
# Retry sending data that was held back on a stream: send_data is called
# when the stream has non-empty blocked_data and its send window is
# positive. Returns undef for an unknown stream.
sub stream_send_blocked {
    my ( $self, $stream_id ) = @_;
    my $stream = $self->{streams}->{$stream_id};
    return undef unless $stream;

    my $blocked_bytes = length( $stream->{blocked_data} );
    if ( $blocked_bytes && $self->stream_fcw_send($stream_id) > 0 ) {
        $self->send_data($stream_id);
    }
}
371
# Change the position of a stream in the dependency tree.
#
#   $stream_id  - stream to move
#   $exclusive  - true: the streams that depend on $stream_dep become
#                 children of $stream_id
#   $stream_dep - new parent stream (0 for the root)
#
# Returns 1 on success; undef when $stream_id is unknown, $stream_dep is
# neither 0 nor a known stream, or both ids are equal.
sub stream_reprio {
    my ( $self, $stream_id, $exclusive, $stream_dep ) = @_;

    return undef if !exists $self->{streams}->{$stream_id};
    return undef
      if $stream_dep != 0 && !exists $self->{streams}->{$stream_dep};
    return undef if $stream_id == $stream_dep;

    my $streams = $self->{streams};
    my $node    = $streams->{$stream_id};

    if ( $node->{stream_dep} != $stream_dep ) {

        if ( $stream_dep != 0 ) {

            # Walk up from the new parent: when the moved stream is one of
            # its ancestors, the new parent first takes over the moved
            # stream's current parent.
            my $ancestor = $stream_dep;
            while ( $ancestor = $streams->{$ancestor}->{stream_dep} ) {
                if ( $ancestor == $stream_id ) {
                    $streams->{$stream_dep}->{stream_dep} =
                      $node->{stream_dep};
                    last;
                }
            }
        }

        # Set new stream dep
        $node->{stream_dep} = $stream_dep;
    }

    if ($exclusive) {

        # Former siblings become children of the moved stream
        for my $sid ( grep { $_ != $stream_id } keys %$streams ) {
            $streams->{$sid}->{stream_dep} = $stream_id
              if $streams->{$sid}->{stream_dep} == $stream_dep;
        }
    }

    return 1;
}
413
4141;
415