1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <apr_lib.h>
18 #include <apr_atomic.h>
19 #include <apr_strings.h>
20 #include <apr_time.h>
21 #include <apr_buckets.h>
22 #include <apr_thread_mutex.h>
23 #include <apr_thread_cond.h>
24 
25 #include <httpd.h>
26 #include <http_protocol.h>
27 #include <http_log.h>
28 
29 #include "h2_private.h"
30 #include "h2_util.h"
31 #include "h2_bucket_beam.h"
32 
33 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy);
34 
/* Ring helpers for h2_beam_proxy elements. All of them are thin wrappers
 * around the APR_RING_* macros and use the proxy's `link` ring entry.
 *
 * The element macros take a proxy pointer `e`. */
#define H2_BPROXY_NEXT(e)             APR_RING_NEXT((e), link)
#define H2_BPROXY_PREV(e)             APR_RING_PREV((e), link)
#define H2_BPROXY_REMOVE(e)           APR_RING_REMOVE((e), link)

/* The list macros take a pointer `b` to a struct that has a ring head
 * member named `list`.
 * NOTE(review): the expansion of H2_BPROXY_LIST_INIT already ends in ';',
 * so `H2_BPROXY_LIST_INIT(x);` yields an extra empty statement. */
#define H2_BPROXY_LIST_INIT(b)        APR_RING_INIT(&(b)->list, h2_beam_proxy, link);
#define H2_BPROXY_LIST_SENTINEL(b)    APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link)
#define H2_BPROXY_LIST_EMPTY(b)       APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link)
#define H2_BPROXY_LIST_FIRST(b)       APR_RING_FIRST(&(b)->list)
#define H2_BPROXY_LIST_LAST(b)	      APR_RING_LAST(&(b)->list)
/* Insert proxy `e` at the head of list `b`.
 * NOTE(review): named H2_PROXY_BLIST_* while all siblings are named
 * H2_BPROXY_LIST_*; looks like a typo, kept for compatibility. */
#define H2_PROXY_BLIST_INSERT_HEAD(b, e) do {				\
	h2_beam_proxy *ap__b = (e);                                        \
	APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link);	\
    } while (0)
/* Insert proxy `e` at the tail of list `b`. `e` is evaluated once. */
#define H2_BPROXY_LIST_INSERT_TAIL(b, e) do {				\
	h2_beam_proxy *ap__b = (e);					\
	APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link);	\
    } while (0)
/* Splice list `b` onto the end of list `a` (see APR_RING_CONCAT). */
#define H2_BPROXY_LIST_CONCAT(a, b) do {					\
        APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link);	\
    } while (0)
/* Splice list `b` onto the start of list `a` (see APR_RING_PREPEND). */
#define H2_BPROXY_LIST_PREPEND(a, b) do {					\
        APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link);	\
    } while (0)
58 
59 
60 /*******************************************************************************
61  * beam bucket with reference to beam and bucket it represents
62  ******************************************************************************/
63 
/* Forward (tentative) declaration of the BEAM bucket type; the initialized
 * definition follows further down, after the callbacks it refers to. */
const apr_bucket_type_t h2_bucket_type_beam;
65 
/* Non-zero when bucket pointer `e` has the BEAM bucket type.
 * The argument is parenthesized so that any pointer expression
 * (e.g. `p + 1`) expands correctly. */
#define H2_BUCKET_IS_BEAM(e)     ((e)->type == &h2_bucket_type_beam)
67 
/* Shared data of a BEAM bucket: the receiver-side stand-in for one sender
 * bucket that is kept by the beam. */
struct h2_beam_proxy {
    /* reference count handled by the apr_bucket_shared_* functions;
     * presumably has to stay the first member -- confirm in APR docs */
    apr_bucket_refcount refcount;
    /* ring entry, links the proxy into beam->proxies */
    APR_RING_ENTRY(h2_beam_proxy) link;
    /* beam that issued this proxy; set to NULL by beam_send_cleanup() */
    h2_bucket_beam *beam;
    /* sender bucket this proxy reads from; NULL once it is no longer
     * available (reads then fail with APR_ECONNRESET) */
    apr_bucket *bsender;
    /* number given at creation; in the visible code only used for logging */
    apr_size_t n;
};
75 
/* Address handed out as "data" by beam_bucket_read() when there is nothing
 * to read, so callers never get an unset pointer. */
static const char Dummy = '\0';
77 
beam_bucket_read(apr_bucket * b,const char ** str,apr_size_t * len,apr_read_type_e block)78 static apr_status_t beam_bucket_read(apr_bucket *b, const char **str,
79                                      apr_size_t *len, apr_read_type_e block)
80 {
81     h2_beam_proxy *d = b->data;
82     if (d->bsender) {
83         const char *data;
84         apr_status_t status = apr_bucket_read(d->bsender, &data, len, block);
85         if (status == APR_SUCCESS) {
86             *str = data + b->start;
87             *len = b->length;
88         }
89         return status;
90     }
91     *str = &Dummy;
92     *len = 0;
93     return APR_ECONNRESET;
94 }
95 
beam_bucket_destroy(void * data)96 static void beam_bucket_destroy(void *data)
97 {
98     h2_beam_proxy *d = data;
99 
100     if (apr_bucket_shared_destroy(d)) {
101         /* When the beam gets destroyed before this bucket, it will
102          * NULLify its reference here. This is not protected by a mutex,
103          * so it will not help with race conditions.
104          * But it lets us shut down memory pool with circulare beam
105          * references. */
106         if (d->beam) {
107             h2_beam_emitted(d->beam, d);
108         }
109         apr_bucket_free(d);
110     }
111 }
112 
/*
 * Turn bucket `b` into a BEAM bucket that stands in for `bsender`.
 *
 * A new proxy is allocated from b->list, appended to beam->proxies and
 * installed as shared data of `b`. The bucket length is the length of
 * `bsender`, or 0 when there is none. Returns `b`.
 */
static apr_bucket * h2_beam_bucket_make(apr_bucket *b,
                                        h2_bucket_beam *beam,
                                        apr_bucket *bsender, apr_size_t n)
{
    h2_beam_proxy *proxy = apr_bucket_alloc(sizeof(*proxy), b->list);
    apr_size_t length = 0;

    proxy->beam = beam;
    proxy->bsender = bsender;
    proxy->n = n;
    H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, proxy);

    if (bsender) {
        length = bsender->length;
    }
    b = apr_bucket_shared_make(b, proxy, 0, length);
    b->type = &h2_bucket_type_beam;

    return b;
}
130 
/*
 * Allocate a fresh bucket from `list` and make it a BEAM bucket for
 * `bsender` on `beam` (see h2_beam_bucket_make()).
 */
