1 /*
2 * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
3 * All rights reserved.
4 * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
5 * Copyright (c) 2014 Research Organization for Information Science
6 * and Technology (RIST). All rights reserved.
7 * $COPYRIGHT$
8 *
9 * Additional copyrights may follow
10 *
11 * $HEADER$
12 */
13
14 #include "orte_config.h"
15
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include <sys/stat.h>
25
26 #include "opal/util/if.h"
27 #include "opal/util/output.h"
28 #include "opal/util/uri.h"
29 #include "opal/dss/dss.h"
30 #include "opal/mca/pmix/pmix.h"
31
32 #include "orte/util/error_strings.h"
33 #include "orte/util/name_fns.h"
34 #include "orte/util/proc_info.h"
35 #include "orte/util/show_help.h"
36 #include "orte/util/threads.h"
37 #include "orte/runtime/orte_globals.h"
38 #include "orte/mca/errmgr/errmgr.h"
39 #include "orte/mca/rml/rml.h"
40
41 #include "orte/mca/dfs/base/base.h"
42 #include "dfs_app.h"
43
/*
 * Module functions: Global
 */
static int init(void);
static int finalize(void);

static void dfs_open(char *uri,
                     orte_dfs_open_callback_fn_t cbfunc,
                     void *cbdata);
static void dfs_close(int fd,
                      orte_dfs_close_callback_fn_t cbfunc,
                      void *cbdata);
static void dfs_get_file_size(int fd,
                              orte_dfs_size_callback_fn_t cbfunc,
                              void *cbdata);
static void dfs_seek(int fd, long offset, int whence,
                     orte_dfs_seek_callback_fn_t cbfunc,
                     void *cbdata);
static void dfs_read(int fd, uint8_t *buffer,
                     long length,
                     orte_dfs_read_callback_fn_t cbfunc,
                     void *cbdata);
static void dfs_post_file_map(opal_buffer_t *bo,
                              orte_dfs_post_callback_fn_t cbfunc,
                              void *cbdata);
static void dfs_get_file_map(orte_process_name_t *target,
                             orte_dfs_fm_callback_fn_t cbfunc,
                             void *cbdata);
static void dfs_load_file_maps(orte_jobid_t jobid,
                               opal_buffer_t *bo,
                               orte_dfs_load_callback_fn_t cbfunc,
                               void *cbdata);
static void dfs_purge_file_maps(orte_jobid_t jobid,
                                orte_dfs_purge_callback_fn_t cbfunc,
                                void *cbdata);

/******************
 * APP module
 ******************/
/* Positional initializer: the entries must stay in the same order as
 * the fields of orte_dfs_base_module_t. */
orte_dfs_base_module_t orte_dfs_app_module = {
    init,
    finalize,
    dfs_open,
    dfs_close,
    dfs_get_file_size,
    dfs_seek,
    dfs_read,
    dfs_post_file_map,
    dfs_get_file_map,
    dfs_load_file_maps,
    dfs_purge_file_maps
};

/* requests: orte_dfs_request_t objects that were forwarded to a daemon
 *   and are waiting for a reply; recv_dfs() matches replies by id.
 * active_files: one orte_dfs_tracker_t per file opened via this module. */
static opal_list_t requests, active_files;
/* next fd value handed back to callers; a module-local counter starting
 * at 0, not an OS descriptor (the OS/remote descriptor is kept in the
 * tracker's remote_fd) */
static int local_fd = 0;
/* next request id; stamped on forwarded requests so replies can be matched */
static uint64_t req_id = 0;
static void recv_dfs(int status, orte_process_name_t* sender,
                     opal_buffer_t* buffer, orte_rml_tag_t tag,
                     void* cbdata);
