1 /**
2  * \file
3  * Threadpool for all concurrent GC work.
4  *
5  * Copyright (C) 2015 Xamarin Inc
6  *
7  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
8  */
9 
10 #include "config.h"
11 #ifdef HAVE_SGEN_GC
12 
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/sgen/sgen-client.h"
16 #include "mono/utils/mono-os-mutex.h"
17 
/* Single mutex taken around job-queue access and around every wait on the conditions below. */
static mono_mutex_t lock;
/* Workers wait on this in get_work (); broadcast on enqueue, idle signal and shutdown. */
static mono_cond_t work_cond;
/* Signalled by workers when a job finishes, idle work stops or a worker exits; the wait functions block on it. */
static mono_cond_t done_cond;

/* Number of worker threads: the largest num_threads of any context, computed in sgen_thread_pool_start (). */
static int threads_num;
/* Native ids of the workers, filled in by mono_native_thread_create (). */
static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS];
/* Per worker: set to -1 before it calls get_work (), and to the context index it holds once get_work () returns. */
static int threads_context [SGEN_THREADPOOL_MAX_NUM_THREADS];

/* Set to TRUE (with the lock held) by sgen_thread_pool_shutdown () to make the workers exit. */
static volatile gboolean threadpool_shutdown;
/* Count of workers that have left their main loop; incremented with the lock held. */
static volatile int threads_finished;

/* Number of contexts registered so far; also the id the next created context will get. */
static int contexts_num;
/* The registered contexts. A lower index is scanned first by get_work (), i.e. has higher priority. */
static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS];
31 
/* Life cycle of a job, stored in SgenThreadPoolJob.state. */
enum {
	/* Initial state given by sgen_thread_pool_job_alloc (); the job has not been picked up yet. */
	STATE_WAITING,
	/* A worker has claimed the job via get_job_and_set_in_progress (). */
	STATE_IN_PROGRESS,
	/* The job function has returned; set right before the job is removed from its queue. */
	STATE_DONE
};
37 
38 /* Assumes that the lock is held. */
39 static SgenThreadPoolJob*
get_job_and_set_in_progress(SgenThreadPoolContext * context)40 get_job_and_set_in_progress (SgenThreadPoolContext *context)
41 {
42 	for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
43 		SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i];
44 		if (job->state == STATE_WAITING) {
45 			job->state = STATE_IN_PROGRESS;
46 			return job;
47 		}
48 	}
49 	return NULL;
50 }
51 
52 /* Assumes that the lock is held. */
53 static ssize_t
find_job_in_queue(SgenThreadPoolContext * context,SgenThreadPoolJob * job)54 find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
55 {
56 	for (ssize_t i = 0; i < context->job_queue.next_slot; ++i) {
57 		if (context->job_queue.data [i] == job)
58 			return i;
59 	}
60 	return -1;
61 }
62 
63 /* Assumes that the lock is held. */
64 static void
remove_job(SgenThreadPoolContext * context,SgenThreadPoolJob * job)65 remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
66 {
67 	ssize_t index;
68 	SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
69 	index = find_job_in_queue (context, job);
70 	SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
71 	context->job_queue.data [index] = NULL;
72 	sgen_pointer_queue_remove_nulls (&context->job_queue);
73 	sgen_thread_pool_job_free (job);
74 }
75 
76 static gboolean
continue_idle_job(SgenThreadPoolContext * context,void * thread_data)77 continue_idle_job (SgenThreadPoolContext *context, void *thread_data)
78 {
79 	if (!context->continue_idle_job_func)
80 		return FALSE;
81 	return context->continue_idle_job_func (thread_data, context - pool_contexts);
82 }
83 
84 static gboolean
should_work(SgenThreadPoolContext * context,void * thread_data)85 should_work (SgenThreadPoolContext *context, void *thread_data)
86 {
87 	if (!context->should_work_func)
88 		return TRUE;
89 	return context->should_work_func (thread_data);
90 }
91 
92 /*
93  * Tells whether we should lock and attempt to get work from
94  * a higher priority context.
95  */
96 static gboolean
has_priority_work(int worker_index,int current_context)97 has_priority_work (int worker_index, int current_context)
98 {
99 	int i;
100 
101 	for (i = 0; i < current_context; i++) {
102 		SgenThreadPoolContext *context = &pool_contexts [i];
103 		void *thread_data;
104 
105 		if (worker_index >= context->num_threads)
106 			continue;
107 		thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
108 		if (!should_work (context, thread_data))
109 			continue;
110 		if (context->job_queue.next_slot > 0)
111 			return TRUE;
112 		if (continue_idle_job (context, thread_data))
113 			return TRUE;
114 	}
115 
116 	/* Return if job enqueued on current context. Jobs have priority over idle work */
117 	if (pool_contexts [current_context].job_queue.next_slot > 0)
118 		return TRUE;
119 
120 	return FALSE;
121 }
122 
123 /*
124  * Gets the highest priority work. If there is none, it waits
125  * for work_cond. Should always be called with lock held.
126  */
127 static void
get_work(int worker_index,int * work_context,int * do_idle,SgenThreadPoolJob ** job)128 get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job)
129 {
130 	while (!threadpool_shutdown) {
131 		int i;
132 
133 		for (i = 0; i < contexts_num; i++) {
134 			SgenThreadPoolContext *context = &pool_contexts [i];
135 			void *thread_data;
136 
137 			if (worker_index >= context->num_threads)
138 				continue;
139 			thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
140 
141 			if (!should_work (context, thread_data))
142 				continue;
143 
144 			/*
145 			 * It's important that we check the continue idle flag with the lock held.
146 			 * Suppose we didn't check with the lock held, and the result is FALSE.  The
147 			 * main thread might then set continue idle and signal us before we can take
148 			 * the lock, and we'd lose the signal.
149 			 */
150 			*do_idle = continue_idle_job (context, thread_data);
151 			*job = get_job_and_set_in_progress (context);
152 
153 			if (*job || *do_idle) {
154 				*work_context = i;
155 				return;
156 			}
157 		}
158 
159 		/*
160 		 * Nothing to do on any context
161 		 * pthread_cond_wait() can return successfully despite the condition
162 		 * not being signalled, so we have to run this in a loop until we
163 		 * really have work to do.
164 		 */
165 		mono_os_cond_wait (&work_cond, &lock);
166 	}
167 }
168 
/*
 * Entry point of every pool worker. DATA carries the worker's index.
 *
 * The worker first runs the contexts' thread init functions, then loops:
 * fetch work under the lock, release the lock, run the job or the idle
 * function, retake the lock and report completion through done_cond.
 * The loop ends when get_work () comes back empty-handed because of shutdown.
 */
