1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <stddef.h>
18 #include <apr_strings.h>
19 #include <nghttp2/nghttp2.h>
20 
21 #include <mpm_common.h>
22 #include <httpd.h>
23 #include <mod_proxy.h>
24 
25 #include "mod_http2.h"
26 #include "h2.h"
27 #include "h2_proxy_util.h"
28 #include "h2_proxy_session.h"
29 
30 APLOG_USE_MODULE(proxy_http2);
31 
32 typedef struct h2_proxy_stream {
33     int id;
34     apr_pool_t *pool;
35     h2_proxy_session *session;
36 
37     const char *url;
38     request_rec *r;
39     h2_proxy_request *req;
40     const char *real_server_uri;
41     const char *p_server_uri;
42     int standalone;
43 
44     h2_proxy_stream_state_t state;
45     unsigned int suspended : 1;
46     unsigned int waiting_on_100 : 1;
47     unsigned int waiting_on_ping : 1;
48     unsigned int headers_ended : 1;
49     uint32_t error_code;
50 
51     apr_bucket_brigade *input;
52     apr_off_t data_sent;
53     apr_bucket_brigade *output;
54     apr_off_t data_received;
55 
56     apr_table_t *saves;
57 } h2_proxy_stream;
58 
59 
60 static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
61                            int arg, const char *msg);
62 static void ping_arrived(h2_proxy_session *session);
63 static apr_status_t check_suspended(h2_proxy_session *session);
64 static void stream_resume(h2_proxy_stream *stream);
65 static apr_status_t submit_trailers(h2_proxy_stream *stream);
66 
67 /*
 * The H2_PING connection sub-state: a state independent of the H2_SESSION state
69  * of the connection:
70  * - H2_PING_ST_NONE: no interference with request handling, ProxyTimeout in effect.
71  *   When entered, all suspended streams are unsuspended again.
72  * - H2_PING_ST_AWAIT_ANY: new requests are suspended, a possibly configured "ping"
73  *   timeout is in effect. Any frame received transits to H2_PING_ST_NONE.
74  * - H2_PING_ST_AWAIT_PING: same as above, but only a PING frame transits
75  *   to H2_PING_ST_NONE.
76  *
77  * An AWAIT state is entered on a new connection or when re-using a connection and
78  * the last frame received has been some time ago. The latter sends a PING frame
79  * and insists on an answer, the former is satisfied by any frame received from the
80  * backend.
81  *
82  * This works for new connections as there is always at least one SETTINGS frame
 * that the backend sends. When re-using a connection, we send a PING and insist on
84  * receiving one back, as there might be frames in our connection buffers from
85  * some time ago. Since some servers have protections against PING flooding, we
86  * only ever have one PING unanswered.
87  *
88  * Requests are suspended while in a PING state, as we do not want to send data
89  * before we can be reasonably sure that the connection is working (at least on
90  * the h2 protocol level). This also means that the session can do blocking reads
91  * when expecting PING answers.
92  */
/*
 * Put the session's connection socket under the ping timeout and remember
 * the timeout that was in effect before.
 *
 * Nothing happens when no ping timeout is set (-1), when a previous timeout
 * is already saved (save_timeout != -1) or when the connection has no socket.
 */
static void set_ping_timeout(h2_proxy_session *session)
{
    apr_socket_t *sock;

    if (session->ping_timeout == -1 || session->save_timeout != -1) {
        return;
    }

    sock = ap_get_conn_socket(session->c);
    if (!sock) {
        return;
    }

    /* save first, then override */
    apr_socket_timeout_get(sock, &session->save_timeout);
    apr_socket_timeout_set(sock, session->ping_timeout);
}
105 
/*
 * Restore the socket timeout saved by set_ping_timeout(), if there is one,
 * and mark it as no longer saved (-1). The saved value is kept when the
 * connection has no socket to apply it to.
 */
static void unset_ping_timeout(h2_proxy_session *session)
{
    apr_socket_t *sock;

    if (session->save_timeout == -1) {
        return;
    }

    sock = ap_get_conn_socket(session->c);
    if (!sock) {
        return;
    }

    apr_socket_timeout_set(sock, session->save_timeout);
    session->save_timeout = -1;
}
118 
/*
 * Move the session's ping sub-state to `state`.
 *
 * Allowed transitions are NONE -> AWAIT_* and AWAIT_* -> NONE; a request to
 * switch between two AWAIT states, or to the current state, is ignored.
 */
static void enter_ping_state(h2_proxy_session *session, h2_ping_state_t state)
{
    if (state == session->ping_state) {
        return;
    }

    if (session->ping_state == H2_PING_ST_NONE) {
        /* leaving NONE: send a PING when an answer is insisted on, then
         * put the ping timeout in force */
        if (state == H2_PING_ST_AWAIT_PING) {
            unset_ping_timeout(session);
            nghttp2_submit_ping(session->ngh2, 0, (const uint8_t *)"nevergonnagiveyouup");
        }
        set_ping_timeout(session);
        session->ping_state = state;
    }
    else if (state == H2_PING_ST_NONE) {
        /* back to NONE: restore the regular timeout and announce it */
        session->ping_state = state;
        unset_ping_timeout(session);
        ping_arrived(session);
    }
    /* else: no switching between the != NONE states */
}
142 
/*
 * Initialize the ping bookkeeping of a freshly created session and start
 * waiting for any first frame from the backend.
 *
 * The ping timeout is taken from the worker when it has one set, otherwise
 * it is -1 (none).
 */
static void ping_new_session(h2_proxy_session *session, proxy_conn_rec *p_conn)
{
    session->save_timeout = -1;
    if (p_conn->worker->s->ping_timeout_set) {
        session->ping_timeout = p_conn->worker->s->ping_timeout;
    }
    else {
        session->ping_timeout = -1;
    }

    /* start from NONE so that the transition below takes effect */
    session->ping_state = H2_PING_ST_NONE;
    enter_ping_state(session, H2_PING_ST_AWAIT_ANY);
}
151 
/*
 * Called when an existing session is used again: if no ping state is active
 * and the last frame was received more than one second ago, require a PING
 * answer before continuing.
 */
static void ping_reuse_session(h2_proxy_session *session)
{
    apr_interval_time_t idle;

    if (session->ping_state != H2_PING_ST_NONE) {
        return;
    }

    idle = apr_time_now() - session->last_frame_received;
    if (idle > apr_time_from_sec(1)) {
        enter_ping_state(session, H2_PING_ST_AWAIT_PING);
    }
}
161 
/*
 * Record the arrival time of a frame and leave the ping sub-state when the
 * frame satisfies it: any frame ends AWAIT_ANY, only a PING frame ends
 * AWAIT_PING.
 */
static void ping_ev_frame_received(h2_proxy_session *session, const nghttp2_frame *frame)
{
    session->last_frame_received = apr_time_now();

    /* While awaiting a PING we may receive many other frames. They may all
     * come from our connection buffers and say nothing about the current
     * state of the backend, so they are not accepted as an answer. */
    if (session->ping_state == H2_PING_ST_AWAIT_ANY
        || (session->ping_state == H2_PING_ST_AWAIT_PING
            && frame->hd.type == NGHTTP2_PING)) {
        enter_ping_state(session, H2_PING_ST_NONE);
    }
}
182 
proxy_session_pre_close(void * theconn)183 static apr_status_t proxy_session_pre_close(void *theconn)
184 {
185     proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn;
186     h2_proxy_session *session = p_conn->data;
187 
188     if (session && session->ngh2) {
189         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
190                       "proxy_session(%s): pool cleanup, state=%d, streams=%d",
191                       session->id, session->state,
192                       (int)h2_proxy_ihash_count(session->streams));
193         session->aborted = 1;
194         dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
195         nghttp2_session_del(session->ngh2);
196         session->ngh2 = NULL;
197         p_conn->data = NULL;
198     }
199     return APR_SUCCESS;
200 }
201 
/*
 * Send brigade `bb` down the output filters of connection `origin`.
 *
 * When `flush` is non-zero a FLUSH bucket is appended first. The brigade
 * length, if determinable, is added to the worker's `transferred` counter.
 * The brigade is emptied afterwards, whatever the outcome.
 *
 * Returns the status of ap_pass_brigade().
 */
