1 #ifndef PARALLELIZING_INFO_H_ 2 #define PARALLELIZING_INFO_H_ 3 4 #include <stdio.h> // for FILE 5 6 #include <sys/types.h> // for ssize_t 7 #include <sys/time.h> /* for struct timeval */ 8 #include "barrier.h" // for barrier_t 9 #include "macros.h" // for OMPI_VERSION_ATLEAST 10 #include "params.h" 11 #include "select_mpi.h" 12 #include "mpfq/mpfq_vbase.h" 13 14 /* 15 * The main guy here is the parallelizing_info data type. It is commonly 16 * declared as pi. 17 * 18 * Threads and jobs are arranged in a grid-like manner. Several logical 19 * communication groups are defined. One for the global grid, as well as 20 * one for each column / row. 21 * 22 * pi->m denotes the global communicator. There is only one such 23 * communicator. All jobs, all threads contribute to it. At the mpi 24 * level, the related communicator is MPI_COMM_WORLD (well, unless we're 25 * working in interleaving mode). 26 * 27 * Two other communicators are defined: 28 * pi->wr[0] denotes horizontal, a.k.a. ROW groups. 29 * pi->wr[1] denotes vertical, a.k.a. COLUMN groups. 30 * [Note: communicators used to be called "wirings", hence the variable 31 * name] 32 * 33 * When a matrix is mapped to a grid process of this kind, say a matrix 34 * having been split in nhs * nvs slices, then there are exactly nhs ROW 35 * groups, and nvs COLUMN groups. ROW groups consist of nvs (job,thread) 36 * items (as many as one finds COL groups), and conversely. 37 */ 38 39 /* don't enable this. Clutters output a lot */ 40 #define xxxCONCURRENCY_DEBUG 41 42 /* 43 * MPI_LIBRARY_MT_CAPABLE: do mpi communications in a furiously 44 * concurrent manner. 45 * 46 * This isn't ready yet. In presence of an MPI library which correctly 47 * supports MPI_THREAD_MULTIPLE, chances are that this gets close to 48 * working ok, and even maybe improve performance quite a bit. 49 * 50 * As of openmpi-1.8, this is not reliable enough when the basic 51 * transport layer is openib (on infiniband). Status may of course vary 52 * for other mpi implementations. 53 * 54 * As for the ``fake mpi'' implementation we have, well, it's up to the 55 * reader to decide. All collectives are nops, so it's trivially capable. 56 * OTOH, not setting the flags ensures that the rest of the code compiles 57 * ok. 58 * 59 * --> we never allow this flag currently. Corresponding pieces of code 60 * have been deleted as the program evolved, given that it had zero 61 * testing. 62 */ 63 64 #if defined(MPICH2) && MPICH2_NUMVERSION >= 10100002 65 /* In fact, even in this case we might consider disabling it. */ 66 #define xxxMPI_LIBRARY_MT_CAPABLE 67 #elif defined(OPEN_MPI) && OMPI_VERSION_ATLEAST(1,8,2) 68 #define xxxMPI_LIBRARY_MT_CAPABLE 69 /* 70 * at present I know of no version of openmpi with MPI_THREAD_MULTIPLE 71 * working, but to be honest I haven't tried hard. For sure there are 72 * some bugs in my code as well anyway, at least that's what enabling it 73 * shows. 74 */ 75 #else 76 /* Assume it does not work */ 77 #endif 78 79 /* {{{ definition of the parallelizing_info communicator type */ 80 81 /* {{{ utility structures for communicators. */ 82 83 /* {{{ This one is stored in thread-shared memory ; shared locks and so on */ 84 struct pthread_things { 85 barrier_t bh[1]; 86 my_pthread_barrier_t b[1]; 87 88 my_pthread_mutex_t m[1]; 89 char * desc; 90 void * utility_ptr; 91 // int count; 92 }; 93 /* }}} */ 94 95 /* {{{ logging. To be activated in debug mode only */ 96 struct pi_log_entry { 97 struct timeval tv[1]; 98 char what[80]; 99 }; 100 101 #define PI_LOG_BOOK_ENTRIES 32 102 struct pi_log_book { 103 struct pi_log_entry t[PI_LOG_BOOK_ENTRIES]; 104 int hsize; // history size -- only a count, once the things wraps. 105 int next; // next free pointer. 106 }; 107 /* }}} */ 108 /* }}} */ 109 110 struct pi_comm_s { /* {{{ */ 111 /* njobs : number of mpi jobs concerned by this logical group */ 112 /* ncores : number of threads concerned by this logical group */ 113 unsigned int njobs; 114 unsigned int ncores; 115 116 /* product njobs * ncores */ 117 unsigned int totalsize; 118 unsigned int jrank; 119 unsigned int trank; 120 MPI_Comm pals; 121 122 struct pthread_things * th; 123 #ifdef CONCURRENCY_DEBUG 124 int th_count; 125 #endif 126 struct pi_log_book * log_book; 127 }; 128 typedef struct pi_comm_s pi_comm[1]; 129 typedef struct pi_comm_s * pi_comm_ptr; 130 typedef const struct pi_comm_s * pi_comm_srcptr; 131 /* }}} */ 132 /* }}} */ 133 134 /* {{{ interleaving two pi structures. */ 135 struct pi_interleaving_s { 136 int idx; /* 0 or 1 */ 137 my_pthread_barrier_t * b; /* not a 1-sized array on purpose -- 138 being next to index, it can't ! */ 139 }; 140 typedef struct pi_interleaving_s pi_interleaving[1]; 141 typedef struct pi_interleaving_s * pi_interleaving_ptr; 142 /* }}} */ 143 144 /* {{{ This arbitrary associative array is meant to be very global, even 145 * common to two interleaved pi structures. Used to pass lightweight info 146 * only */ 147 typedef struct pi_dictionary_entry_s * pi_dictionary_entry_ptr; 148 struct pi_dictionary_entry_s { 149 unsigned long key; 150 unsigned long who; 151 void * value; 152 pi_dictionary_entry_ptr next; 153 }; 154 typedef struct pi_dictionary_entry_s pi_dictionary_entry[1]; 155 156 struct pi_dictionary_s { 157 my_pthread_rwlock_t m[1]; 158 pi_dictionary_entry_ptr e; 159 }; 160 typedef struct pi_dictionary_s pi_dictionary[1]; 161 typedef struct pi_dictionary_s * pi_dictionary_ptr; 162 /* }}} */ 163 164 /* {{{ global parallelizing_info handle */ 165 #define PI_NAMELEN 32 166 struct parallelizing_info_s { 167 // row-wise, column-wise. 168 pi_comm wr[2]; 169 // main. 170 pi_comm m; 171 pi_interleaving_ptr interleaved; 172 pi_dictionary_ptr dict; 173 char nodename[PI_NAMELEN]; 174 char nodeprefix[PI_NAMELEN]; 175 char nodenumber_s[PI_NAMELEN]; 176 /* This pointer is identical on all threads. It is non-null only in 177 * case we happen to have sufficiently recent gcc, together with 178 * sufficiently recent hwloc */ 179 void * cpubinding_info; 180 int thr_orig[2]; /* when only_mpi is 1, this is what the 181 thr parameter was set to originally. 182 Otherwise we have {0,0} here. */ 183 }; 184 185 typedef struct parallelizing_info_s parallelizing_info[1]; 186 typedef struct parallelizing_info_s * parallelizing_info_ptr; 187 typedef const struct parallelizing_info_s * parallelizing_info_srcptr; 188 /* }}} */ 189 190 /* {{{ collective operations and user-defined types */ 191 192 struct pi_datatype_s { 193 MPI_Datatype datatype; 194 /* two attributes we're really happy to use */ 195 mpfq_vbase_ptr abase; 196 size_t item_size; 197 }; 198 typedef struct pi_datatype_s * pi_datatype_ptr; 199 extern pi_datatype_ptr BWC_PI_INT; 200 extern pi_datatype_ptr BWC_PI_DOUBLE; 201 extern pi_datatype_ptr BWC_PI_BYTE; 202 extern pi_datatype_ptr BWC_PI_UNSIGNED; 203 extern pi_datatype_ptr BWC_PI_UNSIGNED_LONG; 204 extern pi_datatype_ptr BWC_PI_UNSIGNED_LONG_LONG; 205 extern pi_datatype_ptr BWC_PI_LONG; 206 extern pi_datatype_ptr BWC_PI_SIZE_T; 207 208 209 struct pi_op_s { 210 MPI_Op stock; /* typically MPI_SUM */ 211 MPI_Op custom; /* for mpfq types, the mpi-level user-defined op */ 212 void (*f_stock)(void *, void *, int *, MPI_Datatype *); 213 void (*f_custom)(void *, void *, size_t, pi_datatype_ptr); 214 }; 215 typedef struct pi_op_s * pi_op_ptr; 216 extern struct pi_op_s BWC_PI_MIN[1]; 217 extern struct pi_op_s BWC_PI_MAX[1]; 218 extern struct pi_op_s BWC_PI_SUM[1]; 219 extern struct pi_op_s BWC_PI_BXOR[1]; 220 extern struct pi_op_s BWC_PI_BAND[1]; 221 extern struct pi_op_s BWC_PI_BOR[1]; 222 223 /* we define new datatypes in a way which diverts from the mpi calling 224 * interface, because that interface is slightly awkward for our needs */ 225 226 #ifdef __cplusplus 227 extern "C" { 228 #endif 229 extern pi_datatype_ptr pi_alloc_mpfq_datatype(parallelizing_info_ptr pi, mpfq_vbase_ptr abase); 230 extern void pi_free_mpfq_datatype(parallelizing_info_ptr pi, pi_datatype_ptr ptr); 231 #ifdef __cplusplus 232 } 233 #endif 234 235 236 237 /* }}} */ 238 239 /* {{{ I/O layer */ 240 struct pi_file_handle_s { 241 char * name; /* just for reference. I doubt we'll need them */ 242 char * mode; 243 FILE * f; /* meaningful only at root */ 244 parallelizing_info_ptr pi; 245 int inner; 246 int outer; 247 }; 248 typedef struct pi_file_handle_s pi_file_handle[1]; 249 typedef struct pi_file_handle_s * pi_file_handle_ptr; 250 /* }}} */ 251 252 #ifdef __cplusplus 253 extern "C" { 254 #endif 255 256 extern void parallelizing_info_init(); 257 extern void parallelizing_info_finish(); 258 extern void parallelizing_info_decl_usage(param_list pl); 259 extern void parallelizing_info_lookup_parameters(param_list_ptr pl); 260 261 /* pi_go is the main function. It is responsible of creating all the 262 * parallelizing_info data structures, set up the different inter-job and 263 * inter-thread conciliation toys (communicators, pthread barriers, and 264 * so on), and eventually run the provided function. 265 * 266 * the param_list is checked for parameters mpi and thr, so as to define 267 * the mpi and thr splttings. 268 * 269 * nhc, nvc are the same for threads (cores). 270 */ 271 extern void pi_go( 272 void *(*fcn)(parallelizing_info_ptr, param_list pl, void * arg), 273 param_list pl, 274 void * arg); 275 276 extern void pi_hello(parallelizing_info_ptr pi); 277 278 /* I/O functions */ 279 extern int pi_file_open(pi_file_handle_ptr f, parallelizing_info_ptr pi, int inner, const char * name, const char * mode); 280 extern int pi_file_close(pi_file_handle_ptr f); 281 /* totalsize is the size which should be on disk. It may be shorter than 282 * the sum of the individual sizes, in case of padding */ 283 extern ssize_t pi_file_write(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize); 284 extern ssize_t pi_file_read(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize); 285 extern ssize_t pi_file_write_chunk(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize, size_t chunksize, size_t spos, size_t epos); 286 extern ssize_t pi_file_read_chunk(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize, size_t chunksize, size_t spos, size_t epos); 287 288 /* the parallelizing_info layer has some collective operations which 289 * deliberately have prototypes simlar or identical to their mpi 290 * counterparts (we use size_t for the count arguments, though). 291 * 292 * Note: These functions use the wr->utility_ptr field. 293 */ 294 295 /* almost similar to the mpi-level reduction functions, except that 296 * we added const, dropped the int *, and dropped the multiplexing 297 * argument with the datatype (perhaps we shouldn't, but so far we have 298 * no use for it) */ 299 typedef void (*thread_reducer_t)(const void *, void *, size_t); 300 301 /* pointers must be different on all threads */ 302 extern void pi_thread_bcast(void * sendbuf, 303 size_t count, pi_datatype_ptr datatype, 304 unsigned int root, 305 pi_comm_ptr wr); 306 extern void pi_bcast(void * sendbuf, 307 size_t count, pi_datatype_ptr datatype, 308 unsigned int jroot, unsigned int troot, 309 pi_comm_ptr wr); 310 extern void pi_abort(int err, pi_comm_ptr wr); 311 extern void pi_thread_allreduce(void * sendbuf, void * recvbuf, 312 size_t count, pi_datatype_ptr datatype, pi_op_ptr op, 313 pi_comm_ptr wr); 314 extern void pi_allreduce(void * sendbuf, void *recvbuf, 315 size_t count, pi_datatype_ptr datatype, pi_op_ptr op, 316 pi_comm_ptr wr); 317 extern void pi_allgather(void * sendbuf, 318 size_t sendcount, pi_datatype_ptr sendtype, 319 void *recvbuf, 320 size_t recvcount, pi_datatype_ptr recvtype, 321 pi_comm_ptr wr); 322 extern int pi_thread_data_eq(void * buffer, 323 size_t count, pi_datatype_ptr datatype, 324 pi_comm_ptr wr); 325 extern int pi_data_eq(void * buffer, 326 size_t count, pi_datatype_ptr datatype, 327 pi_comm_ptr wr); 328 329 /* shared_malloc is like malloc, except that the pointer returned will be 330 * equal on all threads (proper access will deserve proper locking of 331 * course). shared_malloc_set_zero sets to zero too */ 332 /* As a side-effect, all shared_* functions serialize threads */ 333 extern void * shared_malloc(pi_comm_ptr wr, size_t size); 334 extern void * shared_malloc_set_zero(pi_comm_ptr wr, size_t size); 335 extern void shared_free(pi_comm_ptr wr, void * ptr); 336 337 338 /* prints the given string in a ascii-form matrix. */ 339 extern void grid_print(parallelizing_info_ptr pi, char * buf, size_t siz, int print); 340 341 #define serialize(w) serialize__(w, __FILE__, __LINE__) 342 extern int serialize__(pi_comm_ptr, const char *, unsigned int); 343 #define serialize_threads(w) serialize_threads__(w, __FILE__, __LINE__) 344 extern int serialize_threads__(pi_comm_ptr, const char *, unsigned int); 345 346 /* stuff related to log entry printing */ 347 extern void pi_log_init(pi_comm_ptr); 348 extern void pi_log_clear(pi_comm_ptr); 349 extern void pi_log_op(pi_comm_ptr, const char * fmt, ...); 350 extern void pi_log_print_all(parallelizing_info_ptr); 351 extern void pi_log_print(pi_comm_ptr); 352 353 /* These are the calls for interleaving. The 2n threads are divided into 354 * two grous. It is guaranteed that at a given point, the two groups of n 355 * threads are separated on either size of the pi_interleaving_flip call. 356 * 357 * The called function must make sure that alternating blocks (delimited 358 * by _flip) either do or don't contain mpi calls, IN TURN. 359 * 360 * _enter and _leave are called from pi_go, so although they are exposed, 361 * one does not have to know about them. 362 */ 363 extern void pi_interleaving_flip(parallelizing_info_ptr); 364 extern void pi_interleaving_enter(parallelizing_info_ptr); 365 extern void pi_interleaving_leave(parallelizing_info_ptr); 366 367 extern void pi_store_generic(parallelizing_info_ptr, unsigned long, unsigned long, void *); 368 extern void * pi_load_generic(parallelizing_info_ptr, unsigned long, unsigned long); 369 370 extern void pi_dictionary_init(pi_dictionary_ptr); 371 extern void pi_dictionary_clear(pi_dictionary_ptr); 372 373 #ifdef __cplusplus 374 } 375 #endif 376 377 /* This provides a fairly typical construct, used like this: 378 * SEVERAL_THREADS_PLAY_MPI_BEGIN(some pi communicator) { 379 * some code which happens for threads in the pi comm in turn 380 * } 381 * SEVERAL_THREADS_PLAY_MPI_END(); 382 */ 383 #ifndef MPI_LIBRARY_MT_CAPABLE 384 #define SEVERAL_THREADS_PLAY_MPI_BEGIN(comm) do { \ 385 for(unsigned int t__ = 0 ; t__ < comm->ncores ; t__++) { \ 386 serialize_threads(comm); \ 387 if (t__ != comm->trank) continue; /* not our turn. */ \ 388 do 389 #define SEVERAL_THREADS_PLAY_MPI_END() while (0); } } while (0) 390 /* This construct is used similarly. It differs slightly, in that we 391 * guarantee that only one thread (in the communicator) will issue mpi 392 * calls */ 393 #define SEVERAL_THREADS_PLAY_MPI_BEGIN2(comm, t__) do { \ 394 serialize_threads(comm); \ 395 if (comm->trank == 0) { \ 396 for(unsigned int t__ = 0 ; t__ < comm->ncores ; t__++) { \ 397 do 398 #define SEVERAL_THREADS_PLAY_MPI_END2(comm) \ 399 while (0); \ 400 } \ 401 } \ 402 serialize_threads(comm); \ 403 } while (0) 404 #else 405 #define SEVERAL_THREADS_PLAY_MPI_BEGIN(comm) /**/ 406 #define SEVERAL_THREADS_PLAY_MPI_END() /**/ 407 #define SEVERAL_THREADS_PLAY_MPI_BEGIN2(comm, t) /**/ 408 #define SEVERAL_THREADS_PLAY_MPI_END2(comm) /**/ 409 #endif 410 411 #endif /* PARALLELIZING_INFO_H_ */ 412