1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <assert.h>
18 #include <stddef.h>
19 #include <stdlib.h>
20
21 #include <apr_atomic.h>
22 #include <apr_thread_mutex.h>
23 #include <apr_thread_cond.h>
24 #include <apr_strings.h>
25 #include <apr_time.h>
26
27 #include <httpd.h>
28 #include <http_core.h>
29 #include <http_log.h>
30
31 #include <mpm_common.h>
32
33 #include "mod_http2.h"
34
35 #include "h2.h"
36 #include "h2_private.h"
37 #include "h2_bucket_beam.h"
38 #include "h2_config.h"
39 #include "h2_c1.h"
40 #include "h2_conn_ctx.h"
41 #include "h2_protocol.h"
42 #include "h2_mplx.h"
43 #include "h2_request.h"
44 #include "h2_stream.h"
45 #include "h2_session.h"
46 #include "h2_c2.h"
47 #include "h2_workers.h"
48 #include "h2_util.h"
49
50
51 /* utility for iterating over ihash stream sets */
52 typedef struct {
53 h2_mplx *m;
54 h2_stream *stream;
55 apr_time_t now;
56 apr_size_t count;
57 } stream_iter_ctx;
58
59 static apr_status_t s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx);
60 static apr_status_t m_be_annoyed(h2_mplx *m);
61
62 static apr_status_t mplx_pollset_create(h2_mplx *m);
63 static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx);
64 static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx);
65 static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
66 stream_ev_callback *on_stream_input,
67 stream_ev_callback *on_stream_output,
68 void *on_ctx);
69
70 static apr_pool_t *pchild;
71
h2_mplx_c1_child_init(apr_pool_t * pool,server_rec * s)72 apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s)
73 {
74 pchild = pool;
75 return APR_SUCCESS;
76 }
77
/* Lock m->lock. If locking fails, the ENCLOSING function returns the
 * apr_status_t of the failed lock call. Use as a statement. */
#define H2_MPLX_ENTER(m)    \
    do { apr_status_t rv_lock; if ((rv_lock = apr_thread_mutex_lock((m)->lock)) != APR_SUCCESS) {\
        return rv_lock;\
    } } while(0)

/* Unlock m->lock; evaluates to the status of the unlock call. */
#define H2_MPLX_LEAVE(m)    \
    apr_thread_mutex_unlock((m)->lock)

/* Lock m->lock without bailing out of the caller; evaluates to the
 * status of the lock call. */
#define H2_MPLX_ENTER_ALWAYS(m)    \
    apr_thread_mutex_lock((m)->lock)

/* Lock m->lock only when dolock is true. Wrapped in do/while(0) so the
 * macro is a single statement and cannot capture a following `else`. */
#define H2_MPLX_ENTER_MAYBE(m, dolock)    \
    do { if (dolock) { apr_thread_mutex_lock((m)->lock); } } while(0)

/* Unlock m->lock only when dolock is true. Same single-statement form
 * as H2_MPLX_ENTER_MAYBE. */
#define H2_MPLX_LEAVE_MAYBE(m, dolock)    \
    do { if (dolock) { apr_thread_mutex_unlock((m)->lock); } } while(0)
94
/* Input beam "consumed" callback (registered in c2_setup_io with the stream
 * as ctx): passes the consumed length on to h2_stream_in_consumed(). */
static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
{
    (void)beam;
    h2_stream_in_consumed(ctx, length);
}
99
/* A stream counts as running when its c2 connection has a context that
 * was started (started_at != 0) and is not yet marked done.
 * Returns 1 if running, 0 otherwise. */
static int stream_is_running(h2_stream *stream)
{
    h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(stream->c2);

    if (!c2_ctx) {
        return 0;
    }
    if (c2_ctx->started_at == 0) {
        return 0;
    }
    return !c2_ctx->done;
}
105
/**
 * Locked variant of stream_is_running().
 * @return non-zero when the stream's c2 processing is started and not done.
 *
 * NOTE(review): H2_MPLX_ENTER returns the apr_status_t of a failed lock
 * from this int-returning function, so a lock failure is reported to the
 * caller as a non-zero ("running") value.
 */
int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream)
{
    int running;

    H2_MPLX_ENTER(m);
    running = stream_is_running(stream);
    H2_MPLX_LEAVE(m);

    return running;
}
115
/* Called once the c2 processing of a held stream has finished: take the
 * stream out of the hold set and queue it for purging. The stream must
 * no longer be running. */
static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream)
{
    ap_assert(!stream_is_running(stream));

    /* hold -> purge */
    h2_ihash_remove(m->shold, stream->id);
    *(h2_stream **)apr_array_push(m->spurge) = stream;
}
123
/**
 * Remove a stream in state H2_SS_CLEANUP from active processing.
 *
 * Steps:
 *  - unsubscribe from the events of the stream's output/input beams,
 *  - call h2_stream_cleanup() and drop the stream from m->streams and
 *    the scheduling queue m->q,
 *  - then, depending on the state of its secondary connection (c2):
 *      * no c2 context (never started): queue in m->spurge,
 *      * c2 not running (anymore):      queue in m->spurge,
 *      * c2 still running: mark c2 aborted, abort both beams and park
 *        the stream in m->shold.
 *
 * The callers visible in this file invoke this while holding m->lock.
 */
