1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <apr_lib.h>
18 #include <apr_atomic.h>
19 #include <apr_strings.h>
20 #include <apr_time.h>
21 #include <apr_buckets.h>
22 #include <apr_thread_mutex.h>
23 #include <apr_thread_cond.h>
24
25 #include <httpd.h>
26 #include <http_protocol.h>
27 #include <http_log.h>
28
29 #include "h2_private.h"
30 #include "h2_util.h"
31 #include "h2_bucket_beam.h"
32
/* Forward declaration: invoked when the last receiver-side (beam) bucket
 * referencing a sender bucket is destroyed. */
static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy);

/* Ring-list helpers for the beam's list of h2_beam_proxy entries; thin
 * wrappers around APR_RING_* using the proxy's `link` member. */
#define H2_BPROXY_NEXT(e) APR_RING_NEXT((e), link)
#define H2_BPROXY_PREV(e) APR_RING_PREV((e), link)
#define H2_BPROXY_REMOVE(e) APR_RING_REMOVE((e), link)

#define H2_BPROXY_LIST_INIT(b) APR_RING_INIT(&(b)->list, h2_beam_proxy, link);
#define H2_BPROXY_LIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link)
#define H2_BPROXY_LIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link)
#define H2_BPROXY_LIST_FIRST(b) APR_RING_FIRST(&(b)->list)
#define H2_BPROXY_LIST_LAST(b) APR_RING_LAST(&(b)->list)
/* NOTE(review): named H2_PROXY_BLIST_* unlike its H2_BPROXY_* siblings and
 * not used anywhere in this file — confirm no external users before renaming. */
#define H2_PROXY_BLIST_INSERT_HEAD(b, e) do { \
        h2_beam_proxy *ap__b = (e); \
        APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link); \
    } while (0)
#define H2_BPROXY_LIST_INSERT_TAIL(b, e) do { \
        h2_beam_proxy *ap__b = (e); \
        APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link); \
    } while (0)
#define H2_BPROXY_LIST_CONCAT(a, b) do { \
        APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link); \
    } while (0)
#define H2_BPROXY_LIST_PREPEND(a, b) do { \
        APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link); \
    } while (0)
58
59
60 /*******************************************************************************
61 * beam bucket with reference to beam and bucket it represents
62 ******************************************************************************/
63
/* Tentative declaration; the bucket type is defined below, after its
 * callbacks. */
const apr_bucket_type_t h2_bucket_type_beam;

#define H2_BUCKET_IS_BEAM(e) (e->type == &h2_bucket_type_beam)

/* Receiver-side stand-in for a sender bucket. Shared (refcounted) bucket
 * data; linked into the owning beam's proxy ring. */
struct h2_beam_proxy {
    apr_bucket_refcount refcount;      /* shared-bucket refcount, must be first */
    APR_RING_ENTRY(h2_beam_proxy) link;
    h2_bucket_beam *beam;              /* owning beam; NULLed when beam goes away */
    apr_bucket *bsender;               /* sender bucket this proxies; may be NULL */
    apr_size_t n;                      /* sequence number assigned at creation */
};

/* Dummy byte returned by reads on a proxy whose sender bucket is gone. */
static const char Dummy = '\0';
77
beam_bucket_read(apr_bucket * b,const char ** str,apr_size_t * len,apr_read_type_e block)78 static apr_status_t beam_bucket_read(apr_bucket *b, const char **str,
79 apr_size_t *len, apr_read_type_e block)
80 {
81 h2_beam_proxy *d = b->data;
82 if (d->bsender) {
83 const char *data;
84 apr_status_t status = apr_bucket_read(d->bsender, &data, len, block);
85 if (status == APR_SUCCESS) {
86 *str = data + b->start;
87 *len = b->length;
88 }
89 return status;
90 }
91 *str = &Dummy;
92 *len = 0;
93 return APR_ECONNRESET;
94 }
95
beam_bucket_destroy(void * data)96 static void beam_bucket_destroy(void *data)
97 {
98 h2_beam_proxy *d = data;
99
100 if (apr_bucket_shared_destroy(d)) {
101 /* When the beam gets destroyed before this bucket, it will
102 * NULLify its reference here. This is not protected by a mutex,
103 * so it will not help with race conditions.
104 * But it lets us shut down memory pool with circulare beam
105 * references. */
106 if (d->beam) {
107 h2_beam_emitted(d->beam, d);
108 }
109 apr_bucket_free(d);
110 }
111 }
112
/* Turn bucket b into a beam proxy bucket for `bsender`, registering the
 * proxy in the beam's proxy list. `n` is the sequence number recorded on
 * the proxy. Returns the (re-made) bucket. */
static apr_bucket * h2_beam_bucket_make(apr_bucket *b,
                                        h2_bucket_beam *beam,
                                        apr_bucket *bsender, apr_size_t n)
{
    h2_beam_proxy *proxy;
    apr_size_t blen = bsender ? bsender->length : 0;

    proxy = apr_bucket_alloc(sizeof(*proxy), b->list);
    H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, proxy);
    proxy->beam = beam;
    proxy->bsender = bsender;
    proxy->n = n;

    b = apr_bucket_shared_make(b, proxy, 0, blen);
    b->type = &h2_bucket_type_beam;
    return b;
}
130
/* Allocate a fresh bucket from `list` and make it a beam proxy bucket
 * for `bsender` (see h2_beam_bucket_make). */
static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
                                         apr_bucket *bsender,
                                         apr_bucket_alloc_t *list,
                                         apr_size_t n)
{
    apr_bucket *bucket = apr_bucket_alloc(sizeof(*bucket), list);

    APR_BUCKET_INIT(bucket);
    bucket->free = apr_bucket_free;
    bucket->list = list;
    return h2_beam_bucket_make(bucket, beam, bsender, n);
}
143
/* Bucket type for receiver-side beam buckets: reads delegate to the sender
 * bucket, destroy notifies the beam; split/copy use the shared-bucket
 * helpers, setaside is a no-op. */
