1 /*===========================================================================*
2 * This file is part of the Abstract Library for Parallel Search (ALPS). *
3 * *
4 * ALPS is distributed under the Eclipse Public License as part of the *
5 * COIN-OR repository (http://www.coin-or.org). *
6 * *
7 * Authors: *
8 * *
9 * Yan Xu, Lehigh University *
10 * Aykut Bulut, Lehigh University *
11 * Ted Ralphs, Lehigh University *
12 * *
13 * Conceptual Design: *
14 * *
15 * Yan Xu, Lehigh University *
16 * Ted Ralphs, Lehigh University *
17 * Laszlo Ladanyi, IBM T.J. Watson Research Center *
18 * Matthew Saltzman, Clemson University *
19 * *
20 * *
21 * Copyright (C) 2001-2019, Lehigh University, Yan Xu, Aykut Bulut, and *
22 * Ted Ralphs. *
23 * All Rights Reserved. *
24 *===========================================================================*/
25
26
27 #include <algorithm>
28 #include <cassert>
29 #include <cstring>
30 #include <exception>
31
32 #include "CoinError.hpp"
33 #include "CoinHelperFunctions.hpp"
34 #include "CoinTime.hpp"
35
36 #include "AlpsHelperFunctions.h"
37 #include "AlpsKnowledgeBrokerMPI.h"
38 #include "AlpsMessageTag.h"
39 #include "AlpsModel.h"
40 #include "AlpsNodePool.h"
41 #include "AlpsTreeNode.h"
42
43 using std::exception;
44
45 //#############################################################################
46
// Translate a rank local to a cluster into the corresponding rank in
// MPI_COMM_WORLD.  Clusters occupy contiguous global ranks and each hub
// sits at a fixed offset from the master, so the global rank is the hub's
// global rank shifted by the position inside the cluster.
static int cluster2GlobalRank(int masterRank, int myHubRank, int clusterRank)
{
    const int clusterBase = myHubRank - masterRank;
    return clusterBase + clusterRank;
}
51
52 //#############################################################################
53
54 // Form binary tree to broadcast solutions.
// Map a process rank to its position ("sequence") in the binary broadcast
// tree rooted at the incumbent process: the incumbent becomes sequence 0
// and every process ranked below it shifts up by one; higher ranks keep
// their value.  Inverse of sequenceToRank.
static int rankToSequence(const int incRank, const int rank)
{
    if (rank == incRank) {
        return 0;
    }
    return (rank < incRank) ? rank + 1 : rank;
}
61
// Recover a process rank from its position in the binary broadcast tree
// rooted at the incumbent process.  Inverse of rankToSequence.
static int sequenceToRank(const int incRank, const int sequence)
{
    if (sequence == 0) {
        return incRank;
    }
    return (sequence <= incRank) ? sequence - 1 : sequence;
}
68
#if 0 // not used
// Parent position of `sequence` in the implicit binary broadcast tree
// (children of s are 2s+1 and 2s+2, cf. leftSequence/rightSequence);
// returns -1 for the root.  Compiled out: no current callers.
static int parentSequence(const int sequence, const int numProcesses)
{
    if (sequence <= 0) return -1;
    else return (sequence - 1) / 2;
}
#endif
76
// Left child of `sequence` in the implicit binary broadcast tree, or -1
// when the child index falls outside the number of processes.
static int leftSequence(const int sequence, const int numProcesses)
{
    const int child = 2 * sequence + 1;
    return (child < numProcesses) ? child : -1;
}
83
// Right child of `sequence` in the implicit binary broadcast tree, or -1
// when the child index falls outside the number of processes.
static int rightSequence(const int sequence, const int numProcesses)
{
    const int child = 2 * sequence + 2;
    return (child < numProcesses) ? child : -1;
}
90
91 //#############################################################################
92
// Choose how many nodes make up one "unit of work", based on how long a
// single node takes to process: fast nodes are batched in larger units so
// messaging overhead stays small.  The new value is smoothed against the
// previous estimate (oldUnitNodes) when one exists.
static int computeUnitNodes(int oldUnitNodes,
                            double nodeProcessingTime)
{
    // Descending time thresholds and the unit size used when the node
    // processing time exceeds that threshold; first match wins.
    static const double bound[] = { 30.0, 5.0, 1.0, 0.1, 0.05, 0.01,
                                    0.005, 0.001, 0.0005, 0.0001, 0.00005 };
    static const int   nodes[] = { 1, 1, 2, 3, 4, 5,
                                   7, 30, 50, 200, 400 };
    const int numBounds = static_cast<int>(sizeof(bound) / sizeof(bound[0]));

    double t = nodeProcessingTime;
    if (t < 1.0e-14) {
        // Guard against an unmeasured/zero time.
        t = 1.0e-5;
    }

    int unitNodes = 500;  // default for extremely fast node processing
    for (int k = 0; k < numBounds; ++k) {
        if (t > bound[k]) {
            unitNodes = nodes[k];
            break;
        }
    }

    // Smooth with the previous estimate (integer truncation intended).
    if (oldUnitNodes > 0) {
        unitNodes = (int)(0.5 * (unitNodes + oldUnitNodes));
    }

    return unitNodes;
}
152
153 //#############################################################################
154
// Decide how long the scheduler should listen for messages between load
// balance rounds.  A user-supplied period is kept as-is; otherwise the
// period scales with the node processing time so slow nodes are not
// interrupted too often and fast nodes do not starve the balancer.  The
// result is smoothed against the previous period when one exists.
static double computeBalancePeriod(bool hasUserInput,
                                   double oldBalancePeriod,
                                   double nodeProcessingTime)
{
    // Respect an explicit user-specified period.
    if (hasUserInput) return oldBalancePeriod;

    //todo(aykut) added to fix when nodeProcessingTime is inf.
    if (nodeProcessingTime > 1e80) {
        return 0.3;
    }

    double t = nodeProcessingTime;
    if (t < 1.0e-14) {
        // Guard against an unmeasured/zero time.
        t = 1.0e-5;
    }

    // Descending time thresholds and the multiplier applied to the node
    // processing time when it exceeds that threshold; first match wins.
    static const double bound[] = { 30.0, 5.0, 1.0, 0.5, 0.1, 0.05, 0.01,
                                    0.005, 0.001, 0.0005, 0.0001, 0.00005,
                                    0.00001 };
    static const double factor[] = { 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 3.0,
                                     10.0, 40.0, 100.0, 200.0, 600.0,
                                     1500.0 };
    const int numBounds = static_cast<int>(sizeof(bound) / sizeof(bound[0]));

    double newPeriod = 0.03;  // default for extremely fast node processing
    for (int k = 0; k < numBounds; ++k) {
        if (t > bound[k]) {
            newPeriod = t * factor[k];
            break;
        }
    }

    // Smooth with the previous period.
    if (oldBalancePeriod > 0.0) {
        newPeriod = 0.5 * (newPeriod + oldBalancePeriod);
    }

    return newPeriod;
}
227
228 //#############################################################################
229
void
AlpsKnowledgeBrokerMPI::masterMain(AlpsTreeNode* root)
{
    // Top-level driver for the master process.  The master also acts as the
    // hub of its own cluster.  It:
    //   1. estimates and broadcasts the tree-node message size,
    //   2. performs static load balancing (ramp-up) from `root`,
    //   3. runs the scheduler loop: listen/process messages for a period,
    //      aggregate system status, print load reports, run the distributed
    //      termination check, and balance hubs/workers,
    //   4. cancels the outstanding receive before returning.
    int i;
    int preSysSendCount = 0;       // send count snapshot taken when the
                                   // termination check is activated
    int masterCheckCount = 0;      // number of inter-cluster balance rounds
    int hubCheckCount = 0;         // number of intra-cluster balance rounds
    int reportInterval = 0;
    int reportIntervalCount = 0;

    double elaspeTime = 0.0;

    char reply = 'C';              // 'C' = continue, 'T' = terminate
                                   // (AlpsMsgContOrTerm protocol)

    bool allWorkerReported = false;
    bool terminate = false;

    //------------------------------------------------------
    // Get parameters.
    //------------------------------------------------------
    const int printSystemStatus =
        model_->AlpsPar()->entry(AlpsParams::printSystemStatus);

    const int staticBalanceScheme =
        model_->AlpsPar()->entry(AlpsParams::staticBalanceScheme);

    //const bool clockType =
    //model_->AlpsPar()->entry(AlpsParams::clockType);
    const bool interCB =
        model_->AlpsPar()->entry(AlpsParams::interClusterBalance);
    const bool intraCB =
        model_->AlpsPar()->entry(AlpsParams::intraClusterBalance);

    const int smallSize =
        model_->AlpsPar()->entry(AlpsParams::smallSize);

    const int nodeLimit =
        model_->AlpsPar()->entry(AlpsParams::nodeLimit);
    const int solLimit = model_->AlpsPar()->entry(AlpsParams::solLimit);

    const double zeroLoad =
        model_->AlpsPar()->entry(AlpsParams::zeroLoad);

    // A positive parameter value means the user fixed the period; a
    // non-positive value encodes "auto" and its magnitude is the start value.
    masterBalancePeriod_ =
        model_->AlpsPar()->entry(AlpsParams::masterBalancePeriod);
    if (masterBalancePeriod_ > 0.0) {
        userBalancePeriod_ = true;
    }
    else {
        masterBalancePeriod_ = -masterBalancePeriod_;
    }

    largeSize_ = model_->AlpsPar()->entry(AlpsParams::largeSize);

    //------------------------------------------------------
    // Initialization and setup.
    //------------------------------------------------------

    masterTimer_.setClockType(AlpsClockTypeWallClock);

    // Per-hub and per-worker status tables (the master tracks both levels).
    hubNodeProcesseds_ = new int [hubNum_];
    hubWorkQualities_ = new double [hubNum_];
    hubWorkQuantities_ = new double [hubNum_];
    hubReported_ = new bool [hubNum_];
    workerNodeProcesseds_ = new int [clusterSize_];
    workerWorkQualities_ = new double [clusterSize_];
    workerWorkQuantities_ = new double [clusterSize_];
    workerReported_ = new bool [clusterSize_];

    for (i = 0; i < hubNum_; ++i) {
        hubNodeProcesseds_[i] = 0;
        hubWorkQualities_[i] = 0.0;
        hubWorkQuantities_[i] = 0.0;
        hubReported_[i] = false;
    }
    for (i = 0; i < clusterSize_; ++i) {
        workerNodeProcesseds_[i] = 0;
        workerWorkQualities_[i] = ALPS_OBJ_MAX;
        workerWorkQuantities_[i] = 0.0;
        workerReported_[i] = false;
    }

    // Slot 0 is the master's own status within its cluster.
    workerNodeProcesseds_[0] = nodeProcessedNum_;
    workerWorkQualities_[0] = workQuality_;
    workerWorkQuantities_[0] = workQuantity_;

    root->setBroker(this);
    root->setQuality(ALPS_OBJ_MAX);
    root->setDepth(0);
    root->setIndex(0);
    root->setExplicit(1); // Always true for root.

    //------------------------------------------------------
    // Estimate a tree node size and send it to other hubs.
    //------------------------------------------------------

    // NOTE(review): the factor 4 looks like head-room for nodes that grow
    // beyond the root's encoded size — confirm against setNodeMemSize users.
    AlpsEncoded* encSize = root->encode();
    setNodeMemSize(static_cast<int>(encSize->size() * 4));
    delete encSize;

    // Adjust largeSize to avoid extreme cases.
    largeSize_ = CoinMax(largeSize_, nodeMemSize_ * 3);

    // Allocate msg buffers
    largeBuffer_ = new char [largeSize_];
    largeBuffer2_ = new char [largeSize_];
    smallBuffer_ = new char [smallSize];

    // Ranks in hubComm_ are hub indices, hence destination `i`.
    for (i = 0; i < hubNum_; ++i) {
        if (hubRanks_[i] != globalRank_) {
            MPI_Send(&nodeMemSize_, 1, MPI_INT, i, AlpsMsgNodeSize, hubComm_);
        }
    }

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_NODE_MEM_SIZE, messages())
            << nodeMemSize_ << CoinMessageEol;
    }

    //------------------------------------------------------
    // Send estimated node size to master's workers.
    //------------------------------------------------------

    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            MPI_Send(&nodeMemSize_, 1, MPI_INT, i, AlpsMsgNodeSize,
                     clusterComm_);
        }
    }

    MPI_Barrier(MPI_COMM_WORLD); // Sync before rampup

    //======================================================
    // Master's Ramp-up.
    //======================================================

    // Start to measure rampup
    masterTimer_.start();

    setPhase(AlpsPhaseRampup);
    switch (staticBalanceScheme) {
    case AlpsRootInit:
        rootInitMaster(root);
        break;
    case AlpsSpiral:
        spiralMaster(root);
        break;
    default:
        throw CoinError("Unknown static balance scheme", "masterMain",
                        "AlpsKnowledgeBrokerMPI");
    }

    rampUpTime_ = masterTimer_.getTime();

    //======================================================
    // Search for solutions.
    //======================================================

    setPhase(AlpsPhaseSearch);

    //------------------------------------------------------
    // MASTER SCHEDULER:
    // a. Listen and process messages periodically.
    // b. Do termination check if the conditions are statisfied,
    //    otherwise, balances the workload of hubs.
    //------------------------------------------------------

    masterBalancePeriod_ = computeBalancePeriod(userBalancePeriod_,
                                                masterBalancePeriod_,
                                                nodeProcessingTime_);

    if (msgLevel_ > 0 && interCB) {
        messageHandler()->message(ALPS_LOADBAL_MASTER_PERIOD, messages())
            << globalRank_ << masterBalancePeriod_ << CoinMessageEol;
    }

    MPI_Request request;
    MPI_Status status;
    int flag;

    // One persistent wildcard receive; processMessages() re-posts it.
    MPI_Irecv(largeBuffer_, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
              MPI_ANY_TAG, MPI_COMM_WORLD, &request);

    while (true) {

        //**------------------------------------------------
        // Listen and process msg for a period.
        //**------------------------------------------------

        masterTimer_.start();
        elaspeTime = 0.0;

        //masterBalancePeriod_ = 0.50;  // Test frequency
        //std::cout << ", masterBalancePeriod_ = " << masterBalancePeriod_
        //        << std::endl;

        // Busy-poll the request until the balance period elapses.
        while (elaspeTime < masterBalancePeriod_) {
            MPI_Test(&request, &flag, &status);
            if (flag) { // Receive a msg
                processMessages(largeBuffer_, status, request);
            }
            elaspeTime = masterTimer_.getTime();
            // std::cout << "***** elaspeTime = " << elaspeTime
            //  << ", masterBalancePeriod_ = " << masterBalancePeriod_
            //       << std::endl;
        }

        //**------------------------------------------------
        // Check if all workers in this cluster have reported their status.
        //**------------------------------------------------

        if (!allWorkerReported) {
            workerReported_[masterRank_] = true;  // master counts as reported
            allWorkerReported = true;
            for (i = 0; i < clusterSize_; ++i) {
                if (workerReported_[i] == false) {
                    allWorkerReported = false;
                    break;
                }
            }
        }

        //**------------------------------------------------
        // Check whether all hubs have reported their status.
        //**------------------------------------------------

        if (!allHubReported_) {
            allHubReported_ = true;
            // NOTE: The position of this hub is 0.
            hubReported_[0] = true;
            for (i = 0; i < hubNum_; ++i) {
                if ( hubReported_[i] != true ) {
                    allHubReported_ = false;
                    break;
                }
            }
        }

        //**------------------------------------------------
        // Master add the status of its cluster to system status.
        //**------------------------------------------------

        refreshSysStatus();

        //**------------------------------------------------
        // Check if force terminate.
        // FIXME: clean up msg and memory before leaving.
        //**------------------------------------------------

        // NOTE: Use wall clock time for parallel.
        if (!forceTerminate_) {
            // Time limit reached: force everyone to stop.
            if (allWorkerReported && allHubReported_ &&
                timer_.reachWallLimit()) {

                forceTerminate_ = true;
                setExitStatus(AlpsExitStatusTimeLimit);
                masterForceHubTerm();
                hubForceWorkerTerm();
                if (msgLevel_ > 0) {
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }

                    messageHandler()->message(ALPS_TERM_FORCE_TIME, messages())
                        << timer_.limit_ << CoinMessageEol;
                }
                systemWorkQuantityForce_ = systemWorkQuantity_;
            }
            // Node or solution limit reached: force everyone to stop.
            else if ( allWorkerReported && allHubReported_ &&
                      ((systemNodeProcessed_ >= nodeLimit) ||
                       (getNumKnowledges(AlpsKnowledgeTypeSolution)+solNum_ >=
                        solLimit)) ) {
                forceTerminate_ = true;
                if (systemNodeProcessed_ >= nodeLimit) {
                    setExitStatus(AlpsExitStatusNodeLimit);
                    messageHandler()->message(ALPS_TERM_FORCE_NODE, messages())
                        << nodeLimit << CoinMessageEol;
                }
                else {
                    //std::cout << "num solution = " << getNumKnowledges(AlpsKnowledgeTypeSolution) << std::endl;
                    setExitStatus(AlpsExitStatusSolLimit);
                    messageHandler()->message(ALPS_TERM_FORCE_SOL, messages())
                        << solLimit << CoinMessageEol;
                }

                masterForceHubTerm();
                hubForceWorkerTerm();
                if (msgLevel_ > 0) {
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                }
                systemWorkQuantityForce_ = systemWorkQuantity_;
            }
        }

        //**------------------------------------------------
        // Print workload information to screen.
        //**------------------------------------------------

        reportInterval =
            model_->AlpsPar()->entry(AlpsParams::masterReportInterval);

        assert(reportInterval > 0);

        if (reportIntervalCount % reportInterval == 0) {
            if (msgLevel_ > 0) {
                if (forceTerminate_) {
                    //d::cout << "******systemWorkQuantityForce_ = " << systemWorkQuantityForce_<< std::endl;
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                }
                else if (printSystemStatus) {
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount << masterBalancePeriod_
                            << nodeProcessingTime_ << unitWorkNodes_
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount << masterBalancePeriod_
                            << nodeProcessingTime_ << unitWorkNodes_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                }
            }

#if 0
            std::cout << "Master: ";
            for (i = 0; i < hubNum_; ++i) {
                std::cout << "hub[" << i << "]=" <<
                    hubWorkQuantities_[i] << ", ";
            }
            for (i = 0; i < clusterSize_; ++i) {
                std::cout << "w[" << i << "]=" <<
                    workerWorkQuantities_[i] << ", ";
            }
            std::cout << std::endl;
#endif
        }
        ++reportIntervalCount;

        //**------------------------------------------------
        // If terminate check can be activated.
        //**------------------------------------------------

        // Termination check is worth starting only when the whole system
        // looks idle AND no message is still in flight (send == recv).
        if ( allWorkerReported &&
             allHubReported_ &&
             (systemWorkQuantity_ < zeroLoad ) &&
             (systemSendCount_ == systemRecvCount_) ) {

            preSysSendCount = systemSendCount_;
            blockTermCheck_ = false;  // Activate termination check
        }

        // Print node log if there is and not force terminate
        // (will print system status)
        if (!forceTerminate_) {
            getModel()->nodeLog(NULL, false);
        }

#if 0
        std::cout << "blockTermCheck_=" << blockTermCheck_
                  << ", allWorkerReported=" << allWorkerReported
                  << ", allHubReported_=" << allHubReported_
                  << ", systemWorkQuantity_=" << systemWorkQuantity_
                  << ", preSysSendCount=" << preSysSendCount
                  << ", systemSendCount_=" << systemSendCount_
                  << ", systemRecvCount_=" << systemRecvCount_
                  << std::endl;
