/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <ft/bndata.h>
#include <ft/ft-internal.h>

using namespace toku;
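// Disk footprint of one klpair: the klpair struct itself, the key bytes, and the
// serialized leafentry that the klpair's offset points at.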
uint32_t bn_data::klpair_disksize(const uint32_t klpair_len, const klpair_struct *klpair) const {
    return sizeof(*klpair) + keylen_from_klpair_len(klpair_len) + leafentry_disksize(get_le_from_klpair(klpair));
}

void bn_data::init_zero() {
    toku_mempool_zero(&m_buffer_mempool);
    m_disksize_of_keys = 0;
}

void bn_data::initialize_empty() {
    init_zero();
    m_buffer.create();
}

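// Bookkeeping for m_disksize_of_keys: on disk, each key costs a uint32_t length
// prefix plus the key bytes themselves.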
void bn_data::add_key(uint32_t keylen) {
    m_disksize_of_keys += sizeof(keylen) + keylen;
}

void bn_data::add_keys(uint32_t n_keys, uint32_t combined_klpair_len) {
    invariant(n_keys * sizeof(uint32_t) <= combined_klpair_len);
    m_disksize_of_keys += combined_klpair_len;
}

void bn_data::remove_key(uint32_t keylen) {
    m_disksize_of_keys -= sizeof(keylen) + keylen;
}

// Deserialize from the version 26+ format, in which keys and vals are stored in
// separate regions so the keys can be inlined directly into the dmt.
// Currently only supports fixed-length keys.
void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version UU(),
                                                     uint32_t key_data_size, uint32_t val_data_size, bool all_keys_same_length,
                                                     uint32_t fixed_klpair_length) {
    paranoid_invariant(version >= FT_LAYOUT_VERSION_26);  // Support was added @26
    uint32_t ndone_before = rb->ndone;
    init_zero();
    invariant(all_keys_same_length);  // Until otherwise supported.
    const void *keys_src;
    rbuf_literal_bytes(rb, &keys_src, key_data_size);
    //Generate dmt
    this->m_buffer.create_from_sorted_memory_of_fixed_size_elements(
            keys_src, num_entries, key_data_size, fixed_klpair_length);
    toku_mempool_construct(&this->m_buffer_mempool, val_data_size);

    const void *vals_src;
    rbuf_literal_bytes(rb, &vals_src, val_data_size);

    if (num_entries > 0) {
        void *vals_dest = toku_mempool_malloc(&this->m_buffer_mempool, val_data_size);
        paranoid_invariant_notnull(vals_dest);
        memcpy(vals_dest, vals_src, val_data_size);
    }

    add_keys(num_entries, num_entries * fixed_klpair_length);

    toku_note_deserialized_basement_node(all_keys_same_length);

    invariant(rb->ndone - ndone_before == data_size);
}

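// Pre-version-26 on-disk layout of a single key/leafentry pair, as written by
// wbufwriteleafentry below:
//   uint8_t  type                      (LE_CLEAN or LE_MVCC)
//   uint32_t keylen
//   LE_CLEAN: uint32_t vallen, key bytes, val bytes
//   LE_MVCC:  uint32_t num_cxrs, uint8_t num_pxrs, key bytes, xrs bytes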
static int
wbufwriteleafentry(const void* key, const uint32_t keylen, const LEAFENTRY &le, const uint32_t UU(idx), struct wbuf * const wb) {
    // need to pack the leafentry as it was in versions
    // where the key was integrated into it (< 26)
    uint32_t begin_spot UU() = wb->ndone;
    uint32_t le_disk_size = leafentry_disksize(le);
    wbuf_nocrc_uint8_t(wb, le->type);
    wbuf_nocrc_uint32_t(wb, keylen);
    if (le->type == LE_CLEAN) {
        wbuf_nocrc_uint32_t(wb, le->u.clean.vallen);
        wbuf_nocrc_literal_bytes(wb, key, keylen);
        wbuf_nocrc_literal_bytes(wb, le->u.clean.val, le->u.clean.vallen);
    }
    else {
        paranoid_invariant(le->type == LE_MVCC);
        wbuf_nocrc_uint32_t(wb, le->u.mvcc.num_cxrs);
        wbuf_nocrc_uint8_t(wb, le->u.mvcc.num_pxrs);
        wbuf_nocrc_literal_bytes(wb, key, keylen);
        wbuf_nocrc_literal_bytes(wb, le->u.mvcc.xrs, le_disk_size - (1 + 4 + 1));
    }
    uint32_t end_spot UU() = wb->ndone;
    paranoid_invariant((end_spot - begin_spot) == keylen + sizeof(keylen) + le_disk_size);
    return 0;
}

void bn_data::serialize_to_wbuf(struct wbuf *const wb) {
    prepare_to_serialize();
    serialize_header(wb);
    if (m_buffer.value_length_is_fixed()) {
        serialize_rest(wb);
    } else {
        //
        // iterate over leafentries and place them into the buffer
        //
        iterate<struct wbuf, wbufwriteleafentry>(wb);
    }
}

// If we have fixed-length keys, we prepare the dmt and mempool.
// The mempool is prepared by removing any fragmented space and ordering leafentries in the same order as their keys.
void bn_data::prepare_to_serialize(void) {
    if (m_buffer.value_length_is_fixed()) {
        m_buffer.prepare_for_serialize();
        dmt_compress_kvspace(0, nullptr, true);  // Gets it ready for easy serialization.
    }
}

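// Header written to disk (and read back by deserialize_from_rbuf); the fields below
// must total HEADER_LENGTH bytes:
//   uint32_t key_data_size
//   uint32_t val_data_size
//   uint32_t fixed_klpair_length   (0 if keys are not all the same length)
//   uint8_t  all_keys_same_length
//   uint8_t  keys_vals_separate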
void bn_data::serialize_header(struct wbuf *wb) const {
    bool fixed = m_buffer.value_length_is_fixed();

    //key_data_size
    wbuf_nocrc_uint(wb, m_disksize_of_keys);
    //val_data_size
    wbuf_nocrc_uint(wb, toku_mempool_get_used_size(&m_buffer_mempool));
    //fixed_klpair_length
    wbuf_nocrc_uint(wb, m_buffer.get_fixed_length());
    // all_keys_same_length
    wbuf_nocrc_uint8_t(wb, fixed);
    // keys_vals_separate
    wbuf_nocrc_uint8_t(wb, fixed);
}

void bn_data::serialize_rest(struct wbuf *wb) const {
    //Write keys
    invariant(m_buffer.value_length_is_fixed()); //Assumes prepare_to_serialize was called
    m_buffer.serialize_values(m_disksize_of_keys, wb);

    //Write leafentries
    //Just ran dmt_compress_kvspace so there is no fragmentation and also leafentries are in sorted order.
    paranoid_invariant(toku_mempool_get_frag_size(&m_buffer_mempool) == 0);
    uint32_t val_data_size = toku_mempool_get_used_size(&m_buffer_mempool);
    wbuf_nocrc_literal_bytes(wb, toku_mempool_get_base(&m_buffer_mempool), val_data_size);
}

// Deserialize from rbuf
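// Two paths: version 26+ buffers start with the header written by serialize_header,
// and when keys_vals_separate is set we delegate to initialize_from_separate_keys_and_vals.
// Otherwise (and for version <= 25) we parse the old inline format described above
// wbufwriteleafentry, splitting each pair into a key (for the dmt) and a leafentry
// (packed into a new mempool).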
void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version) {
    uint32_t key_data_size = data_size;  // overallocate if < version 26 (best guess that is guaranteed not too small)
    uint32_t val_data_size = data_size;  // overallocate if < version 26 (best guess that is guaranteed not too small)

    bool all_keys_same_length = false;
    bool keys_vals_separate = false;
    uint32_t fixed_klpair_length = 0;

    // In version 25 and older there is no header.  Skip reading header for old version.
    if (version >= FT_LAYOUT_VERSION_26) {
        uint32_t ndone_before = rb->ndone;
        key_data_size = rbuf_int(rb);
        val_data_size = rbuf_int(rb);
        fixed_klpair_length = rbuf_int(rb);  // 0 if !all_keys_same_length
        all_keys_same_length = rbuf_char(rb);
        keys_vals_separate = rbuf_char(rb);
        invariant(all_keys_same_length == keys_vals_separate);  // Until we support otherwise
        uint32_t header_size = rb->ndone - ndone_before;
        data_size -= header_size;
        invariant(header_size == HEADER_LENGTH);
        if (keys_vals_separate) {
            invariant(fixed_klpair_length >= sizeof(klpair_struct) || num_entries == 0);
            initialize_from_separate_keys_and_vals(num_entries, rb, data_size, version,
                                                   key_data_size, val_data_size, all_keys_same_length,
                                                   fixed_klpair_length);
            return;
        }
    }
    // Version >= 26 and version 25 deserialization are now identical except that <= 25 might allocate too much memory.
    const void *bytes;
    rbuf_literal_bytes(rb, &bytes, data_size);
    const unsigned char *CAST_FROM_VOIDP(buf, bytes);
    if (data_size == 0) {
        invariant_zero(num_entries);
    }
    init_zero();
    klpair_dmt_t::builder dmt_builder;
    dmt_builder.create(num_entries, key_data_size);

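    // Walk the packed key+leafentry pairs: append each key to the dmt builder,
    // and repack its leafentry (sans key) into newmem, which becomes the new mempool.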
    // TODO(leif): clean this up (#149)
    unsigned char *newmem = nullptr;
    // add 25% extra wiggle room
    uint32_t allocated_bytes_vals = val_data_size + (val_data_size / 4);
    CAST_FROM_VOIDP(newmem, toku_xmalloc(allocated_bytes_vals));
    const unsigned char* curr_src_pos = buf;
    unsigned char* curr_dest_pos = newmem;
    for (uint32_t i = 0; i < num_entries; i++) {
        uint8_t curr_type = curr_src_pos[0];
        curr_src_pos++;
        // first thing we do is lay out the key,
        // to do so, we must extract it from the leafentry
        // and write it in
        uint32_t keylen = 0;
        const void* keyp = nullptr;
        keylen = *(uint32_t *)curr_src_pos;
        curr_src_pos += sizeof(uint32_t);
        uint32_t clean_vallen = 0;
        uint32_t num_cxrs = 0;
        uint8_t num_pxrs = 0;
        if (curr_type == LE_CLEAN) {
            clean_vallen = toku_dtoh32(*(uint32_t *)curr_src_pos);
            curr_src_pos += sizeof(clean_vallen); // val_len
            keyp = curr_src_pos;
            curr_src_pos += keylen;
        }
        else {
            paranoid_invariant(curr_type == LE_MVCC);
            num_cxrs = toku_dtoh32(*(uint32_t *)curr_src_pos);
            curr_src_pos += sizeof(uint32_t); // num_cxrs
            num_pxrs = curr_src_pos[0];
            curr_src_pos += sizeof(uint8_t); //num_pxrs
            keyp = curr_src_pos;
            curr_src_pos += keylen;
        }
        uint32_t le_offset = curr_dest_pos - newmem;
        dmt_builder.append(klpair_dmtwriter(keylen, le_offset, keyp));
        add_key(keylen);

        // now curr_dest_pos is pointing to where the leafentry should be packed
        curr_dest_pos[0] = curr_type;
        curr_dest_pos++;
        if (curr_type == LE_CLEAN) {
            *(uint32_t *)curr_dest_pos = toku_htod32(clean_vallen);
            curr_dest_pos += sizeof(clean_vallen);
            memcpy(curr_dest_pos, curr_src_pos, clean_vallen); // copy the val
            curr_dest_pos += clean_vallen;
            curr_src_pos += clean_vallen;
        }
        else {
            // pack num_cxrs and num_pxrs
            *(uint32_t *)curr_dest_pos = toku_htod32(num_cxrs);
            curr_dest_pos += sizeof(num_cxrs);
            *(uint8_t *)curr_dest_pos = num_pxrs;
            curr_dest_pos += sizeof(num_pxrs);
            // now we need to pack the rest of the data
            uint32_t num_rest_bytes = leafentry_rest_memsize(num_pxrs, num_cxrs, const_cast<uint8_t*>(curr_src_pos));
            memcpy(curr_dest_pos, curr_src_pos, num_rest_bytes);
            curr_dest_pos += num_rest_bytes;
            curr_src_pos += num_rest_bytes;
        }
    }
    dmt_builder.build(&this->m_buffer);
    toku_note_deserialized_basement_node(m_buffer.value_length_is_fixed());

    uint32_t num_bytes_read = (uint32_t)(curr_src_pos - buf);
    invariant(num_bytes_read == data_size);

    uint32_t num_bytes_written = curr_dest_pos - newmem + m_disksize_of_keys;
    invariant(num_bytes_written == data_size);
    toku_mempool_init(&m_buffer_mempool, newmem, (size_t)(curr_dest_pos - newmem), allocated_bytes_vals);

    invariant(get_disk_size() == data_size);
    // Versions older than 26 might have allocated too much memory.  Try to shrink the mempool now that we
    // know how much memory we need.
    if (version < FT_LAYOUT_VERSION_26) {
        // Unnecessary after version 26
        // Reallocate smaller mempool to save memory
        invariant_zero(toku_mempool_get_frag_size(&m_buffer_mempool));
        toku_mempool_realloc_larger(&m_buffer_mempool, toku_mempool_get_used_size(&m_buffer_mempool));
    }
}

uint64_t bn_data::get_memory_size() {
    uint64_t retval = 0;
    //TODO: Maybe ask for memory_size instead of mempool_footprint (either this todo or the next)
    // include fragmentation overhead but do not include space in the
    // mempool that has not yet been allocated for leaf entries
    size_t poolsize = toku_mempool_footprint(&m_buffer_mempool);
    retval += poolsize;
    // This one includes not-yet-allocated for nodes (just like old constant-key omt)
    //TODO: Maybe ask for mempool_footprint instead of memory_size.
    retval += m_buffer.memory_size();
    invariant(retval >= get_disk_size());
    return retval;
}

void bn_data::delete_leafentry (
    uint32_t idx,
    uint32_t keylen,
    uint32_t old_le_size
    )
{
    remove_key(keylen);
    m_buffer.delete_at(idx);
    toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size);
}

/* mempool support */
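
// Leafentries live in m_buffer_mempool, but the klpairs in the dmt store an
// le_offset into the pool rather than a raw pointer. That lets us reallocate
// or compact the pool at any time, as long as every klpair's offset is
// rewritten afterwards (which is exactly what move_it below does).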

struct dmt_compressor_state {
    struct mempool *new_kvspace;
    class bn_data *bd;
};

static int move_it (const uint32_t, klpair_struct *klpair, const uint32_t idx UU(), struct dmt_compressor_state * const oc) {
    LEAFENTRY old_le = oc->bd->get_le_from_klpair(klpair);
    uint32_t size = leafentry_memsize(old_le);
    void* newdata = toku_mempool_malloc(oc->new_kvspace, size);
    paranoid_invariant_notnull(newdata); // we do this on a fresh mempool, so nothing bad should happen
    memcpy(newdata, old_le, size);
    klpair->le_offset = toku_mempool_get_offset_from_pointer_and_base(oc->new_kvspace, newdata);
    return 0;
}

// Compress things, and grow or shrink the mempool if needed.
// May (always if force_compress) have a side effect of putting contents of mempool in sorted order.
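// Sizing policy: when compressing for serialization (force_compress) we allocate
// exactly what is needed; otherwise we add 50% headroom so the next few
// allocations do not immediately force another copy.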
void bn_data::dmt_compress_kvspace(size_t added_size, void **maybe_free, bool force_compress) {
    uint32_t total_size_needed = toku_mempool_get_used_size(&m_buffer_mempool) + added_size;

    // If there is no fragmentation, e.g. in serial inserts, we can just increase the size
    // of the mempool and move things over with a cheap memcpy. If force_compress is true,
    // the caller needs the side effect that all contents are put in sorted order.
    bool do_compress = toku_mempool_get_frag_size(&m_buffer_mempool) > 0 || force_compress;

    void *old_mempool_base = toku_mempool_get_base(&m_buffer_mempool);
    struct mempool new_kvspace;
    if (do_compress) {
        size_t requested_size = force_compress ? total_size_needed : ((total_size_needed * 3) / 2);
        toku_mempool_construct(&new_kvspace, requested_size);
        struct dmt_compressor_state oc = { &new_kvspace, this };
        m_buffer.iterate_ptr< decltype(oc), move_it >(&oc);
    } else {
        toku_mempool_construct(&new_kvspace, total_size_needed);
        size_t old_offset_limit = toku_mempool_get_offset_limit(&m_buffer_mempool);
        void *new_mempool_base = toku_mempool_malloc(&new_kvspace, old_offset_limit);
        memcpy(new_mempool_base, old_mempool_base, old_offset_limit);
    }

    if (maybe_free) {
        *maybe_free = old_mempool_base;
    } else {
        toku_free(old_mempool_base);
    }
    m_buffer_mempool = new_kvspace;
}

// Effect: Allocate a new object of size SIZE in the mempool.  If the mempool runs out of space,
//  allocate a new mempool and copy all the items from the dmt (whose klpairs refer to items in
//  the old mempool) into the new mempool.
//  If MAYBE_FREE is nullptr, then free the old mempool's space.
//  Otherwise, store the old mempool's space in maybe_free.
LEAFENTRY bn_data::mempool_malloc_and_update_dmt(size_t size, void **maybe_free) {
    void *v = toku_mempool_malloc(&m_buffer_mempool, size);
    if (v == nullptr) {
        dmt_compress_kvspace(size, maybe_free, false);
        v = toku_mempool_malloc(&m_buffer_mempool, size);
        paranoid_invariant_notnull(v);
    }
    return (LEAFENTRY)v;
}

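// Effect: Allocate new_size bytes for a replacement leafentry at idx, release the old
//  leafentry's space, and point the existing klpair's le_offset at the new space.
//  The key itself is unchanged; old_keylen is only checked against what the dmt holds.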
void bn_data::get_space_for_overwrite(
    uint32_t idx,
    const void* keyp UU(),
    uint32_t keylen UU(),
    uint32_t old_keylen,
    uint32_t old_le_size,
    uint32_t new_size,
    LEAFENTRY* new_le_space,
    void **const maybe_free
    )
{
    *maybe_free = nullptr;
    LEAFENTRY new_le = mempool_malloc_and_update_dmt(new_size, maybe_free);
    toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size);
    klpair_struct* klp = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klp);
    invariant_zero(r);
    paranoid_invariant(klp!=nullptr);
    // Old key length should be consistent with what is stored in the DMT
    invariant(keylen_from_klpair_len(klpair_len) == old_keylen);

    size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);
    paranoid_invariant(new_le_offset <= UINT32_MAX - new_size);  // Not using > 4GB
    klp->le_offset = new_le_offset;

    paranoid_invariant(new_le == get_le_from_klpair(klp));
    *new_le_space = new_le;
}

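// Effect: Allocate size bytes for a brand-new leafentry and insert a new klpair
//  (the key bytes plus the new leafentry's offset) at idx in the dmt.
//  The new key is accounted for in m_disksize_of_keys via add_key.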
void bn_data::get_space_for_insert(
    uint32_t idx,
    const void* keyp,
    uint32_t keylen,
    size_t size,
    LEAFENTRY* new_le_space,
    void **const maybe_free
    )
{
    add_key(keylen);

    *maybe_free = nullptr;
    LEAFENTRY new_le = mempool_malloc_and_update_dmt(size, maybe_free);
    size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);

    klpair_dmtwriter kl(keylen, new_le_offset, keyp);
    m_buffer.insert_at(kl, idx);

    *new_le_space = new_le;
}

class split_klpairs_extra {
    bn_data *const m_left_bn;
    bn_data *const m_right_bn;
    klpair_dmt_t::builder *const m_left_builder;
    klpair_dmt_t::builder *const m_right_builder;
    struct mempool *const m_left_dest_mp;
    uint32_t m_split_at;

    struct mempool *left_dest_mp(void) const { return m_left_dest_mp; }
    struct mempool *right_dest_mp(void) const { return &m_right_bn->m_buffer_mempool; }

    void copy_klpair(const uint32_t klpair_len, const klpair_struct &klpair,
                     klpair_dmt_t::builder *const builder,
                     struct mempool *const dest_mp,
                     bn_data *const bn) {
        LEAFENTRY old_le = m_left_bn->get_le_from_klpair(&klpair);
        size_t le_size = leafentry_memsize(old_le);

        void *new_le = toku_mempool_malloc(dest_mp, le_size);
        paranoid_invariant_notnull(new_le);
        memcpy(new_le, old_le, le_size);
        size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(dest_mp, new_le);
        size_t keylen = keylen_from_klpair_len(klpair_len);
        builder->append(klpair_dmtwriter(keylen, le_offset, klpair.key));

        bn->add_key(keylen);
    }

    int move_leafentry(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx) {
        m_left_bn->remove_key(keylen_from_klpair_len(klpair_len));

        if (idx < m_split_at) {
            copy_klpair(klpair_len, klpair, m_left_builder, left_dest_mp(), m_left_bn);
        } else {
            copy_klpair(klpair_len, klpair, m_right_builder, right_dest_mp(), m_right_bn);
        }
        return 0;
    }

  public:
    split_klpairs_extra(bn_data *const left_bn, bn_data *const right_bn,
                        klpair_dmt_t::builder *const left_builder,
                        klpair_dmt_t::builder *const right_builder,
                        struct mempool *const left_new_mp,
                        uint32_t split_at)
        : m_left_bn(left_bn),
          m_right_bn(right_bn),
          m_left_builder(left_builder),
          m_right_builder(right_builder),
          m_left_dest_mp(left_new_mp),
          m_split_at(split_at) {}
    static int cb(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx, split_klpairs_extra *const thisp) {
        return thisp->move_leafentry(klpair_len, klpair, idx);
    }
};

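// Effect: Move the klpairs at indices [split_at, num_klpairs()) into right_bd.
//  Both halves get freshly built dmts and freshly constructed mempools (each sized to
//  the old pool's used size, which is overkill but safe); split_klpairs_extra::cb
//  copies each leafentry into the proper pool, and both pools are trimmed at the end.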
void bn_data::split_klpairs(
     bn_data* right_bd,
     uint32_t split_at //lower bound inclusive for right_bd
     )
{
    // The split algorithm should never call this if it's splitting on a boundary,
    // so there must be some klpairs in the range to move.
    paranoid_invariant(split_at < num_klpairs());

    right_bd->init_zero();

    size_t mpsize = toku_mempool_get_used_size(&m_buffer_mempool);   // overkill, but safe

    struct mempool new_left_mp;
    toku_mempool_construct(&new_left_mp, mpsize);

    struct mempool *right_mp = &right_bd->m_buffer_mempool;
    toku_mempool_construct(right_mp, mpsize);

    klpair_dmt_t::builder left_dmt_builder;
    left_dmt_builder.create(split_at, m_disksize_of_keys);  // overkill, but safe (builder will realloc at the end)

    klpair_dmt_t::builder right_dmt_builder;
    right_dmt_builder.create(num_klpairs() - split_at, m_disksize_of_keys);  // overkill, but safe (builder will realloc at the end)

    split_klpairs_extra extra(this, right_bd, &left_dmt_builder, &right_dmt_builder, &new_left_mp, split_at);

    int r = m_buffer.iterate<split_klpairs_extra, split_klpairs_extra::cb>(&extra);
    invariant_zero(r);

    m_buffer.destroy();
    toku_mempool_destroy(&m_buffer_mempool);

    m_buffer_mempool = new_left_mp;

    left_dmt_builder.build(&m_buffer);
    right_dmt_builder.build(&right_bd->m_buffer);

    // Potentially shrink the memory pools for both destinations.
    // We overallocated ("overkill") above.
    struct mempool *const left_mp = &m_buffer_mempool;
    paranoid_invariant_zero(toku_mempool_get_frag_size(left_mp));
    toku_mempool_realloc_larger(left_mp, toku_mempool_get_used_size(left_mp));
    paranoid_invariant_zero(toku_mempool_get_frag_size(right_mp));
    toku_mempool_realloc_larger(right_mp, toku_mempool_get_used_size(right_mp));
}

uint64_t bn_data::get_disk_size() {
    return m_disksize_of_keys +
           toku_mempool_get_used_size(&m_buffer_mempool);
}

struct verify_le_in_mempool_state {
    size_t offset_limit;
    class bn_data *bd;
};

static int verify_le_in_mempool (const uint32_t, klpair_struct *klpair, const uint32_t idx UU(), struct verify_le_in_mempool_state * const state) {
    invariant(klpair->le_offset < state->offset_limit);

    LEAFENTRY le = state->bd->get_le_from_klpair(klpair);
    uint32_t size = leafentry_memsize(le);

    size_t end_offset = klpair->le_offset + size;

    invariant(end_offset <= state->offset_limit);
    return 0;
}

// This is a debug-only (paranoid) verification.
// Verifies that the dmt is valid and that all leafentries lie entirely within the mempool's memory.
void bn_data::verify_mempool(void) {
    // Verify the dmt itself <- paranoid and slow
    m_buffer.verify();

    verify_le_in_mempool_state state = { .offset_limit = toku_mempool_get_offset_limit(&m_buffer_mempool), .bd = this };
    // Verify that every leafentry pointed to by the keys in the dmt lies fully inside the mempool
    m_buffer.iterate_ptr< decltype(state), verify_le_in_mempool >(&state);
}

uint32_t bn_data::num_klpairs(void) const {
    return m_buffer.size();
}

void bn_data::destroy(void) {
    // The buffer may have been freed already, in some cases.
    m_buffer.destroy();
    toku_mempool_destroy(&m_buffer_mempool);
    m_disksize_of_keys = 0;
}

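// Effect: Fill this (freshly created) bn_data with clones of the given sorted keys
//  and leafentries: each leafentry is copied into a new mempool, and a klpair is
//  appended to a new dmt in sorted order.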
void bn_data::set_contents_as_clone_of_sorted_array(
    uint32_t num_les,
    const void** old_key_ptrs,
    uint32_t* old_keylens,
    LEAFENTRY* old_les,
    size_t *le_sizes,
    size_t total_key_size,
    size_t total_le_size
    )
{
    //Enforce "just created" invariant.
    paranoid_invariant_zero(m_disksize_of_keys);
    paranoid_invariant_zero(num_klpairs());
    paranoid_invariant_null(toku_mempool_get_base(&m_buffer_mempool));
    paranoid_invariant_zero(toku_mempool_get_size(&m_buffer_mempool));

    toku_mempool_construct(&m_buffer_mempool, total_le_size);
    m_buffer.destroy();
    m_disksize_of_keys = 0;

    klpair_dmt_t::builder dmt_builder;
    dmt_builder.create(num_les, total_key_size);

    for (uint32_t idx = 0; idx < num_les; idx++) {
        void* new_le = toku_mempool_malloc(&m_buffer_mempool, le_sizes[idx]);
        paranoid_invariant_notnull(new_le);
        memcpy(new_le, old_les[idx], le_sizes[idx]);
        size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(&m_buffer_mempool, new_le);
        dmt_builder.append(klpair_dmtwriter(old_keylens[idx], le_offset, old_key_ptrs[idx]));
        add_key(old_keylens[idx]);
    }
    dmt_builder.build(&this->m_buffer);
}

LEAFENTRY bn_data::get_le_from_klpair(const klpair_struct *klpair) const {
    void * ptr = toku_mempool_get_pointer_from_base_and_offset(&this->m_buffer_mempool, klpair->le_offset);
    LEAFENTRY CAST_FROM_VOIDP(le, ptr);
    return le;
}


// get info about a single leafentry by index
int bn_data::fetch_le(uint32_t idx, LEAFENTRY *le) {
    klpair_struct* klpair = nullptr;
    int r = m_buffer.fetch(idx, nullptr, &klpair);
    if (r == 0) {
        *le = get_le_from_klpair(klpair);
    }
    return r;
}

int bn_data::fetch_klpair(uint32_t idx, LEAFENTRY *le, uint32_t *len, void** key) {
    klpair_struct* klpair = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klpair);
    if (r == 0) {
        *len = keylen_from_klpair_len(klpair_len);
        *key = klpair->key;
        *le = get_le_from_klpair(klpair);
    }
    return r;
}

int bn_data::fetch_klpair_disksize(uint32_t idx, size_t *size) {
    klpair_struct* klpair = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klpair);
    if (r == 0) {
        *size = klpair_disksize(klpair_len, klpair);
    }
    return r;
}

int bn_data::fetch_key_and_len(uint32_t idx, uint32_t *len, void** key) {
    klpair_struct* klpair = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klpair);
    if (r == 0) {
        *len = keylen_from_klpair_len(klpair_len);
        *key = klpair->key;
    }
    return r;
}

void bn_data::clone(bn_data* orig_bn_data) {
    toku_mempool_clone(&orig_bn_data->m_buffer_mempool, &m_buffer_mempool);
    m_buffer.clone(orig_bn_data->m_buffer);
    this->m_disksize_of_keys = orig_bn_data->m_disksize_of_keys;
}