1
2 /*
3 Bullet Continuous Collision Detection and Physics Library
4 Copyright (c) 2003-2018 Erwin Coumans http://bulletphysics.com
5
6 This software is provided 'as-is', without any express or implied warranty.
7 In no event will the authors be held liable for any damages arising from the use of this software.
8 Permission is granted to anyone to use this software for any purpose,
9 including commercial applications, and to alter it and redistribute it freely,
10 subject to the following restrictions:
11
12 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
13 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
14 3. This notice may not be removed or altered from any source distribution.
15 */
16
17 #if BT_THREADSAFE && !defined(_WIN32)
18
19 #include "LinearMath/btScalar.h"
20 #include "LinearMath/btAlignedObjectArray.h"
21 #include "LinearMath/btThreads.h"
22 #include "LinearMath/btMinMax.h"
23 #include "btThreadSupportInterface.h"
24
25 #include <stdio.h>
26 #include <errno.h>
27 #include <unistd.h>
28
29 #ifndef _XOPEN_SOURCE
30 #define _XOPEN_SOURCE 600 //for definition of pthread_barrier_t, see http://pages.cs.wisc.edu/~travitch/pthreads_primer.html
31 #endif //_XOPEN_SOURCE
32 #include <pthread.h>
33 #include <semaphore.h>
34 #include <unistd.h> //for sysconf
35
36 ///
/// btGetNumHardwareThreads()
38 ///
39 ///
40 /// https://stackoverflow.com/questions/150355/programmatically-find-the-number-of-cores-on-a-machine
41 ///
42 #if __cplusplus >= 201103L
43
44 #include <thread>
45
btGetNumHardwareThreads()46 int btGetNumHardwareThreads()
47 {
48 return btMax(1u, btMin(BT_MAX_THREAD_COUNT, std::thread::hardware_concurrency()));
49 }
50
51 #else
52
btGetNumHardwareThreads()53 int btGetNumHardwareThreads()
54 {
55 return btMax(1, btMin<int>(BT_MAX_THREAD_COUNT, sysconf(_SC_NPROCESSORS_ONLN)));
56 }
57
58 #endif
59
// btThreadSupportPosix: pthreads implementation of btThreadSupportInterface. It starts a pool of worker threads, hands tasks to them and waits for their completion using POSIX semaphores.
61 class btThreadSupportPosix : public btThreadSupportInterface
62 {
63 public:
64 struct btThreadStatus
65 {
66 int m_taskId;
67 int m_commandId;
68 int m_status;
69
70 ThreadFunc m_userThreadFunc;
71 void* m_userPtr; //for taskDesc etc
72
73 pthread_t thread;
74 //each tread will wait until this signal to start its work
75 sem_t* startSemaphore;
76 btCriticalSection* m_cs;
77 // this is a copy of m_mainSemaphore,
78 //each tread will signal once it is finished with its work
79 sem_t* m_mainSemaphore;
80 unsigned long threadUsed;
81 };
82
83 private:
84 typedef unsigned long long UINT64;
85
86 btAlignedObjectArray<btThreadStatus> m_activeThreadStatus;
87 // m_mainSemaphoresemaphore will signal, if and how many threads are finished with their work
88 sem_t* m_mainSemaphore;
89 int m_numThreads;
90 UINT64 m_startedThreadsMask;
91 void startThreads(const ConstructionInfo& threadInfo);
92 void stopThreads();
93 int waitForResponse();
94 btCriticalSection* m_cs;
95 public:
96 btThreadSupportPosix(const ConstructionInfo& threadConstructionInfo);
97 virtual ~btThreadSupportPosix();
98
getNumWorkerThreads() const99 virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; }
100 // TODO: return the number of logical processors sharing the first L3 cache
getCacheFriendlyNumThreads() const101 virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return m_numThreads + 1; }
102 // TODO: detect if CPU has hyperthreading enabled
getLogicalToPhysicalCoreRatio() const103 virtual int getLogicalToPhysicalCoreRatio() const BT_OVERRIDE { return 1; }
104
105 virtual void runTask(int threadIndex, void* userData) BT_OVERRIDE;
106 virtual void waitForAllTasks() BT_OVERRIDE;
107
108 virtual btCriticalSection* createCriticalSection() BT_OVERRIDE;
109 virtual void deleteCriticalSection(btCriticalSection* criticalSection) BT_OVERRIDE;
110 };
111
/// Reports (but does not otherwise handle) a non-zero result from a pthread/semaphore call.
/// The argument expression is evaluated exactly once.
/// errno is captured immediately afterwards, so a failing call is never re-executed just to
/// print its result. Both the returned value and errno are printed.
/// The body is wrapped in do { } while (0) so the macro acts as a single statement,
/// e.g. inside an unbraced if/else.
#define checkPThreadFunction(returnValue)                                                \
	do                                                                                   \
	{                                                                                    \
		const int btPThreadResult_ = (returnValue);                                      \
		const int btPThreadErrno_ = errno;                                               \
		if (0 != btPThreadResult_)                                                       \
		{                                                                                \
			printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__, \
				   btPThreadResult_, btPThreadErrno_);                                   \
		}                                                                                \
	} while (0)
