#ifndef AFQMC_TASK_GROUP_H
#define AFQMC_TASK_GROUP_H

#include <vector>
#include <string>
#include <map>
#include <ctime>
#include <sys/time.h>
#include <cstdlib>
#include <ctype.h>
#include <algorithm>
#include <iostream>
#include <ostream>
#include <mpi.h>

#include "AFQMC/config.h"
#include "AFQMC/Memory/utilities.hpp"
#include "mpi3/communicator.hpp"
#include "mpi3/shared_communicator.hpp"

namespace qmcplusplus
{
namespace afqmc
{
using communicator        = boost::mpi3::communicator;
using shared_communicator = boost::mpi3::shared_communicator;

class GlobalTaskGroup
{
public:
  GlobalTaskGroup(communicator& comm)
      : global_(comm),
        //#ifdef ENABLE_CUDA
        // PAMI_DISABLE_IPC issue
        //        node_(comm.split_shared(boost::mpi3::communicator_type::socket,comm.rank())),
        //#else
        node_(comm.split_shared(comm.rank())),
        //#endif
        core_(comm.split(node_.rank(), comm.rank()))
  {
    // consistency check: every node must report the same number of cores
    int dum = node_.size();
    global_.broadcast_n(&dum, 1, 0);
    if (dum != node_.size())
    {
      app_error() << " Error: Inconsistent number of cores in node: " << dum << " " << node_.size() << " "
                  << global_.rank() << std::endl;
      APP_ABORT(" Error in GlobalTaskGroup::GlobalTaskGroup() \n");
    }

    // consistency check: every rank must report the same number of nodes
    dum = core_.size();
    global_.broadcast_n(&dum, 1, 0);
    if (dum != core_.size())
    {
      app_error() << " Error: Inconsistent number of nodes: " << dum << " " << core_.size() << " " << global_.rank()
                  << std::endl;
      APP_ABORT(" Error in GlobalTaskGroup::GlobalTaskGroup() \n");
    }

    // consistency check: all cores on a node must agree on the node id
    dum = core_.rank();
    node_.broadcast_n(&dum, 1, 0);
    if (dum != core_.rank())
    {
      app_error() << " Error: Inconsistent node number: " << dum << " " << core_.rank() << " " << global_.rank()
                  << std::endl;
      APP_ABORT(" Error in GlobalTaskGroup::GlobalTaskGroup() \n");
    }
  }

  ~GlobalTaskGroup() = default;

  // Not sure if this is necessary, but I want to keep control of the number of communicators
  GlobalTaskGroup(const GlobalTaskGroup& other) = delete;
  GlobalTaskGroup(GlobalTaskGroup&& other)      = delete;
  GlobalTaskGroup& operator=(const GlobalTaskGroup& other) = delete;
  GlobalTaskGroup& operator=(GlobalTaskGroup&& other) = delete;

  int getGlobalRank() const { return global_.rank(); }
  int getGlobalSize() const { return global_.size(); }
  int getTotalNodes() const { return core_.size(); }
  int getTotalCores() const { return node_.size(); }
  int getNodeID() const { return core_.rank(); }
  int getCoreID() const { return node_.rank(); }

  // over full TG using mpi communicator
  void global_barrier() { global_.barrier(); }
  void node_barrier() { node_.barrier(); }

  communicator& Global() { return global_; }
  shared_communicator& Node() { return node_; }
  communicator& Cores() { return core_; }

private:
  // communicators
  communicator& global_;     // global communicator (e.g. MPI_COMM_WORLD or some sub-partition)
  shared_communicator node_; // communicator with all cores in a SHM node
  communicator core_;        // "horizontal" comm, all cores with the same position on a node
};
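
// Usage sketch (illustrative only, not part of this header): the three
// communicators place every MPI rank on a (node, core) grid.
//
//   boost::mpi3::communicator& world = ...; // assumed supplied by the caller
//   GlobalTaskGroup gTG(world);
//   int node = gTG.getNodeID(); // 0 .. getTotalNodes() - 1
//   int core = gTG.getCoreID(); // 0 .. getTotalCores() - 1
//   // gTG.Node() spans the cores of one shared-memory node; gTG.Cores()
//   // spans the ranks holding the same core slot on every node.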

class TaskGroup_
{
public:
  TaskGroup_(GlobalTaskGroup& gTG, std::string name, int nn, int nc)
      : tgname(name),
        global_(gTG.Global()),
        node_(gTG.Node()),
        core_(gTG.Cores()),
#if defined(ENABLE_CUDA) || defined(ENABLE_HIP)
        local_tg_(node_.split(node_.rank(), node_.rank())),
        tgrp_(),
        tgrp_cores_(),
        tg_heads_(global_.split(0, global_.rank()))
#else
        local_tg_(node_.split(node_.rank() / ((nc < 1) ? (1) : (std::min(nc, node_.size()))), node_.rank())),
        tgrp_(),
        tgrp_cores_(),
        tg_heads_(global_.split(node_.rank() % ((nc < 1) ? (1) : (std::min(nc, node_.size()))), global_.rank()))
#endif
  {
#if defined(ENABLE_CUDA) || defined(ENABLE_HIP)
    if (nc != 1)
    {
      nc = 1;
      app_log() << " WARNING: Code was compiled with ENABLE_CUDA or ENABLE_HIP, setting ncores=1. \n";
    }
#endif
    setup(nn, nc);
  }

  TaskGroup_(TaskGroup_& other, std::string name, int nn, int nc)
      : tgname(name),
        global_(other.Global()),
        node_(other.Node()),
        core_(other.Cores()),
#if defined(ENABLE_CUDA) || defined(ENABLE_HIP)
        local_tg_(node_.split(node_.rank(), node_.rank())),
        tgrp_(),
        tgrp_cores_(),
        tg_heads_(global_.split(0, global_.rank()))
#else
        local_tg_(node_.split(node_.rank() / ((nc < 1) ? (1) : (std::min(nc, node_.size()))), node_.rank())),
        tgrp_(),
        tgrp_cores_(),
        tg_heads_(global_.split(node_.rank() % ((nc < 1) ? (1) : (std::min(nc, node_.size()))), global_.rank()))
#endif
  {
#if defined(ENABLE_CUDA) || defined(ENABLE_HIP)
    if (nc != 1)
    {
      nc = 1;
      app_log() << " WARNING: Code was compiled with ENABLE_CUDA or ENABLE_HIP, setting ncores=1. \n";
    }
#endif
    setup(nn, nc);
  }
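
  // Illustrative example of the CPU split colors above (values not from the
  // source): on a node with 8 cores and nc = 2, local_tg_ groups node ranks
  // by color node_.rank() / 2 -> {0,1}, {2,3}, {4,5}, {6,7}, while tg_heads_
  // uses color node_.rank() % 2, so its color-0 communicator collects the
  // lead core of every 2-core group across all nodes.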
\n"; 166 } 167 #endif 168 setup(nn, nc); 169 } 170 171 ~TaskGroup_() = default; 172 173 TaskGroup_(const TaskGroup_& other) = delete; TaskGroup_(TaskGroup_ && other)174 TaskGroup_(TaskGroup_&& other) 175 : global_(other.global_), 176 node_(other.node_), 177 core_(other.core_), 178 local_tg_(other.node_.split(0, other.node_.rank())), // inefficient, but needed to get around lack of 179 // default constructor in shared_communicator 180 tgrp_(), 181 tgrp_cores_(), 182 tg_heads_() 183 { 184 *this = std::move(other); 185 } 186 TaskGroup_& operator=(const TaskGroup_& other) = delete; 187 TaskGroup_& operator =(TaskGroup_&& other) 188 { 189 if (this != &other) 190 { 191 tgname = other.tgname; 192 TG_number = other.TG_number; 193 number_of_TGs = other.number_of_TGs; 194 nnodes_per_TG = other.nnodes_per_TG; 195 next_core_circular_node_pattern = other.next_core_circular_node_pattern; 196 prev_core_circular_node_pattern = other.prev_core_circular_node_pattern; 197 local_tg_ = std::move(other.local_tg_); 198 tgrp_ = std::move(other.tgrp_); 199 tgrp_cores_ = std::move(other.tgrp_cores_); 200 tg_heads_ = std::move(other.tg_heads_); 201 #ifdef ENABLE_CUDA 202 #ifdef BUILD_AFQMC_WITH_NCCL 203 nccl_TGcomm_ = std::move(other.nccl_TGcomm_); 204 nccl_Stream_ = std::move(other.nccl_Stream_); 205 #endif 206 #endif 207 } 208 return *this; 209 } 210 getGlobalRank()211 int getGlobalRank() const { return global_.rank(); } 212 getGlobalSize()213 int getGlobalSize() const { return global_.size(); } 214 getTotalNodes()215 int getTotalNodes() const { return core_.size(); } 216 getTotalCores()217 int getTotalCores() const { return node_.size(); } 218 getNodeID()219 int getNodeID() const { return core_.rank(); } 220 getCoreID()221 int getCoreID() const { return node_.rank(); } 222 getLocalTGRank()223 int getLocalTGRank() const { return local_tg_.rank(); } 224 getNCoresPerTG()225 int getNCoresPerTG() const { return local_tg_.size(); } 226 getNGroupsPerTG()227 int getNGroupsPerTG() const { return nnodes_per_TG; } 228 229 // THIS NEEDS TO CHANGE IN GPU CODE! 

  // THIS NEEDS TO CHANGE IN GPU CODE!
#if defined(ENABLE_CUDA) || defined(ENABLE_HIP)
  int getLocalGroupNumber() const { return getTGRank(); }
#else
  int getLocalGroupNumber() const { return core_.rank() % nnodes_per_TG; }
#endif

  int getTGNumber() const { return TG_number; }
  int getNumberOfTGs() const { return number_of_TGs; }
  int getTGRank() const { return tgrp_.rank(); }
  int getTGSize() const { return tgrp_.size(); }

  // over full TG using mpi communicator
  void global_barrier() { global_.barrier(); }

  // over local node
  void local_barrier() { local_tg_.barrier(); }
  void node_barrier() { node_.barrier(); }
  void barrier() { tgrp_.barrier(); }

  communicator& Global() { return global_; }
  shared_communicator& Node() { return node_; }
  communicator& Cores() { return core_; }
  shared_communicator& TG_local() { return local_tg_; }
  communicator& TG_heads() { return tg_heads_; }
  communicator& TG() { return tgrp_; }
  communicator& TG_Cores() { return tgrp_cores_; }
#ifdef ENABLE_CUDA
#ifdef BUILD_AFQMC_WITH_NCCL
  cudaStream_t& ncclStream() { return nccl_Stream_; }
  ncclComm_t& ncclTG() { return nccl_TGcomm_; }
#endif
#endif

  int next_core() const { return next_core_circular_node_pattern; }
  int prev_core() const { return prev_core_circular_node_pattern; }

private:
  std::string tgname;

  int TG_number;
  int number_of_TGs;
  int nnodes_per_TG;

  int next_core_circular_node_pattern;
  int prev_core_circular_node_pattern;

  // communicators (from Global)
  communicator& global_;      // global communicator (e.g. MPI_COMM_WORLD or some sub-partition)
  shared_communicator& node_; // communicator with all cores in a SHM node
  communicator& core_;        // "horizontal" comm, all cores with the same position on a node
  // communicators (for this TG)
  shared_communicator local_tg_; // all the cores in a node in the same TG
  communicator tgrp_;
  communicator tgrp_cores_;
  communicator tg_heads_;
#ifdef ENABLE_CUDA
#ifdef BUILD_AFQMC_WITH_NCCL
  cudaStream_t nccl_Stream_;
  ncclComm_t nccl_TGcomm_;
#endif
#endif
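
  // setup() partitions the machine into task groups. In the CPU
  // (ndevices == 0) case the cores form an nrows x ncols grid with
  //   nrows = cores_per_node / ncores_per_TG,  ncols = nnodes / nnodes_per_TG,
  //   TG_number = mycol + ncols * myrow.
  // Illustrative example (values not from the source): 4 nodes of 8 cores
  // with nn = 2, nc = 4 gives nrows = 2, ncols = 2, i.e. four TGs spanning
  // 2 nodes x 4 cores each.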

  void setup(int nn, int nc)
  {
    int ncores_per_TG = (nc < 1) ? (1) : (std::min(nc, node_.size()));

    // now setup local_tg_
    if (node_.size() % ncores_per_TG != 0)
    {
      app_error() << "Found " << node_.size() << " cores per node. " << std::endl;
      app_error() << " Error in TaskGroup setup(): Number of cores per node is not divisible by requested number of "
                     "cores in Task Group."
                  << std::endl;
      APP_ABORT(" Error in TaskGroup::TaskGroup() \n");
    }

    if (local_tg_.size() != ncores_per_TG)
    {
      app_log() << nn << " " << nc << std::endl;
      app_error() << "Problems creating local TG: " << local_tg_.size() << " " << ncores_per_TG << std::endl;
      APP_ABORT(" Error in TaskGroup::TaskGroup() \n");
    }

    int ndevices(number_of_devices());

    if (ndevices == 0)
    {
      nnodes_per_TG = (nn < 1) ? (1) : (std::min(nn, core_.size()));

      // assign groups from different nodes to TGs
      if (core_.size() % nnodes_per_TG != 0)
      {
        app_error() << "Found " << core_.size() << " nodes. " << std::endl;
        app_error() << " Error in TaskGroup setup(): Number of nodes is not divisible by requested number of nodes in "
                       "Task Group."
                    << std::endl;
        APP_ABORT(" Error in TaskGroup_::TaskGroup_() \n");
      }

      int myrow, mycol, nrows, ncols, node_in_TG;
      // setup TG grid
      nrows         = node_.size() / ncores_per_TG;
      ncols         = core_.size() / nnodes_per_TG;
      mycol         = core_.rank() / nnodes_per_TG; // simple square assignment
      node_in_TG    = core_.rank() % nnodes_per_TG;
      myrow         = node_.rank() / ncores_per_TG;
      TG_number     = mycol + ncols * myrow;
      number_of_TGs = nrows * ncols;

      // split communicator
      tgrp_ = global_.split(TG_number, global_.rank());

      if (tgrp_.rank() != node_in_TG * ncores_per_TG + local_tg_.rank())
      {
        app_error() << " Error in TG::setup(): Unexpected TG_rank: " << tgrp_.rank() << " " << node_in_TG << " "
                    << local_tg_.rank() << " " << node_in_TG * ncores_per_TG + local_tg_.rank() << std::endl;
        APP_ABORT("Error in TG::setup(): Unexpected TG_rank \n");
      }

      // define ring communication pattern
      // these are the ranks (in TGcomm) of cores with the same core_rank
      // on nodes with id +-1 (with respect to local node).
      next_core_circular_node_pattern = ((node_in_TG + 1) % nnodes_per_TG) * ncores_per_TG + local_tg_.rank();
      if (node_in_TG == 0)
        prev_core_circular_node_pattern = (nnodes_per_TG - 1) * ncores_per_TG + local_tg_.rank();
      else
        prev_core_circular_node_pattern = ((node_in_TG - 1) % nnodes_per_TG) * ncores_per_TG + local_tg_.rank();
    }
    else
    { // ndevices > 0
      // assign groups from the same node to TGs
      nnodes_per_TG = (nn < 1) ? (1) : (std::min(nn, global_.size()));

      if (ncores_per_TG > 1)
      {
        app_error() << " Error in TaskGroup setup(): ncores > 1 incompatible with ndevices > 0." << std::endl;
        APP_ABORT(" Error in TaskGroup_::TaskGroup_() \n");
      }
      // limiting to full nodes if nnodes_per_TG > ndevices
      if (global_.size() % nnodes_per_TG != 0)
      {
        app_error() << "Found " << global_.size() << " MPI tasks. " << std::endl;
        app_error() << " Error in TaskGroup setup(): Number of MPI tasks is not divisible by requested number of "
                       "groups in Task Group."
                    << std::endl;
        APP_ABORT(" Error in TaskGroup_::TaskGroup_() \n");
      }

      // MAM: How do I check that the distribution of ranks is made along a node first???
      // Not sure how to check, but print a warning if nnodes > 1 and ranks are
      // distributed over nodes first (e.g. adjacent ranks in Global are on different nodes).
      // This would kill performance!!!
      TG_number     = global_.rank() / nnodes_per_TG;
      number_of_TGs = global_.size() / nnodes_per_TG;

      // split communicator
      tgrp_ = global_.split(TG_number, global_.rank());

      // define ring communication pattern
      next_core_circular_node_pattern = (tgrp_.rank() + 1) % tgrp_.size();
      if (tgrp_.rank() == 0)
        prev_core_circular_node_pattern = tgrp_.size() - 1;
      else
        prev_core_circular_node_pattern = tgrp_.rank() - 1;
    }
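
    // Illustrative ring-pattern example for the ndevices == 0 branch above
    // (values not from the source): with nnodes_per_TG = 3 and
    // ncores_per_TG = 2, the core at node_in_TG = 1 with local_tg_ rank 0 has
    // TG rank 1 * 2 + 0 = 2, next_core = ((1 + 1) % 3) * 2 + 0 = 4 and
    // prev_core = ((1 - 1) % 3) * 2 + 0 = 0, so messages addressed to
    // next_core()/prev_core() circulate node-to-node within the TG.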

#ifdef ENABLE_CUDA
#ifdef BUILD_AFQMC_WITH_NCCL
    // create a NCCL communicator mirroring tgrp_: rank 0 generates the unique
    // id and broadcasts it over MPI before the collective ncclCommInitRank
    qmc_cuda::cuda_check(cudaStreamCreate(&nccl_Stream_), "cudaStreamCreate(&s)");
    {
      ncclUniqueId id;
      if (tgrp_.rank() == 0)
        ncclGetUniqueId(&id);
      MPI_Bcast((void*)&id, sizeof(id), MPI_BYTE, 0, tgrp_.get());
      NCCLCHECK(ncclCommInitRank(&nccl_TGcomm_, tgrp_.size(), id, tgrp_.rank()));
    }
#endif
#endif

    // split communicator
    tgrp_cores_ = tgrp_.split(getLocalTGRank(), getLocalGroupNumber());
  }
};

class TaskGroupHandler
{
public:
  TaskGroupHandler(afqmc::GlobalTaskGroup& gtg_, int nc) : gTG_(gtg_), ncores(nc) {}

  ~TaskGroupHandler() {}

  TaskGroup_& getTG(int nn)
  {
    if (ncores <= 0)
      APP_ABORT(" Error: Calling TaskGroupHandler::getTG() before setting ncores. \n\n\n");
    auto t = TGMap.find(nn);
    if (t == TGMap.end())
    {
      auto p = TGMap.insert(
          std::make_pair(nn, afqmc::TaskGroup_(gTG_, std::string("TaskGroup_") + std::to_string(nn), nn, ncores)));
      if (!p.second)
        APP_ABORT(" Error: Problems creating new TG in TaskGroupHandler::getTG(int). \n");
      return (p.first)->second;
    }
    return t->second;
  }

  void setNCores(int nc) { ncores = nc; }

  afqmc::GlobalTaskGroup& gTG() { return gTG_; }

private:
  afqmc::GlobalTaskGroup& gTG_;

  int ncores;

  std::map<int, afqmc::TaskGroup_> TGMap;
};

} // namespace afqmc

} // namespace qmcplusplus

#endif