1 /* Threaded work queue.
2 *
3 * Contents:
4 * 1. Work queue routines
5 * 2. Examples.
6 */
7 #include "esl_config.h"
8
9 #ifdef HAVE_PTHREAD
10
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <pthread.h>
15
16 #include "easel.h"
17 #include "esl_workqueue.h"
18
19 /*****************************************************************
20 *# 1. Work queue routines
21 *****************************************************************/
22
23 /* Function: esl_workqueue_Create()
24 * Synopsis: Create a work queue object.
25 * Incept: MSF, Thu Jun 18 11:51:39 2009
26 *
27 * Purpose: Creates an <ESL_WORK_QUEUE> object of <size>. The
28 * queues are used to handle objects <void *> that
29 * are ready to be processed and that have been
30 * processed by worker threads.
31 *
32 * Returns: ptr to the new <ESL_WORK_QUEUE> object.
33 *
34 * Throws: <eslESYS> on allocation or initialization failure.
35 */
36 ESL_WORK_QUEUE *
esl_workqueue_Create(int size)37 esl_workqueue_Create(int size)
38 {
39 int i;
40 int status;
41 ESL_WORK_QUEUE *queue = NULL;
42
43 ESL_ALLOC(queue, sizeof(ESL_WORK_QUEUE));
44
45 queue->readerQueue = NULL;
46 queue->readerQueueCnt = 0;
47 queue->readerQueueHead = 0;
48
49 queue->workerQueue = NULL;
50 queue->workerQueueCnt = 0;
51 queue->workerQueueHead = 0;
52
53 queue->queueSize = size;
54 queue->pendingWorkers = 0;
55
56 if (pthread_mutex_init(&queue->queueMutex, NULL) != 0) ESL_XEXCEPTION(eslESYS, "mutex init failed");
57
58 if (pthread_cond_init(&queue->readerQueueCond, NULL) != 0) ESL_XEXCEPTION(eslESYS, "cond reader init failed");
59 if (pthread_cond_init(&queue->workerQueueCond, NULL) != 0) ESL_XEXCEPTION(eslESYS, "cond worker init failed");
60
61 ESL_ALLOC(queue->readerQueue, sizeof(void *) * size);
62 ESL_ALLOC(queue->workerQueue, sizeof(void *) * size);
63
64 for (i = 0; i < queue->queueSize; ++i)
65 {
66 queue->readerQueue[i] = NULL;
67 queue->workerQueue[i] = NULL;
68 }
69
70 return queue;
71
72 ERROR:
73 esl_workqueue_Destroy(queue);
74 return NULL;
75 }
76
77 /* Function: esl_workqueue_Destroy()
78 * Synopsis: Destroys an <ESL_WORK_QUEUE> object.
79 * Incept: MSF, Thu Jun 18 11:51:39 2009
80 *
81 * Purpose: Frees an <ESL_WORK_QUEUE> object.
82 *
83 * The calling routine is responsible for freeing the
84 * memory of the actual queued objects.
85 *
86 * Returns: void
87 */
88 void
esl_workqueue_Destroy(ESL_WORK_QUEUE * queue)89 esl_workqueue_Destroy(ESL_WORK_QUEUE *queue)
90 {
91 if (queue == NULL) return;
92
93 pthread_mutex_destroy (&queue->queueMutex);
94 pthread_cond_destroy (&queue->readerQueueCond);
95 pthread_cond_destroy (&queue->workerQueueCond);
96
97 if (queue->readerQueue != NULL) free(queue->readerQueue);
98 if (queue->workerQueue != NULL) free(queue->workerQueue);
99
100 free(queue);
101 }
102
103 /* Function: esl_workqueue_Init()
104 * Synopsis: Adds a queued object to the producers list.
105 * Incept: MSF, Thu Jun 18 11:51:39 2009
106 *
107 * Purpose: Added a work object <void> to the producers list checking for
108 * any errors.
109 *
110 * Returns: <eslOK> on success.
111 *
112 * Throws: <eslESYS> if thread synchronization fails somewhere.
113 * <eslEINVAL> if something's wrong with <queue> or <ptr>.
114 */
esl_workqueue_Init(ESL_WORK_QUEUE * queue,void * ptr)115 int esl_workqueue_Init(ESL_WORK_QUEUE *queue, void *ptr)
116 {
117 int cnt;
118 int inx;
119
120 int queueSize;
121
122 if (queue == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
123 if (ptr == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid reader object");
124
125 if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");
126
127 queueSize = queue->queueSize;
128
129 /* check to make sure we won't overflow */
130 cnt = queue->readerQueueCnt;
131 if (cnt >= queueSize) ESL_EXCEPTION(eslEINVAL, "Reader queue overflow");
132
133 inx = (queue->readerQueueHead + cnt) % queueSize;
134 queue->readerQueue[inx] = ptr;
135
136 ++queue->readerQueueCnt;
137 if (cnt == 0)
138 {
139 if (pthread_cond_signal (&queue->readerQueueCond) != 0) ESL_EXCEPTION(eslESYS, "cond signal failed");
140 }
141
142 if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
143
144 return eslOK;
145 }
146
147 /* Function: esl_workqueue_Remove()
148 * Synopsis: Removes a queued object from the producers list.
149 * Incept: MSF, Thu Jun 18 11:51:39 2009
150 *
151 * Purpose: Removes a queued object from the producers list.
152 *
153 * A object <void> that has already been consumed by a worker
154 * is removed the the producers list. If there are no empty
155 * objects, a <obj> is set to NULL.
156 *
157 * The pointer to the object is returned in the obj arguement.
158 *
159 * Returns: <eslOK> on success.
160 * <eslEOD> if no objects are in the queue.
161 *
162 * Throws: <eslESYS> if thread synchronization fails somewhere.
163 * <eslEINVAL> if something's wrong with <queue>.
164 */
165 int
esl_workqueue_Remove(ESL_WORK_QUEUE * queue,void ** obj)166 esl_workqueue_Remove(ESL_WORK_QUEUE *queue, void **obj)
167 {
168 int inx;
169 int status = eslEOD;
170
171 if (obj == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid object pointer");
172 if (queue == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
173
174 if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");
175
176 /* check if there are any items on the readers list */
177 *obj = NULL;
178 if (queue->readerQueueCnt > 0)
179 {
180 inx = (queue->readerQueueHead + queue->readerQueueCnt) % queue->queueSize;
181 *obj = queue->readerQueue[inx];
182 queue->readerQueue[inx] = NULL;
183 --queue->readerQueueCnt;
184 status = eslOK;
185 }
186
187 if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
188
189 return status;
190 }
191
192 /* Function: esl_workqueue_Complete()
193 * Synopsis: Signals the end of the queue.
194 * Incept: MSF, Thu Jun 18 11:51:39 2009
195 *
196 * Purpose: Signal the end of the queue. If there are any threads
197 * waiting on an object, signal them to wake up and complete
198 * their processing.
199 *
200 * Returns: <eslOK> on success.
201 *
202 * Throws: <eslESYS> if thread synchronization fails somewhere.
203 * <eslEINVAL> if something's wrong with <queue>.
204 */
205 int
esl_workqueue_Complete(ESL_WORK_QUEUE * queue)206 esl_workqueue_Complete(ESL_WORK_QUEUE *queue)
207 {
208 if (queue == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
209 if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");
210
211 if (queue->pendingWorkers != 0)
212 {
213 if (pthread_cond_broadcast (&queue->workerQueueCond) != 0) ESL_EXCEPTION(eslESYS, "broadcast failed");
214 }
215
216 if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
217
218 return eslOK;
219 }
220
221 /* Function: esl_workqueue_Reset()
222 * Synopsis: Reset the queue for another run.
223 * Incept: MSF, Thu Jun 18 11:51:39 2009
224 *
225 * Purpose: Reset the queue for another run. This is done by moving
226 * all the queued object to the reader's list (i.e. producer).
227 *
228 * Returns: <eslOK> on success.
229 *
230 * Throws: <eslESYS> if thread synchronization fails somewhere.
231 * <eslEINVAL> if something's wrong with <queue>.
232 */
233 int
esl_workqueue_Reset(ESL_WORK_QUEUE * queue)234 esl_workqueue_Reset(ESL_WORK_QUEUE *queue)
235 {
236 int inx;
237 int queueSize;
238
239 if (queue == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
240 if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");
241
242 queueSize = queue->queueSize;
243
244 /* move all buffers back to the reader queue */
245 while (queue->workerQueueCnt > 0)
246 {
247 inx = (queue->readerQueueHead + queue->readerQueueCnt) % queueSize;
248 queue->readerQueue[inx] = queue->workerQueue[queue->workerQueueHead];
249 ++queue->readerQueueCnt;
250
251 queue->workerQueue[queue->workerQueueHead] = NULL;
252 queue->workerQueueHead = (queue->workerQueueHead + 1) % queueSize;
253 --queue->workerQueueCnt;
254 }
255
256 queue->pendingWorkers = 0;
257
258 if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
259
260 return eslOK;
261 }
262
263 /* Function: esl_workqueue_ReaderUpdate()
264 * Synopsis: Producer routine.
265 * Incept: MSF, Thu Jun 18 11:51:39 2009
266 *
267 * Purpose: The producer (i.e. Reader) places an object, that is
268 * ready to be processed by a worker on the consumers
269 * (i.e. Workers) work queue.
270 *
271 * If the <in> object is not null, it is placed on the
272 * workers queue. If there are any workers waiting for
273 * an object, they are signaled to wake up.
274 *
275 * If the reader routine has supplied an <out> pointer,
276 * an object that has already been processed by a worker,
277 * is placed in <out> so the object can be made ready
278 * for another worker thread.
279 *
280 * Returns: <eslOK> on success.
281 *
282 * Throws: <eslESYS> if thread synchronization fails somewhere.
283 * <eslEINVAL> if something's wrong with <queue>.
284 */
esl_workqueue_ReaderUpdate(ESL_WORK_QUEUE * queue,void * in,void ** out)285 int esl_workqueue_ReaderUpdate(ESL_WORK_QUEUE *queue, void *in, void **out)
286 {
287 int inx;
288 int queueSize;
289
290 if (queue == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
291 if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");
292
293 queueSize = queue->queueSize;
294
295 /* check if the caller is queuing up an item */
296 if (in != NULL)
297 {
298
299 /* check to make sure we don't overflow */
300 if (queue->workerQueueCnt >= queueSize) ESL_EXCEPTION(eslEINVAL, "Work queue overflow");
301
302 inx = (queue->workerQueueHead + queue->workerQueueCnt) % queueSize;
303 queue->workerQueue[inx] = in;
304 ++queue->workerQueueCnt;
305
306 if (queue->pendingWorkers != 0)
307 {
308 if (pthread_cond_broadcast (&queue->workerQueueCond) != 0) ESL_EXCEPTION(eslESYS, "broadcast failed");
309 }
310 }
311
312 /* check if the caller is waiting for a queued item */
313 if (out != NULL)
314 {
315
316 /* wait for a processed buffers to be returned */
317 while (queue->readerQueueCnt == 0)
318 {
319 if (pthread_cond_wait (&queue->readerQueueCond, &queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "cond wait failed");
320 }
321
322 inx = queue->readerQueueHead;
323 *out = queue->readerQueue[inx];
324 queue->readerQueue[inx] = NULL;
325 queue->readerQueueHead = (queue->readerQueueHead + 1) % queueSize;
326 --queue->readerQueueCnt;
327 }
328
329 if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
330
331 return eslOK;
332 }
333
334 /* Function: esl_workqueue_WorkerUpdate()
335 * Synopsis: Consumer routine.
336 * Incept: MSF, Thu Jun 18 11:51:39 2009
337 *
338 * Purpose: The consumer (i.e. Worker) places an object that has
339 * been processed on the producers (i.e. Readers) queue.
340 *
341 * If the <in> object is not null, it is placed on the
342 * readers queue. If the reader is waiting for an object,
343 * it is signaled it to wake up.
344 *
345 * If the worker routine has supplied an <out> pointer,
346 * an object that is ready for processing by a worker,
347 * is placed in <out> so the worker thread can continue.
348 *
349 * Returns: <eslOK> on success.
350 *
351 * Throws: <eslESYS> if thread synchronization fails somewhere.
352 * <eslEINVAL> if something's wrong with <queue>.
353 */
esl_workqueue_WorkerUpdate(ESL_WORK_QUEUE * queue,void * in,void ** out)354 int esl_workqueue_WorkerUpdate(ESL_WORK_QUEUE *queue, void *in, void **out)
355 {
356 int cnt;
357 int inx;
358 int queueSize;
359 int status;
360
361 if (queue == NULL) ESL_XEXCEPTION(eslEINVAL, "Invalid queue object");
362 if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_XEXCEPTION(eslESYS, "mutex lock failed");
363
364 queueSize = queue->queueSize;
365
366 /* check if the caller is queuing up an item */
367 if (in != NULL)
368 {
369
370 /* check to make sure we don't overflow */
371 if (queue->readerQueueCnt >= queueSize) ESL_XEXCEPTION(eslEINVAL, "Reader queue overflow");
372
373 inx = (queue->readerQueueHead + queue->readerQueueCnt) % queueSize;
374 queue->readerQueue[inx] = in;
375 cnt = queue->readerQueueCnt++;
376 if (cnt == 0)
377 {
378 if (pthread_cond_signal (&queue->readerQueueCond) != 0) ESL_XEXCEPTION(eslESYS, "cond signal failed");
379 }
380 }
381
382 /* check if the caller is waiting for a queued item */
383 if (out != NULL)
384 {
385
386 if (queue->workerQueueCnt == 0)
387 {
388 /* wait for a processed buffers to be returned */
389 ++queue->pendingWorkers;
390 while (queue->workerQueueCnt == 0)
391 {
392 if (pthread_cond_wait (&queue->workerQueueCond, &queue->queueMutex) != 0) ESL_XEXCEPTION(eslESYS, "cond wait failed");
393 }
394 --queue->pendingWorkers;
395 }
396
397 inx = queue->workerQueueHead;
398 *out = queue->workerQueue[inx];
399 queue->workerQueue[inx] = NULL;
400 queue->workerQueueHead = (queue->workerQueueHead + 1) % queueSize;
401 --queue->workerQueueCnt;
402 }
403
404 if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_XEXCEPTION(eslESYS, "mutex unlock failed");
405 return eslOK;
406
407 ERROR:
408 if (out) *out = NULL;
409 return status;
410 }
411
412 /* Function: esl_workqueue_Dump()
413 * Synopsis: Print the contents of the queues.
414 * Incept: MSF, Thu Jun 18 11:51:39 2009
415 *
416 * Purpose: Print the contents of the queues and their pointers.
417 *
418 * Returns: <eslOK> on success.
419 */
esl_workqueue_Dump(ESL_WORK_QUEUE * queue)420 int esl_workqueue_Dump(ESL_WORK_QUEUE *queue)
421 {
422 int i;
423
424 if (queue == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
425 if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");
426
427 printf ("Reader head: %2d count: %2d\n", queue->readerQueueHead, queue->readerQueueCnt);
428 printf ("Worker head: %2d count: %2d\n", queue->workerQueueHead, queue->workerQueueCnt);
429 for (i = 0; i < queue->queueSize; ++i)
430 {
431 printf (" %2d: %p %p\n", i, queue->readerQueue[i], queue->workerQueue[i]);
432 }
433 printf ("Pending: %2d\n\n", queue->pendingWorkers);
434
435 if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
436
437 return eslOK;
438 }
439
440 /*****************************************************************
441 * 2. Example
442 *****************************************************************/
443
444 #ifdef eslWORKQUEUE_EXAMPLE
445 #include "easel.h"
446 #include "esl_threads.h"
447 #include "esl_workqueue.h"
448
449 typedef struct {
450 char id;
451 ESL_WORK_QUEUE *queue;
452 } WORK_INFO;
453
454 /* gcc --std=gnu99 -g -Wall -pthread -o esl_workqueue_example -I. -DeslWORKQUEUE_EXAMPLE esl_workqueue.c easel.c */
455 static void
worker_thread(void * data)456 worker_thread(void *data)
457 {
458 ESL_THREADS *thr = (ESL_THREADS *) data;
459 WORK_INFO *info = NULL;
460 int idx;
461
462 int *obj;
463
464 esl_threads_Started(thr, &idx);
465
466 info = (WORK_INFO *) esl_threads_GetData(thr, idx);
467 printf("THREAD %c: ready\n", info->id);
468
469 esl_workqueue_WorkerUpdate(info->queue, NULL, (void *) &obj);
470 while (*obj > 0)
471 {
472 printf("THREAD %c: processing %d\n", info->id, *obj);
473 esl_workqueue_WorkerUpdate(info->queue, obj, (void *) &obj);
474 }
475
476 printf("THREAD %c: done\n", info->id);
477
478 esl_threads_Finished(thr, idx);
479 return;
480 }
481
482 int
main(void)483 main(void)
484 {
485 int i;
486 int ncpu = 4;
487 int iter = 25;
488 WORK_INFO *worker = NULL;
489
490 ESL_THREADS *thr = NULL;
491 ESL_WORK_QUEUE *queue = NULL;
492
493 int *objs = NULL;
494 int *obj;
495
496 objs = malloc(sizeof(int) * ncpu * 2);
497 worker = malloc(sizeof(WORK_INFO) * ncpu);
498
499 thr = esl_threads_Create(&worker_thread);
500
501 /* Create a work queue that is able to hold two items per thread.
502 * The idea is that while one object is being processed by a
503 * worker thread, another item is being readied. So, when the
504 * worker thread has completed processing its current object,
505 * its next object to processes is hopefully waiting.
506 */
507 queue = esl_workqueue_Create(ncpu * 2);
508 for (i = 0; i < ncpu * 2; i++)
509 {
510 objs[i] = 0;
511 esl_workqueue_Init(queue, &objs[i]);
512 }
513
514 for (i = 0; i < ncpu; i++)
515 {
516 worker[i].id = 'A' + i;
517 worker[i].queue = queue;
518 esl_threads_AddThread(thr, (void *) &worker[i]);
519 }
520
521 esl_threads_WaitForStart (thr);
522
523 /* For N number of iterations, get an object that has been
524 * processed, i.e. on the readers input queue and place it
525 * on the ready queue.
526 */
527 esl_workqueue_ReaderUpdate(queue, NULL, (void **) &obj);
528 for (i = 1; i <= iter; ++i)
529 {
530 *obj = i;
531 printf("Item %d is ready to be processed\n", *obj);
532 esl_workqueue_ReaderUpdate(queue, obj, (void **) &obj);
533 }
534
535 /* put zeros on the queues to signal the worker that we are done */
536 for (i = 0; i < ncpu; ++i)
537 {
538 *obj = 0;
539 esl_workqueue_ReaderUpdate(queue, obj, (void **) &obj);
540 }
541
542 /* The worker threads now run their work. */
543 esl_threads_WaitForFinish(thr);
544 esl_threads_Destroy(thr);
545
546 esl_workqueue_Destroy(queue);
547
548 free(worker);
549 free(objs);
550
551 return eslOK;
552 }
553 #endif /*eslWORKQUEUE_EXAMPLE*/
554 #endif /* HAVE_PTHREAD */
555
556
557
558