const apr_bucket_type_t h2_bucket_type_beam = {
    "BEAM", 5, APR_BUCKET_DATA,
    beam_bucket_destroy,
    beam_bucket_read,
    apr_bucket_setaside_noop,
    apr_bucket_shared_split,
    apr_bucket_shared_copy
};
152
153 /*******************************************************************************
154 * h2_blist, a brigade without allocations
155 ******************************************************************************/
156
/* Global registry of beamer callbacks, allocated from the hook pool on
 * first registration; NULLed again when that pool is destroyed. */
static apr_array_header_t *beamers;

/* Pool cleanup: drop the (pool-allocated) registry pointer. */
static apr_status_t cleanup_beamers(void *dummy)
{
    (void)dummy;
    beamers = NULL;
    return APR_SUCCESS;
}
165
/* Register a beamer callback that may transform buckets crossing a beam.
 * Lazily creates the registry in the global hook pool on first call.
 * NOTE(review): registration is not mutex-protected; presumably only
 * called during (single-threaded) startup — confirm with callers. */
void h2_register_bucket_beamer(h2_bucket_beamer *beamer)
{
    if (!beamers) {
        apr_pool_cleanup_register(apr_hook_global_pool, NULL,
                                  cleanup_beamers, apr_pool_cleanup_null);
        beamers = apr_array_make(apr_hook_global_pool, 10,
                                 sizeof(h2_bucket_beamer*));
    }
    APR_ARRAY_PUSH(beamers, h2_bucket_beamer*) = beamer;
}
176
/* Offer `src` to each registered beamer in turn; the first one that
 * produces a replacement bucket wins. Returns NULL when no beamer
 * handled the bucket (or none are registered). */
static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam,
                                  apr_bucket_brigade *dest,
                                  const apr_bucket *src)
{
    apr_bucket *result = NULL;

    if (beamers) {
        int i;
        for (i = 0; i < beamers->nelts; ++i) {
            h2_bucket_beamer *fn = APR_ARRAY_IDX(beamers, i, h2_bucket_beamer*);
            result = fn(beam, dest, src);
            if (result) {
                break;
            }
        }
    }
    return result;
}
193
194
195 /*******************************************************************************
196 * bucket beam that can transport buckets across threads
197 ******************************************************************************/
198
/* Unlock helper matching the h2_beam_lock `leave` signature. */
static void mutex_leave(apr_thread_mutex_t *mutex)
{
    apr_thread_mutex_unlock(mutex);
}
203
mutex_enter(void * ctx,h2_beam_lock * pbl)204 static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
205 {
206 h2_bucket_beam *beam = ctx;
207 pbl->mutex = beam->lock;
208 pbl->leave = mutex_leave;
209 return apr_thread_mutex_lock(pbl->mutex);
210 }
211
/* Enter the beam's critical ("yellow") section; pbl receives what is
 * needed to leave it again via leave_yellow(). */
static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
    return mutex_enter(beam, pbl);
}
216
/* Leave the beam's critical section previously entered via enter_yellow().
 * Safe to call when no leave function was set. */
static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
    (void)beam;
    if (!pbl->leave) {
        return;
    }
    pbl->leave(pbl->mutex);
}
224
bucket_mem_used(apr_bucket * b)225 static apr_off_t bucket_mem_used(apr_bucket *b)
226 {
227 if (APR_BUCKET_IS_FILE(b)) {
228 return 0;
229 }
230 else {
231 /* should all have determinate length */
232 return (apr_off_t)b->length;
233 }
234 }
235
/* Report newly consumed (received) bytes to the consumer io callback, if
 * one is set. The callback is invoked with the beam lock released (when
 * pbl is given) since it may call back into the beam. Returns 1 if the
 * callback was invoked, 0 otherwise. */
static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
    int rv = 0;
    apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
    h2_beam_io_callback *cb = beam->cons_io_cb;

    if (len > 0) {
        if (cb) {
            void *ctx = beam->cons_ctx;

            /* drop the lock around the callback to avoid re-entry deadlock */
            if (pbl) leave_yellow(beam, pbl);
            cb(ctx, beam, len);
            if (pbl) enter_yellow(beam, pbl);
            rv = 1;
        }
        /* mark bytes as reported even without a callback */
        beam->cons_bytes_reported += len;
    }
    return rv;
}
255
/* Report newly produced (sent) bytes to the producer io callback, if one
 * is set. With `force`, the callback fires even when no new bytes were
 * added. As in report_consumption(), the callback runs with the beam lock
 * released so it may safely call back into the beam.
 * Fix: guard the leave/enter calls on pbl being non-NULL, matching
 * report_consumption(); previously a NULL pbl would have been
 * dereferenced inside leave_yellow(). Current callers all pass a lock,
 * so behavior is unchanged for them. */
static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
{
    apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported;
    if (force || len > 0) {
        h2_beam_io_callback *cb = beam->prod_io_cb;
        if (cb) {
            void *ctx = beam->prod_ctx;

            if (pbl) leave_yellow(beam, pbl);
            cb(ctx, beam, len);
            if (pbl) enter_yellow(beam, pbl);
        }
        beam->prod_bytes_reported += len;
    }
}
271
calc_buffered(h2_bucket_beam * beam)272 static apr_size_t calc_buffered(h2_bucket_beam *beam)
273 {
274 apr_size_t len = 0;
275 apr_bucket *b;
276 for (b = H2_BLIST_FIRST(&beam->send_list);
277 b != H2_BLIST_SENTINEL(&beam->send_list);
278 b = APR_BUCKET_NEXT(b)) {
279 if (b->length == ((apr_size_t)-1)) {
280 /* do not count */
281 }
282 else if (APR_BUCKET_IS_FILE(b)) {
283 /* if unread, has no real mem footprint. */
284 }
285 else {
286 len += b->length;
287 }
288 }
289 return len;
290 }
291
/* Delete all sender buckets queued for purging. Must only run on the
 * sender thread, as the buckets belong to sender-side pools. */
