1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2014 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in
17  *    the documentation and/or other materials provided with the
18  *    distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  *    if any, must include the following acknowledgment:
22  *       "This product includes software developed by the
23  *        Kannel Group (http://www.kannel.org/)."
24  *    Alternately, this acknowledgment may appear in the software itself,
25  *    if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  *    endorse or promote products derived from this software without
29  *    prior written permission. For written permission, please
30  *    contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  *    nor may "Kannel" appear in their name, without prior written
34  *    permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED.  IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group.  For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * gw-prioqueue.c - generic priority queue with guaranteed order.
59  *
60  * Algorithm ala Robert Sedgewick.
61  *
62  * Alexander Malysh <amalysh at kannel.org>, 2004, 2008
63  */
64 
65 #include "gw-config.h"
66 #include <pthread.h>
67 #include "thread.h"
68 #include "gwmem.h"
69 #include "gwassert.h"
70 #include "gwthread.h"
71 #include "gw-prioqueue.h"
72 
73 
74 struct element {
75     void *item;
76     long long seq;
77 };
78 
79 struct gw_prioqueue {
80     Mutex *mutex;
81     struct element **tab;
82     size_t size;
83     long len;
84     long producers;
85     long long seq;
86     pthread_cond_t nonempty;
87     int (*cmp)(const void*, const void *);
88 };
89 
90 
queue_lock(gw_prioqueue_t * queue)91 static void inline queue_lock(gw_prioqueue_t *queue)
92 {
93     mutex_lock(queue->mutex);
94 }
95 
96 
queue_unlock(gw_prioqueue_t * queue)97 static void inline queue_unlock(gw_prioqueue_t *queue)
98 {
99     mutex_unlock(queue->mutex);
100 }
101 
102 
make_bigger(gw_prioqueue_t * queue,long items)103 static void make_bigger(gw_prioqueue_t *queue, long items)
104 {
105     size_t size = queue->size;
106     size_t new_size = sizeof(*queue->tab) * (queue->len + items);
107 
108     if (size >= new_size)
109         return;
110 
111     queue->tab = gw_realloc(queue->tab, new_size);
112     queue->size = new_size;
113 }
114 
115 
compare(struct element * a,struct element * b,int (* cmp)(const void *,const void *))116 static int compare(struct element *a, struct element *b, int(*cmp)(const void*, const void *))
117 {
118     int rc;
119 
120     rc = cmp(a->item, b->item);
121     if (rc == 0) {
122         /* check sequence to guarantee order */
123         if (a->seq < b->seq)
124             rc = 1;
125         else if (a->seq > b->seq)
126             rc = -1;
127     }
128 
129     return rc;
130 }
131 
132 
133 /**
134  * Heapize up
135  * @queue - our prioqueue
136  + @index - start index
137  */
upheap(gw_prioqueue_t * queue,register long index)138 static void upheap(gw_prioqueue_t *queue, register long index)
139 {
140     struct element *v = queue->tab[index];
141     while (queue->tab[index / 2]->item != NULL && compare(queue->tab[index / 2], v, queue->cmp) < 0) {
142         queue->tab[index] = queue->tab[index / 2];
143         index /= 2;
144     }
145     queue->tab[index] = v;
146 }
147 
148 
149 /**
150  * Heapize down
151  * @queue - our prioqueue
152  * @index - start index
153  */
downheap(gw_prioqueue_t * queue,register long index)154 static void downheap(gw_prioqueue_t *queue, register long index)
155 {
156     struct element *v = queue->tab[index];
157     register long j;
158 
159     while (index <= queue->len / 2) {
160         j = 2 * index;
161         /* take the biggest child item */
162         if (j < queue->len && compare(queue->tab[j], queue->tab[j + 1], queue->cmp) < 0)
163             j++;
164         /* break if our item bigger */
165         if (compare(v, queue->tab[j], queue->cmp) >= 0)
166             break;
167         queue->tab[index] = queue->tab[j];
168         index = j;
169     }
170     queue->tab[index] = v;
171 }
172 
173 
gw_prioqueue_create(int (* cmp)(const void *,const void *))174 gw_prioqueue_t *gw_prioqueue_create(int(*cmp)(const void*, const void *))
175 {
176     gw_prioqueue_t *ret;
177 
178     gw_assert(cmp != NULL);
179 
180     ret = gw_malloc(sizeof(*ret));
181     ret->producers = 0;
182     pthread_cond_init(&ret->nonempty, NULL);
183     ret->mutex = mutex_create();
184     ret->tab = NULL;
185     ret->size = 0;
186     ret->len = 0;
187     ret->seq = 0;
188     ret->cmp = cmp;
189 
190     /* put NULL item at pos 0 that is our stop marker */
191     make_bigger(ret, 1);
192     ret->tab[0] = gw_malloc(sizeof(**ret->tab));
193     ret->tab[0]->item = NULL;
194     ret->tab[0]->seq = ret->seq++;
195     ret->len++;
196 
197     return ret;
198 }
199 
200 
gw_prioqueue_destroy(gw_prioqueue_t * queue,void (* item_destroy)(void *))201 void gw_prioqueue_destroy(gw_prioqueue_t *queue, void(*item_destroy)(void*))
202 {
203     long i;
204 
205     if (queue == NULL)
206         return;
207 
208     for (i = 0; i < queue->len; i++) {
209         if (item_destroy != NULL && queue->tab[i]->item != NULL)
210             item_destroy(queue->tab[i]->item);
211         gw_free(queue->tab[i]);
212     }
213     mutex_destroy(queue->mutex);
214     pthread_cond_destroy(&queue->nonempty);
215     gw_free(queue->tab);
216     gw_free(queue);
217 }
218 
219 
gw_prioqueue_len(gw_prioqueue_t * queue)220 long gw_prioqueue_len(gw_prioqueue_t *queue)
221 {
222     long len;
223 
224     if (queue == NULL)
225         return 0;
226 
227     queue_lock(queue);
228     len = queue->len - 1;
229     queue_unlock(queue);
230 
231     return len;
232 }
233 
234 
gw_prioqueue_insert(gw_prioqueue_t * queue,void * item)235 void gw_prioqueue_insert(gw_prioqueue_t *queue, void *item)
236 {
237     gw_assert(queue != NULL);
238     gw_assert(item != NULL);
239 
240     queue_lock(queue);
241     make_bigger(queue, 1);
242     queue->tab[queue->len] = gw_malloc(sizeof(**queue->tab));
243     queue->tab[queue->len]->item = item;
244     queue->tab[queue->len]->seq = queue->seq++;
245     upheap(queue, queue->len);
246     queue->len++;
247     pthread_cond_signal(&queue->nonempty);
248     queue_unlock(queue);
249 }
250 
251 
gw_prioqueue_foreach(gw_prioqueue_t * queue,void (* fn)(const void *,long))252 void gw_prioqueue_foreach(gw_prioqueue_t *queue, void(*fn)(const void *, long))
253 {
254     register long i;
255 
256     gw_assert(queue != NULL && fn != NULL);
257 
258     queue_lock(queue);
259     for (i = 1; i < queue->len; i++)
260         fn(queue->tab[i]->item, i - 1);
261     queue_unlock(queue);
262 }
263 
264 
gw_prioqueue_remove(gw_prioqueue_t * queue)265 void *gw_prioqueue_remove(gw_prioqueue_t *queue)
266 {
267     void *ret;
268 
269     gw_assert(queue != NULL);
270 
271     queue_lock(queue);
272     if (queue->len <= 1) {
273         queue_unlock(queue);
274         return NULL;
275     }
276     ret = queue->tab[1]->item;
277     gw_free(queue->tab[1]);
278     queue->tab[1] = queue->tab[--queue->len];
279     downheap(queue, 1);
280     queue_unlock(queue);
281 
282     return ret;
283 }
284 
285 
gw_prioqueue_get(gw_prioqueue_t * queue)286 void *gw_prioqueue_get(gw_prioqueue_t *queue)
287 {
288     void *ret;
289 
290     gw_assert(queue != NULL);
291 
292     queue_lock(queue);
293     if (queue->len > 1)
294         ret = queue->tab[1]->item;
295     else
296         ret = NULL;
297     queue_unlock(queue);
298 
299     return ret;
300 }
301 
302 
gw_prioqueue_consume(gw_prioqueue_t * queue)303 void *gw_prioqueue_consume(gw_prioqueue_t *queue)
304 {
305     void *ret;
306 
307     gw_assert(queue != NULL);
308 
309     queue_lock(queue);
310     while (queue->len == 1 && queue->producers > 0) {
311         queue->mutex->owner = -1;
312         pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, &queue->mutex->mutex);
313         pthread_cond_wait(&queue->nonempty, &queue->mutex->mutex);
314         pthread_cleanup_pop(0);
315         queue->mutex->owner = gwthread_self();
316     }
317     if (queue->len > 1) {
318         ret = queue->tab[1]->item;
319         gw_free(queue->tab[1]);
320         queue->tab[1] = queue->tab[--queue->len];
321         downheap(queue, 1);
322     } else {
323         ret = NULL;
324     }
325     queue_unlock(queue);
326 
327     return ret;
328 }
329 
330 
gw_prioqueue_add_producer(gw_prioqueue_t * queue)331 void gw_prioqueue_add_producer(gw_prioqueue_t *queue)
332 {
333     gw_assert(queue != NULL);
334 
335     queue_lock(queue);
336     queue->producers++;
337     queue_unlock(queue);
338 }
339 
340 
gw_prioqueue_remove_producer(gw_prioqueue_t * queue)341 void gw_prioqueue_remove_producer(gw_prioqueue_t *queue)
342 {
343     gw_assert(queue != NULL);
344 
345     queue_lock(queue);
346     gw_assert(queue->producers > 0);
347     queue->producers--;
348     pthread_cond_broadcast(&queue->nonempty);
349     queue_unlock(queue);
350 }
351 
352 
gw_prioqueue_producer_count(gw_prioqueue_t * queue)353 long gw_prioqueue_producer_count(gw_prioqueue_t *queue)
354 {
355     long ret;
356 
357     gw_assert(queue != NULL);
358 
359     queue_lock(queue);
360     ret = queue->producers;
361     queue_unlock(queue);
362 
363     return ret;
364 }
365 
366