1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
4 * University Research and Technology
5 * Corporation. All rights reserved.
6 * Copyright (c) 2004-2006 The University of Tennessee and The University
7 * of Tennessee Research Foundation. All rights
8 * reserved.
9 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
10 * University of Stuttgart. All rights reserved.
11 * Copyright (c) 2004-2005 The Regents of the University of California.
12 * All rights reserved.
13 * Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
14 * reserved.
15 * Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
16 * $COPYRIGHT$
17 *
18 * Additional copyrights may follow
19 *
20 * $HEADER$
21 */
22
23 #include "orte_config.h"
24 #include "opal/types.h"
25
26 #include "opal/dss/dss.h"
27 #include "opal/util/output.h"
28
29 #include "orte/mca/errmgr/errmgr.h"
30 #include "orte/mca/oob/base/base.h"
31 #include "orte/util/name_fns.h"
32 #include "orte/util/threads.h"
33 #include "orte/runtime/orte_globals.h"
34
35 #include "orte/mca/rml/base/base.h"
36 #include "orte/mca/rml/rml_types.h"
37 #include "rml_oob.h"
38
send_self_exe(int fd,short args,void * data)39 static void send_self_exe(int fd, short args, void* data)
40 {
41 orte_self_send_xfer_t *xfer = (orte_self_send_xfer_t*)data;
42
43 ORTE_ACQUIRE_OBJECT(xfer);
44
45 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
46 "%s rml_send_to_self callback executing for tag %d",
47 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), xfer->tag));
48
49 /* execute the send callback function - note that
50 * send-to-self always returns a SUCCESS status
51 */
52 if (NULL != xfer->iov) {
53 if (NULL != xfer->cbfunc.iov) {
54 /* non-blocking iovec send */
55 xfer->cbfunc.iov(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->iov, xfer->count,
56 xfer->tag, xfer->cbdata);
57 }
58 } else if (NULL != xfer->buffer) {
59 if (NULL != xfer->cbfunc.buffer) {
60 /* non-blocking buffer send */
61 xfer->cbfunc.buffer(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->buffer,
62 xfer->tag, xfer->cbdata);
63 }
64 } else {
65 /* should never happen */
66 abort();
67 }
68
69 /* cleanup the memory */
70 OBJ_RELEASE(xfer);
71 }
72
/*
 * Post a non-blocking send of an iovec array to a peer.
 *
 * A send addressed to this process never enters the OOB.  The payload is
 * copied into a newly allocated orte_rml_recv_t, and the send-completion
 * callback is scheduled on orte_event_base via send_self_exe().  The copy
 * is then activated for local delivery.  Every other send is wrapped in an
 * orte_rml_send_t and handed to ORTE_OOB_SEND.
 *
 * @param mod     RML module; mod->routed is strdup'd into the send request
 * @param peer    destination; must be non-NULL and not ORTE_NAME_INVALID
 * @param iov     array of @p count iovecs.  On the OOB path only the pointer
 *                is stored, so (presumably) the data must remain valid
 *                until @p cbfunc runs.
 * @param count   number of entries in @p iov; must not be negative
 * @param tag     destination tag; must not be ORTE_RML_TAG_INVALID
 * @param cbfunc  send-completion callback (the self-send path tolerates NULL)
 * @param cbdata  opaque pointer handed back to @p cbfunc
 *
 * @return ORTE_SUCCESS once the send has been queued,
 *         ORTE_ERR_BAD_PARAM for an invalid tag, peer or iovec array,
 *         ORTE_ERR_OUT_OF_RESOURCE if the self-send copy cannot be allocated.
 */
