1 /* _______________________________________________________________________
2
3 DAKOTA: Design Analysis Kit for Optimization and Terascale Applications
4 Copyright 2014-2020 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
5 This software is distributed under the GNU Lesser General Public License.
6 For more information, see the README file in the top Dakota directory.
7 _______________________________________________________________________ */
8
9 //- Class: ApplicationInterface
10 //- Description: Implementation of base class for application interfaces
11 //- Owner: Mike Eldred
12
13 #include "dakota_system_defs.hpp"
14 #include "ApplicationInterface.hpp"
15 #include "ParamResponsePair.hpp"
16 #include "ProblemDescDB.hpp"
17 #include "ParallelLibrary.hpp"
18
19 namespace Dakota {
20
21 extern PRPCache data_pairs;
22
/** Constructor: extracts interface controls (parallelism specs, evaluation
    cache / restart flags, active-set-vector control, failure-capture policy,
    and response derivative metadata) from the ProblemDescDB.  Members that
    describe communicator state (sizes, ranks, server ids, message-pass
    flags) are initialized here to serial defaults and are overwritten later
    by set_evaluation_communicators() / set_analysis_communicators(). */
ApplicationInterface::
ApplicationInterface(const ProblemDescDB& problem_db):
  Interface(BaseConstructor(), problem_db),
  parallelLib(problem_db.parallel_library()),
  batchEval(problem_db.get_bool("interface.batch")),
  asynchFlag(problem_db.get_bool("interface.asynch")),
  batchIdCntr(0),
  // serial defaults for communicator-derived attributes (reset in set_*())
  suppressOutput(false), evalCommSize(1), evalCommRank(0), evalServerId(1),
  eaDedMasterFlag(false), analysisCommSize(1), analysisCommRank(0),
  analysisServerId(1), multiProcAnalysisFlag(false),
  asynchLocalAnalysisFlag(false),
  // user concurrency specs (0 = unspecified; resolved downstream)
  asynchLocalEvalConcSpec(
    problem_db.get_int("interface.asynch_local_evaluation_concurrency")),
  asynchLocalAnalysisConcSpec(
    problem_db.get_int("interface.asynch_local_analysis_concurrency")),
  numAnalysisDrivers(
    problem_db.get_sa("interface.application.analysis_drivers").size()),
  failureMessage("Failure captured"),
  worldSize(parallelLib.world_size()), worldRank(parallelLib.world_rank()),
  iteratorCommSize(1), iteratorCommRank(0), ieMessagePass(false),
  numEvalServersSpec(problem_db.get_int("interface.evaluation_servers")),
  procsPerEvalSpec(problem_db.get_int("interface.processors_per_evaluation")),
  eaMessagePass(false),
  numAnalysisServersSpec(problem_db.get_int("interface.analysis_servers")),
  procsPerAnalysisSpec(
    problem_db.get_int("interface.direct.processors_per_analysis")),
  // message buffer lengths are estimated later (see
  // set_evaluation_communicators())
  lenVarsMessage(0), lenVarsActSetMessage(0), lenResponseMessage(0),
  lenPRPairMessage(0),
  evalScheduling(problem_db.get_short("interface.evaluation_scheduling")),
  analysisScheduling(problem_db.get_short("interface.analysis_scheduling")),
  asynchLocalEvalStatic(
    problem_db.get_short("interface.local_evaluation_scheduling") ==
    STATIC_SCHEDULING),
  // NOTE: bitwise-or on two bools -- either a batch or an asynch spec
  // selects the asynchronous interface mode
  interfaceSynchronization(
    (batchEval | asynchFlag) ?
    ASYNCHRONOUS_INTERFACE : SYNCHRONOUS_INTERFACE
  ),
  headerFlag(true),
  asvControlFlag(problem_db.get_bool("interface.active_set_vector")),
  evalCacheFlag(problem_db.get_bool("interface.evaluation_cache")),
  nearbyDuplicateDetect(
    problem_db.get_bool("interface.nearby_evaluation_cache")),
  nearbyTolerance(
    problem_db.get_real("interface.nearby_evaluation_cache_tolerance")),
  restartFileFlag(problem_db.get_bool("interface.restart_file")),
  sharedRespData(SharedResponseData(problem_db)),
  gradientType(problem_db.get_string("responses.gradient_type")),
  hessianType(problem_db.get_string("responses.hessian_type")),
  // id lists used by init_default_asv() for "mixed" derivative types
  gradMixedAnalyticIds(
    problem_db.get_is("responses.gradients.mixed.id_analytic")),
  hessMixedAnalyticIds(
    problem_db.get_is("responses.hessians.mixed.id_analytic")),
  failAction(problem_db.get_string("interface.failure_capture.action")),
  failRetryLimit(problem_db.get_int("interface.failure_capture.retry_limit")),
  failRecoveryFnVals(
    problem_db.get_rv("interface.failure_capture.recovery_fn_vals")),
  sendBuffers(NULL), recvBuffers(NULL), recvRequests(NULL)
{
  // set coreMappings flag based on presence of analysis_drivers specification
  coreMappings = (numAnalysisDrivers > 0);
  // at least one of core (analysis_drivers) or algebraic mappings must be
  // present for a non-default interface; otherwise there is no way to map
  // parameters to responses
  if (!coreMappings && !algebraicMappings && interfaceType > DEFAULT_INTERFACE){
    Cerr << "\nError: no parameter to response mapping defined in "
         << "ApplicationInterface.\n" << std::endl;
    abort_handler(-1);
  }
}
89
90
~ApplicationInterface()91 ApplicationInterface::~ApplicationInterface()
92 { }
93
94
95 void ApplicationInterface::
init_communicators(const IntArray & message_lengths,int max_eval_concurrency)96 init_communicators(const IntArray& message_lengths, int max_eval_concurrency)
97 {
98 // Initialize comms for evaluations (partitions of iteratorComm).
99
100 bool direct_int = (interfaceType & DIRECT_INTERFACE_BIT);
101 // Peer dynamic requires asynch local executions and dynamic job assignment
102 bool peer_dynamic_avail = (!direct_int && !asynchLocalEvalStatic);
103 // min_procs_per_eval captures overrides at the analysis level. This lower
104 // bound is defined bottom up and counters the top-down allocation of
105 // resources. max_procs_per_eval captures available concurrency and explicit
106 // user overrides at the analysis level (user overrides for the evaluation
107 // level can be managed by resolve_inputs()).
108 // > procsPerAnalysisSpec defaults to zero, which is the result when the
109 // processors_per_analysis spec is unreachable (system/fork/spawn)
110 int min_ppa = 1, max_ppa = (direct_int) ? worldSize : 1;
111 int min_procs_per_eval = ProblemDescDB::
112 min_procs_per_level(min_ppa, procsPerAnalysisSpec, numAnalysisServersSpec);
113 int max_procs_per_eval = ProblemDescDB::
114 max_procs_per_level(max_ppa, procsPerAnalysisSpec, numAnalysisServersSpec,
115 analysisScheduling, asynchLocalAnalysisConcSpec, false,
116 std::max(1, numAnalysisDrivers));
117
118 const ParallelLevel& ie_pl = parallelLib.init_evaluation_communicators(
119 numEvalServersSpec, procsPerEvalSpec, min_procs_per_eval,
120 max_procs_per_eval, max_eval_concurrency, asynchLocalEvalConcSpec,
121 PUSH_UP, evalScheduling, peer_dynamic_avail);
122
123 set_evaluation_communicators(message_lengths);
124
125 // Initialize communicators for analyses (partitions of evalComm). This
126 // call is protected from an iterator dedicated master in the same way a
127 // meta-iterator master never calls init_eval_comms (prevents some warnings
128 // in ParallelLibrary::resolve_inputs when there is a user spec that can't
129 // be supported from a single processor). However, a dedicated master can
130 // run a local single-processor job if the algorithm uses a synchronous eval
131 // (see derived_master_overload() usage in Model::evaluate()).
132 if (ieDedMasterFlag && iteratorCommRank == 0 && multiProcEvalFlag)
133 init_serial_analyses();
134 else {
135 const ParallelLevel& ea_pl = parallelLib.init_analysis_communicators(
136 numAnalysisServersSpec, procsPerAnalysisSpec, min_ppa, max_ppa,
137 numAnalysisDrivers, asynchLocalAnalysisConcSpec, PUSH_UP,
138 analysisScheduling, false); // peer_dynamic is not available
139
140 set_analysis_communicators();
141 }
142
143 // print parallel configuration (prior to configuration checking
144 // so that error messages can be more readily debugged)
145 if (worldSize > 1)
146 parallelLib.print_configuration();
147
148 // check for configuration errors
149 init_communicators_checks(max_eval_concurrency);
150 }
151
152
153 void ApplicationInterface::
set_communicators(const IntArray & message_lengths,int max_eval_concurrency)154 set_communicators(const IntArray& message_lengths, int max_eval_concurrency)
155 {
156 set_evaluation_communicators(message_lengths);
157
158 // Initialize communicators for analyses (partitions of evalComm). This call
159 // is protected from an iterator dedicated master in cases where local
160 // evaluations are precluded (see comments in init_communicators()).
161 if (ieDedMasterFlag && iteratorCommRank == 0 && multiProcEvalFlag)
162 init_serial_analyses();
163 else
164 set_analysis_communicators();
165
166 // check for configuration errors
167 set_communicators_checks(max_eval_concurrency);
168 }
169
170
171 void ApplicationInterface::
set_evaluation_communicators(const IntArray & message_lengths)172 set_evaluation_communicators(const IntArray& message_lengths)
173 {
174 // Buffer sizes for function evaluation message transfers are estimated in
175 // Model::init_communicators() so that hard-coded MPIUnpackBuffer
176 // lengths can be avoided. This estimation is reperformed on every call to
177 // IteratorScheduler::run_iterator(). A Bcast is not currently needed since
178 // every processor performs the estimation.
179 //MPI_Bcast(message_lengths.data(), 4, MPI_INT, 0, iteratorComm);
180 lenVarsMessage = message_lengths[0];
181 lenVarsActSetMessage = message_lengths[1];
182 lenResponseMessage = message_lengths[2];
183 lenPRPairMessage = message_lengths[3];
184
185 // Pull data from (the lowest) concurrent iterator partition. The active
186 // parallel configuration is managed in Model::init_communicators().
187 const ParallelConfiguration& pc = parallelLib.parallel_configuration();
188 const ParallelLevel& mi_pl = pc.mi_parallel_level(); // last mi level
189 iteratorCommSize = mi_pl.server_communicator_size();
190 iteratorCommRank = mi_pl.server_communicator_rank();
191
192 // These attributes are set by init_evaluation_communicators and are not
193 // available for use in the constructor.
194 const ParallelLevel& ie_pl = pc.ie_parallel_level();
195 ieDedMasterFlag = ie_pl.dedicated_master();
196 ieMessagePass = ie_pl.message_pass();
197 numEvalServers = ie_pl.num_servers(); // may differ from numEvalServersSpec
198 evalCommRank = ie_pl.server_communicator_rank();
199 evalCommSize = ie_pl.server_communicator_size();
200 evalServerId = ie_pl.server_id();
201 if (ieDedMasterFlag)
202 multiProcEvalFlag = (ie_pl.processors_per_server() > 1 ||
203 ie_pl.processor_remainder());
204 else // peer: split flag insufficient if 1 server
205 multiProcEvalFlag = (evalCommSize > 1); // could vary
206
207 // simplify downstream logic by resetting default asynch local concurrency
208 // to 1 for the case of message passing. This allows schedulers to more
209 // readily distinguish unlimited concurrency for asynch local parallelism
210 // (default is unlimited unless user concurrency spec) from message passing
211 // parallelism with synchronous local evals (default; hybrid mode requires
212 // user spec > 1).
213 asynchLocalEvalConcurrency = (ieMessagePass && asynchLocalEvalConcSpec == 0)
214 ? 1 : asynchLocalEvalConcSpec;
215 }
216
217
set_analysis_communicators()218 void ApplicationInterface::set_analysis_communicators()
219 {
220 const ParallelConfiguration& pc = parallelLib.parallel_configuration();
221 const ParallelLevel& ea_pl = pc.ea_parallel_level();
222
223 // Extract attributes for analysis partitions
224 eaDedMasterFlag = ea_pl.dedicated_master();
225 eaMessagePass = ea_pl.message_pass();
226 numAnalysisServers = ea_pl.num_servers();//may differ from numAnalysisSrvSpec
227 analysisCommRank = ea_pl.server_communicator_rank();
228 analysisCommSize = ea_pl.server_communicator_size();
229 analysisServerId = ea_pl.server_id();
230 if (eaDedMasterFlag)
231 multiProcAnalysisFlag = (ea_pl.processors_per_server() > 1 ||
232 ea_pl.processor_remainder());
233 else // peer: split flag insufficient if 1 server
234 multiProcAnalysisFlag = (analysisCommSize > 1); // could vary
235
236 if ( iteratorCommRank // any processor other than rank 0 in iteratorComm
237 || ( outputLevel == SILENT_OUTPUT && evalCommRank == 0 &&
238 !eaDedMasterFlag && numAnalysisServers < 2) )
239 suppressOutput = true; // suppress output of fn. eval. echoes
240 /* Additional output granularity:
241 if (ieMessagePass) // suppress fn eval output in
242 suppressLowLevelOutput = true; // SysCall/Fork/Direct
243 if (methodOutput == "quiet") // suppress scheduling & vars/response
244 suppressHighLevelOutput = true; // output in ApplicationInterface & Model
245 */
246
247 // simplify downstream logic by resetting default asynch local concurrency
248 // to 1 for the case of message passing. This allows schedulers to more
249 // readily distinguish unlimited concurrency for asynch local parallelism
250 // (default is unlimited unless user spec) from message passing parallelism
251 // with synchronous local evals (default; hybrid mode requires user spec > 1).
252 asynchLocalAnalysisConcurrency
253 = (eaMessagePass && asynchLocalAnalysisConcSpec == 0)
254 ? 1 : asynchLocalAnalysisConcSpec;
255
256 // Set flag for asynch local parallelism of analyses. In the local asynch
257 // case (no message passing), a concurrency specification is interpreted as
258 // a limit (default is unlimited). In the message passing case, the user
259 // must explicitly specify analysis concurrency to get hybrid parallelism
260 // (default is no analysis concurrency).
261 if ( numAnalysisDrivers > 1 &&
262 interfaceSynchronization == ASYNCHRONOUS_INTERFACE &&
263 ( asynchLocalAnalysisConcurrency > 1 || ( !eaMessagePass &&
264 !asynchLocalAnalysisConcurrency ) ) )
265 asynchLocalAnalysisFlag = true;
266 }
267
268
269 /** Override DirectApplicInterface definition if plug-in to allow batch
270 processing in Plugin{Serial,Parallel}DirectApplicInterface.cpp */
271 void ApplicationInterface::
init_communicators_checks(int max_eval_concurrency)272 init_communicators_checks(int max_eval_concurrency)
273 { } // default is no-op
274
275
276 /** Override DirectApplicInterface definition if plug-in to allow batch
277 processing in Plugin{Serial,Parallel}DirectApplicInterface.cpp */
278 void ApplicationInterface::
set_communicators_checks(int max_eval_concurrency)279 set_communicators_checks(int max_eval_concurrency)
280 { } // default is no-op
281
282
check_multiprocessor_analysis(bool warn)283 bool ApplicationInterface::check_multiprocessor_analysis(bool warn)
284 {
285 bool issue_flag = false;
286 // multiprocessor analyses are only valid for synchronous direct interfaces.
287 // Neither system calls (synch or asynch), forks (synch or asynch), nor POSIX
288 // threads (asynch direct) can share a communicator. Attempting parallel
289 // analyses without a shared communicator can result in correct answers if
290 // analysisComm's local leader computes the total result, but it does NOT
291 // perform the intended multiprocessor analysis and is therefore misleading
292 // and should be explicitly prevented.
293 if (multiProcAnalysisFlag) { // not valid for system/fork
294 issue_flag = true;
295 if (iteratorCommRank == 0) {
296 if (warn) Cerr << "Warning: ";
297 else Cerr << "Error: ";
298 Cerr << "Multiprocessor analyses are not valid with "
299 << interface_enum_to_string(interfaceType) << " interfaces.";
300 if (warn) Cerr << "\n This issue may be resolved at run time.";
301 else
302 Cerr << "\n Your processor allocation may exceed the "
303 << "concurrency in the problem,\n requiring a reduction "
304 << "in allocation to eliminate the assignment of\n excess "
305 << "processors to the analysis level.";
306 Cerr << std::endl;
307 }
308 }
309 return issue_flag;
310 }
311
312
313 bool ApplicationInterface::
check_asynchronous(bool warn,int max_eval_concurrency)314 check_asynchronous(bool warn, int max_eval_concurrency)
315 {
316 bool issue_flag = false, asynch_local_eval_flag
317 = ( max_eval_concurrency > 1 &&
318 interfaceSynchronization == ASYNCHRONOUS_INTERFACE &&
319 ( asynchLocalEvalConcurrency > 1 || // captures hybrid mode
320 ( !ieMessagePass && !asynchLocalEvalConcurrency ) ) ); // unlimited
321
322 // Check for asynchronous local evaluations or analyses
323 if (asynch_local_eval_flag || asynchLocalAnalysisFlag) {
324 issue_flag = true;
325 if (iteratorCommRank == 0) {
326 if (warn) Cerr << "Warning: ";
327 else Cerr << "Error: ";
328 Cerr << "asynchronous capability not supported in "
329 << interface_enum_to_string(interfaceType) << " interfaces.";
330 if (warn) Cerr << "\n This issue may be resolved at run time.";
331 Cerr << std::endl;
332 }
333 }
334 return issue_flag;
335 }
336
337
338 bool ApplicationInterface::
check_multiprocessor_asynchronous(bool warn,int max_eval_concurrency)339 check_multiprocessor_asynchronous(bool warn, int max_eval_concurrency)
340 {
341 bool issue_flag = false, asynch_local_eval_flag
342 = ( max_eval_concurrency > 1 &&
343 interfaceSynchronization == ASYNCHRONOUS_INTERFACE &&
344 ( asynchLocalEvalConcurrency > 1 || // captures hybrid mode
345 ( !ieMessagePass && !asynchLocalEvalConcurrency ) ) ); // unlimited
346
347 // Performing asynch local concurrency requires a single processor.
348 // multiProcEvalFlag & asynchLocalAnalysisConcurrency can coexist provided
349 // that evalComm is divided into single-processor analysis servers.
350 if ( (multiProcEvalFlag && asynch_local_eval_flag) ||
351 (multiProcAnalysisFlag && asynchLocalAnalysisFlag) ) {
352 issue_flag = true;
353 if (iteratorCommRank == 0) {
354 if (warn) Cerr << "Warning: ";
355 else Cerr << "Error: ";
356 Cerr << "asynchronous local jobs are not supported for multiprocessor\n"
357 << " communicator partitions.";
358 if (warn) Cerr << " This issue may be resolved at run time.";
359 else Cerr << " Your processor allocation may need adjustment.";
360 Cerr << std::endl;
361 }
362 }
363 return issue_flag;
364 }
365
366 /// form and return the final batch ID tag
367
368 String ApplicationInterface::
final_batch_id_tag()369 final_batch_id_tag() {
370 return evalTagPrefix + "." + std::to_string(batchIdCntr);
371 }
372
373 String ApplicationInterface::
final_eval_id_tag(int iface_eval_id)374 final_eval_id_tag(int iface_eval_id)
375 {
376 if (appendIfaceId) {
377 if(batchEval)
378 return evalTagPrefix + "." + std::to_string(batchIdCntr) + "." + std::to_string(iface_eval_id);
379 else
380 return evalTagPrefix + "." + std::to_string(iface_eval_id);
381 } else
382 return evalTagPrefix;
383 }
384
385
386
387 //void ApplicationInterface::free_communicators()
388 //{ }
389
390
391 /** The function evaluator for application interfaces. Called from
392 derived_evaluate() and derived_evaluate_nowait() in derived Model
393 classes. If asynch_flag is not set, perform a blocking evaluation
394 (using derived_map()). If asynch_flag is set, add the job to the
395 beforeSynchCorePRPQueue queue for execution by one of the scheduler
396 routines in synchronize() or synchronize_nowait(). Duplicate
397 function evaluations are detected with duplication_detect(). */
void ApplicationInterface::map(const Variables& vars, const ActiveSet& set,
                               Response& response, bool asynch_flag)
{
  ++evalIdCntr; // all calls to map for this interface instance
  const ShortArray& asv = set.request_vector();
  size_t num_fns = asv.size();
  // Phase 1: bookkeeping -- per-function request counters (values/grads/
  // Hessians from the ASV bits) and evaluation header echo
  if (fineGrainEvalCounters) { // detailed evaluation reporting
    init_evaluation_counters(num_fns);
    for (size_t i=0; i<num_fns; ++i) {
      short asv_val = asv[i];
      if (asv_val & 1) ++fnValCounter[i];
      if (asv_val & 2) ++fnGradCounter[i];
      if (asv_val & 4) ++fnHessCounter[i];
    }
    if (fnLabels.empty())
      fnLabels = response.function_labels();
  }
  if (outputLevel > SILENT_OUTPUT) {
    // header width differs for named vs anonymous ("NO_ID"/empty) interfaces
    if (interfaceId.empty() || interfaceId == "NO_ID")
      Cout << "\n---------------------\nBegin ";
    else
      Cout << "\n------------------------------\nBegin "
           << std::setw(8) << interfaceId << ' ';
    Cout << "Evaluation " << std::setw(4) << evalIdCntr;
    // This may be more confusing than helpful:
    //if (evalIdRefPt)
    //  Cout << " (local evaluation " << evalIdCntr - evalIdRefPt << ")";
    if (interfaceId.empty() || interfaceId == "NO_ID") Cout << "\n---------------------\n";
    else Cout << "\n------------------------------\n";
  }
  if (outputLevel > QUIET_OUTPUT)
    Cout << "Parameters for evaluation " << evalIdCntr << ":\n" << vars << '\n';

  response.active_set(set); // responseActiveSet = set for duplicate search

  // Phase 2: subdivide ActiveSet for algebraic_mappings() and derived_map()
  Response algebraic_resp, core_resp; // empty handles
  ActiveSet core_set;

  if (algebraicMappings) {
    if (evalIdCntr == 1)
      init_algebraic_mappings(vars, response);

    // Always allocate a separate algebraic_resp, even if no coreMappings.
    // Cannot share a rep with the incoming response, because even if only
    // algebraic mappings are present, they may need reordering to form the
    // requested set and response.
    ActiveSet algebraic_set;
    asv_mapping(set, algebraic_set, core_set);
    algebraic_resp = Response(sharedRespData, algebraic_set);
    if (asynch_flag) {
      ParamResponsePair prp(vars, interfaceId, algebraic_resp, evalIdCntr);
      beforeSynchAlgPRPQueue.insert(prp);
    }
    else
      algebraic_mappings(vars, algebraic_set, algebraic_resp);

    if (coreMappings) { // both core and algebraic mappings active
      // separate core_resp from response
      core_resp = response.copy();
      core_resp.active_set(core_set);
    }
  }
  else if (coreMappings) { // analysis_driver mappings only
    core_set  = set;
    core_resp = response; // shared rep: no need for response_mapping()
  }

  // Phase 3: detect duplicates, or queue (asynch) / execute (synch) the
  // core mapping through the analysis drivers
  bool duplicate = false;
  if (coreMappings) {
    if (evalCacheFlag && duplication_detect(vars, core_resp, asynch_flag)) {
      // catches duplication both in data_pairs (core evals already computed)
      // and in beforeSynchCorePRPQueue (core evals queued for processing).
      duplicate = true;
      if (outputLevel > SILENT_OUTPUT)
        Cout << "Duplication detected: analysis_drivers not invoked.\n";
    }
    else {

      //if ( partial_duplication_detect(vars, set, core_resp) ) {
      //  sets augmentFlag (for use by Response::read), adds to core_resp,
      //  and decrements the asv to be used in derived_map, but saves the
      //  original asv for resetting once everything's reintegrated.  The
      //  original asv must be restored prior to any I/O of the core_resp.
      //}

      // For new evaluations, manage the user's active_set_vector spec.
      //    on:  asv seen by user's interface may change per eval (default)
      //    off: asv seen by user's interface is constant for all evals
      if (!asvControlFlag) { // set ASV's to defaultASV for the mapping
        init_default_asv(num_fns);  // initialize if not already done
        core_set.request_vector(defaultASV); // DVV assigned above
        core_resp.active_set(core_set);
      }

      if (asynch_flag) { // multiple simultaneous evals. (local or parallel)
        // use this constructor since deep copies of vars/response are needed
        ParamResponsePair prp(vars, interfaceId, core_resp, evalIdCntr);
        beforeSynchCorePRPQueue.insert(prp);
        // jobs are not queued until call to synchronize() to allow dynamic
        // scheduling.  Response data headers and data_pair list insertion
        // appear in synchronize().
      }
      else { // local synchronous evaluation

        // bcast the job to other processors within peer 1 (if required)
        if (multiProcEvalFlag)
          broadcast_evaluation(evalIdCntr, vars, core_set);

        //common_input_filtering(vars);

        currEvalId = evalIdCntr;
        try { derived_map(vars, core_set, core_resp, currEvalId); }

        // a failed simulation raises FunctionEvalFailure; recovery policy
        // (abort/retry/recover/continuation) is applied by manage_failure()
        catch(const FunctionEvalFailure& fneval_except) {
          //Cout << "Caught FunctionEvalFailure in map; message: "
          //<< fneval_except.what() << std::endl;
          manage_failure(vars, core_set, core_resp, currEvalId);
        }

        //common_output_filtering(core_resp);

        if (evalCacheFlag || restartFileFlag) {
          // manage shallow/deep copy of vars/response with evalCacheFlag
          ParamResponsePair prp(vars, interfaceId, core_resp, currEvalId,
                                evalCacheFlag);
          if (evalCacheFlag)   data_pairs.insert(prp);
          if (restartFileFlag) parallelLib.write_restart(prp);
        }
      }
    }
  }

  // Phase 4: nonduplicate bookkeeping and final reporting / response
  // assembly (synchronous case only)
  if (!duplicate) {
    ++newEvalIdCntr; // nonduplicate evaluations (used ONLY in fn eval summary)
    if (fineGrainEvalCounters) { // detailed evaluation reporting
      const ShortArray& asv = set.request_vector();
      size_t i, num_fns = asv.size();
      for (i=0; i<num_fns; ++i) {
        short asv_val = asv[i];
        if (asv_val & 1) ++newFnValCounter[i];
        if (asv_val & 2) ++newFnGradCounter[i];
        if (asv_val & 4) ++newFnHessCounter[i];
      }
    }
  }

  if (asynch_flag) {
    // Output appears here to support core | algebraic | both
    if (!duplicate && outputLevel > SILENT_OUTPUT) {
      if(batchEval) Cout << "(Batch job ";
      else Cout << "(Asynchronous job ";
      Cout << evalIdCntr;
      if (interfaceId.empty() || interfaceId == "NO_ID") Cout << " added to queue)\n";
      else Cout << " added to " << interfaceId << " queue)\n";
    }
  }
  else {
    // call response_mapping even when no coreMapping, as even with
    // algebraic only, the functions may have to be reordered
    if (algebraicMappings)
      response_mapping(algebraic_resp, core_resp, response);

    if (outputLevel > QUIET_OUTPUT) {
      if (duplicate)
        Cout << "\nActive response data retrieved from database";
      else {
        Cout << "\nActive response data for ";
        if (!(interfaceId.empty() || interfaceId == "NO_ID")) Cout << interfaceId << ' ';
        Cout << "evaluation " << evalIdCntr;
      }
      Cout << ":\n" << response << std::endl;
    }
  }
}
573
574
575 /** Called from map() to check incoming evaluation request for
576 duplication with content of data_pairs and beforeSynchCorePRPQueue.
577 If duplication is detected, return true, else return false. Manage
578 bookkeeping with historyDuplicateMap and beforeSynchDuplicateMap.
579 Note that the list searches can get very expensive if a long list
580 is searched on every new function evaluation (either from a large
581 number of previous jobs, a large number of pending jobs, or both).
582 For this reason, a user request for deactivation of the evaluation
583 cache results in a complete bypass of duplication_detect(), even
584 though a beforeSynchCorePRPQueue search would still be meaningful.
585 Since the intent of this request is to streamline operations, both
586 list searches are bypassed. */
bool ApplicationInterface::
duplication_detect(const Variables& vars, Response& response, bool asynch_flag)
{
  // Two flavors of cache lookup are supported: exact and tolerance-based.
  // Note 1: incoming response's responseActiveSet was updated in map(), but
  //   the rest of response is out-of-date (the previous fn. eval).  Due to
  //   set_compare(), the desired response set could be a subset of the
  //   data_pairs response -> use update().
  // Note 2: If duplication detected with an eval from restart/file import,
  //   promote the database eval id to current run status to enable more
  //   meaningful lookups (e.g., eval id of final result) downstream.  There
  //   are several possible bookkeeping update approaches; here, we choose to
  //   delete the old record and readd it with original ASV content and
  //   updated eval id.  This approach is efficient and doesn't discard any
  //   data, but has the downside that the record's ASV may contain more than
  //   the current request.  Another reasonable approach would be to keep the
  //   old record (original ASV) and add a new one (with the current ASV
  //   subset), but this is less efficient and complicates eval id management
  //   in downstream lookups (multiple records can match a particular
  //   Ids/Vars/Set lookup, requiring an additional test to prefer positive
  //   id's in some use cases).
  PRPCacheOIter ord_it; PRPCacheHIter hash_it;
  ParamResponsePair cache_pr; int cache_eval_id; bool cache_hit = false;
  if (nearbyDuplicateDetect) { // slow but allows tolerance on equality
    ord_it = lookup_by_nearby_val(data_pairs, interfaceId, vars,
                                  response.active_set(), nearbyTolerance);
    cache_hit = (ord_it != data_pairs.end());
    if (cache_hit) { // ordered-specific updates (shared updates below)
      response.update(ord_it->response());
      cache_eval_id = ord_it->eval_id();
      // non-positive id = restart/import record: extract and erase so it can
      // be re-inserted below with a promoted (current-run) eval id
      if (cache_eval_id <= 0)
        { cache_pr = *ord_it; data_pairs.erase(ord_it); }
    }
  }
  else { // fast but requires exact binary match
    hash_it = lookup_by_val(data_pairs, interfaceId, vars,
                            response.active_set());
    cache_hit = (hash_it != data_pairs.get<hashed>().end());
    if (cache_hit) { // hashed-specific updates (shared updates below)
      response.update(hash_it->response());
      cache_eval_id = hash_it->eval_id();
      if (cache_eval_id <= 0)
        { cache_pr = *hash_it; data_pairs.get<hashed>().erase(hash_it); }
    }
  }
  if (cache_hit) { // updates shared among ordered/hashed lookups
    if (cache_eval_id <= 0) {
      // ordered key is const; must remove (above) & change/add (below)
      cache_pr.eval_id(evalIdCntr); // promote
      data_pairs.insert(cache_pr);  // shallow copy of previous vars/resp
    }

    if (asynch_flag) // asynch case: bookkeep for retrieval in synchronize()
      historyDuplicateMap[evalIdCntr] = response.copy();

    return true; // Duplication detected
  }

  // check beforeSynchCorePRPQueue as well (if asynchronous and no cache hit)
  if (asynch_flag) {
    // queue lookups only support one flavor for simplicity: exact lookup
    PRPQueueHIter queue_it = lookup_by_val(beforeSynchCorePRPQueue, interfaceId,
                                           vars, response.active_set());
    if (queue_it != beforeSynchCorePRPQueue.get<hashed>().end()) {
      // Duplication detected: bookkeep the queued job's iterator plus a copy
      // of the response for resolution in synchronize()
      beforeSynchDuplicateMap[evalIdCntr]
        = std::make_pair(queue_it, response.copy());
      return true; // Duplication detected
    }
  }

  return false; // Duplication not detected
}
659
660 /** If the user has specified active_set_vector as off, then map()
661 uses a default ASV which is constant for all function evaluations
662 (so that the user need not check the content of the ASV on each
663 evaluation). Only initialized if needed and not already sized. */
init_default_asv(size_t num_fns)664 void ApplicationInterface::init_default_asv(size_t num_fns) {
665 if (!asvControlFlag && defaultASV.size() != num_fns) {
666 short asv_value = 1;
667 if (gradientType == "analytic")
668 asv_value |= 2;
669 if (hessianType == "analytic")
670 asv_value |= 4;
671 defaultASV.assign(num_fns, asv_value);
672 // TODO: the mixed ID sizes from the problem DB may not be
673 // commensurate with num_fns due to Recast transformations (MO
674 // reduce or experiment data); consider managing this in Model
675 if (gradientType == "mixed") {
676 ISCIter cit = gradMixedAnalyticIds.begin();
677 ISCIter cend = gradMixedAnalyticIds.end();
678 for ( ; cit != cend; ++cit)
679 defaultASV[*cit - 1] |= 2;
680 }
681 if (hessianType == "mixed") {
682 ISCIter cit = hessMixedAnalyticIds.begin();
683 ISCIter cend = hessMixedAnalyticIds.end();
684 for ( ; cit != cend; ++cit)
685 defaultASV[*cit - 1] |= 4;
686 }
687 }
688 }
689
690
691 /** This function provides blocking synchronization for all cases of
692 asynchronous evaluations, including the local asynchronous case
693 (background system call, nonblocking fork, & multithreads), the
694 message passing case, and the hybrid case. Called from
695 derived_synchronize() in derived Model classes. */
const IntResponseMap& ApplicationInterface::synchronize()
{
  // Start from a clean slate: rawResponseMap aggregates completions from
  // cached responses, history/queue duplicates, and scheduled evaluations,
  // and is returned by reference to the caller.
  rawResponseMap.clear();

  // snapshot sizes up front since these maps are consumed/cleared below
  size_t cached_eval = cachedResponseMap.size(),
    hist_duplicates = historyDuplicateMap.size(),
    queue_duplicates = beforeSynchDuplicateMap.size();

  // Process cached responses
  if (cached_eval) std::swap(rawResponseMap, cachedResponseMap);

  // Process history duplicates (see duplication_detect) since response data
  // has been extracted from the data_pairs list. These duplicates are not
  // written to data_pairs or write_restart.
  if (hist_duplicates) {
    // swap is an O(1) transfer when rawResponseMap is still empty
    if (rawResponseMap.empty()) std::swap(rawResponseMap, historyDuplicateMap);
    else {
      rawResponseMap.insert(historyDuplicateMap.begin(),
                            historyDuplicateMap.end());
      historyDuplicateMap.clear();
    }
  }

  if (coreMappings) {
    size_t core_prp_jobs = beforeSynchCorePRPQueue.size();
    Cout << "\nBlocking synchronize of " << core_prp_jobs << " asynchronous ";
    if (!(interfaceId.empty() || interfaceId == "NO_ID")) Cout << interfaceId << ' ';
    Cout << "evaluations";
    if (cached_eval || hist_duplicates || queue_duplicates)
      Cout << ", " << cached_eval << " cached evaluations, and "
           << hist_duplicates + queue_duplicates << " duplicates";
    Cout << std::endl;

    // Process nonduplicate evaluations for either the message passing or local
    // asynchronous case.
    if (core_prp_jobs) {
      if (ieMessagePass) { // single or multi-processor servers
        if (ieDedMasterFlag) master_dynamic_schedule_evaluations();
        else {
          // utilize asynch local evals to accomplish a dynamic peer schedule
          // (even if hybrid mode not specified) unless precluded by direct
          // interface, multiProcEvalFlag (includes single proc analysis cases),
          // static scheduling override, or static asynch local specification.
          if ( asynchLocalEvalStatic || multiProcEvalFlag ||
               (interfaceType & DIRECT_INTERFACE_BIT) ||
               evalScheduling == PEER_STATIC_SCHEDULING )
            peer_static_schedule_evaluations();
          else // utilizes asynch local evals even if hybrid mode not specified
            peer_dynamic_schedule_evaluations();
        }
      }
      else // local to processor
        asynchronous_local_evaluations(beforeSynchCorePRPQueue);
    }
  }
  else if (!beforeSynchAlgPRPQueue.empty()) {
    Cout << "\nBlocking synchronize of " << beforeSynchAlgPRPQueue.size();
    if (!(interfaceId.empty() || interfaceId == "NO_ID")) Cout << ' ' << interfaceId;
    Cout << " algebraic mappings" << std::endl;
  }

  // Now that beforeSynchCorePRPQueue processing is complete, process duplicates
  // detected within beforeSynchCorePRPQueue (see duplication_detect).
  if (queue_duplicates) {
    for (std::map<int, std::pair<PRPQueueHIter, Response> >::const_iterator
         bsd_iter = beforeSynchDuplicateMap.begin();
         bsd_iter != beforeSynchDuplicateMap.end(); bsd_iter++) {
      // due to id_vars_set_compare, the desired response set could be a subset
      // of the beforeSynchCorePRPQueue duplicate response -> use update().
      rawResponseMap[bsd_iter->first] = (bsd_iter->second).second;
      rawResponseMap[bsd_iter->first].update(
        (bsd_iter->second).first->response());
    }
    beforeSynchDuplicateMap.clear();
  }
  beforeSynchCorePRPQueue.clear();

  // Merge core mappings and algebraic mappings into rawResponseMap
  if (algebraicMappings) { // complete all algebraic jobs and overlay
    for (PRPQueueIter alg_prp_it = beforeSynchAlgPRPQueue.begin();
         alg_prp_it != beforeSynchAlgPRPQueue.end(); alg_prp_it++) {
      Response alg_response = alg_prp_it->response();
      algebraic_mappings(alg_prp_it->variables(), alg_prp_it->active_set(),
                         alg_response);
      if (coreMappings) {
        // overlay the algebraic results onto the existing core response
        Response& response = rawResponseMap[alg_prp_it->eval_id()];
        response_mapping(alg_response, response, response);
      }
      else {
        // call response_mapping even when no coreMapping, as even with
        // algebraic only, the functions may have to be reordered

        // Recreate total_response with the correct (possibly
        // reordered) ASV since when no CoreMapping, rawResponseMap
        // doesn't have a valid Response to update
        ActiveSet total_set(alg_prp_it->active_set());
        asv_mapping(alg_prp_it->active_set(), total_set);
        Response total_response = Response(sharedRespData, total_set);
        response_mapping(alg_response, total_response, total_response);
        rawResponseMap[alg_prp_it->eval_id()] = total_response;
      }
    }
    beforeSynchAlgPRPQueue.clear();
  }

  if (outputLevel > QUIET_OUTPUT) // output completed responses
    for (IntRespMCIter rr_iter = rawResponseMap.begin();
         rr_iter != rawResponseMap.end(); ++rr_iter) {
      Cout << "\nActive response data for ";
      if (!(interfaceId.empty() || interfaceId == "NO_ID")) Cout << interfaceId << ' ';
      Cout << "evaluation " << rr_iter->first
           << ":\n" << rr_iter->second;
    }

  return rawResponseMap;
}
812
813
814 /** This function provides nonblocking synchronization for the local
815 asynchronous case and selected nonblocking message passing schedulers.
816 Called from derived_synchronize_nowait() in derived Model classes. */
const IntResponseMap& ApplicationInterface::synchronize_nowait()
{
  // rawResponseMap only carries the evaluations that completed on THIS pass;
  // pending jobs remain queued for subsequent calls.
  rawResponseMap.clear();

  // snapshot sizes up front since these maps are consumed/cleared below
  size_t cached_eval = cachedResponseMap.size(),
    hist_duplicates = historyDuplicateMap.size(),
    queue_duplicates = beforeSynchDuplicateMap.size();

  if (coreMappings) {
    size_t core_prp_jobs = beforeSynchCorePRPQueue.size();
    // suppress repeated header output for longer jobs;
    // don't need to check queue_duplicates since none w/o core queue
    if ( headerFlag && (core_prp_jobs || hist_duplicates) ) {
      Cout << "\nNonblocking synchronize of " << core_prp_jobs
           << " asynchronous ";
      if (!(interfaceId.empty() || interfaceId == "NO_ID")) Cout << interfaceId << ' ';
      Cout << "evaluations";
      if (cached_eval || hist_duplicates || queue_duplicates)
        Cout << ", " << cached_eval << " cached evaluations, and "
             << hist_duplicates + queue_duplicates << " duplicates";
      Cout << std::endl;
    }

    // Test nonduplicate evaluations and add completions to rawResponseMap
    if (core_prp_jobs) {
      if (ieMessagePass) { // single or multi-processor servers
        if (ieDedMasterFlag)
          master_dynamic_schedule_evaluations_nowait();
        else {
          // prefer to use peer_dynamic to avoid blocking on local jobs, as
          // is consistent with nowait requirement; however, a fallback to
          // peer_static is needed for the special cases listed below:
          if ( asynchLocalEvalStatic || multiProcEvalFlag ||
               (interfaceType & DIRECT_INTERFACE_BIT) ||
               evalScheduling == PEER_STATIC_SCHEDULING )
            peer_static_schedule_evaluations_nowait();
          else
            peer_dynamic_schedule_evaluations_nowait();
        }
      }
      else // local to processor
        asynchronous_local_evaluations_nowait(beforeSynchCorePRPQueue);
    }
    // suppress header on next pass if no new completions on this pass
    headerFlag = !rawResponseMap.empty();
  }
  else if (!beforeSynchAlgPRPQueue.empty()) {
    Cout << "\nNonblocking synchronize of " << beforeSynchAlgPRPQueue.size();
    if (!(interfaceId.empty() || interfaceId == "NO_ID")) Cout << ' ' << interfaceId;
    Cout << " algebraic mappings" << std::endl;
  }

  // Since beforeSynchCorePRPQueue processing will not in general be completed,
  // process duplicates listed in beforeSynchDuplicateMap only if the
  // original/nonduplicate beforeSynchCorePRPQueue job is complete.
  if (queue_duplicates && !rawResponseMap.empty())
    for (std::map<int, std::pair<PRPQueueHIter, Response> >::iterator
         bsd_iter = beforeSynchDuplicateMap.begin();
         bsd_iter != beforeSynchDuplicateMap.end(); bsd_iter++) {
      const ParamResponsePair& scheduled_pr = *(bsd_iter->second).first;
      if (rawResponseMap.find(scheduled_pr.eval_id()) != rawResponseMap.end()) {
        // due to id_vars_set_compare, the desired response set could be
        // a subset of the duplicate response -> use update().
        Response& response = (bsd_iter->second).second;
        response.update(scheduled_pr.response());
        rawResponseMap[bsd_iter->first] = response;
      }
    }

  // Process history duplicates (see duplication_detect) and cached evaluations.
  // In the _nowait case, this goes after the schedulers so as to not interfere
  // with rawResponseMap usage in the schedulers and after the
  // beforeSynchDuplicates in order to streamline their rawResponseMap searches.
  // Note: since data_pairs is checked first in duplication_detect(), it is not
  // possible to have a beforeSynchDuplicateMap entry that references a
  // historyDuplicateMap entry.
  if (cached_eval) {
    rawResponseMap.insert(cachedResponseMap.begin(), cachedResponseMap.end());
    cachedResponseMap.clear(); headerFlag = true;
  }
  if (hist_duplicates) {
    rawResponseMap.insert(historyDuplicateMap.begin(),
                          historyDuplicateMap.end());
    historyDuplicateMap.clear(); headerFlag = true;
  }

  // Merge core mappings and algebraic mappings into rawResponseMap
  if (coreMappings && algebraicMappings) { // update completed core jobs
    for (IntRespMIter rr_iter = rawResponseMap.begin();
         rr_iter != rawResponseMap.end(); ++rr_iter) {
      // only overlay algebraic data for core jobs that finished this pass
      PRPQueueIter alg_prp_it
        = lookup_by_eval_id(beforeSynchAlgPRPQueue, rr_iter->first);
      Response alg_response = alg_prp_it->response(); // shared rep
      algebraic_mappings(alg_prp_it->variables(), alg_prp_it->active_set(),
                         alg_response); // update rep
      response_mapping(alg_response, rr_iter->second, rr_iter->second);
      beforeSynchAlgPRPQueue.erase(alg_prp_it);
    }
  }
  else if (algebraicMappings) { // complete all algebraic jobs
    for (PRPQueueIter alg_prp_it = beforeSynchAlgPRPQueue.begin();
         alg_prp_it != beforeSynchAlgPRPQueue.end(); alg_prp_it++) {

      Response algebraic_resp = alg_prp_it->response(); // shared rep
      algebraic_mappings(alg_prp_it->variables(), alg_prp_it->active_set(),
                         algebraic_resp); // update rep
      // call response_mapping even when no coreMapping, as even with
      // algebraic only, the functions may have to be reordered

      // Recreate total_response with the correct (possibly reordered)
      // ASV since when no CoreMapping, rawResponseMap doesn't have a
      // valid Response to update
      ActiveSet total_set(alg_prp_it->active_set());
      asv_mapping(alg_prp_it->active_set(), total_set);
      Response total_response = Response(sharedRespData, total_set);
      response_mapping(algebraic_resp, total_response, total_response);
      rawResponseMap[alg_prp_it->eval_id()] = total_response;
    }
    beforeSynchAlgPRPQueue.clear();
  }

  // Report and clean up bookkeeping for every evaluation returned this pass
  for (IntRespMCIter rr_iter = rawResponseMap.begin();
       rr_iter != rawResponseMap.end(); ++rr_iter) {
    int fn_eval_id = rr_iter->first;
    // output completed responses
    if (outputLevel > QUIET_OUTPUT) {
      Cout << "\nActive response data for ";
      if (!(interfaceId.empty() || interfaceId == "NO_ID")) Cout << interfaceId << ' ';
      Cout << "evaluation " << fn_eval_id << ":\n" << rr_iter->second;
    }
    // clean up bookkeeping
    if (coreMappings) {
      PRPQueueIter prp_iter
        = lookup_by_eval_id(beforeSynchCorePRPQueue, fn_eval_id);
      if (prp_iter != beforeSynchCorePRPQueue.end()) // duplicates not in list
        beforeSynchCorePRPQueue.erase(prp_iter);
      beforeSynchDuplicateMap.erase(fn_eval_id); // if present
    }
  }

  return rawResponseMap;
}
959
960
961 /** This code is called from synchronize() to provide the master portion of
962 a master-slave algorithm for the dynamic scheduling of evaluations among
963 slave servers. It performs no evaluations locally and matches either
964 serve_evaluations_synch() or serve_evaluations_asynch() on the slave
965 servers, depending on the value of asynchLocalEvalConcurrency. Dynamic
966 scheduling assigns jobs in 2 passes. The 1st pass gives each server
967 the same number of jobs (equal to asynchLocalEvalConcurrency). The
968 2nd pass assigns the remaining jobs to slave servers as previous jobs
969 are completed and returned. Single- and multilevel parallel use intra-
970 and inter-communicators, respectively, for send/receive. Specific
971 syntax is encapsulated within ParallelLibrary. */
void ApplicationInterface::master_dynamic_schedule_evaluations()
{
  // total concurrent capacity = servers x per-server asynch concurrency
  int capacity = numEvalServers;
  if (asynchLocalEvalConcurrency > 1) capacity *= asynchLocalEvalConcurrency;
  int num_jobs = beforeSynchCorePRPQueue.size(),
      num_sends = std::min(capacity, num_jobs);
  Cout << "Master dynamic schedule: first pass assigning " << num_sends
       << " jobs among " << numEvalServers << " servers\n";

  // only need num_sends entries (not num_jobs) due to reuse
  sendBuffers  = new MPIPackBuffer   [num_sends];
  recvBuffers  = new MPIUnpackBuffer [num_sends];
  recvRequests = new MPI_Request     [num_sends];

  // send data & post receives for 1st set of jobs
  int i, server_id, fn_eval_id;
  PRPQueueIter prp_iter;
  for (i=0, prp_iter = beforeSynchCorePRPQueue.begin(); i<num_sends;
       ++i, ++prp_iter) {
    server_id = i%numEvalServers + 1; // from 1 to numEvalServers
    send_evaluation(prp_iter, i, server_id, false); // !peer
  }

  // schedule remaining jobs
  if (num_sends < num_jobs) {
    Cout << "Master dynamic schedule: second pass scheduling "
         << num_jobs-num_sends << " remaining jobs\n";
    int send_cntr = num_sends, recv_cntr = 0, out_count;
    MPI_Status* status_array = new MPI_Status [num_sends];
    int* index_array = new int [num_sends];
    PRPQueueIter return_iter;
    // loop until every job (first pass + backfilled) has been received
    while (recv_cntr < num_jobs) {
      if (outputLevel > SILENT_OUTPUT)
        Cout << "Master dynamic schedule: waiting on completed jobs"<<std::endl;
      parallelLib.waitsome(num_sends, recvRequests, out_count, index_array,
                           status_array);
      recv_cntr += out_count;
      for (i=0; i<out_count; ++i) {
        int index = index_array[i]; // index of recv_request that completed
        // buffer index determines the server that was assigned in the first
        // pass (and on each backfill, since the same slot is reused)
        server_id = index%numEvalServers + 1; // from 1 to numEvalServers
        // the eval id is carried back in the MPI message tag
        fn_eval_id = status_array[i].MPI_TAG;
        return_iter = lookup_by_eval_id(beforeSynchCorePRPQueue, fn_eval_id);
        receive_evaluation(return_iter, index, server_id, false); //!peer
        // backfill the freed slot/server with the next queued job, if any
        if (send_cntr < num_jobs) {
          send_evaluation(prp_iter, index, server_id, false); // !peer
          ++send_cntr; ++prp_iter;
        }
      }
    }
    delete [] status_array;
    delete [] index_array;
  }
  else { // all jobs assigned in first pass
    if (outputLevel > SILENT_OUTPUT)
      Cout << "Master dynamic schedule: waiting on all jobs" << std::endl;
    parallelLib.waitall(num_jobs, recvRequests);
    // All buffers received, now generate rawResponseMap
    for (i=0, prp_iter = beforeSynchCorePRPQueue.begin(); i<num_jobs;
         ++i, ++prp_iter) {
      server_id = i%numEvalServers + 1; // from 1 to numEvalServers
      receive_evaluation(prp_iter, i, server_id, false);
    }
  }
  // deallocate MPI & buffer arrays
  delete [] sendBuffers;  sendBuffers  = NULL;
  delete [] recvBuffers;  recvBuffers  = NULL;
  delete [] recvRequests; recvRequests = NULL;
}
1040
1041
1042 /** This code runs on the iteratorCommRank 0 processor (the iterator) and is
1043 called from synchronize() in order to manage a static schedule for cases
1044 where peer 1 must block when evaluating its local job allocation (e.g.,
1045 single or multiprocessor direct interface evaluations). It matches
1046 serve_evaluations_peer() for any other processors within the first
1047 evaluation partition and serve_evaluations_{synch,asynch}() for all other
1048 evaluation partitions (depending on asynchLocalEvalConcurrency). It
1049 performs function evaluations locally for its portion of the job
1050 allocation using either asynchronous_local_evaluations() or
1051 synchronous_local_evaluations(). Single-level and multilevel parallel
1052 use intra- and inter-communicators, respectively, for send/receive.
1053 Specific syntax is encapsulated within ParallelLibrary. The
1054 iteratorCommRank 0 processor assigns the static schedule since it is the
1055 only processor with access to beforeSynchCorePRPQueue (it runs the
1056 iterator and calls synchronize). The alternate design of each peer
1057 selecting its own jobs using the modulus operator would be applicable if
1058 execution of this function (and therefore the job list) were distributed. */
peer_static_schedule_evaluations()1059 void ApplicationInterface::peer_static_schedule_evaluations()
1060 {
1061 // rounding down num_peer1_jobs offloads this processor (which has additional
1062 // work relative to other peers), but results in a few more passed messages.
1063 int num_jobs = beforeSynchCorePRPQueue.size(),
1064 num_peer1_jobs = (int)std::floor((Real)num_jobs/numEvalServers),
1065 num_sends = num_jobs - num_peer1_jobs;
1066 Cout << "Peer static schedule: assigning " << num_jobs << " jobs among "
1067 << numEvalServers << " peers\n";
1068 sendBuffers = new MPIPackBuffer [num_sends];
1069 recvBuffers = new MPIUnpackBuffer [num_sends];
1070 recvRequests = new MPI_Request [num_sends];
1071 int i, server_id, fn_eval_id;
1072
1073 // Assign jobs locally + remotely using a round-robin assignment. Since
1074 // this is a static schedule, all remote job assignments are sent now. This
1075 // assignment is not dependent on the capacity of the other peers (i.e.,
1076 // on whether they run serve_evaluation_synch or serve_evaluation_asynch).
1077 PRPQueueIter prp_iter = beforeSynchCorePRPQueue.begin();
1078 PRPQueue local_prp_queue; size_t buff_index = 0;
1079 for (i=1; i<=num_jobs; ++i, ++prp_iter) { // shift by 1 to reduce peer 1 work
1080 server_id = i%numEvalServers; // 0 to numEvalServers-1
1081 if (server_id) { // 1 to numEvalServers-1
1082 send_evaluation(prp_iter, buff_index, server_id, true); // peer
1083 ++buff_index;
1084 }
1085 else
1086 local_prp_queue.insert(*prp_iter);
1087 }
1088 // This simple approach is not best for hybrid mode + asynchLocalEvalStatic:
1089 // Peer 1 retains the first num_peer1_jobs and spreads the rest to peers 2
1090 // through n in a round-robin assignment.
1091 //PRPQueueIter prp_iter = beforeSynchCorePRPQueue.begin();
1092 //std::advance(prp_iter, num_peer1_jobs); // offset PRP list by num_peer1_jobs
1093 //PRPQueueIter prp_iter_save = prp_iter;
1094 //for (i=0; i<num_sends; ++i, ++prp_iter) {
1095 // server_id = i%(numEvalServers-1) + 1; // 1 to numEvalServers-1
1096 // send_evaluation(prp_iter, i, server_id, true); // peer
1097 //}
1098 // Perform computation for first num_peer1_jobs jobs on peer 1.
1099 //PRPQueue local_prp_queue(beforeSynchCorePRPQueue.begin(), prp_iter_save);
1100
1101 // Perform computations on peer 1. Default behavior is synchronous evaluation
1102 // of jobs on each peer. Only if asynchLocalEvalConcurrency > 1 do we get the
1103 // hybrid parallelism of asynch jobs on each peer (asynchLocalEvalConcurrency
1104 // serves a dual role: throttles concurrency if asynch local alone, and
1105 // multiplies concurrency if hybrid).
1106 if (asynchLocalEvalConcurrency > 1) { // peer_dynamic is default in this case
1107 Cout << "Peer static schedule: peer 1 scheduling " << num_peer1_jobs
1108 << " local jobs\n";
1109 asynchronous_local_evaluations(local_prp_queue);
1110 }
1111 else { // 1 synchronous job at a time
1112 Cout << "Peer static schedule: peer 1 evaluating " << num_peer1_jobs
1113 << " local jobs\n";
1114 synchronous_local_evaluations(local_prp_queue);
1115 }
1116 // reassign used to be required for beforeSynchDuplicates to work properly in
1117 // synchronize(), but is now unnecessary due to response representation
1118 // sharing between local_prp_queue and beforeSynchCorePRPQueue.
1119 //for (i=0; i<num_peer1_jobs; ++i)
1120 // beforeSynchCorePRPQueue[core_index].response(
1121 // local_prp_queue[i].response());
1122
1123 if (num_sends) { // Retrieve results from peers
1124 if (outputLevel > SILENT_OUTPUT)
1125 Cout << "Peer static schedule: waiting on assigned jobs" << std::endl;
1126 parallelLib.waitall(num_sends, recvRequests);
1127
1128 // All buffers received, now generate rawResponseMap
1129 prp_iter = beforeSynchCorePRPQueue.begin(); buff_index = 0;
1130 for (i=1; i<=num_jobs; ++i, ++prp_iter) {// shift by 1 to reduce peer 1 work
1131 server_id = i%numEvalServers; // 0 to numEvalServers-1
1132 if (server_id) {
1133 receive_evaluation(prp_iter, buff_index, server_id, true); // peer
1134 ++buff_index;
1135 }
1136 }
1137 // Mirrors simple assignment approach above.
1138 //prp_iter = prp_iter_save; // offset start by num_peer1_jobs
1139 //for (i=0; i<num_sends; ++i, ++prp_iter) {
1140 // server_id = i%(numEvalServers-1) + 1; // 1 to numEvalServers-1
1141 // receive_evaluation(prp_iter, i, server_id, true); // peer
1142 //}
1143 }
1144
1145 // deallocate MPI & buffer arrays
1146 delete [] sendBuffers; sendBuffers = NULL;
1147 delete [] recvBuffers; recvBuffers = NULL;
1148 delete [] recvRequests; recvRequests = NULL;
1149 }
1150
1151
1152 /** This code runs on the iteratorCommRank 0 processor (the iterator) and is
1153 called from synchronize() in order to manage a dynamic schedule, as
1154 enabled by nonblocking management of local asynchronous jobs. It
1155 matches serve_evaluations_{synch,asynch}() for other evaluation
1156 partitions, depending on asynchLocalEvalConcurrency; it does not match
1157 serve_evaluations_peer() since, for local asynchronous jobs, the first
1158 evaluation partition cannot be multiprocessor. It performs function
1159 evaluations locally for its portion of the job allocation using
1160 asynchronous_local_evaluations_nowait(). Single-level and multilevel
1161 parallel use intra- and inter-communicators, respectively, for
1162 send/receive. Specific syntax is encapsulated within ParallelLibrary. */
peer_dynamic_schedule_evaluations()1163 void ApplicationInterface::peer_dynamic_schedule_evaluations()
1164 {
1165 size_t num_jobs = beforeSynchCorePRPQueue.size(),
1166 server_capacity = std::max(1, asynchLocalEvalConcurrency),
1167 total_capacity = numEvalServers * server_capacity,
1168 remote_capacity = total_capacity - server_capacity;
1169
1170 // avoid nonblocking local scheduling, if not required.
1171 // Note: this scheduler switch needs to be fully consistent with inter-comm
1172 // creation logic in init_communicators().
1173 //if (num_jobs <= remote_capacity && !multiProcEvalFlag)
1174 // { master_dynamic_schedule_evaluations(); return; }
1175
1176 // Use round-robin assignment, skipping peer1 on 1st round. Alternatively,
1177 // could minimize local job count, but this is less balanced (also consider
1178 // the memory budgeting for sims) and leads to less intuitive id assignments.
1179 size_t num_assign = std::min(total_capacity, num_jobs),
1180 num_local_assign = num_assign / numEvalServers, // truncates fractional
1181 num_remote_assign = num_assign - num_local_assign;
1182 Cout << "Peer dynamic schedule: first pass assigning " << num_remote_assign
1183 << " jobs among " << numEvalServers-1 << " remote peers\n";
1184 sendBuffers = new MPIPackBuffer [num_remote_assign];
1185 recvBuffers = new MPIUnpackBuffer [num_remote_assign];
1186 recvRequests = new MPI_Request [num_remote_assign];
1187 int i, server_id, fn_eval_id;
1188 PRPQueueIter assign_iter = beforeSynchCorePRPQueue.begin();
1189 PRPQueue local_prp_queue; size_t buff_index = 0;
1190 for (i=1; i<=num_assign; ++i, ++assign_iter) {//shift +1 to prefer remote work
1191 server_id = i%numEvalServers; // 0 to numEvalServers-1
1192 if (server_id) { // 1 to numEvalServers-1
1193 send_evaluation(assign_iter, buff_index, server_id, true); // peer
1194 msgPassRunningMap[assign_iter->eval_id()]
1195 = IntSizetPair(server_id, buff_index);
1196 ++buff_index;
1197 }
1198 else
1199 local_prp_queue.insert(*assign_iter);
1200 }
1201
1202 // Start nonblocking asynch local computations on peer 1
1203 Cout << "Peer dynamic schedule: first pass launching " << num_local_assign
1204 << " local jobs\n";
1205 // "Step 1" of asynch_local_evaluations_nowait()
1206 PRPQueueIter local_prp_iter;
1207 assign_asynch_local_queue(local_prp_queue, local_prp_iter);
1208
1209 // block until local and remote scheduling are complete
1210 if (outputLevel > SILENT_OUTPUT && num_jobs > num_assign)
1211 Cout << "Peer dynamic schedule: second pass scheduling "
1212 << num_jobs - num_assign << " remaining jobs" << std::endl;
1213 size_t recv_cntr = 0;
1214 while (recv_cntr < num_jobs) {
1215 // process completed message passing jobs and backfill
1216 recv_cntr += test_receives_backfill(assign_iter, true); // peer
1217 // "Step 2" and "Step 3" of asynch_local_evaluations_nowait()
1218 recv_cntr += test_local_backfill(beforeSynchCorePRPQueue, assign_iter);
1219 }
1220
1221 // deallocate MPI & buffer arrays
1222 delete [] sendBuffers; sendBuffers = NULL;
1223 delete [] recvBuffers; recvBuffers = NULL;
1224 delete [] recvRequests; recvRequests = NULL;
1225 }
1226
1227
1228 /** This function provides blocking synchronization for the local asynch
1229 case (background system call, nonblocking fork, or threads). It can
1230 be called from synchronize() for a complete local scheduling of all
1231 asynchronous jobs or from peer_{static,dynamic}_schedule_evaluations()
1232 to perform a local portion of the total job set. It uses
1233 derived_map_asynch() to initiate asynchronous evaluations and
1234 wait_local_evaluations() to capture completed jobs, and mirrors the
1235 master_dynamic_schedule_evaluations() message passing scheduler as much
1236 as possible (wait_local_evaluations() is modeled after MPI_Waitsome()). */
void ApplicationInterface::
asynchronous_local_evaluations(PRPQueue& local_prp_queue)
{
  // num_sends: first-pass launch count, capped by asynchLocalEvalConcurrency
  // when the user throttled local concurrency (0 spec => launch all jobs)
  size_t i, static_servers, server_index, num_jobs = local_prp_queue.size(),
    num_active, /*num_launch,*/ num_sends = (asynchLocalEvalConcurrency) ?
      std::min((size_t)asynchLocalEvalConcurrency, num_jobs) : num_jobs;
  // static_limited: jobs are pinned to "local servers" by eval id
  bool static_limited
    = (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1);
  if (static_limited)
    static_servers = asynchLocalEvalConcurrency * numEvalServers;

  // Step 1: first pass launching of jobs up to the local server capacity
  Cout << "First pass: initiating ";
  if (static_limited) Cout << "at most ";
  Cout << num_sends << " local asynchronous jobs\n";
  PRPQueueIter local_prp_iter;
  assign_asynch_local_queue(local_prp_queue, local_prp_iter);

  num_active = /*num_launch =*/ asynchLocalActivePRPQueue.size();
  if (num_active < num_jobs) {
    Cout << "Second pass: ";
    if (static_limited) Cout << "static ";
    Cout << "scheduling " << num_jobs - num_active
         << " remaining local asynchronous jobs\n";
  }

  size_t recv_cntr = 0, completed; bool launch;
  while (recv_cntr < num_jobs) {

    /*
    // SEND TERM ASAP
    if (multiProcEvalFlag && num_launch == num_jobs) {
      // stop serve_evaluation_{,a}synch_peer procs
      int fn_eval_id = 0;
      parallelLib.bcast_e(fn_eval_id);
    }
    */

    // Step 2: process completed jobs with wait_local_evaluations()
    if (outputLevel > SILENT_OUTPUT) {
      if(batchEval)
        Cout << "Waiting on completed batch" << std::endl;
      else
        Cout << "Waiting on completed jobs" << std::endl;
    }
    completionSet.clear();
    wait_local_evaluations(asynchLocalActivePRPQueue); // rebuilds completionSet
    recv_cntr += completed = completionSet.size();
    for (ISCIter id_iter = completionSet.begin();
         id_iter != completionSet.end(); ++id_iter)
      { process_asynch_local(*id_iter); --num_active; }

    // Step 3: backfill completed jobs with the next pending jobs (if present)
    if (static_limited) // reset to start of local queue
      local_prp_iter = local_prp_queue.begin();
    for (i=0; local_prp_iter != local_prp_queue.end(); ++i, ++local_prp_iter) {
      int fn_eval_id = local_prp_iter->eval_id();
      launch = false;
      if (static_limited) {
        // launch only if the job's designated server is free and the job is
        // neither currently active nor already completed
        server_index = (fn_eval_id - 1) % static_servers;
        if ( lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id) ==
             asynchLocalActivePRPQueue.end() &&
             rawResponseMap.find(fn_eval_id) == rawResponseMap.end() &&
             //all_completed.find(fn_eval_id) == all_completed.end() &&
             !localServerAssigned[server_index] )
          { launch = true; localServerAssigned.set(server_index); }
      }
      else {
        // dynamic case: replace exactly as many jobs as just completed
        if (i < completed) launch = true;
        else break;
      }

      if (launch) {
        launch_asynch_local(local_prp_iter); ++num_active; //++num_launch;
        if (static_limited && num_active == asynchLocalEvalConcurrency)
          break;
      }
    }
  }

  //clear_bookkeeping(); // clear any bookkeeping lists in derived classes
}
1319
1320
void ApplicationInterface::
assign_asynch_local_queue(PRPQueue& local_prp_queue,
                          PRPQueueIter& local_prp_iter)
{
  // This fn is used to assign an initial set of jobs; no local jobs should
  // be active at this point.
  if (!asynchLocalActivePRPQueue.empty()) {
    Cerr << "Error: ApplicationInterface::assign_asynch_local_queue() invoked "
         << "with existing asynch local jobs." << std::endl;
    abort_handler(-1);
  }

  // special data for static-scheduling case: asynch local concurrency is
  // limited and we need to stratify the job scheduling according to eval id.
  // This case has to handle non-contiguous eval IDs as it could happen with
  // restart; fill jobs until all servers busy or at end of queue -- this is
  // similar to nowait case.
  bool static_limited
    = (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1);
  size_t static_servers;
  if (static_limited) {
    static_servers = asynchLocalEvalConcurrency * numEvalServers;
    if (localServerAssigned.size() != static_servers)
      localServerAssigned.resize(static_servers);
    localServerAssigned.reset(); // in blocking case, always reset job map
  }
  // for static_limited, need an aggregated set of completions for job launch
  // testing (completionList only spans the current asynchLocalActivePRPQueue;
  // rawResponseMap spans beforeSynchCorePRPQueue jobs at the synch{,_nowait}()
  // level).  Since the incoming local_prp_queue can involve a subset of
  // beforeSynchCorePRPQueue, all_completed spans a desired intermediate level.
  // However, for purposes of identifying completed evaluations, presence in
  // the higher-level rawResponseMap scope is sufficient to indicate
  // completion, with a small penalty of searches within a larger queue.
  //IntSet all_completed; // track all completed evals within local_prp_queue

  int fn_eval_id, num_jobs = local_prp_queue.size();
  if (multiProcEvalFlag) // TO DO: deactivate this bcast
    parallelLib.bcast_e(num_jobs);
  // num_sends: first-pass launch count, capped by a nonzero concurrency spec
  size_t i, server_index, num_active = 0,
    num_sends = (asynchLocalEvalConcurrency) ?
      std::min(asynchLocalEvalConcurrency, num_jobs) : // limited by user spec.
      num_jobs; // unlimited (default): launch all jobs in first pass
  bool launch;

  // Step 1: launch jobs up to asynch concurrency limit (if specified)
  for (i=0, local_prp_iter = local_prp_queue.begin();
       local_prp_iter != local_prp_queue.end(); ++i, ++local_prp_iter) {
    launch = false;
    fn_eval_id = local_prp_iter->eval_id();
    if (static_limited) {
      // stratify by eval id: each job is pinned to a fixed "local server"
      server_index = (fn_eval_id - 1) % static_servers;
      if (!localServerAssigned[server_index]) // if local "server" not busy
        { launch = true; ++num_active; localServerAssigned.set(server_index); }
    }
    else {
      if (i<num_sends) launch = true;
      else break;
    }
    if (launch)
      launch_asynch_local(local_prp_iter);
    // stop early once every local server slot is occupied
    if (static_limited && num_active == asynchLocalEvalConcurrency)
      break;
  }
}
1386
1387
size_t ApplicationInterface::
test_receives_backfill(PRPQueueIter& assign_iter, bool peer_flag)
{
  // Nonblocking test of all in-flight remote evaluations; each completion is
  // received and, when pending jobs remain, its server/buffer slot is
  // immediately backfilled with a new assignment.  Returns the number of
  // completions received on this pass.
  if (outputLevel == DEBUG_OUTPUT)
    Cout << "Testing message receives from remote servers\n";

  int mpi_test_flag, fn_eval_id, new_eval_id, server_id;
  size_t buff_index;
  MPI_Status status; // only 1 needed for parallelLib.test()
  std::map<int, IntSizetPair>::iterator run_iter; PRPQueueIter return_iter;
  // removals: completed eval id -> replacement eval id (0 if none); applied
  // after the loop to avoid mutating msgPassRunningMap while iterating
  IntIntMap removals; size_t receives = 0;

  for (run_iter = msgPassRunningMap.begin();
       run_iter != msgPassRunningMap.end(); ++run_iter) {
    IntSizetPair& id_index = run_iter->second;
    buff_index = id_index.second;
    parallelLib.test(recvRequests[buff_index], mpi_test_flag, status);
    if (mpi_test_flag) {
      fn_eval_id = run_iter->first; // or status.MPI_TAG;
      server_id = id_index.first;
      return_iter = lookup_by_eval_id(beforeSynchCorePRPQueue, fn_eval_id);
      receive_evaluation(return_iter, buff_index, server_id, peer_flag);
      ++receives;

      // replace job if more are pending: advance assign_iter past jobs that
      // are already running remotely, running locally, or completed
      bool new_job = false;
      while (assign_iter != beforeSynchCorePRPQueue.end()) {
        new_eval_id = assign_iter->eval_id();
        if (msgPassRunningMap.find(new_eval_id) == msgPassRunningMap.end() &&
            lookup_by_eval_id(asynchLocalActivePRPQueue, new_eval_id) ==
            asynchLocalActivePRPQueue.end() &&
            rawResponseMap.find(new_eval_id) == rawResponseMap.end())
          { new_job = true; break; }
        ++assign_iter;
      }
      if (new_job) {
        // assign job, reusing the freed server and buffer slot
        send_evaluation(assign_iter, buff_index, server_id, peer_flag);
        // update bookkeeping
        removals[fn_eval_id] = new_eval_id; // replace old with new
        ++assign_iter;
      }
      else
        removals[fn_eval_id] = 0; // no replacement, just remove
    }
  }

  // update msgPassRunningMap.  This is not done inside loop above to:
  // (1) avoid any iterator invalidation, and
  // (2) avoid testing of newly inserted jobs (violates scheduling fairness)
  for (IntIntMIter it=removals.begin(); it!=removals.end(); ++it) {
    int remove_id = it->first, replace_id = it->second;
    if (replace_id) {
      run_iter = msgPassRunningMap.find(remove_id);
      msgPassRunningMap[replace_id] = run_iter->second; // does not invalidate
      msgPassRunningMap.erase(run_iter); // run_iter still valid
    }
    else
      msgPassRunningMap.erase(remove_id);
  }

  return receives;
}
1451
1452
/** Tests for completion of local asynchronous evaluations (via
    test_local_evaluations(), which rebuilds completionSet) and backfills
    completed slots with pending jobs from assign_queue.  assign_iter
    tracks the next assignment candidate within assign_queue (it is reset
    to the front in the static-scheduling case, since job-to-server
    binding depends on eval id, not queue position).  Returns the number
    of local completions processed. */
