1 /*
2  * Copyright (c) 2018 Fastly, Kazuho Oku
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to
6  * deal in the Software without restriction, including without limitation the
7  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8  * sell copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20  * IN THE SOFTWARE.
21  */
22 #include <sys/socket.h>
23 #include "khash.h"
24 #include "h2o/absprio.h"
25 #include "h2o/http3_common.h"
26 #include "h2o/http3_server.h"
27 #include "h2o/http3_internal.h"
28 #include "./../probes_.h"
29 
30 /**
31  * the scheduler
32  */
33 struct st_h2o_http3_req_scheduler_t {
34     struct {
35         struct {
36             h2o_linklist_t high;
37             h2o_linklist_t low;
38         } urgencies[H2O_ABSPRIO_NUM_URGENCY_LEVELS];
39         size_t smallest_urgency;
40     } active;
41     h2o_linklist_t conn_blocked;
42 };
43 
44 /**
45  *
46  */
47 struct st_h2o_http3_req_scheduler_node_t {
48     h2o_linklist_t link;
49     h2o_absprio_t priority;
50     uint64_t call_cnt;
51 };
52 
/**
 * Callback that decides the precedence of two entries belonging to the same urgency level (e.g., by comparing stream IDs). A
 * negative return value indicates that `x` precedes `y`.
 */
typedef int (*h2o_http3_req_scheduler_compare_cb)(struct st_h2o_http3_req_scheduler_t *sched,
                                                  const struct st_h2o_http3_req_scheduler_node_t *x,
                                                  const struct st_h2o_http3_req_scheduler_node_t *y);
59 
/**
 * Once the size of the request body being received exceeds this limit, streaming mode will be used (if possible), and the
 * concurrency of such requests will be limited to one per connection.
 */
#define H2O_HTTP3_REQUEST_BODY_MIN_BYTES_TO_BLOCK 10240
65 
/**
 * States of a request stream. The order of the enumerators doubles as the index into
 * st_h2o_http3_server_conn_t::num_streams.counters, and therefore MUST NOT be changed independently of that union.
 */
enum h2o_http3_server_stream_state {
    /** request headers are being received */
    H2O_HTTP3_SERVER_STREAM_STATE_RECV_HEADERS,
    /** request body is being received (runs concurrently) */
    H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_BEFORE_BLOCK,
    /** blocked; streams in this state are unblocked one by one (either in streaming mode or in non-streaming mode) */
    H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_BLOCKED,
    /** non-streaming mode: the body is being received */
    H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_UNBLOCKED,
    /** non-streaming mode: the request is waiting to be processed */
    H2O_HTTP3_SERVER_STREAM_STATE_REQ_PENDING,
    /** the request has been processed; waiting for the response headers */
    H2O_HTTP3_SERVER_STREAM_STATE_SEND_HEADERS,
    /** the body is being sent (the generator MAY have closed, but the transmission to the client is still ongoing) */
    H2O_HTTP3_SERVER_STREAM_STATE_SEND_BODY,
    /** all data has been sent and ACKed; waiting for the transport stream to close (req might be disposed on entering this state) */
    H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT
};
100 
101 struct st_h2o_http3_server_stream_t;
102 KHASH_MAP_INIT_INT64(stream, struct st_h2o_http3_server_stream_t *)
103 
104 struct st_h2o_http3_server_conn_t {
105     h2o_conn_t super;
106     h2o_http3_conn_t h3;
107     ptls_handshake_properties_t handshake_properties;
108     h2o_linklist_t _conns; /* linklist to h2o_context_t::http3._conns */
109     /**
110      * link-list of pending requests using st_h2o_http3_server_stream_t::link
111      */
112     struct {
113         /**
114          * holds streams in RECV_BODY_BLOCKED state. They are promoted one by one to the POST_BLOCK State.
115          */
116         h2o_linklist_t recv_body_blocked;
117         /**
118          * holds streams that are in request streaming mode.
119          */
120         h2o_linklist_t req_streaming;
121         /**
122          * holds streams in REQ_PENDING state or RECV_BODY_POST_BLOCK state (that is using streaming; i.e., write_req.cb != NULL).
123          */
124         h2o_linklist_t pending;
125     } delayed_streams;
126     /**
127      * next application-level timeout
128      */
129     h2o_timer_t timeout;
130     /**
131      * counter (the order MUST match that of h2o_http3_server_stream_state; it is accessed by index via the use of counters[])
132      */
133     union {
134         struct {
135             uint32_t recv_headers;
136             uint32_t recv_body_before_block;
137             uint32_t recv_body_blocked;
138             uint32_t recv_body_unblocked;
139             uint32_t req_pending;
140             uint32_t send_headers;
141             uint32_t send_body;
142             uint32_t close_wait;
143         };
144         uint32_t counters[1];
145     } num_streams;
146     /**
147      * Number of streams that is request streaming. The state can be in either one of SEND_HEADERS, SEND_BODY, CLOSE_WAIT.
148      */
149     uint32_t num_streams_req_streaming;
150     /**
151      * number of streams in tunneling mode
152      */
153     uint32_t num_streams_tunnelling;
154     /**
155      * scheduler
156      */
157     struct {
158         /**
159          * States for request streams.
160          */
161         struct st_h2o_http3_req_scheduler_t reqs;
162         /**
163          * States for unidirectional streams. Each element is a bit vector where slot for each stream is defined as: 1 << stream_id.
164          */
165         struct {
166             uint16_t active;
167             uint16_t conn_blocked;
168         } uni;
169     } scheduler;
170     /**
171      * stream map used for datagram flows
172      */
173     khash_t(stream) * datagram_flows;
174 };
175 
176 /**
177  * sendvec, with additional field that contains the starting offset of the content
178  */
179 struct st_h2o_http3_server_sendvec_t {
180     h2o_sendvec_t vec;
181     /**
182      * Starting offset of the content carried by the vector, or UINT64_MAX if it is not carrying body
183      */
184     uint64_t entity_offset;
185 };
186 
187 struct st_h2o_http3_server_stream_t {
188     quicly_stream_t *quic;
189     struct {
190         h2o_buffer_t *buf;
191         int (*handle_input)(struct st_h2o_http3_server_stream_t *stream, const uint8_t **src, const uint8_t *src_end,
192                             int in_generator, const char **err_desc);
193         uint64_t bytes_left_in_data_frame;
194     } recvbuf;
195     struct {
196         H2O_VECTOR(struct st_h2o_http3_server_sendvec_t) vecs;
197         size_t off_within_first_vec;
198         size_t min_index_to_addref;
199         uint64_t final_size, final_body_size;
200         uint8_t data_frame_header_buf[9];
201     } sendbuf;
202     enum h2o_http3_server_stream_state state;
203     h2o_linklist_t link;
204     h2o_ostream_t ostr_final;
205     struct st_h2o_http3_req_scheduler_node_t scheduler;
206     /**
207      * if read is blocked
208      */
209     uint8_t read_blocked : 1;
210     /**
211      * if h2o_proceed_response has been invoked, or if the invocation has been requested
212      */
213     uint8_t proceed_requested : 1;
214     /**
215      * this flag is set by on_send_emit, triggers the invocation h2o_proceed_response in scheduler_do_send, used by do_send to
216      * take different actions based on if it has been called while scheduler_do_send is running.
217      */
218     uint8_t proceed_while_sending : 1;
219     /**
220      * if a PRIORITY_UPDATE frame has been received
221      */
222     uint8_t received_priority_update : 1;
223     /**
224      * used in CLOSE_WAIT state to determine if h2o_dispose_request has been called
225      */
226     uint8_t req_disposed : 1;
227     /**
228      * buffer to hold the request body (or a chunk of, if in streaming mode), or CONNECT payload
229      */
230     h2o_buffer_t *req_body;
231     /**
232      * flow ID used by masque over H3_DATAGRAMS
233      */
234     uint64_t datagram_flow_id;
235     /**
236      * the request. Placed at the end, as it holds the pool.
237      */
238     h2o_req_t req;
239 };
240 
241 static void on_stream_destroy(quicly_stream_t *qs, int err);
242 static int handle_input_post_trailers(struct st_h2o_http3_server_stream_t *stream, const uint8_t **src, const uint8_t *src_end,
243                                       int in_generator, const char **err_desc);
244 static int handle_input_expect_data(struct st_h2o_http3_server_stream_t *stream, const uint8_t **src, const uint8_t *src_end,
245                                     int in_generator, const char **err_desc);
246 
req_scheduler_init(struct st_h2o_http3_req_scheduler_t * sched)247 static void req_scheduler_init(struct st_h2o_http3_req_scheduler_t *sched)
248 {
249     size_t i;
250 
251     for (i = 0; i < H2O_ABSPRIO_NUM_URGENCY_LEVELS; ++i) {
252         h2o_linklist_init_anchor(&sched->active.urgencies[i].high);
253         h2o_linklist_init_anchor(&sched->active.urgencies[i].low);
254     }
255     sched->active.smallest_urgency = i;
256     h2o_linklist_init_anchor(&sched->conn_blocked);
257 }
258 
/**
 * Registers `node` to the active list that corresponds to its priority, unlinking it beforehand from whichever list it has been
 * linked to.
 *
 * @param sched the scheduler
 * @param node  the node to be activated
 * @param comp  callback used for determining the position of the node within the `high` list
 */
static void req_scheduler_activate(struct st_h2o_http3_req_scheduler_t *sched, struct st_h2o_http3_req_scheduler_node_t *node,
                                   h2o_http3_req_scheduler_compare_cb comp)
{
    size_t urgency = node->priority.urgency;

    /* detach from the current list, if any */
    if (h2o_linklist_is_linked(&node->link))
        h2o_linklist_unlink(&node->link);

    if (node->priority.incremental && node->call_cnt != 0) {
        /* incremental streams that have been sent at least once go into the lower list */
        h2o_linklist_insert(&sched->active.urgencies[urgency].low, &node->link);
    } else {
        /* Non-incremental streams and the first emission of incremental streams go in strict order. Walk backwards from the
         * tail until an entry that precedes `node` is found (or until the anchor is reached). */
        h2o_linklist_t *anchor = &sched->active.urgencies[urgency].high, *cursor = anchor->prev;
        while (cursor != anchor) {
            struct st_h2o_http3_req_scheduler_node_t *existing =
                H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_req_scheduler_node_t, link, cursor);
            if (comp(sched, existing, node) < 0)
                break;
            cursor = cursor->prev;
        }
        h2o_linklist_insert(cursor->next, &node->link);
    }

    /* remember the smallest urgency level in use */
    if (urgency < sched->active.smallest_urgency)
        sched->active.smallest_urgency = urgency;
}
285 
req_scheduler_update_smallest_urgency_post_removal(struct st_h2o_http3_req_scheduler_t * sched,size_t changed)286 static void req_scheduler_update_smallest_urgency_post_removal(struct st_h2o_http3_req_scheduler_t *sched, size_t changed)
287 {
288     if (sched->active.smallest_urgency < changed)
289         return;
290 
291     /* search from the location that *might* have changed */
292     sched->active.smallest_urgency = changed;
293     while (h2o_linklist_is_empty(&sched->active.urgencies[sched->active.smallest_urgency].high) &&
294            h2o_linklist_is_empty(&sched->active.urgencies[sched->active.smallest_urgency].low)) {
295         ++sched->active.smallest_urgency;
296         if (sched->active.smallest_urgency >= H2O_ABSPRIO_NUM_URGENCY_LEVELS)
297             break;
298     }
299 }
300 
req_scheduler_deactivate(struct st_h2o_http3_req_scheduler_t * sched,struct st_h2o_http3_req_scheduler_node_t * node)301 static void req_scheduler_deactivate(struct st_h2o_http3_req_scheduler_t *sched, struct st_h2o_http3_req_scheduler_node_t *node)
302 {
303     if (h2o_linklist_is_linked(&node->link))
304         h2o_linklist_unlink(&node->link);
305 
306     req_scheduler_update_smallest_urgency_post_removal(sched, node->priority.urgency);
307 }
308 
/**
 * Prepares an active node for the next round. Incremental nodes are re-registered through `req_scheduler_activate` so that they
 * are served in round-robin; non-incremental nodes stay where they are. The node MUST be linked.
 */
static void req_scheduler_setup_for_next(struct st_h2o_http3_req_scheduler_t *sched, struct st_h2o_http3_req_scheduler_node_t *node,
                                         h2o_http3_req_scheduler_compare_cb comp)
{
    assert(h2o_linklist_is_linked(&node->link));

    if (!node->priority.incremental)
        return;

    /* reschedule to achieve round-robin behavior */
    req_scheduler_activate(sched, node, comp);
}
318 
req_scheduler_conn_blocked(struct st_h2o_http3_req_scheduler_t * sched,struct st_h2o_http3_req_scheduler_node_t * node)319 static void req_scheduler_conn_blocked(struct st_h2o_http3_req_scheduler_t *sched, struct st_h2o_http3_req_scheduler_node_t *node)
320 {
321     if (h2o_linklist_is_linked(&node->link))
322         h2o_linklist_unlink(&node->link);
323 
324     h2o_linklist_insert(&sched->conn_blocked, &node->link);
325 
326     req_scheduler_update_smallest_urgency_post_removal(sched, node->priority.urgency);
327 }
328 
/**
 * Reactivates all the nodes found in the conn-blocked list. Each call to `req_scheduler_activate` unlinks the node at the head of
 * the list, hence the loop terminates once the list becomes empty.
 */
