1 /*********************************************************************************************************
2 * Software License Agreement (BSD License)                                                               *
3 * Author: Sebastien Decugis <sdecugis@freediameter.net>							 *
4 *													 *
5 * Copyright (c) 2020, WIDE Project and NICT								 *
6 * All rights reserved.											 *
7 * 													 *
8 * Redistribution and use of this software in source and binary forms, with or without modification, are  *
9 * permitted provided that the following conditions are met:						 *
10 * 													 *
11 * * Redistributions of source code must retain the above 						 *
12 *   copyright notice, this list of conditions and the 							 *
13 *   following disclaimer.										 *
14 *    													 *
15 * * Redistributions in binary form must reproduce the above 						 *
16 *   copyright notice, this list of conditions and the 							 *
17 *   following disclaimer in the documentation and/or other						 *
18 *   materials provided with the distribution.								 *
19 * 													 *
20 * * Neither the name of the WIDE Project or NICT nor the 						 *
21 *   names of its contributors may be used to endorse or 						 *
22 *   promote products derived from this software without 						 *
23 *   specific prior written permission of WIDE Project and 						 *
24 *   NICT.												 *
25 * 													 *
26 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 	 *
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 	 *
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
33 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.								 *
34 *********************************************************************************************************/
35 
36 /* FIFO queues module.
37  *
38  * The threads that call these functions must be in the cancellation state PTHREAD_CANCEL_ENABLE and type PTHREAD_CANCEL_DEFERRED.
39  * This is the default state and type on thread creation.
40  *
 * To properly destroy a queue, the application must:
 *  -> first shut down any thread or process that can add items to the queue.
 *  -> pthread_cancel any thread that could be waiting on the queue.
 *  -> consume any element remaining in the queue, using fd_fifo_tryget_int.
 *  -> then destroy the queue using fd_fifo_del.
46  */
47 
48 #include "fdproto-internal.h"
49 
50 /* Definition of a FIFO queue object */
struct fifo {
	int		eyec;	/* An eye catcher, also used to check a queue is valid. FIFO_EYEC */

	pthread_mutex_t	mtx;	/* Mutex protecting this queue */
	pthread_cond_t	cond_pull;	/* condition variable for pulling threads */
	pthread_cond_t	cond_push;	/* condition variable for pushing threads */

	struct fd_list	list;	/* sentinel for the list of elements */
	int		count;	/* number of objects in the list */
	int		thrs;	/* number of threads waiting for a new element (when count is 0) */

	int 		max;	/* maximum number of items to accept if not 0 */
	int		thrs_push; /* number of threads waiting to push an item (blocked because the queue is full) */

	uint16_t	high;	/* High level threshold (see libfreeDiameter.h for details) */
	uint16_t	low;	/* Low level threshold */
	void 		*data;	/* Opaque pointer for threshold callbacks; fd_fifo_del and fd_fifo_move refuse a queue where it is not NULL */
	void		(*h_cb)(struct fifo *, void **); /* The callbacks: h_cb is invoked when posting, l_cb when retrieving */
	void		(*l_cb)(struct fifo *, void **);
	int 		highest;/* The highest count value for which h_cb has been called */
	int		highest_ever; /* The max count value this queue has reached (for tweaking) */

	long long	total_items;   /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */
	struct timespec total_time;    /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */
	struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */
	struct timespec last_time;     /* For the last element retrieved from the queue, how long it took between posting (including blocking) and popping */

};
79 
/* One element stored in a queue: the list chaining plus the posting timestamp. */
struct fifo_item {
	struct fd_list   item;      /* chaining into fifo.list; must stay the first member since list pointers are cast to (struct fifo_item *). item.o is returned to the caller on pop. */
	struct timespec  posted_on; /* CLOCK_REALTIME value sampled when the post call started, before any blocking */
};
84 
/* The eye catcher value: stored in fifo.eyec by fd_fifo_new, and replaced by 0xdead while a queue is being deleted or moved */
#define FIFO_EYEC	0xe7ec1130

/* Macro to check a pointer: true when _queue is not NULL and carries the eye catcher.
 * The argument is evaluated twice, so it must not have side effects. */