size_t ApplicationInterface::
test_local_backfill(PRPQueue& assign_queue, PRPQueueIter& assign_iter)
{
  if (outputLevel == DEBUG_OUTPUT) // explicit debug user specification
    Cout << "Testing local completions\n";

  // static scheduling: limited local concurrency with jobs stratified
  // across local "servers" by eval id (tracked in localServerAssigned)
  bool static_limited
    = (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1);
  size_t static_servers, server_index;
  if (static_limited) // static_servers only read under this same condition
    static_servers = asynchLocalEvalConcurrency * numEvalServers;

  // "Step 2" (of asynch_local_evaluations_nowait()): process any completed
  // jobs using test_local_evaluations()
  completionSet.clear();
  test_local_evaluations(asynchLocalActivePRPQueue); // rebuilds completionSet
  size_t completed = completionSet.size();
  for (ISCIter id_iter = completionSet.begin();
       id_iter != completionSet.end(); ++id_iter)
    process_asynch_local(*id_iter);

  // "Step 3" (of asynch_local_evaluations_nowait()): backfill completed
  // jobs with the next pending jobs from assign_queue (if present)
  if (completed) {
    int fn_eval_id; bool launch;
    size_t num_active = asynchLocalActivePRPQueue.size();
    if (static_limited) assign_iter = assign_queue.begin(); // reset to start
    for (; assign_iter != assign_queue.end(); ++assign_iter) {
      fn_eval_id = assign_iter->eval_id();
      // test of msgPassRunningMap needed for static_limited or for
      // peer_dynamic_schedule_evaluations_nowait()
      if ( lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id) ==
           asynchLocalActivePRPQueue.end() &&
           msgPassRunningMap.find(fn_eval_id) == msgPassRunningMap.end() &&
           rawResponseMap.find(fn_eval_id) == rawResponseMap.end() ) {
        launch = true;
        if (static_limited) { // only schedule if local "server" not busy
          server_index = (fn_eval_id - 1) % static_servers;
          if (localServerAssigned[server_index]) launch = false;
          else localServerAssigned.set(server_index);
        }
        if (launch) {
          launch_asynch_local(assign_iter); ++num_active;
          if (asynchLocalEvalConcurrency && // if throttled
              num_active >= asynchLocalEvalConcurrency)
            { ++assign_iter; break; } // else assign_iter incremented by loop
        }
      }
    }
  }

  return completed;
}
1506
1507
/** This code is called from synchronize_nowait() to provide the master
    portion of a nonblocking master-slave algorithm for the dynamic
    scheduling of evaluations among slave servers. It performs no
    evaluations locally and matches either serve_evaluations_synch()
    or serve_evaluations_asynch() on the slave servers, depending on
    the value of asynchLocalEvalConcurrency. Dynamic scheduling
    assigns jobs in 2 passes. The 1st pass gives each server the same
    number of jobs (equal to asynchLocalEvalConcurrency). The 2nd
    pass assigns the remaining jobs to slave servers as previous jobs
    are completed. Single- and multilevel parallel use intra- and
    inter-communicators, respectively, for send/receive. Specific
    syntax is encapsulated within ParallelLibrary. */