static void req_scheduler_unblock_conn_blocked(struct st_h2o_http3_req_scheduler_t *sched, h2o_http3_req_scheduler_compare_cb comp)
{
    h2o_linklist_t *anchor = &sched->conn_blocked;

    while (!h2o_linklist_is_empty(anchor)) {
        h2o_linklist_t *head = anchor->next;
        req_scheduler_activate(sched, H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_req_scheduler_node_t, link, head), comp);
    }
}
337 
req_scheduler_compare_stream_id(struct st_h2o_http3_req_scheduler_t * sched,const struct st_h2o_http3_req_scheduler_node_t * x,const struct st_h2o_http3_req_scheduler_node_t * y)338 static int req_scheduler_compare_stream_id(struct st_h2o_http3_req_scheduler_t *sched,
339                                            const struct st_h2o_http3_req_scheduler_node_t *x,
340                                            const struct st_h2o_http3_req_scheduler_node_t *y)
341 {
342     struct st_h2o_http3_server_stream_t *sx = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, scheduler, x),
343                                         *sy = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, scheduler, y);
344     if (sx->quic->stream_id < sy->quic->stream_id) {
345         return -1;
346     } else if (sx->quic->stream_id > sy->quic->stream_id) {
347         return 1;
348     } else {
349         return 0;
350     }
351 }
352 
get_conn(struct st_h2o_http3_server_stream_t * stream)353 static struct st_h2o_http3_server_conn_t *get_conn(struct st_h2o_http3_server_stream_t *stream)
354 {
355     return (void *)stream->req.conn;
356 }
357 
get_state_counter(struct st_h2o_http3_server_conn_t * conn,enum h2o_http3_server_stream_state state)358 static uint32_t *get_state_counter(struct st_h2o_http3_server_conn_t *conn, enum h2o_http3_server_stream_state state)
359 {
360     return conn->num_streams.counters + (size_t)state;
361 }
362 
/**
 * Forwards the given datagrams to the peer as H3 datagrams, using the datagram flow ID of the stream that owns `_req`.
 */
static void tunnel_on_udp_read(h2o_req_t *_req, h2o_iovec_t *datagrams, size_t num_datagrams)
{
    struct st_h2o_http3_server_stream_t *stream = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, req, _req);
    struct st_h2o_http3_server_conn_t *conn = get_conn(stream);

    h2o_http3_send_h3_datagrams(&conn->h3, stream->datagram_flow_id, datagrams, num_datagrams);
}
368 
request_run_delayed(struct st_h2o_http3_server_conn_t * conn)369 static void request_run_delayed(struct st_h2o_http3_server_conn_t *conn)
370 {
371     if (!h2o_timer_is_linked(&conn->timeout))
372         h2o_timer_link(conn->super.ctx->loop, 0, &conn->timeout);
373 }
374 
check_run_blocked(struct st_h2o_http3_server_conn_t * conn)375 static void check_run_blocked(struct st_h2o_http3_server_conn_t *conn)
376 {
377     if (conn->num_streams.recv_body_unblocked + conn->num_streams_req_streaming == 0 &&
378         !h2o_linklist_is_empty(&conn->delayed_streams.recv_body_blocked))
379         request_run_delayed(conn);
380 }
381 
pre_dispose_request(struct st_h2o_http3_server_stream_t * stream)382 static void pre_dispose_request(struct st_h2o_http3_server_stream_t *stream)
383 {
384     struct st_h2o_http3_server_conn_t *conn = get_conn(stream);
385     size_t i;
386 
387     /* release vectors */
388     for (i = 0; i != stream->sendbuf.vecs.size; ++i) {
389         struct st_h2o_http3_server_sendvec_t *vec = stream->sendbuf.vecs.entries + i;
390         if (vec->vec.callbacks->update_refcnt != NULL)
391             vec->vec.callbacks->update_refcnt(&vec->vec, &stream->req, 0);
392     }
393 
394     /* dispose request body buffer */
395     if (stream->req_body != NULL)
396         h2o_buffer_dispose(&stream->req_body);
397 
398     /* clean up request streaming */
399     if (stream->req.write_req.cb != NULL && !stream->req.is_tunnel_req) {
400         assert(conn->num_streams_req_streaming != 0);
401         --conn->num_streams_req_streaming;
402         check_run_blocked(conn);
403     }
404 
405     /* remove stream from datagram flow list */
406     if (stream->datagram_flow_id != UINT64_MAX) {
407         khiter_t iter = kh_get(stream, conn->datagram_flows, stream->datagram_flow_id);
408         /* it's possible the tunnel wasn't established yet */
409         if (iter != kh_end(conn->datagram_flows))
410             kh_del(stream, conn->datagram_flows, iter);
411     }
412 
413     if (stream->req.is_tunnel_req)
414         --get_conn(stream)->num_streams_tunnelling;
415 }
416 
set_state(struct st_h2o_http3_server_stream_t * stream,enum h2o_http3_server_stream_state state,int in_generator)417 static void set_state(struct st_h2o_http3_server_stream_t *stream, enum h2o_http3_server_stream_state state, int in_generator)
418 {
419     struct st_h2o_http3_server_conn_t *conn = get_conn(stream);
420     enum h2o_http3_server_stream_state old_state = stream->state;
421 
422     H2O_PROBE_CONN(H3S_STREAM_SET_STATE, &conn->super, stream->quic->stream_id, (unsigned)state);
423 
424     --*get_state_counter(conn, old_state);
425     stream->state = state;
426     ++*get_state_counter(conn, stream->state);
427 
428     switch (state) {
429     case H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_BLOCKED:
430         assert(conn->delayed_streams.recv_body_blocked.prev == &stream->link || !"stream is not registered to the recv_body list?");
431         break;
432     case H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT: {
433         if (h2o_linklist_is_linked(&stream->link))
434             h2o_linklist_unlink(&stream->link);
435         pre_dispose_request(stream);
436         if (!in_generator) {
437             h2o_dispose_request(&stream->req);
438             stream->req_disposed = 1;
439         }
440         static const quicly_stream_callbacks_t close_wait_callbacks = {on_stream_destroy,
441                                                                        quicly_stream_noop_on_send_shift,
442                                                                        quicly_stream_noop_on_send_emit,
443                                                                        quicly_stream_noop_on_send_stop,
444                                                                        quicly_stream_noop_on_receive,
445                                                                        quicly_stream_noop_on_receive_reset};
446         stream->quic->callbacks = &close_wait_callbacks;
447     } break;
448     default:
449         break;
450     }
451 }
452 
453 /**
454  * Shutdowns a stream. Note that a request stream should not be shut down until receiving some QUIC frame that refers to that
455  * stream, but we might might have created stream state due to receiving a PRIORITY_UPDATE frame prior to that (see
456  * handle_priority_update_frame).
457  */
shutdown_stream(struct st_h2o_http3_server_stream_t * stream,int stop_sending_code,int reset_code,int in_generator)458 static void shutdown_stream(struct st_h2o_http3_server_stream_t *stream, int stop_sending_code, int reset_code, int in_generator)
459 {
460     assert(stream->state < H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT);
461     if (quicly_stream_has_receive_side(0, stream->quic->stream_id)) {
462         quicly_request_stop(stream->quic, stop_sending_code);
463         h2o_buffer_consume(&stream->recvbuf.buf, stream->recvbuf.buf->size);
464     }
465     if (quicly_stream_has_send_side(0, stream->quic->stream_id) && !quicly_sendstate_transfer_complete(&stream->quic->sendstate))
466         quicly_reset_stream(stream->quic, reset_code);
467     set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT, in_generator);
468 }
469 
/**
 * Copies the local address of the QUIC connection to `sa` and returns the number of bytes copied. When the address family is
 * AF_UNSPEC, `sizeof(struct sockaddr)` bytes are copied.
 */
static socklen_t get_sockname(h2o_conn_t *_conn, struct sockaddr *sa)
{
    struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)_conn;
    struct sockaddr *local = quicly_get_sockname(conn->h3.super.quic);
    socklen_t len;

    if (local->sa_family == AF_UNSPEC) {
        len = sizeof(struct sockaddr);
    } else {
        len = quicly_get_socklen(local);
    }
    memcpy(sa, local, len);

    return len;
}
478 
/**
 * Copies the address of the peer to `sa` and returns the number of bytes copied.
 */
static socklen_t get_peername(h2o_conn_t *_conn, struct sockaddr *sa)
{
    struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)_conn;
    struct sockaddr *remote = quicly_get_peername(conn->h3.super.quic);
    socklen_t remote_len = quicly_get_socklen(remote);

    memcpy(sa, remote, remote_len);

    return remote_len;
}
487 
get_ptls(h2o_conn_t * _conn)488 static ptls_t *get_ptls(h2o_conn_t *_conn)
489 {
490     struct st_h2o_http3_server_conn_t *conn = (void *)_conn;
491     return quicly_get_tls(conn->h3.super.quic);
492 }
493 
/**
 * Returns the value of `ptls_skip_tracing` for the TLS object of the connection.
 */
static int get_skip_tracing(h2o_conn_t *conn)
{
    return ptls_skip_tracing(get_ptls(conn));
}
499 
/**
 * Returns the number of streams reported by `quicly_num_streams_by_group` for group (0, 0).
 * NOTE(review): presumably this is the group of the bidirectional streams initiated by the client - confirm against quicly.
 */
static uint32_t num_reqs_inflight(h2o_conn_t *_conn)
{
    struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)_conn;
    quicly_conn_t *quic = conn->h3.super.quic;

    return quicly_num_streams_by_group(quic, 0, 0);
}
505 
get_tracer(h2o_conn_t * _conn)506 static quicly_tracer_t *get_tracer(h2o_conn_t *_conn)
507 {
508     struct st_h2o_http3_server_conn_t *conn = (void *)_conn;
509     return quicly_get_tracer(conn->h3.super.quic);
510 }
511 
log_cc_name(h2o_req_t * req)512 static h2o_iovec_t log_cc_name(h2o_req_t *req)
513 {
514     struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)req->conn;
515     quicly_stats_t stats;
516 
517     if (quicly_get_stats(conn->h3.super.quic, &stats) == 0)
518         return h2o_iovec_init(stats.cc.type->name, strlen(stats.cc.type->name));
519     return h2o_iovec_init(NULL, 0);
520 }
521 
log_delivery_rate(h2o_req_t * req)522 static h2o_iovec_t log_delivery_rate(h2o_req_t *req)
523 {
524     struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)req->conn;
525     quicly_rate_t rate;
526 
527     if (quicly_get_delivery_rate(conn->h3.super.quic, &rate) == 0 && rate.latest != 0) {
528         char *buf = h2o_mem_alloc_pool(&req->pool, char, sizeof(H2O_UINT64_LONGEST_STR));
529         size_t len = sprintf(buf, "%" PRIu64, rate.latest);
530         return h2o_iovec_init(buf, len);
531     }
532 
533     return h2o_iovec_init(NULL, 0);
534 }
535 
log_tls_protocol_version(h2o_req_t * _req)536 static h2o_iovec_t log_tls_protocol_version(h2o_req_t *_req)
537 {
538     return h2o_iovec_init(H2O_STRLIT("TLSv1.3"));
539 }
540 
log_session_reused(h2o_req_t * req)541 static h2o_iovec_t log_session_reused(h2o_req_t *req)
542 {
543     struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)req->conn;
544     ptls_t *tls = quicly_get_tls(conn->h3.super.quic);
545     return ptls_is_psk_handshake(tls) ? h2o_iovec_init(H2O_STRLIT("1")) : h2o_iovec_init(H2O_STRLIT("0"));
546 }
547 
log_cipher(h2o_req_t * req)548 static h2o_iovec_t log_cipher(h2o_req_t *req)
549 {
550     struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)req->conn;
551     ptls_t *tls = quicly_get_tls(conn->h3.super.quic);
552     ptls_cipher_suite_t *cipher = ptls_get_cipher(tls);
553     return cipher != NULL ? h2o_iovec_init(cipher->aead->name, strlen(cipher->aead->name)) : h2o_iovec_init(NULL, 0);
554 }
555 
log_cipher_bits(h2o_req_t * req)556 static h2o_iovec_t log_cipher_bits(h2o_req_t *req)
557 {
558     struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)req->conn;
559     ptls_t *tls = quicly_get_tls(conn->h3.super.quic);
560     ptls_cipher_suite_t *cipher = ptls_get_cipher(tls);
561     if (cipher == NULL)
562         return h2o_iovec_init(NULL, 0);
563 
564     char *buf = h2o_mem_alloc_pool(&req->pool, char, sizeof(H2O_UINT16_LONGEST_STR));
565     return h2o_iovec_init(buf, sprintf(buf, "%" PRIu16, (uint16_t)(cipher->aead->key_size * 8)));
566 }
567 
log_session_id(h2o_req_t * _req)568 static h2o_iovec_t log_session_id(h2o_req_t *_req)
569 {
570     /* FIXME */
571     return h2o_iovec_init(NULL, 0);
572 }
573 
log_server_name(h2o_req_t * req)574 static h2o_iovec_t log_server_name(h2o_req_t *req)
575 {
576     struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)req->conn;
577     ptls_t *tls = quicly_get_tls(conn->h3.super.quic);
578     const char *server_name = ptls_get_server_name(tls);
579     return server_name != NULL ? h2o_iovec_init(server_name, strlen(server_name)) : h2o_iovec_init(NULL, 0);
580 }
581 
log_negotiated_protocol(h2o_req_t * req)582 static h2o_iovec_t log_negotiated_protocol(h2o_req_t *req)
583 {
584     struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)req->conn;
585     ptls_t *tls = quicly_get_tls(conn->h3.super.quic);
586     const char *proto = ptls_get_negotiated_protocol(tls);
587     return proto != NULL ? h2o_iovec_init(proto, strlen(proto)) : h2o_iovec_init(NULL, 0);
588 }
589 
log_stream_id(h2o_req_t * _req)590 static h2o_iovec_t log_stream_id(h2o_req_t *_req)
591 {
592     struct st_h2o_http3_server_stream_t *stream = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, req, _req);
593     char *buf = h2o_mem_alloc_pool(&stream->req.pool, char, sizeof(H2O_UINT64_LONGEST_STR));
594     return h2o_iovec_init(buf, sprintf(buf, "%" PRIu64, stream->quic->stream_id));
595 }
596 
log_quic_stats(h2o_req_t * req)597 static h2o_iovec_t log_quic_stats(h2o_req_t *req)
598 {
599 #define APPLY_NUM_FRAMES(f, dir)                                                                                                   \
600     f(padding, dir) f(ping, dir) f(ack, dir) f(reset_stream, dir) f(stop_sending, dir) f(crypto, dir) f(new_token, dir)            \
601         f(stream, dir) f(max_data, dir) f(max_stream_data, dir) f(max_streams_bidi, dir) f(max_streams_uni, dir)                   \
602             f(data_blocked, dir) f(stream_data_blocked, dir) f(streams_blocked, dir) f(new_connection_id, dir)                     \
603                 f(retire_connection_id, dir) f(path_challenge, dir) f(path_response, dir) f(transport_close, dir)                  \
604                     f(application_close, dir) f(handshake_done, dir) f(ack_frequency, dir)
605 #define FORMAT_OF_NUM_FRAMES(n, dir) "," H2O_TO_STR(n) "-" H2O_TO_STR(dir) "=%" PRIu64
606 #define VALUE_OF_NUM_FRAMES(n, dir) , stats.num_frames_##dir.n
607 
608     struct st_h2o_http3_server_conn_t *conn = (struct st_h2o_http3_server_conn_t *)req->conn;
609     quicly_stats_t stats;
610 
611     if (quicly_get_stats(conn->h3.super.quic, &stats) != 0)
612         return h2o_iovec_init(H2O_STRLIT("-"));
613 
614     char *buf;
615     size_t len, bufsize = 1400;
616 Redo:
617     buf = h2o_mem_alloc_pool(&req->pool, char, bufsize);
618     len = snprintf(
619         buf, bufsize,
620         "packets-received=%" PRIu64 ",packets-decryption-failed=%" PRIu64 ",packets-sent=%" PRIu64 ",packets-lost=%" PRIu64
621         ",packets-lost-time-threshold=%" PRIu64 ",packets-ack-received=%" PRIu64 ",late-acked=%" PRIu64 ",bytes-received=%" PRIu64
622         ",bytes-sent=%" PRIu64 ",bytes-lost=%" PRIu64 ",bytes-ack-received=%" PRIu64 ",bytes-stream-data-sent=%" PRIu64
623         ",bytes-stream-data-resent=%" PRIu64 ",rtt-minimum=%" PRIu32 ",rtt-smoothed=%" PRIu32 ",rtt-variance=%" PRIu32
624         ",rtt-latest=%" PRIu32 ",cwnd=%" PRIu32 ",ssthresh=%" PRIu32 ",cwnd-initial=%" PRIu32 ",cwnd-exiting-slow-start=%" PRIu32
625         ",cwnd-minimum=%" PRIu32 ",cwnd-maximum=%" PRIu32 ",num-loss-episodes=%" PRIu32 ",num-ptos=%" PRIu64
626         ",delivery-rate-latest=%" PRIu64 ",delivery-rate-smoothed=%" PRIu64
627         ",delivery-rate-stdev=%" PRIu64 APPLY_NUM_FRAMES(FORMAT_OF_NUM_FRAMES, received)
628             APPLY_NUM_FRAMES(FORMAT_OF_NUM_FRAMES, sent),
629         stats.num_packets.received, stats.num_packets.decryption_failed, stats.num_packets.sent, stats.num_packets.lost,
630         stats.num_packets.lost_time_threshold, stats.num_packets.ack_received, stats.num_packets.late_acked,
631         stats.num_bytes.received, stats.num_bytes.sent, stats.num_bytes.lost, stats.num_bytes.ack_received,
632         stats.num_bytes.stream_data_sent, stats.num_bytes.stream_data_resent, stats.rtt.minimum, stats.rtt.smoothed,
633         stats.rtt.variance, stats.rtt.latest, stats.cc.cwnd, stats.cc.ssthresh, stats.cc.cwnd_initial,
634         stats.cc.cwnd_exiting_slow_start, stats.cc.cwnd_minimum, stats.cc.cwnd_maximum, stats.cc.num_loss_episodes, stats.num_ptos,
635         stats.delivery_rate.latest, stats.delivery_rate.smoothed,
636         stats.delivery_rate.stdev APPLY_NUM_FRAMES(VALUE_OF_NUM_FRAMES, received) APPLY_NUM_FRAMES(VALUE_OF_NUM_FRAMES, sent));
637     if (len + 1 > bufsize) {
638         bufsize = len + 1;
639         goto Redo;
640     }
641 
642     return h2o_iovec_init(buf, len);
643 
644 #undef APPLY_NUM_FRAMES
645 #undef FORMAT_OF_NUM_FRAMES
646 #undef VALUE_OF_NUM_FRAMES
647 }
648 
log_quic_version(h2o_req_t * _req)649 static h2o_iovec_t log_quic_version(h2o_req_t *_req)
650 {
651     struct st_h2o_http3_server_stream_t *stream = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, req, _req);
652     char *buf = h2o_mem_alloc_pool(&stream->req.pool, char, sizeof(H2O_UINT32_LONGEST_STR));
653     return h2o_iovec_init(buf, sprintf(buf, "%" PRIu32, quicly_get_protocol_version(stream->quic->conn)));
654 }
655 
/**
 * Callback invoked when the QUIC stream is being destroyed. Detaches the stream from every structure of the connection, disposes
 * the request unless that has already been done, and frees the stream. (The function has internal linkage thanks to the `static`
 * forward declaration.)
 */
