1 #ifndef COMMLAYER_H
2 #define COMMLAYER_H 1
3 
4 #include "Messages.h"
5 #include <mpi.h>
6 #include <vector>
7 
8 enum APMessage
9 {
10 	APM_NONE,
11 	APM_CONTROL,
12 	APM_BUFFERED
13 };
14 
15 enum APControl
16 {
17 	APC_SET_STATE,
18 	APC_ERODE_COMPLETE,
19 	APC_TRIM,
20 	APC_POPBUBBLE,
21 	APC_ASSEMBLE,
22 	APC_CHECKPOINT,
23 	APC_WAIT,
24 	APC_BARRIER,
25 };
26 
27 typedef std::vector<Message*> MessagePtrVector;
28 
29 struct ControlMessage
30 {
31 	int64_t id;
32 	APControl msgType;
33 	int argument;
34 };
35 
36 /** Interprocess communication and synchronization primitives. */
37 class CommLayer
38 {
39 	public:
40 		CommLayer();
41 		~CommLayer();
42 
43 		// Check if a message exists, if it does return the type
44 		APMessage checkMessage(int &sendID);
45 
46 		// Return whether a message has been received.
47 		bool receiveEmpty();
48 
49 		// Block until all processes have reached this routine.
50 		void barrier();
51 
52 		void broadcast(int message);
53 		int receiveBroadcast();
54 
55 		// Block until all processes have reached this routine.
56 		long long unsigned reduce(long long unsigned count);
57 		std::vector<unsigned> reduce(const std::vector<unsigned>& v);
58 		std::vector<long unsigned> reduce(
59 				const std::vector<long unsigned>& v);
60 
61 		// Send a control message
62 		void sendControlMessage(APControl m, int argument = 0);
63 
64 		// Send a control message to a specific node
65 		uint64_t sendControlMessageToNode(int nodeID,
66 				APControl command, int argument = 0);
67 
68 		// Receive a control message
69 		ControlMessage receiveControlMessage();
70 
71 		// Send a message that the checkpoint has been reached
72 		uint64_t sendCheckPointMessage(int argument = 0);
73 
74 		// Send a buffered message
75 		void sendBufferedMessage(int destID, char* msg, size_t size);
76 
77 		// Receive a buffered sequence of messages
78 		void receiveBufferedMessage(MessagePtrVector& outmessages);
79 
reduceInflight()80 		uint64_t reduceInflight()
81 		{
82 			return reduce(m_txPackets - m_rxPackets);
83 		}
84 
85 	private:
86 		uint64_t m_msgID;
87 		uint8_t* m_rxBuffer;
88 		MPI_Request m_request;
89 
90 	protected:
91 		// Counters
92 		uint64_t m_rxPackets;
93 		uint64_t m_rxMessages;
94 		uint64_t m_rxBytes;
95 		uint64_t m_txPackets;
96 		uint64_t m_txMessages;
97 		uint64_t m_txBytes;
98 };
99 
100 #endif
101