1 /*------------------------------------------------------------------------------
2 *
3 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4 * The YADIFA TM software product is provided under the BSD 3-clause license:
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of EURid nor the names of its contributors may be
16 * used to endorse or promote products derived from this software
17 * without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 *
31 *------------------------------------------------------------------------------
32 *
33 */
34
35 /** @defgroup threading Threading, pools, queues, ...
36 * @ingroup dnscore
37 * @brief
38 *
39 *
40 *
41 * @{ *
42 *----------------------------------------------------------------------------*/
43 #include "dnscore/dnscore-config.h"
44 #include <stdlib.h>
45 #include <unistd.h>
46
47 #include "dnscore/threaded_ringbuffer_cw.h"
48
49 #define THREADED_QUEUE_RINGBUFFERCW_TAG 0x5743464247525154 /* TQRGBFCW */
50
/*
 * Note:
 *
 * If a mutex_init fails, it's because of a resource, memory or rights issue.
 * So the application will fail soon enough.
 * I still should check this and exit.
 *
 * mutex_lock will fail only if the current thread already owns the mutex
 *
 * mutex_unlock will fail only if the current thread does not own the mutex
 *
 */
63
64 void
threaded_ringbuffer_cw_init(threaded_ringbuffer_cw * queue,int max_size)65 threaded_ringbuffer_cw_init(threaded_ringbuffer_cw *queue, int max_size)
66 {
67 #if DEBUG
68 memset(queue, 0xff, sizeof(threaded_ringbuffer_cw));
69 #endif
70
71 MALLOC_OR_DIE(void**, queue->buffer, sizeof(void*) * max_size, THREADED_QUEUE_RINGBUFFERCW_TAG);
72
73 queue->buffer_limit = &queue->buffer[max_size];
74 queue->write_slot = queue->buffer;
75 queue->read_slot = queue->buffer;
76
77 mutex_init(&queue->mutex);
78 cond_init(&queue->cond_read);
79 cond_init(&queue->cond_write);
80
81 queue->max_size = max_size;
82 queue->size = 0;
83 }
84
85 void
threaded_ringbuffer_cw_finalize(threaded_ringbuffer_cw * queue)86 threaded_ringbuffer_cw_finalize(threaded_ringbuffer_cw *queue)
87 {
88 /**
89 * If the queue is not empty : too bad !
90 *
91 * It's the responsibility of the caller to ensure the queue and set of listeners is empty.
92 */
93
94 free(queue->buffer);
95 queue->buffer = NULL;
96
97 cond_finalize(&queue->cond_write);
98 cond_finalize(&queue->cond_read);
99
100 mutex_destroy(&queue->mutex);
101 #if DEBUG
102 memset(queue, 0xde, sizeof(threaded_ringbuffer_cw));
103 #endif
104 }
105
106 void
threaded_ringbuffer_cw_enqueue(threaded_ringbuffer_cw * queue,void * constant_pointer)107 threaded_ringbuffer_cw_enqueue(threaded_ringbuffer_cw *queue, void *constant_pointer)
108 {
109 assert(queue->max_size > 0);
110 /*
111 * Ensure I'm allowed to work on queue (only one working on it)
112 */
113
114 mutex_lock(&queue->mutex);
115 while( queue->size >= queue->max_size )
116 {
117 cond_wait(&queue->cond_write, &queue->mutex);
118 }
119
120 /*
121 * Set the data to the write position,
122 * and move the write position to the next slot
123 *
124 */
125
126 /**
127 * @note: "if(overflow) reset" is (much) faster than MOD(limit)
128 */
129
130 *queue->write_slot++ = constant_pointer;
131
132 if(queue->write_slot == queue->buffer_limit)
133 {
134 queue->write_slot = queue->buffer;
135 }
136
137 queue->size++;
138
139 /*
140 * We are done here, we can always signal the readers
141 */
142
143 cond_notify(&queue->cond_read);
144 mutex_unlock(&queue->mutex);
145 }
146
147 void
threaded_ringbuffer_cw_enqueue_set(threaded_ringbuffer_cw * queue,void ** constant_pointer_array,u32 count)148 threaded_ringbuffer_cw_enqueue_set(threaded_ringbuffer_cw *queue, void **constant_pointer_array, u32 count)
149 {
150 assert(queue->max_size > 0);
151 assert(queue->max_size >= count);
152
153 /*
154 * Ensure I'm allowed to work on queue (only one working on it)
155 */
156
157 mutex_lock(&queue->mutex);
158 while( queue->size + count > queue->max_size )
159 {
160 cond_wait(&queue->cond_write, &queue->mutex);
161 }
162
163 /*
164 * Set the data to the write position,
165 * and move the write position to the next slot
166 *
167 */
168
169 /**
170 * @note: "if(overflow) reset" is (much) faster than MOD(limit)
171 */
172
173 for(u32 i = 0; i < count; ++i)
174 {
175 *queue->write_slot++ = constant_pointer_array[i];
176
177 if(queue->write_slot == queue->buffer_limit)
178 {
179 queue->write_slot = queue->buffer;
180 }
181 }
182
183 queue->size += count;
184
185 /*
186 * We are done here, we can always signal the readers
187 */
188
189 cond_notify(&queue->cond_read);
190 mutex_unlock(&queue->mutex);
191 }
192
193 bool
threaded_ringbuffer_cw_try_enqueue(threaded_ringbuffer_cw * queue,void * constant_pointer)194 threaded_ringbuffer_cw_try_enqueue(threaded_ringbuffer_cw* queue, void* constant_pointer)
195 {
196 /*
197 * Ensure I'm allowed to work on queue (only one working on it)
198 */
199
200 if(!mutex_trylock(&queue->mutex))
201 {
202 return FALSE;
203 }
204
205 if( queue->size >= queue->max_size )
206 {
207 mutex_unlock(&queue->mutex);
208 return FALSE;
209 }
210
211 /*
212 * Set the data to the write position,
213 * and move the write position to the next slot
214 *
215 */
216
217 /**
218 * @note: "if(overflow) reset" is (much) faster than MOD(limit)
219 */
220
221 *queue->write_slot++ = constant_pointer;
222
223 if(queue->write_slot == queue->buffer_limit)
224 {
225 queue->write_slot = queue->buffer;
226 }
227
228 queue->size++;
229
230 /*
231 * We are done here, we can always signal the readers
232 */
233
234 cond_notify(&queue->cond_read);
235 mutex_unlock(&queue->mutex);
236
237 return TRUE;
238 }
239
240 void*
threaded_ringbuffer_cw_peek(threaded_ringbuffer_cw * queue)241 threaded_ringbuffer_cw_peek(threaded_ringbuffer_cw *queue)
242 {
243 /*
244 * Ensure I'm allowed to work on queue (only one working on it)
245 */
246
247 mutex_lock(&queue->mutex);
248
249 while( queue->size == 0 )
250 {
251 cond_wait(&queue->cond_read,&queue->mutex);
252 }
253
254 /*
255 * Get the data from the read position,
256 * and move the read position to the next slot
257 *
258 */
259
260 void* data = *queue->read_slot;
261
262 /*
263 * We are done here.
264 */
265
266 mutex_unlock(&queue->mutex);
267
268 return data;
269 }
270
271 void*
threaded_ringbuffer_cw_try_peek(threaded_ringbuffer_cw * queue)272 threaded_ringbuffer_cw_try_peek(threaded_ringbuffer_cw *queue)
273 {
274 mutex_lock(&queue->mutex);
275
276 if( queue->size == 0 )
277 {
278 mutex_unlock(&queue->mutex);
279
280 return NULL;
281 }
282
283 /*
284 * Get the data from the read position,
285 * and move the read position to the next slot
286 *
287 */
288
289 void* data = *queue->read_slot;
290
291 /*
292 * We are done here.
293 */
294
295 mutex_unlock(&queue->mutex);
296
297 return data;
298 }
299
300 void*
threaded_ringbuffer_cw_dequeue(threaded_ringbuffer_cw * queue)301 threaded_ringbuffer_cw_dequeue(threaded_ringbuffer_cw *queue)
302 {
303 /*
304 * Ensure I'm allowed to work on queue (only one working on it)
305 */
306
307 mutex_lock(&queue->mutex);
308
309 while( queue->size == 0 )
310 {
311 cond_wait(&queue->cond_read,&queue->mutex);
312 }
313
314 /*
315 * Get the data from the read position,
316 * and move the read position to the next slot
317 *
318 */
319
320 void* data = *queue->read_slot++;
321 if(queue->read_slot == queue->buffer_limit)
322 {
323 queue->read_slot = queue->buffer;
324 }
325
326 if(queue->size-- == queue->max_size) /* enqueue has just been locked -> unlock */
327 {
328 /*
329 * The queue is full : the queuers are waiting.
330 * Since we will are removing something, we can free (one of) them.
331 * (They will however still be locked until the queue mutex is released)
332 */
333
334 cond_notify(&queue->cond_write);
335 }
336
337 /*
338 * We are done here.
339 */
340
341 mutex_unlock(&queue->mutex);
342
343 return data;
344 }
345
346 void*
threaded_ringbuffer_cw_dequeue_with_timeout(threaded_ringbuffer_cw * queue,s64 timeout_us)347 threaded_ringbuffer_cw_dequeue_with_timeout(threaded_ringbuffer_cw *queue, s64 timeout_us)
348 {
349 /*
350 * Ensure I'm allowed to work on queue (only one working on it)
351 */
352
353 mutex_lock(&queue->mutex);
354
355 while( queue->size == 0 )
356 {
357 if(cond_timedwait(&queue->cond_read,&queue->mutex, timeout_us) != 0)
358 {
359 mutex_unlock(&queue->mutex);
360 return NULL;
361 }
362 }
363
364 /*
365 * Get the data from the read position,
366 * and move the read position to the next slot
367 *
368 */
369
370 void* data = *queue->read_slot++;
371 if(queue->read_slot == queue->buffer_limit)
372 {
373 queue->read_slot = queue->buffer;
374 }
375
376 if(queue->size-- == queue->max_size) /* enqueue has just been locked -> unlock */
377 {
378 /*
379 * The queue is full : the queuers are waiting.
380 * Since we will are removing something, we can free (one of) them.
381 * (They will however still be locked until the queue mutex is released)
382 */
383
384 cond_notify(&queue->cond_write);
385 }
386
387 /*
388 * We are done here.
389 */
390
391 mutex_unlock(&queue->mutex);
392
393 return data;
394 }
395
396 void*
threaded_ringbuffer_cw_try_dequeue(threaded_ringbuffer_cw * queue)397 threaded_ringbuffer_cw_try_dequeue(threaded_ringbuffer_cw *queue)
398 {
399 mutex_lock(&queue->mutex);
400
401 if( queue->size == 0 )
402 {
403 mutex_unlock(&queue->mutex);
404
405 return NULL;
406 }
407
408 /*
409 * Get the data from the read position,
410 * and move the read position to the next slot
411 *
412 */
413
414 void* data = *queue->read_slot++;
415 if(queue->read_slot == queue->buffer_limit)
416 {
417 queue->read_slot = queue->buffer;
418 }
419
420 if(queue->size-- == queue->max_size) /* enqueue has just been locked -> unlock */
421 {
422 /*
423 * The queue is full : the queuers are waiting.
424 * Since we will are removing something, we car free (one of) them.
425 * (They will however still be locked until the queue mutex is released)
426 */
427
428 cond_notify(&queue->cond_write);
429 }
430
431 /*
432 * We are done here.
433 */
434
435 mutex_unlock(&queue->mutex);
436
437 return data;
438 }
439
440 u32
threaded_ringbuffer_cw_dequeue_set(threaded_ringbuffer_cw * queue,void ** array,u32 array_size)441 threaded_ringbuffer_cw_dequeue_set(threaded_ringbuffer_cw *queue, void **array, u32 array_size)
442 {
443 /*
444 * Ensure I'm allowed to work on queue (only one working on it)
445 */
446
447 mutex_lock(&queue->mutex);
448
449 while( queue->size == 0 )
450 {
451 cond_wait(&queue->cond_read,&queue->mutex);
452 }
453
454 /*
455 * Get up to array_size times the data from the read position,
456 * and move the read position to the next slot
457 *
458 */
459
460 bool unlock_enqueue = queue->size == queue->max_size; /* enqueue has just been locked -> schedule unlock */
461
462 u32 loops = MIN(queue->size, array_size); /* The amount we will be able to extract */
463
464 void ** const limit = &array[loops]; /* compute the limit so we only have one increment and one compare */
465
466 while(array < limit)
467 {
468 void* data = *queue->read_slot++;
469 *array++ = data;
470
471 if(queue->read_slot == queue->buffer_limit)
472 {
473 queue->read_slot = queue->buffer;
474 }
475
476 if(data == NULL) /* Break if a terminator is found*/
477 {
478 loops -= limit - array;
479 break;
480 }
481 }
482
483 queue->size -= loops; /* adjust the size */
484
485 if(unlock_enqueue) /* enqueue has just been locked -> unlock */
486 {
487 /*
488 * The queue is full : the queuers are waiting.
489 * Since we will are removing something, we car free (one of) them.
490 * (They will however still be locked until the queue mutex is released)
491 */
492
493 cond_notify(&queue->cond_write);
494 }
495
496 /*
497 * We are done here.
498 */
499
500 mutex_unlock(&queue->mutex);
501
502 return loops; /* Return the amount we got from the queue */
503 }
504
505 void
threaded_ringbuffer_cw_wait_empty(threaded_ringbuffer_cw * queue)506 threaded_ringbuffer_cw_wait_empty(threaded_ringbuffer_cw *queue)
507 {
508 int size;
509
510 for(;;)
511 {
512 mutex_lock(&queue->mutex);
513
514 size = queue->size;
515
516 mutex_unlock(&queue->mutex);
517
518 if(size == 0)
519 {
520 break;
521 }
522
523 usleep(1000);
524 }
525 }
526
527 u32
threaded_ringbuffer_cw_size(threaded_ringbuffer_cw * queue)528 threaded_ringbuffer_cw_size(threaded_ringbuffer_cw *queue)
529 {
530 u32 size;
531
532 mutex_lock(&queue->mutex);
533
534 size = queue->size;
535
536 mutex_unlock(&queue->mutex);
537
538 return size;
539 }
540
541 int
threaded_ringbuffer_cw_room(threaded_ringbuffer_cw * queue)542 threaded_ringbuffer_cw_room(threaded_ringbuffer_cw *queue)
543 {
544 int room;
545
546 mutex_lock(&queue->mutex);
547
548 room = queue->max_size - queue->size;
549
550 mutex_unlock(&queue->mutex);
551
552 return room;
553 }
554
555 ya_result
threaded_ringbuffer_cw_set_maxsize(threaded_ringbuffer_cw * queue,int max_size)556 threaded_ringbuffer_cw_set_maxsize(threaded_ringbuffer_cw *queue, int max_size)
557 {
558 ya_result ret = INVALID_ARGUMENT_ERROR; // can only grow
559
560 mutex_lock(&queue->mutex);
561
562 if(max_size >= (int)queue->size)
563 {
564 void** tmp;
565 MALLOC_OR_DIE(void**, tmp, sizeof(void*) * max_size, THREADED_QUEUE_RINGBUFFERCW_TAG);
566
567 /*
568 * Copy from the read to the write position
569 */
570
571 void** p = tmp;
572 u32 count = queue->size;
573
574 while(count-- > 0)
575 {
576 *p++ = *queue->read_slot++;
577
578 // wrap when the end is reached
579
580 if(queue->read_slot == queue->buffer_limit)
581 {
582 queue->read_slot = queue->buffer;
583 }
584 }
585
586 /*
587 * At this point ...
588 *
589 * tmp is the new "read"
590 * p is the new "write", but it could be at the limit
591 *
592 */
593
594 free(queue->buffer);
595 queue->buffer = tmp;
596 queue->buffer_limit = &tmp[max_size];
597 queue->read_slot = tmp;
598
599 if(p == queue->buffer_limit)
600 {
601 p = tmp;
602 }
603
604 queue->write_slot = p;
605 queue->max_size = max_size;
606 }
607
608 ret = queue->max_size;
609
610 mutex_unlock(&queue->mutex);
611
612 return ret;
613 }
614
615 /** @} */
616