#endif

        //**------------------------------------------------
        // Do termination checking if activated.
        // Note: All msgs during termination checking are not counted.
        //**------------------------------------------------

        if ( ! blockTermCheck_ ) {
            if (msgLevel_ > 0) {
                messageHandler()->message(ALPS_TERM_MASTER_START, messages())
                    << globalRank_ << CoinMessageEol;
            }
            // Ask other hubs to do termination check.
            for (i = 0; i < hubNum_; ++i) {
                if (hubRanks_[i] != globalRank_) {
                    MPI_Send(smallBuffer_, 0, MPI_PACKED, hubRanks_[i],
                             AlpsMsgAskHubPause, MPI_COMM_WORLD);
#ifdef NF_DEBUG
                    std::cout << "Master["<< masterRank_ << "] ask hub["
                              << hubRanks_[i] << "] to do termination check."
                              << std::endl;
#endif
                }
            }

            // Ask master's workers to do termination check.
            // NOTE(review): the guard compares a cluster index to the global
            // rank; this matches only because the master's cluster occupies
            // the lowest global ranks — confirm (original also had the
            // `i != clusterRank_` variant commented out).
            for (i = 0; i < clusterSize_; ++i) {
                //if (i != clusterRank_) {
                if (i != globalRank_) {
#ifdef NF_DEBUG
                    std::cout << "Master["<< masterRank_ << "] ask its worker["
                              << i << "] to do termination check."<< std::endl;
#endif
                    MPI_Send(smallBuffer_, 0, MPI_PACKED, i,
                             AlpsMsgAskPause, MPI_COMM_WORLD);
                }
            }


            MPI_Request termReq;
            MPI_Status termSta;

            // Update the cluster status to which the Master belongs
            for(i = 1; i < clusterSize_; ++i) {
                MPI_Irecv(smallBuffer_, smallSize, MPI_PACKED, MPI_ANY_SOURCE,
                          AlpsMsgWorkerTermStatus, clusterComm_, &termReq);
                MPI_Wait(&termReq, &termSta);
                hubUpdateCluStatus(smallBuffer_, &termSta, clusterComm_);
            }

#ifdef NF_DEBUG
            std::cout << "Master: TERM: finished updating its cluster."
                      << std::endl;
#endif
            // clusterWorkQuality_ += workQuality_;
            clusterWorkQuantity_ += workQuantity_;// workQuantity = 0
            clusterSendCount_ += sendCount_;
            clusterRecvCount_ += recvCount_;
            sendCount_ = recvCount_ = 0;

            // Update system status
            for(i = 1; i < hubNum_; ++i) {
                MPI_Irecv(smallBuffer_, smallSize, MPI_PACKED, MPI_ANY_SOURCE,
                          AlpsMsgHubTermStatus, hubComm_, &termReq);
                MPI_Wait(&termReq, &termSta);
                masterUpdateSysStatus(smallBuffer_, &termSta, hubComm_);
            }

#ifdef NF_DEBUG
            std::cout << "Master: TERM: finished updating hubs." <<std::endl;
#endif
            systemWorkQuantity_ += clusterWorkQuantity_;
            systemSendCount_ += clusterSendCount_;
            systemRecvCount_ += clusterRecvCount_;
            clusterSendCount_ = clusterRecvCount_ = 0;

#ifdef NF_DEBUG
            std::cout << "Master: TERM: Quantity_ = " << systemWorkQuantity_
                      << ", systemSendCount_ = " << systemSendCount_
                      << ", systemRecvCount_ = " << systemRecvCount_
                      << "; preSysSendCount = " << preSysSendCount
                      << std::endl;
#endif

            // Truly idle only if nothing was sent since the check was
            // activated (no work migrated while statuses were collected).
            if ( (systemWorkQuantity_ < zeroLoad) &&
                 (preSysSendCount == systemSendCount_) ) {
                terminate = true;
            }
            else {
                terminate = false;
                // Do forget to deduct! refeshSysStatus will add clusterWorkQuantitY_
                systemWorkQuantity_ -= clusterWorkQuantity_;
            }

#ifdef NF_DEBUG
            std::cout << "Master: TERM: terminate=" << terminate <<std::endl;
#endif
            // True idle, tell others to terminate.
            if (terminate) {
                if (msgLevel_ > 0) {
                    messageHandler()->message(ALPS_TERM_MASTER_INFORM,messages())
                        << globalRank_ << "stop searching" << CoinMessageEol;
                }

                // Send instruction to my hubs (as the master)
                for (i = 0; i < hubNum_; ++i) {
                    if (hubRanks_[i] != globalRank_) {
                        // i is not Master cluster.
                        reply = 'T';
                        MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
                                 hubComm_);
                    }
                }

                // Send instruction to my works (as a hub)
                for (i = 0; i < clusterSize_; ++i) {
                    if (i != globalRank_) {
                        reply = 'T';
#ifdef NF_DEBUG
                        std::cout << "Master["<< masterRank_
                                  << "] ask its worker["
                                  << i << "] to terminate."
                                  << " clusterRank_=" << clusterRank_
                                  << std::endl;
#endif
                        MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
                                 clusterComm_);
                    }
                }
            }
            else { // Not true idle yet
                if (msgLevel_ > 0) {
                    messageHandler()->message(ALPS_TERM_MASTER_INFORM, messages())
                        << globalRank_ << "continue searching" << CoinMessageEol;
                }

                blockTermCheck_ = true;  // re-arm: wait for next idle window

                // Send instruction to the hubs (as the master)
                for (i = 0; i < hubNum_; ++i) {
                    if (hubRanks_[i] != globalRank_) { // i is not Master
                        reply = 'C';
                        MPI_Send(&reply, 1, MPI_CHAR, i,AlpsMsgContOrTerm,
                                 hubComm_);
                    }
                }
                // Send instruction to the works (as a hub)
                for (i = 1; i < clusterSize_; ++i) {
                    reply = 'C';
                    MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
                             clusterComm_);
                }
            }

            // Final load report before (possibly) leaving the loop.
            if (msgLevel_ > 0) {
                if (forceTerminate_) {
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                }
                else if (printSystemStatus) {
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount << masterBalancePeriod_
                            << nodeProcessingTime_ << unitWorkNodes_
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount << masterBalancePeriod_
                            << nodeProcessingTime_ << unitWorkNodes_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                }
            }

            if (terminate){
                // Break after printing msg
                break;  // Break *, Master terminates
            }
        }

        //**------------------------------------------------
        // Master balances work load of hubs if
        //  (1) not terminate,
        //  (2) not force terminate,
        //  (3) all hubs reported, and
        //  (4) previous balance has been completed.
        //**------------------------------------------------

        if (msgLevel_ > 10) {
            std::cout << "masterDoBalance_ = " << masterDoBalance_
                      << ", allHubReported_ = " << allHubReported_
                      << std::endl;
        }

        if ( !terminate && !forceTerminate_ &&
             allHubReported_ && (masterDoBalance_ == 0) ) {
            if (hubNum_ > 1 && interCB) {
                masterBalanceHubs();
                ++masterCheckCount;
                if (masterCheckCount % 10 == 0) {
                    if (msgLevel_ > 10) {
                        messageHandler()->message(ALPS_LOADBAL_MASTER, messages())
                            << globalRank_ << masterCheckCount << CoinMessageEol;
                    }
                }
            }
        }

        if (msgLevel_ > 10) {
            std::cout << "++++ hubDoBalance_ = " << hubDoBalance_
                      << ", allWorkerReported = " << allWorkerReported
                      << std::endl;
        }

        // The master also balances its own cluster's workers (hub role).
        if ( !terminate && !forceTerminate_ &&
             allWorkerReported && hubDoBalance_ == 0 ) {
            if (clusterSize_ > 2 && intraCB) {
                hubBalanceWorkers();
                ++hubCheckCount;
                if (hubCheckCount % 10 == 0) {
                    if (msgLevel_ > 10) {
                        messageHandler()->message(ALPS_LOADBAL_HUB, messages())
                            << globalRank_ << hubCheckCount << CoinMessageEol;
                    }
                }
            }
        }
    }

    //------------------------------------------------------
    // Clean up before leaving.
    //------------------------------------------------------

    // Cancel the outstanding wildcard receive posted above; MPI_Wait
    // completes the cancellation before the buffers can be reused.
    int cancelNum = 0;

    MPI_Cancel(&request);
    MPI_Wait(&request, &status);

    MPI_Test_cancelled(&status, &flag);
    if(flag) {  // Cancel succeeded
        ++cancelNum;
    }
}
937
938 //#############################################################################
939
940 void
hubMain()941 AlpsKnowledgeBrokerMPI::hubMain()
942 {
943 int i;
944 int hubCheckCount = 0;
945 int errorCode = 0;
946
947 char reply = 'C';
948
949 double elaspeTime = 0.0;
950
951 bool allWorkerReported = false; // Workers report load at least once
952 bool terminate = false;
953 bool deletedSTs = false;
954 bool comUnitWork = false;
955
956 int numComUnitWork = 0;
957
958 AlpsReturnStatus rCode = AlpsReturnStatusOk;
959 AlpsExitStatus exitStatus = AlpsExitStatusInfeasible;
960 MPI_Status status;
961
962 //------------------------------------------------------
963 // Get parameters.
964 //------------------------------------------------------
965
966 const int staticBalanceScheme =
967 model_->AlpsPar()->entry(AlpsParams::staticBalanceScheme);
968 const int smallSize =
969 model_->AlpsPar()->entry(AlpsParams::smallSize);
970 const bool intraCB =
971 model_->AlpsPar()->entry(AlpsParams::intraClusterBalance);
972 const double zeroLoad =
973 model_->AlpsPar()->entry(AlpsParams::zeroLoad);
974 double unitTime =
975 model_->AlpsPar()->entry(AlpsParams::unitWorkTime);
976 if (unitTime <= 0.0) {
977 // Not set by user
978 unitTime = ALPS_DBL_MAX;
979 }
980
981 double changeWorkThreshold = model_->AlpsPar()->
982 entry(AlpsParams::changeWorkThreshold);
983
984 hubReportPeriod_ = model_->AlpsPar()->entry(AlpsParams::hubReportPeriod);
985 if (hubReportPeriod_ > 0.0) {
986 userBalancePeriod_ = true;
987 }
988 else {
989 hubReportPeriod_ = -hubReportPeriod_;
990 }
991
992 largeSize_ = model_->AlpsPar()->entry(AlpsParams::largeSize);
993
994 unitWorkNodes_ = model_->AlpsPar()->entry(AlpsParams::unitWorkNodes);
995 if (unitWorkNodes_ <= 0) {
996 comUnitWork = true;
997 }
998
999 //------------------------------------------------------
1000 // Initialization and setup.
1001 //------------------------------------------------------
1002
1003 hubTimer_.setClockType(AlpsClockTypeWallClock);
1004
1005 workerNodeProcesseds_ = new int [clusterSize_];
1006 workerWorkQualities_ = new double [clusterSize_];
1007 workerWorkQuantities_ = new double [clusterSize_];
1008 workerReported_ = new bool [clusterSize_];
1009
1010 for (i = 0; i < clusterSize_; ++i) {
1011 workerNodeProcesseds_[i] = 0;
1012 workerWorkQualities_[i] = ALPS_OBJ_MAX;
1013 workerWorkQuantities_[i] = 0.0;
1014 }
1015
1016 workerNodeProcesseds_[0] = nodeProcessedNum_;
1017 workerWorkQualities_[0] = workQuality_;
1018 workerWorkQuantities_[0] = workQuantity_ = 0.0;
1019
1020 //------------------------------------------------------
1021 // Recv tree node size and send it to my worker.
1022 // NOTE: master's rank is always 0 in hubComm_.
1023 //------------------------------------------------------
1024
1025 MPI_Recv(&nodeMemSize_, 1, MPI_INT, 0, AlpsMsgNodeSize, hubComm_, &status);
1026 for (i = 0; i < clusterSize_; ++i) {
1027 if (i != clusterRank_) {
1028 MPI_Send(&nodeMemSize_, 1, MPI_INT, i, AlpsMsgNodeSize,
1029 clusterComm_);
1030 }
1031 }
1032
1033 // Adjust largeSize to avoid extreme cases.
1034 largeSize_ = CoinMax(largeSize_, nodeMemSize_ * 3);
1035
1036 largeBuffer_ = new char [largeSize_];
1037 largeBuffer2_ = new char [largeSize_];
1038 smallBuffer_ = new char [smallSize];
1039
1040 MPI_Barrier(MPI_COMM_WORLD); // Sync before rampup
1041
1042 //======================================================
1043 // Hub's Ramp-up.
1044 //======================================================
1045
1046 // Start to measure rampup
1047 hubTimer_.start();
1048
1049 setPhase(AlpsPhaseRampup);
1050 switch (staticBalanceScheme) {
1051 case AlpsRootInit:
1052 rootInitHub();
1053 break;
1054 case AlpsSpiral:
1055 spiralHub();
1056 break;
1057 default:
1058 throw CoinError("Unknown static balance scheme", "hubMain",
1059 "AlpsKnowledgeBrokerMPI");
1060 }
1061
1062 rampUpTime_ = hubTimer_.getTime();
1063
1064 //======================================================
1065 // Hub's search
1066 //======================================================
1067
1068 setPhase(AlpsPhaseSearch);
1069
1070 //------------------------------------------------------
1071 // HUB SCHEDULER:
1072 // (1) Listen and process messages periodically.
1073 // (2) If required, do one unit of work.
1074 // (3) Send work quality, quantity, and msg counts to Master.
1075 // (4) Balance workload quality of my workers.
1076 // (5) Do termination check if requred.
1077 //------------------------------------------------------
1078
1079 hubReportPeriod_ = computeBalancePeriod(userBalancePeriod_,
1080 hubReportPeriod_,
1081 nodeProcessingTime_);
1082
1083 if (hubMsgLevel_ > 5 && intraCB) {
1084 messageHandler()->message(ALPS_LOADBAL_HUB_PERIOD, messages())
1085 << globalRank_ << hubReportPeriod_ << CoinMessageEol;
1086 }
1087
1088 MPI_Request request;
1089 int flag = 1;
1090
1091 MPI_Irecv(largeBuffer_, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
1092 MPI_ANY_TAG, MPI_COMM_WORLD, &request);
1093
1094 while (true) {
1095
1096 flag = 1;
1097
1098 //**------------------------------------------------
1099 // Listen and process msg for a period.
1100 //**------------------------------------------------
1101
1102 hubTimer_.start();
1103 elaspeTime = 0.0;
1104 while (elaspeTime < hubReportPeriod_ ) {
1105 MPI_Test(&request, &flag, &status);
1106 if (flag) { // Received a msg
1107 processMessages(largeBuffer_, status, request);
1108 }
1109 elaspeTime = hubTimer_.getTime();
1110 }
1111
1112 //std::cout << "++++++ hubReportPeriod_ = "
1113 // << hubReportPeriod_ << std::endl;
1114
1115
1116 //**------------------------------------------------
1117 // if forceTerminate_ == true;
1118 //**------------------------------------------------
1119
1120 if (forceTerminate_) {
1121 // Delete all subtrees if hub work.
1122 if (hubWork_) {
1123 deleteSubTrees();
1124 updateWorkloadInfo();
1125 assert(!(subTreePool_->hasKnowledge()));
1126 }
1127 if (!deletedSTs) {
1128 hubForceWorkerTerm();
1129 }
1130
1131 deletedSTs = true;
1132 }
1133
1134 //**------------------------------------------------
1135 // Check if all my workers have reported.
1136 //**------------------------------------------------
1137
1138 if (!allWorkerReported) {
1139 workerWorkQualities_[masterRank_] = workQuality_;
1140 workerWorkQuantities_[masterRank_] = workQuantity_;
1141 workerReported_[masterRank_] = true;
1142 allWorkerReported = true;
1143 for (i = 0; i < clusterSize_; ++i) {
1144 if (workerReported_[i] == false) {
1145 allWorkerReported = false;
1146 break;
1147
1148 }
1149 }
1150 }
1151
1152 //**------------------------------------------------
1153 // If hub work, do one unit of work.
1154 //**------------------------------------------------
1155
1156 if ( !haltSearch_ && hubWork_ &&
1157 (workingSubTree_ != 0 || hasKnowledge(AlpsKnowledgeTypeSubTree))){
1158
1159 //reportCount = hubReportFreqency;
1160
1161 // NOTE: will stop when there is a better solution.
1162 bool betterSolution = true;
1163 int thisNumProcessed = 0;
1164 int thisNumBranched = 0;
1165 int thisNumDiscarded = 0;
1166 int thisNumPartial = 0;
1167
1168 // Compute unit work based on node processing time
1169 if (comUnitWork) {
1170 unitWorkNodes_ = computeUnitNodes(unitWorkNodes_, nodeProcessingTime_);
1171 if (++numComUnitWork > 200) {
1172 comUnitWork = false;
1173 }
1174 }
1175 try {
1176 rCode = doOneUnitWork(unitWorkNodes_,
1177 unitTime,
1178 exitStatus,
1179 thisNumProcessed,
1180 thisNumBranched,
1181 thisNumDiscarded,
1182 thisNumPartial,
1183 treeDepth_,
1184 betterSolution);
1185 nodeProcessedNum_ += thisNumProcessed;
1186 nodeBranchedNum_ += thisNumBranched;
1187 nodeDiscardedNum_ += thisNumDiscarded;
1188 nodePartialNum_ += thisNumPartial;
1189 }
1190 catch (std::bad_alloc&) {
1191 errorCode = 1;
1192 }
1193 catch(CoinError& er) {
1194 errorCode = 2;
1195 }
1196 catch(...) {
1197 errorCode = 3;
1198 }
1199
1200 if (errorCode) {
1201 // Do we want to free some memory?
1202 if (hubWork_) {
1203 deleteSubTrees();
1204 updateWorkloadInfo();
1205 assert(!(subTreePool_->hasKnowledge()));
1206 }
1207 haltSearch_ = true;
1208 sendErrorCodeToMaster(errorCode);
1209 }
1210
1211 // Update work load quantity and quality info.
1212 updateWorkloadInfo();
1213
1214 // Adjust workingSubTree_ if it 'much' worse than the best one.
1215 changeWorkingSubTree(changeWorkThreshold);
1216
1217 #if 0
1218 std::cout << "******** HUB [" << globalRank_ << "]: "
1219 << " quality = " << workQuality_
1220 << "; quantity = " << workQuantity_ << std::endl;
1221 #endif
1222
1223 if (betterSolution) {
1224 // Double check if better. If yes, update and sent it to Master
1225 double incVal =
1226 getBestKnowledge(AlpsKnowledgeTypeSolution).second;
1227 if(incVal < incumbentValue_) { // Minimization
1228 incumbentValue_ = incVal;
1229 incumbentID_ = globalRank_;
1230 sendKnowledge(AlpsKnowledgeTypeSolution, //useful
1231 globalRank_,
1232 0,
1233 smallBuffer_,
1234 0,
1235 MPI_ANY_TAG,
1236 MPI_COMM_WORLD,
1237 false);
1238 //sendIncumbent();
1239 if (hubMsgLevel_ > 0) {
1240 messageHandler()->message(ALPS_SOLUTION_SEARCH,
1241 messages())
1242 << globalRank_ << incVal << CoinMessageEol;
1243 }
1244 }
1245 }
1246 }// EOF if hub work
1247
1248 //**------------------------------------------------
1249 // Add into the hub's status to cluster's status.
1250 // Need to check if report.
1251 //**------------------------------------------------
1252
1253 refreshClusterStatus();
1254
1255 #if 0
1256 std::cout << "HUB "<< globalRank_
1257 << ": clusterWorkQuality_ = " << clusterWorkQuality_
1258 << ": clusterWorkQuantity_ = " << clusterWorkQuantity_
1259 << ", sendCount_ = " << sendCount_
1260 << ", recvCount_ = " << recvCount_
1261 << ", allWorkerReported = " << allWorkerReported
1262 << ", blockHubReport_ = " << blockHubReport_
1263 << ", blockTermCheck_ = " << blockTermCheck_
1264 << std::endl;
1265 #endif
1266
1267 //**------------------------------------------------
1268 // If needed, report cluster status, and check if should
1269 // block report.
1270 //**------------------------------------------------
1271
1272 if ((clusterSendCount_ || clusterRecvCount_ || !blockHubReport_)) {
1273 //&& reportCount == hubReportFreqency) {
1274
1275 //reportCount = 0;
1276 incSendCount("hubMain()-hubReportStatus");
1277
1278 if (hubWork_) {
1279 updateWorkloadInfo();
1280 }
1281 refreshClusterStatus(); // IMPORTANT: report latest state
1282 hubReportStatus(AlpsMsgHubPeriodReport, MPI_COMM_WORLD);
1283
1284 if (clusterWorkQuantity_ < zeroLoad) {
1285 blockHubReport_ = true;
1286 #ifdef NF_DEBUG_MORE
1287 std::cout << "HUB[" << globalRank_ << "]: blockHubReport"
1288 << std::endl;
1289 #endif
1290 }
1291 else {
1292 blockHubReport_ = false;
1293 #ifdef NF_DEBUG_MORE
1294 std::cout << "HUB[" << globalRank_ << "]: unblockHubReport"
1295 << std::endl;
1296 #endif
1297 }
1298 }
1299
1300 //**------------------------------------------------
1301 // If master ask hub to do termination check.
1302 //**------------------------------------------------
1303
1304 if (!blockTermCheck_) {
1305
1306 // Ask hub's workers to check termination
1307 for (i = 0; i < clusterSize_; ++i) {
1308 if (i != clusterRank_) {
1309 int workRank = globalRank_ - masterRank_ + i;
1310 MPI_Send(smallBuffer_, 0, MPI_PACKED, workRank,
1311 AlpsMsgAskPause, MPI_COMM_WORLD);
1312 #ifdef NF_DEBUG
1313 std::cout << "HUB[" << globalRank_ << "]: ask its worker "
1314 << workRank << " to do TERM check."
1315 << ", clusterRank_=" << clusterRank_
1316 << ", i=" << i << std::endl;
1317 #endif
1318 }
1319 }
1320
1321 // Recv workers' stati
1322 MPI_Request termReq;
1323 MPI_Status termSta;
1324 for(i = 1; i < clusterSize_; ++i) {
1325 MPI_Irecv(smallBuffer_, smallSize, MPI_PACKED, MPI_ANY_SOURCE,
1326 AlpsMsgWorkerTermStatus, clusterComm_, &termReq);
1327 MPI_Wait(&termReq, &termSta);
1328 hubUpdateCluStatus(smallBuffer_, &termSta, clusterComm_);
1329 }
1330
1331 // No workQuantity_ and workQuality for the hub
1332 //clusterWorkQuality_ += workQuality_;
1333 //clusterWorkQuantity_ += workQuantity_;
1334 clusterSendCount_ += sendCount_;
1335 clusterRecvCount_ += recvCount_;
1336 sendCount_ = recvCount_ = 0;
1337
1338 // Report my status to master
1339 hubReportStatus(AlpsMsgHubTermStatus, hubComm_);
1340
1341 #ifdef NF_DEBUG
1342 std::cout << "HUB[" << globalRank_ << "]: reported TERM status to "
1343 << "master " << masterRank_ << std::endl;
1344 #endif
1345
1346 // Get termination instruction from Master
1347 // NOTE: master's rank is always 0 in hubComm_.
1348 MPI_Irecv(&reply, 1, MPI_CHAR, 0, AlpsMsgContOrTerm, hubComm_,
1349 &termReq);
1350 MPI_Wait(&termReq, &termSta);
1351
1352 #ifdef NF_DEBUG
1353 std::cout << "HUB[" << globalRank_ << "]: received TERM instruction "
1354 << reply << " from master " << masterRank_ << std::endl;
1355 #endif
1356
1357
1358 if(reply == 'T') {
1359 if (hubMsgLevel_ > 0) {
1360 messageHandler()->message(ALPS_TERM_HUB_INFORM, messages())
1361 << globalRank_<< "exit" << CoinMessageEol;
1362 }
1363
1364 // Send instruction to my workers
1365 for (i = 0; i < clusterSize_; ++i) {
1366 if (i != clusterRank_) {
1367 reply = 'T';
1368 MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
1369 clusterComm_);
1370 #ifdef NF_DEBUG
1371 std::cout << "HUB[" << globalRank_ << "]: ask its worker "
1372 << i << " to TERM." << std::endl;
1373 #endif
1374 }
1375 }
1376 terminate = true;
1377 break; // Break * and terminate
1378 }
1379 else {
1380 if (hubMsgLevel_ > 0) {
1381 messageHandler()->message(ALPS_TERM_HUB_INFORM, messages())
1382 << globalRank_<< "continue" << CoinMessageEol;
1383 }
1384 for (i = 0; i < clusterSize_; ++i) {
1385 if (i != clusterRank_) {
1386 reply = 'C';
1387 MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
1388 clusterComm_);
1389 #ifdef NF_DEBUG
1390 std::cout << "HUB[" << globalRank_ << "]: ask its worker "
1391 << i << " to continue." << std::endl;
1392 #endif
1393 }
1394 }
1395 terminate = false;
1396 blockTermCheck_ = true;
1397 blockHubReport_ = false;
1398 }
1399 }
1400
1401 //**------------------------------------------------
1402 // Hub balances the workload of its workers if
1403 // (1) not terminate,
1404 // (2) not force terminate,
1405 // (3) all workers have reported, and
1406 // (4) previous balance has been completed.
1407 //**------------------------------------------------
1408
1409 if ( !terminate && !forceTerminate_ &&
1410 allWorkerReported && hubDoBalance_ == 0 ) {
1411 if (clusterSize_ > 2 && intraCB) {
1412 hubBalanceWorkers();
1413 ++hubCheckCount;
1414 if (hubCheckCount % 10 == 0) {
1415 if (hubMsgLevel_ > 0) {
1416 messageHandler()->message(ALPS_LOADBAL_HUB, messages())
1417 << globalRank_ << hubCheckCount << CoinMessageEol;
1418 }
1419 }
1420 }
1421 }
1422
1423 } // EOF while
1424
1425
1426 //------------------------------------------------------
1427 // Clean up before leaving.
1428 //------------------------------------------------------
1429
1430 int cancelNum = 0;
1431 MPI_Cancel(&request);
1432 MPI_Wait(&request, &status);
1433 MPI_Test_cancelled(&status, &flag);
1434 if(flag) { // Cancel succeeded
1435 ++cancelNum;
1436 }
1437 }
1438
1439 //#############################################################################
1440
// Main routine for a worker process.  After receiving setup data from its
// hub and taking part in static load balancing (ramp-up), the worker loops:
// drain incoming MPI messages, do one unit of search work, share better
// solutions, report status to its hub, and participate in termination
// checks until the hub replies 'T' (terminate).
void
AlpsKnowledgeBrokerMPI::workerMain()
{
    bool isIdle = false;          // True while this worker has no work to do
    int thisNumProcessed = 0;
    int thisNumBranched = 0;
    int thisNumDiscarded = 0;
    int thisNumPartial = 0;
    int errorCode = 0;            // 1: bad_alloc; 2: CoinError; 3: other exception

    char reply = 'C';             // Hub instruction: 'C' = continue, 'T' = terminate
    char* tempBuffer = 0;         // Used only for zero-length control messages

    bool terminate = false;
    bool allMsgReceived = false;
    bool deletedSTs = false;      // Subtrees already deleted after force terminate
    bool comUnitWork = false;     // Recompute unit work size from node processing time

    int numComUnitWork = 0;       // How many times unit work size was recomputed

    MPI_Status status;
    AlpsReturnStatus rCode = AlpsReturnStatusOk;
    AlpsExitStatus exitStatus = AlpsExitStatusInfeasible;

    AlpsTimer msgTimer;           // Measures time spent processing messages

    //------------------------------------------------------
    // Get parameters.
    //------------------------------------------------------

    const int staticBalanceScheme =
        model_->AlpsPar()->entry(AlpsParams::staticBalanceScheme);
    const int nodeLogInterval =
        model_->AlpsPar()->entry(AlpsParams::nodeLogInterval);
    const int smallSize =
        model_->AlpsPar()->entry(AlpsParams::smallSize);
    const int workerMsgLevel =
        model_->AlpsPar()->entry(AlpsParams::workerMsgLevel);
    const double zeroLoad =
        model_->AlpsPar()->entry(AlpsParams::zeroLoad);
    double changeWorkThreshold =
        model_->AlpsPar()->entry(AlpsParams::changeWorkThreshold);
    const double needWorkThreshold =
        model_->AlpsPar()->entry(AlpsParams::needWorkThreshold);
    const bool intraCB =
        model_->AlpsPar()->entry(AlpsParams::intraClusterBalance);
    double unitTime =
        model_->AlpsPar()->entry(AlpsParams::unitWorkTime);
    // Non-positive unit time means no time limit on a unit of work.
    if (unitTime <= 0.0) {
        unitTime = ALPS_DBL_MAX;
    }

    unitWorkNodes_ = model_->AlpsPar()->entry(AlpsParams::unitWorkNodes);
    // Non-positive unit size means compute it dynamically below.
    if (unitWorkNodes_ <= 0) {
        comUnitWork = true;
    }

    largeSize_ = model_->AlpsPar()->entry(AlpsParams::largeSize);

    workerTimer_.setClockType(AlpsClockTypeWallClock);
    msgTimer.setClockType(AlpsClockTypeWallClock);

    //------------------------------------------------------
    // Recv node memory size from hub.
    //------------------------------------------------------

    MPI_Recv(&nodeMemSize_, 1, MPI_INT, masterRank_, /* masterRank_ is 0 or 1*/
             AlpsMsgNodeSize, clusterComm_, &status);

    // Adjust largeSize to avoid extreme cases.
    largeSize_ = CoinMax(largeSize_, nodeMemSize_ * 3);

    largeBuffer_ = new char [largeSize_];
    largeBuffer2_ = new char [largeSize_];
    smallBuffer_ = new char [smallSize];

    MPI_Barrier(MPI_COMM_WORLD); // Sync before rampup

    //======================================================
    // Worker's Ramp-up.
    //======================================================

    workerTimer_.start();

    setPhase(AlpsPhaseRampup);
    switch (staticBalanceScheme) {
    case AlpsRootInit:
        rootInitWorker();
        break;
    case AlpsSpiral:
        spiralWorker();
        break;
    default:
        throw CoinError("Unknown static balance scheme", "workerMain",
                        "AlpsKnowledgeBrokerMPI");
    }

    rampUpTime_ = workerTimer_.getTime();

    //======================================================
    // Worker start to search.
    //======================================================

    if (workerMsgLevel > 0) {
        messageHandler()->message(ALPS_SEARCH_WORKER_START, messages())
            <<globalRank_ << CoinMessageEol;
    }

    setPhase(AlpsPhaseSearch);

    //------------------------------------------------------
    // WORKER SCHEDULER:
    // (1) Listen and process messages until no message is in msg queue.
    // (2) Do one unit of work.
    // (3) Report status or check termination.
    //------------------------------------------------------

    MPI_Request request;
    int flag;

    // Persistent nonblocking receive; re-posted by processMessages().
    MPI_Irecv(largeBuffer_, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
              MPI_ANY_TAG, MPI_COMM_WORLD, &request);

    while (true) {

        // Reset each iteration; a hub "pause" message clears it and
        // routes this iteration into the termination-check branch below.
        blockTermCheck_ = true;

        //**------------------------------------------------
        // Listen and process until no message.
        //**------------------------------------------------

        msgTimer.start();
        while (true) {
            MPI_Test(&request, &flag, &status);
            if (flag) { // Receive a msg
                allMsgReceived = false;
                processMessages(largeBuffer_, status, request);
            }
            else {
                allMsgReceived = true;
                break;
            }
        }
        msgTime_ += msgTimer.getTime();

        //**------------------------------------------------
        // if forceTerminate_ == true;
        //**------------------------------------------------

        //if (forceTerminate_ && !deletedSTs) {
        // Don't know why error on limits
        if (forceTerminate_) {

            deletedSTs = true;

            // Remove nodes from node pools
            deleteSubTrees();
            assert(!(subTreePool_->hasKnowledge()));
            updateWorkloadInfo();
        }

        //**------------------------------------------------
        // If don't check termination, do one unit of work.
        //**------------------------------------------------

        if (blockTermCheck_) {

            bool betterSolution = false;

            // Check whether need ask for node index before doing work.
            // Halt the search until the master replies with fresh indices.
            if (!haltSearch_) {
                if (getMaxNodeIndex() - getNextNodeIndex() < (unitWorkNodes_ + 5)) {
                    workerAskIndices();
                    haltSearch_ = true;
                }
            }

            if ( !haltSearch_ &&
                 (workingSubTree_ || hasKnowledge(AlpsKnowledgeTypeSubTree))) {
                if(isIdle) {
                    // Since has work now, set isIdle to false
                    idleTime_ += workerTimer_.getTime();
                    isIdle = false;
                }

                // Need check better solution.
                betterSolution = true;

                // Compute unit work based on node processing time
                if (comUnitWork) {
                    unitWorkNodes_ = computeUnitNodes(unitWorkNodes_,
                                                     nodeProcessingTime_);
                    if (++numComUnitWork > 20) {
                        comUnitWork = false;
                    }
                }

                try {
                    rCode = doOneUnitWork(unitWorkNodes_,
                                          unitTime,
                                          exitStatus,
                                          thisNumProcessed,
                                          thisNumBranched,
                                          thisNumDiscarded,
                                          thisNumPartial,
                                          treeDepth_,
                                          betterSolution);
                    nodeProcessedNum_ += thisNumProcessed;
                    nodeBranchedNum_ += thisNumBranched;
                    nodeDiscardedNum_ += thisNumDiscarded;
                    nodePartialNum_ += thisNumPartial;
                    //std::cout << "Nodes branched" << thisNumBranched << std::endl;
                }
                catch (std::bad_alloc&) {
                    errorCode = 1;
                }
                catch(CoinError& er) {
                    errorCode = 2;
                }
                catch(...) {
                    errorCode = 3;
                }
                if (errorCode) {
                    // On any error: drop remaining work, stop searching and
                    // tell the master so it can force system termination.
                    haltSearch_ = true;
                    deleteSubTrees();
                    assert(!(subTreePool_->hasKnowledge()));
                    updateWorkloadInfo();
                    sendErrorCodeToMaster(errorCode);
                }

                // Adjust workingSubTree_ if it 'much' worse than the best one
                changeWorkingSubTree(changeWorkThreshold);

                // Print tree size
                if ( (msgLevel_ == 1) &&
                     (nodeProcessedNum_ % nodeLogInterval == 0) ) {
                    if (workerMsgLevel > 0) {
                        messageHandler()->message(ALPS_NODE_COUNT, messages())
                            << globalRank_<<nodeProcessedNum_
                            <<updateNumNodesLeft()
                            << CoinMessageEol;
                    }
                }

                // Share generated knowledge.
                sendModelKnowledge(MPI_COMM_WORLD, // Comm
                                   globalRank_);   // Receiver(no use)
            }
            else {
                // Worker is idle.  Start the timer once; idleTime_ is
                // accumulated when work arrives again.
                if (!isIdle) {
                    workerTimer_.start();
                    isIdle = true;
                }
            }

            // If has better solution, check whether need to send it
            if (betterSolution) {
                // Double check if better. If yes, update and sent it to Master
                double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
                if(incVal < incumbentValue_) { // Minimization
                    incumbentValue_ = incVal;
                    incumbentID_ = globalRank_;
#ifdef NF_DEBUG
                    std::cout << "\nWORKDER[" << globalRank_ <<
                        "]: send a better solution. Quality = "
                              << incumbentValue_ << std::endl;
#endif
                    sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                                  globalRank_,
                                  0,
                                  smallBuffer_,
                                  0,
                                  MPI_ANY_TAG,
                                  MPI_COMM_WORLD,
                                  false);
                    //sendIncumbent();
                    if (workerMsgLevel > 0) {
                        messageHandler()->message(ALPS_SOLUTION_SEARCH,
                                                  messages())
                            << globalRank_ << incVal << CoinMessageEol;
                    }
                }
            }

            // Report its status to hub periodically if msg counts are not
            // zero or not blocked. If no load, worker will block report

            updateWorkloadInfo();

            if (sendCount_ || recvCount_ || !blockWorkerReport_) {
                incSendCount("workerMain() - workerReportStatus");
                workerReportStatus(AlpsMsgWorkerStatus, MPI_COMM_WORLD);

                if (workQuantity_ < zeroLoad) {
                    blockWorkerReport_ = true;
                }
                // If nearly out of work, ask the hub for more (at most one
                // outstanding request, guarded by blockAskForWork_).
                if ( intraCB && !forceTerminate_ &&
                     (workQuantity_ < needWorkThreshold) &&
                     (blockAskForWork_ == false) ) {

                    ++(psStats_.workerAsk_);

                    MPI_Send(tempBuffer, 0, MPI_PACKED, myHubRank_,
                             AlpsMsgWorkerNeedWork, MPI_COMM_WORLD);
                    incSendCount("workerAskForWork");
                    blockAskForWork_ = true;
                }

#if 0
                std::cout << "WORKER: " << globalRank_
                          << " after updateWorkloadInfo() = "
                          << workQuantity_ << std::endl;
#endif
            }
        }
        else { /* Do termination check. */
#if 0
            std::cout << "WORKER[" << globalRank_ << "] check termination."
                      << std::endl;
#endif
            // Record rampdown
            rampDownTime_ = workerTimer_.getTime();

            // Report my latest status
            updateWorkloadInfo();
            workerReportStatus(AlpsMsgWorkerTermStatus, clusterComm_);

#ifdef NF_DEBUG
            std::cout << "WORKER[" << globalRank_ << "] reported TEMR status."
                      << std::endl;
#endif
            // Get instruction from my hub
            MPI_Request termReq;
            MPI_Status termSta;
            MPI_Irecv(&reply, 1, MPI_CHAR, masterRank_, AlpsMsgContOrTerm,
                      clusterComm_, &termReq);
            MPI_Wait(&termReq, &termSta);

            if(reply == 'T') {
                terminate = true;
                if (workerMsgLevel > 0) {
                    messageHandler()->message(ALPS_TERM_WORKER_INFORM, messages())
                        << globalRank_ << "exit" << CoinMessageEol;
                }
                break; // Break * and terminate
            }
            else {
                // Continue searching: unblock reports, re-arm term check.
                if (workerMsgLevel > 0) {
                    messageHandler()->message(ALPS_TERM_WORKER_INFORM, messages())
                        << globalRank_ << "continue" << CoinMessageEol;
                }
                blockWorkerReport_ = false;
                blockTermCheck_ = true;
                terminate = false;
            }
        }
    }

    //------------------------------------------------------
    // How many node left?
    //------------------------------------------------------

    updateNumNodesLeft();

    //------------------------------------------------------
    // Cancel MPI_Irecv before leaving.
    //------------------------------------------------------

    int cancelNum = 0;
    MPI_Cancel(&request);
    MPI_Wait(&request, &status);
    MPI_Test_cancelled(&status, &flag);
    if(flag) { // Cancel succeeded
        ++cancelNum;
    }
}
1818
1819 //#############################################################################
1820
// Process one received message according to its MPI tag, then re-post the
// nonblocking receive on bufLarge/request so the caller's listen loop can
// continue.  Handles master, hub, worker, and shared message types; a
// process only receives the tags appropriate to its role.
void
AlpsKnowledgeBrokerMPI::processMessages(char *&bufLarge,
                                        MPI_Status &status,
                                        MPI_Request &request)
{
    int count;
    bool success = false;

    incRecvCount("Processing msg");
    switch (status.MPI_TAG) {

        //--------------------------------------------------
        // Following are master's msgs.
        //--------------------------------------------------
    case AlpsMsgErrorCode:
        // A process hit an error: force the whole system to terminate.
        recvErrorCode(bufLarge);
        masterForceHubTerm();
        hubForceWorkerTerm();
        break;
    case AlpsMsgHubFailFindDonor:
        --masterDoBalance_;
        break;
    case AlpsMsgHubPeriodReport:
        masterUpdateSysStatus(bufLarge, &status, MPI_COMM_WORLD);
        break;
    case AlpsMsgTellMasterRecv:
        --masterDoBalance_;
        break;
    case AlpsMsgWorkerAskIndices:
        masterSendIndices(bufLarge);
        break;

        //-------------------------------------------------
        // Following are hub's msgs.
        //-------------------------------------------------

    case AlpsMsgWorkerNeedWork:
        hubSatisfyWorkerRequest(bufLarge, &status);
        break;
    case AlpsMsgAskHubShare:
        hubsShareWork(bufLarge, &status);
        break;
    case AlpsMsgAskHubPause:
        // Do not count messages during terminate checking
        decRecvCount("hub periodical listening: AlpsMsgAskPause");
        blockTermCheck_ = false;
        break;
    case AlpsMsgTellHubRecv:
        --hubDoBalance_;
        break;
    case AlpsMsgWorkerStatus:
        hubUpdateCluStatus(bufLarge, &status, MPI_COMM_WORLD);
        blockHubReport_ = false;
        break;

        //--------------------------------------------------
        // Following are worker's msg.
        //-------------------------------------------------

    case AlpsMsgAskDonate:
        donateWork(bufLarge, AlpsMsgSubTree, &status);
        incSendCount("worker listening - AlpsMsgAskDonate");
        break;
    case AlpsMsgAskDonateToHub:
        donateWork(bufLarge, AlpsMsgSubTreeByMaster, &status);
        incSendCount("worker listening - AlpsMsgAskDonateToHub");
        break;
    case AlpsMsgAskDonateToWorker:
        donateWork(bufLarge, AlpsMsgSubTreeByWorker, &status);
        incSendCount("worker listening - AlpsMsgAskDonateToWorker");
        break;
    case AlpsMsgAskPause:
        // Do not count messages during terminate checking
        decRecvCount("worker listening - AlpsMsgAskPause");
        blockTermCheck_ = false;
        break;
    case AlpsMsgIndicesFromMaster:
        workerRecvIndices(bufLarge);
        break;
    case AlpsMsgSubTree:
        receiveSubTree(bufLarge, status.MPI_SOURCE, &status);
        tellHubRecv();
        MPI_Get_count(&status, MPI_PACKED, &count);
        // Nonempty message means real work arrived: resume reporting/asking.
        if (count > 0){
            blockWorkerReport_ = false;
            blockAskForWork_  = false;
        }
        break;
    case AlpsMsgSubTreeByWorker:
        receiveSubTree(bufLarge, status.MPI_SOURCE, &status);
        MPI_Get_count(&status, MPI_PACKED, &count);
        if (count > 0){
            blockWorkerReport_ = false;
            blockAskForWork_  = false;
        }
        break;

        //--------------------------------------------------
        // Following are common msgs.
        //--------------------------------------------------

    case AlpsMsgModelGenSearch:
        // Receive shared model knowledge and pass it on.
        receiveModelKnowledge(MPI_COMM_WORLD);
        forwardModelKnowledge();
        break;
    case AlpsMsgIncumbentTwo:
        success = unpackSetIncumbent(bufLarge, &status);
        if (success) {
            sendIncumbent();
            //updateIncumbent_ = false;
        }
        break;
    case AlpsMsgSubTreeByMaster:
        // Routing depends on this process's role: master and hubs allocate
        // the donation; plain workers just receive the subtree.
        if (globalRank_ == masterRank_) {
            hubAllocateDonation(bufLarge, &status);
            MPI_Get_count(&status, MPI_PACKED, &count);
            --masterDoBalance_;
        }
        else if (globalRank_ == myHubRank_) {
            hubAllocateDonation(bufLarge, &status);
            tellMasterRecv();
            MPI_Get_count(&status, MPI_PACKED, &count);
            if (count > 0) {
                blockHubReport_ = false;
            }
        }
        else {
            receiveSubTree(bufLarge, status.MPI_SOURCE, &status);
            MPI_Get_count(&status, MPI_PACKED, &count);
            if (count > 0) blockWorkerReport_ = false;
            break;
        }

        break;
    case AlpsMsgForceTerm:
        forceTerminate_ = true;
        break;
    default:
        std::cout << "PROC " << globalRank_ << " : processMessages"
                  << " : received UNKNOWN message. tag = "
                  << status.MPI_TAG << std::endl;
        throw CoinError("Unknown message type", "workermain",
                        "AlpsKnowledgeBrokerMPI");
    }

    // Re-post the nonblocking receive for the next message.
    MPI_Irecv(bufLarge, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
              MPI_ANY_TAG, MPI_COMM_WORLD, &request);

}
1971
1972 //#############################################################################
1973
1974 // Master ask the donor hub to send work to the receiver hub
1975 void
masterAskHubDonate(int donorID,int receiverID,double receiverWorkload)1976 AlpsKnowledgeBrokerMPI::masterAskHubDonate(int donorID,
1977 int receiverID,
1978 double receiverWorkload)
1979 {
1980 int smallSize =
1981 model_->AlpsPar()->entry(AlpsParams::smallSize);
1982 int pos = 0;
1983
1984 MPI_Pack(&receiverID, 1, MPI_INT, smallBuffer_, smallSize, &pos, MPI_COMM_WORLD);
1985 MPI_Pack(&receiverWorkload, 1, MPI_DOUBLE, smallBuffer_, smallSize, &pos,
1986 MPI_COMM_WORLD);
1987
1988 #if 0
1989 std::cout << "masterAskHubDonate(): donor is " << donorID << std::endl;
1990 #endif
1991
1992 MPI_Send(smallBuffer_,pos, MPI_PACKED, donorID, AlpsMsgAskHubShare, MPI_COMM_WORLD);
1993 incSendCount("masterAskHubDonate()");
1994
1995 }
1996
1997 //#############################################################################
1998
1999 // Master ask the donor hub to send work to the receiver hub
2000 void
hubAskWorkerDonate(int donorID,int receiverID,double receiverWorkload)2001 AlpsKnowledgeBrokerMPI::hubAskWorkerDonate(int donorID,
2002 int receiverID,
2003 double receiverWorkload)
2004 {
2005 int smallSize = model_->AlpsPar()->entry(AlpsParams::smallSize);
2006 int pos = 0;
2007
2008 MPI_Pack(&receiverID, 1, MPI_INT, smallBuffer_, smallSize, &pos,
2009 MPI_COMM_WORLD);
2010 MPI_Pack(&receiverWorkload, 1, MPI_DOUBLE, smallBuffer_, smallSize, &pos,
2011 MPI_COMM_WORLD);
2012
2013 #ifdef NF_DEBUG_MORE
2014 std::cout << "hubAskHubDonate() send to " << donorID << std::endl;
2015 #endif
2016
2017 MPI_Send(smallBuffer_, pos, MPI_PACKED, donorID, AlpsMsgAskDonate,
2018 MPI_COMM_WORLD);
2019 incSendCount("hubAskWorkerDonate()");
2020 }
2021
2022 //#############################################################################
2023
2024 // Calculate the quality and quantity of workload on this process
2025 void
updateWorkloadInfo()2026 AlpsKnowledgeBrokerMPI::updateWorkloadInfo()
2027 {
2028 int count;
2029
2030 workQuality_ = ALPS_OBJ_MAX; // Worst ever possible
2031 workQuantity_ = 0.0; // No work
2032
2033 if ( (workingSubTree_ == NULL) &&
2034 (subTreePool_->hasKnowledge() == false) ) {
2035 return;
2036 }
2037
2038 std::vector<AlpsSubTree* > subTreeVec =
2039 subTreePool_->getSubTreeList().getContainer();
2040 std::vector<AlpsSubTree* >::iterator pos1, pos2;
2041 pos1 = subTreeVec.begin();
2042 pos2 = subTreeVec.end();
2043
2044 workQuality_ = subTreePool_->getBestQuality();
2045
2046 // if (pos1 != pos2) {
2047 // workQuality_ = (*pos1)->getQuality();//Best in pool
2048 // }
2049
2050 for (; pos1 != pos2; ++pos1) {
2051 if ((*pos1)->getNumNodes() > 5) {
2052 //count = 5;
2053 count = (*pos1)->getNumNodes();
2054 }
2055 else {
2056 count = (*pos1)->getNumNodes();
2057 }
2058 workQuantity_ += count;
2059 }
2060
2061 if (workingSubTree_ != 0) {
2062 workingSubTree_->calculateQuality();
2063 if (workQuality_ > workingSubTree_->getQuality()) {
2064 workQuality_ = workingSubTree_->getQuality();
2065 }
2066 if (workingSubTree_->getNumNodes() > 5) {
2067 //count = 5;
2068 count = workingSubTree_->getNumNodes();
2069 }
2070 else {
2071 count = workingSubTree_->getNumNodes();
2072 }
2073 workQuantity_ += count;
2074 }
2075
2076 #ifdef NF_DEBUG_MORE
2077 std::cout << "PROC[" << globalRank_ << "] incumbentValue_ = "
2078 << incumbentValue_ << "; workQuality_ = "
2079 << workQuality_<< "; workQuantity_ = "
2080 << workQuantity_ << std::endl;
2081 #endif
2082 }
2083
2084 //#############################################################################
2085
2086 // A worker donates a subtree to another worker(whose info is in bufLarge)
void
AlpsKnowledgeBrokerMPI::donateWork(char*& anyBuffer,
                                   int tag,
                                   MPI_Status* status,
                                   int recvID,
                                   double recvWL)
{
    // Max number of bytes that may be unpacked from anyBuffer.
    int size =
        model_->AlpsPar()->entry(AlpsParams::smallSize);
    int pos = 0;
    int receiverID = 0;
    // Number of nodes moved into the subtree produced by splitSubTree().
    int treeSize;

    // Zero-length payload used when there is nothing to donate (Case 3).
    char* dummyBuf = 0;
    double receiverWorkload = 0.0;
    bool sentSuccessful = false;
    AlpsSubTree* aSubTree = 0;

#ifdef NF_DEBUG_MORE
    updateWorkloadInfo();
    messageHandler()->message(ALPS_DONATE_BEFORE, messages())
        << globalRank_ << workQuantity_ << subTreePool_->getNumKnowledges()
        << CoinMessageEol;
#endif

    //------------------------------------------------------
    // Find out to which process I should donate.
    //------------------------------------------------------

    if (recvID == -1) {
        // Receiver not supplied by the caller; unpack its id and workload
        // from the message that triggered this donation.
        MPI_Unpack(anyBuffer, size, &pos, &receiverID, 1, MPI_INT, MPI_COMM_WORLD);
        MPI_Unpack(anyBuffer, size, &pos, &receiverWorkload, 1, MPI_DOUBLE,
                   MPI_COMM_WORLD);
    }
    else {
        receiverID = recvID;
        receiverWorkload = recvWL;
    }


    //------------------------------------------------------
    // Check if previous sending subtree has completed.
    //------------------------------------------------------

    // Nonzero when no nonblocking subtree send is still pending
    // (MPI_Test's completion flag is an int).
    int alreadySent = true;
    if (subTreeRequest_ != MPI_REQUEST_NULL){
        MPI_Status sentStatus;
        MPI_Test(&subTreeRequest_, &alreadySent, &sentStatus);
#if 0
        if (alreadySent) {
            std::cout << "++++ Process[" << globalRank_
                      << "] have completed sending a subtree." << std::endl;
        }
        else {
            std::cout << "++++ Process[" << globalRank_
                      << "] haven't completed sending a subtree." << std::endl;
        }
#endif
    }
    else {
#if 0
        std::cout << "++++ Process[" << globalRank_
                  << "] ready to sent a subtree." << std::endl;
#endif
    }

    //------------------------------------------------------
    // Case 1: If subTreePool has subtrees, send the best one.
    //         if the size of the best one is big large, split it.
    // Case 2: If subTreePool has no subtrees AND workingSubTree_ does not
    //         point to NULL, split it and send one part of it.
    // Case 3: Otherwise, sent a empty msg.
    //------------------------------------------------------

    if (alreadySent) {
        if (subTreePool_->hasKnowledge()) {   // Case 1
            aSubTree = dynamic_cast<AlpsSubTree* >
                (subTreePool_->getKnowledge().first);
            sentSuccessful = sendSubTree(receiverID, aSubTree, tag);

            if (sentSuccessful) {
                ++(psStats_.subtreeWhole_);

                subTreePool_->popKnowledge();
                // Since sent to other process, delete it.
                delete aSubTree;
                aSubTree = NULL;
                if (msgLevel_ > 100) {
                    messageHandler()->message(ALPS_DONATE_WHOLE, messages())
                        << globalRank_ << receiverID
                        << status->MPI_TAG << CoinMessageEol;
                }
            }
            else {
                // Whole subtree could not be sent; split off a piece of the
                // pool's best subtree and try again.
                aSubTree = aSubTree->splitSubTree(treeSize);
                if (treeSize > ALPS_GEN_TOL) {
                    ++(psStats_.subtreeSplit_);
                    sentSuccessful = sendSubTree(receiverID, aSubTree, tag);
                    assert(sentSuccessful == true);
                    // Since sent to other process, delete it.
                    delete aSubTree;
                    aSubTree = NULL;
                }
                // NOTE(review): when treeSize <= ALPS_GEN_TOL the split-off
                // subtree is neither sent nor deleted here -- confirm
                // splitSubTree() ownership semantics (possible leak).

                if (msgLevel_ > 100) {
                    messageHandler()->message(ALPS_DONATE_SPLIT, messages())
                        << globalRank_ << receiverID << status->MPI_TAG
                        << CoinMessageEol;
                }
            }
        }
        else if (workingSubTree_ != 0) {     // Case 2
            // No pooled subtrees: split the subtree currently being explored.
            aSubTree = workingSubTree_->splitSubTree(treeSize);
            if (treeSize > ALPS_GEN_TOL) {
                ++(psStats_.subtreeSplit_);

                sentSuccessful = sendSubTree(receiverID, aSubTree, tag);
                assert(sentSuccessful == true);

                // Since sent to other process, delete it.
                delete aSubTree;
                aSubTree = NULL;

                if (msgLevel_ > 100) {
                    messageHandler()->message(ALPS_DONATE_SPLIT, messages())
                        << globalRank_ << receiverID << status->MPI_TAG
                        << CoinMessageEol;
                }
            }
        }
    }

    if (!sentSuccessful || !alreadySent) {   // Case 3
#if 0
        std::cout << "donateWork(): " << globalRank_ << "has nothing send to "
                  << receiverID <<"; alreadySent "<<alreadySent<< std::endl;
#endif
        ++(psStats_.donateFail_);

        //MPI_Send(dummyBuf, 0, MPI_PACKED, receiverID, tag, MPI_COMM_WORLD);
        // Empty message so the receiver's matching receive still completes.
        MPI_Isend(dummyBuf, 0, MPI_PACKED, receiverID, tag,
                  MPI_COMM_WORLD, &subTreeRequest_); // request is overrided.

        if (msgLevel_ > 100) {
            messageHandler()->message(ALPS_DONATE_FAIL, messages())
                << globalRank_ << receiverID << status->MPI_TAG
                << CoinMessageEol;
        }
    }
    else {
        ++(psStats_.donateSuccess_);
    }

#ifdef NF_DEBUG_MORE
    updateWorkloadInfo();
    messageHandler()->message(ALPS_DONATE_AFTER, messages())
        << globalRank_ << workQuantity_ << subTreePool_->getNumKnowledges()
        << CoinMessageEol;
#endif
}
2248
2249 //#############################################################################
2250
// The hub allocates the donated subtree to the worker that has the worst
// workload. Note that the donated subtree comes from another cluster.
2253 void
hubAllocateDonation(char * & bufLarge,MPI_Status * status)2254 AlpsKnowledgeBrokerMPI::hubAllocateDonation(char*& bufLarge,
2255 MPI_Status* status)
2256 {
2257 int i;
2258 int count = 0;
2259
2260 int worstRank = -1;
2261 double worst = -ALPS_OBJ_MAX; // Minimization: best possible
2262
2263 //------------------------------------------------------
2264 // Get the number of elements in bufLarge.
2265 //------------------------------------------------------
2266
2267 MPI_Get_count(status, MPI_PACKED, &count);
2268
2269 //------------------------------------------------------
2270 // Find the worker who has worst work quality.
2271 //------------------------------------------------------
2272
2273 for (i = 0; i < clusterSize_; ++i) {
2274 if (!hubWork_) {
2275 if (i == clusterRank_) continue;
2276 }
2277 if (workerWorkQuantities_[i] > worst) {
2278 worst = workerWorkQualities_[i];
2279 worstRank = globalRank_ + i;
2280 }
2281 }
2282
2283 //------------------------------------------------------
2284 // Forward the bufLarge to this worst worker.
2285 //------------------------------------------------------
2286
2287 if (worstRank != -1) {
2288
2289 #ifdef NF_DEBUG_MORE
2290 std::cout << "hubAllocateDonation() send to "
2291 << worstRank << std::endl;
2292 #endif
2293 if (worstRank != globalRank_) {
2294 MPI_Send(bufLarge, count, MPI_PACKED, worstRank,
2295 AlpsMsgSubTreeByMaster, MPI_COMM_WORLD);
2296 incSendCount("hubAllocateDonation");
2297 }
2298 else { // Hub self
2299 //std::cout << "Allocate to myself" << std::endl;
2300 receiveSubTree(bufLarge, status->MPI_SOURCE, status);
2301 }
2302 }
2303 else {
2304 std::cout << "ERROR: worstRank == -1" << std::endl;
2305 throw CoinError("worstRank == -1",
2306 "hubAllocateDonation", "AlpsKnowledgeBrokerMPI");
2307 }
2308 }
2309
2310 //#############################################################################
2311
// A hub balances the workload of its workers. If any worker has (almost)
// no workload, do quantity balance; otherwise, do quality balance.
void
AlpsKnowledgeBrokerMPI::hubBalanceWorkers()
{
    // Nothing to balance when this cluster is essentially out of work.
    const double zeroQuantity =
        model_->AlpsPar()->entry(AlpsParams::zeroLoad);
    if (clusterWorkQuantity_ < zeroQuantity) {
        if (msgLevel_ > 100) {
            messageHandler()->message(ALPS_LOADBAL_HUB_NO, messages())
                //<< globalRank_ << systemWorkQuantity_ << CoinMessageEol;
                << globalRank_ << clusterWorkQuantity_ << CoinMessageEol;
        }
        return;
    }

    int i;
    // Placeholder buffer for donateWork(); unused because an explicit
    // receiver id is passed.
    char* dummyBuf = NULL;

    std::vector<std::pair<double, int> > loadIDVector;
    loadIDVector.reserve(hubNum_);
    // Receivers ordered worst (largest) quality first; donors ordered
    // best (smallest) quality first, keyed by quality, valued by rank.
    std::multimap<double, int, std::greater<double> > receivers;
    std::multimap<double, int> donors;

    const double donorSh =
        model_->AlpsPar()->entry(AlpsParams::donorThreshold);
    const double receiverSh =
        model_->AlpsPar()->entry(AlpsParams::receiverThreshold);
    const double needWorkThreshold =
        model_->AlpsPar()->entry(AlpsParams::needWorkThreshold);
    assert(needWorkThreshold > 0.0);

    ++(psStats_.intraBalance_);

    // Indentify donors and receivers and decide to do quantity or quality.
    // Do quantity balance if any hubs do not have work
    bool quantityBalance = false;

    for (i = 0; i < clusterSize_; ++i) { // NOTE: skips the hub itself below.
#ifdef NF_DEBUG
        std::cout << "Hub["<<globalRank_ <<"] : HUB BALANCE: worker "
                  << i << ", quantitity = " << workerWorkQuantities_[i]
                  << ", quality = " << workerWorkQualities_[i]
                  << ", needWorkThreshold = " << needWorkThreshold << std::endl;
#endif
        if ( i == clusterRank_ ) continue;

        // Workers at or below the threshold are starving: receivers.
        if (workerWorkQuantities_[i] <= needWorkThreshold) {
            receivers.insert(std::pair<double, int>(workerWorkQualities_[i],
                                                    globalRank_ - masterRank_ + i));
            quantityBalance = true;
        }
    }

    if (quantityBalance) { // Quantity balance
        ++(psStats_.quantityBalance_);

        // Any worker with more than the threshold amount of work can donate.
        for (i = 0; i < clusterSize_; ++i) {
            if ( i == clusterRank_ ) continue;

            if (workerWorkQuantities_[i] > needWorkThreshold) {
                donors.insert(std::pair<double, int>(workerWorkQualities_[i],
                                                     globalRank_-masterRank_ + i));
            }
        }
    }
    else { // Quality balance
        ++(psStats_.qualityBalance_);

        // Average worker quality (hub excluded from the denominator).
        double averQuality = 0.0;
        for (i = 0; i < clusterSize_; ++i) {
            if ( i == clusterRank_ ) continue;
            averQuality += workerWorkQualities_[i];
        }
        averQuality /= (clusterSize_ - 1);

        for (i = 0; i < clusterSize_; ++i) {
            if ( i == clusterRank_ ) continue;
            double diff = workerWorkQualities_[i] - averQuality;
            double diffRatio = fabs(diff / averQuality);
            if (diff < 0 && diffRatio > donorSh) { // Donor candidates
                // Quality well below average (more promising work): donor.
                donors.insert(std::pair<double, int>(workerWorkQualities_[i],
                                                     globalRank_ -masterRank_ + i));
            }
            else if (diff > 0 && diffRatio > receiverSh){// Receiver candidates
                receivers.insert(std::pair<double, int>(
                                     workerWorkQualities_[i],
                                     globalRank_ - masterRank_ + i));
            }
        }
    }

    // Instruct donor workers to send nodes to receiver workers:
    // best donor is paired with worst receiver, and so on.
    const int numDonor = (int) donors.size();
    const int numReceiver = (int) receivers.size();
    const int numPair = CoinMin(numDonor, numReceiver);

#ifdef NF_DEBUG
    std::cout << "Hub[" << globalRank_ << "]: num of donors = " << numDonor
              << ", num of receivers = " << numReceiver << std::endl;
#endif

    int donorID;
    int receiverID;

    std::multimap<double, int, std::greater<double> >::iterator posD =
        donors.begin();
    std::multimap<double, int>::iterator posR = receivers.begin();

    for(i = 0; i < numPair; ++i) {
        donorID = posD->second;
        receiverID = posR->second;

#ifdef NF_DEBUG
        std::cout << "Hub["<<globalRank_ <<"] : HUB BALANCE: receiver worker ID = "
                  << receiverID << std::endl;
        std::cout << "Hub["<<globalRank_ <<"] : donor worker ID = "
                  << donorID << std::endl;
#endif

        assert(receiverID < processNum_ && donorID < processNum_);

        ++hubDoBalance_;
        if (donorID == globalRank_) { // Hub self is donor
            MPI_Status status;
            donateWork(dummyBuf, AlpsMsgSubTree, &status,
                       receiverID, posR->first);
            incSendCount("hubBalanceWorkers");
        }
        else {
            // Tell the donor worker to ship a subtree to the receiver.
            hubAskWorkerDonate(donorID, receiverID, posR->first);
        }

        ++posD;
        ++posR;
    }
}
2449
2450 //#############################################################################
2451
// Upon receiving a request for workload from a worker, this hub asks the
// worker with the best work quality in its cluster to send a subtree to
// the requestor.
2454 void
hubSatisfyWorkerRequest(char * & bufLarge,MPI_Status * status)2455 AlpsKnowledgeBrokerMPI::hubSatisfyWorkerRequest(char*& bufLarge, MPI_Status* status)
2456 {
2457 int i;
2458 int requestorRank;
2459 int pos = 0;
2460 int donorRank = -1;
2461 int donorGlobalRank = -1; // Global rank
2462 int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
2463
2464 double bestQuality = ALPS_OBJ_MAX; // Worst could be
2465
2466 //------------------------------------------------------
2467 // Indentify the requestor's cluster rank.
2468 //------------------------------------------------------
2469
2470 int requestor = status->MPI_SOURCE;
2471 requestorRank = requestor % userClusterSize_;
2472
2473 //------------------------------------------------------
2474 // Find the worker having best quality in this cluster.
2475 //------------------------------------------------------
2476
2477 for (i = 0; i < clusterSize_; ++i) {
2478 if (i == clusterRank_ || i == requestorRank){
2479 // Skip hub and the requestor.
2480 // TODO: do not skip hub if hub works.
2481 continue;
2482 }
2483 if (workerWorkQualities_[i] < bestQuality) {
2484 bestQuality = workerWorkQualities_[i];
2485 donorRank = i;
2486 donorGlobalRank = globalRank_ - masterRank_ + i;
2487 }
2488 }
2489
2490 //------------------------------------------------------
2491 // Ask the worker has best qaulity to share work with requestor.
2492 //------------------------------------------------------
2493
2494 double temp = 0.0;
2495
2496 if ( (donorGlobalRank != -1) && (donorRank != requestorRank) ) {
2497 // Send the requestor rank and quality to donor.
2498 MPI_Pack(&requestor, 1, MPI_INT, smallBuffer_, size, &pos,
2499 MPI_COMM_WORLD);
2500 MPI_Pack(&temp, 1, MPI_DOUBLE, smallBuffer_, size, &pos,
2501 MPI_COMM_WORLD);
2502 MPI_Send(smallBuffer_, pos, MPI_PACKED,
2503 donorGlobalRank,
2504 AlpsMsgAskDonateToWorker,
2505 MPI_COMM_WORLD);
2506
2507 #ifdef NF_DEBUG_MORE
2508 std::cout << "HUB " << globalRank_ << "ask worker " <<donorGlobalRank
2509 << "to donate workload with " << requestor << std::endl;
2510 #endif
2511 }
2512 else {
2513 // Failed to find a donor, send a empty buffer to requestor.
2514 ++(psStats_.donateFail_);
2515
2516 MPI_Send(smallBuffer_, 0, MPI_PACKED,
2517 requestor,
2518 AlpsMsgSubTreeByWorker,
2519 MPI_COMM_WORLD);
2520 if (msgLevel_ > 100) {
2521 messageHandler()->message(ALPS_LOADBAL_HUB_FAIL, messages())
2522 << globalRank_ << requestor << CoinMessageEol;
2523 }
2524 }
2525
2526 incSendCount("hubSatisfyWorkerRequest");
2527 }
2528
2529 //#############################################################################
2530
2531 // Report cluster work quality, quantity, and msg counts.
2532 void
hubReportStatus(int tag,MPI_Comm comm)2533 AlpsKnowledgeBrokerMPI::hubReportStatus(int tag, MPI_Comm comm)
2534 {
2535 int pos = 0;
2536 int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
2537
2538 int receiver;
2539
2540 if (comm == MPI_COMM_WORLD) {
2541 receiver = masterRank_;
2542 }
2543 else if (comm == hubComm_) {
2544 // NOTE: master's rank is always 0 in hubComm_.
2545 receiver = 0;
2546 }
2547 else {
2548 std::cout << "HUB " << globalRank_
2549 <<" : Unkown Comm in hubReportStatus" << std::endl;
2550 throw CoinError("Unkown Comm", "hubReportStatus",
2551 "AlpsKnowledgeBrokerMPI");
2552 }
2553
2554 #if defined(NF_DEBUG_MORE)
2555 std::cout << "HUB " << globalRank_
2556 << " : quantity = " << clusterWorkQuantity_ << ", ";
2557 for (int i = 0; i < clusterSize_; ++i) {
2558 std::cout << "w[" << globalRank_+i << "]="
2559 << workerWorkQuantities_[i] << ", ";
2560 }
2561 std::cout << std::endl;
2562 std::cout << "HUB " << globalRank_
2563 << " : quality = " << clusterWorkQuality_ << ", ";
2564 for (int i = 0; i < clusterSize_; ++i) {
2565 std::cout << "w[" << globalRank_+i << "]="
2566 << workerWorkQualities_[i] << ", ";
2567 }
2568 std::cout << std::endl;
2569 #endif
2570
2571 MPI_Pack(&clusterNodeProcessed_, 1, MPI_INT, smallBuffer_,size,&pos, comm);
2572 MPI_Pack(&clusterWorkQuality_, 1, MPI_DOUBLE, smallBuffer_,size,&pos,comm);
2573 MPI_Pack(&clusterWorkQuantity_, 1, MPI_DOUBLE,smallBuffer_,size,&pos,comm);
2574 MPI_Pack(&clusterSendCount_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
2575 MPI_Pack(&clusterRecvCount_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
2576 MPI_Pack(&nodeProcessingTime_, 1, MPI_DOUBLE, smallBuffer_, size, &pos,
2577 comm);
2578 MPI_Pack(&unitWorkNodes_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
2579
2580 MPI_Send(smallBuffer_, pos, MPI_PACKED, receiver, tag, comm);
2581
2582 clusterSendCount_ = clusterRecvCount_ = 0; // Only count new msg
2583 }
2584
2585 //#############################################################################
2586
2587 // After receiving status (in buf) from a worker, a hub updates its
2588 // cluster's status
void
AlpsKnowledgeBrokerMPI::hubUpdateCluStatus(char*& bufLarge,
                                           MPI_Status* status,
                                           MPI_Comm comm)
{
    int msgSendNum, msgRecvNum;
    int position = 0;
    int sender;
    int size = model_->AlpsPar()->entry(AlpsParams::smallSize);

    // Map the reporting worker to its cluster-local index.
    if (comm == MPI_COMM_WORLD)
        sender = (status->MPI_SOURCE) % userClusterSize_;
    else if (comm == clusterComm_)
        sender = status->MPI_SOURCE;
    else {
        std::cout << "unknown sender in hubUpdateCluStatus()" << std::endl;
        throw CoinError("unknown sender",
                        "hubUpdateCluStatus()", "AlpsKnowledgeBrokerMPI");
    }

    workerReported_[sender] = true;

    // Remember the sender's previous contribution so the cluster totals
    // can be updated incrementally (subtract old, add new).
    int preNodeProcessed = workerNodeProcesseds_[sender];
    int curNodeProcessed;
    //double preQuality = workerWorkQualities_[sender];
    double curQuality;
    double preQuantity = workerWorkQuantities_[sender];
    double curQuantity;
    double npTime;

    // Unpack the worker's report; the field order is the wire protocol
    // and must match the sender's MPI_Pack sequence.
    MPI_Unpack(bufLarge, size, &position, &curNodeProcessed, 1, MPI_INT, comm);
    MPI_Unpack(bufLarge, size, &position, &curQuality, 1, MPI_DOUBLE, comm);
    MPI_Unpack(bufLarge, size, &position, &curQuantity, 1, MPI_DOUBLE, comm);
    MPI_Unpack(bufLarge, size, &position, &msgSendNum, 1, MPI_INT, comm);
    MPI_Unpack(bufLarge, size, &position, &msgRecvNum, 1, MPI_INT, comm);
    MPI_Unpack(bufLarge, size, &position, &npTime, 1, MPI_DOUBLE, comm);
    MPI_Unpack(bufLarge, size, &position, &unitWorkNodes_, 1, MPI_INT, comm);

    workerNodeProcesseds_[sender] = curNodeProcessed;
    workerWorkQualities_[sender] = curQuality;
    workerWorkQuantities_[sender] = curQuantity;

    clusterNodeProcessed_ -= preNodeProcessed;
    clusterNodeProcessed_ += curNodeProcessed;

    clusterWorkQuantity_ -= preQuantity;
    clusterWorkQuantity_ += curQuantity;

    // An (almost) empty cluster has the worst possible quality.
    if (clusterWorkQuantity_ < ALPS_QUALITY_TOL)
        clusterWorkQuality_ = ALPS_OBJ_MAX;
    else {
#if 0 // Have problems
        // NOTE: no work means quality is ALPS_OBJ_MAX.
        if (hubWork_) {
            clusterWorkQuality_ *= userClusterSize_;
        }
        else {
            clusterWorkQuality_ *= (userClusterSize_ - 1);
        }

        clusterWorkQuality_ -= preQuality;
        clusterWorkQuality_ += curQuality;
        if (hubWork_) {
            clusterWorkQuality_ /= userClusterSize_;
        }
        else {
            clusterWorkQuality_ /= (userClusterSize_ - 1);
        }
#endif
        // Cluster quality is the best (smallest) quality reported so far.
        clusterWorkQuality_ = std::min(clusterWorkQuality_, curQuality);

    }

    clusterSendCount_ += msgSendNum;
    clusterRecvCount_ += msgRecvNum;

    // Fold a genuine node-processing-time sample into the running average
    // and recompute the balancing/reporting period from it.
    if (npTime != ALPS_NODE_PROCESS_TIME && npTime > 1.0e-8) {
        if (nodeProcessingTime_ == ALPS_NODE_PROCESS_TIME) {
            // First real sample replaces the default.
            nodeProcessingTime_ = npTime;
        }
        else {
            nodeProcessingTime_ = 0.5 * (nodeProcessingTime_ + npTime);
        }

        if (globalRank_ == masterRank_) {
            masterBalancePeriod_ = computeBalancePeriod(userBalancePeriod_,
                                                        masterBalancePeriod_,
                                                        nodeProcessingTime_);
        }
        else {
            hubReportPeriod_ = computeBalancePeriod(userBalancePeriod_,
                                                    hubReportPeriod_,
                                                    nodeProcessingTime_);
        }

        if (hubMsgLevel_ > 5) {
            messageHandler()->message(ALPS_LOADBAL_HUB_PERIOD, messages())
                << globalRank_ << hubReportPeriod_ << CoinMessageEol;
        }

#if 0
        std::cout << "HUB[" << globalRank_ << "]: After updateSystem(),"
                  << "hubReportPeriod_ = " << masterBalancePeriod_
                  << ", npTime = " << npTime
                  << ", nodeProcessingTime_ = " << nodeProcessingTime_
                  << std::endl;
#endif

    }

#if 0
    std::cout << "HUB["<< globalRank_ <<"]: after hubUpdateCluStatus(): "
              << "curQuality = " << curQuality
              << ", sender = " << sender
              << ", curQuantity = " << curQuantity
              << ", preQuantity = " << preQuantity
              << ", clusterWorkQuantity_ = " << clusterWorkQuantity_
              << ", clusterWorkQuality_ = " << clusterWorkQuality_
              << ", clusterSendCount_ = " << clusterSendCount_
              << ", clusterRecvCount_ = " << clusterRecvCount_
              << ", clusterSize_ = " << clusterSize_
              << ", status->MPI_SOURCE = " << status->MPI_SOURCE
              << std::endl;
#endif

}
2715
2716 //#############################################################################
2717
// After receiving the master's request to donate work, this hub finds its
// most promising worker and asks it to donate some work to another hub,
// whose information (id and workload) is in bufLarge.
void
AlpsKnowledgeBrokerMPI::hubsShareWork(char*& bufLarge,
                                      MPI_Status* status)
{
    int i;
    int pos = 0;
    int receiverID = 0;
    int maxLoadRank = -1; // Global rank of the chosen donor (-1 if none)
    // NOTE(review): despite the name, the loop below tracks the MINIMUM
    // quality value (most promising work) among workers with nonzero
    // quantity -- confirm this naming is only historical.
    double maxLoad = ALPS_DBL_MAX;
    int size = model_->AlpsPar()->entry(AlpsParams::smallSize);

    int skipRank;

    double receiverWorkload = 0.0;

    //------------------------------------------------------
    // Indentify the receiver and its current workload.
    //------------------------------------------------------

    MPI_Unpack(bufLarge, size, &pos, &receiverID, 1, MPI_INT, MPI_COMM_WORLD);
    MPI_Unpack(bufLarge, size, &pos, &receiverWorkload, 1, MPI_DOUBLE,
               MPI_COMM_WORLD);

    //------------------------------------------------------
    // Find the donor worker in hub's cluster.
    //------------------------------------------------------

    // A hub that processes nodes itself is also a donor candidate.
    if (hubWork_) {
        skipRank = -1;
    }
    else {
        skipRank = clusterRank_;
    }

    for (i = 0; i < clusterSize_; ++i) {
        if (i != skipRank) {
            if ( (workerWorkQuantities_[i] > ALPS_QUALITY_TOL) &&
                 (workerWorkQualities_[i] < maxLoad) ) {
                maxLoad = workerWorkQualities_[i];
                maxLoadRank = globalRank_ - masterRank_ + i;
            }
        }
    }

    //------------------------------------------------------
    // Ask the donor to share work (send a subtree to receiving hub).
    //------------------------------------------------------

    if (maxLoadRank != -1) {
#if 0
        std::cout << "HUB[" << globalRank_ << "] : will ask process "
                  << maxLoadRank
                  << " to donate workload to process " << receiverID
                  << std::endl;
#endif

        if(maxLoadRank != globalRank_) { // Not hub
            // max loaded is NOT this hub.

            // Forward the receiver's id and workload to the donor worker.
            pos = 0;                   // Reset position to pack
            MPI_Pack(&receiverID, 1, MPI_INT, smallBuffer_, size,
                     &pos, MPI_COMM_WORLD);
            MPI_Pack(&receiverWorkload, 1, MPI_DOUBLE, smallBuffer_, size,
                     &pos, MPI_COMM_WORLD);
            MPI_Send(smallBuffer_, pos, MPI_PACKED, maxLoadRank,
                     AlpsMsgAskDonateToHub, MPI_COMM_WORLD);
            incSendCount("hubsShareWork");
        }
        else {
            // Max loaded is this hub.
            // NOTE: smallBuffer_ is NOT useful in this case.
            donateWork(smallBuffer_,
                       AlpsMsgSubTreeByMaster,
                       status,
                       receiverID,
                       receiverWorkload);
            incSendCount("hubsShareWork");
        }
    }
    else {
#if 0
        std::cout << "HUB[" << globalRank_
                  << "] :can not find a process to donate workload to process "
                  << receiverID << std::endl;
#endif
        if (msgLevel_ > 100) {
            messageHandler()->message(ALPS_LOADBAL_HUB_FAIL, messages())
                << globalRank_ << receiverID << CoinMessageEol;
        }

        // Send an empty msg to master to reduce masterDoBalance_ by 1
        MPI_Send(smallBuffer_, 0, MPI_PACKED, masterRank_,
                 AlpsMsgHubFailFindDonor, MPI_COMM_WORLD);
        incSendCount("hubsShareWork");
    }
}
2817
2818 //#############################################################################
2819
// The master balances the workload of the hubs. If any hub has (almost)
// no workload, do quantity balance; otherwise, do quality balance.
void
AlpsKnowledgeBrokerMPI::masterBalanceHubs()
{
    // Nothing to balance when the whole system is essentially out of work.
    const double zeroQuantity =
        model_->AlpsPar()->entry(AlpsParams::zeroLoad);
    if (systemWorkQuantity_ < zeroQuantity) {
        if (msgLevel_ > 100) {
            messageHandler()->message(ALPS_LOADBAL_MASTER_NO, messages())
                << globalRank_ << systemWorkQuantity_ << CoinMessageEol;
        }
        return;
    }

    int i;
    int size = model_->AlpsPar()->entry(AlpsParams::smallSize);

    std::vector<std::pair<double, int> > loadIDVector;

    loadIDVector.reserve(hubNum_);

    // Receivers ordered worst (largest) quality first; donors ordered
    // best (smallest) quality first, keyed by quality, valued by hub rank.
    std::multimap<double, int, std::greater<double> > receivers;
    std::multimap<double, int> donors;

    const double donorSh =
        model_->AlpsPar()->entry(AlpsParams::donorThreshold);
    const double receiverSh =
        model_->AlpsPar()->entry(AlpsParams::receiverThreshold);

    //------------------------------------------------------
    // Identify donors and receivers and decide do quality or quantity.
    // Do quantity balance immediately if any hubs do not have work.
    //------------------------------------------------------

    ++(psStats_.interBalance_);

    bool quantityBalance = false;
    for (i = 0; i < hubNum_; ++i) {
        if (hubWorkQuantities_[i] < ALPS_QUALITY_TOL) { // Have not work
#if 0
            std::cout << "++++ master balance: find donor hub " << hubRanks_[i]
                      << "; quantity = " << hubWorkQuantities_[i]
                      << std::endl;
#endif
            receivers.insert(std::pair<double, int>(hubWorkQualities_[i],
                                                    hubRanks_[i]));
            quantityBalance = true;
        }
    }

    if (quantityBalance) { // Quantity balance
        ++(psStats_.quantityBalance_);

        // Every hub that still has work is a donor candidate.
        for (i = 0; i < hubNum_; ++i) {
            if (hubWorkQuantities_[i] > ALPS_QUALITY_TOL) {
#if 0
                std::cout << "++++ master balance: find donor hub "
                          << hubRanks_[i] << "; quantity = "
                          << hubWorkQuantities_[i] << std::endl;
#endif
                donors.insert(std::pair<double, int>(hubWorkQualities_[i],
                                                     hubRanks_[i]));
            }
        }
    }
    else { // Quality balance
        ++(psStats_.qualityBalance_);

        double averQuality = 0.0;
        for (i = 0; i < hubNum_; ++i) {
            averQuality += hubWorkQualities_[i];
        }
        averQuality /= hubNum_;

        for (i = 0; i < hubNum_; ++i) {
            double diff = hubWorkQualities_[i] - averQuality;
            // The +1.0 guards against dividing by a near-zero average.
            double diffRatio = fabs(diff / (averQuality + 1.0));
            if (diff < 0 && diffRatio > donorSh) { // Donor candidates
#if 0
                std::cout << "++++ master balance: find donor hub "
                          << hubRanks_[i] << "; quality = "
                          << hubWorkQualities_[i] << std::endl;
#endif
                donors.insert(std::pair<double, int>(hubWorkQualities_[i],
                                                     hubRanks_[i]));
            }
            else if (diff > 0 && diffRatio > receiverSh){// Receiver candidates
#if 0
                std::cout << "++++ master balance: quality: find receiver hub "
                          << hubRanks_[i] << "; quality = "
                          << hubWorkQualities_[i] << std::endl;
#endif
                receivers.insert(std::pair<double, int>(hubWorkQualities_[i],
                                                        hubRanks_[i]));
            }
        }
    }

    //------------------------------------------------------
    // Tell the donor hubs to send subtree to receiver hubs.
    //------------------------------------------------------

    const int numDonor = (int) donors.size();
    const int numReceiver = (int) receivers.size();
    const int numPair = CoinMin(numDonor, numReceiver);

#if 0
    std::cout << "Master: donors size = " << numDonor
              << ", receiver size = " << numReceiver
              << ", numPair = " << numPair << std::endl;
#endif

    int donorID;
    int receiverID;

    std::multimap<double, int, std::greater<double> >::iterator posD =
        donors.begin();
    std::multimap<double, int>::iterator posR = receivers.begin();

    // Pair the best donor with the worst receiver, and so on.
    for(i = 0; i < numPair; ++i) {
        donorID = posD->second;
        receiverID = posR->second;

#if 0
        std::cout << "MASTER : receiver hub ID = " << receiverID
                  << "; quality = " << posR->first << std::endl;
        std::cout << "MASTER : donor hub ID = " << donorID
                  << "; quality = " << posD->first << std::endl;
#endif

        // Sanity check the ranks before any message is sent.
        if (CoinAbs(receiverID) >= processNum_ ||
            CoinAbs(donorID) >= processNum_) {
            std::cout << "ERROR: receiverID = " << receiverID << std::endl;
            std::cout << "ERROR : donorID = " << donorID << std::endl;
            throw
                CoinError("receiverID>=processNum_ || donorID >= processNum_",
                          "masterBalanceHubs", "AlpsKnowledgeBrokerMPI");

        }

        if (donorID != masterRank_) {
            ++masterDoBalance_;
            masterAskHubDonate(donorID, receiverID, posR->first);
        }
        else { // donor is the master
            double maxLoad = ALPS_DBL_MAX;
            int maxLoadRank = -1;
            int pos = 0;
            double recvLoad = posR->first;

            // Find the donor worker in hub's cluster
            // (the smallest quality value, i.e. most promising work).
            for (int k = 0; k < clusterSize_; ++k) {
                if (k != clusterRank_) {
                    if (workerWorkQualities_[k] < maxLoad) {
                        maxLoad = workerWorkQualities_[k];
                        maxLoadRank = globalRank_ - masterRank_ + k;
                    }
                }
            }

            // FIXME: how many workload to share? Righ now just 1 subtree.
            // Ask the donor worker to send a subtree to receiving hub)
            if (maxLoadRank != -1) {
                MPI_Pack(&receiverID, 1, MPI_INT, smallBuffer_, size, &pos,
                         MPI_COMM_WORLD);
                MPI_Pack(&recvLoad, 1, MPI_DOUBLE, smallBuffer_, size, &pos,
                         MPI_COMM_WORLD);

                incSendCount("masterBalanceHubs()");
                MPI_Send(smallBuffer_, pos, MPI_PACKED, maxLoadRank,
                         AlpsMsgAskDonateToHub, MPI_COMM_WORLD);
                ++masterDoBalance_;

#ifdef NF_DEBUG_MORE
                std::cout << "MASTER : ask its worker " << maxLoadRank
                          << " to donate load to hub"
                          << receiverID << std::endl;
#endif
            }
        }
        ++posD;
        ++posR;
    }
}
3005
3006 //#############################################################################
3007
// After receiving the status (in bufLarge) of a hub, update the system status.
void
AlpsKnowledgeBrokerMPI::masterUpdateSysStatus(char*& bufLarge,
                                              MPI_Status* status,
                                              MPI_Comm comm)
{
    int position = 0;
    int msgSendNum, msgRecvNum;
    int sender = -1;
    int size = model_->AlpsPar()->entry(AlpsParams::smallSize);

    // Map the reporting hub to its hub index.
    if (comm == MPI_COMM_WORLD) {
        //Be careful with / or %
        sender = (int)(status->MPI_SOURCE) / userClusterSize_;
    }
    else if (comm == hubComm_) {
        sender = (int)(status->MPI_SOURCE);
    }
    else {
        std::cout << "unknown COMM in masterUpdateSysStatus"
                  << std::endl;
        throw CoinError("unknown sender",
                        "masterUpdateSysStatus", "AlpsKnowledgeBrokerMPI");
    }

    // Remember the hub's previous contribution so the system totals can
    // be updated incrementally (subtract old, add new).
    int preNodeProcessed = hubNodeProcesseds_[sender];
    int curNodeProcessed = 0;
    //double preQuality = hubWorkQualities_[sender];
    double curQuality;
    double preQuantity = hubWorkQuantities_[sender];
    double curQuantity;
    double npTime;

    // Unpack the hub's report; the field order is the wire protocol and
    // must match the hub's MPI_Pack sequence in hubReportStatus().
    MPI_Unpack(bufLarge, size, &position, &curNodeProcessed, 1, MPI_INT, comm);
    MPI_Unpack(bufLarge, size, &position, &curQuality, 1, MPI_DOUBLE, comm);
    MPI_Unpack(bufLarge, size, &position, &curQuantity, 1, MPI_DOUBLE, comm);
    MPI_Unpack(bufLarge, size, &position, &msgSendNum, 1, MPI_INT, comm);
    MPI_Unpack(bufLarge, size, &position, &msgRecvNum, 1, MPI_INT, comm);
    MPI_Unpack(bufLarge, size, &position, &npTime, 1, MPI_DOUBLE, comm);
    MPI_Unpack(bufLarge, size, &position, &unitWorkNodes_, 1, MPI_INT, comm);

    // Update the hub's status
    hubNodeProcesseds_[sender] = curNodeProcessed;
    hubWorkQualities_[sender] = curQuality;
    hubWorkQuantities_[sender] = curQuantity;

    // Update system status
    systemSendCount_ += msgSendNum;
    systemRecvCount_ += msgRecvNum;

    systemNodeProcessed_ += curNodeProcessed;
    systemNodeProcessed_ -= preNodeProcessed;

    systemWorkQuantity_ += curQuantity;
    systemWorkQuantity_ -= preQuantity;

    // An (almost) empty system has the worst possible quality.
    if (systemWorkQuantity_ < ALPS_QUALITY_TOL) {
        systemWorkQuality_ = ALPS_OBJ_MAX;
    }
    else {
        // System quality is the best (smallest) quality reported so far.
        systemWorkQuality_ = std::min(systemWorkQuality_, curQuality);
    }

    if ( hubReported_[sender] != true ) {
        hubReported_[sender] = true;
    }

    // Keep the cumulative message counters away from integer overflow by
    // subtracting the common part; only their difference matters.
    int large = ALPS_INT_MAX/10;
    if (systemSendCount_ > large || systemRecvCount_ > large) {
        int minCount = std::min(systemSendCount_, systemRecvCount_);
        systemSendCount_ -= minCount;
        systemRecvCount_ -= minCount;
    }

    // Fold a genuine node-processing-time sample into the running average
    // and recompute the master's balancing period from it.
    if (npTime != ALPS_NODE_PROCESS_TIME && npTime > 1.0e-10) {
        if (nodeProcessingTime_ == ALPS_NODE_PROCESS_TIME) {
            // First real sample replaces the default.
            nodeProcessingTime_ = npTime;
        }
        else {
            nodeProcessingTime_ = 0.5 * (nodeProcessingTime_ + npTime);
        }

        masterBalancePeriod_ = computeBalancePeriod(userBalancePeriod_,
                                                    masterBalancePeriod_,
                                                    nodeProcessingTime_);
#if 0
        std::cout << "MASTER[" << globalRank_ << "]: After updateSystem(),"
                  << "masterBalancePeriod_ = " << masterBalancePeriod_
                  << std::endl;
#endif
    }

#ifdef NF_DEBUG
    std::cout << "MASTER[" << globalRank_
              << "]: After updateSystem() : curQuality "
              << curQuality
              //<< " preQuality = " << preQuality
              << ", hubReported_[" << sender << "] = "
              << hubReported_[sender]
              << ", systemSendCount_ = " << systemSendCount_
              << ", systemRecvCount_ = " << systemRecvCount_
              << ", systemWorkQuality_ = " << systemWorkQuality_
              << std::endl;
#endif

}
3114
3115 //#############################################################################
3116
3117 // Add master and master cluster status to system status
3118 void
refreshSysStatus()3119 AlpsKnowledgeBrokerMPI::refreshSysStatus()
3120 {
3121 //------------------------------------------------------
3122 // Add master's quantity (0 anyway) to hub 1.
3123 //------------------------------------------------------
3124
3125 workerWorkQuantities_[masterRank_] = workQuantity_;
3126 clusterWorkQuantity_ += workerWorkQuantities_[masterRank_];
3127
3128 //------------------------------------------------------
3129 // Add master's node processed num to hub 1.
3130 //------------------------------------------------------
3131
3132 int preMasterNodeP = workerNodeProcesseds_[masterRank_];
3133 workerNodeProcesseds_[masterRank_] = nodeProcessedNum_;
3134 clusterNodeProcessed_ += workerNodeProcesseds_[masterRank_];
3135 clusterNodeProcessed_ -= preMasterNodeP;
3136
3137 // Note: Nothing need to do about master's quality.
3138
3139 //------------------------------------------------------
3140 // Add hub1(master) cluster's quantity into system.
3141 //------------------------------------------------------
3142
3143 double preHub1QT = hubWorkQuantities_[0];
3144 hubWorkQuantities_[0] = clusterWorkQuantity_;
3145 systemWorkQuantity_ += hubWorkQuantities_[0];
3146 systemWorkQuantity_ -= preHub1QT;
3147
3148 //------------------------------------------------------
3149 // Add hub1(master) cluster's number of nodes processed into system.
3150 //------------------------------------------------------
3151
3152 int preHub1NodeP = hubNodeProcesseds_[0];
3153 hubNodeProcesseds_[0] = clusterNodeProcessed_;
3154 systemNodeProcessed_ += hubNodeProcesseds_[0];
3155 systemNodeProcessed_ -= preHub1NodeP;
3156
3157 //------------------------------------------------------
3158 // Add hub1(master) cluster's quality into system.
3159 //------------------------------------------------------
3160
3161 hubWorkQualities_[0] = clusterWorkQuality_;
3162 if (systemWorkQuantity_ < ALPS_QUALITY_TOL) {
3163 systemWorkQuality_ = ALPS_OBJ_MAX;
3164 }
3165 else {
3166 systemWorkQuality_ = std::min(systemWorkQuality_,hubWorkQualities_[0]);
3167 }
3168
3169 #if 0
3170 int i;
3171
3172 std::cout << "WORKLOAD: ";
3173 for (i = 0; i < clusterSize_; ++i) {
3174 std::cout << "worker[" <<i<< "] = "
3175 << workerWorkQuantities_[i] << ", ";
3176 }
3177 std::cout << std::endl;
3178 for (i = 0; i < hubNum_; ++i) {
3179 std::cout << "hub[" <<i<< "] = " << hubWorkQuantities_[i]
3180 << "; ";
3181 }
3182 std::cout << std::endl;
3183 #endif
3184
3185 //------------------------------------------------------
3186 // Add master' msg counts to system.
3187 //------------------------------------------------------
3188
3189 systemSendCount_ += sendCount_;
3190 systemRecvCount_ += recvCount_;
3191 sendCount_ = recvCount_ = 0;
3192
3193 //------------------------------------------------------
3194 // Add cluster's msg counts to system.
3195 //------------------------------------------------------
3196
3197 systemSendCount_ += clusterSendCount_;
3198 systemRecvCount_ += clusterRecvCount_;
3199 clusterSendCount_ = clusterRecvCount_ = 0;
3200 }
3201
3202 //#############################################################################
3203
// Add the state of the hub to cluster state.
// Folds the hub's own message counts, work quantity, quality, and node
// count into the cluster-level aggregates the hub maintains.
// NOTE(review): the hub's slot in the worker arrays is indexed by
// masterRank_ -- presumably the hub's rank within clusterComm_; confirm
// against how these arrays are filled elsewhere.
void
AlpsKnowledgeBrokerMPI::refreshClusterStatus()
{
    //------------------------------------------------------
    // Add hub's msg counts into cluster counts.
    //------------------------------------------------------

    clusterSendCount_ += sendCount_;
    clusterRecvCount_ += recvCount_;
    sendCount_ = recvCount_ = 0; // IMPORTANT! Count each message only once.

    //------------------------------------------------------
    // Add hub's quantity into cluster (replace the previously recorded
    // quantity with the current one).
    //------------------------------------------------------

    double preWorkQuantity = workerWorkQuantities_[masterRank_];
    workerWorkQuantities_[masterRank_] = workQuantity_;
    clusterWorkQuantity_ -= preWorkQuantity;
    clusterWorkQuantity_ += workQuantity_;

    //------------------------------------------------------
    // Incorporate hub's quality (cluster quality is the minimum over
    // its members' qualities).
    //------------------------------------------------------

    workerWorkQualities_[masterRank_] = workQuality_;
    clusterWorkQuality_ = std::min(clusterWorkQuality_, workQuality_);

    // Add hub's node processed num into cluster (again replacing the
    // previously recorded count).
    int PreNodeP = workerNodeProcesseds_[masterRank_];
    workerNodeProcesseds_[masterRank_] = nodeProcessedNum_;
    clusterNodeProcessed_ += workerNodeProcesseds_[masterRank_];
    clusterNodeProcessed_ -= PreNodeP;

#ifdef NF_DEBUG_MORE
    std::cout << "Hub[" << globalRank_ << "]: clusterWorkQuality = "
              << clusterWorkQuality_ << std::endl;
    for (int i = 0; i < clusterSize_; ++i) {
        std::cout << "wQL[" <<i + globalRank_ << "] = "
                  << workerWorkQualities_[i] << ", ";
    }
    std::cout << std::endl;
#endif
}
3248
3249 //#############################################################################
3250
// Unpack received incumbent value and process ID from bufLarge.
// Update incumbent value and ID stored on this process.
// Returns true iff the received incumbent replaces the locally stored
// one (strictly better value, or equal value from a lower-ranked
// process -- the tie-break keeps all processes consistent).
bool
AlpsKnowledgeBrokerMPI::unpackSetIncumbent(char*& bufLarge, MPI_Status* status)
{
    bool accept = false;
    int size =
        model_->AlpsPar()->entry(AlpsParams::smallSize);
    int position = 0;
    double incVal= 0.0;
    int incID = 0;

    // Unpack incumbent value and its discoverer's rank from bufLarge
    MPI_Unpack(bufLarge, size, &position, &incVal, 1, MPI_DOUBLE,
               MPI_COMM_WORLD);
    MPI_Unpack(bufLarge, size, &position, &incID, 1, MPI_INT,
               MPI_COMM_WORLD);

    // A broadcast can reach its own originator; ignore it in that case.
    if (incID == globalRank_) {

#ifdef NF_DEBUG_MORE
        std::cout << "PROC " << globalRank_
                  << " : unpack and ingore incVal = " << incVal << " at PROC"
                  << incID << ", since I am " << incID
                  << std::endl;
#endif
        return false; // Do nothing
    }

    // Assume minimization
    if (incVal < incumbentValue_) {
        // Better solution
        ++solNum_;
        incumbentValue_ = incVal;
        incumbentID_ = incID;
        accept = true;

        // Only the master tracks at which node count the best solution
        // was found.
        if (globalRank_ == masterRank_) {
            bestSolNode_ = systemNodeProcessed_;
        }

#ifdef NF_DEBUG_MORE
        std::cout << "PROC " << globalRank_ << " : accept incVal = "
                  << incVal << " from PROC " << incID << std::endl;
#endif

        //updateIncumbent_ = true; // The incumbent value is updated.
    }
    else if(incVal == incumbentValue_ ) {
        // Exact float equality is intentional: the same incumbent value
        // is broadcast verbatim, so ties are real duplicates.
        ++solNum_;
        if (incID < incumbentID_) { // So that all process consistant
            incumbentValue_ = incVal;
            incumbentID_ = incID;
            accept = true;
        }
        else{
            accept = false;

#ifdef NF_DEBUG_MORE
            std::cout << "PROC " << globalRank_
                      << " : recv but discard incVal = " << incVal
                      << " from PROC (SAME) " << incID << std::endl;
#endif
        }
    }
    else { // Worse than the stored incumbent: discard.
        accept = false;

#ifdef NF_DEBUG_MORE
        std::cout << "PROC " << globalRank_
                  << " : recv but discard incVal = " << incVal
                  << " from PROC (WORSE)" << incID << std::endl;
#endif
    }

    return accept;
}
3328
3329 //#############################################################################
3330
3331 void
workerReportStatus(int tag,MPI_Comm comm)3332 AlpsKnowledgeBrokerMPI::workerReportStatus(int tag,
3333 MPI_Comm comm)
3334 {
3335 int pos = 0;
3336 int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
3337
3338 int receiver;
3339
3340 if (comm == MPI_COMM_WORLD) {
3341 receiver = myHubRank_;
3342 }
3343 else if (comm == clusterComm_) {
3344 receiver = masterRank_;
3345 }
3346 else {
3347 std::cout << "WORKER " << globalRank_
3348 <<" : Unkown Comm in workerReportStatus" << std::endl;
3349 throw CoinError("Unkown Comm",
3350 "workerReportStatus",
3351 "AlpsKnowledgeBrokerMPI");
3352 }
3353
3354 MPI_Pack(&nodeProcessedNum_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
3355 MPI_Pack(&workQuality_, 1, MPI_DOUBLE, smallBuffer_, size, &pos, comm);
3356 MPI_Pack(&workQuantity_, 1, MPI_DOUBLE, smallBuffer_, size, &pos, comm);
3357 MPI_Pack(&sendCount_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
3358 MPI_Pack(&recvCount_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
3359 MPI_Pack(&nodeProcessingTime_, 1, MPI_DOUBLE, smallBuffer_, size, &pos,
3360 comm);
3361 MPI_Pack(&unitWorkNodes_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
3362
3363 MPI_Send(smallBuffer_, pos, MPI_PACKED, receiver, tag, comm);
3364
3365 #if 0
3366 std::cout << "WORKER " << globalRank_
3367 << " : report quality = " << workQuality_
3368 << " : report quantity = " << workQuantity_
3369 << " to hub "
3370 << myHubRank_ << "; Tag = " << tag << "; sendCount_ = "
3371 << sendCount_ << "; recvCount_ = " << recvCount_
3372 << "unitWorkNodes_ = " << unitWorkNodes_
3373 << std::endl;
3374 #endif
3375
3376 sendCount_ = recvCount_ = 0; // Only count new message
3377 }
3378
3379 //#############################################################################
3380
3381 void
workerRecvIndices(char * & bufLarge)3382 AlpsKnowledgeBrokerMPI::workerRecvIndices(char *&bufLarge)
3383 {
3384 int position = 0;
3385 int size = 100;
3386 int nextIndex;
3387 int maxIndex;
3388
3389 MPI_Unpack(bufLarge, size, &position, &nextIndex, 1, MPI_INT,
3390 MPI_COMM_WORLD);
3391 MPI_Unpack(bufLarge, size, &position, &maxIndex, 1, MPI_INT,
3392 MPI_COMM_WORLD);
3393
3394 #if 0
3395 std::cout << "++++ worker[" << globalRank_
3396 << "] received indices from master"
3397 << ", nextIndex = " << nextIndex
3398 << ", maxIndex = " << maxIndex
3399 << std::endl;
3400 #endif
3401
3402 if (nextIndex < 0 || maxIndex < 0 || nextIndex > maxIndex) {
3403 haltSearch_ = true;
3404 }
3405 else {
3406 haltSearch_ = false;
3407 setNextNodeIndex(nextIndex);
3408 setMaxNodeIndex(maxIndex);
3409 }
3410 }
3411
3412 //#############################################################################
3413
3414 void
workerAskIndices()3415 AlpsKnowledgeBrokerMPI::workerAskIndices()
3416 {
3417 int position = 0;
3418 int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
3419
3420 #if 0
3421 std::cout << "++++ worker[" << globalRank_ << "] ask indices from master."
3422 << std::endl;
3423 #endif
3424
3425 MPI_Pack(&globalRank_, 1, MPI_INT, smallBuffer_, size, &position,
3426 MPI_COMM_WORLD);
3427 MPI_Send(smallBuffer_, position, MPI_PACKED, masterRank_,
3428 AlpsMsgWorkerAskIndices, MPI_COMM_WORLD);
3429 incSendCount("workerAskIndices()");
3430 }
3431
3432 //#############################################################################
3433
// Hand out a batch of node indices to the worker that requested them;
// the request message (in bufLarge) carries the requesting worker's
// rank. When too few indices remain, forces termination and sends the
// sentinel range (-1, -1) instead.
void
AlpsKnowledgeBrokerMPI::masterSendIndices(char *&bufLarge)
{
    int nextIndex = getNextNodeIndex();
    int maxIndex = getMaxNodeIndex();
    int leftIndex = maxIndex - nextIndex;  // indices still unassigned

    int pos = 0;
    int size = 100;  // enough for the two-int messages handled here
    int recvWorker = -1;

    if (leftIndex > masterIndexBatch_) {
        // Plenty left: hand out one batch of masterIndexBatch_ indices.
        maxIndex = nextIndex + masterIndexBatch_;
        setNextNodeIndex(maxIndex + 1);
    }
    else if (leftIndex > 100) {
        // Running low: hand out everything that remains.
        setNextNodeIndex(maxIndex);
    }
    else {
        // No index
        forceTerminate_ = true;
        nextIndex = -1;
        maxIndex = -1;
    }

    // Unpack the rank of worker.
    MPI_Unpack(bufLarge, size, &pos, &recvWorker, 1, MPI_INT, MPI_COMM_WORLD);

#if 0
    std::cout << "++++ master[" << globalRank_ << "] send indices to worker "
              << recvWorker << ", nextIndex = " << nextIndex
              << ", maxIndex = " << maxIndex
              << std::endl;
#endif

    // Pack nextIndex and maxIndex into small buffer and send it.
    pos = 0;
    MPI_Pack(&nextIndex, 1, MPI_INT, smallBuffer_, size, &pos,
             MPI_COMM_WORLD);
    MPI_Pack(&maxIndex, 1, MPI_INT, smallBuffer_, size, &pos,
             MPI_COMM_WORLD);
    MPI_Send(smallBuffer_, pos, MPI_PACKED, recvWorker,
             AlpsMsgIndicesFromMaster, MPI_COMM_WORLD);
    incSendCount("masterSendIndices");
}
3479
3480 //#############################################################################
3481
// Broadcast the model from process `sender` to every process in
// MPI_COMM_WORLD. `id` is this process's rank. The sender encodes and
// packs the model; the packed size is broadcast first so receivers can
// allocate, then the packed bytes are broadcast and decoded into the
// local model object.
void
AlpsKnowledgeBrokerMPI::broadcastModel(const int id, const int sender)
{
    char* modelBuffer = 0;
    int size = -1;
    int position = 0;  // Initialize to 0
    AlpsEncoded* encodedModel = NULL;

    //------------------------------------------------------
    // Encode matrix and pack into modelBuffer.
    //------------------------------------------------------

    if (id == sender) {
        // Pack model (packEncoded allocates modelBuffer and sets size).
        encodedModel = model_->encode();
        packEncoded(encodedModel, modelBuffer, size, position, MPI_COMM_WORLD);

#ifdef NF_DEBUG
        std::cout << "MASTER: packed model. size = "
                  << size << std::endl;
#endif
    }

    //------------------------------------------------------
    // Broadcost the size of matrix.
    //------------------------------------------------------

    // Broadcast modelBuffer size first
    MPI_Bcast(&size, 1, MPI_INT, sender, MPI_COMM_WORLD);

    if (id != sender) {
        // Process except master receive the size of model.
        if (size <= 0) {
            throw CoinError("Msg size <= 0",
                            "broadcastModel",
                            "AlpsKnowledgeBrokerMPI");
        }
        if( modelBuffer != NULL ) {
            delete [] modelBuffer;
            modelBuffer = NULL;
        }
        // Allocate with spare room; must match the size passed to
        // unpackEncoded below.
        modelBuffer = new char[size + 100];
    }

    //------------------------------------------------------
    // Broadcost matrix.
    //------------------------------------------------------

    MPI_Bcast(modelBuffer, size, MPI_CHAR, sender, MPI_COMM_WORLD);

    if (id != sender) {

        //--------------------------------------------------
        // Unpack to encoded model.
        //--------------------------------------------------

        position = 0;
        encodedModel = unpackEncoded(modelBuffer,
                                     position,
                                     MPI_COMM_WORLD,
                                     size+100);

#ifdef NF_DEBUG
        std::cout << "PROCESS[" <<id<< "]: start to decode model."
                  << ", knowledge type="<< encodedModel->type() << std::endl;
#endif

        //--------------------------------------------------
        // Note AlpsDataPool have a application model, do not need to
        // create more than one model. Just need to fill in model data.
        //--------------------------------------------------

        model_->decodeToSelf(*encodedModel);

#ifdef NF_DEBUG
        std::cout << "PROCESS[" <<id<< "]: finished decoding model."
                  << std::endl;
#endif

        //--------------------------------------------------
        // Set up self.
        //--------------------------------------------------

        //(modifyDataPool()->getModel())->setupSelf();

        model_->setupSelf();
    }

    // Clean up the pack buffer and the encoded model on every process.
    if (modelBuffer) {
        delete [] modelBuffer;
        modelBuffer = 0;
    }
    if (encodedModel) {
        delete encodedModel;
        encodedModel = 0;
    }
}
3579
3580 //#############################################################################
3581
// Forward the stored incumbent (value and discoverer's rank) to this
// process's two children in a binary broadcast tree rooted at the
// incumbent's discoverer. Ranks are mapped to tree positions via
// rankToSequence/sequenceToRank so the discoverer is sequence 0.
void
AlpsKnowledgeBrokerMPI::sendIncumbent()
{
    int position = 0;
    int size = model_->AlpsPar()->entry(AlpsParams::smallSize);

    // This process's position in the broadcast tree and its children
    // (a sequence of -1 means the child does not exist).
    int mySeq = rankToSequence(incumbentID_, globalRank_);
    int leftSeq = leftSequence(mySeq, processNum_);
    int rightSeq = rightSequence(mySeq, processNum_);

    if (mySeq >= processNum_ || leftSeq >= processNum_
        || rightSeq >= processNum_) {
        std::cout << "sequence is Wrong !!!" << std::endl;
        abort();
    }

    MPI_Pack(&incumbentValue_, 1, MPI_DOUBLE, smallBuffer_, size, &position,
             MPI_COMM_WORLD);
    MPI_Pack(&incumbentID_, 1, MPI_INT, smallBuffer_, size, &position,
             MPI_COMM_WORLD);

    if (leftSeq != -1) {
        int leftRank = sequenceToRank(incumbentID_, leftSeq);
#if 0
        std::cout << "PROC " << globalRank_
                  <<" : init send a solution - L, value = "
                  << incumbentValue_ << " to "<< leftRank << std::endl;
#endif
        MPI_Isend(smallBuffer_, position, MPI_PACKED, leftRank,
                  AlpsMsgIncumbentTwo, MPI_COMM_WORLD, &forwardRequestL_);
    }

    if (rightSeq != -1) {
        int rightRank = sequenceToRank(incumbentID_, rightSeq);
#if 0
        std::cout << "PROC " << globalRank_
                  <<" : init send a solution - R, value = "
                  << incumbentValue_ << " to "<< rightRank << std::endl;
#endif
        MPI_Isend(smallBuffer_, position, MPI_PACKED, rightRank,
                  AlpsMsgIncumbentTwo, MPI_COMM_WORLD, &forwardRequestR_);
    }

    // Wait for both sends to complete before returning, since
    // smallBuffer_ is shared and will be reused.
    if (leftSeq != -1) {
        MPI_Status sentStatusL;
        MPI_Wait(&forwardRequestL_, &sentStatusL);
        incSendCount("sendIncumbent()");
    }

    if (rightSeq != -1) {
        MPI_Status sentStatusR;
        MPI_Wait(&forwardRequestR_, &sentStatusR);
        incSendCount("sendIncumbent()");
    }
}
3637
3638 //#############################################################################
3639
// Move the best solution to process `destination`.
// The process holding the incumbent (incumbentID_) sends its best
// solution (size-prefixed packed data followed by the objective value);
// `destination` receives both and adds the solution to its pool. Every
// other process -- and the case where the incumbent already lives on
// `destination` -- does nothing.
void
AlpsKnowledgeBrokerMPI::collectBestSolution(int destination)
{
    MPI_Status status;
    int sender = incumbentID_;
    int position = 0;

#ifdef NF_DEBUG
    std::cout << "CollectBestSolution: sender=" << sender
              << ", destination=" << destination << std::endl;
#endif

    if ( sender == destination ) {
        // DO NOTHING since the best solution is in the solution of its pool
    }
    else {
        if (globalRank_ == sender) { // Send solu
            char* senderBuf = NULL;  // MUST init to NULL, otherwise
            int size = -1;           // packEncoded(..) crashes.

            const AlpsSolution* solu = static_cast<const AlpsSolution* >
                (getBestKnowledge(AlpsKnowledgeTypeSolution).first);

            double value = getBestKnowledge(AlpsKnowledgeTypeSolution).second;

            // Encode and pack the solution, then send data and value as
            // two separate messages with the same tag.
            AlpsEncoded* enc = solu->encode();
            packEncoded(enc, senderBuf, size, position, MPI_COMM_WORLD);

            sendSizeBuf(senderBuf, size, position, destination,
                        AlpsMsgIncumbent, MPI_COMM_WORLD);
            MPI_Send(&value, 1, MPI_DOUBLE, destination, AlpsMsgIncumbent,
                     MPI_COMM_WORLD);
            if(senderBuf) {
                delete [] senderBuf;
                senderBuf = NULL;
            }

            delete enc;
            enc = NULL;

#ifdef NF_DEBUG
            std::cout << "CollectBestSolution: sender " << sender
                      << " sent solution " << value << std::endl;
#endif
        }
        else if (globalRank_ == destination) { // Recv solu
            double value = 0.0;
            char* destBuf = NULL;  // allocated by receiveSizeBuf

            receiveSizeBuf(destBuf, sender, AlpsMsgIncumbent,
                           MPI_COMM_WORLD, &status);
            MPI_Recv(&value, 1, MPI_DOUBLE, sender, AlpsMsgIncumbent,
                     MPI_COMM_WORLD, &status);

            // Decode the packed solution and store it (the pool takes
            // ownership of bestSolu).
            position = 0;
            AlpsEncoded* encodedSolu = unpackEncoded(destBuf,
                                                     position,
                                                     MPI_COMM_WORLD);
            AlpsSolution* bestSolu = static_cast<AlpsSolution* >
                ( decoderObject(encodedSolu->type())->decode(*encodedSolu) );

            addKnowledge(AlpsKnowledgeTypeSolution, bestSolu, value);

            if(destBuf) {
                delete [] destBuf;
                destBuf = NULL;
            }
            if (encodedSolu) {
                delete encodedSolu;
                encodedSolu = NULL;
            }
#ifdef NF_DEBUG
            std::cout << "CollectBestSolution: destination " <<destination
                      << " received solution " << value << std::endl;
#endif
        }
    }
}
3718
3719 //#############################################################################
3720
3721 void
tellMasterRecv()3722 AlpsKnowledgeBrokerMPI::tellMasterRecv()
3723 {
3724 char* dummyBuf = 0;
3725 MPI_Send(dummyBuf, 0, MPI_CHAR, masterRank_, AlpsMsgTellMasterRecv,
3726 MPI_COMM_WORLD);
3727 incSendCount("tellMasterRecv()");
3728 }
3729
3730 //#############################################################################
3731
3732 void
tellHubRecv()3733 AlpsKnowledgeBrokerMPI::tellHubRecv()
3734 {
3735 char* dummyBuf = 0;
3736 if (globalRank_ == myHubRank_)
3737 --hubDoBalance_;
3738 else {
3739 MPI_Send(dummyBuf, 0, MPI_CHAR, myHubRank_, AlpsMsgTellHubRecv,
3740 MPI_COMM_WORLD);
3741 incSendCount("tellHubRecv()");
3742 }
3743 }
3744
3745 //#############################################################################
3746
// Pack an AlpsEncoded object into packBuffer at `position` as
// [type id (int)][representation size (int)][representation bytes].
// If packBuffer is NULL on entry it is allocated here and `size` is set
// (representation size + two ints + AlpsParams::bufSpare margin); the
// caller owns the buffer. The pack order must match the unpack order
// in unpackEncoded().
void
AlpsKnowledgeBrokerMPI::packEncoded(AlpsEncoded* enc,
                                    char*& packBuffer,
                                    int& size,
                                    int& position,
                                    MPI_Comm comm)
{
    const int bufSpare = model_->AlpsPar()->entry(AlpsParams::bufSpare);

    int type = static_cast<int>(enc->type());
    int repSize = static_cast<int>(enc->size());

    if(!packBuffer) {
        // Room for the two header ints, the representation, and spare.
        size = static_cast<int>(repSize + 2*sizeof(int) + bufSpare);
        packBuffer = new char[size];
    }

    // Pack repSize, type_, representation_ of enc
    MPI_Pack(&type, 1, MPI_INT, packBuffer, size, &position, comm);
    MPI_Pack(&repSize, 1, MPI_INT, packBuffer, size, &position, comm);
    //MPI_Pack(const_cast<char*>(type), typeSize, MPI_CHAR,
    //packBuffer, size, &position, comm);
    MPI_Pack(const_cast<char*>(enc->representation()), repSize, MPI_CHAR,
             packBuffer, size, &position, comm);
}
3772
3773 //#############################################################################
3774
3775 AlpsEncoded*
unpackEncoded(char * & unpackBuffer,int & position,MPI_Comm comm,int size)3776 AlpsKnowledgeBrokerMPI::unpackEncoded(char*& unpackBuffer,
3777 int& position,
3778 MPI_Comm comm,
3779 int size)
3780 {
3781 int type, repSize;
3782 AlpsEncoded *encoded = NULL;
3783
3784 if (size <= 0) {
3785 size = largeSize_;
3786 }
3787
3788 // Unpack typeSize, repSize, type and rep from unpackBuffer_
3789 MPI_Unpack(unpackBuffer, size, &position, &type, 1, MPI_INT, comm);
3790
3791 #if defined(NF_DEBUG_MORE)
3792 std::cout << "PROC "<< globalRank_ <<" : typeSize is "
3793 << typeSize << ";\tsize = "<< size << std::endl;
3794 #endif
3795
3796 MPI_Unpack(unpackBuffer, size, &position, &repSize, 1, MPI_INT, comm);
3797
3798 char *rep = new char[repSize + 1];
3799 MPI_Unpack(unpackBuffer, size, &position, rep, repSize, MPI_CHAR, comm);
3800 rep[repSize] = '\0';
3801
3802 #if defined(NF_DEBUG_MORE)
3803 std::cout << "PROC "<< globalRank_ << ": type = " << type
3804 << "; repSize = " << repSize << std::endl;
3805 #endif
3806
3807 // NOTE: Take over the memory of rep, but not type.
3808 encoded = new AlpsEncoded(type, repSize, rep );
3809
3810 return encoded;
3811 }
3812
3813 //#############################################################################
3814
// Receive a two-part message from `sender`: first an int carrying the
// size of the packed data, then the packed data itself. `buf` is
// (re)allocated here to hold the data; the caller owns it.
// If the size message carries the AlpsMsgFinishInit tag, no data
// message follows and `buf` is left untouched (caller must check
// status->MPI_TAG).
void
AlpsKnowledgeBrokerMPI::receiveSizeBuf(char*& buf,
                                       int sender,
                                       int tag,
                                       MPI_Comm comm,
                                       MPI_Status* status)
{
    int size = -1;

    // First recv the msg size. MPI_ANY_TAG is deliberate so that an
    // AlpsMsgFinishInit message can be caught here too.
    MPI_Recv(&size, 1, MPI_INT, sender, MPI_ANY_TAG, comm, status);
    if (status->MPI_TAG == AlpsMsgFinishInit)
        return;

    if (size < 0) {
        throw CoinError("size < 0", "receiveSizeBuf",
                        "AlpsKnowledgeBrokerMPI");
    }

    // Second, allocate memory for buf
    if (buf != 0) {
        delete [] buf; buf = 0;
    }
    buf = new char [size];

    // Third, receive MPI packed data and put in buf
    MPI_Recv(buf, size, MPI_PACKED, sender, tag, comm, status);
}
3843
3844 //#############################################################################
3845
3846 // Block receive a node and store in rampUpSubTree_
3847 // Used in rampup
3848 void
receiveRampUpNode(int sender,MPI_Comm comm,MPI_Status * status)3849 AlpsKnowledgeBrokerMPI::receiveRampUpNode(int sender,
3850 MPI_Comm comm,
3851 MPI_Status* status)
3852 {
3853 char* buf = 0;
3854 int position = 0;
3855
3856 receiveSizeBuf(buf, sender, AlpsMsgNode, comm, status);
3857
3858 if (status->MPI_TAG == AlpsMsgFinishInit ) {
3859 //std::cout << "PROC: " << globalRank_
3860 // <<" : rec AlpsMsgFinishInit... STOP INIT." << std::endl;
3861 }
3862 else if (status->MPI_TAG == AlpsMsgNode) {
3863 AlpsEncoded* encodedNode = unpackEncoded(buf, position, comm);
3864
3865 #ifdef NF_DEBUG
3866 std::cout << "WORKER: received and unpacked a node." << std::endl;
3867 std::cout << "WORKER: type() is " << encodedNode->type() << std::endl;
3868 std::cout << "WORKER: finish unpacking node." << std::endl;
3869 std::cout << "WORKER: start to decode node." << std::endl;
3870
3871 // const AlpsTreeNode* bugNode = dynamic_cast<const AlpsTreeNode* >(
3872 // AlpsKnowledge::decoderObject(encodedNode->type()));
3873 const AlpsTreeNode* bugNode = dynamic_cast<const AlpsTreeNode* >
3874 ( decoderObject(encodedNode->type()) );
3875
3876 std::cout <<"WORKER: bugNode's Priority = "
3877 << bugNode->getQuality() << std::endl;
3878 std::cout << "WORKER: finish just decoding node." << std::endl;
3879 #endif
3880
3881 AlpsTreeNode* node = dynamic_cast<AlpsTreeNode* >
3882 ( decoderObject(encodedNode->type())->decode(*encodedNode) );
3883
3884 #ifdef NF_DEBUG_MORE
3885 std::cout << "WORKER " << globalRank_ << " : received a node from "
3886 << sender << std::endl;
3887
3888 #endif
3889 // Make the node as a subtree root, then add it to the
3890 // local node pool
3891
3892 //node->setSubTree(st);
3893 node->setBroker(this);
3894 // todo(aykut) node desc does not hold a pointer to the model
3895 // it holds pointer to the broker.
3896 //node->modifyDesc()->setModel(model_);
3897 node->setParent(NULL);
3898
3899 // Do not want to do this, parent index is unique.
3900 //node->setParentIndex(-1);
3901
3902 // Can not do this since there are other status.
3903 // node->setStatus(AlpsNodeStatusCandidate);
3904
3905 rampUpSubTree_->nodePool()->addKnowledge(node, node->getQuality());
3906 assert(rampUpSubTree_->getNumNodes() > 0);
3907 if ( (rampUpSubTree_->nodePool()->getNumKnowledges() ) == 1) {
3908 // Make the first node as root.
3909 rampUpSubTree_->setRoot(node);
3910 }
3911
3912 if (encodedNode) {
3913 delete encodedNode;
3914 encodedNode = 0;
3915 }
3916 node = NULL;
3917 }
3918 else {
3919 std::cout << "PROC " << globalRank_ << " : receiveRampUpNode"
3920 << " : received UNKNOWN message: tag = " << status->MPI_TAG
3921 << "; sender = " << status->MPI_SOURCE <<std::endl;
3922 throw CoinError("Unknow message type",
3923 "receiveSizeNode()", "AlpsKnowledgeBrokerMPI");
3924 }
3925
3926 if (buf != 0) {
3927 delete [] buf;
3928 buf = 0;
3929 }
3930
3931 }
3932
3933 //#############################################################################
3934
3935 void
receiveSubTree(char * & bufLarge,int sender,MPI_Status * status)3936 AlpsKnowledgeBrokerMPI::receiveSubTree(char*& bufLarge,
3937 int sender,
3938 MPI_Status* status)
3939 {
3940 #ifdef NF_DEBUG_MORE
3941 std::cout << "WORKER " << globalRank_
3942 << " : start to receive a subtree from " << sender
3943 << "; num of subtrees = "
3944 << getNumKnowledges(AlpsKnowledgeTypeSubTree)
3945 << "; hasKnowlege() = "
3946 << subTreePool_->hasKnowledge() <<std::endl;
3947 #endif
3948
3949 int count = 0;
3950 int position = 0;
3951
3952 MPI_Get_count(status, MPI_PACKED, &count);
3953
3954 if (count <= 0) {
3955
3956 #ifdef NF_DEBUG_MORE
3957 std::cout << "PROC " << globalRank_
3958 << " : ask for a subtree but receive nothing from "
3959 << status->MPI_SOURCE << std::endl;
3960 #endif
3961 return;
3962 }
3963
3964 if (status->MPI_TAG == AlpsMsgSubTree ||
3965 status->MPI_TAG == AlpsMsgSubTreeByMaster ||
3966 status->MPI_TAG == AlpsMsgSubTreeByWorker) {
3967
3968 AlpsEncoded* encodedST = unpackEncoded(bufLarge,
3969 position,
3970 MPI_COMM_WORLD);
3971 AlpsSubTree* tempST = dynamic_cast<AlpsSubTree*>
3972 (const_cast<AlpsKnowledge *>
3973 (decoderObject(AlpsKnowledgeTypeSubTree)))->
3974 newSubTree();
3975
3976 tempST->setBroker(this);
3977 tempST->setNodeSelection(nodeSelection_);
3978
3979 AlpsSubTree* subTree =
3980 dynamic_cast<AlpsSubTree* >(tempST->decode(*encodedST) );
3981
3982 assert(subTree->getNumNodes() > 0);
3983
3984 subTree->calculateQuality();
3985 addKnowledge(AlpsKnowledgeTypeSubTree, subTree, subTree->getQuality());
3986
3987
3988 #if 0
3989 std::cout << "WORKER " << globalRank_ << " : received a subtree from "
3990 << sender << " of size " << count
3991 << "; num of subtrees = "
3992 << getNumKnowledges(AlpsKnowledgeTypeSubTree) <<std::endl;
3993 #endif
3994
3995 if (encodedST) {
3996 delete encodedST;
3997 encodedST = 0;
3998 }
3999 if (tempST) {
4000 delete tempST;
4001 tempST = 0;
4002 }
4003 subTree = 0;
4004
4005 }
4006 else {
4007 std::cout << "PROC " << globalRank_ << " : receiveSubTree"
4008 <<" : received UNKNOWN message, tag = " << status->MPI_TAG
4009 << ", source = " << status->MPI_SOURCE <<std::endl;
4010 throw CoinError("Unknow message type",
4011 "receiveSubTree()", "AlpsKnowledgeBrokerMPI");
4012 }
4013
4014 }
4015
4016 //#############################################################################
4017
4018 void
sendSizeBuf(char * & buf,int size,int position,const int target,const int tag,const MPI_Comm comm)4019 AlpsKnowledgeBrokerMPI::sendSizeBuf(char*& buf,
4020 int size,
4021 int position,
4022 const int target,
4023 const int tag,
4024 const MPI_Comm comm)
4025 {
4026 // If packing successfully, send buf size and buf to target
4027 if (size >= 0) {
4028 MPI_Send(&size, 1, MPI_INT, target, AlpsMsgSize, comm);
4029 MPI_Send(buf, position, MPI_PACKED, target, tag, comm);
4030 }
4031 else
4032 throw CoinError("Msg size is < 0", "send", "AlpsKnowledgeBrokerMPI");
4033 }
4034
4035 //#############################################################################
4036
4037 // Send a node from rampUpSubTree's node pool.
4038 // NOTE: comm can be hubComm_ or clusterComm_.
4039 void
sendRampUpNode(const int receiver,MPI_Comm comm)4040 AlpsKnowledgeBrokerMPI::sendRampUpNode(const int receiver, MPI_Comm comm)
4041 {
4042 char* buf = 0;
4043 int size = 0;
4044 int position = 0;
4045
4046 AlpsTreeNode* node = dynamic_cast<AlpsTreeNode* >
4047 (rampUpSubTree_->nodePool()->getKnowledge().first);
4048
4049 AlpsEncoded* enc = node->encode();
4050
4051 rampUpSubTree_->nodePool()->popKnowledge();
4052
4053 delete node; // Since sending to other process
4054
4055 packEncoded(enc, buf, size, position, comm);
4056 sendSizeBuf(buf, size, position, receiver, AlpsMsgNode, comm);
4057
4058 if (buf) {
4059 delete [] buf;
4060 buf = 0;
4061 }
4062
4063 if (enc) {
4064 delete enc;
4065 enc = 0; // Allocated in encode()
4066 }
4067 }
4068
4069 //#############################################################################
4070
// Send a node from rampUpSubTree's node pool and generated model
// knowledge to `receiver`. The message layout is:
//   [doUnitWork (int)] [packed node] [hasKnowledge flag (int)]
//   [packed model knowledge -- only present when the flag is 1]
// The receiver must unpack in exactly this order.
void
AlpsKnowledgeBrokerMPI::sendNodeModelGen(int receiver, int doUnitWork)
{
    int position = 0;
    AlpsEncoded* enc = NULL;

    // Pack if doUnitWork
    MPI_Pack(&doUnitWork, 1, MPI_INT, largeBuffer_, largeSize_, &position,
             MPI_COMM_WORLD);

    // Pack a node (removed from the pool and freed locally, since it
    // will live on the receiving process).
    AlpsTreeNode* node = dynamic_cast<AlpsTreeNode* >
        (rampUpSubTree_->nodePool()->getKnowledge().first);
    enc = node->encode();
    rampUpSubTree_->nodePool()->popKnowledge();
    delete node; // Since sending to other process

    packEncoded(enc, largeBuffer_, largeSize_, position, MPI_COMM_WORLD);
    delete enc;
    enc = NULL;

    // Pack generated model knowledge
    int hasKnowledge = 0;
    enc = model_->packSharedKnowlege();
    if (enc) {
        hasKnowledge = 1;
        // Pack flag indicating that there is model knowledge
        MPI_Pack(&hasKnowledge, 1, MPI_INT, largeBuffer_, largeSize_,
                 &position, MPI_COMM_WORLD);
        // pack knowledge
        packEncoded(enc, largeBuffer_, largeSize_, position, MPI_COMM_WORLD);
        delete enc;
        enc = NULL;
    }
    else {
        // Pack flag indicating that there is no model knowledge
        MPI_Pack(&hasKnowledge, 1, MPI_INT, largeBuffer_, largeSize_,
                 &position, MPI_COMM_WORLD);
    }

    MPI_Send(largeBuffer_, position, MPI_PACKED, receiver, AlpsMsgNode,
             MPI_COMM_WORLD);

}
4116
4117 //#############################################################################
4118
4119 // return sent or not.
4120 bool
sendSubTree(const int receiver,AlpsSubTree * & st,int tag)4121 AlpsKnowledgeBrokerMPI::sendSubTree(const int receiver,
4122 AlpsSubTree*& st,
4123 int tag)
4124 {
4125 #ifdef NF_DEBUG
4126 std::cout << "WORKER["<< globalRank_
4127 << "]: start to donate a subtree to PROC " << receiver
4128 << std::endl;
4129 #endif
4130
4131 bool success = false;
4132 char* buf = 0;
4133 int size = 0;
4134 int position = 0;
4135
4136 AlpsEncoded* enc = st->encode();
4137 packEncoded(enc, buf, size, position, MPI_COMM_WORLD);
4138
4139 #if 0
4140 std::cout << "WORKER["<< globalRank_
4141 << "]: donor a subtree to PROC " << receiver
4142 << "; buf pos = " << position
4143 << "; buf size = " << size
4144 << "; largeSize_ = " << largeSize_ << std::endl;
4145 #endif
4146
4147 assert(size > 0);
4148
4149 if (size <= largeSize_) {
4150 // Attach buffer
4151 #if 0
4152 //MPI_Bsend(buf, position, MPI_PACKED, receiver, tag, MPI_COMM_WORLD);
4153 MPI_Send(buf, position, MPI_PACKED, receiver, tag, MPI_COMM_WORLD);
4154 #endif
4155 if (!attachBuffer_) {
4156 int attachSize = largeSize_* 6 + MPI_BSEND_OVERHEAD;
4157 attachBuffer_ = new char [attachSize];
4158 MPI_Buffer_attach(attachBuffer_, attachSize);
4159 }
4160 MPI_Ibsend(buf, position, MPI_PACKED, receiver, tag,
4161 MPI_COMM_WORLD, &subTreeRequest_);
4162
4163 success = true;
4164 }
4165 else {
4166 // msg size is larger than buffer size. Must not happen.
4167 success = false;
4168 std::cout << "WARNING: Subtree size is larger than message buffer size, will split it." << std::endl;
4169 }
4170
4171 if (buf) {
4172 delete [] buf;
4173 buf = 0;
4174 }
4175 if (enc) {
4176 delete enc;
4177 enc = 0; // Allocated in encode()
4178 }
4179
4180 return success;
4181 }
4182
4183 //#############################################################################
4184
4185 void
sendFinishInit(const int receiver,MPI_Comm comm)4186 AlpsKnowledgeBrokerMPI::sendFinishInit(const int receiver,
4187 MPI_Comm comm)
4188 {
4189 char* dummyBuf = 0;
4190 MPI_Send(dummyBuf, 0, MPI_PACKED, receiver, AlpsMsgFinishInit, comm);
4191 }
4192
4193 //#############################################################################
4194
deleteSubTrees()4195 void AlpsKnowledgeBrokerMPI::deleteSubTrees()
4196 {
4197 if (workingSubTree_) {
4198 delete workingSubTree_;
4199 workingSubTree_ = NULL;
4200 needWorkingSubTree_ = true;
4201 }
4202 subTreePool_-> deleteGuts();
4203 }
4204
4205 //#############################################################################
4206
4207 void
initializeSearch(int argc,char * argv[],AlpsModel & model)4208 AlpsKnowledgeBrokerMPI::initializeSearch(int argc,
4209 char* argv[],
4210 AlpsModel& model)
4211 {
4212
4213 //------------------------------------------------------
4214 // Store a pointer to model.
4215 //------------------------------------------------------
4216
4217 model.setBroker(this);
4218 model_ = &model;
4219
4220 //------------------------------------------------------
4221 // Init msg env.
4222 //------------------------------------------------------
4223
4224 MPI_Init(&argc, &argv);
4225 MPI_Comm_rank(MPI_COMM_WORLD, &globalRank_);
4226 MPI_Comm_size(MPI_COMM_WORLD, &processNum_);
4227
4228 // CORRECTME
4229 // NOTE: masterRank_ is 0 or 1 (debug). Must smaller than cluster size.
4230 masterRank_ = 0;
4231
4232 int color, i;
4233 int key = globalRank_;
4234
4235 //------------------------------------------------------
4236 // Register knowledge before broadcasting model.
4237 //------------------------------------------------------
4238
4239 model.registerKnowledge();
4240
4241 //------------------------------------------------------
4242 // Master read in parameters and model data.
4243 //------------------------------------------------------
4244
4245 if (globalRank_ == masterRank_) {
4246
4247 //--------------------------------------------------
4248 // Read in params.
4249 //--------------------------------------------------
4250
4251 model.readParameters(argc, argv);
4252
4253 msgLevel_ = model_->AlpsPar()->entry(AlpsParams::msgLevel);
4254 hubNum_ = model_->AlpsPar()->entry(AlpsParams::hubNum);
4255
4256 if (msgLevel_ > 0) {
4257 std::cout << "== Welcome to the Abstract Library for Parallel Search (ALPS) \n";
4258 std::cout << "== Copyright 2000-2019 Lehigh University and others \n";
4259 std::cout << "== All Rights Reserved. \n";
4260 std::cout << "== Distributed under the Eclipse Public License 1.0 \n";
4261 if (strcmp(ALPS_VERSION, "trunk")){
4262 std::cout << "== Version: " << ALPS_VERSION << std::endl;
4263 }else{
4264 std::cout << "== Version: Trunk (unstable) \n";
4265 }
4266 std::cout << "== Build Date: " << __DATE__;
4267 #ifdef ALPS_SVN_REV
4268 //std::cout << "\n== Revision Number: " << ALPS_SVN_REV;
4269 #endif
4270 std::cout << std::endl;
4271 }
4272
4273 // 12/20/06, do we need print parameter file name?
4274 if (msgLevel_ > 100) {
4275 messageHandler()->message(ALPS_PARAMFILE, messages())
4276 << argv[2] << CoinMessageEol;
4277 }
4278
4279 //--------------------------------------------------
4280 // Set up logfile if requred.
4281 //--------------------------------------------------
4282
4283 logFileLevel_ = model_->AlpsPar()->entry(AlpsParams::logFileLevel);
4284 if (logFileLevel_ > 0) { // Require log file
4285 logfile_ = model_->AlpsPar()->entry(AlpsParams::logFile);
4286 }
4287
4288 //--------------------------------------------------
4289 // Read in model data if required.
4290 //--------------------------------------------------
4291
4292 if (argc == 2) {
4293 // FIXME: only work for mpich, not lam
4294 model_->AlpsPar()->setEntry(AlpsParams::instance, argv[1]);
4295 }
4296
4297 std::string dataFile = model_->AlpsPar()->entry(AlpsParams::instance);
4298
4299 if (dataFile != "NONE") {
4300 if (msgLevel_ > 0) {
4301 messageHandler()->message(ALPS_DATAFILE, messages())
4302 << dataFile.c_str() << CoinMessageEol;
4303 }
4304
4305 model.readInstance(dataFile.c_str());
4306
4307 if (logFileLevel_ > 0) {
4308
4309 std::string fileDir = dataFile;
4310 std::string::size_type pos1 =
4311 fileDir.rfind ('/', std::string::npos);
4312 if (pos1 == std::string::npos) {
4313 // No /, put at beginning.
4314 pos1 = 0;
4315 //std::cout << "No /, pos1 = "<< pos1 << std::endl;
4316 }
4317 else {
4318 //std::cout << "Found /, pos1 = "<< pos1 << std::endl;
4319 ++pos1;
4320 }
4321
4322 std::string::size_type pos2 = fileDir.find (".mps", pos1);
4323
4324 if (pos2 == std::string::npos) {
4325 // No .mps
4326 //std::cout << "No .mps" << std::endl;
4327
4328 pos2 = fileDir.find (".gz", pos1);
4329 if (pos2 == std::string::npos) {
4330 // No .gz
4331 pos2 = fileDir.length();
4332 //std::cout << "No .gz, pos2 = "<< pos2 << std::endl;
4333 }
4334 else {
4335 //std::cout << "Found .gz, pos2 = "<< pos2 << std::endl;
4336 }
4337 }
4338
4339 // Sub-string from pos1 to pos2(not included)
4340 int length = static_cast<int>(pos2 - pos1);
4341 // std::cout << "pos1=" << pos1 <<", pos2="<< pos2
4342 // << ", lenght=" << length << std::endl;
4343
4344 instanceName_ = fileDir.substr(pos1, length);
4345 logfile_ = instanceName_ + ".log";
4346
4347 model_->AlpsPar()->setEntry(AlpsParams::logFile,
4348 logfile_.c_str());
4349
4350 std::ofstream logFout(logfile_.c_str());
4351
4352 logFout << "\n\n================================================"
4353 << std::endl;
4354 logFout << "Problem = " << instanceName_ << std::endl;
4355 logFout << "Data file = " << dataFile << std::endl;
4356 logFout << "Log file = " << logfile_ << std::endl;
4357 std::cout << "Problem = " << instanceName_ << std::endl;
4358 std::cout << "Log file = " << logfile_ << std::endl;
4359 }
4360 }
4361
4362 // modifyDataPool()->setAppParams(&userParams);// setupself needs userPar
4363
4364 model.preprocess();
4365 model.setupSelf();
4366 //std::cout << "Here1" << std::endl;
4367 }
4368
4369 MPI_Barrier(MPI_COMM_WORLD);
4370
4371 //------------------------------------------------------
4372 // Broadcast model.
4373 //------------------------------------------------------
4374
4375 broadcastModel(globalRank_, masterRank_);
4376
4377 MPI_Barrier(MPI_COMM_WORLD);
4378
4379 //------------------------------------------------------
4380 // Deside the cluster size and actual hubNum_.
4381 //------------------------------------------------------
4382
4383 msgLevel_ = model_->AlpsPar()->entry(AlpsParams::msgLevel);
4384 hubMsgLevel_ = model_->AlpsPar()->entry(AlpsParams::hubMsgLevel);
4385 workerMsgLevel_ = model_->AlpsPar()->entry(AlpsParams::workerMsgLevel);
4386 messageHandler()->setLogLevel(msgLevel_);
4387 hubNum_ = model_->AlpsPar()->entry(AlpsParams::hubNum);
4388
4389 timer_.limit_ = model_->AlpsPar()->entry(AlpsParams::timeLimit);
4390
4391 while(true) {
4392 if (hubNum_ > 0) {
4393 userClusterSize_ = 1;
4394 while (userClusterSize_ * hubNum_ < processNum_) {
4395 ++userClusterSize_;
4396 }
4397 // [0,...,cluSize-1] in group 1
4398 color = globalRank_ / userClusterSize_;
4399 }
4400 else {
4401 std::cout << "hubNum_ <= 0" << std::endl;
4402 throw CoinError("hubNum_ <= 0",
4403 "initSearch",
4404 "AlpsKnowledgeBrokerMPI");
4405 }
4406
4407 // more than 1 proc in the last cluser
4408 if (processNum_- userClusterSize_ * (hubNum_ - 1) > 1) {
4409 break;
4410 }
4411 else {
4412 --hubNum_;
4413 }
4414 }
4415 if ( (globalRank_ == masterRank_) && (msgLevel_ > 0) ) {
4416 messageHandler()->message(ALPS_HUB_NUM, messages())
4417 << hubNum_ << CoinMessageEol;
4418 }
4419
4420 //------------------------------------------------------
4421 // Create clusterComm_.
4422 //------------------------------------------------------
4423
4424 MPI_Comm_split(MPI_COMM_WORLD, color, key, &clusterComm_);
4425 MPI_Comm_rank(clusterComm_, &clusterRank_);
4426 MPI_Comm_size(clusterComm_, &clusterSize_);
4427
4428 #if 0
4429 std::cout << "+++ masterRank_ = " << masterRank_
4430 << ", clusterSize_ = " << clusterSize_
4431 << ", userClusterSize_ = " << userClusterSize_ << std::endl;
4432 #endif
4433
4434 //------------------------------------------------------
4435 // Create hubGroup_ and hubComm_.
4436 //------------------------------------------------------
4437
4438 hubRanks_ = new int [hubNum_];
4439 int k = 0;
4440 for (i = 0; i < processNum_; i += userClusterSize_) {
4441 hubRanks_[k++] = i + masterRank_;
4442 }
4443
4444 MPI_Group worldGroup;
4445 MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
4446 MPI_Group_incl(worldGroup, hubNum_, hubRanks_, &hubGroup_);
4447 MPI_Comm_create(MPI_COMM_WORLD, hubGroup_, &hubComm_);
4448
4449 //------------------------------------------------------
4450 // Allcoate index, classify process types, set up knowledge pools
4451 // Master : [0, INT_MAX/5]
4452 // Others : divide [INT_MAX/5 + 1, INT_MAX]
4453 //------------------------------------------------------
4454
4455 int workerDown = -1, workerUp = -1;
4456 int masterDown = 0;
4457 int masterUp = static_cast<int>(ALPS_INT_MAX / 5);
4458 int workerIndexR = static_cast<int>( static_cast<double>(ALPS_INT_MAX - masterUp)/(processNum_-1));
4459 workerIndexR -= 2; // leave some buffer
4460
4461 if (globalRank_ < masterRank_) {
4462 workerDown = masterUp + globalRank_ * workerIndexR;
4463 workerUp = workerDown + workerIndexR;
4464 }
4465 else if (globalRank_ > masterRank_) {
4466 workerDown = masterUp + (globalRank_-1) * workerIndexR;
4467 workerUp = workerDown + workerIndexR;
4468 }
4469
4470 masterIndexBatch_ = masterUp / (processNum_ * 2);
4471
4472 //------------------------------------------------------
4473 // Decide process type.
4474 //------------------------------------------------------
4475
4476 if (globalRank_ == masterRank_) {
4477 processType_ = AlpsProcessTypeMaster;
4478 setNextNodeIndex(masterDown);
4479 setMaxNodeIndex(masterUp);
4480
4481 #if 0
4482 std::cout << "masterDown = " << masterDown
4483 << "; masterUp = " << masterUp
4484 << "; workerDown = " << workerDown
4485 << "; workerUp = " << workerUp
4486 << "; masterIndexBatch_ = " << masterIndexBatch_
4487 << std::endl;
4488 #endif
4489 }
4490 else if ( (globalRank_ % userClusterSize_) == masterRank_ ) {
4491 processType_ = AlpsProcessTypeHub;
4492 setNextNodeIndex(workerDown);
4493 setMaxNodeIndex(workerUp);
4494 }
4495 else {
4496 processType_ = AlpsProcessTypeWorker;
4497 setNextNodeIndex(workerDown);
4498 setMaxNodeIndex(workerUp);
4499 }
4500
4501 // Determine if hubs process nodes.
4502 int hubWorkClusterSizeLimit =
4503 model_->AlpsPar()->entry(AlpsParams::hubWorkClusterSizeLimit);
4504 if (userClusterSize_ > hubWorkClusterSizeLimit) {
4505 hubWork_ = false;
4506 }
4507 else {
4508 hubWork_ = true;
4509 }
4510
4511 if (!processTypeList_) {
4512 delete [] processTypeList_;
4513 processTypeList_ = NULL;
4514 }
4515 processTypeList_ = new AlpsProcessType [processNum_];
4516 for (i = 0; i < processNum_; ++i) {
4517 if (i == masterRank_) {
4518 processTypeList_[i] = AlpsProcessTypeMaster;
4519 }
4520 else if ( (i % userClusterSize_) == masterRank_) {
4521 processTypeList_[i] = AlpsProcessTypeHub;
4522 }
4523 else {
4524 processTypeList_[i] = AlpsProcessTypeWorker;
4525 }
4526 }
4527
4528 //------------------------------------------------------
4529 // Set hub's global rank for workers and hubs.
4530 //------------------------------------------------------
4531
4532 if (processType_ == AlpsProcessTypeWorker ||
4533 processType_ == AlpsProcessTypeHub) {
4534
4535 myHubRank_ =
4536 (globalRank_ / userClusterSize_) * userClusterSize_ + masterRank_;
4537
4538 #ifdef NF_DEBUG
4539 std::cout << "PROCESS[" << globalRank_ << "] : my hub rank = "
4540 << myHubRank_ << std::endl;
4541 #endif
4542 }
4543
4544 //------------------------------------------------------
4545 // Set up knowledge pools.
4546 //------------------------------------------------------
4547
4548 setupKnowledgePools();
4549 MPI_Barrier(MPI_COMM_WORLD);
4550
4551 //------------------------------------------------------
4552 // Set max number of solutions.
4553 //------------------------------------------------------
4554
4555 const int mns = model_->AlpsPar()->entry(AlpsParams::solLimit);
4556 setMaxNumKnowledges(AlpsKnowledgeTypeSolution, mns);
4557
4558 //------------------------------------------------------
4559 // Set clock type
4560 //------------------------------------------------------
4561
4562 const bool clockType =
4563 model_->AlpsPar()->entry(AlpsParams::clockType);
4564
4565 timer_.setClockType(clockType);
4566 subTreeTimer_.setClockType(clockType);
4567 tempTimer_.setClockType(clockType);
4568
4569 //------------------------------------------------------
4570 // Allocate memory. TODO.
4571 //------------------------------------------------------
4572 }
4573
4574 //#############################################################################
4575
4576 /** Search best solution for a given model. */
search(AlpsModel * model)4577 void AlpsKnowledgeBrokerMPI::search(AlpsModel *model)
4578 {
4579 AlpsTreeNode* root = NULL;
4580 if (getProcRank() == masterRank_) {
4581 // Only master need create root.
4582 // NOTE: masterRank_ has been assigned in intializeSearch().
4583 root = model->createRoot();
4584 root->setBroker(this);
4585 root->modifyDesc()->setBroker(this);
4586 }
4587
4588 rootSearch(root);
4589 }
4590
4591 //#############################################################################
4592
rootSearch(AlpsTreeNode * root)4593 void AlpsKnowledgeBrokerMPI::rootSearch(AlpsTreeNode* root)
4594 {
4595
4596 timer_.start();
4597
4598 //------------------------------------------------------
4599 // Call main functions.
4600 //------------------------------------------------------
4601
4602 if (processType_ == AlpsProcessTypeMaster) {
4603 masterMain(root);
4604 }
4605 else if(processType_ == AlpsProcessTypeHub) {
4606 hubMain();
4607 }
4608 else if (processType_ == AlpsProcessTypeWorker){
4609 workerMain();
4610 }
4611 else {
4612 assert(0);
4613 }
4614
4615 //------------------------------------------------------
4616 // Collect best solution.
4617 //------------------------------------------------------
4618
4619 MPI_Barrier(MPI_COMM_WORLD);
4620 collectBestSolution(masterRank_);
4621
4622 // Search to end.
4623 if (processType_ == AlpsProcessTypeMaster &&
4624 exitStatus_ == AlpsExitStatusUnknown) {
4625 if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
4626 setExitStatus(AlpsExitStatusOptimal);
4627 }
4628 else {
4629 setExitStatus(AlpsExitStatusInfeasible);
4630 }
4631 }
4632
4633 timer_.stop();
4634
4635 //------------------------------------------------------
4636 // log statistics.
4637 //------------------------------------------------------
4638
4639 if (processType_ == AlpsProcessTypeMaster) {
4640 model_->postprocess();
4641 model_->modelLog();
4642 }
4643 searchLog();
4644 }
4645
4646 //#############################################################################
4647
4648 // Master tell hubs to terminate due to reaching limits or other reason.
4649 void
masterForceHubTerm()4650 AlpsKnowledgeBrokerMPI::masterForceHubTerm()
4651 {
4652 char* buf = 0;
4653 forceTerminate_ = true;
4654 for (int i = 0; i < hubNum_; ++i) {
4655 if (i != masterRank_) {
4656 MPI_Send(buf, 0, MPI_PACKED, hubRanks_[i],
4657 AlpsMsgForceTerm, MPI_COMM_WORLD);
4658 incSendCount("masterForceHubTerm");
4659 }
4660 }
4661 }
4662
4663 //#############################################################################
4664
4665 // Hub tell workers to terminate due to reaching limits or other reason.
4666 void
hubForceWorkerTerm()4667 AlpsKnowledgeBrokerMPI::hubForceWorkerTerm()
4668 {
4669 char* buf = 0;
4670 forceTerminate_ = true;
4671 for (int i = 0; i < clusterSize_; ++i) {
4672 if (i != clusterRank_) {
4673 MPI_Send(buf, 0, MPI_PACKED, globalRank_+i,
4674 AlpsMsgForceTerm, MPI_COMM_WORLD);
4675 incSendCount("hubForceWorkerTerm");
4676 }
4677 }
4678 }
4679
4680 //#############################################################################
4681
// Swap the current working subtree for the best subtree in the pool when
// the pool's best is sufficiently better than the current one (relative
// difference above changeWorkThreshold). May grow the threshold in place
// to damp excessive swapping.
void
AlpsKnowledgeBrokerMPI::changeWorkingSubTree(double & changeWorkThreshold)
{
    if ( workingSubTree_ && subTreePool_->hasKnowledge() ) {
        //std::cout << "Process[" << globalRank_ << "here 1" << std::endl;
        workingSubTree_->calculateQuality();
        //std::cout << "Process[" << globalRank_ << "here 2" << std::endl;

        double curQuality = workingSubTree_->getQuality();
        double topQuality = subTreePool_->getBestQuality();
        //double topQuality = subTreePool_->getKnowledge().second;

        // Swap only when the pool's best beats the current subtree
        // (larger quality value is worse here, per this comparison).
        if (curQuality > topQuality) {
            // Relative gap, guarded against division by zero.
            double perDiff = ALPS_FABS(curQuality - topQuality)/
                (ALPS_FABS(curQuality) + 1.0e-9);
            if (perDiff > changeWorkThreshold) {
                // Need put node in regular node pool.
                workingSubTree_->reset();

                // Exchange: pool's best becomes the working subtree, and
                // the old working subtree goes back into the pool.
                AlpsSubTree* tempST = workingSubTree_;
                workingSubTree_ = dynamic_cast<AlpsSubTree* >(
                    subTreePool_->getKnowledge().first);
                subTreePool_->popKnowledge();
                addKnowledge(AlpsKnowledgeTypeSubTree, tempST, curQuality);
                ++(psStats_.subtreeChange_);

                // Avoid too much change.
                // NOTE(review): "/ 10 == 0" doubles the threshold on each
                // of the first 9 swaps only (and never afterwards);
                // "% 10 == 0" (every 10th swap) may have been intended —
                // confirm before changing.
                if (psStats_.subtreeChange_ / 10 == 0) {
                    changeWorkThreshold *= 2.0;
                }
#if 0
                std::cout << "Process[" << globalRank_
                          << "]: change subtree " << curQuality
                          << " to " << topQuality
                          << ", new working tree nodes "
                          << workingSubTree_->getNumNodes()
                          << std::endl;
#endif

            }
        }
    }
}
4725
4726 //#############################################################################
4727
4728 void
searchLog()4729 AlpsKnowledgeBrokerMPI::searchLog()
4730 {
4731 int i, j;
4732 int* numProcessed = 0;
4733 int* numPartial = 0;
4734 int* numBranched = 0;
4735 int* numDiscarded = 0;
4736 int* numLeft = 0;
4737 int* depths = 0;
4738 double* idles = 0;
4739 double* msgTimes = 0;
4740 double* rampUps = 0;
4741 double* rampDowns = 0;
4742 double *cpuTimes = NULL;
4743 double *wallClocks = NULL;
4744
4745 int *qualityBalance = 0;
4746 int *quantityBalance = 0;
4747 int *interBalance = 0;
4748 int *intraBalance = 0;
4749 int *workerAsk = 0;
4750 int *donateSuccess = 0;
4751 int *donateFail = 0;
4752 int *subtreeSplit = 0;
4753 int *subtreeWhole = 0;
4754 int *subtreeChange = 0;
4755
4756 if (globalRank_ == masterRank_) {
4757 numProcessed = new int [processNum_];
4758 numPartial = new int [processNum_];
4759 numBranched = new int [processNum_];
4760 numDiscarded = new int [processNum_];
4761 numLeft = new int [processNum_];
4762 depths = new int [processNum_];
4763 idles = new double [processNum_];
4764 msgTimes = new double [processNum_];
4765 rampUps = new double [processNum_];
4766 rampDowns = new double [processNum_];
4767 cpuTimes = new double [processNum_];
4768 wallClocks = new double [processNum_];
4769 qualityBalance = new int [processNum_];
4770 quantityBalance = new int [processNum_];
4771 interBalance = new int [processNum_];
4772 intraBalance = new int [processNum_];
4773 workerAsk = new int [processNum_];
4774 donateSuccess = new int [processNum_];
4775 donateFail = new int [processNum_];
4776 subtreeSplit = new int [processNum_];
4777 subtreeWhole = new int [processNum_];
4778 subtreeChange = new int [processNum_];
4779 }
4780
4781 MPI_Gather(&nodeProcessedNum_, 1, MPI_INT, numProcessed, 1, MPI_INT,
4782 masterRank_, MPI_COMM_WORLD);
4783 MPI_Gather(&nodePartialNum_, 1, MPI_INT, numPartial, 1, MPI_INT,
4784 masterRank_, MPI_COMM_WORLD);
4785 MPI_Gather(&nodeBranchedNum_, 1, MPI_INT, numBranched, 1, MPI_INT,
4786 masterRank_, MPI_COMM_WORLD);
4787 MPI_Gather(&nodeDiscardedNum_, 1, MPI_INT, numDiscarded, 1, MPI_INT,
4788 masterRank_, MPI_COMM_WORLD);
4789 MPI_Gather(&nodeLeftNum_, 1, MPI_INT, numLeft, 1, MPI_INT,
4790 masterRank_, MPI_COMM_WORLD);
4791 MPI_Gather(&treeDepth_, 1, MPI_INT, depths, 1, MPI_INT, masterRank_,
4792 MPI_COMM_WORLD);
4793 MPI_Gather(&idleTime_, 1, MPI_DOUBLE, idles, 1, MPI_DOUBLE, masterRank_,
4794 MPI_COMM_WORLD);
4795 MPI_Gather(&msgTime_, 1, MPI_DOUBLE, msgTimes, 1, MPI_DOUBLE, masterRank_,
4796 MPI_COMM_WORLD);
4797 MPI_Gather(&rampUpTime_, 1, MPI_DOUBLE, rampUps, 1, MPI_DOUBLE,
4798 masterRank_, MPI_COMM_WORLD);
4799 MPI_Gather(&rampDownTime_, 1, MPI_DOUBLE, rampDowns, 1, MPI_DOUBLE,
4800 masterRank_, MPI_COMM_WORLD);
4801 MPI_Gather(&(timer_.cpu_), 1, MPI_DOUBLE, cpuTimes, 1, MPI_DOUBLE,
4802 masterRank_, MPI_COMM_WORLD);
4803 MPI_Gather(&(timer_.wall_), 1, MPI_DOUBLE, wallClocks, 1, MPI_DOUBLE,
4804 masterRank_, MPI_COMM_WORLD);
4805 MPI_Gather(&(psStats_.qualityBalance_), 1, MPI_INT, qualityBalance, 1,
4806 MPI_INT, masterRank_, MPI_COMM_WORLD);
4807 MPI_Gather(&(psStats_.quantityBalance_), 1, MPI_INT, quantityBalance, 1,
4808 MPI_INT, masterRank_, MPI_COMM_WORLD);
4809 MPI_Gather(&(psStats_.interBalance_), 1, MPI_INT, interBalance, 1,
4810 MPI_INT, masterRank_, MPI_COMM_WORLD);
4811 MPI_Gather(&(psStats_.intraBalance_), 1, MPI_INT, intraBalance, 1,
4812 MPI_INT, masterRank_, MPI_COMM_WORLD);
4813 MPI_Gather(&(psStats_.workerAsk_), 1, MPI_INT, workerAsk, 1,
4814 MPI_INT, masterRank_, MPI_COMM_WORLD);
4815 MPI_Gather(&(psStats_.donateSuccess_), 1, MPI_INT, donateSuccess, 1,
4816 MPI_INT, masterRank_, MPI_COMM_WORLD);
4817 MPI_Gather(&(psStats_.donateFail_), 1, MPI_INT, donateFail, 1,
4818 MPI_INT, masterRank_, MPI_COMM_WORLD);
4819 MPI_Gather(&(psStats_.subtreeSplit_), 1, MPI_INT, subtreeSplit, 1,
4820 MPI_INT, masterRank_, MPI_COMM_WORLD);
4821 MPI_Gather(&(psStats_.subtreeWhole_), 1, MPI_INT, subtreeWhole, 1,
4822 MPI_INT, masterRank_, MPI_COMM_WORLD);
4823 MPI_Gather(&(psStats_.subtreeChange_), 1, MPI_INT, subtreeChange, 1,
4824 MPI_INT, masterRank_, MPI_COMM_WORLD);
4825
4826 if (processType_ == AlpsProcessTypeMaster) {
4827 int numWorkers = 0; // Number of process are processing nodes.
4828
4829 int sumSize = 0, aveSize = 0, maxSize = 0, minSize = ALPS_INT_MAX;
4830 int sumDep = 0, aveDep = 0, maxDep = 0, minDep = ALPS_INT_MAX;
4831
4832 double sumIdle = 0.0, aveIdle = 0.0;
4833 double maxIdle = 0.0, minIdle = ALPS_DBL_MAX;
4834
4835 double sumMsgTime = 0.0, aveMsgTime = 0.0;
4836 double maxMsgTime = 0.0, minMsgTime = ALPS_DBL_MAX;
4837
4838 double sumRampUp = 0.0, aveRampUp = 0.0;
4839 double maxRampUp = 0.0, minRampUp = ALPS_DBL_MAX;
4840
4841 double sumRampDown = 0.0, aveRampDown = 0.0;
4842 double maxRampDown = 0.0, minRampDown = ALPS_DBL_MAX;
4843
4844 double sumCpuTime = 0.0, aveCpuTime = 0.0;
4845 double maxCpuTime = 0.0, minCpuTime = ALPS_DBL_MAX;
4846
4847 double sumWallClock = 0.0, aveWallClock = 0.0;
4848 double maxWallClock = 0.0, minWallClock = ALPS_DBL_MAX;
4849
4850 double varSize = 0.0, stdSize = 0.0;
4851 double varDep = 0.0, stdDep = 0.0;
4852 double varIdle = 0.0, stdIdle = 0.0;
4853 double varMsgTime = 0.0, stdMsgTime = 0.0;
4854 double varRampUp = 0.0, stdRampUp = 0.0;
4855 double varRampDown = 0.0, stdRampDown = 0.0;
4856
4857 int totalProcessed = 0;
4858 int totalPartial = 0;
4859 int totalBranched = 0;
4860 int totalDiscarded = 0;
4861 int totalLeft = 0;
4862
4863 int sumQualityBalance = 0;
4864 int sumQuantityBalance = 0;
4865 int sumInterBalance = 0;
4866 int sumIntraBalance = 0;
4867 int sumWorkerAsk = 0;
4868 int sumDonateSuccess = 0;
4869 int sumDonateFail = 0;
4870 int sumSubtreeSplit = 0;
4871 int sumSubtreeWhole = 0;
4872 int sumSubtreeChange = 0;
4873
4874 if(logFileLevel_ > 0 || msgLevel_ > 0) {
4875 for(i = 0; i < processNum_; ++i) {
4876 totalProcessed += numProcessed[i];
4877 totalPartial += numPartial[i];
4878 totalBranched += numBranched[i];
4879 totalDiscarded += numDiscarded[i];
4880 totalLeft += numLeft[i];
4881
4882 sumRampUp += rampUps[i];
4883 sumCpuTime += cpuTimes[i];
4884 sumWallClock += wallClocks[i];
4885
4886 sumQualityBalance += qualityBalance[i];
4887 sumQuantityBalance += quantityBalance[i];
4888 sumInterBalance += interBalance[i];
4889 sumIntraBalance += intraBalance[i];
4890 sumWorkerAsk += workerAsk[i];
4891 sumDonateSuccess += donateSuccess[i];
4892 sumDonateFail += donateFail[i];
4893 sumSubtreeSplit += subtreeSplit[i];
4894 sumSubtreeWhole += subtreeWhole[i];
4895 sumSubtreeChange += subtreeChange[i];
4896
4897 // Only valid for workers.
4898 if (processTypeList_[i] == AlpsProcessTypeWorker) {
4899 ++numWorkers;
4900
4901 sumSize += numProcessed[i];
4902 sumDep += depths[i];
4903 sumIdle += idles[i];
4904 sumMsgTime += msgTimes[i];
4905 sumRampDown += rampDowns[i];
4906
4907 if (numProcessed[i] > maxSize) maxSize = numProcessed[i];
4908 if (numProcessed[i] < minSize) minSize = numProcessed[i];
4909
4910 if (depths[i] > maxDep) maxDep = depths[i];
4911 if (depths[i] < minDep) minDep = depths[i];
4912
4913 if (idles[i] > maxIdle) maxIdle = idles[i];
4914 if (idles[i] < minIdle) minIdle = idles[i];
4915
4916 if (msgTimes[i] > maxMsgTime) maxMsgTime = msgTimes[i];
4917 if (msgTimes[i] < minMsgTime) minMsgTime = msgTimes[i];
4918
4919 if (rampDowns[i] > maxRampDown) maxRampDown = rampDowns[i];
4920 if (rampDowns[i] < minRampDown) minRampDown = rampDowns[i];
4921 }
4922
4923 if (cpuTimes[i] > maxCpuTime) maxCpuTime = cpuTimes[i];
4924 if (cpuTimes[i] < minCpuTime) minCpuTime = cpuTimes[i];
4925
4926 if (wallClocks[i] > maxWallClock) maxWallClock = wallClocks[i];
4927 if (wallClocks[i] < minWallClock) minWallClock = wallClocks[i];
4928
4929 if (rampUps[i] > maxRampUp) maxRampUp = rampUps[i];
4930 if (rampUps[i] < minRampUp) minRampUp = rampUps[i];
4931 }
4932
4933 #if 0
4934 // Does not make sense, 2/24/07
4935 for(i = 0; i < processNum_; ++i) { // Adjust idle and rampDown
4936 if (numProcessed[i] > 0) { // due to periodically checking.
4937 //idles[i] -= minIdle;
4938 //rampDowns[i] -= minRampDown;
4939 }
4940 }
4941 sumIdle -= minIdle * numWorkers;
4942 sumRampDown -= minRampDown * numWorkers;
4943 maxIdle -= minIdle;
4944 minIdle = 0.0;
4945 maxRampDown -= minRampDown;
4946 minRampDown = 0.0;
4947 #endif
4948
4949 if( numWorkers != 0) {
4950 aveSize = sumSize / numWorkers;
4951 aveDep = sumDep / numWorkers;
4952 aveIdle = sumIdle / numWorkers;
4953 aveMsgTime = sumMsgTime / numWorkers;
4954 aveRampDown = sumRampDown / numWorkers;
4955 }
4956 aveRampUp = sumRampUp / processNum_;
4957
4958 for (i = 0; i < processNum_; ++i) {
4959 if (processTypeList_[i] == AlpsProcessTypeWorker) {
4960 varSize += pow(numProcessed[i] - aveSize, 2);
4961 varDep += pow(depths[i] - aveDep, 2);
4962 varIdle = pow(idles[i] - aveIdle, 2);
4963 varMsgTime = pow(msgTimes[i] - aveMsgTime, 2);
4964 varRampDown = pow(rampDowns[i] - aveRampDown, 2);
4965 }
4966 varRampUp = pow(rampUps[i] - aveRampUp, 2);
4967 }
4968 if( numWorkers != 0) {
4969 varSize /= numWorkers;
4970 stdSize = sqrt(varSize);
4971 varDep /= numWorkers;
4972 stdDep = sqrt(varDep);
4973 varIdle /= numWorkers;
4974 stdIdle = sqrt(varIdle);
4975 varMsgTime /= numWorkers;
4976 stdMsgTime = sqrt(varMsgTime);
4977 varRampDown /= numWorkers;
4978 stdRampDown = sqrt(varRampDown);
4979 varRampUp /= numWorkers;
4980 }
4981 stdRampUp = sqrt(varRampUp);
4982 aveCpuTime = sumCpuTime/processNum_;
4983 aveWallClock = sumWallClock/processNum_;
4984 }
4985
4986 if (logFileLevel_ > 0) {
4987 std::ofstream logFout(logfile_.c_str(), std::ofstream::app);
4988
4989 logFout << "Number of processes = " << processNum_ << std::endl;
4990 logFout << "Number of hubs = " << hubNum_ << std::endl;
4991
4992 //----------------------------------------------
4993 // Tree size and depth.
4994 //----------------------------------------------
4995
4996 logFout << "Number of nodes processed = "
4997 << totalProcessed << std::endl;
4998 if (totalPartial > 0){
4999 logFout << "Number of nodes partially processed = "
5000 << totalPartial << std::endl;
5001 }
5002 logFout << "Number of nodes branched = "
5003 << totalBranched << std::endl;
5004 logFout << "Number of nodes pruned before processing = "
5005 << totalDiscarded << std::endl;
5006 logFout << "Number of nodes left = "
5007 << totalLeft << std::endl;
5008 logFout << "Max number of nodes processed by a worker = "
5009 << maxSize << std::endl;
5010 logFout << "Min number of nodes processed by a worker = "
5011 << (minSize != ALPS_INT_MAX ? minSize : 0) << std::endl;
5012 logFout << "Std Dev of number of nodes processed by a worker = "
5013 << stdSize << std::endl;
5014 logFout << std::endl << "Max Tree Depth on workers = "
5015 << maxDep << std::endl;
5016 logFout << "Min Tree Depth on workers = "
5017 << (minDep != ALPS_INT_MAX ? minDep : 0) << std::endl;
5018 logFout << "Std Dev of Tree Depth on workers = "
5019 << stdDep << std::endl;
5020
5021 if (logFileLevel_ > 0) {
5022 j = 0;
5023 logFout << std::endl
5024 << "Numbers of Nodes processed by processes: "
5025 << std::endl;
5026 for (i = 0; i < processNum_; ++i) {
5027 ++j;
5028 if (j % 5 == 0) logFout << std::endl;
5029 logFout << numProcessed[i] << "\t";
5030 }
5031
5032 j = 0;
5033 logFout << std::endl << std::endl
5034 << "Tree depths on processes: "
5035 << std::endl;
5036 for (i = 0; i < processNum_; ++i) {
5037 ++j;
5038 if (j % 5 == 0) logFout << std::endl;
5039 logFout << depths[i] << "\t";
5040 }
5041 logFout << std::endl << std::endl;
5042 } // Log if logFileLevel_ > 0
5043
5044 //----------------------------------------------
5045 // Times.
5046 //----------------------------------------------
5047
5048 // Ramp up.
5049 logFout << "Average RampUp = " << aveRampUp << std::endl;
5050 logFout << "Max RampUp = " << maxRampUp << std::endl;
5051 logFout << "Min RampUp = "
5052 << (minRampUp != ALPS_DBL_MAX ? minRampUp : 0.0)
5053 << std::endl;
5054 logFout << "Std Dev of RampUp = " << stdRampUp << std::endl;
5055
5056 if (logFileLevel_ > 0) {
5057 logFout << std::endl << "Ramp ups of processes: " << std::endl;
5058 for (i = 0; i < processNum_; ++i) {
5059 if (i % 5 == 0)
5060 logFout << std::endl;
5061 logFout << rampUps[i] << "\t";
5062 }
5063 logFout << std::endl;
5064 }
5065
5066 // Idle.
5067 logFout << "Average Idle = " << aveIdle << std::endl;
5068 logFout << "Max Idle = " << maxIdle << std::endl;
5069 logFout << "Min Idle = "
5070 << (minIdle != ALPS_DBL_MAX ? minIdle : 0.0) << std::endl;
5071 logFout << "Std Dev of Idle = " << stdIdle << std::endl;
5072
5073 // Msg time.
5074 logFout << "Average MsgTime = " << aveMsgTime << std::endl;
5075 logFout << "Max MsgTime = " << maxMsgTime << std::endl;
5076 logFout << "Min MsgTime = "
5077 << (minMsgTime != ALPS_DBL_MAX ? minMsgTime : 0.0)
5078 << std::endl;
5079 logFout << "Std Dev of MsgTime = " << stdMsgTime << std::endl;
5080
5081 // Ramp down.
5082 logFout << "Average RampDown = " << aveRampDown << std::endl;
5083 logFout << "Max RampDown = " << maxRampDown << std::endl;
5084 logFout << "Min RampDown = "
5085 << (minRampDown != ALPS_DBL_MAX ? minRampDown : 0.0)
5086 << std::endl;
5087 logFout << "Std Dev of RampDown = " << stdRampDown << std::endl;
5088
5089 // Overall.
5090 logFout << "Search CPU time (master) = " << timer_.getCpuTime()
5091 << " seconds" << std::endl;
5092 logFout << "Max CPU time (worker) = " << maxCpuTime << std::endl;
5093 logFout << "Min CPU time (worker) = " << minCpuTime << std::endl;
5094 logFout << "Total CPU time = " << sumCpuTime << std::endl;
5095 logFout << "Search wallclock time (master) = "
5096 << timer_.getWallClock() << " seconds" << std::endl;
5097 logFout << "Max wallclock (worker) = "<< maxWallClock << std::endl;
5098 logFout << "Min wallclock (worker) = "<< minWallClock << std::endl;
5099 logFout << "Total wallclock = " << sumWallClock << std::endl;
5100
5101 //----------------------------------------------
5102 // Solution.
5103 //----------------------------------------------
5104
5105 if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
5106 logFout << "Best solution quality = " << getBestQuality()
5107 << std::endl;
5108 logFout << "Nodes processed before finding this solution = "
5109 << bestSolNode_ << std::endl;
5110 }
5111 else {
5112 logFout << "No solution was found." << std::endl;
5113 }
5114 if (hasKnowledge(AlpsKnowledgeTypeSolution) ) {
5115 dynamic_cast<AlpsSolution* >
5116 (getBestKnowledge(AlpsKnowledgeTypeSolution).first)->print(logFout);
5117 }
5118
5119 } // Log if logFileLevel_ > 0
5120
5121 if (msgLevel_ > 0) {
5122 if (getSolStatus() == AlpsExitStatusOptimal) {
5123 messageHandler()->message(ALPS_T_COMPLETE, messages())
5124 << systemNodeProcessed_
5125 << nodePartialNum_
5126 << static_cast<int>(systemWorkQuantity_)
5127 << CoinMessageEol;
5128 }
5129 else if (getSolStatus() == AlpsExitStatusNodeLimit) {
5130 if (forceTerminate_) {
5131 messageHandler()->message(ALPS_T_NODE_LIMIT, messages())
5132 << systemNodeProcessed_
5133 << static_cast<int>(systemWorkQuantityForce_)
5134 << CoinMessageEol;
5135 }
5136 else {
5137 messageHandler()->message(ALPS_T_NODE_LIMIT, messages())
5138 << systemNodeProcessed_
5139 << static_cast<int>(systemWorkQuantity_)
5140 << CoinMessageEol;
5141 }
5142 }
5143 else if (getSolStatus() == AlpsExitStatusSolLimit) {
5144 if (forceTerminate_) {
5145 messageHandler()->message(ALPS_T_SOL_LIMIT, messages())
5146 << systemNodeProcessed_
5147 << static_cast<int>(systemWorkQuantityForce_)
5148 << CoinMessageEol;
5149 }
5150 else {
5151 messageHandler()->message(ALPS_T_SOL_LIMIT, messages())
5152 << systemNodeProcessed_
5153 << static_cast<int>(systemWorkQuantity_)
5154 << CoinMessageEol;
5155 }
5156 }
5157 else if (getSolStatus() == AlpsExitStatusTimeLimit) {
5158 #if 0
5159 std::cout << "------ forceTerminate_=" << forceTerminate_
5160 << ", systemWorkQuantityForce_ = "
5161 << static_cast<int>(systemWorkQuantityForce_)
5162 << std::endl;
5163 #endif
5164
5165 if (forceTerminate_) {
5166 messageHandler()->message(ALPS_T_TIME_LIMIT, messages())
5167 << systemNodeProcessed_
5168 << static_cast<int>(systemWorkQuantityForce_)
5169 << CoinMessageEol;
5170 }
5171 else {
5172 messageHandler()->message(ALPS_T_TIME_LIMIT, messages())
5173 << systemNodeProcessed_
5174 << static_cast<int>(systemWorkQuantity_)
5175 << CoinMessageEol;
5176 }
5177 }
5178 else if (getSolStatus() == AlpsExitStatusFeasible) {
5179 messageHandler()->message(ALPS_T_FEASIBLE, messages())
5180 << systemNodeProcessed_
5181 << static_cast<int>(systemWorkQuantity_)
5182 << CoinMessageEol;
5183 }
5184 else if (getSolStatus() == AlpsExitStatusNoMemory) {
5185 messageHandler()->message(ALPS_T_NO_MEMORY, messages())
5186 << systemNodeProcessed_
5187 << static_cast<int>(systemWorkQuantity_)
5188 << CoinMessageEol;
5189 }
5190 else if (getSolStatus() == AlpsExitStatusFailed) {
5191 messageHandler()->message(ALPS_T_FAILED, messages())
5192 << systemNodeProcessed_
5193 << static_cast<int>(systemWorkQuantity_)
5194 << CoinMessageEol;
5195 }
5196 else {
5197 messageHandler()->message(ALPS_T_INFEASIBLE, messages())
5198 << systemNodeProcessed_
5199 << static_cast<int>(systemWorkQuantity_)
5200 << CoinMessageEol;
5201 }
5202
5203 std::cout << "\n=================== SEARCH RESULTS =================="
5204 << std::endl;
5205
5206 //----------------------------------------------
5207 // Tree size, depth, overheads.
5208 //----------------------------------------------
5209
5210 std::cout << "Number of nodes processed = "
5211 << totalProcessed << std::endl;
5212 if (totalPartial >0){
5213 std::cout << "Number of nodes partially processed = "
5214 << totalPartial << std::endl;
5215 }
5216 std::cout << "Number of nodes branched = "
5217 << totalBranched << std::endl;
5218 std::cout << "Number of nodes pruned before processing = "
5219 << totalDiscarded << std::endl;
5220 std::cout << "Number of nodes left = "
5221 << totalLeft << std::endl;
5222 std::cout << "Max number of nodes processed by a worker = "
5223 << maxSize << std::endl;
5224 std::cout << "Min number of nodes processed by a worker = "
5225 << (minSize != ALPS_INT_MAX ? minSize : 0) << std::endl;
5226 std::cout << "Std Dev of number of nodes processed by a worker = "
5227 << stdSize << std::endl;
5228 std::cout << "----------------------------------" << std::endl;
5229
5230 std::cout << "Average tree depth = " << aveDep << std::endl;
5231 std::cout << "Max tree depth on workers = " << maxDep << std::endl;
5232 std::cout << "Min tree depth on workers = "
5233 << (minDep != ALPS_INT_MAX ? minDep : 0) << std::endl;
5234 std::cout << "Std Dev of tree depth on workers = " << stdDep
5235 << std::endl;
5236 std::cout << "----------------------------------" << std::endl;
5237
5238 // Ramp up.
5239 std::cout << "Average RampUp = " << aveRampUp << std::endl;
5240 std::cout << "Max RampUp = " << maxRampUp << std::endl;
5241 std::cout << "Min RampUp = "
5242 << (minRampUp != ALPS_DBL_MAX ? minRampUp : 0.0)
5243 << std::endl;
5244 std::cout << "Std Dev of RampUp = " << stdRampUp << std::endl;
5245 std::cout << "----------------------------------" << std::endl;
5246
5247 // Idle.
5248 std::cout << "Average Idle = " << aveIdle << std::endl;
5249 std::cout << "Max Idle = " << maxIdle << std::endl;
5250 std::cout << "Min Idle = "
5251 << (minIdle != ALPS_DBL_MAX ? minIdle : 0.0)
5252 << std::endl;
5253 std::cout << "Std Dev of Idle = " << stdIdle << std::endl;
5254 std::cout << "----------------------------------" << std::endl;
5255
5256 // MsgTime.
5257 std::cout << "Average MsgTime = " << aveMsgTime << std::endl;
5258 std::cout << "Max MsgTime = " << maxMsgTime << std::endl;
5259 std::cout << "Min MsgTime = "
5260 << (minMsgTime != ALPS_DBL_MAX ? minMsgTime : 0.0)
5261 << std::endl;
5262 std::cout << "Std Dev of MsgTime = " << stdMsgTime << std::endl;
5263 std::cout << "----------------------------------" << std::endl;
5264
5265 // Ramp down.
5266 std::cout << "Average RampDown = " << aveRampDown << std::endl;
5267 std::cout << "Max RampDown = " << maxRampDown << std::endl;
5268 std::cout << "Min RampDown = "
5269 << (minRampDown != ALPS_DBL_MAX ? minRampDown : 0.0)
5270 << std::endl;
5271 std::cout << "Std Dev of RampDown = " << stdRampDown << std::endl;
5272 std::cout << "----------------------------------" << std::endl;
5273
5274 //----------------------------------------------
5275 // Load balancing
5276 //----------------------------------------------
5277
5278 std::cout << "Quality balance = " << sumQualityBalance
5279 << std::endl;
5280 std::cout << "Quantity balance = " << sumQuantityBalance
5281 << std::endl;
5282 std::cout << "Inter-balance = " << sumInterBalance
5283 << std::endl;
5284 std::cout << "Intra-balance = " << sumIntraBalance
5285 << std::endl;
5286 std::cout << "Worker asked workload = " << sumWorkerAsk
5287 << std::endl;
5288 std::cout << "Worker donate success = " << sumDonateSuccess
5289 << std::endl;
5290 std::cout << "Worker donate fail = " << sumDonateFail
5291 << std::endl;
5292 std::cout << "Subtree split = " << sumSubtreeSplit
5293 << std::endl;
5294 std::cout << "Subtree whole = " << sumSubtreeWhole
5295 << std::endl;
5296 std::cout << "Subtree change = " << sumSubtreeChange
5297 << std::endl;
5298
5299 std::cout << "----------------------------------" << std::endl;
5300
5301 //----------------------------------------------
5302 // Overall.
5303 //----------------------------------------------
5304
5305 std::cout << "Search CPU time = "<<timer_.getCpuTime() <<" seconds"
5306 << ", max = " << maxCpuTime
5307 << ", min = "<< minCpuTime
5308 << ", total CPU time = " << sumCpuTime << std::endl;
5309 std::cout << "Search wallclock = "<<timer_.getWallClock()
5310 <<" seconds"
5311 << ", max = " << maxWallClock
5312 << ", min = "<< minWallClock
5313 <<", total wallclock = " << sumWallClock << std::endl;
5314 if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
5315 std::cout << "Best solution quality = " << getBestQuality()
5316 << " ; node required to find this solution = " << bestSolNode_
5317 << std::endl;
5318 }
5319 else {
5320 std::cout << "No solution was found." << std::endl;
5321 }
5322 std::cout << "====================================================="
5323 << std::endl << std::endl;
5324 } // EOF msgLevel_ > 0
5325 } // EOF master log
5326
5327 if (globalRank_ == masterRank_) {
5328 delete [] numProcessed; numProcessed = 0;
5329 delete [] numPartial; numPartial = 0;
5330 delete [] numBranched; numBranched = 0;
5331 delete [] numDiscarded; numDiscarded = 0;
5332 delete [] numLeft; numLeft = 0;
5333 delete [] depths; depths = 0;
5334 delete [] idles; idles = 0;
5335 delete [] rampUps; rampUps = 0;
5336 delete [] rampDowns; rampDowns = 0;
5337 delete [] cpuTimes;
5338 delete [] wallClocks;
5339 delete [] qualityBalance;
5340 delete [] quantityBalance;
5341 delete [] interBalance;
5342 delete [] intraBalance;
5343 delete [] workerAsk;
5344 delete [] donateSuccess;
5345 delete [] donateFail;
5346 delete [] subtreeSplit;
5347 delete [] subtreeWhole;
5348 delete [] subtreeChange;
5349 }
5350 }
5351
5352 //#############################################################################
5353
// Explore a subtree from the subtree pool for the given units of work and
// time. Reports how many nodes have been processed. The same subtree will
// be explored next time if it still has unexplored nodes.
5357 AlpsReturnStatus
doOneUnitWork(int unitWork,double unitTime,AlpsExitStatus & exitStatus,int & numNodesProcessed,int & numNodesBranched,int & numNodesDiscarded,int & numNodesPartial,int & depth,bool & betterSolution)5358 AlpsKnowledgeBrokerMPI::doOneUnitWork(int unitWork,
5359 double unitTime,
5360 AlpsExitStatus & exitStatus,
5361 int & numNodesProcessed,
5362 int & numNodesBranched,
5363 int & numNodesDiscarded,
5364 int & numNodesPartial,
5365 int & depth,
5366 bool & betterSolution)
5367 {
5368
5369 AlpsReturnStatus rCode = AlpsReturnStatusOk;
5370
5371 numNodesProcessed = 0; /* Output */
5372 numNodesBranched = 0; /* Output */
5373 numNodesDiscarded = 0; /* Output */
5374 numNodesPartial = 0; /* Output */
5375
5376 if( !workingSubTree_ && !(subTreePool_->hasKnowledge()) ) {
5377 return rCode;
5378 }
5379
5380 if (forceTerminate_) {
5381 // Subtree from load balancing
5382 deleteSubTrees();
5383 return rCode;
5384 }
5385 #if 1
5386
5387 if ( ! needWorkingSubTree_ ) {
5388 // Already has a subtree working on.
5389
5390
5391 assert(workingSubTree_);
5392 rCode = workingSubTree_->exploreUnitWork(true, /* leaveAsIt*/
5393 unitWork,
5394 unitTime,
5395 exitStatus,
5396 numNodesProcessed,
5397 numNodesBranched, /* Output */
5398 numNodesDiscarded, /* Output */
5399 numNodesPartial, /* Output */
5400 treeDepth_,
5401 betterSolution);
5402
5403 if ( !(workingSubTree_->getNumNodes()) ) {
5404 delete workingSubTree_; // Empty subtree
5405 workingSubTree_ = NULL;
5406 needWorkingSubTree_ = true;
5407 }
5408 }
5409 else if( needWorkingSubTree_ && (subTreePool_->hasKnowledge()) ) {
5410
5411 workingSubTree_ = dynamic_cast<AlpsSubTree* >
5412 (subTreePool_->getKnowledge().first);
5413
5414 subTreePool_->popKnowledge(); // Remove from pool
5415 needWorkingSubTree_ = false; // Mark alread have one
5416
5417 #ifdef NF_DEBUG
5418 std::cout << "pop a subtree." << std::endl;
5419 #endif
5420
5421 rCode = workingSubTree_->exploreUnitWork(true,
5422 unitWork,
5423 unitTime,
5424 exitStatus,
5425 numNodesProcessed,
5426 numNodesBranched, /* Output */
5427 numNodesDiscarded, /* Output */
5428 numNodesPartial, /* Output */
5429 treeDepth_,
5430 betterSolution);
5431
5432 if ( !( workingSubTree_->getNumNodes()) ) {
5433 delete workingSubTree_; // Empty subtree
5434 workingSubTree_ = 0;
5435 needWorkingSubTree_ = true;
5436 }
5437 }
5438 else { // need subtree, but system has no workload
5439 if (workingSubTree_ != 0) {
5440 delete workingSubTree_;
5441 workingSubTree_ = 0;
5442 }
5443 }
5444 #endif
5445 return rCode;
5446 }
5447
5448 //#############################################################################
5449
5450 /** Initialize member data. */
void
AlpsKnowledgeBrokerMPI::init()
{
    // --- Process identity and MPI topology ---
    processNum_ = 0;
    globalRank_ = -1;
    clusterComm_ = MPI_COMM_NULL;
    hubComm_ = MPI_COMM_NULL;
    hubGroup_ = MPI_GROUP_NULL;
    clusterSize_ = 0;
    userClusterSize_ = 0;
    clusterRank_ = -1;
    hubRanks_ = 0;
    myHubRank_ = -1;
    masterRank_ = -1;
    processType_ = AlpsProcessTypeAny;
    processTypeList_ = NULL;
    hubWork_ = false;

    // --- Outstanding non-blocking MPI requests ---
    subTreeRequest_ = MPI_REQUEST_NULL;
    solRequestL_ = MPI_REQUEST_NULL;
    solRequestR_ = MPI_REQUEST_NULL;
    modelKnowRequestL_ = MPI_REQUEST_NULL;
    modelKnowRequestR_ = MPI_REQUEST_NULL;
    forwardRequestL_ = MPI_REQUEST_NULL;
    forwardRequestR_ = MPI_REQUEST_NULL;

    // --- Incumbent solution bookkeeping ---
    incumbentValue_ = ALPS_INC_MAX;
    incumbentID_ = 0;
    updateIncumbent_ = false;

    // --- Workload quality/quantity (worst possible until reported) ---
    workQuality_ = ALPS_OBJ_MAX; // Worst possible
    clusterWorkQuality_ = ALPS_OBJ_MAX;
    systemWorkQuality_ = ALPS_OBJ_MAX;
    hubWorkQualities_ = 0;
    workerWorkQualities_ = 0;
    workQuantity_ = 0.0;
    clusterWorkQuantity_ = 0.0;
    systemWorkQuantity_ = 0.0;
    systemWorkQuantityForce_ = 0.0;
    hubWorkQuantities_ = 0;
    workerWorkQuantities_ = 0;
    workerReported_ = 0;
    hubReported_ = 0;
    allHubReported_ = false;
    masterDoBalance_ = 0;
    hubDoBalance_ = 0;

    // --- Node counts and message counters ---
    workerNodeProcesseds_ = 0;
    clusterNodeProcessed_ = 0;
    hubNodeProcesseds_ = 0;
    sendCount_ = 0;
    recvCount_ = 0;
    clusterSendCount_ = 0;
    clusterRecvCount_ = 0;
    systemSendCount_ = 0;
    systemRecvCount_ = 0;
    masterIndexBatch_ = 0;

    // --- Timing statistics ---
    rampUpTime_ = 0.0;
    rampDownTime_ = 0.0;
    idleTime_ = 0.0;
    msgTime_ = 0.0;

    // --- Parallel-search statistics ---
    psStats_.qualityBalance_ = 0;
    psStats_.quantityBalance_ = 0;
    psStats_.interBalance_ = 0;
    psStats_.intraBalance_ = 0;
    psStats_.workerAsk_ = 0;
    psStats_.donateSuccess_ = 0;
    psStats_.donateFail_ = 0;
    psStats_.subtreeSplit_ = 0;
    psStats_.subtreeWhole_ = 0;
    psStats_.subtreeChange_ = 0;

    // --- Control flags and communication buffers ---
    forceTerminate_ = false;
    blockTermCheck_ = true;
    blockHubReport_ = false;
    blockWorkerReport_ = false;
    blockAskForWork_ = false;
    attachBuffer_ = 0;
    largeBuffer_ = 0;
    largeBuffer2_ = 0;
    smallBuffer_ = 0;

    // --- Load-balancing parameters and miscellany ---
    masterBalancePeriod_ = 0.01;
    hubReportPeriod_ = 0.01;
    modelGenID_ = -1;
    modelGenPos_ = -1;

    rampUpSubTree_ = 0;
    unitWorkNodes_ = 0;
    haltSearch_ = false;

    userBalancePeriod_ = false;
}
5540
5541 //#############################################################################
5542
5543 /** Destructor. */
~AlpsKnowledgeBrokerMPI()5544 AlpsKnowledgeBrokerMPI::~AlpsKnowledgeBrokerMPI()
5545 {
5546 if (workerWorkQualities_) {
5547 delete [] workerWorkQualities_;
5548 workerWorkQualities_ = 0;
5549 }
5550
5551 // std::cout << "Here1 -- " << globalRank_ << std::endl;
5552 if (hubRanks_) {
5553 delete [] hubRanks_; hubRanks_ = 0;
5554 }
5555 // std::cout << "Here2 -- " << globalRank_ << std::endl;
5556 if (processTypeList_) {
5557 delete [] processTypeList_; processTypeList_ = NULL;
5558 }
5559 // std::cout << "Here3 -- " << globalRank_ << std::endl;
5560 if (hubWorkQualities_) {
5561 delete [] hubWorkQualities_; hubWorkQualities_ = 0;
5562 }
5563 // std::cout << "Here4 -- " << globalRank_
5564 // << "c size = " << clusterSize_ << std::endl;
5565
5566 if (hubWorkQuantities_) {
5567 delete [] hubWorkQuantities_; hubWorkQuantities_ = 0;
5568 }
5569 if (workerWorkQuantities_){
5570 delete [] workerWorkQuantities_;
5571 workerWorkQuantities_ = 0;
5572 }
5573 // std::cout << "Here5 -- " << globalRank_ << std::endl;
5574
5575 if (workerReported_) {
5576 delete [] workerReported_; workerReported_ = 0;
5577 }
5578 if (hubReported_) {
5579 delete [] hubReported_; hubReported_ = 0;
5580 }
5581 if (workerNodeProcesseds_) {
5582 delete [] workerNodeProcesseds_;
5583 workerNodeProcesseds_ = 0;
5584 }
5585 if (hubNodeProcesseds_) {
5586 delete [] hubNodeProcesseds_;
5587 hubNodeProcesseds_ = 0;
5588 }
5589 #if 0
5590 if (attachBuffer_) {
5591 delete [] attachBuffer_;
5592 attachBuffer_ = 0;
5593 }
5594 #endif
5595 if (largeBuffer_) {
5596 delete [] largeBuffer_;
5597 largeBuffer_ = 0;
5598 }
5599 if (largeBuffer2_) {
5600 delete [] largeBuffer2_;
5601 largeBuffer2_ = 0;
5602 }
5603 if (smallBuffer_) {
5604 delete [] smallBuffer_;
5605 smallBuffer_ = 0;
5606 }
5607
5608 if (rampUpSubTree_) {
5609 delete rampUpSubTree_;
5610 rampUpSubTree_ = 0;
5611 }
5612 // Terminate MPI environment.
5613 MPI_Finalize();
5614 }
5615
5616 //#############################################################################
5617
5618 void
printBestSolution(char * outputFile) const5619 AlpsKnowledgeBrokerMPI::printBestSolution(char* outputFile) const
5620 {
5621 if (globalRank_ == masterRank_) {
5622
5623 if (msgLevel_ < 1) return;
5624
5625 if (outputFile != 0) {
5626 // Write to outputFile
5627 std::ofstream os(outputFile);
5628 if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
5629 os << "Cost = " << getBestQuality() << std::endl;
5630 dynamic_cast<AlpsSolution* >
5631 (getBestKnowledge(AlpsKnowledgeTypeSolution).first)->print(os);
5632 }
5633 else {
5634 os << "No solution was found." << std::endl;
5635 }
5636 }
5637 else {
5638 // Write to std::cout
5639 std::cout << std::endl;
5640 if (getSolStatus() == AlpsExitStatusOptimal) {
5641 std::cout << "================= OPTIMAL SOLUTION =================="
5642 << std::endl;
5643 }
5644 else {
5645 std::cout << "================= BEST SOLUTION FOUND ==============="
5646 << std::endl;
5647 }
5648
5649 if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
5650 std::cout << "Cost = " << getBestQuality()
5651 << std::endl;
5652 dynamic_cast<AlpsSolution* >(getBestKnowledge(AlpsKnowledgeTypeSolution).first)->print(std::cout);
5653 }
5654 else {
5655 std::cout << "No solution was found." << std::endl;
5656 }
5657 std::cout << "====================================================="
5658 << std::endl << std::endl;
5659 }
5660 }
5661 }
5662
5663 //#############################################################################
5664
5665 void
incSendCount(const char * how,int s)5666 AlpsKnowledgeBrokerMPI::incSendCount(const char* how, int s)
5667 {
5668 if (msgLevel_ > 100) {
5669 messageHandler()->message(ALPS_MSG_HOW, messages())
5670 << globalRank_ << "increment send" << s << how << CoinMessageEol;
5671 }
5672
5673 sendCount_ += s;
5674 }
5675
5676 //#############################################################################
5677
/** Decrement the number of sent messages. */
5679 void
decSendCount(const char * how,int s)5680 AlpsKnowledgeBrokerMPI::decSendCount(const char* how, int s)
5681 {
5682 if (msgLevel_ > 100) {
5683 messageHandler()->message(ALPS_MSG_HOW, messages())
5684 << globalRank_ << "decrement send" << s << how << CoinMessageEol;
5685 }
5686
5687 sendCount_ -= s;
5688 }
5689
5690 //#############################################################################
5691
/** Increment the number of received messages. */
5693 void
incRecvCount(const char * how,int s)5694 AlpsKnowledgeBrokerMPI::incRecvCount(const char* how, int s)
5695 {
5696 if (msgLevel_ > 100) {
5697 messageHandler()->message(ALPS_MSG_HOW, messages())
5698 << globalRank_ << "increment recieve" << s << how <<CoinMessageEol;
5699 }
5700
5701 recvCount_ += s;
5702 }
5703
5704 //#############################################################################
5705
/** Decrement the number of received messages. */
5707 void
decRecvCount(const char * how,int s)5708 AlpsKnowledgeBrokerMPI::decRecvCount(const char* how, int s)
5709 {
5710 if (msgLevel_ > 100) {
5711 messageHandler()->message(ALPS_MSG_HOW, messages())
5712 << globalRank_ << "decrement recieve" <<s<< how<< CoinMessageEol;
5713 }
5714
5715 recvCount_ -= s;
5716 }
5717
5718 //#############################################################################
5719
/** Send knowledge of the given type to a receiver. */
5721 void
sendKnowledge(AlpsKnowledgeType type,int sender,int receiver,char * & msgBuffer,int msgSize,int msgTag,MPI_Comm comm,bool blocking)5722 AlpsKnowledgeBrokerMPI::sendKnowledge(AlpsKnowledgeType type,
5723 int sender,
5724 int receiver,
5725 char *& msgBuffer,
5726 int msgSize,
5727 int msgTag,
5728 MPI_Comm comm,
5729 bool blocking)
5730 {
5731 switch (type) {
5732 case AlpsKnowledgeTypeModel:
5733 break;
5734 case AlpsKnowledgeTypeModelGen:
5735 sendModelKnowledge(comm, receiver);
5736 break;
5737 case AlpsKnowledgeTypeNode:
5738 sendRampUpNode(receiver, comm);
5739 break;
5740 case AlpsKnowledgeTypeSolution:
5741 sendIncumbent();
5742 break;
5743 case AlpsKnowledgeTypeSubTree:
5744 break;
5745 default:
5746 assert(0);
5747 throw CoinError("Unknown knowledge type", "sendKnowledge",
5748 "AlpsKnowledgeBrokerMPI");
5749 }
5750 }
5751
5752 //#############################################################################
5753
/** Receive knowledge of the given type from a sender. */
5755 void
receiveKnowledge(AlpsKnowledgeType type,int sender,int receiver,char * & msgBuffer,int msgSize,int msgTag,MPI_Comm comm,MPI_Status * status,bool blocking)5756 AlpsKnowledgeBrokerMPI::receiveKnowledge(AlpsKnowledgeType type,
5757 int sender,
5758 int receiver,
5759 char *& msgBuffer,
5760 int msgSize,
5761 int msgTag,
5762 MPI_Comm comm,
5763 MPI_Status* status,
5764 bool blocking)
5765 {
5766 switch (type) {
5767 case AlpsKnowledgeTypeModel:
5768 break;
5769 case AlpsKnowledgeTypeModelGen:
5770 receiveModelKnowledge(comm);
5771 break;
5772 case AlpsKnowledgeTypeNode:
5773 receiveRampUpNode(sender, comm, status);
5774 break;
5775 case AlpsKnowledgeTypeSolution:
5776 break;
5777 case AlpsKnowledgeTypeSubTree:
5778 break;
5779 default:
5780 assert(0);
5781 throw CoinError("Unknown knowledge type", "receiveKnowledge",
5782 "AlpsKnowledgeBrokerMPI");
5783 }
5784 }
5785
5786 //#############################################################################
5787
5788 void
requestKnowledge(AlpsKnowledgeType type,int sender,int receiver,char * & msgBuffer,int msgSize,int msgTag,MPI_Comm comm,bool blocking)5789 AlpsKnowledgeBrokerMPI::requestKnowledge(AlpsKnowledgeType type,
5790 int sender,
5791 int receiver,
5792 char *& msgBuffer,
5793 int msgSize,
5794 int msgTag,
5795 MPI_Comm comm,
5796 bool blocking)
5797 {
5798 switch (type) {
5799 case AlpsKnowledgeTypeModel:
5800 break;
5801 case AlpsKnowledgeTypeModelGen:
5802 break;
5803 case AlpsKnowledgeTypeNode:
5804 break;
5805 case AlpsKnowledgeTypeSolution:
5806 break;
5807 case AlpsKnowledgeTypeSubTree:
5808 break;
5809 default:
5810 assert(0);
5811 throw CoinError("Unknown knowledge type", "requestKnowledge",
5812 "AlpsKnowledgeBrokerMPI");
5813 }
5814 }
5815
5816 //#############################################################################
5817
/** Forward generated model knowledge to this process's children in the
    binary broadcast tree rooted at the original sender (modelGenID_). */
5820 void
forwardModelKnowledge()5821 AlpsKnowledgeBrokerMPI::forwardModelKnowledge()
5822 {
5823 assert(modelGenPos_ > 0);
5824
5825 // Binary forewarding
5826 int mySeq = rankToSequence(modelGenID_, globalRank_);
5827 int leftSeq = leftSequence(mySeq, processNum_);
5828 int rightSeq = rightSequence(mySeq, processNum_);
5829
5830 #if 0
5831 std::cout << "++++ forwardModelKnowledge: mySeq = " << mySeq
5832 << ", leftSeq = " << leftSeq
5833 << ", rightSeq = " << rightSeq
5834 << ", modelGenID_ = " << modelGenID_
5835 << ", globalRank_ = " << globalRank_
5836 << std::endl;
5837 #endif
5838
5839 if (leftSeq != -1) {
5840 if (forwardRequestL_ != MPI_REQUEST_NULL) {
5841 // Test if previous sending has completed.
5842 int alreadySent = false;
5843 MPI_Status sentStatus;
5844 MPI_Test(&forwardRequestL_, &alreadySent, &sentStatus);
5845 if (!alreadySent) { // Previous sending hasn't complete
5846 std::cout << "++++ Previous left forwardModelKnowledge hasn't finished."
5847 << std::endl;
5848 return;
5849 }
5850 }
5851
5852 // Note: modelGenPos_ is set by recieveModelKnowledge
5853 int leftRank = sequenceToRank(modelGenID_, leftSeq);
5854 MPI_Isend(largeBuffer_, modelGenPos_, MPI_PACKED, leftRank,
5855 AlpsMsgModelGenSearch, MPI_COMM_WORLD, &forwardRequestL_);
5856 #if 0
5857 std::cout << "++++ forwardModelKnowledge: leftRank = "
5858 << leftRank << std::endl;
5859 #endif
5860 MPI_Status sentStatusL;
5861 MPI_Wait(&forwardRequestL_, &sentStatusL);
5862 incSendCount("forwardModelKnowledge during search");
5863 }
5864
5865 if (rightSeq != -1) {
5866 if (forwardRequestR_ != MPI_REQUEST_NULL) {
5867 // Test if previous sending has completed.
5868 int alreadySent = false;
5869 MPI_Status sentStatus;
5870 MPI_Test(&forwardRequestR_, &alreadySent, &sentStatus);
5871 if (!alreadySent) { // Previous sending hasn't complete
5872 std::cout << "++++ Previous right forwardModelKnowledge hasn't finished."
5873 << std::endl;
5874 return;
5875 }
5876 }
5877 int rightRank = sequenceToRank(modelGenID_, rightSeq);
5878 #if 0
5879 std::cout << "++++ forwardModelKnowledge: rightRank = "
5880 << rightRank << std::endl;
5881 #endif
5882 MPI_Isend(largeBuffer_, modelGenPos_, MPI_PACKED, rightRank,
5883 AlpsMsgModelGenSearch, MPI_COMM_WORLD, &forwardRequestR_);
5884 MPI_Status sentStatusR;
5885 MPI_Wait(&forwardRequestR_, &sentStatusR);
5886 incSendCount("forwardModelKnowledge during search");
5887 }
5888 }
5889
5890 //#############################################################################
5891
/** Send generated model knowledge to a receiver. */
// NOTE: comm is hubComm_ or MPI_COMM_WORLD.
// During search a non-blocking send is used, since another process may be
// sending to this process at the same time; a blocking send could deadlock.
5896 void
sendModelKnowledge(MPI_Comm comm,int receiver)5897 AlpsKnowledgeBrokerMPI::sendModelKnowledge(MPI_Comm comm, int receiver)
5898 {
5899 int size = largeSize_;
5900 int position = 0;
5901
5902 bool hasKnowledge = false;
5903
5904 assert(largeBuffer2_);
5905
5906 // Pack original sender's global rank
5907 position = 0;
5908 MPI_Pack(&globalRank_, 1, MPI_INT, largeBuffer2_, largeSize_,
5909 &position, comm);
5910
5911 //std::cout << "----- 1. position = " << position << std::endl;
5912
5913 // Retrieve and pack generated model knowledge
5914 AlpsEncoded *encoded = model_->packSharedKnowlege();
5915
5916 if (encoded) {
5917 hasKnowledge = true;
5918 // Pack into local buffer
5919 packEncoded(encoded, largeBuffer2_, size, position, comm);
5920 }
5921 #if 0
5922 else {
5923 std::cout << "---- Process[" << globalRank_
5924 << "]: have no mode knowledge to share." << std::endl;
5925 }
5926 #endif
5927
5928 //std::cout << "----- 2. position = " << position
5929 // << ", size = " << size << std::endl;
5930 delete encoded;
5931
5932 if (phase_ == AlpsPhaseRampup) {
5933 assert(comm == clusterComm_ || comm == hubComm_);
5934 assert(receiver > -1);
5935
5936 if (hasKnowledge) {
5937 // Send packed knowledge of size "position"
5938 MPI_Send(largeBuffer2_, position, MPI_PACKED, receiver,
5939 AlpsMsgModelGenRampUp, comm);
5940 incSendCount("sendModelKnowledge during rampup");
5941 }
5942 else {
5943 // Send an empty message
5944 MPI_Send(largeBuffer2_, 0, MPI_PACKED, receiver,
5945 AlpsMsgModelGenRampUp, comm);
5946 incSendCount("sendModelKnowledge during rampup");
5947 }
5948 }
5949 else if ( hasKnowledge && (phase_ == AlpsPhaseSearch) ) {
5950 assert(comm == MPI_COMM_WORLD);
5951 #if 0
5952 // Attach buffer
5953 if (!attachBuffer_) {
5954 int attachSize = 2 * (largeSize_ + MPI_BSEND_OVERHEAD );
5955 attachBuffer_ = new char [attachSize];
5956 MPI_Buffer_attach(attachBuffer_, attachSize);
5957 }
5958 #endif
5959
5960 #if 0
5961 std::cout << "---- Process[" << globalRank_
5962 << "]: Share mode knowledge by buffer sending, "
5963 << "position " << position << std::endl;
5964 #endif
5965
5966 // During search, binary sending
5967 int mySeq = rankToSequence(globalRank_, globalRank_);
5968 int leftSeq = leftSequence(mySeq, processNum_);
5969 int rightSeq = rightSequence(mySeq, processNum_);
5970
5971 if (leftSeq != -1) {
5972 if (modelKnowRequestL_ != MPI_REQUEST_NULL) {
5973 // Test if previous sending has completed.
5974 int alreadySent = false;
5975 MPI_Status sentStatus;
5976 MPI_Test(&modelKnowRequestL_, &alreadySent, &sentStatus);
5977 if (!alreadySent) { // Previous sending hasn't complete
5978 return;
5979 }
5980 }
5981 int leftRank = sequenceToRank(globalRank_, leftSeq);
5982 // Buffered send
5983 //MPI_Bsend(largeBuffer2_, position, MPI_PACKED, leftRank,
5984 // AlpsMsgModelGenSearch, comm);
5985 //MPI_Send(largeBuffer2_, position, MPI_PACKED, leftRank,
5986 // AlpsMsgModelGenSearch, comm);
5987 MPI_Isend(largeBuffer2_, position, MPI_PACKED, leftRank,
5988 AlpsMsgModelGenSearch, comm, &modelKnowRequestL_);
5989 MPI_Status sentStatusL;
5990 MPI_Wait(&forwardRequestL_, &sentStatusL);
5991 incSendCount("sendModelKnowledge during search");
5992 }
5993
5994 if (rightSeq != -1) {
5995 if (modelKnowRequestR_ != MPI_REQUEST_NULL) {
5996 // Test if previous sending has completed.
5997 int alreadySent = false;
5998 MPI_Status sentStatus;
5999 MPI_Test(&modelKnowRequestR_, &alreadySent, &sentStatus);
6000 if (!alreadySent) { // Previous sending hasn't complete
6001 return;
6002 }
6003 }
6004 int rightRank = sequenceToRank(globalRank_, rightSeq);
6005 // Buffered send
6006 //MPI_Bsend(largeBuffer2_, position, MPI_PACKED, rightRank,
6007 // AlpsMsgModelGenSearch, comm);
6008 //MPI_Send(largeBuffer2_, position, MPI_PACKED, rightRank,
6009 // AlpsMsgModelGenSearch, comm);
6010 MPI_Isend(largeBuffer2_, position, MPI_PACKED, rightRank,
6011 AlpsMsgModelGenSearch, comm, &modelKnowRequestR_);
6012 MPI_Status sentStatusR;
6013 MPI_Wait(&forwardRequestR_, &sentStatusR);
6014 incSendCount("sendModelKnowledge during search");
6015 }
6016 }
6017 }
6018
6019 //#############################################################################
6020
/** Receive generated knowledge (related to model) from a sender.
 *  During ramp-up the message is received here into a freshly allocated
 *  temporary buffer; during search the message is assumed to already be
 *  in largeBuffer_ (received by the caller).  The packed layout is
 *  [original sender's global rank][encoded model knowledge]. */
// NOTE: comm is hubComm_ or MPI_COMM_WORLD.
void
AlpsKnowledgeBrokerMPI::receiveModelKnowledge(MPI_Comm comm)
{
    int position = 0;
    int count = 0;
    MPI_Status status;
    bool hasKnowledge = true;

    char *localBuffer = 0;// FIXME: add a largebuffer2_

    if (phase_ == AlpsPhaseRampup) {
        // Ramp-up: blocking receive into a temporary buffer owned by
        // this function.
        localBuffer = new char [largeSize_];

        // std::cout << "PROCESS[" << globalRank_ << "] : B: recive gen model"
        //  << std::endl;

        // Receive
        MPI_Recv(localBuffer, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
                 AlpsMsgModelGenRampUp, comm, &status);

        MPI_Get_count(&status, MPI_PACKED, &count);

        // An empty message means the sender had no knowledge to share.
        if (count < 1) {
            hasKnowledge = false;
        }

        incRecvCount("receiveModelKnowledge during rampup");

        //std::cout << "PROCESS[" << globalRank_ << "] : A: recive gen model"
        //  << ", count = " << count << std::endl;
    }
    else if (phase_ == AlpsPhaseSearch) {
        // Search phase: content was already received into largeBuffer_,
        // which is owned by the broker (do not delete it below).
        localBuffer = largeBuffer_;
    }
    else {
        assert(0);
    }

    if (hasKnowledge) {
        // Unpack original sender's global rank
        position = 0; // Start from position 0 in local buffer
        MPI_Unpack(localBuffer, largeSize_, &position, &modelGenID_, 1,
                   MPI_INT, comm);

        //std::cout << "PROCESS[" << globalRank_ << "] : recive gen model"
        //  << ", count = " << count << ", from " << modelGenID_
        //  << std::endl;

        // Unpack knowledge from buffer.
        AlpsEncoded* encodedModelGen = unpackEncoded(localBuffer,
                                                     position,
                                                     comm,
                                                     largeSize_);

        // Record actual buffer size for broadcasting
        modelGenPos_ = position;

        // Unpack and store knowledge from larger buffer.
        model_->unpackSharedKnowledge(*encodedModelGen);

        delete encodedModelGen;
        encodedModelGen = 0;
    }

    // Free only the ramp-up temporary; in search phase localBuffer
    // aliases largeBuffer_, which the broker still owns.
    if ( (phase_ == AlpsPhaseRampup) && localBuffer ) {
        delete [] localBuffer;
    }
    localBuffer = 0;
}
6092
6093 //#############################################################################
6094
6095 void
sendErrorCodeToMaster(int errorCode)6096 AlpsKnowledgeBrokerMPI::sendErrorCodeToMaster(int errorCode)
6097 {
6098 int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
6099 int pos = 0;
6100 MPI_Pack(&errorCode, 1, MPI_INT, smallBuffer_, size, &pos, MPI_COMM_WORLD);
6101 MPI_Send(smallBuffer_, pos, MPI_PACKED, masterRank_, AlpsMsgErrorCode,
6102 MPI_COMM_WORLD);
6103 incSendCount("sendErrorCodeToMaster");
6104 }
6105
6106 //#############################################################################
6107
6108 void
recvErrorCode(char * & bufLarge)6109 AlpsKnowledgeBrokerMPI::recvErrorCode(char *& bufLarge)
6110 {
6111 int size = 10;
6112 int pos = -1;
6113 int errorCode = 0;
6114
6115 MPI_Unpack(bufLarge, size, &pos, &errorCode, 1, MPI_INT, MPI_COMM_WORLD);
6116 if (errorCode == 1) {
6117 // out of memory
6118 exitStatus_ = AlpsExitStatusNoMemory;
6119 }
6120 else {
6121 // error
6122 exitStatus_ = AlpsExitStatusFailed;
6123 }
6124
6125 forceTerminate_ = true;
6126 }
6127
6128 //#############################################################################
6129
/** Master's part of the two-level root initialization (static balance).
 *  Expands the root into nodes, deals them to the hubs round-robin,
 *  then generates a second batch for the workers in the master's own
 *  cluster.  Generated model knowledge and any incumbent found during
 *  ramp-up are broadcast along the way. */
void
AlpsKnowledgeBrokerMPI::rootInitMaster(AlpsTreeNode* root)
{
    int i = 0;
    // How many nodes the master should create before dealing to hubs.
    int requiredNumNodes =
        model_->AlpsPar()->entry(AlpsParams::masterInitNodeNum);
    double rampUpTimeMaster = 0.0;

    AlpsNodePool* tempNodePool= NULL;

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_STATIC_BALANCE_BEG, messages())
            << "the Two-level Root Initialization" << CoinMessageEol;
    }

    //------------------------------------------------------
    // Create required number of subtrees(nodes) for hubs.
    //------------------------------------------------------

    if (msgLevel_ > 0 && requiredNumNodes > 0) {
        messageHandler()->message(ALPS_RAMPUP_MASTER_START, messages())
            << globalRank_ << requiredNumNodes << CoinMessageEol;
    }

    rampUpSubTree_ = dynamic_cast<AlpsSubTree*>
        (const_cast<AlpsKnowledge *>(decoderObject(AlpsKnowledgeTypeSubTree)))
        ->newSubTree();
    rampUpSubTree_->setBroker(this);
    rampUpSubTree_->setNodeSelection(rampUpNodeSelection_);
    rampUpSubTree_->setNextIndex(1); // One more than root's index

    nodeProcessedNum_ += rampUpSubTree_->rampUp(hubNum_,
                                                requiredNumNodes,
                                                treeDepth_,
                                                root);

    int numGenNodes = rampUpSubTree_->nodePool()->getNumKnowledges();

    //------------------------------------------------------
    // Spawn hub/worker processes (For dynamic process management only)
    // Nothing now. FILL IN HERE IF NEEDED LATER.
    //------------------------------------------------------

    //------------------------------------------------------
    // Distribute subtrees(nodes) to hubs in Round-robin way.
    //------------------------------------------------------

    // Temporary store node for hub 0(master)
    tempNodePool = new AlpsNodePool;
    tempNodePool->setNodeSelection(*rampUpNodeSelection_);

    if (numGenNodes <= 0) {
        // Ramp-up produced nothing to distribute.
        if (msgLevel_ > 0) {
            messageHandler()->message(ALPS_RAMPUP_MASTER_FAIL, messages())
                << globalRank_ << CoinMessageEol;
        }
    }
    else {
        int numSent = 0;
        // Deal nodes out one per hub until the pool is exhausted.
        while (numSent < numGenNodes) {
            for (i = 0; i < hubNum_; ++i) {
                if(numSent >= numGenNodes) {
                    break; // Break in the sending round
                }
                if (hubRanks_[i] != globalRank_) {
                    // NOTE: master's rank is always 0 in hubComm_.
                    if (msgLevel_ > 100) {
                        std::cout << "master send a node to hub "
                                  << hubRanks_[i]
                                  << std::endl;
                    }
                    sendKnowledge(AlpsKnowledgeTypeNode,
                                  globalRank_,
                                  i,
                                  smallBuffer_,
                                  0,
                                  MPI_ANY_TAG,
                                  hubComm_,
                                  true);

                }
                else {
                    // Master keeps its own share in the temporary pool.
                    AlpsTreeNode* nodeM = dynamic_cast<AlpsTreeNode* >
                        (rampUpSubTree_->nodePool()->getKnowledge().first);
                    rampUpSubTree_->nodePool()->popKnowledge();
                    tempNodePool->addKnowledge(nodeM, nodeM->getQuality());
                }
                ++numSent;
            }
        }
    }

    //------------------------------------------------------
    // Move nodes from tempNodePool to rampUpSubTree_'s node pool.
    // NOTE: These nodes do not necessary form a subtree. We want to use
    //       subtree's functions to generates more nodes for workers.
    //------------------------------------------------------

    while (tempNodePool->hasKnowledge()) {
        AlpsTreeNode* nodeT = dynamic_cast<AlpsTreeNode* >
            (tempNodePool->getKnowledge().first);
        double valT = tempNodePool->getKnowledge().second;
        rampUpSubTree_->nodePool()->addKnowledge(nodeT, valT);
        tempNodePool->popKnowledge();
    }

    //------------------------------------------------------
    // Tell hubs no more nodes to send.
    //------------------------------------------------------

    for (i = 0; i < hubNum_; ++i) {
        if (hubRanks_[i] != globalRank_) sendFinishInit(i, hubComm_);
    }

    //------------------------------------------------------
    // Send generated model knowledge to hubs.
    //------------------------------------------------------

    for (i = 0; i < hubNum_; ++i) {
        if (hubRanks_[i] != globalRank_) {
            sendKnowledge(AlpsKnowledgeTypeModelGen,
                          masterRank_,
                          i, // receiver
                          largeBuffer2_,
                          0,
                          AlpsMsgModelGenRampUp,
                          hubComm_, // comm
                          true);
            //sendModelKnowledge(genBuf, AlpsMsgModelGenRampUp, hubComm_, i);
        }
    }
    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_KNOWLEDGE_GEN, messages())
            << globalRank_ << CoinMessageEol;
    }

    // Master's rampup stops.
    rampUpTimeMaster = masterTimer_.getTime();

    //------------------------------------------------------
    // If have solution, broadcast its value and process id.
    //------------------------------------------------------

    if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
        double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if(incVal < incumbentValue_) {  // Assume Minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (msgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_MASTER_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }

    //------------------------------------------------------
    // Print out statistics about root ramp up.
    //------------------------------------------------------

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_RAMPUP_MASTER, messages())
            << globalRank_ << rampUpTimeMaster
            << nodeProcessedNum_ << numGenNodes
            << CoinMessageEol;
    }

    if (nodeProcessedNum_) {
        // Rough per-node estimate from the master's ramp-up phase.
        nodeProcessingTime_ = rampUpTimeMaster/nodeProcessedNum_;
    }

    //------------------------------------------------------
    // Generate and send required number of nodes for master's workers.
    //------------------------------------------------------

    requiredNumNodes = model_->AlpsPar()->entry(AlpsParams::hubInitNodeNum);

    if (msgLevel_ > 0 && requiredNumNodes > 0) {
        messageHandler()->message(ALPS_RAMPUP_HUB_START, messages())
            << globalRank_ << requiredNumNodes << CoinMessageEol;
    }

    int treeSizeByHub = rampUpSubTree_->rampUp(clusterSize_ - 1,
                                               requiredNumNodes,
                                               treeDepth_);

    nodeProcessedNum_ += treeSizeByHub;
    const int numNode2 = rampUpSubTree_->nodePool()->getNumKnowledges();

    if (numNode2 == 0) {
        if (msgLevel_ > 0) {
            messageHandler()->message(ALPS_RAMPUP_HUB_FAIL, messages())
                << globalRank_ << CoinMessageEol;
        }
    }
    else { // Send nodes to my workers
        int numSent2 = 0;
        while (numSent2 < numNode2) {
            for (i = 0; i < clusterSize_; ++i) {
                if(numSent2 >= numNode2) {
                    break;
                }
                if (i != clusterRank_) {
                    sendKnowledge(AlpsKnowledgeTypeNode,
                                  globalRank_,
                                  i,
                                  smallBuffer_,
                                  0,
                                  MPI_ANY_TAG,
                                  clusterComm_,
                                  true);

                    ++numSent2;
                    if (msgLevel_ > 200) {
                        std::cout << "MASTER/HUB " << clusterRank_
                                  <<" : sent nodes to Worker "
                                  << i << std::endl;
                    }
                }
            }
        }
    }

    assert(rampUpSubTree_->getNumNodes() == 0);
    rampUpSubTree_->nullRootActiveNode();

    //------------------------------------------------------
    // Tell its workers no more nodes to send.
    //------------------------------------------------------

    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            sendFinishInit(i, clusterComm_);
        }
    }

    //------------------------------------------------------
    // Send generated model knowledge to its workers.
    //------------------------------------------------------

    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            sendKnowledge(AlpsKnowledgeTypeModelGen,
                          clusterRank_,
                          i, // receiver
                          largeBuffer2_,
                          0,
                          AlpsMsgModelGenRampUp,
                          clusterComm_, // comm
                          true);
            //sendModelKnowledge(genBuf, AlpsMsgModelGenRampUp, clusterComm_, i);
        }
    }

    //------------------------------------------------------
    // If have better solution, broadcast its value and process id.
    //------------------------------------------------------

    if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
        double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if(incVal < incumbentValue_) {  // Assume Minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (msgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_HUB_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }

    rampUpTime_ = masterTimer_.getTime();

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_RAMPUP_HUB, messages())
            << globalRank_ << (rampUpTime_ - rampUpTimeMaster)
            << treeSizeByHub << numNode2 << CoinMessageEol;
    }

    if (treeSizeByHub) {
        // Blend the master-phase and hub-phase per-node estimates.
        nodeProcessingTime_=0.5*(nodeProcessingTime_ +
                                 (rampUpTime_-rampUpTimeMaster)/treeSizeByHub);
    }

    // Reset to normal selection.
    rampUpSubTree_->setNodeSelection(nodeSelection_);

    // Free pool.
    delete tempNodePool;

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_STATIC_BALANCE_END, messages())
            << "the Two-level Root Initialization" << CoinMessageEol;
    }
}
6441
6442 //#############################################################################
6443
/** Hub's part of the two-level root initialization.  Receives the
 *  master's nodes and model knowledge, expands them into more nodes,
 *  deals those out to the hub's workers (keeping one as a subtree if
 *  hubWork_ is set), then forwards model knowledge and any incumbent. */
