1 /*********************************************************************
2 Functions to facilitate using threads.
3 This is part of GNU Astronomy Utilities (Gnuastro) package.
4
5 Original author:
6 Mohammad Akhlaghi <mohammad@akhlaghi.org>
7 Contributing author(s):
8 Copyright (C) 2015-2021, Free Software Foundation, Inc.
9
10 Gnuastro is free software: you can redistribute it and/or modify it
11 under the terms of the GNU General Public License as published by the
12 Free Software Foundation, either version 3 of the License, or (at your
13 option) any later version.
14
15 Gnuastro is distributed in the hope that it will be useful, but
16 WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with Gnuastro. If not, see <http://www.gnu.org/licenses/>.
22 **********************************************************************/
23 #include <config.h>
24
25 #include <time.h>
26 #include <stdio.h>
27 #include <errno.h>
28 #include <error.h>
29 #include <stdlib.h>
30
31 #include <gnuastro/threads.h>
32 #include <gnuastro/pointer.h>
33
34 #include <nproc.h> /* from Gnulib, in Gnuastro's source */
35
36
37
38
39
40 /*****************************************************************/
41 /********* Implementation of pthread_barrier ***************/
42 /*****************************************************************/
43 /* Re-implementation of the example code given in:
44 http://blog.albertarmea.com/post/47089939939/using-pthread-barrier-on-mac-os-x
45 */
46 #if GAL_CONFIG_HAVE_PTHREAD_BARRIER == 0
47
48 /* Initialize the barrier structure. A barrier is a high-level way to wait
49 until several threads have finished. */
50 int
pthread_barrier_init(pthread_barrier_t * b,pthread_barrierattr_t * attr,unsigned int limit)51 pthread_barrier_init(pthread_barrier_t *b, pthread_barrierattr_t *attr,
52 unsigned int limit)
53 {
54 int err;
55
56 /* Sanity check: */
57 if(limit==0)
58 {
59 errno = EINVAL;
60 error(EXIT_FAILURE, errno, "%s: limit is zero", __func__);
61 }
62
63 /* Initialize the mutex: */
64 err=pthread_mutex_init(&b->mutex, 0);
65 if(err)
66 error(EXIT_FAILURE, err, "%s: inializing mutex", __func__);
67
68 /* Initialize the condition variable: */
69 err=pthread_cond_init(&b->cond, 0);
70 if(err)
71 {
72 pthread_mutex_destroy(&b->mutex);
73 error(EXIT_FAILURE, err, "%s: inializing cond", __func__);
74 }
75
76 /* set the values: */
77 b->limit=limit;
78 b->condfinished=b->count=0;
79
80 return 0;
81 }
82
83
84
85
86
87 /* Suspend the calling thread (tell it to wait), until the limiting number
88 of barriers is reached by the other threads. When the number isn't
89 reached yet (process goes into the 'else'), then we use the
90 'pthread_cond_wait' function, which will wait until a broadcast is
91 announced by another thread that succeeds the 'if'. After the thread no
92 longer needs the condition variable, we increment 'b->condfinished' so
93 'pthread_barrier_destroy' can know if it should wait (sleep) or
94 continue.*/
95 int
pthread_barrier_wait(pthread_barrier_t * b)96 pthread_barrier_wait(pthread_barrier_t *b)
97 {
98 pthread_mutex_lock(&b->mutex);
99 ++b->count;
100 if(b->count >= b->limit)
101 {
102 pthread_cond_broadcast(&b->cond);
103 ++b->condfinished;
104 pthread_mutex_unlock(&b->mutex);
105 return 1;
106 }
107 else
108 {
109 /* Initially b->count is smaller than b->limit, otherwise control
110 would never have reached here. However, when it get to
111 'pthread_cond_wait', this thread goes into a suspended period and
112 is only awaken when a broad-cast is made. But we only want this
113 thread to finish when b->count >= b->limit, so we have this while
114 loop. */
115 while(b->count < b->limit)
116 pthread_cond_wait(&b->cond, &b->mutex);
117 ++b->condfinished;
118 pthread_mutex_unlock(&b->mutex);
119 return 0;
120 }
121 }
122
123
124
125
126
127 /* Wait until all the threads that were blocked behind this barrier have
128 finished (don't need the mutex and condition variable anymore. Then
129 destroy the two. After this function, you can safely re-use the
130 pthread_barrier_t structure. */
131 int
pthread_barrier_destroy(pthread_barrier_t * b)132 pthread_barrier_destroy(pthread_barrier_t *b)
133 {
134 struct timespec request, remaining;
135
136 /* Wait until no more threads need the barrier. */
137 request.tv_sec=0;
138 request.tv_nsec=GAL_THREADS_BARRIER_DESTROY_NANOSECS;
139 while( b->condfinished < b->limit )
140 nanosleep(&request, &remaining);
141
142 /* Destroy the condition variable and mutex */
143 pthread_cond_destroy(&b->cond);
144 pthread_mutex_destroy(&b->mutex);
145 return 0;
146 }
147
148 #endif /* GAL_CONFIG_HAVE_PTHREAD_BARRIER == 0 */
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 /*******************************************************************/
170 /************ Thread utilities **************/
171 /*******************************************************************/
172 size_t
gal_threads_number()173 gal_threads_number()
174 {
175 return num_processors(NPROC_CURRENT);
176 }
177
178
179
180
181
182 /* We have 'numactions' jobs and we want their indexs to be divided
183 between 'numthreads' CPU threads. This function will give each index to
184 a thread such that the maximum difference between the number of
185 images for each thread is 1. The results will be saved in a 2D
186 array of 'outthrdcols' columns and each row will finish with a
187 (size_t) -1, which is larger than any possible index!. */
188 char *
gal_threads_dist_in_threads(size_t numactions,size_t numthreads,size_t minmapsize,int quietmmap,size_t ** outthrds,size_t * outthrdcols)189 gal_threads_dist_in_threads(size_t numactions, size_t numthreads,
190 size_t minmapsize, int quietmmap,
191 size_t **outthrds, size_t *outthrdcols)
192 {
193 size_t *sp, *fp;
194 char *mmapname=NULL;
195 size_t i, *thrds, thrdcols;
196 *outthrdcols = thrdcols = numactions/numthreads+2;
197
198 /* Allocate the space to keep the identifiers. */
199 thrds=*outthrds=gal_pointer_allocate_ram_or_mmap(GAL_TYPE_SIZE_T,
200 numthreads*thrdcols, 0, minmapsize, &mmapname,
201 0, __func__, "thrds");
202
203 /* Initialize all the elements to NONINDEX. */
204 fp=(sp=thrds)+numthreads*thrdcols;
205 do *sp=GAL_BLANK_SIZE_T; while(++sp<fp);
206
207 /* Distribute the labels in the threads. */
208 for(i=0;i<numactions;++i)
209 thrds[ (i%numthreads)*thrdcols+(i/numthreads) ] = i;
210
211 /* In case you want to see the result:
212 for(i=0;i<numthreads;++i)
213 {
214 size_t j;
215 printf("\n\n############################\n");
216 printf("THREAD %zu: \n", i);
217 for(j=0;thrds[i*thrdcols+j]!=GAL_BLANK_SIZE_T;j++)
218 printf("%zu, ", thrds[i*thrdcols+j]);
219 printf("\b\b.\n");
220 }
221 exit(0);
222 */
223
224 /* Return the name of the possibly memory-mapped file. */
225 return mmapname;
226 }
227
228
229
230
231
232 void
gal_threads_attr_barrier_init(pthread_attr_t * attr,pthread_barrier_t * b,size_t limit)233 gal_threads_attr_barrier_init(pthread_attr_t *attr, pthread_barrier_t *b,
234 size_t limit)
235 {
236 int err;
237
238 err=pthread_attr_init(attr);
239 if(err) error(EXIT_FAILURE, 0, "%s: thread attr not initialized", __func__);
240 err=pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED);
241 if(err) error(EXIT_FAILURE, 0, "%s: thread attr not detached", __func__);
242 err=pthread_barrier_init(b, NULL, limit);
243 if(err) error(EXIT_FAILURE, 0, "%s: thread barrier not initialized",
244 __func__);
245 }
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266 /*******************************************************************/
267 /************ Run a function on multiple threads **************/
268 /*******************************************************************/
269 /* Run a given function on the given tiles. The function has to be
270 link-able with your final executable and has to have only one 'void *'
271 argument and return a 'void *' value. To have access to
272 variables/parameters in the function, you have to define a structure and
273 pass its pointer as 'caller_params'.
274
275 Here is one simple example. At least two functions and one structure are
276 necessary to use this function.
277
278 --------- Parameters to keep values you need ---------
279 struct my_params
280 {
281 int value1;
282 double value2;
283 float *array;
284 };
285
286
287 --------- Function to run on each thread ---------
288 void *
289 run_on_thread(void *in_prm)
290 {
291 struct gal_threads_params *tprm=(struct gal_threads_params *)in_prm;
292 struct my_params *prm=(struct my_params *)(tprm->params);
293
294 size_t i;
295
296 for(i=0; tprm->indexs[i] != GAL_BLANK_SIZE_T; ++i)
297 {
298
299 THE INDEX OF THE TARGET IS NOW AVAILABLE AS
300 'tprm->indexs[i]'. YOU CAN USE IT IN WHAT EVER MANNER YOU LIKE
301 ALONG WITH THE SET OF VARIABLES/ARRAYS in 'prm'.
302
303 }
304
305 if(tprm->b) pthread_barrier_wait(tprm->b);
306 return NULL;
307 }
308
309
310 --------- High-level function ---------
311 int
312 higher_level_function(float *array, size_t num_in_array, int value1)
313 {
314 double value2;
315 struct my_params;
316 size_t numthreads;
317
318 my_params.value1=value1;
319 my_params.value2=value2;
320 my_params.arary=array;
321
322 gal_threads_spin_off(run_on_thread, &my_params, num_in_array,
323 numthreads);
324
325 return 1;
326 }
327
328 For real world applications of this function, you can also inspect the
329 Gnuastro source. There are also many cases in Gnuastro where we benefit
330 from this function. Please run the following command from the top source
331 directory of Gnuastro to see where:
332
333 $ grep -r gal_threads_spin_off ./
334 */
335 void
gal_threads_spin_off(void * (* worker)(void *),void * caller_params,size_t numactions,size_t numthreads,size_t minmapsize,int quietmmap)336 gal_threads_spin_off(void *(*worker)(void *), void *caller_params,
337 size_t numactions, size_t numthreads,
338 size_t minmapsize, int quietmmap)
339 {
340 int err;
341 pthread_t t; /* All thread ids saved in this, not used. */
342 char *mmapname=NULL;
343 pthread_attr_t attr;
344 pthread_barrier_t b;
345 struct gal_threads_params *prm;
346 size_t i, *indexs, thrdcols, numbarriers;
347
348 /* If there are no actions, then just return. */
349 if(numactions==0) return;
350
351 /* Sanity check. */
352 if(numthreads==0)
353 error(EXIT_FAILURE, 0, "%s: the number of threads ('numthreads') "
354 "cannot be zero", __func__);
355
356 /* Allocate the array of parameters structure structures. */
357 errno=0;
358 prm=malloc(numthreads*sizeof *prm);
359 if(prm==NULL)
360 {
361 fprintf(stderr, "%zu bytes could not be allocated for prm.",
362 numthreads*sizeof *prm);
363 exit(EXIT_FAILURE);
364 }
365
366 /* Distribute the actions into the threads: */
367 mmapname=gal_threads_dist_in_threads(numactions, numthreads, minmapsize,
368 quietmmap, &indexs, &thrdcols);
369
370 /* Do the job: when only one thread is necessary, there is no need to
371 spin off one thread, just call the workerfunction directly (spinning
372 off threads is expensive). This is for the generic thread spinner
373 function, not this simple function where 'numthreads' is a
374 constant. */
375 if(numthreads==1)
376 {
377 prm[0].id=0;
378 prm[0].b=NULL;
379 prm[0].indexs=indexs;
380 prm[0].params=caller_params;
381 worker(&prm[0]);
382 }
383 else
384 {
385 /* Initialize the attributes. Note that this running thread
386 (that spinns off the nt threads) is also a thread, so the
387 number the barriers should be one more than the number of
388 threads spinned off. */
389 numbarriers = (numactions<numthreads ? numactions : numthreads) + 1;
390 gal_threads_attr_barrier_init(&attr, &b, numbarriers);
391
392 /* Spin off the threads: */
393 for(i=0;i<numthreads;++i)
394 if(indexs[i*thrdcols]!=GAL_BLANK_SIZE_T)
395 {
396 prm[i].id=i;
397 prm[i].b=&b;
398 prm[i].params=caller_params;
399 prm[i].indexs=&indexs[i*thrdcols];
400 err=pthread_create(&t, &attr, worker, &prm[i]);
401 if(err)
402 {
403 fprintf(stderr, "can't create thread %zu", i);
404 exit(EXIT_FAILURE);
405 }
406 }
407
408 /* Wait for all threads to finish and free the spaces. */
409 pthread_barrier_wait(&b);
410 pthread_attr_destroy(&attr);
411 pthread_barrier_destroy(&b);
412 }
413
414 /* If 'mmapname' is NULL, then 'indexs' is in RAM and we can safely
415 'free' it. However, when its not NULL, then the space for 'indexs' has
416 been memory-mapped (its not in RAM) so special treatment is necessary
417 to delete it through the proper function. */
418 if(mmapname) gal_pointer_mmap_free(&mmapname, quietmmap);
419 else free(indexs);
420
421 /* Clean up. */
422 free(prm);
423 }
424