1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3  * Copyright (c) 2004-2007 The Trustees of Indiana University.
4  *                         All rights reserved.
5  * Copyright (c) 2004-2017 The University of Tennessee and The University
6  *                         of Tennessee Research Foundation.  All rights
7  *                         reserved.
8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9  *                         University of Stuttgart.  All rights reserved.
10  * Copyright (c) 2004-2005 The Regents of the University of California.
11  *                         All rights reserved.
12  * Copyright (c) 2007-2015 Los Alamos National Security, LLC.  All rights
13  *                         reserved.
14  * Copyright (c) 2006-2008 University of Houston.  All rights reserved.
15  * Copyright (c) 2010      Oracle and/or its affiliates.  All rights reserved.
16  * Copyright (c) 2012-2013 Sandia National Laboratories.  All rights reserved.
17  * Copyright (c) 2015-2016 Research Organization for Information Science
18  *                         and Technology (RIST). All rights reserved.
19  * Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
20  * $COPYRIGHT$
21  *
22  * Additional copyrights may follow
23  *
24  * $HEADER$
25  */
26 
27 #include "ompi_config.h"
28 #include "opal/util/show_help.h"
29 
30 #include <string.h>
31 
32 #include "osc_pt2pt.h"
33 #include "osc_pt2pt_frag.h"
34 #include "osc_pt2pt_request.h"
35 #include "osc_pt2pt_data_move.h"
36 
37 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
38 
/* Forward declarations for the osc framework entry points wired into
 * the component structure below. */
static int component_register(void);
static int component_init(bool enable_progress_threads, bool enable_mpi_threads);
static int component_finalize(void);
static int component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
                           struct ompi_communicator_t *comm, struct opal_info_t *info,
                           int flavor);
static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
                            struct ompi_communicator_t *comm, struct opal_info_t *info,
                            int flavor, int *model);
48 
/* Component-level singleton.  The embedded osc_version/function table is
 * what the osc framework sees; the rest of ompi_osc_pt2pt_component_t
 * (free lists, pending queues, module hash) is filled in component_init. */
ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = {
    { /* ompi_osc_base_component_t */
        .osc_version = {
            OMPI_OSC_BASE_VERSION_3_0_0,
            .mca_component_name = "pt2pt",
            MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
                                  OMPI_RELEASE_VERSION),
            .mca_register_component_params = component_register,
        },
        .osc_data = {
            /* The component is not checkpoint ready */
            MCA_BASE_METADATA_PARAM_NONE
        },
        .osc_init = component_init,
        .osc_query = component_query,
        .osc_select = component_select,
        .osc_finalize = component_finalize,
    }
};
68 
69 
/* Per-window module template.  The inner aggregate is the positional
 * ompi_osc_base_module_t function table; component_select() memcpy's it
 * into each freshly allocated module.  Entry order must match the
 * ompi_osc_base_module_t declaration exactly. */
ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_template = {
    {
        NULL, /* shared_query */

        /* window lifetime */
        ompi_osc_pt2pt_attach,
        ompi_osc_pt2pt_detach,
        ompi_osc_pt2pt_free,

        /* blocking communication operations */
        ompi_osc_pt2pt_put,
        ompi_osc_pt2pt_get,
        ompi_osc_pt2pt_accumulate,
        ompi_osc_pt2pt_compare_and_swap,
        ompi_osc_pt2pt_fetch_and_op,
        ompi_osc_pt2pt_get_accumulate,

        /* request-based (Rput/Rget/...) variants */
        ompi_osc_pt2pt_rput,
        ompi_osc_pt2pt_rget,
        ompi_osc_pt2pt_raccumulate,
        ompi_osc_pt2pt_rget_accumulate,

        /* active-target synchronization */
        ompi_osc_pt2pt_fence,

        ompi_osc_pt2pt_start,
        ompi_osc_pt2pt_complete,
        ompi_osc_pt2pt_post,
        ompi_osc_pt2pt_wait,
        ompi_osc_pt2pt_test,

        /* passive-target synchronization */
        ompi_osc_pt2pt_lock,
        ompi_osc_pt2pt_unlock,
        ompi_osc_pt2pt_lock_all,
        ompi_osc_pt2pt_unlock_all,

        /* memory/completion synchronization */
        ompi_osc_pt2pt_sync,
        ompi_osc_pt2pt_flush,
        ompi_osc_pt2pt_flush_all,
        ompi_osc_pt2pt_flush_local,
        ompi_osc_pt2pt_flush_local_all,
    }
};
110 
/* Default for the "no_locks" MCA parameter; per-window info key of the
 * same name overrides it in component_select(). */