void ApplicationInterface::master_dynamic_schedule_evaluations_nowait()
{
  // beforeSynchCorePRPQueue includes running evaluations plus new requests;
  // previous completions have been removed by synchronize_nowait(). Thus,
  // queue size could be larger or smaller than on previous nowait invocation.
  size_t i, num_jobs = beforeSynchCorePRPQueue.size(), buff_index, server_index,
    server_job_index, num_running = msgPassRunningMap.size(),
    num_backfill = num_jobs - num_running, capacity = numEvalServers;
  // total concurrent capacity = numEvalServers * asynchLocalEvalConcurrency
  if (asynchLocalEvalConcurrency > 1) capacity *= asynchLocalEvalConcurrency;
  int fn_eval_id, server_id;

  // allocate capacity entries since this avoids need for dynamic resizing
  if (!sendBuffers) {
    sendBuffers = new MPIPackBuffer [capacity];
    recvBuffers = new MPIUnpackBuffer [capacity];
    recvRequests = new MPI_Request [capacity];
  }

  // Step 1: launch any new jobs up to capacity limit
  PRPQueueIter assign_iter = beforeSynchCorePRPQueue.begin();
  if (!num_running) { // simplest case
    num_running = std::min(capacity, num_jobs);
    Cout << "Master dynamic schedule: first pass assigning " << num_running
         << " jobs among " << numEvalServers << " servers\n";
    // send data & post receives for 1st set of jobs
    for (i=0; i<num_running; ++i, ++assign_iter) {
      // round-robin job-to-server assignment
      server_index = i%numEvalServers;
      server_id = server_index + 1; // from 1 to numEvalServers
      // stratify buff_index in a manner that's convenient for backfill lookups
      server_job_index = i/numEvalServers; // job index local to each server
      buff_index = server_index * asynchLocalEvalConcurrency + server_job_index;
      // assign job
      send_evaluation(assign_iter, buff_index, server_id, false); // !peer
      // update bookkeeping
      fn_eval_id = assign_iter->eval_id();
      msgPassRunningMap[fn_eval_id] = IntSizetPair(server_id, buff_index);
    }
  }
  else if (num_backfill && num_running < capacity) { // fill in any gaps
    Cout << "Master dynamic schedule: first pass backfilling jobs up to "
         << "available capacity\n";
    // server_jobs[s] = set of buffer indices currently occupied by server s+1
    UShortSetArray server_jobs(numEvalServers);
    for (std::map<int, IntSizetPair>::iterator r_it = msgPassRunningMap.begin();
         r_it != msgPassRunningMap.end(); ++r_it) {
      server_index = r_it->second.first - 1; buff_index = r_it->second.second;
      server_jobs[server_index].insert(buff_index);
    }
    for (; assign_iter != beforeSynchCorePRPQueue.end() &&
           num_running < capacity; ++assign_iter) {
      fn_eval_id = assign_iter->eval_id();
      if (msgPassRunningMap.find(fn_eval_id) == msgPassRunningMap.end()) {
        // find server to use and define index within buffers/requests
        // Approach 1: grab first available slot
        // for (buff_index=0, server_index=0; server_index<numEvalServers;
        //      ++server_index) {
        //   buff_index += server_jobs[server_index];
        //   if (server_jobs[server_index] < asynchEvalConcurrency)
        //     { ++server_jobs[server_index]; break; }
        // }
        // Approach 2: load balance by finding min within server_jobs
        // NOTE(review): set sizes are narrowed to unsigned short here;
        // presumably harmless while per-server concurrency is small -- confirm
        unsigned short load, min_load = server_jobs[0].size();
        size_t min_index = 0;
        for (server_index=1; server_index<numEvalServers; ++server_index) {
          if (min_load == 0) break; // cannot improve on an idle server
          load = server_jobs[server_index].size();
          if (load < min_load)
            { min_index = server_index; min_load = load; }
        }
        server_id = min_index + 1; // 1 to numEvalServers
        // find an available buff_index for this server_id. Logic uses first
        // available within this server's allocation of buffer indices.
        UShortSet& server_jobs_mi = server_jobs[min_index]; bool avail = false;
        size_t max_buff_index = server_id*asynchLocalEvalConcurrency;
        for (buff_index=min_index*asynchLocalEvalConcurrency;
             buff_index<max_buff_index; ++buff_index)
          if (server_jobs_mi.find(buff_index) == server_jobs_mi.end())
            { avail = true; break; }
        if (!avail) {
          Cerr << "Error: no available buffer index for backfill in Application"
               << "Interface::master_dynamic_schedule_evaluations_nowait()."
               << std::endl;
          abort_handler(-1);
        }
        // assign job
        send_evaluation(assign_iter, buff_index, server_id, false); // !peer
        // update bookkeeping
        msgPassRunningMap[fn_eval_id] = IntSizetPair(server_id, buff_index);
        server_jobs[min_index].insert(buff_index); ++num_running;
      }
    }
  }

  // Step 2: check status of running jobs and backfill any completions
  if (headerFlag) {
    Cout << "Master dynamic schedule: second pass testing for completions ("
         << num_running << " running)";
    if (num_running == num_jobs) Cout << '\n';
    else Cout << " and backfilling (" << num_jobs-num_running <<" remaining)\n";
  }
  test_receives_backfill(assign_iter, false); // !peer

  // once all message-passing jobs have drained, release communication buffers
  if (msgPassRunningMap.empty()) {
    // deallocate MPI & buffer arrays
    delete [] sendBuffers; sendBuffers = NULL;
    delete [] recvBuffers; recvBuffers = NULL;
    delete [] recvRequests; recvRequests = NULL;
  }
}
1628
1629
/** This code runs on the iteratorCommRank 0 processor (the iterator)
    and is called from synchronize_nowait() in order to manage a
    nonblocking static schedule. It matches serve_evaluations_synch()
    for other evaluation partitions (asynchLocalEvalConcurrency == 1).
    It performs blocking local function evaluations, one at a time,
    for its portion of the static schedule and checks for remote
    completions in between each local completion. Therefore, unlike
    peer_dynamic_schedule_evaluations_nowait(), this scheduler will
    always return at least one job. Single-level and multilevel
    parallel use intra- and inter-communicators, respectively, for
    send/receive, with specific syntax as encapsulated within
    ParallelLibrary. The iteratorCommRank 0 processor assigns the
    static schedule since it is the only processor with access to
    beforeSynchCorePRPQueue (it runs the iterator and calls
    synchronize). The alternate design of each peer selecting its own
    jobs using the modulus operator would be applicable if execution
    of this function (and therefore the job list) were distributed. */
