1 // This is a test application for experimenting
2 // with the NORM API implementation during its
3 // development.  A better-documented and complete
4 // example of the NORM API usage will be provided
5 // when the NORM API is more complete.
6 
7 // This tests the use of the NORM API in a multi-threaded app
8 
9 #include "normApi.h"
10 #include "protokit.h"  // for protolib debug, stuff, etc
11 
12 #include <stdio.h>
13 #include <stdlib.h>  // for srand()
14 
15 #ifdef UNIX
16 #include <unistd.h>  // for "sleep()"
17 #endif // UNIX
18 
19 
20 class NormThreadApp : public ProtoApp
21 {
22     public:
23         NormThreadApp();
24         virtual ~NormThreadApp();
25 
26         // Overrides from ProtoApp or NsProtoSimAgent base
27         bool OnStartup(int argc, const char*const* argv);
28         bool ProcessCommands(int argc, const char*const* argv);
29         void OnShutdown();
30 
31         bool OnCommand(const char* cmd, const char* val);
32 
33     private:
34         enum CmdType {CMD_INVALID, CMD_NOARG, CMD_ARG};
35         CmdType CommandType(const char* cmd);
36         static const char* const cmd_list[];
37 
38         static void DoNormEvent(ProtoDispatcher::Descriptor descriptor,
39                                 ProtoDispatcher::Event      theEvent,
40                                 const void*                 userData);
41 
42         void OnNormEvent();
43 
44         static void DoWorkerEvent(const void* receiverData);
45         void WorkerReadStream();
46 
47         enum {MSG_SIZE_MAX = 1024};
48 
49         bool OnTxTimeout(ProtoTimer& timer);
50 
51         // Our NORM API handles
52         NormInstanceHandle  norm_instance;
53         NormSessionHandle   norm_session;
54         NormObjectHandle    norm_tx_stream;
55         NormObjectHandle    norm_rx_stream;
56 
57         bool                sender;
58         bool                receiver;
59 
60         // This timer controls our message tx rate
61         ProtoTimer          tx_msg_timer;
62         char                tx_msg_buffer[MSG_SIZE_MAX];
63         unsigned int        tx_msg_length;
64         unsigned int        tx_msg_index;
65 
66         // We use a ProtoDispatcher for a "worker" thread
67         // to handle NORM events for a specific session
68         ProtoDispatcher worker_thread_dispatcher;
69 
70 
71 
72 };  // end class NormThreadApp
73 
74 // Our application instance
PROTO_INSTANTIATE_APP(NormThreadApp)75 PROTO_INSTANTIATE_APP(NormThreadApp)
76 
77 NormThreadApp::NormThreadApp()
78  : norm_instance(NORM_INSTANCE_INVALID),
79    norm_session(NORM_SESSION_INVALID),
80    norm_tx_stream(NORM_OBJECT_INVALID), norm_rx_stream(NORM_OBJECT_INVALID),
81    tx_msg_length(0), tx_msg_index(0)
82 {
83     tx_msg_timer.SetListener(this, &NormThreadApp::OnTxTimeout);
84     tx_msg_timer.SetInterval(0.00001);
85     tx_msg_timer.SetRepeat(-1);
86 
87     worker_thread_dispatcher.SetPromptCallback(DoWorkerEvent, this);
88 
89 }
90 
~NormThreadApp()91 NormThreadApp::~NormThreadApp()
92 {
93 
94 }
95 
OnStartup(int argc,const char * const * argv)96 bool NormThreadApp::OnStartup(int argc, const char*const* argv)
97 {
98     // 1) (TBD) Process any command-line options
99 
100     if (!ProcessCommands(argc, argv))
101     {
102         TRACE("error with commands\n");
103         return false;
104     }
105 
106     // 2) Create a Norm API instance and "generic" input handler for notifications
107     norm_instance = NormCreateInstance();
108     ASSERT(NORM_INSTANCE_INVALID != norm_instance);
109 
110     SetDebugLevel(3);
111 
112     // Set a callback that will call NormGetNextEvent()
113     if (!dispatcher.InstallGenericInput(NormGetDescriptor(norm_instance), DoNormEvent, this))
114     {
115         PLOG(PL_FATAL, "NormThreadApp::OnStartup() InstallGenericInput() error\n");
116         NormDestroyInstance(norm_instance);
117         return false;
118     }
119 
120 
121     // 3) Create a "custom" unique node identifier
122     // Here's a trick to generate a _hopefully_ unique NormNodeId
123     // based on an XOR of the system's IP address and the process id.
124     // (We use ProtoAddress::GetEndIdentifier() to get the local
125     //  "default" IP address for the system)
126     // (Note that passing "NORM_NODE_ANY" to the last arg of
127     //  NormCreateSession() does a similar thing but without
128     //  the processId XOR ... perhaps we should add the processId
129     //  hack to that default "NORM_NODE_ANY" local NormNodeId picker???
130     ProtoAddress localAddr;
131     if (!localAddr.ResolveLocalAddress())
132     {
133         fprintf(stderr, "normTest: error resolving local IP address\n");
134         OnShutdown();
135         return false;
136     }
137     NormNodeId localId = localAddr.EndIdentifier();
138 #ifdef WIN32
139     DWORD processId = GetCurrentProcessId();
140 #else
141     pid_t processId = getpid();
142 #endif // if/else WIN32/UNIX
143     localId ^= (NormNodeId)processId;
144 
145     // If needed, permutate to a valid, random NormNodeId
146     while ((NORM_NODE_ANY == localId) ||
147            (NORM_NODE_NONE == localId))
148     {
149         localId ^= (NormNodeId)rand();
150     }
151     //localId = 15;  // for testing purposes
152 
153     // 4) Create a NORM session
154     norm_session = NormCreateSession(norm_instance,
155                                     "224.1.1.1",
156                                      6001,
157                                      localId);
158     ASSERT(NORM_SESSION_INVALID != norm_session);
159 
160     if(!NormSetMulticastInterface(norm_session,"eth0"))
161     {
162         fprintf(stderr, "normTest: Unable to set multicast interface to \"eth0\"\n");
163         //return false;
164     }
165 
166     NormSetGrttEstimate(norm_session, 0.250);  // 1 msec initial grtt
167 
168     NormSetTxRate(norm_session, 80.0e+06);  // in bits/second
169 
170 
171     NormSetTxRateBounds(norm_session, 10.0e+06, 10.0e+06);
172 
173     NormSetCongestionControl(norm_session, true);
174 
175     NormSetTxLoss(norm_session, 2.0);
176 
177     //NormSetMessageTrace(norm_session, true);
178 
179     //NormSetLoopback(norm_session, true);
180 
181     // 5) If sender pick a random "session id", start sender, and create a stream
182     // We use a random "sessionId"
183     if (sender)
184     {
185         NormSessionId sessionId = (NormSessionId)rand();
186         //NormStartSender(norm_session, sessionId, 1024*1024, 1400, 64, 8);
187         NormStartSender(norm_session, sessionId, 2000000, 1400, 64, 8);
188         norm_tx_stream = NormStreamOpen(norm_session, 1800000);
189 
190         // Activate tx timer and force first message transmission
191         ActivateTimer(tx_msg_timer);
192         OnTxTimeout(tx_msg_timer);
193     }
194 
195 
196     // 6) If receiver, start receiver
197     if (receiver)
198     {
199         worker_thread_dispatcher.StartThread();
200         NormStartReceiver(norm_session, 2000000);
201     }
202 
203     return true;
204 }  // end NormThreadApp::OnStartup()
205 
OnShutdown()206 void NormThreadApp::OnShutdown()
207 {
208     if (tx_msg_timer.IsActive()) tx_msg_timer.Deactivate();
209 
210     if (NORM_INSTANCE_INVALID != norm_instance)
211     {
212         dispatcher.RemoveGenericInput(NormGetDescriptor(norm_instance));
213         NormDestroyInstance(norm_instance);
214         norm_instance = NORM_INSTANCE_INVALID;
215     }
216 
217 
218     worker_thread_dispatcher.Stop();
219 }  // end NormThreadApp::OnShutdown()
220 
OnTxTimeout(ProtoTimer &)221 bool NormThreadApp::OnTxTimeout(ProtoTimer& /*timer*/)
222 {
223     //TRACE("enter NormThreadApp::OnTxTimeout() ...\n");
224     while (1)
225     {
226         if (0 == tx_msg_length)
227         {
228             // Send a new message
229             unsigned int sendCount = 1;
230             sprintf(tx_msg_buffer, "normThreadTest says hello %u ", sendCount);
231             unsigned int msgLength = strlen(tx_msg_buffer);
232             memset(tx_msg_buffer + msgLength, 'a', 900 - msgLength);
233             tx_msg_length = 900;
234             tx_msg_index = 0;
235         }
236 
237         unsigned int bytesHave = tx_msg_length - tx_msg_index;
238         unsigned int bytesWritten =
239             NormStreamWrite(norm_tx_stream, tx_msg_buffer + tx_msg_index, tx_msg_length - tx_msg_index);
240         //TRACE("wrote %u bytes to stream ...\n", bytesWritten);
241         if (bytesWritten == bytesHave)
242         {
243 
244             tx_msg_length = 0;
245         }
246         else
247         {
248             if (0 == bytesWritten)
249             {
250                 TRACE("ZERO bytes written\n");
251             }
252             // We filled the stream buffer
253             tx_msg_index += bytesWritten;
254             if (tx_msg_timer.IsActive()) tx_msg_timer.Deactivate();
255             break;
256         }
257     }
258     return true;
259 }  // end NormThreadApp::OnTxTimeout()
260 
DoNormEvent(ProtoDispatcher::Descriptor descriptor,ProtoDispatcher::Event theEvent,const void * userData)261 void NormThreadApp::DoNormEvent(ProtoDispatcher::Descriptor descriptor,
262                                 ProtoDispatcher::Event      theEvent,
263                                 const void*                 userData)
264 {
265     NormThreadApp* theApp = reinterpret_cast<NormThreadApp*>((void*)userData);
266     theApp->OnNormEvent();
267 }  // end NormThreadApp::DoNormEvent()
268 
OnNormEvent()269 void NormThreadApp::OnNormEvent()
270 {
271     static unsigned long updateCount = 0;
272     NormEvent theEvent;
273     if (NormGetNextEvent(norm_instance, &theEvent))
274     {
275         switch (theEvent.type)
276         {
277             case NORM_TX_QUEUE_EMPTY:
278             case NORM_TX_QUEUE_VACANCY:
279 
280                 /*if (NORM_TX_QUEUE_VACANCY == theEvent.type)
281                     TRACE("NORM_TX_QUEUE_VACANCY ...\n");
282                 else
283                     TRACE("NORM_TX_QUEUE_EMPTY ...\n");*/
284                 if (!tx_msg_timer.IsActive())
285                 {
286                     //ActivateTimer(tx_msg_timer);
287                     OnTxTimeout(tx_msg_timer);
288                 }
289                 break;
290 
291             case NORM_GRTT_UPDATED:
292                 break;
293 
294             case NORM_CC_ACTIVE:
295                 TRACE("NORM_CC_ACTIVE ...\n");
296                 break;
297 
298             case NORM_CC_INACTIVE:
299                 TRACE("NORM_CC_INACTIVE ...\n");
300                 break;
301 
302             case NORM_REMOTE_SENDER_NEW:
303             case NORM_REMOTE_SENDER_ACTIVE:
304                 break;
305 
306             case NORM_REMOTE_SENDER_INACTIVE:
307                 NormNodeFreeBuffers(theEvent.sender);
308                 break;
309 
310             case NORM_RX_OBJECT_NEW:
311                 TRACE("NORM_RX_OBJECT_NEW ...\n");
312                 norm_rx_stream = theEvent.object;
313                 break;
314 
315             case NORM_RX_OBJECT_UPDATED:
316                 //TRACE("NORM_RX_OBJECT_UPDATED ...\n");
317                 updateCount++;
318                 if ((updateCount % 1000) == 0)
319                     TRACE("updateCount:%lu\n", updateCount);
320                 worker_thread_dispatcher.PromptThread();
321                 break;
322 
323             case NORM_RX_OBJECT_ABORTED:
324                 TRACE("NORM_RX_OBJECT_ABORTED ...\n");
325                 break;
326 
327             default:
328                 TRACE("UNHANDLED NORM EVENT : %d...\n", theEvent.type);
329                 break;
330         }
331     }
332     else
333     {
334         PLOG(PL_ERROR, "NormThreadApp::OnNormEvent() NormGetNextEvent() error?\n");
335     }
336 }  // end NormThreadApp::OnNormEvent()
337 
DoWorkerEvent(const void * receiverData)338 void NormThreadApp::DoWorkerEvent(const void* receiverData)
339 {
340     NormThreadApp* theApp = (NormThreadApp*)receiverData;
341     theApp->WorkerReadStream();
342 }  // end NormThreadApp::DoWorkerEvent()
343 
WorkerReadStream()344 void NormThreadApp::WorkerReadStream()
345 {
346     unsigned int loopCount = 0;
347     static unsigned long readCount = 0;
348     char rxBuffer[2048];
349     unsigned int bytesRead = 1400;
350     do
351     {
352         if (NormStreamRead(norm_rx_stream, rxBuffer, &bytesRead))
353         {
354             //TRACE("read %u bytes from stream ...\n", bytesRead);
355             if (0 != bytesRead)
356             {
357                 readCount++;
358                 if ((readCount % 1000) == 0)
359                     TRACE("readCount:%lu\n", readCount);
360                 bytesRead = 1400;
361             }
362         }
363         else
364         {
365             TRACE("NormThreadApp::WorkerReadStream() stream broken!\n");
366             bytesRead = 0;
367         }
368         loopCount++;
369     } while (0 != bytesRead);
370     //TRACE("loopCount: %lu\n", loopCount);
371 }  // end NormThreadApp::WorkerReadStream()
372 
373 const char* const NormThreadApp::cmd_list[] =
374 {
375     "+debug",        // debug level
376     "-send",
377     "-recv",
378     NULL
379 };  // end  NormThreadApp::cmd_list[]
380 
OnCommand(const char * cmd,const char * val)381 bool NormThreadApp::OnCommand(const char* cmd, const char* val)
382 {
383     size_t cmdlen = strlen(cmd);
384     if (!strncmp("debug", cmd, cmdlen))
385     {
386     }
387     else if (!strncmp("send", cmd, cmdlen))
388     {
389         sender = true;
390     }
391     else if (!strncmp("recv", cmd, cmdlen))
392     {
393         receiver = true;
394     }
395     else if (!strncmp("debug", cmd, cmdlen))
396     {
397         PLOG(PL_FATAL, "NormThreadApp::OnCommand(%s) unknown command\n", cmd);
398         return false;
399     }
400     return true;
401 }  // end NormThreadApp::ProcessCommands()
402 
CommandType(const char * cmd)403 NormThreadApp::CmdType NormThreadApp::CommandType(const char* cmd)
404 {
405     if (!cmd) return CMD_INVALID;
406     size_t len = strlen(cmd);
407     bool matched = false;
408     CmdType type = CMD_INVALID;
409     const char* const* nextCmd = cmd_list;
410     while (*nextCmd)
411     {
412         if (!strncmp(cmd, *nextCmd+1, len))
413         {
414             if (matched)
415             {
416                 // ambiguous command (command should match only once)
417                 return CMD_INVALID;
418             }
419             else
420             {
421                 matched = true;
422                 if ('+' == *nextCmd[0])
423                     type = CMD_ARG;
424                 else
425                     type = CMD_NOARG;
426             }
427         }
428         nextCmd++;
429     }
430     return type;
431 }  // end NormThreadApp::CommandType()
432 
433 
ProcessCommands(int argc,const char * const * argv)434 bool NormThreadApp::ProcessCommands(int argc, const char*const* argv)
435 {
436     int i = 1;
437     while ( i < argc)
438     {
439         CmdType cmdType = CommandType(argv[i]);
440         switch (cmdType)
441         {
442             case CMD_INVALID:
443                 PLOG(PL_FATAL, "NormThreadApp::ProcessCommands() Invalid command:%s\n",
444                         argv[i]);
445                 return false;
446             case CMD_NOARG:
447                 if (!OnCommand(argv[i], NULL))
448                 {
449                     PLOG(PL_FATAL, "NormThreadApp::ProcessCommands() OnCommand(%s) error\n",
450                             argv[i]);
451                     return false;
452                 }
453                 i++;
454                 break;
455             case CMD_ARG:
456                 if (!OnCommand(argv[i], argv[i+1]))
457                 {
458                     PLOG(PL_FATAL, "NormThreadApp::ProcessCommands() OnCommand(%s, %s) error\n",
459                             argv[i], argv[i+1]);
460                     return false;
461                 }
462                 i += 2;
463                 break;
464         }
465     }
466     return true;
467 }  // end NormThreadApp::ProcessCommands()
468 
469