1 #ifndef _WIN32
2 /*
3 Bullet Continuous Collision Detection and Physics Library
4 Copyright (c) 2003-2007 Erwin Coumans http://bulletphysics.com
5
6 This software is provided 'as-is', without any express or implied warranty.
7 In no event will the authors be held liable for any damages arising from the use of this software.
8 Permission is granted to anyone to use this software for any purpose,
9 including commercial applications, and to alter it and redistribute it freely,
10 subject to the following restrictions:
11
12 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
13 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
14 3. This notice may not be removed or altered from any source distribution.
15 */
16
17 #include <stdio.h>
18 #include "b3PosixThreadSupport.h"
19 #include <errno.h>
20 #include <unistd.h>
21
// Logs a failed pthread/semaphore call to stdout (line, file, return value, errno).
//
// The argument is evaluated exactly once and its value is cached, so it is safe
// to pass the call expression itself, e.g. checkPThreadFunction(sem_wait(s)).
// The do { } while (0) wrapper makes the macro expand to a single statement, so
// it can be used in an unbraced if/else and always requires a trailing ';'.
#define checkPThreadFunction(returnValue)                                                    \
    do                                                                                       \
    {                                                                                        \
        const int checkedPThreadResult_ = (returnValue);                                     \
        if (0 != checkedPThreadResult_)                                                      \
        {                                                                                    \
            printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__,     \
                   checkedPThreadResult_, errno);                                            \
        }                                                                                    \
    } while (0)
