1 /*-
2 * Copyright (c) 2014-2018 MongoDB, Inc.
3 * Copyright (c) 2008-2014 WiredTiger, Inc.
4 * All rights reserved.
5 *
6 * See the file LICENSE for redistribution information.
7 */
8
9 #include "wt_internal.h"
10
11 static int __lsm_manager_run_server(WT_SESSION_IMPL *);
12
13 static WT_THREAD_RET __lsm_worker_manager(void *);
14
15 /*
16 * __wt_lsm_manager_config --
17 * Configure the LSM manager.
18 */
19 int
__wt_lsm_manager_config(WT_SESSION_IMPL * session,const char ** cfg)20 __wt_lsm_manager_config(WT_SESSION_IMPL *session, const char **cfg)
21 {
22 WT_CONFIG_ITEM cval;
23 WT_CONNECTION_IMPL *conn;
24
25 conn = S2C(session);
26
27 WT_RET(__wt_config_gets(session, cfg, "lsm_manager.merge", &cval));
28 if (cval.val)
29 F_SET(conn, WT_CONN_LSM_MERGE);
30 WT_RET(__wt_config_gets(
31 session, cfg, "lsm_manager.worker_thread_max", &cval));
32 if (cval.val)
33 conn->lsm_manager.lsm_workers_max = (uint32_t)cval.val;
34 return (0);
35 }
36
37 /*
38 * __lsm_general_worker_start --
39 * Start up all of the general LSM worker threads.
40 */
41 static int
__lsm_general_worker_start(WT_SESSION_IMPL * session)42 __lsm_general_worker_start(WT_SESSION_IMPL *session)
43 {
44 WT_CONNECTION_IMPL *conn;
45 WT_LSM_MANAGER *manager;
46 WT_LSM_WORKER_ARGS *worker_args;
47
48 conn = S2C(session);
49 manager = &conn->lsm_manager;
50
51 /*
52 * Start the worker threads or new worker threads if called via
53 * reconfigure. The LSM manager is worker[0].
54 * This should get more sophisticated in the future - only launching
55 * as many worker threads as are required to keep up with demand.
56 */
57 WT_ASSERT(session, manager->lsm_workers > 0);
58 WT_ASSERT(session, manager->lsm_workers < manager->lsm_workers_max);
59 for (; manager->lsm_workers < manager->lsm_workers_max;
60 manager->lsm_workers++) {
61 worker_args =
62 &manager->lsm_worker_cookies[manager->lsm_workers];
63 worker_args->work_cond = manager->work_cond;
64 worker_args->id = manager->lsm_workers;
65 /*
66 * The first worker only does switch and drop operations as
67 * these are both short operations and it is essential
68 * that switches are responsive to avoid introducing
69 * throttling stalls.
70 */
71 if (manager->lsm_workers == 1)
72 worker_args->type =
73 WT_LSM_WORK_DROP | WT_LSM_WORK_SWITCH;
74 else {
75 worker_args->type = WT_LSM_WORK_GENERAL_OPS;
76 /*
77 * Only allow half of the threads to run merges to
78 * avoid all all workers getting stuck in long-running
79 * merge operations. Make sure the first worker is
80 * allowed, so that there is at least one thread
81 * capable of running merges. We know the first
82 * worker is id 2, so set merges on even numbered
83 * workers.
84 */
85 if (manager->lsm_workers % 2 == 0)
86 FLD_SET(worker_args->type, WT_LSM_WORK_MERGE);
87 }
88 WT_RET(__wt_lsm_worker_start(session, worker_args));
89 }
90
91 /*
92 * Setup the first worker properly - if there are only a minimal
93 * number of workers allow the first worker to flush. Otherwise a
94 * single merge can lead to switched chunks filling up the cache.
95 * This is separate to the main loop so that it is applied on startup
96 * and reconfigure.
97 */
98 if (manager->lsm_workers_max == WT_LSM_MIN_WORKERS)
99 FLD_SET(manager->lsm_worker_cookies[1].type, WT_LSM_WORK_FLUSH);
100 else
101 FLD_CLR(manager->lsm_worker_cookies[1].type, WT_LSM_WORK_FLUSH);
102
103 return (0);
104 }
105
106 /*
107 * __lsm_stop_workers --
108 * Stop worker threads until the number reaches the configured amount.
109 */
110 static int
__lsm_stop_workers(WT_SESSION_IMPL * session)111 __lsm_stop_workers(WT_SESSION_IMPL *session)
112 {
113 WT_LSM_MANAGER *manager;
114 WT_LSM_WORKER_ARGS *worker_args;
115
116 manager = &S2C(session)->lsm_manager;
117 /*
118 * Start at the end of the list of threads and stop them until we have
119 * the desired number. We want to keep all active threads packed at the
120 * front of the worker array.
121 */
122 WT_ASSERT(session, manager->lsm_workers > manager->lsm_workers_max);
123 for (; manager->lsm_workers > manager->lsm_workers_max;
124 manager->lsm_workers--) {
125 worker_args =
126 &manager->lsm_worker_cookies[manager->lsm_workers - 1];
127 WT_ASSERT(session, worker_args->tid_set);
128
129 WT_RET(__wt_lsm_worker_stop(session, worker_args));
130 worker_args->type = 0;
131
132 /*
133 * We do not clear the other fields because they are allocated
134 * statically when the connection was opened.
135 */
136 }
137
138 /*
139 * Setup the first worker properly - if there are only a minimal
140 * number of workers it should flush. Since the number of threads
141 * is being reduced the field can't already be set.
142 */
143 if (manager->lsm_workers_max == WT_LSM_MIN_WORKERS)
144 FLD_SET(manager->lsm_worker_cookies[1].type, WT_LSM_WORK_FLUSH);
145
146 return (0);
147 }
148
149 /*
150 * __wt_lsm_manager_reconfig --
151 * Re-configure the LSM manager.
152 */
153 int
__wt_lsm_manager_reconfig(WT_SESSION_IMPL * session,const char ** cfg)154 __wt_lsm_manager_reconfig(WT_SESSION_IMPL *session, const char **cfg)
155 {
156 WT_LSM_MANAGER *manager;
157 uint32_t orig_workers;
158
159 manager = &S2C(session)->lsm_manager;
160 orig_workers = manager->lsm_workers_max;
161
162 WT_RET(__wt_lsm_manager_config(session, cfg));
163 /*
164 * If LSM hasn't started yet, we simply reconfigured the settings
165 * and we'll let the normal code path start the threads.
166 */
167 if (manager->lsm_workers_max == 0)
168 return (0);
169 if (manager->lsm_workers == 0)
170 return (0);
171 /*
172 * If the number of workers has not changed, we're done.
173 */
174 if (orig_workers == manager->lsm_workers_max)
175 return (0);
176 /*
177 * If we want more threads, start them.
178 */
179 if (manager->lsm_workers_max > orig_workers)
180 return (__lsm_general_worker_start(session));
181
182 /*
183 * Otherwise we want to reduce the number of workers.
184 */
185 WT_ASSERT(session, manager->lsm_workers_max < orig_workers);
186 WT_RET(__lsm_stop_workers(session));
187 return (0);
188 }
189
190 /*
191 * __wt_lsm_manager_start --
192 * Start the LSM management infrastructure. Our queues and locks were
193 * initialized when the connection was initialized.
194 */
195 int
__wt_lsm_manager_start(WT_SESSION_IMPL * session)196 __wt_lsm_manager_start(WT_SESSION_IMPL *session)
197 {
198 WT_CONNECTION_IMPL *conn;
199 WT_DECL_RET;
200 WT_LSM_MANAGER *manager;
201 WT_SESSION_IMPL *worker_session;
202 uint32_t i;
203
204 conn = S2C(session);
205 manager = &conn->lsm_manager;
206
207 /*
208 * If readonly or the manager is running, or we've already failed,
209 * there's no work to do.
210 */
211 if (F_ISSET(conn, WT_CONN_READONLY) ||
212 manager->lsm_workers != 0 ||
213 F_ISSET(manager, WT_LSM_MANAGER_SHUTDOWN))
214 return (0);
215
216 /* It's possible to race, see if we're the winner. */
217 if (!__wt_atomic_cas32(&manager->lsm_workers, 0, 1))
218 return (0);
219
220 /* We need at least a manager, a switch thread and a generic worker. */
221 WT_ASSERT(session, manager->lsm_workers_max > 2);
222
223 /*
224 * Open sessions for all potential worker threads here - it's not
225 * safe to have worker threads open/close sessions themselves.
226 * All the LSM worker threads do their operations on read-only
227 * files. Use read-uncommitted isolation to avoid keeping
228 * updates in cache unnecessarily.
229 */
230 for (i = 0; i < WT_LSM_MAX_WORKERS; i++) {
231 WT_ERR(__wt_open_internal_session(
232 conn, "lsm-worker", false, 0, &worker_session));
233 worker_session->isolation = WT_ISO_READ_UNCOMMITTED;
234 manager->lsm_worker_cookies[i].session = worker_session;
235 }
236
237 F_SET(conn, WT_CONN_SERVER_LSM);
238
239 /* Start the LSM manager thread. */
240 WT_ERR(__wt_thread_create(session, &manager->lsm_worker_cookies[0].tid,
241 __lsm_worker_manager, &manager->lsm_worker_cookies[0]));
242
243 if (0) {
244 err: for (i = 0;
245 (worker_session =
246 manager->lsm_worker_cookies[i].session) != NULL;
247 i++)
248 WT_TRET((&worker_session->iface)->close(
249 &worker_session->iface, NULL));
250
251 /* Make the failure permanent, we won't try again. */
252 F_SET(manager, WT_LSM_MANAGER_SHUTDOWN);
253
254 /*
255 * Reset the workers count (otherwise, LSM destroy will hang
256 * waiting for threads to exit.
257 */
258 WT_PUBLISH(manager->lsm_workers, 0);
259 }
260 return (ret);
261 }
262
263 /*
264 * __wt_lsm_manager_free_work_unit --
265 * Release an LSM tree work unit.
266 */
267 void
__wt_lsm_manager_free_work_unit(WT_SESSION_IMPL * session,WT_LSM_WORK_UNIT * entry)268 __wt_lsm_manager_free_work_unit(
269 WT_SESSION_IMPL *session, WT_LSM_WORK_UNIT *entry)
270 {
271 if (entry != NULL) {
272 WT_ASSERT(session, entry->lsm_tree->queue_ref > 0);
273
274 (void)__wt_atomic_sub32(&entry->lsm_tree->queue_ref, 1);
275 __wt_free(session, entry);
276 }
277 }
278
/*
 * __wt_lsm_manager_destroy --
 *	Destroy the LSM manager threads and subsystem.
 *	Returns the first error seen while closing trees, joining the
 *	manager thread or closing worker sessions.
 */