static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
                              proxy_conn_rec *p_conn,
                              conn_rec *origin, apr_bucket_brigade *bb,
                              int flush)
{
    apr_off_t nbytes;
    apr_status_t rv;

    if (flush) {
        apr_bucket *fb = apr_bucket_flush_create(bucket_alloc);
        APR_BRIGADE_INSERT_TAIL(bb, fb);
    }

    apr_brigade_length(bb, 0, &nbytes);
    if (nbytes != -1) {
        p_conn->worker->s->transferred += nbytes;
    }

    rv = ap_pass_brigade(origin->output_filters, bb);
    /* Empty the brigade right away so that no bucket outlives its data,
     * in particular when an error is reported below. */
    apr_brigade_cleanup(bb);

    if (rv != APR_SUCCESS) {
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, origin, APLOGNO(03357)
                      "pass output failed to %pI (%s)",
                      p_conn->addr, p_conn->hostname);
    }
    return rv;
}
228 
/*
 * nghttp2 send callback: write `length` bytes at `data` to the backend
 * connection, always flushing.
 *
 * user_data is the h2_proxy_session. Returns `length` when the brigade was
 * passed successfully, NGHTTP2_ERR_CALLBACK_FAILURE otherwise.
 */
static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
                        size_t length, int flags, void *user_data)
{
    h2_proxy_session *session = user_data;
    int flush = 1;
    apr_status_t rv;

    if (data) {
        /* transient: the bytes stay owned by nghttp2; the brigade is
         * emptied again inside proxy_pass_brigade() */
        apr_bucket *chunk = apr_bucket_transient_create((const char*)data, length,
                                                        session->c->bucket_alloc);
        APR_BRIGADE_INSERT_TAIL(session->output, chunk);
    }

    rv = proxy_pass_brigade(session->c->bucket_alloc,
                            session->p_conn, session->c,
                            session->output, flush);
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, session->c,
                  "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d",
                  session->id, (int)length, flush);

    if (rv == APR_SUCCESS) {
        return length;
    }
    return NGHTTP2_ERR_CALLBACK_FAILURE;
}
254 
/*
 * nghttp2 callback invoked for every frame received from the backend.
 *
 * Logs the frame at debug level, feeds it to the ping state handling and
 * then acts on the frame type:
 *  - HEADERS:  handles interim (1xx) responses and resumes the stream
 *  - SETTINGS: picks up the remote's max concurrent streams setting
 *  - GOAWAY:   records the last stream id and dispatches REMOTE_GOAWAY
 * All other frame types need no action here.
 *
 * Returns 0, or NGHTTP2_ERR_CALLBACK_FAILURE when a HEADERS frame arrives
 * for a stream without user data.
 */
static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
                         void *user_data)
{
    h2_proxy_session *session = user_data;

    if (APLOGcdebug(session->c)) {
        char text[256];

        h2_proxy_util_frame_print(frame, text, sizeof(text)/sizeof(text[0]));
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03341)
                      "h2_proxy_session(%s): recv FRAME[%s]",
                      session->id, text);
    }

    ping_ev_frame_received(session, frame);

    switch (frame->hd.type) {
        case NGHTTP2_HEADERS: {
            h2_proxy_stream *stream;
            request_rec *r;

            stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
            if (!stream) {
                return NGHTTP2_ERR_CALLBACK_FAILURE;
            }
            r = stream->r;
            if (r->status >= 100 && r->status < 200) {
                /* By default, all interim responses are forwarded when the
                 * client side connection is HTTP/2 as well */
                int forward = session->h2_front;

                if (r->status == 100) {
                    /* a 100 the stream was waiting for is always forwarded */
                    if (stream->waiting_on_100) {
                        stream->waiting_on_100 = 0;
                        r->status_line = ap_get_status_line(r->status);
                        forward = 1;
                    }
                }
                else if (r->status == 103) {
                    /* workaround until this is part of the http protocol
                     * base parts. Without it, unknown codes are converted
                     * to 500... */
                    r->status_line = "103 Early Hints";
                }
                else {
                    r->status_line = ap_get_status_line(r->status);
                }

                ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03487)
                              "h2_proxy_session(%s): got interim HEADERS, "
                              "status=%d, will forward=%d",
                              session->id, r->status, forward);
                if (forward) {
                    ap_send_interim_response(r, 1);
                }
            }
            stream_resume(stream);
            break;
        }
        case NGHTTP2_SETTINGS:
            if (frame->settings.niv > 0) {
                int n = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
                if (n > 0) {
                    session->remote_max_concurrent = n;
                }
            }
            break;
        case NGHTTP2_GOAWAY:
            /* the remote server is expected to tell us the highest stream
             * id that it has started processing. */
            session->last_stream_id = frame->goaway.last_stream_id;
            dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL);
            break;
        default:
            /* PING, PUSH_PROMISE and everything else: nothing to do */
            break;
    }
    return 0;
}
336 
/*
 * nghttp2 callback invoked before a frame is sent: logs the frame when
 * debug logging is enabled for the connection. Always returns 0.
 */
static int before_frame_send(nghttp2_session *ngh2,
                             const nghttp2_frame *frame, void *user_data)
{
    h2_proxy_session *session = user_data;
    char text[256];

    if (!APLOGcdebug(session->c)) {
        return 0;
    }

    h2_proxy_util_frame_print(frame, text, sizeof(text)/sizeof(text[0]));
    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03343)
                  "h2_proxy_session(%s): sent FRAME[%s]",
                  session->id, text);
    return 0;
}
351 
add_header(void * table,const char * n,const char * v)352 static int add_header(void *table, const char *n, const char *v)
353 {
354     apr_table_add(table, n, v);
355     return 1;
356 }
357 
/*
 * Add the backend response header (n, v) to `headers`, reverse-mapping its
 * value where applicable.
 *
 * Unless the request's proxy directory config has preserve_host set:
 *  - Location, Content-Location, URI, Destination and Set-Cookie values are
 *    passed through the matching ap_proxy_*_reverse_map() function,
 *  - Link values are passed through h2_proxy_link_reverse_map() together
 *    with the stream's real_server_uri and p_server_uri.
 * Every other header, and every header when preserve_host is set, is added
 * unchanged. Header names are compared case-insensitively.
 */
static void process_proxy_header(apr_table_t *headers, h2_proxy_stream *stream,
                                 const char *n, const char *v)
{
    static const struct {
        const char *name;
        ap_proxy_header_reverse_map_fn func;
    } transform_hdrs[] = {
        { "Location", ap_proxy_location_reverse_map },
        { "Content-Location", ap_proxy_location_reverse_map },
        { "URI", ap_proxy_location_reverse_map },
        { "Destination", ap_proxy_location_reverse_map },
        { "Set-Cookie", ap_proxy_cookie_reverse_map },
        { NULL, NULL }   /* sentinel terminating the loop below */
    };
    request_rec *r = stream->r;
    proxy_dir_conf *dconf;
    int i;

    dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
    if (!dconf->preserve_host) {
        for (i = 0; transform_hdrs[i].name; ++i) {
            if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) {
                apr_table_add(headers, n, (*transform_hdrs[i].func)(r, dconf, v));
                return;
            }
        }
        if (!ap_cstr_casecmp("Link", n)) {
            /* dconf looked up above is reused; it was previously fetched a
             * second time here for no effect */
            apr_table_add(headers, n, h2_proxy_link_reverse_map(r, dconf,
                            stream->real_server_uri, stream->p_server_uri, v));
            return;
        }
    }
    apr_table_add(headers, n, v);
}
393 
h2_proxy_stream_add_header_out(h2_proxy_stream * stream,const char * n,apr_size_t nlen,const char * v,apr_size_t vlen)394 static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
395                                                    const char *n, apr_size_t nlen,
396                                                    const char *v, apr_size_t vlen)
397 {
398     if (n[0] == ':') {
399         if (!stream->data_received && !strncmp(":status", n, nlen)) {
400             char *s = apr_pstrndup(stream->r->pool, v, vlen);
401 
402             apr_table_setn(stream->r->notes, "proxy-status", s);
403             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
404                           "h2_proxy_stream(%s-%d): got status %s",
405                           stream->session->id, stream->id, s);
406             stream->r->status = (int)apr_atoi64(s);
407             if (stream->r->status <= 0) {
408                 stream->r->status = 500;
409                 return APR_EGENERAL;
410             }
411         }
412         return APR_SUCCESS;
413     }
414 
415     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
416                   "h2_proxy_stream(%s-%d): on_header %s: %s",
417                   stream->session->id, stream->id, n, v);
418     if (!h2_proxy_res_ignore_header(n, nlen)) {
419         char *hname, *hvalue;
420         apr_table_t *headers = (stream->headers_ended?
421                                stream->r->trailers_out : stream->r->headers_out);
422 
423         hname = apr_pstrndup(stream->pool, n, nlen);
424         h2_proxy_util_camel_case_header(hname, nlen);
425         hvalue = apr_pstrndup(stream->pool, v, vlen);
426 
427         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
428                       "h2_proxy_stream(%s-%d): got header %s: %s",
429                       stream->session->id, stream->id, hname, hvalue);
430         process_proxy_header(headers, stream, hname, hvalue);
431     }
432     return APR_SUCCESS;
433 }
434 
log_header(void * ctx,const char * key,const char * value)435 static int log_header(void *ctx, const char *key, const char *value)
436 {
437     h2_proxy_stream *stream = ctx;
438     ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
439                   "h2_proxy_stream(%s-%d), header_out %s: %s",
440                   stream->session->id, stream->id, key, value);
441     return 1;
442 }
443 
/*
 * Finish the response headers of `stream` before the response is passed on:
 *  - combine the response's Set-Cookie headers with the stream's saved table
 *    and overlay that table onto r->headers_out,
 *  - add a "Via" header unless the Via option is off or block,
 *  - mark the headers as ended when the status is final (>= 200),
 *  - trace-log the resulting headers.
 */
