/*===========================================================================*
 * This file is part of the Abstract Library for Parallel Search (ALPS).     *
 *                                                                           *
 * ALPS is distributed under the Eclipse Public License as part of the       *
 * COIN-OR repository (http://www.coin-or.org).                              *
 *                                                                           *
 * Authors:                                                                  *
 *                                                                           *
 *          Yan Xu, Lehigh University                                        *
 *          Aykut Bulut, Lehigh University                                   *
 *          Ted Ralphs, Lehigh University                                    *
 *                                                                           *
 * Conceptual Design:                                                        *
 *                                                                           *
 *          Yan Xu, Lehigh University                                        *
 *          Ted Ralphs, Lehigh University                                    *
 *          Laszlo Ladanyi, IBM T.J. Watson Research Center                  *
 *          Matthew Saltzman, Clemson University                             *
 *                                                                           *
 *                                                                           *
 * Copyright (C) 2001-2019, Lehigh University, Yan Xu, Aykut Bulut, and      *
 *                          Ted Ralphs.                                      *
 * All Rights Reserved.                                                      *
 *===========================================================================*/


#include <algorithm>
#include <cassert>
#include <cstring>
#include <exception>

#include "CoinError.hpp"
#include "CoinHelperFunctions.hpp"
#include "CoinTime.hpp"

#include "AlpsHelperFunctions.h"
#include "AlpsKnowledgeBrokerMPI.h"
#include "AlpsMessageTag.h"
#include "AlpsModel.h"
#include "AlpsNodePool.h"
#include "AlpsTreeNode.h"

using std::exception;
//#############################################################################

static int cluster2GlobalRank(int masterRank, int myHubRank, int clusterRank)
{
    return (myHubRank - masterRank + clusterRank);
}
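
// For example, with masterRank = 0 and a hub whose global rank is 4, the
// cluster member with cluster rank 2 has global rank 4 - 0 + 2 = 6.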

//#############################################################################

// Form binary tree to broadcast solutions.
static int rankToSequence(const int incRank, const int rank)
{
    if (rank < incRank) return rank + 1;
    else if (rank == incRank) return 0;
    else return rank;
}

static int sequenceToRank(const int incRank, const int sequence)
{
    if (sequence == 0) return incRank;
    else if (sequence <= incRank) return sequence - 1;
    else return sequence;
}
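
// For example, with incRank = 2 and four processes the map is
//     rank:     0  1  2  3
//     sequence: 1  2  0  3
// so the process holding the incumbent becomes the root (sequence 0) of
// the broadcast tree; sequenceToRank() is the inverse map.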

#if 0 // not used
static int parentSequence(const int sequence, const int numProcesses)
{
    if (sequence <= 0) return -1;
    else return (sequence - 1) / 2;
}
#endif

static int leftSequence(const int sequence, const int numProcesses)
{
    int seq =  2 * sequence + 1;
    if ( seq >= numProcesses ) return -1;
    else return seq;
}

static int rightSequence(const int sequence, const int numProcesses)
{
    int seq =  2 * sequence + 2;
    if ( seq >= numProcesses ) return -1;
    else return seq;
}
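
// The sequences index an implicit binary tree laid out heap-style: the
// children of sequence s are 2*s + 1 and 2*s + 2, with -1 returned when a
// child index falls outside the number of processes.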

//#############################################################################

static int computeUnitNodes(int oldUnitNodes,
                            double nodeProcessingTime)
{
    int unitNodes = 1;

    if (nodeProcessingTime < 1.0e-14) {
        nodeProcessingTime = 1.0e-5;
    }

    if (nodeProcessingTime > 30.0) {
        unitNodes = 1;
    }
    else if (nodeProcessingTime > 5.0) {
        unitNodes = 1;
    }
    else if (nodeProcessingTime > 1.0) {
        unitNodes = 2;
    }
    else if (nodeProcessingTime > 0.1) {
        unitNodes = 3;
    }
    else if (nodeProcessingTime > 0.05) {
        unitNodes = 4;
    }
    else if (nodeProcessingTime > 0.01) {
        unitNodes = 5;
    }
    else if (nodeProcessingTime > 0.005) {
        unitNodes = 7;
    }
    else if (nodeProcessingTime > 0.001) {
        unitNodes = 30;
    }
    else if (nodeProcessingTime > 0.0005) {
        unitNodes = 50;
    }
    else if (nodeProcessingTime > 0.0001) {
        unitNodes = 200;
    }
    else if (nodeProcessingTime > 0.00005) {
        unitNodes = 400;
    }
    else {
        unitNodes = 500;
    }

    if (oldUnitNodes > 0) {
        unitNodes = (int)(0.5 * (unitNodes + oldUnitNodes));
    }

#if 0
    std::cout << "$$$ Old unitNodes = " << oldUnitNodes
              << ", new unitNodes = " << unitNodes
              << ", nodeProcessingTime = " << nodeProcessingTime
              << std::endl;
#endif

    return unitNodes;
}
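
// The faster nodes are processed, the more nodes make up one unit of work;
// averaging with the previous estimate smooths the adjustment, e.g. moving
// from oldUnitNodes = 30 to a raw estimate of 200 gives (30 + 200) / 2 = 115.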

//#############################################################################

static double computeBalancePeriod(bool hasUserInput,
                                   double oldBalancePeriod,
                                   double nodeProcessingTime)
{
    if (hasUserInput) return oldBalancePeriod;

    // todo(aykut): added to fix the case when nodeProcessingTime is inf.
    if (nodeProcessingTime > 1e80) {
        return 0.3;
    }

    double newPeriod = 0.1;

    if (nodeProcessingTime < 1.0e-14) {
        nodeProcessingTime = 1.0e-5;
    }

    if (nodeProcessingTime > 30.0) {
        newPeriod = nodeProcessingTime;
    }
    else if (nodeProcessingTime > 5.0) {
        newPeriod = nodeProcessingTime;
    }
    else if (nodeProcessingTime > 1.0) {
        newPeriod = nodeProcessingTime;
    }
    else if (nodeProcessingTime > 0.5) {
        newPeriod = nodeProcessingTime;
    }
    else if (nodeProcessingTime > 0.1) {
        newPeriod = nodeProcessingTime;
    }
    else if (nodeProcessingTime > 0.05) {
        newPeriod = nodeProcessingTime * 2;
    }
    else if (nodeProcessingTime > 0.01) {
        newPeriod = nodeProcessingTime * 3;
    }
    else if (nodeProcessingTime > 0.005) {
        newPeriod = nodeProcessingTime * 10;
    }
    else if (nodeProcessingTime > 0.001) {
        newPeriod = nodeProcessingTime * 40;
    }
    else if (nodeProcessingTime > 0.0005) {
        newPeriod = nodeProcessingTime * 100;
    }
    else if (nodeProcessingTime > 0.0001) {
        newPeriod = nodeProcessingTime * 200;
    }
    else if (nodeProcessingTime > 0.00005) {
        newPeriod = nodeProcessingTime * 600;
    }
    else if (nodeProcessingTime > 0.00001) {
        newPeriod = nodeProcessingTime * 1500;
    }
    else {
        newPeriod = 0.03;
    }

    if (oldBalancePeriod > 0.0) {
        newPeriod = 0.5 * (newPeriod + oldBalancePeriod);
    }

#if 0
    std::cout << "$$$ newPeriod = " << newPeriod
              << ", oldBalancePeriod = " << oldBalancePeriod
              << ", nodeProcessingTime = " << nodeProcessingTime
              << std::endl;
#endif
    return newPeriod;
}
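
// The period scales inversely with node processing speed: slow nodes get a
// period of one node's time, fast nodes get a multiple of it. As in
// computeUnitNodes(), averaging with the old period damps abrupt changes,
// e.g. an old period of 0.1 and a raw estimate of 0.3 give 0.2 seconds.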

//#############################################################################

void
AlpsKnowledgeBrokerMPI::masterMain(AlpsTreeNode* root)
{
    int i;
    int preSysSendCount = 0;
    int masterCheckCount = 0;
    int hubCheckCount = 0;
    int reportInterval = 0;
    int reportIntervalCount = 0;

    double elapsedTime  = 0.0;

    char reply = 'C';

    bool allWorkerReported = false;
    bool terminate = false;

    //------------------------------------------------------
    // Get parameters.
    //------------------------------------------------------
    const int printSystemStatus =
        model_->AlpsPar()->entry(AlpsParams::printSystemStatus);

    const int staticBalanceScheme =
        model_->AlpsPar()->entry(AlpsParams::staticBalanceScheme);

    //const bool clockType =
    //model_->AlpsPar()->entry(AlpsParams::clockType);
    const bool interCB =
        model_->AlpsPar()->entry(AlpsParams::interClusterBalance);
    const bool intraCB =
        model_->AlpsPar()->entry(AlpsParams::intraClusterBalance);

    const int smallSize =
        model_->AlpsPar()->entry(AlpsParams::smallSize);

    const int nodeLimit =
        model_->AlpsPar()->entry(AlpsParams::nodeLimit);
    const int solLimit = model_->AlpsPar()->entry(AlpsParams::solLimit);

    const double zeroLoad =
        model_->AlpsPar()->entry(AlpsParams::zeroLoad);

    masterBalancePeriod_ =
        model_->AlpsPar()->entry(AlpsParams::masterBalancePeriod);
    if (masterBalancePeriod_ > 0.0) {
        userBalancePeriod_ = true;
    }
    else {
        masterBalancePeriod_ = -masterBalancePeriod_;
    }

    largeSize_ = model_->AlpsPar()->entry(AlpsParams::largeSize);

    //------------------------------------------------------
    // Initialization and setup.
    //------------------------------------------------------

    masterTimer_.setClockType(AlpsClockTypeWallClock);

    hubNodeProcesseds_ = new int [hubNum_];
    hubWorkQualities_ = new double [hubNum_];
    hubWorkQuantities_ = new double [hubNum_];
    hubReported_ = new bool [hubNum_];
    workerNodeProcesseds_ = new int [clusterSize_];
    workerWorkQualities_ = new double [clusterSize_];
    workerWorkQuantities_ = new double [clusterSize_];
    workerReported_ = new bool [clusterSize_];

    for (i = 0; i < hubNum_; ++i) {
        hubNodeProcesseds_[i] = 0;
        hubWorkQualities_[i] = 0.0;
        hubWorkQuantities_[i] = 0.0;
        hubReported_[i] = false;
    }
    for (i = 0; i < clusterSize_; ++i) {
        workerNodeProcesseds_[i] = 0;
        workerWorkQualities_[i] = ALPS_OBJ_MAX;
        workerWorkQuantities_[i] = 0.0;
        workerReported_[i] = false;
    }

    workerNodeProcesseds_[0] = nodeProcessedNum_;
    workerWorkQualities_[0] = workQuality_;
    workerWorkQuantities_[0] = workQuantity_;

    root->setBroker(this);
    root->setQuality(ALPS_OBJ_MAX);
    root->setDepth(0);
    root->setIndex(0);
    root->setExplicit(1); // Always true for root.

    //------------------------------------------------------
    // Estimate a tree node size and send it to other hubs.
    //------------------------------------------------------

    AlpsEncoded* encSize = root->encode();
    setNodeMemSize(static_cast<int>(encSize->size() * 4));
    delete encSize;

    // Adjust largeSize to avoid extreme cases.
    largeSize_ = CoinMax(largeSize_, nodeMemSize_ * 3);
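
    // The encoded root serves as a rough proxy for node memory size; the
    // factor of 4 above and the factor of 3 here appear to be safety
    // margins, since nodes deeper in the tree may encode larger than root.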

    // Allocate msg buffers
    largeBuffer_ = new char [largeSize_];
    largeBuffer2_ = new char [largeSize_];
    smallBuffer_ = new char [smallSize];

    for (i = 0; i < hubNum_; ++i) {
        if (hubRanks_[i] != globalRank_) {
            MPI_Send(&nodeMemSize_, 1, MPI_INT, i, AlpsMsgNodeSize, hubComm_);
        }
    }

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_NODE_MEM_SIZE, messages())
            << nodeMemSize_ << CoinMessageEol;
    }

    //------------------------------------------------------
    // Send estimated node size to master's workers.
    //------------------------------------------------------

    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            MPI_Send(&nodeMemSize_, 1, MPI_INT, i, AlpsMsgNodeSize,
                     clusterComm_);
        }
    }

    MPI_Barrier(MPI_COMM_WORLD); // Sync before rampup

    //======================================================
    // Master's Ramp-up.
    //======================================================

    // Start to measure rampup
    masterTimer_.start();

    setPhase(AlpsPhaseRampup);
    switch (staticBalanceScheme) {
    case AlpsRootInit:
        rootInitMaster(root);
        break;
    case AlpsSpiral:
        spiralMaster(root);
        break;
    default:
        throw CoinError("Unknown static balance scheme", "masterMain",
                        "AlpsKnowledgeBrokerMPI");
    }

    rampUpTime_ = masterTimer_.getTime();

    //======================================================
    // Search for solutions.
    //======================================================

    setPhase(AlpsPhaseSearch);

    //------------------------------------------------------
    // MASTER SCHEDULER:
    // a. Listen and process messages periodically.
    // b. Do the termination check if the conditions are satisfied;
    //    otherwise, balance the workload of the hubs.
    //------------------------------------------------------

    masterBalancePeriod_ = computeBalancePeriod(userBalancePeriod_,
                                                masterBalancePeriod_,
                                                nodeProcessingTime_);

    if (msgLevel_ > 0 && interCB) {
        messageHandler()->message(ALPS_LOADBAL_MASTER_PERIOD, messages())
            << globalRank_ << masterBalancePeriod_ << CoinMessageEol;
    }

    MPI_Request request;
    MPI_Status status;
    int flag;

    MPI_Irecv(largeBuffer_, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
              MPI_ANY_TAG, MPI_COMM_WORLD, &request);
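
    // A single nonblocking receive stays posted; the scheduler loop below
    // polls it with MPI_Test, and processMessages() is expected to re-post
    // the receive after each incoming message is handled.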

    while (true) {

        //**------------------------------------------------
        // Listen and process msg for a period.
        //**------------------------------------------------

        masterTimer_.start();
        elapsedTime = 0.0;

        //masterBalancePeriod_ = 0.50;  // Test frequency
        //std::cout << ", masterBalancePeriod_ = " << masterBalancePeriod_
        //        << std::endl;

        while (elapsedTime < masterBalancePeriod_) {
            MPI_Test(&request, &flag, &status);
            if (flag) { // Received a msg
                processMessages(largeBuffer_, status, request);
            }
            elapsedTime = masterTimer_.getTime();
            // std::cout << "***** elapsedTime = " << elapsedTime
            //       << ", masterBalancePeriod_ = " << masterBalancePeriod_
            //       << std::endl;
        }

        //**------------------------------------------------
        // Check if all workers in this cluster have reported their status.
        //**------------------------------------------------

        if (!allWorkerReported) {
            workerReported_[masterRank_] = true;
            allWorkerReported = true;
            for (i = 0; i < clusterSize_; ++i) {
                if (workerReported_[i] == false) {
                    allWorkerReported = false;
                    break;
                }
            }
        }

        //**------------------------------------------------
        // Check whether all hubs have reported their status.
        //**------------------------------------------------

        if (!allHubReported_) {
            allHubReported_ = true;
            // NOTE: The position of this hub is 0.
            hubReported_[0] = true;
            for (i = 0; i < hubNum_; ++i) {
                if ( hubReported_[i] != true ) {
                    allHubReported_ = false;
                    break;
                }
            }
        }

        //**------------------------------------------------
        // The master adds the status of its cluster to the system status.
        //**------------------------------------------------

        refreshSysStatus();

        //**------------------------------------------------
        // Check if force terminate.
        // FIXME: clean up msg and memory before leaving.
        //**------------------------------------------------

        // NOTE: Use wall clock time for parallel.
        if (!forceTerminate_) {
            if (allWorkerReported && allHubReported_ &&
                timer_.reachWallLimit()) {

                forceTerminate_ = true;
                setExitStatus(AlpsExitStatusTimeLimit);
                masterForceHubTerm();
                hubForceWorkerTerm();
                if (msgLevel_ > 0) {
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }

                    messageHandler()->message(ALPS_TERM_FORCE_TIME, messages())
                        << timer_.limit_ << CoinMessageEol;
                }
                systemWorkQuantityForce_ = systemWorkQuantity_;
            }
            else if ( allWorkerReported && allHubReported_ &&
                      ((systemNodeProcessed_ >= nodeLimit) ||
                       (getNumKnowledges(AlpsKnowledgeTypeSolution)+solNum_ >=
                        solLimit)) ) {
                forceTerminate_ = true;
                if (systemNodeProcessed_ >= nodeLimit) {
                    setExitStatus(AlpsExitStatusNodeLimit);
                    messageHandler()->message(ALPS_TERM_FORCE_NODE, messages())
                        << nodeLimit << CoinMessageEol;
                }
                else {
                    //std::cout << "num solution = " << getNumKnowledges(AlpsKnowledgeTypeSolution) << std::endl;
                    setExitStatus(AlpsExitStatusSolLimit);
                    messageHandler()->message(ALPS_TERM_FORCE_SOL, messages())
                        << solLimit << CoinMessageEol;
                }

                masterForceHubTerm();
                hubForceWorkerTerm();
                if (msgLevel_ > 0) {
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                }
                systemWorkQuantityForce_ = systemWorkQuantity_;
            }
        }

        //**------------------------------------------------
        // Print workload information to screen.
        //**------------------------------------------------

        reportInterval =
            model_->AlpsPar()->entry(AlpsParams::masterReportInterval);

        assert(reportInterval > 0);

        if (reportIntervalCount % reportInterval == 0) {
            if (msgLevel_ > 0) {
                if (forceTerminate_) {
                    //std::cout << "******systemWorkQuantityForce_ = " << systemWorkQuantityForce_<< std::endl;
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                }
                else if (printSystemStatus) {
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount << masterBalancePeriod_
                            << nodeProcessingTime_ << unitWorkNodes_
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount << masterBalancePeriod_
                            << nodeProcessingTime_ << unitWorkNodes_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                }
            }

#if 0
            std::cout << "Master: ";
            for (i = 0; i < hubNum_; ++i) {
                std::cout << "hub[" << i << "]=" <<
                    hubWorkQuantities_[i] << ", ";
            }
            for (i = 0; i < clusterSize_; ++i) {
                std::cout << "w[" << i << "]=" <<
                    workerWorkQuantities_[i] << ", ";
            }
            std::cout << std::endl;
#endif
        }
        ++reportIntervalCount;

        //**------------------------------------------------
        // Check whether the termination check can be activated.
        //**------------------------------------------------

        if ( allWorkerReported &&
             allHubReported_ &&
             (systemWorkQuantity_ < zeroLoad ) &&
             (systemSendCount_ == systemRecvCount_) ) {

            preSysSendCount = systemSendCount_;
            blockTermCheck_ = false;   // Activate termination check
        }
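
        // Snapshot systemSendCount_ here so that the termination check
        // below can detect in-flight messages: if any counted message is
        // sent after this point, preSysSendCount no longer matches
        // systemSendCount_ and the search continues.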

        // Print the node log if there is one and we are not forced to
        // terminate (which will print the system status).
        if (!forceTerminate_) {
            getModel()->nodeLog(NULL, false);
        }

#if 0
        std::cout << "blockTermCheck_=" << blockTermCheck_
                  << ", allWorkerReported=" << allWorkerReported
                  << ", allHubReported_=" << allHubReported_
                  << ", systemWorkQuantity_=" << systemWorkQuantity_
                  << ", preSysSendCount=" << preSysSendCount
                  << ", systemSendCount_=" << systemSendCount_
                  << ", systemRecvCount_=" << systemRecvCount_
                  << std::endl;
#endif

        //**------------------------------------------------
        // Do termination checking if activated.
        // NOTE: Msgs exchanged during termination checking are not counted.
        //**------------------------------------------------

        if ( ! blockTermCheck_ ) {
            if (msgLevel_ > 0) {
                messageHandler()->message(ALPS_TERM_MASTER_START, messages())
                    << globalRank_ << CoinMessageEol;
            }
            // Ask other hubs to do termination check.
            for (i = 0; i < hubNum_; ++i) {
                if (hubRanks_[i] != globalRank_) {
                    MPI_Send(smallBuffer_, 0, MPI_PACKED, hubRanks_[i],
                             AlpsMsgAskHubPause, MPI_COMM_WORLD);
#ifdef NF_DEBUG
                    std::cout << "Master["<< masterRank_ << "] ask hub["
                              << hubRanks_[i] << "] to do termination check."
                              << std::endl;
#endif
                }
            }

            // Ask master's workers to do termination check.
            for (i = 0; i < clusterSize_; ++i) {
                //if (i != clusterRank_) {
                if (i != globalRank_) {
#ifdef NF_DEBUG
                    std::cout << "Master["<< masterRank_ << "] ask its worker["
                              << i << "] to do termination check."<< std::endl;
#endif
                    MPI_Send(smallBuffer_, 0, MPI_PACKED, i,
                             AlpsMsgAskPause, MPI_COMM_WORLD);
                }
            }


            MPI_Request termReq;
            MPI_Status termSta;

            // Update the status of the cluster to which the master belongs
            for (i = 1; i < clusterSize_; ++i) {
                MPI_Irecv(smallBuffer_, smallSize, MPI_PACKED, MPI_ANY_SOURCE,
                          AlpsMsgWorkerTermStatus, clusterComm_, &termReq);
                MPI_Wait(&termReq, &termSta);
                hubUpdateCluStatus(smallBuffer_, &termSta, clusterComm_);
            }

#ifdef NF_DEBUG
            std::cout << "Master: TERM: finished updating its cluster."
                      << std::endl;
#endif
            // clusterWorkQuality_ += workQuality_;
            clusterWorkQuantity_ += workQuantity_; // workQuantity_ = 0
            clusterSendCount_ += sendCount_;
            clusterRecvCount_ += recvCount_;
            sendCount_ = recvCount_ = 0;

            // Update system status
            for (i = 1; i < hubNum_; ++i) {
                MPI_Irecv(smallBuffer_, smallSize, MPI_PACKED, MPI_ANY_SOURCE,
                          AlpsMsgHubTermStatus, hubComm_, &termReq);
                MPI_Wait(&termReq, &termSta);
                masterUpdateSysStatus(smallBuffer_, &termSta, hubComm_);
            }

#ifdef NF_DEBUG
            std::cout << "Master: TERM: finished updating hubs." <<std::endl;
#endif
            systemWorkQuantity_ += clusterWorkQuantity_;
            systemSendCount_ += clusterSendCount_;
            systemRecvCount_ += clusterRecvCount_;
            clusterSendCount_ = clusterRecvCount_ = 0;

#ifdef NF_DEBUG
            std::cout << "Master: TERM: Quantity_ = " << systemWorkQuantity_
                      << ", systemSendCount_ = " << systemSendCount_
                      << ", systemRecvCount_ = " << systemRecvCount_
                      << "; preSysSendCount = " << preSysSendCount
                      << std::endl;
#endif

            if ( (systemWorkQuantity_ < zeroLoad) &&
                 (preSysSendCount == systemSendCount_) ) {
                terminate = true;
            }
            else {
                terminate = false;
                // Don't forget to deduct! refreshSysStatus() will add
                // clusterWorkQuantity_ again.
                systemWorkQuantity_ -= clusterWorkQuantity_;
            }

#ifdef NF_DEBUG
            std::cout << "Master: TERM: terminate=" << terminate <<std::endl;
#endif
            // Truly idle; tell others to terminate.
            if (terminate) {
                if (msgLevel_ > 0) {
                    messageHandler()->message(ALPS_TERM_MASTER_INFORM, messages())
                        << globalRank_ << "stop searching" << CoinMessageEol;
                }

                // Send instruction to my hubs (as the master)
                for (i = 0; i < hubNum_; ++i) {
                    if (hubRanks_[i] != globalRank_) {
                        // i is not in the master's cluster.
                        reply = 'T';
                        MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
                                 hubComm_);
                    }
                }

                // Send instruction to my workers (as a hub)
                for (i = 0; i < clusterSize_; ++i) {
                    if (i != globalRank_) {
                        reply = 'T';
#ifdef NF_DEBUG
                        std::cout << "Master["<< masterRank_
                                  << "] ask its worker["
                                  << i << "] to terminate."
                                  << " clusterRank_=" << clusterRank_
                                  << std::endl;
#endif
                        MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
                                 clusterComm_);
                    }
                }
            }
            else {  // Not truly idle yet
                if (msgLevel_ > 0) {
                    messageHandler()->message(ALPS_TERM_MASTER_INFORM, messages())
                        << globalRank_ << "continue searching" << CoinMessageEol;
                }

                blockTermCheck_ = true;

                // Send instruction to the hubs (as the master)
                for (i = 0; i < hubNum_; ++i) {
                    if (hubRanks_[i] != globalRank_) { // i is not the master
                        reply = 'C';
                        MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
                                 hubComm_);
                    }
                }
                // Send instruction to the workers (as a hub)
                for (i = 1; i < clusterSize_; ++i) {
                    reply = 'C';
                    MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
                             clusterComm_);
                }
            }

            if (msgLevel_ > 0) {
                if (forceTerminate_) {
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_F_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemWorkQuantityForce_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                }
                else if (printSystemStatus) {
                    if (incumbentValue_ < ALPS_INFINITY) {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount << masterBalancePeriod_
                            << nodeProcessingTime_ << unitWorkNodes_
                            << incumbentValue_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                    else {
                        messageHandler()->message(ALPS_LOADREPORT_MASTER_N, messages())
                            << systemNodeProcessed_
                            << systemWorkQuantity_
                            << systemSendCount_ << systemRecvCount_
                            << masterCheckCount << masterBalancePeriod_
                            << nodeProcessingTime_ << unitWorkNodes_
                            << timer_.getWallClock()
                            << CoinMessageEol;
                    }
                }
            }

            if (terminate) {
                // Break after printing msg
                break;  // Break *, Master terminates
            }
        }

        //**------------------------------------------------
        // The master balances the workload of the hubs if
        // (1) not terminating,
        // (2) not force terminating,
        // (3) all hubs have reported, and
        // (4) the previous balance has been completed.
        //**------------------------------------------------

        if (msgLevel_ > 10) {
            std::cout << "masterDoBalance_ = " << masterDoBalance_
                      << ", allHubReported_ = " << allHubReported_
                      << std::endl;
        }

        if ( !terminate && !forceTerminate_ &&
             allHubReported_ && (masterDoBalance_ == 0) ) {
            if (hubNum_ > 1 && interCB) {
                masterBalanceHubs();
                ++masterCheckCount;
                if (masterCheckCount % 10 == 0) {
                    if (msgLevel_ > 10) {
                        messageHandler()->message(ALPS_LOADBAL_MASTER, messages())
                            << globalRank_ << masterCheckCount << CoinMessageEol;
                    }
                }
            }
        }

        if (msgLevel_ > 10) {
            std::cout << "++++ hubDoBalance_ = " << hubDoBalance_
                      << ", allWorkerReported = " << allWorkerReported
                      << std::endl;
        }

        if ( !terminate && !forceTerminate_ &&
             allWorkerReported && hubDoBalance_ == 0 ) {
            if (clusterSize_ > 2 && intraCB) {
                hubBalanceWorkers();
                ++hubCheckCount;
                if (hubCheckCount % 10 == 0) {
                    if (msgLevel_ > 10) {
                        messageHandler()->message(ALPS_LOADBAL_HUB, messages())
                            << globalRank_ << hubCheckCount << CoinMessageEol;
                    }
                }
            }
        }
    }

    //------------------------------------------------------
    // Clean up before leaving.
    //------------------------------------------------------

    int cancelNum = 0;

    MPI_Cancel(&request);
    MPI_Wait(&request, &status);

    MPI_Test_cancelled(&status, &flag);
    if (flag) {  // Cancel succeeded
        ++cancelNum;
    }
}

//#############################################################################

void
AlpsKnowledgeBrokerMPI::hubMain()
{
    int i;
    int hubCheckCount = 0;
    int errorCode = 0;

    char reply = 'C';

    double elapsedTime = 0.0;

    bool allWorkerReported = false;   // Workers report load at least once
    bool terminate = false;
    bool deletedSTs = false;
    bool comUnitWork = false;

    int numComUnitWork = 0;

    AlpsReturnStatus rCode = AlpsReturnStatusOk;
    AlpsExitStatus exitStatus = AlpsExitStatusInfeasible;
    MPI_Status status;

    //------------------------------------------------------
    // Get parameters.
    //------------------------------------------------------

    const int staticBalanceScheme =
        model_->AlpsPar()->entry(AlpsParams::staticBalanceScheme);
    const int smallSize =
        model_->AlpsPar()->entry(AlpsParams::smallSize);
    const bool intraCB =
        model_->AlpsPar()->entry(AlpsParams::intraClusterBalance);
    const double zeroLoad =
        model_->AlpsPar()->entry(AlpsParams::zeroLoad);
    double unitTime =
        model_->AlpsPar()->entry(AlpsParams::unitWorkTime);
    if (unitTime <= 0.0) {
        // Not set by user
        unitTime = ALPS_DBL_MAX;
    }

    double changeWorkThreshold = model_->AlpsPar()->
        entry(AlpsParams::changeWorkThreshold);

    hubReportPeriod_ = model_->AlpsPar()->entry(AlpsParams::hubReportPeriod);
    if (hubReportPeriod_ > 0.0) {
        userBalancePeriod_ = true;
    }
    else {
        hubReportPeriod_ = -hubReportPeriod_;
    }

    largeSize_ = model_->AlpsPar()->entry(AlpsParams::largeSize);

    unitWorkNodes_ = model_->AlpsPar()->entry(AlpsParams::unitWorkNodes);
    if (unitWorkNodes_ <= 0) {
        comUnitWork = true;
    }

    //------------------------------------------------------
    // Initialization and setup.
    //------------------------------------------------------

    hubTimer_.setClockType(AlpsClockTypeWallClock);

    workerNodeProcesseds_ = new int [clusterSize_];
    workerWorkQualities_ = new double [clusterSize_];
    workerWorkQuantities_ = new double [clusterSize_];
    workerReported_ = new bool [clusterSize_];

    for (i = 0; i < clusterSize_; ++i) {
        workerNodeProcesseds_[i] = 0;
        workerWorkQualities_[i] = ALPS_OBJ_MAX;
        workerWorkQuantities_[i] = 0.0;
        workerReported_[i] = false;  // Avoid reading uninitialized flags below
    }

    workerNodeProcesseds_[0] = nodeProcessedNum_;
    workerWorkQualities_[0] = workQuality_;
    workerWorkQuantities_[0] = workQuantity_ = 0.0;

    //------------------------------------------------------
    // Recv tree node size and send it to my workers.
    // NOTE: The master's rank is always 0 in hubComm_.
    //------------------------------------------------------

    MPI_Recv(&nodeMemSize_, 1, MPI_INT, 0, AlpsMsgNodeSize, hubComm_, &status);
    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            MPI_Send(&nodeMemSize_, 1, MPI_INT, i, AlpsMsgNodeSize,
                     clusterComm_);
        }
    }

    // Adjust largeSize to avoid extreme cases.
    largeSize_ = CoinMax(largeSize_, nodeMemSize_ * 3);

    largeBuffer_ = new char [largeSize_];
    largeBuffer2_ = new char [largeSize_];
    smallBuffer_ = new char [smallSize];

    MPI_Barrier(MPI_COMM_WORLD); // Sync before rampup

    //======================================================
    // Hub's Ramp-up.
    //======================================================

    // Start to measure rampup
    hubTimer_.start();

    setPhase(AlpsPhaseRampup);
    switch (staticBalanceScheme) {
    case AlpsRootInit:
        rootInitHub();
        break;
    case AlpsSpiral:
        spiralHub();
        break;
    default:
        throw CoinError("Unknown static balance scheme", "hubMain",
                        "AlpsKnowledgeBrokerMPI");
    }

    rampUpTime_ = hubTimer_.getTime();

    //======================================================
    // Hub's search
    //======================================================

    setPhase(AlpsPhaseSearch);

    //------------------------------------------------------
    // HUB SCHEDULER:
    // (1) Listen and process messages periodically.
    // (2) If required, do one unit of work.
    // (3) Send work quality, quantity, and msg counts to Master.
    // (4) Balance the workload quality of my workers.
    // (5) Do the termination check if required.
    //------------------------------------------------------

    hubReportPeriod_ = computeBalancePeriod(userBalancePeriod_,
                                            hubReportPeriod_,
                                            nodeProcessingTime_);

    if (hubMsgLevel_ > 5 && intraCB) {
        messageHandler()->message(ALPS_LOADBAL_HUB_PERIOD, messages())
            << globalRank_ << hubReportPeriod_ << CoinMessageEol;
    }

    MPI_Request request;
    int flag = 1;

    MPI_Irecv(largeBuffer_, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
              MPI_ANY_TAG, MPI_COMM_WORLD, &request);

    while (true) {

        flag = 1;

        //**------------------------------------------------
        // Listen and process msg for a period.
        //**------------------------------------------------

        hubTimer_.start();
        elapsedTime = 0.0;
        while (elapsedTime < hubReportPeriod_ ) {
            MPI_Test(&request, &flag, &status);
            if (flag) { // Received a msg
                processMessages(largeBuffer_, status, request);
            }
            elapsedTime = hubTimer_.getTime();
        }

        //std::cout << "++++++ hubReportPeriod_ = "
        //    << hubReportPeriod_ << std::endl;


        //**------------------------------------------------
        // If forceTerminate_ == true, delete subtrees and force the
        // workers to terminate.
        //**------------------------------------------------

        if (forceTerminate_) {
            // Delete all subtrees if the hub works.
            if (hubWork_) {
                deleteSubTrees();
                updateWorkloadInfo();
                assert(!(subTreePool_->hasKnowledge()));
            }
            if (!deletedSTs) {
                hubForceWorkerTerm();
            }

            deletedSTs = true;
        }

        //**------------------------------------------------
        // Check if all my workers have reported.
        //**------------------------------------------------

        if (!allWorkerReported) {
            workerWorkQualities_[masterRank_] = workQuality_;
            workerWorkQuantities_[masterRank_] = workQuantity_;
            workerReported_[masterRank_] = true;
            allWorkerReported = true;
            for (i = 0; i < clusterSize_; ++i) {
                if (workerReported_[i] == false) {
                    allWorkerReported = false;
                    break;
                }
            }
        }

        //**------------------------------------------------
        // If the hub works, do one unit of work.
        //**------------------------------------------------

        if ( !haltSearch_ && hubWork_ &&
             (workingSubTree_ != 0 || hasKnowledge(AlpsKnowledgeTypeSubTree))) {

            //reportCount = hubReportFrequency;

            // NOTE: will stop when there is a better solution.
            bool betterSolution = true;
            int thisNumProcessed = 0;
            int thisNumBranched = 0;
            int thisNumDiscarded = 0;
            int thisNumPartial = 0;

            // Compute unit work based on node processing time
            if (comUnitWork) {
                unitWorkNodes_ = computeUnitNodes(unitWorkNodes_, nodeProcessingTime_);
                if (++numComUnitWork > 200) {
                    comUnitWork = false;
                }
            }
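
            // The unit-work size is re-estimated from the measured node
            // processing time for roughly the first 200 passes, then frozen
            // at the smoothed value.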
            try {
                rCode = doOneUnitWork(unitWorkNodes_,
                                      unitTime,
                                      exitStatus,
                                      thisNumProcessed,
                                      thisNumBranched,
                                      thisNumDiscarded,
                                      thisNumPartial,
                                      treeDepth_,
                                      betterSolution);
                nodeProcessedNum_ += thisNumProcessed;
                nodeBranchedNum_ += thisNumBranched;
                nodeDiscardedNum_ += thisNumDiscarded;
                nodePartialNum_ += thisNumPartial;
            }
            catch (std::bad_alloc&) {
                errorCode = 1;
            }
            catch (CoinError& er) {
                errorCode = 2;
            }
            catch (...) {
                errorCode = 3;
            }

            if (errorCode) {
                // Do we want to free some memory?
                if (hubWork_) {
                    deleteSubTrees();
                    updateWorkloadInfo();
                    assert(!(subTreePool_->hasKnowledge()));
                }
                haltSearch_ = true;
                sendErrorCodeToMaster(errorCode);
            }

            // Update work load quantity and quality info.
            updateWorkloadInfo();

            // Adjust workingSubTree_ if it is 'much' worse than the best one.
            changeWorkingSubTree(changeWorkThreshold);

#if 0
            std::cout << "******** HUB [" << globalRank_ << "]: "
                      << " quality = " << workQuality_
                      << "; quantity = " << workQuantity_ << std::endl;
#endif

            if (betterSolution) {
                // Double check if better. If yes, update and send it to Master.
                double incVal =
                   getBestKnowledge(AlpsKnowledgeTypeSolution).second;
                if (incVal < incumbentValue_) {  // Minimization
                    incumbentValue_ = incVal;
                    incumbentID_ = globalRank_;
                    sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                                  globalRank_,
                                  0,
                                  smallBuffer_,
                                  0,
                                  MPI_ANY_TAG,
                                  MPI_COMM_WORLD,
                                  false);
                    //sendIncumbent();
                    if (hubMsgLevel_ > 0) {
                        messageHandler()->message(ALPS_SOLUTION_SEARCH,
                                                  messages())
                            << globalRank_ << incVal << CoinMessageEol;
                    }
                }
            }
        } // EOF if hub work

        //**------------------------------------------------
        // Add the hub's own status into the cluster status.
        // Need to check if report.
        //**------------------------------------------------

        refreshClusterStatus();

#if 0
        std::cout << "HUB "<< globalRank_
                  << ": clusterWorkQuality_  = " << clusterWorkQuality_
                  << ": clusterWorkQuantity_  = " << clusterWorkQuantity_
                  << ", sendCount_ = " << sendCount_
                  << ", recvCount_ = " << recvCount_
                  << ", allWorkerReported = " << allWorkerReported
                  << ", blockHubReport_ = " << blockHubReport_
                  << ", blockTermCheck_ = " << blockTermCheck_
                  << std::endl;
#endif

        //**------------------------------------------------
        // If needed, report the cluster status, and check whether the
        // report should be blocked afterwards.
        //**------------------------------------------------

        if ((clusterSendCount_ || clusterRecvCount_ || !blockHubReport_)) {
            //&& reportCount == hubReportFrequency) {

            //reportCount = 0;
            incSendCount("hubMain()-hubReportStatus");

            if (hubWork_) {
                updateWorkloadInfo();
            }
            refreshClusterStatus();  // IMPORTANT: report latest state
            hubReportStatus(AlpsMsgHubPeriodReport, MPI_COMM_WORLD);

            if (clusterWorkQuantity_ < zeroLoad) {
                blockHubReport_ = true;
#ifdef NF_DEBUG_MORE
                std::cout << "HUB[" << globalRank_ << "]: blockHubReport"
                          << std::endl;
#endif
            }
            else {
                blockHubReport_ = false;
#ifdef NF_DEBUG_MORE
                std::cout << "HUB[" << globalRank_ << "]: unblockHubReport"
                          << std::endl;
#endif
            }
        }
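
        // An idle cluster stops reporting until new work arrives, which
        // keeps idle hubs from flooding the master with zero-load reports.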
1299 
1300         //**------------------------------------------------
1301         // If master ask hub to do termination check.
1302         //**------------------------------------------------
1303 
1304         if (!blockTermCheck_) {
1305 
1306             // Ask hub's workers to check termination
1307             for (i = 0; i < clusterSize_; ++i) {
1308                 if (i != clusterRank_) {
1309                     int workRank = globalRank_ - masterRank_ + i;
1310                     MPI_Send(smallBuffer_, 0, MPI_PACKED, workRank,
1311                              AlpsMsgAskPause, MPI_COMM_WORLD);
1312 #ifdef NF_DEBUG
1313                     std::cout << "HUB[" << globalRank_ << "]: ask its worker "
1314                               << workRank << " to do TERM check."
1315                               << ", clusterRank_=" << clusterRank_
1316                               << ", i=" << i << std::endl;
1317 #endif
1318                 }
1319             }
1320 
1321             // Recv workers' stati
1322             MPI_Request termReq;
1323             MPI_Status termSta;
1324             for(i = 1; i < clusterSize_; ++i) {
1325                 MPI_Irecv(smallBuffer_, smallSize, MPI_PACKED, MPI_ANY_SOURCE,
1326                           AlpsMsgWorkerTermStatus, clusterComm_, &termReq);
1327                 MPI_Wait(&termReq, &termSta);
1328                 hubUpdateCluStatus(smallBuffer_, &termSta, clusterComm_);
1329             }
1330 
1331             // No workQuantity_ and workQuality for the hub
1332             //clusterWorkQuality_ += workQuality_;
1333             //clusterWorkQuantity_ += workQuantity_;
1334             clusterSendCount_ += sendCount_;
1335             clusterRecvCount_ += recvCount_;
1336             sendCount_ = recvCount_ = 0;
1337 
1338             // Report my status to master
1339             hubReportStatus(AlpsMsgHubTermStatus, hubComm_);
1340 
1341 #ifdef NF_DEBUG
1342             std::cout << "HUB[" << globalRank_ << "]: reported  TERM status to "
1343                       << "master " << masterRank_ << std::endl;
1344 #endif
1345 
1346             // Get termination instruction from Master
1347             // NOTE: master's rank is always 0 in hubComm_.
1348             MPI_Irecv(&reply, 1, MPI_CHAR, 0, AlpsMsgContOrTerm, hubComm_,
1349                       &termReq);
1350             MPI_Wait(&termReq, &termSta);
1351 
1352 #ifdef NF_DEBUG
1353             std::cout << "HUB[" << globalRank_ << "]: received TERM instruction "
1354                       << reply << " from master " << masterRank_ << std::endl;
1355 #endif
1356 
1357 
1358             if(reply == 'T') {
1359                 if (hubMsgLevel_ > 0) {
1360                     messageHandler()->message(ALPS_TERM_HUB_INFORM, messages())
1361                         << globalRank_<< "exit" << CoinMessageEol;
1362                 }
1363 
1364                 // Send instruction to my workers
1365                 for (i = 0; i < clusterSize_; ++i) {
1366                     if (i != clusterRank_) {
1367                         reply = 'T';
1368                         MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
1369                                  clusterComm_);
1370 #ifdef NF_DEBUG
1371                         std::cout << "HUB[" << globalRank_ << "]: ask its worker "
1372                                   << i << " to TERM." << std::endl;
1373 #endif
1374                     }
1375                 }
1376                 terminate = true;
1377                 break;    // Break * and terminate
1378             }
1379             else {
1380                 if (hubMsgLevel_ > 0) {
1381                     messageHandler()->message(ALPS_TERM_HUB_INFORM, messages())
1382                         << globalRank_<< "continue" << CoinMessageEol;
1383                 }
1384                 for (i = 0; i < clusterSize_; ++i) {
1385                     if (i != clusterRank_) {
1386                         reply = 'C';
1387                         MPI_Send(&reply, 1, MPI_CHAR, i, AlpsMsgContOrTerm,
1388                                  clusterComm_);
1389 #ifdef NF_DEBUG
1390                         std::cout << "HUB[" << globalRank_ << "]: ask its worker "
1391                                   << i << " to continue." << std::endl;
1392 #endif
1393                     }
1394                 }
1395                 terminate = false;
1396                 blockTermCheck_ = true;
1397                 blockHubReport_ = false;
1398             }
1399         }
1400 
1401         //**------------------------------------------------
1402         // Hub balances the workload of its workers if
1403         // (1) it is not terminating,
1404         // (2) it is not being forced to terminate,
1405         // (3) all workers have reported, and
1406         // (4) the previous balance has completed.
1407         //**------------------------------------------------
1408 
1409         if ( !terminate && !forceTerminate_ &&
1410              allWorkerReported && hubDoBalance_ == 0 ) {
1411             if (clusterSize_ > 2 && intraCB) {
1412                 hubBalanceWorkers();
1413                 ++hubCheckCount;
1414                 if (hubCheckCount % 10 == 0) {
1415                     if (hubMsgLevel_ > 0) {
1416                         messageHandler()->message(ALPS_LOADBAL_HUB, messages())
1417                             << globalRank_ << hubCheckCount << CoinMessageEol;
1418                     }
1419                 }
1420             }
1421         }
1422 
1423     } // End of while loop
1424 
1425 
1426     //------------------------------------------------------
1427     // Clean up before leaving.
1428     //------------------------------------------------------
1429 
1430     int cancelNum = 0;
1431     MPI_Cancel(&request);
1432     MPI_Wait(&request, &status);
1433     MPI_Test_cancelled(&status, &flag);
1434     if(flag) {  // Cancel succeeded
1435         ++cancelNum;
1436     }
1437 }
1438 
1439 //#############################################################################
1440 
1441 void
1442 AlpsKnowledgeBrokerMPI::workerMain()
1443 {
1444     bool isIdle = false;
1445     int thisNumProcessed = 0;
1446     int thisNumBranched = 0;
1447     int thisNumDiscarded = 0;
1448     int thisNumPartial = 0;
1449     int errorCode = 0;
1450 
1451     char reply = 'C';
1452     char* tempBuffer = 0;
1453 
1454     bool terminate = false;
1455     bool allMsgReceived = false;
1456     bool deletedSTs = false;
1457     bool comUnitWork = false;
1458 
1459     int numComUnitWork = 0;
1460 
1461     MPI_Status status;
1462     AlpsReturnStatus rCode = AlpsReturnStatusOk;
1463     AlpsExitStatus exitStatus = AlpsExitStatusInfeasible;
1464 
1465     AlpsTimer msgTimer;
1466 
1467     //------------------------------------------------------
1468     // Get parameters.
1469     //------------------------------------------------------
1470 
1471     const int staticBalanceScheme =
1472         model_->AlpsPar()->entry(AlpsParams::staticBalanceScheme);
1473     const int nodeLogInterval =
1474         model_->AlpsPar()->entry(AlpsParams::nodeLogInterval);
1475     const int smallSize =
1476         model_->AlpsPar()->entry(AlpsParams::smallSize);
1477     const int workerMsgLevel =
1478         model_->AlpsPar()->entry(AlpsParams::workerMsgLevel);
1479     const double zeroLoad =
1480         model_->AlpsPar()->entry(AlpsParams::zeroLoad);
1481     double changeWorkThreshold =
1482         model_->AlpsPar()->entry(AlpsParams::changeWorkThreshold);
1483     const double needWorkThreshold =
1484         model_->AlpsPar()->entry(AlpsParams::needWorkThreshold);
1485     const bool intraCB =
1486         model_->AlpsPar()->entry(AlpsParams::intraClusterBalance);
1487     double unitTime  =
1488         model_->AlpsPar()->entry(AlpsParams::unitWorkTime);
1489     if (unitTime <= 0.0) {
1490         unitTime  = ALPS_DBL_MAX;
1491     }
1492 
1493     unitWorkNodes_ = model_->AlpsPar()->entry(AlpsParams::unitWorkNodes);
1494     if (unitWorkNodes_ <= 0) {
1495         comUnitWork = true;
1496     }
1497 
1498     largeSize_ = model_->AlpsPar()->entry(AlpsParams::largeSize);
1499 
1500     workerTimer_.setClockType(AlpsClockTypeWallClock);
1501     msgTimer.setClockType(AlpsClockTypeWallClock);
1502 
1503     //------------------------------------------------------
1504     // Recv node memory size from hub.
1505     //------------------------------------------------------
1506 
1507     MPI_Recv(&nodeMemSize_, 1, MPI_INT, masterRank_, /* masterRank_ is 0 or 1*/
1508              AlpsMsgNodeSize, clusterComm_, &status);
1509 
1510     // Adjust largeSize to avoid extreme cases.
1511     largeSize_ = CoinMax(largeSize_, nodeMemSize_ * 3);
1512 
1513     largeBuffer_ = new char [largeSize_];
1514     largeBuffer2_ = new char [largeSize_];
1515     smallBuffer_ = new char [smallSize];
1516 
1517     MPI_Barrier(MPI_COMM_WORLD); // Sync before rampup
1518 
1519     //======================================================
1520     // Worker's Ramp-up.
1521     //======================================================
1522 
1523     workerTimer_.start();
1524 
1525     setPhase(AlpsPhaseRampup);
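    // Static load balancing for ramp-up: with AlpsRootInit the root creates
    // nodes and distributes them to the workers; with AlpsSpiral nodes are
    // spread process-to-process as they are generated (see rootInitWorker()
    // and spiralWorker() for the details).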
1526     switch (staticBalanceScheme) {
1527     case AlpsRootInit:
1528         rootInitWorker();
1529         break;
1530     case AlpsSpiral:
1531         spiralWorker();
1532         break;
1533     default:
1534         throw CoinError("Unknown static balance scheme", "workerMain",
1535                         "AlpsKnowledgeBrokerMPI");
1536     }
1537 
1538     rampUpTime_ = workerTimer_.getTime();
1539 
1540     //======================================================
1541     // Worker starts to search.
1542     //======================================================
1543 
1544     if (workerMsgLevel > 0) {
1545         messageHandler()->message(ALPS_SEARCH_WORKER_START, messages())
1546             <<globalRank_ << CoinMessageEol;
1547     }
1548 
1549     setPhase(AlpsPhaseSearch);
1550 
1551     //------------------------------------------------------
1552     // WORKER SCHEDULER:
1553     // (1) Listen and process messages until no message is in msg queue.
1554     // (2) Do one unit of work.
1555     // (3) Report status or check termination.
1556     //------------------------------------------------------
1557 
1558     MPI_Request request;
1559     int flag;
1560 
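    // Post a single nonblocking wildcard receive; the scheduler loop below
    // polls it with MPI_Test, and processMessages() re-posts the receive
    // after handling each message.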
1561     MPI_Irecv(largeBuffer_, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
1562               MPI_ANY_TAG, MPI_COMM_WORLD, &request);
1563 
1564     while (true) {
1565 
1566         blockTermCheck_ = true;
1567 
1568         //**------------------------------------------------
1569         // Listen and process until no message.
1570         //**------------------------------------------------
1571 
1572         msgTimer.start();
1573         while (true) {
1574             MPI_Test(&request, &flag, &status);
1575             if (flag) { // Receive a msg
1576                 allMsgReceived = false;
1577                 processMessages(largeBuffer_, status, request);
1578             }
1579             else {
1580                 allMsgReceived = true;
1581                 break;
1582             }
1583         }
1584         msgTime_ += msgTimer.getTime();
1585 
1586         //**------------------------------------------------
1587         // Handle a forced termination request.
1588         //**------------------------------------------------
1589 
1590         //if (forceTerminate_ && !deletedSTs) {
1591         // Don't know why error on limits
1592         if (forceTerminate_) {
1593 
1594             deletedSTs = true;
1595 
1596             // Remove nodes from node pools
1597             deleteSubTrees();
1598             assert(!(subTreePool_->hasKnowledge()));
1599             updateWorkloadInfo();
1600         }
1601 
1602         //**------------------------------------------------
1603         // If not doing a termination check, do one unit of work.
1604         //**------------------------------------------------
1605 
1606         if (blockTermCheck_) {
1607 
1608             bool betterSolution = false;
1609 
1610             // Check whether we need to ask for more node indices before doing work
1611             if (!haltSearch_) {
1612                 if (getMaxNodeIndex() - getNextNodeIndex() < (unitWorkNodes_ + 5)) {
1613                     workerAskIndices();
1614                     haltSearch_ = true;
1615                 }
1616             }
1617 
1618             if ( !haltSearch_ &&
1619                  (workingSubTree_ || hasKnowledge(AlpsKnowledgeTypeSubTree))) {
1620                 if (isIdle) {
1621                     // Have work now; record the idle period and clear isIdle.
1622                     idleTime_ += workerTimer_.getTime();
1623                     isIdle = false;
1624                 }
1625 
1626                 // Need to check for a better solution.
1627                 betterSolution = true;
1628 
1629                 // Compute unit work based on node processing time
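                // (Recomputed for roughly the first 20 work units, then frozen.)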
1630                 if (comUnitWork) {
1631                     unitWorkNodes_ = computeUnitNodes(unitWorkNodes_,
1632                                                 nodeProcessingTime_);
1633                     if (++numComUnitWork > 20) {
1634                         comUnitWork = false;
1635                     }
1636                 }
1637 
1638                 try {
1639                    rCode = doOneUnitWork(unitWorkNodes_,
1640                                          unitTime,
1641                                          exitStatus,
1642                                          thisNumProcessed,
1643                                          thisNumBranched,
1644                                          thisNumDiscarded,
1645                                          thisNumPartial,
1646                                          treeDepth_,
1647                                          betterSolution);
1648                    nodeProcessedNum_ += thisNumProcessed;
1649                    nodeBranchedNum_ += thisNumBranched;
1650                    nodeDiscardedNum_ += thisNumDiscarded;
1651                    nodePartialNum_ += thisNumPartial;
1652                    //std::cout << "Nodes branched" << thisNumBranched << std::endl;
1653                 }
1654                 catch (std::bad_alloc&) {
1655                     errorCode = 1;   // Out of memory
1656                 }
1657                 catch (CoinError&) {
1658                     errorCode = 2;   // COIN/solver error
1659                 }
1660                 catch (...) {
1661                     errorCode = 3;   // Unknown error
1662                 }
1663                 if (errorCode) {
1664                     haltSearch_ = true;
1665                     deleteSubTrees();
1666                     assert(!(subTreePool_->hasKnowledge()));
1667                     updateWorkloadInfo();
1668                     sendErrorCodeToMaster(errorCode);
1669                 }
1670 
1671                 // Switch workingSubTree_ if it is 'much' worse than the best one
1672                 changeWorkingSubTree(changeWorkThreshold);
1673 
1674                 // Print tree size
1675                 if ( (msgLevel_ == 1) &&
1676                      (nodeProcessedNum_ % nodeLogInterval == 0) ) {
1677                     if (workerMsgLevel > 0) {
1678                         messageHandler()->message(ALPS_NODE_COUNT, messages())
1679                             << globalRank_<<nodeProcessedNum_
1680                             <<updateNumNodesLeft()
1681                             << CoinMessageEol;
1682                     }
1683                 }
1684 
1685                 // Share generated knowledge.
1686                 sendModelKnowledge(MPI_COMM_WORLD, // Comm
1687                                    globalRank_);   // Receiver (not used)
1688             }
1689             else {
1690                 // Worker is idle.
1691                 if (!isIdle) {
1692                     workerTimer_.start();
1693                     isIdle = true;
1694                 }
1695             }
1696 
1697             // If a better solution was found, check whether it needs to be sent
1698             if (betterSolution) {
1699                 // Double check if better. If yes, update and send it to the master
1700                 double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
1701                 if(incVal < incumbentValue_) {  // Minimization
1702                     incumbentValue_ = incVal;
1703                     incumbentID_ = globalRank_;
1704 #ifdef NF_DEBUG
1705                     std::cout << "\nWORKDER[" << globalRank_ <<
1706                         "]: send a better solution. Quality = "
1707                               << incumbentValue_ << std::endl;
1708 #endif
1709                     sendKnowledge(AlpsKnowledgeTypeSolution, //useful
1710                                   globalRank_,
1711                                   0,
1712                                   smallBuffer_,
1713                                   0,
1714                                   MPI_ANY_TAG,
1715                                   MPI_COMM_WORLD,
1716                                   false);
1717                    //sendIncumbent();
1718                     if (workerMsgLevel > 0) {
1719                         messageHandler()->message(ALPS_SOLUTION_SEARCH,
1720                                                   messages())
1721                             << globalRank_ << incVal << CoinMessageEol;
1722                     }
1723                 }
1724             }
1725 
1726             // Report status to the hub periodically if msg counts are nonzero
1727             // or reporting is not blocked; with no load, block further reports.
1728 
1729             updateWorkloadInfo();
1730 
1731             if (sendCount_ || recvCount_ || !blockWorkerReport_) {
1732                 incSendCount("workerMain() - workerReportStatus");
1733                 workerReportStatus(AlpsMsgWorkerStatus, MPI_COMM_WORLD);
1734 
1735                 if (workQuantity_ < zeroLoad) {
1736                     blockWorkerReport_ = true;
1737                 }
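                // With intra-cluster balancing on and workload below
                // needWorkThreshold, ask the hub for more work; block further
                // requests until a subtree arrives (blockAskForWork_ is
                // reset when one does).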
1738                 if ( intraCB && !forceTerminate_ &&
1739                      (workQuantity_ < needWorkThreshold) &&
1740                      (blockAskForWork_ == false) ) {
1741 
1742                     ++(psStats_.workerAsk_);
1743 
1744                     MPI_Send(tempBuffer, 0, MPI_PACKED, myHubRank_,
1745                              AlpsMsgWorkerNeedWork, MPI_COMM_WORLD);
1746                     incSendCount("workerAskForWork");
1747                     blockAskForWork_ = true;
1748                 }
1749 
1750 #if 0
1751                 std::cout << "WORKER: " << globalRank_
1752                           << " after updateWorkloadInfo() = "
1753                           << workQuantity_ << std::endl;
1754 #endif
1755             }
1756         }
1757         else { /* Do termination check. */
1758 #if 0
1759             std::cout << "WORKER[" << globalRank_ << "] check termination."
1760                       << std::endl;
1761 #endif
1762             // Record rampdown
1763             rampDownTime_ = workerTimer_.getTime();
1764 
1765             // Report my latest status
1766             updateWorkloadInfo();
1767             workerReportStatus(AlpsMsgWorkerTermStatus, clusterComm_);
1768 
1769 #ifdef NF_DEBUG
1770             std::cout << "WORKER[" << globalRank_ << "] reported TEMR status."
1771                       << std::endl;
1772 #endif
1773             // Get instruction from my hub
1774             MPI_Request termReq;
1775             MPI_Status termSta;
1776             MPI_Irecv(&reply, 1, MPI_CHAR, masterRank_, AlpsMsgContOrTerm,
1777                       clusterComm_, &termReq);
1778             MPI_Wait(&termReq, &termSta);
1779 
1780             if(reply == 'T') {
1781                 terminate = true;
1782                 if (workerMsgLevel > 0) {
1783                     messageHandler()->message(ALPS_TERM_WORKER_INFORM, messages())
1784                         << globalRank_ << "exit" << CoinMessageEol;
1785                 }
1786                 break;  // Break * and terminate
1787             }
1788             else {
1789                 if (workerMsgLevel > 0) {
1790                     messageHandler()->message(ALPS_TERM_WORKER_INFORM, messages())
1791                         << globalRank_ << "continue" << CoinMessageEol;
1792                 }
1793                 blockWorkerReport_ = false;
1794                 blockTermCheck_ = true;
1795                 terminate = false;
1796             }
1797         }
1798     }
1799 
1800     //------------------------------------------------------
1801     // How many nodes are left?
1802     //------------------------------------------------------
1803 
1804     updateNumNodesLeft();
1805 
1806     //------------------------------------------------------
1807     // Cancel MPI_Irecv before leaving.
1808     //------------------------------------------------------
1809 
1810     int cancelNum = 0;
1811     MPI_Cancel(&request);
1812     MPI_Wait(&request, &status);
1813     MPI_Test_cancelled(&status, &flag);
1814     if(flag) {  // Cancel succeeded
1815         ++cancelNum;
1816     }
1817 }
1818 
1819 //#############################################################################
1820 
1821 // Process messages.
1822 void
1823 AlpsKnowledgeBrokerMPI::processMessages(char *&bufLarge,
1824                                         MPI_Status &status,
1825                                         MPI_Request &request)
1826 {
1827     int count;
1828     bool success = false;
1829 
1830     incRecvCount("Processing msg");
1831     switch (status.MPI_TAG) {
1832 
1833         //--------------------------------------------------
1834         // Following are master's msgs.
1835         //--------------------------------------------------
1836     case AlpsMsgErrorCode:
1837         recvErrorCode(bufLarge);
1838         masterForceHubTerm();
1839         hubForceWorkerTerm();
1840         break;
1841     case AlpsMsgHubFailFindDonor:
1842         --masterDoBalance_;
1843         break;
1844     case AlpsMsgHubPeriodReport:
1845         masterUpdateSysStatus(bufLarge, &status, MPI_COMM_WORLD);
1846         break;
1847     case AlpsMsgTellMasterRecv:
1848         --masterDoBalance_;
1849         break;
1850     case AlpsMsgWorkerAskIndices:
1851         masterSendIndices(bufLarge);
1852         break;
1853 
1854         //-------------------------------------------------
1855         // Following are hub's msgs.
1856         //-------------------------------------------------
1857 
1858     case AlpsMsgWorkerNeedWork:
1859         hubSatisfyWorkerRequest(bufLarge, &status);
1860         break;
1861     case AlpsMsgAskHubShare:
1862         hubsShareWork(bufLarge, &status);
1863         break;
1864     case AlpsMsgAskHubPause:
1865         // Do not count messages during terminate checking
1866         decRecvCount("hub periodical listening: AlpsMsgAskPause");
1867         blockTermCheck_ = false;
1868         break;
1869     case AlpsMsgTellHubRecv:
1870         --hubDoBalance_;
1871         break;
1872     case AlpsMsgWorkerStatus:
1873         hubUpdateCluStatus(bufLarge, &status, MPI_COMM_WORLD);
1874         blockHubReport_ = false;
1875         break;
1876 
1877         //--------------------------------------------------
1878         // Following are workers' msgs.
1879         //-------------------------------------------------
1880 
1881     case AlpsMsgAskDonate:
1882         donateWork(bufLarge, AlpsMsgSubTree, &status);
1883         incSendCount("worker listening - AlpsMsgAskDonate");
1884         break;
1885     case AlpsMsgAskDonateToHub:
1886         donateWork(bufLarge, AlpsMsgSubTreeByMaster, &status);
1887         incSendCount("worker listening - AlpsMsgAskDonateToHub");
1888         break;
1889     case AlpsMsgAskDonateToWorker:
1890         donateWork(bufLarge, AlpsMsgSubTreeByWorker, &status);
1891         incSendCount("worker listening - AlpsMsgAskDonateToWorker");
1892         break;
1893     case AlpsMsgAskPause:
1894         // Do not count messages during terminate checking
1895         decRecvCount("worker listening - AlpsMsgAskPause");
1896         blockTermCheck_ = false;
1897         break;
1898     case AlpsMsgIndicesFromMaster:
1899         workerRecvIndices(bufLarge);
1900         break;
1901     case AlpsMsgSubTree:
1902         receiveSubTree(bufLarge, status.MPI_SOURCE, &status);
1903         tellHubRecv();
1904         MPI_Get_count(&status, MPI_PACKED, &count);
1905         if (count > 0){
1906             blockWorkerReport_ = false;
1907             blockAskForWork_  = false;
1908         }
1909         break;
1910     case AlpsMsgSubTreeByWorker:
1911         receiveSubTree(bufLarge, status.MPI_SOURCE, &status);
1912         MPI_Get_count(&status, MPI_PACKED, &count);
1913         if (count > 0){
1914             blockWorkerReport_ = false;
1915             blockAskForWork_  = false;
1916         }
1917         break;
1918 
1919         //--------------------------------------------------
1920         // Following are common msgs.
1921         //--------------------------------------------------
1922 
1923     case AlpsMsgModelGenSearch:
1924         receiveModelKnowledge(MPI_COMM_WORLD);
1925         forwardModelKnowledge();
1926         break;
1927     case AlpsMsgIncumbentTwo:
1928         success = unpackSetIncumbent(bufLarge, &status);
1929         if (success) {
1930             sendIncumbent();
1931             //updateIncumbent_ = false;
1932         }
1933         break;
1934     case AlpsMsgSubTreeByMaster:
1935         if (globalRank_ == masterRank_) {
1936             hubAllocateDonation(bufLarge, &status);
1937             MPI_Get_count(&status, MPI_PACKED, &count);
1938             --masterDoBalance_;
1939         }
1940         else if (globalRank_ == myHubRank_) {
1941             hubAllocateDonation(bufLarge, &status);
1942             tellMasterRecv();
1943             MPI_Get_count(&status, MPI_PACKED, &count);
1944             if (count > 0) {
1945                 blockHubReport_ = false;
1946             }
1947         }
1948         else {
1949             receiveSubTree(bufLarge, status.MPI_SOURCE, &status);
1950             MPI_Get_count(&status, MPI_PACKED, &count);
1951             if (count > 0) blockWorkerReport_ = false;
1953         }
1954 
1955         break;
1956     case AlpsMsgForceTerm:
1957         forceTerminate_ = true;
1958         break;
1959     default:
1960         std::cout << "PROC " << globalRank_ << " : processMessages"
1961                   << " : received UNKNOWN message. tag = "
1962                   << status.MPI_TAG <<  std::endl;
1963         throw CoinError("Unknown message type", "workermain",
1964                         "AlpsKnowledgeBrokerMPI");
1965     }
1966 
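    // Re-post the wildcard receive for the next incoming message.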
1967     MPI_Irecv(bufLarge, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
1968               MPI_ANY_TAG, MPI_COMM_WORLD, &request);
1969 
1970 }
1971 
1972 //#############################################################################
1973 
1974 // Master asks the donor hub to send work to the receiver hub
1975 void
1976 AlpsKnowledgeBrokerMPI::masterAskHubDonate(int donorID,
1977                                            int receiverID,
1978                                            double receiverWorkload)
1979 {
1980     int smallSize =
1981         model_->AlpsPar()->entry(AlpsParams::smallSize);
1982     int pos       = 0;
1983 
1984     MPI_Pack(&receiverID, 1, MPI_INT, smallBuffer_, smallSize, &pos, MPI_COMM_WORLD);
1985     MPI_Pack(&receiverWorkload, 1, MPI_DOUBLE, smallBuffer_, smallSize, &pos,
1986              MPI_COMM_WORLD);
1987 
1988 #if 0
1989     std::cout << "masterAskHubDonate(): donor is " << donorID << std::endl;
1990 #endif
1991 
1992     MPI_Send(smallBuffer_, pos, MPI_PACKED, donorID, AlpsMsgAskHubShare, MPI_COMM_WORLD);
1993     incSendCount("masterAskHubDonate()");
1994 
1995 }
1996 
1997 //#############################################################################
1998 
1999 // Hub asks a donor worker to send work to the receiver worker
2000 void
2001 AlpsKnowledgeBrokerMPI::hubAskWorkerDonate(int donorID,
2002                                            int receiverID,
2003                                            double receiverWorkload)
2004 {
2005     int smallSize = model_->AlpsPar()->entry(AlpsParams::smallSize);
2006     int pos = 0;
2007 
2008     MPI_Pack(&receiverID, 1, MPI_INT, smallBuffer_, smallSize, &pos,
2009              MPI_COMM_WORLD);
2010     MPI_Pack(&receiverWorkload, 1, MPI_DOUBLE, smallBuffer_, smallSize, &pos,
2011              MPI_COMM_WORLD);
2012 
2013 #ifdef NF_DEBUG_MORE
2014     std::cout << "hubAskHubDonate() send to " << donorID << std::endl;
2015 #endif
2016 
2017     MPI_Send(smallBuffer_, pos, MPI_PACKED, donorID, AlpsMsgAskDonate,
2018              MPI_COMM_WORLD);
2019     incSendCount("hubAskWorkerDonate()");
2020 }
2021 
2022 //#############################################################################
2023 
2024 // Calculate the quality and quantity of workload on this process
2025 void
2026 AlpsKnowledgeBrokerMPI::updateWorkloadInfo()
2027 {
2028     int count;
2029 
2030     workQuality_ = ALPS_OBJ_MAX;  // Worst ever possible
2031     workQuantity_ = 0.0;          // No work
2032 
2033     if ( (workingSubTree_ == NULL) &&
2034          (subTreePool_->hasKnowledge() == false) ) {
2035         return;
2036     }
2037 
2038     std::vector<AlpsSubTree* > subTreeVec =
2039         subTreePool_->getSubTreeList().getContainer();
2040     std::vector<AlpsSubTree* >::iterator pos1, pos2;
2041     pos1 = subTreeVec.begin();
2042     pos2 = subTreeVec.end();
2043 
2044     workQuality_ = subTreePool_->getBestQuality();
2045 
2046     //    if (pos1 != pos2) {
2047     //        workQuality_ = (*pos1)->getQuality();//Best in pool
2048     //    }
2049 
2050     for (; pos1 != pos2; ++pos1) {
2051         if ((*pos1)->getNumNodes() > 5) {
2052             //count = 5;
2053             count = (*pos1)->getNumNodes();
2054         }
2055         else {
2056             count = (*pos1)->getNumNodes();
2057         }
2058         workQuantity_ += count;
2059     }
2060 
2061     if (workingSubTree_ != 0) {
2062         workingSubTree_->calculateQuality();
2063         if (workQuality_ > workingSubTree_->getQuality()) {
2064             workQuality_ = workingSubTree_->getQuality();
2065         }
2066         if (workingSubTree_->getNumNodes() > 5) {
2067             //count = 5;
2068             count = workingSubTree_->getNumNodes();
2069         }
2070         else {
2071             count = workingSubTree_->getNumNodes();
2072         }
2073         workQuantity_ += count;
2074     }
2075 
2076 #ifdef NF_DEBUG_MORE
2077     std::cout << "PROC[" << globalRank_  << "] incumbentValue_ = "
2078               << incumbentValue_ << "; workQuality_ = "
2079               << workQuality_<< "; workQuantity_ = "
2080               << workQuantity_ << std::endl;
2081 #endif
2082 }
2083 
2084 //#############################################################################
2085 
2086 // A worker donates a subtree to another worker (whose info is in bufLarge)
2087 void
2088 AlpsKnowledgeBrokerMPI::donateWork(char*& anyBuffer,
2089                                    int tag,
2090                                    MPI_Status* status,
2091                                    int recvID,
2092                                    double recvWL)
2093 {
2094     int size       =
2095         model_->AlpsPar()->entry(AlpsParams::smallSize);
2096     int pos        = 0;
2097     int receiverID = 0;
2098     int treeSize;
2099 
2100     char* dummyBuf = 0;
2101     double receiverWorkload = 0.0;
2102     bool sentSuccessful = false;
2103     AlpsSubTree* aSubTree = 0;
2104 
2105 #ifdef NF_DEBUG_MORE
2106     updateWorkloadInfo();
2107     messageHandler()->message(ALPS_DONATE_BEFORE, messages())
2108         << globalRank_ << workQuantity_ << subTreePool_->getNumKnowledges()
2109         << CoinMessageEol;
2110 #endif
2111 
2112     //------------------------------------------------------
2113     // Find out to which process I should donate.
2114     //------------------------------------------------------
2115 
2116     if (recvID == -1) {
2117         MPI_Unpack(anyBuffer, size, &pos, &receiverID, 1, MPI_INT, MPI_COMM_WORLD);
2118         MPI_Unpack(anyBuffer, size, &pos, &receiverWorkload, 1, MPI_DOUBLE,
2119                    MPI_COMM_WORLD);
2120     }
2121     else {
2122         receiverID = recvID;
2123         receiverWorkload = recvWL;
2124     }
2125 
2126 
2127     //------------------------------------------------------
2128     // Check if the previous subtree send has completed.
2129     //------------------------------------------------------
2130 
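    // subTreeRequest_ tracks the nonblocking send of the previously donated
    // subtree; the send buffer can only be reused safely once MPI_Test
    // reports that that send has completed.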
2131     int alreadySent = true;
2132     if (subTreeRequest_ != MPI_REQUEST_NULL){
2133         MPI_Status sentStatus;
2134         MPI_Test(&subTreeRequest_, &alreadySent, &sentStatus);
2135 #if 0
2136         if (alreadySent) {
2137             std::cout << "++++ Process[" << globalRank_
2138                       << "] have completed sending a subtree." << std::endl;
2139         }
2140         else {
2141             std::cout << "++++ Process[" << globalRank_
2142                       << "] haven't completed sending a subtree." << std::endl;
2143         }
2144 #endif
2145     }
2146     else {
2147 #if 0
2148         std::cout << "++++ Process[" << globalRank_
2149                   << "] ready to sent a subtree." << std::endl;
2150 #endif
2151     }
2152 
2153     //------------------------------------------------------
2154     // Case 1: If subTreePool has subtrees, send the best one;
2155     //         if the best one is too large, split it.
2156     // Case 2: If subTreePool has no subtrees AND workingSubTree_ does not
2157     //         point to NULL, split it and send one part of it.
2158     // Case 3: Otherwise, send an empty msg.
2159     //------------------------------------------------------
2160 
2161     if (alreadySent) {
2162         if (subTreePool_->hasKnowledge()) {   // Case 1
2163             aSubTree = dynamic_cast<AlpsSubTree* >
2164                 (subTreePool_->getKnowledge().first);
2165             sentSuccessful = sendSubTree(receiverID, aSubTree, tag);
2166 
2167             if (sentSuccessful) {
2168                 ++(psStats_.subtreeWhole_);
2169 
2170                 subTreePool_->popKnowledge();
2171                 // Since sent to other process, delete it.
2172                 delete aSubTree;
2173                 aSubTree = NULL;
2174                 if (msgLevel_ > 100) {
2175                     messageHandler()->message(ALPS_DONATE_WHOLE, messages())
2176                         << globalRank_ << receiverID
2177                         << status->MPI_TAG << CoinMessageEol;
2178                 }
2179             }
2180             else {
2181                 // Split subtree.
2182                 aSubTree = aSubTree->splitSubTree(treeSize);
2183                 if (treeSize > ALPS_GEN_TOL) {
2184                     ++(psStats_.subtreeSplit_);
2185                     sentSuccessful = sendSubTree(receiverID, aSubTree, tag);
2186                     assert(sentSuccessful == true);
2187                     // Since sent to other process, delete it.
2188                     delete aSubTree;
2189                     aSubTree = NULL;
2190                 }
2191 
2192                 if (msgLevel_ > 100) {
2193                     messageHandler()->message(ALPS_DONATE_SPLIT, messages())
2194                         << globalRank_ << receiverID << status->MPI_TAG
2195                         << CoinMessageEol;
2196                 }
2197             }
2198         }
2199         else if (workingSubTree_ != 0) {     // Case 2
2200             aSubTree = workingSubTree_->splitSubTree(treeSize);
2201             if (treeSize > ALPS_GEN_TOL) {
2202                 ++(psStats_.subtreeSplit_);
2203 
2204                 sentSuccessful = sendSubTree(receiverID, aSubTree, tag);
2205                 assert(sentSuccessful == true);
2206 
2207                 // Since sent to other process, delete it.
2208                 delete aSubTree;
2209                 aSubTree = NULL;
2210 
2211                 if (msgLevel_ > 100) {
2212                     messageHandler()->message(ALPS_DONATE_SPLIT, messages())
2213                         << globalRank_ << receiverID << status->MPI_TAG
2214                         << CoinMessageEol;
2215                 }
2216             }
2217         }
2218     }
2219 
2220     if (!sentSuccessful || !alreadySent) {               // Case 3
2221 #if 0
2222         std::cout << "donateWork(): " << globalRank_ << "has nothing send to "
2223                   << receiverID <<"; alreadySent "<<alreadySent<< std::endl;
2224 #endif
2225         ++(psStats_.donateFail_);
2226 
2227         //MPI_Send(dummyBuf, 0, MPI_PACKED, receiverID, tag, MPI_COMM_WORLD);
2228         MPI_Isend(dummyBuf, 0, MPI_PACKED, receiverID, tag,
2229                   MPI_COMM_WORLD, &subTreeRequest_); // Request is overwritten.
2230 
2231         if (msgLevel_ > 100) {
2232             messageHandler()->message(ALPS_DONATE_FAIL, messages())
2233                 << globalRank_ << receiverID << status->MPI_TAG
2234                 << CoinMessageEol;
2235         }
2236     }
2237     else {
2238         ++(psStats_.donateSuccess_);
2239     }
2240 
2241 #ifdef NF_DEBUG_MORE
2242     updateWorkloadInfo();
2243     messageHandler()->message(ALPS_DONATE_AFTER, messages())
2244         << globalRank_ << workQuantity_ << subTreePool_->getNumKnowledges()
2245         << CoinMessageEol;
2246 #endif
2247 }
2248 
2249 //#############################################################################
2250 
2251 // The hub allocates the donated subtree to the worker that has the worst
2252 // workload. Note that the donated subtree comes from another cluster.
2253 void
2254 AlpsKnowledgeBrokerMPI::hubAllocateDonation(char*& bufLarge,
2255                                             MPI_Status* status)
2256 {
2257     int i;
2258     int count = 0;
2259 
2260     int worstRank = -1;
2261     double worst = -ALPS_OBJ_MAX; // Minimization: best possible
2262 
2263     //------------------------------------------------------
2264     // Get the number of elements in bufLarge.
2265     //------------------------------------------------------
2266 
2267     MPI_Get_count(status, MPI_PACKED, &count);
2268 
2269     //------------------------------------------------------
2270     // Find the worker who has worst work quality.
2271     //------------------------------------------------------
2272 
2273     for (i = 0; i < clusterSize_; ++i) {
2274         if (!hubWork_) {
2275             if (i == clusterRank_) continue;
2276         }
2277         if (workerWorkQualities_[i] > worst) { // Larger value = worse quality
2278             worst = workerWorkQualities_[i];
2279             worstRank = globalRank_ + i;
2280         }
2281     }
2282 
2283     //------------------------------------------------------
2284     // Forward the bufLarge to this worst worker.
2285     //------------------------------------------------------
2286 
2287     if (worstRank != -1) {
2288 
2289 #ifdef NF_DEBUG_MORE
2290         std::cout << "hubAllocateDonation() send to "
2291                   << worstRank << std::endl;
2292 #endif
2293         if (worstRank != globalRank_) {
2294             MPI_Send(bufLarge, count, MPI_PACKED, worstRank,
2295                      AlpsMsgSubTreeByMaster, MPI_COMM_WORLD);
2296             incSendCount("hubAllocateDonation");
2297         }
2298         else { // Hub self
2299             //std::cout << "Allocate to myself" << std::endl;
2300             receiveSubTree(bufLarge, status->MPI_SOURCE, status);
2301         }
2302     }
2303     else {
2304         std::cout << "ERROR: worstRank == -1" << std::endl;
2305         throw CoinError("worstRank == -1",
2306                         "hubAllocateDonation", "AlpsKnowledgeBrokerMPI");
2307     }
2308 }
2309 
2310 //#############################################################################
2311 
2312 // Hub balances the workload of its workers. Check whether any workers have
2313 // no workload. If so, do quantity balance; if not, do quality balance.
2314 void
2315 AlpsKnowledgeBrokerMPI::hubBalanceWorkers()
2316 {
2317     const double zeroQuantity =
2318         model_->AlpsPar()->entry(AlpsParams::zeroLoad);
2319     if (clusterWorkQuantity_ < zeroQuantity) {
2320         if (msgLevel_ > 100) {
2321             messageHandler()->message(ALPS_LOADBAL_HUB_NO, messages())
2322                 //<< globalRank_ << systemWorkQuantity_ << CoinMessageEol;
2323                 << globalRank_ << clusterWorkQuantity_ << CoinMessageEol;
2324         }
2325         return;
2326     }
2327 
2328     int i;
2329     char* dummyBuf = NULL;
2330 
2331     std::vector<std::pair<double, int> > loadIDVector;
2332     loadIDVector.reserve(hubNum_);
2333     std::multimap<double, int, std::greater<double> > receivers;
2334     std::multimap<double, int> donors;
2335 
2336     const double donorSh      =
2337         model_->AlpsPar()->entry(AlpsParams::donorThreshold);
2338     const double receiverSh   =
2339         model_->AlpsPar()->entry(AlpsParams::receiverThreshold);
2340     const double needWorkThreshold =
2341         model_->AlpsPar()->entry(AlpsParams::needWorkThreshold);
2342     assert(needWorkThreshold > 0.0);
2343 
2344     ++(psStats_.intraBalance_);
2345 
2346     // Identify donors and receivers and decide whether to balance by
2347     // quantity or quality. Do quantity balance if any workers have no work.
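    // Here 'quality' is a worker's best objective bound (smaller is better
    // for minimization; see updateWorkloadInfo()) and 'quantity' is its
    // node count.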
2348     bool quantityBalance = false;
2349 
2350     for (i = 0; i < clusterSize_; ++i) { // NOTE: the hub itself is skipped below
2351 #ifdef NF_DEBUG
2352         std::cout << "Hub["<<globalRank_ <<"] : HUB BALANCE: worker "
2353                   << i << ", quantitity = " << workerWorkQuantities_[i]
2354                   << ", quality = " << workerWorkQualities_[i]
2355                   << ", needWorkThreshold = " << needWorkThreshold << std::endl;
2356 #endif
2357         if ( i == clusterRank_ ) continue;
2358 
2359         if (workerWorkQuantities_[i] <= needWorkThreshold) {
2360             receivers.insert(std::pair<double, int>(workerWorkQualities_[i],
2361                                                     globalRank_ - masterRank_ + i));
2362             quantityBalance = true;
2363         }
2364     }
2365 
2366     if (quantityBalance) {  // Quantity balance
2367         ++(psStats_.quantityBalance_);
2368 
2369         for (i = 0; i < clusterSize_; ++i) {
2370             if ( i == clusterRank_ ) continue;
2371 
2372             if (workerWorkQuantities_[i] > needWorkThreshold) {
2373                 donors.insert(std::pair<double, int>(workerWorkQualities_[i],
2374                                                      globalRank_-masterRank_ + i));
2375             }
2376         }
2377     }
2378     else {   // Quality balance
2379         ++(psStats_.qualityBalance_);
2380 
2381         double averQuality = 0.0;
2382         for (i = 0; i < clusterSize_; ++i) {
2383             if ( i == clusterRank_ ) continue;
2384             averQuality += workerWorkQualities_[i];
2385         }
2386         averQuality /= (clusterSize_ - 1);
2387 
2388         for (i = 0; i < clusterSize_; ++i) {
2389             if ( i == clusterRank_ ) continue;
2390             double diff = workerWorkQualities_[i] - averQuality;
2391             double diffRatio = fabs(diff / averQuality);
2392             if (diff < 0 && diffRatio > donorSh) {  // Donor candidates
2393                 donors.insert(std::pair<double, int>(workerWorkQualities_[i],
2394                                                      globalRank_ -masterRank_ + i));
2395             }
2396             else if (diff > 0 && diffRatio > receiverSh){// Receiver candidates
2397                 receivers.insert(std::pair<double, int>(
2398                                      workerWorkQualities_[i],
2399                                      globalRank_ - masterRank_ + i));
2400             }
2401         }
2402     }
2403 
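    // In the quality case, donors have better-than-average quality (lower
    // value) by more than donorThreshold and receivers are worse than average
    // by more than receiverThreshold; the pairing below matches the best
    // donor with the worst receiver first.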
2404     // Instruct donor workers to send nodes to receiver workers
2405     const int numDonor    = (int) donors.size();
2406     const int numReceiver = (int) receivers.size();
2407     const int numPair     = CoinMin(numDonor, numReceiver);
2408 
2409 #ifdef NF_DEBUG
2410     std::cout << "Hub[" << globalRank_ << "]: num of donors = " << numDonor
2411               << ", num of receivers = " << numReceiver << std::endl;
2412 #endif
2413 
2414     int donorID;
2415     int receiverID;
2416 
2417     std::multimap<double, int, std::greater<double> >::iterator posD =
2418         donors.begin();
2419     std::multimap<double, int>::iterator posR = receivers.begin();
2420 
2421     for(i = 0; i < numPair; ++i) {
2422         donorID = posD->second;
2423         receiverID = posR->second;
2424 
2425 #ifdef NF_DEBUG
2426         std::cout << "Hub["<<globalRank_ <<"] : HUB BALANCE: receiver worker ID = "
2427                   << receiverID << std::endl;
2428         std::cout << "Hub["<<globalRank_ <<"] : donor worker ID = "
2429                   << donorID << std::endl;
2430 #endif
2431 
2432         assert(receiverID < processNum_ && donorID < processNum_);
2433 
2434         ++hubDoBalance_;
2435         if (donorID == globalRank_) { // Hub self is donor
2436             MPI_Status status;
2437             donateWork(dummyBuf, AlpsMsgSubTree, &status,
2438                        receiverID, posR->first);
2439             incSendCount("hubBalanceWorkers");
2440         }
2441         else {
2442             hubAskWorkerDonate(donorID, receiverID, posR->first);
2443         }
2444 
2445         ++posD;
2446         ++posR;
2447     }
2448 }
2449 
2450 //#############################################################################
2451 
2452 // Upon receiving a request for workload from a worker, this hub asks the
2453 // worker with the best work quality to send a subtree to the requestor
2454 void
2455 AlpsKnowledgeBrokerMPI::hubSatisfyWorkerRequest(char*& bufLarge, MPI_Status* status)
2456 {
2457     int i;
2458     int requestorRank;
2459     int pos = 0;
2460     int donorRank = -1;
2461     int donorGlobalRank = -1;      // Global rank
2462     int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
2463 
2464     double bestQuality = ALPS_OBJ_MAX;  // Worst possible value
2465 
2466     //------------------------------------------------------
2467     // Identify the requestor's cluster rank.
2468     //------------------------------------------------------
2469 
2470     int requestor = status->MPI_SOURCE;
2471     requestorRank = requestor % userClusterSize_;
2472 
2473     //------------------------------------------------------
2474     // Find the worker having best quality in this cluster.
2475     //------------------------------------------------------
2476 
2477     for (i = 0; i < clusterSize_; ++i) {
2478         if (i == clusterRank_ || i == requestorRank){
2479             // Skip hub and the requestor.
2480             // TODO: do not skip hub if hub works.
2481             continue;
2482         }
2483         if (workerWorkQualities_[i] < bestQuality) {
2484             bestQuality = workerWorkQualities_[i];
2485             donorRank = i;
2486             donorGlobalRank = globalRank_ - masterRank_ + i;
2487         }
2488     }
2489 
2490     //------------------------------------------------------
2491     // Ask the worker with the best quality to share work with the requestor.
2492     //------------------------------------------------------
2493 
2494     double temp = 0.0;
2495 
2496     if ( (donorGlobalRank != -1) && (donorRank != requestorRank) ) {
2497         // Send the requestor rank and quality to donor.
2498         MPI_Pack(&requestor, 1, MPI_INT, smallBuffer_, size, &pos,
2499                  MPI_COMM_WORLD);
2500         MPI_Pack(&temp, 1, MPI_DOUBLE, smallBuffer_, size, &pos,
2501                  MPI_COMM_WORLD);
2502         MPI_Send(smallBuffer_, pos, MPI_PACKED,
2503                  donorGlobalRank,
2504                  AlpsMsgAskDonateToWorker,
2505                  MPI_COMM_WORLD);
2506 
2507 #ifdef NF_DEBUG_MORE
2508         std::cout << "HUB " << globalRank_ << "ask worker " <<donorGlobalRank
2509                   << "to donate workload with " << requestor << std::endl;
2510 #endif
2511     }
2512     else {
2513         // Failed to find a donor; send an empty buffer to the requestor.
2514         ++(psStats_.donateFail_);
2515 
2516         MPI_Send(smallBuffer_, 0, MPI_PACKED,
2517                  requestor,
2518                  AlpsMsgSubTreeByWorker,
2519                  MPI_COMM_WORLD);
2520         if (msgLevel_ > 100) {
2521             messageHandler()->message(ALPS_LOADBAL_HUB_FAIL, messages())
2522                 << globalRank_ << requestor << CoinMessageEol;
2523         }
2524     }
2525 
2526     incSendCount("hubSatisfyWorkerRequest");
2527 }
2528 
2529 //#############################################################################
2530 
2531 // Report cluster work quality, quantity, and msg counts.
2532 void
2533 AlpsKnowledgeBrokerMPI::hubReportStatus(int tag, MPI_Comm comm)
2534 {
2535     int pos   = 0;
2536     int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
2537 
2538     int receiver;
2539 
2540     if (comm == MPI_COMM_WORLD) {
2541         receiver = masterRank_;
2542     }
2543     else if (comm == hubComm_) {
2544         // NOTE: master's rank is always 0 in hubComm_.
2545         receiver = 0;
2546     }
2547     else {
2548         std::cout << "HUB " << globalRank_
2549                   <<" : Unkown Comm in hubReportStatus" << std::endl;
2550         throw CoinError("Unkown Comm", "hubReportStatus",
2551                         "AlpsKnowledgeBrokerMPI");
2552     }
2553 
2554 #if defined(NF_DEBUG_MORE)
2555     std::cout << "HUB " << globalRank_
2556               << " : quantity = " << clusterWorkQuantity_ << ", ";
2557     for (int i = 0; i < clusterSize_; ++i) {
2558         std::cout << "w[" << globalRank_+i << "]="
2559                   << workerWorkQuantities_[i] << ", ";
2560     }
2561     std::cout << std::endl;
2562     std::cout << "HUB " << globalRank_
2563               << " : quality = " << clusterWorkQuality_ << ", ";
2564     for (int i = 0; i < clusterSize_; ++i) {
2565         std::cout << "w[" << globalRank_+i << "]="
2566                   << workerWorkQualities_[i] << ", ";
2567     }
2568     std::cout << std::endl;
2569 #endif
2570 
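    // Pack order: node count, quality, quantity, send/recv counts,
    // node-processing time, unit work nodes. The receiver must unpack
    // these fields in exactly the same order.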
2571     MPI_Pack(&clusterNodeProcessed_, 1, MPI_INT, smallBuffer_,size,&pos, comm);
2572     MPI_Pack(&clusterWorkQuality_, 1, MPI_DOUBLE, smallBuffer_,size,&pos,comm);
2573     MPI_Pack(&clusterWorkQuantity_, 1, MPI_DOUBLE,smallBuffer_,size,&pos,comm);
2574     MPI_Pack(&clusterSendCount_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
2575     MPI_Pack(&clusterRecvCount_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
2576     MPI_Pack(&nodeProcessingTime_, 1, MPI_DOUBLE, smallBuffer_, size, &pos,
2577              comm);
2578     MPI_Pack(&unitWorkNodes_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
2579 
2580     MPI_Send(smallBuffer_, pos, MPI_PACKED, receiver, tag, comm);
2581 
2582     clusterSendCount_ = clusterRecvCount_ = 0;  // Only count new msg
2583 }
2584 
2585 //#############################################################################
2586 
2587 // After receiving status (in bufLarge) from a worker, a hub updates its
2588 // cluster's status.
2589 void
2590 AlpsKnowledgeBrokerMPI::hubUpdateCluStatus(char*& bufLarge,
2591                                            MPI_Status* status,
2592                                            MPI_Comm comm)
2593 {
2594     int msgSendNum, msgRecvNum;
2595     int position = 0;
2596     int sender;
2597     int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
2598 
2599     if (comm == MPI_COMM_WORLD)
2600         sender = (status->MPI_SOURCE) % userClusterSize_;
2601     else if (comm == clusterComm_)
2602         sender = status->MPI_SOURCE;
2603     else {
2604         std::cout << "unknown sender in hubUpdateCluStatus()" << std::endl;
2605         throw CoinError("unknown sender",
2606                         "hubUpdateCluStatus()", "AlpsKnowledgeBrokerMPI");
2607     }
2608 
2609     workerReported_[sender] = true;
2610 
2611     int preNodeProcessed = workerNodeProcesseds_[sender];
2612     int curNodeProcessed;
2613     //double preQuality = workerWorkQualities_[sender];
2614     double curQuality;
2615     double preQuantity = workerWorkQuantities_[sender];
2616     double curQuantity;
2617     double npTime;
2618 
2619     MPI_Unpack(bufLarge, size, &position, &curNodeProcessed, 1, MPI_INT, comm);
2620     MPI_Unpack(bufLarge, size, &position, &curQuality, 1, MPI_DOUBLE, comm);
2621     MPI_Unpack(bufLarge, size, &position, &curQuantity, 1, MPI_DOUBLE, comm);
2622     MPI_Unpack(bufLarge, size, &position, &msgSendNum, 1, MPI_INT, comm);
2623     MPI_Unpack(bufLarge, size, &position, &msgRecvNum, 1, MPI_INT, comm);
2624     MPI_Unpack(bufLarge, size, &position, &npTime, 1, MPI_DOUBLE, comm);
2625     MPI_Unpack(bufLarge, size, &position, &unitWorkNodes_, 1, MPI_INT, comm);
2626 
2627     workerNodeProcesseds_[sender] = curNodeProcessed;
2628     workerWorkQualities_[sender] = curQuality;
2629     workerWorkQuantities_[sender] = curQuantity;
2630 
2631     clusterNodeProcessed_ -= preNodeProcessed;
2632     clusterNodeProcessed_ += curNodeProcessed;
2633 
2634     clusterWorkQuantity_ -= preQuantity;
2635     clusterWorkQuantity_ += curQuantity;
2636 
2637     if (clusterWorkQuantity_ < ALPS_QUALITY_TOL)
2638         clusterWorkQuality_ = ALPS_OBJ_MAX;
2639     else {
2640 #if 0  // Have problems
2641         // NOTE: no work means quality is ALPS_OBJ_MAX.
2642         if (hubWork_) {
2643             clusterWorkQuality_ *= userClusterSize_;
2644         }
2645         else {
2646             clusterWorkQuality_ *= (userClusterSize_ - 1);
2647         }
2648 
2649         clusterWorkQuality_ -= preQuality;
2650         clusterWorkQuality_ += curQuality;
2651         if (hubWork_) {
2652             clusterWorkQuality_ /= userClusterSize_;
2653         }
2654         else {
2655             clusterWorkQuality_ /= (userClusterSize_ - 1);
2656         }
2657 #endif
2658         clusterWorkQuality_ = std::min(clusterWorkQuality_, curQuality);
2659 
2660     }
2661 
2662     clusterSendCount_ += msgSendNum;
2663     clusterRecvCount_ += msgRecvNum;
2664 
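    // Smooth the reported node-processing time by averaging it into the
    // current estimate, then use it to adapt the master's balance period or
    // this hub's report period.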
2665     if (npTime != ALPS_NODE_PROCESS_TIME && npTime > 1.0e-8) {
2666         if (nodeProcessingTime_ == ALPS_NODE_PROCESS_TIME) {
2667             nodeProcessingTime_ = npTime;
2668         }
2669         else {
2670             nodeProcessingTime_ = 0.5 * (nodeProcessingTime_ + npTime);
2671         }
2672 
2673         if (globalRank_ == masterRank_) {
2674             masterBalancePeriod_ = computeBalancePeriod(userBalancePeriod_,
2675                                                         masterBalancePeriod_,
2676                                                         nodeProcessingTime_);
2677         }
2678         else {
2679             hubReportPeriod_ = computeBalancePeriod(userBalancePeriod_,
2680                                                     hubReportPeriod_,
2681                                                     nodeProcessingTime_);
2682         }
2683 
2684         if (hubMsgLevel_ > 5) {
2685             messageHandler()->message(ALPS_LOADBAL_HUB_PERIOD, messages())
2686                 << globalRank_ << hubReportPeriod_ << CoinMessageEol;
2687         }
2688 
2689 #if 0
2690         std::cout << "HUB[" << globalRank_ << "]: After updateSystem(),"
2691                   << "hubReportPeriod_ =  " << masterBalancePeriod_
2692                   << ", npTime = " << npTime
2693                   << ", nodeProcessingTime_ = " << nodeProcessingTime_
2694                   << std::endl;
2695 #endif
2696 
2697     }
2698 
2699 #if 0
2700     std::cout << "HUB["<< globalRank_ <<"]: after hubUpdateCluStatus(): "
2701               << "curQuality = " << curQuality
2702               << ", sender = " << sender
2703               << ", curQuantity = " << curQuantity
2704               << ", preQuantity = " << preQuantity
2705               << ", clusterWorkQuantity_  = " << clusterWorkQuantity_
2706               << ", clusterWorkQuality_  = " << clusterWorkQuality_
2707               << ", clusterSendCount_ = " << clusterSendCount_
2708               << ", clusterRecvCount_ = " << clusterRecvCount_
2709               << ", clusterSize_ = " << clusterSize_
2710               << ", status->MPI_SOURCE = " << status->MPI_SOURCE
2711               << std::endl;
2712 #endif
2713 
2714 }
2715 
2716 //#############################################################################
2717 
2718 // After receiving the master's request to donate work, this hub finds its
2719 // most loaded worker and asks it to donate some work to another hub, whose
2720 // information (id and load) is in bufLarge.
2721 void
2722 AlpsKnowledgeBrokerMPI::hubsShareWork(char*& bufLarge,
2723                                       MPI_Status* status)
2724 {
2725     int i;
2726     int pos = 0;
2727     int receiverID = 0;
2728     int maxLoadRank = -1;            // Global rank
2729     double maxLoad  = ALPS_DBL_MAX;
2730     int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
2731 
2732     int skipRank;
2733 
2734     double receiverWorkload = 0.0;
2735 
2736     //------------------------------------------------------
2737     // Identify the receiver and its current workload.
2738     //------------------------------------------------------
2739 
2740     MPI_Unpack(bufLarge, size, &pos, &receiverID, 1, MPI_INT, MPI_COMM_WORLD);
2741     MPI_Unpack(bufLarge, size, &pos, &receiverWorkload, 1, MPI_DOUBLE,
2742                MPI_COMM_WORLD);
2743 
2744     //------------------------------------------------------
2745     // Find the donor worker in hub's cluster.
2746     //------------------------------------------------------
2747 
2748     if (hubWork_) {
2749         skipRank = -1;
2750     }
2751     else {
2752         skipRank = clusterRank_;
2753     }
2754 
2755     for (i = 0; i < clusterSize_; ++i) {
2756         if (i != skipRank) {
2757             if ( (workerWorkQuantities_[i] > ALPS_QUALITY_TOL) &&
2758                  (workerWorkQualities_[i] < maxLoad) ) {
2759                 maxLoad = workerWorkQualities_[i];
2760                 maxLoadRank = globalRank_ - masterRank_ + i;
2761             }
2762         }
2763     }
2764 
2765     //------------------------------------------------------
2766     // Ask the donor to share work (send a subtree to receiving hub).
2767     //------------------------------------------------------
2768 
2769     if (maxLoadRank != -1) {
2770 #if 0
2771         std::cout << "HUB[" << globalRank_ << "] : will ask process "
2772                   << maxLoadRank
2773                   << " to donate workload to process " << receiverID
2774                   << std::endl;
2775 #endif
2776 
2777         if(maxLoadRank != globalRank_) { // Not hub
2778             // max loaded is NOT this hub.
2779 
2780             pos = 0;                   // Reset position to pack
2781             MPI_Pack(&receiverID, 1, MPI_INT, smallBuffer_, size,
2782                      &pos, MPI_COMM_WORLD);
2783             MPI_Pack(&receiverWorkload, 1, MPI_DOUBLE, smallBuffer_, size,
2784                      &pos, MPI_COMM_WORLD);
2785             MPI_Send(smallBuffer_, pos, MPI_PACKED, maxLoadRank,
2786                      AlpsMsgAskDonateToHub, MPI_COMM_WORLD);
2787             incSendCount("hubsShareWork");
2788         }
2789         else {
2790             // Max loaded is this hub.
2791             // NOTE: smallBuffer_ is NOT useful in this case.
2792             donateWork(smallBuffer_,
2793                        AlpsMsgSubTreeByMaster,
2794                        status,
2795                        receiverID,
2796                        receiverWorkload);
2797             incSendCount("hubsShareWork");
2798         }
2799     }
2800     else {
2801 #if 0
2802         std::cout << "HUB[" << globalRank_
2803                   << "] :can not find a process to donate workload to process "
2804                   << receiverID << std::endl;
2805 #endif
2806         if (msgLevel_ > 100) {
2807             messageHandler()->message(ALPS_LOADBAL_HUB_FAIL, messages())
2808                 << globalRank_ << receiverID << CoinMessageEol;
2809         }
2810 
2811         // Send an empty msg to the master to reduce masterDoBalance_ by 1.
2812         MPI_Send(smallBuffer_, 0, MPI_PACKED, masterRank_,
2813                  AlpsMsgHubFailFindDonor, MPI_COMM_WORLD);
2814         incSendCount("hubsShareWork");
2815     }
2816 }
2817 
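// A minimal sketch (not part of ALPS) of how the donor worker would read
// the AlpsMsgAskDonateToHub payload packed above. The layout is assumed to
// be [receiverID:int][receiverWorkload:double]; the function and variable
// names are hypothetical.
#if 0
static void unpackDonateRequest(char* buf, int bufSize,
                                int& receiverID, double& receiverWorkload)
{
    int pos = 0;
    // Unpack in the same order the hub packed the fields.
    MPI_Unpack(buf, bufSize, &pos, &receiverID, 1, MPI_INT, MPI_COMM_WORLD);
    MPI_Unpack(buf, bufSize, &pos, &receiverWorkload, 1, MPI_DOUBLE,
               MPI_COMM_WORLD);
}
#endif
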
2818 //#############################################################################
2819 
2820 // Master balances the workload of the hubs. Check whether any hub has no
2821 // workload. If so, do quantity balancing; if not, do quality balancing.
2822 void
2823 AlpsKnowledgeBrokerMPI::masterBalanceHubs()
2824 {
2825     const double zeroQuantity =
2826         model_->AlpsPar()->entry(AlpsParams::zeroLoad);
2827     if (systemWorkQuantity_ < zeroQuantity) {
2828         if (msgLevel_ > 100) {
2829             messageHandler()->message(ALPS_LOADBAL_MASTER_NO, messages())
2830                 << globalRank_ << systemWorkQuantity_ << CoinMessageEol;
2831         }
2832         return;
2833     }
2834 
2835     int i;
2836     int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
2837 
2838     std::vector<std::pair<double, int> > loadIDVector;
2839 
2840     loadIDVector.reserve(hubNum_);
2841 
2842     std::multimap<double, int, std::greater<double> > receivers;
2843     std::multimap<double, int> donors;
2844 
2845     const double donorSh      =
2846         model_->AlpsPar()->entry(AlpsParams::donorThreshold);
2847     const double receiverSh   =
2848         model_->AlpsPar()->entry(AlpsParams::receiverThreshold);
2849 
2850     //------------------------------------------------------
2851     // Identify donors and receivers and decide whether to balance by
2852     // quality or quantity. Balance by quantity immediately if any hub has no work.
2853     //------------------------------------------------------
2854 
2855     ++(psStats_.interBalance_);
2856 
2857     bool quantityBalance = false;
2858     for (i = 0; i < hubNum_; ++i) {
2859         if (hubWorkQuantities_[i] < ALPS_QUALITY_TOL) {    // Has no work
2860 #if 0
2861             std::cout << "++++ master balance: find receiver hub " << hubRanks_[i]
2862                       << "; quantity = " << hubWorkQuantities_[i]
2863                       << std::endl;
2864 #endif
2865             receivers.insert(std::pair<double, int>(hubWorkQualities_[i],
2866                                                     hubRanks_[i]));
2867             quantityBalance = true;
2868         }
2869     }
2870 
2871     if (quantityBalance) {  // Quantity balance
2872         ++(psStats_.quantityBalance_);
2873 
2874         for (i = 0; i < hubNum_; ++i) {
2875             if (hubWorkQuantities_[i] > ALPS_QUALITY_TOL) {
2876 #if 0
2877                 std::cout << "++++ master balance: find donor hub "
2878                           << hubRanks_[i] << "; quantity = "
2879                           << hubWorkQuantities_[i] << std::endl;
2880 #endif
2881                 donors.insert(std::pair<double, int>(hubWorkQualities_[i],
2882                                                      hubRanks_[i]));
2883             }
2884         }
2885     }
2886     else {   // Quality balance
2887         ++(psStats_.qualityBalance_);
2888 
2889         double averQuality  = 0.0;
2890         for (i = 0; i < hubNum_; ++i) {
2891             averQuality += hubWorkQualities_[i];
2892         }
2893         averQuality /= hubNum_;
2894 
2895         for (i = 0; i < hubNum_; ++i) {
2896             double diff = hubWorkQualities_[i] - averQuality;
2897             double diffRatio = fabs(diff / (averQuality + 1.0));
2898             if (diff < 0 && diffRatio > donorSh) {  // Donor candidates
2899 #if 0
2900                 std::cout << "++++ master balance: find donor hub "
2901                           << hubRanks_[i] << "; quality = "
2902                           << hubWorkQualities_[i] << std::endl;
2903 #endif
2904                 donors.insert(std::pair<double, int>(hubWorkQualities_[i],
2905                                                      hubRanks_[i]));
2906             }
2907             else if (diff > 0 && diffRatio > receiverSh){// Receiver candidates
2908 #if 0
2909                 std::cout << "++++ master balance: quality: find receiver hub "
2910                           << hubRanks_[i] << "; quality = "
2911                           << hubWorkQualities_[i] << std::endl;
2912 #endif
2913                 receivers.insert(std::pair<double, int>(hubWorkQualities_[i],
2914                                                         hubRanks_[i]));
2915             }
2916         }
2917     }
2918 
2919     //------------------------------------------------------
2920     // Tell the donor hubs to send subtree to receiver hubs.
2921     //------------------------------------------------------
2922 
2923     const int numDonor    = (int) donors.size();
2924     const int numReceiver = (int) receivers.size();
2925     const int numPair     = CoinMin(numDonor, numReceiver);
2926 
2927 #if 0
2928     std::cout << "Master: donors size = " << numDonor
2929               << ", receiver size = " << numReceiver
2930               << ", numPair = " << numPair << std::endl;
2931 #endif
2932 
2933     int donorID;
2934     int receiverID;
2935 
2936     std::multimap<double, int>::iterator posD = donors.begin();
2937     std::multimap<double, int, std::greater<double> >::iterator posR =
2938         receivers.begin();
2939 
2940     for(i = 0; i < numPair; ++i) {
2941         donorID = posD->second;
2942         receiverID = posR->second;
2943 
2944 #if 0
2945         std::cout << "MASTER : receiver hub ID = " << receiverID
2946                   << "; quality = " << posR->first << std::endl;
2947         std::cout << "MASTER : donor hub ID = " << donorID
2948                   << "; quality = " << posD->first << std::endl;
2949 #endif
2950 
2951         if (CoinAbs(receiverID) >= processNum_ ||
2952             CoinAbs(donorID) >= processNum_) {
2953             std::cout << "ERROR: receiverID = " << receiverID <<  std::endl;
2954             std::cout << "ERROR : donorID = " << donorID << std::endl;
2955             throw
2956                 CoinError("receiverID>=processNum_ || donorID >= processNum_",
2957                           "masterBalanceHubs", "AlpsKnowledgeBrokerMPI");
2958 
2959         }
2960 
2961         if (donorID != masterRank_) {
2962             ++masterDoBalance_;
2963             masterAskHubDonate(donorID, receiverID, posR->first);
2964         }
2965         else {  // donor is the master
2966             double maxLoad = ALPS_DBL_MAX;
2967             int maxLoadRank = -1;
2968             int pos = 0;
2969             double recvLoad = posR->first;
2970 
2971             // Find the donor worker in hub's cluster
2972             for (int k = 0; k < clusterSize_; ++k) {
2973                 if (k != clusterRank_) {
2974                     if (workerWorkQualities_[k] < maxLoad) {
2975                         maxLoad = workerWorkQualities_[k];
2976                         maxLoadRank = globalRank_ - masterRank_ + k;
2977                     }
2978                 }
2979             }
2980 
2981             // FIXME: how much workload to share? Right now just 1 subtree.
2982             // Ask the donor worker to send a subtree to the receiving hub.
2983             if (maxLoadRank != -1) {
2984                 MPI_Pack(&receiverID, 1, MPI_INT, smallBuffer_, size, &pos,
2985                          MPI_COMM_WORLD);
2986                 MPI_Pack(&recvLoad, 1, MPI_DOUBLE, smallBuffer_, size, &pos,
2987                          MPI_COMM_WORLD);
2988 
2989                 incSendCount("masterBalanceHubs()");
2990                 MPI_Send(smallBuffer_, pos, MPI_PACKED, maxLoadRank,
2991                          AlpsMsgAskDonateToHub, MPI_COMM_WORLD);
2992                 ++masterDoBalance_;
2993 
2994 #ifdef NF_DEBUG_MORE
2995                 std::cout << "MASTER : ask its worker " << maxLoadRank
2996                           << " to donate load to hub"
2997                           << receiverID << std::endl;
2998 #endif
2999             }
3000         }
3001         ++posD;
3002         ++posR;
3003     }
3004 }
3005 
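// A minimal standalone sketch (not part of ALPS) of the quality-balance
// test used above. For example, with averQuality = 100 and both thresholds
// at 0.1, a hub of quality 80 has diffRatio = 20/101 > 0.1 and becomes a
// donor (better-than-average work), while a hub of quality 120 becomes a
// receiver. The enum and function names are hypothetical.
#if 0
enum HubRole { kNeither, kDonor, kReceiver };

static HubRole classifyHub(double quality, double averQuality,
                           double donorSh, double receiverSh)
{
    double diff = quality - averQuality;
    double diffRatio = fabs(diff / (averQuality + 1.0));
    if (diff < 0 && diffRatio > donorSh)    return kDonor;    // Better work
    if (diff > 0 && diffRatio > receiverSh) return kReceiver; // Worse work
    return kNeither;
}
#endif
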
3006 //#############################################################################
3007 
3008 // After receiving the status (in bufLarge) of a hub, update the system status.
3009 void
3010 AlpsKnowledgeBrokerMPI::masterUpdateSysStatus(char*& bufLarge,
3011                                               MPI_Status* status,
3012                                               MPI_Comm comm)
3013 {
3014     int position = 0;
3015     int msgSendNum, msgRecvNum;
3016     int sender = -1;
3017     int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
3018 
3019     if (comm == MPI_COMM_WORLD) {
3020         //Be careful with / or %
3021         sender = (int)(status->MPI_SOURCE) / userClusterSize_;
3022     }
3023     else if (comm == hubComm_) {
3024         sender = (int)(status->MPI_SOURCE);
3025     }
3026     else {
3027         std::cout << "unknown COMM in masterUpdateSysStatus"
3028                   << std::endl;
3029         throw CoinError("unknown sender",
3030                         "masterUpdateSysStatus", "AlpsKnowledgeBrokerMPI");
3031     }
3032 
3033     int preNodeProcessed = hubNodeProcesseds_[sender];
3034     int curNodeProcessed = 0;
3035     //double preQuality = hubWorkQualities_[sender];
3036     double curQuality;
3037     double preQuantity = hubWorkQuantities_[sender];
3038     double curQuantity;
3039     double npTime;
3040 
3041     MPI_Unpack(bufLarge, size, &position, &curNodeProcessed, 1, MPI_INT, comm);
3042     MPI_Unpack(bufLarge, size, &position, &curQuality, 1, MPI_DOUBLE, comm);
3043     MPI_Unpack(bufLarge, size, &position, &curQuantity, 1, MPI_DOUBLE, comm);
3044     MPI_Unpack(bufLarge, size, &position, &msgSendNum, 1, MPI_INT, comm);
3045     MPI_Unpack(bufLarge, size, &position, &msgRecvNum, 1, MPI_INT, comm);
3046     MPI_Unpack(bufLarge, size, &position, &npTime, 1, MPI_DOUBLE, comm);
3047     MPI_Unpack(bufLarge, size, &position, &unitWorkNodes_, 1, MPI_INT, comm);
3048 
3049     // Update the hub's status
3050     hubNodeProcesseds_[sender] = curNodeProcessed;
3051     hubWorkQualities_[sender] = curQuality;
3052     hubWorkQuantities_[sender] = curQuantity;
3053 
3054     // Update system status
3055     systemSendCount_ += msgSendNum;
3056     systemRecvCount_ += msgRecvNum;
3057 
3058     systemNodeProcessed_ += curNodeProcessed;
3059     systemNodeProcessed_ -= preNodeProcessed;
3060 
3061     systemWorkQuantity_ += curQuantity;
3062     systemWorkQuantity_ -= preQuantity;
3063 
3064     if (systemWorkQuantity_ < ALPS_QUALITY_TOL) {
3065         systemWorkQuality_ = ALPS_OBJ_MAX;
3066     }
3067     else {
3068         systemWorkQuality_ = std::min(systemWorkQuality_, curQuality);
3069     }
3070 
3071     if ( hubReported_[sender] != true ) {
3072         hubReported_[sender] = true;
3073     }
3074 
3075     int large = ALPS_INT_MAX/10;
3076     if (systemSendCount_ > large || systemRecvCount_ > large) {
3077         int minCount = std::min(systemSendCount_, systemRecvCount_);
3078         systemSendCount_ -= minCount;
3079         systemRecvCount_ -= minCount;
3080     }
3081 
3082     if (npTime != ALPS_NODE_PROCESS_TIME && npTime > 1.0e-10) {
3083         if (nodeProcessingTime_ == ALPS_NODE_PROCESS_TIME) {
3084             nodeProcessingTime_ = npTime;
3085         }
3086         else {
3087             nodeProcessingTime_ = 0.5 * (nodeProcessingTime_ + npTime);
3088         }
3089 
3090         masterBalancePeriod_ = computeBalancePeriod(userBalancePeriod_,
3091                                                     masterBalancePeriod_,
3092                                                     nodeProcessingTime_);
3093 #if 0
3094         std::cout << "MASTER[" << globalRank_ << "]: After updateSystem(),"
3095                   << "masterBalancePeriod_ =  " << masterBalancePeriod_
3096                   << std::endl;
3097 #endif
3098     }
3099 
3100 #ifdef NF_DEBUG
3101     std::cout << "MASTER[" << globalRank_
3102               << "]: After updateSystem() : curQuality "
3103               << curQuality
3104       //<< " preQuality = " << preQuality
3105               << ", hubReported_[" << sender << "] = "
3106               << hubReported_[sender]
3107               << ", systemSendCount_ = " << systemSendCount_
3108               << ", systemRecvCount_ = " << systemRecvCount_
3109               << ", systemWorkQuality_ = " << systemWorkQuality_
3110               << std::endl;
3111 #endif
3112 
3113 }
3114 
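// A minimal sketch (not part of ALPS) of the node-processing-time update
// applied above: the first genuine report replaces the sentinel, and later
// reports are averaged in with equal weight (an EWMA with alpha = 0.5).
// The function name is hypothetical; "unset" mirrors ALPS_NODE_PROCESS_TIME.
#if 0
static double smoothNodeTime(double current, double reported, double unset)
{
    if (current == unset) return reported;  // First real report wins.
    return 0.5 * (current + reported);      // Then average new reports in.
}
#endif
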
3115 //#############################################################################
3116 
3117 // Add the master's and the master cluster's status to the system status.
3118 void
3119 AlpsKnowledgeBrokerMPI::refreshSysStatus()
3120 {
3121     //------------------------------------------------------
3122     // Add master's quantity (0 anyway) to hub 1.
3123     //------------------------------------------------------
3124 
3125     workerWorkQuantities_[masterRank_] = workQuantity_;
3126     clusterWorkQuantity_ += workerWorkQuantities_[masterRank_];
3127 
3128     //------------------------------------------------------
3129     // Add master's node processed num to hub 1.
3130     //------------------------------------------------------
3131 
3132     int preMasterNodeP = workerNodeProcesseds_[masterRank_];
3133     workerNodeProcesseds_[masterRank_] = nodeProcessedNum_;
3134     clusterNodeProcessed_ += workerNodeProcesseds_[masterRank_];
3135     clusterNodeProcessed_ -= preMasterNodeP;
3136 
3137     // Note: Nothing needs to be done about the master's quality.
3138 
3139     //------------------------------------------------------
3140     // Add hub1(master) cluster's quantity into system.
3141     //------------------------------------------------------
3142 
3143     double preHub1QT = hubWorkQuantities_[0];
3144     hubWorkQuantities_[0] = clusterWorkQuantity_;
3145     systemWorkQuantity_  += hubWorkQuantities_[0];
3146     systemWorkQuantity_  -= preHub1QT;
3147 
3148     //------------------------------------------------------
3149     // Add hub1(master) cluster's number of nodes processed into system.
3150     //------------------------------------------------------
3151 
3152     int preHub1NodeP = hubNodeProcesseds_[0];
3153     hubNodeProcesseds_[0] = clusterNodeProcessed_;
3154     systemNodeProcessed_ += hubNodeProcesseds_[0];
3155     systemNodeProcessed_ -= preHub1NodeP;
3156 
3157     //------------------------------------------------------
3158     // Add hub1(master) cluster's quality into system.
3159     //------------------------------------------------------
3160 
3161     hubWorkQualities_[0] = clusterWorkQuality_;
3162     if (systemWorkQuantity_ < ALPS_QUALITY_TOL) {
3163         systemWorkQuality_ = ALPS_OBJ_MAX;
3164     }
3165     else {
3166         systemWorkQuality_ = std::min(systemWorkQuality_,hubWorkQualities_[0]);
3167     }
3168 
3169 #if 0
3170     int i;
3171 
3172     std::cout << "WORKLOAD: ";
3173     for (i = 0; i < clusterSize_; ++i) {
3174         std::cout << "worker[" <<i<< "] = "
3175                   << workerWorkQuantities_[i] << ", ";
3176     }
3177     std::cout << std::endl;
3178     for (i = 0; i < hubNum_; ++i) {
3179         std::cout << "hub[" <<i<< "] = " << hubWorkQuantities_[i]
3180                   << "; ";
3181     }
3182     std::cout << std::endl;
3183 #endif
3184 
3185     //------------------------------------------------------
3186     // Add the master's msg counts to the system.
3187     //------------------------------------------------------
3188 
3189     systemSendCount_ += sendCount_;
3190     systemRecvCount_ += recvCount_;
3191     sendCount_ = recvCount_ = 0;
3192 
3193     //------------------------------------------------------
3194     // Add cluster's msg counts to system.
3195     //------------------------------------------------------
3196 
3197     systemSendCount_ += clusterSendCount_;
3198     systemRecvCount_ += clusterRecvCount_;
3199     clusterSendCount_ = clusterRecvCount_ = 0;
3200 }
3201 
3202 //#############################################################################
3203 
3204 // Add the state of the hub to the cluster state.
3205 void
3206 AlpsKnowledgeBrokerMPI::refreshClusterStatus()
3207 {
3208     //------------------------------------------------------
3209     // Add hub's msg counts into cluster counts.
3210     //------------------------------------------------------
3211 
3212     clusterSendCount_ += sendCount_;
3213     clusterRecvCount_ += recvCount_;
3214     sendCount_ = recvCount_ = 0;        // IMPORTANT!
3215 
3216     //------------------------------------------------------
3217     // Add hub's quantity into cluster.
3218     //------------------------------------------------------
3219 
3220     double preWorkQuantity = workerWorkQuantities_[masterRank_];
3221     workerWorkQuantities_[masterRank_] = workQuantity_;
3222     clusterWorkQuantity_ -= preWorkQuantity;
3223     clusterWorkQuantity_ += workQuantity_;
3224 
3225     //------------------------------------------------------
3226     // Incorporate hub's quality.
3227     //------------------------------------------------------
3228 
3229     workerWorkQualities_[masterRank_] = workQuality_;
3230     clusterWorkQuality_ = std::min(clusterWorkQuality_, workQuality_);
3231 
3232     // Add hub's node processed num into cluster
3233     int PreNodeP = workerNodeProcesseds_[masterRank_];
3234     workerNodeProcesseds_[masterRank_] = nodeProcessedNum_;
3235     clusterNodeProcessed_ += workerNodeProcesseds_[masterRank_];
3236     clusterNodeProcessed_ -= PreNodeP;
3237 
3238 #ifdef NF_DEBUG_MORE
3239     std::cout << "Hub[" << globalRank_ << "]: clusterWorkQuality = "
3240               << clusterWorkQuality_ << std::endl;
3241     for (int i = 0; i < clusterSize_; ++i) {
3242         std::cout << "wQL[" <<i + globalRank_ << "] = "
3243                   << workerWorkQualities_[i] << ", ";
3244     }
3245     std::cout << std::endl;
3246 #endif
3247 }
3248 
3249 //#############################################################################
3250 
3251 // Unpack received incumbent value and process ID from bufLarge.
3252 // Update incumbent value and ID stored on this process.
3253 bool
3254 AlpsKnowledgeBrokerMPI::unpackSetIncumbent(char*& bufLarge, MPI_Status* status)
3255 {
3256     bool accept = false;
3257     int size     =
3258         model_->AlpsPar()->entry(AlpsParams::smallSize);
3259     int position = 0;
3260     double incVal= 0.0;
3261     int incID    = 0;
3262 
3263     // Unpack incumbent value from bufLarge
3264     MPI_Unpack(bufLarge, size, &position, &incVal, 1, MPI_DOUBLE,
3265                MPI_COMM_WORLD);
3266     MPI_Unpack(bufLarge, size, &position, &incID, 1, MPI_INT,
3267                MPI_COMM_WORLD);
3268 
3269     if (incID == globalRank_) {
3270 
3271 #ifdef NF_DEBUG_MORE
3272         std::cout << "PROC " << globalRank_
3273                   << " : unpack and ignore incVal = " << incVal << " at PROC"
3274                   << incID << ", since I am " << incID
3275                   << std::endl;
3276 #endif
3277         return false;  // Do nothing
3278     }
3279 
3280     // Assume minimization
3281     if (incVal < incumbentValue_) {
3282         // Better solution
3283         ++solNum_;
3284         incumbentValue_ = incVal;
3285         incumbentID_ = incID;
3286         accept = true;
3287 
3288         if (globalRank_ == masterRank_) {
3289             bestSolNode_ = systemNodeProcessed_;
3290         }
3291 
3292 #ifdef NF_DEBUG_MORE
3293         std::cout << "PROC " << globalRank_ << " : accept incVal = "
3294                   << incVal << " from PROC " << incID << std::endl;
3295 #endif
3296 
3297         //updateIncumbent_ = true;        // The incumbent value is updated.
3298     }
3299     else if(incVal == incumbentValue_ ) {
3300         ++solNum_;
3301         if (incID < incumbentID_) {     // So that all processes are consistent
3302             incumbentValue_ = incVal;
3303             incumbentID_ = incID;
3304             accept = true;
3305         }
3306         else{
3307             accept = false;
3308 
3309 #ifdef NF_DEBUG_MORE
3310             std::cout << "PROC " << globalRank_
3311                       << " : recv but discard incVal = " << incVal
3312                       << " from PROC (SAME) " << incID << std::endl;
3313 #endif
3314         }
3315     }
3316     else {   // >
3317         accept = false;
3318 
3319 #ifdef NF_DEBUG_MORE
3320         std::cout << "PROC " << globalRank_
3321                   << " : recv but discard incVal = " << incVal
3322                   << " from PROC (WORSE)" << incID << std::endl;
3323 #endif
3324     }
3325 
3326     return accept;
3327 }
3328 
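// A minimal sketch (not part of ALPS) of the acceptance rule above,
// assuming minimization: a strictly better value always wins, and ties are
// broken toward the smaller process rank so that every process settles on
// the same incumbent. The function name is hypothetical.
#if 0
static bool acceptIncumbent(double incVal, int incID,
                            double curVal, int curID)
{
    if (incVal < curVal) return true;                   // Strictly better
    if (incVal == curVal && incID < curID) return true; // Tie: lower rank
    return false;
}
#endif
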
3329 //#############################################################################
3330 
3331 void
3332 AlpsKnowledgeBrokerMPI::workerReportStatus(int tag,
3333                                            MPI_Comm comm)
3334 {
3335     int pos = 0;
3336     int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
3337 
3338     int receiver;
3339 
3340     if (comm == MPI_COMM_WORLD)	{
3341         receiver = myHubRank_;
3342     }
3343     else if (comm == clusterComm_) {
3344         receiver = masterRank_;
3345     }
3346     else {
3347         std::cout << "WORKER " << globalRank_
3348                   <<" : Unkown Comm in workerReportStatus" << std::endl;
3349         throw CoinError("Unkown Comm",
3350                         "workerReportStatus",
3351                         "AlpsKnowledgeBrokerMPI");
3352     }
3353 
3354     MPI_Pack(&nodeProcessedNum_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
3355     MPI_Pack(&workQuality_, 1, MPI_DOUBLE, smallBuffer_, size, &pos, comm);
3356     MPI_Pack(&workQuantity_, 1, MPI_DOUBLE, smallBuffer_, size, &pos, comm);
3357     MPI_Pack(&sendCount_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
3358     MPI_Pack(&recvCount_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
3359     MPI_Pack(&nodeProcessingTime_, 1, MPI_DOUBLE, smallBuffer_, size, &pos,
3360              comm);
3361     MPI_Pack(&unitWorkNodes_, 1, MPI_INT, smallBuffer_, size, &pos, comm);
3362 
3363     MPI_Send(smallBuffer_, pos, MPI_PACKED, receiver, tag, comm);
3364 
3365 #if 0
3366     std::cout << "WORKER " << globalRank_
3367               << " : report quality = " << workQuality_
3368               << " : report quantity = " << workQuantity_
3369               << " to hub "
3370               << myHubRank_ << "; Tag = " << tag << "; sendCount_ = "
3371               << sendCount_ << "; recvCount_ = " << recvCount_
3372               << "; unitWorkNodes_ = " << unitWorkNodes_
3373               << std::endl;
3374 #endif
3375 
3376     sendCount_ = recvCount_ = 0;      // Only count new message
3377 }
3378 
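// A minimal sketch (not part of ALPS) of the status record serialized
// above; the status-update routines, such as masterUpdateSysStatus() above,
// unpack the same fields in the same order. The struct is purely
// illustrative and does not exist in ALPS.
#if 0
struct WorkerStatusMsg {
    int    nodeProcessedNum;    // Nodes processed so far
    double workQuality;         // Best quality among local nodes
    double workQuantity;        // Estimated amount of remaining work
    int    sendCount;           // Messages sent since the last report
    int    recvCount;           // Messages received since the last report
    double nodeProcessingTime;  // Average time to process one node
    int    unitWorkNodes;       // Nodes making up one unit of work
};
#endif
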
3379 //#############################################################################
3380 
3381 void
3382 AlpsKnowledgeBrokerMPI::workerRecvIndices(char *&bufLarge)
3383 {
3384     int position = 0;
3385     int size = 100;
3386     int nextIndex;
3387     int maxIndex;
3388 
3389     MPI_Unpack(bufLarge, size, &position, &nextIndex, 1, MPI_INT,
3390                MPI_COMM_WORLD);
3391     MPI_Unpack(bufLarge, size, &position, &maxIndex, 1, MPI_INT,
3392                MPI_COMM_WORLD);
3393 
3394 #if 0
3395     std::cout << "++++ worker[" << globalRank_
3396               << "] received indices from master"
3397               << ", nextIndex = " << nextIndex
3398               << ", maxIndex = " << maxIndex
3399               << std::endl;
3400 #endif
3401 
3402     if (nextIndex < 0 || maxIndex < 0 || nextIndex > maxIndex) {
3403         haltSearch_ = true;
3404     }
3405     else {
3406         haltSearch_ = false;
3407         setNextNodeIndex(nextIndex);
3408         setMaxNodeIndex(maxIndex);
3409     }
3410 }
3411 
3412 //#############################################################################
3413 
3414 void
3415 AlpsKnowledgeBrokerMPI::workerAskIndices()
3416 {
3417     int position = 0;
3418     int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
3419 
3420 #if 0
3421     std::cout << "++++ worker[" << globalRank_ << "] ask indices from master."
3422               << std::endl;
3423 #endif
3424 
3425     MPI_Pack(&globalRank_, 1, MPI_INT, smallBuffer_, size, &position,
3426              MPI_COMM_WORLD);
3427     MPI_Send(smallBuffer_, position, MPI_PACKED, masterRank_,
3428              AlpsMsgWorkerAskIndices, MPI_COMM_WORLD);
3429     incSendCount("workerAskIndices()");
3430 }
3431 
3432 //#############################################################################
3433 
3434 void
3435 AlpsKnowledgeBrokerMPI::masterSendIndices(char *&bufLarge)
3436 {
3437     int nextIndex = getNextNodeIndex();
3438     int maxIndex = getMaxNodeIndex();
3439     int leftIndex = maxIndex - nextIndex;
3440 
3441     int pos = 0;
3442     int size = 100;
3443     int recvWorker = -1;
3444 
3445     if (leftIndex >  masterIndexBatch_) {
3446         maxIndex = nextIndex + masterIndexBatch_;
3447         setNextNodeIndex(maxIndex + 1);
3448     }
3449     else if (leftIndex > 100) {
3450         setNextNodeIndex(maxIndex);
3451     }
3452     else {
3453         // No index
3454         forceTerminate_ = true;
3455         nextIndex = -1;
3456         maxIndex = -1;
3457     }
3458 
3459     // Unpack the rank of worker.
3460     MPI_Unpack(bufLarge, size, &pos, &recvWorker, 1, MPI_INT, MPI_COMM_WORLD);
3461 
3462 #if 0
3463     std::cout << "++++ master[" << globalRank_ << "] send indices to worker "
3464               << recvWorker << ", nextIndex = " << nextIndex
3465               << ", maxIndex = " << maxIndex
3466               << std::endl;
3467 #endif
3468 
3469     // Pack nextIndex and maxIndex into small buffer and send it.
3470     pos = 0;
3471     MPI_Pack(&nextIndex, 1, MPI_INT, smallBuffer_, size, &pos,
3472              MPI_COMM_WORLD);
3473     MPI_Pack(&maxIndex, 1, MPI_INT, smallBuffer_, size, &pos,
3474              MPI_COMM_WORLD);
3475     MPI_Send(smallBuffer_, pos, MPI_PACKED, recvWorker,
3476              AlpsMsgIndicesFromMaster, MPI_COMM_WORLD);
3477     incSendCount("masterSendIndices");
3478 }
3479 
3480 //#############################################################################
3481 
3482 void
3483 AlpsKnowledgeBrokerMPI::broadcastModel(const int id, const int sender)
3484 {
3485     char* modelBuffer = 0;
3486     int size = -1;
3487     int position = 0;  // Initialize to 0
3488     AlpsEncoded* encodedModel = NULL;
3489 
3490     //------------------------------------------------------
3491     // Encode the model and pack it into modelBuffer.
3492     //------------------------------------------------------
3493 
3494     if (id == sender) {
3495         // Pack model
3496         encodedModel = model_->encode();
3497         packEncoded(encodedModel, modelBuffer, size, position, MPI_COMM_WORLD);
3498 
3499 #ifdef NF_DEBUG
3500         std::cout << "MASTER: packed model. size = "
3501                   << size << std::endl;
3502 #endif
3503     }
3504 
3505     //------------------------------------------------------
3506     // Broadcast the size of the model buffer.
3507     //------------------------------------------------------
3508 
3509     // Broadcast modelBuffer size first
3510     MPI_Bcast(&size, 1, MPI_INT, sender, MPI_COMM_WORLD);
3511 
3512     if (id != sender) {
3513         // Processes other than the sender receive the size of the model.
3514         if (size <= 0) {
3515             throw CoinError("Msg size <= 0",
3516                             "broadcastModel",
3517                             "AlpsKnowledgeBrokerMPI");
3518         }
3519         if( modelBuffer != NULL ) {
3520             delete [] modelBuffer;
3521             modelBuffer = NULL;
3522         }
3523         modelBuffer = new char[size + 100];
3524     }
3525 
3526     //------------------------------------------------------
3527     // Broadcast the model.
3528     //------------------------------------------------------
3529 
3530     MPI_Bcast(modelBuffer, size, MPI_CHAR, sender, MPI_COMM_WORLD);
3531 
3532     if (id != sender) {
3533 
3534         //--------------------------------------------------
3535         // Unpack to encoded model.
3536         //--------------------------------------------------
3537 
3538         position = 0;
3539         encodedModel = unpackEncoded(modelBuffer,
3540                                      position,
3541                                      MPI_COMM_WORLD,
3542                                      size+100);
3543 
3544 #ifdef NF_DEBUG
3545         std::cout << "PROCESS[" <<id<< "]: start to decode model"
3546                   << ", knowledge type = "<< encodedModel->type() << std::endl;
3547 #endif
3548 
3549         //--------------------------------------------------
3550         // Note: AlpsDataPool has an application model; there is no need to
3551         // create more than one model. Just fill in the model data.
3552         //--------------------------------------------------
3553 
3554         model_->decodeToSelf(*encodedModel);
3555 
3556 #ifdef NF_DEBUG
3557         std::cout << "PROCESS[" <<id<< "]: finished decoding model."
3558                   << std::endl;
3559 #endif
3560 
3561         //--------------------------------------------------
3562         // Set up self.
3563         //--------------------------------------------------
3564 
3565         //(modifyDataPool()->getModel())->setupSelf();
3566 
3567         model_->setupSelf();
3568     }
3569 
3570     if (modelBuffer) {
3571         delete [] modelBuffer;
3572         modelBuffer = 0;
3573     }
3574     if (encodedModel) {
3575         delete encodedModel;
3576         encodedModel = 0;
3577     }
3578 }
3579 
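// A minimal standalone sketch (not part of ALPS) of the two-step broadcast
// pattern used above: first broadcast the byte count so receivers can size
// their buffers, then broadcast the packed bytes themselves. The function
// and variable names are hypothetical.
#if 0
static char* bcastSizedBuffer(char* buf, int& size, int root, int myRank)
{
    MPI_Bcast(&size, 1, MPI_INT, root, MPI_COMM_WORLD);
    if (myRank != root) {
        buf = new char[size];   // Receivers allocate once the size is known.
    }
    MPI_Bcast(buf, size, MPI_CHAR, root, MPI_COMM_WORLD);
    return buf;
}
#endif
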
3580 //#############################################################################
3581 
3582 void
3583 AlpsKnowledgeBrokerMPI::sendIncumbent()
3584 {
3585     int position = 0;
3586     int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
3587 
3588     int mySeq = rankToSequence(incumbentID_, globalRank_);
3589     int leftSeq = leftSequence(mySeq, processNum_);
3590     int rightSeq = rightSequence(mySeq, processNum_);
3591 
3592     if (mySeq >= processNum_ || leftSeq >= processNum_
3593         || rightSeq >= processNum_) {
3594         std::cout << "Sequence is wrong!" << std::endl;
3595         abort();
3596     }
3597 
3598     MPI_Pack(&incumbentValue_, 1, MPI_DOUBLE, smallBuffer_, size, &position,
3599              MPI_COMM_WORLD);
3600     MPI_Pack(&incumbentID_, 1, MPI_INT, smallBuffer_, size, &position,
3601              MPI_COMM_WORLD);
3602 
3603     if (leftSeq != -1) {
3604         int leftRank = sequenceToRank(incumbentID_, leftSeq);
3605 #if 0
3606         std::cout << "PROC " <<  globalRank_
3607                   <<" : init send a solution - L,  value = "
3608                   << incumbentValue_ << " to "<< leftRank << std::endl;
3609 #endif
3610         MPI_Isend(smallBuffer_, position, MPI_PACKED, leftRank,
3611                   AlpsMsgIncumbentTwo, MPI_COMM_WORLD, &forwardRequestL_);
3612     }
3613 
3614     if (rightSeq != -1) {
3615         int rightRank = sequenceToRank(incumbentID_, rightSeq);
3616 #if 0
3617         std::cout << "PROC " <<  globalRank_
3618                   <<" : init send a solution - R,  value = "
3619                   << incumbentValue_ << " to "<< rightRank << std::endl;
3620 #endif
3621         MPI_Isend(smallBuffer_, position, MPI_PACKED, rightRank,
3622                   AlpsMsgIncumbentTwo, MPI_COMM_WORLD, &forwardRequestR_);
3623     }
3624 
3625     if (leftSeq != -1) {
3626         MPI_Status sentStatusL;
3627         MPI_Wait(&forwardRequestL_, &sentStatusL);
3628         incSendCount("sendIncumbent()");
3629     }
3630 
3631     if (rightSeq != -1) {
3632         MPI_Status sentStatusR;
3633         MPI_Wait(&forwardRequestR_, &sentStatusR);
3634         incSendCount("sendIncumbent()");
3635     }
3636 }
3637 
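// A minimal sketch (not part of ALPS) of the broadcast tree assumed above:
// ranks are renumbered so the incumbent's owner is sequence 0, and each
// sequence s is assumed to forward to its heap children 2s+1 and 2s+2 when
// they exist (compare rankToSequence()/sequenceToRank() near the top of
// this file). The function names are hypothetical.
#if 0
static int leftChildSequence(int sequence, int numProcesses)
{
    int left = 2 * sequence + 1;
    return (left < numProcesses) ? left : -1;    // -1 means no child.
}

static int rightChildSequence(int sequence, int numProcesses)
{
    int right = 2 * sequence + 2;
    return (right < numProcesses) ? right : -1;  // -1 means no child.
}
#endif
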
3638 //#############################################################################
3639 
3640 void
3641 AlpsKnowledgeBrokerMPI::collectBestSolution(int destination)
3642 {
3643     MPI_Status status;
3644     int sender = incumbentID_;
3645     int position = 0;
3646 
3647 #ifdef NF_DEBUG
3648     std::cout << "CollectBestSolution: sender=" << sender
3649               << ", destination=" << destination << std::endl;
3650 #endif
3651 
3652     if ( sender == destination ) {
3653         // DO NOTHING since the best solution is already in its solution pool.
3654     }
3655     else {
3656         if (globalRank_ == sender) {                 // Send solu
3657             char* senderBuf = NULL;  // MUST init to NULL, otherwise
3658             int size = -1;           // packEncoded(..) crashes.
3659 
3660             const AlpsSolution* solu = static_cast<const AlpsSolution* >
3661                 (getBestKnowledge(AlpsKnowledgeTypeSolution).first);
3662 
3663             double value = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
3664 
3665             AlpsEncoded* enc = solu->encode();
3666             packEncoded(enc, senderBuf, size, position, MPI_COMM_WORLD);
3667 
3668             sendSizeBuf(senderBuf, size, position, destination,
3669                         AlpsMsgIncumbent, MPI_COMM_WORLD);
3670             MPI_Send(&value, 1, MPI_DOUBLE, destination, AlpsMsgIncumbent,
3671                      MPI_COMM_WORLD);
3672             if(senderBuf) {
3673                 delete [] senderBuf;
3674                 senderBuf = NULL;
3675             }
3676 
3677             delete enc;
3678             enc = NULL;
3679 
3680 #ifdef NF_DEBUG
3681             std::cout << "CollectBestSolution: sender " << sender
3682                       << " sent solution " << value << std::endl;
3683 #endif
3684         }
3685         else if (globalRank_ == destination) {            // Recv solu
3686             double value = 0.0;
3687             char* destBuf = NULL;  //new char [ls];
3688 
3689             receiveSizeBuf(destBuf, sender, AlpsMsgIncumbent,
3690                            MPI_COMM_WORLD, &status);
3691             MPI_Recv(&value, 1, MPI_DOUBLE, sender, AlpsMsgIncumbent,
3692                      MPI_COMM_WORLD, &status);
3693 
3694             position = 0;
3695             AlpsEncoded* encodedSolu = unpackEncoded(destBuf,
3696                                                      position,
3697                                                      MPI_COMM_WORLD);
3698             AlpsSolution* bestSolu = static_cast<AlpsSolution* >
3699                 ( decoderObject(encodedSolu->type())->decode(*encodedSolu) );
3700 
3701             addKnowledge(AlpsKnowledgeTypeSolution, bestSolu, value);
3702 
3703             if(destBuf) {
3704                 delete [] destBuf;
3705                 destBuf = NULL;
3706             }
3707             if (encodedSolu) {
3708                 delete encodedSolu;
3709                 encodedSolu = NULL;
3710             }
3711 #ifdef NF_DEBUG
3712             std::cout << "CollectBestSolution: destination " <<destination
3713                       << " received solution " << value << std::endl;
3714 #endif
3715         }
3716     }
3717 }
3718 
3719 //#############################################################################
3720 
3721 void
3722 AlpsKnowledgeBrokerMPI::tellMasterRecv()
3723 {
3724     char* dummyBuf = 0;
3725     MPI_Send(dummyBuf, 0, MPI_CHAR, masterRank_, AlpsMsgTellMasterRecv,
3726              MPI_COMM_WORLD);
3727     incSendCount("tellMasterRecv()");
3728 }
3729 
3730 //#############################################################################
3731 
3732 void
3733 AlpsKnowledgeBrokerMPI::tellHubRecv()
3734 {
3735     char* dummyBuf = 0;
3736     if (globalRank_ == myHubRank_)
3737         --hubDoBalance_;
3738     else {
3739         MPI_Send(dummyBuf, 0, MPI_CHAR, myHubRank_, AlpsMsgTellHubRecv,
3740                  MPI_COMM_WORLD);
3741         incSendCount("tellHubRecv()");
3742     }
3743 }
3744 
3745 //#############################################################################
3746 
3747 void
3748 AlpsKnowledgeBrokerMPI::packEncoded(AlpsEncoded* enc,
3749                                     char*& packBuffer,
3750                                     int& size,
3751                                     int& position,
3752                                     MPI_Comm comm)
3753 {
3754     const int bufSpare = model_->AlpsPar()->entry(AlpsParams::bufSpare);
3755 
3756     int type = static_cast<int>(enc->type());
3757     int repSize = static_cast<int>(enc->size());
3758 
3759     if(!packBuffer) {
3760         size = static_cast<int>(repSize + 2*sizeof(int) + bufSpare);
3761         packBuffer = new char[size];
3762     }
3763 
3764     // Pack type_, repSize, and representation_ of enc.
3765     MPI_Pack(&type, 1, MPI_INT, packBuffer, size, &position, comm);
3766     MPI_Pack(&repSize, 1, MPI_INT, packBuffer, size, &position, comm);
3767     //MPI_Pack(const_cast<char*>(type), typeSize, MPI_CHAR,
3768     //packBuffer, size, &position, comm);
3769     MPI_Pack(const_cast<char*>(enc->representation()), repSize, MPI_CHAR,
3770              packBuffer, size, &position, comm);
3771 }
3772 
3773 //#############################################################################
3774 
3775 AlpsEncoded*
3776 AlpsKnowledgeBrokerMPI::unpackEncoded(char*& unpackBuffer,
3777                                       int& position,
3778                                       MPI_Comm comm,
3779                                       int size)
3780 {
3781     int type, repSize;
3782     AlpsEncoded *encoded = NULL;
3783 
3784     if (size <= 0) {
3785         size = largeSize_;
3786     }
3787 
3788     // Unpack type, repSize, and representation from unpackBuffer.
3789     MPI_Unpack(unpackBuffer, size, &position, &type, 1, MPI_INT, comm);
3790 
3791 #if defined(NF_DEBUG_MORE)
3792     std::cout << "PROC "<< globalRank_ <<" : type is "
3793               << type << ";\tsize = "<< size <<  std::endl;
3794 #endif
3795 
3796     MPI_Unpack(unpackBuffer, size, &position, &repSize, 1, MPI_INT, comm);
3797 
3798     char *rep = new char[repSize + 1];
3799     MPI_Unpack(unpackBuffer, size, &position, rep, repSize, MPI_CHAR, comm);
3800     rep[repSize] = '\0';
3801 
3802 #if defined(NF_DEBUG_MORE)
3803     std::cout << "PROC "<< globalRank_ << ": type = " << type
3804               << "; repSize = " << repSize << std::endl;
3805 #endif
3806 
3807     // NOTE: Take over the memory of rep, but not type.
3808     encoded = new AlpsEncoded(type, repSize, rep );
3809 
3810     return encoded;
3811 }
3812 
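// A minimal sketch (not part of ALPS) of the wire format written by
// packEncoded() and read back by unpackEncoded(): a knowledge-type tag,
// the representation length, then the raw representation bytes. The struct
// is purely illustrative.
#if 0
struct EncodedWireFormat {
    int type;      // AlpsKnowledge type tag, packed first
    int repSize;   // Number of representation bytes that follow
    // char rep[repSize];  // Raw bytes; the reader appends a trailing '\0'
};
#endif
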
3813 //#############################################################################
3814 
3815 void
3816 AlpsKnowledgeBrokerMPI::receiveSizeBuf(char*& buf,
3817                                        int sender,
3818                                        int tag,
3819                                        MPI_Comm comm,
3820                                        MPI_Status* status)
3821 {
3822     int size = -1;
3823 
3824     // First recv the msg size
3825     MPI_Recv(&size, 1, MPI_INT, sender, MPI_ANY_TAG, comm, status);
3826     if (status->MPI_TAG == AlpsMsgFinishInit)
3827         return;
3828 
3829     if (size < 0) {
3830         throw CoinError("size < 0", "receiveSizeBuf",
3831                         "AlpsKnowledgeBrokerMPI");
3832     }
3833 
3834     // Second, allocate memory for buf
3835     if (buf != 0) {
3836         delete [] buf;  buf = 0;
3837     }
3838     buf = new char [size];
3839 
3840     // Third, receive MPI packed data and put in buf
3841     MPI_Recv(buf, size, MPI_PACKED, sender, tag, comm, status);
3842 }
3843 
3844 //#############################################################################
3845 
3846 // Blocking receive of a node, which is stored in rampUpSubTree_.
3847 // Used during ramp-up.
3848 void
3849 AlpsKnowledgeBrokerMPI::receiveRampUpNode(int sender,
3850                                           MPI_Comm comm,
3851                                           MPI_Status* status)
3852 {
3853     char* buf = 0;
3854     int position = 0;
3855 
3856     receiveSizeBuf(buf, sender, AlpsMsgNode, comm, status);
3857 
3858     if (status->MPI_TAG == AlpsMsgFinishInit ) {
3859         //std::cout << "PROC: " << globalRank_
3860         //        <<" : rec AlpsMsgFinishInit... STOP INIT." << std::endl;
3861     }
3862     else if (status->MPI_TAG == AlpsMsgNode) {
3863         AlpsEncoded* encodedNode = unpackEncoded(buf, position, comm);
3864 
3865 #ifdef NF_DEBUG
3866         std::cout << "WORKER: received and unpacked a node." << std::endl;
3867         std::cout << "WORKER: type() is " << encodedNode->type() << std::endl;
3868         std::cout << "WORKER: finish unpacking node." << std::endl;
3869         std::cout << "WORKER: start to decode node." << std::endl;
3870 
3871         //    const AlpsTreeNode* bugNode = dynamic_cast<const AlpsTreeNode* >(
3872         //        AlpsKnowledge::decoderObject(encodedNode->type()));
3873         const AlpsTreeNode* bugNode = dynamic_cast<const AlpsTreeNode* >
3874             ( decoderObject(encodedNode->type()) );
3875 
3876         std::cout <<"WORKER: bugNode's Priority = "
3877                   << bugNode->getQuality() << std::endl;
3878         std::cout << "WORKER: finish just decoding node." << std::endl;
3879 #endif
3880 
3881         AlpsTreeNode* node = dynamic_cast<AlpsTreeNode* >
3882             ( decoderObject(encodedNode->type())->decode(*encodedNode) );
3883 
3884 #ifdef NF_DEBUG_MORE
3885         std::cout << "WORKER " << globalRank_ << " : received a node from "
3886                   << sender << std::endl;
3887 
3888 #endif
3889         // Make the node a subtree root, then add it to the
3890         // local node pool.
3891 
3892         //node->setSubTree(st);
3893         node->setBroker(this);
3894         // todo(aykut): node desc does not hold a pointer to the model;
3895         // it holds a pointer to the broker.
3896         //node->modifyDesc()->setModel(model_);
3897         node->setParent(NULL);
3898 
3899         // Do not want to do this, parent index is unique.
3900         //node->setParentIndex(-1);
3901 
3902         // Cannot do this since there are other statuses.
3903         // node->setStatus(AlpsNodeStatusCandidate);
3904 
3905         rampUpSubTree_->nodePool()->addKnowledge(node, node->getQuality());
3906         assert(rampUpSubTree_->getNumNodes() > 0);
3907         if ( (rampUpSubTree_->nodePool()->getNumKnowledges() ) == 1) {
3908             // Make the first node the root.
3909             rampUpSubTree_->setRoot(node);
3910         }
3911 
3912         if (encodedNode) {
3913             delete encodedNode;
3914             encodedNode = 0;
3915         }
3916         node = NULL;
3917     }
3918     else {
3919         std::cout << "PROC " << globalRank_ << " : receiveRampUpNode"
3920                   << " : received UNKNOWN message: tag = " << status->MPI_TAG
3921                   << "; sender = " << status->MPI_SOURCE <<std::endl;
3922         throw CoinError("Unknown message type",
3923                         "receiveRampUpNode()", "AlpsKnowledgeBrokerMPI");
3924     }
3925 
3926     if (buf != 0) {
3927         delete [] buf;
3928         buf = 0;
3929     }
3930 
3931 }
3932 
3933 //#############################################################################
3934 
3935 void
3936 AlpsKnowledgeBrokerMPI::receiveSubTree(char*& bufLarge,
3937                                        int sender,
3938                                        MPI_Status* status)
3939 {
3940 #ifdef NF_DEBUG_MORE
3941     std::cout << "WORKER " << globalRank_
3942               << " : start to receive a subtree from " << sender
3943               << "; num of subtrees = "
3944               << getNumKnowledges(AlpsKnowledgeTypeSubTree)
3945               << "; hasKnowledge() = "
3946               << subTreePool_->hasKnowledge() <<std::endl;
3947 #endif
3948 
3949     int count = 0;
3950     int position = 0;
3951 
3952     MPI_Get_count(status, MPI_PACKED, &count);
3953 
3954     if (count <= 0) {
3955 
3956 #ifdef NF_DEBUG_MORE
3957         std::cout << "PROC " << globalRank_
3958                   << " : ask for a subtree but receive nothing from "
3959                   << status->MPI_SOURCE << std::endl;
3960 #endif
3961         return;
3962     }
3963 
3964     if (status->MPI_TAG == AlpsMsgSubTree ||
3965         status->MPI_TAG == AlpsMsgSubTreeByMaster ||
3966         status->MPI_TAG == AlpsMsgSubTreeByWorker) {
3967 
3968         AlpsEncoded* encodedST = unpackEncoded(bufLarge,
3969                                                position,
3970                                                MPI_COMM_WORLD);
3971         AlpsSubTree* tempST = dynamic_cast<AlpsSubTree*>
3972             (const_cast<AlpsKnowledge *>
3973              (decoderObject(AlpsKnowledgeTypeSubTree)))->
3974             newSubTree();
3975 
3976         tempST->setBroker(this);
3977         tempST->setNodeSelection(nodeSelection_);
3978 
3979         AlpsSubTree* subTree =
3980             dynamic_cast<AlpsSubTree* >(tempST->decode(*encodedST) );
3981 
3982         assert(subTree->getNumNodes() > 0);
3983 
3984         subTree->calculateQuality();
3985         addKnowledge(AlpsKnowledgeTypeSubTree, subTree, subTree->getQuality());
3986 
3987 
3988 #if 0
3989         std::cout << "WORKER " << globalRank_ << " : received a subtree from "
3990                   << sender << " of size " << count
3991                   << "; num of subtrees = "
3992                   << getNumKnowledges(AlpsKnowledgeTypeSubTree) <<std::endl;
3993 #endif
3994 
3995         if (encodedST) {
3996             delete encodedST;
3997             encodedST = 0;
3998         }
3999         if (tempST) {
4000             delete tempST;
4001             tempST = 0;
4002         }
4003         subTree = 0;
4004 
4005     }
4006     else {
4007         std::cout << "PROC " << globalRank_ << " : receiveSubTree"
4008                   <<" : received UNKNOWN message, tag = " << status->MPI_TAG
4009                   << ", source = " << status->MPI_SOURCE <<std::endl;
4010         throw CoinError("Unknown message type",
4011                         "receiveSubTree()", "AlpsKnowledgeBrokerMPI");
4012     }
4013 
4014 }
4015 
4016 //#############################################################################
4017 
4018 void
4019 AlpsKnowledgeBrokerMPI::sendSizeBuf(char*& buf,
4020                                     int size,
4021                                     int position,
4022                                     const int target,
4023                                     const int tag,
4024                                     const MPI_Comm comm)
4025 {
4026     // If packed successfully, send the buf size and buf to the target.
4027     if (size >= 0) {
4028         MPI_Send(&size, 1, MPI_INT, target, AlpsMsgSize, comm);
4029         MPI_Send(buf, position, MPI_PACKED, target, tag, comm);
4030     }
4031     else
4032         throw CoinError("Msg size is < 0", "send", "AlpsKnowledgeBrokerMPI");
4033 }
4034 
4035 //#############################################################################
4036 
4037 // Send a node from rampUpSubTree's node pool.
4038 // NOTE: comm can be hubComm_ or clusterComm_.
4039 void
4040 AlpsKnowledgeBrokerMPI::sendRampUpNode(const int receiver, MPI_Comm comm)
4041 {
4042     char* buf = 0;
4043     int size = 0;
4044     int position = 0;
4045 
4046     AlpsTreeNode* node = dynamic_cast<AlpsTreeNode* >
4047         (rampUpSubTree_->nodePool()->getKnowledge().first);
4048 
4049     AlpsEncoded* enc = node->encode();
4050 
4051     rampUpSubTree_->nodePool()->popKnowledge();
4052 
4053     delete node;   // Since sending to other process
4054 
4055     packEncoded(enc, buf, size, position, comm);
4056     sendSizeBuf(buf, size, position, receiver, AlpsMsgNode, comm);
4057 
4058     if (buf) {
4059         delete [] buf;
4060         buf = 0;
4061     }
4062 
4063     if (enc) {
4064         delete enc;
4065         enc = 0;                 // Allocated in encode()
4066     }
4067 }
4068 
4069 //#############################################################################
4070 
4071 // Send a node from rampUpSubTree's node pool and generated model knowledge
4072 void
4073 AlpsKnowledgeBrokerMPI::sendNodeModelGen(int receiver, int doUnitWork)
4074 {
4075     int position = 0;
4076     AlpsEncoded* enc = NULL;
4077 
4078     // Pack the doUnitWork flag.
4079     MPI_Pack(&doUnitWork, 1, MPI_INT, largeBuffer_, largeSize_, &position,
4080              MPI_COMM_WORLD);
4081 
4082     // Pack a node
4083     AlpsTreeNode* node = dynamic_cast<AlpsTreeNode* >
4084         (rampUpSubTree_->nodePool()->getKnowledge().first);
4085     enc = node->encode();
4086     rampUpSubTree_->nodePool()->popKnowledge();
4087     delete node;   // Since sending to other process
4088 
4089     packEncoded(enc, largeBuffer_, largeSize_, position, MPI_COMM_WORLD);
4090     delete enc;
4091     enc = NULL;
4092 
4093     // Pack generated model knowledge
4094     int hasKnowledge = 0;
4095     enc = model_->packSharedKnowlege();
4096     if (enc) {
4097         hasKnowledge = 1;
4098         // Pack flag indicating that there is model knowledge
4099         MPI_Pack(&hasKnowledge, 1, MPI_INT, largeBuffer_, largeSize_,
4100                  &position, MPI_COMM_WORLD);
4101         // pack knowledge
4102         packEncoded(enc, largeBuffer_, largeSize_, position, MPI_COMM_WORLD);
4103         delete enc;
4104         enc = NULL;
4105     }
4106     else {
4107         // Pack flag indicating that there is no model knowledge
4108         MPI_Pack(&hasKnowledge, 1, MPI_INT, largeBuffer_, largeSize_,
4109                  &position, MPI_COMM_WORLD);
4110     }
4111 
4112     MPI_Send(largeBuffer_, position, MPI_PACKED, receiver, AlpsMsgNode,
4113              MPI_COMM_WORLD);
4114 
4115 }
4116 
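// A minimal sketch (not part of ALPS) of the AlpsMsgNode payload assembled
// above. The struct is purely illustrative; the trailing knowledge block is
// present only when hasKnowledge == 1.
#if 0
struct NodeModelGenMsg {
    int doUnitWork;           // Whether the receiver should do a unit of work
    // AlpsEncoded node;      // packEncoded(): [type][repSize][rep bytes]
    int hasKnowledge;         // 1 if shared model knowledge follows
    // AlpsEncoded knowledge; // Present only when hasKnowledge == 1
};
#endif
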
4117 //#############################################################################
4118 
4119 // Return whether the subtree was sent.
4120 bool
4121 AlpsKnowledgeBrokerMPI::sendSubTree(const int receiver,
4122                                     AlpsSubTree*& st,
4123                                     int tag)
4124 {
4125 #ifdef NF_DEBUG
4126     std::cout << "WORKER["<< globalRank_
4127               << "]: start to donate a subtree to PROC " << receiver
4128               << std::endl;
4129 #endif
4130 
4131     bool success = false;
4132     char* buf = 0;
4133     int size = 0;
4134     int position = 0;
4135 
4136     AlpsEncoded* enc = st->encode();
4137     packEncoded(enc, buf, size, position, MPI_COMM_WORLD);
4138 
4139 #if 0
4140     std::cout << "WORKER["<< globalRank_
4141               << "]: donor a subtree to PROC " << receiver
4142               << "; buf pos = " << position
4143               << "; buf size = " << size
4144               << "; largeSize_ = " << largeSize_ <<  std::endl;
4145 #endif
4146 
4147     assert(size > 0);
4148 
4149     if (size <= largeSize_) {
4150         // Attach buffer
4151 #if 0
4152         //MPI_Bsend(buf, position, MPI_PACKED, receiver, tag, MPI_COMM_WORLD);
4153         MPI_Send(buf, position, MPI_PACKED, receiver, tag, MPI_COMM_WORLD);
4154 #endif
4155         if (!attachBuffer_) {
4156             int attachSize = largeSize_* 6 + MPI_BSEND_OVERHEAD;
4157             attachBuffer_ =  new char [attachSize];
4158             MPI_Buffer_attach(attachBuffer_, attachSize);
4159         }
4160         MPI_Ibsend(buf, position, MPI_PACKED, receiver, tag,
4161                    MPI_COMM_WORLD, &subTreeRequest_);
4162 
4163         success = true;
4164     }
4165     else {
4166         // Msg size is larger than the buffer size; the caller must split the subtree.
4167         success = false;
4168         std::cout << "WARNING: Subtree size is larger than message buffer size, will split it." << std::endl;
4169     }
4170 
4171     if (buf) {
4172         delete [] buf;
4173         buf = 0;
4174     }
4175     if (enc) {
4176         delete enc;
4177         enc = 0;                 // Allocated in encode()
4178     }
4179 
4180     return success;
4181 }
4182 
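// A minimal sketch (not part of ALPS) of the cleanup that pairs with the
// MPI_Buffer_attach() above: MPI_Buffer_detach() blocks until all buffered
// sends drain, after which the user-owned memory can be freed. The function
// name is hypothetical.
#if 0
static void detachSendBuffer(char*& attachBuffer)
{
    if (attachBuffer) {
        char* buf = 0;
        int size = 0;
        MPI_Buffer_detach(&buf, &size); // Waits for buffered sends to finish.
        delete [] attachBuffer;
        attachBuffer = 0;
    }
}
#endif
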
4183 //#############################################################################
4184 
4185 void
4186 AlpsKnowledgeBrokerMPI::sendFinishInit(const int receiver,
4187                                        MPI_Comm comm)
4188 {
4189     char* dummyBuf = 0;
4190     MPI_Send(dummyBuf, 0, MPI_PACKED, receiver, AlpsMsgFinishInit, comm);
4191 }
4192 
4193 //#############################################################################
4194 
4195 void AlpsKnowledgeBrokerMPI::deleteSubTrees()
4196 {
4197     if (workingSubTree_) {
4198         delete workingSubTree_;
4199         workingSubTree_ = NULL;
4200         needWorkingSubTree_ = true;
4201     }
4202     subTreePool_-> deleteGuts();
4203 }
4204 
4205 //#############################################################################
4206 
4207 void
4208 AlpsKnowledgeBrokerMPI::initializeSearch(int argc,
4209                                          char* argv[],
4210                                          AlpsModel& model)
4211 {
4212 
4213     //------------------------------------------------------
4214     // Store a pointer to model.
4215     //------------------------------------------------------
4216 
4217     model.setBroker(this);
4218     model_ = &model;
4219 
4220     //------------------------------------------------------
4221     // Init msg env.
4222     //------------------------------------------------------
4223 
4224     MPI_Init(&argc, &argv);
4225     MPI_Comm_rank(MPI_COMM_WORLD, &globalRank_);
4226     MPI_Comm_size(MPI_COMM_WORLD, &processNum_);
4227 
4228     // CORRECTME
4229     // NOTE: masterRank_ is 0 or 1 (debug). Must be smaller than the cluster size.
4230     masterRank_ = 0;
4231 
4232     int color, i;
4233     int key   = globalRank_;
4234 
4235     //------------------------------------------------------
4236     // Register knowledge before broadcasting model.
4237     //------------------------------------------------------
4238 
4239     model.registerKnowledge();
4240 
4241     //------------------------------------------------------
4242     // Master reads in parameters and model data.
4243     //------------------------------------------------------
4244 
4245     if (globalRank_ == masterRank_) {
4246 
4247         //--------------------------------------------------
4248         // Read in params.
4249         //--------------------------------------------------
4250 
4251         model.readParameters(argc, argv);
4252 
4253         msgLevel_ = model_->AlpsPar()->entry(AlpsParams::msgLevel);
4254         hubNum_ = model_->AlpsPar()->entry(AlpsParams::hubNum);
4255 
4256         if (msgLevel_ > 0) {
4257 	    std::cout << "==  Welcome to the Abstract Library for Parallel Search (ALPS) \n";
4258 	    std::cout << "==  Copyright 2000-2019 Lehigh University and others \n";
4259 	    std::cout << "==  All Rights Reserved. \n";
4260 	    std::cout << "==  Distributed under the Eclipse Public License 1.0 \n";
4261 	    if (strcmp(ALPS_VERSION, "trunk")){
4262 		std::cout << "==  Version: " << ALPS_VERSION << std::endl;
4263 	    }else{
4264 		std::cout << "==  Version: Trunk (unstable) \n";
4265 	    }
4266 	    std::cout << "==  Build Date: " <<  __DATE__;
4267 #ifdef ALPS_SVN_REV
4268             //std::cout << "\n==  Revision Number: " << ALPS_SVN_REV;
4269 #endif
4270             std::cout << std::endl;
4271         }
4272 
4273         // 12/20/06: do we need to print the parameter file name?
4274         if (msgLevel_ > 100) {
4275             messageHandler()->message(ALPS_PARAMFILE, messages())
4276                 << argv[2] << CoinMessageEol;
4277         }
4278 
4279         //--------------------------------------------------
4280         // Set up the logfile if required.
4281         //--------------------------------------------------
4282 
4283         logFileLevel_ = model_->AlpsPar()->entry(AlpsParams::logFileLevel);
4284         if (logFileLevel_ > 0) {    // Require log file
4285             logfile_ = model_->AlpsPar()->entry(AlpsParams::logFile);
4286         }
4287 
4288         //--------------------------------------------------
4289         // Read in model data if required.
4290         //--------------------------------------------------
4291 
4292         if (argc == 2) {
4293             // FIXME: only works for MPICH, not LAM.
4294             model_->AlpsPar()->setEntry(AlpsParams::instance, argv[1]);
4295         }
4296 
4297         std::string dataFile = model_->AlpsPar()->entry(AlpsParams::instance);
4298 
4299         if (dataFile != "NONE") {
4300             if (msgLevel_ > 0) {
4301                 messageHandler()->message(ALPS_DATAFILE, messages())
4302                     << dataFile.c_str() << CoinMessageEol;
4303             }
4304 
4305             model.readInstance(dataFile.c_str());
4306 
4307             if (logFileLevel_ > 0) {
4308 
4309                 std::string fileDir = dataFile;
4310                 std::string::size_type pos1 =
4311                     fileDir.rfind ('/', std::string::npos);
4312                 if (pos1 == std::string::npos) {
4313                     // No /, put at beginning.
4314                     pos1 = 0;
4315                     //std::cout << "No /, pos1 = "<< pos1 << std::endl;
4316                 }
4317                 else {
4318                     //std::cout << "Found /, pos1 = "<< pos1 << std::endl;
4319                     ++pos1;
4320                 }
4321 
4322                 std::string::size_type pos2 = fileDir.find (".mps", pos1);
4323 
4324                 if (pos2 == std::string::npos) {
4325                     // No .mps
4326                     //std::cout << "No .mps" << std::endl;
4327 
4328                     pos2 = fileDir.find (".gz", pos1);
4329                     if (pos2 == std::string::npos) {
4330                         // No .gz
4331                         pos2 = fileDir.length();
4332                         //std::cout << "No .gz, pos2 = "<< pos2 << std::endl;
4333                     }
4334                     else {
4335                         //std::cout << "Found .gz, pos2 = "<< pos2 << std::endl;
4336                     }
4337                 }
4338 
4339                 // Sub-string from pos1 to pos2 (pos2 not included).
4340                 int length = static_cast<int>(pos2 - pos1);
4341                 // std::cout << "pos1=" << pos1 <<", pos2="<< pos2
4342                 //        << ", length=" << length << std::endl;
4343 
4344                 instanceName_ = fileDir.substr(pos1, length);
4345                 logfile_ = instanceName_ + ".log";
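                // Example: dataFile "data/air04.mps.gz" gives pos1 = 5 (just
                // after the '/') and pos2 at ".mps", so instanceName_ =
                // "air04" and logfile_ = "air04.log".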
4346 
4347                 model_->AlpsPar()->setEntry(AlpsParams::logFile,
4348                                             logfile_.c_str());
4349 
4350                 std::ofstream logFout(logfile_.c_str());
4351 
4352                 logFout << "\n\n================================================"
4353                         << std::endl;
4354                 logFout << "Problem = " << instanceName_ << std::endl;
4355                 logFout << "Data file = " << dataFile << std::endl;
4356                 logFout << "Log file = " << logfile_ << std::endl;
4357                 std::cout << "Problem = " << instanceName_ << std::endl;
4358                 std::cout << "Log file = " << logfile_ << std::endl;
4359             }
4360         }
4361 
4362         // modifyDataPool()->setAppParams(&userParams);// setupself needs userPar
4363 
4364         model.preprocess();
4365         model.setupSelf();
4366         //std::cout << "Here1" << std::endl;
4367     }
4368 
4369     MPI_Barrier(MPI_COMM_WORLD);
4370 
4371     //------------------------------------------------------
4372     // Broadcast model.
4373     //------------------------------------------------------
4374 
4375     broadcastModel(globalRank_, masterRank_);
4376 
4377     MPI_Barrier(MPI_COMM_WORLD);
4378 
4379     //------------------------------------------------------
4380     // Decide the cluster size and the actual hubNum_.
4381     //------------------------------------------------------
4382 
4383     msgLevel_ = model_->AlpsPar()->entry(AlpsParams::msgLevel);
4384     hubMsgLevel_ = model_->AlpsPar()->entry(AlpsParams::hubMsgLevel);
4385     workerMsgLevel_ = model_->AlpsPar()->entry(AlpsParams::workerMsgLevel);
4386     messageHandler()->setLogLevel(msgLevel_);
4387     hubNum_ = model_->AlpsPar()->entry(AlpsParams::hubNum);
4388 
4389     timer_.limit_ = model_->AlpsPar()->entry(AlpsParams::timeLimit);
4390 
4391     while(true) {
4392         if (hubNum_ > 0) {
4393             userClusterSize_ = 1;
4394             while (userClusterSize_ * hubNum_ < processNum_) {
4395                 ++userClusterSize_;
4396             }
4397             // [0,...,cluSize-1] in group 1
4398             color = globalRank_ / userClusterSize_;
4399         }
4400         else {
4401             std::cout << "hubNum_ <= 0" << std::endl;
4402             throw CoinError("hubNum_ <= 0",
4403                             "initSearch",
4404                             "AlpsKnowledgeBrokerMPI");
4405         }
4406 
4407     // More than one process in the last cluster.
4408         if (processNum_- userClusterSize_ * (hubNum_ - 1) > 1) {
4409             break;
4410         }
4411         else {
4412             --hubNum_;
4413         }
4414     }
4415     if ( (globalRank_ == masterRank_) && (msgLevel_ > 0) ) {
4416         messageHandler()->message(ALPS_HUB_NUM, messages())
4417             << hubNum_ << CoinMessageEol;
4418     }
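// A worked example of the sizing loop above: with processNum_ = 10 and
// hubNum_ = 3, userClusterSize_ grows until 3 * userClusterSize_ >= 10,
// giving clusters of size 4. The last cluster then holds 10 - 4*2 = 2
// processes (> 1), so hubNum_ = 3 is kept; if the last cluster had held only
// one process, hubNum_ would be decremented and the sizes recomputed.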
4419 
4420     //------------------------------------------------------
4421     // Create clusterComm_.
4422     //------------------------------------------------------
4423 
4424     MPI_Comm_split(MPI_COMM_WORLD, color, key, &clusterComm_);
4425     MPI_Comm_rank(clusterComm_, &clusterRank_);
4426     MPI_Comm_size(clusterComm_, &clusterSize_);
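    // MPI_Comm_split above puts all processes that passed the same color into
    // one new communicator, ordered by key (the global rank), so each cluster
    // gets its own communicator with local ranks 0..clusterSize_-1.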
4427 
4428 #if 0
4429     std::cout << "+++ masterRank_ = " << masterRank_
4430               << ", clusterSize_ = " << clusterSize_
4431               << ", userClusterSize_ = " << userClusterSize_ << std::endl;
4432 #endif
4433 
4434     //------------------------------------------------------
4435     // Create hubGroup_ and hubComm_.
4436     //------------------------------------------------------
4437 
4438     hubRanks_ = new int [hubNum_];
4439     int k = 0;
4440     for (i = 0; i < processNum_; i += userClusterSize_) {
4441         hubRanks_[k++] = i + masterRank_;
4442     }
4443 
4444     MPI_Group worldGroup;
4445     MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
4446     MPI_Group_incl(worldGroup, hubNum_, hubRanks_, &hubGroup_);
4447     MPI_Comm_create(MPI_COMM_WORLD, hubGroup_, &hubComm_);
4448 
4449     //------------------------------------------------------
4450     // Allocate index ranges, classify process types, set up knowledge pools.
4451     //   Master  :  [0, INT_MAX/5]
4452     //   Others  :  divide [INT_MAX/5 + 1, INT_MAX]
4453     //------------------------------------------------------
4454 
4455     int workerDown = -1, workerUp = -1;
4456     int masterDown = 0;
4457     int masterUp = static_cast<int>(ALPS_INT_MAX / 5);
4458     int workerIndexR = static_cast<int>( static_cast<double>(ALPS_INT_MAX - masterUp)/(processNum_-1));
4459     workerIndexR -= 2;  // leave some buffer
4460 
4461     if (globalRank_ < masterRank_) {
4462         workerDown = masterUp + globalRank_ * workerIndexR;
4463         workerUp = workerDown + workerIndexR;
4464     }
4465     else if (globalRank_ > masterRank_) {
4466         workerDown = masterUp + (globalRank_-1) * workerIndexR;
4467         workerUp = workerDown + workerIndexR;
4468     }
4469 
4470     masterIndexBatch_ = masterUp / (processNum_ * 2);
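    // Illustrative numbers for the partition above: with processNum_ = 4 and
    // masterRank_ = 0, the master owns node indices [0, ALPS_INT_MAX/5] and
    // every other process gets a block of roughly
    // (ALPS_INT_MAX - ALPS_INT_MAX/5) / 3 indices; rank 1 starts at masterUp,
    // rank 2 at masterUp + workerIndexR, and so on.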
4471 
4472     //------------------------------------------------------
4473     // Decide process type.
4474     //------------------------------------------------------
4475 
4476     if (globalRank_ == masterRank_) {
4477         processType_ = AlpsProcessTypeMaster;
4478         setNextNodeIndex(masterDown);
4479         setMaxNodeIndex(masterUp);
4480 
4481 #if 0
4482         std::cout << "masterDown = " << masterDown
4483                   << "; masterUp = " << masterUp
4484                   << "; workerDown = " << workerDown
4485                   << "; workerUp = " << workerUp
4486                   << "; masterIndexBatch_ = " << masterIndexBatch_
4487                   << std::endl;
4488 #endif
4489     }
4490     else if ( (globalRank_ % userClusterSize_) == masterRank_ ) {
4491         processType_ = AlpsProcessTypeHub;
4492         setNextNodeIndex(workerDown);
4493         setMaxNodeIndex(workerUp);
4494     }
4495     else {
4496         processType_ = AlpsProcessTypeWorker;
4497         setNextNodeIndex(workerDown);
4498         setMaxNodeIndex(workerUp);
4499     }
4500 
4501     // Determine if hubs process nodes.
4502     int hubWorkClusterSizeLimit =
4503         model_->AlpsPar()->entry(AlpsParams::hubWorkClusterSizeLimit);
4504     if (userClusterSize_ > hubWorkClusterSizeLimit) {
4505         hubWork_ = false;
4506     }
4507     else {
4508         hubWork_ = true;
4509     }
4510 
4511     if (processTypeList_) {
4512         delete [] processTypeList_;
4513         processTypeList_ = NULL;
4514     }
4515     processTypeList_ = new AlpsProcessType [processNum_];
4516     for (i = 0; i < processNum_; ++i) {
4517         if (i == masterRank_) {
4518             processTypeList_[i] = AlpsProcessTypeMaster;
4519         }
4520         else if ( (i % userClusterSize_) == masterRank_) {
4521             processTypeList_[i] = AlpsProcessTypeHub;
4522         }
4523         else {
4524             processTypeList_[i] = AlpsProcessTypeWorker;
4525         }
4526     }
4527 
4528     //------------------------------------------------------
4529     // Set hub's global rank for workers and hubs.
4530     //------------------------------------------------------
4531 
4532     if (processType_ == AlpsProcessTypeWorker ||
4533         processType_ == AlpsProcessTypeHub) {
4534 
4535         myHubRank_ =
4536             (globalRank_ / userClusterSize_) * userClusterSize_ + masterRank_;
4537 
4538 #ifdef NF_DEBUG
4539         std::cout << "PROCESS[" << globalRank_ << "] : my hub rank = "
4540                   << myHubRank_ << std::endl;
4541 #endif
4542     }
4543 
4544     //------------------------------------------------------
4545     // Set up knowledge pools.
4546     //------------------------------------------------------
4547 
4548     setupKnowledgePools();
4549     MPI_Barrier(MPI_COMM_WORLD);
4550 
4551     //------------------------------------------------------
4552     // Set max number of solutions.
4553     //------------------------------------------------------
4554 
4555     const int mns = model_->AlpsPar()->entry(AlpsParams::solLimit);
4556     setMaxNumKnowledges(AlpsKnowledgeTypeSolution, mns);
4557 
4558     //------------------------------------------------------
4559     // Set clock type
4560     //------------------------------------------------------
4561 
4562     const bool clockType =
4563       model_->AlpsPar()->entry(AlpsParams::clockType);
4564 
4565     timer_.setClockType(clockType);
4566     subTreeTimer_.setClockType(clockType);
4567     tempTimer_.setClockType(clockType);
4568 
4569     //------------------------------------------------------
4570     // Allocate memory. TODO.
4571     //------------------------------------------------------
4572 }
4573 
4574 //#############################################################################
4575 
4576 /** Search for the best solution of a given model. */
4577 void AlpsKnowledgeBrokerMPI::search(AlpsModel *model)
4578 {
4579     AlpsTreeNode* root = NULL;
4580     if (getProcRank() == masterRank_) {
4581         // Only the master needs to create the root.
4582         // NOTE: masterRank_ has been assigned in initializeSearch().
4583         root = model->createRoot();
4584         root->setBroker(this);
4585         root->modifyDesc()->setBroker(this);
4586     }
4587 
4588     rootSearch(root);
4589 }
4590 
4591 //#############################################################################
4592 
4593 void AlpsKnowledgeBrokerMPI::rootSearch(AlpsTreeNode* root)
4594 {
4595 
4596     timer_.start();
4597 
4598     //------------------------------------------------------
4599     // Call main functions.
4600     //------------------------------------------------------
4601 
4602     if (processType_ == AlpsProcessTypeMaster) {
4603         masterMain(root);
4604     }
4605     else if(processType_ == AlpsProcessTypeHub) {
4606         hubMain();
4607     }
4608     else if (processType_ == AlpsProcessTypeWorker){
4609         workerMain();
4610     }
4611     else {
4612         assert(0);
4613     }
4614 
4615     //------------------------------------------------------
4616     // Collect best solution.
4617     //------------------------------------------------------
4618 
4619     MPI_Barrier(MPI_COMM_WORLD);
4620     collectBestSolution(masterRank_);
4621 
4622     // Search to end.
4623     if (processType_ == AlpsProcessTypeMaster &&
4624         exitStatus_ == AlpsExitStatusUnknown) {
4625         if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
4626             setExitStatus(AlpsExitStatusOptimal);
4627         }
4628         else {
4629             setExitStatus(AlpsExitStatusInfeasible);
4630         }
4631     }
4632 
4633     timer_.stop();
4634 
4635     //------------------------------------------------------
4636     // Log statistics.
4637     //------------------------------------------------------
4638 
4639     if (processType_ == AlpsProcessTypeMaster) {
4640         model_->postprocess();
4641         model_->modelLog();
4642     }
4643     searchLog();
4644 }
4645 
4646 //#############################################################################
4647 
4648 // Master tells hubs to terminate when a limit is reached or for another reason.
4649 void
4650 AlpsKnowledgeBrokerMPI::masterForceHubTerm()
4651 {
4652     char* buf = 0;
4653     forceTerminate_ = true;
4654     for (int i = 0; i < hubNum_; ++i) {
4655         if (i != masterRank_) {
4656             MPI_Send(buf, 0, MPI_PACKED, hubRanks_[i],
4657                      AlpsMsgForceTerm, MPI_COMM_WORLD);
4658             incSendCount("masterForceHubTerm");
4659         }
4660     }
4661 }
4662 
4663 //#############################################################################
4664 
4665 // Hub tells workers to terminate when a limit is reached or for another reason.
4666 void
4667 AlpsKnowledgeBrokerMPI::hubForceWorkerTerm()
4668 {
4669     char* buf = 0;
4670     forceTerminate_ = true;
4671     for (int i = 0; i < clusterSize_; ++i) {
4672         if (i != clusterRank_) {
4673             MPI_Send(buf, 0, MPI_PACKED, globalRank_+i,
4674                      AlpsMsgForceTerm, MPI_COMM_WORLD);
4675             incSendCount("hubForceWorkerTerm");
4676         }
4677     }
4678 }
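// Note: globalRank_ + i above converts cluster rank i to a global rank; this
// works because a hub is the lowest-ranked process of its cluster (with
// masterRank_ = 0), and it is the same mapping cluster2GlobalRank() computes.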
4679 
4680 //#############################################################################
4681 
4682 void
4683 AlpsKnowledgeBrokerMPI::changeWorkingSubTree(double & changeWorkThreshold)
4684 {
4685     if ( workingSubTree_ && subTreePool_->hasKnowledge() ) {
4686         //std::cout << "Process[" << globalRank_ << "here 1" << std::endl;
4687         workingSubTree_->calculateQuality();
4688         //std::cout << "Process[" << globalRank_ << "here 2" << std::endl;
4689 
4690         double curQuality = workingSubTree_->getQuality();
4691         double topQuality = subTreePool_->getBestQuality();
4692         //double topQuality = subTreePool_->getKnowledge().second;
4693 
4694         if (curQuality > topQuality) {
4695             double perDiff = ALPS_FABS(curQuality - topQuality)/
4696                 (ALPS_FABS(curQuality) + 1.0e-9);
4697             if (perDiff > changeWorkThreshold) {
4698                 // Need to put the nodes back in the regular node pool.
4699                 workingSubTree_->reset();
4700 
4701                 AlpsSubTree* tempST = workingSubTree_;
4702                 workingSubTree_ = dynamic_cast<AlpsSubTree* >(
4703                     subTreePool_->getKnowledge().first);
4704                 subTreePool_->popKnowledge();
4705                 addKnowledge(AlpsKnowledgeTypeSubTree, tempST, curQuality);
4706                 ++(psStats_.subtreeChange_);
4707 
4708                 // Avoid too much switching: double the threshold while fewer than ten switches have occurred.
4709                 if (psStats_.subtreeChange_ / 10 == 0) {
4710                     changeWorkThreshold *= 2.0;
4711                 }
4712 #if 0
4713                 std::cout << "Process[" << globalRank_
4714                           << "]: change subtree " << curQuality
4715                           << " to " << topQuality
4716                           << ", new working tree nodes "
4717                           << workingSubTree_->getNumNodes()
4718                           << std::endl;
4719 #endif
4720 
4721             }
4722         }
4723     }
4724 }
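// The switch test above uses a relative gap,
//     perDiff = |curQuality - topQuality| / (|curQuality| + 1e-9),
// so with curQuality = 110 and topQuality = 100, perDiff is about 0.091 and
// the working subtree is swapped only if changeWorkThreshold is below that.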
4725 
4726 //#############################################################################
4727 
4728 void
4729 AlpsKnowledgeBrokerMPI::searchLog()
4730 {
4731     int i, j;
4732     int* numProcessed = 0;
4733     int* numPartial = 0;
4734     int* numBranched = 0;
4735     int* numDiscarded = 0;
4736     int* numLeft = 0;
4737     int* depths = 0;
4738     double* idles = 0;
4739     double* msgTimes = 0;
4740     double* rampUps = 0;
4741     double* rampDowns = 0;
4742     double *cpuTimes =  NULL;
4743     double *wallClocks = NULL;
4744 
4745     int *qualityBalance = 0;
4746     int *quantityBalance = 0;
4747     int *interBalance = 0;
4748     int *intraBalance = 0;
4749     int *workerAsk = 0;
4750     int *donateSuccess = 0;
4751     int *donateFail = 0;
4752     int *subtreeSplit = 0;
4753     int *subtreeWhole = 0;
4754     int *subtreeChange = 0;
4755 
4756     if (globalRank_ == masterRank_) {
4757         numProcessed = new int [processNum_];
4758         numPartial = new int [processNum_];
4759         numBranched = new int [processNum_];
4760         numDiscarded = new int [processNum_];
4761         numLeft = new int [processNum_];
4762         depths = new int [processNum_];
4763         idles = new double [processNum_];
4764         msgTimes = new double [processNum_];
4765         rampUps = new double [processNum_];
4766         rampDowns = new double [processNum_];
4767         cpuTimes = new double [processNum_];
4768         wallClocks = new double [processNum_];
4769         qualityBalance = new int [processNum_];
4770         quantityBalance = new int [processNum_];
4771         interBalance = new int [processNum_];
4772         intraBalance = new int [processNum_];
4773         workerAsk = new int [processNum_];
4774         donateSuccess = new int [processNum_];
4775         donateFail = new int [processNum_];
4776         subtreeSplit = new int [processNum_];
4777         subtreeWhole = new int [processNum_];
4778         subtreeChange = new int [processNum_];
4779     }
4780 
4781     MPI_Gather(&nodeProcessedNum_, 1, MPI_INT, numProcessed, 1, MPI_INT,
4782                masterRank_, MPI_COMM_WORLD);
4783     MPI_Gather(&nodePartialNum_, 1, MPI_INT, numPartial, 1, MPI_INT,
4784                masterRank_, MPI_COMM_WORLD);
4785     MPI_Gather(&nodeBranchedNum_, 1, MPI_INT, numBranched, 1, MPI_INT,
4786                masterRank_, MPI_COMM_WORLD);
4787     MPI_Gather(&nodeDiscardedNum_, 1, MPI_INT, numDiscarded, 1, MPI_INT,
4788                masterRank_, MPI_COMM_WORLD);
4789     MPI_Gather(&nodeLeftNum_, 1, MPI_INT, numLeft, 1, MPI_INT,
4790                masterRank_, MPI_COMM_WORLD);
4791     MPI_Gather(&treeDepth_, 1, MPI_INT, depths, 1, MPI_INT, masterRank_,
4792                MPI_COMM_WORLD);
4793     MPI_Gather(&idleTime_, 1, MPI_DOUBLE, idles, 1, MPI_DOUBLE, masterRank_,
4794                MPI_COMM_WORLD);
4795     MPI_Gather(&msgTime_, 1, MPI_DOUBLE, msgTimes, 1, MPI_DOUBLE, masterRank_,
4796                MPI_COMM_WORLD);
4797     MPI_Gather(&rampUpTime_, 1, MPI_DOUBLE, rampUps, 1, MPI_DOUBLE,
4798                masterRank_, MPI_COMM_WORLD);
4799     MPI_Gather(&rampDownTime_, 1, MPI_DOUBLE, rampDowns, 1, MPI_DOUBLE,
4800                masterRank_, MPI_COMM_WORLD);
4801     MPI_Gather(&(timer_.cpu_), 1, MPI_DOUBLE, cpuTimes, 1, MPI_DOUBLE,
4802                masterRank_, MPI_COMM_WORLD);
4803     MPI_Gather(&(timer_.wall_), 1, MPI_DOUBLE, wallClocks, 1, MPI_DOUBLE,
4804                masterRank_, MPI_COMM_WORLD);
4805     MPI_Gather(&(psStats_.qualityBalance_), 1, MPI_INT, qualityBalance, 1,
4806                MPI_INT, masterRank_, MPI_COMM_WORLD);
4807     MPI_Gather(&(psStats_.quantityBalance_), 1, MPI_INT, quantityBalance, 1,
4808                 MPI_INT, masterRank_, MPI_COMM_WORLD);
4809     MPI_Gather(&(psStats_.interBalance_), 1, MPI_INT, interBalance, 1,
4810                MPI_INT, masterRank_, MPI_COMM_WORLD);
4811     MPI_Gather(&(psStats_.intraBalance_), 1, MPI_INT, intraBalance, 1,
4812                MPI_INT, masterRank_, MPI_COMM_WORLD);
4813     MPI_Gather(&(psStats_.workerAsk_), 1, MPI_INT, workerAsk, 1,
4814                MPI_INT, masterRank_, MPI_COMM_WORLD);
4815     MPI_Gather(&(psStats_.donateSuccess_), 1, MPI_INT, donateSuccess, 1,
4816                MPI_INT, masterRank_, MPI_COMM_WORLD);
4817     MPI_Gather(&(psStats_.donateFail_), 1, MPI_INT, donateFail, 1,
4818                MPI_INT, masterRank_, MPI_COMM_WORLD);
4819     MPI_Gather(&(psStats_.subtreeSplit_), 1, MPI_INT, subtreeSplit, 1,
4820                MPI_INT, masterRank_, MPI_COMM_WORLD);
4821     MPI_Gather(&(psStats_.subtreeWhole_), 1, MPI_INT, subtreeWhole, 1,
4822                MPI_INT, masterRank_, MPI_COMM_WORLD);
4823     MPI_Gather(&(psStats_.subtreeChange_), 1, MPI_INT, subtreeChange, 1,
4824                MPI_INT, masterRank_, MPI_COMM_WORLD);
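    // Each MPI_Gather above collects one value from every process into a
    // processNum_-long array on the master. A minimal sketch of the pattern
    // (variable names illustrative):
#if 0
    int myStat = nodeProcessedNum_;
    int *allStats = (globalRank_ == masterRank_) ? new int [processNum_] : 0;
    MPI_Gather(&myStat, 1, MPI_INT,    // every rank sends one int
               allStats, 1, MPI_INT,   // receive buffer is used on the root only
               masterRank_, MPI_COMM_WORLD);
#endif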
4825 
4826     if (processType_ == AlpsProcessTypeMaster) {
4827         int numWorkers = 0;   // Number of processes that process nodes.
4828 
4829         int sumSize = 0, aveSize = 0, maxSize = 0, minSize = ALPS_INT_MAX;
4830         int sumDep = 0, aveDep = 0, maxDep = 0, minDep = ALPS_INT_MAX;
4831 
4832         double sumIdle = 0.0, aveIdle = 0.0;
4833         double maxIdle = 0.0, minIdle = ALPS_DBL_MAX;
4834 
4835         double sumMsgTime = 0.0, aveMsgTime = 0.0;
4836         double maxMsgTime = 0.0, minMsgTime = ALPS_DBL_MAX;
4837 
4838         double sumRampUp = 0.0, aveRampUp = 0.0;
4839         double maxRampUp = 0.0, minRampUp = ALPS_DBL_MAX;
4840 
4841         double sumRampDown = 0.0, aveRampDown = 0.0;
4842         double maxRampDown = 0.0, minRampDown = ALPS_DBL_MAX;
4843 
4844         double sumCpuTime = 0.0, aveCpuTime = 0.0;
4845         double maxCpuTime = 0.0, minCpuTime = ALPS_DBL_MAX;
4846 
4847         double sumWallClock = 0.0, aveWallClock = 0.0;
4848         double maxWallClock = 0.0, minWallClock = ALPS_DBL_MAX;
4849 
4850         double varSize = 0.0, stdSize = 0.0;
4851         double varDep = 0.0, stdDep = 0.0;
4852         double varIdle = 0.0, stdIdle = 0.0;
4853         double varMsgTime = 0.0, stdMsgTime = 0.0;
4854         double varRampUp = 0.0, stdRampUp = 0.0;
4855         double varRampDown = 0.0, stdRampDown = 0.0;
4856 
4857         int totalProcessed = 0;
4858         int totalPartial = 0;
4859         int totalBranched = 0;
4860         int totalDiscarded = 0;
4861         int totalLeft = 0;
4862 
4863         int sumQualityBalance = 0;
4864         int sumQuantityBalance = 0;
4865         int sumInterBalance = 0;
4866         int sumIntraBalance = 0;
4867         int sumWorkerAsk = 0;
4868         int sumDonateSuccess = 0;
4869         int sumDonateFail = 0;
4870         int sumSubtreeSplit = 0;
4871         int sumSubtreeWhole = 0;
4872         int sumSubtreeChange = 0;
4873 
4874         if(logFileLevel_ > 0 || msgLevel_ > 0) {
4875             for(i = 0; i < processNum_; ++i) {
4876                 totalProcessed += numProcessed[i];
4877                 totalPartial += numPartial[i];
4878                 totalBranched += numBranched[i];
4879                 totalDiscarded += numDiscarded[i];
4880                 totalLeft += numLeft[i];
4881 
4882                 sumRampUp += rampUps[i];
4883                 sumCpuTime += cpuTimes[i];
4884                 sumWallClock += wallClocks[i];
4885 
4886                 sumQualityBalance += qualityBalance[i];
4887                 sumQuantityBalance += quantityBalance[i];
4888                 sumInterBalance += interBalance[i];
4889                 sumIntraBalance += intraBalance[i];
4890                 sumWorkerAsk += workerAsk[i];
4891                 sumDonateSuccess += donateSuccess[i];
4892                 sumDonateFail += donateFail[i];
4893                 sumSubtreeSplit += subtreeSplit[i];
4894                 sumSubtreeWhole += subtreeWhole[i];
4895                 sumSubtreeChange += subtreeChange[i];
4896 
4897                 // Only valid for workers.
4898                 if (processTypeList_[i] == AlpsProcessTypeWorker) {
4899                     ++numWorkers;
4900 
4901                     sumSize += numProcessed[i];
4902                     sumDep += depths[i];
4903                     sumIdle += idles[i];
4904                     sumMsgTime += msgTimes[i];
4905                     sumRampDown += rampDowns[i];
4906 
4907                     if (numProcessed[i] > maxSize) maxSize = numProcessed[i];
4908                     if (numProcessed[i] < minSize) minSize = numProcessed[i];
4909 
4910                     if (depths[i] > maxDep) maxDep = depths[i];
4911                     if (depths[i] < minDep) minDep = depths[i];
4912 
4913                     if (idles[i] > maxIdle) maxIdle = idles[i];
4914                     if (idles[i] < minIdle) minIdle = idles[i];
4915 
4916                     if (msgTimes[i] > maxMsgTime) maxMsgTime = msgTimes[i];
4917                     if (msgTimes[i] < minMsgTime) minMsgTime = msgTimes[i];
4918 
4919                     if (rampDowns[i] > maxRampDown) maxRampDown = rampDowns[i];
4920                     if (rampDowns[i] < minRampDown) minRampDown = rampDowns[i];
4921                 }
4922 
4923                 if (cpuTimes[i] > maxCpuTime) maxCpuTime = cpuTimes[i];
4924                 if (cpuTimes[i] < minCpuTime) minCpuTime = cpuTimes[i];
4925 
4926                 if (wallClocks[i] > maxWallClock) maxWallClock = wallClocks[i];
4927                 if (wallClocks[i] < minWallClock) minWallClock = wallClocks[i];
4928 
4929                 if (rampUps[i] > maxRampUp) maxRampUp = rampUps[i];
4930                 if (rampUps[i] < minRampUp) minRampUp = rampUps[i];
4931             }
4932 
4933 #if 0
4934             // Does not make sense, 2/24/07
4935             for(i = 0; i < processNum_; ++i) { // Adjust idle and rampDown
4936                 if (numProcessed[i] > 0) {           // due to periodically checking.
4937                     //idles[i] -= minIdle;
4938                     //rampDowns[i] -= minRampDown;
4939                 }
4940             }
4941             sumIdle -= minIdle * numWorkers;
4942             sumRampDown -= minRampDown * numWorkers;
4943             maxIdle -= minIdle;
4944             minIdle = 0.0;
4945             maxRampDown -= minRampDown;
4946             minRampDown = 0.0;
4947 #endif
4948 
4949             if( numWorkers != 0) {
4950                 aveSize = sumSize / numWorkers;
4951                 aveDep = sumDep / numWorkers;
4952                 aveIdle = sumIdle / numWorkers;
4953                 aveMsgTime = sumMsgTime / numWorkers;
4954                 aveRampDown = sumRampDown / numWorkers;
4955             }
4956             aveRampUp = sumRampUp / processNum_;
4957 
4958             for (i = 0; i < processNum_; ++i) {
4959                 if (processTypeList_[i] == AlpsProcessTypeWorker) {
4960                     varSize += pow(numProcessed[i] - aveSize, 2);
4961                     varDep += pow(depths[i] - aveDep, 2);
4962                     varIdle += pow(idles[i] - aveIdle, 2);
4963                     varMsgTime += pow(msgTimes[i] - aveMsgTime, 2);
4964                     varRampDown += pow(rampDowns[i] - aveRampDown, 2);
4965                 }
4966                 varRampUp += pow(rampUps[i] - aveRampUp, 2);
4967             }
4968             if( numWorkers != 0) {
4969                 varSize /= numWorkers;
4970                 stdSize = sqrt(varSize);
4971                 varDep /= numWorkers;
4972                 stdDep = sqrt(varDep);
4973                 varIdle /= numWorkers;
4974                 stdIdle = sqrt(varIdle);
4975                 varMsgTime /= numWorkers;
4976                 stdMsgTime = sqrt(varMsgTime);
4977                 varRampDown /= numWorkers;
4978                 stdRampDown = sqrt(varRampDown);
4979                 varRampUp /= processNum_;  // rampUps are summed over all processes
4980             }
4981             stdRampUp = sqrt(varRampUp);
4982             aveCpuTime = sumCpuTime/processNum_;
4983             aveWallClock = sumWallClock/processNum_;
4984         }
4985 
4986         if (logFileLevel_ > 0) {
4987             std::ofstream logFout(logfile_.c_str(), std::ofstream::app);
4988 
4989             logFout << "Number of processes = " << processNum_ << std::endl;
4990             logFout << "Number of hubs = " << hubNum_ << std::endl;
4991 
4992             //----------------------------------------------
4993             // Tree size and depth.
4994             //----------------------------------------------
4995 
4996             logFout << "Number of nodes processed = "
4997                     << totalProcessed << std::endl;
4998             if (totalPartial > 0){
4999                logFout << "Number of nodes partially processed = "
5000                        << totalPartial << std::endl;
5001             }
5002             logFout << "Number of nodes branched = "
5003                     << totalBranched << std::endl;
5004             logFout << "Number of nodes pruned before processing = "
5005                     << totalDiscarded << std::endl;
5006             logFout << "Number of nodes left = "
5007                     << totalLeft << std::endl;
5008             logFout << "Max number of nodes processed by a worker = "
5009                     << maxSize << std::endl;
5010             logFout << "Min number of nodes processed by a worker = "
5011                     << (minSize != ALPS_INT_MAX ? minSize : 0) << std::endl;
5012             logFout << "Std Dev of number of nodes processed by a worker = "
5013                     << stdSize << std::endl;
5014             logFout << std::endl << "Max Tree Depth on workers = "
5015                     << maxDep << std::endl;
5016             logFout << "Min Tree Depth on workers = "
5017                     << (minDep != ALPS_INT_MAX ? minDep : 0) << std::endl;
5018             logFout << "Std Dev of Tree Depth on workers = "
5019                     << stdDep << std::endl;
5020 
5021             if (logFileLevel_ > 0) {
5022                 j = 0;
5023                 logFout << std::endl
5024                         << "Numbers of Nodes processed by processes: "
5025                         << std::endl;
5026                 for (i = 0; i < processNum_; ++i) {
5027                     ++j;
5028                     if (j % 5 == 0) logFout << std::endl;
5029                     logFout << numProcessed[i] << "\t";
5030                 }
5031 
5032                 j = 0;
5033                 logFout << std::endl << std::endl
5034                         << "Tree depths on processes: "
5035                         << std::endl;
5036                 for (i = 0; i < processNum_; ++i) {
5037                     ++j;
5038                     if (j % 5 == 0) logFout << std::endl;
5039                     logFout << depths[i] << "\t";
5040                 }
5041                 logFout << std::endl << std::endl;
5042             } // Log if logFileLevel_ > 0
5043 
5044             //----------------------------------------------
5045             // Times.
5046             //----------------------------------------------
5047 
5048             // Ramp up.
5049             logFout << "Average RampUp = " << aveRampUp << std::endl;
5050             logFout << "Max RampUp = " << maxRampUp << std::endl;
5051             logFout << "Min RampUp = "
5052                     << (minRampUp != ALPS_DBL_MAX ? minRampUp : 0.0)
5053                     << std::endl;
5054             logFout << "Std Dev of RampUp = " << stdRampUp << std::endl;
5055 
5056             if (logFileLevel_ > 0) {
5057                 logFout << std::endl << "Ramp ups of processes: " << std::endl;
5058                 for (i = 0; i < processNum_; ++i) {
5059                     if (i % 5 == 0)
5060                         logFout << std::endl;
5061                     logFout << rampUps[i] << "\t";
5062                 }
5063                 logFout << std::endl;
5064             }
5065 
5066             // Idle.
5067             logFout << "Average Idle = " << aveIdle << std::endl;
5068             logFout << "Max Idle = " << maxIdle << std::endl;
5069             logFout << "Min Idle = "
5070                     << (minIdle != ALPS_DBL_MAX ? minIdle : 0.0) << std::endl;
5071             logFout << "Std Dev of Idle = " << stdIdle << std::endl;
5072 
5073             // Msg time.
5074             logFout << "Average MsgTime = " << aveMsgTime << std::endl;
5075             logFout << "Max MsgTime = " << maxMsgTime << std::endl;
5076             logFout << "Min MsgTime = "
5077                     << (minMsgTime != ALPS_DBL_MAX ? minMsgTime : 0.0)
5078                     << std::endl;
5079             logFout << "Std Dev of MsgTime = " << stdMsgTime << std::endl;
5080 
5081             // Ramp down.
5082             logFout << "Average RampDown = " << aveRampDown << std::endl;
5083             logFout << "Max RampDown = " << maxRampDown << std::endl;
5084             logFout << "Min RampDown = "
5085                     << (minRampDown != ALPS_DBL_MAX ? minRampDown : 0.0)
5086                     << std::endl;
5087             logFout << "Std Dev of RampDown = " << stdRampDown << std::endl;
5088 
5089             // Overall.
5090             logFout << "Search CPU time (master) = " << timer_.getCpuTime()
5091                     << " seconds" <<  std::endl;
5092             logFout << "Max CPU time (worker) = " << maxCpuTime << std::endl;
5093             logFout << "Min CPU time (worker) = " << minCpuTime << std::endl;
5094             logFout << "Total CPU time = " << sumCpuTime << std::endl;
5095             logFout << "Search wallclock time (master) = "
5096                     << timer_.getWallClock() << " seconds" <<  std::endl;
5097             logFout << "Max wallclock (worker) = "<< maxWallClock << std::endl;
5098             logFout << "Min wallclock (worker) = "<< minWallClock << std::endl;
5099             logFout << "Total wallclock = " << sumWallClock << std::endl;
5100 
5101             //----------------------------------------------
5102             // Solution.
5103             //----------------------------------------------
5104 
5105             if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
5106                 logFout << "Best solution quality = " << getBestQuality()
5107                         <<  std::endl;
5108                 logFout << "Nodes processed before finding this solution = "
5109                         << bestSolNode_ << std::endl;
5110             }
5111             else {
5112                 logFout << "No solution was found." << std::endl;
5113             }
5114             if (hasKnowledge(AlpsKnowledgeTypeSolution) ) {
5115                 dynamic_cast<AlpsSolution* >
5116                     (getBestKnowledge(AlpsKnowledgeTypeSolution).first)->print(logFout);
5117             }
5118 
5119         }  // Log if logFileLevel_ > 0
5120 
5121         if (msgLevel_ > 0) {
5122             if (getSolStatus() == AlpsExitStatusOptimal) {
5123                 messageHandler()->message(ALPS_T_COMPLETE, messages())
5124                     << systemNodeProcessed_
5125                     << nodePartialNum_
5126                     << static_cast<int>(systemWorkQuantity_)
5127                     << CoinMessageEol;
5128             }
5129             else if (getSolStatus() == AlpsExitStatusNodeLimit) {
5130                 if (forceTerminate_) {
5131                     messageHandler()->message(ALPS_T_NODE_LIMIT, messages())
5132                         << systemNodeProcessed_
5133                         << static_cast<int>(systemWorkQuantityForce_)
5134                         << CoinMessageEol;
5135                 }
5136                 else {
5137                     messageHandler()->message(ALPS_T_NODE_LIMIT, messages())
5138                         << systemNodeProcessed_
5139                         << static_cast<int>(systemWorkQuantity_)
5140                         << CoinMessageEol;
5141                 }
5142             }
5143             else if (getSolStatus() == AlpsExitStatusSolLimit) {
5144                 if (forceTerminate_) {
5145                     messageHandler()->message(ALPS_T_SOL_LIMIT, messages())
5146                         << systemNodeProcessed_
5147                         << static_cast<int>(systemWorkQuantityForce_)
5148                         << CoinMessageEol;
5149                 }
5150                 else {
5151                     messageHandler()->message(ALPS_T_SOL_LIMIT, messages())
5152                         << systemNodeProcessed_
5153                         << static_cast<int>(systemWorkQuantity_)
5154                         << CoinMessageEol;
5155                 }
5156             }
5157             else if (getSolStatus() == AlpsExitStatusTimeLimit) {
5158 #if 0
5159                 std::cout << "------ forceTerminate_=" << forceTerminate_
5160                           << ", systemWorkQuantityForce_ = "
5161                           << static_cast<int>(systemWorkQuantityForce_)
5162                           << std::endl;
5163 #endif
5164 
5165                 if (forceTerminate_) {
5166                     messageHandler()->message(ALPS_T_TIME_LIMIT, messages())
5167                         << systemNodeProcessed_
5168                         << static_cast<int>(systemWorkQuantityForce_)
5169                         << CoinMessageEol;
5170                 }
5171                 else  {
5172                   messageHandler()->message(ALPS_T_TIME_LIMIT, messages())
5173                       << systemNodeProcessed_
5174                         << static_cast<int>(systemWorkQuantity_)
5175                         << CoinMessageEol;
5176                 }
5177             }
5178             else if (getSolStatus() == AlpsExitStatusFeasible) {
5179                 messageHandler()->message(ALPS_T_FEASIBLE, messages())
5180                     << systemNodeProcessed_
5181                     << static_cast<int>(systemWorkQuantity_)
5182                     << CoinMessageEol;
5183             }
5184             else if (getSolStatus() == AlpsExitStatusNoMemory) {
5185                 messageHandler()->message(ALPS_T_NO_MEMORY, messages())
5186                     << systemNodeProcessed_
5187                     << static_cast<int>(systemWorkQuantity_)
5188                     << CoinMessageEol;
5189             }
5190             else if (getSolStatus() == AlpsExitStatusFailed) {
5191                 messageHandler()->message(ALPS_T_FAILED, messages())
5192                     << systemNodeProcessed_
5193                     << static_cast<int>(systemWorkQuantity_)
5194                     << CoinMessageEol;
5195             }
5196             else {
5197                 messageHandler()->message(ALPS_T_INFEASIBLE, messages())
5198                     << systemNodeProcessed_
5199                     << static_cast<int>(systemWorkQuantity_)
5200                     << CoinMessageEol;
5201             }
5202 
5203             std::cout << "\n=================== SEARCH RESULTS =================="
5204                       << std::endl;
5205 
5206             //----------------------------------------------
5207             // Tree size, depth, overheads.
5208             //----------------------------------------------
5209 
5210             std::cout << "Number of nodes processed = "
5211                       << totalProcessed << std::endl;
5212             if (totalPartial > 0){
5213                std::cout << "Number of nodes partially processed = "
5214                          << totalPartial << std::endl;
5215             }
5216             std::cout << "Number of nodes branched = "
5217                       << totalBranched << std::endl;
5218             std::cout << "Number of nodes pruned before processing = "
5219                       << totalDiscarded << std::endl;
5220             std::cout << "Number of nodes left = "
5221                       << totalLeft << std::endl;
5222             std::cout << "Max number of nodes processed by a worker = "
5223                       << maxSize << std::endl;
5224             std::cout << "Min number of nodes processed by a worker = "
5225                       << (minSize != ALPS_INT_MAX ? minSize : 0) << std::endl;
5226             std::cout << "Std Dev of number of nodes processed by a worker = "
5227                       << stdSize << std::endl;
5228             std::cout << "----------------------------------" << std::endl;
5229 
5230             std::cout << "Average tree depth = " << aveDep << std::endl;
5231             std::cout << "Max tree depth on workers = " << maxDep << std::endl;
5232             std::cout << "Min tree depth on workers = "
5233                       << (minDep != ALPS_INT_MAX ? minDep : 0) << std::endl;
5234             std::cout << "Std Dev of tree depth on workers = " << stdDep
5235                       << std::endl;
5236             std::cout << "----------------------------------" << std::endl;
5237 
5238             // Ramp up.
5239             std::cout << "Average RampUp = " << aveRampUp << std::endl;
5240             std::cout << "Max RampUp = " << maxRampUp << std::endl;
5241             std::cout << "Min RampUp = "
5242                       << (minRampUp != ALPS_DBL_MAX ? minRampUp : 0.0)
5243                       << std::endl;
5244             std::cout << "Std Dev of RampUp = " << stdRampUp << std::endl;
5245             std::cout << "----------------------------------" << std::endl;
5246 
5247             // Idle.
5248             std::cout << "Average Idle = " << aveIdle << std::endl;
5249             std::cout << "Max Idle = " << maxIdle << std::endl;
5250             std::cout << "Min Idle = "
5251                       << (minIdle != ALPS_DBL_MAX ? minIdle : 0.0)
5252                       << std::endl;
5253             std::cout << "Std Dev of Idle = " << stdIdle << std::endl;
5254             std::cout << "----------------------------------" << std::endl;
5255 
5256             // MsgTime.
5257             std::cout << "Average MsgTime = " << aveMsgTime << std::endl;
5258             std::cout << "Max MsgTime = " << maxMsgTime << std::endl;
5259             std::cout << "Min MsgTime = "
5260                       << (minMsgTime != ALPS_DBL_MAX ? minMsgTime : 0.0)
5261                       << std::endl;
5262             std::cout << "Std Dev of MsgTime = " << stdMsgTime << std::endl;
5263             std::cout << "----------------------------------" << std::endl;
5264 
5265             // Ramp down.
5266             std::cout << "Average RampDown = " << aveRampDown << std::endl;
5267             std::cout << "Max RampDown = " << maxRampDown << std::endl;
5268             std::cout << "Min RampDown = "
5269                       << (minRampDown != ALPS_DBL_MAX ? minRampDown : 0.0)
5270                       << std::endl;
5271             std::cout << "Std Dev of RampDown = " << stdRampDown << std::endl;
5272             std::cout << "----------------------------------" << std::endl;
5273 
5274             //----------------------------------------------
5275             // Load balancing
5276             //----------------------------------------------
5277 
5278             std::cout << "Quality balance = " << sumQualityBalance
5279                       << std::endl;
5280             std::cout << "Quantity balance = " << sumQuantityBalance
5281                       << std::endl;
5282             std::cout << "Inter-balance = " << sumInterBalance
5283                       << std::endl;
5284             std::cout << "Intra-balance = " << sumIntraBalance
5285                       << std::endl;
5286             std::cout << "Worker asked workload = " << sumWorkerAsk
5287                       << std::endl;
5288             std::cout << "Worker donate success = " << sumDonateSuccess
5289                       << std::endl;
5290             std::cout << "Worker donate fail = " << sumDonateFail
5291                       << std::endl;
5292             std::cout << "Subtree split = " << sumSubtreeSplit
5293                       << std::endl;
5294             std::cout << "Subtree whole = " << sumSubtreeWhole
5295                       << std::endl;
5296             std::cout << "Subtree change = " << sumSubtreeChange
5297                       << std::endl;
5298 
5299             std::cout << "----------------------------------" << std::endl;
5300 
5301             //----------------------------------------------
5302             // Overall.
5303             //----------------------------------------------
5304 
5305             std::cout << "Search CPU time = "<<timer_.getCpuTime() <<" seconds"
5306                       << ", max = " << maxCpuTime
5307                       << ", min = "<< minCpuTime
5308                       << ", total CPU time = " << sumCpuTime << std::endl;
5309             std::cout << "Search wallclock  = "<<timer_.getWallClock()
5310                       <<" seconds"
5311                       << ", max = " << maxWallClock
5312                       << ", min = "<< minWallClock
5313                       <<", total wallclock = " << sumWallClock << std::endl;
5314             if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
5315                 std::cout << "Best solution quality = " << getBestQuality()
5316                           << " ; nodes processed before finding this solution = " << bestSolNode_
5317                           << std::endl;
5318             }
5319             else {
5320                 std::cout << "No solution was found." << std::endl;
5321             }
5322             std::cout << "====================================================="
5323                       << std::endl << std::endl;
5324         } // EOF msgLevel_ > 0
5325     }  // EOF master log
5326 
5327     if (globalRank_ == masterRank_) {
5328         delete [] numProcessed;     numProcessed = 0;
5329         delete [] numPartial;       numPartial = 0;
5330         delete [] numBranched;      numBranched = 0;
5331         delete [] numDiscarded;     numDiscarded = 0;
5332         delete [] numLeft;          numLeft = 0;
5333         delete [] depths;           depths = 0;
5334         delete [] idles;            idles = 0;
5335         delete [] rampUps;          rampUps = 0;
5336         delete [] rampDowns;        rampDowns = 0;
5337         delete [] cpuTimes;
5338         delete [] wallClocks;
5339         delete [] qualityBalance;
5340         delete [] quantityBalance;
5341         delete [] interBalance;
5342         delete [] intraBalance;
5343         delete [] workerAsk;
5344         delete [] donateSuccess;
5345         delete [] donateFail;
5346         delete [] subtreeSplit;
5347         delete [] subtreeWhole;
5348         delete [] subtreeChange;
5349     }
5350 }
5351 
5352 //#############################################################################
5353 
5354 // Explore a subtree from the subtree pool for a given number of work units
5355 // and a given time. Return how many nodes have been processed. The same
5356 // subtree will be explored next time if it still has unexplored nodes.
5357 AlpsReturnStatus
5358 AlpsKnowledgeBrokerMPI::doOneUnitWork(int unitWork,
5359                                       double unitTime,
5360                                       AlpsExitStatus & exitStatus,
5361                                       int & numNodesProcessed,
5362                                       int & numNodesBranched,
5363                                       int & numNodesDiscarded,
5364                                       int & numNodesPartial,
5365                                       int & depth,
5366                                       bool & betterSolution)
5367 {
5368 
5369     AlpsReturnStatus rCode = AlpsReturnStatusOk;
5370 
5371     numNodesProcessed = 0; /* Output */
5372     numNodesBranched = 0;  /* Output */
5373     numNodesDiscarded = 0; /* Output */
5374     numNodesPartial = 0;   /* Output */
5375 
5376     if( !workingSubTree_ && !(subTreePool_->hasKnowledge()) ) {
5377         return rCode;
5378     }
5379 
5380     if (forceTerminate_) {
5381         // Subtree from load balancing
5382         deleteSubTrees();
5383         return rCode;
5384     }
5385 #if 1
5386 
5387     if ( ! needWorkingSubTree_ )  {
5388         // Already have a subtree to work on.
5389 
5390 
5391         assert(workingSubTree_);
5392         rCode = workingSubTree_->exploreUnitWork(true, /* leaveAsIt*/
5393                                                  unitWork,
5394                                                  unitTime,
5395                                                  exitStatus,
5396                                                  numNodesProcessed,
5397                                                  numNodesBranched,  /* Output */
5398                                                  numNodesDiscarded, /* Output */
5399                                                  numNodesPartial,   /* Output */
5400                                                  treeDepth_,
5401                                                  betterSolution);
5402 
5403         if ( !(workingSubTree_->getNumNodes()) ) {
5404             delete workingSubTree_;  // Empty subtree
5405             workingSubTree_ = NULL;
5406             needWorkingSubTree_ = true;
5407         }
5408     }
5409     else if( needWorkingSubTree_ && (subTreePool_->hasKnowledge()) ) {
5410 
5411         workingSubTree_ = dynamic_cast<AlpsSubTree* >
5412             (subTreePool_->getKnowledge().first);
5413 
5414         subTreePool_->popKnowledge();     // Remove from pool
5415         needWorkingSubTree_ = false;      // Mark that we already have one
5416 
5417 #ifdef NF_DEBUG
5418         std::cout << "pop a subtree." << std::endl;
5419 #endif
5420 
5421         rCode = workingSubTree_->exploreUnitWork(true,
5422                                                  unitWork,
5423                                                  unitTime,
5424                                                  exitStatus,
5425                                                  numNodesProcessed,
5426                                                  numNodesBranched,  /* Output */
5427                                                  numNodesDiscarded, /* Output */
5428                                                  numNodesPartial,   /* Output */
5429                                                  treeDepth_,
5430                                                  betterSolution);
5431 
5432         if ( !( workingSubTree_->getNumNodes()) ) {
5433             delete workingSubTree_;   // Empty subtree
5434             workingSubTree_ = 0;
5435             needWorkingSubTree_ = true;
5436         }
5437     }
5438     else {   // need subtree, but system has no workload
5439         if (workingSubTree_ != 0) {
5440             delete workingSubTree_;
5441             workingSubTree_ = 0;
5442         }
5443     }
5444 #endif
5445     return rCode;
5446 }
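// A minimal usage sketch for doOneUnitWork() (hypothetical driver, not part
// of ALPS): explore in small slices so the broker can handle messages between
// slices.
#if 0
    AlpsExitStatus exitStatus = AlpsExitStatusUnknown;
    int nProcessed, nBranched, nDiscarded, nPartial, depth;
    bool betterSolution = false;
    while (!forceTerminate_) {
        doOneUnitWork(50 /* nodes */, 0.1 /* seconds */, exitStatus,
                      nProcessed, nBranched, nDiscarded, nPartial,
                      depth, betterSolution);
        // ... check for incoming MPI messages, report workload, etc. ...
    }
#endif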
5447 
5448 //#############################################################################
5449 
5450 /** Initialize member data. */
5451 void
5452 AlpsKnowledgeBrokerMPI::init()
5453 {
5454     processNum_ = 0;
5455     globalRank_ = -1;
5456     clusterComm_ = MPI_COMM_NULL;
5457     hubComm_ = MPI_COMM_NULL;
5458     hubGroup_ = MPI_GROUP_NULL;
5459     clusterSize_ = 0;
5460     userClusterSize_ = 0;
5461     clusterRank_ = -1;
5462     hubRanks_ = 0;
5463     myHubRank_ = -1;
5464     masterRank_ = -1;
5465     processType_ = AlpsProcessTypeAny;
5466     processTypeList_ = NULL;
5467     hubWork_ = false;
5468     subTreeRequest_ = MPI_REQUEST_NULL;
5469     solRequestL_ = MPI_REQUEST_NULL;
5470     solRequestR_ = MPI_REQUEST_NULL;
5471     modelKnowRequestL_ = MPI_REQUEST_NULL;
5472     modelKnowRequestR_ = MPI_REQUEST_NULL;
5473     forwardRequestL_ = MPI_REQUEST_NULL;
5474     forwardRequestR_ = MPI_REQUEST_NULL;
5475     incumbentValue_ = ALPS_INC_MAX;
5476     incumbentID_ = 0;
5477     updateIncumbent_ = false;
5478     workQuality_ = ALPS_OBJ_MAX;     // Worst possible
5479     clusterWorkQuality_ = ALPS_OBJ_MAX;
5480     systemWorkQuality_ = ALPS_OBJ_MAX;
5481     hubWorkQualities_ = 0;
5482     workerWorkQualities_ = 0;
5483     workQuantity_ = 0.0;
5484     clusterWorkQuantity_ = 0.0;
5485     systemWorkQuantity_ = 0.0;
5486     systemWorkQuantityForce_ = 0.0;
5487     hubWorkQuantities_ = 0;
5488     workerWorkQuantities_ = 0;
5489     workerReported_ = 0;
5490     hubReported_ = 0;
5491     allHubReported_  = false;
5492     masterDoBalance_ = 0;
5493     hubDoBalance_ = 0;
5494     workerNodeProcesseds_ = 0;
5495     clusterNodeProcessed_ = 0;
5496     hubNodeProcesseds_ = 0;
5497     sendCount_ = 0;
5498     recvCount_ = 0;
5499     clusterSendCount_ = 0;
5500     clusterRecvCount_ = 0;
5501     systemSendCount_ = 0;
5502     systemRecvCount_ = 0;
5503     masterIndexBatch_ = 0;
5504     rampUpTime_ = 0.0;
5505     rampDownTime_ = 0.0;
5506     idleTime_ = 0.0;
5507     msgTime_ = 0.0;
5508 
5509     psStats_.qualityBalance_ = 0;
5510     psStats_.quantityBalance_ = 0;
5511     psStats_.interBalance_ = 0;
5512     psStats_.intraBalance_ = 0;
5513     psStats_.workerAsk_ = 0;
5514     psStats_.donateSuccess_ = 0;
5515     psStats_.donateFail_ = 0;
5516     psStats_.subtreeSplit_ = 0;
5517     psStats_.subtreeWhole_ = 0;
5518     psStats_.subtreeChange_ = 0;
5519 
5520     forceTerminate_ = false;
5521     blockTermCheck_ = true;
5522     blockHubReport_ = false;
5523     blockWorkerReport_ = false;
5524     blockAskForWork_   = false;
5525     attachBuffer_ = 0;
5526     largeBuffer_ = 0;
5527     largeBuffer2_ = 0;
5528     smallBuffer_ = 0;
5529     masterBalancePeriod_ = 0.01;
5530     hubReportPeriod_ = 0.01;
5531     modelGenID_ = -1;
5532     modelGenPos_ = -1;
5533 
5534     rampUpSubTree_ = 0;
5535     unitWorkNodes_ = 0;
5536     haltSearch_ = false;
5537 
5538     userBalancePeriod_ = false;
5539 }

//#############################################################################

/** Destructor. */
AlpsKnowledgeBrokerMPI::~AlpsKnowledgeBrokerMPI()
{
    if (workerWorkQualities_) {
        delete [] workerWorkQualities_;
        workerWorkQualities_ = 0;
    }

    //  std::cout << "Here1 -- " << globalRank_ << std::endl;
    if (hubRanks_) {
        delete [] hubRanks_; hubRanks_ = 0;
    }
    //  std::cout << "Here2 -- " << globalRank_ << std::endl;
    if (processTypeList_) {
        delete [] processTypeList_; processTypeList_ = NULL;
    }
    //  std::cout << "Here3 -- " << globalRank_ << std::endl;
    if (hubWorkQualities_) {
        delete [] hubWorkQualities_; hubWorkQualities_ = 0;
    }
    //  std::cout << "Here4 -- " << globalRank_
    //            << "c size = " << clusterSize_ << std::endl;

    if (hubWorkQuantities_) {
        delete [] hubWorkQuantities_; hubWorkQuantities_ = 0;
    }
    if (workerWorkQuantities_) {
        delete [] workerWorkQuantities_;
        workerWorkQuantities_ = 0;
    }
    //  std::cout << "Here5 -- " << globalRank_ << std::endl;

    if (workerReported_) {
        delete [] workerReported_; workerReported_ = 0;
    }
    if (hubReported_) {
        delete [] hubReported_; hubReported_ = 0;
    }
    if (workerNodeProcesseds_) {
        delete [] workerNodeProcesseds_;
        workerNodeProcesseds_ = 0;
    }
    if (hubNodeProcesseds_) {
        delete [] hubNodeProcesseds_;
        hubNodeProcesseds_ = 0;
    }
#if 0
    if (attachBuffer_) {
        delete [] attachBuffer_;
        attachBuffer_ = 0;
    }
#endif
    if (largeBuffer_) {
        delete [] largeBuffer_;
        largeBuffer_ = 0;
    }
    if (largeBuffer2_) {
        delete [] largeBuffer2_;
        largeBuffer2_ = 0;
    }
    if (smallBuffer_) {
        delete [] smallBuffer_;
        smallBuffer_ = 0;
    }

    if (rampUpSubTree_) {
        delete rampUpSubTree_;
        rampUpSubTree_ = 0;
    }
    // Terminate MPI environment.
    MPI_Finalize();
}
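
// A minimal usage sketch (illustrative only; "MyModel" is a hypothetical
// user subclass of AlpsModel). Because the destructor above calls
// MPI_Finalize(), the broker should be the last MPI-dependent object in
// the driver:
//
//   int main(int argc, char* argv[])
//   {
//       MyModel model;
//       AlpsKnowledgeBrokerMPI broker(argc, argv, model);
//       broker.search(&model);
//       broker.printBestSolution();
//       return 0;
//   }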

//#############################################################################

void
AlpsKnowledgeBrokerMPI::printBestSolution(char* outputFile) const
{
    if (globalRank_ == masterRank_) {

        if (msgLevel_ < 1) return;

        if (outputFile != 0) {
            // Write to outputFile
            std::ofstream os(outputFile);
            if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
                os << "Cost = " << getBestQuality() << std::endl;
                dynamic_cast<AlpsSolution* >
                    (getBestKnowledge(AlpsKnowledgeTypeSolution).first)->print(os);
            }
            else {
                os << "No solution was found." << std::endl;
            }
        }
        else {
            // Write to std::cout
            std::cout << std::endl;
            if (getSolStatus() == AlpsExitStatusOptimal) {
                std::cout << "================= OPTIMAL SOLUTION =================="
                          << std::endl;
            }
            else {
                std::cout << "================= BEST SOLUTION FOUND ==============="
                          << std::endl;
            }

            if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
                std::cout << "Cost = " << getBestQuality()
                          << std::endl;
                dynamic_cast<AlpsSolution* >(getBestKnowledge(AlpsKnowledgeTypeSolution).first)->print(std::cout);
            }
            else {
                std::cout << "No solution was found." << std::endl;
            }
            std::cout << "====================================================="
                      << std::endl << std::endl;
        }
    }
}

//#############################################################################

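/** Increment the number of sent messages. */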
void
AlpsKnowledgeBrokerMPI::incSendCount(const char* how, int s)
{
    if (msgLevel_ > 100) {
        messageHandler()->message(ALPS_MSG_HOW, messages())
            << globalRank_ << "increment send" << s << how << CoinMessageEol;
    }

    sendCount_ += s;
}

//#############################################################################

/** Decrement the number of sent messages. */
void
AlpsKnowledgeBrokerMPI::decSendCount(const char* how, int s)
{
    if (msgLevel_ > 100) {
        messageHandler()->message(ALPS_MSG_HOW, messages())
            << globalRank_ << "decrement send" << s << how << CoinMessageEol;
    }

    sendCount_ -= s;
}

//#############################################################################

/** Increment the number of received messages. */
void
AlpsKnowledgeBrokerMPI::incRecvCount(const char* how, int s)
{
    if (msgLevel_ > 100) {
        messageHandler()->message(ALPS_MSG_HOW, messages())
            << globalRank_ << "increment receive" << s << how << CoinMessageEol;
    }

    recvCount_ += s;
}

//#############################################################################

/** Decrement the number of received messages. */
void
AlpsKnowledgeBrokerMPI::decRecvCount(const char* how, int s)
{
    if (msgLevel_ > 100) {
        messageHandler()->message(ALPS_MSG_HOW, messages())
            << globalRank_ << "decrement receive" << s << how << CoinMessageEol;
    }

    recvCount_ -= s;
}

//#############################################################################

/** Send knowledge to the receiver. */
void
AlpsKnowledgeBrokerMPI::sendKnowledge(AlpsKnowledgeType type,
                                      int sender,
                                      int receiver,
                                      char *& msgBuffer,
                                      int msgSize,
                                      int msgTag,
                                      MPI_Comm comm,
                                      bool blocking)
{
    switch (type) {
    case AlpsKnowledgeTypeModel:
        break;
    case AlpsKnowledgeTypeModelGen:
        sendModelKnowledge(comm, receiver);
        break;
    case AlpsKnowledgeTypeNode:
        sendRampUpNode(receiver, comm);
        break;
    case AlpsKnowledgeTypeSolution:
        sendIncumbent();
        break;
    case AlpsKnowledgeTypeSubTree:
        break;
    default:
        assert(0);
        throw CoinError("Unknown knowledge type", "sendKnowledge",
                        "AlpsKnowledgeBrokerMPI");
    }
}

//#############################################################################

/** Receive knowledge from the sender. */
void
AlpsKnowledgeBrokerMPI::receiveKnowledge(AlpsKnowledgeType type,
                                         int sender,
                                         int receiver,
                                         char *& msgBuffer,
                                         int msgSize,
                                         int msgTag,
                                         MPI_Comm comm,
                                         MPI_Status* status,
                                         bool blocking)
{
    switch (type) {
    case AlpsKnowledgeTypeModel:
        break;
    case AlpsKnowledgeTypeModelGen:
        receiveModelKnowledge(comm);
        break;
    case AlpsKnowledgeTypeNode:
        receiveRampUpNode(sender, comm, status);
        break;
    case AlpsKnowledgeTypeSolution:
        break;
    case AlpsKnowledgeTypeSubTree:
        break;
    default:
        assert(0);
        throw CoinError("Unknown knowledge type", "receiveKnowledge",
                        "AlpsKnowledgeBrokerMPI");
    }
}

//#############################################################################

void
AlpsKnowledgeBrokerMPI::requestKnowledge(AlpsKnowledgeType type,
                                         int sender,
                                         int receiver,
                                         char *& msgBuffer,
                                         int msgSize,
                                         int msgTag,
                                         MPI_Comm comm,
                                         bool blocking)
{
    switch (type) {
    case AlpsKnowledgeTypeModel:
        break;
    case AlpsKnowledgeTypeModelGen:
        break;
    case AlpsKnowledgeTypeNode:
        break;
    case AlpsKnowledgeTypeSolution:
        break;
    case AlpsKnowledgeTypeSubTree:
        break;
    default:
        assert(0);
        throw CoinError("Unknown knowledge type", "requestKnowledge",
                        "AlpsKnowledgeBrokerMPI");
    }
}

//#############################################################################

/** Forward generated model knowledge to the next processes in the
    broadcast tree. */
// NOTE: forwarding always uses MPI_COMM_WORLD.
void
AlpsKnowledgeBrokerMPI::forwardModelKnowledge()
{
    assert(modelGenPos_ > 0);

    // Binary-tree forwarding
    int mySeq = rankToSequence(modelGenID_, globalRank_);
    int leftSeq = leftSequence(mySeq, processNum_);
    int rightSeq = rightSequence(mySeq, processNum_);
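
    // Sketch of the numbering (illustration only): with processNum_ = 5 and
    // originator modelGenID_ = 2, rankToSequence() maps ranks (0,1,2,3,4) to
    // sequences (1,2,0,3,4), so the originator becomes the root (sequence 0).
    // Assuming the usual binary-heap layout, leftSequence(s) = 2s+1 and
    // rightSequence(s) = 2s+2 give each process up to two children, and
    // sequenceToRank() maps a child's sequence back to its global rank.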

#if 0
    std::cout << "++++ forwardModelKnowledge: mySeq = " << mySeq
              << ", leftSeq = " << leftSeq
              << ", rightSeq = " << rightSeq
              << ", modelGenID_ = " << modelGenID_
              << ", globalRank_ = " << globalRank_
              << std::endl;
#endif

    if (leftSeq != -1) {
        if (forwardRequestL_ != MPI_REQUEST_NULL) {
            // Test if the previous send has completed.
            int alreadySent = false;
            MPI_Status sentStatus;
            MPI_Test(&forwardRequestL_, &alreadySent, &sentStatus);
            if (!alreadySent) { // Previous send hasn't completed
                std::cout << "++++ Previous left forwardModelKnowledge hasn't finished."
                          << std::endl;
                return;
            }
        }

        // Note: modelGenPos_ is set by receiveModelKnowledge
        int leftRank = sequenceToRank(modelGenID_, leftSeq);
        MPI_Isend(largeBuffer_, modelGenPos_, MPI_PACKED, leftRank,
                  AlpsMsgModelGenSearch, MPI_COMM_WORLD, &forwardRequestL_);
#if 0
        std::cout << "++++ forwardModelKnowledge: leftRank = "
                  << leftRank << std::endl;
#endif
        MPI_Status sentStatusL;
        MPI_Wait(&forwardRequestL_, &sentStatusL);
        incSendCount("forwardModelKnowledge during search");
    }

    if (rightSeq != -1) {
        if (forwardRequestR_ != MPI_REQUEST_NULL) {
            // Test if the previous send has completed.
            int alreadySent = false;
            MPI_Status sentStatus;
            MPI_Test(&forwardRequestR_, &alreadySent, &sentStatus);
            if (!alreadySent) { // Previous send hasn't completed
                std::cout << "++++ Previous right forwardModelKnowledge hasn't finished."
                          << std::endl;
                return;
            }
        }
        int rightRank = sequenceToRank(modelGenID_, rightSeq);
#if 0
        std::cout << "++++ forwardModelKnowledge: rightRank = "
                  << rightRank << std::endl;
#endif
        MPI_Isend(largeBuffer_, modelGenPos_, MPI_PACKED, rightRank,
                  AlpsMsgModelGenSearch, MPI_COMM_WORLD, &forwardRequestR_);
        MPI_Status sentStatusR;
        MPI_Wait(&forwardRequestR_, &sentStatusR);
        incSendCount("forwardModelKnowledge during search");
    }
}

//#############################################################################

/** Send generated model knowledge to the receiver. */
// NOTE: comm is hubComm_ or MPI_COMM_WORLD.
// During search a nonblocking send is used: another process may be sending
// to this process at the same time, and blocking sends could then deadlock.
void
AlpsKnowledgeBrokerMPI::sendModelKnowledge(MPI_Comm comm, int receiver)
{
    int size = largeSize_;
    int position = 0;

    bool hasKnowledge = false;

    assert(largeBuffer2_);

    // Pack original sender's global rank
    position = 0;
    MPI_Pack(&globalRank_, 1, MPI_INT, largeBuffer2_, largeSize_,
             &position, comm);

    //std::cout << "----- 1. position = " << position << std::endl;

    // Retrieve and pack generated model knowledge
    AlpsEncoded *encoded = model_->packSharedKnowlege();

    if (encoded) {
        hasKnowledge = true;
        // Pack into local buffer
        packEncoded(encoded, largeBuffer2_, size, position, comm);
    }
#if 0
    else {
        std::cout << "---- Process[" << globalRank_
                  << "]: have no model knowledge to share." << std::endl;
    }
#endif

    //std::cout << "----- 2. position = " << position
    //          << ", size = " << size << std::endl;
    delete encoded;

    if (phase_ == AlpsPhaseRampup) {
        assert(comm == clusterComm_ || comm == hubComm_);
        assert(receiver > -1);

        if (hasKnowledge) {
            // Send packed knowledge of size "position"
            MPI_Send(largeBuffer2_, position, MPI_PACKED, receiver,
                     AlpsMsgModelGenRampUp, comm);
            incSendCount("sendModelKnowledge during rampup");
        }
        else {
            // Send an empty message
            MPI_Send(largeBuffer2_, 0, MPI_PACKED, receiver,
                     AlpsMsgModelGenRampUp, comm);
            incSendCount("sendModelKnowledge during rampup");
        }
    }
    else if ( hasKnowledge && (phase_ == AlpsPhaseSearch) ) {
        assert(comm == MPI_COMM_WORLD);
#if 0
        // Attach buffer
        if (!attachBuffer_) {
            int attachSize = 2 * (largeSize_ + MPI_BSEND_OVERHEAD);
            attachBuffer_ = new char [attachSize];
            MPI_Buffer_attach(attachBuffer_, attachSize);
        }
#endif

#if 0
        std::cout << "---- Process[" << globalRank_
                  << "]: Share model knowledge by buffered sending, "
                  << "position " << position << std::endl;
#endif

        // During search, binary-tree sending
        int mySeq = rankToSequence(globalRank_, globalRank_);
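        // rankToSequence(globalRank_, globalRank_) is always 0: the
        // originating process is the root of its own broadcast tree.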
        int leftSeq = leftSequence(mySeq, processNum_);
        int rightSeq = rightSequence(mySeq, processNum_);

        if (leftSeq != -1) {
            if (modelKnowRequestL_ != MPI_REQUEST_NULL) {
                // Test if the previous send has completed.
                int alreadySent = false;
                MPI_Status sentStatus;
                MPI_Test(&modelKnowRequestL_, &alreadySent, &sentStatus);
                if (!alreadySent) { // Previous send hasn't completed
                    return;
                }
            }
            int leftRank = sequenceToRank(globalRank_, leftSeq);
            // Buffered send
            //MPI_Bsend(largeBuffer2_, position, MPI_PACKED, leftRank,
            //      AlpsMsgModelGenSearch, comm);
            //MPI_Send(largeBuffer2_, position, MPI_PACKED, leftRank,
            //     AlpsMsgModelGenSearch, comm);
            MPI_Isend(largeBuffer2_, position, MPI_PACKED, leftRank,
                      AlpsMsgModelGenSearch, comm, &modelKnowRequestL_);
            MPI_Status sentStatusL;
            // Wait on the request just posted.
            MPI_Wait(&modelKnowRequestL_, &sentStatusL);
            incSendCount("sendModelKnowledge during search");
        }

        if (rightSeq != -1) {
            if (modelKnowRequestR_ != MPI_REQUEST_NULL) {
                // Test if the previous send has completed.
                int alreadySent = false;
                MPI_Status sentStatus;
                MPI_Test(&modelKnowRequestR_, &alreadySent, &sentStatus);
                if (!alreadySent) { // Previous send hasn't completed
                    return;
                }
            }
            int rightRank = sequenceToRank(globalRank_, rightSeq);
            // Buffered send
            //MPI_Bsend(largeBuffer2_, position, MPI_PACKED, rightRank,
            //      AlpsMsgModelGenSearch, comm);
            //MPI_Send(largeBuffer2_, position, MPI_PACKED, rightRank,
            //     AlpsMsgModelGenSearch, comm);
            MPI_Isend(largeBuffer2_, position, MPI_PACKED, rightRank,
                      AlpsMsgModelGenSearch, comm, &modelKnowRequestR_);
            MPI_Status sentStatusR;
            // Wait on the request just posted.
            MPI_Wait(&modelKnowRequestR_, &sentStatusR);
            incSendCount("sendModelKnowledge during search");
        }
    }
}

//#############################################################################

/** Receive generated model knowledge from the sender. */
// NOTE: comm is hubComm_ or MPI_COMM_WORLD.
void
AlpsKnowledgeBrokerMPI::receiveModelKnowledge(MPI_Comm comm)
{
    int position = 0;
    int count = 0;
    MPI_Status status;
    bool hasKnowledge = true;

    char *localBuffer = 0;  // FIXME: add a largeBuffer2_

    if (phase_ == AlpsPhaseRampup) {
        localBuffer = new char [largeSize_];

        // std::cout << "PROCESS[" << globalRank_ << "] : B: receive gen model"
        //           << std::endl;

        // Receive
        MPI_Recv(localBuffer, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
                 AlpsMsgModelGenRampUp, comm, &status);

        MPI_Get_count(&status, MPI_PACKED, &count);

        if (count < 1) {
            hasKnowledge = false;
        }

        incRecvCount("receiveModelKnowledge during rampup");

        //std::cout << "PROCESS[" << globalRank_ << "] : A: receive gen model"
        //          << ", count = " << count << std::endl;
    }
    else if (phase_ == AlpsPhaseSearch) {
        localBuffer = largeBuffer_;
    }
    else {
        assert(0);
    }

    if (hasKnowledge) {
        // Unpack original sender's global rank
        position = 0;  // Start from position 0 in local buffer
        MPI_Unpack(localBuffer, largeSize_, &position, &modelGenID_, 1,
                   MPI_INT, comm);

        //std::cout << "PROCESS[" << globalRank_ << "] : receive gen model"
        //          << ", count = " << count << ", from " << modelGenID_
        //          << std::endl;

        // Unpack knowledge from the buffer.
        AlpsEncoded* encodedModelGen = unpackEncoded(localBuffer,
                                                     position,
                                                     comm,
                                                     largeSize_);

        // Record the actual buffer size for broadcasting
        modelGenPos_ = position;

        // Unpack and store knowledge from the large buffer.
        model_->unpackSharedKnowledge(*encodedModelGen);

        delete encodedModelGen;
        encodedModelGen = 0;
    }

    if ( (phase_ == AlpsPhaseRampup) && localBuffer ) {
        delete [] localBuffer;
    }
    localBuffer = 0;
}

//#############################################################################

void
AlpsKnowledgeBrokerMPI::sendErrorCodeToMaster(int errorCode)
{
    int size = model_->AlpsPar()->entry(AlpsParams::smallSize);
    int pos = 0;
    MPI_Pack(&errorCode, 1, MPI_INT, smallBuffer_, size, &pos, MPI_COMM_WORLD);
    MPI_Send(smallBuffer_, pos, MPI_PACKED, masterRank_, AlpsMsgErrorCode,
             MPI_COMM_WORLD);
    incSendCount("sendErrorCodeToMaster");
}
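
// A minimal sketch (illustrative only, not verbatim ALPS code) of how a
// process might report an out-of-memory condition; recvErrorCode() below
// maps error code 1 to AlpsExitStatusNoMemory:
//
//   try {
//       // ... process subtrees ...
//   }
//   catch (std::bad_alloc&) {
//       sendErrorCodeToMaster(1);   // 1 == out of memory
//   }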

//#############################################################################

void
AlpsKnowledgeBrokerMPI::recvErrorCode(char *& bufLarge)
{
    int size = 10;   // Big enough to unpack a single packed int
    int pos = 0;
    int errorCode = 0;

    MPI_Unpack(bufLarge, size, &pos, &errorCode, 1, MPI_INT, MPI_COMM_WORLD);
    if (errorCode == 1) {
        // out of memory
        exitStatus_ = AlpsExitStatusNoMemory;
    }
    else {
        // error
        exitStatus_ = AlpsExitStatusFailed;
    }

    forceTerminate_ = true;
}

//#############################################################################

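/** Master's part of the two-level root initialization: generate nodes for
    the hubs and distribute them round-robin, then generate a second batch
    of nodes and distribute it to the master's own workers. */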
void
AlpsKnowledgeBrokerMPI::rootInitMaster(AlpsTreeNode* root)
{
    int i = 0;
    int requiredNumNodes =
        model_->AlpsPar()->entry(AlpsParams::masterInitNodeNum);
    double rampUpTimeMaster = 0.0;

    AlpsNodePool* tempNodePool = NULL;

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_STATIC_BALANCE_BEG, messages())
            << "the Two-level Root Initialization" << CoinMessageEol;
    }

    //------------------------------------------------------
    // Create the required number of subtrees (nodes) for hubs.
    //------------------------------------------------------

    if (msgLevel_ > 0 && requiredNumNodes > 0) {
        messageHandler()->message(ALPS_RAMPUP_MASTER_START, messages())
            << globalRank_ << requiredNumNodes << CoinMessageEol;
    }

    rampUpSubTree_ = dynamic_cast<AlpsSubTree*>
        (const_cast<AlpsKnowledge *>(decoderObject(AlpsKnowledgeTypeSubTree)))
        ->newSubTree();
    rampUpSubTree_->setBroker(this);
    rampUpSubTree_->setNodeSelection(rampUpNodeSelection_);
    rampUpSubTree_->setNextIndex(1); // One more than root's index

    nodeProcessedNum_ += rampUpSubTree_->rampUp(hubNum_,
                                                requiredNumNodes,
                                                treeDepth_,
                                                root);

    int numGenNodes = rampUpSubTree_->nodePool()->getNumKnowledges();

    //------------------------------------------------------
    // Spawn hub/worker processes (for dynamic process management only).
    // Nothing now. FILL IN HERE IF NEEDED LATER.
    //------------------------------------------------------

    //------------------------------------------------------
    // Distribute subtrees (nodes) to hubs in a round-robin fashion.
    //------------------------------------------------------

    // Temporarily store nodes for hub 0 (the master)
    tempNodePool = new AlpsNodePool;
    tempNodePool->setNodeSelection(*rampUpNodeSelection_);

    if (numGenNodes <= 0) {
        if (msgLevel_ > 0) {
            messageHandler()->message(ALPS_RAMPUP_MASTER_FAIL, messages())
                << globalRank_ << CoinMessageEol;
        }
    }
    else {
        int numSent = 0;
        while (numSent < numGenNodes) {
            for (i = 0; i < hubNum_; ++i) {
                if (numSent >= numGenNodes) {
                    break; // Break in the sending round
                }
                if (hubRanks_[i] != globalRank_) {
                    // NOTE: master's rank is always 0 in hubComm_.
                    if (msgLevel_ > 100) {
                        std::cout << "master send a node to hub "
                                  << hubRanks_[i]
                                  << std::endl;
                    }
                    sendKnowledge(AlpsKnowledgeTypeNode,
                                  globalRank_,
                                  i,
                                  smallBuffer_,
                                  0,
                                  MPI_ANY_TAG,
                                  hubComm_,
                                  true);

                }
                else {
                    AlpsTreeNode* nodeM = dynamic_cast<AlpsTreeNode* >
                        (rampUpSubTree_->nodePool()->getKnowledge().first);
                    rampUpSubTree_->nodePool()->popKnowledge();
                    tempNodePool->addKnowledge(nodeM, nodeM->getQuality());
                }
                ++numSent;
            }
        }
    }

    //------------------------------------------------------
    // Move nodes from tempNodePool to rampUpSubTree_'s node pool.
    // NOTE: These nodes do not necessarily form a subtree. We just want
    //       to use the subtree's functions to generate more nodes for
    //       workers.
    //------------------------------------------------------

    while (tempNodePool->hasKnowledge()) {
        AlpsTreeNode* nodeT = dynamic_cast<AlpsTreeNode* >
            (tempNodePool->getKnowledge().first);
        double valT = tempNodePool->getKnowledge().second;
        rampUpSubTree_->nodePool()->addKnowledge(nodeT, valT);
        tempNodePool->popKnowledge();
    }

    //------------------------------------------------------
    // Tell hubs no more nodes to send.
    //------------------------------------------------------

    for (i = 0; i < hubNum_; ++i) {
        if (hubRanks_[i] != globalRank_) sendFinishInit(i, hubComm_);
    }

    //------------------------------------------------------
    // Send generated model knowledge to the hubs.
    //------------------------------------------------------

    for (i = 0; i < hubNum_; ++i) {
        if (hubRanks_[i] != globalRank_) {
            sendKnowledge(AlpsKnowledgeTypeModelGen,
                          masterRank_,
                          i,            // receiver
                          largeBuffer2_,
                          0,
                          AlpsMsgModelGenRampUp,
                          hubComm_,     // comm
                          true);
            //sendModelKnowledge(genBuf, AlpsMsgModelGenRampUp, hubComm_, i);
        }
    }
    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_KNOWLEDGE_GEN, messages())
            << globalRank_ << CoinMessageEol;
    }

    // Master's rampup stops.
    rampUpTimeMaster = masterTimer_.getTime();

    //------------------------------------------------------
    // If there is a solution, broadcast its value and process id.
    //------------------------------------------------------

    if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
        double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if (incVal < incumbentValue_) {   // Assume minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (msgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_MASTER_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }

    //------------------------------------------------------
    // Print statistics about the root ramp-up.
    //------------------------------------------------------

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_RAMPUP_MASTER, messages())
            << globalRank_ << rampUpTimeMaster
            << nodeProcessedNum_ << numGenNodes
            << CoinMessageEol;
    }

    if (nodeProcessedNum_) {
        nodeProcessingTime_ = rampUpTimeMaster / nodeProcessedNum_;
    }

    //------------------------------------------------------
    // Generate and send the required number of nodes for master's workers.
    //------------------------------------------------------

    requiredNumNodes = model_->AlpsPar()->entry(AlpsParams::hubInitNodeNum);

    if (msgLevel_ > 0 && requiredNumNodes > 0) {
        messageHandler()->message(ALPS_RAMPUP_HUB_START, messages())
            << globalRank_ << requiredNumNodes << CoinMessageEol;
    }

    int treeSizeByHub = rampUpSubTree_->rampUp(clusterSize_ - 1,
                                               requiredNumNodes,
                                               treeDepth_);

    nodeProcessedNum_ += treeSizeByHub;
    const int numNode2 = rampUpSubTree_->nodePool()->getNumKnowledges();

    if (numNode2 == 0) {
        if (msgLevel_ > 0) {
            messageHandler()->message(ALPS_RAMPUP_HUB_FAIL, messages())
                << globalRank_ << CoinMessageEol;
        }
    }
    else {    // Send nodes to my workers
        int numSent2 = 0;
        while (numSent2 < numNode2) {
            for (i = 0; i < clusterSize_; ++i) {
                if (numSent2 >= numNode2) {
                    break;
                }
                if (i != clusterRank_) {
                    sendKnowledge(AlpsKnowledgeTypeNode,
                                  globalRank_,
                                  i,
                                  smallBuffer_,
                                  0,
                                  MPI_ANY_TAG,
                                  clusterComm_,
                                  true);

                    ++numSent2;
                    if (msgLevel_ > 200) {
                        std::cout << "MASTER/HUB " << clusterRank_
                                  << " : sent nodes to Worker "
                                  << i << std::endl;
                    }
                }
            }
        }
    }

    assert(rampUpSubTree_->getNumNodes() == 0);
    rampUpSubTree_->nullRootActiveNode();

    //------------------------------------------------------
    // Tell its workers no more nodes to send.
    //------------------------------------------------------

    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            sendFinishInit(i, clusterComm_);
        }
    }

    //------------------------------------------------------
    // Send generated model knowledge to its workers.
    //------------------------------------------------------

    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            sendKnowledge(AlpsKnowledgeTypeModelGen,
                          clusterRank_,
                          i,             // receiver
                          largeBuffer2_,
                          0,
                          AlpsMsgModelGenRampUp,
                          clusterComm_,  // comm
                          true);
            //sendModelKnowledge(genBuf, AlpsMsgModelGenRampUp, clusterComm_, i);
        }
    }

    //------------------------------------------------------
    // If there is a better solution, broadcast its value and process id.
    //------------------------------------------------------

    if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
        double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if (incVal < incumbentValue_) {   // Assume minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (msgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_HUB_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }

    rampUpTime_ = masterTimer_.getTime();

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_RAMPUP_HUB, messages())
            << globalRank_ << (rampUpTime_ - rampUpTimeMaster)
            << treeSizeByHub << numNode2 << CoinMessageEol;
    }

    if (treeSizeByHub) {
        nodeProcessingTime_ = 0.5 * (nodeProcessingTime_ +
                                     (rampUpTime_ - rampUpTimeMaster) / treeSizeByHub);
    }

    // Reset to normal selection.
    rampUpSubTree_->setNodeSelection(nodeSelection_);

    // Free pool.
    delete tempNodePool;

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_STATIC_BALANCE_END, messages())
            << "the Two-level Root Initialization" << CoinMessageEol;
    }
}

//#############################################################################

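/** Hub's part of the two-level root initialization: receive nodes from the
    master, generate more nodes from them, and distribute the results to
    the hub's workers. */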
void
AlpsKnowledgeBrokerMPI::rootInitHub()
{
    int i;

    MPI_Status status;

    int requiredNumNodes =
        model_->AlpsPar()->entry(AlpsParams::hubInitNodeNum);

    rampUpSubTree_ = dynamic_cast<AlpsSubTree*>
        (const_cast<AlpsKnowledge *>
         (decoderObject(AlpsKnowledgeTypeSubTree)))->newSubTree();
    rampUpSubTree_->setBroker(this);
    rampUpSubTree_->setNodeSelection(rampUpNodeSelection_);

    // Make sure the number of nodes created is at least the cluster size.
    int minNumNodes = 0;
    if (hubWork_) {
        minNumNodes = clusterSize_;
    }
    else {
        minNumNodes = clusterSize_ - 1;
    }

    //------------------------------------------------------
    // Receive subtrees (nodes) sent by the master.
    // NOTE: Put all nodes in rampUpSubTree_. We just want to use the
    //       subtree's functions to generate more nodes.
    //------------------------------------------------------

    while (true) {
        // NOTE: master's rank is always 0 in hubComm_.
        receiveKnowledge(AlpsKnowledgeTypeNode,
                         0,           // sender,
                         globalRank_, // receiver,
                         smallBuffer_,
                         0,
                         MPI_ANY_TAG,
                         hubComm_,
                         &status,
                         true);

        if (hubMsgLevel_ > 100) {
            std::cout << "HUB " << globalRank_ << ": received a node from master "
                      << masterRank_ << std::endl;
        }

        if (status.MPI_TAG == AlpsMsgFinishInit) {
            if (hubMsgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_HUB_RECV, messages())
                    << globalRank_ << masterRank_ << CoinMessageEol;
            }

            break;
        }
    }

    //------------------------------------------------------
    // Receive generated knowledge from the master.
    //------------------------------------------------------

    receiveKnowledge(AlpsKnowledgeTypeModelGen,
                     masterRank_, // sender
                     globalRank_, // receiver,
                     smallBuffer_,
                     0,
                     MPI_ANY_TAG,
                     hubComm_,    // hub comm
                     &status,
                     true);

    //receiveModelKnowledge(hubComm_);

    //------------------------------------------------------
    // Generate and send the required number of nodes (subtrees) for the
    // hub's workers.
    //------------------------------------------------------

    if (hubMsgLevel_ > 0 && requiredNumNodes > 0) {
        messageHandler()->message(ALPS_RAMPUP_HUB_START, messages())
            << globalRank_ << requiredNumNodes << CoinMessageEol;
    }

    nodeProcessedNum_ += rampUpSubTree_->rampUp(minNumNodes,
                                                requiredNumNodes,
                                                treeDepth_);

    const int numNode = rampUpSubTree_->nodePool()->getNumKnowledges();

    if (numNode == 0) {
        if (hubMsgLevel_ > 0) {
            messageHandler()->message(ALPS_RAMPUP_HUB_FAIL, messages())
                << globalRank_ << CoinMessageEol;
        }
    }
    else {    // Distribute nodes
        int numSent = 0;
        while (numSent < numNode) {
            for (i = 0; i < clusterSize_; ++i) {
                if (numSent >= numNode) break;

                if (i == clusterRank_) {
                    if (hubWork_) {
                        // The hub also works, so keep a node for itself.
                        AlpsSubTree* myTree = dynamic_cast<AlpsSubTree*>
                            (const_cast<AlpsKnowledge *> (decoderObject
                                (AlpsKnowledgeTypeSubTree)))->newSubTree();
                        myTree->setBroker(this);
                        myTree->setNodeSelection(nodeSelection_);

                        AlpsTreeNode* node = static_cast<AlpsTreeNode* >
                            (rampUpSubTree_->nodePool()->getKnowledge().first);
                        rampUpSubTree_->nodePool()->popKnowledge();

                        node->setBroker(this);
                        // todo(aykut) node desc does not hold a pointer to
                        // the model; it holds a pointer to the broker.
                        //node->modifyDesc()->setModel(model_);
                        node->setParent(NULL);
                        node->setParentIndex(-1);
                        node->setNumChildren(0);
                        node->setStatus(AlpsNodeStatusCandidate);
                        myTree->nodePool()->addKnowledge(node,
                                                         node->getQuality());
                        assert(myTree->getNumNodes() == 1);
                        myTree->setRoot(node); // Don't forget!
                        myTree->calculateQuality();
                        addKnowledge(AlpsKnowledgeTypeSubTree, myTree,
                                     myTree->getQuality());
                        ++numSent;
                    }
                }
                else {
                    // Send a node to worker i.
                    sendKnowledge(AlpsKnowledgeTypeNode,
                                  globalRank_,
                                  i,
                                  smallBuffer_,
                                  0,
                                  MPI_ANY_TAG,
                                  clusterComm_,
                                  true);

                    ++numSent;
                }
            } // end of for loop
        } // end of while loop
    }

    assert(rampUpSubTree_->getNumNodes() == 0);
    rampUpSubTree_->nullRootActiveNode();

    //------------------------------------------------------
    // Send the finish-initialization tag to workers so that they know to
    // stop receiving subtrees.
    //------------------------------------------------------

    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            sendFinishInit(i, clusterComm_);
        }
    }

    //------------------------------------------------------
    // Send generated model knowledge to its workers.
    //------------------------------------------------------

    for (i = 0; i < clusterSize_; ++i) {
        if (i != clusterRank_) {
            sendKnowledge(AlpsKnowledgeTypeModelGen,
                          globalRank_,
                          i,             // receiver
                          largeBuffer2_,
                          0,
                          AlpsMsgModelGenRampUp,
                          clusterComm_,  // comm
                          true);
            // sendModelKnowledge(genBuf, AlpsMsgModelGenRampUp, clusterComm_, i);
        }
    }

    //------------------------------------------------------
    // If there is a better solution, broadcast its value and process id.
    //------------------------------------------------------

    if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
        double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if (incVal < incumbentValue_) {    // Assume minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (hubMsgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_HUB_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }

    rampUpTime_ = hubTimer_.getTime();

    if (hubMsgLevel_ > 0) {
        messageHandler()->message(ALPS_RAMPUP_HUB, messages())
            << globalRank_ << rampUpTime_ << nodeProcessedNum_ << numNode
            << CoinMessageEol;
    }

    if (nodeProcessedNum_) {
        nodeProcessingTime_ = rampUpTime_ / nodeProcessedNum_;
    }

    /* Reset to normal selection. */
    rampUpSubTree_->setNodeSelection(nodeSelection_);
}

//#############################################################################

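/** Worker's part of the two-level root initialization: receive subtrees
    (nodes) and generated model knowledge from its hub. */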
void
AlpsKnowledgeBrokerMPI::rootInitWorker()
{
    int numTrees = 0;
    MPI_Status status;
    const int workerMsgLevel =
        model_->AlpsPar()->entry(AlpsParams::workerMsgLevel);

    //------------------------------------------------------
    // Receive subtrees (nodes) sent by my hub.
    //------------------------------------------------------

#ifdef NF_DEBUG
    std::cout << "WORKER["<< globalRank_ << "]: before rampup." << std::endl;
#endif

    while (true) {

        rampUpSubTree_ = dynamic_cast<AlpsSubTree*>
            (const_cast<AlpsKnowledge *>
             (decoderObject(AlpsKnowledgeTypeSubTree)))->
            newSubTree();
        rampUpSubTree_->setBroker(this);
        rampUpSubTree_->setNodeSelection(nodeSelection_);

        // NOTE: the received subtree is decoded into rampUpSubTree_
        receiveKnowledge(AlpsKnowledgeTypeNode,
                         masterRank_,  // sender,
                         globalRank_,  // receiver,
                         smallBuffer_,
                         0,
                         MPI_ANY_TAG,
                         clusterComm_, // cluster comm
                         &status,
                         true);

        if (status.MPI_TAG == AlpsMsgFinishInit) {
            if (workerMsgLevel > 0) {
                messageHandler()->message(ALPS_RAMPUP_WORKER_RECV, messages())
                    << globalRank_ << myHubRank_ << CoinMessageEol;
            }
            delete rampUpSubTree_;
            rampUpSubTree_ = NULL;
            break;
        }

        assert(rampUpSubTree_->getNumNodes() > 0);

        rampUpSubTree_->calculateQuality();

        addKnowledge(AlpsKnowledgeTypeSubTree,
                     rampUpSubTree_,
                     rampUpSubTree_->getQuality());

        rampUpSubTree_ = NULL;

        ++numTrees;
        assert(numTrees == subTreePool_->getNumKnowledges());
    }

    //------------------------------------------------------
    // Receive generated knowledge from its hub.
    //------------------------------------------------------

    receiveKnowledge(AlpsKnowledgeTypeModelGen,
                     myHubRank_,  // sender
                     globalRank_, // receiver,
                     smallBuffer_,
                     0,
                     MPI_ANY_TAG,
                     clusterComm_,// cluster comm
                     &status,
                     true);
    // receiveModelKnowledge(clusterComm_);
}

//#############################################################################

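/** Master's part of spiral static load balancing: create a small pool of
    nodes, send one to each process, then coordinate node donations among
    the processes until all of them are loaded or the search completes. */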
void
AlpsKnowledgeBrokerMPI::spiralMaster(AlpsTreeNode *root)
{
    // Create an initial number of nodes (larger than 1)
    int k;
    int requiredNumNodes = 2;
    int receiver, pos, sender;
    int numSent = 0;
    int numLoaded = 0;
    int doUnitWork = 1;
    int donorNodes = 0;
    int numFinishSignal = 1; // Count master itself
    int numCompleted = 0;

    bool stopSearch = false;
    bool inRampUp = true;
    double incVal;

    MPI_Status status;

    const int smallSize = model_->AlpsPar()->entry(AlpsParams::smallSize);

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_STATIC_BALANCE_BEG, messages())
            << "spiral initialization" << CoinMessageEol;
    }

    if (msgLevel_ > 0 && requiredNumNodes > 0) {
        messageHandler()->message(ALPS_RAMPUP_MASTER_START, messages())
            << globalRank_ << requiredNumNodes << CoinMessageEol;
    }

    rampUpSubTree_ = dynamic_cast<AlpsSubTree*>
        (const_cast<AlpsKnowledge *>(decoderObject(AlpsKnowledgeTypeSubTree)))
        ->newSubTree();
    rampUpSubTree_->setBroker(this);
    rampUpSubTree_->setNodeSelection(rampUpNodeSelection_);
    rampUpSubTree_->setNextIndex(1); // One more than root's index

    nodeProcessedNum_ += rampUpSubTree_->rampUp(1,  // minimum number
                                                requiredNumNodes,
                                                treeDepth_,
                                                root);

    // Distribute created nodes to other processes
    int numGenNodes = rampUpSubTree_->nodePool()->getNumKnowledges();

    if (msgLevel_ > 200) {
        std::cout << "master: spiral: numGenNodes = " << numGenNodes
                  << std::endl;
    }

    if (numGenNodes == 0) {
        // Send initialization complete signal to other processes.
        for (k = 0; k < processNum_; ++k) {
            if (k != globalRank_) {
                if (msgLevel_ > 200) {
                    std::cout << "master: spiral: ask others to exit"
                              << std::endl;
                }
                MPI_Send(smallBuffer_, 0, MPI_PACKED, k,
                         AlpsMsgFinishInit, MPI_COMM_WORLD);
            }
        }
        stopSearch = true;
        goto TERM_SPIRAL_MASTER;
    }

    numSent = 0;
    numLoaded = 1;
    doUnitWork = 1;
    if (numGenNodes > processNum_ - 2) {
        doUnitWork = 0;
    }

    while (numSent < numGenNodes) {
        for (k = 0; k < processNum_; ++k) {
            if (numSent == numGenNodes) {
                break; // Distributed all nodes
            }
            if (k != globalRank_) {
                if (msgLevel_ > 200) {
                    std::cout << "master: spiral: send a node to process "
                              << k << std::endl;
                }
                sendNodeModelGen(k, doUnitWork);
                ++numSent;
                ++numLoaded;
            }
        }
    }

    assert(rampUpSubTree_->getNumNodes() == 0);
    rampUpSubTree_->nullRootActiveNode();

    // Manage rampup
    inRampUp = true;
    donorNodes = 0;
    numFinishSignal = 1; // Count master itself
    numCompleted = 0;

    if (doUnitWork) {
        // If other processes were asked to do unit work, they will report
        // their loads back.
        while (inRampUp) {
            MPI_Recv(&donorNodes, 1, MPI_INT, MPI_ANY_SOURCE,
                     AlpsMsgRampUpLoad, MPI_COMM_WORLD, &status);
            sender = status.MPI_SOURCE;
            if ((donorNodes > 1) && (numLoaded < processNum_)) {
                // Ask the sender to donate a node to process numLoaded.
                receiver = numLoaded;
                pos = 0;
                MPI_Pack(&receiver, 1, MPI_INT, smallBuffer_, smallSize, &pos,
                         MPI_COMM_WORLD);
                MPI_Send(smallBuffer_, pos, MPI_PACKED, sender,
                         AlpsMsgRampUpDonate, MPI_COMM_WORLD);
                ++numLoaded;
            }
            else {
                MPI_Send(smallBuffer_, 0, MPI_PACKED, sender,
                         AlpsMsgFinishInit, MPI_COMM_WORLD);
                ++numFinishSignal;
                if (donorNodes < 1) {
                    ++numCompleted;
                }
            }

            //----------------------------------------------
            // If finish signals have been sent to all "loaded" processes,
            // OR all "loaded" processes have no nodes, the search is
            // complete.
            //----------------------------------------------
            // NOTE: numLoaded is one greater than the actual number of
            //       nodes sent.
            //----------------------------------------------

            if (msgLevel_ > 200) {
                std::cout << "master: spiral: numCompleted = " << numCompleted
                          << "; numLoaded = " << numLoaded << std::endl;
            }

            if (numCompleted == numLoaded - 1) {
                if (msgLevel_ > 200) {
                    std::cout << "master: spiral: no nodes left, search completed."
                              << std::endl;
                }
                inRampUp = false;
                // Send finish signal to the remaining processes
                for (k = numLoaded; k < processNum_; ++k) {
                    MPI_Send(smallBuffer_, 0, MPI_PACKED, k,
                             AlpsMsgFinishInit, MPI_COMM_WORLD);
                    ++numFinishSignal;
                }
                assert(numFinishSignal == processNum_);
            }

            if (numFinishSignal == processNum_) {
                if (msgLevel_ > 200) {
                    std::cout << "master: spiral: all processes have been loaded."
                              << std::endl;
                }
                inRampUp = false;
            }
        }
    }
    else {
        // Just ask them to exit from rampup
        for (k = 0; k < processNum_; ++k) {
            if (k != globalRank_) {
                MPI_Send(smallBuffer_, 0, MPI_PACKED, k,
                         AlpsMsgFinishInit, MPI_COMM_WORLD);
            }
        }
    }

TERM_SPIRAL_MASTER:

    MPI_Barrier(MPI_COMM_WORLD);

    //------------------------------------------------------
    // If there is a better solution, broadcast its value and process id.
    //------------------------------------------------------

    if (!stopSearch && hasKnowledge(AlpsKnowledgeTypeSolution)) {
        incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if (incVal < incumbentValue_) {   // Assume minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (msgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_MASTER_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }

    if (msgLevel_ > 0) {
        messageHandler()->message(ALPS_STATIC_BALANCE_END, messages())
            << "spiral initialization" << CoinMessageEol;
    }
}

//#############################################################################

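// Hub-side loop of the spiral static balancing scheme. The hub spins on a
// blocking MPI_Recv and dispatches on the message tag: AlpsMsgNode (receive
// a node and, depending on the packed doUnitWork flag, explore it),
// AlpsMsgRampUpDonate (give one of its own nodes away), and
// AlpsMsgFinishInit (leave ramp-up).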
void
AlpsKnowledgeBrokerMPI::spiralHub()
{
    bool inRampUp = true;
    MPI_Status status;

    while (inRampUp) {
        MPI_Recv(largeBuffer_, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
                 MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        switch (status.MPI_TAG) {
        case AlpsMsgNode:
            // Unpack the node, explore it and send load info to the master
            spiralRecvProcessNode();
            break;
        case AlpsMsgRampUpDonate:
            // Unpack the message and donate a node
            spiralDonateNode();
            break;
        case AlpsMsgFinishInit:
            inRampUp = false;
            break;
        default:
            std::cout << "PROC " << globalRank_
                      << " : spiralHub: received UNKNOWN message. tag = "
                      << status.MPI_TAG << std::endl;
            assert(0);
            throw CoinError("Unknown message type", "spiralHub",
                            "AlpsKnowledgeBrokerMPI");
        }
    }

    if (hubMsgLevel_ > 10) {
        std::cout << "hub[" << globalRank_
                  << "]: spiral: exited from while." << std::endl;
    }

    if (hubWork_) {
        // Put the ramp-up subtree in the pool
        if (rampUpSubTree_) {
            rampUpSubTree_->calculateQuality();
            addKnowledge(AlpsKnowledgeTypeSubTree,
                         rampUpSubTree_,
                         rampUpSubTree_->getQuality());
            rampUpSubTree_ = NULL;
        }
    }
    else {
        // The hub sends its nodes to its workers.
        int i;
        int numSent = 0;
        int receiver = -1;

        if (rampUpSubTree_) { // May have been deleted by spiralRecvProcessNode()
            const int numNode = rampUpSubTree_->nodePool()->getNumKnowledges();
            while (numSent < numNode) {
                for (i = 0; i < clusterSize_; ++i) {
                    if (numSent == numNode) break;
                    if (i != clusterRank_) {
                        // Send a node to worker i
                        receiver =
                            cluster2GlobalRank(masterRank_, myHubRank_, i);
                        sendNodeModelGen(receiver, 0); // Just receive, do not work
                        ++numSent;
                    }
                }
            } // End of while loop
            assert(rampUpSubTree_->getNumNodes() == 0);
            rampUpSubTree_->nullRootActiveNode();
        }

        // Send the hub finish tag
        char* dummyBuf = 0;
        for (i = 0; i < clusterSize_; ++i) {
            if (i != clusterRank_) {
                receiver = cluster2GlobalRank(masterRank_, myHubRank_, i);
                MPI_Send(dummyBuf, 0, MPI_PACKED, receiver,
                         AlpsMsgFinishInitHub, MPI_COMM_WORLD);
            }
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);

    if (hubMsgLevel_ > 10) {
        std::cout << "hub: spiral: finished." << std::endl;
    }

    //------------------------------------------------------
    // If we have a better solution, broadcast its value and process id.
    //------------------------------------------------------

    if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
        double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if (incVal < incumbentValue_) {   // Assume minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (msgLevel_ > 0) {
                messageHandler()->message(ALPS_RAMPUP_HUB_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }
}

//#############################################################################

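// Worker-side loop of the spiral scheme. A worker normally must see two
// finish messages (the master's AlpsMsgFinishInit and its hub's
// AlpsMsgFinishInitHub) before leaving ramp-up. When the hub works itself
// (hubWork_) or the worker sits in the master's cluster, only one finish
// message will ever arrive, so exitCount drops to 1.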
void
AlpsKnowledgeBrokerMPI::spiralWorker()
{
    // Exit after receiving exit instructions from both the hub and the master
    int exitCount = 2;
    int count = 0;
    bool inRampUp = true;

    if (hubWork_ || (globalRank_ < clusterSize_)) {
        // The hub won't send me its nodes since it works itself.
        // If in the master's cluster, a single finish message suffices.
        exitCount = 1;
    }

    // Note: the order in which AlpsMsgFinishInit and AlpsMsgFinishInitHub
    // arrive is not deterministic.
    MPI_Status status;
    while (inRampUp) {
        MPI_Recv(largeBuffer_, largeSize_, MPI_PACKED, MPI_ANY_SOURCE,
                 MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        switch (status.MPI_TAG) {
        case AlpsMsgNode:
            // Unpack the node, explore it and send load info to the master
            spiralRecvProcessNode();
            break;
        case AlpsMsgRampUpDonate:
            // Unpack the message and donate a node
            spiralDonateNode();
            break;
        case AlpsMsgFinishInit:
        case AlpsMsgFinishInitHub:
            ++count;
            if (count == exitCount) {
                inRampUp = false;
            }
            break;
        default:
            std::cout << "PROC " << globalRank_
                      << " : spiralWorker: received UNKNOWN message. tag = "
                      << status.MPI_TAG << std::endl;
            assert(0);
            throw CoinError("Unknown message type", "spiralWorker",
                            "AlpsKnowledgeBrokerMPI");
        }
    }

#if 0
    std::cout << "worker[" << globalRank_
              << "]: spiral: exited from while." << std::endl;
#endif

    // Put the ramp-up subtree in the pool
    if (rampUpSubTree_) {
        rampUpSubTree_->calculateQuality();
        addKnowledge(AlpsKnowledgeTypeSubTree,
                     rampUpSubTree_,
                     rampUpSubTree_->getQuality());
        rampUpSubTree_ = NULL;
    }

    MPI_Barrier(MPI_COMM_WORLD);

    //------------------------------------------------------
    // If we have a better solution, broadcast its value and process id.
    //------------------------------------------------------

    if (hasKnowledge(AlpsKnowledgeTypeSolution)) {
        double incVal = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
        if (incVal < incumbentValue_) {   // Assume minimization
            incumbentValue_ = incVal;
            incumbentID_ = globalRank_;
            sendKnowledge(AlpsKnowledgeTypeSolution, //useful
                          globalRank_,
                          0,
                          smallBuffer_,
                          0,
                          MPI_ANY_TAG,
                          MPI_COMM_WORLD,
                          false);
            //sendIncumbent();
            if (msgLevel_ > 10) {
                messageHandler()->message(ALPS_RAMPUP_WORKER_SOL, messages())
                    << globalRank_ << incVal << CoinMessageEol;
            }
        }
    }

    if (msgLevel_ > 200) {
        std::cout << "worker[" << globalRank_ << "]: spiral: finished." << std::endl;
    }
}

//#############################################################################

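// Receive and process a node sent with tag AlpsMsgNode. The message layout
// mirrors what spiralDonateNode() packs (and what sendNodeModelGen()
// presumably packs as well), in order:
//   int doUnitWork;        // 1: explore the node; 0: just store the subtree
//   <encoded node>         // an AlpsEncoded produced by AlpsTreeNode::encode()
//   int hasKnowledge;      // 1: encoded shared model knowledge follows
//   [<encoded knowledge>]  // fed to AlpsModel::unpackSharedKnowledge()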
void
AlpsKnowledgeBrokerMPI::spiralRecvProcessNode()
{
    //--------------------------------------------
    // Unpack the doUnitWork flag
    //--------------------------------------------

    int pos = 0;
    int doUnitWork = 0;
    MPI_Unpack(largeBuffer_, largeSize_, &pos, &doUnitWork, 1,
               MPI_INT, MPI_COMM_WORLD);
    if (msgLevel_ > 200) {
        std::cout << "PROCESS " << globalRank_ << " : received a node, doUnitWork "
                  << doUnitWork << std::endl;
    }

    //--------------------------------------------
    // Unpack the node
    //--------------------------------------------

    rampUpSubTree_ = dynamic_cast<AlpsSubTree*>
        (const_cast<AlpsKnowledge *>(decoderObject(AlpsKnowledgeTypeSubTree)))->newSubTree();
    rampUpSubTree_->setBroker(this);
    rampUpSubTree_->setNodeSelection(nodeSelection_);

    AlpsEncoded* encodedNode = unpackEncoded(largeBuffer_, pos, MPI_COMM_WORLD);
    AlpsTreeNode* node = dynamic_cast<AlpsTreeNode* >
        ( decoderObject(encodedNode->type())->decode(*encodedNode) );

    node->setBroker(this);
    // todo(aykut) node desc does not hold a pointer to the model;
    // it holds a pointer to the broker.
    //node->modifyDesc()->setModel(model_);
    node->setParent(NULL);

    rampUpSubTree_->nodePool()->addKnowledge(node, node->getQuality());
    assert(rampUpSubTree_->getNumNodes() > 0);
    if (rampUpSubTree_->nodePool()->getNumKnowledges() == 1) {
        // Make the first node the root.
        rampUpSubTree_->setRoot(node);
    }

    if (encodedNode) {
        delete encodedNode;
        encodedNode = 0;
    }
    node = NULL;
    //--------------------------------------------
    // Unpack model knowledge
    //--------------------------------------------
    int hasKnowledge = 0;
    MPI_Unpack(largeBuffer_, largeSize_, &pos, &hasKnowledge, 1,
               MPI_INT, MPI_COMM_WORLD);
    if (hasKnowledge) {
        // Unpack the knowledge from the large buffer.
        AlpsEncoded* encodedModelGen = unpackEncoded(largeBuffer_,
                                                     pos,
                                                     MPI_COMM_WORLD,
                                                     largeSize_);
        // Store the shared knowledge in the model.
        model_->unpackSharedKnowledge(*encodedModelGen);
        delete encodedModelGen;
        encodedModelGen = NULL;
    }

    //--------------------------------------------
    // Do one unit of work or store the subtree
    //--------------------------------------------

    if (doUnitWork) {
        // Do one unit of work and report load info
        int requiredNumNodes = 2;
        nodeProcessedNum_ += rampUpSubTree_->rampUp(1,  // minimum number
                                                    requiredNumNodes,
                                                    treeDepth_,
                                                    node);  // node is NULL here

        // Send load info to the master
        int numNodes = rampUpSubTree_->getNumNodes();
        if (msgLevel_ > 200) {
            std::cout << "PROCESS " << globalRank_
                      << " : numNodes = " << numNodes << std::endl;
        }

        const int smallSize = model_->AlpsPar()->entry(AlpsParams::smallSize);

        // Report the node count to the master with tag AlpsMsgRampUpLoad
        pos = 0;
        MPI_Pack(&numNodes, 1, MPI_INT, smallBuffer_, smallSize, &pos,
                 MPI_COMM_WORLD);
        MPI_Send(smallBuffer_, pos, MPI_PACKED, masterRank_,
                 AlpsMsgRampUpLoad, MPI_COMM_WORLD);
        if (numNodes == 0) {
            delete rampUpSubTree_;
            rampUpSubTree_ = NULL;
        }
    }
    else {
        // Put the ramp-up subtree in the subtree pool
        rampUpSubTree_->calculateQuality();
        addKnowledge(AlpsKnowledgeTypeSubTree,
                     rampUpSubTree_,
                     rampUpSubTree_->getQuality());
        rampUpSubTree_ = NULL;
    }
}

//#############################################################################

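// Donate one node in response to an AlpsMsgRampUpDonate message: pop the
// best node from the ramp-up subtree, detach it from its parent and
// children, ship it to the receiver with doUnitWork = 1, do one unit of
// work on the remaining subtree, and report the new load to the master.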
// TODO: With model knowledge?
void
AlpsKnowledgeBrokerMPI::spiralDonateNode()
{
    // Get the receiver's rank
    int i;
    int position = 0;
    int receiver = -1;

    int numNodes = rampUpSubTree_->getNumNodes();
    assert(numNodes > 0);

    MPI_Unpack(largeBuffer_, largeSize_, &position, &receiver, 1,
               MPI_INT, MPI_COMM_WORLD);

    if (msgLevel_ > 200) {
        std::cout << "worker " << globalRank_ << ": spiral: has " << numNodes
                  << " nodes; send a node to process "
                  << receiver << std::endl;
    }
    // Get a node for the receiver
    AlpsTreeNode* node = dynamic_cast<AlpsTreeNode* >
        (rampUpSubTree_->nodePool()->getKnowledge().first);
    AlpsEncoded* enc = node->encode();

    rampUpSubTree_->nodePool()->popKnowledge();

    // Update the parent's linkage: replace the donated child with the
    // parent's last child and shrink the child count.
    AlpsTreeNode *parent = node->getParent();
    int numChildren = -1;
    if (parent) {
        numChildren = parent->getNumChildren();
        for (i = 0; i < numChildren; ++i) {
            if (parent->getChild(i) == node) break;
        }
        parent->setChild(i, parent->getChild(numChildren - 1));
        parent->setChild(numChildren - 1, NULL);
        parent->modifyNumChildren(-1);  // A child has gone
    }

    // Update the children's linkage
    numChildren = node->getNumChildren();
    AlpsTreeNode *child = NULL;
    for (i = 0; i < numChildren; ++i) {
        child = node->getChild(i);
        child->setParent(NULL);
        child->setParentIndex(-1);
    }

    // Delete the node since it has been sent to another process
    delete node;
    node = NULL;

    // Send the encoded node
    position = 0;
    int doUnitWork = 1;
    int hasKnowledge = 0;

    // Pack the doUnitWork flag, the encoded node, and the hasKnowledge flag
    // in the order spiralRecvProcessNode() unpacks them. The hasKnowledge
    // flag must be packed before the send; packing it afterwards would let
    // the receiver read stale buffer contents.
    MPI_Pack(&doUnitWork, 1, MPI_INT, largeBuffer_, largeSize_, &position,
             MPI_COMM_WORLD);
    packEncoded(enc, largeBuffer_, largeSize_, position, MPI_COMM_WORLD);
    MPI_Pack(&hasKnowledge, 1, MPI_INT, largeBuffer_, largeSize_, &position,
             MPI_COMM_WORLD);
    MPI_Send(largeBuffer_, position, MPI_PACKED, receiver, AlpsMsgNode,
             MPI_COMM_WORLD);
    if (enc) {
        delete enc;
        enc = 0;                 // Allocated in encode()
    }

    // Check the load after donating
    int numNodesAfter = rampUpSubTree_->getNumNodes();
    if (msgLevel_ > 200) {
        std::cout << "worker " << globalRank_ << ": spiral: has "
                  << numNodesAfter << " nodes after donating" << std::endl;
    }

    // Do one unit of work
    int requiredNumNodes = 2;
    nodeProcessedNum_ += rampUpSubTree_->rampUp(1,  // minimum number
                                                requiredNumNodes,
                                                treeDepth_,
                                                NULL);

    // Report the load after the unit of work to the master
    numNodesAfter = rampUpSubTree_->getNumNodes();
    if (msgLevel_ > 200) {
        std::cout << "worker " << globalRank_ << ": spiral: has "
                  << numNodesAfter << " nodes after doing one unit of work"
                  << std::endl;
    }
    const int smallSize = model_->AlpsPar()->entry(AlpsParams::smallSize);
    int pos = 0;
    MPI_Pack(&numNodesAfter, 1, MPI_INT, smallBuffer_, smallSize, &pos,
             MPI_COMM_WORLD);
    MPI_Send(smallBuffer_, pos, MPI_PACKED, masterRank_, AlpsMsgRampUpLoad,
             MPI_COMM_WORLD);

    if (numNodesAfter == 0) {
        delete rampUpSubTree_;
        rampUpSubTree_ = NULL;
    }
}

//#############################################################################