void ApplicationInterface::peer_static_schedule_evaluations_nowait()
{
  // beforeSynchCorePRPQueue includes running evaluations plus new requests;
  // previous completions have been removed by synchronize_nowait(). Thus,
  // queue size could be larger or smaller than on previous nowait invocation.
  // Rounding down num_local_jobs offloads this processor (which has additional
  // work relative to other peers), but results in a few more passed messages.
  int fn_eval_id, server_id;
  size_t i, num_jobs = beforeSynchCorePRPQueue.size(), buff_index, server_index,
    server_job_index, num_remote_running = msgPassRunningMap.size(),
    num_local_running = asynchLocalActivePRPQueue.size(),
    num_running = num_remote_running + num_local_running,
    num_backfill = num_jobs - num_running, local_capacity = 1,
    capacity = numEvalServers;
  // local_capacity doubles as the per-server concurrency limit: local and
  // remote servers share the same asynchLocalEvalConcurrency throttle
  if (asynchLocalEvalConcurrency > 1) {
    local_capacity = asynchLocalEvalConcurrency;
    capacity *= asynchLocalEvalConcurrency;
  }
  size_t remote_capacity = capacity - local_capacity;

  // peer static nowait supports blocking local evals, taken one at a time,
  // if necessary to support direct interfaces or multiproc evals.
  bool synch_local
    = ( multiProcEvalFlag || ( interfaceType & DIRECT_INTERFACE_BIT ) );
  //bool static_limited
  //  = ( asynchLocalEvalStatic || evalScheduling == PEER_STATIC_SCHEDULING );

  // allocate remote_capacity entries as this avoids need for dynamic resizing
  if (!sendBuffers) {
    sendBuffers = new MPIPackBuffer [remote_capacity];
    recvBuffers = new MPIUnpackBuffer [remote_capacity];
    recvRequests = new MPI_Request [remote_capacity];
  }

  PRPQueueIter assign_iter = beforeSynchCorePRPQueue.begin(), local_prp_iter;
  if (!num_running) { // simplest case

    size_t num_local_jobs = (size_t)std::floor((Real)num_jobs/numEvalServers),
      num_remote_jobs = num_jobs - num_local_jobs;
    num_local_running = std::min(local_capacity, num_local_jobs);
    num_remote_running = std::min(remote_capacity, num_remote_jobs);
    num_running = num_local_running + num_remote_running;

    // this reference used to preserve static server assignment
    //if (static_limited)
    nowaitEvalIdRef = assign_iter->eval_id();

    Cout << "Peer static schedule: first pass assigning " << num_remote_running
         << " jobs among " << numEvalServers-1 << " remote peers\n";
    PRPQueue local_prp_queue; buff_index = 0;
    // server_id 0 denotes this peer (local); 1 to numEvalServers-1 are remote
    for (i=1; i<=num_running; ++i, ++assign_iter) {//shift by 1 to reduce pr1 wk
      server_id = i%numEvalServers; // 0 to numEvalServers-1
      if (server_id) { // 1 to numEvalServers-1
        send_evaluation(assign_iter, buff_index, server_id, true); // peer
        // update bookkeeping
        fn_eval_id = assign_iter->eval_id();
        msgPassRunningMap[fn_eval_id] = IntSizetPair(server_id, buff_index);
        ++buff_index;
      }
      else
        local_prp_queue.insert(*assign_iter);
    }

    // Perform computation for first num_local_jobs jobs on peer 1. Default
    // behavior is synchronous evaluation of jobs on each peer. Only if
    // asynchLocalEvalConcurrency > 1 do we get the hybrid parallelism of
    // asynch jobs on each peer.
    Cout << "Peer static schedule: first pass launching " << num_local_running
         << " local jobs\n";
    if (synch_local) // synch launch and complete
      synchronous_local_evaluations(local_prp_queue);
    else // asynch launch only w/o backfill logic
      assign_asynch_local_queue(local_prp_queue, local_prp_iter);
  }
  else if (num_backfill && num_running < capacity) { // fill in any gaps
    Cout << "Peer static schedule: first pass backfilling jobs up to "
         << "available capacity\n";

    // server_id is 1:numEvalServ-1, server_jobs is indexed 0:numEvalServ-2
    UShortSetArray server_jobs(numEvalServers-1); PRPQueue local_prp_queue;
    for (std::map<int, IntSizetPair>::iterator r_it = msgPassRunningMap.begin();
         r_it != msgPassRunningMap.end(); ++r_it) {
      server_index = r_it->second.first - 1; buff_index = r_it->second.second;
      server_jobs[server_index].insert(buff_index);
    }
    bool running_mp, running_al, backfill_local = false;
    for (; assign_iter != beforeSynchCorePRPQueue.end() &&
           ( num_local_running < local_capacity ||
             num_remote_running < remote_capacity ); ++assign_iter) {
      fn_eval_id = assign_iter->eval_id();
      // look here first
      running_mp = (msgPassRunningMap.find(fn_eval_id) !=
                    msgPassRunningMap.end());
      // look here second if not already found
      running_al = (running_mp) ? false :
        (lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id) !=
         asynchLocalActivePRPQueue.end());
      if (!running_mp && !running_al) { // can launch as new job

        // to enable consistent modulo arithmetic on server assignment, use
        // the first eval id from the last complete job set (!num_running) as
        // a reference; +1 is consistent with shift by 1 to reduce peer1 work
        //if (static_limited)
        server_id = (fn_eval_id-nowaitEvalIdRef+1)%numEvalServers;//0 to numES-1
        //else could assign based on min load as in peer dynamic nowait case

        // assign job
        if (server_id == 0 && num_local_running < local_capacity) {
          local_prp_queue.insert(*assign_iter);
          ++num_local_running; backfill_local = true;
        }
        // local_capacity is also the per-remote-server limit (see above)
        else if (server_id && server_jobs[server_id-1].size() < local_capacity){
          // find an available buff_index for this server_id. Logic uses first
          // available within this server's allocation of buffer indices.
          size_t server_index = server_id - 1,
            min_buff = server_index * asynchLocalEvalConcurrency,
            max_buff = min_buff + asynchLocalEvalConcurrency;
          UShortSet& server_jobs_mi = server_jobs[server_index];
          bool avail = false;
          for (buff_index=min_buff; buff_index<max_buff; ++buff_index)
            if (server_jobs_mi.find(buff_index) == server_jobs_mi.end())
              { avail = true; break; }
          if (!avail) {
            Cerr << "Error: no available buffer index for backfill in "
                 << "ApplicationInterface::peer_static_schedule_evaluations_"
                 << "nowait()." << std::endl;
            abort_handler(-1);
          }
          // assign job
          send_evaluation(assign_iter, buff_index, server_id, true);
          // update bookkeeping
          msgPassRunningMap[fn_eval_id] = IntSizetPair(server_id, buff_index);
          server_jobs[server_index].insert(buff_index);
          ++num_remote_running;
        }
      }
      else if (running_al) // include in local queue for asynch processing
        local_prp_queue.insert(*assign_iter);
    }

    if (backfill_local) {
      if (synch_local) synchronous_local_evaluations(local_prp_queue);
      else assign_asynch_local_queue_nowait(local_prp_queue, local_prp_iter);
    }

    num_running = num_local_running + num_remote_running; // update
  }

  // Step 2: check status of running jobs and backfill any completions
  if (headerFlag) {
    Cout << "Peer static schedule: second pass testing for completions ("
         << num_running << " running)";
    if (num_running == num_jobs) Cout << '\n';
    else Cout << " and backfilling (" << num_jobs-num_running <<" remaining)\n";
  }
  if (num_remote_running)
    test_receives_backfill(assign_iter, true); // peer
  if (!synch_local && num_local_running)
    test_local_backfill(beforeSynchCorePRPQueue, assign_iter);

  // once all remote jobs have drained, release communication buffers
  if (msgPassRunningMap.empty()) {
    // deallocate MPI & buffer arrays
    delete [] sendBuffers; sendBuffers = NULL;
    delete [] recvBuffers; recvBuffers = NULL;
    delete [] recvRequests; recvRequests = NULL;
  }
}
1814
1815
/** This code runs on the iteratorCommRank 0 processor (the iterator) and
    is called from synchronize_nowait() in order to manage a nonblocking
    static schedule. It matches serve_evaluations_{synch,asynch}() for
    other evaluation partitions (depending on asynchLocalEvalConcurrency).
    It performs nonblocking local function evaluations for its portion of
    the static schedule using asynchronous_local_evaluations().
    Single-level and multilevel parallel use intra- and inter-communicators,
    respectively, for send/receive, with specific syntax as encapsulated
    within ParallelLibrary. The iteratorCommRank 0 processor assigns the
    dynamic schedule since it is the only processor with access to
    beforeSynchCorePRPQueue (it runs the iterator and calls synchronize).
    The alternate design of each peer selecting its own jobs using the
    modulus operator would be applicable if execution of this function
    (and therefore the job list) were distributed. */
