1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <assert.h>
18 #include <stddef.h>
19
20 #include <apr_strings.h>
21
22 #include <httpd.h>
23 #include <http_core.h>
24 #include <http_connection.h>
25 #include <http_log.h>
26
27 #include <nghttp2/nghttp2.h>
28
29 #include "h2_private.h"
30 #include "h2.h"
31 #include "h2_bucket_beam.h"
32 #include "h2_conn.h"
33 #include "h2_config.h"
34 #include "h2_h2.h"
35 #include "h2_mplx.h"
36 #include "h2_push.h"
37 #include "h2_request.h"
38 #include "h2_headers.h"
39 #include "h2_session.h"
40 #include "h2_stream.h"
41 #include "h2_task.h"
42 #include "h2_ctx.h"
43 #include "h2_task.h"
44 #include "h2_util.h"
45
46
/**
 * Give the human readable name of a stream state, for use in log messages.
 *
 * @param state the stream state to name
 * @return a static string; "UNKNOWN" for any value not handled here
 */
static const char *h2_ss_str(h2_stream_state_t state)
{
    const char *label = "UNKNOWN";

    switch (state) {
        case H2_SS_IDLE:     label = "IDLE";               break;
        case H2_SS_RSVD_L:   label = "RESERVED_LOCAL";     break;
        case H2_SS_RSVD_R:   label = "RESERVED_REMOTE";    break;
        case H2_SS_OPEN:     label = "OPEN";               break;
        case H2_SS_CLOSED_L: label = "HALF_CLOSED_LOCAL";  break;
        case H2_SS_CLOSED_R: label = "HALF_CLOSED_REMOTE"; break;
        case H2_SS_CLOSED:   label = "CLOSED";             break;
        case H2_SS_CLEANUP:  label = "CLEANUP";            break;
        default:
            break;
    }
    return label;
}
70
/**
 * Name of the state the given stream is currently in.
 *
 * @param stream the stream to inspect
 * @return a static string as produced by h2_ss_str()
 */
const char *h2_stream_state_str(h2_stream *stream)
{
    h2_stream_state_t current = stream->state;

    return h2_ss_str(current);
}
75
/* Abbreviations for stream transit tables.
 *
 * Values below zero signal an error and zero means "stay in the current
 * state". A real target state is stored as (state + 1) so that it can
 * never collide with S_NOP; on_map() subtracts the 1 again on lookup.
 */
