1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
4 * reserved.
5 * 2014 Mellanox Technologies, Inc.
6 * All rights reserved.
7 * Copyright (c) 2015-2016 Intel, Inc. All rights reserved.
8 * $COPYRIGHT$
9 *
10 * Additional copyrights may follow
11 *
12 * $HEADER$
13 *
14 */
15 #include "oob_ud_send.h"
16 #include "orte/mca/errmgr/errmgr.h"
17
/*
 * Completion callback installed on eagerly-sent request messages
 * (see mca_oob_ud_process_send_nb): forwards the result code of the posted
 * message to the send request attached to that message.
 */
static void mca_oob_ud_send_cb (mca_oob_ud_msg_t *msg, int rc)
{
    mca_oob_ud_req_t *send_req = msg->req;

    (void) mca_oob_ud_send_complete (send_req, rc);
}
22
/*
 * Deliver a message addressed to this process without posting anything to a
 * queue pair.
 *
 * A receive request is obtained for (ORTE_PROC_MY_NAME, msg->tag), marked
 * eager and sized to the payload, and mca_oob_ud_recv_alloc() is called on
 * it.  The payload is then copied into the request: iovec-to-iovec when
 * msg->iov is set, otherwise by duplicating msg->buffer and unloading the
 * copy into the request.  Finally the request is marked complete and queued
 * for its receive callback.
 *
 * @param msg  RML send descriptor (iov/count or buffer, plus tag)
 *
 * @return the payload size in bytes on success (note: not ORTE_SUCCESS),
 *         otherwise the error code of the step that failed.
 */