void on_stream_destroy(quicly_stream_t *qs, int err)
{
    struct st_h2o_http3_server_stream_t *stream = qs->data;
    struct st_h2o_http3_server_conn_t *conn = get_conn(stream);

    /* the stream no longer counts as being in its current state */
    *get_state_counter(conn, stream->state) -= 1;

    /* detach from the scheduler and from the delayed list */
    req_scheduler_deactivate(&conn->scheduler.reqs, &stream->scheduler);
    if (h2o_linklist_is_linked(&stream->link))
        h2o_linklist_unlink(&stream->link);

    /* pre-disposal has already been done by `set_state` if the stream is in CLOSE_WAIT */
    if (stream->state != H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT)
        pre_dispose_request(stream);
    if (!stream->req_disposed)
        h2o_dispose_request(&stream->req);

    /* in case the stream is destroyed before the buffer is fully consumed */
    h2o_buffer_dispose(&stream->recvbuf.buf);

    free(stream);
}
676 
/**
 * `update_refcnt` callback of the vectors that hold a heap-allocated copy of the payload (see the callbacks table in
 * `retain_sendvecs`). Only decrements are expected; the copy is freed when one arrives.
 */
static void allocated_vec_update_refcnt(h2o_sendvec_t *vec, h2o_req_t *req, int is_incr)
{
    /* an increment is never requested for these vectors */
    assert(is_incr == 0);

    free(vec->raw);
}
682 
retain_sendvecs(struct st_h2o_http3_server_stream_t * stream)683 static int retain_sendvecs(struct st_h2o_http3_server_stream_t *stream)
684 {
685     for (; stream->sendbuf.min_index_to_addref != stream->sendbuf.vecs.size; ++stream->sendbuf.min_index_to_addref) {
686         struct st_h2o_http3_server_sendvec_t *vec = stream->sendbuf.vecs.entries + stream->sendbuf.min_index_to_addref;
687         /* create a copy if it does not provide update_refcnt (update_refcnt is already called in do_send, if available) */
688         if (vec->vec.callbacks->update_refcnt == NULL) {
689             static const h2o_sendvec_callbacks_t vec_callbacks = {h2o_sendvec_flatten_raw, allocated_vec_update_refcnt};
690             size_t off_within_vec = stream->sendbuf.min_index_to_addref == 0 ? stream->sendbuf.off_within_first_vec : 0;
691             h2o_iovec_t copy = h2o_iovec_init(h2o_mem_alloc(vec->vec.len - off_within_vec), vec->vec.len - off_within_vec);
692             if (!(*vec->vec.callbacks->flatten)(&vec->vec, &stream->req, copy, off_within_vec)) {
693                 free(copy.base);
694                 return 0;
695             }
696             vec->vec = (h2o_sendvec_t){&vec_callbacks, copy.len, {copy.base}};
697             if (stream->sendbuf.min_index_to_addref == 0)
698                 stream->sendbuf.off_within_first_vec = 0;
699         }
700     }
701 
702     return 1;
703 }
704 
/**
 * Discards the first `delta` bytes of the send buffer (`stream->sendbuf.vecs`). Vectors that become fully consumed are removed
 * from the array, after invoking their `update_refcnt` callback (if any) with `is_incr` set to zero. When the buffer becomes empty
 * and the send state is no longer open, the stream transitions to CLOSE_WAIT.
 */
static void on_send_shift(quicly_stream_t *qs, size_t delta)
{
    struct st_h2o_http3_server_stream_t *stream = qs->data;
    size_t i;

    assert(stream->state == H2O_HTTP3_SERVER_STREAM_STATE_SEND_HEADERS || stream->state == H2O_HTTP3_SERVER_STREAM_STATE_SEND_BODY);
    assert(delta != 0);
    assert(stream->sendbuf.vecs.size != 0);

    /* fast path: the shift ends within the first vector, only the offset needs to be advanced */
    size_t bytes_avail_in_first_vec = stream->sendbuf.vecs.entries[0].vec.len - stream->sendbuf.off_within_first_vec;
    if (delta < bytes_avail_in_first_vec) {
        stream->sendbuf.off_within_first_vec += delta;
        return;
    }
    /* the first vector is fully consumed */
    delta -= bytes_avail_in_first_vec;
    stream->sendbuf.off_within_first_vec = 0;
    if (stream->sendbuf.vecs.entries[0].vec.callbacks->update_refcnt != NULL)
        stream->sendbuf.vecs.entries[0].vec.callbacks->update_refcnt(&stream->sendbuf.vecs.entries[0].vec, &stream->req, 0);

    /* consume the succeeding vectors; upon leaving the loop, `i` is the number of vectors that have been fully consumed */
    for (i = 1; delta != 0; ++i) {
        assert(i < stream->sendbuf.vecs.size);
        if (delta < stream->sendbuf.vecs.entries[i].vec.len) {
            /* the shift ends in the middle of this vector, which becomes the new first vector */
            stream->sendbuf.off_within_first_vec = delta;
            break;
        }
        delta -= stream->sendbuf.vecs.entries[i].vec.len;
        if (stream->sendbuf.vecs.entries[i].vec.callbacks->update_refcnt != NULL)
            stream->sendbuf.vecs.entries[i].vec.callbacks->update_refcnt(&stream->sendbuf.vecs.entries[i].vec, &stream->req, 0);
    }
    /* move the remaining vectors to the front of the array, and adjust the index (clamped at zero) accordingly */
    memmove(stream->sendbuf.vecs.entries, stream->sendbuf.vecs.entries + i,
            (stream->sendbuf.vecs.size - i) * sizeof(stream->sendbuf.vecs.entries[0]));
    stream->sendbuf.vecs.size -= i;
    if (stream->sendbuf.min_index_to_addref <= i) {
        stream->sendbuf.min_index_to_addref = 0;
    } else {
        stream->sendbuf.min_index_to_addref -= i;
    }

    /* everything buffered has been shifted out */
    if (stream->sendbuf.vecs.size == 0) {
        if (quicly_sendstate_is_open(&stream->quic->sendstate)) {
            /* more data is expected to be supplied; either the headers are being sent or proceed has been requested */
            assert(stream->state == H2O_HTTP3_SERVER_STREAM_STATE_SEND_HEADERS || stream->proceed_requested);
        } else {
            /* send state is not open anymore; request the peer to stop sending (if the stream has a receive side) and wait for
             * the stream to close */
            if (quicly_stream_has_receive_side(0, stream->quic->stream_id))
                quicly_request_stop(stream->quic, H2O_HTTP3_ERROR_EARLY_RESPONSE);
            set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT, 0);
        }
    }
}
753 
/**
 * Writes bytes of the send buffer to `_dst`, starting at `off` bytes past the current head of the buffer. Upon entry, `*len` is
 * the capacity of `_dst`; upon return, it is the number of bytes written. `*wrote_all` is set to 1 when the last buffered vector
 * has been written to its end. If flattening a vector or retaining the vectors fails, nothing is emitted (`*len` is set to zero,
 * `*wrote_all` to 1) and `shutdown_stream` is called.
 */
static void on_send_emit(quicly_stream_t *qs, size_t off, void *_dst, size_t *len, int *wrote_all)
{
    struct st_h2o_http3_server_stream_t *stream = qs->data;

    assert(stream->state == H2O_HTTP3_SERVER_STREAM_STATE_SEND_HEADERS || stream->state == H2O_HTTP3_SERVER_STREAM_STATE_SEND_BODY);

    uint8_t *dst = _dst, *dst_end = dst + *len;
    size_t vec_index = 0;

    /* find the start position identified by vec_index and off */
    off += stream->sendbuf.off_within_first_vec;
    while (off != 0) {
        assert(vec_index < stream->sendbuf.vecs.size);
        if (off < stream->sendbuf.vecs.entries[vec_index].vec.len)
            break;
        off -= stream->sendbuf.vecs.entries[vec_index].vec.len;
        ++vec_index;
    }
    assert(vec_index < stream->sendbuf.vecs.size);

    /* write */
    *wrote_all = 0;
    do {
        struct st_h2o_http3_server_sendvec_t *this_vec = stream->sendbuf.vecs.entries + vec_index;
        /* number of bytes to write from this vector: the rest of the vector, capped by the space left in dst */
        size_t sz = this_vec->vec.len - off;
        if (dst_end - dst < sz)
            sz = dst_end - dst;
        if (!(this_vec->vec.callbacks->flatten)(&this_vec->vec, &stream->req, h2o_iovec_init(dst, sz), off))
            goto Error;
        /* for vectors that carry an entity offset (i.e., not UINT64_MAX), advance `bytes_sent` to the furthest position emitted */
        if (this_vec->entity_offset != UINT64_MAX && stream->req.bytes_sent < this_vec->entity_offset + off + sz)
            stream->req.bytes_sent = this_vec->entity_offset + off + sz;
        dst += sz;
        off += sz;
        /* when reaching the end of the current vector, update vec_index, wrote_all */
        if (off == this_vec->vec.len) {
            off = 0;
            ++vec_index;
            if (vec_index == stream->sendbuf.vecs.size) {
                *wrote_all = 1;
                break;
            }
        }
    } while (dst != dst_end);

    *len = dst - (uint8_t *)_dst;

    /* retain the payload of response body before calling `h2o_proceed_request`, as the generator might discard the buffer */
    if (stream->state == H2O_HTTP3_SERVER_STREAM_STATE_SEND_BODY && *wrote_all &&
        quicly_sendstate_is_open(&stream->quic->sendstate) && !stream->proceed_requested) {
        if (!retain_sendvecs(stream))
            goto Error;
        stream->proceed_requested = 1;
        stream->proceed_while_sending = 1;
    }

    return;
Error:
    /* report that nothing has been written and that there is nothing more to send, then shut down the stream */
    *len = 0;
    *wrote_all = 1;
    shutdown_stream(stream, H2O_HTTP3_ERROR_EARLY_RESPONSE, H2O_HTTP3_ERROR_INTERNAL, 0);
}
815 
/**
 * Shuts down the HTTP/3 stream attached to `qs`, passing H2O_HTTP3_ERROR_REQUEST_CANCELLED and the supplied `err` to
 * `shutdown_stream`.
 */
