#ifndef AFQMC_TASK_GROUP_H
#define AFQMC_TASK_GROUP_H

#include <vector>
#include <string>
#include <map>
#include <ctime>
#include <sys/time.h>
#include <cstdlib>
#include <ctype.h>
#include <algorithm>
#include <iostream>
#include <ostream>
#include <mpi.h>

#include "AFQMC/config.h"
#include "AFQMC/Memory/utilities.hpp"
#include "mpi3/communicator.hpp"
#include "mpi3/shared_communicator.hpp"

namespace qmcplusplus
{
namespace afqmc
{
using communicator        = boost::mpi3::communicator;
using shared_communicator = boost::mpi3::shared_communicator;

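// GlobalTaskGroup splits the global communicator into a shared-memory intra-node
// communicator (node_) and a "core" communicator (core_) connecting the cores
// that hold the same local rank on every node.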
class GlobalTaskGroup
{
public:
  GlobalTaskGroup(communicator& comm)
      : global_(comm),
        //#ifdef ENABLE_CUDA
        // PAMI_DISABLE_IPC issue
        //            node_(comm.split_shared(boost::mpi3::communicator_type::socket,comm.rank())),
        //#else
        node_(comm.split_shared(comm.rank())),
        //#endif
        core_(comm.split(node_.rank(), comm.rank()))
  {
    //    app_log()<<std::endl
    //             <<"**************************************************************\n"
    //             <<" Setting up Global Task Group " <<std::endl;

    // check for consistency
    int dum = node_.size();

    global_.broadcast_n(&dum, 1, 0);
    if (dum != node_.size())
    {
      app_error() << " Error: Inconsistent number of cores in node: " << dum << " " << node_.size() << " "
                  << global_.rank() << std::endl;
      APP_ABORT(" Error in GlobalTaskGroup::GlobalTaskGroup() \n");
    }

    // consistency checks
    dum = core_.size();
    global_.broadcast_n(&dum, 1, 0);
    if (dum != core_.size())
    {
      app_error() << " Error: Inconsistent number of nodes: " << dum << " " << core_.size() << " " << global_.rank()
                  << std::endl;
      APP_ABORT(" Error in GlobalTaskGroup::GlobalTaskGroup() \n");
    }
    dum = core_.rank();
    node_.broadcast_n(&dum, 1, 0);
    if (dum != core_.rank())
    {
      app_error() << " Error: Inconsistent node number: " << dum << " " << core_.rank() << " " << global_.rank()
                  << std::endl;
      APP_ABORT(" Error in GlobalTaskGroup::GlobalTaskGroup() \n");
    }

    //    app_log()<<"**************************************************************" <<std::endl;
  }

  ~GlobalTaskGroup() = default;

  // Not sure if this is necessary, but I want to keep control of the number of communicators
  GlobalTaskGroup(const GlobalTaskGroup& other) = delete;
  GlobalTaskGroup(GlobalTaskGroup&& other)      = delete;
  GlobalTaskGroup& operator=(const GlobalTaskGroup& other) = delete;
  GlobalTaskGroup& operator=(GlobalTaskGroup&& other) = delete;

  int getGlobalRank() const { return global_.rank(); }

  int getGlobalSize() const { return global_.size(); }

  int getTotalNodes() const { return core_.size(); }

  int getTotalCores() const { return node_.size(); }

  int getNodeID() const { return core_.rank(); }

  int getCoreID() const { return node_.rank(); }

  // over full TG using mpi communicator
  void global_barrier() { global_.barrier(); }

  void node_barrier() { node_.barrier(); }

  communicator& Global() { return global_; }
  shared_communicator& Node() { return node_; }
  communicator& Cores() { return core_; }

private:
  // communicators
  communicator& global_;     // global communicator (e.g. MPI_COMM_WORLD or some sub-partition)
  shared_communicator node_; // communicator with all cores in a SHM node
  communicator core_;        // "horizontal" comm, all cores with the same position on a node
};

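// TaskGroup_ partitions the ranks of a GlobalTaskGroup into task groups (TGs).
// Without device support a TG consists of nn node groups of nc cores each; when
// devices are present nc is forced to 1 and a TG is nn consecutive global ranks.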
class TaskGroup_
{
public:
  TaskGroup_(GlobalTaskGroup& gTG, std::string name, int nn, int nc)
      : tgname(name),
        global_(gTG.Global()),
        node_(gTG.Node()),
        core_(gTG.Cores()),
#if defined(ENABLE_CUDA) || defined(ENABLE_HIP)
        local_tg_(node_.split(node_.rank(), node_.rank())),
        tgrp_(),
        tgrp_cores_(),
        tg_heads_(global_.split(0, global_.rank()))
#else
        local_tg_(node_.split(node_.rank() / ((nc < 1) ? (1) : (std::min(nc, node_.size()))), node_.rank())),
        tgrp_(),
        tgrp_cores_(),
        tg_heads_(global_.split(node_.rank() % ((nc < 1) ? (1) : (std::min(nc, node_.size()))), global_.rank()))
#endif
  {
#if defined(ENABLE_CUDA) || defined(ENABLE_HIP)
    if (nc != 1)
    {
      nc = 1;
      app_log() << " WARNING: Code was compiled with ENABLE_CUDA or ENABLE_HIP, setting ncores=1. \n";
    }
#endif
    setup(nn, nc);
  }

  TaskGroup_(TaskGroup_& other, std::string name, int nn, int nc)
      : tgname(name),
        global_(other.Global()),
        node_(other.Node()),
        core_(other.Cores()),
#if defined(ENABLE_CUDA) || defined(ENABLE_HIP)
        local_tg_(node_.split(node_.rank(), node_.rank())),
        tgrp_(),
        tgrp_cores_(),
        tg_heads_(global_.split(0, global_.rank()))
#else
        local_tg_(node_.split(node_.rank() / ((nc < 1) ? (1) : (std::min(nc, node_.size()))), node_.rank())),
        tgrp_(),
        tgrp_cores_(),
        tg_heads_(global_.split(node_.rank() % ((nc < 1) ? (1) : (std::min(nc, node_.size()))), global_.rank()))
#endif
  {
#if defined(ENABLE_CUDA) || defined(ENABLE_HIP)
    if (nc != 1)
    {
      nc = 1;
      app_log() << " WARNING: Code was compiled with ENABLE_CUDA or ENABLE_HIP, setting ncores=1. \n";
    }
#endif
    setup(nn, nc);
  }

  ~TaskGroup_() = default;

  TaskGroup_(const TaskGroup_& other) = delete;
  TaskGroup_(TaskGroup_&& other)
      : global_(other.global_),
        node_(other.node_),
        core_(other.core_),
        local_tg_(other.node_.split(0, other.node_.rank())), // inefficient, but needed to get around lack of
                                                             // default constructor in shared_communicator
        tgrp_(),
        tgrp_cores_(),
        tg_heads_()
  {
    *this = std::move(other);
  }
  TaskGroup_& operator=(const TaskGroup_& other) = delete;
  TaskGroup_& operator=(TaskGroup_&& other)
  {
    if (this != &other)
    {
      tgname                          = other.tgname;
      TG_number                       = other.TG_number;
      number_of_TGs                   = other.number_of_TGs;
      nnodes_per_TG                   = other.nnodes_per_TG;
      next_core_circular_node_pattern = other.next_core_circular_node_pattern;
      prev_core_circular_node_pattern = other.prev_core_circular_node_pattern;
      local_tg_                       = std::move(other.local_tg_);
      tgrp_                           = std::move(other.tgrp_);
      tgrp_cores_                     = std::move(other.tgrp_cores_);
      tg_heads_                       = std::move(other.tg_heads_);
#ifdef ENABLE_CUDA
#ifdef BUILD_AFQMC_WITH_NCCL
      nccl_TGcomm_ = std::move(other.nccl_TGcomm_);
      nccl_Stream_ = std::move(other.nccl_Stream_);
#endif
#endif
    }
    return *this;
  }

  int getGlobalRank() const { return global_.rank(); }

  int getGlobalSize() const { return global_.size(); }

  int getTotalNodes() const { return core_.size(); }

  int getTotalCores() const { return node_.size(); }

  int getNodeID() const { return core_.rank(); }

  int getCoreID() const { return node_.rank(); }

  int getLocalTGRank() const { return local_tg_.rank(); }

  int getNCoresPerTG() const { return local_tg_.size(); }

  int getNGroupsPerTG() const { return nnodes_per_TG; }

// THIS NEEDS TO CHANGE IN GPU CODE!
#if defined(ENABLE_CUDA) || defined(ENABLE_HIP)
  int getLocalGroupNumber() const { return getTGRank(); }
#else
  int getLocalGroupNumber() const { return core_.rank() % nnodes_per_TG; }
#endif

  int getTGNumber() const { return TG_number; }

  int getNumberOfTGs() const { return number_of_TGs; }

  int getTGRank() const { return tgrp_.rank(); }

  int getTGSize() const { return tgrp_.size(); }

  // over full TG using mpi communicator
  void global_barrier() { global_.barrier(); }

  // over local node
  void local_barrier() { local_tg_.barrier(); }

  void node_barrier() { node_.barrier(); }

  void barrier() { tgrp_.barrier(); }

  communicator& Global() { return global_; }
  shared_communicator& Node() { return node_; }
  communicator& Cores() { return core_; }
  shared_communicator& TG_local() { return local_tg_; }
  communicator& TG_heads() { return tg_heads_; }
  communicator& TG() { return tgrp_; }
  communicator& TG_Cores() { return tgrp_cores_; }
#ifdef ENABLE_CUDA
#ifdef BUILD_AFQMC_WITH_NCCL
  cudaStream_t& ncclStream() { return nccl_Stream_; }
  ncclComm_t& ncclTG() { return nccl_TGcomm_; }
#endif
#endif

  int next_core() const { return next_core_circular_node_pattern; }
  int prev_core() const { return prev_core_circular_node_pattern; }

private:
  std::string tgname;

  int TG_number;
  int number_of_TGs;
  int nnodes_per_TG;

  int next_core_circular_node_pattern;
  int prev_core_circular_node_pattern;

  // communicators (from Global)
  communicator& global_;      // global communicator (e.g. MPI_COMM_WORLD or some sub-partition)
  shared_communicator& node_; // communicator with all cores in a SHM node
  communicator& core_;        // "horizontal" comm, all cores with the same position on a node
  // communicators (for this TG)
  shared_communicator local_tg_; // all the cores in a node in the same TG
  communicator tgrp_;
  communicator tgrp_cores_;
  communicator tg_heads_;
#ifdef ENABLE_CUDA
#ifdef BUILD_AFQMC_WITH_NCCL
  cudaStream_t nccl_Stream_;
  ncclComm_t nccl_TGcomm_;
#endif
#endif

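  // setup(): verifies local_tg_ and builds the remaining TG machinery: tgrp_
  // (all members of this TG), tgrp_cores_ (TG members sharing the same local TG
  // rank), TG_number, number_of_TGs and the ring-neighbor ranks.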
  void setup(int nn, int nc)
  {
    //    app_log()<<std::endl
    //             <<"**************************************************************\n"
    //             <<" Setting up Task Group: " <<tgname <<std::endl;

    int ncores_per_TG = (nc < 1) ? (1) : (std::min(nc, node_.size()));

    // local_tg_ was built in the constructor initializer list; check it here

    if (node_.size() % ncores_per_TG != 0)
    {
      app_error() << "Found " << node_.size() << " cores per node. " << std::endl;
      app_error() << " Error in TaskGroup setup(): Number of cores per node is not divisible by requested number of "
                     "cores in Task Group."
                  << std::endl;
      APP_ABORT(" Error in TaskGroup::TaskGroup() \n");
    }

    if (local_tg_.size() != ncores_per_TG)
    {
      app_log() << nn << " " << nc << std::endl;
      app_error() << "Problems creating local TG: " << local_tg_.size() << " " << ncores_per_TG << std::endl;
      APP_ABORT(" Error in TaskGroup::TaskGroup() \n");
    }

    int ndevices(number_of_devices());

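    // Two layouts: without devices a TG spans nnodes_per_TG nodes x ncores_per_TG
    // cores; with devices ncores is forced to 1 and a TG is simply nnodes_per_TG
    // consecutive global ranks.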
    if (ndevices == 0)
    {
      nnodes_per_TG = (nn < 1) ? (1) : (std::min(nn, core_.size()));

      // assign groups from different nodes to TGs
      if (core_.size() % nnodes_per_TG != 0)
      {
        app_error() << "Found " << core_.size() << " nodes. " << std::endl;
        app_error() << " Error in TaskGroup setup(): Number of nodes is not divisible by requested number of nodes in "
                       "Task Group."
                    << std::endl;
        APP_ABORT(" Error in TaskGroup_::TaskGroup_() \n");
      }

      int myrow, mycol, nrows, ncols, node_in_TG;
      // setup TG grid
      nrows         = node_.size() / ncores_per_TG;
      ncols         = core_.size() / nnodes_per_TG;
      mycol         = core_.rank() / nnodes_per_TG; // simple contiguous-block assignment
      node_in_TG    = core_.rank() % nnodes_per_TG;
      myrow         = node_.rank() / ncores_per_TG;
      TG_number     = mycol + ncols * myrow;
      number_of_TGs = nrows * ncols;
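      // Example (illustrative): with 8 cores/node, ncores_per_TG=4, 6 nodes and
      // nnodes_per_TG=2, the grid is nrows=2 x ncols=3 (6 TGs); a rank on node 5
      // with local core rank 6 sits in column 2, row 1, i.e. TG_number=5.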

      // split communicator
      tgrp_ = global_.split(TG_number, global_.rank());

      if (tgrp_.rank() != node_in_TG * ncores_per_TG + local_tg_.rank())
      {
        app_error() << " Error in TG::setup(): Unexpected TG_rank: " << tgrp_.rank() << " " << node_in_TG << " "
                    << local_tg_.rank() << " " << node_in_TG * ncores_per_TG + local_tg_.rank() << std::endl;
        APP_ABORT("Error in TG::setup(): Unexpected TG_rank \n");
      }

      // define ring communication pattern
      // these are the ranks (in TGcomm) of cores with the same core_rank
      // on nodes with id +-1 (with respect to local node).
      next_core_circular_node_pattern = ((node_in_TG + 1) % nnodes_per_TG) * ncores_per_TG + local_tg_.rank();
      if (node_in_TG == 0)
        prev_core_circular_node_pattern = (nnodes_per_TG - 1) * ncores_per_TG + local_tg_.rank();
      else
        prev_core_circular_node_pattern = ((node_in_TG - 1) % nnodes_per_TG) * ncores_per_TG + local_tg_.rank();
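      // e.g. with nnodes_per_TG=3 and ncores_per_TG=2, the core with local_tg_
      // rank 1 on node_in_TG=0 gets next_core=3 (node 1) and prev_core=5 (node 2),
      // closing the ring over the TG's nodes.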
    }
    else
    { // ndevices > 0
      // assign groups from the same node to TGs

      nnodes_per_TG = (nn < 1) ? (1) : (std::min(nn, global_.size()));

      if (ncores_per_TG > 1)
      {
        app_error() << " Error in TaskGroup setup(): ncores > 1 incompatible with ndevices>0." << std::endl;
        APP_ABORT(" Error in TaskGroup_::TaskGroup_() \n");
      }
      // limiting to full nodes if nnodes_per_TG > ndevices
      if (global_.size() % nnodes_per_TG != 0)
      {
        app_error() << "Found " << global_.size() << " MPI tasks. " << std::endl;
        app_error() << " Error in TaskGroup setup(): Number of MPI tasks is not divisible by requested number of "
                       "groups in Task Group."
                    << std::endl;
        APP_ABORT(" Error in TaskGroup_::TaskGroup_() \n");
      }

      // MAM: How do I check that ranks are distributed within a node first???
      //      Not sure how to check, but print a warning if nnodes>1 and ranks are
      //      distributed over nodes first (e.g. adjacent ranks in Global are on different nodes).
      //      This would kill performance!!!
      TG_number     = global_.rank() / nnodes_per_TG;
      number_of_TGs = global_.size() / nnodes_per_TG;

      // split communicator
      tgrp_ = global_.split(TG_number, global_.rank());

      // define ring communication pattern
      next_core_circular_node_pattern = (tgrp_.rank() + 1) % tgrp_.size();
      if (tgrp_.rank() == 0)
        prev_core_circular_node_pattern = tgrp_.size() - 1;
      else
        prev_core_circular_node_pattern = tgrp_.rank() - 1;
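      // e.g. in a TG of size 4, rank 2 gets next_core=3 and prev_core=1, while
      // rank 0 wraps around to prev_core=3.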
    }

#ifdef ENABLE_CUDA
#ifdef BUILD_AFQMC_WITH_NCCL
    qmc_cuda::cuda_check(cudaStreamCreate(&nccl_Stream_), "cudaStreamCreate(&s)");
    {
      ncclUniqueId id;
      if (tgrp_.rank() == 0)
        ncclGetUniqueId(&id);
      MPI_Bcast((void*)&id, sizeof(id), MPI_BYTE, 0, tgrp_.get());
      NCCLCHECK(ncclCommInitRank(&nccl_TGcomm_, tgrp_.size(), id, tgrp_.rank()));
    }
#endif
#endif

    // split communicator
    tgrp_cores_ = tgrp_.split(getLocalTGRank(), getLocalGroupNumber());
  }
};

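// TaskGroupHandler caches TaskGroup_ objects keyed by the requested number of
// nodes (nn); each one is created on first request with the current ncores value.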
class TaskGroupHandler
{
public:
  TaskGroupHandler(afqmc::GlobalTaskGroup& gtg_, int nc) : gTG_(gtg_), ncores(nc) {}

  ~TaskGroupHandler() {}

  TaskGroup_& getTG(int nn)
  {
    if (ncores <= 0)
      APP_ABORT(" Error: Calling TaskGroupHandler::getTG() before setting ncores. \n\n\n");
    auto t = TGMap.find(nn);
    if (t == TGMap.end())
    {
      auto p = TGMap.insert(
          std::make_pair(nn, afqmc::TaskGroup_(gTG_, std::string("TaskGroup_") + std::to_string(nn), nn, ncores)));
      if (!p.second)
        APP_ABORT(" Error: Problems creating new TG in TaskGroupHandler::getTG(int). \n");
      return (p.first)->second;
    }
    return t->second;
  }

  void setNCores(int nc) { ncores = nc; }

  afqmc::GlobalTaskGroup& gTG() { return gTG_; }

private:
  afqmc::GlobalTaskGroup& gTG_;

  int ncores;

  std::map<int, afqmc::TaskGroup_> TGMap;
};
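
// Minimal usage sketch (illustrative only, assuming a boost::mpi3 communicator
// `world` obtained elsewhere, e.g. from boost::mpi3::environment):
//
//   GlobalTaskGroup gTG(world);
//   TaskGroupHandler TGHandler(gTG, /*ncores=*/4);
//   TaskGroup_& TG = TGHandler.getTG(/*nnodes=*/2); // TG spanning 2 nodes x 4 cores
//   TG.global_barrier();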

} // namespace afqmc

} // namespace qmcplusplus


#endif