1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include <ft/bndata.h>
40 #include <ft/ft-internal.h>
41
42 using namespace toku;
// On-disk size of one klpair: the klpair header, the key bytes, and the
// serialized size of the leafentry it references.
uint32_t bn_data::klpair_disksize(const uint32_t klpair_len, const klpair_struct *klpair) const {
    const uint32_t key_bytes = keylen_from_klpair_len(klpair_len);
    const uint32_t le_bytes = leafentry_disksize(get_le_from_klpair(klpair));
    return sizeof(*klpair) + key_bytes + le_bytes;
}
46
init_zero()47 void bn_data::init_zero() {
48 toku_mempool_zero(&m_buffer_mempool);
49 m_disksize_of_keys = 0;
50 }
51
initialize_empty()52 void bn_data::initialize_empty() {
53 init_zero();
54 m_buffer.create();
55 }
56
add_key(uint32_t keylen)57 void bn_data::add_key(uint32_t keylen) {
58 m_disksize_of_keys += sizeof(keylen) + keylen;
59 }
60
add_keys(uint32_t n_keys,uint32_t combined_klpair_len)61 void bn_data::add_keys(uint32_t n_keys, uint32_t combined_klpair_len) {
62 invariant(n_keys * sizeof(uint32_t) <= combined_klpair_len);
63 m_disksize_of_keys += combined_klpair_len;
64 }
65
remove_key(uint32_t keylen)66 void bn_data::remove_key(uint32_t keylen) {
67 m_disksize_of_keys -= sizeof(keylen) + keylen;
68 }
69
// Deserialize from format optimized for keys being inlined.
// Currently only supports fixed-length keys.
// Layout read from rb: key_data_size bytes of sorted klpairs, then
// val_data_size bytes of contiguous leafentries.  Keys go straight into the
// dmt; vals are bulk-copied into a fresh mempool.
void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version UU(),
                                                     uint32_t key_data_size, uint32_t val_data_size, bool all_keys_same_length,
                                                     uint32_t fixed_klpair_length) {
    paranoid_invariant(version >= FT_LAYOUT_VERSION_26); // Support was added @26
    uint32_t ndone_before = rb->ndone;  // so we can verify exactly data_size bytes get consumed
    init_zero();
    invariant(all_keys_same_length); // Until otherwise supported.
    const void *keys_src;
    rbuf_literal_bytes(rb, &keys_src, key_data_size);
    //Generate dmt directly from the serialized (already sorted) key block.
    this->m_buffer.create_from_sorted_memory_of_fixed_size_elements(
        keys_src, num_entries, key_data_size, fixed_klpair_length);
    toku_mempool_construct(&this->m_buffer_mempool, val_data_size);

    const void *vals_src;
    rbuf_literal_bytes(rb, &vals_src, val_data_size);

    if (num_entries > 0) {
        // Leafentries were serialized contiguously, so one bulk copy suffices;
        // the klpair offsets from the key block remain valid against this base.
        void *vals_dest = toku_mempool_malloc(&this->m_buffer_mempool, val_data_size);
        paranoid_invariant_notnull(vals_dest);
        memcpy(vals_dest, vals_src, val_data_size);
    }

    // Account for all key bytes in one shot (fixed length * count).
    add_keys(num_entries, num_entries * fixed_klpair_length);

    toku_note_deserialized_basement_node(all_keys_same_length);

    // We must have consumed exactly what the caller's header promised.
    invariant(rb->ndone - ndone_before == data_size);
}
101
// Iterate callback: write one (key, leafentry) pair to wb in the pre-26
// on-disk layout, where the key is embedded inside the serialized leafentry.
// Used when keys are variable-length and cannot be bulk-serialized.
static int
wbufwriteleafentry(const void* key, const uint32_t keylen, const LEAFENTRY &le, const uint32_t UU(idx), struct wbuf * const wb) {
    // need to pack the leafentry as it was in versions
    // where the key was integrated into it (< 26)
    uint32_t begin_spot UU() = wb->ndone;
    uint32_t le_disk_size = leafentry_disksize(le);
    wbuf_nocrc_uint8_t(wb, le->type);
    wbuf_nocrc_uint32_t(wb, keylen);
    if (le->type == LE_CLEAN) {
        // Clean layout: type, keylen, vallen, key bytes, val bytes.
        wbuf_nocrc_uint32_t(wb, le->u.clean.vallen);
        wbuf_nocrc_literal_bytes(wb, key, keylen);
        wbuf_nocrc_literal_bytes(wb, le->u.clean.val, le->u.clean.vallen);
    }
    else {
        paranoid_invariant(le->type == LE_MVCC);
        // MVCC layout: type, keylen, num_cxrs, num_pxrs, key bytes, xrs payload.
        wbuf_nocrc_uint32_t(wb, le->u.mvcc.num_cxrs);
        wbuf_nocrc_uint8_t(wb, le->u.mvcc.num_pxrs);
        wbuf_nocrc_literal_bytes(wb, key, keylen);
        // The xrs payload is the leafentry's disk size minus the header bytes
        // already written: type (1) + num_cxrs (4) + num_pxrs (1).
        wbuf_nocrc_literal_bytes(wb, le->u.mvcc.xrs, le_disk_size - (1 + 4 + 1));
    }
    uint32_t end_spot UU() = wb->ndone;
    // Sanity: bytes emitted must equal key + keylen prefix + leafentry disk size.
    paranoid_invariant((end_spot - begin_spot) == keylen + sizeof(keylen) + le_disk_size);
    return 0;
}
126
serialize_to_wbuf(struct wbuf * const wb)127 void bn_data::serialize_to_wbuf(struct wbuf *const wb) {
128 prepare_to_serialize();
129 serialize_header(wb);
130 if (m_buffer.value_length_is_fixed()) {
131 serialize_rest(wb);
132 } else {
133 //
134 // iterate over leafentries and place them into the buffer
135 //
136 iterate<struct wbuf, wbufwriteleafentry>(wb);
137 }
138 }
139
140 // If we have fixed-length keys, we prepare the dmt and mempool.
141 // The mempool is prepared by removing any fragmented space and ordering leafentries in the same order as their keys.
prepare_to_serialize(void)142 void bn_data::prepare_to_serialize(void) {
143 if (m_buffer.value_length_is_fixed()) {
144 m_buffer.prepare_for_serialize();
145 dmt_compress_kvspace(0, nullptr, true); // Gets it ready for easy serialization.
146 }
147 }
148
serialize_header(struct wbuf * wb) const149 void bn_data::serialize_header(struct wbuf *wb) const {
150 bool fixed = m_buffer.value_length_is_fixed();
151
152 //key_data_size
153 wbuf_nocrc_uint(wb, m_disksize_of_keys);
154 //val_data_size
155 wbuf_nocrc_uint(wb, toku_mempool_get_used_size(&m_buffer_mempool));
156 //fixed_klpair_length
157 wbuf_nocrc_uint(wb, m_buffer.get_fixed_length());
158 // all_keys_same_length
159 wbuf_nocrc_uint8_t(wb, fixed);
160 // keys_vals_separate
161 wbuf_nocrc_uint8_t(wb, fixed);
162 }
163
// Bulk-write the key block followed by the leafentry block.
// Precondition: prepare_to_serialize already ran, so the dmt is flattened
// and the mempool is defragmented with leafentries in key order.
void bn_data::serialize_rest(struct wbuf *wb) const {
    //Write keys
    invariant(m_buffer.value_length_is_fixed()); //Assumes prepare_to_serialize was called
    m_buffer.serialize_values(m_disksize_of_keys, wb);

    //Write leafentries
    //Just ran dmt_compress_kvspace so there is no fragmentation and also leafentries are in sorted order.
    paranoid_invariant(toku_mempool_get_frag_size(&m_buffer_mempool) == 0);
    uint32_t val_data_size = toku_mempool_get_used_size(&m_buffer_mempool);
    // With zero fragmentation the used region is one contiguous run from base.
    wbuf_nocrc_literal_bytes(wb, toku_mempool_get_base(&m_buffer_mempool), val_data_size);
}
175
// Deserialize from rbuf.
// For version >= 26 with separate keys/vals, delegates to the fast bulk path.
// Otherwise (<= 25, or packed 26+ data) walks each serialized leafentry,
// splitting the embedded key out into the dmt and repacking the remaining
// leafentry bytes into a new mempool.
void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version) {
    uint32_t key_data_size = data_size; // overallocate if < version 26 (best guess that is guaranteed not too small)
    uint32_t val_data_size = data_size; // overallocate if < version 26 (best guess that is guaranteed not too small)

    bool all_keys_same_length = false;
    bool keys_vals_separate = false;
    uint32_t fixed_klpair_length = 0;

    // In version 25 and older there is no header. Skip reading header for old version.
    if (version >= FT_LAYOUT_VERSION_26) {
        uint32_t ndone_before = rb->ndone;
        key_data_size = rbuf_int(rb);
        val_data_size = rbuf_int(rb);
        fixed_klpair_length = rbuf_int(rb); // 0 if !all_keys_same_length
        all_keys_same_length = rbuf_char(rb);
        keys_vals_separate = rbuf_char(rb);
        invariant(all_keys_same_length == keys_vals_separate); // Until we support otherwise
        uint32_t header_size = rb->ndone - ndone_before;
        data_size -= header_size;  // remaining payload after the header
        invariant(header_size == HEADER_LENGTH);
        if (keys_vals_separate) {
            invariant(fixed_klpair_length >= sizeof(klpair_struct) || num_entries == 0);
            // Fast path: keys and vals were serialized as two contiguous blocks.
            initialize_from_separate_keys_and_vals(num_entries, rb, data_size, version,
                                                   key_data_size, val_data_size, all_keys_same_length,
                                                   fixed_klpair_length);
            return;
        }
    }
    // Version >= 26 and version 25 deserialization are now identical except that <= 25 might allocate too much memory.
    const void *bytes;
    rbuf_literal_bytes(rb, &bytes, data_size);
    const unsigned char *CAST_FROM_VOIDP(buf, bytes);
    if (data_size == 0) {
        invariant_zero(num_entries);
    }
    init_zero();
    klpair_dmt_t::builder dmt_builder;
    dmt_builder.create(num_entries, key_data_size);

    // TODO(leif): clean this up (#149)
    unsigned char *newmem = nullptr;
    // add 25% extra wiggle room
    uint32_t allocated_bytes_vals = val_data_size + (val_data_size / 4);
    CAST_FROM_VOIDP(newmem, toku_xmalloc(allocated_bytes_vals));
    const unsigned char* curr_src_pos = buf;   // reading packed (key-embedded) leafentries
    unsigned char* curr_dest_pos = newmem;     // writing key-less leafentries
    for (uint32_t i = 0; i < num_entries; i++) {
        uint8_t curr_type = curr_src_pos[0];
        curr_src_pos++;
        // first thing we do is lay out the key,
        // to do so, we must extract it from the leafentry
        // and write it in
        uint32_t keylen = 0;
        const void* keyp = nullptr;
        keylen = *(uint32_t *)curr_src_pos;
        curr_src_pos += sizeof(uint32_t);
        uint32_t clean_vallen = 0;
        uint32_t num_cxrs = 0;
        uint8_t num_pxrs = 0;
        if (curr_type == LE_CLEAN) {
            clean_vallen = toku_dtoh32(*(uint32_t *)curr_src_pos);
            curr_src_pos += sizeof(clean_vallen); // val_len
            keyp = curr_src_pos;
            curr_src_pos += keylen;
        }
        else {
            paranoid_invariant(curr_type == LE_MVCC);
            // NOTE(review): this uses toku_htod32 while the LE_CLEAN branch
            // uses toku_dtoh32 for the same deserialize direction — confirm
            // the two are the identical byte-swap (they typically are).
            num_cxrs = toku_htod32(*(uint32_t *)curr_src_pos);
            curr_src_pos += sizeof(uint32_t); // num_cxrs
            num_pxrs = curr_src_pos[0];
            curr_src_pos += sizeof(uint8_t); //num_pxrs
            keyp = curr_src_pos;
            curr_src_pos += keylen;
        }
        // Record the key in the dmt with the offset where this LE will land.
        uint32_t le_offset = curr_dest_pos - newmem;
        dmt_builder.append(klpair_dmtwriter(keylen, le_offset, keyp));
        add_key(keylen);

        // now curr_dest_pos is pointing to where the leafentry should be packed
        curr_dest_pos[0] = curr_type;
        curr_dest_pos++;
        if (curr_type == LE_CLEAN) {
            *(uint32_t *)curr_dest_pos = toku_htod32(clean_vallen);
            curr_dest_pos += sizeof(clean_vallen);
            memcpy(curr_dest_pos, curr_src_pos, clean_vallen); // copy the val
            curr_dest_pos += clean_vallen;
            curr_src_pos += clean_vallen;
        }
        else {
            // pack num_cxrs and num_pxrs
            *(uint32_t *)curr_dest_pos = toku_htod32(num_cxrs);
            curr_dest_pos += sizeof(num_cxrs);
            *(uint8_t *)curr_dest_pos = num_pxrs;
            curr_dest_pos += sizeof(num_pxrs);
            // now we need to pack the rest of the data
            uint32_t num_rest_bytes = leafentry_rest_memsize(num_pxrs, num_cxrs, const_cast<uint8_t*>(curr_src_pos));
            memcpy(curr_dest_pos, curr_src_pos, num_rest_bytes);
            curr_dest_pos += num_rest_bytes;
            curr_src_pos += num_rest_bytes;
        }
    }
    dmt_builder.build(&this->m_buffer);
    toku_note_deserialized_basement_node(m_buffer.value_length_is_fixed());

    // Every source byte must have been consumed...
    uint32_t num_bytes_read = (uint32_t)(curr_src_pos - buf);
    invariant(num_bytes_read == data_size);

    // ...and the split (key bytes + leafentry bytes) must add back up exactly.
    uint32_t num_bytes_written = curr_dest_pos - newmem + m_disksize_of_keys;
    invariant(num_bytes_written == data_size);
    toku_mempool_init(&m_buffer_mempool, newmem, (size_t)(curr_dest_pos - newmem), allocated_bytes_vals);

    invariant(get_disk_size() == data_size);
    // Versions older than 26 might have allocated too much memory. Try to shrink the mempool now that we
    // know how much memory we need.
    if (version < FT_LAYOUT_VERSION_26) {
        // Unnecessary after version 26
        // Reallocate smaller mempool to save memory
        // NOTE(review): despite the name, toku_mempool_realloc_larger is
        // called here to shrink to the used size — confirm it allows that.
        invariant_zero(toku_mempool_get_frag_size(&m_buffer_mempool));
        toku_mempool_realloc_larger(&m_buffer_mempool, toku_mempool_get_used_size(&m_buffer_mempool));
    }
}
298
get_memory_size()299 uint64_t bn_data::get_memory_size() {
300 uint64_t retval = 0;
301 //TODO: Maybe ask for memory_size instead of mempool_footprint (either this todo or the next)
302 // include fragmentation overhead but do not include space in the
303 // mempool that has not yet been allocated for leaf entries
304 size_t poolsize = toku_mempool_footprint(&m_buffer_mempool);
305 retval += poolsize;
306 // This one includes not-yet-allocated for nodes (just like old constant-key omt)
307 //TODO: Maybe ask for mempool_footprint instead of memory_size.
308 retval += m_buffer.memory_size();
309 invariant(retval >= get_disk_size());
310 return retval;
311 }
312
delete_leafentry(uint32_t idx,uint32_t keylen,uint32_t old_le_size)313 void bn_data::delete_leafentry (
314 uint32_t idx,
315 uint32_t keylen,
316 uint32_t old_le_size
317 )
318 {
319 remove_key(keylen);
320 m_buffer.delete_at(idx);
321 toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size);
322 }
323
324 /* mempool support */
325
326 struct dmt_compressor_state {
327 struct mempool *new_kvspace;
328 class bn_data *bd;
329 };
330
move_it(const uint32_t,klpair_struct * klpair,const uint32_t idx UU (),struct dmt_compressor_state * const oc)331 static int move_it (const uint32_t, klpair_struct *klpair, const uint32_t idx UU(), struct dmt_compressor_state * const oc) {
332 LEAFENTRY old_le = oc->bd->get_le_from_klpair(klpair);
333 uint32_t size = leafentry_memsize(old_le);
334 void* newdata = toku_mempool_malloc(oc->new_kvspace, size);
335 paranoid_invariant_notnull(newdata); // we do this on a fresh mempool, so nothing bad should happen
336 memcpy(newdata, old_le, size);
337 klpair->le_offset = toku_mempool_get_offset_from_pointer_and_base(oc->new_kvspace, newdata);
338 return 0;
339 }
340
// Compress things, and grow or shrink the mempool if needed.
// May (always if force_compress) have a side effect of putting contents of mempool in sorted order.
// added_size: extra bytes the caller is about to allocate (reserved in the new pool).
// maybe_free: if non-null, receives the old mempool base for the caller to free
//             later; otherwise the old base is freed here.
void bn_data::dmt_compress_kvspace(size_t added_size, void **maybe_free, bool force_compress) {
    // NOTE(review): uint32_t would truncate a >4GB pool — presumably basement
    // nodes are far smaller; confirm.
    uint32_t total_size_needed = toku_mempool_get_used_size(&m_buffer_mempool) + added_size;

    // If there is no fragmentation, e.g. in serial inserts, we can just increase the size
    // of the mempool and move things over with a cheap memcpy. If force_compress is true,
    // the caller needs the side effect that all contents are put in sorted order.
    bool do_compress = toku_mempool_get_frag_size(&m_buffer_mempool) > 0 || force_compress;

    void *old_mempool_base = toku_mempool_get_base(&m_buffer_mempool);
    struct mempool new_kvspace;
    if (do_compress) {
        // force_compress (serialization) wants an exact-size pool; otherwise
        // leave 50% headroom for subsequent inserts.
        size_t requested_size = force_compress ? total_size_needed : ((total_size_needed * 3) / 2);
        toku_mempool_construct(&new_kvspace, requested_size);
        struct dmt_compressor_state oc = { &new_kvspace, this };
        // Walk klpairs in key order; move_it copies each leafentry and
        // updates its le_offset, so the result is sorted and unfragmented.
        m_buffer.iterate_ptr< decltype(oc), move_it >(&oc);
    } else {
        // No fragmentation: one flat copy of the old region at the same
        // offsets from the new base, so no klpair offsets need updating.
        toku_mempool_construct(&new_kvspace, total_size_needed);
        size_t old_offset_limit = toku_mempool_get_offset_limit(&m_buffer_mempool);
        void *new_mempool_base = toku_mempool_malloc(&new_kvspace, old_offset_limit);
        memcpy(new_mempool_base, old_mempool_base, old_offset_limit);
    }

    // Hand the old base to the caller if requested (it may still hold
    // leafentries the caller has live pointers into); otherwise free it now.
    if (maybe_free) {
        *maybe_free = old_mempool_base;
    } else {
        toku_free(old_mempool_base);
    }
    m_buffer_mempool = new_kvspace;
}
372
373 // Effect: Allocate a new object of size SIZE in MP. If MP runs out of space, allocate new a new mempool space, and copy all the items
374 // from the OMT (which items refer to items in the old mempool) into the new mempool.
375 // If MAYBE_FREE is nullptr then free the old mempool's space.
376 // Otherwise, store the old mempool's space in maybe_free.
mempool_malloc_and_update_dmt(size_t size,void ** maybe_free)377 LEAFENTRY bn_data::mempool_malloc_and_update_dmt(size_t size, void **maybe_free) {
378 void *v = toku_mempool_malloc(&m_buffer_mempool, size);
379 if (v == nullptr) {
380 dmt_compress_kvspace(size, maybe_free, false);
381 v = toku_mempool_malloc(&m_buffer_mempool, size);
382 paranoid_invariant_notnull(v);
383 }
384 return (LEAFENTRY)v;
385 }
386
// Allocate space for a new_size-byte leafentry replacing the one at idx,
// repoint the klpair at it, and return the space via new_le_space.
// *maybe_free receives the old mempool base if the allocation forced a
// mempool swap (caller must free it); it is set to nullptr otherwise.
void bn_data::get_space_for_overwrite(
    uint32_t idx,
    const void* keyp UU(),
    uint32_t keylen UU(),
    uint32_t old_keylen,
    uint32_t old_le_size,
    uint32_t new_size,
    LEAFENTRY* new_le_space,
    void **const maybe_free
    )
{
    *maybe_free = nullptr;
    // Allocate first: this may compress and swap the mempool, which must
    // still be able to copy the old leafentry while doing so.
    LEAFENTRY new_le = mempool_malloc_and_update_dmt(new_size, maybe_free);
    // Retire the old leafentry's bytes (nullptr: accounting only — presumably
    // tracked as fragmentation; confirm against toku_mempool_mfree).
    toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size);
    klpair_struct* klp = nullptr;
    uint32_t klpair_len;
    int r = m_buffer.fetch(idx, &klpair_len, &klp);
    invariant_zero(r);
    paranoid_invariant(klp!=nullptr);
    // Old key length should be consistent with what is stored in the DMT
    invariant(keylen_from_klpair_len(klpair_len) == old_keylen);

    // le_offset is a uint32_t field; make sure the new offset fits.
    size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);
    paranoid_invariant(new_le_offset <= UINT32_MAX - new_size); // Not using > 4GB
    klp->le_offset = new_le_offset;

    paranoid_invariant(new_le == get_le_from_klpair(klp));
    *new_le_space = new_le;
}
416
get_space_for_insert(uint32_t idx,const void * keyp,uint32_t keylen,size_t size,LEAFENTRY * new_le_space,void ** const maybe_free)417 void bn_data::get_space_for_insert(
418 uint32_t idx,
419 const void* keyp,
420 uint32_t keylen,
421 size_t size,
422 LEAFENTRY* new_le_space,
423 void **const maybe_free
424 )
425 {
426 add_key(keylen);
427
428 *maybe_free = nullptr;
429 LEAFENTRY new_le = mempool_malloc_and_update_dmt(size, maybe_free);
430 size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);
431
432 klpair_dmtwriter kl(keylen, new_le_offset, keyp);
433 m_buffer.insert_at(kl, idx);
434
435 *new_le_space = new_le;
436 }
437
// Iterate-callback state for bn_data::split_klpairs.  Walks every klpair of
// the left (source) node: entries with idx < m_split_at are copied into the
// new left mempool/builder, the rest into the right node's mempool/builder.
class split_klpairs_extra {
    bn_data *const m_left_bn;
    bn_data *const m_right_bn;
    klpair_dmt_t::builder *const m_left_builder;
    klpair_dmt_t::builder *const m_right_builder;
    // Left side gets a brand-new mempool too (entries are copied out, not
    // kept in place), supplied by the caller.
    struct mempool *const m_left_dest_mp;
    uint32_t m_split_at;  // first index that belongs to the right node