#define S_XXX     (-2)                      /* Programming Error */
#define S_ERR     (-1)                      /* Protocol Error */
#define S_NOP     (0)                       /* No Change */
#define S_IDL     (H2_SS_IDLE + 1)          /* was H2_SS_IDL, which does not exist */
#define S_RS_L    (H2_SS_RSVD_L + 1)
#define S_RS_R    (H2_SS_RSVD_R + 1)
#define S_OPEN    (H2_SS_OPEN + 1)
#define S_CL_L    (H2_SS_CLOSED_L + 1)
#define S_CL_R    (H2_SS_CLOSED_R + 1)
#define S_CLS     (H2_SS_CLOSED + 1)
#define S_CLN     (H2_SS_CLEANUP + 1)
88
89 /* state transisitions when certain frame types are sent */
90 static int trans_on_send[][H2_SS_MAX] = {
91 /*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
92 { S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* DATA */
93 { S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* HEADERS */
94 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */
95 { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */
96 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */
97 { S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */
98 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */
99 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */
100 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */
101 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */
102 };
103 /* state transisitions when certain frame types are received */
104 static int trans_on_recv[][H2_SS_MAX] = {
105 /*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
106 { S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* DATA */
107 { S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* HEADERS */
108 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */
109 { S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */
110 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */
111 { S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */
112 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */
113 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */
114 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */
115 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */
116 };
117 /* state transisitions when certain events happen */
118 static int trans_on_event[][H2_SS_MAX] = {
119 /*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
120 { S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },/* EV_CLOSED_L*/
121 { S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },/* EV_CLOSED_R*/
122 { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* EV_CANCELLED*/
123 { S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },/* EV_EOS_SENT*/
124 };
125
/**
 * Resolve one row of a transit table for the given state.
 *
 * @param state the current stream state, used as column index
 * @param map   one table row holding S_* entries
 * @return the unchanged state for S_NOP, the negative S_XXX/S_ERR marker
 *         itself on error entries, otherwise the target state (entry - 1)
 */
static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
{
    int entry = map[state];

    if (entry == S_NOP) {
        return state;
    }
    if (entry == S_XXX || entry == S_ERR) {
        return entry;
    }
    /* real states are stored offset by one */
    return entry - 1;
}
139
/**
 * Look up the state a frame of the given type leads to.
 *
 * @param state      the current stream state
 * @param frame_type the frame type, used as row index; must not be negative
 * @param frame_map  the transit table to consult
 * @param maxlen     number of rows in frame_map
 * @return result of on_map() for known frame types; the unchanged state
 *         for frame types beyond the table (unknown types are ignored)
 */
static int on_frame(h2_stream_state_t state, int frame_type,
                    int frame_map[][H2_SS_MAX], apr_size_t maxlen)
{
    ap_assert(frame_type >= 0);
    ap_assert(state >= 0);
    if (frame_type < maxlen) {
        return on_map(state, frame_map[frame_type]);
    }
    /* NOP, ignore unknown frame types */
    return state;
}
150
/**
 * State resulting from sending a frame of the given type, according to
 * trans_on_send.
 */
static int on_frame_send(h2_stream_state_t state, int frame_type)
{
    apr_size_t rows = H2_ALEN(trans_on_send);

    return on_frame(state, frame_type, trans_on_send, rows);
}
155
/**
 * State resulting from receiving a frame of the given type, according to
 * trans_on_recv.
 */
static int on_frame_recv(h2_stream_state_t state, int frame_type)
{
    apr_size_t rows = H2_ALEN(trans_on_recv);

    return on_frame(state, frame_type, trans_on_recv, rows);
}
160
/**
 * Notify the monitor (if it has an on_event callback) and look up the
 * state the event leads to.
 *
 * @param stream the stream the event applies to
 * @param ev     the event
 * @return result of on_map() on trans_on_event for events covered by the
 *         table; the stream's current state for all other events
 */
static int on_event(h2_stream* stream, h2_stream_event_t ev)
{
    h2_stream_monitor *mon = stream->monitor;

    if (mon && mon->on_event) {
        mon->on_event(mon->ctx, stream, ev);
    }
    if (ev >= H2_ALEN(trans_on_event)) {
        /* event without a table row: no state change */
        return stream->state;
    }
    return on_map(stream->state, trans_on_event[ev]);
}
171
/**
 * Log a printed form of the stream's output buffer at the given level.
 * Does nothing unless that level is enabled on the session connection.
 *
 * @param lvl the log level to log at
 * @param s   the stream whose out_buffer is printed
 * @param tag tag handed to h2_util_bb_print()
 */
static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
{
    conn_rec *c = s->session->c;
    char buffer[4 * 1024];
    const char *text;
    apr_size_t len;

    if (!APLOG_C_IS_LEVEL(c, lvl)) {
        return;
    }
    len = h2_util_bb_print(buffer, sizeof(buffer), tag, "", s->out_buffer);
    if (len) {
        text = buffer;
    }
    else {
        text = "empty";
    }
    ap_log_cerror(APLOG_MARK, lvl, 0, c,
                  H2_STRM_MSG(s, "out-buffer(%s)"), text);
}
184
setup_input(h2_stream * stream)185 static apr_status_t setup_input(h2_stream *stream) {
186 if (stream->input == NULL) {
187 int empty = (stream->input_eof
188 && (!stream->in_buffer
189 || APR_BRIGADE_EMPTY(stream->in_buffer)));
190 if (!empty) {
191 h2_beam_create(&stream->input, stream->pool, stream->id,
192 "input", H2_BEAM_OWNER_SEND, 0,
193 stream->session->s->timeout);
194 h2_beam_send_from(stream->input, stream->pool);
195 }
196 }
197 return APR_SUCCESS;
198 }
199
close_input(h2_stream * stream)200 static apr_status_t close_input(h2_stream *stream)
201 {
202 conn_rec *c = stream->session->c;
203 apr_status_t status = APR_SUCCESS;
204
205 stream->input_eof = 1;
206 if (stream->input && h2_beam_is_closed(stream->input)) {
207 return APR_SUCCESS;
208 }
209
210 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
211 H2_STRM_MSG(stream, "closing input"));
212 if (stream->rst_error) {
213 return APR_ECONNRESET;
214 }
215
216 if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
217 apr_bucket *b;
218 h2_headers *r;
219
220 if (!stream->in_buffer) {
221 stream->in_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
222 }
223
224 r = h2_headers_create(HTTP_OK, stream->trailers, NULL,
225 stream->in_trailer_octets, stream->pool);
226 stream->trailers = NULL;
227 b = h2_bucket_headers_create(c->bucket_alloc, r);
228 APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
229
230 b = apr_bucket_eos_create(c->bucket_alloc);
231 APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
232
233 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
234 H2_STRM_MSG(stream, "added trailers"));
235 h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
236 }
237 if (stream->input) {
238 h2_stream_flush_input(stream);
239 return h2_beam_close(stream->input);
240 }
241 return status;
242 }
243
close_output(h2_stream * stream)244 static apr_status_t close_output(h2_stream *stream)
245 {
246 if (!stream->output || h2_beam_is_closed(stream->output)) {
247 return APR_SUCCESS;
248 }
249 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
250 H2_STRM_MSG(stream, "closing output"));
251 return h2_beam_leave(stream->output);
252 }
253
/**
 * Tell the monitor, if it has an on_state_enter callback, that the stream
 * entered a state.
 */
static void on_state_enter(h2_stream *stream)
{
    h2_stream_monitor *mon = stream->monitor;

    if (!mon || !mon->on_state_enter) {
        return;
    }
    mon->on_state_enter(mon->ctx, stream);
}
260
/**
 * Tell the monitor, if it has an on_state_event callback, about an event
 * that is going to change the stream's state.
 */
static void on_state_event(h2_stream *stream, h2_stream_event_t ev)
{
    h2_stream_monitor *mon = stream->monitor;

    if (!mon || !mon->on_state_event) {
        return;
    }
    mon->on_state_event(mon->ctx, stream, ev);
}
267
/**
 * React to an event or frame that is not valid in the stream's state:
 * notify the monitor, log it, and reset the stream with
 * H2_ERR_INTERNAL_ERROR when it is in one of the OPEN, RESERVED or
 * HALF_CLOSED states. Streams in any other state are left alone.
 */
static void on_state_invalid(h2_stream *stream)
{
    h2_stream_monitor *mon = stream->monitor;
    int needs_reset;

    if (mon && mon->on_state_invalid) {
        mon->on_state_invalid(mon->ctx, stream);
    }
    /* stream got an event/frame invalid in its state */
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
                  H2_STRM_MSG(stream, "invalid state event"));

    needs_reset = (stream->state == H2_SS_OPEN
                   || stream->state == H2_SS_RSVD_L
                   || stream->state == H2_SS_RSVD_R
                   || stream->state == H2_SS_CLOSED_L
                   || stream->state == H2_SS_CLOSED_R);
    if (needs_reset) {
        h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
    }
}
288
transit(h2_stream * stream,int new_state)289 static apr_status_t transit(h2_stream *stream, int new_state)
290 {
291 if (new_state == stream->state) {
292 return APR_SUCCESS;
293 }
294 else if (new_state < 0) {
295 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
296 H2_STRM_LOG(APLOGNO(03081), stream, "invalid transition"));
297 on_state_invalid(stream);
298 return APR_EINVAL;
299 }
300
301 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
302 H2_STRM_MSG(stream, "transit to [%s]"), h2_ss_str(new_state));
303 stream->state = new_state;
304 switch (new_state) {
305 case H2_SS_IDLE:
306 break;
307 case H2_SS_RSVD_L:
308 close_input(stream);
309 break;
310 case H2_SS_RSVD_R:
311 break;
312 case H2_SS_OPEN:
313 break;
314 case H2_SS_CLOSED_L:
315 close_output(stream);
316 break;
317 case H2_SS_CLOSED_R:
318 close_input(stream);
319 break;
320 case H2_SS_CLOSED:
321 close_input(stream);
322 close_output(stream);
323 if (stream->out_buffer) {
324 apr_brigade_cleanup(stream->out_buffer);
325 }
326 break;
327 case H2_SS_CLEANUP:
328 break;
329 }
330 on_state_enter(stream);
331 return APR_SUCCESS;
332 }
333
/**
 * Install the monitor whose callbacks are invoked on state changes and
 * events of this stream. Replaces any monitor set before.
 *
 * @param stream  the stream to observe
 * @param monitor the monitor to install
 */
