1 // This is a test application for experimenting
2 // with the NORM API implementation during its
3 // development. A better-documented and complete
4 // example of the NORM API usage will be provided
5 // when the NORM API is more complete.
6
7 // This tests the use of the NORM API in a multi-threaded app
8
9 #include "normApi.h"
10 #include "protokit.h" // for protolib debug, stuff, etc
11
12 #include <stdio.h>
13 #include <stdlib.h> // for srand()
14
15 #ifdef UNIX
16 #include <unistd.h> // for "sleep()"
17 #endif // UNIX
18
19
20 class NormThreadApp : public ProtoApp
21 {
22 public:
23 NormThreadApp();
24 virtual ~NormThreadApp();
25
26 // Overrides from ProtoApp or NsProtoSimAgent base
27 bool OnStartup(int argc, const char*const* argv);
28 bool ProcessCommands(int argc, const char*const* argv);
29 void OnShutdown();
30
31 bool OnCommand(const char* cmd, const char* val);
32
33 private:
34 enum CmdType {CMD_INVALID, CMD_NOARG, CMD_ARG};
35 CmdType CommandType(const char* cmd);
36 static const char* const cmd_list[];
37
38 static void DoNormEvent(ProtoDispatcher::Descriptor descriptor,
39 ProtoDispatcher::Event theEvent,
40 const void* userData);
41
42 void OnNormEvent();
43
44 static void DoWorkerEvent(const void* receiverData);
45 void WorkerReadStream();
46
47 enum {MSG_SIZE_MAX = 1024};
48
49 bool OnTxTimeout(ProtoTimer& timer);
50
51 // Our NORM API handles
52 NormInstanceHandle norm_instance;
53 NormSessionHandle norm_session;
54 NormObjectHandle norm_tx_stream;
55 NormObjectHandle norm_rx_stream;
56
57 bool sender;
58 bool receiver;
59
60 // This timer controls our message tx rate
61 ProtoTimer tx_msg_timer;
62 char tx_msg_buffer[MSG_SIZE_MAX];
63 unsigned int tx_msg_length;
64 unsigned int tx_msg_index;
65
66 // We use a ProtoDispatcher for a "worker" thread
67 // to handle NORM events for a specific session
68 ProtoDispatcher worker_thread_dispatcher;
69
70
71
72 }; // end class NormThreadApp
73
74 // Our application instance
PROTO_INSTANTIATE_APP(NormThreadApp)75 PROTO_INSTANTIATE_APP(NormThreadApp)
76
77 NormThreadApp::NormThreadApp()
78 : norm_instance(NORM_INSTANCE_INVALID),
79 norm_session(NORM_SESSION_INVALID),
80 norm_tx_stream(NORM_OBJECT_INVALID), norm_rx_stream(NORM_OBJECT_INVALID),
81 tx_msg_length(0), tx_msg_index(0)
82 {
83 tx_msg_timer.SetListener(this, &NormThreadApp::OnTxTimeout);
84 tx_msg_timer.SetInterval(0.00001);
85 tx_msg_timer.SetRepeat(-1);
86
87 worker_thread_dispatcher.SetPromptCallback(DoWorkerEvent, this);
88
89 }
90
~NormThreadApp()91 NormThreadApp::~NormThreadApp()
92 {
93
94 }
95
OnStartup(int argc,const char * const * argv)96 bool NormThreadApp::OnStartup(int argc, const char*const* argv)
97 {
98 // 1) (TBD) Process any command-line options
99
100 if (!ProcessCommands(argc, argv))
101 {
102 TRACE("error with commands\n");
103 return false;
104 }
105
106 // 2) Create a Norm API instance and "generic" input handler for notifications
107 norm_instance = NormCreateInstance();
108 ASSERT(NORM_INSTANCE_INVALID != norm_instance);
109
110 SetDebugLevel(3);
111
112 // Set a callback that will call NormGetNextEvent()
113 if (!dispatcher.InstallGenericInput(NormGetDescriptor(norm_instance), DoNormEvent, this))
114 {
115 PLOG(PL_FATAL, "NormThreadApp::OnStartup() InstallGenericInput() error\n");
116 NormDestroyInstance(norm_instance);
117 return false;
118 }
119
120
121 // 3) Create a "custom" unique node identifier
122 // Here's a trick to generate a _hopefully_ unique NormNodeId
123 // based on an XOR of the system's IP address and the process id.
124 // (We use ProtoAddress::GetEndIdentifier() to get the local
125 // "default" IP address for the system)
126 // (Note that passing "NORM_NODE_ANY" to the last arg of
127 // NormCreateSession() does a similar thing but without
128 // the processId XOR ... perhaps we should add the processId
129 // hack to that default "NORM_NODE_ANY" local NormNodeId picker???
130 ProtoAddress localAddr;
131 if (!localAddr.ResolveLocalAddress())
132 {
133 fprintf(stderr, "normTest: error resolving local IP address\n");
134 OnShutdown();
135 return false;
136 }
137 NormNodeId localId = localAddr.EndIdentifier();
138 #ifdef WIN32
139 DWORD processId = GetCurrentProcessId();
140 #else
141 pid_t processId = getpid();
142 #endif // if/else WIN32/UNIX
143 localId ^= (NormNodeId)processId;
144
145 // If needed, permutate to a valid, random NormNodeId
146 while ((NORM_NODE_ANY == localId) ||
147 (NORM_NODE_NONE == localId))
148 {
149 localId ^= (NormNodeId)rand();
150 }
151 //localId = 15; // for testing purposes
152
153 // 4) Create a NORM session
154 norm_session = NormCreateSession(norm_instance,
155 "224.1.1.1",
156 6001,
157 localId);
158 ASSERT(NORM_SESSION_INVALID != norm_session);
159
160 if(!NormSetMulticastInterface(norm_session,"eth0"))
161 {
162 fprintf(stderr, "normTest: Unable to set multicast interface to \"eth0\"\n");
163 //return false;
164 }
165
166 NormSetGrttEstimate(norm_session, 0.250); // 1 msec initial grtt
167
168 NormSetTxRate(norm_session, 80.0e+06); // in bits/second
169
170
171 NormSetTxRateBounds(norm_session, 10.0e+06, 10.0e+06);
172
173 NormSetCongestionControl(norm_session, true);
174
175 NormSetTxLoss(norm_session, 2.0);
176
177 //NormSetMessageTrace(norm_session, true);
178
179 //NormSetLoopback(norm_session, true);
180
181 // 5) If sender pick a random "session id", start sender, and create a stream
182 // We use a random "sessionId"
183 if (sender)
184 {
185 NormSessionId sessionId = (NormSessionId)rand();
186 //NormStartSender(norm_session, sessionId, 1024*1024, 1400, 64, 8);
187 NormStartSender(norm_session, sessionId, 2000000, 1400, 64, 8);
188 norm_tx_stream = NormStreamOpen(norm_session, 1800000);
189
190 // Activate tx timer and force first message transmission
191 ActivateTimer(tx_msg_timer);
192 OnTxTimeout(tx_msg_timer);
193 }
194
195
196 // 6) If receiver, start receiver
197 if (receiver)
198 {
199 worker_thread_dispatcher.StartThread();
200 NormStartReceiver(norm_session, 2000000);
201 }
202
203 return true;
204 } // end NormThreadApp::OnStartup()
205
OnShutdown()206 void NormThreadApp::OnShutdown()
207 {
208 if (tx_msg_timer.IsActive()) tx_msg_timer.Deactivate();
209
210 if (NORM_INSTANCE_INVALID != norm_instance)
211 {
212 dispatcher.RemoveGenericInput(NormGetDescriptor(norm_instance));
213 NormDestroyInstance(norm_instance);
214 norm_instance = NORM_INSTANCE_INVALID;
215 }
216
217
218 worker_thread_dispatcher.Stop();
219 } // end NormThreadApp::OnShutdown()
220
OnTxTimeout(ProtoTimer &)221 bool NormThreadApp::OnTxTimeout(ProtoTimer& /*timer*/)
222 {
223 //TRACE("enter NormThreadApp::OnTxTimeout() ...\n");
224 while (1)
225 {
226 if (0 == tx_msg_length)
227 {
228 // Send a new message
229 unsigned int sendCount = 1;
230 sprintf(tx_msg_buffer, "normThreadTest says hello %u ", sendCount);
231 unsigned int msgLength = strlen(tx_msg_buffer);
232 memset(tx_msg_buffer + msgLength, 'a', 900 - msgLength);
233 tx_msg_length = 900;
234 tx_msg_index = 0;
235 }
236
237 unsigned int bytesHave = tx_msg_length - tx_msg_index;
238 unsigned int bytesWritten =
239 NormStreamWrite(norm_tx_stream, tx_msg_buffer + tx_msg_index, tx_msg_length - tx_msg_index);
240 //TRACE("wrote %u bytes to stream ...\n", bytesWritten);
241 if (bytesWritten == bytesHave)
242 {
243
244 tx_msg_length = 0;
245 }
246 else
247 {
248 if (0 == bytesWritten)
249 {
250 TRACE("ZERO bytes written\n");
251 }
252 // We filled the stream buffer
253 tx_msg_index += bytesWritten;
254 if (tx_msg_timer.IsActive()) tx_msg_timer.Deactivate();
255 break;
256 }
257 }
258 return true;
259 } // end NormThreadApp::OnTxTimeout()
260
DoNormEvent(ProtoDispatcher::Descriptor descriptor,ProtoDispatcher::Event theEvent,const void * userData)261 void NormThreadApp::DoNormEvent(ProtoDispatcher::Descriptor descriptor,
262 ProtoDispatcher::Event theEvent,
263 const void* userData)
264 {
265 NormThreadApp* theApp = reinterpret_cast<NormThreadApp*>((void*)userData);
266 theApp->OnNormEvent();
267 } // end NormThreadApp::DoNormEvent()
268
OnNormEvent()269 void NormThreadApp::OnNormEvent()
270 {
271 static unsigned long updateCount = 0;
272 NormEvent theEvent;
273 if (NormGetNextEvent(norm_instance, &theEvent))
274 {
275 switch (theEvent.type)
276 {
277 case NORM_TX_QUEUE_EMPTY:
278 case NORM_TX_QUEUE_VACANCY:
279
280 /*if (NORM_TX_QUEUE_VACANCY == theEvent.type)
281 TRACE("NORM_TX_QUEUE_VACANCY ...\n");
282 else
283 TRACE("NORM_TX_QUEUE_EMPTY ...\n");*/
284 if (!tx_msg_timer.IsActive())
285 {
286 //ActivateTimer(tx_msg_timer);
287 OnTxTimeout(tx_msg_timer);
288 }
289 break;
290
291 case NORM_GRTT_UPDATED:
292 break;
293
294 case NORM_CC_ACTIVE:
295 TRACE("NORM_CC_ACTIVE ...\n");
296 break;
297
298 case NORM_CC_INACTIVE:
299 TRACE("NORM_CC_INACTIVE ...\n");
300 break;
301
302 case NORM_REMOTE_SENDER_NEW:
303 case NORM_REMOTE_SENDER_ACTIVE:
304 break;
305
306 case NORM_REMOTE_SENDER_INACTIVE:
307 NormNodeFreeBuffers(theEvent.sender);
308 break;
309
310 case NORM_RX_OBJECT_NEW:
311 TRACE("NORM_RX_OBJECT_NEW ...\n");
312 norm_rx_stream = theEvent.object;
313 break;
314
315 case NORM_RX_OBJECT_UPDATED:
316 //TRACE("NORM_RX_OBJECT_UPDATED ...\n");
317 updateCount++;
318 if ((updateCount % 1000) == 0)
319 TRACE("updateCount:%lu\n", updateCount);
320 worker_thread_dispatcher.PromptThread();
321 break;
322
323 case NORM_RX_OBJECT_ABORTED:
324 TRACE("NORM_RX_OBJECT_ABORTED ...\n");
325 break;
326
327 default:
328 TRACE("UNHANDLED NORM EVENT : %d...\n", theEvent.type);
329 break;
330 }
331 }
332 else
333 {
334 PLOG(PL_ERROR, "NormThreadApp::OnNormEvent() NormGetNextEvent() error?\n");
335 }
336 } // end NormThreadApp::OnNormEvent()
337
DoWorkerEvent(const void * receiverData)338 void NormThreadApp::DoWorkerEvent(const void* receiverData)
339 {
340 NormThreadApp* theApp = (NormThreadApp*)receiverData;
341 theApp->WorkerReadStream();
342 } // end NormThreadApp::DoWorkerEvent()
343
WorkerReadStream()344 void NormThreadApp::WorkerReadStream()
345 {
346 unsigned int loopCount = 0;
347 static unsigned long readCount = 0;
348 char rxBuffer[2048];
349 unsigned int bytesRead = 1400;
350 do
351 {
352 if (NormStreamRead(norm_rx_stream, rxBuffer, &bytesRead))
353 {
354 //TRACE("read %u bytes from stream ...\n", bytesRead);
355 if (0 != bytesRead)
356 {
357 readCount++;
358 if ((readCount % 1000) == 0)
359 TRACE("readCount:%lu\n", readCount);
360 bytesRead = 1400;
361 }
362 }
363 else
364 {
365 TRACE("NormThreadApp::WorkerReadStream() stream broken!\n");
366 bytesRead = 0;
367 }
368 loopCount++;
369 } while (0 != bytesRead);
370 //TRACE("loopCount: %lu\n", loopCount);
371 } // end NormThreadApp::WorkerReadStream()
372
373 const char* const NormThreadApp::cmd_list[] =
374 {
375 "+debug", // debug level
376 "-send",
377 "-recv",
378 NULL
379 }; // end NormThreadApp::cmd_list[]
380
OnCommand(const char * cmd,const char * val)381 bool NormThreadApp::OnCommand(const char* cmd, const char* val)
382 {
383 size_t cmdlen = strlen(cmd);
384 if (!strncmp("debug", cmd, cmdlen))
385 {
386 }
387 else if (!strncmp("send", cmd, cmdlen))
388 {
389 sender = true;
390 }
391 else if (!strncmp("recv", cmd, cmdlen))
392 {
393 receiver = true;
394 }
395 else if (!strncmp("debug", cmd, cmdlen))
396 {
397 PLOG(PL_FATAL, "NormThreadApp::OnCommand(%s) unknown command\n", cmd);
398 return false;
399 }
400 return true;
401 } // end NormThreadApp::ProcessCommands()
402
CommandType(const char * cmd)403 NormThreadApp::CmdType NormThreadApp::CommandType(const char* cmd)
404 {
405 if (!cmd) return CMD_INVALID;
406 size_t len = strlen(cmd);
407 bool matched = false;
408 CmdType type = CMD_INVALID;
409 const char* const* nextCmd = cmd_list;
410 while (*nextCmd)
411 {
412 if (!strncmp(cmd, *nextCmd+1, len))
413 {
414 if (matched)
415 {
416 // ambiguous command (command should match only once)
417 return CMD_INVALID;
418 }
419 else
420 {
421 matched = true;
422 if ('+' == *nextCmd[0])
423 type = CMD_ARG;
424 else
425 type = CMD_NOARG;
426 }
427 }
428 nextCmd++;
429 }
430 return type;
431 } // end NormThreadApp::CommandType()
432
433
ProcessCommands(int argc,const char * const * argv)434 bool NormThreadApp::ProcessCommands(int argc, const char*const* argv)
435 {
436 int i = 1;
437 while ( i < argc)
438 {
439 CmdType cmdType = CommandType(argv[i]);
440 switch (cmdType)
441 {
442 case CMD_INVALID:
443 PLOG(PL_FATAL, "NormThreadApp::ProcessCommands() Invalid command:%s\n",
444 argv[i]);
445 return false;
446 case CMD_NOARG:
447 if (!OnCommand(argv[i], NULL))
448 {
449 PLOG(PL_FATAL, "NormThreadApp::ProcessCommands() OnCommand(%s) error\n",
450 argv[i]);
451 return false;
452 }
453 i++;
454 break;
455 case CMD_ARG:
456 if (!OnCommand(argv[i], argv[i+1]))
457 {
458 PLOG(PL_FATAL, "NormThreadApp::ProcessCommands() OnCommand(%s, %s) error\n",
459 argv[i], argv[i+1]);
460 return false;
461 }
462 i += 2;
463 break;
464 }
465 }
466 return true;
467 } // end NormThreadApp::ProcessCommands()
468
469