1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "portability/memory.h"
40 #include "portability/toku_assert.h"
41 #include "portability/toku_portability.h"
42 #include "portability/toku_pthread.h"
43
44 // ugly but pragmatic, need access to dirty bits while holding translation lock
45 // TODO: Refactor this (possibly with FT-301)
46 #include "ft/ft-internal.h"
47
48 // TODO: reorganize this dependency (FT-303)
49 #include "ft/ft-ops.h" // for toku_maybe_truncate_file
50 #include "ft/serialize/block_table.h"
51 #include "ft/serialize/rbuf.h"
52 #include "ft/serialize/wbuf.h"
53 #include "ft/serialize/block_allocator.h"
54 #include "util/nb_mutex.h"
55 #include "util/scoped_malloc.h"
56
57
// Instrumentation keys for the block table's locks (PFS-style probes);
// defined here, registered by the instrumentation setup code elsewhere.
toku_instr_key *block_table_mutex_key;
toku_instr_key *safe_file_size_lock_mutex_key;
toku_instr_key *safe_file_size_lock_rwlock_key;

// indicates the end of a freelist
static const BLOCKNUM freelist_null = {-1};

// value of block_translation_pair.size if blocknum is unused
static const DISKOFF size_is_free = (DISKOFF)-1;

// value of block_translation_pair.u.diskoff if blocknum is used but does not
// yet have a diskblock
static const DISKOFF diskoff_unused = (DISKOFF)-2;
71
_mutex_lock()72 void block_table::_mutex_lock() { toku_mutex_lock(&_mutex); }
73
_mutex_unlock()74 void block_table::_mutex_unlock() { toku_mutex_unlock(&_mutex); }
75
76 // TODO: Move lock to FT
toku_ft_lock(FT ft)77 void toku_ft_lock(FT ft) {
78 block_table *bt = &ft->blocktable;
79 bt->_mutex_lock();
80 }
81
82 // TODO: Move lock to FT
toku_ft_unlock(FT ft)83 void toku_ft_unlock(FT ft) {
84 block_table *bt = &ft->blocktable;
85 toku_mutex_assert_locked(&bt->_mutex);
86 bt->_mutex_unlock();
87 }
88
89 // There are two headers: the reserve must fit them both and be suitably
90 // aligned.
91 static_assert(BlockAllocator::BLOCK_ALLOCATOR_HEADER_RESERVE %
92 BlockAllocator::BLOCK_ALLOCATOR_ALIGNMENT ==
93 0,
94 "Block allocator's header reserve must be suitibly aligned");
95 static_assert(
96 BlockAllocator::BLOCK_ALLOCATOR_HEADER_RESERVE * 2 ==
97 BlockAllocator::BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE,
98 "Block allocator's total header reserve must exactly fit two headers");
99
100 // does NOT initialize the block allocator: the caller is responsible
_create_internal()101 void block_table::_create_internal() {
102 memset(&_current, 0, sizeof(struct translation));
103 memset(&_inprogress, 0, sizeof(struct translation));
104 memset(&_checkpointed, 0, sizeof(struct translation));
105 memset(&_mutex, 0, sizeof(_mutex));
106 _bt_block_allocator = new BlockAllocator();
107 toku_mutex_init(*block_table_mutex_key, &_mutex, nullptr);
108 nb_mutex_init(*safe_file_size_lock_mutex_key,
109 *safe_file_size_lock_rwlock_key,
110 &_safe_file_size_lock);
111 }
112
// Fill in the checkpointed translation from buffer, and copy checkpointed to
// current.
// The one read from disk is the last known checkpointed one, so we are keeping
// it in
// place and then setting current (which is never stored on disk) for current
// use.
// The translation_buffer has translation only, we create the rest of the
// block_table.
// Returns 0 on success, or the deserialization error code.
int block_table::create_from_buffer(
    int fd,
    DISKOFF location_on_disk,  // Location of translation_buffer
    DISKOFF size_on_disk,
    unsigned char *translation_buffer) {
    // Does not initialize the block allocator
    _create_internal();

    // Deserialize the translation and copy it to current
    int r = _translation_deserialize_from_buffer(
        &_checkpointed, location_on_disk, size_on_disk, translation_buffer);
    if (r != 0) {
        // NOTE(review): the mutex/allocator created by _create_internal are
        // not torn down on this error path — presumably the caller destroys
        // the block table; verify.
        return r;
    }
    _copy_translation(&_current, &_checkpointed, TRANSLATION_CURRENT);

    // Determine the file size; _safe_file_size tracks how far into the
    // file we may safely read/write without preallocating.
    int64_t file_size = 0;
    r = toku_os_get_file_size(fd, &file_size);
    lazy_assert_zero(r);
    invariant(file_size >= 0);
    _safe_file_size = file_size;

    // Gather the non-empty translations and use them to create the block
    // allocator
    toku::scoped_malloc pairs_buf(_checkpointed.smallest_never_used_blocknum.b *
                                  sizeof(struct BlockAllocator::BlockPair));
    struct BlockAllocator::BlockPair *CAST_FROM_VOIDP(pairs, pairs_buf.get());
    uint64_t n_pairs = 0;
    for (int64_t i = 0; i < _checkpointed.smallest_never_used_blocknum.b; i++) {
        struct block_translation_pair pair = _checkpointed.block_translation[i];
        if (pair.size > 0) {
            // A pair with positive size must point at a real disk block.
            invariant(pair.u.diskoff != diskoff_unused);
            pairs[n_pairs++] =
                BlockAllocator::BlockPair(pair.u.diskoff, pair.size);
        }
    }

    // Seed the allocator with the regions already in use on disk.
    _bt_block_allocator->CreateFromBlockPairs(
        BlockAllocator::BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE,
        BlockAllocator::BLOCK_ALLOCATOR_ALIGNMENT,
        pairs,
        n_pairs);

    return 0;
}
167
create()168 void block_table::create() {
169 // Does not initialize the block allocator
170 _create_internal();
171
172 _checkpointed.type = TRANSLATION_CHECKPOINTED;
173 _checkpointed.smallest_never_used_blocknum =
174 make_blocknum(RESERVED_BLOCKNUMS);
175 _checkpointed.length_of_array =
176 _checkpointed.smallest_never_used_blocknum.b;
177 _checkpointed.blocknum_freelist_head = freelist_null;
178 XMALLOC_N(_checkpointed.length_of_array, _checkpointed.block_translation);
179 for (int64_t i = 0; i < _checkpointed.length_of_array; i++) {
180 _checkpointed.block_translation[i].size = 0;
181 _checkpointed.block_translation[i].u.diskoff = diskoff_unused;
182 }
183
184 // we just created a default checkpointed, now copy it to current.
185 _copy_translation(&_current, &_checkpointed, TRANSLATION_CURRENT);
186
187 // Create an empty block allocator.
188 _bt_block_allocator->Create(
189 BlockAllocator::BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE,
190 BlockAllocator::BLOCK_ALLOCATOR_ALIGNMENT);
191 }
192
193 // TODO: Refactor with FT-303
ft_set_dirty(FT ft,bool for_checkpoint)194 static void ft_set_dirty(FT ft, bool for_checkpoint) {
195 invariant(ft->h->type == FT_CURRENT);
196 if (for_checkpoint) {
197 invariant(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS);
198 ft->checkpoint_header->set_dirty();
199 } else {
200 ft->h->set_dirty();
201 }
202 }
203
// Shrink the file to the allocator's high-water mark when that would
// reclaim space. Requires _mutex held; the mutex is dropped around the
// (slow) truncate syscall, with _safe_file_size_lock serializing all
// changes to _safe_file_size.
void block_table::_maybe_truncate_file(int fd, uint64_t size_needed_before) {
    toku_mutex_assert_locked(&_mutex);
    uint64_t new_size_needed = _bt_block_allocator->AllocatedLimit();
    // Save a call to toku_os_get_file_size (kernel call) if unlikely to be
    // useful.
    if (new_size_needed < size_needed_before &&
        new_size_needed < _safe_file_size) {
        nb_mutex_lock(&_safe_file_size_lock, &_mutex);

        // Must hold _safe_file_size_lock to change _safe_file_size.
        // Re-check: another thread may have changed it while we waited.
        if (new_size_needed < _safe_file_size) {
            int64_t safe_file_size_before = _safe_file_size;
            // Not safe to use the 'to-be-truncated' portion until truncate is
            // done.
            _safe_file_size = new_size_needed;
            _mutex_unlock();

            uint64_t size_after;
            toku_maybe_truncate_file(
                fd, new_size_needed, safe_file_size_before, &size_after);
            _mutex_lock();

            // Record the size the file actually ended up at.
            _safe_file_size = size_after;
        }
        nb_mutex_unlock(&_safe_file_size_lock);
    }
}
231
// On open: try to shrink the file down to what the allocator actually needs.
void block_table::maybe_truncate_file_on_open(int fd) {
    _mutex_lock();
    _maybe_truncate_file(fd, _safe_file_size);
    _mutex_unlock();
}
237
// Copy src into dst (which must be empty), retagging it as newtype.
// dst gets a freshly allocated array sized exactly to src's live entries.
void block_table::_copy_translation(struct translation *dst,
                                    struct translation *src,
                                    enum translation_type newtype) {
    // We intend to malloc a fresh block, so the incoming translation should be
    // empty
    invariant_null(dst->block_translation);

    invariant(src->length_of_array >= src->smallest_never_used_blocknum.b);
    // Only these transitions are legal: current -> inprogress (checkpoint
    // begin), checkpointed -> current (startup), or any -> debug copy.
    invariant(newtype == TRANSLATION_DEBUG ||
              (src->type == TRANSLATION_CURRENT &&
               newtype == TRANSLATION_INPROGRESS) ||
              (src->type == TRANSLATION_CHECKPOINTED &&
               newtype == TRANSLATION_CURRENT));
    dst->type = newtype;
    dst->smallest_never_used_blocknum = src->smallest_never_used_blocknum;
    dst->blocknum_freelist_head = src->blocknum_freelist_head;

    // destination btt is of fixed size. Allocate + memcpy the exact length
    // necessary.
    dst->length_of_array = dst->smallest_never_used_blocknum.b;
    XMALLOC_N(dst->length_of_array, dst->block_translation);
    memcpy(dst->block_translation,
           src->block_translation,
           dst->length_of_array * sizeof(*dst->block_translation));

    // New version of btt is not yet stored on disk.
    dst->block_translation[RESERVED_BLOCKNUM_TRANSLATION].size = 0;
    dst->block_translation[RESERVED_BLOCKNUM_TRANSLATION].u.diskoff =
        diskoff_unused;
}
268
get_blocks_in_use_unlocked()269 int64_t block_table::get_blocks_in_use_unlocked() {
270 BLOCKNUM b;
271 struct translation *t = &_current;
272 int64_t num_blocks = 0;
273 {
274 // Reserved blocknums do not get upgraded; They are part of the header.
275 for (b.b = RESERVED_BLOCKNUMS; b.b < t->smallest_never_used_blocknum.b;
276 b.b++) {
277 if (t->block_translation[b.b].size != size_is_free) {
278 num_blocks++;
279 }
280 }
281 }
282 return num_blocks;
283 }
284
void block_table::_maybe_optimize_translation(struct translation *t) {
    // Reduce 'smallest_never_used_blocknum.b' (completely free blocknums
    // instead of just
    // on a free list. Doing so requires us to regenerate the free list.
    // This is O(n) work, so do it only if you're already doing that.

    BLOCKNUM b;
    paranoid_invariant(t->smallest_never_used_blocknum.b >= RESERVED_BLOCKNUMS);
    // Calculate how large the free suffix is.
    int64_t freed;
    {
        // Walk backwards from the end until the first in-use blocknum.
        for (b.b = t->smallest_never_used_blocknum.b; b.b > RESERVED_BLOCKNUMS;
             b.b--) {
            if (t->block_translation[b.b - 1].size != size_is_free) {
                break;
            }
        }
        freed = t->smallest_never_used_blocknum.b - b.b;
    }
    if (freed > 0) {
        // Chop the free suffix off the logical end of the array.
        t->smallest_never_used_blocknum.b = b.b;
        if (t->length_of_array / 4 > t->smallest_never_used_blocknum.b) {
            // We're using more memory than necessary to represent this now.
            // Reduce.
            uint64_t new_length = t->smallest_never_used_blocknum.b * 2;
            XREALLOC_N(new_length, t->block_translation);
            t->length_of_array = new_length;
            // No need to zero anything out.
        }

        // Regenerate free list.
        // (The old list may point into the chopped-off suffix, so rebuild
        // it from scratch over the remaining range.)
        t->blocknum_freelist_head.b = freelist_null.b;
        for (b.b = RESERVED_BLOCKNUMS; b.b < t->smallest_never_used_blocknum.b;
             b.b++) {
            if (t->block_translation[b.b].size == size_is_free) {
                t->block_translation[b.b].u.next_free_blocknum =
                    t->blocknum_freelist_head;
                t->blocknum_freelist_head = b;
            }
        }
    }
}
327
// block table must be locked by caller of this function
// Begin a checkpoint: snapshot the current translation into inprogress.
void block_table::note_start_checkpoint_unlocked() {
    toku_mutex_assert_locked(&_mutex);

    // We're going to do O(n) work to copy the translation, so we
    // can afford to do O(n) work by optimizing the translation
    _maybe_optimize_translation(&_current);

    // Copy current translation to inprogress translation.
    _copy_translation(&_inprogress, &_current, TRANSLATION_INPROGRESS);

    _checkpoint_skipped = false;
}
341
// Record that the checkpoint was skipped so note_end_checkpoint can
// discard the inprogress translation instead of committing it.
void block_table::note_skipped_checkpoint() {
    // Purpose, alert block translation that the checkpoint was skipped, e.x.
    // for a non-dirty header
    _mutex_lock();
    paranoid_invariant_notnull(_inprogress.block_translation);
    _checkpoint_skipped = true;
    _mutex_unlock();
}
350
// Purpose: free any disk space used by previous checkpoint that isn't in use by
// either
//  - current state
//  - in-progress checkpoint
//  capture inprogress as new checkpointed.
// For each entry in checkpointBTT
//  if offset does not match offset in inprogress
//      assert offset does not match offset in current
//      free (offset,len) from checkpoint
// move inprogress to checkpoint (resetting type)
// inprogress = NULL
void block_table::note_end_checkpoint(int fd) {
    // Free unused blocks
    _mutex_lock();
    uint64_t allocated_limit_at_start = _bt_block_allocator->AllocatedLimit();
    paranoid_invariant_notnull(_inprogress.block_translation);
    if (_checkpoint_skipped) {
        // Checkpoint never happened; throw the inprogress snapshot away.
        toku_free(_inprogress.block_translation);
        memset(&_inprogress, 0, sizeof(_inprogress));
        goto end;
    }

    // Make certain inprogress was allocated space on disk
    invariant(
        _inprogress.block_translation[RESERVED_BLOCKNUM_TRANSLATION].size > 0);
    invariant(
        _inprogress.block_translation[RESERVED_BLOCKNUM_TRANSLATION].u.diskoff >
        0);

    {
        // Free every old-checkpoint block that neither the inprogress nor
        // the current translation still references.
        struct translation *t = &_checkpointed;
        for (int64_t i = 0; i < t->length_of_array; i++) {
            struct block_translation_pair *pair = &t->block_translation[i];
            if (pair->size > 0 &&
                !_translation_prevents_freeing(
                    &_inprogress, make_blocknum(i), pair)) {
                invariant(!_translation_prevents_freeing(
                    &_current, make_blocknum(i), pair));
                _bt_block_allocator->FreeBlock(pair->u.diskoff, pair->size);
            }
        }
        // Promote inprogress to checkpointed (shallow move of the array).
        toku_free(_checkpointed.block_translation);
        _checkpointed = _inprogress;
        _checkpointed.type = TRANSLATION_CHECKPOINTED;
        memset(&_inprogress, 0, sizeof(_inprogress));
        _maybe_truncate_file(fd, allocated_limit_at_start);
    }
end:
    _mutex_unlock();
}
401
_is_valid_blocknum(struct translation * t,BLOCKNUM b)402 bool block_table::_is_valid_blocknum(struct translation *t, BLOCKNUM b) {
403 invariant(t->length_of_array >= t->smallest_never_used_blocknum.b);
404 return b.b >= 0 && b.b < t->smallest_never_used_blocknum.b;
405 }
406
// Assert-style wrapper: abort if b is not a valid blocknum in t.
// Params are UU() because they are unused when invariants compile out.
void block_table::_verify_valid_blocknum(struct translation *UU(t),
                                         BLOCKNUM UU(b)) {
    invariant(_is_valid_blocknum(t, b));
}
411
_is_valid_freeable_blocknum(struct translation * t,BLOCKNUM b)412 bool block_table::_is_valid_freeable_blocknum(struct translation *t,
413 BLOCKNUM b) {
414 invariant(t->length_of_array >= t->smallest_never_used_blocknum.b);
415 return b.b >= RESERVED_BLOCKNUMS && b.b < t->smallest_never_used_blocknum.b;
416 }
417
// should be freeable
// Assert-style wrapper: abort if b is not a freeable blocknum in t.
void block_table::_verify_valid_freeable_blocknum(struct translation *UU(t),
                                                  BLOCKNUM UU(b)) {
    invariant(_is_valid_freeable_blocknum(t, b));
}
423
// Also used only in ft-serialize-test.
// Return a raw (offset, size) region directly to the block allocator.
void block_table::block_free(uint64_t offset, uint64_t size) {
    _mutex_lock();
    _bt_block_allocator->FreeBlock(offset, size);
    _mutex_unlock();
}
430
_calculate_size_on_disk(struct translation * t)431 int64_t block_table::_calculate_size_on_disk(struct translation *t) {
432 return 8 + // smallest_never_used_blocknum
433 8 + // blocknum_freelist_head
434 t->smallest_never_used_blocknum.b * 16 + // Array
435 4; // 4 for checksum
436 }
437
438 // We cannot free the disk space allocated to this blocknum if it is still in
439 // use by the given translation table.
_translation_prevents_freeing(struct translation * t,BLOCKNUM b,struct block_translation_pair * old_pair)440 bool block_table::_translation_prevents_freeing(
441 struct translation *t,
442 BLOCKNUM b,
443 struct block_translation_pair *old_pair) {
444 return t->block_translation && b.b < t->smallest_never_used_blocknum.b &&
445 old_pair->u.diskoff == t->block_translation[b.b].u.diskoff;
446 }
447
// Reassign blocknum b to a new disk block of the given size, freeing its
// old block when no checkpoint translation still references it.
// Requires _mutex held. *offset receives the new disk offset (or
// diskoff_unused when size == 0).
void block_table::_realloc_on_disk_internal(BLOCKNUM b,
                                            DISKOFF size,
                                            DISKOFF *offset,
                                            FT ft,
                                            bool for_checkpoint) {
    toku_mutex_assert_locked(&_mutex);
    ft_set_dirty(ft, for_checkpoint);

    struct translation *t = &_current;
    struct block_translation_pair old_pair = t->block_translation[b.b];
    // Free the old block if it is not still in use by the checkpoint in
    // progress or the previous checkpoint
    bool cannot_free =
        (!for_checkpoint &&
         _translation_prevents_freeing(&_inprogress, b, &old_pair)) ||
        _translation_prevents_freeing(&_checkpointed, b, &old_pair);
    if (!cannot_free && old_pair.u.diskoff != diskoff_unused) {
        _bt_block_allocator->FreeBlock(old_pair.u.diskoff, old_pair.size);
    }

    uint64_t allocator_offset = diskoff_unused;
    t->block_translation[b.b].size = size;
    if (size > 0) {
        // Allocate a new block if the size is greater than 0,
        // if the size is just 0, offset will be set to diskoff_unused
        _bt_block_allocator->AllocBlock(size, &allocator_offset);
    }
    t->block_translation[b.b].u.diskoff = allocator_offset;
    *offset = allocator_offset;

    // Update inprogress btt if appropriate (if called because Pending bit is
    // set).
    if (for_checkpoint) {
        paranoid_invariant(b.b < _inprogress.length_of_array);
        _inprogress.block_translation[b.b] = t->block_translation[b.b];
    }
}
485
// Grow (preallocate) the file so a write of block_size bytes at
// block_offset cannot run past the safe region. Requires _mutex held;
// drops it around the preallocation syscall, with _safe_file_size_lock
// serializing changes to _safe_file_size.
void block_table::_ensure_safe_write_unlocked(int fd,
                                              DISKOFF block_size,
                                              DISKOFF block_offset) {
    // Requires: holding _mutex
    uint64_t size_needed = block_size + block_offset;
    if (size_needed > _safe_file_size) {
        // Must hold _safe_file_size_lock to change _safe_file_size.
        nb_mutex_lock(&_safe_file_size_lock, &_mutex);
        // Re-check: another thread may have grown the file while we waited.
        if (size_needed > _safe_file_size) {
            _mutex_unlock();

            int64_t size_after;
            toku_maybe_preallocate_in_file(
                fd, size_needed, _safe_file_size, &size_after);

            _mutex_lock();
            _safe_file_size = size_after;
        }
        nb_mutex_unlock(&_safe_file_size_lock);
    }
}
507
realloc_on_disk(BLOCKNUM b,DISKOFF size,DISKOFF * offset,FT ft,int fd,bool for_checkpoint)508 void block_table::realloc_on_disk(BLOCKNUM b,
509 DISKOFF size,
510 DISKOFF *offset,
511 FT ft,
512 int fd,
513 bool for_checkpoint) {
514 _mutex_lock();
515 struct translation *t = &_current;
516 _verify_valid_freeable_blocknum(t, b);
517 _realloc_on_disk_internal(b, size, offset, ft, for_checkpoint);
518
519 _ensure_safe_write_unlocked(fd, size, *offset);
520 _mutex_unlock();
521 }
522
_pair_is_unallocated(struct block_translation_pair * pair)523 bool block_table::_pair_is_unallocated(struct block_translation_pair *pair) {
524 return pair->size == 0 && pair->u.diskoff == diskoff_unused;
525 }
526
527 // Effect: figure out where to put the inprogress btt on disk, allocate space
528 // for it there.
529 // The space must be 512-byte aligned (both the starting address and the
530 // size).
531 // As a result, the allcoated space may be a little bit bigger (up to the next
532 // 512-byte boundary) than the actual btt.
_alloc_inprogress_translation_on_disk_unlocked()533 void block_table::_alloc_inprogress_translation_on_disk_unlocked() {
534 toku_mutex_assert_locked(&_mutex);
535
536 struct translation *t = &_inprogress;
537 paranoid_invariant_notnull(t->block_translation);
538 BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION);
539 // Each inprogress is allocated only once
540 paranoid_invariant(_pair_is_unallocated(&t->block_translation[b.b]));
541
542 // Allocate a new block
543 int64_t size = _calculate_size_on_disk(t);
544 uint64_t offset;
545 _bt_block_allocator->AllocBlock(size, &offset);
546 t->block_translation[b.b].u.diskoff = offset;
547 t->block_translation[b.b].size = size;
548 }
549
550 // Effect: Serializes the blocktable to a wbuf (which starts uninitialized)
551 // A clean shutdown runs checkpoint start so that current and inprogress are
552 // copies.
553 // The resulting wbuf buffer is guaranteed to be be 512-byte aligned and the
554 // total length is a multiple of 512 (so we pad with zeros at the end if
555 // needd)
556 // The address is guaranteed to be 512-byte aligned, but the size is not
557 // guaranteed.
558 // It *is* guaranteed that we can read up to the next 512-byte boundary,
559 // however
serialize_translation_to_wbuf(int fd,struct wbuf * w,int64_t * address,int64_t * size)560 void block_table::serialize_translation_to_wbuf(int fd,
561 struct wbuf *w,
562 int64_t *address,
563 int64_t *size) {
564 _mutex_lock();
565 struct translation *t = &_inprogress;
566
567 BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION);
568 _alloc_inprogress_translation_on_disk_unlocked(); // The allocated block
569 // must be 512-byte
570 // aligned to make
571 // O_DIRECT happy.
572 uint64_t size_translation = _calculate_size_on_disk(t);
573 uint64_t size_aligned = roundup_to_multiple(512, size_translation);
574 invariant((int64_t)size_translation == t->block_translation[b.b].size);
575 {
576 // Init wbuf
577 if (0)
578 printf(
579 "%s:%d writing translation table of size_translation %" PRIu64
580 " at %" PRId64 "\n",
581 __FILE__,
582 __LINE__,
583 size_translation,
584 t->block_translation[b.b].u.diskoff);
585 char *XMALLOC_N_ALIGNED(512, size_aligned, buf);
586 for (uint64_t i = size_translation; i < size_aligned; i++)
587 buf[i] = 0; // fill in the end of the buffer with zeros.
588 wbuf_init(w, buf, size_aligned);
589 }
590 wbuf_BLOCKNUM(w, t->smallest_never_used_blocknum);
591 wbuf_BLOCKNUM(w, t->blocknum_freelist_head);
592 int64_t i;
593 for (i = 0; i < t->smallest_never_used_blocknum.b; i++) {
594 if (0)
595 printf("%s:%d %" PRId64 ",%" PRId64 "\n",
596 __FILE__,
597 __LINE__,
598 t->block_translation[i].u.diskoff,
599 t->block_translation[i].size);
600 wbuf_DISKOFF(w, t->block_translation[i].u.diskoff);
601 wbuf_DISKOFF(w, t->block_translation[i].size);
602 }
603 uint32_t checksum = toku_x1764_finish(&w->checksum);
604 wbuf_int(w, checksum);
605 *address = t->block_translation[b.b].u.diskoff;
606 *size = size_translation;
607 invariant((*address) % 512 == 0);
608
609 _ensure_safe_write_unlocked(fd, size_aligned, *address);
610 _mutex_unlock();
611 }
612
613 // Perhaps rename: purpose is get disk address of a block, given its blocknum
614 // (blockid?)
_translate_blocknum_to_offset_size_unlocked(BLOCKNUM b,DISKOFF * offset,DISKOFF * size)615 void block_table::_translate_blocknum_to_offset_size_unlocked(BLOCKNUM b,
616 DISKOFF *offset,
617 DISKOFF *size) {
618 struct translation *t = &_current;
619 _verify_valid_blocknum(t, b);
620 if (offset) {
621 *offset = t->block_translation[b.b].u.diskoff;
622 }
623 if (size) {
624 *size = t->block_translation[b.b].size;
625 }
626 }
627
// Perhaps rename: purpose is get disk address of a block, given its blocknum
// (blockid?)
// Locked wrapper around _translate_blocknum_to_offset_size_unlocked.
void block_table::translate_blocknum_to_offset_size(BLOCKNUM b,
                                                    DISKOFF *offset,
                                                    DISKOFF *size) {
    _mutex_lock();
    _translate_blocknum_to_offset_size_unlocked(b, offset, size);
    _mutex_unlock();
}
637
638 // Only called by toku_allocate_blocknum
639 // Effect: expand the array to maintain size invariant
640 // given that one more never-used blocknum will soon be used.
_maybe_expand_translation(struct translation * t)641 void block_table::_maybe_expand_translation(struct translation *t) {
642 if (t->length_of_array <= t->smallest_never_used_blocknum.b) {
643 // expansion is necessary
644 uint64_t new_length = t->smallest_never_used_blocknum.b * 2;
645 XREALLOC_N(new_length, t->block_translation);
646 uint64_t i;
647 for (i = t->length_of_array; i < new_length; i++) {
648 t->block_translation[i].u.next_free_blocknum = freelist_null;
649 t->block_translation[i].size = size_is_free;
650 }
651 t->length_of_array = new_length;
652 }
653 }
654
// Hand out a blocknum: pop the freelist if non-empty, otherwise take the
// next never-used blocknum (growing the array if needed). Requires _mutex.
void block_table::_allocate_blocknum_unlocked(BLOCKNUM *res, FT ft) {
    toku_mutex_assert_locked(&_mutex);
    BLOCKNUM result;
    struct translation *t = &_current;
    if (t->blocknum_freelist_head.b == freelist_null.b) {
        // no previously used blocknums are available
        // use a never used blocknum
        _maybe_expand_translation(
            t);  // Ensure a never used blocknums is available
        result = t->smallest_never_used_blocknum;
        t->smallest_never_used_blocknum.b++;
    } else {  // reuse a previously used blocknum
        result = t->blocknum_freelist_head;
        BLOCKNUM next = t->block_translation[result.b].u.next_free_blocknum;
        t->blocknum_freelist_head = next;
    }
    // Verify the blocknum is free
    paranoid_invariant(t->block_translation[result.b].size == size_is_free);
    // blocknum is not free anymore
    t->block_translation[result.b].u.diskoff = diskoff_unused;
    t->block_translation[result.b].size = 0;
    _verify_valid_freeable_blocknum(t, result);
    *res = result;
    // Allocating a blocknum mutates the live translation: dirty the header.
    ft_set_dirty(ft, false);
}
680
// Locked wrapper around _allocate_blocknum_unlocked.
void block_table::allocate_blocknum(BLOCKNUM *res, FT ft) {
    _mutex_lock();
    _allocate_blocknum_unlocked(res, ft);
    _mutex_unlock();
}
686
// Mark blocknum b free in translation t and push it onto t's freelist.
// (Order matters: the entry's next pointer must be set before the head.)
void block_table::_free_blocknum_in_translation(struct translation *t,
                                                BLOCKNUM b) {
    _verify_valid_freeable_blocknum(t, b);
    paranoid_invariant(t->block_translation[b.b].size != size_is_free);

    t->block_translation[b.b].size = size_is_free;
    t->block_translation[b.b].u.next_free_blocknum = t->blocknum_freelist_head;
    t->blocknum_freelist_head = b;
}
696
// Effect: Free a blocknum.
// If the blocknum holds the only reference to a block on disk, free that block
void block_table::_free_blocknum_unlocked(BLOCKNUM *bp,
                                          FT ft,
                                          bool for_checkpoint) {
    toku_mutex_assert_locked(&_mutex);
    BLOCKNUM b = *bp;
    bp->b = 0;  // Remove caller's reference.

    // Snapshot the old mapping before the translation entry is recycled.
    struct block_translation_pair old_pair = _current.block_translation[b.b];

    _free_blocknum_in_translation(&_current, b);
    if (for_checkpoint) {
        paranoid_invariant(ft->checkpoint_header->type ==
                           FT_CHECKPOINT_INPROGRESS);
        _free_blocknum_in_translation(&_inprogress, b);
    }

    // If the size is 0, no disk block has ever been assigned to this blocknum.
    if (old_pair.size > 0) {
        // Free the old block if it is not still in use by the checkpoint in
        // progress or the previous checkpoint
        bool cannot_free =
            _translation_prevents_freeing(&_inprogress, b, &old_pair) ||
            _translation_prevents_freeing(&_checkpointed, b, &old_pair);
        if (!cannot_free) {
            _bt_block_allocator->FreeBlock(old_pair.u.diskoff, old_pair.size);
        }
    } else {
        paranoid_invariant(old_pair.size == 0);
        paranoid_invariant(old_pair.u.diskoff == diskoff_unused);
    }
    // Freeing a blocknum mutates the translation: dirty the header.
    ft_set_dirty(ft, for_checkpoint);
}
731
// Locked wrapper around _free_blocknum_unlocked.
void block_table::free_blocknum(BLOCKNUM *bp, FT ft, bool for_checkpoint) {
    _mutex_lock();
    _free_blocknum_unlocked(bp, ft, for_checkpoint);
    _mutex_unlock();
}
737
// Verify there are no free blocks.
// (Checks only the freelist head; an empty freelist has the null sentinel.)
void block_table::verify_no_free_blocknums() {
    invariant(_current.blocknum_freelist_head.b == freelist_null.b);
}
742
743 // Frees blocknums that have a size of 0 and unused diskoff
744 // Currently used for eliminating unused cached rollback log nodes
free_unused_blocknums(BLOCKNUM root)745 void block_table::free_unused_blocknums(BLOCKNUM root) {
746 _mutex_lock();
747 int64_t smallest = _current.smallest_never_used_blocknum.b;
748 for (int64_t i = RESERVED_BLOCKNUMS; i < smallest; i++) {
749 if (i == root.b) {
750 continue;
751 }
752 BLOCKNUM b = make_blocknum(i);
753 if (_current.block_translation[b.b].size == 0) {
754 invariant(_current.block_translation[b.b].u.diskoff ==
755 diskoff_unused);
756 _free_blocknum_in_translation(&_current, b);
757 }
758 }
759 _mutex_unlock();
760 }
761
_no_data_blocks_except_root(BLOCKNUM root)762 bool block_table::_no_data_blocks_except_root(BLOCKNUM root) {
763 bool ok = true;
764 _mutex_lock();
765 int64_t smallest = _current.smallest_never_used_blocknum.b;
766 if (root.b < RESERVED_BLOCKNUMS) {
767 ok = false;
768 goto cleanup;
769 }
770 for (int64_t i = RESERVED_BLOCKNUMS; i < smallest; i++) {
771 if (i == root.b) {
772 continue;
773 }
774 BLOCKNUM b = make_blocknum(i);
775 if (_current.block_translation[b.b].size != size_is_free) {
776 ok = false;
777 goto cleanup;
778 }
779 }
780 cleanup:
781 _mutex_unlock();
782 return ok;
783 }
784
// Verify there are no data blocks except root.
// TODO(leif): This actually takes a lock, but I don't want to fix all the
// callers right now.
// (root is UU() because the check compiles out in non-paranoid builds.)
void block_table::verify_no_data_blocks_except_root(BLOCKNUM UU(root)) {
    paranoid_invariant(_no_data_blocks_except_root(root));
}
791
_blocknum_allocated(BLOCKNUM b)792 bool block_table::_blocknum_allocated(BLOCKNUM b) {
793 _mutex_lock();
794 struct translation *t = &_current;
795 _verify_valid_blocknum(t, b);
796 bool ok = t->block_translation[b.b].size != size_is_free;
797 _mutex_unlock();
798 return ok;
799 }
800
// Verify a blocknum is currently allocated.
// (b is UU() because the check compiles out in non-paranoid builds.)
void block_table::verify_blocknum_allocated(BLOCKNUM UU(b)) {
    paranoid_invariant(_blocknum_allocated(b));
}
805
806 // Only used by toku_dump_translation table (debug info)
_dump_translation_internal(FILE * f,struct translation * t)807 void block_table::_dump_translation_internal(FILE *f, struct translation *t) {
808 if (t->block_translation) {
809 BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION);
810 fprintf(f, " length_of_array[%" PRId64 "]", t->length_of_array);
811 fprintf(f,
812 " smallest_never_used_blocknum[%" PRId64 "]",
813 t->smallest_never_used_blocknum.b);
814 fprintf(f,
815 " blocknum_free_list_head[%" PRId64 "]",
816 t->blocknum_freelist_head.b);
817 fprintf(
818 f, " size_on_disk[%" PRId64 "]", t->block_translation[b.b].size);
819 fprintf(f,
820 " location_on_disk[%" PRId64 "]\n",
821 t->block_translation[b.b].u.diskoff);
822 int64_t i;
823 for (i = 0; i < t->length_of_array; i++) {
824 fprintf(f,
825 " %" PRId64 ": %" PRId64 " %" PRId64 "\n",
826 i,
827 t->block_translation[i].u.diskoff,
828 t->block_translation[i].size);
829 }
830 fprintf(f, "\n");
831 } else {
832 fprintf(f, " does not exist\n");
833 }
834 }
835
836 // Only used by toku_ft_dump which is only for debugging purposes
837 // "pretty" just means we use tabs so we can parse output easier later
dump_translation_table_pretty(FILE * f)838 void block_table::dump_translation_table_pretty(FILE *f) {
839 _mutex_lock();
840 struct translation *t = &_checkpointed;
841 invariant(t->block_translation != nullptr);
842 for (int64_t i = 0; i < t->length_of_array; ++i) {
843 fprintf(f,
844 "%" PRId64 "\t%" PRId64 "\t%" PRId64 "\n",
845 i,
846 t->block_translation[i].u.diskoff,
847 t->block_translation[i].size);
848 }
849 _mutex_unlock();
850 }
851
852 // Only used by toku_ft_dump which is only for debugging purposes
dump_translation_table(FILE * f)853 void block_table::dump_translation_table(FILE *f) {
854 _mutex_lock();
855 fprintf(f, "Current block translation:");
856 _dump_translation_internal(f, &_current);
857 fprintf(f, "Checkpoint in progress block translation:");
858 _dump_translation_internal(f, &_inprogress);
859 fprintf(f, "Checkpointed block translation:");
860 _dump_translation_internal(f, &_checkpointed);
861 _mutex_unlock();
862 }
863
864 // Only used by ftdump
blocknum_dump_translation(BLOCKNUM b)865 void block_table::blocknum_dump_translation(BLOCKNUM b) {
866 _mutex_lock();
867
868 struct translation *t = &_current;
869 if (b.b < t->length_of_array) {
870 struct block_translation_pair *bx = &t->block_translation[b.b];
871 printf("%" PRId64 ": %" PRId64 " %" PRId64 "\n",
872 b.b,
873 bx->u.diskoff,
874 bx->size);
875 }
876 _mutex_unlock();
877 }
878
// Must not call this function when anything else is using the blocktable.
// No one may use the blocktable afterwards.
void block_table::destroy(void) {
    // TODO: translation.destroy();
    // Free all three translation arrays (current, in-progress, checkpointed).
    toku_free(_current.block_translation);
    toku_free(_inprogress.block_translation);
    toku_free(_checkpointed.block_translation);

    // Tear down the block allocator before destroying the locks that
    // protected it.
    _bt_block_allocator->Destroy();
    delete _bt_block_allocator;
    toku_mutex_destroy(&_mutex);
    nb_mutex_destroy(&_safe_file_size_lock);
}
892
_translation_deserialize_from_buffer(struct translation * t,DISKOFF location_on_disk,uint64_t size_on_disk,unsigned char * translation_buffer)893 int block_table::_translation_deserialize_from_buffer(
894 struct translation *t,
895 DISKOFF location_on_disk,
896 uint64_t size_on_disk,
897 // out: buffer with serialized translation
898 unsigned char *translation_buffer) {
899 int r = 0;
900 invariant(location_on_disk != 0);
901 t->type = TRANSLATION_CHECKPOINTED;
902
903 // check the checksum
904 uint32_t x1764 = toku_x1764_memory(translation_buffer, size_on_disk - 4);
905 uint64_t offset = size_on_disk - 4;
906 uint32_t stored_x1764 = toku_dtoh32(*(int *)(translation_buffer + offset));
907 if (x1764 != stored_x1764) {
908 fprintf(stderr,
909 "Translation table checksum failure: calc=0x%08x read=0x%08x\n",
910 x1764,
911 stored_x1764);
912 r = TOKUDB_BAD_CHECKSUM;
913 goto exit;
914 }
915
916 struct rbuf rb;
917 rb.buf = translation_buffer;
918 rb.ndone = 0;
919 rb.size = size_on_disk - 4; // 4==checksum
920
921 t->smallest_never_used_blocknum = rbuf_blocknum(&rb);
922 t->length_of_array = t->smallest_never_used_blocknum.b;
923 invariant(t->smallest_never_used_blocknum.b >= RESERVED_BLOCKNUMS);
924 t->blocknum_freelist_head = rbuf_blocknum(&rb);
925 XMALLOC_N(t->length_of_array, t->block_translation);
926 for (int64_t i = 0; i < t->length_of_array; i++) {
927 t->block_translation[i].u.diskoff = rbuf_DISKOFF(&rb);
928 t->block_translation[i].size = rbuf_DISKOFF(&rb);
929 }
930 invariant(_calculate_size_on_disk(t) == (int64_t)size_on_disk);
931 invariant(t->block_translation[RESERVED_BLOCKNUM_TRANSLATION].size ==
932 (int64_t)size_on_disk);
933 invariant(t->block_translation[RESERVED_BLOCKNUM_TRANSLATION].u.diskoff ==
934 location_on_disk);
935
936 exit:
937 return r;
938 }
939
iterate(enum translation_type type,BLOCKTABLE_CALLBACK f,void * extra,bool data_only,bool used_only)940 int block_table::iterate(enum translation_type type,
941 BLOCKTABLE_CALLBACK f,
942 void *extra,
943 bool data_only,
944 bool used_only) {
945 struct translation *src;
946
947 int r = 0;
948 switch (type) {
949 case TRANSLATION_CURRENT:
950 src = &_current;
951 break;
952 case TRANSLATION_INPROGRESS:
953 src = &_inprogress;
954 break;
955 case TRANSLATION_CHECKPOINTED:
956 src = &_checkpointed;
957 break;
958 default:
959 r = EINVAL;
960 }
961
962 struct translation fakecurrent;
963 memset(&fakecurrent, 0, sizeof(struct translation));
964
965 struct translation *t = &fakecurrent;
966 if (r == 0) {
967 _mutex_lock();
968 _copy_translation(t, src, TRANSLATION_DEBUG);
969 t->block_translation[RESERVED_BLOCKNUM_TRANSLATION] =
970 src->block_translation[RESERVED_BLOCKNUM_TRANSLATION];
971 _mutex_unlock();
972 int64_t i;
973 for (i = 0; i < t->smallest_never_used_blocknum.b; i++) {
974 struct block_translation_pair pair = t->block_translation[i];
975 if (data_only && i < RESERVED_BLOCKNUMS)
976 continue;
977 if (used_only && pair.size <= 0)
978 continue;
979 r = f(make_blocknum(i), pair.size, pair.u.diskoff, extra);
980 if (r != 0)
981 break;
982 }
983 toku_free(t->block_translation);
984 }
985 return r;
986 }
987
988 typedef struct {
989 int64_t used_space;
990 int64_t total_space;
991 } frag_extra;
992
frag_helper(BLOCKNUM UU (b),int64_t size,int64_t address,void * extra)993 static int frag_helper(BLOCKNUM UU(b),
994 int64_t size,
995 int64_t address,
996 void *extra) {
997 frag_extra *info = (frag_extra *)extra;
998
999 if (size + address > info->total_space)
1000 info->total_space = size + address;
1001 info->used_space += size;
1002 return 0;
1003 }
1004
internal_fragmentation(int64_t * total_sizep,int64_t * used_sizep)1005 void block_table::internal_fragmentation(int64_t *total_sizep,
1006 int64_t *used_sizep) {
1007 frag_extra info = {0, 0};
1008 int r = iterate(TRANSLATION_CHECKPOINTED, frag_helper, &info, false, true);
1009 invariant_zero(r);
1010
1011 if (total_sizep)
1012 *total_sizep = info.total_space;
1013 if (used_sizep)
1014 *used_sizep = info.used_space;
1015 }
1016
// Relocate the reserved descriptor block to a region of `size` bytes on
// disk; the new location is returned through *offset.
// Requires: the blocktable mutex is held by the caller.
void block_table::_realloc_descriptor_on_disk_unlocked(DISKOFF size,
                                                       DISKOFF *offset,
                                                       FT ft) {
    toku_mutex_assert_locked(&_mutex);
    BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR);
    _realloc_on_disk_internal(b, size, offset, ft, false);
}
1024
// Locking wrapper: relocate the descriptor block and ensure the file is
// safe to write at the newly chosen offset before releasing the mutex.
void block_table::realloc_descriptor_on_disk(DISKOFF size,
                                             DISKOFF *offset,
                                             FT ft,
                                             int fd) {
    _mutex_lock();
    _realloc_descriptor_on_disk_unlocked(size, offset, ft);
    _ensure_safe_write_unlocked(fd, size, *offset);
    _mutex_unlock();
}
1034
// Look up the current on-disk offset and size of the reserved descriptor
// block, under the blocktable mutex.
void block_table::get_descriptor_offset_size(DISKOFF *offset, DISKOFF *size) {
    _mutex_lock();
    BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR);
    _translate_blocknum_to_offset_size_unlocked(b, offset, size);
    _mutex_unlock();
}
1041
get_fragmentation_unlocked(TOKU_DB_FRAGMENTATION report)1042 void block_table::get_fragmentation_unlocked(TOKU_DB_FRAGMENTATION report) {
1043 // Requires: blocktable lock is held.
1044 // Requires: report->file_size_bytes is already filled in.
1045
1046 // Count the headers.
1047 report->data_bytes = BlockAllocator::BLOCK_ALLOCATOR_HEADER_RESERVE;
1048 report->data_blocks = 1;
1049 report->checkpoint_bytes_additional =
1050 BlockAllocator::BLOCK_ALLOCATOR_HEADER_RESERVE;
1051 report->checkpoint_blocks_additional = 1;
1052
1053 struct translation *current = &_current;
1054 for (int64_t i = 0; i < current->length_of_array; i++) {
1055 struct block_translation_pair *pair = ¤t->block_translation[i];
1056 if (pair->size > 0) {
1057 report->data_bytes += pair->size;
1058 report->data_blocks++;
1059 }
1060 }
1061
1062 struct translation *checkpointed = &_checkpointed;
1063 for (int64_t i = 0; i < checkpointed->length_of_array; i++) {
1064 struct block_translation_pair *pair =
1065 &checkpointed->block_translation[i];
1066 if (pair->size > 0 &&
1067 !(i < current->length_of_array &&
1068 current->block_translation[i].size > 0 &&
1069 current->block_translation[i].u.diskoff == pair->u.diskoff)) {
1070 report->checkpoint_bytes_additional += pair->size;
1071 report->checkpoint_blocks_additional++;
1072 }
1073 }
1074
1075 struct translation *inprogress = &_inprogress;
1076 for (int64_t i = 0; i < inprogress->length_of_array; i++) {
1077 struct block_translation_pair *pair = &inprogress->block_translation[i];
1078 if (pair->size > 0 &&
1079 !(i < current->length_of_array &&
1080 current->block_translation[i].size > 0 &&
1081 current->block_translation[i].u.diskoff == pair->u.diskoff) &&
1082 !(i < checkpointed->length_of_array &&
1083 checkpointed->block_translation[i].size > 0 &&
1084 checkpointed->block_translation[i].u.diskoff ==
1085 pair->u.diskoff)) {
1086 report->checkpoint_bytes_additional += pair->size;
1087 report->checkpoint_blocks_additional++;
1088 }
1089 }
1090
1091 _bt_block_allocator->UnusedStatistics(report);
1092 }
1093
get_info64(struct ftinfo64 * s)1094 void block_table::get_info64(struct ftinfo64 *s) {
1095 _mutex_lock();
1096
1097 struct translation *current = &_current;
1098 s->num_blocks_allocated = current->length_of_array;
1099 s->num_blocks_in_use = 0;
1100 s->size_allocated = 0;
1101 s->size_in_use = 0;
1102
1103 for (int64_t i = 0; i < current->length_of_array; ++i) {
1104 struct block_translation_pair *block = ¤t->block_translation[i];
1105 if (block->size != size_is_free) {
1106 ++s->num_blocks_in_use;
1107 s->size_in_use += block->size;
1108 if (block->u.diskoff != diskoff_unused) {
1109 uint64_t limit = block->u.diskoff + block->size;
1110 if (limit > s->size_allocated) {
1111 s->size_allocated = limit;
1112 }
1113 }
1114 }
1115 }
1116
1117 _mutex_unlock();
1118 }
1119
iterate_translation_tables(uint64_t checkpoint_count,int (* iter)(uint64_t checkpoint_count,int64_t total_num_rows,int64_t blocknum,int64_t diskoff,int64_t size,void * extra),void * iter_extra)1120 int block_table::iterate_translation_tables(
1121 uint64_t checkpoint_count,
1122 int (*iter)(uint64_t checkpoint_count,
1123 int64_t total_num_rows,
1124 int64_t blocknum,
1125 int64_t diskoff,
1126 int64_t size,
1127 void *extra),
1128 void *iter_extra) {
1129 int error = 0;
1130 _mutex_lock();
1131
1132 int64_t total_num_rows =
1133 _current.length_of_array + _checkpointed.length_of_array;
1134 for (int64_t i = 0; error == 0 && i < _current.length_of_array; ++i) {
1135 struct block_translation_pair *block = &_current.block_translation[i];
1136 error = iter(checkpoint_count,
1137 total_num_rows,
1138 i,
1139 block->u.diskoff,
1140 block->size,
1141 iter_extra);
1142 }
1143 for (int64_t i = 0; error == 0 && i < _checkpointed.length_of_array; ++i) {
1144 struct block_translation_pair *block =
1145 &_checkpointed.block_translation[i];
1146 error = iter(checkpoint_count - 1,
1147 total_num_rows,
1148 i,
1149 block->u.diskoff,
1150 block->size,
1151 iter_extra);
1152 }
1153
1154 _mutex_unlock();
1155 return error;
1156 }
1157