1 //============================================================================
2 //  Copyright (c) Kitware, Inc.
3 //  All rights reserved.
4 //  See LICENSE.txt for details.
5 //
6 //  This software is distributed WITHOUT ANY WARRANTY; without even
7 //  the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8 //  PURPOSE.  See the above copyright notice for more information.
9 //============================================================================
10 
11 #ifndef vtk_m_filter_Messenger_h
12 #define vtk_m_filter_Messenger_h
13 
14 #include <vtkm/filter/vtkm_filter_extra_export.h>
15 
16 #include <vtkm/Types.h>
17 #include <vtkm/cont/Serialization.h>
18 #include <vtkm/thirdparty/diy/diy.h>
19 
20 #include <list>
21 #include <map>
22 #include <set>
23 #include <vector>
24 
25 #ifdef VTKM_ENABLE_MPI
26 #include <mpi.h>
27 #endif
28 
29 namespace vtkm
30 {
31 namespace filter
32 {
33 namespace particleadvection
34 {
35 
36 class VTKM_FILTER_EXTRA_EXPORT Messenger
37 {
38 public:
39   VTKM_CONT Messenger(vtkmdiy::mpi::communicator& comm);
~Messenger()40   VTKM_CONT virtual ~Messenger()
41   {
42 #ifdef VTKM_ENABLE_MPI
43     this->CleanupRequests();
44 #endif
45   }
46 
GetRank()47   int GetRank() const { return this->Rank; }
GetNumRanks()48   int GetNumRanks() const { return this->NumRanks; }
49 
50 #ifdef VTKM_ENABLE_MPI
51   VTKM_CONT void RegisterTag(int tag, std::size_t numRecvs, std::size_t size);
52 
53 protected:
54   static std::size_t CalcMessageBufferSize(std::size_t msgSz);
55 
56   void InitializeBuffers();
57   void CheckPendingSendRequests();
58   void CleanupRequests(int tag = TAG_ANY);
59   void SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff);
60   bool RecvData(const std::set<int>& tags,
61                 std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
62                 bool blockAndWait = false);
63 
64 private:
65   void PostRecv(int tag);
66   void PostRecv(int tag, std::size_t sz, int src = -1);
67 
68 
69   //Message headers.
70   typedef struct
71   {
72     int rank, tag;
73     std::size_t id, numPackets, packet, packetSz, dataSz;
74   } Header;
75 
76   bool RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, bool blockAndWait = false);
77 
78   void PrepareForSend(int tag, const vtkmdiy::MemoryBuffer& buff, std::vector<char*>& buffList);
GetMsgID()79   vtkm::Id GetMsgID() { return this->MsgID++; }
80   static bool PacketCompare(const char* a, const char* b);
81   void ProcessReceivedBuffers(std::vector<char*>& incomingBuffers,
82                               std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers);
83 
84   // Send/Recv buffer management structures.
85   using RequestTagPair = std::pair<MPI_Request, int>;
86   using RankIdPair = std::pair<int, int>;
87 
88   //Member data
89   std::map<int, std::pair<std::size_t, std::size_t>> MessageTagInfo;
90   MPI_Comm MPIComm;
91   std::size_t MsgID;
92   int NumRanks;
93   int Rank;
94   std::map<RequestTagPair, char*> RecvBuffers;
95   std::map<RankIdPair, std::list<char*>> RecvPackets;
96   std::map<RequestTagPair, char*> SendBuffers;
97   static constexpr int TAG_ANY = -1;
98 
99   void CheckRequests(const std::map<RequestTagPair, char*>& buffer,
100                      const std::set<int>& tags,
101                      bool BlockAndWait,
102                      std::vector<RequestTagPair>& reqTags);
103 #else
104 protected:
105   static constexpr int NumRanks = 1;
106   static constexpr int Rank = 0;
107 #endif
108 };
109 }
110 }
111 } // namespace vtkm::filter::particleadvection
112 
113 
114 #endif
115