1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "portability/toku_atomic.h"
40 
41 #include "ft/cachetable/cachetable.h"
42 #include "ft/ft.h"
43 #include "ft/ft-internal.h"
44 #include "ft/node.h"
45 #include "ft/logger/log-internal.h"
46 #include "ft/txn/rollback.h"
47 #include "ft/serialize/block_allocator.h"
48 #include "ft/serialize/block_table.h"
49 #include "ft/serialize/compress.h"
50 #include "ft/serialize/ft_node-serialize.h"
51 #include "ft/serialize/sub_block.h"
52 #include "util/sort.h"
53 #include "util/threadpool.h"
54 #include "util/status.h"
55 #include "util/scoped_malloc.h"
56 
57 static FT_UPGRADE_STATUS_S ft_upgrade_status;
58 
59 #define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ft_upgrade_status, k, c, t, "ft upgrade: " l, inc)
60 
61 static void
62 status_init(void)
63 {
64     // Note: this function initializes the keyname, type, and legend fields.
65     // Value fields are initialized to zero by the compiler.
66     STATUS_INIT(FT_UPGRADE_FOOTPRINT,             nullptr, UINT64, "footprint", TOKU_ENGINE_STATUS);
67     ft_upgrade_status.initialized = true;
68 }
69 #undef STATUS_INIT
70 
71 #define UPGRADE_STATUS_VALUE(x) ft_upgrade_status.status[x].value.num
72 
73 void
74 toku_ft_upgrade_get_status(FT_UPGRADE_STATUS s) {
75     if (!ft_upgrade_status.initialized) {
76         status_init();
77     }
78     UPGRADE_STATUS_VALUE(FT_UPGRADE_FOOTPRINT) = toku_log_upgrade_get_footprint();
79     *s = ft_upgrade_status;
80 }
81 
82 static int num_cores = 0; // cache the number of cores for the parallelization
83 static struct toku_thread_pool *ft_pool = NULL;
84 bool toku_serialize_in_parallel;
85 
86 int get_num_cores(void) {
87     return num_cores;
88 }
89 
90 struct toku_thread_pool *get_ft_pool(void) {
91     return ft_pool;
92 }
93 
94 void toku_serialize_set_parallel(bool in_parallel) {
95     toku_unsafe_set(&toku_serialize_in_parallel, in_parallel);
96 }
97 
98 void toku_ft_serialize_layer_init(void) {
99     num_cores = toku_os_get_number_active_processors();
100     int r = toku_thread_pool_create(&ft_pool, num_cores);
101     lazy_assert_zero(r);
102     toku_serialize_in_parallel = false;
103 }
104 
105 void toku_ft_serialize_layer_destroy(void) {
106     toku_thread_pool_destroy(&ft_pool);
107 }
108 
109 enum { FILE_CHANGE_INCREMENT = (16 << 20) };
110 
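// Round `a` up to the next multiple of `b`,
// e.g. alignup64(1000, 512) == 1024 and alignup64(1024, 512) == 1024.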
111 static inline uint64_t
112 alignup64(uint64_t a, uint64_t b) {
113     return ((a+b-1)/b)*b;
114 }
115 
116 // safe_file_size_lock must be held.
117 void
118 toku_maybe_truncate_file (int fd, uint64_t size_used, uint64_t expected_size, uint64_t *new_sizep)
119 // Effect: If the file size >= size_used + 32MiB, reduce the file size.
120 // (32MiB instead of 16MiB, for hysteresis.)
121 // Sets *new_sizep to the resulting file size.
122 {
123     int64_t file_size;
124     {
125         int r = toku_os_get_file_size(fd, &file_size);
126         lazy_assert_zero(r);
127         invariant(file_size >= 0);
128     }
129     invariant(expected_size == (uint64_t)file_size);
130     // If file space is overallocated by at least 32M
131     if ((uint64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
132         toku_off_t new_size = alignup64(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used.
133         invariant(new_size < file_size);
134         invariant(new_size >= 0);
135         int r = ftruncate(fd, new_size);
136         lazy_assert_zero(r);
137         *new_sizep = new_size;
138     }
139     else {
140         *new_sizep = file_size;
141     }
142     return;
143 }
144 
145 static int64_t
146 min64(int64_t a, int64_t b) {
147     if (a<b) return a;
148     return b;
149 }
150 
151 void
152 toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size, int64_t *new_size)
153 // Effect: Make the file bigger by either doubling it or growing by 16MiB, whichever is less, until it is at least `size` bytes.
154 // Sets *new_size to the resulting file size.
155 {
156     int64_t file_size = 0;
157     //TODO(yoni): Allow variable stripe_width (perhaps from ft) for larger raids
158     const uint64_t stripe_width = 4096;
159     {
160         int r = toku_os_get_file_size(fd, &file_size);
161         if (r != 0) { // debug #2463
162             int the_errno = get_maybe_error_errno();
163             fprintf(stderr, "%s:%d fd=%d size=%" PRIu64 " r=%d errno=%d\n", __FUNCTION__, __LINE__, fd, size, r, the_errno); fflush(stderr);
164         }
165         lazy_assert_zero(r);
166     }
167     invariant(file_size >= 0);
168     invariant(expected_size == file_size);
169     // We want to double the size of the file, or add 16MiB, whichever is less.
170     // We emulate calling this function repeatedly until it satisfies the request.
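    // For example, with file_size == 0 and size == 5MiB, to_write grows
    // 4KiB, 8KiB, 16KiB, ... (doubling each round, but never by more than
    // FILE_CHANGE_INCREMENT) until file_size + to_write >= size.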
171     int64_t to_write = 0;
172     if (file_size == 0) {
173         // Prevent infinite loop by starting with stripe_width as a base case.
174         to_write = stripe_width;
175     }
176     while (file_size + to_write < size) {
177         to_write += alignup64(min64(file_size + to_write, FILE_CHANGE_INCREMENT), stripe_width);
178     }
179     if (to_write > 0) {
180         assert(to_write%512==0);
181         toku::scoped_malloc_aligned wbuf_aligned(to_write, 512);
182         char *wbuf = reinterpret_cast<char *>(wbuf_aligned.get());
183         memset(wbuf, 0, to_write);
184         toku_off_t start_write = alignup64(file_size, stripe_width);
185         invariant(start_write >= file_size);
186         toku_os_full_pwrite(fd, wbuf, to_write, start_write);
187         *new_size = start_write + to_write;
188     }
189     else {
190         *new_size = file_size;
191     }
192 }
193 
194 // This overhead does not include the sub_block header.
195 // Overhead is calculated in the same order the fields are written to the wbuf.
196 enum {
197     node_header_overhead = (8+   // magic "tokunode" or "tokuleaf" or "tokuroll"
198                             4+   // layout_version
199                             4+   // layout_version_original
200                             4),  // build_id
201 };
202 
203 // uncompressed header offsets
204 enum {
205     uncompressed_magic_offset = 0,
206     uncompressed_version_offset = 8,
207 };
208 
209 static uint32_t
210 serialize_node_header_size(FTNODE node) {
211     uint32_t retval = 0;
212     retval += 8; // magic
213     retval += sizeof(node->layout_version);
214     retval += sizeof(node->layout_version_original);
215     retval += 4; // BUILD_ID
216     retval += 4; // n_children
217     retval += node->n_children*8; // encode start offset and length of each partition
218     retval += 4; // checksum
219     return retval;
220 }
221 
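// On-disk node header layout, in the order serialize_node_header writes it:
//   8-byte magic ("tokuleaf" or "tokunode"), layout_version,
//   layout_version_original, BUILD_ID, n_children, then a (start, size)
//   pair per partition, followed by an x1764 checksum of the header bytes.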
222 static void
223 serialize_node_header(FTNODE node, FTNODE_DISK_DATA ndd, struct wbuf *wbuf) {
224     if (node->height == 0)
225         wbuf_nocrc_literal_bytes(wbuf, "tokuleaf", 8);
226     else
227         wbuf_nocrc_literal_bytes(wbuf, "tokunode", 8);
228     paranoid_invariant(node->layout_version == FT_LAYOUT_VERSION);
229     wbuf_nocrc_int(wbuf, node->layout_version);
230     wbuf_nocrc_int(wbuf, node->layout_version_original);
231     wbuf_nocrc_uint(wbuf, BUILD_ID);
232     wbuf_nocrc_int (wbuf, node->n_children);
233     for (int i=0; i<node->n_children; i++) {
234         assert(BP_SIZE(ndd,i)>0);
235         wbuf_nocrc_int(wbuf, BP_START(ndd, i)); // save the beginning of the partition
236         wbuf_nocrc_int(wbuf, BP_SIZE (ndd, i));         // and the size
237     }
238     // checksum the header
239     uint32_t end_to_end_checksum = toku_x1764_memory(wbuf->buf, wbuf_get_woffset(wbuf));
240     wbuf_nocrc_int(wbuf, end_to_end_checksum);
241     invariant(wbuf->ndone == wbuf->size);
242 }
243 
244 static uint32_t
245 serialize_ftnode_partition_size (FTNODE node, int i)
246 {
247     uint32_t result = 0;
248     paranoid_invariant(node->bp[i].state == PT_AVAIL);
249     result++; // Byte that states what the partition is
250     if (node->height > 0) {
251         NONLEAF_CHILDINFO bnc = BNC(node, i);
252         // number of messages (4 bytes) plus size of the buffer
253         result += (4 + toku_bnc_nbytesinbuf(bnc));
254         // number of offsets (4 bytes) plus an array of 4 byte offsets, for each message tree
255         result += (4 + (4 * bnc->fresh_message_tree.size()));
256         result += (4 + (4 * bnc->stale_message_tree.size()));
257         result += (4 + (4 * bnc->broadcast_list.size()));
258     }
259     else {
260         result += 4 + bn_data::HEADER_LENGTH; // n_entries in buffer table + basement header
261         result += BLB_NBYTESINDATA(node, i);
262     }
263     result += 4; // checksum
264     return result;
265 }
266 
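// Tag byte written at the start of each serialized partition, identifying it
// as either a leaf basement node (DMT of leaf entries) or a message buffer.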
267 #define FTNODE_PARTITION_DMT_LEAVES 0xaa
268 #define FTNODE_PARTITION_MSG_BUFFER 0xbb
269 
270 UU() static int
271 assert_fresh(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
272     bool is_fresh = msg_buffer->get_freshness(offset);
273     assert(is_fresh);
274     return 0;
275 }
276 
277 UU() static int
278 assert_stale(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
279     bool is_fresh = msg_buffer->get_freshness(offset);
280     assert(!is_fresh);
281     return 0;
282 }
283 
284 static void bnc_verify_message_trees(NONLEAF_CHILDINFO UU(bnc)) {
285 #ifdef TOKU_DEBUG_PARANOID
286     bnc->fresh_message_tree.iterate<message_buffer, assert_fresh>(&bnc->msg_buffer);
287     bnc->stale_message_tree.iterate<message_buffer, assert_stale>(&bnc->msg_buffer);
288 #endif
289 }
290 
291 static int
292 wbuf_write_offset(const int32_t &offset, const uint32_t UU(idx), struct wbuf *const wb) {
293     wbuf_nocrc_int(wb, offset);
294     return 0;
295 }
296 
297 static void serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) {
298     unsigned char ch = FTNODE_PARTITION_MSG_BUFFER;
299     wbuf_nocrc_char(wb, ch);
300 
301     // serialize the message buffer
302     bnc->msg_buffer.serialize_to_wbuf(wb);
303 
304     // serialize the message trees (num entries, offsets array):
305     // first, verify their contents are consistent with the message buffer
306     bnc_verify_message_trees(bnc);
307 
308     // fresh
309     wbuf_nocrc_int(wb, bnc->fresh_message_tree.size());
310     bnc->fresh_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);
311 
312     // stale
313     wbuf_nocrc_int(wb, bnc->stale_message_tree.size());
314     bnc->stale_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);
315 
316     // broadcast
317     wbuf_nocrc_int(wb, bnc->broadcast_list.size());
318     bnc->broadcast_list.iterate<struct wbuf, wbuf_write_offset>(wb);
319 }
320 
321 //
322 // Serialize the i'th partition of node into sb.
323 // For leaf nodes, this is the i'th basement node.
324 // For internal nodes, this is the i'th child's message buffer.
325 //
326 static void
327 serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
328     // Caller should have allocated memory.
329     invariant_notnull(sb->uncompressed_ptr);
330     invariant(sb->uncompressed_size > 0);
331     paranoid_invariant(sb->uncompressed_size == serialize_ftnode_partition_size(node, i));
332 
333     //
334     // Now put the data into sb->uncompressed_ptr
335     //
336     struct wbuf wb;
337     wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
338     if (node->height > 0) {
339         // TODO: (Zardosht) possibly exit early if there are no messages
340         serialize_child_buffer(BNC(node, i), &wb);
341     }
342     else {
343         unsigned char ch = FTNODE_PARTITION_DMT_LEAVES;
344         bn_data* bd = BLB_DATA(node, i);
345 
346         wbuf_nocrc_char(&wb, ch);
347         wbuf_nocrc_uint(&wb, bd->num_klpairs());
348 
349         bd->serialize_to_wbuf(&wb);
350     }
351     uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
352     wbuf_nocrc_int(&wb, end_to_end_checksum);
353     invariant(wb.ndone == wb.size);
354     invariant(sb->uncompressed_size==wb.ndone);
355 }
356 
357 //
358 // Takes the data in sb->uncompressed_ptr, and compresses it
359 // into a newly allocated buffer sb->compressed_ptr
360 //
361 static void
362 compress_ftnode_sub_block(struct sub_block *sb, enum toku_compression_method method) {
363     invariant(sb->compressed_ptr != nullptr);
364     invariant(sb->compressed_size_bound > 0);
365     paranoid_invariant(sb->compressed_size_bound == toku_compress_bound(method, sb->uncompressed_size));
366 
367     //
368     // This probably seems a bit complicated. Here is what is going on.
369     // In PerconaFT 5.0, sub_blocks were compressed and the compressed data
370     // was checksummed. The checksum did NOT include the size of the compressed data
371     // and the size of the uncompressed data. The fields of sub_block only reference the
372     // compressed data, and it is the responsibility of the user of the sub_block
373     // to write the length
374     //
375     // For Dr. No, we want the checksum to also include the size of the compressed data, and the
376     // size of the decompressed data, because this data
377     // may be read off of disk alone, so it must be verifiable alone.
378     //
379     // So, we pass in a buffer to compress_nocrc_sub_block that starts 8 bytes after the beginning
380     // of sb->compressed_ptr, so we have space to put in the sizes, and then run the checksum.
381     //
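    // Resulting layout of sb->compressed_ptr (sketch):
    //   [4-byte compressed size][4-byte uncompressed size][compressed data]
    // with sb->xsum covering all of the above.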
382     sb->compressed_size = compress_nocrc_sub_block(
383         sb,
384         (char *)sb->compressed_ptr + 8,
385         sb->compressed_size_bound,
386         method
387         );
388 
389     uint32_t* extra = (uint32_t *)(sb->compressed_ptr);
390     // store the compressed and uncompressed size at the beginning
391     extra[0] = toku_htod32(sb->compressed_size);
392     extra[1] = toku_htod32(sb->uncompressed_size);
393     // now checksum the entire thing
394     sb->compressed_size += 8; // now add the eight bytes that we saved for the sizes
395     sb->xsum = toku_x1764_memory(sb->compressed_ptr,sb->compressed_size);
396 
397     //
398     // This is the end result for Dr. No and forward. For ftnodes, sb->compressed_ptr contains
399     // two integers at the beginning, the size and uncompressed size, and then the compressed
400     // data. sb->xsum contains the checksum of this entire thing.
401     //
402     // In PerconaFT 5.0, sb->compressed_ptr only contained the compressed data, sb->xsum
403     // checksummed only the compressed data, and the checksumming of the sizes were not
404     // done here.
405     //
406 }
407 
408 //
409 // Returns the size needed to serialize the ftnode info
410 // Does not include header information that is common with rollback logs
411 // such as the magic, layout_version, and build_id
412 // Includes only node specific info such as pivot information, n_children, and so on
413 //
414 static uint32_t
415 serialize_ftnode_info_size(FTNODE node)
416 {
417     uint32_t retval = 0;
418     retval += 8; // max_msn_applied_to_node_on_disk
419     retval += 4; // nodesize
420     retval += 4; // flags
421     retval += 4; // height;
422     retval += 8; // oldest_referenced_xid_known
423     retval += node->pivotkeys.serialized_size();
424     retval += (node->n_children-1)*4; // encode length of each pivot
425     if (node->height > 0) {
426         retval += node->n_children*8; // child blocknum's
427     }
428     retval += 4; // checksum
429     return retval;
430 }
431 
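// Serialized node-info layout, in the order serialize_ftnode_info writes it:
//   max_msn_applied_to_node_on_disk, a dummy uint where nodesize used to be,
//   flags, height, oldest_referenced_xid_known, the pivot keys, the child
//   blocknums (internal nodes only), and a trailing x1764 checksum.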
432 static void serialize_ftnode_info(FTNODE node, SUB_BLOCK sb) {
433     // Memory must have been allocated by our caller.
434     invariant(sb->uncompressed_size > 0);
435     invariant_notnull(sb->uncompressed_ptr);
436     paranoid_invariant(sb->uncompressed_size == serialize_ftnode_info_size(node));
437 
438     struct wbuf wb;
439     wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
440 
441     wbuf_MSN(&wb, node->max_msn_applied_to_node_on_disk);
442     wbuf_nocrc_uint(&wb, 0); // write a dummy value for where node->nodesize used to be
443     wbuf_nocrc_uint(&wb, node->flags);
444     wbuf_nocrc_int (&wb, node->height);
445     wbuf_TXNID(&wb, node->oldest_referenced_xid_known);
446     node->pivotkeys.serialize_to_wbuf(&wb);
447 
448     // child blocks, only for internal nodes
449     if (node->height > 0) {
450         for (int i = 0; i < node->n_children; i++) {
451             wbuf_nocrc_BLOCKNUM(&wb, BP_BLOCKNUM(node,i));
452         }
453     }
454 
455     uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
456     wbuf_nocrc_int(&wb, end_to_end_checksum);
457     invariant(wb.ndone == wb.size);
458     invariant(sb->uncompressed_size==wb.ndone);
459 }
460 
461 // This is the size of the uncompressed data, not including the compression headers
462 unsigned int
463 toku_serialize_ftnode_size (FTNODE node) {
464     unsigned int result = 0;
465     //
466     // As of now, this seems to be called if and only if the entire node is supposed
467     // to be in memory, so we will assert it.
468     //
469     toku_ftnode_assert_fully_in_memory(node);
470     result += serialize_node_header_size(node);
471     result += serialize_ftnode_info_size(node);
472     for (int i = 0; i < node->n_children; i++) {
473         result += serialize_ftnode_partition_size(node,i);
474     }
475     return result;
476 }
477 
478 struct serialize_times {
479     tokutime_t serialize_time;
480     tokutime_t compress_time;
481 };
482 
483 static void
484 serialize_and_compress_partition(FTNODE node,
485                                  int childnum,
486                                  enum toku_compression_method compression_method,
487                                  SUB_BLOCK sb,
488                                  struct serialize_times *st)
489 {
490     // serialize, compress, update status
491     tokutime_t t0 = toku_time_now();
492     serialize_ftnode_partition(node, childnum, sb);
493     tokutime_t t1 = toku_time_now();
494     compress_ftnode_sub_block(sb, compression_method);
495     tokutime_t t2 = toku_time_now();
496 
497     st->serialize_time += t1 - t0;
498     st->compress_time += t2 - t1;
499 }
500 
501 void
502 toku_create_compressed_partition_from_available(
503     FTNODE node,
504     int childnum,
505     enum toku_compression_method compression_method,
506     SUB_BLOCK sb
507     )
508 {
509     tokutime_t t0 = toku_time_now();
510 
511     // serialize
512     sb->uncompressed_size = serialize_ftnode_partition_size(node, childnum);
513     toku::scoped_malloc uncompressed_buf(sb->uncompressed_size);
514     sb->uncompressed_ptr = uncompressed_buf.get();
515     serialize_ftnode_partition(node, childnum, sb);
516 
517     tokutime_t t1 = toku_time_now();
518 
519     // compress. no need to pad with extra bytes for sizes/xsum - we're not storing them
520     set_compressed_size_bound(sb, compression_method);
521     sb->compressed_ptr = toku_xmalloc(sb->compressed_size_bound);
522     sb->compressed_size = compress_nocrc_sub_block(
523         sb,
524         sb->compressed_ptr,
525         sb->compressed_size_bound,
526         compression_method
527         );
528     sb->uncompressed_ptr = NULL;
529 
530     tokutime_t t2 = toku_time_now();
531 
532     toku_ft_status_update_serialize_times(node, t1 - t0, t2 - t1);
533 }
534 
535 static void
536 serialize_and_compress_serially(FTNODE node,
537                                 int npartitions,
538                                 enum toku_compression_method compression_method,
539                                 struct sub_block sb[],
540                                 struct serialize_times *st) {
541     for (int i = 0; i < npartitions; i++) {
542         serialize_and_compress_partition(node, i, compression_method, &sb[i], st);
543     }
544 }
545 
546 struct serialize_compress_work {
547     struct work base;
548     FTNODE node;
549     int i;
550     enum toku_compression_method compression_method;
551     struct sub_block *sb;
552     struct serialize_times st;
553 };
554 
555 static void *
556 serialize_and_compress_worker(void *arg) {
557     struct workset *ws = (struct workset *) arg;
558     while (1) {
559         struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws);
560         if (w == NULL)
561             break;
562         int i = w->i;
563         serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st);
564     }
565     workset_release_ref(ws);
566     return arg;
567 }
568 
569 static void
570 serialize_and_compress_in_parallel(FTNODE node,
571                                    int npartitions,
572                                    enum toku_compression_method compression_method,
573                                    struct sub_block sb[],
574                                    struct serialize_times *st) {
575     if (npartitions == 1) {
576         serialize_and_compress_partition(node, 0, compression_method, &sb[0], st);
577     } else {
578         int T = num_cores;
579         if (T > npartitions)
580             T = npartitions;
581         if (T > 0)
582             T = T - 1;
583         struct workset ws;
584         ZERO_STRUCT(ws);
585         workset_init(&ws);
586         struct serialize_compress_work work[npartitions];
587         workset_lock(&ws);
588         for (int i = 0; i < npartitions; i++) {
589             work[i] = (struct serialize_compress_work) { .base = {{NULL, NULL}},
590                                                          .node = node,
591                                                          .i = i,
592                                                          .compression_method = compression_method,
593                                                          .sb = sb,
594                                                          .st = { .serialize_time = 0, .compress_time = 0} };
595             workset_put_locked(&ws, &work[i].base);
596         }
597         workset_unlock(&ws);
598         toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws);
599         workset_add_ref(&ws, T);
600         serialize_and_compress_worker(&ws);
601         workset_join(&ws);
602         workset_destroy(&ws);
603 
604         // gather up the statistics from each thread's work item
605         for (int i = 0; i < npartitions; i++) {
606             st->serialize_time += work[i].st.serialize_time;
607             st->compress_time += work[i].st.compress_time;
608         }
609     }
610 }
611 
612 static void
613 serialize_and_compress_sb_node_info(FTNODE node, struct sub_block *sb,
614         enum toku_compression_method compression_method, struct serialize_times *st) {
615     // serialize, compress, update serialize times.
616     tokutime_t t0 = toku_time_now();
617     serialize_ftnode_info(node, sb);
618     tokutime_t t1 = toku_time_now();
619     compress_ftnode_sub_block(sb, compression_method);
620     tokutime_t t2 = toku_time_now();
621 
622     st->serialize_time += t1 - t0;
623     st->compress_time += t2 - t1;
624 }
625 
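// Rough layout of the buffer produced by toku_serialize_ftnode_to_memory:
//   [uncompressed header]
//   [compressed node-info sub_block][4-byte xsum]
//   [compressed partition sub_block][4-byte xsum]  (repeated n_children times)
//   [zero padding up to a multiple of 512 bytes, for O_DIRECT]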
626 int toku_serialize_ftnode_to_memory(FTNODE node,
627                                     FTNODE_DISK_DATA* ndd,
628                                     unsigned int basementnodesize,
629                                     enum toku_compression_method compression_method,
630                                     bool do_rebalancing,
631                                     bool in_parallel, // true for the loader, false for toku_ftnode_flush_callback
632                             /*out*/ size_t *n_bytes_to_write,
633                             /*out*/ size_t *n_uncompressed_bytes,
634                             /*out*/ char  **bytes_to_write)
635 // Effect: Writes out each child to a separate malloc'd buffer, then compresses
636 //   all of them, and writes the uncompressed header, to bytes_to_write,
637 //   which is malloc'd.
638 //
639 //   The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed).
640 //   512-byte padding is for O_DIRECT to work.
641 {
642     toku_ftnode_assert_fully_in_memory(node);
643 
644     if (do_rebalancing && node->height == 0) {
645         toku_ftnode_leaf_rebalance(node, basementnodesize);
646     }
647     const int npartitions = node->n_children;
648 
649     // Each partition represents a compressed sub block
650     // For internal nodes, a sub block is a message buffer
651     // For leaf nodes, a sub block is a basement node
652     toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
653     struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
654     XREALLOC_N(npartitions, *ndd);
655 
656     //
657     // First, let's serialize and compress the individual sub blocks
658     //
659 
660     // determine how large our serialization and compression buffers need to be.
661     size_t serialize_buf_size = 0, compression_buf_size = 0;
662     for (int i = 0; i < node->n_children; i++) {
663         sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
664         sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
665         serialize_buf_size += sb[i].uncompressed_size;
666         compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
667     }
668 
669     // give each sub block a base pointer to enough buffer space for serialization and compression
670     toku::scoped_malloc serialize_buf(serialize_buf_size);
671     toku::scoped_malloc compression_buf(compression_buf_size);
672     for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
673         sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
674         sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
675         uncompressed_offset += sb[i].uncompressed_size;
676         compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
677         invariant(uncompressed_offset <= serialize_buf_size);
678         invariant(compressed_offset <= compression_buf_size);
679     }
680 
681     // do the actual serialization now that we have buffer space
682     struct serialize_times st = { 0, 0 };
683     if (in_parallel) {
684         serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
685     } else {
686         serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
687     }
688 
689     //
690     // Now let's create a sub-block that has the common node information.
691     // This does NOT include the header.
692     //
693 
694     // determine how large our serialization and compression buffers need to be
695     struct sub_block sb_node_info;
696     sub_block_init(&sb_node_info);
697     size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
698     size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
699     toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
700     toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
701     sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
702     sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
703     sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
704     sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();
705 
706     // do the actual serialization now that we have buffer space
707     serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);
708 
709     //
710     // At this point, we have compressed each of our pieces into individual sub_blocks,
711     // we can put the header and all the subblocks into a single buffer and return it.
712     //
713 
714     // update the serialize times, ignore the header for simplicity. we captured all
715     // of the partitions' serialize times so that's probably good enough.
716     toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);
717 
718     // The total size of the node is:
719     // size of header + disk size of the n+1 sub_block's created above
720     uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
721                                  + sb_node_info.compressed_size   // compressed nodeinfo (without its checksum)
722                                  + 4);                            // nodeinfo's checksum
723     uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header
724                                  + sb_node_info.uncompressed_size   // uncompressed nodeinfo (without its checksum)
725                                  + 4);                            // nodeinfo's checksum
726     // store the BP_SIZEs and BP_STARTs
727     for (int i = 0; i < node->n_children; i++) {
728         uint32_t len         = sb[i].compressed_size + 4; // data and checksum
729         BP_SIZE (*ndd,i) = len;
730         BP_START(*ndd,i) = total_node_size;
731         total_node_size += sb[i].compressed_size + 4;
732         total_uncompressed_size += sb[i].uncompressed_size + 4;
733     }
734 
735     // now create the final serialized node
736     uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer size a multiple of 512 bytes
737     char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
738     char *curr_ptr = data;
739 
740     // write the header
741     struct wbuf wb;
742     wbuf_init(&wb, curr_ptr, serialize_node_header_size(node));
743     serialize_node_header(node, *ndd, &wb);
744     assert(wb.ndone == wb.size);
745     curr_ptr += serialize_node_header_size(node);
746 
747     // now write sb_node_info
748     memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size);
749     curr_ptr += sb_node_info.compressed_size;
750     // write the checksum
751     *(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum);
752     curr_ptr += sizeof(sb_node_info.xsum);
753 
754     for (int i = 0; i < npartitions; i++) {
755         memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size);
756         curr_ptr += sb[i].compressed_size;
757         // write the checksum
758         *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum);
759         curr_ptr += sizeof(sb[i].xsum);
760     }
761     // Zero the rest of the buffer
762     memset(data + total_node_size, 0, total_buffer_size - total_node_size);
763 
764     assert(curr_ptr - data == total_node_size);
765     *bytes_to_write = data;
766     *n_bytes_to_write = total_buffer_size;
767     *n_uncompressed_bytes = total_uncompressed_size;
768 
769     invariant(*n_bytes_to_write % 512 == 0);
770     invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
771     return 0;
772 }
773 
774 int toku_serialize_ftnode_to(int fd,
775                              BLOCKNUM blocknum,
776                              FTNODE node,
777                              FTNODE_DISK_DATA *ndd,
778                              bool do_rebalancing,
779                              FT ft,
780                              bool for_checkpoint) {
781     size_t n_to_write;
782     size_t n_uncompressed_bytes;
783     char *compressed_buf = nullptr;
784 
785     // because toku_serialize_ftnode_to is only called
786     // from toku_ftnode_flush_callback, we pass false
787     // for in_parallel. The reasoning is that when we write
788     // nodes to disk via toku_ftnode_flush_callback, we
789     // assume that it is being done on a non-critical
790     // background thread (probably for checkpointing), and therefore
791     // should not hog CPU,
792     //
793     // Should the above facts change, we may want to revisit
794     // passing false for in_parallel here
795     //
796     // alternatively, we could have made in_parallel a parameter
797     // for toku_serialize_ftnode_to, but instead we did this.
798     int r = toku_serialize_ftnode_to_memory(
799         node,
800         ndd,
801         ft->h->basementnodesize,
802         ft->h->compression_method,
803         do_rebalancing,
804         toku_unsafe_fetch(&toku_serialize_in_parallel),
805         &n_to_write,
806         &n_uncompressed_bytes,
807         &compressed_buf);
808     if (r != 0) {
809         return r;
810     }
811 
812     // If the node has never been written, then write the whole buffer,
813     // including the zeros
814     invariant(blocknum.b >= 0);
815     DISKOFF offset;
816 
817     // Dirties the ft
818     ft->blocktable.realloc_on_disk(
819         blocknum, n_to_write, &offset, ft, fd, for_checkpoint);
820 
821     tokutime_t t0 = toku_time_now();
822     toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
823     tokutime_t t1 = toku_time_now();
824 
825     tokutime_t io_time = t1 - t0;
826     toku_ft_status_update_flush_reason(
827         node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);
828 
829     toku_free(compressed_buf);
830     node->clear_dirty();  // See #1957.   Must set the node to be clean after
831                       // serializing it so that it doesn't get written again on
832                       // the next checkpoint or eviction.
833     if (node->height == 0) {
834         for (int i = 0; i < node->n_children; i++) {
835             if (BP_STATE(node, i) == PT_AVAIL) {
836                 BLB_LRD(node, i) = 0;
837             }
838         }
839     }
840     return 0;
841 }
842 
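// Sort each offset array by (key, msn) using the given comparator, then hand
// ownership of the arrays over to the bnc's message trees.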
843 static void
844 sort_and_steal_offset_arrays(NONLEAF_CHILDINFO bnc,
845                              const toku::comparator &cmp,
846                              int32_t **fresh_offsets, int32_t nfresh,
847                              int32_t **stale_offsets, int32_t nstale,
848                              int32_t **broadcast_offsets, int32_t nbroadcast) {
849     // We always have fresh / broadcast offsets (even if they are empty)
850     // but we may not have stale offsets, in the case of v13 upgrade.
851     invariant(fresh_offsets != nullptr);
852     invariant(broadcast_offsets != nullptr);
853     invariant(cmp.valid());
854 
855     typedef toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp> msn_sort;
856 
857     const int32_t n_in_this_buffer = nfresh + nstale + nbroadcast;
858     struct toku_msg_buffer_key_msn_cmp_extra extra(cmp, &bnc->msg_buffer);
859     msn_sort::mergesort_r(*fresh_offsets, nfresh, extra);
860     bnc->fresh_message_tree.destroy();
861     bnc->fresh_message_tree.create_steal_sorted_array(fresh_offsets, nfresh, n_in_this_buffer);
862     if (stale_offsets) {
863         msn_sort::mergesort_r(*stale_offsets, nstale, extra);
864         bnc->stale_message_tree.destroy();
865         bnc->stale_message_tree.create_steal_sorted_array(stale_offsets, nstale, n_in_this_buffer);
866     }
867     bnc->broadcast_list.destroy();
868     bnc->broadcast_list.create_steal_sorted_array(broadcast_offsets, nbroadcast, n_in_this_buffer);
869 }
870 
871 static MSN
872 deserialize_child_buffer_v13(FT ft, NONLEAF_CHILDINFO bnc, struct rbuf *rb) {
873     // We skip 'stale' offsets for upgraded nodes.
874     int32_t nfresh = 0, nbroadcast = 0;
875     int32_t *fresh_offsets = nullptr, *broadcast_offsets = nullptr;
876 
877     // Only sort buffers if we have a valid comparison function. In certain scenarios,
878     // like deserialize_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
879     // for simple inspection and don't actually require that the message buffers are
880     // properly sorted. This is very ugly, but correct.
881     const bool sort = ft->cmp.valid();
882 
883     MSN highest_msn_in_this_buffer =
884         bnc->msg_buffer.deserialize_from_rbuf_v13(rb, &ft->h->highest_unused_msn_for_upgrade,
885                                                   sort ? &fresh_offsets : nullptr, &nfresh,
886                                                   sort ? &broadcast_offsets : nullptr, &nbroadcast);
887 
888     if (sort) {
889         sort_and_steal_offset_arrays(bnc, ft->cmp,
890                                      &fresh_offsets, nfresh,
891                                      nullptr, 0, // no stale offsets
892                                      &broadcast_offsets, nbroadcast);
893     }
894 
895     return highest_msn_in_this_buffer;
896 }
897 
898 static void
899 deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rb, const toku::comparator &cmp) {
900     int32_t nfresh = 0, nstale = 0, nbroadcast = 0;
901     int32_t *fresh_offsets, *stale_offsets, *broadcast_offsets;
902 
903     // Only sort buffers if we have a valid comparison function. In certain scenarios,
904     // like deserialize_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
905     // for simple inspection and don't actually require that the message buffers are
906     // properly sorted. This is very ugly, but correct.
907     const bool sort = cmp.valid();
908 
909     // read in the message buffer
910     bnc->msg_buffer.deserialize_from_rbuf(rb,
911                                           sort ? &fresh_offsets : nullptr, &nfresh,
912                                           sort ? &stale_offsets : nullptr, &nstale,
913                                           sort ? &broadcast_offsets : nullptr, &nbroadcast);
914 
915     if (sort) {
916         sort_and_steal_offset_arrays(bnc, cmp,
917                                      &fresh_offsets, nfresh,
918                                      &stale_offsets, nstale,
919                                      &broadcast_offsets, nbroadcast);
920     }
921 }
922 
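// Mirrors the format written by serialize_child_buffer() above: the message
// buffer itself, then the fresh, stale, and broadcast offset arrays, each
// preceded by its 4-byte count.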
923 static void
924 deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rb) {
925     // read in the message buffer
926     bnc->msg_buffer.deserialize_from_rbuf(rb,
927                                           nullptr, nullptr,  // fresh_offsets, nfresh,
928                                           nullptr, nullptr,  // stale_offsets, nstale,
929                                           nullptr, nullptr); // broadcast_offsets, nbroadcast
930 
931     // read in each message tree (fresh, stale, broadcast)
932     int32_t nfresh = rbuf_int(rb);
933     int32_t *XMALLOC_N(nfresh, fresh_offsets);
934     for (int i = 0; i < nfresh; i++) {
935         fresh_offsets[i] = rbuf_int(rb);
936     }
937 
938     int32_t nstale = rbuf_int(rb);
939     int32_t *XMALLOC_N(nstale, stale_offsets);
940     for (int i = 0; i < nstale; i++) {
941         stale_offsets[i] = rbuf_int(rb);
942     }
943 
944     int32_t nbroadcast = rbuf_int(rb);
945     int32_t *XMALLOC_N(nbroadcast, broadcast_offsets);
946     for (int i = 0; i < nbroadcast; i++) {
947         broadcast_offsets[i] = rbuf_int(rb);
948     }
949 
950     // build OMTs out of each offset array
951     bnc->fresh_message_tree.destroy();
952     bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, nfresh);
953     bnc->stale_message_tree.destroy();
954     bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, nstale);
955     bnc->broadcast_list.destroy();
956     bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast, nbroadcast);
957 }
958 
959 // dump a buffer to stderr
960 // no locking around this for now
961 void
962 dump_bad_block(unsigned char *vp, uint64_t size) {
963     const uint64_t linesize = 64;
964     uint64_t n = size / linesize;
965     for (uint64_t i = 0; i < n; i++) {
966         fprintf(stderr, "%p: ", vp);
967         for (uint64_t j = 0; j < linesize; j++) {
968             unsigned char c = vp[j];
969             fprintf(stderr, "%2.2X", c);
970         }
971         fprintf(stderr, "\n");
972         vp += linesize;
973     }
974     size = size % linesize;
975     for (uint64_t i=0; i<size; i++) {
976         if ((i % linesize) == 0)
977             fprintf(stderr, "%p: ", vp+i);
978         fprintf(stderr, "%2.2X", vp[i]);
979         if (((i+1) % linesize) == 0)
980             fprintf(stderr, "\n");
981     }
982     fprintf(stderr, "\n");
983 }
984 
985 ////////////////////////////////////////////////////////////////////
986 ////////////////////////////////////////////////////////////////////
987 ////////////////////////////////////////////////////////////////////
988 ////////////////////////////////////////////////////////////////////
989 ////////////////////////////////////////////////////////////////////
990 ////////////////////////////////////////////////////////////////////
991 ////////////////////////////////////////////////////////////////////
992 ////////////////////////////////////////////////////////////////////
993 
994 BASEMENTNODE toku_create_empty_bn(void) {
995     BASEMENTNODE bn = toku_create_empty_bn_no_buffer();
996     bn->data_buffer.initialize_empty();
997     return bn;
998 }
999 
1000 BASEMENTNODE toku_clone_bn(BASEMENTNODE orig_bn) {
1001     BASEMENTNODE bn = toku_create_empty_bn_no_buffer();
1002     bn->max_msn_applied = orig_bn->max_msn_applied;
1003     bn->seqinsert = orig_bn->seqinsert;
1004     bn->stale_ancestor_messages_applied = orig_bn->stale_ancestor_messages_applied;
1005     bn->stat64_delta = orig_bn->stat64_delta;
1006     bn->logical_rows_delta = orig_bn->logical_rows_delta;
1007     bn->data_buffer.clone(&orig_bn->data_buffer);
1008     return bn;
1009 }
1010 
1011 BASEMENTNODE toku_create_empty_bn_no_buffer(void) {
1012     BASEMENTNODE XMALLOC(bn);
1013     bn->max_msn_applied.msn = 0;
1014     bn->seqinsert = 0;
1015     bn->stale_ancestor_messages_applied = false;
1016     bn->stat64_delta = ZEROSTATS;
1017     bn->logical_rows_delta = 0;
1018     bn->data_buffer.init_zero();
1019     return bn;
1020 }
1021 
1022 NONLEAF_CHILDINFO toku_create_empty_nl(void) {
1023     NONLEAF_CHILDINFO XMALLOC(cn);
1024     cn->msg_buffer.create();
1025     cn->fresh_message_tree.create_no_array();
1026     cn->stale_message_tree.create_no_array();
1027     cn->broadcast_list.create_no_array();
1028     memset(cn->flow, 0, sizeof cn->flow);
1029     return cn;
1030 }
1031 
1032 // must clone the OMTs, since we serialize them along with the message buffer
1033 NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) {
1034     NONLEAF_CHILDINFO XMALLOC(cn);
1035     cn->msg_buffer.clone(&orig_childinfo->msg_buffer);
1036     cn->fresh_message_tree.create_no_array();
1037     cn->fresh_message_tree.clone(orig_childinfo->fresh_message_tree);
1038     cn->stale_message_tree.create_no_array();
1039     cn->stale_message_tree.clone(orig_childinfo->stale_message_tree);
1040     cn->broadcast_list.create_no_array();
1041     cn->broadcast_list.clone(orig_childinfo->broadcast_list);
1042     memset(cn->flow, 0, sizeof cn->flow);
1043     return cn;
1044 }
1045 
1046 void destroy_basement_node (BASEMENTNODE bn)
1047 {
1048     bn->data_buffer.destroy();
1049     toku_free(bn);
1050 }
1051 
1052 void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl)
1053 {
1054     nl->msg_buffer.destroy();
1055     nl->fresh_message_tree.destroy();
1056     nl->stale_message_tree.destroy();
1057     nl->broadcast_list.destroy();
1058     toku_free(nl);
1059 }
1060 
1061 void read_block_from_fd_into_rbuf(
1062     int fd,
1063     BLOCKNUM blocknum,
1064     FT ft,
1065     struct rbuf *rb
1066     )
1067 {
1068     // get the file offset and block size for the block
1069     DISKOFF offset, size;
1070     ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
1071     DISKOFF size_aligned = roundup_to_multiple(512, size);
1072     uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block);
1073     rbuf_init(rb, raw_block, size);
1074     // read the block
1075     ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset);
1076     assert((DISKOFF)rlen >= size);
1077     assert((DISKOFF)rlen <= size_aligned);
1078 }
1079 
1080 static const int read_header_heuristic_max = 32*1024;
1081 
1082 #ifndef MIN
1083 #define MIN(a,b) (((a)>(b)) ? (b) : (a))
1084 #endif
1085 
1086 // Effect: If the header part of the node is small enough, then read it into the rbuf.  The rbuf will be allocated to be big enough in any case.
1087 static void read_ftnode_header_from_fd_into_rbuf_if_small_enough(int fd, BLOCKNUM blocknum,
1088                                                                  FT ft, struct rbuf *rb,
1089                                                                  ftnode_fetch_extra *bfe) {
1090     DISKOFF offset, size;
1091     ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
1092     DISKOFF read_size = roundup_to_multiple(512, MIN(read_header_heuristic_max, size));
1093     uint8_t *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, size), raw_block);
1094     rbuf_init(rb, raw_block, read_size);
1095 
1096     // read the block
1097     tokutime_t t0 = toku_time_now();
1098     ssize_t rlen = toku_os_pread(fd, raw_block, read_size, offset);
1099     tokutime_t t1 = toku_time_now();
1100 
1101     assert(rlen >= 0);
1102     rbuf_init(rb, raw_block, rlen);
1103 
1104     bfe->bytes_read = rlen;
1105     bfe->io_time = t1 - t0;
1106     toku_ft_status_update_pivot_fetch_reason(bfe);
1107 }
1108 
1109 //
1110 // read the compressed partition into the sub_block,
1111 // validate the checksum of the compressed data
1112 //
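// Expected layout in rb: 4-byte compressed size, 4-byte uncompressed size,
// the compressed bytes, then a 4-byte x1764 checksum covering the two size
// fields plus the compressed bytes (see compress_ftnode_sub_block).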
1113 int
1114 read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb)
1115 {
1116     int r = 0;
1117     sb->compressed_size = rbuf_int(rb);
1118     sb->uncompressed_size = rbuf_int(rb);
1119     const void **cp = (const void **) &sb->compressed_ptr;
1120     rbuf_literal_bytes(rb, cp, sb->compressed_size);
1121     sb->xsum = rbuf_int(rb);
1122     // let's check the checksum
1123     uint32_t actual_xsum = toku_x1764_memory((char *)sb->compressed_ptr-8, 8+sb->compressed_size);
1124     if (sb->xsum != actual_xsum) {
1125         r = TOKUDB_BAD_CHECKSUM;
1126     }
1127     return r;
1128 }
1129 
1130 static int
1131 read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb)
1132 {
1133     int r = 0;
1134     r = read_compressed_sub_block(rb, sb);
1135     if (r != 0) {
1136         goto exit;
1137     }
1138 
1139     just_decompress_sub_block(sb);
1140 exit:
1141     return r;
1142 }
1143 
1144 // Allocates space for the sub-block's uncompressed data and decompresses
1145 // it from the supplied compressed pointer.
1146 void
1147 just_decompress_sub_block(struct sub_block *sb)
1148 {
1149     // <CER> TODO: Add assert that the subblock was read in.
1150     sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
1151 
1152     toku_decompress(
1153         (Bytef *) sb->uncompressed_ptr,
1154         sb->uncompressed_size,
1155         (Bytef *) sb->compressed_ptr,
1156         sb->compressed_size
1157         );
1158 }
1159 
1160 // verify the checksum
1161 int verify_ftnode_sub_block(struct sub_block *sb,
1162                             const char *fname,
1163                             BLOCKNUM blocknum) {
1164     int r = 0;
1165     // first verify the checksum
1166     uint32_t data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
1167     uint32_t stored_xsum = toku_dtoh32(*((uint32_t *)((char *)sb->uncompressed_ptr + data_size)));
1168     uint32_t actual_xsum = toku_x1764_memory(sb->uncompressed_ptr, data_size);
1169     if (stored_xsum != actual_xsum) {
1170         fprintf(
1171             stderr,
1172             "%s:%d:verify_ftnode_sub_block - "
1173             "file[%s], blocknum[%ld], stored_xsum[%u] != actual_xsum[%u]\n",
1174             __FILE__,
1175             __LINE__,
1176             fname ? fname : "unknown",
1177             blocknum.b,
1178             stored_xsum,
1179             actual_xsum);
1180         dump_bad_block((Bytef *) sb->uncompressed_ptr, sb->uncompressed_size);
1181         r = TOKUDB_BAD_CHECKSUM;
1182     }
1183     return r;
1184 }
1185 
1186 // This function deserializes the data stored by serialize_ftnode_info
1187 static int deserialize_ftnode_info(struct sub_block *sb, FTNODE node) {
1188 
1189     // sb_node_info->uncompressed_ptr stores the serialized node information
1190     // this function puts that information into node
1191 
1192     // first verify the checksum
1193     int r = 0;
1194     const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
1195     r = verify_ftnode_sub_block(sb, fname, node->blocknum);
1196     if (r != 0) {
1197         fprintf(
1198             stderr,
1199             "%s:%d:deserialize_ftnode_info - "
1200             "file[%s], blocknum[%ld], verify_ftnode_sub_block failed with %d\n",
1201             __FILE__,
1202             __LINE__,
1203             fname ? fname : "unknown",
1204             node->blocknum.b,
1205             r);
1206         dump_bad_block(static_cast<unsigned char *>(sb->uncompressed_ptr),
1207                        sb->uncompressed_size);
1208         goto exit;
1209     }
1210 
1211     uint32_t data_size;
1212     data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
1213 
1214     // now with the data verified, we can read the information into the node
1215     struct rbuf rb;
1216     rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size);
1217 
1218     node->max_msn_applied_to_node_on_disk = rbuf_MSN(&rb);
1219     (void)rbuf_int(&rb);
1220     node->flags = rbuf_int(&rb);
1221     node->height = rbuf_int(&rb);
1222     if (node->layout_version_read_from_disk < FT_LAYOUT_VERSION_19) {
1223         (void) rbuf_int(&rb); // optimized_for_upgrade
1224     }
1225     if (node->layout_version_read_from_disk >= FT_LAYOUT_VERSION_22) {
1226         rbuf_TXNID(&rb, &node->oldest_referenced_xid_known);
1227     }
1228 
1229     // now create the basement nodes or childinfos, depending on whether this is a
1230     // leaf node or internal node
1231     // now the subtree_estimates
1232 
1233     // n_children is now in the header, and the allocation of node->bp is done in deserialize_ftnode_from_rbuf.
1234 
1235     // now the pivots
1236     if (node->n_children > 1) {
1237         node->pivotkeys.deserialize_from_rbuf(&rb, node->n_children - 1);
1238     } else {
1239         node->pivotkeys.create_empty();
1240     }
1241 
1242     // if this is an internal node, unpack the block nums, and fill in necessary fields
1243     // of childinfo
1244     if (node->height > 0) {
1245         for (int i = 0; i < node->n_children; i++) {
1246             BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb);
1247             BP_WORKDONE(node, i) = 0;
1248         }
1249     }
1250 
1251     // make sure that all the data was read
1252     if (data_size != rb.ndone) {
1253         fprintf(
1254             stderr,
1255             "%s:%d:deserialize_ftnode_info - "
1256             "file[%s], blocknum[%ld], data_size[%d] != rb.ndone[%d]\n",
1257             __FILE__,
1258             __LINE__,
1259             fname ? fname : "unknown",
1260             node->blocknum.b,
1261             data_size,
1262             rb.ndone);
1263         dump_bad_block(rb.buf, rb.size);
1264         abort();
1265     }
1266 exit:
1267     return r;
1268 }
1269 
1270 static void
1271 setup_available_ftnode_partition(FTNODE node, int i) {
1272     if (node->height == 0) {
1273         set_BLB(node, i, toku_create_empty_bn());
1274         BLB_MAX_MSN_APPLIED(node,i) = node->max_msn_applied_to_node_on_disk;
1275     }
1276     else {
1277         set_BNC(node, i, toku_create_empty_nl());
1278     }
1279 }
1280 
1281 // Assign the child_to_read member of the bfe from the given ftnode
1282 // that has been brought into memory.
1283 static void
1284 update_bfe_using_ftnode(FTNODE node, ftnode_fetch_extra *bfe)
1285 {
1286     if (bfe->type == ftnode_fetch_subset && bfe->search != NULL) {
1287         // we do not take into account prefetching yet
1288         // as of now, if we need a subset, the only thing
1289         // we can possibly require is a single basement node
1290         // we find out what basement node the query cares about
1291         // and check if it is available
1292         bfe->child_to_read = toku_ft_search_which_child(
1293             bfe->ft->cmp,
1294             node,
1295             bfe->search
1296             );
1297     } else if (bfe->type == ftnode_fetch_keymatch) {
1298         // we do not take into account prefetching yet
1299         // as of now, if we need a subset, the only thing
1300         // we can possibly require is a single basement node
1301         // we find out what basement node the query cares about
1302         // and check if it is available
1303         if (node->height == 0) {
1304             int left_child = bfe->leftmost_child_wanted(node);
1305             int right_child = bfe->rightmost_child_wanted(node);
1306             if (left_child == right_child) {
1307                 bfe->child_to_read = left_child;
1308             }
1309         }
1310     }
1311 }
1312 
1313 // Using the search parameters in the bfe, this function will
1314 // initialize all of the given ftnode's partitions.
1315 static void
1316 setup_partitions_using_bfe(FTNODE node,
1317                            ftnode_fetch_extra *bfe,
1318                            bool data_in_memory)
1319 {
1320     // Leftmost and Rightmost Child bounds.
1321     int lc, rc;
1322     if (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch) {
1323         lc = bfe->leftmost_child_wanted(node);
1324         rc = bfe->rightmost_child_wanted(node);
1325     } else {
1326         lc = -1;
1327         rc = -1;
1328     }
1329 
1330     //
1331     // setup memory needed for the node
1332     //
1333     //printf("node height %d, blocknum %" PRId64 ", type %d lc %d rc %d\n", node->height, node->blocknum.b, bfe->type, lc, rc);
1334     for (int i = 0; i < node->n_children; i++) {
1335         BP_INIT_UNTOUCHED_CLOCK(node,i);
1336         if (data_in_memory) {
1337             BP_STATE(node, i) = ((bfe->wants_child_available(i) || (lc <= i && i <= rc))
1338                                  ? PT_AVAIL : PT_COMPRESSED);
1339         } else {
1340             BP_STATE(node, i) = PT_ON_DISK;
1341         }
1342         BP_WORKDONE(node,i) = 0;
1343 
1344         switch (BP_STATE(node,i)) {
1345         case PT_AVAIL:
1346             setup_available_ftnode_partition(node, i);
1347             BP_TOUCH_CLOCK(node,i);
1348             break;
1349         case PT_COMPRESSED:
1350             set_BSB(node, i, sub_block_creat());
1351             break;
1352         case PT_ON_DISK:
1353             set_BNULL(node, i);
1354             break;
1355         case PT_INVALID:
1356             abort();
1357         }
1358     }
1359 }
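
// Illustrative example (hypothetical numbers): a 4-child node fetched with
// ftnode_fetch_subset whose search maps to child 2, with data_in_memory true,
// gives lc == rc == 2, so the states come out roughly as
//   { PT_COMPRESSED, PT_COMPRESSED, PT_AVAIL, PT_COMPRESSED }
// (any child the bfe additionally wants available is also PT_AVAIL).  With
// data_in_memory false, every child is simply PT_ON_DISK.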
1360 
1361 static void setup_ftnode_partitions(FTNODE node, ftnode_fetch_extra *bfe, bool data_in_memory)
1362 // Effect: Used when reading a ftnode into main memory, this sets up the partitions.
1363 //   We set bfe->child_to_read as well as the BP_STATE and the data pointers (e.g., with set_BSB or set_BNULL or other set_ operations).
1364 // Arguments:  Node: the node to set up.
1365 //             bfe:  Describes the key range needed.
1366 //             data_in_memory: true if we have all the data (in which case we set the BP_STATE to be either PT_AVAIL or PT_COMPRESSED depending on the bfe).
1367 //                             false if we don't have the partitions in main memory (in which case we set the state to PT_ON_DISK).
1368 {
1369     // Set bfe->child_to_read.
1370     update_bfe_using_ftnode(node, bfe);
1371 
1372     // Setup the partitions.
1373     setup_partitions_using_bfe(node, bfe, data_in_memory);
1374 }
1375 
1376 /* deserialize the partition from the sub-block's uncompressed buffer
1377  * and destroy the uncompressed buffer
1378  */
1379 static int deserialize_ftnode_partition(
1380     struct sub_block *sb,
1381     FTNODE node,
1382     int childnum,  // which partition to deserialize
1383     const toku::comparator &cmp) {
1384 
1385     int r = 0;
1386     const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
1387     r = verify_ftnode_sub_block(sb, fname, node->blocknum);
1388     if (r != 0) {
1389         fprintf(stderr,
1390                 "%s:%d:deserialize_ftnode_partition - "
1391                 "file[%s], blocknum[%ld], "
1392                 "verify_ftnode_sub_block failed with %d\n",
1393                 __FILE__,
1394                 __LINE__,
1395                 fname ? fname : "unknown",
1396                 node->blocknum.b,
1397                 r);
1398         goto exit;
1399     }
1400     uint32_t data_size;
1401     data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
1402 
1403     // now with the data verified, we can read the information into the node
1404     struct rbuf rb;
1405     rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size);
1406     unsigned char ch;
1407     ch = rbuf_char(&rb);
1408 
1409     if (node->height > 0) {
1410         if (ch != FTNODE_PARTITION_MSG_BUFFER) {
1411             fprintf(stderr,
1412                     "%s:%d:deserialize_ftnode_partition - "
1413                     "file[%s], blocknum[%ld], ch[%d] != "
1414                     "FTNODE_PARTITION_MSG_BUFFER[%d]\n",
1415                     __FILE__,
1416                     __LINE__,
1417                     fname ? fname : "unknown",
1418                     node->blocknum.b,
1419                     ch,
1420                     FTNODE_PARTITION_MSG_BUFFER);
1421             dump_bad_block(rb.buf, rb.size);
1422             assert(ch == FTNODE_PARTITION_MSG_BUFFER);
1423         }
1424         NONLEAF_CHILDINFO bnc = BNC(node, childnum);
1425         if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_26) {
1426             // Layout version <= 26 did not serialize sorted message trees to disk.
1427             deserialize_child_buffer_v26(bnc, &rb, cmp);
1428         } else {
1429             deserialize_child_buffer(bnc, &rb);
1430         }
1431         BP_WORKDONE(node, childnum) = 0;
1432     } else {
1433         if (ch != FTNODE_PARTITION_DMT_LEAVES) {
1434             fprintf(stderr,
1435                     "%s:%d:deserialize_ftnode_partition - "
1436                     "file[%s], blocknum[%ld], ch[%d] != "
1437                     "FTNODE_PARTITION_DMT_LEAVES[%d]\n",
1438                     __FILE__,
1439                     __LINE__,
1440                     fname ? fname : "unknown",
1441                     node->blocknum.b,
1442                     ch,
1443                     FTNODE_PARTITION_DMT_LEAVES);
1444             dump_bad_block(rb.buf, rb.size);
1445             assert(ch == FTNODE_PARTITION_DMT_LEAVES);
1446         }
1447 
1448         BLB_SEQINSERT(node, childnum) = 0;
1449         uint32_t num_entries = rbuf_int(&rb);
1450         // we are now at the first byte of first leafentry
1451         data_size -= rb.ndone; // remaining bytes of leafentry data
1452 
1453         BASEMENTNODE bn = BLB(node, childnum);
1454         bn->data_buffer.deserialize_from_rbuf(
1455             num_entries, &rb, data_size, node->layout_version_read_from_disk);
1456     }
1457     if (rb.ndone != rb.size) {
1458         fprintf(stderr,
1459                 "%s:%d:deserialize_ftnode_partition - "
1460                 "file[%s], blocknum[%ld], rb.ndone[%d] != rb.size[%d]\n",
1461                 __FILE__,
1462                 __LINE__,
1463                 fname ? fname : "unknown",
1464                 node->blocknum.b,
1465                 rb.ndone,
1466                 rb.size);
1467         dump_bad_block(rb.buf, rb.size);
1468         assert(rb.ndone == rb.size);
1469     }
1470 
1471 exit:
1472     return r;
1473 }
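
// For reference, the partition payload expected above (as implied by the reads,
// not a format spec): a 1-byte type tag -- FTNODE_PARTITION_MSG_BUFFER for
// internal nodes, FTNODE_PARTITION_DMT_LEAVES for leaves -- followed by either
// the serialized message buffer or a 4-byte leaf-entry count plus the leaf
// entries, with a 4-byte x1764 checksum at the very end of the sub block.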
1474 
1475 static int decompress_and_deserialize_worker(struct rbuf curr_rbuf,
1476                                              struct sub_block curr_sb,
1477                                              FTNODE node,
1478                                              int child,
1479                                              const toku::comparator &cmp,
1480                                              tokutime_t *decompress_time) {
1481     int r = 0;
1482     tokutime_t t0 = toku_time_now();
1483     r = read_and_decompress_sub_block(&curr_rbuf, &curr_sb);
1484     if (r != 0) {
1485         const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
1486         fprintf(stderr,
1487                 "%s:%d:decompress_and_deserialize_worker - "
1488                 "file[%s], blocknum[%ld], read_and_decompress_sub_block failed "
1489                 "with %d\n",
1490                 __FILE__,
1491                 __LINE__,
1492                 fname ? fname : "unknown",
1493                 node->blocknum.b,
1494                 r);
1495         dump_bad_block(curr_rbuf.buf, curr_rbuf.size);
1496         goto exit;
1497     }
1498     *decompress_time = toku_time_now() - t0;
1499     // at this point, sb->uncompressed_ptr stores the serialized node partition
1500     r = deserialize_ftnode_partition(&curr_sb, node, child, cmp);
1501     if (r != 0) {
1502         const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
1503         fprintf(stderr,
1504                 "%s:%d:decompress_and_deserialize_worker - "
1505                 "file[%s], blocknum[%ld], deserialize_ftnode_partition failed "
1506                 "with %d\n",
1507                 __FILE__,
1508                 __LINE__,
1509                 fname ? fname : "unknown",
1510                 node->blocknum.b,
1511                 r);
1512         dump_bad_block(curr_rbuf.buf, curr_rbuf.size);
1513         goto exit;
1514     }
1515 
1516 exit:
1517     toku_free(curr_sb.uncompressed_ptr);
1518     return r;
1519 }
1520 
1521 static int check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf,
1522                                                       struct sub_block curr_sb,
1523                                                       FTNODE node,
1524                                                       int child) {
1525     int r = 0;
1526     r = read_compressed_sub_block(&curr_rbuf, &curr_sb);
1527     if (r != 0) {
1528         goto exit;
1529     }
1530 
1531     SUB_BLOCK bp_sb;
1532     bp_sb = BSB(node, child);
1533     bp_sb->compressed_size = curr_sb.compressed_size;
1534     bp_sb->uncompressed_size = curr_sb.uncompressed_size;
1535     bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size);
1536     memcpy(
1537         bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size);
1538 exit:
1539     return r;
1540 }
1541 
1542 static FTNODE alloc_ftnode_for_deserialize(uint32_t fullhash, BLOCKNUM blocknum) {
1543 // Effect: Allocate an FTNODE and fill in the values that are not read from disk.
1544     FTNODE XMALLOC(node);
1545     node->fullhash = fullhash;
1546     node->blocknum = blocknum;
1547     node->clear_dirty();
1548     node->oldest_referenced_xid_known = TXNID_NONE;
1549     node->bp = nullptr;
1550     node->ct_pair = nullptr;
1551     return node;
1552 }
1553 
1554 static int deserialize_ftnode_header_from_rbuf_if_small_enough(
1555     FTNODE *ftnode,
1556     FTNODE_DISK_DATA *ndd,
1557     BLOCKNUM blocknum,
1558     uint32_t fullhash,
1559     ftnode_fetch_extra *bfe,
1560     struct rbuf *rb,
1561     int fd)
1562 // If we have enough information in the rbuf to construct a header, then do so.
1563 // Also fetch in the basement node if needed.
1564 // Return 0 if it worked.  If something goes wrong (including that we are
1565 // looking at some old data format that doesn't have partitions) then return
1566 // nonzero.
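// A sketch of the header parsed below (reconstructed from the reads, not a
// format spec): 8-byte magic ("tokuleaf" or "tokunode"), four 4-byte ints
// (layout version, original layout version, build id, n_children), an 8-byte
// (start, size) pair per child, a 4-byte x1764 checksum over the header, and
// then the compressed node-info sub block (4-byte compressed size, 4-byte
// uncompressed size, the compressed bytes, and a 4-byte checksum).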
1567 {
1568     int r = 0;
1569 
1570     tokutime_t t0, t1;
1571     tokutime_t decompress_time = 0;
1572     tokutime_t deserialize_time = 0;
1573     // we must get the name from bfe and not through
1574     // toku_ftnode_get_cachefile_fname_in_env as the node is not set up yet
1575     const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
1576 
1577     t0 = toku_time_now();
1578 
1579     FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
1580 
1581     if (rb->size < 24) {
1582         fprintf(
1583             stderr,
1584             "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1585             "file[%s], blocknum[%ld], rb->size[%u] < 24\n",
1586             __FILE__,
1587             __LINE__,
1588             fname ? fname : "unknown",
1589             blocknum.b,
1590             rb->size);
1591         dump_bad_block(rb->buf, rb->size);
1592         // TODO: What error do we return here?
1593         // Does it even matter?
1594         r = toku_db_badformat();
1595         goto cleanup;
1596     }
1597 
1598     const void *magic;
1599     rbuf_literal_bytes(rb, &magic, 8);
1600     if (memcmp(magic, "tokuleaf", 8) != 0 &&
1601         memcmp(magic, "tokunode", 8) != 0) {
1602         fprintf(
1603             stderr,
1604             "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1605             "file[%s], blocknum[%ld], unrecognized magic number "
1606             "%2.2x %2.2x %2.2x %2.2x   %2.2x %2.2x %2.2x %2.2x\n",
1607             __FILE__,
1608             __LINE__,
1609             fname ? fname : "unknown",
1610             blocknum.b,
1611             static_cast<const uint8_t*>(magic)[0],
1612             static_cast<const uint8_t*>(magic)[1],
1613             static_cast<const uint8_t*>(magic)[2],
1614             static_cast<const uint8_t*>(magic)[3],
1615             static_cast<const uint8_t*>(magic)[4],
1616             static_cast<const uint8_t*>(magic)[5],
1617             static_cast<const uint8_t*>(magic)[6],
1618             static_cast<const uint8_t*>(magic)[7]);
1619         dump_bad_block(rb->buf, rb->size);
1620         r = toku_db_badformat();
1621         goto cleanup;
1622     }
1623 
1624     node->layout_version_read_from_disk = rbuf_int(rb);
1625     if (node->layout_version_read_from_disk <
1626         FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
1627         fprintf(
1628             stderr,
1629             "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1630             "file[%s], blocknum[%ld], node->layout_version_read_from_disk[%d] "
1631             "< FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES[%d]\n",
1632             __FILE__,
1633             __LINE__,
1634             fname ? fname : "unknown",
1635             blocknum.b,
1636             node->layout_version_read_from_disk,
1637             FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES);
1638         dump_bad_block(rb->buf, rb->size);
1639         // This code path doesn't have to worry about upgrade.
1640         r = toku_db_badformat();
1641         goto cleanup;
1642     }
1643 
1644     // If we get here, we know the node is at least
1645     // FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES.  We haven't changed
1646     // the serialization format since then (this comment is correct as of
1647     // version 20, which is Deadshot) so we can go ahead and say the
1648     // layout version is current (it will be as soon as we finish
1649     // deserializing).
1650     // TODO(leif): remove node->layout_version (#5174)
1651     node->layout_version = FT_LAYOUT_VERSION;
1652 
1653     node->layout_version_original = rbuf_int(rb);
1654     node->build_id = rbuf_int(rb);
1655     node->n_children = rbuf_int(rb);
1656     // Guaranteed to have been able to read up to here.  If n_children
1657     // is too big, we may have a problem, so check that we won't overflow
1658     // while reading the partition locations.
1659     unsigned int nhsize;
1660     // we can do this because n_children is filled in.
1661     nhsize = serialize_node_header_size(node);
1662     unsigned int needed_size;
1663     // we need 12 more so that we can read the compressed block size information
1664     // that follows for the nodeinfo.
1665     needed_size = nhsize + 12;
1666     if (needed_size > rb->size) {
1667         fprintf(
1668             stderr,
1669             "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1670             "file[%s], blocknum[%ld], needed_size[%d] > rb->size[%d]\n",
1671             __FILE__,
1672             __LINE__,
1673             fname ? fname : "unknown",
1674             blocknum.b,
1675             needed_size,
1676             rb->size);
1677         dump_bad_block(rb->buf, rb->size);
1678         r = toku_db_badformat();
1679         goto cleanup;
1680     }
1681 
1682     XMALLOC_N(node->n_children, node->bp);
1683     XMALLOC_N(node->n_children, *ndd);
1684     // read the partition locations
1685     for (int i=0; i<node->n_children; i++) {
1686         BP_START(*ndd,i) = rbuf_int(rb);
1687         BP_SIZE (*ndd,i) = rbuf_int(rb);
1688     }
1689 
1690     uint32_t checksum;
1691     checksum = toku_x1764_memory(rb->buf, rb->ndone);
1692     uint32_t stored_checksum;
1693     stored_checksum = rbuf_int(rb);
1694     if (stored_checksum != checksum) {
1695         fprintf(
1696             stderr,
1697             "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1698             "file[%s], blocknum[%ld], stored_checksum[%d] != checksum[%d]\n",
1699             __FILE__,
1700             __LINE__,
1701             fname ? fname : "unknown",
1702             blocknum.b,
1703             stored_checksum,
1704             checksum);
1705         dump_bad_block(rb->buf, rb->size);
1706         r = TOKUDB_BAD_CHECKSUM;
1707         goto cleanup;
1708     }
1709 
1710     // Now we want to read the pivot information.
1711     struct sub_block sb_node_info;
1712     sub_block_init(&sb_node_info);
1713     // we'll be able to read these because we checked the size earlier.
1714     sb_node_info.compressed_size = rbuf_int(rb);
1715     sb_node_info.uncompressed_size = rbuf_int(rb);
1716     if (rb->size - rb->ndone < sb_node_info.compressed_size + 8) {
1717         fprintf(
1718             stderr,
1719             "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1720             "file[%s], blocknum[%ld], rb->size[%d] - rb->ndone[%d] < "
1721             "sb_node_info.compressed_size[%d] + 8\n",
1722             __FILE__,
1723             __LINE__,
1724             fname ? fname : "unknown",
1725             blocknum.b,
1726             rb->size,
1727             rb->ndone,
1728             sb_node_info.compressed_size);
1729         dump_bad_block(rb->buf, rb->size);
1730         r = toku_db_badformat();
1731         goto cleanup;
1732     }
1733 
1734     // Finish reading the compressed sub_block
1735     const void **cp;
1736     cp = (const void **) &sb_node_info.compressed_ptr;
1737     rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size);
1738     sb_node_info.xsum = rbuf_int(rb);
1739     // let's check the checksum
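    // Note: the stored xsum covers the two 4-byte size fields as well as the
    // compressed bytes, hence hashing from compressed_ptr - 8 below.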
1740     uint32_t actual_xsum;
1741     actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr - 8,
1742                                     8 + sb_node_info.compressed_size);
1743     if (sb_node_info.xsum != actual_xsum) {
1744         fprintf(
1745             stderr,
1746             "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1747             "file[%s], blocknum[%ld], sb_node_info.xsum[%d] != actual_xsum[%d]\n",
1748             __FILE__,
1749             __LINE__,
1750             fname ? fname : "unknown",
1751             blocknum.b,
1752             sb_node_info.xsum,
1753             actual_xsum);
1754         dump_bad_block(rb->buf, rb->size);
1755         r = TOKUDB_BAD_CHECKSUM;
1756         goto cleanup;
1757     }
1758 
1759     // Now decompress the subblock
1760     {
1761         toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size);
1762         sb_node_info.uncompressed_ptr = sb_node_info_buf.get();
1763         tokutime_t decompress_t0 = toku_time_now();
1764         toku_decompress((Bytef *)sb_node_info.uncompressed_ptr,
1765                         sb_node_info.uncompressed_size,
1766                         (Bytef *)sb_node_info.compressed_ptr,
1767                         sb_node_info.compressed_size);
1768         tokutime_t decompress_t1 = toku_time_now();
1769         decompress_time = decompress_t1 - decompress_t0;
1770 
1771         // at this point sb->uncompressed_ptr stores the serialized node info.
1772         r = deserialize_ftnode_info(&sb_node_info, node);
1773         if (r != 0) {
1774             fprintf(
1775                 stderr,
1776                 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1777                 "file[%s], blocknum[%ld], deserialize_ftnode_info failed with "
1778                 "%d\n",
1779                 __FILE__,
1780                 __LINE__,
1781                 fname ? fname : "unknown",
1782                 blocknum.b,
1783                 r);
1784             dump_bad_block(
1785                 static_cast<unsigned char *>(sb_node_info.uncompressed_ptr),
1786                 sb_node_info.uncompressed_size);
1787             dump_bad_block(rb->buf, rb->size);
1788             goto cleanup;
1789         }
1790     }
1791 
1792     // Now we have the ftnode_info.  We have a bunch more stuff in the
1793     // rbuf, so we might be able to store the compressed data for some
1794     // objects.
1795     // We can proceed to deserialize the individual subblocks.
1796 
1797     // setup the memory of the partitions
1798     // for partitions being decompressed, create either message buffer or basement node
1799     // for partitions staying compressed, create sub_block
1800     setup_ftnode_partitions(node, bfe, false);
1801 
1802     // We must capture deserialize and decompression time before
1803     // the pf_callback, otherwise we would double-count.
1804     t1 = toku_time_now();
1805     deserialize_time = (t1 - t0) - decompress_time;
1806 
1807     // do partial fetch if necessary
1808     if (bfe->type != ftnode_fetch_none) {
1809         PAIR_ATTR attr;
1810         r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr);
1811         if (r != 0) {
1812             fprintf(
1813                 stderr,
1814                 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1815                 "file[%s], blocknum[%ld], toku_ftnode_pf_callback failed with "
1816                 "%d\n",
1817                 __FILE__,
1818                 __LINE__,
1819                 fname ? fname : "unknown",
1820                 blocknum.b,
1821                 r);
1822             dump_bad_block(rb->buf, rb->size);
1823             goto cleanup;
1824         }
1825     }
1826 
1827     // handle clock
1828     for (int i = 0; i < node->n_children; i++) {
1829         if (bfe->wants_child_available(i)) {
1830             paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
1831             BP_TOUCH_CLOCK(node,i);
1832         }
1833     }
1834     *ftnode = node;
1835     r = 0;
1836 
1837 cleanup:
1838     if (r == 0) {
1839         bfe->deserialize_time += deserialize_time;
1840         bfe->decompress_time += decompress_time;
1841         toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
1842     }
1843     if (r != 0) {
1844         if (node) {
1845             toku_free(*ndd);
1846             toku_free(node->bp);
1847             toku_free(node);
1848         }
1849     }
1850     return r;
1851 }
1852 
1853 // This function takes a deserialized version 13 or 14 buffer and
1854 // constructs the associated internal, non-leaf ftnode object.  It
1855 // also creates MSN's for older messages created in older versions
1856 // that did not generate MSN's for messages.  These new MSN's are
1857 // generated from the root downwards, counting backwards from MIN_MSN
1858 // and persisted in the ft header.
1859 static int deserialize_and_upgrade_internal_node(FTNODE node,
1860                                                  struct rbuf *rb,
1861                                                  ftnode_fetch_extra *bfe,
1862                                                  STAT64INFO info) {
1863     int version = node->layout_version_read_from_disk;
1864 
1865     if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
1866         (void) rbuf_int(rb);                          // 10. fingerprint
1867     }
1868 
1869     node->n_children = rbuf_int(rb);                  // 11. n_children
1870 
1871     // Sub-tree estimates...
1872     for (int i = 0; i < node->n_children; ++i) {
1873         if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
1874             (void) rbuf_int(rb);                      // 12. fingerprint
1875         }
1876         uint64_t nkeys = rbuf_ulonglong(rb);          // 13. nkeys
1877         uint64_t ndata = rbuf_ulonglong(rb);          // 14. ndata
1878         uint64_t dsize = rbuf_ulonglong(rb);          // 15. dsize
1879         (void) rbuf_char(rb);                         // 16. exact (char)
1880         invariant(nkeys == ndata);
1881         if (info) {
1882             // info is non-null if we're trying to upgrade old subtree
1883             // estimates to stat64info
1884             info->numrows += nkeys;
1885             info->numbytes += dsize;
1886         }
1887     }
1888 
1889     // Pivot keys
1890     node->pivotkeys.deserialize_from_rbuf(rb, node->n_children - 1);
1891 
1892     // Create space for the child node buffers (a.k.a. partitions).
1893     XMALLOC_N(node->n_children, node->bp);
1894 
1895     // Set the child blocknums.
1896     for (int i = 0; i < node->n_children; ++i) {
1897         BP_BLOCKNUM(node, i) = rbuf_blocknum(rb);    // 18. blocknums
1898         BP_WORKDONE(node, i) = 0;
1899     }
1900 
1901     // Read in the child buffer maps.
1902     for (int i = 0; i < node->n_children; ++i) {
1903         // The following fields were previously used by the `sub_block_map'
1904         // They include:
1905         // - 4 byte index
1906         (void) rbuf_int(rb);
1907         // - 4 byte offset
1908         (void) rbuf_int(rb);
1909         // - 4 byte size
1910         (void) rbuf_int(rb);
1911     }
1912 
1913     // We need to set up this node's partitions, but we can't call the
1914     // existing function (setup_ftnode_partitions) because there are
1915     // existing optimizations that would prevent us from bringing all
1916     // of this node's partitions into memory.  Instead, we use the
1917     // existing bfe and node to set the bfe's child_to_read member.
1918     // Then we create a temporary bfe that needs all the nodes to make
1919     // sure we properly initialize our partitions before filling them
1920     // in from our soon-to-be-upgraded node.
1921     update_bfe_using_ftnode(node, bfe);
1922     ftnode_fetch_extra temp_bfe;
1923     temp_bfe.create_for_full_read(nullptr);
1924     setup_partitions_using_bfe(node, &temp_bfe, true);
1925 
1926     // Cache the highest MSN generated for the message buffers.  This
1927     // will be set in the ftnode.
1928     //
1929     // The way we choose MSNs for upgraded messages is delicate.  The
1930     // field `highest_unused_msn_for_upgrade' in the header is always an
1931     // MSN that no message has yet.  So when we have N messages that need
1932     // MSNs, we decrement it by N, and then use it and the N-1 MSNs less
1933     // than it, but we do not use the value we decremented it to.
1934     //
1935     // In the code below, we initialize `lowest' with the value of
1936     // `highest_unused_msn_for_upgrade' after it is decremented, so we
1937     // need to be sure to increment it once before we enqueue our first
1938     // message.
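    // A worked example with made-up numbers: if highest_unused_msn_for_upgrade
    // were 1000 and the buffers held N == 3 messages needing MSNs, the scheme
    // above would decrement the header field to 997 and hand out 998, 999, and
    // 1000, never using 997 itself.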
1939     MSN highest_msn;
1940     highest_msn.msn = 0;
1941 
1942     // Deserialize de-compressed buffers.
1943     for (int i = 0; i < node->n_children; ++i) {
1944         NONLEAF_CHILDINFO bnc = BNC(node, i);
1945         MSN highest_msn_in_this_buffer = deserialize_child_buffer_v13(bfe->ft, bnc, rb);
1946         if (highest_msn.msn == 0) {
1947             highest_msn.msn = highest_msn_in_this_buffer.msn;
1948         }
1949     }
1950 
1951     // Assign the highest msn from our upgrade message buffers
1952     node->max_msn_applied_to_node_on_disk = highest_msn;
1953     // Since we assigned MSNs to this node's messages, we need to dirty it.
1954     node->set_dirty();
1955 
1956     // Must compute the checksum now (rather than at the end, while we
1957     // still have the pointer to the buffer).
1958     if (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM) {
1959         uint32_t expected_xsum = toku_dtoh32(*(uint32_t*)(rb->buf+rb->size-4)); // 27. checksum
1960         uint32_t actual_xsum   = toku_x1764_memory(rb->buf, rb->size-4);
1961         if (expected_xsum != actual_xsum) {
1962             fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n",
1963                     __FUNCTION__,
1964                     __LINE__,
1965                     expected_xsum,
1966                     actual_xsum);
1967             fprintf(stderr,
1968                     "Checksum failure while reading node in file %s.\n",
1969                     toku_cachefile_fname_in_env(bfe->ft->cf));
1970             fflush(stderr);
1971             return toku_db_badformat();
1972         }
1973     }
1974 
1975     return 0;
1976 }
1977 
1978 // This function takes a deserialized version 13 or 14 buffer and
1979 // constructs the associated leaf ftnode object.
1980 static int
1981 deserialize_and_upgrade_leaf_node(FTNODE node,
1982                                   struct rbuf *rb,
1983                                   ftnode_fetch_extra *bfe,
1984                                   STAT64INFO info)
1985 {
1986     int r = 0;
1987     int version = node->layout_version_read_from_disk;
1988 
1989     // This is a leaf node, so the offsets in the buffer will be
1990     // different from the internal node offsets above.
1991     uint64_t nkeys = rbuf_ulonglong(rb);                // 10. nkeys
1992     uint64_t ndata = rbuf_ulonglong(rb);                // 11. ndata
1993     uint64_t dsize = rbuf_ulonglong(rb);                // 12. dsize
1994     invariant(nkeys == ndata);
1995     if (info) {
1996         // info is non-null if we're trying to upgrade old subtree
1997         // estimates to stat64info
1998         info->numrows += nkeys;
1999         info->numbytes += dsize;
2000     }
2001 
2002     // This is the optimized for upgrade field.
2003     if (version == FT_LAYOUT_VERSION_14) {
2004         (void) rbuf_int(rb);                            // 13. optimized
2005     }
2006 
2007     // npartitions - This is really the number of leaf entries in
2008     // our single basement node.  There should only be 1 (ONE)
2009     // partition, so there shouldn't be any pivot key stored.  This
2010     // means the loop will not iterate.  We could remove the loop and
2011     // assert that the value is indeed 1.
2012     int npartitions = rbuf_int(rb);                     // 14. npartitions
2013     assert(npartitions == 1);
2014 
2015     // Set number of children to 1, since we will only have one
2016     // basement node.
2017     node->n_children = 1;
2018     XMALLOC_N(node->n_children, node->bp);
2019     node->pivotkeys.create_empty();
2020 
2021     // Create one basement node to contain all the leaf entries by
2022     // setting up the single partition and updating the bfe.
2023     update_bfe_using_ftnode(node, bfe);
2024     ftnode_fetch_extra temp_bfe;
2025     temp_bfe.create_for_full_read(bfe->ft);
2026     setup_partitions_using_bfe(node, &temp_bfe, true);
2027 
2028     // 11. Deserialize the partition maps, though they are not used in the
2029     // newer versions of ftnodes.
2030     for (int i = 0; i < node->n_children; ++i) {
2031         // The following fields were previously used by the `sub_block_map'
2032         // They include:
2033         // - 4 byte index
2034         (void) rbuf_int(rb);
2035         // - 4 byte offset
2036         (void) rbuf_int(rb);
2037         // - 4 byte size
2038         (void) rbuf_int(rb);
2039     }
2040 
2041     // Copy all of the leaf entries into the single basement node.
2042 
2043     // The number of leaf entries in buffer.
2044     int n_in_buf = rbuf_int(rb);                        // 15. # of leaves
2045     BLB_SEQINSERT(node,0) = 0;
2046     BASEMENTNODE bn = BLB(node, 0);
2047 
2048     // Read the leaf entries from the buffer, advancing the buffer
2049     // as we go.
2050     bool has_end_to_end_checksum = (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM);
2051     if (version <= FT_LAYOUT_VERSION_13) {
2052         // Create our mempool.
2053         // Loop through
2054         for (int i = 0; i < n_in_buf; ++i) {
2055             LEAFENTRY_13 le = reinterpret_cast<LEAFENTRY_13>(&rb->buf[rb->ndone]);
2056             uint32_t disksize = leafentry_disksize_13(le);
2057             rb->ndone += disksize;                       // 16. leaf entry (13)
2058             invariant(rb->ndone<=rb->size);
2059             LEAFENTRY new_le;
2060             size_t new_le_size;
2061             void* key = NULL;
2062             uint32_t keylen = 0;
2063             r = toku_le_upgrade_13_14(le,
2064                                       &key,
2065                                       &keylen,
2066                                       &new_le_size,
2067                                       &new_le);
2068             assert_zero(r);
2069             // Copy the pointer value straight into the OMT
2070             LEAFENTRY new_le_in_bn = nullptr;
2071             void *maybe_free;
2072             bn->data_buffer.get_space_for_insert(
2073                 i,
2074                 key,
2075                 keylen,
2076                 new_le_size,
2077                 &new_le_in_bn,
2078                 &maybe_free
2079                 );
2080             if (maybe_free) {
2081                 toku_free(maybe_free);
2082             }
2083             memcpy(new_le_in_bn, new_le, new_le_size);
2084             toku_free(new_le);
2085         }
2086     } else {
2087         uint32_t data_size = rb->size - rb->ndone;
2088         if (has_end_to_end_checksum) {
2089             data_size -= sizeof(uint32_t);
2090         }
2091         bn->data_buffer.deserialize_from_rbuf(n_in_buf, rb, data_size, node->layout_version_read_from_disk);
2092     }
2093 
2094     // Whatever this is must be less than the MSNs of every message above
2095     // it, so it's ok to take it here.
2096     bn->max_msn_applied = bfe->ft->h->highest_unused_msn_for_upgrade;
2097     bn->stale_ancestor_messages_applied = false;
2098     node->max_msn_applied_to_node_on_disk = bn->max_msn_applied;
2099 
2100     // Checksum (end to end) is only on version 14
2101     if (has_end_to_end_checksum) {
2102         uint32_t expected_xsum = rbuf_int(rb);             // 17. checksum
2103         uint32_t actual_xsum = toku_x1764_memory(rb->buf, rb->size - 4);
2104         if (expected_xsum != actual_xsum) {
2105             fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n",
2106                     __FUNCTION__,
2107                     __LINE__,
2108                     expected_xsum,
2109                     actual_xsum);
2110             fprintf(stderr,
2111                     "Checksum failure while reading node in file %s.\n",
2112                     toku_cachefile_fname_in_env(bfe->ft->cf));
2113             fflush(stderr);
2114             return toku_db_badformat();
2115         }
2116     }
2117 
2118     // We should have read the whole block by this point.
2119     if (rb->ndone != rb->size) {
2120         // TODO: Error handling.
2121         return 1;
2122     }
2123 
2124     return r;
2125 }
2126 
2127 static int read_and_decompress_block_from_fd_into_rbuf(
2128     int fd,
2129     BLOCKNUM blocknum,
2130     DISKOFF offset,
2131     DISKOFF size,
2132     FT ft,
2133     struct rbuf *rb,
2134     /* out */ int *layout_version_p);
2135 
2136 // This function upgrades a version 14 or 13 ftnode to the current
2137 // version. NOTE: This code assumes the first field of the rbuf has
2138 // already been read from the buffer (namely the layout_version of the
2139 // ftnode.)
2140 static int deserialize_and_upgrade_ftnode(FTNODE node,
2141                                           FTNODE_DISK_DATA *ndd,
2142                                           BLOCKNUM blocknum,
2143                                           ftnode_fetch_extra *bfe,
2144                                           STAT64INFO info,
2145                                           int fd) {
2146     int r = 0;
2147     int version;
2148 
2149     // I. First we need to de-compress the entire node, only then can
2150     // we read the different sub-sections.
2151     // get the file offset and block size for the block
2152     DISKOFF offset, size;
2153     bfe->ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
2154 
2155     struct rbuf rb;
2156     r = read_and_decompress_block_from_fd_into_rbuf(fd,
2157                                                     blocknum,
2158                                                     offset,
2159                                                     size,
2160                                                     bfe->ft,
2161                                                     &rb,
2162                                                     &version);
2163     if (r != 0) {
2164         const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
2165         fprintf(stderr,
2166                 "%s:%d:deserialize_and_upgrade_ftnode - "
2167                 "file[%s], blocknum[%ld], "
2168                 "read_and_decompress_block_from_fd_into_rbuf failed with %d\n",
2169                 __FILE__,
2170                 __LINE__,
2171                 fname ? fname : "unknown",
2172                 blocknum.b,
2173                 r);
2174         goto exit;
2175     }
2176 
2177     // Re-read the magic field from the previous call, since we are
2178     // restarting with a fresh rbuf.
2179     {
2180         const void *magic;
2181         rbuf_literal_bytes(&rb, &magic, 8);              // 1. magic
2182     }
2183 
2184     // II. Start reading ftnode fields out of the decompressed buffer.
2185 
2186     // Copy over old version info.
2187     node->layout_version_read_from_disk = rbuf_int(&rb); // 2. layout version
2188     version = node->layout_version_read_from_disk;
2189     if (version > FT_LAYOUT_VERSION_14) {
2190         const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
2191         fprintf(stderr,
2192                 "%s:%d:deserialize_and_upgrade_ftnode - "
2193                 "file[%s], blocknum[%ld], version[%d] > "
2194                 "FT_LAYOUT_VERSION_14[%d]\n",
2195                 __FILE__,
2196                 __LINE__,
2197                 fname ? fname : "unknown",
2198                 blocknum.b,
2199                 version,
2200                 FT_LAYOUT_VERSION_14);
2201         dump_bad_block(rb.buf, rb.size);
2202         goto exit;
2203     }
2204     assert(version <= FT_LAYOUT_VERSION_14);
2205     // Upgrade the layout version to the current version.
2206     node->layout_version = FT_LAYOUT_VERSION;
2207 
2208     node->layout_version_original = rbuf_int(&rb);      // 3. original layout
2209     node->build_id = rbuf_int(&rb);                     // 4. build id
2210 
2211     // The remaining offsets into the rbuf do not map to the current
2212     // version, so we need to fill in the blanks and ignore older
2213     // fields.
2214     (void)rbuf_int(&rb);                                // 5. nodesize
2215     node->flags = rbuf_int(&rb);                        // 6. flags
2216     node->height = rbuf_int(&rb);                       // 7. height
2217 
2218     // If the version is less than 14, there are two extra ints here
2219     // that we need to ignore if they are present.
2220     // These are the 'fingerprints'.
2221     if (version == FT_LAYOUT_VERSION_13) {
2222         (void) rbuf_int(&rb);                           // 8. rand4
2223         (void) rbuf_int(&rb);                           // 9. local
2224     }
2225 
2226     // The next offsets are dependent on whether this is a leaf node
2227     // or not.
2228 
2229     // III. Read in Leaf and Internal Node specific data.
2230 
2231     // Check height to determine whether this is a leaf node or not.
2232     if (node->height > 0) {
2233         r = deserialize_and_upgrade_internal_node(node, &rb, bfe, info);
2234     } else {
2235         r = deserialize_and_upgrade_leaf_node(node, &rb, bfe, info);
2236     }
2237 
2238     XMALLOC_N(node->n_children, *ndd);
2239     // Initialize the partition locations to zero, because version 14
2240     // and below have no notion of partitions on disk.
2241     for (int i=0; i<node->n_children; i++) {
2242         BP_START(*ndd,i) = 0;
2243         BP_SIZE (*ndd,i) = 0;
2244     }
2245 
2246     toku_free(rb.buf);
2247 exit:
2248     return r;
2249 }
2250 
2251 // Effect: deserializes a ftnode that is in rb (with pointer of rb just past the
2252 // magic) into a FTNODE.
2253 static int deserialize_ftnode_from_rbuf(FTNODE *ftnode,
2254                                         FTNODE_DISK_DATA *ndd,
2255                                         BLOCKNUM blocknum,
2256                                         uint32_t fullhash,
2257                                         ftnode_fetch_extra *bfe,
2258                                         STAT64INFO info,
2259                                         struct rbuf *rb,
2260                                         int fd) {
2261     int r = 0;
2262     struct sub_block sb_node_info;
2263 
2264     tokutime_t t0, t1;
2265     tokutime_t decompress_time = 0;
2266     tokutime_t deserialize_time = 0;
2267     const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
2268 
2269     t0 = toku_time_now();
2270 
2271     FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
2272 
2273     // now start reading from rbuf
2274     // first thing we do is read the header information
2275     const void *magic;
2276     rbuf_literal_bytes(rb, &magic, 8);
2277     if (memcmp(magic, "tokuleaf", 8) != 0 &&
2278         memcmp(magic, "tokunode", 8) != 0) {
2279         fprintf(stderr,
2280                 "%s:%d:deserialize_ftnode_from_rbuf - "
2281                 "file[%s], blocknum[%ld], unrecognized magic number "
2282                 "%2.2x %2.2x %2.2x %2.2x   %2.2x %2.2x %2.2x %2.2x\n",
2283                 __FILE__,
2284                 __LINE__,
2285                 fname ? fname : "unknown",
2286                 blocknum.b,
2287                 static_cast<const uint8_t *>(magic)[0],
2288                 static_cast<const uint8_t *>(magic)[1],
2289                 static_cast<const uint8_t *>(magic)[2],
2290                 static_cast<const uint8_t *>(magic)[3],
2291                 static_cast<const uint8_t *>(magic)[4],
2292                 static_cast<const uint8_t *>(magic)[5],
2293                 static_cast<const uint8_t *>(magic)[6],
2294                 static_cast<const uint8_t *>(magic)[7]);
2295         dump_bad_block(rb->buf, rb->size);
2296 
2297         r = toku_db_badformat();
2298         goto cleanup;
2299     }
2300 
2301     node->layout_version_read_from_disk = rbuf_int(rb);
2302     lazy_assert(node->layout_version_read_from_disk >= FT_LAYOUT_MIN_SUPPORTED_VERSION);
2303 
2304     // Check if we are reading in an older node version.
2305     if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_14) {
2306         int version = node->layout_version_read_from_disk;
2307         // Perform the upgrade.
2308         r = deserialize_and_upgrade_ftnode(node, ndd, blocknum, bfe, info, fd);
2309         if (r != 0) {
2310             fprintf(stderr,
2311                     "%s:%d:deserialize_ftnode_from_rbuf - "
2312                     "file[%s], blocknum[%ld], deserialize_and_upgrade_ftnode "
2313                     "failed with %d\n",
2314                     __FILE__,
2315                     __LINE__,
2316                     fname ? fname : "unknown",
2317                     blocknum.b,
2318                     r);
2319             dump_bad_block(rb->buf, rb->size);
2320             goto cleanup;
2321         }
2322 
2323         if (version <= FT_LAYOUT_VERSION_13) {
2324             // deprecate 'TOKU_DB_VALCMP_BUILTIN'. just remove the flag
2325             node->flags &= ~TOKU_DB_VALCMP_BUILTIN_13;
2326         }
2327 
2328         // If everything is ok, just re-assign the ftnode and return.
2329         *ftnode = node;
2330         r = 0;
2331         goto cleanup;
2332     }
2333 
2334     // Upgrade versions after 14 to current.  This upgrade is trivial: it
2335     // removes the optimized for upgrade field, which has already been
2336     // removed in the deserialization code (see
2337     // deserialize_ftnode_info()).
2338     node->layout_version = FT_LAYOUT_VERSION;
2339     node->layout_version_original = rbuf_int(rb);
2340     node->build_id = rbuf_int(rb);
2341     node->n_children = rbuf_int(rb);
2342     XMALLOC_N(node->n_children, node->bp);
2343     XMALLOC_N(node->n_children, *ndd);
2344     // read the partition locations
2345     for (int i=0; i<node->n_children; i++) {
2346         BP_START(*ndd,i) = rbuf_int(rb);
2347         BP_SIZE (*ndd,i) = rbuf_int(rb);
2348     }
2349     // verify checksum of header stored
2350     uint32_t checksum;
2351     checksum = toku_x1764_memory(rb->buf, rb->ndone);
2352     uint32_t stored_checksum;
2353     stored_checksum = rbuf_int(rb);
2354     if (stored_checksum != checksum) {
2355         fprintf(
2356             stderr,
2357             "%s:%d:deserialize_ftnode_from_rbuf - "
2358             "file[%s], blocknum[%ld], stored_checksum[%d] != checksum[%d]\n",
2359             __FILE__,
2360             __LINE__,
2361             fname ? fname : "unknown",
2362             blocknum.b,
2363             stored_checksum,
2364             checksum);
2365         dump_bad_block(rb->buf, rb->size);
2366         invariant(stored_checksum == checksum);
2367     }
2368 
2369     // now we read and decompress the pivot and child information
2370     sub_block_init(&sb_node_info);
2371     {
2372         tokutime_t sb_decompress_t0 = toku_time_now();
2373         r = read_and_decompress_sub_block(rb, &sb_node_info);
2374         tokutime_t sb_decompress_t1 = toku_time_now();
2375         decompress_time += sb_decompress_t1 - sb_decompress_t0;
2376         if (r != 0) {
2377             fprintf(
2378                 stderr,
2379                 "%s:%d:deserialize_ftnode_from_rbuf - "
2380                 "file[%s], blocknum[%ld], read_and_decompress_sub_block failed "
2381                 "with %d\n",
2382                 __FILE__,
2383                 __LINE__,
2384                 fname ? fname : "unknown",
2385                 blocknum.b,
2386                 r);
2387             dump_bad_block(
2388                 static_cast<unsigned char *>(sb_node_info.uncompressed_ptr),
2389                 sb_node_info.uncompressed_size);
2390             dump_bad_block(rb->buf, rb->size);
2391             goto cleanup;
2392         }
2393     }
2394 
2395     // at this point, sb->uncompressed_ptr stores the serialized node info
2396     r = deserialize_ftnode_info(&sb_node_info, node);
2397     if (r != 0) {
2398         fprintf(
2399             stderr,
2400             "%s:%d:deserialize_ftnode_from_rbuf - "
2401             "file[%s], blocknum[%ld], deserialize_ftnode_info failed with "
2402             "%d\n",
2403             __FILE__,
2404             __LINE__,
2405             fname ? fname : "unknown",
2406             blocknum.b,
2407             r);
2408         dump_bad_block(rb->buf, rb->size);
2409         goto cleanup;
2410     }
2411     toku_free(sb_node_info.uncompressed_ptr);
2412 
2413     // now that the node info has been deserialized, we can proceed to
2414     // deserialize the individual sub blocks
2415 
2416     // setup the memory of the partitions
2417     // for partitions being decompressed, create either message buffer or
2418     //   basement node
2419     // for partitions staying compressed, create sub_block
2420     setup_ftnode_partitions(node, bfe, true);
2421 
2422     // This loop is parallelizable, since we don't have a dependency on the
2423     // work done so far.
2424     for (int i = 0; i < node->n_children; i++) {
2425         uint32_t curr_offset = BP_START(*ndd, i);
2426         uint32_t curr_size = BP_SIZE(*ndd, i);
2427         // the compressed, serialized partitions start at where rb is currently
2428         // pointing, which would be rb->buf + rb->ndone
2429         // we need to initialize curr_rbuf to point to this place
2430         struct rbuf curr_rbuf = {.buf = nullptr, .size = 0, .ndone = 0};
2431         rbuf_init(&curr_rbuf, rb->buf + curr_offset, curr_size);
2432 
2433         //
2434         // now we are at the point where we have:
2435         //  - read the entire compressed node off of disk,
2436         //  - decompressed the pivot and offset information,
2437         //  - have arrived at the individual partitions.
2438         //
2439         // Based on the information in bfe, we want to decompress a subset
2440         // of the compressed partitions (also possibly none or possibly all).
2441         // The partitions that we want to decompress and make available
2442         // to the node, we do, the rest we simply copy in compressed
2443         // form into the node, and set the state of the partition to
2444         // PT_COMPRESSED
2445         //
2446 
2447         struct sub_block curr_sb;
2448         sub_block_init(&curr_sb);
2449 
2450         // curr_rbuf is passed by value to decompress_and_deserialize_worker,
2451         // so there's no ugly race condition.
2452         // This would be more obvious if curr_rbuf were an array.
2453 
2454         // setup_ftnode_partitions (called above) figured out what the state
2455         // should be and set up the memory so that we are ready to use it
2456 
2457         switch (BP_STATE(node, i)) {
2458             case PT_AVAIL: {
2459                 //  case where we read and decompress the partition
2460                 tokutime_t partition_decompress_time;
2461                 r = decompress_and_deserialize_worker(
2462                     curr_rbuf,
2463                     curr_sb,
2464                     node,
2465                     i,
2466                     bfe->ft->cmp,
2467                     &partition_decompress_time);
2468                 decompress_time += partition_decompress_time;
2469                 if (r != 0) {
2470                     fprintf(
2471                         stderr,
2472                         "%s:%d:deserialize_ftnode_from_rbuf - "
2473                         "file[%s], blocknum[%ld], childnum[%d], "
2474                         "decompress_and_deserialize_worker failed with %d\n",
2475                         __FILE__,
2476                         __LINE__,
2477                         fname ? fname : "unknown",
2478                         blocknum.b,
2479                         i,
2480                         r);
2481                     dump_bad_block(rb->buf, rb->size);
2482                     goto cleanup;
2483                 }
2484                 break;
2485             }
2486         case PT_COMPRESSED:
2487             // case where we leave the partition in the compressed state
2488             r = check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i);
2489             if (r != 0) {
2490                 fprintf(
2491                     stderr,
2492                     "%s:%d:deserialize_ftnode_from_rbuf - "
2493                     "file[%s], blocknum[%ld], childnum[%d], "
2494                     "check_and_copy_compressed_sub_block_worker failed with "
2495                     "%d\n",
2496                     __FILE__,
2497                     __LINE__,
2498                     fname ? fname : "unknown",
2499                     blocknum.b,
2500                     i,
2501                     r);
2502                 dump_bad_block(rb->buf, rb->size);
2503                 goto cleanup;
2504             }
2505             break;
2506         case PT_INVALID: // this is really bad
2507         case PT_ON_DISK: // it's supposed to be in memory.
2508             abort();
2509         }
2510     }
2511     *ftnode = node;
2512     r = 0;
2513 
2514 cleanup:
2515     if (r == 0) {
2516         t1 = toku_time_now();
2517         deserialize_time = (t1 - t0) - decompress_time;
2518         bfe->deserialize_time += deserialize_time;
2519         bfe->decompress_time += decompress_time;
2520         toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
2521     }
2522     if (r != 0) {
2523         // NOTE: Right now, callers higher in the stack will assert on
2524         // failure, so this is OK for production.  However, if we
2525         // create tools that use this function to search for errors in
2526         // the FT, then we will leak memory.
2527         if (node) {
2528             toku_free(node);
2529         }
2530     }
2531     return r;
2532 }
2533 
2534 int
2535 toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, ftnode_fetch_extra *bfe) {
2536     int r = 0;
2537     assert(BP_STATE(node,childnum) == PT_ON_DISK);
2538     assert(node->bp[childnum].ptr.tag == BCT_NULL);
2539 
2540     //
2541     // setup the partition
2542     //
2543     setup_available_ftnode_partition(node, childnum);
2544     BP_STATE(node,childnum) = PT_AVAIL;
2545 
2546     //
2547     // read off disk and make available in memory
2548     //
2549     // get the file offset and block size for the block
2550     DISKOFF node_offset, total_node_disk_size;
2551     bfe->ft->blocktable.translate_blocknum_to_offset_size(node->blocknum, &node_offset, &total_node_disk_size);
2552 
2553     uint32_t curr_offset = BP_START(ndd, childnum);
2554     uint32_t curr_size = BP_SIZE (ndd, childnum);
2555 
2556     struct rbuf rb;
2557     rbuf_init(&rb, nullptr, 0);
2558 
2559     uint32_t pad_at_beginning = (node_offset+curr_offset)%512;
2560     uint32_t padded_size = roundup_to_multiple(512, pad_at_beginning + curr_size);
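    // Worked example with hypothetical numbers: if node_offset + curr_offset is
    // 13000 and curr_size is 3000, then pad_at_beginning == 13000 % 512 == 200,
    // padded_size == roundup_to_multiple(512, 3200) == 3584, and the O_DIRECT
    // pread below starts at the 512-aligned offset 12800.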
2561 
2562     toku::scoped_malloc_aligned raw_block_buf(padded_size, 512);
2563     uint8_t *raw_block = reinterpret_cast<uint8_t *>(raw_block_buf.get());
2564     rbuf_init(&rb, pad_at_beginning+raw_block, curr_size);
2565     tokutime_t t0 = toku_time_now();
2566 
2567     // read the block
2568     assert(0==((unsigned long long)raw_block)%512); // for O_DIRECT
2569     assert(0==(padded_size)%512);
2570     assert(0==(node_offset+curr_offset-pad_at_beginning)%512);
2571     ssize_t rlen = toku_os_pread(fd, raw_block, padded_size, node_offset+curr_offset-pad_at_beginning);
2572     assert((DISKOFF)rlen >= pad_at_beginning + curr_size); // we read in at least enough to get what we wanted
2573     assert((DISKOFF)rlen <= padded_size);                  // we didn't read in too much.
2574 
2575     tokutime_t t1 = toku_time_now();
2576 
2577     // read sub block
2578     struct sub_block curr_sb;
2579     sub_block_init(&curr_sb);
2580     r = read_compressed_sub_block(&rb, &curr_sb);
2581     if (r != 0) {
2582         return r;
2583     }
2584     invariant(curr_sb.compressed_ptr != NULL);
2585 
2586     // decompress
2587     toku::scoped_malloc uncompressed_buf(curr_sb.uncompressed_size);
2588     curr_sb.uncompressed_ptr = uncompressed_buf.get();
2589     toku_decompress((Bytef *) curr_sb.uncompressed_ptr, curr_sb.uncompressed_size,
2590                     (Bytef *) curr_sb.compressed_ptr, curr_sb.compressed_size);
2591 
2592     // deserialize
2593     tokutime_t t2 = toku_time_now();
2594 
2595     r = deserialize_ftnode_partition(&curr_sb, node, childnum, bfe->ft->cmp);
2596 
2597     tokutime_t t3 = toku_time_now();
2598 
2599     // capture stats
2600     tokutime_t io_time = t1 - t0;
2601     tokutime_t decompress_time = t2 - t1;
2602     tokutime_t deserialize_time = t3 - t2;
2603     bfe->deserialize_time += deserialize_time;
2604     bfe->decompress_time += decompress_time;
2605     toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
2606 
2607     bfe->bytes_read = rlen;
2608     bfe->io_time = io_time;
2609 
2610     return r;
2611 }
2612 
2613 // Take an ftnode partition that is in the compressed state and make it available in memory.
2614 int toku_deserialize_bp_from_compressed(FTNODE node,
2615                                         int childnum,
2616                                         ftnode_fetch_extra *bfe) {
2617 
2618     int r = 0;
2619     assert(BP_STATE(node, childnum) == PT_COMPRESSED);
2620     SUB_BLOCK curr_sb = BSB(node, childnum);
2621 
2622     toku::scoped_malloc uncompressed_buf(curr_sb->uncompressed_size);
2623     assert(curr_sb->uncompressed_ptr == NULL);
2624     curr_sb->uncompressed_ptr = uncompressed_buf.get();
2625 
2626     setup_available_ftnode_partition(node, childnum);
2627     BP_STATE(node,childnum) = PT_AVAIL;
2628 
2629     // decompress the sub_block
2630     tokutime_t t0 = toku_time_now();
2631 
2632     toku_decompress((Bytef *)curr_sb->uncompressed_ptr,
2633                     curr_sb->uncompressed_size,
2634                     (Bytef *)curr_sb->compressed_ptr,
2635                     curr_sb->compressed_size);
2636 
2637     tokutime_t t1 = toku_time_now();
2638 
2639     r = deserialize_ftnode_partition(curr_sb, node, childnum, bfe->ft->cmp);
2640     if (r != 0) {
2641         const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
2642         fprintf(stderr,
2643                 "%s:%d:toku_deserialize_bp_from_compressed - "
2644                 "file[%s], blocknum[%ld], "
2645                 "deserialize_ftnode_partition failed with %d\n",
2646                 __FILE__,
2647                 __LINE__,
2648                 fname ? fname : "unknown",
2649                 node->blocknum.b,
2650                 r);
2651         dump_bad_block(static_cast<unsigned char *>(curr_sb->compressed_ptr),
2652                        curr_sb->compressed_size);
2653         dump_bad_block(static_cast<unsigned char *>(curr_sb->uncompressed_ptr),
2654                        curr_sb->uncompressed_size);
2655     }
2656 
2657     tokutime_t t2 = toku_time_now();
2658 
2659     tokutime_t decompress_time = t1 - t0;
2660     tokutime_t deserialize_time = t2 - t1;
2661     bfe->deserialize_time += deserialize_time;
2662     bfe->decompress_time += decompress_time;
2663     toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
2664 
2665     toku_free(curr_sb->compressed_ptr);
2666     toku_free(curr_sb);
2667     return r;
2668 }
2669 
2670 static int deserialize_ftnode_from_fd(int fd,
2671                                       BLOCKNUM blocknum,
2672                                       uint32_t fullhash,
2673                                       FTNODE *ftnode,
2674                                       FTNODE_DISK_DATA *ndd,
2675                                       ftnode_fetch_extra *bfe,
2676                                       STAT64INFO info) {
2677     struct rbuf rb = RBUF_INITIALIZER;
2678 
2679     tokutime_t t0 = toku_time_now();
2680     read_block_from_fd_into_rbuf(fd, blocknum, bfe->ft, &rb);
2681     tokutime_t t1 = toku_time_now();
2682 
2683     // Decompress and deserialize the ftnode. Time statistics
2684     // are taken inside this function.
2685     int r = deserialize_ftnode_from_rbuf(
2686         ftnode, ndd, blocknum, fullhash, bfe, info, &rb, fd);
2687     if (r != 0) {
2688         const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
2689         fprintf(
2690             stderr,
2691             "%s:%d:deserialize_ftnode_from_fd - "
2692             "file[%s], blocknum[%ld], deserialize_ftnode_from_rbuf failed with "
2693             "%d\n",
2694             __FILE__,
2695             __LINE__,
2696             fname ? fname : "unknown",
2697             blocknum.b,
2698             r);
2699         dump_bad_block(rb.buf, rb.size);
2700     }
2701 
2702     bfe->bytes_read = rb.size;
2703     bfe->io_time = t1 - t0;
2704     toku_free(rb.buf);
2705     return r;
2706 }
2707 
2708 // Effect: Read a node in.  If possible, read just the header.
2709 //         Perform version upgrade if necessary.
2710 int toku_deserialize_ftnode_from(int fd,
2711                                  BLOCKNUM blocknum,
2712                                  uint32_t fullhash,
2713                                  FTNODE *ftnode,
2714                                  FTNODE_DISK_DATA *ndd,
2715                                  ftnode_fetch_extra *bfe) {
2716     int r = 0;
2717     struct rbuf rb = RBUF_INITIALIZER;
2718 
2719     // each function below takes the appropriate io/decompression/deserialize
2720     // statistics
2721 
2722     if (!bfe->read_all_partitions) {
2723         read_ftnode_header_from_fd_into_rbuf_if_small_enough(
2724             fd, blocknum, bfe->ft, &rb, bfe);
2725         r = deserialize_ftnode_header_from_rbuf_if_small_enough(
2726             ftnode, ndd, blocknum, fullhash, bfe, &rb, fd);
2727     } else {
2728         // force us to do it the old way
2729         r = -1;
2730     }
2731     if (r != 0) {
2732         // Something went wrong, go back to doing it the old way.
2733         r = deserialize_ftnode_from_fd(
2734             fd, blocknum, fullhash, ftnode, ndd, bfe, nullptr);
2735     }
2736 
2737     toku_free(rb.buf);
2738     return r;
2739 }
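// Illustrative call pattern (a sketch only; it mirrors the minimal-read usage in
// toku_upgrade_msn_from_root_to_header() below -- fd, blocknum, and fullhash come
// from the caller):
//
//     ftnode_fetch_extra bfe;
//     bfe.create_for_min_read(ft);
//     FTNODE node = nullptr;
//     FTNODE_DISK_DATA ndd = nullptr;
//     int r = toku_deserialize_ftnode_from(fd, blocknum, fullhash, &node, &ndd, &bfe);
//     // on success, release with toku_ftnode_free(&node) and toku_free(ndd)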
2740 
2741 void
2742 toku_verify_or_set_counts(FTNODE UU(node)) {
2743 }
2744 
2745 int
2746 toku_db_badformat(void) {
2747     return DB_BADFORMAT;
2748 }
2749 
2750 static size_t
2751 serialize_rollback_log_size(ROLLBACK_LOG_NODE log) {
2752     size_t size = node_header_overhead //8 "tokuroll", 4 version, 4 version_original, 4 build_id
2753                  +16 //TXNID_PAIR
2754                  +8 //sequence
2755                  +8 //blocknum
2756                  +8 //previous (blocknum)
2757                  +8 //resident_bytecount
2758                  +8 //memarena size
2759                  +log->rollentry_resident_bytecount;
2760     return size;
2761 }
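// Serialized rollback log layout, as written by serialize_rollback_log_node_to_buf()
// below (the sizes match the accounting in serialize_rollback_log_size() above):
//
//     "tokuroll" magic                 8 bytes
//     layout_version                   4 bytes
//     layout_version_original          4 bytes
//     build_id                         4 bytes
//     txnid (TXNID_PAIR)              16 bytes
//     sequence                         8 bytes
//     blocknum                         8 bytes
//     previous blocknum                8 bytes
//     rollentry_resident_bytecount     8 bytes
//     memarena size in use             8 bytes
//     roll entries, newest to oldest   rollentry_resident_bytecount bytes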
2762 
2763 static void
2764 serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calculated_size, int UU(n_sub_blocks), struct sub_block UU(sub_block[])) {
2765     struct wbuf wb;
2766     wbuf_init(&wb, buf, calculated_size);
2767     {   //Serialize rollback log to local wbuf
2768         wbuf_nocrc_literal_bytes(&wb, "tokuroll", 8);
2769         lazy_assert(log->layout_version == FT_LAYOUT_VERSION);
2770         wbuf_nocrc_int(&wb, log->layout_version);
2771         wbuf_nocrc_int(&wb, log->layout_version_original);
2772         wbuf_nocrc_uint(&wb, BUILD_ID);
2773         wbuf_nocrc_TXNID_PAIR(&wb, log->txnid);
2774         wbuf_nocrc_ulonglong(&wb, log->sequence);
2775         wbuf_nocrc_BLOCKNUM(&wb, log->blocknum);
2776         wbuf_nocrc_BLOCKNUM(&wb, log->previous);
2777         wbuf_nocrc_ulonglong(&wb, log->rollentry_resident_bytecount);
2778         //Write down memarena size needed to restore
2779         wbuf_nocrc_ulonglong(&wb, log->rollentry_arena.total_size_in_use());
2780 
2781         {
2782             //Store rollback logs
2783             struct roll_entry *item;
2784             size_t done_before = wb.ndone;
2785             for (item = log->newest_logentry; item; item = item->prev) {
2786                 toku_logger_rollback_wbuf_nocrc_write(&wb, item);
2787             }
2788             lazy_assert(done_before + log->rollentry_resident_bytecount == wb.ndone);
2789         }
2790     }
2791     lazy_assert(wb.ndone == wb.size);
2792     lazy_assert(calculated_size==wb.ndone);
2793 }
2794 
2795 static void
2796 serialize_uncompressed_block_to_memory(char * uncompressed_buf,
2797                                        int n_sub_blocks,
2798                                        struct sub_block sub_block[/*n_sub_blocks*/],
2799                                        enum toku_compression_method method,
2800                                /*out*/ size_t *n_bytes_to_write,
2801                                /*out*/ char  **bytes_to_write)
2802 // Guarantees that the malloc'd BYTES_TO_WRITE is 512-byte aligned (so that O_DIRECT will work)
2803 {
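    // Layout of the buffer produced below (derived from the code in this function):
    //
    //     node header             node_header_overhead bytes, copied verbatim
    //     n_sub_blocks            4 bytes
    //     per sub block:          compressed_size, uncompressed_size, xsum (4 bytes each)
    //     header checksum         4 bytes (x1764 over everything above)
    //     compressed sub blocks   back to back
    //     zero padding            up to the next multiple of 512 (for O_DIRECT)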
2804     // allocate space for the compressed copy of uncompressed_buf
2805     size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, method);
2806     size_t sub_block_header_len = sub_block_header_size(n_sub_blocks);
2807     size_t header_len = node_header_overhead + sub_block_header_len + sizeof (uint32_t); // node + sub_block + checksum
2808     char *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, header_len + compressed_len), compressed_buf);
2809 
2810     // copy the header
2811     memcpy(compressed_buf, uncompressed_buf, node_header_overhead);
2812     if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n",
2813                   uncompressed_buf[node_header_overhead],   uncompressed_buf[node_header_overhead+1],
2814                   uncompressed_buf[node_header_overhead+2], uncompressed_buf[node_header_overhead+3]);
2815 
2816     // compress all of the sub blocks
2817     char *uncompressed_ptr = uncompressed_buf + node_header_overhead;
2818     char *compressed_ptr = compressed_buf + header_len;
2819     compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores, ft_pool, method);
2820 
2821     //if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %" PRIu64 "\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len);
2822 
2823     // serialize the sub block header
2824     uint32_t *ptr = (uint32_t *)(compressed_buf + node_header_overhead);
2825     *ptr++ = toku_htod32(n_sub_blocks);
2826     for (int i=0; i<n_sub_blocks; i++) {
2827         ptr[0] = toku_htod32(sub_block[i].compressed_size);
2828         ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
2829         ptr[2] = toku_htod32(sub_block[i].xsum);
2830         ptr += 3;
2831     }
2832 
2833     // compute the header checksum and serialize it
2834     uint32_t header_length = (char *)ptr - (char *)compressed_buf;
2835     uint32_t xsum = toku_x1764_memory(compressed_buf, header_length);
2836     *ptr = toku_htod32(xsum);
2837 
2838     uint32_t padded_len = roundup_to_multiple(512, header_len + compressed_len);
2839     // Zero out padding.
2840     for (uint32_t i = header_len+compressed_len; i < padded_len; i++) {
2841         compressed_buf[i] = 0;
2842     }
2843     *n_bytes_to_write = padded_len;
2844     *bytes_to_write   = compressed_buf;
2845 }
2846 
2847 void
2848 toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized) {
2849     // get the size of the serialized node
2850     size_t calculated_size = serialize_rollback_log_size(log);
2851 
2852     serialized->len = calculated_size;
2853     serialized->n_sub_blocks = 0;
2854     // choose sub block parameters
2855     int sub_block_size = 0;
2856     size_t data_size = calculated_size - node_header_overhead;
2857     choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &serialized->n_sub_blocks);
2858     lazy_assert(0 < serialized->n_sub_blocks && serialized->n_sub_blocks <= max_sub_blocks);
2859     lazy_assert(sub_block_size > 0);
2860 
2861     // set the initial sub block size for all of the sub blocks
2862     for (int i = 0; i < serialized->n_sub_blocks; i++)
2863         sub_block_init(&serialized->sub_block[i]);
2864     set_all_sub_block_sizes(data_size, sub_block_size, serialized->n_sub_blocks, serialized->sub_block);
2865 
2866     // allocate space for the serialized node
2867     XMALLOC_N(calculated_size, serialized->data);
2868     // serialize the node into buf
2869     serialize_rollback_log_node_to_buf(log, serialized->data, calculated_size, serialized->n_sub_blocks, serialized->sub_block);
2870     serialized->blocknum = log->blocknum;
2871 }
2872 
2873 int toku_serialize_rollback_log_to(int fd,
2874                                    ROLLBACK_LOG_NODE log,
2875                                    SERIALIZED_ROLLBACK_LOG_NODE serialized_log,
2876                                    bool is_serialized,
2877                                    FT ft,
2878                                    bool for_checkpoint) {
2879     size_t n_to_write;
2880     char *compressed_buf;
2881     struct serialized_rollback_log_node serialized_local;
2882 
2883     if (is_serialized) {
2884         invariant_null(log);
2885     } else {
2886         invariant_null(serialized_log);
2887         serialized_log = &serialized_local;
2888         toku_serialize_rollback_log_to_memory_uncompressed(log, serialized_log);
2889     }
2890 
2891     BLOCKNUM blocknum = serialized_log->blocknum;
2892     invariant(blocknum.b >= 0);
2893 
2894     // Compress and malloc buffer to write
2895     serialize_uncompressed_block_to_memory(serialized_log->data,
2896                                            serialized_log->n_sub_blocks,
2897                                            serialized_log->sub_block,
2898                                            ft->h->compression_method,
2899                                            &n_to_write,
2900                                            &compressed_buf);
2901 
2902     // Dirties the ft
2903     DISKOFF offset;
2904     ft->blocktable.realloc_on_disk(
2905         blocknum, n_to_write, &offset, ft, fd, for_checkpoint);
2906 
2907     toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
2908     toku_free(compressed_buf);
2909     if (!is_serialized) {
2910         toku_static_serialized_rollback_log_destroy(&serialized_local);
2911         log->dirty = false;  // See #1957.   Must set the node to be clean after
2912                              // serializing it so that it doesn't get written again
2913                              // on the next checkpoint or eviction.
2914     }
2915     return 0;
2916 }
2917 
2918 static int
2919 deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log_p, struct rbuf *rb) {
2920     ROLLBACK_LOG_NODE MALLOC(result);
2921     int r;
2922     if (result==NULL) {
2923         r = get_error_errno();
2924         if (0) { died0: toku_free(result); }
2925         return r;
2926     }
2927 
2928     const void *magic;
2929     rbuf_literal_bytes(rb, &magic, 8);
2930     lazy_assert(!memcmp(magic, "tokuroll", 8));
2931 
2932     result->layout_version    = rbuf_int(rb);
2933     lazy_assert((FT_LAYOUT_VERSION_25 <= result->layout_version && result->layout_version <= FT_LAYOUT_VERSION_27) ||
2934                 (result->layout_version == FT_LAYOUT_VERSION));
2935     result->layout_version_original = rbuf_int(rb);
2936     result->layout_version_read_from_disk = result->layout_version;
2937     result->build_id = rbuf_int(rb);
2938     result->dirty = false;
2939     //TODO: Maybe add descriptor (or just descriptor version) here eventually?
2940     //TODO: This is hard.. everything is shared in a single dictionary.
2941     rbuf_TXNID_PAIR(rb, &result->txnid);
2942     result->sequence = rbuf_ulonglong(rb);
2943     result->blocknum = rbuf_blocknum(rb);
2944     if (result->blocknum.b != blocknum.b) {
2945         r = toku_db_badformat();
2946         goto died0;
2947     }
2948     result->previous       = rbuf_blocknum(rb);
2949     result->rollentry_resident_bytecount = rbuf_ulonglong(rb);
2950 
2951     size_t arena_initial_size = rbuf_ulonglong(rb);
2952     result->rollentry_arena.create(arena_initial_size);
2953     if (0) { died1: result->rollentry_arena.destroy(); goto died0; }
2954 
2955     //Load rollback entries
2956     lazy_assert(rb->size > 4);
2957     //Start with empty list
2958     result->oldest_logentry = result->newest_logentry = NULL;
2959     while (rb->ndone < rb->size) {
2960         struct roll_entry *item;
2961         uint32_t rollback_fsize = rbuf_int(rb); //Already read 4.  Rest is 4 smaller
2962         const void *item_vec;
2963         rbuf_literal_bytes(rb, &item_vec, rollback_fsize-4);
2964         unsigned char* item_buf = (unsigned char*)item_vec;
2965         r = toku_parse_rollback(item_buf, rollback_fsize-4, &item, &result->rollentry_arena);
2966         if (r!=0) {
2967             r = toku_db_badformat();
2968             goto died1;
2969         }
2970         //Add to head of list
2971         if (result->oldest_logentry) {
2972             result->oldest_logentry->prev = item;
2973             result->oldest_logentry       = item;
2974             item->prev = NULL;
2975         }
2976         else {
2977             result->oldest_logentry = result->newest_logentry = item;
2978             item->prev = NULL;
2979         }
2980     }
2981 
2982     toku_free(rb->buf);
2983     rb->buf = NULL;
2984     *log_p = result;
2985     return 0;
2986 }
2987 
2988 static int
2989 deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknum,
2990                                               ROLLBACK_LOG_NODE *log,
2991                                               struct rbuf *rb) {
2992     int r = 0;
2993     ROLLBACK_LOG_NODE rollback_log_node = NULL;
2994     invariant((FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) || version == FT_LAYOUT_VERSION);
2995     r = deserialize_rollback_log_from_rbuf(blocknum, &rollback_log_node, rb);
2996     if (r==0) {
2997         *log = rollback_log_node;
2998     }
2999     return r;
3000 }
3001 
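// The raw block handed to decompress_from_raw_block_into_rbuf() below is expected to be
// laid out as produced by serialize_uncompressed_block_to_memory() above: node header,
// sub block count, per-sub-block headers, header checksum, then the compressed sub blocks.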
3002 int
3003 decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
3004     int r = 0;
3005     // get the number of compressed sub blocks
3006     int n_sub_blocks;
3007     n_sub_blocks = toku_dtoh32(*(uint32_t*)(&raw_block[node_header_overhead]));
3008 
3009     // verify the number of sub blocks
3010     invariant(0 <= n_sub_blocks);
3011     invariant(n_sub_blocks <= max_sub_blocks);
3012 
3013     { // verify the header checksum
3014         uint32_t header_length = node_header_overhead + sub_block_header_size(n_sub_blocks);
3015         invariant(header_length <= raw_block_size);
3016         uint32_t xsum = toku_x1764_memory(raw_block, header_length);
3017         uint32_t stored_xsum = toku_dtoh32(*(uint32_t *)(raw_block + header_length));
3018         if (xsum != stored_xsum) {
3019             r = TOKUDB_BAD_CHECKSUM;
3020         }
3021     }
3022 
3023     // deserialize the sub block header
3024     struct sub_block sub_block[n_sub_blocks];
3025     uint32_t *sub_block_header = (uint32_t *) &raw_block[node_header_overhead+4];
3026     for (int i = 0; i < n_sub_blocks; i++) {
3027         sub_block_init(&sub_block[i]);
3028         sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]);
3029         sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]);
3030         sub_block[i].xsum = toku_dtoh32(sub_block_header[2]);
3031         sub_block_header += 3;
3032     }
3033 
3034     // The checksum failure is acted on here, rather than where r was set above,
3035     // because a goto there would jump over the initialization of sub_block, which the compiler rejects.
3036     if (r == TOKUDB_BAD_CHECKSUM) {
3037         goto exit;
3038     }
3039 
3040     // verify sub block sizes
3041     for (int i = 0; i < n_sub_blocks; i++) {
3042         uint32_t compressed_size = sub_block[i].compressed_size;
3043         if (compressed_size<=0   || compressed_size>(1<<30)) {
3044             r = toku_db_badformat();
3045             goto exit;
3046         }
3047 
3048         uint32_t uncompressed_size = sub_block[i].uncompressed_size;
3049         if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
3050         if (uncompressed_size<=0 || uncompressed_size>(1<<30)) {
3051             r = toku_db_badformat();
3052             goto exit;
3053         }
3054     }
3055 
3056     // sum up the uncompressed size of the sub blocks
3057     size_t uncompressed_size;
3058     uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);
3059 
3060     // allocate the uncompressed buffer
3061     size_t size;
3062     size = node_header_overhead + uncompressed_size;
3063     unsigned char *buf;
3064     XMALLOC_N(size, buf);
3065     rbuf_init(rb, buf, size);
3066 
3067     // copy the uncompressed node header to the uncompressed buffer
3068     memcpy(rb->buf, raw_block, node_header_overhead);
3069 
3070     // point at the start of the compressed data (past the node header, the sub block header, and the header checksum)
3071     unsigned char *compressed_data;
3072     compressed_data = raw_block + node_header_overhead + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t);
3073 
3074     // point at the start of the uncompressed data
3075     unsigned char *uncompressed_data;
3076     uncompressed_data = rb->buf + node_header_overhead;
3077 
3078     // decompress all the compressed sub blocks into the uncompressed buffer
3079     r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores, ft_pool);
3080     if (r != 0) {
3081         fprintf(stderr, "%s:%d block %" PRId64 " failed %d at %p size %lu\n", __FUNCTION__, __LINE__, blocknum.b, r, raw_block, raw_block_size);
3082         dump_bad_block(raw_block, raw_block_size);
3083         goto exit;
3084     }
3085 
3086     rb->ndone=0;
3087 exit:
3088     return r;
3089 }
3090 
3091 static int decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
3092     // This function exists solely to accommodate future changes in compression.
3093     int r = 0;
3094     if ((version == FT_LAYOUT_VERSION_13 || version == FT_LAYOUT_VERSION_14) ||
3095         (FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) ||
3096         version == FT_LAYOUT_VERSION) {
3097         r = decompress_from_raw_block_into_rbuf(raw_block, raw_block_size, rb, blocknum);
3098     } else {
3099         abort();
3100     }
3101     return r;
3102 }
3103 
3104 static int read_and_decompress_block_from_fd_into_rbuf(
3105     int fd,
3106     BLOCKNUM blocknum,
3107     DISKOFF offset,
3108     DISKOFF size,
3109     FT ft,
3110     struct rbuf *rb,
3111     /* out */ int *layout_version_p) {
3112     int r = 0;
3113     if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
3114 
3115     DISKOFF size_aligned = roundup_to_multiple(512, size);
3116     uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block);
3117     {
3118         // read the (partially compressed) block
3119         ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset);
3120         lazy_assert((DISKOFF)rlen >= size);
3121         lazy_assert((DISKOFF)rlen <= size_aligned);
3122     }
3123     // get the layout_version
3124     int layout_version;
3125     {
3126         uint8_t *magic = raw_block + uncompressed_magic_offset;
3127         if (memcmp(magic, "tokuleaf", 8)!=0 &&
3128             memcmp(magic, "tokunode", 8)!=0 &&
3129             memcmp(magic, "tokuroll", 8)!=0) {
3130             r = toku_db_badformat();
3131             goto cleanup;
3132         }
3133         uint8_t *version = raw_block + uncompressed_version_offset;
3134         layout_version = toku_dtoh32(*(uint32_t*)version);
3135         if (layout_version < FT_LAYOUT_MIN_SUPPORTED_VERSION || layout_version > FT_LAYOUT_VERSION) {
3136             r = toku_db_badformat();
3137             goto cleanup;
3138         }
3139     }
3140 
3141     r = decompress_from_raw_block_into_rbuf_versioned(layout_version, raw_block, size, rb, blocknum);
3142     if (r != 0) {
3143         // We either failed the checksum, or there is a bad format in
3144         // the buffer.
3145         if (r == TOKUDB_BAD_CHECKSUM) {
3146             fprintf(stderr,
3147                     "Checksum failure while reading raw block in file %s.\n",
3148                     toku_cachefile_fname_in_env(ft->cf));
3149             abort();
3150         } else {
3151             r = toku_db_badformat();
3152             goto cleanup;
3153         }
3154     }
3155 
3156     *layout_version_p = layout_version;
3157 cleanup:
3158     if (r!=0) {
3159         if (rb->buf) toku_free(rb->buf);
3160         rb->buf = NULL;
3161     }
3162     if (raw_block) {
3163         toku_free(raw_block);
3164     }
3165     return r;
3166 }
3167 
3168 // Read rollback log node from file into struct.
3169 // Perform version upgrade if necessary.
3170 int toku_deserialize_rollback_log_from(int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *logp, FT ft) {
3171     int layout_version = 0;
3172     int r;
3173 
3174     struct rbuf rb;
3175     rbuf_init(&rb, nullptr, 0);
3176 
3177     // get the file offset and block size for the block
3178     DISKOFF offset, size;
3179     ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
3180 
3181     // if the size is 0, then the blocknum is unused
3182     if (size == 0) {
3183         // blocknum is unused, just create an empty one and get out
3184         ROLLBACK_LOG_NODE XMALLOC(log);
3185         rollback_empty_log_init(log);
3186         log->blocknum.b = blocknum.b;
3187         r = 0;
3188         *logp = log;
3189         goto cleanup;
3190     }
3191 
3192     r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, offset, size, ft, &rb, &layout_version);
3193     if (r!=0) goto cleanup;
3194 
3195     {
3196         uint8_t *magic = rb.buf + uncompressed_magic_offset;
3197         if (memcmp(magic, "tokuroll", 8)!=0) {
3198             r = toku_db_badformat();
3199             goto cleanup;
3200         }
3201     }
3202 
3203     r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, logp, &rb);
3204 
3205 cleanup:
3206     if (rb.buf) {
3207         toku_free(rb.buf);
3208     }
3209     return r;
3210 }
3211 
3212 int
3213 toku_upgrade_subtree_estimates_to_stat64info(int fd, FT ft)
3214 {
3215     int r = 0;
3216     // 15 was the last version with subtree estimates
3217     invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_15);
3218 
3219     FTNODE unused_node = NULL;
3220     FTNODE_DISK_DATA unused_ndd = NULL;
3221     ftnode_fetch_extra bfe;
3222     bfe.create_for_min_read(ft);
3223     r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &unused_node, &unused_ndd,
3224                                    &bfe, &ft->h->on_disk_stats);
3225     ft->in_memory_stats = ft->h->on_disk_stats;
3226 
3227     if (unused_node) {
3228         toku_ftnode_free(&unused_node);
3229     }
3230     if (unused_ndd) {
3231         toku_free(unused_ndd);
3232     }
3233     return r;
3234 }
3235 
3236 int
3237 toku_upgrade_msn_from_root_to_header(int fd, FT ft)
3238 {
3239     int r;
3240     // 21 was the first version with max_msn_in_ft in the header
3241     invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_20);
3242 
3243     FTNODE node;
3244     FTNODE_DISK_DATA ndd;
3245     ftnode_fetch_extra bfe;
3246     bfe.create_for_min_read(ft);
3247     r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &node, &ndd, &bfe, nullptr);
3248     if (r != 0) {
3249         goto exit;
3250     }
3251 
3252     ft->h->max_msn_in_ft = node->max_msn_applied_to_node_on_disk;
3253     toku_ftnode_free(&node);
3254     toku_free(ndd);
3255  exit:
3256     return r;
3257 }
3258 
3259 #undef UPGRADE_STATUS_VALUE
3260