/*  _______________________________________________________________________

    DAKOTA: Design Analysis Kit for Optimization and Terascale Applications
    Copyright 2014-2020 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
    This software is distributed under the GNU Lesser General Public License.
    For more information, see the README file in the top Dakota directory.
    _______________________________________________________________________ */

//- Class:        ApplicationInterface
//- Description:  Implementation of base class for application interfaces
//- Owner:        Mike Eldred

#include "dakota_system_defs.hpp"
#include "ApplicationInterface.hpp"
#include "ParamResponsePair.hpp"
#include "ProblemDescDB.hpp"
#include "ParallelLibrary.hpp"

namespace Dakota {

extern PRPCache data_pairs;

ApplicationInterface::
ApplicationInterface(const ProblemDescDB& problem_db):
  Interface(BaseConstructor(), problem_db),
  parallelLib(problem_db.parallel_library()),
  batchEval(problem_db.get_bool("interface.batch")),
  asynchFlag(problem_db.get_bool("interface.asynch")),
  batchIdCntr(0),
  suppressOutput(false), evalCommSize(1), evalCommRank(0), evalServerId(1),
  eaDedMasterFlag(false), analysisCommSize(1), analysisCommRank(0),
  analysisServerId(1), multiProcAnalysisFlag(false),
  asynchLocalAnalysisFlag(false),
  asynchLocalEvalConcSpec(
    problem_db.get_int("interface.asynch_local_evaluation_concurrency")),
  asynchLocalAnalysisConcSpec(
    problem_db.get_int("interface.asynch_local_analysis_concurrency")),
  numAnalysisDrivers(
    problem_db.get_sa("interface.application.analysis_drivers").size()),
  failureMessage("Failure captured"),
  worldSize(parallelLib.world_size()), worldRank(parallelLib.world_rank()),
  iteratorCommSize(1), iteratorCommRank(0), ieMessagePass(false),
  numEvalServersSpec(problem_db.get_int("interface.evaluation_servers")),
  procsPerEvalSpec(problem_db.get_int("interface.processors_per_evaluation")),
  eaMessagePass(false),
  numAnalysisServersSpec(problem_db.get_int("interface.analysis_servers")),
  procsPerAnalysisSpec(
    problem_db.get_int("interface.direct.processors_per_analysis")),
  lenVarsMessage(0), lenVarsActSetMessage(0), lenResponseMessage(0),
  lenPRPairMessage(0),
  evalScheduling(problem_db.get_short("interface.evaluation_scheduling")),
  analysisScheduling(problem_db.get_short("interface.analysis_scheduling")),
  asynchLocalEvalStatic(
    problem_db.get_short("interface.local_evaluation_scheduling") ==
    STATIC_SCHEDULING),
  interfaceSynchronization(
    (batchEval || asynchFlag) ? ASYNCHRONOUS_INTERFACE : SYNCHRONOUS_INTERFACE),
  headerFlag(true),
  asvControlFlag(problem_db.get_bool("interface.active_set_vector")),
  evalCacheFlag(problem_db.get_bool("interface.evaluation_cache")),
  nearbyDuplicateDetect(
    problem_db.get_bool("interface.nearby_evaluation_cache")),
  nearbyTolerance(
    problem_db.get_real("interface.nearby_evaluation_cache_tolerance")),
  restartFileFlag(problem_db.get_bool("interface.restart_file")),
  sharedRespData(SharedResponseData(problem_db)),
  gradientType(problem_db.get_string("responses.gradient_type")),
  hessianType(problem_db.get_string("responses.hessian_type")),
  gradMixedAnalyticIds(
    problem_db.get_is("responses.gradients.mixed.id_analytic")),
  hessMixedAnalyticIds(
    problem_db.get_is("responses.hessians.mixed.id_analytic")),
  failAction(problem_db.get_string("interface.failure_capture.action")),
  failRetryLimit(problem_db.get_int("interface.failure_capture.retry_limit")),
  failRecoveryFnVals(
    problem_db.get_rv("interface.failure_capture.recovery_fn_vals")),
  sendBuffers(NULL), recvBuffers(NULL), recvRequests(NULL)
{
  // set coreMappings flag based on presence of analysis_drivers specification
  coreMappings = (numAnalysisDrivers > 0);
  if (!coreMappings && !algebraicMappings && interfaceType > DEFAULT_INTERFACE){
    Cerr << "\nError: no parameter-to-response mapping defined in "
         << "ApplicationInterface.\n" << std::endl;
    abort_handler(-1);
  }
}


ApplicationInterface::~ApplicationInterface()
{ }


void ApplicationInterface::
init_communicators(const IntArray& message_lengths, int max_eval_concurrency)
{
  // Initialize comms for evaluations (partitions of iteratorComm).

  bool direct_int         = (interfaceType & DIRECT_INTERFACE_BIT);
  // Peer dynamic requires asynch local executions and dynamic job assignment
  bool peer_dynamic_avail = (!direct_int && !asynchLocalEvalStatic);
  // min_procs_per_eval captures overrides at the analysis level.  This lower
  // bound is defined bottom up and counters the top-down allocation of
  // resources.  max_procs_per_eval captures available concurrency and explicit
  // user overrides at the analysis level (user overrides for the evaluation
  // level can be managed by resolve_inputs()).
  // > procsPerAnalysisSpec defaults to zero, which is the result when the
  //   processors_per_analysis spec is unreachable (system/fork/spawn)
  int min_ppa = 1, max_ppa = (direct_int) ? worldSize : 1;
  int min_procs_per_eval = ProblemDescDB::
    min_procs_per_level(min_ppa, procsPerAnalysisSpec, numAnalysisServersSpec);
  int max_procs_per_eval = ProblemDescDB::
    max_procs_per_level(max_ppa, procsPerAnalysisSpec, numAnalysisServersSpec,
                        analysisScheduling, asynchLocalAnalysisConcSpec, false,
                        std::max(1, numAnalysisDrivers));

  const ParallelLevel& ie_pl = parallelLib.init_evaluation_communicators(
    numEvalServersSpec, procsPerEvalSpec, min_procs_per_eval,
    max_procs_per_eval, max_eval_concurrency, asynchLocalEvalConcSpec,
    PUSH_UP, evalScheduling, peer_dynamic_avail);

  set_evaluation_communicators(message_lengths);

  // Initialize communicators for analyses (partitions of evalComm).  This
  // call is protected from an iterator dedicated master in the same way a
  // meta-iterator master never calls init_eval_comms (prevents some warnings
  // in ParallelLibrary::resolve_inputs when there is a user spec that can't
  // be supported from a single processor).  However, a dedicated master can
  // run a local single-processor job if the algorithm uses a synchronous eval
  // (see derived_master_overload() usage in Model::evaluate()).
  if (ieDedMasterFlag && iteratorCommRank == 0 && multiProcEvalFlag)
    init_serial_analyses();
  else {
    const ParallelLevel& ea_pl = parallelLib.init_analysis_communicators(
      numAnalysisServersSpec, procsPerAnalysisSpec, min_ppa, max_ppa,
      numAnalysisDrivers, asynchLocalAnalysisConcSpec, PUSH_UP,
      analysisScheduling, false); // peer_dynamic is not available

    set_analysis_communicators();
  }

  // print parallel configuration (prior to configuration checking
  // so that error messages can be more readily debugged)
  if (worldSize > 1)
    parallelLib.print_configuration();

  // check for configuration errors
  init_communicators_checks(max_eval_concurrency);
}


void ApplicationInterface::
set_communicators(const IntArray& message_lengths, int max_eval_concurrency)
{
  set_evaluation_communicators(message_lengths);

  // Initialize communicators for analyses (partitions of evalComm).  This call
  // is protected from an iterator dedicated master in cases where local
  // evaluations are precluded (see comments in init_communicators()).
  if (ieDedMasterFlag && iteratorCommRank == 0 && multiProcEvalFlag)
    init_serial_analyses();
  else
    set_analysis_communicators();

  // check for configuration errors
  set_communicators_checks(max_eval_concurrency);
}


void ApplicationInterface::
set_evaluation_communicators(const IntArray& message_lengths)
{
  // Buffer sizes for function evaluation message transfers are estimated in
  // Model::init_communicators() so that hard-coded MPIUnpackBuffer
  // lengths can be avoided.  This estimation is reperformed on every call to
  // IteratorScheduler::run_iterator().  A Bcast is not currently needed since
  // every processor performs the estimation.
  //MPI_Bcast(message_lengths.data(), 4, MPI_INT, 0, iteratorComm);
  lenVarsMessage       = message_lengths[0];
  lenVarsActSetMessage = message_lengths[1];
  lenResponseMessage   = message_lengths[2];
  lenPRPairMessage     = message_lengths[3];

  // Pull data from (the lowest) concurrent iterator partition.  The active
  // parallel configuration is managed in Model::init_communicators().
  const ParallelConfiguration& pc = parallelLib.parallel_configuration();
  const ParallelLevel& mi_pl = pc.mi_parallel_level(); // last mi level
  iteratorCommSize = mi_pl.server_communicator_size();
  iteratorCommRank = mi_pl.server_communicator_rank();

  // These attributes are set by init_evaluation_communicators and are not
  // available for use in the constructor.
  const ParallelLevel& ie_pl = pc.ie_parallel_level();
  ieDedMasterFlag = ie_pl.dedicated_master();
  ieMessagePass   = ie_pl.message_pass();
  numEvalServers  = ie_pl.num_servers(); // may differ from numEvalServersSpec
  evalCommRank    = ie_pl.server_communicator_rank();
  evalCommSize    = ie_pl.server_communicator_size();
  evalServerId    = ie_pl.server_id();
  if (ieDedMasterFlag)
    multiProcEvalFlag = (ie_pl.processors_per_server() > 1 ||
                         ie_pl.processor_remainder());
  else // peer: split flag insufficient if 1 server
    multiProcEvalFlag = (evalCommSize > 1); // could vary

  // simplify downstream logic by resetting default asynch local concurrency
  // to 1 for the case of message passing.  This allows schedulers to more
  // readily distinguish unlimited concurrency for asynch local parallelism
  // (default is unlimited unless user concurrency spec) from message passing
  // parallelism with synchronous local evals (default; hybrid mode requires
  // user spec > 1).
  asynchLocalEvalConcurrency = (ieMessagePass && asynchLocalEvalConcSpec == 0)
                             ? 1 : asynchLocalEvalConcSpec;
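  // Example: with message passing active and no user concurrency spec
  // (asynchLocalEvalConcSpec == 0), each evaluation server runs one local
  // evaluation at a time; a user spec > 1 enables hybrid parallelism.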
}


void ApplicationInterface::set_analysis_communicators()
{
  const ParallelConfiguration& pc = parallelLib.parallel_configuration();
  const ParallelLevel& ea_pl = pc.ea_parallel_level();

  // Extract attributes for analysis partitions
  eaDedMasterFlag    = ea_pl.dedicated_master();
  eaMessagePass      = ea_pl.message_pass();
  numAnalysisServers = ea_pl.num_servers();//may differ from numAnalysisSrvSpec
  analysisCommRank   = ea_pl.server_communicator_rank();
  analysisCommSize   = ea_pl.server_communicator_size();
  analysisServerId   = ea_pl.server_id();
  if (eaDedMasterFlag)
    multiProcAnalysisFlag = (ea_pl.processors_per_server() > 1 ||
                             ea_pl.processor_remainder());
  else // peer: split flag insufficient if 1 server
    multiProcAnalysisFlag = (analysisCommSize > 1); // could vary

  if ( iteratorCommRank // any processor other than rank 0 in iteratorComm
       || ( outputLevel == SILENT_OUTPUT && evalCommRank == 0 &&
            !eaDedMasterFlag && numAnalysisServers < 2) )
    suppressOutput = true; // suppress output of fn. eval. echoes
  /* Additional output granularity:
  if (ieMessagePass)                // suppress fn eval output in
    suppressLowLevelOutput = true;  //   SysCall/Fork/Direct
  if (methodOutput == "quiet")      // suppress scheduling & vars/response
    suppressHighLevelOutput = true; //   output in ApplicationInterface & Model
  */

  // simplify downstream logic by resetting default asynch local concurrency
  // to 1 for the case of message passing.  This allows schedulers to more
  // readily distinguish unlimited concurrency for asynch local parallelism
  // (default is unlimited unless user spec) from message passing parallelism
  // with synchronous local evals (default; hybrid mode requires user spec > 1).
  asynchLocalAnalysisConcurrency
    = (eaMessagePass && asynchLocalAnalysisConcSpec == 0)
    ? 1 : asynchLocalAnalysisConcSpec;

  // Set flag for asynch local parallelism of analyses.  In the local asynch
  // case (no message passing), a concurrency specification is interpreted as
  // a limit (default is unlimited).  In the message passing case, the user
  // must explicitly specify analysis concurrency to get hybrid parallelism
  // (default is no analysis concurrency).
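  // Example: three analysis_drivers under an asynchronous interface with no
  // analysis-level message passing and no concurrency limit all run
  // concurrently; with analysis message passing, a user spec > 1 is required.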
  if ( numAnalysisDrivers > 1 &&
       interfaceSynchronization == ASYNCHRONOUS_INTERFACE &&
       ( asynchLocalAnalysisConcurrency > 1 || ( !eaMessagePass &&
        !asynchLocalAnalysisConcurrency ) ) )
    asynchLocalAnalysisFlag = true;
}


/** Override DirectApplicInterface definition if plug-in to allow batch
    processing in Plugin{Serial,Parallel}DirectApplicInterface.cpp */
void ApplicationInterface::
init_communicators_checks(int max_eval_concurrency)
{ } // default is no-op


/** Override DirectApplicInterface definition if plug-in to allow batch
    processing in Plugin{Serial,Parallel}DirectApplicInterface.cpp */
void ApplicationInterface::
set_communicators_checks(int max_eval_concurrency)
{ } // default is no-op


bool ApplicationInterface::check_multiprocessor_analysis(bool warn)
{
  bool issue_flag = false;
  // multiprocessor analyses are only valid for synchronous direct interfaces.
  // Neither system calls (synch or asynch), forks (synch or asynch), nor POSIX
  // threads (asynch direct) can share a communicator.  Attempting parallel
  // analyses without a shared communicator can result in correct answers if
  // analysisComm's local leader computes the total result, but it does NOT
  // perform the intended multiprocessor analysis and is therefore misleading
  // and should be explicitly prevented.
  if (multiProcAnalysisFlag) { // not valid for system/fork
    issue_flag = true;
    if (iteratorCommRank == 0) {
      if (warn) Cerr << "Warning: ";
      else      Cerr << "Error:   ";
      Cerr << "Multiprocessor analyses are not valid with "
           << interface_enum_to_string(interfaceType) << " interfaces.";
      if (warn) Cerr << "\n         This issue may be resolved at run time.";
      else
        Cerr << "\n         Your processor allocation may exceed the "
             << "concurrency in the problem,\n         requiring a reduction "
             << "in allocation to eliminate the assignment of\n         excess "
             << "processors to the analysis level.";
      Cerr << std::endl;
    }
  }
  return issue_flag;
}


bool ApplicationInterface::
check_asynchronous(bool warn, int max_eval_concurrency)
{
  bool issue_flag = false, asynch_local_eval_flag
    = ( max_eval_concurrency > 1 &&
        interfaceSynchronization == ASYNCHRONOUS_INTERFACE &&
        ( asynchLocalEvalConcurrency > 1 ||           // captures hybrid mode
          ( !ieMessagePass && !asynchLocalEvalConcurrency ) ) ); // unlimited

  // Check for asynchronous local evaluations or analyses
  if (asynch_local_eval_flag || asynchLocalAnalysisFlag) {
    issue_flag = true;
    if (iteratorCommRank == 0) {
      if (warn) Cerr << "Warning: ";
      else      Cerr << "Error:   ";
      Cerr << "asynchronous capability not supported in "
           << interface_enum_to_string(interfaceType) << " interfaces.";
      if (warn) Cerr << "\n         This issue may be resolved at run time.";
      Cerr << std::endl;
    }
  }
  return issue_flag;
}


bool ApplicationInterface::
check_multiprocessor_asynchronous(bool warn, int max_eval_concurrency)
{
  bool issue_flag = false, asynch_local_eval_flag
    = ( max_eval_concurrency > 1 &&
        interfaceSynchronization == ASYNCHRONOUS_INTERFACE &&
        ( asynchLocalEvalConcurrency > 1 ||           // captures hybrid mode
          ( !ieMessagePass && !asynchLocalEvalConcurrency ) ) ); // unlimited

  // Performing asynch local concurrency requires a single processor.
  // multiProcEvalFlag & asynchLocalAnalysisConcurrency can coexist provided
  // that evalComm is divided into single-processor analysis servers.
  if ( (multiProcEvalFlag     && asynch_local_eval_flag) ||
       (multiProcAnalysisFlag && asynchLocalAnalysisFlag) ) {
    issue_flag = true;
    if (iteratorCommRank == 0) {
      if (warn) Cerr << "Warning: ";
      else      Cerr << "Error:   ";
      Cerr << "asynchronous local jobs are not supported for multiprocessor\n"
           << "         communicator partitions.";
      if (warn) Cerr << "  This issue may be resolved at run time.";
      else      Cerr << "  Your processor allocation may need adjustment.";
      Cerr << std::endl;
    }
  }
  return issue_flag;
}

/// form and return the final batch ID tag
String ApplicationInterface::
final_batch_id_tag()
{
  return evalTagPrefix + "." + std::to_string(batchIdCntr);
}


String ApplicationInterface::
final_eval_id_tag(int iface_eval_id)
{
  if (appendIfaceId) {
    if (batchEval)
      return evalTagPrefix + "." + std::to_string(batchIdCntr) + "."
           + std::to_string(iface_eval_id);
    else
      return evalTagPrefix + "." + std::to_string(iface_eval_id);
  }
  else
    return evalTagPrefix;
}
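// Example (hypothetical prefix): with evalTagPrefix == "dakota_run.1",
// batchIdCntr == 2, and iface_eval_id == 5, batch mode yields the tag
// "dakota_run.1.2.5", while non-batch mode yields "dakota_run.1.5".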


//void ApplicationInterface::free_communicators()
//{ }


/** The function evaluator for application interfaces.  Called from
    derived_evaluate() and derived_evaluate_nowait() in derived Model
    classes.  If asynch_flag is not set, perform a blocking evaluation
    (using derived_map()).  If asynch_flag is set, add the job to the
    beforeSynchCorePRPQueue queue for execution by one of the scheduler
    routines in synchronize() or synchronize_nowait().  Duplicate
    function evaluations are detected with duplication_detect(). */
void ApplicationInterface::map(const Variables& vars, const ActiveSet& set,
                               Response& response, bool asynch_flag)
{
  ++evalIdCntr; // all calls to map for this interface instance
  const ShortArray& asv = set.request_vector();
  size_t num_fns = asv.size();
  if (fineGrainEvalCounters) { // detailed evaluation reporting
    init_evaluation_counters(num_fns);
    for (size_t i=0; i<num_fns; ++i) {
      short asv_val = asv[i];
      if (asv_val & 1) ++fnValCounter[i];
      if (asv_val & 2) ++fnGradCounter[i];
      if (asv_val & 4) ++fnHessCounter[i];
    }
    if (fnLabels.empty())
      fnLabels = response.function_labels();
  }
  if (outputLevel > SILENT_OUTPUT) {
    if (interfaceId.empty() || interfaceId == "NO_ID")
      Cout << "\n---------------------\nBegin ";
    else
      Cout << "\n------------------------------\nBegin "
           << std::setw(8) << interfaceId << ' ';
    Cout << "Evaluation " << std::setw(4) << evalIdCntr;
    // This may be more confusing than helpful:
    //if (evalIdRefPt)
    //  Cout << " (local evaluation " << evalIdCntr - evalIdRefPt << ")";
    if (interfaceId.empty() || interfaceId == "NO_ID")
      Cout << "\n---------------------\n";
    else Cout << "\n------------------------------\n";
  }
  if (outputLevel > QUIET_OUTPUT)
    Cout << "Parameters for evaluation " << evalIdCntr << ":\n" << vars << '\n';

  response.active_set(set); // responseActiveSet = set for duplicate search

  // Subdivide ActiveSet for algebraic_mappings() and derived_map()
  Response algebraic_resp, core_resp; // empty handles
  ActiveSet core_set;

  if (algebraicMappings) {
    if (evalIdCntr == 1)
      init_algebraic_mappings(vars, response);

    // Always allocate a separate algebraic_resp, even if no coreMappings.
    // Cannot share a rep with the incoming response, because even if only
    // algebraic mappings are present, they may need reordering to form the
    // requested set and response.
    ActiveSet algebraic_set;
    asv_mapping(set, algebraic_set, core_set);
    algebraic_resp = Response(sharedRespData, algebraic_set);
    if (asynch_flag) {
      ParamResponsePair prp(vars, interfaceId, algebraic_resp, evalIdCntr);
      beforeSynchAlgPRPQueue.insert(prp);
    }
    else
      algebraic_mappings(vars, algebraic_set, algebraic_resp);

    if (coreMappings) { // both core and algebraic mappings active
      // separate core_resp from response
      core_resp = response.copy();
      core_resp.active_set(core_set);
    }
  }
  else if (coreMappings) { // analysis_driver mappings only
    core_set  = set;
    core_resp = response; // shared rep: no need for response_mapping()
  }

  bool duplicate = false;
  if (coreMappings) {
    if (evalCacheFlag && duplication_detect(vars, core_resp, asynch_flag)) {
      // catches duplication both in data_pairs (core evals already computed)
      // and in beforeSynchCorePRPQueue (core evals queued for processing).
      duplicate = true;
      if (outputLevel > SILENT_OUTPUT)
        Cout << "Duplication detected: analysis_drivers not invoked.\n";
    }
    else {

      //if ( partial_duplication_detect(vars, set, core_resp) ) {
        // sets augmentFlag (for use by Response::read), adds to core_resp,
        // and decrements the asv to be used in derived_map, but saves the
        // original asv for resetting once everything's reintegrated.  The
        // original asv must be restored prior to any I/O of the core_resp.
      //}

      // For new evaluations, manage the user's active_set_vector specification.
      //    on: asv seen by user's interface may change on each eval (default)
      //   off: asv seen by user's interface is constant for all evals
      if (!asvControlFlag) { // set ASV's to defaultASV for the mapping
        init_default_asv(num_fns);  // initialize if not already done
        core_set.request_vector(defaultASV); // DVV assigned above
        core_resp.active_set(core_set);
      }

      if (asynch_flag) { // multiple simultaneous evals. (local or parallel)
        // use this constructor since deep copies of vars/response are needed
        ParamResponsePair prp(vars, interfaceId, core_resp, evalIdCntr);
        beforeSynchCorePRPQueue.insert(prp);
        // jobs are not queued until call to synchronize() to allow dynamic
        // scheduling. Response data headers and data_pair list insertion
        // appear in synchronize().
      }
      else { // local synchronous evaluation

        // bcast the job to other processors within peer 1 (if required)
        if (multiProcEvalFlag)
          broadcast_evaluation(evalIdCntr, vars, core_set);

        //common_input_filtering(vars);

        currEvalId = evalIdCntr;
        try { derived_map(vars, core_set, core_resp, currEvalId); }

        catch(const FunctionEvalFailure& fneval_except) {
          //Cout << "Caught FunctionEvalFailure in map; message: "
          //<< fneval_except.what() << std::endl;
          manage_failure(vars, core_set, core_resp, currEvalId);
        }

        //common_output_filtering(core_resp);

        if (evalCacheFlag || restartFileFlag) {
          // manage shallow/deep copy of vars/response with evalCacheFlag
          ParamResponsePair prp(vars, interfaceId, core_resp, currEvalId,
                                evalCacheFlag);
          if (evalCacheFlag)   data_pairs.insert(prp);
          if (restartFileFlag) parallelLib.write_restart(prp);
        }
      }
    }
  }

  if (!duplicate) {
    ++newEvalIdCntr; // nonduplicate evaluations (used ONLY in fn eval summary)
    if (fineGrainEvalCounters) { // detailed evaluation reporting
      const ShortArray& asv = set.request_vector();
      size_t i, num_fns = asv.size();
      for (i=0; i<num_fns; ++i) {
        short asv_val = asv[i];
        if (asv_val & 1) ++newFnValCounter[i];
        if (asv_val & 2) ++newFnGradCounter[i];
        if (asv_val & 4) ++newFnHessCounter[i];
      }
    }
  }

  if (asynch_flag) {
    // Output appears here to support core | algebraic | both
    if (!duplicate && outputLevel > SILENT_OUTPUT) {
      if (batchEval) Cout << "(Batch job ";
      else Cout << "(Asynchronous job ";
      Cout << evalIdCntr;
      if (interfaceId.empty() || interfaceId == "NO_ID")
        Cout << " added to queue)\n";
      else Cout << " added to " << interfaceId << " queue)\n";
    }
  }
  else {
    // call response_mapping even when no coreMapping, as even with
    // algebraic only, the functions may have to be reordered
    if (algebraicMappings)
      response_mapping(algebraic_resp, core_resp, response);

    if (outputLevel > QUIET_OUTPUT) {
      if (duplicate)
        Cout << "\nActive response data retrieved from database";
      else {
        Cout << "\nActive response data for ";
        if (!(interfaceId.empty() || interfaceId == "NO_ID"))
          Cout << interfaceId << ' ';
        Cout << "evaluation " << evalIdCntr;
      }
      Cout << ":\n" << response << std::endl;
    }
  }
}


/** Called from map() to check incoming evaluation request for
    duplication with content of data_pairs and beforeSynchCorePRPQueue.
    If duplication is detected, return true, else return false.  Manage
    bookkeeping with historyDuplicateMap and beforeSynchDuplicateMap.
    Note that the list searches can get very expensive if a long list
    is searched on every new function evaluation (either from a large
    number of previous jobs, a large number of pending jobs, or both).
    For this reason, a user request for deactivation of the evaluation
    cache results in a complete bypass of duplication_detect(), even
    though a beforeSynchCorePRPQueue search would still be meaningful.
    Since the intent of this request is to streamline operations, both
    list searches are bypassed. */
bool ApplicationInterface::
duplication_detect(const Variables& vars, Response& response, bool asynch_flag)
{
  // Two flavors of cache lookup are supported: exact and tolerance-based.
  // Note 1: incoming response's responseActiveSet was updated in map(), but
  //   the rest of response is out-of-date (the previous fn. eval).  Due to
  //   set_compare(), the desired response set could be a subset of the
  //   data_pairs response -> use update().
  // Note 2: If duplication detected with an eval from restart/file import,
  //   promote the database eval id to current run status to enable more
  //   meaningful lookups (e.g., eval id of final result) downstream.  There
  //   are several possible bookkeeping update approaches; here, we choose to
  //   delete the old record and readd it with original ASV content and updated
  //   eval id.  This approach is efficient and doesn't discard any data, but
  //   has the downside that the record's ASV may contain more than the current
  //   request.  Another reasonable approach would be to keep the old record
  //   (original ASV) and add a new one (with the current ASV subset), but
  //   this is less efficient and complicates eval id management in downstream
  //   lookups (multiple records can match a particular Ids/Vars/Set lookup,
  //   requiring an additional test to prefer positive id's in some use cases).
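  // Example: a restart-file record imported with a non-positive eval id that
  // matches the current request is erased and re-inserted with eval id
  // evalIdCntr, so later lookups by id resolve to the current run's numbering.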
  PRPCacheOIter ord_it; PRPCacheHIter hash_it;
  ParamResponsePair cache_pr; int cache_eval_id; bool cache_hit = false;
  if (nearbyDuplicateDetect) { // slow but allows tolerance on equality
    ord_it = lookup_by_nearby_val(data_pairs, interfaceId, vars,
                                  response.active_set(), nearbyTolerance);
    cache_hit = (ord_it != data_pairs.end());
    if (cache_hit) { // ordered-specific updates (shared updates below)
      response.update(ord_it->response());
      cache_eval_id = ord_it->eval_id();
      if (cache_eval_id <= 0)
        { cache_pr = *ord_it; data_pairs.erase(ord_it); }
    }
  }
  else { // fast but requires exact binary match
    hash_it = lookup_by_val(data_pairs, interfaceId, vars,
                            response.active_set());
    cache_hit = (hash_it != data_pairs.get<hashed>().end());
    if (cache_hit) { // hashed-specific updates (shared updates below)
      response.update(hash_it->response());
      cache_eval_id = hash_it->eval_id();
      if (cache_eval_id <= 0)
        { cache_pr = *hash_it; data_pairs.get<hashed>().erase(hash_it); }
    }
  }
  if (cache_hit) { // updates shared among ordered/hashed lookups
    if (cache_eval_id <= 0) {
      // ordered key is const; must remove (above) & change/add (below)
      cache_pr.eval_id(evalIdCntr); // promote
      data_pairs.insert(cache_pr);  // shallow copy of previous vars/resp
    }

    if (asynch_flag) // asynch case: bookkeep
      historyDuplicateMap[evalIdCntr] = response.copy();

    return true; // Duplication detected
  }

  // check beforeSynchCorePRPQueue as well (if asynchronous and no cache hit)
  if (asynch_flag) {
    // queue lookups only support one flavor for simplicity: exact lookup
    PRPQueueHIter queue_it = lookup_by_val(beforeSynchCorePRPQueue, interfaceId,
                                           vars, response.active_set());
    if (queue_it != beforeSynchCorePRPQueue.get<hashed>().end()) {
      // Duplication detected: bookkeep
      beforeSynchDuplicateMap[evalIdCntr]
        = std::make_pair(queue_it, response.copy());
      return true; // Duplication detected
    }
  }

  return false; // Duplication not detected
}

/** If the user has specified active_set_vector as off, then map()
    uses a default ASV which is constant for all function evaluations
    (so that the user need not check the content of the ASV on each
    evaluation).  Only initialized if needed and not already sized. */
void ApplicationInterface::init_default_asv(size_t num_fns) {
  if (!asvControlFlag && defaultASV.size() != num_fns) {
    short asv_value = 1;
    if (gradientType == "analytic")
      asv_value |= 2;
    if (hessianType == "analytic")
      asv_value |= 4;
    defaultASV.assign(num_fns, asv_value);
    // TODO: the mixed ID sizes from the problem DB may not be
    // commensurate with num_fns due to Recast transformations (MO
    // reduce or experiment data); consider managing this in Model
    if (gradientType == "mixed") {
      ISCIter cit = gradMixedAnalyticIds.begin();
      ISCIter cend = gradMixedAnalyticIds.end();
      for ( ; cit != cend; ++cit)
        defaultASV[*cit - 1] |= 2;
    }
    if (hessianType == "mixed") {
      ISCIter cit = hessMixedAnalyticIds.begin();
      ISCIter cend = hessMixedAnalyticIds.end();
      for ( ; cit != cend; ++cit)
        defaultASV[*cit - 1] |= 4;
    }
  }
}
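// Example for init_default_asv(): with gradientType == "analytic" and
// hessianType == "none", every defaultASV entry is 3 (bit 1 = value,
// bit 2 = gradient, bit 4 = Hessian); with gradientType == "mixed", only the
// listed analytic response ids get bit 2 set.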


/** This function provides blocking synchronization for all cases of
    asynchronous evaluations, including the local asynchronous case
    (background system call, nonblocking fork, & multithreads), the
    message passing case, and the hybrid case.  Called from
    derived_synchronize() in derived Model classes. */
const IntResponseMap& ApplicationInterface::synchronize()
{
  rawResponseMap.clear();

  size_t cached_eval = cachedResponseMap.size(),
    hist_duplicates  = historyDuplicateMap.size(),
    queue_duplicates = beforeSynchDuplicateMap.size();

  // Process cached responses
  if (cached_eval) std::swap(rawResponseMap, cachedResponseMap);

  // Process history duplicates (see duplication_detect) since response data
  // has been extracted from the data_pairs list.  These duplicates are not
  // written to data_pairs or write_restart.
  if (hist_duplicates) {
    if (rawResponseMap.empty()) std::swap(rawResponseMap, historyDuplicateMap);
    else {
      rawResponseMap.insert(historyDuplicateMap.begin(),
                            historyDuplicateMap.end());
      historyDuplicateMap.clear();
    }
  }

  if (coreMappings) {
    size_t core_prp_jobs = beforeSynchCorePRPQueue.size();
    Cout << "\nBlocking synchronize of " << core_prp_jobs << " asynchronous ";
    if (!(interfaceId.empty() || interfaceId == "NO_ID"))
      Cout << interfaceId << ' ';
    Cout << "evaluations";
    if (cached_eval || hist_duplicates || queue_duplicates)
      Cout << ", " << cached_eval << " cached evaluations, and "
           << hist_duplicates + queue_duplicates << " duplicates";
    Cout << std::endl;

    // Process nonduplicate evaluations for either the message passing or local
    // asynchronous case.
    if (core_prp_jobs) {
      if (ieMessagePass) { // single or multi-processor servers
        if (ieDedMasterFlag) master_dynamic_schedule_evaluations();
        else {
          // utilize asynch local evals to accomplish a dynamic peer schedule
          // (even if hybrid mode not specified) unless precluded by direct
          // interface, multiProcEvalFlag (includes single proc analysis cases),
          // static scheduling override, or static asynch local specification.
          if ( asynchLocalEvalStatic || multiProcEvalFlag ||
               (interfaceType & DIRECT_INTERFACE_BIT) ||
               evalScheduling == PEER_STATIC_SCHEDULING )
            peer_static_schedule_evaluations();
          else // utilizes asynch local evals even if hybrid mode not specified
            peer_dynamic_schedule_evaluations();
        }
      }
      else // local to processor
        asynchronous_local_evaluations(beforeSynchCorePRPQueue);
    }
  }
  else if (!beforeSynchAlgPRPQueue.empty()) {
    Cout << "\nBlocking synchronize of " << beforeSynchAlgPRPQueue.size();
    if (!(interfaceId.empty() || interfaceId == "NO_ID"))
      Cout << ' ' << interfaceId;
    Cout << " algebraic mappings" << std::endl;
  }

  // Now that beforeSynchCorePRPQueue processing is complete, process duplicates
  // detected within beforeSynchCorePRPQueue (see duplication_detect).
  if (queue_duplicates) {
    for (std::map<int, std::pair<PRPQueueHIter, Response> >::const_iterator
         bsd_iter  = beforeSynchDuplicateMap.begin();
         bsd_iter != beforeSynchDuplicateMap.end(); bsd_iter++) {
      // due to id_vars_set_compare, the desired response set could be a subset
      // of the beforeSynchCorePRPQueue duplicate response -> use update().
      rawResponseMap[bsd_iter->first] = (bsd_iter->second).second;
      rawResponseMap[bsd_iter->first].update(
        (bsd_iter->second).first->response());
    }
    beforeSynchDuplicateMap.clear();
  }
  beforeSynchCorePRPQueue.clear();

  // Merge core mappings and algebraic mappings into rawResponseMap
  if (algebraicMappings) { // complete all algebraic jobs and overlay
    for (PRPQueueIter alg_prp_it = beforeSynchAlgPRPQueue.begin();
         alg_prp_it != beforeSynchAlgPRPQueue.end(); alg_prp_it++) {
      Response alg_response = alg_prp_it->response();
      algebraic_mappings(alg_prp_it->variables(), alg_prp_it->active_set(),
                         alg_response);
      if (coreMappings) {
        Response& response = rawResponseMap[alg_prp_it->eval_id()];
        response_mapping(alg_response, response, response);
      }
      else {
        // call response_mapping even when no coreMapping, as even with
        // algebraic only, the functions may have to be reordered

        // Recreate total_response with the correct (possibly
        // reordered) ASV since when no CoreMapping, rawResponseMap
        // doesn't have a valid Response to update
        ActiveSet total_set(alg_prp_it->active_set());
        asv_mapping(alg_prp_it->active_set(), total_set);
        Response total_response = Response(sharedRespData, total_set);
        response_mapping(alg_response, total_response, total_response);
        rawResponseMap[alg_prp_it->eval_id()] = total_response;
      }
    }
    beforeSynchAlgPRPQueue.clear();
  }

  if (outputLevel > QUIET_OUTPUT) // output completed responses
    for (IntRespMCIter rr_iter = rawResponseMap.begin();
         rr_iter != rawResponseMap.end(); ++rr_iter) {
      Cout << "\nActive response data for ";
      if (!(interfaceId.empty() || interfaceId == "NO_ID"))
        Cout << interfaceId << ' ';
      Cout << "evaluation " << rr_iter->first
           << ":\n" << rr_iter->second;
    }

  return rawResponseMap;
}


/** This function provides nonblocking synchronization for the local
    asynchronous case and selected nonblocking message passing schedulers.
    Called from derived_synchronize_nowait() in derived Model classes. */
const IntResponseMap& ApplicationInterface::synchronize_nowait()
{
  rawResponseMap.clear();

  size_t cached_eval = cachedResponseMap.size(),
    hist_duplicates  = historyDuplicateMap.size(),
    queue_duplicates = beforeSynchDuplicateMap.size();

  if (coreMappings) {
    size_t core_prp_jobs = beforeSynchCorePRPQueue.size();
    // suppress repeated header output for longer jobs;
    // don't need to check queue_duplicates since none w/o core queue
    if ( headerFlag && (core_prp_jobs || hist_duplicates) ) {
      Cout << "\nNonblocking synchronize of " << core_prp_jobs
           << " asynchronous ";
      if (!(interfaceId.empty() || interfaceId == "NO_ID"))
        Cout << interfaceId << ' ';
      Cout << "evaluations";
      if (cached_eval || hist_duplicates || queue_duplicates)
        Cout << ", " << cached_eval << " cached evaluations, and "
             << hist_duplicates + queue_duplicates << " duplicates";
      Cout << std::endl;
    }

    // Test nonduplicate evaluations and add completions to rawResponseMap
    if (core_prp_jobs) {
      if (ieMessagePass) { // single or multi-processor servers
        if (ieDedMasterFlag)
          master_dynamic_schedule_evaluations_nowait();
        else {
          // prefer to use peer_dynamic to avoid blocking on local jobs, as
          // is consistent with nowait requirement; however, a fallback to
          // peer_static is needed for the special cases listed below:
          if ( asynchLocalEvalStatic || multiProcEvalFlag ||
               (interfaceType & DIRECT_INTERFACE_BIT) ||
               evalScheduling == PEER_STATIC_SCHEDULING )
            peer_static_schedule_evaluations_nowait();
          else
            peer_dynamic_schedule_evaluations_nowait();
        }
      }
      else // local to processor
        asynchronous_local_evaluations_nowait(beforeSynchCorePRPQueue);
    }
    // suppress header on next pass if no new completions on this pass
    headerFlag = !rawResponseMap.empty();
  }
  else if (!beforeSynchAlgPRPQueue.empty()) {
    Cout << "\nNonblocking synchronize of " << beforeSynchAlgPRPQueue.size();
    if (!(interfaceId.empty() || interfaceId == "NO_ID"))
      Cout << ' ' << interfaceId;
    Cout << " algebraic mappings" << std::endl;
  }

  // Since beforeSynchCorePRPQueue processing will not in general be completed,
  // process duplicates listed in beforeSynchDuplicateMap only if the
  // original/nonduplicate beforeSynchCorePRPQueue job is complete.
  if (queue_duplicates && !rawResponseMap.empty())
    for (std::map<int, std::pair<PRPQueueHIter, Response> >::iterator
         bsd_iter  = beforeSynchDuplicateMap.begin();
         bsd_iter != beforeSynchDuplicateMap.end(); bsd_iter++) {
      const ParamResponsePair& scheduled_pr = *(bsd_iter->second).first;
      if (rawResponseMap.find(scheduled_pr.eval_id()) != rawResponseMap.end()) {
        // due to id_vars_set_compare, the desired response set could be
        // a subset of the duplicate response -> use update().
        Response& response = (bsd_iter->second).second;
        response.update(scheduled_pr.response());
        rawResponseMap[bsd_iter->first] = response;
      }
    }

  // Process history duplicates (see duplication_detect) and cached evaluations.
  // In the _nowait case, this goes after the schedulers so as to not interfere
  // with rawResponseMap usage in the schedulers and after the
  // beforeSynchDuplicates in order to streamline their rawResponseMap searches.
  // Note: since data_pairs is checked first in duplication_detect(), it is not
  // possible to have a beforeSynchDuplicateMap entry that references a
  // historyDuplicateMap entry.
  if (cached_eval) {
    rawResponseMap.insert(cachedResponseMap.begin(), cachedResponseMap.end());
    cachedResponseMap.clear(); headerFlag = true;
  }
  if (hist_duplicates) {
    rawResponseMap.insert(historyDuplicateMap.begin(),
                          historyDuplicateMap.end());
    historyDuplicateMap.clear(); headerFlag = true;
  }

  // Merge core mappings and algebraic mappings into rawResponseMap
  if (coreMappings && algebraicMappings) { // update completed core jobs
    for (IntRespMIter rr_iter = rawResponseMap.begin();
         rr_iter != rawResponseMap.end(); ++rr_iter) {
      PRPQueueIter alg_prp_it
        = lookup_by_eval_id(beforeSynchAlgPRPQueue, rr_iter->first);
      Response alg_response = alg_prp_it->response(); // shared rep
      algebraic_mappings(alg_prp_it->variables(), alg_prp_it->active_set(),
                         alg_response);               // update rep
      response_mapping(alg_response, rr_iter->second, rr_iter->second);
      beforeSynchAlgPRPQueue.erase(alg_prp_it);
    }
  }
  else if (algebraicMappings) { // complete all algebraic jobs
    for (PRPQueueIter alg_prp_it = beforeSynchAlgPRPQueue.begin();
         alg_prp_it != beforeSynchAlgPRPQueue.end(); alg_prp_it++) {

      Response algebraic_resp = alg_prp_it->response(); // shared rep
      algebraic_mappings(alg_prp_it->variables(), alg_prp_it->active_set(),
                         algebraic_resp);               // update rep
      // call response_mapping even when no coreMapping, as even with
      // algebraic only, the functions may have to be reordered

      // Recreate total_response with the correct (possibly reordered)
      // ASV since when no CoreMapping, rawResponseMap doesn't have a
      // valid Response to update
      ActiveSet total_set(alg_prp_it->active_set());
      asv_mapping(alg_prp_it->active_set(), total_set);
      Response total_response = Response(sharedRespData, total_set);
      response_mapping(algebraic_resp, total_response, total_response);
      rawResponseMap[alg_prp_it->eval_id()] = total_response;
    }
    beforeSynchAlgPRPQueue.clear();
  }

  for (IntRespMCIter rr_iter = rawResponseMap.begin();
       rr_iter != rawResponseMap.end(); ++rr_iter) {
    int fn_eval_id = rr_iter->first;
    // output completed responses
    if (outputLevel > QUIET_OUTPUT) {
      Cout << "\nActive response data for ";
      if (!(interfaceId.empty() || interfaceId == "NO_ID"))
        Cout << interfaceId << ' ';
      Cout << "evaluation " << fn_eval_id << ":\n" << rr_iter->second;
    }
    // clean up bookkeeping
    if (coreMappings) {
      PRPQueueIter prp_iter
        = lookup_by_eval_id(beforeSynchCorePRPQueue, fn_eval_id);
      if (prp_iter != beforeSynchCorePRPQueue.end()) // duplicates not in list
        beforeSynchCorePRPQueue.erase(prp_iter);
      beforeSynchDuplicateMap.erase(fn_eval_id); // if present
    }
  }

  return rawResponseMap;
}


/** This code is called from synchronize() to provide the master portion of
    a master-slave algorithm for the dynamic scheduling of evaluations among
    slave servers.  It performs no evaluations locally and matches either
    serve_evaluations_synch() or serve_evaluations_asynch() on the slave
    servers, depending on the value of asynchLocalEvalConcurrency.  Dynamic
    scheduling assigns jobs in 2 passes.  The 1st pass gives each server
    the same number of jobs (equal to asynchLocalEvalConcurrency).  The
    2nd pass assigns the remaining jobs to slave servers as previous jobs
    are completed and returned.  Single- and multilevel parallel use intra-
    and inter-communicators, respectively, for send/receive.  Specific
    syntax is encapsulated within ParallelLibrary. */
void ApplicationInterface::master_dynamic_schedule_evaluations()
{
  int capacity = numEvalServers;
  if (asynchLocalEvalConcurrency > 1) capacity *= asynchLocalEvalConcurrency;
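  // Example: 4 evaluation servers with asynchLocalEvalConcurrency == 3 give a
  // first-pass capacity of 12 jobs; additional jobs wait for the second pass.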
  int num_jobs = beforeSynchCorePRPQueue.size(),
     num_sends = std::min(capacity, num_jobs);
  Cout << "Master dynamic schedule: first pass assigning " << num_sends
       << " jobs among " << numEvalServers << " servers\n";

  // only need num_sends entries (not num_jobs) due to reuse
  sendBuffers  = new MPIPackBuffer   [num_sends];
  recvBuffers  = new MPIUnpackBuffer [num_sends];
  recvRequests = new MPI_Request     [num_sends];

  // send data & post receives for 1st set of jobs
  int i, server_id, fn_eval_id;
  PRPQueueIter prp_iter;
  for (i=0, prp_iter = beforeSynchCorePRPQueue.begin(); i<num_sends;
       ++i, ++prp_iter) {
    server_id  = i%numEvalServers + 1; // from 1 to numEvalServers
    send_evaluation(prp_iter, i, server_id, false); // !peer
  }

  // schedule remaining jobs
  if (num_sends < num_jobs) {
    Cout << "Master dynamic schedule: second pass scheduling "
         << num_jobs-num_sends << " remaining jobs\n";
    int send_cntr = num_sends, recv_cntr = 0, out_count;
    MPI_Status* status_array = new MPI_Status [num_sends];
    int* index_array = new int [num_sends];
    PRPQueueIter return_iter;
    while (recv_cntr < num_jobs) {
      if (outputLevel > SILENT_OUTPUT)
        Cout << "Master dynamic schedule: waiting on completed jobs"<<std::endl;
      parallelLib.waitsome(num_sends, recvRequests, out_count, index_array,
                           status_array);
      recv_cntr += out_count;
      for (i=0; i<out_count; ++i) {
        int index   = index_array[i]; // index of recv_request that completed
        server_id   = index%numEvalServers + 1; // from 1 to numEvalServers
        fn_eval_id  = status_array[i].MPI_TAG;
        return_iter = lookup_by_eval_id(beforeSynchCorePRPQueue, fn_eval_id);
        receive_evaluation(return_iter, index, server_id, false);  //!peer
        if (send_cntr < num_jobs) {
          send_evaluation(prp_iter, index, server_id, false); // !peer
          ++send_cntr; ++prp_iter;
        }
      }
    }
    delete [] status_array;
    delete [] index_array;
  }
  else { // all jobs assigned in first pass
    if (outputLevel > SILENT_OUTPUT)
      Cout << "Master dynamic schedule: waiting on all jobs" << std::endl;
    parallelLib.waitall(num_jobs, recvRequests);
    // All buffers received, now generate rawResponseMap
    for (i=0, prp_iter = beforeSynchCorePRPQueue.begin(); i<num_jobs;
         ++i, ++prp_iter) {
      server_id = i%numEvalServers + 1; // from 1 to numEvalServers
      receive_evaluation(prp_iter, i, server_id, false);
    }
  }
  // deallocate MPI & buffer arrays
  delete [] sendBuffers;   sendBuffers = NULL;
  delete [] recvBuffers;   recvBuffers = NULL;
  delete [] recvRequests; recvRequests = NULL;
}


/** This code runs on the iteratorCommRank 0 processor (the iterator) and is
    called from synchronize() in order to manage a static schedule for cases
    where peer 1 must block when evaluating its local job allocation (e.g.,
    single or multiprocessor direct interface evaluations).  It matches
    serve_evaluations_peer() for any other processors within the first
    evaluation partition and serve_evaluations_{synch,asynch}() for all other
    evaluation partitions (depending on asynchLocalEvalConcurrency).  It
    performs function evaluations locally for its portion of the job
    allocation using either asynchronous_local_evaluations() or
    synchronous_local_evaluations().  Single-level and multilevel parallel
    use intra- and inter-communicators, respectively, for send/receive.
    Specific syntax is encapsulated within ParallelLibrary.  The
    iteratorCommRank 0 processor assigns the static schedule since it is the
    only processor with access to beforeSynchCorePRPQueue (it runs the
    iterator and calls synchronize).  The alternate design of each peer
    selecting its own jobs using the modulus operator would be applicable if
    execution of this function (and therefore the job list) were distributed. */
void ApplicationInterface::peer_static_schedule_evaluations()
{
  // rounding down num_peer1_jobs offloads this processor (which has additional
  // work relative to other peers), but results in a few more passed messages.
  int num_jobs       = beforeSynchCorePRPQueue.size(),
      num_peer1_jobs = (int)std::floor((Real)num_jobs/numEvalServers),
      num_sends      = num_jobs - num_peer1_jobs;
  Cout << "Peer static schedule: assigning " << num_jobs << " jobs among "
       << numEvalServers << " peers\n";
  sendBuffers  = new MPIPackBuffer   [num_sends];
  recvBuffers  = new MPIUnpackBuffer [num_sends];
  recvRequests = new MPI_Request     [num_sends];
  int i, server_id, fn_eval_id;

  // Assign jobs locally + remotely using a round-robin assignment.  Since
  // this is a static schedule, all remote job assignments are sent now.  This
  // assignment is not dependent on the capacity of the other peers (i.e.,
  // on whether they run serve_evaluation_synch or serve_evaluation_asynch).
  PRPQueueIter prp_iter = beforeSynchCorePRPQueue.begin();
  PRPQueue local_prp_queue; size_t buff_index = 0;
  for (i=1; i<=num_jobs; ++i, ++prp_iter) { // shift by 1 to reduce peer 1 work
    server_id = i%numEvalServers; // 0 to numEvalServers-1
    if (server_id) { // 1 to numEvalServers-1
      send_evaluation(prp_iter, buff_index, server_id, true); // peer
      ++buff_index;
    }
    else
      local_prp_queue.insert(*prp_iter);
  }
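  // Example: with 3 peers, jobs 3, 6, 9, ... (i % numEvalServers == 0) stay
  // local on peer 1; the remaining jobs are sent round robin to the other
  // two peers.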
  // This simple approach is not best for hybrid mode + asynchLocalEvalStatic:
  // Peer 1 retains the first num_peer1_jobs and spreads the rest to peers 2
  // through n in a round-robin assignment.
  //PRPQueueIter prp_iter = beforeSynchCorePRPQueue.begin();
  //std::advance(prp_iter, num_peer1_jobs); // offset PRP list by num_peer1_jobs
  //PRPQueueIter prp_iter_save = prp_iter;
  //for (i=0; i<num_sends; ++i, ++prp_iter) {
  //  server_id = i%(numEvalServers-1) + 1; // 1 to numEvalServers-1
  //  send_evaluation(prp_iter, i, server_id, true); // peer
  //}
  // Perform computation for first num_peer1_jobs jobs on peer 1.
  //PRPQueue local_prp_queue(beforeSynchCorePRPQueue.begin(), prp_iter_save);

  // Perform computations on peer 1.  Default behavior is synchronous evaluation
  // of jobs on each peer.  Only if asynchLocalEvalConcurrency > 1 do we get the
  // hybrid parallelism of asynch jobs on each peer (asynchLocalEvalConcurrency
  // serves a dual role: throttles concurrency if asynch local alone, and
  // multiplies concurrency if hybrid).
  if (asynchLocalEvalConcurrency > 1) { // peer_dynamic is default in this case
    Cout << "Peer static schedule: peer 1 scheduling " << num_peer1_jobs
         << " local jobs\n";
    asynchronous_local_evaluations(local_prp_queue);
  }
  else { // 1 synchronous job at a time
    Cout << "Peer static schedule: peer 1 evaluating " << num_peer1_jobs
         << " local jobs\n";
    synchronous_local_evaluations(local_prp_queue);
  }
  // reassign used to be required for beforeSynchDuplicates to work properly in
  // synchronize(), but is now unnecessary due to response representation
  // sharing between local_prp_queue and beforeSynchCorePRPQueue.
  //for (i=0; i<num_peer1_jobs; ++i)
  //  beforeSynchCorePRPQueue[core_index].response(
  //    local_prp_queue[i].response());

  if (num_sends) { // Retrieve results from peers
    if (outputLevel > SILENT_OUTPUT)
      Cout << "Peer static schedule: waiting on assigned jobs" << std::endl;
    parallelLib.waitall(num_sends, recvRequests);

    // All buffers received, now generate rawResponseMap
    prp_iter = beforeSynchCorePRPQueue.begin(); buff_index = 0;
    for (i=1; i<=num_jobs; ++i, ++prp_iter) { // shift by 1 to reduce peer 1 work
      server_id = i%numEvalServers; // 0 to numEvalServers-1
      if (server_id) {
        receive_evaluation(prp_iter, buff_index, server_id, true); // peer
        ++buff_index;
      }
    }
    // Mirrors simple assignment approach above.
    //prp_iter = prp_iter_save; // offset start by num_peer1_jobs
    //for (i=0; i<num_sends; ++i, ++prp_iter) {
    //  server_id = i%(numEvalServers-1) + 1; // 1 to numEvalServers-1
    //  receive_evaluation(prp_iter, i, server_id, true); // peer
    //}
  }

  // deallocate MPI & buffer arrays
  delete [] sendBuffers;   sendBuffers = NULL;
  delete [] recvBuffers;   recvBuffers = NULL;
  delete [] recvRequests; recvRequests = NULL;
}
1150 
1151 
1152 /** This code runs on the iteratorCommRank 0 processor (the iterator) and is
1153     called from synchronize() in order to manage a dynamic schedule, as
1154     enabled by nonblocking management of local asynchronous jobs.  It
1155     matches serve_evaluations_{synch,asynch}() for other evaluation
1156     partitions, depending on asynchLocalEvalConcurrency; it does not match
1157     serve_evaluations_peer() since, for local asynchronous jobs, the first
1158     evaluation partition cannot be multiprocessor.  It performs function
1159     evaluations locally for its portion of the job allocation using
1160     asynchronous_local_evaluations_nowait().  Single-level and multilevel
1161     parallel use intra- and inter-communicators, respectively, for
1162     send/receive.  Specific syntax is encapsulated within ParallelLibrary. */
1163 void ApplicationInterface::peer_dynamic_schedule_evaluations()
1164 {
1165   size_t num_jobs   = beforeSynchCorePRPQueue.size(),
1166     server_capacity = std::max(1, asynchLocalEvalConcurrency),
1167     total_capacity  = numEvalServers * server_capacity,
1168     remote_capacity = total_capacity - server_capacity;
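  // Capacity sketch under assumed settings (numEvalServers = 4,
  // asynchLocalEvalConcurrency = 2): server_capacity = 2, total_capacity = 8,
  // remote_capacity = 6, i.e. up to 6 jobs can be outstanding on remote peers
  // while peer 1 runs at most 2 locally.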
1169 
1170   // avoid nonblocking local scheduling, if not required.
1171   // Note: this scheduler switch needs to be fully consistent with inter-comm
1172   // creation logic in init_communicators().
1173   //if (num_jobs <= remote_capacity && !multiProcEvalFlag)
1174   //  { master_dynamic_schedule_evaluations(); return; }
1175 
1176   // Use round-robin assignment, skipping peer1 on 1st round.  Alternatively,
1177   // could minimize local job count, but this is less balanced (also consider
1178   // the memory budgeting for sims) and leads to less intuitive id assignments.
1179   size_t num_assign   = std::min(total_capacity, num_jobs),
1180     num_local_assign  = num_assign / numEvalServers, // truncates fractional
1181     num_remote_assign = num_assign - num_local_assign;
1182   Cout << "Peer dynamic schedule: first pass assigning " << num_remote_assign
1183        << " jobs among " << numEvalServers-1 << " remote peers\n";
1184   sendBuffers  = new MPIPackBuffer   [num_remote_assign];
1185   recvBuffers  = new MPIUnpackBuffer [num_remote_assign];
1186   recvRequests = new MPI_Request     [num_remote_assign];
1187   int i, server_id, fn_eval_id;
1188   PRPQueueIter assign_iter = beforeSynchCorePRPQueue.begin();
1189   PRPQueue local_prp_queue; size_t buff_index = 0;
1190   for (i=1; i<=num_assign; ++i, ++assign_iter) {//shift +1 to prefer remote work
1191     server_id = i%numEvalServers; // 0 to numEvalServers-1
1192     if (server_id) { // 1 to numEvalServers-1
1193       send_evaluation(assign_iter, buff_index, server_id, true); // peer
1194       msgPassRunningMap[assign_iter->eval_id()]
1195 	= IntSizetPair(server_id, buff_index);
1196       ++buff_index;
1197     }
1198     else
1199       local_prp_queue.insert(*assign_iter);
1200   }
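  // First-pass assignment sketch (assumed numEvalServers = 4, num_assign = 8):
  // i = 1,2,3 -> server_id 1,2,3 (sent to remote peers), i = 4 -> server_id 0
  // (kept in local_prp_queue), i = 5,6,7 -> remote again, i = 8 -> local, so
  // remote peers are loaded before peer 1 on each round.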
1201 
1202   // Start nonblocking asynch local computations on peer 1
1203   Cout << "Peer dynamic schedule: first pass launching " << num_local_assign
1204        << " local jobs\n";
1205   // "Step 1" of asynch_local_evaluations_nowait()
1206   PRPQueueIter local_prp_iter;
1207   assign_asynch_local_queue(local_prp_queue, local_prp_iter);
1208 
1209   // block until local and remote scheduling are complete
1210   if (outputLevel > SILENT_OUTPUT && num_jobs > num_assign)
1211     Cout << "Peer dynamic schedule: second pass scheduling "
1212 	 << num_jobs - num_assign << " remaining jobs" << std::endl;
1213   size_t recv_cntr = 0;
1214   while (recv_cntr < num_jobs) {
1215     // process completed message passing jobs and backfill
1216     recv_cntr += test_receives_backfill(assign_iter, true); // peer
1217     // "Step 2" and "Step 3" of asynch_local_evaluations_nowait()
1218     recv_cntr += test_local_backfill(beforeSynchCorePRPQueue, assign_iter);
1219   }
1220 
1221   // deallocate MPI & buffer arrays
1222   delete [] sendBuffers;   sendBuffers = NULL;
1223   delete [] recvBuffers;   recvBuffers = NULL;
1224   delete [] recvRequests; recvRequests = NULL;
1225 }
1226 
1227 
1228 /** This function provides blocking synchronization for the local asynch
1229     case (background system call, nonblocking fork, or threads).  It can
1230     be called from synchronize() for a complete local scheduling of all
1231     asynchronous jobs or from peer_{static,dynamic}_schedule_evaluations()
1232     to perform a local portion of the total job set.  It uses
1233     derived_map_asynch() to initiate asynchronous evaluations and
1234     wait_local_evaluations() to capture completed jobs, and mirrors the
1235     master_dynamic_schedule_evaluations() message passing scheduler as much
1236     as possible (wait_local_evaluations() is modeled after MPI_Waitsome()). */
1237 void ApplicationInterface::
1238 asynchronous_local_evaluations(PRPQueue& local_prp_queue)
1239 {
1240   size_t i, static_servers, server_index, num_jobs = local_prp_queue.size(),
1241     num_active, /*num_launch,*/ num_sends = (asynchLocalEvalConcurrency) ?
1242       std::min((size_t)asynchLocalEvalConcurrency, num_jobs) : num_jobs;
1243   bool static_limited
1244     = (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1);
1245   if (static_limited)
1246     static_servers = asynchLocalEvalConcurrency * numEvalServers;
1247 
1248   // Step 1: first pass launching of jobs up to the local server capacity
1249   Cout << "First pass: initiating ";
1250   if (static_limited) Cout << "at most ";
1251   Cout << num_sends << " local asynchronous jobs\n";
1252   PRPQueueIter local_prp_iter;
1253   assign_asynch_local_queue(local_prp_queue, local_prp_iter);
1254 
1255   num_active = /*num_launch =*/ asynchLocalActivePRPQueue.size();
1256   if (num_active < num_jobs) {
1257     Cout << "Second pass: ";
1258     if (static_limited) Cout << "static ";
1259     Cout << "scheduling " << num_jobs - num_active
1260 	 << " remaining local asynchronous jobs\n";
1261   }
1262 
1263   size_t recv_cntr = 0, completed; bool launch;
1264   while (recv_cntr < num_jobs) {
1265 
1266     /*
1267     // SEND TERM ASAP
1268     if (multiProcEvalFlag && num_launch == num_jobs) {
1269       // stop serve_evaluation_{,a}synch_peer procs
1270       int fn_eval_id = 0;
1271       parallelLib.bcast_e(fn_eval_id);
1272     }
1273     */
1274 
1275     // Step 2: process completed jobs with wait_local_evaluations()
1276     if (outputLevel > SILENT_OUTPUT) {
1277       if(batchEval)
1278         Cout << "Waiting on completed batch" << std::endl;
1279       else
1280         Cout << "Waiting on completed jobs" << std::endl;
1281     }
1282     completionSet.clear();
1283     wait_local_evaluations(asynchLocalActivePRPQueue); // rebuilds completionSet
1284     recv_cntr += completed = completionSet.size();
1285     for (ISCIter id_iter = completionSet.begin();
1286 	 id_iter != completionSet.end(); ++id_iter)
1287       { process_asynch_local(*id_iter); --num_active; }
1288 
1289     // Step 3: backfill completed jobs with the next pending jobs (if present)
1290     if (static_limited) // reset to start of local queue
1291       local_prp_iter = local_prp_queue.begin();
1292     for (i=0; local_prp_iter != local_prp_queue.end(); ++i, ++local_prp_iter) {
1293       int fn_eval_id = local_prp_iter->eval_id();
1294       launch = false;
1295       if (static_limited) {
1296 	server_index = (fn_eval_id - 1) % static_servers;
1297 	if ( lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id) ==
1298 	     asynchLocalActivePRPQueue.end() &&
1299 	     rawResponseMap.find(fn_eval_id) == rawResponseMap.end() &&
1300 	   //all_completed.find(fn_eval_id) == all_completed.end() &&
1301 	     !localServerAssigned[server_index] )
1302 	  { launch = true; localServerAssigned.set(server_index); }
1303       }
1304       else {
1305 	if (i < completed) launch = true;
1306 	else               break;
1307       }
1308 
1309       if (launch) {
1310 	launch_asynch_local(local_prp_iter); ++num_active; //++num_launch;
1311 	if (static_limited && num_active == asynchLocalEvalConcurrency)
1312 	  break;
1313       }
1314     }
1315   }
1316 
1317   //clear_bookkeeping(); // clear any bookkeeping lists in derived classes
1318 }
1319 
1320 
1321 void ApplicationInterface::
1322 assign_asynch_local_queue(PRPQueue& local_prp_queue,
1323 			  PRPQueueIter& local_prp_iter)
1324 {
1325   // This fn is used to assign an initial set of jobs; no local jobs should
1326   // be active at this point.
1327   if (!asynchLocalActivePRPQueue.empty()) {
1328     Cerr << "Error: ApplicationInterface::assign_asynch_local_queue() invoked "
1329 	 << "with existing asynch local jobs." << std::endl;
1330     abort_handler(-1);
1331   }
1332 
1333   // special data for static-scheduling case: asynch local concurrency is
1334   // limited and we need to stratify the job scheduling according to eval id.
1335   // This case has to handle non-contiguous eval IDs as it could happen with
1336   // restart; fill jobs until all servers busy or at end of queue -- this is
1337   // similar to nowait case.
1338   bool static_limited
1339     = (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1);
1340   size_t static_servers;
1341   if (static_limited) {
1342     static_servers = asynchLocalEvalConcurrency * numEvalServers;
1343     if (localServerAssigned.size() != static_servers)
1344       localServerAssigned.resize(static_servers);
1345     localServerAssigned.reset(); // in blocking case, always reset job map
1346   }
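  // Static-mapping sketch (assumed asynchLocalEvalConcurrency = 2 and
  // numEvalServers = 2, so static_servers = 4): eval ids 1,5,9,... map to
  // local "server" 0 and ids 2,6,10,... to server 1 via
  // (fn_eval_id - 1) % static_servers, so a slot is only refilled by an
  // evaluation that maps back to the same slot, even for the non-contiguous
  // eval ids that can arise from restart.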
1347   // for static_limited, need an aggregated set of completions for job launch
1348   // testing (completionList only spans the current asynchLocalActivePRPQueue;
1349   // rawResponseMap spans beforeSynchCorePRPQueue jobs at the synch{,_nowait}()
1350   // level).  Since the incoming local_prp_queue can involve a subset of
1351   // beforeSynchCorePRPQueue, all_completed spans a desired intermediate level.
1352   // However, for purposes of identifying completed evaluations, presence in
1353   // the higher-level rawResponseMap scope is sufficient to indicate
1354   // completion, with a small penalty of searches within a larger queue.
1355   //IntSet all_completed; // track all completed evals within local_prp_queue
1356 
1357   int fn_eval_id, num_jobs = local_prp_queue.size();
1358   if (multiProcEvalFlag) // TO DO: deactivate this bcast
1359     parallelLib.bcast_e(num_jobs);
1360   size_t i, server_index, num_active = 0,
1361     num_sends = (asynchLocalEvalConcurrency) ?
1362       std::min(asynchLocalEvalConcurrency, num_jobs) : // limited by user spec.
1363       num_jobs; // unlimited (default): launch all jobs in first pass
1364   bool launch;
1365 
1366   // Step 1: launch jobs up to asynch concurrency limit (if specified)
1367   for (i=0, local_prp_iter = local_prp_queue.begin();
1368        local_prp_iter != local_prp_queue.end(); ++i, ++local_prp_iter) {
1369     launch = false;
1370     fn_eval_id = local_prp_iter->eval_id();
1371     if (static_limited) {
1372       server_index = (fn_eval_id - 1) % static_servers;
1373       if (!localServerAssigned[server_index]) // if local "server" not busy
1374 	{ launch = true; ++num_active; localServerAssigned.set(server_index); }
1375     }
1376     else {
1377       if (i<num_sends) launch = true;
1378       else             break;
1379     }
1380     if (launch)
1381       launch_asynch_local(local_prp_iter);
1382     if (static_limited && num_active == asynchLocalEvalConcurrency)
1383       break;
1384   }
1385 }
1386 
1387 
1388 size_t ApplicationInterface::
1389 test_receives_backfill(PRPQueueIter& assign_iter, bool peer_flag)
1390 {
1391   if (outputLevel == DEBUG_OUTPUT)
1392     Cout << "Testing message receives from remote servers\n";
1393 
1394   int mpi_test_flag, fn_eval_id, new_eval_id, server_id;
1395   size_t buff_index;
1396   MPI_Status status; // only 1 needed for parallelLib.test()
1397   std::map<int, IntSizetPair>::iterator run_iter; PRPQueueIter return_iter;
1398   IntIntMap removals; size_t receives = 0;
1399 
1400   for (run_iter  = msgPassRunningMap.begin();
1401        run_iter != msgPassRunningMap.end(); ++run_iter) {
1402     IntSizetPair& id_index = run_iter->second;
1403     buff_index = id_index.second;
1404     parallelLib.test(recvRequests[buff_index], mpi_test_flag, status);
1405     if (mpi_test_flag) {
1406       fn_eval_id  = run_iter->first; // or status.MPI_TAG;
1407       server_id   = id_index.first;
1408       return_iter = lookup_by_eval_id(beforeSynchCorePRPQueue, fn_eval_id);
1409       receive_evaluation(return_iter, buff_index, server_id, peer_flag);
1410       ++receives;
1411 
1412       // replace job if more are pending
1413       bool new_job = false;
1414       while (assign_iter != beforeSynchCorePRPQueue.end()) {
1415 	new_eval_id = assign_iter->eval_id();
1416 	if (msgPassRunningMap.find(new_eval_id) == msgPassRunningMap.end() &&
1417 	    lookup_by_eval_id(asynchLocalActivePRPQueue, new_eval_id) ==
1418 	    asynchLocalActivePRPQueue.end() &&
1419 	    rawResponseMap.find(new_eval_id) == rawResponseMap.end())
1420 	  { new_job = true; break; }
1421 	++assign_iter;
1422       }
1423       if (new_job) {
1424 	// assign job
1425 	send_evaluation(assign_iter, buff_index, server_id, peer_flag);
1426 	// update bookkeeping
1427 	removals[fn_eval_id] = new_eval_id; // replace old with new
1428 	++assign_iter;
1429       }
1430       else
1431 	removals[fn_eval_id] = 0; // no replacement, just remove
1432     }
1433   }
1434 
1435   // update msgPassRunningMap.  This is not done inside loop above to:
1436   // (1) avoid any iterator invalidation, and
1437   // (2) avoid testing of newly inserted jobs (violates scheduling fairness)
1438   for (IntIntMIter it=removals.begin(); it!=removals.end(); ++it) {
1439     int remove_id = it->first, replace_id = it->second;
1440     if (replace_id) {
1441       run_iter = msgPassRunningMap.find(remove_id);
1442       msgPassRunningMap[replace_id] = run_iter->second; // does not invalidate
1443       msgPassRunningMap.erase(run_iter); // run_iter still valid
1444     }
1445     else
1446       msgPassRunningMap.erase(remove_id);
1447   }
1448 
1449   return receives;
1450 }
1451 
1452 
1453 size_t ApplicationInterface::
1454 test_local_backfill(PRPQueue& assign_queue, PRPQueueIter& assign_iter)
1455 {
1456   if (outputLevel == DEBUG_OUTPUT) // explicit debug user specification
1457     Cout << "Testing local completions\n";
1458 
1459   bool static_limited
1460     = (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1);
1461   size_t static_servers, server_index;
1462   if (static_limited)
1463     static_servers = asynchLocalEvalConcurrency * numEvalServers;
1464 
1465   // "Step 2" (of asynch_local_evaluations_nowait()): process any completed
1466   // jobs using test_local_evaluations()
1467   completionSet.clear();
1468   test_local_evaluations(asynchLocalActivePRPQueue); // rebuilds completionSet
1469   size_t completed = completionSet.size();
1470   for (ISCIter id_iter = completionSet.begin();
1471        id_iter != completionSet.end(); ++id_iter)
1472     process_asynch_local(*id_iter);
1473 
1474   // "Step 3" (of asynch_local_evaluations_nowait()): backfill completed
1475   // jobs with the next pending jobs from assign_queue (if present)
1476   if (completed) {
1477     int fn_eval_id; bool launch;
1478     size_t num_active = asynchLocalActivePRPQueue.size();
1479     if (static_limited) assign_iter = assign_queue.begin(); // reset to start
1480     for (; assign_iter != assign_queue.end(); ++assign_iter) {
1481       fn_eval_id = assign_iter->eval_id();
1482       // test of msgPassRunningMap needed for static_limited or for
1483       // peer_dynamic_schedule_evaluations_nowait()
1484       if ( lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id) ==
1485 	   asynchLocalActivePRPQueue.end() &&
1486 	   msgPassRunningMap.find(fn_eval_id) == msgPassRunningMap.end() &&
1487 	   rawResponseMap.find(fn_eval_id)    == rawResponseMap.end() ) {
1488 	launch = true;
1489 	if (static_limited) { // only schedule if local "server" not busy
1490 	  server_index = (fn_eval_id - 1) % static_servers;
1491 	  if (localServerAssigned[server_index]) launch = false;
1492 	  else            localServerAssigned.set(server_index);
1493 	}
1494 	if (launch) {
1495 	  launch_asynch_local(assign_iter); ++num_active;
1496 	  if (asynchLocalEvalConcurrency && // if throttled
1497 	      num_active >= asynchLocalEvalConcurrency)
1498 	    { ++assign_iter; break; } // else assign_iter incremented by loop
1499 	}
1500       }
1501     }
1502   }
1503 
1504   return completed;
1505 }
1506 
1507 
1508 /** This code is called from synchronize_nowait() to provide the master
1509     portion of a nonblocking master-slave algorithm for the dynamic
1510     scheduling of evaluations among slave servers.  It performs no
1511     evaluations locally and matches either serve_evaluations_synch()
1512     or serve_evaluations_asynch() on the slave servers, depending on
1513     the value of asynchLocalEvalConcurrency.  Dynamic scheduling
1514     assigns jobs in 2 passes.  The 1st pass gives each server the same
1515     number of jobs (equal to asynchLocalEvalConcurrency).  The 2nd
1516     pass assigns the remaining jobs to slave servers as previous jobs
1517     are completed.  Single- and multilevel parallel use intra- and
1518     inter-communicators, respectively, for send/receive.  Specific
1519     syntax is encapsulated within ParallelLibrary. */
1520 void ApplicationInterface::master_dynamic_schedule_evaluations_nowait()
1521 {
1522   // beforeSynchCorePRPQueue includes running evaluations plus new requests;
1523   // previous completions have been removed by synchronize_nowait().  Thus,
1524   // queue size could be larger or smaller than on previous nowait invocation.
1525   size_t i, num_jobs = beforeSynchCorePRPQueue.size(), buff_index, server_index,
1526     server_job_index, num_running  = msgPassRunningMap.size(),
1527     num_backfill = num_jobs - num_running, capacity = numEvalServers;
1528   if (asynchLocalEvalConcurrency > 1) capacity *= asynchLocalEvalConcurrency;
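  // First-pass capacity sketch (assumed numEvalServers = 3,
  // asynchLocalEvalConcurrency = 2): capacity = 6, so up to 6 jobs are sent
  // out before the scheduler switches to testing and backfilling completions.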
1529   int fn_eval_id, server_id;
1530 
1531   // allocate capacity entries since this avoids need for dynamic resizing
1532   if (!sendBuffers) {
1533     sendBuffers  = new MPIPackBuffer   [capacity];
1534     recvBuffers  = new MPIUnpackBuffer [capacity];
1535     recvRequests = new MPI_Request     [capacity];
1536   }
1537 
1538   // Step 1: launch any new jobs up to capacity limit
1539   PRPQueueIter assign_iter = beforeSynchCorePRPQueue.begin();
1540   if (!num_running) { // simplest case
1541     num_running = std::min(capacity, num_jobs);
1542     Cout << "Master dynamic schedule: first pass assigning " << num_running
1543 	 << " jobs among " << numEvalServers << " servers\n";
1544     // send data & post receives for 1st set of jobs
1545     for (i=0; i<num_running; ++i, ++assign_iter) {
1546       server_index = i%numEvalServers;
1547       server_id    = server_index + 1; // from 1 to numEvalServers
1548       // stratify buff_index in a manner that's convenient for backfill lookups
1549       server_job_index = i/numEvalServers; // job index local to each server
1550       buff_index = server_index * asynchLocalEvalConcurrency + server_job_index;
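      // Stratification sketch (assumed numEvalServers = 3,
      // asynchLocalEvalConcurrency = 2): jobs i = 0..5 yield (server_id,
      // buff_index) = (1,0), (2,2), (3,4), (1,1), (2,3), (3,5), so each
      // server's requests occupy a contiguous block of buffer slots.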
1551       // assign job
1552       send_evaluation(assign_iter, buff_index, server_id, false); // !peer
1553       // update bookkeeping
1554       fn_eval_id = assign_iter->eval_id();
1555       msgPassRunningMap[fn_eval_id] = IntSizetPair(server_id, buff_index);
1556     }
1557   }
1558   else if (num_backfill && num_running < capacity) { // fill in any gaps
1559     Cout << "Master dynamic schedule: first pass backfilling jobs up to "
1560 	 << "available capacity\n";
1561     UShortSetArray server_jobs(numEvalServers);
1562     for (std::map<int, IntSizetPair>::iterator r_it = msgPassRunningMap.begin();
1563 	 r_it != msgPassRunningMap.end(); ++r_it) {
1564       server_index = r_it->second.first - 1; buff_index = r_it->second.second;
1565       server_jobs[server_index].insert(buff_index);
1566     }
1567     for (; assign_iter != beforeSynchCorePRPQueue.end() &&
1568 	   num_running < capacity; ++assign_iter) {
1569       fn_eval_id = assign_iter->eval_id();
1570       if (msgPassRunningMap.find(fn_eval_id) == msgPassRunningMap.end()) {
1571 	// find server to use and define index within buffers/requests
1572 	// Approach 1: grab first available slot
1573 	// for (buff_index=0, server_index=0; server_index<numEvalServers;
1574 	//      ++server_index) {
1575 	//   buff_index += server_jobs[server_index];
1576 	//   if (server_jobs[server_index] < asynchEvalConcurrency)
1577 	//     { ++server_jobs[server_index]; break; }
1578 	// }
1579 	// Approach 2: load balance by finding min within server_jobs
1580 	unsigned short load, min_load = server_jobs[0].size();
1581 	size_t min_index = 0;
1582 	for (server_index=1; server_index<numEvalServers; ++server_index) {
1583 	  if (min_load == 0) break;
1584 	  load = server_jobs[server_index].size();
1585 	  if (load < min_load)
1586 	    { min_index = server_index; min_load = load; }
1587 	}
1588 	server_id = min_index + 1; // 1 to numEvalServers
1589 	// find an available buff_index for this server_id.  Logic uses first
1590 	// available within this server's allocation of buffer indices.
1591 	UShortSet& server_jobs_mi = server_jobs[min_index]; bool avail = false;
1592 	size_t max_buff_index = server_id*asynchLocalEvalConcurrency;
1593 	for (buff_index=min_index*asynchLocalEvalConcurrency;
1594 	     buff_index<max_buff_index; ++buff_index)
1595 	  if (server_jobs_mi.find(buff_index) == server_jobs_mi.end())
1596 	    { avail = true; break; }
1597 	if (!avail) {
1598 	  Cerr << "Error: no available buffer index for backfill in Application"
1599 	       << "Interface::master_dynamic_schedule_evaluations_nowait()."
1600 	       << std::endl;
1601 	  abort_handler(-1);
1602 	}
1603 	// assign job
1604 	send_evaluation(assign_iter, buff_index, server_id, false); // !peer
1605 	// update bookkeeping
1606 	msgPassRunningMap[fn_eval_id] = IntSizetPair(server_id, buff_index);
1607 	server_jobs[min_index].insert(buff_index); ++num_running;
1608       }
1609     }
1610   }
1611 
1612   // Step 2: check status of running jobs and backfill any completions
1613   if (headerFlag) {
1614     Cout << "Master dynamic schedule: second pass testing for completions ("
1615 	 << num_running << " running)";
1616     if (num_running == num_jobs) Cout << '\n';
1617     else Cout << " and backfilling (" << num_jobs-num_running <<" remaining)\n";
1618   }
1619   test_receives_backfill(assign_iter, false); // !peer
1620 
1621   if (msgPassRunningMap.empty()) {
1622     // deallocate MPI & buffer arrays
1623     delete [] sendBuffers;   sendBuffers = NULL;
1624     delete [] recvBuffers;   recvBuffers = NULL;
1625     delete [] recvRequests; recvRequests = NULL;
1626   }
1627 }
1628 
1629 
1630 /** This code runs on the iteratorCommRank 0 processor (the iterator)
1631     and is called from synchronize_nowait() in order to manage a
1632     nonblocking static schedule.  It matches serve_evaluations_synch()
1633     for other evaluation partitions (asynchLocalEvalConcurrency == 1).
1634     It performs blocking local function evaluations, one at a time,
1635     for its portion of the static schedule and checks for remote
1636     completions in between each local completion.  Therefore, unlike
1637     peer_dynamic_schedule_evaluations_nowait(), this scheduler will
1638     always return at least one job.  Single-level and multilevel
1639     parallel use intra- and inter-communicators, respectively, for
1640     send/receive, with specific syntax as encapsulated within
1641     ParallelLibrary.  The iteratorCommRank 0 processor assigns the
1642     static schedule since it is the only processor with access to
1643     beforeSynchCorePRPQueue (it runs the iterator and calls
1644     synchronize).  The alternate design of each peer selecting its own
1645     jobs using the modulus operator would be applicable if execution
1646     of this function (and therefore the job list) were distributed. */
1647 void ApplicationInterface::peer_static_schedule_evaluations_nowait()
1648 {
1649   // beforeSynchCorePRPQueue includes running evaluations plus new requests;
1650   // previous completions have been removed by synchronize_nowait().  Thus,
1651   // queue size could be larger or smaller than on previous nowait invocation.
1652   // Rounding down num_local_jobs offloads this processor (which has additional
1653   // work relative to other peers), but results in a few more passed messages.
1654   int fn_eval_id, server_id;
1655   size_t i, num_jobs = beforeSynchCorePRPQueue.size(), buff_index, server_index,
1656     server_job_index, num_remote_running = msgPassRunningMap.size(),
1657     num_local_running = asynchLocalActivePRPQueue.size(),
1658     num_running  = num_remote_running + num_local_running,
1659     num_backfill = num_jobs - num_running, local_capacity = 1,
1660     capacity     = numEvalServers;
1661   if (asynchLocalEvalConcurrency > 1) {
1662     local_capacity = asynchLocalEvalConcurrency;
1663     capacity      *= asynchLocalEvalConcurrency;
1664   }
1665   size_t remote_capacity = capacity - local_capacity;
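  // Capacity sketch (assumed numEvalServers = 3, asynchLocalEvalConcurrency
  // = 4): local_capacity = 4, capacity = 12, remote_capacity = 8; with the
  // default concurrency of 1 this reduces to local_capacity = 1 and
  // remote_capacity = numEvalServers - 1.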
1666 
1667   // peer static nowait supports blocking local evals, taken one at a time,
1668   // if necessary to support direct interfaces or multiproc evals.
1669   bool synch_local
1670     = ( multiProcEvalFlag || ( interfaceType & DIRECT_INTERFACE_BIT ) );
1671   //bool static_limited
1672   //  = ( asynchLocalEvalStatic || evalScheduling == PEER_STATIC_SCHEDULING );
1673 
1674   // allocate remote_capacity entries as this avoids need for dynamic resizing
1675   if (!sendBuffers) {
1676     sendBuffers  = new MPIPackBuffer   [remote_capacity];
1677     recvBuffers  = new MPIUnpackBuffer [remote_capacity];
1678     recvRequests = new MPI_Request     [remote_capacity];
1679   }
1680 
1681   PRPQueueIter assign_iter = beforeSynchCorePRPQueue.begin(), local_prp_iter;
1682   if (!num_running) { // simplest case
1683 
1684     size_t num_local_jobs = (size_t)std::floor((Real)num_jobs/numEvalServers),
1685       num_remote_jobs  = num_jobs - num_local_jobs;
1686     num_local_running  = std::min(local_capacity,  num_local_jobs);
1687     num_remote_running = std::min(remote_capacity, num_remote_jobs);
1688     num_running = num_local_running + num_remote_running;
1689 
1690     // this reference used to preserve static server assignment
1691     //if (static_limited)
1692     nowaitEvalIdRef = assign_iter->eval_id();
1693 
1694     Cout << "Peer static schedule: first pass assigning " << num_remote_running
1695 	 << " jobs among " << numEvalServers-1 << " remote peers\n";
1696     PRPQueue local_prp_queue; buff_index = 0;
1697     for (i=1; i<=num_running; ++i, ++assign_iter) {//shift by 1 to reduce pr1 wk
1698       server_id = i%numEvalServers; // 0 to numEvalServers-1
1699       if (server_id) { // 1 to numEvalServers-1
1700 	send_evaluation(assign_iter, buff_index, server_id, true); // peer
1701 	// update bookkeeping
1702 	fn_eval_id = assign_iter->eval_id();
1703 	msgPassRunningMap[fn_eval_id] = IntSizetPair(server_id, buff_index);
1704 	++buff_index;
1705       }
1706       else
1707 	local_prp_queue.insert(*assign_iter);
1708     }
1709 
1710     // Perform computation for first num_local_jobs jobs on peer 1.  Default
1711     // behavior is synchronous evaluation of jobs on each peer.  Only if
1712     // asynchLocalEvalConcurrency > 1 do we get the hybrid parallelism of
1713     // asynch jobs on each peer.
1714     Cout << "Peer static schedule: first pass launching " << num_local_running
1715 	 << " local jobs\n";
1716     if (synch_local) // synch launch and complete
1717       synchronous_local_evaluations(local_prp_queue);
1718     else // asynch launch only w/o backfill logic
1719       assign_asynch_local_queue(local_prp_queue, local_prp_iter);
1720   }
1721   else if (num_backfill && num_running < capacity) { // fill in any gaps
1722     Cout << "Peer static schedule: first pass backfilling jobs up to "
1723 	 << "available capacity\n";
1724 
1725     // server_id is 1:numEvalServ-1, server_jobs is indexed 0:numEvalServ-2
1726     UShortSetArray server_jobs(numEvalServers-1); PRPQueue local_prp_queue;
1727     for (std::map<int, IntSizetPair>::iterator r_it = msgPassRunningMap.begin();
1728 	 r_it != msgPassRunningMap.end(); ++r_it) {
1729       server_index = r_it->second.first - 1; buff_index = r_it->second.second;
1730       server_jobs[server_index].insert(buff_index);
1731     }
1732     bool running_mp, running_al, backfill_local = false;
1733     for (; assign_iter != beforeSynchCorePRPQueue.end() &&
1734 	   ( num_local_running  < local_capacity ||
1735 	     num_remote_running < remote_capacity ); ++assign_iter) {
1736       fn_eval_id = assign_iter->eval_id();
1737       // look here first
1738       running_mp = (msgPassRunningMap.find(fn_eval_id) !=
1739 		    msgPassRunningMap.end());
1740       // look here second if not already found
1741       running_al = (running_mp) ? false :
1742 	(lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id) !=
1743 	 asynchLocalActivePRPQueue.end());
1744       if (!running_mp && !running_al) { // can launch as new job
1745 
1746 	// to enable consistent modulo arithmetic on server assignment, use
1747 	// the first eval id from the last complete job set (!num_running) as
1748 	// a reference; +1 is consistent with shift by 1 to reduce peer1 work
1749 	//if (static_limited)
1750 	server_id = (fn_eval_id-nowaitEvalIdRef+1)%numEvalServers;//0 to numES-1
1751         //else could assign based on min load as in peer dynamic nowait case
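        // Reference-based assignment sketch (assumed nowaitEvalIdRef = 17,
        // numEvalServers = 4): eval ids 17,18,19 -> server_id 1,2,3 (remote)
        // and id 20 -> server_id 0 (local), i.e. backfilled jobs continue the
        // modulo pattern established by the first pass rather than restarting
        // at server 1.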
1752 
1753 	// assign job
1754 	if (server_id == 0 && num_local_running < local_capacity) {
1755 	  local_prp_queue.insert(*assign_iter);
1756 	  ++num_local_running; backfill_local = true;
1757 	}
1758 	else if (server_id && server_jobs[server_id-1].size() < local_capacity){
1759 	  // find an available buff_index for this server_id.  Logic uses first
1760 	  // available within this server's allocation of buffer indices.
1761 	  size_t server_index = server_id - 1,
1762 	    min_buff = server_index * asynchLocalEvalConcurrency,
1763 	    max_buff =     min_buff + asynchLocalEvalConcurrency;
1764 	  UShortSet& server_jobs_mi = server_jobs[server_index];
1765 	  bool avail = false;
1766 	  for (buff_index=min_buff; buff_index<max_buff; ++buff_index)
1767 	    if (server_jobs_mi.find(buff_index) == server_jobs_mi.end())
1768 	      { avail = true; break; }
1769 	  if (!avail) {
1770 	    Cerr << "Error: no available buffer index for backfill in "
1771 		 << "ApplicationInterface::peer_static_schedule_evaluations_"
1772 		 << "nowait()." << std::endl;
1773 	    abort_handler(-1);
1774 	  }
1775 	  // assign job
1776 	  send_evaluation(assign_iter, buff_index, server_id, true);
1777 	  // update bookkeeping
1778 	  msgPassRunningMap[fn_eval_id] = IntSizetPair(server_id, buff_index);
1779 	  server_jobs[server_index].insert(buff_index);
1780 	  ++num_remote_running;
1781 	}
1782       }
1783       else if (running_al) // include in local queue for asynch processing
1784 	local_prp_queue.insert(*assign_iter);
1785     }
1786 
1787     if (backfill_local) {
1788       if (synch_local) synchronous_local_evaluations(local_prp_queue);
1789       else assign_asynch_local_queue_nowait(local_prp_queue, local_prp_iter);
1790     }
1791 
1792     num_running = num_local_running + num_remote_running; // update
1793   }
1794 
1795   // Step 2: check status of running jobs and backfill any completions
1796   if (headerFlag) {
1797     Cout << "Peer static schedule: second pass testing for completions ("
1798 	 << num_running << " running)";
1799     if (num_running == num_jobs) Cout << '\n';
1800     else Cout << " and backfilling (" << num_jobs-num_running <<" remaining)\n";
1801   }
1802   if (num_remote_running)
1803     test_receives_backfill(assign_iter, true); // peer
1804   if (!synch_local && num_local_running)
1805     test_local_backfill(beforeSynchCorePRPQueue, assign_iter);
1806 
1807   if (msgPassRunningMap.empty()) {
1808     // deallocate MPI & buffer arrays
1809     delete [] sendBuffers;   sendBuffers = NULL;
1810     delete [] recvBuffers;   recvBuffers = NULL;
1811     delete [] recvRequests; recvRequests = NULL;
1812   }
1813 }
1814 
1815 
1816 /** This code runs on the iteratorCommRank 0 processor (the iterator) and
1817     is called from synchronize_nowait() in order to manage a nonblocking
1818     dynamic schedule.  It matches serve_evaluations_{synch,asynch}() for
1819     other evaluation partitions (depending on asynchLocalEvalConcurrency).
1820     It performs nonblocking local function evaluations for its portion of
1821     the dynamic schedule using assign_asynch_local_queue() and local backfill.
1822     Single-level and multilevel parallel use intra- and inter-communicators,
1823     respectively, for send/receive, with specific syntax as encapsulated
1824     within ParallelLibrary.  The iteratorCommRank 0 processor assigns the
1825     dynamic schedule since it is the only processor with access to
1826     beforeSynchCorePRPQueue (it runs the iterator and calls synchronize).
1827     The alternate design of each peer selecting its own jobs using the
1828     modulus operator would be applicable if execution of this function
1829     (and therefore the job list) were distributed. */
1830 void ApplicationInterface::peer_dynamic_schedule_evaluations_nowait()
1831 {
1832   // beforeSynchCorePRPQueue includes running evaluations plus new requests;
1833   // previous completions have been removed by synchronize_nowait().  Thus,
1834   // queue size could be larger or smaller than on previous nowait invocation.
1835   // Rounding down num_local_jobs offloads this processor (which has additional
1836   // work relative to other peers), but results in a few more passed messages.
1837   int fn_eval_id, server_id;
1838   size_t i, num_jobs = beforeSynchCorePRPQueue.size(), buff_index, server_index,
1839     server_job_index, num_remote_running = msgPassRunningMap.size(),
1840     num_local_running = asynchLocalActivePRPQueue.size(),
1841     num_running  = num_remote_running + num_local_running,
1842     num_backfill = num_jobs - num_running, local_capacity = 1,
1843     capacity     = numEvalServers;
1844   if (asynchLocalEvalConcurrency > 1) {
1845     local_capacity = asynchLocalEvalConcurrency;
1846     capacity      *= asynchLocalEvalConcurrency;
1847   }
1848   size_t remote_capacity = capacity - local_capacity;
1849 
1850   // allocate remote_capacity entries as this avoids need for dynamic resizing
1851   if (!sendBuffers) {
1852     sendBuffers  = new MPIPackBuffer   [remote_capacity];
1853     recvBuffers  = new MPIUnpackBuffer [remote_capacity];
1854     recvRequests = new MPI_Request     [remote_capacity];
1855   }
1856 
1857   PRPQueueIter assign_iter = beforeSynchCorePRPQueue.begin(), local_prp_iter;
1858   if (!num_running) { // simplest case
1859     size_t num_local_jobs = (size_t)std::floor((Real)num_jobs/numEvalServers),
1860       num_remote_jobs  = num_jobs - num_local_jobs;
1861     num_local_running  = std::min(local_capacity,  num_local_jobs);
1862     num_remote_running = std::min(remote_capacity, num_remote_jobs);
1863 
1864     // don't leave a static gap since we're going to dynamically assign...
1865     Cout << "Peer dynamic schedule: first pass assigning " << num_remote_running
1866 	 << " jobs among " << numEvalServers-1 << " remote peers\n";
1867     std::advance(assign_iter, num_local_running);//num_local_jobs);
1868     PRPQueueIter assign_iter_save = assign_iter;
1869     for (i=0; i<num_remote_running; ++i, ++assign_iter) {
1870       server_index = i%(numEvalServers-1);
1871       server_id    = server_index + 1; // 1 to numEvalServers-1
1872       // stratify buff_index in a manner that's convenient for backfill lookups
1873       server_job_index = i/(numEvalServers-1); // job index local to each server
1874       buff_index = server_index * asynchLocalEvalConcurrency + server_job_index;
1875       // assign job to remote peer
1876       send_evaluation(assign_iter, buff_index, server_id, true); // peer
1877       // update bookkeeping
1878       fn_eval_id = assign_iter->eval_id();
1879       msgPassRunningMap[fn_eval_id] = IntSizetPair(server_id, buff_index);
1880     }
1881 
1882     // Perform computation for first num_local_jobs jobs on peer 1.  Default
1883     // behavior is synchronous evaluation of jobs on each peer.  Only if
1884     // asynchLocalEvalConcurrency > 1 do we get the hybrid parallelism of
1885     // asynch jobs on each peer.
1886     PRPQueue local_prp_queue(beforeSynchCorePRPQueue.begin(), assign_iter_save);
1887     Cout << "Peer dynamic schedule: first pass launching " << num_local_running
1888 	 << " local jobs\n";
1889     assign_asynch_local_queue(local_prp_queue, local_prp_iter);
1890 
1891     num_running = num_local_running + num_remote_running; // update
1892   }
1893   else if (num_backfill && num_running < capacity) { // fill in any gaps
1894     Cout << "Peer dynamic schedule: first pass backfilling jobs up to "
1895 	 << "available capacity\n";
1896     // server_id is 1:numEvalServ-1, server_jobs is indexed 0:numEvalServ-2
1897     UShortSetArray server_jobs(numEvalServers-1); PRPQueue local_prp_queue;
1898     for (std::map<int, IntSizetPair>::iterator r_it = msgPassRunningMap.begin();
1899 	 r_it != msgPassRunningMap.end(); ++r_it) {
1900       server_index = r_it->second.first - 1; buff_index = r_it->second.second;
1901       server_jobs[server_index].insert(buff_index);
1902     }
1903     bool running_mp, running_al, backfill_local = false;
1904     for (; assign_iter != beforeSynchCorePRPQueue.end() &&
1905 	   ( num_local_running  < local_capacity ||
1906 	     num_remote_running < remote_capacity ); ++assign_iter) {
1907       fn_eval_id = assign_iter->eval_id();
1908       // look here first
1909       running_mp = (msgPassRunningMap.find(fn_eval_id) !=
1910 		    msgPassRunningMap.end());
1911       // look here second if not already found
1912       running_al = (running_mp) ? false :
1913 	(lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id) !=
1914 	 asynchLocalActivePRPQueue.end());
1915       if (!running_mp && !running_al) { // can launch as new job
1916 	// determine min among local and remote loadings; tie goes to remote
1917 	// server_id is 1:numEvalServ-1, server_index is 0:numEvalServ-2
1918 	unsigned short load, min_load = server_jobs[0].size();
1919 	size_t min_server_id = 1;
1920 	for (server_index=1; server_index<numEvalServers-1; ++server_index) {
1921 	  if (min_load == 0) break;
1922 	  load = server_jobs[server_index].size();
1923 	  if (load < min_load)
1924 	    { min_server_id = server_index + 1; min_load = load; }
1925 	}
1926 	if (num_local_running < min_load) // tie goes to remote
1927 	  { min_server_id = 0; min_load = num_local_running; }
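        // Load-balance sketch (assumed numEvalServers = 4, remote loads
        // {2,1,3}, num_local_running = 1): min_load = 1 at the second remote
        // server; 1 < 1 is false, so the tie goes to that remote server
        // rather than to the local queue, as noted above.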
1928 
1929 	// assign job
1930 	if (min_server_id == 0 && num_local_running < local_capacity) {
1931 	  local_prp_queue.insert(*assign_iter);
1932 	  ++num_local_running; backfill_local = true;
1933 	}
1934 	else if (min_server_id && num_remote_running < remote_capacity) {
1935 	  // find an available buff_index for this server_id.  Logic uses first
1936 	  // available within this server's allocation of buffer indices.
1937 	  size_t min_server_index = min_server_id - 1,
1938 	    max_buff_index = min_server_id*asynchLocalEvalConcurrency;
1939 	  UShortSet& server_jobs_mi = server_jobs[min_server_index];
1940 	  bool avail = false;
1941 	  for (buff_index=max_buff_index-asynchLocalEvalConcurrency;
1942 	       buff_index<max_buff_index; ++buff_index)
1943 	    if (server_jobs_mi.find(buff_index) == server_jobs_mi.end())
1944 	      { avail = true; break; }
1945 	  if (!avail) {
1946 	    Cerr << "Error: no available buffer index for backfill in "
1947 		 << "ApplicationInterface::peer_dynamic_schedule_evaluations_"
1948 		 << "nowait()."<< std::endl;
1949 	    abort_handler(-1);
1950 	  }
1951 	  // assign job
1952 	  send_evaluation(assign_iter, buff_index, min_server_id, true); // peer
1953 	  // update bookkeeping
1954 	  msgPassRunningMap[fn_eval_id]
1955 	    = IntSizetPair(min_server_id, buff_index);
1956 	  server_jobs[min_server_index].insert(buff_index);
1957 	  ++num_remote_running;
1958 	}
1959       }
1960       else if (running_al) // include in local queue for asynch processing
1961 	local_prp_queue.insert(*assign_iter);
1962     }
1963 
1964     if (backfill_local)
1965       assign_asynch_local_queue_nowait(local_prp_queue, local_prp_iter);
1966 
1967     num_running = num_local_running + num_remote_running; // update
1968   }
1969 
1970   // Step 2: check status of running jobs and backfill any completions
1971   if (headerFlag) {
1972     Cout << "Peer dynamic schedule: second pass testing for completions ("
1973 	 << num_running << " running)";
1974     if (num_running == num_jobs) Cout << '\n';
1975     else Cout << " and backfilling (" << num_jobs-num_running <<" remaining)\n";
1976   }
1977   if (num_remote_running)
1978     test_receives_backfill(assign_iter, true); // peer
1979   if (num_local_running)
1980     test_local_backfill(beforeSynchCorePRPQueue, assign_iter);
1981 
1982   if (msgPassRunningMap.empty()) {
1983     // deallocate MPI & buffer arrays
1984     delete [] sendBuffers;   sendBuffers = NULL;
1985     delete [] recvBuffers;   recvBuffers = NULL;
1986     delete [] recvRequests; recvRequests = NULL;
1987   }
1988 }
1989 
1990 
1991 /** This function provides nonblocking synchronization for the local
1992     asynch case (background system call, nonblocking fork, or
1993     threads).  It is called from synchronize_nowait() and passed the
1994     complete set of all asynchronous jobs (beforeSynchCorePRPQueue).
1995     It uses derived_map_asynch() to initiate asynchronous evaluations
1996     and test_local_evaluations() to capture completed jobs in
1997     nonblocking mode.  It mirrors a nonblocking message passing
1998     scheduler as much as possible (test_local_evaluations() modeled
1999     after MPI_Testsome()).  The result of this function is
2000     rawResponseMap, which uses eval_id as a key.  It is assumed that
2001     the incoming local_prp_queue contains only active and new jobs -
2002     i.e., all completed jobs are cleared by synchronize_nowait().
2003 
2004     Also supports asynchronous local evaluations with static
2005     scheduling.  This scheduling policy specifically ensures that a
2006     completed asynchronous evaluation eval_id is replaced with an
2007     equivalent one, modulo asynchLocalEvalConcurrency.  In the nowait
2008     case, this could render some servers idle if evaluations don't
2009     come in eval_id order or some evaluations are cancelled by the
2010     caller in between calls. If this function is called with unlimited
2011     local eval concurrency, the static scheduling request is ignored. */
2012 void ApplicationInterface::
2013 asynchronous_local_evaluations_nowait(PRPQueue& local_prp_queue)
2014 {
2015   size_t num_jobs = local_prp_queue.size(),
2016     num_target = (asynchLocalEvalConcurrency) ?
2017       std::min((size_t)asynchLocalEvalConcurrency, num_jobs) : num_jobs,
2018     num_active = asynchLocalActivePRPQueue.size(),
2019     num_launch = num_target - num_active;
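  // Launch-count sketch (assumed asynchLocalEvalConcurrency = 4, num_jobs =
  // 10, num_active = 3): num_target = 4, so only num_launch = 1 new job is
  // started here; the rest are picked up by the backfill step below as
  // active evaluations complete.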
2020   bool static_limited
2021     = (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1);
2022 
2023   // Step 1: update asynchLocalActivePRPQueue with jobs from local_prp_queue
2024   PRPQueueIter local_prp_iter = local_prp_queue.begin();
2025   if (num_launch) {
2026     Cout << "First pass: initiating ";
2027     if (static_limited) Cout << "at most ";
2028     Cout << num_launch << " local asynchronous jobs\n";
2029     assign_asynch_local_queue_nowait(local_prp_queue, local_prp_iter);
2030   }
2031 
2032   // Step 2: process any completed jobs and backfill if necessary
2033   num_active = asynchLocalActivePRPQueue.size(); // update
2034   if (headerFlag) {
2035     Cout << "Second pass: testing for completions (" << num_active<<" running)";
2036     if (num_active == num_jobs) Cout << '\n';
2037     else Cout << " and backfilling (" << num_jobs-num_active << " remaining)\n";
2038   }
2039   test_local_backfill(local_prp_queue, local_prp_iter);
2040 }
2041 
2042 
2043 void ApplicationInterface::
2044 assign_asynch_local_queue_nowait(PRPQueue& local_prp_queue,
2045 				 PRPQueueIter& local_prp_iter)
2046 {
2047   // As compared to assign_asynch_local_queue(), this fn may be used to augment
2048   // an existing set of active jobs (as is appropriate within a nowait context).
2049 
2050   // special data for static scheduling case: asynch local concurrency is
2051   // limited and we need to stratify the job scheduling according to eval id.
2052   bool static_limited
2053     = (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1);
2054   size_t static_servers;
2055   if (static_limited) {
2056     static_servers = asynchLocalEvalConcurrency * numEvalServers;
2057     if (localServerAssigned.size() != static_servers) {
2058       localServerAssigned.resize(static_servers);
2059       localServerAssigned.reset(); // nonblocking case: only reset on 1st call
2060     }
2061   }
2062 
2063   int fn_eval_id, num_jobs = local_prp_queue.size();
2064   size_t server_index, num_active = asynchLocalActivePRPQueue.size();
2065   bool launch;
2066   if (multiProcEvalFlag) // TO DO: deactivate this bcast
2067     parallelLib.bcast_e(num_jobs);
2068 
2069   // Step 1: launch any new jobs up to asynch concurrency limit (if specified)
2070   for (local_prp_iter  = local_prp_queue.begin();
2071        local_prp_iter != local_prp_queue.end(); ++local_prp_iter) {
2072     if (asynchLocalEvalConcurrency && // not unlimited
2073 	num_active >= asynchLocalEvalConcurrency)
2074       break;
2075     fn_eval_id = local_prp_iter->eval_id();
2076     if (lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id) ==
2077 	asynchLocalActivePRPQueue.end()) {
2078       launch = false;
2079       if (static_limited) { // only schedule if local "server" not busy
2080 	server_index = (fn_eval_id - 1) % static_servers;
2081 	if (!localServerAssigned[server_index])
2082 	  { launch = true; localServerAssigned.set(server_index); }
2083       }
2084       else launch = true;
2085       if (launch)
2086 	{ launch_asynch_local(local_prp_iter); ++num_active; }
2087     }
2088   }
2089 }
2090 
2091 
2092 /** This function provides blocking synchronization for the local
2093     synchronous case (foreground system call, blocking fork, or
2094     procedure call from derived_map()).  It is called from
2095     peer_static_schedule_evaluations() to perform a local portion of
2096     the total job set. */
2097 void ApplicationInterface::
2098 synchronous_local_evaluations(PRPQueue& local_prp_queue)
2099 {
2100   for (PRPQueueIter local_prp_iter = local_prp_queue.begin();
2101        local_prp_iter != local_prp_queue.end(); ++local_prp_iter) {
2102     currEvalId              = local_prp_iter->eval_id();
2103     const Variables& vars   = local_prp_iter->variables();
2104     const ActiveSet& set    = local_prp_iter->active_set();
2105     Response local_response = local_prp_iter->response(); // shared rep
2106 
2107     // bcast the job to other processors within peer 1 (if required)
2108     if (multiProcEvalFlag)
2109       broadcast_evaluation(*local_prp_iter);
2110 
2111     try { derived_map(vars, set, local_response, currEvalId); } // synch. local
2112 
2113     catch(const FunctionEvalFailure& fneval_except) {
2114       manage_failure(vars, set, local_response, currEvalId);
2115     }
2116 
2117     process_synch_local(local_prp_iter);
2118   }
2119 }
2120 
2121 
2122 void ApplicationInterface::
2123 broadcast_evaluation(int fn_eval_id, const Variables& vars,
2124 		     const ActiveSet& set)
2125 {
2126   // match bcast_e()'s in serve_evaluations_{synch,asynch,peer}
2127   parallelLib.bcast_e(fn_eval_id);
2128   MPIPackBuffer send_buffer(lenVarsActSetMessage);
2129   send_buffer << vars << set;
2130 
2131 #ifdef MPI_DEBUG
2132   Cout << "broadcast_evaluation() for eval " << fn_eval_id
2133        << " with send_buffer size = " << send_buffer.size()
2134        << " and ActiveSet:\n" << set << std::endl;
2135 #endif // MPI_DEBUG
2136 
2137   parallelLib.bcast_e(send_buffer);
2138 }
2139 
2140 
2141 /** Invoked by the serve() function in derived Model classes.  Passes
2142     control to serve_evaluations_synch(), serve_evaluations_asynch(),
2143     serve_evaluations_synch_peer(), or serve_evaluations_asynch_peer()
2144     according to specified concurrency, partition, and scheduler
2145     configuration. */
2146 void ApplicationInterface::serve_evaluations()
2147 {
2148   // This function is only called when message passing, therefore the simple
2149   // test below is sufficient.  Would like to use serve_evaluation_synch() in
2150   // the case where a request for asynch local concurrency is made but it is
2151   // not utilized (num_jobs <= numEvalServers in *_schedule_evaluations), but
2152   // this is not currently detectable by the slave since num_jobs is not known
2153   // a priori.  serve_evaluation_asynch() still works in this case, but it
2154   // exposes the run to race conditions from the asynch processing procedure.
2155   // Thus, it is up to the user to avoid specifying asynch local concurrency
2156   // when num_jobs is not strictly greater than numEvalServers.
2157 
2158   // test for server 1 within a static schedule: multiProcEvalFlag is implied
2159   // since evalCommRank 0 is running the iterator/job schedulers
2160   bool peer_server1 = (!ieDedMasterFlag && evalServerId == 1);
2161 
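  // Dispatch summary of the tests below: in a peer partition, evaluation
  // server 1 uses the *_peer() variants; a dedicated-master partition or any
  // other server uses the standard variants.  asynchLocalEvalConcurrency > 1
  // selects the asynch pair, otherwise the synch pair.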
2162   if (asynchLocalEvalConcurrency > 1) {
2163     if (peer_server1) serve_evaluations_asynch_peer();
2164     else              serve_evaluations_asynch();
2165   }
2166   else {
2167     if (peer_server1) serve_evaluations_synch_peer();
2168     else              serve_evaluations_synch();
2169   }
2170 }
2171 
2172 
2173 /** This code is invoked by serve_evaluations() to perform one synchronous
2174     job at a time on each slave/peer server.  The servers receive requests
2175     (blocking receive), do local synchronous maps, and return results.
2176     This is done continuously until a termination signal is received from
2177     the master (sent via stop_evaluation_servers()). */
2178 void ApplicationInterface::serve_evaluations_synch()
2179 {
2180   // update class member eval id for usage on iteratorCommRank!=0 processors
2181   // (Use case: special logic within derived direct interface plug-ins)
2182   currEvalId = 1;
2183   MPI_Status status; // holds source, tag, and number received in MPI_Recv
2184   MPI_Request request = MPI_REQUEST_NULL; // bypass MPI_Wait on first pass
2185   MPIPackBuffer send_buffer(lenResponseMessage); // prevent dealloc @loop end
2186   while (currEvalId) {
2187     MPIUnpackBuffer recv_buffer(lenVarsActSetMessage);
2188     // blocking receive of x & set
2189     if (evalCommRank == 0) { // 1-level or local comm. leader in 2-level
2190       parallelLib.recv_ie(recv_buffer, 0, MPI_ANY_TAG, status);
2191       currEvalId = status.MPI_TAG;
2192     }
2193     if (multiProcEvalFlag) { // multilevel must Bcast x/set over evalComm
2194       parallelLib.bcast_e(currEvalId);
2195       if (currEvalId)
2196         parallelLib.bcast_e(recv_buffer);
2197     }
2198 
2199     if (currEvalId) { // currEvalId = 0 is the termination signal
2200 
2201       // could slave's Model::currentVariables be used instead?
2202       // (would remove need to pass vars flags in MPI buffers)
2203       Variables vars; ActiveSet set;
2204       recv_buffer >> vars >> set;
2205 
2206 #ifdef MPI_DEBUG
2207       Cout << "Slave receives vars/set buffer which unpacks to:\n" << vars
2208            << "Active set vector = { ";
2209       array_write_annotated(Cout, set.request_vector(), false);
2210       Cout << "} Deriv values vector = { ";
2211       array_write_annotated(Cout, set.derivative_vector(), false);
2212       Cout << '}' << std::endl;
2213 #endif // MPI_DEBUG
2214 
2215       Response local_response(sharedRespData, set); // special constructor
2216 
2217       // slaves invoke derived_map to avoid repeating overhead of map fn.
2218       try { derived_map(vars, set, local_response, currEvalId); } // synch local
2219 
2220       catch(const FunctionEvalFailure& fneval_except) {
2221         //Cerr<< "Slave has caught exception from local derived_map; message: "
2222 	//<< fneval_except.what() << std::endl;
2223         manage_failure(vars, set, local_response, currEvalId);
2224       }
2225 
2226       // bypass MPI_Wait the 1st time through, since there has been no Isend
2227       if (request != MPI_REQUEST_NULL) // MPI_REQUEST_NULL = -1 IBM, 0 elsewhere
2228         parallelLib.wait(request, status); // Removes need for send_bufs array
2229         // by assuring that send_buffer is undisturbed prior to receipt by
2230         // master.  This design should work well for expensive fn. evals., but
2231         // the master might become overloaded for many inexpensive evals.
2232 
2233       // Return local_response data.  Isend allows execution of next job to
2234       // start (if master has posted several jobs for this slave -- e.g., a
2235       // static schedule) prior to master receiving the current results.  This
2236       // allows the response to be overwritten, but send_buffer overwrite is
2237       // prevented by MPI_Wait.
2238       if (evalCommRank == 0) { // 1-level or local comm. leader in 2-level
2239         send_buffer.reset();
2240         send_buffer << local_response;
2241         parallelLib.isend_ie(send_buffer, 0, currEvalId, request);
2242       }
2243     }
2244   }
2245 }
2246 
2247 
2248 /** This code is invoked by serve_evaluations() to perform a synchronous
2249     evaluation in coordination with the iteratorCommRank 0 processor
2250     (the iterator) for static schedules.  The bcast() matches either the
2251     bcast() in synchronous_local_evaluations(), which is invoked by
2252     peer_static_schedule_evaluations(), or the bcast() in map(). */
2253 void ApplicationInterface::serve_evaluations_synch_peer()
2254 {
2255   // update class member eval id for usage on iteratorCommRank!=0 processors
2256   // (Use case: special logic within derived direct interface plug-ins)
2257   currEvalId = 1;
2258   while (currEvalId) {
2259     parallelLib.bcast_e(currEvalId); // incoming from iterator
2260 
2261     if (currEvalId) { // currEvalId = 0 is the termination signal
2262 
2263       MPIUnpackBuffer recv_buffer(lenVarsActSetMessage);
2264       parallelLib.bcast_e(recv_buffer); // incoming from iterator
2265 
2266       Variables vars; ActiveSet set;
2267       recv_buffer >> vars >> set;
2268 
2269 #ifdef MPI_DEBUG
2270       Cout << "Peer receives vars/set buffer which unpacks to:\n" << vars
2271            << "Active set vector = { ";
2272       array_write_annotated(Cout, set.request_vector(), false);
2273       Cout << "} Deriv values vector = { ";
2274       array_write_annotated(Cout, set.derivative_vector(), false);
2275       Cout << '}' << std::endl;
2276 #endif // MPI_DEBUG
2277 
2278       Response local_response(sharedRespData, set); // special constructor
2279 
2280       // slaves invoke derived_map to avoid repeating overhead of map fn.
2281       try { derived_map(vars, set, local_response, currEvalId); } //synch local
2282 
2283       catch(const FunctionEvalFailure& fneval_except) {
2284         //Cerr<< "Slave has caught exception from local derived_map; message: "
2285 	//<< fneval_except.what() << std::endl;
2286         manage_failure(vars, set, local_response, currEvalId);
2287       }
2288     }
2289   }
2290 }
2291 
2292 
2293 /** This code is invoked by serve_evaluations() to perform multiple
2294     asynchronous jobs on each slave/peer server.  The servers test for
2295     any incoming jobs, launch any new jobs, process any completed jobs,
2296     and return any results.  Each of these components is nonblocking,
2297     although the server loop continues until a termination signal is
2298     received from the master (sent via stop_evaluation_servers()).  In
2299     the master-slave case, the master maintains the correct number of
2300     jobs on each slave.  In the static scheduling case, each server is
2301     responsible for limiting concurrency (since the entire static
2302     schedule is sent to the peers at start up). */
2303 void ApplicationInterface::serve_evaluations_asynch()
2304 {
2305   // ---------------------------------------------------------------------------
2306   // multiprocessor system calls, forks, and threads can't share a communicator,
2307   // only synchronous direct interfaces can.  Therefore, this asynch job
2308   // scheduler would not normally need to support a multiprocessor evalComm
2309   // (ApplicationInterface::init_communicators_checks() normally prevents this
2310   // in its check for multiproc evalComm and asynchLocalEvalConcurrency > 1).
2311   // However, direct interface plug-ins can use derived_{synchronize,
2312   // synchronize_nowait} to provide a poor man's batch processing capability,
2313   // so this function has been extended to allow for multiProcEvalFlag to
2314   // accommodate this case.
2315   // ---------------------------------------------------------------------------
2316 
2317   // ----------------------------------------------------------
2318   // Step 1: block on first message before entering while loops
2319   // ----------------------------------------------------------
2320   MPIUnpackBuffer recv_buffer(lenVarsActSetMessage);
2321   MPI_Status status; // holds MPI_SOURCE, MPI_TAG, & MPI_ERROR
2322   int fn_eval_id = 1, num_active = 0;
2323   MPI_Request recv_request = MPI_REQUEST_NULL; // bypass MPI_Test on first pass
2324   if (evalCommRank == 0) // 1-level or local comm. leader in 2-level
2325     parallelLib.recv_ie(recv_buffer, 0, MPI_ANY_TAG, status);
2326 
2327   do { // main loop
2328 
2329     // -------------------------------------------------------------------
2330     // Step 2: check for additional incoming messages & unpack/execute all
2331     //         jobs received
2332     // -------------------------------------------------------------------
2333     int mpi_test_flag = 1;
2334     // num_active < asynchLocalEvalConcurrency check below manages concurrency
2335     // in the static scheduling case (master_dynamic_schedule_evaluations()
2336     // handles this from the master side)
2337     while (mpi_test_flag && fn_eval_id &&
2338            num_active < asynchLocalEvalConcurrency) {
2339       // test for completion (Note: MPI_REQUEST_NULL != 0 on IBM)
2340       if (evalCommRank == 0 && recv_request != MPI_REQUEST_NULL)
2341 	parallelLib.test(recv_request, mpi_test_flag, status);
2342       if (multiProcEvalFlag)
2343 	parallelLib.bcast_e(mpi_test_flag);
2344       // if test indicates a completion: unpack, execute, & repost
2345       if (mpi_test_flag) {
2346 
2347 	if (evalCommRank == 0)
2348 	  fn_eval_id = status.MPI_TAG;
2349 	if (multiProcEvalFlag)
2350 	  parallelLib.bcast_e(fn_eval_id);
2351 
2352         if (fn_eval_id) {
2353 	  launch_asynch_local(recv_buffer, fn_eval_id); ++num_active;
2354 	  // repost
2355 	  if (evalCommRank == 0)
2356 	    parallelLib.irecv_ie(recv_buffer, 0, MPI_ANY_TAG, recv_request);
2357 	}
2358       }
2359     }
2360 
2361     // -----------------------------------------------------------------
2362     // Step 3: check for any completed jobs and return results to master
2363     // -----------------------------------------------------------------
2364     if (num_active > 0) {
2365       completionSet.clear();
2366       test_local_evaluations(asynchLocalActivePRPQueue);//rebuilds completionSet
2367       num_active -= completionSet.size();
2368       PRPQueueIter q_it;
2369       for (ISCIter id_iter = completionSet.begin();
2370 	   id_iter != completionSet.end(); ++id_iter) {
2371         int completed_eval_id = *id_iter;
2372 	q_it = lookup_by_eval_id(asynchLocalActivePRPQueue, completed_eval_id);
2373 	if (q_it == asynchLocalActivePRPQueue.end()) {
2374 	  Cerr << "Error: failure in queue lookup within ApplicationInterface::"
2375 	       << "serve_evaluations_asynch()." << std::endl;
2376 	  abort_handler(-1);
2377 	}
2378 	else {
2379 	  if (evalCommRank == 0) {
2380 	    // In this case, use a blocking send to avoid having to manage waits
2381 	    // on multiple send buffers (which would be a pain since the number
2382 	    // of sendBuffers would vary with completionSet length).  The eval
2383 	    // scheduler processor should have pre-posted corresponding recv's.
2384 	    MPIPackBuffer send_buffer(lenResponseMessage);
2385 	    send_buffer << q_it->response();
2386 	    parallelLib.send_ie(send_buffer, 0, completed_eval_id);
2387 	  }
2388 	  asynchLocalActivePRPQueue.erase(q_it);
2389 	}
2390       }
2391     }
2392 
2393   } while (fn_eval_id || num_active > 0);
2394 
2395   //clear_bookkeeping(); // clear any bookkeeping lists in derived classes
2396 }
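
// One pass of the nonblocking server cycle above, sketched for reference
// (illustrative only; names match the loop variables):
//
//   1. test(recv_request)          -> a completed recv carries fn_eval_id in
//                                     its tag (tag 0 == terminate)
//   2. launch_asynch_local(...)    -> job enters asynchLocalActivePRPQueue and
//                                     irecv_ie() is reposted for the next job
//   3. test_local_evaluations(...) -> completionSet collects finished ids
//   4. send_ie(response, tag=id)   -> blocking send back to the scheduler,
//                                     which has pre-posted matching recvs
//
// The do-while exits once a terminate tag has been seen and num_active is 0.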
2397 
2398 
2399 /** This code is invoked by serve_evaluations() to perform multiple
2400     asynchronous jobs on multiprocessor slave/peer servers.  It
2401     matches the multiProcEvalFlag bcasts in
2402     ApplicationInterface::asynchronous_local_evaluations(). */
2403 void ApplicationInterface::serve_evaluations_asynch_peer()
2404 {
2405   MPIUnpackBuffer recv_buffer(lenVarsActSetMessage);
2406   int fn_eval_id = 1, num_jobs;
2407   size_t num_active = 0, num_launch = 0, num_completed;
2408 
2409   // TO DO: redesign this logic for greater generality.
2410   // ONE OPTION: MOVE THIS INSIDE LOOP AND SYNC #JOBS THIS PASS
2411   // > num_jobs could be inefficient to determine a priori (static_limited)
2412   // MORE ATTRACTIVE OPTION: ELIMINATE BCAST & HAVE PEER MASTER SEND TERM ASAP
2413   // > straightforward for asynchronous_local_evaluations()
2414   // > asynch_local_evaluations_nowait() presents a problem (it can be active in
2415   //   the multiProcEvalFlag peer asynch context...) since the job queue can
2416   //   dynamically expand/contract across multiple invocations and we want the
2417   //   server to remain up --> may need a different implementation for
2418   //   serve_evaluations_asynch_peer_nowait()?
2419   parallelLib.bcast_e(num_jobs);
2420 
2421   do { // main loop
2422 
2423     // TO DO: generalize for nowait uses where num_jobs expands/shrinks
2424     // Preferred approach is to send term code more immediately...
2425 
2426     // -------------------------------------------------------------------
2427     // check for incoming messages & unpack/execute all jobs received
2428     // -------------------------------------------------------------------
2429     while (num_active < asynchLocalEvalConcurrency && num_launch < num_jobs) {
2430       parallelLib.bcast_e(fn_eval_id);
2431       if (fn_eval_id) {
2432 	launch_asynch_local(recv_buffer, fn_eval_id);
2433 	++num_active; ++num_launch;
2434       }
2435     }
2436 
2437     // -----------------------------------------------------------------
2438     // check for any completed jobs and return results to master
2439     // -----------------------------------------------------------------
2440     if (num_active > 0) {
2441       completionSet.clear();
2442       test_local_evaluations(asynchLocalActivePRPQueue);//rebuilds completionSet
2443       num_completed = completionSet.size();
2444       if (num_completed == num_active)
2445 	{ num_active = 0; asynchLocalActivePRPQueue.clear(); }
2446       else {
2447 	num_active -= num_completed; //num_jobs_remaining -= num_completed;
2448 	PRPQueueIter q_it; ISCIter id_it;
2449 	for (id_it=completionSet.begin(); id_it!=completionSet.end(); ++id_it) {
2450 	  q_it = lookup_by_eval_id(asynchLocalActivePRPQueue, *id_it);
2451 	  if (q_it == asynchLocalActivePRPQueue.end()) {
2452 	    Cerr << "Error: failure in queue lookup within ApplicationInterface"
2453 		 << "::serve_evaluations_asynch_peer()." << std::endl;
2454 	    abort_handler(-1);
2455 	  }
2456 	  else
2457 	    asynchLocalActivePRPQueue.erase(q_it);
2458 	}
2459       }
2460     }
2461     else // catch termination code (short term fix)
2462       parallelLib.bcast_e(fn_eval_id);
2463 
2464   } while (fn_eval_id || num_active > 0);
2465 
2466   //clear_bookkeeping(); // clear any bookkeeping lists in derived classes
2467 }
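
// Illustrative only: with asynchLocalEvalConcurrency = 2 and num_jobs = 5, the
// launch loop above keeps at most 2 evaluations active at once and stops
// launching after 5 total; remaining completions then drain through
// test_local_evaluations() until num_active returns to 0 and the termination
// code arrives.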
2468 
2469 
2470 /** This code is executed on the iteratorComm rank 0 processor when
2471     iteration on a particular model is complete.  It sends a
2472     termination signal (tag = 0 instead of a valid fn_eval_id) to each
2473     of the slave evaluation servers.  NOTE: This function is called from
2474     the Strategy layer even when in serial mode.  Therefore, use
2475     iteratorCommSize to provide appropriate fall through behavior. */
2476 void ApplicationInterface::stop_evaluation_servers()
2477 {
2478   if (iteratorCommSize > 1) {
2479     if (!ieDedMasterFlag) {
2480       if (outputLevel > NORMAL_OUTPUT)
2481 	Cout << "Peer 1 stopping" << std::endl;
2482 
2483       // TO DO: deactivate this block
2484       if (multiProcEvalFlag) {// stop serve_evaluation_{,a}synch_peer procs
2485         int fn_eval_id = 0;
2486         parallelLib.bcast_e(fn_eval_id);
2487       }
2488 
2489     }
2490     MPIPackBuffer send_buffer(0); // empty buffer
2491     MPI_Request send_request;
2492     int server_id, term_tag = 0; // triggers termination
2493     // Peer partitions have one fewer interComm (server 1 to servers 2-n)
2494     // than dedicated master partitions, which have interComms from server 0
2495     // to servers 1-n.
2496     int end = (ieDedMasterFlag) ? numEvalServers+1 : numEvalServers;
2497     for (server_id=1; server_id<end; ++server_id) {
2498       // stop serve_evaluation_{synch,asynch} procs
2499       if (outputLevel > NORMAL_OUTPUT) {
2500 	if (ieDedMasterFlag)
2501 	  Cout << "Master stopping server " << server_id << std::endl;
2502 	else
2503 	  Cout << "Peer " << server_id+1 << " stopping" << std::endl;
2504       }
2505       // nonblocking sends: master posts all terminate messages without waiting
2506       // for completion.  Bcast cannot be used since all procs must call it and
2507       // slaves are using Recv/Irecv in serve_evaluation_synch/asynch.
2508       parallelLib.isend_ie(send_buffer, server_id, term_tag, send_request);
2509       parallelLib.free(send_request); // no test/wait on send_request
2510     }
2511     // if the communicator partitioning resulted in a trailing partition of
2512     // idle processors due to a partitioning remainder (caused by strictly
2513     // honoring a processors_per_server override), then these are not
2514     // included in numEvalServers and we quietly free them separately.
2515     // We assume a single server with all idle processors and a valid
2516     // inter-communicator (enforced during split).
2517     // > dedicated master: server_id = #servers+1 -> interComm[#servers]
2518     // > peer:             server_id = #servers   -> interComm[#servers-1]
2519     if (parallelLib.parallel_configuration().ie_parallel_level().
2520 	idle_partition()) {
2521       parallelLib.isend_ie(send_buffer, server_id, term_tag, send_request);
2522       parallelLib.free(send_request); // no test/wait on send_request
2523     }
2524   }
2525 }
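
// A rough sketch (illustrative only) of how the termination messages above are
// consumed on the server side: serve_evaluations_{synch,asynch} loop on
//
//   recv_ie(recv_buffer, 0, MPI_ANY_TAG, status);  // or irecv_ie + test
//   fn_eval_id = status.MPI_TAG;                   // tag 0 => stop serving
//
// so an empty buffer sent with term_tag = 0 breaks each server loop without
// requiring a collective call (hence the isend/free rather than a bcast).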
2526 
2527 
2528 // --------------------------------------------------
2529 // Schedulers for concurrent analyses within fn evals
2530 // --------------------------------------------------
2531 /** This code is called from derived classes to provide the master
2532     portion of a master-slave algorithm for the dynamic scheduling of
2533     analyses among slave servers.  It is patterned after
2534     master_dynamic_schedule_evaluations(). It performs no analyses
2535     locally and matches either serve_analyses_synch() or
2536     serve_analyses_asynch() on the slave servers, depending on the
2537     value of asynchLocalAnalysisConcurrency.  Dynamic scheduling
2538     assigns jobs in 2 passes.  The 1st pass gives each server the same
2539     number of jobs (equal to asynchLocalAnalysisConcurrency).  The 2nd
2540     pass assigns the remaining jobs to slave servers as previous jobs
2541     are completed.  Single- and multilevel parallel use intra- and
2542     inter-communicators, respectively, for send/receive.  Specific
2543     syntax is encapsulated within ParallelLibrary. */
2544 void ApplicationInterface::master_dynamic_schedule_analyses()
2545 {
2546   int capacity  = (asynchLocalAnalysisConcurrency > 1) ?
2547                    asynchLocalAnalysisConcurrency*numAnalysisServers :
2548                    numAnalysisServers;
2549   int num_sends = (capacity < numAnalysisDrivers) ?
2550                    capacity : numAnalysisDrivers;
2551 #ifdef MPI_DEBUG
2552   Cout << "First pass: assigning " << num_sends << " analyses among "
2553        << numAnalysisServers << " servers\n";
2554 #endif // MPI_DEBUG
2555   MPI_Request  send_request; // only 1 needed since no test/wait on sends
2556   int*         rtn_codes     = new int [num_sends];
2557   MPI_Request* recv_requests = new MPI_Request [num_sends];
2558   int i, server_id, analysis_id;
2559   for (i=0; i<num_sends; ++i) { // send data & post recvs for 1st pass
2560     server_id = i%numAnalysisServers + 1; // from 1 to numAnalysisServers
2561     analysis_id = i+1;
2562 #ifdef MPI_DEBUG
2563     Cout << "Master assigning analysis " << analysis_id << " to server "
2564          << server_id << '\n';
2565 #endif // MPI_DEBUG
2566     // pre-post receives.
2567     parallelLib.irecv_ea(rtn_codes[i], server_id, analysis_id,
2568                          recv_requests[i]);
2569     parallelLib.isend_ea(analysis_id, server_id, analysis_id, send_request);
2570     parallelLib.free(send_request); // no test/wait on send_request
2571   }
2572   if (num_sends < numAnalysisDrivers) { // schedule remaining analyses
2573 #ifdef MPI_DEBUG
2574     Cout << "Second pass: scheduling " << numAnalysisDrivers-num_sends
2575          << " remaining analyses\n";
2576 #endif // MPI_DEBUG
2577     int send_cntr = num_sends, recv_cntr = 0, out_count;
2578     MPI_Status* status_array = new MPI_Status [num_sends];
2579     int*        index_array  = new int [num_sends];
2580     while (recv_cntr < numAnalysisDrivers) {
2581 #ifdef MPI_DEBUG
2582       Cout << "Waiting on completed analyses" << std::endl;
2583 #endif // MPI_DEBUG
2584       parallelLib.waitsome(num_sends, recv_requests, out_count, index_array,
2585 			   status_array);
2586       recv_cntr += out_count;
2587       for (i=0; i<out_count; ++i) {
2588         int index = index_array[i]; // index of recv_request that completed
2589         server_id = index%numAnalysisServers + 1;// from 1 to numAnalysisServers
2590 #ifdef MPI_DEBUG
2591         Cout << "analysis " << status_array[i].MPI_TAG
2592              <<" has returned from slave server " << server_id << '\n';
2593 #endif // MPI_DEBUG
2594         if (send_cntr < numAnalysisDrivers) {
2595           analysis_id = send_cntr+1;
2596 #ifdef MPI_DEBUG
2597           Cout << "Master assigning analysis " << analysis_id << " to server "
2598                << server_id << '\n';
2599 #endif // MPI_DEBUG
2600 	  // pre-post receives
2601           parallelLib.irecv_ea(rtn_codes[index], server_id, analysis_id,
2602                                recv_requests[index]);
2603           parallelLib.isend_ea(analysis_id, server_id, analysis_id,
2604                                send_request);
2605           parallelLib.free(send_request); // no test/wait on send_request
2606           ++send_cntr;
2607         }
2608       }
2609     }
2610     delete [] status_array;
2611     delete [] index_array;
2612   }
2613   else { // all analyses assigned in first pass
2614 #ifdef MPI_DEBUG
2615     Cout << "Waiting on all analyses" << std::endl;
2616 #endif // MPI_DEBUG
2617     parallelLib.waitall(numAnalysisDrivers, recv_requests);
2618   }
2619   delete [] rtn_codes;
2620   delete [] recv_requests;
2621 
2622   // Unlike ApplicationInterface::master_dynamic_schedule_evaluations() (which
2623   // terminates servers only when the iterator/model is complete), terminate
2624   // servers now so that they can return from derived_map to the higher level.
2625   analysis_id = 0; // termination flag for servers
2626   for (i=0; i<numAnalysisServers; ++i) {
2627     parallelLib.isend_ea(analysis_id, i+1, 0, send_request);
2628     parallelLib.free(send_request); // no test/wait on send_request
2629   }
2630 }
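
// Illustrative sizing example (not part of the build): with
// numAnalysisServers = 3, asynchLocalAnalysisConcurrency = 2, and
// numAnalysisDrivers = 10, capacity = 3*2 = 6, so the first pass above posts
// analyses 1-6 (two per server) and the second pass assigns analyses 7-10 as
// waitsome() reports completions, reusing each freed request slot.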
2631 
2632 
2633 /** This code is called from derived classes to run synchronous
2634     analyses on slave processors.  The slaves receive requests
2635     (blocking receive), do local derived_map_ac's, and return codes.
2636     This is done continuously until a termination signal is received
2637     from the master.  It is patterned after
2638     serve_evaluations_synch(). */
2639 void ApplicationInterface::serve_analyses_synch()
2640 {
2641   int analysis_id = 1;
2642   MPI_Status status; // holds source, tag, and number received in MPI_Recv
2643   MPI_Request request = MPI_REQUEST_NULL; // bypass MPI_Wait on first pass
2644   while (analysis_id) {
2645     // blocking receive of analysis id
2646     if (analysisCommRank == 0) // 1-level or comm. leader in 2-level (DirectFn)
2647       parallelLib.recv_ea(analysis_id, 0, MPI_ANY_TAG, status);
2648     // NOTE: could get analysis_id from status.MPI_TAG & receive something
2649     //       else useful in 1st integer slot.  One possibility: tag with
2650     //       (fn_eval_id-1)*numAnalysisDrivers+analysis_id to prevent any
2651     //       possible tag overlap with message traffic at other parallelism
2652     //       levels.  However, I think the use of communicators prevents these
2653     //       types of errors.
2654     if (multiProcAnalysisFlag) // must Bcast over analysisComm
2655       parallelLib.bcast_a(analysis_id);
2656 
2657     if (analysis_id) { // analysis_id = 0 is the termination signal
2658 
2659       int rtn_code = synchronous_local_analysis(analysis_id);
2660 
2661       if (request != MPI_REQUEST_NULL) // MPI_REQUEST_NULL = -1 IBM, 0 elsewhere
2662         parallelLib.wait(request, status); // block until prev isend completes
2663 
2664       // Return the simulation failure code and tag with analysis_id
2665       // NOTE: the rtn_code is not currently used for anything on the master
2666       //       since failure capturing exceptions are captured in
2667       //       read_results_files (sys call, fork) or derived_map_ac (direct).
2668       //       As for recv_ea above, it could be replaced with something else.
2669       if (analysisCommRank == 0)// 1-level or comm. leader in 2-level (DirectFn)
2670         parallelLib.isend_ea(rtn_code, 0, analysis_id, request);
2671     }
2672   }
2673 }
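
// A rough sketch (illustrative only) of the pairing with
// master_dynamic_schedule_analyses(): the master's
// isend_ea(analysis_id, server_id, tag = analysis_id) is matched here by
// recv_ea(analysis_id, 0, MPI_ANY_TAG), and this server's
// isend_ea(rtn_code, 0, analysis_id) is matched by the master's pre-posted
// irecv_ea(rtn_codes[...], server_id, analysis_id).  A tag/value of 0 ends
// the while loop above.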
2674 
2675 
2676 // -----------------------------------------
2677 // Routines for managing simulation failures
2678 // -----------------------------------------
2679 void ApplicationInterface::
2680 manage_failure(const Variables& vars, const ActiveSet& set, Response& response,
2681 	       int failed_eval_id)
2682 {
2683   // SysCall/Fork application interface exception handling:
2684   // When a simulation failure is detected w/i
2685   // Response::read(istream), a FunctionEvalFailure exception is
2686   // thrown which is first caught by catch(FunctionEvalFailure) within
2687   // derived_map or wait_local_evaluations, depending on whether the
2688   // simulation was synch or asynch.  In the case of derived_map, it
2689   // rethrows the exception to an outer catch in map or serve which
2690   // then invokes manage_failure.  In the case of
2691   // wait_local_evaluations, it invokes manage_failure directly.
2692   // manage_failure can then catch subsequent exceptions resulting
2693   // from the try blocks below.
2694 
2695   // DirectFn application interface exception handling:
2696   // The DirectFn case is simpler than SysCall since (1) synch case: the
2697   // exception can be thrown directly from derived_map to map or serve which
2698   // then invokes manage_failure (which then catches additional exceptions
2699   // thrown directly from derived_map), or (2) asynch case:
2700   // wait_local_evaluations throws no exceptions, but invokes manage_failure
2701   // directly.
2702 
2703   if (failAction == "retry") {
2704     //retry(vars, set, response, failRetryLimit);
2705     int retries = 0;
2706     bool fail_flag = true; // allow 1st pass through the while test
2707     while (fail_flag) {
2708       fail_flag = false; // reset each time prior to derived_map
2709       ++retries;
2710       Cout << failureMessage << ": retry attempt " << retries << "/" <<
2711         failRetryLimit << " for evaluation " << failed_eval_id << ".\n";
2712       try { derived_map(vars, set, response, failed_eval_id); }
2713       catch(const FunctionEvalFailure& fneval_except) {
2714         //Cout << "Caught FunctionEvalFailure in manage_failure; message: "
2715 	//<< fneval_except.what() << std::endl;
2716         fail_flag = true;
2717 	if (retries >= failRetryLimit) {
2718 	  Cerr << "Retry limit exceeded for evaluation " << failed_eval_id <<
2719             ".  Aborting..." << std::endl;
2720           abort_handler(INTERFACE_ERROR);
2721 	}
2722       }
2723     }
2724   }
2725   else if (failAction == "recover") {
2726     Cout << failureMessage << ": recovering with specified function values " <<
2727       "for evaluation " << failed_eval_id << ".\n";
2728     if (failRecoveryFnVals.length() != response.num_functions() ) {
2729       Cerr << "Error: length of recovery function values specification\n"
2730            << "       must equal the total number of functions." << std::endl;
2731       abort_handler(-1);
2732     }
2733     // reset response to avoid bleeding over of derivatives from previous eval.
2734     response.reset();
2735     // Set specified recovery function values
2736     response.function_values(failRecoveryFnVals);
2737   }
2738   else if (failAction == "continuation") {
2739     // Compute closest source pt. for continuation from extern data_pairs.
2740     ParamResponsePair source_pair;
2741     // THIS CODE BLOCK IS A PLACEHOLDER AND IS NOT YET OPERATIONAL
2742     if (iteratorCommRank) { // if other than master
2743       // Get source pt. for continuation.  Master calls get_source_point for
2744       // slave (since it has access to data_pairs) and returns result.
2745       MPIPackBuffer send_buffer(lenVarsMessage);
2746       send_buffer << vars;
2747       // master must receive failure message w/i *_schedule_evaluations
2748       // after MPI_Waitany or MPI_Waitall.  Use tag < 0 ?
2749       parallelLib.send_ie(send_buffer, 0, failed_eval_id);
2750       MPIUnpackBuffer recv_buffer(lenPRPairMessage);
2751       MPI_Status recv_status;
2752       parallelLib.recv_ie(recv_buffer, 0, failed_eval_id, recv_status);
2753       recv_buffer >> source_pair;
2754     }
2755     else
2756       source_pair = get_source_pair(vars); // also the serial case
2757 
2758     Cout << '\n' << failureMessage << ": halving interval and retrying " <<
2759      "evaluation " << failed_eval_id << "." << std::endl;
2760 
2761     // Now that source pt. is available, call the continuation algorithm.
2762     // Mimic retry case in use of failed_eval_id to manage file names.
2763 
2764     continuation(vars, set, response, source_pair, failed_eval_id);
2765 
2766   }
2767   else { // default is abort
2768     Cerr << failureMessage << ": aborting due to failure in evaluation " <<
2769       failed_eval_id << "..." << std::endl;
2770     abort_handler(INTERFACE_ERROR);
2771   }
2772 }
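
// Illustrative only (hypothetical plug-in code, not part of this file): a
// derived direct interface typically triggers the handling above by throwing
// from derived_map(), e.g.
//
//   if (!simulation_converged)                    // hypothetical condition
//     throw FunctionEvalFailure(failureMessage);  // caught in map()/serve*(),
//                                                 // which call manage_failure()
//
// The user-selected failAction ("retry", "recover", "continuation", or the
// default abort) then determines the recovery path taken above.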
2773 
2774 
2775 const ParamResponsePair&
2776 ApplicationInterface::get_source_pair(const Variables& target_vars)
2777 {
2778   if (data_pairs.size() == 0) {
2779     Cerr << "Failure captured: No points available, aborting" << std::endl;
2780     abort_handler(-1);
2781   }
2782 
2783   // TO DO: should check both continuous_variables and discrete_variables
2784   const RealVector& xc_target = target_vars.continuous_variables();
2785 
2786   size_t i, num_vars = xc_target.length();
2787   Real best_sos = DBL_MAX;
2788 
2789   // TO DO: Need to check for same interfaceId as well.  Currently, this is
2790   // part of Response -> need to add to Interface.
2791   PRPCacheCIter prp_iter, prp_end_iter = data_pairs.end(), best_iter;
2792   for (prp_iter = data_pairs.begin(); prp_iter != prp_end_iter; ++prp_iter) {
2793     //if (interfaceId == prp_iter->interface_id()) {
2794       const RealVector& xc_source
2795 	= prp_iter->variables().continuous_variables();
2796       Real sum_of_squares = 0.;
2797       for (i=0; i<num_vars; ++i)
2798         sum_of_squares += std::pow( xc_source[i] - xc_target[i], 2);
2799       if (prp_iter == data_pairs.begin() || sum_of_squares < best_sos) {
2800         best_iter = prp_iter;
2801         best_sos  = sum_of_squares;
2802       }
2803     //}
2804   }
2805 
2806   // Desired implementation:
2807   //return *best_iter;
2808 
2809   // For now, this asks the least of the simulation management:
2810   --prp_iter; // last PRPair is one back from end()
2811   return *prp_iter;
2812   // Both the variables and the response (in a PRPair) must be returned since
2813   // the variables alone are not enough to enact the continuation.  The
2814   // corresponding response must be used as the simulator initial guess.
2815 }
2816 
2817 
2818 void ApplicationInterface::
2819 continuation(const Variables& target_vars, const ActiveSet& set,
2820 	     Response& response, const ParamResponsePair& source_pair,
2821 	     int failed_eval_id)
2822 {
2823   // TO DO: should use both continuous_variables and discrete_variables
2824   const Variables& source_vars = source_pair.variables();
2825   const RealVector& source_pt = source_vars.continuous_variables();
2826   const RealVector& target_pt = target_vars.continuous_variables();
2827 
2828   // copy() required to avoid modifying variables in data_pairs entry (standard
2829   // operator= behavior would have source_vars sharing a variablesRep with the
2830   // identified data_pairs entry).
2831   Variables current_vars = source_vars.copy();
2832 
2833   size_t i, num_cv = source_pt.length();
2834   short  failures = 1, MAX_FAILURES = 10;
2835   bool   target_reached = false;
2836   float  EPS = 1.0e-10;
2837 
2838   RealVector current_pt(num_cv, 0.), delta(num_cv, 0.);
2839   for (i=0; i<num_cv; ++i) {
2840     // Define delta to be half the distance to the target
2841     delta[i] = (target_pt[i] - source_pt[i])/2.;
2842     current_pt[i] = source_pt[i] + delta[i];
2843   }
2844 
2845   while (!target_reached) {
2846 
2847     // Perform the intermediate function evaluation
2848     current_vars.continuous_variables(current_pt);
2849 
2850     // TO DO: the simulation must be seeded with the response corresponding to
2851     // the (current) source information.  Thus, the current implementation
2852     // only works for best_iter from get_source_point = last evaluation.
2853     //write_continuation_seed_file(source_response);
2854 
2855     bool fail_flag = false;
2856     try { derived_map(current_vars, set, response, failed_eval_id); }
2857     catch(const FunctionEvalFailure& fneval_except) {
2858       fail_flag = true;
2859     }
2860 
2861     if (fail_flag) {
2862       Cout << "\nInterval halving attempt " << failures << " for evaluation " <<
2863         failed_eval_id << " failed. Halving again." << std::endl;
2864       ++failures;
2865       if (failures > MAX_FAILURES) {
2866 	Cerr << "\n\nInterval halving limit exceeded in continuation for " <<
2867           "evaluation " << failed_eval_id << ": " << "aborting..." << std::endl;
2868 	abort_handler(INTERFACE_ERROR);
2869       }
2870 
2871       // take a half-step back from the failed current point (and don't update
2872       // source_vars -> keep the same seed file)
2873       for (i=0; i<num_cv; ++i) {
2874 	delta[i] /= 2.0;
2875 	current_pt[i] -= delta[i];
2876       }
2877     }
2878     else {
2879       Cout << "\nEvaluation succeeded.\nContinuing with current step size " <<
2880         "for evaluation " << failed_eval_id << "." << std::endl;
2881       // Possibly append new continuation evals. to data_pairs list (?)
2882 
2883       if (current_pt == target_pt) // operator== in data_types.cpp
2884         target_reached = true;
2885       else {
2886         for (i=0; i<num_cv; ++i) {
2887           current_pt[i] += delta[i];
2888           if (std::fabs(1.0 - current_pt[i]/target_pt[i]) < EPS)
2889             current_pt[i] = target_pt[i]; // snap to target so that the
2890                                           // operator== test above succeeds
2891         }
2892         // TO DO: update source response for seed file
2893       }
2894     }
2895   }
2896   Cout << "Finished with continuation for evaluation " << failed_eval_id <<
2897     "." << std::endl;
2898 }
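
// Illustrative walk-through (numbers chosen only for clarity): for a scalar
// source_pt = 0 and target_pt = 8, delta starts at 4 and current_pt at 4.
//   - eval at 4 fails  -> delta = 2, step back and retry at 2
//   - eval at 2 passes -> advance to 4, then 6, then 8 (delta stays 2)
//   - current_pt == target_pt at 8 -> target_reached, continuation done
// Each additional failure halves delta again and steps back by the new delta;
// after more than MAX_FAILURES failures in total, the continuation aborts.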
2899 
2900 
2901 // NOTE:  The following 3 methods CANNOT be inlined due to linkage errors on
2902 //        native, Windows MSVC builds (strange handling of extern symbols
2903 //        BoStream write_restart and PRPCache data_pairs)
2904 
2905 void ApplicationInterface::
2906 receive_evaluation(PRPQueueIter& prp_it, size_t buff_index, int server_id,
2907                    bool peer_flag)
2908 {
2909   int fn_eval_id = prp_it->eval_id();
2910   if (outputLevel > SILENT_OUTPUT) {
2911     if (interfaceId.empty() || interfaceId == "NO_ID") Cout << "Evaluation ";
2912     else Cout << interfaceId << " evaluation ";
2913     Cout << fn_eval_id << " has returned from ";
2914     if (peer_flag) Cout << "peer server " << server_id+1 << '\n';
2915     else           Cout << "slave server " << server_id << '\n';
2916   }
2917 
2918 #ifdef MPI_DEBUG
2919   Cout << "receive_evaluation() buff_index = " << buff_index << " fn_eval_id = "
2920        << fn_eval_id << " server_id = " << server_id << std::endl;
2921 #endif // MPI_DEBUG
2922 
2923   // Process incoming buffer from remote server.  Avoid multiple key-value
2924   // lookups.  Incoming response is a lightweight constructed response
2925   // corresponding to a particular ActiveSet.
2926   Response remote_response;
2927   recvBuffers[buff_index] >> remote_response; // lightweight response
2928   // share the rep between rawResponseMap and the processing queue, but
2929   // don't trample raw response sizing with lightweight remote response
2930   Response raw_response = rawResponseMap[fn_eval_id] = prp_it->response();
2931   raw_response.update(remote_response);
2932 
2933   // insert into restart and eval cache ASAP
2934   if (evalCacheFlag)   data_pairs.insert(*prp_it);
2935   if (restartFileFlag) parallelLib.write_restart(*prp_it);
2936 }
2937 
2938 
2939 void ApplicationInterface::process_asynch_local(int fn_eval_id)
2940 {
2941   PRPQueueIter prp_it
2942     = lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id);
2943   if (prp_it == asynchLocalActivePRPQueue.end()) {
2944     Cerr << "Error: failure in eval id lookup in ApplicationInterface::"
2945          << "process_asynch_local()." << std::endl;
2946     abort_handler(-1);
2947   }
2948 
2949   if (outputLevel > SILENT_OUTPUT) {
2950     if (interfaceId.empty() || interfaceId == "NO_ID") Cout << "Evaluation ";
2951     else Cout << interfaceId << " evaluation ";
2952     Cout << fn_eval_id;
2953     if (batchEval) Cout << " (batch " << batchIdCntr << ")";
2954     Cout << " has completed\n";
2955   }
2956 
2957   rawResponseMap[fn_eval_id] = prp_it->response();
2958   if (evalCacheFlag)   data_pairs.insert(*prp_it);
2959   if (restartFileFlag) parallelLib.write_restart(*prp_it);
2960 
2961   asynchLocalActivePRPQueue.erase(prp_it);
2962   if (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1) {// free "server"
2963     size_t static_servers = asynchLocalEvalConcurrency * numEvalServers,
2964       server_index = (fn_eval_id - 1) % static_servers;
2965     localServerAssigned.reset(server_index);
2966   }
2967 }
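
// Illustrative only: for the static local scheduling branch above, with
// asynchLocalEvalConcurrency = 2 and numEvalServers = 3 there are 6 logical
// "servers"; completing fn_eval_id = 8 frees server_index = (8-1) % 6 = 1,
// allowing the next evaluation mapped to that slot to be launched.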
2968 
2969 
2970 void ApplicationInterface::process_synch_local(PRPQueueIter& prp_it)
2971 {
2972   int fn_eval_id = prp_it->eval_id();
2973   if (outputLevel > SILENT_OUTPUT) {
2974     Cout << "Performing ";
2975     if (!(interfaceId.empty() || interfaceId == "NO_ID")) Cout << interfaceId << ' ';
2976     Cout << "evaluation " << fn_eval_id << std::endl;
2977   }
2978   rawResponseMap[fn_eval_id] = prp_it->response();
2979   if (evalCacheFlag)   data_pairs.insert(*prp_it);
2980   if (restartFileFlag) parallelLib.write_restart(*prp_it);
2981 }
2982 
2983 
2984 void ApplicationInterface::common_input_filtering(const Variables& vars)
2985 { } // empty for now
2986 
2987 
2988 void ApplicationInterface::common_output_filtering(Response& response)
2989 { } // empty for now
2990 
2991 } // namespace Dakota
2992