1 /*
2  * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
3  *                         All rights reserved.
4  * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
5  * Copyright (c) 2014-2015 Research Organization for Information Science
6  *                         and Technology (RIST). All rights reserved.
7  * $COPYRIGHT$
8  *
9  * Additional copyrights may follow
10  *
11  * $HEADER$
12  */
13 
14 #include "orte_config.h"
15 
16 #include <sys/types.h>
17 #ifdef HAVE_UNISTD_H
18 #include <unistd.h>
19 #endif  /* HAVE_UNISTD_H */
20 #include <string.h>
21 #ifdef HAVE_FCNTL_H
22 #include <fcntl.h>
23 #endif
24 #include <sys/stat.h>
25 
26 #include "opal/util/if.h"
27 #include "opal/util/output.h"
28 #include "opal/util/uri.h"
29 #include "opal/dss/dss.h"
30 #include "opal/mca/pmix/pmix.h"
31 
32 #include "orte/util/error_strings.h"
33 #include "orte/util/name_fns.h"
34 #include "orte/util/show_help.h"
35 #include "orte/util/threads.h"
36 #include "orte/runtime/orte_globals.h"
37 #include "orte/mca/errmgr/errmgr.h"
38 #include "orte/mca/rml/rml.h"
39 
40 #include "orte/mca/dfs/base/base.h"
41 #include "dfs_test.h"
42 
43 /*
44  * Module functions: Global
45  */
46 static int init(void);
47 static int finalize(void);
48 
49 static void dfs_open(char *uri,
50                      orte_dfs_open_callback_fn_t cbfunc,
51                      void *cbdata);
52 static void dfs_close(int fd,
53                       orte_dfs_close_callback_fn_t cbfunc,
54                       void *cbdata);
55 static void dfs_get_file_size(int fd,
56                               orte_dfs_size_callback_fn_t cbfunc,
57                               void *cbdata);
58 static void dfs_seek(int fd, long offset, int whence,
59                      orte_dfs_seek_callback_fn_t cbfunc,
60                      void *cbdata);
61 static void dfs_read(int fd, uint8_t *buffer,
62                      long length,
63                      orte_dfs_read_callback_fn_t cbfunc,
64                      void *cbdata);
65 static void dfs_post_file_map(opal_buffer_t *bo,
66                               orte_dfs_post_callback_fn_t cbfunc,
67                               void *cbdata);
68 static void dfs_get_file_map(orte_process_name_t *target,
69                              orte_dfs_fm_callback_fn_t cbfunc,
70                              void *cbdata);
71 static void dfs_load_file_maps(orte_jobid_t jobid,
72                                opal_buffer_t *bo,
73                                orte_dfs_load_callback_fn_t cbfunc,
74                                void *cbdata);
75 static void dfs_purge_file_maps(orte_jobid_t jobid,
76                                 orte_dfs_purge_callback_fn_t cbfunc,
77                                 void *cbdata);
78 
79 /******************
80  * TEST module
81  ******************/
82 orte_dfs_base_module_t orte_dfs_test_module = {
83     init,
84     finalize,
85     dfs_open,
86     dfs_close,
87     dfs_get_file_size,
88     dfs_seek,
89     dfs_read,
90     dfs_post_file_map,
91     dfs_get_file_map,
92     dfs_load_file_maps,
93     dfs_purge_file_maps
94 };
95 
96 static opal_list_t requests, active_files;
97 static int local_fd = 0;
98 static uint64_t req_id = 0;
99 static void recv_dfs(int status, orte_process_name_t* sender,
100                      opal_buffer_t* buffer, orte_rml_tag_t tag,
101                      void* cbdata);
102 
init(void)103 static int init(void)
104 {
105     OBJ_CONSTRUCT(&requests, opal_list_t);
106     OBJ_CONSTRUCT(&active_files, opal_list_t);
107     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
108                             ORTE_RML_TAG_DFS_DATA,
109                             ORTE_RML_PERSISTENT,
110                             recv_dfs,
111                             NULL);
112     return ORTE_SUCCESS;
113 }
114 
finalize(void)115 static int finalize(void)
116 {
117     opal_list_item_t *item;
118 
119     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_DATA);
120     while (NULL != (item = opal_list_remove_first(&requests))) {
121         OBJ_RELEASE(item);
122     }
123     OBJ_DESTRUCT(&requests);
124     while (NULL != (item = opal_list_remove_first(&active_files))) {
125         OBJ_RELEASE(item);
126     }
127     OBJ_DESTRUCT(&active_files);
128     return ORTE_SUCCESS;
129 }
130 
131 /* receives take place in an event, so we are free to process
132  * the request list without fear of getting things out-of-order
133  */
recv_dfs(int status,orte_process_name_t * sender,opal_buffer_t * buffer,orte_rml_tag_t tag,void * cbdata)134 static void recv_dfs(int status, orte_process_name_t* sender,
135                      opal_buffer_t* buffer, orte_rml_tag_t tag,
136                      void* cbdata)
137 {
138     orte_dfs_cmd_t cmd;
139     int32_t cnt;
140     orte_dfs_request_t *dfs, *dptr;
141     opal_list_item_t *item;
142     int remote_fd, rc;
143     int64_t i64;
144     uint64_t rid;
145     orte_dfs_tracker_t *trk;
146 
147     /* unpack the command this message is responding to */
148     cnt = 1;
149     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &cmd, &cnt, ORTE_DFS_CMD_T))) {
150         ORTE_ERROR_LOG(rc);
151         return;
152     }
153 
154     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
155                         "%s recvd cmd %d from sender %s",
156                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)cmd,
157                         ORTE_NAME_PRINT(sender));
158 
159     switch (cmd) {
160     case ORTE_DFS_OPEN_CMD:
161         /* unpack the request id */
162         cnt = 1;
163         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
164             ORTE_ERROR_LOG(rc);
165             return;
166         }
167         /* unpack the remote fd */
168         cnt = 1;
169         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &remote_fd, &cnt, OPAL_INT))) {
170             ORTE_ERROR_LOG(rc);
171             return;
172         }
173         /* search our list of requests to find the matching one */
174         dfs = NULL;
175         for (item = opal_list_get_first(&requests);
176              item != opal_list_get_end(&requests);
177              item = opal_list_get_next(item)) {
178             dptr = (orte_dfs_request_t*)item;
179             if (dptr->id == rid) {
180                 /* as the request has been fulfilled, remove it */
181                 opal_list_remove_item(&requests, item);
182                 dfs = dptr;
183                 break;
184             }
185         }
186         if (NULL == dfs) {
187             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
188                                 "%s recvd open file - no corresponding request found for local fd %d",
189                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
190             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
191             return;
192         }
193 
194         /* if the remote_fd < 0, then we had an error, so return
195          * the error value to the caller
196          */
197         if (remote_fd < 0) {
198             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
199                                 "%s recvd open file response error file %s [error: %d]",
200                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
201                                 dfs->uri, remote_fd);
202             if (NULL != dfs->open_cbfunc) {
203                 dfs->open_cbfunc(remote_fd, dfs->cbdata);
204             }
205             /* release the request */
206             OBJ_RELEASE(dfs);
207             return;
208         }
209         /* otherwise, create a tracker for this file */
210         trk = OBJ_NEW(orte_dfs_tracker_t);
211         trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
212         trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
213         trk->host_daemon.jobid = sender->jobid;
214         trk->host_daemon.vpid = sender->vpid;
215         trk->filename = strdup(dfs->uri);
216         /* define the local fd */
217         trk->local_fd = local_fd++;
218         /* record the remote file descriptor */
219         trk->remote_fd = remote_fd;
220         /* add it to our list of active files */
221         opal_list_append(&active_files, &trk->super);
222         /* return the local_fd to the caller for
223          * subsequent operations
224          */
225         opal_output_verbose(1, orte_dfs_base_framework.framework_output,
226                             "%s recvd open file completed for file %s [local fd: %d remote fd: %d]",
227                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
228                             dfs->uri, trk->local_fd, remote_fd);
229         if (NULL != dfs->open_cbfunc) {
230             dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
231         }
232         /* release the request */
233         OBJ_RELEASE(dfs);
234         break;
235 
236     case ORTE_DFS_SIZE_CMD:
237         /* unpack the request id for this request */
238         cnt = 1;
239         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
240             ORTE_ERROR_LOG(rc);
241             return;
242         }
243         /* search our list of requests to find the matching one */
244         dfs = NULL;
245         for (item = opal_list_get_first(&requests);
246              item != opal_list_get_end(&requests);
247              item = opal_list_get_next(item)) {
248             dptr = (orte_dfs_request_t*)item;
249             if (dptr->id == rid) {
250                 /* request was fulfilled, so remove it */
251                 opal_list_remove_item(&requests, item);
252                 dfs = dptr;
253                 break;
254             }
255         }
256         if (NULL == dfs) {
257             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
258                                 "%s recvd size - no corresponding request found for local fd %d",
259                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
260             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
261             return;
262         }
263         /* get the size */
264         cnt = 1;
265         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
266             ORTE_ERROR_LOG(rc);
267             OBJ_RELEASE(dfs);
268             return;
269         }
270         /* pass it back to the original caller */
271         if (NULL != dfs->size_cbfunc) {
272             dfs->size_cbfunc(i64, dfs->cbdata);
273         }
274         /* release the request */
275         OBJ_RELEASE(dfs);
276         break;
277 
278     case ORTE_DFS_SEEK_CMD:
279         /* unpack the request id for this read */
280         cnt = 1;
281         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
282             ORTE_ERROR_LOG(rc);
283             return;
284         }
285         /* search our list of requests to find the matching one */
286         dfs = NULL;
287         for (item = opal_list_get_first(&requests);
288              item != opal_list_get_end(&requests);
289              item = opal_list_get_next(item)) {
290             dptr = (orte_dfs_request_t*)item;
291             if (dptr->id == rid) {
292                 /* request was fulfilled, so remove it */
293                 opal_list_remove_item(&requests, item);
294                 dfs = dptr;
295                 break;
296             }
297         }
298         if (NULL == dfs) {
299             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
300                                 "%s recvd seek - no corresponding request found for local fd %d",
301                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
302             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
303             return;
304         }
305         /* get the returned offset/status */
306         cnt = 1;
307         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
308             ORTE_ERROR_LOG(rc);
309             OBJ_RELEASE(dfs);
310             return;
311         }
312         /* pass it back to the original caller */
313         if (NULL != dfs->seek_cbfunc) {
314             dfs->seek_cbfunc(i64, dfs->cbdata);
315         }
316         /* release the request */
317         OBJ_RELEASE(dfs);
318         break;
319 
320     case ORTE_DFS_READ_CMD:
321         /* unpack the request id for this read */
322         cnt = 1;
323         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
324             ORTE_ERROR_LOG(rc);
325             return;
326         }
327         /* search our list of requests to find the matching one */
328         dfs = NULL;
329         for (item = opal_list_get_first(&requests);
330              item != opal_list_get_end(&requests);
331              item = opal_list_get_next(item)) {
332             dptr = (orte_dfs_request_t*)item;
333             if (dptr->id == rid) {
334                 /* request was fulfilled, so remove it */
335                 opal_list_remove_item(&requests, item);
336                 dfs = dptr;
337                 break;
338             }
339         }
340         if (NULL == dfs) {
341             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
342                                 "%s recvd read - no corresponding request found for local fd %d",
343                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
344             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
345             return;
346         }
347         /* get the bytes read */
348         cnt = 1;
349         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
350             ORTE_ERROR_LOG(rc);
351             OBJ_RELEASE(dfs);
352             return;
353         }
354         if (0 < i64) {
355             cnt = i64;
356             if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, dfs->read_buffer, &cnt, OPAL_UINT8))) {
357                 ORTE_ERROR_LOG(rc);
358                 OBJ_RELEASE(dfs);
359                 return;
360             }
361         }
362         /* pass them back to the original caller */
363         if (NULL != dfs->read_cbfunc) {
364             dfs->read_cbfunc(i64, dfs->read_buffer, dfs->cbdata);
365         }
366         /* release the request */
367         OBJ_RELEASE(dfs);
368         break;
369 
370     case ORTE_DFS_POST_CMD:
371         /* unpack the request id for this read */
372         cnt = 1;
373         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
374             ORTE_ERROR_LOG(rc);
375             return;
376         }
377         /* search our list of requests to find the matching one */
378         dfs = NULL;
379         for (item = opal_list_get_first(&requests);
380              item != opal_list_get_end(&requests);
381              item = opal_list_get_next(item)) {
382             dptr = (orte_dfs_request_t*)item;
383             if (dptr->id == rid) {
384                 /* request was fulfilled, so remove it */
385                 opal_list_remove_item(&requests, item);
386                 dfs = dptr;
387                 break;
388             }
389         }
390         if (NULL == dfs) {
391             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
392                                 "%s recvd post - no corresponding request found",
393                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
394             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
395             return;
396         }
397         if (NULL != dfs->post_cbfunc) {
398             dfs->post_cbfunc(dfs->cbdata);
399         }
400         OBJ_RELEASE(dfs);
401         break;
402 
403     case ORTE_DFS_GETFM_CMD:
404         /* unpack the request id for this read */
405         cnt = 1;
406         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
407             ORTE_ERROR_LOG(rc);
408             return;
409         }
410         /* search our list of requests to find the matching one */
411         dfs = NULL;
412         for (item = opal_list_get_first(&requests);
413              item != opal_list_get_end(&requests);
414              item = opal_list_get_next(item)) {
415             dptr = (orte_dfs_request_t*)item;
416             if (dptr->id == rid) {
417                 /* request was fulfilled, so remove it */
418                 opal_list_remove_item(&requests, item);
419                 dfs = dptr;
420                 break;
421             }
422         }
423         if (NULL == dfs) {
424             opal_output_verbose(1, orte_dfs_base_framework.framework_output,
425                                 "%s recvd getfm - no corresponding request found",
426                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
427             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
428             return;
429         }
430         /* return it to caller */
431         if (NULL != dfs->fm_cbfunc) {
432             dfs->fm_cbfunc(buffer, dfs->cbdata);
433         }
434         OBJ_RELEASE(dfs);
435         break;
436 
437     default:
438         opal_output(0, "TEST:DFS:RECV WTF");
439         break;
440     }
441 }
442 
process_opens(int fd,short args,void * cbdata)443 static void process_opens(int fd, short args, void *cbdata)
444 {
445     orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
446     int rc;
447     opal_buffer_t *buffer;
448     char *scheme, *host=NULL, *filename=NULL;
449     orte_process_name_t daemon;
450     opal_list_t lt;
451     opal_namelist_t *nm;
452 
453     ORTE_ACQUIRE_OBJECT(dfs);
454 
455     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
456                         "%s PROCESSING OPEN", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
457 
458     /* get the scheme to determine if we can process locally or not */
459     if (NULL == (scheme = opal_uri_get_scheme(dfs->uri))) {
460         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
461         goto complete;
462     }
463     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
464                         "%s GOT SCHEME", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
465 
466     if (0 != strcmp(scheme, "file")) {
467         /* not yet supported */
468         orte_show_help("orte_dfs_help.txt", "unsupported-filesystem",
469                        true, dfs->uri);
470         free(scheme);
471         goto complete;
472     }
473     free(scheme);
474 
475     /* dissect the uri to extract host and filename/path */
476     if (NULL == (filename = opal_filename_from_uri(dfs->uri, &host))) {
477         goto complete;
478     }
479     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
480                         "%s GOT FILENAME %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), filename);
481     if (NULL == host) {
482         host = strdup(orte_process_info.nodename);
483     }
484 
485     /* ident the daemon on that host */
486     daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
487     OBJ_CONSTRUCT(&lt, opal_list_t);
488     if (ORTE_SUCCESS != (rc = opal_pmix.resolve_peers(host, daemon.jobid, &lt))) {
489         ORTE_ERROR_LOG(rc);
490         OBJ_DESTRUCT(&lt);
491         goto complete;
492     }
493     nm = (opal_namelist_t*)opal_list_get_first(&lt);
494     daemon.vpid = nm->name.vpid;
495     OPAL_LIST_DESTRUCT(&lt);
496 
497     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
498                         "%s file %s on host %s daemon %s",
499                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
500                         filename, host, ORTE_NAME_PRINT(&daemon));
501 
502     /* add this request to our local list so we can
503      * match it with the returned response when it comes
504      */
505     dfs->id = req_id++;
506     opal_list_append(&requests, &dfs->super);
507 
508     /* setup a message for the daemon telling
509      * them what file we want to access
510      */
511     buffer = OBJ_NEW(opal_buffer_t);
512     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
513         ORTE_ERROR_LOG(rc);
514         opal_list_remove_item(&requests, &dfs->super);
515         goto complete;
516     }
517     /* pass the request id */
518     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
519         ORTE_ERROR_LOG(rc);
520         opal_list_remove_item(&requests, &dfs->super);
521         goto complete;
522     }
523     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &filename, 1, OPAL_STRING))) {
524         ORTE_ERROR_LOG(rc);
525         opal_list_remove_item(&requests, &dfs->super);
526         goto complete;
527     }
528 
529     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
530                         "%s sending open file request to %s file %s",
531                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
532                         ORTE_NAME_PRINT(&daemon),
533                         filename);
534     /* send it */
535     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
536                                           &daemon, buffer,
537                                           ORTE_RML_TAG_DFS_CMD,
538                                           orte_rml_send_callback, NULL))) {
539         ORTE_ERROR_LOG(rc);
540         OBJ_RELEASE(buffer);
541         opal_list_remove_item(&requests, &dfs->super);
542         goto complete;
543     }
544     /* don't release it */
545     free(host);
546     free(filename);
547     return;
548 
549  complete:
550     /* we get here if an error occurred - execute any
551      * pending callback so the proc doesn't hang
552      */
553     if (NULL != host) {
554         free(host);
555     }
556     if (NULL != filename) {
557         free(filename);
558     }
559     if (NULL != dfs->open_cbfunc) {
560         dfs->open_cbfunc(-1, dfs->cbdata);
561     }
562     OBJ_RELEASE(dfs);
563 }
564 
565 
566 /* in order to handle the possible opening/reading of files by
567  * multiple threads, we have to ensure that all operations are
568  * carried out in events - so the "open" cmd simply posts an
569  * event containing the required info, and then returns
570  */
dfs_open(char * uri,orte_dfs_open_callback_fn_t cbfunc,void * cbdata)571 static void dfs_open(char *uri,
572                      orte_dfs_open_callback_fn_t cbfunc,
573                      void *cbdata)
574 {
575     orte_dfs_request_t *dfs;
576 
577     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
578                         "%s opening file %s",
579                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uri);
580 
581     /* setup the request */
582     dfs = OBJ_NEW(orte_dfs_request_t);
583     dfs->cmd = ORTE_DFS_OPEN_CMD;
584     dfs->uri = strdup(uri);
585     dfs->open_cbfunc = cbfunc;
586     dfs->cbdata = cbdata;
587 
588     /* post it for processing */
589     ORTE_THREADSHIFT(dfs, orte_event_base, process_opens, ORTE_SYS_PRI);
590 }
591 
process_close(int fd,short args,void * cbdata)592 static void process_close(int fd, short args, void *cbdata)
593 {
594     orte_dfs_request_t *close_dfs = (orte_dfs_request_t*)cbdata;
595     orte_dfs_tracker_t *tptr, *trk;
596     opal_list_item_t *item;
597     opal_buffer_t *buffer;
598     int rc;
599 
600     ORTE_ACQUIRE_OBJECT(close_dfs);
601 
602     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
603                         "%s closing fd %d",
604                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
605                         close_dfs->local_fd);
606 
607     /* look in our local records for this fd */
608     trk = NULL;
609     for (item = opal_list_get_first(&active_files);
610          item != opal_list_get_end(&active_files);
611          item = opal_list_get_next(item)) {
612         tptr = (orte_dfs_tracker_t*)item;
613         if (tptr->local_fd == close_dfs->local_fd) {
614             trk = tptr;
615             break;
616         }
617     }
618     if (NULL == trk) {
619         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
620         if (NULL != close_dfs->close_cbfunc) {
621             close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
622         }
623         OBJ_RELEASE(close_dfs);
624         return;
625     }
626 
627     /* setup a message for the daemon telling
628      * them what file to close
629      */
630     buffer = OBJ_NEW(opal_buffer_t);
631     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &close_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
632         ORTE_ERROR_LOG(rc);
633         goto complete;
634     }
635     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
636         ORTE_ERROR_LOG(rc);
637         goto complete;
638     }
639 
640     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
641                         "%s sending close file request to %s for fd %d",
642                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
643                         ORTE_NAME_PRINT(&trk->host_daemon),
644                         trk->local_fd);
645     /* send it */
646     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
647                                           &trk->host_daemon, buffer,
648                                           ORTE_RML_TAG_DFS_CMD,
649                                           orte_rml_send_callback, NULL))) {
650         ORTE_ERROR_LOG(rc);
651         OBJ_RELEASE(buffer);
652         goto complete;
653     }
654 
655  complete:
656     opal_list_remove_item(&active_files, &trk->super);
657     OBJ_RELEASE(trk);
658     if (NULL != close_dfs->close_cbfunc) {
659         close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
660     }
661     OBJ_RELEASE(close_dfs);
662 }
663 
/*
 * Close a previously opened file.
 *
 * Records the operation in a request object and posts it to the event
 * base, where process_close() does the work.
 *
 * @param fd      local fd of the file to close
 * @param cbfunc  callback stored on the request as its close callback
 * @param cbdata  opaque pointer stored on the request for the callback
 */