void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor)
{
    stream->monitor = monitor;
}
338
/**
 * Deliver an event to the stream and perform the state transition, if
 * any, that the event table prescribes.
 *
 * Events that are invalid in the current state are logged and handed to
 * on_state_invalid(); events that do not change the state are only logged.
 *
 * @param stream the stream receiving the event
 * @param ev     the event
 */
void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
{
    conn_rec *c = stream->session->c;
    int target;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
                  H2_STRM_MSG(stream, "dispatch event %d"), ev);
    target = on_event(stream, ev);

    if (target < 0) {
        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c,
                      H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev);
        on_state_invalid(stream);
        AP_DEBUG_ASSERT(target > S_XXX);
        return;
    }
    if (target == stream->state) {
        /* nop */
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
                      H2_STRM_MSG(stream, "non-state event %d"), ev);
        return;
    }
    on_state_event(stream, ev);
    transit(stream, target);
}
364
/**
 * Determine the push policy of the stream from the request headers and
 * copy the configured header serialization setting into the request.
 *
 * @param stream the stream whose push_policy is set
 * @param r      the request; its serialize field is written
 */
static void set_policy_for(h2_stream *stream, h2_request *r)
{
    h2_session *session = stream->session;
    int push_enabled = h2_session_push_enabled(session);

    stream->push_policy = h2_push_policy_determine(r->headers, stream->pool,
                                                   push_enabled);
    r->serialize = h2_config_sgeti(session->s, H2_CONF_SER_HEADERS);
}
371
h2_stream_send_frame(h2_stream * stream,int ftype,int flags,size_t frame_len)372 apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags, size_t frame_len)
373 {
374 apr_status_t status = APR_SUCCESS;
375 int new_state, eos = 0;
376
377 new_state = on_frame_send(stream->state, ftype);
378 if (new_state < 0) {
379 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
380 H2_STRM_MSG(stream, "invalid frame %d send"), ftype);
381 AP_DEBUG_ASSERT(new_state > S_XXX);
382 return transit(stream, new_state);
383 }
384
385 ++stream->out_frames;
386 stream->out_frame_octets += frame_len;
387 switch (ftype) {
388 case NGHTTP2_DATA:
389 eos = (flags & NGHTTP2_FLAG_END_STREAM);
390 break;
391
392 case NGHTTP2_HEADERS:
393 eos = (flags & NGHTTP2_FLAG_END_STREAM);
394 break;
395
396 case NGHTTP2_PUSH_PROMISE:
397 /* start pushed stream */
398 ap_assert(stream->request == NULL);
399 ap_assert(stream->rtmp != NULL);
400 status = h2_stream_end_headers(stream, 1, 0);
401 if (status != APR_SUCCESS) goto leave;
402 break;
403
404 default:
405 break;
406 }
407 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
408 H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
409 status = transit(stream, new_state);
410 if (status == APR_SUCCESS && eos) {
411 status = transit(stream, on_event(stream, H2_SEV_CLOSED_L));
412 }
413 leave:
414 return status;
415 }
416
h2_stream_recv_frame(h2_stream * stream,int ftype,int flags,size_t frame_len)417 apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags, size_t frame_len)
418 {
419 apr_status_t status = APR_SUCCESS;
420 int new_state, eos = 0;
421
422 new_state = on_frame_recv(stream->state, ftype);
423 if (new_state < 0) {
424 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
425 H2_STRM_MSG(stream, "invalid frame %d recv"), ftype);
426 AP_DEBUG_ASSERT(new_state > S_XXX);
427 return transit(stream, new_state);
428 }
429
430 switch (ftype) {
431 case NGHTTP2_DATA:
432 eos = (flags & NGHTTP2_FLAG_END_STREAM);
433 break;
434
435 case NGHTTP2_HEADERS:
436 eos = (flags & NGHTTP2_FLAG_END_STREAM);
437 if (stream->state == H2_SS_OPEN) {
438 /* trailer HEADER */
439 if (!eos) {
440 h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
441 }
442 stream->in_trailer_octets += frame_len;
443 }
444 else {
445 /* request HEADER */
446 ap_assert(stream->request == NULL);
447 if (stream->rtmp == NULL) {
448 /* This can only happen, if the stream has received no header
449 * name/value pairs at all. The latest nghttp2 version have become
450 * pretty good at detecting this early. In any case, we have
451 * to abort the connection here, since this is clearly a protocol error */
452 return APR_EINVAL;
453 }
454 status = h2_stream_end_headers(stream, eos, frame_len);
455 if (status != APR_SUCCESS) goto leave;
456 }
457 break;
458
459 default:
460 break;
461 }
462 status = transit(stream, new_state);
463 if (status == APR_SUCCESS && eos) {
464 status = transit(stream, on_event(stream, H2_SEV_CLOSED_R));
465 }
466 leave:
467 return status;
468 }
469
h2_stream_flush_input(h2_stream * stream)470 apr_status_t h2_stream_flush_input(h2_stream *stream)
471 {
472 apr_status_t status = APR_SUCCESS;
473
474 if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) {
475 setup_input(stream);
476 status = h2_beam_send(stream->input, stream->in_buffer, APR_BLOCK_READ);
477 stream->in_last_write = apr_time_now();
478 }
479 if (stream->input_eof
480 && stream->input && !h2_beam_is_closed(stream->input)) {
481 status = h2_beam_close(stream->input);
482 }
483 return status;
484 }
485
h2_stream_recv_DATA(h2_stream * stream,uint8_t flags,const uint8_t * data,size_t len)486 apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
487 const uint8_t *data, size_t len)
488 {
489 h2_session *session = stream->session;
490 apr_status_t status = APR_SUCCESS;
491
492 stream->in_data_frames++;
493 if (len > 0) {
494 if (APLOGctrace3(session->c)) {
495 const char *load = apr_pstrndup(stream->pool, (const char *)data, len);
496 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, session->c,
497 H2_STRM_MSG(stream, "recv DATA, len=%d: -->%s<--"),
498 (int)len, load);
499 }
500 else {
501 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
502 H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
503 }
504 stream->in_data_octets += len;
505 if (!stream->in_buffer) {
506 stream->in_buffer = apr_brigade_create(stream->pool,
507 session->c->bucket_alloc);
508 }
509 apr_brigade_write(stream->in_buffer, NULL, NULL, (const char *)data, len);
510 h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
511 }
512 return status;
513 }
514
/**
 * Make sure the stream has an output buffer brigade, creating it from the
 * stream pool on first use.
 */
