/*------------------------------------------------------------------------------
 *
 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
 * The YADIFA TM software product is provided under the BSD 3-clause license:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *        * Redistributions of source code must retain the above copyright
 *          notice, this list of conditions and the following disclaimer.
 *        * Redistributions in binary form must reproduce the above copyright
 *          notice, this list of conditions and the following disclaimer in the
 *          documentation and/or other materials provided with the distribution.
 *        * Neither the name of EURid nor the names of its contributors may be
 *          used to endorse or promote products derived from this software
 *          without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 *------------------------------------------------------------------------------
 *
 */

/** @defgroup threading Threading, pools, queues, ...
 *  @ingroup dnscore
 *  @brief Threaded queue implemented over a ring buffer
 *
 * @{ *
 *----------------------------------------------------------------------------*/
#include "dnscore/dnscore-config.h"
#include <stdlib.h>
#include <string.h> // memset (DEBUG builds)
#include <assert.h>
#include <unistd.h>

#include "dnscore/threaded_ringbuffer_cw.h"

#define THREADED_QUEUE_RINGBUFFERCW_TAG 0x5743464247525154 /* TQRGBFCW */

/*
 * Note:
 *
 * If a mutex_init fails, it's because of a resource, memory or rights issue,
 * so the application will fail soon enough anyway. It should still be
 * checked, with an exit on failure.
 *
 * mutex_lock will fail only if the current thread already owns the mutex.
 *
 * mutex_unlock will fail only if the current thread does not own the mutex.
 *
 */

void
threaded_ringbuffer_cw_init(threaded_ringbuffer_cw *queue, int max_size)
{
#if DEBUG
    memset(queue, 0xff, sizeof(threaded_ringbuffer_cw));
#endif

    MALLOC_OR_DIE(void**, queue->buffer, sizeof(void*) * max_size, THREADED_QUEUE_RINGBUFFERCW_TAG);

    queue->buffer_limit = &queue->buffer[max_size];
    queue->write_slot = queue->buffer;
    queue->read_slot = queue->buffer;

    mutex_init(&queue->mutex);
    cond_init(&queue->cond_read);
    cond_init(&queue->cond_write);

    queue->max_size = max_size;
    queue->size = 0;
}
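
/**
 * Lifecycle sketch (illustrative only, not part of the build): a queue of
 * heap-allocated items, created, used and torn down by a single thread.
 *
 * @code
 * threaded_ringbuffer_cw queue;
 * threaded_ringbuffer_cw_init(&queue, 64);      // room for 64 pointers
 *
 * threaded_ringbuffer_cw_enqueue(&queue, strdup("work item"));
 *
 * char *item = (char*)threaded_ringbuffer_cw_dequeue(&queue);
 * free(item);
 *
 * threaded_ringbuffer_cw_wait_empty(&queue);    // make sure nothing is left
 * threaded_ringbuffer_cw_finalize(&queue);      // only safe on an empty queue
 * @endcode
 */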

void
threaded_ringbuffer_cw_finalize(threaded_ringbuffer_cw *queue)
{
    /**
     * If the queue is not empty: too bad!
     *
     * It's the responsibility of the caller to ensure the queue and the set of listeners are empty.
     */

    free(queue->buffer);
    queue->buffer = NULL;

    cond_finalize(&queue->cond_write);
    cond_finalize(&queue->cond_read);

    mutex_destroy(&queue->mutex);
#if DEBUG
    memset(queue, 0xde, sizeof(threaded_ringbuffer_cw));
#endif
}

void
threaded_ringbuffer_cw_enqueue(threaded_ringbuffer_cw *queue, void *constant_pointer)
{
    assert(queue->max_size > 0);

    /*
     * Ensure I'm allowed to work on queue (only one working on it)
     */

    mutex_lock(&queue->mutex);
    while(queue->size >= queue->max_size)
    {
        cond_wait(&queue->cond_write, &queue->mutex);
    }

    /*
     * Set the data to the write position,
     * and move the write position to the next slot
     */

    /**
     * @note: "if(overflow) reset" is (much) faster than MOD(limit)
     */
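
    /*
     * For reference (illustrative only), the modulo form this avoids would be:
     *
     *   size_t i = (size_t)(queue->write_slot - queue->buffer);
     *   queue->buffer[i] = constant_pointer;
     *   queue->write_slot = &queue->buffer[(i + 1) % queue->max_size];
     *
     * The compare-and-reset below trades the division for a predictable branch.
     */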

    *queue->write_slot++ = constant_pointer;

    if(queue->write_slot == queue->buffer_limit)
    {
        queue->write_slot = queue->buffer;
    }

    queue->size++;

    /*
     * We are done here, we can always signal the readers
     */

    cond_notify(&queue->cond_read);
    mutex_unlock(&queue->mutex);
}

void
threaded_ringbuffer_cw_enqueue_set(threaded_ringbuffer_cw *queue, void **constant_pointer_array, u32 count)
{
    assert(queue->max_size > 0);
    assert(queue->max_size >= count);

    /*
     * Ensure I'm allowed to work on queue (only one working on it)
     */

    mutex_lock(&queue->mutex);
    while(queue->size + count > queue->max_size)
    {
        cond_wait(&queue->cond_write, &queue->mutex);
    }

    /*
     * Set the data to the write position,
     * and move the write position to the next slot
     */

    /**
     * @note: "if(overflow) reset" is (much) faster than MOD(limit)
     */

    for(u32 i = 0; i < count; ++i)
    {
        *queue->write_slot++ = constant_pointer_array[i];

        if(queue->write_slot == queue->buffer_limit)
        {
            queue->write_slot = queue->buffer;
        }
    }

    queue->size += count;

    /*
     * We are done here, we can always signal the readers
     */

    cond_notify(&queue->cond_read);
    mutex_unlock(&queue->mutex);
}
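
/**
 * Batch usage sketch (illustrative only): several items enqueued with a
 * single lock/notify round-trip instead of one per item. item0..item3 are
 * hypothetical pointers.
 *
 * @code
 * void *batch[4] = { item0, item1, item2, item3 };
 * threaded_ringbuffer_cw_enqueue_set(&queue, batch, 4); // blocks until 4 slots are free
 * @endcode
 */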

bool
threaded_ringbuffer_cw_try_enqueue(threaded_ringbuffer_cw *queue, void *constant_pointer)
{
    /*
     * Ensure I'm allowed to work on queue (only one working on it)
     */

    if(!mutex_trylock(&queue->mutex))
    {
        return FALSE;
    }

    if(queue->size >= queue->max_size)
    {
        mutex_unlock(&queue->mutex);
        return FALSE;
    }

    /*
     * Set the data to the write position,
     * and move the write position to the next slot
     */

    /**
     * @note: "if(overflow) reset" is (much) faster than MOD(limit)
     */

    *queue->write_slot++ = constant_pointer;

    if(queue->write_slot == queue->buffer_limit)
    {
        queue->write_slot = queue->buffer;
    }

    queue->size++;

    /*
     * We are done here, we can always signal the readers
     */

    cond_notify(&queue->cond_read);
    mutex_unlock(&queue->mutex);

    return TRUE;
}
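
/**
 * Non-blocking usage sketch (illustrative only): FALSE means the queue was
 * either full or contended, so the caller can fall back instead of blocking.
 * process_inline is a hypothetical fallback.
 *
 * @code
 * if(!threaded_ringbuffer_cw_try_enqueue(&queue, item))
 * {
 *     process_inline(item);
 * }
 * @endcode
 */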

void*
threaded_ringbuffer_cw_peek(threaded_ringbuffer_cw *queue)
{
    /*
     * Ensure I'm allowed to work on queue (only one working on it)
     */

    mutex_lock(&queue->mutex);

    while(queue->size == 0)
    {
        cond_wait(&queue->cond_read, &queue->mutex);
    }

    /*
     * Get the data from the read position,
     * without moving the read position: the item stays in the queue
     */

    void* data = *queue->read_slot;

    /*
     * We are done here.
     */

    mutex_unlock(&queue->mutex);

    return data;
}
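
/**
 * Sketch of the peek/dequeue contract (illustrative only): peek blocks until
 * an item is available but leaves it queued, so the next dequeue from the
 * same consumer returns that same pointer.
 *
 * @code
 * void *head = threaded_ringbuffer_cw_peek(&queue);    // item stays queued
 * void *same = threaded_ringbuffer_cw_dequeue(&queue); // now it is consumed
 * assert(head == same); // only guaranteed with a single consumer thread
 * @endcode
 */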

void*
threaded_ringbuffer_cw_try_peek(threaded_ringbuffer_cw *queue)
{
    mutex_lock(&queue->mutex);

    if(queue->size == 0)
    {
        mutex_unlock(&queue->mutex);

        return NULL;
    }

    /*
     * Get the data from the read position,
     * without moving the read position: the item stays in the queue
     */

    void* data = *queue->read_slot;

    /*
     * We are done here.
     */

    mutex_unlock(&queue->mutex);

    return data;
}

void*
threaded_ringbuffer_cw_dequeue(threaded_ringbuffer_cw *queue)
{
    /*
     * Ensure I'm allowed to work on queue (only one working on it)
     */

    mutex_lock(&queue->mutex);

    while(queue->size == 0)
    {
        cond_wait(&queue->cond_read, &queue->mutex);
    }

    /*
     * Get the data from the read position,
     * and move the read position to the next slot
     */

    void* data = *queue->read_slot++;
    if(queue->read_slot == queue->buffer_limit)
    {
        queue->read_slot = queue->buffer;
    }

    if(queue->size-- == queue->max_size) /* enqueue has just been locked -> unlock */
    {
        /*
         * The queue is full: the queuers are waiting.
         * Since we are removing something, we can free (one of) them.
         * (They will however still be locked until the queue mutex is released)
         */

        cond_notify(&queue->cond_write);
    }

    /*
     * We are done here.
     */

    mutex_unlock(&queue->mutex);

    return data;
}
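
/**
 * Consumer loop sketch (illustrative only): a worker drains the queue until
 * a NULL sentinel is received. NULL works as a sentinel because dequeue
 * returns exactly the pointers that were enqueued. handle is hypothetical.
 *
 * @code
 * for(;;)
 * {
 *     void *item = threaded_ringbuffer_cw_dequeue(&queue); // blocks while empty
 *     if(item == NULL) break;                              // shutdown requested
 *     handle(item);
 * }
 * @endcode
 */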

void*
threaded_ringbuffer_cw_dequeue_with_timeout(threaded_ringbuffer_cw *queue, s64 timeout_us)
{
    /*
     * Ensure I'm allowed to work on queue (only one working on it)
     */

    mutex_lock(&queue->mutex);

    while(queue->size == 0)
    {
        if(cond_timedwait(&queue->cond_read, &queue->mutex, timeout_us) != 0)
        {
            mutex_unlock(&queue->mutex);
            return NULL;
        }
    }

    /*
     * Get the data from the read position,
     * and move the read position to the next slot
     */

    void* data = *queue->read_slot++;
    if(queue->read_slot == queue->buffer_limit)
    {
        queue->read_slot = queue->buffer;
    }

    if(queue->size-- == queue->max_size) /* enqueue has just been locked -> unlock */
    {
        /*
         * The queue is full: the queuers are waiting.
         * Since we are removing something, we can free (one of) them.
         * (They will however still be locked until the queue mutex is released)
         */

        cond_notify(&queue->cond_write);
    }

    /*
     * We are done here.
     */

    mutex_unlock(&queue->mutex);

    return data;
}
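
/**
 * Timed consumer sketch (illustrative only): wake up at least once per second
 * to do housekeeping even when no item arrives; timeout_us is in microseconds.
 * handle and do_housekeeping are hypothetical. Note that a NULL return is
 * ambiguous if NULL itself is ever enqueued as a sentinel.
 *
 * @code
 * void *item = threaded_ringbuffer_cw_dequeue_with_timeout(&queue, 1000000LL); // 1s
 * if(item != NULL)
 * {
 *     handle(item);
 * }
 * else
 * {
 *     do_housekeeping(); // NULL here means the wait timed out
 * }
 * @endcode
 */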

void*
threaded_ringbuffer_cw_try_dequeue(threaded_ringbuffer_cw *queue)
{
    mutex_lock(&queue->mutex);

    if(queue->size == 0)
    {
        mutex_unlock(&queue->mutex);

        return NULL;
    }

    /*
     * Get the data from the read position,
     * and move the read position to the next slot
     */

    void* data = *queue->read_slot++;
    if(queue->read_slot == queue->buffer_limit)
    {
        queue->read_slot = queue->buffer;
    }

    if(queue->size-- == queue->max_size) /* enqueue has just been locked -> unlock */
    {
        /*
         * The queue is full: the queuers are waiting.
         * Since we are removing something, we can free (one of) them.
         * (They will however still be locked until the queue mutex is released)
         */

        cond_notify(&queue->cond_write);
    }

    /*
     * We are done here.
     */

    mutex_unlock(&queue->mutex);

    return data;
}

u32
threaded_ringbuffer_cw_dequeue_set(threaded_ringbuffer_cw *queue, void **array, u32 array_size)
{
    /*
     * Ensure I'm allowed to work on queue (only one working on it)
     */

    mutex_lock(&queue->mutex);

    while(queue->size == 0)
    {
        cond_wait(&queue->cond_read, &queue->mutex);
    }

    /*
     * Get up to array_size times the data from the read position,
     * and move the read position to the next slot
     */

    bool unlock_enqueue = queue->size == queue->max_size; /* enqueue has just been locked -> schedule unlock */

    u32 loops = MIN(queue->size, array_size);             /* The amount we will be able to extract */

    void ** const limit = &array[loops];                  /* compute the limit so we only have one increment and one compare */

    while(array < limit)
    {
        void* data = *queue->read_slot++;
        *array++ = data;

        if(queue->read_slot == queue->buffer_limit)
        {
            queue->read_slot = queue->buffer;
        }

        if(data == NULL)                                  /* Break if a terminator is found; the NULL is copied to the array */
        {
            loops -= limit - array;                       /* shrink loops to the number of slots actually written */
            break;
        }
    }

    queue->size -= loops;                                 /* adjust the size */

    if(unlock_enqueue) /* enqueue has just been locked -> unlock */
    {
        /*
         * The queue is full: the queuers are waiting.
         * Since we are removing something, we can free (one of) them.
         * (They will however still be locked until the queue mutex is released)
         */

        cond_notify(&queue->cond_write);
    }

    /*
     * We are done here.
     */

    mutex_unlock(&queue->mutex);

    return loops;    /* Return the amount we got from the queue */
}
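
/**
 * Batch drain sketch (illustrative only): pull up to 16 items in one locked
 * section. A NULL in the stream is copied to the array and ends the batch,
 * so the sentinel convention of the consumer loop above still works.
 * handle and shutting_down are hypothetical.
 *
 * @code
 * void *items[16];
 * u32 n = threaded_ringbuffer_cw_dequeue_set(&queue, items, 16); // blocks while empty
 * for(u32 i = 0; i < n; ++i)
 * {
 *     if(items[i] == NULL) { shutting_down = TRUE; break; }
 *     handle(items[i]);
 * }
 * @endcode
 */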

void
threaded_ringbuffer_cw_wait_empty(threaded_ringbuffer_cw *queue)
{
    int size;

    for(;;)
    {
        mutex_lock(&queue->mutex);

        size = queue->size;

        mutex_unlock(&queue->mutex);

        if(size == 0)
        {
            break;
        }

        usleep(1000);
    }
}
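
/**
 * Shutdown sketch (illustrative only): wait_empty polls (with a 1 ms sleep)
 * until consumers have drained everything, after which finalize is safe.
 *
 * @code
 * threaded_ringbuffer_cw_enqueue(&queue, NULL); // one sentinel per consumer
 * threaded_ringbuffer_cw_wait_empty(&queue);    // consumers drain the queue
 * threaded_ringbuffer_cw_finalize(&queue);      // must only run on an empty queue
 * @endcode
 */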

u32
threaded_ringbuffer_cw_size(threaded_ringbuffer_cw *queue)
{
    u32 size;

    mutex_lock(&queue->mutex);

    size = queue->size;

    mutex_unlock(&queue->mutex);

    return size;
}

int
threaded_ringbuffer_cw_room(threaded_ringbuffer_cw *queue)
{
    int room;

    mutex_lock(&queue->mutex);

    room = queue->max_size - queue->size;

    mutex_unlock(&queue->mutex);

    return room;
}

ya_result
threaded_ringbuffer_cw_set_maxsize(threaded_ringbuffer_cw *queue, int max_size)
{
    ya_result ret; // the effective max_size is returned; a resize below the current content is ignored

    mutex_lock(&queue->mutex);

    if(max_size >= (int)queue->size)
    {
        void** tmp;
        MALLOC_OR_DIE(void**, tmp, sizeof(void*) * max_size, THREADED_QUEUE_RINGBUFFERCW_TAG);

        /*
         * Copy from the read to the write position
         */

        void** p = tmp;
        u32 count = queue->size;

        while(count-- > 0)
        {
            *p++ = *queue->read_slot++;

            // wrap when the end is reached

            if(queue->read_slot == queue->buffer_limit)
            {
                queue->read_slot = queue->buffer;
            }
        }

        /*
         * At this point ...
         *
         * tmp is the new "read"
         * p is the new "write", but it could be at the limit
         */

        free(queue->buffer);
        queue->buffer = tmp;
        queue->buffer_limit = &tmp[max_size];
        queue->read_slot = tmp;

        if(p == queue->buffer_limit)
        {
            p = tmp;
        }

        queue->write_slot = p;
        queue->max_size = max_size;
    }

    ret = queue->max_size;

    mutex_unlock(&queue->mutex);

    return ret;
}
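
/**
 * Resize sketch (illustrative only): grow a queue that has filled up. The
 * return value is the effective maximum size, which stays unchanged when the
 * requested size would not hold the current content. queue_max is a
 * hypothetical caller-side tracker of the configured size.
 *
 * @code
 * if(threaded_ringbuffer_cw_room(&queue) == 0)
 * {
 *     queue_max = threaded_ringbuffer_cw_set_maxsize(&queue, queue_max * 2);
 * }
 * @endcode
 */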

/** @} */