1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3  * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
4  *                         reserved.
5  *               2014      Mellanox Technologies, Inc.
6  *                         All rights reserved.
7  * Copyright (c) 2015-2016 Intel, Inc.  All rights reserved.
8  * $COPYRIGHT$
9  *
10  * Additional copyrights may follow
11  *
12  * $HEADER$
13  *
14  */
15 #include "oob_ud_send.h"
16 #include "orte/mca/errmgr/errmgr.h"
17 
/*
 * Completion callback installed on eager request messages by
 * mca_oob_ud_process_send_nb().
 *
 * Forwards the completion status @rc of @msg to mca_oob_ud_send_complete()
 * for the send request that the message carries in msg->req.
 */
static void mca_oob_ud_send_cb (mca_oob_ud_msg_t *msg, int rc)
{
    mca_oob_ud_req_t *owning_req = msg->req;

    (void) mca_oob_ud_send_complete (owning_req, rc);
}
22 
/*
 * Deliver a message whose destination is this process, bypassing the
 * network entirely.
 *
 * A receive request matching (ORTE_PROC_MY_NAME, msg->tag) is obtained from
 * mca_oob_ud_get_recv_req(), its receive storage is allocated by
 * mca_oob_ud_recv_alloc(), and the payload of @msg (either its iovec array
 * or its opal buffer) is copied into that request. The request is then
 * marked MCA_OOB_UD_REQ_COMPLETE and handed to
 * mca_oob_ud_event_queue_completed() so the receive callbacks can run.
 *
 * @param msg  RML send descriptor addressed to ORTE_PROC_MY_NAME.
 *
 * @return the payload size in bytes on success, or an ORTE error code.
 *         NOTE(review): mca_oob_ud_process_send_nb() returns this value
 *         as-is; confirm its callers treat a positive byte count as success.
 */
