1 /*
2 Title: Task farm for Multi-Threaded Garbage Collector
3
4 Copyright (c) 2010, 2019 David C. J. Matthews
5
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License version 2.1 as published by the Free Software Foundation.
9
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public
16 License along with this library; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #elif defined(_WIN32)
24 #include "winconfig.h"
25 #else
26 #error "No configuration file"
27 #endif
28
29 #ifdef HAVE_STDLIB_H
30 #include <stdlib.h>
31 #endif
32 #ifdef HAVE_MALLOC_H
33 #include <malloc.h>
34 #endif
35
36 #ifdef HAVE_SYS_TIME_H
37 #include <sys/time.h>
38 #endif
39
40 #ifdef HAVE_ASSERT_H
41 #include <assert.h>
42 #define ASSERT(x) assert(x)
43 #else
44 #define ASSERT(x)
45 #endif
46
47 #include "gctaskfarm.h"
48 #include "diagnostics.h"
49 #include "timing.h"
50
51 static GCTaskId gTask;
52
53 GCTaskId *globalTask = &gTask;
54
GCTaskFarm()55 GCTaskFarm::GCTaskFarm(): workLock("GC task farm work")
56 {
57 queueSize = queueIn = queuedItems = 0;
58 workQueue = 0;
59 terminate = false;
60 threadCount = activeThreadCount = 0;
61 threadHandles = 0;
62 }
63
~GCTaskFarm()64 GCTaskFarm::~GCTaskFarm()
65 {
66 Terminate();
67 free(workQueue);
68 free(threadHandles);
69 }
70
71
Initialise(unsigned thrdCount,unsigned qSize)72 bool GCTaskFarm::Initialise(unsigned thrdCount, unsigned qSize)
73 {
74 terminate = false;
75 if (!waitForWork.Init(0, thrdCount)) return false;
76 workQueue = (queue_entry*)calloc(qSize, sizeof(queue_entry));
77 if (workQueue == 0) return false;
78 #if (!defined(_WIN32))
79 queueSize = qSize;
80 threadHandles = (pthread_t*)calloc(thrdCount, sizeof(pthread_t));
81 if (threadHandles == 0) return false;
82 #else
83 queueSize = qSize;
84 threadHandles = (HANDLE*)calloc(thrdCount, sizeof(HANDLE));
85 if (threadHandles == 0) return false;
86 #endif
87 // Create the worker threads.
88 for (unsigned i = 0; i < thrdCount; i++) {
89 // Fork a thread
90 #if (!defined(_WIN32))
91 // Create a thread that isn't joinable since we don't want to wait
92 // for it to finish.
93 pthread_t pthreadId;
94 bool isError = pthread_create(&pthreadId, NULL, WorkerThreadFunction, this) != 0;
95 if (isError) break;
96 threadHandles[threadCount++] = pthreadId;
97 #else
98 DWORD dwThrdId; // Have to provide this although we don't use it.
99 HANDLE threadHandle =
100 CreateThread(NULL, 0, WorkerThreadFunction, this, 0, &dwThrdId);
101 if (threadHandle == NULL) break;
102 threadHandles[threadCount++] = threadHandle;
103 #endif
104 }
105
106 return true;
107 }
108
Terminate()109 void GCTaskFarm::Terminate()
110 {
111 terminate = true;
112 // Increment the semaphore by the number of threads to release them all.
113 for (unsigned i = 0; i < threadCount; i++) waitForWork.Signal();
114 // Wait for the threads to terminate.
115 #if (!defined(_WIN32))
116 for (unsigned j = 0; j < threadCount; j++)
117 pthread_join(threadHandles[j], NULL);
118 #else
119 if (threadCount != 0)
120 WaitForMultipleObjects(threadCount, threadHandles, TRUE, 10000);
121 #endif
122 }
123
124 // Add work to the queue. Returns true if it succeeds.
AddWork(gctask work,void * arg1,void * arg2)125 bool GCTaskFarm::AddWork(gctask work, void *arg1, void *arg2)
126 {
127 bool wantSignal = false;
128 {
129 PLocker l(&workLock);
130 if (queuedItems == queueSize)
131 return false; // Queue is full
132 workQueue[queueIn].task = work;
133 workQueue[queueIn].arg1 = arg1;
134 workQueue[queueIn].arg2 = arg2;
135 queueIn++;
136 if (queueIn == queueSize) queueIn = 0;
137 queuedItems++;
138 wantSignal = queuedItems <= threadCount;
139 }
140 if (wantSignal) waitForWork.Signal();
141 return true;
142 }
143
144 // Schedule this as a task or run it immediately if the queue is full.
AddWorkOrRunNow(gctask work,void * arg1,void * arg2)145 void GCTaskFarm::AddWorkOrRunNow(gctask work, void *arg1, void *arg2)
146 {
147 if (! AddWork(work, arg1, arg2))
148 (*work)(globalTask, arg1, arg2);
149 }
150
ThreadFunction()151 void GCTaskFarm::ThreadFunction()
152 {
153 GCTaskId myTaskId;
154 #if (defined(_WIN32))
155 DWORD startActive = GetTickCount();
156 #else
157 struct timeval startTime;
158 gettimeofday(&startTime, NULL);
159 #endif
160 workLock.Lock();
161 activeThreadCount++;
162 while (! terminate) {
163 // Invariant: We have the lock and the activeThreadCount includes this thread.
164 // Find some work.
165
166 if (queuedItems > 0) { // There is work
167 unsigned outPos;
168 if (queuedItems > queueIn)
169 outPos = queueIn+queueSize-queuedItems;
170 else outPos = queueIn-queuedItems;
171 gctask work = workQueue[outPos].task;
172 void *arg1 = workQueue[outPos].arg1;
173 void *arg2 = workQueue[outPos].arg2;
174 workQueue[outPos].task = 0;
175 queuedItems--;
176 ASSERT(work != 0);
177 workLock.Unlock();
178 (*work)(&myTaskId, arg1, arg2);
179 workLock.Lock();
180 }
181 else {
182 activeThreadCount--; // We're no longer active
183 // If there is no work and we're the last active thread signal the
184 // main thread that the queue is empty
185 bool wantSignal = activeThreadCount == 0;
186 if (wantSignal)
187 waitForCompletion.Signal();
188 // Now release the lock. In our Windows partial implementation of
189 // condition vars we assume that signalling is done with the lock
190 // still held.
191 workLock.Unlock();
192
193 if (debugOptions & DEBUG_GCTASKS)
194 {
195 #if (defined(_WIN32))
196 Log("GCTask: Thread %p blocking after %u milliseconds\n", &myTaskId,
197 GetTickCount() - startActive);
198 #else
199 struct timeval endTime;
200 gettimeofday(&endTime, NULL);
201 subTimevals(&endTime, &startTime);
202 Log("GCTask: Thread %p blocking after %0.4f seconds\n", &myTaskId,
203 (float)endTime.tv_sec + (float)endTime.tv_usec / 1.0E6);
204 #endif
205 }
206
207 if (terminate) return;
208 // Block until there's work.
209 waitForWork.Wait();
210 // We've been woken up
211 if (debugOptions & DEBUG_GCTASKS)
212 {
213 #if (defined(_WIN32))
214 startActive = GetTickCount();
215 #else
216 gettimeofday(&startTime, NULL);
217 #endif
218 Log("GCTask: Thread %p resuming\n", &myTaskId);
219 }
220 workLock.Lock();
221 activeThreadCount++;
222 }
223 }
224 activeThreadCount--;
225 workLock.Unlock();
226 }
227
228 #if (!defined(_WIN32))
WorkerThreadFunction(void * parameter)229 void *GCTaskFarm::WorkerThreadFunction(void *parameter)
230 {
231 GCTaskFarm *t = (GCTaskFarm *)parameter;
232 t->ThreadFunction();
233 return 0;
234 }
235 #else
WorkerThreadFunction(void * parameter)236 DWORD WINAPI GCTaskFarm::WorkerThreadFunction(void *parameter)
237 {
238 GCTaskFarm *t = (GCTaskFarm *)parameter;
239 t->ThreadFunction();
240 return 0;
241 }
242 #endif
243
244 // Wait until the queue is empty.
WaitForCompletion(void)245 void GCTaskFarm::WaitForCompletion(void)
246 {
247 #if (defined(_WIN32))
248 DWORD startWait;
249 if (debugOptions & DEBUG_GCTASKS)
250 startWait = GetTickCount();
251 #else
252 struct timeval startWait;
253 if (debugOptions & DEBUG_GCTASKS)
254 gettimeofday(&startWait, NULL);
255 #endif
256 workLock.Lock();
257 while (activeThreadCount > 0 || queuedItems > 0)
258 waitForCompletion.Wait(&workLock);
259 workLock.Unlock();
260
261 if (debugOptions & DEBUG_GCTASKS)
262 {
263 #if (defined(_WIN32))
264 Log("GCTask: Threads completed after %u milliseconds\n", GetTickCount()-startWait);
265 #else
266 struct timeval endWait;
267 gettimeofday(&endWait, NULL);
268 subTimevals(&endWait, &startWait);
269 Log("GCTask: Threads completed after %0.4f seconds\n",
270 (float)endWait.tv_sec + (float)endWait.tv_usec / 1.0E6);
271 #endif
272 }
273 }
274