bool ompi_osc_pt2pt_no_locks = false;
/* Set in component_init when MPI_THREAD_MULTIPLE was requested; used to
 * refuse window creation (issue 2614 workaround in component_select). */
static bool using_thread_multiple = false;
113 
114 /* look up parameters for configuring this window.  The code first
115    looks in the info structure passed by the user, then through mca
116    parameters. */
check_config_value_bool(char * key,opal_info_t * info,bool result)117 static bool check_config_value_bool(char *key, opal_info_t *info, bool result)
118 {
119     int flag;
120 
121     (void) opal_info_get_bool (info, key, &result, &flag);
122     return result;
123 }
124 
component_register(void)125 static int component_register (void)
126 {
127     ompi_osc_pt2pt_no_locks = false;
128     (void) mca_base_component_var_register(&mca_osc_pt2pt_component.super.osc_version,
129                                            "no_locks",
130                                            "Enable optimizations available only if MPI_LOCK is "
131                                            "not used.  "
132                                            "Info key of same name overrides this value.",
133                                            MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
134                                            OPAL_INFO_LVL_9,
135                                            MCA_BASE_VAR_SCOPE_READONLY,
136                                            &ompi_osc_pt2pt_no_locks);
137 
138     mca_osc_pt2pt_component.buffer_size = 8192;
139     (void) mca_base_component_var_register (&mca_osc_pt2pt_component.super.osc_version, "buffer_size",
140 					    "Data transfers smaller than this limit may be coalesced before "
141 					    "being transferred (default: 8k)", MCA_BASE_VAR_TYPE_UNSIGNED_INT,
142 					    NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
143 					    &mca_osc_pt2pt_component.buffer_size);
144 
145     mca_osc_pt2pt_component.receive_count = 4;
146     (void) mca_base_component_var_register (&mca_osc_pt2pt_component.super.osc_version, "receive_count",
147                                             "Number of receives to post for each window for incoming fragments "
148                                             "(default: 4)", MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0, OPAL_INFO_LVL_4,
149                                             MCA_BASE_VAR_SCOPE_READONLY, &mca_osc_pt2pt_component.receive_count);
150 
151     return OMPI_SUCCESS;
152 }
153 
/* Progress callback registered with opal_progress.  Drains queued
 * component-level receives and retries pending flush/unlock requests.
 * Returns the number of items completed in this pass. */
static int component_progress (void)
{
    int completed = 0;
    /* snapshot queue sizes without holding the locks; a stale count only
     * bounds how much work this pass attempts */
    int pending_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
    int recv_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_receives);
    ompi_osc_pt2pt_pending_t *pending, *next;

    if (recv_count) {
        for (int i = 0 ; i < recv_count ; ++i) {
            /* pop one entry at a time so the lock is not held while the
             * receive is processed */
            OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
            ompi_osc_pt2pt_receive_t *recv = (ompi_osc_pt2pt_receive_t *) opal_list_remove_first (&mca_osc_pt2pt_component.pending_receives);
            OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
            if (NULL == recv) {
                break;
            }

            (void) ompi_osc_pt2pt_process_receive (recv);
            completed++;
        }
    }

    /* process one incoming request */
    if (pending_count) {
        OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
        OPAL_LIST_FOREACH_SAFE(pending, next, &mca_osc_pt2pt_component.pending_operations, ompi_osc_pt2pt_pending_t) {
            int ret;

            switch (pending->header.base.type) {
            case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
                ret = ompi_osc_pt2pt_process_flush (pending->module, pending->source,
                                                    &pending->header.flush);
                break;
            case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
                ret = ompi_osc_pt2pt_process_unlock (pending->module, pending->source,
                                                     &pending->header.unlock);
                break;
            default:
                /* shouldn't happen */
                assert (0);
                abort ();
            }

            /* only retire the entry once the retry succeeded; otherwise it
             * stays queued for a later progress pass */
            if (OMPI_SUCCESS == ret) {
                opal_list_remove_item (&mca_osc_pt2pt_component.pending_operations, &pending->super);
                OBJ_RELEASE(pending);
                completed++;
            }
        }
        OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
    }

    return completed;
}
207 
/* One-time component initialization: constructs the component locks and
 * queues, the cid->module hash table and the fragment/request free
 * lists.  Returns OMPI_SUCCESS or the error from opal_free_list_init. */
