1 /*
2  * Copyright (c) 2018 Ichito Nagata, Fastly, Inc.
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to
6  * deal in the Software without restriction, including without limitation the
7  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8  * sell copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20  * IN THE SOFTWARE.
21  */
22 #include <arpa/inet.h>
23 #include <netdb.h>
24 #include <netinet/in.h>
25 #include <sys/socket.h>
26 #include <sys/types.h>
27 #include <sys/un.h>
28 #include "khash.h"
29 #include "h2o/hpack.h"
30 #include "h2o/httpclient.h"
31 #include "h2o/http2_common.h"
32 
/* target size of the connection-level receive window; handle_data_frame() tops the window back up towards this value */
#define H2O_HTTP2_SETTINGS_CLIENT_CONNECTION_WINDOW_SIZE 16777216
/* NOTE(review): presumably the HPACK header table size announced to the peer; not referenced in this part of the file */
#define H2O_HTTP2_SETTINGS_CLIENT_HEADER_TABLE_SIZE 4096
/* maximum frame size handed to h2o_http2_decode_frame() when parsing incoming frames */
#define H2O_HTTP2_SETTINGS_CLIENT_MAX_FRAME_SIZE 16384
36 
/**
 * Progress of one direction of a stream; tracked separately for the request and for the response (see `state.req` and
 * `state.res` of `st_h2o_http2client_stream_t`).
 */
enum enum_h2o_http2client_stream_state {
    /* the header part has not been completed yet */
    STREAM_STATE_HEAD,
    /* the header part is done and the body is being transferred */
    STREAM_STATE_BODY,
    /* this direction is finished */
    STREAM_STATE_CLOSED,
};
42 
/**
 * Lifecycle of a connection.
 */
enum enum_h2o_http2client_conn_state {
    H2O_HTTP2CLIENT_CONN_STATE_OPEN,
    /* NOTE(review): not referenced in this part of the file; presumably entered once no new streams may be opened */
    H2O_HTTP2CLIENT_CONN_STATE_HALF_CLOSED,
    /* the connection is being torn down; request_write() becomes a no-op and frame readers assert against this state */
    H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING,
};
48 
struct st_h2o_http2client_stream_t;
/* hash map type `khash_t(stream)`: integer key (the stream id is used as the key) to a pointer to the stream object */
KHASH_MAP_INIT_INT64(stream, struct st_h2o_http2client_stream_t *)
51 
/**
 * State of one HTTP/2 connection to a server.
 */
struct st_h2o_http2client_conn_t {
    /* common part of the connection; this file reads `sock`, `ctx`, `link` and `num_streams` from it */
    h2o_httpclient__h2_conn_t super;
    enum enum_h2o_http2client_conn_state state;
    /* streams currently registered on this connection, keyed by stream id */
    khash_t(stream) * streams;
    /* settings of the peer, updated whenever a non-ACK SETTINGS frame is received */
    h2o_http2_settings_t peer_settings;
    /* id assigned to the most recently registered stream (0 if none has been registered yet) */
    uint32_t max_open_stream_id;
    /* NOTE(review): not used in this part of the file; presumably guards socket I/O */
    h2o_timer_t io_timeout;
    /* armed when the last stream is unregistered, disarmed when a stream is registered */
    h2o_timer_t keepalive_timeout;

    struct {
        /* NOTE(review): presumably the HPACK table used for encoding request headers; not used in this part of the file */
        h2o_hpack_header_table_t header_table;
        /* connection-level send window, enlarged by WINDOW_UPDATE frames received on stream 0 */
        h2o_http2_window_t window;
        /* frames that have been encoded and are waiting to be written */
        h2o_buffer_t *buf;
        /* NOTE(review): presumably the buffer currently being written to the socket; not used in this part of the file */
        h2o_buffer_t *buf_in_flight;
        /* zero-delay timer armed by request_write() */
        h2o_timer_t defer_timeout;
        /* streams that have buffered output and a positive stream-level send window (linked via `output.sending_link`) */
        h2o_linklist_t sending_streams;
        /* NOTE(review): not used in this part of the file */
        h2o_linklist_t sent_streams;
    } output;

    struct {
        /* HPACK table used for decoding response headers */
        h2o_hpack_header_table_t header_table;
        /* connection-level receive window */
        h2o_http2_window_t window;
        /* parser to be applied to the next incoming frame (e.g. expect_default, expect_continuation_of_headers) */
        ssize_t (*read_frame)(struct st_h2o_http2client_conn_t *conn, const uint8_t *src, size_t len, const char **err_desc);
        /* header block fragments collected while waiting for the CONTINUATION frame that carries END_HEADERS */
        h2o_buffer_t *headers_unparsed;
    } input;
};
78 
/**
 * State of one request/response exchange (i.e. one HTTP/2 stream).
 */
struct st_h2o_http2client_stream_t {
    /* generic client object handed to the application callbacks */
    h2o_httpclient_t super;
    /* connection the stream is registered to; close_stream() tolerates NULL */
    struct st_h2o_http2client_conn_t *conn;
    /* assigned by register_stream(); must be 0 before registration */
    uint32_t stream_id;
    struct {
        enum enum_h2o_http2client_stream_state req;
        enum enum_h2o_http2client_stream_state res;
    } state;

    struct {
        /* stream-level send window */
        h2o_http2_window_t window;
        /* buffered output of the stream (may be NULL) */
        h2o_buffer_t *buf;
        /* link to `conn->output.sending_streams` */
        h2o_linklist_t sending_link;
    } output;

    struct {
        /* stream-level receive window */
        h2o_http2_window_t window;
        /* status code and headers as decoded by on_head() */
        int status;
        h2o_headers_t headers;
        /* response body received through DATA frames */
        h2o_buffer_t *body;
    } input;

    struct {
        /* when non-NULL, invoked by call_callback_with_error() if an error occurs after the response has been closed */
        h2o_httpclient_proceed_req_cb proceed_req;
        /* NOTE(review): presumably set once the request body has been fully supplied; not used in this part of the file */
        unsigned char done : 1;
        /* cleared before `proceed_req` is invoked to report an error */
        unsigned char inflight : 1;
    } streaming;
};
107 
108 static void do_emit_writereq(struct st_h2o_http2client_conn_t *conn);
109 
request_write(struct st_h2o_http2client_conn_t * conn)110 static void request_write(struct st_h2o_http2client_conn_t *conn)
111 {
112     if (conn->state == H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING)
113         return;
114     if (!h2o_socket_is_writing(conn->super.sock) && !h2o_timer_is_linked(&conn->output.defer_timeout))
115         h2o_timer_link(conn->super.ctx->loop, 0, &conn->output.defer_timeout);
116 }
117 
/**
 * Queues a WINDOW_UPDATE frame that brings `window` back to `desired`, but only when the available window has dropped
 * below half of `desired`.
 *
 * @param stream_id id to be put in the frame (callers pass 0 for the connection-level window)
 * @param desired   target window size; must not exceed INT32_MAX
 *
 * NOTE(review): the threshold test compares the (signed) available size against the unsigned `desired`; confirm that a
 * negative window cannot reach this function, as it would be treated as a huge value and suppress the update.
 */
static void enqueue_window_update(struct st_h2o_http2client_conn_t *conn, uint32_t stream_id, h2o_http2_window_t *window,
                                  size_t desired)
{
    assert(desired <= INT32_MAX);

    /* still enough room; avoid emitting a frame for every tiny change */
    if (!(h2o_http2_window_get_avail(window) * 2 < desired))
        return;

    int32_t increment = (int32_t)(desired - h2o_http2_window_get_avail(window));
    h2o_http2_encode_window_update_frame(&conn->output.buf, stream_id, increment);
    request_write(conn);
    h2o_http2_window_update(window, increment);
}
129 
stream_send_error(struct st_h2o_http2client_conn_t * conn,uint32_t stream_id,int errnum)130 static void stream_send_error(struct st_h2o_http2client_conn_t *conn, uint32_t stream_id, int errnum)
131 {
132     assert(stream_id != 0);
133     assert(conn->state != H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING);
134 
135     h2o_http2_encode_rst_stream_frame(&conn->output.buf, stream_id, -errnum);
136     request_write(conn);
137 }
138 
get_stream(struct st_h2o_http2client_conn_t * conn,uint32_t stream_id)139 static struct st_h2o_http2client_stream_t *get_stream(struct st_h2o_http2client_conn_t *conn, uint32_t stream_id)
140 {
141     khiter_t iter = kh_get(stream, conn->streams, stream_id);
142     if (iter != kh_end(conn->streams))
143         return (struct st_h2o_http2client_stream_t *)kh_val(conn->streams, iter);
144     return NULL;
145 }
146 
/**
 * Returns `ctx->max_buffer_size`, capped at INT32_MAX.
 */
