1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "portability/toku_atomic.h"
40
41 #include "ft/cachetable/cachetable.h"
42 #include "ft/ft.h"
43 #include "ft/ft-internal.h"
44 #include "ft/node.h"
45 #include "ft/logger/log-internal.h"
46 #include "ft/txn/rollback.h"
47 #include "ft/serialize/block_allocator.h"
48 #include "ft/serialize/block_table.h"
49 #include "ft/serialize/compress.h"
50 #include "ft/serialize/ft_node-serialize.h"
51 #include "ft/serialize/sub_block.h"
52 #include "util/sort.h"
53 #include "util/threadpool.h"
54 #include "util/status.h"
55 #include "util/scoped_malloc.h"
56
57 static FT_UPGRADE_STATUS_S ft_upgrade_status;
58
59 #define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ft_upgrade_status, k, c, t, "ft upgrade: " l, inc)
60
61 static void
status_init(void)62 status_init(void)
63 {
64 // Note, this function initializes the keyname, type, and legend fields.
65 // Value fields are initialized to zero by compiler.
66 STATUS_INIT(FT_UPGRADE_FOOTPRINT, nullptr, UINT64, "footprint", TOKU_ENGINE_STATUS);
67 ft_upgrade_status.initialized = true;
68 }
69 #undef STATUS_INIT
70
71 #define UPGRADE_STATUS_VALUE(x) ft_upgrade_status.status[x].value.num
72
73 void
toku_ft_upgrade_get_status(FT_UPGRADE_STATUS s)74 toku_ft_upgrade_get_status(FT_UPGRADE_STATUS s) {
75 if (!ft_upgrade_status.initialized) {
76 status_init();
77 }
78 UPGRADE_STATUS_VALUE(FT_UPGRADE_FOOTPRINT) = toku_log_upgrade_get_footprint();
79 *s = ft_upgrade_status;
80 }
81
82 static int num_cores = 0; // cache the number of cores for the parallelization
83 static struct toku_thread_pool *ft_pool = NULL;
84 bool toku_serialize_in_parallel;
85
get_num_cores(void)86 int get_num_cores(void) {
87 return num_cores;
88 }
89
get_ft_pool(void)90 struct toku_thread_pool *get_ft_pool(void) {
91 return ft_pool;
92 }
93
toku_serialize_set_parallel(bool in_parallel)94 void toku_serialize_set_parallel(bool in_parallel) {
95 toku_unsafe_set(&toku_serialize_in_parallel, in_parallel);
96 }
97
toku_ft_serialize_layer_init(void)98 void toku_ft_serialize_layer_init(void) {
99 num_cores = toku_os_get_number_active_processors();
100 int r = toku_thread_pool_create(&ft_pool, num_cores);
101 lazy_assert_zero(r);
102 toku_serialize_in_parallel = false;
103 }
104
toku_ft_serialize_layer_destroy(void)105 void toku_ft_serialize_layer_destroy(void) {
106 toku_thread_pool_destroy(&ft_pool);
107 }
108
// Granularity (16MiB) by which data files grow. toku_maybe_truncate_file uses
// twice this value as its shrink threshold.
enum { FILE_CHANGE_INCREMENT = 16 * 1024 * 1024 };
110
// Round a up to the nearest multiple of b. b must be nonzero.
static inline uint64_t
alignup64(uint64_t a, uint64_t b) {
    const uint64_t padded = a + (b - 1);
    // For unsigned values, (x / b) * b == x - x % b.
    return padded - padded % b;
}
115
116 // safe_file_size_lock must be held.
117 void
toku_maybe_truncate_file(int fd,uint64_t size_used,uint64_t expected_size,uint64_t * new_sizep)118 toku_maybe_truncate_file (int fd, uint64_t size_used, uint64_t expected_size, uint64_t *new_sizep)
119 // Effect: If file size >= SIZE+32MiB, reduce file size.
120 // (32 instead of 16.. hysteresis).
121 // Return 0 on success, otherwise an error number.
122 {
123 int64_t file_size;
124 {
125 int r = toku_os_get_file_size(fd, &file_size);
126 lazy_assert_zero(r);
127 invariant(file_size >= 0);
128 }
129 invariant(expected_size == (uint64_t)file_size);
130 // If file space is overallocated by at least 32M
131 if ((uint64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
132 toku_off_t new_size = alignup64(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used.
133 invariant(new_size < file_size);
134 invariant(new_size >= 0);
135 int r = ftruncate(fd, new_size);
136 lazy_assert_zero(r);
137 *new_sizep = new_size;
138 }
139 else {
140 *new_sizep = file_size;
141 }
142 return;
143 }
144
// Smaller of two signed 64-bit values (b when they are equal).
static int64_t
min64(int64_t a, int64_t b) {
    return (a < b) ? a : b;
}
150
151 void
toku_maybe_preallocate_in_file(int fd,int64_t size,int64_t expected_size,int64_t * new_size)152 toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size, int64_t *new_size)
153 // Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size
154 // Return 0 on success, otherwise an error number.
155 {
156 int64_t file_size = 0;
157 //TODO(yoni): Allow variable stripe_width (perhaps from ft) for larger raids
158 const uint64_t stripe_width = 4096;
159 {
160 int r = toku_os_get_file_size(fd, &file_size);
161 if (r != 0) { // debug #2463
162 int the_errno = get_maybe_error_errno();
163 fprintf(stderr, "%s:%d fd=%d size=%" PRIu64 " r=%d errno=%d\n", __FUNCTION__, __LINE__, fd, size, r, the_errno); fflush(stderr);
164 }
165 lazy_assert_zero(r);
166 }
167 invariant(file_size >= 0);
168 invariant(expected_size == file_size);
169 // We want to double the size of the file, or add 16MiB, whichever is less.
170 // We emulate calling this function repeatedly until it satisfies the request.
171 int64_t to_write = 0;
172 if (file_size == 0) {
173 // Prevent infinite loop by starting with stripe_width as a base case.
174 to_write = stripe_width;
175 }
176 while (file_size + to_write < size) {
177 to_write += alignup64(min64(file_size + to_write, FILE_CHANGE_INCREMENT), stripe_width);
178 }
179 if (to_write > 0) {
180 assert(to_write%512==0);
181 toku::scoped_malloc_aligned wbuf_aligned(to_write, 512);
182 char *wbuf = reinterpret_cast<char *>(wbuf_aligned.get());
183 memset(wbuf, 0, to_write);
184 toku_off_t start_write = alignup64(file_size, stripe_width);
185 invariant(start_write >= file_size);
186 toku_os_full_pwrite(fd, wbuf, to_write, start_write);
187 *new_size = start_write + to_write;
188 }
189 else {
190 *new_size = file_size;
191 }
192 }
193
// Total size of the fixed leading node-header fields (excluding the sub_block
// header), summed in the same order the fields are written to the wbuf.
enum {
    node_header_overhead = 8    // magic "tokunode" or "tokuleaf" or "tokuroll"
                         + 4    // layout_version
                         + 4    // layout_version_original
                         + 4,   // build_id
};

// Byte offsets of fields within an uncompressed node header: the 8-byte
// magic comes first and the layout version follows it.
enum {
    uncompressed_magic_offset = 0,
    uncompressed_version_offset = 8,
};
208
209 static uint32_t
serialize_node_header_size(FTNODE node)210 serialize_node_header_size(FTNODE node) {
211 uint32_t retval = 0;
212 retval += 8; // magic
213 retval += sizeof(node->layout_version);
214 retval += sizeof(node->layout_version_original);
215 retval += 4; // BUILD_ID
216 retval += 4; // n_children
217 retval += node->n_children*8; // encode start offset and length of each partition
218 retval += 4; // checksum
219 return retval;
220 }
221
// Write the uncompressed node header for `node` into `wbuf`.
//
// The caller sizes wbuf to exactly serialize_node_header_size(node) bytes;
// the final invariant checks that every byte was filled. Fields are written
// in this order, without a running CRC:
//   - 8-byte magic: "tokuleaf" for a leaf (height 0), "tokunode" otherwise
//   - layout_version (expected to equal FT_LAYOUT_VERSION), layout_version_original
//   - BUILD_ID
//   - n_children, then for each partition its start offset and size from ndd
//   - an x1764 checksum over the bytes written to wbuf before it
// This order is the on-disk format: do not reorder the writes.
static void
serialize_node_header(FTNODE node, FTNODE_DISK_DATA ndd, struct wbuf *wbuf) {
    if (node->height == 0)
        wbuf_nocrc_literal_bytes(wbuf, "tokuleaf", 8);
    else
        wbuf_nocrc_literal_bytes(wbuf, "tokunode", 8);
    paranoid_invariant(node->layout_version == FT_LAYOUT_VERSION);
    wbuf_nocrc_int(wbuf, node->layout_version);
    wbuf_nocrc_int(wbuf, node->layout_version_original);
    wbuf_nocrc_uint(wbuf, BUILD_ID);
    wbuf_nocrc_int (wbuf, node->n_children);
    for (int i=0; i<node->n_children; i++) {
        // every partition must already have been laid out with a nonzero size
        assert(BP_SIZE(ndd,i)>0);
        wbuf_nocrc_int(wbuf, BP_START(ndd, i)); // save the beginning of the partition
        wbuf_nocrc_int(wbuf, BP_SIZE (ndd, i)); // and the size
    }
    // checksum the header
    uint32_t end_to_end_checksum = toku_x1764_memory(wbuf->buf, wbuf_get_woffset(wbuf));
    wbuf_nocrc_int(wbuf, end_to_end_checksum);
    invariant(wbuf->ndone == wbuf->size);
}
243
244 static uint32_t
serialize_ftnode_partition_size(FTNODE node,int i)245 serialize_ftnode_partition_size (FTNODE node, int i)
246 {
247 uint32_t result = 0;
248 paranoid_invariant(node->bp[i].state == PT_AVAIL);
249 result++; // Byte that states what the partition is
250 if (node->height > 0) {
251 NONLEAF_CHILDINFO bnc = BNC(node, i);
252 // number of messages (4 bytes) plus size of the buffer
253 result += (4 + toku_bnc_nbytesinbuf(bnc));
254 // number of offsets (4 bytes) plus an array of 4 byte offsets, for each message tree
255 result += (4 + (4 * bnc->fresh_message_tree.size()));
256 result += (4 + (4 * bnc->stale_message_tree.size()));
257 result += (4 + (4 * bnc->broadcast_list.size()));
258 }
259 else {
260 result += 4 + bn_data::HEADER_LENGTH; // n_entries in buffer table + basement header
261 result += BLB_NBYTESINDATA(node, i);
262 }
263 result += 4; // checksum
264 return result;
265 }
266
// One-byte tags written as the first byte of every serialized partition,
// identifying its contents: a leaf basement node (DMT_LEAVES) or an internal
// node's child message buffer (MSG_BUFFER).
#define FTNODE_PARTITION_DMT_LEAVES 0xaa
#define FTNODE_PARTITION_MSG_BUFFER 0xbb
269
UU()270 UU() static int
271 assert_fresh(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
272 bool is_fresh = msg_buffer->get_freshness(offset);
273 assert(is_fresh);
274 return 0;
275 }
276
UU()277 UU() static int
278 assert_stale(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
279 bool is_fresh = msg_buffer->get_freshness(offset);
280 assert(!is_fresh);
281 return 0;
282 }
283
bnc_verify_message_trees(NONLEAF_CHILDINFO UU (bnc))284 static void bnc_verify_message_trees(NONLEAF_CHILDINFO UU(bnc)) {
285 #ifdef TOKU_DEBUG_PARANOID
286 bnc->fresh_message_tree.iterate<message_buffer, assert_fresh>(&bnc->msg_buffer);
287 bnc->stale_message_tree.iterate<message_buffer, assert_stale>(&bnc->msg_buffer);
288 #endif
289 }
290
// Iteration callback: append one message-tree offset to `wb` with
// wbuf_nocrc_int (no running CRC). Returns 0 so iteration continues.
static int
wbuf_write_offset(const int32_t &offset, const uint32_t UU(idx), struct wbuf *const wb) {
    wbuf_nocrc_int(wb, offset);
    return 0;
}

// Serialize an internal node's partition (one child's message buffer) into wb.
//
// Layout, in order:
//   - tag byte FTNODE_PARTITION_MSG_BUFFER
//   - the message buffer itself
//   - for each of fresh_message_tree, stale_message_tree, broadcast_list:
//     its entry count followed by one offset per entry (wbuf_write_offset)
// The byte count must agree with serialize_ftnode_partition_size(); the
// order is the on-disk format.
static void serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) {
    unsigned char ch = FTNODE_PARTITION_MSG_BUFFER;
    wbuf_nocrc_char(wb, ch);

    // serialize the message buffer
    bnc->msg_buffer.serialize_to_wbuf(wb);

    // serialize the message trees (num entries, offsets array):
    // first, verify their contents are consistent with the message buffer
    bnc_verify_message_trees(bnc);

    // fresh
    wbuf_nocrc_int(wb, bnc->fresh_message_tree.size());
    bnc->fresh_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);

    // stale
    wbuf_nocrc_int(wb, bnc->stale_message_tree.size());
    bnc->stale_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);

    // broadcast
    wbuf_nocrc_int(wb, bnc->broadcast_list.size());
    bnc->broadcast_list.iterate<struct wbuf, wbuf_write_offset>(wb);
}
320
321 //
322 // Serialize the i'th partition of node into sb
323 // For leaf nodes, this would be the i'th basement node
324 // For internal nodes, this would be the i'th internal node
325 //
326 static void
serialize_ftnode_partition(FTNODE node,int i,struct sub_block * sb)327 serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
328 // Caller should have allocated memory.
329 invariant_notnull(sb->uncompressed_ptr);
330 invariant(sb->uncompressed_size > 0);
331 paranoid_invariant(sb->uncompressed_size == serialize_ftnode_partition_size(node, i));
332
333 //
334 // Now put the data into sb->uncompressed_ptr
335 //
336 struct wbuf wb;
337 wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
338 if (node->height > 0) {
339 // TODO: (Zardosht) possibly exit early if there are no messages
340 serialize_child_buffer(BNC(node, i), &wb);
341 }
342 else {
343 unsigned char ch = FTNODE_PARTITION_DMT_LEAVES;
344 bn_data* bd = BLB_DATA(node, i);
345
346 wbuf_nocrc_char(&wb, ch);
347 wbuf_nocrc_uint(&wb, bd->num_klpairs());
348
349 bd->serialize_to_wbuf(&wb);
350 }
351 uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
352 wbuf_nocrc_int(&wb, end_to_end_checksum);
353 invariant(wb.ndone == wb.size);
354 invariant(sb->uncompressed_size==wb.ndone);
355 }
356
357 //
358 // Takes the data in sb->uncompressed_ptr, and compresses it
359 // into a newly allocated buffer sb->compressed_ptr
360 //
361 static void
compress_ftnode_sub_block(struct sub_block * sb,enum toku_compression_method method)362 compress_ftnode_sub_block(struct sub_block *sb, enum toku_compression_method method) {
363 invariant(sb->compressed_ptr != nullptr);
364 invariant(sb->compressed_size_bound > 0);
365 paranoid_invariant(sb->compressed_size_bound == toku_compress_bound(method, sb->uncompressed_size));
366
367 //
368 // This probably seems a bit complicated. Here is what is going on.
369 // In PerconaFT 5.0, sub_blocks were compressed and the compressed data
370 // was checksummed. The checksum did NOT include the size of the compressed data
371 // and the size of the uncompressed data. The fields of sub_block only reference the
372 // compressed data, and it is the responsibility of the user of the sub_block
373 // to write the length
374 //
375 // For Dr. No, we want the checksum to also include the size of the compressed data, and the
376 // size of the decompressed data, because this data
377 // may be read off of disk alone, so it must be verifiable alone.
378 //
379 // So, we pass in a buffer to compress_nocrc_sub_block that starts 8 bytes after the beginning
380 // of sb->compressed_ptr, so we have space to put in the sizes, and then run the checksum.
381 //
382 sb->compressed_size = compress_nocrc_sub_block(
383 sb,
384 (char *)sb->compressed_ptr + 8,
385 sb->compressed_size_bound,
386 method
387 );
388
389 uint32_t* extra = (uint32_t *)(sb->compressed_ptr);
390 // store the compressed and uncompressed size at the beginning
391 extra[0] = toku_htod32(sb->compressed_size);
392 extra[1] = toku_htod32(sb->uncompressed_size);
393 // now checksum the entire thing
394 sb->compressed_size += 8; // now add the eight bytes that we saved for the sizes
395 sb->xsum = toku_x1764_memory(sb->compressed_ptr,sb->compressed_size);
396
397 //
398 // This is the end result for Dr. No and forward. For ftnodes, sb->compressed_ptr contains
399 // two integers at the beginning, the size and uncompressed size, and then the compressed
400 // data. sb->xsum contains the checksum of this entire thing.
401 //
402 // In PerconaFT 5.0, sb->compressed_ptr only contained the compressed data, sb->xsum
403 // checksummed only the compressed data, and the checksumming of the sizes were not
404 // done here.
405 //
406 }
407
408 //
409 // Returns the size needed to serialize the ftnode info
410 // Does not include header information that is common with rollback logs
411 // such as the magic, layout_version, and build_id
412 // Includes only node specific info such as pivot information, n_children, and so on
413 //
414 static uint32_t
serialize_ftnode_info_size(FTNODE node)415 serialize_ftnode_info_size(FTNODE node)
416 {
417 uint32_t retval = 0;
418 retval += 8; // max_msn_applied_to_node_on_disk
419 retval += 4; // nodesize
420 retval += 4; // flags
421 retval += 4; // height;
422 retval += 8; // oldest_referenced_xid_known
423 retval += node->pivotkeys.serialized_size();
424 retval += (node->n_children-1)*4; // encode length of each pivot
425 if (node->height > 0) {
426 retval += node->n_children*8; // child blocknum's
427 }
428 retval += 4; // checksum
429 return retval;
430 }
431
serialize_ftnode_info(FTNODE node,SUB_BLOCK sb)432 static void serialize_ftnode_info(FTNODE node, SUB_BLOCK sb) {
433 // Memory must have been allocated by our caller.
434 invariant(sb->uncompressed_size > 0);
435 invariant_notnull(sb->uncompressed_ptr);
436 paranoid_invariant(sb->uncompressed_size == serialize_ftnode_info_size(node));
437
438 struct wbuf wb;
439 wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
440
441 wbuf_MSN(&wb, node->max_msn_applied_to_node_on_disk);
442 wbuf_nocrc_uint(&wb, 0); // write a dummy value for where node->nodesize used to be
443 wbuf_nocrc_uint(&wb, node->flags);
444 wbuf_nocrc_int (&wb, node->height);
445 wbuf_TXNID(&wb, node->oldest_referenced_xid_known);
446 node->pivotkeys.serialize_to_wbuf(&wb);
447
448 // child blocks, only for internal nodes
449 if (node->height > 0) {
450 for (int i = 0; i < node->n_children; i++) {
451 wbuf_nocrc_BLOCKNUM(&wb, BP_BLOCKNUM(node,i));
452 }
453 }
454
455 uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
456 wbuf_nocrc_int(&wb, end_to_end_checksum);
457 invariant(wb.ndone == wb.size);
458 invariant(sb->uncompressed_size==wb.ndone);
459 }
460
461 // This is the size of the uncompressed data, not including the compression headers
462 unsigned int
toku_serialize_ftnode_size(FTNODE node)463 toku_serialize_ftnode_size (FTNODE node) {
464 unsigned int result = 0;
465 //
466 // As of now, this seems to be called if and only if the entire node is supposed
467 // to be in memory, so we will assert it.
468 //
469 toku_ftnode_assert_fully_in_memory(node);
470 result += serialize_node_header_size(node);
471 result += serialize_ftnode_info_size(node);
472 for (int i = 0; i < node->n_children; i++) {
473 result += serialize_ftnode_partition_size(node,i);
474 }
475 return result;
476 }
477
// Accumulated time spent serializing and compressing node data, as
// differences of toku_time_now() readings. Field order matters: instances
// are brace-initialized positionally (e.g. `{ 0, 0 }`) and by designated
// initializers in declaration order.
struct serialize_times {
    tokutime_t serialize_time;
    tokutime_t compress_time;
};
482
483 static void
serialize_and_compress_partition(FTNODE node,int childnum,enum toku_compression_method compression_method,SUB_BLOCK sb,struct serialize_times * st)484 serialize_and_compress_partition(FTNODE node,
485 int childnum,
486 enum toku_compression_method compression_method,
487 SUB_BLOCK sb,
488 struct serialize_times *st)
489 {
490 // serialize, compress, update status
491 tokutime_t t0 = toku_time_now();
492 serialize_ftnode_partition(node, childnum, sb);
493 tokutime_t t1 = toku_time_now();
494 compress_ftnode_sub_block(sb, compression_method);
495 tokutime_t t2 = toku_time_now();
496
497 st->serialize_time += t1 - t0;
498 st->compress_time += t2 - t1;
499 }
500
501 void
toku_create_compressed_partition_from_available(FTNODE node,int childnum,enum toku_compression_method compression_method,SUB_BLOCK sb)502 toku_create_compressed_partition_from_available(
503 FTNODE node,
504 int childnum,
505 enum toku_compression_method compression_method,
506 SUB_BLOCK sb
507 )
508 {
509 tokutime_t t0 = toku_time_now();
510
511 // serialize
512 sb->uncompressed_size = serialize_ftnode_partition_size(node, childnum);
513 toku::scoped_malloc uncompressed_buf(sb->uncompressed_size);
514 sb->uncompressed_ptr = uncompressed_buf.get();
515 serialize_ftnode_partition(node, childnum, sb);
516
517 tokutime_t t1 = toku_time_now();
518
519 // compress. no need to pad with extra bytes for sizes/xsum - we're not storing them
520 set_compressed_size_bound(sb, compression_method);
521 sb->compressed_ptr = toku_xmalloc(sb->compressed_size_bound);
522 sb->compressed_size = compress_nocrc_sub_block(
523 sb,
524 sb->compressed_ptr,
525 sb->compressed_size_bound,
526 compression_method
527 );
528 sb->uncompressed_ptr = NULL;
529
530 tokutime_t t2 = toku_time_now();
531
532 toku_ft_status_update_serialize_times(node, t1 - t0, t2 - t1);
533 }
534
535 static void
serialize_and_compress_serially(FTNODE node,int npartitions,enum toku_compression_method compression_method,struct sub_block sb[],struct serialize_times * st)536 serialize_and_compress_serially(FTNODE node,
537 int npartitions,
538 enum toku_compression_method compression_method,
539 struct sub_block sb[],
540 struct serialize_times *st) {
541 for (int i = 0; i < npartitions; i++) {
542 serialize_and_compress_partition(node, i, compression_method, &sb[i], st);
543 }
544 }
545
// One queued unit of work for serialize_and_compress_in_parallel():
// serialize and compress partition `i` of `node` into sb[i].
// `base` must remain the first member: the workset queues &work.base, and
// serialize_and_compress_worker() casts the dequeued pointer back to this
// struct. Each item accumulates into its own `st`, so workers never share
// counters. Keep the field order in sync with the designated initializer in
// serialize_and_compress_in_parallel().
struct serialize_compress_work {
    struct work base;
    FTNODE node;
    int i;
    enum toku_compression_method compression_method;
    struct sub_block *sb;
    struct serialize_times st;
};
554
555 static void *
serialize_and_compress_worker(void * arg)556 serialize_and_compress_worker(void *arg) {
557 struct workset *ws = (struct workset *) arg;
558 while (1) {
559 struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws);
560 if (w == NULL)
561 break;
562 int i = w->i;
563 serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st);
564 }
565 workset_release_ref(ws);
566 return arg;
567 }
568
// Serialize and compress partitions [0, npartitions) of `node` into sb[],
// spreading the work over ft_pool plus the calling thread. The summed
// per-item times are added to *st.
//
// A single partition is handled inline. Otherwise one work item per
// partition is queued, and T = min(num_cores, npartitions) - 1 pool threads
// are requested, leaving one share of the work for the calling thread.
static void
serialize_and_compress_in_parallel(FTNODE node,
                                   int npartitions,
                                   enum toku_compression_method compression_method,
                                   struct sub_block sb[],
                                   struct serialize_times *st) {
    if (npartitions == 1) {
        serialize_and_compress_partition(node, 0, compression_method, &sb[0], st);
    } else {
        int T = num_cores;
        if (T > npartitions)
            T = npartitions;
        if (T > 0)
            T = T - 1;
        struct workset ws;
        ZERO_STRUCT(ws);
        workset_init(&ws);
        // NOTE(review): variable-length array on the stack (a compiler
        // extension in C++); its size grows with the partition count.
        struct serialize_compress_work work[npartitions];
        workset_lock(&ws);
        for (int i = 0; i < npartitions; i++) {
            work[i] = (struct serialize_compress_work) { .base = {{NULL, NULL}},
                                                         .node = node,
                                                         .i = i,
                                                         .compression_method = compression_method,
                                                         .sb = sb,
                                                         .st = { .serialize_time = 0, .compress_time = 0} };
            workset_put_locked(&ws, &work[i].base);
        }
        workset_unlock(&ws);
        // T is passed by pointer; presumably the pool updates it to the number
        // of threads actually started, each of which runs the worker and
        // releases one workset reference when done -- TODO confirm.
        toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws);
        workset_add_ref(&ws, T);
        // The calling thread drains the queue too; its worker call also
        // releases a reference (presumably the one held since workset_init).
        serialize_and_compress_worker(&ws);
        // Presumably waits until every reference has been released, i.e. all
        // workers have finished -- only then are work[] and sb[] safe to read.
        workset_join(&ws);
        workset_destroy(&ws);

        // gather up the statistics from each thread's work item
        for (int i = 0; i < npartitions; i++) {
            st->serialize_time += work[i].st.serialize_time;
            st->compress_time += work[i].st.compress_time;
        }
    }
}
611
612 static void
serialize_and_compress_sb_node_info(FTNODE node,struct sub_block * sb,enum toku_compression_method compression_method,struct serialize_times * st)613 serialize_and_compress_sb_node_info(FTNODE node, struct sub_block *sb,
614 enum toku_compression_method compression_method, struct serialize_times *st) {
615 // serialize, compress, update serialize times.
616 tokutime_t t0 = toku_time_now();
617 serialize_ftnode_info(node, sb);
618 tokutime_t t1 = toku_time_now();
619 compress_ftnode_sub_block(sb, compression_method);
620 tokutime_t t2 = toku_time_now();
621
622 st->serialize_time += t1 - t0;
623 st->compress_time += t2 - t1;
624 }
625
toku_serialize_ftnode_to_memory(FTNODE node,FTNODE_DISK_DATA * ndd,unsigned int basementnodesize,enum toku_compression_method compression_method,bool do_rebalancing,bool in_parallel,size_t * n_bytes_to_write,size_t * n_uncompressed_bytes,char ** bytes_to_write)626 int toku_serialize_ftnode_to_memory(FTNODE node,
627 FTNODE_DISK_DATA* ndd,
628 unsigned int basementnodesize,
629 enum toku_compression_method compression_method,
630 bool do_rebalancing,
631 bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false
632 /*out*/ size_t *n_bytes_to_write,
633 /*out*/ size_t *n_uncompressed_bytes,
634 /*out*/ char **bytes_to_write)
635 // Effect: Writes out each child to a separate malloc'd buffer, then compresses
636 // all of them, and writes the uncompressed header, to bytes_to_write,
637 // which is malloc'd.
638 //
639 // The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed).
640 // 512-byte padding is for O_DIRECT to work.
641 {
642 toku_ftnode_assert_fully_in_memory(node);
643
644 if (do_rebalancing && node->height == 0) {
645 toku_ftnode_leaf_rebalance(node, basementnodesize);
646 }
647 const int npartitions = node->n_children;
648
649 // Each partition represents a compressed sub block
650 // For internal nodes, a sub block is a message buffer
651 // For leaf nodes, a sub block is a basement node
652 toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
653 struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
654 XREALLOC_N(npartitions, *ndd);
655
656 //
657 // First, let's serialize and compress the individual sub blocks
658 //
659
660 // determine how large our serialization and compression buffers need to be.
661 size_t serialize_buf_size = 0, compression_buf_size = 0;
662 for (int i = 0; i < node->n_children; i++) {
663 sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
664 sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
665 serialize_buf_size += sb[i].uncompressed_size;
666 compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
667 }
668
669 // give each sub block a base pointer to enough buffer space for serialization and compression
670 toku::scoped_malloc serialize_buf(serialize_buf_size);
671 toku::scoped_malloc compression_buf(compression_buf_size);
672 for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
673 sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
674 sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
675 uncompressed_offset += sb[i].uncompressed_size;
676 compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
677 invariant(uncompressed_offset <= serialize_buf_size);
678 invariant(compressed_offset <= compression_buf_size);
679 }
680
681 // do the actual serialization now that we have buffer space
682 struct serialize_times st = { 0, 0 };
683 if (in_parallel) {
684 serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
685 } else {
686 serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
687 }
688
689 //
690 // Now lets create a sub-block that has the common node information,
691 // This does NOT include the header
692 //
693
694 // determine how large our serialization and copmression buffers need to be
695 struct sub_block sb_node_info;
696 sub_block_init(&sb_node_info);
697 size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
698 size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
699 toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
700 toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
701 sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
702 sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
703 sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
704 sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();
705
706 // do the actual serialization now that we have buffer space
707 serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);
708
709 //
710 // At this point, we have compressed each of our pieces into individual sub_blocks,
711 // we can put the header and all the subblocks into a single buffer and return it.
712 //
713
714 // update the serialize times, ignore the header for simplicity. we captured all
715 // of the partitions' serialize times so that's probably good enough.
716 toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);
717
718 // The total size of the node is:
719 // size of header + disk size of the n+1 sub_block's created above
720 uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
721 + sb_node_info.compressed_size // compressed nodeinfo (without its checksum)
722 + 4); // nodeinfo's checksum
723 uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header
724 + sb_node_info.uncompressed_size // uncompressed nodeinfo (without its checksum)
725 + 4); // nodeinfo's checksum
726 // store the BP_SIZESs
727 for (int i = 0; i < node->n_children; i++) {
728 uint32_t len = sb[i].compressed_size + 4; // data and checksum
729 BP_SIZE (*ndd,i) = len;
730 BP_START(*ndd,i) = total_node_size;
731 total_node_size += sb[i].compressed_size + 4;
732 total_uncompressed_size += sb[i].uncompressed_size + 4;
733 }
734
735 // now create the final serialized node
736 uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes.
737 char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
738 char *curr_ptr = data;
739
740 // write the header
741 struct wbuf wb;
742 wbuf_init(&wb, curr_ptr, serialize_node_header_size(node));
743 serialize_node_header(node, *ndd, &wb);
744 assert(wb.ndone == wb.size);
745 curr_ptr += serialize_node_header_size(node);
746
747 // now write sb_node_info
748 memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size);
749 curr_ptr += sb_node_info.compressed_size;
750 // write the checksum
751 *(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum);
752 curr_ptr += sizeof(sb_node_info.xsum);
753
754 for (int i = 0; i < npartitions; i++) {
755 memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size);
756 curr_ptr += sb[i].compressed_size;
757 // write the checksum
758 *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum);
759 curr_ptr += sizeof(sb[i].xsum);
760 }
761 // Zero the rest of the buffer
762 memset(data + total_node_size, 0, total_buffer_size - total_node_size);
763
764 assert(curr_ptr - data == total_node_size);
765 *bytes_to_write = data;
766 *n_bytes_to_write = total_buffer_size;
767 *n_uncompressed_bytes = total_uncompressed_size;
768
769 invariant(*n_bytes_to_write % 512 == 0);
770 invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
771 return 0;
772 }
773
toku_serialize_ftnode_to(int fd,BLOCKNUM blocknum,FTNODE node,FTNODE_DISK_DATA * ndd,bool do_rebalancing,FT ft,bool for_checkpoint)774 int toku_serialize_ftnode_to(int fd,
775 BLOCKNUM blocknum,
776 FTNODE node,
777 FTNODE_DISK_DATA *ndd,
778 bool do_rebalancing,
779 FT ft,
780 bool for_checkpoint) {
781 size_t n_to_write;
782 size_t n_uncompressed_bytes;
783 char *compressed_buf = nullptr;
784
785 // because toku_serialize_ftnode_to is only called for
786 // in toku_ftnode_flush_callback, we pass false
787 // for in_parallel. The reasoning is that when we write
788 // nodes to disk via toku_ftnode_flush_callback, we
789 // assume that it is being done on a non-critical
790 // background thread (probably for checkpointing), and therefore
791 // should not hog CPU,
792 //
793 // Should the above facts change, we may want to revisit
794 // passing false for in_parallel here
795 //
796 // alternatively, we could have made in_parallel a parameter
797 // for toku_serialize_ftnode_to, but instead we did this.
798 int r = toku_serialize_ftnode_to_memory(
799 node,
800 ndd,
801 ft->h->basementnodesize,
802 ft->h->compression_method,
803 do_rebalancing,
804 toku_unsafe_fetch(&toku_serialize_in_parallel),
805 &n_to_write,
806 &n_uncompressed_bytes,
807 &compressed_buf);
808 if (r != 0) {
809 return r;
810 }
811
812 // If the node has never been written, then write the whole buffer,
813 // including the zeros
814 invariant(blocknum.b >= 0);
815 DISKOFF offset;
816
817 // Dirties the ft
818 ft->blocktable.realloc_on_disk(
819 blocknum, n_to_write, &offset, ft, fd, for_checkpoint);
820
821 tokutime_t t0 = toku_time_now();
822 toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
823 tokutime_t t1 = toku_time_now();
824
825 tokutime_t io_time = t1 - t0;
826 toku_ft_status_update_flush_reason(
827 node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);
828
829 toku_free(compressed_buf);
830 node->clear_dirty(); // See #1957. Must set the node to be clean after
831 // serializing it so that it doesn't get written again on
832 // the next checkpoint or eviction.
833 if (node->height == 0) {
834 for (int i = 0; i < node->n_children; i++) {
835 if (BP_STATE(node, i) == PT_AVAIL) {
836 BLB_LRD(node, i) = 0;
837 }
838 }
839 }
840 return 0;
841 }
842
843 static void
sort_and_steal_offset_arrays(NONLEAF_CHILDINFO bnc,const toku::comparator & cmp,int32_t ** fresh_offsets,int32_t nfresh,int32_t ** stale_offsets,int32_t nstale,int32_t ** broadcast_offsets,int32_t nbroadcast)844 sort_and_steal_offset_arrays(NONLEAF_CHILDINFO bnc,
845 const toku::comparator &cmp,
846 int32_t **fresh_offsets, int32_t nfresh,
847 int32_t **stale_offsets, int32_t nstale,
848 int32_t **broadcast_offsets, int32_t nbroadcast) {
849 // We always have fresh / broadcast offsets (even if they are empty)
850 // but we may not have stale offsets, in the case of v13 upgrade.
851 invariant(fresh_offsets != nullptr);
852 invariant(broadcast_offsets != nullptr);
853 invariant(cmp.valid());
854
855 typedef toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp> msn_sort;
856
857 const int32_t n_in_this_buffer = nfresh + nstale + nbroadcast;
858 struct toku_msg_buffer_key_msn_cmp_extra extra(cmp, &bnc->msg_buffer);
859 msn_sort::mergesort_r(*fresh_offsets, nfresh, extra);
860 bnc->fresh_message_tree.destroy();
861 bnc->fresh_message_tree.create_steal_sorted_array(fresh_offsets, nfresh, n_in_this_buffer);
862 if (stale_offsets) {
863 msn_sort::mergesort_r(*stale_offsets, nstale, extra);
864 bnc->stale_message_tree.destroy();
865 bnc->stale_message_tree.create_steal_sorted_array(stale_offsets, nstale, n_in_this_buffer);
866 }
867 bnc->broadcast_list.destroy();
868 bnc->broadcast_list.create_steal_sorted_array(broadcast_offsets, nbroadcast, n_in_this_buffer);
869 }
870
871 static MSN
deserialize_child_buffer_v13(FT ft,NONLEAF_CHILDINFO bnc,struct rbuf * rb)872 deserialize_child_buffer_v13(FT ft, NONLEAF_CHILDINFO bnc, struct rbuf *rb) {
873 // We skip 'stale' offsets for upgraded nodes.
874 int32_t nfresh = 0, nbroadcast = 0;
875 int32_t *fresh_offsets = nullptr, *broadcast_offsets = nullptr;
876
877 // Only sort buffers if we have a valid comparison function. In certain scenarios,
878 // like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
879 // for simple inspection and don't actually require that the message buffers are
880 // properly sorted. This is very ugly, but correct.
881 const bool sort = ft->cmp.valid();
882
883 MSN highest_msn_in_this_buffer =
884 bnc->msg_buffer.deserialize_from_rbuf_v13(rb, &ft->h->highest_unused_msn_for_upgrade,
885 sort ? &fresh_offsets : nullptr, &nfresh,
886 sort ? &broadcast_offsets : nullptr, &nbroadcast);
887
888 if (sort) {
889 sort_and_steal_offset_arrays(bnc, ft->cmp,
890 &fresh_offsets, nfresh,
891 nullptr, 0, // no stale offsets
892 &broadcast_offsets, nbroadcast);
893 }
894
895 return highest_msn_in_this_buffer;
896 }
897
898 static void
deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc,struct rbuf * rb,const toku::comparator & cmp)899 deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rb, const toku::comparator &cmp) {
900 int32_t nfresh = 0, nstale = 0, nbroadcast = 0;
901 int32_t *fresh_offsets, *stale_offsets, *broadcast_offsets;
902
903 // Only sort buffers if we have a valid comparison function. In certain scenarios,
904 // like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
905 // for simple inspection and don't actually require that the message buffers are
906 // properly sorted. This is very ugly, but correct.
907 const bool sort = cmp.valid();
908
909 // read in the message buffer
910 bnc->msg_buffer.deserialize_from_rbuf(rb,
911 sort ? &fresh_offsets : nullptr, &nfresh,
912 sort ? &stale_offsets : nullptr, &nstale,
913 sort ? &broadcast_offsets : nullptr, &nbroadcast);
914
915 if (sort) {
916 sort_and_steal_offset_arrays(bnc, cmp,
917 &fresh_offsets, nfresh,
918 &stale_offsets, nstale,
919 &broadcast_offsets, nbroadcast);
920 }
921 }
922
923 static void
deserialize_child_buffer(NONLEAF_CHILDINFO bnc,struct rbuf * rb)924 deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rb) {
925 // read in the message buffer
926 bnc->msg_buffer.deserialize_from_rbuf(rb,
927 nullptr, nullptr, // fresh_offsets, nfresh,
928 nullptr, nullptr, // stale_offsets, nstale,
929 nullptr, nullptr); // broadcast_offsets, nbroadcast
930
931 // read in each message tree (fresh, stale, broadcast)
932 int32_t nfresh = rbuf_int(rb);
933 int32_t *XMALLOC_N(nfresh, fresh_offsets);
934 for (int i = 0; i < nfresh; i++) {
935 fresh_offsets[i] = rbuf_int(rb);
936 }
937
938 int32_t nstale = rbuf_int(rb);
939 int32_t *XMALLOC_N(nstale, stale_offsets);
940 for (int i = 0; i < nstale; i++) {
941 stale_offsets[i] = rbuf_int(rb);
942 }
943
944 int32_t nbroadcast = rbuf_int(rb);
945 int32_t *XMALLOC_N(nbroadcast, broadcast_offsets);
946 for (int i = 0; i < nbroadcast; i++) {
947 broadcast_offsets[i] = rbuf_int(rb);
948 }
949
950 // build OMTs out of each offset array
951 bnc->fresh_message_tree.destroy();
952 bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, nfresh);
953 bnc->stale_message_tree.destroy();
954 bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, nstale);
955 bnc->broadcast_list.destroy();
956 bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast, nbroadcast);
957 }
958
// Hex-dumps `size` bytes starting at `vp` to stderr, for diagnosing corrupt
// blocks.
//
// Output format:
//   - One line per 64 bytes, each prefixed with the address of its first
//     byte ("%p: ") and followed by two uppercase hex digits per byte.
//   - A trailing partial line gets no newline of its own.
//   - A single final newline is always printed, so an empty buffer prints
//     just "\n".
// No locking is done, so concurrent dumps may interleave.
void
dump_bad_block(unsigned char *vp, uint64_t size) {
    const uint64_t bytes_per_line = 64;
    for (uint64_t line_start = 0; line_start < size; line_start += bytes_per_line) {
        const uint64_t remaining = size - line_start;
        const uint64_t line_len = remaining < bytes_per_line ? remaining : bytes_per_line;
        fprintf(stderr, "%p: ", (void *)(vp + line_start));
        for (uint64_t k = 0; k < line_len; k++) {
            fprintf(stderr, "%2.2X", vp[line_start + k]);
        }
        if (line_len == bytes_per_line) {
            fprintf(stderr, "\n");
        }
    }
    fprintf(stderr, "\n");
}
984
985 ////////////////////////////////////////////////////////////////////
986 ////////////////////////////////////////////////////////////////////
987 ////////////////////////////////////////////////////////////////////
988 ////////////////////////////////////////////////////////////////////
989 ////////////////////////////////////////////////////////////////////
990 ////////////////////////////////////////////////////////////////////
991 ////////////////////////////////////////////////////////////////////
992 ////////////////////////////////////////////////////////////////////
993
toku_create_empty_bn(void)994 BASEMENTNODE toku_create_empty_bn(void) {
995 BASEMENTNODE bn = toku_create_empty_bn_no_buffer();
996 bn->data_buffer.initialize_empty();
997 return bn;
998 }
999
toku_clone_bn(BASEMENTNODE orig_bn)1000 BASEMENTNODE toku_clone_bn(BASEMENTNODE orig_bn) {
1001 BASEMENTNODE bn = toku_create_empty_bn_no_buffer();
1002 bn->max_msn_applied = orig_bn->max_msn_applied;
1003 bn->seqinsert = orig_bn->seqinsert;
1004 bn->stale_ancestor_messages_applied = orig_bn->stale_ancestor_messages_applied;
1005 bn->stat64_delta = orig_bn->stat64_delta;
1006 bn->logical_rows_delta = orig_bn->logical_rows_delta;
1007 bn->data_buffer.clone(&orig_bn->data_buffer);
1008 return bn;
1009 }
1010
toku_create_empty_bn_no_buffer(void)1011 BASEMENTNODE toku_create_empty_bn_no_buffer(void) {
1012 BASEMENTNODE XMALLOC(bn);
1013 bn->max_msn_applied.msn = 0;
1014 bn->seqinsert = 0;
1015 bn->stale_ancestor_messages_applied = false;
1016 bn->stat64_delta = ZEROSTATS;
1017 bn->logical_rows_delta = 0;
1018 bn->data_buffer.init_zero();
1019 return bn;
1020 }
1021
toku_create_empty_nl(void)1022 NONLEAF_CHILDINFO toku_create_empty_nl(void) {
1023 NONLEAF_CHILDINFO XMALLOC(cn);
1024 cn->msg_buffer.create();
1025 cn->fresh_message_tree.create_no_array();
1026 cn->stale_message_tree.create_no_array();
1027 cn->broadcast_list.create_no_array();
1028 memset(cn->flow, 0, sizeof cn->flow);
1029 return cn;
1030 }
1031
1032 // must clone the OMTs, since we serialize them along with the message buffer
toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo)1033 NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) {
1034 NONLEAF_CHILDINFO XMALLOC(cn);
1035 cn->msg_buffer.clone(&orig_childinfo->msg_buffer);
1036 cn->fresh_message_tree.create_no_array();
1037 cn->fresh_message_tree.clone(orig_childinfo->fresh_message_tree);
1038 cn->stale_message_tree.create_no_array();
1039 cn->stale_message_tree.clone(orig_childinfo->stale_message_tree);
1040 cn->broadcast_list.create_no_array();
1041 cn->broadcast_list.clone(orig_childinfo->broadcast_list);
1042 memset(cn->flow, 0, sizeof cn->flow);
1043 return cn;
1044 }
1045
destroy_basement_node(BASEMENTNODE bn)1046 void destroy_basement_node (BASEMENTNODE bn)
1047 {
1048 bn->data_buffer.destroy();
1049 toku_free(bn);
1050 }
1051
destroy_nonleaf_childinfo(NONLEAF_CHILDINFO nl)1052 void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl)
1053 {
1054 nl->msg_buffer.destroy();
1055 nl->fresh_message_tree.destroy();
1056 nl->stale_message_tree.destroy();
1057 nl->broadcast_list.destroy();
1058 toku_free(nl);
1059 }
1060
read_block_from_fd_into_rbuf(int fd,BLOCKNUM blocknum,FT ft,struct rbuf * rb)1061 void read_block_from_fd_into_rbuf(
1062 int fd,
1063 BLOCKNUM blocknum,
1064 FT ft,
1065 struct rbuf *rb
1066 )
1067 {
1068 // get the file offset and block size for the block
1069 DISKOFF offset, size;
1070 ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
1071 DISKOFF size_aligned = roundup_to_multiple(512, size);
1072 uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block);
1073 rbuf_init(rb, raw_block, size);
1074 // read the block
1075 ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset);
1076 assert((DISKOFF)rlen >= size);
1077 assert((DISKOFF)rlen <= size_aligned);
1078 }
1079
// Upper bound, in bytes, on the first read used to fetch a node header.
// read_ftnode_header_from_fd_into_rbuf_if_small_enough reads this many
// bytes or the block size, whichever is smaller, rounded up to 512.
static const int read_header_heuristic_max = 32 * 1024;