    struct mempool *left_dest_mp(void) const { return m_left_dest_mp; }
    struct mempool *right_dest_mp(void) const { return &m_right_bn->m_buffer_mempool; }

    // Copy one leafentry into dest_mp and append its klpair to builder,
    // crediting the key's size to bn's disksize bookkeeping.
    void copy_klpair(const uint32_t klpair_len, const klpair_struct &klpair,
                     klpair_dmt_t::builder *const builder,
                     struct mempool *const dest_mp,
                     bn_data *const bn) {
        LEAFENTRY old_le = m_left_bn->get_le_from_klpair(&klpair);
        size_t le_size = leafentry_memsize(old_le);

        void *new_le = toku_mempool_malloc(dest_mp, le_size);
        paranoid_invariant_notnull(new_le);  // dest pools are pre-sized to fit everything
        memcpy(new_le, old_le, le_size);
        size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(dest_mp, new_le);
        size_t keylen = keylen_from_klpair_len(klpair_len);
        builder->append(klpair_dmtwriter(keylen, le_offset, klpair.key));

        bn->add_key(keylen);
    }

    // Route one klpair left or right.  The key is first debited from the
    // left node (the source); copy_klpair re-credits it to its destination.
    int move_leafentry(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx) {
        m_left_bn->remove_key(keylen_from_klpair_len(klpair_len));

        if (idx < m_split_at) {
            copy_klpair(klpair_len, klpair, m_left_builder, left_dest_mp(), m_left_bn);
        } else {
            copy_klpair(klpair_len, klpair, m_right_builder, right_dest_mp(), m_right_bn);
        }
        return 0;
    }