static int mca_oob_ud_send_self (orte_rml_send_t *msg)
{
    unsigned int srco, dsto;
    mca_oob_ud_req_t *req;
    int srci, dsti;
    int rc, size;

    MCA_OOB_UD_IOV_SIZE(msg, size);

    opal_output_verbose(10, orte_oob_base_framework.framework_output,
                         "%s mca_oob_ud_send_self: sending %d bytes to myself",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), size);

    rc = mca_oob_ud_get_recv_req (*ORTE_PROC_MY_NAME, msg->tag, &req, (msg->iov != NULL) ? true : false);
    if (ORTE_SUCCESS != rc) {
        return rc;
    }

    req->req_rem_data_len = size;
    req->req_is_eager     = true;

    rc = mca_oob_ud_recv_alloc (req);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        if (MCA_OOB_UD_REQ_IOV == req->req_data_type) {
            free (req->req_data.iov.uiov);
        }
        OBJ_RELEASE(req);
        return rc;
    }

    if (msg->iov != NULL) {
        req->req_data_type = MCA_OOB_UD_REQ_IOV;

        /* Scatter the source iovec (msg->iov, msg->count entries) into the
         * destination iovec (req->req_data.iov.uiov, req->req_data.iov.count
         * entries). srci/srco track the current source entry and offset,
         * dsti/dsto the current destination entry and offset. Each index is
         * bounded by the count of the array it indexes, and the bound is
         * tested before the first copy so an empty iovec is never
         * dereferenced. */
        srci = dsti = 0;
        srco = dsto = 0;

        while (srci < msg->count && dsti < req->req_data.iov.count) {
            size_t copy = min(msg->iov[srci].iov_len - srco,
                              req->req_data.iov.uiov[dsti].iov_len - dsto);

            memmove ((unsigned char *) req->req_data.iov.uiov[dsti].iov_base + dsto,
                     (unsigned char *) msg->iov[srci].iov_base + srco, copy);

            srco += copy;
            if (srco == msg->iov[srci].iov_len) {
                srci++;
                srco = 0;
            }

            dsto += copy;
            if (dsto == req->req_data.iov.uiov[dsti].iov_len) {
                dsti++;
                dsto = 0;
            }
        }
    } else {
        opal_buffer_t *buffer;

        req->req_data_type = MCA_OOB_UD_REQ_BUF;

        buffer = OBJ_NEW(opal_buffer_t);

        if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(buffer, msg->buffer))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(buffer);
            /* do not leak the receive request (same as the
             * mca_oob_ud_recv_alloc() failure path above) */
            OBJ_RELEASE(req);
            return rc;
        }

        /* NOTE(review): if mca_oob_ud_recv_alloc() allocated
         * req->req_data.buf.p for the BUF case, unload() overwrites that
         * pointer here; confirm whether that allocation is leaked. */
        if (OPAL_SUCCESS != (rc = opal_dss.unload(buffer, (void **)&req->req_data.buf.p, &req->req_data.buf.size)))
        {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(buffer);
            free(req->req_data.buf.p);
            /* clear the pointer so releasing req cannot free it a second time */
            req->req_data.buf.p = NULL;
            OBJ_RELEASE(req);
            return rc;
        }
        OBJ_RELEASE(buffer);
    }

    req->state = MCA_OOB_UD_REQ_COMPLETE;

    opal_output_verbose(10, orte_oob_base_framework.framework_output,
                         "%s mca_oob_ud_send_self: complete. calling callbacks",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* queue up recv callback */
    mca_oob_ud_event_queue_completed (req);

    req->rml_msg->status = ORTE_SUCCESS;

    return size;
}
112 
mca_oob_ud_process_send_nb(int fd,short args,void * cbdata)113 int mca_oob_ud_process_send_nb(int fd, short args, void *cbdata)
114 {
115     mca_oob_ud_msg_op_t *op = (mca_oob_ud_msg_op_t*)cbdata;
116 
117     orte_process_name_t hop;
118     mca_oob_ud_peer_t *peer;
119     mca_oob_ud_port_t *port;
120     mca_oob_ud_msg_t  *req_msg;
121     mca_oob_ud_req_t  *send_req;
122     bool send_eager = false;
123     char *pack_ptr;
124     int rc, size, i;
125 
126     if (OPAL_EQUAL == orte_util_compare_name_fields
127         (ORTE_NS_CMP_ALL, ORTE_PROC_MY_NAME, &op->msg->dst)) {
128         return mca_oob_ud_send_self (op->msg);
129     }
130 
131     /* if we have a route to this peer, then we can reach it */
132     hop = orte_routed.get_route(NULL, &op->msg->dst);
133     if (ORTE_JOBID_INVALID == hop.jobid ||
134         ORTE_VPID_INVALID == hop.vpid) {
135         ORTE_ERROR_LOG(ORTE_ERR_UNREACH);
136         return ORTE_ERR_UNREACH;
137     }
138 
139     rc = mca_oob_ud_peer_lookup (&hop, &peer);
140     if(ORTE_SUCCESS != rc || NULL == peer) {
141         ORTE_ERROR_LOG((NULL == peer) ? ORTE_ERR_UNREACH : rc);
142         return (NULL == peer) ? ORTE_ERR_UNREACH : rc;
143     }
144 
145     opal_output_verbose(2, orte_oob_base_framework.framework_output,
146                         "%s oob:ud:send_nb to pear %s via hop %s",
147                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
148                         ORTE_NAME_PRINT(&op->msg->dst), ORTE_NAME_PRINT(&hop));
149 
150     /* NTH: TODO -- get a random port? */
151     port = (mca_oob_ud_port_t *) opal_list_get_first (&((mca_oob_ud_device_t *)peer->peer_context)->ports);
152 
153     send_req = OBJ_NEW(mca_oob_ud_req_t);
154     if (!send_req) {
155         ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
156         return ORTE_ERR_OUT_OF_RESOURCE;
157     }
158 
159     /* fill in request */
160     send_req->req_target = op->msg->dst;
161     send_req->req_origin = op->msg->origin;
162     send_req->req_tag    = op->msg->tag;
163     send_req->req_seq_num  = op->msg->seq_num;
164 
165     if (op->msg->data != NULL) {
166         size = op->msg->count;
167 
168         send_req->req_data_type = MCA_OOB_UD_REQ_TR;
169 
170         send_req->req_data.buf.p = (char *)calloc(size, sizeof(char));
171         memcpy(send_req->req_data.buf.p, op->msg->data, op->msg->count);
172         send_req->req_data.buf.size = op->msg->count;
173     } else {
174         MCA_OOB_UD_IOV_SIZE(op->msg, size);
175 
176         if (op->msg->iov != NULL) {
177             send_req->req_data_type = MCA_OOB_UD_REQ_IOV;
178             send_req->req_data.iov.uiov   = op->msg->iov;
179             send_req->req_data.iov.count  = op->msg->count;
180         } else {
181             send_req->req_data_type = MCA_OOB_UD_REQ_BUF;
182 
183             opal_buffer_t *buffer;
184             buffer = OBJ_NEW(opal_buffer_t);
185 
186             if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(buffer, op->msg->buffer))) {
187                 ORTE_ERROR_LOG(rc);
188                 OBJ_RELEASE(buffer);
189                 return rc;
190             }
191 
192             if (OPAL_SUCCESS != (rc = opal_dss.unload(buffer, (void **)&send_req->req_data.buf.p, &send_req->req_data.buf.size)))
193             {
194                 ORTE_ERROR_LOG(rc);
195                 OBJ_RELEASE(buffer);
196                 free(send_req->req_data.buf.p);
197                 return rc;
198             }
199             OBJ_RELEASE(buffer);
200         }
201     }
202     send_req->rml_msg = op->msg;
203     send_req->req_cbdata = op->msg->cbdata;
204     send_req->req_peer   = peer;
205     send_req->req_mtu    = port->mtu;
206     send_req->req_port   = port;
207     send_req->req_rc     = 0;
208 
209     send_req->state      = MCA_OOB_UD_REQ_PENDING;
210     send_req->type       = MCA_OOB_UD_REQ_SEND;
211 
212     OBJ_RETAIN(peer);
213 
214     if (size + sizeof (mca_oob_ud_msg_hdr_t) <= (unsigned int)port->mtu) {
215         send_eager = true;
216     }
217 
218     rc = mca_oob_ud_msg_get (port, send_req, &port->listen_qp, peer, false, &req_msg);
219     if (ORTE_SUCCESS != rc) {
220         OBJ_RELEASE (send_req);
221         return rc;
222     }
223 
224     /* fill in message header */
225     req_msg->hdr->msg_type     = MCA_OOB_UD_MSG_REQUEST;
226     req_msg->hdr->msg_rem_ctx  = send_req;
227 
228     req_msg->hdr->msg_origin   = op->msg->origin;
229     req_msg->hdr->msg_target   = op->msg->dst;
230     req_msg->hdr->msg_seq_num  = op->msg->seq_num;
231 
232     req_msg->hdr->msg_data.req.data_len = size;
233     req_msg->hdr->msg_data.req.mtu      = port->mtu;
234     req_msg->hdr->msg_data.req.tag      = op->msg->tag;
235 
236     if (MCA_OOB_UD_REQ_IOV == send_req->req_data_type) {
237         opal_output_verbose(10, orte_oob_base_framework.framework_output,
238                              "%s-%s send_nb: tag %d size %lu. msg: %p. peer = %p. req = %p."
239                              "count = %d. uiov = %p.\n",
240                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
241                              ORTE_NAME_PRINT(&op->msg->dst),
242                              op->msg->tag, (unsigned long)size,
243                              (void *) req_msg,
244                              (void *) peer, (void *) send_req,
245                               send_req->req_data.iov.count, (void *) send_req->req_data.iov.uiov);
246     } else {
247         opal_output_verbose(10, orte_oob_base_framework.framework_output,
248                              "%s-%s send_nb: tag %d size %lu. msg: %p. peer = %p. req = %p."
249                              "buffer = %p.\n",
250                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
251                              ORTE_NAME_PRINT(&op->msg->dst),
252                              op->msg->tag, (unsigned long)size,
253                              (void *) req_msg,
254                              (void *) peer, (void *) send_req, (void *) send_req->req_data.buf.p);
255     }
256 
257     if (!send_eager) {
258         mca_oob_ud_req_append_to_list (send_req, &mca_oob_ud_component.ud_active_sends);
259 
260         /* send request */
261         return mca_oob_ud_msg_post_send (req_msg);
262     }
263 
264     pack_ptr = (char *)(req_msg->hdr + 1);
265 
266     if (op->msg->iov != NULL) {
267         for (i = 0 ; i < op->msg->count ; ++i) {
268             memcpy (pack_ptr, op->msg->iov[i].iov_base, op->msg->iov[i].iov_len);
269             pack_ptr += op->msg->iov[i].iov_len;
270         }
271     } else {
272         memcpy(pack_ptr, send_req->req_data.buf.p, send_req->req_data.buf.size);
273     }
274 
275     send_req->req_list = NULL;
276 
277     req_msg->hdr->msg_data.req.data_follows = true;
278 
279     req_msg->cbfunc = mca_oob_ud_send_cb;
280     req_msg->req    = send_req;
281 
282     do {
283         /* send request */
284         rc = mca_oob_ud_msg_post_send (req_msg);
285         if (ORTE_SUCCESS != rc) {
286             ORTE_ERROR_LOG(rc);
287             break;
288         }
289     } while (0);
290 
291     return rc;
292 }
293 
mca_oob_ud_send_try_to(int fd,short event,void * ctx)294 static void mca_oob_ud_send_try_to (int fd, short event, void *ctx)
295 {
296     OPAL_THREAD_LOCK(&mca_oob_ud_component.ud_match_lock);
297     (void) mca_oob_ud_send_try ((mca_oob_ud_req_t *) ctx);
298     OPAL_THREAD_UNLOCK(&mca_oob_ud_component.ud_match_lock);
299 }
300 
/*
 * Finish work request @wr_index of @send_req once its scatter/gather entries
 * [sge_first, sge_end) have been filled. This attaches the entries to the
 * work request and makes it a solicited IBV_WR_SEND_WITH_IMM whose immediate
 * value is its index (sequence number). It also targets req_rem_qpn and links
 * the request to the next one if there is one.
 * Shared by the iovec and buffer posting paths of mca_oob_ud_send_try().
 */
