1 /*-
2 * Copyright (c) 2014-2018 MongoDB, Inc.
3 * Copyright (c) 2008-2014 WiredTiger, Inc.
4 * All rights reserved.
5 *
6 * See the file LICENSE for redistribution information.
7 */
8
9 /*
10 * __col_insert_search_gt --
11 * Search a column-store insert list for the next larger record.
12 */
13 static inline WT_INSERT *
__col_insert_search_gt(WT_INSERT_HEAD * ins_head,uint64_t recno)14 __col_insert_search_gt(WT_INSERT_HEAD *ins_head, uint64_t recno)
15 {
16 WT_INSERT *ins, **insp;
17 int i;
18
19 /* If there's no insert chain to search, we're done. */
20 if ((ins = WT_SKIP_LAST(ins_head)) == NULL)
21 return (NULL);
22
23 /* Fast path check for targets past the end of the skiplist. */
24 if (recno >= WT_INSERT_RECNO(ins))
25 return (NULL);
26
27 /*
28 * The insert list is a skip list: start at the highest skip level, then
29 * go as far as possible at each level before stepping down to the next.
30 */
31 ins = NULL;
32 for (i = WT_SKIP_MAXDEPTH - 1, insp = &ins_head->head[i]; i >= 0;)
33 if (*insp != NULL && recno >= WT_INSERT_RECNO(*insp)) {
34 ins = *insp; /* GTE: keep going at this level */
35 insp = &(*insp)->next[i];
36 } else {
37 --i; /* LT: drop down a level */
38 --insp;
39 }
40
41 /*
42 * If we didn't find any records smaller than the target, we never set
43 * the return value, set it to the first record in the list. Otherwise,
44 * it references a record less-than-or-equal to the target, move to a
45 * later record, that is, a subsequent record greater than the target.
46 * Because inserts happen concurrently, additional records might be
47 * inserted after the searched-for record that are still smaller than
48 * the target, continue to move forward until reaching a record larger
49 * than the target. There isn't any safety testing because we confirmed
50 * such a record exists before searching.
51 */
52 if (ins == NULL)
53 ins = WT_SKIP_FIRST(ins_head);
54 while (recno >= WT_INSERT_RECNO(ins))
55 ins = WT_SKIP_NEXT(ins);
56 return (ins);
57 }
58
59 /*
60 * __col_insert_search_lt --
61 * Search a column-store insert list for the next smaller record.
62 */
63 static inline WT_INSERT *
__col_insert_search_lt(WT_INSERT_HEAD * ins_head,uint64_t recno)64 __col_insert_search_lt(WT_INSERT_HEAD *ins_head, uint64_t recno)
65 {
66 WT_INSERT *ins, **insp;
67 int i;
68
69 /* If there's no insert chain to search, we're done. */
70 if ((ins = WT_SKIP_FIRST(ins_head)) == NULL)
71 return (NULL);
72
73 /* Fast path check for targets before the skiplist. */
74 if (recno <= WT_INSERT_RECNO(ins))
75 return (NULL);
76
77 /*
78 * The insert list is a skip list: start at the highest skip level, then
79 * go as far as possible at each level before stepping down to the next.
80 */
81 for (i = WT_SKIP_MAXDEPTH - 1, insp = &ins_head->head[i]; i >= 0;)
82 if (*insp != NULL && recno > WT_INSERT_RECNO(*insp)) {
83 ins = *insp; /* GT: keep going at this level */
84 insp = &(*insp)->next[i];
85 } else {
86 --i; /* LTE: drop down a level */
87 --insp;
88 }
89
90 return (ins);
91 }
92
93 /*
94 * __col_insert_search_match --
95 * Search a column-store insert list for an exact match.
96 */
97 static inline WT_INSERT *
__col_insert_search_match(WT_INSERT_HEAD * ins_head,uint64_t recno)98 __col_insert_search_match(WT_INSERT_HEAD *ins_head, uint64_t recno)
99 {
100 WT_INSERT **insp, *ret_ins;
101 uint64_t ins_recno;
102 int cmp, i;
103
104 /* If there's no insert chain to search, we're done. */
105 if ((ret_ins = WT_SKIP_LAST(ins_head)) == NULL)
106 return (NULL);
107
108 /* Fast path the check for values at the end of the skiplist. */
109 if (recno > WT_INSERT_RECNO(ret_ins))
110 return (NULL);
111 if (recno == WT_INSERT_RECNO(ret_ins))
112 return (ret_ins);
113
114 /*
115 * The insert list is a skip list: start at the highest skip level, then
116 * go as far as possible at each level before stepping down to the next.
117 */
118 for (i = WT_SKIP_MAXDEPTH - 1, insp = &ins_head->head[i]; i >= 0; ) {
119 if (*insp == NULL) {
120 --i;
121 --insp;
122 continue;
123 }
124
125 ins_recno = WT_INSERT_RECNO(*insp);
126 cmp = (recno == ins_recno) ? 0 : (recno < ins_recno) ? -1 : 1;
127
128 if (cmp == 0) /* Exact match: return */
129 return (*insp);
130 if (cmp > 0) /* Keep going at this level */
131 insp = &(*insp)->next[i];
132 else { /* Drop down a level */
133 --i;
134 --insp;
135 }
136 }
137
138 return (NULL);
139 }
140
141 /*
142 * __col_insert_search --
143 * Search a column-store insert list, creating a skiplist stack as we go.
144 */
145 static inline WT_INSERT *
__col_insert_search(WT_INSERT_HEAD * ins_head,WT_INSERT *** ins_stack,WT_INSERT ** next_stack,uint64_t recno)146 __col_insert_search(WT_INSERT_HEAD *ins_head,
147 WT_INSERT ***ins_stack, WT_INSERT **next_stack, uint64_t recno)
148 {
149 WT_INSERT **insp, *ret_ins;
150 uint64_t ins_recno;
151 int cmp, i;
152
153 /* If there's no insert chain to search, we're done. */
154 if ((ret_ins = WT_SKIP_LAST(ins_head)) == NULL)
155 return (NULL);
156
157 /* Fast path appends. */
158 if (recno >= WT_INSERT_RECNO(ret_ins)) {
159 for (i = 0; i < WT_SKIP_MAXDEPTH; i++) {
160 ins_stack[i] = (i == 0) ? &ret_ins->next[0] :
161 (ins_head->tail[i] != NULL) ?
162 &ins_head->tail[i]->next[i] : &ins_head->head[i];
163 next_stack[i] = NULL;
164 }
165 return (ret_ins);
166 }
167
168 /*
169 * The insert list is a skip list: start at the highest skip level, then
170 * go as far as possible at each level before stepping down to the next.
171 */
172 for (i = WT_SKIP_MAXDEPTH - 1, insp = &ins_head->head[i]; i >= 0; ) {
173 if ((ret_ins = *insp) == NULL) {
174 next_stack[i] = NULL;
175 ins_stack[i--] = insp--;
176 continue;
177 }
178
179 /*
180 * When no exact match is found, the search returns the smallest
181 * key larger than the searched-for key, or the largest key
182 * smaller than the searched-for key, if there is no larger key.
183 * Our callers depend on that: specifically, the fixed-length
184 * column store cursor code interprets returning a key smaller
185 * than the searched-for key to mean the searched-for key is
186 * larger than any key on the page. Don't change that behavior,
187 * things will break.
188 */
189 ins_recno = WT_INSERT_RECNO(ret_ins);
190 cmp = (recno == ins_recno) ? 0 : (recno < ins_recno) ? -1 : 1;
191
192 if (cmp > 0) /* Keep going at this level */
193 insp = &ret_ins->next[i];
194 else if (cmp == 0) /* Exact match: return */
195 for (; i >= 0; i--) {
196 next_stack[i] = ret_ins->next[i];
197 ins_stack[i] = &ret_ins->next[i];
198 }
199 else { /* Drop down a level */
200 next_stack[i] = ret_ins;
201 ins_stack[i--] = insp--;
202 }
203 }
204 return (ret_ins);
205 }
206
207 /*
208 * __col_var_last_recno --
209 * Return the last record number for a variable-length column-store page.
210 */
211 static inline uint64_t
__col_var_last_recno(WT_REF * ref)212 __col_var_last_recno(WT_REF *ref)
213 {
214 WT_COL_RLE *repeat;
215 WT_PAGE *page;
216
217 page = ref->page;
218
219 /*
220 * If there's an append list, there may be more records on the page.
221 * This function ignores those records, our callers must handle that
222 * explicitly, if they care.
223 */
224 if (!WT_COL_VAR_REPEAT_SET(page))
225 return (page->entries == 0 ? 0 :
226 ref->ref_recno + (page->entries - 1));
227
228 repeat = &page->pg_var_repeats[page->pg_var_nrepeats - 1];
229 return ((repeat->recno + repeat->rle) - 1 +
230 (page->entries - (repeat->indx + 1)));
231 }
232
233 /*
234 * __col_fix_last_recno --
235 * Return the last record number for a fixed-length column-store page.
236 */
237 static inline uint64_t
__col_fix_last_recno(WT_REF * ref)238 __col_fix_last_recno(WT_REF *ref)
239 {
240 WT_PAGE *page;
241
242 page = ref->page;
243
244 /*
245 * If there's an append list, there may be more records on the page.
246 * This function ignores those records, our callers must handle that
247 * explicitly, if they care.
248 */
249 return (page->entries == 0 ? 0 : ref->ref_recno + (page->entries - 1));
250 }
251
252 /*
253 * __col_var_search --
254 * Search a variable-length column-store page for a record.
255 */
256 static inline WT_COL *
__col_var_search(WT_REF * ref,uint64_t recno,uint64_t * start_recnop)257 __col_var_search(WT_REF *ref, uint64_t recno, uint64_t *start_recnop)
258 {
259 WT_COL_RLE *repeat;
260 WT_PAGE *page;
261 uint64_t start_recno;
262 uint32_t base, indx, limit, start_indx;
263
264 page = ref->page;
265
266 /*
267 * Find the matching slot.
268 *
269 * This is done in two stages: first, we do a binary search among any
270 * repeating records to find largest repeating less than the search key.
271 * Once there, we can do a simple offset calculation to find the correct
272 * slot for this record number, because we know any intervening records
273 * have repeat counts of 1.
274 */
275 for (base = 0,
276 limit = WT_COL_VAR_REPEAT_SET(page) ? page->pg_var_nrepeats : 0;
277 limit != 0; limit >>= 1) {
278 indx = base + (limit >> 1);
279
280 repeat = page->pg_var_repeats + indx;
281 if (recno >= repeat->recno &&
282 recno < repeat->recno + repeat->rle) {
283 if (start_recnop != NULL)
284 *start_recnop = repeat->recno;
285 return (page->pg_var + repeat->indx);
286 }
287 if (recno < repeat->recno)
288 continue;
289 base = indx + 1;
290 --limit;
291 }
292
293 /*
294 * We didn't find an exact match, move forward from the largest repeat
295 * less than the search key.
296 */
297 if (base == 0) {
298 start_indx = 0;
299 start_recno = ref->ref_recno;
300 } else {
301 repeat = page->pg_var_repeats + (base - 1);
302 start_indx = repeat->indx + 1;
303 start_recno = repeat->recno + repeat->rle;
304 }
305
306 /*
307 * !!!
308 * The test could be written more simply as:
309 *
310 * (recno >= start_recno + (page->entries - start_indx))
311 *
312 * It's split into two parts because the simpler test will overflow if
313 * searching for large record numbers.
314 */
315 if (recno >= start_recno &&
316 recno - start_recno >= page->entries - start_indx)
317 return (NULL);
318
319 return (page->pg_var + start_indx + (uint32_t)(recno - start_recno));
320 }
321