// Fallback MIN when no system header already defines one.
// Note: it evaluates one of its arguments twice.
#ifndef MIN
#define MIN(a, b) (((a) > (b)) ? (b) : (a))
#endif
1085
1086 // Effect: If the header part of the node is small enough, then read it into the rbuf. The rbuf will be allocated to be big enough in any case.
read_ftnode_header_from_fd_into_rbuf_if_small_enough(int fd,BLOCKNUM blocknum,FT ft,struct rbuf * rb,ftnode_fetch_extra * bfe)1087 static void read_ftnode_header_from_fd_into_rbuf_if_small_enough(int fd, BLOCKNUM blocknum,
1088 FT ft, struct rbuf *rb,
1089 ftnode_fetch_extra *bfe) {
1090 DISKOFF offset, size;
1091 ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
1092 DISKOFF read_size = roundup_to_multiple(512, MIN(read_header_heuristic_max, size));
1093 uint8_t *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, size), raw_block);
1094 rbuf_init(rb, raw_block, read_size);
1095
1096 // read the block
1097 tokutime_t t0 = toku_time_now();
1098 ssize_t rlen = toku_os_pread(fd, raw_block, read_size, offset);
1099 tokutime_t t1 = toku_time_now();
1100
1101 assert(rlen >= 0);
1102 rbuf_init(rb, raw_block, rlen);
1103
1104 bfe->bytes_read = rlen;
1105 bfe->io_time = t1 - t0;
1106 toku_ft_status_update_pivot_fetch_reason(bfe);
1107 }
1108
1109 //
1110 // read the compressed partition into the sub_block,
1111 // validate the checksum of the compressed data
1112 //
1113 int
read_compressed_sub_block(struct rbuf * rb,struct sub_block * sb)1114 read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb)
1115 {
1116 int r = 0;
1117 sb->compressed_size = rbuf_int(rb);
1118 sb->uncompressed_size = rbuf_int(rb);
1119 const void **cp = (const void **) &sb->compressed_ptr;
1120 rbuf_literal_bytes(rb, cp, sb->compressed_size);
1121 sb->xsum = rbuf_int(rb);
1122 // let's check the checksum
1123 uint32_t actual_xsum = toku_x1764_memory((char *)sb->compressed_ptr-8, 8+sb->compressed_size);
1124 if (sb->xsum != actual_xsum) {
1125 r = TOKUDB_BAD_CHECKSUM;
1126 }
1127 return r;
1128 }
1129
1130 static int
read_and_decompress_sub_block(struct rbuf * rb,struct sub_block * sb)1131 read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb)
1132 {
1133 int r = 0;
1134 r = read_compressed_sub_block(rb, sb);
1135 if (r != 0) {
1136 goto exit;
1137 }
1138
1139 just_decompress_sub_block(sb);
1140 exit:
1141 return r;
1142 }
1143
1144 // Allocates space for the sub-block and de-compresses the data from
1145 // the supplied compressed pointer..
1146 void
just_decompress_sub_block(struct sub_block * sb)1147 just_decompress_sub_block(struct sub_block *sb)
1148 {
1149 // <CER> TODO: Add assert that the subblock was read in.
1150 sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
1151
1152 toku_decompress(
1153 (Bytef *) sb->uncompressed_ptr,
1154 sb->uncompressed_size,
1155 (Bytef *) sb->compressed_ptr,
1156 sb->compressed_size
1157 );
1158 }
1159
1160 // verify the checksum
verify_ftnode_sub_block(struct sub_block * sb,const char * fname,BLOCKNUM blocknum)1161 int verify_ftnode_sub_block(struct sub_block *sb,
1162 const char *fname,
1163 BLOCKNUM blocknum) {
1164 int r = 0;
1165 // first verify the checksum
1166 uint32_t data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
1167 uint32_t stored_xsum = toku_dtoh32(*((uint32_t *)((char *)sb->uncompressed_ptr + data_size)));
1168 uint32_t actual_xsum = toku_x1764_memory(sb->uncompressed_ptr, data_size);
1169 if (stored_xsum != actual_xsum) {
1170 fprintf(
1171 stderr,
1172 "%s:%d:verify_ftnode_sub_block - "
1173 "file[%s], blocknum[%ld], stored_xsum[%u] != actual_xsum[%u]\n",
1174 __FILE__,
1175 __LINE__,
1176 fname ? fname : "unknown",
1177 blocknum.b,
1178 stored_xsum,
1179 actual_xsum);
1180 dump_bad_block((Bytef *) sb->uncompressed_ptr, sb->uncompressed_size);
1181 r = TOKUDB_BAD_CHECKSUM;
1182 }
1183 return r;
1184 }
1185
1186 // This function deserializes the data stored by serialize_ftnode_info
deserialize_ftnode_info(struct sub_block * sb,FTNODE node)1187 static int deserialize_ftnode_info(struct sub_block *sb, FTNODE node) {
1188
1189 // sb_node_info->uncompressed_ptr stores the serialized node information
1190 // this function puts that information into node
1191
1192 // first verify the checksum
1193 int r = 0;
1194 const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
1195 r = verify_ftnode_sub_block(sb, fname, node->blocknum);
1196 if (r != 0) {
1197 fprintf(
1198 stderr,
1199 "%s:%d:deserialize_ftnode_info - "
1200 "file[%s], blocknum[%ld], verify_ftnode_sub_block failed with %d\n",
1201 __FILE__,
1202 __LINE__,
1203 fname ? fname : "unknown",
1204 node->blocknum.b,
1205 r);
1206 dump_bad_block(static_cast<unsigned char *>(sb->uncompressed_ptr),
1207 sb->uncompressed_size);
1208 goto exit;
1209 }
1210
1211 uint32_t data_size;
1212 data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
1213
1214 // now with the data verified, we can read the information into the node
1215 struct rbuf rb;
1216 rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size);
1217
1218 node->max_msn_applied_to_node_on_disk = rbuf_MSN(&rb);
1219 (void)rbuf_int(&rb);
1220 node->flags = rbuf_int(&rb);
1221 node->height = rbuf_int(&rb);
1222 if (node->layout_version_read_from_disk < FT_LAYOUT_VERSION_19) {
1223 (void) rbuf_int(&rb); // optimized_for_upgrade
1224 }
1225 if (node->layout_version_read_from_disk >= FT_LAYOUT_VERSION_22) {
1226 rbuf_TXNID(&rb, &node->oldest_referenced_xid_known);
1227 }
1228
1229 // now create the basement nodes or childinfos, depending on whether this is a
1230 // leaf node or internal node
1231 // now the subtree_estimates
1232
1233 // n_children is now in the header, nd the allocatio of the node->bp is in deserialize_ftnode_from_rbuf.
1234
1235 // now the pivots
1236 if (node->n_children > 1) {
1237 node->pivotkeys.deserialize_from_rbuf(&rb, node->n_children - 1);
1238 } else {
1239 node->pivotkeys.create_empty();
1240 }
1241
1242 // if this is an internal node, unpack the block nums, and fill in necessary fields
1243 // of childinfo
1244 if (node->height > 0) {
1245 for (int i = 0; i < node->n_children; i++) {
1246 BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb);
1247 BP_WORKDONE(node, i) = 0;
1248 }
1249 }
1250
1251 // make sure that all the data was read
1252 if (data_size != rb.ndone) {
1253 fprintf(
1254 stderr,
1255 "%s:%d:deserialize_ftnode_info - "
1256 "file[%s], blocknum[%ld], data_size[%d] != rb.ndone[%d]\n",
1257 __FILE__,
1258 __LINE__,
1259 fname ? fname : "unknown",
1260 node->blocknum.b,
1261 data_size,
1262 rb.ndone);
1263 dump_bad_block(rb.buf, rb.size);
1264 abort();
1265 }
1266 exit:
1267 return r;
1268 }
1269
1270 static void
setup_available_ftnode_partition(FTNODE node,int i)1271 setup_available_ftnode_partition(FTNODE node, int i) {
1272 if (node->height == 0) {
1273 set_BLB(node, i, toku_create_empty_bn());
1274 BLB_MAX_MSN_APPLIED(node,i) = node->max_msn_applied_to_node_on_disk;
1275 }
1276 else {
1277 set_BNC(node, i, toku_create_empty_nl());
1278 }
1279 }
1280
1281 // Assign the child_to_read member of the bfe from the given ftnode
1282 // that has been brought into memory.
1283 static void
update_bfe_using_ftnode(FTNODE node,ftnode_fetch_extra * bfe)1284 update_bfe_using_ftnode(FTNODE node, ftnode_fetch_extra *bfe)
1285 {
1286 if (bfe->type == ftnode_fetch_subset && bfe->search != NULL) {
1287 // we do not take into account prefetching yet
1288 // as of now, if we need a subset, the only thing
1289 // we can possibly require is a single basement node
1290 // we find out what basement node the query cares about
1291 // and check if it is available
1292 bfe->child_to_read = toku_ft_search_which_child(
1293 bfe->ft->cmp,
1294 node,
1295 bfe->search
1296 );
1297 } else if (bfe->type == ftnode_fetch_keymatch) {
1298 // we do not take into account prefetching yet
1299 // as of now, if we need a subset, the only thing
1300 // we can possibly require is a single basement node
1301 // we find out what basement node the query cares about
1302 // and check if it is available
1303 if (node->height == 0) {
1304 int left_child = bfe->leftmost_child_wanted(node);
1305 int right_child = bfe->rightmost_child_wanted(node);
1306 if (left_child == right_child) {
1307 bfe->child_to_read = left_child;
1308 }
1309 }
1310 }
1311 }
1312
1313 // Using the search parameters in the bfe, this function will
1314 // initialize all of the given ftnode's partitions.
1315 static void
setup_partitions_using_bfe(FTNODE node,ftnode_fetch_extra * bfe,bool data_in_memory)1316 setup_partitions_using_bfe(FTNODE node,
1317 ftnode_fetch_extra *bfe,
1318 bool data_in_memory)
1319 {
1320 // Leftmost and Rightmost Child bounds.
1321 int lc, rc;
1322 if (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch) {
1323 lc = bfe->leftmost_child_wanted(node);
1324 rc = bfe->rightmost_child_wanted(node);
1325 } else {
1326 lc = -1;
1327 rc = -1;
1328 }
1329
1330 //
1331 // setup memory needed for the node
1332 //
1333 //printf("node height %d, blocknum %" PRId64 ", type %d lc %d rc %d\n", node->height, node->blocknum.b, bfe->type, lc, rc);
1334 for (int i = 0; i < node->n_children; i++) {
1335 BP_INIT_UNTOUCHED_CLOCK(node,i);
1336 if (data_in_memory) {
1337 BP_STATE(node, i) = ((bfe->wants_child_available(i) || (lc <= i && i <= rc))
1338 ? PT_AVAIL : PT_COMPRESSED);
1339 } else {
1340 BP_STATE(node, i) = PT_ON_DISK;
1341 }
1342 BP_WORKDONE(node,i) = 0;
1343
1344 switch (BP_STATE(node,i)) {
1345 case PT_AVAIL:
1346 setup_available_ftnode_partition(node, i);
1347 BP_TOUCH_CLOCK(node,i);
1348 break;
1349 case PT_COMPRESSED:
1350 set_BSB(node, i, sub_block_creat());
1351 break;
1352 case PT_ON_DISK:
1353 set_BNULL(node, i);
1354 break;
1355 case PT_INVALID:
1356 abort();
1357 }
1358 }
1359 }
1360
setup_ftnode_partitions(FTNODE node,ftnode_fetch_extra * bfe,bool data_in_memory)1361 static void setup_ftnode_partitions(FTNODE node, ftnode_fetch_extra *bfe, bool data_in_memory)
1362 // Effect: Used when reading a ftnode into main memory, this sets up the partitions.
1363 // We set bfe->child_to_read as well as the BP_STATE and the data pointers (e.g., with set_BSB or set_BNULL or other set_ operations).
1364 // Arguments: Node: the node to set up.
1365 // bfe: Describes the key range needed.
1366 // data_in_memory: true if we have all the data (in which case we set the BP_STATE to be either PT_AVAIL or PT_COMPRESSED depending on the bfe.
1367 // false if we don't have the partitions in main memory (in which case we set the state to PT_ON_DISK.
1368 {
1369 // Set bfe->child_to_read.
1370 update_bfe_using_ftnode(node, bfe);
1371
1372 // Setup the partitions.
1373 setup_partitions_using_bfe(node, bfe, data_in_memory);
1374 }
1375
/* Deserializes partition `childnum` of `node` from the sub-block's
 * uncompressed buffer (sb->uncompressed_ptr / sb->uncompressed_size).
 *
 * The buffer's last 4 bytes are a checksum, which is verified first. The
 * first data byte is a partition tag:
 *   - FTNODE_PARTITION_MSG_BUFFER for internal nodes: the message buffer is
 *     read into BNC(node, childnum).
 *   - FTNODE_PARTITION_DMT_LEAVES for leaves: an entry count followed by
 *     the leaf entries, read into BLB(node, childnum).
 * BNC/BLB(node, childnum) must already exist (presumably created by
 * setup_available_ftnode_partition). A wrong tag, or bytes left over after
 * parsing, is reported and dumped, then fails an assert.
 *
 * `cmp` is only used for layout versions <= 26, whose message trees must be
 * rebuilt by sorting.
 *
 * Returns 0 on success or the error from verify_ftnode_sub_block().
 * The uncompressed buffer is NOT freed here; the caller owns it
 * (decompress_and_deserialize_worker frees it after this returns).
 */