static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
    h2_conn_ctx_t *c2_ctx = stream->c2? h2_conn_ctx_get(stream->c2) : NULL;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                  H2_STRM_MSG(stream, "cleanup, unsubscribing from beam events"));
    if (stream->output) {
        h2_beam_on_was_empty(stream->output, NULL, NULL);
    }
    if (stream->input) {
        h2_beam_on_received(stream->input, NULL, NULL);
        h2_beam_on_consumed(stream->input, NULL, NULL);
    }

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                  H2_STRM_MSG(stream, "cleanup, removing from registries"));
    ap_assert(stream->state == H2_SS_CLEANUP);
    h2_stream_cleanup(stream);
    h2_ihash_remove(m->streams, stream->id);
    h2_iq_remove(m->q, stream->id);

    if (c2_ctx) {
        if (!stream_is_running(stream)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                          H2_STRM_MSG(stream, "cleanup, c2 is done, move to spurge"));
            /* processing has finished */
            APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
        }
        else {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                          H2_STRM_MSG(stream, "cleanup, c2 is running, abort"));
            /* c2 is still running */
            stream->c2->aborted = 1;
            if (stream->input) {
                h2_beam_abort(stream->input, m->c1);
            }
            if (stream->output) {
                h2_beam_abort(stream->output, m->c1);
            }
            /* The message used to claim "c2 is done" here, although this
             * branch is only taken while c2 is still running. */
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                          H2_STRM_MSG(stream, "cleanup, c2 still running, move to shold"));
            h2_ihash_add(m->shold, stream);
        }
    }
    else {
        /* never started */
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                      H2_STRM_MSG(stream, "cleanup, never started, move to spurge"));
        APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
    }
}
175
176 /**
177 * A h2_mplx needs to be thread-safe *and* if will be called by
178 * the h2_session thread *and* the h2_worker threads. Therefore:
179 * - calls are protected by a mutex lock, m->lock
180 * - the pool needs its own allocator, since apr_allocator_t are
181 * not re-entrant. The separate allocator works without a
182 * separate lock since we already protect h2_mplx itself.
183 * Since HTTP/2 connections can be expected to live longer than
184 * their HTTP/1 cousins, the separate allocator seems to work better
185 * than protecting a shared h2_session one with an own lock.
186 */
h2_mplx_c1_create(h2_stream * stream0,server_rec * s,apr_pool_t * parent,h2_workers * workers)187 h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent,
188 h2_workers *workers)
189 {
190 h2_conn_ctx_t *conn_ctx;
191 apr_status_t status = APR_SUCCESS;
192 apr_allocator_t *allocator;
193 apr_thread_mutex_t *mutex = NULL;
194 h2_mplx *m = NULL;
195
196 m = apr_pcalloc(parent, sizeof(h2_mplx));
197 m->stream0 = stream0;
198 m->c1 = stream0->c2;
199 m->s = s;
200 m->id = m->c1->id;
201
202 /* We create a pool with its own allocator to be used for
203 * processing secondary connections. This is the only way to have the
204 * processing independent of its parent pool in the sense that it
205 * can work in another thread. Also, the new allocator needs its own
206 * mutex to synchronize sub-pools.
207 */
208 status = apr_allocator_create(&allocator);
209 if (status != APR_SUCCESS) {
210 allocator = NULL;
211 goto failure;
212 }
213
214 apr_allocator_max_free_set(allocator, ap_max_mem_free);
215 apr_pool_create_ex(&m->pool, parent, NULL, allocator);
216 if (!m->pool) goto failure;
217
218 apr_pool_tag(m->pool, "h2_mplx");
219 apr_allocator_owner_set(allocator, m->pool);
220
221 status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT,
222 m->pool);
223 if (APR_SUCCESS != status) goto failure;
224 apr_allocator_mutex_set(allocator, mutex);
225
226 status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
227 m->pool);
228 if (APR_SUCCESS != status) goto failure;
229
230 status = apr_thread_cond_create(&m->join_wait, m->pool);
231 if (APR_SUCCESS != status) goto failure;
232
233 m->max_streams = h2_config_sgeti(s, H2_CONF_MAX_STREAMS);
234 m->stream_max_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM);
235
236 m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
237 m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
238 m->spurge = apr_array_make(m->pool, 10, sizeof(h2_stream*));
239 m->q = h2_iq_create(m->pool, m->max_streams);
240
241 m->workers = workers;
242 m->processing_max = workers->max_workers;
243 m->processing_limit = 6; /* the original h1 max parallel connections */
244 m->last_mood_change = apr_time_now();
245 m->mood_update_interval = apr_time_from_msec(100);
246
247 status = mplx_pollset_create(m);
248 if (APR_SUCCESS != status) {
249 ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c1, APLOGNO(10308)
250 "nghttp2: could not create pollset");
251 goto failure;
252 }
253 m->streams_to_poll = apr_array_make(m->pool, 10, sizeof(h2_stream*));
254 m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*));
255 m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*));
256
257 #if !H2_POLL_STREAMS
258 status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT,
259 m->pool);
260 if (APR_SUCCESS != status) goto failure;
261 m->streams_input_read = h2_iq_create(m->pool, 10);
262 m->streams_output_written = h2_iq_create(m->pool, 10);
263 #endif
264
265 conn_ctx = h2_conn_ctx_get(m->c1);
266 mplx_pollset_add(m, conn_ctx);
267
268 m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r));
269
270 return m;
271
272 failure:
273 if (m->pool) {
274 apr_pool_destroy(m->pool);
275 }
276 else if (allocator) {
277 apr_allocator_destroy(allocator);
278 }
279 return NULL;
280 }
281
/**
 * Stop scheduling: empties the queue m->q so that streams not yet started
 * will not start.
 * @return m->max_stream_id_started as read under the lock
 *
 * NOTE(review): on lock failure H2_MPLX_ENTER makes this function return
 * the apr_status_t of the lock call instead of a stream id.
 */
int h2_mplx_c1_shutdown(h2_mplx *m)
{
    int last_started;

    H2_MPLX_ENTER(m);

    last_started = m->max_stream_id_started;
    h2_iq_clear(m->q);

    H2_MPLX_LEAVE(m);
    return last_started;
}
295
296 typedef struct {
297 h2_mplx_stream_cb *cb;
298 void *ctx;
299 } stream_iter_ctx_t;
300
m_stream_iter_wrap(void * ctx,void * stream)301 static int m_stream_iter_wrap(void *ctx, void *stream)
302 {
303 stream_iter_ctx_t *x = ctx;
304 return x->cb(stream, x->ctx);
305 }
306
h2_mplx_c1_streams_do(h2_mplx * m,h2_mplx_stream_cb * cb,void * ctx)307 apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
308 {
309 stream_iter_ctx_t x;
310
311 H2_MPLX_ENTER(m);
312
313 x.cb = cb;
314 x.ctx = ctx;
315 h2_ihash_iter(m->streams, m_stream_iter_wrap, &x);
316
317 H2_MPLX_LEAVE(m);
318 return APR_SUCCESS;
319 }
320
m_report_stream_iter(void * ctx,void * val)321 static int m_report_stream_iter(void *ctx, void *val) {
322 h2_mplx *m = ctx;
323 h2_stream *stream = val;
324 h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2);
325 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1,
326 H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, out_buffer=%ld"),
327 !!stream->c2, stream->scheduled, h2_stream_is_ready(stream),
328 (long)(stream->output? h2_beam_get_buffered(stream->output) : -1));
329 if (conn_ctx) {
330 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
331 H2_STRM_MSG(stream, "->03198: %s %s %s"
332 "[started=%d/done=%d]"),
333 conn_ctx->request->method, conn_ctx->request->authority,
334 conn_ctx->request->path, conn_ctx->started_at != 0,
335 conn_ctx->done);
336 }
337 else {
338 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
339 H2_STRM_MSG(stream, "->03198: not started"));
340 }
341 return 1;
342 }
343
m_unexpected_stream_iter(void * ctx,void * val)344 static int m_unexpected_stream_iter(void *ctx, void *val) {
345 h2_mplx *m = ctx;
346 h2_stream *stream = val;
347 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, /* NO APLOGNO */
348 H2_STRM_MSG(stream, "unexpected, started=%d, scheduled=%d, ready=%d"),
349 !!stream->c2, stream->scheduled, h2_stream_is_ready(stream));
350 return 1;
351 }
352
m_stream_cancel_iter(void * ctx,void * val)353 static int m_stream_cancel_iter(void *ctx, void *val) {
354 h2_mplx *m = ctx;
355 h2_stream *stream = val;
356
357 /* disable input consumed reporting */
358 if (stream->input) {
359 h2_beam_abort(stream->input, m->c1);
360 }
361 /* take over event monitoring */
362 h2_stream_set_monitor(stream, NULL);
363 /* Reset, should transit to CLOSED state */
364 h2_stream_rst(stream, H2_ERR_NO_ERROR);
365 /* All connection data has been sent, simulate cleanup */
366 h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
367 m_stream_cleanup(m, stream);
368 return 0;
369 }
370
/**
 * Shut down the multiplexer of an h2 connection:
 *  0. mark it aborted and unregister from the workers,
 *  1. cancel all streams still active,
 *  2. verify that nothing is scheduled or active anymore,
 *  3. wait (on m->join_wait) until no stream is left in m->shold,
 *     i.e. workers are done with all streams of this connection,
 *  4. verify the end state and restore c1's aborted flag.
 *
 * While waiting, a DEBUG message is logged for every timeout of
 * wait_secs seconds; it reports the time actually elapsed since the
 * wait started.
 */