static void prep_output(h2_stream *stream)
{
    conn_rec *c = stream->session->c;

    if (stream->out_buffer) {
        return;
    }
    stream->out_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
}
521
h2_stream_create(int id,apr_pool_t * pool,h2_session * session,h2_stream_monitor * monitor,int initiated_on)522 h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
523 h2_stream_monitor *monitor, int initiated_on)
524 {
525 h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
526
527 stream->id = id;
528 stream->initiated_on = initiated_on;
529 stream->created = apr_time_now();
530 stream->state = H2_SS_IDLE;
531 stream->pool = pool;
532 stream->session = session;
533 stream->monitor = monitor;
534 stream->max_mem = session->max_stream_mem;
535
536 #ifdef H2_NG2_LOCAL_WIN_SIZE
537 stream->in_window_size =
538 nghttp2_session_get_stream_local_window_size(
539 stream->session->ngh2, stream->id);
540 #endif
541
542 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
543 H2_STRM_LOG(APLOGNO(03082), stream, "created"));
544 on_state_enter(stream);
545 return stream;
546 }
547
/**
 * Release buffered output and abort the input beam of a stream.
 *
 * After aborting the input, waits for the beam to become empty: first
 * non-blocking and, when that reports APR_EAGAIN, blocking.
 *
 * @param stream the stream to clean up; must not be NULL
 */
void h2_stream_cleanup(h2_stream *stream)
{
    conn_rec *c;
    apr_status_t rv;

    ap_assert(stream);
    if (stream->out_buffer) {
        /* remove any left over output buckets that may still have
         * references into request pools */
        apr_brigade_cleanup(stream->out_buffer);
    }
    if (!stream->input) {
        return;
    }

    h2_beam_abort(stream->input);
    rv = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
    if (rv != APR_EAGAIN) {
        return;
    }

    c = stream->session->c;
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
                  H2_STRM_MSG(stream, "wait on input drain"));
    rv = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, c,
                  H2_STRM_MSG(stream, "input drain returned"));
}
570
/**
 * Destroy the stream by destroying its pool. The stream structure must
 * not be used afterwards.
 *
 * @param stream the stream to destroy; must not be NULL
 */