static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
{
    h2_proxy_session *session = stream->session;
    request_rec *r = stream->r;
    apr_pool_t *p = r->pool;

    /* append the cookies of this response to the ones already saved ... */
    apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);

    /* ... and load the saved table into the response headers */
    if (!apr_is_empty_table(stream->saves)) {
        apr_table_unset(r->headers_out, "Set-Cookie");
        r->headers_out = apr_table_overlay(p, r->headers_out, stream->saves);
    }

    /* handle Via header in response */
    if (session->conf->viaopt != via_off
        && session->conf->viaopt != via_block) {
        const char *server_name = ap_get_server_name(r);
        apr_port_t port = ap_get_server_port(r);
        char portstr[32];
        const char *via;

        /* If USE_CANONICAL_NAME_OFF was configured for the proxy virtual
         * host, ap_get_server_name() returns the origin server name, which
         * makes no sense in a Via: header. Use the proxy vhost's name then.
         */
        if (server_name == r->hostname) {
            server_name = r->server->server_hostname;
        }

        if (ap_is_default_port(port, r)) {
            portstr[0] = '\0';
        }
        else {
            apr_snprintf(portstr, sizeof(portstr), ":%d", port);
        }

        /* the full variant also names the server version */
        if (session->conf->viaopt == via_full) {
            via = apr_psprintf(p, "%d.%d %s%s (%s)",
                               HTTP_VERSION_MAJOR(r->proto_num),
                               HTTP_VERSION_MINOR(r->proto_num),
                               server_name, portstr,
                               AP_SERVER_BASEVERSION);
        }
        else {
            via = apr_psprintf(p, "%d.%d %s%s",
                               HTTP_VERSION_MAJOR(r->proto_num),
                               HTTP_VERSION_MINOR(r->proto_num),
                               server_name, portstr);
        }
        apr_table_add(r->headers_out, "Via", via);
    }

    if (r->status >= 200) {
        stream->headers_ended = 1;
    }

    if (APLOGrtrace2(r)) {
        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
                      "h2_proxy_stream(%s-%d), header_out after merging",
                      stream->session->id, stream->id);
        apr_table_do(log_header, stream, r->headers_out, NULL);
    }
}
504 
/*
 * nghttp2 callback invoked for each chunk of response DATA from the backend.
 *
 * On the first chunk of a stream the response headers are finalized. The
 * chunk is then passed, followed by a FLUSH bucket, down the request's
 * output filters.
 *
 * user_data is the h2_proxy_session. Returns 0 on success and for chunks of
 * unknown streams (which are ignored). When passing the data fails, a
 * RST_STREAM is submitted and NGHTTP2_ERR_STREAM_CLOSING is returned.
 */
static int stream_response_data(nghttp2_session *ngh2, uint8_t flags,
                                int32_t stream_id, const uint8_t *data,
                                size_t len, void *user_data)
{
    h2_proxy_session *session = user_data;
    h2_proxy_stream *stream;
    apr_bucket *b;
    apr_status_t status;

    stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
    if (!stream) {
        ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03358)
                     "h2_proxy_session(%s): recv data chunk for "
                     "unknown stream %d, ignored",
                     session->id, stream_id);
        return 0;
    }

    if (!stream->data_received) {
        /* last chance to manipulate response headers.
         * after this, only trailers */
        h2_proxy_stream_end_headers_out(stream);
    }
    stream->data_received += len;

    /* transient: the bytes are not copied and remain owned by nghttp2 */
    b = apr_bucket_transient_create((const char*)data, len,
                                    stream->r->connection->bucket_alloc);
    APR_BRIGADE_INSERT_TAIL(stream->output, b);
    /* always flush after a DATA frame, as we have no other indication
     * of buffer use */
    b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
    APR_BRIGADE_INSERT_TAIL(stream->output, b);

    status = ap_pass_brigade(stream->r->output_filters, stream->output);
    /* Empty the brigade now, as proxy_pass_brigade() does: after a failed
     * pass it may still hold the transient bucket, whose data is only valid
     * during this callback, and the brigade is used again for later
     * chunks. */
    apr_brigade_cleanup(stream->output);
    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03359)
                  "h2_proxy_session(%s): stream=%d, response DATA %ld, %ld"
                  " total", session->id, stream_id, (long)len,
                  (long)stream->data_received);
    if (status != APR_SUCCESS) {
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344)
                      "h2_proxy_session(%s): passing output on stream %d",
                      session->id, stream->id);
        nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
                                  stream_id, NGHTTP2_STREAM_CLOSED);
        return NGHTTP2_ERR_STREAM_CLOSING;
    }
    return 0;
}
553 
/*
 * nghttp2 callback invoked when a stream is closed.
 *
 * Unless the session is aborted, the error code is stored on the stream
 * (when it is still known to the session) and the STREAM_DONE event is
 * dispatched with the stream id. Always returns 0.
 */
static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
                           uint32_t error_code, void *user_data)
{
    h2_proxy_session *session = user_data;
    h2_proxy_stream *closed;

    if (session->aborted) {
        return 0;
    }

    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03360)
                  "h2_proxy_session(%s): stream=%d, closed, err=%d",
                  session->id, stream_id, error_code);

    closed = h2_proxy_ihash_get(session->streams, stream_id);
    if (closed) {
        closed->error_code = error_code;
    }
    dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
    return 0;
}
571 
/*
 * nghttp2 callback invoked for each header field received.
 *
 * Fields of HEADERS frames with a non-empty name are handed to the stream
 * they belong to, if that stream is known. Fields of any other frame type,
 * PUSH_PROMISE included, are not processed.
 *
 * Returns 0, or NGHTTP2_ERR_CALLBACK_FAILURE when the stream rejects the
 * header.
 */
