1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
4  *                         University Research and Technology
5  *                         Corporation.  All rights reserved.
6  * Copyright (c) 2004-2006 The University of Tennessee and The University
7  *                         of Tennessee Research Foundation.  All rights
8  *                         reserved.
9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
10  *                         University of Stuttgart.  All rights reserved.
11  * Copyright (c) 2004-2005 The Regents of the University of California.
12  *                         All rights reserved.
13  * Copyright (c) 2012-2013 Los Alamos National Security, LLC.  All rights
14  *                         reserved.
15  * Copyright (c) 2013-2017 Intel, Inc.  All rights reserved.
16  * $COPYRIGHT$
17  *
18  * Additional copyrights may follow
19  *
20  * $HEADER$
21  */
22 
23 #include "orte_config.h"
24 #include "opal/types.h"
25 
26 #include "opal/dss/dss.h"
27 #include "opal/util/output.h"
28 
29 #include "orte/mca/errmgr/errmgr.h"
30 #include "orte/mca/oob/base/base.h"
31 #include "orte/util/name_fns.h"
32 #include "orte/util/threads.h"
33 #include "orte/runtime/orte_globals.h"
34 
35 #include "orte/mca/rml/base/base.h"
36 #include "orte/mca/rml/rml_types.h"
37 #include "rml_oob.h"
38 
send_self_exe(int fd,short args,void * data)39 static void send_self_exe(int fd, short args, void* data)
40 {
41     orte_self_send_xfer_t *xfer = (orte_self_send_xfer_t*)data;
42 
43     ORTE_ACQUIRE_OBJECT(xfer);
44 
45     OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
46                          "%s rml_send_to_self callback executing for tag %d",
47                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), xfer->tag));
48 
49     /* execute the send callback function - note that
50      * send-to-self always returns a SUCCESS status
51      */
52     if (NULL != xfer->iov) {
53         if (NULL != xfer->cbfunc.iov) {
54             /* non-blocking iovec send */
55             xfer->cbfunc.iov(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->iov, xfer->count,
56                              xfer->tag, xfer->cbdata);
57         }
58     } else if (NULL != xfer->buffer) {
59         if (NULL != xfer->cbfunc.buffer) {
60             /* non-blocking buffer send */
61             xfer->cbfunc.buffer(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->buffer,
62                                 xfer->tag, xfer->cbdata);
63         }
64     } else {
65         /* should never happen */
66         abort();
67     }
68 
69     /* cleanup the memory */
70     OBJ_RELEASE(xfer);
71 }
72 
/*
 * Non-blocking send of an iovec array to a peer.
 *
 * mod     - RML module; mod->routed is duplicated into the send request
 *           for the non-self path
 * peer    - destination process name; must be non-NULL and must not
 *           equal ORTE_NAME_INVALID
 * iov     - array of data segments to send
 * count   - number of entries in iov
 * tag     - RML tag; must not be ORTE_RML_TAG_INVALID
 * cbfunc  - completion callback (may be NULL for the self path)
 * cbdata  - opaque pointer handed back to cbfunc
 *
 * Returns ORTE_SUCCESS once the message has been queued,
 * ORTE_ERR_BAD_PARAM for an invalid tag or peer, and
 * ORTE_ERR_OUT_OF_RESOURCE if the local copy needed for a
 * send-to-self cannot be allocated (no callback is posted then).
 */