static void dfs_close(int fd,
                      orte_dfs_close_callback_fn_t cbfunc,
                      void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s close called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    /* describe the operation */
    req = OBJ_NEW(orte_dfs_request_t);
    req->cbdata = cbdata;
    req->close_cbfunc = cbfunc;
    req->local_fd = fd;
    req->cmd = ORTE_DFS_CLOSE_CMD;

    /* hand it to the event base */
    ORTE_THREADSHIFT(req, orte_event_base, process_close, ORTE_SYS_PRI);
}
683 
process_sizes(int fd,short args,void * cbdata)684 static void process_sizes(int fd, short args, void *cbdata)
685 {
686     orte_dfs_request_t *size_dfs = (orte_dfs_request_t*)cbdata;
687     orte_dfs_tracker_t *tptr, *trk;
688     opal_list_item_t *item;
689     opal_buffer_t *buffer;
690     int rc;
691 
692     ORTE_ACQUIRE_OBJECT(size_dfs);
693 
694     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
695                         "%s processing get_size on fd %d",
696                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
697                         size_dfs->local_fd);
698 
699     /* look in our local records for this fd */
700     trk = NULL;
701     for (item = opal_list_get_first(&active_files);
702          item != opal_list_get_end(&active_files);
703          item = opal_list_get_next(item)) {
704         tptr = (orte_dfs_tracker_t*)item;
705         if (tptr->local_fd == size_dfs->local_fd) {
706             trk = tptr;
707             break;
708         }
709     }
710     if (NULL == trk) {
711         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
712         OBJ_RELEASE(size_dfs);
713         return;
714     }
715 
716     /* add this request to our local list so we can
717      * match it with the returned response when it comes
718      */
719     size_dfs->id = req_id++;
720     opal_list_append(&requests, &size_dfs->super);
721 
722     /* setup a message for the daemon telling
723      * them what file we want to access
724      */
725     buffer = OBJ_NEW(opal_buffer_t);
726     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
727         ORTE_ERROR_LOG(rc);
728         opal_list_remove_item(&requests, &size_dfs->super);
729         goto complete;
730     }
731     /* pass the request id */
732     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->id, 1, OPAL_UINT64))) {
733         ORTE_ERROR_LOG(rc);
734         opal_list_remove_item(&requests, &size_dfs->super);
735         goto complete;
736     }
737     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
738         ORTE_ERROR_LOG(rc);
739         opal_list_remove_item(&requests, &size_dfs->super);
740         goto complete;
741     }
742 
743     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
744                         "%s sending get_size request to %s for fd %d",
745                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
746                         ORTE_NAME_PRINT(&trk->host_daemon),
747                         trk->local_fd);
748     /* send it */
749     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
750                                           &trk->host_daemon, buffer,
751                                           ORTE_RML_TAG_DFS_CMD,
752                                           orte_rml_send_callback, NULL))) {
753         ORTE_ERROR_LOG(rc);
754         OBJ_RELEASE(buffer);
755         opal_list_remove_item(&requests, &size_dfs->super);
756         if (NULL != size_dfs->size_cbfunc) {
757             size_dfs->size_cbfunc(-1, size_dfs->cbdata);
758         }
759         goto complete;
760     }
761     /* leave the request there */
762     return;
763 
764  complete:
765     OBJ_RELEASE(size_dfs);
766 }
767 
/*
 * Query the size of a previously opened file.
 *
 * Records the operation in a request object and posts it to the event
 * base, where process_sizes() does the work.
 *
 * @param fd      local fd of the file to query
 * @param cbfunc  callback stored on the request as its size callback
 * @param cbdata  opaque pointer stored on the request for the callback
 */
