/* Threaded work queue.
 *
 * Contents:
 *    1. Work queue routines
 *    2. Example
 */
7 #include "esl_config.h"
8 
9 #ifdef HAVE_PTHREAD
10 
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <pthread.h>
15 
16 #include "easel.h"
17 #include "esl_workqueue.h"
18 
19 /*****************************************************************
20  *# 1. Work queue routines
21  *****************************************************************/
22 
23 /* Function:  esl_workqueue_Create()
24  * Synopsis:  Create a work queue object.
25  * Incept:    MSF, Thu Jun 18 11:51:39 2009
26  *
27  * Purpose:   Creates an <ESL_WORK_QUEUE> object of <size>.  The
28  *            queues are used to handle objects <void *> that
29  *            are ready to be processed and that have been
30  *            processed by worker threads.
31  *
32  * Returns:   ptr to the new <ESL_WORK_QUEUE> object.
33  *
34  * Throws:    <eslESYS> on allocation or initialization failure.
35  */
36 ESL_WORK_QUEUE *
esl_workqueue_Create(int size)37 esl_workqueue_Create(int size)
38 {
39   int             i;
40   int             status;
41   ESL_WORK_QUEUE *queue = NULL;
42 
43   ESL_ALLOC(queue, sizeof(ESL_WORK_QUEUE));
44 
45   queue->readerQueue     = NULL;
46   queue->readerQueueCnt  = 0;
47   queue->readerQueueHead = 0;
48 
49   queue->workerQueue     = NULL;
50   queue->workerQueueCnt  = 0;
51   queue->workerQueueHead = 0;
52 
53   queue->queueSize       = size;
54   queue->pendingWorkers  = 0;
55 
56   if (pthread_mutex_init(&queue->queueMutex, NULL) != 0)     ESL_XEXCEPTION(eslESYS, "mutex init failed");
57 
58   if (pthread_cond_init(&queue->readerQueueCond, NULL) != 0) ESL_XEXCEPTION(eslESYS, "cond reader init failed");
59   if (pthread_cond_init(&queue->workerQueueCond, NULL) != 0) ESL_XEXCEPTION(eslESYS, "cond worker init failed");
60 
61   ESL_ALLOC(queue->readerQueue, sizeof(void *) * size);
62   ESL_ALLOC(queue->workerQueue, sizeof(void *) * size);
63 
64   for (i = 0; i < queue->queueSize; ++i)
65     {
66       queue->readerQueue[i] = NULL;
67       queue->workerQueue[i] = NULL;
68     }
69 
70   return queue;
71 
72  ERROR:
73   esl_workqueue_Destroy(queue);
74   return NULL;
75 }
76 
77 /* Function:  esl_workqueue_Destroy()
78  * Synopsis:  Destroys an <ESL_WORK_QUEUE> object.
79  * Incept:    MSF, Thu Jun 18 11:51:39 2009
80  *
81  * Purpose:   Frees an <ESL_WORK_QUEUE> object.
82  *
83  *            The calling routine is responsible for freeing the
84  *            memory of the actual queued objects.
85  *
86  * Returns:   void
87  */
88 void
esl_workqueue_Destroy(ESL_WORK_QUEUE * queue)89 esl_workqueue_Destroy(ESL_WORK_QUEUE *queue)
90 {
91   if (queue == NULL) return;
92 
93   pthread_mutex_destroy (&queue->queueMutex);
94   pthread_cond_destroy  (&queue->readerQueueCond);
95   pthread_cond_destroy  (&queue->workerQueueCond);
96 
97   if (queue->readerQueue != NULL) free(queue->readerQueue);
98   if (queue->workerQueue != NULL) free(queue->workerQueue);
99 
100   free(queue);
101 }
102 
103 /* Function:  esl_workqueue_Init()
104  * Synopsis:  Adds a queued object to the producers list.
105  * Incept:    MSF, Thu Jun 18 11:51:39 2009
106  *
107  * Purpose:   Added a work object <void> to the producers list checking for
108  *            any errors.
109  *
110  * Returns:   <eslOK> on success.
111  *
112  * Throws:    <eslESYS> if thread synchronization fails somewhere.
113  *            <eslEINVAL> if something's wrong with <queue> or <ptr>.
114  */
esl_workqueue_Init(ESL_WORK_QUEUE * queue,void * ptr)115 int esl_workqueue_Init(ESL_WORK_QUEUE *queue, void *ptr)
116 {
117   int cnt;
118   int inx;
119 
120   int queueSize;
121 
122   if (queue == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
123   if (ptr == NULL)   ESL_EXCEPTION(eslEINVAL, "Invalid reader object");
124 
125   if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");
126 
127   queueSize = queue->queueSize;
128 
129   /* check to make sure we won't overflow */
130   cnt = queue->readerQueueCnt;
131   if (cnt >= queueSize) ESL_EXCEPTION(eslEINVAL, "Reader queue overflow");
132 
133   inx = (queue->readerQueueHead + cnt) % queueSize;
134   queue->readerQueue[inx] = ptr;
135 
136   ++queue->readerQueueCnt;
137   if (cnt == 0)
138     {
139       if (pthread_cond_signal (&queue->readerQueueCond) != 0) ESL_EXCEPTION(eslESYS, "cond signal failed");
140     }
141 
142   if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
143 
144   return eslOK;
145 }
146 
147 /* Function:  esl_workqueue_Remove()
148  * Synopsis:  Removes a queued object from the producers list.
149  * Incept:    MSF, Thu Jun 18 11:51:39 2009
150  *
151  * Purpose:   Removes a queued object from the producers list.
152  *
153  *            A object <void> that has already been consumed by a worker
154  *            is removed the the producers list.  If there are no empty
155  *            objects, a <obj> is set to NULL.
156  *
157  *            The pointer to the object is returned in the obj arguement.
158  *
159  * Returns:   <eslOK>  on success.
160  *            <eslEOD> if no objects are in the queue.
161  *
162  * Throws:    <eslESYS> if thread synchronization fails somewhere.
163  *            <eslEINVAL> if something's wrong with <queue>.
164  */
165 int
esl_workqueue_Remove(ESL_WORK_QUEUE * queue,void ** obj)166 esl_workqueue_Remove(ESL_WORK_QUEUE *queue, void **obj)
167 {
168   int inx;
169   int status = eslEOD;
170 
171   if (obj == NULL)   ESL_EXCEPTION(eslEINVAL, "Invalid object pointer");
172   if (queue == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
173 
174   if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");
175 
176   /* check if there are any items on the readers list */
177   *obj = NULL;
178   if (queue->readerQueueCnt > 0)
179     {
180       inx = (queue->readerQueueHead + queue->readerQueueCnt) % queue->queueSize;
181       *obj = queue->readerQueue[inx];
182       queue->readerQueue[inx] = NULL;
183       --queue->readerQueueCnt;
184       status = eslOK;
185     }
186 
187   if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
188 
189   return status;
190 }
191 
192 /* Function:  esl_workqueue_Complete()
193  * Synopsis:  Signals the end of the queue.
194  * Incept:    MSF, Thu Jun 18 11:51:39 2009
195  *
196  * Purpose:   Signal the end of the queue.  If there are any threads
197  *            waiting on an object, signal them to wake up and complete
198  *            their processing.
199  *
200  * Returns:   <eslOK> on success.
201  *
202  * Throws:    <eslESYS> if thread synchronization fails somewhere.
203  *            <eslEINVAL> if something's wrong with <queue>.
204  */
205 int
esl_workqueue_Complete(ESL_WORK_QUEUE * queue)206 esl_workqueue_Complete(ESL_WORK_QUEUE *queue)
207 {
208   if (queue == NULL)                                ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
209   if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS,   "mutex lock failed");
210 
211   if (queue->pendingWorkers != 0)
212     {
213       if (pthread_cond_broadcast (&queue->workerQueueCond) != 0) ESL_EXCEPTION(eslESYS, "broadcast failed");
214     }
215 
216   if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
217 
218   return eslOK;
219 }
220 
221 /* Function:  esl_workqueue_Reset()
222  * Synopsis:  Reset the queue for another run.
223  * Incept:    MSF, Thu Jun 18 11:51:39 2009
224  *
225  * Purpose:   Reset the queue for another run.  This is done by moving
226  *            all the queued object to the reader's list (i.e. producer).
227  *
228  * Returns:   <eslOK> on success.
229  *
230  * Throws:    <eslESYS> if thread synchronization fails somewhere.
231  *            <eslEINVAL> if something's wrong with <queue>.
232  */
233 int
esl_workqueue_Reset(ESL_WORK_QUEUE * queue)234 esl_workqueue_Reset(ESL_WORK_QUEUE *queue)
235 {
236   int inx;
237   int queueSize;
238 
239   if (queue == NULL)                                ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
240   if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS,   "mutex lock failed");
241 
242   queueSize = queue->queueSize;
243 
244   /* move all buffers back to the reader queue */
245   while (queue->workerQueueCnt > 0)
246     {
247       inx = (queue->readerQueueHead + queue->readerQueueCnt) % queueSize;
248       queue->readerQueue[inx] = queue->workerQueue[queue->workerQueueHead];
249       ++queue->readerQueueCnt;
250 
251       queue->workerQueue[queue->workerQueueHead] = NULL;
252       queue->workerQueueHead = (queue->workerQueueHead + 1) % queueSize;
253       --queue->workerQueueCnt;
254     }
255 
256   queue->pendingWorkers = 0;
257 
258   if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
259 
260   return eslOK;
261 }
262 
263 /* Function:  esl_workqueue_ReaderUpdate()
264  * Synopsis:  Producer routine.
265  * Incept:    MSF, Thu Jun 18 11:51:39 2009
266  *
267  * Purpose:   The producer (i.e. Reader) places an object, that is
268  *            ready to be processed by a worker on the consumers
269  *            (i.e. Workers) work queue.
270  *
271  *            If the <in> object is not null, it is placed on the
272  *            workers queue.  If there are any workers waiting for
273  *            an object, they are signaled to wake up.
274  *
275  *            If the reader routine has supplied an <out> pointer,
276  *            an object that has already been processed by a worker,
277  *            is placed in <out> so the object can be made ready
278  *            for another worker thread.
279  *
280  * Returns:   <eslOK> on success.
281  *
282  * Throws:    <eslESYS> if thread synchronization fails somewhere.
283  *            <eslEINVAL> if something's wrong with <queue>.
284  */
esl_workqueue_ReaderUpdate(ESL_WORK_QUEUE * queue,void * in,void ** out)285 int esl_workqueue_ReaderUpdate(ESL_WORK_QUEUE *queue, void *in, void **out)
286 {
287   int inx;
288   int queueSize;
289 
290   if (queue == NULL)                                ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
291   if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS,   "mutex lock failed");
292 
293   queueSize = queue->queueSize;
294 
295   /* check if the caller is queuing up an item */
296   if (in != NULL)
297     {
298 
299       /* check to make sure we don't overflow */
300       if (queue->workerQueueCnt >= queueSize) ESL_EXCEPTION(eslEINVAL, "Work queue overflow");
301 
302       inx = (queue->workerQueueHead + queue->workerQueueCnt) % queueSize;
303       queue->workerQueue[inx] = in;
304       ++queue->workerQueueCnt;
305 
306       if (queue->pendingWorkers != 0)
307 	{
308 	  if (pthread_cond_broadcast (&queue->workerQueueCond) != 0) ESL_EXCEPTION(eslESYS, "broadcast failed");
309 	}
310     }
311 
312   /* check if the caller is waiting for a queued item */
313   if (out != NULL)
314     {
315 
316       /* wait for a processed buffers to be returned */
317       while (queue->readerQueueCnt == 0)
318 	{
319 	  if (pthread_cond_wait (&queue->readerQueueCond, &queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "cond wait failed");
320 	}
321 
322       inx = queue->readerQueueHead;
323       *out = queue->readerQueue[inx];
324       queue->readerQueue[inx] = NULL;
325       queue->readerQueueHead = (queue->readerQueueHead + 1) % queueSize;
326       --queue->readerQueueCnt;
327     }
328 
329   if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
330 
331   return eslOK;
332 }
333 
334 /* Function:  esl_workqueue_WorkerUpdate()
335  * Synopsis:  Consumer routine.
336  * Incept:    MSF, Thu Jun 18 11:51:39 2009
337  *
338  * Purpose:   The consumer (i.e. Worker) places an object that has
339  *            been processed on the producers (i.e. Readers) queue.
340  *
341  *            If the <in> object is not null, it is placed on the
342  *            readers queue.  If the reader is waiting for an object,
343  *            it is signaled it to wake up.
344  *
345  *            If the worker routine has supplied an <out> pointer,
346  *            an object that is ready for processing by a worker,
347  *            is placed in <out> so the worker thread can continue.
348  *
349  * Returns:   <eslOK> on success.
350  *
351  * Throws:    <eslESYS> if thread synchronization fails somewhere.
352  *            <eslEINVAL> if something's wrong with <queue>.
353  */
esl_workqueue_WorkerUpdate(ESL_WORK_QUEUE * queue,void * in,void ** out)354 int esl_workqueue_WorkerUpdate(ESL_WORK_QUEUE *queue, void *in, void **out)
355 {
356   int cnt;
357   int inx;
358   int queueSize;
359   int status;
360 
361   if (queue == NULL)                                ESL_XEXCEPTION(eslEINVAL, "Invalid queue object");
362   if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_XEXCEPTION(eslESYS,   "mutex lock failed");
363 
364   queueSize = queue->queueSize;
365 
366   /* check if the caller is queuing up an item */
367   if (in != NULL)
368     {
369 
370       /* check to make sure we don't overflow */
371       if (queue->readerQueueCnt >= queueSize) ESL_XEXCEPTION(eslEINVAL, "Reader queue overflow");
372 
373       inx = (queue->readerQueueHead + queue->readerQueueCnt) % queueSize;
374       queue->readerQueue[inx] = in;
375       cnt = queue->readerQueueCnt++;
376       if (cnt == 0)
377 	{
378 	  if (pthread_cond_signal (&queue->readerQueueCond) != 0) ESL_XEXCEPTION(eslESYS, "cond signal failed");
379 	}
380     }
381 
382   /* check if the caller is waiting for a queued item */
383   if (out != NULL)
384     {
385 
386       if (queue->workerQueueCnt == 0)
387 	{
388 	  /* wait for a processed buffers to be returned */
389 	  ++queue->pendingWorkers;
390 	  while (queue->workerQueueCnt == 0)
391 	    {
392 	      if (pthread_cond_wait (&queue->workerQueueCond, &queue->queueMutex) != 0) ESL_XEXCEPTION(eslESYS, "cond wait failed");
393 	    }
394 	  --queue->pendingWorkers;
395 	}
396 
397       inx = queue->workerQueueHead;
398       *out = queue->workerQueue[inx];
399       queue->workerQueue[inx] = NULL;
400       queue->workerQueueHead = (queue->workerQueueHead + 1) % queueSize;
401       --queue->workerQueueCnt;
402     }
403 
404   if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_XEXCEPTION(eslESYS, "mutex unlock failed");
405   return eslOK;
406 
407  ERROR:
408   if (out) *out = NULL;
409   return status;
410 }
411 
412 /* Function:  esl_workqueue_Dump()
413  * Synopsis:  Print the contents of the queues.
414  * Incept:    MSF, Thu Jun 18 11:51:39 2009
415  *
416  * Purpose:   Print the contents of the queues and their pointers.
417  *
418  * Returns:   <eslOK> on success.
419  */
esl_workqueue_Dump(ESL_WORK_QUEUE * queue)420 int esl_workqueue_Dump(ESL_WORK_QUEUE *queue)
421 {
422   int i;
423 
424   if (queue == NULL)                                ESL_EXCEPTION(eslEINVAL, "Invalid queue object");
425   if (pthread_mutex_lock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS,   "mutex lock failed");
426 
427   printf ("Reader head: %2d  count: %2d\n", queue->readerQueueHead, queue->readerQueueCnt);
428   printf ("Worker head: %2d  count: %2d\n", queue->workerQueueHead, queue->workerQueueCnt);
429   for (i = 0; i < queue->queueSize; ++i)
430     {
431       printf ("  %2d:  %p  %p\n", i, queue->readerQueue[i], queue->workerQueue[i]);
432     }
433   printf ("Pending: %2d\n\n", queue->pendingWorkers);
434 
435   if (pthread_mutex_unlock (&queue->queueMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
436 
437   return eslOK;
438 }
439 
440 /*****************************************************************
441  * 2. Example
442  *****************************************************************/
443 
444 #ifdef eslWORKQUEUE_EXAMPLE
445 #include "easel.h"
446 #include "esl_threads.h"
447 #include "esl_workqueue.h"
448 
449 typedef struct {
450   char             id;
451   ESL_WORK_QUEUE  *queue;
452 } WORK_INFO;
453 
454 /* gcc --std=gnu99 -g -Wall -pthread -o esl_workqueue_example -I. -DeslWORKQUEUE_EXAMPLE esl_workqueue.c easel.c */
455 static void
worker_thread(void * data)456 worker_thread(void *data)
457 {
458   ESL_THREADS *thr  = (ESL_THREADS *) data;
459   WORK_INFO   *info = NULL;
460   int          idx;
461 
462   int         *obj;
463 
464   esl_threads_Started(thr, &idx);
465 
466   info = (WORK_INFO *) esl_threads_GetData(thr, idx);
467   printf("THREAD %c: ready\n", info->id);
468 
469   esl_workqueue_WorkerUpdate(info->queue, NULL, (void *) &obj);
470   while (*obj > 0)
471     {
472       printf("THREAD %c: processing %d\n", info->id, *obj);
473       esl_workqueue_WorkerUpdate(info->queue, obj, (void *) &obj);
474     }
475 
476   printf("THREAD %c: done\n", info->id);
477 
478   esl_threads_Finished(thr, idx);
479   return;
480 }
481 
482 int
main(void)483 main(void)
484 {
485   int            i;
486   int            ncpu    = 4;
487   int            iter    = 25;
488   WORK_INFO     *worker  = NULL;
489 
490   ESL_THREADS    *thr    = NULL;
491   ESL_WORK_QUEUE *queue  = NULL;
492 
493   int            *objs   = NULL;
494   int            *obj;
495 
496   objs   = malloc(sizeof(int) * ncpu * 2);
497   worker = malloc(sizeof(WORK_INFO) * ncpu);
498 
499   thr = esl_threads_Create(&worker_thread);
500 
501   /* Create a work queue that is able to hold two items per thread.
502    * The idea is that while one object is being processed by a
503    * worker thread, another item is being readied.  So, when the
504    * worker thread has completed processing its current object,
505    * its next object to processes is hopefully waiting.
506    */
507   queue = esl_workqueue_Create(ncpu * 2);
508   for (i = 0; i < ncpu * 2; i++)
509     {
510       objs[i] = 0;
511       esl_workqueue_Init(queue, &objs[i]);
512     }
513 
514   for (i = 0; i < ncpu; i++)
515     {
516       worker[i].id    = 'A' + i;
517       worker[i].queue = queue;
518       esl_threads_AddThread(thr, (void *) &worker[i]);
519     }
520 
521   esl_threads_WaitForStart (thr);
522 
523   /* For N number of iterations, get an object that has been
524    * processed, i.e. on the readers input queue and place it
525    * on the ready queue.
526    */
527   esl_workqueue_ReaderUpdate(queue, NULL, (void **) &obj);
528   for (i = 1; i <= iter; ++i)
529     {
530       *obj = i;
531       printf("Item %d is ready to be processed\n", *obj);
532       esl_workqueue_ReaderUpdate(queue, obj, (void **) &obj);
533     }
534 
535   /* put zeros on the queues to signal the worker that we are done */
536   for (i = 0; i < ncpu; ++i)
537     {
538       *obj = 0;
539       esl_workqueue_ReaderUpdate(queue, obj, (void **) &obj);
540     }
541 
542   /* The worker threads now run their work. */
543   esl_threads_WaitForFinish(thr);
544   esl_threads_Destroy(thr);
545 
546   esl_workqueue_Destroy(queue);
547 
548   free(worker);
549   free(objs);
550 
551   return eslOK;
552 }
553 #endif /*eslWORKQUEUE_EXAMPLE*/
554 #endif /* HAVE_PTHREAD */
555 
556 
557 
558