int
__wt_lsm_manager_destroy(WT_SESSION_IMPL *session)
{
	WT_CONNECTION_IMPL *conn;
	WT_DECL_RET;
	WT_LSM_MANAGER *manager;
	WT_LSM_WORK_UNIT *current;
	WT_SESSION *wt_session;
	uint64_t removed;	/* Count of queued work units discarded. */
	uint32_t i;

	conn = S2C(session);
	manager = &conn->lsm_manager;
	removed = 0;

	/*
	 * Clear the LSM server flag: this is the signal the manager
	 * thread's server loop checks to know it should exit.
	 */
	F_CLR(conn, WT_CONN_SERVER_LSM);

	/* Workers are never started on a readonly connection. */
	WT_ASSERT(session, !F_ISSET(conn, WT_CONN_READONLY) ||
	    manager->lsm_workers == 0);
	if (manager->lsm_workers > 0) {
		/*
		 * Wait for the main LSM manager thread to finish: setting
		 * WT_LSM_MANAGER_SHUTDOWN is its last action before exit,
		 * so spin (with yields) until we see it.
		 */
		while (!F_ISSET(manager, WT_LSM_MANAGER_SHUTDOWN)) {
			WT_STAT_CONN_INCR(session, conn_close_blocked_lsm);
			__wt_yield();
		}

		/* Clean up open LSM handles. */
		ret = __wt_lsm_tree_close_all(session);

		/* The manager thread is worker[0]; reap it. */
		WT_TRET(__wt_thread_join(
		    session, &manager->lsm_worker_cookies[0].tid));

		/*
		 * Release memory from any operations left on the queue.
		 * No locks needed: all worker threads have exited.
		 */
		while ((current = TAILQ_FIRST(&manager->switchqh)) != NULL) {
			TAILQ_REMOVE(&manager->switchqh, current, q);
			++removed;
			__wt_lsm_manager_free_work_unit(session, current);
		}
		while ((current = TAILQ_FIRST(&manager->appqh)) != NULL) {
			TAILQ_REMOVE(&manager->appqh, current, q);
			++removed;
			__wt_lsm_manager_free_work_unit(session, current);
		}
		while ((current = TAILQ_FIRST(&manager->managerqh)) != NULL) {
			TAILQ_REMOVE(&manager->managerqh, current, q);
			++removed;
			__wt_lsm_manager_free_work_unit(session, current);
		}

		/* Close all LSM worker sessions. */
		for (i = 0; i < WT_LSM_MAX_WORKERS; i++) {
			wt_session =
			    &manager->lsm_worker_cookies[i].session->iface;
			WT_TRET(wt_session->close(wt_session, NULL));
		}
	}
	WT_STAT_CONN_INCRV(session, lsm_work_units_discarded, removed);

	/* Free resources that are allocated in connection initialize */
	__wt_spin_destroy(session, &manager->switch_lock);
	__wt_spin_destroy(session, &manager->app_lock);
	__wt_spin_destroy(session, &manager->manager_lock);
	__wt_cond_destroy(session, &manager->work_cond);

	return (ret);
}
350
351 /*
352 * __lsm_manager_worker_shutdown --
353 * Shutdown the LSM worker threads.
354 */
355 static int
__lsm_manager_worker_shutdown(WT_SESSION_IMPL * session)356 __lsm_manager_worker_shutdown(WT_SESSION_IMPL *session)
357 {
358 WT_DECL_RET;
359 WT_LSM_MANAGER *manager;
360 u_int i;
361
362 manager = &S2C(session)->lsm_manager;
363
364 /*
365 * Wait for the rest of the LSM workers to shutdown. Start at index
366 * one - since we (the manager) are at index 0.
367 */
368 for (i = 1; i < manager->lsm_workers; i++) {
369 WT_ASSERT(session, manager->lsm_worker_cookies[i].tid_set);
370 WT_TRET(__wt_lsm_worker_stop(
371 session, &manager->lsm_worker_cookies[i]));
372 }
373 return (ret);
374 }
375
/*
 * __lsm_manager_run_server --
 *	Run manager thread operations: poll all open LSM trees and push
 *	extra work units for trees that appear to be falling behind.
 *	Runs until the connection clears WT_CONN_SERVER_LSM.
 */
