1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <assert.h>
18 #include <stddef.h>
19 #include <stdlib.h>
20 
21 #include <apr_atomic.h>
22 #include <apr_thread_mutex.h>
23 #include <apr_thread_cond.h>
24 #include <apr_strings.h>
25 #include <apr_time.h>
26 
27 #include <httpd.h>
28 #include <http_core.h>
29 #include <http_log.h>
30 
31 #include <mpm_common.h>
32 
33 #include "mod_http2.h"
34 
35 #include "h2.h"
36 #include "h2_private.h"
37 #include "h2_bucket_beam.h"
38 #include "h2_config.h"
39 #include "h2_c1.h"
40 #include "h2_conn_ctx.h"
41 #include "h2_protocol.h"
42 #include "h2_mplx.h"
43 #include "h2_request.h"
44 #include "h2_stream.h"
45 #include "h2_session.h"
46 #include "h2_c2.h"
47 #include "h2_workers.h"
48 #include "h2_util.h"
49 
50 
/* utility for iterating over ihash stream sets */
/* NOTE(review): this struct is not referenced in the part of the file shown
 * here; the field descriptions below are inferred from the names and should
 * be confirmed against its users further down. */
typedef struct {
    h2_mplx *m;           /* presumably the mplx whose streams are iterated */
    h2_stream *stream;    /* presumably a stream selected during iteration */
    apr_time_t now;       /* presumably a reference timestamp for the pass */
    apr_size_t count;     /* presumably a counter of visited/matched streams */
} stream_iter_ctx;

/* Forward declarations of static helpers that are defined further down in
 * this file (outside the part shown here). */
static apr_status_t s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx);
static apr_status_t m_be_annoyed(h2_mplx *m);

/* Pollset management: created in h2_mplx_c1_create(), entries removed when
 * streams are purged, and polled from h2_mplx_c1_poll(). */
static apr_status_t mplx_pollset_create(h2_mplx *m);
static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx);
static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx);
static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
                            stream_ev_callback *on_stream_input,
                            stream_ev_callback *on_stream_output,
                            void *on_ctx);
69 
70 static apr_pool_t *pchild;
71 
h2_mplx_c1_child_init(apr_pool_t * pool,server_rec * s)72 apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s)
73 {
74     pchild = pool;
75     return APR_SUCCESS;
76 }
77 
/* Acquire m->lock. If locking fails, return the APR error from the
 * *enclosing* function, so this may only be used in functions whose
 * return value can carry an apr_status_t. */
#define H2_MPLX_ENTER(m)    \
    do { apr_status_t rv_lock; if ((rv_lock = apr_thread_mutex_lock((m)->lock)) != APR_SUCCESS) {\
        return rv_lock;\
    } } while(0)

/* Release m->lock. */
#define H2_MPLX_LEAVE(m)    \
    apr_thread_mutex_unlock((m)->lock)

/* Acquire m->lock, ignoring any locking error. */
#define H2_MPLX_ENTER_ALWAYS(m)    \
    apr_thread_mutex_lock((m)->lock)

/* Lock/unlock m->lock only when 'dolock' is true. Both are wrapped in
 * do { } while (0) so each expands to exactly one statement: the former
 * bare `if (dolock) ...` expansion silently captured an `else` that
 * followed the macro at the call site (dangling else). */
#define H2_MPLX_ENTER_MAYBE(m, dolock)    \
    do { if (dolock) apr_thread_mutex_lock((m)->lock); } while (0)

#define H2_MPLX_LEAVE_MAYBE(m, dolock)    \
    do { if (dolock) apr_thread_mutex_unlock((m)->lock); } while (0)
94 
/* "Consumed" callback installed on a stream's input beam by c2_setup_io():
 * ctx is the owning h2_stream, which is told how many bytes were consumed. */
static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
{
    (void)beam;
    h2_stream_in_consumed(ctx, length);
}
99 
/* A stream is "running" while its c2 has a connection context that has
 * been started (started_at set) and has not yet been marked done. */
static int stream_is_running(h2_stream *stream)
{
    h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(stream->c2);

    if (!c2_ctx || !c2_ctx->started_at) {
        return 0;
    }
    return !c2_ctx->done;
}
105 
/* Locked variant of stream_is_running() for use from the c1 side.
 * NOTE(review): if acquiring m->lock fails, H2_MPLX_ENTER returns the APR
 * error code from here, which a caller would read as "running". */
int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream)
{
    int running;

    H2_MPLX_ENTER(m);
    running = stream_is_running(stream);
    H2_MPLX_LEAVE(m);

    return running;
}
115 
/* A held stream whose c2 is no longer running: drop it from the hold set
 * and queue it for purging. */
static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream)
{
    h2_stream **purge_slot;

    ap_assert(!stream_is_running(stream));

    h2_ihash_remove(m->shold, stream->id);
    purge_slot = &APR_ARRAY_PUSH(m->spurge, h2_stream *);
    *purge_slot = stream;
}
123 
/* Unhook a stream (which must be in state H2_SS_CLEANUP) from the mplx.
 *
 * Detaches the beam callbacks, runs h2_stream_cleanup(), and removes the
 * stream from m->streams and the scheduling queue m->q. Afterwards:
 * - if its c2 never got a connection context, or is no longer running,
 *   the stream is queued in m->spurge for destruction;
 * - otherwise the c2 and the stream's beams are aborted and the stream is
 *   parked in m->shold until its c2 has finished.
 * All visible callers hold m->lock.
 */
static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
    h2_conn_ctx_t *c2_ctx = stream->c2? h2_conn_ctx_get(stream->c2) : NULL;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                  H2_STRM_MSG(stream, "cleanup, unsubscribing from beam events"));
    if (stream->output) {
        h2_beam_on_was_empty(stream->output, NULL, NULL);
    }
    if (stream->input) {
        h2_beam_on_received(stream->input, NULL, NULL);
        h2_beam_on_consumed(stream->input, NULL, NULL);
    }

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                  H2_STRM_MSG(stream, "cleanup, removing from registries"));
    ap_assert(stream->state == H2_SS_CLEANUP);
    h2_stream_cleanup(stream);
    h2_ihash_remove(m->streams, stream->id);
    h2_iq_remove(m->q, stream->id);

    if (c2_ctx) {
        if (!stream_is_running(stream)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                          H2_STRM_MSG(stream, "cleanup, c2 is done, move to spurge"));
            /* processing has finished */
            APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
        }
        else {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                          H2_STRM_MSG(stream, "cleanup, c2 is running, abort"));
            /* c2 is still running: abort it and its beams, then keep the
             * stream on hold until the c2 has finished. */
            stream->c2->aborted = 1;
            if (stream->input) {
                h2_beam_abort(stream->input, m->c1);
            }
            if (stream->output) {
                h2_beam_abort(stream->output, m->c1);
            }
            /* The c2 is aborted but not yet done. The old message claimed
             * "c2 is done" here, which contradicted the branch it is in. */
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                          H2_STRM_MSG(stream, "cleanup, c2 aborted, move to shold"));
            h2_ihash_add(m->shold, stream);
        }
    }
    else {
        /* never started */
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                      H2_STRM_MSG(stream, "cleanup, never started, move to spurge"));
        APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
    }
}
175 
176 /**
 * A h2_mplx needs to be thread-safe *and* it will be called by
178  * the h2_session thread *and* the h2_worker threads. Therefore:
179  * - calls are protected by a mutex lock, m->lock
180  * - the pool needs its own allocator, since apr_allocator_t are
181  *   not re-entrant. The separate allocator works without a
182  *   separate lock since we already protect h2_mplx itself.
183  *   Since HTTP/2 connections can be expected to live longer than
184  *   their HTTP/1 cousins, the separate allocator seems to work better
185  *   than protecting a shared h2_session one with an own lock.
186  */
/* Create the multiplexer for a primary (c1) connection.
 *
 * @param stream0  stream whose c2 is taken as the primary connection (m->c1)
 * @param s        server whose HTTP/2 config supplies max streams and the
 *                 per-stream memory limit
 * @param parent   pool that m itself is allocated from; m->pool is created
 *                 as a sub-pool of it
 * @param workers  worker set; its max_workers becomes m->processing_max
 * @return the new h2_mplx, or NULL on failure. On failure, m->pool (or, if
 *         the pool was not created, the bare allocator) is released.
 */