103
init(void)104 static int init(void)
105 {
106 OBJ_CONSTRUCT(&requests, opal_list_t);
107 OBJ_CONSTRUCT(&active_files, opal_list_t);
108 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
109 ORTE_RML_TAG_DFS_DATA,
110 ORTE_RML_PERSISTENT,
111 recv_dfs,
112 NULL);
113 return ORTE_SUCCESS;
114 }
115
finalize(void)116 static int finalize(void)
117 {
118 opal_list_item_t *item;
119
120 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_DATA);
121 while (NULL != (item = opal_list_remove_first(&requests))) {
122 OBJ_RELEASE(item);
123 }
124 OBJ_DESTRUCT(&requests);
125 while (NULL != (item = opal_list_remove_first(&active_files))) {
126 OBJ_RELEASE(item);
127 }
128 OBJ_DESTRUCT(&active_files);
129 return ORTE_SUCCESS;
130 }
131
132 /* receives take place in an event, so we are free to process
133 * the request list without fear of getting things out-of-order
134 */
recv_dfs(int status,orte_process_name_t * sender,opal_buffer_t * buffer,orte_rml_tag_t tag,void * cbdata)135 static void recv_dfs(int status, orte_process_name_t* sender,
136 opal_buffer_t* buffer, orte_rml_tag_t tag,
137 void* cbdata)
138 {
139 orte_dfs_cmd_t cmd;
140 int32_t cnt;
141 orte_dfs_request_t *dfs, *dptr;
142 opal_list_item_t *item;
143 int remote_fd, rc;
144 int64_t i64;
145 uint64_t rid;
146 orte_dfs_tracker_t *trk;
147
148 /* unpack the command this message is responding to */
149 cnt = 1;
150 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &cmd, &cnt, ORTE_DFS_CMD_T))) {
151 ORTE_ERROR_LOG(rc);
152 return;
153 }
154
155 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
156 "%s recvd cmd %d from sender %s",
157 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)cmd,
158 ORTE_NAME_PRINT(sender));
159
160 switch (cmd) {
161 case ORTE_DFS_OPEN_CMD:
162 /* unpack the request id */
163 cnt = 1;
164 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
165 ORTE_ERROR_LOG(rc);
166 return;
167 }
168 /* unpack the remote fd */
169 cnt = 1;
170 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &remote_fd, &cnt, OPAL_INT))) {
171 ORTE_ERROR_LOG(rc);
172 return;
173 }
174 /* search our list of requests to find the matching one */
175 dfs = NULL;
176 for (item = opal_list_get_first(&requests);
177 item != opal_list_get_end(&requests);
178 item = opal_list_get_next(item)) {
179 dptr = (orte_dfs_request_t*)item;
180 if (dptr->id == rid) {
181 /* as the request has been fulfilled, remove it */
182 opal_list_remove_item(&requests, item);
183 dfs = dptr;
184 break;
185 }
186 }
187 if (NULL == dfs) {
188 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
189 "%s recvd open file - no corresponding request found for local fd %d",
190 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
191 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
192 return;
193 }
194
195 /* if the remote_fd < 0, then we had an error, so return
196 * the error value to the caller
197 */
198 if (remote_fd < 0) {
199 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
200 "%s recvd open file response error file %s [error: %d]",
201 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
202 dfs->uri, remote_fd);
203 if (NULL != dfs->open_cbfunc) {
204 dfs->open_cbfunc(remote_fd, dfs->cbdata);
205 }
206 /* release the request */
207 OBJ_RELEASE(dfs);
208 return;
209 }
210 /* otherwise, create a tracker for this file */
211 trk = OBJ_NEW(orte_dfs_tracker_t);
212 trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
213 trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
214 trk->host_daemon.jobid = sender->jobid;
215 trk->host_daemon.vpid = sender->vpid;
216 trk->uri = strdup(dfs->uri);
217 /* break the uri down into scheme and filename */
218 trk->scheme = opal_uri_get_scheme(dfs->uri);
219 trk->filename = opal_filename_from_uri(dfs->uri, NULL);
220 /* define the local fd */
221 trk->local_fd = local_fd++;
222 /* record the remote file descriptor */
223 trk->remote_fd = remote_fd;
224 /* add it to our list of active files */
225 opal_list_append(&active_files, &trk->super);
226 /* return the local_fd to the caller for
227 * subsequent operations
228 */
229 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
230 "%s recvd open file completed for file %s [local fd: %d remote fd: %d]",
231 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
232 dfs->uri, trk->local_fd, remote_fd);
233 if (NULL != dfs->open_cbfunc) {
234 dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
235 }
236 /* release the request */
237 OBJ_RELEASE(dfs);
238 break;
239
240 case ORTE_DFS_SIZE_CMD:
241 /* unpack the request id for this request */
242 cnt = 1;
243 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
244 ORTE_ERROR_LOG(rc);
245 return;
246 }
247 /* search our list of requests to find the matching one */
248 dfs = NULL;
249 for (item = opal_list_get_first(&requests);
250 item != opal_list_get_end(&requests);
251 item = opal_list_get_next(item)) {
252 dptr = (orte_dfs_request_t*)item;
253 if (dptr->id == rid) {
254 /* request was fulfilled, so remove it */
255 opal_list_remove_item(&requests, item);
256 dfs = dptr;
257 break;
258 }
259 }
260 if (NULL == dfs) {
261 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
262 "%s recvd size - no corresponding request found for local fd %d",
263 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
264 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
265 return;
266 }
267 /* get the size */
268 cnt = 1;
269 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
270 ORTE_ERROR_LOG(rc);
271 OBJ_RELEASE(dfs);
272 return;
273 }
274 /* pass it back to the original caller */
275 if (NULL != dfs->size_cbfunc) {
276 dfs->size_cbfunc(i64, dfs->cbdata);
277 }
278 /* release the request */
279 OBJ_RELEASE(dfs);
280 break;
281
282 case ORTE_DFS_SEEK_CMD:
283 /* unpack the request id for this read */
284 cnt = 1;
285 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
286 ORTE_ERROR_LOG(rc);
287 return;
288 }
289 /* search our list of requests to find the matching one */
290 dfs = NULL;
291 for (item = opal_list_get_first(&requests);
292 item != opal_list_get_end(&requests);
293 item = opal_list_get_next(item)) {
294 dptr = (orte_dfs_request_t*)item;
295 if (dptr->id == rid) {
296 /* request was fulfilled, so remove it */
297 opal_list_remove_item(&requests, item);
298 dfs = dptr;
299 break;
300 }
301 }
302 if (NULL == dfs) {
303 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
304 "%s recvd seek - no corresponding request found for local fd %d",
305 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
306 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
307 return;
308 }
309 /* get the returned offset/status */
310 cnt = 1;
311 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
312 ORTE_ERROR_LOG(rc);
313 OBJ_RELEASE(dfs);
314 return;
315 }
316 /* pass it back to the original caller */
317 if (NULL != dfs->seek_cbfunc) {
318 dfs->seek_cbfunc(i64, dfs->cbdata);
319 }
320 /* release the request */
321 OBJ_RELEASE(dfs);
322 break;
323
324 case ORTE_DFS_READ_CMD:
325 /* unpack the request id for this read */
326 cnt = 1;
327 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
328 ORTE_ERROR_LOG(rc);
329 return;
330 }
331 /* search our list of requests to find the matching one */
332 dfs = NULL;
333 for (item = opal_list_get_first(&requests);
334 item != opal_list_get_end(&requests);
335 item = opal_list_get_next(item)) {
336 dptr = (orte_dfs_request_t*)item;
337 if (dptr->id == rid) {
338 /* request was fulfilled, so remove it */
339 opal_list_remove_item(&requests, item);
340 dfs = dptr;
341 break;
342 }
343 }
344 if (NULL == dfs) {
345 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
346 "%s recvd read - no corresponding request found for local fd %d",
347 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
348 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
349 return;
350 }
351 /* get the bytes read */
352 cnt = 1;
353 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
354 ORTE_ERROR_LOG(rc);
355 OBJ_RELEASE(dfs);
356 return;
357 }
358 if (0 < i64) {
359 cnt = i64;
360 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, dfs->read_buffer, &cnt, OPAL_UINT8))) {
361 ORTE_ERROR_LOG(rc);
362 OBJ_RELEASE(dfs);
363 return;
364 }
365 }
366 /* pass them back to the original caller */
367 if (NULL != dfs->read_cbfunc) {
368 dfs->read_cbfunc(i64, dfs->read_buffer, dfs->cbdata);
369 }
370 /* release the request */
371 OBJ_RELEASE(dfs);
372 break;
373
374 case ORTE_DFS_POST_CMD:
375 /* unpack the request id for this read */
376 cnt = 1;
377 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
378 ORTE_ERROR_LOG(rc);
379 return;
380 }
381 /* search our list of requests to find the matching one */
382 dfs = NULL;
383 for (item = opal_list_get_first(&requests);
384 item != opal_list_get_end(&requests);
385 item = opal_list_get_next(item)) {
386 dptr = (orte_dfs_request_t*)item;
387 if (dptr->id == rid) {
388 /* request was fulfilled, so remove it */
389 opal_list_remove_item(&requests, item);
390 dfs = dptr;
391 break;
392 }
393 }
394 if (NULL == dfs) {
395 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
396 "%s recvd post - no corresponding request found",
397 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
398 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
399 return;
400 }
401 if (NULL != dfs->post_cbfunc) {
402 dfs->post_cbfunc(dfs->cbdata);
403 }
404 OBJ_RELEASE(dfs);
405 break;
406
407 case ORTE_DFS_GETFM_CMD:
408 /* unpack the request id for this read */
409 cnt = 1;
410 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
411 ORTE_ERROR_LOG(rc);
412 return;
413 }
414 /* search our list of requests to find the matching one */
415 dfs = NULL;
416 for (item = opal_list_get_first(&requests);
417 item != opal_list_get_end(&requests);
418 item = opal_list_get_next(item)) {
419 dptr = (orte_dfs_request_t*)item;
420 if (dptr->id == rid) {
421 /* request was fulfilled, so remove it */
422 opal_list_remove_item(&requests, item);
423 dfs = dptr;
424 break;
425 }
426 }
427 if (NULL == dfs) {
428 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
429 "%s recvd getfm - no corresponding request found",
430 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
431 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
432 return;
433 }
434 /* return it to caller */
435 if (NULL != dfs->fm_cbfunc) {
436 dfs->fm_cbfunc(buffer, dfs->cbdata);
437 }
438 OBJ_RELEASE(dfs);
439 break;
440
441 default:
442 opal_output(0, "APP:DFS:RECV WTF");
443 break;
444 }
445 }
446
/*
 * Open a file reachable through this process's own filesystem and
 * register a tracker for it in active_files. The request's open callback
 * (if any) is always invoked: with the new local fd on success, or with a
 * negative value on failure. This function never releases dfs; the
 * callers in process_opens() release it after this returns.
 */