static void mca_oob_ud_send_try_finish_wr (mca_oob_ud_req_t *send_req, int wr_index,
                                           int wr_count, int sge_first, int sge_end)
{
    struct ibv_send_wr *wr = send_req->req_wr.send + wr_index;

    mca_oob_ud_fill_send_wr(wr, send_req->req_sge + sge_first,
                            sge_end - sge_first, send_req->req_peer);

    /* we don't care about completions for data  */
    wr->send_flags       = IBV_SEND_SOLICITED;

    /* sequence number */
    wr->imm_data         = wr_index;
    wr->wr.ud.remote_qpn = send_req->req_rem_qpn;
    wr->opcode           = IBV_WR_SEND_WITH_IMM;

    /* the last request keeps the zeroed next pointer from calloc() */
    if (wr_index + 1 < wr_count) {
        wr->next = wr + 1;
    }
}

/*
 * Post the payload of @send_req on its data QP, then post a
 * MCA_OOB_UD_MSG_COMPLETE message from that same QP. The message goes to
 * the peer's listen QP but must originate from the data QP so that it is
 * sent last.
 *
 * The data QP is acquired on first use. The payload (iovec or buffer) is
 * registered, split into work requests of at most req_mtu bytes each, and
 * posted as one linked chain. If any step yields
 * ORTE_ERR_TEMP_OUT_OF_RESOURCE, a 500 ms timer is armed to retry through
 * mca_oob_ud_send_try_to() and ORTE_SUCCESS is returned. Any other error
 * completes the request with that error via mca_oob_ud_send_complete().
 * On success the request state becomes MCA_OOB_UD_REQ_ACTIVE.
 *
 * Reads send_req->req_peer, req_port, req_mtu, req_rem_qpn and req_rem_ctx.
 *
 * @return ORTE_SUCCESS, or the error passed to mca_oob_ud_send_complete().
 */
