1 /*-
2 * Copyright (c) 2014-2018 MongoDB, Inc.
3 * Copyright (c) 2008-2014 WiredTiger, Inc.
4 * All rights reserved.
5 *
6 * See the file LICENSE for redistribution information.
7 */
8
9 #include "wt_internal.h"
10
/*
 * WT_FORALL_CURSORS --
 *	Visit each chunk slot of an LSM cursor that has an open chunk cursor,
 * walking from slot nchunks - 1 down to slot 0. The index is decremented
 * before each use, so inside the body "i" is the slot of "c".
 *
 * The expansion is a "for" followed by an unbraced "if": give it a single
 * statement or a braced block, and wrap the whole construct in braces when
 * it is itself the body of an "if" that may have an "else".
 */
#define	WT_FORALL_CURSORS(clsm, c, i)					\
	for ((i) = (clsm)->nchunks; (i) > 0;)				\
		if (((c) = (clsm)->chunks[--(i)]->cursor) != NULL)

/*
 * WT_LSM_CURCMP --
 *	Compare the keys of cursors c1 and c2 with the LSM tree's collator,
 * storing the result in cmp; evaluates to the return of __wt_compare.
 */
#define	WT_LSM_CURCMP(s, lsm_tree, c1, c2, cmp)				\
	__wt_compare(s, (lsm_tree)->collator, &(c1)->key, &(c2)->key, &(cmp))