static void on_send_stop(quicly_stream_t *qs, int err)
{
    shutdown_stream((struct st_h2o_http3_server_stream_t *)qs->data, H2O_HTTP3_ERROR_REQUEST_CANCELLED, err, 0);
}
822 
/**
 * Feeds the contiguous bytes in the receive buffer to `stream->recvbuf.handle_input`, and then acts on the outcome:
 *  - if the receive side is complete, the request is either queued for processing (or for the final `write_req` invocation when
 *    in streaming mode), or the stream is shut down when the request turns out to be incomplete or inconsistent;
 *  - otherwise, the stream is either switched to the blocked state (when the buffered request body has become large) or, in
 *    streaming mode, queued so that the buffered body gets passed to `write_req`.
 * `in_generator` is passed through to the input handler, `set_state`, and `shutdown_stream`. Does nothing if the stream has
 * already reached CLOSE_WAIT.
 */
static void handle_buffered_input(struct st_h2o_http3_server_stream_t *stream, int in_generator)
{
    struct st_h2o_http3_server_conn_t *conn = get_conn(stream);

    if (stream->state >= H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT)
        return;

    { /* Process contiguous bytes in the receive buffer until one of the following conditions are reached:
       * a) connection- or stream-level error (i.e., state advanced to CLOSE_WAIT) is detected - in which case we exit,
       * b) incomplete frame is detected - wait for more (if the stream is open) or raise a connection error, or
       * c) all bytes are processed - exit the loop. */
        size_t bytes_available = quicly_recvstate_bytes_available(&stream->quic->recvstate);
        assert(bytes_available <= stream->recvbuf.buf->size);
        const uint8_t *src = (const uint8_t *)stream->recvbuf.buf->bytes, *src_end = src + bytes_available;
        while (src != src_end) {
            int err;
            const char *err_desc = NULL;
            if ((err = stream->recvbuf.handle_input(stream, &src, src_end, in_generator, &err_desc)) != 0) {
                if (err == H2O_HTTP3_ERROR_INCOMPLETE) {
                    /* a partial frame is fine as long as more bytes can arrive */
                    if (!quicly_recvstate_transfer_complete(&stream->quic->recvstate))
                        break;
                    err = H2O_HTTP3_ERROR_GENERAL_PROTOCOL;
                    err_desc = "incomplete frame";
                }
                h2o_quic_close_connection(&conn->h3.super, err, err_desc);
                return;
            } else if (stream->state >= H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT) {
                return;
            }
        }
        /* Processed zero or more bytes without noticing an error; shift the bytes that have been processed as frames. */
        size_t bytes_consumed = src - (const uint8_t *)stream->recvbuf.buf->bytes;
        h2o_buffer_consume(&stream->recvbuf.buf, bytes_consumed);
        quicly_stream_sync_recvbuf(stream->quic, bytes_consumed);
    }

    if (quicly_recvstate_transfer_complete(&stream->quic->recvstate)) {
        /* the request is complete only if nothing is left unparsed and the parser is at a frame boundary of the body or the
         * trailers */
        if (stream->recvbuf.buf->size == 0 && (stream->recvbuf.handle_input == handle_input_expect_data ||
                                               stream->recvbuf.handle_input == handle_input_post_trailers)) {
            /* have complete request, advance the state and process the request */
            if (stream->req.content_length != SIZE_MAX && stream->req.content_length != stream->req.req_body_bytes_received) {
                /* the request terminated abruptly; reset the stream as we do for HTTP/2 */
                shutdown_stream(stream, H2O_HTTP3_ERROR_NONE /* ignored */,
                                stream->req.req_body_bytes_received < stream->req.content_length
                                    ? H2O_HTTP3_ERROR_REQUEST_INCOMPLETE
                                    : H2O_HTTP3_ERROR_GENERAL_PROTOCOL,
                                in_generator);
            } else {
                if (stream->req.write_req.cb != NULL) {
                    /* in streaming mode, let run_delayed invoke write_req one more time (with the end of the stream being
                     * observed) */
                    if (!h2o_linklist_is_linked(&stream->link))
                        h2o_linklist_insert(&conn->delayed_streams.req_streaming, &stream->link);
                    request_run_delayed(conn);
                } else if (!stream->req.process_called && stream->state < H2O_HTTP3_SERVER_STREAM_STATE_SEND_HEADERS) {
                    /* process the request, if we haven't called h2o_process_request nor sent an error response */
                    switch (stream->state) {
                    case H2O_HTTP3_SERVER_STREAM_STATE_RECV_HEADERS:
                    case H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_BEFORE_BLOCK:
                    case H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_UNBLOCKED:
                        break;
                    default:
                        assert(!"unexpected state");
                        break;
                    }
                    set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_REQ_PENDING, in_generator);
                    h2o_linklist_insert(&conn->delayed_streams.pending, &stream->link);
                    request_run_delayed(conn);
                }
            }
        } else {
            /* the stream ended in the middle of a frame, or before the headers were received */
            shutdown_stream(stream, H2O_HTTP3_ERROR_NONE /* ignored */, H2O_HTTP3_ERROR_REQUEST_INCOMPLETE, in_generator);
        }
    } else {
        if (stream->state == H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_BEFORE_BLOCK && stream->req_body != NULL &&
            stream->req_body->size >= H2O_HTTP3_REQUEST_BODY_MIN_BYTES_TO_BLOCK) {
            /* switch to blocked state if the request body is becoming large (this limits the concurrency to the backend) */
            stream->read_blocked = 1;
            h2o_linklist_insert(&conn->delayed_streams.recv_body_blocked, &stream->link);
            set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_BLOCKED, in_generator);
            check_run_blocked(conn);
        } else if (stream->req.write_req.cb != NULL && stream->req_body->size != 0) {
            /* in streaming mode, let the run_delayed invoke write_req */
            if (!h2o_linklist_is_linked(&stream->link))
                h2o_linklist_insert(&conn->delayed_streams.req_streaming, &stream->link);
            request_run_delayed(conn);
        }
    }
}
910 
/**
 * Stores the received bytes (`len` bytes of `input` at stream offset `off`) in the receive buffer of the stream, then processes
 * the buffered input unless reading is currently blocked.
 */
static void on_receive(quicly_stream_t *qs, size_t off, const void *input, size_t len)
{
    struct st_h2o_http3_server_stream_t *stream = qs->data;

    /* save received data (FIXME avoid copying if possible; see hqclient.c) */
    h2o_http3_update_recvbuf(&stream->recvbuf.buf, off, input, len);

    /* handle input, unless the stream is read-blocked (FIXME propagate err_desc) */
    if (!stream->read_blocked)
        handle_buffered_input(stream, 0);
}
924 
/**
 * Shuts down the stream in response to the receive side being reset. The error code handed to `shutdown_stream` depends on how
 * far the request has progressed: REQUEST_REJECTED while the headers are still being received, REQUEST_CANCELLED afterwards.
 */
static void on_receive_reset(quicly_stream_t *qs, int err)
{
    struct st_h2o_http3_server_stream_t *stream = qs->data;
    int reset_code;

    if (stream->state == H2O_HTTP3_SERVER_STREAM_STATE_RECV_HEADERS) {
        reset_code = H2O_HTTP3_ERROR_REQUEST_REJECTED;
    } else {
        reset_code = H2O_HTTP3_ERROR_REQUEST_CANCELLED;
    }

    shutdown_stream(stream, H2O_HTTP3_ERROR_NONE /* ignored */, reset_code, 0);
}
934 
/**
 * `proceed_req` callback used while the request body is being streamed. `errstr` being non-NULL indicates a failure, in which case
 * the streaming state is torn down and the stream is shut down. Otherwise, the request body buffer is emptied, reading is
 * unblocked, and the bytes that have accumulated in the receive buffer are processed. The streaming state is also torn down when
 * the receive side of the stream is complete.
 */
static void proceed_request_streaming(h2o_req_t *_req, const char *errstr)
{
    struct st_h2o_http3_server_stream_t *stream = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, req, _req);
    struct st_h2o_http3_server_conn_t *conn = get_conn(stream);
    int failed = errstr != NULL;

    assert(stream->req_body != NULL);
    assert(!h2o_linklist_is_linked(&stream->link));
    assert(conn->num_streams_req_streaming != 0 || stream->req.is_tunnel_req);

    if (failed || quicly_recvstate_transfer_complete(&stream->quic->recvstate)) {
        /* tidy up the request streaming */
        stream->req.proceed_req = NULL;
        stream->req.write_req.ctx = NULL;
        stream->req.write_req.cb = NULL;
        /* tunnels are not included in the counter of streaming requests */
        if (!stream->req.is_tunnel_req)
            conn->num_streams_req_streaming -= 1;
        check_run_blocked(conn);
    }

    /* close the stream if an error occurred */
    if (failed) {
        shutdown_stream(stream, H2O_HTTP3_ERROR_INTERNAL, H2O_HTTP3_ERROR_INTERNAL, 1);
        return;
    }

    /* remove the bytes from the request body buffer */
    assert(stream->req.entity.len == stream->req_body->size);
    h2o_buffer_consume(&stream->req_body, stream->req_body->size);
    stream->req.entity = h2o_iovec_init(NULL, 0);

    /* unblock read until the next invocation of write_req, or after the final invocation */
    stream->read_blocked = 0;

    /* handle input in the receive buffer */
    handle_buffered_input(stream, 1);
}
970 
/**
 * Timer callback (the connection is obtained from its `timeout` member) that drives the streams linked to the `delayed_streams`
 * lists of the connection. Each iteration of the outer loop:
 *  1. promotes at most one stream from `recv_body_blocked`, provided that no stream is in the unblocked or streaming state,
 *  2. invokes `write_req` for every stream in `req_streaming`, and
 *  3. calls `h2o_process_request` for every stream in `pending`.
 * The loop is repeated until an iteration makes no progress, as handling a stream might link other streams to the lists.
 */
static void run_delayed(h2o_timer_t *timer)
{
    struct st_h2o_http3_server_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, timeout, timer);
    int made_progress;

    do {
        made_progress = 0;

        /* promote blocked stream to unblocked state, if possible */
        if (conn->num_streams.recv_body_unblocked + conn->num_streams_req_streaming == 0 &&
            !h2o_linklist_is_empty(&conn->delayed_streams.recv_body_blocked)) {
            struct st_h2o_http3_server_stream_t *stream =
                H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, link, conn->delayed_streams.recv_body_blocked.next);
            assert(stream->state == H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_BLOCKED);
            assert(stream->read_blocked);
            h2o_linklist_unlink(&stream->link);
            made_progress = 1;
            /* switch the receive window of the stream to the size configured for active streams */
            quicly_stream_set_receive_window(stream->quic, conn->super.ctx->globalconf->http3.active_stream_window_size);
            if (h2o_req_can_stream_request(&stream->req)) {
                /* use streaming mode */
                ++conn->num_streams_req_streaming;
                stream->req.proceed_req = proceed_request_streaming;
                set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_SEND_HEADERS, 0);
                h2o_process_request(&stream->req);
            } else {
                /* unblock, read the bytes in receive buffer */
                stream->read_blocked = 0;
                set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_UNBLOCKED, 0);
                handle_buffered_input(stream, 0);
                /* bail out if the connection has started closing while handling the buffered input */
                if (quicly_get_state(conn->h3.super.quic) >= QUICLY_STATE_CLOSING)
                    return;
            }
        }

        /* process streams using request streaming, that have new data to submit */
        while (!h2o_linklist_is_empty(&conn->delayed_streams.req_streaming)) {
            struct st_h2o_http3_server_stream_t *stream =
                H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, link, conn->delayed_streams.req_streaming.next);
            int is_end_stream = quicly_recvstate_transfer_complete(&stream->quic->recvstate);
            assert(stream->req.process_called);
            assert(stream->req.write_req.cb != NULL);
            assert(stream->req_body != NULL);
            assert(stream->req_body->size != 0 || is_end_stream);
            assert(!stream->read_blocked);
            h2o_linklist_unlink(&stream->link);
            /* block reads until the bytes being submitted are released (see `proceed_request_streaming`) */
            stream->read_blocked = 1;
            made_progress = 1;
            assert(stream->req.entity.len == stream->req_body->size &&
                   (stream->req.entity.len == 0 || stream->req.entity.base == stream->req_body->bytes));
            if (stream->req.write_req.cb(stream->req.write_req.ctx, is_end_stream) != 0)
                shutdown_stream(stream, H2O_HTTP3_ERROR_INTERNAL, H2O_HTTP3_ERROR_INTERNAL, 0);
        }

        /* process the requests (not in streaming mode); TODO cap concurrency? */
        while (!h2o_linklist_is_empty(&conn->delayed_streams.pending)) {
            struct st_h2o_http3_server_stream_t *stream =
                H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, link, conn->delayed_streams.pending.next);
            assert(stream->state == H2O_HTTP3_SERVER_STREAM_STATE_REQ_PENDING);
            assert(!stream->req.process_called);
            assert(!stream->read_blocked);
            h2o_linklist_unlink(&stream->link);
            made_progress = 1;
            set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_SEND_HEADERS, 0);
            h2o_process_request(&stream->req);
        }

    } while (made_progress);
}
1039 
handle_input_post_trailers(struct st_h2o_http3_server_stream_t * stream,const uint8_t ** src,const uint8_t * src_end,int in_generator,const char ** err_desc)1040 int handle_input_post_trailers(struct st_h2o_http3_server_stream_t *stream, const uint8_t **src, const uint8_t *src_end,
1041                                int in_generator, const char **err_desc)
1042 {
1043     h2o_http3_read_frame_t frame;
1044     int ret;
1045 
1046     /* read and ignore unknown frames */
1047     if ((ret = h2o_http3_read_frame(&frame, 0, H2O_HTTP3_STREAM_TYPE_REQUEST, src, src_end, err_desc)) != 0)
1048         return ret;
1049     switch (frame.type) {
1050     case H2O_HTTP3_FRAME_TYPE_HEADERS:
1051     case H2O_HTTP3_FRAME_TYPE_DATA:
1052         return H2O_HTTP3_ERROR_FRAME_UNEXPECTED;
1053     default:
1054         break;
1055     }
1056 
1057     return 0;
1058 }
1059 
handle_input_expect_data_payload(struct st_h2o_http3_server_stream_t * stream,const uint8_t ** src,const uint8_t * src_end,int in_generator,const char ** err_desc)1060 static int handle_input_expect_data_payload(struct st_h2o_http3_server_stream_t *stream, const uint8_t **src,
1061                                             const uint8_t *src_end, int in_generator, const char **err_desc)
1062 {
1063     size_t bytes_avail = src_end - *src;
1064 
1065     /* append data to body buffer */
1066     if (bytes_avail > stream->recvbuf.bytes_left_in_data_frame)
1067         bytes_avail = stream->recvbuf.bytes_left_in_data_frame;
1068     if (stream->req_body == NULL)
1069         h2o_buffer_init(&stream->req_body, &h2o_socket_buffer_prototype);
1070     if (!h2o_buffer_try_append(&stream->req_body, *src, bytes_avail))
1071         return H2O_HTTP3_ERROR_INTERNAL;
1072     stream->req.entity = h2o_iovec_init(stream->req_body->bytes, stream->req_body->size);
1073     stream->req.req_body_bytes_received += bytes_avail;
1074     stream->recvbuf.bytes_left_in_data_frame -= bytes_avail;
1075     *src += bytes_avail;
1076 
1077     if (stream->recvbuf.bytes_left_in_data_frame == 0)
1078         stream->recvbuf.handle_input = handle_input_expect_data;
1079 
1080     return 0;
1081 }
1082 
handle_input_expect_data(struct st_h2o_http3_server_stream_t * stream,const uint8_t ** src,const uint8_t * src_end,int in_generator,const char ** err_desc)1083 int handle_input_expect_data(struct st_h2o_http3_server_stream_t *stream, const uint8_t **src, const uint8_t *src_end,
1084                              int in_generator, const char **err_desc)
1085 {
1086     h2o_http3_read_frame_t frame;
1087     int ret;
1088 
1089     /* read frame */
1090     if ((ret = h2o_http3_read_frame(&frame, 0, H2O_HTTP3_STREAM_TYPE_REQUEST, src, src_end, err_desc)) != 0)
1091         return ret;
1092     switch (frame.type) {
1093     case H2O_HTTP3_FRAME_TYPE_HEADERS:
1094         /* when in tunnel mode, trailers forbidden */
1095         if (stream->req.is_tunnel_req) {
1096             *err_desc = "unexpected frame type";
1097             return H2O_HTTP3_ERROR_FRAME_UNEXPECTED;
1098         }
1099         /* trailers, ignore but disallow succeeding DATA or HEADERS frame */
1100         stream->recvbuf.handle_input = handle_input_post_trailers;
1101         return 0;
1102     case H2O_HTTP3_FRAME_TYPE_DATA:
1103         if (stream->req.content_length != SIZE_MAX &&
1104             stream->req.content_length - stream->req.req_body_bytes_received < frame.length) {
1105             /* The only viable option here is to reset the stream, as we might have already started streaming the request body
1106              * upstream. This behavior is consistent with what we do in HTTP/2. */
1107             shutdown_stream(stream, H2O_HTTP3_ERROR_EARLY_RESPONSE, H2O_HTTP3_ERROR_GENERAL_PROTOCOL, in_generator);
1108             return 0;
1109         }
1110         break;
1111     default:
1112         return 0;
1113     }
1114 
1115     /* got a DATA frame */
1116     if (frame.length != 0) {
1117         stream->recvbuf.handle_input = handle_input_expect_data_payload;
1118         stream->recvbuf.bytes_left_in_data_frame = frame.length;
1119     }
1120 
1121     return 0;
1122 }
1123 
/**
 * Responds to a request with an HTTP error generated by `sendfn` (called with `reason` and `body`). If the request has not been
 * received in its entirety, the peer is asked to stop sending it. `*err_desc` is cleared after the response is generated, and zero
 * is always returned.
 */