static mono_native_thread_return_t
thread_func (void *data)
{
	int worker_index = (int)(gsize)data;
	int current_context;
	void *thread_data = NULL;

	sgen_client_thread_register_worker ();

	/*
	 * Run the per-context init functions for this worker.
	 * NOTE(review): the loop stops at the first context that either doesn't
	 * include this worker or has no init function, so the init functions of
	 * all later contexts are skipped as well - confirm that `break' rather
	 * than `continue' is intended.
	 */
	for (current_context = 0; current_context < contexts_num; current_context++) {
		if (worker_index >= pool_contexts [current_context].num_threads ||
				!pool_contexts [current_context].thread_init_func)
			break;

		thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL;
		pool_contexts [current_context].thread_init_func (thread_data);
	}

	current_context = 0;

	/* The lock is held at the top of every iteration; each branch below retakes it. */
	mono_os_mutex_lock (&lock);
	for (;;) {
		gboolean do_idle = FALSE;
		SgenThreadPoolJob *job = NULL;
		SgenThreadPoolContext *context = NULL;

		/* -1 marks this worker as not working on any context while it looks for work. */
		threads_context [worker_index] = -1;
		get_work (worker_index, &current_context, &do_idle, &job);
		threads_context [worker_index] = current_context;

		/* On shutdown get_work () returns without selecting a context, so leave context NULL. */
		if (!threadpool_shutdown) {
			context = &pool_contexts [current_context];
			thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
		}

		/* The actual work is done without the lock. */
		mono_os_mutex_unlock (&lock);

		if (job) {
			job->func (thread_data, job);

			mono_os_mutex_lock (&lock);

			SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
			job->state = STATE_DONE;
			/* remove_job () also frees the job, so it must not be touched afterwards. */
			remove_job (context, job);
			/*
			 * Only the main GC thread will ever wait on the done condition, so we don't
			 * have to broadcast.
			 */
			mono_os_cond_signal (&done_cond);
		} else if (do_idle) {
			SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?");
			/*
			 * Keep doing idle work until the context says to stop or more
			 * important work shows up. Both checks run without the lock.
			 */
			do {
				context->idle_job_func (thread_data);
				do_idle = continue_idle_job (context, thread_data);
			} while (do_idle && !has_priority_work (worker_index, current_context));

			mono_os_mutex_lock (&lock);

			/* Only report completion if idle work ended, not if it was merely preempted. */
			if (!do_idle)
				mono_os_cond_signal (&done_cond);
		} else {
			/* Neither a job nor idle work: get_work () only returns like this on shutdown. */
			SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
			mono_os_mutex_lock (&lock);
			/* sgen_thread_pool_shutdown () waits on done_cond until every worker has been counted. */
			threads_finished++;
			mono_os_cond_signal (&done_cond);
			mono_os_mutex_unlock (&lock);
			return 0;
		}
	}

	return (mono_native_thread_return_t)0;
}
242 
243 int
sgen_thread_pool_create_context(int num_threads,SgenThreadPoolThreadInitFunc init_func,SgenThreadPoolIdleJobFunc idle_func,SgenThreadPoolContinueIdleJobFunc continue_idle_func,SgenThreadPoolShouldWorkFunc should_work_func,void ** thread_datas)244 sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
245 {
246 	int context_id = contexts_num;
247 
248 	SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached");
249 
250 	pool_contexts [context_id].thread_init_func = init_func;
251 	pool_contexts [context_id].idle_job_func = idle_func;
252 	pool_contexts [context_id].continue_idle_job_func = continue_idle_func;
253 	pool_contexts [context_id].should_work_func = should_work_func;
254 	pool_contexts [context_id].thread_datas = thread_datas;
255 
256 	SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded");
257 
258 	pool_contexts [context_id].num_threads = num_threads;
259 
260 	sgen_pointer_queue_init (&pool_contexts [contexts_num].job_queue, 0);
261 
262 	contexts_num++;
263 
264 	return context_id;
265 }
266 
267 void
sgen_thread_pool_start(void)268 sgen_thread_pool_start (void)
269 {
270 	int i;
271 
272 	for (i = 0; i < contexts_num; i++) {
273 		if (threads_num < pool_contexts [i].num_threads)
274 			threads_num = pool_contexts [i].num_threads;
275 	}
276 
277 	if (!threads_num)
278 		return;
279 
280 	mono_os_mutex_init (&lock);
281 	mono_os_cond_init (&work_cond);
282 	mono_os_cond_init (&done_cond);
283 
284 	threads_finished = 0;
285 	threadpool_shutdown = FALSE;
286 
287 	for (i = 0; i < threads_num; i++) {
288 		mono_native_thread_create (&threads [i], thread_func, (void*)(gsize)i);
289 	}
290 }
291 
292 void
sgen_thread_pool_shutdown(void)293 sgen_thread_pool_shutdown (void)
294 {
295 	if (!threads_num)
296 		return;
297 
298 	mono_os_mutex_lock (&lock);
299 	threadpool_shutdown = TRUE;
300 	mono_os_cond_broadcast (&work_cond);
301 	while (threads_finished < threads_num)
302 		mono_os_cond_wait (&done_cond, &lock);
303 	mono_os_mutex_unlock (&lock);
304 
305 	mono_os_mutex_destroy (&lock);
306 	mono_os_cond_destroy (&work_cond);
307 	mono_os_cond_destroy (&done_cond);
308 
309 	for (int i = 0; i < threads_num; i++) {
310 		mono_threads_add_joinable_thread ((gpointer)threads [i]);
311 	}
312 }
313 
314 SgenThreadPoolJob*
sgen_thread_pool_job_alloc(const char * name,SgenThreadPoolJobFunc func,size_t size)315 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
316 {
317 	SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
318 	job->name = name;
319 	job->size = size;
320 	job->state = STATE_WAITING;
321 	job->func = func;
322 	return job;
323 }
324 
325 void
sgen_thread_pool_job_free(SgenThreadPoolJob * job)326 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
327 {
328 	sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
329 }
330 
331 void
sgen_thread_pool_job_enqueue(int context_id,SgenThreadPoolJob * job)332 sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
333 {
334 	mono_os_mutex_lock (&lock);
335 
336 	sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job);
337 	mono_os_cond_broadcast (&work_cond);
338 
339 	mono_os_mutex_unlock (&lock);
340 }
341 
342 void
sgen_thread_pool_job_wait(int context_id,SgenThreadPoolJob * job)343 sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
344 {
345 	SGEN_ASSERT (0, job, "Where's the job?");
346 
347 	mono_os_mutex_lock (&lock);
348 
349 	while (find_job_in_queue (&pool_contexts [context_id], job) >= 0)
350 		mono_os_cond_wait (&done_cond, &lock);
351 
352 	mono_os_mutex_unlock (&lock);
353 }
354 
355 void
sgen_thread_pool_idle_signal(int context_id)356 sgen_thread_pool_idle_signal (int context_id)
357 {
358 	SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?");
359 
360 	mono_os_mutex_lock (&lock);
361 
362 	if (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
363 		mono_os_cond_broadcast (&work_cond);
364 
365 	mono_os_mutex_unlock (&lock);
366 }
367 
368 void
sgen_thread_pool_idle_wait(int context_id,SgenThreadPoolContinueIdleWaitFunc continue_wait)369 sgen_thread_pool_idle_wait (int context_id, SgenThreadPoolContinueIdleWaitFunc continue_wait)
370 {
371 	SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?");
372 
373 	mono_os_mutex_lock (&lock);
374 
375 	while (continue_wait (context_id, threads_context))
376 		mono_os_cond_wait (&done_cond, &lock);
377 
378 	mono_os_mutex_unlock (&lock);
379 }
380 
381 void
sgen_thread_pool_wait_for_all_jobs(int context_id)382 sgen_thread_pool_wait_for_all_jobs (int context_id)
383 {
384 	mono_os_mutex_lock (&lock);
385 
386 	while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue))
387 		mono_os_cond_wait (&done_cond, &lock);
388 
389 	mono_os_mutex_unlock (&lock);
390 }
391 
392 /* Return 0 if is not a thread pool thread or the thread number otherwise */
393 int
sgen_thread_pool_is_thread_pool_thread(MonoNativeThreadId some_thread)394 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
395 {
396 	int i;
397 
398 	for (i = 0; i < threads_num; i++) {
399 		if (some_thread == threads [i])
400 			return i + 1;
401 	}
402 
403 	return 0;
404 }
405 
406 #endif
407