void h2_mplx_c1_destroy(h2_mplx *m)
{
    apr_status_t status;
    apr_time_t wait_start;
    int wait_secs = 60, old_aborted;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                  "h2_mplx(%ld): start release", m->id);
    /* How to shut down a h2 connection:
     * 0. abort and tell the workers that no more work will come from us */
    m->aborted = 1;
    h2_workers_unregister(m->workers, m);

    H2_MPLX_ENTER_ALWAYS(m);

    /* While really terminating any c2 connections, treat the master
     * connection as aborted. It's not as if we could send any more data
     * at this point. */
    old_aborted = m->c1->aborted;
    m->c1->aborted = 1;

    /* How to shut down a h2 connection:
     * 1. cancel all streams still active */
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
                  "h2_mplx(%ld): release, %d/%d/%d streams (total/hold/purge), %d streams",
                  m->id, (int)h2_ihash_count(m->streams),
                  (int)h2_ihash_count(m->shold), m->spurge->nelts, m->processing_count);
    while (!h2_ihash_iter(m->streams, m_stream_cancel_iter, m)) {
        /* until empty */
    }

    /* 2. no more streams should be scheduled or in the active set */
    ap_assert(h2_ihash_empty(m->streams));
    ap_assert(h2_iq_empty(m->q));

    /* 3. while workers are busy on this connection, meaning they
     *    are processing streams from this connection, wait on them finishing
     *    in order to wake us and let us check again.
     *    Eventually, this has to succeed. */
    wait_start = apr_time_now();
    while (h2_ihash_count(m->shold) > 0) {
        status = apr_thread_cond_timedwait(m->join_wait, m->lock, apr_time_from_sec(wait_secs));

        if (APR_STATUS_IS_TIMEUP(status)) {
            /* This can happen if we have very long running requests
             * that do not time out on IO.
             * Report the measured wait time: counting loop iterations
             * is wrong, as the loop also turns on every signalled wakeup
             * and the first timeout would be reported as 0 seconds. */
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, APLOGNO(03198)
                          "h2_mplx(%ld): waited %d sec for %d streams",
                          m->id, (int)apr_time_sec(apr_time_now() - wait_start),
                          (int)h2_ihash_count(m->shold));
            h2_ihash_iter(m->shold, m_report_stream_iter, m);
        }
    }
    /* h2_mplx_worker_c2_done() only signals while join_wait is set */
    m->join_wait = NULL;

    /* 4. With all workers done, all streams should be in spurge */
    ap_assert(m->processing_count == 0);
    if (!h2_ihash_empty(m->shold)) {
        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, APLOGNO(03516)
                      "h2_mplx(%ld): unexpected %d streams in hold",
                      m->id, (int)h2_ihash_count(m->shold));
        h2_ihash_iter(m->shold, m_unexpected_stream_iter, m);
    }

    m->c1->aborted = old_aborted;
    H2_MPLX_LEAVE(m);

    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, "h2_mplx(%ld): released", m->id);
}
437
h2_mplx_c1_stream_cleanup(h2_mplx * m,h2_stream * stream,int * pstream_count)438 apr_status_t h2_mplx_c1_stream_cleanup(h2_mplx *m, h2_stream *stream,
439 int *pstream_count)
440 {
441 H2_MPLX_ENTER(m);
442
443 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
444 H2_STRM_MSG(stream, "cleanup"));
445 m_stream_cleanup(m, stream);
446 *pstream_count = (int)h2_ihash_count(m->streams);
447 H2_MPLX_LEAVE(m);
448 return APR_SUCCESS;
449 }
450
h2_mplx_c2_stream_get(h2_mplx * m,int stream_id)451 const h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id)
452 {
453 h2_stream *s = NULL;
454
455 H2_MPLX_ENTER_ALWAYS(m);
456 s = h2_ihash_get(m->streams, stream_id);
457 H2_MPLX_LEAVE(m);
458
459 return s;
460 }
461
462
/* Report a stream's output octets via ap_increment_counts() on c1's
 * scoreboard handle, using m->scratch_r as a throw-away request record.
 * Does nothing for streams without a c2 connection. */
static void c1_update_scoreboard(h2_mplx *m, h2_stream *stream)
{
    if (!stream->c2) {
        return;
    }

    m->scratch_r->connection = stream->c2;
    m->scratch_r->bytes_sent = stream->out_frame_octets;
    ap_increment_counts(m->c1->sbh, m->scratch_r);
    /* do not keep a reference to c2 in the scratch record */
    m->scratch_r->connection = NULL;
}
472
/* Destroy all streams queued in m->spurge, then empty the array.
 * Per stream: update the scoreboard, destroy the input beam, remove the
 * c2 context from the pollset, destroy c2 and finally the stream. */