static void open_local_file(orte_dfs_request_t *dfs)
{
    char *filename;
    orte_dfs_tracker_t *trk;

    /* extract the filename from the uri */
    if (NULL == (filename = opal_filename_from_uri(dfs->uri, NULL))) {
        /* something wrong - error was reported, so just get out. Do not
         * release dfs here: the caller does, and a second release here
         * would drop the reference twice. */
        if (NULL != dfs->open_cbfunc) {
            dfs->open_cbfunc(-1, dfs->cbdata);
        }
        return;
    }
    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s opening local file %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        filename);
    /* attempt to open the file */
    if (0 > (dfs->remote_fd = open(filename, O_RDONLY))) {
        ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
        if (NULL != dfs->open_cbfunc) {
            dfs->open_cbfunc(dfs->remote_fd, dfs->cbdata);
        }
        free(filename);
        return;
    }
    /* otherwise, create a tracker for this file */
    trk = OBJ_NEW(orte_dfs_tracker_t);
    trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
    trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
    trk->uri = strdup(dfs->uri);
    /* break the uri down into scheme and filename; the tracker takes
     * ownership of the filename string allocated above */
    trk->scheme = opal_uri_get_scheme(dfs->uri);
    trk->filename = filename;
    /* define the local fd */
    trk->local_fd = local_fd++;
    /* record the OS file descriptor */
    trk->remote_fd = dfs->remote_fd;
    /* the file is locally hosted */
    trk->host_daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
    trk->host_daemon.vpid = ORTE_PROC_MY_DAEMON->vpid;
    /* add it to our list of active files */
    opal_list_append(&active_files, &trk->super);
    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s local file %s mapped localfd %d to remotefd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        trk->filename, trk->local_fd, trk->remote_fd);
    /* let the caller know */
    if (NULL != dfs->open_cbfunc) {
        dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
    }
    /* request will be released by the calling routine */
}
500
process_opens(int fd,short args,void * cbdata)501 static void process_opens(int fd, short args, void *cbdata)
502 {
503 orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
504 int rc;
505 opal_buffer_t *buffer;
506 char *scheme, *host, *filename;
507 orte_process_name_t daemon;
508 opal_list_t lt;
509 opal_namelist_t *nm;
510
511 ORTE_ACQUIRE_OBJECT(dfs);
512
513 /* get the scheme to determine if we can process locally or not */
514 if (NULL == (scheme = opal_uri_get_scheme(dfs->uri))) {
515 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
516 goto complete;
517 }
518
519 if (0 == strcmp(scheme, "nfs")) {
520 open_local_file(dfs);
521 /* the callback was done in the above function */
522 OBJ_RELEASE(dfs);
523 return;
524 }
525
526 if (0 != strcmp(scheme, "file")) {
527 /* not yet supported */
528 orte_show_help("orte_dfs_help.txt", "unsupported-filesystem",
529 true, dfs->uri);
530 goto complete;
531 }
532
533 /* dissect the uri to extract host and filename/path */
534 if (NULL == (filename = opal_filename_from_uri(dfs->uri, &host))) {
535 goto complete;
536 }
537 if (NULL == host) {
538 host = strdup(orte_process_info.nodename);
539 }
540
541 /* if the host is our own, then treat it as a local file */
542 if (orte_ifislocal(host)) {
543 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
544 "%s file %s on local host",
545 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
546 filename);
547 open_local_file(dfs);
548 /* the callback was done in the above function */
549 OBJ_RELEASE(dfs);
550 return;
551 }
552
553 /* ident the daemon on that host */
554 daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
555 /* fetch the daemon for this hostname */
556 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
557 "%s looking for daemon on host %s",
558 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), host);
559 OBJ_CONSTRUCT(<, opal_list_t);
560 if (ORTE_SUCCESS != (rc = opal_pmix.resolve_peers(host, daemon.jobid, <))) {
561 ORTE_ERROR_LOG(rc);
562 OBJ_DESTRUCT(<);
563 goto complete;
564 }
565 nm = (opal_namelist_t*)opal_list_get_first(<);
566 daemon.vpid = nm->name.vpid;
567 OPAL_LIST_DESTRUCT(<);
568
569 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
570 "%s file %s on host %s daemon %s",
571 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
572 filename, host, ORTE_NAME_PRINT(&daemon));
573
574 /* double-check: if it is our local daemon, then we
575 * treat this as local
576 */
577 if (daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
578 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
579 "%s local file %s on same daemon",
580 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
581 filename);
582 open_local_file(dfs);
583 /* the callback was done in the above function */
584 OBJ_RELEASE(dfs);
585 return;
586 }
587
588 /* add this request to our local list so we can
589 * match it with the returned response when it comes
590 */
591 dfs->id = req_id++;
592 opal_list_append(&requests, &dfs->super);
593
594 /* setup a message for the daemon telling
595 * them what file we want to access
596 */
597 buffer = OBJ_NEW(opal_buffer_t);
598 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
599 ORTE_ERROR_LOG(rc);
600 opal_list_remove_item(&requests, &dfs->super);
601 goto complete;
602 }
603 /* pass the request id */
604 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
605 ORTE_ERROR_LOG(rc);
606 opal_list_remove_item(&requests, &dfs->super);
607 goto complete;
608 }
609 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &filename, 1, OPAL_STRING))) {
610 ORTE_ERROR_LOG(rc);
611 opal_list_remove_item(&requests, &dfs->super);
612 goto complete;
613 }
614
615 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
616 "%s sending open file request to %s file %s",
617 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
618 ORTE_NAME_PRINT(&daemon),
619 filename);
620 /* send it */
621 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
622 &daemon, buffer,
623 ORTE_RML_TAG_DFS_CMD,
624 orte_rml_send_callback, NULL))) {
625 ORTE_ERROR_LOG(rc);
626 OBJ_RELEASE(buffer);
627 opal_list_remove_item(&requests, &dfs->super);
628 goto complete;
629 }
630 /* don't release it */
631 return;
632
633 complete:
634 /* we get here if an error occurred - execute any
635 * pending callback so the proc doesn't hang
636 */
637 if (NULL != dfs->open_cbfunc) {
638 dfs->open_cbfunc(-1, dfs->cbdata);
639 }
640 OBJ_RELEASE(dfs);
641 }
642
643
644 /* in order to handle the possible opening/reading of files by
645 * multiple threads, we have to ensure that all operations are
646 * carried out in events - so the "open" cmd simply posts an
647 * event containing the required info, and then returns
648 */
dfs_open(char * uri,orte_dfs_open_callback_fn_t cbfunc,void * cbdata)649 static void dfs_open(char *uri,
650 orte_dfs_open_callback_fn_t cbfunc,
651 void *cbdata)
652 {
653 orte_dfs_request_t *dfs;
654
655 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
656 "%s opening file %s",
657 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uri);
658
659 /* setup the request */
660 dfs = OBJ_NEW(orte_dfs_request_t);
661 dfs->cmd = ORTE_DFS_OPEN_CMD;
662 dfs->uri = strdup(uri);
663 dfs->open_cbfunc = cbfunc;
664 dfs->cbdata = cbdata;
665
666 /* post it for processing */
667 ORTE_THREADSHIFT(dfs, orte_event_base, process_opens, ORTE_SYS_PRI);
668 }
669
process_close(int fd,short args,void * cbdata)670 static void process_close(int fd, short args, void *cbdata)
671 {
672 orte_dfs_request_t *close_dfs = (orte_dfs_request_t*)cbdata;
673 orte_dfs_tracker_t *tptr, *trk;
674 opal_list_item_t *item;
675 opal_buffer_t *buffer;
676 int rc;
677
678 ORTE_ACQUIRE_OBJECT(close_dfs);
679
680 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
681 "%s closing fd %d",
682 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
683 close_dfs->local_fd);
684
685 /* look in our local records for this fd */
686 trk = NULL;
687 for (item = opal_list_get_first(&active_files);
688 item != opal_list_get_end(&active_files);
689 item = opal_list_get_next(item)) {
690 tptr = (orte_dfs_tracker_t*)item;
691 if (tptr->local_fd == close_dfs->local_fd) {
692 trk = tptr;
693 break;
694 }
695 }
696 if (NULL == trk) {
697 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
698 if (NULL != close_dfs->close_cbfunc) {
699 close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
700 }
701 OBJ_RELEASE(close_dfs);
702 return;
703 }
704
705 /* if the file is local, close it */
706 if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
707 close(trk->remote_fd);
708 goto complete;
709 }
710
711 /* setup a message for the daemon telling
712 * them what file to close
713 */
714 buffer = OBJ_NEW(opal_buffer_t);
715 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &close_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
716 ORTE_ERROR_LOG(rc);
717 goto complete;
718 }
719 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
720 ORTE_ERROR_LOG(rc);
721 goto complete;
722 }
723
724 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
725 "%s sending close file request to %s for fd %d",
726 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
727 ORTE_NAME_PRINT(&trk->host_daemon),
728 trk->local_fd);
729 /* send it */
730 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
731 &trk->host_daemon, buffer,
732 ORTE_RML_TAG_DFS_CMD,
733 orte_rml_send_callback, NULL))) {
734 ORTE_ERROR_LOG(rc);
735 OBJ_RELEASE(buffer);
736 goto complete;
737 }
738
739 complete:
740 opal_list_remove_item(&active_files, &trk->super);
741 OBJ_RELEASE(trk);
742 if (NULL != close_dfs->close_cbfunc) {
743 close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
744 }
745 OBJ_RELEASE(close_dfs);
746 }
747
/*
 * Public close entry point: wrap the caller's fd and callback in a
 * close request and shift it onto the ORTE event base, where
 * process_close() handles it.
 */