static int
component_init(bool enable_progress_threads,
               bool enable_mpi_threads)
{
    int ret;

    /* remember the requested threading level; MPI_THREAD_MULTIPLE
     * windows are rejected later in component_select (issue 2614) */
    if (enable_mpi_threads) {
        using_thread_multiple = true;
    }

    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.lock, opal_mutex_t);
    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations, opal_list_t);
    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_receives, opal_list_t);
    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_receives_lock, opal_mutex_t);

    /* maps communicator cid -> module for every live pt2pt window */
    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.modules,
                  opal_hash_table_t);
    opal_hash_table_init(&mca_osc_pt2pt_component.modules, 2);

    mca_osc_pt2pt_component.progress_enable = false;
    mca_osc_pt2pt_component.module_count = 0;

    /* fragment free list; each payload is buffer_size (MCA parameter,
     * registered in component_register) plus the fragment header */
    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.frags, opal_free_list_t);
    ret = opal_free_list_init (&mca_osc_pt2pt_component.frags,
                               sizeof(ompi_osc_pt2pt_frag_t), 8,
                               OBJ_CLASS(ompi_osc_pt2pt_frag_t),
                               mca_osc_pt2pt_component.buffer_size +
                               sizeof (ompi_osc_pt2pt_frag_header_t),
                               8, 1, -1, 1, NULL, 0, NULL, NULL, NULL);
    if (OMPI_SUCCESS != ret) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: opal_free_list_init failed: %d",
                            __FILE__, __LINE__, ret);
        return ret;
    }

    /* request free list: unbounded, grows 32 entries at a time */
    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.requests, opal_free_list_t);
    ret = opal_free_list_init (&mca_osc_pt2pt_component.requests,
                               sizeof(ompi_osc_pt2pt_request_t), 8,
                               OBJ_CLASS(ompi_osc_pt2pt_request_t),
                               0, 0, 0, -1, 32, NULL, 0, NULL, NULL, NULL);
    if (OMPI_SUCCESS != ret) {
        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                            "%s:%d: opal_free_list_init failed: %d\n",
                            __FILE__, __LINE__, ret);
        return ret;
    }

    return ret;
}
259 
260 
261 int
component_finalize(void)262 component_finalize(void)
263 {
264     size_t num_modules;
265 
266     if (mca_osc_pt2pt_component.progress_enable) {
267 	opal_progress_unregister (component_progress);
268     }
269 
270     if (0 !=
271         (num_modules = opal_hash_table_get_size(&mca_osc_pt2pt_component.modules))) {
272         opal_output(ompi_osc_base_framework.framework_output,
273                     "WARNING: There were %d Windows created but not freed.",
274                     (int) num_modules);
275     }
276 
277     OBJ_DESTRUCT(&mca_osc_pt2pt_component.frags);
278     OBJ_DESTRUCT(&mca_osc_pt2pt_component.modules);
279     OBJ_DESTRUCT(&mca_osc_pt2pt_component.lock);
280     OBJ_DESTRUCT(&mca_osc_pt2pt_component.requests);
281     OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations);
282     OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations_lock);
283     OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_receives);
284     OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_receives_lock);
285 
286     return OMPI_SUCCESS;
287 }
288 
289 
290 static int
component_query(struct ompi_win_t * win,void ** base,size_t size,int disp_unit,struct ompi_communicator_t * comm,struct opal_info_t * info,int flavor)291 component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
292                 struct ompi_communicator_t *comm, struct opal_info_t *info,
293                 int flavor)
294 {
295     if (MPI_WIN_FLAVOR_SHARED == flavor) return -1;
296 
297     return 10;
298 }
299 
300 
/* Create and initialize a pt2pt module for a new window.  Allocates the
 * module, duplicates the communicator, sets up synchronization state,
 * registers the module with the component, posts the initial receives
 * and performs a barrier so no peer can target the window before every
 * rank is ready.  On any failure, jumps to cleanup which routes through
 * ompi_osc_pt2pt_free for a full teardown. */