static void c1_purge_streams(h2_mplx *m)
{
    int idx;

    for (idx = 0; idx < m->spurge->nelts; ++idx) {
        h2_stream *stream = APR_ARRAY_IDX(m->spurge, idx, h2_stream*);

        ap_assert(stream->state == H2_SS_CLEANUP);

        c1_update_scoreboard(m, stream);

        if (stream->input) {
            h2_beam_destroy(stream->input, m->c1);
            stream->input = NULL;
        }

        if (stream->c2) {
            conn_rec *c2 = stream->c2;
            h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);
            apr_status_t rv;

            /* detach c2 from the stream before tearing it down */
            stream->c2 = NULL;
            ap_assert(c2_ctx);

            rv = mplx_pollset_remove(m, c2_ctx);
            if (rv != APR_SUCCESS) {
                ap_log_cerror(APLOG_MARK, APLOG_INFO, rv, m->c1,
                              "h2_mplx(%ld-%d): pollset_remove %d on purge",
                              m->id, stream->id, c2_ctx->stream_id);
            }

            h2_conn_ctx_destroy(c2);
            h2_c2_destroy(c2);
        }

        h2_stream_destroy(stream);
    }

    apr_array_clear(m->spurge);
}
509
h2_mplx_c1_poll(h2_mplx * m,apr_interval_time_t timeout,stream_ev_callback * on_stream_input,stream_ev_callback * on_stream_output,void * on_ctx)510 apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout,
511 stream_ev_callback *on_stream_input,
512 stream_ev_callback *on_stream_output,
513 void *on_ctx)
514 {
515 apr_status_t rv;
516
517 H2_MPLX_ENTER(m);
518
519 if (m->aborted) {
520 rv = APR_ECONNABORTED;
521 goto cleanup;
522 }
523 /* Purge (destroy) streams outside of pollset processing.
524 * Streams that are registered in the pollset, will be removed
525 * when they are destroyed, but the pollset works on copies
526 * of these registrations. So, if we destroy streams while
527 * processing pollset events, we might access freed memory.
528 */
529 if (m->spurge->nelts) {
530 c1_purge_streams(m);
531 }
532 rv = mplx_pollset_poll(m, timeout, on_stream_input, on_stream_output, on_ctx);
533
534 cleanup:
535 H2_MPLX_LEAVE(m);
536 return rv;
537 }
538
h2_mplx_c1_reprioritize(h2_mplx * m,h2_stream_pri_cmp_fn * cmp,h2_session * session)539 apr_status_t h2_mplx_c1_reprioritize(h2_mplx *m, h2_stream_pri_cmp_fn *cmp,
540 h2_session *session)
541 {
542 apr_status_t status;
543
544 H2_MPLX_ENTER(m);
545
546 if (m->aborted) {
547 status = APR_ECONNABORTED;
548 }
549 else {
550 h2_iq_sort(m->q, cmp, session);
551 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
552 "h2_mplx(%ld): reprioritize streams", m->id);
553 status = APR_SUCCESS;
554 }
555
556 H2_MPLX_LEAVE(m);
557 return status;
558 }
559
/* Register the mplx at its workers when it is not aborted, not already
 * registered and has queued streams. A failed registration is only logged
 * when from_master is set. */
static void ms_register_if_needed(h2_mplx *m, int from_master)
{
    apr_status_t rv;

    if (m->aborted || m->is_registered || h2_iq_empty(m->q)) {
        return;
    }

    rv = h2_workers_register(m->workers, m);
    if (rv == APR_SUCCESS) {
        m->is_registered = 1;
        return;
    }
    if (from_master) {
        ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10021)
                      "h2_mplx(%ld): register at workers", m->id);
    }
}
573
c1_process_stream(h2_mplx * m,h2_stream * stream,h2_stream_pri_cmp_fn * cmp,h2_session * session)574 static apr_status_t c1_process_stream(h2_mplx *m,
575 h2_stream *stream,
576 h2_stream_pri_cmp_fn *cmp,
577 h2_session *session)
578 {
579 apr_status_t rv;
580
581 if (m->aborted) {
582 rv = APR_ECONNABORTED;
583 goto cleanup;
584 }
585 if (!stream->request) {
586 rv = APR_EINVAL;
587 goto cleanup;
588 }
589 if (APLOGctrace1(m->c1)) {
590 const h2_request *r = stream->request;
591 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
592 H2_STRM_MSG(stream, "process %s %s://%s%s chunked=%d"),
593 r->method, r->scheme, r->authority, r->path, r->chunked);
594 }
595
596 rv = h2_stream_setup_input(stream);
597 if (APR_SUCCESS != rv) goto cleanup;
598
599 stream->scheduled = 1;
600 h2_ihash_add(m->streams, stream);
601 if (h2_stream_is_ready(stream)) {
602 /* already have a response */
603 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
604 H2_STRM_MSG(stream, "process, ready already"));
605 }
606 else {
607 h2_iq_add(m->q, stream->id, cmp, session);
608 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
609 H2_STRM_MSG(stream, "process, added to q"));
610 }
611
612 cleanup:
613 return rv;
614 }
615
h2_mplx_c1_process(h2_mplx * m,h2_iqueue * ready_to_process,h2_stream_get_fn * get_stream,h2_stream_pri_cmp_fn * stream_pri_cmp,h2_session * session,int * pstream_count)616 apr_status_t h2_mplx_c1_process(h2_mplx *m,
617 h2_iqueue *ready_to_process,
618 h2_stream_get_fn *get_stream,
619 h2_stream_pri_cmp_fn *stream_pri_cmp,
620 h2_session *session,
621 int *pstream_count)
622 {
623 apr_status_t rv = APR_SUCCESS;
624 int sid;
625
626 H2_MPLX_ENTER(m);
627
628 while ((sid = h2_iq_shift(ready_to_process)) > 0) {
629 h2_stream *stream = get_stream(session, sid);
630 if (stream) {
631 ap_assert(!stream->scheduled);
632 rv = c1_process_stream(session->mplx, stream, stream_pri_cmp, session);
633 if (APR_SUCCESS != rv) {
634 h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
635 }
636 }
637 else {
638 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
639 "h2_stream(%ld-%d): not found to process", m->id, sid);
640 }
641 }
642 ms_register_if_needed(m, 1);
643 *pstream_count = (int)h2_ihash_count(m->streams);
644 #if APR_POOL_DEBUG
645 do {
646 apr_size_t mem_g, mem_m, mem_s, mem_w, mem_c1;
647
648 mem_g = pchild? apr_pool_num_bytes(pchild, 1) : 0;
649 mem_m = apr_pool_num_bytes(m->pool, 1);
650 mem_s = apr_pool_num_bytes(session->pool, 1);
651 mem_w = apr_pool_num_bytes(m->workers->pool, 1);
652 mem_c1 = apr_pool_num_bytes(m->c1->pool, 1);
653 ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c1,
654 "h2_mplx(%ld): child mem=%ld, mplx mem=%ld, session mem=%ld, workers=%ld, c1=%ld",
655 m->id, (long)mem_g, (long)mem_m, (long)mem_s, (long)mem_w, (long)mem_c1);
656
657 } while (0);
658 #endif
659
660 H2_MPLX_LEAVE(m);
661 return rv;
662 }
663
h2_mplx_c1_fwd_input(h2_mplx * m,struct h2_iqueue * input_pending,h2_stream_get_fn * get_stream,struct h2_session * session)664 apr_status_t h2_mplx_c1_fwd_input(h2_mplx *m, struct h2_iqueue *input_pending,
665 h2_stream_get_fn *get_stream,
666 struct h2_session *session)
667 {
668 int sid;
669
670 H2_MPLX_ENTER(m);
671
672 while ((sid = h2_iq_shift(input_pending)) > 0) {
673 h2_stream *stream = get_stream(session, sid);
674 if (stream) {
675 H2_MPLX_LEAVE(m);
676 h2_stream_flush_input(stream);
677 H2_MPLX_ENTER(m);
678 }
679 }
680
681 H2_MPLX_LEAVE(m);
682 return APR_SUCCESS;
683 }
684
/* Input beam "was empty" callback (ctx is the c2 connection): write one
 * byte to the pipe_in_prod pipe, if the connection has a stream id and
 * that pipe exists. */
