/*===========================================================================*
 * This file is part of the Abstract Library for Parallel Search (ALPS).     *
 *                                                                           *
 * ALPS is distributed under the Eclipse Public License as part of the       *
 * COIN-OR repository (http://www.coin-or.org).                              *
 *                                                                           *
 * Authors:                                                                  *
 *                                                                           *
 *          Yan Xu, Lehigh University                                        *
 *          Aykut Bulut, Lehigh University                                   *
 *          Ted Ralphs, Lehigh University                                    *
 *                                                                           *
 * Conceptual Design:                                                        *
 *                                                                           *
 *          Yan Xu, Lehigh University                                        *
 *          Ted Ralphs, Lehigh University                                    *
 *          Laszlo Ladanyi, IBM T.J. Watson Research Center                  *
 *          Matthew Saltzman, Clemson University                             *
 *                                                                           *
 *                                                                           *
 * Copyright (C) 2001-2019, Lehigh University, Yan Xu, Aykut Bulut, and      *
 *                          Ted Ralphs.                                      *
 * All Rights Reserved.                                                      *
 *===========================================================================*/

#ifndef AlpsKnowledgeBrokerMPI_h_
#define AlpsKnowledgeBrokerMPI_h_

#include <cmath>
#include <iosfwd>

// #undef SEEK_SET
// #undef SEEK_END
// #undef SEEK_CUR
#include <mpi.h>

#include "AlpsEnumProcessT.h"
#include "AlpsKnowledge.h"
#include "AlpsKnowledgeBroker.h"
#include "AlpsParams.h"

//#############################################################################

