/*********************************************************************
Functions to facilitate using threads.
This is part of GNU Astronomy Utilities (Gnuastro) package.

Original author:
     Mohammad Akhlaghi <mohammad@akhlaghi.org>
Contributing author(s):
Copyright (C) 2015-2021, Free Software Foundation, Inc.

Gnuastro is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.

Gnuastro is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with Gnuastro. If not, see <http://www.gnu.org/licenses/>.
**********************************************************************/
#include <config.h>

#include <time.h>
#include <stdio.h>
#include <errno.h>
#include <error.h>
#include <stdlib.h>

#include <gnuastro/threads.h>
#include <gnuastro/pointer.h>

#include <nproc.h>         /* from Gnulib, in Gnuastro's source */





/*****************************************************************/
/*********    Implementation of pthread_barrier    ***************/
/*****************************************************************/
/* Re-implementation of the example code given in:
http://blog.albertarmea.com/post/47089939939/using-pthread-barrier-on-mac-os-x
 */
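
/* As a rough usage sketch of this barrier interface (an illustration,
   not code taken from this file; 'nthreads' and the worker threads are
   hypothetical), the pattern used by 'gal_threads_spin_off' further
   below is:

       pthread_barrier_t b;
       pthread_barrier_init(&b, NULL, nthreads+1);
       ...spin off 'nthreads' detached threads, each of which calls
          'pthread_barrier_wait(&b)' when its job is done...
       pthread_barrier_wait(&b);      (the spinning thread also waits)
       pthread_barrier_destroy(&b);

   The limit is 'nthreads+1' because the thread that spins off the
   workers waits on the same barrier as the workers themselves. */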
#if GAL_CONFIG_HAVE_PTHREAD_BARRIER == 0

/* Initialize the barrier structure. A barrier is a high-level way to wait
   until several threads have finished. */
int
pthread_barrier_init(pthread_barrier_t *b, pthread_barrierattr_t *attr,
                     unsigned int limit)
{
  int err;

  /* Sanity check: */
  if(limit==0)
    {
      errno = EINVAL;
      error(EXIT_FAILURE, errno, "%s: limit is zero", __func__);
    }

  /* Initialize the mutex: */
  err=pthread_mutex_init(&b->mutex, 0);
  if(err)
    error(EXIT_FAILURE, err, "%s: initializing mutex", __func__);

  /* Initialize the condition variable: */
  err=pthread_cond_init(&b->cond, 0);
  if(err)
    {
      pthread_mutex_destroy(&b->mutex);
      error(EXIT_FAILURE, err, "%s: initializing cond", __func__);
    }

  /* Set the values: */
  b->limit=limit;
  b->condfinished=b->count=0;

  return 0;
}





/* Suspend the calling thread (tell it to wait), until the limiting number
   of barriers is reached by the other threads. When the number isn't
   reached yet (the thread goes into the 'else'), we use the
   'pthread_cond_wait' function, which will wait until a broadcast is
   announced by another thread that succeeds the 'if'. After the thread no
   longer needs the condition variable, we increment 'b->condfinished' so
   'pthread_barrier_destroy' can know if it should wait (sleep) or
   continue. */
int
pthread_barrier_wait(pthread_barrier_t *b)
{
  pthread_mutex_lock(&b->mutex);
  ++b->count;
  if(b->count >= b->limit)
    {
      pthread_cond_broadcast(&b->cond);
      ++b->condfinished;
      pthread_mutex_unlock(&b->mutex);
      return 1;
    }
  else
    {
      /* Initially b->count is smaller than b->limit, otherwise control
         would never have reached here. However, when it gets to
         'pthread_cond_wait', this thread goes into a suspended period and
         is only awakened when a broadcast is made. But we only want this
         thread to finish when b->count >= b->limit, so we have this while
         loop. */
      while(b->count < b->limit)
        pthread_cond_wait(&b->cond, &b->mutex);
      ++b->condfinished;
      pthread_mutex_unlock(&b->mutex);
      return 0;
    }
}





/* Wait until all the threads that were blocked behind this barrier have
   finished (and thus no longer need the mutex and condition variable),
   then destroy the two. After this function, you can safely re-use the
   pthread_barrier_t structure. */
int
pthread_barrier_destroy(pthread_barrier_t *b)
{
  struct timespec request, remaining;

  /* Wait until no more threads need the barrier. */
  request.tv_sec=0;
  request.tv_nsec=GAL_THREADS_BARRIER_DESTROY_NANOSECS;
  while( b->condfinished < b->limit )
    nanosleep(&request, &remaining);

  /* Destroy the condition variable and mutex */
  pthread_cond_destroy(&b->cond);
  pthread_mutex_destroy(&b->mutex);
  return 0;
}

#endif  /* GAL_CONFIG_HAVE_PTHREAD_BARRIER == 0 */




















/*******************************************************************/
/************              Thread utilities           **************/
/*******************************************************************/
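/* Return the number of CPU threads (processors) currently available to
   this process, as reported by Gnulib's 'num_processors' with the
   'NPROC_CURRENT' query. */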
size_t
gal_threads_number()
{
  return num_processors(NPROC_CURRENT);
}





/* We have 'numactions' jobs and we want their indexes to be divided
   between 'numthreads' CPU threads. This function will give each index
   to a thread such that the maximum difference between the number of
   actions given to any two threads is 1. The results will be saved in a
   2D array of 'outthrdcols' columns and each row will finish with a
   (size_t) -1 (GAL_BLANK_SIZE_T), which is larger than any possible
   index. */
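
/* For example (a hypothetical call): with 'numactions=7' and
   'numthreads=3', 'thrdcols' will be 7/3+2=4 and the array will be
   filled like this (where 'B' stands for GAL_BLANK_SIZE_T):

        thread 0:   0  3  6  B
        thread 1:   1  4  B  B
        thread 2:   2  5  B  B                                          */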
char *
gal_threads_dist_in_threads(size_t numactions, size_t numthreads,
                            size_t minmapsize, int quietmmap,
                            size_t **outthrds, size_t *outthrdcols)
{
  size_t *sp, *fp;
  char *mmapname=NULL;
  size_t i, *thrds, thrdcols;
  *outthrdcols = thrdcols = numactions/numthreads+2;

  /* Allocate the space to keep the identifiers. */
  thrds=*outthrds=gal_pointer_allocate_ram_or_mmap(GAL_TYPE_SIZE_T,
                              numthreads*thrdcols, 0, minmapsize, &mmapname,
                              quietmmap, __func__, "thrds");

  /* Initialize all the elements to GAL_BLANK_SIZE_T. */
  fp=(sp=thrds)+numthreads*thrdcols;
  do *sp=GAL_BLANK_SIZE_T; while(++sp<fp);

  /* Distribute the indexes among the threads. */
  for(i=0;i<numactions;++i)
    thrds[ (i%numthreads)*thrdcols+(i/numthreads) ] = i;

  /* In case you want to see the result:
  for(i=0;i<numthreads;++i)
    {
      size_t j;
      printf("\n\n############################\n");
      printf("THREAD %zu: \n", i);
      for(j=0;thrds[i*thrdcols+j]!=GAL_BLANK_SIZE_T;j++)
        printf("%zu, ", thrds[i*thrdcols+j]);
      printf("\b\b.\n");
    }
  exit(0);
  */

  /* Return the name of the possibly memory-mapped file. */
  return mmapname;
}




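/* Initialize 'attr' as a detached-thread attribute and 'b' as a barrier
   with the given 'limit'; if any step fails, an error is printed and the
   program aborts. This is a convenience wrapper that is used by
   'gal_threads_spin_off' below. */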
void
gal_threads_attr_barrier_init(pthread_attr_t *attr, pthread_barrier_t *b,
                              size_t limit)
{
  int err;

  err=pthread_attr_init(attr);
  if(err) error(EXIT_FAILURE, 0, "%s: thread attr not initialized", __func__);
  err=pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED);
  if(err) error(EXIT_FAILURE, 0, "%s: thread attr not detached", __func__);
  err=pthread_barrier_init(b, NULL, limit);
  if(err) error(EXIT_FAILURE, 0, "%s: thread barrier not initialized",
                __func__);
}




















/*******************************************************************/
/************     Run a function on multiple threads  **************/
/*******************************************************************/
/* Run a given function over the given number of actions on multiple
   threads. The function has to be linkable with your final executable,
   must take only one 'void *' argument and must return a 'void *' value.
   To have access to variables/parameters in the function, you have to
   define a structure and pass its pointer as 'caller_params'.

   Here is one simple example. At least two functions and one structure
   are necessary to use this function.

     --------- Parameters to keep values you need ---------
     struct my_params
     {
       int    value1;
       double value2;
       float  *array;
     };


     ---------    Function to run on each thread  ---------
     void *
     run_on_thread(void *in_prm)
     {
       struct gal_threads_params *tprm=(struct gal_threads_params *)in_prm;
       struct my_params *prm=(struct my_params *)(tprm->params);

       size_t i;

       for(i=0; tprm->indexs[i] != GAL_BLANK_SIZE_T; ++i)
       {

           THE INDEX OF THE TARGET IS NOW AVAILABLE AS
           'tprm->indexs[i]'. YOU CAN USE IT IN WHATEVER MANNER YOU LIKE
           ALONG WITH THE SET OF VARIABLES/ARRAYS in 'prm'.

       }

       if(tprm->b) pthread_barrier_wait(tprm->b);
       return NULL;
     }


     ---------         High-level function        ---------
     int
     higher_level_function(float *array, size_t num_in_array, int value1,
                           double value2, size_t numthreads,
                           size_t minmapsize, int quietmmap)
     {
        struct my_params my_params;

        my_params.value1=value1;
        my_params.value2=value2;
        my_params.array=array;

        gal_threads_spin_off(run_on_thread, &my_params, num_in_array,
                             numthreads, minmapsize, quietmmap);

        return 1;
     }

  For real-world applications of this function, you can inspect the
  Gnuastro source; there are many cases in Gnuastro where we benefit from
  this function. Run the following command from the top source directory
  of Gnuastro to see where:

      $ grep -r gal_threads_spin_off ./
*/
void
gal_threads_spin_off(void *(*worker)(void *), void *caller_params,
                     size_t numactions, size_t numthreads,
                     size_t minmapsize, int quietmmap)
{
  int err;
  pthread_t t;          /* All thread ids saved in this, not used. */
  char *mmapname=NULL;
  pthread_attr_t attr;
  pthread_barrier_t b;
  struct gal_threads_params *prm;
  size_t i, *indexs, thrdcols, numbarriers;

  /* If there are no actions, then just return. */
  if(numactions==0) return;

  /* Sanity check. */
  if(numthreads==0)
    error(EXIT_FAILURE, 0, "%s: the number of threads ('numthreads') "
          "cannot be zero", __func__);

  /* Allocate the array of parameter structures. */
  errno=0;
  prm=malloc(numthreads*sizeof *prm);
  if(prm==NULL)
    {
      fprintf(stderr, "%zu bytes could not be allocated for prm.",
              numthreads*sizeof *prm);
      exit(EXIT_FAILURE);
    }

  /* Distribute the actions into the threads: */
  mmapname=gal_threads_dist_in_threads(numactions, numthreads, minmapsize,
                                       quietmmap, &indexs, &thrdcols);

  /* Do the job: when only one thread is necessary, there is no need to
     spin off a new thread, just call the worker function directly
     (spinning off threads is expensive). This check is only needed in a
     generic spinner function like this one, where 'numthreads' is a
     run-time variable. */
  if(numthreads==1)
    {
      prm[0].id=0;
      prm[0].b=NULL;
      prm[0].indexs=indexs;
      prm[0].params=caller_params;
      worker(&prm[0]);
    }
  else
    {
      /* Initialize the attributes. Note that this running thread (the
         one that spins off the 'numthreads' threads) is also a thread,
         so the barrier limit should be one more than the number of
         threads spun off. */
      numbarriers = (numactions<numthreads ? numactions : numthreads) + 1;
      gal_threads_attr_barrier_init(&attr, &b, numbarriers);

      /* Spin off the threads: */
      for(i=0;i<numthreads;++i)
        if(indexs[i*thrdcols]!=GAL_BLANK_SIZE_T)
          {
            prm[i].id=i;
            prm[i].b=&b;
            prm[i].params=caller_params;
            prm[i].indexs=&indexs[i*thrdcols];
            err=pthread_create(&t, &attr, worker, &prm[i]);
            if(err)
              {
                fprintf(stderr, "can't create thread %zu", i);
                exit(EXIT_FAILURE);
              }
          }

      /* Wait for all threads to finish and free the spaces. */
      pthread_barrier_wait(&b);
      pthread_attr_destroy(&attr);
      pthread_barrier_destroy(&b);
    }

  /* If 'mmapname' is NULL, then 'indexs' is in RAM and we can safely
     'free' it. However, when it is not NULL, the space for 'indexs' has
     been memory-mapped (it is not in RAM), so special treatment is
     necessary to delete it through the proper function. */
  if(mmapname) gal_pointer_mmap_free(&mmapname, quietmmap);
  else         free(indexs);

  /* Clean up. */
  free(prm);
}