1 /*
2 * Copyright (c) 2018 Ichito Nagata, Fastly, Inc.
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to
6 * deal in the Software without restriction, including without limitation the
7 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8 * sell copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 * IN THE SOFTWARE.
21 */
22 #include <arpa/inet.h>
23 #include <netdb.h>
24 #include <netinet/in.h>
25 #include <sys/socket.h>
26 #include <sys/types.h>
27 #include <sys/un.h>
28 #include "khash.h"
29 #include "h2o/hpack.h"
30 #include "h2o/httpclient.h"
31 #include "h2o/http2_common.h"
32
33 #define H2O_HTTP2_SETTINGS_CLIENT_CONNECTION_WINDOW_SIZE 16777216
34 #define H2O_HTTP2_SETTINGS_CLIENT_HEADER_TABLE_SIZE 4096
35 #define H2O_HTTP2_SETTINGS_CLIENT_MAX_FRAME_SIZE 16384
36
37 enum enum_h2o_http2client_stream_state {
38 STREAM_STATE_HEAD,
39 STREAM_STATE_BODY,
40 STREAM_STATE_CLOSED,
41 };
42
43 enum enum_h2o_http2client_conn_state {
44 H2O_HTTP2CLIENT_CONN_STATE_OPEN,
45 H2O_HTTP2CLIENT_CONN_STATE_HALF_CLOSED,
46 H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING,
47 };
48
49 struct st_h2o_http2client_stream_t;
50 KHASH_MAP_INIT_INT64(stream, struct st_h2o_http2client_stream_t *)
51
52 struct st_h2o_http2client_conn_t {
53 h2o_httpclient__h2_conn_t super;
54 enum enum_h2o_http2client_conn_state state;
55 khash_t(stream) * streams;
56 h2o_http2_settings_t peer_settings;
57 uint32_t max_open_stream_id;
58 h2o_timer_t io_timeout;
59 h2o_timer_t keepalive_timeout;
60
61 struct {
62 h2o_hpack_header_table_t header_table;
63 h2o_http2_window_t window;
64 h2o_buffer_t *buf;
65 h2o_buffer_t *buf_in_flight;
66 h2o_timer_t defer_timeout;
67 h2o_linklist_t sending_streams;
68 h2o_linklist_t sent_streams;
69 } output;
70
71 struct {
72 h2o_hpack_header_table_t header_table;
73 h2o_http2_window_t window;
74 ssize_t (*read_frame)(struct st_h2o_http2client_conn_t *conn, const uint8_t *src, size_t len, const char **err_desc);
75 h2o_buffer_t *headers_unparsed;
76 } input;
77 };
78
79 struct st_h2o_http2client_stream_t {
80 h2o_httpclient_t super;
81 struct st_h2o_http2client_conn_t *conn;
82 uint32_t stream_id;
83 struct {
84 enum enum_h2o_http2client_stream_state req;
85 enum enum_h2o_http2client_stream_state res;
86 } state;
87
88 struct {
89 h2o_http2_window_t window;
90 h2o_buffer_t *buf;
91 h2o_linklist_t sending_link;
92 } output;
93
94 struct {
95 h2o_http2_window_t window;
96 int status;
97 h2o_headers_t headers;
98 h2o_buffer_t *body;
99 } input;
100
101 struct {
102 h2o_httpclient_proceed_req_cb proceed_req;
103 unsigned char done : 1;
104 unsigned char inflight : 1;
105 } streaming;
106 };
107
108 static void do_emit_writereq(struct st_h2o_http2client_conn_t *conn);
109
request_write(struct st_h2o_http2client_conn_t * conn)110 static void request_write(struct st_h2o_http2client_conn_t *conn)
111 {
112 if (conn->state == H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING)
113 return;
114 if (!h2o_socket_is_writing(conn->super.sock) && !h2o_timer_is_linked(&conn->output.defer_timeout))
115 h2o_timer_link(conn->super.ctx->loop, 0, &conn->output.defer_timeout);
116 }
117
enqueue_window_update(struct st_h2o_http2client_conn_t * conn,uint32_t stream_id,h2o_http2_window_t * window,size_t desired)118 static void enqueue_window_update(struct st_h2o_http2client_conn_t *conn, uint32_t stream_id, h2o_http2_window_t *window,
119 size_t desired)
120 {
121 assert(desired <= INT32_MAX);
122 if (h2o_http2_window_get_avail(window) * 2 < desired) {
123 int32_t delta = (int32_t)(desired - h2o_http2_window_get_avail(window));
124 h2o_http2_encode_window_update_frame(&conn->output.buf, stream_id, delta);
125 request_write(conn);
126 h2o_http2_window_update(window, delta);
127 }
128 }
129
stream_send_error(struct st_h2o_http2client_conn_t * conn,uint32_t stream_id,int errnum)130 static void stream_send_error(struct st_h2o_http2client_conn_t *conn, uint32_t stream_id, int errnum)
131 {
132 assert(stream_id != 0);
133 assert(conn->state != H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING);
134
135 h2o_http2_encode_rst_stream_frame(&conn->output.buf, stream_id, -errnum);
136 request_write(conn);
137 }
138
get_stream(struct st_h2o_http2client_conn_t * conn,uint32_t stream_id)139 static struct st_h2o_http2client_stream_t *get_stream(struct st_h2o_http2client_conn_t *conn, uint32_t stream_id)
140 {
141 khiter_t iter = kh_get(stream, conn->streams, stream_id);
142 if (iter != kh_end(conn->streams))
143 return (struct st_h2o_http2client_stream_t *)kh_val(conn->streams, iter);
144 return NULL;
145 }
146
get_max_buffer_size(h2o_httpclient_ctx_t * ctx)147 static uint32_t get_max_buffer_size(h2o_httpclient_ctx_t *ctx)
148 {
149 size_t sz = ctx->max_buffer_size;
150 if (sz > INT32_MAX)
151 sz = INT32_MAX;
152 return (uint32_t)sz;
153 }
154
h2o_httpclient__h2_get_max_concurrent_streams(h2o_httpclient__h2_conn_t * _conn)155 uint32_t h2o_httpclient__h2_get_max_concurrent_streams(h2o_httpclient__h2_conn_t *_conn)
156 {
157 struct st_h2o_http2client_conn_t *conn = (void *)_conn;
158 return conn->peer_settings.max_concurrent_streams < conn->super.ctx->http2.max_concurrent_streams
159 ? conn->peer_settings.max_concurrent_streams
160 : conn->super.ctx->http2.max_concurrent_streams;
161 }
162
adjust_conn_linkedlist(h2o_httpclient_connection_pool_t * connpool,struct st_h2o_http2client_conn_t * conn,int forward)163 static void adjust_conn_linkedlist(h2o_httpclient_connection_pool_t *connpool, struct st_h2o_http2client_conn_t *conn, int forward)
164 {
165 if (connpool == NULL) {
166 assert(!h2o_linklist_is_linked(&conn->super.link));
167 return;
168 }
169 if (!h2o_linklist_is_linked(&conn->super.link))
170 return;
171
172 double ratio = (double)conn->super.num_streams / h2o_httpclient__h2_get_max_concurrent_streams(&conn->super);
173
174 /* adjust connection linked list */
175 h2o_linklist_t *node = forward ? conn->super.link.next : conn->super.link.prev;
176 while (node != &connpool->http2.conns) {
177 struct st_h2o_http2client_conn_t *cur = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_conn_t, super.link, node);
178 double cur_ratio = (double)cur->super.num_streams / h2o_httpclient__h2_get_max_concurrent_streams(&cur->super);
179 if (forward ? (ratio <= cur_ratio) : (ratio >= cur_ratio))
180 break;
181 node = forward ? node->next : node->prev;
182 }
183 if (forward) {
184 if (node == conn->super.link.next)
185 return;
186 } else {
187 if (node == conn->super.link.prev)
188 return;
189 if (node != &connpool->http2.conns)
190 node = node->next; /* do `insert after` rather than `insert before` */
191 }
192 h2o_linklist_unlink(&conn->super.link);
193 h2o_linklist_insert(node, &conn->super.link);
194 }
195
register_stream(struct st_h2o_http2client_stream_t * stream,struct st_h2o_http2client_conn_t * conn)196 static void register_stream(struct st_h2o_http2client_stream_t *stream, struct st_h2o_http2client_conn_t *conn)
197 {
198 assert(stream->stream_id == 0);
199
200 stream->conn = conn;
201
202 stream->stream_id = conn->max_open_stream_id == 0 ? 1 : conn->max_open_stream_id + 2;
203 conn->max_open_stream_id = stream->stream_id;
204
205 int r;
206 khiter_t iter = kh_put(stream, conn->streams, stream->stream_id, &r);
207 assert(iter != kh_end(conn->streams));
208 kh_val(conn->streams, iter) = stream;
209
210 ++conn->super.num_streams;
211
212 if (h2o_timer_is_linked(&conn->keepalive_timeout))
213 h2o_timer_unlink(&conn->keepalive_timeout);
214
215 adjust_conn_linkedlist(stream->super.connpool, conn, 1);
216 }
217
unregister_stream(struct st_h2o_http2client_stream_t * stream)218 static void unregister_stream(struct st_h2o_http2client_stream_t *stream)
219 {
220 khiter_t iter = kh_get(stream, stream->conn->streams, stream->stream_id);
221 assert(iter != kh_end(stream->conn->streams));
222 kh_del(stream, stream->conn->streams, iter);
223
224 --stream->conn->super.num_streams;
225
226 if (stream->conn->super.num_streams == 0)
227 h2o_timer_link(stream->conn->super.ctx->loop, stream->conn->super.ctx->keepalive_timeout, &stream->conn->keepalive_timeout);
228
229 adjust_conn_linkedlist(stream->super.connpool, stream->conn, 0);
230 }
231
close_stream(struct st_h2o_http2client_stream_t * stream)232 static void close_stream(struct st_h2o_http2client_stream_t *stream)
233 {
234 if (stream->conn != NULL) {
235 unregister_stream(stream);
236 }
237
238 if (h2o_timer_is_linked(&stream->super._timeout))
239 h2o_timer_unlink(&stream->super._timeout);
240 if (h2o_linklist_is_linked(&stream->output.sending_link))
241 h2o_linklist_unlink(&stream->output.sending_link);
242
243 if (stream->output.buf != NULL)
244 h2o_buffer_dispose(&stream->output.buf);
245 h2o_buffer_dispose(&stream->input.body);
246
247 free(stream);
248 }
249
close_response(struct st_h2o_http2client_stream_t * stream)250 static void close_response(struct st_h2o_http2client_stream_t *stream)
251 {
252 assert(stream->state.res != STREAM_STATE_CLOSED);
253 stream->state.res = STREAM_STATE_CLOSED;
254 if (stream->state.req == STREAM_STATE_CLOSED) {
255 close_stream(stream);
256 }
257 }
258
call_callback_with_error(struct st_h2o_http2client_stream_t * stream,const char * errstr)259 static void call_callback_with_error(struct st_h2o_http2client_stream_t *stream, const char *errstr)
260 {
261 assert(errstr != NULL);
262 switch (stream->state.res) {
263 case STREAM_STATE_HEAD: {
264 h2o_httpclient_on_head_t on_head = {.version = 0x200};
265 stream->super._cb.on_head(&stream->super, errstr, &on_head);
266 } break;
267 case STREAM_STATE_BODY:
268 stream->super._cb.on_body(&stream->super, errstr);
269 break;
270 case STREAM_STATE_CLOSED:
271 if (stream->streaming.proceed_req != NULL) {
272 stream->streaming.inflight = 0; /* proceed_req can be called to indicate error, regardless of write being inflight */
273 stream->streaming.proceed_req(&stream->super, errstr);
274 }
275 break;
276 }
277 }
278
call_stream_callbacks_with_error(struct st_h2o_http2client_conn_t * conn,const char * errstr)279 static void call_stream_callbacks_with_error(struct st_h2o_http2client_conn_t *conn, const char *errstr)
280 {
281 struct st_h2o_http2client_stream_t *stream;
282 kh_foreach_value(conn->streams, stream, { call_callback_with_error(stream, errstr); });
283 }
284
on_head(struct st_h2o_http2client_conn_t * conn,struct st_h2o_http2client_stream_t * stream,const uint8_t * src,size_t len,const char ** err_desc,int is_end_stream)285 static int on_head(struct st_h2o_http2client_conn_t *conn, struct st_h2o_http2client_stream_t *stream, const uint8_t *src,
286 size_t len, const char **err_desc, int is_end_stream)
287 {
288 int ret;
289
290 // assert(stream->state == H2O_HTTP2CLIENT_STREAM_STATE_RECV_HEADERS);
291
292 if ((ret = h2o_hpack_parse_response(stream->super.pool, h2o_hpack_decode_header, &conn->input.header_table,
293 &stream->input.status, &stream->input.headers, NULL, src, len, err_desc)) != 0) {
294 if (ret == H2O_HTTP2_ERROR_INVALID_HEADER_CHAR) {
295 ret = H2O_HTTP2_ERROR_PROTOCOL;
296 goto Failed;
297 }
298 return ret;
299 }
300
301 if (100 <= stream->input.status && stream->input.status <= 199) {
302 if (stream->input.status == 101) {
303 ret = H2O_HTTP2_ERROR_PROTOCOL; // TODO is this alright?
304 goto Failed;
305 }
306 if (stream->super.informational_cb != NULL &&
307 stream->super.informational_cb(&stream->super, 0, stream->input.status, h2o_iovec_init(NULL, 0),
308 stream->input.headers.entries, stream->input.headers.size) != 0) {
309 ret = H2O_HTTP2_ERROR_INTERNAL;
310 goto SendRSTStream;
311 }
312 return 0;
313 }
314
315 h2o_httpclient_on_head_t on_head = {.version = 0x200,
316 .status = stream->input.status,
317 .msg = h2o_iovec_init(NULL, 0),
318 .headers = stream->input.headers.entries,
319 .num_headers = stream->input.headers.size};
320 stream->super._cb.on_body =
321 stream->super._cb.on_head(&stream->super, is_end_stream ? h2o_httpclient_error_is_eos : NULL, &on_head);
322
323 if (is_end_stream) {
324 close_response(stream);
325 return 0;
326 }
327 if (stream->super._cb.on_body == NULL) {
328 /**
329 * NOTE: if on_head returns NULL due to invalid response (e.g. invalid content-length header)
330 * sending RST_STREAM with PROTOCOL_ERROR might be more suitable than CANCEL
331 * (see: https://tools.ietf.org/html/rfc7540#section-8.1.2.6)
332 * but sending CANCEL is not wrong, so we leave this as-is for now.
333 */
334 ret = H2O_HTTP2_ERROR_CANCEL;
335 goto SendRSTStream;
336 }
337
338 stream->state.res = STREAM_STATE_BODY;
339
340 return 0;
341
342 Failed:
343 assert(ret == H2O_HTTP2_ERROR_PROTOCOL);
344 call_callback_with_error(stream, h2o_httpclient_error_protocol_violation);
345 SendRSTStream:
346 stream_send_error(conn, stream->stream_id, ret);
347 close_stream(stream);
348 return 0;
349 }
350
351 ssize_t expect_default(struct st_h2o_http2client_conn_t *conn, const uint8_t *src, size_t len, const char **err_desc);
expect_continuation_of_headers(struct st_h2o_http2client_conn_t * conn,const uint8_t * src,size_t len,const char ** err_desc)352 static ssize_t expect_continuation_of_headers(struct st_h2o_http2client_conn_t *conn, const uint8_t *src, size_t len,
353 const char **err_desc)
354 {
355 h2o_http2_frame_t frame;
356 ssize_t ret;
357 struct st_h2o_http2client_stream_t *stream;
358 int hret;
359
360 if ((ret = h2o_http2_decode_frame(&frame, src, len, H2O_HTTP2_SETTINGS_CLIENT_MAX_FRAME_SIZE, err_desc)) < 0)
361 return ret;
362 if (frame.type != H2O_HTTP2_FRAME_TYPE_CONTINUATION) {
363 *err_desc = "expected CONTINUATION frame";
364 return H2O_HTTP2_ERROR_PROTOCOL;
365 }
366
367 if ((stream = get_stream(conn, frame.stream_id)) == NULL || stream->state.res == STREAM_STATE_CLOSED) {
368 *err_desc = "unexpected stream id in CONTINUATION frame";
369 return H2O_HTTP2_ERROR_PROTOCOL;
370 }
371
372 if (stream->state.res == STREAM_STATE_BODY) {
373 /* is a trailer, do nothing */
374 return ret;
375 }
376
377 h2o_buffer_reserve(&conn->input.headers_unparsed, frame.length);
378 memcpy(conn->input.headers_unparsed->bytes + conn->input.headers_unparsed->size, frame.payload, frame.length);
379 conn->input.headers_unparsed->size += frame.length;
380
381 if ((frame.flags & H2O_HTTP2_FRAME_FLAG_END_HEADERS) != 0) {
382 int is_end_stream = (frame.flags & H2O_HTTP2_FRAME_FLAG_END_STREAM) != 0;
383 conn->input.read_frame = expect_default;
384 hret = on_head(conn, stream, (const uint8_t *)conn->input.headers_unparsed->bytes, conn->input.headers_unparsed->size,
385 err_desc, is_end_stream);
386 if (hret != 0)
387 ret = hret;
388 h2o_buffer_dispose(&conn->input.headers_unparsed);
389 conn->input.headers_unparsed = NULL;
390 }
391
392 return ret;
393 }
394
395 static void do_update_window(h2o_httpclient_t *client);
handle_data_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)396 static int handle_data_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
397 {
398 h2o_http2_data_payload_t payload;
399 struct st_h2o_http2client_stream_t *stream;
400 int ret;
401
402 if ((ret = h2o_http2_decode_data_payload(&payload, frame, err_desc)) != 0)
403 return ret;
404
405 /* save the input in the request body buffer, or send error (and close the stream) */
406 if ((stream = get_stream(conn, frame->stream_id)) == NULL) {
407 if (frame->stream_id <= conn->max_open_stream_id) {
408 stream_send_error(conn, frame->stream_id, H2O_HTTP2_ERROR_STREAM_CLOSED);
409 return 0;
410 } else {
411 *err_desc = "invalid DATA frame";
412 return H2O_HTTP2_ERROR_PROTOCOL;
413 }
414 }
415
416 if (stream->state.res != STREAM_STATE_BODY) {
417 stream_send_error(conn, frame->stream_id, H2O_HTTP2_ERROR_PROTOCOL);
418 call_callback_with_error(stream, h2o_httpclient_error_protocol_violation);
419 close_stream(stream);
420 return 0;
421 }
422
423 size_t max_size = get_max_buffer_size(stream->super.ctx);
424 if (stream->input.body->size + payload.length > max_size) {
425 stream->super._cb.on_body(&stream->super, h2o_httpclient_error_flow_control);
426 stream_send_error(stream->conn, stream->stream_id, H2O_HTTP2_ERROR_FLOW_CONTROL);
427 close_stream(stream);
428 return 0;
429 }
430
431 h2o_buffer_append(&stream->input.body, (void *)payload.data, payload.length);
432
433 h2o_http2_window_consume_window(&conn->input.window, payload.length);
434 h2o_http2_window_consume_window(&stream->input.window, payload.length);
435
436 int is_final = (frame->flags & H2O_HTTP2_FRAME_FLAG_END_STREAM) != 0;
437 if (stream->super._cb.on_body(&stream->super, is_final ? h2o_httpclient_error_is_eos : NULL) != 0) {
438 stream_send_error(conn, frame->stream_id, H2O_HTTP2_ERROR_INTERNAL);
439 close_stream(stream);
440 return 0;
441 }
442
443 if (is_final) {
444 close_response(stream);
445 } else {
446 /* update connection-level window */
447 enqueue_window_update(stream->conn, 0, &stream->conn->input.window, H2O_HTTP2_SETTINGS_CLIENT_CONNECTION_WINDOW_SIZE);
448 /* update stream-level window */
449 do_update_window(&stream->super);
450 }
451
452 return 0;
453 }
454
handle_headers_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)455 static int handle_headers_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
456 {
457 h2o_http2_headers_payload_t payload;
458 struct st_h2o_http2client_stream_t *stream;
459 int ret;
460
461 /* decode */
462 if ((ret = h2o_http2_decode_headers_payload(&payload, frame, err_desc)) != 0)
463 return ret;
464 if ((frame->stream_id & 1) == 0) {
465 *err_desc = "invalid stream id in HEADERS frame";
466 return H2O_HTTP2_ERROR_PROTOCOL;
467 }
468
469 if (frame->stream_id == payload.priority.dependency) {
470 *err_desc = "stream cannot depend on itself";
471 return H2O_HTTP2_ERROR_PROTOCOL;
472 }
473
474 if ((stream = get_stream(conn, frame->stream_id)) == NULL) {
475 *err_desc = "invalid stream id in HEADERS frame";
476 return H2O_HTTP2_ERROR_STREAM_CLOSED;
477 }
478
479 h2o_timer_unlink(&stream->super._timeout);
480
481 if (stream->state.res == STREAM_STATE_BODY) {
482 /* is a trailer (ignore after only validating it) */
483 if ((frame->flags & H2O_HTTP2_FRAME_FLAG_END_STREAM) == 0) {
484 *err_desc = "trailing HEADERS frame MUST have END_STREAM flag set";
485 return H2O_HTTP2_ERROR_PROTOCOL;
486 }
487 if ((frame->flags & H2O_HTTP2_FRAME_FLAG_END_HEADERS) == 0) {
488 /* read following continuation frames without initializing `headers_unparsed` */
489 conn->input.read_frame = expect_continuation_of_headers;
490 }
491 return 0;
492 }
493
494 if ((frame->flags & H2O_HTTP2_FRAME_FLAG_END_HEADERS) == 0) {
495 /* request is not complete, store in buffer */
496 conn->input.read_frame = expect_continuation_of_headers;
497 h2o_buffer_init(&conn->input.headers_unparsed, &h2o_socket_buffer_prototype);
498 h2o_buffer_reserve(&conn->input.headers_unparsed, payload.headers_len);
499 memcpy(conn->input.headers_unparsed->bytes, payload.headers, payload.headers_len);
500 conn->input.headers_unparsed->size = payload.headers_len;
501 return 0;
502 }
503
504 int is_end_stream = (frame->flags & H2O_HTTP2_FRAME_FLAG_END_STREAM) != 0;
505
506 /* response header is complete, handle it */
507 return on_head(conn, stream, payload.headers, payload.headers_len, err_desc, is_end_stream);
508 }
509
handle_priority_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)510 static int handle_priority_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
511 {
512 h2o_http2_priority_t payload;
513 int ret;
514
515 if ((ret = h2o_http2_decode_priority_payload(&payload, frame, err_desc)) != 0)
516 return ret;
517 if (frame->stream_id == payload.dependency) {
518 *err_desc = "stream cannot depend on itself";
519 return H2O_HTTP2_ERROR_PROTOCOL;
520 }
521
522 /* Ignore PRIORITY frames */
523 return 0;
524 }
525
handle_rst_stream_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)526 static int handle_rst_stream_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
527 {
528 h2o_http2_rst_stream_payload_t payload;
529 struct st_h2o_http2client_stream_t *stream;
530 int ret;
531
532 if ((ret = h2o_http2_decode_rst_stream_payload(&payload, frame, err_desc)) != 0)
533 return ret;
534 if (frame->stream_id > conn->max_open_stream_id) {
535 *err_desc = "unexpected stream id in RST_STREAM frame";
536 return H2O_HTTP2_ERROR_PROTOCOL;
537 }
538
539 stream = get_stream(conn, frame->stream_id);
540 if (stream != NULL) {
541 /* reset the stream */
542 call_callback_with_error(stream, h2o_httpclient_error_refused_stream);
543 close_stream(stream);
544 }
545
546 return 0;
547 }
548
update_stream_output_window(struct st_h2o_http2client_stream_t * stream,ssize_t delta)549 static int update_stream_output_window(struct st_h2o_http2client_stream_t *stream, ssize_t delta)
550 {
551 ssize_t before = h2o_http2_window_get_avail(&stream->output.window);
552 if (h2o_http2_window_update(&stream->output.window, delta) != 0)
553 return -1;
554 ssize_t after = h2o_http2_window_get_avail(&stream->output.window);
555 if (before <= 0 && 0 < after && stream->output.buf != NULL && stream->output.buf->size != 0) {
556 assert(!h2o_linklist_is_linked(&stream->output.sending_link));
557 h2o_linklist_insert(&stream->conn->output.sending_streams, &stream->output.sending_link);
558 }
559 return 0;
560 }
561
conn_get_buffer_window(struct st_h2o_http2client_conn_t * conn)562 static ssize_t conn_get_buffer_window(struct st_h2o_http2client_conn_t *conn)
563 {
564 ssize_t ret, winsz;
565 size_t capacity, cwnd_left;
566
567 capacity = conn->output.buf->capacity;
568 if ((cwnd_left = h2o_socket_prepare_for_latency_optimized_write(conn->super.sock,
569 &conn->super.ctx->http2.latency_optimization)) < capacity) {
570 capacity = cwnd_left;
571 if (capacity < conn->output.buf->size)
572 return 0;
573 }
574
575 ret = capacity - conn->output.buf->size;
576 if (ret < H2O_HTTP2_FRAME_HEADER_SIZE)
577 return 0;
578 ret -= H2O_HTTP2_FRAME_HEADER_SIZE;
579 winsz = h2o_http2_window_get_avail(&conn->output.window);
580 if (winsz < ret)
581 ret = winsz;
582 return ret;
583 }
584
handle_settings_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)585 static int handle_settings_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
586 {
587 if (frame->stream_id != 0) {
588 *err_desc = "invalid stream id in SETTINGS frame";
589 return H2O_HTTP2_ERROR_PROTOCOL;
590 }
591
592 if ((frame->flags & H2O_HTTP2_FRAME_FLAG_ACK) != 0) {
593 if (frame->length != 0) {
594 *err_desc = "invalid SETTINGS frame (+ACK)";
595 return H2O_HTTP2_ERROR_FRAME_SIZE;
596 }
597 } else {
598 uint32_t prev_initial_window_size = conn->peer_settings.initial_window_size;
599 int ret = h2o_http2_update_peer_settings(&conn->peer_settings, frame->payload, frame->length, err_desc);
600 if (ret != 0)
601 return ret;
602 { /* schedule ack */
603 h2o_iovec_t header_buf = h2o_buffer_reserve(&conn->output.buf, H2O_HTTP2_FRAME_HEADER_SIZE);
604 h2o_http2_encode_frame_header((void *)header_buf.base, 0, H2O_HTTP2_FRAME_TYPE_SETTINGS, H2O_HTTP2_FRAME_FLAG_ACK, 0);
605 conn->output.buf->size += H2O_HTTP2_FRAME_HEADER_SIZE;
606 request_write(conn);
607 }
608 /* apply the change to window size (to all the streams but not the connection, see 6.9.2 of draft-15) */
609 if (prev_initial_window_size != conn->peer_settings.initial_window_size) {
610 ssize_t delta = conn->peer_settings.initial_window_size - prev_initial_window_size;
611 struct st_h2o_http2client_stream_t *stream;
612 kh_foreach_value(conn->streams, stream, { update_stream_output_window((void *)stream, delta); });
613
614 if (conn_get_buffer_window(conn) > 0)
615 request_write(conn);
616 }
617 }
618
619 return 0;
620 }
621
handle_push_promise_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)622 static int handle_push_promise_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
623 {
624 *err_desc = "received PUSH_PROMISE frame";
625 return H2O_HTTP2_ERROR_PROTOCOL;
626 }
627
handle_ping_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)628 static int handle_ping_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
629 {
630 h2o_http2_ping_payload_t payload;
631 int ret;
632
633 if ((ret = h2o_http2_decode_ping_payload(&payload, frame, err_desc)) != 0)
634 return ret;
635
636 if ((frame->flags & H2O_HTTP2_FRAME_FLAG_ACK) == 0) {
637 h2o_http2_encode_ping_frame(&conn->output.buf, 1, payload.data);
638 request_write(conn);
639 }
640
641 return 0;
642 }
643
handle_goaway_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)644 static int handle_goaway_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
645 {
646 h2o_http2_goaway_payload_t payload;
647 int ret;
648
649 if ((ret = h2o_http2_decode_goaway_payload(&payload, frame, err_desc)) != 0)
650 return ret;
651
652 struct st_h2o_http2client_stream_t *stream;
653 kh_foreach_value(conn->streams, stream, {
654 if (stream->stream_id > payload.last_stream_id) {
655 call_callback_with_error(stream, h2o_httpclient_error_refused_stream);
656 close_stream(stream);
657 }
658 });
659
660 /* stop opening new streams */
661 if (h2o_linklist_is_linked(&conn->super.link))
662 h2o_linklist_unlink(&conn->super.link);
663
664 return 0;
665 }
666
handle_window_update_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)667 static int handle_window_update_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
668 {
669 h2o_http2_window_update_payload_t payload;
670 int ret, err_is_stream_level;
671
672 if ((ret = h2o_http2_decode_window_update_payload(&payload, frame, err_desc, &err_is_stream_level)) != 0) {
673 if (err_is_stream_level) {
674 stream_send_error(conn, frame->stream_id, ret);
675 struct st_h2o_http2client_stream_t *stream = get_stream(conn, frame->stream_id);
676 if (stream != NULL) {
677 call_callback_with_error(stream, h2o_httpclient_error_protocol_violation);
678 close_stream(stream);
679 }
680 return 0;
681 } else {
682 return ret;
683 }
684 }
685
686 if (frame->stream_id == 0) {
687 if (h2o_http2_window_update(&conn->output.window, payload.window_size_increment) != 0) {
688 *err_desc = "flow control window overflow";
689 return H2O_HTTP2_ERROR_FLOW_CONTROL;
690 }
691 } else if (frame->stream_id <= conn->max_open_stream_id) {
692 struct st_h2o_http2client_stream_t *stream = get_stream(conn, frame->stream_id);
693 if (stream != NULL) {
694 if (update_stream_output_window(stream, payload.window_size_increment) != 0) {
695 stream_send_error(conn, frame->stream_id, H2O_HTTP2_ERROR_FLOW_CONTROL);
696 call_callback_with_error(stream, h2o_httpclient_error_flow_control);
697 close_stream(stream);
698 return 0;
699 }
700 }
701 } else {
702 *err_desc = "invalid stream id in WINDOW_UPDATE frame";
703 return H2O_HTTP2_ERROR_PROTOCOL;
704 }
705
706 if (conn_get_buffer_window(conn) > 0)
707 request_write(conn);
708
709 return 0;
710 }
711
handle_invalid_continuation_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)712 static int handle_invalid_continuation_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame,
713 const char **err_desc)
714 {
715 *err_desc = "received invalid CONTINUATION frame";
716 return H2O_HTTP2_ERROR_PROTOCOL;
717 }
718
expect_default(struct st_h2o_http2client_conn_t * conn,const uint8_t * src,size_t len,const char ** err_desc)719 ssize_t expect_default(struct st_h2o_http2client_conn_t *conn, const uint8_t *src, size_t len, const char **err_desc)
720 {
721 assert(conn->state != H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING);
722
723 h2o_http2_frame_t frame;
724 ssize_t ret;
725 static int (*FRAME_HANDLERS[])(struct st_h2o_http2client_conn_t * conn, h2o_http2_frame_t * frame, const char **err_desc) = {
726 handle_data_frame, /* DATA */
727 handle_headers_frame, /* HEADERS */
728 handle_priority_frame, /* PRIORITY */
729 handle_rst_stream_frame, /* RST_STREAM */
730 handle_settings_frame, /* SETTINGS */
731 handle_push_promise_frame, /* PUSH_PROMISE */
732 handle_ping_frame, /* PING */
733 handle_goaway_frame, /* GOAWAY */
734 handle_window_update_frame, /* WINDOW_UPDATE */
735 handle_invalid_continuation_frame /* CONTINUATION */
736 };
737
738 if ((ret = h2o_http2_decode_frame(&frame, src, len, H2O_HTTP2_SETTINGS_CLIENT_MAX_FRAME_SIZE, err_desc)) < 0)
739 return ret;
740
741 if (frame.type < sizeof(FRAME_HANDLERS) / sizeof(FRAME_HANDLERS[0])) {
742 int hret = FRAME_HANDLERS[frame.type](conn, &frame, err_desc);
743 if (hret != 0)
744 ret = hret;
745 } else {
746 h2o_error_printf("skipping frame (type:%d)\n", frame.type);
747 }
748
749 return ret;
750 }
751
expect_settings(struct st_h2o_http2client_conn_t * conn,const uint8_t * src,size_t len,const char ** err_desc)752 static ssize_t expect_settings(struct st_h2o_http2client_conn_t *conn, const uint8_t *src, size_t len, const char **err_desc)
753 {
754 assert(conn->state != H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING);
755
756 h2o_http2_frame_t frame;
757 ssize_t ret;
758
759 if ((ret = h2o_http2_decode_frame(&frame, src, len, H2O_HTTP2_SETTINGS_CLIENT_MAX_FRAME_SIZE, err_desc)) < 0)
760 return ret;
761
762 if (frame.type != H2O_HTTP2_FRAME_TYPE_SETTINGS)
763 return H2O_HTTP2_ERROR_PROTOCOL_CLOSE_IMMEDIATELY;
764
765 int hret = handle_settings_frame(conn, &frame, err_desc);
766 if (hret != 0)
767 return hret;
768
769 conn->input.read_frame = expect_default;
770 return ret;
771 }
772
close_connection_now(struct st_h2o_http2client_conn_t * conn)773 static void close_connection_now(struct st_h2o_http2client_conn_t *conn)
774 {
775 free(conn->super.origin_url.authority.base);
776 free(conn->super.origin_url.host.base);
777 free(conn->super.origin_url.path.base);
778
779 h2o_socket_close(conn->super.sock);
780
781 struct st_h2o_http2client_stream_t *stream;
782 kh_foreach_value(conn->streams, stream, { close_stream(stream); });
783 kh_destroy(stream, conn->streams);
784
785 if (h2o_linklist_is_linked(&conn->super.link))
786 h2o_linklist_unlink(&conn->super.link);
787
788 if (h2o_timer_is_linked(&conn->io_timeout))
789 h2o_timer_unlink(&conn->io_timeout);
790 if (h2o_timer_is_linked(&conn->keepalive_timeout))
791 h2o_timer_unlink(&conn->keepalive_timeout);
792
793 /* output */
794 h2o_hpack_dispose_header_table(&conn->output.header_table);
795 h2o_buffer_dispose(&conn->output.buf);
796 if (conn->output.buf_in_flight != NULL)
797 h2o_buffer_dispose(&conn->output.buf_in_flight);
798 if (h2o_timer_is_linked(&conn->output.defer_timeout))
799 h2o_timer_unlink(&conn->output.defer_timeout);
800 assert(h2o_linklist_is_empty(&conn->output.sending_streams));
801 assert(h2o_linklist_is_empty(&conn->output.sent_streams));
802
803 /* input */
804 h2o_hpack_dispose_header_table(&conn->input.header_table);
805 if (conn->input.headers_unparsed != NULL)
806 h2o_buffer_dispose(&conn->input.headers_unparsed);
807
808 free(conn);
809 }
810
close_connection_if_necessary(struct st_h2o_http2client_conn_t * conn)811 static int close_connection_if_necessary(struct st_h2o_http2client_conn_t *conn)
812 {
813 if (conn->state == H2O_HTTP2CLIENT_CONN_STATE_HALF_CLOSED && conn->super.num_streams == 0)
814 conn->state = H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING;
815 if (conn->state == H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING) {
816 close_connection_now(conn);
817 return 1;
818 }
819 return 0;
820 }
821
close_connection(struct st_h2o_http2client_conn_t * conn)822 static int close_connection(struct st_h2o_http2client_conn_t *conn)
823 {
824 conn->state = H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING;
825 h2o_socket_read_stop(conn->super.sock);
826
827 if (conn->output.buf_in_flight != NULL || h2o_timer_is_linked(&conn->output.defer_timeout)) {
828 /* there is a pending write, let close_connection_if_necessary actually close the connection */
829 } else {
830 close_connection_now(conn);
831 return -1;
832 }
833 return 0;
834 }
835
enqueue_goaway(struct st_h2o_http2client_conn_t * conn,int errnum,h2o_iovec_t additional_data)836 static void enqueue_goaway(struct st_h2o_http2client_conn_t *conn, int errnum, h2o_iovec_t additional_data)
837 {
838 if (conn->state == H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING)
839 return;
840
841 h2o_http2_encode_goaway_frame(&conn->output.buf, 0, errnum, additional_data);
842 request_write(conn);
843 conn->state = H2O_HTTP2CLIENT_CONN_STATE_HALF_CLOSED;
844
845 /* stop opening new streams */
846 if (h2o_linklist_is_linked(&conn->super.link))
847 h2o_linklist_unlink(&conn->super.link);
848 }
849
on_connect_error(struct st_h2o_http2client_stream_t * stream,const char * errstr)850 static void on_connect_error(struct st_h2o_http2client_stream_t *stream, const char *errstr)
851 {
852 assert(errstr != NULL);
853 stream->super._cb.on_connect(&stream->super, errstr, NULL, NULL, NULL, 0, NULL, NULL, NULL, NULL);
854 close_stream(stream);
855 }
856
do_stream_timeout(struct st_h2o_http2client_stream_t * stream)857 static void do_stream_timeout(struct st_h2o_http2client_stream_t *stream)
858 {
859 if (stream->conn == NULL) {
860 on_connect_error(stream, h2o_httpclient_error_connect_timeout);
861 return;
862 }
863 const char *errstr =
864 stream->state.res == STREAM_STATE_HEAD ? h2o_httpclient_error_first_byte_timeout : h2o_httpclient_error_io_timeout;
865 call_callback_with_error(stream, errstr);
866 close_stream(stream);
867 }
868
on_stream_timeout(h2o_timer_t * entry)869 static void on_stream_timeout(h2o_timer_t *entry)
870 {
871 struct st_h2o_http2client_stream_t *stream = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_stream_t, super._timeout, entry);
872 do_stream_timeout(stream);
873 }
874
on_io_timeout(h2o_timer_t * entry)875 static void on_io_timeout(h2o_timer_t *entry)
876 {
877 struct st_h2o_http2client_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_conn_t, io_timeout, entry);
878 struct st_h2o_http2client_stream_t *stream;
879 kh_foreach_value(conn->streams, stream, { do_stream_timeout(stream); });
880 close_connection_now(conn);
881 }
882
on_keepalive_timeout(h2o_timer_t * entry)883 static void on_keepalive_timeout(h2o_timer_t *entry)
884 {
885 struct st_h2o_http2client_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_conn_t, keepalive_timeout, entry);
886 enqueue_goaway(conn, H2O_HTTP2_ERROR_NONE, h2o_iovec_init(NULL, 0));
887 request_write(conn);
888 close_connection(conn);
889 }
890
parse_input(struct st_h2o_http2client_conn_t * conn)891 static int parse_input(struct st_h2o_http2client_conn_t *conn)
892 {
893 /* handle the input */
894 while (conn->state != H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING && conn->super.sock->input->size != 0) {
895 /* process a frame */
896 const char *err_desc = NULL;
897 ssize_t ret =
898 conn->input.read_frame(conn, (uint8_t *)conn->super.sock->input->bytes, conn->super.sock->input->size, &err_desc);
899 if (ret == H2O_HTTP2_ERROR_INCOMPLETE) {
900 break;
901 } else if (ret < 0) {
902 if (ret != H2O_HTTP2_ERROR_PROTOCOL_CLOSE_IMMEDIATELY) {
903 enqueue_goaway(conn, (int)ret,
904 err_desc != NULL ? (h2o_iovec_t){(char *)err_desc, strlen(err_desc)} : (h2o_iovec_t){NULL});
905 }
906 call_stream_callbacks_with_error(conn, h2o_httpclient_error_protocol_violation);
907 return close_connection(conn);
908 }
909 /* advance to the next frame */
910 h2o_buffer_consume(&conn->super.sock->input, ret);
911 }
912 return 0;
913 }
914
on_read(h2o_socket_t * sock,const char * err)915 static void on_read(h2o_socket_t *sock, const char *err)
916 {
917 struct st_h2o_http2client_conn_t *conn = sock->data;
918
919 h2o_timer_unlink(&conn->io_timeout);
920
921 if (err != NULL) {
922 call_stream_callbacks_with_error(conn, h2o_httpclient_error_io);
923 close_connection(conn);
924 return;
925 }
926
927 if (parse_input(conn) != 0)
928 return;
929
930 /* write immediately, if pending write exists */
931 if (h2o_timer_is_linked(&conn->output.defer_timeout)) {
932 h2o_timer_unlink(&conn->output.defer_timeout);
933 do_emit_writereq(conn);
934 }
935
936 if (!h2o_timer_is_linked(&conn->io_timeout))
937 h2o_timer_link(conn->super.ctx->loop, conn->super.ctx->io_timeout, &conn->io_timeout);
938 }
939
on_connection_ready(struct st_h2o_http2client_stream_t * stream,struct st_h2o_http2client_conn_t * conn)940 static void on_connection_ready(struct st_h2o_http2client_stream_t *stream, struct st_h2o_http2client_conn_t *conn)
941 {
942 h2o_iovec_t method;
943 h2o_url_t url;
944 h2o_header_t *headers;
945 size_t num_headers;
946 h2o_iovec_t body;
947 h2o_httpclient_properties_t props = (h2o_httpclient_properties_t){NULL};
948
949 register_stream(stream, conn);
950
951 stream->super._cb.on_head =
952 stream->super._cb.on_connect(&stream->super, NULL, &method, &url, (const h2o_header_t **)&headers, &num_headers, &body,
953 &stream->streaming.proceed_req, &props, &conn->super.origin_url);
954 if (stream->super._cb.on_head == NULL) {
955 close_stream(stream);
956 return;
957 }
958
959 h2o_http2_window_init(&stream->output.window, conn->peer_settings.initial_window_size);
960
961 /* forward request state */
962 if (stream->streaming.proceed_req != NULL) {
963 stream->state.req = STREAM_STATE_BODY;
964 if (body.len != 0)
965 stream->streaming.inflight = 1;
966 } else if (body.base != NULL) {
967 stream->state.req = STREAM_STATE_BODY;
968 } else {
969 stream->state.req = STREAM_STATE_CLOSED;
970 }
971
972 /* send headers */
973 h2o_hpack_flatten_request(&conn->output.buf, &conn->output.header_table, conn->peer_settings.header_table_size,
974 stream->stream_id, conn->peer_settings.max_frame_size, method, &url, headers, num_headers,
975 stream->state.req == STREAM_STATE_CLOSED);
976
977 if (stream->state.req == STREAM_STATE_BODY) {
978 h2o_buffer_init(&stream->output.buf, &h2o_socket_buffer_prototype);
979 h2o_buffer_append(&stream->output.buf, body.base, body.len);
980 }
981 h2o_linklist_insert(&conn->output.sending_streams, &stream->output.sending_link);
982 request_write(conn);
983 }
984
on_notify_write(h2o_socket_t * sock,const char * err)985 static void on_notify_write(h2o_socket_t *sock, const char *err)
986 {
987 struct st_h2o_http2client_conn_t *conn = sock->data;
988
989 if (err != NULL) {
990 call_stream_callbacks_with_error(conn, h2o_httpclient_error_io);
991 close_connection_now(conn);
992 return;
993 }
994 do_emit_writereq(conn);
995 close_connection_if_necessary(conn);
996 }
997
on_write_complete(h2o_socket_t * sock,const char * err)998 static void on_write_complete(h2o_socket_t *sock, const char *err)
999 {
1000 struct st_h2o_http2client_conn_t *conn = sock->data;
1001
1002 assert(conn->output.buf_in_flight != NULL);
1003
1004 h2o_timer_unlink(&conn->io_timeout);
1005
1006 /* close by error if necessary */
1007 if (err != NULL) {
1008 call_stream_callbacks_with_error(conn, h2o_httpclient_error_io);
1009 close_connection_now(conn);
1010 return;
1011 }
1012
1013 if (close_connection_if_necessary(conn))
1014 return;
1015
1016 /* unlink timeouts of streams that has finished sending requests */
1017 while (!h2o_linklist_is_empty(&conn->output.sent_streams)) {
1018 h2o_linklist_t *link = conn->output.sent_streams.next;
1019 struct st_h2o_http2client_stream_t *stream =
1020 H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_stream_t, output.sending_link, link);
1021 h2o_linklist_unlink(link);
1022
1023 if (stream->streaming.proceed_req != NULL && stream->streaming.inflight) {
1024 stream->streaming.inflight = 0;
1025 stream->streaming.proceed_req(&stream->super, NULL);
1026 }
1027
1028 if (stream->streaming.proceed_req == NULL || stream->streaming.done) {
1029 stream->state.req = STREAM_STATE_CLOSED;
1030 h2o_timer_link(stream->super.ctx->loop, stream->super.ctx->first_byte_timeout, &stream->super._timeout);
1031 }
1032 }
1033
1034 /* reset the other buffer */
1035 h2o_buffer_dispose(&conn->output.buf_in_flight);
1036
1037 #if !H2O_USE_LIBUV
1038 if (conn->state == H2O_HTTP2CLIENT_CONN_STATE_OPEN) {
1039 if (conn->output.buf->size != 0 || !h2o_linklist_is_empty(&conn->output.sending_streams))
1040 h2o_socket_notify_write(sock, on_notify_write);
1041 return;
1042 }
1043 #endif
1044
1045 /* write more, if possible */
1046 do_emit_writereq(conn);
1047 close_connection_if_necessary(conn);
1048 }
1049
sz_min(size_t x,size_t y)1050 static size_t sz_min(size_t x, size_t y)
1051 {
1052 return x < y ? x : y;
1053 }
1054
calc_max_payload_size(struct st_h2o_http2client_stream_t * stream)1055 static size_t calc_max_payload_size(struct st_h2o_http2client_stream_t *stream)
1056 {
1057 ssize_t conn_max, stream_max;
1058
1059 if ((conn_max = conn_get_buffer_window(stream->conn)) <= 0)
1060 return 0;
1061 if ((stream_max = h2o_http2_window_get_avail(&stream->output.window)) <= 0)
1062 return 0;
1063 return sz_min(sz_min(conn_max, stream_max), stream->conn->peer_settings.max_frame_size);
1064 }
1065
stream_emit_pending_data(struct st_h2o_http2client_stream_t * stream)1066 static size_t stream_emit_pending_data(struct st_h2o_http2client_stream_t *stream)
1067 {
1068 size_t max_payload_size = calc_max_payload_size(stream);
1069 size_t payload_size = sz_min(max_payload_size, stream->output.buf->size);
1070 if (payload_size == 0)
1071 return 0;
1072 char *dst = h2o_buffer_reserve(&stream->conn->output.buf, H2O_HTTP2_FRAME_HEADER_SIZE + payload_size).base;
1073 int end_stream = (stream->streaming.proceed_req == NULL || stream->streaming.done) && payload_size == stream->output.buf->size;
1074 h2o_http2_encode_frame_header((void *)dst, stream->output.buf->size, H2O_HTTP2_FRAME_TYPE_DATA,
1075 end_stream ? H2O_HTTP2_FRAME_FLAG_END_STREAM : 0, stream->stream_id);
1076 h2o_memcpy(dst + H2O_HTTP2_FRAME_HEADER_SIZE, stream->output.buf->bytes, payload_size);
1077 stream->conn->output.buf->size += H2O_HTTP2_FRAME_HEADER_SIZE + payload_size;
1078 h2o_buffer_consume(&stream->output.buf, payload_size);
1079
1080 h2o_http2_window_consume_window(&stream->conn->output.window, payload_size);
1081 h2o_http2_window_consume_window(&stream->output.window, payload_size);
1082
1083 return payload_size;
1084 }
1085
do_emit_writereq(struct st_h2o_http2client_conn_t * conn)1086 static void do_emit_writereq(struct st_h2o_http2client_conn_t *conn)
1087 {
1088 assert(conn->output.buf_in_flight == NULL);
1089
1090 /* emit DATA frames */
1091 h2o_linklist_t *node = conn->output.sending_streams.next;
1092 h2o_linklist_t *first = node;
1093 while (node != &conn->output.sending_streams) {
1094 h2o_linklist_t *next = node->next;
1095 struct st_h2o_http2client_stream_t *stream =
1096 H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_stream_t, output.sending_link, node);
1097 h2o_linklist_unlink(node);
1098
1099 size_t bytes_emitted = 0;
1100 if (stream->output.buf != NULL && stream->output.buf->size != 0)
1101 bytes_emitted = stream_emit_pending_data(stream);
1102
1103 if (stream->output.buf != NULL && stream->output.buf->size == 0) {
1104 h2o_linklist_insert(&conn->output.sent_streams, node);
1105 } else if (h2o_http2_window_get_avail(&stream->output.window) > 0) {
1106 h2o_linklist_insert(&conn->output.sending_streams, node); /* move to the tail to rotate buffers */
1107 }
1108
1109 if (next == first)
1110 break;
1111 node = next;
1112 }
1113
1114 if (conn->output.buf->size != 0) {
1115 /* write and wait for completion */
1116 h2o_iovec_t buf = {conn->output.buf->bytes, conn->output.buf->size};
1117 h2o_socket_write(conn->super.sock, &buf, 1, on_write_complete);
1118 conn->output.buf_in_flight = conn->output.buf;
1119 h2o_buffer_init(&conn->output.buf, &h2o_http2_wbuf_buffer_prototype);
1120 if (!h2o_timer_is_linked(&conn->io_timeout))
1121 h2o_timer_link(conn->super.ctx->loop, conn->super.ctx->io_timeout, &conn->io_timeout);
1122 }
1123 }
1124
emit_writereq(h2o_timer_t * entry)1125 static void emit_writereq(h2o_timer_t *entry)
1126 {
1127 struct st_h2o_http2client_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_conn_t, output.defer_timeout, entry);
1128 do_emit_writereq(conn);
1129 }
1130
create_connection(h2o_httpclient_ctx_t * ctx,h2o_socket_t * sock,h2o_url_t * origin_url,h2o_httpclient_connection_pool_t * connpool)1131 static struct st_h2o_http2client_conn_t *create_connection(h2o_httpclient_ctx_t *ctx, h2o_socket_t *sock, h2o_url_t *origin_url,
1132 h2o_httpclient_connection_pool_t *connpool)
1133 {
1134 struct st_h2o_http2client_conn_t *conn = h2o_mem_alloc(sizeof(*conn));
1135 memset(conn, 0, sizeof(*conn));
1136 conn->super.ctx = ctx;
1137 conn->super.sock = sock;
1138 conn->state = H2O_HTTP2CLIENT_CONN_STATE_OPEN;
1139 conn->peer_settings = H2O_HTTP2_SETTINGS_DEFAULT;
1140 conn->streams = kh_init(stream);
1141 h2o_url_copy(NULL, &conn->super.origin_url, origin_url);
1142 if (connpool != NULL)
1143 h2o_linklist_insert(&connpool->http2.conns, &conn->super.link);
1144 conn->io_timeout.cb = on_io_timeout;
1145 conn->keepalive_timeout.cb = on_keepalive_timeout;
1146
1147 /* output */
1148 conn->output.header_table.hpack_capacity = H2O_HTTP2_SETTINGS_CLIENT_HEADER_TABLE_SIZE;
1149 h2o_http2_window_init(&conn->output.window, conn->peer_settings.initial_window_size);
1150 h2o_buffer_init(&conn->output.buf, &h2o_http2_wbuf_buffer_prototype);
1151 conn->output.defer_timeout.cb = emit_writereq;
1152 h2o_linklist_init_anchor(&conn->output.sending_streams);
1153 h2o_linklist_init_anchor(&conn->output.sent_streams);
1154
1155 /* input */
1156 conn->input.header_table.hpack_capacity = conn->input.header_table.hpack_max_capacity =
1157 H2O_HTTP2_SETTINGS_DEFAULT.header_table_size;
1158 h2o_http2_window_init(&conn->input.window, H2O_HTTP2_SETTINGS_DEFAULT.initial_window_size);
1159 conn->input.read_frame = expect_settings;
1160
1161 return conn;
1162 }
1163
send_client_preface(struct st_h2o_http2client_conn_t * conn,h2o_httpclient_ctx_t * ctx)1164 static void send_client_preface(struct st_h2o_http2client_conn_t *conn, h2o_httpclient_ctx_t *ctx)
1165 {
1166 #define PREFIX \
1167 "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" \
1168 "\x00\x00\x12" /* frame size */ \
1169 "\x04" /* settings frame */ \
1170 "\x00" /* no flags */ \
1171 "\x00\x00\x00\x00" /* stream id */ \
1172 "\x00\x02" /* enable_push */ \
1173 "\x00\x00\x00\x00" /* 0 */ \
1174 "\x00\x03" /* max_concurrent_streams */ \
1175 "\x00\x00\x00\x64" /* 100 */ \
1176 "\x00\x04" /* initial_window_size */
1177 static const size_t len = sizeof(PREFIX) - 1 + 4;
1178
1179 uint32_t initial_window_size = get_max_buffer_size(ctx);
1180
1181 h2o_iovec_t vec = h2o_buffer_reserve(&conn->output.buf, len);
1182 memcpy(vec.base, PREFIX, sizeof(PREFIX) - 1);
1183
1184 /* encode max_buffer_size */
1185 vec.base[len - 4] = (char)((initial_window_size >> 24) & 0xff);
1186 vec.base[len - 3] = (char)((initial_window_size >> 16) & 0xff);
1187 vec.base[len - 2] = (char)((initial_window_size >> 8) & 0xff);
1188 vec.base[len - 1] = (char)(initial_window_size & 0xff);
1189
1190 conn->output.buf->size += len;
1191 request_write(conn);
1192 #undef PREFIX
1193 }
1194
do_cancel(h2o_httpclient_t * _client)1195 static void do_cancel(h2o_httpclient_t *_client)
1196 {
1197 struct st_h2o_http2client_stream_t *stream = (void *)_client;
1198 stream_send_error(stream->conn, stream->stream_id, H2O_HTTP2_ERROR_CANCEL);
1199 close_stream(stream);
1200 }
1201
do_get_conn_properties(h2o_httpclient_t * _client,h2o_httpclient_conn_properties_t * properties)1202 static void do_get_conn_properties(h2o_httpclient_t *_client, h2o_httpclient_conn_properties_t *properties)
1203 {
1204 struct st_h2o_http2client_stream_t *stream = (void *)_client;
1205 h2o_httpclient_set_conn_properties_of_socket(stream->conn->super.sock, properties);
1206 }
1207
do_update_window(h2o_httpclient_t * _client)1208 static void do_update_window(h2o_httpclient_t *_client)
1209 {
1210 struct st_h2o_http2client_stream_t *stream = (void *)_client;
1211 size_t max = get_max_buffer_size(stream->super.ctx);
1212 size_t bufsize = stream->input.body->size;
1213 assert(bufsize <= max);
1214 enqueue_window_update(stream->conn, stream->stream_id, &stream->input.window, max - bufsize);
1215 }
1216
do_write_req(h2o_httpclient_t * _client,h2o_iovec_t chunk,int is_end_stream)1217 static int do_write_req(h2o_httpclient_t *_client, h2o_iovec_t chunk, int is_end_stream)
1218 {
1219 struct st_h2o_http2client_stream_t *stream = (void *)_client;
1220 assert(stream->streaming.proceed_req != NULL);
1221 assert(!stream->streaming.inflight);
1222
1223 stream->streaming.inflight = 1;
1224 if (is_end_stream)
1225 stream->streaming.done = 1;
1226
1227 if (stream->output.buf == NULL) {
1228 h2o_buffer_init(&stream->output.buf, &h2o_socket_buffer_prototype);
1229 }
1230
1231 if (chunk.len != 0) {
1232 h2o_buffer_append(&stream->output.buf, chunk.base, chunk.len);
1233 }
1234
1235 if (!h2o_linklist_is_linked(&stream->output.sending_link)) {
1236 h2o_linklist_insert(&stream->conn->output.sending_streams, &stream->output.sending_link);
1237 request_write(stream->conn);
1238 }
1239
1240 return 0;
1241 }
1242
setup_stream(struct st_h2o_http2client_stream_t * stream)1243 static void setup_stream(struct st_h2o_http2client_stream_t *stream)
1244 {
1245 memset(&stream->conn, 0, sizeof(*stream) - offsetof(struct st_h2o_http2client_stream_t, conn));
1246
1247 stream->super._timeout.cb = on_stream_timeout;
1248 h2o_http2_window_init(&stream->input.window, get_max_buffer_size(stream->super.ctx));
1249 h2o_buffer_init(&stream->input.body, &h2o_socket_buffer_prototype);
1250
1251 stream->super.buf = &stream->input.body;
1252 stream->super.cancel = do_cancel;
1253 stream->super.get_conn_properties = do_get_conn_properties;
1254 stream->super.update_window = do_update_window;
1255 stream->super.write_req = do_write_req;
1256 }
1257
h2o_httpclient__h2_on_connect(h2o_httpclient_t * _client,h2o_socket_t * sock,h2o_url_t * origin)1258 void h2o_httpclient__h2_on_connect(h2o_httpclient_t *_client, h2o_socket_t *sock, h2o_url_t *origin)
1259 {
1260 struct st_h2o_http2client_stream_t *stream = (void *)_client;
1261
1262 assert(!h2o_timer_is_linked(&stream->super._timeout));
1263
1264 struct st_h2o_http2client_conn_t *conn = sock->data;
1265 if (conn == NULL) {
1266 conn = create_connection(stream->super.ctx, sock, origin, stream->super.connpool);
1267 sock->data = conn;
1268 /* send preface, settings, and connection-level window update */
1269 send_client_preface(conn, stream->super.ctx);
1270 h2o_socket_read_start(conn->super.sock, on_read);
1271 }
1272
1273 setup_stream(stream);
1274
1275 if (!h2o_timer_is_linked(&conn->io_timeout))
1276 h2o_timer_link(conn->super.ctx->loop, conn->super.ctx->io_timeout, &conn->io_timeout);
1277 on_connection_ready(stream, conn);
1278 }
1279
1280 const size_t h2o_httpclient__h2_size = sizeof(struct st_h2o_http2client_stream_t);
1281