/*-
 * Copyright (c) 2014-2018 MongoDB, Inc.
 * Copyright (c) 2008-2014 WiredTiger, Inc.
 *	All rights reserved.
 *
 * See the file LICENSE for redistribution information.
 */

#include "wt_internal.h"

static int __lsm_manager_run_server(WT_SESSION_IMPL *);

static WT_THREAD_RET __lsm_worker_manager(void *);
14 
15 /*
16  * __wt_lsm_manager_config --
17  *	Configure the LSM manager.
18  */
19 int
__wt_lsm_manager_config(WT_SESSION_IMPL * session,const char ** cfg)20 __wt_lsm_manager_config(WT_SESSION_IMPL *session, const char **cfg)
21 {
22 	WT_CONFIG_ITEM cval;
23 	WT_CONNECTION_IMPL *conn;
24 
25 	conn = S2C(session);
26 
27 	WT_RET(__wt_config_gets(session, cfg, "lsm_manager.merge", &cval));
28 	if (cval.val)
29 		F_SET(conn, WT_CONN_LSM_MERGE);
30 	WT_RET(__wt_config_gets(
31 	    session, cfg, "lsm_manager.worker_thread_max", &cval));
32 	if (cval.val)
33 		conn->lsm_manager.lsm_workers_max = (uint32_t)cval.val;
34 	return (0);
35 }
36 
37 /*
38  * __lsm_general_worker_start --
39  *	Start up all of the general LSM worker threads.
40  */
41 static int
__lsm_general_worker_start(WT_SESSION_IMPL * session)42 __lsm_general_worker_start(WT_SESSION_IMPL *session)
43 {
44 	WT_CONNECTION_IMPL *conn;
45 	WT_LSM_MANAGER *manager;
46 	WT_LSM_WORKER_ARGS *worker_args;
47 
48 	conn = S2C(session);
49 	manager = &conn->lsm_manager;
50 
51 	/*
52 	 * Start the worker threads or new worker threads if called via
53 	 * reconfigure. The LSM manager is worker[0].
54 	 * This should get more sophisticated in the future - only launching
55 	 * as many worker threads as are required to keep up with demand.
56 	 */
57 	WT_ASSERT(session, manager->lsm_workers > 0);
58 	WT_ASSERT(session, manager->lsm_workers < manager->lsm_workers_max);
59 	for (; manager->lsm_workers < manager->lsm_workers_max;
60 	    manager->lsm_workers++) {
61 		worker_args =
62 		    &manager->lsm_worker_cookies[manager->lsm_workers];
63 		worker_args->work_cond = manager->work_cond;
64 		worker_args->id = manager->lsm_workers;
65 		/*
66 		 * The first worker only does switch and drop operations as
67 		 * these are both short operations and it is essential
68 		 * that switches are responsive to avoid introducing
69 		 * throttling stalls.
70 		 */
71 		if (manager->lsm_workers == 1)
72 			worker_args->type =
73 			    WT_LSM_WORK_DROP | WT_LSM_WORK_SWITCH;
74 		else {
75 			worker_args->type = WT_LSM_WORK_GENERAL_OPS;
76 			/*
77 			 * Only allow half of the threads to run merges to
78 			 * avoid all all workers getting stuck in long-running
79 			 * merge operations. Make sure the first worker is
80 			 * allowed, so that there is at least one thread
81 			 * capable of running merges.  We know the first
82 			 * worker is id 2, so set merges on even numbered
83 			 * workers.
84 			 */
85 			if (manager->lsm_workers % 2 == 0)
86 				FLD_SET(worker_args->type, WT_LSM_WORK_MERGE);
87 		}
88 		WT_RET(__wt_lsm_worker_start(session, worker_args));
89 	}
90 
91 	/*
92 	 * Setup the first worker properly - if there are only a minimal
93 	 * number of workers allow the first worker to flush. Otherwise a
94 	 * single merge can lead to switched chunks filling up the cache.
95 	 * This is separate to the main loop so that it is applied on startup
96 	 * and reconfigure.
97 	 */
98 	if (manager->lsm_workers_max == WT_LSM_MIN_WORKERS)
99 		FLD_SET(manager->lsm_worker_cookies[1].type, WT_LSM_WORK_FLUSH);
100 	else
101 		FLD_CLR(manager->lsm_worker_cookies[1].type, WT_LSM_WORK_FLUSH);
102 
103 	return (0);
104 }
105 
106 /*
107  * __lsm_stop_workers --
108  *	Stop worker threads until the number reaches the configured amount.
109  */
110 static int
__lsm_stop_workers(WT_SESSION_IMPL * session)111 __lsm_stop_workers(WT_SESSION_IMPL *session)
112 {
113 	WT_LSM_MANAGER *manager;
114 	WT_LSM_WORKER_ARGS *worker_args;
115 
116 	manager = &S2C(session)->lsm_manager;
117 	/*
118 	 * Start at the end of the list of threads and stop them until we have
119 	 * the desired number. We want to keep all active threads packed at the
120 	 * front of the worker array.
121 	 */
122 	WT_ASSERT(session, manager->lsm_workers > manager->lsm_workers_max);
123 	for (; manager->lsm_workers > manager->lsm_workers_max;
124 	    manager->lsm_workers--) {
125 		worker_args =
126 		    &manager->lsm_worker_cookies[manager->lsm_workers - 1];
127 		WT_ASSERT(session, worker_args->tid_set);
128 
129 		WT_RET(__wt_lsm_worker_stop(session, worker_args));
130 		worker_args->type = 0;
131 
132 		/*
133 		 * We do not clear the other fields because they are allocated
134 		 * statically when the connection was opened.
135 		 */
136 	}
137 
138 	/*
139 	 * Setup the first worker properly - if there are only a minimal
140 	 * number of workers it should flush. Since the number of threads
141 	 * is being reduced the field can't already be set.
142 	 */
143 	if (manager->lsm_workers_max == WT_LSM_MIN_WORKERS)
144 		FLD_SET(manager->lsm_worker_cookies[1].type, WT_LSM_WORK_FLUSH);
145 
146 	return (0);
147 }
148 
149 /*
150  * __wt_lsm_manager_reconfig --
151  *	Re-configure the LSM manager.
152  */
153 int
__wt_lsm_manager_reconfig(WT_SESSION_IMPL * session,const char ** cfg)154 __wt_lsm_manager_reconfig(WT_SESSION_IMPL *session, const char **cfg)
155 {
156 	WT_LSM_MANAGER *manager;
157 	uint32_t orig_workers;
158 
159 	manager = &S2C(session)->lsm_manager;
160 	orig_workers = manager->lsm_workers_max;
161 
162 	WT_RET(__wt_lsm_manager_config(session, cfg));
163 	/*
164 	 * If LSM hasn't started yet, we simply reconfigured the settings
165 	 * and we'll let the normal code path start the threads.
166 	 */
167 	if (manager->lsm_workers_max == 0)
168 		return (0);
169 	if (manager->lsm_workers == 0)
170 		return (0);
171 	/*
172 	 * If the number of workers has not changed, we're done.
173 	 */
174 	if (orig_workers == manager->lsm_workers_max)
175 		return (0);
176 	/*
177 	 * If we want more threads, start them.
178 	 */
179 	if (manager->lsm_workers_max > orig_workers)
180 		return (__lsm_general_worker_start(session));
181 
182 	/*
183 	 * Otherwise we want to reduce the number of workers.
184 	 */
185 	WT_ASSERT(session, manager->lsm_workers_max < orig_workers);
186 	WT_RET(__lsm_stop_workers(session));
187 	return (0);
188 }
189 
190 /*
191  * __wt_lsm_manager_start --
192  *	Start the LSM management infrastructure. Our queues and locks were
193  *	initialized when the connection was initialized.
194  */
195 int
__wt_lsm_manager_start(WT_SESSION_IMPL * session)196 __wt_lsm_manager_start(WT_SESSION_IMPL *session)
197 {
198 	WT_CONNECTION_IMPL *conn;
199 	WT_DECL_RET;
200 	WT_LSM_MANAGER *manager;
201 	WT_SESSION_IMPL *worker_session;
202 	uint32_t i;
203 
204 	conn = S2C(session);
205 	manager = &conn->lsm_manager;
206 
207 	/*
208 	 * If readonly or the manager is running, or we've already failed,
209 	 * there's no work to do.
210 	 */
211 	if (F_ISSET(conn, WT_CONN_READONLY) ||
212 	    manager->lsm_workers != 0 ||
213 	    F_ISSET(manager, WT_LSM_MANAGER_SHUTDOWN))
214 		return (0);
215 
216 	/* It's possible to race, see if we're the winner. */
217 	if (!__wt_atomic_cas32(&manager->lsm_workers, 0, 1))
218 		return (0);
219 
220 	/* We need at least a manager, a switch thread and a generic worker. */
221 	WT_ASSERT(session, manager->lsm_workers_max > 2);
222 
223 	/*
224 	 * Open sessions for all potential worker threads here - it's not
225 	 * safe to have worker threads open/close sessions themselves.
226 	 * All the LSM worker threads do their operations on read-only
227 	 * files. Use read-uncommitted isolation to avoid keeping
228 	 * updates in cache unnecessarily.
229 	 */
230 	for (i = 0; i < WT_LSM_MAX_WORKERS; i++) {
231 		WT_ERR(__wt_open_internal_session(
232 		    conn, "lsm-worker", false, 0, &worker_session));
233 		worker_session->isolation = WT_ISO_READ_UNCOMMITTED;
234 		manager->lsm_worker_cookies[i].session = worker_session;
235 	}
236 
237 	F_SET(conn, WT_CONN_SERVER_LSM);
238 
239 	/* Start the LSM manager thread. */
240 	WT_ERR(__wt_thread_create(session, &manager->lsm_worker_cookies[0].tid,
241 	    __lsm_worker_manager, &manager->lsm_worker_cookies[0]));
242 
243 	if (0) {
244 err:		for (i = 0;
245 		    (worker_session =
246 		    manager->lsm_worker_cookies[i].session) != NULL;
247 		    i++)
248 			WT_TRET((&worker_session->iface)->close(
249 			    &worker_session->iface, NULL));
250 
251 		/* Make the failure permanent, we won't try again. */
252 		F_SET(manager, WT_LSM_MANAGER_SHUTDOWN);
253 
254 		/*
255 		 * Reset the workers count (otherwise, LSM destroy will hang
256 		 * waiting for threads to exit.
257 		 */
258 		WT_PUBLISH(manager->lsm_workers, 0);
259 	}
260 	return (ret);
261 }
262 
263 /*
264  * __wt_lsm_manager_free_work_unit --
265  *	Release an LSM tree work unit.
266  */
267 void
__wt_lsm_manager_free_work_unit(WT_SESSION_IMPL * session,WT_LSM_WORK_UNIT * entry)268 __wt_lsm_manager_free_work_unit(
269     WT_SESSION_IMPL *session, WT_LSM_WORK_UNIT *entry)
270 {
271 	if (entry != NULL) {
272 		WT_ASSERT(session, entry->lsm_tree->queue_ref > 0);
273 
274 		(void)__wt_atomic_sub32(&entry->lsm_tree->queue_ref, 1);
275 		__wt_free(session, entry);
276 	}
277 }
278 
279 /*
280  * __wt_lsm_manager_destroy --
281  *	Destroy the LSM manager threads and subsystem.
282  */
283 int
__wt_lsm_manager_destroy(WT_SESSION_IMPL * session)284 __wt_lsm_manager_destroy(WT_SESSION_IMPL *session)
285 {
286 	WT_CONNECTION_IMPL *conn;
287 	WT_DECL_RET;
288 	WT_LSM_MANAGER *manager;
289 	WT_LSM_WORK_UNIT *current;
290 	WT_SESSION *wt_session;
291 	uint64_t removed;
292 	uint32_t i;
293 
294 	conn = S2C(session);
295 	manager = &conn->lsm_manager;
296 	removed = 0;
297 
298 	/* Clear the LSM server flag. */
299 	F_CLR(conn, WT_CONN_SERVER_LSM);
300 
301 	WT_ASSERT(session, !F_ISSET(conn, WT_CONN_READONLY) ||
302 	    manager->lsm_workers == 0);
303 	if (manager->lsm_workers > 0) {
304 		/* Wait for the main LSM manager thread to finish. */
305 		while (!F_ISSET(manager, WT_LSM_MANAGER_SHUTDOWN)) {
306 			WT_STAT_CONN_INCR(session, conn_close_blocked_lsm);
307 			__wt_yield();
308 		}
309 
310 		/* Clean up open LSM handles. */
311 		ret = __wt_lsm_tree_close_all(session);
312 
313 		WT_TRET(__wt_thread_join(
314 		    session, &manager->lsm_worker_cookies[0].tid));
315 
316 		/* Release memory from any operations left on the queue. */
317 		while ((current = TAILQ_FIRST(&manager->switchqh)) != NULL) {
318 			TAILQ_REMOVE(&manager->switchqh, current, q);
319 			++removed;
320 			__wt_lsm_manager_free_work_unit(session, current);
321 		}
322 		while ((current = TAILQ_FIRST(&manager->appqh)) != NULL) {
323 			TAILQ_REMOVE(&manager->appqh, current, q);
324 			++removed;
325 			__wt_lsm_manager_free_work_unit(session, current);
326 		}
327 		while ((current = TAILQ_FIRST(&manager->managerqh)) != NULL) {
328 			TAILQ_REMOVE(&manager->managerqh, current, q);
329 			++removed;
330 			__wt_lsm_manager_free_work_unit(session, current);
331 		}
332 
333 		/* Close all LSM worker sessions. */
334 		for (i = 0; i < WT_LSM_MAX_WORKERS; i++) {
335 			wt_session =
336 			    &manager->lsm_worker_cookies[i].session->iface;
337 			WT_TRET(wt_session->close(wt_session, NULL));
338 		}
339 	}
340 	WT_STAT_CONN_INCRV(session, lsm_work_units_discarded, removed);
341 
342 	/* Free resources that are allocated in connection initialize */
343 	__wt_spin_destroy(session, &manager->switch_lock);
344 	__wt_spin_destroy(session, &manager->app_lock);
345 	__wt_spin_destroy(session, &manager->manager_lock);
346 	__wt_cond_destroy(session, &manager->work_cond);
347 
348 	return (ret);
349 }
350 
351 /*
352  * __lsm_manager_worker_shutdown --
353  *	Shutdown the LSM worker threads.
354  */
355 static int
__lsm_manager_worker_shutdown(WT_SESSION_IMPL * session)356 __lsm_manager_worker_shutdown(WT_SESSION_IMPL *session)
357 {
358 	WT_DECL_RET;
359 	WT_LSM_MANAGER *manager;
360 	u_int i;
361 
362 	manager = &S2C(session)->lsm_manager;
363 
364 	/*
365 	 * Wait for the rest of the LSM workers to shutdown. Start at index
366 	 * one - since we (the manager) are at index 0.
367 	 */
368 	for (i = 1; i < manager->lsm_workers; i++) {
369 		WT_ASSERT(session, manager->lsm_worker_cookies[i].tid_set);
370 		WT_TRET(__wt_lsm_worker_stop(
371 		    session, &manager->lsm_worker_cookies[i]));
372 	}
373 	return (ret);
374 }
375 
376 /*
377  * __lsm_manager_run_server --
378  *	Run manager thread operations.
379  */
380 static int
__lsm_manager_run_server(WT_SESSION_IMPL * session)381 __lsm_manager_run_server(WT_SESSION_IMPL *session)
382 {
383 	struct timespec now;
384 	WT_CONNECTION_IMPL *conn;
385 	WT_DECL_RET;
386 	WT_LSM_TREE *lsm_tree;
387 	uint64_t fillms, idlems;
388 	bool dhandle_locked;
389 
390 	conn = S2C(session);
391 	dhandle_locked = false;
392 
393 	while (F_ISSET(conn, WT_CONN_SERVER_LSM)) {
394 		__wt_sleep(0, 10000);
395 		if (TAILQ_EMPTY(&conn->lsmqh))
396 			continue;
397 		__wt_readlock(session, &conn->dhandle_lock);
398 		F_SET(session, WT_SESSION_LOCKED_HANDLE_LIST_READ);
399 		dhandle_locked = true;
400 		TAILQ_FOREACH(lsm_tree, &conn->lsmqh, q) {
401 			if (!lsm_tree->active)
402 				continue;
403 			__wt_epoch(session, &now);
404 			/*
405 			 * If work was added reset our counts and time.
406 			 * Otherwise compute an idle time.
407 			 */
408 			if (lsm_tree->work_count != lsm_tree->mgr_work_count ||
409 			    lsm_tree->work_count == 0) {
410 				idlems = 0;
411 				lsm_tree->mgr_work_count = lsm_tree->work_count;
412 				lsm_tree->last_active = now;
413 			} else
414 				idlems =
415 				    WT_TIMEDIFF_MS(now, lsm_tree->last_active);
416 			fillms = 3 * lsm_tree->chunk_fill_ms;
417 			if (fillms == 0)
418 				fillms = 10000;
419 			/*
420 			 * If the tree appears to not be triggering enough
421 			 * LSM maintenance, help it out. Some types of
422 			 * additional work units don't hurt, and can be
423 			 * necessary if some work units aren't completed for
424 			 * some reason.
425 			 * If the tree hasn't been modified, and there are
426 			 * more than 1 chunks - try to get the tree smaller
427 			 * so queries run faster.
428 			 * If we are getting aggressive - ensure there are
429 			 * enough work units that we can get chunks merged.
430 			 * If we aren't pushing enough work units, compared
431 			 * to how often new chunks are being created add some
432 			 * more.
433 			 */
434 			if (lsm_tree->queue_ref >= LSM_TREE_MAX_QUEUE)
435 				WT_STAT_CONN_INCR(session,
436 				    lsm_work_queue_max);
437 			else if ((!lsm_tree->modified &&
438 			    lsm_tree->nchunks > 1) ||
439 			    (lsm_tree->queue_ref == 0 &&
440 			    lsm_tree->nchunks > 1) ||
441 			    (lsm_tree->merge_aggressiveness >
442 			    WT_LSM_AGGRESSIVE_THRESHOLD &&
443 			     !F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING)) ||
444 			    idlems > fillms) {
445 				WT_ERR(__wt_lsm_manager_push_entry(
446 				    session, WT_LSM_WORK_SWITCH, 0, lsm_tree));
447 				WT_ERR(__wt_lsm_manager_push_entry(
448 				    session, WT_LSM_WORK_DROP, 0, lsm_tree));
449 				WT_ERR(__wt_lsm_manager_push_entry(
450 				    session, WT_LSM_WORK_FLUSH, 0, lsm_tree));
451 				WT_ERR(__wt_lsm_manager_push_entry(
452 				    session, WT_LSM_WORK_BLOOM, 0, lsm_tree));
453 				__wt_verbose(session,
454 				    WT_VERB_LSM_MANAGER,
455 				    "MGR %s: queue %" PRIu32 " mod %d "
456 				    "nchunks %" PRIu32
457 				    " flags %#" PRIx32 " aggressive %" PRIu32
458 				    " idlems %" PRIu64
459 				    " fillms %" PRIu64,
460 				    lsm_tree->name, lsm_tree->queue_ref,
461 				    lsm_tree->modified, lsm_tree->nchunks,
462 				    lsm_tree->flags,
463 				    lsm_tree->merge_aggressiveness,
464 				    idlems, fillms);
465 				WT_ERR(__wt_lsm_manager_push_entry(
466 				    session, WT_LSM_WORK_MERGE, 0, lsm_tree));
467 			}
468 		}
469 		__wt_readunlock(session, &conn->dhandle_lock);
470 		F_CLR(session, WT_SESSION_LOCKED_HANDLE_LIST_READ);
471 		dhandle_locked = false;
472 	}
473 
474 err:	if (dhandle_locked) {
475 		__wt_readunlock(session, &conn->dhandle_lock);
476 		F_CLR(session, WT_SESSION_LOCKED_HANDLE_LIST_READ);
477 	}
478 	return (ret);
479 }
480 
481 /*
482  * __lsm_worker_manager --
483  *	A thread that manages all open LSM trees, and the shared LSM worker
484  *	threads.
485  */
486 static WT_THREAD_RET
__lsm_worker_manager(void * arg)487 __lsm_worker_manager(void *arg)
488 {
489 	WT_DECL_RET;
490 	WT_LSM_MANAGER *manager;
491 	WT_LSM_WORKER_ARGS *cookie;
492 	WT_SESSION_IMPL *session;
493 
494 	cookie = (WT_LSM_WORKER_ARGS *)arg;
495 	session = cookie->session;
496 	manager = &S2C(session)->lsm_manager;
497 
498 	WT_ERR(__lsm_general_worker_start(session));
499 	WT_ERR(__lsm_manager_run_server(session));
500 	WT_ERR(__lsm_manager_worker_shutdown(session));
501 
502 	if (ret != 0) {
503 err:		WT_PANIC_MSG(session, ret, "LSM worker manager thread error");
504 	}
505 
506 	/* Connection close waits on us to shutdown, let it know we're done. */
507 	F_SET(manager, WT_LSM_MANAGER_SHUTDOWN);
508 	WT_FULL_BARRIER();
509 
510 	return (WT_THREAD_RET_VALUE);
511 }
512 
513 /*
514  * __wt_lsm_manager_clear_tree --
515  *	Remove all entries for a tree from the LSM manager queues. This
516  *	introduces an inefficiency if LSM trees are being opened and closed
517  *	regularly.
518  */
519 void
__wt_lsm_manager_clear_tree(WT_SESSION_IMPL * session,WT_LSM_TREE * lsm_tree)520 __wt_lsm_manager_clear_tree(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
521 {
522 	WT_LSM_MANAGER *manager;
523 	WT_LSM_WORK_UNIT *current, *tmp;
524 	uint64_t removed;
525 
526 	manager = &S2C(session)->lsm_manager;
527 	removed = 0;
528 
529 	/* Clear out the tree from the switch queue */
530 	__wt_spin_lock(session, &manager->switch_lock);
531 	TAILQ_FOREACH_SAFE(current, &manager->switchqh, q, tmp) {
532 		if (current->lsm_tree != lsm_tree)
533 			continue;
534 		++removed;
535 		TAILQ_REMOVE(&manager->switchqh, current, q);
536 		__wt_lsm_manager_free_work_unit(session, current);
537 	}
538 	__wt_spin_unlock(session, &manager->switch_lock);
539 	/* Clear out the tree from the application queue */
540 	__wt_spin_lock(session, &manager->app_lock);
541 	TAILQ_FOREACH_SAFE(current, &manager->appqh, q, tmp) {
542 		if (current->lsm_tree != lsm_tree)
543 			continue;
544 		++removed;
545 		TAILQ_REMOVE(&manager->appqh, current, q);
546 		__wt_lsm_manager_free_work_unit(session, current);
547 	}
548 	__wt_spin_unlock(session, &manager->app_lock);
549 	/* Clear out the tree from the manager queue */
550 	__wt_spin_lock(session, &manager->manager_lock);
551 	TAILQ_FOREACH_SAFE(current, &manager->managerqh, q, tmp) {
552 		if (current->lsm_tree != lsm_tree)
553 			continue;
554 		++removed;
555 		TAILQ_REMOVE(&manager->managerqh, current, q);
556 		__wt_lsm_manager_free_work_unit(session, current);
557 	}
558 	__wt_spin_unlock(session, &manager->manager_lock);
559 	WT_STAT_CONN_INCRV(session, lsm_work_units_discarded, removed);
560 }
561 
/*
 * We assume this is only called from __wt_lsm_manager_pop_entry and we
 * have session, entry and type available to use.  If the queue is empty
 * we may return from the macro.  The initial empty check is done without
 * holding the queue lock as a cheap fast path; NOTE(review): this relies
 * on callers polling, so a racing push is picked up on a later call.
 */
#define	LSM_POP_ENTRY(qh, qlock, qlen) do {				\
	if (TAILQ_EMPTY(qh))						\
		return (0);						\
	__wt_spin_lock(session, qlock);					\
	TAILQ_FOREACH(entry, (qh), q) {					\
		if (FLD_ISSET(type, entry->type)) {			\
			TAILQ_REMOVE(qh, entry, q);			\
			WT_STAT_CONN_DECR(session, qlen);		\
			break;						\
		}							\
	}								\
	__wt_spin_unlock(session, (qlock));				\
} while (0)
580 
581 /*
582  * __wt_lsm_manager_pop_entry --
583  *	Retrieve the head of the queue, if it matches the requested work
584  *	unit type.
585  */
586 int
__wt_lsm_manager_pop_entry(WT_SESSION_IMPL * session,uint32_t type,WT_LSM_WORK_UNIT ** entryp)587 __wt_lsm_manager_pop_entry(
588     WT_SESSION_IMPL *session, uint32_t type, WT_LSM_WORK_UNIT **entryp)
589 {
590 	WT_LSM_MANAGER *manager;
591 	WT_LSM_WORK_UNIT *entry;
592 
593 	*entryp = entry = NULL;
594 
595 	manager = &S2C(session)->lsm_manager;
596 
597 	/*
598 	 * Pop the entry off the correct queue based on our work type.
599 	 */
600 	if (type == WT_LSM_WORK_SWITCH)
601 		LSM_POP_ENTRY(&manager->switchqh,
602 		    &manager->switch_lock, lsm_work_queue_switch);
603 	else if (type == WT_LSM_WORK_MERGE)
604 		LSM_POP_ENTRY(&manager->managerqh,
605 		    &manager->manager_lock, lsm_work_queue_manager);
606 	else
607 		LSM_POP_ENTRY(&manager->appqh,
608 		    &manager->app_lock, lsm_work_queue_app);
609 	if (entry != NULL)
610 		WT_STAT_CONN_INCR(session, lsm_work_units_done);
611 	*entryp = entry;
612 	return (0);
613 }
614 
/*
 * Push a work unit onto the appropriate queue.  This macro assumes we are
 * called from __wt_lsm_manager_push_entry and we have session and entry
 * available for use.
 */