h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent,
                          h2_workers *workers)
{
    h2_conn_ctx_t *conn_ctx;
    apr_status_t status = APR_SUCCESS;
    apr_allocator_t *allocator;
    apr_thread_mutex_t *mutex = NULL;
    h2_mplx *m = NULL;

    /* m is zeroed, so m->pool is NULL until created; the failure path
     * relies on that. */
    m = apr_pcalloc(parent, sizeof(h2_mplx));
    m->stream0 = stream0;
    m->c1 = stream0->c2;
    m->s = s;
    m->id = m->c1->id;

    /* We create a pool with its own allocator to be used for
     * processing secondary connections. This is the only way to have the
     * processing independent of its parent pool in the sense that it
     * can work in another thread. Also, the new allocator needs its own
     * mutex to synchronize sub-pools.
     */
    status = apr_allocator_create(&allocator);
    if (status != APR_SUCCESS) {
        allocator = NULL;
        goto failure;
    }

    apr_allocator_max_free_set(allocator, ap_max_mem_free);
    /* NOTE(review): the status of apr_pool_create_ex() is not checked;
     * success is inferred from m->pool being non-NULL. */
    apr_pool_create_ex(&m->pool, parent, NULL, allocator);
    if (!m->pool) goto failure;

    apr_pool_tag(m->pool, "h2_mplx");
    /* From here on the allocator belongs to m->pool and is released with it. */
    apr_allocator_owner_set(allocator, m->pool);

    status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT,
                                     m->pool);
    if (APR_SUCCESS != status) goto failure;
    apr_allocator_mutex_set(allocator, mutex);

    /* m->lock guards the mplx state; join_wait is waited on in
     * h2_mplx_c1_destroy() until held streams are joined. */
    status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
                                     m->pool);
    if (APR_SUCCESS != status) goto failure;

    status = apr_thread_cond_create(&m->join_wait, m->pool);
    if (APR_SUCCESS != status) goto failure;

    m->max_streams = h2_config_sgeti(s, H2_CONF_MAX_STREAMS);
    m->stream_max_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM);

    /* active streams, streams held while their c2 still runs, streams to
     * be destroyed, and the queue of stream ids waiting for a worker */
    m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
    m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
    m->spurge = apr_array_make(m->pool, 10, sizeof(h2_stream*));
    m->q = h2_iq_create(m->pool, m->max_streams);

    m->workers = workers;
    m->processing_max = workers->max_workers;
    m->processing_limit = 6; /* the original h1 max parallel connections */
    m->last_mood_change = apr_time_now();
    m->mood_update_interval = apr_time_from_msec(100);

    status = mplx_pollset_create(m);
    if (APR_SUCCESS != status) {
        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c1, APLOGNO(10308)
                      "nghttp2: could not create pollset");
        goto failure;
    }
    m->streams_to_poll = apr_array_make(m->pool, 10, sizeof(h2_stream*));
    m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*));
    m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*));

#if !H2_POLL_STREAMS
    /* Without per-stream pipes, c2 beam callbacks report activity through
     * these queues, guarded by poll_lock, and wake up the pollset. */
    status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT,
                                     m->pool);
    if (APR_SUCCESS != status) goto failure;
    m->streams_input_read = h2_iq_create(m->pool, 10);
    m->streams_output_written = h2_iq_create(m->pool, 10);
#endif

    /* NOTE(review): the result of mplx_pollset_add() is ignored. */
    conn_ctx = h2_conn_ctx_get(m->c1);
    mplx_pollset_add(m, conn_ctx);

    /* scratch request used for scoreboard accounting (c1_update_scoreboard) */
    m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r));

    return m;

failure:
    /* Destroying m->pool also frees the allocator it owns; only a bare
     * allocator (pool never created) needs explicit destruction. */
    if (m->pool) {
        apr_pool_destroy(m->pool);
    }
    else if (allocator) {
        apr_allocator_destroy(allocator);
    }
    return NULL;
}
281 
/* Stop scheduling: every stream still waiting in the queue is dropped, so
 * it will never start. Returns the highest stream id started so far. */