static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
                                         apr_bucket *bsender,
                                         apr_bucket_alloc_t *list,
                                         apr_size_t n)
{
    apr_bucket *bucket;

    bucket = apr_bucket_alloc(sizeof(*bucket), list);
    APR_BUCKET_INIT(bucket);
    bucket->list = list;
    bucket->free = apr_bucket_free;

    return h2_beam_bucket_make(bucket, beam, bsender, n);
}
143 
/* Type descriptor of BEAM buckets. The initializers are positional; per the
 * APR bucket documentation the members are: name, number of functions,
 * data/metadata flag, then the destroy, read, setaside, split and copy
 * callbacks. Split and copy use the generic shared-bucket implementations,
 * setaside is a no-op. */
const apr_bucket_type_t h2_bucket_type_beam = {
    "BEAM", 5, APR_BUCKET_DATA,
    beam_bucket_destroy,
    beam_bucket_read,
    apr_bucket_setaside_noop,
    apr_bucket_shared_split,
    apr_bucket_shared_copy
};
152 
/*******************************************************************************
 * bucket beamers: registered callbacks that are asked, in registration
 * order, to produce a bucket for a given source bucket
 ******************************************************************************/
156 
/* Registered h2_bucket_beamer callbacks; created lazily in
 * apr_hook_global_pool and reset to NULL when that pool is cleaned up. */
static apr_array_header_t *beamers;
158 
cleanup_beamers(void * dummy)159 static apr_status_t cleanup_beamers(void *dummy)
160 {
161     (void)dummy;
162     beamers = NULL;
163     return APR_SUCCESS;
164 }
165 
/*
 * Append `beamer` to the list of registered bucket beamers.
 *
 * On first use the list is created in apr_hook_global_pool and a cleanup
 * is registered there that resets the list pointer.
 */
void h2_register_bucket_beamer(h2_bucket_beamer *beamer)
{
    h2_bucket_beamer **slot;

    if (NULL == beamers) {
        apr_pool_cleanup_register(apr_hook_global_pool, NULL,
                                  cleanup_beamers, apr_pool_cleanup_null);
        beamers = apr_array_make(apr_hook_global_pool, 10,
                                 sizeof(h2_bucket_beamer*));
    }

    slot = (h2_bucket_beamer **)apr_array_push(beamers);
    *slot = beamer;
}
176 
/*
 * Ask the registered beamers, in registration order, to produce a bucket
 * for `src`. Returns the first non-NULL result, or NULL when no beamer is
 * registered or none of them produced a bucket.
 */
static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam,
                                  apr_bucket_brigade *dest,
                                  const apr_bucket *src)
{
    int idx;

    if (!beamers) {
        return NULL;
    }

    for (idx = 0; idx < beamers->nelts; ++idx) {
        h2_bucket_beamer *beamer = APR_ARRAY_IDX(beamers, idx,
                                                 h2_bucket_beamer*);
        apr_bucket *made = beamer(beam, dest, src);

        if (made != NULL) {
            return made;
        }
    }
    return NULL;
}
193 
194 
195 /*******************************************************************************
196  * bucket beam that can transport buckets across threads
197  ******************************************************************************/
198 
/* Leave callback stored in h2_beam_lock: unlocks `lock`. The unlock status
 * is intentionally not reported. */
static void mutex_leave(apr_thread_mutex_t *lock)
{
    (void)apr_thread_mutex_unlock(lock);
}
203 
mutex_enter(void * ctx,h2_beam_lock * pbl)204 static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
205 {
206     h2_bucket_beam *beam = ctx;
207     pbl->mutex = beam->lock;
208     pbl->leave = mutex_leave;
209     return apr_thread_mutex_lock(pbl->mutex);
210 }
211 
enter_yellow(h2_bucket_beam * beam,h2_beam_lock * pbl)212 static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
213 {
214     return mutex_enter(beam, pbl);
215 }
216 
/* Leave the protected section entered via enter_yellow(): invokes the
 * leave callback recorded in `pbl`, if there is one. */
static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
    (void)beam;   /* unused */

    if (!pbl->leave) {
        return;
    }
    pbl->leave(pbl->mutex);
}
224 
bucket_mem_used(apr_bucket * b)225 static apr_off_t bucket_mem_used(apr_bucket *b)
226 {
227     if (APR_BUCKET_IS_FILE(b)) {
228         return 0;
229     }
230     else {
231         /* should all have determinate length */
232         return (apr_off_t)b->length;
233     }
234 }
235 
/*
 * Report received-but-not-yet-reported bytes to the consumer io callback.
 *
 * When `pbl` is given, the beam lock is released around the callback and
 * taken again afterwards. The reported counter is advanced whether or not
 * a callback is installed.
 *
 * Returns 1 when the callback was invoked, 0 otherwise.
 */
static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
    h2_beam_io_callback *cb = beam->cons_io_cb;
    apr_off_t pending = beam->received_bytes - beam->cons_bytes_reported;
    int invoked = 0;

    if (pending <= 0) {
        return 0;
    }

    if (cb != NULL) {
        void *ctx = beam->cons_ctx;

        if (pbl) {
            leave_yellow(beam, pbl);
        }
        cb(ctx, beam, pending);
        if (pbl) {
            enter_yellow(beam, pbl);
        }
        invoked = 1;
    }
    beam->cons_bytes_reported += pending;

    return invoked;
}
255 
/*
 * Report sent-but-not-yet-reported bytes to the producer io callback.
 *
 * Nothing happens unless there are such bytes or `force` is set. The beam
 * lock in `pbl` is released around the callback and taken again after it.
 */
static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
{
    apr_off_t delta = beam->sent_bytes - beam->prod_bytes_reported;
    h2_beam_io_callback *cb;

    if (!force && delta <= 0) {
        return;
    }

    cb = beam->prod_io_cb;
    if (cb != NULL) {
        void *ctx = beam->prod_ctx;

        leave_yellow(beam, pbl);
        cb(ctx, beam, delta);
        enter_yellow(beam, pbl);
    }
    beam->prod_bytes_reported += delta;
}
271 
calc_buffered(h2_bucket_beam * beam)272 static apr_size_t calc_buffered(h2_bucket_beam *beam)
273 {
274     apr_size_t len = 0;
275     apr_bucket *b;
276     for (b = H2_BLIST_FIRST(&beam->send_list);
277          b != H2_BLIST_SENTINEL(&beam->send_list);
278          b = APR_BUCKET_NEXT(b)) {
279         if (b->length == ((apr_size_t)-1)) {
280             /* do not count */
281         }
282         else if (APR_BUCKET_IS_FILE(b)) {
283             /* if unread, has no real mem footprint. */
284         }
285         else {
286             len += b->length;
287         }
288     }
289     return len;
290 }
291 
/* Delete every sender bucket in the purge list. Must only be called from
 * the sender thread. */