static int mca_oob_ud_send_self (orte_rml_send_t *msg)
{
    unsigned int srco, dsto;
    mca_oob_ud_req_t *req;
    int srci, dsti;
    int rc, size;

    MCA_OOB_UD_IOV_SIZE(msg, size);

    opal_output_verbose(10, orte_oob_base_framework.framework_output,
                        "%s mca_oob_ud_send_self: sending %d bytes to myself",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), size);

    rc = mca_oob_ud_get_recv_req (*ORTE_PROC_MY_NAME, msg->tag, &req, (msg->iov != NULL) ? true : false);
    if (ORTE_SUCCESS != rc) {
        return rc;
    }

    req->req_rem_data_len = size;
    req->req_is_eager     = true;

    rc = mca_oob_ud_recv_alloc (req);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        if (MCA_OOB_UD_REQ_IOV == req->req_data_type) {
            free (req->req_data.iov.uiov);
        }
        OBJ_RELEASE(req);
        return rc;
    }

    if (msg->iov != NULL) {
        req->req_data_type = MCA_OOB_UD_REQ_IOV;

        srci = dsti = 0;
        srco = dsto = 0;

        /* Scatter/gather copy between two iovec arrays whose entries need not
         * line up.  srci walks the SOURCE array (msg->iov, msg->count entries)
         * and dsti walks the DESTINATION array (req->req_data.iov.uiov,
         * req->req_data.iov.count entries); each index is bounded by its own
         * array.  Testing before the first copy also keeps an empty array from
         * being dereferenced. */
        while (srci < msg->count && dsti < req->req_data.iov.count) {
            size_t copy = min(msg->iov[srci].iov_len - srco,
                              req->req_data.iov.uiov[dsti].iov_len - dsto);

            memmove ((unsigned char *) req->req_data.iov.uiov[dsti].iov_base + dsto,
                     (unsigned char *) msg->iov[srci].iov_base + srco, copy);

            /* advance within / past the current source entry */
            srco += copy;
            if (srco == msg->iov[srci].iov_len) {
                srci++;
                srco = 0;
            }

            /* advance within / past the current destination entry */
            dsto += copy;
            if (dsto == req->req_data.iov.uiov[dsti].iov_len) {
                dsti++;
                dsto = 0;
            }
        }
    } else {
        opal_buffer_t *buffer;

        req->req_data_type = MCA_OOB_UD_REQ_BUF;

        /* Duplicate the payload so the request owns storage independent of
         * the sender's buffer.
         * NOTE(review): if mca_oob_ud_recv_alloc() already set
         * req_data.buf.p, the unload below overwrites it -- confirm. */
        buffer = OBJ_NEW(opal_buffer_t);

        if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(buffer, msg->buffer))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(buffer);
            /* drop the request, as the recv_alloc failure path above does */
            OBJ_RELEASE(req);
            return rc;
        }
        if (OPAL_SUCCESS != (rc = opal_dss.unload(buffer, (void **)&req->req_data.buf.p, &req->req_data.buf.size)))
        {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(buffer);
            free(req->req_data.buf.p);
            req->req_data.buf.p = NULL;
            OBJ_RELEASE(req);
            return rc;
        }
        OBJ_RELEASE(buffer);
    }

    req->state = MCA_OOB_UD_REQ_COMPLETE;

    opal_output_verbose(10, orte_oob_base_framework.framework_output,
                        "%s mca_oob_ud_send_self: complete. calling callbacks",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* queue up recv callback */
    mca_oob_ud_event_queue_completed (req);

    req->rml_msg->status = ORTE_SUCCESS;

    return size;
}
112
mca_oob_ud_process_send_nb(int fd,short args,void * cbdata)113 int mca_oob_ud_process_send_nb(int fd, short args, void *cbdata)
114 {
115 mca_oob_ud_msg_op_t *op = (mca_oob_ud_msg_op_t*)cbdata;
116
117 orte_process_name_t hop;
118 mca_oob_ud_peer_t *peer;
119 mca_oob_ud_port_t *port;
120 mca_oob_ud_msg_t *req_msg;
121 mca_oob_ud_req_t *send_req;
122 bool send_eager = false;
123 char *pack_ptr;
124 int rc, size, i;
125
126 if (OPAL_EQUAL == orte_util_compare_name_fields
127 (ORTE_NS_CMP_ALL, ORTE_PROC_MY_NAME, &op->msg->dst)) {
128 return mca_oob_ud_send_self (op->msg);
129 }
130
131 /* if we have a route to this peer, then we can reach it */
132 hop = orte_routed.get_route(NULL, &op->msg->dst);
133 if (ORTE_JOBID_INVALID == hop.jobid ||
134 ORTE_VPID_INVALID == hop.vpid) {
135 ORTE_ERROR_LOG(ORTE_ERR_UNREACH);
136 return ORTE_ERR_UNREACH;
137 }
138
139 rc = mca_oob_ud_peer_lookup (&hop, &peer);
140 if(ORTE_SUCCESS != rc || NULL == peer) {
141 ORTE_ERROR_LOG((NULL == peer) ? ORTE_ERR_UNREACH : rc);
142 return (NULL == peer) ? ORTE_ERR_UNREACH : rc;
143 }
144
145 opal_output_verbose(2, orte_oob_base_framework.framework_output,
146 "%s oob:ud:send_nb to pear %s via hop %s",
147 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
148 ORTE_NAME_PRINT(&op->msg->dst), ORTE_NAME_PRINT(&hop));
149
150 /* NTH: TODO -- get a random port? */
151 port = (mca_oob_ud_port_t *) opal_list_get_first (&((mca_oob_ud_device_t *)peer->peer_context)->ports);
152
153 send_req = OBJ_NEW(mca_oob_ud_req_t);
154 if (!send_req) {
155 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
156 return ORTE_ERR_OUT_OF_RESOURCE;
157 }
158
159 /* fill in request */
160 send_req->req_target = op->msg->dst;
161 send_req->req_origin = op->msg->origin;
162 send_req->req_tag = op->msg->tag;
163 send_req->req_seq_num = op->msg->seq_num;
164
165 if (op->msg->data != NULL) {
166 size = op->msg->count;
167
168 send_req->req_data_type = MCA_OOB_UD_REQ_TR;
169
170 send_req->req_data.buf.p = (char *)calloc(size, sizeof(char));
171 memcpy(send_req->req_data.buf.p, op->msg->data, op->msg->count);
172 send_req->req_data.buf.size = op->msg->count;
173 } else {
174 MCA_OOB_UD_IOV_SIZE(op->msg, size);
175
176 if (op->msg->iov != NULL) {
177 send_req->req_data_type = MCA_OOB_UD_REQ_IOV;
178 send_req->req_data.iov.uiov = op->msg->iov;
179 send_req->req_data.iov.count = op->msg->count;
180 } else {
181 send_req->req_data_type = MCA_OOB_UD_REQ_BUF;
182
183 opal_buffer_t *buffer;
184 buffer = OBJ_NEW(opal_buffer_t);
185
186 if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(buffer, op->msg->buffer))) {
187 ORTE_ERROR_LOG(rc);
188 OBJ_RELEASE(buffer);
189 return rc;
190 }
191
192 if (OPAL_SUCCESS != (rc = opal_dss.unload(buffer, (void **)&send_req->req_data.buf.p, &send_req->req_data.buf.size)))
193 {
194 ORTE_ERROR_LOG(rc);
195 OBJ_RELEASE(buffer);
196 free(send_req->req_data.buf.p);
197 return rc;
198 }
199 OBJ_RELEASE(buffer);
200 }
201 }
202 send_req->rml_msg = op->msg;
203 send_req->req_cbdata = op->msg->cbdata;
204 send_req->req_peer = peer;
205 send_req->req_mtu = port->mtu;
206 send_req->req_port = port;
207 send_req->req_rc = 0;
208
209 send_req->state = MCA_OOB_UD_REQ_PENDING;
210 send_req->type = MCA_OOB_UD_REQ_SEND;
211
212 OBJ_RETAIN(peer);
213
214 if (size + sizeof (mca_oob_ud_msg_hdr_t) <= (unsigned int)port->mtu) {
215 send_eager = true;
216 }
217
218 rc = mca_oob_ud_msg_get (port, send_req, &port->listen_qp, peer, false, &req_msg);
219 if (ORTE_SUCCESS != rc) {
220 OBJ_RELEASE (send_req);
221 return rc;
222 }
223
224 /* fill in message header */
225 req_msg->hdr->msg_type = MCA_OOB_UD_MSG_REQUEST;
226 req_msg->hdr->msg_rem_ctx = send_req;
227
228 req_msg->hdr->msg_origin = op->msg->origin;
229 req_msg->hdr->msg_target = op->msg->dst;
230 req_msg->hdr->msg_seq_num = op->msg->seq_num;
231
232 req_msg->hdr->msg_data.req.data_len = size;
233 req_msg->hdr->msg_data.req.mtu = port->mtu;
234 req_msg->hdr->msg_data.req.tag = op->msg->tag;
235
236 if (MCA_OOB_UD_REQ_IOV == send_req->req_data_type) {
237 opal_output_verbose(10, orte_oob_base_framework.framework_output,
238 "%s-%s send_nb: tag %d size %lu. msg: %p. peer = %p. req = %p."
239 "count = %d. uiov = %p.\n",
240 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
241 ORTE_NAME_PRINT(&op->msg->dst),
242 op->msg->tag, (unsigned long)size,
243 (void *) req_msg,
244 (void *) peer, (void *) send_req,
245 send_req->req_data.iov.count, (void *) send_req->req_data.iov.uiov);
246 } else {
247 opal_output_verbose(10, orte_oob_base_framework.framework_output,
248 "%s-%s send_nb: tag %d size %lu. msg: %p. peer = %p. req = %p."
249 "buffer = %p.\n",
250 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
251 ORTE_NAME_PRINT(&op->msg->dst),
252 op->msg->tag, (unsigned long)size,
253 (void *) req_msg,
254 (void *) peer, (void *) send_req, (void *) send_req->req_data.buf.p);
255 }
256
257 if (!send_eager) {
258 mca_oob_ud_req_append_to_list (send_req, &mca_oob_ud_component.ud_active_sends);
259
260 /* send request */
261 return mca_oob_ud_msg_post_send (req_msg);
262 }
263
264 pack_ptr = (char *)(req_msg->hdr + 1);
265
266 if (op->msg->iov != NULL) {
267 for (i = 0 ; i < op->msg->count ; ++i) {
268 memcpy (pack_ptr, op->msg->iov[i].iov_base, op->msg->iov[i].iov_len);
269 pack_ptr += op->msg->iov[i].iov_len;
270 }
271 } else {
272 memcpy(pack_ptr, send_req->req_data.buf.p, send_req->req_data.buf.size);
273 }
274
275 send_req->req_list = NULL;
276
277 req_msg->hdr->msg_data.req.data_follows = true;
278
279 req_msg->cbfunc = mca_oob_ud_send_cb;
280 req_msg->req = send_req;
281
282 do {
283 /* send request */
284 rc = mca_oob_ud_msg_post_send (req_msg);
285 if (ORTE_SUCCESS != rc) {
286 ORTE_ERROR_LOG(rc);
287 break;
288 }
289 } while (0);
290
291 return rc;
292 }
293
mca_oob_ud_send_try_to(int fd,short event,void * ctx)294 static void mca_oob_ud_send_try_to (int fd, short event, void *ctx)
295 {
296 OPAL_THREAD_LOCK(&mca_oob_ud_component.ud_match_lock);
297 (void) mca_oob_ud_send_try ((mca_oob_ud_req_t *) ctx);
298 OPAL_THREAD_UNLOCK(&mca_oob_ud_component.ud_match_lock);
299 }
300
/*
 * Finish the work request at wr_index of send_req's send chain: set the flags,
 * use the index as immediate data (sequence number), target the remote data
 * QP, and link it to the following work request unless it is the last one.
 */
