/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2010 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */
#include "mpid_nem_impl.h"
#include "mpid_nem_datatypes.h"

MPIU_SUPPRESS_OSX_HAS_NO_SYMBOLS_WARNING;

#if defined(HAVE_VMSPLICE)

/* must come first for now */
#define _GNU_SOURCE
#include <fcntl.h>
#include <sys/uio.h>

#include "mpid_nem_impl.h"
#include "mpid_nem_datatypes.h"

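/* Overall flow of this vmsplice-based LMT implementation:
 *   1. The sender (initiate_lmt) creates a named FIFO, once per VC, and sends
 *      its path to the receiver inside the RTS cookie.
 *   2. The receiver (start_recv) opens the FIFO non-blocking/read-only, sends
 *      the CTS, and begins readv()ing data from it.
 *   3. On CTS receipt the sender (start_send) opens the FIFO write-only and
 *      vmsplice()s the send IOV into it.
 *   4. Any transfer that cannot complete immediately is pushed onto the
 *      outstanding list and retried from the progress routine.
 */
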
/* These are for maintaining a linked-list of outstanding requests on which we
   can make progress. */
struct lmt_vmsplice_node {
    struct lmt_vmsplice_node *next;
    int pipe_fd;
    MPID_Request *req;
};

/* MT: this stack is not thread-safe */
static struct lmt_vmsplice_node *outstanding_head = NULL;

/* Returns true if the IOV has been completely xferred, false otherwise.

   iov_count and iov_offset are pointers so that this function can manipulate
   them. */
static int adjust_partially_xferred_iov(MPID_IOV iov[], int *iov_offset,
                                        int *iov_count, int bytes_xferred)
{
    int i;
    int complete = 1;

    for (i = *iov_offset; i < (*iov_offset + *iov_count); ++i)
    {
        if (bytes_xferred < iov[i].MPID_IOV_LEN)
        {
            iov[i].MPID_IOV_BUF = (char *)iov[i].MPID_IOV_BUF + bytes_xferred;
            iov[i].MPID_IOV_LEN -= bytes_xferred;
            /* iov_count should be equal to the number of iov's remaining */
            *iov_count -= (i - *iov_offset);
            *iov_offset = i;
            complete = 0;
            break;
        }
        bytes_xferred -= iov[i].MPID_IOV_LEN;
    }

    return complete;
}

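/* Asks the upper layer whether this request has more data to move.  If an
   OnDataAvail handler is installed it reloads the request's IOV and leaves
   *complete == 0 while data remains; otherwise the request is finished and
   completed here. */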
static inline int check_req_complete(MPIDI_VC_t *vc, MPID_Request *req, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
    reqFn = req->dev.OnDataAvail;
    if (reqFn) {
        *complete = 0;

        /* XXX DJG FIXME this feels like a hack */
        req->dev.iov_count = MPID_IOV_LIMIT;
        req->dev.iov_offset = 0;

        mpi_errno = reqFn(vc, req, complete);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    else {
        *complete = 1;
        MPIDI_CH3U_Request_complete(req);
    }

 fn_fail:
    return mpi_errno;
}

/* fills in req->dev.iov{,_offset,_count} based on the datatype info in the
   request, creating a segment if necessary */
static int populate_iov_from_req(MPID_Request *req)
{
    int mpi_errno = MPI_SUCCESS;
    int dt_contig;
    MPI_Aint dt_true_lb;
    MPIDI_msg_sz_t data_sz;
    MPID_Datatype * dt_ptr;

    /* find out contig/noncontig, size, and lb for the datatype */
    MPIDI_Datatype_get_info(req->dev.user_count, req->dev.datatype,
                            dt_contig, data_sz, dt_ptr, dt_true_lb);

    if (dt_contig) {
        /* handle the iov creation ourselves */
        req->dev.iov[0].MPID_IOV_BUF = (char *)req->dev.user_buf + dt_true_lb;
        req->dev.iov[0].MPID_IOV_LEN = data_sz;
        req->dev.iov_count = 1;
    }
    else {
        /* use the segment routines to handle the iovec creation */
        MPIU_Assert(req->dev.segment_ptr == NULL);

        req->dev.iov_count = MPID_IOV_LIMIT;
        req->dev.iov_offset = 0;

        /* XXX DJG FIXME where is this segment freed? */
        req->dev.segment_ptr = MPID_Segment_alloc();
        MPIU_ERR_CHKANDJUMP1((req->dev.segment_ptr == NULL), mpi_errno,
                             MPI_ERR_OTHER, "**nomem",
                             "**nomem %s", "MPID_Segment_alloc");
        MPID_Segment_init(req->dev.user_buf, req->dev.user_count,
                          req->dev.datatype, req->dev.segment_ptr, 0);
        req->dev.segment_first = 0;
        req->dev.segment_size = data_sz;

        /* FIXME we should write our own function that isn't dependent on
           the in-request iov array.  This will let us use IOVs that are
           larger than MPID_IOV_LIMIT. */
        mpi_errno = MPIDI_CH3U_Request_load_send_iov(req, &req->dev.iov[0],
                                                     &req->dev.iov_count);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }

 fn_fail:
    return mpi_errno;
}

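/* Pushes as much of the current IOV into the pipe as vmsplice() will accept
   without blocking.  On EAGAIN it simply returns with the transfer still
   incomplete; once the IOV is drained it asks check_req_complete() for more
   data and closes the pipe when the request is truly done. */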
static int do_vmsplice(MPID_Request *sreq, int pipe_fd, MPID_IOV iov[],
                       int *iov_offset, int *iov_count, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    ssize_t err;

#if 1
    /* normal path: map the user buffer into the pipe without an intermediate copy */
    err = vmsplice(pipe_fd, &iov[*iov_offset], *iov_count, SPLICE_F_NONBLOCK);
#else
    /* alternative path, kept for comparison/debugging: plain copying writev() */
    err = writev(pipe_fd, &iov[*iov_offset], *iov_count);
#endif

    if (err < 0) {
        if (errno == EAGAIN) goto fn_exit;
        MPIU_ERR_CHKANDJUMP2(errno != EAGAIN, mpi_errno, MPI_ERR_OTHER, "**vmsplice",
                             "**vmsplice %d %s", errno, MPIU_Strerror(errno));
    }

    *complete = adjust_partially_xferred_iov(iov, iov_offset, iov_count, err);
    if (*complete) {
        /* look for additional data to send and reload IOV if there is more */
        mpi_errno = check_req_complete(sreq->ch.vc, sreq, complete);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);

        if (*complete) {
            err = close(pipe_fd);
            MPIU_ERR_CHKANDJUMP(err < 0, mpi_errno, MPI_ERR_OTHER, "**close");
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
        }
    }

 fn_fail:
 fn_exit:
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_lmt_vmsplice_initiate_lmt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
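/* Sender side: create the named FIFO for this VC (first use only) and
   advertise its path to the receiver inside the RTS cookie.  No payload moves
   until the CTS comes back. */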
int MPID_nem_lmt_vmsplice_initiate_lmt(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPID_Request *sreq)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_nem_pkt_lmt_rts_t * const rts_pkt = (MPID_nem_pkt_lmt_rts_t *)pkt;
    MPIDI_CH3I_VC *vc_ch = VC_CH(vc);
    int complete = 0;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_INITIATE_LMT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_INITIATE_LMT);

    /* re-use the same pipe per-pair,per-sender */
    if (vc_ch->lmt_copy_buf_handle == NULL) {
        int err;
        char *pipe_name;

        pipe_name = tempnam(NULL, "lmt_");
        MPIU_ERR_CHKANDJUMP2(!pipe_name, mpi_errno, MPI_ERR_OTHER, "**tempnam",
                             "**tempnam %d %s", errno, MPIU_Strerror(errno));

        vc_ch->lmt_copy_buf_handle = MPIU_Strdup(pipe_name);
        /* XXX DJG hack */
#undef free
        free(pipe_name);

        err = mkfifo(vc_ch->lmt_copy_buf_handle, 0660);
        MPIU_ERR_CHKANDJUMP2(err < 0, mpi_errno, MPI_ERR_OTHER, "**mkfifo",
                             "**mkfifo %d %s", errno, MPIU_Strerror(errno));
    }

    /* can't start sending data yet, need full RTS/CTS handshake */

    MPID_nem_lmt_send_RTS(vc, rts_pkt, vc_ch->lmt_copy_buf_handle,
                          strlen(vc_ch->lmt_copy_buf_handle)+1);

 fn_fail:
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_INITIATE_LMT);
    return mpi_errno;
}

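/* Receiver-side counterpart of do_vmsplice(): drains whatever is currently
   available in the pipe into the request's IOV with a non-blocking readv(),
   then reloads the IOV / closes the pipe as the request completes. */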
static int do_readv(MPID_Request *rreq, int pipe_fd, MPID_IOV iov[],
                    int *iov_offset, int *iov_count, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    ssize_t nread;

    nread = readv(pipe_fd, &iov[*iov_offset], *iov_count);
    MPIU_ERR_CHKANDJUMP2(nread < 0 && errno != EAGAIN, mpi_errno, MPI_ERR_OTHER, "**read",
                         "**readv %d %s", errno, MPIU_Strerror(errno));

    /* EAGAIN: nothing available in the pipe right now; the progress engine
       will try again later */
    if (nread < 0)
        goto fn_exit;

    *complete = adjust_partially_xferred_iov(iov, iov_offset, iov_count, nread);
    if (*complete) {
        /* look for additional data to receive and reload the IOV if there is more */
        mpi_errno = check_req_complete(rreq->ch.vc, rreq, complete);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);

        if (*complete) {
            nread = close(pipe_fd);
            MPIU_ERR_CHKANDJUMP(nread < 0, mpi_errno, MPI_ERR_OTHER, "**close");
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
        }
    }

 fn_fail:
 fn_exit:
    return mpi_errno;
}

/* Receiver side, called when an RTS message comes in: remember the sender's
   FIFO path from the cookie, open the FIFO non-blocking/read-only, send the
   CTS, and start pulling data.  If the transfer does not finish immediately it
   is queued for the progress engine. */
#undef FUNCNAME
#define FUNCNAME MPID_nem_lmt_vmsplice_start_recv
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_nem_lmt_vmsplice_start_recv(MPIDI_VC_t *vc, MPID_Request *rreq, MPID_IOV s_cookie)
{
    int mpi_errno = MPI_SUCCESS;
    int i;
    int complete = 0;
    struct lmt_vmsplice_node *node = NULL;
    MPIDI_CH3I_VC *vc_ch = VC_CH(vc);
    int pipe_fd;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_RECV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_RECV);

    if (vc_ch->lmt_recv_copy_buf_handle == NULL) {
        MPIU_Assert(s_cookie.MPID_IOV_BUF != NULL);
        vc_ch->lmt_recv_copy_buf_handle = MPIU_Strdup(s_cookie.MPID_IOV_BUF);
    }

    /* XXX DJG FIXME in a real version we would want to cache the fd on the vc
       so that we don't have two open's on the critical path every time. */
    pipe_fd = open(vc_ch->lmt_recv_copy_buf_handle, O_NONBLOCK|O_RDONLY);
    MPIU_ERR_CHKANDJUMP1(pipe_fd < 0, mpi_errno, MPI_ERR_OTHER, "**open",
                         "**open %s", MPIU_Strerror(errno));

    MPID_nem_lmt_send_CTS(vc, rreq, NULL, 0);

    mpi_errno = populate_iov_from_req(rreq);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    mpi_errno = do_readv(rreq, pipe_fd, rreq->dev.iov, &rreq->dev.iov_offset,
                         &rreq->dev.iov_count, &complete);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    /* push request if not complete for progress checks later */
    if (!complete) {
        node = MPIU_Malloc(sizeof(struct lmt_vmsplice_node));
        node->pipe_fd = pipe_fd;
        node->req = rreq;
        node->next = outstanding_head;
        outstanding_head = node;
        ++MPID_nem_local_lmt_pending;
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_RECV);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

/* XXX DJG FIXME at some point this should poll, much like the newtcp module.
   But then we have that whole pollfd array to manage, which we don't really
   need until this proof-of-concept proves itself. */
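/* Walks the list of outstanding LMT operations, attempting another readv() or
   vmsplice() on each; entries that finish are unlinked, freed, and counted off
   MPID_nem_local_lmt_pending. */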
int MPID_nem_lmt_vmsplice_progress(void)
{
    int mpi_errno = MPI_SUCCESS;
    struct lmt_vmsplice_node *prev = NULL;
    struct lmt_vmsplice_node *free_me = NULL;
    struct lmt_vmsplice_node *cur = outstanding_head;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_PROGRESS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_PROGRESS);

    while (cur) {
        int complete = 0;

        switch (MPIDI_Request_get_type(cur->req)) {
            case MPIDI_REQUEST_TYPE_RECV:
                mpi_errno = do_readv(cur->req, cur->pipe_fd, cur->req->dev.iov,
                                     &cur->req->dev.iov_offset,
                                     &cur->req->dev.iov_count, &complete);
                /* FIXME: set the error status of the req and complete it, rather than POP */
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                break;
            case MPIDI_REQUEST_TYPE_SEND:
                mpi_errno = do_vmsplice(cur->req, cur->pipe_fd, cur->req->dev.iov,
                                        &cur->req->dev.iov_offset,
                                        &cur->req->dev.iov_count, &complete);
                /* FIXME: set the error status of the req and complete it, rather than POP */
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                break;
            default:
                MPIU_ERR_INTERNALANDJUMP(mpi_errno, "unexpected request type");
                break;
        }

        if (complete) {
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");

            /* unlink and free the node; prev stays put because it is still the
               predecessor of the next node in the list */
            if (cur == outstanding_head)
                outstanding_head = cur->next;
            else
                prev->next = cur->next;
            free_me = cur;
            cur = cur->next;
            MPIU_Free(free_me);
            --MPID_nem_local_lmt_pending;
            /* cur already points at the next unprocessed node */
            continue;
        }

        prev = cur;
        cur = cur->next;
    }


 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_PROGRESS);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

/* Sender side, called when a CTS message is received: the receiver has opened
   the FIFO for reading, so it is now safe to open the write side (otherwise
   open() would fail with ENXIO) and push the first flight of data. */
#undef FUNCNAME
#define FUNCNAME MPID_nem_lmt_vmsplice_start_send
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_nem_lmt_vmsplice_start_send(MPIDI_VC_t *vc, MPID_Request *sreq, MPID_IOV r_cookie)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_SEND);
    int pipe_fd;
    int complete;
    struct lmt_vmsplice_node *node = NULL;
    int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
    MPIDI_CH3I_VC *vc_ch = VC_CH(vc);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_SEND);

    /* Must do this after the other side has opened for reading, otherwise we
       will error out with ENXIO.  This will be indicated by the receipt of a
       CTS message. */
    pipe_fd = open(vc_ch->lmt_copy_buf_handle, O_NONBLOCK|O_WRONLY);
    MPIU_ERR_CHKANDJUMP1(pipe_fd < 0, mpi_errno, MPI_ERR_OTHER, "**open",
                         "**open %s", MPIU_Strerror(errno));

    mpi_errno = populate_iov_from_req(sreq);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    /* send the first flight */
    sreq->ch.vc = vc; /* XXX DJG is this already assigned? */
    complete = 0;
    mpi_errno = do_vmsplice(sreq, pipe_fd, sreq->dev.iov,
                            &sreq->dev.iov_offset, &sreq->dev.iov_count, &complete);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    if (!complete) {
        /* push for later progress */
        node = MPIU_Malloc(sizeof(struct lmt_vmsplice_node));
        node->pipe_fd = pipe_fd;
        node->req = sreq;
        node->next = outstanding_head;
        outstanding_head = node;
        ++MPID_nem_local_lmt_pending;
    }

 fn_fail:
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_SEND);
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_MPID_nem_lmt_vmsplice_vc_terminated
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIDI_CH3_MPID_nem_lmt_vmsplice_vc_terminated(MPIDI_VC_t *vc)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_MPID_NEM_LMT_VMSPLICE_VC_TERMINATED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_MPID_NEM_LMT_VMSPLICE_VC_TERMINATED);

    /* FIXME: need to handle the case where a VC is terminated due to
       a process failure.  We need to remove any outstanding LMT ops
       for this VC. */

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_MPID_NEM_LMT_VMSPLICE_VC_TERMINATED);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}


/* --------------------------------------------------------------------------
   The functions below are nops, stubs that might be used in later versions of
   this code.
   -------------------------------------------------------------------------- */

/* called when a DONE message is received for a receive request */
#undef FUNCNAME
#define FUNCNAME MPID_nem_lmt_vmsplice_done_recv
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_nem_lmt_vmsplice_done_recv(MPIDI_VC_t *vc, MPID_Request *rreq)
{
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_RECV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_RECV);

    /* nop */

    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_RECV);
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_lmt_vmsplice_done_send
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_nem_lmt_vmsplice_done_send(MPIDI_VC_t *vc, MPID_Request *sreq)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_SEND);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_SEND);

    /* nop */

    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_DONE_SEND);
    return MPI_SUCCESS;
}

/* called when a COOKIE message is received */
#undef FUNCNAME
#define FUNCNAME MPID_nem_lmt_vmsplice_handle_cookie
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_nem_lmt_vmsplice_handle_cookie(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV cookie)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_HANDLE_COOKIE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_HANDLE_COOKIE);

    /* nop */

    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_HANDLE_COOKIE);
    return MPI_SUCCESS;
}

#endif