static void r_purge_sent(h2_bucket_beam *beam)
{
    while (!H2_BLIST_EMPTY(&beam->purge_list)) {
        apr_bucket *head = H2_BLIST_FIRST(&beam->purge_list);

        apr_bucket_delete(head);
    }
}
302 
calc_space_left(h2_bucket_beam * beam)303 static apr_size_t calc_space_left(h2_bucket_beam *beam)
304 {
305     if (beam->max_buf_size > 0) {
306         apr_size_t len = calc_buffered(beam);
307         return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0);
308     }
309     return APR_SIZE_MAX;
310 }
311 
/* 1 when neither the receive buffer (if any) nor the send list holds a
 * bucket, 0 otherwise. */
static int buffer_is_empty(h2_bucket_beam *beam)
{
    if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
        return 0;
    }
    return H2_BLIST_EMPTY(&beam->send_list) ? 1 : 0;
}
317 
wait_empty(h2_bucket_beam * beam,apr_read_type_e block,apr_thread_mutex_t * lock)318 static apr_status_t wait_empty(h2_bucket_beam *beam, apr_read_type_e block,
319                                apr_thread_mutex_t *lock)
320 {
321     apr_status_t rv = APR_SUCCESS;
322 
323     while (!buffer_is_empty(beam) && APR_SUCCESS == rv) {
324         if (APR_BLOCK_READ != block || !lock) {
325             rv = APR_EAGAIN;
326         }
327         else if (beam->timeout > 0) {
328             rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
329         }
330         else {
331             rv = apr_thread_cond_wait(beam->change, lock);
332         }
333     }
334     return rv;
335 }
336 
wait_not_empty(h2_bucket_beam * beam,apr_read_type_e block,apr_thread_mutex_t * lock)337 static apr_status_t wait_not_empty(h2_bucket_beam *beam, apr_read_type_e block,
338                                    apr_thread_mutex_t *lock)
339 {
340     apr_status_t rv = APR_SUCCESS;
341 
342     while (buffer_is_empty(beam) && APR_SUCCESS == rv) {
343         if (beam->aborted) {
344             rv = APR_ECONNABORTED;
345         }
346         else if (beam->closed) {
347             rv = APR_EOF;
348         }
349         else if (APR_BLOCK_READ != block || !lock) {
350             rv = APR_EAGAIN;
351         }
352         else if (beam->timeout > 0) {
353             rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
354         }
355         else {
356             rv = apr_thread_cond_wait(beam->change, lock);
357         }
358     }
359     return rv;
360 }
361 
wait_not_full(h2_bucket_beam * beam,apr_read_type_e block,apr_size_t * pspace_left,h2_beam_lock * bl)362 static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block,
363                                   apr_size_t *pspace_left, h2_beam_lock *bl)
364 {
365     apr_status_t rv = APR_SUCCESS;
366     apr_size_t left;
367 
368     while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) {
369         if (beam->aborted) {
370             rv = APR_ECONNABORTED;
371         }
372         else if (block != APR_BLOCK_READ || !bl->mutex) {
373             rv = APR_EAGAIN;
374         }
375         else {
376             if (beam->timeout > 0) {
377                 rv = apr_thread_cond_timedwait(beam->change, bl->mutex, beam->timeout);
378             }
379             else {
380                 rv = apr_thread_cond_wait(beam->change, bl->mutex);
381             }
382         }
383     }
384     *pspace_left = left;
385     return rv;
386 }
387 
/*
 * Called from beam_bucket_destroy() when the last reference to `proxy` is
 * gone. Under the beam lock:
 *  - unlinks the proxy from beam->proxies,
 *  - if the proxy still has a sender bucket, moves that bucket from the
 *    hold list to the purge list, together with all metadata buckets that
 *    precede it in the hold list, and clears proxy->bsender,
 *  - wakes up everyone waiting on beam->change.
 * Nothing is done when the lock cannot be taken.
 */
static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
{
    h2_beam_lock bl;
    apr_bucket *b, *next;

    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
        /* even when beam buckets are split, only the one where
         * refcount drops to 0 will call us */
        H2_BPROXY_REMOVE(proxy);
        /* invoked from receiver thread, the last beam bucket for the send
         * bucket is about to be destroyed.
         * remove it from the hold, where it should be now */
        if (proxy->bsender) {
            /* first pass: only check that the sender bucket is in hold */
            for (b = H2_BLIST_FIRST(&beam->hold_list);
                 b != H2_BLIST_SENTINEL(&beam->hold_list);
                 b = APR_BUCKET_NEXT(b)) {
                 if (b == proxy->bsender) {
                    break;
                 }
            }
            if (b != H2_BLIST_SENTINEL(&beam->hold_list)) {
                /* bucket is in hold as it should be, mark this one
                 * and all before it for purging. We might have placed meta
                 * buckets without a receiver proxy into the hold before it
                 * and schedule them for purging now */
                for (b = H2_BLIST_FIRST(&beam->hold_list);
                     b != H2_BLIST_SENTINEL(&beam->hold_list);
                     b = next) {
                    /* fetch successor first, b may get unlinked below */
                    next = APR_BUCKET_NEXT(b);
                    if (b == proxy->bsender) {
                        APR_BUCKET_REMOVE(b);
                        H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
                        break;
                    }
                    else if (APR_BUCKET_IS_METADATA(b)) {
                        APR_BUCKET_REMOVE(b);
                        H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
                    }
                    else {
                        /* another data bucket before this one in hold. this
                         * is normal since DATA buckets need not be destroyed
                         * in order */
                    }
                }

                proxy->bsender = NULL;
            }
            else {
                /* it should be there unless we screwed up */
                ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool,
                              APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
                              "in hold, n=%d", beam->id, beam->tag,
                              (int)proxy->n);
                /* NOTE(review): proxy->bsender is non-NULL in this branch,
                 * so this assertion always fails once reached */
                ap_assert(!proxy->bsender);
            }
        }
        /* notify anyone waiting on space to become available */
        /* NOTE(review): mutex_enter() always stores beam->lock in bl.mutex,
         * so the purge branch only runs for a beam without lock -- confirm
         * whether that can happen */
        if (!bl.mutex) {
            r_purge_sent(beam);
        }
        else {
            apr_thread_cond_broadcast(beam->change);
        }
        leave_yellow(beam, &bl);
    }
}
454 
/* Delete all buckets in `bl`, leaving the list empty. */
static void h2_blist_cleanup(h2_blist *bl)
{
    while (!H2_BLIST_EMPTY(bl)) {
        apr_bucket *head = H2_BLIST_FIRST(bl);

        apr_bucket_delete(head);
    }
}
464 
beam_close(h2_bucket_beam * beam)465 static apr_status_t beam_close(h2_bucket_beam *beam)
466 {
467     if (!beam->closed) {
468         beam->closed = 1;
469         apr_thread_cond_broadcast(beam->change);
470     }
471     return APR_SUCCESS;
472 }
473 
/* Return the beam's closed flag. The flag is read without taking the
 * beam lock. */
