1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 
36 ----------------------------------------
37 
38    Licensed under the Apache License, Version 2.0 (the "License");
39    you may not use this file except in compliance with the License.
40    You may obtain a copy of the License at
41 
42        http://www.apache.org/licenses/LICENSE-2.0
43 
44    Unless required by applicable law or agreed to in writing, software
45    distributed under the License is distributed on an "AS IS" BASIS,
46    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
47    See the License for the specific language governing permissions and
48    limitations under the License.
49 ======= */
50 
51 #ident \
52     "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
53 
54 #include <string.h>
55 
56 #include "../db.h"
57 #include "../portability/memory.h"
58 
59 namespace toku {
60 
61 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create(void)62 void omt<omtdata_t, omtdataout_t, supports_marks>::create(void) {
63   this->create_internal(2);
64   if (supports_marks) {
65     this->convert_to_tree();
66   }
67 }
68 
69 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create_no_array(void)70 void omt<omtdata_t, omtdataout_t, supports_marks>::create_no_array(void) {
71   if (!supports_marks) {
72     this->create_internal_no_array(0);
73   } else {
74     this->is_array = false;
75     this->capacity = 0;
76     this->d.t.nodes = nullptr;
77     this->d.t.root.set_to_null();
78     this->d.t.free_idx = 0;
79   }
80 }
81 
82 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create_from_sorted_array(const omtdata_t * const values,const uint32_t numvalues)83 void omt<omtdata_t, omtdataout_t, supports_marks>::create_from_sorted_array(
84     const omtdata_t *const values, const uint32_t numvalues) {
85   this->create_internal(numvalues);
86   memcpy(this->d.a.values, values, numvalues * (sizeof values[0]));
87   this->d.a.num_values = numvalues;
88   if (supports_marks) {
89     this->convert_to_tree();
90   }
91 }
92 
93 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create_steal_sorted_array(omtdata_t ** const values,const uint32_t numvalues,const uint32_t new_capacity)94 void omt<omtdata_t, omtdataout_t, supports_marks>::create_steal_sorted_array(
95     omtdata_t **const values, const uint32_t numvalues,
96     const uint32_t new_capacity) {
97   paranoid_invariant_notnull(values);
98   this->create_internal_no_array(new_capacity);
99   this->d.a.num_values = numvalues;
100   this->d.a.values = *values;
101   *values = nullptr;
102   if (supports_marks) {
103     this->convert_to_tree();
104   }
105 }
106 
107 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
split_at(omt * const newomt,const uint32_t idx)108 int omt<omtdata_t, omtdataout_t, supports_marks>::split_at(omt *const newomt,
109                                                            const uint32_t idx) {
110   barf_if_marked(*this);
111   paranoid_invariant_notnull(newomt);
112   if (idx > this->size()) {
113     return EINVAL;
114   }
115   this->convert_to_array();
116   const uint32_t newsize = this->size() - idx;
117   newomt->create_from_sorted_array(&this->d.a.values[this->d.a.start_idx + idx],
118                                    newsize);
119   this->d.a.num_values = idx;
120   this->maybe_resize_array(idx);
121   if (supports_marks) {
122     this->convert_to_tree();
123   }
124   return 0;
125 }
126 
127 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
merge(omt * const leftomt,omt * const rightomt)128 void omt<omtdata_t, omtdataout_t, supports_marks>::merge(omt *const leftomt,
129                                                          omt *const rightomt) {
130   barf_if_marked(*this);
131   paranoid_invariant_notnull(leftomt);
132   paranoid_invariant_notnull(rightomt);
133   const uint32_t leftsize = leftomt->size();
134   const uint32_t rightsize = rightomt->size();
135   const uint32_t newsize = leftsize + rightsize;
136 
137   if (leftomt->is_array) {
138     if (leftomt->capacity -
139             (leftomt->d.a.start_idx + leftomt->d.a.num_values) >=
140         rightsize) {
141       this->create_steal_sorted_array(
142           &leftomt->d.a.values, leftomt->d.a.num_values, leftomt->capacity);
143       this->d.a.start_idx = leftomt->d.a.start_idx;
144     } else {
145       this->create_internal(newsize);
146       memcpy(&this->d.a.values[0], &leftomt->d.a.values[leftomt->d.a.start_idx],
147              leftomt->d.a.num_values * (sizeof this->d.a.values[0]));
148     }
149   } else {
150     this->create_internal(newsize);
151     leftomt->fill_array_with_subtree_values(&this->d.a.values[0],
152                                             leftomt->d.t.root);
153   }
154   leftomt->destroy();
155   this->d.a.num_values = leftsize;
156 
157   if (rightomt->is_array) {
158     memcpy(&this->d.a.values[this->d.a.start_idx + this->d.a.num_values],
159            &rightomt->d.a.values[rightomt->d.a.start_idx],
160            rightomt->d.a.num_values * (sizeof this->d.a.values[0]));
161   } else {
162     rightomt->fill_array_with_subtree_values(
163         &this->d.a.values[this->d.a.start_idx + this->d.a.num_values],
164         rightomt->d.t.root);
165   }
166   rightomt->destroy();
167   this->d.a.num_values += rightsize;
168   paranoid_invariant(this->size() == newsize);
169   if (supports_marks) {
170     this->convert_to_tree();
171   }
172 }
173 
174 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
clone(const omt & src)175 void omt<omtdata_t, omtdataout_t, supports_marks>::clone(const omt &src) {
176   barf_if_marked(*this);
177   this->create_internal(src.size());
178   if (src.is_array) {
179     memcpy(&this->d.a.values[0], &src.d.a.values[src.d.a.start_idx],
180            src.d.a.num_values * (sizeof this->d.a.values[0]));
181   } else {
182     src.fill_array_with_subtree_values(&this->d.a.values[0], src.d.t.root);
183   }
184   this->d.a.num_values = src.size();
185   if (supports_marks) {
186     this->convert_to_tree();
187   }
188 }
189 
190 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
clear(void)191 void omt<omtdata_t, omtdataout_t, supports_marks>::clear(void) {
192   if (this->is_array) {
193     this->d.a.start_idx = 0;
194     this->d.a.num_values = 0;
195   } else {
196     this->d.t.root.set_to_null();
197     this->d.t.free_idx = 0;
198   }
199 }
200 
201 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
destroy(void)202 void omt<omtdata_t, omtdataout_t, supports_marks>::destroy(void) {
203   this->clear();
204   this->capacity = 0;
205   if (this->is_array) {
206     if (this->d.a.values != nullptr) {
207       toku_free(this->d.a.values);
208     }
209     this->d.a.values = nullptr;
210   } else {
211     if (this->d.t.nodes != nullptr) {
212       toku_free(this->d.t.nodes);
213     }
214     this->d.t.nodes = nullptr;
215   }
216 }
217 
218 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
size(void)219 uint32_t omt<omtdata_t, omtdataout_t, supports_marks>::size(void) const {
220   if (this->is_array) {
221     return this->d.a.num_values;
222   } else {
223     return this->nweight(this->d.t.root);
224   }
225 }
226 
227 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
228 template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
insert(const omtdata_t & value,const omtcmp_t & v,uint32_t * const idx)229 int omt<omtdata_t, omtdataout_t, supports_marks>::insert(const omtdata_t &value,
230                                                          const omtcmp_t &v,
231                                                          uint32_t *const idx) {
232   int r;
233   uint32_t insert_idx;
234 
235   r = this->find_zero<omtcmp_t, h>(v, nullptr, &insert_idx);
236   if (r == 0) {
237     if (idx) *idx = insert_idx;
238     return DB_KEYEXIST;
239   }
240   if (r != DB_NOTFOUND) return r;
241 
242   if ((r = this->insert_at(value, insert_idx))) return r;
243   if (idx) *idx = insert_idx;
244 
245   return 0;
246 }
247 
248 // The following 3 functions implement a static if for us.
249 template <typename omtdata_t, typename omtdataout_t>
barf_if_marked(const omt<omtdata_t,omtdataout_t,false> & UU (omt))250 static void barf_if_marked(const omt<omtdata_t, omtdataout_t, false> &UU(omt)) {
251 }
252 
253 template <typename omtdata_t, typename omtdataout_t>
barf_if_marked(const omt<omtdata_t,omtdataout_t,true> & omt)254 static void barf_if_marked(const omt<omtdata_t, omtdataout_t, true> &omt) {
255   invariant(!omt.has_marks());
256 }
257 
258 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
has_marks(void)259 bool omt<omtdata_t, omtdataout_t, supports_marks>::has_marks(void) const {
260   static_assert(supports_marks, "Does not support marks");
261   if (this->d.t.root.is_null()) {
262     return false;
263   }
264   const omt_node &node = this->d.t.nodes[this->d.t.root.get_index()];
265   return node.get_marks_below() || node.get_marked();
266 }
267 
268 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
insert_at(const omtdata_t & value,const uint32_t idx)269 int omt<omtdata_t, omtdataout_t, supports_marks>::insert_at(
270     const omtdata_t &value, const uint32_t idx) {
271   barf_if_marked(*this);
272   if (idx > this->size()) {
273     return EINVAL;
274   }
275 
276   this->maybe_resize_or_convert(this->size() + 1);
277   if (this->is_array && idx != this->d.a.num_values &&
278       (idx != 0 || this->d.a.start_idx == 0)) {
279     this->convert_to_tree();
280   }
281   if (this->is_array) {
282     if (idx == this->d.a.num_values) {
283       this->d.a.values[this->d.a.start_idx + this->d.a.num_values] = value;
284     } else {
285       this->d.a.values[--this->d.a.start_idx] = value;
286     }
287     this->d.a.num_values++;
288   } else {
289     subtree *rebalance_subtree = nullptr;
290     this->insert_internal(&this->d.t.root, value, idx, &rebalance_subtree);
291     if (rebalance_subtree != nullptr) {
292       this->rebalance(rebalance_subtree);
293     }
294   }
295   return 0;
296 }
297 
298 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
set_at(const omtdata_t & value,const uint32_t idx)299 int omt<omtdata_t, omtdataout_t, supports_marks>::set_at(const omtdata_t &value,
300                                                          const uint32_t idx) {
301   barf_if_marked(*this);
302   if (idx >= this->size()) {
303     return EINVAL;
304   }
305 
306   if (this->is_array) {
307     this->set_at_internal_array(value, idx);
308   } else {
309     this->set_at_internal(this->d.t.root, value, idx);
310   }
311   return 0;
312 }
313 
314 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
delete_at(const uint32_t idx)315 int omt<omtdata_t, omtdataout_t, supports_marks>::delete_at(
316     const uint32_t idx) {
317   barf_if_marked(*this);
318   if (idx >= this->size()) {
319     return EINVAL;
320   }
321 
322   this->maybe_resize_or_convert(this->size() - 1);
323   if (this->is_array && idx != 0 && idx != this->d.a.num_values - 1) {
324     this->convert_to_tree();
325   }
326   if (this->is_array) {
327     // Testing for 0 does not rule out it being the last entry.
328     // Test explicitly for num_values-1
329     if (idx != this->d.a.num_values - 1) {
330       this->d.a.start_idx++;
331     }
332     this->d.a.num_values--;
333   } else {
334     subtree *rebalance_subtree = nullptr;
335     this->delete_internal(&this->d.t.root, idx, nullptr, &rebalance_subtree);
336     if (rebalance_subtree != nullptr) {
337       this->rebalance(rebalance_subtree);
338     }
339   }
340   return 0;
341 }
342 
343 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
344 template <typename iterate_extra_t,
345           int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate(iterate_extra_t * const iterate_extra)346 int omt<omtdata_t, omtdataout_t, supports_marks>::iterate(
347     iterate_extra_t *const iterate_extra) const {
348   return this->iterate_on_range<iterate_extra_t, f>(0, this->size(),
349                                                     iterate_extra);
350 }
351 
352 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
353 template <typename iterate_extra_t,
354           int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_on_range(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra)355 int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_on_range(
356     const uint32_t left, const uint32_t right,
357     iterate_extra_t *const iterate_extra) const {
358   if (right > this->size()) {
359     return EINVAL;
360   }
361   if (left == right) {
362     return 0;
363   }
364   if (this->is_array) {
365     return this->iterate_internal_array<iterate_extra_t, f>(left, right,
366                                                             iterate_extra);
367   }
368   return this->iterate_internal<iterate_extra_t, f>(left, right, this->d.t.root,
369                                                     0, iterate_extra);
370 }
371 
372 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
373 template <typename iterate_extra_t,
374           int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_and_mark_range(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra)375 int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_and_mark_range(
376     const uint32_t left, const uint32_t right,
377     iterate_extra_t *const iterate_extra) {
378   static_assert(supports_marks, "does not support marks");
379   if (right > this->size()) {
380     return EINVAL;
381   }
382   if (left == right) {
383     return 0;
384   }
385   paranoid_invariant(!this->is_array);
386   return this->iterate_and_mark_range_internal<iterate_extra_t, f>(
387       left, right, this->d.t.root, 0, iterate_extra);
388 }
389 
390 // TODO: We can optimize this if we steal 3 bits.  1 bit: this node is
391 // marked.  1 bit: left subtree has marks. 1 bit: right subtree has marks.
392 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
393 template <typename iterate_extra_t,
394           int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_over_marked(iterate_extra_t * const iterate_extra)395 int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_over_marked(
396     iterate_extra_t *const iterate_extra) const {
397   static_assert(supports_marks, "does not support marks");
398   paranoid_invariant(!this->is_array);
399   return this->iterate_over_marked_internal<iterate_extra_t, f>(
400       this->d.t.root, 0, iterate_extra);
401 }
402 
403 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
unmark(const subtree & st,const uint32_t index,GrowableArray<node_idx> * const indexes)404 void omt<omtdata_t, omtdataout_t, supports_marks>::unmark(
405     const subtree &st, const uint32_t index,
406     GrowableArray<node_idx> *const indexes) {
407   if (st.is_null()) {
408     return;
409   }
410   omt_node &n = this->d.t.nodes[st.get_index()];
411   const uint32_t index_root = index + this->nweight(n.left);
412 
413   const bool below = n.get_marks_below();
414   if (below) {
415     this->unmark(n.left, index, indexes);
416   }
417   if (n.get_marked()) {
418     indexes->push(index_root);
419   }
420   n.clear_stolen_bits();
421   if (below) {
422     this->unmark(n.right, index_root + 1, indexes);
423   }
424 }
425 
426 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
delete_all_marked(void)427 void omt<omtdata_t, omtdataout_t, supports_marks>::delete_all_marked(void) {
428   static_assert(supports_marks, "does not support marks");
429   if (!this->has_marks()) {
430     return;
431   }
432   paranoid_invariant(!this->is_array);
433   GrowableArray<node_idx> marked_indexes;
434   marked_indexes.init();
435 
436   // Remove all marks.
437   // We need to delete all the stolen bits before calling delete_at to
438   // prevent barfing.
439   this->unmark(this->d.t.root, 0, &marked_indexes);
440 
441   for (uint32_t i = 0; i < marked_indexes.get_size(); i++) {
442     // Delete from left to right, shift by number already deleted.
443     // Alternative is delete from right to left.
444     int r = this->delete_at(marked_indexes.fetch_unchecked(i) - i);
445     lazy_assert_zero(r);
446   }
447   marked_indexes.deinit();
448   barf_if_marked(*this);
449 }
450 
451 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
452 uint32_t
verify_marks_consistent_internal(const subtree & st,const bool UU (allow_marks))453 omt<omtdata_t, omtdataout_t, supports_marks>::verify_marks_consistent_internal(
454     const subtree &st, const bool UU(allow_marks)) const {
455   if (st.is_null()) {
456     return 0;
457   }
458   const omt_node &node = this->d.t.nodes[st.get_index()];
459   uint32_t num_marks =
460       verify_marks_consistent_internal(node.left, node.get_marks_below());
461   num_marks +=
462       verify_marks_consistent_internal(node.right, node.get_marks_below());
463   if (node.get_marks_below()) {
464     paranoid_invariant(allow_marks);
465     paranoid_invariant(num_marks > 0);
466   } else {
467     // redundant with invariant below, but nice to have explicitly
468     paranoid_invariant(num_marks == 0);
469   }
470   if (node.get_marked()) {
471     paranoid_invariant(allow_marks);
472     ++num_marks;
473   }
474   return num_marks;
475 }
476 
477 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
verify_marks_consistent(void)478 void omt<omtdata_t, omtdataout_t, supports_marks>::verify_marks_consistent(
479     void) const {
480   static_assert(supports_marks, "does not support marks");
481   paranoid_invariant(!this->is_array);
482   this->verify_marks_consistent_internal(this->d.t.root, true);
483 }
484 
485 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
486 template <typename iterate_extra_t,
487           int (*f)(omtdata_t *, const uint32_t, iterate_extra_t *const)>
iterate_ptr(iterate_extra_t * const iterate_extra)488 void omt<omtdata_t, omtdataout_t, supports_marks>::iterate_ptr(
489     iterate_extra_t *const iterate_extra) {
490   if (this->is_array) {
491     this->iterate_ptr_internal_array<iterate_extra_t, f>(0, this->size(),
492                                                          iterate_extra);
493   } else {
494     this->iterate_ptr_internal<iterate_extra_t, f>(
495         0, this->size(), this->d.t.root, 0, iterate_extra);
496   }
497 }
498 
499 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
fetch(const uint32_t idx,omtdataout_t * const value)500 int omt<omtdata_t, omtdataout_t, supports_marks>::fetch(
501     const uint32_t idx, omtdataout_t *const value) const {
502   if (idx >= this->size()) {
503     return EINVAL;
504   }
505   if (this->is_array) {
506     this->fetch_internal_array(idx, value);
507   } else {
508     this->fetch_internal(this->d.t.root, idx, value);
509   }
510   return 0;
511 }
512 
513 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
514 template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_zero(const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp)515 int omt<omtdata_t, omtdataout_t, supports_marks>::find_zero(
516     const omtcmp_t &extra, omtdataout_t *const value,
517     uint32_t *const idxp) const {
518   uint32_t tmp_index;
519   uint32_t *const child_idxp = (idxp != nullptr) ? idxp : &tmp_index;
520   int r;
521   if (this->is_array) {
522     r = this->find_internal_zero_array<omtcmp_t, h>(extra, value, child_idxp);
523   } else {
524     r = this->find_internal_zero<omtcmp_t, h>(this->d.t.root, extra, value,
525                                               child_idxp);
526   }
527   return r;
528 }
529 
530 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
531 template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find(const omtcmp_t & extra,int direction,omtdataout_t * const value,uint32_t * const idxp)532 int omt<omtdata_t, omtdataout_t, supports_marks>::find(
533     const omtcmp_t &extra, int direction, omtdataout_t *const value,
534     uint32_t *const idxp) const {
535   uint32_t tmp_index;
536   uint32_t *const child_idxp = (idxp != nullptr) ? idxp : &tmp_index;
537   paranoid_invariant(direction != 0);
538   if (direction < 0) {
539     if (this->is_array) {
540       return this->find_internal_minus_array<omtcmp_t, h>(extra, value,
541                                                           child_idxp);
542     } else {
543       return this->find_internal_minus<omtcmp_t, h>(this->d.t.root, extra,
544                                                     value, child_idxp);
545     }
546   } else {
547     if (this->is_array) {
548       return this->find_internal_plus_array<omtcmp_t, h>(extra, value,
549                                                          child_idxp);
550     } else {
551       return this->find_internal_plus<omtcmp_t, h>(this->d.t.root, extra, value,
552                                                    child_idxp);
553     }
554   }
555 }
556 
557 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
memory_size(void)558 size_t omt<omtdata_t, omtdataout_t, supports_marks>::memory_size(void) {
559   if (this->is_array) {
560     return (sizeof *this) + this->capacity * (sizeof this->d.a.values[0]);
561   }
562   return (sizeof *this) + this->capacity * (sizeof this->d.t.nodes[0]);
563 }
564 
565 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create_internal_no_array(const uint32_t new_capacity)566 void omt<omtdata_t, omtdataout_t, supports_marks>::create_internal_no_array(
567     const uint32_t new_capacity) {
568   this->is_array = true;
569   this->d.a.start_idx = 0;
570   this->d.a.num_values = 0;
571   this->d.a.values = nullptr;
572   this->capacity = new_capacity;
573 }
574 
575 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create_internal(const uint32_t new_capacity)576 void omt<omtdata_t, omtdataout_t, supports_marks>::create_internal(
577     const uint32_t new_capacity) {
578   this->create_internal_no_array(new_capacity);
579   XMALLOC_N(this->capacity, this->d.a.values);
580 }
581 
582 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
nweight(const subtree & st)583 uint32_t omt<omtdata_t, omtdataout_t, supports_marks>::nweight(
584     const subtree &st) const {
585   if (st.is_null()) {
586     return 0;
587   } else {
588     return this->d.t.nodes[st.get_index()].weight;
589   }
590 }
591 
592 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
593 typename omt<omtdata_t, omtdataout_t, supports_marks>::node_idx
node_malloc(void)594 omt<omtdata_t, omtdataout_t, supports_marks>::node_malloc(void) {
595   paranoid_invariant(this->d.t.free_idx < this->capacity);
596   omt_node &n = this->d.t.nodes[this->d.t.free_idx];
597   n.clear_stolen_bits();
598   return this->d.t.free_idx++;
599 }
600 
601 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
node_free(const node_idx UU (idx))602 void omt<omtdata_t, omtdataout_t, supports_marks>::node_free(
603     const node_idx UU(idx)) {
604   paranoid_invariant(idx < this->capacity);
605 }
606 
607 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
maybe_resize_array(const uint32_t n)608 void omt<omtdata_t, omtdataout_t, supports_marks>::maybe_resize_array(
609     const uint32_t n) {
610   const uint32_t new_size = n <= 2 ? 4 : 2 * n;
611   const uint32_t room = this->capacity - this->d.a.start_idx;
612 
613   if (room < n || this->capacity / 2 >= new_size) {
614     omtdata_t *XMALLOC_N(new_size, tmp_values);
615     if (this->d.a.num_values) {
616       memcpy(tmp_values, &this->d.a.values[this->d.a.start_idx],
617              this->d.a.num_values * (sizeof tmp_values[0]));
618     }
619     this->d.a.start_idx = 0;
620     this->capacity = new_size;
621     toku_free(this->d.a.values);
622     this->d.a.values = tmp_values;
623   }
624 }
625 
626 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
627 void omt<omtdata_t, omtdataout_t,
fill_array_with_subtree_values(omtdata_t * const array,const subtree & st)628          supports_marks>::fill_array_with_subtree_values(omtdata_t *const array,
629                                                          const subtree &st)
630     const {
631   if (st.is_null()) return;
632   const omt_node &tree = this->d.t.nodes[st.get_index()];
633   this->fill_array_with_subtree_values(&array[0], tree.left);
634   array[this->nweight(tree.left)] = tree.value;
635   this->fill_array_with_subtree_values(&array[this->nweight(tree.left) + 1],
636                                        tree.right);
637 }
638 
639 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
convert_to_array(void)640 void omt<omtdata_t, omtdataout_t, supports_marks>::convert_to_array(void) {
641   if (!this->is_array) {
642     const uint32_t num_values = this->size();
643     uint32_t new_size = 2 * num_values;
644     new_size = new_size < 4 ? 4 : new_size;
645 
646     omtdata_t *XMALLOC_N(new_size, tmp_values);
647     this->fill_array_with_subtree_values(tmp_values, this->d.t.root);
648     toku_free(this->d.t.nodes);
649     this->is_array = true;
650     this->capacity = new_size;
651     this->d.a.num_values = num_values;
652     this->d.a.values = tmp_values;
653     this->d.a.start_idx = 0;
654   }
655 }
656 
657 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
rebuild_from_sorted_array(subtree * const st,const omtdata_t * const values,const uint32_t numvalues)658 void omt<omtdata_t, omtdataout_t, supports_marks>::rebuild_from_sorted_array(
659     subtree *const st, const omtdata_t *const values,
660     const uint32_t numvalues) {
661   if (numvalues == 0) {
662     st->set_to_null();
663   } else {
664     const uint32_t halfway = numvalues / 2;
665     const node_idx newidx = this->node_malloc();
666     omt_node *const newnode = &this->d.t.nodes[newidx];
667     newnode->weight = numvalues;
668     newnode->value = values[halfway];
669     st->set_index(newidx);
670     // update everything before the recursive calls so the second call
671     // can be a tail call.
672     this->rebuild_from_sorted_array(&newnode->left, &values[0], halfway);
673     this->rebuild_from_sorted_array(&newnode->right, &values[halfway + 1],
674                                     numvalues - (halfway + 1));
675   }
676 }
677 
678 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
convert_to_tree(void)679 void omt<omtdata_t, omtdataout_t, supports_marks>::convert_to_tree(void) {
680   if (this->is_array) {
681     const uint32_t num_nodes = this->size();
682     uint32_t new_size = num_nodes * 2;
683     new_size = new_size < 4 ? 4 : new_size;
684 
685     omt_node *XMALLOC_N(new_size, new_nodes);
686     omtdata_t *const values = this->d.a.values;
687     omtdata_t *const tmp_values = &values[this->d.a.start_idx];
688     this->is_array = false;
689     this->d.t.nodes = new_nodes;
690     this->capacity = new_size;
691     this->d.t.free_idx = 0;
692     this->d.t.root.set_to_null();
693     this->rebuild_from_sorted_array(&this->d.t.root, tmp_values, num_nodes);
694     toku_free(values);
695   }
696 }
697 
698 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
maybe_resize_or_convert(const uint32_t n)699 void omt<omtdata_t, omtdataout_t, supports_marks>::maybe_resize_or_convert(
700     const uint32_t n) {
701   if (this->is_array) {
702     this->maybe_resize_array(n);
703   } else {
704     const uint32_t new_size = n <= 2 ? 4 : 2 * n;
705     const uint32_t num_nodes = this->nweight(this->d.t.root);
706     if ((this->capacity / 2 >= new_size) ||
707         (this->d.t.free_idx >= this->capacity && num_nodes < n) ||
708         (this->capacity < n)) {
709       this->convert_to_array();
710       // if we had a free list, the "supports_marks" version could
711       // just resize, as it is now, we have to convert to and back
712       // from an array.
713       if (supports_marks) {
714         this->convert_to_tree();
715       }
716     }
717   }
718 }
719 
720 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
will_need_rebalance(const subtree & st,const int leftmod,const int rightmod)721 bool omt<omtdata_t, omtdataout_t, supports_marks>::will_need_rebalance(
722     const subtree &st, const int leftmod, const int rightmod) const {
723   if (st.is_null()) {
724     return false;
725   }
726   const omt_node &n = this->d.t.nodes[st.get_index()];
727   // one of the 1's is for the root.
728   // the other is to take ceil(n/2)
729   const uint32_t weight_left = this->nweight(n.left) + leftmod;
730   const uint32_t weight_right = this->nweight(n.right) + rightmod;
731   return ((1 + weight_left < (1 + 1 + weight_right) / 2) ||
732           (1 + weight_right < (1 + 1 + weight_left) / 2));
733 }
734 
735 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
insert_internal(subtree * const subtreep,const omtdata_t & value,const uint32_t idx,subtree ** const rebalance_subtree)736 void omt<omtdata_t, omtdataout_t, supports_marks>::insert_internal(
737     subtree *const subtreep, const omtdata_t &value, const uint32_t idx,
738     subtree **const rebalance_subtree) {
739   if (subtreep->is_null()) {
740     paranoid_invariant_zero(idx);
741     const node_idx newidx = this->node_malloc();
742     omt_node *const newnode = &this->d.t.nodes[newidx];
743     newnode->weight = 1;
744     newnode->left.set_to_null();
745     newnode->right.set_to_null();
746     newnode->value = value;
747     subtreep->set_index(newidx);
748   } else {
749     omt_node &n = this->d.t.nodes[subtreep->get_index()];
750     n.weight++;
751     if (idx <= this->nweight(n.left)) {
752       if (*rebalance_subtree == nullptr &&
753           this->will_need_rebalance(*subtreep, 1, 0)) {
754         *rebalance_subtree = subtreep;
755       }
756       this->insert_internal(&n.left, value, idx, rebalance_subtree);
757     } else {
758       if (*rebalance_subtree == nullptr &&
759           this->will_need_rebalance(*subtreep, 0, 1)) {
760         *rebalance_subtree = subtreep;
761       }
762       const uint32_t sub_index = idx - this->nweight(n.left) - 1;
763       this->insert_internal(&n.right, value, sub_index, rebalance_subtree);
764     }
765   }
766 }
767 
768 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
set_at_internal_array(const omtdata_t & value,const uint32_t idx)769 void omt<omtdata_t, omtdataout_t, supports_marks>::set_at_internal_array(
770     const omtdata_t &value, const uint32_t idx) {
771   this->d.a.values[this->d.a.start_idx + idx] = value;
772 }
773 
774 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
set_at_internal(const subtree & st,const omtdata_t & value,const uint32_t idx)775 void omt<omtdata_t, omtdataout_t, supports_marks>::set_at_internal(
776     const subtree &st, const omtdata_t &value, const uint32_t idx) {
777   paranoid_invariant(!st.is_null());
778   omt_node &n = this->d.t.nodes[st.get_index()];
779   const uint32_t leftweight = this->nweight(n.left);
780   if (idx < leftweight) {
781     this->set_at_internal(n.left, value, idx);
782   } else if (idx == leftweight) {
783     n.value = value;
784   } else {
785     this->set_at_internal(n.right, value, idx - leftweight - 1);
786   }
787 }
788 
789 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
delete_internal(subtree * const subtreep,const uint32_t idx,omt_node * const copyn,subtree ** const rebalance_subtree)790 void omt<omtdata_t, omtdataout_t, supports_marks>::delete_internal(
791     subtree *const subtreep, const uint32_t idx, omt_node *const copyn,
792     subtree **const rebalance_subtree) {
793   paranoid_invariant_notnull(subtreep);
794   paranoid_invariant_notnull(rebalance_subtree);
795   paranoid_invariant(!subtreep->is_null());
796   omt_node &n = this->d.t.nodes[subtreep->get_index()];
797   const uint32_t leftweight = this->nweight(n.left);
798   if (idx < leftweight) {
799     n.weight--;
800     if (*rebalance_subtree == nullptr &&
801         this->will_need_rebalance(*subtreep, -1, 0)) {
802       *rebalance_subtree = subtreep;
803     }
804     this->delete_internal(&n.left, idx, copyn, rebalance_subtree);
805   } else if (idx == leftweight) {
806     if (n.left.is_null()) {
807       const uint32_t oldidx = subtreep->get_index();
808       *subtreep = n.right;
809       if (copyn != nullptr) {
810         copyn->value = n.value;
811       }
812       this->node_free(oldidx);
813     } else if (n.right.is_null()) {
814       const uint32_t oldidx = subtreep->get_index();
815       *subtreep = n.left;
816       if (copyn != nullptr) {
817         copyn->value = n.value;
818       }
819       this->node_free(oldidx);
820     } else {
821       if (*rebalance_subtree == nullptr &&
822           this->will_need_rebalance(*subtreep, 0, -1)) {
823         *rebalance_subtree = subtreep;
824       }
825       // don't need to copy up value, it's only used by this
826       // next call, and when that gets to the bottom there
827       // won't be any more recursion
828       n.weight--;
829       this->delete_internal(&n.right, 0, &n, rebalance_subtree);
830     }
831   } else {
832     n.weight--;
833     if (*rebalance_subtree == nullptr &&
834         this->will_need_rebalance(*subtreep, 0, -1)) {
835       *rebalance_subtree = subtreep;
836     }
837     this->delete_internal(&n.right, idx - leftweight - 1, copyn,
838                           rebalance_subtree);
839   }
840 }
841 
842 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
843 template <typename iterate_extra_t,
844           int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_internal_array(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra)845 int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_internal_array(
846     const uint32_t left, const uint32_t right,
847     iterate_extra_t *const iterate_extra) const {
848   int r;
849   for (uint32_t i = left; i < right; ++i) {
850     r = f(this->d.a.values[this->d.a.start_idx + i], i, iterate_extra);
851     if (r != 0) {
852       return r;
853     }
854   }
855   return 0;
856 }
857 
858 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
859 template <typename iterate_extra_t,
860           int (*f)(omtdata_t *, const uint32_t, iterate_extra_t *const)>
iterate_ptr_internal(const uint32_t left,const uint32_t right,const subtree & st,const uint32_t idx,iterate_extra_t * const iterate_extra)861 void omt<omtdata_t, omtdataout_t, supports_marks>::iterate_ptr_internal(
862     const uint32_t left, const uint32_t right, const subtree &st,
863     const uint32_t idx, iterate_extra_t *const iterate_extra) {
864   if (!st.is_null()) {
865     omt_node &n = this->d.t.nodes[st.get_index()];
866     const uint32_t idx_root = idx + this->nweight(n.left);
867     if (left < idx_root) {
868       this->iterate_ptr_internal<iterate_extra_t, f>(left, right, n.left, idx,
869                                                      iterate_extra);
870     }
871     if (left <= idx_root && idx_root < right) {
872       int r = f(&n.value, idx_root, iterate_extra);
873       lazy_assert_zero(r);
874     }
875     if (idx_root + 1 < right) {
876       this->iterate_ptr_internal<iterate_extra_t, f>(
877           left, right, n.right, idx_root + 1, iterate_extra);
878     }
879   }
880 }
881 
882 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
883 template <typename iterate_extra_t,
884           int (*f)(omtdata_t *, const uint32_t, iterate_extra_t *const)>
iterate_ptr_internal_array(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra)885 void omt<omtdata_t, omtdataout_t, supports_marks>::iterate_ptr_internal_array(
886     const uint32_t left, const uint32_t right,
887     iterate_extra_t *const iterate_extra) {
888   for (uint32_t i = left; i < right; ++i) {
889     int r = f(&this->d.a.values[this->d.a.start_idx + i], i, iterate_extra);
890     lazy_assert_zero(r);
891   }
892 }
893 
894 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
895 template <typename iterate_extra_t,
896           int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_internal(const uint32_t left,const uint32_t right,const subtree & st,const uint32_t idx,iterate_extra_t * const iterate_extra)897 int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_internal(
898     const uint32_t left, const uint32_t right, const subtree &st,
899     const uint32_t idx, iterate_extra_t *const iterate_extra) const {
900   if (st.is_null()) {
901     return 0;
902   }
903   int r;
904   const omt_node &n = this->d.t.nodes[st.get_index()];
905   const uint32_t idx_root = idx + this->nweight(n.left);
906   if (left < idx_root) {
907     r = this->iterate_internal<iterate_extra_t, f>(left, right, n.left, idx,
908                                                    iterate_extra);
909     if (r != 0) {
910       return r;
911     }
912   }
913   if (left <= idx_root && idx_root < right) {
914     r = f(n.value, idx_root, iterate_extra);
915     if (r != 0) {
916       return r;
917     }
918   }
919   if (idx_root + 1 < right) {
920     return this->iterate_internal<iterate_extra_t, f>(
921         left, right, n.right, idx_root + 1, iterate_extra);
922   }
923   return 0;
924 }
925 
926 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
927 template <typename iterate_extra_t,
928           int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
929 int omt<omtdata_t, omtdataout_t, supports_marks>::
iterate_and_mark_range_internal(const uint32_t left,const uint32_t right,const subtree & st,const uint32_t idx,iterate_extra_t * const iterate_extra)930     iterate_and_mark_range_internal(const uint32_t left, const uint32_t right,
931                                     const subtree &st, const uint32_t idx,
932                                     iterate_extra_t *const iterate_extra) {
933   paranoid_invariant(!st.is_null());
934   int r;
935   omt_node &n = this->d.t.nodes[st.get_index()];
936   const uint32_t idx_root = idx + this->nweight(n.left);
937   if (left < idx_root && !n.left.is_null()) {
938     n.set_marks_below_bit();
939     r = this->iterate_and_mark_range_internal<iterate_extra_t, f>(
940         left, right, n.left, idx, iterate_extra);
941     if (r != 0) {
942       return r;
943     }
944   }
945   if (left <= idx_root && idx_root < right) {
946     n.set_marked_bit();
947     r = f(n.value, idx_root, iterate_extra);
948     if (r != 0) {
949       return r;
950     }
951   }
952   if (idx_root + 1 < right && !n.right.is_null()) {
953     n.set_marks_below_bit();
954     return this->iterate_and_mark_range_internal<iterate_extra_t, f>(
955         left, right, n.right, idx_root + 1, iterate_extra);
956   }
957   return 0;
958 }
959 
960 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
961 template <typename iterate_extra_t,
962           int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_over_marked_internal(const subtree & st,const uint32_t idx,iterate_extra_t * const iterate_extra)963 int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_over_marked_internal(
964     const subtree &st, const uint32_t idx,
965     iterate_extra_t *const iterate_extra) const {
966   if (st.is_null()) {
967     return 0;
968   }
969   int r;
970   const omt_node &n = this->d.t.nodes[st.get_index()];
971   const uint32_t idx_root = idx + this->nweight(n.left);
972   if (n.get_marks_below()) {
973     r = this->iterate_over_marked_internal<iterate_extra_t, f>(n.left, idx,
974                                                                iterate_extra);
975     if (r != 0) {
976       return r;
977     }
978   }
979   if (n.get_marked()) {
980     r = f(n.value, idx_root, iterate_extra);
981     if (r != 0) {
982       return r;
983     }
984   }
985   if (n.get_marks_below()) {
986     return this->iterate_over_marked_internal<iterate_extra_t, f>(
987         n.right, idx_root + 1, iterate_extra);
988   }
989   return 0;
990 }
991 
992 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
fetch_internal_array(const uint32_t i,omtdataout_t * const value)993 void omt<omtdata_t, omtdataout_t, supports_marks>::fetch_internal_array(
994     const uint32_t i, omtdataout_t *const value) const {
995   if (value != nullptr) {
996     copyout(value, &this->d.a.values[this->d.a.start_idx + i]);
997   }
998 }
999 
1000 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
fetch_internal(const subtree & st,const uint32_t i,omtdataout_t * const value)1001 void omt<omtdata_t, omtdataout_t, supports_marks>::fetch_internal(
1002     const subtree &st, const uint32_t i, omtdataout_t *const value) const {
1003   omt_node &n = this->d.t.nodes[st.get_index()];
1004   const uint32_t leftweight = this->nweight(n.left);
1005   if (i < leftweight) {
1006     this->fetch_internal(n.left, i, value);
1007   } else if (i == leftweight) {
1008     if (value != nullptr) {
1009       copyout(value, &n);
1010     }
1011   } else {
1012     this->fetch_internal(n.right, i - leftweight - 1, value);
1013   }
1014 }
1015 
1016 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
fill_array_with_subtree_idxs(node_idx * const array,const subtree & st)1017 void omt<omtdata_t, omtdataout_t, supports_marks>::fill_array_with_subtree_idxs(
1018     node_idx *const array, const subtree &st) const {
1019   if (!st.is_null()) {
1020     const omt_node &tree = this->d.t.nodes[st.get_index()];
1021     this->fill_array_with_subtree_idxs(&array[0], tree.left);
1022     array[this->nweight(tree.left)] = st.get_index();
1023     this->fill_array_with_subtree_idxs(&array[this->nweight(tree.left) + 1],
1024                                        tree.right);
1025   }
1026 }
1027 
1028 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
rebuild_subtree_from_idxs(subtree * const st,const node_idx * const idxs,const uint32_t numvalues)1029 void omt<omtdata_t, omtdataout_t, supports_marks>::rebuild_subtree_from_idxs(
1030     subtree *const st, const node_idx *const idxs, const uint32_t numvalues) {
1031   if (numvalues == 0) {
1032     st->set_to_null();
1033   } else {
1034     uint32_t halfway = numvalues / 2;
1035     st->set_index(idxs[halfway]);
1036     // node_idx newidx = idxs[halfway];
1037     omt_node &newnode = this->d.t.nodes[st->get_index()];
1038     newnode.weight = numvalues;
1039     // value is already in there.
1040     this->rebuild_subtree_from_idxs(&newnode.left, &idxs[0], halfway);
1041     this->rebuild_subtree_from_idxs(&newnode.right, &idxs[halfway + 1],
1042                                     numvalues - (halfway + 1));
1043     // n_idx = newidx;
1044   }
1045 }
1046 
1047 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
rebalance(subtree * const st)1048 void omt<omtdata_t, omtdataout_t, supports_marks>::rebalance(
1049     subtree *const st) {
1050   node_idx idx = st->get_index();
1051   if (idx == this->d.t.root.get_index()) {
1052     // Try to convert to an array.
1053     // If this fails, (malloc) nothing will have changed.
1054     // In the failure case we continue on to the standard rebalance
1055     // algorithm.
1056     this->convert_to_array();
1057     if (supports_marks) {
1058       this->convert_to_tree();
1059     }
1060   } else {
1061     const omt_node &n = this->d.t.nodes[idx];
1062     node_idx *tmp_array;
1063     size_t mem_needed = n.weight * (sizeof tmp_array[0]);
1064     size_t mem_free =
1065         (this->capacity - this->d.t.free_idx) * (sizeof this->d.t.nodes[0]);
1066     bool malloced;
1067     if (mem_needed <= mem_free) {
1068       // There is sufficient free space at the end of the nodes array
1069       // to hold enough node indexes to rebalance.
1070       malloced = false;
1071       tmp_array =
1072           reinterpret_cast<node_idx *>(&this->d.t.nodes[this->d.t.free_idx]);
1073     } else {
1074       malloced = true;
1075       XMALLOC_N(n.weight, tmp_array);
1076     }
1077     this->fill_array_with_subtree_idxs(tmp_array, *st);
1078     this->rebuild_subtree_from_idxs(st, tmp_array, n.weight);
1079     if (malloced) toku_free(tmp_array);
1080   }
1081 }
1082 
1083 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
copyout(omtdata_t * const out,const omt_node * const n)1084 void omt<omtdata_t, omtdataout_t, supports_marks>::copyout(
1085     omtdata_t *const out, const omt_node *const n) {
1086   *out = n->value;
1087 }
1088 
1089 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
copyout(omtdata_t ** const out,omt_node * const n)1090 void omt<omtdata_t, omtdataout_t, supports_marks>::copyout(
1091     omtdata_t **const out, omt_node *const n) {
1092   *out = &n->value;
1093 }
1094 
1095 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
copyout(omtdata_t * const out,const omtdata_t * const stored_value_ptr)1096 void omt<omtdata_t, omtdataout_t, supports_marks>::copyout(
1097     omtdata_t *const out, const omtdata_t *const stored_value_ptr) {
1098   *out = *stored_value_ptr;
1099 }
1100 
1101 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
copyout(omtdata_t ** const out,omtdata_t * const stored_value_ptr)1102 void omt<omtdata_t, omtdataout_t, supports_marks>::copyout(
1103     omtdata_t **const out, omtdata_t *const stored_value_ptr) {
1104   *out = stored_value_ptr;
1105 }
1106 
1107 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1108 template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_zero_array(const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp)1109 int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_zero_array(
1110     const omtcmp_t &extra, omtdataout_t *const value,
1111     uint32_t *const idxp) const {
1112   paranoid_invariant_notnull(idxp);
1113   uint32_t min = this->d.a.start_idx;
1114   uint32_t limit = this->d.a.start_idx + this->d.a.num_values;
1115   uint32_t best_pos = subtree::NODE_NULL;
1116   uint32_t best_zero = subtree::NODE_NULL;
1117 
1118   while (min != limit) {
1119     uint32_t mid = (min + limit) / 2;
1120     int hv = h(this->d.a.values[mid], extra);
1121     if (hv < 0) {
1122       min = mid + 1;
1123     } else if (hv > 0) {
1124       best_pos = mid;
1125       limit = mid;
1126     } else {
1127       best_zero = mid;
1128       limit = mid;
1129     }
1130   }
1131   if (best_zero != subtree::NODE_NULL) {
1132     // Found a zero
1133     if (value != nullptr) {
1134       copyout(value, &this->d.a.values[best_zero]);
1135     }
1136     *idxp = best_zero - this->d.a.start_idx;
1137     return 0;
1138   }
1139   if (best_pos != subtree::NODE_NULL)
1140     *idxp = best_pos - this->d.a.start_idx;
1141   else
1142     *idxp = this->d.a.num_values;
1143   return DB_NOTFOUND;
1144 }
1145 
1146 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1147 template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_zero(const subtree & st,const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp)1148 int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_zero(
1149     const subtree &st, const omtcmp_t &extra, omtdataout_t *const value,
1150     uint32_t *const idxp) const {
1151   paranoid_invariant_notnull(idxp);
1152   if (st.is_null()) {
1153     *idxp = 0;
1154     return DB_NOTFOUND;
1155   }
1156   omt_node &n = this->d.t.nodes[st.get_index()];
1157   int hv = h(n.value, extra);
1158   if (hv < 0) {
1159     int r = this->find_internal_zero<omtcmp_t, h>(n.right, extra, value, idxp);
1160     *idxp += this->nweight(n.left) + 1;
1161     return r;
1162   } else if (hv > 0) {
1163     return this->find_internal_zero<omtcmp_t, h>(n.left, extra, value, idxp);
1164   } else {
1165     int r = this->find_internal_zero<omtcmp_t, h>(n.left, extra, value, idxp);
1166     if (r == DB_NOTFOUND) {
1167       *idxp = this->nweight(n.left);
1168       if (value != nullptr) {
1169         copyout(value, &n);
1170       }
1171       r = 0;
1172     }
1173     return r;
1174   }
1175 }
1176 
1177 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1178 template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_plus_array(const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp)1179 int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_plus_array(
1180     const omtcmp_t &extra, omtdataout_t *const value,
1181     uint32_t *const idxp) const {
1182   paranoid_invariant_notnull(idxp);
1183   uint32_t min = this->d.a.start_idx;
1184   uint32_t limit = this->d.a.start_idx + this->d.a.num_values;
1185   uint32_t best = subtree::NODE_NULL;
1186 
1187   while (min != limit) {
1188     const uint32_t mid = (min + limit) / 2;
1189     const int hv = h(this->d.a.values[mid], extra);
1190     if (hv > 0) {
1191       best = mid;
1192       limit = mid;
1193     } else {
1194       min = mid + 1;
1195     }
1196   }
1197   if (best == subtree::NODE_NULL) {
1198     return DB_NOTFOUND;
1199   }
1200   if (value != nullptr) {
1201     copyout(value, &this->d.a.values[best]);
1202   }
1203   *idxp = best - this->d.a.start_idx;
1204   return 0;
1205 }
1206 
1207 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1208 template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_plus(const subtree & st,const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp)1209 int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_plus(
1210     const subtree &st, const omtcmp_t &extra, omtdataout_t *const value,
1211     uint32_t *const idxp) const {
1212   paranoid_invariant_notnull(idxp);
1213   if (st.is_null()) {
1214     return DB_NOTFOUND;
1215   }
1216   omt_node *const n = &this->d.t.nodes[st.get_index()];
1217   int hv = h(n->value, extra);
1218   int r;
1219   if (hv > 0) {
1220     r = this->find_internal_plus<omtcmp_t, h>(n->left, extra, value, idxp);
1221     if (r == DB_NOTFOUND) {
1222       *idxp = this->nweight(n->left);
1223       if (value != nullptr) {
1224         copyout(value, n);
1225       }
1226       r = 0;
1227     }
1228   } else {
1229     r = this->find_internal_plus<omtcmp_t, h>(n->right, extra, value, idxp);
1230     if (r == 0) {
1231       *idxp += this->nweight(n->left) + 1;
1232     }
1233   }
1234   return r;
1235 }
1236 
1237 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1238 template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_minus_array(const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp)1239 int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_minus_array(
1240     const omtcmp_t &extra, omtdataout_t *const value,
1241     uint32_t *const idxp) const {
1242   paranoid_invariant_notnull(idxp);
1243   uint32_t min = this->d.a.start_idx;
1244   uint32_t limit = this->d.a.start_idx + this->d.a.num_values;
1245   uint32_t best = subtree::NODE_NULL;
1246 
1247   while (min != limit) {
1248     const uint32_t mid = (min + limit) / 2;
1249     const int hv = h(this->d.a.values[mid], extra);
1250     if (hv < 0) {
1251       best = mid;
1252       min = mid + 1;
1253     } else {
1254       limit = mid;
1255     }
1256   }
1257   if (best == subtree::NODE_NULL) {
1258     return DB_NOTFOUND;
1259   }
1260   if (value != nullptr) {
1261     copyout(value, &this->d.a.values[best]);
1262   }
1263   *idxp = best - this->d.a.start_idx;
1264   return 0;
1265 }
1266 
1267 template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1268 template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_minus(const subtree & st,const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp)1269 int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_minus(
1270     const subtree &st, const omtcmp_t &extra, omtdataout_t *const value,
1271     uint32_t *const idxp) const {
1272   paranoid_invariant_notnull(idxp);
1273   if (st.is_null()) {
1274     return DB_NOTFOUND;
1275   }
1276   omt_node *const n = &this->d.t.nodes[st.get_index()];
1277   int hv = h(n->value, extra);
1278   if (hv < 0) {
1279     int r =
1280         this->find_internal_minus<omtcmp_t, h>(n->right, extra, value, idxp);
1281     if (r == 0) {
1282       *idxp += this->nweight(n->left) + 1;
1283     } else if (r == DB_NOTFOUND) {
1284       *idxp = this->nweight(n->left);
1285       if (value != nullptr) {
1286         copyout(value, n);
1287       }
1288       r = 0;
1289     }
1290     return r;
1291   } else {
1292     return this->find_internal_minus<omtcmp_t, h>(n->left, extra, value, idxp);
1293   }
1294 }
1295 }  // namespace toku
1296