1 /*
2 * ThreadPool.cpp
3 * OpenLieroX
4 *
5 * Created by Albert Zeyer on 08.02.09.
6 * code under LGPL
7 *
8 */
9
10 #include <SDL_thread.h>
11 #include "ThreadPool.h"
12 #include "Debug.h"
13 #include "AuxLib.h"
14 #include "ReadWriteLock.h" // for ScopedLock
15 #include "Command.h"
16
17
18
ThreadPool(unsigned int size)19 ThreadPool::ThreadPool(unsigned int size) {
20 nextAction = NULL; nextIsHeadless = false; nextData = NULL;
21 quitting = false;
22 mutex = SDL_CreateMutex();
23 awakeThread = SDL_CreateCond();
24 threadStartedWork = SDL_CreateCond();
25 threadStatusChanged = SDL_CreateCond();
26 startMutex = SDL_CreateMutex();
27
28 notes << "ThreadPool: creating " << size << " threads ..." << endl;
29 while(availableThreads.size() < size)
30 prepareNewThread();
31 }
32
~ThreadPool()33 ThreadPool::~ThreadPool() {
34 waitAll();
35
36 // this is the hint for all available threads to break
37 SDL_mutexP(mutex); // lock to be sure that every thread is outside that region, we could get crashes otherwise
38 nextAction = NULL;
39 quitting = true;
40 SDL_CondBroadcast(awakeThread);
41 for(std::set<ThreadPoolItem*>::iterator i = availableThreads.begin(); i != availableThreads.end(); ++i) {
42 SDL_mutexV(mutex);
43 SDL_WaitThread((*i)->thread, NULL);
44 SDL_mutexP(mutex);
45 SDL_DestroyCond((*i)->finishedSignal);
46 SDL_DestroyCond((*i)->readyForNewWork);
47 delete *i;
48 }
49 availableThreads.clear();
50 SDL_mutexV(mutex);
51
52 SDL_DestroyMutex(startMutex);
53 SDL_DestroyCond(threadStartedWork);
54 SDL_DestroyCond(threadStatusChanged);
55 SDL_DestroyCond(awakeThread);
56 SDL_DestroyMutex(mutex);
57 }
58
prepareNewThread()59 void ThreadPool::prepareNewThread() {
60 ThreadPoolItem* t = new ThreadPoolItem();
61 t->pool = this;
62 t->finishedSignal = SDL_CreateCond();
63 t->readyForNewWork = SDL_CreateCond();
64 t->finished = false;
65 t->working = false;
66 availableThreads.insert(t);
67 t->thread = SDL_CreateThread(threadWrapper, t);
68 }
69
threadWrapper(void * param)70 int ThreadPool::threadWrapper(void* param) {
71 ThreadPoolItem* data = (ThreadPoolItem*)param;
72
73 SDL_mutexP(data->pool->mutex);
74 while(true) {
75 while(data->pool->nextAction == NULL && !data->pool->quitting)
76 SDL_CondWait(data->pool->awakeThread, data->pool->mutex);
77 if(data->pool->quitting) break;
78 data->pool->usedThreads.insert(data);
79 data->pool->availableThreads.erase(data);
80
81 Action* act = data->pool->nextAction; data->pool->nextAction = NULL;
82 data->headless = data->pool->nextIsHeadless;
83 data->name = data->pool->nextName;
84 data->finished = false;
85 data->working = true;
86 data->pool->nextData = data;
87 SDL_mutexV(data->pool->mutex);
88
89 SDL_CondSignal(data->pool->threadStartedWork);
90 setCurThreadName(data->name);
91 data->ret = act->handle();
92 delete act;
93 setCurThreadName(data->name + " [finished]");
94 SDL_mutexP(data->pool->mutex);
95 data->finished = true;
96 SDL_CondSignal(data->pool->threadStatusChanged);
97
98 if(!data->headless) { // headless means that we just can clean it up right now without waiting
99 SDL_CondSignal(data->finishedSignal);
100 while(data->working) SDL_CondWait(data->readyForNewWork, data->pool->mutex);
101 } else
102 data->working = false;
103 data->pool->usedThreads.erase(data);
104 data->pool->availableThreads.insert(data);
105 SDL_CondSignal(data->pool->threadStatusChanged);
106 setCurThreadName("");
107 }
108
109 SDL_mutexV(data->pool->mutex);
110
111 return 0;
112 }
113
start(Action * act,const std::string & name,bool headless)114 ThreadPoolItem* ThreadPool::start(Action* act, const std::string& name, bool headless) {
115 SDL_mutexP(startMutex); // If start() method will be called from different threads without mutex, hard-to-find crashes will occur
116 SDL_mutexP(mutex);
117 if(availableThreads.size() == 0) {
118 warnings << "no available thread in ThreadPool for " << name << ", creating new one..." << endl;
119 prepareNewThread();
120 }
121 assert(nextAction == NULL);
122 assert(nextData == NULL);
123 nextAction = act;
124 nextIsHeadless = headless;
125 nextName = name;
126
127 SDL_CondSignal(awakeThread);
128 while(nextData == NULL) SDL_CondWait(threadStartedWork, mutex);
129 ThreadPoolItem* data = nextData; nextData = NULL;
130 SDL_mutexV(mutex);
131
132 SDL_mutexV(startMutex);
133 return data;
134 }
135
start(ThreadFunc fct,void * param,const std::string & name)136 ThreadPoolItem* ThreadPool::start(ThreadFunc fct, void* param, const std::string& name) {
137 struct StaticAction : Action {
138 ThreadFunc fct; void* param;
139 int handle() { return (*fct) (param); }
140 };
141 StaticAction* act = new StaticAction();
142 act->fct = fct;
143 act->param = param;
144 ThreadPoolItem* item = start(act, name);
145 if(item) return item;
146 delete act;
147 return NULL;
148 }
149
wait(ThreadPoolItem * thread,int * status)150 bool ThreadPool::wait(ThreadPoolItem* thread, int* status) {
151 if(!thread) return false;
152 SDL_mutexP(mutex);
153 if(!thread->working) {
154 warnings << "given thread " << thread->name << " is not working anymore" << endl;
155 SDL_mutexV(mutex);
156 return false;
157 }
158 while(!thread->finished) SDL_CondWait(thread->finishedSignal, mutex);
159 if(status) *status = thread->ret;
160 thread->working = false;
161 SDL_mutexV(mutex);
162
163 SDL_CondSignal(thread->readyForNewWork);
164 return true;
165 }
166
waitAll()167 bool ThreadPool::waitAll() {
168 SDL_mutexP(mutex);
169 while(usedThreads.size() > 0) {
170 warnings << "ThreadPool: waiting for " << usedThreads.size() << " threads to finish:" << endl;
171 for(std::set<ThreadPoolItem*>::iterator i = usedThreads.begin(); i != usedThreads.end(); ++i) {
172 if((*i)->working && (*i)->finished) {
173 warnings << " thread " << (*i)->name << " is ready but was not cleaned up" << endl;
174 (*i)->working = false;
175 SDL_CondSignal((*i)->readyForNewWork);
176 }
177 else if((*i)->working && !(*i)->finished) {
178 warnings << " thread " << (*i)->name << " is still working" << endl;
179 }
180 else if(!(*i)->working && !(*i)->headless && (*i)->finished) {
181 warnings << " thread " << (*i)->name << " is cleaning itself up right now" << endl;
182 }
183 else {
184 warnings << " thread " << (*i)->name << " is in an invalid state" << endl;
185 }
186 }
187 SDL_CondWait(threadStatusChanged, mutex);
188 }
189 SDL_mutexV(mutex);
190
191 return true;
192 }
193
dumpState(CmdLineIntf & cli) const194 void ThreadPool::dumpState(CmdLineIntf& cli) const {
195 ScopedLock lock(mutex);
196 for(std::set<ThreadPoolItem*>::const_iterator i = usedThreads.begin(); i != usedThreads.end(); ++i) {
197 if((*i)->working && (*i)->finished)
198 cli.writeMsg("thread '" + (*i)->name + "': ready but was not cleaned up");
199 else if((*i)->working && !(*i)->finished)
200 cli.writeMsg("thread '" + (*i)->name + "': working");
201 else if(!(*i)->working && !(*i)->headless && (*i)->finished)
202 cli.writeMsg("thread '" + (*i)->name + "': cleanup");
203 else
204 cli.writeMsg("thread '" + (*i)->name + "': invalid");
205 }
206 }
207
208
// The global thread pool instance. NULL until InitThreadPool() has created
// it and again after UnInitThreadPool() has destroyed it.
ThreadPool* threadPool = NULL;
210
InitThreadPool(unsigned int size)211 void InitThreadPool(unsigned int size) {
212 if(!threadPool)
213 threadPool = new ThreadPool(size);
214 else
215 errors << "ThreadPool inited twice" << endl;
216 }
217
UnInitThreadPool()218 void UnInitThreadPool() {
219 if(threadPool) {
220 delete threadPool;
221 threadPool = NULL;
222 } else
223 errors << "ThreadPool already uninited" << endl;
224 }
225
226
227