/*===========================================================================*
 * This file is part of the Abstract Library for Parallel Search (ALPS).     *
 *                                                                           *
 * ALPS is distributed under the Eclipse Public License as part of the       *
 * COIN-OR repository (http://www.coin-or.org).                              *
 *                                                                           *
 * Authors:                                                                  *
 *                                                                           *
 *          Yan Xu, Lehigh University                                        *
 *          Aykut Bulut, Lehigh University                                   *
 *          Ted Ralphs, Lehigh University                                    *
 *                                                                           *
 * Conceptual Design:                                                        *
 *                                                                           *
 *          Yan Xu, Lehigh University                                        *
 *          Ted Ralphs, Lehigh University                                    *
 *          Laszlo Ladanyi, IBM T.J. Watson Research Center                  *
 *          Matthew Saltzman, Clemson University                             *
 *                                                                           *
 *                                                                           *
 * Copyright (C) 2001-2019, Lehigh University, Yan Xu, Aykut Bulut, and      *
 *                          Ted Ralphs.                                      *
 * All Rights Reserved.                                                      *
 *===========================================================================*/


#ifndef AlpsKnowledgeBrokerMPI_h_
#define AlpsKnowledgeBrokerMPI_h_

#include <cmath>
#include <iosfwd>

// #undef SEEK_SET
// #undef SEEK_END
// #undef SEEK_CUR
#include <mpi.h>

#include "AlpsEnumProcessT.h"
#include "AlpsKnowledge.h"
#include "AlpsKnowledgeBroker.h"
#include "AlpsParams.h"

//#############################################################################

/*!

  MPI implementation of the #AlpsKnowledgeBroker interface. Each hub will have
  an instance of this class to manage the knowledge sent to and received from
  its workers and from the master.

  There are 3 functions that control the workflow: #workerMain(), #hubMain()
  and #masterMain().

*/

class AlpsKnowledgeBrokerMPI : public AlpsKnowledgeBroker {

 private:
    /** Disable copy constructor and assignment operator. */
    AlpsKnowledgeBrokerMPI(const AlpsKnowledgeBrokerMPI&);
    AlpsKnowledgeBrokerMPI& operator=(const AlpsKnowledgeBrokerMPI&);

 protected:

    /** @name Process information
     *
     */
    //@{
    /** The number of processes launched. */
    int processNum_;

    /** The number of hubs. */
    int hubNum_;

    /** The rank of the process in MPI_COMM_WORLD. */
    int globalRank_;

    /** Communicator of the cluster to which the process belongs. */
    MPI_Comm clusterComm_;

    /** Communicator consisting of all hubs. */
    MPI_Comm hubComm_;

    /** MPI_Group consisting of all hubs. */
    MPI_Group hubGroup_;

    /** The actual size of the cluster to which the process belongs. */
    int clusterSize_;

    /** The user requested size of a cluster. */
    int userClusterSize_;

    /** The local rank of the process in clusterComm_. */
    int clusterRank_;

    /** The global ranks of the hubs. */
    int* hubRanks_;

    /** The global rank of its hub for a worker. */
    int myHubRank_;

    /** The global rank of the master. */
    int masterRank_;

    /** The AlpsProcessType of this process. */
    AlpsProcessType processType_;

    /** The AlpsProcessType of all processes. */
    AlpsProcessType* processTypeList_;

    /** Whether the hub should also work as a worker. */
    bool hubWork_;

    /** Send subtree request. */
    MPI_Request subTreeRequest_;

    /** Send solution knowledge requests (left and right children). */
    MPI_Request solRequestL_;
    MPI_Request solRequestR_;

    /** Send model knowledge requests (left and right children). */
    MPI_Request modelKnowRequestL_;
    MPI_Request modelKnowRequestR_;

    /** Forward model knowledge requests (left and right children). */
    MPI_Request forwardRequestL_;
    MPI_Request forwardRequestR_;
    //@}

    /** @name Incumbent data
     *
     */
    //@{
    /** Incumbent value. */
    double incumbentValue_;

    /** The id of the process that stores the incumbent. */
    int incumbentID_;

    /** Indicates whether the incumbent value was updated between two
        checking points. */
    bool updateIncumbent_;
    //@}

    /** @name Workload balancing
     *
     */
    //@{
    /** The workload quality of the process. */
    double workQuality_;