static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam)
{
    conn_rec *c2 = ctx;
    h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);

    (void)beam;
    if (!c2_ctx || !c2_ctx->stream_id) {
        return;
    }
    if (c2_ctx->pipe_in_prod[H2_PIPE_IN]) {
        apr_file_putc(1, c2_ctx->pipe_in_prod[H2_PIPE_IN]);
    }
}
695
/* Input beam "received" callback (ctx is the c2 connection): write one
 * byte to the pipe_in_drain pipe when it exists. Without that pipe and
 * with H2_POLL_STREAMS disabled, the stream id is appended to the mplx'
 * streams_input_read queue and the pollset is woken up, both under
 * poll_lock. */
static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
{
    conn_rec *c2 = ctx;
    h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);

    (void)beam;
    if (!c2_ctx || !c2_ctx->stream_id) {
        return;
    }

    if (c2_ctx->pipe_in_drain[H2_PIPE_IN]) {
        apr_file_putc(1, c2_ctx->pipe_in_drain[H2_PIPE_IN]);
        return;
    }
#if !H2_POLL_STREAMS
    {
        h2_mplx *m = c2_ctx->mplx;

        apr_thread_mutex_lock(m->poll_lock);
        h2_iq_append(m->streams_input_read, c2_ctx->stream_id);
        apr_pollset_wakeup(m->pollset);
        apr_thread_mutex_unlock(m->poll_lock);
    }
#endif
}
715
/* Output beam "was empty" callback (ctx is the c2 connection), also called
 * directly from s_c2_done(): write one byte to the pipe_out_prod pipe when
 * it exists. Without that pipe and with H2_POLL_STREAMS disabled, the
 * stream id is appended to the mplx' streams_output_written queue and the
 * pollset is woken up, both under poll_lock. */
static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
{
    conn_rec *c2 = ctx;
    h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);

    (void)beam;
    if (!c2_ctx || !c2_ctx->stream_id) {
        return;
    }

    if (c2_ctx->pipe_out_prod[H2_PIPE_IN]) {
        apr_file_putc(1, c2_ctx->pipe_out_prod[H2_PIPE_IN]);
        return;
    }
#if !H2_POLL_STREAMS
    {
        h2_mplx *m = c2_ctx->mplx;

        apr_thread_mutex_lock(m->poll_lock);
        h2_iq_append(m->streams_output_written, c2_ctx->stream_id);
        apr_pollset_wakeup(m->pollset);
        apr_thread_mutex_unlock(m->poll_lock);
    }
