/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <my_global.h>
#include <ft/bndata.h>
#include <ft/ft-internal.h>

using namespace toku;
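// On-disk size of one key/leafentry pair: the klpair struct (which accounts
// for the serialized keylen field), plus the key bytes, plus the serialized
// leafentry.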
uint32_t bn_data::klpair_disksize(const uint32_t klpair_len, const klpair_struct *klpair) const {
    return sizeof(*klpair) + keylen_from_klpair_len(klpair_len) + leafentry_disksize(get_le_from_klpair(klpair));
}

void bn_data::init_zero() {
    toku_mempool_zero(&m_buffer_mempool);
    m_disksize_of_keys = 0;
}

void bn_data::initialize_empty() {
    init_zero();
    m_buffer.create();
}

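// Keep m_disksize_of_keys in sync with the keys in the dmt: in the serialized
// format, every key contributes sizeof(uint32_t) for its length field plus the
// key bytes themselves.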
void bn_data::add_key(uint32_t keylen) {
    m_disksize_of_keys += sizeof(keylen) + keylen;
}

void bn_data::add_keys(uint32_t n_keys, uint32_t combined_klpair_len) {
    invariant(n_keys * sizeof(uint32_t) <= combined_klpair_len);
    m_disksize_of_keys += combined_klpair_len;
}

void bn_data::remove_key(uint32_t keylen) {
    m_disksize_of_keys -= sizeof(keylen) + keylen;
}

// Deserialize from the format optimized for keys being inlined.
// Currently only supports fixed-length keys.
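// Expected layout at this point in the rbuf (the header has already been
// consumed by the caller): key_data_size bytes of packed fixed-size klpairs,
// followed by val_data_size bytes of leafentries, already in key order.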
void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version UU(),
                                                     uint32_t key_data_size, uint32_t val_data_size, bool all_keys_same_length,
                                                     uint32_t fixed_klpair_length) {
    paranoid_invariant(version >= FT_LAYOUT_VERSION_26);  // Support was added @26
    uint32_t ndone_before = rb->ndone;
    init_zero();
    invariant(all_keys_same_length);  // Until otherwise supported.
    const void *keys_src;
    rbuf_literal_bytes(rb, &keys_src, key_data_size);
    // Generate the dmt
    this->m_buffer.create_from_sorted_memory_of_fixed_size_elements(
            keys_src, num_entries, key_data_size, fixed_klpair_length);
    toku_mempool_construct(&this->m_buffer_mempool, val_data_size);

    const void *vals_src;
    rbuf_literal_bytes(rb, &vals_src, val_data_size);

    if (num_entries > 0) {
        void *vals_dest = toku_mempool_malloc(&this->m_buffer_mempool, val_data_size);
        paranoid_invariant_notnull(vals_dest);
        memcpy(vals_dest, vals_src, val_data_size);
    }

    add_keys(num_entries, num_entries * fixed_klpair_length);

    toku_note_deserialized_basement_node(all_keys_same_length);

    invariant(rb->ndone - ndone_before == data_size);
}

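// Write one leafentry in the packed pre-version-26 format, where the key is
// embedded in the leafentry:
//   LE_CLEAN: type(1) keylen(4) vallen(4)               key val
//   LE_MVCC:  type(1) keylen(4) num_cxrs(4) num_pxrs(1) key xrs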
static int
wbufwriteleafentry(const void* key, const uint32_t keylen, const LEAFENTRY &le, const uint32_t UU(idx), struct wbuf * const wb) {
    // need to pack the leafentry as it was in versions
    // where the key was integrated into it (< 26)
    uint32_t begin_spot UU() = wb->ndone;
    uint32_t le_disk_size = leafentry_disksize(le);
    wbuf_nocrc_uint8_t(wb, le->type);
    wbuf_nocrc_uint32_t(wb, keylen);
    if (le->type == LE_CLEAN) {
        wbuf_nocrc_uint32_t(wb, le->u.clean.vallen);
        wbuf_nocrc_literal_bytes(wb, key, keylen);
        wbuf_nocrc_literal_bytes(wb, le->u.clean.val, le->u.clean.vallen);
    }
    else {
        paranoid_invariant(le->type == LE_MVCC);
        wbuf_nocrc_uint32_t(wb, le->u.mvcc.num_cxrs);
        wbuf_nocrc_uint8_t(wb, le->u.mvcc.num_pxrs);
        wbuf_nocrc_literal_bytes(wb, key, keylen);
        // xrs = everything after type (1), num_cxrs (4), and num_pxrs (1)
        wbuf_nocrc_literal_bytes(wb, le->u.mvcc.xrs, le_disk_size - (1 + 4 + 1));
    }
    uint32_t end_spot UU() = wb->ndone;
    paranoid_invariant((end_spot - begin_spot) == keylen + sizeof(keylen) + le_disk_size);
    return 0;
}

void bn_data::serialize_to_wbuf(struct wbuf *const wb) {
    prepare_to_serialize();
    serialize_header(wb);
    if (m_buffer.value_length_is_fixed()) {
        serialize_rest(wb);
    } else {
        //
        // iterate over leafentries and place them into the buffer
        //
        iterate<struct wbuf, wbufwriteleafentry>(wb);
    }
}

// If we have fixed-length keys, we prepare the dmt and mempool.
// The mempool is prepared by removing any fragmented space and ordering leafentries in the same order as their keys.
void bn_data::prepare_to_serialize(void) {
    if (m_buffer.value_length_is_fixed()) {
        m_buffer.prepare_for_serialize();
        dmt_compress_kvspace(0, nullptr, true);  // Gets it ready for easy serialization.
    }
}

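// The basement-node header is fixed-size: three uint32 fields followed by two
// one-byte flags.  Both flags currently carry the same value, since keys and
// vals are serialized separately exactly when all keys have the same length.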
void bn_data::serialize_header(struct wbuf *wb) const {
    bool fixed = m_buffer.value_length_is_fixed();

    // key_data_size
    wbuf_nocrc_uint(wb, m_disksize_of_keys);
    // val_data_size
    wbuf_nocrc_uint(wb, toku_mempool_get_used_size(&m_buffer_mempool));
    // fixed_klpair_length
    wbuf_nocrc_uint(wb, m_buffer.get_fixed_length());
    // all_keys_same_length
    wbuf_nocrc_uint8_t(wb, fixed);
    // keys_vals_separate
    wbuf_nocrc_uint8_t(wb, fixed);
}

void bn_data::serialize_rest(struct wbuf *wb) const {
    // Write keys
    invariant(m_buffer.value_length_is_fixed());  // Assumes prepare_to_serialize was called
    m_buffer.serialize_values(m_disksize_of_keys, wb);

    // Write leafentries
    // We just ran dmt_compress_kvspace, so there is no fragmentation and the leafentries are in sorted order.
    paranoid_invariant(toku_mempool_get_frag_size(&m_buffer_mempool) == 0);
    uint32_t val_data_size = toku_mempool_get_used_size(&m_buffer_mempool);
    wbuf_nocrc_literal_bytes(wb, toku_mempool_get_base(&m_buffer_mempool), val_data_size);
}

// Deserialize from rbuf
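// Two paths: for version >= 26 we read the header written by serialize_header
// and, when keys and vals were serialized separately, take the fast
// fixed-length path above.  Otherwise (including version <= 25, which has no
// header) keys are interleaved with the leafentries and must be unpacked pair
// by pair.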
void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version) {
    uint32_t key_data_size = data_size;  // overallocate if < version 26 (best guess that is guaranteed not too small)
    uint32_t val_data_size = data_size;  // overallocate if < version 26 (best guess that is guaranteed not too small)

    bool all_keys_same_length = false;
    bool keys_vals_separate = false;
    uint32_t fixed_klpair_length = 0;

    // In version 25 and older there is no header.  Skip reading the header for old versions.
    if (version >= FT_LAYOUT_VERSION_26) {
        uint32_t ndone_before = rb->ndone;
        key_data_size = rbuf_int(rb);
        val_data_size = rbuf_int(rb);
        fixed_klpair_length = rbuf_int(rb);  // 0 if !all_keys_same_length
        all_keys_same_length = rbuf_char(rb);
        keys_vals_separate = rbuf_char(rb);
        invariant(all_keys_same_length == keys_vals_separate);  // Until we support otherwise
        uint32_t header_size = rb->ndone - ndone_before;
        data_size -= header_size;
        invariant(header_size == HEADER_LENGTH);
        if (keys_vals_separate) {
            invariant(fixed_klpair_length >= sizeof(klpair_struct) || num_entries == 0);
            initialize_from_separate_keys_and_vals(num_entries, rb, data_size, version,
                                                   key_data_size, val_data_size, all_keys_same_length,
                                                   fixed_klpair_length);
            return;
        }
    }
    // Version >= 26 and version 25 deserialization are now identical, except that <= 25 might allocate too much memory.
    const void *bytes;
    rbuf_literal_bytes(rb, &bytes, data_size);
    const unsigned char *CAST_FROM_VOIDP(buf, bytes);
    if (data_size == 0) {
        invariant_zero(num_entries);
    }
    init_zero();
    klpair_dmt_t::builder dmt_builder;
    dmt_builder.create(num_entries, key_data_size);

    // TODO(leif): clean this up (#149)
    unsigned char *newmem = nullptr;
    // add 25% extra wiggle room
    uint32_t allocated_bytes_vals = val_data_size + (val_data_size / 4);
    CAST_FROM_VOIDP(newmem, toku_xmalloc(allocated_bytes_vals));
    const unsigned char* curr_src_pos = buf;
    unsigned char* curr_dest_pos = newmem;
    for (uint32_t i = 0; i < num_entries; i++) {
        uint8_t curr_type = curr_src_pos[0];
        curr_src_pos++;
        // The first thing we do is lay out the key.
        // To do so, we must extract it from the leafentry
        // and write it in.
        uint32_t keylen = 0;
        const void* keyp = nullptr;
        keylen = *(uint32_t *)curr_src_pos;
        curr_src_pos += sizeof(uint32_t);
        uint32_t clean_vallen = 0;
        uint32_t num_cxrs = 0;
        uint8_t num_pxrs = 0;
        if (curr_type == LE_CLEAN) {
            clean_vallen = toku_dtoh32(*(uint32_t *)curr_src_pos);
            curr_src_pos += sizeof(clean_vallen);  // val_len
            keyp = curr_src_pos;
            curr_src_pos += keylen;
        }
        else {
            paranoid_invariant(curr_type == LE_MVCC);
            num_cxrs = toku_dtoh32(*(uint32_t *)curr_src_pos);
            curr_src_pos += sizeof(uint32_t);  // num_cxrs
            num_pxrs = curr_src_pos[0];
            curr_src_pos += sizeof(uint8_t);  // num_pxrs
            keyp = curr_src_pos;
            curr_src_pos += keylen;
        }
        uint32_t le_offset = curr_dest_pos - newmem;
        dmt_builder.append(klpair_dmtwriter(keylen, le_offset, keyp));
        add_key(keylen);

        // Now curr_dest_pos is pointing to where the leafentry should be packed.
        curr_dest_pos[0] = curr_type;
        curr_dest_pos++;
        if (curr_type == LE_CLEAN) {
            *(uint32_t *)curr_dest_pos = toku_htod32(clean_vallen);
            curr_dest_pos += sizeof(clean_vallen);
            memcpy(curr_dest_pos, curr_src_pos, clean_vallen);  // copy the val
            curr_dest_pos += clean_vallen;
            curr_src_pos += clean_vallen;
        }
        else {
            // pack num_cxrs and num_pxrs
            *(uint32_t *)curr_dest_pos = toku_htod32(num_cxrs);
            curr_dest_pos += sizeof(num_cxrs);
            *(uint8_t *)curr_dest_pos = num_pxrs;
            curr_dest_pos += sizeof(num_pxrs);
            // now we need to pack the rest of the data
            uint32_t num_rest_bytes = leafentry_rest_memsize(num_pxrs, num_cxrs, const_cast<uint8_t*>(curr_src_pos));
            memcpy(curr_dest_pos, curr_src_pos, num_rest_bytes);
            curr_dest_pos += num_rest_bytes;
            curr_src_pos += num_rest_bytes;
        }
    }
    dmt_builder.build(&this->m_buffer);
    toku_note_deserialized_basement_node(m_buffer.value_length_is_fixed());

    uint32_t num_bytes_read = (uint32_t)(curr_src_pos - buf);
    invariant(num_bytes_read == data_size);

    uint32_t num_bytes_written = curr_dest_pos - newmem + m_disksize_of_keys;
    invariant(num_bytes_written == data_size);
    toku_mempool_init(&m_buffer_mempool, newmem, (size_t)(curr_dest_pos - newmem), allocated_bytes_vals);

    invariant(get_disk_size() == data_size);
    // Versions older than 26 might have allocated too much memory.  Try to shrink the mempool now that we
    // know how much memory we need.
    if (version < FT_LAYOUT_VERSION_26) {
        // Unnecessary after version 26.
        // Reallocate a smaller mempool to save memory.
        invariant_zero(toku_mempool_get_frag_size(&m_buffer_mempool));
        toku_mempool_realloc_larger(&m_buffer_mempool, toku_mempool_get_used_size(&m_buffer_mempool));
    }
}


uint64_t bn_data::get_memory_size() {
    uint64_t retval = 0;
    // TODO: Maybe ask for memory_size instead of mempool_footprint (either this todo or the next)
    // Include fragmentation overhead, but do not include space in the
    // mempool that has not yet been allocated for leaf entries.
    size_t poolsize = toku_mempool_footprint(&m_buffer_mempool);
    retval += poolsize;
    // This one includes not-yet-allocated space for nodes (just like the old constant-key omt).
    // TODO: Maybe ask for mempool_footprint instead of memory_size.
    retval += m_buffer.memory_size();
    invariant(retval >= get_disk_size());
    return retval;
}

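// Remove the klpair at idx.  The old leafentry's bytes stay in the mempool and
// are accounted as fragmentation; dmt_compress_kvspace reclaims them later.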
void bn_data::delete_leafentry (
    uint32_t idx,
    uint32_t keylen,
    uint32_t old_le_size
    )
{
    remove_key(keylen);
    m_buffer.delete_at(idx);
    toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size);
}

/* mempool support */

struct dmt_compressor_state {
    struct mempool *new_kvspace;
    class bn_data *bd;
};

static int move_it (const uint32_t, klpair_struct *klpair, const uint32_t idx UU(), struct dmt_compressor_state * const oc) {
    LEAFENTRY old_le = oc->bd->get_le_from_klpair(klpair);
    uint32_t size = leafentry_memsize(old_le);
    void* newdata = toku_mempool_malloc(oc->new_kvspace, size);
    paranoid_invariant_notnull(newdata);  // we do this on a fresh mempool, so nothing bad should happen
    memcpy(newdata, old_le, size);
    klpair->le_offset = toku_mempool_get_offset_from_pointer_and_base(oc->new_kvspace, newdata);
    return 0;
}

// Compress things, and grow or shrink the mempool if needed.
// May (always, if force_compress) have the side effect of putting the contents of the mempool in sorted order.
void bn_data::dmt_compress_kvspace(size_t added_size, void **maybe_free, bool force_compress) {
    uint32_t total_size_needed = toku_mempool_get_used_size(&m_buffer_mempool) + added_size;

    // If there is no fragmentation, e.g. in serial inserts, we can just increase the size
    // of the mempool and move things over with a cheap memcpy. If force_compress is true,
    // the caller needs the side effect that all contents are put in sorted order.
    bool do_compress = toku_mempool_get_frag_size(&m_buffer_mempool) > 0 || force_compress;

    void *old_mempool_base = toku_mempool_get_base(&m_buffer_mempool);
    struct mempool new_kvspace;
    if (do_compress) {
        size_t requested_size = force_compress ? total_size_needed : ((total_size_needed * 3) / 2);
        toku_mempool_construct(&new_kvspace, requested_size);
        struct dmt_compressor_state oc = { &new_kvspace, this };
        m_buffer.iterate_ptr< decltype(oc), move_it >(&oc);
    } else {
        toku_mempool_construct(&new_kvspace, total_size_needed);
        size_t old_offset_limit = toku_mempool_get_offset_limit(&m_buffer_mempool);
        void *new_mempool_base = toku_mempool_malloc(&new_kvspace, old_offset_limit);
        memcpy(new_mempool_base, old_mempool_base, old_offset_limit);
    }

    if (maybe_free) {
        *maybe_free = old_mempool_base;
    } else {
        toku_free(old_mempool_base);
    }
    m_buffer_mempool = new_kvspace;
}

// Effect: Allocate a new object of size SIZE in MP.  If MP runs out of space, allocate a new mempool space, and copy all the items
//  from the dmt (which items refer to items in the old mempool) into the new mempool.
//  If MAYBE_FREE is nullptr then free the old mempool's space.
//  Otherwise, store the old mempool's space in maybe_free.
LEAFENTRY bn_data::mempool_malloc_and_update_dmt(size_t size, void **maybe_free) {
    void *v = toku_mempool_malloc(&m_buffer_mempool, size);
    if (v == nullptr) {
        dmt_compress_kvspace(size, maybe_free, false);
        v = toku_mempool_malloc(&m_buffer_mempool, size);
        paranoid_invariant_notnull(v);
    }
    return (LEAFENTRY)v;
}

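// Allocate space for a replacement leafentry at idx and point the existing
// klpair at it.  The old leafentry's bytes become mempool fragmentation; the
// caller writes the new leafentry into *new_le_space.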
void bn_data::get_space_for_overwrite(
    uint32_t idx,
    const void* keyp UU(),
    uint32_t keylen UU(),
    uint32_t old_keylen,
    uint32_t old_le_size,
    uint32_t new_size,
    LEAFENTRY* new_le_space,
    void **const maybe_free
    )
{
    *maybe_free = nullptr;
    LEAFENTRY new_le = mempool_malloc_and_update_dmt(new_size, maybe_free);
    toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size);
    klpair_struct* klp = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klp);
    invariant_zero(r);
    paranoid_invariant(klp != nullptr);
    // The old key length should be consistent with what is stored in the dmt.
    invariant(keylen_from_klpair_len(klpair_len) == old_keylen);

    size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);
    paranoid_invariant(new_le_offset <= UINT32_MAX - new_size);  // Not using > 4GB
    klp->le_offset = new_le_offset;

    paranoid_invariant(new_le == get_le_from_klpair(klp));
    *new_le_space = new_le;
}

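// Allocate space for a brand-new leafentry and insert a klpair for it at idx.
// The caller writes the new leafentry into *new_le_space.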
void bn_data::get_space_for_insert(
    uint32_t idx,
    const void* keyp,
    uint32_t keylen,
    size_t size,
    LEAFENTRY* new_le_space,
    void **const maybe_free
    )
{
    add_key(keylen);

    *maybe_free = nullptr;
    LEAFENTRY new_le = mempool_malloc_and_update_dmt(size, maybe_free);
    size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);

    klpair_dmtwriter kl(keylen, new_le_offset, keyp);
    m_buffer.insert_at(kl, idx);

    *new_le_space = new_le;
}

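// Iteration state for split_klpairs: pairs with index < m_split_at are copied
// into the left basement node's new mempool and builder, the rest into the
// right's.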
class split_klpairs_extra {
    bn_data *const m_left_bn;
    bn_data *const m_right_bn;
    klpair_dmt_t::builder *const m_left_builder;
    klpair_dmt_t::builder *const m_right_builder;
    struct mempool *const m_left_dest_mp;
    uint32_t m_split_at;

    struct mempool *left_dest_mp(void) const { return m_left_dest_mp; }
    struct mempool *right_dest_mp(void) const { return &m_right_bn->m_buffer_mempool; }

    void copy_klpair(const uint32_t klpair_len, const klpair_struct &klpair,
                     klpair_dmt_t::builder *const builder,
                     struct mempool *const dest_mp,
                     bn_data *const bn) {
        LEAFENTRY old_le = m_left_bn->get_le_from_klpair(&klpair);
        size_t le_size = leafentry_memsize(old_le);

        void *new_le = toku_mempool_malloc(dest_mp, le_size);
        paranoid_invariant_notnull(new_le);
        memcpy(new_le, old_le, le_size);
        size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(dest_mp, new_le);
        size_t keylen = keylen_from_klpair_len(klpair_len);
        builder->append(klpair_dmtwriter(keylen, le_offset, klpair.key));

        bn->add_key(keylen);
    }

    int move_leafentry(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx) {
        m_left_bn->remove_key(keylen_from_klpair_len(klpair_len));

        if (idx < m_split_at) {
            copy_klpair(klpair_len, klpair, m_left_builder, left_dest_mp(), m_left_bn);
        } else {
            copy_klpair(klpair_len, klpair, m_right_builder, right_dest_mp(), m_right_bn);
        }
        return 0;
    }

  public:
    split_klpairs_extra(bn_data *const left_bn, bn_data *const right_bn,
                        klpair_dmt_t::builder *const left_builder,
                        klpair_dmt_t::builder *const right_builder,
                        struct mempool *const left_new_mp,
                        uint32_t split_at)
        : m_left_bn(left_bn),
          m_right_bn(right_bn),
          m_left_builder(left_builder),
          m_right_builder(right_builder),
          m_left_dest_mp(left_new_mp),
          m_split_at(split_at) {}

    static int cb(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx, split_klpairs_extra *const thisp) {
        return thisp->move_leafentry(klpair_len, klpair, idx);
    }
};

void bn_data::split_klpairs(
     bn_data* right_bd,
     uint32_t split_at  // lower bound inclusive for right_bd
     )
{
    // We use move_leafentries_to during a split, and the split algorithm should never call this
    // if it's splitting on a boundary, so there must be some leafentries in the range to move.
    paranoid_invariant(split_at < num_klpairs());

    right_bd->init_zero();

    size_t mpsize = toku_mempool_get_used_size(&m_buffer_mempool);  // overkill, but safe

    struct mempool new_left_mp;
    toku_mempool_construct(&new_left_mp, mpsize);

    struct mempool *right_mp = &right_bd->m_buffer_mempool;
    toku_mempool_construct(right_mp, mpsize);

    klpair_dmt_t::builder left_dmt_builder;
    left_dmt_builder.create(split_at, m_disksize_of_keys);  // overkill, but safe (builder will realloc at the end)

    klpair_dmt_t::builder right_dmt_builder;
    right_dmt_builder.create(num_klpairs() - split_at, m_disksize_of_keys);  // overkill, but safe (builder will realloc at the end)

    split_klpairs_extra extra(this, right_bd, &left_dmt_builder, &right_dmt_builder, &new_left_mp, split_at);

    int r = m_buffer.iterate<split_klpairs_extra, split_klpairs_extra::cb>(&extra);
    invariant_zero(r);

    m_buffer.destroy();
    toku_mempool_destroy(&m_buffer_mempool);

    m_buffer_mempool = new_left_mp;

    left_dmt_builder.build(&m_buffer);
    right_dmt_builder.build(&right_bd->m_buffer);

    // Potentially shrink the memory pools for both destinations;
    // we overallocated ("overkill") above.
    struct mempool *const left_mp = &m_buffer_mempool;
    paranoid_invariant_zero(toku_mempool_get_frag_size(left_mp));
    toku_mempool_realloc_larger(left_mp, toku_mempool_get_used_size(left_mp));
    paranoid_invariant_zero(toku_mempool_get_frag_size(right_mp));
    toku_mempool_realloc_larger(right_mp, toku_mempool_get_used_size(right_mp));
}

uint64_t bn_data::get_disk_size() {
    return m_disksize_of_keys +
           toku_mempool_get_used_size(&m_buffer_mempool);
}

struct verify_le_in_mempool_state {
    size_t offset_limit;
    class bn_data *bd;
};

static int verify_le_in_mempool (const uint32_t, klpair_struct *klpair, const uint32_t idx UU(), struct verify_le_in_mempool_state * const state) {
    invariant(klpair->le_offset < state->offset_limit);

    LEAFENTRY le = state->bd->get_le_from_klpair(klpair);
    uint32_t size = leafentry_memsize(le);

    size_t end_offset = klpair->le_offset + size;

    invariant(end_offset <= state->offset_limit);
    return 0;
}

// This is a debug-only (paranoid) verification.
// Verifies that the dmt is valid and that all leafentries lie entirely within the mempool's memory.
void bn_data::verify_mempool(void) {
    // Verify the dmt itself <- paranoid and slow
    m_buffer.verify();

    verify_le_in_mempool_state state = { .offset_limit = toku_mempool_get_offset_limit(&m_buffer_mempool), .bd = this };
    // Verify that every leafentry pointed to by the keys in the dmt is fully inside the mempool
    m_buffer.iterate_ptr< decltype(state), verify_le_in_mempool >(&state);
}

uint32_t bn_data::num_klpairs(void) const {
    return m_buffer.size();
}

void bn_data::destroy(void) {
    // The buffer may have been freed already, in some cases.
    m_buffer.destroy();
    toku_mempool_destroy(&m_buffer_mempool);
    m_disksize_of_keys = 0;
}

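// Replace the contents of this (freshly created) bn_data with deep copies of
// num_les sorted (key, leafentry) pairs supplied as parallel arrays.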
void bn_data::set_contents_as_clone_of_sorted_array(
    uint32_t num_les,
    const void** old_key_ptrs,
    uint32_t* old_keylens,
    LEAFENTRY* old_les,
    size_t *le_sizes,
    size_t total_key_size,
    size_t total_le_size
    )
{
    // Enforce the "just created" invariant.
    paranoid_invariant_zero(m_disksize_of_keys);
    paranoid_invariant_zero(num_klpairs());
    paranoid_invariant_null(toku_mempool_get_base(&m_buffer_mempool));
    paranoid_invariant_zero(toku_mempool_get_size(&m_buffer_mempool));

    toku_mempool_construct(&m_buffer_mempool, total_le_size);
    m_buffer.destroy();
    m_disksize_of_keys = 0;

    klpair_dmt_t::builder dmt_builder;
    dmt_builder.create(num_les, total_key_size);

    for (uint32_t idx = 0; idx < num_les; idx++) {
        void* new_le = toku_mempool_malloc(&m_buffer_mempool, le_sizes[idx]);
        paranoid_invariant_notnull(new_le);
        memcpy(new_le, old_les[idx], le_sizes[idx]);
        size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(&m_buffer_mempool, new_le);
        dmt_builder.append(klpair_dmtwriter(old_keylens[idx], le_offset, old_key_ptrs[idx]));
        add_key(old_keylens[idx]);
    }
    dmt_builder.build(&this->m_buffer);
}

LEAFENTRY bn_data::get_le_from_klpair(const klpair_struct *klpair) const {
    void *ptr = toku_mempool_get_pointer_from_base_and_offset(&this->m_buffer_mempool, klpair->le_offset);
    LEAFENTRY CAST_FROM_VOIDP(le, ptr);
    return le;
}


// get info about a single leafentry by index
int bn_data::fetch_le(uint32_t idx, LEAFENTRY *le) {
    klpair_struct* klpair = nullptr;
    int r = m_buffer.fetch(idx, nullptr, &klpair);
    if (r == 0) {
        *le = get_le_from_klpair(klpair);
    }
    return r;
}

int bn_data::fetch_klpair(uint32_t idx, LEAFENTRY *le, uint32_t *len, void** key) {
    klpair_struct* klpair = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klpair);
    if (r == 0) {
        *len = keylen_from_klpair_len(klpair_len);
        *key = klpair->key;
        *le = get_le_from_klpair(klpair);
    }
    return r;
}

int bn_data::fetch_klpair_disksize(uint32_t idx, size_t *size) {
    klpair_struct* klpair = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klpair);
    if (r == 0) {
        *size = klpair_disksize(klpair_len, klpair);
    }
    return r;
}

int bn_data::fetch_key_and_len(uint32_t idx, uint32_t *len, void** key) {
    klpair_struct* klpair = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klpair);
    if (r == 0) {
        *len = keylen_from_klpair_len(klpair_len);
        *key = klpair->key;
    }
    return r;
}

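// Turn this bn_data into a deep copy of orig_bn_data: clone the mempool bytes,
// clone the dmt, and copy the key-size accounting.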
void bn_data::clone(bn_data* orig_bn_data) {
    toku_mempool_clone(&orig_bn_data->m_buffer_mempool, &m_buffer_mempool);
    m_buffer.clone(orig_bn_data->m_buffer);
    this->m_disksize_of_keys = orig_bn_data->m_disksize_of_keys;
}