27
// The number of threads should be equal to the number of available cores.
// TODO: pin each worker to a single core. SetThreadIdealProcessor is the Win32 call for this;
// a POSIX build would need a platform-specific equivalent such as pthread_setaffinity_np.
30
b3PosixThreadSupport(ThreadConstructionInfo & threadConstructionInfo)31 b3PosixThreadSupport::b3PosixThreadSupport(ThreadConstructionInfo& threadConstructionInfo)
32 {
33 startThreads(threadConstructionInfo);
34 }
35
36 // cleanup/shutdown Libspe2
~b3PosixThreadSupport()37 b3PosixThreadSupport::~b3PosixThreadSupport()
38 {
39 stopThreads();
40 }
41
// Apple builds select the named-semaphore code path (sem_open/sem_close) in
// createSem()/destroySem(); all other builds use sem_init/sem_destroy.
#ifdef __APPLE__
#define NAMED_SEMAPHORES
#endif
45
createSem(const char * baseName)46 static sem_t* createSem(const char* baseName)
47 {
48 static int semCount = 0;
49 #ifdef NAMED_SEMAPHORES
50 /// Named semaphore begin
51 char name[32];
52 snprintf(name, 32, "/%8.s-%4.d-%4.4d", baseName, getpid(), semCount++);
53 sem_t* tempSem = sem_open(name, O_CREAT, 0600, 0);
54
55 if (tempSem != reinterpret_cast<sem_t*>(SEM_FAILED))
56 {
57 // printf("Created \"%s\" Semaphore %p\n", name, tempSem);
58 }
59 else
60 {
61 //printf("Error creating Semaphore %d\n", errno);
62 exit(-1);
63 }
64 /// Named semaphore end
65 #else
66 sem_t* tempSem = new sem_t;
67 checkPThreadFunction(sem_init(tempSem, 0, 0));
68 #endif
69 return tempSem;
70 }
71
destroySem(sem_t * semaphore)72 static void destroySem(sem_t* semaphore)
73 {
74 #ifdef NAMED_SEMAPHORES
75 checkPThreadFunction(sem_close(semaphore));
76 #else
77 checkPThreadFunction(sem_destroy(semaphore));
78 delete semaphore;
79 #endif
80 }
81
threadFunction(void * argument)82 static void* threadFunction(void* argument)
83 {
84 b3PosixThreadSupport::b3ThreadStatus* status = (b3PosixThreadSupport::b3ThreadStatus*)argument;
85
86 while (1)
87 {
88 checkPThreadFunction(sem_wait(status->startSemaphore));
89
90 void* userPtr = status->m_userPtr;
91
92 if (userPtr)
93 {
94 b3Assert(status->m_status);
95 status->m_userThreadFunc(userPtr, status->m_lsMemory);
96 status->m_status = 2;
97 checkPThreadFunction(sem_post(status->m_mainSemaphore));
98 status->threadUsed++;
99 }
100 else
101 {
102 //exit Thread
103 status->m_status = 3;
104 checkPThreadFunction(sem_post(status->m_mainSemaphore));
105 printf("Thread with taskId %i exiting\n", status->m_taskId);
106 break;
107 }
108 }
109
110 printf("Thread TERMINATED\n");
111 return 0;
112 }
113
114 ///send messages to SPUs
runTask(int uiCommand,void * uiArgument0,int taskId)115 void b3PosixThreadSupport::runTask(int uiCommand, void* uiArgument0, int taskId)
116 {
117 /// gMidphaseSPU.sendRequest(CMD_GATHER_AND_PROCESS_PAIRLIST, (int) &taskDesc);
118
119 ///we should spawn an SPU task here, and in 'waitForResponse' it should wait for response of the (one of) the first tasks that finished
120
121 switch (uiCommand)
122 {
123 case B3_THREAD_SCHEDULE_TASK:
124 {
125 b3ThreadStatus& spuStatus = m_activeThreadStatus[taskId];
126 b3Assert(taskId >= 0);
127 b3Assert(taskId < m_activeThreadStatus.size());
128
129 spuStatus.m_commandId = uiCommand;
130 spuStatus.m_status = 1;
131 spuStatus.m_userPtr = (void*)uiArgument0;
132
133 // fire event to start new task
134 checkPThreadFunction(sem_post(spuStatus.startSemaphore));
135 break;
136 }
137 default:
138 {
139 ///not implemented
140 b3Assert(0);
141 }
142 };
143 }
144
145 ///non-blocking test if a task is completed. First implement all versions, and then enable this API
isTaskCompleted(int * puiArgument0,int * puiArgument1,int timeOutInMilliseconds)146 bool b3PosixThreadSupport::isTaskCompleted(int* puiArgument0, int* puiArgument1, int timeOutInMilliseconds)
147 {
148 b3Assert(m_activeThreadStatus.size());
149
150 // wait for any of the threads to finish
151 int result = sem_trywait(m_mainSemaphore);
152 if (result == 0)
153 {
154 // get at least one thread which has finished
155 int last = -1;
156 int status = -1;
157 for (int t = 0; t < int(m_activeThreadStatus.size()); ++t)
158 {
159 status = m_activeThreadStatus[t].m_status;
160 if (2 == m_activeThreadStatus[t].m_status)
161 {
162 last = t;
163 break;
164 }
165 }
166
167 b3ThreadStatus& spuStatus = m_activeThreadStatus[last];
168
169 b3Assert(spuStatus.m_status > 1);
170 spuStatus.m_status = 0;
171
172 // need to find an active spu
173 b3Assert(last >= 0);
174
175 *puiArgument0 = spuStatus.m_taskId;
176 *puiArgument1 = spuStatus.m_status;
177 return true;
178 }
179 return false;
180 }
181
182 ///check for messages from SPUs
waitForResponse(int * puiArgument0,int * puiArgument1)183 void b3PosixThreadSupport::waitForResponse(int* puiArgument0, int* puiArgument1)
184 {
185 ///We should wait for (one of) the first tasks to finish (or other SPU messages), and report its response
186
187 ///A possible response can be 'yes, SPU handled it', or 'no, please do a PPU fallback'
188
189 b3Assert(m_activeThreadStatus.size());
190
191 // wait for any of the threads to finish
192 checkPThreadFunction(sem_wait(m_mainSemaphore));
193
194 // get at least one thread which has finished
195 size_t last = -1;
196
197 for (size_t t = 0; t < size_t(m_activeThreadStatus.size()); ++t)
198 {
199 if (2 == m_activeThreadStatus[t].m_status)
200 {
201 last = t;
202 break;
203 }
204 }
205
206 b3ThreadStatus& spuStatus = m_activeThreadStatus[last];
207
208 b3Assert(spuStatus.m_status > 1);
209 spuStatus.m_status = 0;
210
211 // need to find an active spu
212 b3Assert(last >= 0);
213
214 *puiArgument0 = spuStatus.m_taskId;
215 *puiArgument1 = spuStatus.m_status;
216 }
217
startThreads(ThreadConstructionInfo & threadConstructionInfo)218 void b3PosixThreadSupport::startThreads(ThreadConstructionInfo& threadConstructionInfo)
219 {
220 printf("%s creating %i threads.\n", __FUNCTION__, threadConstructionInfo.m_numThreads);
221 m_activeThreadStatus.resize(threadConstructionInfo.m_numThreads);
222
223 m_mainSemaphore = createSem("main");
224 //checkPThreadFunction(sem_wait(mainSemaphore));
225
226 for (int i = 0; i < threadConstructionInfo.m_numThreads; i++)
227 {
228 printf("starting thread %d\n", i);
229
230 b3ThreadStatus& spuStatus = m_activeThreadStatus[i];
231
232 spuStatus.startSemaphore = createSem("threadLocal");
233
234 checkPThreadFunction(pthread_create(&spuStatus.thread, NULL, &threadFunction, (void*)&spuStatus));
235
236 spuStatus.m_userPtr = 0;
237
238 spuStatus.m_taskId = i;
239 spuStatus.m_commandId = 0;
240 spuStatus.m_status = 0;
241 spuStatus.m_mainSemaphore = m_mainSemaphore;
242 spuStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
243 spuStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
244 spuStatus.m_lsMemoryReleaseFunc = threadConstructionInfo.m_lsMemoryReleaseFunc;
245 spuStatus.threadUsed = 0;
246
247 printf("started thread %d \n", i);
248 }
249 }
250
251 ///tell the task scheduler we are done with the SPU tasks
stopThreads()252 void b3PosixThreadSupport::stopThreads()
253 {
254 for (size_t t = 0; t < size_t(m_activeThreadStatus.size()); ++t)
255 {
256 b3ThreadStatus& spuStatus = m_activeThreadStatus[t];
257
258 // printf("%s: Thread %i used: %ld\n", __FUNCTION__, int(t), spuStatus.threadUsed);
259
260 spuStatus.m_userPtr = 0;
261 checkPThreadFunction(sem_post(spuStatus.startSemaphore));
262 checkPThreadFunction(sem_wait(m_mainSemaphore));
263
264 printf("destroy semaphore\n");
265 destroySem(spuStatus.startSemaphore);
266 printf("semaphore destroyed\n");
267 checkPThreadFunction(pthread_join(spuStatus.thread, 0));
268
269 if (spuStatus.m_lsMemoryReleaseFunc)
270 {
271 spuStatus.m_lsMemoryReleaseFunc(spuStatus.m_lsMemory);
272 }
273 }
274 printf("destroy main semaphore\n");
275 destroySem(m_mainSemaphore);
276 printf("main semaphore destroyed\n");
277 m_activeThreadStatus.clear();
278 }
279
280 class b3PosixCriticalSection : public b3CriticalSection
281 {
282 pthread_mutex_t m_mutex;
283
284 public:
b3PosixCriticalSection()285 b3PosixCriticalSection()
286 {
287 pthread_mutex_init(&m_mutex, NULL);
288 }
~b3PosixCriticalSection()289 virtual ~b3PosixCriticalSection()
290 {
291 pthread_mutex_destroy(&m_mutex);
292 }
293
294 B3_ATTRIBUTE_ALIGNED16(unsigned int mCommonBuff[32]);
295
getSharedParam(int i)296 virtual unsigned int getSharedParam(int i)
297 {
298 if (i < 32)
299 {
300 return mCommonBuff[i];
301 }
302 else
303 {
304 b3Assert(0);
305 }
306 return 0;
307 }
setSharedParam(int i,unsigned int p)308 virtual void setSharedParam(int i, unsigned int p)
309 {
310 if (i < 32)
311 {
312 mCommonBuff[i] = p;
313 }
314 else
315 {
316 b3Assert(0);
317 }
318 }
319
lock()320 virtual void lock()
321 {
322 pthread_mutex_lock(&m_mutex);
323 }
unlock()324 virtual void unlock()
325 {
326 pthread_mutex_unlock(&m_mutex);
327 }
328 };
329
330 #if defined(_POSIX_BARRIERS) && (_POSIX_BARRIERS - 20012L) >= 0
331 /* OK to use barriers on this platform */
332 class b3PosixBarrier : public b3Barrier
333 {
334 pthread_barrier_t m_barr;
335 int m_numThreads;
336
337 public:
b3PosixBarrier()338 b3PosixBarrier()
339 : m_numThreads(0) {}
~b3PosixBarrier()340 virtual ~b3PosixBarrier()
341 {
342 pthread_barrier_destroy(&m_barr);
343 }
344
sync()345 virtual void sync()
346 {
347 int rc = pthread_barrier_wait(&m_barr);
348 if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD)
349 {
350 printf("Could not wait on barrier\n");
351 exit(-1);
352 }
353 }
setMaxCount(int numThreads)354 virtual void setMaxCount(int numThreads)
355 {
356 int result = pthread_barrier_init(&m_barr, NULL, numThreads);
357 m_numThreads = numThreads;
358 b3Assert(result == 0);
359 }
getMaxCount()360 virtual int getMaxCount()
361 {
362 return m_numThreads;
363 }
364 };
365 #else
366 /* Not OK to use barriers on this platform - insert alternate code here */
367 class b3PosixBarrier : public b3Barrier
368 {
369 pthread_mutex_t m_mutex;
370 pthread_cond_t m_cond;
371
372 int m_numThreads;
373 int m_called;
374
375 public:
b3PosixBarrier()376 b3PosixBarrier()
377 : m_numThreads(0)
378 {
379 }
~b3PosixBarrier()380 virtual ~b3PosixBarrier()
381 {
382 if (m_numThreads > 0)
383 {
384 pthread_mutex_destroy(&m_mutex);
385 pthread_cond_destroy(&m_cond);
386 }
387 }
388
sync()389 virtual void sync()
390 {
391 pthread_mutex_lock(&m_mutex);
392 m_called++;
393 if (m_called == m_numThreads)
394 {
395 m_called = 0;
396 pthread_cond_broadcast(&m_cond);
397 }
398 else
399 {
400 pthread_cond_wait(&m_cond, &m_mutex);
401 }
402 pthread_mutex_unlock(&m_mutex);
403 }
setMaxCount(int numThreads)404 virtual void setMaxCount(int numThreads)
405 {
406 if (m_numThreads > 0)
407 {
408 pthread_mutex_destroy(&m_mutex);
409 pthread_cond_destroy(&m_cond);
410 }
411 m_called = 0;
412 pthread_mutex_init(&m_mutex, NULL);
413 pthread_cond_init(&m_cond, NULL);
414 m_numThreads = numThreads;
415 }
getMaxCount()416 virtual int getMaxCount()
417 {
418 return m_numThreads;
419 }
420 };
421
422 #endif //_POSIX_BARRIERS
423
createBarrier()424 b3Barrier* b3PosixThreadSupport::createBarrier()
425 {
426 b3PosixBarrier* barrier = new b3PosixBarrier();
427 barrier->setMaxCount(getNumTasks());
428 return barrier;
429 }
430
createCriticalSection()431 b3CriticalSection* b3PosixThreadSupport::createCriticalSection()
432 {
433 return new b3PosixCriticalSection();
434 }
435
deleteBarrier(b3Barrier * barrier)436 void b3PosixThreadSupport::deleteBarrier(b3Barrier* barrier)
437 {
438 delete barrier;
439 }
440
deleteCriticalSection(b3CriticalSection * cs)441 void b3PosixThreadSupport::deleteCriticalSection(b3CriticalSection* cs)
442 {
443 delete cs;
444 }
445 #endif //_WIN32
446