static uint32_t get_max_buffer_size(h2o_httpclient_ctx_t *ctx)
{
    size_t limit = ctx->max_buffer_size;

    return (uint32_t)(limit > INT32_MAX ? INT32_MAX : limit);
}
154 
/**
 * Returns the number of streams that may be open concurrently on the connection: the smaller of the limit announced by
 * the peer and the limit configured in the client context.
 */
uint32_t h2o_httpclient__h2_get_max_concurrent_streams(h2o_httpclient__h2_conn_t *_conn)
{
    struct st_h2o_http2client_conn_t *conn = (void *)_conn;

    if (conn->peer_settings.max_concurrent_streams < conn->super.ctx->http2.max_concurrent_streams)
        return conn->peer_settings.max_concurrent_streams;
    return conn->super.ctx->http2.max_concurrent_streams;
}
162 
/**
 * Repositions the connection within `connpool->http2.conns` after its number of streams has changed, using the load
 * ratio (`num_streams` divided by the maximum number of concurrent streams) as the key.
 *
 * @param connpool pool the connection belongs to; when NULL the connection must not be linked, and nothing is done
 * @param forward  non-zero to look for the new position towards `next` (used after a stream was added), zero to look
 *                 towards `prev` (used after a stream was removed)
 */
static void adjust_conn_linkedlist(h2o_httpclient_connection_pool_t *connpool, struct st_h2o_http2client_conn_t *conn, int forward)
{
    if (connpool == NULL) {
        assert(!h2o_linklist_is_linked(&conn->super.link));
        return;
    }
    if (!h2o_linklist_is_linked(&conn->super.link))
        return;

    double ratio = (double)conn->super.num_streams / h2o_httpclient__h2_get_max_concurrent_streams(&conn->super);
    h2o_linklist_t *const anchor = &connpool->http2.conns;
    h2o_linklist_t *const start = forward ? conn->super.link.next : conn->super.link.prev;

    /* walk away from the current position until a connection that may stay on that side of us is found */
    h2o_linklist_t *pos;
    for (pos = start; pos != anchor; pos = forward ? pos->next : pos->prev) {
        struct st_h2o_http2client_conn_t *other = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_conn_t, super.link, pos);
        double other_ratio = (double)other->super.num_streams / h2o_httpclient__h2_get_max_concurrent_streams(&other->super);
        int in_order = forward ? (ratio <= other_ratio) : (ratio >= other_ratio);
        if (in_order)
            break;
    }

    /* the immediate neighbor is already fine; the connection stays where it is */
    if (pos == start)
        return;
    /* when moving backwards the connection has to go after the element found, hence step once towards `next` */
    if (!forward && pos != anchor)
        pos = pos->next;

    h2o_linklist_unlink(&conn->super.link);
    h2o_linklist_insert(pos, &conn->super.link);
}
195 
register_stream(struct st_h2o_http2client_stream_t * stream,struct st_h2o_http2client_conn_t * conn)196 static void register_stream(struct st_h2o_http2client_stream_t *stream, struct st_h2o_http2client_conn_t *conn)
197 {
198     assert(stream->stream_id == 0);
199 
200     stream->conn = conn;
201 
202     stream->stream_id = conn->max_open_stream_id == 0 ? 1 : conn->max_open_stream_id + 2;
203     conn->max_open_stream_id = stream->stream_id;
204 
205     int r;
206     khiter_t iter = kh_put(stream, conn->streams, stream->stream_id, &r);
207     assert(iter != kh_end(conn->streams));
208     kh_val(conn->streams, iter) = stream;
209 
210     ++conn->super.num_streams;
211 
212     if (h2o_timer_is_linked(&conn->keepalive_timeout))
213         h2o_timer_unlink(&conn->keepalive_timeout);
214 
215     adjust_conn_linkedlist(stream->super.connpool, conn, 1);
216 }
217 
unregister_stream(struct st_h2o_http2client_stream_t * stream)218 static void unregister_stream(struct st_h2o_http2client_stream_t *stream)
219 {
220     khiter_t iter = kh_get(stream, stream->conn->streams, stream->stream_id);
221     assert(iter != kh_end(stream->conn->streams));
222     kh_del(stream, stream->conn->streams, iter);
223 
224     --stream->conn->super.num_streams;
225 
226     if (stream->conn->super.num_streams == 0)
227         h2o_timer_link(stream->conn->super.ctx->loop, stream->conn->super.ctx->keepalive_timeout, &stream->conn->keepalive_timeout);
228 
229     adjust_conn_linkedlist(stream->super.connpool, stream->conn, 0);
230 }
231 
close_stream(struct st_h2o_http2client_stream_t * stream)232 static void close_stream(struct st_h2o_http2client_stream_t *stream)
233 {
234     if (stream->conn != NULL) {
235         unregister_stream(stream);
236     }
237 
238     if (h2o_timer_is_linked(&stream->super._timeout))
239         h2o_timer_unlink(&stream->super._timeout);
240     if (h2o_linklist_is_linked(&stream->output.sending_link))
241         h2o_linklist_unlink(&stream->output.sending_link);
242 
243     if (stream->output.buf != NULL)
244         h2o_buffer_dispose(&stream->output.buf);
245     h2o_buffer_dispose(&stream->input.body);
246 
247     free(stream);
248 }
249 
close_response(struct st_h2o_http2client_stream_t * stream)250 static void close_response(struct st_h2o_http2client_stream_t *stream)
251 {
252     assert(stream->state.res != STREAM_STATE_CLOSED);
253     stream->state.res = STREAM_STATE_CLOSED;
254     if (stream->state.req == STREAM_STATE_CLOSED) {
255         close_stream(stream);
256     }
257 }
258 
call_callback_with_error(struct st_h2o_http2client_stream_t * stream,const char * errstr)259 static void call_callback_with_error(struct st_h2o_http2client_stream_t *stream, const char *errstr)
260 {
261     assert(errstr != NULL);
262     switch (stream->state.res) {
263     case STREAM_STATE_HEAD: {
264         h2o_httpclient_on_head_t on_head = {.version = 0x200};
265         stream->super._cb.on_head(&stream->super, errstr, &on_head);
266     } break;
267     case STREAM_STATE_BODY:
268         stream->super._cb.on_body(&stream->super, errstr);
269         break;
270     case STREAM_STATE_CLOSED:
271         if (stream->streaming.proceed_req != NULL) {
272             stream->streaming.inflight = 0; /* proceed_req can be called to indicate error, regardless of write being inflight */
273             stream->streaming.proceed_req(&stream->super, errstr);
274         }
275         break;
276     }
277 }
278 
call_stream_callbacks_with_error(struct st_h2o_http2client_conn_t * conn,const char * errstr)279 static void call_stream_callbacks_with_error(struct st_h2o_http2client_conn_t *conn, const char *errstr)
280 {
281     struct st_h2o_http2client_stream_t *stream;
282     kh_foreach_value(conn->streams, stream, { call_callback_with_error(stream, errstr); });
283 }
284 
/**
 * Handles a complete response header block.
 *
 * @param src,len       the HPACK-encoded header block
 * @param err_desc      passed through to h2o_hpack_parse_response
 * @param is_end_stream non-zero if the response ends with this header block
 * @return 0 when the header block has been dealt with (including the cases in which the stream got reset), or the
 *         non-zero value returned by h2o_hpack_parse_response for errors other than an invalid header character
 *
 * The stream may have been destroyed by the time the function returns.
 */
