1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  */
18 
19 /**
20  * \file
21  * \ingroup mpi
22  * Implementation of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
23  */
24 
25 // This object contains static methods that provide an easy interface
26 // to the necessary MPI information.
27 
28 #include <iostream>
29 #include <iomanip>
30 #include <list>
31 
32 #include "granted-time-window-mpi-interface.h"
33 #include "mpi-receiver.h"
34 #include "mpi-interface.h"
35 
36 #include "ns3/node.h"
37 #include "ns3/node-list.h"
38 #include "ns3/net-device.h"
39 #include "ns3/simulator.h"
40 #include "ns3/simulator-impl.h"
41 #include "ns3/nstime.h"
42 #include "ns3/log.h"
43 
44 #include <mpi.h>
45 
46 namespace ns3 {
47 
48 NS_LOG_COMPONENT_DEFINE ("GrantedTimeWindowMpiInterface");
49 
50 NS_OBJECT_ENSURE_REGISTERED (GrantedTimeWindowMpiInterface);
51 
SentBuffer()52 SentBuffer::SentBuffer ()
53 {
54   m_buffer = 0;
55   m_request = 0;
56 }
57 
~SentBuffer()58 SentBuffer::~SentBuffer ()
59 {
60   delete [] m_buffer;
61 }
62 
63 uint8_t*
GetBuffer()64 SentBuffer::GetBuffer ()
65 {
66   return m_buffer;
67 }
68 
69 void
SetBuffer(uint8_t * buffer)70 SentBuffer::SetBuffer (uint8_t* buffer)
71 {
72   m_buffer = buffer;
73 }
74 
75 MPI_Request*
GetRequest()76 SentBuffer::GetRequest ()
77 {
78   return &m_request;
79 }
80 
81 uint32_t              GrantedTimeWindowMpiInterface::g_sid = 0;
82 uint32_t              GrantedTimeWindowMpiInterface::g_size = 1;
83 bool                  GrantedTimeWindowMpiInterface::g_enabled = false;
84 bool                  GrantedTimeWindowMpiInterface::g_mpiInitCalled = false;
85 uint32_t              GrantedTimeWindowMpiInterface::g_rxCount = 0;
86 uint32_t              GrantedTimeWindowMpiInterface::g_txCount = 0;
87 std::list<SentBuffer> GrantedTimeWindowMpiInterface::g_pendingTx;
88 
89 MPI_Request* GrantedTimeWindowMpiInterface::g_requests;
90 char**       GrantedTimeWindowMpiInterface::g_pRxBuffers;
91 MPI_Comm     GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD;
92 bool         GrantedTimeWindowMpiInterface::g_freeCommunicator = false;;
93 
94 TypeId
GetTypeId(void)95 GrantedTimeWindowMpiInterface::GetTypeId (void)
96 {
97   static TypeId tid = TypeId ("ns3::GrantedTimeWindowMpiInterface")
98     .SetParent<Object> ()
99     .SetGroupName ("Mpi")
100   ;
101   return tid;
102 }
103 
104 void
Destroy()105 GrantedTimeWindowMpiInterface::Destroy ()
106 {
107   NS_LOG_FUNCTION (this);
108 
109   for (uint32_t i = 0; i < GetSize (); ++i)
110     {
111       delete [] g_pRxBuffers[i];
112     }
113   delete [] g_pRxBuffers;
114   delete [] g_requests;
115 
116   g_pendingTx.clear ();
117 }
118 
119 uint32_t
GetRxCount()120 GrantedTimeWindowMpiInterface::GetRxCount ()
121 {
122   NS_ASSERT (g_enabled);
123   return g_rxCount;
124 }
125 
126 uint32_t
GetTxCount()127 GrantedTimeWindowMpiInterface::GetTxCount ()
128 {
129   NS_ASSERT (g_enabled);
130   return g_txCount;
131 }
132 
133 uint32_t
GetSystemId()134 GrantedTimeWindowMpiInterface::GetSystemId ()
135 {
136   NS_ASSERT (g_enabled);
137   return g_sid;
138 }
139 
140 uint32_t
GetSize()141 GrantedTimeWindowMpiInterface::GetSize ()
142 {
143   NS_ASSERT (g_enabled);
144   return g_size;
145 }
146 
147 bool
IsEnabled()148 GrantedTimeWindowMpiInterface::IsEnabled ()
149 {
150   return g_enabled;
151 }
152 
153 MPI_Comm
GetCommunicator()154 GrantedTimeWindowMpiInterface::GetCommunicator()
155 {
156   NS_ASSERT (g_enabled);
157   return g_communicator;
158 }
159 
160 void
Enable(int * pargc,char *** pargv)161 GrantedTimeWindowMpiInterface::Enable (int* pargc, char*** pargv)
162 {
163   NS_LOG_FUNCTION (this << pargc << pargv);
164 
165   NS_ASSERT (g_enabled == false);
166 
167   // Initialize the MPI interface
168   MPI_Init (pargc, pargv);
169   Enable (MPI_COMM_WORLD);
170   g_mpiInitCalled = true;
171   g_enabled = true;
172 }
173 
174 void
Enable(MPI_Comm communicator)175 GrantedTimeWindowMpiInterface::Enable (MPI_Comm communicator)
176 {
177   NS_LOG_FUNCTION (this);
178 
179   NS_ASSERT (g_enabled == false);
180 
181   // Standard MPI practice is to duplicate the communicator for
182   // library to use.  Library communicates in isolated communication
183   // context.
184   MPI_Comm_dup (communicator, &g_communicator);
185   g_freeCommunicator = true;
186 
187   MPI_Barrier (g_communicator);
188 
189   int mpiSystemId;
190   int mpiSize;
191   MPI_Comm_rank (g_communicator, &mpiSystemId);
192   MPI_Comm_size (g_communicator, &mpiSize);
193   g_sid = mpiSystemId;
194   g_size = mpiSize;
195 
196   g_enabled = true;
197   // Post a non-blocking receive for all peers
198   g_pRxBuffers = new char*[g_size];
199   g_requests = new MPI_Request[g_size];
200   for (uint32_t i = 0; i < GetSize (); ++i)
201     {
202       g_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
203       MPI_Irecv (g_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
204                  g_communicator, &g_requests[i]);
205     }
206 }
207 
208 void
SendPacket(Ptr<Packet> p,const Time & rxTime,uint32_t node,uint32_t dev)209 GrantedTimeWindowMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
210 {
211   NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
212 
213   SentBuffer sendBuf;
214   g_pendingTx.push_back (sendBuf);
215   std::list<SentBuffer>::reverse_iterator i = g_pendingTx.rbegin (); // Points to the last element
216 
217   uint32_t serializedSize = p->GetSerializedSize ();
218   uint8_t* buffer =  new uint8_t[serializedSize + 16];
219   i->SetBuffer (buffer);
220   // Add the time, dest node and dest device
221   uint64_t t = rxTime.GetInteger ();
222   uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
223   *pTime++ = t;
224   uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
225   *pData++ = node;
226   *pData++ = dev;
227   // Serialize the packet
228   p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
229 
230   // Find the system id for the destination node
231   Ptr<Node> destNode = NodeList::GetNode (node);
232   uint32_t nodeSysId = destNode->GetSystemId ();
233 
234   MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId,
235              0, g_communicator, (i->GetRequest ()));
236   g_txCount++;
237 }
238 
239 void
ReceiveMessages()240 GrantedTimeWindowMpiInterface::ReceiveMessages ()
241 {
242   NS_LOG_FUNCTION_NOARGS ();
243 
244   // Poll the non-block reads to see if data arrived
245   while (true)
246     {
247       int flag = 0;
248       int index = 0;
249       MPI_Status status;
250 
251       MPI_Testany (MpiInterface::GetSize (), g_requests, &index, &flag, &status);
252       if (!flag)
253         {
254           break;        // No more messages
255         }
256       int count;
257       MPI_Get_count (&status, MPI_CHAR, &count);
258       g_rxCount++; // Count this receive
259 
260       // Get the meta data first
261       uint64_t* pTime = reinterpret_cast<uint64_t *> (g_pRxBuffers[index]);
262       uint64_t time = *pTime++;
263       uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
264       uint32_t node = *pData++;
265       uint32_t dev  = *pData++;
266 
267       Time rxTime (time);
268 
269       count -= sizeof (time) + sizeof (node) + sizeof (dev);
270 
271       Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true);
272 
273       // Find the correct node/device to schedule receive event
274       Ptr<Node> pNode = NodeList::GetNode (node);
275       Ptr<MpiReceiver> pMpiRec = 0;
276       uint32_t nDevices = pNode->GetNDevices ();
277       for (uint32_t i = 0; i < nDevices; ++i)
278         {
279           Ptr<NetDevice> pThisDev = pNode->GetDevice (i);
280           if (pThisDev->GetIfIndex () == dev)
281             {
282               pMpiRec = pThisDev->GetObject<MpiReceiver> ();
283               break;
284             }
285         }
286 
287       NS_ASSERT (pNode && pMpiRec);
288 
289       // Schedule the rx event
290       Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
291                                       &MpiReceiver::Receive, pMpiRec, p);
292 
293       // Re-queue the next read
294       MPI_Irecv (g_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
295                  g_communicator, &g_requests[index]);
296     }
297 }
298 
299 void
TestSendComplete()300 GrantedTimeWindowMpiInterface::TestSendComplete ()
301 {
302   NS_LOG_FUNCTION_NOARGS ();
303 
304   std::list<SentBuffer>::iterator i = g_pendingTx.begin ();
305   while (i != g_pendingTx.end ())
306     {
307       MPI_Status status;
308       int flag = 0;
309       MPI_Test (i->GetRequest (), &flag, &status);
310       std::list<SentBuffer>::iterator current = i; // Save current for erasing
311       i++;                                    // Advance to next
312       if (flag)
313         { // This message is complete
314           g_pendingTx.erase (current);
315         }
316     }
317 }
318 
319 void
Disable()320 GrantedTimeWindowMpiInterface::Disable ()
321 {
322   NS_LOG_FUNCTION_NOARGS ();
323 
324   if (g_freeCommunicator)
325     {
326       MPI_Comm_free (&g_communicator);
327       g_freeCommunicator = false;
328     }
329 
330   // ns-3 should MPI finalize only if ns-3 was used to initialize
331   if (g_mpiInitCalled)
332     {
333       int flag = 0;
334       MPI_Initialized (&flag);
335       if (flag)
336         {
337           MPI_Finalize ();
338         }
339       else
340         {
341           NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
342         }
343       g_mpiInitCalled = false;
344     }
345 
346   g_enabled = false;
347 }
348 
349 
350 } // namespace ns3
351