int orte_rml_oob_send_nb(struct orte_rml_base_module_t *mod,
                         orte_process_name_t* peer,
                         struct iovec* iov,
                         int count,
                         orte_rml_tag_t tag,
                         orte_rml_callback_fn_t cbfunc,
                         void* cbdata)
{
    orte_rml_recv_t *rcv;
    orte_rml_send_t *snd;
    orte_self_send_xfer_t *xfer;
    size_t bytes;   /* size_t: iov_len is size_t, and a signed sum could overflow */
    int i;
    char *ptr;

    OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
                         "%s rml_send to peer %s at tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(peer), tag));

    if (ORTE_RML_TAG_INVALID == tag) {
        /* cannot send to an invalid tag */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }
    if (NULL == peer ||
        OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
        /* cannot send to an invalid peer */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }
    if (count < 0 || (NULL == iov && 0 < count)) {
        /* cannot send from a negatively sized or missing iovec array */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /* if this is a message to myself, then just post the message
     * for receipt - no need to dive into the oob
     */
    if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */
        OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
                             "%s rml_send_iovec_to_self at tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
        /* Send to self is a tad tricky.  We do not want to track the send
         * callback through the recv process and execute it upon receipt,
         * because that would give very different timing from a non-self
         * message.  If we merely kept a pointer to the caller's data and
         * ran the send callback before the receive, the caller would
         * think we were done with the data and could release it.  So we
         * copy the data, which lets the send callback run before the
         * message is received.  This also mimics the non-self path: a
         * message pushed out on the wire and looped back would arrive in
         * a new block of memory anyway.
         */

        /* Build the receive copy BEFORE scheduling the send callback.
         * ORTE_THREADSHIFT activates send_self_exe on orte_event_base.  If
         * that base is progressed by another thread, the callback can run
         * at once and let the caller free or reuse the iovec data while
         * it is still being copied.  Copying first also lets an allocation
         * failure be reported before any callback has been queued.
         */
        rcv = OBJ_NEW(orte_rml_recv_t);
        rcv->sender = *peer;
        rcv->tag = tag;
        /* get the total number of bytes in the iovec array */
        bytes = 0;
        for (i = 0; i < count; ++i) {
            bytes += iov[i].iov_len;
        }
        if (0 < bytes) {
            rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
            if (NULL == rcv->iov.iov_base) {
                OBJ_RELEASE(rcv);
                ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                return ORTE_ERR_OUT_OF_RESOURCE;
            }
            rcv->iov.iov_len = bytes;
            /* gather the fragments into the contiguous copy; empty
             * entries are skipped because their iov_base may be NULL */
            ptr = (char*)rcv->iov.iov_base;
            for (i = 0; i < count; ++i) {
                if (0 < iov[i].iov_len) {
                    memcpy(ptr, iov[i].iov_base, iov[i].iov_len);
                    ptr += iov[i].iov_len;
                }
            }
        }

        /* setup the send callback */
        xfer = OBJ_NEW(orte_self_send_xfer_t);
        xfer->iov = iov;
        xfer->count = count;
        xfer->cbfunc.iov = cbfunc;
        xfer->tag = tag;
        xfer->cbdata = cbdata;
        /* setup the event for the send callback */
        ORTE_THREADSHIFT(xfer, orte_event_base, send_self_exe, ORTE_MSG_PRI);

        /* post the message for receipt - since the send callback was posted
         * first and has the same priority, it will execute first
         */
        ORTE_RML_ACTIVATE_MESSAGE(rcv);
        return ORTE_SUCCESS;
    }

    /* remote peer: hand the request to the OOB; only the iovec pointer is
     * stored, the payload itself is not copied here */
    snd = OBJ_NEW(orte_rml_send_t);
    snd->dst = *peer;
    snd->origin = *ORTE_PROC_MY_NAME;
    snd->tag = tag;
    snd->iov = iov;
    snd->count = count;
    snd->cbfunc.iov = cbfunc;
    snd->cbdata = cbdata;
    snd->routed = strdup(mod->routed);

    /* activate the OOB send state */
    ORTE_OOB_SEND(snd);

    return ORTE_SUCCESS;
}
180
/*
 * Post a non-blocking send of a packed opal_buffer_t to a peer.
 *
 * A send addressed to this process never enters the OOB.  The buffer's
 * used bytes are copied into a newly allocated orte_rml_recv_t, and the
 * send-completion callback is scheduled on orte_event_base via
 * send_self_exe().  The copy is then activated for local delivery.  Every
 * other send is wrapped in an orte_rml_send_t and handed to ORTE_OOB_SEND.
 *
 * @param mod     RML module; mod->routed is strdup'd into the send request
 * @param peer    destination; must be non-NULL and not ORTE_NAME_INVALID
 * @param buffer  payload; must be non-NULL.  On the OOB path only the
 *                pointer is stored, so (presumably) the buffer must remain
 *                valid until @p cbfunc runs.
 * @param tag     destination tag; must not be ORTE_RML_TAG_INVALID
 * @param cbfunc  send-completion callback (the self-send path tolerates NULL)
 * @param cbdata  opaque pointer handed back to @p cbfunc
 *
 * @return ORTE_SUCCESS once the send has been queued,
 *         ORTE_ERR_BAD_PARAM for an invalid tag, peer or buffer,
 *         ORTE_ERR_OUT_OF_RESOURCE if the self-send copy cannot be allocated.
 */