void
AlpsKnowledgeBrokerMPI::rootInitHub()
{
    int i;

    MPI_Status status;

    // Number of nodes this hub should create for its workers.
    int requiredNumNodes =
        model_->AlpsPar()->entry(AlpsParams::hubInitNodeNum);

    rampUpSubTree_ = dynamic_cast<AlpsSubTree*>
        (const_cast<AlpsKnowledge *>
         (decoderObject(AlpsKnowledgeTypeSubTree)))->newSubTree();
    rampUpSubTree_->setBroker(this);
    rampUpSubTree_->setNodeSelection(rampUpNodeSelection_);

    // Make sure the number of nodes created is larger than cluster size.
    int minNumNodes = 0;
    if (hubWork_) {
        minNumNodes = clusterSize_;
    }
    else {
        minNumNodes = clusterSize_ - 1;
    }

    //------------------------------------------------------
    // Receive subtrees (nodes) sent by Master.
    // NOTE: Put all nodes in rampUpSubtree_. We just want to use the
    //       subtree's functions to generates more nodes.
    //------------------------------------------------------

    while(true) {
        // NOTE: master's rank is always 0 in hubComm_.
        receiveKnowledge(AlpsKnowledgeTypeNode,
                         0,           // sender,
                         globalRank_, // receiver,
                         smallBuffer_,
                         0,
                         MPI_ANY_TAG,
                         hubComm_,
                         &status,
                         true);

        if (hubMsgLevel_ > 100) {
            std::cout << "HUB " << globalRank_ << ": received a node from master "
                      << masterRank_ << std::endl;
        }

        // Master signals the end of the node stream with AlpsMsgFinishInit.
        if (status.MPI_TAG == AlpsMsgFinishInit) {
            if (hubMsgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_HUB_RECV, messages())
                    << globalRank_ << masterRank_ << CoinMessageEol;
            }

            break;
        }
    }

    //------------------------------------------------------
    // Receive generated knowledge form the master.
    //------------------------------------------------------

    receiveKnowledge(AlpsKnowledgeTypeModelGen,
                     masterRank_, // sender
                     globalRank_, // receiver,
                     smallBuffer_,
                     0,
                     MPI_ANY_TAG,
                     hubComm_,    // hub comm
                     &status,
                     true);

    //receiveModelKnowlege(hubComm_);

    //------------------------------------------------------
    // Generate and send required number of nodes(subtree) for hub's workers.
    //------------------------------------------------------

    if (hubMsgLevel_ > 0 && requiredNumNodes > 0) {
        messageHandler()->message(ALPS_RAMPUP_HUB_START, messages())
            << globalRank_ << requiredNumNodes << CoinMessageEol;
    }

    nodeProcessedNum_ += rampUpSubTree_->rampUp(minNumNodes,
                                                requiredNumNodes,
                                                treeDepth_);

    const int numNode = rampUpSubTree_->nodePool()->getNumKnowledges();

    if (numNode == 0) {
        if (hubMsgLevel_ > 0) {
            messageHandler()->message(ALPS_RAMPUP_HUB_FAIL, messages())
                << globalRank_ << CoinMessageEol;
        }
    }
    else { // Distribute nodes
        int numSent = 0;
        while ( numSent < numNode) {
            for (i = 0; i < clusterSize_; ++i) {
                if (numSent >= numNode ) break;

                if (i == clusterRank_) {
                    if (hubWork_) {
                        // Hub need work, so Keep a node for self
                        AlpsSubTree* myTree = dynamic_cast<AlpsSubTree*>
                            (const_cast<AlpsKnowledge *> (decoderObject
                                (AlpsKnowledgeTypeSubTree)))->newSubTree();
                        myTree->setBroker(this);
                        myTree->setNodeSelection(nodeSelection_);

                        AlpsTreeNode* node = static_cast<AlpsTreeNode* >
                            (rampUpSubTree_->nodePool()->getKnowledge().first);
                        rampUpSubTree_->nodePool()->popKnowledge();

                        node->setBroker(this);
                        // todo(aykut) node desc does not hold a pointer to the model
                        // it holds pointer to the broker.
                        //node->modifyDesc()->setModel(model_);
                        // Re-root the kept node as a standalone subtree.
                        node->setParent(NULL);
                        node->setParentIndex(-1);
                        node->setNumChildren(0);
                        node->setStatus(AlpsNodeStatusCandidate);
                        myTree->nodePool()->addKnowledge(node,
                                                         node->getQuality());
                        assert(myTree->getNumNodes() == 1);
                        myTree->setRoot(node); // Don't forget!
                        myTree->calculateQuality();
                        addKnowledge(AlpsKnowledgeTypeSubTree, myTree,
                                     myTree->getQuality());
                        ++numSent;
                    }
                }
                else {
                    // Send a node to work_i
                    sendKnowledge(AlpsKnowledgeTypeNode,
                                  globalRank_,
                                  i,
                                  smallBuffer_,
                                  0,
                                  MPI_ANY_TAG,
                                  clusterComm_,
                                  true);

                    ++numSent;
                }
            } // EOF of for loop
        } // EOF of while loop
    }

    assert(rampUpSubTree_->getNumNodes() == 0);
    rampUpSubTree_->nullRootActiveNode();

    //------------------------------------------------------
    // Send finish initialization tag to workers so that they know to
    // stop receiving subtrees.
    //------------------------------------------------------

    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            sendFinishInit(i, clusterComm_);
        }
    }

    //------------------------------------------------------
    // Send generated model knowledge to its workers.
    //------------------------------------------------------

    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            sendKnowledge(AlpsKnowledgeTypeModelGen,
                          globalRank_,
                          i, // receiver
                          largeBuffer2_,
                          0,
                          AlpsMsgModelGenRampUp,
                          clusterComm_, // comm
                          true);
            // sendModelKnowledge(genBuf, AlpsMsgModelGenRampUp, clusterComm_, i);
        }
    }

    //------------------------------------------------------
    // If have better solution, broadcast its value and process id.
    //------------------------------------------------------

    if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
        double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if(incVal < incumbentValue_) {  // Assume Minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (hubMsgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_HUB_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }

    rampUpTime_ = hubTimer_.getTime();

    if (hubMsgLevel_ > 0) {
        messageHandler()->message(ALPS_RAMPUP_HUB, messages())
            << globalRank_ << rampUpTime_ << nodeProcessedNum_ << numNode
            << CoinMessageEol;
    }

    if (nodeProcessedNum_) {
        // Rough per-node estimate from the hub's ramp-up phase.
        nodeProcessingTime_ = rampUpTime_/nodeProcessedNum_;
    }

    /* Reset to normal selection. */
    rampUpSubTree_->setNodeSelection(nodeSelection_);
}
6665
6666 //#############################################################################
6667
/** Worker's part of the two-level root initialization.  Repeatedly
 *  receives subtrees from its hub into the subtree pool until the
 *  finish-init tag arrives, then receives the shared model knowledge. */
