1 /*-
2  * Copyright (c) 2014-2018 MongoDB, Inc.
3  * Copyright (c) 2008-2014 WiredTiger, Inc.
4  *	All rights reserved.
5  *
6  * See the file LICENSE for redistribution information.
7  */
8 
9 /*
10  * __col_insert_search_gt --
11  *	Search a column-store insert list for the next larger record.
12  */
13 static inline WT_INSERT *
__col_insert_search_gt(WT_INSERT_HEAD * ins_head,uint64_t recno)14 __col_insert_search_gt(WT_INSERT_HEAD *ins_head, uint64_t recno)
15 {
16 	WT_INSERT *ins, **insp;
17 	int i;
18 
19 	/* If there's no insert chain to search, we're done. */
20 	if ((ins = WT_SKIP_LAST(ins_head)) == NULL)
21 		return (NULL);
22 
23 	/* Fast path check for targets past the end of the skiplist. */
24 	if (recno >= WT_INSERT_RECNO(ins))
25 		return (NULL);
26 
27 	/*
28 	 * The insert list is a skip list: start at the highest skip level, then
29 	 * go as far as possible at each level before stepping down to the next.
30 	 */
31 	ins = NULL;
32 	for (i = WT_SKIP_MAXDEPTH - 1, insp = &ins_head->head[i]; i >= 0;)
33 		if (*insp != NULL && recno >= WT_INSERT_RECNO(*insp)) {
34 			ins = *insp;	/* GTE: keep going at this level */
35 			insp = &(*insp)->next[i];
36 		} else {
37 			--i;		/* LT: drop down a level */
38 			--insp;
39 		}
40 
41 	/*
42 	 * If we didn't find any records smaller than the target, we never set
43 	 * the return value, set it to the first record in the list. Otherwise,
44 	 * it references a record less-than-or-equal to the target, move to a
45 	 * later record, that is, a subsequent record greater than the target.
46 	 * Because inserts happen concurrently, additional records might be
47 	 * inserted after the searched-for record that are still smaller than
48 	 * the target, continue to move forward until reaching a record larger
49 	 * than the target. There isn't any safety testing because we confirmed
50 	 * such a record exists before searching.
51 	 */
52 	if (ins == NULL)
53 		ins = WT_SKIP_FIRST(ins_head);
54 	while (recno >= WT_INSERT_RECNO(ins))
55 		ins = WT_SKIP_NEXT(ins);
56 	return (ins);
57 }
58 
59 /*
60  * __col_insert_search_lt --
61  *	Search a column-store insert list for the next smaller record.
62  */
63 static inline WT_INSERT *
__col_insert_search_lt(WT_INSERT_HEAD * ins_head,uint64_t recno)64 __col_insert_search_lt(WT_INSERT_HEAD *ins_head, uint64_t recno)
65 {
66 	WT_INSERT *ins, **insp;
67 	int i;
68 
69 	/* If there's no insert chain to search, we're done. */
70 	if ((ins = WT_SKIP_FIRST(ins_head)) == NULL)
71 		return (NULL);
72 
73 	/* Fast path check for targets before the skiplist. */
74 	if (recno <= WT_INSERT_RECNO(ins))
75 		return (NULL);
76 
77 	/*
78 	 * The insert list is a skip list: start at the highest skip level, then
79 	 * go as far as possible at each level before stepping down to the next.
80 	 */
81 	for (i = WT_SKIP_MAXDEPTH - 1, insp = &ins_head->head[i]; i >= 0;)
82 		if (*insp != NULL && recno > WT_INSERT_RECNO(*insp)) {
83 			ins = *insp;	/* GT: keep going at this level */
84 			insp = &(*insp)->next[i];
85 		} else  {
86 			--i;		/* LTE: drop down a level */
87 			--insp;
88 		}
89 
90 	return (ins);
91 }
92 
93 /*
94  * __col_insert_search_match --
95  *	Search a column-store insert list for an exact match.
96  */
97 static inline WT_INSERT *
__col_insert_search_match(WT_INSERT_HEAD * ins_head,uint64_t recno)98 __col_insert_search_match(WT_INSERT_HEAD *ins_head, uint64_t recno)
99 {
100 	WT_INSERT **insp, *ret_ins;
101 	uint64_t ins_recno;
102 	int cmp, i;
103 
104 	/* If there's no insert chain to search, we're done. */
105 	if ((ret_ins = WT_SKIP_LAST(ins_head)) == NULL)
106 		return (NULL);
107 
108 	/* Fast path the check for values at the end of the skiplist. */
109 	if (recno > WT_INSERT_RECNO(ret_ins))
110 		return (NULL);
111 	if (recno == WT_INSERT_RECNO(ret_ins))
112 		return (ret_ins);
113 
114 	/*
115 	 * The insert list is a skip list: start at the highest skip level, then
116 	 * go as far as possible at each level before stepping down to the next.
117 	 */
118 	for (i = WT_SKIP_MAXDEPTH - 1, insp = &ins_head->head[i]; i >= 0; ) {
119 		if (*insp == NULL) {
120 			--i;
121 			--insp;
122 			continue;
123 		}
124 
125 		ins_recno = WT_INSERT_RECNO(*insp);
126 		cmp = (recno == ins_recno) ? 0 : (recno < ins_recno) ? -1 : 1;
127 
128 		if (cmp == 0)			/* Exact match: return */
129 			return (*insp);
130 		if (cmp > 0)			/* Keep going at this level */
131 			insp = &(*insp)->next[i];
132 		else {				/* Drop down a level */
133 			--i;
134 			--insp;
135 		}
136 	}
137 
138 	return (NULL);
139 }
140 
141 /*
142  * __col_insert_search --
143  *	Search a column-store insert list, creating a skiplist stack as we go.
144  */
145 static inline WT_INSERT *
__col_insert_search(WT_INSERT_HEAD * ins_head,WT_INSERT *** ins_stack,WT_INSERT ** next_stack,uint64_t recno)146 __col_insert_search(WT_INSERT_HEAD *ins_head,
147     WT_INSERT ***ins_stack, WT_INSERT **next_stack, uint64_t recno)
148 {
149 	WT_INSERT **insp, *ret_ins;
150 	uint64_t ins_recno;
151 	int cmp, i;
152 
153 	/* If there's no insert chain to search, we're done. */
154 	if ((ret_ins = WT_SKIP_LAST(ins_head)) == NULL)
155 		return (NULL);
156 
157 	/* Fast path appends. */
158 	if (recno >= WT_INSERT_RECNO(ret_ins)) {
159 		for (i = 0; i < WT_SKIP_MAXDEPTH; i++) {
160 			ins_stack[i] = (i == 0) ? &ret_ins->next[0] :
161 			    (ins_head->tail[i] != NULL) ?
162 			    &ins_head->tail[i]->next[i] : &ins_head->head[i];
163 			next_stack[i] = NULL;
164 		}
165 		return (ret_ins);
166 	}
167 
168 	/*
169 	 * The insert list is a skip list: start at the highest skip level, then
170 	 * go as far as possible at each level before stepping down to the next.
171 	 */
172 	for (i = WT_SKIP_MAXDEPTH - 1, insp = &ins_head->head[i]; i >= 0; ) {
173 		if ((ret_ins = *insp) == NULL) {
174 			next_stack[i] = NULL;
175 			ins_stack[i--] = insp--;
176 			continue;
177 		}
178 
179 		/*
180 		 * When no exact match is found, the search returns the smallest
181 		 * key larger than the searched-for key, or the largest key
182 		 * smaller than the searched-for key, if there is no larger key.
183 		 * Our callers depend on that: specifically, the fixed-length
184 		 * column store cursor code interprets returning a key smaller
185 		 * than the searched-for key to mean the searched-for key is
186 		 * larger than any key on the page. Don't change that behavior,
187 		 * things will break.
188 		 */
189 		ins_recno = WT_INSERT_RECNO(ret_ins);
190 		cmp = (recno == ins_recno) ? 0 : (recno < ins_recno) ? -1 : 1;
191 
192 		if (cmp > 0)			/* Keep going at this level */
193 			insp = &ret_ins->next[i];
194 		else if (cmp == 0)		/* Exact match: return */
195 			for (; i >= 0; i--) {
196 				next_stack[i] = ret_ins->next[i];
197 				ins_stack[i] = &ret_ins->next[i];
198 			}
199 		else {				/* Drop down a level */
200 			next_stack[i] = ret_ins;
201 			ins_stack[i--] = insp--;
202 		}
203 	}
204 	return (ret_ins);
205 }
206 
207 /*
208  * __col_var_last_recno --
209  *	Return the last record number for a variable-length column-store page.
210  */
211 static inline uint64_t
__col_var_last_recno(WT_REF * ref)212 __col_var_last_recno(WT_REF *ref)
213 {
214 	WT_COL_RLE *repeat;
215 	WT_PAGE *page;
216 
217 	page = ref->page;
218 
219 	/*
220 	 * If there's an append list, there may be more records on the page.
221 	 * This function ignores those records, our callers must handle that
222 	 * explicitly, if they care.
223 	 */
224 	if (!WT_COL_VAR_REPEAT_SET(page))
225 		return (page->entries == 0 ? 0 :
226 		    ref->ref_recno + (page->entries - 1));
227 
228 	repeat = &page->pg_var_repeats[page->pg_var_nrepeats - 1];
229 	return ((repeat->recno + repeat->rle) - 1 +
230 	    (page->entries - (repeat->indx + 1)));
231 }
232 
233 /*
234  * __col_fix_last_recno --
235  *	Return the last record number for a fixed-length column-store page.
236  */
237 static inline uint64_t
__col_fix_last_recno(WT_REF * ref)238 __col_fix_last_recno(WT_REF *ref)
239 {
240 	WT_PAGE *page;
241 
242 	page = ref->page;
243 
244 	/*
245 	 * If there's an append list, there may be more records on the page.
246 	 * This function ignores those records, our callers must handle that
247 	 * explicitly, if they care.
248 	 */
249 	return (page->entries == 0 ? 0 : ref->ref_recno + (page->entries - 1));
250 }
251 
252 /*
253  * __col_var_search --
254  *	Search a variable-length column-store page for a record.
255  */
256 static inline WT_COL *
__col_var_search(WT_REF * ref,uint64_t recno,uint64_t * start_recnop)257 __col_var_search(WT_REF *ref, uint64_t recno, uint64_t *start_recnop)
258 {
259 	WT_COL_RLE *repeat;
260 	WT_PAGE *page;
261 	uint64_t start_recno;
262 	uint32_t base, indx, limit, start_indx;
263 
264 	page = ref->page;
265 
266 	/*
267 	 * Find the matching slot.
268 	 *
269 	 * This is done in two stages: first, we do a binary search among any
270 	 * repeating records to find largest repeating less than the search key.
271 	 * Once there, we can do a simple offset calculation to find the correct
272 	 * slot for this record number, because we know any intervening records
273 	 * have repeat counts of 1.
274 	 */
275 	for (base = 0,
276 	    limit = WT_COL_VAR_REPEAT_SET(page) ? page->pg_var_nrepeats : 0;
277 	    limit != 0; limit >>= 1) {
278 		indx = base + (limit >> 1);
279 
280 		repeat = page->pg_var_repeats + indx;
281 		if (recno >= repeat->recno &&
282 		    recno < repeat->recno + repeat->rle) {
283 			if (start_recnop != NULL)
284 				*start_recnop = repeat->recno;
285 			return (page->pg_var + repeat->indx);
286 		}
287 		if (recno < repeat->recno)
288 			continue;
289 		base = indx + 1;
290 		--limit;
291 	}
292 
293 	/*
294 	 * We didn't find an exact match, move forward from the largest repeat
295 	 * less than the search key.
296 	 */
297 	if (base == 0) {
298 		start_indx = 0;
299 		start_recno = ref->ref_recno;
300 	} else {
301 		repeat = page->pg_var_repeats + (base - 1);
302 		start_indx = repeat->indx + 1;
303 		start_recno = repeat->recno + repeat->rle;
304 	}
305 
306 	/*
307 	 * !!!
308 	 * The test could be written more simply as:
309 	 *
310 	 * 	(recno >= start_recno + (page->entries - start_indx))
311 	 *
312 	 * It's split into two parts because the simpler test will overflow if
313 	 * searching for large record numbers.
314 	 */
315 	if (recno >= start_recno &&
316 	    recno - start_recno >= page->entries - start_indx)
317 		return (NULL);
318 
319 	return (page->pg_var + start_indx + (uint32_t)(recno - start_recno));
320 }
321