1 /*********************************************************************************************************
2 * Software License Agreement (BSD License) *
3 * Author: Sebastien Decugis <sdecugis@freediameter.net> *
4 * *
5 * Copyright (c) 2020, WIDE Project and NICT *
6 * All rights reserved. *
7 * *
8 * Redistribution and use of this software in source and binary forms, with or without modification, are *
9 * permitted provided that the following conditions are met: *
10 * *
11 * * Redistributions of source code must retain the above *
12 * copyright notice, this list of conditions and the *
13 * following disclaimer. *
14 * *
15 * * Redistributions in binary form must reproduce the above *
16 * copyright notice, this list of conditions and the *
17 * following disclaimer in the documentation and/or other *
18 * materials provided with the distribution. *
19 * *
20 * * Neither the name of the WIDE Project or NICT nor the *
21 * names of its contributors may be used to endorse or *
22 * promote products derived from this software without *
23 * specific prior written permission of WIDE Project and *
24 * NICT. *
25 * *
26 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
33 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
34 *********************************************************************************************************/
35
36 /* FIFO queues module.
37 *
38 * The threads that call these functions must be in the cancellation state PTHREAD_CANCEL_ENABLE and type PTHREAD_CANCEL_DEFERRED.
39 * This is the default state and type on thread creation.
40 *
41 * In order to destroy properly a queue, the application must:
42 * -> shutdown any process that can add into the queue first.
43 * -> pthread_cancel any thread that could be waiting on the queue.
44 * -> consume any element that is in the queue, using fd_fifo_tryget_int.
45 * -> then destroy the queue using fd_fifo_del.
46 */
47
48 #include "fdproto-internal.h"
49
50 /* Definition of a FIFO queue object */
51 struct fifo {
52 int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
53
54 pthread_mutex_t mtx; /* Mutex protecting this queue */
55 pthread_cond_t cond_pull; /* condition variable for pulling threads */
56 pthread_cond_t cond_push; /* condition variable for pushing threads */
57
58 struct fd_list list; /* sentinel for the list of elements */
59 int count; /* number of objects in the list */
60 int thrs; /* number of threads waiting for a new element (when count is 0) */
61
62 int max; /* maximum number of items to accept if not 0 */
63 int thrs_push; /* number of threads waitnig to push an item */
64
65 uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */
66 uint16_t low; /* Low level threshhold */
67 void *data; /* Opaque pointer for threshold callbacks */
68 void (*h_cb)(struct fifo *, void **); /* The callbacks */
69 void (*l_cb)(struct fifo *, void **);
70 int highest;/* The highest count value for which h_cb has been called */
71 int highest_ever; /* The max count value this queue has reached (for tweaking) */
72
73 long long total_items; /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */
74 struct timespec total_time; /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */
75 struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */
76 struct timespec last_time; /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */
77
78 };
79
80 struct fifo_item {
81 struct fd_list item;
82 struct timespec posted_on;
83 };
84
/* Magic value stored in the eyec field of every valid queue */
#define FIFO_EYEC	0xe7ec1130

