1 #ifndef _WIN32
2 /*
3 Bullet Continuous Collision Detection and Physics Library
4 Copyright (c) 2003-2007 Erwin Coumans  http://bulletphysics.com
5 
6 This software is provided 'as-is', without any express or implied warranty.
7 In no event will the authors be held liable for any damages arising from the use of this software.
8 Permission is granted to anyone to use this software for any purpose,
9 including commercial applications, and to alter it and redistribute it freely,
10 subject to the following restrictions:
11 
12 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
13 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
14 3. This notice may not be removed or altered from any source distribution.
15 */
16 
17 #include <stdio.h>
18 #include "b3PosixThreadSupport.h"
19 #include <errno.h>
20 #include <unistd.h>
21 
// Reports a failed pthread/semaphore call on stdout together with errno.
// The argument is evaluated exactly once (it is usually a call with side
// effects such as sem_wait/sem_post), and the do/while(0) wrapper makes the
// macro behave like a single statement, so it is safe inside if/else.
#define checkPThreadFunction(returnValue) \
	do \
	{ \
		const int b3PThreadResult_ = (returnValue); \
		if (0 != b3PThreadResult_) \
		{ \
			printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__, b3PThreadResult_, errno); \
		} \
	} while (0)
27 
// The number of threads should be equal to the number of available cores.
// Todo: pin each worker to a single core (SetThreadIdealProcessor is the Win32 call; on Linux the counterpart would be pthread_setaffinity_np).
30 
b3PosixThreadSupport(ThreadConstructionInfo & threadConstructionInfo)31 b3PosixThreadSupport::b3PosixThreadSupport(ThreadConstructionInfo& threadConstructionInfo)
32 {
33 	startThreads(threadConstructionInfo);
34 }
35 
36 // cleanup/shutdown Libspe2
~b3PosixThreadSupport()37 b3PosixThreadSupport::~b3PosixThreadSupport()
38 {
39 	stopThreads();
40 }
41 
42 #if (defined(__APPLE__))
43 #define NAMED_SEMAPHORES
44 #endif
45 
createSem(const char * baseName)46 static sem_t* createSem(const char* baseName)
47 {
48 	static int semCount = 0;
49 #ifdef NAMED_SEMAPHORES
50 	/// Named semaphore begin
51 	char name[32];
52 	snprintf(name, 32, "/%8.s-%4.d-%4.4d", baseName, getpid(), semCount++);
53 	sem_t* tempSem = sem_open(name, O_CREAT, 0600, 0);
54 
55 	if (tempSem != reinterpret_cast<sem_t*>(SEM_FAILED))
56 	{
57 		//        printf("Created \"%s\" Semaphore %p\n", name, tempSem);
58 	}
59 	else
60 	{
61 		//printf("Error creating Semaphore %d\n", errno);
62 		exit(-1);
63 	}
64 	/// Named semaphore end
65 #else
66 	sem_t* tempSem = new sem_t;
67 	checkPThreadFunction(sem_init(tempSem, 0, 0));
68 #endif
69 	return tempSem;
70 }
71 
destroySem(sem_t * semaphore)72 static void destroySem(sem_t* semaphore)
73 {
74 #ifdef NAMED_SEMAPHORES
75 	checkPThreadFunction(sem_close(semaphore));
76 #else
77 	checkPThreadFunction(sem_destroy(semaphore));
78 	delete semaphore;
79 #endif
80 }
81 
threadFunction(void * argument)82 static void* threadFunction(void* argument)
83 {
84 	b3PosixThreadSupport::b3ThreadStatus* status = (b3PosixThreadSupport::b3ThreadStatus*)argument;
85 
86 	while (1)
87 	{
88 		checkPThreadFunction(sem_wait(status->startSemaphore));
89 
90 		void* userPtr = status->m_userPtr;
91 
92 		if (userPtr)
93 		{
94 			b3Assert(status->m_status);
95 			status->m_userThreadFunc(userPtr, status->m_lsMemory);
96 			status->m_status = 2;
97 			checkPThreadFunction(sem_post(status->m_mainSemaphore));
98 			status->threadUsed++;
99 		}
100 		else
101 		{
102 			//exit Thread
103 			status->m_status = 3;
104 			checkPThreadFunction(sem_post(status->m_mainSemaphore));
105 			printf("Thread with taskId %i exiting\n", status->m_taskId);
106 			break;
107 		}
108 	}
109 
110 	printf("Thread TERMINATED\n");
111 	return 0;
112 }
113 
114 ///send messages to SPUs
runTask(int uiCommand,void * uiArgument0,int taskId)115 void b3PosixThreadSupport::runTask(int uiCommand, void* uiArgument0, int taskId)
116 {
117 	///	gMidphaseSPU.sendRequest(CMD_GATHER_AND_PROCESS_PAIRLIST, (int) &taskDesc);
118 
119 	///we should spawn an SPU task here, and in 'waitForResponse' it should wait for response of the (one of) the first tasks that finished
120 
121 	switch (uiCommand)
122 	{
123 		case B3_THREAD_SCHEDULE_TASK:
124 		{
125 			b3ThreadStatus& spuStatus = m_activeThreadStatus[taskId];
126 			b3Assert(taskId >= 0);
127 			b3Assert(taskId < m_activeThreadStatus.size());
128 
129 			spuStatus.m_commandId = uiCommand;
130 			spuStatus.m_status = 1;
131 			spuStatus.m_userPtr = (void*)uiArgument0;
132 
133 			// fire event to start new task
134 			checkPThreadFunction(sem_post(spuStatus.startSemaphore));
135 			break;
136 		}
137 		default:
138 		{
139 			///not implemented
140 			b3Assert(0);
141 		}
142 	};
143 }
144 
145 ///non-blocking test if a task is completed. First implement all versions, and then enable this API
isTaskCompleted(int * puiArgument0,int * puiArgument1,int timeOutInMilliseconds)146 bool b3PosixThreadSupport::isTaskCompleted(int* puiArgument0, int* puiArgument1, int timeOutInMilliseconds)
147 {
148 	b3Assert(m_activeThreadStatus.size());
149 
150 	// wait for any of the threads to finish
151 	int result = sem_trywait(m_mainSemaphore);
152 	if (result == 0)
153 	{
154 		// get at least one thread which has finished
155 		int last = -1;
156 		int status = -1;
157 		for (int t = 0; t < int(m_activeThreadStatus.size()); ++t)
158 		{
159 			status = m_activeThreadStatus[t].m_status;
160 			if (2 == m_activeThreadStatus[t].m_status)
161 			{
162 				last = t;
163 				break;
164 			}
165 		}
166 
167 		b3ThreadStatus& spuStatus = m_activeThreadStatus[last];
168 
169 		b3Assert(spuStatus.m_status > 1);
170 		spuStatus.m_status = 0;
171 
172 		// need to find an active spu
173 		b3Assert(last >= 0);
174 
175 		*puiArgument0 = spuStatus.m_taskId;
176 		*puiArgument1 = spuStatus.m_status;
177 		return true;
178 	}
179 	return false;
180 }
181 
182 ///check for messages from SPUs
waitForResponse(int * puiArgument0,int * puiArgument1)183 void b3PosixThreadSupport::waitForResponse(int* puiArgument0, int* puiArgument1)
184 {
185 	///We should wait for (one of) the first tasks to finish (or other SPU messages), and report its response
186 
187 	///A possible response can be 'yes, SPU handled it', or 'no, please do a PPU fallback'
188 
189 	b3Assert(m_activeThreadStatus.size());
190 
191 	// wait for any of the threads to finish
192 	checkPThreadFunction(sem_wait(m_mainSemaphore));
193 
194 	// get at least one thread which has finished
195 	size_t last = -1;
196 
197 	for (size_t t = 0; t < size_t(m_activeThreadStatus.size()); ++t)
198 	{
199 		if (2 == m_activeThreadStatus[t].m_status)
200 		{
201 			last = t;
202 			break;
203 		}
204 	}
205 
206 	b3ThreadStatus& spuStatus = m_activeThreadStatus[last];
207 
208 	b3Assert(spuStatus.m_status > 1);
209 	spuStatus.m_status = 0;
210 
211 	// need to find an active spu
212 	b3Assert(last >= 0);
213 
214 	*puiArgument0 = spuStatus.m_taskId;
215 	*puiArgument1 = spuStatus.m_status;
216 }
217 
startThreads(ThreadConstructionInfo & threadConstructionInfo)218 void b3PosixThreadSupport::startThreads(ThreadConstructionInfo& threadConstructionInfo)
219 {
220 	printf("%s creating %i threads.\n", __FUNCTION__, threadConstructionInfo.m_numThreads);
221 	m_activeThreadStatus.resize(threadConstructionInfo.m_numThreads);
222 
223 	m_mainSemaphore = createSem("main");
224 	//checkPThreadFunction(sem_wait(mainSemaphore));
225 
226 	for (int i = 0; i < threadConstructionInfo.m_numThreads; i++)
227 	{
228 		printf("starting thread %d\n", i);
229 
230 		b3ThreadStatus& spuStatus = m_activeThreadStatus[i];
231 
232 		spuStatus.startSemaphore = createSem("threadLocal");
233 
234 		checkPThreadFunction(pthread_create(&spuStatus.thread, NULL, &threadFunction, (void*)&spuStatus));
235 
236 		spuStatus.m_userPtr = 0;
237 
238 		spuStatus.m_taskId = i;
239 		spuStatus.m_commandId = 0;
240 		spuStatus.m_status = 0;
241 		spuStatus.m_mainSemaphore = m_mainSemaphore;
242 		spuStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
243 		spuStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
244 		spuStatus.m_lsMemoryReleaseFunc = threadConstructionInfo.m_lsMemoryReleaseFunc;
245 		spuStatus.threadUsed = 0;
246 
247 		printf("started thread %d \n", i);
248 	}
249 }
250 
251 ///tell the task scheduler we are done with the SPU tasks
stopThreads()252 void b3PosixThreadSupport::stopThreads()
253 {
254 	for (size_t t = 0; t < size_t(m_activeThreadStatus.size()); ++t)
255 	{
256 		b3ThreadStatus& spuStatus = m_activeThreadStatus[t];
257 
258 		// printf("%s: Thread %i used: %ld\n", __FUNCTION__, int(t), spuStatus.threadUsed);
259 
260 		spuStatus.m_userPtr = 0;
261 		checkPThreadFunction(sem_post(spuStatus.startSemaphore));
262 		checkPThreadFunction(sem_wait(m_mainSemaphore));
263 
264 		printf("destroy semaphore\n");
265 		destroySem(spuStatus.startSemaphore);
266 		printf("semaphore destroyed\n");
267 		checkPThreadFunction(pthread_join(spuStatus.thread, 0));
268 
269 		if (spuStatus.m_lsMemoryReleaseFunc)
270 		{
271 			spuStatus.m_lsMemoryReleaseFunc(spuStatus.m_lsMemory);
272 		}
273 	}
274 	printf("destroy main semaphore\n");
275 	destroySem(m_mainSemaphore);
276 	printf("main semaphore destroyed\n");
277 	m_activeThreadStatus.clear();
278 }
279 
280 class b3PosixCriticalSection : public b3CriticalSection
281 {
282 	pthread_mutex_t m_mutex;
283 
284 public:
b3PosixCriticalSection()285 	b3PosixCriticalSection()
286 	{
287 		pthread_mutex_init(&m_mutex, NULL);
288 	}
~b3PosixCriticalSection()289 	virtual ~b3PosixCriticalSection()
290 	{
291 		pthread_mutex_destroy(&m_mutex);
292 	}
293 
294 	B3_ATTRIBUTE_ALIGNED16(unsigned int mCommonBuff[32]);
295 
getSharedParam(int i)296 	virtual unsigned int getSharedParam(int i)
297 	{
298 		if (i < 32)
299 		{
300 			return mCommonBuff[i];
301 		}
302 		else
303 		{
304 			b3Assert(0);
305 		}
306 		return 0;
307 	}
setSharedParam(int i,unsigned int p)308 	virtual void setSharedParam(int i, unsigned int p)
309 	{
310 		if (i < 32)
311 		{
312 			mCommonBuff[i] = p;
313 		}
314 		else
315 		{
316 			b3Assert(0);
317 		}
318 	}
319 
lock()320 	virtual void lock()
321 	{
322 		pthread_mutex_lock(&m_mutex);
323 	}
unlock()324 	virtual void unlock()
325 	{
326 		pthread_mutex_unlock(&m_mutex);
327 	}
328 };
329 
330 #if defined(_POSIX_BARRIERS) && (_POSIX_BARRIERS - 20012L) >= 0
331 /* OK to use barriers on this platform */
332 class b3PosixBarrier : public b3Barrier
333 {
334 	pthread_barrier_t m_barr;
335 	int m_numThreads;
336 
337 public:
b3PosixBarrier()338 	b3PosixBarrier()
339 		: m_numThreads(0) {}
~b3PosixBarrier()340 	virtual ~b3PosixBarrier()
341 	{
342 		pthread_barrier_destroy(&m_barr);
343 	}
344 
sync()345 	virtual void sync()
346 	{
347 		int rc = pthread_barrier_wait(&m_barr);
348 		if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD)
349 		{
350 			printf("Could not wait on barrier\n");
351 			exit(-1);
352 		}
353 	}
setMaxCount(int numThreads)354 	virtual void setMaxCount(int numThreads)
355 	{
356 		int result = pthread_barrier_init(&m_barr, NULL, numThreads);
357 		m_numThreads = numThreads;
358 		b3Assert(result == 0);
359 	}
getMaxCount()360 	virtual int getMaxCount()
361 	{
362 		return m_numThreads;
363 	}
364 };
365 #else
366 /* Not OK to use barriers on this platform - insert alternate code here */
367 class b3PosixBarrier : public b3Barrier
368 {
369 	pthread_mutex_t m_mutex;
370 	pthread_cond_t m_cond;
371 
372 	int m_numThreads;
373 	int m_called;
374 
375 public:
b3PosixBarrier()376 	b3PosixBarrier()
377 		: m_numThreads(0)
378 	{
379 	}
~b3PosixBarrier()380 	virtual ~b3PosixBarrier()
381 	{
382 		if (m_numThreads > 0)
383 		{
384 			pthread_mutex_destroy(&m_mutex);
385 			pthread_cond_destroy(&m_cond);
386 		}
387 	}
388 
sync()389 	virtual void sync()
390 	{
391 		pthread_mutex_lock(&m_mutex);
392 		m_called++;
393 		if (m_called == m_numThreads)
394 		{
395 			m_called = 0;
396 			pthread_cond_broadcast(&m_cond);
397 		}
398 		else
399 		{
400 			pthread_cond_wait(&m_cond, &m_mutex);
401 		}
402 		pthread_mutex_unlock(&m_mutex);
403 	}
setMaxCount(int numThreads)404 	virtual void setMaxCount(int numThreads)
405 	{
406 		if (m_numThreads > 0)
407 		{
408 			pthread_mutex_destroy(&m_mutex);
409 			pthread_cond_destroy(&m_cond);
410 		}
411 		m_called = 0;
412 		pthread_mutex_init(&m_mutex, NULL);
413 		pthread_cond_init(&m_cond, NULL);
414 		m_numThreads = numThreads;
415 	}
getMaxCount()416 	virtual int getMaxCount()
417 	{
418 		return m_numThreads;
419 	}
420 };
421 
422 #endif  //_POSIX_BARRIERS
423 
createBarrier()424 b3Barrier* b3PosixThreadSupport::createBarrier()
425 {
426 	b3PosixBarrier* barrier = new b3PosixBarrier();
427 	barrier->setMaxCount(getNumTasks());
428 	return barrier;
429 }
430 
createCriticalSection()431 b3CriticalSection* b3PosixThreadSupport::createCriticalSection()
432 {
433 	return new b3PosixCriticalSection();
434 }
435 
deleteBarrier(b3Barrier * barrier)436 void b3PosixThreadSupport::deleteBarrier(b3Barrier* barrier)
437 {
438 	delete barrier;
439 }
440 
deleteCriticalSection(b3CriticalSection * cs)441 void b3PosixThreadSupport::deleteCriticalSection(b3CriticalSection* cs)
442 {
443 	delete cs;
444 }
445 #endif  //_WIN32
446