1 /*
2 * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
3 * All rights reserved.
4 * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
5 * Copyright (c) 2014-2015 Research Organization for Information Science
6 * and Technology (RIST). All rights reserved.
7 * $COPYRIGHT$
8 *
9 * Additional copyrights may follow
10 *
11 * $HEADER$
12 */
13
14 #include "orte_config.h"
15
16 #include <sys/types.h>
17 #ifdef HAVE_UNISTD_H
18 #include <unistd.h>
19 #endif /* HAVE_UNISTD_H */
20 #include <string.h>
21 #ifdef HAVE_FCNTL_H
22 #include <fcntl.h>
23 #endif
24 #include <sys/stat.h>
25
26 #include "opal/util/if.h"
27 #include "opal/util/output.h"
28 #include "opal/util/uri.h"
29 #include "opal/dss/dss.h"
30 #include "opal/mca/pmix/pmix.h"
31
32 #include "orte/util/error_strings.h"
33 #include "orte/util/name_fns.h"
34 #include "orte/util/show_help.h"
35 #include "orte/util/threads.h"
36 #include "orte/runtime/orte_globals.h"
37 #include "orte/mca/errmgr/errmgr.h"
38 #include "orte/mca/rml/rml.h"
39
40 #include "orte/mca/dfs/base/base.h"
41 #include "dfs_test.h"
42
43 /*
44 * Module functions: Global
45 */
46 static int init(void);
47 static int finalize(void);
48
49 static void dfs_open(char *uri,
50 orte_dfs_open_callback_fn_t cbfunc,
51 void *cbdata);
52 static void dfs_close(int fd,
53 orte_dfs_close_callback_fn_t cbfunc,
54 void *cbdata);
55 static void dfs_get_file_size(int fd,
56 orte_dfs_size_callback_fn_t cbfunc,
57 void *cbdata);
58 static void dfs_seek(int fd, long offset, int whence,
59 orte_dfs_seek_callback_fn_t cbfunc,
60 void *cbdata);
61 static void dfs_read(int fd, uint8_t *buffer,
62 long length,
63 orte_dfs_read_callback_fn_t cbfunc,
64 void *cbdata);
65 static void dfs_post_file_map(opal_buffer_t *bo,
66 orte_dfs_post_callback_fn_t cbfunc,
67 void *cbdata);
68 static void dfs_get_file_map(orte_process_name_t *target,
69 orte_dfs_fm_callback_fn_t cbfunc,
70 void *cbdata);
71 static void dfs_load_file_maps(orte_jobid_t jobid,
72 opal_buffer_t *bo,
73 orte_dfs_load_callback_fn_t cbfunc,
74 void *cbdata);
75 static void dfs_purge_file_maps(orte_jobid_t jobid,
76 orte_dfs_purge_callback_fn_t cbfunc,
77 void *cbdata);
78
79 /******************
80 * TEST module
81 ******************/
82 orte_dfs_base_module_t orte_dfs_test_module = {
83 init,
84 finalize,
85 dfs_open,
86 dfs_close,
87 dfs_get_file_size,
88 dfs_seek,
89 dfs_read,
90 dfs_post_file_map,
91 dfs_get_file_map,
92 dfs_load_file_maps,
93 dfs_purge_file_maps
94 };
95
96 static opal_list_t requests, active_files;
97 static int local_fd = 0;
98 static uint64_t req_id = 0;
99 static void recv_dfs(int status, orte_process_name_t* sender,
100 opal_buffer_t* buffer, orte_rml_tag_t tag,
101 void* cbdata);
102
init(void)103 static int init(void)
104 {
105 OBJ_CONSTRUCT(&requests, opal_list_t);
106 OBJ_CONSTRUCT(&active_files, opal_list_t);
107 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
108 ORTE_RML_TAG_DFS_DATA,
109 ORTE_RML_PERSISTENT,
110 recv_dfs,
111 NULL);
112 return ORTE_SUCCESS;
113 }
114
finalize(void)115 static int finalize(void)
116 {
117 opal_list_item_t *item;
118
119 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_DATA);
120 while (NULL != (item = opal_list_remove_first(&requests))) {
121 OBJ_RELEASE(item);
122 }
123 OBJ_DESTRUCT(&requests);
124 while (NULL != (item = opal_list_remove_first(&active_files))) {
125 OBJ_RELEASE(item);
126 }
127 OBJ_DESTRUCT(&active_files);
128 return ORTE_SUCCESS;
129 }
130
131 /* receives take place in an event, so we are free to process
132 * the request list without fear of getting things out-of-order
133 */
recv_dfs(int status,orte_process_name_t * sender,opal_buffer_t * buffer,orte_rml_tag_t tag,void * cbdata)134 static void recv_dfs(int status, orte_process_name_t* sender,
135 opal_buffer_t* buffer, orte_rml_tag_t tag,
136 void* cbdata)
137 {
138 orte_dfs_cmd_t cmd;
139 int32_t cnt;
140 orte_dfs_request_t *dfs, *dptr;
141 opal_list_item_t *item;
142 int remote_fd, rc;
143 int64_t i64;
144 uint64_t rid;
145 orte_dfs_tracker_t *trk;
146
147 /* unpack the command this message is responding to */
148 cnt = 1;
149 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &cmd, &cnt, ORTE_DFS_CMD_T))) {
150 ORTE_ERROR_LOG(rc);
151 return;
152 }
153
154 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
155 "%s recvd cmd %d from sender %s",
156 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)cmd,
157 ORTE_NAME_PRINT(sender));
158
159 switch (cmd) {
160 case ORTE_DFS_OPEN_CMD:
161 /* unpack the request id */
162 cnt = 1;
163 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
164 ORTE_ERROR_LOG(rc);
165 return;
166 }
167 /* unpack the remote fd */
168 cnt = 1;
169 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &remote_fd, &cnt, OPAL_INT))) {
170 ORTE_ERROR_LOG(rc);
171 return;
172 }
173 /* search our list of requests to find the matching one */
174 dfs = NULL;
175 for (item = opal_list_get_first(&requests);
176 item != opal_list_get_end(&requests);
177 item = opal_list_get_next(item)) {
178 dptr = (orte_dfs_request_t*)item;
179 if (dptr->id == rid) {
180 /* as the request has been fulfilled, remove it */
181 opal_list_remove_item(&requests, item);
182 dfs = dptr;
183 break;
184 }
185 }
186 if (NULL == dfs) {
187 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
188 "%s recvd open file - no corresponding request found for local fd %d",
189 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
190 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
191 return;
192 }
193
194 /* if the remote_fd < 0, then we had an error, so return
195 * the error value to the caller
196 */
197 if (remote_fd < 0) {
198 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
199 "%s recvd open file response error file %s [error: %d]",
200 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
201 dfs->uri, remote_fd);
202 if (NULL != dfs->open_cbfunc) {
203 dfs->open_cbfunc(remote_fd, dfs->cbdata);
204 }
205 /* release the request */
206 OBJ_RELEASE(dfs);
207 return;
208 }
209 /* otherwise, create a tracker for this file */
210 trk = OBJ_NEW(orte_dfs_tracker_t);
211 trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
212 trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
213 trk->host_daemon.jobid = sender->jobid;
214 trk->host_daemon.vpid = sender->vpid;
215 trk->filename = strdup(dfs->uri);
216 /* define the local fd */
217 trk->local_fd = local_fd++;
218 /* record the remote file descriptor */
219 trk->remote_fd = remote_fd;
220 /* add it to our list of active files */
221 opal_list_append(&active_files, &trk->super);
222 /* return the local_fd to the caller for
223 * subsequent operations
224 */
225 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
226 "%s recvd open file completed for file %s [local fd: %d remote fd: %d]",
227 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
228 dfs->uri, trk->local_fd, remote_fd);
229 if (NULL != dfs->open_cbfunc) {
230 dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
231 }
232 /* release the request */
233 OBJ_RELEASE(dfs);
234 break;
235
236 case ORTE_DFS_SIZE_CMD:
237 /* unpack the request id for this request */
238 cnt = 1;
239 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
240 ORTE_ERROR_LOG(rc);
241 return;
242 }
243 /* search our list of requests to find the matching one */
244 dfs = NULL;
245 for (item = opal_list_get_first(&requests);
246 item != opal_list_get_end(&requests);
247 item = opal_list_get_next(item)) {
248 dptr = (orte_dfs_request_t*)item;
249 if (dptr->id == rid) {
250 /* request was fulfilled, so remove it */
251 opal_list_remove_item(&requests, item);
252 dfs = dptr;
253 break;
254 }
255 }
256 if (NULL == dfs) {
257 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
258 "%s recvd size - no corresponding request found for local fd %d",
259 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
260 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
261 return;
262 }
263 /* get the size */
264 cnt = 1;
265 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
266 ORTE_ERROR_LOG(rc);
267 OBJ_RELEASE(dfs);
268 return;
269 }
270 /* pass it back to the original caller */
271 if (NULL != dfs->size_cbfunc) {
272 dfs->size_cbfunc(i64, dfs->cbdata);
273 }
274 /* release the request */
275 OBJ_RELEASE(dfs);
276 break;
277
278 case ORTE_DFS_SEEK_CMD:
279 /* unpack the request id for this read */
280 cnt = 1;
281 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
282 ORTE_ERROR_LOG(rc);
283 return;
284 }
285 /* search our list of requests to find the matching one */
286 dfs = NULL;
287 for (item = opal_list_get_first(&requests);
288 item != opal_list_get_end(&requests);
289 item = opal_list_get_next(item)) {
290 dptr = (orte_dfs_request_t*)item;
291 if (dptr->id == rid) {
292 /* request was fulfilled, so remove it */
293 opal_list_remove_item(&requests, item);
294 dfs = dptr;
295 break;
296 }
297 }
298 if (NULL == dfs) {
299 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
300 "%s recvd seek - no corresponding request found for local fd %d",
301 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
302 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
303 return;
304 }
305 /* get the returned offset/status */
306 cnt = 1;
307 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
308 ORTE_ERROR_LOG(rc);
309 OBJ_RELEASE(dfs);
310 return;
311 }
312 /* pass it back to the original caller */
313 if (NULL != dfs->seek_cbfunc) {
314 dfs->seek_cbfunc(i64, dfs->cbdata);
315 }
316 /* release the request */
317 OBJ_RELEASE(dfs);
318 break;
319
320 case ORTE_DFS_READ_CMD:
321 /* unpack the request id for this read */
322 cnt = 1;
323 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
324 ORTE_ERROR_LOG(rc);
325 return;
326 }
327 /* search our list of requests to find the matching one */
328 dfs = NULL;
329 for (item = opal_list_get_first(&requests);
330 item != opal_list_get_end(&requests);
331 item = opal_list_get_next(item)) {
332 dptr = (orte_dfs_request_t*)item;
333 if (dptr->id == rid) {
334 /* request was fulfilled, so remove it */
335 opal_list_remove_item(&requests, item);
336 dfs = dptr;
337 break;
338 }
339 }
340 if (NULL == dfs) {
341 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
342 "%s recvd read - no corresponding request found for local fd %d",
343 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
344 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
345 return;
346 }
347 /* get the bytes read */
348 cnt = 1;
349 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
350 ORTE_ERROR_LOG(rc);
351 OBJ_RELEASE(dfs);
352 return;
353 }
354 if (0 < i64) {
355 cnt = i64;
356 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, dfs->read_buffer, &cnt, OPAL_UINT8))) {
357 ORTE_ERROR_LOG(rc);
358 OBJ_RELEASE(dfs);
359 return;
360 }
361 }
362 /* pass them back to the original caller */
363 if (NULL != dfs->read_cbfunc) {
364 dfs->read_cbfunc(i64, dfs->read_buffer, dfs->cbdata);
365 }
366 /* release the request */
367 OBJ_RELEASE(dfs);
368 break;
369
370 case ORTE_DFS_POST_CMD:
371 /* unpack the request id for this read */
372 cnt = 1;
373 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
374 ORTE_ERROR_LOG(rc);
375 return;
376 }
377 /* search our list of requests to find the matching one */
378 dfs = NULL;
379 for (item = opal_list_get_first(&requests);
380 item != opal_list_get_end(&requests);
381 item = opal_list_get_next(item)) {
382 dptr = (orte_dfs_request_t*)item;
383 if (dptr->id == rid) {
384 /* request was fulfilled, so remove it */
385 opal_list_remove_item(&requests, item);
386 dfs = dptr;
387 break;
388 }
389 }
390 if (NULL == dfs) {
391 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
392 "%s recvd post - no corresponding request found",
393 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
394 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
395 return;
396 }
397 if (NULL != dfs->post_cbfunc) {
398 dfs->post_cbfunc(dfs->cbdata);
399 }
400 OBJ_RELEASE(dfs);
401 break;
402
403 case ORTE_DFS_GETFM_CMD:
404 /* unpack the request id for this read */
405 cnt = 1;
406 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
407 ORTE_ERROR_LOG(rc);
408 return;
409 }
410 /* search our list of requests to find the matching one */
411 dfs = NULL;
412 for (item = opal_list_get_first(&requests);
413 item != opal_list_get_end(&requests);
414 item = opal_list_get_next(item)) {
415 dptr = (orte_dfs_request_t*)item;
416 if (dptr->id == rid) {
417 /* request was fulfilled, so remove it */
418 opal_list_remove_item(&requests, item);
419 dfs = dptr;
420 break;
421 }
422 }
423 if (NULL == dfs) {
424 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
425 "%s recvd getfm - no corresponding request found",
426 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
427 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
428 return;
429 }
430 /* return it to caller */
431 if (NULL != dfs->fm_cbfunc) {
432 dfs->fm_cbfunc(buffer, dfs->cbdata);
433 }
434 OBJ_RELEASE(dfs);
435 break;
436
437 default:
438 opal_output(0, "TEST:DFS:RECV WTF");
439 break;
440 }
441 }
442
process_opens(int fd,short args,void * cbdata)443 static void process_opens(int fd, short args, void *cbdata)
444 {
445 orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
446 int rc;
447 opal_buffer_t *buffer;
448 char *scheme, *host=NULL, *filename=NULL;
449 orte_process_name_t daemon;
450 opal_list_t lt;
451 opal_namelist_t *nm;
452
453 ORTE_ACQUIRE_OBJECT(dfs);
454
455 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
456 "%s PROCESSING OPEN", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
457
458 /* get the scheme to determine if we can process locally or not */
459 if (NULL == (scheme = opal_uri_get_scheme(dfs->uri))) {
460 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
461 goto complete;
462 }
463 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
464 "%s GOT SCHEME", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
465
466 if (0 != strcmp(scheme, "file")) {
467 /* not yet supported */
468 orte_show_help("orte_dfs_help.txt", "unsupported-filesystem",
469 true, dfs->uri);
470 free(scheme);
471 goto complete;
472 }
473 free(scheme);
474
475 /* dissect the uri to extract host and filename/path */
476 if (NULL == (filename = opal_filename_from_uri(dfs->uri, &host))) {
477 goto complete;
478 }
479 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
480 "%s GOT FILENAME %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), filename);
481 if (NULL == host) {
482 host = strdup(orte_process_info.nodename);
483 }
484
485 /* ident the daemon on that host */
486 daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
487 OBJ_CONSTRUCT(<, opal_list_t);
488 if (ORTE_SUCCESS != (rc = opal_pmix.resolve_peers(host, daemon.jobid, <))) {
489 ORTE_ERROR_LOG(rc);
490 OBJ_DESTRUCT(<);
491 goto complete;
492 }
493 nm = (opal_namelist_t*)opal_list_get_first(<);
494 daemon.vpid = nm->name.vpid;
495 OPAL_LIST_DESTRUCT(<);
496
497 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
498 "%s file %s on host %s daemon %s",
499 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
500 filename, host, ORTE_NAME_PRINT(&daemon));
501
502 /* add this request to our local list so we can
503 * match it with the returned response when it comes
504 */
505 dfs->id = req_id++;
506 opal_list_append(&requests, &dfs->super);
507
508 /* setup a message for the daemon telling
509 * them what file we want to access
510 */
511 buffer = OBJ_NEW(opal_buffer_t);
512 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
513 ORTE_ERROR_LOG(rc);
514 opal_list_remove_item(&requests, &dfs->super);
515 goto complete;
516 }
517 /* pass the request id */
518 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
519 ORTE_ERROR_LOG(rc);
520 opal_list_remove_item(&requests, &dfs->super);
521 goto complete;
522 }
523 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &filename, 1, OPAL_STRING))) {
524 ORTE_ERROR_LOG(rc);
525 opal_list_remove_item(&requests, &dfs->super);
526 goto complete;
527 }
528
529 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
530 "%s sending open file request to %s file %s",
531 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
532 ORTE_NAME_PRINT(&daemon),
533 filename);
534 /* send it */
535 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
536 &daemon, buffer,
537 ORTE_RML_TAG_DFS_CMD,
538 orte_rml_send_callback, NULL))) {
539 ORTE_ERROR_LOG(rc);
540 OBJ_RELEASE(buffer);
541 opal_list_remove_item(&requests, &dfs->super);
542 goto complete;
543 }
544 /* don't release it */
545 free(host);
546 free(filename);
547 return;
548
549 complete:
550 /* we get here if an error occurred - execute any
551 * pending callback so the proc doesn't hang
552 */
553 if (NULL != host) {
554 free(host);
555 }
556 if (NULL != filename) {
557 free(filename);
558 }
559 if (NULL != dfs->open_cbfunc) {
560 dfs->open_cbfunc(-1, dfs->cbdata);
561 }
562 OBJ_RELEASE(dfs);
563 }
564
565
566 /* in order to handle the possible opening/reading of files by
567 * multiple threads, we have to ensure that all operations are
568 * carried out in events - so the "open" cmd simply posts an
569 * event containing the required info, and then returns
570 */
dfs_open(char * uri,orte_dfs_open_callback_fn_t cbfunc,void * cbdata)571 static void dfs_open(char *uri,
572 orte_dfs_open_callback_fn_t cbfunc,
573 void *cbdata)
574 {
575 orte_dfs_request_t *dfs;
576
577 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
578 "%s opening file %s",
579 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uri);
580
581 /* setup the request */
582 dfs = OBJ_NEW(orte_dfs_request_t);
583 dfs->cmd = ORTE_DFS_OPEN_CMD;
584 dfs->uri = strdup(uri);
585 dfs->open_cbfunc = cbfunc;
586 dfs->cbdata = cbdata;
587
588 /* post it for processing */
589 ORTE_THREADSHIFT(dfs, orte_event_base, process_opens, ORTE_SYS_PRI);
590 }
591
process_close(int fd,short args,void * cbdata)592 static void process_close(int fd, short args, void *cbdata)
593 {
594 orte_dfs_request_t *close_dfs = (orte_dfs_request_t*)cbdata;
595 orte_dfs_tracker_t *tptr, *trk;
596 opal_list_item_t *item;
597 opal_buffer_t *buffer;
598 int rc;
599
600 ORTE_ACQUIRE_OBJECT(close_dfs);
601
602 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
603 "%s closing fd %d",
604 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
605 close_dfs->local_fd);
606
607 /* look in our local records for this fd */
608 trk = NULL;
609 for (item = opal_list_get_first(&active_files);
610 item != opal_list_get_end(&active_files);
611 item = opal_list_get_next(item)) {
612 tptr = (orte_dfs_tracker_t*)item;
613 if (tptr->local_fd == close_dfs->local_fd) {
614 trk = tptr;
615 break;
616 }
617 }
618 if (NULL == trk) {
619 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
620 if (NULL != close_dfs->close_cbfunc) {
621 close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
622 }
623 OBJ_RELEASE(close_dfs);
624 return;
625 }
626
627 /* setup a message for the daemon telling
628 * them what file to close
629 */
630 buffer = OBJ_NEW(opal_buffer_t);
631 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &close_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
632 ORTE_ERROR_LOG(rc);
633 goto complete;
634 }
635 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
636 ORTE_ERROR_LOG(rc);
637 goto complete;
638 }
639
640 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
641 "%s sending close file request to %s for fd %d",
642 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
643 ORTE_NAME_PRINT(&trk->host_daemon),
644 trk->local_fd);
645 /* send it */
646 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
647 &trk->host_daemon, buffer,
648 ORTE_RML_TAG_DFS_CMD,
649 orte_rml_send_callback, NULL))) {
650 ORTE_ERROR_LOG(rc);
651 OBJ_RELEASE(buffer);
652 goto complete;
653 }
654
655 complete:
656 opal_list_remove_item(&active_files, &trk->super);
657 OBJ_RELEASE(trk);
658 if (NULL != close_dfs->close_cbfunc) {
659 close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
660 }
661 OBJ_RELEASE(close_dfs);
662 }
663
/*
 * Public "close" entry point: record @fd, @cbfunc and @cbdata in a close
 * request and hand it to process_close() on the ORTE event base.
 */