void h2_stream_destroy(h2_stream *stream)
{
    apr_pool_t *stream_pool;

    ap_assert(stream);
    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
                  H2_STRM_MSG(stream, "destroy"));
    stream_pool = stream->pool;
    apr_pool_destroy(stream_pool);
}
578
h2_stream_prep_processing(h2_stream * stream)579 apr_status_t h2_stream_prep_processing(h2_stream *stream)
580 {
581 if (stream->request) {
582 const h2_request *r = stream->request;
583 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
584 H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
585 r->method, r->scheme, r->authority, r->path, r->chunked);
586 setup_input(stream);
587 stream->scheduled = 1;
588 return APR_SUCCESS;
589 }
590 return APR_EINVAL;
591 }
592
/**
 * Reset the stream: remember the error code, abort the input beam, leave
 * the output beam and dispatch H2_SEV_CANCELLED.
 *
 * @param stream     the stream to reset
 * @param error_code the error code stored in the stream's rst_error
 */
void h2_stream_rst(h2_stream *stream, int error_code)
{
    stream->rst_error = error_code;

    if (stream->input != NULL) {
        h2_beam_abort(stream->input);
    }
    if (stream->output != NULL) {
        h2_beam_leave(stream->output);
    }

    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
                  H2_STRM_MSG(stream, "reset, error=%d"), error_code);
    h2_stream_dispatch(stream, H2_SEV_CANCELLED);
}
606
h2_stream_set_request_rec(h2_stream * stream,request_rec * r,int eos)607 apr_status_t h2_stream_set_request_rec(h2_stream *stream,
608 request_rec *r, int eos)
609 {
610 h2_request *req;
611 apr_status_t status;
612
613 ap_assert(stream->request == NULL);
614 ap_assert(stream->rtmp == NULL);
615 if (stream->rst_error) {
616 return APR_ECONNRESET;
617 }
618 status = h2_request_rcreate(&req, stream->pool, r);
619 if (status == APR_SUCCESS) {
620 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
621 H2_STRM_LOG(APLOGNO(03058), stream,
622 "set_request_rec %s host=%s://%s%s"),
623 req->method, req->scheme, req->authority, req->path);
624 stream->rtmp = req;
625 /* simulate the frames that led to this */
626 return h2_stream_recv_frame(stream, NGHTTP2_HEADERS,
627 NGHTTP2_FLAG_END_STREAM, 0);
628 }
629 return status;
630 }
631
/**
 * Install a clone of the given request as the stream's request under
 * construction. The stream must not have any request yet.
 *
 * @param stream the stream to set the request on
 * @param r      the request to clone into the stream pool
 */
void h2_stream_set_request(h2_stream *stream, const h2_request *r)
{
    h2_request *copy;

    ap_assert(stream->request == NULL);
    ap_assert(stream->rtmp == NULL);
    copy = h2_request_clone(stream->pool, r);
    stream->rtmp = copy;
}
638
/**
 * Record an HTTP error status on the request under construction.
 * Has no effect when the stream is already ready or when there is no
 * request under construction (stream->rtmp is NULL).
 *
 * @param stream      the stream in error
 * @param http_status the HTTP status to record
 */
