1 /*-
2 * Copyright (c) 2014-2018 MongoDB, Inc.
3 * Copyright (c) 2008-2014 WiredTiger, Inc.
4 * All rights reserved.
5 *
6 * See the file LICENSE for redistribution information.
7 */
8
9 #include "wt_internal.h"
10
11 /*
12 * __thread_run --
13 * General wrapper for any thread.
14 */
15 static WT_THREAD_RET
__thread_run(void * arg)16 __thread_run(void *arg)
17 {
18 WT_DECL_RET;
19 WT_SESSION_IMPL *session;
20 WT_THREAD *thread;
21
22 thread = (WT_THREAD*)arg;
23 session = thread->session;
24
25 for (;;) {
26 if (!F_ISSET(thread, WT_THREAD_RUN))
27 break;
28 if (!F_ISSET(thread, WT_THREAD_ACTIVE))
29 __wt_cond_wait(session, thread->pause_cond,
30 WT_THREAD_PAUSE * WT_MILLION, thread->chk_func);
31 WT_ERR(thread->run_func(session, thread));
32 }
33
34 /*
35 * If a thread is stopping it may have subsystem cleanup to do.
36 */
37 err: if (thread->stop_func != NULL)
38 ret = thread->stop_func(session, thread);
39
40 if (ret != 0 && F_ISSET(thread, WT_THREAD_PANIC_FAIL))
41 WT_PANIC_MSG(session, ret,
42 "Unrecoverable utility thread error");
43
44 /*
45 * The three cases when threads are expected to stop are:
46 * 1. When recovery is done.
47 * 2. When the connection is closing.
48 * 3. When a shutdown has been requested via clearing the run flag.
49 */
50 WT_ASSERT(session, !F_ISSET(thread, WT_THREAD_RUN) ||
51 F_ISSET(S2C(session), WT_CONN_CLOSING | WT_CONN_RECOVERING));
52
53 return (WT_THREAD_RET_VALUE);
54 }
55
56 /*
57 * __thread_group_shrink --
58 * Decrease the number of threads in the group and free memory
59 * associated with slots larger than the new count.
60 */
61 static int
__thread_group_shrink(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group,uint32_t new_count)62 __thread_group_shrink(
63 WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, uint32_t new_count)
64 {
65 WT_DECL_RET;
66 WT_SESSION *wt_session;
67 WT_THREAD *thread;
68 uint32_t current_slot;
69
70 WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock));
71
72 for (current_slot = group->alloc; current_slot > new_count; ) {
73 /*
74 * The offset value is a counter not an array index,
75 * so adjust it before finding the last thread in the group.
76 */
77 thread = group->threads[--current_slot];
78
79 if (thread == NULL)
80 continue;
81
82 WT_ASSERT(session, thread->tid.created);
83 __wt_verbose(session, WT_VERB_THREAD_GROUP,
84 "Stopping utility thread: %s:%" PRIu32,
85 group->name, thread->id);
86 if (F_ISSET(thread, WT_THREAD_ACTIVE))
87 --group->current_threads;
88 F_CLR(thread, WT_THREAD_ACTIVE | WT_THREAD_RUN);
89 /*
90 * Signal the thread in case it is in a long timeout.
91 */
92 __wt_cond_signal(session, thread->pause_cond);
93 __wt_cond_signal(session, group->wait_cond);
94 }
95
96 /*
97 * We have to perform the join without holding the lock because
98 * the threads themselves may be waiting on the lock.
99 */
100 __wt_writeunlock(session, &group->lock);
101 for (current_slot = group->alloc; current_slot > new_count; ) {
102 thread = group->threads[--current_slot];
103
104 if (thread == NULL)
105 continue;
106 WT_TRET(__wt_thread_join(session, &thread->tid));
107 __wt_cond_destroy(session, &thread->pause_cond);
108 }
109 __wt_writelock(session, &group->lock);
110 for (current_slot = group->alloc; current_slot > new_count; ) {
111 thread = group->threads[--current_slot];
112
113 if (thread == NULL)
114 continue;
115 WT_ASSERT(session, thread->session != NULL);
116 wt_session = (WT_SESSION *)thread->session;
117 WT_TRET(wt_session->close(wt_session, NULL));
118 thread->session = NULL;
119 __wt_free(session, thread);
120 group->threads[current_slot] = NULL;
121 }
122
123 return (ret);
124 }
125
126 /*
127 * __thread_group_resize --
128 * Resize an array of utility threads already holding the lock.
129 */
130 static int
__thread_group_resize(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group,uint32_t new_min,uint32_t new_max,uint32_t flags)131 __thread_group_resize(
132 WT_SESSION_IMPL *session, WT_THREAD_GROUP *group,
133 uint32_t new_min, uint32_t new_max, uint32_t flags)
134 {
135 WT_CONNECTION_IMPL *conn;
136 WT_DECL_RET;
137 WT_SESSION *wt_session;
138 WT_THREAD *thread;
139 size_t alloc;
140 uint32_t i, session_flags;
141
142 conn = S2C(session);
143 thread = NULL;
144
145 __wt_verbose(session, WT_VERB_THREAD_GROUP,
146 "Resize thread group: %s, from min: %" PRIu32 " -> %" PRIu32
147 " from max: %" PRIu32 " -> %" PRIu32,
148 group->name, group->min, new_min, group->max, new_max);
149
150 WT_ASSERT(session,
151 group->current_threads <= group->alloc &&
152 __wt_rwlock_islocked(session, &group->lock));
153
154 if (new_min == group->min && new_max == group->max)
155 return (0);
156
157 if (new_min > new_max)
158 WT_RET_MSG(session, EINVAL,
159 "Illegal thread group resize: %s, from min: %" PRIu32
160 " -> %" PRIu32 " from max: %" PRIu32 " -> %" PRIu32,
161 group->name, group->min, new_min, group->max, new_max);
162
163 /*
164 * Call shrink to reduce the number of thread structures and running
165 * threads if required by the change in group size.
166 */
167 WT_RET(__thread_group_shrink(session, group, new_max));
168
169 /*
170 * Only reallocate the thread array if it is the largest ever, since
171 * our realloc doesn't support shrinking the allocated size.
172 */
173 if (group->alloc < new_max) {
174 alloc = group->alloc * sizeof(*group->threads);
175 WT_RET(__wt_realloc(session, &alloc,
176 new_max * sizeof(*group->threads), &group->threads));
177 group->alloc = new_max;
178 }
179
180 /*
181 * Initialize the structures based on the previous group size, not
182 * the previous allocated size.
183 */
184 for (i = group->max; i < new_max; i++) {
185 WT_ERR(__wt_calloc_one(session, &thread));
186 /*
187 * Threads get their own session and lookaside table cursor
188 * (if the lookaside table is open).
189 */
190 session_flags =
191 LF_ISSET(WT_THREAD_CAN_WAIT) ? WT_SESSION_CAN_WAIT : 0;
192 WT_ERR(__wt_open_internal_session(conn, group->name,
193 false, session_flags, &thread->session));
194 if (LF_ISSET(WT_THREAD_LOOKASIDE) &&
195 F_ISSET(conn, WT_CONN_LOOKASIDE_OPEN))
196 WT_ERR(__wt_las_cursor_open(thread->session));
197 if (LF_ISSET(WT_THREAD_PANIC_FAIL))
198 F_SET(thread, WT_THREAD_PANIC_FAIL);
199 thread->id = i;
200 thread->chk_func = group->chk_func;
201 thread->run_func = group->run_func;
202 thread->stop_func = group->stop_func;
203 WT_ERR(__wt_cond_alloc(
204 session, "Thread cond", &thread->pause_cond));
205
206 /*
207 * Start thread as inactive. We'll activate the needed
208 * number later.
209 */
210 __wt_verbose(session, WT_VERB_THREAD_GROUP,
211 "Starting utility thread: %s:%" PRIu32,
212 group->name, thread->id);
213 F_SET(thread, WT_THREAD_RUN);
214 WT_ERR(__wt_thread_create(thread->session,
215 &thread->tid, __thread_run, thread));
216
217 WT_ASSERT(session, group->threads[i] == NULL);
218 group->threads[i] = thread;
219 thread = NULL;
220 }
221
222 group->max = new_max;
223 group->min = new_min;
224 while (group->current_threads < new_min)
225 __wt_thread_group_start_one(session, group, true);
226 return (0);
227
228 err: /*
229 * An error resizing a thread array is currently fatal, it should only
230 * happen in an out of memory situation. Do real cleanup just in case
231 * that changes in the future.
232 */
233 if (thread != NULL) {
234 if (thread->session != NULL) {
235 wt_session = (WT_SESSION *)thread->session;
236 WT_TRET(wt_session->close(wt_session, NULL));
237 }
238 __wt_cond_destroy(session, &thread->pause_cond);
239 __wt_free(session, thread);
240 }
241
242 /*
243 * Update the thread group information even on failure to improve our
244 * chances of cleaning up properly.
245 */
246 group->max = new_max;
247 group->min = new_min;
248 WT_TRET(__wt_thread_group_destroy(session, group));
249
250 WT_PANIC_RET(session, ret, "Error while resizing thread group");
251 }
252
253 /*
254 * __wt_thread_group_resize --
255 * Resize an array of utility threads taking the lock.
256 */
257 int
__wt_thread_group_resize(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group,uint32_t new_min,uint32_t new_max,uint32_t flags)258 __wt_thread_group_resize(
259 WT_SESSION_IMPL *session, WT_THREAD_GROUP *group,
260 uint32_t new_min, uint32_t new_max, uint32_t flags)
261 {
262 WT_DECL_RET;
263
264 __wt_writelock(session, &group->lock);
265 WT_TRET(__thread_group_resize(session, group, new_min, new_max, flags));
266 __wt_writeunlock(session, &group->lock);
267 return (ret);
268 }
269
270 /*
271 * __wt_thread_group_create --
272 * Create a new thread group, assumes incoming group structure is
273 * zero initialized.
274 */
275 int
__wt_thread_group_create(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group,const char * name,uint32_t min,uint32_t max,uint32_t flags,bool (* chk_func)(WT_SESSION_IMPL * session),int (* run_func)(WT_SESSION_IMPL * session,WT_THREAD * context),int (* stop_func)(WT_SESSION_IMPL * session,WT_THREAD * context))276 __wt_thread_group_create(
277 WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, const char *name,
278 uint32_t min, uint32_t max, uint32_t flags,
279 bool (*chk_func)(WT_SESSION_IMPL *session),
280 int (*run_func)(WT_SESSION_IMPL *session, WT_THREAD *context),
281 int (*stop_func)(WT_SESSION_IMPL *session, WT_THREAD *context))
282 {
283 WT_DECL_RET;
284 bool cond_alloced;
285
286 /* Check that the structure is initialized as expected */
287 WT_ASSERT(session, group->alloc == 0);
288
289 cond_alloced = false;
290
291 __wt_verbose(session,
292 WT_VERB_THREAD_GROUP, "Creating thread group: %s", name);
293
294 WT_RET(__wt_rwlock_init(session, &group->lock));
295 WT_ERR(__wt_cond_alloc(
296 session, "thread group cond", &group->wait_cond));
297 cond_alloced = true;
298
299 __wt_writelock(session, &group->lock);
300 group->chk_func = chk_func;
301 group->run_func = run_func;
302 group->stop_func = stop_func;
303 group->name = name;
304
305 WT_TRET(__thread_group_resize(session, group, min, max, flags));
306 __wt_writeunlock(session, &group->lock);
307
308 /* Cleanup on error to avoid leaking resources */
309 err: if (ret != 0) {
310 if (cond_alloced)
311 __wt_cond_destroy(session, &group->wait_cond);
312 __wt_rwlock_destroy(session, &group->lock);
313 }
314 return (ret);
315 }
316
317 /*
318 * __wt_thread_group_destroy --
319 * Shut down a thread group. Our caller must hold the lock.
320 */
321 int
__wt_thread_group_destroy(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group)322 __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group)
323 {
324 WT_DECL_RET;
325
326 __wt_verbose(session, WT_VERB_THREAD_GROUP,
327 "Destroying thread group: %s", group->name);
328
329 WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock));
330
331 /* Shut down all threads and free associated resources. */
332 WT_TRET(__thread_group_shrink(session, group, 0));
333
334 __wt_free(session, group->threads);
335
336 __wt_cond_destroy(session, &group->wait_cond);
337 __wt_rwlock_destroy(session, &group->lock);
338
339 /*
340 * Clear out any settings from the group, some structures are reused
341 * for different thread groups - in particular the eviction thread
342 * group for recovery and then normal runtime.
343 */
344 memset(group, 0, sizeof(*group));
345
346 return (ret);
347 }
348
349 /*
350 * __wt_thread_group_start_one --
351 * Start a new thread if possible.
352 */
353 void
__wt_thread_group_start_one(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group,bool is_locked)354 __wt_thread_group_start_one(
355 WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool is_locked)
356 {
357 WT_THREAD *thread;
358
359 if (group->current_threads >= group->max)
360 return;
361
362 if (!is_locked)
363 __wt_writelock(session, &group->lock);
364
365 /* Recheck the bounds now that we hold the lock */
366 if (group->current_threads < group->max) {
367 thread = group->threads[group->current_threads++];
368 WT_ASSERT(session, thread != NULL);
369 __wt_verbose(session, WT_VERB_THREAD_GROUP,
370 "Activating utility thread: %s:%" PRIu32,
371 group->name, thread->id);
372 WT_ASSERT(session, !F_ISSET(thread, WT_THREAD_ACTIVE));
373 F_SET(thread, WT_THREAD_ACTIVE);
374 __wt_cond_signal(session, thread->pause_cond);
375 }
376 if (!is_locked)
377 __wt_writeunlock(session, &group->lock);
378 }
379
380 /*
381 * __wt_thread_group_stop_one --
382 * Pause one thread if possible.
383 */
384 void
__wt_thread_group_stop_one(WT_SESSION_IMPL * session,WT_THREAD_GROUP * group)385 __wt_thread_group_stop_one(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group)
386 {
387 WT_THREAD *thread;
388
389 if (group->current_threads <= group->min)
390 return;
391
392 __wt_writelock(session, &group->lock);
393 /* Recheck the bounds now that we hold the lock */
394 if (group->current_threads > group->min) {
395 thread = group->threads[--group->current_threads];
396 __wt_verbose(session, WT_VERB_THREAD_GROUP,
397 "Pausing utility thread: %s:%" PRIu32,
398 group->name, thread->id);
399 WT_ASSERT(session, F_ISSET(thread, WT_THREAD_ACTIVE));
400 F_CLR(thread, WT_THREAD_ACTIVE);
401 __wt_cond_signal(session, thread->pause_cond);
402 }
403 __wt_writeunlock(session, &group->lock);
404 }
405