#define CHECK_FIFO( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == FIFO_EYEC) )
90 
91 
92 /* Create a new queue, with max number of items -- use 0 for no max */
fd_fifo_new(struct fifo ** queue,int max)93 int fd_fifo_new ( struct fifo ** queue, int max )
94 {
95 	struct fifo * new;
96 
97 	TRACE_ENTRY( "%p", queue );
98 
99 	CHECK_PARAMS( queue );
100 
101 	/* Create a new object */
102 	CHECK_MALLOC( new = malloc (sizeof (struct fifo) )  );
103 
104 	/* Initialize the content */
105 	memset(new, 0, sizeof(struct fifo));
106 
107 	new->eyec = FIFO_EYEC;
108 	CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
109 	CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
110 	CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
111 	new->max = max;
112 
113 	fd_list_init(&new->list, NULL);
114 
115 	/* We're done */
116 	*queue = new;
117 	return 0;
118 }
119 
fd_fifo_set_max(struct fifo * queue,int max)120 int fd_fifo_set_max (struct fifo * queue, int max)
121 {
122     queue->max = max;
123     return 0;
124 }
125 
126 
/* Dump the content of a queue.
 *
 * Appends a one-line summary of the queue (item counts, waiting threads, statistics,
 * thresholds) to the dump buffer and, when dump_item is provided, one extra line per
 * queued element produced by that callback.
 *
 * PARAMETERS:
 *  name      : optional label printed in front of the queue address; "{fifo}" is used when NULL.
 *  queue     : the queue to dump; an invalid or NULL queue is reported as "INVALID/NULL".
 *  dump_item : optional callback invoked for each element, while the queue mutex is held.
 *
 * RETURN VALUE:
 *  *buf on success (or the fd_dump_extend result for an invalid queue), NULL on error.
 */
DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item)
{
	FD_DUMP_HANDLE_OFFSET();

	/* Prefix: label and address of the queue */
	if (name) {
		CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL);
	} else {
		CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL);
	}

	/* Nothing more can be read safely from an invalid queue */
	if (!CHECK_FIFO( queue )) {
		return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL");
	}

	/* Best effort: a locking failure is logged by the macro and the dump goes on anyway */
	CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
	/* Summary line; the timespec values are printed as seconds.microseconds */
	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p",
						queue->count, queue->highest_ever, queue->max,
						queue->thrs, queue->thrs_push,
						queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000),
						queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data),
			 goto error);

	if (dump_item) {
		struct fd_list * li;
		int i = 0;
		/* Walk the elements from the first one after the sentinel until back on the sentinel */
		for (li = queue->list.next; li != &queue->list; li = li->next) {
			struct fifo_item * fi = (struct fifo_item *)li;
			/* Element header: index, stored pointer, and posting time */
			CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ",
						i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)),
					 goto error);
			/* Element content, delegated to the caller-supplied callback */
			CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error);
		}
	}
	CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );

	return *buf;
error:
	/* Release the queue before reporting the failure */
	CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );
	return NULL;
}
168 
/* Delete a queue. It must be empty.
 *
 * PARAMETERS:
 *  queue : location of the queue to destroy; *queue is set to NULL once the queue is freed.
 *
 * RETURN VALUE:
 *  0      : the queue has been destroyed, or *queue was already NULL.
 *  EINVAL : the queue still contains items, or its threshold data pointer is not NULL.
 *  other  : failure reported by CHECK_PARAMS (queue is NULL or *queue is not a valid queue)
 *           or by a failing pthread call.
 */