static int handle_input_expect_headers_send_http_error(struct st_h2o_http3_server_stream_t *stream,
                                                       void (*sendfn)(h2o_req_t *, const char *, const char *, int),
                                                       const char *reason, const char *body, const char **err_desc)
{
    int request_fully_received = quicly_recvstate_transfer_complete(&stream->quic->recvstate);

    /* the rest of the request is of no use, as the response is being sent early */
    if (!request_fully_received)
        quicly_request_stop(stream->quic, H2O_HTTP3_ERROR_EARLY_RESPONSE);

    set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_SEND_HEADERS, 0);
    (*sendfn)(&stream->req, reason, body, 0);

    /* cleared only after calling `sendfn`, as `body` might refer to the string that `*err_desc` points to */
    *err_desc = NULL;
    return 0;
}
1137 
/**
 * Starts processing a CONNECT or CONNECT-UDP request as a tunnel. `datagram_flow_id_field` is NULL for CONNECT; for CONNECT-UDP, it
 * points to the value of the datagram flow id field, with `base` being NULL if the field was absent. A request carrying
 * content-length is answered with a 400 response. Returns zero on success, or H2O_HTTP3_ERROR_GENERAL_PROTOCOL when a flow id is
 * supplied even though H3 datagrams cannot be used.
 */
static int handle_input_expect_headers_process_connect(struct st_h2o_http3_server_stream_t *stream,
                                                       h2o_iovec_t *datagram_flow_id_field, const char **err_desc)
{
    if (stream->req.content_length != SIZE_MAX)
        return handle_input_expect_headers_send_http_error(stream, h2o_send_error_400, "Invalid Request",
                                                           "CONNECT request cannot have request body", err_desc);

    /* UINT64_MAX is retained unless a flow id field is supplied (CONNECT-UDP) */
    uint64_t flow_id = UINT64_MAX;
    if (datagram_flow_id_field != NULL && datagram_flow_id_field->base != NULL) {
        /* check if it can be used */
        if (!h2o_http3_can_use_h3_datagram(&get_conn(stream)->h3)) {
            *err_desc = "unexpected h3 datagram";
            return H2O_HTTP3_ERROR_GENERAL_PROTOCOL;
        }
        /* TODO implement proper parsing; for now, the leading decimal digits are converted */
        const char *p = datagram_flow_id_field->base, *field_end = p + datagram_flow_id_field->len;
        flow_id = 0;
        while (p != field_end && '0' <= *p && *p <= '9') {
            flow_id = flow_id * 10 + *p - '0';
            ++p;
        }
    }

    /* set up the request as a tunnel that streams the request body, then start processing */
    stream->req.is_tunnel_req = 1;
    h2o_buffer_init(&stream->req_body, &h2o_socket_buffer_prototype);
    stream->req.entity = h2o_iovec_init("", 0);
    stream->req.proceed_req = proceed_request_streaming;
    stream->datagram_flow_id = flow_id;
    get_conn(stream)->num_streams_tunnelling += 1;
    set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_SEND_HEADERS, 0);
    quicly_stream_set_receive_window(stream->quic, get_conn(stream)->super.ctx->globalconf->http3.active_stream_window_size);
    h2o_process_request(&stream->req);

    return 0;
}
1177 
handle_input_expect_headers(struct st_h2o_http3_server_stream_t * stream,const uint8_t ** src,const uint8_t * src_end,int in_generator,const char ** err_desc)1178 static int handle_input_expect_headers(struct st_h2o_http3_server_stream_t *stream, const uint8_t **src, const uint8_t *src_end,
1179                                        int in_generator, const char **err_desc)
1180 {
1181     assert(!in_generator); /* this function is processing headers (before generators get assigned), not trailers */
1182 
1183     struct st_h2o_http3_server_conn_t *conn = get_conn(stream);
1184     h2o_http3_read_frame_t frame;
1185     int header_exists_map = 0, ret;
1186     h2o_iovec_t datagram_flow_id = {};
1187     uint8_t header_ack[H2O_HPACK_ENCODE_INT_MAX_LENGTH];
1188     size_t header_ack_len;
1189 
1190     /* read the HEADERS frame (or a frame that precedes that) */
1191     if ((ret = h2o_http3_read_frame(&frame, 0, H2O_HTTP3_STREAM_TYPE_REQUEST, src, src_end, err_desc)) != 0)
1192         return ret;
1193     if (frame.type != H2O_HTTP3_FRAME_TYPE_HEADERS) {
1194         switch (frame.type) {
1195         case H2O_HTTP3_FRAME_TYPE_DATA:
1196             return H2O_HTTP3_ERROR_FRAME_UNEXPECTED;
1197         default:
1198             break;
1199         }
1200         return 0;
1201     }
1202     stream->recvbuf.handle_input = handle_input_expect_data;
1203 
1204     /* parse the headers, and ack */
1205     if ((ret = h2o_qpack_parse_request(&stream->req.pool, get_conn(stream)->h3.qpack.dec, stream->quic->stream_id,
1206                                        &stream->req.input.method, &stream->req.input.scheme, &stream->req.input.authority,
1207                                        &stream->req.input.path, &stream->req.headers, &header_exists_map,
1208                                        &stream->req.content_length, NULL /* TODO cache-digests */, &datagram_flow_id, header_ack,
1209                                        &header_ack_len, frame.payload, frame.length, err_desc)) != 0 &&
1210         ret != H2O_HTTP2_ERROR_INVALID_HEADER_CHAR)
1211         return ret;
1212     if (header_ack_len != 0)
1213         h2o_http3_send_qpack_header_ack(&conn->h3, header_ack, header_ack_len);
1214 
1215     if (stream->req.input.scheme == NULL)
1216         stream->req.input.scheme = &H2O_URL_SCHEME_HTTPS;
1217 
1218     h2o_probe_log_request(&stream->req, stream->quic->stream_id);
1219 
1220     int is_connect = h2o_memis(stream->req.input.method.base, stream->req.input.method.len, H2O_STRLIT("CONNECT"));
1221     int is_connect_udp = h2o_memis(stream->req.input.method.base, stream->req.input.method.len, H2O_STRLIT("CONNECT-UDP"));
1222 
1223     /* check if existence and non-existence of pseudo headers are correct */
1224     int expected_map = H2O_HPACK_PARSE_HEADERS_METHOD_EXISTS | H2O_HPACK_PARSE_HEADERS_AUTHORITY_EXISTS;
1225     if (!is_connect && !is_connect_udp)
1226         expected_map |= H2O_HPACK_PARSE_HEADERS_SCHEME_EXISTS | H2O_HPACK_PARSE_HEADERS_PATH_EXISTS;
1227     if (is_connect_udp) {
1228         /* only require method and authority for connect-udp for now, ignore if the others are set */
1229         if ((header_exists_map & expected_map) != expected_map) {
1230             shutdown_stream(stream, H2O_HTTP3_ERROR_GENERAL_PROTOCOL, H2O_HTTP3_ERROR_GENERAL_PROTOCOL, 0);
1231             return 0;
1232         }
1233     } else {
1234         if (header_exists_map != expected_map) {
1235             shutdown_stream(stream, H2O_HTTP3_ERROR_GENERAL_PROTOCOL, H2O_HTTP3_ERROR_GENERAL_PROTOCOL, 0);
1236             return 0;
1237         }
1238     }
1239 
1240     /* send a 400 error when observing an invalid header character */
1241     if (ret == H2O_HTTP2_ERROR_INVALID_HEADER_CHAR)
1242         return handle_input_expect_headers_send_http_error(stream, h2o_send_error_400, "Invalid Request", *err_desc, err_desc);
1243 
1244     /* validate semantic requirement */
1245     if (!h2o_req_validate_pseudo_headers(&stream->req)) {
1246         *err_desc = "invalid pseudo headers";
1247         return H2O_HTTP3_ERROR_GENERAL_PROTOCOL;
1248     }
1249 
1250     /* check if content-length is within the permitted bounds */
1251     if (stream->req.content_length != SIZE_MAX && stream->req.content_length > conn->super.ctx->globalconf->max_request_entity_size)
1252         return handle_input_expect_headers_send_http_error(stream, h2o_send_error_413, "Request Entity Too Large",
1253                                                            "request entity is too large", err_desc);
1254 
1255     /* set priority */
1256     assert(!h2o_linklist_is_linked(&stream->scheduler.link));
1257     if (!stream->received_priority_update) {
1258         ssize_t index;
1259         if ((index = h2o_find_header(&stream->req.headers, H2O_TOKEN_PRIORITY, -1)) != -1) {
1260             h2o_iovec_t *value = &stream->req.headers.entries[index].value;
1261             h2o_absprio_parse_priority(value->base, value->len, &stream->scheduler.priority);
1262         }
1263     }
1264 
1265     /* special handling of CONNECT method */
1266     if (is_connect) {
1267         return handle_input_expect_headers_process_connect(stream, NULL, err_desc);
1268     } else if (h2o_memis(stream->req.input.method.base, stream->req.input.method.len, H2O_STRLIT("CONNECT-UDP"))) {
1269         return handle_input_expect_headers_process_connect(stream, &datagram_flow_id, err_desc);
1270     }
1271 
1272     /* change state */
1273     set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_RECV_BODY_BEFORE_BLOCK, 0);
1274 
1275     return 0;
1276 }
1277 
/**
 * Builds the response header frame with the QPACK encoder of the connection and appends it to the send buffer of the stream.
 * `datagram_flow_id` is handed to `h2o_qpack_flatten_response` as-is.
 */