    /** The workload quality of the cluster to which the process belongs. */
    double clusterWorkQuality_;

    /** The workload quality of the whole system. */
    double systemWorkQuality_;

    /** The workload qualities of hubs. */
    double* hubWorkQualities_;

    /** The workload qualities of workers in the cluster to which this process
        belongs. Number of nodes is used as the quantities criteria. */
    double* workerWorkQualities_;

    /** The workload quantity of the workload on the process. */
    double workQuantity_;

    /** The workload quantity of the cluster to which the process belongs. */
    double clusterWorkQuantity_;

    /** The workload quantity of the whole system. */
    double systemWorkQuantity_;

    /** The workload quantity of the whole system before forcing
        termination. */
    double systemWorkQuantityForce_;

    /** The workload quantities of all clusters/hubs. */
    double* hubWorkQuantities_;

    /** The workload quantities of workers in the cluster to which this
        process belongs. */
    double* workerWorkQuantities_;

    /** Indicates which workers have reported their work. */
    bool* workerReported_;

    /** Indicates which hubs have reported their work. */
    bool* hubReported_;

    /** Indicates whether all hubs have reported status to the master at
        least once. */
    bool allHubReported_;

    /** Whether the master does load balancing. 0: do; >0: blocked. */
    int masterDoBalance_;

    /** Whether a hub does load balancing. 0: do; >0: blocked. */
    int hubDoBalance_;

    /** Records how many nodes were processed by each worker in a cluster. */
    int* workerNodeProcesseds_;

    /** Records how many nodes were processed by a cluster. */
    int clusterNodeProcessed_;

    /** Records how many nodes were processed by each hub. */
    int* hubNodeProcesseds_;
    //@}

    /** @name Message counts
     *
     */
    //@{
    /** The number of new messages sent by the process after the last
        survey. */
    int sendCount_;

    /** The number of new messages received by the process after the last
        survey. */
    int recvCount_;

    /** The number of new messages sent by the processes in clusterComm_
        after the last survey. */
    int clusterSendCount_;

    /** The number of new messages received by the processes in clusterComm_
        after the last survey. */
    int clusterRecvCount_;

    /** The total number of messages sent by all the processes. */
    int systemSendCount_;

    /** The total number of messages received by all the processes. */
    int systemRecvCount_;
    //@}

    /** @name Node index
     *
     */
    //@{
    /** The batch size of node indices the master hands out at a time. */
    int masterIndexBatch_;
    //@}

    /** @name Parallel statistics
     *
     */
    //@{
    /** Master timer. */
    AlpsTimer masterTimer_;

    /** Hub timer. */
    AlpsTimer hubTimer_;

    /** Worker timer. */
    AlpsTimer workerTimer_;

    /** The time spent in ramp up. */
    double rampUpTime_;

    /** The time spent in ramp down. */
    double rampDownTime_;

    /** The time spent waiting for work. */
    double idleTime_;

    /** The time spent processing messages (includes idle time). */
    double msgTime_;

    /** More statistics. */
    AlpsPsStats psStats_;
    //@}

    /** Terminate due to reaching limits (time and node) or another
        reason. */
    bool forceTerminate_;

    /** Indicates whether to do a termination check. */
    bool blockTermCheck_;

    /** Indicates whether a hub needs to report its state to the master. */
    bool blockHubReport_;

    /** Indicates whether a worker needs to report its state to its hub. */
    bool blockWorkerReport_;

    /** Indicates whether a worker needs to ask for work from its hub. */
    bool blockAskForWork_;

    /** Buffer attached to MPI when sharing generated knowledge. */
    char *attachBuffer_;

    /** Large message buffer. */
    char *largeBuffer_;

    /** Large message buffer. Used for sharing model knowledge. */
    char *largeBuffer2_;

    /** Small message buffer. */
    char *smallBuffer_;

    /** The period at which the master does load balancing. It changes
        as the search progresses. */
    double masterBalancePeriod_;

    /** The period at which a hub does load balancing and reports cluster
        status. It changes as the search progresses. */
    double hubReportPeriod_;

    /** The global rank of the process that shares generated model
        knowledge. */
    int modelGenID_;

    /** Size of the shared knowledge. */
    int modelGenPos_;

    /** A subtree used during ramp up. */
    AlpsSubTree* rampUpSubTree_;