static void dfs_close(int fd,
                      orte_dfs_close_callback_fn_t cbfunc,
                      void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s close called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    req = OBJ_NEW(orte_dfs_request_t);
    req->cmd = ORTE_DFS_CLOSE_CMD;
    req->close_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->local_fd = fd;

    ORTE_THREADSHIFT(req, orte_event_base, process_close, ORTE_SYS_PRI);
}
767
process_sizes(int fd,short args,void * cbdata)768 static void process_sizes(int fd, short args, void *cbdata)
769 {
770 orte_dfs_request_t *size_dfs = (orte_dfs_request_t*)cbdata;
771 orte_dfs_tracker_t *tptr, *trk;
772 opal_list_item_t *item;
773 opal_buffer_t *buffer;
774 int rc;
775 struct stat buf;
776
777 ORTE_ACQUIRE_OBJECT(size_dfs);
778
779 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
780 "%s processing get_size on fd %d",
781 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
782 size_dfs->local_fd);
783
784 /* look in our local records for this fd */
785 trk = NULL;
786 for (item = opal_list_get_first(&active_files);
787 item != opal_list_get_end(&active_files);
788 item = opal_list_get_next(item)) {
789 tptr = (orte_dfs_tracker_t*)item;
790 if (tptr->local_fd == size_dfs->local_fd) {
791 trk = tptr;
792 break;
793 }
794 }
795 if (NULL == trk) {
796 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
797 OBJ_RELEASE(size_dfs);
798 return;
799 }
800
801 /* if the file is local, execute the seek on it - we
802 * stuck the "whence" value in the remote_fd
803 */
804 if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
805 /* stat the file and get its size */
806 if (0 > stat(trk->filename, &buf)) {
807 /* cannot stat file */
808 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
809 "%s could not stat %s",
810 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
811 trk->filename);
812 if (NULL != size_dfs->size_cbfunc) {
813 size_dfs->size_cbfunc(-1, size_dfs->cbdata);
814 }
815 } else {
816 if (NULL != size_dfs->size_cbfunc) {
817 size_dfs->size_cbfunc(buf.st_size, size_dfs->cbdata);
818 }
819 }
820 goto complete;
821 }
822 /* add this request to our local list so we can
823 * match it with the returned response when it comes
824 */
825 size_dfs->id = req_id++;
826 opal_list_append(&requests, &size_dfs->super);
827
828 /* setup a message for the daemon telling
829 * them what file we want to access
830 */
831 buffer = OBJ_NEW(opal_buffer_t);
832 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
833 ORTE_ERROR_LOG(rc);
834 opal_list_remove_item(&requests, &size_dfs->super);
835 goto complete;
836 }
837 /* pass the request id */
838 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->id, 1, OPAL_UINT64))) {
839 ORTE_ERROR_LOG(rc);
840 opal_list_remove_item(&requests, &size_dfs->super);
841 goto complete;
842 }
843 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
844 ORTE_ERROR_LOG(rc);
845 opal_list_remove_item(&requests, &size_dfs->super);
846 goto complete;
847 }
848
849 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
850 "%s sending get_size request to %s for fd %d",
851 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
852 ORTE_NAME_PRINT(&trk->host_daemon),
853 trk->local_fd);
854 /* send it */
855 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
856 &trk->host_daemon, buffer,
857 ORTE_RML_TAG_DFS_CMD,
858 orte_rml_send_callback, NULL))) {
859 ORTE_ERROR_LOG(rc);
860 OBJ_RELEASE(buffer);
861 opal_list_remove_item(&requests, &size_dfs->super);
862 if (NULL != size_dfs->size_cbfunc) {
863 size_dfs->size_cbfunc(-1, size_dfs->cbdata);
864 }
865 goto complete;
866 }
867 /* leave the request there */
868 return;
869
870 complete:
871 OBJ_RELEASE(size_dfs);
872 }
873
/*
 * Public get-size entry point: record the fd and callback in a size
 * request and shift it onto the ORTE event base for process_sizes().
 */