int orte_rml_oob_send_nb(struct orte_rml_base_module_t *mod,
                         orte_process_name_t* peer,
                         struct iovec* iov,
                         int count,
                         orte_rml_tag_t tag,
                         orte_rml_callback_fn_t cbfunc,
                         void* cbdata)
{
    orte_rml_recv_t *rcv;
    orte_rml_send_t *snd;
    size_t bytes;   /* iov_len is a size_t - do not accumulate into an int */
    orte_self_send_xfer_t *xfer;
    int i;
    char* ptr;

    OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
                         "%s rml_send to peer %s at tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(peer), tag));

    if (ORTE_RML_TAG_INVALID == tag) {
        /* cannot send to an invalid tag */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }
    if (NULL == peer ||
        OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
        /* cannot send to an invalid peer */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /* if this is a message to myself, then just post the message
     * for receipt - no need to dive into the oob
     */
    if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {  /* local delivery */
        OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
                             "%s rml_send_iovec_to_self at tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
        /* send to self is a tad tricky - we really don't want
         * to track the send callback function throughout the recv
         * process and execute it upon receipt as this would provide
         * very different timing from a non-self message. Once the
         * send callback has executed, the caller will think we are
         * done with the data and can release it. So we have to copy
         * the data in order to execute the send callback prior to
         * receiving the message.
         *
         * In truth, this really is a better mimic of the non-self
         * message behavior. If we actually pushed the message out
         * on the wire and had it loop back, then we would receive
         * a new block of data anyway.
         */

        /* copy the message for the recv. This MUST be completed before
         * the send callback is posted: once posted, the callback may
         * run (and the caller may release iov) at any time if the
         * event base is being progressed by another thread.
         */
        rcv = OBJ_NEW(orte_rml_recv_t);
        rcv->sender = *peer;
        rcv->tag = tag;
        /* get the total number of bytes in the iovec array */
        bytes = 0;
        for (i = 0 ; i < count ; ++i) {
            bytes += iov[i].iov_len;
        }
        /* get the required memory allocation */
        if (0 < bytes) {
            rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
            if (NULL == rcv->iov.iov_base) {
                /* nothing has been posted yet, so we can simply back out */
                ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                OBJ_RELEASE(rcv);
                return ORTE_ERR_OUT_OF_RESOURCE;
            }
            rcv->iov.iov_len = bytes;
            /* transfer the bytes, skipping empty segments so that
             * memcpy is never handed a (possibly NULL) unused base
             */
            ptr = (char*)rcv->iov.iov_base;
            for (i = 0 ; i < count ; ++i) {
                if (0 < iov[i].iov_len) {
                    memcpy(ptr, iov[i].iov_base, iov[i].iov_len);
                    ptr += iov[i].iov_len;
                }
            }
        }

        /* setup the send callback */
        xfer = OBJ_NEW(orte_self_send_xfer_t);
        xfer->iov = iov;
        xfer->count = count;
        xfer->cbfunc.iov = cbfunc;
        xfer->tag = tag;
        xfer->cbdata = cbdata;
        /* setup the event for the send callback */
        ORTE_THREADSHIFT(xfer, orte_event_base, send_self_exe, ORTE_MSG_PRI);

        /* post the message for receipt - since the send callback was posted
         * first and has the same priority, it will execute first
         */
        ORTE_RML_ACTIVATE_MESSAGE(rcv);
        return ORTE_SUCCESS;
    }

    /* remote peer - build the send request and hand it to the OOB.
     * The caller's iov is referenced, not copied, on this path.
     */
    snd = OBJ_NEW(orte_rml_send_t);
    snd->dst = *peer;
    snd->origin = *ORTE_PROC_MY_NAME;
    snd->tag = tag;
    snd->iov = iov;
    snd->count = count;
    snd->cbfunc.iov = cbfunc;
    snd->cbdata = cbdata;
    snd->routed = strdup(mod->routed);

    /* activate the OOB send state */
    ORTE_OOB_SEND(snd);

    return ORTE_SUCCESS;
}
180 
/*
 * Non-blocking send of a packed buffer to a peer.
 *
 * mod     - RML module; mod->routed is duplicated into the send request
 *           for the non-self path
 * peer    - destination process name; must be non-NULL and must not
 *           equal ORTE_NAME_INVALID
 * buffer  - buffer whose base_ptr/bytes_used describe the payload
 * tag     - RML tag; must not be ORTE_RML_TAG_INVALID
 * cbfunc  - completion callback (may be NULL for the self path)
 * cbdata  - opaque pointer handed back to cbfunc
 *
 * Returns ORTE_SUCCESS once the message has been queued,
 * ORTE_ERR_BAD_PARAM for an invalid tag or peer, and
 * ORTE_ERR_OUT_OF_RESOURCE if the local copy needed for a
 * send-to-self cannot be allocated (no callback is posted then).
 */
int orte_rml_oob_send_buffer_nb(struct orte_rml_base_module_t *mod,
                                orte_process_name_t* peer,
                                opal_buffer_t* buffer,
                                orte_rml_tag_t tag,
                                orte_rml_buffer_callback_fn_t cbfunc,
                                void* cbdata)
{
    orte_rml_recv_t *rcv;
    orte_rml_send_t *snd;
    orte_self_send_xfer_t *xfer;

    OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
                         "%s rml_send_buffer to peer %s at tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(peer), tag));

    if (ORTE_RML_TAG_INVALID == tag) {
        /* cannot send to an invalid tag */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }
    if (NULL == peer ||
        OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
        /* cannot send to an invalid peer */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /* if this is a message to myself, then just post the message
     * for receipt - no need to dive into the oob
     */
    if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {  /* local delivery */
        OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
                             "%s rml_send_buffer_to_self at tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
        /* send to self is a tad tricky - we really don't want
         * to track the send callback function throughout the recv
         * process and execute it upon receipt as this would provide
         * very different timing from a non-self message. Once the
         * send callback has executed, the caller will think we are
         * done with the data and can release it. So we have to copy
         * the data in order to execute the send callback prior to
         * receiving the message.
         *
         * In truth, this really is a better mimic of the non-self
         * message behavior. If we actually pushed the message out
         * on the wire and had it loop back, then we would receive
         * a new block of data anyway.
         */

        /* copy the message for the recv. This MUST be completed before
         * the send callback is posted: once posted, the callback may
         * run (and the caller may release the buffer) at any time if
         * the event base is being progressed by another thread.
         */
        rcv = OBJ_NEW(orte_rml_recv_t);
        rcv->sender = *peer;
        rcv->tag = tag;
        rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used);
        /* a NULL return for a zero-byte request is not a failure */
        if (NULL == rcv->iov.iov_base && 0 < buffer->bytes_used) {
            /* nothing has been posted yet, so we can simply back out */
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            OBJ_RELEASE(rcv);
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
        if (0 < buffer->bytes_used) {
            memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used);
        }
        rcv->iov.iov_len = buffer->bytes_used;

        /* setup the send callback */
        xfer = OBJ_NEW(orte_self_send_xfer_t);
        xfer->buffer = buffer;
        xfer->cbfunc.buffer = cbfunc;
        xfer->tag = tag;
        xfer->cbdata = cbdata;
        /* setup the event for the send callback */
        ORTE_THREADSHIFT(xfer, orte_event_base, send_self_exe, ORTE_MSG_PRI);

        /* post the message for receipt - since the send callback was posted
         * first and has the same priority, it will execute first
         */
        ORTE_RML_ACTIVATE_MESSAGE(rcv);
        return ORTE_SUCCESS;
    }

    /* remote peer - build the send request and hand it to the OOB.
     * The caller's buffer is referenced, not copied, on this path.
     */
    snd = OBJ_NEW(orte_rml_send_t);
    snd->dst = *peer;
    snd->origin = *ORTE_PROC_MY_NAME;
    snd->tag = tag;
    snd->buffer = buffer;
    snd->cbfunc.buffer = cbfunc;
    snd->cbdata = cbdata;
    snd->routed = strdup(mod->routed);

    /* activate the OOB send state */
    ORTE_OOB_SEND(snd);

    return ORTE_SUCCESS;
}
269