    /** Number of nodes in one unit of work. */
    int unitWorkNodes_;

    /** Temporarily halt search. */
    int haltSearch_;

 protected:

    /** Initialize member data. */
    void init();

    /** @name Core member functions for master, hubs and workers.
     */
    //@{
    /** Master generates subtrees and sends them to hubs in a Round-Robin way.
        Master periodically does inter-cluster load balancing
        and termination checks. */
    void masterMain(AlpsTreeNode* root);

    /** Hub generates subtrees and sends them to workers in a Round-Robin way.
        Hub does intra-cluster load balancing. */
    void hubMain();

    /** Worker first receives subtrees, then starts to explore them. Worker
        also periodically checks messages and processes messages. */
    void workerMain();
    //@}

    /** Explore a subtree from the subtree pool for certain units of
        work/time. */
    // The same subtree will be explored next time if it still has
    // unexplored nodes.
    AlpsReturnStatus doOneUnitWork(int unitWork,
                                   double unitTime,
                                   AlpsExitStatus & exitStatus,
                                   int & numNodesProcessed,
                                   int & numNodesBranched,
                                   int & numNodesDiscarded,
                                   int & numNodesPartial,
                                   int & depth,
                                   bool & betterSolution);

    /** Processing messages. */
    void processMessages(char *&buffer,
                         MPI_Status &status,
                         MPI_Request &request);

    /** Static load balancing: Root Initialization. */
    void rootInitMaster(AlpsTreeNode* root);
    void rootInitHub();
    void rootInitWorker();

    /** Static load balancing: spiral. */
    void spiralMaster(AlpsTreeNode* root);
    void spiralHub();
    void spiralWorker();

    //------------------------------------------------------

    /** @name Load balancing member functions
     */
    //@{
    /** Master asks a hub to donate its workload to another hub. */
    void masterAskHubDonate(int donorID,
                            int receiverID,
                            double receiverWorkload);

    /** Hub asks a worker to donate its workload to another worker. */
    void hubAskWorkerDonate(int donorID,
                            int receiverID,
                            double receiverWorkload);

    /** Calculate the work quality and quantity on this process. */
    void updateWorkloadInfo();

    /** Number of nodes left in the whole system (truncated work
        quantity). */
    virtual int getNumNodeLeftSystem()
    { return static_cast<int>(systemWorkQuantity_); }

    /** A worker donates its workload to the specified worker. */
    void donateWork(char*& buf,
                    int tag,
                    MPI_Status* status,
                    int recvID = -1,
                    double recvWL = 0.0);

    /** Hub allocates the donated workload to its workers. */
    void hubAllocateDonation(char*& buf, MPI_Status* status);

    /** Hub balances the workloads of its workers. */
    void hubBalanceWorkers();

    /** Hub satisfies the workload request from a worker. */
    void hubSatisfyWorkerRequest(char*& buf, MPI_Status* status);

    /** A hub reports its status (workload and msg counts) to the master. */
    // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
    void hubReportStatus(int tag, MPI_Comm comm);

    /** A hub unpacks the status of a worker from buffer. */
    // NOTE: comm is clusterComm_ or MPI_COMM_WORLD.
    void hubUpdateCluStatus(char*& buf, MPI_Status* status, MPI_Comm comm);

    /** Two hubs share their workload. */
    void hubsShareWork(char*& buf, MPI_Status* status);

    /** Master balances the workload of hubs. */
    void masterBalanceHubs();

    /** Master unpacks the status of a hub from buf and updates system
        status. */
    // NOTE: comm is hubComm or MPI_COMM_WORLD.
    void masterUpdateSysStatus(char*& buf, MPI_Status* status, MPI_Comm comm);

    /** The master re-calculates the system status. */
    void refreshSysStatus();

    /** A hub adds its status to the cluster's status. */
    void refreshClusterStatus();

    /** A worker reports its status (workload and msg counts) to its hub. */
    // NOTE: comm is clusterComm_ or MPI_COMM_WORLD.
    void workerReportStatus(int tag, MPI_Comm comm);
    //@}

    //------------------------------------------------------

    /** @name Node index functions // msg counts are modified inside
     *
     */
    //@{
    /** A worker asks for node indices from the master. */
    void workerAskIndices();

    /** A worker receives node indices from the master. */
    void workerRecvIndices(char *&bufLarge);