static int
__lsm_manager_run_server(WT_SESSION_IMPL *session)
{
	struct timespec now;
	WT_CONNECTION_IMPL *conn;
	WT_DECL_RET;
	WT_LSM_TREE *lsm_tree;
	uint64_t fillms, idlems;
	bool dhandle_locked;	/* True while holding the handle-list lock. */

	conn = S2C(session);
	dhandle_locked = false;

	while (F_ISSET(conn, WT_CONN_SERVER_LSM)) {
		/* Poll roughly every 10 milliseconds. */
		__wt_sleep(0, 10000);
		if (TAILQ_EMPTY(&conn->lsmqh))
			continue;
		/*
		 * Hold the handle-list read lock while walking the tree
		 * list so trees can't be opened/closed underneath us; track
		 * the lock so the error path can release it.
		 */
		__wt_readlock(session, &conn->dhandle_lock);
		F_SET(session, WT_SESSION_LOCKED_HANDLE_LIST_READ);
		dhandle_locked = true;
		TAILQ_FOREACH(lsm_tree, &conn->lsmqh, q) {
			if (!lsm_tree->active)
				continue;
			__wt_epoch(session, &now);
			/*
			 * If work was added reset our counts and time.
			 * Otherwise compute an idle time.
			 */
			if (lsm_tree->work_count != lsm_tree->mgr_work_count ||
			    lsm_tree->work_count == 0) {
				idlems = 0;
				lsm_tree->mgr_work_count = lsm_tree->work_count;
				lsm_tree->last_active = now;
			} else
				idlems =
				    WT_TIMEDIFF_MS(now, lsm_tree->last_active);
			/*
			 * The idle threshold is three chunk-fill times;
			 * default to 10 seconds before any fill time has
			 * been measured.
			 */
			fillms = 3 * lsm_tree->chunk_fill_ms;
			if (fillms == 0)
				fillms = 10000;
			/*
			 * If the tree appears to not be triggering enough
			 * LSM maintenance, help it out. Some types of
			 * additional work units don't hurt, and can be
			 * necessary if some work units aren't completed for
			 * some reason.
			 * If the tree hasn't been modified, and there are
			 * more than 1 chunks - try to get the tree smaller
			 * so queries run faster.
			 * If we are getting aggressive - ensure there are
			 * enough work units that we can get chunks merged.
			 * If we aren't pushing enough work units, compared
			 * to how often new chunks are being created add some
			 * more.
			 */
			if (lsm_tree->queue_ref >= LSM_TREE_MAX_QUEUE)
				WT_STAT_CONN_INCR(session,
				    lsm_work_queue_max);
			else if ((!lsm_tree->modified &&
			    lsm_tree->nchunks > 1) ||
			    (lsm_tree->queue_ref == 0 &&
			    lsm_tree->nchunks > 1) ||
			    (lsm_tree->merge_aggressiveness >
			    WT_LSM_AGGRESSIVE_THRESHOLD &&
			    !F_ISSET(lsm_tree, WT_LSM_TREE_COMPACTING)) ||
			    idlems > fillms) {
				WT_ERR(__wt_lsm_manager_push_entry(
				    session, WT_LSM_WORK_SWITCH, 0, lsm_tree));
				WT_ERR(__wt_lsm_manager_push_entry(
				    session, WT_LSM_WORK_DROP, 0, lsm_tree));
				WT_ERR(__wt_lsm_manager_push_entry(
				    session, WT_LSM_WORK_FLUSH, 0, lsm_tree));
				WT_ERR(__wt_lsm_manager_push_entry(
				    session, WT_LSM_WORK_BLOOM, 0, lsm_tree));
				__wt_verbose(session,
				    WT_VERB_LSM_MANAGER,
				    "MGR %s: queue %" PRIu32 " mod %d "
				    "nchunks %" PRIu32
				    " flags %#" PRIx32 " aggressive %" PRIu32
				    " idlems %" PRIu64
				    " fillms %" PRIu64,
				    lsm_tree->name, lsm_tree->queue_ref,
				    lsm_tree->modified, lsm_tree->nchunks,
				    lsm_tree->flags,
				    lsm_tree->merge_aggressiveness,
				    idlems, fillms);
				WT_ERR(__wt_lsm_manager_push_entry(
				    session, WT_LSM_WORK_MERGE, 0, lsm_tree));
			}
		}
		__wt_readunlock(session, &conn->dhandle_lock);
		F_CLR(session, WT_SESSION_LOCKED_HANDLE_LIST_READ);
		dhandle_locked = false;
	}

	/* A WT_ERR above may jump here with the handle-list lock held. */
err:	if (dhandle_locked) {
		__wt_readunlock(session, &conn->dhandle_lock);
		F_CLR(session, WT_SESSION_LOCKED_HANDLE_LIST_READ);
	}
	return (ret);
}
480
/*
 * __lsm_worker_manager --
 *	A thread that manages all open LSM trees, and the shared LSM worker
 *	threads.  The thread argument is worker cookie[0], which carries the
 *	session opened for this thread.
 */