static void dfs_close(int fd,
                      orte_dfs_close_callback_fn_t cbfunc,
                      void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s close called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    req = OBJ_NEW(orte_dfs_request_t);
    req->cmd = ORTE_DFS_CLOSE_CMD;
    req->close_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->local_fd = fd;

    ORTE_THREADSHIFT(req, orte_event_base, process_close, ORTE_SYS_PRI);
}
683
process_sizes(int fd,short args,void * cbdata)684 static void process_sizes(int fd, short args, void *cbdata)
685 {
686 orte_dfs_request_t *size_dfs = (orte_dfs_request_t*)cbdata;
687 orte_dfs_tracker_t *tptr, *trk;
688 opal_list_item_t *item;
689 opal_buffer_t *buffer;
690 int rc;
691
692 ORTE_ACQUIRE_OBJECT(size_dfs);
693
694 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
695 "%s processing get_size on fd %d",
696 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
697 size_dfs->local_fd);
698
699 /* look in our local records for this fd */
700 trk = NULL;
701 for (item = opal_list_get_first(&active_files);
702 item != opal_list_get_end(&active_files);
703 item = opal_list_get_next(item)) {
704 tptr = (orte_dfs_tracker_t*)item;
705 if (tptr->local_fd == size_dfs->local_fd) {
706 trk = tptr;
707 break;
708 }
709 }
710 if (NULL == trk) {
711 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
712 OBJ_RELEASE(size_dfs);
713 return;
714 }
715
716 /* add this request to our local list so we can
717 * match it with the returned response when it comes
718 */
719 size_dfs->id = req_id++;
720 opal_list_append(&requests, &size_dfs->super);
721
722 /* setup a message for the daemon telling
723 * them what file we want to access
724 */
725 buffer = OBJ_NEW(opal_buffer_t);
726 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
727 ORTE_ERROR_LOG(rc);
728 opal_list_remove_item(&requests, &size_dfs->super);
729 goto complete;
730 }
731 /* pass the request id */
732 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->id, 1, OPAL_UINT64))) {
733 ORTE_ERROR_LOG(rc);
734 opal_list_remove_item(&requests, &size_dfs->super);
735 goto complete;
736 }
737 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
738 ORTE_ERROR_LOG(rc);
739 opal_list_remove_item(&requests, &size_dfs->super);
740 goto complete;
741 }
742
743 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
744 "%s sending get_size request to %s for fd %d",
745 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
746 ORTE_NAME_PRINT(&trk->host_daemon),
747 trk->local_fd);
748 /* send it */
749 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
750 &trk->host_daemon, buffer,
751 ORTE_RML_TAG_DFS_CMD,
752 orte_rml_send_callback, NULL))) {
753 ORTE_ERROR_LOG(rc);
754 OBJ_RELEASE(buffer);
755 opal_list_remove_item(&requests, &size_dfs->super);
756 if (NULL != size_dfs->size_cbfunc) {
757 size_dfs->size_cbfunc(-1, size_dfs->cbdata);
758 }
759 goto complete;
760 }
761 /* leave the request there */
762 return;
763
764 complete:
765 OBJ_RELEASE(size_dfs);
766 }
767
/*
 * Public "get size" entry point: package @fd, @cbfunc and @cbdata into a
 * size request and hand it to process_sizes() on the ORTE event base.
 */