/* True when _queue is a non-NULL pointer whose eye catcher is intact (note: _queue is evaluated twice) */
#define CHECK_FIFO( _queue )	( (_queue) && ((_queue)->eyec == FIFO_EYEC) )
90
91
92 /* Create a new queue, with max number of items -- use 0 for no max */
fd_fifo_new(struct fifo ** queue,int max)93 int fd_fifo_new ( struct fifo ** queue, int max )
94 {
95 struct fifo * new;
96
97 TRACE_ENTRY( "%p", queue );
98
99 CHECK_PARAMS( queue );
100
101 /* Create a new object */
102 CHECK_MALLOC( new = malloc (sizeof (struct fifo) ) );
103
104 /* Initialize the content */
105 memset(new, 0, sizeof(struct fifo));
106
107 new->eyec = FIFO_EYEC;
108 CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
109 CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
110 CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
111 new->max = max;
112
113 fd_list_init(&new->list, NULL);
114
115 /* We're done */
116 *queue = new;
117 return 0;
118 }
119
fd_fifo_set_max(struct fifo * queue,int max)120 int fd_fifo_set_max (struct fifo * queue, int max)
121 {
122 queue->max = max;
123 return 0;
124 }
125
126
127 /* Dump the content of a queue */
DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump,char * name,struct fifo * queue,fd_fifo_dump_item_cb dump_item)128 DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item)
129 {
130 FD_DUMP_HANDLE_OFFSET();
131
132 if (name) {
133 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL);
134 } else {
135 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL);
136 }
137
138 if (!CHECK_FIFO( queue )) {
139 return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL");
140 }
141
142 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ );
143 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p",
144 queue->count, queue->highest_ever, queue->max,
145 queue->thrs, queue->thrs_push,
146 queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000),
147 queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data),
148 goto error);
149
150 if (dump_item) {
151 struct fd_list * li;
152 int i = 0;
153 for (li = queue->list.next; li != &queue->list; li = li->next) {
154 struct fifo_item * fi = (struct fifo_item *)li;
155 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ",
156 i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)),
157 goto error);
158 CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error);
159 }
160 }
161 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ );
162
163 return *buf;
164 error:
165 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ );
166 return NULL;
167 }
168
169 /* Delete a queue. It must be empty. */
fd_fifo_del(struct fifo ** queue)170 int fd_fifo_del ( struct fifo ** queue )
171 {
172 struct fifo * q;
173 #ifndef NDEBUG
174 int loops = 0;
175 #endif
176
177 TRACE_ENTRY( "%p", queue );
178
179 if (queue && *queue == NULL) {
180 /* Queue already (in the process of being) deleted */
181 return 0;
182 }
183
184 CHECK_PARAMS( queue && CHECK_FIFO( *queue ) );
185
186 q = *queue;
187
188 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) );
189
190 if ((q->count != 0) || (q->data != NULL)) {
191 TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
192 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* no fallback */ );
193 return EINVAL;
194 }
195
196 /* Ok, now invalidate the queue */
197 q->eyec = 0xdead;
198
199 /* Have all waiting threads return an error */
200 while (q->thrs) {
201 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ));
202 CHECK_POSIX( pthread_cond_signal(&q->cond_pull) );
203 usleep(1000);
204
205 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) );
206 ASSERT( ++loops < 200 ); /* detect infinite loops */
207 }
208
209 /* sanity check */
210 ASSERT(FD_IS_LIST_EMPTY(&q->list));
211
212 /* And destroy it */
213 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) );
214
215 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), );
216
217 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), );
218
219 CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), );
220
221 free(q);
222 *queue = NULL;
223
224 return 0;
225 }
226
227 /* Move the content of old into new, and update loc_update atomically. We leave the old queue empty but valid */
fd_fifo_move(struct fifo * old,struct fifo * new,struct fifo ** loc_update)228 int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_update )
229 {
230 #ifndef NDEBUG
231 int loops = 0;
232 #endif
233
234 TRACE_ENTRY("%p %p %p", old, new, loc_update);
235 CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new ));
236
237 CHECK_PARAMS( ! old->data );
238 if (new->high) {
239 TODO("Implement support for thresholds in fd_fifo_move...");
240 }
241
242 /* Update loc_update */
243 if (loc_update)
244 *loc_update = new;
245
246 /* Lock the queues */
247 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) );
248
249 CHECK_PARAMS_DO( (! old->thrs_push), {
250 pthread_mutex_unlock( &old->mtx );
251 return EINVAL;
252 } );
253
254 CHECK_POSIX( pthread_mutex_lock( &new->mtx ) );
255
256 /* Any waiting thread on the old queue returns an error */
257 old->eyec = 0xdead;
258 while (old->thrs) {
259 CHECK_POSIX( pthread_mutex_unlock( &old->mtx ));
260 CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) );
261 usleep(1000);
262
263 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) );
264 ASSERT( loops < 20 ); /* detect infinite loops */
265 }
266
267 /* Move all data from old to new */
268 fd_list_move_end( &new->list, &old->list );
269 if (old->count && (!new->count)) {
270 CHECK_POSIX( pthread_cond_signal(&new->cond_pull) );
271 }
272 new->count += old->count;
273
274 /* Reset old */
275 old->count = 0;
276 old->eyec = FIFO_EYEC;
277
278 /* Merge the stats in the new queue */
279 new->total_items += old->total_items;
280 old->total_items = 0;
281
282 new->total_time.tv_nsec += old->total_time.tv_nsec;
283 new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000);
284 new->total_time.tv_nsec %= 1000000000;
285 old->total_time.tv_nsec = 0;
286 old->total_time.tv_sec = 0;
287
288 new->blocking_time.tv_nsec += old->blocking_time.tv_nsec;
289 new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000);
290 new->blocking_time.tv_nsec %= 1000000000;
291 old->blocking_time.tv_nsec = 0;
292 old->blocking_time.tv_sec = 0;
293
294 /* Unlock, we're done */
295 CHECK_POSIX( pthread_mutex_unlock( &new->mtx ) );
296 CHECK_POSIX( pthread_mutex_unlock( &old->mtx ) );
297
298 return 0;
299 }
300
301 /* Get the information on the queue */
fd_fifo_getstats(struct fifo * queue,int * current_count,int * limit_count,int * highest_count,long long * total_count,struct timespec * total,struct timespec * blocking,struct timespec * last)302 int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count,
303 struct timespec * total, struct timespec * blocking, struct timespec * last)
304 {
305 TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last);
306
307 if (queue == NULL) {
308 /* It is not an error if the queue is not available; happens e.g. when peers disappear */
309 return 0;
310 }
311
312 /* Check the parameters */
313 CHECK_PARAMS( CHECK_FIFO( queue ) );
314
315 /* lock the queue */
316 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
317
318 if (current_count)
319 *current_count = queue->count;
320
321 if (limit_count)
322 *limit_count = queue->max;
323
324 if (highest_count)
325 *highest_count = queue->highest_ever;
326
327 if (total_count)
328 *total_count = queue->total_items;
329
330 if (total)
331 memcpy(total, &queue->total_time, sizeof(struct timespec));
332
333 if (blocking)
334 memcpy(blocking, &queue->blocking_time, sizeof(struct timespec));
335
336 if (last)
337 memcpy(last, &queue->last_time, sizeof(struct timespec));
338
339 /* Unlock */
340 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
341
342 /* Done */
343 return 0;
344 }
345
346
347 /* alternate version with no error checking */
fd_fifo_length(struct fifo * queue)348 int fd_fifo_length ( struct fifo * queue )
349 {
350 if ( !CHECK_FIFO( queue ) )
351 return 0;
352
353 return queue->count; /* Let's hope it's read atomically, since we are not locking... */
354 }
355
356 /* Set the thresholds of the queue */
fd_fifo_setthrhd(struct fifo * queue,void * data,uint16_t high,void (* h_cb)(struct fifo *,void **),uint16_t low,void (* l_cb)(struct fifo *,void **))357 int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) )
358 {
359 TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb );
360
361 /* Check the parameters */
362 CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) );
363
364 /* lock the queue */
365 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
366
367 /* Save the values */
368 queue->high = high;
369 queue->low = low;
370 queue->data = data;
371 queue->h_cb = h_cb;
372 queue->l_cb = l_cb;
373
374 /* Unlock */
375 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
376
377 /* Done */
378 return 0;
379 }
380
381
382 /* This handler is called when a thread is blocked on a queue, and cancelled */
fifo_cleanup_push(void * queue)383 static void fifo_cleanup_push(void * queue)
384 {
385 struct fifo * q = (struct fifo *)queue;
386 TRACE_ENTRY( "%p", queue );
387
388 /* The thread has been cancelled, therefore it does not wait on the queue anymore */
389 q->thrs_push--;
390
391 /* Now unlock the queue, and we're done */
392 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ );
393
394 /* End of cleanup handler */
395 return;
396 }
397
398
399 /* Post a new item in the queue */
fd_fifo_post_internal(struct fifo * queue,void ** item,int skip_max)400 int fd_fifo_post_internal ( struct fifo * queue, void ** item, int skip_max )
401 {
402 struct fifo_item * new;
403 int call_cb = 0;
404 struct timespec posted_on, queued_on;
405
406 /* Get the timing of this call */
407 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &posted_on) );
408
409 /* lock the queue */
410 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
411
412 if ((!skip_max) && (queue->max)) {
413 while (queue->count >= queue->max) {
414 int ret = 0;
415
416 /* We have to wait for an item to be pulled */
417 queue->thrs_push++ ;
418 pthread_cleanup_push( fifo_cleanup_push, queue);
419 ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
420 pthread_cleanup_pop(0);
421 queue->thrs_push-- ;
422
423 #ifdef NDEBUG
424 (void)ret;
425 #endif
426 ASSERT( ret == 0 );
427 }
428 }
429
430 /* Create a new list item */
431 CHECK_MALLOC_DO( new = malloc (sizeof (struct fifo_item)) , {
432 pthread_mutex_unlock( &queue->mtx );
433 return ENOMEM;
434 } );
435
436 fd_list_init(&new->item, *item);
437 *item = NULL;
438
439 /* Add the new item at the end */
440 fd_list_insert_before( &queue->list, &new->item);
441 queue->count++;
442 if (queue->highest_ever < queue->count)
443 queue->highest_ever = queue->count;
444 if (queue->high && ((queue->count % queue->high) == 0)) {
445 call_cb = 1;
446 queue->highest = queue->count;
447 }
448
449 /* store timing */
450 memcpy(&new->posted_on, &posted_on, sizeof(struct timespec));
451
452 /* update queue timing info "blocking time" */
453 {
454 long long blocked_ns;
455 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &queued_on) );
456 blocked_ns = (queued_on.tv_sec - posted_on.tv_sec) * 1000000000;
457 blocked_ns += (queued_on.tv_nsec - posted_on.tv_nsec);
458 blocked_ns += queue->blocking_time.tv_nsec;
459 queue->blocking_time.tv_sec += blocked_ns / 1000000000;
460 queue->blocking_time.tv_nsec = blocked_ns % 1000000000;
461 }
462
463 /* Signal if threads are asleep */
464 if (queue->thrs > 0) {
465 CHECK_POSIX( pthread_cond_signal(&queue->cond_pull) );
466 }
467 if (queue->thrs_push > 0) {
468 /* cascade */
469 CHECK_POSIX( pthread_cond_signal(&queue->cond_push) );
470 }
471
472 /* Unlock */
473 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
474
475 /* Call high-watermark cb as needed */
476 if (call_cb && queue->h_cb)
477 (*queue->h_cb)(queue, &queue->data);
478
479 /* Done */
480 return 0;
481 }
482
/*
 * Post a new item in the queue, honouring the maximum size of the queue
 * (the item is posted with skip_max set to 0).
 * queue must be valid, item and *item must not be NULL.
 */