static void set_error_response(h2_stream *stream, int http_status)
{
    if (h2_stream_is_ready(stream)) {
        return;
    }
    if (stream->rtmp) {
        stream->rtmp->http_status = http_status;
    }
}
645
add_trailer(h2_stream * stream,const char * name,size_t nlen,const char * value,size_t vlen,size_t max_field_len,int * pwas_added)646 static apr_status_t add_trailer(h2_stream *stream,
647 const char *name, size_t nlen,
648 const char *value, size_t vlen,
649 size_t max_field_len, int *pwas_added)
650 {
651 conn_rec *c = stream->session->c;
652 char *hname, *hvalue;
653 const char *existing;
654
655 *pwas_added = 0;
656 if (nlen == 0 || name[0] == ':') {
657 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c,
658 H2_STRM_LOG(APLOGNO(03060), stream,
659 "pseudo header in trailer"));
660 return APR_EINVAL;
661 }
662 if (h2_req_ignore_trailer(name, nlen)) {
663 return APR_SUCCESS;
664 }
665 if (!stream->trailers) {
666 stream->trailers = apr_table_make(stream->pool, 5);
667 }
668 hname = apr_pstrndup(stream->pool, name, nlen);
669 h2_util_camel_case_header(hname, nlen);
670 existing = apr_table_get(stream->trailers, hname);
671 if (max_field_len
672 && ((existing? strlen(existing)+2 : 0) + vlen + nlen + 2 > max_field_len)) {
673 /* "key: (oldval, )?nval" is too long */
674 return APR_EINVAL;
675 }
676 if (!existing) *pwas_added = 1;
677 hvalue = apr_pstrndup(stream->pool, value, vlen);
678 apr_table_mergen(stream->trailers, hname, hvalue);
679 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
680 H2_STRM_MSG(stream, "added trailer '%s: %s'"), hname, hvalue);
681
682 return APR_SUCCESS;
683 }
684
h2_stream_add_header(h2_stream * stream,const char * name,size_t nlen,const char * value,size_t vlen)685 apr_status_t h2_stream_add_header(h2_stream *stream,
686 const char *name, size_t nlen,
687 const char *value, size_t vlen)
688 {
689 h2_session *session = stream->session;
690 int error = 0, was_added = 0;
691 apr_status_t status = APR_SUCCESS;
692
693 if (stream->has_response) {
694 return APR_EINVAL;
695 }
696
697 if (name[0] == ':') {
698 if ((vlen) > session->s->limit_req_line) {
699 /* pseudo header: approximation of request line size check */
700 if (!h2_stream_is_ready(stream)) {
701 ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c,
702 H2_STRM_LOG(APLOGNO(10178), stream,
703 "Request pseudo header exceeds "
704 "LimitRequestFieldSize: %s"), name);
705 }
706 error = HTTP_REQUEST_URI_TOO_LARGE;
707 goto cleanup;
708 }
709 }
710
711 if (session->s->limit_req_fields > 0
712 && stream->request_headers_added > session->s->limit_req_fields) {
713 /* already over limit, count this attempt, but do not take it in */
714 ++stream->request_headers_added;
715 }
716 else if (H2_SS_IDLE == stream->state) {
717 if (!stream->rtmp) {
718 stream->rtmp = h2_req_create(stream->id, stream->pool,
719 NULL, NULL, NULL, NULL, NULL, 0);
720 }
721 status = h2_request_add_header(stream->rtmp, stream->pool,
722 name, nlen, value, vlen,
723 session->s->limit_req_fieldsize, &was_added);
724 if (was_added) ++stream->request_headers_added;
725 }
726 else if (H2_SS_OPEN == stream->state) {
727 status = add_trailer(stream, name, nlen, value, vlen,
728 session->s->limit_req_fieldsize, &was_added);
729 if (was_added) ++stream->request_headers_added;
730 }
731 else {
732 status = APR_EINVAL;
733 goto cleanup;
734 }
735
736 if (APR_EINVAL == status) {
737 /* header too long */
738 if (!h2_stream_is_ready(stream)) {
739 ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c,
740 H2_STRM_LOG(APLOGNO(10180), stream,"Request header exceeds "
741 "LimitRequestFieldSize: %.*s"),
742 (int)H2MIN(nlen, 80), name);
743 }
744 error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
745 goto cleanup;
746 }
747
748 if (session->s->limit_req_fields > 0
749 && stream->request_headers_added > session->s->limit_req_fields) {
750 /* too many header lines */
751 if (stream->request_headers_added > session->s->limit_req_fields + 100) {
752 /* yeah, right, this request is way over the limit, say goodbye */
753 h2_stream_rst(stream, H2_ERR_ENHANCE_YOUR_CALM);
754 return APR_ECONNRESET;
755 }
756 if (!h2_stream_is_ready(stream)) {
757 ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c,
758 H2_STRM_LOG(APLOGNO(10181), stream, "Number of request headers "
759 "exceeds LimitRequestFields"));
760 }
761 error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
762 goto cleanup;
763 }
764
765 cleanup:
766 if (error) {
767 set_error_response(stream, error);
768 return APR_EINVAL;
769 }
770 else if (status != APR_SUCCESS) {
771 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
772 H2_STRM_MSG(stream, "header %s not accepted"), name);
773 h2_stream_dispatch(stream, H2_SEV_CANCELLED);
774 }
775 return status;
776 }
777
778 typedef struct {
779 apr_size_t maxlen;
780 const char *failed_key;
781 } val_len_check_ctx;
782
table_check_val_len(void * baton,const char * key,const char * value)783 static int table_check_val_len(void *baton, const char *key, const char *value)
784 {
785 val_len_check_ctx *ctx = baton;
786
787 if (strlen(value) <= ctx->maxlen) return 1;
788 ctx->failed_key = key;
789 return 0;
790 }
791
h2_stream_end_headers(h2_stream * stream,int eos,size_t raw_bytes)792 apr_status_t h2_stream_end_headers(h2_stream *stream, int eos, size_t raw_bytes)
793 {
794 apr_status_t status;
795 val_len_check_ctx ctx;
796
797 status = h2_request_end_headers(stream->rtmp, stream->pool, eos, raw_bytes);
798 if (APR_SUCCESS == status) {
799 set_policy_for(stream, stream->rtmp);
800 stream->request = stream->rtmp;
801 stream->rtmp = NULL;
802
803 ctx.maxlen = stream->session->s->limit_req_fieldsize;
804 ctx.failed_key = NULL;
805 apr_table_do(table_check_val_len, &ctx, stream->request->headers, NULL);
806 if (ctx.failed_key) {
807 if (!h2_stream_is_ready(stream)) {
808 ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c,
809 H2_STRM_LOG(APLOGNO(10230), stream,"Request header exceeds "
810 "LimitRequestFieldSize: %.*s"),
811 (int)H2MIN(strlen(ctx.failed_key), 80), ctx.failed_key);
812 }
813 set_error_response(stream, HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE);
814 /* keep on returning APR_SUCCESS, so that we send a HTTP response and
815 * do not RST the stream. */
816 }
817 }
818 return status;
819 }
820
get_first_headers_bucket(apr_bucket_brigade * bb)821 static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
822 {
823 if (bb) {
824 apr_bucket *b = APR_BRIGADE_FIRST(bb);
825 while (b != APR_BRIGADE_SENTINEL(bb)) {
826 if (H2_BUCKET_IS_HEADERS(b)) {
827 return b;
828 }
829 b = APR_BUCKET_NEXT(b);
830 }
831 }
832 return NULL;
833 }
834
add_buffered_data(h2_stream * stream,apr_off_t requested,apr_off_t * plen,int * peos,int * is_all,h2_headers ** pheaders)835 static apr_status_t add_buffered_data(h2_stream *stream, apr_off_t requested,
836 apr_off_t *plen, int *peos, int *is_all,
837 h2_headers **pheaders)
838 {
839 apr_bucket *b, *e;
840
841 *peos = 0;
842 *plen = 0;
843 *is_all = 0;
844 if (pheaders) {
845 *pheaders = NULL;
846 }
847
848 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_buffered_data");
849 b = APR_BRIGADE_FIRST(stream->out_buffer);
850 while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
851 e = APR_BUCKET_NEXT(b);
852 if (APR_BUCKET_IS_METADATA(b)) {
853 if (APR_BUCKET_IS_FLUSH(b)) {
854 APR_BUCKET_REMOVE(b);
855 apr_bucket_destroy(b);
856 }
857 else if (APR_BUCKET_IS_EOS(b)) {
858 *peos = 1;
859 return APR_SUCCESS;
860 }
861 else if (H2_BUCKET_IS_HEADERS(b)) {
862 if (*plen > 0) {
863 /* data before the response, can only return up to here */
864 return APR_SUCCESS;
865 }
866 else if (pheaders) {
867 *pheaders = h2_bucket_headers_get(b);
868 APR_BUCKET_REMOVE(b);
869 apr_bucket_destroy(b);
870 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
871 H2_STRM_MSG(stream, "prep, -> response %d"),
872 (*pheaders)->status);
873 return APR_SUCCESS;
874 }
875 else {
876 return APR_EAGAIN;
877 }
878 }
879 }
880 else if (b->length == 0) {
881 APR_BUCKET_REMOVE(b);
882 apr_bucket_destroy(b);
883 }
884 else {
885 ap_assert(b->length != (apr_size_t)-1);
886 *plen += b->length;
887 if (*plen >= requested) {
888 *plen = requested;
889 return APR_SUCCESS;
890 }
891 }
892 b = e;
893 }
894 *is_all = 1;
895 return APR_SUCCESS;
896 }
897
h2_stream_out_prepare(h2_stream * stream,apr_off_t * plen,int * peos,h2_headers ** pheaders)898 apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
899 int *peos, h2_headers **pheaders)
900 {
901 apr_status_t status = APR_SUCCESS;
902 apr_off_t requested, missing, max_chunk = H2_DATA_CHUNK_SIZE;
903 conn_rec *c;
904 int complete, was_closed = 0;
905
906 ap_assert(stream);
907
908 if (stream->rst_error) {
909 *plen = 0;
910 *peos = 1;
911 return APR_ECONNRESET;
912 }
913
914 c = stream->session->c;
915 prep_output(stream);
916
917 /* determine how much we'd like to send. We cannot send more than
918 * is requested. But we can reduce the size in case the master
919 * connection operates in smaller chunks. (TSL warmup) */
920 if (stream->session->io.write_size > 0) {
921 max_chunk = stream->session->io.write_size - H2_FRAME_HDR_LEN;
922 }
923 requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
924
925 /* count the buffered data until eos or a headers bucket */
926 status = add_buffered_data(stream, requested, plen, peos, &complete, pheaders);
927
928 if (status == APR_EAGAIN) {
929 /* TODO: ugly, someone needs to retrieve the response first */
930 h2_mplx_m_keep_active(stream->session->mplx, stream);
931 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
932 H2_STRM_MSG(stream, "prep, response eagain"));
933 return status;
934 }
935 else if (status != APR_SUCCESS) {
936 return status;
937 }
938
939 if (pheaders && *pheaders) {
940 return APR_SUCCESS;
941 }
942
943 /* If there we do not have enough buffered data to satisfy the requested
944 * length *and* we counted the _complete_ buffer (and did not stop in the middle
945 * because of meta data there), lets see if we can read more from the
946 * output beam */
947 missing = H2MIN(requested, stream->max_mem) - *plen;
948 if (complete && !*peos && missing > 0) {
949 apr_status_t rv = APR_EOF;
950
951 if (stream->output) {
952 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
953 h2_beam_log(stream->output, c, APLOG_TRACE2, "pre read output");
954 rv = h2_beam_receive(stream->output, stream->out_buffer,
955 APR_NONBLOCK_READ, stream->max_mem - *plen, &was_closed);
956 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
957 h2_beam_log(stream->output, c, APLOG_TRACE2, "post read output");
958 }
959
960 if (rv == APR_SUCCESS) {
961 /* count the buffer again, now that we have read output */
962 status = add_buffered_data(stream, requested, plen, peos, &complete, pheaders);
963 }
964 else if (APR_STATUS_IS_EOF(rv)) {
965 apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
966 APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
967 *peos = 1;
968 }
969 else if (APR_STATUS_IS_EAGAIN(rv)) {
970 /* we set this is the status of this call only if there
971 * is no buffered data, see check below */
972 }
973 else {
974 /* real error reading. Give this back directly, even though
975 * we may have something buffered. */
976 status = rv;
977 }
978 }
979
980 if (status == APR_SUCCESS) {
981 if (*peos || *plen) {
982 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
983 H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"),
984 (long)*plen, *peos);
985 }
986 else {
987 status = was_closed? APR_EOF : APR_EAGAIN;
988 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
989 H2_STRM_MSG(stream, "prepare, no data"));
990 }
991 }
992 return status;
993 }
994
/* Bucket predicate: yields 0 for a headers bucket (as recognized by
 * H2_BUCKET_IS_HEADERS) and 1 for every other bucket. */