  public:
    split_klpairs_extra(bn_data *const left_bn, bn_data *const right_bn,
                        klpair_dmt_t::builder *const left_builder,
                        klpair_dmt_t::builder *const right_builder,
                        struct mempool *const left_new_mp,
                        uint32_t split_at)
        : m_left_bn(left_bn),
          m_right_bn(right_bn),
          m_left_builder(left_builder),
          m_right_builder(right_builder),
          m_left_dest_mp(left_new_mp),
          m_split_at(split_at) {}
    // Static trampoline matching the dmt iterate callback signature.
    static int cb(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx, split_klpairs_extra *const thisp) {
        return thisp->move_leafentry(klpair_len, klpair, idx);
    }
};
493
// Move all klpairs with index >= split_at into right_bd, keeping [0, split_at)
// here.  Both nodes end up with freshly compacted mempools and rebuilt dmts.
void bn_data::split_klpairs(
    bn_data* right_bd,
    uint32_t split_at //lower bound inclusive for right_bd
    )
{
    // We use move_leafentries_to during a split, and the split algorithm should never call this
    // if it's splitting on a boundary, so there must be some leafentries in the range to move.
    paranoid_invariant(split_at < num_klpairs());

    right_bd->init_zero();

    size_t mpsize = toku_mempool_get_used_size(&m_buffer_mempool);  // overkill, but safe

    // The left node also gets a brand-new mempool: entries are copied out,
    // so the left side is compacted as a side effect of the split.
    struct mempool new_left_mp;
    toku_mempool_construct(&new_left_mp, mpsize);

    struct mempool *right_mp = &right_bd->m_buffer_mempool;
    toku_mempool_construct(right_mp, mpsize);

    klpair_dmt_t::builder left_dmt_builder;
    left_dmt_builder.create(split_at, m_disksize_of_keys);  // overkill, but safe (builder will realloc at the end)

    klpair_dmt_t::builder right_dmt_builder;
    right_dmt_builder.create(num_klpairs() - split_at, m_disksize_of_keys);  // overkill, but safe (builder will realloc at the end)

    split_klpairs_extra extra(this, right_bd, &left_dmt_builder, &right_dmt_builder, &new_left_mp, split_at);

    // Route every klpair (and its leafentry) to the left or right node.
    int r = m_buffer.iterate<split_klpairs_extra, split_klpairs_extra::cb>(&extra);
    invariant_zero(r);

    // All contents have been copied out; discard the old dmt and mempool.
    m_buffer.destroy();
    toku_mempool_destroy(&m_buffer_mempool);

    m_buffer_mempool = new_left_mp;

    left_dmt_builder.build(&m_buffer);
    right_dmt_builder.build(&right_bd->m_buffer);

    // Potentially shrink memory pool for destination.
    // We overallocated ("overkill") above
    // NOTE(review): despite its name, toku_mempool_realloc_larger is called
    // here to shrink to the used size — confirm it supports shrinking.
    struct mempool *const left_mp = &m_buffer_mempool;
    paranoid_invariant_zero(toku_mempool_get_frag_size(left_mp));
    toku_mempool_realloc_larger(left_mp, toku_mempool_get_used_size(left_mp));
    paranoid_invariant_zero(toku_mempool_get_frag_size(right_mp));
    toku_mempool_realloc_larger(right_mp, toku_mempool_get_used_size(right_mp));
}
540
get_disk_size()541 uint64_t bn_data::get_disk_size() {
542 return m_disksize_of_keys +
543 toku_mempool_get_used_size(&m_buffer_mempool);
544 }
545
546 struct verify_le_in_mempool_state {
547 size_t offset_limit;
548 class bn_data *bd;
549 };
550
verify_le_in_mempool(const uint32_t,klpair_struct * klpair,const uint32_t idx UU (),struct verify_le_in_mempool_state * const state)551 static int verify_le_in_mempool (const uint32_t, klpair_struct *klpair, const uint32_t idx UU(), struct verify_le_in_mempool_state * const state) {
552 invariant(klpair->le_offset < state->offset_limit);
553
554 LEAFENTRY le = state->bd->get_le_from_klpair(klpair);
555 uint32_t size = leafentry_memsize(le);
556
557 size_t end_offset = klpair->le_offset+size;
558
559 invariant(end_offset <= state->offset_limit);
560 return 0;
561 }
562
563 //This is a debug-only (paranoid) verification.
564 //Verifies the dmt is valid, and all leafentries are entirely in the mempool's memory.
verify_mempool(void)565 void bn_data::verify_mempool(void) {
566 //Verify the dmt itself <- paranoid and slow
567 m_buffer.verify();
568
569 verify_le_in_mempool_state state = { .offset_limit = toku_mempool_get_offset_limit(&m_buffer_mempool), .bd = this };
570 //Verify every leafentry pointed to by the keys in the dmt are fully inside the mempool
571 m_buffer.iterate_ptr< decltype(state), verify_le_in_mempool >(&state);
572 }
573
num_klpairs(void) const574 uint32_t bn_data::num_klpairs(void) const {
575 return m_buffer.size();
576 }
577
destroy(void)578 void bn_data::destroy(void) {
579 // The buffer may have been freed already, in some cases.
580 m_buffer.destroy();
581 toku_mempool_destroy(&m_buffer_mempool);
582 m_disksize_of_keys = 0;
583 }
584
// Populate this (freshly created, empty) bn_data with deep copies of the
// given already-sorted keys and leafentries.
// total_key_size / total_le_size are the exact byte totals used to pre-size
// the dmt builder and mempool.
void bn_data::set_contents_as_clone_of_sorted_array(
    uint32_t num_les,
    const void** old_key_ptrs,
    uint32_t* old_keylens,
    LEAFENTRY* old_les,
    size_t *le_sizes,
    size_t total_key_size,
    size_t total_le_size
    )
{
    //Enforce "just created" invariant.
    paranoid_invariant_zero(m_disksize_of_keys);
    paranoid_invariant_zero(num_klpairs());
    paranoid_invariant_null(toku_mempool_get_base(&m_buffer_mempool));
    paranoid_invariant_zero(toku_mempool_get_size(&m_buffer_mempool));

    toku_mempool_construct(&m_buffer_mempool, total_le_size);
    m_buffer.destroy();
    m_disksize_of_keys = 0;

    klpair_dmt_t::builder dmt_builder;
    dmt_builder.create(num_les, total_key_size);

    // Copy each leafentry into the mempool and record its key + offset in
    // the builder (input is already sorted, so append order is final order).
    for (uint32_t idx = 0; idx < num_les; idx++) {
        void* new_le = toku_mempool_malloc(&m_buffer_mempool, le_sizes[idx]);
        paranoid_invariant_notnull(new_le);  // mempool sized to total_le_size above
        memcpy(new_le, old_les[idx], le_sizes[idx]);
        size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(&m_buffer_mempool, new_le);
        dmt_builder.append(klpair_dmtwriter(old_keylens[idx], le_offset, old_key_ptrs[idx]));
        add_key(old_keylens[idx]);
    }
    dmt_builder.build(&this->m_buffer);
}
618
get_le_from_klpair(const klpair_struct * klpair) const619 LEAFENTRY bn_data::get_le_from_klpair(const klpair_struct *klpair) const {
620 void * ptr = toku_mempool_get_pointer_from_base_and_offset(&this->m_buffer_mempool, klpair->le_offset);
621 LEAFENTRY CAST_FROM_VOIDP(le, ptr);
622 return le;
623 }
624
625
626 // get info about a single leafentry by index
fetch_le(uint32_t idx,LEAFENTRY * le)627 int bn_data::fetch_le(uint32_t idx, LEAFENTRY *le) {
628 klpair_struct* klpair = nullptr;
629 int r = m_buffer.fetch(idx, nullptr, &klpair);
630 if (r == 0) {
631 *le = get_le_from_klpair(klpair);
632 }
633 return r;
634 }
635
fetch_klpair(uint32_t idx,LEAFENTRY * le,uint32_t * len,void ** key)636 int bn_data::fetch_klpair(uint32_t idx, LEAFENTRY *le, uint32_t *len, void** key) {
637 klpair_struct* klpair = nullptr;
638 uint32_t klpair_len;
639 int r = m_buffer.fetch(idx, &klpair_len, &klpair);
640 if (r == 0) {
641 *len = keylen_from_klpair_len(klpair_len);
642 *key = klpair->key;
643 *le = get_le_from_klpair(klpair);
644 }
645 return r;
646 }
647
fetch_klpair_disksize(uint32_t idx,size_t * size)648 int bn_data::fetch_klpair_disksize(uint32_t idx, size_t *size) {
649 klpair_struct* klpair = nullptr;
650 uint32_t klpair_len;
651 int r = m_buffer.fetch(idx, &klpair_len, &klpair);
652 if (r == 0) {
653 *size = klpair_disksize(klpair_len, klpair);
654 }
655 return r;
656 }
657
fetch_key_and_len(uint32_t idx,uint32_t * len,void ** key)658 int bn_data::fetch_key_and_len(uint32_t idx, uint32_t *len, void** key) {
659 klpair_struct* klpair = nullptr;
660 uint32_t klpair_len;
661 int r = m_buffer.fetch(idx, &klpair_len, &klpair);
662 if (r == 0) {
663 *len = keylen_from_klpair_len(klpair_len);
664 *key = klpair->key;
665 }
666 return r;
667 }
668
clone(bn_data * orig_bn_data)669 void bn_data::clone(bn_data* orig_bn_data) {
670 toku_mempool_clone(&orig_bn_data->m_buffer_mempool, &m_buffer_mempool);
671 m_buffer.clone(orig_bn_data->m_buffer);
672 this->m_disksize_of_keys = orig_bn_data->m_disksize_of_keys;
673 }
674
675