static int on_header(nghttp2_session *ngh2, const nghttp2_frame *frame,
                     const uint8_t *namearg, size_t nlen,
                     const uint8_t *valuearg, size_t vlen, uint8_t flags,
                     void *user_data)
{
    h2_proxy_stream *stream;

    (void)user_data;
    if (frame->hd.type != NGHTTP2_HEADERS || !nlen) {
        return 0;
    }

    stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
    if (!stream) {
        return 0;
    }

    if (h2_proxy_stream_add_header_out(stream, (const char*)namearg, nlen,
                                       (const char*)valuearg, vlen)) {
        return NGHTTP2_ERR_CALLBACK_FAILURE;
    }
    return 0;
}
596 
stream_request_data(nghttp2_session * ngh2,int32_t stream_id,uint8_t * buf,size_t length,uint32_t * data_flags,nghttp2_data_source * source,void * user_data)597 static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id,
598                                    uint8_t *buf, size_t length,
599                                    uint32_t *data_flags,
600                                    nghttp2_data_source *source, void *user_data)
601 {
602     h2_proxy_stream *stream;
603     apr_status_t status = APR_SUCCESS;
604 
605     *data_flags = 0;
606     stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
607     if (!stream) {
608         ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03361)
609                      "h2_proxy_stream(NULL): data_read, stream %d not found",
610                      stream_id);
611         return NGHTTP2_ERR_CALLBACK_FAILURE;
612     }
613 
614     if (stream->session->ping_state != H2_PING_ST_NONE) {
615         /* suspend until we hear from the other side */
616         stream->waiting_on_ping = 1;
617         status = APR_EAGAIN;
618     }
619     else if (stream->r->expecting_100) {
620         /* suspend until the answer comes */
621         stream->waiting_on_100 = 1;
622         status = APR_EAGAIN;
623     }
624     else if (APR_BRIGADE_EMPTY(stream->input)) {
625         status = ap_get_brigade(stream->r->input_filters, stream->input,
626                                 AP_MODE_READBYTES, APR_NONBLOCK_READ,
627                                 H2MAX(APR_BUCKET_BUFF_SIZE, length));
628         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
629                       "h2_proxy_stream(%s-%d): request body read",
630                       stream->session->id, stream->id);
631     }
632 
633     if (status == APR_SUCCESS) {
634         ssize_t readlen = 0;
635         while (status == APR_SUCCESS
636                && (readlen < length)
637                && !APR_BRIGADE_EMPTY(stream->input)) {
638             apr_bucket* b = APR_BRIGADE_FIRST(stream->input);
639             if (APR_BUCKET_IS_METADATA(b)) {
640                 if (APR_BUCKET_IS_EOS(b)) {
641                     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
642                 }
643                 else {
644                     /* we do nothing more regarding any meta here */
645                 }
646             }
647             else {
648                 const char *bdata = NULL;
649                 apr_size_t blen = 0;
650                 status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
651 
652                 if (status == APR_SUCCESS && blen > 0) {
653                     ssize_t copylen = H2MIN(length - readlen, blen);
654                     memcpy(buf, bdata, copylen);
655                     buf += copylen;
656                     readlen += copylen;
657                     if (copylen < blen) {
658                         /* We have data left in the bucket. Split it. */
659                         status = apr_bucket_split(b, copylen);
660                     }
661                 }
662             }
663             apr_bucket_delete(b);
664         }
665 
666         stream->data_sent += readlen;
667         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03468)
668                       "h2_proxy_stream(%d): request DATA %ld, %ld"
669                       " total, flags=%d", stream->id, (long)readlen, (long)stream->data_sent,
670                       (int)*data_flags);
671         if ((*data_flags & NGHTTP2_DATA_FLAG_EOF) && !apr_is_empty_table(stream->r->trailers_in)) {
672             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(10179)
673                           "h2_proxy_stream(%d): submit trailers", stream->id);
674             *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
675             submit_trailers(stream);
676         }
677         return readlen;
678     }
679     else if (APR_STATUS_IS_EAGAIN(status)) {
680         /* suspended stream, needs to be re-awakened */
681         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
682                       "h2_proxy_stream(%s-%d): suspending",
683                       stream->session->id, stream_id);
684         stream->suspended = 1;
685         h2_proxy_iq_add(stream->session->suspended, stream->id, NULL, NULL);
686         return NGHTTP2_ERR_DEFERRED;
687     }
688     else {
689         nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
690                                   stream_id, NGHTTP2_STREAM_CLOSED);
691         return NGHTTP2_ERR_STREAM_CLOSING;
692     }
693 }
694 
#ifdef H2_NG2_INVALID_HEADER_CB
/*
 * nghttp2 callback invoked for a header field that nghttp2 considers
 * invalid: log it at debug level and reset the stream with PROTOCOL_ERROR.
 *
 * Returns the result of nghttp2_submit_rst_stream().
 */