static int is_not_headers(apr_bucket *b)
{
    if (H2_BUCKET_IS_HEADERS(b)) {
        return 0;
    }
    return 1;
}
999
h2_stream_read_to(h2_stream * stream,apr_bucket_brigade * bb,apr_off_t * plen,int * peos)1000 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
1001 apr_off_t *plen, int *peos)
1002 {
1003 conn_rec *c = stream->session->c;
1004 apr_status_t status = APR_SUCCESS;
1005
1006 if (stream->rst_error) {
1007 return APR_ECONNRESET;
1008 }
1009 status = h2_append_brigade(bb, stream->out_buffer, plen, peos, is_not_headers);
1010 if (status == APR_SUCCESS && !*peos && !*plen) {
1011 status = APR_EAGAIN;
1012 }
1013 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
1014 H2_STRM_MSG(stream, "read_to, len=%ld eos=%d"),
1015 (long)*plen, *peos);
1016 return status;
1017 }
1018
1019
h2_stream_submit_pushes(h2_stream * stream,h2_headers * response)1020 apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
1021 {
1022 apr_status_t status = APR_SUCCESS;
1023 apr_array_header_t *pushes;
1024 int i;
1025
1026 pushes = h2_push_collect_update(stream, stream->request, response);
1027 if (pushes && !apr_is_empty_array(pushes)) {
1028 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
1029 H2_STRM_MSG(stream, "found %d push candidates"),
1030 pushes->nelts);
1031 for (i = 0; i < pushes->nelts; ++i) {
1032 h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
1033 h2_stream *s = h2_session_push(stream->session, stream, push);
1034 if (!s) {
1035 status = APR_ECONNRESET;
1036 break;
1037 }
1038 }
1039 }
1040 return status;
1041 }
1042
h2_stream_get_trailers(h2_stream * stream)1043 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
1044 {
1045 return NULL;
1046 }
1047
h2_stream_get_priority(h2_stream * stream,h2_headers * response)1048 const h2_priority *h2_stream_get_priority(h2_stream *stream,
1049 h2_headers *response)
1050 {
1051 if (response && stream->initiated_on) {
1052 const char *ctype = apr_table_get(response->headers, "content-type");
1053 if (ctype) {
1054 /* FIXME: Not good enough, config needs to come from request->server */
1055 return h2_cconfig_get_priority(stream->session->c, ctype);
1056 }
1057 }
1058 return NULL;
1059 }
1060
/* Tell whether the stream is ready: returns 1 when has_response is set, or
 * when the stream has an output buffer in which get_first_headers_bucket()
 * finds something; 0 otherwise. */
