1 /*
2  * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
3  *                         All rights reserved.
4  * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
5  * Copyright (c) 2014      Research Organization for Information Science
6  *                         and Technology (RIST). All rights reserved.
7  * $COPYRIGHT$
8  *
9  * Additional copyrights may follow
10  *
11  * $HEADER$
12  */
13 
#include "orte_config.h"

#include <sys/types.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif  /* HAVE_UNISTD_H */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include <sys/stat.h>

#include "opal/util/if.h"
#include "opal/util/output.h"
#include "opal/util/uri.h"
#include "opal/dss/dss.h"
#include "opal/mca/pmix/pmix.h"

#include "orte/util/error_strings.h"
#include "orte/util/name_fns.h"
#include "orte/util/proc_info.h"
#include "orte/util/show_help.h"
#include "orte/util/threads.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"

#include "orte/mca/dfs/base/base.h"
#include "dfs_app.h"
43 
44 /*
45  * Module functions: Global
46  */
/* Forward declarations of the file-local functions that populate the
 * orte_dfs_app_module table below.  Each of the dfs_* entry points takes
 * a completion callback plus an opaque cbdata pointer that is handed
 * back to that callback.
 */

/* module setup / teardown */
static int init(void);
static int finalize(void);

/* file operations */
static void dfs_open(char *uri,
                     orte_dfs_open_callback_fn_t cbfunc,
                     void *cbdata);
static void dfs_close(int fd,
                      orte_dfs_close_callback_fn_t cbfunc,
                      void *cbdata);
static void dfs_get_file_size(int fd,
                              orte_dfs_size_callback_fn_t cbfunc,
                              void *cbdata);
static void dfs_seek(int fd, long offset, int whence,
                     orte_dfs_seek_callback_fn_t cbfunc,
                     void *cbdata);
static void dfs_read(int fd, uint8_t *buffer,
                     long length,
                     orte_dfs_read_callback_fn_t cbfunc,
                     void *cbdata);

/* file-map operations */
static void dfs_post_file_map(opal_buffer_t *bo,
                              orte_dfs_post_callback_fn_t cbfunc,
                              void *cbdata);
static void dfs_get_file_map(orte_process_name_t *target,
                             orte_dfs_fm_callback_fn_t cbfunc,
                             void *cbdata);
static void dfs_load_file_maps(orte_jobid_t jobid,
                               opal_buffer_t *bo,
                               orte_dfs_load_callback_fn_t cbfunc,
                               void *cbdata);
static void dfs_purge_file_maps(orte_jobid_t jobid,
                                orte_dfs_purge_callback_fn_t cbfunc,
                                void *cbdata);
79 
80 /******************
81  * APP module
82  ******************/
/* Function table exported by this component.  The initializer is
 * positional, so the order of the entries must follow the member order
 * of orte_dfs_base_module_t (declared outside this file).
 */
orte_dfs_base_module_t orte_dfs_app_module = {
    init,
    finalize,
    dfs_open,
    dfs_close,
    dfs_get_file_size,
    dfs_seek,
    dfs_read,
    dfs_post_file_map,
    dfs_get_file_map,
    dfs_load_file_maps,
    dfs_purge_file_maps
};
96 
/* requests:     requests that were sent to a daemon and still await a
 *               reply; recv_dfs matches replies to them by request id.
 * active_files: one orte_dfs_tracker_t per file currently open.
 */
static opal_list_t requests, active_files;
/* next local file descriptor to hand out to a caller on a successful open */
static int local_fd = 0;
/* next request id; assigned to a request right before it is queued */
static uint64_t req_id = 0;
/* handler for messages arriving on ORTE_RML_TAG_DFS_DATA (registered in init) */
static void recv_dfs(int status, orte_process_name_t* sender,
                     opal_buffer_t* buffer, orte_rml_tag_t tag,
                     void* cbdata);