/* Forward declarations for functions used before they are defined. */
static int __clsm_lookup(WT_CURSOR_LSM *, WT_ITEM *);
static int __clsm_open_cursors(WT_CURSOR_LSM *, bool, u_int, uint32_t);
static int __clsm_reset_cursors(WT_CURSOR_LSM *, WT_CURSOR *);
static int __clsm_search_near(WT_CURSOR *cursor, int *exactp);
22
23 /*
24 * __wt_clsm_request_switch --
25 * Request an LSM tree switch for a cursor operation.
26 */
27 int
__wt_clsm_request_switch(WT_CURSOR_LSM * clsm)28 __wt_clsm_request_switch(WT_CURSOR_LSM *clsm)
29 {
30 WT_DECL_RET;
31 WT_LSM_TREE *lsm_tree;
32 WT_SESSION_IMPL *session;
33
34 lsm_tree = clsm->lsm_tree;
35 session = (WT_SESSION_IMPL *)clsm->iface.session;
36
37 if (!lsm_tree->need_switch) {
38 /*
39 * Check that we are up-to-date: don't set the switch if the
40 * tree has changed since we last opened cursors: that can lead
41 * to switching multiple times when only one switch is
42 * required, creating very small chunks.
43 */
44 __wt_lsm_tree_readlock(session, lsm_tree);
45 if (lsm_tree->nchunks == 0 ||
46 (clsm->dsk_gen == lsm_tree->dsk_gen &&
47 !lsm_tree->need_switch)) {
48 lsm_tree->need_switch = true;
49 ret = __wt_lsm_manager_push_entry(
50 session, WT_LSM_WORK_SWITCH, 0, lsm_tree);
51 }
52 __wt_lsm_tree_readunlock(session, lsm_tree);
53 }
54
55 return (ret);
56 }
57
58 /*
59 * __wt_clsm_await_switch --
60 * Wait for a switch to have completed in the LSM tree
61 */
62 int
__wt_clsm_await_switch(WT_CURSOR_LSM * clsm)63 __wt_clsm_await_switch(WT_CURSOR_LSM *clsm)
64 {
65 WT_LSM_TREE *lsm_tree;
66 WT_SESSION_IMPL *session;
67 int waited;
68
69 lsm_tree = clsm->lsm_tree;
70 session = (WT_SESSION_IMPL *)clsm->iface.session;
71
72 /*
73 * If there is no primary chunk, or a chunk has overflowed the hard
74 * limit, which either means a worker thread has fallen behind or there
75 * has just been a user-level checkpoint, wait until the tree changes.
76 *
77 * We used to switch chunks in the application thread here, but that is
78 * problematic because there is a transaction in progress and it could
79 * roll back, leaving the metadata inconsistent.
80 */
81 for (waited = 0;
82 lsm_tree->nchunks == 0 ||
83 clsm->dsk_gen == lsm_tree->dsk_gen;
84 ++waited) {
85 if (waited % WT_THOUSAND == 0)
86 WT_RET(__wt_lsm_manager_push_entry(
87 session, WT_LSM_WORK_SWITCH, 0, lsm_tree));
88 __wt_sleep(0, 10);
89 }
90 return (0);
91 }
92
93 /*
94 * __clsm_enter_update --
95 * Make sure an LSM cursor is ready to perform an update.
96 */
97 static int
__clsm_enter_update(WT_CURSOR_LSM * clsm)98 __clsm_enter_update(WT_CURSOR_LSM *clsm)
99 {
100 WT_CURSOR *primary;
101 WT_LSM_CHUNK *primary_chunk;
102 WT_LSM_TREE *lsm_tree;
103 WT_SESSION_IMPL *session;
104 bool hard_limit, have_primary, ovfl;
105
106 lsm_tree = clsm->lsm_tree;
107 session = (WT_SESSION_IMPL *)clsm->iface.session;
108
109 if (clsm->nchunks == 0) {
110 primary = NULL;
111 have_primary = false;
112 } else {
113 primary = clsm->chunks[clsm->nchunks - 1]->cursor;
114 primary_chunk = clsm->primary_chunk;
115 WT_ASSERT(session, F_ISSET(&session->txn, WT_TXN_HAS_ID));
116 have_primary = (primary != NULL && primary_chunk != NULL &&
117 (primary_chunk->switch_txn == WT_TXN_NONE ||
118 WT_TXNID_LT(session->txn.id, primary_chunk->switch_txn)));
119 }
120
121 /*
122 * In LSM there are multiple btrees active at one time. The tree
123 * switch code needs to use btree API methods, and it wants to
124 * operate on the btree for the primary chunk. Set that up now.
125 *
126 * If the primary chunk has grown too large, set a flag so the worker
127 * thread will switch when it gets a chance to avoid introducing high
128 * latency into application threads. Don't do this indefinitely: if a
129 * chunk grows twice as large as the configured size, block until it
130 * can be switched.
131 */
132 hard_limit = lsm_tree->need_switch;
133
134 if (have_primary) {
135 WT_ENTER_PAGE_INDEX(session);
136 WT_WITH_BTREE(session, ((WT_CURSOR_BTREE *)primary)->btree,
137 ovfl = __wt_btree_lsm_over_size(session, hard_limit ?
138 2 * lsm_tree->chunk_size : lsm_tree->chunk_size));
139 WT_LEAVE_PAGE_INDEX(session);
140
141 /* If there was no overflow, we're done. */
142 if (!ovfl)
143 return (0);
144 }
145
146 /* Request a switch. */
147 WT_RET(__wt_clsm_request_switch(clsm));
148
149 /* If we only overflowed the soft limit, we're done. */
150 if (have_primary && !hard_limit)
151 return (0);
152
153 WT_RET(__wt_clsm_await_switch(clsm));
154
155 return (0);
156 }
157
158 /*
159 * __clsm_enter --
160 * Start an operation on an LSM cursor, update if the tree has changed.
161 */
162 static inline int
__clsm_enter(WT_CURSOR_LSM * clsm,bool reset,bool update)163 __clsm_enter(WT_CURSOR_LSM *clsm, bool reset, bool update)
164 {
165 WT_DECL_RET;
166 WT_LSM_TREE *lsm_tree;
167 WT_SESSION_IMPL *session;
168 WT_TXN *txn;
169 uint64_t i, pinned_id , switch_txn;
170
171 lsm_tree = clsm->lsm_tree;
172 session = (WT_SESSION_IMPL *)clsm->iface.session;
173 txn = &session->txn;
174
175 /* Merge cursors never update. */
176 if (F_ISSET(clsm, WT_CLSM_MERGE))
177 return (0);
178
179 if (reset) {
180 WT_ASSERT(session, !F_ISSET(&clsm->iface,
181 WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT));
182 WT_RET(__clsm_reset_cursors(clsm, NULL));
183 }
184
185 for (;;) {
186 /* Check if the cursor looks up-to-date. */
187 if (clsm->dsk_gen != lsm_tree->dsk_gen &&
188 lsm_tree->nchunks != 0)
189 goto open;
190
191 /* Update the maximum transaction ID in the primary chunk. */
192 if (update) {
193 /*
194 * Ensure that there is a transaction snapshot active.
195 */
196 WT_RET(__wt_txn_autocommit_check(session));
197 WT_RET(__wt_txn_id_check(session));
198
199 WT_RET(__clsm_enter_update(clsm));
200 /*
201 * Switching the tree will update the generation before
202 * updating the switch transaction. We test the
203 * transaction in clsm_enter_update. Now test the
204 * disk generation to avoid races.
205 */
206 if (clsm->dsk_gen != clsm->lsm_tree->dsk_gen)
207 goto open;
208
209 if (txn->isolation == WT_ISO_SNAPSHOT)
210 __wt_txn_cursor_op(session);
211
212 /*
213 * Figure out how many updates are required for
214 * snapshot isolation.
215 *
216 * This is not a normal visibility check on the maximum
217 * transaction ID in each chunk: any transaction ID
218 * that overlaps with our snapshot is a potential
219 * conflict.
220 *
221 * Note that the pinned ID is correct here: it tracks
222 * concurrent transactions excluding special
223 * transactions such as checkpoint (which we can't
224 * conflict with because checkpoint only writes the
225 * metadata, which is not an LSM tree).
226 */
227 clsm->nupdates = 1;
228 if (txn->isolation == WT_ISO_SNAPSHOT &&
229 F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT)) {
230 WT_ASSERT(session,
231 F_ISSET(txn, WT_TXN_HAS_SNAPSHOT));
232 pinned_id =
233 WT_SESSION_TXN_STATE(session)->pinned_id;
234 for (i = clsm->nchunks - 2;
235 clsm->nupdates < clsm->nchunks;
236 clsm->nupdates++, i--) {
237 switch_txn =
238 clsm->chunks[i]->switch_txn;
239 if (WT_TXNID_LT(switch_txn, pinned_id))
240 break;
241 WT_ASSERT(session,
242 !__wt_txn_visible_all(
243 session, switch_txn, NULL));
244 }
245 }
246 }
247
248 /*
249 * Stop when we are up-to-date, as long as this is:
250 * - a snapshot isolation update and the cursor is set up for
251 * that;
252 * - an update operation with a primary chunk, or
253 * - a read operation and the cursor is open for reading.
254 */
255 if ((!update ||
256 txn->isolation != WT_ISO_SNAPSHOT ||
257 F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT)) &&
258 ((update && clsm->primary_chunk != NULL) ||
259 (!update && F_ISSET(clsm, WT_CLSM_OPEN_READ))))
260 break;
261
262 open: WT_WITH_SCHEMA_LOCK(session,
263 ret = __clsm_open_cursors(clsm, update, 0, 0));
264 WT_RET(ret);
265 }
266
267 if (!F_ISSET(clsm, WT_CLSM_ACTIVE)) {
268 /*
269 * Opening this LSM cursor has opened a number of btree
270 * cursors, ensure other code doesn't think this is the first
271 * cursor in a session.
272 */
273 ++session->ncursors;
274 WT_RET(__cursor_enter(session));
275 F_SET(clsm, WT_CLSM_ACTIVE);
276 }
277
278 return (0);
279 }
280
281 /*
282 * __clsm_leave --
283 * Finish an operation on an LSM cursor.
284 */
285 static void
__clsm_leave(WT_CURSOR_LSM * clsm)286 __clsm_leave(WT_CURSOR_LSM *clsm)
287 {
288 WT_SESSION_IMPL *session;
289
290 session = (WT_SESSION_IMPL *)clsm->iface.session;
291
292 if (F_ISSET(clsm, WT_CLSM_ACTIVE)) {
293 --session->ncursors;
294 __cursor_leave(session);
295 F_CLR(clsm, WT_CLSM_ACTIVE);
296 }
297 }
298
299 /*
300 * We need a tombstone to mark deleted records, and we use the special
301 * value below for that purpose. We use two 0x14 (Device Control 4) bytes to
302 * minimize the likelihood of colliding with an application-chosen encoding
303 * byte, if the application uses two leading DC4 byte for some reason, we'll do
304 * a wasted data copy each time a new value is inserted into the object.
305 */
306 static const WT_ITEM __tombstone = { "\x14\x14", 2, NULL, 0, 0 };
307
308 /*
309 * __clsm_deleted --
310 * Check whether the current value is a tombstone.
311 */
312 static inline bool
__clsm_deleted(WT_CURSOR_LSM * clsm,const WT_ITEM * item)313 __clsm_deleted(WT_CURSOR_LSM *clsm, const WT_ITEM *item)
314 {
315 return (!F_ISSET(clsm, WT_CLSM_MINOR_MERGE) &&
316 item->size == __tombstone.size &&
317 memcmp(item->data, __tombstone.data, __tombstone.size) == 0);
318 }
319
320 /*
321 * __clsm_deleted_encode --
322 * Encode values that are in the encoded name space.
323 */
324 static inline int
__clsm_deleted_encode(WT_SESSION_IMPL * session,const WT_ITEM * value,WT_ITEM * final_value,WT_ITEM ** tmpp)325 __clsm_deleted_encode(WT_SESSION_IMPL *session,
326 const WT_ITEM *value, WT_ITEM *final_value, WT_ITEM **tmpp)
327 {
328 WT_ITEM *tmp;
329
330 /*
331 * If value requires encoding, get a scratch buffer of the right size
332 * and create a copy of the data with the first byte of the tombstone
333 * appended.
334 */
335 if (value->size >= __tombstone.size &&
336 memcmp(value->data, __tombstone.data, __tombstone.size) == 0) {
337 WT_RET(__wt_scr_alloc(session, value->size + 1, tmpp));
338 tmp = *tmpp;
339
340 memcpy(tmp->mem, value->data, value->size);
341 memcpy((uint8_t *)tmp->mem + value->size, __tombstone.data, 1);
342 final_value->data = tmp->mem;
343 final_value->size = value->size + 1;
344 } else {
345 final_value->data = value->data;
346 final_value->size = value->size;
347 }
348
349 return (0);
350 }
351
352 /*
353 * __clsm_deleted_decode --
354 * Decode values that start with the tombstone.
355 */
356 static inline void
__clsm_deleted_decode(WT_CURSOR_LSM * clsm,WT_ITEM * value)357 __clsm_deleted_decode(WT_CURSOR_LSM *clsm, WT_ITEM *value)
358 {
359 /*
360 * Take care with this check: when an LSM cursor is used for a merge,
361 * and/or to create a Bloom filter, it is valid to return the tombstone
362 * value.
363 */
364 if (!F_ISSET(clsm, WT_CLSM_MERGE) &&
365 value->size > __tombstone.size &&
366 memcmp(value->data, __tombstone.data, __tombstone.size) == 0)
367 --value->size;
368 }
369
370 /*
371 * __clsm_close_cursors --
372 * Close any btree cursors that are not needed.
373 */
374 static int
__clsm_close_cursors(WT_SESSION_IMPL * session,WT_CURSOR_LSM * clsm,u_int start,u_int end)375 __clsm_close_cursors(
376 WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm, u_int start, u_int end)
377 {
378 WT_BLOOM *bloom;
379 WT_CURSOR *c;
380 u_int i;
381
382 __wt_verbose(session, WT_VERB_LSM,
383 "LSM closing cursor session(%p):clsm(%p), start: %u, end: %u",
384 (void *)session, (void *)clsm, start, end);
385
386 if (clsm->chunks == NULL || clsm->nchunks == 0)
387 return (0);
388
389 /*
390 * Walk the cursors, closing any we don't need. Note that the exit
391 * condition here is special, don't use WT_FORALL_CURSORS, and be
392 * careful with unsigned integer wrapping.
393 */
394 for (i = start; i < end; i++) {
395 if ((c = (clsm)->chunks[i]->cursor) != NULL) {
396 clsm->chunks[i]->cursor = NULL;
397 WT_RET(c->close(c));
398 }
399 if ((bloom = clsm->chunks[i]->bloom) != NULL) {
400 clsm->chunks[i]->bloom = NULL;
401 WT_RET(__wt_bloom_close(bloom));
402 }
403 }
404
405 return (0);
406 }
407
408 /*
409 * __clsm_resize_chunks --
410 * Allocates an array of unit objects for each chunk.
411 */
412 static int
__clsm_resize_chunks(WT_SESSION_IMPL * session,WT_CURSOR_LSM * clsm,u_int nchunks)413 __clsm_resize_chunks(
414 WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm, u_int nchunks)
415 {
416 WT_LSM_CURSOR_CHUNK *chunk;
417
418 /* Don't allocate more iterators if we don't need them. */
419 if (clsm->chunks_count >= nchunks)
420 return (0);
421
422 WT_RET(__wt_realloc_def(session, &clsm->chunks_alloc, nchunks,
423 &clsm->chunks));
424 for (; clsm->chunks_count < nchunks; clsm->chunks_count++) {
425 WT_RET(__wt_calloc_one(session, &chunk));
426 clsm->chunks[clsm->chunks_count] = chunk;
427 }
428 return (0);
429 }
430
431 /*
432 * __clsm_free_chunks --
433 * Allocates an array of unit objects for each chunk.
434 */
435 static void
__clsm_free_chunks(WT_SESSION_IMPL * session,WT_CURSOR_LSM * clsm)436 __clsm_free_chunks(WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm)
437 {
438 size_t i;
439
440 for (i = 0; i < clsm->chunks_count; i++)
441 __wt_free(session, clsm->chunks[i]);
442
443 __wt_free(session, clsm->chunks);
444 }
445
446 /*
447 * __clsm_open_cursors --
448 * Open cursors for the current set of files.
449 */
450 static int
__clsm_open_cursors(WT_CURSOR_LSM * clsm,bool update,u_int start_chunk,uint32_t start_id)451 __clsm_open_cursors(
452 WT_CURSOR_LSM *clsm, bool update, u_int start_chunk, uint32_t start_id)
453 {
454 WT_BTREE *btree;
455 WT_CURSOR *c, *cursor, *primary;
456 WT_DECL_RET;
457 WT_LSM_CHUNK *chunk;
458 WT_LSM_TREE *lsm_tree;
459 WT_SESSION_IMPL *session;
460 WT_TXN *txn;
461 uint64_t saved_gen;
462 u_int close_range_end, close_range_start;
463 u_int i, nchunks, ngood, nupdates;
464 const char *checkpoint, *ckpt_cfg[3];
465 bool locked;
466
467 c = &clsm->iface;
468 cursor = NULL;
469 session = (WT_SESSION_IMPL *)c->session;
470 txn = &session->txn;
471 chunk = NULL;
472 locked = false;
473 lsm_tree = clsm->lsm_tree;
474
475 /*
476 * Ensure that any snapshot update has cursors on the right set of
477 * chunks to guarantee visibility is correct.
478 */
479 if (update && txn->isolation == WT_ISO_SNAPSHOT)
480 F_SET(clsm, WT_CLSM_OPEN_SNAPSHOT);
481
482 /*
483 * Query operations need a full set of cursors. Overwrite cursors
484 * do queries in service of updates.
485 */
486 if (!update || !F_ISSET(c, WT_CURSTD_OVERWRITE))
487 F_SET(clsm, WT_CLSM_OPEN_READ);
488
489 if (lsm_tree->nchunks == 0)
490 return (0);
491
492 ckpt_cfg[0] = WT_CONFIG_BASE(session, WT_SESSION_open_cursor);
493 ckpt_cfg[1] = "checkpoint=" WT_CHECKPOINT ",raw";
494 ckpt_cfg[2] = NULL;
495
496 /*
497 * If the key is pointing to memory that is pinned by a chunk
498 * cursor, take a copy before closing cursors.
499 */
500 if (F_ISSET(c, WT_CURSTD_KEY_INT))
501 WT_ERR(__cursor_needkey(c));
502
503 F_CLR(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_ITERATE_PREV);
504
505 __wt_lsm_tree_readlock(session, lsm_tree);
506 locked = true;
507
508 /* Merge cursors have already figured out how many chunks they need. */
509 retry: if (F_ISSET(clsm, WT_CLSM_MERGE)) {
510 nchunks = clsm->nchunks;
511 ngood = 0;
512 WT_ERR(__clsm_resize_chunks(session, clsm, nchunks));
513 /*
514 * We may have raced with another merge completing. Check that
515 * we're starting at the right offset in the chunk array.
516 */
517 if (start_chunk >= lsm_tree->nchunks ||
518 lsm_tree->chunk[start_chunk]->id != start_id) {
519 for (start_chunk = 0;
520 start_chunk < lsm_tree->nchunks;
521 start_chunk++) {
522 chunk = lsm_tree->chunk[start_chunk];
523 if (chunk->id == start_id)
524 break;
525 }
526 /* We have to find the start chunk: merge locked it. */
527 WT_ASSERT(session, start_chunk < lsm_tree->nchunks);
528 }
529 } else {
530 nchunks = lsm_tree->nchunks;
531 WT_ERR(__clsm_resize_chunks(session, clsm, nchunks));
532
533 /*
534 * If we are only opening the cursor for updates, only open the
535 * primary chunk, plus any other chunks that might be required
536 * to detect snapshot isolation conflicts.
537 */
538 if (F_ISSET(clsm, WT_CLSM_OPEN_READ))
539 ngood = nupdates = 0;
540 else if (F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT)) {
541 /*
542 * Keep going until all updates in the next
543 * chunk are globally visible. Copy the maximum
544 * transaction IDs into the cursor as we go.
545 */
546 for (ngood = nchunks - 1, nupdates = 1; ngood > 0;
547 ngood--, nupdates++) {
548 chunk = lsm_tree->chunk[ngood - 1];
549 clsm->chunks[ngood - 1]->switch_txn =
550 chunk->switch_txn;
551 if (__wt_lsm_chunk_visible_all(session, chunk))
552 break;
553 }
554 } else {
555 nupdates = 1;
556 ngood = nchunks - 1;
557 }
558
559 /* Check how many cursors are already open. */
560 for (; ngood < clsm->nchunks && ngood < nchunks; ngood++) {
561 chunk = lsm_tree->chunk[ngood];
562 cursor = clsm->chunks[ngood]->cursor;
563
564 /* If the cursor isn't open yet, we're done. */
565 if (cursor == NULL)
566 break;
567
568 /* Easy case: the URIs don't match. */
569 if (strcmp(cursor->uri, chunk->uri) != 0)
570 break;
571
572 /*
573 * Make sure the checkpoint config matches when not
574 * using a custom data source.
575 */
576 if (lsm_tree->custom_generation == 0 ||
577 chunk->generation < lsm_tree->custom_generation) {
578 checkpoint = ((WT_CURSOR_BTREE *)cursor)->
579 btree->dhandle->checkpoint;
580 if (checkpoint == NULL &&
581 F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) &&
582 !chunk->empty)
583 break;
584 }
585
586 /* Make sure the Bloom config matches. */
587 if (clsm->chunks[ngood]->bloom == NULL &&
588 F_ISSET(chunk, WT_LSM_CHUNK_BLOOM))
589 break;
590 }
591
592 /* Spurious generation bump? */
593 if (ngood == clsm->nchunks && clsm->nchunks == nchunks) {
594 clsm->dsk_gen = lsm_tree->dsk_gen;
595 goto err;
596 }
597
598 /*
599 * Close any cursors we no longer need.
600 *
601 * Drop the LSM tree lock while we do this: if the cache is
602 * full, we may block while closing a cursor. Save the
603 * generation number and retry if it has changed under us.
604 */
605 if (clsm->chunks != NULL && ngood < clsm->nchunks) {
606 close_range_start = ngood;
607 close_range_end = clsm->nchunks;
608 } else if (!F_ISSET(clsm, WT_CLSM_OPEN_READ) && nupdates > 0) {
609 close_range_start = 0;
610 close_range_end = WT_MIN(nchunks, clsm->nchunks);
611 if (close_range_end > nupdates)
612 close_range_end -= nupdates;
613 else
614 close_range_end = 0;
615 WT_ASSERT(session, ngood >= close_range_end);
616 } else {
617 close_range_end = 0;
618 close_range_start = 0;
619 }
620 if (close_range_end > close_range_start) {
621 saved_gen = lsm_tree->dsk_gen;
622 locked = false;
623 __wt_lsm_tree_readunlock(session, lsm_tree);
624 WT_ERR(__clsm_close_cursors(session,
625 clsm, close_range_start, close_range_end));
626 __wt_lsm_tree_readlock(session, lsm_tree);
627 locked = true;
628 if (lsm_tree->dsk_gen != saved_gen)
629 goto retry;
630 }
631
632 /* Detach from our old primary. */
633 clsm->primary_chunk = NULL;
634 clsm->current = NULL;
635 }
636
637 WT_ASSERT(session, start_chunk + nchunks <= lsm_tree->nchunks);
638 clsm->nchunks = nchunks;
639
640 /* Open the cursors for chunks that have changed. */
641 __wt_verbose(session, WT_VERB_LSM,
642 "LSM opening cursor session(%p):clsm(%p)%s, chunks: %u, good: %u",
643 (void *)session, (void *)clsm,
644 update ? ", update" : "", nchunks, ngood);
645 for (i = ngood; i != nchunks; i++) {
646 chunk = lsm_tree->chunk[i + start_chunk];
647 /* Copy the maximum transaction ID. */
648 if (F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT))
649 clsm->chunks[i]->switch_txn = chunk->switch_txn;
650
651 /*
652 * Read from the checkpoint if the file has been written.
653 * Once all cursors switch, the in-memory tree can be evicted.
654 */
655 WT_ASSERT(session, clsm->chunks[i]->cursor == NULL);
656 ret = __wt_open_cursor(session, chunk->uri, c,
657 (F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) && !chunk->empty) ?
658 ckpt_cfg : NULL, &clsm->chunks[i]->cursor);
659
660 /*
661 * XXX kludge: we may have an empty chunk where no checkpoint
662 * was written. If so, try to open the ordinary handle on that
663 * chunk instead.
664 */
665 if (ret == WT_NOTFOUND && F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)) {
666 ret = __wt_open_cursor(session,
667 chunk->uri, c, NULL, &clsm->chunks[i]->cursor);
668 if (ret == 0)
669 chunk->empty = 1;
670 }
671 WT_ERR(ret);
672
673 /*
674 * Setup all cursors other than the primary to only do conflict
675 * checks on insert operations. This allows us to execute
676 * inserts on non-primary chunks as a way of checking for
677 * write conflicts with concurrent updates.
678 */
679 if (i != nchunks - 1)
680 clsm->chunks[i]->cursor->insert =
681 __wt_curfile_insert_check;
682
683 if (!F_ISSET(clsm, WT_CLSM_MERGE) &&
684 F_ISSET(chunk, WT_LSM_CHUNK_BLOOM))
685 WT_ERR(__wt_bloom_open(session, chunk->bloom_uri,
686 lsm_tree->bloom_bit_count,
687 lsm_tree->bloom_hash_count,
688 c, &clsm->chunks[i]->bloom));
689
690 /* Child cursors always use overwrite and raw mode. */
691 F_SET(clsm->chunks[i]->cursor,
692 WT_CURSTD_OVERWRITE | WT_CURSTD_RAW);
693 }
694
695 /* Setup the count values for each chunk in the chunks */
696 for (i = 0; i != clsm->nchunks; i++)
697 clsm->chunks[i]->count =
698 lsm_tree->chunk[i + start_chunk]->count;
699
700 /* The last chunk is our new primary. */
701 if (chunk != NULL &&
702 !F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) &&
703 chunk->switch_txn == WT_TXN_NONE) {
704 primary = clsm->chunks[clsm->nchunks - 1]->cursor;
705 btree = ((WT_CURSOR_BTREE *)primary)->btree;
706
707 /*
708 * If the primary is not yet set as the primary, do that now.
709 * Note that eviction was configured off when the underlying
710 * object was created, which is what we want, leave it alone.
711 *
712 * We don't have to worry about races here: every thread that
713 * modifies the tree will have to come through here, at worse
714 * we set the flag repeatedly. We don't use a WT_BTREE handle
715 * flag, however, we could race doing the read-modify-write of
716 * the flags field.
717 *
718 * If something caused the chunk to be closed and reopened
719 * since it was created, we can no longer use it as a primary
720 * chunk and we need to force a switch. We detect the tree was
721 * created when it was opened by checking the "original" flag.
722 */
723 if (!btree->lsm_primary && btree->original)
724 btree->lsm_primary = true;
725 if (btree->lsm_primary)
726 clsm->primary_chunk = chunk;
727 }
728
729 clsm->dsk_gen = lsm_tree->dsk_gen;
730
731 err:
732 #ifdef HAVE_DIAGNOSTIC
733 /* Check that all cursors are open as expected. */
734 if (ret == 0 && F_ISSET(clsm, WT_CLSM_OPEN_READ)) {
735 for (i = 0; i != clsm->nchunks; i++) {
736 cursor = clsm->chunks[i]->cursor;
737 chunk = lsm_tree->chunk[i + start_chunk];
738
739 /* Make sure the first cursor is open. */
740 WT_ASSERT(session, cursor != NULL);
741
742 /* Easy case: the URIs should match. */
743 WT_ASSERT(
744 session, strcmp(cursor->uri, chunk->uri) == 0);
745
746 /*
747 * Make sure the checkpoint config matches when not
748 * using a custom data source.
749 */
750 if (lsm_tree->custom_generation == 0 ||
751 chunk->generation < lsm_tree->custom_generation) {
752 checkpoint = ((WT_CURSOR_BTREE *)cursor)->
753 btree->dhandle->checkpoint;
754 WT_ASSERT(session,
755 (F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) &&
756 !chunk->empty) ?
757 checkpoint != NULL : checkpoint == NULL);
758 }
759
760 /* Make sure the Bloom config matches. */
761 WT_ASSERT(session,
762 (F_ISSET(chunk, WT_LSM_CHUNK_BLOOM) &&
763 !F_ISSET(clsm, WT_CLSM_MERGE)) ?
764 clsm->chunks[i]->bloom != NULL :
765 clsm->chunks[i]->bloom == NULL);
766 }
767 }
768 #endif
769 if (locked)
770 __wt_lsm_tree_readunlock(session, lsm_tree);
771 return (ret);
772 }
773
774 /*
775 * __wt_clsm_init_merge --
776 * Initialize an LSM cursor for a merge.
777 */
778 int
__wt_clsm_init_merge(WT_CURSOR * cursor,u_int start_chunk,uint32_t start_id,u_int nchunks)779 __wt_clsm_init_merge(
780 WT_CURSOR *cursor, u_int start_chunk, uint32_t start_id, u_int nchunks)
781 {
782 WT_CURSOR_LSM *clsm;
783 WT_DECL_RET;
784 WT_SESSION_IMPL *session;
785
786 clsm = (WT_CURSOR_LSM *)cursor;
787 session = (WT_SESSION_IMPL *)cursor->session;
788
789 F_SET(clsm, WT_CLSM_MERGE);
790 if (start_chunk != 0)
791 F_SET(clsm, WT_CLSM_MINOR_MERGE);
792 clsm->nchunks = nchunks;
793
794 WT_WITH_SCHEMA_LOCK(session,
795 ret = __clsm_open_cursors(clsm, false, start_chunk, start_id));
796 return (ret);
797 }
798
799 /*
800 * __clsm_get_current --
801 * Find the smallest / largest of the cursors and copy its key/value.
802 */
803 static int
__clsm_get_current(WT_SESSION_IMPL * session,WT_CURSOR_LSM * clsm,bool smallest,bool * deletedp)804 __clsm_get_current(WT_SESSION_IMPL *session,
805 WT_CURSOR_LSM *clsm, bool smallest, bool *deletedp)
806 {
807 WT_CURSOR *c, *current;
808 u_int i;
809 int cmp;
810 bool multiple;
811
812 current = NULL;
813 multiple = false;
814
815 WT_FORALL_CURSORS(clsm, c, i) {
816 if (!F_ISSET(c, WT_CURSTD_KEY_INT))
817 continue;
818 if (current == NULL) {
819 current = c;
820 continue;
821 }
822 WT_RET(WT_LSM_CURCMP(session, clsm->lsm_tree, c, current, cmp));
823 if (smallest ? cmp < 0 : cmp > 0) {
824 current = c;
825 multiple = false;
826 } else if (cmp == 0)
827 multiple = true;
828 }
829
830 c = &clsm->iface;
831 if ((clsm->current = current) == NULL) {
832 F_CLR(c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
833 return (WT_NOTFOUND);
834 }
835
836 if (multiple)
837 F_SET(clsm, WT_CLSM_MULTIPLE);
838 else
839 F_CLR(clsm, WT_CLSM_MULTIPLE);
840
841 WT_RET(current->get_key(current, &c->key));
842 WT_RET(current->get_value(current, &c->value));
843
844 F_CLR(c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
845 if ((*deletedp = __clsm_deleted(clsm, &c->value)) == false)
846 F_SET(c, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
847
848 return (0);
849 }
850
851 /*
852 * __clsm_compare --
853 * WT_CURSOR->compare implementation for the LSM cursor type.
854 */
855 static int
__clsm_compare(WT_CURSOR * a,WT_CURSOR * b,int * cmpp)856 __clsm_compare(WT_CURSOR *a, WT_CURSOR *b, int *cmpp)
857 {
858 WT_CURSOR_LSM *alsm;
859 WT_DECL_RET;
860 WT_SESSION_IMPL *session;
861
862 /* There's no need to sync with the LSM tree, avoid WT_LSM_ENTER. */
863 alsm = (WT_CURSOR_LSM *)a;
864 CURSOR_API_CALL(a, session, compare, NULL);
865
866 /*
867 * Confirm both cursors refer to the same source and have keys, then
868 * compare the keys.
869 */
870 if (strcmp(a->uri, b->uri) != 0)
871 WT_ERR_MSG(session, EINVAL,
872 "comparison method cursors must reference the same object");
873
874 WT_ERR(__cursor_needkey(a));
875 WT_ERR(__cursor_needkey(b));
876
877 WT_ERR(__wt_compare(
878 session, alsm->lsm_tree->collator, &a->key, &b->key, cmpp));
879
880 err: API_END_RET(session, ret);
881 }
882
883 /*
884 * __clsm_position_chunk --
885 * Position a chunk cursor.
886 */
887 static int
__clsm_position_chunk(WT_CURSOR_LSM * clsm,WT_CURSOR * c,bool forward,int * cmpp)888 __clsm_position_chunk(
889 WT_CURSOR_LSM *clsm, WT_CURSOR *c, bool forward, int *cmpp)
890 {
891 WT_CURSOR *cursor;
892 WT_SESSION_IMPL *session;
893
894 cursor = &clsm->iface;
895 session = (WT_SESSION_IMPL *)cursor->session;
896
897 c->set_key(c, &cursor->key);
898 WT_RET(c->search_near(c, cmpp));
899
900 while (forward ? *cmpp < 0 : *cmpp > 0) {
901 WT_RET(forward ? c->next(c) : c->prev(c));
902
903 /*
904 * With higher isolation levels, where we have stable reads,
905 * we're done: the cursor is now positioned as expected.
906 *
907 * With read-uncommitted isolation, a new record could have
908 * appeared in between the search and stepping forward / back.
909 * In that case, keep going until we see a key in the expected
910 * range.
911 */
912 if (session->txn.isolation != WT_ISO_READ_UNCOMMITTED)
913 return (0);
914
915 WT_RET(WT_LSM_CURCMP(session,
916 clsm->lsm_tree, c, cursor, *cmpp));
917 }
918
919 return (0);
920 }
921
922 /*
923 * __clsm_next --
924 * WT_CURSOR->next method for the LSM cursor type.
925 */
926 static int
__clsm_next(WT_CURSOR * cursor)927 __clsm_next(WT_CURSOR *cursor)
928 {
929 WT_CURSOR *c;
930 WT_CURSOR_LSM *clsm;
931 WT_DECL_RET;
932 WT_SESSION_IMPL *session;
933 u_int i;
934 int cmp;
935 bool deleted;
936
937 clsm = (WT_CURSOR_LSM *)cursor;
938
939 CURSOR_API_CALL(cursor, session, next, NULL);
940 __cursor_novalue(cursor);
941 WT_ERR(__clsm_enter(clsm, false, false));
942
943 /* If we aren't positioned for a forward scan, get started. */
944 if (clsm->current == NULL || !F_ISSET(clsm, WT_CLSM_ITERATE_NEXT)) {
945 WT_FORALL_CURSORS(clsm, c, i) {
946 if (!F_ISSET(cursor, WT_CURSTD_KEY_SET)) {
947 WT_ERR(c->reset(c));
948 ret = c->next(c);
949 } else if (c != clsm->current && (ret =
950 __clsm_position_chunk(clsm, c, true, &cmp)) == 0 &&
951 cmp == 0 && clsm->current == NULL)
952 clsm->current = c;
953 WT_ERR_NOTFOUND_OK(ret);
954 }
955 F_SET(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_MULTIPLE);
956 F_CLR(clsm, WT_CLSM_ITERATE_PREV);
957
958 /* We just positioned *at* the key, now move. */
959 if (clsm->current != NULL)
960 goto retry;
961 } else {
962 retry: /*
963 * If there are multiple cursors on that key, move them
964 * forward.
965 */
966 if (F_ISSET(clsm, WT_CLSM_MULTIPLE)) {
967 WT_FORALL_CURSORS(clsm, c, i) {
968 if (!F_ISSET(c, WT_CURSTD_KEY_INT))
969 continue;
970 if (c != clsm->current) {
971 WT_ERR(WT_LSM_CURCMP(session,
972 clsm->lsm_tree, c, clsm->current,
973 cmp));
974 if (cmp == 0)
975 WT_ERR_NOTFOUND_OK(c->next(c));
976 }
977 }
978 }
979
980 /* Move the smallest cursor forward. */
981 c = clsm->current;
982 WT_ERR_NOTFOUND_OK(c->next(c));
983 }
984
985 /* Find the cursor(s) with the smallest key. */
986 if ((ret = __clsm_get_current(session, clsm, true, &deleted)) == 0 &&
987 deleted)
988 goto retry;
989
990 err: __clsm_leave(clsm);
991 if (ret == 0)
992 __clsm_deleted_decode(clsm, &cursor->value);
993 API_END_RET(session, ret);
994 }
995
996 /*
997 * __clsm_random_chunk --
998 * Pick a chunk at random, weighted by the size of all chunks. Weighting
999 * proportional to documents avoids biasing towards small chunks. Then return
1000 * the cursor on the chunk we have picked.
1001 */
1002 static int
__clsm_random_chunk(WT_SESSION_IMPL * session,WT_CURSOR_LSM * clsm,WT_CURSOR ** cursor)1003 __clsm_random_chunk(WT_SESSION_IMPL *session,
1004 WT_CURSOR_LSM *clsm, WT_CURSOR **cursor)
1005 {
1006 uint64_t checked_docs, i, rand_doc, total_docs;
1007
1008 /*
1009 * If the tree is empty we cannot do a random lookup, so return a
1010 * WT_NOTFOUND.
1011 */
1012 if (clsm->nchunks == 0)
1013 return (WT_NOTFOUND);
1014 for (total_docs = i = 0; i < clsm->nchunks; i++) {
1015 total_docs += clsm->chunks[i]->count;
1016 }
1017 if (total_docs == 0)
1018 return (WT_NOTFOUND);
1019
1020 rand_doc = __wt_random(&session->rnd) % total_docs;
1021
1022 for (checked_docs = i = 0; i < clsm->nchunks; i++) {
1023 checked_docs += clsm->chunks[i]->count;
1024 if (rand_doc <= checked_docs) {
1025 *cursor = clsm->chunks[i]->cursor;
1026 break;
1027 }
1028 }
1029 return (0);
1030 }
1031
1032 /*
1033 * __clsm_next_random --
1034 * WT_CURSOR->next method for the LSM cursor type when configured with
1035 * next_random.
1036 */
1037 static int
__clsm_next_random(WT_CURSOR * cursor)1038 __clsm_next_random(WT_CURSOR *cursor)
1039 {
1040 WT_CURSOR *c;
1041 WT_CURSOR_LSM *clsm;
1042 WT_DECL_RET;
1043 WT_SESSION_IMPL *session;
1044 int exact;
1045
1046 c = NULL;
1047 clsm = (WT_CURSOR_LSM *)cursor;
1048
1049 CURSOR_API_CALL(cursor, session, next, NULL);
1050 __cursor_novalue(cursor);
1051 WT_ERR(__clsm_enter(clsm, false, false));
1052
1053 for (;;) {
1054 WT_ERR(__clsm_random_chunk(session, clsm, &c));
1055 /*
1056 * This call to next_random on the chunk can potentially end in
1057 * WT_NOTFOUND if the chunk we picked is empty. We want to retry
1058 * in that case.
1059 */
1060 ret = __wt_curfile_next_random(c);
1061 if (ret == WT_NOTFOUND)
1062 continue;
1063
1064 WT_ERR(ret);
1065 F_SET(cursor, WT_CURSTD_KEY_INT);
1066 WT_ERR(c->get_key(c, &cursor->key));
1067 /*
1068 * Search near the current key to resolve any tombstones
1069 * and position to a valid document. If we see a
1070 * WT_NOTFOUND here that is valid, as the tree has no
1071 * documents visible to us.
1072 */
1073 WT_ERR(__clsm_search_near(cursor, &exact));
1074 break;
1075 }
1076
1077 /* We have found a valid doc. Set that we are now positioned */
1078 if (0) {
1079 err: F_CLR(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
1080 }
1081 __clsm_leave(clsm);
1082 API_END_RET(session, ret);
1083 }
1084
/*
 * __clsm_prev --
 *     WT_CURSOR->prev method for the LSM cursor type.
 *
 *     Steps the merged view of all chunk cursors backwards by one record,
 *     skipping records that __clsm_get_current reports as deleted. On
 *     success the cursor's value is passed through __clsm_deleted_decode
 *     before returning.
 */
static int
__clsm_prev(WT_CURSOR *cursor)
{
    WT_CURSOR *c;
    WT_CURSOR_LSM *clsm;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;
    u_int i;
    int cmp;
    bool deleted;

    clsm = (WT_CURSOR_LSM *)cursor;

    CURSOR_API_CALL(cursor, session, prev, NULL);
    __cursor_novalue(cursor);
    WT_ERR(__clsm_enter(clsm, false, false));

    /* If we aren't positioned for a reverse scan, get started. */
    if (clsm->current == NULL || !F_ISSET(clsm, WT_CLSM_ITERATE_PREV)) {
        WT_FORALL_CURSORS(clsm, c, i) {
            /*
             * With no application key set, restart every chunk
             * cursor and step it backwards once. Otherwise
             * position every chunk cursor other than the current
             * one relative to the key; the first chunk that
             * compares equal becomes the current cursor if we
             * don't already have one.
             * NOTE(review): the exact positioning contract is
             * defined by __clsm_position_chunk, outside this
             * function - confirm there.
             */
            if (!F_ISSET(cursor, WT_CURSTD_KEY_SET)) {
                WT_ERR(c->reset(c));
                ret = c->prev(c);
            } else if (c != clsm->current && (ret =
                __clsm_position_chunk(clsm, c, false, &cmp)) == 0 &&
                cmp == 0 && clsm->current == NULL)
                clsm->current = c;
            /* A chunk with nothing to return is not an error. */
            WT_ERR_NOTFOUND_OK(ret);
        }
        F_SET(clsm, WT_CLSM_ITERATE_PREV | WT_CLSM_MULTIPLE);
        F_CLR(clsm, WT_CLSM_ITERATE_NEXT);

        /* We just positioned *at* the key, now move. */
        if (clsm->current != NULL)
            goto retry;
    } else {
retry:      /*
         * If there are multiple cursors on that key, move them
         * backwards.
         */
        if (F_ISSET(clsm, WT_CLSM_MULTIPLE)) {
            WT_FORALL_CURSORS(clsm, c, i) {
                /* Skip chunk cursors that have no position. */
                if (!F_ISSET(c, WT_CURSTD_KEY_INT))
                    continue;
                if (c != clsm->current) {
                    WT_ERR(WT_LSM_CURCMP(session,
                        clsm->lsm_tree, c, clsm->current,
                        cmp));
                    if (cmp == 0)
                        WT_ERR_NOTFOUND_OK(c->prev(c));
                }
            }
        }

        /* Move the largest cursor backwards. */
        c = clsm->current;
        WT_ERR_NOTFOUND_OK(c->prev(c));
    }

    /*
     * Find the cursor(s) with the largest key. If that record is deleted,
     * go around again to step past it.
     */
    if ((ret = __clsm_get_current(session, clsm, false, &deleted)) == 0 &&
        deleted)
        goto retry;

err:    __clsm_leave(clsm);
    if (ret == 0)
        __clsm_deleted_decode(clsm, &cursor->value);
    API_END_RET(session, ret);
}
1158
1159 /*
1160 * __clsm_reset_cursors --
1161 * Reset any positioned chunk cursors.
1162 *
1163 * If the skip parameter is non-NULL, that cursor is about to be used, so
1164 * there is no need to reset it.
1165 */
1166 static int
__clsm_reset_cursors(WT_CURSOR_LSM * clsm,WT_CURSOR * skip)1167 __clsm_reset_cursors(WT_CURSOR_LSM *clsm, WT_CURSOR *skip)
1168 {
1169 WT_CURSOR *c;
1170 WT_DECL_RET;
1171 u_int i;
1172
1173 /* Fast path if the cursor is not positioned. */
1174 if ((clsm->current == NULL || clsm->current == skip) &&
1175 !F_ISSET(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_ITERATE_PREV))
1176 return (0);
1177
1178 WT_FORALL_CURSORS(clsm, c, i) {
1179 if (c == skip)
1180 continue;
1181 if (F_ISSET(c, WT_CURSTD_KEY_INT))
1182 WT_TRET(c->reset(c));
1183 }
1184
1185 clsm->current = NULL;
1186 F_CLR(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_ITERATE_PREV);
1187
1188 return (ret);
1189 }
1190
/*
 * __clsm_reset --
 *     WT_CURSOR->reset method for the LSM cursor type.
 *
 *     Clears the application-set key/value flags, resets every chunk cursor
 *     and leaves the LSM cursor. Returns any error from resetting the chunk
 *     cursors.
 */
static int
__clsm_reset(WT_CURSOR *cursor)
{
    WT_CURSOR_LSM *clsm;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;

    /*
     * Don't use the normal __clsm_enter path: that is wasted work when all
     * we want to do is give up our position.
     */
    clsm = (WT_CURSOR_LSM *)cursor;
    CURSOR_API_CALL_PREPARE_ALLOWED(cursor, session, reset, NULL);
    F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);

    /* No cursor is skipped: every positioned chunk cursor is reset. */
    WT_TRET(__clsm_reset_cursors(clsm, NULL));

    /* In case we were left positioned, clear that. */
    __clsm_leave(clsm);

err:    API_END_RET(session, ret);
}
1217
1218 /*
1219 * __clsm_lookup --
1220 * Position an LSM cursor.
1221 */
1222 static int
__clsm_lookup(WT_CURSOR_LSM * clsm,WT_ITEM * value)1223 __clsm_lookup(WT_CURSOR_LSM *clsm, WT_ITEM *value)
1224 {
1225 WT_BLOOM *bloom;
1226 WT_BLOOM_HASH bhash;
1227 WT_CURSOR *c, *cursor;
1228 WT_DECL_RET;
1229 WT_SESSION_IMPL *session;
1230 u_int i;
1231 bool have_hash;
1232
1233 c = NULL;
1234 cursor = &clsm->iface;
1235 have_hash = false;
1236 session = (WT_SESSION_IMPL *)cursor->session;
1237
1238 WT_FORALL_CURSORS(clsm, c, i) {
1239 /* If there is a Bloom filter, see if we can skip the read. */
1240 bloom = NULL;
1241 if ((bloom = clsm->chunks[i]->bloom) != NULL) {
1242 if (!have_hash) {
1243 __wt_bloom_hash(bloom, &cursor->key, &bhash);
1244 have_hash = true;
1245 }
1246
1247 ret = __wt_bloom_hash_get(bloom, &bhash);
1248 if (ret == WT_NOTFOUND) {
1249 WT_LSM_TREE_STAT_INCR(
1250 session, clsm->lsm_tree->bloom_miss);
1251 continue;
1252 }
1253 if (ret == 0)
1254 WT_LSM_TREE_STAT_INCR(
1255 session, clsm->lsm_tree->bloom_hit);
1256 WT_ERR(ret);
1257 }
1258 c->set_key(c, &cursor->key);
1259 if ((ret = c->search(c)) == 0) {
1260 WT_ERR(c->get_key(c, &cursor->key));
1261 WT_ERR(c->get_value(c, value));
1262 if (__clsm_deleted(clsm, value))
1263 ret = WT_NOTFOUND;
1264 goto done;
1265 }
1266 WT_ERR_NOTFOUND_OK(ret);
1267 F_CLR(c, WT_CURSTD_KEY_SET);
1268 /* Update stats: the active chunk can't have a bloom filter. */
1269 if (bloom != NULL)
1270 WT_LSM_TREE_STAT_INCR(session,
1271 clsm->lsm_tree->bloom_false_positive);
1272 else if (clsm->primary_chunk == NULL || i != clsm->nchunks)
1273 WT_LSM_TREE_STAT_INCR(session,
1274 clsm->lsm_tree->lsm_lookup_no_bloom);
1275 }
1276 WT_ERR(WT_NOTFOUND);
1277
1278 done:
1279 err: if (ret == 0) {
1280 F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
1281 F_SET(cursor, WT_CURSTD_KEY_INT);
1282 clsm->current = c;
1283 if (value == &cursor->value)
1284 F_SET(cursor, WT_CURSTD_VALUE_INT);
1285 } else if (c != NULL)
1286 WT_TRET(c->reset(c));
1287
1288 return (ret);
1289 }
1290
/*
 * __clsm_search --
 *     WT_CURSOR->search method for the LSM cursor type.
 *
 *     Requires a key to be set. Looks the key up with __clsm_lookup, storing
 *     the value directly in the cursor; on success the value is passed
 *     through __clsm_deleted_decode before returning.
 */
static int
__clsm_search(WT_CURSOR *cursor)
{
    WT_CURSOR_LSM *clsm;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;

    clsm = (WT_CURSOR_LSM *)cursor;

    CURSOR_API_CALL(cursor, session, search, NULL);
    WT_ERR(__cursor_needkey(cursor));
    __cursor_novalue(cursor);
    WT_ERR(__clsm_enter(clsm, true, false));
    /* A search ends any scan that was in progress. */
    F_CLR(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_ITERATE_PREV);

    ret = __clsm_lookup(clsm, &cursor->value);

err:    __clsm_leave(clsm);
    if (ret == 0)
        __clsm_deleted_decode(clsm, &cursor->value);
    API_END_RET(session, ret);
}
1317
/*
 * __clsm_search_near --
 *     WT_CURSOR->search_near method for the LSM cursor type.
 *
 *     On success *exactp is set to 0 when the cursor is positioned on the
 *     search key, 1 when it is positioned on a larger key and -1 when it is
 *     positioned on a smaller key.
 */
static int
__clsm_search_near(WT_CURSOR *cursor, int *exactp)
{
    WT_CURSOR *c, *closest;
    WT_CURSOR_LSM *clsm;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;
    u_int i;
    int cmp, exact;
    bool deleted;

    closest = NULL;
    clsm = (WT_CURSOR_LSM *)cursor;
    exact = 0;

    CURSOR_API_CALL(cursor, session, search_near, NULL);
    WT_ERR(__cursor_needkey(cursor));
    __cursor_novalue(cursor);
    WT_ERR(__clsm_enter(clsm, true, false));
    F_CLR(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_ITERATE_PREV);

    /*
     * search_near is somewhat fiddly: we can't just use a nearby key from
     * the in-memory chunk because there could be a closer key on disk.
     *
     * As we search down the chunks, we stop as soon as we find an exact
     * match.  Otherwise, we maintain the smallest cursor larger than the
     * search key and the largest cursor smaller than the search key.  At
     * the end, we prefer the larger cursor, but if no record is larger,
     * position on the last record in the tree.
     */
    WT_FORALL_CURSORS(clsm, c, i) {
        c->set_key(c, &cursor->key);
        /* A chunk with no records to offer is skipped, not an error. */
        if ((ret = c->search_near(c, &cmp)) == WT_NOTFOUND) {
            ret = 0;
            continue;
        }
        if (ret != 0)
            goto err;

        /* Do we have an exact match? */
        if (cmp == 0) {
            closest = c;
            exact = 1;
            break;
        }

        /*
         * Prefer larger cursors.  There are two reasons: (1) we expect
         * prefix searches to be a common case (as in our own indices);
         * and (2) we need a way to unambiguously know we have the
         * "closest" result.
         *
         * A chunk cursor that landed on a smaller key is stepped
         * forward; if it has no larger key it is dropped from
         * consideration.
         */
        if (cmp < 0) {
            if ((ret = c->next(c)) == WT_NOTFOUND) {
                ret = 0;
                continue;
            }
            if (ret != 0)
                goto err;
        }

        /*
         * We are trying to find the smallest cursor greater than the
         * search key.
         */
        if (closest == NULL)
            closest = c;
        else {
            WT_ERR(WT_LSM_CURCMP(session,
                clsm->lsm_tree, c, closest, cmp));
            if (cmp < 0)
                closest = c;
        }
    }

    /*
     * At this point, we either have an exact match, or closest is the
     * smallest cursor larger than the search key, or it is NULL if the
     * search key is larger than any record in the tree.
     */
    cmp = exact ? 0 : 1;

    /*
     * If we land on a deleted item, try going forwards or backwards to
     * find one that isn't deleted.  If the whole tree is empty, we'll
     * end up with WT_NOTFOUND, as expected.
     */
    if (closest == NULL)
        deleted = true;
    else {
        WT_ERR(closest->get_key(closest, &cursor->key));
        WT_ERR(closest->get_value(closest, &cursor->value));
        /*
         * The chosen chunk cursor becomes the current cursor; clearing
         * closest stops the cleanup code from resetting it.
         */
        clsm->current = closest;
        closest = NULL;
        deleted = __clsm_deleted(clsm, &cursor->value);
        if (!deleted)
            __clsm_deleted_decode(clsm, &cursor->value);
        else {
            /*
             * We have a key pointing at memory that is
             * pinned by the current chunk cursor.  In the
             * unlikely event that we have to reopen cursors
             * to move to the next record, make sure the cursor
             * flags are set so a copy is made before the current
             * chunk cursor releases its position.
             */
            F_CLR(cursor, WT_CURSTD_KEY_SET);
            F_SET(cursor, WT_CURSTD_KEY_INT);
            /*
             * We call __clsm_next here as we want to advance
             * forward. If we are a random LSM cursor calling next
             * on the cursor will not advance as we intend.
             */
            if ((ret = __clsm_next(cursor)) == 0) {
                cmp = 1;
                deleted = false;
            }
        }
        /* Running off the end going forwards is handled below. */
        WT_ERR_NOTFOUND_OK(ret);
    }
    if (deleted) {
        clsm->current = NULL;
        /*
         * We call prev directly here as cursor->prev may be "invalid"
         * if this is a random cursor.
         */
        WT_ERR(__clsm_prev(cursor));
        cmp = -1;
    }
    *exactp = cmp;

err:    __clsm_leave(clsm);
    /* Only non-NULL if we failed before adopting the closest cursor. */
    if (closest != NULL)
        WT_TRET(closest->reset(closest));

    F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
    if (ret == 0) {
        F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
    } else
        clsm->current = NULL;

    API_END_RET(session, ret);
}
1466
1467 /*
1468 * __clsm_put --
1469 * Put an entry into the in-memory tree, trigger a file switch if
1470 * necessary.
1471 */
1472 static inline int
__clsm_put(WT_SESSION_IMPL * session,WT_CURSOR_LSM * clsm,const WT_ITEM * key,const WT_ITEM * value,bool position,bool reserve)1473 __clsm_put(WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm,
1474 const WT_ITEM *key, const WT_ITEM *value, bool position, bool reserve)
1475 {
1476 WT_CURSOR *c, *primary;
1477 WT_LSM_TREE *lsm_tree;
1478 u_int i, slot;
1479 int (*func)(WT_CURSOR *);
1480
1481 lsm_tree = clsm->lsm_tree;
1482
1483 WT_ASSERT(session,
1484 F_ISSET(&session->txn, WT_TXN_HAS_ID) &&
1485 clsm->primary_chunk != NULL &&
1486 (clsm->primary_chunk->switch_txn == WT_TXN_NONE ||
1487 WT_TXNID_LE(session->txn.id, clsm->primary_chunk->switch_txn)));
1488
1489 /*
1490 * Clear the existing cursor position. Don't clear the primary cursor:
1491 * we're about to use it anyway.
1492 */
1493 primary = clsm->chunks[clsm->nchunks - 1]->cursor;
1494 WT_RET(__clsm_reset_cursors(clsm, primary));
1495
1496 /* If necessary, set the position for future scans. */
1497 if (position)
1498 clsm->current = primary;
1499
1500 for (i = 0, slot = clsm->nchunks - 1; i < clsm->nupdates; i++, slot--) {
1501 /* Check if we need to keep updating old chunks. */
1502 if (i > 0 && __wt_txn_visible(
1503 session, clsm->chunks[slot]->switch_txn, NULL)) {
1504 clsm->nupdates = i;
1505 break;
1506 }
1507
1508 c = clsm->chunks[slot]->cursor;
1509 c->set_key(c, key);
1510 func = c->insert;
1511 if (i == 0 && position)
1512 func = reserve ? c->reserve : c->update;
1513 if (func != c->reserve)
1514 c->set_value(c, value);
1515 WT_RET(func(c));
1516 }
1517
1518 /*
1519 * Update the record count. It is in a shared structure, but it's only
1520 * approximate, so don't worry about protecting access.
1521 *
1522 * Throttle if necessary. Every 100 update operations on each cursor,
1523 * check if throttling is required. Don't rely only on the shared
1524 * counter because it can race, and because for some workloads, there
1525 * may not be enough records per chunk to get effective throttling.
1526 */
1527 if ((++clsm->primary_chunk->count % 100 == 0 ||
1528 ++clsm->update_count >= 100) &&
1529 lsm_tree->merge_throttle + lsm_tree->ckpt_throttle > 0) {
1530 clsm->update_count = 0;
1531 WT_LSM_TREE_STAT_INCRV(session,
1532 lsm_tree->lsm_checkpoint_throttle, lsm_tree->ckpt_throttle);
1533 WT_STAT_CONN_INCRV(session,
1534 lsm_checkpoint_throttle, lsm_tree->ckpt_throttle);
1535 WT_LSM_TREE_STAT_INCRV(session,
1536 lsm_tree->lsm_merge_throttle, lsm_tree->merge_throttle);
1537 WT_STAT_CONN_INCRV(session,
1538 lsm_merge_throttle, lsm_tree->merge_throttle);
1539 __wt_sleep(0,
1540 lsm_tree->ckpt_throttle + lsm_tree->merge_throttle);
1541 }
1542
1543 return (0);
1544 }
1545
/*
 * __clsm_insert --
 *     WT_CURSOR->insert method for the LSM cursor type.
 *
 *     Requires both a key and a value. Unless the cursor is configured to
 *     overwrite, the key is looked up first and WT_DUPLICATE_KEY is returned
 *     if it already exists. The cursor is not left positioned.
 */
static int
__clsm_insert(WT_CURSOR *cursor)
{
    WT_CURSOR_LSM *clsm;
    WT_DECL_ITEM(buf);
    WT_DECL_RET;
    WT_ITEM value;
    WT_SESSION_IMPL *session;

    clsm = (WT_CURSOR_LSM *)cursor;

    CURSOR_UPDATE_API_CALL(cursor, session, insert);
    WT_ERR(__cursor_needkey(cursor));
    WT_ERR(__cursor_needvalue(cursor));
    WT_ERR(__clsm_enter(clsm, false, true));

    /*
     * It isn't necessary to copy the key out after the lookup in this
     * case because any non-failed lookup results in an error, and a
     * failed lookup leaves the original key intact.
     */
    if (!F_ISSET(cursor, WT_CURSTD_OVERWRITE) &&
        (ret = __clsm_lookup(clsm, &value)) != WT_NOTFOUND) {
        /* Finding the key is the failure case here. */
        if (ret == 0)
            ret = WT_DUPLICATE_KEY;
        goto err;
    }

    /*
     * Encode the application's value into the local item (using the
     * scratch buffer if needed) and write that.
     */
    WT_ERR(__clsm_deleted_encode(session, &cursor->value, &value, &buf));
    WT_ERR(__clsm_put(session, clsm, &cursor->key, &value, false, false));

    /*
     * WT_CURSOR.insert doesn't leave the cursor positioned, and the
     * application may want to free the memory used to configure the
     * insert; don't read that memory again (matching the underlying
     * file object cursor insert semantics).
     */
    F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);

err:    __wt_scr_free(session, &buf);
    __clsm_leave(clsm);
    CURSOR_UPDATE_API_END(session, ret);
    return (ret);
}
1594
/*
 * __clsm_update --
 *     WT_CURSOR->update method for the LSM cursor type.
 *
 *     Requires both a key and a value. Unless the cursor is configured to
 *     overwrite, the key must already exist: a failed lookup is returned to
 *     the caller. On success the cursor is left positioned, referencing the
 *     key and value of the current chunk cursor.
 */
static int
__clsm_update(WT_CURSOR *cursor)
{
    WT_CURSOR_LSM *clsm;
    WT_DECL_ITEM(buf);
    WT_DECL_RET;
    WT_ITEM value;
    WT_SESSION_IMPL *session;

    clsm = (WT_CURSOR_LSM *)cursor;

    CURSOR_UPDATE_API_CALL(cursor, session, update);
    WT_ERR(__cursor_needkey(cursor));
    WT_ERR(__cursor_needvalue(cursor));
    WT_ERR(__clsm_enter(clsm, false, true));

    if (!F_ISSET(cursor, WT_CURSTD_OVERWRITE)) {
        WT_ERR(__clsm_lookup(clsm, &value));
        /*
         * Copy the key out, since the insert resets non-primary chunk
         * cursors which our lookup may have landed on.
         */
        WT_ERR(__cursor_needkey(cursor));
    }
    /*
     * Encode the application's value into the local item (using the
     * scratch buffer if needed) and write that as a positioned update.
     */
    WT_ERR(__clsm_deleted_encode(session, &cursor->value, &value, &buf));
    WT_ERR(__clsm_put(session, clsm, &cursor->key, &value, true, false));

    /*
     * Set the cursor to reference the internal key/value of the positioned
     * cursor.
     */
    F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
    WT_ITEM_SET(cursor->key, clsm->current->key);
    WT_ITEM_SET(cursor->value, clsm->current->value);
    /* The chunk cursor must hold an internal key and value by now. */
    WT_ASSERT(session,
        F_MASK(clsm->current, WT_CURSTD_KEY_SET) == WT_CURSTD_KEY_INT);
    WT_ASSERT(session,
        F_MASK(clsm->current, WT_CURSTD_VALUE_SET) == WT_CURSTD_VALUE_INT);
    F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);

err:    __wt_scr_free(session, &buf);
    __clsm_leave(clsm);
    CURSOR_UPDATE_API_END(session, ret);
    return (ret);
}
1644
/*
 * __clsm_remove --
 *     WT_CURSOR->remove method for the LSM cursor type.
 *
 *     A remove is written as a tombstone value for the key. Unless the
 *     cursor is configured to overwrite, the key must already exist: a failed
 *     lookup is returned to the caller.
 */
static int
__clsm_remove(WT_CURSOR *cursor)
{
    WT_CURSOR_LSM *clsm;
    WT_DECL_RET;
    WT_ITEM value;
    WT_SESSION_IMPL *session;
    bool positioned;

    clsm = (WT_CURSOR_LSM *)cursor;

    /*
     * Check if the cursor is positioned. This is sampled before the API
     * call and lookup below, which may change the cursor's flags.
     */
    positioned = F_ISSET(cursor, WT_CURSTD_KEY_INT);

    CURSOR_REMOVE_API_CALL(cursor, session, NULL);
    WT_ERR(__cursor_needkey(cursor));
    __cursor_novalue(cursor);
    WT_ERR(__clsm_enter(clsm, false, true));

    if (!F_ISSET(cursor, WT_CURSTD_OVERWRITE)) {
        WT_ERR(__clsm_lookup(clsm, &value));
        /*
         * Copy the key out, since the insert resets non-primary chunk
         * cursors which our lookup may have landed on.
         */
        WT_ERR(__cursor_needkey(cursor));
    }
    /* Write the tombstone, keeping the position only if we had one. */
    WT_ERR(__clsm_put(
        session, clsm, &cursor->key, &__tombstone, positioned, false));

    /*
     * If the cursor was positioned, it stays positioned with a key but
     * no value, otherwise, there's no position, key or value. This isn't
     * just cosmetic, without a reset, iteration on this cursor won't start
     * at the beginning/end of the table.
     */
    F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
    if (positioned)
        F_SET(cursor, WT_CURSTD_KEY_INT);
    else
        WT_TRET(cursor->reset(cursor));

err:    __clsm_leave(clsm);
    CURSOR_UPDATE_API_END(session, ret);
    return (ret);
}
1695
/*
 * __clsm_reserve --
 *     WT_CURSOR->reserve method for the LSM cursor type.
 *
 *     Requires a key that already exists: a failed lookup is returned to the
 *     caller. After a successful reserve the key is searched for again so
 *     the cursor has a value to return.
 */
static int
__clsm_reserve(WT_CURSOR *cursor)
{
    WT_CURSOR_LSM *clsm;
    WT_DECL_RET;
    WT_ITEM value;
    WT_SESSION_IMPL *session;

    clsm = (WT_CURSOR_LSM *)cursor;

    CURSOR_UPDATE_API_CALL(cursor, session, reserve);
    WT_ERR(__cursor_needkey(cursor));
    __cursor_novalue(cursor);
    WT_ERR(__wt_txn_context_check(session, true));
    WT_ERR(__clsm_enter(clsm, false, true));

    WT_ERR(__clsm_lookup(clsm, &value));
    /*
     * Copy the key out, since the insert resets non-primary chunk cursors
     * which our lookup may have landed on.
     */
    WT_ERR(__cursor_needkey(cursor));
    /* A reserve carries no value, so none is passed. */
    ret = __clsm_put(session, clsm, &cursor->key, NULL, true, true);

err:    __clsm_leave(clsm);
    CURSOR_UPDATE_API_END(session, ret);

    /*
     * The application might do a WT_CURSOR.get_value call when we return,
     * so we need a value and the underlying functions didn't set one up.
     * For various reasons, those functions may not have done a search and
     * any previous value in the cursor might race with WT_CURSOR.reserve
     * (and in cases like LSM, the reserve never encountered the original
     * key). For simplicity, repeat the search here.
     */
    return (ret == 0 ? cursor->search(cursor) : ret);
}
1737
/*
 * __wt_clsm_close --
 *     WT_CURSOR->close method for the LSM cursor type.
 *
 *     Closes every chunk cursor, frees the chunk array, releases the
 *     cursor's reference on the LSM tree (if it holds one) and closes the
 *     cursor itself. Returns the first error encountered.
 */
int
__wt_clsm_close(WT_CURSOR *cursor)
{
    WT_CURSOR_LSM *clsm;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;

    /*
     * Don't use the normal __clsm_enter path: that is wasted work when
     * closing, and the cursor may never have been used.
     */
    clsm = (WT_CURSOR_LSM *)cursor;
    CURSOR_API_CALL_PREPARE_ALLOWED(cursor, session, close, NULL);
    /*
     * The error label sits directly after the API call so the cleanup
     * below runs on every path.
     * NOTE(review): presumably the API-call macro can jump here on
     * failure - confirm against its definition.
     */
err:

    WT_TRET(__clsm_close_cursors(session, clsm, 0, clsm->nchunks));
    __clsm_free_chunks(session, clsm);

    /* In case we were somehow left positioned, clear that. */
    __clsm_leave(clsm);

    /* The tree is not set if the open failed part way through. */
    if (clsm->lsm_tree != NULL)
        __wt_lsm_tree_release(session, clsm->lsm_tree);
    __wt_cursor_close(cursor);

    API_END_RET(session, ret);
}
1769
1770 /*
1771 * __wt_clsm_open --
1772 * WT_SESSION->open_cursor method for LSM cursors.
1773 */
1774 int
__wt_clsm_open(WT_SESSION_IMPL * session,const char * uri,WT_CURSOR * owner,const char * cfg[],WT_CURSOR ** cursorp)1775 __wt_clsm_open(WT_SESSION_IMPL *session,
1776 const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
1777 {
1778 WT_CONFIG_ITEM cval;
1779 WT_CURSOR_STATIC_INIT(iface,
1780 __wt_cursor_get_key, /* get-key */
1781 __wt_cursor_get_value, /* get-value */
1782 __wt_cursor_set_key, /* set-key */
1783 __wt_cursor_set_value, /* set-value */
1784 __clsm_compare, /* compare */
1785 __wt_cursor_equals, /* equals */
1786 __clsm_next, /* next */
1787 __clsm_prev, /* prev */
1788 __clsm_reset, /* reset */
1789 __clsm_search, /* search */
1790 __clsm_search_near, /* search-near */
1791 __clsm_insert, /* insert */
1792 __wt_cursor_modify_notsup, /* modify */
1793 __clsm_update, /* update */
1794 __clsm_remove, /* remove */
1795 __clsm_reserve, /* reserve */
1796 __wt_cursor_reconfigure, /* reconfigure */
1797 __wt_cursor_notsup, /* cache */
1798 __wt_cursor_reopen_notsup, /* reopen */
1799 __wt_clsm_close); /* close */
1800 WT_CURSOR *cursor;
1801 WT_CURSOR_LSM *clsm;
1802 WT_DECL_RET;
1803 WT_LSM_TREE *lsm_tree;
1804 bool bulk;
1805
1806 WT_STATIC_ASSERT(offsetof(WT_CURSOR_LSM, iface) == 0);
1807
1808 clsm = NULL;
1809 cursor = NULL;
1810 lsm_tree = NULL;
1811
1812 if (!WT_PREFIX_MATCH(uri, "lsm:"))
1813 return (__wt_unexpected_object_type(session, uri, "lsm:"));
1814
1815 WT_RET(__wt_inmem_unsupported_op(session, "LSM trees"));
1816
1817 WT_RET(__wt_config_gets_def(session, cfg, "checkpoint", 0, &cval));
1818 if (cval.len != 0)
1819 WT_RET_MSG(session, EINVAL,
1820 "LSM does not support opening by checkpoint");
1821
1822 WT_RET(__wt_config_gets_def(session, cfg, "bulk", 0, &cval));
1823 bulk = cval.val != 0;
1824
1825 /* Get the LSM tree. */
1826 ret = __wt_lsm_tree_get(session, uri, bulk, &lsm_tree);
1827
1828 /*
1829 * Check whether the exclusive open for a bulk load succeeded, and
1830 * if it did ensure that it's safe to bulk load into the tree.
1831 */
1832 if (bulk && (ret == EBUSY || (ret == 0 && lsm_tree->nchunks > 1)))
1833 WT_ERR_MSG(session, EINVAL,
1834 "bulk-load is only supported on newly created LSM trees");
1835 /* Flag any errors from the tree get. */
1836 WT_ERR(ret);
1837
1838 /* Make sure we have exclusive access if and only if we want it */
1839 WT_ASSERT(session, !bulk || lsm_tree->excl_session != NULL);
1840
1841 WT_ERR(__wt_calloc_one(session, &clsm));
1842 cursor = (WT_CURSOR *)clsm;
1843 *cursor = iface;
1844 cursor->session = (WT_SESSION *)session;
1845 WT_ERR(__wt_strdup(session, lsm_tree->name, &cursor->uri));
1846 cursor->key_format = lsm_tree->key_format;
1847 cursor->value_format = lsm_tree->value_format;
1848
1849 clsm->lsm_tree = lsm_tree;
1850 lsm_tree = NULL;
1851
1852 /*
1853 * The tree's dsk_gen starts at one, so starting the cursor on zero
1854 * will force a call into open_cursors on the first operation.
1855 */
1856 clsm->dsk_gen = 0;
1857
1858 /* If the next_random option is set, configure a random cursor */
1859 WT_ERR(__wt_config_gets_def(session, cfg, "next_random", 0, &cval));
1860 if (cval.val != 0) {
1861 __wt_cursor_set_notsup(cursor);
1862 cursor->next = __clsm_next_random;
1863 }
1864
1865 WT_ERR(__wt_cursor_init(cursor, cursor->uri, owner, cfg, cursorp));
1866
1867 if (bulk)
1868 WT_ERR(__wt_clsm_open_bulk(clsm, cfg));
1869
1870 if (0) {
1871 err:
1872 if (clsm != NULL)
1873 WT_TRET(__wt_clsm_close(cursor));
1874 else if (lsm_tree != NULL)
1875 __wt_lsm_tree_release(session, lsm_tree);
1876
1877 *cursorp = NULL;
1878 }
1879
1880 return (ret);
1881 }
1882