static void mca_oob_ud_send_finish_wr (mca_oob_ud_req_t *send_req, int wr_index, int wr_count)
{
    struct ibv_send_wr *wr = send_req->req_wr.send + wr_index;

    /* we don't care about completions for data */
    wr->send_flags = IBV_SEND_SOLICITED;

    /* sequence number */
    wr->imm_data          = wr_index;
    wr->wr.ud.remote_qpn  = send_req->req_rem_qpn;
    wr->opcode            = IBV_WR_SEND_WITH_IMM;

    if (wr_index + 1 < wr_count) {
        wr->next = wr + 1;
    }
}

/*
 * Try to post the data of a send request, followed by a COMPLETE message.
 *
 * Steps: acquire a data QP if the request has none, purge it, obtain a
 * message for the completion notice, register the payload (iovec entries or
 * single buffer), split it into MTU-sized work requests built from
 * scatter/gather entries, post the chain on the data QP and then post the
 * COMPLETE message.
 *
 * If any step reports ORTE_ERR_TEMP_OUT_OF_RESOURCE a timer is armed to call
 * this function again (via mca_oob_ud_send_try_to) and ORTE_SUCCESS is
 * returned.  Any other failure completes the request with that error.
 *
 * @param send_req  request to send; req_port, req_peer, req_mtu and
 *                  req_data must already be filled in
 *
 * @return ORTE_SUCCESS when the data was posted or a retry was scheduled,
 *         otherwise the value returned by mca_oob_ud_send_complete().
 */