static void write_response(struct st_h2o_http3_server_stream_t *stream, h2o_iovec_t datagram_flow_id)
{
    struct st_h2o_http3_server_conn_t *conn = get_conn(stream);

    /* serialize status and header fields */
    h2o_iovec_t headers_frame =
        h2o_qpack_flatten_response(conn->h3.qpack.enc, &stream->req.pool, stream->quic->stream_id, NULL, stream->req.res.status,
                                   stream->req.res.headers.entries, stream->req.res.headers.size,
                                   &conn->super.ctx->globalconf->server_name, stream->req.res.content_length, datagram_flow_id);

    /* push the frame to the tail of the vector list */
    size_t slot = stream->sendbuf.vecs.size;
    h2o_vector_reserve(&stream->req.pool, &stream->sendbuf.vecs, slot + 1);
    struct st_h2o_http3_server_sendvec_t *entry = &stream->sendbuf.vecs.entries[slot];
    stream->sendbuf.vecs.size = slot + 1;
    h2o_sendvec_init_immutable(&entry->vec, headers_frame.base, headers_frame.len);
    /* UINT64_MAX is the entity offset given to vectors that carry framing rather than response body */
    entry->entity_offset = UINT64_MAX;

    /* account for the bytes added */
    stream->sendbuf.final_size += headers_frame.len;
}
1291 
flatten_data_frame_header(struct st_h2o_http3_server_stream_t * stream,struct st_h2o_http3_server_sendvec_t * dst,size_t payload_size)1292 static size_t flatten_data_frame_header(struct st_h2o_http3_server_stream_t *stream, struct st_h2o_http3_server_sendvec_t *dst,
1293                                         size_t payload_size)
1294 {
1295     size_t header_size = 0;
1296 
1297     /* build header */
1298     stream->sendbuf.data_frame_header_buf[header_size++] = H2O_HTTP3_FRAME_TYPE_DATA;
1299     header_size =
1300         quicly_encodev(stream->sendbuf.data_frame_header_buf + header_size, payload_size) - stream->sendbuf.data_frame_header_buf;
1301 
1302     /* initilaize the vector */
1303     h2o_sendvec_init_raw(&dst->vec, stream->sendbuf.data_frame_header_buf, header_size);
1304     dst->entity_offset = UINT64_MAX;
1305 
1306     return header_size;
1307 }
1308 
shutdown_by_generator(struct st_h2o_http3_server_stream_t * stream)1309 static void shutdown_by_generator(struct st_h2o_http3_server_stream_t *stream)
1310 {
1311     quicly_sendstate_shutdown(&stream->quic->sendstate, stream->sendbuf.final_size);
1312     if (stream->sendbuf.vecs.size == 0) {
1313         if (quicly_stream_has_receive_side(0, stream->quic->stream_id))
1314             quicly_request_stop(stream->quic, H2O_HTTP3_ERROR_EARLY_RESPONSE);
1315         set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT, 0);
1316     }
1317 }
1318 
finalize_do_send_setup_udp_tunnel(struct st_h2o_http3_server_stream_t * stream)1319 static h2o_iovec_t finalize_do_send_setup_udp_tunnel(struct st_h2o_http3_server_stream_t *stream)
1320 {
1321     /* check requirements */
1322     if (!(stream->datagram_flow_id != UINT64_MAX && (200 <= stream->req.res.status && stream->req.res.status <= 299) &&
1323           stream->req.forward_datagram.write_ != NULL)) {
1324         stream->datagram_flow_id = UINT64_MAX;
1325         return h2o_iovec_init(NULL, 0);
1326     }
1327 
1328     /* register to the map */
1329     struct st_h2o_http3_server_conn_t *conn = get_conn(stream);
1330     int r;
1331     khiter_t iter = kh_put(stream, conn->datagram_flows, stream->datagram_flow_id, &r);
1332     assert(iter != kh_end(conn->datagram_flows));
1333     kh_val(conn->datagram_flows, iter) = stream;
1334     /* set the callback */
1335     stream->req.forward_datagram.read_ = tunnel_on_udp_read;
1336 
1337     /* build and return the value of datagram-flow-id header field */
1338     h2o_iovec_t datagram_flow_id;
1339     datagram_flow_id.base = h2o_mem_alloc_pool(&stream->req.pool, char, sizeof(H2O_UINT64_LONGEST_STR));
1340     datagram_flow_id.len = sprintf(datagram_flow_id.base, "%" PRIu64, stream->datagram_flow_id);
1341     return datagram_flow_id;
1342 }
1343 
finalize_do_send(struct st_h2o_http3_server_stream_t * stream)1344 static void finalize_do_send(struct st_h2o_http3_server_stream_t *stream)
1345 {
1346     quicly_stream_sync_sendbuf(stream->quic, 1);
1347     if (!stream->proceed_while_sending)
1348         h2o_quic_schedule_timer(&get_conn(stream)->h3.super);
1349 }
1350 
/**
 * `do_send` callback of the final ostream. Emits the response headers when called for the first time, appends the supplied body
 * vectors prefixed by a DATA frame header, and shuts down the send side when `send_state` is other than "in progress".
 */
static void do_send(h2o_ostream_t *_ostr, h2o_req_t *_req, h2o_sendvec_t *bufs, size_t bufcnt, h2o_send_state_t send_state)
{
    struct st_h2o_http3_server_stream_t *stream = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, ostr_final, _ostr);

    assert(&stream->req == _req);

    stream->proceed_requested = 0;

    switch (stream->state) {
    case H2O_HTTP3_SERVER_STREAM_STATE_SEND_HEADERS:
        /* first invocation; headers go out before any body */
        write_response(stream, finalize_do_send_setup_udp_tunnel(stream));
        h2o_probe_log_response(&stream->req, stream->quic->stream_id);
        set_state(stream, H2O_HTTP3_SERVER_STREAM_STATE_SEND_BODY, 1);
        break;
    case H2O_HTTP3_SERVER_STREAM_STATE_SEND_BODY:
        assert(quicly_sendstate_is_open(&stream->quic->sendstate));
        break;
    case H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT:
        /* This protocol handler transitions to CLOSE_WAIT when the request side is being reset by the origin. But our client-side
         * implementations are capable of handling uni-directional close, therefore `do_send` might be invoked. The handler swallows
         * the input, and relies on eventual destruction of `h2o_req_t` to discard the generator. */
        return;
    default:
        h2o_fatal("logic flaw");
        break;
    }

    /* When body vectors are supplied, store copies of them (bumping the reference counts where supported, so that they stay
     * available for retransmission), placing a DATA frame header in front of them. */
    if (bufcnt != 0) {
        h2o_vector_reserve(&stream->req.pool, &stream->sendbuf.vecs, stream->sendbuf.vecs.size + 1 + bufcnt);
        /* the first of the newly reserved slots is for the frame header, the rest are for the body */
        struct st_h2o_http3_server_sendvec_t *header_slot = stream->sendbuf.vecs.entries + stream->sendbuf.vecs.size;
        uint64_t body_size_before = stream->sendbuf.final_body_size;
        for (size_t i = 0; i < bufcnt; ++i) {
            struct st_h2o_http3_server_sendvec_t *body_slot = header_slot + 1 + i;
            body_slot->vec = bufs[i];
            body_slot->entity_offset = stream->sendbuf.final_body_size;
            stream->sendbuf.final_body_size += bufs[i].len;
            /* take a reference, if the vector is reference-counted */
            if (bufs[i].callbacks->update_refcnt != NULL)
                bufs[i].callbacks->update_refcnt(bufs + i, &stream->req, 1);
        }
        uint64_t payload_size = stream->sendbuf.final_body_size - body_size_before;
        /* now that the payload size is known, fill in the header slot */
        size_t header_size = flatten_data_frame_header(stream, header_slot, payload_size);
        /* commit */
        stream->sendbuf.vecs.size += 1 + bufcnt;
        stream->sendbuf.final_size += header_size + payload_size;
    }

    /* TODO consider how to forward error, pending resolution of https://github.com/quicwg/base-drafts/issues/3300 */
    if (send_state == H2O_SEND_STATE_FINAL || send_state == H2O_SEND_STATE_ERROR)
        shutdown_by_generator(stream);

    finalize_do_send(stream);
}
1414 
/**
 * `send_informational` callback of the final ostream; writes the response headers currently set on the request, without a
 * datagram flow id.
 */
static void do_send_informational(h2o_ostream_t *_ostr, h2o_req_t *_req)
{
    struct st_h2o_http3_server_stream_t *stream = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, ostr_final, _ostr);
    h2o_iovec_t no_flow_id = h2o_iovec_init(NULL, 0);

    assert(&stream->req == _req);

    write_response(stream, no_flow_id);
    finalize_do_send(stream);
}
1424 
handle_priority_update_frame(struct st_h2o_http3_server_conn_t * conn,const h2o_http3_priority_update_frame_t * frame)1425 static int handle_priority_update_frame(struct st_h2o_http3_server_conn_t *conn, const h2o_http3_priority_update_frame_t *frame)
1426 {
1427     if (frame->element_is_push)
1428         return H2O_HTTP3_ERROR_GENERAL_PROTOCOL;
1429 
1430     /* obtain the stream being referred to (creating one if necessary), or return if the stream has been closed already */
1431     quicly_stream_t *qs;
1432     if (quicly_get_or_open_stream(conn->h3.super.quic, frame->element, &qs) != 0)
1433         return H2O_HTTP3_ERROR_ID;
1434     if (qs == NULL)
1435         return 0;
1436 
1437     /* apply the changes */
1438     struct st_h2o_http3_server_stream_t *stream = qs->data;
1439     assert(stream != NULL);
1440     stream->received_priority_update = 1;
1441     if (h2o_linklist_is_linked(&stream->scheduler.link)) {
1442         req_scheduler_deactivate(&conn->scheduler.reqs, &stream->scheduler);
1443         stream->scheduler.priority = frame->priority; /* TODO apply only the delta? */
1444         req_scheduler_activate(&conn->scheduler.reqs, &stream->scheduler, req_scheduler_compare_stream_id);
1445     } else {
1446         stream->scheduler.priority = frame->priority; /* TODO apply only the delta? */
1447     }
1448 
1449     return 0;
1450 }
1451 
/**
 * Handles a frame received on the control stream. The first frame has to be SETTINGS; afterwards, SETTINGS is an error,
 * PRIORITY_UPDATE is applied, and all other frame types are ignored. Any error closes the connection.
 */
static void handle_control_stream_frame(h2o_http3_conn_t *_conn, uint8_t type, const uint8_t *payload, size_t len)
{
    struct st_h2o_http3_server_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, h3, _conn);
    const char *err_desc = NULL;
    int err;

    /* before SETTINGS is received, it is the only frame being permitted */
    if (!h2o_http3_has_received_settings(&conn->h3)) {
        if (type != H2O_HTTP3_FRAME_TYPE_SETTINGS) {
            err = H2O_HTTP3_ERROR_MISSING_SETTINGS;
            goto Fail;
        }
        err = h2o_http3_handle_settings_frame(&conn->h3, payload, len, &err_desc);
        if (err != 0)
            goto Fail;
        assert(h2o_http3_has_received_settings(&conn->h3));
        return;
    }

    /* SETTINGS cannot be sent twice */
    if (type == H2O_HTTP3_FRAME_TYPE_SETTINGS) {
        err = H2O_HTTP3_ERROR_FRAME_UNEXPECTED;
        err_desc = "unexpected SETTINGS frame";
        goto Fail;
    }

    if (type == H2O_HTTP3_FRAME_TYPE_PRIORITY_UPDATE) {
        h2o_http3_priority_update_frame_t frame;
        err = h2o_http3_decode_priority_update_frame(&frame, payload, len, &err_desc);
        if (err != 0)
            goto Fail;
        err = handle_priority_update_frame(conn, &frame);
        if (err != 0) {
            err_desc = "invalid PRIORITY_UPDATE frame";
            goto Fail;
        }
    }

    /* frames of other types are ignored */
    return;

Fail:
    h2o_quic_close_connection(&conn->h3.super, err, err_desc);
}
1490 
/**
 * Callback invoked by quicly when a stream is opened. Unidirectional streams are delegated to the code shared between the
 * client and the server. For a request stream (bidirectional, client-initiated), the per-stream state is allocated and
 * initialized to wait for the request headers. Always returns 0.
 */
static int stream_open_cb(quicly_stream_open_t *self, quicly_stream_t *qs)
{
    static const quicly_stream_callbacks_t callbacks = {on_stream_destroy, on_send_shift, on_send_emit,
                                                        on_send_stop,      on_receive,    on_receive_reset};

    /* handling of unidirectional streams is not server-specific */
    if (quicly_stream_is_unidirectional(qs->stream_id)) {
        h2o_http3_on_create_unidirectional_stream(qs);
        return 0;
    }

    assert(quicly_stream_is_client_initiated(qs->stream_id));

    struct st_h2o_http3_server_conn_t *conn =
        H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, h3, *quicly_get_data(qs->conn));

    /* create new stream and start handling the request */
    struct st_h2o_http3_server_stream_t *stream = h2o_mem_alloc(sizeof(*stream));
    stream->quic = qs;
    h2o_buffer_init(&stream->recvbuf.buf, &h2o_socket_buffer_prototype);
    stream->recvbuf.handle_input = handle_input_expect_headers;
    memset(&stream->sendbuf, 0, sizeof(stream->sendbuf));
    stream->state = H2O_HTTP3_SERVER_STREAM_STATE_RECV_HEADERS;
    stream->link = (h2o_linklist_t){NULL};
    stream->ostr_final = (h2o_ostream_t){NULL, do_send, NULL, do_send_informational};
    stream->scheduler.link = (h2o_linklist_t){NULL};
    stream->scheduler.priority = h2o_absprio_default;
    stream->scheduler.call_cnt = 0;

    /* The structure is not zero-filled on allocation, hence every member has to be set explicitly. UINT64_MAX is the value that
     * `finalize_do_send_setup_udp_tunnel` treats as "no datagram flow"; as that function consults the member for every response,
     * the member must never be left indeterminate. */
    stream->datagram_flow_id = UINT64_MAX;

    stream->read_blocked = 0;
    stream->proceed_requested = 0;
    stream->proceed_while_sending = 0;
    stream->received_priority_update = 0;
    stream->req_disposed = 0;
    stream->req_body = NULL;

    h2o_init_request(&stream->req, &conn->super, NULL);
    stream->req.version = 0x0300;
    stream->req._ostr_top = &stream->ostr_final;

    /* associate the state with the quicly stream */
    stream->quic->data = stream;
    stream->quic->callbacks = &callbacks;

    /* update the per-state counter of the connection */
    ++*get_state_counter(get_conn(stream), stream->state);
    return 0;
}
1537 
/**
 * the stream-open callback object; registered to the quicly context by `h2o_http3_server_amend_quicly_context`
 */
static quicly_stream_open_t on_stream_open = {stream_open_cb};
1539 
unblock_conn_blocked_streams(struct st_h2o_http3_server_conn_t * conn)1540 static void unblock_conn_blocked_streams(struct st_h2o_http3_server_conn_t *conn)
1541 {
1542     conn->scheduler.uni.active |= conn->scheduler.uni.conn_blocked;
1543     conn->scheduler.uni.conn_blocked = 0;
1544     req_scheduler_unblock_conn_blocked(&conn->scheduler.reqs, req_scheduler_compare_stream_id);
1545 }
1546 
/**
 * `can_send` callback of the stream scheduler; returns 1 if there is at least one active stream (unidirectional or request),
 * or 0 if none.
 */
