1 #include "PhysicsClientUDP.h"
2 #include <enet/enet.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include "../Utils/b3Clock.h"
6 #include "PhysicsClient.h"
7 //#include "LinearMath/btVector3.h"
8 #include "SharedMemoryCommands.h"
9 #include <string>
10 #include "Bullet3Common/b3Logging.h"
11 #include "../MultiThreading/b3ThreadSupportInterface.h"
12 void UDPThreadFunc(void* userPtr, void* lsMemory);
13 void* UDPlsMemoryFunc();
14 void UDPlsMemoryReleaseFunc(void* ptr);
15 
16 bool gVerboseNetworkMessagesClient = false;
17 
18 #ifndef _WIN32
19 #include "../MultiThreading/b3PosixThreadSupport.h"
20 
createUDPThreadSupport(int numThreads)21 b3ThreadSupportInterface* createUDPThreadSupport(int numThreads)
22 {
23 	b3PosixThreadSupport::ThreadConstructionInfo constructionInfo("UDPThread",
24 																  UDPThreadFunc,
25 																  UDPlsMemoryFunc,
26 																  UDPlsMemoryReleaseFunc,
27 																  numThreads);
28 	b3ThreadSupportInterface* threadSupport = new b3PosixThreadSupport(constructionInfo);
29 
30 	return threadSupport;
31 }
32 
33 #elif defined(_WIN32)
34 #include "../MultiThreading/b3Win32ThreadSupport.h"
35 
createUDPThreadSupport(int numThreads)36 b3ThreadSupportInterface* createUDPThreadSupport(int numThreads)
37 {
38 	b3Win32ThreadSupport::Win32ThreadConstructionInfo threadConstructionInfo("UDPThread", UDPThreadFunc, UDPlsMemoryFunc, UDPlsMemoryReleaseFunc, numThreads);
39 	b3Win32ThreadSupport* threadSupport = new b3Win32ThreadSupport(threadConstructionInfo);
40 	return threadSupport;
41 }
42 #endif
43 
44 struct UDPThreadLocalStorage
45 {
46 	int threadId;
47 };
48 
b3DeserializeInt(const unsigned char * input)49 unsigned int b3DeserializeInt(const unsigned char* input)
50 {
51 	unsigned int tmp = (input[3] << 24) + (input[2] << 16) + (input[1] << 8) + input[0];
52 	return tmp;
53 }
54 
55 struct UdpNetworkedInternalData
56 {
57 	ENetHost* m_client;
58 	ENetAddress m_address;
59 	ENetPeer* m_peer;
60 	ENetEvent m_event;
61 	bool m_isConnected;
62 
63 	b3ThreadSupportInterface* m_threadSupport;
64 
65 	b3CriticalSection* m_cs;
66 
67 	UdpNetworkedInternalData* m_udpInternalData;
68 
69 	SharedMemoryCommand m_clientCmd;
70 	bool m_hasCommand;
71 
72 	bool m_hasStatus;
73 	SharedMemoryStatus m_lastStatus;
74 	b3AlignedObjectArray<char> m_stream;
75 
76 	std::string m_hostName;
77 	int m_port;
78 	double m_timeOutInSeconds;
79 
UdpNetworkedInternalDataUdpNetworkedInternalData80 	UdpNetworkedInternalData()
81 		: m_client(0),
82 		  m_peer(0),
83 		  m_isConnected(false),
84 		  m_threadSupport(0),
85 		  m_hasCommand(false),
86 		  m_hasStatus(false),
87 		  m_timeOutInSeconds(60)
88 	{
89 	}
90 
connectUDPUdpNetworkedInternalData91 	bool connectUDP()
92 	{
93 		if (m_isConnected)
94 			return true;
95 
96 		if (enet_initialize() != 0)
97 		{
98 			fprintf(stderr, "Error initialising enet");
99 
100 			exit(EXIT_FAILURE);
101 		}
102 
103 		m_client = enet_host_create(NULL,       /* create a client host */
104 									1,          /* number of clients */
105 									2,          /* number of channels */
106 									57600 / 8,  /* incoming bandwith */
107 									14400 / 8); /* outgoing bandwith */
108 
109 		if (m_client == NULL)
110 		{
111 			fprintf(stderr, "Could not create client host");
112 			return false;
113 		}
114 
115 		enet_address_set_host(&m_address, m_hostName.c_str());
116 		m_address.port = m_port;
117 
118 		m_peer = enet_host_connect(m_client,
119 								   &m_address, /* address to connect to */
120 								   2,          /* number of channels */
121 								   0);         /* user data supplied to
122 						 the receiving host */
123 
124 		if (m_peer == NULL)
125 		{
126 			fprintf(stderr,
127 					"No available peers for initiating an ENet "
128 					"connection.\n");
129 			return false;
130 		}
131 
132 		/* Try to connect to server within 5 seconds */
133 		if (enet_host_service(m_client, &m_event, 5000) > 0 &&
134 			m_event.type == ENET_EVENT_TYPE_CONNECT)
135 		{
136 			puts("Connection to server succeeded.");
137 		}
138 		else
139 		{
140 			/* Either the 5 seconds are up or a disconnect event was */
141 			/* received. Reset the peer in the event the 5 seconds   */
142 			/* had run out without any significant event.            */
143 			enet_peer_reset(m_peer);
144 
145 			fprintf(stderr, "Connection to server failed.");
146 			return false;
147 		}
148 
149 		int serviceResult = enet_host_service(m_client, &m_event, 0);
150 
151 		if (serviceResult > 0)
152 		{
153 			switch (m_event.type)
154 			{
155 				case ENET_EVENT_TYPE_CONNECT:
156 					printf("A new client connected from %x:%u.\n",
157 						   m_event.peer->address.host,
158 						   m_event.peer->address.port);
159 					m_event.peer->data = (void*)"New User";
160 					break;
161 
162 				case ENET_EVENT_TYPE_RECEIVE:
163 
164 					if (gVerboseNetworkMessagesClient)
165 					{
166 						printf(
167 							"A packet of length %lu containing '%s' was "
168 							"received from %s on channel %u.\n",
169 							m_event.packet->dataLength,
170 							(char*)m_event.packet->data,
171 							(char*)m_event.peer->data,
172 							m_event.channelID);
173 					}
174 					/* Clean up the packet now that we're done using it.
175 				> */
176 					enet_packet_destroy(m_event.packet);
177 
178 					break;
179 
180 				case ENET_EVENT_TYPE_DISCONNECT:
181 					printf("%s disconnected.\n", (char*)m_event.peer->data);
182 
183 					break;
184 				default:
185 				{
186 					printf("unknown event type: %d.\n", m_event.type);
187 				}
188 			}
189 		}
190 		else if (serviceResult > 0)
191 		{
192 			puts("Error with servicing the client");
193 			return false;
194 		}
195 
196 		m_isConnected = true;
197 		return m_isConnected;
198 	}
199 
checkDataUdpNetworkedInternalData200 	bool checkData()
201 	{
202 		bool hasStatus = false;
203 
204 		int serviceResult = enet_host_service(m_client, &m_event, 0);
205 
206 		if (serviceResult > 0)
207 		{
208 			switch (m_event.type)
209 			{
210 				case ENET_EVENT_TYPE_CONNECT:
211 					printf("A new client connected from %x:%u.\n",
212 						   m_event.peer->address.host,
213 						   m_event.peer->address.port);
214 
215 					m_event.peer->data = (void*)"New User";
216 					break;
217 
218 				case ENET_EVENT_TYPE_RECEIVE:
219 				{
220 					if (gVerboseNetworkMessagesClient)
221 					{
222 						printf(
223 							"A packet of length %lu containing '%s' was "
224 							"received from %s on channel %u.\n",
225 							m_event.packet->dataLength,
226 							(char*)m_event.packet->data,
227 							(char*)m_event.peer->data,
228 							m_event.channelID);
229 					}
230 
231 					int packetSizeInBytes = b3DeserializeInt(m_event.packet->data);
232 
233 					if (packetSizeInBytes == m_event.packet->dataLength)
234 					{
235 						SharedMemoryStatus* statPtr = (SharedMemoryStatus*)&m_event.packet->data[4];
236 						if (statPtr->m_type == CMD_STEP_FORWARD_SIMULATION_COMPLETED)
237 						{
238 							SharedMemoryStatus dummy;
239 							dummy.m_type = CMD_STEP_FORWARD_SIMULATION_COMPLETED;
240 							m_lastStatus = dummy;
241 							m_stream.resize(0);
242 						}
243 						else
244 						{
245 							m_lastStatus = *statPtr;
246 							int streamOffsetInBytes = 4 + sizeof(SharedMemoryStatus);
247 							int numStreamBytes = packetSizeInBytes - streamOffsetInBytes;
248 							m_stream.resize(numStreamBytes);
249 							for (int i = 0; i < numStreamBytes; i++)
250 							{
251 								m_stream[i] = m_event.packet->data[i + streamOffsetInBytes];
252 							}
253 						}
254 					}
255 					else
256 					{
257 						printf("unknown status message received\n");
258 					}
259 					enet_packet_destroy(m_event.packet);
260 					hasStatus = true;
261 					break;
262 				}
263 				case ENET_EVENT_TYPE_DISCONNECT:
264 				{
265 					printf("%s disconnected.\n", (char*)m_event.peer->data);
266 
267 					break;
268 				}
269 				default:
270 				{
271 					printf("unknown event type: %d.\n", m_event.type);
272 				}
273 			}
274 		}
275 		else if (serviceResult > 0)
276 		{
277 			puts("Error with servicing the client");
278 		}
279 
280 		return hasStatus;
281 	}
282 };
283 
284 enum UDPThreadEnums
285 {
286 	eUDPRequestTerminate = 13,
287 	eUDPIsUnInitialized,
288 	eUDPIsInitialized,
289 	eUDPInitializationFailed,
290 	eUDPHasTerminated
291 };
292 
293 enum UDPCommandEnums
294 {
295 	eUDPIdle = 13,
296 	eUDP_ConnectRequest,
297 	eUDP_Connected,
298 	eUDP_ConnectionFailed,
299 	eUDP_DisconnectRequest,
300 	eUDP_Disconnected,
301 
302 };
303 
UDPThreadFunc(void * userPtr,void * lsMemory)304 void UDPThreadFunc(void* userPtr, void* lsMemory)
305 {
306 	printf("UDPThreadFunc thread started\n");
307 	//	UDPThreadLocalStorage* localStorage = (UDPThreadLocalStorage*)lsMemory;
308 
309 	UdpNetworkedInternalData* args = (UdpNetworkedInternalData*)userPtr;
310 	//	int workLeft = true;
311 	b3Clock clock;
312 	clock.reset();
313 	bool init = true;
314 	if (init)
315 	{
316 		args->m_cs->lock();
317 		args->m_cs->setSharedParam(0, eUDPIsInitialized);
318 		args->m_cs->unlock();
319 
320 		double deltaTimeInSeconds = 0;
321 
322 		do
323 		{
324 			b3Clock::usleep(0);
325 
326 			deltaTimeInSeconds += double(clock.getTimeMicroseconds()) / 1000000.;
327 
328 			{
329 				clock.reset();
330 				deltaTimeInSeconds = 0.f;
331 				switch (args->m_cs->getSharedParam(1))
332 				{
333 					case eUDP_ConnectRequest:
334 					{
335 						bool connected = args->connectUDP();
336 						if (connected)
337 						{
338 							args->m_cs->setSharedParam(1, eUDP_Connected);
339 						}
340 						else
341 						{
342 							args->m_cs->setSharedParam(1, eUDP_ConnectionFailed);
343 						}
344 						break;
345 					}
346 					default:
347 					{
348 					}
349 				};
350 
351 				if (args->m_isConnected)
352 				{
353 					args->m_cs->lock();
354 					bool hasCommand = args->m_hasCommand;
355 					args->m_cs->unlock();
356 
357 					if (hasCommand)
358 					{
359 						int sz = 0;
360 						ENetPacket* packet = 0;
361 
362 						if (args->m_clientCmd.m_type == CMD_STEP_FORWARD_SIMULATION)
363 						{
364 							sz = sizeof(int);
365 							packet = enet_packet_create(&args->m_clientCmd.m_type, sz, ENET_PACKET_FLAG_RELIABLE);
366 						}
367 						else
368 						{
369 							sz = sizeof(SharedMemoryCommand);
370 							packet = enet_packet_create(&args->m_clientCmd, sz, ENET_PACKET_FLAG_RELIABLE);
371 						}
372 						int res;
373 						res = enet_peer_send(args->m_peer, 0, packet);
374 						args->m_cs->lock();
375 						args->m_hasCommand = false;
376 						args->m_cs->unlock();
377 					}
378 
379 					bool hasNewStatus = args->checkData();
380 					if (hasNewStatus)
381 					{
382 						if (args->m_hasStatus)
383 						{
384 							//overflow: last status hasn't been processed yet
385 							b3Assert(0);
386 							printf("Error: received new status but previous status not processed yet");
387 						}
388 						else
389 						{
390 							args->m_cs->lock();
391 							args->m_hasStatus = hasNewStatus;
392 							args->m_cs->unlock();
393 						}
394 					}
395 				}
396 			}
397 
398 		} while (args->m_cs->getSharedParam(0) != eUDPRequestTerminate);
399 	}
400 	else
401 	{
402 		args->m_cs->lock();
403 		args->m_cs->setSharedParam(0, eUDPInitializationFailed);
404 		args->m_cs->unlock();
405 	}
406 
407 	printf("finished\n");
408 }
409 
UDPlsMemoryFunc()410 void* UDPlsMemoryFunc()
411 {
412 	//don't create local store memory, just return 0
413 	return new UDPThreadLocalStorage;
414 }
415 
UDPlsMemoryReleaseFunc(void * ptr)416 void UDPlsMemoryReleaseFunc(void* ptr)
417 {
418 	UDPThreadLocalStorage* p = (UDPThreadLocalStorage*)ptr;
419 	delete p;
420 }
421 
UdpNetworkedPhysicsProcessor(const char * hostName,int port)422 UdpNetworkedPhysicsProcessor::UdpNetworkedPhysicsProcessor(const char* hostName, int port)
423 {
424 	m_data = new UdpNetworkedInternalData;
425 	if (hostName)
426 	{
427 		m_data->m_hostName = hostName;
428 	}
429 	m_data->m_port = port;
430 }
431 
~UdpNetworkedPhysicsProcessor()432 UdpNetworkedPhysicsProcessor::~UdpNetworkedPhysicsProcessor()
433 {
434 	disconnect();
435 	delete m_data;
436 }
437 
processCommand(const struct SharedMemoryCommand & clientCmd,struct SharedMemoryStatus & serverStatusOut,char * bufferServerToClient,int bufferSizeInBytes)438 bool UdpNetworkedPhysicsProcessor::processCommand(const struct SharedMemoryCommand& clientCmd, struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes)
439 {
440 	if (gVerboseNetworkMessagesClient)
441 	{
442 		printf("PhysicsClientUDP::processCommand\n");
443 	}
444 	//	int sz = sizeof(SharedMemoryCommand);
445 
446 	b3Clock clock;
447 	double startTime = clock.getTimeInSeconds();
448 	double timeOutInSeconds = m_data->m_timeOutInSeconds;
449 
450 	m_data->m_cs->lock();
451 	m_data->m_clientCmd = clientCmd;
452 	m_data->m_hasCommand = true;
453 	m_data->m_cs->unlock();
454 
455 	while ((m_data->m_hasCommand) && (clock.getTimeInSeconds() - startTime < timeOutInSeconds))
456 	{
457 		b3Clock::usleep(0);
458 	}
459 
460 #if 0
461 
462 
463 	bool hasStatus = false;
464 
465 	b3Clock clock;
466 	double startTime = clock.getTimeInSeconds();
467 	double timeOutInSeconds = m_data->m_timeOutInSeconds;
468 
469 	const SharedMemoryStatus* stat = 0;
470 	while ((!hasStatus) && (clock.getTimeInSeconds() - startTime < timeOutInSeconds))
471 	{
472 		hasStatus = receiveStatus(serverStatusOut, bufferServerToClient, bufferSizeInBytes);
473 		b3Clock::usleep(100);
474 	}
475 	return hasStatus;
476 
477 #endif
478 
479 	return false;
480 }
481 
receiveStatus(struct SharedMemoryStatus & serverStatusOut,char * bufferServerToClient,int bufferSizeInBytes)482 bool UdpNetworkedPhysicsProcessor::receiveStatus(struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes)
483 {
484 	bool hasStatus = false;
485 	if (m_data->m_hasStatus)
486 	{
487 		if (gVerboseNetworkMessagesClient)
488 		{
489 			printf("UdpNetworkedPhysicsProcessor::receiveStatus\n");
490 		}
491 
492 		hasStatus = true;
493 		serverStatusOut = m_data->m_lastStatus;
494 		int numStreamBytes = m_data->m_stream.size();
495 
496 		if (numStreamBytes < bufferSizeInBytes)
497 		{
498 			for (int i = 0; i < numStreamBytes; i++)
499 			{
500 				bufferServerToClient[i] = m_data->m_stream[i];
501 			}
502 		}
503 		else
504 		{
505 			printf("Error: steam buffer overflow\n");
506 		}
507 
508 		m_data->m_cs->lock();
509 		m_data->m_hasStatus = false;
510 		m_data->m_cs->unlock();
511 	}
512 
513 	return hasStatus;
514 }
515 
renderScene(int renderFlags)516 void UdpNetworkedPhysicsProcessor::renderScene(int renderFlags)
517 {
518 }
519 
physicsDebugDraw(int debugDrawFlags)520 void UdpNetworkedPhysicsProcessor::physicsDebugDraw(int debugDrawFlags)
521 {
522 }
523 
setGuiHelper(struct GUIHelperInterface * guiHelper)524 void UdpNetworkedPhysicsProcessor::setGuiHelper(struct GUIHelperInterface* guiHelper)
525 {
526 }
527 
isConnected() const528 bool UdpNetworkedPhysicsProcessor::isConnected() const
529 {
530 	return m_data->m_isConnected;
531 }
532 
connect()533 bool UdpNetworkedPhysicsProcessor::connect()
534 {
535 	if (m_data->m_threadSupport == 0)
536 	{
537 		m_data->m_threadSupport = createUDPThreadSupport(1);
538 
539 		m_data->m_cs = m_data->m_threadSupport->createCriticalSection();
540 		m_data->m_cs->setSharedParam(0, eUDPIsUnInitialized);
541 		m_data->m_threadSupport->runTask(B3_THREAD_SCHEDULE_TASK, (void*)m_data, 0);
542 
543 		while (m_data->m_cs->getSharedParam(0) == eUDPIsUnInitialized)
544 		{
545 			b3Clock::usleep(1000);
546 		}
547 
548 		m_data->m_cs->lock();
549 		m_data->m_cs->setSharedParam(1, eUDP_ConnectRequest);
550 		m_data->m_cs->unlock();
551 
552 		while (m_data->m_cs->getSharedParam(1) == eUDP_ConnectRequest)
553 		{
554 			b3Clock::usleep(1000);
555 		}
556 	}
557 	unsigned int sharedParam = m_data->m_cs->getSharedParam(1);
558 	bool isConnected = (sharedParam == eUDP_Connected);
559 	return isConnected;
560 }
561 
disconnect()562 void UdpNetworkedPhysicsProcessor::disconnect()
563 {
564 	if (m_data->m_threadSupport)
565 	{
566 		m_data->m_cs->lock();
567 		m_data->m_cs->setSharedParam(0, eUDPRequestTerminate);
568 		m_data->m_cs->unlock();
569 
570 		int numActiveThreads = 1;
571 
572 		while (numActiveThreads)
573 		{
574 			int arg0, arg1;
575 			if (m_data->m_threadSupport->isTaskCompleted(&arg0, &arg1, 0))
576 			{
577 				numActiveThreads--;
578 				printf("numActiveThreads = %d\n", numActiveThreads);
579 			}
580 			else
581 			{
582 				b3Clock::usleep(1000);
583 			}
584 		};
585 
586 		printf("stopping threads\n");
587 
588 		delete m_data->m_threadSupport;
589 		m_data->m_threadSupport = 0;
590 		m_data->m_isConnected = false;
591 	}
592 }
593 
setTimeOut(double timeOutInSeconds)594 void UdpNetworkedPhysicsProcessor::setTimeOut(double timeOutInSeconds)
595 {
596 	m_data->m_timeOutInSeconds = timeOutInSeconds;
597 }
598