int h2_beam_is_closed(h2_bucket_beam *beam)
{
    int closed = beam->closed;

    return closed;
}
478 
/*
 * Register `cleanup` (with the beam as argument) as pre-cleanup of `pool`,
 * unless `pool` is NULL or the beam's own pool.
 * Returns 1 when the cleanup was registered, 0 otherwise.
 */
static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool,
                         apr_status_t (*cleanup)(void *))
{
    if (!pool || pool == beam->pool) {
        return 0;
    }

    apr_pool_pre_cleanup_register(pool, beam, cleanup);
    return 1;
}
488 
/*
 * Remove `cleanup` (registered with the beam as argument) from `pool`,
 * unless `pool` is NULL or the beam's own pool.
 * Returns 1 when the removal was attempted, 0 otherwise.
 */
static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
                     apr_status_t (*cleanup)(void *))
{
    if (!pool || pool == beam->pool) {
        return 0;
    }

    apr_pool_cleanup_kill(pool, beam, cleanup);
    return 1;
}
497 
beam_recv_cleanup(void * data)498 static apr_status_t beam_recv_cleanup(void *data)
499 {
500     h2_bucket_beam *beam = data;
501     /* receiver pool has gone away, clear references */
502     beam->recv_buffer = NULL;
503     beam->recv_pool = NULL;
504     return APR_SUCCESS;
505 }
506 
/*
 * Cleanup for the sender side (registered as pool pre-cleanup and also
 * called directly): releases everything that refers to sender memory.
 *
 * Purged and still unsent buckets are deleted first, consumption is
 * reported (without lock handling, pbl is NULL), then all outstanding
 * proxies are detached from the beam before the remaining purge and hold
 * buckets are deleted. Always returns APR_SUCCESS.
 */
static apr_status_t beam_send_cleanup(void *data)
{
    h2_bucket_beam *beam = data;
    /* sender is going away, clear up all references to its memory */
    r_purge_sent(beam);
    h2_blist_cleanup(&beam->send_list);
    report_consumption(beam, NULL);
    /* detach proxies: their buckets may outlive the sender and must no
     * longer call back into the beam or read from sender buckets */
    while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
        h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
        H2_BPROXY_REMOVE(proxy);
        proxy->beam = NULL;
        proxy->bsender = NULL;
    }
    h2_blist_cleanup(&beam->purge_list);
    h2_blist_cleanup(&beam->hold_list);
    beam->send_pool = NULL;
    return APR_SUCCESS;
}
525 
/*
 * Switch the beam to a new sender pool.
 *
 * A previous sender pool other than the beam's own pool is unregistered
 * and the sender cleanup is run right away. Afterwards the new pool is
 * stored and the sender cleanup registered on it (see pool_register()).
 * No-op when `pool` already is the sender pool.
 */
static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool)
{
    if (beam->send_pool == pool) {
        return;
    }

    if (beam->send_pool && beam->send_pool != beam->pool) {
        pool_kill(beam, beam->send_pool, beam_send_cleanup);
        beam_send_cleanup(beam);
    }

    beam->send_pool = pool;
    pool_register(beam, pool, beam_send_cleanup);
}
537 
/*
 * Throw away a non-empty receive buffer.
 *
 * The buffered bytes are counted as received, the brigade is destroyed
 * with the beam lock released (when `bl` is given), waiters on
 * beam->change are woken up and the consumer event callback, if any, is
 * invoked. Does nothing when there is no or only an empty receive buffer.
 */
static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
{
    apr_bucket_brigade *doomed = beam->recv_buffer;
    apr_off_t doomed_len = 0;

    if (!doomed || APR_BRIGADE_EMPTY(doomed)) {
        return;
    }

    beam->recv_buffer = NULL;
    apr_brigade_length(doomed, 0, &doomed_len);
    beam->received_bytes += doomed_len;

    /* destroying buckets may call back into this beam, so this has to
     * happen without holding the lock */
    if (bl) {
        leave_yellow(beam, bl);
    }
    apr_brigade_destroy(doomed);
    if (bl) {
        enter_yellow(beam, bl);
    }

    apr_thread_cond_broadcast(beam->change);
    if (beam->cons_ev_cb) {
        beam->cons_ev_cb(beam->cons_ctx, beam);
    }
}
560 
/*
 * Tear down the beam when its owner goes away.
 *
 * beam      the beam to clean up
 * from_pool non-zero when invoked as pool cleanup; the consumer io
 *           callback is then disabled first
 *
 * Only the side owned by beam->owner is considered safe to modify. For a
 * receiving owner, the receive pool cleanup is removed and the receive
 * buffer destroyed; otherwise the receiver references are just dropped.
 * For a sending owner with a sender pool, the sender cleanup is removed
 * from that pool and run directly.
 *
 * Returns the status of beam_send_cleanup() when that ran, APR_SUCCESS
 * otherwise. Violated expectations trigger ap_assert().
 */
static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool)
{
    apr_status_t status = APR_SUCCESS;
    int safe_send = (beam->owner == H2_BEAM_OWNER_SEND);
    int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV);

    /*
     * Owner of the beam is going away, depending on which side it owns,
     * cleanup strategies will differ.
     *
     * In general, receiver holds references to memory from sender.
     * Clean up receiver first, if safe, then cleanup sender, if safe.
     */

     /* When called from pool destroy, the consumer io callback is
      * disabled */
     if (from_pool) {
         beam->cons_io_cb = NULL;
     }

    /* When modify send is not safe, this means we still have multi-thread
     * protection and the owner is receiving the buckets. If the sending
     * side has not gone away, this means we could have dangling buckets
     * in our lists that never get destroyed. This should not happen. */
    ap_assert(safe_send || !beam->send_pool);
    if (!H2_BLIST_EMPTY(&beam->send_list)) {
        ap_assert(beam->send_pool);
    }

    if (safe_recv) {
        if (beam->recv_pool) {
            pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
            beam->recv_pool = NULL;
        }
        recv_buffer_cleanup(beam, NULL);
    }
    else {
        beam->recv_buffer = NULL;
        beam->recv_pool = NULL;
    }

    if (safe_send && beam->send_pool) {
        pool_kill(beam, beam->send_pool, beam_send_cleanup);
        status = beam_send_cleanup(beam);
    }

    if (safe_recv) {
        /* with the receiver gone, nothing may be left in the beam */
        ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
        ap_assert(H2_BLIST_EMPTY(&beam->send_list));
        ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
        ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
    }
    return status;
}
614 
beam_pool_cleanup(void * data)615 static apr_status_t beam_pool_cleanup(void *data)
616 {
617     return beam_cleanup(data, 1);
618 }
619 
h2_beam_destroy(h2_bucket_beam * beam)620 apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
621 {
622     apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup);
623     return beam_cleanup(beam, 0);
624 }
625 
h2_beam_create(h2_bucket_beam ** pbeam,apr_pool_t * pool,int id,const char * tag,h2_beam_owner_t owner,apr_size_t max_buf_size,apr_interval_time_t timeout)626 apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
627                             int id, const char *tag,
628                             h2_beam_owner_t owner,
629                             apr_size_t max_buf_size,
630                             apr_interval_time_t timeout)
631 {
632     h2_bucket_beam *beam;
633     apr_status_t rv = APR_SUCCESS;
634 
635     beam = apr_pcalloc(pool, sizeof(*beam));
636     if (!beam) {
637         return APR_ENOMEM;
638     }
639 
640     beam->id = id;
641     beam->tag = tag;
642     beam->pool = pool;
643     beam->owner = owner;
644     H2_BLIST_INIT(&beam->send_list);
645     H2_BLIST_INIT(&beam->hold_list);
646     H2_BLIST_INIT(&beam->purge_list);
647     H2_BPROXY_LIST_INIT(&beam->proxies);
648     beam->tx_mem_limits = 1;
649     beam->max_buf_size = max_buf_size;
650     beam->timeout = timeout;
651 
652     rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
653     if (APR_SUCCESS == rv) {
654         rv = apr_thread_cond_create(&beam->change, pool);
655         if (APR_SUCCESS == rv) {
656             apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup);
657             *pbeam = beam;
658         }
659     }
660     return rv;
661 }
662 
/* Set the beam's buffer limit under the beam lock; silently does nothing
 * when the lock cannot be taken. */