static int on_head(struct st_h2o_http2client_conn_t *conn, struct st_h2o_http2client_stream_t *stream, const uint8_t *src,
                   size_t len, const char **err_desc, int is_end_stream)
{
    int ret;

    //    assert(stream->state == H2O_HTTP2CLIENT_STREAM_STATE_RECV_HEADERS);

    if ((ret = h2o_hpack_parse_response(stream->super.pool, h2o_hpack_decode_header, &conn->input.header_table,
                                        &stream->input.status, &stream->input.headers, NULL, src, len, err_desc)) != 0) {
        /* an invalid character only resets the stream; any other error is handed back to the caller */
        if (ret == H2O_HTTP2_ERROR_INVALID_HEADER_CHAR) {
            ret = H2O_HTTP2_ERROR_PROTOCOL;
            goto Failed;
        }
        return ret;
    }

    /* informational (1xx) responses are forwarded to the optional callback; the stream stays in the HEAD state */
    if (100 <= stream->input.status && stream->input.status <= 199) {
        if (stream->input.status == 101) {
            ret = H2O_HTTP2_ERROR_PROTOCOL; // TODO is this alright?
            goto Failed;
        }
        if (stream->super.informational_cb != NULL &&
            stream->super.informational_cb(&stream->super, 0, stream->input.status, h2o_iovec_init(NULL, 0),
                                           stream->input.headers.entries, stream->input.headers.size) != 0) {
            /* the callback asked to stop; reset the stream without invoking any further callback */
            ret = H2O_HTTP2_ERROR_INTERNAL;
            goto SendRSTStream;
        }
        return 0;
    }

    /* final response: pass the headers to the application, which returns the callback to be used for the body */
    h2o_httpclient_on_head_t on_head = {.version = 0x200,
                                        .status = stream->input.status,
                                        .msg = h2o_iovec_init(NULL, 0),
                                        .headers = stream->input.headers.entries,
                                        .num_headers = stream->input.headers.size};
    stream->super._cb.on_body =
        stream->super._cb.on_head(&stream->super, is_end_stream ? h2o_httpclient_error_is_eos : NULL, &on_head);

    if (is_end_stream) {
        /* there is no body; the stream gets destroyed here if the request side is closed as well */
        close_response(stream);
        return 0;
    }
    if (stream->super._cb.on_body == NULL) {
        /**
         * NOTE: if on_head returns NULL due to invalid response (e.g. invalid content-length header)
         * sending RST_STREAM with PROTOCOL_ERROR might be more suitable than CANCEL
         * (see: https://tools.ietf.org/html/rfc7540#section-8.1.2.6)
         * but sending CANCEL is not wrong, so we leave this as-is for now.
         */
        ret = H2O_HTTP2_ERROR_CANCEL;
        goto SendRSTStream;
    }

    stream->state.res = STREAM_STATE_BODY;

    return 0;

Failed:
    /* notify the application, then fall through to reset and destroy the stream */
    assert(ret == H2O_HTTP2_ERROR_PROTOCOL);
    call_callback_with_error(stream, h2o_httpclient_error_protocol_violation);
SendRSTStream:
    stream_send_error(conn, stream->stream_id, ret);
    close_stream(stream);
    return 0;
}
350 
351 ssize_t expect_default(struct st_h2o_http2client_conn_t *conn, const uint8_t *src, size_t len, const char **err_desc);
expect_continuation_of_headers(struct st_h2o_http2client_conn_t * conn,const uint8_t * src,size_t len,const char ** err_desc)352 static ssize_t expect_continuation_of_headers(struct st_h2o_http2client_conn_t *conn, const uint8_t *src, size_t len,
353                                               const char **err_desc)
354 {
355     h2o_http2_frame_t frame;
356     ssize_t ret;
357     struct st_h2o_http2client_stream_t *stream;
358     int hret;
359 
360     if ((ret = h2o_http2_decode_frame(&frame, src, len, H2O_HTTP2_SETTINGS_CLIENT_MAX_FRAME_SIZE, err_desc)) < 0)
361         return ret;
362     if (frame.type != H2O_HTTP2_FRAME_TYPE_CONTINUATION) {
363         *err_desc = "expected CONTINUATION frame";
364         return H2O_HTTP2_ERROR_PROTOCOL;
365     }
366 
367     if ((stream = get_stream(conn, frame.stream_id)) == NULL || stream->state.res == STREAM_STATE_CLOSED) {
368         *err_desc = "unexpected stream id in CONTINUATION frame";
369         return H2O_HTTP2_ERROR_PROTOCOL;
370     }
371 
372     if (stream->state.res == STREAM_STATE_BODY) {
373         /* is a trailer, do nothing */
374         return ret;
375     }
376 
377     h2o_buffer_reserve(&conn->input.headers_unparsed, frame.length);
378     memcpy(conn->input.headers_unparsed->bytes + conn->input.headers_unparsed->size, frame.payload, frame.length);
379     conn->input.headers_unparsed->size += frame.length;
380 
381     if ((frame.flags & H2O_HTTP2_FRAME_FLAG_END_HEADERS) != 0) {
382         int is_end_stream = (frame.flags & H2O_HTTP2_FRAME_FLAG_END_STREAM) != 0;
383         conn->input.read_frame = expect_default;
384         hret = on_head(conn, stream, (const uint8_t *)conn->input.headers_unparsed->bytes, conn->input.headers_unparsed->size,
385                        err_desc, is_end_stream);
386         if (hret != 0)
387             ret = hret;
388         h2o_buffer_dispose(&conn->input.headers_unparsed);
389         conn->input.headers_unparsed = NULL;
390     }
391 
392     return ret;
393 }
394 
395 static void do_update_window(h2o_httpclient_t *client);
handle_data_frame(struct st_h2o_http2client_conn_t * conn,h2o_http2_frame_t * frame,const char ** err_desc)396 static int handle_data_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
397 {
398     h2o_http2_data_payload_t payload;
399     struct st_h2o_http2client_stream_t *stream;
400     int ret;
401 
402     if ((ret = h2o_http2_decode_data_payload(&payload, frame, err_desc)) != 0)
403         return ret;
404 
405     /* save the input in the request body buffer, or send error (and close the stream) */
406     if ((stream = get_stream(conn, frame->stream_id)) == NULL) {
407         if (frame->stream_id <= conn->max_open_stream_id) {
408             stream_send_error(conn, frame->stream_id, H2O_HTTP2_ERROR_STREAM_CLOSED);
409             return 0;
410         } else {
411             *err_desc = "invalid DATA frame";
412             return H2O_HTTP2_ERROR_PROTOCOL;
413         }
414     }
415 
416     if (stream->state.res != STREAM_STATE_BODY) {
417         stream_send_error(conn, frame->stream_id, H2O_HTTP2_ERROR_PROTOCOL);
418         call_callback_with_error(stream, h2o_httpclient_error_protocol_violation);
419         close_stream(stream);
420         return 0;
421     }
422 
423     size_t max_size = get_max_buffer_size(stream->super.ctx);
424     if (stream->input.body->size + payload.length > max_size) {
425         stream->super._cb.on_body(&stream->super, h2o_httpclient_error_flow_control);
426         stream_send_error(stream->conn, stream->stream_id, H2O_HTTP2_ERROR_FLOW_CONTROL);
427         close_stream(stream);
428         return 0;
429     }
430 
431     h2o_buffer_append(&stream->input.body, (void *)payload.data, payload.length);
432 
433     h2o_http2_window_consume_window(&conn->input.window, payload.length);
434     h2o_http2_window_consume_window(&stream->input.window, payload.length);
435 
436     int is_final = (frame->flags & H2O_HTTP2_FRAME_FLAG_END_STREAM) != 0;
437     if (stream->super._cb.on_body(&stream->super, is_final ? h2o_httpclient_error_is_eos : NULL) != 0) {
438         stream_send_error(conn, frame->stream_id, H2O_HTTP2_ERROR_INTERNAL);
439         close_stream(stream);
440         return 0;
441     }
442 
443     if (is_final) {
444         close_response(stream);
445     } else {
446         /* update connection-level window */
447         enqueue_window_update(stream->conn, 0, &stream->conn->input.window, H2O_HTTP2_SETTINGS_CLIENT_CONNECTION_WINDOW_SIZE);
448         /* update stream-level window */
449         do_update_window(&stream->super);
450     }
451 
452     return 0;
453 }
454 
/**
 * Handles a HEADERS frame. A complete response header block is passed to on_head(); an incomplete one is stored in
 * `conn->input.headers_unparsed` and the frame reader is switched to expect_continuation_of_headers. A HEADERS frame
 * received while the response body is in progress is treated as a trailer.
 *
 * @return 0 on success, or an error code with `*err_desc` set
 */