#endif
}
735
c2_setup_io(h2_mplx * m,conn_rec * c2,h2_stream * stream)736 static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream)
737 {
738 h2_conn_ctx_t *conn_ctx;
739 apr_status_t rv = APR_SUCCESS;
740 const char *action = "init";
741
742 rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream);
743 if (APR_SUCCESS != rv) goto cleanup;
744
745 if (!conn_ctx->beam_out) {
746 action = "create output beam";
747 rv = h2_beam_create(&conn_ctx->beam_out, c2, conn_ctx->req_pool,
748 stream->id, "output", 0, c2->base_server->timeout);
749 if (APR_SUCCESS != rv) goto cleanup;
750
751 h2_beam_buffer_size_set(conn_ctx->beam_out, m->stream_max_mem);
752 h2_beam_on_was_empty(conn_ctx->beam_out, c2_beam_output_write_notify, c2);
753 }
754
755 if (stream->input) {
756 conn_ctx->beam_in = stream->input;
757 h2_beam_on_was_empty(stream->input, c2_beam_input_write_notify, c2);
758 h2_beam_on_received(stream->input, c2_beam_input_read_notify, c2);
759 h2_beam_on_consumed(stream->input, c1_input_consumed, stream);
760 }
761 else {
762 memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain));
763 }
764
765 #if H2_POLL_STREAMS
766 if (!conn_ctx->mplx_pool) {
767 apr_pool_create(&conn_ctx->mplx_pool, m->pool);
768 apr_pool_tag(conn_ctx->mplx_pool, "H2_MPLX_C2");
769 }
770
771 if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) {
772 action = "create output pipe";
773 rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT],
774 &conn_ctx->pipe_out_prod[H2_PIPE_IN],
775 APR_FULL_NONBLOCK,
776 conn_ctx->mplx_pool, c2->pool);
777 if (APR_SUCCESS != rv) goto cleanup;
778 }
779 conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE;
780 conn_ctx->pfd_out_prod.desc.f = conn_ctx->pipe_out_prod[H2_PIPE_OUT];
781 conn_ctx->pfd_out_prod.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
782 conn_ctx->pfd_out_prod.client_data = conn_ctx;
783
784 if (stream->input) {
785 if (!conn_ctx->pipe_in_prod[H2_PIPE_OUT]) {
786 action = "create input write pipe";
787 rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT],
788 &conn_ctx->pipe_in_prod[H2_PIPE_IN],
789 APR_READ_BLOCK,
790 c2->pool, conn_ctx->mplx_pool);
791 if (APR_SUCCESS != rv) goto cleanup;
792 }
793 if (!conn_ctx->pipe_in_drain[H2_PIPE_OUT]) {
794 action = "create input read pipe";
795 rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_drain[H2_PIPE_OUT],
796 &conn_ctx->pipe_in_drain[H2_PIPE_IN],
797 APR_FULL_NONBLOCK,
798 c2->pool, conn_ctx->mplx_pool);
799 if (APR_SUCCESS != rv) goto cleanup;
800 }
801 conn_ctx->pfd_in_drain.desc_type = APR_POLL_FILE;
802 conn_ctx->pfd_in_drain.desc.f = conn_ctx->pipe_in_drain[H2_PIPE_OUT];
803 conn_ctx->pfd_in_drain.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
804 conn_ctx->pfd_in_drain.client_data = conn_ctx;
805 }
806 #else
807 memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod));
808 memset(&conn_ctx->pipe_in_prod, 0, sizeof(conn_ctx->pipe_in_prod));
809 memset(&conn_ctx->pipe_in_drain, 0, sizeof(conn_ctx->pipe_in_drain));
810 #endif
811
812 cleanup:
813 stream->output = (APR_SUCCESS == rv)? conn_ctx->beam_out : NULL;
814 if (APR_SUCCESS != rv) {
815 ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c2,
816 H2_STRM_LOG(APLOGNO(10309), stream,
817 "error %s"), action);
818 }
819 return rv;
820 }
821
s_next_c2(h2_mplx * m)822 static conn_rec *s_next_c2(h2_mplx *m)
823 {
824 h2_stream *stream = NULL;
825 apr_status_t rv;
826 int sid;
827 conn_rec *c2;
828
829 while (!m->aborted && !stream && (m->processing_count < m->processing_limit)
830 && (sid = h2_iq_shift(m->q)) > 0) {
831 stream = h2_ihash_get(m->streams, sid);
832 }
833
834 if (!stream) {
835 if (m->processing_count >= m->processing_limit && !h2_iq_empty(m->q)) {
836 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1,
837 "h2_session(%ld): delaying request processing. "
838 "Current limit is %d and %d workers are in use.",
839 m->id, m->processing_limit, m->processing_count);
840 }
841 return NULL;
842 }
843
844 if (sid > m->max_stream_id_started) {
845 m->max_stream_id_started = sid;
846 }
847
848 c2 = h2_c2_create(m->c1, m->pool);
849 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c1,
850 H2_STRM_MSG(stream, "created new c2"));
851
852 rv = c2_setup_io(m, c2, stream);
853 if (APR_SUCCESS != rv) {
854 return NULL;
855 }
856
857 stream->c2 = c2;
858 ++m->processing_count;
859 APR_ARRAY_PUSH(m->streams_to_poll, h2_stream *) = stream;
860 apr_pollset_wakeup(m->pollset);
861
862 return c2;
863 }
864
h2_mplx_worker_pop_c2(h2_mplx * m,conn_rec ** out_c)865 apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c)
866 {
867 apr_status_t rv = APR_EOF;
868
869 *out_c = NULL;
870 ap_assert(m);
871 ap_assert(m->lock);
872
873 if (APR_SUCCESS != (rv = apr_thread_mutex_lock(m->lock))) {
874 return rv;
875 }
876
877 if (m->aborted) {
878 rv = APR_EOF;
879 }
880 else {
881 *out_c = s_next_c2(m);
882 rv = (*out_c != NULL && !h2_iq_empty(m->q))? APR_EAGAIN : APR_SUCCESS;
883 }
884 if (APR_EAGAIN != rv) {
885 m->is_registered = 0; /* h2_workers will discard this mplx */
886 }
887 H2_MPLX_LEAVE(m);
888 return rv;
889 }
890
/* Bookkeeping when the processing of a c2 connection has ended:
 *  - mark the context done, record the time, hand c2 the scoreboard
 *    handle of c1,
 *  - without a final response: mark c2 aborted; otherwise possibly raise
 *    the processing limit via s_mplx_be_happy(),
 *  - then locate the stream by conn_ctx->stream_id:
 *      * still in m->streams: trigger an output notification,
 *      * in m->shold: the stream was waiting for this, join it,
 *      * anything else is unexpected and ends in a failed ap_assert,
 *        after logging whether the stream was found in m->spurge.
 * The caller visible in this file invokes this while holding m->lock. */
static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
{
    h2_stream *stream;

    ap_assert(conn_ctx);
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
                  "h2_mplx(%s-%d): c2 done", conn_ctx->id, conn_ctx->stream_id);

    ap_assert(conn_ctx->done == 0);
    conn_ctx->done = 1;
    conn_ctx->done_at = apr_time_now();
    ++c2->keepalives;
    /* From here on, the final handling of c2 is done by c1 processing.
     * Which means we can give it c1's scoreboard handle for updates. */
    c2->sbh = m->c1->sbh;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
                  "h2_mplx(%s-%d): request done, %f ms elapsed",
                  conn_ctx->id, conn_ctx->stream_id,
                  (conn_ctx->done_at - conn_ctx->started_at) / 1000.0);

    if (!conn_ctx->has_final_response) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, conn_ctx->last_err, c2,
                      "h2_c2(%s-%d): processing finished without final response",
                      conn_ctx->id, conn_ctx->stream_id);
        c2->aborted = 1;
    }
    else if (!c2->aborted && conn_ctx->started_at > m->last_mood_change) {
        s_mplx_be_happy(m, c2, conn_ctx);
    }

    stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
    if (stream) {
        /* stream not done yet. trigger a potential polling on the output
         * since nothing more will happening here. */
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
                      H2_STRM_MSG(stream, "c2_done, stream open"));
        c2_beam_output_write_notify(c2, NULL);
    }
    else if ((stream = h2_ihash_get(m->shold, conn_ctx->stream_id)) != NULL) {
        /* stream is done, was just waiting for this. */
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
                      H2_STRM_MSG(stream, "c2_done, in hold"));
        c1c2_stream_joined(m, stream);
    }
    else {
        int i;

        /* `stream` is NULL in this branch, so the purge array has to be
         * searched by stream id; comparing its entries against `stream`
         * could never match. */
        for (i = 0; i < m->spurge->nelts; ++i) {
            h2_stream *purged = APR_ARRAY_IDX(m->spurge, i, h2_stream*);

            if (purged && purged->id == conn_ctx->stream_id) {
                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2,
                              H2_STRM_LOG(APLOGNO(03517), purged, "already in spurge"));
                ap_assert("stream should not be in spurge" == NULL);
                return;
            }
        }

        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2, APLOGNO(03518)
                      "h2_mplx(%s-%d): c2_done, stream not found",
                      conn_ctx->id, conn_ctx->stream_id);
        ap_assert("stream should still be available" == NULL);
    }
}
954
/**
 * Called by a worker when it is done with a c2 connection. Under m->lock:
 * decrement the processing count, run s_c2_done(), signal a waiting
 * h2_mplx_c1_destroy() and re-register at the workers if needed.
 * @param c2     the finished connection; ignored when it has no context
 *               or no mplx
 * @param out_c2 if not NULL, receives the next connection to process
 *               (may be NULL)
 */