int fd_fifo_del ( struct fifo  ** queue )
{
	struct fifo * q;
#ifndef NDEBUG
	int loops = 0; /* only consumed by the ASSERT below */
#endif

	TRACE_ENTRY( "%p", queue );

	if (queue && *queue == NULL) {
		/* Queue already (in the process of being) deleted */
		return 0;
	}

	CHECK_PARAMS( queue && CHECK_FIFO( *queue ) );

	q = *queue;

	CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );

	/* Refuse to destroy a queue that still holds items or threshold data */
	if ((q->count != 0) || (q->data != NULL)) {
		TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
		CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ), /* no fallback */  );
		return EINVAL;
	}

	/* Ok, now invalidate the queue: CHECK_FIFO fails on it from this point */
	q->eyec = 0xdead;

	/* Have all waiting threads return an error */
	while (q->thrs) {
		/* Release the mutex so that a woken thread can re-acquire it and notice the invalid eye catcher */
		CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
		CHECK_POSIX(  pthread_cond_signal(&q->cond_pull)  );
		usleep(1000);

		CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
		ASSERT( ++loops < 200 ); /* detect infinite loops */
	}

	/* sanity check */
	ASSERT(FD_IS_LIST_EMPTY(&q->list));

	/* And destroy it */
	CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );

	/* Failures while destroying the primitives are only logged by the macro */
	CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_pull ),  );

	CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_push ),  );

	CHECK_POSIX_DO(  pthread_mutex_destroy( &q->mtx ),  );

	free(q);
	*queue = NULL;

	return 0;
}
226 
227 /* Move the content of old into new, and update loc_update atomically. We leave the old queue empty but valid */
fd_fifo_move(struct fifo * old,struct fifo * new,struct fifo ** loc_update)228 int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_update )
229 {
230 #ifndef NDEBUG
231 	int loops = 0;
232 #endif
233 
234 	TRACE_ENTRY("%p %p %p", old, new, loc_update);
235 	CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new ));
236 
237 	CHECK_PARAMS( ! old->data );
238 	if (new->high) {
239 		TODO("Implement support for thresholds in fd_fifo_move...");
240 	}
241 
242 	/* Update loc_update */
243 	if (loc_update)
244 		*loc_update = new;
245 
246 	/* Lock the queues */
247 	CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
248 
249 	CHECK_PARAMS_DO( (! old->thrs_push), {
250 			pthread_mutex_unlock( &old->mtx );
251 			return EINVAL;
252 		} );
253 
254 	CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );
255 
256 	/* Any waiting thread on the old queue returns an error */
257 	old->eyec = 0xdead;
258 	while (old->thrs) {
259 		CHECK_POSIX(  pthread_mutex_unlock( &old->mtx ));
260 		CHECK_POSIX(  pthread_cond_signal( &old->cond_pull )  );
261 		usleep(1000);
262 
263 		CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
264 		ASSERT( loops < 20 ); /* detect infinite loops */
265 	}
266 
267 	/* Move all data from old to new */
268 	fd_list_move_end( &new->list, &old->list );
269 	if (old->count && (!new->count)) {
270 		CHECK_POSIX(  pthread_cond_signal(&new->cond_pull)  );
271 	}
272 	new->count += old->count;
273 
274 	/* Reset old */
275 	old->count = 0;
276 	old->eyec = FIFO_EYEC;
277 
278 	/* Merge the stats in the new queue */
279 	new->total_items += old->total_items;
280 	old->total_items = 0;
281 
282 	new->total_time.tv_nsec += old->total_time.tv_nsec;
283 	new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000);
284 	new->total_time.tv_nsec %= 1000000000;
285 	old->total_time.tv_nsec = 0;
286 	old->total_time.tv_sec = 0;
287 
288 	new->blocking_time.tv_nsec += old->blocking_time.tv_nsec;
289 	new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000);
290 	new->blocking_time.tv_nsec %= 1000000000;
291 	old->blocking_time.tv_nsec = 0;
292 	old->blocking_time.tv_sec = 0;
293 
294 	/* Unlock, we're done */
295 	CHECK_POSIX(  pthread_mutex_unlock( &new->mtx )  );
296 	CHECK_POSIX(  pthread_mutex_unlock( &old->mtx )  );
297 
298 	return 0;
299 }
300 
301 /* Get the information on the queue */
fd_fifo_getstats(struct fifo * queue,int * current_count,int * limit_count,int * highest_count,long long * total_count,struct timespec * total,struct timespec * blocking,struct timespec * last)302 int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count,
303 				           struct timespec * total, struct timespec * blocking, struct timespec * last)
304 {
305 	TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last);
306 
307 	if (queue == NULL) {
308 		/* It is not an error if the queue is not available; happens e.g. when peers disappear */
309 		return 0;
310 	}
311 
312 	/* Check the parameters */
313 	CHECK_PARAMS( CHECK_FIFO( queue ) );
314 
315 	/* lock the queue */
316 	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
317 
318 	if (current_count)
319 		*current_count = queue->count;
320 
321 	if (limit_count)
322 		*limit_count = queue->max;
323 
324 	if (highest_count)
325 		*highest_count = queue->highest_ever;
326 
327 	if (total_count)
328 		*total_count = queue->total_items;
329 
330 	if (total)
331 		memcpy(total, &queue->total_time, sizeof(struct timespec));
332 
333 	if (blocking)
334 		memcpy(blocking, &queue->blocking_time, sizeof(struct timespec));
335 
336 	if (last)
337 		memcpy(last, &queue->last_time, sizeof(struct timespec));
338 
339 	/* Unlock */
340 	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
341 
342 	/* Done */
343 	return 0;
344 }
345 
346 
347 /* alternate version with no error checking */
fd_fifo_length(struct fifo * queue)348 int fd_fifo_length ( struct fifo * queue )
349 {
350 	if ( !CHECK_FIFO( queue ) )
351 		return 0;
352 
353 	return queue->count; /* Let's hope it's read atomically, since we are not locking... */
354 }
355 
356 /* Set the thresholds of the queue */
fd_fifo_setthrhd(struct fifo * queue,void * data,uint16_t high,void (* h_cb)(struct fifo *,void **),uint16_t low,void (* l_cb)(struct fifo *,void **))357 int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) )
358 {
359 	TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb );
360 
361 	/* Check the parameters */
362 	CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) );
363 
364 	/* lock the queue */
365 	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
366 
367 	/* Save the values */
368 	queue->high = high;
369 	queue->low  = low;
370 	queue->data = data;
371 	queue->h_cb = h_cb;
372 	queue->l_cb = l_cb;
373 
374 	/* Unlock */
375 	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
376 
377 	/* Done */
378 	return 0;
379 }
380 
381 
382 /* This handler is called when a thread is blocked on a queue, and cancelled */
fifo_cleanup_push(void * queue)383 static void fifo_cleanup_push(void * queue)
384 {
385 	struct fifo * q = (struct fifo *)queue;
386 	TRACE_ENTRY( "%p", queue );
387 
388 	/* The thread has been cancelled, therefore it does not wait on the queue anymore */
389 	q->thrs_push--;
390 
391 	/* Now unlock the queue, and we're done */
392 	CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
393 
394 	/* End of cleanup handler */
395 	return;
396 }
397 
398 
399 /* Post a new item in the queue */
fd_fifo_post_internal(struct fifo * queue,void ** item,int skip_max)400 int fd_fifo_post_internal ( struct fifo * queue, void ** item, int skip_max )
401 {
402 	struct fifo_item * new;
403 	int call_cb = 0;
404 	struct timespec posted_on, queued_on;
405 
406 	/* Get the timing of this call */
407 	CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &posted_on)  );
408 
409 	/* lock the queue */
410 	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
411 
412 	if ((!skip_max) && (queue->max)) {
413 		while (queue->count >= queue->max) {
414 			int ret = 0;
415 
416 			/* We have to wait for an item to be pulled */
417 			queue->thrs_push++ ;
418 			pthread_cleanup_push( fifo_cleanup_push, queue);
419 			ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
420 			pthread_cleanup_pop(0);
421 			queue->thrs_push-- ;
422 
423 #ifdef NDEBUG
424 			(void)ret;
425 #endif
426 			ASSERT( ret == 0 );
427 		}
428 	}
429 
430 	/* Create a new list item */
431 	CHECK_MALLOC_DO(  new = malloc (sizeof (struct fifo_item)) , {
432 			pthread_mutex_unlock( &queue->mtx );
433 			return ENOMEM;
434 		} );
435 
436 	fd_list_init(&new->item, *item);
437 	*item = NULL;
438 
439 	/* Add the new item at the end */
440 	fd_list_insert_before( &queue->list, &new->item);
441 	queue->count++;
442 	if (queue->highest_ever < queue->count)
443 		queue->highest_ever = queue->count;
444 	if (queue->high && ((queue->count % queue->high) == 0)) {
445 		call_cb = 1;
446 		queue->highest = queue->count;
447 	}
448 
449 	/* store timing */
450 	memcpy(&new->posted_on, &posted_on, sizeof(struct timespec));
451 
452 	/* update queue timing info "blocking time" */
453 	{
454 		long long blocked_ns;
455 		CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &queued_on)  );
456 		blocked_ns = (queued_on.tv_sec - posted_on.tv_sec) * 1000000000;
457 		blocked_ns += (queued_on.tv_nsec - posted_on.tv_nsec);
458 		blocked_ns += queue->blocking_time.tv_nsec;
459 		queue->blocking_time.tv_sec += blocked_ns / 1000000000;
460 		queue->blocking_time.tv_nsec = blocked_ns % 1000000000;
461 	}
462 
463 	/* Signal if threads are asleep */
464 	if (queue->thrs > 0) {
465 		CHECK_POSIX(  pthread_cond_signal(&queue->cond_pull)  );
466 	}
467 	if (queue->thrs_push > 0) {
468 		/* cascade */
469 		CHECK_POSIX(  pthread_cond_signal(&queue->cond_push)  );
470 	}
471 
472 	/* Unlock */
473 	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
474 
475 	/* Call high-watermark cb as needed */
476 	if (call_cb && queue->h_cb)
477 		(*queue->h_cb)(queue, &queue->data);
478 
479 	/* Done */
480 	return 0;
481 }
482 
/* Post a new item in the queue, honouring the queue limit: validates the arguments,
 * then delegates to fd_fifo_post_internal with skip_max set to 0.
 * Both item and *item must be non-NULL. */