static int handle_headers_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
{
    h2o_http2_headers_payload_t payload;
    struct st_h2o_http2client_stream_t *stream;
    int ret;

    /* decode */
    if ((ret = h2o_http2_decode_headers_payload(&payload, frame, err_desc)) != 0)
        return ret;
    /* streams are registered with odd ids only (see register_stream) */
    if ((frame->stream_id & 1) == 0) {
        *err_desc = "invalid stream id in HEADERS frame";
        return H2O_HTTP2_ERROR_PROTOCOL;
    }

    if (frame->stream_id == payload.priority.dependency) {
        *err_desc = "stream cannot depend on itself";
        return H2O_HTTP2_ERROR_PROTOCOL;
    }

    /* a HEADERS frame for a stream that is not registered is reported to the caller as an error */
    if ((stream = get_stream(conn, frame->stream_id)) == NULL) {
        *err_desc = "invalid stream id in HEADERS frame";
        return H2O_HTTP2_ERROR_STREAM_CLOSED;
    }

    /* NOTE(review): presumably the timer that is waiting for the first byte of the response */
    h2o_timer_unlink(&stream->super._timeout);

    if (stream->state.res == STREAM_STATE_BODY) {
        /* is a trailer (ignore after only validating it) */
        /* NOTE(review): this branch neither passes the header block to the HPACK decoder nor closes the response, even
         * though END_STREAM is required - confirm that both are taken care of elsewhere */
        if ((frame->flags & H2O_HTTP2_FRAME_FLAG_END_STREAM) == 0) {
            *err_desc = "trailing HEADERS frame MUST have END_STREAM flag set";
            return H2O_HTTP2_ERROR_PROTOCOL;
        }
        if ((frame->flags & H2O_HTTP2_FRAME_FLAG_END_HEADERS) == 0) {
            /* read following continuation frames without initializing `headers_unparsed` */
            conn->input.read_frame = expect_continuation_of_headers;
        }
        return 0;
    }

    if ((frame->flags & H2O_HTTP2_FRAME_FLAG_END_HEADERS) == 0) {
        /* request is not complete, store in buffer */
        /* (the END_STREAM flag of this frame is not recorded anywhere at this point) */
        conn->input.read_frame = expect_continuation_of_headers;
        h2o_buffer_init(&conn->input.headers_unparsed, &h2o_socket_buffer_prototype);
        h2o_buffer_reserve(&conn->input.headers_unparsed, payload.headers_len);
        memcpy(conn->input.headers_unparsed->bytes, payload.headers, payload.headers_len);
        conn->input.headers_unparsed->size = payload.headers_len;
        return 0;
    }

    int is_end_stream = (frame->flags & H2O_HTTP2_FRAME_FLAG_END_STREAM) != 0;

    /* response header is complete, handle it */
    return on_head(conn, stream, payload.headers, payload.headers_len, err_desc, is_end_stream);
}
509 
/**
 * Handles a PRIORITY frame. The frame is validated (it must decode, and the stream must not depend on itself) and is
 * otherwise ignored.
 *
 * @return 0 on success, or an error code with `*err_desc` set
 */
static int handle_priority_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
{
    h2o_http2_priority_t prio;
    int ret = h2o_http2_decode_priority_payload(&prio, frame, err_desc);

    if (ret != 0)
        return ret;

    if (frame->stream_id == prio.dependency) {
        *err_desc = "stream cannot depend on itself";
        return H2O_HTTP2_ERROR_PROTOCOL;
    }

    /* nothing else to do; priorities are not acted upon */
    return 0;
}
525 
/**
 * Handles a RST_STREAM frame. If the stream is still registered, the application is notified with
 * `h2o_httpclient_error_refused_stream` and the stream is destroyed. A frame for a stream that is no longer registered
 * is ignored, whereas one for a stream id that has never been used is an error.
 *
 * @return 0 on success, or an error code with `*err_desc` set
 */
static int handle_rst_stream_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
{
    h2o_http2_rst_stream_payload_t rst;
    int ret = h2o_http2_decode_rst_stream_payload(&rst, frame, err_desc);

    if (ret != 0)
        return ret;

    if (frame->stream_id > conn->max_open_stream_id) {
        *err_desc = "unexpected stream id in RST_STREAM frame";
        return H2O_HTTP2_ERROR_PROTOCOL;
    }

    struct st_h2o_http2client_stream_t *stream = get_stream(conn, frame->stream_id);
    if (stream == NULL)
        return 0; /* already gone */

    /* reset the stream */
    call_callback_with_error(stream, h2o_httpclient_error_refused_stream);
    close_stream(stream);
    return 0;
}
548 
update_stream_output_window(struct st_h2o_http2client_stream_t * stream,ssize_t delta)549 static int update_stream_output_window(struct st_h2o_http2client_stream_t *stream, ssize_t delta)
550 {
551     ssize_t before = h2o_http2_window_get_avail(&stream->output.window);
552     if (h2o_http2_window_update(&stream->output.window, delta) != 0)
553         return -1;
554     ssize_t after = h2o_http2_window_get_avail(&stream->output.window);
555     if (before <= 0 && 0 < after && stream->output.buf != NULL && stream->output.buf->size != 0) {
556         assert(!h2o_linklist_is_linked(&stream->output.sending_link));
557         h2o_linklist_insert(&stream->conn->output.sending_streams, &stream->output.sending_link);
558     }
559     return 0;
560 }
561 
conn_get_buffer_window(struct st_h2o_http2client_conn_t * conn)562 static ssize_t conn_get_buffer_window(struct st_h2o_http2client_conn_t *conn)
563 {
564     ssize_t ret, winsz;
565     size_t capacity, cwnd_left;
566 
567     capacity = conn->output.buf->capacity;
568     if ((cwnd_left = h2o_socket_prepare_for_latency_optimized_write(conn->super.sock,
569                                                                     &conn->super.ctx->http2.latency_optimization)) < capacity) {
570         capacity = cwnd_left;
571         if (capacity < conn->output.buf->size)
572             return 0;
573     }
574 
575     ret = capacity - conn->output.buf->size;
576     if (ret < H2O_HTTP2_FRAME_HEADER_SIZE)
577         return 0;
578     ret -= H2O_HTTP2_FRAME_HEADER_SIZE;
579     winsz = h2o_http2_window_get_avail(&conn->output.window);
580     if (winsz < ret)
581         ret = winsz;
582     return ret;
583 }
584 
/**
 * Handles a SETTINGS frame. An ACK is only validated. Otherwise the settings of the peer are updated, an ACK is queued,
 * and, if the initial window size has changed, the difference is applied to the send window of every registered stream
 * (but not to that of the connection).
 *
 * @return 0 on success, or an error code with `*err_desc` set
 */
static int handle_settings_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
{
    if (frame->stream_id != 0) {
        *err_desc = "invalid stream id in SETTINGS frame";
        return H2O_HTTP2_ERROR_PROTOCOL;
    }

    if ((frame->flags & H2O_HTTP2_FRAME_FLAG_ACK) != 0) {
        if (frame->length != 0) {
            *err_desc = "invalid SETTINGS frame (+ACK)";
            return H2O_HTTP2_ERROR_FRAME_SIZE;
        }
    } else {
        uint32_t prev_initial_window_size = conn->peer_settings.initial_window_size;
        int ret = h2o_http2_update_peer_settings(&conn->peer_settings, frame->payload, frame->length, err_desc);
        if (ret != 0)
            return ret;
        { /* schedule ack */
            h2o_iovec_t header_buf = h2o_buffer_reserve(&conn->output.buf, H2O_HTTP2_FRAME_HEADER_SIZE);
            h2o_http2_encode_frame_header((void *)header_buf.base, 0, H2O_HTTP2_FRAME_TYPE_SETTINGS, H2O_HTTP2_FRAME_FLAG_ACK, 0);
            conn->output.buf->size += H2O_HTTP2_FRAME_HEADER_SIZE;
            request_write(conn);
        }
        /* apply the change to window size (to all the streams but not the connection, see 6.9.2 of draft-15) */
        if (prev_initial_window_size != conn->peer_settings.initial_window_size) {
            /* the subtraction has to be done in a signed type that is wider than 32 bits; doing it on the two `uint32_t`s
             * would wrap around and yield a huge positive value whenever the peer reduces the window */
            ssize_t delta = (ssize_t)((int64_t)conn->peer_settings.initial_window_size - (int64_t)prev_initial_window_size);
            struct st_h2o_http2client_stream_t *stream;
            kh_foreach_value(conn->streams, stream, { update_stream_output_window(stream, delta); });

            /* streams might have become writable */
            if (conn_get_buffer_window(conn) > 0)
                request_write(conn);
        }
    }

    return 0;
}
621 
/**
 * Handles a PUSH_PROMISE frame, which this client never accepts.
 *
 * @return always H2O_HTTP2_ERROR_PROTOCOL, with `*err_desc` set
 */
static int handle_push_promise_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
{
    /* the arguments other than `err_desc` are unused */
    (void)conn;
    (void)frame;

    *err_desc = "received PUSH_PROMISE frame";
    return H2O_HTTP2_ERROR_PROTOCOL;
}
627 
/**
 * Handles a PING frame. A PING without the ACK flag is answered by queueing a PING that echoes the payload; a PING with
 * the ACK flag is only validated.
 *
 * @return 0 on success, or the error returned by h2o_http2_decode_ping_payload
 */
static int handle_ping_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
{
    h2o_http2_ping_payload_t ping;
    int ret = h2o_http2_decode_ping_payload(&ping, frame, err_desc);

    if (ret != 0)
        return ret;

    /* an ACK is not to be replied to */
    if ((frame->flags & H2O_HTTP2_FRAME_FLAG_ACK) != 0)
        return 0;

    h2o_http2_encode_ping_frame(&conn->output.buf, 1, ping.data);
    request_write(conn);
    return 0;
}
643 
/**
 * Handles a GOAWAY frame. Every registered stream with an id above `last_stream_id` is reported to the application as
 * refused and destroyed, and the connection is unlinked from the list it is on so that no new streams get opened on it.
 *
 * @return 0 on success, or the error returned by h2o_http2_decode_goaway_payload
 */