static void dfs_get_file_size(int fd,
                              orte_dfs_size_callback_fn_t cbfunc,
                              void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s get_size called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    req = OBJ_NEW(orte_dfs_request_t);
    req->cmd = ORTE_DFS_SIZE_CMD;
    req->size_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->local_fd = fd;

    ORTE_THREADSHIFT(req, orte_event_base, process_sizes, ORTE_SYS_PRI);
}
787
788
process_seeks(int fd,short args,void * cbdata)789 static void process_seeks(int fd, short args, void *cbdata)
790 {
791 orte_dfs_request_t *seek_dfs = (orte_dfs_request_t*)cbdata;
792 orte_dfs_tracker_t *tptr, *trk;
793 opal_list_item_t *item;
794 opal_buffer_t *buffer;
795 int64_t i64;
796 int rc;
797
798 ORTE_ACQUIRE_OBJECT(seek_dfs);
799
800 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
801 "%s processing seek on fd %d",
802 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
803 seek_dfs->local_fd);
804
805 /* look in our local records for this fd */
806 trk = NULL;
807 for (item = opal_list_get_first(&active_files);
808 item != opal_list_get_end(&active_files);
809 item = opal_list_get_next(item)) {
810 tptr = (orte_dfs_tracker_t*)item;
811 if (tptr->local_fd == seek_dfs->local_fd) {
812 trk = tptr;
813 break;
814 }
815 }
816 if (NULL == trk) {
817 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
818 OBJ_RELEASE(seek_dfs);
819 return;
820 }
821
822 /* add this request to our local list so we can
823 * match it with the returned response when it comes
824 */
825 seek_dfs->id = req_id++;
826 opal_list_append(&requests, &seek_dfs->super);
827
828 /* setup a message for the daemon telling
829 * them what file to seek
830 */
831 buffer = OBJ_NEW(opal_buffer_t);
832 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
833 ORTE_ERROR_LOG(rc);
834 goto complete;
835 }
836 /* pass the request id */
837 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->id, 1, OPAL_UINT64))) {
838 ORTE_ERROR_LOG(rc);
839 opal_list_remove_item(&requests, &seek_dfs->super);
840 goto complete;
841 }
842 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
843 ORTE_ERROR_LOG(rc);
844 goto complete;
845 }
846 i64 = (int64_t)seek_dfs->read_length;
847 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
848 ORTE_ERROR_LOG(rc);
849 goto complete;
850 }
851 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->remote_fd, 1, OPAL_INT))) {
852 ORTE_ERROR_LOG(rc);
853 goto complete;
854 }
855
856 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
857 "%s sending seek file request to %s for fd %d",
858 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
859 ORTE_NAME_PRINT(&trk->host_daemon),
860 trk->local_fd);
861 /* send it */
862 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
863 &trk->host_daemon, buffer,
864 ORTE_RML_TAG_DFS_CMD,
865 orte_rml_send_callback, NULL))) {
866 ORTE_ERROR_LOG(rc);
867 OBJ_RELEASE(buffer);
868 goto complete;
869 }
870 /* leave the request */
871 return;
872
873 complete:
874 OBJ_RELEASE(seek_dfs);
875 }
876
877
/*
 * Public "seek" entry point: package the arguments into a seek request and
 * hand it to process_seeks() on the ORTE event base. This module stores
 * @offset in the request's read_length field and @whence in its remote_fd
 * field; process_seeks() reads them back from there.
 */