void ApplicationInterface::peer_dynamic_schedule_evaluations_nowait()
{
  // beforeSynchCorePRPQueue includes running evaluations plus new requests;
  // previous completions have been removed by synchronize_nowait(). Thus,
  // queue size could be larger or smaller than on previous nowait invocation.
  // Rounding down num_local_jobs offloads this processor (which has additional
  // work relative to other peers), but results in a few more passed messages.
  int fn_eval_id, server_id;
  size_t i, num_jobs = beforeSynchCorePRPQueue.size(), buff_index, server_index,
    server_job_index, num_remote_running = msgPassRunningMap.size(),
    num_local_running = asynchLocalActivePRPQueue.size(),
    num_running = num_remote_running + num_local_running,
    num_backfill = num_jobs - num_running, local_capacity = 1,
    capacity = numEvalServers;
  // local_capacity doubles as the per-server concurrency limit: local and
  // remote servers share the same asynchLocalEvalConcurrency throttle
  if (asynchLocalEvalConcurrency > 1) {
    local_capacity = asynchLocalEvalConcurrency;
    capacity *= asynchLocalEvalConcurrency;
  }
  size_t remote_capacity = capacity - local_capacity;

  // allocate remote_capacity entries as this avoids need for dynamic resizing
  if (!sendBuffers) {
    sendBuffers = new MPIPackBuffer [remote_capacity];
    recvBuffers = new MPIUnpackBuffer [remote_capacity];
    recvRequests = new MPI_Request [remote_capacity];
  }

  PRPQueueIter assign_iter = beforeSynchCorePRPQueue.begin(), local_prp_iter;
  if (!num_running) { // simplest case
    size_t num_local_jobs = (size_t)std::floor((Real)num_jobs/numEvalServers),
      num_remote_jobs = num_jobs - num_local_jobs;
    num_local_running = std::min(local_capacity, num_local_jobs);
    num_remote_running = std::min(remote_capacity, num_remote_jobs);

    // don't leave a static gap since we're going to dynamically assign...
    Cout << "Peer dynamic schedule: first pass assigning " << num_remote_running
         << " jobs among " << numEvalServers-1 << " remote peers\n";
    // skip over the jobs reserved for local evaluation on this peer
    std::advance(assign_iter, num_local_running);//num_local_jobs);
    PRPQueueIter assign_iter_save = assign_iter;
    for (i=0; i<num_remote_running; ++i, ++assign_iter) {
      // round-robin over the numEvalServers-1 remote peers
      server_index = i%(numEvalServers-1);
      server_id = server_index + 1; // 1 to numEvalServers-1
      // stratify buff_index in a manner that's convenient for backfill lookups
      server_job_index = i/(numEvalServers-1); // job index local to each server
      buff_index = server_index * asynchLocalEvalConcurrency + server_job_index;
      // assign job to remote peer
      send_evaluation(assign_iter, buff_index, server_id, true); // peer
      // update bookkeeping
      fn_eval_id = assign_iter->eval_id();
      msgPassRunningMap[fn_eval_id] = IntSizetPair(server_id, buff_index);
    }

    // Perform computation for first num_local_jobs jobs on peer 1. Default
    // behavior is synchronous evaluation of jobs on each peer. Only if
    // asynchLocalEvalConcurrency > 1 do we get the hybrid parallelism of
    // asynch jobs on each peer.
    PRPQueue local_prp_queue(beforeSynchCorePRPQueue.begin(), assign_iter_save);
    Cout << "Peer dynamic schedule: first pass launching " << num_local_running
         << " local jobs\n";
    assign_asynch_local_queue(local_prp_queue, local_prp_iter);

    num_running = num_local_running + num_remote_running; // update
  }
  else if (num_backfill && num_running < capacity) { // fill in any gaps
    Cout << "Peer dynamic schedule: first pass backfilling jobs up to "
         << "available capacity\n";
    // server_id is 1:numEvalServ-1, server_jobs is indexed 0:numEvalServ-2
    UShortSetArray server_jobs(numEvalServers-1); PRPQueue local_prp_queue;
    for (std::map<int, IntSizetPair>::iterator r_it = msgPassRunningMap.begin();
         r_it != msgPassRunningMap.end(); ++r_it) {
      server_index = r_it->second.first - 1; buff_index = r_it->second.second;
      server_jobs[server_index].insert(buff_index);
    }
    bool running_mp, running_al, backfill_local = false;
    for (; assign_iter != beforeSynchCorePRPQueue.end() &&
           ( num_local_running < local_capacity ||
             num_remote_running < remote_capacity ); ++assign_iter) {
      fn_eval_id = assign_iter->eval_id();
      // look here first
      running_mp = (msgPassRunningMap.find(fn_eval_id) !=
                    msgPassRunningMap.end());
      // look here second if not already found
      running_al = (running_mp) ? false :
        (lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id) !=
         asynchLocalActivePRPQueue.end());
      if (!running_mp && !running_al) { // can launch as new job
        // determine min among local and remote loadings; tie goes to remote
        // server_id is 1:numEvalServ-1, server_index is 0:numEvalServ-2
        // NOTE(review): set sizes are narrowed to unsigned short here;
        // presumably harmless while per-server concurrency is small -- confirm
        unsigned short load, min_load = server_jobs[0].size();
        size_t min_server_id = 1;
        for (server_index=1; server_index<numEvalServers-1; ++server_index) {
          if (min_load == 0) break; // cannot improve on an idle server
          load = server_jobs[server_index].size();
          if (load < min_load)
            { min_server_id = server_index + 1; min_load = load; }
        }
        if (num_local_running < min_load) // tie goes to remote
          { min_server_id = 0; min_load = num_local_running; }

        // assign job
        if (min_server_id == 0 && num_local_running < local_capacity) {
          local_prp_queue.insert(*assign_iter);
          ++num_local_running; backfill_local = true;
        }
        else if (min_server_id && num_remote_running < remote_capacity) {
          // find an available buff_index for this server_id. Logic uses first
          // available within this server's allocation of buffer indices.
          size_t min_server_index = min_server_id - 1,
            max_buff_index = min_server_id*asynchLocalEvalConcurrency;
          UShortSet& server_jobs_mi = server_jobs[min_server_index];
          bool avail = false;
          for (buff_index=max_buff_index-asynchLocalEvalConcurrency;
               buff_index<max_buff_index; ++buff_index)
            if (server_jobs_mi.find(buff_index) == server_jobs_mi.end())
              { avail = true; break; }
          if (!avail) {
            Cerr << "Error: no available buffer index for backfill in "
                 << "ApplicationInterface::peer_dynamic_schedule_evaluations_"
                 << "nowait()."<< std::endl;
            abort_handler(-1);
          }
          // assign job
          send_evaluation(assign_iter, buff_index, min_server_id, true); // peer
          // update bookkeeping
          msgPassRunningMap[fn_eval_id]
            = IntSizetPair(min_server_id, buff_index);
          server_jobs[min_server_index].insert(buff_index);
          ++num_remote_running;
        }
      }
      else if (running_al) // include in local queue for asynch processing
        local_prp_queue.insert(*assign_iter);
    }

    if (backfill_local)
      assign_asynch_local_queue_nowait(local_prp_queue, local_prp_iter);

    num_running = num_local_running + num_remote_running; // update
  }

  // Step 2: check status of running jobs and backfill any completions
  if (headerFlag) {
    Cout << "Peer dynamic schedule: second pass testing for completions ("
         << num_running << " running)";
    if (num_running == num_jobs) Cout << '\n';
    else Cout << " and backfilling (" << num_jobs-num_running <<" remaining)\n";
  }
  if (num_remote_running)
    test_receives_backfill(assign_iter, true); // peer
  if (num_local_running)
    test_local_backfill(beforeSynchCorePRPQueue, assign_iter);

  // once all remote jobs have drained, release communication buffers
  if (msgPassRunningMap.empty()) {
    // deallocate MPI & buffer arrays
    delete [] sendBuffers; sendBuffers = NULL;
    delete [] recvBuffers; recvBuffers = NULL;
    delete [] recvRequests; recvRequests = NULL;
  }
}
1989
1990
1991 /** This function provides nonblocking synchronization for the local
1992 asynch case (background system call, nonblocking fork, or
1993 threads). It is called from synchronize_nowait() and passed the
1994 complete set of all asynchronous jobs (beforeSynchCorePRPQueue).
1995 It uses derived_map_asynch() to initiate asynchronous evaluations
1996 and test_local_evaluations() to capture completed jobs in
1997 nonblocking mode. It mirrors a nonblocking message passing
1998 scheduler as much as possible (test_local_evaluations() modeled
1999 after MPI_Testsome()). The result of this function is
2000 rawResponseMap, which uses eval_id as a key. It is assumed that
2001 the incoming local_prp_queue contains only active and new jobs -
2002 i.e., all completed jobs are cleared by synchronize_nowait().
2003
2004 Also supports asynchronous local evaluations with static
2005 scheduling. This scheduling policy specifically ensures that a
2006 completed asynchronous evaluation eval_id is replaced with an
2007 equivalent one, modulo asynchLocalEvalConcurrency. In the nowait
2008 case, this could render some servers idle if evaluations don't
2009 come in eval_id order or some evaluations are cancelled by the
2010 caller in between calls. If this function is called with unlimited
2011 local eval concurrency, the static scheduling request is ignored. */
2012 void ApplicationInterface::
asynchronous_local_evaluations_nowait(PRPQueue & local_prp_queue)2013 asynchronous_local_evaluations_nowait(PRPQueue& local_prp_queue)
2014 {
2015 size_t num_jobs = local_prp_queue.size(),
2016 num_target = (asynchLocalEvalConcurrency) ?
2017 std::min((size_t)asynchLocalEvalConcurrency, num_jobs) : num_jobs,
2018 num_active = asynchLocalActivePRPQueue.size(),
2019 num_launch = num_target - num_active;
2020 bool static_limited
2021 = (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1);
2022
2023 // Step 1: update asynchLocalActivePRPQueue with jobs from local_prp_queue
2024 PRPQueueIter local_prp_iter = local_prp_queue.begin();
2025 if (num_launch) {
2026 Cout << "First pass: initiating ";
2027 if (static_limited) Cout << "at most ";
2028 Cout << num_launch << " local asynchronous jobs\n";
2029 assign_asynch_local_queue_nowait(local_prp_queue, local_prp_iter);
2030 }
2031
2032 // Step 2: process any completed jobs and backfill if necessary
2033 num_active = asynchLocalActivePRPQueue.size(); // update
2034 if (headerFlag) {
2035 Cout << "Second pass: testing for completions (" << num_active<<" running)";
2036 if (num_active == num_jobs) Cout << '\n';
2037 else Cout << " and backfilling (" << num_jobs-num_active << " remaining)\n";
2038 }
2039 test_local_backfill(local_prp_queue, local_prp_iter);
2040 }
2041
2042
void ApplicationInterface::
assign_asynch_local_queue_nowait(PRPQueue& local_prp_queue,
                                 PRPQueueIter& local_prp_iter)
{
  // As compared to assign_asynch_local_queue(), this fn may be used to augment
  // an existing set of active jobs (as is appropriate within a nowait context).
  // On return, local_prp_iter points at the first queue entry that was not
  // considered for launch (or local_prp_queue.end() if all were considered).

  // special data for static scheduling case: asynch local concurrency is
  // limited and we need to stratify the job scheduling according to eval id.
  bool static_limited
    = (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1);
  size_t static_servers;
  if (static_limited) {
    // one logical "server" slot per unit of local concurrency on each server
    static_servers = asynchLocalEvalConcurrency * numEvalServers;
    if (localServerAssigned.size() != static_servers) {
      localServerAssigned.resize(static_servers);
      localServerAssigned.reset(); // nonblocking case: only reset on 1st call
    }
  }

  int fn_eval_id, num_jobs = local_prp_queue.size();
  size_t server_index, num_active = asynchLocalActivePRPQueue.size();
  bool launch;
  if (multiProcEvalFlag) // TO DO: deactivate this bcast
    parallelLib.bcast_e(num_jobs);

  // Step 1: launch any new jobs up to asynch concurrency limit (if specified)
  for (local_prp_iter = local_prp_queue.begin();
       local_prp_iter != local_prp_queue.end(); ++local_prp_iter) {
    if (asynchLocalEvalConcurrency && // not unlimited
        num_active >= asynchLocalEvalConcurrency)
      break; // concurrency limit reached; remaining jobs stay queued
    fn_eval_id = local_prp_iter->eval_id();
    // skip any jobs already active from a previous nowait invocation
    if (lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id) ==
        asynchLocalActivePRPQueue.end()) {
      launch = false;
      if (static_limited) { // only schedule if local "server" not busy
        // map the (1-based) eval id onto a static server slot
        server_index = (fn_eval_id - 1) % static_servers;
        if (!localServerAssigned[server_index])
          { launch = true; localServerAssigned.set(server_index); }
      }
      else launch = true;
      if (launch)
        { launch_asynch_local(local_prp_iter); ++num_active; }
    }
  }
}
2090
2091
2092 /** This function provides blocking synchronization for the local
2093 synchronous case (foreground system call, blocking fork, or
2094 procedure call from derived_map()). It is called from
2095 peer_static_schedule_evaluations() to perform a local portion of
2096 the total job set. */
2097 void ApplicationInterface::
synchronous_local_evaluations(PRPQueue & local_prp_queue)2098 synchronous_local_evaluations(PRPQueue& local_prp_queue)
2099 {
2100 for (PRPQueueIter local_prp_iter = local_prp_queue.begin();
2101 local_prp_iter != local_prp_queue.end(); ++local_prp_iter) {
2102 currEvalId = local_prp_iter->eval_id();
2103 const Variables& vars = local_prp_iter->variables();
2104 const ActiveSet& set = local_prp_iter->active_set();
2105 Response local_response = local_prp_iter->response(); // shared rep
2106
2107 // bcast the job to other processors within peer 1 (if required)
2108 if (multiProcEvalFlag)
2109 broadcast_evaluation(*local_prp_iter);
2110
2111 try { derived_map(vars, set, local_response, currEvalId); } // synch. local
2112
2113 catch(const FunctionEvalFailure& fneval_except) {
2114 manage_failure(vars, set, local_response, currEvalId);
2115 }
2116
2117 process_synch_local(local_prp_iter);
2118 }
2119 }
2120
2121
2122 void ApplicationInterface::
broadcast_evaluation(int fn_eval_id,const Variables & vars,const ActiveSet & set)2123 broadcast_evaluation(int fn_eval_id, const Variables& vars,
2124 const ActiveSet& set)
2125 {
2126 // match bcast_e()'s in serve_evaluations_{synch,asynch,peer}
2127 parallelLib.bcast_e(fn_eval_id);
2128 MPIPackBuffer send_buffer(lenVarsActSetMessage);
2129 send_buffer << vars << set;
2130
2131 #ifdef MPI_DEBUG
2132 Cout << "broadcast_evaluation() for eval " << fn_eval_id
2133 << " with send_buffer size = " << send_buffer.size()
2134 << " and ActiveSet:\n" << set << std::endl;
2135 #endif // MPI_DEBUG
2136
2137 parallelLib.bcast_e(send_buffer);
2138 }
2139
2140
2141 /** Invoked by the serve() function in derived Model classes. Passes
2142 control to serve_evaluations_synch(), serve_evaluations_asynch(),
2143 serve_evaluations_synch_peer(), or serve_evaluations_asynch_peer()
2144 according to specified concurrency, partition, and scheduler
2145 configuration. */
serve_evaluations()2146 void ApplicationInterface::serve_evaluations()
2147 {
2148 // This function is only called when message passing, therefore the simple
2149 // test below is sufficient. Would like to use serve_evaluation_synch() in
2150 // the case where a request for asynch local concurrency is made but it is
2151 // not utilized (num_jobs <= numEvalServers in *_schedule_evaluations), but
2152 // this is not currently detectable by the slave since num_jobs is not known
2153 // a priori. serve_evaluation_asynch() still works in this case, but it
2154 // exposes the run to race conditions from the asynch processing procedure.
2155 // Thus, it is up to the user to avoid specifying asynch local concurrency
2156 // when num_jobs is not strictly greater than numEvalServers.
2157
2158 // test for server 1 within a static schedule: multiProcEvalFlag is implied
2159 // since evalCommRank 0 is running the iterator/job schedulers
2160 bool peer_server1 = (!ieDedMasterFlag && evalServerId == 1);
2161
2162 if (asynchLocalEvalConcurrency > 1) {
2163 if (peer_server1) serve_evaluations_asynch_peer();
2164 else serve_evaluations_asynch();
2165 }
2166 else {
2167 if (peer_server1) serve_evaluations_synch_peer();
2168 else serve_evaluations_synch();
2169 }
2170 }
2171
2172
/** This code is invoked by serve_evaluations() to perform one synchronous
    job at a time on each slave/peer server. The servers receive requests
    (blocking receive), do local synchronous maps, and return results.
    This is done continuously until a termination signal is received from
    the master (sent via stop_evaluation_servers()). */