static int handle_goaway_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
{
    h2o_http2_goaway_payload_t goaway;
    int ret = h2o_http2_decode_goaway_payload(&goaway, frame, err_desc);

    if (ret != 0)
        return ret;

    khiter_t slot;
    for (slot = kh_begin(conn->streams); slot != kh_end(conn->streams); ++slot) {
        if (!kh_exist(conn->streams, slot))
            continue;
        struct st_h2o_http2client_stream_t *stream = kh_val(conn->streams, slot);
        /* streams up to `last_stream_id` are left running */
        if (!(stream->stream_id > goaway.last_stream_id))
            continue;
        call_callback_with_error(stream, h2o_httpclient_error_refused_stream);
        close_stream(stream);
    }

    /* stop opening new streams */
    if (h2o_linklist_is_linked(&conn->super.link))
        h2o_linklist_unlink(&conn->super.link);

    return 0;
}
666 
/**
 * Handles a WINDOW_UPDATE frame by enlarging either the connection-level send window (stream id 0) or the send window
 * of the designated stream, and requests a write if output can now be generated.
 *
 * Errors that are confined to a stream (a decode error flagged as stream-level, or an overflow of the stream window)
 * reset that stream and yield 0; a frame for a registered-before-but-gone stream is ignored.
 *
 * @return 0 if the frame has been dealt with, or a connection-level error code
 */
static int handle_window_update_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame, const char **err_desc)
{
    h2o_http2_window_update_payload_t payload;
    struct st_h2o_http2client_stream_t *stream;
    int ret, err_is_stream_level;

    ret = h2o_http2_decode_window_update_payload(&payload, frame, err_desc, &err_is_stream_level);
    if (ret != 0) {
        if (!err_is_stream_level)
            return ret;
        /* reset just the stream */
        stream_send_error(conn, frame->stream_id, ret);
        if ((stream = get_stream(conn, frame->stream_id)) != NULL) {
            call_callback_with_error(stream, h2o_httpclient_error_protocol_violation);
            close_stream(stream);
        }
        return 0;
    }

    if (frame->stream_id == 0) {
        /* connection-level window */
        if (h2o_http2_window_update(&conn->output.window, payload.window_size_increment) != 0) {
            *err_desc = "flow control window overflow";
            return H2O_HTTP2_ERROR_FLOW_CONTROL;
        }
    } else if (frame->stream_id > conn->max_open_stream_id) {
        /* the stream has never been opened */
        *err_desc = "invalid stream id in WINDOW_UPDATE frame";
        return H2O_HTTP2_ERROR_PROTOCOL;
    } else {
        /* stream-level window */
        stream = get_stream(conn, frame->stream_id);
        if (stream != NULL && update_stream_output_window(stream, payload.window_size_increment) != 0) {
            stream_send_error(conn, frame->stream_id, H2O_HTTP2_ERROR_FLOW_CONTROL);
            call_callback_with_error(stream, h2o_httpclient_error_flow_control);
            close_stream(stream);
            return 0;
        }
    }

    if (conn_get_buffer_window(conn) > 0)
        request_write(conn);

    return 0;
}
711 
/**
 * Handles a CONTINUATION frame that arrives through expect_default, i.e. when no header block is being continued.
 *
 * @return always H2O_HTTP2_ERROR_PROTOCOL, with `*err_desc` set
 */
static int handle_invalid_continuation_frame(struct st_h2o_http2client_conn_t *conn, h2o_http2_frame_t *frame,
                                             const char **err_desc)
{
    /* the arguments other than `err_desc` are unused */
    (void)conn;
    (void)frame;

    *err_desc = "received invalid CONTINUATION frame";
    return H2O_HTTP2_ERROR_PROTOCOL;
}
718 
expect_default(struct st_h2o_http2client_conn_t * conn,const uint8_t * src,size_t len,const char ** err_desc)719 ssize_t expect_default(struct st_h2o_http2client_conn_t *conn, const uint8_t *src, size_t len, const char **err_desc)
720 {
721     assert(conn->state != H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING);
722 
723     h2o_http2_frame_t frame;
724     ssize_t ret;
725     static int (*FRAME_HANDLERS[])(struct st_h2o_http2client_conn_t * conn, h2o_http2_frame_t * frame, const char **err_desc) = {
726         handle_data_frame,                /* DATA */
727         handle_headers_frame,             /* HEADERS */
728         handle_priority_frame,            /* PRIORITY */
729         handle_rst_stream_frame,          /* RST_STREAM */
730         handle_settings_frame,            /* SETTINGS */
731         handle_push_promise_frame,        /* PUSH_PROMISE */
732         handle_ping_frame,                /* PING */
733         handle_goaway_frame,              /* GOAWAY */
734         handle_window_update_frame,       /* WINDOW_UPDATE */
735         handle_invalid_continuation_frame /* CONTINUATION */
736     };
737 
738     if ((ret = h2o_http2_decode_frame(&frame, src, len, H2O_HTTP2_SETTINGS_CLIENT_MAX_FRAME_SIZE, err_desc)) < 0)
739         return ret;
740 
741     if (frame.type < sizeof(FRAME_HANDLERS) / sizeof(FRAME_HANDLERS[0])) {
742         int hret = FRAME_HANDLERS[frame.type](conn, &frame, err_desc);
743         if (hret != 0)
744             ret = hret;
745     } else {
746         h2o_error_printf("skipping frame (type:%d)\n", frame.type);
747     }
748 
749     return ret;
750 }
751 
expect_settings(struct st_h2o_http2client_conn_t * conn,const uint8_t * src,size_t len,const char ** err_desc)752 static ssize_t expect_settings(struct st_h2o_http2client_conn_t *conn, const uint8_t *src, size_t len, const char **err_desc)
753 {
754     assert(conn->state != H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING);
755 
756     h2o_http2_frame_t frame;
757     ssize_t ret;
758 
759     if ((ret = h2o_http2_decode_frame(&frame, src, len, H2O_HTTP2_SETTINGS_CLIENT_MAX_FRAME_SIZE, err_desc)) < 0)
760         return ret;
761 
762     if (frame.type != H2O_HTTP2_FRAME_TYPE_SETTINGS)
763         return H2O_HTTP2_ERROR_PROTOCOL_CLOSE_IMMEDIATELY;
764 
765     int hret = handle_settings_frame(conn, &frame, err_desc);
766     if (hret != 0)
767         return hret;
768 
769     conn->input.read_frame = expect_default;
770     return ret;
771 }
772 
/**
 * Tears the connection down immediately and releases everything it owns: the copied origin URL, the
 * socket, every stream still registered in `conn->streams`, timers, HPACK tables and I/O buffers.
 * `conn` itself is freed at the end, so the pointer must not be used after this function returns.
 */
static void close_connection_now(struct st_h2o_http2client_conn_t *conn)
{
    /* release the components of the origin URL stored in the connection */
    free(conn->super.origin_url.authority.base);
    free(conn->super.origin_url.host.base);
    free(conn->super.origin_url.path.base);

    h2o_socket_close(conn->super.sock);

    /* close every remaining stream, then destroy the stream table itself */
    struct st_h2o_http2client_stream_t *stream;
    kh_foreach_value(conn->streams, stream, { close_stream(stream); });
    kh_destroy(stream, conn->streams);

    /* detach from whatever list the connection is linked to (create_connection links it to the pool's list of conns) */
    if (h2o_linklist_is_linked(&conn->super.link))
        h2o_linklist_unlink(&conn->super.link);

    /* stop the connection-level timers */
    if (h2o_timer_is_linked(&conn->io_timeout))
        h2o_timer_unlink(&conn->io_timeout);
    if (h2o_timer_is_linked(&conn->keepalive_timeout))
        h2o_timer_unlink(&conn->keepalive_timeout);

    /* output */
    h2o_hpack_dispose_header_table(&conn->output.header_table);
    h2o_buffer_dispose(&conn->output.buf);
    if (conn->output.buf_in_flight != NULL)
        h2o_buffer_dispose(&conn->output.buf_in_flight);
    if (h2o_timer_is_linked(&conn->output.defer_timeout))
        h2o_timer_unlink(&conn->output.defer_timeout);
    /* both stream lists are expected to be empty once all streams have been closed above */
    assert(h2o_linklist_is_empty(&conn->output.sending_streams));
    assert(h2o_linklist_is_empty(&conn->output.sent_streams));

    /* input */
    h2o_hpack_dispose_header_table(&conn->input.header_table);
    if (conn->input.headers_unparsed != NULL)
        h2o_buffer_dispose(&conn->input.headers_unparsed);

    free(conn);
}
810 
close_connection_if_necessary(struct st_h2o_http2client_conn_t * conn)811 static int close_connection_if_necessary(struct st_h2o_http2client_conn_t *conn)
812 {
813     if (conn->state == H2O_HTTP2CLIENT_CONN_STATE_HALF_CLOSED && conn->super.num_streams == 0)
814         conn->state = H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING;
815     if (conn->state == H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING) {
816         close_connection_now(conn);
817         return 1;
818     }
819     return 0;
820 }
821 
close_connection(struct st_h2o_http2client_conn_t * conn)822 static int close_connection(struct st_h2o_http2client_conn_t *conn)
823 {
824     conn->state = H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING;
825     h2o_socket_read_stop(conn->super.sock);
826 
827     if (conn->output.buf_in_flight != NULL || h2o_timer_is_linked(&conn->output.defer_timeout)) {
828         /* there is a pending write, let close_connection_if_necessary actually close the connection */
829     } else {
830         close_connection_now(conn);
831         return -1;
832     }
833     return 0;
834 }
835 
/**
 * Queues a GOAWAY frame carrying `errnum` and `additional_data`, requests a write, and moves the
 * connection to the half-closed state. Does nothing when the connection is already closing.
 */