static void dfs_seek(int fd, long offset, int whence,
                     orte_dfs_seek_callback_fn_t cbfunc,
                     void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s seek called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    req = OBJ_NEW(orte_dfs_request_t);
    req->cmd = ORTE_DFS_SEEK_CMD;
    req->seek_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->local_fd = fd;
    req->remote_fd = whence;      /* whence rides in remote_fd */
    req->read_length = offset;    /* offset rides in read_length */

    ORTE_THREADSHIFT(req, orte_event_base, process_seeks, ORTE_SYS_PRI);
}
899
process_reads(int fd,short args,void * cbdata)900 static void process_reads(int fd, short args, void *cbdata)
901 {
902 orte_dfs_request_t *read_dfs = (orte_dfs_request_t*)cbdata;
903 orte_dfs_tracker_t *tptr, *trk;
904 opal_list_item_t *item;
905 opal_buffer_t *buffer;
906 int64_t i64;
907 int rc;
908
909 ORTE_ACQUIRE_OBJECT(read_dfs);
910
911 /* look in our local records for this fd */
912 trk = NULL;
913 for (item = opal_list_get_first(&active_files);
914 item != opal_list_get_end(&active_files);
915 item = opal_list_get_next(item)) {
916 tptr = (orte_dfs_tracker_t*)item;
917 if (tptr->local_fd == read_dfs->local_fd) {
918 trk = tptr;
919 break;
920 }
921 }
922 if (NULL == trk) {
923 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
924 OBJ_RELEASE(read_dfs);
925 return;
926 }
927
928 /* add this request to our pending list */
929 read_dfs->id = req_id++;
930 opal_list_append(&requests, &read_dfs->super);
931
932 /* setup a message for the daemon telling
933 * them what file to read
934 */
935 buffer = OBJ_NEW(opal_buffer_t);
936 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
937 ORTE_ERROR_LOG(rc);
938 goto complete;
939 }
940 /* include the request id */
941 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->id, 1, OPAL_UINT64))) {
942 ORTE_ERROR_LOG(rc);
943 goto complete;
944 }
945 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
946 ORTE_ERROR_LOG(rc);
947 goto complete;
948 }
949 i64 = (int64_t)read_dfs->read_length;
950 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
951 ORTE_ERROR_LOG(rc);
952 goto complete;
953 }
954
955 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
956 "%s sending read file request to %s for fd %d",
957 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
958 ORTE_NAME_PRINT(&trk->host_daemon),
959 trk->local_fd);
960 /* send it */
961 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
962 &trk->host_daemon, buffer,
963 ORTE_RML_TAG_DFS_CMD,
964 orte_rml_send_callback, NULL))) {
965 ORTE_ERROR_LOG(rc);
966 OBJ_RELEASE(buffer);
967 }
968 /* don't release the request */
969 return;
970
971 complete:
972 /* don't need to hang on to this request */
973 opal_list_remove_item(&requests, &read_dfs->super);
974 OBJ_RELEASE(read_dfs);
975 }
976
/*
 * Public "read" entry point: package @fd, the destination @buffer, its
 * @length and the callback into a read request and hand it to
 * process_reads() on the ORTE event base. @buffer is written later, when
 * the daemon's reply arrives, so it must remain valid until @cbfunc runs.
 */