103 
init(void)104 static int init(void)
105 {
106     OBJ_CONSTRUCT(&requests, opal_list_t);
107     OBJ_CONSTRUCT(&active_files, opal_list_t);
108     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
109                             ORTE_RML_TAG_DFS_DATA,
110                             ORTE_RML_PERSISTENT,
111                             recv_dfs,
112                             NULL);
113     return ORTE_SUCCESS;
114 }
115 
finalize(void)116 static int finalize(void)
117 {
118     opal_list_item_t *item;
119 
120     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_DATA);
121     while (NULL != (item = opal_list_remove_first(&requests))) {
122         OBJ_RELEASE(item);
123     }
124     OBJ_DESTRUCT(&requests);
125     while (NULL != (item = opal_list_remove_first(&active_files))) {
126         OBJ_RELEASE(item);
127     }
128     OBJ_DESTRUCT(&active_files);
129     return ORTE_SUCCESS;
130 }
131 
132 /* receives take place in an event, so we are free to process
133  * the request list without fear of getting things out-of-order
134  */
recv_dfs(int status,orte_process_name_t * sender,opal_buffer_t * buffer,orte_rml_tag_t tag,void * cbdata)135 static void recv_dfs(int status, orte_process_name_t* sender,
136                      opal_buffer_t* buffer, orte_rml_tag_t tag,
137                      void* cbdata)
138 {
139     orte_dfs_cmd_t cmd;
140     int32_t cnt;
141     orte_dfs_request_t *dfs, *dptr;
142     opal_list_item_t *item;
143     int remote_fd, rc;
144     int64_t i64;
145     uint64_t rid;
146     orte_dfs_tracker_t *trk;
147 
148     /* unpack the command this message is responding to */
149     cnt = 1;
150     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &cmd, &cnt, ORTE_DFS_CMD_T))) {
151         ORTE_ERROR_LOG(rc);
152         return;
153     }
154 
155     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
156                         "%s recvd cmd %d from sender %s",
157                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)cmd,
158                         ORTE_NAME_PRINT(sender));
159 
160     switch (cmd) {
161     case ORTE_DFS_OPEN_CMD:
162         /* unpack the request id */
163         cnt = 1;
164         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
165             ORTE_ERROR_LOG(rc);
166             return;
167         }
168         /* unpack the remote fd */
169         cnt = 1;
170         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &remote_fd, &cnt, OPAL_INT))) {
171             ORTE_ERROR_LOG(rc);
172             return;
173         }
174         /* search our list of requests to find the matching one */
175         dfs = NULL;
176         for (item = opal_list_get_first(&requests);
177              item != opal_list_get_end(&requests);
178              item = opal_list_get_next(item)) {
179             dptr = (orte_dfs_request_t*)item;
180             if (dptr->id == rid) {
181                 /* as the request has been fulfilled, remove it */
182                 opal_list_remove_item(&requests, item);
183                 dfs = dptr;
184                 break;
185             }
186         }
187         if (NULL == dfs) {
188             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
189                                 "%s recvd open file - no corresponding request found for local fd %d",
190                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
191             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
192             return;
193         }
194 
195         /* if the remote_fd < 0, then we had an error, so return
196          * the error value to the caller
197          */
198         if (remote_fd < 0) {
199             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
200                                 "%s recvd open file response error file %s [error: %d]",
201                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
202                                 dfs->uri, remote_fd);
203             if (NULL != dfs->open_cbfunc) {
204                 dfs->open_cbfunc(remote_fd, dfs->cbdata);
205             }
206             /* release the request */
207             OBJ_RELEASE(dfs);
208             return;
209         }
210         /* otherwise, create a tracker for this file */
211         trk = OBJ_NEW(orte_dfs_tracker_t);
212         trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
213         trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
214         trk->host_daemon.jobid = sender->jobid;
215         trk->host_daemon.vpid = sender->vpid;
216         trk->uri = strdup(dfs->uri);
217         /* break the uri down into scheme and filename */
218         trk->scheme = opal_uri_get_scheme(dfs->uri);
219         trk->filename = opal_filename_from_uri(dfs->uri, NULL);
220         /* define the local fd */
221         trk->local_fd = local_fd++;
222         /* record the remote file descriptor */
223         trk->remote_fd = remote_fd;
224         /* add it to our list of active files */
225         opal_list_append(&active_files, &trk->super);
226         /* return the local_fd to the caller for
227          * subsequent operations
228          */
229         opal_output_verbose(1, orte_dfs_base_framework.framework_output,
230                             "%s recvd open file completed for file %s [local fd: %d remote fd: %d]",
231                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
232                             dfs->uri, trk->local_fd, remote_fd);
233         if (NULL != dfs->open_cbfunc) {
234             dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
235         }
236         /* release the request */
237         OBJ_RELEASE(dfs);
238         break;
239 
240     case ORTE_DFS_SIZE_CMD:
241         /* unpack the request id for this request */
242         cnt = 1;
243         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
244             ORTE_ERROR_LOG(rc);
245             return;
246         }
247         /* search our list of requests to find the matching one */
248         dfs = NULL;
249         for (item = opal_list_get_first(&requests);
250              item != opal_list_get_end(&requests);
251              item = opal_list_get_next(item)) {
252             dptr = (orte_dfs_request_t*)item;
253             if (dptr->id == rid) {
254                 /* request was fulfilled, so remove it */
255                 opal_list_remove_item(&requests, item);
256                 dfs = dptr;
257                 break;
258             }
259         }
260         if (NULL == dfs) {
261             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
262                                 "%s recvd size - no corresponding request found for local fd %d",
263                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
264             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
265             return;
266         }
267         /* get the size */
268         cnt = 1;
269         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
270             ORTE_ERROR_LOG(rc);
271             OBJ_RELEASE(dfs);
272             return;
273         }
274         /* pass it back to the original caller */
275         if (NULL != dfs->size_cbfunc) {
276             dfs->size_cbfunc(i64, dfs->cbdata);
277         }
278         /* release the request */
279         OBJ_RELEASE(dfs);
280         break;
281 
282     case ORTE_DFS_SEEK_CMD:
283         /* unpack the request id for this read */
284         cnt = 1;
285         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
286             ORTE_ERROR_LOG(rc);
287             return;
288         }
289         /* search our list of requests to find the matching one */
290         dfs = NULL;
291         for (item = opal_list_get_first(&requests);
292              item != opal_list_get_end(&requests);
293              item = opal_list_get_next(item)) {
294             dptr = (orte_dfs_request_t*)item;
295             if (dptr->id == rid) {
296                 /* request was fulfilled, so remove it */
297                 opal_list_remove_item(&requests, item);
298                 dfs = dptr;
299                 break;
300             }
301         }
302         if (NULL == dfs) {
303             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
304                                 "%s recvd seek - no corresponding request found for local fd %d",
305                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
306             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
307             return;
308         }
309         /* get the returned offset/status */
310         cnt = 1;
311         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
312             ORTE_ERROR_LOG(rc);
313             OBJ_RELEASE(dfs);
314             return;
315         }
316         /* pass it back to the original caller */
317         if (NULL != dfs->seek_cbfunc) {
318             dfs->seek_cbfunc(i64, dfs->cbdata);
319         }
320         /* release the request */
321         OBJ_RELEASE(dfs);
322         break;
323 
324     case ORTE_DFS_READ_CMD:
325         /* unpack the request id for this read */
326         cnt = 1;
327         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
328             ORTE_ERROR_LOG(rc);
329             return;
330         }
331         /* search our list of requests to find the matching one */
332         dfs = NULL;
333         for (item = opal_list_get_first(&requests);
334              item != opal_list_get_end(&requests);
335              item = opal_list_get_next(item)) {
336             dptr = (orte_dfs_request_t*)item;
337             if (dptr->id == rid) {
338                 /* request was fulfilled, so remove it */
339                 opal_list_remove_item(&requests, item);
340                 dfs = dptr;
341                 break;
342             }
343         }
344         if (NULL == dfs) {
345             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
346                                 "%s recvd read - no corresponding request found for local fd %d",
347                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
348             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
349             return;
350         }
351         /* get the bytes read */
352         cnt = 1;
353         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
354             ORTE_ERROR_LOG(rc);
355             OBJ_RELEASE(dfs);
356             return;
357         }
358         if (0 < i64) {
359             cnt = i64;
360             if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, dfs->read_buffer, &cnt, OPAL_UINT8))) {
361                 ORTE_ERROR_LOG(rc);
362                 OBJ_RELEASE(dfs);
363                 return;
364             }
365         }
366         /* pass them back to the original caller */
367         if (NULL != dfs->read_cbfunc) {
368             dfs->read_cbfunc(i64, dfs->read_buffer, dfs->cbdata);
369         }
370         /* release the request */
371         OBJ_RELEASE(dfs);
372         break;
373 
374     case ORTE_DFS_POST_CMD:
375         /* unpack the request id for this read */
376         cnt = 1;
377         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
378             ORTE_ERROR_LOG(rc);
379             return;
380         }
381         /* search our list of requests to find the matching one */
382         dfs = NULL;
383         for (item = opal_list_get_first(&requests);
384              item != opal_list_get_end(&requests);
385              item = opal_list_get_next(item)) {
386             dptr = (orte_dfs_request_t*)item;
387             if (dptr->id == rid) {
388                 /* request was fulfilled, so remove it */
389                 opal_list_remove_item(&requests, item);
390                 dfs = dptr;
391                 break;
392             }
393         }
394         if (NULL == dfs) {
395             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
396                                 "%s recvd post - no corresponding request found",
397                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
398             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
399             return;
400         }
401         if (NULL != dfs->post_cbfunc) {
402             dfs->post_cbfunc(dfs->cbdata);
403         }
404         OBJ_RELEASE(dfs);
405         break;
406 
407     case ORTE_DFS_GETFM_CMD:
408         /* unpack the request id for this read */
409         cnt = 1;
410         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
411             ORTE_ERROR_LOG(rc);
412             return;
413         }
414         /* search our list of requests to find the matching one */
415         dfs = NULL;
416         for (item = opal_list_get_first(&requests);
417              item != opal_list_get_end(&requests);
418              item = opal_list_get_next(item)) {
419             dptr = (orte_dfs_request_t*)item;
420             if (dptr->id == rid) {
421                 /* request was fulfilled, so remove it */
422                 opal_list_remove_item(&requests, item);
423                 dfs = dptr;
424                 break;
425             }
426         }
427         if (NULL == dfs) {
428             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
429                                 "%s recvd getfm - no corresponding request found",
430                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
431             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
432             return;
433         }
434         /* return it to caller */
435         if (NULL != dfs->fm_cbfunc) {
436             dfs->fm_cbfunc(buffer, dfs->cbdata);
437         }
438         OBJ_RELEASE(dfs);
439         break;
440 
441     default:
442         opal_output(0, "APP:DFS:RECV WTF");
443         break;
444     }
445 }
446 
/*
 * Open, read-only, the file named by dfs->uri on the local filesystem
 * and register a tracker for it on the active_files list.
 *
 * The request's open callback (if set) is always invoked: with the new
 * local fd on success, with -1 if the uri yields no filename, or with
 * the negative value returned by open() on failure.
 *
 * The request itself is never released here - every path leaves that
 * to the calling routine.
 */
