1 /* ====================================================================
2 * The Kannel Software License, Version 1.0
3 *
4 * Copyright (c) 2001-2014 Kannel Group
5 * Copyright (c) 1998-2001 WapIT Ltd.
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in
17 * the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * 3. The end-user documentation included with the redistribution,
21 * if any, must include the following acknowledgment:
22 * "This product includes software developed by the
23 * Kannel Group (http://www.kannel.org/)."
24 * Alternately, this acknowledgment may appear in the software itself,
25 * if and wherever such third-party acknowledgments normally appear.
26 *
27 * 4. The names "Kannel" and "Kannel Group" must not be used to
28 * endorse or promote products derived from this software without
29 * prior written permission. For written permission, please
30 * contact org@kannel.org.
31 *
32 * 5. Products derived from this software may not be called "Kannel",
33 * nor may "Kannel" appear in their name, without prior written
34 * permission of the Kannel Group.
35 *
36 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39 * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47 * ====================================================================
48 *
49 * This software consists of voluntary contributions made by many
50 * individuals on behalf of the Kannel Group. For more information on
51 * the Kannel Group, please see <http://www.kannel.org/>.
52 *
53 * Portions of this software are based upon software originally written at
54 * WapIT Ltd., Helsinki, Finland for the Kannel project.
55 */
56
57 /*
58 * gw-prioqueue.c - generic priority queue with guaranteed order.
59 *
60 * Algorithm ala Robert Sedgewick.
61 *
62 * Alexander Malysh <amalysh at kannel.org>, 2004, 2008
63 */
64
65 #include "gw-config.h"
66 #include <pthread.h>
67 #include "thread.h"
68 #include "gwmem.h"
69 #include "gwassert.h"
70 #include "gwthread.h"
71 #include "gw-prioqueue.h"
72
73
74 struct element {
75 void *item;
76 long long seq;
77 };
78
79 struct gw_prioqueue {
80 Mutex *mutex;
81 struct element **tab;
82 size_t size;
83 long len;
84 long producers;
85 long long seq;
86 pthread_cond_t nonempty;
87 int (*cmp)(const void*, const void *);
88 };
89
90
queue_lock(gw_prioqueue_t * queue)91 static void inline queue_lock(gw_prioqueue_t *queue)
92 {
93 mutex_lock(queue->mutex);
94 }
95
96
queue_unlock(gw_prioqueue_t * queue)97 static void inline queue_unlock(gw_prioqueue_t *queue)
98 {
99 mutex_unlock(queue->mutex);
100 }
101
102
make_bigger(gw_prioqueue_t * queue,long items)103 static void make_bigger(gw_prioqueue_t *queue, long items)
104 {
105 size_t size = queue->size;
106 size_t new_size = sizeof(*queue->tab) * (queue->len + items);
107
108 if (size >= new_size)
109 return;
110
111 queue->tab = gw_realloc(queue->tab, new_size);
112 queue->size = new_size;
113 }
114
115
compare(struct element * a,struct element * b,int (* cmp)(const void *,const void *))116 static int compare(struct element *a, struct element *b, int(*cmp)(const void*, const void *))
117 {
118 int rc;
119
120 rc = cmp(a->item, b->item);
121 if (rc == 0) {
122 /* check sequence to guarantee order */
123 if (a->seq < b->seq)
124 rc = 1;
125 else if (a->seq > b->seq)
126 rc = -1;
127 }
128
129 return rc;
130 }
131
132
133 /**
134 * Heapize up
135 * @queue - our prioqueue
136 + @index - start index
137 */
upheap(gw_prioqueue_t * queue,register long index)138 static void upheap(gw_prioqueue_t *queue, register long index)
139 {
140 struct element *v = queue->tab[index];
141 while (queue->tab[index / 2]->item != NULL && compare(queue->tab[index / 2], v, queue->cmp) < 0) {
142 queue->tab[index] = queue->tab[index / 2];
143 index /= 2;
144 }
145 queue->tab[index] = v;
146 }
147
148
149 /**
150 * Heapize down
151 * @queue - our prioqueue
152 * @index - start index
153 */
downheap(gw_prioqueue_t * queue,register long index)154 static void downheap(gw_prioqueue_t *queue, register long index)
155 {
156 struct element *v = queue->tab[index];
157 register long j;
158
159 while (index <= queue->len / 2) {
160 j = 2 * index;
161 /* take the biggest child item */
162 if (j < queue->len && compare(queue->tab[j], queue->tab[j + 1], queue->cmp) < 0)
163 j++;
164 /* break if our item bigger */
165 if (compare(v, queue->tab[j], queue->cmp) >= 0)
166 break;
167 queue->tab[index] = queue->tab[j];
168 index = j;
169 }
170 queue->tab[index] = v;
171 }
172
173
gw_prioqueue_create(int (* cmp)(const void *,const void *))174 gw_prioqueue_t *gw_prioqueue_create(int(*cmp)(const void*, const void *))
175 {
176 gw_prioqueue_t *ret;
177
178 gw_assert(cmp != NULL);
179
180 ret = gw_malloc(sizeof(*ret));
181 ret->producers = 0;
182 pthread_cond_init(&ret->nonempty, NULL);
183 ret->mutex = mutex_create();
184 ret->tab = NULL;
185 ret->size = 0;
186 ret->len = 0;
187 ret->seq = 0;
188 ret->cmp = cmp;
189
190 /* put NULL item at pos 0 that is our stop marker */
191 make_bigger(ret, 1);
192 ret->tab[0] = gw_malloc(sizeof(**ret->tab));
193 ret->tab[0]->item = NULL;
194 ret->tab[0]->seq = ret->seq++;
195 ret->len++;
196
197 return ret;
198 }
199
200
gw_prioqueue_destroy(gw_prioqueue_t * queue,void (* item_destroy)(void *))201 void gw_prioqueue_destroy(gw_prioqueue_t *queue, void(*item_destroy)(void*))
202 {
203 long i;
204
205 if (queue == NULL)
206 return;
207
208 for (i = 0; i < queue->len; i++) {
209 if (item_destroy != NULL && queue->tab[i]->item != NULL)
210 item_destroy(queue->tab[i]->item);
211 gw_free(queue->tab[i]);
212 }
213 mutex_destroy(queue->mutex);
214 pthread_cond_destroy(&queue->nonempty);
215 gw_free(queue->tab);
216 gw_free(queue);
217 }
218
219
gw_prioqueue_len(gw_prioqueue_t * queue)220 long gw_prioqueue_len(gw_prioqueue_t *queue)
221 {
222 long len;
223
224 if (queue == NULL)
225 return 0;
226
227 queue_lock(queue);
228 len = queue->len - 1;
229 queue_unlock(queue);
230
231 return len;
232 }
233
234
gw_prioqueue_insert(gw_prioqueue_t * queue,void * item)235 void gw_prioqueue_insert(gw_prioqueue_t *queue, void *item)
236 {
237 gw_assert(queue != NULL);
238 gw_assert(item != NULL);
239
240 queue_lock(queue);
241 make_bigger(queue, 1);
242 queue->tab[queue->len] = gw_malloc(sizeof(**queue->tab));
243 queue->tab[queue->len]->item = item;
244 queue->tab[queue->len]->seq = queue->seq++;
245 upheap(queue, queue->len);
246 queue->len++;
247 pthread_cond_signal(&queue->nonempty);
248 queue_unlock(queue);
249 }
250
251
gw_prioqueue_foreach(gw_prioqueue_t * queue,void (* fn)(const void *,long))252 void gw_prioqueue_foreach(gw_prioqueue_t *queue, void(*fn)(const void *, long))
253 {
254 register long i;
255
256 gw_assert(queue != NULL && fn != NULL);
257
258 queue_lock(queue);
259 for (i = 1; i < queue->len; i++)
260 fn(queue->tab[i]->item, i - 1);
261 queue_unlock(queue);
262 }
263
264
gw_prioqueue_remove(gw_prioqueue_t * queue)265 void *gw_prioqueue_remove(gw_prioqueue_t *queue)
266 {
267 void *ret;
268
269 gw_assert(queue != NULL);
270
271 queue_lock(queue);
272 if (queue->len <= 1) {
273 queue_unlock(queue);
274 return NULL;
275 }
276 ret = queue->tab[1]->item;
277 gw_free(queue->tab[1]);
278 queue->tab[1] = queue->tab[--queue->len];
279 downheap(queue, 1);
280 queue_unlock(queue);
281
282 return ret;
283 }
284
285
gw_prioqueue_get(gw_prioqueue_t * queue)286 void *gw_prioqueue_get(gw_prioqueue_t *queue)
287 {
288 void *ret;
289
290 gw_assert(queue != NULL);
291
292 queue_lock(queue);
293 if (queue->len > 1)
294 ret = queue->tab[1]->item;
295 else
296 ret = NULL;
297 queue_unlock(queue);
298
299 return ret;
300 }
301
302
gw_prioqueue_consume(gw_prioqueue_t * queue)303 void *gw_prioqueue_consume(gw_prioqueue_t *queue)
304 {
305 void *ret;
306
307 gw_assert(queue != NULL);
308
309 queue_lock(queue);
310 while (queue->len == 1 && queue->producers > 0) {
311 queue->mutex->owner = -1;
312 pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, &queue->mutex->mutex);
313 pthread_cond_wait(&queue->nonempty, &queue->mutex->mutex);
314 pthread_cleanup_pop(0);
315 queue->mutex->owner = gwthread_self();
316 }
317 if (queue->len > 1) {
318 ret = queue->tab[1]->item;
319 gw_free(queue->tab[1]);
320 queue->tab[1] = queue->tab[--queue->len];
321 downheap(queue, 1);
322 } else {
323 ret = NULL;
324 }
325 queue_unlock(queue);
326
327 return ret;
328 }
329
330
gw_prioqueue_add_producer(gw_prioqueue_t * queue)331 void gw_prioqueue_add_producer(gw_prioqueue_t *queue)
332 {
333 gw_assert(queue != NULL);
334
335 queue_lock(queue);
336 queue->producers++;
337 queue_unlock(queue);
338 }
339
340
gw_prioqueue_remove_producer(gw_prioqueue_t * queue)341 void gw_prioqueue_remove_producer(gw_prioqueue_t *queue)
342 {
343 gw_assert(queue != NULL);
344
345 queue_lock(queue);
346 gw_assert(queue->producers > 0);
347 queue->producers--;
348 pthread_cond_broadcast(&queue->nonempty);
349 queue_unlock(queue);
350 }
351
352
gw_prioqueue_producer_count(gw_prioqueue_t * queue)353 long gw_prioqueue_producer_count(gw_prioqueue_t *queue)
354 {
355 long ret;
356
357 gw_assert(queue != NULL);
358
359 queue_lock(queue);
360 ret = queue->producers;
361 queue_unlock(queue);
362
363 return ret;
364 }
365
366