static void r_purge_sent(h2_bucket_beam *beam)
{
    while (!H2_BLIST_EMPTY(&beam->purge_list)) {
        apr_bucket_delete(H2_BLIST_FIRST(&beam->purge_list));
    }
}
302
calc_space_left(h2_bucket_beam * beam)303 static apr_size_t calc_space_left(h2_bucket_beam *beam)
304 {
305 if (beam->max_buf_size > 0) {
306 apr_size_t len = calc_buffered(beam);
307 return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0);
308 }
309 return APR_SIZE_MAX;
310 }
311
/* True when neither the receiver buffer nor the send list holds data. */
static int buffer_is_empty(h2_bucket_beam *beam)
{
    if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
        return 0;
    }
    return H2_BLIST_EMPTY(&beam->send_list);
}
317
wait_empty(h2_bucket_beam * beam,apr_read_type_e block,apr_thread_mutex_t * lock)318 static apr_status_t wait_empty(h2_bucket_beam *beam, apr_read_type_e block,
319 apr_thread_mutex_t *lock)
320 {
321 apr_status_t rv = APR_SUCCESS;
322
323 while (!buffer_is_empty(beam) && APR_SUCCESS == rv) {
324 if (APR_BLOCK_READ != block || !lock) {
325 rv = APR_EAGAIN;
326 }
327 else if (beam->timeout > 0) {
328 rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
329 }
330 else {
331 rv = apr_thread_cond_wait(beam->change, lock);
332 }
333 }
334 return rv;
335 }
336
wait_not_empty(h2_bucket_beam * beam,apr_read_type_e block,apr_thread_mutex_t * lock)337 static apr_status_t wait_not_empty(h2_bucket_beam *beam, apr_read_type_e block,
338 apr_thread_mutex_t *lock)
339 {
340 apr_status_t rv = APR_SUCCESS;
341
342 while (buffer_is_empty(beam) && APR_SUCCESS == rv) {
343 if (beam->aborted) {
344 rv = APR_ECONNABORTED;
345 }
346 else if (beam->closed) {
347 rv = APR_EOF;
348 }
349 else if (APR_BLOCK_READ != block || !lock) {
350 rv = APR_EAGAIN;
351 }
352 else if (beam->timeout > 0) {
353 rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
354 }
355 else {
356 rv = apr_thread_cond_wait(beam->change, lock);
357 }
358 }
359 return rv;
360 }
361
/* Wait until the beam buffer has room for more data. On return,
 * *pspace_left holds the currently available space (0 when the wait
 * failed while full). APR_ECONNABORTED on an aborted beam, APR_EAGAIN
 * for non-blocking callers or when no mutex is available. */
static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block,
                                  apr_size_t *pspace_left, h2_beam_lock *bl)
{
    apr_status_t rv = APR_SUCCESS;
    apr_size_t left;

    while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) {
        if (beam->aborted) {
            rv = APR_ECONNABORTED;
        }
        else if (block != APR_BLOCK_READ || !bl->mutex) {
            rv = APR_EAGAIN;
        }
        else {
            /* bounded wait when a timeout is configured */
            if (beam->timeout > 0) {
                rv = apr_thread_cond_timedwait(beam->change, bl->mutex, beam->timeout);
            }
            else {
                rv = apr_thread_cond_wait(beam->change, bl->mutex);
            }
        }
    }
    *pspace_left = left;
    return rv;
}
387
/* Called (from the receiver thread, via beam bucket destruction) when the
 * last beam bucket referencing `proxy` is gone. Moves the corresponding
 * sender bucket from the hold list to the purge list — together with any
 * meta buckets queued before it — and wakes up waiters. */
static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
{
    h2_beam_lock bl;
    apr_bucket *b, *next;

    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        /* even when beam buckets are split, only the one where
         * refcount drops to 0 will call us */
        H2_BPROXY_REMOVE(proxy);
        /* invoked from receiver thread, the last beam bucket for the send
         * bucket is about to be destroyed.
         * remove it from the hold, where it should be now */
        if (proxy->bsender) {
            /* first pass: verify the sender bucket really is in hold */
            for (b = H2_BLIST_FIRST(&beam->hold_list);
                 b != H2_BLIST_SENTINEL(&beam->hold_list);
                 b = APR_BUCKET_NEXT(b)) {
                if (b == proxy->bsender) {
                    break;
                }
            }
            if (b != H2_BLIST_SENTINEL(&beam->hold_list)) {
                /* bucket is in hold as it should be, mark this one
                 * and all before it for purging. We might have placed meta
                 * buckets without a receiver proxy into the hold before it
                 * and schedule them for purging now */
                for (b = H2_BLIST_FIRST(&beam->hold_list);
                     b != H2_BLIST_SENTINEL(&beam->hold_list);
                     b = next) {
                    next = APR_BUCKET_NEXT(b);
                    if (b == proxy->bsender) {
                        APR_BUCKET_REMOVE(b);
                        H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
                        break;
                    }
                    else if (APR_BUCKET_IS_METADATA(b)) {
                        APR_BUCKET_REMOVE(b);
                        H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
                    }
                    else {
                        /* another data bucket before this one in hold. this
                         * is normal since DATA buckets need not be destroyed
                         * in order */
                    }
                }

                proxy->bsender = NULL;
            }
            else {
                /* it should be there unless we screwed up */
                ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool,
                              APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
                              "in hold, n=%d", beam->id, beam->tag,
                              (int)proxy->n);
                ap_assert(!proxy->bsender);
            }
        }
        /* notify anyone waiting on space to become available */
        if (!bl.mutex) {
            /* no lock held by anyone else: purge directly */
            r_purge_sent(beam);
        }
        else {
            apr_thread_cond_broadcast(beam->change);
        }
        leave_yellow(beam, &bl);
    }
}
454
/* Delete every bucket remaining in the list. */
static void h2_blist_cleanup(h2_blist *bl)
{
    while (!H2_BLIST_EMPTY(bl)) {
        apr_bucket_delete(H2_BLIST_FIRST(bl));
    }
}
464
beam_close(h2_bucket_beam * beam)465 static apr_status_t beam_close(h2_bucket_beam *beam)
466 {
467 if (!beam->closed) {
468 beam->closed = 1;
469 apr_thread_cond_broadcast(beam->change);
470 }
471 return APR_SUCCESS;
472 }
473
/* Return the beam's closed flag. NOTE(review): read without holding the
 * beam lock — presumably callers tolerate a stale value. */
