1 /* _______________________________________________________________________
2
3 DAKOTA: Design Analysis Kit for Optimization and Terascale Applications
4 Copyright 2014-2020 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
5 This software is distributed under the GNU Lesser General Public License.
6 For more information, see the README file in the top Dakota directory.
7 _______________________________________________________________________ */
8
9 //- Class: ProcessHandleApplicInterface
10 //- Description: Class implementation
11 //- Owner: Mike Eldred
12
13 #include "DakotaResponse.hpp"
14 #include "ParamResponsePair.hpp"
15 #include "ProcessHandleApplicInterface.hpp"
16 #include "ProblemDescDB.hpp"
17 #include "ParallelLibrary.hpp"
18 #include "WorkdirHelper.hpp"
19 #include <algorithm>
20 #ifdef HAVE_SYS_WAIT_H
21 #include <sys/wait.h> // for wait process status macros (Posix only)
22 #endif
23
24 namespace Dakota {
25
/** Record which function evaluation a launched process belongs to.
    \param pid         process id used as the key in evalProcessIdMap
    \param fn_eval_id  function evaluation id stored for that process id;
                       any value previously stored under pid is overwritten */
void ProcessHandleApplicInterface::map_bookkeeping(pid_t pid, int fn_eval_id)
{
  // store the process & eval ids in a map.  The correspondence in completed
  // process id and fn eval id is then mapped to the appropriate index of
  // prp_queue in process_local_evaluation().  Correspondence between
  // evalProcessIdMap and beforeSynchCorePRPQueue orders cannot be assumed
  // due to hybrid parallelism, i.e. ApplicationInterface::serve_asynch().
  evalProcessIdMap[pid] = fn_eval_id;
}
35
36
37 void ProcessHandleApplicInterface::
process_local_evaluation(PRPQueue & prp_queue,const pid_t pid)38 process_local_evaluation(PRPQueue& prp_queue, const pid_t pid)
39 {
40 // Common processing code used by {wait,test}_local_evaluations()
41
42 // Map pid to fn_eval_id (using evalProcessIdMap[pid] does the wrong thing
43 // if the pid key is not found)
44 std::map<pid_t, int>::iterator map_iter = evalProcessIdMap.find(pid);
45 if (map_iter == evalProcessIdMap.end()) {
46 // should not happen so long as wait ignores any 2nd level child processes
47 Cerr << "Error: pid returned from wait does not match any 1st level child "
48 << "process for this interface." << std::endl;
49 abort_handler(-1);
50 }
51 int fn_eval_id = map_iter->second;
52
53 // now populate the corresponding response by reading the results file
54 PRPQueueIter queue_it = lookup_by_eval_id(prp_queue, fn_eval_id);
55 if (queue_it == prp_queue.end()) {
56 Cerr << "Error: failure in queue lookup within ProcessHandleApplicInterface"
57 << "::process_local_evaluation()." << std::endl;
58 abort_handler(-1);
59 }
60 Response response = queue_it->response(); // shallow copy
61 try {
62 read_results_files(response, fn_eval_id, final_eval_id_tag(fn_eval_id));
63 }
64 catch(const FileReadException& fr_except) {
65 // For forks, there is no potential for an file write race
66 // condition since the process has completed -> an exception
67 // involving an incomplete file/data set is a true error.
68 Cerr << fr_except.what() << std::endl;
69 abort_handler(INTERFACE_ERROR);
70 }
71
72 catch(const FunctionEvalFailure& fneval_except) {
73 // If a FunctionEvalFailure exception ("fail" detected in results
74 // file) is caught, call manage_failure which will either (1) repair the
75 // failure and populate response, or (2) abort the run. NOTE: this
76 // destroys load balancing but trying to load balance failure recovery
77 // would be more difficult than it is worth.
78 manage_failure(queue_it->variables(), response.active_set(), response,
79 fn_eval_id);
80 }
81
82 // bookkeep the completed job
83 //queue_it->response(response); // not needed (shallow)
84 //replace_by_eval_id(prp_queue, fn_eval_id, *queue_it);// not needed (shallow)
85 completionSet.insert(fn_eval_id);
86 evalProcessIdMap.erase(pid);
87 }
88
89
90 /** Manage the input filter, 1 or more analysis programs, and the
91 output filter in blocking or nonblocking mode as governed by
92 block_flag. In the case of a single analysis and no filters, a
93 single fork is performed, while in other cases, an initial fork is
94 reforked multiple times. Called from derived_map() with
95 block_flag == BLOCK and from derived_map_asynch() with block_flag
96 == FALL_THROUGH. Uses create_analysis_process() to spawn
97 individual program components within the function evaluation. */
create_evaluation_process(bool block_flag)98 pid_t ProcessHandleApplicInterface::create_evaluation_process(bool block_flag)
99 {
100 size_t i;
101 if (evalCommRank == 0 && !suppressOutput) {
102 if (block_flag) {
103 if (eaDedMasterFlag)
104 Cout << "blocking fork dynamic schedule: ";
105 else if (numAnalysisServers > 1)
106 Cout << "blocking fork static schedule: ";
107 else
108 Cout << "blocking fork: ";
109 }
110 else
111 Cout << "nonblocking fork: ";
112
113 if (!iFilterName.empty()) {
114 Cout << substitute_params_and_results(iFilterName, paramsFileName, resultsFileName);
115 if (commandLineArgs)
116 Cout << ' ' << paramsFileName << ' ' << resultsFileName;
117 Cout << "; ";
118 }
119 for (i=0; i<numAnalysisDrivers; i++) {
120 String params_file = paramsFileName;
121 String results_file = resultsFileName;
122 if (multipleParamsFiles)
123 params_file += '.' + std::to_string(i+1);
124 if (numAnalysisDrivers > 1)
125 results_file += '.' + std::to_string(i+1);
126 Cout << substitute_params_and_results(programNames[i], params_file, results_file);
127 if (commandLineArgs) {
128 Cout << ' ' << params_file << ' ' << results_file;
129 }
130 if (i != numAnalysisDrivers-1)
131 Cout << "; ";
132 }
133 if (!oFilterName.empty()) {
134 Cout << "; " << substitute_params_and_results(oFilterName, paramsFileName, resultsFileName);
135 if (commandLineArgs)
136 Cout << ' ' << paramsFileName << ' ' << resultsFileName;
137 }
138 Cout << '\n';
139 }
140 // Cout must be flushed prior to the fork to clear the stdout buffer.
141 // Otherwise, the intermediate process receives a copy of the contents of
142 // this buffer and outputs the contents on the next buffer flush.
143 Cout << std::flush;
144
145 pid_t pid = 0;
146 if (iFilterName.empty() && oFilterName.empty() && numAnalysisDrivers == 1) {
147 // fork the one-piece interface directly (no intermediate process required)
148 driver_argument_list(1);
149 bool new_group = (!block_flag && evalProcessIdMap.empty());
150 pid = create_analysis_process(block_flag, new_group);
151 if (new_group) // collapse the 2 groups on behalf of wait_evaluation()
152 evaluation_process_group_id(analysis_process_group_id());
153 }
154 else if (evalCommSize > 1) {
155 // run a blocking schedule of single-proc. analyses over analysis servers.
156 // The schedules are executed by the parent processes. Forks are not used
157 // at this level since the message passing provides the needed asynchrony
158 // at the evaluation level (unlike the final case below where 2 levels of
159 // forks must be used to provide asynchrony at the eval level).
160
161 if (!block_flag) {
162 Cerr << "Error: multiprocessor evalComm does not support nonblocking "
163 << "ProcessHandleApplicInterface::create_evaluation_process()."
164 << std::endl;
165 abort_handler(-1);
166 }
167
168 if (!iFilterName.empty() && evalCommRank == 0) {
169 ifilter_argument_list();
170 create_analysis_process(BLOCK, false);
171 }
172
173 // Schedule analyses using either master-slave/dynamic or peer/static
174 if (eaDedMasterFlag) {
175 // master-slave dynamic scheduling requires a central point of control
176 // and therefore needs separate schedule & serve functions.
177 if (evalCommRank == 0)
178 master_dynamic_schedule_analyses();
179 else {
180 // in message passing mode, the user must explicitly specify analysis
181 // concurrency to get hybrid parallelism
182 if (asynchLocalAnalysisConcurrency > 1)
183 serve_analyses_asynch();
184 else
185 serve_analyses_synch();
186 }
187 }
188 else {
189 // static scheduling does not require special schedule/serve functions
190 // since it can support message passing & hybrid mode directly using
191 // synchronous_local & asynchronous_local with staggered starts. However,
192 // it does require barriers since there's no scheduler to enforce
193 // synchronization.
194
195 // avoid peers 2-n initiating analyses prior to completion of
196 // write_parameters_files() by peer 1
197 parallelLib.barrier_e();
198
199 if (asynchLocalAnalysisConcurrency > 1) // hybrid requires explicit spec
200 asynchronous_local_analyses(analysisServerId, numAnalysisDrivers,
201 numAnalysisServers); // hybrid mode
202 else
203 synchronous_local_analyses(analysisServerId, numAnalysisDrivers,
204 numAnalysisServers); // msg passing mode
205
206 // avoid peer 1 reading all the results files before peers 2-n have
207 // completed writing them
208 parallelLib.barrier_e();
209 }
210
211 if (!oFilterName.empty() && evalCommRank == 0) {
212 ofilter_argument_list();
213 create_analysis_process(BLOCK, false);
214 }
215 }
216 else { // schedule all analyses local to this processor
217
218 // If the evaluation is nonblocking, then an intermediate process must be
219 // forked to manage the 3-piece interface, multiple analysis drivers, or
220 // both. The intermediate process provides asynchrony at the evaluation
221 // level, even though the input filter execution, analysis drivers
222 // scheduling, and output filter execution are blocking.
223
224 // In the 3-piece case, it would be desirable to utilize the same format as
225 // is used in the SysCall case, i.e., grouping i_filter, simulator, and
226 // o_filter with ()'s and ;'s, but this is not supported by the exec family
227 // of functions (see exec man pages).
228
229 // Since we want this intermediate process to be able to execute
230 // concurrently with the parent dakota and other asynch processes,
231 // fork() should be used here since there is no matching exec().
232 // (NOTE: vfork should NOT be used since exec doesn't immediately follow!)
233 if (!block_flag) {
234 #ifdef HAVE_WORKING_FORK
235 // Note: working fork is necessary but not sufficient.
236 // Need a better configure-time probe for reforking of a forked process.
237 pid = fork();
238 #else
239 // Note: Windows spawn does not currently support this asynch mode.
240 Cerr << "Error: fork is not supported under this OS." << std::endl;
241 abort_handler(-1);
242 #endif
243 }
244
245 bool new_group = evalProcessIdMap.empty();
246 if (block_flag || pid == 0) {
247 // if nonblocking, then this is the intermediate (1st level child)
248 // process. If blocking, then no fork has yet been performed, and
249 // this is the parent.
250
251 if (!block_flag) // child evaluation process from fork() above
252 join_evaluation_process_group(new_group);
253
254 // run the input filter by reforking the child process (2nd level child).
255 // This refork is always blocking. The ifilter is used just once per
256 // evaluation since it is responsible for non-replicated pre-processing.
257 // Any replicated pre-processing must be part of the analysis drivers
258 // (see DirectApplicInterface::derived_map for additional info).
259 if (!iFilterName.empty()) {
260 ifilter_argument_list();
261 create_analysis_process(BLOCK, false);
262 }
263
264 // run the simulator programs by reforking the child process again
265 // (additional 2nd level children). These reforks run a blocking schedule
266 // (i.e., while jobs within the schedule may be nonblocking, the schedule
267 // itself does not complete until all analyses are completed). Need for a
268 // nonblocking schedule is not currently anticipated, since the 1st level
269 // fork provides the nonblocking evaluations needed for nonblocking
270 // synchronization by certain iterators.
271 if (asynchLocalAnalysisFlag) // asynch w/ concurrency limit>1 or unlimited
272 asynchronous_local_analyses(1, numAnalysisDrivers, 1);
273 else
274 synchronous_local_analyses(1, numAnalysisDrivers, 1);
275
276 // run the output filter by reforking the child process again (another 2nd
277 // level child). This refork is always blocking. The ofilter is used
278 // just once per evaluation since it is responsible for non-replicated
279 // post-processing. Any replicated post-processing must be part of the
280 // analysis drivers (see DirectApplicInterface::derived_map for additional
281 // info).
282 if (!oFilterName.empty()) {
283 ofilter_argument_list();
284 create_analysis_process(BLOCK, false);
285 }
286
287 // If nonblocking, then this is the 1st level child process. Quit this
288 // process now.
289 if (!block_flag)
290 _exit(1);
291 }
292 else // parent process from fork() above
293 if (new_group)
294 evaluation_process_group_id(pid);
295 }
296
297 return(pid);
298 }
299
300
301 /** Schedule analyses asynchronously on the local processor using a dynamic
302 scheduling approach (start to end in step increments). Concurrency is
303 limited by asynchLocalAnalysisConcurrency. Modeled after
304 ApplicationInterface::asynchronous_local_evaluations(). NOTE: This
305 function should be elevated to ApplicationInterface if and when another
306 derived interface class supports asynchronous local analyses. */
307 void ProcessHandleApplicInterface::
asynchronous_local_analyses(int start,int end,int step)308 asynchronous_local_analyses(int start, int end, int step)
309 {
310 if (numAnalysisDrivers <= 1) {
311 Cerr << "Error: ForkApplicInterface::asynchronous_local_analyses() "
312 << "should only be called for multiple analysis_drivers." << std::endl;
313 abort_handler(-1);
314 }
315
316 // link process id's to analysis id's for asynch jobs
317 analysisProcessIdMap.clear();
318 size_t i, num_sends;
319 int analysis_id, num_jobs = 1 + (int)((end-start)/step);
320 pid_t pid, proc_gp;
321
322 if (asynchLocalAnalysisConcurrency) // concurrency limited by user
323 num_sends = std::min(asynchLocalAnalysisConcurrency, num_jobs);
324 else // default: no limit, launch all jobs in first pass
325 num_sends = num_jobs; // don't need to limit num_sends to 1 in the message
326 // passing case since this fn is only called by the message passing
327 // schedulers if there is asynchLocalAnalysisConcurrency
328
329 #ifdef MPI_DEBUG
330 Cout << "First pass: initiating " << num_sends << " asynchronous analyses\n";
331 #endif // MPI_DEBUG
332 bool new_group = true;
333 for (i=0; i<num_sends; ++i) {
334 analysis_id = start + i*step;
335 #ifdef MPI_DEBUG
336 Cout << "Initiating analysis " << analysis_id << std::endl; // flush buffer
337 #endif // MPI_DEBUG
338 driver_argument_list(analysis_id);
339 pid = create_analysis_process(FALL_THROUGH, new_group);
340 analysisProcessIdMap[pid] = analysis_id;
341 new_group = false;
342 }
343
344 #ifdef MPI_DEBUG
345 if (num_sends < num_jobs)
346 Cout << "Second pass: dynamic scheduling " << num_jobs-num_sends
347 << " remaining analyses\n";
348 #endif // MPI_DEBUG
349 size_t send_cntr = num_sends, recv_cntr = 0, completed;
350 while (recv_cntr < num_jobs) {
351 #ifdef MPI_DEBUG
352 Cout << "Waiting on completed analyses" << std::endl;
353 #endif // MPI_DEBUG
354 recv_cntr += completed = wait_local_analyses(); // virtual fn
355 new_group = analysisProcessIdMap.empty();
356 for (i=0; i<completed; ++i) {
357 if (send_cntr < num_jobs) {
358 analysis_id = start + send_cntr*step;
359 #ifdef MPI_DEBUG
360 Cout << "Initiating analysis " << analysis_id << std::endl;
361 #endif // MPI_DEBUG
362 driver_argument_list(analysis_id);
363 pid = create_analysis_process(FALL_THROUGH, new_group);
364 analysisProcessIdMap[pid] = analysis_id;
365 ++send_cntr; new_group = false;
366 }
367 else break;
368 }
369 }
370 }
371
372
373 /** This code runs multiple asynch analyses on each server. It is modeled
374 after ApplicationInterface::serve_evaluations_asynch(). NOTE: This fn
375 should be elevated to ApplicationInterface if and when another derived
376 interface class supports hybrid analysis parallelism. */
serve_analyses_asynch()377 void ProcessHandleApplicInterface::serve_analyses_asynch()
378 {
379 if (numAnalysisDrivers <= 1) {
380 Cerr << "Error: ForkApplicInterface::serve_analyses_asynch should "
381 << "only be called for multiple analysis_drivers." << std::endl;
382 abort_handler(-1);
383 }
384
385 // link process id's to analysis id's for asynch jobs
386 pid_t pid, proc_gp; int analysis_id;
387 analysisProcessIdMap.clear();
388
389 MPI_Status status; // holds MPI_SOURCE, MPI_TAG, & MPI_ERROR
390 MPI_Request recv_request = MPI_REQUEST_NULL;
391 bool new_group = true; size_t num_running = 0;
392
393 // ----------------------------------------------------------
394 // Step 1: block on first message before entering while loops
395 // ----------------------------------------------------------
396 parallelLib.recv_ea(analysis_id, 0, MPI_ANY_TAG, status);
397
398 do { // main loop
399
400 // -----------------------------------------------------------------
401 // Step 2: check for additional incoming messages & execute all jobs
402 // -----------------------------------------------------------------
403 int mpi_test_flag = 1;
404 // check on asynchLocalAnalysisConcurrency limit below only required for
405 // static scheduling (self scheduler handles this from the master side).
406 // Leave it in for completeness even though static analysis scheduler
407 // doesn't use serve fns.
408 while (mpi_test_flag && analysis_id &&
409 num_running < asynchLocalAnalysisConcurrency) {
410 // test for completion
411 if (recv_request)
412 parallelLib.test(recv_request, mpi_test_flag, status);
413
414 // if test indicates a completion: unpack, execute, & repost
415 if (mpi_test_flag) {
416 analysis_id = status.MPI_TAG;
417
418 if (analysis_id) {
419 // execute
420 driver_argument_list(analysis_id);
421 pid = create_analysis_process(FALL_THROUGH, new_group);
422 analysisProcessIdMap[pid] = analysis_id; ++num_running;
423 new_group = false;
424 // repost
425 parallelLib.irecv_ea(analysis_id, 0, MPI_ANY_TAG, recv_request);
426 }
427 }
428 }
429
430 // -----------------------------------------------------------------
431 // Step 3: check for any completed jobs and return results to master
432 // -----------------------------------------------------------------
433 if (num_running)
434 num_running -= test_local_analyses_send(analysis_id); // virtual fn
435
436 } while (analysis_id || num_running);
437 }
438
439
440 /** Check to see if the process terminated abnormally (WIFEXITED(status)==0)
441 or if either execvp or the application returned a status code of -1
442 (WIFEXITED(status)!=0 && (signed char)WEXITSTATUS(status)==-1). If one
443 of these conditions is detected, output a failure message and abort.
444 Note: the application code should not return a status code of -1 unless an
445 immediate abort of dakota is wanted. If for instance, failure capturing
446 is to be used, the application code should write the word "FAIL" to the
447 appropriate results file and return a status code of 0 through exit(). */
check_wait(pid_t pid,int status)448 void ProcessHandleApplicInterface::check_wait(pid_t pid, int status)
449 {
450 if (pid == -1) {
451 Cerr << "\nFork error in parent retrieving child; error code " << errno
452 << ":\n ";
453 switch (errno) {
454 case ECHILD:
455 Cerr << "The process specified by pid does not exist or is not a\n "
456 << "child of the calling process"; break;
457 case EINTR:
458 Cerr << "WNOHANG was not set and an unblocked signal or a SIGCHLD\n "
459 << "was caught"; break;
460 case EINVAL:
461 Cerr << "The options argument was invalid"; break;
462 default:
463 Cerr << std::strerror(errno); break;
464 }
465 Cerr << ".\nConsider using system interface." << std::endl;
466 abort_handler(-1);
467 }
468 else if (pid > 0) {
469 #ifdef HAVE_SYS_WAIT_H
470 // checks for an abnormal exit status and, if present, abort.
471 // From waitpid man pages: If &status is not NULL, waitpid() stores status
472 // information which can be inspected with the following macros (which take
473 // status as an argument):
474 // > WIFEXITED(status): returns true if the child terminated normally, that
475 // is, by calling exit or _exit, or by returning from main().
476 // > WEXITSTATUS(status): returns the exit status of the child. This
477 // consists of the status argument that the child specified in a call
478 // to exit or _exit or as the argument for a return statement in main().
479 // This macro should only be employed if WIFEXITED returned true.
480 if ( WIFEXITED(status) == 0 || (signed char)WEXITSTATUS(status) == -1 ) {
481 Cerr << "Fork application failure, aborting.\nSystem error message: "
482 << std::strerror(errno) << '\n';
483 // std::strerror() returns the system error message associated with errno
484 // (a system constant defined in errno.h containing the number of the last
485 // observed error). Note: If it becomes an issue, it would also be
486 // possible to kill simulation drivers or intermediate processes for the
487 // 3-piece interface here, based on pid's obtained from any asynch forks.
488 // This would be beneficial on systems that do not perform a complete
489 // cleanup when one (asynch) child process dies.
490 abort_handler(INTERFACE_ERROR);
491 }
492 #endif // HAVE_SYS_WAIT_H
493 }
494 }
495
496
join_evaluation_process_group(bool new_group)497 void ProcessHandleApplicInterface::join_evaluation_process_group(bool new_group)
498 { } // no-op
499
500
join_analysis_process_group(bool new_group)501 void ProcessHandleApplicInterface::join_analysis_process_group(bool new_group)
502 { } // no-op
503
504
evaluation_process_group_id(pid_t pgid)505 void ProcessHandleApplicInterface::evaluation_process_group_id(pid_t pgid)
506 { } // no-op
507
508
evaluation_process_group_id() const509 pid_t ProcessHandleApplicInterface::evaluation_process_group_id() const
510 { return 0; } // dummy default
511
512
analysis_process_group_id(pid_t pgid)513 void ProcessHandleApplicInterface::analysis_process_group_id(pid_t pgid)
514 { } // no-op
515
516
analysis_process_group_id() const517 pid_t ProcessHandleApplicInterface::analysis_process_group_id() const
518 { return 0; } // dummy default
519
520
521 /** This function will split the analysis command in argList[0] based
522 on whitespace, but preserve spaces within quoted strings, such
523 that quoted strings can be passed as single command arguments.
524 NOTE: This function allocates memory in av that might be
525 implicitly freed when the child exits (control never returns to
526 caller). driver_and_args needs to be a return argument because av
527 will contain pointers into its c_str()'s when done.
528 */
529 void ProcessHandleApplicInterface::
create_command_arguments(boost::shared_array<const char * > & av,StringArray & driver_and_args)530 create_command_arguments(boost::shared_array<const char*>& av,
531 StringArray& driver_and_args)
532 {
533
534 String driver_subbed = substitute_params_and_results(argList[0], argList[1], argList[2]);
535 driver_and_args = WorkdirHelper::tokenize_driver(driver_subbed);
536
537 // if commandLineArgs, include params/results files at end
538 size_t nargs = driver_and_args.size();
539 if (commandLineArgs)
540 nargs += 2;
541 // ideally would use char *const argv[],
542 av.reset(new const char*[nargs+1]); // need extra NULL
543
544 size_t i = 0;
545 for ( ; i<driver_and_args.size(); ++i)
546 av[i] = driver_and_args[i].c_str();
547 // put params/results filenames if needed
548 if (commandLineArgs) {
549 av[i++] = argList[1].c_str();
550 av[i++] = argList[2].c_str();
551 }
552
553 // last entry must be null-terminated
554 av[i] = NULL;
555 }
556
557
558 } // namespace Dakota
559