void ApplicationInterface::serve_evaluations_synch()
{
  // update class member eval id for usage on iteratorCommRank!=0 processors
  // (Use case: special logic within derived direct interface plug-ins)
  currEvalId = 1;
  MPI_Status status; // holds source, tag, and number received in MPI_Recv
  MPI_Request request = MPI_REQUEST_NULL; // bypass MPI_Wait on first pass
  MPIPackBuffer send_buffer(lenResponseMessage); // prevent dealloc @loop end
  while (currEvalId) {
    MPIUnpackBuffer recv_buffer(lenVarsActSetMessage);
    // blocking receive of x & set; the message tag carries the eval id
    if (evalCommRank == 0) { // 1-level or local comm. leader in 2-level
      parallelLib.recv_ie(recv_buffer, 0, MPI_ANY_TAG, status);
      currEvalId = status.MPI_TAG;
    }
    if (multiProcEvalFlag) { // multilevel must Bcast x/set over evalComm
      parallelLib.bcast_e(currEvalId);
      if (currEvalId)
        parallelLib.bcast_e(recv_buffer);
    }

    if (currEvalId) { // currEvalId = 0 is the termination signal

      // could slave's Model::currentVariables be used instead?
      // (would remove need to pass vars flags in MPI buffers)
      Variables vars; ActiveSet set;
      recv_buffer >> vars >> set;

#ifdef MPI_DEBUG
      Cout << "Slave receives vars/set buffer which unpacks to:\n" << vars
           << "Active set vector = { ";
      array_write_annotated(Cout, set.request_vector(), false);
      Cout << "} Deriv values vector = { ";
      array_write_annotated(Cout, set.derivative_vector(), false);
      Cout << '}' << std::endl;
#endif // MPI_DEBUG

      Response local_response(sharedRespData, set); // special constructor

      // slaves invoke derived_map to avoid repeating overhead of map fn.
      try { derived_map(vars, set, local_response, currEvalId); } // synch local

      catch(const FunctionEvalFailure& fneval_except) {
        //Cerr<< "Slave has caught exception from local derived_map; message: "
        //<< fneval_except.what() << std::endl;
        manage_failure(vars, set, local_response, currEvalId);
      }

      // bypass MPI_Wait the 1st time through, since there has been no Isend
      if (request != MPI_REQUEST_NULL) // MPI_REQUEST_NULL = -1 IBM, 0 elsewhere
        parallelLib.wait(request, status); // Removes need for send_bufs array
        // by assuring that send_buffer is undisturbed prior to receipt by
        // master. This design should work well for expensive fn. evals., but
        // the master might become overloaded for many inexpensive evals.

      // Return local_response data. Isend allows execution of next job to
      // start (if master has posted several jobs for this slave -- e.g., a
      // static schedule) prior to master receiving the current results. This
      // allows the response to be overwritten, but send_buffer overwrite is
      // prevented by MPI_Wait.
      if (evalCommRank == 0) { // 1-level or local comm. leader in 2-level
        send_buffer.reset();
        send_buffer << local_response;
        parallelLib.isend_ie(send_buffer, 0, currEvalId, request);
      }
    }
  }
}
2246
2247
/** This code is invoked by serve_evaluations() to perform a synchronous
    evaluation in coordination with the iteratorCommRank 0 processor
    (the iterator) for static schedules. The bcast() matches either the
    bcast() in synchronous_local_evaluations(), which is invoked by
    peer_static_schedule_evaluations(), or the bcast() in map().
    Note: unlike serve_evaluations_synch(), no response message is sent
    back from this routine; this rank only participates in the local
    computation (the evalComm leader holds the usable response). */