void h2_mplx_worker_c2_done(conn_rec *c2, conn_rec **out_c2)
{
    h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);
    h2_mplx *m;

    if (!c2_ctx) {
        return;
    }
    m = c2_ctx->mplx;
    if (!m) {
        return;
    }

    H2_MPLX_ENTER_ALWAYS(m);

    --m->processing_count;
    s_c2_done(m, c2, c2_ctx);

    /* join_wait is reset to NULL once h2_mplx_c1_destroy() stopped waiting */
    if (m->join_wait) {
        apr_thread_cond_signal(m->join_wait);
    }
    if (out_c2) {
        /* caller wants another connection to process */
        *out_c2 = s_next_c2(m);
    }
    ms_register_if_needed(m, 0);

    H2_MPLX_LEAVE(m);
}
979
980 /*******************************************************************************
981 * h2_mplx DoS protection
982 ******************************************************************************/
983
s_mplx_be_happy(h2_mplx * m,conn_rec * c,h2_conn_ctx_t * conn_ctx)984 static apr_status_t s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx)
985 {
986 apr_time_t now;
987
988 --m->irritations_since;
989 now = apr_time_now();
990 if (m->processing_limit < m->processing_max
991 && (now - m->last_mood_change >= m->mood_update_interval
992 || m->irritations_since < -m->processing_limit)) {
993 m->processing_limit = H2MIN(m->processing_limit * 2, m->processing_max);
994 m->last_mood_change = now;
995 m->irritations_since = 0;
996 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
997 "h2_mplx(%ld): mood update, increasing worker limit to %d",
998 m->id, m->processing_limit);
999 }
1000 return APR_SUCCESS;
1001 }
1002
m_be_annoyed(h2_mplx * m)1003 static apr_status_t m_be_annoyed(h2_mplx *m)
1004 {
1005 apr_status_t status = APR_SUCCESS;
1006 apr_time_t now;
1007
1008 ++m->irritations_since;
1009 now = apr_time_now();
1010 if (m->processing_limit > 2 &&
1011 ((now - m->last_mood_change >= m->mood_update_interval)
1012 || (m->irritations_since >= m->processing_limit))) {
1013
1014 if (m->processing_limit > 16) {
1015 m->processing_limit = 16;
1016 }
1017 else if (m->processing_limit > 8) {
1018 m->processing_limit = 8;
1019 }
1020 else if (m->processing_limit > 4) {
1021 m->processing_limit = 4;
1022 }
1023 else if (m->processing_limit > 2) {
1024 m->processing_limit = 2;
1025 }
1026 m->last_mood_change = now;
1027 m->irritations_since = 0;
1028 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
1029 "h2_mplx(%ld): mood update, decreasing worker limit to %d",
1030 m->id, m->processing_limit);
1031 }
1032 return status;
1033 }
1034
1035 /*******************************************************************************
1036 * mplx master events dispatching
1037 ******************************************************************************/
1038
/* Decide whether a client RST_STREAM for this stream is acceptable (1)
 * or should count against the connection's mood (0).
 *
 * A client may reset a stream at any time. That is annoying when
 * resources (e.g. worker threads) are already committed to it, so the
 * willingness to commit resources on this connection goes down. This is
 * a DoS protection: a client should not find it too easy to eat up
 * server resources.
 *
 * However, for some requests a RST_STREAM is the only way to end them,
 * e.g. websockets and server-side-event streams, whose responses would
 * otherwise continue forever. Hence a reset is tolerated once response
 * headers exist and body data has been sent.
 */
