1 /*------------------------------------------------------------------------------
2  *
3  * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4  * The YADIFA TM software product is provided under the BSD 3-clause license:
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  *        * Redistributions of source code must retain the above copyright
11  *          notice, this list of conditions and the following disclaimer.
12  *        * Redistributions in binary form must reproduce the above copyright
13  *          notice, this list of conditions and the following disclaimer in the
14  *          documentation and/or other materials provided with the distribution.
15  *        * Neither the name of EURid nor the names of its contributors may be
16  *          used to endorse or promote products derived from this software
17  *          without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  *
31  *------------------------------------------------------------------------------
32  *
33  */
34 
35 #include <dnscore/process.h>
36 #include "dnscore/dnscore-config.h"
37 #include "dnscore/zalloc.h"
38 #include "dnscore/pool.h"
39 #include "dnscore/mutex.h"
40 #include "dnscore/format.h"
41 #include "dnscore/shared-heap.h"
42 #include "dnscore/format.h"
43 #include "dnscore/logger.h"
44 #include "dnscore/async.h"
45 
46 #define ASYNC_WAIT_DUMP 0
47 
48 #define ASYNC_NO_TIMEOUT 0
49 
50 #define ASYNC_FAKE_SHARED_HEAP 0
51 
52 #if __FreeBSD__
53 // To avoid some funky stuff with FreeBSD
54 #define ASYNC_WAIT_FINALIZE_DELAY_COUNT 64
55 #define ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT 64
56 #else
57 #define ASYNC_WAIT_FINALIZE_DELAY_COUNT 0
58 #define ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT 0
59 #endif
60 
61 
62 
63 #define MODULE_MSG_HANDLE g_system_logger
64 
65 #define ASYNCMSG_TAG  0x47534d434e595341
66 #define ASYNCWAIT_TAG 0x5457434e595341
67 
68 struct async_message_wait_args
69 {
70     mutex_t mutex;
71     cond_t  cond_wait;
72     s32 wait_count;
73 };
74 
75 static pool_s async_message_pool;
76 
77 static bool async_message_pool_initialized = FALSE;
78 
79 void
async_message_call(async_queue_s * queue,async_message_s * msg)80 async_message_call(async_queue_s *queue, async_message_s *msg)
81 {
82     msg->start_time = timeus();
83 
84 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
85     threaded_dll_cw_enqueue(&queue->queue, msg);
86 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
87     threaded_ringbuffer_cw_enqueue(&queue->queue, msg);
88 #else
89     threaded_queue_enqueue(&queue->queue, msg);
90 #endif
91 
92 }
93 
94 async_message_s*
async_message_next(async_queue_s * queue)95 async_message_next(async_queue_s *queue)
96 {
97 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
98     //async_message_s* async = (async_message_s*)threaded_dll_cw_try_dequeue(&queue->queue);
99     async_message_s* async = (async_message_s*)threaded_dll_cw_dequeue_with_timeout(&queue->queue, /*queue->pace.max_us*/1000000);
100 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
101     async_message_s* async = (async_message_s*)threaded_ringbuffer_cw_try_dequeue(&queue->queue);
102 #else
103     async_message_s* async = (async_message_s*)threaded_queue_try_dequeue(&queue->queue);
104 #endif
105 
106     if(async == NULL)
107     {
108         pace_wait(&queue->pace);
109     }
110     else
111     {
112         pace_work(&queue->pace);
113     }
114 
115     return async;
116 }
117 
118 async_message_s*
async_message_try_next(async_queue_s * queue)119 async_message_try_next(async_queue_s *queue)
120 {
121 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
122     async_message_s* async = (async_message_s*)threaded_dll_cw_try_dequeue(&queue->queue);
123 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
124     async_message_s* async = (async_message_s*)threaded_ringbuffer_cw_try_dequeue(&queue->queue);
125 #else
126     async_message_s* async = (async_message_s*)threaded_queue_try_dequeue(&queue->queue);
127 #endif
128 
129     return async;
130 }
131 
132 /**
133  *
134  * Initialises a synchronisation point
135  * count is the number of releases to do before the async_wait call returns
136  *
137  * @param aw
138  * @param count
139  * @return
140  */
141 
142 void
async_wait_init(async_wait_s * aw,s32 count)143 async_wait_init(async_wait_s *aw, s32 count)
144 {
145 #if ASYNC_WAIT_DUMP
146     formatln("[%5i][%p] async_wait_init(%p, %i)", getpid_ex(), thread_self(), aw, count);flushout();
147 #endif
148 
149     mutex_init(&aw->mutex);
150     cond_init(&aw->cond_wait);
151     aw->wait_count = count;
152     aw->error_code = SUCCESS;
153 #if ASYNC_WAIT_TAG
154     aw->tag = 0x50505050;
155 #endif
156 }
157 
158 #if ASYNC_WAIT_FINALIZE_DELAY_COUNT > 0
159 
160 static mutex_t async_wait_finalize_delay_mtx = MUTEX_INITIALIZER;
161 static async_wait_s* async_wait_finalize_delay[ASYNC_WAIT_FINALIZE_DELAY_COUNT] =
162 {
163     NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
164     NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
165     NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
166     NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
167 };
168 static int async_wait_finalize_delay_index = 0;
169 
170 static mutex_t async_wait_destroy_delay_mtx = MUTEX_INITIALIZER;
171 static async_wait_s* async_wait_destroy_delay[ASYNC_WAIT_FINALIZE_DELAY_COUNT] =
172     {
173         NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
174         NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
175         NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
176         NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
177     };
178 static int async_wait_destroy_delay_index = 0;
179 
180 
181 #endif
182 
183 #if ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT > 0
184 
185 static mutex_t async_wait_destroy_shared_delay_mtx = MUTEX_INITIALIZER;
186 static async_wait_s* async_wait_destroy_shared_delay[ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT] =
187     {
188         NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
189         NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
190         NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
191         NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
192     };
193 static int async_wait_destroy_shared_delay_index = 0;
194 
195 #endif
196 /**
197  *
198  * Destroys the synchronisation point
199  *
200  * @param aw
201  * @return
202  */
203 #if ASYNC_WAIT_FINALIZE_DELAY_COUNT > 0
204 static void async_wait_finalize_now(async_wait_s *aw);
205 
206 void
async_wait_finalize(async_wait_s * aw)207 async_wait_finalize(async_wait_s *aw)
208 {
209     mutex_lock(&async_wait_finalize_delay_mtx);
210     if(async_wait_finalize_delay[async_wait_finalize_delay_index] != NULL)
211     {
212         async_wait_finalize_now(async_wait_finalize_delay[async_wait_finalize_delay_index]);
213     }
214     async_wait_finalize_delay[async_wait_finalize_delay_index] = aw;
215     async_wait_finalize_delay_index = (async_wait_finalize_delay_index + 1) % ASYNC_WAIT_FINALIZE_DELAY_COUNT;
216     mutex_unlock(&async_wait_finalize_delay_mtx);
217 }
218 
219 static void
async_wait_finalize_now(async_wait_s * aw)220 async_wait_finalize_now(async_wait_s *aw)
221 #else
222 void
223 async_wait_finalize(async_wait_s *aw)
224 #endif
225 {
226 #if ASYNC_WAIT_DUMP
227     formatln("[%5i][%p] async_wait_finalize(%p)", getpid_ex(), thread_self(), aw);flushout();
228 #endif
229 
230     mutex_lock(&aw->mutex);
231     s32 wait_count = aw->wait_count;
232     mutex_unlock(&aw->mutex);
233 
234     if(wait_count > 0)
235     {
236         osformat(termerr, "async_wait_finalize: wait_count = %i > 0: finalisation before logical end of life", wait_count);
237         flusherr();
238     }
239 
240     cond_finalize(&aw->cond_wait);
241 
242     mutex_destroy(&aw->mutex);
243 #if DEBUG
244     memset(aw, 0x5e, sizeof(async_wait_s));
245 #endif
246 #if ASYNC_WAIT_TAG
247     aw->tag &= 0x10101010;
248 #endif
249 }
250 
251 async_wait_s *
async_wait_new_instance(s32 count)252 async_wait_new_instance(s32 count)
253 {
254     async_wait_s *ret;
255     ZALLOC_OBJECT_OR_DIE(ret, async_wait_s, ASYNCWAIT_TAG);
256     async_wait_init(ret, count);
257     return ret;
258 }
259 
260 #if ASYNC_WAIT_FINALIZE_DELAY_COUNT > 0
261 
262 static void async_wait_destroy_now(async_wait_s *aw);
263 
264 void
async_wait_destroy(async_wait_s * aw)265 async_wait_destroy(async_wait_s *aw)
266 {
267     mutex_lock(&async_wait_destroy_delay_mtx);
268     if(async_wait_destroy_delay[async_wait_destroy_delay_index] != NULL)
269     {
270         async_wait_destroy_now(async_wait_destroy_delay[async_wait_destroy_delay_index]);
271     }
272     async_wait_destroy_delay[async_wait_destroy_delay_index] = aw;
273     async_wait_destroy_delay_index = (async_wait_destroy_delay_index + 1) % ASYNC_WAIT_FINALIZE_DELAY_COUNT;
274     mutex_unlock(&async_wait_destroy_delay_mtx);
275 }
276 
277 static void
async_wait_destroy_now(async_wait_s * aw)278 async_wait_destroy_now(async_wait_s *aw)
279 {
280     async_wait_finalize_now(aw);
281     ZFREE_OBJECT(aw);
282 }
283 #else
284 void
async_wait_destroy(async_wait_s * aw)285 async_wait_destroy(async_wait_s *aw)
286 {
287     async_wait_finalize(aw);
288     ZFREE_OBJECT(aw);
289 }
290 #endif
291 
292 async_wait_s *
async_wait_create_shared(u8 id,s32 count)293 async_wait_create_shared(u8 id, s32 count)
294 {
295 #if !ASYNC_FAKE_SHARED_HEAP
296     async_wait_s *aw = (async_wait_s*)shared_heap_wait_alloc(id, sizeof(async_wait_s));
297 #else
298     async_wait_s *aw = (async_wait_s*)malloc(sizeof(async_wait_s));
299 #endif
300 
301 #if ASYNC_WAIT_DUMP
302     formatln("[%5i][%p] async_wait_create_shared(%i,%i) -> %p", getpid_ex(), thread_self(), id, count, aw);flushout();
303 #endif
304 
305     assert(aw != NULL);
306 
307     int err;
308 
309     err = mutex_init_process_shared(&aw->mutex);
310 
311     if(err != 0)
312     {
313         logger_handle_msg(g_system_logger,MSG_ERR, "async_wait_create_shared: init condition failed: %r", MAKE_ERRNO_ERROR(err));
314         logger_flush();
315         abort();
316     }
317 
318 
319     err = cond_init_process_shared(&aw->cond_wait);
320 
321     if(err != 0)
322     {
323         logger_handle_msg(g_system_logger,MSG_ERR, "async_wait_create_shared: init condition failed: %r", MAKE_ERRNO_ERROR(err));
324         logger_flush();
325         abort();
326     }
327 
328     aw->wait_count = count;
329     aw->error_code = SUCCESS;
330 
331 #if ASYNC_WAIT_TAG
332     aw->tag = 0x53535353;
333 #endif
334     return aw;
335 }
336 
337 #if ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT > 0
338 static void async_wait_destroy_shared_now(async_wait_s *aw);
339 
340 void
async_wait_destroy_shared(async_wait_s * aw)341 async_wait_destroy_shared(async_wait_s *aw)
342 {
343     mutex_lock(&async_wait_destroy_shared_delay_mtx);
344     if(async_wait_destroy_shared_delay[async_wait_destroy_shared_delay_index] != NULL)
345     {
346         async_wait_destroy_shared_now(async_wait_destroy_shared_delay[async_wait_destroy_shared_delay_index]);
347     }
348     async_wait_destroy_shared_delay[async_wait_destroy_shared_delay_index] = aw;
349     async_wait_destroy_shared_delay_index = (async_wait_destroy_shared_delay_index + 1) % ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT;
350     mutex_unlock(&async_wait_destroy_shared_delay_mtx);
351 }
352 
async_wait_destroy_shared_now(async_wait_s * aw)353 static void async_wait_destroy_shared_now(async_wait_s *aw)
354 {
355 #else
356 void async_wait_destroy_shared(async_wait_s *aw)
357 {
358 #endif
359 
360 #if ASYNC_WAIT_DUMP
361     formatln("[%5i][%p] async_wait_destroy_shared(%p)", getpid_ex(), thread_self(), aw);flushout();
362 #endif
363 #if ASYNC_WAIT_FINALIZE_DELAY_COUNT > 0
364     async_wait_finalize_now(aw);
365 #else
366     async_wait_finalize(aw);
367 #endif
368 #if !ASYNC_FAKE_SHARED_HEAP
369     shared_heap_free(aw);
370 #else
371     free(aw);
372 #endif
373 }
374 
375 /**
376  * Waits until the count has be reduced to 0 (or below if something bad is going on)
377  *
378  * @param aw
379  * @return
380  */
381 
382 void
383 async_wait(async_wait_s *aw)
384 {
385 #if ASYNC_WAIT_DUMP
386     formatln("[%5i][%p] async_wait(%p)", getpid_ex(), thread_self(), aw);flushout();
387 #endif
388 
389     int err = mutex_lock_unchecked(&aw->mutex);
390 
391     if(err == 0)
392     {
393         while(aw->wait_count > 0)
394         {
395 #if !__FreeBSD__
396             cond_wait(&aw->cond_wait, &aw->mutex);
397 #else
398             cond_timedwait(&aw->cond_wait, &aw->mutex, ONE_SECOND_US);
399 #endif
400         }
401         mutex_unlock(&aw->mutex);
402     }
403     else
404     {
405         formatln("[%5i][%p] async_wait(%p) failed to lock mutex: %r", getpid_ex(), thread_self(), aw, MAKE_ERRNO_ERROR(err));
406         flushout();
407         abort();
408     }
409 }
410 
411 bool
412 async_wait_timeout_absolute(async_wait_s *aw, u64 epoch_usec)
413 {
414 #if !ASYNC_NO_TIMEOUT
415     struct timespec ts;
416 
417     int err = mutex_lock_unchecked(&aw->mutex);
418     if(err == 0)
419     {
420         ts.tv_sec = epoch_usec / 1000000L;
421         ts.tv_nsec = (epoch_usec % 1000000L) * 1000L;
422 
423         s32 awc;
424         while((awc = aw->wait_count) > 0)
425         {
426             int err;
427 
428             err = cond_timedwait_absolute_ts(&aw->cond_wait, &aw->mutex, &ts);
429 
430             if(err == ETIMEDOUT)
431             {
432 #if ASYNC_WAIT_DUMP
433                 formatln("[%5i][%p] async_wait_timeout_absolute(%p,%llu) : TIMEOUT (awc=%i)", getpid_ex(), thread_self(), aw, epoch_usec, awc);
434 #endif
435                 break;
436             }
437 
438             if(err != 0)
439             {
440                 log_err("async_wait_timeout: %r", MAKE_ERRNO_ERROR(err));
441             }
442         }
443 
444         err = mutex_unlock_unchecked(&aw->mutex);
445 
446         if(err == 0)
447         {
448             return awc == 0;
449         }
450         else
451         {
452             formatln("[%5i][%p] async_wait_timeout_absolute(%p) failed to unlock mutex: %r", getpid_ex(), thread_self(), aw, MAKE_ERRNO_ERROR(err));
453             flushout();
454             abort();
455         }
456     }
457     else
458     {
459         formatln("[%5i][%p] async_wait_timeout_absolute(%p) failed to lock mutex: %r", getpid_ex(), thread_self(), aw, MAKE_ERRNO_ERROR(err));
460         flushout();
461         abort();
462     }
463 #else // ASYNC_NO_TIMEOUT
464 
465     // timeout disabled
466 
467     async_wait(aw);
468     return TRUE;
469 #endif
470 }
471 
472 /**
473  * Returns TRUE if the wait is done, FALSE if it timed-out.
474  */
475 
476 bool
477 async_wait_timeout(async_wait_s *aw, u64 usec)
478 {
479 #if ASYNC_WAIT_DUMP
480     formatln("[%5i][%p] async_wait_timeout(%p, %llu)", getpid_ex(), thread_self(), aw, usec);flushout();
481 #endif
482 
483 
484     usec += timeus();
485 
486     return async_wait_timeout_absolute(aw, usec);
487 }
488 
489 s32
490 async_wait_get_counter(async_wait_s *aw)
491 {
492     s32 counter;
493 
494     mutex_lock(&aw->mutex);
495 
496     counter = aw->wait_count;
497 
498     mutex_unlock(&aw->mutex);
499 
500     return counter;
501 }
502 
503 /**
504  *
505  * Decreases the count of that amount
506  *
507  * @param aw
508  * @param count
509  * @return
510  */
511 
512 void
513 async_wait_progress(async_wait_s *aw, s32 count)
514 {
515 #if ASYNC_WAIT_DUMP
516     formatln("[%5i][%p] async_wait_progress(%p, %i) (lock)", getpid_ex(), thread_self(), aw, count);
517 #endif
518 
519     int err = mutex_lock_unchecked(&aw->mutex);
520     if(err == 0)
521     {
522         if(aw->wait_count - count >= 0)
523         {
524             aw->wait_count -= count;
525 
526             cond_notify(&aw->cond_wait);
527 
528 #if ASYNC_WAIT_DUMP
529             formatln("[%5i][%p] async_wait_progress(%p, %i) (unlock)", getpid_ex(), thread_self(), aw, count);
530 #endif
531             err = mutex_unlock_unchecked(&aw->mutex);
532 #if ASYNC_WAIT_DUMP
533             formatln("[%5i][%p] async_wait_progress(%p, %i) (done)", getpid_ex(), thread_self(), aw, count);
534 #endif
535             if(err != 0)
536             {
537                 formatln("[%5i][%p] async_wait_progress(%p) failed to unlock mutex: %r", getpid_ex(), thread_self(), aw, MAKE_ERRNO_ERROR(err));
538                 flushout();
539                 abort();
540             }
541         }
542         else
543         {
544             log_err("async_wait_progress: count=%i, trying to add %i", aw->wait_count, count);
545 
546             aw->wait_count = 0;
547             cond_notify(&aw->cond_wait);
548 
549             void *p = aw;
550             err = mutex_unlock_unchecked(&aw->mutex);
551 
552 #if ASYNC_WAIT_DUMP
553             formatln("[%5i][%p] async_wait_progress(%p, %i) (done)", getpid_ex(), thread_self(), p, count);
554 #endif
555             if(err != 0)
556             {
557                 formatln("[%5i][%p] async_wait_progress(%p) failed to unlock mutex: %r (alt)", getpid_ex(), thread_self(), p, MAKE_ERRNO_ERROR(err));
558                 flushout();
559                 abort();
560             }
561         }
562     }
563     else
564     {
565         formatln("[%5i][%p] async_wait_progress(%p) failed to lock mutex: %r", getpid_ex(), thread_self(), aw, MAKE_ERRNO_ERROR(err));
566         flushout();
567         abort();
568     }
569 }
570 
571 void
572 async_wait_set_first_error(async_wait_s *aw, s32 error)
573 {
574     mutex_lock(&aw->mutex);
575     if(ISOK(aw->error_code))
576     {
577         aw->error_code = error;
578     }
579     mutex_unlock(&aw->mutex);
580 }
581 
582 s32
583 async_wait_get_error(async_wait_s *aw)
584 {
585     s32 err;
586 
587     mutex_lock(&aw->mutex);
588 
589     err = aw->error_code;
590 
591     mutex_unlock(&aw->mutex);
592 
593     return err;
594 }
595 
596 void
597 async_queue_init(async_queue_s *q, u32 size, u64 min_us, u64 max_us, const char* name)
598 {
599 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
600     threaded_dll_cw_init(&q->queue, size);
601 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
602     threaded_ringbuffer_cw_init(&q->queue, size);
603 #else
604     threaded_queue_init(&q->queue, size);
605 #endif
606 
607     pace_init(&q->pace, min_us, max_us, name);
608 }
609 
610 void
611 async_queue_finalize(async_queue_s *q)
612 {
613     s32 n;
614 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
615     if((n = threaded_dll_cw_size(&q->queue)) > 0)
616     {
617         log_warn("async_dll_cw_finalize: queue still contains %i items", n);
618     }
619     threaded_dll_cw_finalize(&q->queue);
620 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
621     if((n = threaded_ringbuffer_cw_size(&q->queue)) > 0)
622     {
623         log_warn("async_ringbuffer_cw_finalize: queue still contains %i items", n);
624     }
625     threaded_ringbuffer_cw_finalize(&q->queue);
626 #else
627     if((n = threaded_queue_size(&q->queue)) > 0)
628     {
629         log_warn("async_queue_finalize: queue still contains %i items", n);
630     }
631     threaded_queue_finalize(&q->queue);
632 #endif
633 }
634 
635 bool
636 async_queue_empty(async_queue_s *q)
637 {
638 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
639     return threaded_dll_cw_size(&q->queue) == 0;
640 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
641     return threaded_ringbuffer_cw_size(&q->queue) == 0;
642 #else
643     return threaded_queue_size(&q->queue) == 0;
644 #endif
645 
646 }
647 
648 u32
649 async_queue_size(async_queue_s *q)
650 {
651 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
652     return (u32)threaded_dll_cw_size(&q->queue);
653 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
654     return threaded_ringbuffer_cw_size(&q->queue);
655 #else
656     return threaded_queue_size(&q->queue);
657 #endif
658 }
659 
660 static void
661 async_message_wait_handler(struct async_message_s *msg)
662 {
663     struct async_wait_s *args = (struct async_wait_s *)msg->handler_args;
664 
665     async_wait_progress(args, 1);
666 }
667 
668 static void
669 async_message_nop_handler(struct async_message_s *msg)
670 {
671     (void)msg;
672 }
673 
674 static void
675 async_message_release_handler(struct async_message_s *msg)
676 {
677     async_message_release(msg);
678 }
679 
680 int
681 async_message_call_and_wait(async_queue_s *queue, async_message_s *msg)
682 {
683     async_done_callback *old_handler = msg->handler;
684     void *old_handler_args = msg->handler_args;
685 
686 #if __FreeBSD__
687     struct async_wait_s* message_wait_argsp = async_wait_new_instance(1);
688 #else
689     struct async_wait_s message_wait_args;
690     struct async_wait_s* message_wait_argsp = &message_wait_args;
691     async_wait_init(message_wait_argsp, 1);
692 #endif
693     msg->error_code = SUCCESS;
694     msg->handler = async_message_wait_handler;
695     msg->handler_args = message_wait_argsp;
696 
697     async_message_call(queue, msg);
698 
699     async_wait(message_wait_argsp);
700 
701     msg->handler = old_handler;
702     msg->handler_args = old_handler_args;
703 
704     u64 wait_time = (u64)(timeus() - msg->start_time);
705 
706 #if __FreeBSD__
707     async_wait_destroy(message_wait_argsp);
708 #else
709     async_wait_finalize(message_wait_argsp); // local stack
710 #endif
711 
712     log_debug5("async waited %lluus on '%i@%s'", wait_time, msg->id, queue->pace.name);
713 
714     return msg->error_code;
715 }
716 
717 void
718 async_message_call_and_forget(async_queue_s *queue, async_message_s *msg)
719 {
720     msg->handler = async_message_nop_handler;
721     msg->handler_args = NULL;
722 
723     async_message_call(queue, msg);
724 }
725 
726 void
727 async_message_call_and_release(async_queue_s *queue, async_message_s *msg)
728 {
729     msg->handler = async_message_release_handler;
730     msg->handler_args = NULL;
731 
732     async_message_call(queue, msg);
733 }
734 
735 static void *
736 async_message_pool_alloc(void *_ignored_)
737 {
738     async_message_s *msg;
739 
740     (void)_ignored_;
741 
742     ZALLOC_OBJECT_OR_DIE( msg, async_message_s, ASYNCMSG_TAG); // POOL
743     ZEROMEMORY(msg, sizeof(async_message_s)); // false positive: msg cannot be NULL
744     return msg;
745 }
746 
747 static void
748 async_message_pool_free(void *msg, void *_ignored_)
749 {
750     (void)_ignored_;
751 
752     memset(msg, 0xe2, sizeof(async_message_s));
753     ZFREE(msg, async_message_s); // POOL
754 }
755 
756 void
757 async_message_pool_init()
758 {
759     if(!async_message_pool_initialized)
760     {
761         pool_init(&async_message_pool, async_message_pool_alloc, async_message_pool_free, NULL, "async message");
762         pool_set_size(&async_message_pool, 0x80000);
763         // for valgrind
764 #ifdef VALGRIND_FRIENDLY
765         pool_set_size(&async_message_pool, 0);
766 #endif
767 
768         async_message_pool_initialized = TRUE;
769     }
770 }
771 
772 void
773 async_message_pool_finalize()
774 {
775     if(async_message_pool_initialized)
776     {
777         pool_finalize(&async_message_pool);
778 
779 #if ASYNC_WAIT_FINALIZE_DELAY_COUNT > 0
780         mutex_lock(&async_wait_finalize_delay_mtx);
781         for(int i = 0; i < ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT; ++i)
782         {
783             if(async_wait_finalize_delay[i] != NULL)
784             {
785                 async_wait_finalize_now(async_wait_finalize_delay[i]);
786                 async_wait_finalize_delay[i] = NULL;
787             }
788         }
789         mutex_unlock(&async_wait_finalize_delay_mtx);
790 
791         mutex_lock(&async_wait_destroy_delay_mtx);
792         for(int i = 0; i < ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT; ++i)
793         {
794             if(async_wait_destroy_delay[i] != NULL)
795             {
796                 async_wait_destroy_now(async_wait_destroy_delay[i]);
797                 async_wait_destroy_delay[i] = NULL;
798             }
799         }
800         mutex_unlock(&async_wait_destroy_delay_mtx);
801 #endif
802 
803 #if ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT > 0
804         mutex_lock(&async_wait_destroy_shared_delay_mtx);
805         for(int i = 0; i < ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT; ++i)
806         {
807             if(async_wait_destroy_shared_delay[i] != NULL)
808             {
809                 async_wait_destroy_shared_now(async_wait_destroy_shared_delay[i]);
810                 async_wait_destroy_shared_delay[i] = NULL;
811             }
812         }
813         mutex_unlock(&async_wait_destroy_shared_delay_mtx);
814 #endif
815         async_message_pool_initialized = FALSE;
816     }
817 }
818 
819 async_message_s*
820 async_message_alloc()
821 {
822     async_message_s *msg = (async_message_s *)pool_alloc(&async_message_pool);
823     ZEROMEMORY(msg, sizeof(async_message_s));
824     return msg;
825 }
826 
827 void async_message_release(async_message_s *msg)
828 {
829     memset(msg, 0xe3, sizeof(async_message_s));
830     pool_release(&async_message_pool, msg);
831 }
832