1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
4 * reserved.
5 * 2014 Mellanox Technologies, Inc.
6 * All rights reserved.
7 * Copyright (c) 2015 Intel, Inc. All rights reserved.
8 * $COPYRIGHT$
9 *
10 * Additional copyrights may follow
11 *
12 * $HEADER$
13 *
14 */
15
16 #include "orte_config.h"
17 #include "orte/types.h"
18 #include "opal/types.h"
19
20 #include "orte/mca/errmgr/errmgr.h"
21 #include "orte/runtime/orte_globals.h"
22 #include "orte/util/name_fns.h"
23 #include "orte/util/show_help.h"
24
25 #include "math.h"
26
27 #include "oob_ud_component.h"
28
/* Smaller of two values. Each argument may be evaluated twice, so only pass
 * expressions without side effects. */
#define min(a, b) (((a) < (b)) ? (a) : (b))
30
31 /* Caller MUST hold the matching lock before calling */
mca_oob_ud_find_recv(opal_list_t * list,const orte_process_name_t name,const int tag,mca_oob_ud_req_t ** req)32 static inline int mca_oob_ud_find_recv (opal_list_t *list, const orte_process_name_t name,
33 const int tag, mca_oob_ud_req_t **req)
34 {
35 opal_list_item_t *item;
36 int rc = ORTE_ERR_NOT_FOUND;
37
38 *req = NULL;
39
40 OPAL_THREAD_LOCK(&mca_oob_ud_component.ud_match_lock);
41
42 for (item = opal_list_get_first (list) ; item != opal_list_get_end (list) ;
43 item = opal_list_get_next (item)) {
44 mca_oob_ud_req_t *recv_req = (mca_oob_ud_req_t *) item;
45
46 opal_output_verbose(15, orte_oob_base_framework.framework_output,
47 "%s oob:ud:find_recv matching against "
48 "peer: %s, tag: %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
49 ORTE_NAME_PRINT(&recv_req->req_origin), recv_req->req_tag);
50
51 if (OPAL_EQUAL == opal_dss.compare (&name, &recv_req->req_origin, ORTE_NAME) &&
52 tag == recv_req->req_tag) {
53 *req = recv_req;
54 rc = ORTE_SUCCESS;
55 break;
56 }
57 }
58
59 opal_output_verbose(15, orte_oob_base_framework.framework_output,
60 "%s oob:ud:find_recv %sfound",
61 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_SUCCESS != rc ? "not " : "");
62
63
64 OPAL_THREAD_UNLOCK(&mca_oob_ud_component.ud_match_lock);
65
66 if (ORTE_SUCCESS == rc) {
67 mca_oob_ud_req_append_to_list (*req, NULL);
68 }
69
70 return rc;
71 }
72
/*
 * Create and initialise a receive request for a message from `name` with
 * tag `tag`.
 *
 * name       - origin of the message
 * tag        - tag of the message
 * reqp       - out: the new request, or NULL on failure
 * iovec_used - when true the request receives into a one-entry iovec array
 *              (the array itself is allocated here); otherwise it receives
 *              into a flat buffer
 *
 * The data storage is not allocated here; mca_oob_ud_recv_alloc is expected
 * to do that later.
 *
 * Returns ORTE_SUCCESS, or ORTE_ERR_OUT_OF_RESOURCE if an allocation fails
 * (in which case nothing is leaked and *reqp is NULL).
 */