int h2_beam_is_closed(h2_bucket_beam *beam)
{
    return beam->closed;
}
478
/* Register `cleanup` as a pre-cleanup on `pool`, unless the pool is
 * absent or is the beam's own pool (which already has one). Returns 1
 * when a registration was made. */
static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool,
                         apr_status_t (*cleanup)(void *))
{
    if (!pool || pool == beam->pool) {
        return 0;
    }
    apr_pool_pre_cleanup_register(pool, beam, cleanup);
    return 1;
}
488
/* Undo a pool_register(): remove `cleanup` from `pool`, with the same
 * guards. Returns 1 when a cleanup was removed. */
static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
                     apr_status_t (*cleanup)(void *))
{
    if (!pool || pool == beam->pool) {
        return 0;
    }
    apr_pool_cleanup_kill(pool, beam, cleanup);
    return 1;
}
497
beam_recv_cleanup(void * data)498 static apr_status_t beam_recv_cleanup(void *data)
499 {
500 h2_bucket_beam *beam = data;
501 /* receiver pool has gone away, clear references */
502 beam->recv_buffer = NULL;
503 beam->recv_pool = NULL;
504 return APR_SUCCESS;
505 }
506
/* Pool cleanup for the sender pool: purge and drop every reference to
 * sender memory — queued buckets, hold/purge lists and all outstanding
 * proxies — before the pool disappears. */
static apr_status_t beam_send_cleanup(void *data)
{
    h2_bucket_beam *beam = data;
    /* sender is going away, clear up all references to its memory */
    r_purge_sent(beam);
    h2_blist_cleanup(&beam->send_list);
    /* report what was consumed; no lock handling (NULL) here */
    report_consumption(beam, NULL);
    /* detach all proxies so their destruction no longer touches the beam
     * or the (soon invalid) sender buckets */
    while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
        h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
        H2_BPROXY_REMOVE(proxy);
        proxy->beam = NULL;
        proxy->bsender = NULL;
    }
    h2_blist_cleanup(&beam->purge_list);
    h2_blist_cleanup(&beam->hold_list);
    beam->send_pool = NULL;
    return APR_SUCCESS;
}
525
/* Switch the beam's sender pool. Any previous foreign sender pool has its
 * cleanup removed and its resources cleaned immediately; the new pool
 * gets a cleanup registered (unless it is the beam's own pool). */
static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool)
{
    if (beam->send_pool != pool) {
        if (beam->send_pool && beam->send_pool != beam->pool) {
            pool_kill(beam, beam->send_pool, beam_send_cleanup);
            beam_send_cleanup(beam);
        }
        beam->send_pool = pool;
        pool_register(beam, beam->send_pool, beam_send_cleanup);
    }
}
537
/* Discard the receiver buffer, accounting its length as received bytes.
 * The brigade is destroyed with the beam lock released (when bl is given)
 * because destroying its buckets may re-enter the beam. Waiters are woken
 * and the consumer event callback fired afterwards. */
static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
{
    if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
        apr_bucket_brigade *bb = beam->recv_buffer;
        apr_off_t bblen = 0;

        /* detach first, so re-entrant calls see no buffer */
        beam->recv_buffer = NULL;
        apr_brigade_length(bb, 0, &bblen);
        beam->received_bytes += bblen;

        /* need to do this unlocked since bucket destroy might
         * call this beam again. */
        if (bl) leave_yellow(beam, bl);
        apr_brigade_destroy(bb);
        if (bl) enter_yellow(beam, bl);

        apr_thread_cond_broadcast(beam->change);
        if (beam->cons_ev_cb) {
            beam->cons_ev_cb(beam->cons_ctx, beam);
        }
    }
}
560
/* Tear down the beam when its owner goes away. Which side may be touched
 * safely depends on who owns the beam; receiver references are dropped
 * before sender resources are cleaned. `from_pool` indicates invocation
 * from a pool cleanup, where io callbacks must not fire. */
