1 /*
2  *  ThreadPool.cpp
3  *  OpenLieroX
4  *
5  *  Created by Albert Zeyer on 08.02.09.
6  *  code under LGPL
7  *
8  */
9 
10 #include <SDL_thread.h>
11 #include "ThreadPool.h"
12 #include "Debug.h"
13 #include "AuxLib.h"
14 #include "ReadWriteLock.h" // for ScopedLock
15 #include "Command.h"
16 
17 
18 
ThreadPool(unsigned int size)19 ThreadPool::ThreadPool(unsigned int size) {
20 	nextAction = NULL; nextIsHeadless = false; nextData = NULL;
21 	quitting = false;
22 	mutex = SDL_CreateMutex();
23 	awakeThread = SDL_CreateCond();
24 	threadStartedWork = SDL_CreateCond();
25 	threadStatusChanged = SDL_CreateCond();
26 	startMutex = SDL_CreateMutex();
27 
28 	notes << "ThreadPool: creating " << size << " threads ..." << endl;
29 	while(availableThreads.size() < size)
30 		prepareNewThread();
31 }
32 
~ThreadPool()33 ThreadPool::~ThreadPool() {
34 	waitAll();
35 
36 	// this is the hint for all available threads to break
37 	SDL_mutexP(mutex); // lock to be sure that every thread is outside that region, we could get crashes otherwise
38 	nextAction = NULL;
39 	quitting = true;
40 	SDL_CondBroadcast(awakeThread);
41 	for(std::set<ThreadPoolItem*>::iterator i = availableThreads.begin(); i != availableThreads.end(); ++i) {
42 		SDL_mutexV(mutex);
43 		SDL_WaitThread((*i)->thread, NULL);
44 		SDL_mutexP(mutex);
45 		SDL_DestroyCond((*i)->finishedSignal);
46 		SDL_DestroyCond((*i)->readyForNewWork);
47 		delete *i;
48 	}
49 	availableThreads.clear();
50 	SDL_mutexV(mutex);
51 
52 	SDL_DestroyMutex(startMutex);
53 	SDL_DestroyCond(threadStartedWork);
54 	SDL_DestroyCond(threadStatusChanged);
55 	SDL_DestroyCond(awakeThread);
56 	SDL_DestroyMutex(mutex);
57 }
58 
prepareNewThread()59 void ThreadPool::prepareNewThread() {
60 	ThreadPoolItem* t = new ThreadPoolItem();
61 	t->pool = this;
62 	t->finishedSignal = SDL_CreateCond();
63 	t->readyForNewWork = SDL_CreateCond();
64 	t->finished = false;
65 	t->working = false;
66 	availableThreads.insert(t);
67 	t->thread = SDL_CreateThread(threadWrapper, t);
68 }
69 
threadWrapper(void * param)70 int ThreadPool::threadWrapper(void* param) {
71 	ThreadPoolItem* data = (ThreadPoolItem*)param;
72 
73 	SDL_mutexP(data->pool->mutex);
74 	while(true) {
75 		while(data->pool->nextAction == NULL && !data->pool->quitting)
76 			SDL_CondWait(data->pool->awakeThread, data->pool->mutex);
77 		if(data->pool->quitting) break;
78 		data->pool->usedThreads.insert(data);
79 		data->pool->availableThreads.erase(data);
80 
81 		Action* act = data->pool->nextAction; data->pool->nextAction = NULL;
82 		data->headless = data->pool->nextIsHeadless;
83 		data->name = data->pool->nextName;
84 		data->finished = false;
85 		data->working = true;
86 		data->pool->nextData = data;
87 		SDL_mutexV(data->pool->mutex);
88 
89 		SDL_CondSignal(data->pool->threadStartedWork);
90 		setCurThreadName(data->name);
91 		data->ret = act->handle();
92 		delete act;
93 		setCurThreadName(data->name + " [finished]");
94 		SDL_mutexP(data->pool->mutex);
95 		data->finished = true;
96 		SDL_CondSignal(data->pool->threadStatusChanged);
97 
98 		if(!data->headless) { // headless means that we just can clean it up right now without waiting
99 			SDL_CondSignal(data->finishedSignal);
100 			while(data->working) SDL_CondWait(data->readyForNewWork, data->pool->mutex);
101 		} else
102 			data->working = false;
103 		data->pool->usedThreads.erase(data);
104 		data->pool->availableThreads.insert(data);
105 		SDL_CondSignal(data->pool->threadStatusChanged);
106 		setCurThreadName("");
107 	}
108 
109 	SDL_mutexV(data->pool->mutex);
110 
111 	return 0;
112 }
113 
start(Action * act,const std::string & name,bool headless)114 ThreadPoolItem* ThreadPool::start(Action* act, const std::string& name, bool headless) {
115 	SDL_mutexP(startMutex); // If start() method will be called from different threads without mutex, hard-to-find crashes will occur
116 	SDL_mutexP(mutex);
117 	if(availableThreads.size() == 0) {
118 		warnings << "no available thread in ThreadPool for " << name << ", creating new one..." << endl;
119 		prepareNewThread();
120 	}
121 	assert(nextAction == NULL);
122 	assert(nextData == NULL);
123 	nextAction = act;
124 	nextIsHeadless = headless;
125 	nextName = name;
126 
127 	SDL_CondSignal(awakeThread);
128 	while(nextData == NULL) SDL_CondWait(threadStartedWork, mutex);
129 	ThreadPoolItem* data = nextData; nextData = NULL;
130 	SDL_mutexV(mutex);
131 
132 	SDL_mutexV(startMutex);
133 	return data;
134 }
135 
start(ThreadFunc fct,void * param,const std::string & name)136 ThreadPoolItem* ThreadPool::start(ThreadFunc fct, void* param, const std::string& name) {
137 	struct StaticAction : Action {
138 		ThreadFunc fct; void* param;
139 		int handle() { return (*fct) (param); }
140 	};
141 	StaticAction* act = new StaticAction();
142 	act->fct = fct;
143 	act->param = param;
144 	ThreadPoolItem* item = start(act, name);
145 	if(item) return item;
146 	delete act;
147 	return NULL;
148 }
149 
wait(ThreadPoolItem * thread,int * status)150 bool ThreadPool::wait(ThreadPoolItem* thread, int* status) {
151 	if(!thread) return false;
152 	SDL_mutexP(mutex);
153 	if(!thread->working) {
154 		warnings << "given thread " << thread->name << " is not working anymore" << endl;
155 		SDL_mutexV(mutex);
156 		return false;
157 	}
158 	while(!thread->finished) SDL_CondWait(thread->finishedSignal, mutex);
159 	if(status) *status = thread->ret;
160 	thread->working = false;
161 	SDL_mutexV(mutex);
162 
163 	SDL_CondSignal(thread->readyForNewWork);
164 	return true;
165 }
166 
waitAll()167 bool ThreadPool::waitAll() {
168 	SDL_mutexP(mutex);
169 	while(usedThreads.size() > 0) {
170 		warnings << "ThreadPool: waiting for " << usedThreads.size() << " threads to finish:" << endl;
171 		for(std::set<ThreadPoolItem*>::iterator i = usedThreads.begin(); i != usedThreads.end(); ++i) {
172 			if((*i)->working && (*i)->finished) {
173 				warnings << "  thread " << (*i)->name << " is ready but was not cleaned up" << endl;
174 				(*i)->working = false;
175 				SDL_CondSignal((*i)->readyForNewWork);
176 			}
177 			else if((*i)->working && !(*i)->finished) {
178 				warnings << "  thread " << (*i)->name << " is still working" << endl;
179 			}
180 			else if(!(*i)->working && !(*i)->headless && (*i)->finished) {
181 				warnings << "  thread " << (*i)->name << " is cleaning itself up right now" << endl;
182 			}
183 			else {
184 				warnings << "  thread " << (*i)->name << " is in an invalid state" << endl;
185 			}
186 		}
187 		SDL_CondWait(threadStatusChanged, mutex);
188 	}
189 	SDL_mutexV(mutex);
190 
191 	return true;
192 }
193 
dumpState(CmdLineIntf & cli) const194 void ThreadPool::dumpState(CmdLineIntf& cli) const {
195 	ScopedLock lock(mutex);
196 	for(std::set<ThreadPoolItem*>::const_iterator i = usedThreads.begin(); i != usedThreads.end(); ++i) {
197 		if((*i)->working && (*i)->finished)
198 			cli.writeMsg("thread '" + (*i)->name + "': ready but was not cleaned up");
199 		else if((*i)->working && !(*i)->finished)
200 			cli.writeMsg("thread '" + (*i)->name + "': working");
201 		else if(!(*i)->working && !(*i)->headless && (*i)->finished)
202 			cli.writeMsg("thread '" + (*i)->name + "': cleanup");
203 		else
204 			cli.writeMsg("thread '" + (*i)->name + "': invalid");
205 	}
206 }
207 
208 
// Global pool instance; NULL until InitThreadPool() creates it and reset to
// NULL again by UnInitThreadPool().
ThreadPool* threadPool = NULL;
210 
InitThreadPool(unsigned int size)211 void InitThreadPool(unsigned int size) {
212 	if(!threadPool)
213 		threadPool = new ThreadPool(size);
214 	else
215 		errors << "ThreadPool inited twice" << endl;
216 }
217 
UnInitThreadPool()218 void UnInitThreadPool() {
219 	if(threadPool) {
220 		delete threadPool;
221 		threadPool = NULL;
222 	} else
223 		errors << "ThreadPool already uninited" << endl;
224 }
225 
226 
227