int mca_oob_ud_get_recv_req (const orte_process_name_t name, const int tag,
                             mca_oob_ud_req_t **reqp, bool iovec_used) {
    mca_oob_ud_req_t *req;

    opal_output_verbose(15, orte_oob_base_framework.framework_output,
                         "%s oob:ud:get_recv_req create receive request against: %s, tag: %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&name), tag);

    /* never leave the caller with a stale pointer on failure */
    *reqp = NULL;

    req = OBJ_NEW(mca_oob_ud_req_t);
    if (NULL == req) {
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    req->req_origin = name;
    req->req_tag = tag;
    req->req_channel = ORTE_RML_INVALID_CHANNEL_NUM;
    req->req_seq_num = 0;
    /* this receive was not expected */
    req->type = MCA_OOB_UD_REQ_RECV;

    /* let mca_oob_ud_recv_alloc alloc memory for the receive */
    if (iovec_used) {
        req->req_data.iov.uiov = calloc (1, sizeof (struct iovec));
        if (NULL == req->req_data.iov.uiov) {
            /* the caller logs the error; just undo the partial construction */
            OBJ_RELEASE(req);
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
        req->req_data_type = MCA_OOB_UD_REQ_IOV;
    } else {
        req->req_data_type = MCA_OOB_UD_REQ_BUF;
    }
    req->req_data.iov.count = 1;

    /* publish the request only once it is fully initialised */
    *reqp = req;

    return ORTE_SUCCESS;
}
102
/*
 * Search the component's active-receive list for a request from `name`
 * carrying `tag`. Thin wrapper over mca_oob_ud_find_recv, so it has the
 * same locking behaviour and return values (ORTE_SUCCESS / ORTE_ERR_NOT_FOUND).
 */
static inline int mca_oob_ud_find_active_recv (const orte_process_name_t name, const int tag,
                                               mca_oob_ud_req_t **req)
{
    opal_list_t *active = &mca_oob_ud_component.ud_active_recvs;

    opal_output_verbose(15, orte_oob_base_framework.framework_output,
                         "%s oob:ud:recv_match active receive request "
                         "against: %s, tag: %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&name), tag);

    return mca_oob_ud_find_recv (active, name, tag, req);
}
112
mca_oob_ud_recv_try_to(int fd,short event,void * data)113 static void mca_oob_ud_recv_try_to (int fd, short event, void *data)
114 {
115 (void) mca_oob_ud_recv_try ((mca_oob_ud_req_t *) data);
116 }
117
/*
 * Prepare and post the receive side of a (non-eager) transfer for recv_req,
 * then tell the sender where to send.
 *
 * Steps:
 *   1. obtain a data QP for the request if it has none, and purge it;
 *   2. get a reply message on the port's listen QP;
 *   3. register the user buffer (iovec or flat buffer) for the given MTU;
 *   4. build one receive WR per MTU-sized packet, whose first SGE receives a
 *      struct ibv_grh and whose remaining SGEs cover up to `mtu` bytes of
 *      the user buffer, and post the WR chain on the data QP;
 *   5. send a MCA_OOB_UD_MSG_REPLY carrying the data QP number, data length
 *      and MTU back to the sender.
 *
 * Returns:
 *   ORTE_SUCCESS - receives posted and reply sent (request marked ACTIVE),
 *                  or a temporary resource shortage occurred and a 0.5 s
 *                  retry timer (mca_oob_ud_recv_try_to) was armed.
 *   other        - the request was completed with that error, removed from
 *                  ud_active_recvs if it was on it, and released; the caller
 *                  must not touch recv_req afterwards.
 */
int mca_oob_ud_recv_try (mca_oob_ud_req_t *recv_req)
{
    int rc, data_len;
    int wr_count, sge_count, wr_index, sge_index, iov_index;
    unsigned int iov_left, iov_offset, packet_size;
    const unsigned int mtu = recv_req->req_mtu;
    /* retry interval used when resources are temporarily exhausted: 0.5 s */
    struct timeval aquire_timeout = {0, 500000};
    mca_oob_ud_msg_t *rep_msg = NULL;

    opal_output_verbose(10, orte_oob_base_framework.framework_output,
                         "%s oob:ud:recv_try receiving from %s. recv_req = %p. rem ctx = %p",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&recv_req->req_peer->peer_name),
                         (void *)recv_req, (void *)recv_req->req_rem_ctx);

    /* single-pass "loop": every failure breaks out to the common error
     * handling after the loop with rc set */
    do {
        if (NULL == recv_req->req_qp) {
            rc = mca_oob_ud_qp_data_aquire (recv_req->req_port, &recv_req->req_qp);
            if (ORTE_SUCCESS != rc) {
                break;
            }
        }

        (void) mca_oob_ud_qp_purge (recv_req->req_qp);

        rc = mca_oob_ud_msg_get (recv_req->req_port, recv_req, &recv_req->req_port->listen_qp,
                                 recv_req->req_peer, NULL, &rep_msg);
        if (ORTE_SUCCESS != rc) {
            break;
        }

        /* NOTE(review): from here on, every `break` leaves rep_msg allocated
         * and it is never handed back (only msg_post_send below releases it,
         * per the comment there). Looks like a leak on error paths -- confirm
         * and return the message before breaking. */

        if (MCA_OOB_UD_REQ_IOV == recv_req->req_data_type) {
            if (NULL == recv_req->req_data.iov.mr) {
                /* allocate space for memory registers */
                recv_req->req_data.iov.mr = (struct ibv_mr **) calloc (recv_req->req_data.iov.count, sizeof (struct ibv_mr *));
                if (NULL == recv_req->req_data.iov.mr) {
                    ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                    rc = ORTE_ERR_OUT_OF_RESOURCE;
                    break;
                }
            }

            rc = mca_oob_ud_register_iov (recv_req->req_data.iov.uiov, recv_req->req_data.iov.count,
                                          recv_req->req_data.iov.mr, recv_req->req_port->device->ib_pd,
                                          mtu, &sge_count, &wr_count, &data_len);

            if (ORTE_SUCCESS != rc) {
                break;
            }
        } else {
            data_len = recv_req->req_data.buf.size;
            rc = mca_oob_ud_register_buf (recv_req->req_data.buf.p, recv_req->req_data.buf.size,
                                          &recv_req->req_data.buf.mr, recv_req->req_port->device->ib_pd,
                                          mtu, &sge_count, &wr_count);

            if (ORTE_SUCCESS != rc) {
                break;
            }
        }

        /* receive exactly what the sender announced; a local buffer smaller
         * than that is treated as an error */
        data_len = min(data_len, recv_req->req_rem_data_len);
        if (data_len < recv_req->req_rem_data_len) {
            opal_output_verbose(5, orte_oob_base_framework.framework_output,
                                 "%s oob:ud:recv_try receive buffers are not big. this is probably an error condition."
                                 "data_len = %d, recv_req->req_rem_data_len = %d.",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), data_len, recv_req->req_rem_data_len);
            rc = ORTE_ERR_BAD_PARAM;
            break;
        }

        /* one WR per MTU-sized packet (rounded up), plus one extra SGE per
         * WR for its GRH slot on top of the count returned by register_*.
         * NOTE(review): data_len == 0 yields wr_count == 0, and the
         * calloc(0, ...) calls below may then return NULL and be reported
         * as out-of-resource -- confirm zero-length messages never get here. */
        wr_count = (data_len + mtu - 1) / mtu;
        sge_count += wr_count;

        opal_output_verbose(5, orte_oob_base_framework.framework_output,
                             "%s oob:ud:recv_try receiving %d bytes in %d "
                             "work requests, %d sges", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), data_len,
                             wr_count, sge_count);

        /* recv_complete waits for this many completions */
        recv_req->req_packet_count = wr_count;

        /* The WR/SGE/GRH arrays and the GRH registration below are only
         * created when still NULL, so a retry reuses those from the first
         * attempt (presumably sized identically, since buffer and MTU do
         * not change between attempts -- TODO confirm). */
        if (NULL == recv_req->req_wr.recv) {
            /* allocate work requests */
            recv_req->req_wr.recv = (struct ibv_recv_wr *) calloc (wr_count, sizeof (struct ibv_recv_wr));
            if (NULL == recv_req->req_wr.recv) {
                ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                break;
            }
        }

        if (NULL == recv_req->req_sge) {
            /* allocate scatter-gather lists. we need more to hold the grh */
            recv_req->req_sge = (struct ibv_sge *) calloc (sge_count, sizeof (struct ibv_sge));
            if (NULL == recv_req->req_sge) {
                ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                break;
            }
        }

        if (NULL == recv_req->req_grh) {
            /* allocate grh buffers */
            recv_req->req_grh = (struct ibv_grh *) calloc (wr_count, sizeof (struct ibv_grh));
            if (NULL == recv_req->req_grh) {
                ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                break;
            }
        }

        if (NULL == recv_req->req_grh_mr) {
            /* register grh buffers */
            recv_req->req_grh_mr = ibv_reg_mr (recv_req->req_port->device->ib_pd, recv_req->req_grh,
                                               wr_count * sizeof (struct ibv_grh),
                                               IBV_ACCESS_LOCAL_WRITE);
            if (NULL == recv_req->req_grh_mr) {
                orte_show_help("help-oob-ud.txt", "reg-mr-failed", true,
                               orte_process_info.nodename, recv_req->req_grh,
                               wr_count * sizeof (struct ibv_grh), strerror(errno));
                /* could not register memory */
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                break;
            }
        }

        rc = ORTE_SUCCESS;

        if (MCA_OOB_UD_REQ_IOV == recv_req->req_data_type) {
            /* walk the iovec array, carving it into packets of at most `mtu`
             * bytes; a packet may span several iovec entries */
            iov_left = recv_req->req_data.iov.uiov[0].iov_len;
            iov_offset = 0;
            iov_index = 0;

            opal_output_verbose(5, orte_oob_base_framework.framework_output,
                                 "%s oob:ud: recv_req->req_data.iov.uiov[0].iov_len = %d",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)recv_req->req_data.iov.uiov[0].iov_len);

            for (wr_index = 0, sge_index = 0 ; wr_index < wr_count ; ++wr_index) {
                int sge_first = sge_index;

                packet_size = 0;

                /* grh */
                mca_oob_ud_fill_sge(recv_req->req_sge + sge_index++,
                                    recv_req->req_grh + wr_index,
                                    sizeof (struct ibv_grh),
                                    recv_req->req_grh_mr->lkey);

                /* add payload SGEs until the packet is full or the data ends */
                do {
                    int to_recv = min (iov_left, mtu - packet_size);

                    mca_oob_ud_fill_sge(recv_req->req_sge + sge_index++,
                                        (char *)recv_req->req_data.iov.uiov[iov_index].iov_base + iov_offset,
                                        to_recv, recv_req->req_data.iov.mr[iov_index]->lkey);

                    iov_offset += to_recv;
                    iov_left -= to_recv;
                    packet_size += to_recv;

                    /* current iovec entry exhausted: advance to the next one */
                    if (0 == iov_left) {
                        iov_index++;
                        iov_offset = 0;

                        if (iov_index < recv_req->req_data.iov.count) {
                            iov_left = recv_req->req_data.iov.uiov[iov_index].iov_len;
                        }
                    }
                } while ((packet_size < mtu) && (iov_left > 0));

                mca_oob_ud_fill_recv_wr(recv_req->req_wr.recv + wr_index,
                                        recv_req->req_sge + sge_first,
                                        sge_index - sge_first);

                /* chain the WRs so a single post_recv submits all of them */
                if (wr_index + 1 < wr_count) {
                    recv_req->req_wr.recv[wr_index].next = recv_req->req_wr.recv + wr_index + 1;
                }
            }
        } else {
            /* flat buffer: the same packetisation over one contiguous region */
            unsigned int buffer_left = recv_req->req_data.buf.size;
            unsigned int buffer_offset = 0;

            opal_output_verbose(5, orte_oob_base_framework.framework_output,
                                 "%s oob:ud:recv_try recv_req->req_data.buf.size = %d",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_req->req_data.buf.size);

            for (wr_index = 0, sge_index = 0 ; wr_index < wr_count ; ++wr_index) {
                int sge_first = sge_index;

                packet_size = 0;

                /* grh */
                mca_oob_ud_fill_sge(recv_req->req_sge + sge_index++,
                                    recv_req->req_grh + wr_index,
                                    sizeof (struct ibv_grh),
                                    recv_req->req_grh_mr->lkey);

                do {
                    int to_recv = min (buffer_left, mtu - packet_size);

                    mca_oob_ud_fill_sge(recv_req->req_sge + sge_index++,
                                        (char *)recv_req->req_data.buf.p + buffer_offset,
                                        to_recv, recv_req->req_data.buf.mr->lkey);

                    buffer_offset += to_recv;
                    buffer_left -= to_recv;
                    packet_size += to_recv;
                } while ((packet_size < mtu) && (buffer_left > 0));

                mca_oob_ud_fill_recv_wr(recv_req->req_wr.recv + wr_index,
                                        recv_req->req_sge + sge_first,
                                        sge_index - sge_first);

                if (wr_index + 1 < wr_count) {
                    recv_req->req_wr.recv[wr_index].next = recv_req->req_wr.recv + wr_index + 1;
                }
            }
        }

        rc = mca_oob_ud_qp_post_recv (recv_req->req_qp, recv_req->req_wr.recv);
        if (ORTE_SUCCESS != rc) {
            break;
        }

        opal_output_verbose(5, orte_oob_base_framework.framework_output,
                             "%s oob:ud:recv_try posting reply message",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

        /* ok, we have a data queue pair */
        /* the reply tells the sender which QP to target, how many bytes to
         * send and at what MTU; msg_rem_ctx = recv_req lets later messages
         * from the sender refer back to this request */
        rep_msg->hdr->msg_type = MCA_OOB_UD_MSG_REPLY;
        rep_msg->hdr->msg_lcl_ctx = recv_req->req_rem_ctx;
        rep_msg->hdr->msg_rem_ctx = recv_req;

        rep_msg->hdr->msg_data.rep.qpn = recv_req->req_qp->ib_qp->qp_num;
        rep_msg->hdr->msg_data.rep.data_len = data_len;
        rep_msg->hdr->msg_data.rep.mtu = mtu;

        rc = mca_oob_ud_msg_post_send (rep_msg);

        /* post send already returned the message */
        rep_msg = NULL;
    } while (0);

    /* temporary shortage: retry from a timer instead of failing */
    if (ORTE_ERR_TEMP_OUT_OF_RESOURCE == rc) {
        mca_oob_ud_req_timer_set (recv_req, &aquire_timeout, 1, mca_oob_ud_recv_try_to);
        rc = ORTE_SUCCESS;
    }

    if (ORTE_SUCCESS != rc) {
        /* bad stuff happened */
        mca_oob_ud_req_complete (recv_req, rc);

        /* NOTE(review): ud_active_recvs is modified here without taking
         * ud_match_lock, which mca_oob_ud_find_recv holds while walking the
         * same list -- confirm callers serialise this. */
        if (mca_oob_ud_req_is_in_list(recv_req, &mca_oob_ud_component.ud_active_recvs)) {
            opal_list_remove_item (&mca_oob_ud_component.ud_active_recvs, (opal_list_item_t *) recv_req);
        }
        OBJ_RELEASE(recv_req);
        return rc;
    }

    recv_req->state = MCA_OOB_UD_REQ_ACTIVE;

    return rc;
}
379
/*
 * Finish a receive request.
 *
 * Non-eager requests: poll the receive CQ of the request's data QP until
 * req_packet_count completions have been collected, or the CQ reports
 * nothing (0) or an error (< 0). The transfer is accepted only if:
 *   - every expected packet completed,
 *   - every completion has status IBV_WC_SUCCESS, and
 *   - the immediate data of each completion equals its position
 *     (0, 1, 2, ...).
 * If any of these checks fails, the request goes back to PENDING and is
 * re-posted through mca_oob_ud_recv_try. On success a MCA_OOB_UD_MSG_DATA_OK
 * message is sent to the peer over the port's listen QP.
 *
 * Eager requests (payload already copied by mca_oob_ud_recv_match_send)
 * skip the polling and the DATA_OK message.
 *
 * Returns ORTE_SUCCESS when the request was completed or a retry was
 * started, or the error from obtaining/posting the DATA_OK message (in
 * which case the request is not completed here).
 */
int mca_oob_ud_recv_complete (mca_oob_ud_req_t *recv_req)
{
    mca_oob_ud_msg_t *dataok;
    int i, j, rc = ORTE_SUCCESS;
    uint32_t expected;
    bool error = false, out_of_order = false;
#if defined(HAVE_VALGRIND)
    int iov_index;
#endif

    opal_output_verbose(5, orte_oob_base_framework.framework_output,
                         "%s oob:ud:recv_complete req = %p",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (void *) recv_req);

    if (false == recv_req->req_is_eager) {
        for (i = 0, expected = 0 ; i < recv_req->req_packet_count ; ) {
            struct ibv_wc wc[10];

            rc = ibv_poll_cq (recv_req->req_qp->ib_recv_cq,
                              (int) (sizeof (wc) / sizeof (wc[0])), wc);
            for (j = 0 ; j < rc ; ++j) {
                /* packets carry their index as immediate data */
                if (wc[j].imm_data != expected) {
                    out_of_order = true;
                }
                if (IBV_WC_SUCCESS != wc[j].status) {
                    error = true;
                }

                opal_output_verbose(5, orte_oob_base_framework.framework_output,
                                     "%s oob:ud:recv_complete wc status = %d. imm data = %u. len = %u",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wc[j].status,
                                     (unsigned int) wc[j].imm_data, (unsigned int) wc[j].byte_len);

                expected++;
            }

            /* empty CQ (0) or poll error (< 0): stop and judge what we have */
            if (rc <= 0) {
                break;
            }

            i += rc;
        }

        if (i != recv_req->req_packet_count || error || out_of_order) {
            /* retry */
            recv_req->state = MCA_OOB_UD_REQ_PENDING;

            opal_output_verbose(5, orte_oob_base_framework.framework_output,
                                 "%s oob:ud:recv_complete receive incomplete. error: %d, "
                                 "out_of_order: %d packets: %d/%d. rc = %d, errno = %d.",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), error, out_of_order, i,
                                 recv_req->req_packet_count, rc, errno);

            /* recv_try reports its own failures by completing and releasing
             * the request, so recv_req must not be used after this call */
            (void) mca_oob_ud_recv_try (recv_req);

            return ORTE_SUCCESS;
        }

        opal_output_verbose(10, orte_oob_base_framework.framework_output,
                             "%s oob:ud:recv_complete data received ok!",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

        /* send data ok and wait for ack */
        rc = mca_oob_ud_msg_get (recv_req->req_port, recv_req, &recv_req->req_port->listen_qp,
                                 recv_req->req_peer, NULL, &dataok);
        if (ORTE_SUCCESS != rc) {
            return rc;
        }

        dataok->hdr->msg_type = MCA_OOB_UD_MSG_DATA_OK;
        dataok->hdr->msg_lcl_ctx = recv_req->req_rem_ctx;

        rc = mca_oob_ud_msg_post_send (dataok);
        if (ORTE_SUCCESS != rc) {
            return rc;
        }
    }

#if defined(HAVE_VALGRIND)
    /* the data was written by the HCA, which valgrind cannot see; mark the
     * destination memory as defined for whichever storage type is in use */
    if (MCA_OOB_UD_REQ_IOV == recv_req->req_data_type) {
        for (iov_index = 0 ; iov_index < recv_req->req_data.iov.count ; ++iov_index) {
            VALGRIND_MAKE_MEM_DEFINED(recv_req->req_data.iov.uiov[iov_index].iov_base,
                                      recv_req->req_data.iov.uiov[iov_index].iov_len);
        }
    } else {
        VALGRIND_MAKE_MEM_DEFINED(recv_req->req_data.buf.p, recv_req->req_data.buf.size);
    }
#endif

    mca_oob_ud_req_complete (recv_req, rc);

    return ORTE_SUCCESS;
}
467
/*
 * Create a receive request for an incoming send-request header `msg_hdr`
 * that arrived from `peer` on `port`.
 *
 * The new request records the following from the header and port:
 *   - the sender's context;
 *   - the MTU to use (the smaller of the port MTU and the sender's);
 *   - the origin and target names;
 *   - the announced data length;
 *   - the channel and sequence number.
 * It takes a reference on `peer`, and its storage comes from
 * mca_oob_ud_recv_alloc.
 *
 * If the header says the payload follows it in the same message (eager
 * send), the payload is copied into the request (flat buffer or iovec
 * entries) and the request is marked COMPLETE. Otherwise it is marked
 * ACTIVE, since the data transfer still has to take place.
 *
 * NOTE(review): the eager copy trusts msg_data.req.data_len and the iovec
 * lengths chosen by mca_oob_ud_recv_alloc; nothing here checks them against
 * the size of the received message.
 *
 * On return *reqp is the new request, or NULL on failure.
 */
int mca_oob_ud_recv_match_send (mca_oob_ud_port_t *port, mca_oob_ud_peer_t *peer, mca_oob_ud_msg_hdr_t *msg_hdr,
                                mca_oob_ud_req_t **reqp)
{
    char *payload = NULL;
    mca_oob_ud_req_t *new_req;
    int ret, k;

    /* eager sends carry their data directly after the header */
    if (msg_hdr->msg_data.req.data_follows) {
        payload = (char *)(msg_hdr + 1);
    }

    *reqp = NULL;

    opal_output_verbose(10, orte_oob_base_framework.framework_output,
                         "%s oob:ud:recv_incoming_send matching incoming "
                         "send from peer %s with tag %d (data_follows = %d, data = %p, iovec_use = %d)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&msg_hdr->msg_origin), msg_hdr->msg_data.req.tag,
                         msg_hdr->msg_data.req.data_follows, (void *)payload, msg_hdr->msg_data.req.data_iovec_used);

    ret = mca_oob_ud_get_recv_req (msg_hdr->msg_origin, msg_hdr->msg_data.req.tag, &new_req,
                                   msg_hdr->msg_data.req.data_iovec_used);
    if (ORTE_SUCCESS != ret) {
        ORTE_ERROR_LOG(ret);
        return ret;
    }

    new_req->req_rem_ctx      = msg_hdr->msg_rem_ctx;
    new_req->req_port         = port;
    new_req->req_mtu          = min(port->mtu, msg_hdr->msg_data.req.mtu);
    new_req->req_origin       = msg_hdr->msg_origin;
    new_req->req_target       = msg_hdr->msg_target;
    new_req->req_rem_data_len = msg_hdr->msg_data.req.data_len;
    new_req->req_channel      = msg_hdr->msg_channel;
    new_req->req_seq_num      = msg_hdr->msg_seq_num;

    ret = mca_oob_ud_recv_alloc (new_req);
    if (ORTE_SUCCESS != ret) {
        ORTE_ERROR_LOG(ret);
        free (new_req->req_data.iov.uiov);
        OBJ_RELEASE(new_req);
        /* *reqp is still NULL */
        return ret;
    }

    new_req->req_peer = peer;
    OBJ_RETAIN(new_req->req_peer);

    if (NULL == payload) {
        new_req->state = MCA_OOB_UD_REQ_ACTIVE;
        opal_output_verbose(10, orte_oob_base_framework.framework_output,
                             "%s oob:ud:recv_incoming_send request still active",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        *reqp = new_req;
        return ret;
    }

    opal_output_verbose(10, orte_oob_base_framework.framework_output,
                         "%s oob:ud:recv_incoming_send send was eager",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    new_req->req_is_eager = true;

    if (!msg_hdr->msg_data.req.data_iovec_used) {
        memcpy(new_req->req_data.buf.p, payload, msg_hdr->msg_data.req.data_len);
    } else {
        /* scatter the contiguous payload across the iovec entries in order */
        for (k = 0 ; k < new_req->req_data.iov.count ; ++k) {
            memcpy (new_req->req_data.iov.uiov[k].iov_base, payload, new_req->req_data.iov.uiov[k].iov_len);
            payload += new_req->req_data.iov.uiov[k].iov_len;
        }
    }

    new_req->state = MCA_OOB_UD_REQ_COMPLETE;
    *reqp = new_req;

    return ret;
}
540