static int on_invalid_header_cb(nghttp2_session *ngh2,
                                const nghttp2_frame *frame,
                                const uint8_t *name, size_t namelen,
                                const uint8_t *value, size_t valuelen,
                                uint8_t flags, void *user_data)
{
    h2_proxy_session *session = user_data;

    if (APLOGcdebug(session->c)) {
        /* name and value come with explicit lengths; make C strings */
        const char *hname = apr_pstrndup(session->pool, (const char *)name, namelen);
        const char *hvalue = apr_pstrndup(session->pool, (const char *)value, valuelen);

        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03469)
                      "h2_proxy_session(%s-%d): denying stream with invalid header "
                      "'%s: %s'", session->id, (int)frame->hd.stream_id,
                      hname, hvalue);
    }
    return nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
                                     frame->hd.stream_id,
                                     NGHTTP2_PROTOCOL_ERROR);
}
#endif
715 
h2_proxy_session_setup(const char * id,proxy_conn_rec * p_conn,proxy_server_conf * conf,int h2_front,unsigned char window_bits_connection,unsigned char window_bits_stream,h2_proxy_request_done * done)716 h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
717                                          proxy_server_conf *conf,
718                                          int h2_front,
719                                          unsigned char window_bits_connection,
720                                          unsigned char window_bits_stream,
721                                          h2_proxy_request_done *done)
722 {
723     if (!p_conn->data) {
724         apr_pool_t *pool = p_conn->scpool;
725         h2_proxy_session *session;
726         nghttp2_session_callbacks *cbs;
727         nghttp2_option *option;
728 
729         session = apr_pcalloc(pool, sizeof(*session));
730         apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
731         p_conn->data = session;
732 
733         session->id = apr_pstrdup(p_conn->scpool, id);
734         session->c = p_conn->connection;
735         session->p_conn = p_conn;
736         session->conf = conf;
737         session->pool = p_conn->scpool;
738         session->state = H2_PROXYS_ST_INIT;
739         session->h2_front = h2_front;
740         session->window_bits_stream = window_bits_stream;
741         session->window_bits_connection = window_bits_connection;
742         session->streams = h2_proxy_ihash_create(pool, offsetof(h2_proxy_stream, id));
743         session->suspended = h2_proxy_iq_create(pool, 5);
744         session->done = done;
745 
746         session->input = apr_brigade_create(session->pool, session->c->bucket_alloc);
747         session->output = apr_brigade_create(session->pool, session->c->bucket_alloc);
748 
749         nghttp2_session_callbacks_new(&cbs);
750         nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv);
751         nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, stream_response_data);
752         nghttp2_session_callbacks_set_on_stream_close_callback(cbs, on_stream_close);
753         nghttp2_session_callbacks_set_on_header_callback(cbs, on_header);
754         nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
755         nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
756 #ifdef H2_NG2_INVALID_HEADER_CB
757         nghttp2_session_callbacks_set_on_invalid_header_callback(cbs, on_invalid_header_cb);
758 #endif
759 
760         nghttp2_option_new(&option);
761         nghttp2_option_set_peer_max_concurrent_streams(option, 100);
762         nghttp2_option_set_no_auto_window_update(option, 0);
763 
764         nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
765 
766         nghttp2_option_del(option);
767         nghttp2_session_callbacks_del(cbs);
768 
769         ping_new_session(session, p_conn);
770         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03362)
771                       "setup session for %s", p_conn->hostname);
772     }
773     else {
774         h2_proxy_session *session = p_conn->data;
775         ping_reuse_session(session);
776     }
777     return p_conn->data;
778 }
779 
session_start(h2_proxy_session * session)780 static apr_status_t session_start(h2_proxy_session *session)
781 {
782     nghttp2_settings_entry settings[2];
783     int rv, add_conn_window;
784     apr_socket_t *s;
785 
786     s = ap_get_conn_socket(session->c);
787 #if (!defined(WIN32) && !defined(NETWARE)) || defined(DOXYGEN)
788     if (s) {
789         ap_sock_disable_nagle(s);
790     }
791 #endif
792 
793     settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
794     settings[0].value = 0;
795     settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
796     settings[1].value = (1 << session->window_bits_stream) - 1;
797 
798     rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings,
799                                  H2_ALEN(settings));
800 
801     /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
802     add_conn_window = ((1 << session->window_bits_connection) - 1 -
803                        NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
804     if (!rv && add_conn_window != 0) {
805         rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
806     }
807     return rv? APR_EGENERAL : APR_SUCCESS;
808 }
809 
open_stream(h2_proxy_session * session,const char * url,request_rec * r,int standalone,h2_proxy_stream ** pstream)810 static apr_status_t open_stream(h2_proxy_session *session, const char *url,
811                                 request_rec *r, int standalone,
812                                 h2_proxy_stream **pstream)
813 {
814     h2_proxy_stream *stream;
815     apr_uri_t puri;
816     const char *authority, *scheme, *path;
817     apr_status_t status;
818     proxy_dir_conf *dconf;
819 
820     stream = apr_pcalloc(r->pool, sizeof(*stream));
821 
822     stream->pool = r->pool;
823     stream->url = url;
824     stream->r = r;
825     stream->standalone = standalone;
826     stream->session = session;
827     stream->state = H2_STREAM_ST_IDLE;
828 
829     stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
830     stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
831 
832     stream->req = h2_proxy_req_create(1, stream->pool, 0);
833 
834     status = apr_uri_parse(stream->pool, url, &puri);
835     if (status != APR_SUCCESS)
836         return status;
837 
838     scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
839 
840     dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
841     if (dconf->preserve_host) {
842         authority = r->hostname;
843     }
844     else {
845         authority = puri.hostname;
846         if (!ap_strchr_c(authority, ':') && puri.port
847             && apr_uri_port_of_scheme(scheme) != puri.port) {
848             /* port info missing and port is not default for scheme: append */
849             authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
850         }
851     }
852 
853     /* we need this for mapping relative uris in headers ("Link") back
854      * to local uris */
855     stream->real_server_uri = apr_psprintf(stream->pool, "%s://%s", scheme, authority);
856     stream->p_server_uri = apr_psprintf(stream->pool, "%s://%s", puri.scheme, authority);
857     path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
858 
859 
860     h2_proxy_req_make(stream->req, stream->pool, r->method, scheme,
861                 authority, path, r->headers_in);
862 
863     if (dconf->add_forwarded_headers) {
864         if (PROXYREQ_REVERSE == r->proxyreq) {
865             const char *buf;
866 
867             /* Add X-Forwarded-For: so that the upstream has a chance to
868              * determine, where the original request came from.
869              */
870             apr_table_mergen(stream->req->headers, "X-Forwarded-For",
871                              r->useragent_ip);
872 
873             /* Add X-Forwarded-Host: so that upstream knows what the
874              * original request hostname was.
875              */
876             if ((buf = apr_table_get(r->headers_in, "Host"))) {
877                 apr_table_mergen(stream->req->headers, "X-Forwarded-Host", buf);
878             }
879 
880             /* Add X-Forwarded-Server: so that upstream knows what the
881              * name of this proxy server is (if there are more than one)
882              * XXX: This duplicates Via: - do we strictly need it?
883              */
884             apr_table_mergen(stream->req->headers, "X-Forwarded-Server",
885                              r->server->server_hostname);
886         }
887     }
888 
889     /* Tuck away all already existing cookies */
890     stream->saves = apr_table_make(r->pool, 2);
891     apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);
892 
893     *pstream = stream;
894 
895     return APR_SUCCESS;
896 }
897 
submit_stream(h2_proxy_session * session,h2_proxy_stream * stream)898 static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream)
899 {
900     h2_proxy_ngheader *hd;
901     nghttp2_data_provider *pp = NULL;
902     nghttp2_data_provider provider;
903     int rv, may_have_request_body = 1;
904     apr_status_t status;
905 
906     hd = h2_proxy_util_nghd_make_req(stream->pool, stream->req);
907 
908     /* If we expect a 100-continue response, we must refrain from reading
909        any input until we get it. Reading the input will possibly trigger
910        HTTP_IN filter to generate the 100-continue itself. */
911     if (stream->waiting_on_100 || stream->waiting_on_ping) {
912         /* make a small test if we get an EOF/EOS immediately */
913         status = ap_get_brigade(stream->r->input_filters, stream->input,
914                                 AP_MODE_READBYTES, APR_NONBLOCK_READ,
915                                 APR_BUCKET_BUFF_SIZE);
916         may_have_request_body = APR_STATUS_IS_EAGAIN(status)
917                                 || (status == APR_SUCCESS
918                                     && !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(stream->input)));
919     }
920 
921     if (may_have_request_body) {
922         provider.source.fd = 0;
923         provider.source.ptr = NULL;
924         provider.read_callback = stream_request_data;
925         pp = &provider;
926     }
927 
928     rv = nghttp2_submit_request(session->ngh2, NULL,
929                                 hd->nv, hd->nvlen, pp, stream);
930 
931     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03363)
932                   "h2_proxy_session(%s): submit %s%s -> %d",
933                   session->id, stream->req->authority, stream->req->path,
934                   rv);
935     if (rv > 0) {
936         stream->id = rv;
937         stream->state = H2_STREAM_ST_OPEN;
938         h2_proxy_ihash_add(session->streams, stream);
939         dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
940 
941         return APR_SUCCESS;
942     }
943     return APR_EGENERAL;
944 }
945 
submit_trailers(h2_proxy_stream * stream)946 static apr_status_t submit_trailers(h2_proxy_stream *stream)
947 {
948     h2_proxy_ngheader *hd;
949     int rv;
950 
951     hd = h2_proxy_util_nghd_make(stream->pool, stream->r->trailers_in);
952     rv = nghttp2_submit_trailer(stream->session->ngh2, stream->id, hd->nv, hd->nvlen);
953     return rv == 0? APR_SUCCESS: APR_EGENERAL;
954 }
955 
feed_brigade(h2_proxy_session * session,apr_bucket_brigade * bb)956 static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
957 {
958     apr_status_t status = APR_SUCCESS;
959     apr_size_t readlen = 0;
960     ssize_t n;
961 
962     while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
963         apr_bucket* b = APR_BRIGADE_FIRST(bb);
964 
965         if (APR_BUCKET_IS_METADATA(b)) {
966             /* nop */
967         }
968         else {
969             const char *bdata = NULL;
970             apr_size_t blen = 0;
971 
972             status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
973             if (status == APR_SUCCESS && blen > 0) {
974                 n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
975                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
976                               "h2_proxy_session(%s): feeding %ld bytes -> %ld",
977                               session->id, (long)blen, (long)n);
978                 if (n < 0) {
979                     if (nghttp2_is_fatal((int)n)) {
980                         status = APR_EGENERAL;
981                     }
982                 }
983                 else {
984                     readlen += n;
985                     if (n < blen) {
986                         apr_bucket_split(b, n);
987                     }
988                 }
989             }
990         }
991         apr_bucket_delete(b);
992     }
993 
994     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
995                   "h2_proxy_session(%s): fed %ld bytes of input to session",
996                   session->id, (long)readlen);
997     if (readlen == 0 && status == APR_SUCCESS) {
998         return APR_EAGAIN;
999     }
1000     return status;
1001 }
1002 
h2_proxy_session_read(h2_proxy_session * session,int block,apr_interval_time_t timeout)1003 static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
1004                                           apr_interval_time_t timeout)
1005 {
1006     apr_status_t status = APR_SUCCESS;
1007 
1008     if (APR_BRIGADE_EMPTY(session->input)) {
1009         apr_socket_t *socket = NULL;
1010         apr_time_t save_timeout = -1;
1011 
1012         if (block && timeout > 0) {
1013             socket = ap_get_conn_socket(session->c);
1014             if (socket) {
1015                 apr_socket_timeout_get(socket, &save_timeout);
1016                 apr_socket_timeout_set(socket, timeout);
1017             }
1018             else {
1019                 /* cannot block on timeout */
1020                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, APLOGNO(03379)
1021                               "h2_proxy_session(%s): unable to get conn socket",
1022                               session->id);
1023                 return APR_ENOTIMPL;
1024             }
1025         }
1026 
1027         status = ap_get_brigade(session->c->input_filters, session->input,
1028                                 AP_MODE_READBYTES,
1029                                 block? APR_BLOCK_READ : APR_NONBLOCK_READ,
1030                                 64 * 1024);
1031         ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
1032                       "h2_proxy_session(%s): read from conn", session->id);
1033         if (socket && save_timeout != -1) {
1034             apr_socket_timeout_set(socket, save_timeout);
1035         }
1036     }
1037 
1038     if (status == APR_SUCCESS) {
1039         status = feed_brigade(session, session->input);
1040     }
1041     else if (APR_STATUS_IS_TIMEUP(status)) {
1042         /* nop */
1043     }
1044     else if (!APR_STATUS_IS_EAGAIN(status)) {
1045         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03380)
1046                       "h2_proxy_session(%s): read error", session->id);
1047         dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
1048     }
1049 
1050     return status;
1051 }
1052 
h2_proxy_session_submit(h2_proxy_session * session,const char * url,request_rec * r,int standalone)1053 apr_status_t h2_proxy_session_submit(h2_proxy_session *session,
1054                                      const char *url, request_rec *r,
1055                                      int standalone)
1056 {
1057     h2_proxy_stream *stream;
1058     apr_status_t status;
1059 
1060     status = open_stream(session, url, r, standalone, &stream);
1061     if (status == APR_SUCCESS) {
1062         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03381)
1063                       "process stream(%d): %s %s%s, original: %s",
1064                       stream->id, stream->req->method,
1065                       stream->req->authority, stream->req->path,
1066                       r->the_request);
1067         status = submit_stream(session, stream);
1068     }
1069     return status;
1070 }
1071 
/**
 * Resume a suspended stream: clear its suspended flag, take it out of
 * the session's queue of suspended streams, tell nghttp2 to resume
 * sending its data and dispatch H2_PROXYS_EV_STREAM_RESUMED.
 *
 * @param stream the stream to resume
 */
static void stream_resume(h2_proxy_stream *stream)
{
    h2_proxy_session *owner = stream->session;
    const int id = stream->id;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, owner->c,
                  "h2_proxy_stream(%s-%d): resuming",
                  owner->id, id);

    stream->suspended = 0;
    h2_proxy_iq_remove(owner->suspended, id);
    nghttp2_session_resume_data(owner->ngh2, id);
    dispatch_event(owner, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
}
1083 
/**
 * Tell whether the session can only make progress through input from
 * the backend. That is the case while a ping is outstanding or when no
 * stream is suspended and nghttp2 wants to read but has nothing to write.
 *
 * @param session the session to inspect
 * @return 1 if waiting for the backend, 0 otherwise
 */