void
AlpsKnowledgeBrokerMPI::rootInitWorker()
{
    int numTrees = 0;
    MPI_Status status;
    const int workerMsgLevel =
        model_->AlpsPar()->entry(AlpsParams::workerMsgLevel);

    //------------------------------------------------------
    // Recv subtrees(nodes) sent by my hub.
    //------------------------------------------------------

#ifdef NF_DEBUG
    std::cout << "WORKER["<< globalRank_ << "]: before rampup." << std::endl;
#endif

    while(true) {

        // A fresh subtree to decode the incoming message into.
        rampUpSubTree_ = dynamic_cast<AlpsSubTree*>
            (const_cast<AlpsKnowledge *>
             (decoderObject(AlpsKnowledgeTypeSubTree)))->
            newSubTree();
        rampUpSubTree_->setBroker(this);
        rampUpSubTree_->setNodeSelection(nodeSelection_);

        // NOTE: received subtree is decoded into rampUpSubTree_
        receiveKnowledge(AlpsKnowledgeTypeNode,
                         masterRank_, // sender,
                         globalRank_, // receiver,
                         smallBuffer_,
                         0,
                         MPI_ANY_TAG,
                         clusterComm_, // cluster comm
                         &status,
                         true);

        // Hub signals the end of the node stream; discard the unused
        // subtree shell and stop receiving.
        if (status.MPI_TAG == AlpsMsgFinishInit) {
            if (workerMsgLevel > 0) {
                messageHandler()->message(ALPS_RAMPUP_WORKER_RECV, messages())
                    << globalRank_ << myHubRank_ << CoinMessageEol;
            }
            delete rampUpSubTree_;
            rampUpSubTree_ = NULL;
            break;
        }

        assert(rampUpSubTree_->getNumNodes() > 0);

        rampUpSubTree_->calculateQuality();

        // Hand the decoded subtree over to the subtree pool; ownership
        // transfers, so drop our reference.
        addKnowledge(AlpsKnowledgeTypeSubTree,
                     rampUpSubTree_,
                     rampUpSubTree_->getQuality());

        rampUpSubTree_ = NULL;

        ++numTrees;
        assert(numTrees == subTreePool_->getNumKnowledges());
    }

    //------------------------------------------------------
    // Receive generated knowledge form its hub.
    //------------------------------------------------------

    receiveKnowledge(AlpsKnowledgeTypeModelGen,
                     myHubRank_,  // sender
                     globalRank_, // receiver,
                     smallBuffer_,
                     0,
                     MPI_ANY_TAG,
                     clusterComm_,// cluster comm
                     &status,
                     true);
    // receiveModelKnowlege(clusterComm_);

}
6744
6745 //#############################################################################
6746
/** Master's part of spiral initialization.  Creates a small batch of
 *  nodes, hands one to each process (optionally asking the receiver to
 *  do a unit of work), then coordinates donation of nodes from loaded
 *  processes to unloaded ones until every process is loaded or the
 *  search finishes during ramp-up. */
