1 /*
2  * Copyright (c) 2015      Sandia National Laboratories. All rights reserved.
3  * Copyright (c) 2015      Bull SAS.  All rights reserved.
4  * Copyright (c) 2015      Research Organization for Information Science
5  *                         and Technology (RIST). All rights reserved.
6  * Copyright (c) 2017      IBM Corporation.  All rights reserved.
7  * $COPYRIGHT$
8  *
9  * Additional copyrights may follow
10  *
11  * $HEADER$
12  */
13 
14 #include "ompi_config.h"
15 #include "coll_portals4.h"
16 #include "coll_portals4_request.h"
17 
18 #include <stdio.h>
19 
20 #include "mpi.h"
21 #include "ompi/constants.h"
22 #include "ompi/datatype/ompi_datatype.h"
23 #include "ompi/datatype/ompi_datatype_internal.h"
24 #include "ompi/op/op.h"
25 #include "opal/util/bit_ops.h"
26 #include "ompi/mca/pml/pml.h"
27 #include "ompi/mca/coll/coll.h"
28 #include "ompi/mca/coll/base/base.h"
29 
30 #define COLL_PORTALS4_REDUCE_MAX_CHILDREN	2
31 
32 
33 static int
reduce_kary_tree_top(const void * sendbuf,void * recvbuf,int count,MPI_Datatype dtype,MPI_Op op,int root,struct ompi_communicator_t * comm,ompi_coll_portals4_request_t * request,mca_coll_portals4_module_t * module)34 reduce_kary_tree_top(const void *sendbuf, void *recvbuf, int count,
35         MPI_Datatype dtype, MPI_Op op,
36         int root,
37         struct ompi_communicator_t *comm,
38         ompi_coll_portals4_request_t *request,
39         mca_coll_portals4_module_t *module)
40 {
41     bool is_sync = request->is_sync;
42     int ret;
43     unsigned int i;
44     int size = ompi_comm_size(comm);
45     int rank = ompi_comm_rank(comm);
46     ptl_rank_t parent, child[COLL_PORTALS4_REDUCE_MAX_CHILDREN];
47     size_t internal_count, length;
48     ptl_handle_md_t zero_md_h, data_md_h;
49     ptl_handle_me_t me_h;
50     ptl_me_t me;
51     ptl_match_bits_t match_bits_ack, match_bits_rtr, match_bits;
52     ptl_ct_event_t ct;
53     ptl_op_t ptl_op;
54     ptl_datatype_t ptl_dtype;
55 
56 
57     request->type = OMPI_COLL_PORTALS4_TYPE_REDUCE;
58 
59     /*
60      ** Initialization
61      */
62 
63     for (i = 0 ; i < COLL_PORTALS4_REDUCE_MAX_CHILDREN ; i++) {
64         child[i] = PTL_INVALID_RANK;
65     }
66 
67     parent = PTL_INVALID_RANK;
68 
69     zero_md_h = mca_coll_portals4_component.zero_md_h;
70     data_md_h = mca_coll_portals4_component.data_md_h;
71 
72     internal_count = opal_atomic_add_fetch_size_t(&module->coll_count, 1);
73 
74     /*
75      ** DATATYPE and SIZES
76      */
77     ret = ompi_datatype_type_size(dtype, &length);
78     length *= count;
79 
80     request->u.reduce.is_optim = is_reduce_optimizable(dtype, length, op, &ptl_dtype, &ptl_op);
81 
82     if (request->u.reduce.is_optim) {
83 
84         /*
85          * TOPOLOGY
86          */
87 
88         /* this function is dependent on the number of segments,
89          * if we use segmentation pipe-line is preferred, and
90          * binary tree otherwise */
91 
92         get_k_ary_tree(COLL_PORTALS4_REDUCE_MAX_CHILDREN,
93                 rank, size, root, &parent, child, &request->u.reduce.child_nb);
94 
95         /*
96          * PORTALS4 RESOURCE ALLOCATION
97          */
98 
99         /* Compute match bits */
100         COLL_PORTALS4_SET_BITS(match_bits_ack, ompi_comm_get_cid(comm), 1, 0,
101                 COLL_PORTALS4_REDUCE, 0, internal_count);
102 
103         COLL_PORTALS4_SET_BITS(match_bits_rtr, ompi_comm_get_cid(comm), 0, 1,
104                 COLL_PORTALS4_REDUCE, 0, internal_count);
105 
106         COLL_PORTALS4_SET_BITS(match_bits, ompi_comm_get_cid(comm), 0, 0,
107                 COLL_PORTALS4_REDUCE, 0, internal_count);
108 
109         if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.reduce.trig_ct_h)) != 0) {
110             return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
111         }
112 
113         /* warning : all the operations will be executed on the recvbuf */
114         if (rank != root) {
115             request->u.reduce.free_buffer = malloc(length);
116             if (NULL == request->u.reduce.free_buffer) {
117                 return OMPI_ERR_OUT_OF_RESOURCE;
118             }
119             recvbuf = (void*)request->u.reduce.free_buffer;
120 
121             memcpy(recvbuf, sendbuf, length);
122         }
123         else {
124             request->u.reduce.free_buffer = NULL;
125             if (sendbuf != MPI_IN_PLACE) {
126                 memcpy(recvbuf, sendbuf, length);
127             }
128         }
129 
130         if (request->u.reduce.child_nb) {
131 
132             /*
133              ** Prepare Data ME
134              */
135             memset(&me, 0, sizeof(ptl_me_t));
136             me.start = recvbuf;
137             me.length = length;
138             me.ct_handle = request->u.reduce.trig_ct_h;
139             me.uid = mca_coll_portals4_component.uid;
140             me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
141                     PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
142                     PTL_ME_EVENT_CT_COMM;
143             me.match_id.phys.nid = PTL_NID_ANY;
144             me.match_id.phys.pid = PTL_PID_ANY;
145             me.match_bits = match_bits;
146             me.ignore_bits = 0;
147 
148             if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
149                     mca_coll_portals4_component.pt_idx,
150                     &me, PTL_PRIORITY_LIST, NULL,
151                     &request->u.reduce.data_me_h)) != 0) {
152                 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
153             }
154         }
155 
156         if (rank != root) {
157             request->u.reduce.use_ack_ct_h = true;
158 
159             if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.reduce.ack_ct_h)) != 0) {
160                 return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
161             }
162 
163             /*
164              ** Prepare ME for data ACK Put
165              ** Priority List
166              */
167 
168             memset(&me, 0, sizeof(ptl_me_t));
169             me.start = NULL;
170             me.length = 0;
171             me.min_free = 0;
172             me.uid = mca_coll_portals4_component.uid;
173             me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
174                     PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
175                     PTL_ME_USE_ONCE |
176                     PTL_ME_EVENT_CT_COMM;
177             me.match_id.phys.nid = PTL_NID_ANY;
178             me.match_id.phys.pid = PTL_PID_ANY;
179             me.match_bits = match_bits_ack;
180             me.ignore_bits = 0;
181             me.ct_handle = request->u.reduce.ack_ct_h;
182 
183             if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
184                     mca_coll_portals4_component.pt_idx,
185                     &me, PTL_PRIORITY_LIST,
186                     NULL,
187                     &me_h)) != 0) {
188                 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
189             }
190 
191             /*
192              ** Prepare ME for sending RTR Put
193              ** Priority List, match also with "Overflow list Me" in coll_portals4_component
194              */
195 
196             memset(&me, 0, sizeof(ptl_me_t));
197             me.start = NULL;
198             me.length = 0;
199             me.min_free = 0;
200             me.uid = mca_coll_portals4_component.uid;
201             me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
202                     PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
203                     PTL_ME_USE_ONCE |
204                     PTL_ME_EVENT_CT_COMM | PTL_ME_EVENT_CT_OVERFLOW;
205             me.match_id.phys.nid = PTL_NID_ANY;
206             me.match_id.phys.pid = PTL_PID_ANY;
207             me.match_bits = match_bits_rtr;
208             me.ignore_bits = 0;
209             me.ct_handle = request->u.reduce.trig_ct_h;
210 
211             if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
212                     mca_coll_portals4_component.pt_idx,
213                     &me, PTL_PRIORITY_LIST,
214                     NULL,
215                     &me_h)) != 0) {
216                 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
217             }
218 
219             /* Send Atomic operation to the parent */
220             if ((ret = PtlTriggeredAtomic(data_md_h,
221                     (uint64_t)recvbuf,
222                     length, PTL_NO_ACK_REQ,
223                     ompi_coll_portals4_get_peer(comm, parent),
224                     mca_coll_portals4_component.pt_idx,
225                     match_bits, 0, NULL, 0,
226                     ptl_op, ptl_dtype, request->u.reduce.trig_ct_h,
227                     request->u.reduce.child_nb + 1)) != 0) {
228                 return opal_stderr("PtlTriggeredAtomic failed", __FILE__, __LINE__, ret);
229             }
230         }
231         else {
232             request->u.reduce.use_ack_ct_h = false;
233         }
234 
235         if (request->u.reduce.child_nb) {
236             for (i = 0 ; i < COLL_PORTALS4_REDUCE_MAX_CHILDREN ; i++) {
237                 if (child[i] != PTL_INVALID_RANK) {
238                     /*
239                      * Prepare Triggered Put to ACK Data to children
240                      *
241                      */
242 
243                     if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
244                             ompi_coll_portals4_get_peer(comm, child[i]),
245                             mca_coll_portals4_component.pt_idx,
246                             match_bits_ack, 0, NULL, 0,
247                             request->u.reduce.trig_ct_h,
248                             (rank != root) ?
249                                     request->u.reduce.child_nb + 1 :
250                                     request->u.reduce.child_nb)) != 0) {
251                         return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
252                     }
253 
254                     /*
255                      * Send RTR to children
256                      *
257                      */
258 
259                     /* and there, we only send the RTR when all the MEs are ready */
260                     if ((ret = PtlPut(zero_md_h, 0, 0, PTL_NO_ACK_REQ,
261                             ompi_coll_portals4_get_peer(comm, child[i]),
262                             mca_coll_portals4_component.pt_idx,
263                             match_bits_rtr, 0, NULL, 0)) != PTL_OK) {
264                         return opal_stderr("Put RTR failed %d", __FILE__, __LINE__, ret);
265                     }
266                 }
267             }
268         }
269 
270         if (rank != root) {
271             if (is_sync) {
272                 if ((ret = PtlCTWait(request->u.reduce.ack_ct_h, 1, &ct)) != 0) {
273                     opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
274                 }
275             }
276             else {
277                 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
278                         ompi_coll_portals4_get_peer(comm, rank),
279                         mca_coll_portals4_component.finish_pt_idx,
280                         0, 0, NULL, (uintptr_t) request,
281                         request->u.reduce.ack_ct_h,
282                         1)) != 0) {
283                     return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
284                 }
285             }
286         }
287         else {
288             if (is_sync) {
289                 if ((ret = PtlCTWait(request->u.reduce.trig_ct_h,
290                         request->u.reduce.child_nb, &ct)) != 0) {
291                     opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
292                 }
293             }
294             else {
295                 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
296                         ompi_coll_portals4_get_peer(comm, rank),
297                         mca_coll_portals4_component.finish_pt_idx,
298                         0, 0, NULL, (uintptr_t) request,
299                         request->u.reduce.trig_ct_h,
300                         request->u.reduce.child_nb)) != 0) {
301                     return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
302                 }
303             }
304         }
305     }
306     else {
307         opal_output_verbose(100, ompi_coll_base_framework.framework_output,
308                 "rank %d - optimization not supported, falling back to previous handler\n", rank);
309 
310         if (request->is_sync) {
311             if ((module->previous_reduce) && (module->previous_reduce_module)) {
312                 ret = module->previous_reduce(sendbuf, recvbuf, count, dtype, op, root,
313                         comm, module->previous_reduce_module);
314             }
315             else {
316                 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
317                         "rank %d - no previous reduce handler is available, aborting\n", rank);
318                 return (OMPI_ERROR);
319             }
320         }
321         else {
322             if ((module->previous_ireduce) && (module->previous_ireduce_module)) {
323                 ret =  module->previous_ireduce(sendbuf, recvbuf, count, dtype, op, root,
324                         comm, request->fallback_request, module->previous_ireduce_module);
325             }
326             else {
327                 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
328                         "rank %d - no previous ireduce handler is available, aborting\n", rank);
329                 return (OMPI_ERROR);
330             }
331         }
332         return ret;
333     }
334     return (OMPI_SUCCESS);
335 }
336 
337 
338 
339 
340 static int
reduce_kary_tree_bottom(ompi_coll_portals4_request_t * request)341 reduce_kary_tree_bottom(ompi_coll_portals4_request_t *request)
342 {
343     int ret, line;
344 
345     if (request->u.reduce.is_optim) {
346         PtlAtomicSync();
347 
348         if (request->u.reduce.use_ack_ct_h) {
349             ret = PtlCTFree(request->u.reduce.ack_ct_h);
350             if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
351         }
352 
353         if (request->u.reduce.child_nb) {
354             do {
355                 ret = PtlMEUnlink(request->u.reduce.data_me_h);
356             } while (PTL_IN_USE == ret);
357             if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
358         }
359 
360         ret = PtlCTFree(request->u.reduce.trig_ct_h);
361         if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
362 
363         if (request->u.reduce.free_buffer) {
364             free(request->u.reduce.free_buffer);
365         }
366     }
367     return (OMPI_SUCCESS);
368 
369 err_hdlr:
370     opal_output(ompi_coll_base_framework.framework_output,
371                 "%s:%4d:%4d\tError occurred ret=%d",
372                 __FILE__, __LINE__, line, ret);
373 
374     return ret;
375 }
376 
377 
378 int
ompi_coll_portals4_reduce_intra(const void * sendbuf,void * recvbuf,int count,MPI_Datatype dtype,MPI_Op op,int root,struct ompi_communicator_t * comm,mca_coll_base_module_t * module)379 ompi_coll_portals4_reduce_intra(const void *sendbuf, void *recvbuf, int count,
380         MPI_Datatype dtype, MPI_Op op,
381         int root,
382         struct ompi_communicator_t *comm,
383         mca_coll_base_module_t *module)
384 {
385     int ret;
386     mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
387     ompi_coll_portals4_request_t *request;
388 
389     OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
390     if (NULL == request) {
391         opal_output_verbose(1, ompi_coll_base_framework.framework_output,
392                 "%s:%d: request alloc failed\n",
393                 __FILE__, __LINE__);
394         return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
395     }
396 
397     request->is_sync = true;
398     request->fallback_request = NULL;
399 
400     ret = reduce_kary_tree_top(sendbuf, recvbuf, count,
401             dtype, op, root, comm,  request,  portals4_module);
402     if (OMPI_SUCCESS != ret)
403         return ret;
404     ret = reduce_kary_tree_bottom(request);
405     if (OMPI_SUCCESS != ret)
406         return ret;
407 
408     OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
409     return (OMPI_SUCCESS);
410 }
411 
412 
413 int
ompi_coll_portals4_ireduce_intra(const void * sendbuf,void * recvbuf,int count,MPI_Datatype dtype,MPI_Op op,int root,struct ompi_communicator_t * comm,ompi_request_t ** ompi_request,struct mca_coll_base_module_2_3_0_t * module)414 ompi_coll_portals4_ireduce_intra(const void* sendbuf, void* recvbuf, int count,
415         MPI_Datatype dtype, MPI_Op op,
416         int root,
417         struct ompi_communicator_t *comm,
418         ompi_request_t ** ompi_request,
419         struct mca_coll_base_module_2_3_0_t *module)
420 {
421     int ret;
422     mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
423     ompi_coll_portals4_request_t *request;
424 
425     OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
426     if (NULL == request) {
427         opal_output_verbose(1, ompi_coll_base_framework.framework_output,
428                 "%s:%d: request alloc failed\n",
429                 __FILE__, __LINE__);
430         return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
431     }
432 
433     *ompi_request = &request->super;
434     request->fallback_request = ompi_request;
435     request->is_sync = false;
436 
437     ret = reduce_kary_tree_top(sendbuf, recvbuf, count,
438             dtype, op, root, comm,  request,  portals4_module);
439     if (OMPI_SUCCESS != ret)
440         return ret;
441 
442     if (!request->u.reduce.is_optim) {
443         OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
444     }
445 
446     opal_output_verbose(10, ompi_coll_base_framework.framework_output, "ireduce");
447     return (OMPI_SUCCESS);
448 }
449 
450 int
ompi_coll_portals4_ireduce_intra_fini(ompi_coll_portals4_request_t * request)451 ompi_coll_portals4_ireduce_intra_fini(ompi_coll_portals4_request_t *request)
452 {
453     int ret;
454 
455     ret = reduce_kary_tree_bottom(request);
456     if (OMPI_SUCCESS != ret)
457         return ret;
458 
459     ompi_request_complete(&request->super, true);
460 
461     return (OMPI_SUCCESS);
462 }
463