1 /*------------------------------------------------------------------------------
2 *
3 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4 * The YADIFA TM software product is provided under the BSD 3-clause license:
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of EURid nor the names of its contributors may be
16 * used to endorse or promote products derived from this software
17 * without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 *
31 *------------------------------------------------------------------------------
32 *
33 */
34
35 #include <dnscore/process.h>
36 #include "dnscore/dnscore-config.h"
37 #include "dnscore/zalloc.h"
38 #include "dnscore/pool.h"
39 #include "dnscore/mutex.h"
40 #include "dnscore/format.h"
41 #include "dnscore/shared-heap.h"
42 #include "dnscore/format.h"
43 #include "dnscore/logger.h"
44 #include "dnscore/async.h"
45
46 #define ASYNC_WAIT_DUMP 0
47
48 #define ASYNC_NO_TIMEOUT 0
49
50 #define ASYNC_FAKE_SHARED_HEAP 0
51
52 #if __FreeBSD__
53 // To avoid some funky stuff with FreeBSD
54 #define ASYNC_WAIT_FINALIZE_DELAY_COUNT 64
55 #define ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT 64
56 #else
57 #define ASYNC_WAIT_FINALIZE_DELAY_COUNT 0
58 #define ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT 0
59 #endif
60
61
62
63 #define MODULE_MSG_HANDLE g_system_logger
64
65 #define ASYNCMSG_TAG 0x47534d434e595341
66 #define ASYNCWAIT_TAG 0x5457434e595341
67
68 struct async_message_wait_args
69 {
70 mutex_t mutex;
71 cond_t cond_wait;
72 s32 wait_count;
73 };
74
75 static pool_s async_message_pool;
76
77 static bool async_message_pool_initialized = FALSE;
78
79 void
async_message_call(async_queue_s * queue,async_message_s * msg)80 async_message_call(async_queue_s *queue, async_message_s *msg)
81 {
82 msg->start_time = timeus();
83
84 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
85 threaded_dll_cw_enqueue(&queue->queue, msg);
86 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
87 threaded_ringbuffer_cw_enqueue(&queue->queue, msg);
88 #else
89 threaded_queue_enqueue(&queue->queue, msg);
90 #endif
91
92 }
93
94 async_message_s*
async_message_next(async_queue_s * queue)95 async_message_next(async_queue_s *queue)
96 {
97 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
98 //async_message_s* async = (async_message_s*)threaded_dll_cw_try_dequeue(&queue->queue);
99 async_message_s* async = (async_message_s*)threaded_dll_cw_dequeue_with_timeout(&queue->queue, /*queue->pace.max_us*/1000000);
100 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
101 async_message_s* async = (async_message_s*)threaded_ringbuffer_cw_try_dequeue(&queue->queue);
102 #else
103 async_message_s* async = (async_message_s*)threaded_queue_try_dequeue(&queue->queue);
104 #endif
105
106 if(async == NULL)
107 {
108 pace_wait(&queue->pace);
109 }
110 else
111 {
112 pace_work(&queue->pace);
113 }
114
115 return async;
116 }
117
118 async_message_s*
async_message_try_next(async_queue_s * queue)119 async_message_try_next(async_queue_s *queue)
120 {
121 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
122 async_message_s* async = (async_message_s*)threaded_dll_cw_try_dequeue(&queue->queue);
123 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
124 async_message_s* async = (async_message_s*)threaded_ringbuffer_cw_try_dequeue(&queue->queue);
125 #else
126 async_message_s* async = (async_message_s*)threaded_queue_try_dequeue(&queue->queue);
127 #endif
128
129 return async;
130 }
131
132 /**
133 *
134 * Initialises a synchronisation point
135 * count is the number of releases to do before the async_wait call returns
136 *
137 * @param aw
138 * @param count
139 * @return
140 */
141
142 void
async_wait_init(async_wait_s * aw,s32 count)143 async_wait_init(async_wait_s *aw, s32 count)
144 {
145 #if ASYNC_WAIT_DUMP
146 formatln("[%5i][%p] async_wait_init(%p, %i)", getpid_ex(), thread_self(), aw, count);flushout();
147 #endif
148
149 mutex_init(&aw->mutex);
150 cond_init(&aw->cond_wait);
151 aw->wait_count = count;
152 aw->error_code = SUCCESS;
153 #if ASYNC_WAIT_TAG
154 aw->tag = 0x50505050;
155 #endif
156 }
157
158 #if ASYNC_WAIT_FINALIZE_DELAY_COUNT > 0
159
160 static mutex_t async_wait_finalize_delay_mtx = MUTEX_INITIALIZER;
161 static async_wait_s* async_wait_finalize_delay[ASYNC_WAIT_FINALIZE_DELAY_COUNT] =
162 {
163 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
164 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
165 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
166 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
167 };
168 static int async_wait_finalize_delay_index = 0;
169
170 static mutex_t async_wait_destroy_delay_mtx = MUTEX_INITIALIZER;
171 static async_wait_s* async_wait_destroy_delay[ASYNC_WAIT_FINALIZE_DELAY_COUNT] =
172 {
173 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
174 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
175 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
176 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
177 };
178 static int async_wait_destroy_delay_index = 0;
179
180
181 #endif
182
183 #if ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT > 0
184
185 static mutex_t async_wait_destroy_shared_delay_mtx = MUTEX_INITIALIZER;
186 static async_wait_s* async_wait_destroy_shared_delay[ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT] =
187 {
188 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
189 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
190 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
191 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
192 };
193 static int async_wait_destroy_shared_delay_index = 0;
194
195 #endif
196 /**
197 *
198 * Destroys the synchronisation point
199 *
200 * @param aw
201 * @return
202 */
203 #if ASYNC_WAIT_FINALIZE_DELAY_COUNT > 0
204 static void async_wait_finalize_now(async_wait_s *aw);
205
206 void
async_wait_finalize(async_wait_s * aw)207 async_wait_finalize(async_wait_s *aw)
208 {
209 mutex_lock(&async_wait_finalize_delay_mtx);
210 if(async_wait_finalize_delay[async_wait_finalize_delay_index] != NULL)
211 {
212 async_wait_finalize_now(async_wait_finalize_delay[async_wait_finalize_delay_index]);
213 }
214 async_wait_finalize_delay[async_wait_finalize_delay_index] = aw;
215 async_wait_finalize_delay_index = (async_wait_finalize_delay_index + 1) % ASYNC_WAIT_FINALIZE_DELAY_COUNT;
216 mutex_unlock(&async_wait_finalize_delay_mtx);
217 }
218
219 static void
async_wait_finalize_now(async_wait_s * aw)220 async_wait_finalize_now(async_wait_s *aw)
221 #else
222 void
223 async_wait_finalize(async_wait_s *aw)
224 #endif
225 {
226 #if ASYNC_WAIT_DUMP
227 formatln("[%5i][%p] async_wait_finalize(%p)", getpid_ex(), thread_self(), aw);flushout();
228 #endif
229
230 mutex_lock(&aw->mutex);
231 s32 wait_count = aw->wait_count;
232 mutex_unlock(&aw->mutex);
233
234 if(wait_count > 0)
235 {
236 osformat(termerr, "async_wait_finalize: wait_count = %i > 0: finalisation before logical end of life", wait_count);
237 flusherr();
238 }
239
240 cond_finalize(&aw->cond_wait);
241
242 mutex_destroy(&aw->mutex);
243 #if DEBUG
244 memset(aw, 0x5e, sizeof(async_wait_s));
245 #endif
246 #if ASYNC_WAIT_TAG
247 aw->tag &= 0x10101010;
248 #endif
249 }
250
251 async_wait_s *
async_wait_new_instance(s32 count)252 async_wait_new_instance(s32 count)
253 {
254 async_wait_s *ret;
255 ZALLOC_OBJECT_OR_DIE(ret, async_wait_s, ASYNCWAIT_TAG);
256 async_wait_init(ret, count);
257 return ret;
258 }
259
260 #if ASYNC_WAIT_FINALIZE_DELAY_COUNT > 0
261
262 static void async_wait_destroy_now(async_wait_s *aw);
263
264 void
async_wait_destroy(async_wait_s * aw)265 async_wait_destroy(async_wait_s *aw)
266 {
267 mutex_lock(&async_wait_destroy_delay_mtx);
268 if(async_wait_destroy_delay[async_wait_destroy_delay_index] != NULL)
269 {
270 async_wait_destroy_now(async_wait_destroy_delay[async_wait_destroy_delay_index]);
271 }
272 async_wait_destroy_delay[async_wait_destroy_delay_index] = aw;
273 async_wait_destroy_delay_index = (async_wait_destroy_delay_index + 1) % ASYNC_WAIT_FINALIZE_DELAY_COUNT;
274 mutex_unlock(&async_wait_destroy_delay_mtx);
275 }
276
277 static void
async_wait_destroy_now(async_wait_s * aw)278 async_wait_destroy_now(async_wait_s *aw)
279 {
280 async_wait_finalize_now(aw);
281 ZFREE_OBJECT(aw);
282 }
283 #else
284 void
async_wait_destroy(async_wait_s * aw)285 async_wait_destroy(async_wait_s *aw)
286 {
287 async_wait_finalize(aw);
288 ZFREE_OBJECT(aw);
289 }
290 #endif
291
292 async_wait_s *
async_wait_create_shared(u8 id,s32 count)293 async_wait_create_shared(u8 id, s32 count)
294 {
295 #if !ASYNC_FAKE_SHARED_HEAP
296 async_wait_s *aw = (async_wait_s*)shared_heap_wait_alloc(id, sizeof(async_wait_s));
297 #else
298 async_wait_s *aw = (async_wait_s*)malloc(sizeof(async_wait_s));
299 #endif
300
301 #if ASYNC_WAIT_DUMP
302 formatln("[%5i][%p] async_wait_create_shared(%i,%i) -> %p", getpid_ex(), thread_self(), id, count, aw);flushout();
303 #endif
304
305 assert(aw != NULL);
306
307 int err;
308
309 err = mutex_init_process_shared(&aw->mutex);
310
311 if(err != 0)
312 {
313 logger_handle_msg(g_system_logger,MSG_ERR, "async_wait_create_shared: init condition failed: %r", MAKE_ERRNO_ERROR(err));
314 logger_flush();
315 abort();
316 }
317
318
319 err = cond_init_process_shared(&aw->cond_wait);
320
321 if(err != 0)
322 {
323 logger_handle_msg(g_system_logger,MSG_ERR, "async_wait_create_shared: init condition failed: %r", MAKE_ERRNO_ERROR(err));
324 logger_flush();
325 abort();
326 }
327
328 aw->wait_count = count;
329 aw->error_code = SUCCESS;
330
331 #if ASYNC_WAIT_TAG
332 aw->tag = 0x53535353;
333 #endif
334 return aw;
335 }
336
337 #if ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT > 0
338 static void async_wait_destroy_shared_now(async_wait_s *aw);
339
340 void
async_wait_destroy_shared(async_wait_s * aw)341 async_wait_destroy_shared(async_wait_s *aw)
342 {
343 mutex_lock(&async_wait_destroy_shared_delay_mtx);
344 if(async_wait_destroy_shared_delay[async_wait_destroy_shared_delay_index] != NULL)
345 {
346 async_wait_destroy_shared_now(async_wait_destroy_shared_delay[async_wait_destroy_shared_delay_index]);
347 }
348 async_wait_destroy_shared_delay[async_wait_destroy_shared_delay_index] = aw;
349 async_wait_destroy_shared_delay_index = (async_wait_destroy_shared_delay_index + 1) % ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT;
350 mutex_unlock(&async_wait_destroy_shared_delay_mtx);
351 }
352
async_wait_destroy_shared_now(async_wait_s * aw)353 static void async_wait_destroy_shared_now(async_wait_s *aw)
354 {
355 #else
356 void async_wait_destroy_shared(async_wait_s *aw)
357 {
358 #endif
359
360 #if ASYNC_WAIT_DUMP
361 formatln("[%5i][%p] async_wait_destroy_shared(%p)", getpid_ex(), thread_self(), aw);flushout();
362 #endif
363 #if ASYNC_WAIT_FINALIZE_DELAY_COUNT > 0
364 async_wait_finalize_now(aw);
365 #else
366 async_wait_finalize(aw);
367 #endif
368 #if !ASYNC_FAKE_SHARED_HEAP
369 shared_heap_free(aw);
370 #else
371 free(aw);
372 #endif
373 }
374
375 /**
376 * Waits until the count has be reduced to 0 (or below if something bad is going on)
377 *
378 * @param aw
379 * @return
380 */
381
382 void
383 async_wait(async_wait_s *aw)
384 {
385 #if ASYNC_WAIT_DUMP
386 formatln("[%5i][%p] async_wait(%p)", getpid_ex(), thread_self(), aw);flushout();
387 #endif
388
389 int err = mutex_lock_unchecked(&aw->mutex);
390
391 if(err == 0)
392 {
393 while(aw->wait_count > 0)
394 {
395 #if !__FreeBSD__
396 cond_wait(&aw->cond_wait, &aw->mutex);
397 #else
398 cond_timedwait(&aw->cond_wait, &aw->mutex, ONE_SECOND_US);
399 #endif
400 }
401 mutex_unlock(&aw->mutex);
402 }
403 else
404 {
405 formatln("[%5i][%p] async_wait(%p) failed to lock mutex: %r", getpid_ex(), thread_self(), aw, MAKE_ERRNO_ERROR(err));
406 flushout();
407 abort();
408 }
409 }
410
411 bool
412 async_wait_timeout_absolute(async_wait_s *aw, u64 epoch_usec)
413 {
414 #if !ASYNC_NO_TIMEOUT
415 struct timespec ts;
416
417 int err = mutex_lock_unchecked(&aw->mutex);
418 if(err == 0)
419 {
420 ts.tv_sec = epoch_usec / 1000000L;
421 ts.tv_nsec = (epoch_usec % 1000000L) * 1000L;
422
423 s32 awc;
424 while((awc = aw->wait_count) > 0)
425 {
426 int err;
427
428 err = cond_timedwait_absolute_ts(&aw->cond_wait, &aw->mutex, &ts);
429
430 if(err == ETIMEDOUT)
431 {
432 #if ASYNC_WAIT_DUMP
433 formatln("[%5i][%p] async_wait_timeout_absolute(%p,%llu) : TIMEOUT (awc=%i)", getpid_ex(), thread_self(), aw, epoch_usec, awc);
434 #endif
435 break;
436 }
437
438 if(err != 0)
439 {
440 log_err("async_wait_timeout: %r", MAKE_ERRNO_ERROR(err));
441 }
442 }
443
444 err = mutex_unlock_unchecked(&aw->mutex);
445
446 if(err == 0)
447 {
448 return awc == 0;
449 }
450 else
451 {
452 formatln("[%5i][%p] async_wait_timeout_absolute(%p) failed to unlock mutex: %r", getpid_ex(), thread_self(), aw, MAKE_ERRNO_ERROR(err));
453 flushout();
454 abort();
455 }
456 }
457 else
458 {
459 formatln("[%5i][%p] async_wait_timeout_absolute(%p) failed to lock mutex: %r", getpid_ex(), thread_self(), aw, MAKE_ERRNO_ERROR(err));
460 flushout();
461 abort();
462 }
463 #else // ASYNC_NO_TIMEOUT
464
465 // timeout disabled
466
467 async_wait(aw);
468 return TRUE;
469 #endif
470 }
471
472 /**
473 * Returns TRUE if the wait is done, FALSE if it timed-out.
474 */
475
476 bool
477 async_wait_timeout(async_wait_s *aw, u64 usec)
478 {
479 #if ASYNC_WAIT_DUMP
480 formatln("[%5i][%p] async_wait_timeout(%p, %llu)", getpid_ex(), thread_self(), aw, usec);flushout();
481 #endif
482
483
484 usec += timeus();
485
486 return async_wait_timeout_absolute(aw, usec);
487 }
488
489 s32
490 async_wait_get_counter(async_wait_s *aw)
491 {
492 s32 counter;
493
494 mutex_lock(&aw->mutex);
495
496 counter = aw->wait_count;
497
498 mutex_unlock(&aw->mutex);
499
500 return counter;
501 }
502
503 /**
504 *
505 * Decreases the count of that amount
506 *
507 * @param aw
508 * @param count
509 * @return
510 */
511
512 void
513 async_wait_progress(async_wait_s *aw, s32 count)
514 {
515 #if ASYNC_WAIT_DUMP
516 formatln("[%5i][%p] async_wait_progress(%p, %i) (lock)", getpid_ex(), thread_self(), aw, count);
517 #endif
518
519 int err = mutex_lock_unchecked(&aw->mutex);
520 if(err == 0)
521 {
522 if(aw->wait_count - count >= 0)
523 {
524 aw->wait_count -= count;
525
526 cond_notify(&aw->cond_wait);
527
528 #if ASYNC_WAIT_DUMP
529 formatln("[%5i][%p] async_wait_progress(%p, %i) (unlock)", getpid_ex(), thread_self(), aw, count);
530 #endif
531 err = mutex_unlock_unchecked(&aw->mutex);
532 #if ASYNC_WAIT_DUMP
533 formatln("[%5i][%p] async_wait_progress(%p, %i) (done)", getpid_ex(), thread_self(), aw, count);
534 #endif
535 if(err != 0)
536 {
537 formatln("[%5i][%p] async_wait_progress(%p) failed to unlock mutex: %r", getpid_ex(), thread_self(), aw, MAKE_ERRNO_ERROR(err));
538 flushout();
539 abort();
540 }
541 }
542 else
543 {
544 log_err("async_wait_progress: count=%i, trying to add %i", aw->wait_count, count);
545
546 aw->wait_count = 0;
547 cond_notify(&aw->cond_wait);
548
549 void *p = aw;
550 err = mutex_unlock_unchecked(&aw->mutex);
551
552 #if ASYNC_WAIT_DUMP
553 formatln("[%5i][%p] async_wait_progress(%p, %i) (done)", getpid_ex(), thread_self(), p, count);
554 #endif
555 if(err != 0)
556 {
557 formatln("[%5i][%p] async_wait_progress(%p) failed to unlock mutex: %r (alt)", getpid_ex(), thread_self(), p, MAKE_ERRNO_ERROR(err));
558 flushout();
559 abort();
560 }
561 }
562 }
563 else
564 {
565 formatln("[%5i][%p] async_wait_progress(%p) failed to lock mutex: %r", getpid_ex(), thread_self(), aw, MAKE_ERRNO_ERROR(err));
566 flushout();
567 abort();
568 }
569 }
570
571 void
572 async_wait_set_first_error(async_wait_s *aw, s32 error)
573 {
574 mutex_lock(&aw->mutex);
575 if(ISOK(aw->error_code))
576 {
577 aw->error_code = error;
578 }
579 mutex_unlock(&aw->mutex);
580 }
581
582 s32
583 async_wait_get_error(async_wait_s *aw)
584 {
585 s32 err;
586
587 mutex_lock(&aw->mutex);
588
589 err = aw->error_code;
590
591 mutex_unlock(&aw->mutex);
592
593 return err;
594 }
595
596 void
597 async_queue_init(async_queue_s *q, u32 size, u64 min_us, u64 max_us, const char* name)
598 {
599 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
600 threaded_dll_cw_init(&q->queue, size);
601 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
602 threaded_ringbuffer_cw_init(&q->queue, size);
603 #else
604 threaded_queue_init(&q->queue, size);
605 #endif
606
607 pace_init(&q->pace, min_us, max_us, name);
608 }
609
610 void
611 async_queue_finalize(async_queue_s *q)
612 {
613 s32 n;
614 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
615 if((n = threaded_dll_cw_size(&q->queue)) > 0)
616 {
617 log_warn("async_dll_cw_finalize: queue still contains %i items", n);
618 }
619 threaded_dll_cw_finalize(&q->queue);
620 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
621 if((n = threaded_ringbuffer_cw_size(&q->queue)) > 0)
622 {
623 log_warn("async_ringbuffer_cw_finalize: queue still contains %i items", n);
624 }
625 threaded_ringbuffer_cw_finalize(&q->queue);
626 #else
627 if((n = threaded_queue_size(&q->queue)) > 0)
628 {
629 log_warn("async_queue_finalize: queue still contains %i items", n);
630 }
631 threaded_queue_finalize(&q->queue);
632 #endif
633 }
634
635 bool
636 async_queue_empty(async_queue_s *q)
637 {
638 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
639 return threaded_dll_cw_size(&q->queue) == 0;
640 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
641 return threaded_ringbuffer_cw_size(&q->queue) == 0;
642 #else
643 return threaded_queue_size(&q->queue) == 0;
644 #endif
645
646 }
647
648 u32
649 async_queue_size(async_queue_s *q)
650 {
651 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
652 return (u32)threaded_dll_cw_size(&q->queue);
653 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
654 return threaded_ringbuffer_cw_size(&q->queue);
655 #else
656 return threaded_queue_size(&q->queue);
657 #endif
658 }
659
660 static void
661 async_message_wait_handler(struct async_message_s *msg)
662 {
663 struct async_wait_s *args = (struct async_wait_s *)msg->handler_args;
664
665 async_wait_progress(args, 1);
666 }
667
668 static void
669 async_message_nop_handler(struct async_message_s *msg)
670 {
671 (void)msg;
672 }
673
674 static void
675 async_message_release_handler(struct async_message_s *msg)
676 {
677 async_message_release(msg);
678 }
679
680 int
681 async_message_call_and_wait(async_queue_s *queue, async_message_s *msg)
682 {
683 async_done_callback *old_handler = msg->handler;
684 void *old_handler_args = msg->handler_args;
685
686 #if __FreeBSD__
687 struct async_wait_s* message_wait_argsp = async_wait_new_instance(1);
688 #else
689 struct async_wait_s message_wait_args;
690 struct async_wait_s* message_wait_argsp = &message_wait_args;
691 async_wait_init(message_wait_argsp, 1);
692 #endif
693 msg->error_code = SUCCESS;
694 msg->handler = async_message_wait_handler;
695 msg->handler_args = message_wait_argsp;
696
697 async_message_call(queue, msg);
698
699 async_wait(message_wait_argsp);
700
701 msg->handler = old_handler;
702 msg->handler_args = old_handler_args;
703
704 u64 wait_time = (u64)(timeus() - msg->start_time);
705
706 #if __FreeBSD__
707 async_wait_destroy(message_wait_argsp);
708 #else
709 async_wait_finalize(message_wait_argsp); // local stack
710 #endif
711
712 log_debug5("async waited %lluus on '%i@%s'", wait_time, msg->id, queue->pace.name);
713
714 return msg->error_code;
715 }
716
717 void
718 async_message_call_and_forget(async_queue_s *queue, async_message_s *msg)
719 {
720 msg->handler = async_message_nop_handler;
721 msg->handler_args = NULL;
722
723 async_message_call(queue, msg);
724 }
725
726 void
727 async_message_call_and_release(async_queue_s *queue, async_message_s *msg)
728 {
729 msg->handler = async_message_release_handler;
730 msg->handler_args = NULL;
731
732 async_message_call(queue, msg);
733 }
734
735 static void *
736 async_message_pool_alloc(void *_ignored_)
737 {
738 async_message_s *msg;
739
740 (void)_ignored_;
741
742 ZALLOC_OBJECT_OR_DIE( msg, async_message_s, ASYNCMSG_TAG); // POOL
743 ZEROMEMORY(msg, sizeof(async_message_s)); // false positive: msg cannot be NULL
744 return msg;
745 }
746
747 static void
748 async_message_pool_free(void *msg, void *_ignored_)
749 {
750 (void)_ignored_;
751
752 memset(msg, 0xe2, sizeof(async_message_s));
753 ZFREE(msg, async_message_s); // POOL
754 }
755
756 void
757 async_message_pool_init()
758 {
759 if(!async_message_pool_initialized)
760 {
761 pool_init(&async_message_pool, async_message_pool_alloc, async_message_pool_free, NULL, "async message");
762 pool_set_size(&async_message_pool, 0x80000);
763 // for valgrind
764 #ifdef VALGRIND_FRIENDLY
765 pool_set_size(&async_message_pool, 0);
766 #endif
767
768 async_message_pool_initialized = TRUE;
769 }
770 }
771
772 void
773 async_message_pool_finalize()
774 {
775 if(async_message_pool_initialized)
776 {
777 pool_finalize(&async_message_pool);
778
779 #if ASYNC_WAIT_FINALIZE_DELAY_COUNT > 0
780 mutex_lock(&async_wait_finalize_delay_mtx);
781 for(int i = 0; i < ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT; ++i)
782 {
783 if(async_wait_finalize_delay[i] != NULL)
784 {
785 async_wait_finalize_now(async_wait_finalize_delay[i]);
786 async_wait_finalize_delay[i] = NULL;
787 }
788 }
789 mutex_unlock(&async_wait_finalize_delay_mtx);
790
791 mutex_lock(&async_wait_destroy_delay_mtx);
792 for(int i = 0; i < ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT; ++i)
793 {
794 if(async_wait_destroy_delay[i] != NULL)
795 {
796 async_wait_destroy_now(async_wait_destroy_delay[i]);
797 async_wait_destroy_delay[i] = NULL;
798 }
799 }
800 mutex_unlock(&async_wait_destroy_delay_mtx);
801 #endif
802
803 #if ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT > 0
804 mutex_lock(&async_wait_destroy_shared_delay_mtx);
805 for(int i = 0; i < ASYNC_WAIT_DESTROY_SHARED_DELAY_COUNT; ++i)
806 {
807 if(async_wait_destroy_shared_delay[i] != NULL)
808 {
809 async_wait_destroy_shared_now(async_wait_destroy_shared_delay[i]);
810 async_wait_destroy_shared_delay[i] = NULL;
811 }
812 }
813 mutex_unlock(&async_wait_destroy_shared_delay_mtx);
814 #endif
815 async_message_pool_initialized = FALSE;
816 }
817 }
818
819 async_message_s*
820 async_message_alloc()
821 {
822 async_message_s *msg = (async_message_s *)pool_alloc(&async_message_pool);
823 ZEROMEMORY(msg, sizeof(async_message_s));
824 return msg;
825 }
826
827 void async_message_release(async_message_s *msg)
828 {
829 memset(msg, 0xe3, sizeof(async_message_s));
830 pool_release(&async_message_pool, msg);
831 }
832