static WT_THREAD_RET
__lsm_worker_manager(void *arg)
{
	WT_DECL_RET;
	WT_LSM_MANAGER *manager;
	WT_LSM_WORKER_ARGS *cookie;
	WT_SESSION_IMPL *session;

	cookie = (WT_LSM_WORKER_ARGS *)arg;
	session = cookie->session;
	manager = &S2C(session)->lsm_manager;

	/* Launch the workers, run until asked to quit, then stop them. */
	WT_ERR(__lsm_general_worker_start(session));
	WT_ERR(__lsm_manager_run_server(session));
	WT_ERR(__lsm_manager_worker_shutdown(session));

	/*
	 * Any failure in the manager thread is unrecoverable; the err label
	 * falls into this panic (the if-wrapped label is the usual WiredTiger
	 * error-path idiom).
	 */
	if (ret != 0) {
err:		WT_PANIC_MSG(session, ret, "LSM worker manager thread error");
	}

	/* Connection close waits on us to shutdown, let it know we're done. */
	F_SET(manager, WT_LSM_MANAGER_SHUTDOWN);
	WT_FULL_BARRIER();

	return (WT_THREAD_RET_VALUE);
}
512
513 /*
514 * __wt_lsm_manager_clear_tree --
515 * Remove all entries for a tree from the LSM manager queues. This
516 * introduces an inefficiency if LSM trees are being opened and closed
517 * regularly.
518 */
519 void
__wt_lsm_manager_clear_tree(WT_SESSION_IMPL * session,WT_LSM_TREE * lsm_tree)520 __wt_lsm_manager_clear_tree(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
521 {
522 WT_LSM_MANAGER *manager;
523 WT_LSM_WORK_UNIT *current, *tmp;
524 uint64_t removed;
525
526 manager = &S2C(session)->lsm_manager;
527 removed = 0;
528
529 /* Clear out the tree from the switch queue */
530 __wt_spin_lock(session, &manager->switch_lock);
531 TAILQ_FOREACH_SAFE(current, &manager->switchqh, q, tmp) {
532 if (current->lsm_tree != lsm_tree)
533 continue;
534 ++removed;
535 TAILQ_REMOVE(&manager->switchqh, current, q);
536 __wt_lsm_manager_free_work_unit(session, current);
537 }
538 __wt_spin_unlock(session, &manager->switch_lock);
539 /* Clear out the tree from the application queue */
540 __wt_spin_lock(session, &manager->app_lock);
541 TAILQ_FOREACH_SAFE(current, &manager->appqh, q, tmp) {
542 if (current->lsm_tree != lsm_tree)
543 continue;
544 ++removed;
545 TAILQ_REMOVE(&manager->appqh, current, q);
546 __wt_lsm_manager_free_work_unit(session, current);
547 }
548 __wt_spin_unlock(session, &manager->app_lock);
549 /* Clear out the tree from the manager queue */
550 __wt_spin_lock(session, &manager->manager_lock);
551 TAILQ_FOREACH_SAFE(current, &manager->managerqh, q, tmp) {
552 if (current->lsm_tree != lsm_tree)
553 continue;
554 ++removed;
555 TAILQ_REMOVE(&manager->managerqh, current, q);
556 __wt_lsm_manager_free_work_unit(session, current);
557 }
558 __wt_spin_unlock(session, &manager->manager_lock);
559 WT_STAT_CONN_INCRV(session, lsm_work_units_discarded, removed);
560 }
561
/*
 * We assume this is only called from __wt_lsm_manager_pop_entry and we
 * have session, entry and type available to use.  If the queue is empty
 * we may return from the macro.
 *
 * Every argument is parenthesized in the expansion (except qlen, which is
 * a statistics field name, not an expression) so complex expressions can
 * be passed safely.
 */
#define	LSM_POP_ENTRY(qh, qlock, qlen) do {				\
	if (TAILQ_EMPTY((qh)))						\
		return (0);						\
	__wt_spin_lock(session, (qlock));				\
	TAILQ_FOREACH(entry, (qh), q) {					\
		if (FLD_ISSET(type, entry->type)) {			\
			TAILQ_REMOVE((qh), entry, q);			\
			WT_STAT_CONN_DECR(session, qlen);		\
			break;						\
		}							\
	}								\
	__wt_spin_unlock(session, (qlock));				\
} while (0)
580
/*
 * __wt_lsm_manager_pop_entry --
 *	Retrieve the head of the queue, if it matches the requested work
 *	unit type.  Sets *entryp to the work unit, or NULL if no matching
 *	unit was queued.  Always returns 0.
 */