static void dfs_get_file_size(int fd,
                              orte_dfs_size_callback_fn_t cbfunc,
                              void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s get_size called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    req = OBJ_NEW(orte_dfs_request_t);
    req->cmd = ORTE_DFS_SIZE_CMD;
    req->size_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->local_fd = fd;

    ORTE_THREADSHIFT(req, orte_event_base, process_sizes, ORTE_SYS_PRI);
}
893
894
process_seeks(int fd,short args,void * cbdata)895 static void process_seeks(int fd, short args, void *cbdata)
896 {
897 orte_dfs_request_t *seek_dfs = (orte_dfs_request_t*)cbdata;
898 orte_dfs_tracker_t *tptr, *trk;
899 opal_list_item_t *item;
900 opal_buffer_t *buffer;
901 int64_t i64;
902 int rc;
903 struct stat buf;
904
905 ORTE_ACQUIRE_OBJECT(seek_dfs);
906
907 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
908 "%s processing seek on fd %d",
909 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
910 seek_dfs->local_fd);
911
912 /* look in our local records for this fd */
913 trk = NULL;
914 for (item = opal_list_get_first(&active_files);
915 item != opal_list_get_end(&active_files);
916 item = opal_list_get_next(item)) {
917 tptr = (orte_dfs_tracker_t*)item;
918 if (tptr->local_fd == seek_dfs->local_fd) {
919 trk = tptr;
920 break;
921 }
922 }
923 if (NULL == trk) {
924 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
925 OBJ_RELEASE(seek_dfs);
926 return;
927 }
928
929 /* if the file is local, execute the seek on it - we
930 * stuck the "whence" value in the remote_fd
931 */
932 if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
933 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
934 "%s local seek on fd %d",
935 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
936 seek_dfs->local_fd);
937 /* stat the file and get its size */
938 if (0 > stat(trk->filename, &buf)) {
939 /* cannot stat file */
940 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
941 "%s could not stat %s",
942 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
943 trk->filename);
944 if (NULL != seek_dfs->seek_cbfunc) {
945 seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
946 }
947 } else if (buf.st_size < seek_dfs->read_length &&
948 SEEK_SET == seek_dfs->remote_fd) {
949 /* seek would take us past EOF */
950 if (NULL != seek_dfs->seek_cbfunc) {
951 seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
952 }
953 } else if (buf.st_size < (off_t)(trk->location + seek_dfs->read_length) &&
954 SEEK_CUR == seek_dfs->remote_fd) {
955 /* seek would take us past EOF */
956 if (NULL != seek_dfs->seek_cbfunc) {
957 seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
958 }
959 } else {
960 lseek(trk->remote_fd, seek_dfs->read_length, seek_dfs->remote_fd);
961 if (SEEK_SET == seek_dfs->remote_fd) {
962 trk->location = seek_dfs->read_length;
963 } else {
964 trk->location += seek_dfs->read_length;
965 }
966 if (NULL != seek_dfs->seek_cbfunc) {
967 seek_dfs->seek_cbfunc(seek_dfs->read_length, seek_dfs->cbdata);
968 }
969 }
970 goto complete;
971 }
972 /* add this request to our local list so we can
973 * match it with the returned response when it comes
974 */
975 seek_dfs->id = req_id++;
976 opal_list_append(&requests, &seek_dfs->super);
977
978 /* setup a message for the daemon telling
979 * them what file to seek
980 */
981 buffer = OBJ_NEW(opal_buffer_t);
982 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
983 ORTE_ERROR_LOG(rc);
984 goto complete;
985 }
986 /* pass the request id */
987 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->id, 1, OPAL_UINT64))) {
988 ORTE_ERROR_LOG(rc);
989 opal_list_remove_item(&requests, &seek_dfs->super);
990 goto complete;
991 }
992 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
993 ORTE_ERROR_LOG(rc);
994 goto complete;
995 }
996 i64 = (int64_t)seek_dfs->read_length;
997 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
998 ORTE_ERROR_LOG(rc);
999 goto complete;
1000 }
1001 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->remote_fd, 1, OPAL_INT))) {
1002 ORTE_ERROR_LOG(rc);
1003 goto complete;
1004 }
1005
1006 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
1007 "%s sending seek file request to %s for fd %d",
1008 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1009 ORTE_NAME_PRINT(&trk->host_daemon),
1010 trk->local_fd);
1011 /* send it */
1012 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1013 &trk->host_daemon, buffer,
1014 ORTE_RML_TAG_DFS_CMD,
1015 orte_rml_send_callback, NULL))) {
1016 ORTE_ERROR_LOG(rc);
1017 OBJ_RELEASE(buffer);
1018 goto complete;
1019 }
1020 /* leave the request */
1021 return;
1022
1023 complete:
1024 OBJ_RELEASE(seek_dfs);
1025 }
1026
1027
/*
 * Public seek entry point: build a seek request and shift it onto the
 * ORTE event base for process_seeks(). The request struct has no
 * dedicated seek fields, so the offset travels in read_length and the
 * "whence" value in remote_fd.
 */