static int is_waiting_for_backend(h2_proxy_session *session)
{
    if (session->ping_state != H2_PING_ST_NONE) {
        return 1;
    }
    if (session->suspended->nelts > 0) {
        return 0;
    }
    if (nghttp2_session_want_write(session->ngh2)) {
        return 0;
    }
    return nghttp2_session_want_read(session->ngh2) ? 1 : 0;
}
1091 
check_suspended(h2_proxy_session * session)1092 static apr_status_t check_suspended(h2_proxy_session *session)
1093 {
1094     h2_proxy_stream *stream;
1095     int i, stream_id;
1096     apr_status_t status;
1097 
1098     for (i = 0; i < session->suspended->nelts; ++i) {
1099         stream_id = session->suspended->elts[i];
1100         stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
1101         if (stream) {
1102             if (stream->waiting_on_100 || stream->waiting_on_ping) {
1103                 status = APR_EAGAIN;
1104             }
1105             else {
1106                 status = ap_get_brigade(stream->r->input_filters, stream->input,
1107                                         AP_MODE_READBYTES, APR_NONBLOCK_READ,
1108                                         APR_BUCKET_BUFF_SIZE);
1109             }
1110             if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
1111                 stream_resume(stream);
1112                 check_suspended(session);
1113                 return APR_SUCCESS;
1114             }
1115             else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
1116                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c,
1117                               APLOGNO(03382) "h2_proxy_stream(%s-%d): check input",
1118                               session->id, stream_id);
1119                 stream_resume(stream);
1120                 check_suspended(session);
1121                 return APR_SUCCESS;
1122             }
1123         }
1124         else {
1125             /* gone? */
1126             h2_proxy_iq_remove(session->suspended, stream_id);
1127             check_suspended(session);
1128             return APR_SUCCESS;
1129         }
1130     }
1131     return APR_EAGAIN;
1132 }
1133 
session_shutdown(h2_proxy_session * session,int reason,const char * msg)1134 static apr_status_t session_shutdown(h2_proxy_session *session, int reason,
1135                                      const char *msg)
1136 {
1137     apr_status_t status = APR_SUCCESS;
1138     const char *err = msg;
1139 
1140     ap_assert(session);
1141     if (!err && reason) {
1142         err = nghttp2_strerror(reason);
1143     }
1144     nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
1145                           reason, (uint8_t*)err, err? strlen(err):0);
1146     status = nghttp2_session_send(session->ngh2);
1147     dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
1148     return status;
1149 }
1150 
1151 
/* Printable names of the session states, indexed by the numeric value
 * of the state. The trailing comment on each entry names the state the
 * entry is meant for. */
static const char *StateNames[] = {
    "INIT",         /* H2_PROXYS_ST_INIT */
    "DONE",         /* H2_PROXYS_ST_DONE */
    "IDLE",         /* H2_PROXYS_ST_IDLE */
    "BUSY",         /* H2_PROXYS_ST_BUSY */
    "WAIT",         /* H2_PROXYS_ST_WAIT */
    "LSHUTDOWN",    /* H2_PROXYS_ST_LOCAL_SHUTDOWN */
    "RSHUTDOWN",    /* H2_PROXYS_ST_REMOTE_SHUTDOWN */
};
1161 
/**
 * Get the printable name of a session state.
 *
 * @param state the state to name
 * @return the matching entry of StateNames, or "unknown" when the state
 *         is outside of that table
 */
static const char *state_name(h2_proxys_state state)
{
    const size_t known = sizeof(StateNames) / sizeof(StateNames[0]);

    return (state < known) ? StateNames[state] : "unknown";
}
1169 
/**
 * Tell whether the session is in a state in which streams are accepted,
 * that is IDLE, BUSY or WAIT.
 *
 * @param session the session to inspect
 * @return 1 in one of the accepting states, 0 in all others
 */
static int is_accepting_streams(h2_proxy_session *session)
{
    if (session->state == H2_PROXYS_ST_IDLE
        || session->state == H2_PROXYS_ST_BUSY
        || session->state == H2_PROXYS_ST_WAIT) {
        return 1;
    }
    return 0;
}
1181 
/**
 * Move the session into a new state and log the transition at debug
 * level.
 *
 * @param session the session changing state
 * @param action  short description of what caused the transition, used
 *                in the log message only
 * @param nstate  the state to enter
 */
static void transit(h2_proxy_session *session, const char *action,
                    h2_proxys_state nstate)
{
    const char *from = state_name(session->state);
    const char *to = state_name(nstate);

    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03345)
                  "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id,
                  from, action, to);
    session->state = nstate;
}
1190 
/**
 * Handle the INIT event. A session in state INIT becomes IDLE when it
 * has no streams and BUSY when it has; all other states ignore the
 * event.
 *
 * @param session the session receiving the event
 * @param arg     not used
 * @param msg     not used
 */
static void ev_init(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state != H2_PROXYS_ST_INIT) {
        /* nop */
        return;
    }

    if (h2_proxy_ihash_empty(session->streams)) {
        transit(session, "init", H2_PROXYS_ST_IDLE);
    }
    else {
        transit(session, "init", H2_PROXYS_ST_BUSY);
    }
}
1208 
/**
 * Handle the event that we have sent a GOAWAY. An IDLE session or one
 * already shut down by the remote side is DONE; a session already in
 * LOCAL_SHUTDOWN stays as it is; every other state moves to
 * LOCAL_SHUTDOWN.
 *
 * @param session the session receiving the event
 * @param arg     not used
 * @param msg     not used
 */
static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state == H2_PROXYS_ST_LOCAL_SHUTDOWN) {
        /* already did that? */
        return;
    }

    if (session->state == H2_PROXYS_ST_IDLE
        || session->state == H2_PROXYS_ST_REMOTE_SHUTDOWN) {
        /* all done */
        transit(session, "local goaway", H2_PROXYS_ST_DONE);
    }
    else {
        transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN);
    }
}
1225 
/**
 * Handle the event that the remote side sent a GOAWAY. An IDLE session
 * or one we already shut down ourselves is DONE; a session already in
 * REMOTE_SHUTDOWN stays as it is; every other state moves to
 * REMOTE_SHUTDOWN.
 *
 * @param session the session receiving the event
 * @param arg     not used
 * @param msg     not used
 */
static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state == H2_PROXYS_ST_REMOTE_SHUTDOWN) {
        /* already received that? */
        return;
    }

    if (session->state == H2_PROXYS_ST_IDLE
        || session->state == H2_PROXYS_ST_LOCAL_SHUTDOWN) {
        /* all done */
        transit(session, "remote goaway", H2_PROXYS_ST_DONE);
    }
    else {
        transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN);
    }
}
1242 
/**
 * Handle an error on the backend connection. In states INIT, DONE and
 * LOCAL_SHUTDOWN the session simply becomes DONE; in all other states a
 * shutdown of the session is started.
 *
 * @param session the session receiving the event
 * @param arg     error value, used as log status and shutdown reason
 * @param msg     optional text passed on to session_shutdown()
 */
static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state == H2_PROXYS_ST_INIT
        || session->state == H2_PROXYS_ST_DONE
        || session->state == H2_PROXYS_ST_LOCAL_SHUTDOWN) {
        /* just leave */
        transit(session, "conn error", H2_PROXYS_ST_DONE);
        return;
    }

    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, arg, session->c,
                  "h2_proxy_session(%s): conn error -> shutdown", session->id);
    session_shutdown(session, arg, msg);
}
1260 
/**
 * Handle a protocol error. In states DONE and LOCAL_SHUTDOWN the session
 * simply becomes DONE; in all other states a shutdown of the session is
 * started.
 *
 * @param session the session receiving the event
 * @param arg     error value, used as shutdown reason
 * @param msg     optional text passed on to session_shutdown()
 */
static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state == H2_PROXYS_ST_DONE
        || session->state == H2_PROXYS_ST_LOCAL_SHUTDOWN) {
        /* just leave */
        transit(session, "proto error", H2_PROXYS_ST_DONE);
        return;
    }

    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                  "h2_proxy_session(%s): proto error -> shutdown", session->id);
    session_shutdown(session, arg, msg);
}
1277 
/**
 * Handle a timeout on the backend connection. The session always ends
 * up DONE; unless it is in LOCAL_SHUTDOWN already, a shutdown is
 * performed first.
 *
 * @param session the session receiving the event
 * @param arg     passed on as shutdown reason
 * @param msg     optional text passed on to session_shutdown()
 */
