1 /*
2 * Copyright (c) 2015 Sandia National Laboratories. All rights reserved.
3 * Copyright (c) 2015 Bull SAS. All rights reserved.
4 * Copyright (c) 2015 Research Organization for Information Science
5 * and Technology (RIST). All rights reserved.
6 * Copyright (c) 2017 IBM Corporation. All rights reserved.
7 * $COPYRIGHT$
8 *
9 * Additional copyrights may follow
10 *
11 * $HEADER$
12 */
13
14 #include "ompi_config.h"
15 #include "coll_portals4.h"
16 #include "coll_portals4_request.h"
17
18 #include <stdio.h>
19
20 #include "mpi.h"
21 #include "ompi/constants.h"
22 #include "ompi/datatype/ompi_datatype.h"
23 #include "ompi/datatype/ompi_datatype_internal.h"
24 #include "ompi/op/op.h"
25 #include "opal/util/bit_ops.h"
26 #include "ompi/mca/pml/pml.h"
27 #include "ompi/mca/coll/coll.h"
28 #include "ompi/mca/coll/base/base.h"
29
30 #define COLL_PORTALS4_REDUCE_MAX_CHILDREN 2
31
32
33 static int
reduce_kary_tree_top(const void * sendbuf,void * recvbuf,int count,MPI_Datatype dtype,MPI_Op op,int root,struct ompi_communicator_t * comm,ompi_coll_portals4_request_t * request,mca_coll_portals4_module_t * module)34 reduce_kary_tree_top(const void *sendbuf, void *recvbuf, int count,
35 MPI_Datatype dtype, MPI_Op op,
36 int root,
37 struct ompi_communicator_t *comm,
38 ompi_coll_portals4_request_t *request,
39 mca_coll_portals4_module_t *module)
40 {
41 bool is_sync = request->is_sync;
42 int ret;
43 unsigned int i;
44 int size = ompi_comm_size(comm);
45 int rank = ompi_comm_rank(comm);
46 ptl_rank_t parent, child[COLL_PORTALS4_REDUCE_MAX_CHILDREN];
47 size_t internal_count, length;
48 ptl_handle_md_t zero_md_h, data_md_h;
49 ptl_handle_me_t me_h;
50 ptl_me_t me;
51 ptl_match_bits_t match_bits_ack, match_bits_rtr, match_bits;
52 ptl_ct_event_t ct;
53 ptl_op_t ptl_op;
54 ptl_datatype_t ptl_dtype;
55
56
57 request->type = OMPI_COLL_PORTALS4_TYPE_REDUCE;
58
59 /*
60 ** Initialization
61 */
62
63 for (i = 0 ; i < COLL_PORTALS4_REDUCE_MAX_CHILDREN ; i++) {
64 child[i] = PTL_INVALID_RANK;
65 }
66
67 parent = PTL_INVALID_RANK;
68
69 zero_md_h = mca_coll_portals4_component.zero_md_h;
70 data_md_h = mca_coll_portals4_component.data_md_h;
71
72 internal_count = opal_atomic_add_fetch_size_t(&module->coll_count, 1);
73
74 /*
75 ** DATATYPE and SIZES
76 */
77 ret = ompi_datatype_type_size(dtype, &length);
78 length *= count;
79
80 request->u.reduce.is_optim = is_reduce_optimizable(dtype, length, op, &ptl_dtype, &ptl_op);
81
82 if (request->u.reduce.is_optim) {
83
84 /*
85 * TOPOLOGY
86 */
87
88 /* this function is dependent on the number of segments,
89 * if we use segmentation pipe-line is preferred, and
90 * binary tree otherwise */
91
92 get_k_ary_tree(COLL_PORTALS4_REDUCE_MAX_CHILDREN,
93 rank, size, root, &parent, child, &request->u.reduce.child_nb);
94
95 /*
96 * PORTALS4 RESOURCE ALLOCATION
97 */
98
99 /* Compute match bits */
100 COLL_PORTALS4_SET_BITS(match_bits_ack, ompi_comm_get_cid(comm), 1, 0,
101 COLL_PORTALS4_REDUCE, 0, internal_count);
102
103 COLL_PORTALS4_SET_BITS(match_bits_rtr, ompi_comm_get_cid(comm), 0, 1,
104 COLL_PORTALS4_REDUCE, 0, internal_count);
105
106 COLL_PORTALS4_SET_BITS(match_bits, ompi_comm_get_cid(comm), 0, 0,
107 COLL_PORTALS4_REDUCE, 0, internal_count);
108
109 if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.reduce.trig_ct_h)) != 0) {
110 return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
111 }
112
113 /* warning : all the operations will be executed on the recvbuf */
114 if (rank != root) {
115 request->u.reduce.free_buffer = malloc(length);
116 if (NULL == request->u.reduce.free_buffer) {
117 return OMPI_ERR_OUT_OF_RESOURCE;
118 }
119 recvbuf = (void*)request->u.reduce.free_buffer;
120
121 memcpy(recvbuf, sendbuf, length);
122 }
123 else {
124 request->u.reduce.free_buffer = NULL;
125 if (sendbuf != MPI_IN_PLACE) {
126 memcpy(recvbuf, sendbuf, length);
127 }
128 }
129
130 if (request->u.reduce.child_nb) {
131
132 /*
133 ** Prepare Data ME
134 */
135 memset(&me, 0, sizeof(ptl_me_t));
136 me.start = recvbuf;
137 me.length = length;
138 me.ct_handle = request->u.reduce.trig_ct_h;
139 me.uid = mca_coll_portals4_component.uid;
140 me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
141 PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
142 PTL_ME_EVENT_CT_COMM;
143 me.match_id.phys.nid = PTL_NID_ANY;
144 me.match_id.phys.pid = PTL_PID_ANY;
145 me.match_bits = match_bits;
146 me.ignore_bits = 0;
147
148 if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
149 mca_coll_portals4_component.pt_idx,
150 &me, PTL_PRIORITY_LIST, NULL,
151 &request->u.reduce.data_me_h)) != 0) {
152 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
153 }
154 }
155
156 if (rank != root) {
157 request->u.reduce.use_ack_ct_h = true;
158
159 if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.reduce.ack_ct_h)) != 0) {
160 return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
161 }
162
163 /*
164 ** Prepare ME for data ACK Put
165 ** Priority List
166 */
167
168 memset(&me, 0, sizeof(ptl_me_t));
169 me.start = NULL;
170 me.length = 0;
171 me.min_free = 0;
172 me.uid = mca_coll_portals4_component.uid;
173 me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
174 PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
175 PTL_ME_USE_ONCE |
176 PTL_ME_EVENT_CT_COMM;
177 me.match_id.phys.nid = PTL_NID_ANY;
178 me.match_id.phys.pid = PTL_PID_ANY;
179 me.match_bits = match_bits_ack;
180 me.ignore_bits = 0;
181 me.ct_handle = request->u.reduce.ack_ct_h;
182
183 if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
184 mca_coll_portals4_component.pt_idx,
185 &me, PTL_PRIORITY_LIST,
186 NULL,
187 &me_h)) != 0) {
188 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
189 }
190
191 /*
192 ** Prepare ME for sending RTR Put
193 ** Priority List, match also with "Overflow list Me" in coll_portals4_component
194 */
195
196 memset(&me, 0, sizeof(ptl_me_t));
197 me.start = NULL;
198 me.length = 0;
199 me.min_free = 0;
200 me.uid = mca_coll_portals4_component.uid;
201 me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
202 PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
203 PTL_ME_USE_ONCE |
204 PTL_ME_EVENT_CT_COMM | PTL_ME_EVENT_CT_OVERFLOW;
205 me.match_id.phys.nid = PTL_NID_ANY;
206 me.match_id.phys.pid = PTL_PID_ANY;
207 me.match_bits = match_bits_rtr;
208 me.ignore_bits = 0;
209 me.ct_handle = request->u.reduce.trig_ct_h;
210
211 if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
212 mca_coll_portals4_component.pt_idx,
213 &me, PTL_PRIORITY_LIST,
214 NULL,
215 &me_h)) != 0) {
216 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
217 }
218
219 /* Send Atomic operation to the parent */
220 if ((ret = PtlTriggeredAtomic(data_md_h,
221 (uint64_t)recvbuf,
222 length, PTL_NO_ACK_REQ,
223 ompi_coll_portals4_get_peer(comm, parent),
224 mca_coll_portals4_component.pt_idx,
225 match_bits, 0, NULL, 0,
226 ptl_op, ptl_dtype, request->u.reduce.trig_ct_h,
227 request->u.reduce.child_nb + 1)) != 0) {
228 return opal_stderr("PtlTriggeredAtomic failed", __FILE__, __LINE__, ret);
229 }
230 }
231 else {
232 request->u.reduce.use_ack_ct_h = false;
233 }
234
235 if (request->u.reduce.child_nb) {
236 for (i = 0 ; i < COLL_PORTALS4_REDUCE_MAX_CHILDREN ; i++) {
237 if (child[i] != PTL_INVALID_RANK) {
238 /*
239 * Prepare Triggered Put to ACK Data to children
240 *
241 */
242
243 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
244 ompi_coll_portals4_get_peer(comm, child[i]),
245 mca_coll_portals4_component.pt_idx,
246 match_bits_ack, 0, NULL, 0,
247 request->u.reduce.trig_ct_h,
248 (rank != root) ?
249 request->u.reduce.child_nb + 1 :
250 request->u.reduce.child_nb)) != 0) {
251 return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
252 }
253
254 /*
255 * Send RTR to children
256 *
257 */
258
259 /* and there, we only send the RTR when all the MEs are ready */
260 if ((ret = PtlPut(zero_md_h, 0, 0, PTL_NO_ACK_REQ,
261 ompi_coll_portals4_get_peer(comm, child[i]),
262 mca_coll_portals4_component.pt_idx,
263 match_bits_rtr, 0, NULL, 0)) != PTL_OK) {
264 return opal_stderr("Put RTR failed %d", __FILE__, __LINE__, ret);
265 }
266 }
267 }
268 }
269
270 if (rank != root) {
271 if (is_sync) {
272 if ((ret = PtlCTWait(request->u.reduce.ack_ct_h, 1, &ct)) != 0) {
273 opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
274 }
275 }
276 else {
277 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
278 ompi_coll_portals4_get_peer(comm, rank),
279 mca_coll_portals4_component.finish_pt_idx,
280 0, 0, NULL, (uintptr_t) request,
281 request->u.reduce.ack_ct_h,
282 1)) != 0) {
283 return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
284 }
285 }
286 }
287 else {
288 if (is_sync) {
289 if ((ret = PtlCTWait(request->u.reduce.trig_ct_h,
290 request->u.reduce.child_nb, &ct)) != 0) {
291 opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
292 }
293 }
294 else {
295 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
296 ompi_coll_portals4_get_peer(comm, rank),
297 mca_coll_portals4_component.finish_pt_idx,
298 0, 0, NULL, (uintptr_t) request,
299 request->u.reduce.trig_ct_h,
300 request->u.reduce.child_nb)) != 0) {
301 return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
302 }
303 }
304 }
305 }
306 else {
307 opal_output_verbose(100, ompi_coll_base_framework.framework_output,
308 "rank %d - optimization not supported, falling back to previous handler\n", rank);
309
310 if (request->is_sync) {
311 if ((module->previous_reduce) && (module->previous_reduce_module)) {
312 ret = module->previous_reduce(sendbuf, recvbuf, count, dtype, op, root,
313 comm, module->previous_reduce_module);
314 }
315 else {
316 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
317 "rank %d - no previous reduce handler is available, aborting\n", rank);
318 return (OMPI_ERROR);
319 }
320 }
321 else {
322 if ((module->previous_ireduce) && (module->previous_ireduce_module)) {
323 ret = module->previous_ireduce(sendbuf, recvbuf, count, dtype, op, root,
324 comm, request->fallback_request, module->previous_ireduce_module);
325 }
326 else {
327 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
328 "rank %d - no previous ireduce handler is available, aborting\n", rank);
329 return (OMPI_ERROR);
330 }
331 }
332 return ret;
333 }
334 return (OMPI_SUCCESS);
335 }
336
337
338
339
340 static int
reduce_kary_tree_bottom(ompi_coll_portals4_request_t * request)341 reduce_kary_tree_bottom(ompi_coll_portals4_request_t *request)
342 {
343 int ret, line;
344
345 if (request->u.reduce.is_optim) {
346 PtlAtomicSync();
347
348 if (request->u.reduce.use_ack_ct_h) {
349 ret = PtlCTFree(request->u.reduce.ack_ct_h);
350 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
351 }
352
353 if (request->u.reduce.child_nb) {
354 do {
355 ret = PtlMEUnlink(request->u.reduce.data_me_h);
356 } while (PTL_IN_USE == ret);
357 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
358 }
359
360 ret = PtlCTFree(request->u.reduce.trig_ct_h);
361 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
362
363 if (request->u.reduce.free_buffer) {
364 free(request->u.reduce.free_buffer);
365 }
366 }
367 return (OMPI_SUCCESS);
368
369 err_hdlr:
370 opal_output(ompi_coll_base_framework.framework_output,
371 "%s:%4d:%4d\tError occurred ret=%d",
372 __FILE__, __LINE__, line, ret);
373
374 return ret;
375 }
376
377
378 int
ompi_coll_portals4_reduce_intra(const void * sendbuf,void * recvbuf,int count,MPI_Datatype dtype,MPI_Op op,int root,struct ompi_communicator_t * comm,mca_coll_base_module_t * module)379 ompi_coll_portals4_reduce_intra(const void *sendbuf, void *recvbuf, int count,
380 MPI_Datatype dtype, MPI_Op op,
381 int root,
382 struct ompi_communicator_t *comm,
383 mca_coll_base_module_t *module)
384 {
385 int ret;
386 mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
387 ompi_coll_portals4_request_t *request;
388
389 OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
390 if (NULL == request) {
391 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
392 "%s:%d: request alloc failed\n",
393 __FILE__, __LINE__);
394 return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
395 }
396
397 request->is_sync = true;
398 request->fallback_request = NULL;
399
400 ret = reduce_kary_tree_top(sendbuf, recvbuf, count,
401 dtype, op, root, comm, request, portals4_module);
402 if (OMPI_SUCCESS != ret)
403 return ret;
404 ret = reduce_kary_tree_bottom(request);
405 if (OMPI_SUCCESS != ret)
406 return ret;
407
408 OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
409 return (OMPI_SUCCESS);
410 }
411
412
413 int
ompi_coll_portals4_ireduce_intra(const void * sendbuf,void * recvbuf,int count,MPI_Datatype dtype,MPI_Op op,int root,struct ompi_communicator_t * comm,ompi_request_t ** ompi_request,struct mca_coll_base_module_2_3_0_t * module)414 ompi_coll_portals4_ireduce_intra(const void* sendbuf, void* recvbuf, int count,
415 MPI_Datatype dtype, MPI_Op op,
416 int root,
417 struct ompi_communicator_t *comm,
418 ompi_request_t ** ompi_request,
419 struct mca_coll_base_module_2_3_0_t *module)
420 {
421 int ret;
422 mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
423 ompi_coll_portals4_request_t *request;
424
425 OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
426 if (NULL == request) {
427 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
428 "%s:%d: request alloc failed\n",
429 __FILE__, __LINE__);
430 return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
431 }
432
433 *ompi_request = &request->super;
434 request->fallback_request = ompi_request;
435 request->is_sync = false;
436
437 ret = reduce_kary_tree_top(sendbuf, recvbuf, count,
438 dtype, op, root, comm, request, portals4_module);
439 if (OMPI_SUCCESS != ret)
440 return ret;
441
442 if (!request->u.reduce.is_optim) {
443 OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
444 }
445
446 opal_output_verbose(10, ompi_coll_base_framework.framework_output, "ireduce");
447 return (OMPI_SUCCESS);
448 }
449
450 int
ompi_coll_portals4_ireduce_intra_fini(ompi_coll_portals4_request_t * request)451 ompi_coll_portals4_ireduce_intra_fini(ompi_coll_portals4_request_t *request)
452 {
453 int ret;
454
455 ret = reduce_kary_tree_bottom(request);
456 if (OMPI_SUCCESS != ret)
457 return ret;
458
459 ompi_request_complete(&request->super, true);
460
461 return (OMPI_SUCCESS);
462 }
463