static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool)
{
    apr_status_t status = APR_SUCCESS;
    int safe_send = (beam->owner == H2_BEAM_OWNER_SEND);
    int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV);

    /*
     * Owner of the beam is going away, depending on which side it owns,
     * cleanup strategies will differ.
     *
     * In general, receiver holds references to memory from sender.
     * Clean up receiver first, if safe, then cleanup sender, if safe.
     */

    /* When called from pool destroy, io callbacks are disabled */
    if (from_pool) {
        beam->cons_io_cb = NULL;
    }

    /* When modify send is not safe, this means we still have multi-thread
     * protection and the owner is receiving the buckets. If the sending
     * side has not gone away, this means we could have dangling buckets
     * in our lists that never get destroyed. This should not happen. */
    ap_assert(safe_send || !beam->send_pool);
    if (!H2_BLIST_EMPTY(&beam->send_list)) {
        ap_assert(beam->send_pool);
    }

    if (safe_recv) {
        if (beam->recv_pool) {
            pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
            beam->recv_pool = NULL;
        }
        recv_buffer_cleanup(beam, NULL);
    }
    else {
        /* not safe to destroy; just drop the references */
        beam->recv_buffer = NULL;
        beam->recv_pool = NULL;
    }

    if (safe_send && beam->send_pool) {
        pool_kill(beam, beam->send_pool, beam_send_cleanup);
        status = beam_send_cleanup(beam);
    }

    if (safe_recv) {
        /* after a receiver-owned cleanup, nothing may remain queued */
        ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
        ap_assert(H2_BLIST_EMPTY(&beam->send_list));
        ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
        ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
    }
    return status;
}
614
beam_pool_cleanup(void * data)615 static apr_status_t beam_pool_cleanup(void *data)
616 {
617 return beam_cleanup(data, 1);
618 }
619
h2_beam_destroy(h2_bucket_beam * beam)620 apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
621 {
622 apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup);
623 return beam_cleanup(beam, 0);
624 }
625
h2_beam_create(h2_bucket_beam ** pbeam,apr_pool_t * pool,int id,const char * tag,h2_beam_owner_t owner,apr_size_t max_buf_size,apr_interval_time_t timeout)626 apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
627 int id, const char *tag,
628 h2_beam_owner_t owner,
629 apr_size_t max_buf_size,
630 apr_interval_time_t timeout)
631 {
632 h2_bucket_beam *beam;
633 apr_status_t rv = APR_SUCCESS;
634
635 beam = apr_pcalloc(pool, sizeof(*beam));
636 if (!beam) {
637 return APR_ENOMEM;
638 }
639
640 beam->id = id;
641 beam->tag = tag;
642 beam->pool = pool;
643 beam->owner = owner;
644 H2_BLIST_INIT(&beam->send_list);
645 H2_BLIST_INIT(&beam->hold_list);
646 H2_BLIST_INIT(&beam->purge_list);
647 H2_BPROXY_LIST_INIT(&beam->proxies);
648 beam->tx_mem_limits = 1;
649 beam->max_buf_size = max_buf_size;
650 beam->timeout = timeout;
651
652 rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
653 if (APR_SUCCESS == rv) {
654 rv = apr_thread_cond_create(&beam->change, pool);
655 if (APR_SUCCESS == rv) {
656 apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup);
657 *pbeam = beam;
658 }
659 }
660 return rv;
661 }
662
/* Set the beam's buffer limit, under the beam lock. */
void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }
    beam->max_buf_size = buffer_size;
    leave_yellow(beam, &bl);
}
672
h2_beam_buffer_size_get(h2_bucket_beam * beam)673 apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
674 {
675 h2_beam_lock bl;
676 apr_size_t buffer_size = 0;
677
678 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
679 buffer_size = beam->max_buf_size;
680 leave_yellow(beam, &bl);
681 }
682 return buffer_size;
683 }
684
/* Set the beam's wait timeout, under the beam lock. */
void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }
    beam->timeout = timeout;
    leave_yellow(beam, &bl);
}
694
h2_beam_timeout_get(h2_bucket_beam * beam)695 apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
696 {
697 h2_beam_lock bl;
698 apr_interval_time_t timeout = 0;
699
700 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
701 timeout = beam->timeout;
702 leave_yellow(beam, &bl);
703 }
704 return timeout;
705 }
706
/* Abort the beam: mark aborted, discard everything queued for sending,
 * report consumption and wake all waiters. Safe to call with NULL. */
void h2_beam_abort(h2_bucket_beam *beam)
{
    h2_beam_lock bl;

    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
        beam->aborted = 1;
        r_purge_sent(beam);
        h2_blist_cleanup(&beam->send_list);
        report_consumption(beam, &bl);
        apr_thread_cond_broadcast(beam->change);
        leave_yellow(beam, &bl);
    }
}
720
h2_beam_close(h2_bucket_beam * beam)721 apr_status_t h2_beam_close(h2_bucket_beam *beam)
722 {
723 h2_beam_lock bl;
724
725 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
726 r_purge_sent(beam);
727 beam_close(beam);
728 report_consumption(beam, &bl);
729 leave_yellow(beam, &bl);
730 }
731 return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
732 }
733
h2_beam_leave(h2_bucket_beam * beam)734 apr_status_t h2_beam_leave(h2_bucket_beam *beam)
735 {
736 h2_beam_lock bl;
737
738 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
739 recv_buffer_cleanup(beam, &bl);
740 beam->aborted = 1;
741 beam_close(beam);
742 leave_yellow(beam, &bl);
743 }
744 return APR_SUCCESS;
745 }
746
h2_beam_wait_empty(h2_bucket_beam * beam,apr_read_type_e block)747 apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
748 {
749 apr_status_t status;
750 h2_beam_lock bl;
751
752 if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
753 status = wait_empty(beam, block, bl.mutex);
754 leave_yellow(beam, &bl);
755 }
756 return status;
757 }
758
/* Drain all buckets from the sender brigade into the beam's send list.
 * NOTE(review): despite the name, this inserts into send_list, not
 * hold_list — used on abort so the buckets are owned by the beam and
 * cleaned with it; confirm the naming is intentional. */
static void move_to_hold(h2_bucket_beam *beam,
                         apr_bucket_brigade *sender_bb)
{
    apr_bucket *b;
    while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) {
        b = APR_BRIGADE_FIRST(sender_bb);
        APR_BUCKET_REMOVE(b);
        H2_BLIST_INSERT_TAIL(&beam->send_list, b);
    }
}
769
/* Move one bucket from the sender brigade into the beam's send list,
 * making its data safe to read from the receiver thread. Splits data
 * buckets that exceed *pspace_left and decrements it by what was taken.
 * Returns APR_ECONNABORTED on an aborted beam, or the error from reading/
 * setting aside the bucket. */
