#ifndef PARALLELIZING_INFO_H_
#define PARALLELIZING_INFO_H_

#include <stdio.h>            // for FILE

#include <sys/types.h>        // for ssize_t
#include <sys/time.h>   /* for struct timeval */
#include "barrier.h"          // for barrier_t
#include "macros.h"           // for OMPI_VERSION_ATLEAST
#include "params.h"
#include "select_mpi.h"
#include "mpfq/mpfq_vbase.h"

/*
 * The main guy here is the parallelizing_info data type. It is commonly
 * declared as pi.
 *
 * Threads and jobs are arranged in a grid-like manner. Several logical
 * communication groups are defined. One for the global grid, as well as
 * one for each column / row.
 *
 * pi->m denotes the global communicator. There is only one such
 * communicator. All jobs, all threads contribute to it. At the mpi
 * level, the related communicator is MPI_COMM_WORLD (well, unless we're
 * working in interleaving mode).
 *
 * Two other communicators are defined:
 *      pi->wr[0] denotes horizontal, a.k.a. ROW groups.
 *      pi->wr[1] denotes vertical, a.k.a. COLUMN groups.
 * [Note: communicators used to be called "wirings", hence the variable
 * name]
 *
 * When a matrix is mapped to a process grid of this kind, say a matrix
 * having been split in nhs * nvs slices, then there are exactly nhs ROW
 * groups and nvs COLUMN groups. Each ROW group consists of nvs
 * (job,thread) items (as many as there are COLUMN groups), and
 * conversely.
 */
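
/* A small worked example (figures hypothetical, for illustration only):
 * with a 2x3 grid of jobs and a 2x2 grid of threads per job, pi->m has
 * njobs=6, ncores=4, totalsize=24. The (job,thread) items then form
 * nhs=4 rows and nvs=6 columns, so there are 4 ROW groups of 6 items
 * each, and 6 COLUMN groups of 4 items each.
 */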

/* don't enable this. Clutters output a lot */
#define xxxCONCURRENCY_DEBUG

/*
 * MPI_LIBRARY_MT_CAPABLE: do mpi communications in a furiously
 * concurrent manner.
 *
 * This isn't ready yet. In the presence of an MPI library which
 * correctly supports MPI_THREAD_MULTIPLE, chances are that this gets
 * close to working ok, and might even improve performance quite a bit.
 *
 * As of openmpi-1.8, this is not reliable enough when the basic
 * transport layer is openib (on infiniband). Status may of course vary
 * for other mpi implementations.
 *
 * As for the ``fake mpi'' implementation we have, well, it's up to the
 * reader to decide. All collectives are nops, so it's trivially capable.
 * OTOH, not setting the flag ensures that the rest of the code compiles
 * ok.
 *
 * --> we never allow this flag currently. The corresponding pieces of
 *  code have been deleted as the program evolved, given that they had
 *  zero testing.
 */

#if defined(MPICH2) && MPICH2_NUMVERSION >= 10100002
/* In fact, even in this case we might consider disabling it. */
#define xxxMPI_LIBRARY_MT_CAPABLE
#elif defined(OPEN_MPI) && OMPI_VERSION_ATLEAST(1,8,2)
#define xxxMPI_LIBRARY_MT_CAPABLE
/*
 * At present I know of no version of openmpi with a working
 * MPI_THREAD_MULTIPLE, but to be honest I haven't tried hard. There are
 * surely some bugs in my code as well; at least that's what enabling it
 * suggests.
 */
#else
/* Assume it does not work */
#endif

/* {{{ definition of the parallelizing_info communicator type */

/* {{{ utility structures for communicators. */

/* {{{ This one is stored in thread-shared memory; shared locks and so on */
struct pthread_things {
    barrier_t bh[1];
    my_pthread_barrier_t b[1];

    my_pthread_mutex_t m[1];
    char * desc;
    void * utility_ptr;
    // int count;
};
/* }}} */

/* {{{ logging. To be activated in debug mode only */
struct pi_log_entry {
    struct timeval tv[1];
    char what[80];
};

#define PI_LOG_BOOK_ENTRIES     32
struct pi_log_book {
    struct pi_log_entry t[PI_LOG_BOOK_ENTRIES];
    int hsize;  // history size -- only a count, once the thing wraps.
    int next;   // index of the next free entry.
};
/* }}} */
/* }}} */

struct pi_comm_s { /* {{{ */
    /* njobs : number of mpi jobs concerned by this logical group */
    /* ncores : number of threads concerned by this logical group */
    unsigned int njobs;
    unsigned int ncores;

    /* product njobs * ncores */
    unsigned int totalsize;
    unsigned int jrank;
    unsigned int trank;
    MPI_Comm pals;

    struct pthread_things * th;
#ifdef  CONCURRENCY_DEBUG
    int th_count;
#endif
    struct pi_log_book * log_book;
};
typedef struct pi_comm_s pi_comm[1];
typedef struct pi_comm_s * pi_comm_ptr;
typedef const struct pi_comm_s * pi_comm_srcptr;
/* }}} */
/* }}} */
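
/* A minimal sketch of how these fields are typically consulted
 * (illustrative only): jrank and trank locate the calling (job,thread)
 * pair within the communicator, and totalsize == njobs * ncores always
 * holds. Code often singles out one pair per communicator as a leader:
 *
 *   int is_leader = (wr->jrank == 0 && wr->trank == 0);
 */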

/* {{{ interleaving two pi structures. */
struct pi_interleaving_s {
    int idx;                    /* 0 or 1 */
    my_pthread_barrier_t * b;   /* deliberately a pointer, not a 1-sized
                                   array: this barrier is shared between
                                   the two interleaved structures, while
                                   idx differs on either side */
};
typedef struct pi_interleaving_s pi_interleaving[1];
typedef struct pi_interleaving_s * pi_interleaving_ptr;
/* }}} */

/* {{{ This arbitrary associative array is meant to be very global, even
 * common to two interleaved pi structures. Used to pass lightweight info
 * only */
typedef struct pi_dictionary_entry_s * pi_dictionary_entry_ptr;
struct pi_dictionary_entry_s {
    unsigned long key;
    unsigned long who;
    void * value;
    pi_dictionary_entry_ptr next;
};
typedef struct pi_dictionary_entry_s pi_dictionary_entry[1];

struct pi_dictionary_s {
    my_pthread_rwlock_t m[1];
    pi_dictionary_entry_ptr e;
};
typedef struct pi_dictionary_s pi_dictionary[1];
typedef struct pi_dictionary_s * pi_dictionary_ptr;
/* }}} */

/* {{{ global parallelizing_info handle */
#define PI_NAMELEN      32
struct parallelizing_info_s {
    // row-wise, column-wise.
    pi_comm wr[2];
    // main.
    pi_comm m;
    pi_interleaving_ptr interleaved;
    pi_dictionary_ptr dict;
    char nodename[PI_NAMELEN];
    char nodeprefix[PI_NAMELEN];
    char nodenumber_s[PI_NAMELEN];
    /* This pointer is identical on all threads. It is non-null only in
     * case we happen to have a sufficiently recent gcc, together with a
     * sufficiently recent hwloc */
    void * cpubinding_info;
    int thr_orig[2];            /* when only_mpi is 1, this is what the
                                   thr parameter was set to originally.
                                   Otherwise we have {0,0} here. */
};

typedef struct parallelizing_info_s parallelizing_info[1];
typedef struct parallelizing_info_s * parallelizing_info_ptr;
typedef const struct parallelizing_info_s * parallelizing_info_srcptr;
/* }}} */

/* {{{ collective operations and user-defined types */

struct pi_datatype_s {
    MPI_Datatype datatype;
    /* two attributes we're really happy to use */
    mpfq_vbase_ptr abase;
    size_t item_size;
};
typedef struct pi_datatype_s * pi_datatype_ptr;
extern pi_datatype_ptr BWC_PI_INT;
extern pi_datatype_ptr BWC_PI_DOUBLE;
extern pi_datatype_ptr BWC_PI_BYTE;
extern pi_datatype_ptr BWC_PI_UNSIGNED;
extern pi_datatype_ptr BWC_PI_UNSIGNED_LONG;
extern pi_datatype_ptr BWC_PI_UNSIGNED_LONG_LONG;
extern pi_datatype_ptr BWC_PI_LONG;
extern pi_datatype_ptr BWC_PI_SIZE_T;


struct pi_op_s {
    MPI_Op stock;  /* typically MPI_SUM */
    MPI_Op custom;  /* for mpfq types, the mpi-level user-defined op */
    void (*f_stock)(void *, void *, int *, MPI_Datatype *);
    void (*f_custom)(void *, void *, size_t, pi_datatype_ptr);
};
typedef struct pi_op_s * pi_op_ptr;
extern struct pi_op_s BWC_PI_MIN[1];
extern struct pi_op_s BWC_PI_MAX[1];
extern struct pi_op_s BWC_PI_SUM[1];
extern struct pi_op_s BWC_PI_BXOR[1];
extern struct pi_op_s BWC_PI_BAND[1];
extern struct pi_op_s BWC_PI_BOR[1];

/* we define new datatypes in a way which departs from the mpi calling
 * interface, because that interface is slightly awkward for our needs */

#ifdef __cplusplus
extern "C" {
#endif
extern pi_datatype_ptr pi_alloc_mpfq_datatype(parallelizing_info_ptr pi, mpfq_vbase_ptr abase);
extern void pi_free_mpfq_datatype(parallelizing_info_ptr pi, pi_datatype_ptr ptr);
#ifdef __cplusplus
}
#endif
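
/* A minimal usage sketch (the mpfq interface variable "A" below is a
 * hypothetical, already-initialized mpfq_vbase):
 *
 *   pi_datatype_ptr dt = pi_alloc_mpfq_datatype(pi, A);
 *   pi_allreduce(src, dst, n, dt, BWC_PI_SUM, pi->m);
 *   pi_free_mpfq_datatype(pi, dt);
 */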

/* }}} */

/* {{{ I/O layer */
struct pi_file_handle_s {
    char * name;        /* just for reference. I doubt we'll need them */
    char * mode;
    FILE * f;   /* meaningful only at root */
    parallelizing_info_ptr pi;
    int inner;
    int outer;
};
typedef struct pi_file_handle_s pi_file_handle[1];
typedef struct pi_file_handle_s * pi_file_handle_ptr;
/* }}} */

#ifdef __cplusplus
extern "C" {
#endif

extern void parallelizing_info_init();
extern void parallelizing_info_finish();
extern void parallelizing_info_decl_usage(param_list pl);
extern void parallelizing_info_lookup_parameters(param_list_ptr pl);

/* pi_go is the main function. It is responsible for creating all the
 * parallelizing_info data structures, setting up the different
 * inter-job and inter-thread conciliation toys (communicators, pthread
 * barriers, and so on), and eventually running the provided function.
 *
 * The param_list is checked for the parameters mpi and thr, so as to
 * define the mpi and thr splittings. (nhc, nvc denote the analogous
 * grid dimensions for threads, i.e. cores.)
 */
extern void pi_go(
        void *(*fcn)(parallelizing_info_ptr, param_list pl, void * arg),
        param_list pl,
        void * arg);
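
/* A minimal caller sketch (function name hypothetical); pi_go spawns
 * the job/thread grid and runs the provided function on every
 * (job,thread) pair:
 *
 *   void * my_prog(parallelizing_info_ptr pi, param_list pl, void * arg)
 *   {
 *       pi_hello(pi);
 *       return NULL;
 *   }
 *
 *   // ... then, once pl holds the mpi= and thr= parameters:
 *   // pi_go(my_prog, pl, NULL);
 */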

extern void pi_hello(parallelizing_info_ptr pi);

/* I/O functions */
extern int pi_file_open(pi_file_handle_ptr f, parallelizing_info_ptr pi, int inner, const char * name, const char * mode);
extern int pi_file_close(pi_file_handle_ptr f);
/* totalsize is the size which should be on disk. It may be shorter than
 * the sum of the individual sizes, in case of padding */
extern ssize_t pi_file_write(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize);
extern ssize_t pi_file_read(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize);
extern ssize_t pi_file_write_chunk(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize, size_t chunksize, size_t spos, size_t epos);
extern ssize_t pi_file_read_chunk(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize, size_t chunksize, size_t spos, size_t epos);
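
/* A minimal collective write sketch (the file name and the inner=0
 * direction choice are made up for illustration); all (job,thread)
 * pairs must call these together, and only the root actually holds the
 * FILE*:
 *
 *   pi_file_handle f;
 *   pi_file_open(f, pi, 0, "output.bin", "wb");
 *   ssize_t rc = pi_file_write(f, buf, mysize, totalsize);
 *   pi_file_close(f);
 */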

/* the parallelizing_info layer has some collective operations which
 * deliberately have prototypes similar or identical to their mpi
 * counterparts (we use size_t for the count arguments, though).
 *
 * Note: These functions use the wr->th->utility_ptr field.
 */

/* almost identical to the mpi-level reduction functions, except that
 * we added const, dropped the int *, and dropped the multiplexing
 * argument with the datatype (perhaps we shouldn't, but so far we have
 * no use for it) */
typedef void (*thread_reducer_t)(const void *, void *, size_t);
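
/* For instance, a reducer summing unsigned longs, matching the
 * signature above (sketch):
 *
 *   void my_add_ul(const void * s, void * d, size_t n)
 *   {
 *       for(size_t i = 0 ; i < n ; i++)
 *           ((unsigned long *) d)[i] += ((const unsigned long *) s)[i];
 *   }
 */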

/* pointers must be different on all threads */
extern void pi_thread_bcast(void * sendbuf,
        size_t count, pi_datatype_ptr datatype,
        unsigned int root,
        pi_comm_ptr wr);
extern void pi_bcast(void * sendbuf,
        size_t count, pi_datatype_ptr datatype,
        unsigned int jroot, unsigned int troot,
        pi_comm_ptr wr);
extern void pi_abort(int err, pi_comm_ptr wr);
extern void pi_thread_allreduce(void * sendbuf, void * recvbuf,
        size_t count, pi_datatype_ptr datatype, pi_op_ptr op,
        pi_comm_ptr wr);
extern void pi_allreduce(void * sendbuf, void *recvbuf,
        size_t count, pi_datatype_ptr datatype, pi_op_ptr op,
        pi_comm_ptr wr);
extern void pi_allgather(void * sendbuf,
        size_t sendcount, pi_datatype_ptr sendtype,
        void *recvbuf,
        size_t recvcount, pi_datatype_ptr recvtype,
        pi_comm_ptr wr);
extern int pi_thread_data_eq(void * buffer,
        size_t count, pi_datatype_ptr datatype,
        pi_comm_ptr wr);
extern int pi_data_eq(void * buffer,
        size_t count, pi_datatype_ptr datatype,
        pi_comm_ptr wr);
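
/* A minimal sketch of these collectives: broadcast a value from cell
 * (jroot=0, troot=0) of the communicator, then count the participants
 * (figures illustrative):
 *
 *   unsigned long v = 0;
 *   pi_bcast(&v, 1, BWC_PI_UNSIGNED_LONG, 0, 0, pi->m);
 *   unsigned long in = 1, out = 0;
 *   pi_allreduce(&in, &out, 1, BWC_PI_UNSIGNED_LONG, BWC_PI_SUM, pi->m);
 *   // expect out == pi->m->totalsize on all (job,thread) pairs
 */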

/* shared_malloc is like malloc, except that the pointer returned will be
 * equal on all threads (proper access will of course require proper
 * locking). shared_malloc_set_zero also sets the area to zero */
/* As a side-effect, all shared_* functions serialize threads */
extern void * shared_malloc(pi_comm_ptr wr, size_t size);
extern void * shared_malloc_set_zero(pi_comm_ptr wr, size_t size);
extern void shared_free(pi_comm_ptr wr, void * ptr);
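
/* Minimal sketch: one buffer seen identically by all threads of the
 * communicator, written by thread 0 and read by everyone (the value 42
 * is of course arbitrary):
 *
 *   unsigned long * t = shared_malloc_set_zero(wr, n * sizeof(*t));
 *   if (wr->trank == 0) t[0] = 42;
 *   serialize_threads(wr);
 *   // all threads now see t[0] == 42
 *   shared_free(wr, t);
 */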

/* prints the given string in an ascii-art matrix. */
extern void grid_print(parallelizing_info_ptr pi, char * buf, size_t siz, int print);

#define serialize(w)   serialize__(w, __FILE__, __LINE__)
extern int serialize__(pi_comm_ptr, const char *, unsigned int);
#define serialize_threads(w)   serialize_threads__(w, __FILE__, __LINE__)
extern int serialize_threads__(pi_comm_ptr, const char *, unsigned int);

/* stuff related to log entry printing */
extern void pi_log_init(pi_comm_ptr);
extern void pi_log_clear(pi_comm_ptr);
extern void pi_log_op(pi_comm_ptr, const char * fmt, ...);
extern void pi_log_print_all(parallelizing_info_ptr);
extern void pi_log_print(pi_comm_ptr);
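
/* Sketch: record timestamped entries around an operation, and dump
 * everything when debugging (the format string is illustrative):
 *
 *   pi_log_init(pi->m);
 *   pi_log_op(pi->m, "iteration %d done", i);
 *   pi_log_print_all(pi);
 *   pi_log_clear(pi->m);
 */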

/* These are the calls for interleaving. The 2n threads are divided into
 * two groups. It is guaranteed that at a given point, the two groups of
 * n threads are separated on either side of the pi_interleaving_flip
 * call.
 *
 * The called function must make sure that alternating blocks (delimited
 * by _flip) either do or don't contain mpi calls, IN TURN.
 *
 * _enter and _leave are called from pi_go, so although they are exposed,
 * one does not have to know about them.
 */
extern void pi_interleaving_flip(parallelizing_info_ptr);
extern void pi_interleaving_enter(parallelizing_info_ptr);
extern void pi_interleaving_leave(parallelizing_info_ptr);
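
/* Sketch of the alternation pattern (only meaningful when
 * pi->interleaved is non-null, i.e. interleaving is enabled):
 *
 *   // block with no mpi calls
 *   pi_interleaving_flip(pi);
 *   // block with mpi calls
 *   pi_interleaving_flip(pi);
 */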

extern void pi_store_generic(parallelizing_info_ptr, unsigned long, unsigned long, void *);
extern void * pi_load_generic(parallelizing_info_ptr, unsigned long, unsigned long);

extern void pi_dictionary_init(pi_dictionary_ptr);
extern void pi_dictionary_clear(pi_dictionary_ptr);
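
/* Sketch: stash a pointer in the global dictionary under an arbitrary
 * (key, who) pair and retrieve it later -- the key value is made up:
 *
 *   pi_store_generic(pi, 0x100, pi->m->trank, ptr);
 *   void * p = pi_load_generic(pi, 0x100, pi->m->trank);
 */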

#ifdef __cplusplus
}
#endif

/* This provides a fairly typical construct, used like this:
 * SEVERAL_THREADS_PLAY_MPI_BEGIN(some pi communicator) {
 *      some code which happens for threads in the pi comm in turn
 * }
 * SEVERAL_THREADS_PLAY_MPI_END();
 */
#ifndef MPI_LIBRARY_MT_CAPABLE
#define SEVERAL_THREADS_PLAY_MPI_BEGIN(comm) do {			\
    for(unsigned int t__ = 0 ; t__ < comm->ncores ; t__++) {		\
        serialize_threads(comm);					\
        if (t__ != comm->trank) continue; /* not our turn. */           \
        do
#define SEVERAL_THREADS_PLAY_MPI_END()    while (0); } } while (0)
/* This construct is used similarly. It differs slightly, in that we
 * guarantee that only one thread (in the communicator) will issue mpi
 * calls */
#define SEVERAL_THREADS_PLAY_MPI_BEGIN2(comm, t__) do { 		\
    serialize_threads(comm);                                            \
    if (comm->trank == 0) {                                             \
        for(unsigned int t__ = 0 ; t__ < comm->ncores ; t__++) {	\
            do
#define SEVERAL_THREADS_PLAY_MPI_END2(comm)                             \
            while (0);							\
        }								\
    }									\
    serialize_threads(comm);                                            \
} while (0)
#else
#define SEVERAL_THREADS_PLAY_MPI_BEGIN(comm)     /**/
#define SEVERAL_THREADS_PLAY_MPI_END()           /**/
#define SEVERAL_THREADS_PLAY_MPI_BEGIN2(comm, t) /**/
#define SEVERAL_THREADS_PLAY_MPI_END2(comm)      /**/
#endif
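
/* A sketch of the second construct (the loop variable name "peer" is
 * chosen here for illustration); thread 0 of the communicator issues
 * the mpi calls on behalf of every thread index in turn:
 *
 *   SEVERAL_THREADS_PLAY_MPI_BEGIN2(pi->m, peer) {
 *       // mpi calls pertaining to thread index peer, issued by trank 0
 *   }
 *   SEVERAL_THREADS_PLAY_MPI_END2(pi->m);
 */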

#endif	/* PARALLELIZING_INFO_H_ */