1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2004-2007 The Trustees of Indiana University.
4 * All rights reserved.
5 * Copyright (c) 2004-2017 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
13 * reserved.
14 * Copyright (c) 2006-2008 University of Houston. All rights reserved.
15 * Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved.
16 * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
17 * Copyright (c) 2015-2016 Research Organization for Information Science
18 * and Technology (RIST). All rights reserved.
19 * Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
20 * $COPYRIGHT$
21 *
22 * Additional copyrights may follow
23 *
24 * $HEADER$
25 */
26
27 #include "ompi_config.h"
28 #include "opal/util/show_help.h"
29
30 #include <string.h>
31
32 #include "osc_pt2pt.h"
33 #include "osc_pt2pt_frag.h"
34 #include "osc_pt2pt_request.h"
35 #include "osc_pt2pt_data_move.h"
36
37 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
38
39 static int component_register(void);
40 static int component_init(bool enable_progress_threads, bool enable_mpi_threads);
41 static int component_finalize(void);
42 static int component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
43 struct ompi_communicator_t *comm, struct opal_info_t *info,
44 int flavor);
45 static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
46 struct ompi_communicator_t *comm, struct opal_info_t *info,
47 int flavor, int *model);
48
49 ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = {
50 { /* ompi_osc_base_component_t */
51 .osc_version = {
52 OMPI_OSC_BASE_VERSION_3_0_0,
53 .mca_component_name = "pt2pt",
54 MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
55 OMPI_RELEASE_VERSION),
56 .mca_register_component_params = component_register,
57 },
58 .osc_data = {
59 /* The component is not checkpoint ready */
60 MCA_BASE_METADATA_PARAM_NONE
61 },
62 .osc_init = component_init,
63 .osc_query = component_query,
64 .osc_select = component_select,
65 .osc_finalize = component_finalize,
66 }
67 };
68
69
70 ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_template = {
71 {
72 NULL, /* shared_query */
73
74 ompi_osc_pt2pt_attach,
75 ompi_osc_pt2pt_detach,
76 ompi_osc_pt2pt_free,
77
78 ompi_osc_pt2pt_put,
79 ompi_osc_pt2pt_get,
80 ompi_osc_pt2pt_accumulate,
81 ompi_osc_pt2pt_compare_and_swap,
82 ompi_osc_pt2pt_fetch_and_op,
83 ompi_osc_pt2pt_get_accumulate,
84
85 ompi_osc_pt2pt_rput,
86 ompi_osc_pt2pt_rget,
87 ompi_osc_pt2pt_raccumulate,
88 ompi_osc_pt2pt_rget_accumulate,
89
90 ompi_osc_pt2pt_fence,
91
92 ompi_osc_pt2pt_start,
93 ompi_osc_pt2pt_complete,
94 ompi_osc_pt2pt_post,
95 ompi_osc_pt2pt_wait,
96 ompi_osc_pt2pt_test,
97
98 ompi_osc_pt2pt_lock,
99 ompi_osc_pt2pt_unlock,
100 ompi_osc_pt2pt_lock_all,
101 ompi_osc_pt2pt_unlock_all,
102
103 ompi_osc_pt2pt_sync,
104 ompi_osc_pt2pt_flush,
105 ompi_osc_pt2pt_flush_all,
106 ompi_osc_pt2pt_flush_local,
107 ompi_osc_pt2pt_flush_local_all,
108 }
109 };
110
111 bool ompi_osc_pt2pt_no_locks = false;
112 static bool using_thread_multiple = false;
113
114 /* look up parameters for configuring this window. The code first
115 looks in the info structure passed by the user, then through mca
116 parameters. */
check_config_value_bool(char * key,opal_info_t * info,bool result)117 static bool check_config_value_bool(char *key, opal_info_t *info, bool result)
118 {
119 int flag;
120
121 (void) opal_info_get_bool (info, key, &result, &flag);
122 return result;
123 }
124
component_register(void)125 static int component_register (void)
126 {
127 ompi_osc_pt2pt_no_locks = false;
128 (void) mca_base_component_var_register(&mca_osc_pt2pt_component.super.osc_version,
129 "no_locks",
130 "Enable optimizations available only if MPI_LOCK is "
131 "not used. "
132 "Info key of same name overrides this value.",
133 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
134 OPAL_INFO_LVL_9,
135 MCA_BASE_VAR_SCOPE_READONLY,
136 &ompi_osc_pt2pt_no_locks);
137
138 mca_osc_pt2pt_component.buffer_size = 8192;
139 (void) mca_base_component_var_register (&mca_osc_pt2pt_component.super.osc_version, "buffer_size",
140 "Data transfers smaller than this limit may be coalesced before "
141 "being transferred (default: 8k)", MCA_BASE_VAR_TYPE_UNSIGNED_INT,
142 NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
143 &mca_osc_pt2pt_component.buffer_size);
144
145 mca_osc_pt2pt_component.receive_count = 4;
146 (void) mca_base_component_var_register (&mca_osc_pt2pt_component.super.osc_version, "receive_count",
147 "Number of receives to post for each window for incoming fragments "
148 "(default: 4)", MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0, OPAL_INFO_LVL_4,
149 MCA_BASE_VAR_SCOPE_READONLY, &mca_osc_pt2pt_component.receive_count);
150
151 return OMPI_SUCCESS;
152 }
153
component_progress(void)154 static int component_progress (void)
155 {
156 int completed = 0;
157 int pending_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
158 int recv_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_receives);
159 ompi_osc_pt2pt_pending_t *pending, *next;
160
161 if (recv_count) {
162 for (int i = 0 ; i < recv_count ; ++i) {
163 OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
164 ompi_osc_pt2pt_receive_t *recv = (ompi_osc_pt2pt_receive_t *) opal_list_remove_first (&mca_osc_pt2pt_component.pending_receives);
165 OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
166 if (NULL == recv) {
167 break;
168 }
169
170 (void) ompi_osc_pt2pt_process_receive (recv);
171 completed++;
172 }
173 }
174
175 /* process one incoming request */
176 if (pending_count) {
177 OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
178 OPAL_LIST_FOREACH_SAFE(pending, next, &mca_osc_pt2pt_component.pending_operations, ompi_osc_pt2pt_pending_t) {
179 int ret;
180
181 switch (pending->header.base.type) {
182 case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
183 ret = ompi_osc_pt2pt_process_flush (pending->module, pending->source,
184 &pending->header.flush);
185 break;
186 case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
187 ret = ompi_osc_pt2pt_process_unlock (pending->module, pending->source,
188 &pending->header.unlock);
189 break;
190 default:
191 /* shouldn't happen */
192 assert (0);
193 abort ();
194 }
195
196 if (OMPI_SUCCESS == ret) {
197 opal_list_remove_item (&mca_osc_pt2pt_component.pending_operations, &pending->super);
198 OBJ_RELEASE(pending);
199 completed++;
200 }
201 }
202 OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
203 }
204
205 return completed;
206 }
207
208 static int
component_init(bool enable_progress_threads,bool enable_mpi_threads)209 component_init(bool enable_progress_threads,
210 bool enable_mpi_threads)
211 {
212 int ret;
213
214 if (enable_mpi_threads) {
215 using_thread_multiple = true;
216 }
217
218 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.lock, opal_mutex_t);
219 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations, opal_list_t);
220 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations_lock, opal_mutex_t);
221 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_receives, opal_list_t);
222 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_receives_lock, opal_mutex_t);
223
224 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.modules,
225 opal_hash_table_t);
226 opal_hash_table_init(&mca_osc_pt2pt_component.modules, 2);
227
228 mca_osc_pt2pt_component.progress_enable = false;
229 mca_osc_pt2pt_component.module_count = 0;
230
231 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.frags, opal_free_list_t);
232 ret = opal_free_list_init (&mca_osc_pt2pt_component.frags,
233 sizeof(ompi_osc_pt2pt_frag_t), 8,
234 OBJ_CLASS(ompi_osc_pt2pt_frag_t),
235 mca_osc_pt2pt_component.buffer_size +
236 sizeof (ompi_osc_pt2pt_frag_header_t),
237 8, 1, -1, 1, NULL, 0, NULL, NULL, NULL);
238 if (OMPI_SUCCESS != ret) {
239 opal_output_verbose(1, ompi_osc_base_framework.framework_output,
240 "%s:%d: opal_free_list_init failed: %d",
241 __FILE__, __LINE__, ret);
242 return ret;
243 }
244
245 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.requests, opal_free_list_t);
246 ret = opal_free_list_init (&mca_osc_pt2pt_component.requests,
247 sizeof(ompi_osc_pt2pt_request_t), 8,
248 OBJ_CLASS(ompi_osc_pt2pt_request_t),
249 0, 0, 0, -1, 32, NULL, 0, NULL, NULL, NULL);
250 if (OMPI_SUCCESS != ret) {
251 opal_output_verbose(1, ompi_osc_base_framework.framework_output,
252 "%s:%d: opal_free_list_init failed: %d\n",
253 __FILE__, __LINE__, ret);
254 return ret;
255 }
256
257 return ret;
258 }
259
260
261 int
component_finalize(void)262 component_finalize(void)
263 {
264 size_t num_modules;
265
266 if (mca_osc_pt2pt_component.progress_enable) {
267 opal_progress_unregister (component_progress);
268 }
269
270 if (0 !=
271 (num_modules = opal_hash_table_get_size(&mca_osc_pt2pt_component.modules))) {
272 opal_output(ompi_osc_base_framework.framework_output,
273 "WARNING: There were %d Windows created but not freed.",
274 (int) num_modules);
275 }
276
277 OBJ_DESTRUCT(&mca_osc_pt2pt_component.frags);
278 OBJ_DESTRUCT(&mca_osc_pt2pt_component.modules);
279 OBJ_DESTRUCT(&mca_osc_pt2pt_component.lock);
280 OBJ_DESTRUCT(&mca_osc_pt2pt_component.requests);
281 OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations);
282 OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations_lock);
283 OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_receives);
284 OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_receives_lock);
285
286 return OMPI_SUCCESS;
287 }
288
289
290 static int
component_query(struct ompi_win_t * win,void ** base,size_t size,int disp_unit,struct ompi_communicator_t * comm,struct opal_info_t * info,int flavor)291 component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
292 struct ompi_communicator_t *comm, struct opal_info_t *info,
293 int flavor)
294 {
295 if (MPI_WIN_FLAVOR_SHARED == flavor) return -1;
296
297 return 10;
298 }
299
300
301 static int
component_select(struct ompi_win_t * win,void ** base,size_t size,int disp_unit,struct ompi_communicator_t * comm,struct opal_info_t * info,int flavor,int * model)302 component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
303 struct ompi_communicator_t *comm, struct opal_info_t *info,
304 int flavor, int *model)
305 {
306 ompi_osc_pt2pt_module_t *module = NULL;
307 int ret;
308 char *name;
309
310 /* We don't support shared windows; that's for the sm onesided
311 component */
312 if (MPI_WIN_FLAVOR_SHARED == flavor) return OMPI_ERR_NOT_SUPPORTED;
313
314 /*
315 * workaround for issue https://github.com/open-mpi/ompi/issues/2614
316 * The following check needs to be removed once 2614 is addressed.
317 */
318 if (using_thread_multiple) {
319 opal_show_help("help-osc-pt2pt.txt", "mpi-thread-multiple-not-supported", true);
320 return OMPI_ERR_NOT_SUPPORTED;
321 }
322
323 /* create module structure with all fields initialized to zero */
324 module = (ompi_osc_pt2pt_module_t*)
325 calloc(1, sizeof(ompi_osc_pt2pt_module_t));
326 if (NULL == module) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
327
328 /* fill in the function pointer part */
329 memcpy(module, &ompi_osc_pt2pt_module_template,
330 sizeof(ompi_osc_base_module_t));
331
332 /* initialize the objects, so that always free in cleanup */
333 OBJ_CONSTRUCT(&module->lock, opal_recursive_mutex_t);
334 OBJ_CONSTRUCT(&module->cond, opal_condition_t);
335 OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
336 OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
337 OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
338 OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
339 OBJ_CONSTRUCT(&module->pending_acc_lock, opal_mutex_t);
340 OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
341 OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
342 OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t);
343 OBJ_CONSTRUCT(&module->peer_hash, opal_hash_table_t);
344 OBJ_CONSTRUCT(&module->peer_lock, opal_mutex_t);
345
346 ret = opal_hash_table_init (&module->outstanding_locks, 64);
347 if (OPAL_SUCCESS != ret) {
348 goto cleanup;
349 }
350
351 ret = opal_hash_table_init (&module->peer_hash, 128);
352 if (OPAL_SUCCESS != ret) {
353 goto cleanup;
354 }
355
356 /* options */
357 /* FIX ME: should actually check this value... */
358 #if 1
359 module->accumulate_ordering = 1;
360 #else
361 ompi_osc_base_config_value_equal("accumulate_ordering", info, "none");
362 #endif
363
364 /* fill in our part */
365 if (MPI_WIN_FLAVOR_ALLOCATE == flavor && size) {
366 module->free_after = *base = malloc(size);
367 if (NULL == *base) {
368 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
369 goto cleanup;
370 }
371 }
372
373 /* in the dynamic case base is MPI_BOTTOM */
374 if (MPI_WIN_FLAVOR_DYNAMIC != flavor) {
375 module->baseptr = *base;
376 }
377
378 ret = ompi_comm_dup(comm, &module->comm);
379 if (OMPI_SUCCESS != ret) goto cleanup;
380
381 OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
382 "pt2pt component creating window with id %d",
383 ompi_comm_get_cid(module->comm)));
384
385 /* record my displacement unit. Always resolved at target */
386 module->disp_unit = disp_unit;
387
388 /* peer op count data */
389 module->epoch_outgoing_frag_count = calloc (ompi_comm_size(comm), sizeof(uint32_t));
390 if (NULL == module->epoch_outgoing_frag_count) {
391 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
392 goto cleanup;
393 }
394
395 /* the statement below (from Brian) does not seem correct so disable active target on the
396 * window. if this end up being incorrect please revert this one change */
397 #if 0
398 /* initially, we're in that pseudo-fence state, so we allow eager
399 sends (yay for Fence). Other protocols will disable before
400 they start their epochs, so this isn't a problem. */
401 module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_FENCE;
402 module->all_sync.eager_send_active = true;
403 #endif
404
405 /* lock data */
406 module->no_locks = check_config_value_bool ("no_locks", info, ompi_osc_pt2pt_no_locks);
407
408 /* update component data */
409 OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
410 ret = opal_hash_table_set_value_uint32(&mca_osc_pt2pt_component.modules,
411 ompi_comm_get_cid(module->comm),
412 module);
413 OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
414 if (OMPI_SUCCESS != ret) goto cleanup;
415
416 /* fill in window information */
417 *model = MPI_WIN_UNIFIED;
418 win->w_osc_module = (ompi_osc_base_module_t*) module;
419 asprintf(&name, "pt2pt window %d", ompi_comm_get_cid(module->comm));
420 ompi_win_set_name(win, name);
421 free(name);
422
423 /* sync memory - make sure all initialization completed */
424 opal_atomic_mb();
425
426 ret = ompi_osc_pt2pt_frag_start_receive (module);
427 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
428 goto cleanup;
429 }
430
431 /* barrier to prevent arrival of lock requests before we're
432 fully created */
433 ret = module->comm->c_coll->coll_barrier(module->comm,
434 module->comm->c_coll->coll_barrier_module);
435 if (OMPI_SUCCESS != ret) goto cleanup;
436
437 if (!mca_osc_pt2pt_component.progress_enable) {
438 opal_progress_register (component_progress);
439 mca_osc_pt2pt_component.progress_enable = true;
440 }
441
442 if (module->no_locks) {
443 win->w_flags |= OMPI_WIN_NO_LOCKS;
444 }
445
446 OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
447 "done creating pt2pt window %d", ompi_comm_get_cid(module->comm)));
448
449 return OMPI_SUCCESS;
450
451 cleanup:
452 /* set the module so we properly cleanup */
453 win->w_osc_module = (ompi_osc_base_module_t*) module;
454 ompi_osc_pt2pt_free (win);
455
456 return ret;
457 }
458
459
460 int
ompi_osc_pt2pt_set_info(struct ompi_win_t * win,struct opal_info_t * info)461 ompi_osc_pt2pt_set_info(struct ompi_win_t *win, struct opal_info_t *info)
462 {
463 ompi_osc_pt2pt_module_t *module =
464 (ompi_osc_pt2pt_module_t*) win->w_osc_module;
465
466 /* enforce collectiveness... */
467 return module->comm->c_coll->coll_barrier(module->comm,
468 module->comm->c_coll->coll_barrier_module);
469 }
470
471
472 int
ompi_osc_pt2pt_get_info(struct ompi_win_t * win,struct opal_info_t ** info_used)473 ompi_osc_pt2pt_get_info(struct ompi_win_t *win, struct opal_info_t **info_used)
474 {
475 opal_info_t *info = OBJ_NEW(opal_info_t);
476 if (NULL == info) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
477
478 *info_used = info;
479
480 return OMPI_SUCCESS;
481 }
482
483 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_t, opal_list_item_t, NULL, NULL);
484
ompi_osc_pt2pt_receive_construct(ompi_osc_pt2pt_receive_t * recv)485 static void ompi_osc_pt2pt_receive_construct (ompi_osc_pt2pt_receive_t *recv)
486 {
487 recv->buffer = NULL;
488 recv->pml_request = NULL;
489 }
490
ompi_osc_pt2pt_receive_destruct(ompi_osc_pt2pt_receive_t * recv)491 static void ompi_osc_pt2pt_receive_destruct (ompi_osc_pt2pt_receive_t *recv)
492 {
493 free (recv->buffer);
494 if (recv->pml_request && MPI_REQUEST_NULL != recv->pml_request) {
495 recv->pml_request->req_complete_cb = NULL;
496 ompi_request_cancel (recv->pml_request);
497 ompi_request_free (&recv->pml_request);
498 }
499 }
500
501 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_receive_t, opal_list_item_t,
502 ompi_osc_pt2pt_receive_construct,
503 ompi_osc_pt2pt_receive_destruct);
504
ompi_osc_pt2pt_peer_construct(ompi_osc_pt2pt_peer_t * peer)505 static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer)
506 {
507 OBJ_CONSTRUCT(&peer->queued_frags, opal_list_t);
508 OBJ_CONSTRUCT(&peer->lock, opal_mutex_t);
509 peer->active_frag = NULL;
510 peer->passive_incoming_frag_count = 0;
511 peer->flags = 0;
512 }
513
ompi_osc_pt2pt_peer_destruct(ompi_osc_pt2pt_peer_t * peer)514 static void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer)
515 {
516 OBJ_DESTRUCT(&peer->queued_frags);
517 OBJ_DESTRUCT(&peer->lock);
518 }
519
520 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_peer_t, opal_object_t,
521 ompi_osc_pt2pt_peer_construct,
522 ompi_osc_pt2pt_peer_destruct);
523