int
__wt_lsm_manager_pop_entry(
    WT_SESSION_IMPL *session, uint32_t type, WT_LSM_WORK_UNIT **entryp)
{
	WT_LSM_MANAGER *manager;
	WT_LSM_WORK_UNIT *entry;

	/*
	 * Default the output to "no work": LSM_POP_ENTRY can return from
	 * this function early when its queue is empty.
	 */
	*entryp = entry = NULL;

	manager = &S2C(session)->lsm_manager;

	/*
	 * Pop the entry off the correct queue based on our work type.
	 */
	if (type == WT_LSM_WORK_SWITCH)
		LSM_POP_ENTRY(&manager->switchqh,
		    &manager->switch_lock, lsm_work_queue_switch);
	else if (type == WT_LSM_WORK_MERGE)
		LSM_POP_ENTRY(&manager->managerqh,
		    &manager->manager_lock, lsm_work_queue_manager);
	else
		LSM_POP_ENTRY(&manager->appqh,
		    &manager->app_lock, lsm_work_queue_app);
	if (entry != NULL)
		WT_STAT_CONN_INCR(session, lsm_work_units_done);
	*entryp = entry;
	return (0);
}
614
/*
 * Push a work unit onto the appropriate queue.  This macro assumes we are
 * called from __wt_lsm_manager_push_entry and we have session and entry
 * available for use.
 *
 * Every argument is parenthesized in the expansion (except qlen, which is
 * a statistics field name, not an expression) so complex expressions can
 * be passed safely.
 */