static apr_status_t append_bucket(h2_bucket_beam *beam,
                                  apr_bucket *b,
                                  apr_read_type_e block,
                                  apr_size_t *pspace_left,
                                  h2_beam_lock *pbl)
{
    const char *data;
    apr_size_t len;
    apr_status_t status;
    int can_beam = 0, check_len;

    (void)block;
    (void)pbl;
    if (beam->aborted) {
        return APR_ECONNABORTED;
    }

    /* meta buckets bypass the space accounting entirely */
    if (APR_BUCKET_IS_METADATA(b)) {
        if (APR_BUCKET_IS_EOS(b)) {
            beam->closed = 1;
        }
        APR_BUCKET_REMOVE(b);
        H2_BLIST_INSERT_TAIL(&beam->send_list, b);
        return APR_SUCCESS;
    }
    else if (APR_BUCKET_IS_FILE(b)) {
        /* For file buckets the problem is their internal readpool that
         * is used on the first read to allocate buffer/mmap.
         * Since setting aside a file bucket will de-register the
         * file cleanup function from the previous pool, we need to
         * call that only from the sender thread.
         *
         * Currently, we do not handle file bucket with refcount > 1 as
         * the beam is then not in complete control of the file's lifetime.
         * Which results in the bug that a file get closed by the receiver
         * while the sender or the beam still have buckets using it.
         *
         * Additionally, we allow callbacks to prevent beaming file
         * handles across. The use case for this is to limit the number
         * of open file handles and rather use a less efficient beam
         * transport. */
        apr_bucket_file *bf = b->data;
        apr_file_t *fd = bf->fd;
        can_beam = (bf->refcount.refcount == 1);
        if (can_beam && beam->can_beam_fn) {
            can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
        }
        /* file handles that beam across do not count against the buffer */
        check_len = !can_beam;
    }
    else {
        /* force indeterminate-length buckets to materialize */
        if (b->length == ((apr_size_t)-1)) {
            const char *data2;
            status = apr_bucket_read(b, &data2, &len, APR_BLOCK_READ);
            if (status != APR_SUCCESS) {
                return status;
            }
        }
        check_len = 1;
    }

    if (check_len) {
        if (b->length > *pspace_left) {
            apr_bucket_split(b, *pspace_left);
        }
        *pspace_left -= b->length;
    }

    /* The fundamental problem is that reading a sender bucket from
     * a receiver thread is a total NO GO, because the bucket might use
     * its pool/bucket_alloc from a foreign thread and that will
     * corrupt. */
    status = APR_ENOTIMPL;
    if (APR_BUCKET_IS_TRANSIENT(b)) {
        /* this takes care of transient buckets and converts them
         * into heap ones. Other bucket types might or might not be
         * affected by this. */
        status = apr_bucket_setaside(b, beam->send_pool);
    }
    else if (APR_BUCKET_IS_HEAP(b)) {
        /* For heap buckets read from a receiver thread is fine. The
         * data will be there and live until the bucket itself is
         * destroyed. */
        status = APR_SUCCESS;
    }
    else if (APR_BUCKET_IS_POOL(b)) {
        /* pool buckets are bastards that register at pool cleanup
         * to morph themselves into heap buckets. That may happen anytime,
         * even after the bucket data pointer has been read. So at
         * any time inside the receiver thread, the pool bucket memory
         * may disappear. yikes. */
        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
        if (status == APR_SUCCESS) {
            apr_bucket_heap_make(b, data, len, NULL);
        }
    }
    else if (APR_BUCKET_IS_FILE(b) && can_beam) {
        status = apr_bucket_setaside(b, beam->send_pool);
    }

    if (status == APR_ENOTIMPL) {
        /* we have no knowledge about the internals of this bucket,
         * but hope that after read, its data stays immutable for the
         * lifetime of the bucket. (see pool bucket handling above for
         * a counter example).
         * We do the read while in the sender thread, so that the bucket may
         * use pools/allocators safely. */
        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
        if (status == APR_SUCCESS) {
            status = apr_bucket_setaside(b, beam->send_pool);
        }
    }

    /* a setaside that stays APR_ENOTIMPL is tolerated after a good read */
    if (status != APR_SUCCESS && status != APR_ENOTIMPL) {
        return status;
    }

    APR_BUCKET_REMOVE(b);
    H2_BLIST_INSERT_TAIL(&beam->send_list, b);
    beam->sent_bytes += b->length;

    return APR_SUCCESS;
}
892
/* Declare `p` as the pool future sends will allocate from. Purges any
 * previously sent buckets first. Sender thread only. */
void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }
    r_purge_sent(beam);
    beam_set_send_pool(beam, p);
    leave_yellow(beam, &bl);
}
903
/* Send the buckets in `sender_bb` through the beam, waiting for buffer
 * space per `block` mode. Called from the sender thread only; requires a
 * prior h2_beam_send_from(). On an aborted beam the brigade's buckets are
 * absorbed and APR_ECONNABORTED returned. */
apr_status_t h2_beam_send(h2_bucket_beam *beam,
                          apr_bucket_brigade *sender_bb,
                          apr_read_type_e block)
{
    apr_bucket *b;
    apr_status_t rv = APR_SUCCESS;
    apr_size_t space_left = 0;
    h2_beam_lock bl;

    /* Called from the sender thread to add buckets to the beam */
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        ap_assert(beam->send_pool);
        r_purge_sent(beam);

        if (beam->aborted) {
            move_to_hold(beam, sender_bb);
            rv = APR_ECONNABORTED;
        }
        else if (sender_bb) {
            /* report io even when everything fits in one pass */
            int force_report = !APR_BRIGADE_EMPTY(sender_bb);

            space_left = calc_space_left(beam);
            while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
                if (space_left <= 0) {
                    /* buffer full: report progress, purge, then wait */
                    report_prod_io(beam, force_report, &bl);
                    r_purge_sent(beam);
                    rv = wait_not_full(beam, block, &space_left, &bl);
                    if (APR_SUCCESS != rv) {
                        break;
                    }
                }
                b = APR_BRIGADE_FIRST(sender_bb);
                rv = append_bucket(beam, b, block, &space_left, &bl);
            }

            report_prod_io(beam, force_report, &bl);
            apr_thread_cond_broadcast(beam->change);
        }
        report_consumption(beam, &bl);
        leave_yellow(beam, &bl);
    }
    return rv;
}
947
/* Transfer buckets from the beam to the receiver's brigade `bb`.
 *
 * Called from the receiver thread while the sender thread may concurrently
 * append to the beam. Takes up to `readbytes` bytes (<= 0 means "as much as
 * is available"). Sender buckets are converted into receiver-side stand-in
 * buckets allocated from `bb`'s bucket allocator; the original sender
 * buckets are parked in `hold_list` until the stand-ins are destroyed.
 *
 * Returns APR_SUCCESS when at least one bucket was transferred or an EOS
 * was appended, APR_ECONNABORTED if the beam was aborted, or the status of
 * the wait (e.g. APR_EAGAIN for a non-blocking read that found no data).
 * If `pclosed` is non-NULL, it is set to whether the beam is closed.
 */
