1 
2 /*
3 Bullet Continuous Collision Detection and Physics Library
4 Copyright (c) 2003-2018 Erwin Coumans  http://bulletphysics.com
5 
6 This software is provided 'as-is', without any express or implied warranty.
7 In no event will the authors be held liable for any damages arising from the use of this software.
8 Permission is granted to anyone to use this software for any purpose,
9 including commercial applications, and to alter it and redistribute it freely,
10 subject to the following restrictions:
11 
12 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
13 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
14 3. This notice may not be removed or altered from any source distribution.
15 */
16 
17 #if BT_THREADSAFE && !defined(_WIN32)
18 
19 #include "LinearMath/btScalar.h"
20 #include "LinearMath/btAlignedObjectArray.h"
21 #include "LinearMath/btThreads.h"
22 #include "LinearMath/btMinMax.h"
23 #include "btThreadSupportInterface.h"
24 
25 #include <stdio.h>
26 #include <errno.h>
27 #include <unistd.h>
28 
29 #ifndef _XOPEN_SOURCE
30 #define _XOPEN_SOURCE 600  //for definition of pthread_barrier_t, see http://pages.cs.wisc.edu/~travitch/pthreads_primer.html
31 #endif                     //_XOPEN_SOURCE
32 #include <pthread.h>
33 #include <semaphore.h>
34 #include <unistd.h>  //for sysconf
35 
36 ///
37 /// getNumHardwareThreads()
38 ///
39 ///
40 /// https://stackoverflow.com/questions/150355/programmatically-find-the-number-of-cores-on-a-machine
41 ///
42 #if __cplusplus >= 201103L
43 
44 #include <thread>
45 
btGetNumHardwareThreads()46 int btGetNumHardwareThreads()
47 {
48 	return btMax(1u, btMin(BT_MAX_THREAD_COUNT, std::thread::hardware_concurrency()));
49 }
50 
51 #else
52 
btGetNumHardwareThreads()53 int btGetNumHardwareThreads()
54 {
55 	return btMax(1, btMin<int>(BT_MAX_THREAD_COUNT, sysconf(_SC_NPROCESSORS_ONLN)));
56 }
57 
58 #endif
59 
60 // btThreadSupportPosix helps to initialize/shutdown libspe2, start/stop SPU tasks and communication
61 class btThreadSupportPosix : public btThreadSupportInterface
62 {
63 public:
64 	struct btThreadStatus
65 	{
66 		int m_taskId;
67 		int m_commandId;
68 		int m_status;
69 
70 		ThreadFunc m_userThreadFunc;
71 		void* m_userPtr;  //for taskDesc etc
72 
73 		pthread_t thread;
74 		//each tread will wait until this signal to start its work
75 		sem_t* startSemaphore;
76 		btCriticalSection* m_cs;
77 		// this is a copy of m_mainSemaphore,
78 		//each tread will signal once it is finished with its work
79 		sem_t* m_mainSemaphore;
80 		unsigned long threadUsed;
81 	};
82 
83 private:
84 	typedef unsigned long long UINT64;
85 
86 	btAlignedObjectArray<btThreadStatus> m_activeThreadStatus;
87 	// m_mainSemaphoresemaphore will signal, if and how many threads are finished with their work
88 	sem_t* m_mainSemaphore;
89 	int m_numThreads;
90 	UINT64 m_startedThreadsMask;
91 	void startThreads(const ConstructionInfo& threadInfo);
92 	void stopThreads();
93 	int waitForResponse();
94 	btCriticalSection* m_cs;
95 public:
96 	btThreadSupportPosix(const ConstructionInfo& threadConstructionInfo);
97 	virtual ~btThreadSupportPosix();
98 
getNumWorkerThreads() const99 	virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; }
100 	// TODO: return the number of logical processors sharing the first L3 cache
getCacheFriendlyNumThreads() const101 	virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return m_numThreads + 1; }
102 	// TODO: detect if CPU has hyperthreading enabled
getLogicalToPhysicalCoreRatio() const103 	virtual int getLogicalToPhysicalCoreRatio() const BT_OVERRIDE { return 1; }
104 
105 	virtual void runTask(int threadIndex, void* userData) BT_OVERRIDE;
106 	virtual void waitForAllTasks() BT_OVERRIDE;
107 
108 	virtual btCriticalSection* createCriticalSection() BT_OVERRIDE;
109 	virtual void deleteCriticalSection(btCriticalSection* criticalSection) BT_OVERRIDE;
110 };
111 
// Runs a pthread_* / sem_* call and prints a diagnostic on stdout when it fails.
//
// - The argument is evaluated exactly once per attempt. The previous form expanded
//   it a second time inside the printf() argument list, which silently re-executed
//   a failing call and reported the result of that second execution.
// - A call that returns -1 with errno == EINTR (a sem_wait() interrupted by a
//   signal handler) is repeated until it is no longer interrupted, so callers never
//   continue as if the semaphore had been acquired. pthread_* functions report
//   failures through positive return values and are therefore never retried.
// - The do { } while (0) wrapper makes the macro a single statement, so it can be
//   used safely as the body of an if/else.
#define checkPThreadFunction(returnValue)                                                                              \
	do                                                                                                                 \
	{                                                                                                                  \
		int btPThreadResult_;                                                                                          \
		do                                                                                                             \
		{                                                                                                              \
			btPThreadResult_ = (returnValue);                                                                          \
		} while (-1 == btPThreadResult_ && EINTR == errno);                                                            \
		if (0 != btPThreadResult_)                                                                                     \
		{                                                                                                              \
			printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__, btPThreadResult_, errno);     \
		}                                                                                                              \
	} while (0)
