1 //============================================================================ 2 // Copyright (c) Kitware, Inc. 3 // All rights reserved. 4 // See LICENSE.txt for details. 5 // 6 // This software is distributed WITHOUT ANY WARRANTY; without even 7 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR 8 // PURPOSE. See the above copyright notice for more information. 9 //============================================================================ 10 11 #ifndef vtk_m_filter_Messenger_h 12 #define vtk_m_filter_Messenger_h 13 14 #include <vtkm/filter/vtkm_filter_extra_export.h> 15 16 #include <vtkm/Types.h> 17 #include <vtkm/cont/Serialization.h> 18 #include <vtkm/thirdparty/diy/diy.h> 19 20 #include <list> 21 #include <map> 22 #include <set> 23 #include <vector> 24 25 #ifdef VTKM_ENABLE_MPI 26 #include <mpi.h> 27 #endif 28 29 namespace vtkm 30 { 31 namespace filter 32 { 33 namespace particleadvection 34 { 35 36 class VTKM_FILTER_EXTRA_EXPORT Messenger 37 { 38 public: 39 VTKM_CONT Messenger(vtkmdiy::mpi::communicator& comm); ~Messenger()40 VTKM_CONT virtual ~Messenger() 41 { 42 #ifdef VTKM_ENABLE_MPI 43 this->CleanupRequests(); 44 #endif 45 } 46 GetRank()47 int GetRank() const { return this->Rank; } GetNumRanks()48 int GetNumRanks() const { return this->NumRanks; } 49 50 #ifdef VTKM_ENABLE_MPI 51 VTKM_CONT void RegisterTag(int tag, std::size_t numRecvs, std::size_t size); 52 53 protected: 54 static std::size_t CalcMessageBufferSize(std::size_t msgSz); 55 56 void InitializeBuffers(); 57 void CheckPendingSendRequests(); 58 void CleanupRequests(int tag = TAG_ANY); 59 void SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff); 60 bool RecvData(const std::set<int>& tags, 61 std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers, 62 bool blockAndWait = false); 63 64 private: 65 void PostRecv(int tag); 66 void PostRecv(int tag, std::size_t sz, int src = -1); 67 68 69 //Message headers. 70 typedef struct 71 { 72 int rank, tag; 73 std::size_t id, numPackets, packet, packetSz, dataSz; 74 } Header; 75 76 bool RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, bool blockAndWait = false); 77 78 void PrepareForSend(int tag, const vtkmdiy::MemoryBuffer& buff, std::vector<char*>& buffList); GetMsgID()79 vtkm::Id GetMsgID() { return this->MsgID++; } 80 static bool PacketCompare(const char* a, const char* b); 81 void ProcessReceivedBuffers(std::vector<char*>& incomingBuffers, 82 std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers); 83 84 // Send/Recv buffer management structures. 85 using RequestTagPair = std::pair<MPI_Request, int>; 86 using RankIdPair = std::pair<int, int>; 87 88 //Member data 89 std::map<int, std::pair<std::size_t, std::size_t>> MessageTagInfo; 90 MPI_Comm MPIComm; 91 std::size_t MsgID; 92 int NumRanks; 93 int Rank; 94 std::map<RequestTagPair, char*> RecvBuffers; 95 std::map<RankIdPair, std::list<char*>> RecvPackets; 96 std::map<RequestTagPair, char*> SendBuffers; 97 static constexpr int TAG_ANY = -1; 98 99 void CheckRequests(const std::map<RequestTagPair, char*>& buffer, 100 const std::set<int>& tags, 101 bool BlockAndWait, 102 std::vector<RequestTagPair>& reqTags); 103 #else 104 protected: 105 static constexpr int NumRanks = 1; 106 static constexpr int Rank = 0; 107 #endif 108 }; 109 } 110 } 111 } // namespace vtkm::filter::particleadvection 112 113 114 #endif 115