117
// The number of worker threads is derived from the number of available hardware threads (see startThreads).
// TODO: pin each worker to a single core. SetThreadIdealProcessor is the Win32 API; a POSIX build would need an equivalent such as pthread_setaffinity_np.
120
btThreadSupportPosix(const ConstructionInfo & threadConstructionInfo)121 btThreadSupportPosix::btThreadSupportPosix(const ConstructionInfo& threadConstructionInfo)
122 {
123 m_cs = createCriticalSection();
124 startThreads(threadConstructionInfo);
125 }
126
// Stops and joins all worker threads, then releases the shared critical section.
~btThreadSupportPosix()128 btThreadSupportPosix::~btThreadSupportPosix()
129 {
130 stopThreads();
131 deleteCriticalSection(m_cs);
132 m_cs=0;
133 }
134
// On Apple platforms createSem()/destroySem() use named semaphores (sem_open/sem_close)
// instead of unnamed ones (sem_init/sem_destroy). Presumably this is because unnamed POSIX
// semaphores are not supported there -- confirm.
#ifdef __APPLE__
#define NAMED_SEMAPHORES
#endif
138
createSem(const char * baseName)139 static sem_t* createSem(const char* baseName)
140 {
141 static int semCount = 0;
142 #ifdef NAMED_SEMAPHORES
143 /// Named semaphore begin
144 char name[32];
145 snprintf(name, 32, "/%8.s-%4.d-%4.4d", baseName, getpid(), semCount++);
146 sem_t* tempSem = sem_open(name, O_CREAT, 0600, 0);
147
148 if (tempSem != reinterpret_cast<sem_t*>(SEM_FAILED))
149 {
150 // printf("Created \"%s\" Semaphore %p\n", name, tempSem);
151 }
152 else
153 {
154 //printf("Error creating Semaphore %d\n", errno);
155 exit(-1);
156 }
157 /// Named semaphore end
158 #else
159 sem_t* tempSem = new sem_t;
160 checkPThreadFunction(sem_init(tempSem, 0, 0));
161 #endif
162 return tempSem;
163 }
164
destroySem(sem_t * semaphore)165 static void destroySem(sem_t* semaphore)
166 {
167 #ifdef NAMED_SEMAPHORES
168 checkPThreadFunction(sem_close(semaphore));
169 #else
170 checkPThreadFunction(sem_destroy(semaphore));
171 delete semaphore;
172 #endif
173 }
174
threadFunction(void * argument)175 static void* threadFunction(void* argument)
176 {
177 btThreadSupportPosix::btThreadStatus* status = (btThreadSupportPosix::btThreadStatus*)argument;
178
179 while (1)
180 {
181 checkPThreadFunction(sem_wait(status->startSemaphore));
182 void* userPtr = status->m_userPtr;
183
184 if (userPtr)
185 {
186 btAssert(status->m_status);
187 status->m_userThreadFunc(userPtr);
188 status->m_cs->lock();
189 status->m_status = 2;
190 status->m_cs->unlock();
191 checkPThreadFunction(sem_post(status->m_mainSemaphore));
192 status->threadUsed++;
193 }
194 else
195 {
196 //exit Thread
197 status->m_cs->lock();
198 status->m_status = 3;
199 status->m_cs->unlock();
200 checkPThreadFunction(sem_post(status->m_mainSemaphore));
201 break;
202 }
203 }
204
205 return 0;
206 }
207
/// Dispatches a task to one worker thread.
runTask(int threadIndex,void * userData)209 void btThreadSupportPosix::runTask(int threadIndex, void* userData)
210 {
211 ///we should spawn an SPU task here, and in 'waitForResponse' it should wait for response of the (one of) the first tasks that finished
212 btThreadStatus& threadStatus = m_activeThreadStatus[threadIndex];
213 btAssert(threadIndex >= 0);
214 btAssert(threadIndex < m_activeThreadStatus.size());
215 threadStatus.m_cs = m_cs;
216 threadStatus.m_commandId = 1;
217 threadStatus.m_status = 1;
218 threadStatus.m_userPtr = userData;
219 m_startedThreadsMask |= UINT64(1) << threadIndex;
220
221 // fire event to start new task
222 checkPThreadFunction(sem_post(threadStatus.startSemaphore));
223 }
224
/// Waits for one dispatched task to finish and returns the index of its worker.
waitForResponse()226 int btThreadSupportPosix::waitForResponse()
227 {
228 ///We should wait for (one of) the first tasks to finish (or other SPU messages), and report its response
229 ///A possible response can be 'yes, SPU handled it', or 'no, please do a PPU fallback'
230
231 btAssert(m_activeThreadStatus.size());
232
233 // wait for any of the threads to finish
234 checkPThreadFunction(sem_wait(m_mainSemaphore));
235 // get at least one thread which has finished
236 size_t last = -1;
237
238 for (size_t t = 0; t < size_t(m_activeThreadStatus.size()); ++t)
239 {
240 m_cs->lock();
241 bool hasFinished = (2 == m_activeThreadStatus[t].m_status);
242 m_cs->unlock();
243 if (hasFinished)
244 {
245 last = t;
246 break;
247 }
248 }
249
250 btThreadStatus& threadStatus = m_activeThreadStatus[last];
251
252 btAssert(threadStatus.m_status > 1);
253 threadStatus.m_status = 0;
254
255 // need to find an active spu
256 btAssert(last >= 0);
257 m_startedThreadsMask &= ~(UINT64(1) << last);
258
259 return last;
260 }
261
waitForAllTasks()262 void btThreadSupportPosix::waitForAllTasks()
263 {
264 while (m_startedThreadsMask)
265 {
266 waitForResponse();
267 }
268 }
269
startThreads(const ConstructionInfo & threadConstructionInfo)270 void btThreadSupportPosix::startThreads(const ConstructionInfo& threadConstructionInfo)
271 {
272 m_numThreads = btGetNumHardwareThreads() - 1; // main thread exists already
273 m_activeThreadStatus.resize(m_numThreads);
274 m_startedThreadsMask = 0;
275
276 m_mainSemaphore = createSem("main");
277 //checkPThreadFunction(sem_wait(mainSemaphore));
278
279 for (int i = 0; i < m_numThreads; i++)
280 {
281 btThreadStatus& threadStatus = m_activeThreadStatus[i];
282 threadStatus.startSemaphore = createSem("threadLocal");
283 threadStatus.m_userPtr = 0;
284 threadStatus.m_cs = m_cs;
285 threadStatus.m_taskId = i;
286 threadStatus.m_commandId = 0;
287 threadStatus.m_status = 0;
288 threadStatus.m_mainSemaphore = m_mainSemaphore;
289 threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
290 threadStatus.threadUsed = 0;
291 checkPThreadFunction(pthread_create(&threadStatus.thread, NULL, &threadFunction, (void*)&threadStatus));
292
293 }
294 }
295
/// Shuts down all worker threads and releases their semaphores.
stopThreads()297 void btThreadSupportPosix::stopThreads()
298 {
299 for (size_t t = 0; t < size_t(m_activeThreadStatus.size()); ++t)
300 {
301 btThreadStatus& threadStatus = m_activeThreadStatus[t];
302
303 threadStatus.m_userPtr = 0;
304 checkPThreadFunction(sem_post(threadStatus.startSemaphore));
305 checkPThreadFunction(sem_wait(m_mainSemaphore));
306
307 checkPThreadFunction(pthread_join(threadStatus.thread, 0));
308 destroySem(threadStatus.startSemaphore);
309 }
310 destroySem(m_mainSemaphore);
311 m_activeThreadStatus.clear();
312 }
313
314 class btCriticalSectionPosix : public btCriticalSection
315 {
316 pthread_mutex_t m_mutex;
317
318 public:
btCriticalSectionPosix()319 btCriticalSectionPosix()
320 {
321 pthread_mutex_init(&m_mutex, NULL);
322 }
~btCriticalSectionPosix()323 virtual ~btCriticalSectionPosix()
324 {
325 pthread_mutex_destroy(&m_mutex);
326 }
327
lock()328 virtual void lock()
329 {
330 pthread_mutex_lock(&m_mutex);
331 }
unlock()332 virtual void unlock()
333 {
334 pthread_mutex_unlock(&m_mutex);
335 }
336 };
337
createCriticalSection()338 btCriticalSection* btThreadSupportPosix::createCriticalSection()
339 {
340 return new btCriticalSectionPosix();
341 }
342
deleteCriticalSection(btCriticalSection * cs)343 void btThreadSupportPosix::deleteCriticalSection(btCriticalSection* cs)
344 {
345 delete cs;
346 }
347
create(const ConstructionInfo & info)348 btThreadSupportInterface* btThreadSupportInterface::create(const ConstructionInfo& info)
349 {
350 return new btThreadSupportPosix(info);
351 }
352
353 #endif // BT_THREADSAFE && !defined( _WIN32 )
354