#define	LSM_PUSH_ENTRY(qh, qlock, qlen) do {				\
	__wt_spin_lock(session, qlock);					\
	TAILQ_INSERT_TAIL((qh), entry, q);				\
	WT_STAT_CONN_INCR(session, qlen);				\
	__wt_spin_unlock(session, qlock);				\
} while (0)
626 
627 /*
628  * __wt_lsm_manager_push_entry --
629  *	Add an entry to the end of the switch queue.
630  */
631 int
__wt_lsm_manager_push_entry(WT_SESSION_IMPL * session,uint32_t type,uint32_t flags,WT_LSM_TREE * lsm_tree)632 __wt_lsm_manager_push_entry(WT_SESSION_IMPL *session,
633     uint32_t type, uint32_t flags, WT_LSM_TREE *lsm_tree)
634 {
635 	WT_LSM_MANAGER *manager;
636 	WT_LSM_WORK_UNIT *entry;
637 
638 	manager = &S2C(session)->lsm_manager;
639 
640 	WT_ASSERT(session, !F_ISSET(S2C(session), WT_CONN_READONLY));
641 	/*
642 	 * Don't add merges or bloom filter creates if merges
643 	 * or bloom filters are disabled in the tree.
644 	 */
645 	switch (type) {
646 	case WT_LSM_WORK_BLOOM:
647 		if (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_OFF))
648 			return (0);
649 		break;
650 	case WT_LSM_WORK_MERGE:
651 		if (!F_ISSET(lsm_tree, WT_LSM_TREE_MERGES))
652 			return (0);
653 		break;
654 	}
655 
656 	/*
657 	 * Don't allow any work units unless a tree is active, this avoids
658 	 * races on shutdown between clearing out queues and pushing new
659 	 * work units.
660 	 *
661 	 * Increment the queue reference before checking the flag since
662 	 * on close, the flag is cleared and then the queue reference count
663 	 * is checked.
664 	 */
665 	(void)__wt_atomic_add32(&lsm_tree->queue_ref, 1);
666 	if (!lsm_tree->active) {
667 		(void)__wt_atomic_sub32(&lsm_tree->queue_ref, 1);
668 		return (0);
669 	}
670 
671 	(void)__wt_atomic_add64(&lsm_tree->work_count, 1);
672 	WT_RET(__wt_calloc_one(session, &entry));
673 	entry->type = type;
674 	entry->flags = flags;
675 	entry->lsm_tree = lsm_tree;
676 	WT_STAT_CONN_INCR(session, lsm_work_units_created);
677 
678 	if (type == WT_LSM_WORK_SWITCH)
679 		LSM_PUSH_ENTRY(&manager->switchqh,
680 		    &manager->switch_lock, lsm_work_queue_switch);
681 	else if (type == WT_LSM_WORK_MERGE)
682 		LSM_PUSH_ENTRY(&manager->managerqh,
683 		    &manager->manager_lock, lsm_work_queue_manager);
684 	else
685 		LSM_PUSH_ENTRY(&manager->appqh,
686 		    &manager->app_lock, lsm_work_queue_app);
687 
688 	__wt_cond_signal(session, manager->work_cond);
689 	return (0);
690 }
691