static void dfs_get_file_size(int fd,
                              orte_dfs_size_callback_fn_t cbfunc,
                              void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s get_size called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    /* describe the operation */
    req = OBJ_NEW(orte_dfs_request_t);
    req->cbdata = cbdata;
    req->size_cbfunc = cbfunc;
    req->local_fd = fd;
    req->cmd = ORTE_DFS_SIZE_CMD;

    /* hand it to the event base */
    ORTE_THREADSHIFT(req, orte_event_base, process_sizes, ORTE_SYS_PRI);
}
787 
788 
process_seeks(int fd,short args,void * cbdata)789 static void process_seeks(int fd, short args, void *cbdata)
790 {
791     orte_dfs_request_t *seek_dfs = (orte_dfs_request_t*)cbdata;
792     orte_dfs_tracker_t *tptr, *trk;
793     opal_list_item_t *item;
794     opal_buffer_t *buffer;
795     int64_t i64;
796     int rc;
797 
798     ORTE_ACQUIRE_OBJECT(seek_dfs);
799 
800     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
801                         "%s processing seek on fd %d",
802                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
803                         seek_dfs->local_fd);
804 
805     /* look in our local records for this fd */
806     trk = NULL;
807     for (item = opal_list_get_first(&active_files);
808          item != opal_list_get_end(&active_files);
809          item = opal_list_get_next(item)) {
810         tptr = (orte_dfs_tracker_t*)item;
811         if (tptr->local_fd == seek_dfs->local_fd) {
812             trk = tptr;
813             break;
814         }
815     }
816     if (NULL == trk) {
817         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
818         OBJ_RELEASE(seek_dfs);
819         return;
820     }
821 
822     /* add this request to our local list so we can
823      * match it with the returned response when it comes
824      */
825     seek_dfs->id = req_id++;
826     opal_list_append(&requests, &seek_dfs->super);
827 
828     /* setup a message for the daemon telling
829      * them what file to seek
830      */
831     buffer = OBJ_NEW(opal_buffer_t);
832     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
833         ORTE_ERROR_LOG(rc);
834         goto complete;
835     }
836     /* pass the request id */
837     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->id, 1, OPAL_UINT64))) {
838         ORTE_ERROR_LOG(rc);
839         opal_list_remove_item(&requests, &seek_dfs->super);
840         goto complete;
841     }
842     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
843         ORTE_ERROR_LOG(rc);
844         goto complete;
845     }
846     i64 = (int64_t)seek_dfs->read_length;
847     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
848         ORTE_ERROR_LOG(rc);
849         goto complete;
850     }
851     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->remote_fd, 1, OPAL_INT))) {
852         ORTE_ERROR_LOG(rc);
853         goto complete;
854     }
855 
856     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
857                         "%s sending seek file request to %s for fd %d",
858                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
859                         ORTE_NAME_PRINT(&trk->host_daemon),
860                         trk->local_fd);
861     /* send it */
862     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
863                                           &trk->host_daemon, buffer,
864                                           ORTE_RML_TAG_DFS_CMD,
865                                           orte_rml_send_callback, NULL))) {
866         ORTE_ERROR_LOG(rc);
867         OBJ_RELEASE(buffer);
868         goto complete;
869     }
870     /* leave the request */
871     return;
872 
873  complete:
874     OBJ_RELEASE(seek_dfs);
875 }
876 
877 
/*
 * Request a seek on the file associated with a local file descriptor.
 *
 * fd      - local file descriptor identifying the file
 * offset  - seek offset
 * whence  - seek origin selector
 * cbfunc  - callback stored on the request as its seek callback
 * cbdata  - opaque pointer stored on the request alongside the callback
 *
 * The function only builds the request and schedules process_seeks
 * on orte_event_base.
 */