static void open_local_file(orte_dfs_request_t *dfs)
{
    char *filename;
    orte_dfs_tracker_t *trk;

    /* extract the filename from the uri */
    if (NULL == (filename = opal_filename_from_uri(dfs->uri, NULL))) {
        /* something wrong - error was reported, so just get out */
        if (NULL != dfs->open_cbfunc) {
            dfs->open_cbfunc(-1, dfs->cbdata);
        }
        /* do not release the request: the caller releases it after we
         * return, so releasing here as well would be a double release
         */
        return;
    }
    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s opening local file %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        filename);
    /* attempt to open the file */
    if (0 > (dfs->remote_fd = open(filename, O_RDONLY))) {
        ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
        if (NULL != dfs->open_cbfunc) {
            dfs->open_cbfunc(dfs->remote_fd, dfs->cbdata);
        }
        /* the filename string is ours to free */
        free(filename);
        return;
    }
    /* otherwise, create a tracker for this file */
    trk = OBJ_NEW(orte_dfs_tracker_t);
    trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
    trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
    /* the file is locally hosted */
    trk->host_daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
    trk->host_daemon.vpid = ORTE_PROC_MY_DAEMON->vpid;
    trk->uri = strdup(dfs->uri);
    /* break the uri down into scheme and filename */
    trk->scheme = opal_uri_get_scheme(dfs->uri);
    /* the tracker takes ownership of the filename string, which avoids
     * both the extra copy and the leak of the original
     */
    trk->filename = filename;
    /* define the local fd */
    trk->local_fd = local_fd++;
    /* for a local file the "remote" fd is the descriptor from open() */
    trk->remote_fd = dfs->remote_fd;
    /* add it to our list of active files */
    opal_list_append(&active_files, &trk->super);
    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s local file %s mapped localfd %d to remotefd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        trk->filename, trk->local_fd, trk->remote_fd);
    /* let the caller know */
    if (NULL != dfs->open_cbfunc) {
        dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
    }
    /* request will be released by the calling routine */
}
500 
process_opens(int fd,short args,void * cbdata)501 static void process_opens(int fd, short args, void *cbdata)
502 {
503     orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
504     int rc;
505     opal_buffer_t *buffer;
506     char *scheme, *host, *filename;
507     orte_process_name_t daemon;
508     opal_list_t lt;
509     opal_namelist_t *nm;
510 
511     ORTE_ACQUIRE_OBJECT(dfs);
512 
513     /* get the scheme to determine if we can process locally or not */
514     if (NULL == (scheme = opal_uri_get_scheme(dfs->uri))) {
515         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
516         goto complete;
517     }
518 
519     if (0 == strcmp(scheme, "nfs")) {
520         open_local_file(dfs);
521         /* the callback was done in the above function */
522         OBJ_RELEASE(dfs);
523         return;
524     }
525 
526     if (0 != strcmp(scheme, "file")) {
527         /* not yet supported */
528         orte_show_help("orte_dfs_help.txt", "unsupported-filesystem",
529                        true, dfs->uri);
530         goto complete;
531     }
532 
533     /* dissect the uri to extract host and filename/path */
534     if (NULL == (filename = opal_filename_from_uri(dfs->uri, &host))) {
535         goto complete;
536     }
537     if (NULL == host) {
538         host = strdup(orte_process_info.nodename);
539     }
540 
541     /* if the host is our own, then treat it as a local file */
542     if (orte_ifislocal(host)) {
543         opal_output_verbose(1, orte_dfs_base_framework.framework_output,
544                             "%s file %s on local host",
545                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
546                             filename);
547         open_local_file(dfs);
548         /* the callback was done in the above function */
549         OBJ_RELEASE(dfs);
550         return;
551     }
552 
553     /* ident the daemon on that host */
554     daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
555     /* fetch the daemon for this hostname */
556     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
557                         "%s looking for daemon on host %s",
558                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), host);
559     OBJ_CONSTRUCT(&lt, opal_list_t);
560     if (ORTE_SUCCESS != (rc = opal_pmix.resolve_peers(host, daemon.jobid, &lt))) {
561         ORTE_ERROR_LOG(rc);
562         OBJ_DESTRUCT(&lt);
563         goto complete;
564     }
565     nm = (opal_namelist_t*)opal_list_get_first(&lt);
566     daemon.vpid = nm->name.vpid;
567     OPAL_LIST_DESTRUCT(&lt);
568 
569     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
570                         "%s file %s on host %s daemon %s",
571                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
572                         filename, host, ORTE_NAME_PRINT(&daemon));
573 
574     /* double-check: if it is our local daemon, then we
575      * treat this as local
576      */
577     if (daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
578         opal_output_verbose(1, orte_dfs_base_framework.framework_output,
579                             "%s local file %s on same daemon",
580                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
581                             filename);
582         open_local_file(dfs);
583         /* the callback was done in the above function */
584         OBJ_RELEASE(dfs);
585         return;
586     }
587 
588     /* add this request to our local list so we can
589      * match it with the returned response when it comes
590      */
591     dfs->id = req_id++;
592     opal_list_append(&requests, &dfs->super);
593 
594     /* setup a message for the daemon telling
595      * them what file we want to access
596      */
597     buffer = OBJ_NEW(opal_buffer_t);
598     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
599         ORTE_ERROR_LOG(rc);
600         opal_list_remove_item(&requests, &dfs->super);
601         goto complete;
602     }
603     /* pass the request id */
604     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
605         ORTE_ERROR_LOG(rc);
606         opal_list_remove_item(&requests, &dfs->super);
607         goto complete;
608     }
609     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &filename, 1, OPAL_STRING))) {
610         ORTE_ERROR_LOG(rc);
611         opal_list_remove_item(&requests, &dfs->super);
612         goto complete;
613     }
614 
615     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
616                         "%s sending open file request to %s file %s",
617                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
618                         ORTE_NAME_PRINT(&daemon),
619                         filename);
620     /* send it */
621     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
622                                           &daemon, buffer,
623                                           ORTE_RML_TAG_DFS_CMD,
624                                           orte_rml_send_callback, NULL))) {
625         ORTE_ERROR_LOG(rc);
626         OBJ_RELEASE(buffer);
627         opal_list_remove_item(&requests, &dfs->super);
628         goto complete;
629     }
630     /* don't release it */
631     return;
632 
633  complete:
634     /* we get here if an error occurred - execute any
635      * pending callback so the proc doesn't hang
636      */
637     if (NULL != dfs->open_cbfunc) {
638         dfs->open_cbfunc(-1, dfs->cbdata);
639     }
640     OBJ_RELEASE(dfs);
641 }
642 
643 
644 /* in order to handle the possible opening/reading of files by
645  * multiple threads, we have to ensure that all operations are
646  * carried out in events - so the "open" cmd simply posts an
647  * event containing the required info, and then returns
648  */
/*
 * Module entry point for opening a file.  Nothing is opened here: the
 * uri (copied), the callback and its cbdata are captured in a request
 * object which is then shifted to process_opens() on orte_event_base.
 */