45 /*!
46 
47   MPI implementation of #AlpsKnowledgeBroker interface. Each hub will have an
48   instance of this class to manage the knowledges sent and received with its
49   workers and with the master.
50 
51   There are 3 functions that controls the workflow #workerMain(), #hubMain() and
52   #masterMain().
53 
54  */
55 
56 class AlpsKnowledgeBrokerMPI : public AlpsKnowledgeBroker {
57 
58  private:
59     /** Disable copy construct and assignment operator. */
60     AlpsKnowledgeBrokerMPI(const AlpsKnowledgeBrokerMPI&);
61     AlpsKnowledgeBrokerMPI& operator=(const AlpsKnowledgeBrokerMPI&);
62 
63  protected:
64 
65     /** @name Process information
66      *
67      */
68     //@{
69     /** The Number of processes launched. */
70     int processNum_;
71 
72     /** The Number of hubs. */
73     int hubNum_;
74 
75     /** The rank of the process in MPI_COMM_WORLD. */
76     int globalRank_;
77 
78     /** Communicator of the cluster to which the process belongs. */
79     MPI_Comm clusterComm_;
80 
81     /** Communicator consists of all hubs. */
82     MPI_Comm hubComm_;
83 
84     /** MPI_Group consists of all hubs. */
85     MPI_Group hubGroup_;
86 
87     /** The actual size of the cluster to which the process belongs. */
88     int clusterSize_;
89 
90     /** The user reqested size of a cluster. */
91     int userClusterSize_;
92 
93     /** The local rank of the process in clusterComm_. */
94     int clusterRank_;
95 
96     /** The global ranks of the hubs. */
97     int* hubRanks_;
98 
99     /** The global rank of its hub for a worker. */
100     int myHubRank_;
101 
102     /** The global rank of the master. */
103     int masterRank_;
104 
105     /** The AlpsProcessType of this process. */
106     AlpsProcessType processType_;
107 
108     /** The AlpsProcessType of all process. */
109     AlpsProcessType* processTypeList_;
110 
111     /** Whether hub should also work as a worker. */
112     bool hubWork_;
113 
114     /** Send subtree request. */
115     MPI_Request subTreeRequest_;
116 
117     /** Send model knoledge request. */
118     MPI_Request solRequestL_;
119     MPI_Request solRequestR_;
120 
121     /** Send model knoledge request. */
122     MPI_Request modelKnowRequestL_;
123     MPI_Request modelKnowRequestR_;
124 
125     /** Forward model knoledge request. */
126     MPI_Request forwardRequestL_;
127     MPI_Request forwardRequestR_;
128     //@}
129 
130     /** @name Incumbent data
131      *
132      */
133     //@{
134     /** Incumbent value. */
135     double incumbentValue_;
136 
137     /** The process id that store the incumbent. */
138     int incumbentID_;
139 
140     /** Indicate whether the incumbent value is updated between two
141         checking point. */
142     bool updateIncumbent_;
143     //@}
144 
145     /** @name Workload balancing
146      *
147      */
148     //@{
149     /** The workload quality of the process. */
150     double workQuality_;
151 
152     /** The workload quality of the cluster to which the process belong. */
153     double clusterWorkQuality_;
154 
155     /** The workload quality of the whole system. */
156     double systemWorkQuality_;
157 
158     /** The workload qualities of hubs. */
159     double* hubWorkQualities_;
160 
161     /** The workload qualities of workers in the cluster to which this proces
162         belongs. Number of nodes is used as the quantities criteria. */
163     double* workerWorkQualities_;
164 
165     /** The workload quantity of the workload on the process. */
166     double workQuantity_;
167 
168     /** The workload quantity of the cluster to which the process belongs. */
169     double clusterWorkQuantity_;
170 
171     /** The workload quantity of the whole system. */
172     double systemWorkQuantity_;
173 
174     /** The workload quantity of the whole system before forcing termination. */
175     double systemWorkQuantityForce_;
176 
177     /** The workload quantities of all clusters/hubs. */
178     double* hubWorkQuantities_;
179 
180     /** The workload quantities of workers in the cluster to which this proces
181         belongs. */
182     double* workerWorkQuantities_;
183 
184     /** Indicate which worker has been reported its work. */
185     bool* workerReported_;
186 
187     /** Indicate which hub has been reported its work. */
188     bool* hubReported_;
189 
190     /** Indicate whether all hubs have reported status to master at least once.*/
191     bool allHubReported_;
192 
193     /** Whether master do load balance. 0: do; >0: blocked. */
194     int masterDoBalance_;
195 
196     /** Whether a hub do load balance. 0: do; >0: blocked. */
197     int hubDoBalance_;
198 
199     /** To record how many nodes processed for each worker in a cluster. */
200     int* workerNodeProcesseds_;
201 
202     /** To record how many nodes by a cluster. */
203     int clusterNodeProcessed_;
204 
205     /** To record how many nodes processed for each hub */
206     int* hubNodeProcesseds_;
207     //@}
208 
209     /** @name Message counts
210      *
211      */
212     //@{
213     /** The number of new messages sent by the process after last survey. */
214     int sendCount_;
215 
216     /** The number of new messages received by the process after last survey.*/
217     int recvCount_;
218 
219     /** The number of new messages sent by the processes in clusterComm_
220         after last survey.*/
221     int clusterSendCount_;
222 
223     /** The number of new messages received by the processes in clusterComm_
224         after last survey.*/
225     int clusterRecvCount_;
226 
227     /** The total number of messages sent by the all processes. */
228     int systemSendCount_;
229 
230     /** The total number of messages sent by the all processes. */
231     int systemRecvCount_;
232     //@}
233 
234     /** @name Node index
235      *
236      */
237     //@{
238     int masterIndexBatch_;
239     //@}
240 
241     /** @name Parallel statistics
242      *
243      */
244     //@{
245     /** Master timer */
246     AlpsTimer masterTimer_;
247 
248     /** Hub timer */
249     AlpsTimer hubTimer_;
250 
251     /** Worker timer */
252     AlpsTimer workerTimer_;
253 
254     /** The time spent in ramp up. */
255     double rampUpTime_;
256 
257     /** The time spent in ramp down. */
258     double rampDownTime_;
259 
260     /** The time spent waiting for work. */
261     double idleTime_;
262 
263     /** The time spent processing messages (include idle). */
264     double msgTime_;
265 
266     /** More statistics */
267     AlpsPsStats psStats_;
268     //@}
269 
270     /** Terminate due to reaching limits (time and node) or other reason. */
271     bool forceTerminate_;
272 
273     /** Indicate whether do termination check */
274     bool blockTermCheck_;
275 
276     /** Indicate whether a hub need to report state to master */
277     bool blockHubReport_;
278 
279     /** Indicate whether a worker need to report state to its hub */
280     bool blockWorkerReport_;
281 
282     /** Indicate whether a worker need to as for work from its hub */
283     bool blockAskForWork_;
284 
285     /** Buffer attached to MPI when sharing generated knowledge. */
286     char *attachBuffer_;
287 
288     /** Large message buffer. */
289     char *largeBuffer_;
290 
291     /** Large message buffer. Used for sharing model knowledge */
292     char *largeBuffer2_;
293 
294     /** Small message buffer. */
295     char *smallBuffer_;
296 
297     /** The period that master do load balancing. It changes
298         as search progresses. */
299     double masterBalancePeriod_;
300 
301     /** The period that a hub load balancing and report cluster status.
302         It changes as search progresses. */
303     double hubReportPeriod_;
304 
305     /** The global rank of the process that share generated model knowledge. */
306     int modelGenID_;
307 
308     /** Size of the shared knowledge. */
309     int modelGenPos_;
310 
311     /** A subtree used in during up. */
312     AlpsSubTree* rampUpSubTree_;
313 
314     /** Number of nodes in one unit of work */
315     int unitWorkNodes_;
316 
317     /** Temporily halt search */
318     int haltSearch_;
319 
320  protected:
321 
322     /** Initialize member data. */
323     void init();
324 
325     /** @name Core member functions for master, hubs and workers.
326      */
327     //@{
328     /** Master generates subtrees and sends them to hubs in Round-Robin way.
329         Master periodically do inter-cluster load balancing,
330         and termination check. */
331     void masterMain(AlpsTreeNode* root);
332 
333     /** Hub generates subtrees and sends them to workers in Round-Robin way.
334         Hub do intra-cluster load balancing. */
335     void hubMain();
336 
337     /** Worker first receive subtrees, then start to explore them. Worker also
338         peroidically check message and process message. */
339     void workerMain();
340     //@}
341 
342     /** Explore a subtree from subtree pool for certain units of work/time.*/
343     // The same subtree will be explored next time if it still have
344     // unexplored nodes.
345     AlpsReturnStatus doOneUnitWork(int unitWork,
346                                    double unitTime,
347                                    AlpsExitStatus & exitStatus,
348                                    int & numNodesProcessed,
349                                    int & numNodesBranched,
350                                    int & numNodesDiscarded,
351                                    int & numNodesPartial,
352                                    int & depth,
353                                    bool & betterSolution);
354 
355     /** Processing messages. */
356     void processMessages(char *&buffer,
357                          MPI_Status &status,
358                          MPI_Request &request);
359 
360     /** Static load balancing: Root Initialization */
361     void rootInitMaster(AlpsTreeNode* root);
362     void rootInitHub();
363     void rootInitWorker();
364 
365     /** Static load balancing: spiral */
366     void spiralMaster(AlpsTreeNode* root);
367     void spiralHub();
368     void spiralWorker();
369 
370     //------------------------------------------------------
371 
372     /** @name Load balancing member functions
373      */
374     //@{
375     /** Master asks a hub to donate its workload to another hub. */
376     void masterAskHubDonate(int donorID,
377                             int receiverID,
378                             double receiverWorkload);
379 
380     /** Hub asks a worker to donate its workload to another worker. */
381     void hubAskWorkerDonate(int donorID,
382                             int receiverID,
383                             double receiverWorkload);
384 
385     /** Calculate the work quality and quantity on this process. */
386     void updateWorkloadInfo();
387 
getNumNodeLeftSystem()388     virtual int getNumNodeLeftSystem()
389     { return static_cast<int>(systemWorkQuantity_); }
390 
391     /** A worker donate its workload to the specified worker. */
392     void donateWork(char*& buf,
393                     int tag,
394                     MPI_Status* status,
395                     int recvID = -1,
396                     double recvWL = 0.0);
397 
398     /** Hub allocates the donated workload to its workers. */
399     void hubAllocateDonation(char*& buf, MPI_Status* status);
400 
401     /** Hub balances the workloads of its workers. */
402     void hubBalanceWorkers();
403 
404     /** Hub satisfies the workload rquest from a worker. */
405     void hubSatisfyWorkerRequest(char*& buf, MPI_Status* status);
406 
407     /** A hub reports its status (workload and msg counts) to the master. */
408     // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
409     void hubReportStatus(int tag, MPI_Comm comm);
410 
411     /** A hub unpacks the status of a worker from buffer. */
412     // NOTE: comm is clusterComm_ or MPI_COMM_WORLD.
413     void hubUpdateCluStatus(char*& buf, MPI_Status* status, MPI_Comm comm);
414 
415     /** Two hubs share their workload. */
416     void hubsShareWork(char*& buf, MPI_Status* status);
417 
418     /** Master balance the workload of hubs. */
419     void masterBalanceHubs();
420 
421     /** Master unpack the status of a hub from buf and update system status.*/
422     // NOTE: comm is hubComm or MPI_COMM_WORLD.
423     void masterUpdateSysStatus(char*& buf, MPI_Status* status, MPI_Comm comm);
424 
425     /** The master re-calculate the system status. */
426     void refreshSysStatus();
427 
428     /** A hub adds its status to the cluster's status. */
429     void refreshClusterStatus();
430 
431     /** A worker report its status (workload and msg counts) to its hub. */
432     // NOTE: comm is clusterComm_ or MPI_COMM_WORLD.
433     void workerReportStatus(int tag, MPI_Comm comm);
434     //@}
435 
436     //------------------------------------------------------
437 
438     /** @name Node index functions  // msg counts is modified inside
439      *
440      */
441     //@{
442     /** A worker ask for node index from master. */
443     void workerAskIndices();
444 
445     /** A worker receive node index from master. */
446     void workerRecvIndices(char *&bufLarge);
447 
448     /** Master send a batch of node indices to the receiving worker. */
449     void masterSendIndices(char *&bufLarge);
450     //@}
451 
452     //------------------------------------------------------
453 
454     /** @name Other message passing member functions
455      *
456      */
457     //@{
458     /** Broadcast the model from source to other processes. */
459     void broadcastModel(const int id, const int source);
460 
461     /** Sent the incumbent value and rank to its two child if eixt */
462     void sendIncumbent();
463 
464     /** unpack the incumbent value, then store it and the id of the process
465         having the incumbent in AlpsDataPool. */
466     bool unpackSetIncumbent(char*& buf, MPI_Status* status);
467 
468     /** Send the best solution from the process having it to destination. */
469     void collectBestSolution(int destination);
470 
471     /** Inform master that a proc has received workload during a load
472         balance initialized by master. */
473     void tellMasterRecv();
474 
475     /** Inform hub that a proc has received workload during a load
476         balance initialized by a hub. */
477     // Not used
478     void tellHubRecv();
479 
480     /** Pack an AlpsEncoded instance into buf. Return filled buf and size of
481         packed message.
482         position: where to start if buf is allocated.
483     */
484     void packEncoded(AlpsEncoded* enc,
485                      char*& buf,
486                      int& size,
487                      int& position,
488                      MPI_Comm comm);
489 
490     /** Unpack the given buffer into an AlpsEncoded instance. */
491     AlpsEncoded* unpackEncoded(char*& buf,
492                                int& position,
493                                MPI_Comm comm,
494                                int size = -1);
495 
496     /** Receive the size of buffer, allocate memory for buffer, then
497         receive the message and put it in buffer. */
498     // NOTE: comm is hubComm_ or clusterComm_
499     void receiveSizeBuf(char*& buf,
500                         int sender,
501                         int tag,
502                         MPI_Comm comm,
503                         MPI_Status* status);
504 
505     /** First receive the size and the contend of a node, then construct
506         a subtree with this received node. */
507     // NOTE: comm is hubComm_ or clusterComm_
508     void receiveRampUpNode(int sender,
509                            MPI_Comm comm,
510                            MPI_Status* status);
511 
512     /** Receive a subtree from the sender process and add it into
513         the subtree pool.*/
514     void receiveSubTree(char*& buf, int sender, MPI_Status* status);
515 
516     /** Send the size and content of a buffer to the target process. */
517     // NOTE: comm is hubComm_ or clusterComm_.
518     void sendSizeBuf(char*& buf,
519                      int size,
520                      int position,
521                      const int target,
522                      const int tag,
523                      MPI_Comm comm);
524 
525     /** Send the size and the content of the best node of a given subtree
526         to the target process. */
527     // NOTE: comm is hubComm_ or clusterComm_.
528     void sendRampUpNode(const int target, MPI_Comm comm);
529 
530     /** Send a node from rampUpSubTree's node pool and generated model
531         knowledge */
532     void sendNodeModelGen(int receiver, int doUnitWork);
533 
534     /** Send a given subtree to the target process. */
535     bool sendSubTree(const int target, AlpsSubTree*& st, int tag);
536 
537     /** Send finish initialization signal to the target process. */
538     // NOTE: comm is hubComm_ or clusterComm_.
539     void sendFinishInit(const int target, MPI_Comm comm);
540     //@}
541 
542     /** Delete subTrees in pools and the active subtree. */
543     void deleteSubTrees();
544 
545 
546     void forwardModelKnowledge();
547 
548     /** Set generated knowlege (related to model) to receiver. */
549     // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
550     void sendModelKnowledge(MPI_Comm comm, int receiver=-1);
551 
552     /** Receive generated knowlege (related to model) from sender. */
553     // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
554     void receiveModelKnowledge(MPI_Comm comm);
555 
556     /** @name Change message counts functions
557      */
558     //@{
559     /** Increment the number of sent message. */
560     void incSendCount(const char* how, int s = 1);
561     /** Decrement the number of sent message. */
562     void decSendCount(const char* how, int s = 1);
563     /** Increment the number of received message. */
564     void incRecvCount(const char* how, int s = 1);
565     /** Decrement the number of sent message. */
566     void decRecvCount(const char* how, int s = 1);
567     //@}
568 
569     /** Master tell hubs to terminate due to reaching limits or other reason.*/
570     void masterForceHubTerm();
571 
572     /** Hub tell workers to terminate due to reaching limits or other reason.*/
573     void hubForceWorkerTerm();
574 
575     /** Change subtree to be explored if it is too worse. */
576     void changeWorkingSubTree(double & changeWorkThreshold);
577 
578     /** Send error code to master. */
579     void sendErrorCodeToMaster(int errorCode);
580 
581     /** Receive error code and set solution status. */
582     void recvErrorCode(char *& bufLarge);
583 
584     /** Unpack the node, explore it and send load info to master. */
585     void spiralRecvProcessNode();
586 
587     /** Unpack msg and donate a node. */
588     void spiralDonateNode();
589 
590  public:
591 
592     /** Default construtor.
593         NOTE: must call initializeSearch() later. */
AlpsKnowledgeBrokerMPI()594     AlpsKnowledgeBrokerMPI()
595         :
596         AlpsKnowledgeBroker()
597         {
598             init();
599         }
600 
601     /** Useful construtor. */
AlpsKnowledgeBrokerMPI(int argc,char * argv[],AlpsModel & model)602     AlpsKnowledgeBrokerMPI(int argc,
603                            char* argv[],
604                            AlpsModel& model)
605         :
606         AlpsKnowledgeBroker()
607         {
608             init();
609             initializeSearch(argc, argv, model);
610         }
611 
612     /** Destructor. */
613     ~AlpsKnowledgeBrokerMPI();
614 
615     /** Query the global rank of the process. */
getProcRank()616     virtual int getProcRank() const { return globalRank_; }
617 
618     /** Query the global rank of the Master. */
getMasterRank()619     virtual int getMasterRank() const { return masterRank_; }
620 
621     /** Query the type (master, hub, or worker) of the process. */
getProcType()622     virtual AlpsProcessType getProcType() const { return processType_; }
623 
624     /** This function
625      * <ul>
626      *  <li> initializes the message environment;
627      *  <li> the master reads in ALPS and user's parameter sets. If the model
628      *  data is input from file, then it reads in the model data.
629      *  <li> sets up user params and model;
630      *  <li> broadcast parameters from the master to all other processes;
631      *  <li> creates MPI communicators and groups;
632      *  <li> classifies process types, sets up subtree and pools
633      *  <li> determines their hub's global rank for workers
634      * </ul>
635      */
636     void initializeSearch(int argc, char* argv[], AlpsModel& model);
637 
638     /** Search best solution for a given model. */
639     void search(AlpsModel *model);
640 
641     /** This function
642      * <ul>
643      * <li> broadcasts model data from the master to all other processes;
644      * <li> calls its associated main function to explore the sub tree;
645      * <li> collects the best solution found.
646      * </ul>
647      */
648     void rootSearch(AlpsTreeNode* root);
649 
650     /** @name Report search results. */
651     //@{
652     /** The process queries the quality of the incumbent this process stores. */
getIncumbentValue()653     virtual double getIncumbentValue() const {
654         double bestObj = ALPS_OBJ_MAX;
655         if (AlpsKnowledgeBroker::hasKnowledge(AlpsKnowledgeTypeSolution)) {
656             bestObj = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
657             if (incumbentValue_ > bestObj) {
658                 return bestObj;
659             }
660         }
661         return incumbentValue_;
662     }
663 
664     /** The master queries the quality of the best solution it knowns. */
665   //todo(aykut) why not this returns to incumbentValue_
getBestQuality()666     virtual double getBestQuality() const {
667         if (globalRank_ == masterRank_) {
668             if (getNumKnowledges(AlpsKnowledgeTypeSolution) > 0) {
669                 return getBestKnowledge(AlpsKnowledgeTypeSolution).second;
670             }
671             else {
672                 return ALPS_OBJ_MAX;
673             }
674         }
675         else {
676             return ALPS_OBJ_MAX;
677         }
678     }
679 
680     /** Get best estimalted quality in system. */
getBestEstimateQuality()681     virtual double getBestEstimateQuality() { return systemWorkQuality_; }
682 
683     /** Master prints out the best solution that it knows. */
684     virtual void printBestSolution(char* outputFile = 0) const;
685 
686     /** Log search statistics. */
687     virtual void searchLog();
688     //@}
689 
690     //------------------------------------------------------
691 
692     /** @name Knowledge sharing functions
693      */
694     //@{
695     /** Set knowlege. */
696     void sendKnowledge(AlpsKnowledgeType type,
697                        int sender,
698                        int receiver,
699                        char *& msgBuffer,
700                        int msgSize,
701                        int msgTag,
702                        MPI_Comm comm,
703                        bool blocking);
704 
705     /** Receive knowlege. */
706     void receiveKnowledge(AlpsKnowledgeType type,
707                           int sender,
708                           int receiver,
709                           char *& msgBuffer,
710                           int msgSize,
711                           int msgTag,
712                           MPI_Comm comm,
713                           MPI_Status* status,
714                           bool blocking);
715 
716     /** Request knowlege. */
717     void requestKnowledge(AlpsKnowledgeType type,
718                           int sender,
719                           int receiver,
720                           char *& msgBuffer,
721                           int msgSize,
722                           int msgTag,
723                           MPI_Comm comm,
724                           bool blocking);
725     //@}
726 
727 };
#endif

//#############################################################################