static void dfs_read(int fd, uint8_t *buffer,
                     long length,
                     orte_dfs_read_callback_fn_t cbfunc,
                     void *cbdata)
{
    orte_dfs_request_t *req = OBJ_NEW(orte_dfs_request_t);

    req->cmd = ORTE_DFS_READ_CMD;
    req->read_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->local_fd = fd;
    req->read_length = length;
    req->read_buffer = buffer;

    ORTE_THREADSHIFT(req, orte_event_base, process_reads, ORTE_SYS_PRI);
}
995
process_posts(int fd,short args,void * cbdata)996 static void process_posts(int fd, short args, void *cbdata)
997 {
998 orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
999 opal_buffer_t *buffer;
1000 int rc;
1001
1002 ORTE_ACQUIRE_OBJECT(dfs);
1003
1004 /* we will get confirmation in our receive function, so
1005 * add this request to our list */
1006 dfs->id = req_id++;
1007 opal_list_append(&requests, &dfs->super);
1008
1009 /* Send the buffer's contents to our local daemon for storage */
1010 buffer = OBJ_NEW(opal_buffer_t);
1011 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1012 ORTE_ERROR_LOG(rc);
1013 goto error;
1014 }
1015 /* include the request id */
1016 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
1017 ORTE_ERROR_LOG(rc);
1018 goto error;
1019 }
1020 /* add my name */
1021 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
1022 ORTE_ERROR_LOG(rc);
1023 goto error;
1024 }
1025 /* pack the payload */
1026 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->bptr, 1, OPAL_BUFFER))) {
1027 ORTE_ERROR_LOG(rc);
1028 goto error;
1029 }
1030 /* send it */
1031 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1032 ORTE_PROC_MY_DAEMON, buffer,
1033 ORTE_RML_TAG_DFS_CMD,
1034 orte_rml_send_callback, NULL))) {
1035 ORTE_ERROR_LOG(rc);
1036 goto error;
1037 }
1038 return;
1039
1040 error:
1041 OBJ_RELEASE(buffer);
1042 opal_list_remove_item(&requests, &dfs->super);
1043 if (NULL != dfs->post_cbfunc) {
1044 dfs->post_cbfunc(dfs->cbdata);
1045 }
1046 OBJ_RELEASE(dfs);
1047 }
1048
/*
 * Public "post file map" entry point: wrap the caller's buffer @bo and the
 * callback in a post request and hand it to process_posts() on the ORTE
 * event base. Only the pointer to @bo is stored, not a copy.
 */
