/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2010 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */
#include "mpid_nem_impl.h"
#include "mpid_nem_datatypes.h"

MPIU_SUPPRESS_OSX_HAS_NO_SYMBOLS_WARNING;

#if defined(HAVE_VMSPLICE)

/* _GNU_SOURCE must be defined before the system headers below so that
   <fcntl.h> declares vmsplice() and SPLICE_F_NONBLOCK */
#define _GNU_SOURCE
#include <fcntl.h>
#include <sys/uio.h>

#include "mpid_nem_impl.h"
#include "mpid_nem_datatypes.h"

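/* Overview: this module implements the nemesis large message transfer (LMT)
   protocol over a named pipe (FIFO) plus vmsplice(2).  The sender creates a
   per-VC FIFO and advertises its path in the RTS cookie; once the receiver
   has opened the FIFO for reading it sends a CTS, after which the sender
   vmsplices its IOV into the pipe and the receiver drains it with readv().
   Transfers that cannot finish in one call are parked on a list and retried
   from the progress routine below. */
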
/* These are for maintaining a linked-list of outstanding requests on which we
   can make progress. */
struct lmt_vmsplice_node {
    struct lmt_vmsplice_node *next;
    int pipe_fd;
    MPID_Request *req;
};

/* MT: this stack is not thread-safe */
static struct lmt_vmsplice_node *outstanding_head = NULL;

/* Returns true if the IOV has been completely xferred, false otherwise.

   iov_count and iov_offset are pointers so that this function can manipulate
   them */
static int adjust_partially_xferred_iov(MPID_IOV iov[], int *iov_offset,
                                        int *iov_count, int bytes_xferred)
{
    int i;
    int complete = 1;

    for (i = *iov_offset; i < (*iov_offset + *iov_count); ++i)
    {
        if (bytes_xferred < iov[i].MPID_IOV_LEN)
        {
            iov[i].MPID_IOV_BUF = (char *)iov[i].MPID_IOV_BUF + bytes_xferred;
            iov[i].MPID_IOV_LEN -= bytes_xferred;
            /* iov_count should be equal to the number of iov's remaining */
            *iov_count -= (i - *iov_offset);
            *iov_offset = i;
            complete = 0;
            break;
        }
        bytes_xferred -= iov[i].MPID_IOV_LEN;
    }

    return complete;
}

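/* Invokes the request's OnDataAvail handler (if any) to reload the IOV with
   the next chunk of data; if there is no handler, the transfer is finished
   and the request is completed.  *complete is set accordingly. */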
static inline int check_req_complete(MPIDI_VC_t *vc, MPID_Request *req, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
    reqFn = req->dev.OnDataAvail;
    if (reqFn) {
        *complete = 0;

        /* XXX DJG FIXME this feels like a hack */
        req->dev.iov_count = MPID_IOV_LIMIT;
        req->dev.iov_offset = 0;

        mpi_errno = reqFn(vc, req, complete);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    else {
        *complete = 1;
        MPIDI_CH3U_Request_complete(req);
    }

fn_fail:
    return mpi_errno;
}

/* fills in req->dev.iov{,_offset,_count} based on the datatype info in the
   request, creating a segment if necessary */
static int populate_iov_from_req(MPID_Request *req)
{
    int mpi_errno = MPI_SUCCESS;
    int dt_contig;
    MPI_Aint dt_true_lb;
    MPIDI_msg_sz_t data_sz;
    MPID_Datatype * dt_ptr;

    /* find out contig/noncontig, size, and lb for the datatype */
    MPIDI_Datatype_get_info(req->dev.user_count, req->dev.datatype,
                            dt_contig, data_sz, dt_ptr, dt_true_lb);

    if (dt_contig) {
        /* handle the iov creation ourselves */
        req->dev.iov[0].MPID_IOV_BUF = (char *)req->dev.user_buf + dt_true_lb;
        req->dev.iov[0].MPID_IOV_LEN = data_sz;
        req->dev.iov_count = 1;
    }
    else {
        /* use the segment routines to handle the iovec creation */
        MPIU_Assert(req->dev.segment_ptr == NULL);

        req->dev.iov_count = MPID_IOV_LIMIT;
        req->dev.iov_offset = 0;

        /* XXX DJG FIXME where is this segment freed? */
        req->dev.segment_ptr = MPID_Segment_alloc();
        MPIU_ERR_CHKANDJUMP1((req->dev.segment_ptr == NULL), mpi_errno,
                             MPI_ERR_OTHER, "**nomem",
                             "**nomem %s", "MPID_Segment_alloc");
        MPID_Segment_init(req->dev.user_buf, req->dev.user_count,
                          req->dev.datatype, req->dev.segment_ptr, 0);
        req->dev.segment_first = 0;
        req->dev.segment_size = data_sz;

        /* FIXME we should write our own function that isn't dependent on
           the in-request iov array.  This will let us use IOVs that are
           larger than MPID_IOV_LIMIT. */
        mpi_errno = MPIDI_CH3U_Request_load_send_iov(req, &req->dev.iov[0],
                                                     &req->dev.iov_count);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }

fn_fail:
    return mpi_errno;
}

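/* Splices as much of the current IOV into the pipe as the kernel will accept
   without blocking.  If the whole IOV went through, check_req_complete() is
   used to either reload the IOV with more data or finish the request, in
   which case the pipe is closed. */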
static int do_vmsplice(MPID_Request *sreq, int pipe_fd, MPID_IOV iov[],
                       int *iov_offset, int *iov_count, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    ssize_t err;

#if 1
    err = vmsplice(pipe_fd, &iov[*iov_offset], *iov_count, SPLICE_F_NONBLOCK);
#else
    err = writev(pipe_fd, &iov[*iov_offset], *iov_count);
#endif

    if (err < 0) {
        if (errno == EAGAIN) goto fn_exit;
        MPIU_ERR_CHKANDJUMP2(errno != EAGAIN, mpi_errno, MPI_ERR_OTHER, "**vmsplice",
                             "**vmsplice %d %s", errno, MPIU_Strerror(errno));
    }

    *complete = adjust_partially_xferred_iov(iov, iov_offset, iov_count, err);
    if (*complete) {
        /* look for additional data to send and reload IOV if there is more */
        mpi_errno = check_req_complete(sreq->ch.vc, sreq, complete);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);

        if (*complete) {
            err = close(pipe_fd);
            MPIU_ERR_CHKANDJUMP(err < 0, mpi_errno, MPI_ERR_OTHER, "**close");
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
        }
    }

fn_fail:
fn_exit:
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_lmt_vmsplice_initiate_lmt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
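/* Sender-side entry point: on the first LMT for this VC, create a uniquely
   named FIFO and remember its path in vc_ch->lmt_copy_buf_handle, then send
   that path to the receiver in the RTS cookie.  No data moves until the CTS
   arrives. */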
int MPID_nem_lmt_vmsplice_initiate_lmt(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPID_Request *sreq)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_nem_pkt_lmt_rts_t * const rts_pkt = (MPID_nem_pkt_lmt_rts_t *)pkt;
    MPIDI_CH3I_VC *vc_ch = VC_CH(vc);
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_INITIATE_LMT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_INITIATE_LMT);

    /* re-use the same pipe per pair, per sender */
    if (vc_ch->lmt_copy_buf_handle == NULL) {
        int err;
        char *pipe_name;

        pipe_name = tempnam(NULL, "lmt_");
        MPIU_ERR_CHKANDJUMP2(!pipe_name, mpi_errno, MPI_ERR_OTHER, "**tempnam",
                             "**tempnam %d %s", errno, MPIU_Strerror(errno));

        vc_ch->lmt_copy_buf_handle = MPIU_Strdup(pipe_name);
        /* XXX DJG hack */
#undef free
        free(pipe_name);

        err = mkfifo(vc_ch->lmt_copy_buf_handle, 0660);
        MPIU_ERR_CHKANDJUMP2(err < 0, mpi_errno, MPI_ERR_OTHER, "**mkfifo",
                             "**mkfifo %d %s", errno, MPIU_Strerror(errno));
    }

    /* can't start sending data yet, need full RTS/CTS handshake */

    MPID_nem_lmt_send_RTS(vc, rts_pkt, vc_ch->lmt_copy_buf_handle,
                          strlen(vc_ch->lmt_copy_buf_handle)+1);

fn_fail:
fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_INITIATE_LMT);
    return mpi_errno;
}

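/* Receiver-side counterpart of do_vmsplice(): drains whatever is currently
   available in the pipe into the IOV with readv().  On a complete IOV it
   reloads more data via check_req_complete(), or closes the pipe when the
   request is done. */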
static int do_readv(MPID_Request *rreq, int pipe_fd, MPID_IOV iov[],
                    int *iov_offset, int *iov_count, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    ssize_t nread;

    nread = readv(pipe_fd, &iov[*iov_offset], *iov_count);

    if (nread < 0) {
        if (errno == EAGAIN) goto fn_exit;
        MPIU_ERR_CHKANDJUMP2(errno != EAGAIN, mpi_errno, MPI_ERR_OTHER, "**read",
                             "**readv %d %s", errno, MPIU_Strerror(errno));
    }

    *complete = adjust_partially_xferred_iov(iov, iov_offset, iov_count, nread);
    if (*complete) {
        /* look for additional data to receive and reload the IOV if there is more */
        mpi_errno = check_req_complete(rreq->ch.vc, rreq, complete);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);

        if (*complete) {
            nread = close(pipe_fd);
            MPIU_ERR_CHKANDJUMP(nread < 0, mpi_errno, MPI_ERR_OTHER, "**close");
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
        }
    }

fn_fail:
fn_exit:
    return mpi_errno;
}

/* This function is called when an RTS message comes in. */
#undef FUNCNAME
#define FUNCNAME MPID_nem_lmt_vmsplice_start_recv
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
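/* Opens the sender's FIFO (named in the RTS cookie) for non-blocking reads,
   replies with a CTS, and pulls the first flight of data.  If the transfer
   does not finish in that first readv, the request is queued on
   outstanding_head for the progress engine. */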
int MPID_nem_lmt_vmsplice_start_recv(MPIDI_VC_t *vc, MPID_Request *rreq, MPID_IOV s_cookie)
{
    int mpi_errno = MPI_SUCCESS;
    int complete = 0;
    struct lmt_vmsplice_node *node = NULL;
    MPIDI_CH3I_VC *vc_ch = VC_CH(vc);
    int pipe_fd;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_RECV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_RECV);

    if (vc_ch->lmt_recv_copy_buf_handle == NULL) {
        MPIU_Assert(s_cookie.MPID_IOV_BUF != NULL);
        vc_ch->lmt_recv_copy_buf_handle = MPIU_Strdup(s_cookie.MPID_IOV_BUF);
    }

    /* XXX DJG FIXME in a real version we would want to cache the fd on the vc
       so that we don't have two open's on the critical path every time. */
    pipe_fd = open(vc_ch->lmt_recv_copy_buf_handle, O_NONBLOCK|O_RDONLY);
    MPIU_ERR_CHKANDJUMP1(pipe_fd < 0, mpi_errno, MPI_ERR_OTHER, "**open",
                         "**open %s", MPIU_Strerror(errno));

    MPID_nem_lmt_send_CTS(vc, rreq, NULL, 0);

    mpi_errno = populate_iov_from_req(rreq);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    mpi_errno = do_readv(rreq, pipe_fd, rreq->dev.iov, &rreq->dev.iov_offset,
                         &rreq->dev.iov_count, &complete);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    /* push request if not complete for progress checks later */
    if (!complete) {
        node = MPIU_Malloc(sizeof(struct lmt_vmsplice_node));
        node->pipe_fd = pipe_fd;
        node->req = rreq;
        node->next = outstanding_head;
        outstanding_head = node;
        ++MPID_nem_local_lmt_pending;
    }

fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_RECV);
    return mpi_errno;
fn_fail:
    goto fn_exit;
}

/* XXX DJG FIXME at some point this should poll, much like the newtcp module.
   But then we have that whole pollfd array to manage, which we don't really
   need until this proof-of-concept proves itself. */
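/* Walks the list of outstanding sends and receives, making one non-blocking
   pass over each; nodes whose requests complete are unlinked and freed, and
   MPID_nem_local_lmt_pending is decremented accordingly. */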
int MPID_nem_lmt_vmsplice_progress(void)
{
    int mpi_errno = MPI_SUCCESS;
    struct lmt_vmsplice_node *prev = NULL;
    struct lmt_vmsplice_node *free_me = NULL;
    struct lmt_vmsplice_node *cur = outstanding_head;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_PROGRESS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_PROGRESS);

    while (cur) {
        int complete = 0;

        switch (MPIDI_Request_get_type(cur->req)) {
            case MPIDI_REQUEST_TYPE_RECV:
                mpi_errno = do_readv(cur->req, cur->pipe_fd, cur->req->dev.iov,
                                     &cur->req->dev.iov_offset,
                                     &cur->req->dev.iov_count, &complete);
                /* FIXME: set the error status of the req and complete it, rather than POP */
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                break;
            case MPIDI_REQUEST_TYPE_SEND:
                mpi_errno = do_vmsplice(cur->req, cur->pipe_fd, cur->req->dev.iov,
                                        &cur->req->dev.iov_offset,
                                        &cur->req->dev.iov_count, &complete);
                /* FIXME: set the error status of the req and complete it, rather than POP */
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                break;
            default:
                MPIU_ERR_INTERNALANDJUMP(mpi_errno, "unexpected request type");
                break;
        }

        if (complete) {
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");

            /* unlink and free the node; prev stays put so it still precedes cur */
            if (cur == outstanding_head)
                outstanding_head = cur->next;
            else
                prev->next = cur->next;
            free_me = cur;
            cur = cur->next;
            MPIU_Free(free_me);
            --MPID_nem_local_lmt_pending;
            continue;
        }

        prev = cur;
        cur = cur->next;
    }

fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_PROGRESS);
    return mpi_errno;
fn_fail:
    goto fn_exit;
}

/* called when a CTS message is received */
#undef FUNCNAME
#define FUNCNAME MPID_nem_lmt_vmsplice_start_send
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
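/* The receiver has already opened the FIFO for reading (signalled by the
   CTS), so it is now safe to open it for writing without hitting ENXIO.
   Loads the send IOV and pushes the first flight; anything left over is
   queued for the progress engine. */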
int MPID_nem_lmt_vmsplice_start_send(MPIDI_VC_t *vc, MPID_Request *sreq, MPID_IOV r_cookie)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_SEND);
    int pipe_fd;
    int complete = 0;
    struct lmt_vmsplice_node *node = NULL;
    MPIDI_CH3I_VC *vc_ch = VC_CH(vc);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_SEND);

    /* Must do this after the other side has opened for reading, otherwise we
       will error out with ENXIO.  This will be indicated by the receipt of a
       CTS message. */
    pipe_fd = open(vc_ch->lmt_copy_buf_handle, O_NONBLOCK|O_WRONLY);
    MPIU_ERR_CHKANDJUMP1(pipe_fd < 0, mpi_errno, MPI_ERR_OTHER, "**open",
                         "**open %s", MPIU_Strerror(errno));

    mpi_errno = populate_iov_from_req(sreq);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    /* send the first flight */
    sreq->ch.vc = vc; /* XXX DJG is this already assigned? */
    mpi_errno = do_vmsplice(sreq, pipe_fd, sreq->dev.iov,
                            &sreq->dev.iov_offset, &sreq->dev.iov_count, &complete);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    if (!complete) {
        /* push for later progress */
        node = MPIU_Malloc(sizeof(struct lmt_vmsplice_node));
        node->pipe_fd = pipe_fd;
        node->req = sreq;
        node->next = outstanding_head;
        outstanding_head = node;
        ++MPID_nem_local_lmt_pending;
    }

fn_fail:
fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_SEND);
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_MPID_nem_lmt_vmsplice_vc_terminated
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIDI_CH3_MPID_nem_lmt_vmsplice_vc_terminated(MPIDI_VC_t *vc)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_MPID_NEM_LMT_VMSPLICE_VC_TERMINATED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_MPID_NEM_LMT_VMSPLICE_VC_TERMINATED);

    /* FIXME: need to handle the case where a VC is terminated due to
       a process failure.  We need to remove any outstanding LMT ops
       for this VC. */

fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_MPID_NEM_LMT_VMSPLICE_VC_TERMINATED);
    return mpi_errno;
fn_fail:
    goto fn_exit;
}


/* --------------------------------------------------------------------------
   The functions below are nops, stubs that might be used in later versions of
   this code.
   -------------------------------------------------------------------------- */
449 
450 /* called when a DONE message is received for a receive request */
451 #undef FUNCNAME
452 #define FUNCNAME MPID_nem_lmt_vmsplice_done_recv
453 #undef FCNAME
454 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPID_nem_lmt_vmsplice_done_recv(MPIDI_VC_t * vc,MPID_Request * rreq)455 int MPID_nem_lmt_vmsplice_done_recv(MPIDI_VC_t *vc, MPID_Request *rreq)
456 {
457     MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_RECV);
458 
459     MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_RECV);
460 
461     /* nop */
462 
463     MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_RECV);
464     return MPI_SUCCESS;
465 }
466 
467 #undef FUNCNAME
468 #define FUNCNAME MPID_nem_lmt_vmsplice_done_send
469 #undef FCNAME
470 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPID_nem_lmt_vmsplice_done_send(MPIDI_VC_t * vc,MPID_Request * sreq)471 int MPID_nem_lmt_vmsplice_done_send(MPIDI_VC_t *vc, MPID_Request *sreq)
472 {
473     int mpi_errno = MPI_SUCCESS;
474     MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_SEND);
475 
476     MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_SEND);
477 
478     /* nop */
479 
480     MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_SEND);
481     return MPI_SUCCESS;
482 }
483 
484 /* called when a COOKIE message is received */
485 #undef FUNCNAME
486 #define FUNCNAME MPID_nem_lmt_vmsplice_handle_cookie
487 #undef FCNAME
488 #define FCNAME MPIDI_QUOTE(FUNCNAME)
MPID_nem_lmt_vmsplice_handle_cookie(MPIDI_VC_t * vc,MPID_Request * req,MPID_IOV cookie)489 int MPID_nem_lmt_vmsplice_handle_cookie(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV cookie)
490 {
491     int mpi_errno = MPI_SUCCESS;
492     MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_HANDLE_COOKIE);
493 
494     MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_HANDLE_COOKIE);
495 
496     /* nop */
497 
498     MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_HANDLE_COOKIE);
499     return MPI_SUCCESS;
500 }
501 
502 #endif
503