int h2_mplx_c1_shutdown(h2_mplx *m)
{
    int last_started_id;

    H2_MPLX_ENTER(m);

    last_started_id = m->max_stream_id_started;
    h2_iq_clear(m->q);

    H2_MPLX_LEAVE(m);

    return last_started_id;
}
295 
296 typedef struct {
297     h2_mplx_stream_cb *cb;
298     void *ctx;
299 } stream_iter_ctx_t;
300 
m_stream_iter_wrap(void * ctx,void * stream)301 static int m_stream_iter_wrap(void *ctx, void *stream)
302 {
303     stream_iter_ctx_t *x = ctx;
304     return x->cb(stream, x->ctx);
305 }
306 
h2_mplx_c1_streams_do(h2_mplx * m,h2_mplx_stream_cb * cb,void * ctx)307 apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
308 {
309     stream_iter_ctx_t x;
310 
311     H2_MPLX_ENTER(m);
312 
313     x.cb = cb;
314     x.ctx = ctx;
315     h2_ihash_iter(m->streams, m_stream_iter_wrap, &x);
316 
317     H2_MPLX_LEAVE(m);
318     return APR_SUCCESS;
319 }
320 
m_report_stream_iter(void * ctx,void * val)321 static int m_report_stream_iter(void *ctx, void *val) {
322     h2_mplx *m = ctx;
323     h2_stream *stream = val;
324     h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2);
325     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1,
326                   H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, out_buffer=%ld"),
327                   !!stream->c2, stream->scheduled, h2_stream_is_ready(stream),
328                   (long)(stream->output? h2_beam_get_buffered(stream->output) : -1));
329     if (conn_ctx) {
330         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
331                       H2_STRM_MSG(stream, "->03198: %s %s %s"
332                       "[started=%d/done=%d]"),
333                       conn_ctx->request->method, conn_ctx->request->authority,
334                       conn_ctx->request->path, conn_ctx->started_at != 0,
335                       conn_ctx->done);
336     }
337     else {
338         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
339                       H2_STRM_MSG(stream, "->03198: not started"));
340     }
341     return 1;
342 }
343 
m_unexpected_stream_iter(void * ctx,void * val)344 static int m_unexpected_stream_iter(void *ctx, void *val) {
345     h2_mplx *m = ctx;
346     h2_stream *stream = val;
347     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, /* NO APLOGNO */
348                   H2_STRM_MSG(stream, "unexpected, started=%d, scheduled=%d, ready=%d"),
349                   !!stream->c2, stream->scheduled, h2_stream_is_ready(stream));
350     return 1;
351 }
352 
m_stream_cancel_iter(void * ctx,void * val)353 static int m_stream_cancel_iter(void *ctx, void *val) {
354     h2_mplx *m = ctx;
355     h2_stream *stream = val;
356 
357     /* disable input consumed reporting */
358     if (stream->input) {
359         h2_beam_abort(stream->input, m->c1);
360     }
361     /* take over event monitoring */
362     h2_stream_set_monitor(stream, NULL);
363     /* Reset, should transit to CLOSED state */
364     h2_stream_rst(stream, H2_ERR_NO_ERROR);
365     /* All connection data has been sent, simulate cleanup */
366     h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
367     m_stream_cleanup(m, stream);
368     return 0;
369 }
370 
/* Shut down the multiplexer of a primary connection.
 *
 * Marks the mplx aborted and unregisters it from the workers. Then, with
 * m->lock held, it cancels every stream in m->streams and waits on
 * m->join_wait, in slices of wait_secs seconds, until no stream is left in
 * m->shold. Each timeout logs the streams still held. m->c1->aborted is
 * forced to 1 meanwhile and restored to its previous value at the end.
 */
void h2_mplx_c1_destroy(h2_mplx *m)
{
    apr_status_t status;
    apr_time_t wait_start;
    int wait_secs = 60, old_aborted;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                  "h2_mplx(%ld): start release", m->id);
    /* How to shut down a h2 connection:
     * 0. abort and tell the workers that no more work will come from us */
    m->aborted = 1;
    h2_workers_unregister(m->workers, m);

    H2_MPLX_ENTER_ALWAYS(m);

    /* While really terminating any c2 connections, treat the master
     * connection as aborted. It's not as if we could send any more data
     * at this point. */
    old_aborted = m->c1->aborted;
    m->c1->aborted = 1;

    /* How to shut down a h2 connection:
     * 1. cancel all streams still active */
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
                  "h2_mplx(%ld): release, %d/%d/%d streams (total/hold/purge), %d streams",
                  m->id, (int)h2_ihash_count(m->streams),
                  (int)h2_ihash_count(m->shold), m->spurge->nelts, m->processing_count);
    /* m_stream_cancel_iter() stops after each stream because cleaning it
     * up modifies m->streams; restart until the set is empty. */
    while (!h2_ihash_iter(m->streams, m_stream_cancel_iter, m)) {
        /* until empty */
    }

    /* 2. no more streams should be scheduled or in the active set */
    ap_assert(h2_ihash_empty(m->streams));
    ap_assert(h2_iq_empty(m->q));

    /* 3. while workers are busy on this connection, meaning they
     *    are processing streams from this connection, wait on them finishing
     *    in order to wake us and let us check again.
     *    Eventually, this has to succeed. */
    wait_start = apr_time_now();
    while (h2_ihash_count(m->shold) > 0) {
        status = apr_thread_cond_timedwait(m->join_wait, m->lock, apr_time_from_sec(wait_secs));

        if (APR_STATUS_IS_TIMEUP(status)) {
            /* This can happen if we have very long running requests
             * that do not time out on IO.
             * Report the real time spent waiting. The former
             * "iterations * wait_secs" said 0 on the first timeout and
             * also counted non-timeout wakeups as full waits. */
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, APLOGNO(03198)
                          "h2_mplx(%ld): waited %d sec for %d streams",
                          m->id, (int)apr_time_sec(apr_time_now() - wait_start),
                          (int)h2_ihash_count(m->shold));
            h2_ihash_iter(m->shold, m_report_stream_iter, m);
        }
    }
    m->join_wait = NULL;

    /* 4. With all workers done, all streams should be in spurge */
    ap_assert(m->processing_count == 0);
    /* Defensive: the wait loop above only ends once m->shold is empty. */
    if (!h2_ihash_empty(m->shold)) {
        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, APLOGNO(03516)
                      "h2_mplx(%ld): unexpected %d streams in hold",
                      m->id, (int)h2_ihash_count(m->shold));
        h2_ihash_iter(m->shold, m_unexpected_stream_iter, m);
    }

    m->c1->aborted = old_aborted;
    H2_MPLX_LEAVE(m);

    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, "h2_mplx(%ld): released", m->id);
}
437 
h2_mplx_c1_stream_cleanup(h2_mplx * m,h2_stream * stream,int * pstream_count)438 apr_status_t h2_mplx_c1_stream_cleanup(h2_mplx *m, h2_stream *stream,
439                                        int *pstream_count)
440 {
441     H2_MPLX_ENTER(m);
442 
443     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
444                   H2_STRM_MSG(stream, "cleanup"));
445     m_stream_cleanup(m, stream);
446     *pstream_count = (int)h2_ihash_count(m->streams);
447     H2_MPLX_LEAVE(m);
448     return APR_SUCCESS;
449 }
450 
h2_mplx_c2_stream_get(h2_mplx * m,int stream_id)451 const h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id)
452 {
453     h2_stream *s = NULL;
454 
455     H2_MPLX_ENTER_ALWAYS(m);
456     s = h2_ihash_get(m->streams, stream_id);
457     H2_MPLX_LEAVE(m);
458 
459     return s;
460 }
461 
462 
/* Account the bytes a stream sent on the c1 scoreboard slot, using the
 * mplx scratch request as a temporary carrier. Streams that never had a
 * c2 connection are skipped. */
static void c1_update_scoreboard(h2_mplx *m, h2_stream *stream)
{
    if (!stream->c2) {
        return;
    }
    m->scratch_r->connection = stream->c2;
    m->scratch_r->bytes_sent = stream->out_frame_octets;
    ap_increment_counts(m->c1->sbh, m->scratch_r);
    m->scratch_r->connection = NULL;
}
472 
/* Destroy every stream queued in m->spurge.
 *
 * Per stream: account it on the scoreboard, destroy its input beam, remove
 * its c2 from the pollset, destroy the c2 and its connection context, and
 * finally destroy the stream. The purge list is cleared afterwards.
 */