static int
component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
                 struct ompi_communicator_t *comm, struct opal_info_t *info,
                 int flavor, int *model)
{
    ompi_osc_pt2pt_module_t *module = NULL;
    int ret;
    char *name;

    /* We don't support shared windows; that's for the sm onesided
       component */
    if (MPI_WIN_FLAVOR_SHARED == flavor) return OMPI_ERR_NOT_SUPPORTED;

    /*
     * workaround for issue https://github.com/open-mpi/ompi/issues/2614
     * The following check needs to be removed once 2614 is addressed.
     */
    if (using_thread_multiple) {
        opal_show_help("help-osc-pt2pt.txt", "mpi-thread-multiple-not-supported", true);
        return OMPI_ERR_NOT_SUPPORTED;
    }

    /* create module structure with all fields initialized to zero */
    module = (ompi_osc_pt2pt_module_t*)
        calloc(1, sizeof(ompi_osc_pt2pt_module_t));
    if (NULL == module) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;

    /* fill in the function pointer part */
    memcpy(module, &ompi_osc_pt2pt_module_template,
           sizeof(ompi_osc_base_module_t));

    /* initialize the objects, so that always free in cleanup */
    OBJ_CONSTRUCT(&module->lock, opal_recursive_mutex_t);
    OBJ_CONSTRUCT(&module->cond, opal_condition_t);
    OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
    OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
    OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
    OBJ_CONSTRUCT(&module->pending_acc_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
    OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t);
    OBJ_CONSTRUCT(&module->peer_hash, opal_hash_table_t);
    OBJ_CONSTRUCT(&module->peer_lock, opal_mutex_t);

    ret = opal_hash_table_init (&module->outstanding_locks, 64);
    if (OPAL_SUCCESS != ret) {
        goto cleanup;
    }

    ret = opal_hash_table_init (&module->peer_hash, 128);
    if (OPAL_SUCCESS != ret) {
        goto cleanup;
    }

    /* options */
    /* FIX ME: should actually check this value... */
#if 1
    module->accumulate_ordering = 1;
#else
    ompi_osc_base_config_value_equal("accumulate_ordering", info, "none");
#endif

    /* fill in our part */
    /* for MPI_Win_allocate we own the memory and must free it later
     * (free_after records that ownership) */
    if (MPI_WIN_FLAVOR_ALLOCATE == flavor && size) {
        module->free_after = *base = malloc(size);
        if (NULL == *base) {
            ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
            goto cleanup;
        }
    }

    /* in the dynamic case base is MPI_BOTTOM */
    if (MPI_WIN_FLAVOR_DYNAMIC != flavor) {
        module->baseptr = *base;
    }

    /* private communicator so window traffic never collides with the
     * application's point-to-point messages */
    ret = ompi_comm_dup(comm, &module->comm);
    if (OMPI_SUCCESS != ret) goto cleanup;

    OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
                         "pt2pt component creating window with id %d",
                         ompi_comm_get_cid(module->comm)));

    /* record my displacement unit.  Always resolved at target */
    module->disp_unit = disp_unit;

    /* peer op count data */
    module->epoch_outgoing_frag_count = calloc (ompi_comm_size(comm), sizeof(uint32_t));
    if (NULL == module->epoch_outgoing_frag_count) {
        ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        goto cleanup;
    }

    /* the statement below (from Brian) does not seem correct so disable active target on the
     * window. if this end up being incorrect please revert this one change */
#if 0
    /* initially, we're in that pseudo-fence state, so we allow eager
       sends (yay for Fence).  Other protocols will disable before
       they start their epochs, so this isn't a problem. */
    module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_FENCE;
    module->all_sync.eager_send_active = true;
