1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 
36 ----------------------------------------
37 
38    Licensed under the Apache License, Version 2.0 (the "License");
39    you may not use this file except in compliance with the License.
40    You may obtain a copy of the License at
41 
42        http://www.apache.org/licenses/LICENSE-2.0
43 
44    Unless required by applicable law or agreed to in writing, software
45    distributed under the License is distributed on an "AS IS" BASIS,
46    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
47    See the License for the specific language governing permissions and
48    limitations under the License.
49 ======= */
50 
51 #ident \
52     "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
53 
54 #include <db.h>
55 #include <string.h>
56 
57 #include <portability/memory.h>
58 
59 namespace toku {
60 
61     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create(void)62     void omt<omtdata_t, omtdataout_t, supports_marks>::create(void) {
63         this->create_internal(2);
64         if (supports_marks) {
65             this->convert_to_tree();
66         }
67     }
68 
69     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create_no_array(void)70     void omt<omtdata_t, omtdataout_t, supports_marks>::create_no_array(void) {
71         if (!supports_marks) {
72             this->create_internal_no_array(0);
73         } else {
74             this->is_array = false;
75             this->capacity = 0;
76             this->d.t.nodes = nullptr;
77             this->d.t.root.set_to_null();
78             this->d.t.free_idx = 0;
79         }
80     }
81 
82     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create_from_sorted_array(const omtdata_t * const values,const uint32_t numvalues)83     void omt<omtdata_t, omtdataout_t, supports_marks>::create_from_sorted_array(
84         const omtdata_t *const values,
85         const uint32_t numvalues) {
86         this->create_internal(numvalues);
87         memcpy(this->d.a.values, values, numvalues * (sizeof values[0]));
88         this->d.a.num_values = numvalues;
89         if (supports_marks) {
90             this->convert_to_tree();
91         }
92     }
93 
94     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
95     void
create_steal_sorted_array(omtdata_t ** const values,const uint32_t numvalues,const uint32_t new_capacity)96     omt<omtdata_t, omtdataout_t, supports_marks>::create_steal_sorted_array(
97         omtdata_t **const values,
98         const uint32_t numvalues,
99         const uint32_t new_capacity) {
100         paranoid_invariant_notnull(values);
101         this->create_internal_no_array(new_capacity);
102         this->d.a.num_values = numvalues;
103         this->d.a.values = *values;
104         *values = nullptr;
105         if (supports_marks) {
106             this->convert_to_tree();
107         }
108     }
109 
110     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
split_at(omt * const newomt,const uint32_t idx)111     int omt<omtdata_t, omtdataout_t, supports_marks>::split_at(
112         omt *const newomt,
113         const uint32_t idx) {
114         barf_if_marked(*this);
115         paranoid_invariant_notnull(newomt);
116         if (idx > this->size()) {
117             return EINVAL;
118         }
119         this->convert_to_array();
120         const uint32_t newsize = this->size() - idx;
121         newomt->create_from_sorted_array(
122             &this->d.a.values[this->d.a.start_idx + idx], newsize);
123         this->d.a.num_values = idx;
124         this->maybe_resize_array(idx);
125         if (supports_marks) {
126             this->convert_to_tree();
127         }
128         return 0;
129     }
130 
131     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
merge(omt * const leftomt,omt * const rightomt)132     void omt<omtdata_t, omtdataout_t, supports_marks>::merge(
133         omt *const leftomt,
134         omt *const rightomt) {
135         barf_if_marked(*this);
136         paranoid_invariant_notnull(leftomt);
137         paranoid_invariant_notnull(rightomt);
138         const uint32_t leftsize = leftomt->size();
139         const uint32_t rightsize = rightomt->size();
140         const uint32_t newsize = leftsize + rightsize;
141 
142         if (leftomt->is_array) {
143             if (leftomt->capacity -
144                     (leftomt->d.a.start_idx + leftomt->d.a.num_values) >=
145                 rightsize) {
146                 this->create_steal_sorted_array(&leftomt->d.a.values,
147                                                 leftomt->d.a.num_values,
148                                                 leftomt->capacity);
149                 this->d.a.start_idx = leftomt->d.a.start_idx;
150             } else {
151                 this->create_internal(newsize);
152                 memcpy(&this->d.a.values[0],
153                        &leftomt->d.a.values[leftomt->d.a.start_idx],
154                        leftomt->d.a.num_values * (sizeof this->d.a.values[0]));
155             }
156         } else {
157             this->create_internal(newsize);
158             leftomt->fill_array_with_subtree_values(&this->d.a.values[0],
159                                                     leftomt->d.t.root);
160         }
161         leftomt->destroy();
162         this->d.a.num_values = leftsize;
163 
164         if (rightomt->is_array) {
165             memcpy(
166                 &this->d.a.values[this->d.a.start_idx + this->d.a.num_values],
167                 &rightomt->d.a.values[rightomt->d.a.start_idx],
168                 rightomt->d.a.num_values * (sizeof this->d.a.values[0]));
169         } else {
170             rightomt->fill_array_with_subtree_values(
171                 &this->d.a.values[this->d.a.start_idx + this->d.a.num_values],
172                 rightomt->d.t.root);
173         }
174         rightomt->destroy();
175         this->d.a.num_values += rightsize;
176         paranoid_invariant(this->size() == newsize);
177         if (supports_marks) {
178             this->convert_to_tree();
179         }
180     }
181 
182     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
clone(const omt & src)183     void omt<omtdata_t, omtdataout_t, supports_marks>::clone(const omt &src) {
184         barf_if_marked(*this);
185         this->create_internal(src.size());
186         if (src.is_array) {
187             memcpy(&this->d.a.values[0],
188                    &src.d.a.values[src.d.a.start_idx],
189                    src.d.a.num_values * (sizeof this->d.a.values[0]));
190         } else {
191             src.fill_array_with_subtree_values(&this->d.a.values[0],
192                                                src.d.t.root);
193         }
194         this->d.a.num_values = src.size();
195         if (supports_marks) {
196             this->convert_to_tree();
197         }
198     }
199 
200     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
clear(void)201     void omt<omtdata_t, omtdataout_t, supports_marks>::clear(void) {
202         if (this->is_array) {
203             this->d.a.start_idx = 0;
204             this->d.a.num_values = 0;
205         } else {
206             this->d.t.root.set_to_null();
207             this->d.t.free_idx = 0;
208         }
209     }
210 
211     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
destroy(void)212     void omt<omtdata_t, omtdataout_t, supports_marks>::destroy(void) {
213         this->clear();
214         this->capacity = 0;
215         if (this->is_array) {
216             if (this->d.a.values != nullptr) {
217                 toku_free(this->d.a.values);
218             }
219             this->d.a.values = nullptr;
220         } else {
221             if (this->d.t.nodes != nullptr) {
222                 toku_free(this->d.t.nodes);
223             }
224             this->d.t.nodes = nullptr;
225         }
226     }
227 
228     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
size(void) const229     uint32_t omt<omtdata_t, omtdataout_t, supports_marks>::size(void) const {
230         if (this->is_array) {
231             return this->d.a.num_values;
232         } else {
233             return this->nweight(this->d.t.root);
234         }
235     }
236 
237     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
238     template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
insert(const omtdata_t & value,const omtcmp_t & v,uint32_t * const idx)239     int omt<omtdata_t, omtdataout_t, supports_marks>::insert(
240         const omtdata_t &value,
241         const omtcmp_t &v,
242         uint32_t *const idx) {
243         int r;
244         uint32_t insert_idx;
245 
246         r = this->find_zero<omtcmp_t, h>(v, nullptr, &insert_idx);
247         if (r == 0) {
248             if (idx)
249                 *idx = insert_idx;
250             return DB_KEYEXIST;
251         }
252         if (r != DB_NOTFOUND)
253             return r;
254 
255         if ((r = this->insert_at(value, insert_idx)))
256             return r;
257         if (idx)
258             *idx = insert_idx;
259 
260         return 0;
261     }
262 
    // The barf_if_marked overloads below implement a compile-time "static
    // if" on supports_marks: overload resolution selects the version that
    // matches the omt's third template argument.

    // supports_marks == false: there is nothing to check, so this is a
    // no-op.
    template <typename omtdata_t, typename omtdataout_t>
    static void barf_if_marked(
        const omt<omtdata_t, omtdataout_t, false> &UU(omt)) {}

    // supports_marks == true: asserts (via invariant) that the omt
    // currently reports no marks.
    template <typename omtdata_t, typename omtdataout_t>
    static void barf_if_marked(const omt<omtdata_t, omtdataout_t, true> &omt) {
        invariant(!omt.has_marks());
    }
272 
273     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
has_marks(void) const274     bool omt<omtdata_t, omtdataout_t, supports_marks>::has_marks(void) const {
275         static_assert(supports_marks, "Does not support marks");
276         if (this->d.t.root.is_null()) {
277             return false;
278         }
279         const omt_node &node = this->d.t.nodes[this->d.t.root.get_index()];
280         return node.get_marks_below() || node.get_marked();
281     }
282 
283     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
insert_at(const omtdata_t & value,const uint32_t idx)284     int omt<omtdata_t, omtdataout_t, supports_marks>::insert_at(
285         const omtdata_t &value,
286         const uint32_t idx) {
287         barf_if_marked(*this);
288         if (idx > this->size()) {
289             return EINVAL;
290         }
291 
292         this->maybe_resize_or_convert(this->size() + 1);
293         if (this->is_array && idx != this->d.a.num_values &&
294             (idx != 0 || this->d.a.start_idx == 0)) {
295             this->convert_to_tree();
296         }
297         if (this->is_array) {
298             if (idx == this->d.a.num_values) {
299                 this->d.a.values[this->d.a.start_idx + this->d.a.num_values] =
300                     value;
301             } else {
302                 this->d.a.values[--this->d.a.start_idx] = value;
303             }
304             this->d.a.num_values++;
305         } else {
306             subtree *rebalance_subtree = nullptr;
307             this->insert_internal(
308                 &this->d.t.root, value, idx, &rebalance_subtree);
309             if (rebalance_subtree != nullptr) {
310                 this->rebalance(rebalance_subtree);
311             }
312         }
313         return 0;
314     }
315 
316     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
set_at(const omtdata_t & value,const uint32_t idx)317     int omt<omtdata_t, omtdataout_t, supports_marks>::set_at(
318         const omtdata_t &value,
319         const uint32_t idx) {
320         barf_if_marked(*this);
321         if (idx >= this->size()) {
322             return EINVAL;
323         }
324 
325         if (this->is_array) {
326             this->set_at_internal_array(value, idx);
327         } else {
328             this->set_at_internal(this->d.t.root, value, idx);
329         }
330         return 0;
331     }
332 
333     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
delete_at(const uint32_t idx)334     int omt<omtdata_t, omtdataout_t, supports_marks>::delete_at(
335         const uint32_t idx) {
336         barf_if_marked(*this);
337         if (idx >= this->size()) {
338             return EINVAL;
339         }
340 
341         this->maybe_resize_or_convert(this->size() - 1);
342         if (this->is_array && idx != 0 && idx != this->d.a.num_values - 1) {
343             this->convert_to_tree();
344         }
345         if (this->is_array) {
346             // Testing for 0 does not rule out it being the last entry.
347             // Test explicitly for num_values-1
348             if (idx != this->d.a.num_values - 1) {
349                 this->d.a.start_idx++;
350             }
351             this->d.a.num_values--;
352         } else {
353             subtree *rebalance_subtree = nullptr;
354             this->delete_internal(
355                 &this->d.t.root, idx, nullptr, &rebalance_subtree);
356             if (rebalance_subtree != nullptr) {
357                 this->rebalance(rebalance_subtree);
358             }
359         }
360         return 0;
361     }
362 
363     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
364     template <
365         typename iterate_extra_t,
366         int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate(iterate_extra_t * const iterate_extra) const367     int omt<omtdata_t, omtdataout_t, supports_marks>::iterate(
368         iterate_extra_t *const iterate_extra) const {
369         return this->iterate_on_range<iterate_extra_t, f>(
370             0, this->size(), iterate_extra);
371     }
372 
373     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
374     template <
375         typename iterate_extra_t,
376         int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_on_range(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra) const377     int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_on_range(
378         const uint32_t left,
379         const uint32_t right,
380         iterate_extra_t *const iterate_extra) const {
381         if (right > this->size()) {
382             return EINVAL;
383         }
384         if (left == right) {
385             return 0;
386         }
387         if (this->is_array) {
388             return this->iterate_internal_array<iterate_extra_t, f>(
389                 left, right, iterate_extra);
390         }
391         return this->iterate_internal<iterate_extra_t, f>(
392             left, right, this->d.t.root, 0, iterate_extra);
393     }
394 
395     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
396     template <
397         typename iterate_extra_t,
398         int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_and_mark_range(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra)399     int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_and_mark_range(
400         const uint32_t left,
401         const uint32_t right,
402         iterate_extra_t *const iterate_extra) {
403         static_assert(supports_marks, "does not support marks");
404         if (right > this->size()) {
405             return EINVAL;
406         }
407         if (left == right) {
408             return 0;
409         }
410         paranoid_invariant(!this->is_array);
411         return this->iterate_and_mark_range_internal<iterate_extra_t, f>(
412             left, right, this->d.t.root, 0, iterate_extra);
413     }
414 
415     // TODO: We can optimize this if we steal 3 bits.  1 bit: this node is
416     // marked.  1 bit: left subtree has marks. 1 bit: right subtree has marks.
417     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
418     template <
419         typename iterate_extra_t,
420         int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_over_marked(iterate_extra_t * const iterate_extra) const421     int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_over_marked(
422         iterate_extra_t *const iterate_extra) const {
423         static_assert(supports_marks, "does not support marks");
424         paranoid_invariant(!this->is_array);
425         return this->iterate_over_marked_internal<iterate_extra_t, f>(
426             this->d.t.root, 0, iterate_extra);
427     }
428 
429     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
unmark(const subtree & st,const uint32_t index,GrowableArray<node_idx> * const indexes)430     void omt<omtdata_t, omtdataout_t, supports_marks>::unmark(
431         const subtree &st,
432         const uint32_t index,
433         GrowableArray<node_idx> *const indexes) {
434         if (st.is_null()) {
435             return;
436         }
437         omt_node &n = this->d.t.nodes[st.get_index()];
438         const uint32_t index_root = index + this->nweight(n.left);
439 
440         const bool below = n.get_marks_below();
441         if (below) {
442             this->unmark(n.left, index, indexes);
443         }
444         if (n.get_marked()) {
445             indexes->push(index_root);
446         }
447         n.clear_stolen_bits();
448         if (below) {
449             this->unmark(n.right, index_root + 1, indexes);
450         }
451     }
452 
453     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
delete_all_marked(void)454     void omt<omtdata_t, omtdataout_t, supports_marks>::delete_all_marked(void) {
455         static_assert(supports_marks, "does not support marks");
456         if (!this->has_marks()) {
457             return;
458         }
459         paranoid_invariant(!this->is_array);
460         GrowableArray<node_idx> marked_indexes;
461         marked_indexes.init();
462 
463         // Remove all marks.
464         // We need to delete all the stolen bits before calling delete_at to
465         // prevent barfing.
466         this->unmark(this->d.t.root, 0, &marked_indexes);
467 
468         for (uint32_t i = 0; i < marked_indexes.get_size(); i++) {
469             // Delete from left to right, shift by number already deleted.
470             // Alternative is delete from right to left.
471             int r = this->delete_at(marked_indexes.fetch_unchecked(i) - i);
472             lazy_assert_zero(r);
473         }
474         marked_indexes.deinit();
475         barf_if_marked(*this);
476     }
477 
478     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
479     uint32_t omt<omtdata_t, omtdataout_t, supports_marks>::
verify_marks_consistent_internal(const subtree & st,const bool UU (allow_marks)) const480         verify_marks_consistent_internal(const subtree &st,
481                                          const bool UU(allow_marks)) const {
482         if (st.is_null()) {
483             return 0;
484         }
485         const omt_node &node = this->d.t.nodes[st.get_index()];
486         uint32_t num_marks =
487             verify_marks_consistent_internal(node.left, node.get_marks_below());
488         num_marks += verify_marks_consistent_internal(node.right,
489                                                       node.get_marks_below());
490         if (node.get_marks_below()) {
491             paranoid_invariant(allow_marks);
492             paranoid_invariant(num_marks > 0);
493         } else {
494             // redundant with invariant below, but nice to have explicitly
495             paranoid_invariant(num_marks == 0);
496         }
497         if (node.get_marked()) {
498             paranoid_invariant(allow_marks);
499             ++num_marks;
500         }
501         return num_marks;
502     }
503 
504     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
verify_marks_consistent(void) const505     void omt<omtdata_t, omtdataout_t, supports_marks>::verify_marks_consistent(
506         void) const {
507         static_assert(supports_marks, "does not support marks");
508         paranoid_invariant(!this->is_array);
509         this->verify_marks_consistent_internal(this->d.t.root, true);
510     }
511 
512     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
513     template <typename iterate_extra_t,
514               int (*f)(omtdata_t *, const uint32_t, iterate_extra_t *const)>
iterate_ptr(iterate_extra_t * const iterate_extra)515     void omt<omtdata_t, omtdataout_t, supports_marks>::iterate_ptr(
516         iterate_extra_t *const iterate_extra) {
517         if (this->is_array) {
518             this->iterate_ptr_internal_array<iterate_extra_t, f>(
519                 0, this->size(), iterate_extra);
520         } else {
521             this->iterate_ptr_internal<iterate_extra_t, f>(
522                 0, this->size(), this->d.t.root, 0, iterate_extra);
523         }
524     }
525 
526     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
fetch(const uint32_t idx,omtdataout_t * const value) const527     int omt<omtdata_t, omtdataout_t, supports_marks>::fetch(
528         const uint32_t idx,
529         omtdataout_t *const value) const {
530         if (idx >= this->size()) {
531             return EINVAL;
532         }
533         if (this->is_array) {
534             this->fetch_internal_array(idx, value);
535         } else {
536             this->fetch_internal(this->d.t.root, idx, value);
537         }
538         return 0;
539     }
540 
541     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
542     template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_zero(const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp) const543     int omt<omtdata_t, omtdataout_t, supports_marks>::find_zero(
544         const omtcmp_t &extra,
545         omtdataout_t *const value,
546         uint32_t *const idxp) const {
547         uint32_t tmp_index;
548         uint32_t *const child_idxp = (idxp != nullptr) ? idxp : &tmp_index;
549         int r;
550         if (this->is_array) {
551             r = this->find_internal_zero_array<omtcmp_t, h>(
552                 extra, value, child_idxp);
553         } else {
554             r = this->find_internal_zero<omtcmp_t, h>(
555                 this->d.t.root, extra, value, child_idxp);
556         }
557         return r;
558     }
559 
560     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
561     template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find(const omtcmp_t & extra,int direction,omtdataout_t * const value,uint32_t * const idxp) const562     int omt<omtdata_t, omtdataout_t, supports_marks>::find(
563         const omtcmp_t &extra,
564         int direction,
565         omtdataout_t *const value,
566         uint32_t *const idxp) const {
567         uint32_t tmp_index;
568         uint32_t *const child_idxp = (idxp != nullptr) ? idxp : &tmp_index;
569         paranoid_invariant(direction != 0);
570         if (direction < 0) {
571             if (this->is_array) {
572                 return this->find_internal_minus_array<omtcmp_t, h>(
573                     extra, value, child_idxp);
574             } else {
575                 return this->find_internal_minus<omtcmp_t, h>(
576                     this->d.t.root, extra, value, child_idxp);
577             }
578         } else {
579             if (this->is_array) {
580                 return this->find_internal_plus_array<omtcmp_t, h>(
581                     extra, value, child_idxp);
582             } else {
583                 return this->find_internal_plus<omtcmp_t, h>(
584                     this->d.t.root, extra, value, child_idxp);
585             }
586         }
587     }
588 
589     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
memory_size(void)590     size_t omt<omtdata_t, omtdataout_t, supports_marks>::memory_size(void) {
591         if (this->is_array) {
592             return (sizeof *this) +
593                    this->capacity * (sizeof this->d.a.values[0]);
594         }
595         return (sizeof *this) + this->capacity * (sizeof this->d.t.nodes[0]);
596     }
597 
598     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create_internal_no_array(const uint32_t new_capacity)599     void omt<omtdata_t, omtdataout_t, supports_marks>::create_internal_no_array(
600         const uint32_t new_capacity) {
601         this->is_array = true;
602         this->d.a.start_idx = 0;
603         this->d.a.num_values = 0;
604         this->d.a.values = nullptr;
605         this->capacity = new_capacity;
606     }
607 
608     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
create_internal(const uint32_t new_capacity)609     void omt<omtdata_t, omtdataout_t, supports_marks>::create_internal(
610         const uint32_t new_capacity) {
611         this->create_internal_no_array(new_capacity);
612         XMALLOC_N(this->capacity, this->d.a.values);
613     }
614 
615     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
nweight(const subtree & st) const616     uint32_t omt<omtdata_t, omtdataout_t, supports_marks>::nweight(
617         const subtree &st) const {
618         if (st.is_null()) {
619             return 0;
620         } else {
621             return this->d.t.nodes[st.get_index()].weight;
622         }
623     }
624 
625     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
626     typename omt<omtdata_t, omtdataout_t, supports_marks>::node_idx
node_malloc(void)627     omt<omtdata_t, omtdataout_t, supports_marks>::node_malloc(void) {
628         paranoid_invariant(this->d.t.free_idx < this->capacity);
629         omt_node &n = this->d.t.nodes[this->d.t.free_idx];
630         n.clear_stolen_bits();
631         return this->d.t.free_idx++;
632     }
633 
    // Counterpart of node_malloc.  Nothing is released here: the body only
    // checks that idx lies within capacity.  The UU() wrapper on the
    // parameter presumably silences unused-parameter warnings when the
    // check is compiled out -- TODO confirm against the macro definition.
    template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
    void omt<omtdata_t, omtdataout_t, supports_marks>::node_free(
        const node_idx UU(idx)) {
        paranoid_invariant(idx < this->capacity);
    }
639 
640     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
maybe_resize_array(const uint32_t n)641     void omt<omtdata_t, omtdataout_t, supports_marks>::maybe_resize_array(
642         const uint32_t n) {
643         const uint32_t new_size = n <= 2 ? 4 : 2 * n;
644         const uint32_t room = this->capacity - this->d.a.start_idx;
645 
646         if (room < n || this->capacity / 2 >= new_size) {
647             omtdata_t *XMALLOC_N(new_size, tmp_values);
648             memcpy(tmp_values,
649                    &this->d.a.values[this->d.a.start_idx],
650                    this->d.a.num_values * (sizeof tmp_values[0]));
651             this->d.a.start_idx = 0;
652             this->capacity = new_size;
653             toku_free(this->d.a.values);
654             this->d.a.values = tmp_values;
655         }
656     }
657 
658     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
659     void omt<omtdata_t, omtdataout_t, supports_marks>::
fill_array_with_subtree_values(omtdata_t * const array,const subtree & st) const660         fill_array_with_subtree_values(omtdata_t *const array,
661                                        const subtree &st) const {
662         if (st.is_null())
663             return;
664         const omt_node &tree = this->d.t.nodes[st.get_index()];
665         this->fill_array_with_subtree_values(&array[0], tree.left);
666         array[this->nweight(tree.left)] = tree.value;
667         this->fill_array_with_subtree_values(
668             &array[this->nweight(tree.left) + 1], tree.right);
669     }
670 
671     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
convert_to_array(void)672     void omt<omtdata_t, omtdataout_t, supports_marks>::convert_to_array(void) {
673         if (!this->is_array) {
674             const uint32_t num_values = this->size();
675             uint32_t new_size = 2 * num_values;
676             new_size = new_size < 4 ? 4 : new_size;
677 
678             omtdata_t *XMALLOC_N(new_size, tmp_values);
679             this->fill_array_with_subtree_values(tmp_values, this->d.t.root);
680             toku_free(this->d.t.nodes);
681             this->is_array = true;
682             this->capacity = new_size;
683             this->d.a.num_values = num_values;
684             this->d.a.values = tmp_values;
685             this->d.a.start_idx = 0;
686         }
687     }
688 
689     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
690     void
rebuild_from_sorted_array(subtree * const st,const omtdata_t * const values,const uint32_t numvalues)691     omt<omtdata_t, omtdataout_t, supports_marks>::rebuild_from_sorted_array(
692         subtree *const st,
693         const omtdata_t *const values,
694         const uint32_t numvalues) {
695         if (numvalues == 0) {
696             st->set_to_null();
697         } else {
698             const uint32_t halfway = numvalues / 2;
699             const node_idx newidx = this->node_malloc();
700             omt_node *const newnode = &this->d.t.nodes[newidx];
701             newnode->weight = numvalues;
702             newnode->value = values[halfway];
703             st->set_index(newidx);
704             // update everything before the recursive calls so the second call
705             // can be a tail call.
706             this->rebuild_from_sorted_array(
707                 &newnode->left, &values[0], halfway);
708             this->rebuild_from_sorted_array(&newnode->right,
709                                             &values[halfway + 1],
710                                             numvalues - (halfway + 1));
711         }
712     }
713 
714     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
convert_to_tree(void)715     void omt<omtdata_t, omtdataout_t, supports_marks>::convert_to_tree(void) {
716         if (this->is_array) {
717             const uint32_t num_nodes = this->size();
718             uint32_t new_size = num_nodes * 2;
719             new_size = new_size < 4 ? 4 : new_size;
720 
721             omt_node *XMALLOC_N(new_size, new_nodes);
722             omtdata_t *const values = this->d.a.values;
723             omtdata_t *const tmp_values = &values[this->d.a.start_idx];
724             this->is_array = false;
725             this->d.t.nodes = new_nodes;
726             this->capacity = new_size;
727             this->d.t.free_idx = 0;
728             this->d.t.root.set_to_null();
729             this->rebuild_from_sorted_array(
730                 &this->d.t.root, tmp_values, num_nodes);
731             toku_free(values);
732         }
733     }
734 
735     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
maybe_resize_or_convert(const uint32_t n)736     void omt<omtdata_t, omtdataout_t, supports_marks>::maybe_resize_or_convert(
737         const uint32_t n) {
738         if (this->is_array) {
739             this->maybe_resize_array(n);
740         } else {
741             const uint32_t new_size = n <= 2 ? 4 : 2 * n;
742             const uint32_t num_nodes = this->nweight(this->d.t.root);
743             if ((this->capacity / 2 >= new_size) ||
744                 (this->d.t.free_idx >= this->capacity && num_nodes < n) ||
745                 (this->capacity < n)) {
746                 this->convert_to_array();
747                 // if we had a free list, the "supports_marks" version could
748                 // just resize, as it is now, we have to convert to and back
749                 // from an array.
750                 if (supports_marks) {
751                     this->convert_to_tree();
752                 }
753             }
754         }
755     }
756 
757     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
will_need_rebalance(const subtree & st,const int leftmod,const int rightmod) const758     bool omt<omtdata_t, omtdataout_t, supports_marks>::will_need_rebalance(
759         const subtree &st,
760         const int leftmod,
761         const int rightmod) const {
762         if (st.is_null()) {
763             return false;
764         }
765         const omt_node &n = this->d.t.nodes[st.get_index()];
766         // one of the 1's is for the root.
767         // the other is to take ceil(n/2)
768         const uint32_t weight_left = this->nweight(n.left) + leftmod;
769         const uint32_t weight_right = this->nweight(n.right) + rightmod;
770         return ((1 + weight_left < (1 + 1 + weight_right) / 2) ||
771                 (1 + weight_right < (1 + 1 + weight_left) / 2));
772     }
773 
774     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
insert_internal(subtree * const subtreep,const omtdata_t & value,const uint32_t idx,subtree ** const rebalance_subtree)775     void omt<omtdata_t, omtdataout_t, supports_marks>::insert_internal(
776         subtree *const subtreep,
777         const omtdata_t &value,
778         const uint32_t idx,
779         subtree **const rebalance_subtree) {
780         if (subtreep->is_null()) {
781             paranoid_invariant_zero(idx);
782             const node_idx newidx = this->node_malloc();
783             omt_node *const newnode = &this->d.t.nodes[newidx];
784             newnode->weight = 1;
785             newnode->left.set_to_null();
786             newnode->right.set_to_null();
787             newnode->value = value;
788             subtreep->set_index(newidx);
789         } else {
790             omt_node &n = this->d.t.nodes[subtreep->get_index()];
791             n.weight++;
792             if (idx <= this->nweight(n.left)) {
793                 if (*rebalance_subtree == nullptr &&
794                     this->will_need_rebalance(*subtreep, 1, 0)) {
795                     *rebalance_subtree = subtreep;
796                 }
797                 this->insert_internal(&n.left, value, idx, rebalance_subtree);
798             } else {
799                 if (*rebalance_subtree == nullptr &&
800                     this->will_need_rebalance(*subtreep, 0, 1)) {
801                     *rebalance_subtree = subtreep;
802                 }
803                 const uint32_t sub_index = idx - this->nweight(n.left) - 1;
804                 this->insert_internal(
805                     &n.right, value, sub_index, rebalance_subtree);
806             }
807         }
808     }
809 
810     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
set_at_internal_array(const omtdata_t & value,const uint32_t idx)811     void omt<omtdata_t, omtdataout_t, supports_marks>::set_at_internal_array(
812         const omtdata_t &value,
813         const uint32_t idx) {
814         this->d.a.values[this->d.a.start_idx + idx] = value;
815     }
816 
817     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
set_at_internal(const subtree & st,const omtdata_t & value,const uint32_t idx)818     void omt<omtdata_t, omtdataout_t, supports_marks>::set_at_internal(
819         const subtree &st,
820         const omtdata_t &value,
821         const uint32_t idx) {
822         paranoid_invariant(!st.is_null());
823         omt_node &n = this->d.t.nodes[st.get_index()];
824         const uint32_t leftweight = this->nweight(n.left);
825         if (idx < leftweight) {
826             this->set_at_internal(n.left, value, idx);
827         } else if (idx == leftweight) {
828             n.value = value;
829         } else {
830             this->set_at_internal(n.right, value, idx - leftweight - 1);
831         }
832     }
833 
834     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
delete_internal(subtree * const subtreep,const uint32_t idx,omt_node * const copyn,subtree ** const rebalance_subtree)835     void omt<omtdata_t, omtdataout_t, supports_marks>::delete_internal(
836         subtree *const subtreep,
837         const uint32_t idx,
838         omt_node *const copyn,
839         subtree **const rebalance_subtree) {
840         paranoid_invariant_notnull(subtreep);
841         paranoid_invariant_notnull(rebalance_subtree);
842         paranoid_invariant(!subtreep->is_null());
843         omt_node &n = this->d.t.nodes[subtreep->get_index()];
844         const uint32_t leftweight = this->nweight(n.left);
845         if (idx < leftweight) {
846             n.weight--;
847             if (*rebalance_subtree == nullptr &&
848                 this->will_need_rebalance(*subtreep, -1, 0)) {
849                 *rebalance_subtree = subtreep;
850             }
851             this->delete_internal(&n.left, idx, copyn, rebalance_subtree);
852         } else if (idx == leftweight) {
853             if (n.left.is_null()) {
854                 const uint32_t oldidx = subtreep->get_index();
855                 *subtreep = n.right;
856                 if (copyn != nullptr) {
857                     copyn->value = n.value;
858                 }
859                 this->node_free(oldidx);
860             } else if (n.right.is_null()) {
861                 const uint32_t oldidx = subtreep->get_index();
862                 *subtreep = n.left;
863                 if (copyn != nullptr) {
864                     copyn->value = n.value;
865                 }
866                 this->node_free(oldidx);
867             } else {
868                 if (*rebalance_subtree == nullptr &&
869                     this->will_need_rebalance(*subtreep, 0, -1)) {
870                     *rebalance_subtree = subtreep;
871                 }
872                 // don't need to copy up value, it's only used by this
873                 // next call, and when that gets to the bottom there
874                 // won't be any more recursion
875                 n.weight--;
876                 this->delete_internal(&n.right, 0, &n, rebalance_subtree);
877             }
878         } else {
879             n.weight--;
880             if (*rebalance_subtree == nullptr &&
881                 this->will_need_rebalance(*subtreep, 0, -1)) {
882                 *rebalance_subtree = subtreep;
883             }
884             this->delete_internal(
885                 &n.right, idx - leftweight - 1, copyn, rebalance_subtree);
886         }
887     }
888 
889     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
890     template <
891         typename iterate_extra_t,
892         int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_internal_array(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra) const893     int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_internal_array(
894         const uint32_t left,
895         const uint32_t right,
896         iterate_extra_t *const iterate_extra) const {
897         int r;
898         for (uint32_t i = left; i < right; ++i) {
899             r = f(this->d.a.values[this->d.a.start_idx + i], i, iterate_extra);
900             if (r != 0) {
901                 return r;
902             }
903         }
904         return 0;
905     }
906 
907     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
908     template <typename iterate_extra_t,
909               int (*f)(omtdata_t *, const uint32_t, iterate_extra_t *const)>
iterate_ptr_internal(const uint32_t left,const uint32_t right,const subtree & st,const uint32_t idx,iterate_extra_t * const iterate_extra)910     void omt<omtdata_t, omtdataout_t, supports_marks>::iterate_ptr_internal(
911         const uint32_t left,
912         const uint32_t right,
913         const subtree &st,
914         const uint32_t idx,
915         iterate_extra_t *const iterate_extra) {
916         if (!st.is_null()) {
917             omt_node &n = this->d.t.nodes[st.get_index()];
918             const uint32_t idx_root = idx + this->nweight(n.left);
919             if (left < idx_root) {
920                 this->iterate_ptr_internal<iterate_extra_t, f>(
921                     left, right, n.left, idx, iterate_extra);
922             }
923             if (left <= idx_root && idx_root < right) {
924                 int r = f(&n.value, idx_root, iterate_extra);
925                 lazy_assert_zero(r);
926             }
927             if (idx_root + 1 < right) {
928                 this->iterate_ptr_internal<iterate_extra_t, f>(
929                     left, right, n.right, idx_root + 1, iterate_extra);
930             }
931         }
932     }
933 
934     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
935     template <typename iterate_extra_t,
936               int (*f)(omtdata_t *, const uint32_t, iterate_extra_t *const)>
937     void
iterate_ptr_internal_array(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra)938     omt<omtdata_t, omtdataout_t, supports_marks>::iterate_ptr_internal_array(
939         const uint32_t left,
940         const uint32_t right,
941         iterate_extra_t *const iterate_extra) {
942         for (uint32_t i = left; i < right; ++i) {
943             int r =
944                 f(&this->d.a.values[this->d.a.start_idx + i], i, iterate_extra);
945             lazy_assert_zero(r);
946         }
947     }
948 
949     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
950     template <
951         typename iterate_extra_t,
952         int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_internal(const uint32_t left,const uint32_t right,const subtree & st,const uint32_t idx,iterate_extra_t * const iterate_extra) const953     int omt<omtdata_t, omtdataout_t, supports_marks>::iterate_internal(
954         const uint32_t left,
955         const uint32_t right,
956         const subtree &st,
957         const uint32_t idx,
958         iterate_extra_t *const iterate_extra) const {
959         if (st.is_null()) {
960             return 0;
961         }
962         int r;
963         const omt_node &n = this->d.t.nodes[st.get_index()];
964         const uint32_t idx_root = idx + this->nweight(n.left);
965         if (left < idx_root) {
966             r = this->iterate_internal<iterate_extra_t, f>(
967                 left, right, n.left, idx, iterate_extra);
968             if (r != 0) {
969                 return r;
970             }
971         }
972         if (left <= idx_root && idx_root < right) {
973             r = f(n.value, idx_root, iterate_extra);
974             if (r != 0) {
975                 return r;
976             }
977         }
978         if (idx_root + 1 < right) {
979             return this->iterate_internal<iterate_extra_t, f>(
980                 left, right, n.right, idx_root + 1, iterate_extra);
981         }
982         return 0;
983     }
984 
985     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
986     template <
987         typename iterate_extra_t,
988         int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
989     int omt<omtdata_t, omtdataout_t, supports_marks>::
iterate_and_mark_range_internal(const uint32_t left,const uint32_t right,const subtree & st,const uint32_t idx,iterate_extra_t * const iterate_extra)990         iterate_and_mark_range_internal(const uint32_t left,
991                                         const uint32_t right,
992                                         const subtree &st,
993                                         const uint32_t idx,
994                                         iterate_extra_t *const iterate_extra) {
995         paranoid_invariant(!st.is_null());
996         int r;
997         omt_node &n = this->d.t.nodes[st.get_index()];
998         const uint32_t idx_root = idx + this->nweight(n.left);
999         if (left < idx_root && !n.left.is_null()) {
1000             n.set_marks_below_bit();
1001             r = this->iterate_and_mark_range_internal<iterate_extra_t, f>(
1002                 left, right, n.left, idx, iterate_extra);
1003             if (r != 0) {
1004                 return r;
1005             }
1006         }
1007         if (left <= idx_root && idx_root < right) {
1008             n.set_marked_bit();
1009             r = f(n.value, idx_root, iterate_extra);
1010             if (r != 0) {
1011                 return r;
1012             }
1013         }
1014         if (idx_root + 1 < right && !n.right.is_null()) {
1015             n.set_marks_below_bit();
1016             return this->iterate_and_mark_range_internal<iterate_extra_t, f>(
1017                 left, right, n.right, idx_root + 1, iterate_extra);
1018         }
1019         return 0;
1020     }
1021 
1022     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1023     template <
1024         typename iterate_extra_t,
1025         int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
1026     int
iterate_over_marked_internal(const subtree & st,const uint32_t idx,iterate_extra_t * const iterate_extra) const1027     omt<omtdata_t, omtdataout_t, supports_marks>::iterate_over_marked_internal(
1028         const subtree &st,
1029         const uint32_t idx,
1030         iterate_extra_t *const iterate_extra) const {
1031         if (st.is_null()) {
1032             return 0;
1033         }
1034         int r;
1035         const omt_node &n = this->d.t.nodes[st.get_index()];
1036         const uint32_t idx_root = idx + this->nweight(n.left);
1037         if (n.get_marks_below()) {
1038             r = this->iterate_over_marked_internal<iterate_extra_t, f>(
1039                 n.left, idx, iterate_extra);
1040             if (r != 0) {
1041                 return r;
1042             }
1043         }
1044         if (n.get_marked()) {
1045             r = f(n.value, idx_root, iterate_extra);
1046             if (r != 0) {
1047                 return r;
1048             }
1049         }
1050         if (n.get_marks_below()) {
1051             return this->iterate_over_marked_internal<iterate_extra_t, f>(
1052                 n.right, idx_root + 1, iterate_extra);
1053         }
1054         return 0;
1055     }
1056 
1057     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
fetch_internal_array(const uint32_t i,omtdataout_t * const value) const1058     void omt<omtdata_t, omtdataout_t, supports_marks>::fetch_internal_array(
1059         const uint32_t i,
1060         omtdataout_t *const value) const {
1061         if (value != nullptr) {
1062             copyout(value, &this->d.a.values[this->d.a.start_idx + i]);
1063         }
1064     }
1065 
1066     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
fetch_internal(const subtree & st,const uint32_t i,omtdataout_t * const value) const1067     void omt<omtdata_t, omtdataout_t, supports_marks>::fetch_internal(
1068         const subtree &st,
1069         const uint32_t i,
1070         omtdataout_t *const value) const {
1071         omt_node &n = this->d.t.nodes[st.get_index()];
1072         const uint32_t leftweight = this->nweight(n.left);
1073         if (i < leftweight) {
1074             this->fetch_internal(n.left, i, value);
1075         } else if (i == leftweight) {
1076             if (value != nullptr) {
1077                 copyout(value, &n);
1078             }
1079         } else {
1080             this->fetch_internal(n.right, i - leftweight - 1, value);
1081         }
1082     }
1083 
1084     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1085     void
fill_array_with_subtree_idxs(node_idx * const array,const subtree & st) const1086     omt<omtdata_t, omtdataout_t, supports_marks>::fill_array_with_subtree_idxs(
1087         node_idx *const array,
1088         const subtree &st) const {
1089         if (!st.is_null()) {
1090             const omt_node &tree = this->d.t.nodes[st.get_index()];
1091             this->fill_array_with_subtree_idxs(&array[0], tree.left);
1092             array[this->nweight(tree.left)] = st.get_index();
1093             this->fill_array_with_subtree_idxs(
1094                 &array[this->nweight(tree.left) + 1], tree.right);
1095         }
1096     }
1097 
1098     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1099     void
rebuild_subtree_from_idxs(subtree * const st,const node_idx * const idxs,const uint32_t numvalues)1100     omt<omtdata_t, omtdataout_t, supports_marks>::rebuild_subtree_from_idxs(
1101         subtree *const st,
1102         const node_idx *const idxs,
1103         const uint32_t numvalues) {
1104         if (numvalues == 0) {
1105             st->set_to_null();
1106         } else {
1107             uint32_t halfway = numvalues / 2;
1108             st->set_index(idxs[halfway]);
1109             // node_idx newidx = idxs[halfway];
1110             omt_node &newnode = this->d.t.nodes[st->get_index()];
1111             newnode.weight = numvalues;
1112             // value is already in there.
1113             this->rebuild_subtree_from_idxs(&newnode.left, &idxs[0], halfway);
1114             this->rebuild_subtree_from_idxs(
1115                 &newnode.right, &idxs[halfway + 1], numvalues - (halfway + 1));
1116             // n_idx = newidx;
1117         }
1118     }
1119 
1120     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
rebalance(subtree * const st)1121     void omt<omtdata_t, omtdataout_t, supports_marks>::rebalance(
1122         subtree *const st) {
1123         node_idx idx = st->get_index();
1124         if (idx == this->d.t.root.get_index()) {
1125             // Try to convert to an array.
1126             // If this fails, (malloc) nothing will have changed.
1127             // In the failure case we continue on to the standard rebalance
1128             // algorithm.
1129             this->convert_to_array();
1130             if (supports_marks) {
1131                 this->convert_to_tree();
1132             }
1133         } else {
1134             const omt_node &n = this->d.t.nodes[idx];
1135             node_idx *tmp_array;
1136             size_t mem_needed = n.weight * (sizeof tmp_array[0]);
1137             size_t mem_free = (this->capacity - this->d.t.free_idx) *
1138                               (sizeof this->d.t.nodes[0]);
1139             bool malloced;
1140             if (mem_needed <= mem_free) {
1141                 // There is sufficient free space at the end of the nodes array
1142                 // to hold enough node indexes to rebalance.
1143                 malloced = false;
1144                 tmp_array = reinterpret_cast<node_idx *>(
1145                     &this->d.t.nodes[this->d.t.free_idx]);
1146             } else {
1147                 malloced = true;
1148                 XMALLOC_N(n.weight, tmp_array);
1149             }
1150             this->fill_array_with_subtree_idxs(tmp_array, *st);
1151             this->rebuild_subtree_from_idxs(st, tmp_array, n.weight);
1152             if (malloced)
1153                 toku_free(tmp_array);
1154         }
1155     }
1156 
1157     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
copyout(omtdata_t * const out,const omt_node * const n)1158     void omt<omtdata_t, omtdataout_t, supports_marks>::copyout(
1159         omtdata_t *const out,
1160         const omt_node *const n) {
1161         *out = n->value;
1162     }
1163 
1164     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
copyout(omtdata_t ** const out,omt_node * const n)1165     void omt<omtdata_t, omtdataout_t, supports_marks>::copyout(
1166         omtdata_t **const out,
1167         omt_node *const n) {
1168         *out = &n->value;
1169     }
1170 
1171     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
copyout(omtdata_t * const out,const omtdata_t * const stored_value_ptr)1172     void omt<omtdata_t, omtdataout_t, supports_marks>::copyout(
1173         omtdata_t *const out,
1174         const omtdata_t *const stored_value_ptr) {
1175         *out = *stored_value_ptr;
1176     }
1177 
1178     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
copyout(omtdata_t ** const out,omtdata_t * const stored_value_ptr)1179     void omt<omtdata_t, omtdataout_t, supports_marks>::copyout(
1180         omtdata_t **const out,
1181         omtdata_t *const stored_value_ptr) {
1182         *out = stored_value_ptr;
1183     }
1184 
1185     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1186     template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_zero_array(const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp) const1187     int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_zero_array(
1188         const omtcmp_t &extra,
1189         omtdataout_t *const value,
1190         uint32_t *const idxp) const {
1191         paranoid_invariant_notnull(idxp);
1192         uint32_t min = this->d.a.start_idx;
1193         uint32_t limit = this->d.a.start_idx + this->d.a.num_values;
1194         uint32_t best_pos = subtree::NODE_NULL;
1195         uint32_t best_zero = subtree::NODE_NULL;
1196 
1197         while (min != limit) {
1198             uint32_t mid = (min + limit) / 2;
1199             int hv = h(this->d.a.values[mid], extra);
1200             if (hv < 0) {
1201                 min = mid + 1;
1202             } else if (hv > 0) {
1203                 best_pos = mid;
1204                 limit = mid;
1205             } else {
1206                 best_zero = mid;
1207                 limit = mid;
1208             }
1209         }
1210         if (best_zero != subtree::NODE_NULL) {
1211             // Found a zero
1212             if (value != nullptr) {
1213                 copyout(value, &this->d.a.values[best_zero]);
1214             }
1215             *idxp = best_zero - this->d.a.start_idx;
1216             return 0;
1217         }
1218         if (best_pos != subtree::NODE_NULL)
1219             *idxp = best_pos - this->d.a.start_idx;
1220         else
1221             *idxp = this->d.a.num_values;
1222         return DB_NOTFOUND;
1223     }
1224 
1225     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1226     template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_zero(const subtree & st,const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp) const1227     int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_zero(
1228         const subtree &st,
1229         const omtcmp_t &extra,
1230         omtdataout_t *const value,
1231         uint32_t *const idxp) const {
1232         paranoid_invariant_notnull(idxp);
1233         if (st.is_null()) {
1234             *idxp = 0;
1235             return DB_NOTFOUND;
1236         }
1237         omt_node &n = this->d.t.nodes[st.get_index()];
1238         int hv = h(n.value, extra);
1239         if (hv < 0) {
1240             int r = this->find_internal_zero<omtcmp_t, h>(
1241                 n.right, extra, value, idxp);
1242             *idxp += this->nweight(n.left) + 1;
1243             return r;
1244         } else if (hv > 0) {
1245             return this->find_internal_zero<omtcmp_t, h>(
1246                 n.left, extra, value, idxp);
1247         } else {
1248             int r = this->find_internal_zero<omtcmp_t, h>(
1249                 n.left, extra, value, idxp);
1250             if (r == DB_NOTFOUND) {
1251                 *idxp = this->nweight(n.left);
1252                 if (value != nullptr) {
1253                     copyout(value, &n);
1254                 }
1255                 r = 0;
1256             }
1257             return r;
1258         }
1259     }
1260 
1261     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1262     template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_plus_array(const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp) const1263     int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_plus_array(
1264         const omtcmp_t &extra,
1265         omtdataout_t *const value,
1266         uint32_t *const idxp) const {
1267         paranoid_invariant_notnull(idxp);
1268         uint32_t min = this->d.a.start_idx;
1269         uint32_t limit = this->d.a.start_idx + this->d.a.num_values;
1270         uint32_t best = subtree::NODE_NULL;
1271 
1272         while (min != limit) {
1273             const uint32_t mid = (min + limit) / 2;
1274             const int hv = h(this->d.a.values[mid], extra);
1275             if (hv > 0) {
1276                 best = mid;
1277                 limit = mid;
1278             } else {
1279                 min = mid + 1;
1280             }
1281         }
1282         if (best == subtree::NODE_NULL) {
1283             return DB_NOTFOUND;
1284         }
1285         if (value != nullptr) {
1286             copyout(value, &this->d.a.values[best]);
1287         }
1288         *idxp = best - this->d.a.start_idx;
1289         return 0;
1290     }
1291 
1292     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1293     template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_plus(const subtree & st,const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp) const1294     int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_plus(
1295         const subtree &st,
1296         const omtcmp_t &extra,
1297         omtdataout_t *const value,
1298         uint32_t *const idxp) const {
1299         paranoid_invariant_notnull(idxp);
1300         if (st.is_null()) {
1301             return DB_NOTFOUND;
1302         }
1303         omt_node *const n = &this->d.t.nodes[st.get_index()];
1304         int hv = h(n->value, extra);
1305         int r;
1306         if (hv > 0) {
1307             r = this->find_internal_plus<omtcmp_t, h>(
1308                 n->left, extra, value, idxp);
1309             if (r == DB_NOTFOUND) {
1310                 *idxp = this->nweight(n->left);
1311                 if (value != nullptr) {
1312                     copyout(value, n);
1313                 }
1314                 r = 0;
1315             }
1316         } else {
1317             r = this->find_internal_plus<omtcmp_t, h>(
1318                 n->right, extra, value, idxp);
1319             if (r == 0) {
1320                 *idxp += this->nweight(n->left) + 1;
1321             }
1322         }
1323         return r;
1324     }
1325 
1326     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1327     template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_minus_array(const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp) const1328     int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_minus_array(
1329         const omtcmp_t &extra,
1330         omtdataout_t *const value,
1331         uint32_t *const idxp) const {
1332         paranoid_invariant_notnull(idxp);
1333         uint32_t min = this->d.a.start_idx;
1334         uint32_t limit = this->d.a.start_idx + this->d.a.num_values;
1335         uint32_t best = subtree::NODE_NULL;
1336 
1337         while (min != limit) {
1338             const uint32_t mid = (min + limit) / 2;
1339             const int hv = h(this->d.a.values[mid], extra);
1340             if (hv < 0) {
1341                 best = mid;
1342                 min = mid + 1;
1343             } else {
1344                 limit = mid;
1345             }
1346         }
1347         if (best == subtree::NODE_NULL) {
1348             return DB_NOTFOUND;
1349         }
1350         if (value != nullptr) {
1351             copyout(value, &this->d.a.values[best]);
1352         }
1353         *idxp = best - this->d.a.start_idx;
1354         return 0;
1355     }
1356 
1357     template <typename omtdata_t, typename omtdataout_t, bool supports_marks>
1358     template <typename omtcmp_t, int (*h)(const omtdata_t &, const omtcmp_t &)>
find_internal_minus(const subtree & st,const omtcmp_t & extra,omtdataout_t * const value,uint32_t * const idxp) const1359     int omt<omtdata_t, omtdataout_t, supports_marks>::find_internal_minus(
1360         const subtree &st,
1361         const omtcmp_t &extra,
1362         omtdataout_t *const value,
1363         uint32_t *const idxp) const {
1364         paranoid_invariant_notnull(idxp);
1365         if (st.is_null()) {
1366             return DB_NOTFOUND;
1367         }
1368         omt_node *const n = &this->d.t.nodes[st.get_index()];
1369         int hv = h(n->value, extra);
1370         if (hv < 0) {
1371             int r = this->find_internal_minus<omtcmp_t, h>(
1372                 n->right, extra, value, idxp);
1373             if (r == 0) {
1374                 *idxp += this->nweight(n->left) + 1;
1375             } else if (r == DB_NOTFOUND) {
1376                 *idxp = this->nweight(n->left);
1377                 if (value != nullptr) {
1378                     copyout(value, n);
1379                 }
1380                 r = 0;
1381             }
1382             return r;
1383         } else {
1384             return this->find_internal_minus<omtcmp_t, h>(
1385                 n->left, extra, value, idxp);
1386         }
1387     }
1388 }  // namespace toku
1389