/*-
 * Copyright (c) 2014-2018 MongoDB, Inc.
 * Copyright (c) 2008-2014 WiredTiger, Inc.
 *	All rights reserved.
 *
 * See the file LICENSE for redistribution information.
 */
8 
9 #include "wt_internal.h"
10 
11 /*
12  * __thread_run --
13  *	General wrapper for any thread.
14  */
15 static WT_THREAD_RET
__thread_run(void * arg)16 __thread_run(void *arg)
17 {
18 	WT_DECL_RET;
19 	WT_SESSION_IMPL *session;
20 	WT_THREAD *thread;
21 
22 	thread = (WT_THREAD*)arg;
23 	session = thread->session;
24 
25 	for (;;) {
26 		if (!F_ISSET(thread, WT_THREAD_RUN))
27 			break;
28 		if (!F_ISSET(thread, WT_THREAD_ACTIVE))
29 			__wt_cond_wait(session, thread->pause_cond,
30 			    WT_THREAD_PAUSE * WT_MILLION, thread->chk_func);
31 		WT_ERR(thread->run_func(session, thread));
32 	}
33 
34 	/*
35 	 * If a thread is stopping it may have subsystem cleanup to do.
36 	 */
37 err:	if (thread->stop_func != NULL)
38 		ret = thread->stop_func(session, thread);
39 
40 	if (ret != 0 && F_ISSET(thread, WT_THREAD_PANIC_FAIL))
41 		WT_PANIC_MSG(session, ret,
42 		    "Unrecoverable utility thread error");
43 
44 	/*
45 	 * The three cases when threads are expected to stop are:
46 	 * 1.  When recovery is done.
47 	 * 2.  When the connection is closing.
48 	 * 3.  When a shutdown has been requested via clearing the run flag.
49 	 */
50 	WT_ASSERT(session, !F_ISSET(thread, WT_THREAD_RUN) ||
51 	    F_ISSET(S2C(session), WT_CONN_CLOSING | WT_CONN_RECOVERING));
52 
53 	return (WT_THREAD_RET_VALUE);
54 }
55 
56 /*
57  * __thread_group_shrink --
58  *	Decrease the number of threads in the group and free memory
59  *	associated with slots larger than the new count.
60  */
61 static int
__thread_group_shrink(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group,uint32_t new_count)62 __thread_group_shrink(
63     WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, uint32_t new_count)
64 {
65 	WT_DECL_RET;
66 	WT_SESSION *wt_session;
67 	WT_THREAD *thread;
68 	uint32_t current_slot;
69 
70 	WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock));
71 
72 	for (current_slot = group->alloc; current_slot > new_count; ) {
73 		/*
74 		 * The offset value is a counter not an array index,
75 		 * so adjust it before finding the last thread in the group.
76 		 */
77 		thread = group->threads[--current_slot];
78 
79 		if (thread == NULL)
80 			continue;
81 
82 		WT_ASSERT(session, thread->tid.created);
83 		__wt_verbose(session, WT_VERB_THREAD_GROUP,
84 		    "Stopping utility thread: %s:%" PRIu32,
85 		    group->name, thread->id);
86 		if (F_ISSET(thread, WT_THREAD_ACTIVE))
87 			--group->current_threads;
88 		F_CLR(thread, WT_THREAD_ACTIVE | WT_THREAD_RUN);
89 		/*
90 		 * Signal the thread in case it is in a long timeout.
91 		 */
92 		__wt_cond_signal(session, thread->pause_cond);
93 		__wt_cond_signal(session, group->wait_cond);
94 	}
95 
96 	/*
97 	 * We have to perform the join without holding the lock because
98 	 * the threads themselves may be waiting on the lock.
99 	 */
100 	__wt_writeunlock(session, &group->lock);
101 	for (current_slot = group->alloc; current_slot > new_count; ) {
102 		thread = group->threads[--current_slot];
103 
104 		if (thread == NULL)
105 			continue;
106 		WT_TRET(__wt_thread_join(session, &thread->tid));
107 		__wt_cond_destroy(session, &thread->pause_cond);
108 	}
109 	__wt_writelock(session, &group->lock);
110 	for (current_slot = group->alloc; current_slot > new_count; ) {
111 		thread = group->threads[--current_slot];
112 
113 		if (thread == NULL)
114 			continue;
115 		WT_ASSERT(session, thread->session != NULL);
116 		wt_session = (WT_SESSION *)thread->session;
117 		WT_TRET(wt_session->close(wt_session, NULL));
118 		thread->session = NULL;
119 		__wt_free(session, thread);
120 		group->threads[current_slot] = NULL;
121 	}
122 
123 	return (ret);
124 }
125 
126 /*
127  * __thread_group_resize --
128  *	Resize an array of utility threads already holding the lock.
129  */
130 static int
__thread_group_resize(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group,uint32_t new_min,uint32_t new_max,uint32_t flags)131 __thread_group_resize(
132     WT_SESSION_IMPL *session, WT_THREAD_GROUP *group,
133     uint32_t new_min, uint32_t new_max, uint32_t flags)
134 {
135 	WT_CONNECTION_IMPL *conn;
136 	WT_DECL_RET;
137 	WT_SESSION *wt_session;
138 	WT_THREAD *thread;
139 	size_t alloc;
140 	uint32_t i, session_flags;
141 
142 	conn = S2C(session);
143 	thread = NULL;
144 
145 	__wt_verbose(session, WT_VERB_THREAD_GROUP,
146 	    "Resize thread group: %s, from min: %" PRIu32 " -> %" PRIu32
147 	    " from max: %" PRIu32 " -> %" PRIu32,
148 	    group->name, group->min, new_min, group->max, new_max);
149 
150 	WT_ASSERT(session,
151 	    group->current_threads <= group->alloc &&
152 	    __wt_rwlock_islocked(session, &group->lock));
153 
154 	if (new_min == group->min && new_max == group->max)
155 		return (0);
156 
157 	if (new_min > new_max)
158 		WT_RET_MSG(session, EINVAL,
159 		    "Illegal thread group resize: %s, from min: %" PRIu32
160 		    " -> %" PRIu32 " from max: %" PRIu32 " -> %" PRIu32,
161 		    group->name, group->min, new_min, group->max, new_max);
162 
163 	/*
164 	 * Call shrink to reduce the number of thread structures and running
165 	 * threads if required by the change in group size.
166 	 */
167 	WT_RET(__thread_group_shrink(session, group, new_max));
168 
169 	/*
170 	 * Only reallocate the thread array if it is the largest ever, since
171 	 * our realloc doesn't support shrinking the allocated size.
172 	 */
173 	if (group->alloc < new_max) {
174 		alloc = group->alloc * sizeof(*group->threads);
175 		WT_RET(__wt_realloc(session, &alloc,
176 		    new_max * sizeof(*group->threads), &group->threads));
177 		group->alloc = new_max;
178 	}
179 
180 	/*
181 	 * Initialize the structures based on the previous group size, not
182 	 * the previous allocated size.
183 	 */
184 	for (i = group->max; i < new_max; i++) {
185 		WT_ERR(__wt_calloc_one(session, &thread));
186 		/*
187 		 * Threads get their own session and lookaside table cursor
188 		 * (if the lookaside table is open).
189 		 */
190 		session_flags =
191 		    LF_ISSET(WT_THREAD_CAN_WAIT) ? WT_SESSION_CAN_WAIT : 0;
192 		WT_ERR(__wt_open_internal_session(conn, group->name,
193 		    false, session_flags, &thread->session));
194 		if (LF_ISSET(WT_THREAD_LOOKASIDE) &&
195 		    F_ISSET(conn, WT_CONN_LOOKASIDE_OPEN))
196 			WT_ERR(__wt_las_cursor_open(thread->session));
197 		if (LF_ISSET(WT_THREAD_PANIC_FAIL))
198 			F_SET(thread, WT_THREAD_PANIC_FAIL);
199 		thread->id = i;
200 		thread->chk_func = group->chk_func;
201 		thread->run_func = group->run_func;
202 		thread->stop_func = group->stop_func;
203 		WT_ERR(__wt_cond_alloc(
204 		    session, "Thread cond", &thread->pause_cond));
205 
206 		/*
207 		 * Start thread as inactive.  We'll activate the needed
208 		 * number later.
209 		 */
210 		__wt_verbose(session, WT_VERB_THREAD_GROUP,
211 		    "Starting utility thread: %s:%" PRIu32,
212 		    group->name, thread->id);
213 		F_SET(thread, WT_THREAD_RUN);
214 		WT_ERR(__wt_thread_create(thread->session,
215 		    &thread->tid, __thread_run, thread));
216 
217 		WT_ASSERT(session, group->threads[i] == NULL);
218 		group->threads[i] = thread;
219 		thread = NULL;
220 	}
221 
222 	group->max = new_max;
223 	group->min = new_min;
224 	while (group->current_threads < new_min)
225 		__wt_thread_group_start_one(session, group, true);
226 	return (0);
227 
228 err:	/*
229 	 * An error resizing a thread array is currently fatal, it should only
230 	 * happen in an out of memory situation. Do real cleanup just in case
231 	 * that changes in the future.
232 	 */
233 	if (thread != NULL) {
234 		if (thread->session != NULL) {
235 			wt_session = (WT_SESSION *)thread->session;
236 			WT_TRET(wt_session->close(wt_session, NULL));
237 		}
238 		__wt_cond_destroy(session, &thread->pause_cond);
239 		__wt_free(session, thread);
240 	}
241 
242 	/*
243 	 * Update the thread group information even on failure to improve our
244 	 * chances of cleaning up properly.
245 	 */
246 	group->max = new_max;
247 	group->min = new_min;
248 	WT_TRET(__wt_thread_group_destroy(session, group));
249 
250 	WT_PANIC_RET(session, ret, "Error while resizing thread group");
251 }
252 
253 /*
254  * __wt_thread_group_resize --
255  *	Resize an array of utility threads taking the lock.
256  */
257 int
__wt_thread_group_resize(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group,uint32_t new_min,uint32_t new_max,uint32_t flags)258 __wt_thread_group_resize(
259     WT_SESSION_IMPL *session, WT_THREAD_GROUP *group,
260     uint32_t new_min, uint32_t new_max, uint32_t flags)
261 {
262 	WT_DECL_RET;
263 
264 	__wt_writelock(session, &group->lock);
265 	WT_TRET(__thread_group_resize(session, group, new_min, new_max, flags));
266 	__wt_writeunlock(session, &group->lock);
267 	return (ret);
268 }
269 
270 /*
271  * __wt_thread_group_create --
272  *	Create a new thread group, assumes incoming group structure is
273  *	zero initialized.
274  */
275 int
__wt_thread_group_create(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group,const char * name,uint32_t min,uint32_t max,uint32_t flags,bool (* chk_func)(WT_SESSION_IMPL * session),int (* run_func)(WT_SESSION_IMPL * session,WT_THREAD * context),int (* stop_func)(WT_SESSION_IMPL * session,WT_THREAD * context))276 __wt_thread_group_create(
277     WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, const char *name,
278     uint32_t min, uint32_t max, uint32_t flags,
279     bool (*chk_func)(WT_SESSION_IMPL *session),
280     int (*run_func)(WT_SESSION_IMPL *session, WT_THREAD *context),
281     int (*stop_func)(WT_SESSION_IMPL *session, WT_THREAD *context))
282 {
283 	WT_DECL_RET;
284 	bool cond_alloced;
285 
286 	/* Check that the structure is initialized as expected */
287 	WT_ASSERT(session, group->alloc == 0);
288 
289 	cond_alloced = false;
290 
291 	__wt_verbose(session,
292 	    WT_VERB_THREAD_GROUP, "Creating thread group: %s", name);
293 
294 	WT_RET(__wt_rwlock_init(session, &group->lock));
295 	WT_ERR(__wt_cond_alloc(
296 	    session, "thread group cond", &group->wait_cond));
297 	cond_alloced = true;
298 
299 	__wt_writelock(session, &group->lock);
300 	group->chk_func = chk_func;
301 	group->run_func = run_func;
302 	group->stop_func = stop_func;
303 	group->name = name;
304 
305 	WT_TRET(__thread_group_resize(session, group, min, max, flags));
306 	__wt_writeunlock(session, &group->lock);
307 
308 	/* Cleanup on error to avoid leaking resources */
309 err:	if (ret != 0) {
310 		if (cond_alloced)
311 			__wt_cond_destroy(session, &group->wait_cond);
312 		__wt_rwlock_destroy(session, &group->lock);
313 	}
314 	return (ret);
315 }
316 
317 /*
318  * __wt_thread_group_destroy --
319  *	Shut down a thread group.  Our caller must hold the lock.
320  */
321 int
__wt_thread_group_destroy(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group)322 __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group)
323 {
324 	WT_DECL_RET;
325 
326 	__wt_verbose(session, WT_VERB_THREAD_GROUP,
327 	    "Destroying thread group: %s", group->name);
328 
329 	WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock));
330 
331 	/* Shut down all threads and free associated resources. */
332 	WT_TRET(__thread_group_shrink(session, group, 0));
333 
334 	__wt_free(session, group->threads);
335 
336 	__wt_cond_destroy(session, &group->wait_cond);
337 	__wt_rwlock_destroy(session, &group->lock);
338 
339 	/*
340 	 * Clear out any settings from the group, some structures are reused
341 	 * for different thread groups - in particular the eviction thread
342 	 * group for recovery and then normal runtime.
343 	 */
344 	memset(group, 0, sizeof(*group));
345 
346 	return (ret);
347 }
348 
349 /*
350  * __wt_thread_group_start_one --
351  *	Start a new thread if possible.
352  */
353 void
__wt_thread_group_start_one(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group,bool is_locked)354 __wt_thread_group_start_one(
355     WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool is_locked)
356 {
357 	WT_THREAD *thread;
358 
359 	if (group->current_threads >= group->max)
360 		return;
361 
362 	if (!is_locked)
363 		__wt_writelock(session, &group->lock);
364 
365 	/* Recheck the bounds now that we hold the lock */
366 	if (group->current_threads < group->max) {
367 		thread = group->threads[group->current_threads++];
368 		WT_ASSERT(session, thread != NULL);
369 		__wt_verbose(session, WT_VERB_THREAD_GROUP,
370 		    "Activating utility thread: %s:%" PRIu32,
371 		    group->name, thread->id);
372 		WT_ASSERT(session, !F_ISSET(thread, WT_THREAD_ACTIVE));
373 		F_SET(thread, WT_THREAD_ACTIVE);
374 		__wt_cond_signal(session, thread->pause_cond);
375 	}
376 	if (!is_locked)
377 		__wt_writeunlock(session, &group->lock);
378 }
379 
380 /*
381  * __wt_thread_group_stop_one --
382  *	Pause one thread if possible.
383  */
384 void
__wt_thread_group_stop_one(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group)385 __wt_thread_group_stop_one(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group)
386 {
387 	WT_THREAD *thread;
388 
389 	if (group->current_threads <= group->min)
390 		return;
391 
392 	__wt_writelock(session, &group->lock);
393 	/* Recheck the bounds now that we hold the lock */
394 	if (group->current_threads > group->min) {
395 		thread = group->threads[--group->current_threads];
396 		__wt_verbose(session, WT_VERB_THREAD_GROUP,
397 		    "Pausing utility thread: %s:%" PRIu32,
398 		    group->name, thread->id);
399 		WT_ASSERT(session, F_ISSET(thread, WT_THREAD_ACTIVE));
400 		F_CLR(thread, WT_THREAD_ACTIVE);
401 		__wt_cond_signal(session, thread->pause_cond);
402 	}
403 	__wt_writeunlock(session, &group->lock);
404 }
405