static void dfs_seek(int fd, long offset, int whence,
                     orte_dfs_seek_callback_fn_t cbfunc,
                     void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s seek called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    req = OBJ_NEW(orte_dfs_request_t);
    req->cbdata = cbdata;
    req->seek_cbfunc = cbfunc;
    /* the request has no dedicated seek fields: the offset travels
     * in read_length and whence travels in remote_fd
     */
    req->remote_fd = whence;
    req->read_length = offset;
    req->local_fd = fd;
    req->cmd = ORTE_DFS_SEEK_CMD;

    /* hand the request over to the event base for processing */
    ORTE_THREADSHIFT(req, orte_event_base, process_seeks, ORTE_SYS_PRI);
}
899 
process_reads(int fd,short args,void * cbdata)900 static void process_reads(int fd, short args, void *cbdata)
901 {
902     orte_dfs_request_t *read_dfs = (orte_dfs_request_t*)cbdata;
903     orte_dfs_tracker_t *tptr, *trk;
904     opal_list_item_t *item;
905     opal_buffer_t *buffer;
906     int64_t i64;
907     int rc;
908 
909     ORTE_ACQUIRE_OBJECT(read_dfs);
910 
911     /* look in our local records for this fd */
912     trk = NULL;
913     for (item = opal_list_get_first(&active_files);
914          item != opal_list_get_end(&active_files);
915          item = opal_list_get_next(item)) {
916         tptr = (orte_dfs_tracker_t*)item;
917         if (tptr->local_fd == read_dfs->local_fd) {
918             trk = tptr;
919             break;
920         }
921     }
922     if (NULL == trk) {
923         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
924         OBJ_RELEASE(read_dfs);
925         return;
926     }
927 
928     /* add this request to our pending list */
929     read_dfs->id = req_id++;
930     opal_list_append(&requests, &read_dfs->super);
931 
932     /* setup a message for the daemon telling
933      * them what file to read
934      */
935     buffer = OBJ_NEW(opal_buffer_t);
936     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
937         ORTE_ERROR_LOG(rc);
938         goto complete;
939     }
940     /* include the request id */
941     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->id, 1, OPAL_UINT64))) {
942         ORTE_ERROR_LOG(rc);
943         goto complete;
944     }
945     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
946         ORTE_ERROR_LOG(rc);
947         goto complete;
948     }
949     i64 = (int64_t)read_dfs->read_length;
950     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
951         ORTE_ERROR_LOG(rc);
952         goto complete;
953     }
954 
955     opal_output_verbose(1, orte_dfs_base_framework.framework_output,
956                         "%s sending read file request to %s for fd %d",
957                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
958                         ORTE_NAME_PRINT(&trk->host_daemon),
959                         trk->local_fd);
960     /* send it */
961     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
962                                           &trk->host_daemon, buffer,
963                                           ORTE_RML_TAG_DFS_CMD,
964                                           orte_rml_send_callback, NULL))) {
965         ORTE_ERROR_LOG(rc);
966         OBJ_RELEASE(buffer);
967     }
968     /* don't release the request */
969     return;
970 
971  complete:
972     /* don't need to hang on to this request */
973     opal_list_remove_item(&requests, &read_dfs->super);
974     OBJ_RELEASE(read_dfs);
975 }
976 
/*
 * Request a read from the file associated with a local file descriptor.
 *
 * fd      - local file descriptor identifying the file
 * buffer  - destination buffer pointer stored on the request
 * length  - number of bytes requested
 * cbfunc  - callback stored on the request as its read callback
 * cbdata  - opaque pointer stored on the request alongside the callback
 *
 * The function only builds the request and schedules process_reads
 * on orte_event_base.
 */