static void enqueue_goaway(struct st_h2o_http2client_conn_t *conn, int errnum, h2o_iovec_t additional_data)
{
    if (conn->state != H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING) {
        h2o_http2_encode_goaway_frame(&conn->output.buf, 0, errnum, additional_data);
        request_write(conn);
        conn->state = H2O_HTTP2CLIENT_CONN_STATE_HALF_CLOSED;

        /* unlink the connection so that no new streams get opened on it */
        if (h2o_linklist_is_linked(&conn->super.link))
            h2o_linklist_unlink(&conn->super.link);
    }
}
849 
on_connect_error(struct st_h2o_http2client_stream_t * stream,const char * errstr)850 static void on_connect_error(struct st_h2o_http2client_stream_t *stream, const char *errstr)
851 {
852     assert(errstr != NULL);
853     stream->super._cb.on_connect(&stream->super, errstr, NULL, NULL, NULL, 0, NULL, NULL, NULL, NULL);
854     close_stream(stream);
855 }
856 
do_stream_timeout(struct st_h2o_http2client_stream_t * stream)857 static void do_stream_timeout(struct st_h2o_http2client_stream_t *stream)
858 {
859     if (stream->conn == NULL) {
860         on_connect_error(stream, h2o_httpclient_error_connect_timeout);
861         return;
862     }
863     const char *errstr =
864         stream->state.res == STREAM_STATE_HEAD ? h2o_httpclient_error_first_byte_timeout : h2o_httpclient_error_io_timeout;
865     call_callback_with_error(stream, errstr);
866     close_stream(stream);
867 }
868 
/**
 * Timer callback of a stream's `super._timeout` (installed by setup_stream): recovers the owning
 * stream from the timer entry and times it out.
 */
static void on_stream_timeout(h2o_timer_t *entry)
{
    do_stream_timeout(H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_stream_t, super._timeout, entry));
}
874 
/**
 * Timer callback of the connection-level I/O timeout: times out every stream registered on the
 * connection, then destroys the connection immediately.
 */
static void on_io_timeout(h2o_timer_t *entry)
{
    struct st_h2o_http2client_stream_t *timed_out;
    struct st_h2o_http2client_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_conn_t, io_timeout, entry);

    kh_foreach_value(conn->streams, timed_out, { do_stream_timeout(timed_out); });

    close_connection_now(conn);
}
882 
/**
 * Timer callback of the keepalive timeout: queues a GOAWAY with H2O_HTTP2_ERROR_NONE and no
 * additional data, requests a write, and starts closing the connection.
 */
static void on_keepalive_timeout(h2o_timer_t *entry)
{
    struct st_h2o_http2client_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_conn_t, keepalive_timeout, entry);
    h2o_iovec_t no_additional_data = h2o_iovec_init(NULL, 0);

    enqueue_goaway(conn, H2O_HTTP2_ERROR_NONE, no_additional_data);
    request_write(conn);
    close_connection(conn);
}
890 
parse_input(struct st_h2o_http2client_conn_t * conn)891 static int parse_input(struct st_h2o_http2client_conn_t *conn)
892 {
893     /* handle the input */
894     while (conn->state != H2O_HTTP2CLIENT_CONN_STATE_IS_CLOSING && conn->super.sock->input->size != 0) {
895         /* process a frame */
896         const char *err_desc = NULL;
897         ssize_t ret =
898             conn->input.read_frame(conn, (uint8_t *)conn->super.sock->input->bytes, conn->super.sock->input->size, &err_desc);
899         if (ret == H2O_HTTP2_ERROR_INCOMPLETE) {
900             break;
901         } else if (ret < 0) {
902             if (ret != H2O_HTTP2_ERROR_PROTOCOL_CLOSE_IMMEDIATELY) {
903                 enqueue_goaway(conn, (int)ret,
904                                err_desc != NULL ? (h2o_iovec_t){(char *)err_desc, strlen(err_desc)} : (h2o_iovec_t){NULL});
905             }
906             call_stream_callbacks_with_error(conn, h2o_httpclient_error_protocol_violation);
907             return close_connection(conn);
908         }
909         /* advance to the next frame */
910         h2o_buffer_consume(&conn->super.sock->input, ret);
911     }
912     return 0;
913 }
914 
/**
 * Socket read callback (registered in h2o_httpclient__h2_on_connect). On a read error, all streams
 * are notified of an I/O error and the connection is closed. Otherwise the buffered input is parsed,
 * any deferred write is flushed right away, and the I/O timeout is re-armed.
 */
static void on_read(h2o_socket_t *sock, const char *err)
{
    struct st_h2o_http2client_conn_t *conn = sock->data;

    h2o_timer_unlink(&conn->io_timeout);

    if (err != NULL) {
        call_stream_callbacks_with_error(conn, h2o_httpclient_error_io);
        close_connection(conn);
        return;
    }

    /* a non-zero result means parse_input closed the connection; `conn` must not be touched */
    if (parse_input(conn) != 0)
        return;

    /* do not wait for the deferred-write timer if a write has been scheduled */
    if (h2o_timer_is_linked(&conn->output.defer_timeout)) {
        h2o_timer_unlink(&conn->output.defer_timeout);
        do_emit_writereq(conn);
    }

    /* re-arm the I/O timeout unless something above already did */
    if (!h2o_timer_is_linked(&conn->io_timeout))
        h2o_timer_link(conn->super.ctx->loop, conn->super.ctx->io_timeout, &conn->io_timeout);
}
939 
on_connection_ready(struct st_h2o_http2client_stream_t * stream,struct st_h2o_http2client_conn_t * conn)940 static void on_connection_ready(struct st_h2o_http2client_stream_t *stream, struct st_h2o_http2client_conn_t *conn)
941 {
942     h2o_iovec_t method;
943     h2o_url_t url;
944     h2o_header_t *headers;
945     size_t num_headers;
946     h2o_iovec_t body;
947     h2o_httpclient_properties_t props = (h2o_httpclient_properties_t){NULL};
948 
949     register_stream(stream, conn);
950 
951     stream->super._cb.on_head =
952         stream->super._cb.on_connect(&stream->super, NULL, &method, &url, (const h2o_header_t **)&headers, &num_headers, &body,
953                                      &stream->streaming.proceed_req, &props, &conn->super.origin_url);
954     if (stream->super._cb.on_head == NULL) {
955         close_stream(stream);
956         return;
957     }
958 
959     h2o_http2_window_init(&stream->output.window, conn->peer_settings.initial_window_size);
960 
961     /* forward request state */
962     if (stream->streaming.proceed_req != NULL) {
963         stream->state.req = STREAM_STATE_BODY;
964         if (body.len != 0)
965             stream->streaming.inflight = 1;
966     } else if (body.base != NULL) {
967         stream->state.req = STREAM_STATE_BODY;
968     } else {
969         stream->state.req = STREAM_STATE_CLOSED;
970     }
971 
972     /* send headers */
973     h2o_hpack_flatten_request(&conn->output.buf, &conn->output.header_table, conn->peer_settings.header_table_size,
974                               stream->stream_id, conn->peer_settings.max_frame_size, method, &url, headers, num_headers,
975                               stream->state.req == STREAM_STATE_CLOSED);
976 
977     if (stream->state.req == STREAM_STATE_BODY) {
978         h2o_buffer_init(&stream->output.buf, &h2o_socket_buffer_prototype);
979         h2o_buffer_append(&stream->output.buf, body.base, body.len);
980     }
981     h2o_linklist_insert(&conn->output.sending_streams, &stream->output.sending_link);
982     request_write(conn);
983 }
984 
/**
 * Callback passed to h2o_socket_notify_write by on_write_complete. On success, emits pending output
 * and closes the connection if it is due for closing. On error, notifies all streams of an I/O error
 * and destroys the connection.
 */