void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }
    beam->max_buf_size = buffer_size;
    leave_yellow(beam, &bl);
}
672 
h2_beam_buffer_size_get(h2_bucket_beam * beam)673 apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
674 {
675     h2_beam_lock bl;
676     apr_size_t buffer_size = 0;
677 
678     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
679         buffer_size = beam->max_buf_size;
680         leave_yellow(beam, &bl);
681     }
682     return buffer_size;
683 }
684 
/* Set the beam's wait timeout under the beam lock; silently does nothing
 * when the lock cannot be taken. */
void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }
    beam->timeout = timeout;
    leave_yellow(beam, &bl);
}
694 
h2_beam_timeout_get(h2_bucket_beam * beam)695 apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
696 {
697     h2_beam_lock bl;
698     apr_interval_time_t timeout = 0;
699 
700     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
701         timeout = beam->timeout;
702         leave_yellow(beam, &bl);
703     }
704     return timeout;
705 }
706 
/*
 * Abort the beam: marks it aborted, deletes purged and unsent buckets,
 * reports consumption and wakes up all waiters. No-op for a NULL beam or
 * when the beam lock cannot be taken.
 */
void h2_beam_abort(h2_bucket_beam *beam)
{
    h2_beam_lock bl;

    if (!beam || enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }

    beam->aborted = 1;
    r_purge_sent(beam);
    h2_blist_cleanup(&beam->send_list);
    report_consumption(beam, &bl);
    apr_thread_cond_broadcast(beam->change);

    leave_yellow(beam, &bl);
}
720 
h2_beam_close(h2_bucket_beam * beam)721 apr_status_t h2_beam_close(h2_bucket_beam *beam)
722 {
723     h2_beam_lock bl;
724 
725     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
726         r_purge_sent(beam);
727         beam_close(beam);
728         report_consumption(beam, &bl);
729         leave_yellow(beam, &bl);
730     }
731     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
732 }
733 
h2_beam_leave(h2_bucket_beam * beam)734 apr_status_t h2_beam_leave(h2_bucket_beam *beam)
735 {
736     h2_beam_lock bl;
737 
738     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
739         recv_buffer_cleanup(beam, &bl);
740         beam->aborted = 1;
741         beam_close(beam);
742         leave_yellow(beam, &bl);
743     }
744     return APR_SUCCESS;
745 }
746 
h2_beam_wait_empty(h2_bucket_beam * beam,apr_read_type_e block)747 apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
748 {
749     apr_status_t status;
750     h2_beam_lock bl;
751 
752     if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
753         status = wait_empty(beam, block, bl.mutex);
754         leave_yellow(beam, &bl);
755     }
756     return status;
757 }
758 
/* Move all buckets of `sender_bb`, in order, to the end of the beam's
 * send list. A NULL brigade is accepted and ignored. */
static void move_to_hold(h2_bucket_beam *beam,
                         apr_bucket_brigade *sender_bb)
{
    if (!sender_bb) {
        return;
    }

    while (!APR_BRIGADE_EMPTY(sender_bb)) {
        apr_bucket *head = APR_BRIGADE_FIRST(sender_bb);

        APR_BUCKET_REMOVE(head);
        H2_BLIST_INSERT_TAIL(&beam->send_list, head);
    }
}
769 
/*
 * Take bucket `b` out of its brigade and append it to the beam's send
 * list, preparing it so that the receiver thread can read it safely.
 *
 * beam         the beam to append to
 * b            the sender bucket
 * block        unused here
 * pspace_left  in/out: remaining buffer space; reduced by the length of
 *              every bucket that counts against the buffer limit
 * pbl          unused here
 *
 * Metadata buckets are appended as they are (EOS marks the beam closed).
 * File buckets are only passed on as files when the beam holds their only
 * reference and the optional can_beam_fn callback agrees. Data buckets
 * that count against the limit and exceed *pspace_left are split first.
 *
 * Returns APR_ECONNABORTED for an aborted beam, the error of a failed
 * read/setaside, or APR_SUCCESS.
 */