static void dfs_post_file_map(opal_buffer_t *bo,
                              orte_dfs_post_callback_fn_t cbfunc,
                              void *cbdata)
{
    orte_dfs_request_t *req = OBJ_NEW(orte_dfs_request_t);

    req->cmd = ORTE_DFS_POST_CMD;
    req->post_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->bptr = bo;

    ORTE_THREADSHIFT(req, orte_event_base, process_posts, ORTE_SYS_PRI);
}
1064
process_getfm(int fd,short args,void * cbdata)1065 static void process_getfm(int fd, short args, void *cbdata)
1066 {
1067 orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
1068 opal_buffer_t *buffer;
1069 int rc;
1070
1071 ORTE_ACQUIRE_OBJECT(dfs);
1072
1073 /* we will get confirmation in our receive function, so
1074 * add this request to our list */
1075 dfs->id = req_id++;
1076 opal_list_append(&requests, &dfs->super);
1077
1078 /* Send the request to our local daemon */
1079 buffer = OBJ_NEW(opal_buffer_t);
1080 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1081 ORTE_ERROR_LOG(rc);
1082 goto error;
1083 }
1084 /* include the request id */
1085 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
1086 ORTE_ERROR_LOG(rc);
1087 goto error;
1088 }
1089 /* and the target */
1090 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->target, 1, ORTE_NAME))) {
1091 ORTE_ERROR_LOG(rc);
1092 goto error;
1093 }
1094 /* send it */
1095 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1096 ORTE_PROC_MY_DAEMON, buffer,
1097 ORTE_RML_TAG_DFS_CMD,
1098 orte_rml_send_callback, NULL))) {
1099 ORTE_ERROR_LOG(rc);
1100 goto error;
1101 }
1102 return;
1103
1104 error:
1105 OBJ_RELEASE(buffer);
1106 opal_list_remove_item(&requests, &dfs->super);
1107 if (NULL != dfs->fm_cbfunc) {
1108 dfs->fm_cbfunc(NULL, dfs->cbdata);
1109 }
1110 OBJ_RELEASE(dfs);
1111 }
1112
/*
 * Public "get file map" entry point: copy the name of @target into a
 * get-file-map request together with the callback and hand it to
 * process_getfm() on the ORTE event base.
 */
