1 /* Simple master/worker data parallelization using POSIX threads.
2 *
3 * Contents:
4 * 1. The <ESL_THREADS> object: a gang of workers.
5 * 2. Determining thread number to use.
6 * 3. Examples.
7 */
8 #include "esl_config.h"
9
10 #ifdef HAVE_PTHREAD
11
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <pthread.h>
16
17 #ifdef HAVE_UNISTD_H
18 #include <unistd.h>
19 #endif
20 #ifdef HAVE_SYS_PARAM_H /* On OpenBSD, sys/sysctl.h requires sys/param.h */
21 #include <sys/param.h>
22 #endif
23 #ifdef HAVE_SYS_SYSCTL_H
24 #include <sys/sysctl.h>
25 #endif
26 #ifdef HAVE_SYS_TYPES_H
27 #include <sys/types.h>
28 #endif
29
30 #include "easel.h"
31 #include "esl_threads.h"
32
33
34 /*****************************************************************
35 *# 1. The <ESL_THREADS> object: a gang of workers.
36 *****************************************************************/
37
38 /* Function: esl_threads_Create()
39 * Synopsis: Create a threads object.
40 * Incept: MSF, Thu Jun 18 11:51:39 2009
41 *
42 * Purpose: Creates an <ESL_THREADS> object, for organizing
43 8 a bunch of worker threads that will all run
44 * the worker function <fnptr>. This object is a shell
45 * for now; the worker threads themselves are
46 * created individually with <esl_threads_AddThread()>.
47 *
48 * Returns: ptr to the new <ESL_THREADS> object.
49 *
50 * Throws: <NULL> on allocation or initialization failure.
51 */
52 ESL_THREADS *
esl_threads_Create(void (* fnptr)(void *))53 esl_threads_Create(void (*fnptr)(void *))
54 {
55 ESL_THREADS *obj = NULL;
56 int status;
57
58 ESL_ALLOC(obj, sizeof(ESL_THREADS));
59
60 obj->threadCount = 0;
61 obj->threadId = NULL;
62 obj->data = NULL;
63 obj->startThread = 0;
64 obj->func = fnptr;
65
66 if (pthread_mutex_init(&obj->startMutex, NULL) != 0) ESL_XEXCEPTION(eslESYS, "mutex init failed");
67 if (pthread_cond_init (&obj->startCond, NULL) != 0) ESL_XEXCEPTION(eslESYS, "cond init failed");
68 return obj;
69
70 ERROR:
71 return NULL;
72 }
73
74 /* Function: esl_threads_Destroy()
75 * Synopsis: Destroys an <ESL_THREADS> object.
76 * Incept: MSF, Thu Jun 18 11:51:39 2009
77 *
78 * Purpose: Frees an <ESL_THREADS> object.
79 *
80 * The caller routine must first free the
81 * contents of each <obj->data[]>.
82 *
83 * Returns: void
84 */
85 void
esl_threads_Destroy(ESL_THREADS * obj)86 esl_threads_Destroy(ESL_THREADS *obj)
87 {
88 if (obj == NULL) return;
89
90 if (obj->threadId != NULL) free(obj->threadId);
91 if (obj->data != NULL) free(obj->data);
92 pthread_mutex_destroy(&obj->startMutex);
93 pthread_cond_destroy (&obj->startCond);
94 free(obj);
95 return;
96 }
97
98 /* Function: esl_threads_AddThread()
99 * Synopsis: Add a worker thread to the <ESL_THREADS> object.
100 * Incept: MSF, Thu Jun 18 11:51:39 2009
101 *
102 * Purpose: Create a new worker thread for the <ESL_THREADS> object,
103 * assigning it the work unit pointed to by <data>.
104 *
105 * The caller remains responsible for any memory allocated
106 * to <data>; the <ESL_THREADS> object will only manage
107 * a copy of a pointer to <data>.
108 *
109 * Returns: <eslOK> on success.
110 *
111 * Throws: <eslEMEM> on allocation failure.
112 * <eslESYS> if thread creation fails.
113 * <eslEINVAL> if something's wrong with the <obj>.
114 */
115 int
esl_threads_AddThread(ESL_THREADS * obj,void * data)116 esl_threads_AddThread(ESL_THREADS *obj, void *data)
117 {
118 int status;
119 void *p;
120
121 if (obj == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid thread object");
122
123 /* allocate inside the ESL_THREADS object to hold another worker */
124 ESL_RALLOC(obj->threadId, p, sizeof(pthread_t) * (obj->threadCount+1));
125 ESL_RALLOC(obj->data, p, sizeof(void *) * (obj->threadCount+1));
126
127 obj->data[obj->threadCount] = data;
128 if (pthread_create(&(obj->threadId[obj->threadCount]), NULL, (void *(*)(void *)) obj->func, obj) != 0) ESL_EXCEPTION(eslESYS, "thread creation failed");
129 obj->threadCount++;
130 return eslOK;
131
132 ERROR:
133 return status;
134 }
135
136 /* Function: esl_threads_GetWorkerCount()
137 * Synopsis: Return the total number of worker threads.
138 * Incept: SRE, Fri Aug 21 13:22:52 2009 [Janelia]
139 *
140 * Purpose: Returns the total number of worker threads.
141 */
142 int
esl_threads_GetWorkerCount(ESL_THREADS * obj)143 esl_threads_GetWorkerCount(ESL_THREADS *obj)
144 {
145 return obj->threadCount;
146 }
147
148
149 /* Function: esl_threads_WaitForStart()
150 * Synopsis: Blocks master until all workers have started.
151 * Incept: MSF, Thu Jun 18 11:51:39 2009
152 *
153 * Purpose: Make the master thread wait until all the worker threads have
154 * started. When all the worker threads have started and
155 * are blocking at the start mutex, release them.
156 *
157 * Returns: <eslOK> on success.
158 *
159 * Throws: <eslESYS> if thread synchronization fails somewhere.
160 * <eslEINVAL> if something is awry with <obj>.
161 */
162 int
esl_threads_WaitForStart(ESL_THREADS * obj)163 esl_threads_WaitForStart(ESL_THREADS *obj)
164 {
165 if (obj == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid thread object");
166
167 if (pthread_mutex_lock (&obj->startMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");
168
169 /* wait for all worker threads to start */
170 while (obj->startThread < obj->threadCount) {
171 if (pthread_cond_wait(&obj->startCond, &obj->startMutex) != 0) ESL_EXCEPTION(eslESYS, "wait cond failed");
172 }
173
174 /* release all the worker threads */
175 obj->startThread = 0;
176 if (pthread_cond_broadcast(&obj->startCond) != 0) ESL_EXCEPTION(eslESYS, "cond broadcast failed");
177 if (pthread_mutex_unlock (&obj->startMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
178 return eslOK;
179 }
180
181 /* Function: esl_threads_WaitForFinish()
182 * Synopsis: Blocks master until all workers have completed.
183 * Incept: MSF, Thu Jun 18 11:51:39 2009
184 *
185 * Purpose: Block the master thread until all the worker threads have
186 * completed. As each worker completes, remove it from the
187 * <obj>.
188 *
189 * Upon exit, the <obj> is returned to the same (empty)
190 * state it was in after it was created. It may be reused
191 * for a new problem by adding new workers.
192 *
193 * Returns: <eslOK> on success.
194 *
195 * Throws: <eslESYS> if thread synchronization fails somewhere.
196 * <eslEINVAL> if something is awry with <obj>.
197 */
198 int
esl_threads_WaitForFinish(ESL_THREADS * obj)199 esl_threads_WaitForFinish(ESL_THREADS *obj)
200 {
201 int w;
202
203 if (obj == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid thread object");
204
205 /* wait for all worker threads to complete */
206 for (w = obj->threadCount-1; w >= 0; w--)
207 {
208 if (pthread_join(obj->threadId[w], NULL) != 0) ESL_EXCEPTION(eslESYS, "pthread join failed");
209 obj->threadCount--;
210 }
211
212 return eslOK;
213 }
214
215 /* Function: esl_threads_Started()
216 * Synopsis: Blocks worker until master gives the start signal.
217 * Incept: MSF, Thu Jun 18 11:51:39 2009
218 *
219 * Purpose: Block a worker thread until master sees that all workers
220 * have started and gives the start signal. Assign the worker
221 * a unique number (0..nworkers-1), and return it in
222 * <*ret_workeridx>. The worker uses this index to
223 * retrieve its work units.
224 *
225 * Returns: <eslOK> on success.
226 *
227 * Throws: <eslESYS> if thread synchronization fails somewhere.
228 * <eslEINVAL> if something is awry with <obj>.
229 */
230 int
esl_threads_Started(ESL_THREADS * obj,int * ret_workeridx)231 esl_threads_Started(ESL_THREADS *obj, int *ret_workeridx)
232 {
233 int w;
234 pthread_t threadId;
235 int status;
236
237 if (obj == NULL) ESL_XEXCEPTION(eslEINVAL, "Invalid thread object");
238 if (pthread_mutex_lock (&obj->startMutex) != 0) ESL_XEXCEPTION(eslESYS, "mutex lock failed");
239
240 /* signal that we're started */
241 obj->startThread++;
242 if (pthread_cond_broadcast (&obj->startCond) != 0) ESL_XEXCEPTION(eslESYS, "cond broadcast failed");
243
244 /* wait for the master's signal to start the calculations */
245 while (obj->startThread) {
246 if (pthread_cond_wait(&obj->startCond, &obj->startMutex) != 0) ESL_XEXCEPTION(eslESYS, "cond wait failed");
247 }
248
249 if (pthread_mutex_unlock (&obj->startMutex) != 0) ESL_XEXCEPTION(eslESYS, "mutex unlock failed");
250
251 /* Figure out the worker's index */
252 threadId = pthread_self();
253 for (w = 0; w < obj->threadCount; w++)
254 if (pthread_equal(threadId, obj->threadId[w])) break;
255 if (w == obj->threadCount) ESL_XEXCEPTION(eslESYS, "thread not registered");
256
257 *ret_workeridx = w;
258 return eslOK;
259
260 ERROR:
261 *ret_workeridx = 0;
262 return status;
263 }
264
265
266 /* Function: esl_threads_GetData()
267 * Synopsis: Return the data associated with this thread.
268 * Incept: MSF, Thu Jun 18 11:51:39 2009
269 *
270 * Purpose: Return the data pointer associated with the worker thread
271 * <workeridx>. The data pointer was set by the
272 * <esl_threads_AddThread()> function.
273 *
274 * Returns: void *
275 */
276 void *
esl_threads_GetData(ESL_THREADS * obj,int workeridx)277 esl_threads_GetData(ESL_THREADS *obj, int workeridx)
278 {
279 return obj->data[workeridx];
280 }
281
282
283 /* Function: esl_threads_Finished()
284 * Synopsis: Terminate the thread.
285 * Incept: MSF, Thu Jun 18 11:51:39 2009
286 *
287 * Purpose: Terminate a worker thread.
288 * This is currently a no-op, serving as
289 * a placeholder in case we eventually need
290 * any cleanup.
291 *
292 * Returns: <eslOK> on success.
293 */
294 int
esl_threads_Finished(ESL_THREADS * obj,int workeridx)295 esl_threads_Finished(ESL_THREADS *obj, int workeridx)
296 {
297 return eslOK;
298 }
299
300
301 /*****************************************************************
302 * 2. Determining thread number to use
303 *****************************************************************/
304
305 /* Function: esl_threads_CPUCount()
306 * Synopsis: Figure out how many cpus the machine has.
307 * Incept: SRE, Wed Aug 19 11:31:24 2009 [Janelia]
308 *
309 * Purpose: Determine the number of logical processors on this
310 * machine; return that number in <*ret_ncpu>.
311 *
312 * The number of available processors is found by
313 * <sysconf(_SC_NPROCESSORS_ONLN)>,
314 * <sysconf(_SC_NPROC_ONLN)>, or a <sysctl()> call,
315 * depending on the host system. This determined number of
316 * available processors will be the number of logical
317 * processors, not physical processors. On systems with
318 * hyperthreading, the number of logical processors is more
319 * than the number of physical cpus. It may or may not be a
320 * good thing to spawn more threads than physical
321 * processors.
322 *
323 * Args: ret_ncpu - RETURN: number of logical CPUs
324 *
325 * Returns: <eslOK> on success.
326 *
327 * Throws: (no abnormal error conditions)
328 *
329 * Xref: J5/68
330 */
331 int
esl_threads_CPUCount(int * ret_ncpu)332 esl_threads_CPUCount(int *ret_ncpu)
333 {
334 int ncpu = 1;
335
336 #if defined (HAVE_SYSCONF) && defined (_SC_NPROCESSORS_ONLN) /* Many systems (including Linux) */
337 ncpu = sysconf(_SC_NPROCESSORS_ONLN);
338 #elif defined (HAVE_SYSCONF) && defined (_SC_NPROC_ONLN) /* Silicon Graphics IRIX */
339 ncpu = sysconf(_SC_NPROC_ONLN);
340 #elif defined (HAVE_SYSCTL) /* BSD systems including OS/X */
341 int mib[2] = {CTL_HW, HW_NCPU};
342 size_t len = sizeof(int);
343 int status;
344
345 status = sysctl(mib, 2, &ncpu, &len, NULL, (size_t) NULL);
346 if (status < 0 || len != sizeof(int)) ncpu = 1;
347 #endif
348
349 if (ncpu < 1) ncpu = 1;
350
351 *ret_ncpu = ncpu;
352 return eslOK;
353 }
354
355
/* Function:  esl_threads_GetCPUCount()
 * Synopsis:  Returns the number of CPU cores on machine.
 * Incept:    SRE, Mon Aug 21 08:52:29 2017
 *
 * Purpose:   Same answer as <esl_threads_CPUCount()>, but handed
 *            back directly as the return value. The result is
 *            remembered in a static variable after the first call,
 *            so the system is only queried once.
 */
int
esl_threads_GetCPUCount(void)
{
  static int cached_ncpu = -1;   // -1 means "not determined yet"

  if (cached_ncpu != -1) return cached_ncpu;

  esl_threads_CPUCount(&cached_ncpu);
  return cached_ncpu;
}
370
371
/*****************************************************************
 * 3. Examples
 *****************************************************************/
375
376 #ifdef eslTHREADS_EXAMPLE
377 #include "easel.h"
378 #include "esl_threads.h"
379
380 /* gcc --std=gnu99 -g -Wall -pthread -o esl_threads_example -I. -DeslTHREADS_EXAMPLE esl_threads.c easel.c */
381 static void
worker_thread(void * data)382 worker_thread(void *data)
383 {
384 ESL_THREADS *thr = (ESL_THREADS *) data;
385 char *s = NULL;
386 int w;
387
388 esl_threads_Started(thr, &w);
389
390 s = (char *) esl_threads_GetData(thr, w);
391 printf("worker thread %d receives: %s\n", w, s);
392
393 esl_threads_Finished(thr, w);
394 return;
395 }
396
397 int
main(void)398 main(void)
399 {
400 ESL_THREADS *thr = NULL;
401 int ncpu = 8;
402 int i;
403 char **work = NULL;
404
405 work = malloc(sizeof(char *) * ncpu);
406 for (i = 0; i < ncpu; i++)
407 esl_sprintf(&(work[i]), "work packet %d", i);
408
409 thr = esl_threads_Create(&worker_thread);
410
411 for (i = 0; i < ncpu; i++)
412 esl_threads_AddThread(thr, (void *) work[i]);
413
414 esl_threads_WaitForStart (thr);
415 /* The worker threads now run their work. */
416 esl_threads_WaitForFinish(thr);
417 esl_threads_Destroy(thr);
418 for (i = 0; i < ncpu; i++) free(work[i]);
419 free(work);
420 return eslOK;
421 }
422 #endif /*eslTHREADS_EXAMPLE*/
423
424
425 #ifdef eslTHREADS_EXAMPLE2
426 #include "easel.h"
427 #include "esl_threads.h"
428
429 /* gcc --std=gnu99 -g -Wall -pthread -o esl_threads_example2 -I. -DeslTHREADS_EXAMPLE2 esl_threads.c easel.c */
430 int
main(void)431 main(void)
432 {
433 int ncpu;
434
435 esl_threads_CPUCount(&ncpu);
436 printf("Processors: %d\n", ncpu);
437
438 return eslOK;
439 }
440 #endif /*eslTHREADS_EXAMPLE2*/
441 #endif /*HAVE_PTHREAD*/
442
443
444