static int scheduler_can_send(quicly_stream_scheduler_t *sched, quicly_conn_t *qc, int conn_is_saturated)
{
    struct st_h2o_http3_server_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, h3, *quicly_get_data(qc));

    /* When the connection is not saturated, the streams marked as conn-blocked become active again.
     *
     * TODO for the saturated case, lazily move the active request and unidirectional streams to conn_blocked. Not doing so
     * results in at most one spurious call to quicly_send. */
    if (!conn_is_saturated)
        unblock_conn_blocked_streams(conn);

    return conn->scheduler.uni.active != 0 || conn->scheduler.reqs.active.smallest_urgency < H2O_ABSPRIO_NUM_URGENCY_LEVELS;
}
1566 
/**
 * `do_send` callback of the stream scheduler. While quicly reports that stream data can be sent, picks one active stream at a
 * time and calls `quicly_send_stream` for it. Active unidirectional streams are served before request streams; among request
 * streams, the one at the head of the `high` list (or the `low` list, if `high` is empty) of the smallest active urgency level
 * is chosen. Returns 0, or the error returned by `quicly_send_stream`.
 */
static int scheduler_do_send(quicly_stream_scheduler_t *sched, quicly_conn_t *qc, quicly_send_context_t *s)
{
    struct st_h2o_http3_server_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, h3, *quicly_get_data(qc));
    int ret = 0;

    while (quicly_can_send_data(conn->h3.super.quic, s)) {
        /* The strategy is:
         *
         * 1. dequeue the first active stream
         * 2. link the stream to the conn_blocked list, if nothing can be sent for the stream due to the connection being capped
         * 3. otherwise, send
         * 4. enqueue to the appropriate place
         *
         * For request streams, there is an additional step between 3 and 4 that lets the generator supply more data.
         */
        if (conn->scheduler.uni.active != 0) {
            /* offsets (within the connection object) of the pointers to the egress unidirectional streams */
            static const ptrdiff_t stream_offsets[] = {
                offsetof(struct st_h2o_http3_server_conn_t, h3._control_streams.egress.control),
                offsetof(struct st_h2o_http3_server_conn_t, h3._control_streams.egress.qpack_encoder),
                offsetof(struct st_h2o_http3_server_conn_t, h3._control_streams.egress.qpack_decoder)};
            /* 1. obtain pointer to the offending stream */
            struct st_h2o_http3_egress_unistream_t *stream = NULL;
            size_t i;
            for (i = 0; i != sizeof(stream_offsets) / sizeof(stream_offsets[0]); ++i) {
                stream = *(void **)((char *)conn + stream_offsets[i]);
                /* the bit position within the mask is the stream id */
                if ((conn->scheduler.uni.active & (1 << stream->quic->stream_id)) != 0)
                    break;
            }
            assert(i != sizeof(stream_offsets) / sizeof(stream_offsets[0]) && "we should have found one stream");
            /* 2. move to the conn_blocked list if necessary */
            if (quicly_is_blocked(conn->h3.super.quic) && !quicly_stream_can_send(stream->quic, 0)) {
                conn->scheduler.uni.active &= ~(1 << stream->quic->stream_id);
                conn->scheduler.uni.conn_blocked |= 1 << stream->quic->stream_id;
                continue;
            }
            /* 3. send */
            if ((ret = quicly_send_stream(stream->quic, s)) != 0)
                goto Exit;
            /* 4. update scheduler state */
            conn->scheduler.uni.active &= ~(1 << stream->quic->stream_id);
            if (quicly_stream_can_send(stream->quic, 1)) {
                /* something remains to be sent; mark the stream either as active or as conn-blocked */
                uint16_t *slot = &conn->scheduler.uni.active;
                if (quicly_is_blocked(conn->h3.super.quic) && !quicly_stream_can_send(stream->quic, 0))
                    slot = &conn->scheduler.uni.conn_blocked;
                *slot |= 1 << stream->quic->stream_id;
            }
        } else if (conn->scheduler.reqs.active.smallest_urgency < H2O_ABSPRIO_NUM_URGENCY_LEVELS) {
            /* 1. obtain pointer to the offending stream */
            h2o_linklist_t *anchor = &conn->scheduler.reqs.active.urgencies[conn->scheduler.reqs.active.smallest_urgency].high;
            if (h2o_linklist_is_empty(anchor)) {
                anchor = &conn->scheduler.reqs.active.urgencies[conn->scheduler.reqs.active.smallest_urgency].low;
                assert(!h2o_linklist_is_empty(anchor));
            }
            struct st_h2o_http3_server_stream_t *stream =
                H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_stream_t, scheduler.link, anchor->next);
            /* 2. link to the conn_blocked list if necessary */
            if (quicly_is_blocked(conn->h3.super.quic) && !quicly_stream_can_send(stream->quic, 0)) {
                req_scheduler_conn_blocked(&conn->scheduler.reqs, &stream->scheduler);
                continue;
            }
            /* 3. send */
            if ((ret = quicly_send_stream(stream->quic, s)) != 0)
                goto Exit;
            ++stream->scheduler.call_cnt;
            /* 3a. invoke h2o_proceed_request synchronously, so that we could obtain additional data for the current (i.e. highest)
             *     stream. */
            if (stream->proceed_while_sending) {
                assert(stream->proceed_requested);
                h2o_proceed_response(&stream->req);
                stream->proceed_while_sending = 0;
            }
            /* 4. prepare for next */
            if (quicly_stream_can_send(stream->quic, 1)) {
                if (quicly_is_blocked(conn->h3.super.quic) && !quicly_stream_can_send(stream->quic, 0)) {
                    /* capped by connection-level flow control, move the stream to conn-blocked */
                    req_scheduler_conn_blocked(&conn->scheduler.reqs, &stream->scheduler);
                } else {
                    /* schedule for next emission */
                    req_scheduler_setup_for_next(&conn->scheduler.reqs, &stream->scheduler, req_scheduler_compare_stream_id);
                }
            } else {
                /* nothing to send at this moment */
                req_scheduler_deactivate(&conn->scheduler.reqs, &stream->scheduler);
            }
        } else {
            /* no active stream */
            break;
        }
    }

Exit:
    return ret;
}
1657 
/**
 * `update_state` callback of the stream scheduler. Depending on whether the stream has something to send and whether that is
 * being prevented by the connection being blocked, marks the stream as inactive, active, or conn-blocked. Always returns 0.
 */
static int scheduler_update_state(struct st_quicly_stream_scheduler_t *sched, quicly_stream_t *qs)
{
    struct st_h2o_http3_server_conn_t *conn =
        H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, h3, *quicly_get_data(qs->conn));
    enum { DEACTIVATE, ACTIVATE, CONN_BLOCKED } target;

    /* determine the state that the stream should be in */
    if (!quicly_stream_can_send(qs, 1)) {
        target = DEACTIVATE;
    } else if (quicly_is_blocked(conn->h3.super.quic) && !quicly_stream_can_send(qs, 0)) {
        target = CONN_BLOCKED;
    } else {
        target = ACTIVATE;
    }

    /* unidirectional streams are tracked using two bitmasks indexed by stream id */
    if (quicly_stream_is_unidirectional(qs->stream_id)) {
        assert(qs->stream_id < sizeof(uint16_t) * 8);
        uint16_t bit = (uint16_t)1 << qs->stream_id;
        /* remove from both sets, then add to the one being designated (if any) */
        conn->scheduler.uni.active &= ~bit;
        conn->scheduler.uni.conn_blocked &= ~bit;
        if (target == ACTIVATE) {
            conn->scheduler.uni.active |= bit;
        } else if (target == CONN_BLOCKED) {
            conn->scheduler.uni.conn_blocked |= bit;
        }
        return 0;
    }

    /* request streams are tracked by the request scheduler; nothing is changed while `proceed_while_sending` is set */
    struct st_h2o_http3_server_stream_t *stream = qs->data;
    if (stream->proceed_while_sending)
        return 0;

    if (target == DEACTIVATE) {
        req_scheduler_deactivate(&conn->scheduler.reqs, &stream->scheduler);
    } else if (target == ACTIVATE) {
        req_scheduler_activate(&conn->scheduler.reqs, &stream->scheduler, req_scheduler_compare_stream_id);
    } else {
        req_scheduler_conn_blocked(&conn->scheduler.reqs, &stream->scheduler);
    }

    return 0;
}
1710 
/**
 * the stream scheduler; registered to the quicly context by `h2o_http3_server_amend_quicly_context`
 */
static quicly_stream_scheduler_t scheduler = {scheduler_can_send, scheduler_do_send, scheduler_update_state};
1712 
/**
 * Callback invoked by quicly upon receiving a DATAGRAM frame. Decodes the frame as an HTTP/3 datagram and forwards the payload
 * to the stream registered for the flow id. The connection is closed if the frame cannot be decoded; a datagram carrying a flow
 * id that is not registered is dropped.
 */
static void datagram_frame_receive_cb(quicly_receive_datagram_frame_t *self, quicly_conn_t *quic, ptls_iovec_t datagram)
{
    struct st_h2o_http3_server_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, h3, *quicly_get_data(quic));
    h2o_iovec_t payload;

    /* split the frame into flow id and payload; UINT64_MAX indicates a decode failure */
    uint64_t flow_id = h2o_http3_decode_h3_datagram(&payload, datagram.base, datagram.len);
    if (flow_id == UINT64_MAX) {
        h2o_quic_close_connection(&conn->h3.super, H2O_HTTP3_ERROR_GENERAL_PROTOCOL, "invalid DATAGRAM frame");
        return;
    }

    /* look up the stream, and forward the payload if there is one */
    khiter_t found = kh_get(stream, conn->datagram_flows, flow_id);
    if (found != kh_end(conn->datagram_flows)) {
        struct st_h2o_http3_server_stream_t *stream = kh_val(conn->datagram_flows, found);
        assert(stream->req.forward_datagram.write_ != NULL);
        stream->req.forward_datagram.write_(&stream->req, &payload, 1);
    }
}
1735 
/**
 * the DATAGRAM frame receiver; registered to the quicly context by `h2o_http3_server_amend_quicly_context`
 */
static quicly_receive_datagram_frame_t on_receive_datagram_frame = {datagram_frame_receive_cb};
1737 
/**
 * Destroys a server-side HTTP/3 connection: folds the QUIC statistics into the per-context aggregate, unlinks the connection
 * from the list and the timer, disposes the HTTP/3 state, verifies that no stream-related state remains, and frees the memory.
 */
static void on_h3_destroy(h2o_quic_conn_t *h3_)
{
    h2o_http3_conn_t *http3 = (h2o_http3_conn_t *)h3_;
    struct st_h2o_http3_server_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, h3, http3);
    quicly_stats_t qstats;

    H2O_PROBE_CONN0(H3S_DESTROY, &conn->super);

    /* add the stats of this connection to those of the context, if they are available */
    if (quicly_get_stats(h3_->quic, &qstats) == 0) {
#define ADD_TO_AGGREGATE(fld, _unused) conn->super.ctx->quic_stats.quicly.fld += qstats.fld;
        H2O_QUIC_AGGREGATED_STATS_APPLY(ADD_TO_AGGREGATE);
#undef ADD_TO_AGGREGATE
    }

    /* detach from the list of connections and the timer, then dispose the members */
    h2o_linklist_unlink(&conn->_conns);
    if (h2o_timer_is_linked(&conn->timeout))
        h2o_timer_unlink(&conn->timeout);
    h2o_http3_dispose_conn(&conn->h3);
    kh_destroy(stream, conn->datagram_flows);

    /* at this point, all the counters must be zero ... */
    assert(conn->num_streams.recv_headers == 0);
    assert(conn->num_streams.req_pending == 0);
    assert(conn->num_streams.send_headers == 0);
    assert(conn->num_streams.send_body == 0);
    assert(conn->num_streams.close_wait == 0);
    assert(conn->num_streams_req_streaming == 0);
    assert(conn->num_streams_tunnelling == 0);
    /* ... and all the lists must be empty */
    assert(h2o_linklist_is_empty(&conn->delayed_streams.recv_body_blocked));
    assert(h2o_linklist_is_empty(&conn->delayed_streams.req_streaming));
    assert(h2o_linklist_is_empty(&conn->delayed_streams.pending));
    assert(conn->scheduler.reqs.active.smallest_urgency >= H2O_ABSPRIO_NUM_URGENCY_LEVELS);
    assert(h2o_linklist_is_empty(&conn->scheduler.reqs.conn_blocked));

    /* release the connection object itself */
    free(conn);
}
1776 
/**
 * Initializes the QUIC context `ctx` for use by the HTTP/3 server. This is a thin wrapper around `h2o_quic_init_context` that
 * supplies the QUIC statistics holder of the h2o context `h2o` as the last argument; the other arguments are passed through.
 */
