1 /* Simple master/worker data parallelization using POSIX threads.
2  *
3  * Contents:
4  *    1. The <ESL_THREADS> object: a gang of workers.
5  *    2. Determining thread number to use.
6  *    3. Examples.
7  */
8 #include "esl_config.h"
9 
10 #ifdef HAVE_PTHREAD
11 
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <pthread.h>
16 
17 #ifdef HAVE_UNISTD_H
18 #include <unistd.h>
19 #endif
20 #ifdef HAVE_SYS_PARAM_H		/* On OpenBSD, sys/sysctl.h requires sys/param.h */
21 #include <sys/param.h>
22 #endif
23 #ifdef HAVE_SYS_SYSCTL_H
24 #include <sys/sysctl.h>
25 #endif
26 #ifdef HAVE_SYS_TYPES_H
27 #include <sys/types.h>
28 #endif
29 
30 #include "easel.h"
31 #include "esl_threads.h"
32 
33 
34 /*****************************************************************
35  *# 1. The <ESL_THREADS> object: a gang of workers.
36  *****************************************************************/
37 
38 /* Function:  esl_threads_Create()
39  * Synopsis:  Create a threads object.
40  * Incept:    MSF, Thu Jun 18 11:51:39 2009
41  *
42  * Purpose:   Creates an <ESL_THREADS> object, for organizing
43  8            a bunch of worker threads that will all run
44  *            the worker function <fnptr>. This object is a shell
45  *            for now; the worker threads themselves are
46  *            created individually with <esl_threads_AddThread()>.
47  *
48  * Returns:   ptr to the new <ESL_THREADS> object.
49  *
50  * Throws:    <NULL> on allocation or initialization failure.
51  */
52 ESL_THREADS *
esl_threads_Create(void (* fnptr)(void *))53 esl_threads_Create(void (*fnptr)(void *))
54 {
55   ESL_THREADS *obj = NULL;
56   int          status;
57 
58   ESL_ALLOC(obj, sizeof(ESL_THREADS));
59 
60   obj->threadCount     = 0;
61   obj->threadId        = NULL;
62   obj->data            = NULL;
63   obj->startThread     = 0;
64   obj->func            = fnptr;
65 
66   if (pthread_mutex_init(&obj->startMutex, NULL) != 0) ESL_XEXCEPTION(eslESYS, "mutex init failed");
67   if (pthread_cond_init (&obj->startCond,  NULL) != 0) ESL_XEXCEPTION(eslESYS, "cond init failed");
68   return obj;
69 
70  ERROR:
71   return NULL;
72 }
73 
74 /* Function:  esl_threads_Destroy()
75  * Synopsis:  Destroys an <ESL_THREADS> object.
76  * Incept:    MSF, Thu Jun 18 11:51:39 2009
77  *
78  * Purpose:   Frees an <ESL_THREADS> object.
79  *
80  *            The caller routine must first free the
81  *            contents of each <obj->data[]>.
82  *
83  * Returns:   void
84  */
85 void
esl_threads_Destroy(ESL_THREADS * obj)86 esl_threads_Destroy(ESL_THREADS *obj)
87 {
88   if (obj == NULL) return;
89 
90   if (obj->threadId != NULL) free(obj->threadId);
91   if (obj->data     != NULL) free(obj->data);
92   pthread_mutex_destroy(&obj->startMutex);
93   pthread_cond_destroy (&obj->startCond);
94   free(obj);
95   return;
96 }
97 
98 /* Function:  esl_threads_AddThread()
99  * Synopsis:  Add a worker thread to the <ESL_THREADS> object.
100  * Incept:    MSF, Thu Jun 18 11:51:39 2009
101  *
102  * Purpose:   Create a new worker thread for the <ESL_THREADS> object,
103  *            assigning it the work unit pointed to by <data>.
104  *
105  *            The caller remains responsible for any memory allocated
106  *            to <data>; the <ESL_THREADS> object will only manage
107  *            a copy of a pointer to <data>.
108  *
109  * Returns:   <eslOK> on success.
110  *
111  * Throws:    <eslEMEM> on allocation failure.
112  *            <eslESYS> if thread creation fails.
113  *            <eslEINVAL> if something's wrong with the <obj>.
114  */
115 int
esl_threads_AddThread(ESL_THREADS * obj,void * data)116 esl_threads_AddThread(ESL_THREADS *obj, void *data)
117 {
118   int    status;
119   void  *p;
120 
121   if (obj == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid thread object");
122 
123   /* allocate inside the ESL_THREADS object to hold another worker */
124   ESL_RALLOC(obj->threadId, p, sizeof(pthread_t) * (obj->threadCount+1));
125   ESL_RALLOC(obj->data,     p, sizeof(void *)    * (obj->threadCount+1));
126 
127   obj->data[obj->threadCount] = data;
128   if (pthread_create(&(obj->threadId[obj->threadCount]), NULL, (void *(*)(void *)) obj->func, obj) != 0) ESL_EXCEPTION(eslESYS, "thread creation failed");
129   obj->threadCount++;
130   return eslOK;
131 
132  ERROR:
133   return status;
134 }
135 
136 /* Function:  esl_threads_GetWorkerCount()
137  * Synopsis:  Return the total number of worker threads.
138  * Incept:    SRE, Fri Aug 21 13:22:52 2009 [Janelia]
139  *
140  * Purpose:   Returns the total number of worker threads.
141  */
142 int
esl_threads_GetWorkerCount(ESL_THREADS * obj)143 esl_threads_GetWorkerCount(ESL_THREADS *obj)
144 {
145   return obj->threadCount;
146 }
147 
148 
149 /* Function:  esl_threads_WaitForStart()
150  * Synopsis:  Blocks master until all workers have started.
151  * Incept:    MSF, Thu Jun 18 11:51:39 2009
152  *
153  * Purpose:   Make the master thread wait until all the worker threads have
154  *            started. When all the worker threads have started and
155  *            are blocking at the start mutex, release them.
156  *
157  * Returns:   <eslOK> on success.
158  *
159  * Throws:    <eslESYS> if thread synchronization fails somewhere.
160  *            <eslEINVAL> if something is awry with <obj>.
161  */
162 int
esl_threads_WaitForStart(ESL_THREADS * obj)163 esl_threads_WaitForStart(ESL_THREADS *obj)
164 {
165   if (obj == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid thread object");
166 
167   if (pthread_mutex_lock (&obj->startMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");
168 
169   /* wait for all worker threads to start */
170   while (obj->startThread < obj->threadCount) {
171     if (pthread_cond_wait(&obj->startCond, &obj->startMutex) != 0) ESL_EXCEPTION(eslESYS, "wait cond failed");
172   }
173 
174   /* release all the worker threads */
175   obj->startThread = 0;
176   if (pthread_cond_broadcast(&obj->startCond)  != 0) ESL_EXCEPTION(eslESYS, "cond broadcast failed");
177   if (pthread_mutex_unlock  (&obj->startMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
178   return eslOK;
179 }
180 
181 /* Function:  esl_threads_WaitForFinish()
182  * Synopsis:  Blocks master until all workers have completed.
183  * Incept:    MSF, Thu Jun 18 11:51:39 2009
184  *
185  * Purpose:   Block the master thread until all the worker threads have
186  *            completed. As each worker completes, remove it from the
187  *            <obj>.
188  *
189  *            Upon exit, the <obj> is returned to the same (empty)
190  *            state it was in after it was created. It may be reused
191  *            for a new problem by adding new workers.
192  *
193  * Returns:   <eslOK> on success.
194  *
195  * Throws:    <eslESYS> if thread synchronization fails somewhere.
196  *            <eslEINVAL> if something is awry with <obj>.
197  */
198 int
esl_threads_WaitForFinish(ESL_THREADS * obj)199 esl_threads_WaitForFinish(ESL_THREADS *obj)
200 {
201   int  w;
202 
203   if (obj == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid thread object");
204 
205   /* wait for all worker threads to complete */
206   for (w = obj->threadCount-1; w >= 0; w--)
207     {
208       if (pthread_join(obj->threadId[w], NULL) != 0) ESL_EXCEPTION(eslESYS, "pthread join failed");
209       obj->threadCount--;
210     }
211 
212   return eslOK;
213 }
214 
215 /* Function:  esl_threads_Started()
216  * Synopsis:  Blocks worker until master gives the start signal.
217  * Incept:    MSF, Thu Jun 18 11:51:39 2009
218  *
219  * Purpose:   Block a worker thread until master sees that all workers
220  *            have started and gives the start signal. Assign the worker
221  *            a unique number (0..nworkers-1), and return it in
222  *            <*ret_workeridx>. The worker uses this index to
223  *            retrieve its work units.
224  *
225  * Returns:   <eslOK> on success.
226  *
227  * Throws:    <eslESYS> if thread synchronization fails somewhere.
228  *            <eslEINVAL> if something is awry with <obj>.
229  */
230 int
esl_threads_Started(ESL_THREADS * obj,int * ret_workeridx)231 esl_threads_Started(ESL_THREADS *obj, int *ret_workeridx)
232 {
233   int           w;
234   pthread_t     threadId;
235   int           status;
236 
237   if (obj == NULL)                                ESL_XEXCEPTION(eslEINVAL, "Invalid thread object");
238   if (pthread_mutex_lock (&obj->startMutex) != 0) ESL_XEXCEPTION(eslESYS,   "mutex lock failed");
239 
240   /* signal that we're started */
241   obj->startThread++;
242   if (pthread_cond_broadcast (&obj->startCond) != 0) ESL_XEXCEPTION(eslESYS, "cond broadcast failed");
243 
244   /* wait for the master's signal to start the calculations */
245   while (obj->startThread) {
246     if (pthread_cond_wait(&obj->startCond, &obj->startMutex) != 0) ESL_XEXCEPTION(eslESYS, "cond wait failed");
247   }
248 
249   if (pthread_mutex_unlock (&obj->startMutex) != 0)  ESL_XEXCEPTION(eslESYS, "mutex unlock failed");
250 
251   /* Figure out the worker's index */
252   threadId = pthread_self();
253   for (w = 0; w < obj->threadCount; w++)
254     if (pthread_equal(threadId, obj->threadId[w])) break;
255   if (w == obj->threadCount) ESL_XEXCEPTION(eslESYS, "thread not registered");
256 
257   *ret_workeridx = w;
258   return eslOK;
259 
260  ERROR:
261   *ret_workeridx = 0;
262   return status;
263 }
264 
265 
266 /* Function:  esl_threads_GetData()
267  * Synopsis:  Return the data associated with this thread.
268  * Incept:    MSF, Thu Jun 18 11:51:39 2009
269  *
270  * Purpose:   Return the data pointer associated with the worker thread
271  *            <workeridx>. The data pointer was set by the
272  *            <esl_threads_AddThread()> function.
273  *
274  * Returns:   void *
275  */
276 void *
esl_threads_GetData(ESL_THREADS * obj,int workeridx)277 esl_threads_GetData(ESL_THREADS *obj, int workeridx)
278 {
279   return obj->data[workeridx];
280 }
281 
282 
283 /* Function:  esl_threads_Finished()
284  * Synopsis:  Terminate the thread.
285  * Incept:    MSF, Thu Jun 18 11:51:39 2009
286  *
287  * Purpose:   Terminate a worker thread.
288  *            This is currently a no-op, serving as
289  *            a placeholder in case we eventually need
290  *            any cleanup.
291  *
292  * Returns:   <eslOK> on success.
293  */
294 int
esl_threads_Finished(ESL_THREADS * obj,int workeridx)295 esl_threads_Finished(ESL_THREADS *obj, int workeridx)
296 {
297   return eslOK;
298 }
299 
300 
301 /*****************************************************************
302  * 2. Determining thread number to use
303  *****************************************************************/
304 
305 /* Function:  esl_threads_CPUCount()
306  * Synopsis:  Figure out how many cpus the machine has.
307  * Incept:    SRE, Wed Aug 19 11:31:24 2009 [Janelia]
308  *
309  * Purpose:   Determine the number of logical processors on this
310  *            machine; return that number in <*ret_ncpu>.
311  *
312  *            The number of available processors is found by
313  *            <sysconf(_SC_NPROCESSORS_ONLN)>,
314  *            <sysconf(_SC_NPROC_ONLN)>, or a <sysctl()> call,
315  *            depending on the host system.  This determined number of
316  *            available processors will be the number of logical
317  *            processors, not physical processors. On systems with
318  *            hyperthreading, the number of logical processors is more
319  *            than the number of physical cpus. It may or may not be a
320  *            good thing to spawn more threads than physical
321  *            processors.
322  *
323  * Args:      ret_ncpu  - RETURN: number of logical CPUs
324  *
325  * Returns:   <eslOK> on success.
326  *
327  * Throws:    (no abnormal error conditions)
328  *
329  * Xref:      J5/68
330  */
331 int
esl_threads_CPUCount(int * ret_ncpu)332 esl_threads_CPUCount(int *ret_ncpu)
333 {
334   int   ncpu = 1;
335 
336 #if defined     (HAVE_SYSCONF) && defined (_SC_NPROCESSORS_ONLN)     /* Many systems (including Linux) */
337   ncpu = sysconf(_SC_NPROCESSORS_ONLN);
338 #elif defined   (HAVE_SYSCONF) && defined (_SC_NPROC_ONLN)	     /* Silicon Graphics IRIX */
339   ncpu = sysconf(_SC_NPROC_ONLN);
340 #elif defined   (HAVE_SYSCTL)	                                     /* BSD systems including OS/X */
341   int    mib[2] = {CTL_HW, HW_NCPU};
342   size_t len    = sizeof(int);
343   int    status;
344 
345   status = sysctl(mib, 2, &ncpu, &len, NULL, (size_t) NULL);
346   if (status < 0 || len != sizeof(int)) ncpu = 1;
347 #endif
348 
349   if (ncpu < 1) ncpu = 1;
350 
351   *ret_ncpu = ncpu;
352   return eslOK;
353 }
354 
355 
/* Function:  esl_threads_GetCPUCount()
 * Synopsis:  Returns the number of CPU cores on machine.
 * Incept:    SRE, Mon Aug 21 08:52:29 2017
 *
 * Purpose:   Same answer as <esl_threads_CPUCount()>, but
 *            returned directly. The value is kept in a
 *            function-local static after the first call, so
 *            later calls do not query the system again.
 */
int
esl_threads_GetCPUCount(void)
{
  static int cached_ncpu = -1;	/* -1 means "not determined yet" */

  if (cached_ncpu != -1) return cached_ncpu;

  esl_threads_CPUCount(&cached_ncpu);
  return cached_ncpu;
}
370 
371 
372 /*****************************************************************
 * 3. Examples
374  *****************************************************************/
375 
376 #ifdef eslTHREADS_EXAMPLE
377 #include "easel.h"
378 #include "esl_threads.h"
379 
380 /* gcc --std=gnu99 -g -Wall -pthread -o esl_threads_example -I. -DeslTHREADS_EXAMPLE esl_threads.c easel.c */
381 static void
worker_thread(void * data)382 worker_thread(void *data)
383 {
384   ESL_THREADS *thr = (ESL_THREADS *) data;
385   char        *s   = NULL;
386   int          w;
387 
388   esl_threads_Started(thr, &w);
389 
390   s = (char *) esl_threads_GetData(thr, w);
391   printf("worker thread %d receives: %s\n", w, s);
392 
393   esl_threads_Finished(thr, w);
394   return;
395 }
396 
397 int
main(void)398 main(void)
399 {
400   ESL_THREADS  *thr  = NULL;
401   int           ncpu = 8;
402   int           i;
403   char        **work = NULL;
404 
405   work = malloc(sizeof(char *) * ncpu);
406   for (i = 0; i < ncpu; i++)
407     esl_sprintf(&(work[i]), "work packet %d", i);
408 
409   thr = esl_threads_Create(&worker_thread);
410 
411   for (i = 0; i < ncpu; i++)
412     esl_threads_AddThread(thr, (void *) work[i]);
413 
414   esl_threads_WaitForStart (thr);
415   /* The worker threads now run their work. */
416   esl_threads_WaitForFinish(thr);
417   esl_threads_Destroy(thr);
418   for (i = 0; i < ncpu; i++) free(work[i]);
419   free(work);
420   return eslOK;
421 }
422 #endif /*eslTHREADS_EXAMPLE*/
423 
424 
425 #ifdef eslTHREADS_EXAMPLE2
426 #include "easel.h"
427 #include "esl_threads.h"
428 
429 /* gcc --std=gnu99 -g -Wall -pthread -o esl_threads_example2 -I. -DeslTHREADS_EXAMPLE2 esl_threads.c easel.c */
430 int
main(void)431 main(void)
432 {
433   int ncpu;
434 
435   esl_threads_CPUCount(&ncpu);
436   printf("Processors: %d\n", ncpu);
437 
438   return eslOK;
439 }
440 #endif /*eslTHREADS_EXAMPLE2*/
441 #endif /*HAVE_PTHREAD*/
442 
443 
444