int fd_fifo_post_int ( struct fifo * queue, void ** item )
{
	const int skip_max = 0;

	TRACE_ENTRY( "%p %p", queue, item );

	/* Check the parameters */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );

	return fd_fifo_post_internal( queue, item, skip_max );
}
494
/*
 * Post a new item in the queue without waiting for room
 * (the item is posted with skip_max set to 1, so the maximum size is ignored).
 * queue must be valid, item and *item must not be NULL.
 */
int fd_fifo_post_noblock ( struct fifo * queue, void ** item )
{
	const int skip_max = 1;

	TRACE_ENTRY( "%p %p", queue, item );

	/* Check the parameters */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );

	return fd_fifo_post_internal( queue, item, skip_max );
}
506
507 /* Pop the first item from the queue */
mq_pop(struct fifo * queue)508 static void * mq_pop(struct fifo * queue)
509 {
510 void * ret = NULL;
511 struct fifo_item * fi;
512 struct timespec now;
513
514 ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) );
515
516 fi = (struct fifo_item *)(queue->list.next);
517 ret = fi->item.o;
518 fd_list_unlink(&fi->item);
519 queue->count--;
520 queue->total_items++;
521
522 /* Update the timings */
523 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto skip_timing );
524 {
525 long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000;
526 elapsed += now.tv_nsec - fi->posted_on.tv_nsec;
527
528 queue->last_time.tv_sec = elapsed / 1000000000;
529 queue->last_time.tv_nsec = elapsed % 1000000000;
530
531 elapsed += queue->total_time.tv_nsec;
532 queue->total_time.tv_sec += elapsed / 1000000000;
533 queue->total_time.tv_nsec = elapsed % 1000000000;
534 }
535 skip_timing:
536 free(fi);
537
538 if (queue->thrs_push) {
539 CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
540 }
541
542 return ret;
543 }
544
545 /* Check if the low watermark callback must be called. */
test_l_cb(struct fifo * queue)546 static __inline__ int test_l_cb(struct fifo * queue)
547 {
548 if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0))
549 return 0;
550
551 if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) {
552 queue->highest -= queue->high;
553 return 1;
554 }
555
556 return 0;
557 }
558
559 /* Try poping an item */
fd_fifo_tryget_int(struct fifo * queue,void ** item)560 int fd_fifo_tryget_int ( struct fifo * queue, void ** item )
561 {
562 int wouldblock = 0;
563 int call_cb = 0;
564
565 TRACE_ENTRY( "%p %p", queue, item );
566
567 /* Check the parameters */
568 CHECK_PARAMS( CHECK_FIFO( queue ) && item );
569
570 /* lock the queue */
571 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
572
573 /* Check queue status */
574 if (queue->count > 0) {
575 got_item:
576 /* There are elements in the queue, so pick the first one */
577 *item = mq_pop(queue);
578 call_cb = test_l_cb(queue);
579 } else {
580 if (queue->thrs_push > 0) {
581 /* A thread is trying to push something, let's give it a chance */
582 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
583 CHECK_POSIX( pthread_cond_signal( &queue->cond_push ) );
584 usleep(1000);
585 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
586 if (queue->count > 0)
587 goto got_item;
588 }
589
590 wouldblock = 1;
591 *item = NULL;
592 }
593
594 /* Unlock */
595 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
596
597 /* Call low watermark callback as needed */
598 if (call_cb)
599 (*queue->l_cb)(queue, &queue->data);
600
601 /* Done */
602 return wouldblock ? EWOULDBLOCK : 0;
603 }
604
605 /* This handler is called when a thread is blocked on a queue, and cancelled */
fifo_cleanup(void * queue)606 static void fifo_cleanup(void * queue)
607 {
608 struct fifo * q = (struct fifo *)queue;
609 TRACE_ENTRY( "%p", queue );
610
611 /* The thread has been cancelled, therefore it does not wait on the queue anymore */
612 q->thrs--;
613
614 /* Now unlock the queue, and we're done */
615 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ );
616
617 /* End of cleanup handler */
618 return;
619 }
620
621 /* The internal function for fd_fifo_timedget and fd_fifo_get */
fifo_tget(struct fifo * queue,void ** item,int istimed,const struct timespec * abstime)622 static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime)
623 {
624 int call_cb = 0;
625 int ret = 0;
626
627 /* Check the parameters */
628 CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) );
629
630 /* Initialize the return value */
631 *item = NULL;
632
633 /* lock the queue */
634 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
635
636 awaken:
637 /* Check queue status */
638 if (!CHECK_FIFO( queue )) {
639 /* The queue is being destroyed */
640 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
641 TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE");
642 return EPIPE;
643 }
644
645 if (queue->count > 0) {
646 /* There are items in the queue, so pick the first one */
647 *item = mq_pop(queue);
648 call_cb = test_l_cb(queue);
649 } else {
650 /* We have to wait for a new item */
651 queue->thrs++ ;
652 pthread_cleanup_push( fifo_cleanup, queue);
653 if (istimed) {
654 ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
655 } else {
656 ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx );
657 }
658 pthread_cleanup_pop(0);
659 queue->thrs-- ;
660 if (ret == 0)
661 goto awaken; /* test for spurious wake-ups */
662
663 /* otherwise (ETIMEDOUT / other error) just continue */
664 }
665
666 /* Unlock */
667 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
668
669 /* Call low watermark callback as needed */
670 if (call_cb)
671 (*queue->l_cb)(queue, &queue->data);
672
673 /* Done */
674 return ret;
675 }
676
/*
 * Get the next available item; blocks until there is one.
 * Untimed call of fifo_tget, whose result is returned as is.
 */