static void dfs_read(int fd, uint8_t *buffer,
                     long length,
                     orte_dfs_read_callback_fn_t cbfunc,
                     void *cbdata)
{
    orte_dfs_request_t *req = OBJ_NEW(orte_dfs_request_t);

    /* record everything the handler needs */
    req->cbdata = cbdata;
    req->read_cbfunc = cbfunc;
    req->read_length = length;
    req->read_buffer = buffer;
    req->local_fd = fd;
    req->cmd = ORTE_DFS_READ_CMD;

    /* hand the request over to the event base for processing */
    ORTE_THREADSHIFT(req, orte_event_base, process_reads, ORTE_SYS_PRI);
}
995 
process_posts(int fd,short args,void * cbdata)996 static void process_posts(int fd, short args, void *cbdata)
997 {
998     orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
999     opal_buffer_t *buffer;
1000     int rc;
1001 
1002     ORTE_ACQUIRE_OBJECT(dfs);
1003 
1004     /* we will get confirmation in our receive function, so
1005      * add this request to our list */
1006     dfs->id = req_id++;
1007     opal_list_append(&requests, &dfs->super);
1008 
1009     /* Send the buffer's contents to our local daemon for storage */
1010     buffer = OBJ_NEW(opal_buffer_t);
1011     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1012         ORTE_ERROR_LOG(rc);
1013         goto error;
1014     }
1015     /* include the request id */
1016     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
1017         ORTE_ERROR_LOG(rc);
1018         goto error;
1019     }
1020     /* add my name */
1021     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
1022         ORTE_ERROR_LOG(rc);
1023         goto error;
1024     }
1025     /* pack the payload */
1026     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->bptr, 1, OPAL_BUFFER))) {
1027         ORTE_ERROR_LOG(rc);
1028         goto error;
1029     }
1030     /* send it */
1031     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1032                                           ORTE_PROC_MY_DAEMON, buffer,
1033                                           ORTE_RML_TAG_DFS_CMD,
1034                                           orte_rml_send_callback, NULL))) {
1035         ORTE_ERROR_LOG(rc);
1036         goto error;
1037     }
1038     return;
1039 
1040  error:
1041     OBJ_RELEASE(buffer);
1042     opal_list_remove_item(&requests, &dfs->super);
1043     if (NULL != dfs->post_cbfunc) {
1044         dfs->post_cbfunc(dfs->cbdata);
1045     }
1046     OBJ_RELEASE(dfs);
1047 }
1048 
/*
 * Post a file map for storage.
 *
 * bo      - buffer holding the payload; only the pointer is stored
 *           on the request here
 * cbfunc  - callback stored on the request as its post callback
 * cbdata  - opaque pointer stored on the request alongside the callback
 *
 * The function only builds the request and schedules process_posts
 * on orte_event_base.
 */