static void on_notify_write(h2o_socket_t *sock, const char *err)
{
    struct st_h2o_http2client_conn_t *conn = sock->data;

    if (err == NULL) {
        do_emit_writereq(conn);
        close_connection_if_necessary(conn);
        return;
    }

    call_stream_callbacks_with_error(conn, h2o_httpclient_error_io);
    close_connection_now(conn);
}
997 
/**
 * Completion callback of the h2o_socket_write issued by do_emit_writereq.
 *
 * On error, all streams are notified of an I/O error and the connection is destroyed. Otherwise the
 * streams parked on `sent_streams` are processed, the in-flight buffer is released, and more output
 * is written (or a write notification is requested) if anything is pending.
 */
static void on_write_complete(h2o_socket_t *sock, const char *err)
{
    struct st_h2o_http2client_conn_t *conn = sock->data;

    /* do_emit_writereq sets buf_in_flight right after issuing the write */
    assert(conn->output.buf_in_flight != NULL);

    h2o_timer_unlink(&conn->io_timeout);

    /* close by error if necessary */
    if (err != NULL) {
        call_stream_callbacks_with_error(conn, h2o_httpclient_error_io);
        close_connection_now(conn);
        return;
    }

    /* a return value of 1 means the connection has been destroyed; `conn` must not be used */
    if (close_connection_if_necessary(conn))
        return;

    /* process the streams that do_emit_writereq moved to sent_streams */
    while (!h2o_linklist_is_empty(&conn->output.sent_streams)) {
        h2o_linklist_t *link = conn->output.sent_streams.next;
        struct st_h2o_http2client_stream_t *stream =
            H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_stream_t, output.sending_link, link);
        h2o_linklist_unlink(link);

        /* streaming request with a chunk in flight: clear the flag and invoke the proceed_req callback */
        if (stream->streaming.proceed_req != NULL && stream->streaming.inflight) {
            stream->streaming.inflight = 0;
            stream->streaming.proceed_req(&stream->super, NULL);
        }

        /* request fully sent (not streaming, or streaming is done): close the request side and arm the first-byte timeout */
        if (stream->streaming.proceed_req == NULL || stream->streaming.done) {
            stream->state.req = STREAM_STATE_CLOSED;
            h2o_timer_link(stream->super.ctx->loop, stream->super.ctx->first_byte_timeout, &stream->super._timeout);
        }
    }

    /* reset the other buffer */
    h2o_buffer_dispose(&conn->output.buf_in_flight);

#if !H2O_USE_LIBUV
    /* when the connection is open, request a write notification (handled by on_notify_write) instead of writing immediately */
    if (conn->state == H2O_HTTP2CLIENT_CONN_STATE_OPEN) {
        if (conn->output.buf->size != 0 || !h2o_linklist_is_empty(&conn->output.sending_streams))
            h2o_socket_notify_write(sock, on_notify_write);
        return;
    }
#endif

    /* write more, if possible */
    do_emit_writereq(conn);
    close_connection_if_necessary(conn);
}
1049 
/* returns the smaller of the two sizes */
static size_t sz_min(size_t x, size_t y)
{
    if (x < y)
        return x;
    return y;
}
1054 
calc_max_payload_size(struct st_h2o_http2client_stream_t * stream)1055 static size_t calc_max_payload_size(struct st_h2o_http2client_stream_t *stream)
1056 {
1057     ssize_t conn_max, stream_max;
1058 
1059     if ((conn_max = conn_get_buffer_window(stream->conn)) <= 0)
1060         return 0;
1061     if ((stream_max = h2o_http2_window_get_avail(&stream->output.window)) <= 0)
1062         return 0;
1063     return sz_min(sz_min(conn_max, stream_max), stream->conn->peer_settings.max_frame_size);
1064 }
1065 
stream_emit_pending_data(struct st_h2o_http2client_stream_t * stream)1066 static size_t stream_emit_pending_data(struct st_h2o_http2client_stream_t *stream)
1067 {
1068     size_t max_payload_size = calc_max_payload_size(stream);
1069     size_t payload_size = sz_min(max_payload_size, stream->output.buf->size);
1070     if (payload_size == 0)
1071         return 0;
1072     char *dst = h2o_buffer_reserve(&stream->conn->output.buf, H2O_HTTP2_FRAME_HEADER_SIZE + payload_size).base;
1073     int end_stream = (stream->streaming.proceed_req == NULL || stream->streaming.done) && payload_size == stream->output.buf->size;
1074     h2o_http2_encode_frame_header((void *)dst, stream->output.buf->size, H2O_HTTP2_FRAME_TYPE_DATA,
1075                                   end_stream ? H2O_HTTP2_FRAME_FLAG_END_STREAM : 0, stream->stream_id);
1076     h2o_memcpy(dst + H2O_HTTP2_FRAME_HEADER_SIZE, stream->output.buf->bytes, payload_size);
1077     stream->conn->output.buf->size += H2O_HTTP2_FRAME_HEADER_SIZE + payload_size;
1078     h2o_buffer_consume(&stream->output.buf, payload_size);
1079 
1080     h2o_http2_window_consume_window(&stream->conn->output.window, payload_size);
1081     h2o_http2_window_consume_window(&stream->output.window, payload_size);
1082 
1083     return payload_size;
1084 }
1085 
do_emit_writereq(struct st_h2o_http2client_conn_t * conn)1086 static void do_emit_writereq(struct st_h2o_http2client_conn_t *conn)
1087 {
1088     assert(conn->output.buf_in_flight == NULL);
1089 
1090     /* emit DATA frames */
1091     h2o_linklist_t *node = conn->output.sending_streams.next;
1092     h2o_linklist_t *first = node;
1093     while (node != &conn->output.sending_streams) {
1094         h2o_linklist_t *next = node->next;
1095         struct st_h2o_http2client_stream_t *stream =
1096             H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_stream_t, output.sending_link, node);
1097         h2o_linklist_unlink(node);
1098 
1099         size_t bytes_emitted = 0;
1100         if (stream->output.buf != NULL && stream->output.buf->size != 0)
1101             bytes_emitted = stream_emit_pending_data(stream);
1102 
1103         if (stream->output.buf != NULL && stream->output.buf->size == 0) {
1104             h2o_linklist_insert(&conn->output.sent_streams, node);
1105         } else if (h2o_http2_window_get_avail(&stream->output.window) > 0) {
1106             h2o_linklist_insert(&conn->output.sending_streams, node); /* move to the tail to rotate buffers */
1107         }
1108 
1109         if (next == first)
1110             break;
1111         node = next;
1112     }
1113 
1114     if (conn->output.buf->size != 0) {
1115         /* write and wait for completion */
1116         h2o_iovec_t buf = {conn->output.buf->bytes, conn->output.buf->size};
1117         h2o_socket_write(conn->super.sock, &buf, 1, on_write_complete);
1118         conn->output.buf_in_flight = conn->output.buf;
1119         h2o_buffer_init(&conn->output.buf, &h2o_http2_wbuf_buffer_prototype);
1120         if (!h2o_timer_is_linked(&conn->io_timeout))
1121             h2o_timer_link(conn->super.ctx->loop, conn->super.ctx->io_timeout, &conn->io_timeout);
1122     }
1123 }
1124 
/**
 * Timer callback of `output.defer_timeout` (installed by create_connection): recovers the connection
 * from the timer entry and emits the pending output.
 */
static void emit_writereq(h2o_timer_t *entry)
{
    do_emit_writereq(H2O_STRUCT_FROM_MEMBER(struct st_h2o_http2client_conn_t, output.defer_timeout, entry));
}
1130 
/**
 * Allocates and initializes a connection object for `sock`.
 *
 * The object starts zero-filled, in the OPEN state, with the peer's settings set to
 * H2O_HTTP2_SETTINGS_DEFAULT and the frame reader set to expect_settings. `origin_url` is copied into
 * the connection. When `connpool` is not NULL, the connection is linked to the pool's list of HTTP/2
 * connections.
 */