static int reset_is_acceptable(h2_stream *stream)
{
    if (stream_is_running(stream) && (stream->id & 0x01)) {
        /* A running stream with an odd id (even ids are initiated by us):
         * forgive only when there is a response and at least one DATA
         * frame of it went out. */
        return (stream->response && stream->out_data_frames) ? 1 : 0;
    }
    /* not running, or initiated by us: always acceptable */
    return 1;
}
1060
h2_mplx_c1_client_rst(h2_mplx * m,int stream_id)1061 apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id)
1062 {
1063 h2_stream *stream;
1064 apr_status_t status = APR_SUCCESS;
1065
1066 H2_MPLX_ENTER_ALWAYS(m);
1067 stream = h2_ihash_get(m->streams, stream_id);
1068 if (stream && !reset_is_acceptable(stream)) {
1069 status = m_be_annoyed(m);
1070 }
1071 H2_MPLX_LEAVE(m);
1072 return status;
1073 }
1074
mplx_pollset_create(h2_mplx * m)1075 static apr_status_t mplx_pollset_create(h2_mplx *m)
1076 {
1077 int max_pdfs;
1078
1079 /* stream0 output, pdf_out+pfd_in_consume per active streams */
1080 max_pdfs = 1 + 2 * H2MIN(m->processing_max, m->max_streams);
1081 return apr_pollset_create(&m->pollset, max_pdfs, m->pool,
1082 APR_POLLSET_WAKEABLE);
1083 }
1084
mplx_pollset_add(h2_mplx * m,h2_conn_ctx_t * conn_ctx)1085 static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
1086 {
1087 apr_status_t rv = APR_SUCCESS;
1088 const char *name = "";
1089
1090 if (conn_ctx->pfd_out_prod.reqevents) {
1091 name = "adding out";
1092 rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod);
1093 if (APR_SUCCESS != rv) goto cleanup;
1094 }
1095
1096 if (conn_ctx->pfd_in_drain.reqevents) {
1097 name = "adding in_read";
1098 rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_in_drain);
1099 }
1100
1101 cleanup:
1102 if (APR_SUCCESS != rv) {
1103 ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1,
1104 "h2_mplx(%ld-%d): error while adding to pollset %s",
1105 m->id, conn_ctx->stream_id, name);
1106 }
1107 return rv;
1108 }
1109
mplx_pollset_remove(h2_mplx * m,h2_conn_ctx_t * conn_ctx)1110 static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
1111 {
1112 apr_status_t rv = APR_SUCCESS;
1113 const char *name = "";
1114
1115 if (conn_ctx->pfd_out_prod.reqevents) {
1116 rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_out_prod);
1117 conn_ctx->pfd_out_prod.reqevents = 0;
1118 if (APR_SUCCESS != rv) goto cleanup;
1119 }
1120
1121 if (conn_ctx->pfd_in_drain.reqevents) {
1122 name = "in_read";
1123 rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_in_drain);
1124 conn_ctx->pfd_in_drain.reqevents = 0;
1125 if (APR_SUCCESS != rv) goto cleanup;
1126 }
1127
1128 cleanup:
1129 if (APR_SUCCESS != rv) {
1130 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, m->c1,
1131 "h2_mplx(%ld-%d): error removing from pollset %s",
1132 m->id, conn_ctx->stream_id, name);
1133 }
1134 return rv;
1135 }
1136
mplx_pollset_poll(h2_mplx * m,apr_interval_time_t timeout,stream_ev_callback * on_stream_input,stream_ev_callback * on_stream_output,void * on_ctx)1137 static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
1138 stream_ev_callback *on_stream_input,
1139 stream_ev_callback *on_stream_output,
1140 void *on_ctx)
1141 {
1142 apr_status_t rv;
1143 const apr_pollfd_t *results, *pfd;
1144 apr_int32_t nresults, i;
1145 h2_conn_ctx_t *conn_ctx;
1146 h2_stream *stream;
1147
1148 /* Make sure we are not called recursively. */
1149 ap_assert(!m->polling);
1150 m->polling = 1;
1151 do {
1152 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
1153 "h2_mplx(%ld): enter polling timeout=%d",
1154 m->id, (int)apr_time_sec(timeout));
1155
1156 apr_array_clear(m->streams_ev_in);
1157 apr_array_clear(m->streams_ev_out);
1158
1159 do {
1160 /* add streams we started processing in the meantime */
1161 if (m->streams_to_poll->nelts) {
1162 for (i = 0; i < m->streams_to_poll->nelts; ++i) {
1163 stream = APR_ARRAY_IDX(m->streams_to_poll, i, h2_stream*);
1164 if (stream && stream->c2 && (conn_ctx = h2_conn_ctx_get(stream->c2))) {
1165 mplx_pollset_add(m, conn_ctx);
1166 }
1167 }
1168 apr_array_clear(m->streams_to_poll);
1169 }
1170
1171 #if !H2_POLL_STREAMS
1172 apr_thread_mutex_lock(m->poll_lock);
1173 if (!h2_iq_empty(m->streams_input_read)
1174 || !h2_iq_empty(m->streams_output_written)) {
1175 while ((i = h2_iq_shift(m->streams_input_read))) {
1176 stream = h2_ihash_get(m->streams, i);
1177 if (stream) {
1178 APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
1179 }
1180 }
1181 while ((i = h2_iq_shift(m->streams_output_written))) {
1182 stream = h2_ihash_get(m->streams, i);
1183 if (stream) {
1184 APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
1185 }
1186 }
1187 nresults = 0;
1188 rv = APR_SUCCESS;
1189 apr_thread_mutex_unlock(m->poll_lock);
1190 break;
1191 }
1192 apr_thread_mutex_unlock(m->poll_lock);
1193 #endif
1194 H2_MPLX_LEAVE(m);
1195 rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results);
1196 H2_MPLX_ENTER_ALWAYS(m);
1197
1198 } while (APR_STATUS_IS_EINTR(rv));
1199
1200 if (APR_SUCCESS != rv) {
1201 if (APR_STATUS_IS_TIMEUP(rv)) {
1202 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
1203 "h2_mplx(%ld): polling timed out ",
1204 m->id);
1205 }
1206 else {
1207 ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10310)
1208 "h2_mplx(%ld): polling failed", m->id);
1209 }
1210 goto cleanup;
1211 }
1212
1213 for (i = 0; i < nresults; i++) {
1214 pfd = &results[i];
1215 conn_ctx = pfd->client_data;
1216
1217 ap_assert(conn_ctx);
1218 if (conn_ctx->stream_id == 0) {
1219 if (on_stream_input) {
1220 APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = m->stream0;
1221 }
1222 continue;
1223 }
1224
1225 h2_util_drain_pipe(pfd->desc.f);
1226 stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
1227 if (!stream) {
1228 stream = h2_ihash_get(m->shold, conn_ctx->stream_id);
1229 if (stream) {
1230 /* This is normal and means that stream processing on c1 has
1231 * already finished to CLEANUP and c2 is not done yet */
1232 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, m->c1,
1233 "h2_mplx(%ld-%d): stream already in hold for poll event %hx",
1234 m->id, conn_ctx->stream_id, pfd->rtnevents);
1235 }
1236 else {
1237 h2_stream *sp = NULL;
1238 int j;
1239
1240 for (j = 0; j < m->spurge->nelts; ++j) {
1241 sp = APR_ARRAY_IDX(m->spurge, j, h2_stream*);
1242 if (sp->id == conn_ctx->stream_id) {
1243 stream = sp;
1244 break;
1245 }
1246 }
1247
1248 if (stream) {
1249 /* This is normal and means that stream processing on c1 has
1250 * already finished to CLEANUP and c2 is not done yet */
1251 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, m->c1, APLOGNO(10311)
1252 "h2_mplx(%ld-%d): stream already in purge for poll event %hx",
1253 m->id, conn_ctx->stream_id, pfd->rtnevents);
1254 }
1255 else {
1256 /* This should not happen. When a stream has been purged,
1257 * it MUST no longer appear in the pollset. Puring is done
1258 * outside the poll result processing. */
1259 ap_log_cerror(APLOG_MARK, APLOG_WARNING, rv, m->c1, APLOGNO(10312)
1260 "h2_mplx(%ld-%d): stream no longer known for poll event %hx"
1261 ", m->streams=%d, conn_ctx=%lx, fd=%lx",
1262 m->id, conn_ctx->stream_id, pfd->rtnevents,
1263 (int)h2_ihash_count(m->streams),
1264 (long)conn_ctx, (long)pfd->desc.f);
1265 h2_ihash_iter(m->streams, m_report_stream_iter, m);
1266 }
1267 }
1268 continue;
1269 }
1270
1271 if (conn_ctx->pfd_out_prod.desc.f == pfd->desc.f) {
1272 /* output is available */
1273 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
1274 "[%s-%d] poll output event %hx",
1275 conn_ctx->id, conn_ctx->stream_id,
1276 pfd->rtnevents);
1277 APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
1278 }
1279 else if (conn_ctx->pfd_in_drain.desc.f == pfd->desc.f) {
1280 /* input has been consumed */
1281 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
1282 "[%s-%d] poll input event %hx",
1283 conn_ctx->id, conn_ctx->stream_id,
1284 pfd->rtnevents);
1285 APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
1286 }
1287 }
1288
1289 if (on_stream_input && m->streams_ev_in->nelts) {
1290 H2_MPLX_LEAVE(m);
1291 for (i = 0; i < m->streams_ev_in->nelts; ++i) {
1292 on_stream_input(on_ctx, APR_ARRAY_IDX(m->streams_ev_in, i, h2_stream*));
1293 }
1294 H2_MPLX_ENTER_ALWAYS(m);
1295 }
1296 if (on_stream_output && m->streams_ev_out->nelts) {
1297 H2_MPLX_LEAVE(m);
1298 for (i = 0; i < m->streams_ev_out->nelts; ++i) {
1299 on_stream_output(on_ctx, APR_ARRAY_IDX(m->streams_ev_out, i, h2_stream*));
1300 }
1301 H2_MPLX_ENTER_ALWAYS(m);
1302 }
1303 break;
1304 } while(1);
1305
1306 cleanup:
1307 m->polling = 0;
1308 return rv;
1309 }
1310
1311