    /** Master sends a batch of node indices to the receiving worker. */
    void masterSendIndices(char *&bufLarge);
    //@}

    //------------------------------------------------------

    /** @name Other message passing member functions
     *
     */
    //@{
    /** Broadcast the model from source to other processes. */
    void broadcastModel(const int id, const int source);

    /** Send the incumbent value and rank to its two children if they
        exist. */
    void sendIncumbent();

    /** Unpack the incumbent value, then store it and the id of the process
        having the incumbent in AlpsDataPool. */
    bool unpackSetIncumbent(char*& buf, MPI_Status* status);

    /** Send the best solution from the process having it to destination. */
    void collectBestSolution(int destination);

    /** Inform master that a proc has received workload during a load
        balance initialized by master. */
    void tellMasterRecv();

    /** Inform hub that a proc has received workload during a load
        balance initialized by a hub. */
    // Not used
    void tellHubRecv();

    /** Pack an AlpsEncoded instance into buf. Return filled buf and size of
        packed message.
        position: where to start if buf is allocated.
    */
    void packEncoded(AlpsEncoded* enc,
                     char*& buf,
                     int& size,
                     int& position,
                     MPI_Comm comm);

    /** Unpack the given buffer into an AlpsEncoded instance. */
    AlpsEncoded* unpackEncoded(char*& buf,
                               int& position,
                               MPI_Comm comm,
                               int size = -1);

    /** Receive the size of the buffer, allocate memory for the buffer, then
        receive the message and put it in the buffer. */
    // NOTE: comm is hubComm_ or clusterComm_
    void receiveSizeBuf(char*& buf,
                        int sender,
                        int tag,
                        MPI_Comm comm,
                        MPI_Status* status);

    /** First receive the size and the content of a node, then construct
        a subtree with this received node. */
    // NOTE: comm is hubComm_ or clusterComm_
    void receiveRampUpNode(int sender,
                           MPI_Comm comm,
                           MPI_Status* status);

    /** Receive a subtree from the sender process and add it into
        the subtree pool. */
    void receiveSubTree(char*& buf, int sender, MPI_Status* status);

    /** Send the size and content of a buffer to the target process. */
    // NOTE: comm is hubComm_ or clusterComm_.
    void sendSizeBuf(char*& buf,
                     int size,
                     int position,
                     const int target,
                     const int tag,
                     MPI_Comm comm);

    /** Send the size and the content of the best node of a given subtree
        to the target process. */
    // NOTE: comm is hubComm_ or clusterComm_.
    void sendRampUpNode(const int target, MPI_Comm comm);

    /** Send a node from rampUpSubTree's node pool and generated model
        knowledge. */
    void sendNodeModelGen(int receiver, int doUnitWork);

    /** Send a given subtree to the target process. */
    bool sendSubTree(const int target, AlpsSubTree*& st, int tag);

    /** Send the finish-initialization signal to the target process. */
    // NOTE: comm is hubComm_ or clusterComm_.
    void sendFinishInit(const int target, MPI_Comm comm);
    //@}

    /** Delete subtrees in pools and the active subtree. */
    void deleteSubTrees();

    /** Forward generated model knowledge to other processes. */
    void forwardModelKnowledge();

    /** Send generated knowledge (related to model) to receiver. */
    // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
    void sendModelKnowledge(MPI_Comm comm, int receiver=-1);

    /** Receive generated knowledge (related to model) from sender. */
    // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
    void receiveModelKnowledge(MPI_Comm comm);

    /** @name Change message counts functions
     */
    //@{
    /** Increment the number of sent messages. */
    void incSendCount(const char* how, int s = 1);
    /** Decrement the number of sent messages. */
    void decSendCount(const char* how, int s = 1);
    /** Increment the number of received messages. */
    void incRecvCount(const char* how, int s = 1);
    /** Decrement the number of received messages. */
    void decRecvCount(const char* how, int s = 1);
    //@}

    /** Master tells hubs to terminate due to reaching limits or another
        reason. */
    void masterForceHubTerm();

    /** Hub tells workers to terminate due to reaching limits or another
        reason. */
    void hubForceWorkerTerm();

    /** Change the subtree to be explored if it is too much worse. */
    void changeWorkingSubTree(double & changeWorkThreshold);