static void dfs_seek(int fd, long offset, int whence,
                     orte_dfs_seek_callback_fn_t cbfunc,
                     void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s seek called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    req = OBJ_NEW(orte_dfs_request_t);
    req->cmd = ORTE_DFS_SEEK_CMD;
    req->seek_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->local_fd = fd;
    req->remote_fd = whence;      /* whence */
    req->read_length = offset;    /* offset */

    ORTE_THREADSHIFT(req, orte_event_base, process_seeks, ORTE_SYS_PRI);
}
1049
process_reads(int fd,short args,void * cbdata)1050 static void process_reads(int fd, short args, void *cbdata)
1051 {
1052 orte_dfs_request_t *read_dfs = (orte_dfs_request_t*)cbdata;
1053 orte_dfs_tracker_t *tptr, *trk;
1054 long nbytes;
1055 opal_list_item_t *item;
1056 opal_buffer_t *buffer;
1057 int64_t i64;
1058 int rc;
1059
1060 ORTE_ACQUIRE_OBJECT(read_dfs);
1061
1062 /* look in our local records for this fd */
1063 trk = NULL;
1064 for (item = opal_list_get_first(&active_files);
1065 item != opal_list_get_end(&active_files);
1066 item = opal_list_get_next(item)) {
1067 tptr = (orte_dfs_tracker_t*)item;
1068 if (tptr->local_fd == read_dfs->local_fd) {
1069 trk = tptr;
1070 break;
1071 }
1072 }
1073 if (NULL == trk) {
1074 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
1075 OBJ_RELEASE(read_dfs);
1076 return;
1077 }
1078
1079 /* if the file is local, read the desired bytes */
1080 if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
1081 nbytes = read(trk->remote_fd, read_dfs->read_buffer, read_dfs->read_length);
1082 if (0 < nbytes) {
1083 /* update our location */
1084 trk->location += nbytes;
1085 }
1086 /* pass them back to the caller */
1087 if (NULL != read_dfs->read_cbfunc) {
1088 read_dfs->read_cbfunc(nbytes, read_dfs->read_buffer, read_dfs->cbdata);
1089 }
1090 /* request is complete */
1091 OBJ_RELEASE(read_dfs);
1092 return;
1093 }
1094 /* add this request to our pending list */
1095 read_dfs->id = req_id++;
1096 opal_list_append(&requests, &read_dfs->super);
1097
1098 /* setup a message for the daemon telling
1099 * them what file to read
1100 */
1101 buffer = OBJ_NEW(opal_buffer_t);
1102 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1103 ORTE_ERROR_LOG(rc);
1104 goto complete;
1105 }
1106 /* include the request id */
1107 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->id, 1, OPAL_UINT64))) {
1108 ORTE_ERROR_LOG(rc);
1109 goto complete;
1110 }
1111 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
1112 ORTE_ERROR_LOG(rc);
1113 goto complete;
1114 }
1115 i64 = (int64_t)read_dfs->read_length;
1116 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
1117 ORTE_ERROR_LOG(rc);
1118 goto complete;
1119 }
1120
1121 opal_output_verbose(1, orte_dfs_base_framework.framework_output,
1122 "%s sending read file request to %s for fd %d",
1123 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1124 ORTE_NAME_PRINT(&trk->host_daemon),
1125 trk->local_fd);
1126 /* send it */
1127 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1128 &trk->host_daemon, buffer,
1129 ORTE_RML_TAG_DFS_CMD,
1130 orte_rml_send_callback, NULL))) {
1131 ORTE_ERROR_LOG(rc);
1132 OBJ_RELEASE(buffer);
1133 }
1134 /* don't release the request */
1135 return;
1136
1137 complete:
1138 /* don't need to hang on to this request */
1139 opal_list_remove_item(&requests, &read_dfs->super);
1140 OBJ_RELEASE(read_dfs);
1141 }
1142
/*
 * Public read entry point for a file opened through this DFS module.
 *
 * The arguments are stored in a new ORTE_DFS_READ_CMD request, which is
 * then passed to process_reads() on the ORTE event base.
 *
 * fd      - local fd previously handed out by this module
 * buffer  - caller-owned destination for the data
 * length  - number of bytes requested
 * cbfunc  - completion callback (may be NULL)
 * cbdata  - opaque pointer passed back to cbfunc
 */