#define	LSM_PUSH_ENTRY(qh, qlock, qlen) do {				\
	__wt_spin_lock(session, (qlock));				\
	TAILQ_INSERT_TAIL((qh), entry, q);				\
	WT_STAT_CONN_INCR(session, qlen);				\
	__wt_spin_unlock(session, (qlock));				\
} while (0)
626
/*
 * __wt_lsm_manager_push_entry --
 *	Add an entry to the end of the switch queue.  Allocates a work unit
 *	of the given type for the tree and queues it; silently drops work
 *	that is disabled for the tree or arrives while the tree is closing.
 */
int
__wt_lsm_manager_push_entry(WT_SESSION_IMPL *session,
    uint32_t type, uint32_t flags, WT_LSM_TREE *lsm_tree)
{
	WT_LSM_MANAGER *manager;
	WT_LSM_WORK_UNIT *entry;

	manager = &S2C(session)->lsm_manager;

	WT_ASSERT(session, !F_ISSET(S2C(session), WT_CONN_READONLY));
	/*
	 * Don't add merges or bloom filter creates if merges
	 * or bloom filters are disabled in the tree.
	 */
	switch (type) {
	case WT_LSM_WORK_BLOOM:
		if (FLD_ISSET(lsm_tree->bloom, WT_LSM_BLOOM_OFF))
			return (0);
		break;
	case WT_LSM_WORK_MERGE:
		if (!F_ISSET(lsm_tree, WT_LSM_TREE_MERGES))
			return (0);
		break;
	}

	/*
	 * Don't allow any work units unless a tree is active, this avoids
	 * races on shutdown between clearing out queues and pushing new
	 * work units.
	 *
	 * Increment the queue reference before checking the flag since
	 * on close, the flag is cleared and then the queue reference count
	 * is checked.
	 */
	(void)__wt_atomic_add32(&lsm_tree->queue_ref, 1);
	if (!lsm_tree->active) {
		/* Tree is closing: undo the reference and drop the work. */
		(void)__wt_atomic_sub32(&lsm_tree->queue_ref, 1);
		return (0);
	}

	/* The manager thread watches work_count to detect idle trees. */
	(void)__wt_atomic_add64(&lsm_tree->work_count, 1);
	WT_RET(__wt_calloc_one(session, &entry));
	entry->type = type;
	entry->flags = flags;
	entry->lsm_tree = lsm_tree;
	WT_STAT_CONN_INCR(session, lsm_work_units_created);

	/* Queue selection mirrors __wt_lsm_manager_pop_entry. */
	if (type == WT_LSM_WORK_SWITCH)
		LSM_PUSH_ENTRY(&manager->switchqh,
		    &manager->switch_lock, lsm_work_queue_switch);
	else if (type == WT_LSM_WORK_MERGE)
		LSM_PUSH_ENTRY(&manager->managerqh,
		    &manager->manager_lock, lsm_work_queue_manager);
	else
		LSM_PUSH_ENTRY(&manager->appqh,
		    &manager->app_lock, lsm_work_queue_app);

	/* Wake any worker waiting for work. */
	__wt_cond_signal(session, manager->work_cond);
	return (0);
}
691