static int deserialize_ftnode_partition(
    struct sub_block *sb,
    FTNODE node,
    int childnum,  // which partition to deserialize
    const toku::comparator &cmp) {

    int r = 0;
    const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
    // verify_ftnode_sub_block dumps the block itself when it fails.
    r = verify_ftnode_sub_block(sb, fname, node->blocknum);
    if (r != 0) {
        fprintf(stderr,
                "%s:%d:deserialize_ftnode_partition - "
                "file[%s], blocknum[%ld], "
                "verify_ftnode_sub_block failed with %d\n",
                __FILE__,
                __LINE__,
                fname ? fname : "unknown",
                node->blocknum.b,
                r);
        goto exit;
    }
    // Declarations are kept separate from initialization so the goto above
    // does not jump past an initializer.
    uint32_t data_size;
    data_size = sb->uncompressed_size - 4;  // checksum is 4 bytes at end

    // now with the data verified, we can read the information into the node
    struct rbuf rb;
    rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size);
    unsigned char ch;
    ch = rbuf_char(&rb);  // partition type tag

    if (node->height > 0) {
        if (ch != FTNODE_PARTITION_MSG_BUFFER) {
            fprintf(stderr,
                    "%s:%d:deserialize_ftnode_partition - "
                    "file[%s], blocknum[%ld], ch[%d] != "
                    "FTNODE_PARTITION_MSG_BUFFER[%d]\n",
                    __FILE__,
                    __LINE__,
                    fname ? fname : "unknown",
                    node->blocknum.b,
                    ch,
                    FTNODE_PARTITION_MSG_BUFFER);
            dump_bad_block(rb.buf, rb.size);
            // Always fails here: the tag is known to be wrong.
            assert(ch == FTNODE_PARTITION_MSG_BUFFER);
        }
        NONLEAF_CHILDINFO bnc = BNC(node, childnum);
        if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_26) {
            // Layout version <= 26 did not serialize sorted message trees to disk.
            deserialize_child_buffer_v26(bnc, &rb, cmp);
        } else {
            deserialize_child_buffer(bnc, &rb);
        }
        BP_WORKDONE(node, childnum) = 0;
    } else {
        if (ch != FTNODE_PARTITION_DMT_LEAVES) {
            fprintf(stderr,
                    "%s:%d:deserialize_ftnode_partition - "
                    "file[%s], blocknum[%ld], ch[%d] != "
                    "FTNODE_PARTITION_DMT_LEAVES[%d]\n",
                    __FILE__,
                    __LINE__,
                    fname ? fname : "unknown",
                    node->blocknum.b,
                    ch,
                    FTNODE_PARTITION_DMT_LEAVES);
            dump_bad_block(rb.buf, rb.size);
            // Always fails here: the tag is known to be wrong.
            assert(ch == FTNODE_PARTITION_DMT_LEAVES);
        }

        BLB_SEQINSERT(node, childnum) = 0;
        uint32_t num_entries = rbuf_int(&rb);
        // we are now at the first byte of first leafentry
        data_size -= rb.ndone;  // remaining bytes of leafentry data

        BASEMENTNODE bn = BLB(node, childnum);
        bn->data_buffer.deserialize_from_rbuf(
            num_entries, &rb, data_size, node->layout_version_read_from_disk);
    }
    // Every byte before the checksum must have been consumed.
    if (rb.ndone != rb.size) {
        fprintf(stderr,
                "%s:%d:deserialize_ftnode_partition - "
                "file[%s], blocknum[%ld], rb.ndone[%d] != rb.size[%d]\n",
                __FILE__,
                __LINE__,
                fname ? fname : "unknown",
                node->blocknum.b,
                rb.ndone,
                rb.size);
        dump_bad_block(rb.buf, rb.size);
        assert(rb.ndone == rb.size);
    }