int orte_rml_oob_send_buffer_nb(struct orte_rml_base_module_t *mod,
                                orte_process_name_t* peer,
                                opal_buffer_t* buffer,
                                orte_rml_tag_t tag,
                                orte_rml_buffer_callback_fn_t cbfunc,
                                void* cbdata)
{
    orte_rml_recv_t *rcv;
    orte_rml_send_t *snd;
    orte_self_send_xfer_t *xfer;

    OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
                         "%s rml_send_buffer to peer %s at tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(peer), tag));

    if (ORTE_RML_TAG_INVALID == tag) {
        /* cannot send to an invalid tag */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }
    if (NULL == peer ||
        OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
        /* cannot send to an invalid peer */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }
    if (NULL == buffer) {
        /* nothing to send - and the self path would dereference it */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /* if this is a message to myself, then just post the message
     * for receipt - no need to dive into the oob
     */
    if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */
        OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
                             "%s rml_send_buffer_to_self at tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
        /* Send to self is a tad tricky.  We do not want to track the send
         * callback through the recv process and execute it upon receipt,
         * because that would give very different timing from a non-self
         * message.  If we merely kept a pointer to the caller's data and
         * ran the send callback before the receive, the caller would
         * think we were done with the data and could release it.  So we
         * copy the data, which lets the send callback run before the
         * message is received.  This also mimics the non-self path: a
         * message pushed out on the wire and looped back would arrive in
         * a new block of memory anyway.
         */

        /* Build the receive copy BEFORE scheduling the send callback.
         * ORTE_THREADSHIFT activates send_self_exe on orte_event_base.  If
         * that base is progressed by another thread, the callback can run
         * at once and let the caller release the buffer while its bytes
         * are still being copied.  Copying first also lets an allocation
         * failure be reported before any callback has been queued.
         */
        rcv = OBJ_NEW(orte_rml_recv_t);
        rcv->sender = *peer;
        rcv->tag = tag;
        /* an empty buffer leaves iov_base unset, matching the iovec path */
        if (0 < buffer->bytes_used) {
            rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used);
            if (NULL == rcv->iov.iov_base) {
                OBJ_RELEASE(rcv);
                ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                return ORTE_ERR_OUT_OF_RESOURCE;
            }
            memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used);
            rcv->iov.iov_len = buffer->bytes_used;
        }

        /* setup the send callback */
        xfer = OBJ_NEW(orte_self_send_xfer_t);
        xfer->buffer = buffer;
        xfer->cbfunc.buffer = cbfunc;
        xfer->tag = tag;
        xfer->cbdata = cbdata;
        /* setup the event for the send callback */
        ORTE_THREADSHIFT(xfer, orte_event_base, send_self_exe, ORTE_MSG_PRI);

        /* post the message for receipt - since the send callback was posted
         * first and has the same priority, it will execute first
         */
        ORTE_RML_ACTIVATE_MESSAGE(rcv);
        return ORTE_SUCCESS;
    }

    /* remote peer: hand the request to the OOB; only the buffer pointer is
     * stored, the payload itself is not copied here */
    snd = OBJ_NEW(orte_rml_send_t);
    snd->dst = *peer;
    snd->origin = *ORTE_PROC_MY_NAME;
    snd->tag = tag;
    snd->buffer = buffer;
    snd->cbfunc.buffer = cbfunc;
    snd->cbdata = cbdata;
    snd->routed = strdup(mod->routed);

    /* activate the OOB send state */
    ORTE_OOB_SEND(snd);

    return ORTE_SUCCESS;
}
269