static struct st_h2o_http2client_conn_t *create_connection(h2o_httpclient_ctx_t *ctx, h2o_socket_t *sock, h2o_url_t *origin_url,
                                                           h2o_httpclient_connection_pool_t *connpool)
{
    struct st_h2o_http2client_conn_t *conn = h2o_mem_alloc(sizeof(*conn));
    memset(conn, 0, sizeof(*conn));

    /* timer callbacks */
    conn->io_timeout.cb = on_io_timeout;
    conn->keepalive_timeout.cb = on_keepalive_timeout;
    conn->output.defer_timeout.cb = emit_writereq;

    /* connection-wide state */
    conn->super.ctx = ctx;
    conn->super.sock = sock;
    conn->state = H2O_HTTP2CLIENT_CONN_STATE_OPEN;
    conn->peer_settings = H2O_HTTP2_SETTINGS_DEFAULT;
    conn->streams = kh_init(stream);
    h2o_url_copy(NULL, &conn->super.origin_url, origin_url);
    if (connpool != NULL)
        h2o_linklist_insert(&connpool->http2.conns, &conn->super.link);

    /* output side */
    conn->output.header_table.hpack_capacity = H2O_HTTP2_SETTINGS_CLIENT_HEADER_TABLE_SIZE;
    h2o_http2_window_init(&conn->output.window, conn->peer_settings.initial_window_size);
    h2o_buffer_init(&conn->output.buf, &h2o_http2_wbuf_buffer_prototype);
    h2o_linklist_init_anchor(&conn->output.sending_streams);
    h2o_linklist_init_anchor(&conn->output.sent_streams);

    /* input side */
    conn->input.header_table.hpack_max_capacity = H2O_HTTP2_SETTINGS_DEFAULT.header_table_size;
    conn->input.header_table.hpack_capacity = conn->input.header_table.hpack_max_capacity;
    h2o_http2_window_init(&conn->input.window, H2O_HTTP2_SETTINGS_DEFAULT.initial_window_size);
    conn->input.read_frame = expect_settings;

    return conn;
}
1163 
/**
 * Queues the HTTP/2 client connection preface followed by a SETTINGS frame with three entries
 * (enable_push = 0, max_concurrent_streams = 100, initial_window_size = get_max_buffer_size(ctx)),
 * then requests a write.
 */
static void send_client_preface(struct st_h2o_http2client_conn_t *conn, h2o_httpclient_ctx_t *ctx)
{
/* everything up to, but not including, the 4-byte value of initial_window_size */
#define PREFIX                                                                                                                     \
    "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"                                                                                             \
    "\x00\x00\x12"     /* frame size */                                                                                            \
    "\x04"             /* settings frame */                                                                                        \
    "\x00"             /* no flags */                                                                                              \
    "\x00\x00\x00\x00" /* stream id */                                                                                             \
    "\x00\x02"         /* enable_push */                                                                                           \
    "\x00\x00\x00\x00" /* 0 */                                                                                                     \
    "\x00\x03"         /* max_concurrent_streams */                                                                                \
    "\x00\x00\x00\x64" /* 100 */                                                                                                   \
    "\x00\x04"         /* initial_window_size */
    const size_t prefix_len = sizeof(PREFIX) - 1;
    const size_t total_len = prefix_len + 4;

    uint32_t initial_window_size = get_max_buffer_size(ctx);

    h2o_iovec_t vec = h2o_buffer_reserve(&conn->output.buf, total_len);
    memcpy(vec.base, PREFIX, prefix_len);

    /* append the window size in network byte order */
    char *value = vec.base + prefix_len;
    for (int shift = 24; shift >= 0; shift -= 8)
        *value++ = (char)((initial_window_size >> shift) & 0xff);

    conn->output.buf->size += total_len;
    request_write(conn);
#undef PREFIX
}
1194 
/**
 * `cancel` callback of the client object (installed by setup_stream): sends a stream error with
 * H2O_HTTP2_ERROR_CANCEL for the stream and closes it.
 */
static void do_cancel(h2o_httpclient_t *_client)
{
    struct st_h2o_http2client_stream_t *stream = (struct st_h2o_http2client_stream_t *)_client;

    stream_send_error(stream->conn, stream->stream_id, H2O_HTTP2_ERROR_CANCEL);
    close_stream(stream);
}
1201 
/**
 * `get_conn_properties` callback of the client object (installed by setup_stream): fills
 * `properties` from the socket of the connection the stream belongs to.
 */
static void do_get_conn_properties(h2o_httpclient_t *_client, h2o_httpclient_conn_properties_t *properties)
{
    struct st_h2o_http2client_stream_t *stream = (struct st_h2o_http2client_stream_t *)_client;
    h2o_socket_t *sock = stream->conn->super.sock;

    h2o_httpclient_set_conn_properties_of_socket(sock, properties);
}
1207 
/**
 * `update_window` callback of the client object (installed by setup_stream): enqueues a window
 * update for the stream, passing the maximum buffer size minus the number of bytes currently held
 * in the stream's input body buffer.
 */
static void do_update_window(h2o_httpclient_t *_client)
{
    struct st_h2o_http2client_stream_t *stream = (struct st_h2o_http2client_stream_t *)_client;
    size_t capacity = get_max_buffer_size(stream->super.ctx);
    size_t buffered = stream->input.body->size;

    assert(buffered <= capacity);

    size_t room = capacity - buffered;
    enqueue_window_update(stream->conn, stream->stream_id, &stream->input.window, room);
}
1216 
/**
 * `write_req` callback of the client object (installed by setup_stream): buffers one chunk of a
 * streamed request body and schedules the stream for sending.
 *
 * Requires that the request is being streamed and that no chunk is currently in flight. A non-zero
 * `is_end_stream` marks the streaming as done. Always returns 0.
 */
static int do_write_req(h2o_httpclient_t *_client, h2o_iovec_t chunk, int is_end_stream)
{
    struct st_h2o_http2client_stream_t *stream = (struct st_h2o_http2client_stream_t *)_client;

    assert(stream->streaming.proceed_req != NULL);
    assert(!stream->streaming.inflight);

    stream->streaming.inflight = 1;
    if (is_end_stream)
        stream->streaming.done = 1;

    /* create the output buffer on demand, then append the chunk (if any) */
    if (stream->output.buf == NULL)
        h2o_buffer_init(&stream->output.buf, &h2o_socket_buffer_prototype);
    if (chunk.len != 0)
        h2o_buffer_append(&stream->output.buf, chunk.base, chunk.len);

    /* nothing more to do if the stream is already on a list */
    if (h2o_linklist_is_linked(&stream->output.sending_link))
        return 0;

    h2o_linklist_insert(&stream->conn->output.sending_streams, &stream->output.sending_link);
    request_write(stream->conn);
    return 0;
}
1242 
setup_stream(struct st_h2o_http2client_stream_t * stream)1243 static void setup_stream(struct st_h2o_http2client_stream_t *stream)
1244 {
1245     memset(&stream->conn, 0, sizeof(*stream) - offsetof(struct st_h2o_http2client_stream_t, conn));
1246 
1247     stream->super._timeout.cb = on_stream_timeout;
1248     h2o_http2_window_init(&stream->input.window, get_max_buffer_size(stream->super.ctx));
1249     h2o_buffer_init(&stream->input.body, &h2o_socket_buffer_prototype);
1250 
1251     stream->super.buf = &stream->input.body;
1252     stream->super.cancel = do_cancel;
1253     stream->super.get_conn_properties = do_get_conn_properties;
1254     stream->super.update_window = do_update_window;
1255     stream->super.write_req = do_write_req;
1256 }
1257 
/**
 * Binds a client object to an HTTP/2 connection on `sock` and starts its request.
 *
 * If the socket has no connection object attached (`sock->data` is NULL), one is created, the client
 * preface and initial SETTINGS are queued, and reading from the socket is started. The stream part of
 * the client is then initialized, the connection's I/O timeout is armed if it is not already, and the
 * request is started through on_connection_ready. The client's timeout timer must not be linked.
 */
void h2o_httpclient__h2_on_connect(h2o_httpclient_t *_client, h2o_socket_t *sock, h2o_url_t *origin)
{
    struct st_h2o_http2client_stream_t *stream = (struct st_h2o_http2client_stream_t *)_client;
    struct st_h2o_http2client_conn_t *conn;

    assert(!h2o_timer_is_linked(&stream->super._timeout));

    if ((conn = sock->data) == NULL) {
        /* first use of this socket: set up the connection and start talking HTTP/2 */
        sock->data = conn = create_connection(stream->super.ctx, sock, origin, stream->super.connpool);
        send_client_preface(conn, stream->super.ctx);
        h2o_socket_read_start(conn->super.sock, on_read);
    }

    setup_stream(stream);

    if (!h2o_timer_is_linked(&conn->io_timeout))
        h2o_timer_link(conn->super.ctx->loop, conn->super.ctx->io_timeout, &conn->io_timeout);

    on_connection_ready(stream, conn);
}
1279 
/* size in bytes of the HTTP/2 stream object, exported as a global constant; presumably used by code outside this file to
 * allocate client objects large enough to be reinterpreted as a stream -- TODO confirm against the callers */
const size_t h2o_httpclient__h2_size = sizeof(struct st_h2o_http2client_stream_t);
1281