static apr_status_t append_bucket(h2_bucket_beam *beam,
                                  apr_bucket *b,
                                  apr_read_type_e block,
                                  apr_size_t *pspace_left,
                                  h2_beam_lock *pbl)
{
    const char *data;
    apr_size_t len;
    apr_status_t status;
    int can_beam = 0, check_len;

    (void)block;
    (void)pbl;
    if (beam->aborted) {
        return APR_ECONNABORTED;
    }

    if (APR_BUCKET_IS_METADATA(b)) {
        if (APR_BUCKET_IS_EOS(b)) {
            beam->closed = 1;
        }
        APR_BUCKET_REMOVE(b);
        H2_BLIST_INSERT_TAIL(&beam->send_list, b);
        return APR_SUCCESS;
    }
    else if (APR_BUCKET_IS_FILE(b)) {
        /* For file buckets the problem is their internal readpool that
         * is used on the first read to allocate buffer/mmap.
         * Since setting aside a file bucket will de-register the
         * file cleanup function from the previous pool, we need to
         * call that only from the sender thread.
         *
         * Currently, we do not handle file bucket with refcount > 1 as
         * the beam is then not in complete control of the file's lifetime.
         * Which results in the bug that a file get closed by the receiver
         * while the sender or the beam still have buckets using it.
         *
         * Additionally, we allow callbacks to prevent beaming file
         * handles across. The use case for this is to limit the number
         * of open file handles and rather use a less efficient beam
         * transport. */
        apr_bucket_file *bf = b->data;
        apr_file_t *fd = bf->fd;
        can_beam = (bf->refcount.refcount == 1);
        if (can_beam && beam->can_beam_fn) {
            can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
        }
        /* a file passed on as file does not count against the buffer */
        check_len = !can_beam;
    }
    else {
        if (b->length == ((apr_size_t)-1)) {
            /* indeterminate length: read once so the length is known */
            const char *data2;
            status = apr_bucket_read(b, &data2, &len, APR_BLOCK_READ);
            if (status != APR_SUCCESS) {
                return status;
            }
        }
        check_len = 1;
    }

    if (check_len) {
        /* NOTE(review): the result of apr_bucket_split() is not checked;
         * if the split fails, the subtraction below can wrap around */
        if (b->length > *pspace_left) {
            apr_bucket_split(b, *pspace_left);
        }
        *pspace_left -= b->length;
    }

    /* The fundamental problem is that reading a sender bucket from
     * a receiver thread is a total NO GO, because the bucket might use
     * its pool/bucket_alloc from a foreign thread and that will
     * corrupt. */
    status = APR_ENOTIMPL;
    if (APR_BUCKET_IS_TRANSIENT(b)) {
        /* this takes care of transient buckets and converts them
         * into heap ones. Other bucket types might or might not be
         * affected by this. */
        status = apr_bucket_setaside(b, beam->send_pool);
    }
    else if (APR_BUCKET_IS_HEAP(b)) {
        /* For heap buckets read from a receiver thread is fine. The
         * data will be there and live until the bucket itself is
         * destroyed. */
        status = APR_SUCCESS;
    }
    else if (APR_BUCKET_IS_POOL(b)) {
        /* pool buckets are bastards that register at pool cleanup
         * to morph themselves into heap buckets. That may happen anytime,
         * even after the bucket data pointer has been read. So at
         * any time inside the receiver thread, the pool bucket memory
         * may disappear. yikes. */
        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
        if (status == APR_SUCCESS) {
            apr_bucket_heap_make(b, data, len, NULL);
        }
    }
    else if (APR_BUCKET_IS_FILE(b) && can_beam) {
        status = apr_bucket_setaside(b, beam->send_pool);
    }

    if (status == APR_ENOTIMPL) {
        /* we have no knowledge about the internals of this bucket,
         * but hope that after read, its data stays immutable for the
         * lifetime of the bucket. (see pool bucket handling above for
         * a counter example).
         * We do the read while in the sender thread, so that the bucket may
         * use pools/allocators safely. */
        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
        if (status == APR_SUCCESS) {
            status = apr_bucket_setaside(b, beam->send_pool);
        }
    }

    /* a setaside that is not implemented is tolerated, real errors not */
    if (status != APR_SUCCESS && status != APR_ENOTIMPL) {
        return status;
    }

    APR_BUCKET_REMOVE(b);
    H2_BLIST_INSERT_TAIL(&beam->send_list, b);
    beam->sent_bytes += b->length;

    return APR_SUCCESS;
}
892 
/**
 * Set the pool used on the sending side of the beam.
 *
 * Meant to be called from the sender thread. While holding the beam,
 * r_purge_sent() is run first and the pool is then handed to
 * beam_set_send_pool(). If the beam cannot be entered, nothing is changed.
 *
 * @param beam the beam to configure
 * @param p    the pool passed on to beam_set_send_pool()
 */