static void dfs_open(char *uri,
                     orte_dfs_open_callback_fn_t cbfunc,
                     void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s opening file %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uri);

    /* describe the operation in a fresh request */
    req = OBJ_NEW(orte_dfs_request_t);
    req->open_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->uri = strdup(uri);
    req->cmd = ORTE_DFS_OPEN_CMD;

    /* hand it to the event base for processing */
    ORTE_THREADSHIFT(req, orte_event_base, process_opens, ORTE_SYS_PRI);
}
669 
process_close(int fd,short args,void * cbdata)670 static void process_close(int fd, short args, void *cbdata)
671 {
672     orte_dfs_request_t *close_dfs = (orte_dfs_request_t*)cbdata;
673     orte_dfs_tracker_t *tptr, *trk;
674     opal_list_item_t *item;
675     opal_buffer_t *buffer;
676     int rc;
677 
678     ORTE_ACQUIRE_OBJECT(close_dfs);
679 
680     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
681                         "%s closing fd %d",
682                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
683                         close_dfs->local_fd);
684 
685     /* look in our local records for this fd */
686     trk = NULL;
687     for (item = opal_list_get_first(&active_files);
688          item != opal_list_get_end(&active_files);
689          item = opal_list_get_next(item)) {
690         tptr = (orte_dfs_tracker_t*)item;
691         if (tptr->local_fd == close_dfs->local_fd) {
692             trk = tptr;
693             break;
694         }
695     }
696     if (NULL == trk) {
697         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
698         if (NULL != close_dfs->close_cbfunc) {
699             close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
700         }
701         OBJ_RELEASE(close_dfs);
702         return;
703     }
704 
705     /* if the file is local, close it */
706     if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
707         close(trk->remote_fd);
708         goto complete;
709     }
710 
711     /* setup a message for the daemon telling
712      * them what file to close
713      */
714     buffer = OBJ_NEW(opal_buffer_t);
715     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &close_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
716         ORTE_ERROR_LOG(rc);
717         goto complete;
718     }
719     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
720         ORTE_ERROR_LOG(rc);
721         goto complete;
722     }
723 
724     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
725                         "%s sending close file request to %s for fd %d",
726                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
727                         ORTE_NAME_PRINT(&trk->host_daemon),
728                         trk->local_fd);
729     /* send it */
730     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
731                                           &trk->host_daemon, buffer,
732                                           ORTE_RML_TAG_DFS_CMD,
733                                           orte_rml_send_callback, NULL))) {
734         ORTE_ERROR_LOG(rc);
735         OBJ_RELEASE(buffer);
736         goto complete;
737     }
738 
739  complete:
740     opal_list_remove_item(&active_files, &trk->super);
741     OBJ_RELEASE(trk);
742     if (NULL != close_dfs->close_cbfunc) {
743         close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
744     }
745     OBJ_RELEASE(close_dfs);
746 }
747 
/*
 * Module entry point for closing a file.  The local fd, the callback
 * and its cbdata are captured in a request object which is then
 * shifted to process_close() on orte_event_base.
 */