static void dfs_read(int fd, uint8_t *buffer,
                     long length,
                     orte_dfs_read_callback_fn_t cbfunc,
                     void *cbdata)
{
    orte_dfs_request_t *req = OBJ_NEW(orte_dfs_request_t);

    req->cmd = ORTE_DFS_READ_CMD;
    req->read_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->local_fd = fd;
    req->read_length = length;
    req->read_buffer = buffer;

    /* hand the request to the event thread */
    ORTE_THREADSHIFT(req, orte_event_base, process_reads, ORTE_SYS_PRI);
}
1161
process_posts(int fd,short args,void * cbdata)1162 static void process_posts(int fd, short args, void *cbdata)
1163 {
1164 orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
1165 opal_buffer_t *buffer;
1166 int rc;
1167
1168 ORTE_ACQUIRE_OBJECT(dfs);
1169
1170 /* we will get confirmation in our receive function, so
1171 * add this request to our list */
1172 dfs->id = req_id++;
1173 opal_list_append(&requests, &dfs->super);
1174
1175 /* Send the buffer's contents to our local daemon for storage */
1176 buffer = OBJ_NEW(opal_buffer_t);
1177 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1178 ORTE_ERROR_LOG(rc);
1179 goto error;
1180 }
1181 /* include the request id */
1182 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
1183 ORTE_ERROR_LOG(rc);
1184 goto error;
1185 }
1186 /* add my name */
1187 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
1188 ORTE_ERROR_LOG(rc);
1189 goto error;
1190 }
1191 /* pack the payload */
1192 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->bptr, 1, OPAL_BUFFER))) {
1193 ORTE_ERROR_LOG(rc);
1194 goto error;
1195 }
1196 /* send it */
1197 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1198 ORTE_PROC_MY_DAEMON, buffer,
1199 ORTE_RML_TAG_DFS_CMD,
1200 orte_rml_send_callback, NULL))) {
1201 ORTE_ERROR_LOG(rc);
1202 goto error;
1203 }
1204 return;
1205
1206 error:
1207 OBJ_RELEASE(buffer);
1208 opal_list_remove_item(&requests, &dfs->super);
1209 if (NULL != dfs->post_cbfunc) {
1210 dfs->post_cbfunc(dfs->cbdata);
1211 }
1212 OBJ_RELEASE(dfs);
1213 }
1214
/*
 * Public entry point for posting a file-map buffer to the local daemon.
 *
 * The buffer pointer is stored (not copied) in a new ORTE_DFS_POST_CMD
 * request, which is then passed to process_posts() on the ORTE event base.
 *
 * bo      - payload buffer to post
 * cbfunc  - completion callback (may be NULL)
 * cbdata  - opaque pointer passed back to cbfunc
 */
