1 /*-
2  * Copyright (c) 2014-2018 MongoDB, Inc.
3  * Copyright (c) 2008-2014 WiredTiger, Inc.
4  *	All rights reserved.
5  *
6  * See the file LICENSE for redistribution information.
7  */
8 
9 #include "wt_internal.h"
10 
11 #define	WT_FORALL_CURSORS(clsm, c, i)					\
12 	for ((i) = (clsm)->nchunks; (i) > 0;)				\
13 		if (((c) = (clsm)->chunks[--(i)]->cursor) != NULL)
14 
15 #define	WT_LSM_CURCMP(s, lsm_tree, c1, c2, cmp)				\
16 	__wt_compare(s, (lsm_tree)->collator, &(c1)->key, &(c2)->key, &(cmp))
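
/*
 * A short illustration of how the iteration macro above is used elsewhere
 * in this file (a sketch only; the variable names are hypothetical):
 *
 *	WT_CURSOR *c;
 *	u_int i;
 *
 *	WT_FORALL_CURSORS(clsm, c, i)
 *		WT_RET(c->reset(c));
 *
 * The loop walks the chunk array from the newest chunk (the primary) down
 * to the oldest and skips slots whose cursor is NULL.
 */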
17 
18 static int __clsm_lookup(WT_CURSOR_LSM *, WT_ITEM *);
19 static int __clsm_open_cursors(WT_CURSOR_LSM *, bool, u_int, uint32_t);
20 static int __clsm_reset_cursors(WT_CURSOR_LSM *, WT_CURSOR *);
21 static int __clsm_search_near(WT_CURSOR *cursor, int *exactp);
22 
23 /*
24  * __wt_clsm_request_switch --
25  *	Request an LSM tree switch for a cursor operation.
26  */
27 int
__wt_clsm_request_switch(WT_CURSOR_LSM *clsm)
29 {
30 	WT_DECL_RET;
31 	WT_LSM_TREE *lsm_tree;
32 	WT_SESSION_IMPL *session;
33 
34 	lsm_tree = clsm->lsm_tree;
35 	session = (WT_SESSION_IMPL *)clsm->iface.session;
36 
37 	if (!lsm_tree->need_switch) {
38 		/*
		 * Check that we are up-to-date: don't set the switch if the
		 * tree has changed since we last opened cursors; that can lead
		 * to switching multiple times when only one switch is
		 * required, creating very small chunks.
43 		 */
44 		__wt_lsm_tree_readlock(session, lsm_tree);
45 		if (lsm_tree->nchunks == 0 ||
46 		    (clsm->dsk_gen == lsm_tree->dsk_gen &&
47 		    !lsm_tree->need_switch)) {
48 			lsm_tree->need_switch = true;
49 			ret = __wt_lsm_manager_push_entry(
50 			    session, WT_LSM_WORK_SWITCH, 0, lsm_tree);
51 		}
52 		__wt_lsm_tree_readunlock(session, lsm_tree);
53 	}
54 
55 	return (ret);
56 }
57 
58 /*
59  * __wt_clsm_await_switch --
 *	Wait for a switch to have completed in the LSM tree.
61  */
62 int
__wt_clsm_await_switch(WT_CURSOR_LSM *clsm)
64 {
65 	WT_LSM_TREE *lsm_tree;
66 	WT_SESSION_IMPL *session;
67 	int waited;
68 
69 	lsm_tree = clsm->lsm_tree;
70 	session = (WT_SESSION_IMPL *)clsm->iface.session;
71 
72 	/*
73 	 * If there is no primary chunk, or a chunk has overflowed the hard
74 	 * limit, which either means a worker thread has fallen behind or there
75 	 * has just been a user-level checkpoint, wait until the tree changes.
76 	 *
77 	 * We used to switch chunks in the application thread here, but that is
78 	 * problematic because there is a transaction in progress and it could
79 	 * roll back, leaving the metadata inconsistent.
80 	 */
81 	for (waited = 0;
82 	    lsm_tree->nchunks == 0 ||
83 	    clsm->dsk_gen == lsm_tree->dsk_gen;
84 	    ++waited) {
85 		if (waited % WT_THOUSAND == 0)
86 			WT_RET(__wt_lsm_manager_push_entry(
87 			    session, WT_LSM_WORK_SWITCH, 0, lsm_tree));
88 		__wt_sleep(0, 10);
89 	}
90 	return (0);
91 }
92 
93 /*
94  * __clsm_enter_update --
95  *	Make sure an LSM cursor is ready to perform an update.
96  */
97 static int
__clsm_enter_update(WT_CURSOR_LSM *clsm)
99 {
100 	WT_CURSOR *primary;
101 	WT_LSM_CHUNK *primary_chunk;
102 	WT_LSM_TREE *lsm_tree;
103 	WT_SESSION_IMPL *session;
104 	bool hard_limit, have_primary, ovfl;
105 
106 	lsm_tree = clsm->lsm_tree;
107 	session = (WT_SESSION_IMPL *)clsm->iface.session;
108 
109 	if (clsm->nchunks == 0) {
110 		primary = NULL;
111 		have_primary = false;
112 	} else {
113 		primary = clsm->chunks[clsm->nchunks - 1]->cursor;
114 		primary_chunk = clsm->primary_chunk;
115 		WT_ASSERT(session, F_ISSET(&session->txn, WT_TXN_HAS_ID));
116 		have_primary = (primary != NULL && primary_chunk != NULL &&
117 		    (primary_chunk->switch_txn == WT_TXN_NONE ||
118 		    WT_TXNID_LT(session->txn.id, primary_chunk->switch_txn)));
119 	}
120 
121 	/*
122 	 * In LSM there are multiple btrees active at one time. The tree
123 	 * switch code needs to use btree API methods, and it wants to
124 	 * operate on the btree for the primary chunk. Set that up now.
125 	 *
126 	 * If the primary chunk has grown too large, set a flag so the worker
	 * thread will switch when it gets a chance, to avoid introducing high
128 	 * latency into application threads.  Don't do this indefinitely: if a
129 	 * chunk grows twice as large as the configured size, block until it
130 	 * can be switched.
131 	 */
132 	hard_limit = lsm_tree->need_switch;
133 
134 	if (have_primary) {
135 		WT_ENTER_PAGE_INDEX(session);
136 		WT_WITH_BTREE(session, ((WT_CURSOR_BTREE *)primary)->btree,
137 		    ovfl = __wt_btree_lsm_over_size(session, hard_limit ?
138 		    2 * lsm_tree->chunk_size : lsm_tree->chunk_size));
139 		WT_LEAVE_PAGE_INDEX(session);
140 
141 		/* If there was no overflow, we're done. */
142 		if (!ovfl)
143 			return (0);
144 	}
145 
146 	/* Request a switch. */
147 	WT_RET(__wt_clsm_request_switch(clsm));
148 
149 	/* If we only overflowed the soft limit, we're done. */
150 	if (have_primary && !hard_limit)
151 		return (0);
152 
153 	WT_RET(__wt_clsm_await_switch(clsm));
154 
155 	return (0);
156 }
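
/*
 * To summarize the switch policy above: once the primary chunk exceeds the
 * configured chunk size (the soft limit), a switch is requested and the
 * application thread carries on; if the tree already needs a switch and the
 * chunk has grown past twice the configured size (the hard limit), or there
 * is no usable primary chunk at all, the thread blocks in
 * __wt_clsm_await_switch until a worker completes the switch.
 */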
157 
158 /*
159  * __clsm_enter --
160  *	Start an operation on an LSM cursor, update if the tree has changed.
161  */
162 static inline int
__clsm_enter(WT_CURSOR_LSM *clsm, bool reset, bool update)
164 {
165 	WT_DECL_RET;
166 	WT_LSM_TREE *lsm_tree;
167 	WT_SESSION_IMPL *session;
168 	WT_TXN *txn;
	uint64_t i, pinned_id, switch_txn;
170 
171 	lsm_tree = clsm->lsm_tree;
172 	session = (WT_SESSION_IMPL *)clsm->iface.session;
173 	txn = &session->txn;
174 
175 	/* Merge cursors never update. */
176 	if (F_ISSET(clsm, WT_CLSM_MERGE))
177 		return (0);
178 
179 	if (reset) {
180 		WT_ASSERT(session, !F_ISSET(&clsm->iface,
181 		    WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT));
182 		WT_RET(__clsm_reset_cursors(clsm, NULL));
183 	}
184 
185 	for (;;) {
186 		/* Check if the cursor looks up-to-date. */
187 		if (clsm->dsk_gen != lsm_tree->dsk_gen &&
188 		    lsm_tree->nchunks != 0)
189 			goto open;
190 
191 		/* Update the maximum transaction ID in the primary chunk. */
192 		if (update) {
193 			/*
194 			 * Ensure that there is a transaction snapshot active.
195 			 */
196 			WT_RET(__wt_txn_autocommit_check(session));
197 			WT_RET(__wt_txn_id_check(session));
198 
199 			WT_RET(__clsm_enter_update(clsm));
200 			/*
201 			 * Switching the tree will update the generation before
202 			 * updating the switch transaction.  We test the
203 			 * transaction in clsm_enter_update.  Now test the
204 			 * disk generation to avoid races.
205 			 */
206 			if (clsm->dsk_gen != clsm->lsm_tree->dsk_gen)
207 				goto open;
208 
209 			if (txn->isolation == WT_ISO_SNAPSHOT)
210 				__wt_txn_cursor_op(session);
211 
212 			/*
213 			 * Figure out how many updates are required for
214 			 * snapshot isolation.
215 			 *
216 			 * This is not a normal visibility check on the maximum
217 			 * transaction ID in each chunk: any transaction ID
218 			 * that overlaps with our snapshot is a potential
219 			 * conflict.
220 			 *
221 			 * Note that the pinned ID is correct here: it tracks
222 			 * concurrent transactions excluding special
223 			 * transactions such as checkpoint (which we can't
224 			 * conflict with because checkpoint only writes the
225 			 * metadata, which is not an LSM tree).
226 			 */
227 			clsm->nupdates = 1;
228 			if (txn->isolation == WT_ISO_SNAPSHOT &&
229 			    F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT)) {
230 				WT_ASSERT(session,
231 				    F_ISSET(txn, WT_TXN_HAS_SNAPSHOT));
232 				pinned_id =
233 				    WT_SESSION_TXN_STATE(session)->pinned_id;
234 				for (i = clsm->nchunks - 2;
235 				    clsm->nupdates < clsm->nchunks;
236 				    clsm->nupdates++, i--) {
237 					switch_txn =
238 					    clsm->chunks[i]->switch_txn;
239 					if (WT_TXNID_LT(switch_txn, pinned_id))
240 						break;
241 					WT_ASSERT(session,
242 					    !__wt_txn_visible_all(
243 					    session, switch_txn, NULL));
244 				}
245 			}
246 		}
247 
248 		/*
249 		 * Stop when we are up-to-date, as long as this is:
250 		 *   - a snapshot isolation update and the cursor is set up for
251 		 *     that;
252 		 *   - an update operation with a primary chunk, or
253 		 *   - a read operation and the cursor is open for reading.
254 		 */
255 		if ((!update ||
256 		    txn->isolation != WT_ISO_SNAPSHOT ||
257 		    F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT)) &&
258 		    ((update && clsm->primary_chunk != NULL) ||
259 		    (!update && F_ISSET(clsm, WT_CLSM_OPEN_READ))))
260 			break;
261 
262 open:		WT_WITH_SCHEMA_LOCK(session,
263 		    ret = __clsm_open_cursors(clsm, update, 0, 0));
264 		WT_RET(ret);
265 	}
266 
267 	if (!F_ISSET(clsm, WT_CLSM_ACTIVE)) {
268 		/*
269 		 * Opening this LSM cursor has opened a number of btree
270 		 * cursors, ensure other code doesn't think this is the first
271 		 * cursor in a session.
272 		 */
273 		++session->ncursors;
274 		WT_RET(__cursor_enter(session));
275 		F_SET(clsm, WT_CLSM_ACTIVE);
276 	}
277 
278 	return (0);
279 }
280 
281 /*
282  * __clsm_leave --
283  *	Finish an operation on an LSM cursor.
284  */
285 static void
__clsm_leave(WT_CURSOR_LSM *clsm)
287 {
288 	WT_SESSION_IMPL *session;
289 
290 	session = (WT_SESSION_IMPL *)clsm->iface.session;
291 
292 	if (F_ISSET(clsm, WT_CLSM_ACTIVE)) {
293 		--session->ncursors;
294 		__cursor_leave(session);
295 		F_CLR(clsm, WT_CLSM_ACTIVE);
296 	}
297 }
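
/*
 * Every cursor method below brackets its work with the enter/leave pair;
 * a sketch of the pattern (error handling trimmed):
 *
 *	CURSOR_API_CALL(cursor, session, next, NULL);
 *	WT_ERR(__clsm_enter(clsm, false, false));
 *	... operate on the chunk cursors ...
 * err:	__clsm_leave(clsm);
 *	API_END_RET(session, ret);
 */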
298 
299 /*
 * We need a tombstone to mark deleted records, and we use the special
 * value below for that purpose.  We use two 0x14 (Device Control 4) bytes to
 * minimize the likelihood of colliding with an application-chosen encoding
 * byte: if the application uses two leading DC4 bytes for some reason, we'll
 * do a wasted data copy each time a new value is inserted into the object.
305  */
306 static const WT_ITEM __tombstone = { "\x14\x14", 2, NULL, 0, 0 };
307 
308 /*
309  * __clsm_deleted --
310  *	Check whether the current value is a tombstone.
311  */
312 static inline bool
__clsm_deleted(WT_CURSOR_LSM *clsm, const WT_ITEM *item)
314 {
315 	return (!F_ISSET(clsm, WT_CLSM_MINOR_MERGE) &&
316 	    item->size == __tombstone.size &&
317 	    memcmp(item->data, __tombstone.data, __tombstone.size) == 0);
318 }
319 
320 /*
321  * __clsm_deleted_encode --
322  *	Encode values that are in the encoded name space.
323  */
324 static inline int
__clsm_deleted_encode(WT_SESSION_IMPL *session,
    const WT_ITEM *value, WT_ITEM *final_value, WT_ITEM **tmpp)
327 {
328 	WT_ITEM *tmp;
329 
330 	/*
331 	 * If value requires encoding, get a scratch buffer of the right size
332 	 * and create a copy of the data with the first byte of the tombstone
333 	 * appended.
334 	 */
335 	if (value->size >= __tombstone.size &&
336 	    memcmp(value->data, __tombstone.data, __tombstone.size) == 0) {
337 		WT_RET(__wt_scr_alloc(session, value->size + 1, tmpp));
338 		tmp = *tmpp;
339 
340 		memcpy(tmp->mem, value->data, value->size);
341 		memcpy((uint8_t *)tmp->mem + value->size, __tombstone.data, 1);
342 		final_value->data = tmp->mem;
343 		final_value->size = value->size + 1;
344 	} else {
345 		final_value->data = value->data;
346 		final_value->size = value->size;
347 	}
348 
349 	return (0);
350 }
351 
352 /*
353  * __clsm_deleted_decode --
354  *	Decode values that start with the tombstone.
355  */
356 static inline void
__clsm_deleted_decode(WT_CURSOR_LSM *clsm, WT_ITEM *value)
358 {
359 	/*
360 	 * Take care with this check: when an LSM cursor is used for a merge,
361 	 * and/or to create a Bloom filter, it is valid to return the tombstone
362 	 * value.
363 	 */
364 	if (!F_ISSET(clsm, WT_CLSM_MERGE) &&
365 	    value->size > __tombstone.size &&
366 	    memcmp(value->data, __tombstone.data, __tombstone.size) == 0)
367 		--value->size;
368 }
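
/*
 * A worked example of the tombstone escaping above: a removed key is stored
 * with the two-byte value "\x14\x14".  If an application value itself begins
 * with those two bytes, say "\x14\x14ABC" (5 bytes), __clsm_deleted_encode
 * stores it as "\x14\x14ABC\x14" (6 bytes) so it can never be mistaken for
 * the tombstone, and __clsm_deleted_decode trims the trailing 0x14 byte
 * before the value is returned to the application.
 */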
369 
370 /*
371  * __clsm_close_cursors --
372  *	Close any btree cursors that are not needed.
373  */
374 static int
__clsm_close_cursors(
    WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm, u_int start, u_int end)
377 {
378 	WT_BLOOM *bloom;
379 	WT_CURSOR *c;
380 	u_int i;
381 
382 	__wt_verbose(session, WT_VERB_LSM,
383 	    "LSM closing cursor session(%p):clsm(%p), start: %u, end: %u",
384 	    (void *)session, (void *)clsm, start, end);
385 
386 	if (clsm->chunks == NULL || clsm->nchunks == 0)
387 		return (0);
388 
389 	/*
	 * Walk the cursors, closing any we don't need.  Note that the exit
	 * condition here is special: don't use WT_FORALL_CURSORS, and be
	 * careful with unsigned integer wrapping.
393 	 */
394 	for (i = start; i < end; i++) {
395 		if ((c = (clsm)->chunks[i]->cursor) != NULL) {
396 			clsm->chunks[i]->cursor = NULL;
397 			WT_RET(c->close(c));
398 		}
399 		if ((bloom = clsm->chunks[i]->bloom) != NULL) {
400 			clsm->chunks[i]->bloom = NULL;
401 			WT_RET(__wt_bloom_close(bloom));
402 		}
403 	}
404 
405 	return (0);
406 }
407 
408 /*
409  * __clsm_resize_chunks --
 *	Allocate or grow the array of per-chunk objects used by the cursor.
411  */
412 static int
__clsm_resize_chunks(
    WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm, u_int nchunks)
415 {
416 	WT_LSM_CURSOR_CHUNK *chunk;
417 
418 	/* Don't allocate more iterators if we don't need them. */
419 	if (clsm->chunks_count >= nchunks)
420 		return (0);
421 
422 	WT_RET(__wt_realloc_def(session, &clsm->chunks_alloc, nchunks,
423 	    &clsm->chunks));
424 	for (; clsm->chunks_count < nchunks; clsm->chunks_count++) {
425 		WT_RET(__wt_calloc_one(session, &chunk));
426 		clsm->chunks[clsm->chunks_count] = chunk;
427 	}
428 	return (0);
429 }
430 
431 /*
432  * __clsm_free_chunks --
 *	Free the array of per-chunk objects allocated for the cursor.
434  */
435 static void
__clsm_free_chunks(WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm)
437 {
438 	size_t i;
439 
440 	for (i = 0; i < clsm->chunks_count; i++)
441 		__wt_free(session, clsm->chunks[i]);
442 
443 	__wt_free(session, clsm->chunks);
444 }
445 
446 /*
447  * __clsm_open_cursors --
448  *	Open cursors for the current set of files.
449  */
450 static int
__clsm_open_cursors(
    WT_CURSOR_LSM *clsm, bool update, u_int start_chunk, uint32_t start_id)
453 {
454 	WT_BTREE *btree;
455 	WT_CURSOR *c, *cursor, *primary;
456 	WT_DECL_RET;
457 	WT_LSM_CHUNK *chunk;
458 	WT_LSM_TREE *lsm_tree;
459 	WT_SESSION_IMPL *session;
460 	WT_TXN *txn;
461 	uint64_t saved_gen;
462 	u_int close_range_end, close_range_start;
463 	u_int i, nchunks, ngood, nupdates;
464 	const char *checkpoint, *ckpt_cfg[3];
465 	bool locked;
466 
467 	c = &clsm->iface;
468 	cursor = NULL;
469 	session = (WT_SESSION_IMPL *)c->session;
470 	txn = &session->txn;
471 	chunk = NULL;
472 	locked = false;
473 	lsm_tree = clsm->lsm_tree;
474 
475 	/*
476 	 * Ensure that any snapshot update has cursors on the right set of
477 	 * chunks to guarantee visibility is correct.
478 	 */
479 	if (update && txn->isolation == WT_ISO_SNAPSHOT)
480 		F_SET(clsm, WT_CLSM_OPEN_SNAPSHOT);
481 
482 	/*
	 * Query operations need a full set of cursors.  Non-overwrite update
	 * cursors also need them, because they do queries in service of
	 * updates.
485 	 */
486 	if (!update || !F_ISSET(c, WT_CURSTD_OVERWRITE))
487 		F_SET(clsm, WT_CLSM_OPEN_READ);
488 
489 	if (lsm_tree->nchunks == 0)
490 		return (0);
491 
492 	ckpt_cfg[0] = WT_CONFIG_BASE(session, WT_SESSION_open_cursor);
493 	ckpt_cfg[1] = "checkpoint=" WT_CHECKPOINT ",raw";
494 	ckpt_cfg[2] = NULL;
495 
496 	/*
497 	 * If the key is pointing to memory that is pinned by a chunk
498 	 * cursor, take a copy before closing cursors.
499 	 */
500 	if (F_ISSET(c, WT_CURSTD_KEY_INT))
501 		WT_ERR(__cursor_needkey(c));
502 
503 	F_CLR(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_ITERATE_PREV);
504 
505 	__wt_lsm_tree_readlock(session, lsm_tree);
506 	locked = true;
507 
508 	/* Merge cursors have already figured out how many chunks they need. */
509 retry:	if (F_ISSET(clsm, WT_CLSM_MERGE)) {
510 		nchunks = clsm->nchunks;
511 		ngood = 0;
512 		WT_ERR(__clsm_resize_chunks(session, clsm, nchunks));
513 		/*
514 		 * We may have raced with another merge completing.  Check that
515 		 * we're starting at the right offset in the chunk array.
516 		 */
517 		if (start_chunk >= lsm_tree->nchunks ||
518 		    lsm_tree->chunk[start_chunk]->id != start_id) {
519 			for (start_chunk = 0;
520 			    start_chunk < lsm_tree->nchunks;
521 			    start_chunk++) {
522 				chunk = lsm_tree->chunk[start_chunk];
523 				if (chunk->id == start_id)
524 					break;
525 			}
526 			/* We have to find the start chunk: merge locked it. */
527 			WT_ASSERT(session, start_chunk < lsm_tree->nchunks);
528 		}
529 	} else {
530 		nchunks = lsm_tree->nchunks;
531 		WT_ERR(__clsm_resize_chunks(session, clsm, nchunks));
532 
533 		/*
534 		 * If we are only opening the cursor for updates, only open the
535 		 * primary chunk, plus any other chunks that might be required
536 		 * to detect snapshot isolation conflicts.
537 		 */
538 		if (F_ISSET(clsm, WT_CLSM_OPEN_READ))
539 			ngood = nupdates = 0;
540 		else if (F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT)) {
541 			/*
542 			 * Keep going until all updates in the next
543 			 * chunk are globally visible.  Copy the maximum
544 			 * transaction IDs into the cursor as we go.
545 			 */
546 			for (ngood = nchunks - 1, nupdates = 1; ngood > 0;
547 			    ngood--, nupdates++) {
548 				chunk = lsm_tree->chunk[ngood - 1];
549 				clsm->chunks[ngood - 1]->switch_txn =
550 				    chunk->switch_txn;
551 				if (__wt_lsm_chunk_visible_all(session, chunk))
552 					break;
553 			}
554 		} else {
555 			nupdates = 1;
556 			ngood = nchunks - 1;
557 		}
558 
559 		/* Check how many cursors are already open. */
560 		for (; ngood < clsm->nchunks && ngood < nchunks; ngood++) {
561 			chunk = lsm_tree->chunk[ngood];
562 			cursor = clsm->chunks[ngood]->cursor;
563 
564 			/* If the cursor isn't open yet, we're done. */
565 			if (cursor == NULL)
566 				break;
567 
568 			/* Easy case: the URIs don't match. */
569 			if (strcmp(cursor->uri, chunk->uri) != 0)
570 				break;
571 
572 			/*
573 			 * Make sure the checkpoint config matches when not
574 			 * using a custom data source.
575 			 */
576 			if (lsm_tree->custom_generation == 0 ||
577 			    chunk->generation < lsm_tree->custom_generation) {
578 				checkpoint = ((WT_CURSOR_BTREE *)cursor)->
579 				    btree->dhandle->checkpoint;
580 				if (checkpoint == NULL &&
581 				    F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) &&
582 				    !chunk->empty)
583 					break;
584 			}
585 
586 			/* Make sure the Bloom config matches. */
587 			if (clsm->chunks[ngood]->bloom == NULL &&
588 			    F_ISSET(chunk, WT_LSM_CHUNK_BLOOM))
589 				break;
590 		}
591 
592 		/* Spurious generation bump? */
593 		if (ngood == clsm->nchunks && clsm->nchunks == nchunks) {
594 			clsm->dsk_gen = lsm_tree->dsk_gen;
595 			goto err;
596 		}
597 
598 		/*
599 		 * Close any cursors we no longer need.
600 		 *
601 		 * Drop the LSM tree lock while we do this: if the cache is
602 		 * full, we may block while closing a cursor.  Save the
603 		 * generation number and retry if it has changed under us.
604 		 */
605 		if (clsm->chunks != NULL && ngood < clsm->nchunks) {
606 			close_range_start = ngood;
607 			close_range_end = clsm->nchunks;
608 		} else if (!F_ISSET(clsm, WT_CLSM_OPEN_READ) && nupdates > 0) {
609 			close_range_start = 0;
610 			close_range_end = WT_MIN(nchunks, clsm->nchunks);
611 			if (close_range_end > nupdates)
612 				close_range_end -= nupdates;
613 			else
614 				close_range_end = 0;
615 			WT_ASSERT(session, ngood >= close_range_end);
616 		} else {
617 			close_range_end = 0;
618 			close_range_start = 0;
619 		}
620 		if (close_range_end > close_range_start) {
621 			saved_gen = lsm_tree->dsk_gen;
622 			locked = false;
623 			__wt_lsm_tree_readunlock(session, lsm_tree);
624 			WT_ERR(__clsm_close_cursors(session,
625 			    clsm, close_range_start, close_range_end));
626 			__wt_lsm_tree_readlock(session, lsm_tree);
627 			locked = true;
628 			if (lsm_tree->dsk_gen != saved_gen)
629 				goto retry;
630 		}
631 
632 		/* Detach from our old primary. */
633 		clsm->primary_chunk = NULL;
634 		clsm->current = NULL;
635 	}
636 
637 	WT_ASSERT(session, start_chunk + nchunks <= lsm_tree->nchunks);
638 	clsm->nchunks = nchunks;
639 
640 	/* Open the cursors for chunks that have changed. */
641 	__wt_verbose(session, WT_VERB_LSM,
642 	    "LSM opening cursor session(%p):clsm(%p)%s, chunks: %u, good: %u",
643 	    (void *)session, (void *)clsm,
644 	    update ? ", update" : "", nchunks, ngood);
645 	for (i = ngood; i != nchunks; i++) {
646 		chunk = lsm_tree->chunk[i + start_chunk];
647 		/* Copy the maximum transaction ID. */
648 		if (F_ISSET(clsm, WT_CLSM_OPEN_SNAPSHOT))
649 			clsm->chunks[i]->switch_txn = chunk->switch_txn;
650 
651 		/*
652 		 * Read from the checkpoint if the file has been written.
653 		 * Once all cursors switch, the in-memory tree can be evicted.
654 		 */
655 		WT_ASSERT(session, clsm->chunks[i]->cursor == NULL);
656 		ret = __wt_open_cursor(session, chunk->uri, c,
657 		    (F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) && !chunk->empty) ?
658 			ckpt_cfg : NULL, &clsm->chunks[i]->cursor);
659 
660 		/*
661 		 * XXX kludge: we may have an empty chunk where no checkpoint
662 		 * was written.  If so, try to open the ordinary handle on that
663 		 * chunk instead.
664 		 */
665 		if (ret == WT_NOTFOUND && F_ISSET(chunk, WT_LSM_CHUNK_ONDISK)) {
666 			ret = __wt_open_cursor(session,
667 			    chunk->uri, c, NULL, &clsm->chunks[i]->cursor);
668 			if (ret == 0)
669 				chunk->empty = 1;
670 		}
671 		WT_ERR(ret);
672 
673 		/*
		 * Set up all cursors other than the primary to only do conflict
675 		 * checks on insert operations. This allows us to execute
676 		 * inserts on non-primary chunks as a way of checking for
677 		 * write conflicts with concurrent updates.
678 		 */
679 		if (i != nchunks - 1)
680 			clsm->chunks[i]->cursor->insert =
681 			    __wt_curfile_insert_check;
682 
683 		if (!F_ISSET(clsm, WT_CLSM_MERGE) &&
684 		    F_ISSET(chunk, WT_LSM_CHUNK_BLOOM))
685 			WT_ERR(__wt_bloom_open(session, chunk->bloom_uri,
686 			    lsm_tree->bloom_bit_count,
687 			    lsm_tree->bloom_hash_count,
688 			    c, &clsm->chunks[i]->bloom));
689 
690 		/* Child cursors always use overwrite and raw mode. */
691 		F_SET(clsm->chunks[i]->cursor,
692 		    WT_CURSTD_OVERWRITE | WT_CURSTD_RAW);
693 	}
694 
	/* Set up the record count for each chunk in the cursor. */
696 	for (i = 0; i != clsm->nchunks; i++)
697 		clsm->chunks[i]->count =
698 		    lsm_tree->chunk[i + start_chunk]->count;
699 
700 	/* The last chunk is our new primary. */
701 	if (chunk != NULL &&
702 	    !F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) &&
703 	    chunk->switch_txn == WT_TXN_NONE) {
704 		primary = clsm->chunks[clsm->nchunks - 1]->cursor;
705 		btree = ((WT_CURSOR_BTREE *)primary)->btree;
706 
707 		/*
		 * If the primary is not yet set as the primary, do that now.
		 * Note that eviction was configured off when the underlying
		 * object was created, which is what we want; leave it alone.
		 *
		 * We don't have to worry about races here: every thread that
		 * modifies the tree will have to come through here; at worst
		 * we set the flag repeatedly.  We don't use a WT_BTREE handle
		 * flag because we could race doing the read-modify-write of
		 * the flags field.
717 		 *
718 		 * If something caused the chunk to be closed and reopened
719 		 * since it was created, we can no longer use it as a primary
720 		 * chunk and we need to force a switch. We detect the tree was
721 		 * created when it was opened by checking the "original" flag.
722 		 */
723 		if (!btree->lsm_primary && btree->original)
724 			btree->lsm_primary = true;
725 		if (btree->lsm_primary)
726 			clsm->primary_chunk = chunk;
727 	}
728 
729 	clsm->dsk_gen = lsm_tree->dsk_gen;
730 
731 err:
732 #ifdef HAVE_DIAGNOSTIC
733 	/* Check that all cursors are open as expected. */
734 	if (ret == 0 && F_ISSET(clsm, WT_CLSM_OPEN_READ)) {
735 		for (i = 0; i != clsm->nchunks; i++) {
736 			cursor = clsm->chunks[i]->cursor;
737 			chunk = lsm_tree->chunk[i + start_chunk];
738 
739 			/* Make sure the first cursor is open. */
740 			WT_ASSERT(session, cursor != NULL);
741 
742 			/* Easy case: the URIs should match. */
743 			WT_ASSERT(
744 			    session, strcmp(cursor->uri, chunk->uri) == 0);
745 
746 			/*
747 			 * Make sure the checkpoint config matches when not
748 			 * using a custom data source.
749 			 */
750 			if (lsm_tree->custom_generation == 0 ||
751 			    chunk->generation < lsm_tree->custom_generation) {
752 				checkpoint = ((WT_CURSOR_BTREE *)cursor)->
753 				    btree->dhandle->checkpoint;
754 				WT_ASSERT(session,
755 				    (F_ISSET(chunk, WT_LSM_CHUNK_ONDISK) &&
756 				    !chunk->empty) ?
757 				    checkpoint != NULL : checkpoint == NULL);
758 			}
759 
760 			/* Make sure the Bloom config matches. */
761 			WT_ASSERT(session,
762 			    (F_ISSET(chunk, WT_LSM_CHUNK_BLOOM) &&
763 			    !F_ISSET(clsm, WT_CLSM_MERGE)) ?
764 			    clsm->chunks[i]->bloom != NULL :
765 			    clsm->chunks[i]->bloom == NULL);
766 		}
767 	}
768 #endif
769 	if (locked)
770 		__wt_lsm_tree_readunlock(session, lsm_tree);
771 	return (ret);
772 }
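
/*
 * To recap the locking in __clsm_open_cursors: the chunk array is examined
 * and rebuilt under the LSM tree read lock, but the lock is dropped while
 * stale chunk cursors are closed because closing may block when the cache
 * is full; if the tree's disk generation changes while the lock is dropped,
 * the whole procedure restarts from the retry label.
 */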
773 
774 /*
775  * __wt_clsm_init_merge --
776  *	Initialize an LSM cursor for a merge.
777  */
778 int
__wt_clsm_init_merge(
    WT_CURSOR *cursor, u_int start_chunk, uint32_t start_id, u_int nchunks)
781 {
782 	WT_CURSOR_LSM *clsm;
783 	WT_DECL_RET;
784 	WT_SESSION_IMPL *session;
785 
786 	clsm = (WT_CURSOR_LSM *)cursor;
787 	session = (WT_SESSION_IMPL *)cursor->session;
788 
789 	F_SET(clsm, WT_CLSM_MERGE);
790 	if (start_chunk != 0)
791 		F_SET(clsm, WT_CLSM_MINOR_MERGE);
792 	clsm->nchunks = nchunks;
793 
794 	WT_WITH_SCHEMA_LOCK(session,
795 	    ret = __clsm_open_cursors(clsm, false, start_chunk, start_id));
796 	return (ret);
797 }
798 
799 /*
800  * __clsm_get_current --
801  *	Find the smallest / largest of the cursors and copy its key/value.
802  */
803 static int
__clsm_get_current(WT_SESSION_IMPL *session,
    WT_CURSOR_LSM *clsm, bool smallest, bool *deletedp)
806 {
807 	WT_CURSOR *c, *current;
808 	u_int i;
809 	int cmp;
810 	bool multiple;
811 
812 	current = NULL;
813 	multiple = false;
814 
815 	WT_FORALL_CURSORS(clsm, c, i) {
816 		if (!F_ISSET(c, WT_CURSTD_KEY_INT))
817 			continue;
818 		if (current == NULL) {
819 			current = c;
820 			continue;
821 		}
822 		WT_RET(WT_LSM_CURCMP(session, clsm->lsm_tree, c, current, cmp));
823 		if (smallest ? cmp < 0 : cmp > 0) {
824 			current = c;
825 			multiple = false;
826 		} else if (cmp == 0)
827 			multiple = true;
828 	}
829 
830 	c = &clsm->iface;
831 	if ((clsm->current = current) == NULL) {
832 		F_CLR(c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
833 		return (WT_NOTFOUND);
834 	}
835 
836 	if (multiple)
837 		F_SET(clsm, WT_CLSM_MULTIPLE);
838 	else
839 		F_CLR(clsm, WT_CLSM_MULTIPLE);
840 
841 	WT_RET(current->get_key(current, &c->key));
842 	WT_RET(current->get_value(current, &c->value));
843 
844 	F_CLR(c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
845 	if ((*deletedp = __clsm_deleted(clsm, &c->value)) == false)
846 		F_SET(c, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
847 
848 	return (0);
849 }
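
/*
 * For example (with hypothetical keys): if three chunk cursors are
 * positioned on "apple", "banana" and "apple", a forward scan (smallest set)
 * copies "apple" into the LSM cursor, records the winning chunk cursor in
 * clsm->current and sets WT_CLSM_MULTIPLE so the next call knows to advance
 * every cursor positioned on "apple".
 */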
850 
851 /*
852  * __clsm_compare --
853  *	WT_CURSOR->compare implementation for the LSM cursor type.
854  */
855 static int
__clsm_compare(WT_CURSOR *a, WT_CURSOR *b, int *cmpp)
857 {
858 	WT_CURSOR_LSM *alsm;
859 	WT_DECL_RET;
860 	WT_SESSION_IMPL *session;
861 
862 	/* There's no need to sync with the LSM tree, avoid WT_LSM_ENTER. */
863 	alsm = (WT_CURSOR_LSM *)a;
864 	CURSOR_API_CALL(a, session, compare, NULL);
865 
866 	/*
867 	 * Confirm both cursors refer to the same source and have keys, then
868 	 * compare the keys.
869 	 */
870 	if (strcmp(a->uri, b->uri) != 0)
871 		WT_ERR_MSG(session, EINVAL,
872 		    "comparison method cursors must reference the same object");
873 
874 	WT_ERR(__cursor_needkey(a));
875 	WT_ERR(__cursor_needkey(b));
876 
877 	WT_ERR(__wt_compare(
878 	    session, alsm->lsm_tree->collator, &a->key, &b->key, cmpp));
879 
880 err:	API_END_RET(session, ret);
881 }
882 
883 /*
884  * __clsm_position_chunk --
885  *	Position a chunk cursor.
886  */
887 static int
__clsm_position_chunk(
    WT_CURSOR_LSM *clsm, WT_CURSOR *c, bool forward, int *cmpp)
890 {
891 	WT_CURSOR *cursor;
892 	WT_SESSION_IMPL *session;
893 
894 	cursor = &clsm->iface;
895 	session = (WT_SESSION_IMPL *)cursor->session;
896 
897 	c->set_key(c, &cursor->key);
898 	WT_RET(c->search_near(c, cmpp));
899 
900 	while (forward ? *cmpp < 0 : *cmpp > 0) {
901 		WT_RET(forward ? c->next(c) : c->prev(c));
902 
903 		/*
904 		 * With higher isolation levels, where we have stable reads,
905 		 * we're done: the cursor is now positioned as expected.
906 		 *
907 		 * With read-uncommitted isolation, a new record could have
908 		 * appeared in between the search and stepping forward / back.
909 		 * In that case, keep going until we see a key in the expected
910 		 * range.
911 		 */
912 		if (session->txn.isolation != WT_ISO_READ_UNCOMMITTED)
913 			return (0);
914 
915 		WT_RET(WT_LSM_CURCMP(session,
916 		    clsm->lsm_tree, c, cursor, *cmpp));
917 	}
918 
919 	return (0);
920 }
921 
922 /*
923  * __clsm_next --
924  *	WT_CURSOR->next method for the LSM cursor type.
925  */
926 static int
__clsm_next(WT_CURSOR *cursor)
928 {
929 	WT_CURSOR *c;
930 	WT_CURSOR_LSM *clsm;
931 	WT_DECL_RET;
932 	WT_SESSION_IMPL *session;
933 	u_int i;
934 	int cmp;
935 	bool deleted;
936 
937 	clsm = (WT_CURSOR_LSM *)cursor;
938 
939 	CURSOR_API_CALL(cursor, session, next, NULL);
940 	__cursor_novalue(cursor);
941 	WT_ERR(__clsm_enter(clsm, false, false));
942 
943 	/* If we aren't positioned for a forward scan, get started. */
944 	if (clsm->current == NULL || !F_ISSET(clsm, WT_CLSM_ITERATE_NEXT)) {
945 		WT_FORALL_CURSORS(clsm, c, i) {
946 			if (!F_ISSET(cursor, WT_CURSTD_KEY_SET)) {
947 				WT_ERR(c->reset(c));
948 				ret = c->next(c);
949 			} else if (c != clsm->current && (ret =
950 			    __clsm_position_chunk(clsm, c, true, &cmp)) == 0 &&
951 			    cmp == 0 && clsm->current == NULL)
952 				clsm->current = c;
953 			WT_ERR_NOTFOUND_OK(ret);
954 		}
955 		F_SET(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_MULTIPLE);
956 		F_CLR(clsm, WT_CLSM_ITERATE_PREV);
957 
958 		/* We just positioned *at* the key, now move. */
959 		if (clsm->current != NULL)
960 			goto retry;
961 	} else {
962 retry:		/*
963 		 * If there are multiple cursors on that key, move them
964 		 * forward.
965 		 */
966 		if (F_ISSET(clsm, WT_CLSM_MULTIPLE)) {
967 			WT_FORALL_CURSORS(clsm, c, i) {
968 				if (!F_ISSET(c, WT_CURSTD_KEY_INT))
969 					continue;
970 				if (c != clsm->current) {
971 					WT_ERR(WT_LSM_CURCMP(session,
972 					    clsm->lsm_tree, c, clsm->current,
973 					    cmp));
974 					if (cmp == 0)
975 						WT_ERR_NOTFOUND_OK(c->next(c));
976 				}
977 			}
978 		}
979 
980 		/* Move the smallest cursor forward. */
981 		c = clsm->current;
982 		WT_ERR_NOTFOUND_OK(c->next(c));
983 	}
984 
985 	/* Find the cursor(s) with the smallest key. */
986 	if ((ret = __clsm_get_current(session, clsm, true, &deleted)) == 0 &&
987 	    deleted)
988 		goto retry;
989 
990 err:	__clsm_leave(clsm);
991 	if (ret == 0)
992 		__clsm_deleted_decode(clsm, &cursor->value);
993 	API_END_RET(session, ret);
994 }
995 
996 /*
997  * __clsm_random_chunk --
998  *	Pick a chunk at random, weighted by the size of all chunks. Weighting
999  * proportional to documents avoids biasing towards small chunks. Then return
1000  * the cursor on the chunk we have picked.
1001  */
1002 static int
__clsm_random_chunk(WT_SESSION_IMPL *session,
    WT_CURSOR_LSM *clsm, WT_CURSOR **cursor)
1005 {
1006 	uint64_t checked_docs, i, rand_doc, total_docs;
1007 
1008 	/*
1009 	 * If the tree is empty we cannot do a random lookup, so return a
1010 	 * WT_NOTFOUND.
1011 	 */
1012 	if (clsm->nchunks == 0)
1013 		return (WT_NOTFOUND);
1014 	for (total_docs = i = 0; i < clsm->nchunks; i++) {
1015 		total_docs += clsm->chunks[i]->count;
1016 	}
1017 	if (total_docs == 0)
1018 		return (WT_NOTFOUND);
1019 
1020 	rand_doc = __wt_random(&session->rnd) % total_docs;
1021 
1022 	for (checked_docs = i = 0; i < clsm->nchunks; i++) {
1023 		checked_docs += clsm->chunks[i]->count;
1024 		if (rand_doc <= checked_docs) {
1025 			*cursor = clsm->chunks[i]->cursor;
1026 			break;
1027 		}
1028 	}
1029 	return (0);
1030 }
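
/*
 * A worked example of the weighted selection above (hypothetical counts):
 * with three chunks holding 10, 30 and 60 documents, total_docs is 100 and
 * rand_doc falls in [0, 99].  A draw of 42 accumulates 10, then 40, then 100
 * checked documents and stops at the third chunk (42 <= 100), so each chunk
 * is picked roughly in proportion to the number of documents it holds.
 */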
1031 
1032 /*
1033  * __clsm_next_random --
1034  *	WT_CURSOR->next method for the LSM cursor type when configured with
1035  * next_random.
1036  */
1037 static int
__clsm_next_random(WT_CURSOR *cursor)
1039 {
1040 	WT_CURSOR *c;
1041 	WT_CURSOR_LSM *clsm;
1042 	WT_DECL_RET;
1043 	WT_SESSION_IMPL *session;
1044 	int exact;
1045 
1046 	c = NULL;
1047 	clsm = (WT_CURSOR_LSM *)cursor;
1048 
1049 	CURSOR_API_CALL(cursor, session, next, NULL);
1050 	__cursor_novalue(cursor);
1051 	WT_ERR(__clsm_enter(clsm, false, false));
1052 
1053 	for (;;) {
1054 		WT_ERR(__clsm_random_chunk(session, clsm, &c));
1055 		/*
1056 		 * This call to next_random on the chunk can potentially end in
1057 		 * WT_NOTFOUND if the chunk we picked is empty. We want to retry
1058 		 * in that case.
1059 		 */
1060 		ret = __wt_curfile_next_random(c);
1061 		if (ret == WT_NOTFOUND)
1062 			continue;
1063 
1064 		WT_ERR(ret);
1065 		F_SET(cursor, WT_CURSTD_KEY_INT);
1066 		WT_ERR(c->get_key(c, &cursor->key));
1067 		/*
1068 		 * Search near the current key to resolve any tombstones
		 * and position to a valid document. If we see a
		 * WT_NOTFOUND here, that is valid: the tree has no
		 * documents visible to us.
1072 		 */
1073 		WT_ERR(__clsm_search_near(cursor, &exact));
1074 		break;
1075 	}
1076 
	/* We have found a valid doc; the cursor is now positioned. */
1078 	if (0) {
1079 err:		F_CLR(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
1080 	}
1081 	__clsm_leave(clsm);
1082 	API_END_RET(session, ret);
1083 }
1084 
1085 /*
1086  * __clsm_prev --
1087  *	WT_CURSOR->prev method for the LSM cursor type.
1088  */
1089 static int
__clsm_prev(WT_CURSOR *cursor)
1091 {
1092 	WT_CURSOR *c;
1093 	WT_CURSOR_LSM *clsm;
1094 	WT_DECL_RET;
1095 	WT_SESSION_IMPL *session;
1096 	u_int i;
1097 	int cmp;
1098 	bool deleted;
1099 
1100 	clsm = (WT_CURSOR_LSM *)cursor;
1101 
1102 	CURSOR_API_CALL(cursor, session, prev, NULL);
1103 	__cursor_novalue(cursor);
1104 	WT_ERR(__clsm_enter(clsm, false, false));
1105 
1106 	/* If we aren't positioned for a reverse scan, get started. */
1107 	if (clsm->current == NULL || !F_ISSET(clsm, WT_CLSM_ITERATE_PREV)) {
1108 		WT_FORALL_CURSORS(clsm, c, i) {
1109 			if (!F_ISSET(cursor, WT_CURSTD_KEY_SET)) {
1110 				WT_ERR(c->reset(c));
1111 				ret = c->prev(c);
1112 			} else if (c != clsm->current && (ret =
1113 			    __clsm_position_chunk(clsm, c, false, &cmp)) == 0 &&
1114 			    cmp == 0 && clsm->current == NULL)
1115 				clsm->current = c;
1116 			WT_ERR_NOTFOUND_OK(ret);
1117 		}
1118 		F_SET(clsm, WT_CLSM_ITERATE_PREV | WT_CLSM_MULTIPLE);
1119 		F_CLR(clsm, WT_CLSM_ITERATE_NEXT);
1120 
1121 		/* We just positioned *at* the key, now move. */
1122 		if (clsm->current != NULL)
1123 			goto retry;
1124 	} else {
1125 retry:		/*
1126 		 * If there are multiple cursors on that key, move them
1127 		 * backwards.
1128 		 */
1129 		if (F_ISSET(clsm, WT_CLSM_MULTIPLE)) {
1130 			WT_FORALL_CURSORS(clsm, c, i) {
1131 				if (!F_ISSET(c, WT_CURSTD_KEY_INT))
1132 					continue;
1133 				if (c != clsm->current) {
1134 					WT_ERR(WT_LSM_CURCMP(session,
1135 					    clsm->lsm_tree, c, clsm->current,
1136 					    cmp));
1137 					if (cmp == 0)
1138 						WT_ERR_NOTFOUND_OK(c->prev(c));
1139 				}
1140 			}
1141 		}
1142 
1143 		/* Move the largest cursor backwards. */
1144 		c = clsm->current;
1145 		WT_ERR_NOTFOUND_OK(c->prev(c));
1146 	}
1147 
1148 	/* Find the cursor(s) with the largest key. */
1149 	if ((ret = __clsm_get_current(session, clsm, false, &deleted)) == 0 &&
1150 	    deleted)
1151 		goto retry;
1152 
1153 err:	__clsm_leave(clsm);
1154 	if (ret == 0)
1155 		__clsm_deleted_decode(clsm, &cursor->value);
1156 	API_END_RET(session, ret);
1157 }
1158 
1159 /*
1160  * __clsm_reset_cursors --
1161  *	Reset any positioned chunk cursors.
1162  *
1163  *	If the skip parameter is non-NULL, that cursor is about to be used, so
1164  *	there is no need to reset it.
1165  */
1166 static int
__clsm_reset_cursors(WT_CURSOR_LSM *clsm, WT_CURSOR *skip)
1168 {
1169 	WT_CURSOR *c;
1170 	WT_DECL_RET;
1171 	u_int i;
1172 
1173 	/* Fast path if the cursor is not positioned. */
1174 	if ((clsm->current == NULL || clsm->current == skip) &&
1175 	    !F_ISSET(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_ITERATE_PREV))
1176 		return (0);
1177 
1178 	WT_FORALL_CURSORS(clsm, c, i) {
1179 		if (c == skip)
1180 			continue;
1181 		if (F_ISSET(c, WT_CURSTD_KEY_INT))
1182 			WT_TRET(c->reset(c));
1183 	}
1184 
1185 	clsm->current = NULL;
1186 	F_CLR(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_ITERATE_PREV);
1187 
1188 	return (ret);
1189 }
1190 
1191 /*
1192  * __clsm_reset --
1193  *	WT_CURSOR->reset method for the LSM cursor type.
1194  */
1195 static int
__clsm_reset(WT_CURSOR *cursor)
1197 {
1198 	WT_CURSOR_LSM *clsm;
1199 	WT_DECL_RET;
1200 	WT_SESSION_IMPL *session;
1201 
1202 	/*
1203 	 * Don't use the normal __clsm_enter path: that is wasted work when all
1204 	 * we want to do is give up our position.
1205 	 */
1206 	clsm = (WT_CURSOR_LSM *)cursor;
1207 	CURSOR_API_CALL_PREPARE_ALLOWED(cursor, session, reset, NULL);
1208 	F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
1209 
1210 	WT_TRET(__clsm_reset_cursors(clsm, NULL));
1211 
1212 	/* In case we were left positioned, clear that. */
1213 	__clsm_leave(clsm);
1214 
1215 err:	API_END_RET(session, ret);
1216 }
1217 
1218 /*
1219  * __clsm_lookup --
1220  *	Position an LSM cursor.
1221  */
1222 static int
__clsm_lookup(WT_CURSOR_LSM *clsm, WT_ITEM *value)
1224 {
1225 	WT_BLOOM *bloom;
1226 	WT_BLOOM_HASH bhash;
1227 	WT_CURSOR *c, *cursor;
1228 	WT_DECL_RET;
1229 	WT_SESSION_IMPL *session;
1230 	u_int i;
1231 	bool have_hash;
1232 
1233 	c = NULL;
1234 	cursor = &clsm->iface;
1235 	have_hash = false;
1236 	session = (WT_SESSION_IMPL *)cursor->session;
1237 
1238 	WT_FORALL_CURSORS(clsm, c, i) {
1239 		/* If there is a Bloom filter, see if we can skip the read. */
1240 		bloom = NULL;
1241 		if ((bloom = clsm->chunks[i]->bloom) != NULL) {
1242 			if (!have_hash) {
1243 				__wt_bloom_hash(bloom, &cursor->key, &bhash);
1244 				have_hash = true;
1245 			}
1246 
1247 			ret = __wt_bloom_hash_get(bloom, &bhash);
1248 			if (ret == WT_NOTFOUND) {
1249 				WT_LSM_TREE_STAT_INCR(
1250 				    session, clsm->lsm_tree->bloom_miss);
1251 				continue;
1252 			}
1253 			if (ret == 0)
1254 				WT_LSM_TREE_STAT_INCR(
1255 				    session, clsm->lsm_tree->bloom_hit);
1256 			WT_ERR(ret);
1257 		}
1258 		c->set_key(c, &cursor->key);
1259 		if ((ret = c->search(c)) == 0) {
1260 			WT_ERR(c->get_key(c, &cursor->key));
1261 			WT_ERR(c->get_value(c, value));
1262 			if (__clsm_deleted(clsm, value))
1263 				ret = WT_NOTFOUND;
1264 			goto done;
1265 		}
1266 		WT_ERR_NOTFOUND_OK(ret);
1267 		F_CLR(c, WT_CURSTD_KEY_SET);
1268 		/* Update stats: the active chunk can't have a bloom filter. */
1269 		if (bloom != NULL)
1270 			WT_LSM_TREE_STAT_INCR(session,
1271 			    clsm->lsm_tree->bloom_false_positive);
1272 		else if (clsm->primary_chunk == NULL || i != clsm->nchunks)
1273 			WT_LSM_TREE_STAT_INCR(session,
1274 			    clsm->lsm_tree->lsm_lookup_no_bloom);
1275 	}
1276 	WT_ERR(WT_NOTFOUND);
1277 
1278 done:
1279 err:	if (ret == 0) {
1280 		F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
1281 		F_SET(cursor, WT_CURSTD_KEY_INT);
1282 		clsm->current = c;
1283 		if (value == &cursor->value)
1284 			F_SET(cursor, WT_CURSTD_VALUE_INT);
1285 	} else if (c != NULL)
1286 		WT_TRET(c->reset(c));
1287 
1288 	return (ret);
1289 }
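
/*
 * To make the statistics above concrete: for a chunk with a Bloom filter, a
 * negative filter check counts as a bloom_miss and skips the btree search
 * entirely; a positive check counts as a bloom_hit and falls through to the
 * search, and if that search then misses, bloom_false_positive is also
 * incremented.  A miss in a chunk without a filter counts against
 * lsm_lookup_no_bloom.
 */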
1290 
1291 /*
1292  * __clsm_search --
1293  *	WT_CURSOR->search method for the LSM cursor type.
1294  */
1295 static int
__clsm_search(WT_CURSOR *cursor)
1297 {
1298 	WT_CURSOR_LSM *clsm;
1299 	WT_DECL_RET;
1300 	WT_SESSION_IMPL *session;
1301 
1302 	clsm = (WT_CURSOR_LSM *)cursor;
1303 
1304 	CURSOR_API_CALL(cursor, session, search, NULL);
1305 	WT_ERR(__cursor_needkey(cursor));
1306 	__cursor_novalue(cursor);
1307 	WT_ERR(__clsm_enter(clsm, true, false));
1308 	F_CLR(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_ITERATE_PREV);
1309 
1310 	ret = __clsm_lookup(clsm, &cursor->value);
1311 
1312 err:	__clsm_leave(clsm);
1313 	if (ret == 0)
1314 		__clsm_deleted_decode(clsm, &cursor->value);
1315 	API_END_RET(session, ret);
1316 }
1317 
1318 /*
1319  * __clsm_search_near --
1320  *	WT_CURSOR->search_near method for the LSM cursor type.
1321  */
1322 static int
__clsm_search_near(WT_CURSOR *cursor, int *exactp)
1324 {
1325 	WT_CURSOR *c, *closest;
1326 	WT_CURSOR_LSM *clsm;
1327 	WT_DECL_RET;
1328 	WT_SESSION_IMPL *session;
1329 	u_int i;
1330 	int cmp, exact;
1331 	bool deleted;
1332 
1333 	closest = NULL;
1334 	clsm = (WT_CURSOR_LSM *)cursor;
1335 	exact = 0;
1336 
1337 	CURSOR_API_CALL(cursor, session, search_near, NULL);
1338 	WT_ERR(__cursor_needkey(cursor));
1339 	__cursor_novalue(cursor);
1340 	WT_ERR(__clsm_enter(clsm, true, false));
1341 	F_CLR(clsm, WT_CLSM_ITERATE_NEXT | WT_CLSM_ITERATE_PREV);
1342 
1343 	/*
1344 	 * search_near is somewhat fiddly: we can't just use a nearby key from
1345 	 * the in-memory chunk because there could be a closer key on disk.
1346 	 *
1347 	 * As we search down the chunks, we stop as soon as we find an exact
1348 	 * match.  Otherwise, we maintain the smallest cursor larger than the
1349 	 * search key and the largest cursor smaller than the search key.  At
1350 	 * the end, we prefer the larger cursor, but if no record is larger,
1351 	 * position on the last record in the tree.
1352 	 */
1353 	WT_FORALL_CURSORS(clsm, c, i) {
1354 		c->set_key(c, &cursor->key);
1355 		if ((ret = c->search_near(c, &cmp)) == WT_NOTFOUND) {
1356 			ret = 0;
1357 			continue;
1358 		}
1359 		if (ret != 0)
1360 			goto err;
1361 
1362 		/* Do we have an exact match? */
1363 		if (cmp == 0) {
1364 			closest = c;
1365 			exact = 1;
1366 			break;
1367 		}
1368 
1369 		/*
1370 		 * Prefer larger cursors.  There are two reasons: (1) we expect
1371 		 * prefix searches to be a common case (as in our own indices);
1372 		 * and (2) we need a way to unambiguously know we have the
1373 		 * "closest" result.
1374 		 */
1375 		if (cmp < 0) {
1376 			if ((ret = c->next(c)) == WT_NOTFOUND) {
1377 				ret = 0;
1378 				continue;
1379 			}
1380 			if (ret != 0)
1381 				goto err;
1382 		}
1383 
1384 		/*
1385 		 * We are trying to find the smallest cursor greater than the
1386 		 * search key.
1387 		 */
1388 		if (closest == NULL)
1389 			closest = c;
1390 		else {
1391 			WT_ERR(WT_LSM_CURCMP(session,
1392 			    clsm->lsm_tree, c, closest, cmp));
1393 			if (cmp < 0)
1394 				closest = c;
1395 		}
1396 	}
1397 
1398 	/*
1399 	 * At this point, we either have an exact match, or closest is the
1400 	 * smallest cursor larger than the search key, or it is NULL if the
1401 	 * search key is larger than any record in the tree.
1402 	 */
1403 	cmp = exact ? 0 : 1;
1404 
1405 	/*
1406 	 * If we land on a deleted item, try going forwards or backwards to
1407 	 * find one that isn't deleted.  If the whole tree is empty, we'll
1408 	 * end up with WT_NOTFOUND, as expected.
1409 	 */
1410 	if (closest == NULL)
1411 		deleted = true;
1412 	else {
1413 		WT_ERR(closest->get_key(closest, &cursor->key));
1414 		WT_ERR(closest->get_value(closest, &cursor->value));
1415 		clsm->current = closest;
1416 		closest = NULL;
1417 		deleted = __clsm_deleted(clsm, &cursor->value);
1418 		if (!deleted)
1419 			__clsm_deleted_decode(clsm, &cursor->value);
1420 		else  {
1421 			/*
1422 			 * We have a key pointing at memory that is
1423 			 * pinned by the current chunk cursor.  In the
1424 			 * unlikely event that we have to reopen cursors
1425 			 * to move to the next record, make sure the cursor
1426 			 * flags are set so a copy is made before the current
1427 			 * chunk cursor releases its position.
1428 			 */
1429 			F_CLR(cursor, WT_CURSTD_KEY_SET);
1430 			F_SET(cursor, WT_CURSTD_KEY_INT);
1431 			/*
1432 			 * We call __clsm_next here as we want to advance
			 * forward. If we are a random LSM cursor, calling next
			 * on the cursor will not advance as we intend.
1435 			 */
1436 			if ((ret = __clsm_next(cursor)) == 0) {
1437 				cmp = 1;
1438 				deleted = false;
1439 			}
1440 		}
1441 		WT_ERR_NOTFOUND_OK(ret);
1442 	}
1443 	if (deleted) {
1444 		clsm->current = NULL;
1445 		/*
1446 		 * We call prev directly here as cursor->prev may be "invalid"
1447 		 * if this is a random cursor.
1448 		 */
1449 		WT_ERR(__clsm_prev(cursor));
1450 		cmp = -1;
1451 	}
1452 	*exactp = cmp;
1453 
1454 err:	__clsm_leave(clsm);
1455 	if (closest != NULL)
1456 		WT_TRET(closest->reset(closest));
1457 
1458 	F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
1459 	if (ret == 0) {
1460 		F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
1461 	} else
1462 		clsm->current = NULL;
1463 
1464 	API_END_RET(session, ret);
1465 }
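
/*
 * A worked example with hypothetical keys: searching near "k" when one chunk
 * holds "j" and "m" and another holds "l".  The first chunk's search_near
 * lands on "j" (cmp < 0), so that cursor is stepped forward to "m"; the
 * second lands on "l" (cmp > 0).  Since "l" sorts before "m", it becomes the
 * closest cursor and the call returns with *exactp set to 1.  If the chosen
 * record turns out to be a tombstone, the code above first steps forward
 * with __clsm_next; only when nothing larger survives does it fall back to
 * __clsm_prev and return with *exactp set to -1.
 */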
1466 
1467 /*
1468  * __clsm_put --
1469  *	Put an entry into the in-memory tree, trigger a file switch if
1470  *	necessary.
1471  */
1472 static inline int
__clsm_put(WT_SESSION_IMPL *session, WT_CURSOR_LSM *clsm,
    const WT_ITEM *key, const WT_ITEM *value, bool position, bool reserve)
1475 {
1476 	WT_CURSOR *c, *primary;
1477 	WT_LSM_TREE *lsm_tree;
1478 	u_int i, slot;
1479 	int (*func)(WT_CURSOR *);
1480 
1481 	lsm_tree = clsm->lsm_tree;
1482 
1483 	WT_ASSERT(session,
1484 	    F_ISSET(&session->txn, WT_TXN_HAS_ID) &&
1485 	    clsm->primary_chunk != NULL &&
1486 	    (clsm->primary_chunk->switch_txn == WT_TXN_NONE ||
1487 	    WT_TXNID_LE(session->txn.id, clsm->primary_chunk->switch_txn)));
1488 
1489 	/*
1490 	 * Clear the existing cursor position.  Don't clear the primary cursor:
1491 	 * we're about to use it anyway.
1492 	 */
1493 	primary = clsm->chunks[clsm->nchunks - 1]->cursor;
1494 	WT_RET(__clsm_reset_cursors(clsm, primary));
1495 
1496 	/* If necessary, set the position for future scans. */
1497 	if (position)
1498 		clsm->current = primary;
1499 
1500 	for (i = 0, slot = clsm->nchunks - 1; i < clsm->nupdates; i++, slot--) {
1501 		/* Check if we need to keep updating old chunks. */
1502 		if (i > 0 && __wt_txn_visible(
1503 		    session, clsm->chunks[slot]->switch_txn, NULL)) {
1504 			clsm->nupdates = i;
1505 			break;
1506 		}
1507 
1508 		c = clsm->chunks[slot]->cursor;
1509 		c->set_key(c, key);
1510 		func = c->insert;
1511 		if (i == 0 && position)
1512 			func = reserve ? c->reserve : c->update;
1513 		if (func != c->reserve)
1514 			c->set_value(c, value);
1515 		WT_RET(func(c));
1516 	}
1517 
1518 	/*
1519 	 * Update the record count.  It is in a shared structure, but it's only
1520 	 * approximate, so don't worry about protecting access.
1521 	 *
1522 	 * Throttle if necessary.  Every 100 update operations on each cursor,
1523 	 * check if throttling is required.  Don't rely only on the shared
1524 	 * counter because it can race, and because for some workloads, there
1525 	 * may not be enough records per chunk to get effective throttling.
1526 	 */
1527 	if ((++clsm->primary_chunk->count % 100 == 0 ||
1528 	    ++clsm->update_count >= 100) &&
1529 	    lsm_tree->merge_throttle + lsm_tree->ckpt_throttle > 0) {
1530 		clsm->update_count = 0;
1531 		WT_LSM_TREE_STAT_INCRV(session,
1532 		    lsm_tree->lsm_checkpoint_throttle, lsm_tree->ckpt_throttle);
1533 		WT_STAT_CONN_INCRV(session,
1534 		    lsm_checkpoint_throttle, lsm_tree->ckpt_throttle);
1535 		WT_LSM_TREE_STAT_INCRV(session,
1536 		    lsm_tree->lsm_merge_throttle, lsm_tree->merge_throttle);
1537 		WT_STAT_CONN_INCRV(session,
1538 		    lsm_merge_throttle, lsm_tree->merge_throttle);
1539 		__wt_sleep(0,
1540 		    lsm_tree->ckpt_throttle + lsm_tree->merge_throttle);
1541 	}
1542 
1543 	return (0);
1544 }
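
/*
 * As an illustration of the update fan-out above, suppose clsm->nchunks is 4
 * and a snapshot-isolation update computed clsm->nupdates == 3: the primary
 * (slot 3) receives the real operation, which is c->update or c->reserve
 * when positioning and a plain c->insert otherwise, while slots 2 and 1 go
 * through __wt_curfile_insert_check, which only checks for write conflicts
 * with concurrent transactions rather than writing anything.
 */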
1545 
1546 /*
1547  * __clsm_insert --
1548  *	WT_CURSOR->insert method for the LSM cursor type.
1549  */
1550 static int
__clsm_insert(WT_CURSOR *cursor)
1552 {
1553 	WT_CURSOR_LSM *clsm;
1554 	WT_DECL_ITEM(buf);
1555 	WT_DECL_RET;
1556 	WT_ITEM value;
1557 	WT_SESSION_IMPL *session;
1558 
1559 	clsm = (WT_CURSOR_LSM *)cursor;
1560 
1561 	CURSOR_UPDATE_API_CALL(cursor, session, insert);
1562 	WT_ERR(__cursor_needkey(cursor));
1563 	WT_ERR(__cursor_needvalue(cursor));
1564 	WT_ERR(__clsm_enter(clsm, false, true));
1565 
1566 	/*
1567 	 * It isn't necessary to copy the key out after the lookup in this
1568 	 * case because any non-failed lookup results in an error, and a
1569 	 * failed lookup leaves the original key intact.
1570 	 */
1571 	if (!F_ISSET(cursor, WT_CURSTD_OVERWRITE) &&
1572 	    (ret = __clsm_lookup(clsm, &value)) != WT_NOTFOUND) {
1573 		if (ret == 0)
1574 			ret = WT_DUPLICATE_KEY;
1575 		goto err;
1576 	}
1577 
1578 	WT_ERR(__clsm_deleted_encode(session, &cursor->value, &value, &buf));
1579 	WT_ERR(__clsm_put(session, clsm, &cursor->key, &value, false, false));
1580 
1581 	/*
1582 	 * WT_CURSOR.insert doesn't leave the cursor positioned, and the
1583 	 * application may want to free the memory used to configure the
1584 	 * insert; don't read that memory again (matching the underlying
1585 	 * file object cursor insert semantics).
1586 	 */
1587 	F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
1588 
1589 err:	__wt_scr_free(session, &buf);
1590 	__clsm_leave(clsm);
1591 	CURSOR_UPDATE_API_END(session, ret);
1592 	return (ret);
1593 }
1594 
1595 /*
1596  * __clsm_update --
1597  *	WT_CURSOR->update method for the LSM cursor type.
1598  */
1599 static int
__clsm_update(WT_CURSOR *cursor)
1601 {
1602 	WT_CURSOR_LSM *clsm;
1603 	WT_DECL_ITEM(buf);
1604 	WT_DECL_RET;
1605 	WT_ITEM value;
1606 	WT_SESSION_IMPL *session;
1607 
1608 	clsm = (WT_CURSOR_LSM *)cursor;
1609 
1610 	CURSOR_UPDATE_API_CALL(cursor, session, update);
1611 	WT_ERR(__cursor_needkey(cursor));
1612 	WT_ERR(__cursor_needvalue(cursor));
1613 	WT_ERR(__clsm_enter(clsm, false, true));
1614 
1615 	if (!F_ISSET(cursor, WT_CURSTD_OVERWRITE)) {
1616 		WT_ERR(__clsm_lookup(clsm, &value));
1617 		/*
1618 		 * Copy the key out, since the insert resets non-primary chunk
1619 		 * cursors which our lookup may have landed on.
1620 		 */
1621 		WT_ERR(__cursor_needkey(cursor));
1622 	}
1623 	WT_ERR(__clsm_deleted_encode(session, &cursor->value, &value, &buf));
1624 	WT_ERR(__clsm_put(session, clsm, &cursor->key, &value, true, false));
1625 
1626 	/*
1627 	 * Set the cursor to reference the internal key/value of the positioned
1628 	 * cursor.
1629 	 */
1630 	F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
1631 	WT_ITEM_SET(cursor->key, clsm->current->key);
1632 	WT_ITEM_SET(cursor->value, clsm->current->value);
1633 	WT_ASSERT(session,
1634 	    F_MASK(clsm->current, WT_CURSTD_KEY_SET) == WT_CURSTD_KEY_INT);
1635 	WT_ASSERT(session,
1636 	    F_MASK(clsm->current, WT_CURSTD_VALUE_SET) == WT_CURSTD_VALUE_INT);
1637 	F_SET(cursor, WT_CURSTD_KEY_INT | WT_CURSTD_VALUE_INT);
1638 
1639 err:	__wt_scr_free(session, &buf);
1640 	__clsm_leave(clsm);
1641 	CURSOR_UPDATE_API_END(session, ret);
1642 	return (ret);
1643 }
1644 
1645 /*
1646  * __clsm_remove --
1647  *	WT_CURSOR->remove method for the LSM cursor type.
1648  */
1649 static int
__clsm_remove(WT_CURSOR *cursor)
1651 {
1652 	WT_CURSOR_LSM *clsm;
1653 	WT_DECL_RET;
1654 	WT_ITEM value;
1655 	WT_SESSION_IMPL *session;
1656 	bool positioned;
1657 
1658 	clsm = (WT_CURSOR_LSM *)cursor;
1659 
1660 	/* Check if the cursor is positioned. */
1661 	positioned = F_ISSET(cursor, WT_CURSTD_KEY_INT);
1662 
1663 	CURSOR_REMOVE_API_CALL(cursor, session, NULL);
1664 	WT_ERR(__cursor_needkey(cursor));
1665 	__cursor_novalue(cursor);
1666 	WT_ERR(__clsm_enter(clsm, false, true));
1667 
1668 	if (!F_ISSET(cursor, WT_CURSTD_OVERWRITE)) {
1669 		WT_ERR(__clsm_lookup(clsm, &value));
1670 		/*
1671 		 * Copy the key out, since the insert resets non-primary chunk
1672 		 * cursors which our lookup may have landed on.
1673 		 */
1674 		WT_ERR(__cursor_needkey(cursor));
1675 	}
1676 	WT_ERR(__clsm_put(
1677 	    session, clsm, &cursor->key, &__tombstone, positioned, false));
1678 
1679 	/*
	 * If the cursor was positioned, it stays positioned with a key but
	 * no value; otherwise, there's no position, key or value. This isn't
1682 	 * just cosmetic, without a reset, iteration on this cursor won't start
1683 	 * at the beginning/end of the table.
1684 	 */
1685 	F_CLR(cursor, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
1686 	if (positioned)
1687 		F_SET(cursor, WT_CURSTD_KEY_INT);
1688 	else
1689 		WT_TRET(cursor->reset(cursor));
1690 
1691 err:	__clsm_leave(clsm);
1692 	CURSOR_UPDATE_API_END(session, ret);
1693 	return (ret);
1694 }
1695 
1696 /*
1697  * __clsm_reserve --
1698  *     WT_CURSOR->reserve method for the LSM cursor type.
1699  */
1700 static int
__clsm_reserve(WT_CURSOR *cursor)
1702 {
1703 	WT_CURSOR_LSM *clsm;
1704 	WT_DECL_RET;
1705 	WT_ITEM value;
1706 	WT_SESSION_IMPL *session;
1707 
1708 	clsm = (WT_CURSOR_LSM *)cursor;
1709 
1710 	CURSOR_UPDATE_API_CALL(cursor, session, reserve);
1711 	WT_ERR(__cursor_needkey(cursor));
1712 	__cursor_novalue(cursor);
1713 	WT_ERR(__wt_txn_context_check(session, true));
1714 	WT_ERR(__clsm_enter(clsm, false, true));
1715 
1716 	WT_ERR(__clsm_lookup(clsm, &value));
1717 	/*
1718 	 * Copy the key out, since the insert resets non-primary chunk cursors
1719 	 * which our lookup may have landed on.
1720 	 */
1721 	WT_ERR(__cursor_needkey(cursor));
1722 	ret = __clsm_put(session, clsm, &cursor->key, NULL, true, true);
1723 
1724 err:	__clsm_leave(clsm);
1725 	CURSOR_UPDATE_API_END(session, ret);
1726 
1727 	/*
1728 	 * The application might do a WT_CURSOR.get_value call when we return,
1729 	 * so we need a value and the underlying functions didn't set one up.
1730 	 * For various reasons, those functions may not have done a search and
1731 	 * any previous value in the cursor might race with WT_CURSOR.reserve
1732 	 * (and in cases like LSM, the reserve never encountered the original
1733 	 * key). For simplicity, repeat the search here.
1734 	 */
1735 	return (ret == 0 ? cursor->search(cursor) : ret);
1736 }
1737 
1738 /*
1739  * __wt_clsm_close --
1740  *	WT_CURSOR->close method for the LSM cursor type.
1741  */
1742 int
__wt_clsm_close(WT_CURSOR *cursor)
1744 {
1745 	WT_CURSOR_LSM *clsm;
1746 	WT_DECL_RET;
1747 	WT_SESSION_IMPL *session;
1748 
1749 	/*
1750 	 * Don't use the normal __clsm_enter path: that is wasted work when
1751 	 * closing, and the cursor may never have been used.
1752 	 */
1753 	clsm = (WT_CURSOR_LSM *)cursor;
1754 	CURSOR_API_CALL_PREPARE_ALLOWED(cursor, session, close, NULL);
1755 err:
1756 
1757 	WT_TRET(__clsm_close_cursors(session, clsm, 0, clsm->nchunks));
1758 	__clsm_free_chunks(session, clsm);
1759 
1760 	/* In case we were somehow left positioned, clear that. */
1761 	__clsm_leave(clsm);
1762 
1763 	if (clsm->lsm_tree != NULL)
1764 		__wt_lsm_tree_release(session, clsm->lsm_tree);
1765 	__wt_cursor_close(cursor);
1766 
1767 	API_END_RET(session, ret);
1768 }
1769 
1770 /*
1771  * __wt_clsm_open --
1772  *	WT_SESSION->open_cursor method for LSM cursors.
1773  */
1774 int
__wt_clsm_open(WT_SESSION_IMPL *session,
    const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp)
1777 {
1778 	WT_CONFIG_ITEM cval;
1779 	WT_CURSOR_STATIC_INIT(iface,
1780 	    __wt_cursor_get_key,		/* get-key */
1781 	    __wt_cursor_get_value,		/* get-value */
1782 	    __wt_cursor_set_key,		/* set-key */
1783 	    __wt_cursor_set_value,		/* set-value */
1784 	    __clsm_compare,			/* compare */
1785 	    __wt_cursor_equals,			/* equals */
1786 	    __clsm_next,			/* next */
1787 	    __clsm_prev,			/* prev */
1788 	    __clsm_reset,			/* reset */
1789 	    __clsm_search,			/* search */
1790 	    __clsm_search_near,			/* search-near */
1791 	    __clsm_insert,			/* insert */
1792 	    __wt_cursor_modify_notsup,		/* modify */
1793 	    __clsm_update,			/* update */
1794 	    __clsm_remove,			/* remove */
1795 	    __clsm_reserve,			/* reserve */
1796 	    __wt_cursor_reconfigure,		/* reconfigure */
1797 	    __wt_cursor_notsup,			/* cache */
1798 	    __wt_cursor_reopen_notsup,		/* reopen */
1799 	    __wt_clsm_close);			/* close */
1800 	WT_CURSOR *cursor;
1801 	WT_CURSOR_LSM *clsm;
1802 	WT_DECL_RET;
1803 	WT_LSM_TREE *lsm_tree;
1804 	bool bulk;
1805 
1806 	WT_STATIC_ASSERT(offsetof(WT_CURSOR_LSM, iface) == 0);
1807 
1808 	clsm = NULL;
1809 	cursor = NULL;
1810 	lsm_tree = NULL;
1811 
1812 	if (!WT_PREFIX_MATCH(uri, "lsm:"))
1813 		return (__wt_unexpected_object_type(session, uri, "lsm:"));
1814 
1815 	WT_RET(__wt_inmem_unsupported_op(session, "LSM trees"));
1816 
1817 	WT_RET(__wt_config_gets_def(session, cfg, "checkpoint", 0, &cval));
1818 	if (cval.len != 0)
1819 		WT_RET_MSG(session, EINVAL,
1820 		    "LSM does not support opening by checkpoint");
1821 
1822 	WT_RET(__wt_config_gets_def(session, cfg, "bulk", 0, &cval));
1823 	bulk = cval.val != 0;
1824 
1825 	/* Get the LSM tree. */
1826 	ret = __wt_lsm_tree_get(session, uri, bulk, &lsm_tree);
1827 
1828 	/*
	 * Check whether the exclusive open for a bulk load succeeded, and
	 * if it did, ensure that it's safe to bulk load into the tree.
1831 	 */
	if (bulk && (ret == EBUSY || (ret == 0 && lsm_tree->nchunks > 1)))
1833 		WT_ERR_MSG(session, EINVAL,
1834 		    "bulk-load is only supported on newly created LSM trees");
1835 	/* Flag any errors from the tree get. */
1836 	WT_ERR(ret);
1837 
1838 	/* Make sure we have exclusive access if and only if we want it */
1839 	WT_ASSERT(session, !bulk || lsm_tree->excl_session != NULL);
1840 
1841 	WT_ERR(__wt_calloc_one(session, &clsm));
1842 	cursor = (WT_CURSOR *)clsm;
1843 	*cursor = iface;
1844 	cursor->session = (WT_SESSION *)session;
1845 	WT_ERR(__wt_strdup(session, lsm_tree->name, &cursor->uri));
1846 	cursor->key_format = lsm_tree->key_format;
1847 	cursor->value_format = lsm_tree->value_format;
1848 
1849 	clsm->lsm_tree = lsm_tree;
1850 	lsm_tree = NULL;
1851 
1852 	/*
1853 	 * The tree's dsk_gen starts at one, so starting the cursor on zero
1854 	 * will force a call into open_cursors on the first operation.
1855 	 */
1856 	clsm->dsk_gen = 0;
1857 
1858 	/* If the next_random option is set, configure a random cursor */
1859 	WT_ERR(__wt_config_gets_def(session, cfg, "next_random", 0, &cval));
1860 	if (cval.val != 0) {
1861 		__wt_cursor_set_notsup(cursor);
1862 		cursor->next = __clsm_next_random;
1863 	}
1864 
1865 	WT_ERR(__wt_cursor_init(cursor, cursor->uri, owner, cfg, cursorp));
1866 
1867 	if (bulk)
1868 		WT_ERR(__wt_clsm_open_bulk(clsm, cfg));
1869 
1870 	if (0) {
1871 err:
1872 		if (clsm != NULL)
1873 			WT_TRET(__wt_clsm_close(cursor));
1874 		else if (lsm_tree != NULL)
1875 			__wt_lsm_tree_release(session, lsm_tree);
1876 
1877 		*cursorp = NULL;
1878 	}
1879 
1880 	return (ret);
1881 }
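
/*
 * For reference, applications reach this code through the standard cursor
 * API; a minimal sketch (the table name and configuration strings are
 * illustrative, and error returns are ignored):
 *
 *	WT_CONNECTION *conn;
 *	WT_SESSION *session;
 *	WT_CURSOR *cursor;
 *
 *	wiredtiger_open("WT_HOME", NULL, "create", &conn);
 *	conn->open_session(conn, NULL, NULL, &session);
 *	session->create(session, "lsm:example",
 *	    "key_format=S,value_format=S,lsm=(chunk_size=2MB)");
 *	session->open_cursor(session, "lsm:example", NULL, NULL, &cursor);
 *	cursor->set_key(cursor, "key");
 *	cursor->set_value(cursor, "value");
 *	cursor->insert(cursor);
 *	cursor->close(cursor);
 */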
1882