static void dfs_post_file_map(opal_buffer_t *bo,
                              orte_dfs_post_callback_fn_t cbfunc,
                              void *cbdata)
{
    orte_dfs_request_t *req = OBJ_NEW(orte_dfs_request_t);

    /* record everything the handler needs */
    req->cbdata = cbdata;
    req->post_cbfunc = cbfunc;
    req->bptr = bo;
    req->cmd = ORTE_DFS_POST_CMD;

    /* hand the request over to the event base for processing */
    ORTE_THREADSHIFT(req, orte_event_base, process_posts, ORTE_SYS_PRI);
}
1064 
process_getfm(int fd,short args,void * cbdata)1065 static void process_getfm(int fd, short args, void *cbdata)
1066 {
1067     orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
1068     opal_buffer_t *buffer;
1069     int rc;
1070 
1071     ORTE_ACQUIRE_OBJECT(dfs);
1072 
1073     /* we will get confirmation in our receive function, so
1074      * add this request to our list */
1075     dfs->id = req_id++;
1076     opal_list_append(&requests, &dfs->super);
1077 
1078     /* Send the request to our local daemon */
1079     buffer = OBJ_NEW(opal_buffer_t);
1080     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1081         ORTE_ERROR_LOG(rc);
1082         goto error;
1083     }
1084     /* include the request id */
1085     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
1086         ORTE_ERROR_LOG(rc);
1087         goto error;
1088     }
1089     /* and the target */
1090     if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->target, 1, ORTE_NAME))) {
1091         ORTE_ERROR_LOG(rc);
1092         goto error;
1093     }
1094     /* send it */
1095     if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1096                                           ORTE_PROC_MY_DAEMON, buffer,
1097                                           ORTE_RML_TAG_DFS_CMD,
1098                                           orte_rml_send_callback, NULL))) {
1099         ORTE_ERROR_LOG(rc);
1100         goto error;
1101     }
1102     return;
1103 
1104  error:
1105     OBJ_RELEASE(buffer);
1106     opal_list_remove_item(&requests, &dfs->super);
1107     if (NULL != dfs->fm_cbfunc) {
1108         dfs->fm_cbfunc(NULL, dfs->cbdata);
1109     }
1110     OBJ_RELEASE(dfs);
1111 }
1112 
/*
 * Request the file map for a target process.
 *
 * target  - name of the process being asked about; its jobid and vpid
 *           are copied into the request, so the pointer is not kept
 * cbfunc  - callback stored on the request as its file-map callback
 * cbdata  - opaque pointer stored on the request alongside the callback
 *
 * The function only builds the request and schedules process_getfm
 * on orte_event_base.
 */