117 
118 // The number of threads should be equal to the number of available cores
119 // Todo: each worker should be linked to a single core, using SetThreadIdealProcessor.
120 
btThreadSupportPosix(const ConstructionInfo & threadConstructionInfo)121 btThreadSupportPosix::btThreadSupportPosix(const ConstructionInfo& threadConstructionInfo)
122 {
123 	m_cs = createCriticalSection();
124 	startThreads(threadConstructionInfo);
125 }
126 
127 // cleanup/shutdown Libspe2
~btThreadSupportPosix()128 btThreadSupportPosix::~btThreadSupportPosix()
129 {
130 	stopThreads();
131 	deleteCriticalSection(m_cs);
132 	m_cs=0;
133 }
134 
135 #if (defined(__APPLE__))
136 #define NAMED_SEMAPHORES
137 #endif
138 
createSem(const char * baseName)139 static sem_t* createSem(const char* baseName)
140 {
141 	static int semCount = 0;
142 #ifdef NAMED_SEMAPHORES
143 	/// Named semaphore begin
144 	char name[32];
145 	snprintf(name, 32, "/%8.s-%4.d-%4.4d", baseName, getpid(), semCount++);
146 	sem_t* tempSem = sem_open(name, O_CREAT, 0600, 0);
147 
148 	if (tempSem != reinterpret_cast<sem_t*>(SEM_FAILED))
149 	{
150 		//        printf("Created \"%s\" Semaphore %p\n", name, tempSem);
151 	}
152 	else
153 	{
154 		//printf("Error creating Semaphore %d\n", errno);
155 		exit(-1);
156 	}
157 	/// Named semaphore end
158 #else
159 	sem_t* tempSem = new sem_t;
160 	checkPThreadFunction(sem_init(tempSem, 0, 0));
161 #endif
162 	return tempSem;
163 }
164 
destroySem(sem_t * semaphore)165 static void destroySem(sem_t* semaphore)
166 {
167 #ifdef NAMED_SEMAPHORES
168 	checkPThreadFunction(sem_close(semaphore));
169 #else
170 	checkPThreadFunction(sem_destroy(semaphore));
171 	delete semaphore;
172 #endif
173 }
174 
threadFunction(void * argument)175 static void* threadFunction(void* argument)
176 {
177 	btThreadSupportPosix::btThreadStatus* status = (btThreadSupportPosix::btThreadStatus*)argument;
178 
179 	while (1)
180 	{
181 		checkPThreadFunction(sem_wait(status->startSemaphore));
182 		void* userPtr = status->m_userPtr;
183 
184 		if (userPtr)
185 		{
186 			btAssert(status->m_status);
187 			status->m_userThreadFunc(userPtr);
188 			status->m_cs->lock();
189 			status->m_status = 2;
190 			status->m_cs->unlock();
191 			checkPThreadFunction(sem_post(status->m_mainSemaphore));
192 			status->threadUsed++;
193 		}
194 		else
195 		{
196 			//exit Thread
197 			status->m_cs->lock();
198 			status->m_status = 3;
199 			status->m_cs->unlock();
200 			checkPThreadFunction(sem_post(status->m_mainSemaphore));
201 			break;
202 		}
203 	}
204 
205 	return 0;
206 }
207 
208 ///send messages to SPUs
runTask(int threadIndex,void * userData)209 void btThreadSupportPosix::runTask(int threadIndex, void* userData)
210 {
211 	///we should spawn an SPU task here, and in 'waitForResponse' it should wait for response of the (one of) the first tasks that finished
212 	btThreadStatus& threadStatus = m_activeThreadStatus[threadIndex];
213 	btAssert(threadIndex >= 0);
214 	btAssert(threadIndex < m_activeThreadStatus.size());
215 	threadStatus.m_cs = m_cs;
216 	threadStatus.m_commandId = 1;
217 	threadStatus.m_status = 1;
218 	threadStatus.m_userPtr = userData;
219 	m_startedThreadsMask |= UINT64(1) << threadIndex;
220 
221 	// fire event to start new task
222 	checkPThreadFunction(sem_post(threadStatus.startSemaphore));
223 }
224 
225 ///check for messages from SPUs
waitForResponse()226 int btThreadSupportPosix::waitForResponse()
227 {
228 	///We should wait for (one of) the first tasks to finish (or other SPU messages), and report its response
229 	///A possible response can be 'yes, SPU handled it', or 'no, please do a PPU fallback'
230 
231 	btAssert(m_activeThreadStatus.size());
232 
233 	// wait for any of the threads to finish
234 	checkPThreadFunction(sem_wait(m_mainSemaphore));
235 	// get at least one thread which has finished
236 	size_t last = -1;
237 
238 	for (size_t t = 0; t < size_t(m_activeThreadStatus.size()); ++t)
239 	{
240 		m_cs->lock();
241 		bool hasFinished = (2 == m_activeThreadStatus[t].m_status);
242 		m_cs->unlock();
243 		if (hasFinished)
244 		{
245 			last = t;
246 			break;
247 		}
248 	}
249 
250 	btThreadStatus& threadStatus = m_activeThreadStatus[last];
251 
252 	btAssert(threadStatus.m_status > 1);
253 	threadStatus.m_status = 0;
254 
255 	// need to find an active spu
256 	btAssert(last >= 0);
257 	m_startedThreadsMask &= ~(UINT64(1) << last);
258 
259 	return last;
260 }
261 
waitForAllTasks()262 void btThreadSupportPosix::waitForAllTasks()
263 {
264 	while (m_startedThreadsMask)
265 	{
266 		waitForResponse();
267 	}
268 }
269 
startThreads(const ConstructionInfo & threadConstructionInfo)270 void btThreadSupportPosix::startThreads(const ConstructionInfo& threadConstructionInfo)
271 {
272 	m_numThreads = btGetNumHardwareThreads() - 1;  // main thread exists already
273 	m_activeThreadStatus.resize(m_numThreads);
274 	m_startedThreadsMask = 0;
275 
276 	m_mainSemaphore = createSem("main");
277 	//checkPThreadFunction(sem_wait(mainSemaphore));
278 
279 	for (int i = 0; i < m_numThreads; i++)
280 	{
281 		btThreadStatus& threadStatus = m_activeThreadStatus[i];
282 		threadStatus.startSemaphore = createSem("threadLocal");
283 		threadStatus.m_userPtr = 0;
284 		threadStatus.m_cs = m_cs;
285 		threadStatus.m_taskId = i;
286 		threadStatus.m_commandId = 0;
287 		threadStatus.m_status = 0;
288 		threadStatus.m_mainSemaphore = m_mainSemaphore;
289 		threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
290 		threadStatus.threadUsed = 0;
291 		checkPThreadFunction(pthread_create(&threadStatus.thread, NULL, &threadFunction, (void*)&threadStatus));
292 
293 	}
294 }
295 
296 ///tell the task scheduler we are done with the SPU tasks
stopThreads()297 void btThreadSupportPosix::stopThreads()
298 {
299 	for (size_t t = 0; t < size_t(m_activeThreadStatus.size()); ++t)
300 	{
301 		btThreadStatus& threadStatus = m_activeThreadStatus[t];
302 
303 		threadStatus.m_userPtr = 0;
304 		checkPThreadFunction(sem_post(threadStatus.startSemaphore));
305 		checkPThreadFunction(sem_wait(m_mainSemaphore));
306 
307 		checkPThreadFunction(pthread_join(threadStatus.thread, 0));
308 		destroySem(threadStatus.startSemaphore);
309 	}
310 	destroySem(m_mainSemaphore);
311 	m_activeThreadStatus.clear();
312 }
313 
314 class btCriticalSectionPosix : public btCriticalSection
315 {
316 	pthread_mutex_t m_mutex;
317 
318 public:
btCriticalSectionPosix()319 	btCriticalSectionPosix()
320 	{
321 		pthread_mutex_init(&m_mutex, NULL);
322 	}
~btCriticalSectionPosix()323 	virtual ~btCriticalSectionPosix()
324 	{
325 		pthread_mutex_destroy(&m_mutex);
326 	}
327 
lock()328 	virtual void lock()
329 	{
330 		pthread_mutex_lock(&m_mutex);
331 	}
unlock()332 	virtual void unlock()
333 	{
334 		pthread_mutex_unlock(&m_mutex);
335 	}
336 };
337 
createCriticalSection()338 btCriticalSection* btThreadSupportPosix::createCriticalSection()
339 {
340 	return new btCriticalSectionPosix();
341 }
342 
deleteCriticalSection(btCriticalSection * cs)343 void btThreadSupportPosix::deleteCriticalSection(btCriticalSection* cs)
344 {
345 	delete cs;
346 }
347 
create(const ConstructionInfo & info)348 btThreadSupportInterface* btThreadSupportInterface::create(const ConstructionInfo& info)
349 {
350 	return new btThreadSupportPosix(info);
351 }
352 
353 #endif  // BT_THREADSAFE && !defined( _WIN32 )
354