static void c1_purge_streams(h2_mplx *m)
{
    int idx;

    for (idx = 0; idx < m->spurge->nelts; ++idx) {
        h2_stream *doomed = APR_ARRAY_IDX(m->spurge, idx, h2_stream*);
        conn_rec *c2;

        ap_assert(doomed->state == H2_SS_CLEANUP);
        c1_update_scoreboard(m, doomed);

        if (doomed->input) {
            h2_beam_destroy(doomed->input, m->c1);
            doomed->input = NULL;
        }

        c2 = doomed->c2;
        if (c2) {
            h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);
            apr_status_t rv;

            /* detach before tearing the c2 down */
            doomed->c2 = NULL;
            ap_assert(c2_ctx);
            rv = mplx_pollset_remove(m, c2_ctx);
            if (rv != APR_SUCCESS) {
                ap_log_cerror(APLOG_MARK, APLOG_INFO, rv, m->c1,
                              "h2_mplx(%ld-%d): pollset_remove %d on purge",
                              m->id, doomed->id, c2_ctx->stream_id);
            }
            h2_conn_ctx_destroy(c2);
            h2_c2_destroy(c2);
        }
        h2_stream_destroy(doomed);
    }
    apr_array_clear(m->spurge);
}
509 
h2_mplx_c1_poll(h2_mplx * m,apr_interval_time_t timeout,stream_ev_callback * on_stream_input,stream_ev_callback * on_stream_output,void * on_ctx)510 apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout,
511                             stream_ev_callback *on_stream_input,
512                             stream_ev_callback *on_stream_output,
513                             void *on_ctx)
514 {
515     apr_status_t rv;
516 
517     H2_MPLX_ENTER(m);
518 
519     if (m->aborted) {
520         rv = APR_ECONNABORTED;
521         goto cleanup;
522     }
523     /* Purge (destroy) streams outside of pollset processing.
524      * Streams that are registered in the pollset, will be removed
525      * when they are destroyed, but the pollset works on copies
526      * of these registrations. So, if we destroy streams while
527      * processing pollset events, we might access freed memory.
528      */
529     if (m->spurge->nelts) {
530         c1_purge_streams(m);
531     }
532     rv = mplx_pollset_poll(m, timeout, on_stream_input, on_stream_output, on_ctx);
533 
534 cleanup:
535     H2_MPLX_LEAVE(m);
536     return rv;
537 }
538 
h2_mplx_c1_reprioritize(h2_mplx * m,h2_stream_pri_cmp_fn * cmp,h2_session * session)539 apr_status_t h2_mplx_c1_reprioritize(h2_mplx *m, h2_stream_pri_cmp_fn *cmp,
540                                     h2_session *session)
541 {
542     apr_status_t status;
543 
544     H2_MPLX_ENTER(m);
545 
546     if (m->aborted) {
547         status = APR_ECONNABORTED;
548     }
549     else {
550         h2_iq_sort(m->q, cmp, session);
551         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
552                       "h2_mplx(%ld): reprioritize streams", m->id);
553         status = APR_SUCCESS;
554     }
555 
556     H2_MPLX_LEAVE(m);
557     return status;
558 }
559 
/* Register this mplx with the workers when it has queued work and is not
 * yet registered (and not aborted). A registration failure is only logged
 * when called from the master side ('from_master' non-zero). */
static void ms_register_if_needed(h2_mplx *m, int from_master)
{
    apr_status_t status;

    if (m->aborted || m->is_registered || h2_iq_empty(m->q)) {
        return;
    }

    status = h2_workers_register(m->workers, m);
    if (status == APR_SUCCESS) {
        m->is_registered = 1;
        return;
    }
    if (from_master) {
        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c1, APLOGNO(10021)
                      "h2_mplx(%ld): register at workers", m->id);
    }
}
573 
c1_process_stream(h2_mplx * m,h2_stream * stream,h2_stream_pri_cmp_fn * cmp,h2_session * session)574 static apr_status_t c1_process_stream(h2_mplx *m,
575                                       h2_stream *stream,
576                                       h2_stream_pri_cmp_fn *cmp,
577                                       h2_session *session)
578 {
579     apr_status_t rv;
580 
581     if (m->aborted) {
582         rv = APR_ECONNABORTED;
583         goto cleanup;
584     }
585     if (!stream->request) {
586         rv = APR_EINVAL;
587         goto cleanup;
588     }
589     if (APLOGctrace1(m->c1)) {
590         const h2_request *r = stream->request;
591         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
592                       H2_STRM_MSG(stream, "process %s %s://%s%s chunked=%d"),
593                       r->method, r->scheme, r->authority, r->path, r->chunked);
594     }
595 
596     rv = h2_stream_setup_input(stream);
597     if (APR_SUCCESS != rv) goto cleanup;
598 
599     stream->scheduled = 1;
600     h2_ihash_add(m->streams, stream);
601     if (h2_stream_is_ready(stream)) {
602         /* already have a response */
603         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
604                       H2_STRM_MSG(stream, "process, ready already"));
605     }
606     else {
607         h2_iq_add(m->q, stream->id, cmp, session);
608         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
609                       H2_STRM_MSG(stream, "process, added to q"));
610     }
611 
612 cleanup:
613     return rv;
614 }
615 
h2_mplx_c1_process(h2_mplx * m,h2_iqueue * ready_to_process,h2_stream_get_fn * get_stream,h2_stream_pri_cmp_fn * stream_pri_cmp,h2_session * session,int * pstream_count)616 apr_status_t h2_mplx_c1_process(h2_mplx *m,
617                                 h2_iqueue *ready_to_process,
618                                 h2_stream_get_fn *get_stream,
619                                 h2_stream_pri_cmp_fn *stream_pri_cmp,
620                                 h2_session *session,
621                                 int *pstream_count)
622 {
623     apr_status_t rv = APR_SUCCESS;
624     int sid;
625 
626     H2_MPLX_ENTER(m);
627 
628     while ((sid = h2_iq_shift(ready_to_process)) > 0) {
629         h2_stream *stream = get_stream(session, sid);
630         if (stream) {
631             ap_assert(!stream->scheduled);
632             rv = c1_process_stream(session->mplx, stream, stream_pri_cmp, session);
633             if (APR_SUCCESS != rv) {
634                 h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
635             }
636         }
637         else {
638             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
639                           "h2_stream(%ld-%d): not found to process", m->id, sid);
640         }
641     }
642     ms_register_if_needed(m, 1);
643     *pstream_count = (int)h2_ihash_count(m->streams);
644 #if APR_POOL_DEBUG
645     do {
646         apr_size_t mem_g, mem_m, mem_s, mem_w, mem_c1;
647 
648         mem_g = pchild? apr_pool_num_bytes(pchild, 1) : 0;
649         mem_m = apr_pool_num_bytes(m->pool, 1);
650         mem_s = apr_pool_num_bytes(session->pool, 1);
651         mem_w = apr_pool_num_bytes(m->workers->pool, 1);
652         mem_c1 = apr_pool_num_bytes(m->c1->pool, 1);
653         ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c1,
654                       "h2_mplx(%ld): child mem=%ld, mplx mem=%ld, session mem=%ld, workers=%ld, c1=%ld",
655                       m->id, (long)mem_g, (long)mem_m, (long)mem_s, (long)mem_w, (long)mem_c1);
656 
657     } while (0);
658 #endif
659 
660     H2_MPLX_LEAVE(m);
661     return rv;
662 }
663 
h2_mplx_c1_fwd_input(h2_mplx * m,struct h2_iqueue * input_pending,h2_stream_get_fn * get_stream,struct h2_session * session)664 apr_status_t h2_mplx_c1_fwd_input(h2_mplx *m, struct h2_iqueue *input_pending,
665                                   h2_stream_get_fn *get_stream,
666                                   struct h2_session *session)
667 {
668     int sid;
669 
670     H2_MPLX_ENTER(m);
671 
672     while ((sid = h2_iq_shift(input_pending)) > 0) {
673         h2_stream *stream = get_stream(session, sid);
674         if (stream) {
675             H2_MPLX_LEAVE(m);
676             h2_stream_flush_input(stream);
677             H2_MPLX_ENTER(m);
678         }
679     }
680 
681     H2_MPLX_LEAVE(m);
682     return APR_SUCCESS;
683 }
684 
/* Input beam "was empty" callback (ctx is the c2 connection): when the
 * c2 has an input-produced pipe, write one byte into it as a wakeup. */