static void dfs_get_file_map(orte_process_name_t *target,
                             orte_dfs_fm_callback_fn_t cbfunc,
                             void *cbdata)
{
    orte_dfs_request_t *req = OBJ_NEW(orte_dfs_request_t);

    /* record everything the handler needs */
    req->cbdata = cbdata;
    req->fm_cbfunc = cbfunc;
    req->target.vpid = target->vpid;
    req->target.jobid = target->jobid;
    req->cmd = ORTE_DFS_GETFM_CMD;

    /* hand the request over to the event base for processing */
    ORTE_THREADSHIFT(req, orte_event_base, process_getfm, ORTE_SYS_PRI);
}
1129 
/*
 * Load file maps for a job: nothing is stored in this component.
 *
 * jobid   - not used
 * bo      - not used
 * cbfunc  - optional completion callback, invoked immediately
 * cbdata  - passed through to cbfunc
 */
static void dfs_load_file_maps(orte_jobid_t jobid,
                               opal_buffer_t *bo,
                               orte_dfs_load_callback_fn_t cbfunc,
                               void *cbdata)
{
    /* apps keep no file maps, so with no callback there is nothing to do */
    if (NULL == cbfunc) {
        return;
    }

    cbfunc(cbdata);
}
1140 
/*
 * Purge file maps for a job: nothing is stored in this component.
 *
 * jobid   - not used
 * cbfunc  - optional completion callback, invoked immediately
 * cbdata  - passed through to cbfunc
 */
static void dfs_purge_file_maps(orte_jobid_t jobid,
                                orte_dfs_purge_callback_fn_t cbfunc,
                                void *cbdata)
{
    /* apps keep no file maps, so with no callback there is nothing to do */
    if (NULL == cbfunc) {
        return;
    }

    cbfunc(cbdata);
}
1150