int h2_stream_is_ready(h2_stream *stream)
{
    return stream->has_response
           || (stream->out_buffer
               && get_first_headers_bucket(stream->out_buffer));
}
1071
h2_stream_was_closed(const h2_stream * stream)1072 int h2_stream_was_closed(const h2_stream *stream)
1073 {
1074 switch (stream->state) {
1075 case H2_SS_CLOSED:
1076 case H2_SS_CLEANUP:
1077 return 1;
1078 default:
1079 return 0;
1080 }
1081 }
1082
h2_stream_in_consumed(h2_stream * stream,apr_off_t amount)1083 apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
1084 {
1085 h2_session *session = stream->session;
1086
1087 if (amount > 0) {
1088 apr_off_t consumed = amount;
1089
1090 while (consumed > 0) {
1091 int len = (consumed > INT_MAX)? INT_MAX : (int)consumed;
1092 nghttp2_session_consume(session->ngh2, stream->id, len);
1093 consumed -= len;
1094 }
1095
1096 #ifdef H2_NG2_LOCAL_WIN_SIZE
1097 if (1) {
1098 int cur_size = nghttp2_session_get_stream_local_window_size(
1099 session->ngh2, stream->id);
1100 int win = stream->in_window_size;
1101 int thigh = win * 8/10;
1102 int tlow = win * 2/10;
1103 const int win_max = 2*1024*1024;
1104 const int win_min = 32*1024;
1105
1106 /* Work in progress, probably should add directives for these
1107 * values once this stabilizes somewhat. The general idea is
1108 * to adapt stream window sizes if the input window changes
1109 * a) very quickly (< good RTT) from full to empty
1110 * b) only a little bit (> bad RTT)
1111 * where in a) it grows and in b) it shrinks again.
1112 */
1113 if (cur_size > thigh && amount > thigh && win < win_max) {
1114 /* almost empty again with one reported consumption, how
1115 * long did this take? */
1116 long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
1117 if (ms < 40) {
1118 win = H2MIN(win_max, win + (64*1024));
1119 }
1120 }
1121 else if (cur_size < tlow && amount < tlow && win > win_min) {
1122 /* staying full, for how long already? */
1123 long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
1124 if (ms > 700) {
1125 win = H2MAX(win_min, win - (32*1024));
1126 }
1127 }
1128
1129 if (win != stream->in_window_size) {
1130 stream->in_window_size = win;
1131 nghttp2_session_set_local_window_size(session->ngh2,
1132 NGHTTP2_FLAG_NONE, stream->id, win);
1133 }
1134 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
1135 "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
1136 session->id, stream->id, (long)amount,
1137 cur_size, stream->in_window_size);
1138 }
1139 #endif
1140 }
1141 return APR_SUCCESS;
1142 }
1143
1144