#endif

    /* lock data */
    module->no_locks = check_config_value_bool ("no_locks", info, ompi_osc_pt2pt_no_locks);

    /* update component data */
    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
    ret = opal_hash_table_set_value_uint32(&mca_osc_pt2pt_component.modules,
                                           ompi_comm_get_cid(module->comm),
                                           module);
    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
    if (OMPI_SUCCESS != ret) goto cleanup;

    /* fill in window information */
    *model = MPI_WIN_UNIFIED;
    win->w_osc_module = (ompi_osc_base_module_t*) module;
    /* NOTE(review): asprintf's return value is unchecked -- on failure
     * 'name' is indeterminate; consider checking ret < 0 here */
    asprintf(&name, "pt2pt window %d", ompi_comm_get_cid(module->comm));
    ompi_win_set_name(win, name);
    free(name);

    /* sync memory - make sure all initialization completed */
    opal_atomic_mb();

    ret = ompi_osc_pt2pt_frag_start_receive (module);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        goto cleanup;
    }

    /* barrier to prevent arrival of lock requests before we're
       fully created */
    ret = module->comm->c_coll->coll_barrier(module->comm,
                                            module->comm->c_coll->coll_barrier_module);
    if (OMPI_SUCCESS != ret) goto cleanup;

    /* lazily register the progress function on first window creation */
    if (!mca_osc_pt2pt_component.progress_enable) {
        opal_progress_register (component_progress);
        mca_osc_pt2pt_component.progress_enable = true;
    }

    if (module->no_locks) {
        win->w_flags |= OMPI_WIN_NO_LOCKS;
    }

    OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
                         "done creating pt2pt window %d", ompi_comm_get_cid(module->comm)));

    return OMPI_SUCCESS;

 cleanup:
    /* set the module so we properly cleanup */
    win->w_osc_module = (ompi_osc_base_module_t*) module;
    ompi_osc_pt2pt_free (win);

    return ret;
}
458 
459 
460 int
ompi_osc_pt2pt_set_info(struct ompi_win_t * win,struct opal_info_t * info)461 ompi_osc_pt2pt_set_info(struct ompi_win_t *win, struct opal_info_t *info)
462 {
463     ompi_osc_pt2pt_module_t *module =
464         (ompi_osc_pt2pt_module_t*) win->w_osc_module;
465 
466     /* enforce collectiveness... */
467     return module->comm->c_coll->coll_barrier(module->comm,
468                                              module->comm->c_coll->coll_barrier_module);
469 }
470 
471 
472 int
ompi_osc_pt2pt_get_info(struct ompi_win_t * win,struct opal_info_t ** info_used)473 ompi_osc_pt2pt_get_info(struct ompi_win_t *win, struct opal_info_t **info_used)
474 {
475     opal_info_t *info = OBJ_NEW(opal_info_t);
476     if (NULL == info) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
477 
478     *info_used = info;
479 
480     return OMPI_SUCCESS;
481 }
482 
483 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_t, opal_list_item_t, NULL, NULL);
484 
/* Constructor for a component-level receive: no buffer allocated yet
 * and no PML request outstanding. */
static void ompi_osc_pt2pt_receive_construct (ompi_osc_pt2pt_receive_t *recv)
{
    recv->pml_request = NULL;
    recv->buffer = NULL;
}
490 
/* Destructor for a component-level receive: releases the buffer and,
 * when a PML request is still posted, cancels and frees it (clearing
 * the completion callback first so it cannot fire during teardown). */
static void ompi_osc_pt2pt_receive_destruct (ompi_osc_pt2pt_receive_t *recv)
{
    free (recv->buffer);

    if (NULL == recv->pml_request || MPI_REQUEST_NULL == recv->pml_request) {
        return;
    }

    recv->pml_request->req_complete_cb = NULL;
    ompi_request_cancel (recv->pml_request);
    ompi_request_free (&recv->pml_request);
}
500 
/* Class registration for posted component-level receives. */
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_receive_t, opal_list_item_t,
                   ompi_osc_pt2pt_receive_construct,
                   ompi_osc_pt2pt_receive_destruct);
504 
/* Constructor for per-peer state: zeroed counters/flags, no active
 * fragment, plus the queued-fragment list and its protecting mutex. */
static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer)
{
    peer->flags = 0;
    peer->passive_incoming_frag_count = 0;
    peer->active_frag = NULL;

    OBJ_CONSTRUCT(&peer->queued_frags, opal_list_t);
    OBJ_CONSTRUCT(&peer->lock, opal_mutex_t);
}
513 
/* Destructor for per-peer state: tears down the mutex and the
 * queued-fragment list created in the constructor. */
static void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer)
{
    OBJ_DESTRUCT(&peer->lock);
    OBJ_DESTRUCT(&peer->queued_frags);
}
519 
/* Class registration for per-peer state objects. */
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_peer_t, opal_object_t,
                   ompi_osc_pt2pt_peer_construct,
                   ompi_osc_pt2pt_peer_destruct);