void ApplicationInterface::serve_evaluations_synch_peer()
{
  // update class member eval id for usage on iteratorCommRank!=0 processors
  // (Use case: special logic within derived direct interface plug-ins)
  currEvalId = 1;
  while (currEvalId) {
    parallelLib.bcast_e(currEvalId); // incoming from iterator

    if (currEvalId) { // currEvalId = 0 is the termination signal

      // receive the packed variables/active set that follows the eval id
      MPIUnpackBuffer recv_buffer(lenVarsActSetMessage);
      parallelLib.bcast_e(recv_buffer); // incoming from iterator

      Variables vars; ActiveSet set;
      recv_buffer >> vars >> set;

#ifdef MPI_DEBUG
      Cout << "Peer receives vars/set buffer which unpacks to:\n" << vars
           << "Active set vector = { ";
      array_write_annotated(Cout, set.request_vector(), false);
      Cout << "} Deriv values vector = { ";
      array_write_annotated(Cout, set.derivative_vector(), false);
      Cout << '}' << std::endl;
#endif // MPI_DEBUG

      Response local_response(sharedRespData, set); // special constructor

      // slaves invoke derived_map to avoid repeating overhead of map fn.
      try { derived_map(vars, set, local_response, currEvalId); } //synch local

      catch(const FunctionEvalFailure& fneval_except) {
        //Cerr<< "Slave has caught exception from local derived_map; message: "
        //<< fneval_except.what() << std::endl;
        manage_failure(vars, set, local_response, currEvalId);
      }
    }
  }
}
2291
2292
/** This code is invoked by serve_evaluations() to perform multiple
    asynchronous jobs on each slave/peer server. The servers test for
    any incoming jobs, launch any new jobs, process any completed jobs,
    and return any results. Each of these components is nonblocking,
    although the server loop continues until a termination signal is
    received from the master (sent via stop_evaluation_servers()). In
    the master-slave case, the master maintains the correct number of
    jobs on each slave. In the static scheduling case, each server is
    responsible for limiting concurrency (since the entire static
    schedule is sent to the peers at start up). */
void ApplicationInterface::serve_evaluations_asynch()
{
  // ---------------------------------------------------------------------------
  // multiprocessor system calls, forks, and threads can't share a communicator,
  // only synchronous direct interfaces can. Therefore, this asynch job
  // scheduler would not normally need to support a multiprocessor evalComm
  // (ApplicationInterface::init_communicators_checks() normally prevents this
  // in its check for multiproc evalComm and asynchLocalEvalConcurrency > 1).
  // However, direct interface plug-ins can use derived_{synchronize,
  // synchronize_nowait} to provide a poor man's batch processing capability,
  // so this function has been extended to allow for multiProcEvalFlag to
  // accommodate this case.
  // ---------------------------------------------------------------------------

  // ----------------------------------------------------------
  // Step 1: block on first message before entering while loops
  // ----------------------------------------------------------
  MPIUnpackBuffer recv_buffer(lenVarsActSetMessage);
  MPI_Status status; // holds MPI_SOURCE, MPI_TAG, & MPI_ERROR
  int fn_eval_id = 1, num_active = 0;
  MPI_Request recv_request = MPI_REQUEST_NULL; // bypass MPI_Test on first pass
  if (evalCommRank == 0) // 1-level or local comm. leader in 2-level
    parallelLib.recv_ie(recv_buffer, 0, MPI_ANY_TAG, status);

  do { // main loop

    // -------------------------------------------------------------------
    // Step 2: check for additional incoming messages & unpack/execute all
    // jobs received
    // -------------------------------------------------------------------
    int mpi_test_flag = 1;
    // num_active < asynchLocalEvalConcurrency check below manages concurrency
    // in the static scheduling case (master_dynamic_schedule_evaluations()
    // handles this from the master side)
    while (mpi_test_flag && fn_eval_id &&
           num_active < asynchLocalEvalConcurrency) {
      // test for completion (Note: MPI_REQUEST_NULL != 0 on IBM)
      if (evalCommRank == 0 && recv_request != MPI_REQUEST_NULL)
        parallelLib.test(recv_request, mpi_test_flag, status);
      if (multiProcEvalFlag)
        parallelLib.bcast_e(mpi_test_flag);
      // if test indicates a completion: unpack, execute, & repost
      if (mpi_test_flag) {

        // the message tag is the eval id; tag 0 is the termination signal
        if (evalCommRank == 0)
          fn_eval_id = status.MPI_TAG;
        if (multiProcEvalFlag)
          parallelLib.bcast_e(fn_eval_id);

        if (fn_eval_id) {
          launch_asynch_local(recv_buffer, fn_eval_id); ++num_active;
          // repost a nonblocking receive for the next incoming job
          if (evalCommRank == 0)
            parallelLib.irecv_ie(recv_buffer, 0, MPI_ANY_TAG, recv_request);
        }
      }
    }

    // -----------------------------------------------------------------
    // Step 3: check for any completed jobs and return results to master
    // -----------------------------------------------------------------
    if (num_active > 0) {
      completionSet.clear();
      test_local_evaluations(asynchLocalActivePRPQueue);//rebuilds completionSet
      num_active -= completionSet.size();
      PRPQueueIter q_it;
      for (ISCIter id_iter = completionSet.begin();
           id_iter != completionSet.end(); ++id_iter) {
        int completed_eval_id = *id_iter;
        q_it = lookup_by_eval_id(asynchLocalActivePRPQueue, completed_eval_id);
        if (q_it == asynchLocalActivePRPQueue.end()) {
          Cerr << "Error: failure in queue lookup within ApplicationInterface::"
               << "serve_evaluations_asynch()." << std::endl;
          abort_handler(-1);
        }
        else {
          if (evalCommRank == 0) {
            // In this case, use a blocking send to avoid having to manage waits
            // on multiple send buffers (which would be a pain since the number
            // of sendBuffers would vary with completionSet length). The eval
            // scheduler processor should have pre-posted corresponding recv's.
            MPIPackBuffer send_buffer(lenResponseMessage);
            send_buffer << q_it->response();
            parallelLib.send_ie(send_buffer, 0, completed_eval_id);
          }
          asynchLocalActivePRPQueue.erase(q_it);
        }
      }
    }

  } while (fn_eval_id || num_active > 0); // exit: term signal seen & all drained

  //clear_bookkeeping(); // clear any bookkeeping lists in derived classes
}
2397
2398
/** This code is invoked by serve_evaluations() to perform multiple
    asynchronous jobs on multiprocessor slave/peer servers. It
    matches the multiProcEvalFlag bcasts in
    ApplicationInterface::asynchronous_local_evaluations(). */
void ApplicationInterface::serve_evaluations_asynch_peer()
{
  MPIUnpackBuffer recv_buffer(lenVarsActSetMessage);
  int fn_eval_id = 1, num_jobs;
  size_t num_active = 0, num_launch = 0, num_completed;

  // TO DO: redesign this logic for greater generality.
  // ONE OPTION: MOVE THIS INSIDE LOOP AND SYNC #JOBS THIS PASS
  // > num_jobs could be inefficient to determine a priori (static_limited)
  // MORE ATTRACTIVE OPTION: ELIMINATE BCAST & HAVE PEER MASTER SEND TERM ASAP
  // > straightforward for asynchronous_local_evaluations()
  // > asynch_local_evaluations_nowait() presents a problem (it can be active in
  //   the multiProcEvalFlag peer asynch context...) since the job queue can
  //   dynamically expand/contract across multiple invocations and we want the
  //   server to remain up --> may need a different implementation for
  //   serve_evaluations_asynch_peer_nowait()?
  parallelLib.bcast_e(num_jobs);

  do { // main loop

    // TO DO: generalize for nowait uses where num_jobs expands/shrinks
    // Preferred approach is to send term code more immediately...

    // -------------------------------------------------------------------
    // check for incoming messages & unpack/execute all jobs received
    // -------------------------------------------------------------------
    // NOTE(review): if asynchLocalEvalConcurrency were 0 this loop would
    // never launch; this routine appears to assume concurrency > 1 (it is
    // only reached from serve_evaluations() in that case) -- confirm.
    while (num_active < asynchLocalEvalConcurrency && num_launch < num_jobs) {
      parallelLib.bcast_e(fn_eval_id); // 0 would signal termination
      if (fn_eval_id) {
        launch_asynch_local(recv_buffer, fn_eval_id);
        ++num_active; ++num_launch;
      }
    }

    // -----------------------------------------------------------------
    // check for any completed jobs and return results to master
    // -----------------------------------------------------------------
    if (num_active > 0) {
      completionSet.clear();
      test_local_evaluations(asynchLocalActivePRPQueue);//rebuilds completionSet
      num_completed = completionSet.size();
      if (num_completed == num_active) // everything finished: fast path
        { num_active = 0; asynchLocalActivePRPQueue.clear(); }
      else {
        num_active -= num_completed; //num_jobs_remaining -= num_completed;
        PRPQueueIter q_it; ISCIter id_it;
        for (id_it=completionSet.begin(); id_it!=completionSet.end(); ++id_it) {
          q_it = lookup_by_eval_id(asynchLocalActivePRPQueue, *id_it);
          if (q_it == asynchLocalActivePRPQueue.end()) {
            Cerr << "Error: failure in queue lookup within ApplicationInterface"
                 << "::serve_evaluations_asynch_peer()." << std::endl;
            abort_handler(-1);
          }
          else
            asynchLocalActivePRPQueue.erase(q_it);
        }
      }
    }
    else // catch termination code (short term fix)
      parallelLib.bcast_e(fn_eval_id);

  } while (fn_eval_id || num_active > 0);

  //clear_bookkeeping(); // clear any bookkeeping lists in derived classes
}
2468
2469
/** This code is executed on the iteratorComm rank 0 processor when
    iteration on a particular model is complete. It sends a
    termination signal (tag = 0 instead of a valid fn_eval_id) to each
    of the slave analysis servers. NOTE: This function is called from
    the Strategy layer even when in serial mode. Therefore, use
    iteratorCommSize to provide appropriate fall through behavior. */
void ApplicationInterface::stop_evaluation_servers()
{
  if (iteratorCommSize > 1) {
    if (!ieDedMasterFlag) {
      if (outputLevel > NORMAL_OUTPUT)
        Cout << "Peer 1 stopping" << std::endl;

      // TO DO: deactivate this block
      if (multiProcEvalFlag) {// stop serve_evaluation_{,a}synch_peer procs
        int fn_eval_id = 0;  // 0 is the termination signal
        parallelLib.bcast_e(fn_eval_id);
      }

    }
    MPIPackBuffer send_buffer(0); // empty buffer
    MPI_Request send_request;
    int server_id, term_tag = 0; // triggers termination
    // Peer partitions have one fewer interComm from server 1 to servers 2-n,
    // relative to ded master partitions which have interComms from server 0
    // to servers 1-n.
    int end = (ieDedMasterFlag) ? numEvalServers+1 : numEvalServers;
    for (server_id=1; server_id<end; ++server_id) {
      // stop serve_evaluation_{synch,asynch} procs
      if (outputLevel > NORMAL_OUTPUT) {
        if (ieDedMasterFlag)
          Cout << "Master stopping server " << server_id << std::endl;
        else
          Cout << "Peer " << server_id+1 << " stopping" << std::endl;
      }
      // nonblocking sends: master posts all terminate messages without waiting
      // for completion. Bcast cannot be used since all procs must call it and
      // slaves are using Recv/Irecv in serve_evaluation_synch/asynch.
      parallelLib.isend_ie(send_buffer, server_id, term_tag, send_request);
      parallelLib.free(send_request); // no test/wait on send_request
    }
    // if the communicator partitioning resulted in a trailing partition of
    // idle processors due to a partitioning remainder (caused by strictly
    // honoring a processors_per_server override), then these are not
    // included in numEvalServers and we quietly free them separately.
    // We assume a single server with all idle processors and a valid
    // inter-communicator (enforced during split).
    // > dedicated master: server_id = #servers+1 -> interComm[#servers]
    // > peer:             server_id = #servers   -> interComm[#servers-1]
    // NOTE: server_id deliberately retains its post-loop value (== end)
    // here, which addresses the trailing idle partition per the mapping
    // documented above.
    if (parallelLib.parallel_configuration().ie_parallel_level().
        idle_partition()) {
      parallelLib.isend_ie(send_buffer, server_id, term_tag, send_request);
      parallelLib.free(send_request); // no test/wait on send_request
    }
  }
}
2526
2527
// --------------------------------------------------
// Schedulers for concurrent analyses within fn evals
// --------------------------------------------------
/** This code is called from derived classes to provide the master
    portion of a master-slave algorithm for the dynamic scheduling of
    analyses among slave servers. It is patterned after
    master_dynamic_schedule_evaluations(). It performs no analyses
    locally and matches either serve_analyses_synch() or
    serve_analyses_asynch() on the slave servers, depending on the
    value of asynchLocalAnalysisConcurrency. Dynamic scheduling
    assigns jobs in 2 passes. The 1st pass gives each server the same
    number of jobs (equal to asynchLocalAnalysisConcurrency). The 2nd
    pass assigns the remaining jobs to slave servers as previous jobs
    are completed. Single- and multilevel parallel use intra- and
    inter-communicators, respectively, for send/receive. Specific
    syntax is encapsulated within ParallelLibrary. */
void ApplicationInterface::master_dynamic_schedule_analyses()
{
  // total concurrent capacity across all servers (>= 1 job per server)
  int capacity  = (asynchLocalAnalysisConcurrency > 1) ?
                  asynchLocalAnalysisConcurrency*numAnalysisServers :
                  numAnalysisServers;
  // first-pass assignment count: can't exceed the number of analyses
  int num_sends = (capacity < numAnalysisDrivers) ?
                  capacity : numAnalysisDrivers;
#ifdef MPI_DEBUG
  Cout << "First pass: assigning " << num_sends << " analyses among "
       << numAnalysisServers << " servers\n";
#endif // MPI_DEBUG
  MPI_Request  send_request; // only 1 needed since no test/wait on sends
  int*         rtn_codes     = new int [num_sends];
  MPI_Request* recv_requests = new MPI_Request [num_sends];
  int i, server_id, analysis_id;
  for (i=0; i<num_sends; ++i) { // send data & post recvs for 1st pass
    server_id   = i%numAnalysisServers + 1; // from 1 to numAnalysisServers
    analysis_id = i+1;                      // 1-based; doubles as message tag
#ifdef MPI_DEBUG
    Cout << "Master assigning analysis " << analysis_id << " to server "
         << server_id << '\n';
#endif // MPI_DEBUG
    // pre-post receives.
    parallelLib.irecv_ea(rtn_codes[i], server_id, analysis_id,
                         recv_requests[i]);
    parallelLib.isend_ea(analysis_id, server_id, analysis_id, send_request);
    parallelLib.free(send_request); // no test/wait on send_request
  }
  if (num_sends < numAnalysisDrivers) { // schedule remaining analyses
#ifdef MPI_DEBUG
    Cout << "Second pass: scheduling " << numAnalysisDrivers-num_sends
         << " remaining analyses\n";
#endif // MPI_DEBUG
    int send_cntr = num_sends, recv_cntr = 0, out_count;
    MPI_Status* status_array = new MPI_Status [num_sends];
    int*        index_array  = new int [num_sends];
    while (recv_cntr < numAnalysisDrivers) {
#ifdef MPI_DEBUG
      Cout << "Waiting on completed analyses" << std::endl;
#endif // MPI_DEBUG
      parallelLib.waitsome(num_sends, recv_requests, out_count, index_array,
                           status_array);
      recv_cntr += out_count;
      for (i=0; i<out_count; ++i) {
        int index = index_array[i]; // index of recv_request that completed
        server_id = index%numAnalysisServers + 1;// from 1 to numAnalysisServers
#ifdef MPI_DEBUG
        Cout << "analysis " << status_array[i].MPI_TAG
             <<" has returned from slave server " << server_id << '\n';
#endif // MPI_DEBUG
        if (send_cntr < numAnalysisDrivers) {
          analysis_id = send_cntr+1;
#ifdef MPI_DEBUG
          Cout << "Master assigning analysis " << analysis_id << " to server "
               << server_id << '\n';
#endif // MPI_DEBUG
          // pre-post receives: reuse the request slot freed by the completion
          parallelLib.irecv_ea(rtn_codes[index], server_id, analysis_id,
                               recv_requests[index]);
          parallelLib.isend_ea(analysis_id, server_id, analysis_id,
                               send_request);
          parallelLib.free(send_request); // no test/wait on send_request
          ++send_cntr;
        }
      }
    }
    delete [] status_array;
    delete [] index_array;
  }
  else { // all analyses assigned in first pass
         // (num_sends == numAnalysisDrivers here, so the count is in range)
#ifdef MPI_DEBUG
    Cout << "Waiting on all analyses" << std::endl;
#endif // MPI_DEBUG
    parallelLib.waitall(numAnalysisDrivers, recv_requests);
  }
  delete [] rtn_codes;
  delete [] recv_requests;

  // Unlike ApplicationInterface::master_dynamic_schedule_evaluations() (which
  // terminates servers only when the iterator/model is complete), terminate
  // servers now so that they can return from derived_map to the higher level.
  analysis_id = 0; // termination flag for servers
  for (i=0; i<numAnalysisServers; ++i) {
    parallelLib.isend_ea(analysis_id, i+1, 0, send_request);
    parallelLib.free(send_request); // no test/wait on send_request
  }
}
2631
2632
/** This code is called from derived classes to run synchronous
    analyses on slave processors. The slaves receive requests
    (blocking receive), do local derived_map_ac's, and return codes.
    This is done continuously until a termination signal is received
    from the master. It is patterned after
    serve_evaluations_synch(). */
