1 #ifndef COMMLAYER_H 2 #define COMMLAYER_H 1 3 4 #include "Messages.h" 5 #include <mpi.h> 6 #include <vector> 7 8 enum APMessage 9 { 10 APM_NONE, 11 APM_CONTROL, 12 APM_BUFFERED 13 }; 14 15 enum APControl 16 { 17 APC_SET_STATE, 18 APC_ERODE_COMPLETE, 19 APC_TRIM, 20 APC_POPBUBBLE, 21 APC_ASSEMBLE, 22 APC_CHECKPOINT, 23 APC_WAIT, 24 APC_BARRIER, 25 }; 26 27 typedef std::vector<Message*> MessagePtrVector; 28 29 struct ControlMessage 30 { 31 int64_t id; 32 APControl msgType; 33 int argument; 34 }; 35 36 /** Interprocess communication and synchronization primitives. */ 37 class CommLayer 38 { 39 public: 40 CommLayer(); 41 ~CommLayer(); 42 43 // Check if a message exists, if it does return the type 44 APMessage checkMessage(int &sendID); 45 46 // Return whether a message has been received. 47 bool receiveEmpty(); 48 49 // Block until all processes have reached this routine. 50 void barrier(); 51 52 void broadcast(int message); 53 int receiveBroadcast(); 54 55 // Block until all processes have reached this routine. 56 long long unsigned reduce(long long unsigned count); 57 std::vector<unsigned> reduce(const std::vector<unsigned>& v); 58 std::vector<long unsigned> reduce( 59 const std::vector<long unsigned>& v); 60 61 // Send a control message 62 void sendControlMessage(APControl m, int argument = 0); 63 64 // Send a control message to a specific node 65 uint64_t sendControlMessageToNode(int nodeID, 66 APControl command, int argument = 0); 67 68 // Receive a control message 69 ControlMessage receiveControlMessage(); 70 71 // Send a message that the checkpoint has been reached 72 uint64_t sendCheckPointMessage(int argument = 0); 73 74 // Send a buffered message 75 void sendBufferedMessage(int destID, char* msg, size_t size); 76 77 // Receive a buffered sequence of messages 78 void receiveBufferedMessage(MessagePtrVector& outmessages); 79 reduceInflight()80 uint64_t reduceInflight() 81 { 82 return reduce(m_txPackets - m_rxPackets); 83 } 84 85 private: 86 uint64_t m_msgID; 87 uint8_t* m_rxBuffer; 88 MPI_Request m_request; 89 90 protected: 91 // Counters 92 uint64_t m_rxPackets; 93 uint64_t m_rxMessages; 94 uint64_t m_rxBytes; 95 uint64_t m_txPackets; 96 uint64_t m_txMessages; 97 uint64_t m_txBytes; 98 }; 99 100 #endif 101