static void dfs_close(int fd,
                      orte_dfs_close_callback_fn_t cbfunc,
                      void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s close called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    /* describe the operation in a fresh request */
    req = OBJ_NEW(orte_dfs_request_t);
    req->close_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->local_fd = fd;
    req->cmd = ORTE_DFS_CLOSE_CMD;

    /* hand it to the event base for processing */
    ORTE_THREADSHIFT(req, orte_event_base, process_close, ORTE_SYS_PRI);
}
767 
process_sizes(int fd,short args,void * cbdata)768 static void process_sizes(int fd, short args, void *cbdata)
769 {
770     orte_dfs_request_t *size_dfs = (orte_dfs_request_t*)cbdata;
771     orte_dfs_tracker_t *tptr, *trk;
772     opal_list_item_t *item;
773     opal_buffer_t *buffer;
774     int rc;
775     struct stat buf;
776 
777     ORTE_ACQUIRE_OBJECT(size_dfs);
778 
779     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
780                         "%s processing get_size on fd %d",
781                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
782                         size_dfs->local_fd);
783 
784     /* look in our local records for this fd */
785     trk = NULL;
786     for (item = opal_list_get_first(&active_files);
787          item != opal_list_get_end(&active_files);
788          item = opal_list_get_next(item)) {
789         tptr = (orte_dfs_tracker_t*)item;
790         if (tptr->local_fd == size_dfs->local_fd) {
791             trk = tptr;
792             break;
793         }
794     }
795     if (NULL == trk) {
796         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
797         OBJ_RELEASE(size_dfs);
798         return;
799     }
800 
801     /* if the file is local, execute the seek on it - we
802      * stuck the "whence" value in the remote_fd
803      */
804     if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
805         /* stat the file and get its size */
806         if (0 > stat(trk->filename, &buf)) {
807             /* cannot stat file */
808             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
809                                 "%s could not stat %s",
810                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
811                                 trk->filename);
812             if (NULL != size_dfs->size_cbfunc) {
813                 size_dfs->size_cbfunc(-1, size_dfs->cbdata);
814             }
815         } else {
816             if (NULL != size_dfs->size_cbfunc) {
817                 size_dfs->size_cbfunc(buf.st_size, size_dfs->cbdata);
818             }
819         }
820         goto complete;
821     }
822     /* add this request to our local list so we can
823      * match it with the returned response when it comes
824      */
825     size_dfs->id = req_id++;
826     opal_list_append(&requests, &size_dfs->super);
827 
828     /* setup a message for the daemon telling
829      * them what file we want to access
830      */
831     buffer = OBJ_NEW(opal_buffer_t);
832     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
833         ORTE_ERROR_LOG(rc);
834         opal_list_remove_item(&requests, &size_dfs->super);
835         goto complete;
836     }
837     /* pass the request id */
838     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->id, 1, OPAL_UINT64))) {
839         ORTE_ERROR_LOG(rc);
840         opal_list_remove_item(&requests, &size_dfs->super);
841         goto complete;
842     }
843     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
844         ORTE_ERROR_LOG(rc);
845         opal_list_remove_item(&requests, &size_dfs->super);
846         goto complete;
847     }
848 
849     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
850                         "%s sending get_size request to %s for fd %d",
851                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
852                         ORTE_NAME_PRINT(&trk->host_daemon),
853                         trk->local_fd);
854     /* send it */
855     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
856                                           &trk->host_daemon, buffer,
857                                           ORTE_RML_TAG_DFS_CMD,
858                                           orte_rml_send_callback, NULL))) {
859         ORTE_ERROR_LOG(rc);
860         OBJ_RELEASE(buffer);
861         opal_list_remove_item(&requests, &size_dfs->super);
862         if (NULL != size_dfs->size_cbfunc) {
863             size_dfs->size_cbfunc(-1, size_dfs->cbdata);
864         }
865         goto complete;
866     }
867     /* leave the request there */
868     return;
869 
870  complete:
871     OBJ_RELEASE(size_dfs);
872 }
873 
/*
 * Module function: request the size of the file known locally as "fd".
 *
 * Builds an ORTE_DFS_SIZE_CMD request carrying the fd, the callback and
 * its data, then thread-shifts it to process_sizes() on orte_event_base.
 *
 * fd     - local file descriptor of the file
 * cbfunc - size callback stored in the request
 * cbdata - opaque pointer stored alongside the callback
 */