static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state != H2_PROXYS_ST_LOCAL_SHUTDOWN) {
        session_shutdown(session, arg, msg);
    }
    transit(session, "conn timeout", H2_PROXYS_ST_DONE);
}
1290 
/**
 * Handle the event that there was neither input nor output to process.
 * Only the states BUSY, LOCAL_SHUTDOWN and REMOTE_SHUTDOWN react:
 *  - with streams remaining, the session goes to WAIT
 *  - without streams, but still accepting new ones, it goes to IDLE
 *  - without streams and no longer accepting, it is shut down and DONE
 *
 * @param session the session receiving the event
 * @param arg     passed on as shutdown reason
 * @param msg     optional text passed on to session_shutdown()
 */
static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state != H2_PROXYS_ST_BUSY
        && session->state != H2_PROXYS_ST_LOCAL_SHUTDOWN
        && session->state != H2_PROXYS_ST_REMOTE_SHUTDOWN) {
        /* nop */
        return;
    }

    /* Nothing to do for input and output. Remaining in this state would
     * mean a tight loop that sucks up CPU cycles. Ideally, we'd like to
     * do a blocking read, but that is not possible if we have scheduled
     * tasks and wait for them to produce something. */
    if (!h2_proxy_ihash_empty(session->streams)) {
        /* Unable to do blocking reads, as we wait on events from
         * task processing in other threads. Do a busy wait with
         * backoff timer. */
        transit(session, "no io", H2_PROXYS_ST_WAIT);
        return;
    }

    if (is_accepting_streams(session)) {
        /* When we have no streams, no task events are possible,
         * switch to blocking reads */
        transit(session, "no io", H2_PROXYS_ST_IDLE);
        return;
    }

    /* We are no longer accepting new streams and have finished
     * processing existing ones. Time to leave. */
    session_shutdown(session, arg, msg);
    transit(session, "no io", H2_PROXYS_ST_DONE);
}
1327 
/**
 * Handle the event that a stream was submitted: an IDLE or WAIT session
 * becomes BUSY, all other states ignore the event.
 *
 * @param session   the session receiving the event
 * @param stream_id id of the submitted stream, not used
 * @param msg       not used
 */
static void ev_stream_submitted(h2_proxy_session *session, int stream_id,
                                const char *msg)
{
    if (session->state == H2_PROXYS_ST_IDLE
        || session->state == H2_PROXYS_ST_WAIT) {
        transit(session, "stream submitted", H2_PROXYS_ST_BUSY);
    }
    /* else: nop */
}
1341 
/**
 * Handle the end of a stream.
 *
 * If nghttp2 still has user data for the stream id: a stream with an
 * error code sets the request status to 500; a stream without error
 * that has not received response data passes a FLUSH and EOS bucket
 * down the output filters, after ending the response headers. The
 * stream is then marked closed, removed from the session's stream hash
 * and suspended queue, and reported through session->done, if set.
 * The session state is not changed by this event.
 *
 * @param session   the session receiving the event
 * @param stream_id id of the stream that is done
 * @param msg       not used
 */
static void ev_stream_done(h2_proxy_session *session, int stream_id,
                           const char *msg)
{
    h2_proxy_stream *stream;
    apr_status_t status;
    int touched;

    stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
    if (!stream) {
        return;
    }

    touched = (stream->data_sent ||
               stream_id <= session->last_stream_id);
    status = stream->error_code ? APR_EINVAL : APR_SUCCESS;
    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364)
                  "h2_proxy_sesssion(%s): stream(%d) closed "
                  "(touched=%d, error=%d)",
                  session->id, stream_id, touched, stream->error_code);

    if (status != APR_SUCCESS) {
        stream->r->status = 500;
    }
    else if (!stream->data_received) {
        apr_bucket_alloc_t *alloc = stream->r->connection->bucket_alloc;
        apr_bucket *b;

        /* if the response had no body, this is the time to flush
         * an empty brigade which will also write the response
         * headers */
        h2_proxy_stream_end_headers_out(stream);
        stream->data_received = 1;
        b = apr_bucket_flush_create(alloc);
        APR_BRIGADE_INSERT_TAIL(stream->output, b);
        b = apr_bucket_eos_create(alloc);
        APR_BRIGADE_INSERT_TAIL(stream->output, b);
        ap_pass_brigade(stream->r->output_filters, stream->output);
    }

    stream->state = H2_STREAM_ST_CLOSED;
    h2_proxy_ihash_remove(session->streams, stream_id);
    h2_proxy_iq_remove(session->suspended, stream_id);
    if (session->done) {
        session->done(session, stream->r, status, touched);
    }
}
1388 
/**
 * Handle the event that a stream was resumed: a session in WAIT becomes
 * BUSY, all other states ignore the event.
 *
 * @param session the session receiving the event
 * @param arg     not used
 * @param msg     not used
 */
static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state == H2_PROXYS_ST_WAIT) {
        transit(session, "stream resumed", H2_PROXYS_ST_BUSY);
    }
    /* else: nop */
}
1400 
/**
 * Handle the event that data was read: an IDLE or WAIT session becomes
 * BUSY, all other states ignore the event.
 *
 * @param session the session receiving the event
 * @param arg     not used
 * @param msg     not used
 */
static void ev_data_read(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state == H2_PROXYS_ST_IDLE
        || session->state == H2_PROXYS_ST_WAIT) {
        transit(session, "data read", H2_PROXYS_ST_BUSY);
    }
    /* else: nop */
}
1413 
/**
 * Handle the event that nghttp2 is done with the session: any session
 * not yet DONE becomes DONE.
 *
 * @param session the session receiving the event
 * @param arg     not used
 * @param msg     not used
 */
static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state != H2_PROXYS_ST_DONE) {
        transit(session, "nghttp2 done", H2_PROXYS_ST_DONE);
    }
    /* else: nop */
}
1425 
/**
 * Handle the pre-close event: unless the session is DONE or in
 * LOCAL_SHUTDOWN already, shut it down.
 *
 * @param session the session receiving the event
 * @param arg     passed on as shutdown reason
 * @param msg     optional text passed on to session_shutdown()
 */
static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg)
{
    if (session->state == H2_PROXYS_ST_DONE
        || session->state == H2_PROXYS_ST_LOCAL_SHUTDOWN) {
        /* nop */
        return;
    }
    session_shutdown(session, arg, msg);
}
1438 
/**
 * Route a session event to its handler. Events without a handler are
 * logged at trace level and otherwise ignored.
 *
 * @param session the session the event is for
 * @param ev      the event
 * @param arg     event specific integer, handed to the handler as is
 * @param msg     event specific text, handed to the handler as is
 */
static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
                           int arg, const char *msg)
{
    void (*handler)(h2_proxy_session *, int, const char *) = NULL;

    switch (ev) {
        case H2_PROXYS_EV_INIT:
            handler = ev_init;
            break;
        case H2_PROXYS_EV_LOCAL_GOAWAY:
            handler = ev_local_goaway;
            break;
        case H2_PROXYS_EV_REMOTE_GOAWAY:
            handler = ev_remote_goaway;
            break;
        case H2_PROXYS_EV_CONN_ERROR:
            handler = ev_conn_error;
            break;
        case H2_PROXYS_EV_PROTO_ERROR:
            handler = ev_proto_error;
            break;
        case H2_PROXYS_EV_CONN_TIMEOUT:
            handler = ev_conn_timeout;
            break;
        case H2_PROXYS_EV_NO_IO:
            handler = ev_no_io;
            break;
        case H2_PROXYS_EV_STREAM_SUBMITTED:
            handler = ev_stream_submitted;
            break;
        case H2_PROXYS_EV_STREAM_DONE:
            handler = ev_stream_done;
            break;
        case H2_PROXYS_EV_STREAM_RESUMED:
            handler = ev_stream_resumed;
            break;
        case H2_PROXYS_EV_DATA_READ:
            handler = ev_data_read;
            break;
        case H2_PROXYS_EV_NGH2_DONE:
            handler = ev_ngh2_done;
            break;
        case H2_PROXYS_EV_PRE_CLOSE:
            handler = ev_pre_close;
            break;
        default:
            break;
    }

    if (handler) {
        handler(session, arg, msg);
    }
    else {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                      "h2_proxy_session(%s): unknown event %d",
                      session->id, ev);
    }
}
1489 
/*
 * Give nghttp2 one chance to send pending frames.
 *
 * Returns 0 when nghttp2 does not want to write, or when the send failed
 * with an error nghttp2 classifies as fatal (in which case a
 * H2_PROXYS_EV_CONN_ERROR event carrying the nghttp2 error code is
 * dispatched). Returns 1 in every other case, i.e. after a send attempt that
 * did not fail fatally.
 *
 * Despite the name, at most one nghttp2_session_send() call is made per
 * invocation.
 */