exit:
    return r;
}
1474
decompress_and_deserialize_worker(struct rbuf curr_rbuf,struct sub_block curr_sb,FTNODE node,int child,const toku::comparator & cmp,tokutime_t * decompress_time)1475 static int decompress_and_deserialize_worker(struct rbuf curr_rbuf,
1476 struct sub_block curr_sb,
1477 FTNODE node,
1478 int child,
1479 const toku::comparator &cmp,
1480 tokutime_t *decompress_time) {
1481 int r = 0;
1482 tokutime_t t0 = toku_time_now();
1483 r = read_and_decompress_sub_block(&curr_rbuf, &curr_sb);
1484 if (r != 0) {
1485 const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
1486 fprintf(stderr,
1487 "%s:%d:decompress_and_deserialize_worker - "
1488 "file[%s], blocknum[%ld], read_and_decompress_sub_block failed "
1489 "with %d\n",
1490 __FILE__,
1491 __LINE__,
1492 fname ? fname : "unknown",
1493 node->blocknum.b,
1494 r);
1495 dump_bad_block(curr_rbuf.buf, curr_rbuf.size);
1496 goto exit;
1497 }
1498 *decompress_time = toku_time_now() - t0;
1499 // at this point, sb->uncompressed_ptr stores the serialized node partition
1500 r = deserialize_ftnode_partition(&curr_sb, node, child, cmp);
1501 if (r != 0) {
1502 const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
1503 fprintf(stderr,
1504 "%s:%d:decompress_and_deserialize_worker - "
1505 "file[%s], blocknum[%ld], deserialize_ftnode_partition failed "
1506 "with %d\n",
1507 __FILE__,
1508 __LINE__,
1509 fname ? fname : "unknown",
1510 node->blocknum.b,
1511 r);
1512 dump_bad_block(curr_rbuf.buf, curr_rbuf.size);
1513 goto exit;
1514 }
1515
1516 exit:
1517 toku_free(curr_sb.uncompressed_ptr);
1518 return r;
1519 }
1520
check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf,struct sub_block curr_sb,FTNODE node,int child)1521 static int check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf,
1522 struct sub_block curr_sb,
1523 FTNODE node,
1524 int child) {
1525 int r = 0;
1526 r = read_compressed_sub_block(&curr_rbuf, &curr_sb);
1527 if (r != 0) {
1528 goto exit;
1529 }
1530
1531 SUB_BLOCK bp_sb;
1532 bp_sb = BSB(node, child);
1533 bp_sb->compressed_size = curr_sb.compressed_size;
1534 bp_sb->uncompressed_size = curr_sb.uncompressed_size;
1535 bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size);
1536 memcpy(
1537 bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size);
1538 exit:
1539 return r;
1540 }
1541
alloc_ftnode_for_deserialize(uint32_t fullhash,BLOCKNUM blocknum)1542 static FTNODE alloc_ftnode_for_deserialize(uint32_t fullhash, BLOCKNUM blocknum) {
1543 // Effect: Allocate an FTNODE and fill in the values that are not read from
1544 FTNODE XMALLOC(node);
1545 node->fullhash = fullhash;
1546 node->blocknum = blocknum;
1547 node->clear_dirty();
1548 node->oldest_referenced_xid_known = TXNID_NONE;
1549 node->bp = nullptr;
1550 node->ct_pair = nullptr;
1551 return node;
1552 }
1553
deserialize_ftnode_header_from_rbuf_if_small_enough(FTNODE * ftnode,FTNODE_DISK_DATA * ndd,BLOCKNUM blocknum,uint32_t fullhash,ftnode_fetch_extra * bfe,struct rbuf * rb,int fd)1554 static int deserialize_ftnode_header_from_rbuf_if_small_enough(
1555 FTNODE *ftnode,
1556 FTNODE_DISK_DATA *ndd,
1557 BLOCKNUM blocknum,
1558 uint32_t fullhash,
1559 ftnode_fetch_extra *bfe,
1560 struct rbuf *rb,
1561 int fd)
1562 // If we have enough information in the rbuf to construct a header, then do so.
1563 // Also fetch in the basement node if needed.
1564 // Return 0 if it worked. If something goes wrong (including that we are
1565 // looking at some old data format that doesn't have partitions) then return
1566 // nonzero.
1567 {
1568 int r = 0;
1569
1570 tokutime_t t0, t1;
1571 tokutime_t decompress_time = 0;
1572 tokutime_t deserialize_time = 0;
1573 // we must get the name from bfe and not through
1574 // toku_ftnode_get_cachefile_fname_in_env as the node is not set up yet
1575 const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
1576
1577 t0 = toku_time_now();
1578
1579 FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
1580
1581 if (rb->size < 24) {
1582 fprintf(
1583 stderr,
1584 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1585 "file[%s], blocknum[%ld], rb->size[%u] < 24\n",
1586 __FILE__,
1587 __LINE__,
1588 fname ? fname : "unknown",
1589 blocknum.b,
1590 rb->size);
1591 dump_bad_block(rb->buf, rb->size);
1592 // TODO: What error do we return here?
1593 // Does it even matter?
1594 r = toku_db_badformat();
1595 goto cleanup;
1596 }
1597
1598 const void *magic;
1599 rbuf_literal_bytes(rb, &magic, 8);
1600 if (memcmp(magic, "tokuleaf", 8) != 0 &&
1601 memcmp(magic, "tokunode", 8) != 0) {
1602 fprintf(
1603 stderr,
1604 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1605 "file[%s], blocknum[%ld], unrecognized magic number "
1606 "%2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x\n",
1607 __FILE__,
1608 __LINE__,
1609 fname ? fname : "unknown",
1610 blocknum.b,
1611 static_cast<const uint8_t*>(magic)[0],
1612 static_cast<const uint8_t*>(magic)[1],
1613 static_cast<const uint8_t*>(magic)[2],
1614 static_cast<const uint8_t*>(magic)[3],
1615 static_cast<const uint8_t*>(magic)[4],
1616 static_cast<const uint8_t*>(magic)[5],
1617 static_cast<const uint8_t*>(magic)[6],
1618 static_cast<const uint8_t*>(magic)[7]);
1619 dump_bad_block(rb->buf, rb->size);
1620 r = toku_db_badformat();
1621 goto cleanup;
1622 }
1623
1624 node->layout_version_read_from_disk = rbuf_int(rb);
1625 if (node->layout_version_read_from_disk <
1626 FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
1627 fprintf(
1628 stderr,
1629 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1630 "file[%s], blocknum[%ld], node->layout_version_read_from_disk[%d] "
1631 "< FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES[%d]\n",
1632 __FILE__,
1633 __LINE__,
1634 fname ? fname : "unknown",
1635 blocknum.b,
1636 node->layout_version_read_from_disk,
1637 FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES);
1638 dump_bad_block(rb->buf, rb->size);
1639 // This code path doesn't have to worry about upgrade.
1640 r = toku_db_badformat();
1641 goto cleanup;
1642 }
1643
1644 // If we get here, we know the node is at least
1645 // FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES. We haven't changed
1646 // the serialization format since then (this comment is correct as of
1647 // version 20, which is Deadshot) so we can go ahead and say the
1648 // layout version is current (it will be as soon as we finish
1649 // deserializing).
1650 // TODO(leif): remove node->layout_version (#5174)
1651 node->layout_version = FT_LAYOUT_VERSION;
1652
1653 node->layout_version_original = rbuf_int(rb);
1654 node->build_id = rbuf_int(rb);
1655 node->n_children = rbuf_int(rb);
1656 // Guaranteed to be have been able to read up to here. If n_children
1657 // is too big, we may have a problem, so check that we won't overflow
1658 // while reading the partition locations.
1659 unsigned int nhsize;
1660 // we can do this because n_children is filled in.
1661 nhsize = serialize_node_header_size(node);
1662 unsigned int needed_size;
1663 // we need 12 more so that we can read the compressed block size information
1664 // that follows for the nodeinfo.
1665 needed_size = nhsize + 12;
1666 if (needed_size > rb->size) {
1667 fprintf(
1668 stderr,
1669 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1670 "file[%s], blocknum[%ld], needed_size[%d] > rb->size[%d]\n",
1671 __FILE__,
1672 __LINE__,
1673 fname ? fname : "unknown",
1674 blocknum.b,
1675 needed_size,
1676 rb->size);
1677 dump_bad_block(rb->buf, rb->size);
1678 r = toku_db_badformat();
1679 goto cleanup;
1680 }
1681
1682 XMALLOC_N(node->n_children, node->bp);
1683 XMALLOC_N(node->n_children, *ndd);
1684 // read the partition locations
1685 for (int i=0; i<node->n_children; i++) {
1686 BP_START(*ndd,i) = rbuf_int(rb);
1687 BP_SIZE (*ndd,i) = rbuf_int(rb);
1688 }
1689
1690 uint32_t checksum;
1691 checksum = toku_x1764_memory(rb->buf, rb->ndone);
1692 uint32_t stored_checksum;
1693 stored_checksum = rbuf_int(rb);
1694 if (stored_checksum != checksum) {
1695 fprintf(
1696 stderr,
1697 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1698 "file[%s], blocknum[%ld], stored_checksum[%d] != checksum[%d]\n",
1699 __FILE__,
1700 __LINE__,
1701 fname ? fname : "unknown",
1702 blocknum.b,
1703 stored_checksum,
1704 checksum);
1705 dump_bad_block(rb->buf, rb->size);
1706 r = TOKUDB_BAD_CHECKSUM;
1707 goto cleanup;
1708 }
1709
1710 // Now we want to read the pivot information.
1711 struct sub_block sb_node_info;
1712 sub_block_init(&sb_node_info);
1713 // we'll be able to read these because we checked the size earlier.
1714 sb_node_info.compressed_size = rbuf_int(rb);
1715 sb_node_info.uncompressed_size = rbuf_int(rb);
1716 if (rb->size - rb->ndone < sb_node_info.compressed_size + 8) {
1717 fprintf(
1718 stderr,
1719 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1720 "file[%s], blocknum[%ld], rb->size[%d] - rb->ndone[%d] < "
1721 "sb_node_info.compressed_size[%d] + 8\n",
1722 __FILE__,
1723 __LINE__,
1724 fname ? fname : "unknown",
1725 blocknum.b,
1726 rb->size,
1727 rb->ndone,
1728 sb_node_info.compressed_size);
1729 dump_bad_block(rb->buf, rb->size);
1730 r = toku_db_badformat();
1731 goto cleanup;
1732 }
1733
1734 // Finish reading compressed the sub_block
1735 const void **cp;
1736 cp = (const void **) &sb_node_info.compressed_ptr;
1737 rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size);
1738 sb_node_info.xsum = rbuf_int(rb);
1739 // let's check the checksum
1740 uint32_t actual_xsum;
1741 actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr - 8,
1742 8 + sb_node_info.compressed_size);
1743 if (sb_node_info.xsum != actual_xsum) {
1744 fprintf(
1745 stderr,
1746 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1747 "file[%s], blocknum[%ld], sb_node_info.xsum[%d] != actual_xsum[%d]\n",
1748 __FILE__,
1749 __LINE__,
1750 fname ? fname : "unknown",
1751 blocknum.b,
1752 sb_node_info.xsum,
1753 actual_xsum);
1754 dump_bad_block(rb->buf, rb->size);
1755 r = TOKUDB_BAD_CHECKSUM;
1756 goto cleanup;
1757 }
1758
1759 // Now decompress the subblock
1760 {
1761 toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size);
1762 sb_node_info.uncompressed_ptr = sb_node_info_buf.get();
1763 tokutime_t decompress_t0 = toku_time_now();
1764 toku_decompress((Bytef *)sb_node_info.uncompressed_ptr,
1765 sb_node_info.uncompressed_size,
1766 (Bytef *)sb_node_info.compressed_ptr,
1767 sb_node_info.compressed_size);
1768 tokutime_t decompress_t1 = toku_time_now();
1769 decompress_time = decompress_t1 - decompress_t0;
1770
1771 // at this point sb->uncompressed_ptr stores the serialized node info.
1772 r = deserialize_ftnode_info(&sb_node_info, node);
1773 if (r != 0) {
1774 fprintf(
1775 stderr,
1776 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1777 "file[%s], blocknum[%ld], deserialize_ftnode_info failed with "
1778 "%d\n",
1779 __FILE__,
1780 __LINE__,
1781 fname ? fname : "unknown",
1782 blocknum.b,
1783 r);
1784 dump_bad_block(
1785 static_cast<unsigned char *>(sb_node_info.uncompressed_ptr),
1786 sb_node_info.uncompressed_size);
1787 dump_bad_block(rb->buf, rb->size);
1788 goto cleanup;
1789 }
1790 }
1791
1792 // Now we have the ftnode_info. We have a bunch more stuff in the
1793 // rbuf, so we might be able to store the compressed data for some
1794 // objects.
1795 // We can proceed to deserialize the individual subblocks.
1796
1797 // setup the memory of the partitions
1798 // for partitions being decompressed, create either message buffer or basement node
1799 // for partitions staying compressed, create sub_block
1800 setup_ftnode_partitions(node, bfe, false);
1801
1802 // We must capture deserialize and decompression time before
1803 // the pf_callback, otherwise we would double-count.
1804 t1 = toku_time_now();
1805 deserialize_time = (t1 - t0) - decompress_time;
1806
1807 // do partial fetch if necessary
1808 if (bfe->type != ftnode_fetch_none) {
1809 PAIR_ATTR attr;
1810 r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr);
1811 if (r != 0) {
1812 fprintf(
1813 stderr,
1814 "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
1815 "file[%s], blocknum[%ld], toku_ftnode_pf_callback failed with "
1816 "%d\n",
1817 __FILE__,
1818 __LINE__,
1819 fname ? fname : "unknown",
1820 blocknum.b,
1821 r);
1822 dump_bad_block(rb->buf, rb->size);
1823 goto cleanup;
1824 }
1825 }
1826
1827 // handle clock
1828 for (int i = 0; i < node->n_children; i++) {
1829 if (bfe->wants_child_available(i)) {
1830 paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
1831 BP_TOUCH_CLOCK(node,i);
1832 }
1833 }
1834 *ftnode = node;
1835 r = 0;
1836
1837 cleanup:
1838 if (r == 0) {
1839 bfe->deserialize_time += deserialize_time;
1840 bfe->decompress_time += decompress_time;
1841 toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
1842 }
1843 if (r != 0) {
1844 if (node) {
1845 toku_free(*ndd);
1846 toku_free(node->bp);
1847 toku_free(node);
1848 }
1849 }
1850 return r;
1851 }
1852
1853 // This function takes a deserialized version 13 or 14 buffer and
1854 // constructs the associated internal, non-leaf ftnode object. It
1855 // also creates MSN's for older messages created in older versions
1856 // that did not generate MSN's for messages. These new MSN's are
1857 // generated from the root downwards, counting backwards from MIN_MSN
1858 // and persisted in the ft header.
deserialize_and_upgrade_internal_node(FTNODE node,struct rbuf * rb,ftnode_fetch_extra * bfe,STAT64INFO info)1859 static int deserialize_and_upgrade_internal_node(FTNODE node,
1860 struct rbuf *rb,
1861 ftnode_fetch_extra *bfe,
1862 STAT64INFO info) {
1863 int version = node->layout_version_read_from_disk;
1864
1865 if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
1866 (void) rbuf_int(rb); // 10. fingerprint
1867 }
1868
1869 node->n_children = rbuf_int(rb); // 11. n_children
1870
1871 // Sub-tree esitmates...
1872 for (int i = 0; i < node->n_children; ++i) {
1873 if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
1874 (void) rbuf_int(rb); // 12. fingerprint
1875 }
1876 uint64_t nkeys = rbuf_ulonglong(rb); // 13. nkeys
1877 uint64_t ndata = rbuf_ulonglong(rb); // 14. ndata
1878 uint64_t dsize = rbuf_ulonglong(rb); // 15. dsize
1879 (void) rbuf_char(rb); // 16. exact (char)
1880 invariant(nkeys == ndata);
1881 if (info) {
1882 // info is non-null if we're trying to upgrade old subtree
1883 // estimates to stat64info
1884 info->numrows += nkeys;
1885 info->numbytes += dsize;
1886 }
1887 }
1888
1889 // Pivot keys
1890 node->pivotkeys.deserialize_from_rbuf(rb, node->n_children - 1);
1891
1892 // Create space for the child node buffers (a.k.a. partitions).
1893 XMALLOC_N(node->n_children, node->bp);
1894
1895 // Set the child blocknums.
1896 for (int i = 0; i < node->n_children; ++i) {
1897 BP_BLOCKNUM(node, i) = rbuf_blocknum(rb); // 18. blocknums
1898 BP_WORKDONE(node, i) = 0;
1899 }
1900
1901 // Read in the child buffer maps.
1902 for (int i = 0; i < node->n_children; ++i) {
1903 // The following fields were previously used by the `sub_block_map'
1904 // They include:
1905 // - 4 byte index
1906 (void) rbuf_int(rb);
1907 // - 4 byte offset
1908 (void) rbuf_int(rb);
1909 // - 4 byte size
1910 (void) rbuf_int(rb);
1911 }
1912
1913 // We need to setup this node's partitions, but we can't call the
1914 // existing call (setup_ftnode_paritions.) because there are
1915 // existing optimizations that would prevent us from bringing all
1916 // of this node's partitions into memory. Instead, We use the
1917 // existing bfe and node to set the bfe's child_to_search member.
1918 // Then we create a temporary bfe that needs all the nodes to make
1919 // sure we properly intitialize our partitions before filling them
1920 // in from our soon-to-be-upgraded node.
1921 update_bfe_using_ftnode(node, bfe);
1922 ftnode_fetch_extra temp_bfe;
1923 temp_bfe.create_for_full_read(nullptr);
1924 setup_partitions_using_bfe(node, &temp_bfe, true);
1925
1926 // Cache the highest MSN generated for the message buffers. This
1927 // will be set in the ftnode.
1928 //
1929 // The way we choose MSNs for upgraded messages is delicate. The
1930 // field `highest_unused_msn_for_upgrade' in the header is always an
1931 // MSN that no message has yet. So when we have N messages that need
1932 // MSNs, we decrement it by N, and then use it and the N-1 MSNs less
1933 // than it, but we do not use the value we decremented it to.
1934 //
1935 // In the code below, we initialize `lowest' with the value of
1936 // `highest_unused_msn_for_upgrade' after it is decremented, so we
1937 // need to be sure to increment it once before we enqueue our first
1938 // message.
1939 MSN highest_msn;
1940 highest_msn.msn = 0;
1941
1942 // Deserialize de-compressed buffers.
1943 for (int i = 0; i < node->n_children; ++i) {
1944 NONLEAF_CHILDINFO bnc = BNC(node, i);
1945 MSN highest_msn_in_this_buffer = deserialize_child_buffer_v13(bfe->ft, bnc, rb);
1946 if (highest_msn.msn == 0) {
1947 highest_msn.msn = highest_msn_in_this_buffer.msn;
1948 }
1949 }
1950
1951 // Assign the highest msn from our upgrade message buffers
1952 node->max_msn_applied_to_node_on_disk = highest_msn;
1953 // Since we assigned MSNs to this node's messages, we need to dirty it.
1954 node->set_dirty();
1955
1956 // Must compute the checksum now (rather than at the end, while we
1957 // still have the pointer to the buffer).
1958 if (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM) {
1959 uint32_t expected_xsum = toku_dtoh32(*(uint32_t*)(rb->buf+rb->size-4)); // 27. checksum
1960 uint32_t actual_xsum = toku_x1764_memory(rb->buf, rb->size-4);
1961 if (expected_xsum != actual_xsum) {
1962 fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n",
1963 __FUNCTION__,
1964 __LINE__,
1965 expected_xsum,
1966 actual_xsum);
1967 fprintf(stderr,
1968 "Checksum failure while reading node in file %s.\n",
1969 toku_cachefile_fname_in_env(bfe->ft->cf));
1970 fflush(stderr);
1971 return toku_db_badformat();
1972 }
1973 }
1974
1975 return 0;
1976 }
1977
1978 // This function takes a deserialized version 13 or 14 buffer and
1979 // constructs the associated leaf ftnode object.
1980 static int
deserialize_and_upgrade_leaf_node(FTNODE node,struct rbuf * rb,ftnode_fetch_extra * bfe,STAT64INFO info)1981 deserialize_and_upgrade_leaf_node(FTNODE node,
1982 struct rbuf *rb,
1983 ftnode_fetch_extra *bfe,
1984 STAT64INFO info)
1985 {
1986 int r = 0;
1987 int version = node->layout_version_read_from_disk;
1988
1989 // This is a leaf node, so the offsets in the buffer will be
1990 // different from the internal node offsets above.
1991 uint64_t nkeys = rbuf_ulonglong(rb); // 10. nkeys
1992 uint64_t ndata = rbuf_ulonglong(rb); // 11. ndata
1993 uint64_t dsize = rbuf_ulonglong(rb); // 12. dsize
1994 invariant(nkeys == ndata);
1995 if (info) {
1996 // info is non-null if we're trying to upgrade old subtree
1997 // estimates to stat64info
1998 info->numrows += nkeys;
1999 info->numbytes += dsize;
2000 }
2001
2002 // This is the optimized for upgrade field.
2003 if (version == FT_LAYOUT_VERSION_14) {
2004 (void) rbuf_int(rb); // 13. optimized
2005 }
2006
2007 // npartitions - This is really the number of leaf entries in
2008 // our single basement node. There should only be 1 (ONE)
2009 // partition, so there shouldn't be any pivot key stored. This
2010 // means the loop will not iterate. We could remove the loop and
2011 // assert that the value is indeed 1.
2012 int npartitions = rbuf_int(rb); // 14. npartitions
2013 assert(npartitions == 1);
2014
2015 // Set number of children to 1, since we will only have one
2016 // basement node.
2017 node->n_children = 1;
2018 XMALLOC_N(node->n_children, node->bp);
2019 node->pivotkeys.create_empty();
2020
2021 // Create one basement node to contain all the leaf entries by
2022 // setting up the single partition and updating the bfe.
2023 update_bfe_using_ftnode(node, bfe);
2024 ftnode_fetch_extra temp_bfe;
2025 temp_bfe.create_for_full_read(bfe->ft);
2026 setup_partitions_using_bfe(node, &temp_bfe, true);
2027
2028 // 11. Deserialize the partition maps, though they are not used in the
2029 // newer versions of ftnodes.
2030 for (int i = 0; i < node->n_children; ++i) {
2031 // The following fields were previously used by the `sub_block_map'
2032 // They include:
2033 // - 4 byte index
2034 (void) rbuf_int(rb);
2035 // - 4 byte offset
2036 (void) rbuf_int(rb);
2037 // - 4 byte size
2038 (void) rbuf_int(rb);
2039 }
2040
2041 // Copy all of the leaf entries into the single basement node.
2042
2043 // The number of leaf entries in buffer.
2044 int n_in_buf = rbuf_int(rb); // 15. # of leaves
2045 BLB_SEQINSERT(node,0) = 0;
2046 BASEMENTNODE bn = BLB(node, 0);
2047
2048 // Read the leaf entries from the buffer, advancing the buffer
2049 // as we go.
2050 bool has_end_to_end_checksum = (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM);
2051 if (version <= FT_LAYOUT_VERSION_13) {
2052 // Create our mempool.
2053 // Loop through
2054 for (int i = 0; i < n_in_buf; ++i) {
2055 LEAFENTRY_13 le = reinterpret_cast<LEAFENTRY_13>(&rb->buf[rb->ndone]);
2056 uint32_t disksize = leafentry_disksize_13(le);
2057 rb->ndone += disksize; // 16. leaf entry (13)
2058 invariant(rb->ndone<=rb->size);
2059 LEAFENTRY new_le;
2060 size_t new_le_size;
2061 void* key = NULL;
2062 uint32_t keylen = 0;
2063 r = toku_le_upgrade_13_14(le,
2064 &key,
2065 &keylen,
2066 &new_le_size,
2067 &new_le);
2068 assert_zero(r);
2069 // Copy the pointer value straight into the OMT
2070 LEAFENTRY new_le_in_bn = nullptr;
2071 void *maybe_free;
2072 bn->data_buffer.get_space_for_insert(
2073 i,
2074 key,
2075 keylen,
2076 new_le_size,
2077 &new_le_in_bn,
2078 &maybe_free
2079 );
2080 if (maybe_free) {
2081 toku_free(maybe_free);
2082 }
2083 memcpy(new_le_in_bn, new_le, new_le_size);
2084 toku_free(new_le);
2085 }
2086 } else {
2087 uint32_t data_size = rb->size - rb->ndone;
2088 if (has_end_to_end_checksum) {
2089 data_size -= sizeof(uint32_t);
2090 }
2091 bn->data_buffer.deserialize_from_rbuf(n_in_buf, rb, data_size, node->layout_version_read_from_disk);
2092 }
2093
2094 // Whatever this is must be less than the MSNs of every message above
2095 // it, so it's ok to take it here.
2096 bn->max_msn_applied = bfe->ft->h->highest_unused_msn_for_upgrade;
2097 bn->stale_ancestor_messages_applied = false;
2098 node->max_msn_applied_to_node_on_disk = bn->max_msn_applied;
2099
2100 // Checksum (end to end) is only on version 14
2101 if (has_end_to_end_checksum) {
2102 uint32_t expected_xsum = rbuf_int(rb); // 17. checksum
2103 uint32_t actual_xsum = toku_x1764_memory(rb->buf, rb->size - 4);
2104 if (expected_xsum != actual_xsum) {
2105 fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n",
2106 __FUNCTION__,
2107 __LINE__,
2108 expected_xsum,
2109 actual_xsum);
2110 fprintf(stderr,
2111 "Checksum failure while reading node in file %s.\n",
2112 toku_cachefile_fname_in_env(bfe->ft->cf));
2113 fflush(stderr);
2114 return toku_db_badformat();
2115 }
2116 }
2117
2118 // We should have read the whole block by this point.
2119 if (rb->ndone != rb->size) {
2120 // TODO: Error handling.
2121 return 1;
2122 }
2123
2124 return r;
2125 }
2126
// Forward declaration of a file-static helper defined elsewhere in this file.
// deserialize_and_upgrade_ftnode uses it to re-read the on-disk block for
// `blocknum` (located at `offset`, `size` bytes, per the block table) and to
// decompress it as a whole into `rb`. The `/* out */` parameter receives
// what is presumably the block's layout version (TODO confirm against the
// definition). Callers treat a non-zero return as failure.
static int read_and_decompress_block_from_fd_into_rbuf(
    int fd,
    BLOCKNUM blocknum,
    DISKOFF offset,
    DISKOFF size,
    FT ft,
    struct rbuf *rb,
    /* out */ int *layout_version_p);
