1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <string.h>
40 #include <db.h>
41 
42 #include <portability/memory.h>
43 #include <limits.h>
44 
45 namespace toku {
46 
47 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
create(void)48 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::create(void) {
49     toku_mempool_zero(&this->mp);
50     this->values_same_size = true;
51     this->value_length = 0;
52     this->is_array = true;
53     this->d.a.num_values = 0;
54     //TODO: maybe allocate enough space for something by default?
55     //      We may be relying on not needing to allocate space the first time (due to limited time spent while a lock is held)
56 }
57 
58 /**
59  * Note: create_from_sorted_memory_of_fixed_size_elements does not take ownership of 'mem'.
60  * Owner is still responsible for freeing it.
61  * While in the OMT a similar function would steal ownership, this doesn't make sense for the DMT because
62  * we (usually) have to add padding for alignment (mem has all of the elements PACKED).
63  * Also all current uses (as of Jan 12, 2014) of this function would require mallocing a new array
64  * in order to allow stealing.
65  */
66 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
create_from_sorted_memory_of_fixed_size_elements(const void * mem,const uint32_t numvalues,const uint32_t mem_length,const uint32_t fixed_value_length)67 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::create_from_sorted_memory_of_fixed_size_elements(
68         const void *mem,
69         const uint32_t numvalues,
70         const uint32_t mem_length,
71         const uint32_t fixed_value_length) {
72     this->values_same_size = true;
73     this->value_length = fixed_value_length;
74     this->is_array = true;
75     this->d.a.num_values = numvalues;
76     const uint8_t pad_bytes = get_fixed_length_alignment_overhead();
77     uint32_t aligned_memsize = mem_length + numvalues * pad_bytes;
78     toku_mempool_construct(&this->mp, aligned_memsize);
79     if (aligned_memsize > 0) {
80         paranoid_invariant(numvalues > 0);
81         void *ptr = toku_mempool_malloc(&this->mp, aligned_memsize);
82         paranoid_invariant_notnull(ptr);
83         uint8_t * const dest = static_cast<uint8_t *>(ptr);
84         const uint8_t * const src = static_cast<const uint8_t *>(mem);
85         if (pad_bytes == 0) {
86             paranoid_invariant(aligned_memsize == mem_length);
87             memcpy(dest, src, aligned_memsize);
88         } else {
89             // TODO(leif): check what vectorizes best: multiplying like this or adding to offsets
90             const uint32_t fixed_len = this->value_length;
91             const uint32_t fixed_aligned_len = align(this->value_length);
92             paranoid_invariant(this->d.a.num_values*fixed_len == mem_length);
93             for (uint32_t i = 0; i < this->d.a.num_values; i++) {
94                 memcpy(&dest[i*fixed_aligned_len], &src[i*fixed_len], fixed_len);
95             }
96         }
97     }
98 }
99 
100 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
clone(const dmt & src)101 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::clone(const dmt &src) {
102     *this = src;
103     toku_mempool_clone(&src.mp, &this->mp);
104 }
105 
106 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
clear(void)107 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::clear(void) {
108     this->is_array = true;
109     this->d.a.num_values = 0;
110     this->values_same_size = true;  // Reset state
111     this->value_length = 0;
112     //TODO(leif): Note that this can mess with our memory_footprint calculation (we may touch past what is marked as 'used' in the mempool)
113     //            One 'fix' is for mempool to also track what was touched, and reset() shouldn't reset that, though realloc() might.
114     toku_mempool_reset(&this->mp);
115 }
116 
117 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
destroy(void)118 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::destroy(void) {
119     this->clear();
120     toku_mempool_destroy(&this->mp);
121 }
122 
123 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
size(void) const124 uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::size(void) const {
125     if (this->is_array) {
126         return this->d.a.num_values;
127     } else {
128         return this->nweight(this->d.t.root);
129     }
130 }
131 
132 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
nweight(const subtree & subtree) const133 uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::nweight(const subtree &subtree) const {
134     if (subtree.is_null()) {
135         return 0;
136     } else {
137         const dmt_node & node = get_node(subtree);
138         return node.weight;
139     }
140 }
141 
142 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
143 template<typename dmtcmp_t, int (*h)(const uint32_t size, const dmtdata_t &, const dmtcmp_t &)>
insert(const dmtwriter_t & value,const dmtcmp_t & v,uint32_t * const idx)144 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert(const dmtwriter_t &value, const dmtcmp_t &v, uint32_t *const idx) {
145     int r;
146     uint32_t insert_idx;
147 
148     r = this->find_zero<dmtcmp_t, h>(v, nullptr, nullptr, &insert_idx);
149     if (r==0) {
150         if (idx) *idx = insert_idx;
151         return DB_KEYEXIST;
152     }
153     if (r != DB_NOTFOUND) return r;
154 
155     if ((r = this->insert_at(value, insert_idx))) return r;
156     if (idx) *idx = insert_idx;
157 
158     return 0;
159 }
160 
161 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
insert_at(const dmtwriter_t & value,const uint32_t idx)162 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert_at(const dmtwriter_t &value, const uint32_t idx) {
163     if (idx > this->size()) { return EINVAL; }
164 
165     bool same_size = this->values_same_size && (this->size() == 0 || value.get_size() == this->value_length);
166     if (this->is_array) {
167         if (same_size && idx == this->d.a.num_values) {
168             return this->insert_at_array_end<true>(value);
169         }
170         this->convert_from_array_to_tree();
171     }
172     // Is a tree.
173     paranoid_invariant(!is_array);
174     if (!same_size) {
175         this->values_same_size = false;
176         this->value_length = 0;
177     }
178 
179     this->maybe_resize_tree(&value);
180     subtree *rebalance_subtree = nullptr;
181     this->insert_internal(&this->d.t.root, value, idx, &rebalance_subtree);
182     if (rebalance_subtree != nullptr) {
183         this->rebalance(rebalance_subtree);
184     }
185     return 0;
186 }
187 
188 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
189 template<bool with_resize>
insert_at_array_end(const dmtwriter_t & value_in)190 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert_at_array_end(const dmtwriter_t& value_in) {
191     paranoid_invariant(this->is_array);
192     paranoid_invariant(this->values_same_size);
193     if (this->d.a.num_values == 0) {
194         this->value_length = value_in.get_size();
195     }
196     paranoid_invariant(this->value_length == value_in.get_size());
197 
198     if (with_resize) {
199         this->maybe_resize_array_for_insert();
200     }
201     dmtdata_t *dest = this->alloc_array_value_end();
202     value_in.write_to(dest);
203     return 0;
204 }
205 
206 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
207 dmtdata_t * dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::alloc_array_value_end(void) {
208     paranoid_invariant(this->is_array);
209     paranoid_invariant(this->values_same_size);
210     this->d.a.num_values++;
211 
212     void *ptr = toku_mempool_malloc(&this->mp, align(this->value_length));
213     paranoid_invariant_notnull(ptr);
214     paranoid_invariant(reinterpret_cast<size_t>(ptr) % ALIGNMENT == 0);
215     dmtdata_t *CAST_FROM_VOIDP(n, ptr);
216     paranoid_invariant(n == get_array_value(this->d.a.num_values - 1));
217     return n;
218 }
219 
220 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
221 dmtdata_t * dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_array_value(const uint32_t idx) const {
222     paranoid_invariant(this->is_array);
223     paranoid_invariant(this->values_same_size);
224 
225     paranoid_invariant(idx < this->d.a.num_values);
226     return get_array_value_internal(&this->mp, idx);
227 }
228 
229 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
230 dmtdata_t * dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_array_value_internal(const struct mempool *mempool, const uint32_t idx) const {
231     void* ptr = toku_mempool_get_pointer_from_base_and_offset(mempool, idx * align(this->value_length));
232     dmtdata_t *CAST_FROM_VOIDP(value, ptr);
233     return value;
234 }
235 
236 //TODO(leif) write microbenchmarks to compare growth factor.  Note:  growth factor here is actually 2.5 because of mempool_construct
237 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
maybe_resize_array_for_insert(void)238 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::maybe_resize_array_for_insert(void) {
239     bool space_available = toku_mempool_get_free_size(&this->mp) >= align(this->value_length);
240 
241     if (!space_available) {
242         const uint32_t n = this->d.a.num_values + 1;
243         const uint32_t new_n = n <=2 ? 4 : 2*n;
244         const uint32_t new_space = align(this->value_length) * new_n;
245 
246         struct mempool new_kvspace;
247         toku_mempool_construct(&new_kvspace, new_space);
248         size_t copy_bytes = this->d.a.num_values * align(this->value_length);
249         invariant(copy_bytes + align(this->value_length) <= new_space);
250         paranoid_invariant(copy_bytes <= toku_mempool_get_used_size(&this->mp));
251         // Copy over to new mempool
252         if (this->d.a.num_values > 0) {
253             void* dest = toku_mempool_malloc(&new_kvspace, copy_bytes);
254             invariant(dest!=nullptr);
255             memcpy(dest, get_array_value(0), copy_bytes);
256         }
257         toku_mempool_destroy(&this->mp);
258         this->mp = new_kvspace;
259     }
260 }
261 
262 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
align(const uint32_t x) const263 uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::align(const uint32_t x) const {
264     return roundup_to_multiple(ALIGNMENT, x);
265 }
266 
267 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
prepare_for_serialize(void)268 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::prepare_for_serialize(void) {
269     if (!this->is_array) {
270         this->convert_from_tree_to_array();
271     }
272 }
273 
274 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
convert_from_tree_to_array(void)275 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::convert_from_tree_to_array(void) {
276     paranoid_invariant(!this->is_array);
277     paranoid_invariant(this->values_same_size);
278 
279     const uint32_t num_values = this->size();
280 
281     node_offset *tmp_array;
282     bool malloced = false;
283     tmp_array = alloc_temp_node_offsets(num_values);
284     if (!tmp_array) {
285         malloced = true;
286         XMALLOC_N(num_values, tmp_array);
287     }
288     this->fill_array_with_subtree_offsets(tmp_array, this->d.t.root);
289 
290     struct mempool new_mp;
291     const uint32_t fixed_len = this->value_length;
292     const uint32_t fixed_aligned_len = align(this->value_length);
293     size_t mem_needed = num_values * fixed_aligned_len;
294     toku_mempool_construct(&new_mp, mem_needed);
295     uint8_t* CAST_FROM_VOIDP(dest, toku_mempool_malloc(&new_mp, mem_needed));
296     paranoid_invariant_notnull(dest);
297     for (uint32_t i = 0; i < num_values; i++) {
298         const dmt_node &n = get_node(tmp_array[i]);
299         memcpy(&dest[i*fixed_aligned_len], &n.value, fixed_len);
300     }
301     toku_mempool_destroy(&this->mp);
302     this->mp = new_mp;
303     this->is_array = true;
304     this->d.a.num_values = num_values;
305 
306     if (malloced) toku_free(tmp_array);
307 }
308 
309 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
convert_from_array_to_tree(void)310 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::convert_from_array_to_tree(void) {
311     paranoid_invariant(this->is_array);
312     paranoid_invariant(this->values_same_size);
313 
314     //save array-format information to locals
315     const uint32_t num_values = this->d.a.num_values;
316 
317     node_offset *tmp_array;
318     bool malloced = false;
319     tmp_array = alloc_temp_node_offsets(num_values);
320     if (!tmp_array) {
321         malloced = true;
322         XMALLOC_N(num_values, tmp_array);
323     }
324 
325     struct mempool old_mp = this->mp;
326     size_t mem_needed = num_values * align(this->value_length + __builtin_offsetof(dmt_node, value));
327     toku_mempool_construct(&this->mp, mem_needed);
328 
329     for (uint32_t i = 0; i < num_values; i++) {
330         dmtwriter_t writer(this->value_length, get_array_value_internal(&old_mp, i));
331         tmp_array[i] = node_malloc_and_set_value(writer);
332     }
333     this->is_array = false;
334     this->rebuild_subtree_from_offsets(&this->d.t.root, tmp_array, num_values);
335 
336     if (malloced) toku_free(tmp_array);
337     toku_mempool_destroy(&old_mp);
338 }
339 
340 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
delete_at(const uint32_t idx)341 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::delete_at(const uint32_t idx) {
342     uint32_t n = this->size();
343     if (idx >= n) { return EINVAL; }
344 
345     if (n == 1) {
346         this->clear();  //Emptying out the entire dmt.
347         return 0;
348     }
349     if (this->is_array) {
350         this->convert_from_array_to_tree();
351     }
352     paranoid_invariant(!is_array);
353 
354     subtree *rebalance_subtree = nullptr;
355     this->delete_internal(&this->d.t.root, idx, nullptr, &rebalance_subtree);
356     if (rebalance_subtree != nullptr) {
357         this->rebalance(rebalance_subtree);
358     }
359     this->maybe_resize_tree(nullptr);
360     return 0;
361 }
362 
363 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
364 template<typename iterate_extra_t,
365          int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate(iterate_extra_t * const iterate_extra) const366 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate(iterate_extra_t *const iterate_extra) const {
367     return this->iterate_on_range<iterate_extra_t, f>(0, this->size(), iterate_extra);
368 }
369 
370 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
371 template<typename iterate_extra_t,
372          int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_on_range(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra) const373 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_on_range(const uint32_t left, const uint32_t right, iterate_extra_t *const iterate_extra) const {
374     if (right > this->size()) { return EINVAL; }
375     if (left == right) { return 0; }
376     if (this->is_array) {
377         return this->iterate_internal_array<iterate_extra_t, f>(left, right, iterate_extra);
378     }
379     return this->iterate_internal<iterate_extra_t, f>(left, right, this->d.t.root, 0, iterate_extra);
380 }
381 
382 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
verify(void) const383 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::verify(void) const {
384     uint32_t num_values = this->size();
385     invariant(num_values < UINT32_MAX);
386     size_t pool_used = toku_mempool_get_used_size(&this->mp);
387     size_t pool_size = toku_mempool_get_size(&this->mp);
388     size_t pool_frag = toku_mempool_get_frag_size(&this->mp);
389     invariant(pool_used <= pool_size);
390     if (this->is_array) {
391         invariant(this->values_same_size);
392         invariant(num_values == this->d.a.num_values);
393 
394         // We know exactly how much memory should be used.
395         invariant(pool_used == num_values * align(this->value_length));
396 
397         // Array form must have 0 fragmentation in mempool.
398         invariant(pool_frag == 0);
399     } else {
400         if (this->values_same_size) {
401             // We know exactly how much memory should be used.
402             invariant(pool_used == num_values * align(this->value_length + __builtin_offsetof(dmt_node, value)));
403         } else {
404             // We can only do a lower bound on memory usage.
405             invariant(pool_used >= num_values * __builtin_offsetof(dmt_node, value));
406         }
407         std::vector<bool> touched(pool_size, false);
408         verify_internal(this->d.t.root, &touched);
409         size_t bytes_used = 0;
410         for (size_t i = 0; i < pool_size; i++) {
411             if (touched.at(i)) {
412                 ++bytes_used;
413             }
414         }
415         invariant(bytes_used == pool_used);
416     }
417 }
418 
419 // Verifies all weights are internally consistent.
420 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
verify_internal(const subtree & subtree,std::vector<bool> * touched) const421 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::verify_internal(const subtree &subtree, std::vector<bool> *touched) const {
422     if (subtree.is_null()) {
423         return;
424     }
425     const dmt_node &node = get_node(subtree);
426 
427     if (this->values_same_size) {
428         invariant(node.value_length == this->value_length);
429     }
430 
431     size_t offset = toku_mempool_get_offset_from_pointer_and_base(&this->mp, &node);
432     size_t node_size = align(__builtin_offsetof(dmt_node, value) + node.value_length);
433     invariant(offset <= touched->size());
434     invariant(offset+node_size <= touched->size());
435     invariant(offset % ALIGNMENT == 0);
436     // Mark memory as touched and never allocated to multiple nodes.
437     for (size_t i = offset; i < offset+node_size; ++i) {
438         invariant(!touched->at(i));
439         touched->at(i) = true;
440     }
441 
442     const uint32_t leftweight = this->nweight(node.left);
443     const uint32_t rightweight = this->nweight(node.right);
444 
445     invariant(leftweight + rightweight + 1 == this->nweight(subtree));
446     verify_internal(node.left, touched);
447     verify_internal(node.right, touched);
448 }
449 
450 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
451 template<typename iterate_extra_t,
452          int (*f)(const uint32_t, dmtdata_t *, const uint32_t, iterate_extra_t *const)>
iterate_ptr(iterate_extra_t * const iterate_extra)453 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_ptr(iterate_extra_t *const iterate_extra) {
454     if (this->is_array) {
455         this->iterate_ptr_internal_array<iterate_extra_t, f>(0, this->size(), iterate_extra);
456     } else {
457         this->iterate_ptr_internal<iterate_extra_t, f>(0, this->size(), this->d.t.root, 0, iterate_extra);
458     }
459 }
460 
461 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
fetch(const uint32_t idx,uint32_t * const value_len,dmtdataout_t * const value) const462 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fetch(const uint32_t idx, uint32_t *const value_len, dmtdataout_t *const value) const {
463     if (idx >= this->size()) { return EINVAL; }
464     if (this->is_array) {
465         this->fetch_internal_array(idx, value_len, value);
466     } else {
467         this->fetch_internal(this->d.t.root, idx, value_len, value);
468     }
469     return 0;
470 }
471 
472 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
473 template<typename dmtcmp_t,
474          int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
find_zero(const dmtcmp_t & extra,uint32_t * const value_len,dmtdataout_t * const value,uint32_t * const idxp) const475 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_zero(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
476     uint32_t tmp_index;
477     uint32_t *const child_idxp = (idxp != nullptr) ? idxp : &tmp_index;
478     int r;
479     if (this->is_array) {
480         r = this->find_internal_zero_array<dmtcmp_t, h>(extra, value_len, value, child_idxp);
481     }
482     else {
483         r = this->find_internal_zero<dmtcmp_t, h>(this->d.t.root, extra, value_len, value, child_idxp);
484     }
485     return r;
486 }
487 
488 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
489 template<typename dmtcmp_t,
490          int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
find(const dmtcmp_t & extra,int direction,uint32_t * const value_len,dmtdataout_t * const value,uint32_t * const idxp) const491 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find(const dmtcmp_t &extra, int direction, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
492     uint32_t tmp_index;
493     uint32_t *const child_idxp = (idxp != nullptr) ? idxp : &tmp_index;
494     paranoid_invariant(direction != 0);
495     if (direction < 0) {
496         if (this->is_array) {
497             return this->find_internal_minus_array<dmtcmp_t, h>(extra, value_len,  value, child_idxp);
498         } else {
499             return this->find_internal_minus<dmtcmp_t, h>(this->d.t.root, extra, value_len,  value, child_idxp);
500         }
501     } else {
502         if (this->is_array) {
503             return this->find_internal_plus_array<dmtcmp_t, h>(extra, value_len,  value, child_idxp);
504         } else {
505             return this->find_internal_plus<dmtcmp_t, h>(this->d.t.root, extra, value_len, value, child_idxp);
506         }
507     }
508 }
509 
510 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
memory_size(void)511 size_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::memory_size(void) {
512     return (sizeof *this) + toku_mempool_get_size(&this->mp);
513 }
514 
515 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
get_node(const subtree & subtree) const516 dmt_node_templated<dmtdata_t> & dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_node(const subtree &subtree) const {
517     paranoid_invariant(!subtree.is_null());
518     return get_node(subtree.get_offset());
519 }
520 
521 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
get_node(const node_offset offset) const522 dmt_node_templated<dmtdata_t> & dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_node(const node_offset offset) const {
523     void* ptr = toku_mempool_get_pointer_from_base_and_offset(&this->mp, offset);
524     dmt_node *CAST_FROM_VOIDP(node, ptr);
525     return *node;
526 }
527 
528 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
node_set_value(dmt_node * n,const dmtwriter_t & value)529 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::node_set_value(dmt_node * n, const dmtwriter_t &value) {
530     n->value_length = value.get_size();
531     value.write_to(&n->value);
532 }
533 
534 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
node_malloc_and_set_value(const dmtwriter_t & value)535 node_offset dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::node_malloc_and_set_value(const dmtwriter_t &value) {
536     size_t val_size = value.get_size();
537     size_t size_to_alloc = __builtin_offsetof(dmt_node, value) + val_size;
538     size_to_alloc = align(size_to_alloc);
539     void* np = toku_mempool_malloc(&this->mp, size_to_alloc);
540     paranoid_invariant_notnull(np);
541     dmt_node *CAST_FROM_VOIDP(n, np);
542     node_set_value(n, value);
543 
544     return toku_mempool_get_offset_from_pointer_and_base(&this->mp, np);
545 }
546 
547 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
node_free(const subtree & st)548 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::node_free(const subtree &st) {
549     dmt_node &n = get_node(st);
550     size_t size_to_free = __builtin_offsetof(dmt_node, value) + n.value_length;
551     size_to_free = align(size_to_free);
552     toku_mempool_mfree(&this->mp, &n, size_to_free);
553 }
554 
555 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
maybe_resize_tree(const dmtwriter_t * value)556 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::maybe_resize_tree(const dmtwriter_t * value) {
557     const ssize_t curr_capacity = toku_mempool_get_size(&this->mp);
558     const ssize_t curr_free = toku_mempool_get_free_size(&this->mp);
559     const ssize_t curr_used = toku_mempool_get_used_size(&this->mp);
560     ssize_t add_size = 0;
561     if (value) {
562         add_size = __builtin_offsetof(dmt_node, value) + value->get_size();
563         add_size = align(add_size);
564     }
565 
566     const ssize_t need_size = curr_used + add_size;
567     paranoid_invariant(need_size <= UINT32_MAX);
568     //TODO(leif) consider different growth rates
569     const ssize_t new_size = 2*need_size;
570     paranoid_invariant(new_size <= UINT32_MAX);
571 
572     if ((curr_capacity / 2 >= new_size) || // Way too much allocated
573         (curr_free < add_size)) {  // No room in mempool
574         // Copy all memory and reconstruct dmt in new mempool.
575         if (curr_free < add_size && toku_mempool_get_frag_size(&this->mp) == 0) {
576             // TODO(yoni) or TODO(leif) consider doing this not just when frag size is zero, but also when it is a small percentage of the total mempool size
577             // Offsets remain the same in the new mempool so we can just realloc.
578             toku_mempool_realloc_larger(&this->mp, new_size);
579         } else if (!this->d.t.root.is_null()) {
580             struct mempool new_kvspace;
581             toku_mempool_construct(&new_kvspace, new_size);
582 
583             const dmt_node &n = get_node(this->d.t.root);
584             node_offset *tmp_array;
585             bool malloced = false;
586             tmp_array = alloc_temp_node_offsets(n.weight);
587             if (!tmp_array) {
588                 malloced = true;
589                 XMALLOC_N(n.weight, tmp_array);
590             }
591             this->fill_array_with_subtree_offsets(tmp_array, this->d.t.root);
592             for (node_offset i = 0; i < n.weight; i++) {
593                 dmt_node &node = get_node(tmp_array[i]);
594                 const size_t bytes_to_copy = __builtin_offsetof(dmt_node, value) + node.value_length;
595                 const size_t bytes_to_alloc = align(bytes_to_copy);
596                 void* newdata = toku_mempool_malloc(&new_kvspace, bytes_to_alloc);
597                 memcpy(newdata, &node, bytes_to_copy);
598                 tmp_array[i] = toku_mempool_get_offset_from_pointer_and_base(&new_kvspace, newdata);
599             }
600 
601             struct mempool old_kvspace = this->mp;
602             this->mp = new_kvspace;
603             this->rebuild_subtree_from_offsets(&this->d.t.root, tmp_array, n.weight);
604             if (malloced) toku_free(tmp_array);
605             toku_mempool_destroy(&old_kvspace);
606         } else {
607             toku_mempool_destroy(&this->mp);
608             toku_mempool_construct(&this->mp, new_size);
609         }
610     }
611 }
612 
613 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
will_need_rebalance(const subtree & subtree,const int leftmod,const int rightmod) const614 bool dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::will_need_rebalance(const subtree &subtree, const int leftmod, const int rightmod) const {
615     if (subtree.is_null()) { return false; }
616     const dmt_node &n = get_node(subtree);
617     // one of the 1's is for the root.
618     // the other is to take ceil(n/2)
619     const uint32_t weight_left  = this->nweight(n.left)  + leftmod;
620     const uint32_t weight_right = this->nweight(n.right) + rightmod;
621     return ((1+weight_left < (1+1+weight_right)/2)
622             ||
623             (1+weight_right < (1+1+weight_left)/2));
624 }
625 
626 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
insert_internal(subtree * const subtreep,const dmtwriter_t & value,const uint32_t idx,subtree ** const rebalance_subtree)627 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert_internal(subtree *const subtreep, const dmtwriter_t &value, const uint32_t idx, subtree **const rebalance_subtree) {
628     if (subtreep->is_null()) {
629         paranoid_invariant_zero(idx);
630         const node_offset newoffset = this->node_malloc_and_set_value(value);
631         dmt_node &newnode = get_node(newoffset);
632         newnode.weight = 1;
633         newnode.left.set_to_null();
634         newnode.right.set_to_null();
635         subtreep->set_offset(newoffset);
636     } else {
637         dmt_node &n = get_node(*subtreep);
638         n.weight++;
639         if (idx <= this->nweight(n.left)) {
640             if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 1, 0)) {
641                 *rebalance_subtree = subtreep;
642             }
643             this->insert_internal(&n.left, value, idx, rebalance_subtree);
644         } else {
645             if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 0, 1)) {
646                 *rebalance_subtree = subtreep;
647             }
648             const uint32_t sub_index = idx - this->nweight(n.left) - 1;
649             this->insert_internal(&n.right, value, sub_index, rebalance_subtree);
650         }
651     }
652 }
653 
654 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
delete_internal(subtree * const subtreep,const uint32_t idx,subtree * const subtree_replace,subtree ** const rebalance_subtree)655 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::delete_internal(subtree *const subtreep, const uint32_t idx, subtree *const subtree_replace, subtree **const rebalance_subtree) {
656     paranoid_invariant_notnull(subtreep);
657     paranoid_invariant_notnull(rebalance_subtree);
658     paranoid_invariant(!subtreep->is_null());
659     dmt_node &n = get_node(*subtreep);
660     const uint32_t leftweight = this->nweight(n.left);
661     if (idx < leftweight) {
662         n.weight--;
663         if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, -1, 0)) {
664             *rebalance_subtree = subtreep;
665         }
666         this->delete_internal(&n.left, idx, subtree_replace, rebalance_subtree);
667     } else if (idx == leftweight) {
668         // Found the correct index.
669         if (n.left.is_null()) {
670             paranoid_invariant_zero(idx);
671             // Delete n and let parent point to n.right
672             subtree ptr_this = *subtreep;
673             *subtreep = n.right;
674             subtree to_free;
675             if (subtree_replace != nullptr) {
676                 // Swap self with the other node.  Taking over all responsibility.
677                 to_free = *subtree_replace;
678                 dmt_node &ancestor = get_node(*subtree_replace);
679                 if (*rebalance_subtree == &ancestor.right) {
680                     // Take over rebalance responsibility.
681                     *rebalance_subtree = &n.right;
682                 }
683                 n.weight = ancestor.weight;
684                 n.left = ancestor.left;
685                 n.right = ancestor.right;
686                 *subtree_replace = ptr_this;
687             } else {
688                 to_free = ptr_this;
689             }
690             this->node_free(to_free);
691         } else if (n.right.is_null()) {
692             // Delete n and let parent point to n.left
693             subtree to_free = *subtreep;
694             *subtreep = n.left;
695             paranoid_invariant(idx>0);
696             paranoid_invariant_null(subtree_replace);  // To be recursive, we're looking for index 0.  n is index > 0 here.
697             this->node_free(to_free);
698         } else {
699             if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 0, -1)) {
700                 *rebalance_subtree = subtreep;
701             }
702             // don't need to copy up value, it's only used by this
703             // next call, and when that gets to the bottom there
704             // won't be any more recursion
705             n.weight--;
706             this->delete_internal(&n.right, 0, subtreep, rebalance_subtree);
707         }
708     } else {
709         n.weight--;
710         if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 0, -1)) {
711             *rebalance_subtree = subtreep;
712         }
713         this->delete_internal(&n.right, idx - leftweight - 1, subtree_replace, rebalance_subtree);
714     }
715 }
716 
717 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
718 template<typename iterate_extra_t,
719          int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_internal_array(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra) const720 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_internal_array(const uint32_t left, const uint32_t right,
721                                                          iterate_extra_t *const iterate_extra) const {
722     int r;
723     for (uint32_t i = left; i < right; ++i) {
724         r = f(this->value_length, *get_array_value(i), i, iterate_extra);
725         if (r != 0) {
726             return r;
727         }
728     }
729     return 0;
730 }
731 
732 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
733 template<typename iterate_extra_t,
734          int (*f)(const uint32_t, dmtdata_t *, const uint32_t, iterate_extra_t *const)>
iterate_ptr_internal(const uint32_t left,const uint32_t right,const subtree & subtree,const uint32_t idx,iterate_extra_t * const iterate_extra)735 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_ptr_internal(const uint32_t left, const uint32_t right,
736                                                         const subtree &subtree, const uint32_t idx,
737                                                         iterate_extra_t *const iterate_extra) {
738     if (!subtree.is_null()) {
739         dmt_node &n = get_node(subtree);
740         const uint32_t idx_root = idx + this->nweight(n.left);
741         if (left < idx_root) {
742             this->iterate_ptr_internal<iterate_extra_t, f>(left, right, n.left, idx, iterate_extra);
743         }
744         if (left <= idx_root && idx_root < right) {
745             int r = f(n.value_length, &n.value, idx_root, iterate_extra);
746             lazy_assert_zero(r);
747         }
748         if (idx_root + 1 < right) {
749             this->iterate_ptr_internal<iterate_extra_t, f>(left, right, n.right, idx_root + 1, iterate_extra);
750         }
751     }
752 }
753 
754 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
755 template<typename iterate_extra_t,
756          int (*f)(const uint32_t, dmtdata_t *, const uint32_t, iterate_extra_t *const)>
iterate_ptr_internal_array(const uint32_t left,const uint32_t right,iterate_extra_t * const iterate_extra)757 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_ptr_internal_array(const uint32_t left, const uint32_t right,
758                                                               iterate_extra_t *const iterate_extra) {
759     for (uint32_t i = left; i < right; ++i) {
760         int r = f(this->value_length, get_array_value(i), i, iterate_extra);
761         lazy_assert_zero(r);
762     }
763 }
764 
765 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
766 template<typename iterate_extra_t,
767          int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
iterate_internal(const uint32_t left,const uint32_t right,const subtree & subtree,const uint32_t idx,iterate_extra_t * const iterate_extra) const768 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_internal(const uint32_t left, const uint32_t right,
769                                                    const subtree &subtree, const uint32_t idx,
770                                                    iterate_extra_t *const iterate_extra) const {
771     if (subtree.is_null()) { return 0; }
772     int r;
773     const dmt_node &n = get_node(subtree);
774     const uint32_t idx_root = idx + this->nweight(n.left);
775     if (left < idx_root) {
776         r = this->iterate_internal<iterate_extra_t, f>(left, right, n.left, idx, iterate_extra);
777         if (r != 0) { return r; }
778     }
779     if (left <= idx_root && idx_root < right) {
780         r = f(n.value_length, n.value, idx_root, iterate_extra);
781         if (r != 0) { return r; }
782     }
783     if (idx_root + 1 < right) {
784         return this->iterate_internal<iterate_extra_t, f>(left, right, n.right, idx_root + 1, iterate_extra);
785     }
786     return 0;
787 }
788 
789 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
fetch_internal_array(const uint32_t i,uint32_t * const value_len,dmtdataout_t * const value) const790 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fetch_internal_array(const uint32_t i, uint32_t *const value_len, dmtdataout_t *const value) const {
791     copyout(value_len, value, this->value_length, get_array_value(i));
792 }
793 
794 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
fetch_internal(const subtree & subtree,const uint32_t i,uint32_t * const value_len,dmtdataout_t * const value) const795 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fetch_internal(const subtree &subtree, const uint32_t i, uint32_t *const value_len, dmtdataout_t *const value) const {
796     dmt_node &n = get_node(subtree);
797     const uint32_t leftweight = this->nweight(n.left);
798     if (i < leftweight) {
799         this->fetch_internal(n.left, i, value_len, value);
800     } else if (i == leftweight) {
801         copyout(value_len, value, &n);
802     } else {
803         this->fetch_internal(n.right, i - leftweight - 1, value_len, value);
804     }
805 }
806 
807 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
fill_array_with_subtree_offsets(node_offset * const array,const subtree & subtree) const808 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fill_array_with_subtree_offsets(node_offset *const array, const subtree &subtree) const {
809     if (!subtree.is_null()) {
810         const dmt_node &tree = get_node(subtree);
811         this->fill_array_with_subtree_offsets(&array[0], tree.left);
812         array[this->nweight(tree.left)] = subtree.get_offset();
813         this->fill_array_with_subtree_offsets(&array[this->nweight(tree.left) + 1], tree.right);
814     }
815 }
816 
817 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
rebuild_subtree_from_offsets(subtree * const subtree,const node_offset * const offsets,const uint32_t numvalues)818 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::rebuild_subtree_from_offsets(subtree *const subtree, const node_offset *const offsets, const uint32_t numvalues) {
819     if (numvalues==0) {
820         subtree->set_to_null();
821     } else {
822         uint32_t halfway = numvalues/2;
823         subtree->set_offset(offsets[halfway]);
824         dmt_node &newnode = get_node(offsets[halfway]);
825         newnode.weight = numvalues;
826         // value is already in there.
827         this->rebuild_subtree_from_offsets(&newnode.left,  &offsets[0], halfway);
828         this->rebuild_subtree_from_offsets(&newnode.right, &offsets[halfway+1], numvalues-(halfway+1));
829     }
830 }
831 
832 //TODO(leif): Note that this can mess with our memory_footprint calculation (we may touch past what is marked as 'used' in the mempool)
833 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
alloc_temp_node_offsets(uint32_t num_offsets)834 node_offset* dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::alloc_temp_node_offsets(uint32_t num_offsets) {
835     size_t mem_needed = num_offsets * sizeof(node_offset);
836     size_t mem_free;
837     mem_free = toku_mempool_get_free_size(&this->mp);
838     node_offset* CAST_FROM_VOIDP(tmp, toku_mempool_get_next_free_ptr(&this->mp));
839     if (mem_free >= mem_needed) {
840         return tmp;
841     }
842     return nullptr;
843 }
844 
845 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
rebalance(subtree * const subtree)846 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::rebalance(subtree *const subtree) {
847     paranoid_invariant(!subtree->is_null());
848 
849     // There is a possible "optimization" here:
850     //   if (this->values_same_size && subtree == &this->d.t.root) {
851     //       this->convert_from_tree_to_array();
852     //       return;
853     //   }
854     // but we don't want to do it because it involves actually copying values around
855     // as opposed to stopping in the middle of rebalancing (like in the OMT)
856 
857     node_offset offset = subtree->get_offset();
858     const dmt_node &n = get_node(offset);
859     node_offset *tmp_array;
860     bool malloced = false;
861     tmp_array = alloc_temp_node_offsets(n.weight);
862     if (!tmp_array) {
863         malloced = true;
864         XMALLOC_N(n.weight, tmp_array);
865     }
866     this->fill_array_with_subtree_offsets(tmp_array, *subtree);
867     this->rebuild_subtree_from_offsets(subtree, tmp_array, n.weight);
868     if (malloced) toku_free(tmp_array);
869 }
870 
871 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
copyout(uint32_t * const outlen,dmtdata_t * const out,const dmt_node * const n)872 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t *const out, const dmt_node *const n) {
873     if (outlen) {
874         *outlen = n->value_length;
875     }
876     if (out) {
877         *out = n->value;
878     }
879 }
880 
881 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
copyout(uint32_t * const outlen,dmtdata_t ** const out,dmt_node * const n)882 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t **const out, dmt_node *const n) {
883     if (outlen) {
884         *outlen = n->value_length;
885     }
886     if (out) {
887         *out = &n->value;
888     }
889 }
890 
891 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
copyout(uint32_t * const outlen,dmtdata_t * const out,const uint32_t len,const dmtdata_t * const stored_value_ptr)892 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t *const out, const uint32_t len, const dmtdata_t *const stored_value_ptr) {
893     if (outlen) {
894         *outlen = len;
895     }
896     if (out) {
897         *out = *stored_value_ptr;
898     }
899 }
900 
901 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
copyout(uint32_t * const outlen,dmtdata_t ** const out,const uint32_t len,dmtdata_t * const stored_value_ptr)902 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t **const out, const uint32_t len, dmtdata_t *const stored_value_ptr) {
903     if (outlen) {
904         *outlen = len;
905     }
906     if (out) {
907         *out = stored_value_ptr;
908     }
909 }
910 
911 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
912 template<typename dmtcmp_t,
913          int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
find_internal_zero_array(const dmtcmp_t & extra,uint32_t * const value_len,dmtdataout_t * const value,uint32_t * const idxp) const914 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_zero_array(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
915     paranoid_invariant_notnull(idxp);
916     uint32_t min = 0;
917     uint32_t limit = this->d.a.num_values;
918     uint32_t best_pos = subtree::NODE_NULL;
919     uint32_t best_zero = subtree::NODE_NULL;
920 
921     while (min!=limit) {
922         uint32_t mid = (min + limit) / 2;
923         int hv = h(this->value_length, *get_array_value(mid), extra);
924         if (hv<0) {
925             min = mid+1;
926         }
927         else if (hv>0) {
928             best_pos  = mid;
929             limit     = mid;
930         }
931         else {
932             best_zero = mid;
933             limit     = mid;
934         }
935     }
936     if (best_zero!=subtree::NODE_NULL) {
937         //Found a zero
938         copyout(value_len, value, this->value_length, get_array_value(best_zero));
939         *idxp = best_zero;
940         return 0;
941     }
942     if (best_pos!=subtree::NODE_NULL) *idxp = best_pos;
943     else                     *idxp = this->d.a.num_values;
944     return DB_NOTFOUND;
945 }
946 
947 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
948 template<typename dmtcmp_t,
949          int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
find_internal_zero(const subtree & subtree,const dmtcmp_t & extra,uint32_t * const value_len,dmtdataout_t * const value,uint32_t * const idxp) const950 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_zero(const subtree &subtree, const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
951     paranoid_invariant_notnull(idxp);
952     if (subtree.is_null()) {
953         *idxp = 0;
954         return DB_NOTFOUND;
955     }
956     dmt_node &n = get_node(subtree);
957     int hv = h(n.value_length, n.value, extra);
958     if (hv<0) {
959         int r = this->find_internal_zero<dmtcmp_t, h>(n.right, extra, value_len, value, idxp);
960         *idxp += this->nweight(n.left)+1;
961         return r;
962     } else if (hv>0) {
963         return this->find_internal_zero<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
964     } else {
965         int r = this->find_internal_zero<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
966         if (r==DB_NOTFOUND) {
967             *idxp = this->nweight(n.left);
968             copyout(value_len, value, &n);
969             r = 0;
970         }
971         return r;
972     }
973 }
974 
975 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
976 template<typename dmtcmp_t,
977          int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
find_internal_plus_array(const dmtcmp_t & extra,uint32_t * const value_len,dmtdataout_t * const value,uint32_t * const idxp) const978 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_plus_array(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
979     paranoid_invariant_notnull(idxp);
980     uint32_t min = 0;
981     uint32_t limit = this->d.a.num_values;
982     uint32_t best = subtree::NODE_NULL;
983 
984     while (min != limit) {
985         const uint32_t mid = (min + limit) / 2;
986         const int hv = h(this->value_length, *get_array_value(mid), extra);
987         if (hv > 0) {
988             best = mid;
989             limit = mid;
990         } else {
991             min = mid + 1;
992         }
993     }
994     if (best == subtree::NODE_NULL) { return DB_NOTFOUND; }
995     copyout(value_len, value, this->value_length, get_array_value(best));
996     *idxp = best;
997     return 0;
998 }
999 
1000 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1001 template<typename dmtcmp_t,
1002          int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
find_internal_plus(const subtree & subtree,const dmtcmp_t & extra,uint32_t * const value_len,dmtdataout_t * const value,uint32_t * const idxp) const1003 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_plus(const subtree &subtree, const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
1004     paranoid_invariant_notnull(idxp);
1005     if (subtree.is_null()) {
1006         return DB_NOTFOUND;
1007     }
1008     dmt_node & n = get_node(subtree);
1009     int hv = h(n.value_length, n.value, extra);
1010     int r;
1011     if (hv > 0) {
1012         r = this->find_internal_plus<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
1013         if (r == DB_NOTFOUND) {
1014             *idxp = this->nweight(n.left);
1015             copyout(value_len, value, &n);
1016             r = 0;
1017         }
1018     } else {
1019         r = this->find_internal_plus<dmtcmp_t, h>(n.right, extra, value_len, value, idxp);
1020         if (r == 0) {
1021             *idxp += this->nweight(n.left) + 1;
1022         }
1023     }
1024     return r;
1025 }
1026 
1027 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1028 template<typename dmtcmp_t,
1029          int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
find_internal_minus_array(const dmtcmp_t & extra,uint32_t * const value_len,dmtdataout_t * const value,uint32_t * const idxp) const1030 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_minus_array(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
1031     paranoid_invariant_notnull(idxp);
1032     uint32_t min = 0;
1033     uint32_t limit = this->d.a.num_values;
1034     uint32_t best = subtree::NODE_NULL;
1035 
1036     while (min != limit) {
1037         const uint32_t mid = (min + limit) / 2;
1038         const int hv = h(this->value_length, *get_array_value(mid), extra);
1039         if (hv < 0) {
1040             best = mid;
1041             min = mid + 1;
1042         } else {
1043             limit = mid;
1044         }
1045     }
1046     if (best == subtree::NODE_NULL) { return DB_NOTFOUND; }
1047     copyout(value_len, value, this->value_length, get_array_value(best));
1048     *idxp = best;
1049     return 0;
1050 }
1051 
1052 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
1053 template<typename dmtcmp_t,
1054          int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
find_internal_minus(const subtree & subtree,const dmtcmp_t & extra,uint32_t * const value_len,dmtdataout_t * const value,uint32_t * const idxp) const1055 int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_minus(const subtree &subtree, const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
1056     paranoid_invariant_notnull(idxp);
1057     if (subtree.is_null()) {
1058         return DB_NOTFOUND;
1059     }
1060     dmt_node & n = get_node(subtree);
1061     int hv = h(n.value_length, n.value, extra);
1062     if (hv < 0) {
1063         int r = this->find_internal_minus<dmtcmp_t, h>(n.right, extra, value_len, value, idxp);
1064         if (r == 0) {
1065             *idxp += this->nweight(n.left) + 1;
1066         } else if (r == DB_NOTFOUND) {
1067             *idxp = this->nweight(n.left);
1068             copyout(value_len, value, &n);
1069             r = 0;
1070         }
1071         return r;
1072     } else {
1073         return this->find_internal_minus<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
1074     }
1075 }
1076 
1077 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
get_fixed_length(void) const1078 uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_fixed_length(void) const {
1079     return this->values_same_size ? this->value_length : 0;
1080 }
1081 
1082 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
get_fixed_length_alignment_overhead(void) const1083 uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_fixed_length_alignment_overhead(void) const {
1084     return this->values_same_size ? align(this->value_length) - this->value_length : 0;
1085 }
1086 
1087 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
value_length_is_fixed(void) const1088 bool dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::value_length_is_fixed(void) const {
1089     return this->values_same_size;
1090 }
1091 
1092 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
serialize_values(uint32_t expected_unpadded_memory,struct wbuf * wb) const1093 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::serialize_values(uint32_t expected_unpadded_memory, struct wbuf *wb) const {
1094     invariant(this->is_array);
1095     invariant(this->values_same_size);
1096     const uint8_t pad_bytes = get_fixed_length_alignment_overhead();
1097     const uint32_t fixed_len = this->value_length;
1098     const uint32_t fixed_aligned_len = align(this->value_length);
1099     paranoid_invariant(expected_unpadded_memory == this->d.a.num_values * this->value_length);
1100     paranoid_invariant(toku_mempool_get_used_size(&this->mp) >=
1101                        expected_unpadded_memory + pad_bytes * this->d.a.num_values);
1102     if (this->d.a.num_values == 0) {
1103         // Nothing to serialize
1104     } else if (pad_bytes == 0) {
1105         // Basically a memcpy
1106         wbuf_nocrc_literal_bytes(wb, get_array_value(0), expected_unpadded_memory);
1107     } else {
1108         uint8_t* const dest = wbuf_nocrc_reserve_literal_bytes(wb, expected_unpadded_memory);
1109         const uint8_t* const src = reinterpret_cast<uint8_t*>(get_array_value(0));
1110         //TODO(leif) maybe look at vectorization here
1111         for (uint32_t i = 0; i < this->d.a.num_values; i++) {
1112             memcpy(&dest[i*fixed_len], &src[i*fixed_aligned_len], fixed_len);
1113         }
1114     }
1115 }
1116 
1117 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
create(uint32_t _max_values,uint32_t _max_value_bytes)1118 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::create(uint32_t _max_values, uint32_t _max_value_bytes) {
1119     this->max_values = _max_values;
1120     this->max_value_bytes = _max_value_bytes;
1121     this->temp.create();
1122     paranoid_invariant_null(toku_mempool_get_base(&this->temp.mp));
1123     this->temp_valid = true;
1124     this->sorted_node_offsets = nullptr;
1125     // Include enough space for alignment padding
1126     size_t initial_space = (ALIGNMENT - 1) * _max_values + _max_value_bytes;
1127 
1128     toku_mempool_construct(&this->temp.mp, initial_space);  // Adds 25%
1129 }
1130 
1131 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
append(const dmtwriter_t & value)1132 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::append(const dmtwriter_t &value) {
1133     paranoid_invariant(this->temp_valid);
1134     //NOTE: Always use d.a.num_values for size because we have not yet created root.
1135     if (this->temp.values_same_size && (this->temp.d.a.num_values == 0 || value.get_size() == this->temp.value_length)) {
1136         temp.insert_at_array_end<false>(value);
1137         return;
1138     }
1139     if (this->temp.is_array) {
1140         // Convert to tree format (without weights and linkage)
1141         XMALLOC_N(this->max_values, this->sorted_node_offsets);
1142 
1143         // Include enough space for alignment padding
1144         size_t mem_needed = (ALIGNMENT - 1 + __builtin_offsetof(dmt_node, value)) * max_values + max_value_bytes;
1145         struct mempool old_mp = this->temp.mp;
1146 
1147         const uint32_t num_values = this->temp.d.a.num_values;
1148         toku_mempool_construct(&this->temp.mp, mem_needed);
1149 
1150         // Copy over and get node_offsets
1151         for (uint32_t i = 0; i < num_values; i++) {
1152             dmtwriter_t writer(this->temp.value_length, this->temp.get_array_value_internal(&old_mp, i));
1153             this->sorted_node_offsets[i] = this->temp.node_malloc_and_set_value(writer);
1154         }
1155         this->temp.is_array = false;
1156         this->temp.values_same_size = false;
1157         this->temp.value_length = 0;
1158         toku_mempool_destroy(&old_mp);
1159     }
1160     paranoid_invariant(!this->temp.is_array);
1161     this->sorted_node_offsets[this->temp.d.a.num_values++] = this->temp.node_malloc_and_set_value(value);
1162 }
1163 
1164 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
value_length_is_fixed(void)1165 bool dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::value_length_is_fixed(void) {
1166     paranoid_invariant(this->temp_valid);
1167     return this->temp.values_same_size;
1168 }
1169 
1170 template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
build(dmt<dmtdata_t,dmtdataout_t,dmtwriter_t> * dest)1171 void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::build(dmt<dmtdata_t, dmtdataout_t, dmtwriter_t> *dest) {
1172     invariant(this->temp_valid);
1173     //NOTE: Always use d.a.num_values for size because we have not yet created root.
1174     invariant(this->temp.d.a.num_values <= this->max_values);
1175     // Memory invariant is taken care of incrementally (during append())
1176 
1177     if (!this->temp.is_array) {
1178         invariant_notnull(this->sorted_node_offsets);
1179         this->temp.rebuild_subtree_from_offsets(&this->temp.d.t.root, this->sorted_node_offsets, this->temp.d.a.num_values);
1180         toku_free(this->sorted_node_offsets);
1181         this->sorted_node_offsets = nullptr;
1182     }
1183     paranoid_invariant_null(this->sorted_node_offsets);
1184 
1185     const size_t used = toku_mempool_get_used_size(&this->temp.mp);
1186     const size_t allocated = toku_mempool_get_size(&this->temp.mp);
1187     // We want to use no more than (about) the actual used space + 25% overhead for mempool growth.
1188     // When we know the elements are fixed-length, we use the better dmt constructor.
1189     // In practice, as of Jan 2014, we use the builder in two cases:
1190     //  - When we know the elements are not fixed-length.
1191     //  - During upgrade of a pre version 26 basement node.
1192     // During upgrade, we will probably wildly overallocate because we don't account for the values that aren't stored in the dmt, so here we want to shrink the mempool.
1193     // When we know the elements are not fixed-length, we still know how much memory they occupy in total, modulo alignment, so we want to allow for mempool overhead and worst-case alignment overhead, and not shrink the mempool.
1194     const size_t max_allowed = used + (ALIGNMENT-1) * this->temp.size();
1195     const size_t max_allowed_with_mempool_overhead = max_allowed + max_allowed / 4;
1196     //TODO(leif): get footprint calculation correct (under jemalloc) and add some form of footprint constraint
1197     if (allocated > max_allowed_with_mempool_overhead) {
1198         // Reallocate smaller mempool to save memory
1199         invariant_zero(toku_mempool_get_frag_size(&this->temp.mp));
1200         struct mempool new_mp;
1201         toku_mempool_construct(&new_mp, used);
1202         void * newbase = toku_mempool_malloc(&new_mp, used);
1203         invariant_notnull(newbase);
1204         memcpy(newbase, toku_mempool_get_base(&this->temp.mp), used);
1205         toku_mempool_destroy(&this->temp.mp);
1206         this->temp.mp = new_mp;
1207     }
1208 
1209     *dest = this->temp;
1210     this->temp_valid = false;
1211 
1212 }
1213 } // namespace toku
1214