static void dfs_get_file_map(orte_process_name_t *target,
                             orte_dfs_fm_callback_fn_t cbfunc,
                             void *cbdata)
{
    orte_dfs_request_t *req = OBJ_NEW(orte_dfs_request_t);

    req->cmd = ORTE_DFS_GETFM_CMD;
    req->fm_cbfunc = cbfunc;
    req->cbdata = cbdata;
    /* copy the name so the caller's struct need not outlive this call */
    req->target.vpid = target->vpid;
    req->target.jobid = target->jobid;

    ORTE_THREADSHIFT(req, orte_event_base, process_getfm, ORTE_SYS_PRI);
}
1129
/*
 * No-op in this module: it keeps no file maps, so @jobid and @bo are
 * ignored and completion is reported immediately through @cbfunc, if given.
 */
static void dfs_load_file_maps(orte_jobid_t jobid,
                               opal_buffer_t *bo,
                               orte_dfs_load_callback_fn_t cbfunc,
                               void *cbdata)
{
    if (NULL == cbfunc) {
        return;
    }
    cbfunc(cbdata);
}
1140
/*
 * No-op in this module: there are no stored file maps to purge, so @jobid
 * is ignored and completion is reported immediately through @cbfunc, if
 * given.
 */
static void dfs_purge_file_maps(orte_jobid_t jobid,
                                orte_dfs_purge_callback_fn_t cbfunc,
                                void *cbdata)
{
    if (NULL == cbfunc) {
        return;
    }
    cbfunc(cbdata);
}
1150