apr_status_t h2_beam_receive(h2_bucket_beam *beam, 
                             apr_bucket_brigade *bb, 
                             apr_read_type_e block,
                             apr_off_t readbytes,
                             int *pclosed)
{
    h2_beam_lock bl;
    apr_bucket *bsender, *brecv, *ng;
    int transferred = 0;
    apr_status_t status = APR_SUCCESS;
    apr_off_t remain;
    int transferred_buckets = 0;
    
    /* Called from the receiver thread to take buckets from the beam */
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        if (readbytes <= 0) {
            /* "read everything": use the largest representable amount */
            readbytes = (apr_off_t)APR_SIZE_MAX;
        }
        remain = readbytes;
        
transfer:
        if (beam->aborted) {
            recv_buffer_cleanup(beam, &bl);
            status = APR_ECONNABORTED;
            goto leave;
        }

        /* transfer enough buckets from our receiver brigade, if we have one */
        while (remain >= 0 
               && beam->recv_buffer 
               && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
               
            brecv = APR_BRIGADE_FIRST(beam->recv_buffer);
            if (brecv->length > 0 && remain <= 0) {
                /* budget exhausted and next bucket carries data: stop here */
                break;
            }            
            APR_BUCKET_REMOVE(brecv);
            APR_BRIGADE_INSERT_TAIL(bb, brecv);
            remain -= brecv->length;
            ++transferred;
        }

        /* transfer from our sender brigade, transforming sender buckets to
         * receiver ones until we have enough */
        while (remain >= 0 && !H2_BLIST_EMPTY(&beam->send_list)) {
               
            brecv = NULL;
            bsender = H2_BLIST_FIRST(&beam->send_list);            
            if (bsender->length > 0 && remain <= 0) {
                break;
            }
                        
            if (APR_BUCKET_IS_METADATA(bsender)) {
                /* metadata is re-created on the receiver's allocator, never
                 * shared across thread/pool boundaries */
                if (APR_BUCKET_IS_EOS(bsender)) {
                    brecv = apr_bucket_eos_create(bb->bucket_alloc);
                    beam->close_sent = 1;
                }
                else if (APR_BUCKET_IS_FLUSH(bsender)) {
                    brecv = apr_bucket_flush_create(bb->bucket_alloc);
                }
                else if (AP_BUCKET_IS_ERROR(bsender)) {
                    ap_bucket_error *eb = (ap_bucket_error *)bsender;
                    brecv = ap_bucket_error_create(eb->status, eb->data,
                                                    bb->p, bb->bucket_alloc);
                }
            }
            else if (bsender->length == 0) {
                /* nothing to transfer; park the empty bucket and move on */
                APR_BUCKET_REMOVE(bsender);
                H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
                continue;
            }
            else if (APR_BUCKET_IS_FILE(bsender)) {
                /* This is set aside into the target brigade pool so that 
                 * any read operation messes with that pool and not 
                 * the sender one. */
                apr_bucket_file *f = (apr_bucket_file *)bsender->data;
                apr_file_t *fd = f->fd;
                int setaside = (f->readpool != bb->p);
                
                if (setaside) {
                    status = apr_file_setaside(&fd, fd, bb->p);
                    if (status != APR_SUCCESS) {
                        goto leave;
                    }
                    ++beam->files_beamed;
                }
                ng = apr_brigade_insert_file(bb, fd, bsender->start, (apr_off_t)bsender->length, 
                                             bb->p);
#if APR_HAS_MMAP
                /* disable mmap handling as this leads to segfaults when
                 * the underlying file is changed while memory pointer has
                 * been handed out. See also PR 59348 */
                apr_bucket_file_enable_mmap(ng, 0);
#endif
                APR_BUCKET_REMOVE(bsender);
                H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);

                remain -= bsender->length;
                beam->received_bytes += bsender->length;
                ++transferred;
                ++transferred_buckets;
                continue;
            }
            else {
                /* create a "receiver" standin bucket. we took care about the
                 * underlying sender bucket and its data when we placed it into
                 * the sender brigade.
                 * the beam bucket will notify us on destruction that bsender is
                 * no longer needed. */
                brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
                                               beam->buckets_sent++);
            }
            
            /* Place the sender bucket into our hold, to be destroyed when no
             * receiver bucket references it any more. */
            APR_BUCKET_REMOVE(bsender);
            H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
            
            beam->received_bytes += bsender->length;
            ++transferred_buckets;
            
            if (brecv) {
                APR_BRIGADE_INSERT_TAIL(bb, brecv);
                remain -= brecv->length;
                ++transferred;
            }
            else {
                /* let outside hook determine how bucket is beamed.
                 * NOTE: the beam lock is dropped around the hook call and
                 * re-acquired afterwards; beam state may change meanwhile. */
                leave_yellow(beam, &bl);
                brecv = h2_beam_bucket(beam, bb, bsender);
                enter_yellow(beam, &bl);
                
                /* account for all buckets the hook appended to bb */
                while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
                    ++transferred;
                    remain -= brecv->length;
                    brecv = APR_BUCKET_NEXT(brecv);
                }
            }
        }

        if (remain < 0) {
            /* too much, put some back into out recv_buffer */
            remain = readbytes;
            for (brecv = APR_BRIGADE_FIRST(bb);
                 brecv != APR_BRIGADE_SENTINEL(bb);
                 brecv = APR_BUCKET_NEXT(brecv)) {
                /* budget is counted in memory footprint when tx_mem_limits
                 * is active, otherwise in raw bucket length */
                remain -= (beam->tx_mem_limits? bucket_mem_used(brecv) 
                           : (apr_off_t)brecv->length);
                if (remain < 0) {
                    /* split at the exact byte where the budget ran out and
                     * stash everything after it back into recv_buffer */
                    apr_bucket_split(brecv, (apr_size_t)((apr_off_t)brecv->length+remain));
                    beam->recv_buffer = apr_brigade_split_ex(bb, 
                                                             APR_BUCKET_NEXT(brecv), 
                                                             beam->recv_buffer);
                    break;
                }
            }
        }

        if (beam->closed && buffer_is_empty(beam)) {
            /* beam is closed and we have nothing more to receive */ 
            if (!beam->close_sent) {
                apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
                APR_BRIGADE_INSERT_TAIL(bb, b);
                beam->close_sent = 1;
                ++transferred;
                status = APR_SUCCESS;
            }
        }
        
        if (transferred_buckets > 0) {
            /* inform the sender side that buckets were consumed */
           if (beam->cons_ev_cb) { 
               beam->cons_ev_cb(beam->cons_ctx, beam);
            }
        }

        if (transferred) {
            /* wake any thread waiting for room/data on the beam */
            apr_thread_cond_broadcast(beam->change);
            status = APR_SUCCESS;
        }
        else {
            /* nothing moved: wait (per `block` mode) and retry from the top */
            status = wait_not_empty(beam, block, bl.mutex);
            if (status != APR_SUCCESS) {
                goto leave;
            }
            goto transfer;
        }