static void dfs_post_file_map(opal_buffer_t *bo,
                              orte_dfs_post_callback_fn_t cbfunc,
                              void *cbdata)
{
    orte_dfs_request_t *req = OBJ_NEW(orte_dfs_request_t);

    req->cmd = ORTE_DFS_POST_CMD;
    req->post_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->bptr = bo;

    /* hand the request to the event thread */
    ORTE_THREADSHIFT(req, orte_event_base, process_posts, ORTE_SYS_PRI);
}
1230
process_getfm(int fd,short args,void * cbdata)1231 static void process_getfm(int fd, short args, void *cbdata)
1232 {
1233 orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
1234 opal_buffer_t *buffer;
1235 int rc;
1236
1237 ORTE_ACQUIRE_OBJECT(dfs);
1238
1239 /* we will get confirmation in our receive function, so
1240 * add this request to our list */
1241 dfs->id = req_id++;
1242 opal_list_append(&requests, &dfs->super);
1243
1244 /* Send the request to our local daemon */
1245 buffer = OBJ_NEW(opal_buffer_t);
1246 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1247 ORTE_ERROR_LOG(rc);
1248 goto error;
1249 }
1250 /* include the request id */
1251 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
1252 ORTE_ERROR_LOG(rc);
1253 goto error;
1254 }
1255 /* and the target */
1256 if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->target, 1, ORTE_NAME))) {
1257 ORTE_ERROR_LOG(rc);
1258 goto error;
1259 }
1260 /* send it */
1261 if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
1262 ORTE_PROC_MY_DAEMON, buffer,
1263 ORTE_RML_TAG_DFS_CMD,
1264 orte_rml_send_callback, NULL))) {
1265 ORTE_ERROR_LOG(rc);
1266 goto error;
1267 }
1268 return;
1269
1270 error:
1271 OBJ_RELEASE(buffer);
1272 opal_list_remove_item(&requests, &dfs->super);
1273 if (NULL != dfs->fm_cbfunc) {
1274 dfs->fm_cbfunc(NULL, dfs->cbdata);
1275 }
1276 OBJ_RELEASE(dfs);
1277 }
1278
/*
 * Public entry point for retrieving the file map of a target process.
 *
 * The target name is copied by value into a new ORTE_DFS_GETFM_CMD
 * request, which is then passed to process_getfm() on the ORTE event base.
 * target must be non-NULL; it is dereferenced here.
 *
 * target  - process whose file map is requested
 * cbfunc  - completion callback (may be NULL)
 * cbdata  - opaque pointer passed back to cbfunc
 */
static void dfs_get_file_map(orte_process_name_t *target,
                             orte_dfs_fm_callback_fn_t cbfunc,
                             void *cbdata)
{
    orte_dfs_request_t *req = OBJ_NEW(orte_dfs_request_t);

    req->cmd = ORTE_DFS_GETFM_CMD;
    req->fm_cbfunc = cbfunc;
    req->cbdata = cbdata;
    req->target.vpid = target->vpid;
    req->target.jobid = target->jobid;

    /* hand the request to the event thread */
    ORTE_THREADSHIFT(req, orte_event_base, process_getfm, ORTE_SYS_PRI);
}
1295
/*
 * Application processes do not keep file maps, so there is nothing to
 * load.  The jobid and buffer are ignored, and the callback (if provided)
 * is invoked immediately in the caller's context.
 */
static void dfs_load_file_maps(orte_jobid_t jobid,
                               opal_buffer_t *bo,
                               orte_dfs_load_callback_fn_t cbfunc,
                               void *cbdata)
{
    if (NULL == cbfunc) {
        return;
    }
    cbfunc(cbdata);
}
1306
/*
 * Application processes do not keep file maps, so there is nothing to
 * purge.  The jobid is ignored, and the callback (if provided) is invoked
 * immediately in the caller's context.
 */
static void dfs_purge_file_maps(orte_jobid_t jobid,
                                orte_dfs_purge_callback_fn_t cbfunc,
                                void *cbdata)
{
    if (NULL == cbfunc) {
        return;
    }
    cbfunc(cbdata);
}
1316