void
AlpsKnowledgeBrokerMPI::spiralMaster(AlpsTreeNode *root)
{
    // Create initial number of nodes (large than 1)
    int k;
    int requiredNumNodes = 2;
    int receiver, pos, sender;
    int numSent = 0;
    int numLoaded = 0;
    int doUnitWork = 1;
    int donorNodes = 0;
    int numFinishSiginal = 1; // Count master itself
    int numCompleted = 0;

    bool stopSearch = false;
    bool inRampUp = true;
    double incVal;

    MPI_Status status;

    const int smallSize = model_->AlpsPar()->entry(AlpsParams::smallSize);

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_STATIC_BALANCE_BEG, messages())
            << "spiral initialization" << CoinMessageEol;
    }

    if (msgLevel_ > 0 && requiredNumNodes > 0) {
        messageHandler()->message(ALPS_RAMPUP_MASTER_START, messages())
            << globalRank_ << requiredNumNodes << CoinMessageEol;
    }

    rampUpSubTree_ = dynamic_cast<AlpsSubTree*>
        (const_cast<AlpsKnowledge *>(decoderObject(AlpsKnowledgeTypeSubTree)))
        ->newSubTree();
    rampUpSubTree_->setBroker(this);
    rampUpSubTree_->setNodeSelection(rampUpNodeSelection_);
    rampUpSubTree_->setNextIndex(1); // One more than root's index

    nodeProcessedNum_ += rampUpSubTree_->rampUp(1, // minimum number
                                                requiredNumNodes,
                                                treeDepth_,
                                                root);

    // Distribute created nodes to other processes
    int numGenNodes = rampUpSubTree_->nodePool()->getNumKnowledges();

    if (msgLevel_ > 200) {
        std::cout << "master: spiral: numGenNodes = " << numGenNodes<< std::endl;
    }


    if (numGenNodes == 0) {
        // Nothing to distribute: tell everyone to exit ramp-up.
        // Send initialization complete signal to other processes.
        for (k = 0; k < processNum_; ++k) {
            if (k != globalRank_) {
                if (msgLevel_ > 200) {
                    std::cout<<"master: spiral: ask others to exit"<<std::endl;
                }
                MPI_Send(smallBuffer_, 0, MPI_PACKED, k,
                         AlpsMsgFinishInit, MPI_COMM_WORLD);
            }
        }
        stopSearch = true;
        goto TERM_SPIRAL_MASTER;
    }

    numSent = 0;
    numLoaded = 1;
    doUnitWork = 1;
    // If there are already enough nodes for (almost) everyone, the
    // receivers need not process them further during ramp-up.
    if (numGenNodes > processNum_ - 2) {
        doUnitWork = 0;
    }

    while (numSent < numGenNodes) {
        for (k = 0; k < processNum_; ++k) {
            if(numSent == numGenNodes) {
                break; // Distributed all nodes
            }
            if (k != globalRank_) {
                if (msgLevel_ > 200) {
                    std::cout << "master: spiral: send a node to process "
                              << k << std::endl;
                }
                sendNodeModelGen(k, doUnitWork);
                ++numSent;
                ++numLoaded;
            }
        }
    }

    assert(rampUpSubTree_->getNumNodes() == 0);
    rampUpSubTree_->nullRootActiveNode();

    // Manage rampup
    inRampUp = true;
    donorNodes = 0;       // Count master itself
    numFinishSiginal = 1; // Count master itself
    numCompleted = 0;

    if (doUnitWork) {
        // If ask other process do unit work, then they will report back.
        while (inRampUp) {
            // Each loaded process reports how many nodes it now has.
            MPI_Recv(&donorNodes, 1, MPI_INT, MPI_ANY_SOURCE,
                     AlpsMsgRampUpLoad, MPI_COMM_WORLD, &status);
            sender = status.MPI_SOURCE;
            if ((donorNodes > 1) && (numLoaded < processNum_)) {
                // Ask the sender to donate a node to process numLoaded;
                receiver = numLoaded;
                pos = 0;
                MPI_Pack(&receiver, 1, MPI_INT, smallBuffer_, smallSize, &pos,
                         MPI_COMM_WORLD);
                MPI_Send(smallBuffer_, pos, MPI_PACKED, sender,
                         AlpsMsgRampUpDonate, MPI_COMM_WORLD);
                ++numLoaded;
            }
            else {
                // Sender cannot donate (or everyone is loaded): release it.
                MPI_Send(smallBuffer_, 0, MPI_PACKED, sender,
                         AlpsMsgFinishInit, MPI_COMM_WORLD);
                ++numFinishSiginal;
                if (donorNodes < 1) {
                    ++numCompleted;
                }
            }

            //----------------------------------------------
            // If have sent all "loaded" processes finish signal OR
            // if all "loaded" processes have no nodes, search completed.
            //----------------------------------------------
            // NOTE: numLoaded is greater than the actual number of nodes
            //       sent by 1
            //----------------------------------------------

            if (msgLevel_ > 200) {
                std::cout<<"master: spiral: numCompleted = " << numCompleted
                         << "; numLoaded = " << numLoaded << std::endl;
            }

            if (numCompleted == numLoaded - 1){
                if (msgLevel_ > 200) {
                    std::cout<<"master: spiral: no nodes left, search completed."
                             << std::endl;
                }
                inRampUp = false;
                // Send finish signal to rest processes
                for (k = numLoaded; k < processNum_; ++k) {
                    MPI_Send(smallBuffer_, 0, MPI_PACKED, k,
                             AlpsMsgFinishInit, MPI_COMM_WORLD);
                    ++numFinishSiginal;
                }
                assert(numFinishSiginal == processNum_);
            }

            if (numFinishSiginal == processNum_) {
                if (msgLevel_ > 200) {
                    std::cout<<"master: spiral: all processed have been loaded."
                             << std::endl;
                }
                inRampUp = false;
            }
        }
    }
    else {
        // Just ask them to exit from rampup
        for (k = 0; k < processNum_; ++k) {
            if (k != globalRank_) {
                MPI_Send(smallBuffer_, 0, MPI_PACKED, k,
                         AlpsMsgFinishInit, MPI_COMM_WORLD);
            }
        }
    }


 TERM_SPIRAL_MASTER:

    MPI_Barrier(MPI_COMM_WORLD);

    //------------------------------------------------------
    // If have better solution, broadcast its value and process id.
    //------------------------------------------------------

    if (!stopSearch && hasKnowledge(AlpsKnowledgeTypeSolution)) {
        incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if(incVal < incumbentValue_) {  // Assume Minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (msgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_MASTER_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_STATIC_BALANCE_END, messages())
            << "spiral initialization" << CoinMessageEol;
    }
}
6954
6955 //#############################################################################
6956
/** Hub's part of spiral initialization.  Loops on incoming messages
 *  (process a node, donate a node, or finish); afterwards either keeps
 *  its ramp-up subtree (hubWork_) or deals its nodes to its workers
 *  and tells them the hub phase is done. */