static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam)
{
    conn_rec *c2 = ctx;
    h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);

    (void)beam;
    if (!c2_ctx || !c2_ctx->stream_id) {
        return;
    }
    if (c2_ctx->pipe_in_prod[H2_PIPE_IN]) {
        apr_file_putc(1, c2_ctx->pipe_in_prod[H2_PIPE_IN]);
    }
}
695 
/* Input beam "received" callback (ctx is the c2 connection). It signals
 * the mplx that input was read: through the drain pipe when one exists;
 * otherwise, in builds without H2_POLL_STREAMS, by queueing the stream id
 * in streams_input_read and waking up the pollset. */
static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
{
    conn_rec *c2 = ctx;
    h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);

    (void)beam;
    if (!c2_ctx || !c2_ctx->stream_id) {
        return;
    }
    if (c2_ctx->pipe_in_drain[H2_PIPE_IN]) {
        apr_file_putc(1, c2_ctx->pipe_in_drain[H2_PIPE_IN]);
        return;
    }
#if !H2_POLL_STREAMS
    apr_thread_mutex_lock(c2_ctx->mplx->poll_lock);
    h2_iq_append(c2_ctx->mplx->streams_input_read, c2_ctx->stream_id);
    apr_pollset_wakeup(c2_ctx->mplx->pollset);
    apr_thread_mutex_unlock(c2_ctx->mplx->poll_lock);
#endif
}
715 
/* Output beam "was empty" callback (ctx is the c2 connection). It signals
 * the mplx that output was written: through the output pipe when one
 * exists; otherwise, in builds without H2_POLL_STREAMS, by queueing the
 * stream id in streams_output_written and waking up the pollset. */
static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
{
    conn_rec *c2 = ctx;
    h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);

    (void)beam;
    if (!c2_ctx || !c2_ctx->stream_id) {
        return;
    }
    if (c2_ctx->pipe_out_prod[H2_PIPE_IN]) {
        apr_file_putc(1, c2_ctx->pipe_out_prod[H2_PIPE_IN]);
        return;
    }
#if !H2_POLL_STREAMS
    apr_thread_mutex_lock(c2_ctx->mplx->poll_lock);
    h2_iq_append(c2_ctx->mplx->streams_output_written, c2_ctx->stream_id);
    apr_pollset_wakeup(c2_ctx->mplx->pollset);
    apr_thread_mutex_unlock(c2_ctx->mplx->poll_lock);
#endif
}
735 
/* Set up the I/O plumbing between the mplx and a newly created c2 for
 * 'stream':
 * - initialise the c2 connection context (h2_conn_ctx_init_for_c2);
 * - create the output beam if none exists yet, sized to m->stream_max_mem,
 *   and hook c2_beam_output_write_notify to it;
 * - if the stream has input, hand its input beam to the c2 and hook the
 *   write/received/consumed notifications;
 * - with H2_POLL_STREAMS, create the notification pipes and the pollfds
 *   describing them; otherwise zero those fields.
 * On success stream->output is set to the c2 output beam. On failure it is
 * set to NULL, the failing step ('action') is logged, and the APR status
 * is returned.
 */
static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream)
{
    h2_conn_ctx_t *conn_ctx;
    apr_status_t rv = APR_SUCCESS;
    const char *action = "init";   /* names the failing step in the error log */

    rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream);
    if (APR_SUCCESS != rv) goto cleanup;

    if (!conn_ctx->beam_out) {
        action = "create output beam";
        rv = h2_beam_create(&conn_ctx->beam_out, c2, conn_ctx->req_pool,
                            stream->id, "output", 0, c2->base_server->timeout);
        if (APR_SUCCESS != rv) goto cleanup;

        h2_beam_buffer_size_set(conn_ctx->beam_out, m->stream_max_mem);
        h2_beam_on_was_empty(conn_ctx->beam_out, c2_beam_output_write_notify, c2);
    }

    if (stream->input) {
        conn_ctx->beam_in = stream->input;
        h2_beam_on_was_empty(stream->input, c2_beam_input_write_notify, c2);
        h2_beam_on_received(stream->input, c2_beam_input_read_notify, c2);
        h2_beam_on_consumed(stream->input, c1_input_consumed, stream);
    }
    else {
        /* no request body: nothing to poll for input draining */
        memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain));
    }

#if H2_POLL_STREAMS
    /* sub-pool of m->pool for the mplx-side ends of the pipes */
    if (!conn_ctx->mplx_pool) {
        apr_pool_create(&conn_ctx->mplx_pool, m->pool);
        apr_pool_tag(conn_ctx->mplx_pool, "H2_MPLX_C2");
    }

    /* Output pipe: c2_beam_output_write_notify writes to the [H2_PIPE_IN]
     * end; the non-blocking [H2_PIPE_OUT] end is described by pfd_out_prod. */
    if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) {
        action = "create output pipe";
        rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT],
                                        &conn_ctx->pipe_out_prod[H2_PIPE_IN],
                                        APR_FULL_NONBLOCK,
                                        conn_ctx->mplx_pool, c2->pool);
        if (APR_SUCCESS != rv) goto cleanup;
    }
    conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE;
    conn_ctx->pfd_out_prod.desc.f = conn_ctx->pipe_out_prod[H2_PIPE_OUT];
    conn_ctx->pfd_out_prod.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
    conn_ctx->pfd_out_prod.client_data = conn_ctx;

    if (stream->input) {
        /* Input-produced pipe: c2_beam_input_write_notify writes to the
         * [H2_PIPE_IN] end. */
        if (!conn_ctx->pipe_in_prod[H2_PIPE_OUT]) {
            action = "create input write pipe";
            rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT],
                                            &conn_ctx->pipe_in_prod[H2_PIPE_IN],
                                            APR_READ_BLOCK,
                                            c2->pool, conn_ctx->mplx_pool);
            if (APR_SUCCESS != rv) goto cleanup;
        }
        /* Input-drain pipe: c2_beam_input_read_notify writes to the
         * [H2_PIPE_IN] end; pfd_in_drain describes the [H2_PIPE_OUT] end.
         * NOTE(review): unlike the output pipe, the end described by the
         * pollfd comes from c2->pool and the written end from mplx_pool.
         * Confirm this pool assignment is intended. */
        if (!conn_ctx->pipe_in_drain[H2_PIPE_OUT]) {
            action = "create input read pipe";
            rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_drain[H2_PIPE_OUT],
                                            &conn_ctx->pipe_in_drain[H2_PIPE_IN],
                                            APR_FULL_NONBLOCK,
                                            c2->pool, conn_ctx->mplx_pool);
            if (APR_SUCCESS != rv) goto cleanup;
        }
        conn_ctx->pfd_in_drain.desc_type = APR_POLL_FILE;
        conn_ctx->pfd_in_drain.desc.f = conn_ctx->pipe_in_drain[H2_PIPE_OUT];
        conn_ctx->pfd_in_drain.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
        conn_ctx->pfd_in_drain.client_data = conn_ctx;
    }