int mca_oob_ud_send_try (mca_oob_ud_req_t *send_req)
{
    int wr_index, wr_count, sge_count, sge_index, iov_index;
    unsigned int iov_left, iov_offset, packet_size;
    const unsigned int mtu = send_req->req_mtu;
    const struct timeval aquire_timeout = {0, 500000};
    mca_oob_ud_msg_t *com_msg;
    int data_len;
    int rc = ORTE_SUCCESS;

    opal_output_verbose(10, orte_oob_base_framework.framework_output,
                         "%s oob:ud:send_try sending to %s, tag = %d, "
                         "req = %p",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&send_req->req_peer->peer_name),
                         send_req->req_tag, (void *) send_req);

    do {
        if (NULL == send_req->req_qp) {
            rc = mca_oob_ud_qp_data_aquire (send_req->req_port, &send_req->req_qp);
            if (ORTE_SUCCESS != rc) {
                break;
            }
        }

        (void) mca_oob_ud_qp_purge (send_req->req_qp);

        rc = mca_oob_ud_msg_get (send_req->req_port, send_req, send_req->req_qp, send_req->req_peer, false,
                                 &com_msg);
        if (ORTE_SUCCESS != rc) {
            break;
        }

        /* NOTE(review): the error breaks below leave com_msg acquired but
         * never posted; confirm whether it must be returned to its pool
         * before the request is completed or retried. */

        if (MCA_OOB_UD_REQ_IOV == send_req->req_data_type) {
            if (NULL == send_req->req_data.iov.mr) {
                /* allocate space for memory registers */
                send_req->req_data.iov.mr = (struct ibv_mr **) calloc (send_req->req_data.iov.count, sizeof (struct ibv_mr *));
                if (NULL == send_req->req_data.iov.mr) {
                    rc = ORTE_ERR_OUT_OF_RESOURCE;
                    ORTE_ERROR_LOG(rc);
                    break;
                }
            }

            rc = mca_oob_ud_register_iov (send_req->req_data.iov.uiov, send_req->req_data.iov.count,
                                          send_req->req_data.iov.mr, send_req->req_port->device->ib_pd,
                                          mtu, &sge_count, &wr_count, &data_len);

            if (ORTE_SUCCESS != rc) {
                break;
            }
        } else {
            data_len = send_req->req_data.buf.size;
            rc = mca_oob_ud_register_buf(send_req->req_data.buf.p, send_req->req_data.buf.size,
                                         &send_req->req_data.buf.mr, send_req->req_port->device->ib_pd,
                                         mtu, &sge_count, &wr_count);

            if (ORTE_SUCCESS != rc) {
                break;
            }
        }

        /* One work request per mtu-sized chunk of payload. NOTE(review):
         * this overrides the wr_count reported by the registration helper
         * above; confirm the two always agree. */
        wr_count = (data_len + mtu - 1) / mtu;

        if (MCA_OOB_UD_REQ_IOV == send_req->req_data_type) {
            opal_output_verbose(5, orte_oob_base_framework.framework_output,
                                 "%s oob:ud:send_try sending %d bytes in %d "
                                 "work requests, %d sges. uiov = %p.", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), data_len,
                                 wr_count, sge_count, (void *) send_req->req_data.iov.uiov);
        } else {
            opal_output_verbose(5, orte_oob_base_framework.framework_output,
                                 "%s oob:ud:send_try sending %d bytes in %d "
                                 "work requests, %d sges. buf = %p", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), data_len,
                                 wr_count, sge_count, (void *) send_req->req_data.buf.p);
        }

        /* work request / SGE arrays survive retries, so allocate them once */
        if (wr_count && NULL == send_req->req_wr.send) {
            send_req->req_wr.send = (struct ibv_send_wr *) calloc (wr_count, sizeof (struct ibv_send_wr));
            if (NULL == send_req->req_wr.send) {
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                ORTE_ERROR_LOG(rc);
                break;
            }
        }

        if (wr_count && NULL == send_req->req_sge) {
            send_req->req_sge = (struct ibv_sge *) calloc (sge_count, sizeof (struct ibv_sge));

            if (NULL == send_req->req_sge) {
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                ORTE_ERROR_LOG(rc);
                break;
            }
        }

        if (MCA_OOB_UD_REQ_IOV == send_req->req_data_type) {
            opal_output_verbose(10, orte_oob_base_framework.framework_output,
                 "%s oob:ud:send_try posting message using iovec",
                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

            /* never read uiov[0] of an empty iovec */
            iov_left   = (0 < send_req->req_data.iov.count) ?
                         send_req->req_data.iov.uiov[0].iov_len : 0;
            iov_offset = 0;
            iov_index  = 0;

            for (wr_index = 0, sge_index = 0 ; wr_index < wr_count ; ++wr_index) {
                int sge_first = sge_index;

                packet_size = 0;

                /* gather up to mtu bytes, possibly spanning several iovec
                 * entries, into consecutive SGEs */
                do {
                    int to_send = min (iov_left, mtu - packet_size);

                    mca_oob_ud_fill_sge(send_req->req_sge + sge_index++,
                                        (char *)send_req->req_data.iov.uiov[iov_index].iov_base + iov_offset,
                                        to_send, send_req->req_data.iov.mr[iov_index]->lkey);

                    iov_offset  += to_send;
                    iov_left    -= to_send;
                    packet_size += to_send;

                    if (0 == iov_left) {
                        iov_index++;
                        iov_offset = 0;

                        if (iov_index < send_req->req_data.iov.count) {
                            iov_left = send_req->req_data.iov.uiov[iov_index].iov_len;
                        }
                    }
                } while ((packet_size < mtu) && (iov_left > 0));

                mca_oob_ud_send_try_finish_wr (send_req, wr_index, wr_count,
                                               sge_first, sge_index);
            }
        } else {
            /* data is in a contiguous buffer */
            unsigned int buffer_offset = 0;
            unsigned int buffer_size = send_req->req_data.buf.size;

            opal_output_verbose(10, orte_oob_base_framework.framework_output,
                                 "%s oob:ud:send_try posting message using buffer",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

            for (wr_index = 0, sge_index = 0 ; wr_index < wr_count ; ++wr_index) {
                int sge_first = sge_index;

                packet_size = 0;

                do {
                    int to_send = min (buffer_size, mtu - packet_size);

                    mca_oob_ud_fill_sge(send_req->req_sge + sge_index++,
                                        (char *)send_req->req_data.buf.p + buffer_offset,
                                        to_send, send_req->req_data.buf.mr->lkey);

                    buffer_offset  += to_send;
                    buffer_size    -= to_send;
                    packet_size += to_send;
                } while ((packet_size < mtu) && (buffer_size > 0));

                mca_oob_ud_send_try_finish_wr (send_req, wr_index, wr_count,
                                               sge_first, sge_index);

                opal_output_verbose(10, orte_oob_base_framework.framework_output,
                                     "%s oob:ud:send_try imm_data = %d",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wr_index);
            }
        }

        /* send data */
        rc = mca_oob_ud_qp_post_send (send_req->req_qp, send_req->req_wr.send, 0);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            break;
        }

        opal_output_verbose(10, orte_oob_base_framework.framework_output,
                             "%s oob:ud:send_try posting completion message",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

        /* Fill in completion message. This message will go to the peers listen QP but
           must originate from our data qp to ensure that it is sent last. */
        com_msg->hdr->msg_type    = MCA_OOB_UD_MSG_COMPLETE;
        com_msg->hdr->msg_lcl_ctx = send_req->req_rem_ctx;
        com_msg->hdr->msg_rem_ctx = send_req;

        /* send message header */
        rc = mca_oob_ud_msg_post_send (com_msg);

        /* post_send already returned the message */
        com_msg = NULL;
    } while (0);

    if (ORTE_ERR_TEMP_OUT_OF_RESOURCE == rc) {
        /* set timer to retry post */
        mca_oob_ud_req_timer_set (send_req, &aquire_timeout, 1, mca_oob_ud_send_try_to);
        rc = ORTE_SUCCESS;
    }

    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        /* damn */
        return mca_oob_ud_send_complete (send_req, rc);
    }

    send_req->state = MCA_OOB_UD_REQ_ACTIVE;

    return rc;
}
537 
/*
 * Finish a send request: pass it and its final status to
 * mca_oob_ud_req_complete(), then hand the same status back so callers can
 * write "return mca_oob_ud_send_complete (req, rc);".
 */
int mca_oob_ud_send_complete (mca_oob_ud_req_t *send_req, int rc)
{
    const int status = rc;

    mca_oob_ud_req_complete (send_req, status);

    return status;
}
544