void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }

    r_purge_sent(beam);
    beam_set_send_pool(beam, p);
    leave_yellow(beam, &bl);
}
903 
h2_beam_send(h2_bucket_beam * beam,apr_bucket_brigade * sender_bb,apr_read_type_e block)904 apr_status_t h2_beam_send(h2_bucket_beam *beam,
905                           apr_bucket_brigade *sender_bb,
906                           apr_read_type_e block)
907 {
908     apr_bucket *b;
909     apr_status_t rv = APR_SUCCESS;
910     apr_size_t space_left = 0;
911     h2_beam_lock bl;
912 
913     /* Called from the sender thread to add buckets to the beam */
914     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
915         ap_assert(beam->send_pool);
916         r_purge_sent(beam);
917 
918         if (beam->aborted) {
919             move_to_hold(beam, sender_bb);
920             rv = APR_ECONNABORTED;
921         }
922         else if (sender_bb) {
923             int force_report = !APR_BRIGADE_EMPTY(sender_bb);
924 
925             space_left = calc_space_left(beam);
926             while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
927                 if (space_left <= 0) {
928                     report_prod_io(beam, force_report, &bl);
929                     r_purge_sent(beam);
930                     rv = wait_not_full(beam, block, &space_left, &bl);
931                     if (APR_SUCCESS != rv) {
932                         break;
933                     }
934                 }
935                 b = APR_BRIGADE_FIRST(sender_bb);
936                 rv = append_bucket(beam, b, block, &space_left, &bl);
937             }
938 
939             report_prod_io(beam, force_report, &bl);
940             apr_thread_cond_broadcast(beam->change);
941         }
942         report_consumption(beam, &bl);
943         leave_yellow(beam, &bl);
944     }
945     return rv;
946 }
947 
h2_beam_receive(h2_bucket_beam * beam,apr_bucket_brigade * bb,apr_read_type_e block,apr_off_t readbytes,int * pclosed)948 apr_status_t h2_beam_receive(h2_bucket_beam *beam,
949                              apr_bucket_brigade *bb,
950                              apr_read_type_e block,
951                              apr_off_t readbytes,
952                              int *pclosed)
953 {
954     h2_beam_lock bl;
955     apr_bucket *bsender, *brecv, *ng;
956     int transferred = 0;
957     apr_status_t status = APR_SUCCESS;
958     apr_off_t remain;
959     int transferred_buckets = 0;
960 
961     /* Called from the receiver thread to take buckets from the beam */
962     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
963         if (readbytes <= 0) {
964             readbytes = (apr_off_t)APR_SIZE_MAX;
965         }
966         remain = readbytes;
967 
968 transfer:
969         if (beam->aborted) {
970             recv_buffer_cleanup(beam, &bl);
971             status = APR_ECONNABORTED;
972             goto leave;
973         }
974 
975         /* transfer enough buckets from our receiver brigade, if we have one */
976         while (remain >= 0
977                && beam->recv_buffer
978                && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
979 
980             brecv = APR_BRIGADE_FIRST(beam->recv_buffer);
981             if (brecv->length > 0 && remain <= 0) {
982                 break;
983             }
984             APR_BUCKET_REMOVE(brecv);
985             APR_BRIGADE_INSERT_TAIL(bb, brecv);
986             remain -= brecv->length;
987             ++transferred;
988         }
989 
990         /* transfer from our sender brigade, transforming sender buckets to
991          * receiver ones until we have enough */
992         while (remain >= 0 && !H2_BLIST_EMPTY(&beam->send_list)) {
993 
994             brecv = NULL;
995             bsender = H2_BLIST_FIRST(&beam->send_list);
996             if (bsender->length > 0 && remain <= 0) {
997                 break;
998             }
999 
1000             if (APR_BUCKET_IS_METADATA(bsender)) {
1001                 if (APR_BUCKET_IS_EOS(bsender)) {
1002                     brecv = apr_bucket_eos_create(bb->bucket_alloc);
1003                     beam->close_sent = 1;
1004                 }
1005                 else if (APR_BUCKET_IS_FLUSH(bsender)) {
1006                     brecv = apr_bucket_flush_create(bb->bucket_alloc);
1007                 }
1008                 else if (AP_BUCKET_IS_ERROR(bsender)) {
1009                     ap_bucket_error *eb = (ap_bucket_error *)bsender;
1010                     brecv = ap_bucket_error_create(eb->status, eb->data,
1011                                                     bb->p, bb->bucket_alloc);
1012                 }
1013             }
1014             else if (bsender->length == 0) {
1015                 APR_BUCKET_REMOVE(bsender);
1016                 H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
1017                 continue;
1018             }
1019             else if (APR_BUCKET_IS_FILE(bsender)) {
1020                 /* This is set aside into the target brigade pool so that
1021                  * any read operation messes with that pool and not
1022                  * the sender one. */
1023                 apr_bucket_file *f = (apr_bucket_file *)bsender->data;
1024                 apr_file_t *fd = f->fd;
1025                 int setaside = (f->readpool != bb->p);
1026 
1027                 if (setaside) {
1028                     status = apr_file_setaside(&fd, fd, bb->p);
1029                     if (status != APR_SUCCESS) {
1030                         goto leave;
1031                     }
1032                     ++beam->files_beamed;
1033                 }
1034                 ng = apr_brigade_insert_file(bb, fd, bsender->start, (apr_off_t)bsender->length,
1035                                              bb->p);
1036 #if APR_HAS_MMAP
1037                 /* disable mmap handling as this leads to segfaults when
1038                  * the underlying file is changed while memory pointer has
1039                  * been handed out. See also PR 59348 */
1040                 apr_bucket_file_enable_mmap(ng, 0);
1041 #endif
1042                 APR_BUCKET_REMOVE(bsender);
1043                 H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
1044 
1045                 remain -= bsender->length;
1046                 beam->received_bytes += bsender->length;
1047                 ++transferred;
1048                 ++transferred_buckets;
1049                 continue;
1050             }
1051             else {
1052                 /* create a "receiver" standin bucket. we took care about the
1053                  * underlying sender bucket and its data when we placed it into
1054                  * the sender brigade.
1055                  * the beam bucket will notify us on destruction that bsender is
1056                  * no longer needed. */
1057                 brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
1058                                                beam->buckets_sent++);
1059             }
1060 
1061             /* Place the sender bucket into our hold, to be destroyed when no
1062              * receiver bucket references it any more. */
1063             APR_BUCKET_REMOVE(bsender);
1064             H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
1065 
1066             beam->received_bytes += bsender->length;
1067             ++transferred_buckets;
1068 
1069             if (brecv) {
1070                 APR_BRIGADE_INSERT_TAIL(bb, brecv);
1071                 remain -= brecv->length;
1072                 ++transferred;
1073             }
1074             else {
1075                 /* let outside hook determine how bucket is beamed */
1076                 leave_yellow(beam, &bl);
1077                 brecv = h2_beam_bucket(beam, bb, bsender);
1078                 enter_yellow(beam, &bl);
1079 
1080                 while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
1081                     ++transferred;
1082                     remain -= brecv->length;
1083                     brecv = APR_BUCKET_NEXT(brecv);
1084                 }
1085             }
1086         }
1087 
1088         if (remain < 0) {
1089             /* too much, put some back into out recv_buffer */
1090             remain = readbytes;
1091             for (brecv = APR_BRIGADE_FIRST(bb);
1092                  brecv != APR_BRIGADE_SENTINEL(bb);
1093                  brecv = APR_BUCKET_NEXT(brecv)) {
1094                 remain -= (beam->tx_mem_limits? bucket_mem_used(brecv)
1095                            : (apr_off_t)brecv->length);
1096                 if (remain < 0) {
1097                     apr_bucket_split(brecv, (apr_size_t)((apr_off_t)brecv->length+remain));
1098                     beam->recv_buffer = apr_brigade_split_ex(bb,
1099                                                              APR_BUCKET_NEXT(brecv),
1100                                                              beam->recv_buffer);
1101                     break;
1102                 }
1103             }
1104         }
1105 
1106         if (beam->closed && buffer_is_empty(beam)) {
1107             /* beam is closed and we have nothing more to receive */
1108             if (!beam->close_sent) {
1109                 apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
1110                 APR_BRIGADE_INSERT_TAIL(bb, b);
1111                 beam->close_sent = 1;
1112                 ++transferred;
1113                 status = APR_SUCCESS;
1114             }
1115         }
1116 
1117         if (transferred_buckets > 0) {
1118            if (beam->cons_ev_cb) {
1119                beam->cons_ev_cb(beam->cons_ctx, beam);
1120             }
1121         }
1122 
1123         if (transferred) {
1124             apr_thread_cond_broadcast(beam->change);
1125             status = APR_SUCCESS;
1126         }
1127         else {
1128             status = wait_not_empty(beam, block, bl.mutex);
1129             if (status != APR_SUCCESS) {
1130                 goto leave;
1131             }
1132             goto transfer;
1133         }
1134 leave:
1135         if (pclosed) *pclosed = beam->closed? 1 : 0;
1136         leave_yellow(beam, &bl);
1137     }
1138     return status;
1139 }
1140 
/**
 * Install the consumption callbacks of the beam.
 *
 * The values are stored in cons_ev_cb, cons_io_cb and cons_ctx while the
 * beam is held. cons_ev_cb is invoked with cons_ctx by h2_beam_receive()
 * after sender buckets were transferred. If the beam cannot be entered,
 * nothing is changed.
 *
 * @param beam  the beam to configure
 * @param ev_cb event callback, stored as cons_ev_cb
 * @param io_cb io callback, stored as cons_io_cb
 * @param ctx   context stored as cons_ctx
 */
void h2_beam_on_consumed(h2_bucket_beam *beam,
                         h2_beam_ev_callback *ev_cb,
                         h2_beam_io_callback *io_cb, void *ctx)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }

    beam->cons_ctx = ctx;
    beam->cons_io_cb = io_cb;
    beam->cons_ev_cb = ev_cb;
    leave_yellow(beam, &bl);
}
1153 
/**
 * Install the production callback of the beam.
 *
 * Stores io_cb as prod_io_cb and ctx as prod_ctx while the beam is held.
 * If the beam cannot be entered, nothing is changed.
 *
 * @param beam  the beam to configure
 * @param io_cb io callback, stored as prod_io_cb
 * @param ctx   context stored as prod_ctx
 */