#else
    /* No pipes in this mode: the notify callbacks fall back to the
     * poll_lock-protected queues and pollset wakeups. */
    memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod));
    memset(&conn_ctx->pipe_in_prod, 0, sizeof(conn_ctx->pipe_in_prod));
    memset(&conn_ctx->pipe_in_drain, 0, sizeof(conn_ctx->pipe_in_drain));
#endif

cleanup:
    /* conn_ctx is only read when rv signals success, i.e. after it was set */
    stream->output = (APR_SUCCESS == rv)? conn_ctx->beam_out : NULL;
    if (APR_SUCCESS != rv) {
        ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c2,
                      H2_STRM_LOG(APLOGNO(10309), stream,
                      "error %s"), action);
    }
    return rv;
}
821 
s_next_c2(h2_mplx * m)822 static conn_rec *s_next_c2(h2_mplx *m)
823 {
824     h2_stream *stream = NULL;
825     apr_status_t rv;
826     int sid;
827     conn_rec *c2;
828 
829     while (!m->aborted && !stream && (m->processing_count < m->processing_limit)
830            && (sid = h2_iq_shift(m->q)) > 0) {
831         stream = h2_ihash_get(m->streams, sid);
832     }
833 
834     if (!stream) {
835         if (m->processing_count >= m->processing_limit && !h2_iq_empty(m->q)) {
836             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1,
837                           "h2_session(%ld): delaying request processing. "
838                           "Current limit is %d and %d workers are in use.",
839                           m->id, m->processing_limit, m->processing_count);
840         }
841         return NULL;
842     }
843 
844     if (sid > m->max_stream_id_started) {
845         m->max_stream_id_started = sid;
846     }
847 
848     c2 = h2_c2_create(m->c1, m->pool);
849     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c1,
850                   H2_STRM_MSG(stream, "created new c2"));
851 
852     rv = c2_setup_io(m, c2, stream);
853     if (APR_SUCCESS != rv) {
854         return NULL;
855     }
856 
857     stream->c2 = c2;
858     ++m->processing_count;
859     APR_ARRAY_PUSH(m->streams_to_poll, h2_stream *) = stream;
860     apr_pollset_wakeup(m->pollset);
861 
862     return c2;
863 }
864 
h2_mplx_worker_pop_c2(h2_mplx * m,conn_rec ** out_c)865 apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c)
866 {
867     apr_status_t rv = APR_EOF;
868 
869     *out_c = NULL;
870     ap_assert(m);
871     ap_assert(m->lock);
872 
873     if (APR_SUCCESS != (rv = apr_thread_mutex_lock(m->lock))) {
874         return rv;
875     }
876 
877     if (m->aborted) {
878         rv = APR_EOF;
879     }
880     else {
881         *out_c = s_next_c2(m);
882         rv = (*out_c != NULL && !h2_iq_empty(m->q))? APR_EAGAIN : APR_SUCCESS;
883     }
884     if (APR_EAGAIN != rv) {
885         m->is_registered = 0; /* h2_workers will discard this mplx */
886     }
887     H2_MPLX_LEAVE(m);
888     return rv;
889 }
890 
s_c2_done(h2_mplx * m,conn_rec * c2,h2_conn_ctx_t * conn_ctx)891 static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
892 {
893     h2_stream *stream;
894 
895     ap_assert(conn_ctx);
896     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
897                   "h2_mplx(%s-%d): c2 done", conn_ctx->id, conn_ctx->stream_id);
898 
899     ap_assert(conn_ctx->done == 0);
900     conn_ctx->done = 1;
901     conn_ctx->done_at = apr_time_now();
902     ++c2->keepalives;
903     /* From here on, the final handling of c2 is done by c1 processing.
904      * Which means we can give it c1's scoreboard handle for updates. */
905     c2->sbh = m->c1->sbh;
906 
907     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
908                   "h2_mplx(%s-%d): request done, %f ms elapsed",
909                   conn_ctx->id, conn_ctx->stream_id,
910                   (conn_ctx->done_at - conn_ctx->started_at) / 1000.0);
911 
912     if (!conn_ctx->has_final_response) {
913         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, conn_ctx->last_err, c2,
914                       "h2_c2(%s-%d): processing finished without final response",
915                       conn_ctx->id, conn_ctx->stream_id);
916         c2->aborted = 1;
917     }
918     else if (!c2->aborted && conn_ctx->started_at > m->last_mood_change) {
919         s_mplx_be_happy(m, c2, conn_ctx);
920     }
921 
922     stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
923     if (stream) {
924         /* stream not done yet. trigger a potential polling on the output
925          * since nothing more will happening here. */
926         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
927                       H2_STRM_MSG(stream, "c2_done, stream open"));
928         c2_beam_output_write_notify(c2, NULL);
929     }
930     else if ((stream = h2_ihash_get(m->shold, conn_ctx->stream_id)) != NULL) {
931         /* stream is done, was just waiting for this. */
932         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
933                       H2_STRM_MSG(stream, "c2_done, in hold"));
934         c1c2_stream_joined(m, stream);
935     }
936     else {
937         int i;
938 
939         for (i = 0; i < m->spurge->nelts; ++i) {
940             if (stream == APR_ARRAY_IDX(m->spurge, i, h2_stream*)) {
941                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2,
942                               H2_STRM_LOG(APLOGNO(03517), stream, "already in spurge"));
943                 ap_assert("stream should not be in spurge" == NULL);
944                 return;
945             }
946         }
947 
948         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2, APLOGNO(03518)
949                       "h2_mplx(%s-%d): c2_done, stream not found",
950                       conn_ctx->id, conn_ctx->stream_id);
951         ap_assert("stream should still be available" == NULL);
952     }
953 }
954 
h2_mplx_worker_c2_done(conn_rec * c2,conn_rec ** out_c2)955 void h2_mplx_worker_c2_done(conn_rec *c2, conn_rec **out_c2)
956 {
957     h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
958     h2_mplx *m;
959 
960     if (!conn_ctx || !conn_ctx->mplx) return;
961     m = conn_ctx->mplx;
962 
963     H2_MPLX_ENTER_ALWAYS(m);
964 
965     --m->processing_count;
966     s_c2_done(m, c2, conn_ctx);
967 
968     if (m->join_wait) {
969         apr_thread_cond_signal(m->join_wait);
970     }
971     if (out_c2) {
972         /* caller wants another connection to process */
973         *out_c2 = s_next_c2(m);
974     }
975     ms_register_if_needed(m, 0);
976 
977     H2_MPLX_LEAVE(m);
978 }
979 
980 /*******************************************************************************
981  * h2_mplx DoS protection
982  ******************************************************************************/
983 
s_mplx_be_happy(h2_mplx * m,conn_rec * c,h2_conn_ctx_t * conn_ctx)984 static apr_status_t s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx)
985 {
986     apr_time_t now;
987 
988     --m->irritations_since;
989     now = apr_time_now();
990     if (m->processing_limit < m->processing_max
991         && (now - m->last_mood_change >= m->mood_update_interval
992             || m->irritations_since < -m->processing_limit)) {
993         m->processing_limit = H2MIN(m->processing_limit * 2, m->processing_max);
994         m->last_mood_change = now;
995         m->irritations_since = 0;
996         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
997                       "h2_mplx(%ld): mood update, increasing worker limit to %d",
998                       m->id, m->processing_limit);
999     }
1000     return APR_SUCCESS;
1001 }
1002 
m_be_annoyed(h2_mplx * m)1003 static apr_status_t m_be_annoyed(h2_mplx *m)
1004 {
1005     apr_status_t status = APR_SUCCESS;
1006     apr_time_t now;
1007 
1008     ++m->irritations_since;
1009     now = apr_time_now();
1010     if (m->processing_limit > 2 &&
1011         ((now - m->last_mood_change >= m->mood_update_interval)
1012          || (m->irritations_since >= m->processing_limit))) {
1013 
1014         if (m->processing_limit > 16) {
1015             m->processing_limit = 16;
1016         }
1017         else if (m->processing_limit > 8) {
1018             m->processing_limit = 8;
1019         }
1020         else if (m->processing_limit > 4) {
1021             m->processing_limit = 4;
1022         }
1023         else if (m->processing_limit > 2) {
1024             m->processing_limit = 2;
1025         }
1026         m->last_mood_change = now;
1027         m->irritations_since = 0;
1028         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
1029                       "h2_mplx(%ld): mood update, decreasing worker limit to %d",
1030                       m->id, m->processing_limit);
1031     }
1032     return status;
1033 }
1034 
1035 /*******************************************************************************
1036  * mplx master events dispatching
1037  ******************************************************************************/
1038 
reset_is_acceptable(h2_stream * stream)1039 static int reset_is_acceptable(h2_stream *stream)
1040 {
1041     /* client may terminate a stream via H2 RST_STREAM message at any time.
1042      * This is annyoing when we have committed resources (e.g. worker threads)
1043      * to it, so our mood (e.g. willingness to commit resources on this
1044      * connection in the future) goes down.
1045      *
1046      * This is a DoS protection. We do not want to make it too easy for
1047      * a client to eat up server resources.
1048      *
1049      * However: there are cases where a RST_STREAM is the only way to end
1050      * a request. This includes websockets and server-side-event streams (SSEs).
1051      * The responses to such requests continue forever otherwise.
1052      *
1053      */
1054     if (!stream_is_running(stream)) return 1;
1055     if (!(stream->id & 0x01)) return 1; /* stream initiated by us. acceptable. */
1056     if (!stream->response) return 0; /* no response headers produced yet. bad. */
1057     if (!stream->out_data_frames) return 0; /* no response body data sent yet. bad. */
1058     return 1; /* otherwise, be forgiving */
1059 }
1060 
h2_mplx_c1_client_rst(h2_mplx * m,int stream_id)1061 apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id)
1062 {
1063     h2_stream *stream;
1064     apr_status_t status = APR_SUCCESS;
1065 
1066     H2_MPLX_ENTER_ALWAYS(m);
1067     stream = h2_ihash_get(m->streams, stream_id);
1068     if (stream && !reset_is_acceptable(stream)) {
1069         status = m_be_annoyed(m);
1070     }
1071     H2_MPLX_LEAVE(m);
1072     return status;
1073 }
1074 
mplx_pollset_create(h2_mplx * m)1075 static apr_status_t mplx_pollset_create(h2_mplx *m)
1076 {
1077     int max_pdfs;
1078 
1079     /* stream0 output, pdf_out+pfd_in_consume per active streams */
1080     max_pdfs = 1 + 2 * H2MIN(m->processing_max, m->max_streams);
1081     return apr_pollset_create(&m->pollset, max_pdfs, m->pool,
1082                               APR_POLLSET_WAKEABLE);
1083 }
1084 
mplx_pollset_add(h2_mplx * m,h2_conn_ctx_t * conn_ctx)1085 static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
1086 {
1087     apr_status_t rv = APR_SUCCESS;
1088     const char *name = "";
1089 
1090     if (conn_ctx->pfd_out_prod.reqevents) {
1091         name = "adding out";
1092         rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod);
1093         if (APR_SUCCESS != rv) goto cleanup;
1094     }
1095 
1096     if (conn_ctx->pfd_in_drain.reqevents) {
1097         name = "adding in_read";
1098         rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_in_drain);
1099     }
1100 
1101 cleanup:
1102     if (APR_SUCCESS != rv) {
1103         ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1,
1104                       "h2_mplx(%ld-%d): error while adding to pollset %s",
1105                       m->id, conn_ctx->stream_id, name);
1106     }
1107     return rv;
1108 }
1109 
mplx_pollset_remove(h2_mplx * m,h2_conn_ctx_t * conn_ctx)1110 static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
1111 {
1112     apr_status_t rv = APR_SUCCESS;
1113     const char *name = "";
1114 
1115     if (conn_ctx->pfd_out_prod.reqevents) {
1116         rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_out_prod);
1117         conn_ctx->pfd_out_prod.reqevents = 0;
1118         if (APR_SUCCESS != rv) goto cleanup;
1119     }
1120 
1121     if (conn_ctx->pfd_in_drain.reqevents) {
1122         name = "in_read";
1123         rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_in_drain);
1124         conn_ctx->pfd_in_drain.reqevents = 0;
1125         if (APR_SUCCESS != rv) goto cleanup;
1126     }
1127 
1128 cleanup:
1129     if (APR_SUCCESS != rv) {
1130         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, m->c1,
1131                       "h2_mplx(%ld-%d): error removing from pollset %s",
1132                       m->id, conn_ctx->stream_id, name);
1133     }
1134     return rv;
1135 }
1136 
mplx_pollset_poll(h2_mplx * m,apr_interval_time_t timeout,stream_ev_callback * on_stream_input,stream_ev_callback * on_stream_output,void * on_ctx)1137 static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
1138                             stream_ev_callback *on_stream_input,
1139                             stream_ev_callback *on_stream_output,
1140                             void *on_ctx)
1141 {
1142     apr_status_t rv;
1143     const apr_pollfd_t *results, *pfd;
1144     apr_int32_t nresults, i;
1145     h2_conn_ctx_t *conn_ctx;
1146     h2_stream *stream;
1147 
1148     /* Make sure we are not called recursively. */
1149     ap_assert(!m->polling);
1150     m->polling = 1;
1151     do {
1152         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
1153                       "h2_mplx(%ld): enter polling timeout=%d",
1154                       m->id, (int)apr_time_sec(timeout));
1155 
1156         apr_array_clear(m->streams_ev_in);
1157         apr_array_clear(m->streams_ev_out);
1158 
1159         do {
1160             /* add streams we started processing in the meantime */
1161             if (m->streams_to_poll->nelts) {
1162                 for (i = 0; i < m->streams_to_poll->nelts; ++i) {
1163                     stream = APR_ARRAY_IDX(m->streams_to_poll, i, h2_stream*);
1164                     if (stream && stream->c2 && (conn_ctx = h2_conn_ctx_get(stream->c2))) {
1165                         mplx_pollset_add(m, conn_ctx);
1166                     }
1167                 }
1168                 apr_array_clear(m->streams_to_poll);
1169             }
1170 
1171 #if !H2_POLL_STREAMS
1172             apr_thread_mutex_lock(m->poll_lock);
1173             if (!h2_iq_empty(m->streams_input_read)
1174                 || !h2_iq_empty(m->streams_output_written)) {
1175                 while ((i = h2_iq_shift(m->streams_input_read))) {
1176                     stream = h2_ihash_get(m->streams, i);
1177                     if (stream) {
1178                         APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
1179                     }
1180                 }
1181                 while ((i = h2_iq_shift(m->streams_output_written))) {
1182                     stream = h2_ihash_get(m->streams, i);
1183                     if (stream) {
1184                         APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
1185                     }
1186                 }
1187                 nresults = 0;
1188                 rv = APR_SUCCESS;
1189                 apr_thread_mutex_unlock(m->poll_lock);
1190                 break;
1191             }
1192             apr_thread_mutex_unlock(m->poll_lock);
1193 #endif
1194             H2_MPLX_LEAVE(m);
1195             rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results);
1196             H2_MPLX_ENTER_ALWAYS(m);
1197 
1198         } while (APR_STATUS_IS_EINTR(rv));
1199 
1200         if (APR_SUCCESS != rv) {
1201             if (APR_STATUS_IS_TIMEUP(rv)) {
1202                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
1203                               "h2_mplx(%ld): polling timed out ",
1204                               m->id);
1205             }
1206             else {
1207                 ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10310)
1208                               "h2_mplx(%ld): polling failed", m->id);
1209             }
1210             goto cleanup;
1211         }
1212 
1213         for (i = 0; i < nresults; i++) {
1214             pfd = &results[i];
1215             conn_ctx = pfd->client_data;
1216 
1217             ap_assert(conn_ctx);
1218             if (conn_ctx->stream_id == 0) {
1219                 if (on_stream_input) {
1220                     APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = m->stream0;
1221                 }
1222                 continue;
1223             }
1224 
1225             h2_util_drain_pipe(pfd->desc.f);
1226             stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
1227             if (!stream) {
1228                 stream = h2_ihash_get(m->shold, conn_ctx->stream_id);
1229                 if (stream) {
1230                     /* This is normal and means that stream processing on c1 has
1231                      * already finished to CLEANUP and c2 is not done yet */
1232                     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, m->c1,
1233                                   "h2_mplx(%ld-%d): stream already in hold for poll event %hx",
1234                                    m->id, conn_ctx->stream_id, pfd->rtnevents);
1235                 }
1236                 else {
1237                     h2_stream *sp = NULL;
1238                     int j;
1239 
1240                     for (j = 0; j < m->spurge->nelts; ++j) {
1241                         sp = APR_ARRAY_IDX(m->spurge, j, h2_stream*);
1242                         if (sp->id == conn_ctx->stream_id) {
1243                             stream = sp;
1244                             break;
1245                         }
1246                     }
1247 
1248                     if (stream) {
1249                         /* This is normal and means that stream processing on c1 has
1250                          * already finished to CLEANUP and c2 is not done yet */
1251                         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, m->c1, APLOGNO(10311)
1252                                       "h2_mplx(%ld-%d): stream already in purge for poll event %hx",
1253                                        m->id, conn_ctx->stream_id, pfd->rtnevents);
1254                     }
1255                     else {
1256                         /* This should not happen. When a stream has been purged,
1257                          * it MUST no longer appear in the pollset. Puring is done
1258                          * outside the poll result processing. */
1259                         ap_log_cerror(APLOG_MARK, APLOG_WARNING, rv, m->c1, APLOGNO(10312)
1260                                       "h2_mplx(%ld-%d): stream no longer known for poll event %hx"
1261                                       ", m->streams=%d, conn_ctx=%lx, fd=%lx",
1262                                        m->id, conn_ctx->stream_id, pfd->rtnevents,
1263                                        (int)h2_ihash_count(m->streams),
1264                                        (long)conn_ctx, (long)pfd->desc.f);
1265                         h2_ihash_iter(m->streams, m_report_stream_iter, m);
1266                     }
1267                 }
1268                 continue;
1269             }
1270 
1271             if (conn_ctx->pfd_out_prod.desc.f == pfd->desc.f) {
1272                 /* output is available */
1273                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
1274                               "[%s-%d] poll output event %hx",
1275                               conn_ctx->id, conn_ctx->stream_id,
1276                               pfd->rtnevents);
1277                 APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
1278             }
1279             else if (conn_ctx->pfd_in_drain.desc.f == pfd->desc.f) {
1280                 /* input has been consumed */
1281                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
1282                               "[%s-%d] poll input event %hx",
1283                               conn_ctx->id, conn_ctx->stream_id,
1284                               pfd->rtnevents);
1285                 APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
1286             }
1287         }
1288 
1289         if (on_stream_input && m->streams_ev_in->nelts) {
1290             H2_MPLX_LEAVE(m);
1291             for (i = 0; i < m->streams_ev_in->nelts; ++i) {
1292                 on_stream_input(on_ctx, APR_ARRAY_IDX(m->streams_ev_in, i, h2_stream*));
1293             }
1294             H2_MPLX_ENTER_ALWAYS(m);
1295         }
1296         if (on_stream_output && m->streams_ev_out->nelts) {
1297             H2_MPLX_LEAVE(m);
1298             for (i = 0; i < m->streams_ev_out->nelts; ++i) {
1299                 on_stream_output(on_ctx, APR_ARRAY_IDX(m->streams_ev_out, i, h2_stream*));
1300             }
1301             H2_MPLX_ENTER_ALWAYS(m);
1302         }
1303         break;
1304     } while(1);
1305 
1306 cleanup:
1307     m->polling = 0;
1308     return rv;
1309 }
1310 
1311