int fd_fifo_post_int ( struct fifo * queue, void ** item )
{
	int ret;

	TRACE_ENTRY( "%p %p", queue, item );

	/* Reject invalid queues and empty items up front */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );

	ret = fd_fifo_post_internal ( queue,item, 0 );
	return ret;
}
494 
/* Post a new item in the queue without waiting for room: validates the arguments,
 * then delegates to fd_fifo_post_internal with skip_max set to 1, so the queue limit
 * is not enforced. Both item and *item must be non-NULL. */
int fd_fifo_post_noblock ( struct fifo * queue, void ** item )
{
	int ret;

	TRACE_ENTRY( "%p %p", queue, item );

	/* Reject invalid queues and empty items up front */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );

	ret = fd_fifo_post_internal ( queue,item, 1 );
	return ret;
}
506 
507 /* Pop the first item from the queue */
mq_pop(struct fifo * queue)508 static void * mq_pop(struct fifo * queue)
509 {
510 	void * ret = NULL;
511 	struct fifo_item * fi;
512 	struct timespec now;
513 
514 	ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) );
515 
516 	fi = (struct fifo_item *)(queue->list.next);
517 	ret = fi->item.o;
518 	fd_list_unlink(&fi->item);
519 	queue->count--;
520 	queue->total_items++;
521 
522 	/* Update the timings */
523 	CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now), goto skip_timing  );
524 	{
525 		long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000;
526 		elapsed += now.tv_nsec - fi->posted_on.tv_nsec;
527 
528 		queue->last_time.tv_sec = elapsed / 1000000000;
529 		queue->last_time.tv_nsec = elapsed % 1000000000;
530 
531 		elapsed += queue->total_time.tv_nsec;
532 		queue->total_time.tv_sec += elapsed / 1000000000;
533 		queue->total_time.tv_nsec = elapsed % 1000000000;
534 	}
535 skip_timing:
536 	free(fi);
537 
538 	if (queue->thrs_push) {
539 		CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
540 	}
541 
542 	return ret;
543 }
544 
545 /* Check if the low watermark callback must be called. */
test_l_cb(struct fifo * queue)546 static __inline__ int test_l_cb(struct fifo * queue)
547 {
548 	if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0))
549 		return 0;
550 
551 	if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) {
552 		queue->highest -= queue->high;
553 		return 1;
554 	}
555 
556 	return 0;
557 }
558 
559 /* Try poping an item */
fd_fifo_tryget_int(struct fifo * queue,void ** item)560 int fd_fifo_tryget_int ( struct fifo * queue, void ** item )
561 {
562 	int wouldblock = 0;
563 	int call_cb = 0;
564 
565 	TRACE_ENTRY( "%p %p", queue, item );
566 
567 	/* Check the parameters */
568 	CHECK_PARAMS( CHECK_FIFO( queue ) && item );
569 
570 	/* lock the queue */
571 	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
572 
573 	/* Check queue status */
574 	if (queue->count > 0) {
575 got_item:
576 		/* There are elements in the queue, so pick the first one */
577 		*item = mq_pop(queue);
578 		call_cb = test_l_cb(queue);
579 	} else {
580 		if (queue->thrs_push > 0) {
581 			/* A thread is trying to push something, let's give it a chance */
582 			CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
583 			CHECK_POSIX(  pthread_cond_signal( &queue->cond_push )  );
584 			usleep(1000);
585 			CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
586 			if (queue->count > 0)
587 				goto got_item;
588 		}
589 
590 		wouldblock = 1;
591 		*item = NULL;
592 	}
593 
594 	/* Unlock */
595 	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
596 
597 	/* Call low watermark callback as needed */
598 	if (call_cb)
599 		(*queue->l_cb)(queue, &queue->data);
600 
601 	/* Done */
602 	return wouldblock ? EWOULDBLOCK : 0;
603 }
604 
605 /* This handler is called when a thread is blocked on a queue, and cancelled */
fifo_cleanup(void * queue)606 static void fifo_cleanup(void * queue)
607 {
608 	struct fifo * q = (struct fifo *)queue;
609 	TRACE_ENTRY( "%p", queue );
610 
611 	/* The thread has been cancelled, therefore it does not wait on the queue anymore */
612 	q->thrs--;
613 
614 	/* Now unlock the queue, and we're done */
615 	CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
616 
617 	/* End of cleanup handler */
618 	return;
619 }
620 
621 /* The internal function for fd_fifo_timedget and fd_fifo_get */
fifo_tget(struct fifo * queue,void ** item,int istimed,const struct timespec * abstime)622 static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime)
623 {
624 	int call_cb = 0;
625 	int ret = 0;
626 
627 	/* Check the parameters */
628 	CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) );
629 
630 	/* Initialize the return value */
631 	*item = NULL;
632 
633 	/* lock the queue */
634 	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
635 
636 awaken:
637 	/* Check queue status */
638 	if (!CHECK_FIFO( queue )) {
639 		/* The queue is being destroyed */
640 		CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
641 		TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE");
642 		return EPIPE;
643 	}
644 
645 	if (queue->count > 0) {
646 		/* There are items in the queue, so pick the first one */
647 		*item = mq_pop(queue);
648 		call_cb = test_l_cb(queue);
649 	} else {
650 		/* We have to wait for a new item */
651 		queue->thrs++ ;
652 		pthread_cleanup_push( fifo_cleanup, queue);
653 		if (istimed) {
654 			ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
655 		} else {
656 			ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx );
657 		}
658 		pthread_cleanup_pop(0);
659 		queue->thrs-- ;
660 		if (ret == 0)
661 			goto awaken;  /* test for spurious wake-ups */
662 
663 		/* otherwise (ETIMEDOUT / other error) just continue */
664 	}
665 
666 	/* Unlock */
667 	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
668 
669 	/* Call low watermark callback as needed */
670 	if (call_cb)
671 		(*queue->l_cb)(queue, &queue->data);
672 
673 	/* Done */
674 	return ret;
675 }
676 
677 /* Get the next available item, block until there is one */
fd_fifo_get_int(struct fifo * queue,void ** item)678 int fd_fifo_get_int ( struct fifo * queue, void ** item )
679 {
680 	TRACE_ENTRY( "%p %p", queue, item );
681 	return fifo_tget(queue, item, 0, NULL);
682 }
683 
/* Get the next available item, blocking until there is one or until the absolute
 * time abstime is reached. Delegates to fifo_tget in timed mode and returns its result. */