leave:        
        if (pclosed) *pclosed = beam->closed? 1 : 0;
        leave_yellow(beam, &bl);
    }
    return status;
}
1140
/* Register the sender-side callbacks invoked when the receiver consumes
 * buckets: `ev_cb` fires on consumption events, `io_cb` reports amounts.
 * `ctx` is passed through to both. Set under the beam lock. */
void h2_beam_on_consumed(h2_bucket_beam *beam, 
                         h2_beam_ev_callback *ev_cb,
                         h2_beam_io_callback *io_cb, void *ctx)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }
    beam->cons_ev_cb = ev_cb;
    beam->cons_io_cb = io_cb;
    beam->cons_ctx = ctx;
    leave_yellow(beam, &bl);
}
1153
/* Register the receiver-side callback invoked when the sender produces
 * new data on the beam. Set under the beam lock. */
void h2_beam_on_produced(h2_bucket_beam *beam, 
                         h2_beam_io_callback *io_cb, void *ctx)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }
    beam->prod_io_cb = io_cb;
    beam->prod_ctx = ctx;
    leave_yellow(beam, &bl);
}
1164
/* Install the hook that decides whether a file handle may be passed
 * ("beamed") across the beam instead of copying its data. Set under
 * the beam lock. */
void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                          h2_beam_can_beam_callback *cb, void *ctx)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }
    beam->can_beam_fn = cb;
    beam->can_beam_ctx = ctx;
    leave_yellow(beam, &bl);
}
1176
1177
h2_beam_get_buffered(h2_bucket_beam * beam)1178 apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
1179 {
1180 apr_bucket *b;
1181 apr_off_t l = 0;
1182 h2_beam_lock bl;
1183
1184 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1185 for (b = H2_BLIST_FIRST(&beam->send_list);
1186 b != H2_BLIST_SENTINEL(&beam->send_list);
1187 b = APR_BUCKET_NEXT(b)) {
1188 /* should all have determinate length */
1189 l += b->length;
1190 }
1191 leave_yellow(beam, &bl);
1192 }
1193 return l;
1194 }
1195
h2_beam_get_mem_used(h2_bucket_beam * beam)1196 apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
1197 {
1198 apr_bucket *b;
1199 apr_off_t l = 0;
1200 h2_beam_lock bl;
1201
1202 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1203 for (b = H2_BLIST_FIRST(&beam->send_list);
1204 b != H2_BLIST_SENTINEL(&beam->send_list);
1205 b = APR_BUCKET_NEXT(b)) {
1206 l += bucket_mem_used(b);
1207 }
1208 leave_yellow(beam, &bl);
1209 }
1210 return l;
1211 }
1212
/* Check whether the beam holds no data: both the sender list and any
 * receiver buffer are empty. A NULL beam (or lock failure) counts as
 * empty. */
int h2_beam_empty(h2_bucket_beam *beam)
{
    int is_empty = 1;
    h2_beam_lock bl;

    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
        if (!H2_BLIST_EMPTY(&beam->send_list)) {
            is_empty = 0;
        }
        else if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
            is_empty = 0;
        }
        leave_yellow(beam, &bl);
    }
    return is_empty;
}
1225
/* Check whether any receiver-side proxy buckets still reference the
 * beam. Defaults to 1 (conservative: "still referenced") when the beam
 * is NULL or the lock cannot be taken. */
int h2_beam_holds_proxies(h2_bucket_beam *beam)
{
    int result = 1;
    h2_beam_lock bl;

    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
        result = H2_BPROXY_LIST_EMPTY(&beam->proxies)? 0 : 1;
        leave_yellow(beam, &bl);
    }
    return result;
}
1237
/* Check whether the receiver has taken any bytes from the beam yet.
 * Returns 0 for a NULL beam or if locking fails. */
int h2_beam_was_received(h2_bucket_beam *beam)
{
    int seen_data = 0;
    h2_beam_lock bl;

    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
        if (beam->received_bytes > 0) {
            seen_data = 1;
        }
        leave_yellow(beam, &bl);
    }
    return seen_data;
}
1249
h2_beam_get_files_beamed(h2_bucket_beam * beam)1250 apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
1251 {
1252 apr_size_t n = 0;
1253 h2_beam_lock bl;
1254
1255 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1256 n = beam->files_beamed;
1257 leave_yellow(beam, &bl);
1258 }
1259 return n;
1260 }
1261
/* h2_beam_can_beam_callback that always refuses to beam file handles,
 * forcing file contents to be copied instead. All arguments unused. */
int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
{
    (void)ctx;
    (void)beam;
    (void)file;
    return 0;
}
1267
/* Report pending consumption to the registered consumer callbacks under
 * the beam lock. Returns the value of report_consumption(), or 0 if the
 * lock could not be taken. */
int h2_beam_report_consumption(h2_bucket_beam *beam)
{
    h2_beam_lock bl;
    int reported;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return 0;
    }
    reported = report_consumption(beam, &bl);
    leave_yellow(beam, &bl);
    return reported;
}
1278
/* Log a one-line summary of the beam's state (closed/aborted/empty and
 * buffered bytes) on connection `c` at `level`, prefixed by `msg`.
 * No-op if beam is NULL or the log level is not enabled. */
void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
{
    if (!beam || !APLOG_C_IS_LEVEL(c,level)) {
        return;
    }
    ap_log_cerror(APLOG_MARK, level, 0, c,
                  "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s",
                  (c->master? c->master->id : c->id), beam->id, beam->tag,
                  beam->closed, beam->aborted, h2_beam_empty(beam),
                  (long)h2_beam_get_buffered(beam), msg);
}
1289
1290
1291