    /** Send an error code to the master. */
    void sendErrorCodeToMaster(int errorCode);

    /** Receive an error code and set the solution status. */
    void recvErrorCode(char *& bufLarge);

    /** Unpack the node, explore it and send load info to the master. */
    void spiralRecvProcessNode();

    /** Unpack the msg and donate a node. */
    void spiralDonateNode();

 public:

    /** Default constructor.
        NOTE: must call initializeSearch() later. */
    AlpsKnowledgeBrokerMPI()
        :
        AlpsKnowledgeBroker()
        {
            init();
        }

    /** Useful constructor. */
    AlpsKnowledgeBrokerMPI(int argc,
                           char* argv[],
                           AlpsModel& model)
        :
        AlpsKnowledgeBroker()
        {
            init();
            initializeSearch(argc, argv, model);
        }

    /** Destructor. */
    ~AlpsKnowledgeBrokerMPI();

    /** Query the global rank of the process. */
    virtual int getProcRank() const { return globalRank_; }

    /** Query the global rank of the Master. */
    virtual int getMasterRank() const { return masterRank_; }

    /** Query the type (master, hub, or worker) of the process. */
    virtual AlpsProcessType getProcType() const { return processType_; }

    /** This function
     * <ul>
     * <li> initializes the message environment;
     * <li> the master reads in ALPS and user's parameter sets. If the model
     *      data is input from file, then it reads in the model data.
     * <li> sets up user params and model;
     * <li> broadcasts parameters from the master to all other processes;
     * <li> creates MPI communicators and groups;
     * <li> classifies process types, sets up subtree and pools
     * <li> determines their hub's global rank for workers
     * </ul>
     */
    void initializeSearch(int argc, char* argv[], AlpsModel& model);

    /** Search the best solution for a given model. */
    void search(AlpsModel *model);

    /** This function
     * <ul>
     * <li> broadcasts model data from the master to all other processes;
     * <li> calls its associated main function to explore the sub tree;
     * <li> collects the best solution found.
     * </ul>
     */
    void rootSearch(AlpsTreeNode* root);

    /** @name Report search results. */
    //@{
    /** The process queries the quality of the incumbent that this process
        stores. */
    virtual double getIncumbentValue() const {
        double bestObj = ALPS_OBJ_MAX;
        if (AlpsKnowledgeBroker::hasKnowledge(AlpsKnowledgeTypeSolution)) {
            bestObj = getBestKnowledge(AlpsKnowledgeTypeSolution).second;
            if (incumbentValue_ > bestObj) {
                return bestObj;
            }
        }
        return incumbentValue_;
    }

    /** The master queries the quality of the best solution it knows. */
    //todo(aykut) why not this returns to incumbentValue_
    virtual double getBestQuality() const {
        if (globalRank_ == masterRank_) {
            if (getNumKnowledges(AlpsKnowledgeTypeSolution) > 0) {
                return getBestKnowledge(AlpsKnowledgeTypeSolution).second;
            }
            else {
                return ALPS_OBJ_MAX;
            }
        }
        else {
            return ALPS_OBJ_MAX;
        }
    }

    /** Get the best estimated quality in the system. */
    virtual double getBestEstimateQuality() { return systemWorkQuality_; }

    /** Master prints out the best solution that it knows. */
    virtual void printBestSolution(char* outputFile = 0) const;

    /** Log search statistics. */
    virtual void searchLog();
    //@}

    //------------------------------------------------------

    /** @name Knowledge sharing functions
     */
    //@{
    /** Send knowledge. */
    void sendKnowledge(AlpsKnowledgeType type,
                       int sender,
                       int receiver,
                       char *& msgBuffer,
                       int msgSize,
                       int msgTag,
                       MPI_Comm comm,
                       bool blocking);

    /** Receive knowledge. */
    void receiveKnowledge(AlpsKnowledgeType type,
                          int sender,
                          int receiver,
                          char *& msgBuffer,
                          int msgSize,
                          int msgTag,
                          MPI_Comm comm,
                          MPI_Status* status,
                          bool blocking);

    /** Request knowledge. */
    void requestKnowledge(AlpsKnowledgeType type,
                          int sender,
                          int receiver,
                          char *& msgBuffer,
                          int msgSize,
                          int msgTag,
                          MPI_Comm comm,
                          bool blocking);
    //@}

};
#endif

//#############################################################################