1 #include "PhysicsClientUDP.h"
2 #include <enet/enet.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include "../Utils/b3Clock.h"
6 #include "PhysicsClient.h"
7 //#include "LinearMath/btVector3.h"
8 #include "SharedMemoryCommands.h"
9 #include <string>
10 #include "Bullet3Common/b3Logging.h"
11 #include "../MultiThreading/b3ThreadSupportInterface.h"
// Worker-thread entry point; defined later in this file.
void UDPThreadFunc(void* userPtr, void* lsMemory);
// Allocates the per-thread UDPThreadLocalStorage object.
void* UDPlsMemoryFunc();
// Releases an object previously returned by UDPlsMemoryFunc.
void UDPlsMemoryReleaseFunc(void* ptr);

// When true, the client prints a trace line for the packets and commands it handles.
bool gVerboseNetworkMessagesClient = false;
17
18 #ifndef _WIN32
19 #include "../MultiThreading/b3PosixThreadSupport.h"
20
createUDPThreadSupport(int numThreads)21 b3ThreadSupportInterface* createUDPThreadSupport(int numThreads)
22 {
23 b3PosixThreadSupport::ThreadConstructionInfo constructionInfo("UDPThread",
24 UDPThreadFunc,
25 UDPlsMemoryFunc,
26 UDPlsMemoryReleaseFunc,
27 numThreads);
28 b3ThreadSupportInterface* threadSupport = new b3PosixThreadSupport(constructionInfo);
29
30 return threadSupport;
31 }
32
33 #elif defined(_WIN32)
34 #include "../MultiThreading/b3Win32ThreadSupport.h"
35
createUDPThreadSupport(int numThreads)36 b3ThreadSupportInterface* createUDPThreadSupport(int numThreads)
37 {
38 b3Win32ThreadSupport::Win32ThreadConstructionInfo threadConstructionInfo("UDPThread", UDPThreadFunc, UDPlsMemoryFunc, UDPlsMemoryReleaseFunc, numThreads);
39 b3Win32ThreadSupport* threadSupport = new b3Win32ThreadSupport(threadConstructionInfo);
40 return threadSupport;
41 }
42 #endif
43
// Per-thread storage object allocated by UDPlsMemoryFunc and freed by
// UDPlsMemoryReleaseFunc. threadId is not initialized on construction.
struct UDPThreadLocalStorage
{
    int threadId;
};
48
// Decodes a 32-bit little-endian unsigned integer from the four bytes at
// `input` (input[0] is the least significant byte).
// `input` must point to at least four readable bytes.
unsigned int b3DeserializeInt(const unsigned char* input)
{
    // Widen every byte to unsigned before shifting: an unsigned char promotes
    // to (signed) int, and shifting a value >= 0x80 left by 24 overflows it.
    return (static_cast<unsigned int>(input[3]) << 24) |
           (static_cast<unsigned int>(input[2]) << 16) |
           (static_cast<unsigned int>(input[1]) << 8) |
           static_cast<unsigned int>(input[0]);
}
54
55 struct UdpNetworkedInternalData
56 {
57 ENetHost* m_client;
58 ENetAddress m_address;
59 ENetPeer* m_peer;
60 ENetEvent m_event;
61 bool m_isConnected;
62
63 b3ThreadSupportInterface* m_threadSupport;
64
65 b3CriticalSection* m_cs;
66
67 UdpNetworkedInternalData* m_udpInternalData;
68
69 SharedMemoryCommand m_clientCmd;
70 bool m_hasCommand;
71
72 bool m_hasStatus;
73 SharedMemoryStatus m_lastStatus;
74 b3AlignedObjectArray<char> m_stream;
75
76 std::string m_hostName;
77 int m_port;
78 double m_timeOutInSeconds;
79
UdpNetworkedInternalDataUdpNetworkedInternalData80 UdpNetworkedInternalData()
81 : m_client(0),
82 m_peer(0),
83 m_isConnected(false),
84 m_threadSupport(0),
85 m_hasCommand(false),
86 m_hasStatus(false),
87 m_timeOutInSeconds(60)
88 {
89 }
90
connectUDPUdpNetworkedInternalData91 bool connectUDP()
92 {
93 if (m_isConnected)
94 return true;
95
96 if (enet_initialize() != 0)
97 {
98 fprintf(stderr, "Error initialising enet");
99
100 exit(EXIT_FAILURE);
101 }
102
103 m_client = enet_host_create(NULL, /* create a client host */
104 1, /* number of clients */
105 2, /* number of channels */
106 57600 / 8, /* incoming bandwith */
107 14400 / 8); /* outgoing bandwith */
108
109 if (m_client == NULL)
110 {
111 fprintf(stderr, "Could not create client host");
112 return false;
113 }
114
115 enet_address_set_host(&m_address, m_hostName.c_str());
116 m_address.port = m_port;
117
118 m_peer = enet_host_connect(m_client,
119 &m_address, /* address to connect to */
120 2, /* number of channels */
121 0); /* user data supplied to
122 the receiving host */
123
124 if (m_peer == NULL)
125 {
126 fprintf(stderr,
127 "No available peers for initiating an ENet "
128 "connection.\n");
129 return false;
130 }
131
132 /* Try to connect to server within 5 seconds */
133 if (enet_host_service(m_client, &m_event, 5000) > 0 &&
134 m_event.type == ENET_EVENT_TYPE_CONNECT)
135 {
136 puts("Connection to server succeeded.");
137 }
138 else
139 {
140 /* Either the 5 seconds are up or a disconnect event was */
141 /* received. Reset the peer in the event the 5 seconds */
142 /* had run out without any significant event. */
143 enet_peer_reset(m_peer);
144
145 fprintf(stderr, "Connection to server failed.");
146 return false;
147 }
148
149 int serviceResult = enet_host_service(m_client, &m_event, 0);
150
151 if (serviceResult > 0)
152 {
153 switch (m_event.type)
154 {
155 case ENET_EVENT_TYPE_CONNECT:
156 printf("A new client connected from %x:%u.\n",
157 m_event.peer->address.host,
158 m_event.peer->address.port);
159 m_event.peer->data = (void*)"New User";
160 break;
161
162 case ENET_EVENT_TYPE_RECEIVE:
163
164 if (gVerboseNetworkMessagesClient)
165 {
166 printf(
167 "A packet of length %lu containing '%s' was "
168 "received from %s on channel %u.\n",
169 m_event.packet->dataLength,
170 (char*)m_event.packet->data,
171 (char*)m_event.peer->data,
172 m_event.channelID);
173 }
174 /* Clean up the packet now that we're done using it.
175 > */
176 enet_packet_destroy(m_event.packet);
177
178 break;
179
180 case ENET_EVENT_TYPE_DISCONNECT:
181 printf("%s disconnected.\n", (char*)m_event.peer->data);
182
183 break;
184 default:
185 {
186 printf("unknown event type: %d.\n", m_event.type);
187 }
188 }
189 }
190 else if (serviceResult > 0)
191 {
192 puts("Error with servicing the client");
193 return false;
194 }
195
196 m_isConnected = true;
197 return m_isConnected;
198 }
199
checkDataUdpNetworkedInternalData200 bool checkData()
201 {
202 bool hasStatus = false;
203
204 int serviceResult = enet_host_service(m_client, &m_event, 0);
205
206 if (serviceResult > 0)
207 {
208 switch (m_event.type)
209 {
210 case ENET_EVENT_TYPE_CONNECT:
211 printf("A new client connected from %x:%u.\n",
212 m_event.peer->address.host,
213 m_event.peer->address.port);
214
215 m_event.peer->data = (void*)"New User";
216 break;
217
218 case ENET_EVENT_TYPE_RECEIVE:
219 {
220 if (gVerboseNetworkMessagesClient)
221 {
222 printf(
223 "A packet of length %lu containing '%s' was "
224 "received from %s on channel %u.\n",
225 m_event.packet->dataLength,
226 (char*)m_event.packet->data,
227 (char*)m_event.peer->data,
228 m_event.channelID);
229 }
230
231 int packetSizeInBytes = b3DeserializeInt(m_event.packet->data);
232
233 if (packetSizeInBytes == m_event.packet->dataLength)
234 {
235 SharedMemoryStatus* statPtr = (SharedMemoryStatus*)&m_event.packet->data[4];
236 if (statPtr->m_type == CMD_STEP_FORWARD_SIMULATION_COMPLETED)
237 {
238 SharedMemoryStatus dummy;
239 dummy.m_type = CMD_STEP_FORWARD_SIMULATION_COMPLETED;
240 m_lastStatus = dummy;
241 m_stream.resize(0);
242 }
243 else
244 {
245 m_lastStatus = *statPtr;
246 int streamOffsetInBytes = 4 + sizeof(SharedMemoryStatus);
247 int numStreamBytes = packetSizeInBytes - streamOffsetInBytes;
248 m_stream.resize(numStreamBytes);
249 for (int i = 0; i < numStreamBytes; i++)
250 {
251 m_stream[i] = m_event.packet->data[i + streamOffsetInBytes];
252 }
253 }
254 }
255 else
256 {
257 printf("unknown status message received\n");
258 }
259 enet_packet_destroy(m_event.packet);
260 hasStatus = true;
261 break;
262 }
263 case ENET_EVENT_TYPE_DISCONNECT:
264 {
265 printf("%s disconnected.\n", (char*)m_event.peer->data);
266
267 break;
268 }
269 default:
270 {
271 printf("unknown event type: %d.\n", m_event.type);
272 }
273 }
274 }
275 else if (serviceResult > 0)
276 {
277 puts("Error with servicing the client");
278 }
279
280 return hasStatus;
281 }
282 };
283
// Worker-thread lifecycle states, exchanged through shared parameter slot 0 of
// the critical section.
enum UDPThreadEnums
{
    eUDPRequestTerminate = 13,      // set by disconnect() to stop the worker loop
    eUDPIsUnInitialized = 14,       // set by connect() before the task is started
    eUDPIsInitialized = 15,         // set by the worker thread once it is running
    eUDPInitializationFailed = 16,
    eUDPHasTerminated = 17
};
292
// Connection states, exchanged through shared parameter slot 1 of the critical
// section.
enum UDPCommandEnums
{
    eUDPIdle = 13,
    eUDP_ConnectRequest = 14,     // set by connect() to ask the worker to connect
    eUDP_Connected = 15,          // set by the worker when connectUDP() succeeded
    eUDP_ConnectionFailed = 16,   // set by the worker when connectUDP() failed
    eUDP_DisconnectRequest = 17,
    eUDP_Disconnected = 18
};
303
UDPThreadFunc(void * userPtr,void * lsMemory)304 void UDPThreadFunc(void* userPtr, void* lsMemory)
305 {
306 printf("UDPThreadFunc thread started\n");
307 // UDPThreadLocalStorage* localStorage = (UDPThreadLocalStorage*)lsMemory;
308
309 UdpNetworkedInternalData* args = (UdpNetworkedInternalData*)userPtr;
310 // int workLeft = true;
311 b3Clock clock;
312 clock.reset();
313 bool init = true;
314 if (init)
315 {
316 args->m_cs->lock();
317 args->m_cs->setSharedParam(0, eUDPIsInitialized);
318 args->m_cs->unlock();
319
320 double deltaTimeInSeconds = 0;
321
322 do
323 {
324 b3Clock::usleep(0);
325
326 deltaTimeInSeconds += double(clock.getTimeMicroseconds()) / 1000000.;
327
328 {
329 clock.reset();
330 deltaTimeInSeconds = 0.f;
331 switch (args->m_cs->getSharedParam(1))
332 {
333 case eUDP_ConnectRequest:
334 {
335 bool connected = args->connectUDP();
336 if (connected)
337 {
338 args->m_cs->setSharedParam(1, eUDP_Connected);
339 }
340 else
341 {
342 args->m_cs->setSharedParam(1, eUDP_ConnectionFailed);
343 }
344 break;
345 }
346 default:
347 {
348 }
349 };
350
351 if (args->m_isConnected)
352 {
353 args->m_cs->lock();
354 bool hasCommand = args->m_hasCommand;
355 args->m_cs->unlock();
356
357 if (hasCommand)
358 {
359 int sz = 0;
360 ENetPacket* packet = 0;
361
362 if (args->m_clientCmd.m_type == CMD_STEP_FORWARD_SIMULATION)
363 {
364 sz = sizeof(int);
365 packet = enet_packet_create(&args->m_clientCmd.m_type, sz, ENET_PACKET_FLAG_RELIABLE);
366 }
367 else
368 {
369 sz = sizeof(SharedMemoryCommand);
370 packet = enet_packet_create(&args->m_clientCmd, sz, ENET_PACKET_FLAG_RELIABLE);
371 }
372 int res;
373 res = enet_peer_send(args->m_peer, 0, packet);
374 args->m_cs->lock();
375 args->m_hasCommand = false;
376 args->m_cs->unlock();
377 }
378
379 bool hasNewStatus = args->checkData();
380 if (hasNewStatus)
381 {
382 if (args->m_hasStatus)
383 {
384 //overflow: last status hasn't been processed yet
385 b3Assert(0);
386 printf("Error: received new status but previous status not processed yet");
387 }
388 else
389 {
390 args->m_cs->lock();
391 args->m_hasStatus = hasNewStatus;
392 args->m_cs->unlock();
393 }
394 }
395 }
396 }
397
398 } while (args->m_cs->getSharedParam(0) != eUDPRequestTerminate);
399 }
400 else
401 {
402 args->m_cs->lock();
403 args->m_cs->setSharedParam(0, eUDPInitializationFailed);
404 args->m_cs->unlock();
405 }
406
407 printf("finished\n");
408 }
409
UDPlsMemoryFunc()410 void* UDPlsMemoryFunc()
411 {
412 //don't create local store memory, just return 0
413 return new UDPThreadLocalStorage;
414 }
415
UDPlsMemoryReleaseFunc(void * ptr)416 void UDPlsMemoryReleaseFunc(void* ptr)
417 {
418 UDPThreadLocalStorage* p = (UDPThreadLocalStorage*)ptr;
419 delete p;
420 }
421
UdpNetworkedPhysicsProcessor(const char * hostName,int port)422 UdpNetworkedPhysicsProcessor::UdpNetworkedPhysicsProcessor(const char* hostName, int port)
423 {
424 m_data = new UdpNetworkedInternalData;
425 if (hostName)
426 {
427 m_data->m_hostName = hostName;
428 }
429 m_data->m_port = port;
430 }
431
~UdpNetworkedPhysicsProcessor()432 UdpNetworkedPhysicsProcessor::~UdpNetworkedPhysicsProcessor()
433 {
434 disconnect();
435 delete m_data;
436 }
437
processCommand(const struct SharedMemoryCommand & clientCmd,struct SharedMemoryStatus & serverStatusOut,char * bufferServerToClient,int bufferSizeInBytes)438 bool UdpNetworkedPhysicsProcessor::processCommand(const struct SharedMemoryCommand& clientCmd, struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes)
439 {
440 if (gVerboseNetworkMessagesClient)
441 {
442 printf("PhysicsClientUDP::processCommand\n");
443 }
444 // int sz = sizeof(SharedMemoryCommand);
445
446 b3Clock clock;
447 double startTime = clock.getTimeInSeconds();
448 double timeOutInSeconds = m_data->m_timeOutInSeconds;
449
450 m_data->m_cs->lock();
451 m_data->m_clientCmd = clientCmd;
452 m_data->m_hasCommand = true;
453 m_data->m_cs->unlock();
454
455 while ((m_data->m_hasCommand) && (clock.getTimeInSeconds() - startTime < timeOutInSeconds))
456 {
457 b3Clock::usleep(0);
458 }
459
460 #if 0
461
462
463 bool hasStatus = false;
464
465 b3Clock clock;
466 double startTime = clock.getTimeInSeconds();
467 double timeOutInSeconds = m_data->m_timeOutInSeconds;
468
469 const SharedMemoryStatus* stat = 0;
470 while ((!hasStatus) && (clock.getTimeInSeconds() - startTime < timeOutInSeconds))
471 {
472 hasStatus = receiveStatus(serverStatusOut, bufferServerToClient, bufferSizeInBytes);
473 b3Clock::usleep(100);
474 }
475 return hasStatus;
476
477 #endif
478
479 return false;
480 }
481
receiveStatus(struct SharedMemoryStatus & serverStatusOut,char * bufferServerToClient,int bufferSizeInBytes)482 bool UdpNetworkedPhysicsProcessor::receiveStatus(struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes)
483 {
484 bool hasStatus = false;
485 if (m_data->m_hasStatus)
486 {
487 if (gVerboseNetworkMessagesClient)
488 {
489 printf("UdpNetworkedPhysicsProcessor::receiveStatus\n");
490 }
491
492 hasStatus = true;
493 serverStatusOut = m_data->m_lastStatus;
494 int numStreamBytes = m_data->m_stream.size();
495
496 if (numStreamBytes < bufferSizeInBytes)
497 {
498 for (int i = 0; i < numStreamBytes; i++)
499 {
500 bufferServerToClient[i] = m_data->m_stream[i];
501 }
502 }
503 else
504 {
505 printf("Error: steam buffer overflow\n");
506 }
507
508 m_data->m_cs->lock();
509 m_data->m_hasStatus = false;
510 m_data->m_cs->unlock();
511 }
512
513 return hasStatus;
514 }
515
renderScene(int renderFlags)516 void UdpNetworkedPhysicsProcessor::renderScene(int renderFlags)
517 {
518 }
519
physicsDebugDraw(int debugDrawFlags)520 void UdpNetworkedPhysicsProcessor::physicsDebugDraw(int debugDrawFlags)
521 {
522 }
523
setGuiHelper(struct GUIHelperInterface * guiHelper)524 void UdpNetworkedPhysicsProcessor::setGuiHelper(struct GUIHelperInterface* guiHelper)
525 {
526 }
527
isConnected() const528 bool UdpNetworkedPhysicsProcessor::isConnected() const
529 {
530 return m_data->m_isConnected;
531 }
532
connect()533 bool UdpNetworkedPhysicsProcessor::connect()
534 {
535 if (m_data->m_threadSupport == 0)
536 {
537 m_data->m_threadSupport = createUDPThreadSupport(1);
538
539 m_data->m_cs = m_data->m_threadSupport->createCriticalSection();
540 m_data->m_cs->setSharedParam(0, eUDPIsUnInitialized);
541 m_data->m_threadSupport->runTask(B3_THREAD_SCHEDULE_TASK, (void*)m_data, 0);
542
543 while (m_data->m_cs->getSharedParam(0) == eUDPIsUnInitialized)
544 {
545 b3Clock::usleep(1000);
546 }
547
548 m_data->m_cs->lock();
549 m_data->m_cs->setSharedParam(1, eUDP_ConnectRequest);
550 m_data->m_cs->unlock();
551
552 while (m_data->m_cs->getSharedParam(1) == eUDP_ConnectRequest)
553 {
554 b3Clock::usleep(1000);
555 }
556 }
557 unsigned int sharedParam = m_data->m_cs->getSharedParam(1);
558 bool isConnected = (sharedParam == eUDP_Connected);
559 return isConnected;
560 }
561
disconnect()562 void UdpNetworkedPhysicsProcessor::disconnect()
563 {
564 if (m_data->m_threadSupport)
565 {
566 m_data->m_cs->lock();
567 m_data->m_cs->setSharedParam(0, eUDPRequestTerminate);
568 m_data->m_cs->unlock();
569
570 int numActiveThreads = 1;
571
572 while (numActiveThreads)
573 {
574 int arg0, arg1;
575 if (m_data->m_threadSupport->isTaskCompleted(&arg0, &arg1, 0))
576 {
577 numActiveThreads--;
578 printf("numActiveThreads = %d\n", numActiveThreads);
579 }
580 else
581 {
582 b3Clock::usleep(1000);
583 }
584 };
585
586 printf("stopping threads\n");
587
588 delete m_data->m_threadSupport;
589 m_data->m_threadSupport = 0;
590 m_data->m_isConnected = false;
591 }
592 }
593
setTimeOut(double timeOutInSeconds)594 void UdpNetworkedPhysicsProcessor::setTimeOut(double timeOutInSeconds)
595 {
596 m_data->m_timeOutInSeconds = timeOutInSeconds;
597 }
598