2135
2136 // This function upgrades a version 14 or 13 ftnode to the current
2137 // version. NOTE: This code assumes the first field of the rbuf has
2138 // already been read from the buffer (namely the layout_version of the
2139 // ftnode.)
deserialize_and_upgrade_ftnode(FTNODE node,FTNODE_DISK_DATA * ndd,BLOCKNUM blocknum,ftnode_fetch_extra * bfe,STAT64INFO info,int fd)2140 static int deserialize_and_upgrade_ftnode(FTNODE node,
2141 FTNODE_DISK_DATA *ndd,
2142 BLOCKNUM blocknum,
2143 ftnode_fetch_extra *bfe,
2144 STAT64INFO info,
2145 int fd) {
2146 int r = 0;
2147 int version;
2148
2149 // I. First we need to de-compress the entire node, only then can
2150 // we read the different sub-sections.
2151 // get the file offset and block size for the block
2152 DISKOFF offset, size;
2153 bfe->ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
2154
2155 struct rbuf rb;
2156 r = read_and_decompress_block_from_fd_into_rbuf(fd,
2157 blocknum,
2158 offset,
2159 size,
2160 bfe->ft,
2161 &rb,
2162 &version);
2163 if (r != 0) {
2164 const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
2165 fprintf(stderr,
2166 "%s:%d:deserialize_and_upgrade_ftnode - "
2167 "file[%s], blocknum[%ld], "
2168 "read_and_decompress_block_from_fd_into_rbuf failed with %d\n",
2169 __FILE__,
2170 __LINE__,
2171 fname ? fname : "unknown",
2172 blocknum.b,
2173 r);
2174 goto exit;
2175 }
2176
2177 // Re-read the magic field from the previous call, since we are
2178 // restarting with a fresh rbuf.
2179 {
2180 const void *magic;
2181 rbuf_literal_bytes(&rb, &magic, 8); // 1. magic
2182 }
2183
2184 // II. Start reading ftnode fields out of the decompressed buffer.
2185
2186 // Copy over old version info.
2187 node->layout_version_read_from_disk = rbuf_int(&rb); // 2. layout version
2188 version = node->layout_version_read_from_disk;
2189 if (version > FT_LAYOUT_VERSION_14) {
2190 const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
2191 fprintf(stderr,
2192 "%s:%d:deserialize_and_upgrade_ftnode - "
2193 "file[%s], blocknum[%ld], version[%d] > "
2194 "FT_LAYOUT_VERSION_14[%d]\n",
2195 __FILE__,
2196 __LINE__,
2197 fname ? fname : "unknown",
2198 blocknum.b,
2199 version,
2200 FT_LAYOUT_VERSION_14);
2201 dump_bad_block(rb.buf, rb.size);
2202 goto exit;
2203 }
2204 assert(version <= FT_LAYOUT_VERSION_14);
2205 // Upgrade the current version number to the current version.
2206 node->layout_version = FT_LAYOUT_VERSION;
2207
2208 node->layout_version_original = rbuf_int(&rb); // 3. original layout
2209 node->build_id = rbuf_int(&rb); // 4. build id
2210
2211 // The remaining offsets into the rbuf do not map to the current
2212 // version, so we need to fill in the blanks and ignore older
2213 // fields.
2214 (void)rbuf_int(&rb); // 5. nodesize
2215 node->flags = rbuf_int(&rb); // 6. flags
2216 node->height = rbuf_int(&rb); // 7. height
2217
2218 // If the version is less than 14, there are two extra ints here.
2219 // we would need to ignore them if they are there.
2220 // These are the 'fingerprints'.
2221 if (version == FT_LAYOUT_VERSION_13) {
2222 (void) rbuf_int(&rb); // 8. rand4
2223 (void) rbuf_int(&rb); // 9. local
2224 }
2225
2226 // The next offsets are dependent on whether this is a leaf node
2227 // or not.
2228
2229 // III. Read in Leaf and Internal Node specific data.
2230
2231 // Check height to determine whether this is a leaf node or not.
2232 if (node->height > 0) {
2233 r = deserialize_and_upgrade_internal_node(node, &rb, bfe, info);
2234 } else {
2235 r = deserialize_and_upgrade_leaf_node(node, &rb, bfe, info);
2236 }
2237
2238 XMALLOC_N(node->n_children, *ndd);
2239 // Initialize the partition locations to zero, because version 14
2240 // and below have no notion of partitions on disk.
2241 for (int i=0; i<node->n_children; i++) {
2242 BP_START(*ndd,i) = 0;
2243 BP_SIZE (*ndd,i) = 0;
2244 }
2245
2246 toku_free(rb.buf);
2247 exit:
2248 return r;
2249 }
2250
2251 // Effect: deserializes a ftnode that is in rb (with pointer of rb just past the
2252 // magic) into a FTNODE.
deserialize_ftnode_from_rbuf(FTNODE * ftnode,FTNODE_DISK_DATA * ndd,BLOCKNUM blocknum,uint32_t fullhash,ftnode_fetch_extra * bfe,STAT64INFO info,struct rbuf * rb,int fd)2253 static int deserialize_ftnode_from_rbuf(FTNODE *ftnode,
2254 FTNODE_DISK_DATA *ndd,
2255 BLOCKNUM blocknum,
2256 uint32_t fullhash,
2257 ftnode_fetch_extra *bfe,
2258 STAT64INFO info,
2259 struct rbuf *rb,
2260 int fd) {
2261 int r = 0;
2262 struct sub_block sb_node_info;
2263
2264 tokutime_t t0, t1;
2265 tokutime_t decompress_time = 0;
2266 tokutime_t deserialize_time = 0;
2267 const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
2268
2269 t0 = toku_time_now();
2270
2271 FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
2272
2273 // now start reading from rbuf
2274 // first thing we do is read the header information
2275 const void *magic;
2276 rbuf_literal_bytes(rb, &magic, 8);
2277 if (memcmp(magic, "tokuleaf", 8) != 0 &&
2278 memcmp(magic, "tokunode", 8) != 0) {
2279 fprintf(stderr,
2280 "%s:%d:deserialize_ftnode_from_rbuf - "
2281 "file[%s], blocknum[%ld], unrecognized magic number "
2282 "%2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x\n",
2283 __FILE__,
2284 __LINE__,
2285 fname ? fname : "unknown",
2286 blocknum.b,
2287 static_cast<const uint8_t *>(magic)[0],
2288 static_cast<const uint8_t *>(magic)[1],
2289 static_cast<const uint8_t *>(magic)[2],
2290 static_cast<const uint8_t *>(magic)[3],
2291 static_cast<const uint8_t *>(magic)[4],
2292 static_cast<const uint8_t *>(magic)[5],
2293 static_cast<const uint8_t *>(magic)[6],
2294 static_cast<const uint8_t *>(magic)[7]);
2295 dump_bad_block(rb->buf, rb->size);
2296
2297 r = toku_db_badformat();
2298 goto cleanup;
2299 }
2300
2301 node->layout_version_read_from_disk = rbuf_int(rb);
2302 lazy_assert(node->layout_version_read_from_disk >= FT_LAYOUT_MIN_SUPPORTED_VERSION);
2303
2304 // Check if we are reading in an older node version.
2305 if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_14) {
2306 int version = node->layout_version_read_from_disk;
2307 // Perform the upgrade.
2308 r = deserialize_and_upgrade_ftnode(node, ndd, blocknum, bfe, info, fd);
2309 if (r != 0) {
2310 fprintf(stderr,
2311 "%s:%d:deserialize_ftnode_from_rbuf - "
2312 "file[%s], blocknum[%ld], deserialize_and_upgrade_ftnode "
2313 "failed with %d\n",
2314 __FILE__,
2315 __LINE__,
2316 fname ? fname : "unknown",
2317 blocknum.b,
2318 r);
2319 dump_bad_block(rb->buf, rb->size);
2320 goto cleanup;
2321 }
2322
2323 if (version <= FT_LAYOUT_VERSION_13) {
2324 // deprecate 'TOKU_DB_VALCMP_BUILTIN'. just remove the flag
2325 node->flags &= ~TOKU_DB_VALCMP_BUILTIN_13;
2326 }
2327
2328 // If everything is ok, just re-assign the ftnode and retrn.
2329 *ftnode = node;
2330 r = 0;
2331 goto cleanup;
2332 }
2333
2334 // Upgrade versions after 14 to current. This upgrade is trivial, it
2335 // removes the optimized for upgrade field, which has already been
2336 // removed in the deserialization code (see
2337 // deserialize_ftnode_info()).
2338 node->layout_version = FT_LAYOUT_VERSION;
2339 node->layout_version_original = rbuf_int(rb);
2340 node->build_id = rbuf_int(rb);
2341 node->n_children = rbuf_int(rb);
2342 XMALLOC_N(node->n_children, node->bp);
2343 XMALLOC_N(node->n_children, *ndd);
2344 // read the partition locations
2345 for (int i=0; i<node->n_children; i++) {
2346 BP_START(*ndd,i) = rbuf_int(rb);
2347 BP_SIZE (*ndd,i) = rbuf_int(rb);
2348 }
2349 // verify checksum of header stored
2350 uint32_t checksum;
2351 checksum = toku_x1764_memory(rb->buf, rb->ndone);
2352 uint32_t stored_checksum;
2353 stored_checksum = rbuf_int(rb);
2354 if (stored_checksum != checksum) {
2355 fprintf(
2356 stderr,
2357 "%s:%d:deserialize_ftnode_from_rbuf - "
2358 "file[%s], blocknum[%ld], stored_checksum[%d] != checksum[%d]\n",
2359 __FILE__,
2360 __LINE__,
2361 fname ? fname : "unknown",
2362 blocknum.b,
2363 stored_checksum,
2364 checksum);
2365 dump_bad_block(rb->buf, rb->size);
2366 invariant(stored_checksum == checksum);
2367 }
2368
2369 // now we read and decompress the pivot and child information
2370 sub_block_init(&sb_node_info);
2371 {
2372 tokutime_t sb_decompress_t0 = toku_time_now();
2373 r = read_and_decompress_sub_block(rb, &sb_node_info);
2374 tokutime_t sb_decompress_t1 = toku_time_now();
2375 decompress_time += sb_decompress_t1 - sb_decompress_t0;
2376 if (r != 0) {
2377 fprintf(
2378 stderr,
2379 "%s:%d:deserialize_ftnode_from_rbuf - "
2380 "file[%s], blocknum[%ld], read_and_decompress_sub_block failed "
2381 "with %d\n",
2382 __FILE__,
2383 __LINE__,
2384 fname ? fname : "unknown",
2385 blocknum.b,
2386 r);
2387 dump_bad_block(
2388 static_cast<unsigned char *>(sb_node_info.uncompressed_ptr),
2389 sb_node_info.uncompressed_size);
2390 dump_bad_block(rb->buf, rb->size);
2391 goto cleanup;
2392 }
2393 }
2394
2395 // at this point, sb->uncompressed_ptr stores the serialized node info
2396 r = deserialize_ftnode_info(&sb_node_info, node);
2397 if (r != 0) {
2398 fprintf(
2399 stderr,
2400 "%s:%d:deserialize_ftnode_from_rbuf - "
2401 "file[%s], blocknum[%ld], deserialize_ftnode_info failed with "
2402 "%d\n",
2403 __FILE__,
2404 __LINE__,
2405 fname ? fname : "unknown",
2406 blocknum.b,
2407 r);
2408 dump_bad_block(rb->buf, rb->size);
2409 goto cleanup;
2410 }
2411 toku_free(sb_node_info.uncompressed_ptr);
2412
2413 // now that the node info has been deserialized, we can proceed to
2414 // deserialize the individual sub blocks
2415
2416 // setup the memory of the partitions
2417 // for partitions being decompressed, create either message buffer or
2418 // basement node
2419 // for partitions staying compressed, create sub_block
2420 setup_ftnode_partitions(node, bfe, true);
2421
2422 // This loop is parallelizeable, since we don't have a dependency on the
2423 // work done so far.
2424 for (int i = 0; i < node->n_children; i++) {
2425 uint32_t curr_offset = BP_START(*ndd, i);
2426 uint32_t curr_size = BP_SIZE(*ndd, i);
2427 // the compressed, serialized partitions start at where rb is currently
2428 // pointing, which would be rb->buf + rb->ndone
2429 // we need to intialize curr_rbuf to point to this place
2430 struct rbuf curr_rbuf = {.buf = nullptr, .size = 0, .ndone = 0};
2431 rbuf_init(&curr_rbuf, rb->buf + curr_offset, curr_size);
2432
2433 //
2434 // now we are at the point where we have:
2435 // - read the entire compressed node off of disk,
2436 // - decompressed the pivot and offset information,
2437 // - have arrived at the individual partitions.
2438 //
2439 // Based on the information in bfe, we want to decompress a subset of
2440 // of the compressed partitions (also possibly none or possibly all)
2441 // The partitions that we want to decompress and make available
2442 // to the node, we do, the rest we simply copy in compressed
2443 // form into the node, and set the state of the partition to
2444 // PT_COMPRESSED
2445 //
2446
2447 struct sub_block curr_sb;
2448 sub_block_init(&curr_sb);
2449
2450 // curr_rbuf is passed by value to decompress_and_deserialize_worker,
2451 // so there's no ugly race condition.
2452 // This would be more obvious if curr_rbuf were an array.
2453
2454 // deserialize_ftnode_info figures out what the state
2455 // should be and sets up the memory so that we are ready to use it
2456
2457 switch (BP_STATE(node, i)) {
2458 case PT_AVAIL: {
2459 // case where we read and decompress the partition
2460 tokutime_t partition_decompress_time;
2461 r = decompress_and_deserialize_worker(
2462 curr_rbuf,
2463 curr_sb,
2464 node,
2465 i,
2466 bfe->ft->cmp,
2467 &partition_decompress_time);
2468 decompress_time += partition_decompress_time;
2469 if (r != 0) {
2470 fprintf(
2471 stderr,
2472 "%s:%d:deserialize_ftnode_from_rbuf - "
2473 "file[%s], blocknum[%ld], childnum[%d], "
2474 "decompress_and_deserialize_worker failed with %d\n",
2475 __FILE__,
2476 __LINE__,
2477 fname ? fname : "unknown",
2478 blocknum.b,
2479 i,
2480 r);
2481 dump_bad_block(rb->buf, rb->size);
2482 goto cleanup;
2483 }
2484 break;
2485 }
2486 case PT_COMPRESSED:
2487 // case where we leave the partition in the compressed state
2488 r = check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i);
2489 if (r != 0) {
2490 fprintf(
2491 stderr,
2492 "%s:%d:deserialize_ftnode_from_rbuf - "
2493 "file[%s], blocknum[%ld], childnum[%d], "
2494 "check_and_copy_compressed_sub_block_worker failed with "
2495 "%d\n",
2496 __FILE__,
2497 __LINE__,
2498 fname ? fname : "unknown",
2499 blocknum.b,
2500 i,
2501 r);
2502 dump_bad_block(rb->buf, rb->size);
2503 goto cleanup;
2504 }
2505 break;
2506 case PT_INVALID: // this is really bad
2507 case PT_ON_DISK: // it's supposed to be in memory.
2508 abort();
2509 }
2510 }
2511 *ftnode = node;
2512 r = 0;
2513
2514 cleanup:
2515 if (r == 0) {
2516 t1 = toku_time_now();
2517 deserialize_time = (t1 - t0) - decompress_time;
2518 bfe->deserialize_time += deserialize_time;
2519 bfe->decompress_time += decompress_time;
2520 toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
2521 }
2522 if (r != 0) {
2523 // NOTE: Right now, callers higher in the stack will assert on
2524 // failure, so this is OK for production. However, if we
2525 // create tools that use this function to search for errors in
2526 // the FT, then we will leak memory.
2527 if (node) {
2528 toku_free(node);
2529 }
2530 }
2531 return r;
2532 }
2533
2534 int
toku_deserialize_bp_from_disk(FTNODE node,FTNODE_DISK_DATA ndd,int childnum,int fd,ftnode_fetch_extra * bfe)2535 toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, ftnode_fetch_extra *bfe) {
2536 int r = 0;
2537 assert(BP_STATE(node,childnum) == PT_ON_DISK);
2538 assert(node->bp[childnum].ptr.tag == BCT_NULL);
2539
2540 //
2541 // setup the partition
2542 //
2543 setup_available_ftnode_partition(node, childnum);
2544 BP_STATE(node,childnum) = PT_AVAIL;
2545
2546 //
2547 // read off disk and make available in memory
2548 //
2549 // get the file offset and block size for the block
2550 DISKOFF node_offset, total_node_disk_size;
2551 bfe->ft->blocktable.translate_blocknum_to_offset_size(node->blocknum, &node_offset, &total_node_disk_size);
2552
2553 uint32_t curr_offset = BP_START(ndd, childnum);
2554 uint32_t curr_size = BP_SIZE (ndd, childnum);
2555
2556 struct rbuf rb;
2557 rbuf_init(&rb, nullptr, 0);
2558
2559 uint32_t pad_at_beginning = (node_offset+curr_offset)%512;
2560 uint32_t padded_size = roundup_to_multiple(512, pad_at_beginning + curr_size);
2561
2562 toku::scoped_malloc_aligned raw_block_buf(padded_size, 512);
2563 uint8_t *raw_block = reinterpret_cast<uint8_t *>(raw_block_buf.get());
2564 rbuf_init(&rb, pad_at_beginning+raw_block, curr_size);
2565 tokutime_t t0 = toku_time_now();
2566
2567 // read the block
2568 assert(0==((unsigned long long)raw_block)%512); // for O_DIRECT
2569 assert(0==(padded_size)%512);
2570 assert(0==(node_offset+curr_offset-pad_at_beginning)%512);
2571 ssize_t rlen = toku_os_pread(fd, raw_block, padded_size, node_offset+curr_offset-pad_at_beginning);
2572 assert((DISKOFF)rlen >= pad_at_beginning + curr_size); // we read in at least enough to get what we wanted
2573 assert((DISKOFF)rlen <= padded_size); // we didn't read in too much.
2574
2575 tokutime_t t1 = toku_time_now();
2576
2577 // read sub block
2578 struct sub_block curr_sb;
2579 sub_block_init(&curr_sb);
2580 r = read_compressed_sub_block(&rb, &curr_sb);
2581 if (r != 0) {
2582 return r;
2583 }
2584 invariant(curr_sb.compressed_ptr != NULL);
2585
2586 // decompress
2587 toku::scoped_malloc uncompressed_buf(curr_sb.uncompressed_size);
2588 curr_sb.uncompressed_ptr = uncompressed_buf.get();
2589 toku_decompress((Bytef *) curr_sb.uncompressed_ptr, curr_sb.uncompressed_size,
2590 (Bytef *) curr_sb.compressed_ptr, curr_sb.compressed_size);
2591
2592 // deserialize
2593 tokutime_t t2 = toku_time_now();
2594
2595 r = deserialize_ftnode_partition(&curr_sb, node, childnum, bfe->ft->cmp);
2596
2597 tokutime_t t3 = toku_time_now();
2598
2599 // capture stats
2600 tokutime_t io_time = t1 - t0;
2601 tokutime_t decompress_time = t2 - t1;
2602 tokutime_t deserialize_time = t3 - t2;
2603 bfe->deserialize_time += deserialize_time;
2604 bfe->decompress_time += decompress_time;
2605 toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
2606
2607 bfe->bytes_read = rlen;
2608 bfe->io_time = io_time;
2609
2610 return r;
2611 }
2612
2613 // Take a ftnode partition that is in the compressed state, and make it avail
toku_deserialize_bp_from_compressed(FTNODE node,int childnum,ftnode_fetch_extra * bfe)2614 int toku_deserialize_bp_from_compressed(FTNODE node,
2615 int childnum,
2616 ftnode_fetch_extra *bfe) {
2617
2618 int r = 0;
2619 assert(BP_STATE(node, childnum) == PT_COMPRESSED);
2620 SUB_BLOCK curr_sb = BSB(node, childnum);
2621
2622 toku::scoped_malloc uncompressed_buf(curr_sb->uncompressed_size);
2623 assert(curr_sb->uncompressed_ptr == NULL);
2624 curr_sb->uncompressed_ptr = uncompressed_buf.get();
2625
2626 setup_available_ftnode_partition(node, childnum);
2627 BP_STATE(node,childnum) = PT_AVAIL;
2628
2629 // decompress the sub_block
2630 tokutime_t t0 = toku_time_now();
2631
2632 toku_decompress((Bytef *)curr_sb->uncompressed_ptr,
2633 curr_sb->uncompressed_size,
2634 (Bytef *)curr_sb->compressed_ptr,
2635 curr_sb->compressed_size);
2636
2637 tokutime_t t1 = toku_time_now();
2638
2639 r = deserialize_ftnode_partition(curr_sb, node, childnum, bfe->ft->cmp);
2640 if (r != 0) {
2641 const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
2642 fprintf(stderr,
2643 "%s:%d:toku_deserialize_bp_from_compressed - "
2644 "file[%s], blocknum[%ld], "
2645 "deserialize_ftnode_partition failed with %d\n",
2646 __FILE__,
2647 __LINE__,
2648 fname ? fname : "unknown",
2649 node->blocknum.b,
2650 r);
2651 dump_bad_block(static_cast<unsigned char *>(curr_sb->compressed_ptr),
2652 curr_sb->compressed_size);
2653 dump_bad_block(static_cast<unsigned char *>(curr_sb->uncompressed_ptr),
2654 curr_sb->uncompressed_size);
2655 }
2656
2657 tokutime_t t2 = toku_time_now();
2658
2659 tokutime_t decompress_time = t1 - t0;
2660 tokutime_t deserialize_time = t2 - t1;
2661 bfe->deserialize_time += deserialize_time;
2662 bfe->decompress_time += decompress_time;
2663 toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
2664
2665 toku_free(curr_sb->compressed_ptr);
2666 toku_free(curr_sb);
2667 return r;
2668 }
2669
deserialize_ftnode_from_fd(int fd,BLOCKNUM blocknum,uint32_t fullhash,FTNODE * ftnode,FTNODE_DISK_DATA * ndd,ftnode_fetch_extra * bfe,STAT64INFO info)2670 static int deserialize_ftnode_from_fd(int fd,
2671 BLOCKNUM blocknum,
2672 uint32_t fullhash,
2673 FTNODE *ftnode,
2674 FTNODE_DISK_DATA *ndd,
2675 ftnode_fetch_extra *bfe,
2676 STAT64INFO info) {
2677 struct rbuf rb = RBUF_INITIALIZER;
2678
2679 tokutime_t t0 = toku_time_now();
2680 read_block_from_fd_into_rbuf(fd, blocknum, bfe->ft, &rb);
2681 tokutime_t t1 = toku_time_now();
2682
2683 // Decompress and deserialize the ftnode. Time statistics
2684 // are taken inside this function.
2685 int r = deserialize_ftnode_from_rbuf(
2686 ftnode, ndd, blocknum, fullhash, bfe, info, &rb, fd);
2687 if (r != 0) {
2688 const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
2689 fprintf(
2690 stderr,
2691 "%s:%d:deserialize_ftnode_from_fd - "
2692 "file[%s], blocknum[%ld], deserialize_ftnode_from_rbuf failed with "
2693 "%d\n",
2694 __FILE__,
2695 __LINE__,
2696 fname ? fname : "unknown",
2697 blocknum.b,
2698 r);
2699 dump_bad_block(rb.buf, rb.size);
2700 }
2701
2702 bfe->bytes_read = rb.size;
2703 bfe->io_time = t1 - t0;
2704 toku_free(rb.buf);
2705 return r;
2706 }
2707
2708 // Effect: Read a node in. If possible, read just the header.
2709 // Perform version upgrade if necessary.
toku_deserialize_ftnode_from(int fd,BLOCKNUM blocknum,uint32_t fullhash,FTNODE * ftnode,FTNODE_DISK_DATA * ndd,ftnode_fetch_extra * bfe)2710 int toku_deserialize_ftnode_from(int fd,
2711 BLOCKNUM blocknum,
2712 uint32_t fullhash,
2713 FTNODE *ftnode,
2714 FTNODE_DISK_DATA *ndd,
2715 ftnode_fetch_extra *bfe) {
2716 int r = 0;
2717 struct rbuf rb = RBUF_INITIALIZER;
2718
2719 // each function below takes the appropriate io/decompression/deserialize
2720 // statistics
2721
2722 if (!bfe->read_all_partitions) {
2723 read_ftnode_header_from_fd_into_rbuf_if_small_enough(
2724 fd, blocknum, bfe->ft, &rb, bfe);
2725 r = deserialize_ftnode_header_from_rbuf_if_small_enough(
2726 ftnode, ndd, blocknum, fullhash, bfe, &rb, fd);
2727 } else {
2728 // force us to do it the old way
2729 r = -1;
2730 }
2731 if (r != 0) {
2732 // Something went wrong, go back to doing it the old way.
2733 r = deserialize_ftnode_from_fd(
2734 fd, blocknum, fullhash, ftnode, ndd, bfe, nullptr);
2735 }
2736
2737 toku_free(rb.buf);
2738 return r;
2739 }
2740
2741 void
toku_verify_or_set_counts(FTNODE UU (node))2742 toku_verify_or_set_counts(FTNODE UU(node)) {
2743 }
2744
2745 int
toku_db_badformat(void)2746 toku_db_badformat(void) {
2747 return DB_BADFORMAT;
2748 }
2749
2750 static size_t
serialize_rollback_log_size(ROLLBACK_LOG_NODE log)2751 serialize_rollback_log_size(ROLLBACK_LOG_NODE log) {
2752 size_t size = node_header_overhead //8 "tokuroll", 4 version, 4 version_original, 4 build_id
2753 +16 //TXNID_PAIR
2754 +8 //sequence
2755 +8 //blocknum
2756 +8 //previous (blocknum)
2757 +8 //resident_bytecount
2758 +8 //memarena size
2759 +log->rollentry_resident_bytecount;
2760 return size;
2761 }
2762
2763 static void
serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log,char * buf,size_t calculated_size,int UU (n_sub_blocks),struct sub_block UU (sub_block[]))2764 serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calculated_size, int UU(n_sub_blocks), struct sub_block UU(sub_block[])) {
2765 struct wbuf wb;
2766 wbuf_init(&wb, buf, calculated_size);
2767 { //Serialize rollback log to local wbuf
2768 wbuf_nocrc_literal_bytes(&wb, "tokuroll", 8);
2769 lazy_assert(log->layout_version == FT_LAYOUT_VERSION);
2770 wbuf_nocrc_int(&wb, log->layout_version);
2771 wbuf_nocrc_int(&wb, log->layout_version_original);
2772 wbuf_nocrc_uint(&wb, BUILD_ID);
2773 wbuf_nocrc_TXNID_PAIR(&wb, log->txnid);
2774 wbuf_nocrc_ulonglong(&wb, log->sequence);
2775 wbuf_nocrc_BLOCKNUM(&wb, log->blocknum);
2776 wbuf_nocrc_BLOCKNUM(&wb, log->previous);
2777 wbuf_nocrc_ulonglong(&wb, log->rollentry_resident_bytecount);
2778 //Write down memarena size needed to restore
2779 wbuf_nocrc_ulonglong(&wb, log->rollentry_arena.total_size_in_use());
2780
2781 {
2782 //Store rollback logs
2783 struct roll_entry *item;
2784 size_t done_before = wb.ndone;
2785 for (item = log->newest_logentry; item; item = item->prev) {
2786 toku_logger_rollback_wbuf_nocrc_write(&wb, item);
2787 }
2788 lazy_assert(done_before + log->rollentry_resident_bytecount == wb.ndone);
2789 }
2790 }
2791 lazy_assert(wb.ndone == wb.size);
2792 lazy_assert(calculated_size==wb.ndone);
2793 }
2794
2795 static void
serialize_uncompressed_block_to_memory(char * uncompressed_buf,int n_sub_blocks,struct sub_block sub_block[],enum toku_compression_method method,size_t * n_bytes_to_write,char ** bytes_to_write)2796 serialize_uncompressed_block_to_memory(char * uncompressed_buf,
2797 int n_sub_blocks,
2798 struct sub_block sub_block[/*n_sub_blocks*/],
2799 enum toku_compression_method method,
2800 /*out*/ size_t *n_bytes_to_write,
2801 /*out*/ char **bytes_to_write)
2802 // Guarantees that the malloc'd BYTES_TO_WRITE is 512-byte aligned (so that O_DIRECT will work)
2803 {
2804 // allocate space for the compressed uncompressed_buf
2805 size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, method);
2806 size_t sub_block_header_len = sub_block_header_size(n_sub_blocks);
2807 size_t header_len = node_header_overhead + sub_block_header_len + sizeof (uint32_t); // node + sub_block + checksum
2808 char *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, header_len + compressed_len), compressed_buf);
2809
2810 // copy the header
2811 memcpy(compressed_buf, uncompressed_buf, node_header_overhead);
2812 if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n",
2813 uncompressed_buf[node_header_overhead], uncompressed_buf[node_header_overhead+1],
2814 uncompressed_buf[node_header_overhead+2], uncompressed_buf[node_header_overhead+3]);
2815
2816 // compress all of the sub blocks
2817 char *uncompressed_ptr = uncompressed_buf + node_header_overhead;
2818 char *compressed_ptr = compressed_buf + header_len;
2819 compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores, ft_pool, method);
2820
2821 //if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %" PRIu64 "\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len);
2822
2823 // serialize the sub block header
2824 uint32_t *ptr = (uint32_t *)(compressed_buf + node_header_overhead);
2825 *ptr++ = toku_htod32(n_sub_blocks);
2826 for (int i=0; i<n_sub_blocks; i++) {
2827 ptr[0] = toku_htod32(sub_block[i].compressed_size);
2828 ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
2829 ptr[2] = toku_htod32(sub_block[i].xsum);
2830 ptr += 3;
2831 }
2832
2833 // compute the header checksum and serialize it
2834 uint32_t header_length = (char *)ptr - (char *)compressed_buf;
2835 uint32_t xsum = toku_x1764_memory(compressed_buf, header_length);
2836 *ptr = toku_htod32(xsum);
2837
2838 uint32_t padded_len = roundup_to_multiple(512, header_len + compressed_len);
2839 // Zero out padding.
2840 for (uint32_t i = header_len+compressed_len; i < padded_len; i++) {
2841 compressed_buf[i] = 0;
2842 }
2843 *n_bytes_to_write = padded_len;
2844 *bytes_to_write = compressed_buf;
2845 }
2846
2847 void
toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log,SERIALIZED_ROLLBACK_LOG_NODE serialized)2848 toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized) {
2849 // get the size of the serialized node
2850 size_t calculated_size = serialize_rollback_log_size(log);
2851
2852 serialized->len = calculated_size;
2853 serialized->n_sub_blocks = 0;
2854 // choose sub block parameters
2855 int sub_block_size = 0;
2856 size_t data_size = calculated_size - node_header_overhead;
2857 choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &serialized->n_sub_blocks);
2858 lazy_assert(0 < serialized->n_sub_blocks && serialized->n_sub_blocks <= max_sub_blocks);
2859 lazy_assert(sub_block_size > 0);
2860
2861 // set the initial sub block size for all of the sub blocks
2862 for (int i = 0; i < serialized->n_sub_blocks; i++)
2863 sub_block_init(&serialized->sub_block[i]);
2864 set_all_sub_block_sizes(data_size, sub_block_size, serialized->n_sub_blocks, serialized->sub_block);
2865
2866 // allocate space for the serialized node
2867 XMALLOC_N(calculated_size, serialized->data);
2868 // serialize the node into buf
2869 serialize_rollback_log_node_to_buf(log, serialized->data, calculated_size, serialized->n_sub_blocks, serialized->sub_block);
2870 serialized->blocknum = log->blocknum;
2871 }
2872
toku_serialize_rollback_log_to(int fd,ROLLBACK_LOG_NODE log,SERIALIZED_ROLLBACK_LOG_NODE serialized_log,bool is_serialized,FT ft,bool for_checkpoint)2873 int toku_serialize_rollback_log_to(int fd,
2874 ROLLBACK_LOG_NODE log,
2875 SERIALIZED_ROLLBACK_LOG_NODE serialized_log,
2876 bool is_serialized,
2877 FT ft,
2878 bool for_checkpoint) {
2879 size_t n_to_write;
2880 char *compressed_buf;
2881 struct serialized_rollback_log_node serialized_local;
2882
2883 if (is_serialized) {
2884 invariant_null(log);
2885 } else {
2886 invariant_null(serialized_log);
2887 serialized_log = &serialized_local;
2888 toku_serialize_rollback_log_to_memory_uncompressed(log, serialized_log);
2889 }
2890
2891 BLOCKNUM blocknum = serialized_log->blocknum;
2892 invariant(blocknum.b >= 0);
2893
2894 // Compress and malloc buffer to write
2895 serialize_uncompressed_block_to_memory(serialized_log->data,
2896 serialized_log->n_sub_blocks,
2897 serialized_log->sub_block,
2898 ft->h->compression_method,
2899 &n_to_write,
2900 &compressed_buf);
2901
2902 // Dirties the ft
2903 DISKOFF offset;
2904 ft->blocktable.realloc_on_disk(
2905 blocknum, n_to_write, &offset, ft, fd, for_checkpoint);
2906
2907 toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
2908 toku_free(compressed_buf);
2909 if (!is_serialized) {
2910 toku_static_serialized_rollback_log_destroy(&serialized_local);
2911 log->dirty = false; // See #1957. Must set the node to be clean after
2912 // serializing it so that it doesn't get written again
2913 // on the next checkpoint or eviction.
2914 }
2915 return 0;
2916 }
2917
2918 static int
deserialize_rollback_log_from_rbuf(BLOCKNUM blocknum,ROLLBACK_LOG_NODE * log_p,struct rbuf * rb)2919 deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log_p, struct rbuf *rb) {
2920 ROLLBACK_LOG_NODE MALLOC(result);
2921 int r;
2922 if (result==NULL) {
2923 r=get_error_errno();
2924 if (0) { died0: toku_free(result); }
2925 return r;
2926 }
2927
2928 const void *magic;
2929 rbuf_literal_bytes(rb, &magic, 8);
2930 lazy_assert(!memcmp(magic, "tokuroll", 8));
2931
2932 result->layout_version = rbuf_int(rb);
2933 lazy_assert((FT_LAYOUT_VERSION_25 <= result->layout_version && result->layout_version <= FT_LAYOUT_VERSION_27) ||
2934 (result->layout_version == FT_LAYOUT_VERSION));
2935 result->layout_version_original = rbuf_int(rb);
2936 result->layout_version_read_from_disk = result->layout_version;
2937 result->build_id = rbuf_int(rb);
2938 result->dirty = false;
2939 //TODO: Maybe add descriptor (or just descriptor version) here eventually?
2940 //TODO: This is hard.. everything is shared in a single dictionary.
2941 rbuf_TXNID_PAIR(rb, &result->txnid);
2942 result->sequence = rbuf_ulonglong(rb);
2943 result->blocknum = rbuf_blocknum(rb);
2944 if (result->blocknum.b != blocknum.b) {
2945 r = toku_db_badformat();
2946 goto died0;
2947 }
2948 result->previous = rbuf_blocknum(rb);
2949 result->rollentry_resident_bytecount = rbuf_ulonglong(rb);
2950
2951 size_t arena_initial_size = rbuf_ulonglong(rb);
2952 result->rollentry_arena.create(arena_initial_size);
2953 if (0) { died1: result->rollentry_arena.destroy(); goto died0; }
2954
2955 //Load rollback entries
2956 lazy_assert(rb->size > 4);
2957 //Start with empty list
2958 result->oldest_logentry = result->newest_logentry = NULL;
2959 while (rb->ndone < rb->size) {
2960 struct roll_entry *item;
2961 uint32_t rollback_fsize = rbuf_int(rb); //Already read 4. Rest is 4 smaller
2962 const void *item_vec;
2963 rbuf_literal_bytes(rb, &item_vec, rollback_fsize-4);
2964 unsigned char* item_buf = (unsigned char*)item_vec;
2965 r = toku_parse_rollback(item_buf, rollback_fsize-4, &item, &result->rollentry_arena);
2966 if (r!=0) {
2967 r = toku_db_badformat();
2968 goto died1;
2969 }
2970 //Add to head of list
2971 if (result->oldest_logentry) {
2972 result->oldest_logentry->prev = item;
2973 result->oldest_logentry = item;
2974 item->prev = NULL;
2975 }
2976 else {
2977 result->oldest_logentry = result->newest_logentry = item;
2978 item->prev = NULL;
2979 }
2980 }
2981
2982 toku_free(rb->buf);
2983 rb->buf = NULL;
2984 *log_p = result;
2985 return 0;
2986 }
2987
2988 static int
deserialize_rollback_log_from_rbuf_versioned(uint32_t version,BLOCKNUM blocknum,ROLLBACK_LOG_NODE * log,struct rbuf * rb)2989 deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknum,
2990 ROLLBACK_LOG_NODE *log,
2991 struct rbuf *rb) {
2992 int r = 0;
2993 ROLLBACK_LOG_NODE rollback_log_node = NULL;
2994 invariant((FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) || version == FT_LAYOUT_VERSION);
2995 r = deserialize_rollback_log_from_rbuf(blocknum, &rollback_log_node, rb);
2996 if (r==0) {
2997 *log = rollback_log_node;
2998 }
2999 return r;
3000 }
3001
3002 int
decompress_from_raw_block_into_rbuf(uint8_t * raw_block,size_t raw_block_size,struct rbuf * rb,BLOCKNUM blocknum)3003 decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
3004 int r = 0;
3005 // get the number of compressed sub blocks
3006 int n_sub_blocks;
3007 n_sub_blocks = toku_dtoh32(*(uint32_t*)(&raw_block[node_header_overhead]));
3008
3009 // verify the number of sub blocks
3010 invariant(0 <= n_sub_blocks);
3011 invariant(n_sub_blocks <= max_sub_blocks);
3012
3013 { // verify the header checksum
3014 uint32_t header_length = node_header_overhead + sub_block_header_size(n_sub_blocks);
3015 invariant(header_length <= raw_block_size);
3016 uint32_t xsum = toku_x1764_memory(raw_block, header_length);
3017 uint32_t stored_xsum = toku_dtoh32(*(uint32_t *)(raw_block + header_length));
3018 if (xsum != stored_xsum) {
3019 r = TOKUDB_BAD_CHECKSUM;
3020 }
3021 }
3022
3023 // deserialize the sub block header
3024 struct sub_block sub_block[n_sub_blocks];
3025 uint32_t *sub_block_header = (uint32_t *) &raw_block[node_header_overhead+4];
3026 for (int i = 0; i < n_sub_blocks; i++) {
3027 sub_block_init(&sub_block[i]);
3028 sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]);
3029 sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]);
3030 sub_block[i].xsum = toku_dtoh32(sub_block_header[2]);
3031 sub_block_header += 3;
3032 }
3033
3034 // This predicate needs to be here and instead of where it is set
3035 // for the compiler.
3036 if (r == TOKUDB_BAD_CHECKSUM) {
3037 goto exit;
3038 }
3039
3040 // verify sub block sizes
3041 for (int i = 0; i < n_sub_blocks; i++) {
3042 uint32_t compressed_size = sub_block[i].compressed_size;
3043 if (compressed_size<=0 || compressed_size>(1<<30)) {
3044 r = toku_db_badformat();
3045 goto exit;
3046 }
3047
3048 uint32_t uncompressed_size = sub_block[i].uncompressed_size;
3049 if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
3050 if (uncompressed_size<=0 || uncompressed_size>(1<<30)) {
3051 r = toku_db_badformat();
3052 goto exit;
3053 }
3054 }
3055
3056 // sum up the uncompressed size of the sub blocks
3057 size_t uncompressed_size;
3058 uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);
3059
3060 // allocate the uncompressed buffer
3061 size_t size;
3062 size = node_header_overhead + uncompressed_size;
3063 unsigned char *buf;
3064 XMALLOC_N(size, buf);
3065 rbuf_init(rb, buf, size);
3066
3067 // copy the uncompressed node header to the uncompressed buffer
3068 memcpy(rb->buf, raw_block, node_header_overhead);
3069
3070 // point at the start of the compressed data (past the node header, the sub block header, and the header checksum)
3071 unsigned char *compressed_data;
3072 compressed_data = raw_block + node_header_overhead + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t);
3073
3074 // point at the start of the uncompressed data
3075 unsigned char *uncompressed_data;
3076 uncompressed_data = rb->buf + node_header_overhead;
3077
3078 // decompress all the compressed sub blocks into the uncompressed buffer
3079 r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores, ft_pool);
3080 if (r != 0) {
3081 fprintf(stderr, "%s:%d block %" PRId64 " failed %d at %p size %lu\n", __FUNCTION__, __LINE__, blocknum.b, r, raw_block, raw_block_size);
3082 dump_bad_block(raw_block, raw_block_size);
3083 goto exit;
3084 }
3085
3086 rb->ndone=0;
3087 exit:
3088 return r;
3089 }
3090
decompress_from_raw_block_into_rbuf_versioned(uint32_t version,uint8_t * raw_block,size_t raw_block_size,struct rbuf * rb,BLOCKNUM blocknum)3091 static int decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
3092 // This function exists solely to accommodate future changes in compression.
3093 int r = 0;
3094 if ((version == FT_LAYOUT_VERSION_13 || version == FT_LAYOUT_VERSION_14) ||
3095 (FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) ||
3096 version == FT_LAYOUT_VERSION) {
3097 r = decompress_from_raw_block_into_rbuf(raw_block, raw_block_size, rb, blocknum);
3098 } else {
3099 abort();
3100 }
3101 return r;
3102 }
3103
read_and_decompress_block_from_fd_into_rbuf(int fd,BLOCKNUM blocknum,DISKOFF offset,DISKOFF size,FT ft,struct rbuf * rb,int * layout_version_p)3104 static int read_and_decompress_block_from_fd_into_rbuf(
3105 int fd,
3106 BLOCKNUM blocknum,
3107 DISKOFF offset,
3108 DISKOFF size,
3109 FT ft,
3110 struct rbuf *rb,
3111 /* out */ int *layout_version_p) {
3112 int r = 0;
3113 if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
3114
3115 DISKOFF size_aligned = roundup_to_multiple(512, size);
3116 uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block);
3117 {
3118 // read the (partially compressed) block
3119 ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset);
3120 lazy_assert((DISKOFF)rlen >= size);
3121 lazy_assert((DISKOFF)rlen <= size_aligned);
3122 }
3123 // get the layout_version
3124 int layout_version;
3125 {
3126 uint8_t *magic = raw_block + uncompressed_magic_offset;
3127 if (memcmp(magic, "tokuleaf", 8)!=0 &&
3128 memcmp(magic, "tokunode", 8)!=0 &&
3129 memcmp(magic, "tokuroll", 8)!=0) {
3130 r = toku_db_badformat();
3131 goto cleanup;
3132 }
3133 uint8_t *version = raw_block + uncompressed_version_offset;
3134 layout_version = toku_dtoh32(*(uint32_t*)version);
3135 if (layout_version < FT_LAYOUT_MIN_SUPPORTED_VERSION || layout_version > FT_LAYOUT_VERSION) {
3136 r = toku_db_badformat();
3137 goto cleanup;
3138 }
3139 }
3140
3141 r = decompress_from_raw_block_into_rbuf_versioned(layout_version, raw_block, size, rb, blocknum);
3142 if (r != 0) {
3143 // We either failed the checksome, or there is a bad format in
3144 // the buffer.
3145 if (r == TOKUDB_BAD_CHECKSUM) {
3146 fprintf(stderr,
3147 "Checksum failure while reading raw block in file %s.\n",
3148 toku_cachefile_fname_in_env(ft->cf));
3149 abort();
3150 } else {
3151 r = toku_db_badformat();
3152 goto cleanup;
3153 }
3154 }
3155
3156 *layout_version_p = layout_version;
3157 cleanup:
3158 if (r!=0) {
3159 if (rb->buf) toku_free(rb->buf);
3160 rb->buf = NULL;
3161 }
3162 if (raw_block) {
3163 toku_free(raw_block);
3164 }
3165 return r;
3166 }
3167
3168 // Read rollback log node from file into struct.
3169 // Perform version upgrade if necessary.
toku_deserialize_rollback_log_from(int fd,BLOCKNUM blocknum,ROLLBACK_LOG_NODE * logp,FT ft)3170 int toku_deserialize_rollback_log_from(int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *logp, FT ft) {
3171 int layout_version = 0;
3172 int r;
3173
3174 struct rbuf rb;
3175 rbuf_init(&rb, nullptr, 0);
3176
3177 // get the file offset and block size for the block
3178 DISKOFF offset, size;
3179 ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
3180
3181 // if the size is 0, then the blocknum is unused
3182 if (size == 0) {
3183 // blocknum is unused, just create an empty one and get out
3184 ROLLBACK_LOG_NODE XMALLOC(log);
3185 rollback_empty_log_init(log);
3186 log->blocknum.b = blocknum.b;
3187 r = 0;
3188 *logp = log;
3189 goto cleanup;
3190 }
3191
3192 r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, offset, size, ft, &rb, &layout_version);
3193 if (r!=0) goto cleanup;
3194
3195 {
3196 uint8_t *magic = rb.buf + uncompressed_magic_offset;
3197 if (memcmp(magic, "tokuroll", 8)!=0) {
3198 r = toku_db_badformat();
3199 goto cleanup;
3200 }
3201 }
3202
3203 r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, logp, &rb);
3204
3205 cleanup:
3206 if (rb.buf) {
3207 toku_free(rb.buf);
3208 }
3209 return r;
3210 }
3211
3212 int
toku_upgrade_subtree_estimates_to_stat64info(int fd,FT ft)3213 toku_upgrade_subtree_estimates_to_stat64info(int fd, FT ft)
3214 {
3215 int r = 0;
3216 // 15 was the last version with subtree estimates
3217 invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_15);
3218
3219 FTNODE unused_node = NULL;
3220 FTNODE_DISK_DATA unused_ndd = NULL;
3221 ftnode_fetch_extra bfe;
3222 bfe.create_for_min_read(ft);
3223 r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &unused_node, &unused_ndd,
3224 &bfe, &ft->h->on_disk_stats);
3225 ft->in_memory_stats = ft->h->on_disk_stats;
3226
3227 if (unused_node) {
3228 toku_ftnode_free(&unused_node);
3229 }
3230 if (unused_ndd) {
3231 toku_free(unused_ndd);
3232 }
3233 return r;
3234 }
3235
3236 int
toku_upgrade_msn_from_root_to_header(int fd,FT ft)3237 toku_upgrade_msn_from_root_to_header(int fd, FT ft)
3238 {
3239 int r;
3240 // 21 was the first version with max_msn_in_ft in the header
3241 invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_20);
3242
3243 FTNODE node;
3244 FTNODE_DISK_DATA ndd;
3245 ftnode_fetch_extra bfe;
3246 bfe.create_for_min_read(ft);
3247 r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &node, &ndd, &bfe, nullptr);
3248 if (r != 0) {
3249 goto exit;
3250 }
3251
3252 ft->h->max_msn_in_ft = node->max_msn_applied_to_node_on_disk;
3253 toku_ftnode_free(&node);
3254 toku_free(ndd);
3255 exit:
3256 return r;
3257 }
3258
3259 #undef UPGRADE_STATUS_VALUE
3260