static void dfs_get_file_size(int fd,
                              orte_dfs_size_callback_fn_t cbfunc,
                              void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s get_size called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    /* describe what is being asked for */
    req = OBJ_NEW(orte_dfs_request_t);
    req->cbdata = cbdata;
    req->size_cbfunc = cbfunc;
    req->local_fd = fd;
    req->cmd = ORTE_DFS_SIZE_CMD;

    /* hand the request over to the event base */
    ORTE_THREADSHIFT(req, orte_event_base, process_sizes, ORTE_SYS_PRI);
}
893 
894 
process_seeks(int fd,short args,void * cbdata)895 static void process_seeks(int fd, short args, void *cbdata)
896 {
897     orte_dfs_request_t *seek_dfs = (orte_dfs_request_t*)cbdata;
898     orte_dfs_tracker_t *tptr, *trk;
899     opal_list_item_t *item;
900     opal_buffer_t *buffer;
901     int64_t i64;
902     int rc;
903     struct stat buf;
904 
905     ORTE_ACQUIRE_OBJECT(seek_dfs);
906 
907     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
908                         "%s processing seek on fd %d",
909                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
910                         seek_dfs->local_fd);
911 
912     /* look in our local records for this fd */
913     trk = NULL;
914     for (item = opal_list_get_first(&active_files);
915          item != opal_list_get_end(&active_files);
916          item = opal_list_get_next(item)) {
917         tptr = (orte_dfs_tracker_t*)item;
918         if (tptr->local_fd == seek_dfs->local_fd) {
919             trk = tptr;
920             break;
921         }
922     }
923     if (NULL == trk) {
924         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
925         OBJ_RELEASE(seek_dfs);
926         return;
927     }
928 
929     /* if the file is local, execute the seek on it - we
930      * stuck the "whence" value in the remote_fd
931      */
932     if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
933         opal_output_verbose(1, orte_dfs_base_framework.framework_output,
934                             "%s local seek on fd %d",
935                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
936                             seek_dfs->local_fd);
937         /* stat the file and get its size */
938         if (0 > stat(trk->filename, &buf)) {
939             /* cannot stat file */
940             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
941                                 "%s could not stat %s",
942                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
943                                 trk->filename);
944             if (NULL != seek_dfs->seek_cbfunc) {
945                 seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
946             }
947         } else if (buf.st_size < seek_dfs->read_length &&
948                    SEEK_SET == seek_dfs->remote_fd) {
949             /* seek would take us past EOF */
950             if (NULL != seek_dfs->seek_cbfunc) {
951                 seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
952             }
953         } else if (buf.st_size < (off_t)(trk->location + seek_dfs->read_length) &&
954                    SEEK_CUR == seek_dfs->remote_fd) {
955             /* seek would take us past EOF */
956             if (NULL != seek_dfs->seek_cbfunc) {
957                 seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
958             }
959         } else {
960             lseek(trk->remote_fd, seek_dfs->read_length, seek_dfs->remote_fd);
961             if (SEEK_SET == seek_dfs->remote_fd) {
962                 trk->location = seek_dfs->read_length;
963             } else {
964                 trk->location += seek_dfs->read_length;
965             }
966             if (NULL != seek_dfs->seek_cbfunc) {
967                 seek_dfs->seek_cbfunc(seek_dfs->read_length, seek_dfs->cbdata);
968             }
969         }
970         goto complete;
971     }
972     /* add this request to our local list so we can
973      * match it with the returned response when it comes
974      */
975     seek_dfs->id = req_id++;
976     opal_list_append(&requests, &seek_dfs->super);
977 
978     /* setup a message for the daemon telling
979      * them what file to seek
980      */
981     buffer = OBJ_NEW(opal_buffer_t);
982     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
983         ORTE_ERROR_LOG(rc);
984         goto complete;
985     }
986     /* pass the request id */
987     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->id, 1, OPAL_UINT64))) {
988         ORTE_ERROR_LOG(rc);
989         opal_list_remove_item(&requests, &seek_dfs->super);
990         goto complete;
991     }
992     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
993         ORTE_ERROR_LOG(rc);
994         goto complete;
995     }
996     i64 = (int64_t)seek_dfs->read_length;
997     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
998         ORTE_ERROR_LOG(rc);
999         goto complete;
1000     }
1001     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->remote_fd, 1, OPAL_INT))) {
1002         ORTE_ERROR_LOG(rc);
1003         goto complete;
1004     }
1005 
1006     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
1007                         "%s sending seek file request to %s for fd %d",
1008                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1009                         ORTE_NAME_PRINT(&trk->host_daemon),
1010                         trk->local_fd);
1011     /* send it */
1012     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1013                                           &trk->host_daemon, buffer,
1014                                           ORTE_RML_TAG_DFS_CMD,
1015                                           orte_rml_send_callback, NULL))) {
1016         ORTE_ERROR_LOG(rc);
1017         OBJ_RELEASE(buffer);
1018         goto complete;
1019     }
1020     /* leave the request */
1021     return;
1022 
1023  complete:
1024     OBJ_RELEASE(seek_dfs);
1025 }
1026 
1027 
/*
 * Module function: request a seek on the file known locally as "fd".
 *
 * Builds an ORTE_DFS_SEEK_CMD request and thread-shifts it to
 * process_seeks() on orte_event_base. The request has no dedicated
 * fields for seek arguments, so the offset is carried in read_length
 * and the whence value in remote_fd.
 *
 * fd     - local file descriptor of the file
 * offset - seek offset
 * whence - seek origin, passed through to the seek processing
 * cbfunc - seek callback stored in the request
 * cbdata - opaque pointer stored alongside the callback
 */
