1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License version 2 as
5 * published by the Free Software Foundation;
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15 *
16 * Author: George Riley <riley@ece.gatech.edu>
17 */
18
19 /**
20 * \file
21 * \ingroup mpi
22 * Implementation of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
23 */
24
25 // This object contains static methods that provide an easy interface
26 // to the necessary MPI information.
27
28 #include <iostream>
29 #include <iomanip>
30 #include <list>
31
32 #include "granted-time-window-mpi-interface.h"
33 #include "mpi-receiver.h"
34 #include "mpi-interface.h"
35
36 #include "ns3/node.h"
37 #include "ns3/node-list.h"
38 #include "ns3/net-device.h"
39 #include "ns3/simulator.h"
40 #include "ns3/simulator-impl.h"
41 #include "ns3/nstime.h"
42 #include "ns3/log.h"
43
44 #include <mpi.h>
45
46 namespace ns3 {
47
// Logging component for this file; the NS_LOG_FUNCTION* calls below log
// under this name.
NS_LOG_COMPONENT_DEFINE ("GrantedTimeWindowMpiInterface");

// Ensure the GrantedTimeWindowMpiInterface TypeId (see GetTypeId) is registered.
NS_OBJECT_ENSURE_REGISTERED (GrantedTimeWindowMpiInterface);
51
SentBuffer()52 SentBuffer::SentBuffer ()
53 {
54 m_buffer = 0;
55 m_request = 0;
56 }
57
~SentBuffer()58 SentBuffer::~SentBuffer ()
59 {
60 delete [] m_buffer;
61 }
62
63 uint8_t*
GetBuffer()64 SentBuffer::GetBuffer ()
65 {
66 return m_buffer;
67 }
68
69 void
SetBuffer(uint8_t * buffer)70 SentBuffer::SetBuffer (uint8_t* buffer)
71 {
72 m_buffer = buffer;
73 }
74
75 MPI_Request*
GetRequest()76 SentBuffer::GetRequest ()
77 {
78 return &m_request;
79 }
80
81 uint32_t GrantedTimeWindowMpiInterface::g_sid = 0;
82 uint32_t GrantedTimeWindowMpiInterface::g_size = 1;
83 bool GrantedTimeWindowMpiInterface::g_enabled = false;
84 bool GrantedTimeWindowMpiInterface::g_mpiInitCalled = false;
85 uint32_t GrantedTimeWindowMpiInterface::g_rxCount = 0;
86 uint32_t GrantedTimeWindowMpiInterface::g_txCount = 0;
87 std::list<SentBuffer> GrantedTimeWindowMpiInterface::g_pendingTx;
88
89 MPI_Request* GrantedTimeWindowMpiInterface::g_requests;
90 char** GrantedTimeWindowMpiInterface::g_pRxBuffers;
91 MPI_Comm GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD;
92 bool GrantedTimeWindowMpiInterface::g_freeCommunicator = false;;
93
94 TypeId
GetTypeId(void)95 GrantedTimeWindowMpiInterface::GetTypeId (void)
96 {
97 static TypeId tid = TypeId ("ns3::GrantedTimeWindowMpiInterface")
98 .SetParent<Object> ()
99 .SetGroupName ("Mpi")
100 ;
101 return tid;
102 }
103
104 void
Destroy()105 GrantedTimeWindowMpiInterface::Destroy ()
106 {
107 NS_LOG_FUNCTION (this);
108
109 for (uint32_t i = 0; i < GetSize (); ++i)
110 {
111 delete [] g_pRxBuffers[i];
112 }
113 delete [] g_pRxBuffers;
114 delete [] g_requests;
115
116 g_pendingTx.clear ();
117 }
118
119 uint32_t
GetRxCount()120 GrantedTimeWindowMpiInterface::GetRxCount ()
121 {
122 NS_ASSERT (g_enabled);
123 return g_rxCount;
124 }
125
126 uint32_t
GetTxCount()127 GrantedTimeWindowMpiInterface::GetTxCount ()
128 {
129 NS_ASSERT (g_enabled);
130 return g_txCount;
131 }
132
133 uint32_t
GetSystemId()134 GrantedTimeWindowMpiInterface::GetSystemId ()
135 {
136 NS_ASSERT (g_enabled);
137 return g_sid;
138 }
139
140 uint32_t
GetSize()141 GrantedTimeWindowMpiInterface::GetSize ()
142 {
143 NS_ASSERT (g_enabled);
144 return g_size;
145 }
146
147 bool
IsEnabled()148 GrantedTimeWindowMpiInterface::IsEnabled ()
149 {
150 return g_enabled;
151 }
152
153 MPI_Comm
GetCommunicator()154 GrantedTimeWindowMpiInterface::GetCommunicator()
155 {
156 NS_ASSERT (g_enabled);
157 return g_communicator;
158 }
159
160 void
Enable(int * pargc,char *** pargv)161 GrantedTimeWindowMpiInterface::Enable (int* pargc, char*** pargv)
162 {
163 NS_LOG_FUNCTION (this << pargc << pargv);
164
165 NS_ASSERT (g_enabled == false);
166
167 // Initialize the MPI interface
168 MPI_Init (pargc, pargv);
169 Enable (MPI_COMM_WORLD);
170 g_mpiInitCalled = true;
171 g_enabled = true;
172 }
173
174 void
Enable(MPI_Comm communicator)175 GrantedTimeWindowMpiInterface::Enable (MPI_Comm communicator)
176 {
177 NS_LOG_FUNCTION (this);
178
179 NS_ASSERT (g_enabled == false);
180
181 // Standard MPI practice is to duplicate the communicator for
182 // library to use. Library communicates in isolated communication
183 // context.
184 MPI_Comm_dup (communicator, &g_communicator);
185 g_freeCommunicator = true;
186
187 MPI_Barrier (g_communicator);
188
189 int mpiSystemId;
190 int mpiSize;
191 MPI_Comm_rank (g_communicator, &mpiSystemId);
192 MPI_Comm_size (g_communicator, &mpiSize);
193 g_sid = mpiSystemId;
194 g_size = mpiSize;
195
196 g_enabled = true;
197 // Post a non-blocking receive for all peers
198 g_pRxBuffers = new char*[g_size];
199 g_requests = new MPI_Request[g_size];
200 for (uint32_t i = 0; i < GetSize (); ++i)
201 {
202 g_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
203 MPI_Irecv (g_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
204 g_communicator, &g_requests[i]);
205 }
206 }
207
208 void
SendPacket(Ptr<Packet> p,const Time & rxTime,uint32_t node,uint32_t dev)209 GrantedTimeWindowMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
210 {
211 NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
212
213 SentBuffer sendBuf;
214 g_pendingTx.push_back (sendBuf);
215 std::list<SentBuffer>::reverse_iterator i = g_pendingTx.rbegin (); // Points to the last element
216
217 uint32_t serializedSize = p->GetSerializedSize ();
218 uint8_t* buffer = new uint8_t[serializedSize + 16];
219 i->SetBuffer (buffer);
220 // Add the time, dest node and dest device
221 uint64_t t = rxTime.GetInteger ();
222 uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
223 *pTime++ = t;
224 uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
225 *pData++ = node;
226 *pData++ = dev;
227 // Serialize the packet
228 p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
229
230 // Find the system id for the destination node
231 Ptr<Node> destNode = NodeList::GetNode (node);
232 uint32_t nodeSysId = destNode->GetSystemId ();
233
234 MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId,
235 0, g_communicator, (i->GetRequest ()));
236 g_txCount++;
237 }
238
239 void
ReceiveMessages()240 GrantedTimeWindowMpiInterface::ReceiveMessages ()
241 {
242 NS_LOG_FUNCTION_NOARGS ();
243
244 // Poll the non-block reads to see if data arrived
245 while (true)
246 {
247 int flag = 0;
248 int index = 0;
249 MPI_Status status;
250
251 MPI_Testany (MpiInterface::GetSize (), g_requests, &index, &flag, &status);
252 if (!flag)
253 {
254 break; // No more messages
255 }
256 int count;
257 MPI_Get_count (&status, MPI_CHAR, &count);
258 g_rxCount++; // Count this receive
259
260 // Get the meta data first
261 uint64_t* pTime = reinterpret_cast<uint64_t *> (g_pRxBuffers[index]);
262 uint64_t time = *pTime++;
263 uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
264 uint32_t node = *pData++;
265 uint32_t dev = *pData++;
266
267 Time rxTime (time);
268
269 count -= sizeof (time) + sizeof (node) + sizeof (dev);
270
271 Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true);
272
273 // Find the correct node/device to schedule receive event
274 Ptr<Node> pNode = NodeList::GetNode (node);
275 Ptr<MpiReceiver> pMpiRec = 0;
276 uint32_t nDevices = pNode->GetNDevices ();
277 for (uint32_t i = 0; i < nDevices; ++i)
278 {
279 Ptr<NetDevice> pThisDev = pNode->GetDevice (i);
280 if (pThisDev->GetIfIndex () == dev)
281 {
282 pMpiRec = pThisDev->GetObject<MpiReceiver> ();
283 break;
284 }
285 }
286
287 NS_ASSERT (pNode && pMpiRec);
288
289 // Schedule the rx event
290 Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
291 &MpiReceiver::Receive, pMpiRec, p);
292
293 // Re-queue the next read
294 MPI_Irecv (g_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
295 g_communicator, &g_requests[index]);
296 }
297 }
298
299 void
TestSendComplete()300 GrantedTimeWindowMpiInterface::TestSendComplete ()
301 {
302 NS_LOG_FUNCTION_NOARGS ();
303
304 std::list<SentBuffer>::iterator i = g_pendingTx.begin ();
305 while (i != g_pendingTx.end ())
306 {
307 MPI_Status status;
308 int flag = 0;
309 MPI_Test (i->GetRequest (), &flag, &status);
310 std::list<SentBuffer>::iterator current = i; // Save current for erasing
311 i++; // Advance to next
312 if (flag)
313 { // This message is complete
314 g_pendingTx.erase (current);
315 }
316 }
317 }
318
319 void
Disable()320 GrantedTimeWindowMpiInterface::Disable ()
321 {
322 NS_LOG_FUNCTION_NOARGS ();
323
324 if (g_freeCommunicator)
325 {
326 MPI_Comm_free (&g_communicator);
327 g_freeCommunicator = false;
328 }
329
330 // ns-3 should MPI finalize only if ns-3 was used to initialize
331 if (g_mpiInitCalled)
332 {
333 int flag = 0;
334 MPI_Initialized (&flag);
335 if (flag)
336 {
337 MPI_Finalize ();
338 }
339 else
340 {
341 NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
342 }
343 g_mpiInitCalled = false;
344 }
345
346 g_enabled = false;
347 }
348
349
350 } // namespace ns3
351