void h2_beam_on_produced(h2_bucket_beam *beam,
                         h2_beam_io_callback *io_cb, void *ctx)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }

    beam->prod_ctx = ctx;
    beam->prod_io_cb = io_cb;
    leave_yellow(beam, &bl);
}
1164 
/**
 * Install the callback that decides whether a file handle may be beamed.
 *
 * Stores cb as can_beam_fn and ctx as can_beam_ctx while the beam is held.
 * append_bucket() calls can_beam_fn(can_beam_ctx, beam, fd) for file
 * buckets it would otherwise beam across. If the beam cannot be entered,
 * nothing is changed.
 *
 * @param beam the beam to configure
 * @param cb   the callback, stored as can_beam_fn
 * @param ctx  context stored as can_beam_ctx
 */
void h2_beam_on_file_beam(h2_bucket_beam *beam,
                          h2_beam_can_beam_callback *cb, void *ctx)
{
    h2_beam_lock bl;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return;
    }

    beam->can_beam_ctx = ctx;
    beam->can_beam_fn = cb;
    leave_yellow(beam, &bl);
}
1176 
1177 
h2_beam_get_buffered(h2_bucket_beam * beam)1178 apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
1179 {
1180     apr_bucket *b;
1181     apr_off_t l = 0;
1182     h2_beam_lock bl;
1183 
1184     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1185         for (b = H2_BLIST_FIRST(&beam->send_list);
1186             b != H2_BLIST_SENTINEL(&beam->send_list);
1187             b = APR_BUCKET_NEXT(b)) {
1188             /* should all have determinate length */
1189             l += b->length;
1190         }
1191         leave_yellow(beam, &bl);
1192     }
1193     return l;
1194 }
1195 
h2_beam_get_mem_used(h2_bucket_beam * beam)1196 apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
1197 {
1198     apr_bucket *b;
1199     apr_off_t l = 0;
1200     h2_beam_lock bl;
1201 
1202     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1203         for (b = H2_BLIST_FIRST(&beam->send_list);
1204             b != H2_BLIST_SENTINEL(&beam->send_list);
1205             b = APR_BUCKET_NEXT(b)) {
1206             l += bucket_mem_used(b);
1207         }
1208         leave_yellow(beam, &bl);
1209     }
1210     return l;
1211 }
1212 
/**
 * Tell whether the beam holds no buckets at all, i.e. the send_list is
 * empty and there is no (or only an empty) recv_buffer.
 *
 * @param beam the beam to inspect, may be NULL
 * @return 1 if empty, 0 otherwise; 1 is also returned when beam is NULL
 *         or could not be entered
 */
int h2_beam_empty(h2_bucket_beam *beam)
{
    h2_beam_lock bl;
    int empty = 1;

    if (!beam || enter_yellow(beam, &bl) != APR_SUCCESS) {
        return empty;
    }

    if (!H2_BLIST_EMPTY(&beam->send_list)) {
        empty = 0;
    }
    else if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
        empty = 0;
    }

    leave_yellow(beam, &bl);
    return empty;
}
1225 
/**
 * Tell whether the beam's list of proxies is non-empty.
 *
 * @param beam the beam to inspect, may be NULL
 * @return 1 if the proxies list is not empty, 0 if it is empty. Note that
 *         1 is also returned when beam is NULL or could not be entered.
 */
int h2_beam_holds_proxies(h2_bucket_beam *beam)
{
    h2_beam_lock bl;
    int holds = 1;

    if (!beam || enter_yellow(beam, &bl) != APR_SUCCESS) {
        return holds;
    }

    holds = H2_BPROXY_LIST_EMPTY(&beam->proxies) ? 0 : 1;
    leave_yellow(beam, &bl);
    return holds;
}
1237 
/**
 * Tell whether any bytes have been received from the beam so far, based on
 * its received_bytes counter.
 *
 * @param beam the beam to inspect, may be NULL
 * @return 1 if received_bytes is greater than 0, otherwise 0; 0 is also
 *         returned when beam is NULL or could not be entered
 */
int h2_beam_was_received(h2_bucket_beam *beam)
{
    h2_beam_lock bl;
    int received = 0;

    if (!beam || enter_yellow(beam, &bl) != APR_SUCCESS) {
        return received;
    }

    if (beam->received_bytes > 0) {
        received = 1;
    }
    leave_yellow(beam, &bl);
    return received;
}
1249 
h2_beam_get_files_beamed(h2_bucket_beam * beam)1250 apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
1251 {
1252     apr_size_t n = 0;
1253     h2_beam_lock bl;
1254 
1255     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1256         n = beam->files_beamed;
1257         leave_yellow(beam, &bl);
1258     }
1259     return n;
1260 }
1261 
/**
 * Callback that always answers 0, ignoring all of its arguments. Its
 * signature matches the way can_beam_fn is invoked for file buckets, so it
 * can presumably be installed via h2_beam_on_file_beam() to refuse beaming
 * of any file handle.
 *
 * @return always 0
 */
int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
{
    /* arguments are intentionally unused */
    (void)file;
    (void)beam;
    (void)ctx;

    return 0;
}
1267 
/**
 * Run report_consumption() on the beam while holding it.
 *
 * @param beam the beam to report on
 * @return the value returned by report_consumption(); 0 when the beam
 *         could not be entered
 */
int h2_beam_report_consumption(h2_bucket_beam *beam)
{
    h2_beam_lock bl;
    int reported;

    if (enter_yellow(beam, &bl) != APR_SUCCESS) {
        return 0;
    }

    reported = report_consumption(beam, &bl);
    leave_yellow(beam, &bl);
    return reported;
}
1278 
/**
 * Log a one-line summary of the beam state on the given connection.
 *
 * The line contains the id of the master connection (or of c itself when it
 * has no master), the beam id and tag, the closed and aborted flags, the
 * result of h2_beam_empty(), the result of h2_beam_get_buffered() and
 * finally msg. Nothing is logged when beam is NULL or when the level is
 * not enabled for c.
 *
 * @param beam  the beam to describe, may be NULL
 * @param c     the connection to log against
 * @param level the log level to use
 * @param msg   text appended to the log line
 */
void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
{
    int is_empty;
    long buffered;

    if (!beam || !APLOG_C_IS_LEVEL(c,level)) {
        return;
    }

    /* both helpers enter the beam on their own */
    is_empty = h2_beam_empty(beam);
    buffered = (long)h2_beam_get_buffered(beam);

    ap_log_cerror(APLOG_MARK, level, 0, c,
                  "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s",
                  (c->master? c->master->id : c->id), beam->id, beam->tag,
                  beam->closed, beam->aborted, is_empty,
                  buffered, msg);
}
1289 
1290 
1291