int fd_fifo_get_int ( struct fifo * queue, void ** item )
{
	const struct timespec * no_deadline = NULL;

	TRACE_ENTRY( "%p %p", queue, item );

	return fifo_tget(queue, item, 0, no_deadline);
}
683
/*
 * Get the next available item; blocks until there is one or until the absolute
 * time 'abstime' is reached.
 * Timed call of fifo_tget, whose result is returned as is.
 */
int fd_fifo_timedget_int ( struct fifo * queue, void ** item, const struct timespec *abstime )
{
	const int timed = 1;

	TRACE_ENTRY( "%p %p %p", queue, item, abstime );

	return fifo_tget(queue, item, timed, abstime);
}
690
691 /* Test if data is available in the queue, without pulling it */
fd_fifo_select(struct fifo * queue,const struct timespec * abstime)692 int fd_fifo_select ( struct fifo * queue, const struct timespec *abstime )
693 {
694 int ret = 0;
695 TRACE_ENTRY( "%p %p", queue, abstime );
696
697 CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL );
698
699 /* lock the queue */
700 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), return -__ret__ );
701
702 awaken:
703 ret = (queue->count > 0 ) ? queue->count : 0;
704 if ((ret == 0) && (abstime != NULL)) {
705 /* We have to wait for a new item */
706 queue->thrs++ ;
707 pthread_cleanup_push( fifo_cleanup, queue);
708 ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
709 pthread_cleanup_pop(0);
710 queue->thrs-- ;
711 if (ret == 0)
712 goto awaken; /* test for spurious wake-ups */
713
714 if (ret == ETIMEDOUT)
715 ret = 0;
716 else
717 ret = -ret;
718 }
719
720 /* Unlock */
721 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), return -__ret__ );
722
723 return ret;
724 }
725