void ApplicationInterface::serve_analyses_synch()
{
  int analysis_id = 1;
  MPI_Status status; // holds source, tag, and number received in MPI_Recv
  MPI_Request request = MPI_REQUEST_NULL; // bypass MPI_Wait on first pass
  while (analysis_id) {
    // blocking receive of analysis id
    if (analysisCommRank == 0) // 1-level or comm. leader in 2-level (DirectFn)
      parallelLib.recv_ea(analysis_id, 0, MPI_ANY_TAG, status);
    // NOTE: could get analysis_id from status.MPI_TAG & receive something
    //       else useful in 1st integer slot.  One possibility: tag with
    //       (fn_eval_id-1)*numAnalysisDrivers+analysis_id to prevent any
    //       possible tag overlap with message traffic at other parallelism
    //       levels.  However, I think the use of communicators prevent these
    //       types of errors.
    if (multiProcAnalysisFlag) // must Bcast over analysisComm
      parallelLib.bcast_a(analysis_id);

    if (analysis_id) { // analysis_id = 0 is the termination signal

      // execute the requested analysis locally (blocking)
      int rtn_code = synchronous_local_analysis(analysis_id);

      // ensure the previous isend's buffer (rtn_code) is safe to reuse
      if (request != MPI_REQUEST_NULL) // MPI_REQUEST_NULL = -1 IBM, 0 elsewhere
        parallelLib.wait(request, status); // block until prev isend completes

      // Return the simulation failure code and tag with analysis_id
      // NOTE: the rtn_code is not currently used for anything on the master
      //       since failure capturing exceptions are captured in
      //       read_results_files (sys call, fork) or derived_map_ac (direct).
      //       As for recv_ea above, it could be replaced with something else.
      if (analysisCommRank == 0)// 1-level or comm. leader in 2-level (DirectFn)
        parallelLib.isend_ea(rtn_code, 0, analysis_id, request);
    }
  }
}
2674
2675
// -----------------------------------------
// Routines for managing simulation failures
// -----------------------------------------
/** Dispatch on the user-specified failure action ("retry", "recover",
    "continuation", or default abort) for a failed evaluation.  The
    response is updated in place with the recovered/retried result. */
void ApplicationInterface::
manage_failure(const Variables& vars, const ActiveSet& set, Response& response,
               int failed_eval_id)
{
  // SysCall/Fork application interface exception handling:
  // When a simulation failure is detected w/i
  // Response::read(istream), a FunctionEvalFailure exception is
  // thrown which is first caught by catch(FunctionEvalFailure) within
  // derived_map or wait_local_evaluations, depending on whether the
  // simulation was synch or asynch. In the case of derived_map, it
  // rethrows the exception to an outer catch in map or serve which
  // then invokes manage_failure. In the case of
  // wait_local_evaluations, it invokes manage_failure directly.
  // manage_failure can then catch subsequent exceptions resulting
  // from the try blocks below.

  // DirectFn application interface exception handling:
  // The DirectFn case is simpler than SysCall since (1) synch case: the
  // exception can be thrown directly from derived_map to map or serve which
  // then invokes manage_failure (which then catches additional exceptions
  // thrown directly from derived_map), or (2) asynch case:
  // wait_local_evaluations throws no exceptions, but invokes manage_failure
  // directly.

  if (failAction == "retry") {
    //retry(vars, set, response, failRetryLimit);
    // re-run the same evaluation until it succeeds or the retry limit hits
    int retries = 0;
    bool fail_flag = 1; // allow 1st pass through the while test
    while (fail_flag) {
      fail_flag = 0; // reset each time prior to derived_map
      ++retries;
      Cout << failureMessage << ": retry attempt " << retries << "/" <<
        failRetryLimit << " for evaluation " << failed_eval_id << ".\n";
      try { derived_map(vars, set, response, failed_eval_id); }
      catch(const FunctionEvalFailure& fneval_except) {
        //Cout << "Caught FunctionEvalFailure in manage_failure; message: "
        // fneval_except.what() << std::endl;
        fail_flag = 1;
        if (retries >= failRetryLimit) {
          Cerr << "Retry limit exceeded for evaluation " << failed_eval_id <<
            ".  Aborting..." << std::endl;
          abort_handler(INTERFACE_ERROR);
        }
      }
    }
  }
  else if (failAction == "recover") {
    // substitute the user-specified recovery function values
    Cout << failureMessage << ": recovering with specified function values " <<
      "for evaluation " << failed_eval_id << ".\n";
    if (failRecoveryFnVals.length() != response.num_functions() ) {
      Cerr << "Error: length of recovery function values specification\n"
           << "       must equal the total number of functions." << std::endl;
      abort_handler(-1);
    }
    // reset response to avoid bleeding over of derivatives from previous eval.
    response.reset();
    // Set specified recovery function values
    response.function_values(failRecoveryFnVals);
  }
  else if (failAction == "continuation") {
    // Compute closest source pt. for continuation from extern data_pairs.
    ParamResponsePair source_pair;
    // THIS CODE BLOCK IS A PLACEHOLDER AND IS NOT YET OPERATIONAL
    if (iteratorCommRank) { // if other than master
      // Get source pt. for continuation. Master calls get_source_point for
      // slave (since it has access to data_pairs) and returns result.
      MPIPackBuffer send_buffer(lenVarsMessage);
      send_buffer << vars;
      // master must receive failure message w/i *_schedule_evaluations
      // after MPI_Waitany or MPI_Waitall. Use tag < 0 ?
      parallelLib.send_ie(send_buffer, 0, failed_eval_id);
      MPIUnpackBuffer recv_buffer(lenPRPairMessage);
      MPI_Status recv_status;
      parallelLib.recv_ie(recv_buffer, 0, failed_eval_id, recv_status);
      recv_buffer >> source_pair;
    }
    else
      source_pair = get_source_pair(vars); // also the serial case

    Cout << '\n' << failureMessage << ": halving interval and retrying " <<
      "evaluation " << failed_eval_id << "." << std::endl;

    // Now that source pt. is available, call the continuation algorithm.
    // Mimic retry case in use of failed_eval_id to manage file names.

    continuation(vars, set, response, source_pair, failed_eval_id);

  }
  else { // default is abort
    Cerr << failureMessage << ": aborting due to failure in evaluation " <<
      failed_eval_id << "..." << std::endl;
    abort_handler(INTERFACE_ERROR);
  }
}
2773
2774
2775 const ParamResponsePair&
get_source_pair(const Variables & target_vars)2776 ApplicationInterface::get_source_pair(const Variables& target_vars)
2777 {
2778 if (data_pairs.size() == 0) {
2779 Cerr << "Failure captured: No points available, aborting" << std::endl;
2780 abort_handler(-1);
2781 }
2782
2783 // TO DO: should check both continuous_variables and discrete_variables
2784 const RealVector& xc_target = target_vars.continuous_variables();
2785
2786 size_t i, num_vars = xc_target.length();
2787 Real best_sos = DBL_MAX;
2788
2789 // TO DO: Need to check for same interfaceId as well. Currently, this is
2790 // part of Response -> need to add to Interface.
2791 PRPCacheCIter prp_iter, prp_end_iter = data_pairs.end(), best_iter;
2792 for (prp_iter = data_pairs.begin(); prp_iter != prp_end_iter; ++prp_iter) {
2793 //if (interfaceId == prp_iter->interface_id()) {
2794 const RealVector& xc_source
2795 = prp_iter->variables().continuous_variables();
2796 Real sum_of_squares = 0.;
2797 for (i=0; i<num_vars; ++i)
2798 sum_of_squares += std::pow( xc_source[i] - xc_target[i], 2);
2799 if (prp_iter == data_pairs.begin() || sum_of_squares < best_sos) {
2800 best_iter = prp_iter;
2801 best_sos = sum_of_squares;
2802 }
2803 //}
2804 }
2805
2806 // Desired implementation:
2807 //return *best_iter;
2808
2809 // For now, this asks the least of the simulation management:
2810 --prp_iter; // last PRPair is one back from end()
2811 return *prp_iter;
2812 // Both the variables and the response (in a PRPair) must be returned since
2813 // the variables alone are not enough to enact the continuation. The
2814 // corresponding response must be used as the simulator initial guess.
2815 }
2816
2817
2818 void ApplicationInterface::
continuation(const Variables & target_vars,const ActiveSet & set,Response & response,const ParamResponsePair & source_pair,int failed_eval_id)2819 continuation(const Variables& target_vars, const ActiveSet& set,
2820 Response& response, const ParamResponsePair& source_pair,
2821 int failed_eval_id)
2822 {
2823 // TO DO: should use both continuous_variables and discrete_variables
2824 const Variables& source_vars = source_pair.variables();
2825 const RealVector& source_pt = source_vars.continuous_variables();
2826 const RealVector& target_pt = target_vars.continuous_variables();
2827
2828 // copy() required to avoid modifying variables in data_pairs entry (standard
2829 // operator= behavior would have source_vars sharing a variablesRep with the
2830 // identified data_pairs entry).
2831 Variables current_vars = source_vars.copy();
2832
2833 size_t i, num_cv = source_pt.length();
2834 short failures = 1, MAX_FAILURES = 10;
2835 bool target_reached = false;
2836 float EPS = 1.0e-10;
2837
2838 RealVector current_pt(num_cv, 0.), delta(num_cv, 0.);
2839 for (i=0; i<num_cv; ++i) {
2840 // Define delta to be half the distance to the target
2841 delta[i] = (target_pt[i] - source_pt[i])/2.;
2842 current_pt[i] = source_pt[i] + delta[i];
2843 }
2844
2845 while (!target_reached) {
2846
2847 // Perform the intermediate function evaluation
2848 current_vars.continuous_variables(current_pt);
2849
2850 // TO DO: the simulation must be seeded with the response corresponding to
2851 // the (current) source information. Thus, the current implementation
2852 // only works for best_iter from get_source_point = last evaluation.
2853 //write_continuation_seed_file(source_response);
2854
2855 bool fail_flag = 0;
2856 try { derived_map(current_vars, set, response, failed_eval_id); }
2857 catch(const FunctionEvalFailure& fneval_except) {
2858 fail_flag = 1;
2859 }
2860
2861 if (fail_flag) {
2862 Cout << "\nInterval halving attempt " << failures << " for evaluation " <<
2863 failed_eval_id << " failed. Halving again." << std::endl;
2864 ++failures;
2865 if (failures > MAX_FAILURES) {
2866 Cerr << "\n\nInterval halving limit exceeded in continuation for " <<
2867 "evaluation " << failed_eval_id << ": " << "aborting..." << std::endl;
2868 abort_handler(INTERFACE_ERROR);
2869 }
2870
2871 // take a half-step back from the failed current point (and don't update
2872 // source_vars -> keep the same seed file)
2873 for (i=0; i<num_cv; ++i) {
2874 delta[i] /= 2.0;
2875 current_pt[i] -= delta[i];
2876 }
2877 }
2878 else {
2879 Cout << "\nEvaluation succeeded.\nContinuing with current step size " <<
2880 "for evaluation " << failed_eval_id << "." << std::endl;
2881 // Possibly append new continuation evals. to data_pairs list (?)
2882
2883 if (current_pt == target_pt) // operator== in data_types.cpp
2884 target_reached = true;
2885 else {
2886 for (i=0; i<num_cv; ++i) {
2887 current_pt[i] += delta[i];
2888 if (std::fabs(1.0 - current_pt[i]/target_pt[i]) < EPS)
2889 current_pt[i] = target_pt[i]; // make sure operator== is within
2890 // DBL_EPSILON tolerance
2891 }
2892 // TO DO: update source response for seed file
2893 }
2894 }
2895 }
2896 Cout << "Finished with continuation for evaluation " << failed_eval_id <<
2897 "." << std::endl;
2898 }
2899
2900
2901 // NOTE: The following 3 methods CANNOT be inlined due to linkage errors on
2902 // native, Windows MSVC builds (strange handling of extern symbols
2903 // BoStream write_restart and PRPCache data_pairs)
2904
2905 void ApplicationInterface::
receive_evaluation(PRPQueueIter & prp_it,size_t buff_index,int server_id,bool peer_flag)2906 receive_evaluation(PRPQueueIter& prp_it, size_t buff_index, int server_id,
2907 bool peer_flag)
2908 {
2909 int fn_eval_id = prp_it->eval_id();
2910 if (outputLevel > SILENT_OUTPUT) {
2911 if (interfaceId.empty() || interfaceId == "NO_ID") Cout << "Evaluation ";
2912 else Cout << interfaceId << " evaluation ";
2913 Cout << fn_eval_id << " has returned from ";
2914 if (peer_flag) Cout << "peer server " << server_id+1 << '\n';
2915 else Cout << "slave server " << server_id << '\n';
2916 }
2917
2918 #ifdef MPI_DEBUG
2919 Cout << "receive_evaluation() buff_index = " << buff_index << " fn_eval_id = "
2920 << fn_eval_id << " server_id = " << server_id << std::endl;
2921 #endif // MPI_DEBUG
2922
2923 // Process incoming buffer from remote server. Avoid multiple key-value
2924 // lookups. Incoming response is a lightweight constructed response
2925 // corresponding to a particular ActiveSet.
2926 Response remote_response;
2927 recvBuffers[buff_index] >> remote_response; // lightweight response
2928 // share the rep among between rawResponseMap and the processing queue, but
2929 // don't trample raw response sizing with lightweight remote response
2930 Response raw_response = rawResponseMap[fn_eval_id] = prp_it->response();
2931 raw_response.update(remote_response);
2932
2933 // insert into restart and eval cache ASAP
2934 if (evalCacheFlag) data_pairs.insert(*prp_it);
2935 if (restartFileFlag) parallelLib.write_restart(*prp_it);
2936 }
2937
2938
process_asynch_local(int fn_eval_id)2939 void ApplicationInterface::process_asynch_local(int fn_eval_id)
2940 {
2941 PRPQueueIter prp_it
2942 = lookup_by_eval_id(asynchLocalActivePRPQueue, fn_eval_id);
2943 if (prp_it == asynchLocalActivePRPQueue.end()) {
2944 Cerr << "Error: failure in eval id lookup in ApplicationInterface::"
2945 << "process_asynch_local()." << std::endl;
2946 abort_handler(-1);
2947 }
2948
2949 if (outputLevel > SILENT_OUTPUT) {
2950 if (interfaceId.empty() || interfaceId == "NO_ID") Cout << "Evaluation ";
2951 else Cout << interfaceId << " evaluation ";
2952 Cout << fn_eval_id;
2953 if(batchEval) Cout << " (batch " << batchIdCntr << ")";
2954 Cout << " has completed\n";
2955 }
2956
2957 rawResponseMap[fn_eval_id] = prp_it->response();
2958 if (evalCacheFlag) data_pairs.insert(*prp_it);
2959 if (restartFileFlag) parallelLib.write_restart(*prp_it);
2960
2961 asynchLocalActivePRPQueue.erase(prp_it);
2962 if (asynchLocalEvalStatic && asynchLocalEvalConcurrency > 1) {// free "server"
2963 size_t static_servers = asynchLocalEvalConcurrency * numEvalServers,
2964 server_index = (fn_eval_id - 1) % static_servers;
2965 localServerAssigned.reset(server_index);
2966 }
2967 }
2968
2969
process_synch_local(PRPQueueIter & prp_it)2970 void ApplicationInterface::process_synch_local(PRPQueueIter& prp_it)
2971 {
2972 int fn_eval_id = prp_it->eval_id();
2973 if (outputLevel > SILENT_OUTPUT) {
2974 Cout << "Performing ";
2975 if (!(interfaceId.empty() || interfaceId == "NO_ID")) Cout << interfaceId << ' ';
2976 Cout << "evaluation " << fn_eval_id << std::endl;
2977 }
2978 rawResponseMap[fn_eval_id] = prp_it->response();
2979 if (evalCacheFlag) data_pairs.insert(*prp_it);
2980 if (restartFileFlag) parallelLib.write_restart(*prp_it);
2981 }
2982
2983
// Hook for input filtering (pre-processing of variables) common to all
// application interfaces; intentionally a no-op in this base class --
// derived classes may override/extend when shared filtering is needed.
void ApplicationInterface::common_input_filtering(const Variables& vars)
{ } // empty for now
2986
2987
// Hook for output filtering (post-processing of the response) common to all
// application interfaces; intentionally a no-op in this base class --
// derived classes may override/extend when shared filtering is needed.
void ApplicationInterface::common_output_filtering(Response& response)
{ } // empty for now
2990
2991 } // namespace Dakota
2992