static int send_loop(h2_proxy_session *session)
{
    int rv;

    if (!nghttp2_session_want_write(session->ngh2)) {
        return 0;
    }

    rv = nghttp2_session_send(session->ngh2);
    if (rv >= 0 || !nghttp2_is_fatal(rv)) {
        return 1;
    }

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                  "h2_proxy_session(%s): write, rv=%d", session->id, rv);
    dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
    return 0;
}
1504 
h2_proxy_session_process(h2_proxy_session * session)1505 apr_status_t h2_proxy_session_process(h2_proxy_session *session)
1506 {
1507     apr_status_t status;
1508     int have_written = 0, have_read = 0;
1509 
1510     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
1511                   "h2_proxy_session(%s): process", session->id);
1512 
1513 run_loop:
1514     switch (session->state) {
1515         case H2_PROXYS_ST_INIT:
1516             status = session_start(session);
1517             if (status == APR_SUCCESS) {
1518                 dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
1519                 goto run_loop;
1520             }
1521             else {
1522                 dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
1523             }
1524             break;
1525 
1526         case H2_PROXYS_ST_BUSY:
1527         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1528         case H2_PROXYS_ST_REMOTE_SHUTDOWN:
1529             have_written = send_loop(session);
1530 
1531             if (nghttp2_session_want_read(session->ngh2)) {
1532                 status = h2_proxy_session_read(session, 0, 0);
1533                 if (status == APR_SUCCESS) {
1534                     have_read = 1;
1535                 }
1536             }
1537 
1538             if (!have_written && !have_read
1539                 && !nghttp2_session_want_write(session->ngh2)) {
1540                 dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
1541                 goto run_loop;
1542             }
1543             break;
1544 
1545         case H2_PROXYS_ST_WAIT:
1546             if (is_waiting_for_backend(session)) {
1547                 /* we can do a blocking read with the default timeout (as
1548                  * configured via ProxyTimeout in our socket. There is
1549                  * nothing we want to send or check until we get more data
1550                  * from the backend. */
1551                 status = h2_proxy_session_read(session, 1, 0);
1552                 if (status == APR_SUCCESS) {
1553                     have_read = 1;
1554                     dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
1555                 }
1556                 else {
1557                     dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
1558                     return status;
1559                 }
1560             }
1561             else if (check_suspended(session) == APR_EAGAIN) {
1562                 /* no stream has become resumed. Do a blocking read with
1563                  * ever increasing timeouts... */
1564                 if (session->wait_timeout < 25) {
1565                     session->wait_timeout = 25;
1566                 }
1567                 else {
1568                     session->wait_timeout = H2MIN(apr_time_from_msec(100),
1569                                                   2*session->wait_timeout);
1570                 }
1571 
1572                 status = h2_proxy_session_read(session, 1, session->wait_timeout);
1573                 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
1574                               APLOGNO(03365)
1575                               "h2_proxy_session(%s): WAIT read, timeout=%fms",
1576                               session->id, session->wait_timeout/1000.0);
1577                 if (status == APR_SUCCESS) {
1578                     have_read = 1;
1579                     dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
1580                 }
1581                 else if (APR_STATUS_IS_TIMEUP(status)
1582                     || APR_STATUS_IS_EAGAIN(status)) {
1583                     /* go back to checking all inputs again */
1584                     transit(session, "wait cycle", H2_PROXYS_ST_BUSY);
1585                 }
1586             }
1587             break;
1588 
1589         case H2_PROXYS_ST_IDLE:
1590             break;
1591 
1592         case H2_PROXYS_ST_DONE: /* done, session terminated */
1593             return APR_EOF;
1594 
1595         default:
1596             ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
1597                           APLOGNO(03346)"h2_proxy_session(%s): unknown state %d",
1598                           session->id, session->state);
1599             dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
1600             break;
1601     }
1602 
1603 
1604     if (have_read || have_written) {
1605         session->wait_timeout = 0;
1606     }
1607 
1608     if (!nghttp2_session_want_read(session->ngh2)
1609         && !nghttp2_session_want_write(session->ngh2)) {
1610         dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
1611     }
1612 
1613     return APR_SUCCESS; /* needs to be called again */
1614 }
1615 
/* Context handed as udata to the stream iterators cancel_iter() and
 * done_iter() when walking session->streams. */
1616 typedef struct {
1617     h2_proxy_session *session;   /* session whose streams are being iterated */
1618     h2_proxy_request_done *done; /* callback invoked by done_iter() for each stream; not used by cancel_iter() */
1619 } cleanup_iter_ctx;
1620 
cancel_iter(void * udata,void * val)1621 static int cancel_iter(void *udata, void *val)
1622 {
1623     cleanup_iter_ctx *ctx = udata;
1624     h2_proxy_stream *stream = val;
1625     nghttp2_submit_rst_stream(ctx->session->ngh2, NGHTTP2_FLAG_NONE,
1626                               stream->id, 0);
1627     return 1;
1628 }
1629 
/*
 * Cancel every stream the session still holds.
 *
 * When session->streams is empty nothing happens. Otherwise an RST_STREAM is
 * submitted for each stream (see cancel_iter()) and session_shutdown() is
 * called with arg 0 and no message.
 */
void h2_proxy_session_cancel_all(h2_proxy_session *session)
{
    cleanup_iter_ctx ctx;

    if (h2_proxy_ihash_empty(session->streams)) {
        return;
    }

    ctx.session = session;
    ctx.done = session->done;
    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03366)
                  "h2_proxy_session(%s): cancel  %d streams",
                  session->id, (int)h2_proxy_ihash_count(session->streams));
    h2_proxy_ihash_iter(session->streams, cancel_iter, &ctx);
    session_shutdown(session, 0, NULL);
}
1643 
done_iter(void * udata,void * val)1644 static int done_iter(void *udata, void *val)
1645 {
1646     cleanup_iter_ctx *ctx = udata;
1647     h2_proxy_stream *stream = val;
1648     int touched = (stream->data_sent ||
1649                    stream->id <= ctx->session->last_stream_id);
1650     ctx->done(ctx->session, stream->r, APR_ECONNABORTED, touched);
1651     return 1;
1652 }
1653 
/*
 * Report all streams still held by the session as aborted and forget them.
 *
 * session: the session being cleaned up
 * done:    callback invoked once per remaining stream (see done_iter())
 *
 * Does nothing when session->streams is empty; otherwise the stream hash is
 * cleared after all callbacks were made.
 */
void h2_proxy_session_cleanup(h2_proxy_session *session,
                              h2_proxy_request_done *done)
{
    cleanup_iter_ctx ctx;

    if (h2_proxy_ihash_empty(session->streams)) {
        return;
    }

    ctx.session = session;
    ctx.done = done;
    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03519)
                  "h2_proxy_session(%s): terminated, %d streams unfinished",
                  session->id, (int)h2_proxy_ihash_count(session->streams));
    h2_proxy_ihash_iter(session->streams, done_iter, &ctx);
    h2_proxy_ihash_clear(session->streams);
}
1668 
ping_arrived_iter(void * udata,void * val)1669 static int ping_arrived_iter(void *udata, void *val)
1670 {
1671     h2_proxy_stream *stream = val;
1672     if (stream->waiting_on_ping) {
1673         stream->waiting_on_ping = 0;
1674         stream_resume(stream);
1675     }
1676     return 1;
1677 }
1678 
/*
 * Walk all streams of the session and resume those flagged as waiting on a
 * ping (see ping_arrived_iter()). Does nothing when the session holds no
 * streams.
 */
static void ping_arrived(h2_proxy_session *session)
{
    if (h2_proxy_ihash_empty(session->streams)) {
        return;
    }

    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03470)
                  "h2_proxy_session(%s): ping arrived, unblocking streams",
                  session->id);
    /* the iterator callback ignores its context argument */
    h2_proxy_ihash_iter(session->streams, ping_arrived_iter, &session);
}
1688 
/* Iteration context; the code using it lies outside this part of the file.
 * NOTE(review): judging by the name this presumably carries state for
 * flow-control window updates -- confirm against the users below. */
1689 typedef struct {
1690     h2_proxy_session *session; /* proxy session the context refers to */
1691     conn_rec *c;               /* NOTE(review): presumably the connection used for logging -- confirm */
1692     apr_off_t bytes;           /* NOTE(review): a byte count; what it counts is not visible here */
1693     int updated;               /* NOTE(review): presumably a flag or counter set when an update was made -- confirm */
1694 } win_update_ctx;
1695 
1696