int mca_oob_ud_send_try (mca_oob_ud_req_t *send_req) {
    int wr_index, wr_count, sge_count, sge_index, iov_index;
    unsigned int iov_left, iov_offset, packet_size;
    const unsigned int mtu = send_req->req_mtu;
    const struct timeval aquire_timeout = {0, 500000};
    mca_oob_ud_msg_t *com_msg;
    int data_len;
    int rc = ORTE_SUCCESS;

    opal_output_verbose(10, orte_oob_base_framework.framework_output,
                        "%s oob:ud:send_try sending to %s, tag = %d, "
                        "req = %p",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&send_req->req_peer->peer_name),
                        send_req->req_tag, (void *) send_req);

    /* single-pass block: "break" jumps to the common error/retry handling */
    do {
        if (NULL == send_req->req_qp) {
            rc = mca_oob_ud_qp_data_aquire (send_req->req_port, &send_req->req_qp);
            if (ORTE_SUCCESS != rc) {
                break;
            }
        }

        (void) mca_oob_ud_qp_purge (send_req->req_qp);

        /* NOTE(review): com_msg is not handed back on the "break" paths
         * below -- confirm whether it must be returned to avoid leaking it
         * on failure/retry. */
        rc = mca_oob_ud_msg_get (send_req->req_port, send_req, send_req->req_qp, send_req->req_peer, false,
                                 &com_msg);
        if (ORTE_SUCCESS != rc) {
            break;
        }

        if (MCA_OOB_UD_REQ_IOV == send_req->req_data_type) {
            if (NULL == send_req->req_data.iov.mr) {
                /* allocate space for memory registers */
                send_req->req_data.iov.mr = (struct ibv_mr **) calloc (send_req->req_data.iov.count, sizeof (struct ibv_mr *));
                if (NULL == send_req->req_data.iov.mr) {
                    rc = ORTE_ERR_OUT_OF_RESOURCE;
                    ORTE_ERROR_LOG(rc);
                    break;
                }
            }

            rc = mca_oob_ud_register_iov (send_req->req_data.iov.uiov, send_req->req_data.iov.count,
                                          send_req->req_data.iov.mr, send_req->req_port->device->ib_pd,
                                          mtu, &sge_count, &wr_count, &data_len);

            if (ORTE_SUCCESS != rc) {
                break;
            }
        } else {
            data_len = send_req->req_data.buf.size;
            rc = mca_oob_ud_register_buf(send_req->req_data.buf.p, send_req->req_data.buf.size,
                                         &send_req->req_data.buf.mr, send_req->req_port->device->ib_pd,
                                         mtu, &sge_count, &wr_count);

            if (ORTE_SUCCESS != rc) {
                break;
            }
        }

        /* one work request per MTU-sized packet (rounded up); this overrides
         * the count reported by the register call above */
        wr_count = (data_len + mtu - 1) / mtu;

        if (MCA_OOB_UD_REQ_IOV == send_req->req_data_type) {
            opal_output_verbose(5, orte_oob_base_framework.framework_output,
                                "%s oob:ud:send_try sending %d bytes in %d "
                                "work requests, %d sges. uiov = %p.", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), data_len,
                                wr_count, sge_count, (void *) send_req->req_data.iov.uiov);
        } else {
            opal_output_verbose(5, orte_oob_base_framework.framework_output,
                                "%s oob:ud:send_try sending %d bytes in %d "
                                "work requests, %d sges. buf = %p", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), data_len,
                                wr_count, sge_count, (void *) send_req->req_data.buf.p);
        }

        /* the WR and SGE arrays are allocated once and reused on a retry */
        if (wr_count && NULL == send_req->req_wr.send) {
            send_req->req_wr.send = (struct ibv_send_wr *) calloc (wr_count, sizeof (struct ibv_send_wr));
            if (NULL == send_req->req_wr.send) {
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                ORTE_ERROR_LOG(rc);
                break;
            }
        }

        if (wr_count && NULL == send_req->req_sge) {
            send_req->req_sge = (struct ibv_sge *) calloc (sge_count, sizeof (struct ibv_sge));

            if (NULL == send_req->req_sge) {
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                ORTE_ERROR_LOG(rc);
                break;
            }
        }

        if (MCA_OOB_UD_REQ_IOV == send_req->req_data_type) {
            opal_output_verbose(10, orte_oob_base_framework.framework_output,
                                "%s oob:ud:send_try posting message using iovec",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

            iov_left   = send_req->req_data.iov.uiov[0].iov_len;
            iov_offset = 0;
            iov_index  = 0;

            for (wr_index = 0, sge_index = 0 ; wr_index < wr_count ; ++wr_index) {
                int sge_first = sge_index;

                packet_size = 0;

                /* gather pieces of consecutive iovec entries until the packet
                 * is full or the current entry has nothing left */
                do {
                    int to_send = min (iov_left, mtu - packet_size);

                    mca_oob_ud_fill_sge(send_req->req_sge + sge_index++,
                                        (char *)send_req->req_data.iov.uiov[iov_index].iov_base + iov_offset,
                                        to_send, send_req->req_data.iov.mr[iov_index]->lkey);

                    iov_offset  += to_send;
                    iov_left    -= to_send;
                    packet_size += to_send;

                    if (0 == iov_left) {
                        iov_index++;
                        iov_offset = 0;

                        if (iov_index < send_req->req_data.iov.count) {
                            iov_left = send_req->req_data.iov.uiov[iov_index].iov_len;
                        }
                    }
                } while ((packet_size < mtu) && (iov_left > 0));

                mca_oob_ud_fill_send_wr(send_req->req_wr.send + wr_index,
                                        send_req->req_sge + sge_first,
                                        sge_index - sge_first, send_req->req_peer);

                mca_oob_ud_send_finish_wr (send_req, wr_index, wr_count);
            }
        } else {//data is in buffer
            unsigned int buffer_offset = 0;
            unsigned int buffer_size = send_req->req_data.buf.size;

            opal_output_verbose(10, orte_oob_base_framework.framework_output,
                                "%s oob:ud:send_try posting message using buffer",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

            for (wr_index = 0, sge_index = 0 ; wr_index < wr_count ; ++wr_index) {
                int sge_first = sge_index;

                packet_size = 0;

                /* carve the next (at most MTU-sized) slice out of the buffer */
                do {
                    int to_send = min (buffer_size, mtu - packet_size);

                    mca_oob_ud_fill_sge(send_req->req_sge + sge_index++,
                                        (char *)send_req->req_data.buf.p + buffer_offset,
                                        to_send, send_req->req_data.buf.mr->lkey);

                    buffer_offset += to_send;
                    buffer_size   -= to_send;
                    packet_size   += to_send;
                } while ((packet_size < mtu) && (buffer_size > 0));

                mca_oob_ud_fill_send_wr(send_req->req_wr.send + wr_index,
                                        send_req->req_sge + sge_first,
                                        sge_index - sge_first, send_req->req_peer);

                mca_oob_ud_send_finish_wr (send_req, wr_index, wr_count);

                opal_output_verbose(10, orte_oob_base_framework.framework_output,
                                    "%s oob:ud:send_try imm_data = %d",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wr_index);
            }
        }

        /* send data */
        rc = mca_oob_ud_qp_post_send (send_req->req_qp, send_req->req_wr.send, 0);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            break;
        }

        opal_output_verbose(10, orte_oob_base_framework.framework_output,
                            "%s oob:ud:send_try posting completion message",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

        /* Fill in completion message. This message will go to the peers listen QP but
           must originate from our data qp to ensure that it is sent last. */
        com_msg->hdr->msg_type = MCA_OOB_UD_MSG_COMPLETE;
        com_msg->hdr->msg_lcl_ctx = send_req->req_rem_ctx;
        com_msg->hdr->msg_rem_ctx = send_req;

        /* send message header */
        rc = mca_oob_ud_msg_post_send (com_msg);

        /* post_send already returned the message */
        com_msg = NULL;
    } while (0);

    if (ORTE_ERR_TEMP_OUT_OF_RESOURCE == rc) {
        /* set timer to retry post */
        mca_oob_ud_req_timer_set (send_req, &aquire_timeout, 1, mca_oob_ud_send_try_to);
        rc = ORTE_SUCCESS;
    }

    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        /* unrecoverable: complete the request with the error */
        return mca_oob_ud_send_complete (send_req, rc);
    }

    send_req->state = MCA_OOB_UD_REQ_ACTIVE;

    return rc;
}
537
/*
 * Complete a send request with the given result code by handing it to
 * mca_oob_ud_req_complete().
 *
 * @return rc, unchanged, so callers can "return mca_oob_ud_send_complete(...)".
 */
int mca_oob_ud_send_complete (mca_oob_ud_req_t *send_req, int rc)
{
    /* the generic request-completion routine does the actual work */
    mca_oob_ud_req_complete (send_req, rc);
    return rc;
}
544