void
AlpsKnowledgeBrokerMPI::spiralHub()
{
    bool inRampUp = true;
    MPI_Status status;

    while(inRampUp) {
        MPI_Recv(largeBuffer_, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
                 MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        switch(status.MPI_TAG) {
        case AlpsMsgNode:
            // Unpack the node, explore it and send load info to master
            spiralRecvProcessNode();
            break;
        case AlpsMsgRampUpDonate:
            // Unpack msg and donate a node
            spiralDonateNode();
            break;
        case AlpsMsgFinishInit:
            inRampUp = false;
            break;
        default:
            std::cout << "PROC " << globalRank_
                      << " : spiralHub: received UNKNOWN message. tag = "
                      << status.MPI_TAG << std::endl;
            assert(0);
            throw CoinError("Unknown message type", "spiralHubWorker",
                            "AlpsKnowledgeBrokerMPI");
        }
    }

    if (hubMsgLevel_ > 10) {
        std::cout << "hub[" << globalRank_
                  << "]: spiral: exited from while." << std::endl;
    }

    if (hubWork_) {
        // Put rampup subtree in pool
        if (rampUpSubTree_) {
            rampUpSubTree_->calculateQuality();
            addKnowledge(AlpsKnowledgeTypeSubTree,
                         rampUpSubTree_,
                         rampUpSubTree_->getQuality());
            rampUpSubTree_ = NULL;
        }
    }
    else {
        // hub send its nodes to its workers.
        int i;
        int numSent = 0;
        int receiver = -1;

        if (rampUpSubTree_) { // Probably deleted by spiralRecvProcessNode()
            const int numNode = rampUpSubTree_->nodePool()->getNumKnowledges();
            // Deal the hub's nodes round-robin to the cluster's workers.
            while ( numSent < numNode) {
                for (i = 0; i < clusterSize_; ++i) {
                    if (numSent == numNode ) break;
                    if (i != clusterRank_) {
                        // Send a node to work receiver
                        receiver =
                            cluster2GlobalRank(masterRank_, myHubRank_, i);
                        sendNodeModelGen(receiver, 0); // Just recv,not work
                        ++numSent;
                    }
                }
            } // EOF of while loop
            assert(rampUpSubTree_->getNumNodes() == 0);
            rampUpSubTree_->nullRootActiveNode();
        }

        // Send hub finish tag (zero-length message; buffer unused)
        char* dummyBuf = 0;
        for (i = 0; i < clusterSize_; ++i) {
            if (i != clusterRank_) {
                receiver = cluster2GlobalRank(masterRank_, myHubRank_, i);
                MPI_Send(dummyBuf, 0, MPI_PACKED, receiver,
                         AlpsMsgFinishInitHub, MPI_COMM_WORLD);
            }
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);

    if (hubMsgLevel_ > 10) {
        std::cout << "hub: spiral: finished." << std::endl;
    }

    //------------------------------------------------------
    // If have better solution, broadcast its value and process id.
    //------------------------------------------------------

    if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
        double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if(incVal < incumbentValue_) {  // Assume Minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (msgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_HUB_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }
}
7069
7070 //#############################################################################
7071
7072 void
spiralWorker()7073 AlpsKnowledgeBrokerMPI::spiralWorker()
7074 {
7075 // Exit when receive instruction from Hub and master
7076 int exitCount = 2;
7077 int count = 0;
7078 bool inRampUp = true;
7079
7080 if (hubWork_ || (globalRank_ < clusterSize_)) {
7081 // Hub won't send it node to me since it works.
7082 // If in master's cluster, then exit now.
7083 exitCount = 1;
7084 }
7085
7086 // Note the sequence to receive AlpsMsgFinishInit or
7087 // AlpsMsgFinishInitHub is not deterministic.
7088 MPI_Status status;
7089 while(inRampUp) {
7090 MPI_Recv(largeBuffer_, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
7091 MPI_ANY_TAG, MPI_COMM_WORLD, &status);
7092 switch(status.MPI_TAG) {
7093 case AlpsMsgNode:
7094 // Unpack the node, explore it and send load info to master
7095 spiralRecvProcessNode();
7096 break;
7097 case AlpsMsgRampUpDonate:
7098 // Unpack msg and donate a node
7099 spiralDonateNode();
7100 break;
7101 case AlpsMsgFinishInit:
7102 ++count;
7103 if (count == exitCount) {
7104 inRampUp = false;
7105 }
7106 break;
7107 case AlpsMsgFinishInitHub:
7108 ++count;
7109 if (count == exitCount) {
7110 inRampUp = false;
7111 }
7112 break;
7113 default:
7114 std::cout << "PROC " << globalRank_
7115 << " : spiralWorker: received UNKNOWN message. tag = "
7116 << status.MPI_TAG << std::endl;
7117 assert(0);
7118 throw CoinError("Unknown message type", "spiralHubWorker",
7119 "AlpsKnowledgeBrokerMPI");
7120 }
7121 }
7122
7123 #if 0
7124 std::cout << "worker[" << globalRank_
7125 << "]: spiral: exited from while." << std::endl;
7126 #endif
7127
7128 // Put rampup subtree in pool
7129 if (rampUpSubTree_) {
7130 rampUpSubTree_->calculateQuality();
7131 addKnowledge(AlpsKnowledgeTypeSubTree,
7132 rampUpSubTree_,
7133 rampUpSubTree_->getQuality());
7134 rampUpSubTree_ = NULL;
7135 }
7136
7137 MPI_Barrier(MPI_COMM_WORLD);
7138
7139 //------------------------------------------------------
7140 // If have better solution, broadcast its value and process id.
7141 //------------------------------------------------------
7142
7143 if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
7144 double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
7145 if(incVal < incumbentValue_) { // Assume Minimization
7146 incumbentValue_ = incVal;
7147 incumbentID_ = globalRank_;
7148 sendKnowledge(AlpsKnowledgeTypeSolution, //useful
7149 globalRank_,
7150 0,
7151 smallBuffer_,
7152 0,
7153 MPI_ANY_TAG,
7154 MPI_COMM_WORLD,
7155 false);
7156 //sendIncumbent();
7157 if (msgLevel_ > 10) {
7158 messageHandler()->message(ALPS_RAMPUP_WORKER_SOL, messages())
7159 << globalRank_ << incVal << CoinMessageEol;
7160 }
7161 }
7162 }
7163
7164 if (msgLevel_ > 200) {
7165 std::cout << "worker[" << globalRank_<<"]: spiral: finished." << std::endl;
7166 }
7167 }
7168
7169 //#############################################################################
7170
7171 void
spiralRecvProcessNode()7172 AlpsKnowledgeBrokerMPI::spiralRecvProcessNode()
7173 {
7174 //--------------------------------------------
7175 // Unpack if doUnitWork
7176 //--------------------------------------------
7177
7178 int pos = 0;
7179 int doUnitWork = 0;
7180 MPI_Unpack(largeBuffer_, largeSize_, &pos, &doUnitWork, 1,
7181 MPI_INT, MPI_COMM_WORLD);
7182 if (msgLevel_ > 200) {
7183 std::cout << "PROCESS " << globalRank_ << " : received a node, doUnitWork "
7184 << doUnitWork << std::endl;
7185 }
7186
7187 //--------------------------------------------
7188 // Unpack a node
7189 //--------------------------------------------
7190
7191 rampUpSubTree_ = dynamic_cast<AlpsSubTree*>
7192 (const_cast<AlpsKnowledge *>(decoderObject(AlpsKnowledgeTypeSubTree)))->newSubTree();
7193 rampUpSubTree_->setBroker(this);
7194 rampUpSubTree_->setNodeSelection(nodeSelection_);
7195
7196 AlpsEncoded* encodedNode = unpackEncoded(largeBuffer_,pos,MPI_COMM_WORLD);
7197 AlpsTreeNode* node = dynamic_cast<AlpsTreeNode* >
7198 ( decoderObject(encodedNode->type())->decode(*encodedNode) );
7199
7200 node->setBroker(this);
7201 // todo(aykut) node desc does not hold a pointer to the model
7202 // it holds pointer to the broker.
7203 //node->modifyDesc()->setModel(model_);
7204 node->setParent(NULL);
7205
7206 rampUpSubTree_->nodePool()->addKnowledge(node, node->getQuality());
7207 assert(rampUpSubTree_->getNumNodes() > 0);
7208 if ( (rampUpSubTree_->nodePool()->getNumKnowledges() ) == 1) {
7209 // Make the first node as root.
7210 rampUpSubTree_->setRoot(node);
7211 }
7212
7213 if (encodedNode) {
7214 delete encodedNode;
7215 encodedNode = 0;
7216 }
7217 node = NULL;
7218
7219 //--------------------------------------------
7220 // Unpack model knowledge
7221 //--------------------------------------------
7222 int hasKnowledge = 0;
7223 MPI_Unpack(largeBuffer_, largeSize_, &pos, &hasKnowledge, 1,
7224 MPI_INT, MPI_COMM_WORLD);
7225 if (hasKnowledge) {
7226 // Upack knowledge from buffer.
7227 AlpsEncoded* encodedModelGen = unpackEncoded(largeBuffer_,
7228 pos,
7229 MPI_COMM_WORLD,
7230 largeSize_);
7231 // Upack and store knowledge from larger buffer.
7232 model_->unpackSharedKnowledge(*encodedModelGen);
7233 delete encodedModelGen;
7234 encodedModelGen = NULL;
7235 }
7236
7237 //--------------------------------------------
7238 // Do one unit work or store the subtree
7239 //--------------------------------------------
7240
7241 if (doUnitWork) {
7242 // Do one unit of work and report load info
7243 int requiredNumNodes = 2;
7244 nodeProcessedNum_ += rampUpSubTree_->rampUp(1, // minimum number
7245 requiredNumNodes,
7246 treeDepth_,
7247 node);
7248
7249 // Send load infor to master
7250 int numNodes = rampUpSubTree_->getNumNodes();
7251 if (msgLevel_ > 200) {
7252 std::cout << "PROCESS " << globalRank_
7253 << " : numNodes = " << numNodes << std::endl;
7254 }
7255
7256 const int smallSize = model_->AlpsPar()->entry(AlpsParams::smallSize);
7257
7258 pos = 0;
7259 MPI_Pack(&numNodes, 1, MPI_INT, smallBuffer_, smallSize, &pos,
7260 MPI_COMM_WORLD);
7261 MPI_Send(smallBuffer_, pos, MPI_PACKED, masterRank_,
7262 AlpsMsgRampUpLoad, MPI_COMM_WORLD);
7263 if (numNodes == 0) {
7264 delete rampUpSubTree_;
7265 rampUpSubTree_ = NULL;
7266 }
7267 }
7268 else {
7269 // Put rampup subtree in subtree pool
7270 rampUpSubTree_->calculateQuality();
7271 addKnowledge(AlpsKnowledgeTypeSubTree,
7272 rampUpSubTree_,
7273 rampUpSubTree_->getQuality());
7274 rampUpSubTree_ = NULL;
7275 }
7276 }
7277
7278 //#############################################################################
7279
7280 // TODO: With model knowledge?
7281 void
spiralDonateNode()7282 AlpsKnowledgeBrokerMPI::spiralDonateNode()
7283 {
7284 // Get receiver rank
7285 int i;
7286 int position = 0;
7287 int receiver = -1;
7288
7289 int numNodes = rampUpSubTree_->getNumNodes();
7290 assert(numNodes > 0);
7291
7292 MPI_Unpack(largeBuffer_, largeSize_, &position, &receiver, 1,
7293 MPI_INT, MPI_COMM_WORLD);
7294
7295 if (msgLevel_ > 200) {
7296 std::cout << "worker " << globalRank_ << ": spiral: has " << numNodes
7297 << " nodes; send a node to process "
7298 << receiver << std::endl;
7299 }
7300 // Get a node for receiver
7301 // Send load infor to master
7302 AlpsTreeNode* node = dynamic_cast<AlpsTreeNode* >
7303 (rampUpSubTree_->nodePool()->getKnowledge().first);
7304 AlpsEncoded* enc = node->encode();
7305
7306 rampUpSubTree_->nodePool()->popKnowledge();
7307
7308 // Update parent linkage
7309 AlpsTreeNode *parent = node->getParent();
7310 int numChildren = -1;
7311 if (parent) {
7312 numChildren = parent->getNumChildren();
7313 for (i = 0; i < numChildren; ++i) {
7314 if (parent->getChild(i) == node) break;
7315 }
7316 parent->setChild(i, parent->getChild(numChildren - 1));
7317 parent->setChild(numChildren - 1, NULL);
7318 parent->modifyNumChildren(-1); // A child have gone
7319 }
7320
7321 // Update children's linkage
7322 numChildren = node->getNumChildren();
7323 AlpsTreeNode *child = NULL;
7324 for (i = 0; i < numChildren; ++i) {
7325 child = node->getChild(i);
7326 child->setParent(NULL);
7327 child->setParentIndex(-1);
7328 }
7329
7330 // Delete the node since it been sent to other process
7331 delete node;
7332 node = NULL;
7333
7334 // Send the encoded node
7335 position = 0;
7336 int doUnitWork = 1;
7337 int hasKnowledge = 0;
7338
7339 // Pack if doUnitWork
7340 MPI_Pack(&doUnitWork, 1, MPI_INT, largeBuffer_, largeSize_, &position,
7341 MPI_COMM_WORLD);
7342 packEncoded(enc, largeBuffer_, largeSize_, position, MPI_COMM_WORLD);
7343 MPI_Send(largeBuffer_, position, MPI_PACKED, receiver, AlpsMsgNode,
7344 MPI_COMM_WORLD);
7345 MPI_Pack(&hasKnowledge, 1, MPI_INT, largeBuffer_, largeSize_, &position,
7346 MPI_COMM_WORLD);
7347 if (enc) {
7348 delete enc;
7349 enc = 0; // Allocated in encode()
7350 }
7351
7352 // Send load infor to master
7353 int numNodesAfter = rampUpSubTree_->getNumNodes();
7354 if (msgLevel_ > 200) {
7355 std::cout << "worker " << globalRank_ << ": spiral: has "
7356 << numNodesAfter << " nodes after donating" << std::endl;
7357 }
7358
7359 // Do one unit of work
7360 int requiredNumNodes = 2;
7361 nodeProcessedNum_ += rampUpSubTree_->rampUp(1, // minimum number
7362 requiredNumNodes,
7363 treeDepth_,
7364 NULL);
7365
7366 // Send load infor to master
7367 numNodesAfter = rampUpSubTree_->getNumNodes();
7368 if (msgLevel_ > 200) {
7369 std::cout << "worker " << globalRank_ << ": spiral: has "
7370 << numNodesAfter << " nodes after do unit work" << std::endl;
7371 }
7372 const int smallSize = model_->AlpsPar()->entry(AlpsParams::smallSize);
7373 int pos = 0;
7374 MPI_Pack(&numNodesAfter, 1, MPI_INT, smallBuffer_, smallSize, &pos,
7375 MPI_COMM_WORLD);
7376 MPI_Send(smallBuffer_, pos, MPI_PACKED, masterRank_, AlpsMsgRampUpLoad,
7377 MPI_COMM_WORLD);
7378
7379 if (numNodesAfter == 0) {
7380 delete rampUpSubTree_;
7381 rampUpSubTree_ = NULL;
7382 }
7383 }
7384
7385 //#############################################################################
7386