static void dfs_seek(int fd, long offset, int whence,
                     orte_dfs_seek_callback_fn_t cbfunc,
                     void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s seek called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    req = OBJ_NEW(orte_dfs_request_t);
    req->cmd = ORTE_DFS_SEEK_CMD;
    req->local_fd = fd;
    /* seek arguments ride in fields borrowed from other commands */
    req->remote_fd = whence;
    req->read_length = offset;
    req->cbdata = cbdata;
    req->seek_cbfunc = cbfunc;

    /* hand the request over to the event base */
    ORTE_THREADSHIFT(req, orte_event_base, process_seeks, ORTE_SYS_PRI);
}
1049 
process_reads(int fd,short args,void * cbdata)1050 static void process_reads(int fd, short args, void *cbdata)
1051 {
1052     orte_dfs_request_t *read_dfs = (orte_dfs_request_t*)cbdata;
1053     orte_dfs_tracker_t *tptr, *trk;
1054     long nbytes;
1055     opal_list_item_t *item;
1056     opal_buffer_t *buffer;
1057     int64_t i64;
1058     int rc;
1059 
1060     ORTE_ACQUIRE_OBJECT(read_dfs);
1061 
1062     /* look in our local records for this fd */
1063     trk = NULL;
1064     for (item = opal_list_get_first(&active_files);
1065          item != opal_list_get_end(&active_files);
1066          item = opal_list_get_next(item)) {
1067         tptr = (orte_dfs_tracker_t*)item;
1068         if (tptr->local_fd == read_dfs->local_fd) {
1069             trk = tptr;
1070             break;
1071         }
1072     }
1073     if (NULL == trk) {
1074         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
1075         OBJ_RELEASE(read_dfs);
1076         return;
1077     }
1078 
1079     /* if the file is local, read the desired bytes */
1080     if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
1081         nbytes = read(trk->remote_fd, read_dfs->read_buffer, read_dfs->read_length);
1082         if (0 < nbytes) {
1083             /* update our location */
1084             trk->location += nbytes;
1085         }
1086         /* pass them back to the caller */
1087         if (NULL != read_dfs->read_cbfunc) {
1088             read_dfs->read_cbfunc(nbytes, read_dfs->read_buffer, read_dfs->cbdata);
1089         }
1090         /* request is complete */
1091         OBJ_RELEASE(read_dfs);
1092         return;
1093     }
1094     /* add this request to our pending list */
1095     read_dfs->id = req_id++;
1096     opal_list_append(&requests, &read_dfs->super);
1097 
1098     /* setup a message for the daemon telling
1099      * them what file to read
1100      */
1101     buffer = OBJ_NEW(opal_buffer_t);
1102     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1103         ORTE_ERROR_LOG(rc);
1104         goto complete;
1105     }
1106     /* include the request id */
1107     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->id, 1, OPAL_UINT64))) {
1108         ORTE_ERROR_LOG(rc);
1109         goto complete;
1110     }
1111     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
1112         ORTE_ERROR_LOG(rc);
1113         goto complete;
1114     }
1115     i64 = (int64_t)read_dfs->read_length;
1116     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
1117         ORTE_ERROR_LOG(rc);
1118         goto complete;
1119     }
1120 
1121     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
1122                         "%s sending read file request to %s for fd %d",
1123                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1124                         ORTE_NAME_PRINT(&trk->host_daemon),
1125                         trk->local_fd);
1126     /* send it */
1127     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1128                                           &trk->host_daemon, buffer,
1129                                           ORTE_RML_TAG_DFS_CMD,
1130                                           orte_rml_send_callback, NULL))) {
1131         ORTE_ERROR_LOG(rc);
1132         OBJ_RELEASE(buffer);
1133     }
1134     /* don't release the request */
1135     return;
1136 
1137  complete:
1138     /* don't need to hang on to this request */
1139     opal_list_remove_item(&requests, &read_dfs->super);
1140     OBJ_RELEASE(read_dfs);
1141 }
1142 
/*
 * Module function: request a read from the file known locally as "fd".
 *
 * Builds an ORTE_DFS_READ_CMD request carrying the destination buffer,
 * the requested length, the callback and its data, then thread-shifts it
 * to process_reads() on orte_event_base.
 *
 * fd     - local file descriptor of the file
 * buffer - destination for the data; stored by pointer, not copied
 * length - number of bytes requested
 * cbfunc - read callback stored in the request
 * cbdata - opaque pointer stored alongside the callback
 */
static void dfs_read(int fd, uint8_t *buffer,
                     long length,
                     orte_dfs_read_callback_fn_t cbfunc,
                     void *cbdata)
{
    orte_dfs_request_t *req;

    req = OBJ_NEW(orte_dfs_request_t);
    req->cmd = ORTE_DFS_READ_CMD;
    req->local_fd = fd;
    req->read_length = length;
    req->read_buffer = buffer;
    req->cbdata = cbdata;
    req->read_cbfunc = cbfunc;

    /* hand the request over to the event base */
    ORTE_THREADSHIFT(req, orte_event_base, process_reads, ORTE_SYS_PRI);
}
1161 
process_posts(int fd,short args,void * cbdata)1162 static void process_posts(int fd, short args, void *cbdata)
1163 {
1164     orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
1165     opal_buffer_t *buffer;
1166     int rc;
1167 
1168     ORTE_ACQUIRE_OBJECT(dfs);
1169 
1170     /* we will get confirmation in our receive function, so
1171      * add this request to our list */
1172     dfs->id = req_id++;
1173     opal_list_append(&requests, &dfs->super);
1174 
1175     /* Send the buffer's contents to our local daemon for storage */
1176     buffer = OBJ_NEW(opal_buffer_t);
1177     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1178         ORTE_ERROR_LOG(rc);
1179         goto error;
1180     }
1181     /* include the request id */
1182     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
1183         ORTE_ERROR_LOG(rc);
1184         goto error;
1185     }
1186     /* add my name */
1187     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
1188         ORTE_ERROR_LOG(rc);
1189         goto error;
1190     }
1191     /* pack the payload */
1192     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->bptr, 1, OPAL_BUFFER))) {
1193         ORTE_ERROR_LOG(rc);
1194         goto error;
1195     }
1196     /* send it */
1197     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1198                                           ORTE_PROC_MY_DAEMON, buffer,
1199                                           ORTE_RML_TAG_DFS_CMD,
1200                                           orte_rml_send_callback, NULL))) {
1201         ORTE_ERROR_LOG(rc);
1202         goto error;
1203     }
1204     return;
1205 
1206  error:
1207     OBJ_RELEASE(buffer);
1208     opal_list_remove_item(&requests, &dfs->super);
1209     if (NULL != dfs->post_cbfunc) {
1210         dfs->post_cbfunc(dfs->cbdata);
1211     }
1212     OBJ_RELEASE(dfs);
1213 }
1214 
/*
 * Module function: post a file map.
 *
 * Builds an ORTE_DFS_POST_CMD request that references the caller's
 * buffer (the pointer is stored, the buffer is not copied here) and
 * thread-shifts it to process_posts() on orte_event_base.
 *
 * bo     - buffer holding the data to post
 * cbfunc - post callback stored in the request
 * cbdata - opaque pointer stored alongside the callback
 */