int fd_fifo_timedget_int ( struct fifo * queue, void ** item, const struct timespec *abstime )
{
	int ret;

	TRACE_ENTRY( "%p %p %p", queue, item, abstime );

	ret = fifo_tget(queue, item, 1, abstime);
	return ret;
}
690 
691 /* Test if data is available in the queue, without pulling it */
fd_fifo_select(struct fifo * queue,const struct timespec * abstime)692 int fd_fifo_select ( struct fifo * queue, const struct timespec *abstime )
693 {
694 	int ret = 0;
695 	TRACE_ENTRY( "%p %p", queue, abstime );
696 
697 	CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL );
698 
699 	/* lock the queue */
700 	CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), return -__ret__  );
701 
702 awaken:
703 	ret = (queue->count > 0 ) ? queue->count : 0;
704 	if ((ret == 0) && (abstime != NULL)) {
705 		/* We have to wait for a new item */
706 		queue->thrs++ ;
707 		pthread_cleanup_push( fifo_cleanup, queue);
708 		ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
709 		pthread_cleanup_pop(0);
710 		queue->thrs-- ;
711 		if (ret == 0)
712 			goto awaken;  /* test for spurious wake-ups */
713 
714 		if (ret == ETIMEDOUT)
715 			ret = 0;
716 		else
717 			ret = -ret;
718 	}
719 
720 	/* Unlock */
721 	CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), return -__ret__  );
722 
723 	return ret;
724 }
725