void h2o_http3_server_init_context(h2o_context_t *h2o, h2o_quic_ctx_t *ctx, h2o_loop_t *loop, h2o_socket_t *sock,
                                   quicly_context_t *quic, h2o_quic_accept_cb acceptor,
                                   h2o_quic_notify_connection_update_cb notify_conn_update, uint8_t use_gso)
{
    /* a function returning void cannot have a return statement with an expression in C, hence a plain call */
    h2o_quic_init_context(ctx, loop, sock, quic, acceptor, notify_conn_update, use_gso, &h2o->quic_stats);
}
1783 
h2o_http3_server_accept(h2o_http3_server_ctx_t * ctx,quicly_address_t * destaddr,quicly_address_t * srcaddr,quicly_decoded_packet_t * packet,quicly_address_token_plaintext_t * address_token,int skip_tracing,const h2o_http3_conn_callbacks_t * h3_callbacks)1784 h2o_http3_conn_t *h2o_http3_server_accept(h2o_http3_server_ctx_t *ctx, quicly_address_t *destaddr, quicly_address_t *srcaddr,
1785                                           quicly_decoded_packet_t *packet, quicly_address_token_plaintext_t *address_token,
1786                                           int skip_tracing, const h2o_http3_conn_callbacks_t *h3_callbacks)
1787 {
1788     static const h2o_conn_callbacks_t conn_callbacks = {
1789         .get_sockname = get_sockname,
1790         .get_peername = get_peername,
1791         .get_ptls = get_ptls,
1792         .skip_tracing = get_skip_tracing,
1793         .num_reqs_inflight = num_reqs_inflight,
1794         .get_tracer = get_tracer,
1795         .log_ = {{
1796             .transport =
1797                 {
1798                     .cc_name = log_cc_name,
1799                     .delivery_rate = log_delivery_rate,
1800                 },
1801             .ssl =
1802                 {
1803                     .protocol_version = log_tls_protocol_version,
1804                     .session_reused = log_session_reused,
1805                     .cipher = log_cipher,
1806                     .cipher_bits = log_cipher_bits,
1807                     .session_id = log_session_id,
1808                     .server_name = log_server_name,
1809                     .negotiated_protocol = log_negotiated_protocol,
1810                 },
1811             .http3 =
1812                 {
1813                     .stream_id = log_stream_id,
1814                     .quic_stats = log_quic_stats,
1815                     .quic_version = log_quic_version,
1816                 },
1817         }},
1818     };
1819 
1820     /* setup the structure */
1821     struct st_h2o_http3_server_conn_t *conn = (void *)h2o_create_connection(
1822         sizeof(*conn), ctx->accept_ctx->ctx, ctx->accept_ctx->hosts, h2o_gettimeofday(ctx->accept_ctx->ctx->loop), &conn_callbacks);
1823     h2o_http3_init_conn(&conn->h3, &ctx->super, h3_callbacks, &ctx->qpack);
1824     conn->handshake_properties = (ptls_handshake_properties_t){{{{NULL}}}};
1825     h2o_linklist_init_anchor(&conn->delayed_streams.recv_body_blocked);
1826     h2o_linklist_init_anchor(&conn->delayed_streams.req_streaming);
1827     h2o_linklist_init_anchor(&conn->delayed_streams.pending);
1828     h2o_timer_init(&conn->timeout, run_delayed);
1829     memset(&conn->num_streams, 0, sizeof(conn->num_streams));
1830     conn->num_streams_req_streaming = 0;
1831     conn->num_streams_tunnelling = 0;
1832     req_scheduler_init(&conn->scheduler.reqs);
1833     conn->scheduler.uni.active = 0;
1834     conn->scheduler.uni.conn_blocked = 0;
1835     conn->datagram_flows = kh_init(stream);
1836     conn->_conns = (h2o_linklist_t){};
1837 
1838     /* accept connection */
1839 #if PICOTLS_USE_DTRACE
1840     unsigned orig_skip_tracing = ptls_default_skip_tracing;
1841     ptls_default_skip_tracing = skip_tracing;
1842 #endif
1843     quicly_conn_t *qconn;
1844     int accept_ret = quicly_accept(&qconn, ctx->super.quic, &destaddr->sa, &srcaddr->sa, packet, address_token,
1845                                    &ctx->super.next_cid, &conn->handshake_properties);
1846 #if PICOTLS_USE_DTRACE
1847     ptls_default_skip_tracing = orig_skip_tracing;
1848 #endif
1849     if (accept_ret != 0) {
1850         h2o_http3_conn_t *ret = NULL;
1851         if (accept_ret == QUICLY_ERROR_DECRYPTION_FAILED)
1852             ret = (h2o_http3_conn_t *)H2O_QUIC_ACCEPT_CONN_DECRYPTION_FAILED;
1853         h2o_http3_dispose_conn(&conn->h3);
1854         kh_destroy(stream, conn->datagram_flows);
1855         free(conn);
1856         return ret;
1857     }
1858     if (ctx->super.quic_stats != NULL) {
1859         ++ctx->super.quic_stats->packet_processed;
1860     }
1861     ++ctx->super.next_cid.master_id; /* FIXME check overlap */
1862     h2o_linklist_insert(&ctx->accept_ctx->ctx->http3._conns, &conn->_conns);
1863     h2o_http3_setup(&conn->h3, qconn);
1864 
1865     H2O_PROBE_CONN(H3S_ACCEPT, &conn->super, &conn->super, conn->h3.super.quic, h2o_conn_get_uuid(&conn->super));
1866 
1867     h2o_quic_send(&conn->h3.super);
1868 
1869     return &conn->h3;
1870 }
1871 
/**
 * Adjusts the quicly context `quic` for use by the HTTP/3 server: registers the callbacks defined in this file, and sets the
 * transport parameters and the ACK frequency, some of them being taken from the HTTP/3 section of `conf`.
 */
void h2o_http3_server_amend_quicly_context(h2o_globalconf_t *conf, quicly_context_t *quic)
{
    /* callbacks */
    quic->stream_open = &on_stream_open;
    quic->stream_scheduler = &scheduler;
    quic->receive_datagram_frame = &on_receive_datagram_frame;

    /* flow control and stream limits */
    /* max_data is set to a size that does not block the unblocked request stream */
    quic->transport_params.max_data = conf->http3.active_stream_window_size;
    quic->transport_params.max_stream_data.bidi_remote = H2O_HTTP3_INITIAL_REQUEST_STREAM_WINDOW_SIZE;
    quic->transport_params.max_streams_uni = 10;

    /* timeouts and acks */
    quic->transport_params.max_idle_timeout = conf->http3.idle_timeout;
    if (conf->http3.allow_delayed_ack) {
        quic->transport_params.min_ack_delay_usec = 0;
    } else {
        quic->transport_params.min_ack_delay_usec = UINT64_MAX;
    }
    quic->ack_frequency = conf->http3.ack_frequency;

    /* accept DATAGRAM frames; let the sender determine MTU, instead of being potentially too restrictive */
    quic->transport_params.max_datagram_frame_size = 1500;
}
1887 
/**
 * Timer callback for the last stage of graceful shutdown: closes every connection that is still linked to the context's
 * HTTP/3 connection list, then clears the timer callback.
 *
 * @param entry  the `http3._graceful_shutdown_timeout` timer embedded in the owning `h2o_context_t`
 */
static void graceful_shutdown_close_stragglers(h2o_timer_t *entry)
{
    h2o_context_t *ctx = H2O_STRUCT_FROM_MEMBER(h2o_context_t, http3._graceful_shutdown_timeout, entry);
    h2o_linklist_t *cur = ctx->http3._conns.next;

    /* We've sent two GOAWAY frames, close the remaining connections */
    while (cur != &ctx->http3._conns) {
        /* fetch the successor before closing; presumably the close may unlink `cur` from the list -- confirm */
        h2o_linklist_t *following = cur->next;
        struct st_h2o_http3_server_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, _conns, cur);
        h2o_quic_close_connection(&conn->h3.super, 0, "shutting down");
        cur = following;
    }

    /* a NULL callback marks that no graceful shutdown is in progress (see the check in initiate_graceful_shutdown) */
    ctx->http3._graceful_shutdown_timeout.cb = NULL;
}
1902 
/**
 * Timer callback for the second stage of graceful shutdown. Sends a second GOAWAY frame, carrying an actual stream ID, on
 * every connection that is still open, and marks those connections as half-closed. If at least one GOAWAY was sent and a
 * graceful shutdown timeout is configured, the timer is re-armed to run `graceful_shutdown_close_stragglers`; otherwise the
 * timer callback is cleared.
 *
 * @param entry  the `http3._graceful_shutdown_timeout` timer embedded in the owning `h2o_context_t`
 */
static void graceful_shutdown_resend_goaway(h2o_timer_t *entry)
{
    h2o_context_t *ctx = H2O_STRUCT_FROM_MEMBER(h2o_context_t, http3._graceful_shutdown_timeout, entry);
    int goaway_sent = 0;

    /* HTTP/3 draft section 5.2.8 --
     * "After allowing time for any in-flight requests or pushes to arrive, the endpoint can send another GOAWAY frame
     * indicating which requests or pushes it might accept before the end of the connection.
     * This ensures that a connection can be cleanly shut down without losing requests." */
    for (h2o_linklist_t *link = ctx->http3._conns.next; link != &ctx->http3._conns; link = link->next) {
        struct st_h2o_http3_server_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, _conns, link);

        /* skip connections that are already half-closed (or further), or whose QUIC connection is not in connected state */
        if (conn->h3.state >= H2O_HTTP3_CONN_STATE_HALF_CLOSED)
            continue;
        if (quicly_get_state(conn->h3.super.quic) != QUICLY_STATE_CONNECTED)
            continue;

        quicly_stream_id_t next_id = quicly_get_remote_next_stream_id(conn->h3.super.quic, 0 /* == bidi */);
        /* Section 5.2-1: "This identifier MAY be zero if no requests or pushes were processed." */
        quicly_stream_id_t goaway_id = 0; /* stays zero when we haven't received any stream yet */
        if (next_id >= 4)
            goaway_id = next_id - 4;

        h2o_http3_send_goaway_frame(&conn->h3, goaway_id);
        conn->h3.state = H2O_HTTP3_CONN_STATE_HALF_CLOSED;
        goaway_sent = 1;
    }

    /* nothing left to wait for, or no final timeout configured: shutdown sequence ends here */
    if (!goaway_sent || ctx->globalconf->http3.graceful_shutdown_timeout <= 0) {
        ctx->http3._graceful_shutdown_timeout.cb = NULL;
        return;
    }

    /* After waiting a second, we still had active connections. As configured, wait one final timeout before closing the
     * connections */
    ctx->http3._graceful_shutdown_timeout.cb = graceful_shutdown_close_stragglers;
    h2o_timer_link(ctx->loop, ctx->globalconf->http3.graceful_shutdown_timeout, &ctx->http3._graceful_shutdown_timeout);
}
1934 
/**
 * First stage of graceful shutdown. Sends a GOAWAY frame carrying the maximum possible stream ID on every open connection
 * of `ctx`, then arms the shutdown timer so that `graceful_shutdown_resend_goaway` runs next. Does nothing if the timer
 * callback is already set, i.e., a shutdown is already in progress.
 *
 * @param ctx  the context whose HTTP/3 connections are to be shut down
 */
static void initiate_graceful_shutdown(h2o_context_t *ctx)
{
    /* only do it once */
    if (ctx->http3._graceful_shutdown_timeout.cb != NULL)
        return;
    ctx->http3._graceful_shutdown_timeout.cb = graceful_shutdown_resend_goaway;

    for (h2o_linklist_t *link = ctx->http3._conns.next; link != &ctx->http3._conns; link = link->next) {
        struct st_h2o_http3_server_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, _conns, link);

        /* There is a moment where the control stream is already closed while st_h2o_http3_server_conn_t is not.
         * Check QUIC connection state to skip sending GOAWAY in such a case. */
        int can_send_goaway =
            conn->h3.state < H2O_HTTP3_CONN_STATE_HALF_CLOSED && quicly_get_state(conn->h3.super.quic) == QUICLY_STATE_CONNECTED;
        if (!can_send_goaway)
            continue;

        /* advertise the maximum stream ID to indicate that we will no longer accept new requests.
         * HTTP/3 draft section 5.2.8 --
         * "An endpoint that is attempting to gracefully shut down a connection can send a GOAWAY frame with a value set to the
         * maximum possible value (2^62-4 for servers, 2^62-1 for clients). This ensures that the peer stops creating new
         * requests or pushes." */
        h2o_http3_send_goaway_frame(&conn->h3, (UINT64_C(1) << 62) - 4);
    }

    /* schedule the second GOAWAY */
    h2o_timer_link(ctx->loop, 1000, &ctx->http3._graceful_shutdown_timeout);
}
1959 
/**
 * Bundles the arguments of `foreach_request` so that they can be handed to `foreach_request_per_conn` through the single
 * opaque pointer accepted by `quicly_foreach_stream`.
 */
struct foreach_request_ctx {
    /* user-supplied callback, invoked once per request */
    int (*cb)(h2o_req_t *req, void *cbdata);
    /* opaque pointer passed through to `cb` unchanged */
    void *cbdata;
};
1964 
/**
 * Per-stream callback handed to `quicly_foreach_stream`. Invokes the user callback for the request bound to `qs`, provided
 * that `qs` is a client-initiated bidirectional stream and the request is not in CLOSE_WAIT state.
 *
 * @param _ctx  pointer to the `struct foreach_request_ctx` holding the user callback and its argument
 * @param qs    the QUIC stream being visited
 * @return 0 when the stream is skipped, otherwise the value returned by the user callback
 */
static int foreach_request_per_conn(void *_ctx, quicly_stream_t *qs)
{
    struct foreach_request_ctx *thunk = _ctx;

    /* skip if the stream is not a request stream (TODO handle push?) */
    if (!quicly_stream_is_client_initiated(qs->stream_id) || quicly_stream_is_unidirectional(qs->stream_id))
        return 0;

    struct st_h2o_http3_server_stream_t *stream = qs->data;
    assert(stream->quic == qs);

    /* streams in CLOSE_WAIT are not reported to the callback */
    return stream->state == H2O_HTTP3_SERVER_STREAM_STATE_CLOSE_WAIT ? 0 : thunk->cb(&stream->req, thunk->cbdata);
}
1980 
/**
 * Invokes `cb` for each HTTP/3 request that belongs to the connections of `ctx`. Iteration stops at the first non-zero value
 * returned by `cb`, and that value becomes the return value of this function; previously the value was dropped, so the walk
 * continued with the next connection and the caller could not tell that the callback had asked to stop.
 *
 * @param ctx     the context whose HTTP/3 connections are to be walked
 * @param cb      callback to be invoked per request; returning non-zero aborts the iteration
 * @param cbdata  opaque pointer passed through to `cb`
 * @return 0 if all the requests have been visited, otherwise the first non-zero value returned by `cb`
 */
static int foreach_request(h2o_context_t *ctx, int (*cb)(h2o_req_t *req, void *cbdata), void *cbdata)
{
    struct foreach_request_ctx foreach_ctx = {.cb = cb, .cbdata = cbdata};

    for (h2o_linklist_t *node = ctx->http3._conns.next; node != &ctx->http3._conns; node = node->next) {
        struct st_h2o_http3_server_conn_t *conn = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http3_server_conn_t, _conns, node);
        /* quicly_foreach_stream stops at, and returns, the first non-zero value returned by the per-stream callback */
        int ret = quicly_foreach_stream(conn->h3.super.quic, &foreach_ctx, foreach_request_per_conn);
        if (ret != 0)
            return ret;
    }

    return 0;
}
1992 
/* protocol-level callbacks of the HTTP/3 server; initialized positionally with the graceful-shutdown initiator and the
 * request iterator defined above */
const h2o_protocol_callbacks_t H2O_HTTP3_SERVER_CALLBACKS = {initiate_graceful_shutdown, foreach_request};
/* connection-level callbacks of the HTTP/3 server; initialized positionally with `on_h3_destroy` (in the nested first member)
 * and the control stream frame handler */
const h2o_http3_conn_callbacks_t H2O_HTTP3_CONN_CALLBACKS = {{on_h3_destroy}, handle_control_stream_frame};
1995