static void dfs_post_file_map(opal_buffer_t *bo,
                              orte_dfs_post_callback_fn_t cbfunc,
                              void *cbdata)
{
    orte_dfs_request_t *req;

    req = OBJ_NEW(orte_dfs_request_t);
    req->cmd = ORTE_DFS_POST_CMD;
    req->cbdata = cbdata;
    req->post_cbfunc = cbfunc;
    req->bptr = bo;

    /* hand the request over to the event base */
    ORTE_THREADSHIFT(req, orte_event_base, process_posts, ORTE_SYS_PRI);
}
1230 
process_getfm(int fd,short args,void * cbdata)1231 static void process_getfm(int fd, short args, void *cbdata)
1232 {
1233     orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
1234     opal_buffer_t *buffer;
1235     int rc;
1236 
1237     ORTE_ACQUIRE_OBJECT(dfs);
1238 
1239     /* we will get confirmation in our receive function, so
1240      * add this request to our list */
1241     dfs->id = req_id++;
1242     opal_list_append(&requests, &dfs->super);
1243 
1244     /* Send the request to our local daemon */
1245     buffer = OBJ_NEW(opal_buffer_t);
1246     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1247         ORTE_ERROR_LOG(rc);
1248         goto error;
1249     }
1250     /* include the request id */
1251     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
1252         ORTE_ERROR_LOG(rc);
1253         goto error;
1254     }
1255     /* and the target */
1256     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->target, 1, ORTE_NAME))) {
1257         ORTE_ERROR_LOG(rc);
1258         goto error;
1259     }
1260     /* send it */
1261     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1262                                           ORTE_PROC_MY_DAEMON, buffer,
1263                                           ORTE_RML_TAG_DFS_CMD,
1264                                           orte_rml_send_callback, NULL))) {
1265         ORTE_ERROR_LOG(rc);
1266         goto error;
1267     }
1268     return;
1269 
1270  error:
1271     OBJ_RELEASE(buffer);
1272     opal_list_remove_item(&requests, &dfs->super);
1273     if (NULL != dfs->fm_cbfunc) {
1274         dfs->fm_cbfunc(NULL, dfs->cbdata);
1275     }
1276     OBJ_RELEASE(dfs);
1277 }
1278 
/*
 * Module function: request the file map of a target process.
 *
 * Builds an ORTE_DFS_GETFM_CMD request holding a copy of the target's
 * jobid and vpid, then thread-shifts it to process_getfm() on
 * orte_event_base.
 *
 * target - process whose file map is wanted; dereferenced, must not be NULL
 * cbfunc - file-map callback stored in the request
 * cbdata - opaque pointer stored alongside the callback
 */
static void dfs_get_file_map(orte_process_name_t *target,
                             orte_dfs_fm_callback_fn_t cbfunc,
                             void *cbdata)
{
    orte_dfs_request_t *req;

    req = OBJ_NEW(orte_dfs_request_t);
    req->cmd = ORTE_DFS_GETFM_CMD;
    req->cbdata = cbdata;
    req->fm_cbfunc = cbfunc;
    /* copy the name so the request does not depend on the caller's storage */
    req->target.vpid = target->vpid;
    req->target.jobid = target->jobid;

    /* hand the request over to the event base */
    ORTE_THREADSHIFT(req, orte_event_base, process_getfm, ORTE_SYS_PRI);
}
1295 
/*
 * Module function: load file maps for a job.
 *
 * Applications keep no file maps, so jobid and bo are ignored; the only
 * effect is to invoke the completion callback, when one is supplied.
 *
 * jobid  - ignored
 * bo     - ignored
 * cbfunc - completion callback, may be NULL
 * cbdata - opaque pointer passed to cbfunc
 */
static void dfs_load_file_maps(orte_jobid_t jobid,
                               opal_buffer_t *bo,
                               orte_dfs_load_callback_fn_t cbfunc,
                               void *cbdata)
{
    if (NULL == cbfunc) {
        /* nobody to tell */
        return;
    }

    /* apps don't store file maps - just report completion */
    cbfunc(cbdata);
}
1306 
/*
 * Module function: purge the file maps of a job.
 *
 * Applications keep no file maps, so jobid is ignored; the only effect
 * is to invoke the completion callback, when one is supplied.
 *
 * jobid  - ignored
 * cbfunc - completion callback, may be NULL
 * cbdata - opaque pointer passed to cbfunc
 */
static void dfs_purge_file_maps(orte_jobid_t jobid,
                                orte_dfs_purge_callback_fn_t cbfunc,
                                void *cbdata)
{
    if (NULL == cbfunc) {
        /* nobody to tell */
        return;
    }

    /* apps don't store file maps - just report completion */
    cbfunc(cbdata);
}
1316