1 /*  _______________________________________________________________________
2 
3     DAKOTA: Design Analysis Kit for Optimization and Terascale Applications
4     Copyright 2014-2020 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
5     This software is distributed under the GNU Lesser General Public License.
6     For more information, see the README file in the top Dakota directory.
7     _______________________________________________________________________ */
8 
9 //- Class:        ProcessHandleApplicInterface
10 //- Description:  Class implementation
11 //- Owner:        Mike Eldred
12 
#include "DakotaResponse.hpp"
#include "ParamResponsePair.hpp"
#include "ProcessHandleApplicInterface.hpp"
#include "ProblemDescDB.hpp"
#include "ParallelLibrary.hpp"
#include "WorkdirHelper.hpp"
#include <algorithm>
#include <cerrno>   // errno, ECHILD, EINTR, EINVAL
#include <cstring>  // std::strerror
#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h> // for wait process status macros (Posix only)
#endif
23 
24 namespace Dakota {
25 
map_bookkeeping(pid_t pid,int fn_eval_id)26 void ProcessHandleApplicInterface::map_bookkeeping(pid_t pid, int fn_eval_id)
27 {
28   // store the process & eval ids in a map.  The correspondence in completed
29   // process id and fn eval id is then mapped to the appropriate index of
30   // prp_queue in process_local_evaluation().  Correspondence between
31   // evalProcessIdMap and beforeSynchCorePRPQueue orders cannot be assumed
32   // due to hybrid parallelism, i.e. ApplicationInterface::serve_asynch().
33   evalProcessIdMap[pid] = fn_eval_id;
34 }
35 
36 
37 void ProcessHandleApplicInterface::
process_local_evaluation(PRPQueue & prp_queue,const pid_t pid)38 process_local_evaluation(PRPQueue& prp_queue, const pid_t pid)
39 {
40   // Common processing code used by {wait,test}_local_evaluations()
41 
42   // Map pid to fn_eval_id (using evalProcessIdMap[pid] does the wrong thing
43   // if the pid key is not found)
44   std::map<pid_t, int>::iterator map_iter = evalProcessIdMap.find(pid);
45   if (map_iter == evalProcessIdMap.end()) {
46     // should not happen so long as wait ignores any 2nd level child processes
47     Cerr << "Error: pid returned from wait does not match any 1st level child "
48 	 << "process for this interface." << std::endl;
49     abort_handler(-1);
50   }
51   int fn_eval_id = map_iter->second;
52 
53   // now populate the corresponding response by reading the results file
54   PRPQueueIter queue_it = lookup_by_eval_id(prp_queue, fn_eval_id);
55   if (queue_it == prp_queue.end()) {
56     Cerr << "Error: failure in queue lookup within ProcessHandleApplicInterface"
57 	 << "::process_local_evaluation()." << std::endl;
58     abort_handler(-1);
59   }
60   Response response = queue_it->response(); // shallow copy
61   try {
62     read_results_files(response, fn_eval_id, final_eval_id_tag(fn_eval_id));
63   }
64   catch(const FileReadException& fr_except) {
65     // For forks, there is no potential for an file write race
66     // condition since the process has completed -> an exception
67     // involving an incomplete file/data set is a true error.
68     Cerr << fr_except.what() << std::endl;
69     abort_handler(INTERFACE_ERROR);
70   }
71 
72   catch(const FunctionEvalFailure& fneval_except) {
73     // If a FunctionEvalFailure exception ("fail" detected in results
74     // file) is caught, call manage_failure which will either (1) repair the
75     // failure and populate response, or (2) abort the run.  NOTE: this
76     // destroys load balancing but trying to load balance failure recovery
77     // would be more difficult than it is worth.
78     manage_failure(queue_it->variables(), response.active_set(), response,
79 		   fn_eval_id);
80   }
81 
82   // bookkeep the completed job
83   //queue_it->response(response);                        // not needed (shallow)
84   //replace_by_eval_id(prp_queue, fn_eval_id, *queue_it);// not needed (shallow)
85   completionSet.insert(fn_eval_id);
86   evalProcessIdMap.erase(pid);
87 }
88 
89 
90 /** Manage the input filter, 1 or more analysis programs, and the
91     output filter in blocking or nonblocking mode as governed by
92     block_flag.  In the case of a single analysis and no filters, a
93     single fork is performed, while in other cases, an initial fork is
94     reforked multiple times.  Called from derived_map() with
95     block_flag == BLOCK and from derived_map_asynch() with block_flag
96     == FALL_THROUGH.  Uses create_analysis_process() to spawn
97     individual program components within the function evaluation. */
create_evaluation_process(bool block_flag)98 pid_t ProcessHandleApplicInterface::create_evaluation_process(bool block_flag)
99 {
100   size_t i;
101   if (evalCommRank == 0 && !suppressOutput) {
102     if (block_flag) {
103       if (eaDedMasterFlag)
104         Cout << "blocking fork dynamic schedule: ";
105       else if (numAnalysisServers > 1)
106         Cout << "blocking fork static schedule: ";
107       else
108         Cout << "blocking fork: ";
109     }
110     else
111       Cout << "nonblocking fork: ";
112 
113     if (!iFilterName.empty()) {
114       Cout << substitute_params_and_results(iFilterName, paramsFileName, resultsFileName);
115       if (commandLineArgs)
116         Cout << ' ' << paramsFileName << ' ' << resultsFileName;
117       Cout << "; ";
118     }
119     for (i=0; i<numAnalysisDrivers; i++) {
120       String params_file = paramsFileName;
121       String results_file = resultsFileName;
122       if (multipleParamsFiles)
123         params_file += '.' + std::to_string(i+1);
124       if (numAnalysisDrivers > 1)
125         results_file += '.' + std::to_string(i+1);
126       Cout << substitute_params_and_results(programNames[i], params_file, results_file);
127       if (commandLineArgs) {
128         Cout << ' ' << params_file << ' ' << results_file;
129       }
130       if (i != numAnalysisDrivers-1)
131         Cout << "; ";
132     }
133     if (!oFilterName.empty()) {
134       Cout << "; " << substitute_params_and_results(oFilterName, paramsFileName, resultsFileName);
135       if (commandLineArgs)
136         Cout << ' ' << paramsFileName << ' ' << resultsFileName;
137     }
138     Cout << '\n';
139   }
140   // Cout must be flushed prior to the fork to clear the stdout buffer.
141   // Otherwise, the intermediate process receives a copy of the contents of
142   // this buffer and outputs the contents on the next buffer flush.
143   Cout << std::flush;
144 
145   pid_t pid = 0;
146   if (iFilterName.empty() && oFilterName.empty() && numAnalysisDrivers == 1) {
147     // fork the one-piece interface directly (no intermediate process required)
148     driver_argument_list(1);
149     bool new_group = (!block_flag && evalProcessIdMap.empty());
150     pid = create_analysis_process(block_flag, new_group);
151     if (new_group) // collapse the 2 groups on behalf of wait_evaluation()
152       evaluation_process_group_id(analysis_process_group_id());
153   }
154   else if (evalCommSize > 1) {
155     // run a blocking schedule of single-proc. analyses over analysis servers.
156     // The schedules are executed by the parent processes.  Forks are not used
157     // at this level since the message passing provides the needed asynchrony
158     // at the evaluation level (unlike the final case below where 2 levels of
159     // forks must be used to provide asynchrony at the eval level).
160 
161     if (!block_flag) {
162       Cerr << "Error: multiprocessor evalComm does not support nonblocking "
163 	   << "ProcessHandleApplicInterface::create_evaluation_process()."
164 	   << std::endl;
165       abort_handler(-1);
166     }
167 
168     if (!iFilterName.empty() && evalCommRank == 0) {
169       ifilter_argument_list();
170       create_analysis_process(BLOCK, false);
171     }
172 
173     // Schedule analyses using either master-slave/dynamic or peer/static
174     if (eaDedMasterFlag) {
175       // master-slave dynamic scheduling requires a central point of control
176       // and therefore needs separate schedule & serve functions.
177       if (evalCommRank == 0)
178         master_dynamic_schedule_analyses();
179       else {
180 	// in message passing mode, the user must explicitly specify analysis
181 	// concurrency to get hybrid parallelism
182         if (asynchLocalAnalysisConcurrency > 1)
183           serve_analyses_asynch();
184         else
185           serve_analyses_synch();
186       }
187     }
188     else {
189       // static scheduling does not require special schedule/serve functions
190       // since it can support message passing & hybrid mode directly using
191       // synchronous_local & asynchronous_local with staggered starts.  However,
192       // it does require barriers since there's no scheduler to enforce
193       // synchronization.
194 
195       // avoid peers 2-n initiating analyses prior to completion of
196       // write_parameters_files() by peer 1
197       parallelLib.barrier_e();
198 
199       if (asynchLocalAnalysisConcurrency > 1) // hybrid requires explicit spec
200         asynchronous_local_analyses(analysisServerId, numAnalysisDrivers,
201                                     numAnalysisServers); // hybrid mode
202       else
203         synchronous_local_analyses(analysisServerId, numAnalysisDrivers,
204                                    numAnalysisServers);  // msg passing mode
205 
206       // avoid peer 1 reading all the results files before peers 2-n have
207       // completed writing them
208       parallelLib.barrier_e();
209     }
210 
211     if (!oFilterName.empty() && evalCommRank == 0) {
212       ofilter_argument_list();
213       create_analysis_process(BLOCK, false);
214     }
215   }
216   else { // schedule all analyses local to this processor
217 
218     // If the evaluation is nonblocking, then an intermediate process must be
219     // forked to manage the 3-piece interface, multiple analysis drivers, or
220     // both.  The intermediate process provides asynchrony at the evaluation
221     // level, even though the input filter execution, analysis drivers
222     // scheduling, and output filter execution are blocking.
223 
224     // In the 3-piece case, it would be desirable to utilize the same format as
225     // is used in the SysCall case, i.e., grouping i_filter, simulator, and
226     // o_filter with ()'s and ;'s, but this is not supported by the exec family
227     // of functions (see exec man pages).
228 
229     // Since we want this intermediate process to be able to execute
230     // concurrently with the parent dakota and other asynch processes,
231     // fork() should be used here since there is no matching exec().
232     // (NOTE: vfork should NOT be used since exec doesn't immediately follow!)
233     if (!block_flag) {
234 #ifdef HAVE_WORKING_FORK
235       // Note: working fork is necessary but not sufficient.
236       // Need a better configure-time probe for reforking of a forked process.
237       pid = fork();
238 #else
239       // Note: Windows spawn does not currently support this asynch mode.
240       Cerr << "Error: fork is not supported under this OS." << std::endl;
241       abort_handler(-1);
242 #endif
243     }
244 
245     bool new_group = evalProcessIdMap.empty();
246     if (block_flag || pid == 0) {
247       // if nonblocking, then this is the intermediate (1st level child)
248       // process.  If blocking, then no fork has yet been performed, and
249       // this is the parent.
250 
251       if (!block_flag) // child evaluation process from fork() above
252 	join_evaluation_process_group(new_group);
253 
254       // run the input filter by reforking the child process (2nd level child).
255       // This refork is always blocking.  The ifilter is used just once per
256       // evaluation since it is responsible for non-replicated pre-processing.
257       // Any replicated pre-processing must be part of the analysis drivers
258       // (see DirectApplicInterface::derived_map for additional info).
259       if (!iFilterName.empty()) {
260         ifilter_argument_list();
261         create_analysis_process(BLOCK, false);
262       }
263 
264       // run the simulator programs by reforking the child process again
265       // (additional 2nd level children).  These reforks run a blocking schedule
266       // (i.e., while jobs within the schedule may be nonblocking, the schedule
267       // itself does not complete until all analyses are completed).  Need for a
268       // nonblocking schedule is not currently anticipated, since the 1st level
269       // fork provides the nonblocking evaluations needed for nonblocking
270       // synchronization by certain iterators.
271       if (asynchLocalAnalysisFlag) // asynch w/ concurrency limit>1 or unlimited
272         asynchronous_local_analyses(1, numAnalysisDrivers, 1);
273       else
274         synchronous_local_analyses(1, numAnalysisDrivers, 1);
275 
276       // run the output filter by reforking the child process again (another 2nd
277       // level child).  This refork is always blocking.  The ofilter is used
278       // just once per evaluation since it is responsible for non-replicated
279       // post-processing.  Any replicated post-processing must be part of the
280       // analysis drivers (see DirectApplicInterface::derived_map for additional
281       // info).
282       if (!oFilterName.empty()) {
283         ofilter_argument_list();
284         create_analysis_process(BLOCK, false);
285       }
286 
287       // If nonblocking, then this is the 1st level child process.  Quit this
288       // process now.
289       if (!block_flag)
290         _exit(1);
291     }
292     else // parent process from fork() above
293       if (new_group)
294 	evaluation_process_group_id(pid);
295   }
296 
297   return(pid);
298 }
299 
300 
301 /** Schedule analyses asynchronously on the local processor using a dynamic
302     scheduling approach (start to end in step increments).  Concurrency is
303     limited by asynchLocalAnalysisConcurrency.  Modeled after
304     ApplicationInterface::asynchronous_local_evaluations().  NOTE: This
305     function should be elevated to ApplicationInterface if and when another
306     derived interface class supports asynchronous local analyses. */
307 void ProcessHandleApplicInterface::
asynchronous_local_analyses(int start,int end,int step)308 asynchronous_local_analyses(int start, int end, int step)
309 {
310   if (numAnalysisDrivers <= 1) {
311     Cerr << "Error: ForkApplicInterface::asynchronous_local_analyses() "
312 	 << "should only be called for multiple analysis_drivers." << std::endl;
313     abort_handler(-1);
314   }
315 
316   // link process id's to analysis id's for asynch jobs
317   analysisProcessIdMap.clear();
318   size_t i, num_sends;
319   int analysis_id, num_jobs = 1 + (int)((end-start)/step);
320   pid_t pid, proc_gp;
321 
322   if (asynchLocalAnalysisConcurrency)  // concurrency limited by user
323     num_sends = std::min(asynchLocalAnalysisConcurrency, num_jobs);
324   else // default: no limit, launch all jobs in first pass
325     num_sends = num_jobs; // don't need to limit num_sends to 1 in the message
326     // passing case since this fn is only called by the message passing
327     // schedulers if there is asynchLocalAnalysisConcurrency
328 
329 #ifdef MPI_DEBUG
330   Cout << "First pass: initiating " << num_sends << " asynchronous analyses\n";
331 #endif // MPI_DEBUG
332   bool new_group = true;
333   for (i=0; i<num_sends; ++i) {
334     analysis_id = start + i*step;
335 #ifdef MPI_DEBUG
336     Cout << "Initiating analysis " << analysis_id << std::endl; // flush buffer
337 #endif // MPI_DEBUG
338     driver_argument_list(analysis_id);
339     pid = create_analysis_process(FALL_THROUGH, new_group);
340     analysisProcessIdMap[pid] = analysis_id;
341     new_group = false;
342   }
343 
344 #ifdef MPI_DEBUG
345   if (num_sends < num_jobs)
346     Cout << "Second pass: dynamic scheduling " << num_jobs-num_sends
347          << " remaining analyses\n";
348 #endif // MPI_DEBUG
349   size_t send_cntr = num_sends, recv_cntr = 0, completed;
350   while (recv_cntr < num_jobs) {
351 #ifdef MPI_DEBUG
352     Cout << "Waiting on completed analyses" << std::endl;
353 #endif // MPI_DEBUG
354     recv_cntr += completed = wait_local_analyses(); // virtual fn
355     new_group = analysisProcessIdMap.empty();
356     for (i=0; i<completed; ++i) {
357       if (send_cntr < num_jobs) {
358         analysis_id = start + send_cntr*step;
359 #ifdef MPI_DEBUG
360         Cout << "Initiating analysis " << analysis_id << std::endl;
361 #endif // MPI_DEBUG
362 	driver_argument_list(analysis_id);
363         pid = create_analysis_process(FALL_THROUGH, new_group);
364         analysisProcessIdMap[pid] = analysis_id;
365         ++send_cntr; new_group = false;
366       }
367       else break;
368     }
369   }
370 }
371 
372 
373 /** This code runs multiple asynch analyses on each server.  It is modeled
374     after ApplicationInterface::serve_evaluations_asynch().  NOTE: This fn
375     should be elevated to ApplicationInterface if and when another derived
376     interface class supports hybrid analysis parallelism. */
serve_analyses_asynch()377 void ProcessHandleApplicInterface::serve_analyses_asynch()
378 {
379   if (numAnalysisDrivers <= 1) {
380     Cerr << "Error: ForkApplicInterface::serve_analyses_asynch should "
381 	 << "only be called for multiple analysis_drivers." << std::endl;
382     abort_handler(-1);
383   }
384 
385   // link process id's to analysis id's for asynch jobs
386   pid_t pid, proc_gp; int analysis_id;
387   analysisProcessIdMap.clear();
388 
389   MPI_Status  status; // holds MPI_SOURCE, MPI_TAG, & MPI_ERROR
390   MPI_Request recv_request = MPI_REQUEST_NULL;
391   bool new_group = true; size_t num_running = 0;
392 
393   // ----------------------------------------------------------
394   // Step 1: block on first message before entering while loops
395   // ----------------------------------------------------------
396   parallelLib.recv_ea(analysis_id, 0, MPI_ANY_TAG, status);
397 
398   do { // main loop
399 
400     // -----------------------------------------------------------------
401     // Step 2: check for additional incoming messages & execute all jobs
402     // -----------------------------------------------------------------
403     int mpi_test_flag = 1;
404     // check on asynchLocalAnalysisConcurrency limit below only required for
405     // static scheduling (self scheduler handles this from the master side).
406     // Leave it in for completeness even though static analysis scheduler
407     // doesn't use serve fns.
408     while (mpi_test_flag && analysis_id &&
409 	   num_running < asynchLocalAnalysisConcurrency) {
410       // test for completion
411       if (recv_request)
412         parallelLib.test(recv_request, mpi_test_flag, status);
413 
414       // if test indicates a completion: unpack, execute, & repost
415       if (mpi_test_flag) {
416         analysis_id = status.MPI_TAG;
417 
418         if (analysis_id) {
419 	  // execute
420 	  driver_argument_list(analysis_id);
421           pid = create_analysis_process(FALL_THROUGH, new_group);
422 	  analysisProcessIdMap[pid] = analysis_id; ++num_running;
423 	  new_group = false;
424 	  // repost
425           parallelLib.irecv_ea(analysis_id, 0, MPI_ANY_TAG, recv_request);
426 	}
427       }
428     }
429 
430     // -----------------------------------------------------------------
431     // Step 3: check for any completed jobs and return results to master
432     // -----------------------------------------------------------------
433     if (num_running)
434       num_running -= test_local_analyses_send(analysis_id); // virtual fn
435 
436   } while (analysis_id || num_running);
437 }
438 
439 
440 /** Check to see if the process terminated abnormally (WIFEXITED(status)==0)
441     or if either execvp or the application returned a status code of -1
442     (WIFEXITED(status)!=0 && (signed char)WEXITSTATUS(status)==-1).  If one
443     of these conditions is detected, output a failure message and abort.
444     Note: the application code should not return a status code of -1 unless an
445     immediate abort of dakota is wanted.  If for instance, failure capturing
446     is to be used, the application code should write the word "FAIL" to the
447     appropriate results file and return a status code of 0 through exit(). */
check_wait(pid_t pid,int status)448 void ProcessHandleApplicInterface::check_wait(pid_t pid, int status)
449 {
450   if (pid == -1) {
451     Cerr << "\nFork error in parent retrieving child; error code " << errno
452 	 << ":\n  ";
453     switch (errno) {
454     case ECHILD:
455       Cerr << "The process specified by pid does not exist or is not a\n  "
456 	   << "child of the calling process";     break;
457     case EINTR:
458       Cerr << "WNOHANG was not set and an unblocked signal or a SIGCHLD\n  "
459 	   << "was caught";                       break;
460     case EINVAL:
461       Cerr << "The options argument was invalid"; break;
462     default:
463       Cerr << std::strerror(errno);               break;
464     }
465     Cerr << ".\nConsider using system interface." << std::endl;
466     abort_handler(-1);
467   }
468   else if (pid > 0) {
469 #ifdef HAVE_SYS_WAIT_H
470     // checks for an abnormal exit status and, if present, abort.
471     // From waitpid man pages: If &status is not NULL, waitpid() stores status
472     // information which can be inspected with the following macros (which take
473     // status as an argument):
474     // > WIFEXITED(status): returns true if the child terminated normally, that
475     //   is, by calling exit or _exit, or by returning from main().
476     // > WEXITSTATUS(status): returns the exit status of the child. This
477     //   consists of the status argument that the child specified in a call
478     //   to exit or _exit or as the argument for a return statement in main().
479     //   This macro should only be employed if WIFEXITED returned true.
480     if ( WIFEXITED(status) == 0 || (signed char)WEXITSTATUS(status) == -1 ) {
481       Cerr << "Fork application failure, aborting.\nSystem error message: "
482 	   << std::strerror(errno) << '\n';
483       // std::strerror() returns the system error message associated with errno
484       // (a system constant defined in errno.h containing the number of the last
485       // observed error).  Note: If it becomes an issue, it would also be
486       // possible to kill simulation drivers or intermediate processes for the
487       // 3-piece interface here, based on pid's obtained from any asynch forks.
488       // This would be beneficial on systems that do not perform a complete
489       // cleanup when one (asynch) child process dies.
490       abort_handler(INTERFACE_ERROR);
491     }
492 #endif // HAVE_SYS_WAIT_H
493   }
494 }
495 
496 
join_evaluation_process_group(bool new_group)497 void ProcessHandleApplicInterface::join_evaluation_process_group(bool new_group)
498 { } // no-op
499 
500 
join_analysis_process_group(bool new_group)501 void ProcessHandleApplicInterface::join_analysis_process_group(bool new_group)
502 { } // no-op
503 
504 
evaluation_process_group_id(pid_t pgid)505 void ProcessHandleApplicInterface::evaluation_process_group_id(pid_t pgid)
506 { } // no-op
507 
508 
evaluation_process_group_id() const509 pid_t ProcessHandleApplicInterface::evaluation_process_group_id() const
510 { return 0; } // dummy default
511 
512 
analysis_process_group_id(pid_t pgid)513 void ProcessHandleApplicInterface::analysis_process_group_id(pid_t pgid)
514 { } // no-op
515 
516 
analysis_process_group_id() const517 pid_t ProcessHandleApplicInterface::analysis_process_group_id() const
518 { return 0; } // dummy default
519 
520 
521 /** This function will split the analysis command in argList[0] based
522     on whitespace, but preserve spaces within quoted strings, such
523     that quoted strings can be passed as single command arguments.
524     NOTE: This function allocates memory in av that might be
525     implicitly freed when the child exits (control never returns to
526     caller).  driver_and_args needs to be a return argument because av
527     will contain pointers into its c_str()'s when done.
528 */
529 void ProcessHandleApplicInterface::
create_command_arguments(boost::shared_array<const char * > & av,StringArray & driver_and_args)530 create_command_arguments(boost::shared_array<const char*>& av,
531 			 StringArray& driver_and_args)
532 {
533 
534   String driver_subbed = substitute_params_and_results(argList[0], argList[1], argList[2]);
535   driver_and_args = WorkdirHelper::tokenize_driver(driver_subbed);
536 
537   // if commandLineArgs, include params/results files at end
538   size_t nargs = driver_and_args.size();
539   if (commandLineArgs)
540     nargs += 2;
541   // ideally would use char *const argv[],
542   av.reset(new const char*[nargs+1]);  // need extra NULL
543 
544   size_t i = 0;
545   for ( ; i<driver_and_args.size(); ++i)
546     av[i] = driver_and_args[i].c_str();
547   // put params/results filenames if needed
548   if (commandLineArgs) {
549     av[i++] = argList[1].c_str();
550     av[i++] = argList[2].c_str();
551   }
552 
553   // last entry must be null-terminated
554   av[i] = NULL;
555 }
556 
557 
558 } // namespace Dakota
559