1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "portability/memory.h"
40 #include "portability/toku_assert.h"
41 #include "portability/toku_portability.h"
42 #include "portability/toku_pthread.h"
43 
44 // ugly but pragmatic, need access to dirty bits while holding translation lock
45 // TODO: Refactor this (possibly with FT-301)
46 #include "ft/ft-internal.h"
47 
48 // TODO: reorganize this dependency (FT-303)
49 #include "ft/ft-ops.h"  // for toku_maybe_truncate_file
50 #include "ft/serialize/block_table.h"
51 #include "ft/serialize/rbuf.h"
52 #include "ft/serialize/wbuf.h"
53 #include "ft/serialize/block_allocator.h"
54 #include "util/nb_mutex.h"
55 #include "util/scoped_malloc.h"
56 
57 
// Instrumentation (PFS) keys for the locks created in
// block_table::_create_internal(): the block table mutex and the
// mutex/rwlock pair backing the safe-file-size nb_mutex.
// Defined here; initialized by the instrumentation setup elsewhere.
toku_instr_key *block_table_mutex_key;
toku_instr_key *safe_file_size_lock_mutex_key;
toku_instr_key *safe_file_size_lock_rwlock_key;
61 
// Sentinel blocknum indicating the end of the blocknum freelist.
static const BLOCKNUM freelist_null = {-1};

// Sentinel for block_translation_pair.size when the blocknum is unused
// (i.e. the slot is on the freelist).
static const DISKOFF size_is_free = (DISKOFF)-1;

// Sentinel for block_translation_pair.u.diskoff when the blocknum is in use
// but has not yet been assigned a disk block.
static const DISKOFF diskoff_unused = (DISKOFF)-2;
71 
_mutex_lock()72 void block_table::_mutex_lock() { toku_mutex_lock(&_mutex); }
73 
_mutex_unlock()74 void block_table::_mutex_unlock() { toku_mutex_unlock(&_mutex); }
75 
76 // TODO: Move lock to FT
toku_ft_lock(FT ft)77 void toku_ft_lock(FT ft) {
78     block_table *bt = &ft->blocktable;
79     bt->_mutex_lock();
80 }
81 
82 // TODO: Move lock to FT
toku_ft_unlock(FT ft)83 void toku_ft_unlock(FT ft) {
84     block_table *bt = &ft->blocktable;
85     toku_mutex_assert_locked(&bt->_mutex);
86     bt->_mutex_unlock();
87 }
88 
89 // There are two headers: the reserve must fit them both and be suitably
90 // aligned.
91 static_assert(BlockAllocator::BLOCK_ALLOCATOR_HEADER_RESERVE %
92                       BlockAllocator::BLOCK_ALLOCATOR_ALIGNMENT ==
93                   0,
94               "Block allocator's header reserve must be suitibly aligned");
95 static_assert(
96     BlockAllocator::BLOCK_ALLOCATOR_HEADER_RESERVE * 2 ==
97         BlockAllocator::BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE,
98     "Block allocator's total header reserve must exactly fit two headers");
99 
// Shared constructor helper: zeroes the three translations and the mutex
// storage, builds the BlockAllocator object, and initializes the locks.
// Does NOT initialize the block allocator's state: the caller is responsible
// (create() and create_from_buffer() each seed it differently).
void block_table::_create_internal() {
    // All three translations start empty: null block_translation arrays,
    // zero lengths. _copy_translation() later relies on dst being null.
    memset(&_current, 0, sizeof(struct translation));
    memset(&_inprogress, 0, sizeof(struct translation));
    memset(&_checkpointed, 0, sizeof(struct translation));
    // Zero the mutex storage before handing it to toku_mutex_init.
    memset(&_mutex, 0, sizeof(_mutex));
    _bt_block_allocator = new BlockAllocator();
    toku_mutex_init(*block_table_mutex_key, &_mutex, nullptr);
    nb_mutex_init(*safe_file_size_lock_mutex_key,
                  *safe_file_size_lock_rwlock_key,
                  &_safe_file_size_lock);
}
112 
// Fill in the checkpointed translation from buffer, and copy checkpointed to
// current.
// The one read from disk is the last known checkpointed one, so we keep it
// in place and then set current (which is never stored on disk) for current
// use.
// The translation_buffer holds the translation only; we create the rest of
// the block_table here (locks, allocator, safe file size).
// Returns 0 on success, or the deserialization error code on failure.
int block_table::create_from_buffer(
    int fd,
    DISKOFF location_on_disk,  // Location of translation_buffer
    DISKOFF size_on_disk,
    unsigned char *translation_buffer) {
    // Does not initialize the block allocator
    _create_internal();

    // Deserialize the translation and copy it to current
    int r = _translation_deserialize_from_buffer(
        &_checkpointed, location_on_disk, size_on_disk, translation_buffer);
    if (r != 0) {
        return r;
    }
    _copy_translation(&_current, &_checkpointed, TRANSLATION_CURRENT);

    // Determine the file size; everything below it is safe to read/write.
    int64_t file_size = 0;
    r = toku_os_get_file_size(fd, &file_size);
    lazy_assert_zero(r);
    invariant(file_size >= 0);
    _safe_file_size = file_size;

    // Gather the non-empty translations and use them to create the block
    // allocator (so it knows which disk regions are already occupied).
    toku::scoped_malloc pairs_buf(_checkpointed.smallest_never_used_blocknum.b *
                                  sizeof(struct BlockAllocator::BlockPair));
    struct BlockAllocator::BlockPair *CAST_FROM_VOIDP(pairs, pairs_buf.get());
    uint64_t n_pairs = 0;
    for (int64_t i = 0; i < _checkpointed.smallest_never_used_blocknum.b; i++) {
        struct block_translation_pair pair = _checkpointed.block_translation[i];
        if (pair.size > 0) {
            // A blocknum with positive size must have a real disk offset.
            invariant(pair.u.diskoff != diskoff_unused);
            pairs[n_pairs++] =
                BlockAllocator::BlockPair(pair.u.diskoff, pair.size);
        }
    }

    _bt_block_allocator->CreateFromBlockPairs(
        BlockAllocator::BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE,
        BlockAllocator::BLOCK_ALLOCATOR_ALIGNMENT,
        pairs,
        n_pairs);

    return 0;
}
167 
create()168 void block_table::create() {
169     // Does not initialize the block allocator
170     _create_internal();
171 
172     _checkpointed.type = TRANSLATION_CHECKPOINTED;
173     _checkpointed.smallest_never_used_blocknum =
174         make_blocknum(RESERVED_BLOCKNUMS);
175     _checkpointed.length_of_array =
176         _checkpointed.smallest_never_used_blocknum.b;
177     _checkpointed.blocknum_freelist_head = freelist_null;
178     XMALLOC_N(_checkpointed.length_of_array, _checkpointed.block_translation);
179     for (int64_t i = 0; i < _checkpointed.length_of_array; i++) {
180         _checkpointed.block_translation[i].size = 0;
181         _checkpointed.block_translation[i].u.diskoff = diskoff_unused;
182     }
183 
184     // we just created a default checkpointed, now copy it to current.
185     _copy_translation(&_current, &_checkpointed, TRANSLATION_CURRENT);
186 
187     // Create an empty block allocator.
188     _bt_block_allocator->Create(
189         BlockAllocator::BLOCK_ALLOCATOR_TOTAL_HEADER_RESERVE,
190         BlockAllocator::BLOCK_ALLOCATOR_ALIGNMENT);
191 }
192 
193 // TODO: Refactor with FT-303
ft_set_dirty(FT ft,bool for_checkpoint)194 static void ft_set_dirty(FT ft, bool for_checkpoint) {
195     invariant(ft->h->type == FT_CURRENT);
196     if (for_checkpoint) {
197         invariant(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS);
198         ft->checkpoint_header->set_dirty();
199     } else {
200         ft->h->set_dirty();
201     }
202 }
203 
// Shrink the file if the allocator no longer needs its tail.
// Requires: _mutex held. May temporarily drop _mutex around the actual
// truncate syscall while holding _safe_file_size_lock.
void block_table::_maybe_truncate_file(int fd, uint64_t size_needed_before) {
    toku_mutex_assert_locked(&_mutex);
    uint64_t new_size_needed = _bt_block_allocator->AllocatedLimit();
    // Save a call to toku_os_get_file_size (kernel call) if unlikely to be
    // useful.
    if (new_size_needed < size_needed_before &&
        new_size_needed < _safe_file_size) {
        nb_mutex_lock(&_safe_file_size_lock, &_mutex);

        // Must hold _safe_file_size_lock to change _safe_file_size.
        // Re-check: _safe_file_size may have changed while we waited.
        if (new_size_needed < _safe_file_size) {
            int64_t safe_file_size_before = _safe_file_size;
            // Not safe to use the 'to-be-truncated' portion until truncate is
            // done, so shrink _safe_file_size before releasing _mutex.
            _safe_file_size = new_size_needed;
            _mutex_unlock();

            // Truncate outside _mutex; writers are fenced by _safe_file_size.
            uint64_t size_after;
            toku_maybe_truncate_file(
                fd, new_size_needed, safe_file_size_before, &size_after);
            _mutex_lock();

            // Record the real post-truncate size (may exceed the request).
            _safe_file_size = size_after;
        }
        nb_mutex_unlock(&_safe_file_size_lock);
    }
}
231 
// At open time, shrink the file if the block allocator indicates the tail
// beyond the currently safe size is unused.
void block_table::maybe_truncate_file_on_open(int fd) {
    _mutex_lock();
    _maybe_truncate_file(fd, _safe_file_size);
    _mutex_unlock();
}
237 
// Copy translation src into the empty translation dst, giving dst the type
// newtype. Only checkpointed->current and current->inprogress copies are
// legal (plus TRANSLATION_DEBUG for testing).
void block_table::_copy_translation(struct translation *dst,
                                    struct translation *src,
                                    enum translation_type newtype) {
    // We intend to malloc a fresh block, so the incoming translation should be
    // empty
    invariant_null(dst->block_translation);

    invariant(src->length_of_array >= src->smallest_never_used_blocknum.b);
    // Enforce the legal copy directions.
    invariant(newtype == TRANSLATION_DEBUG ||
              (src->type == TRANSLATION_CURRENT &&
               newtype == TRANSLATION_INPROGRESS) ||
              (src->type == TRANSLATION_CHECKPOINTED &&
               newtype == TRANSLATION_CURRENT));
    dst->type = newtype;
    dst->smallest_never_used_blocknum = src->smallest_never_used_blocknum;
    dst->blocknum_freelist_head = src->blocknum_freelist_head;

    // destination btt is of fixed size. Allocate + memcpy the exact length
    // necessary (src may have extra, unused capacity).
    dst->length_of_array = dst->smallest_never_used_blocknum.b;
    XMALLOC_N(dst->length_of_array, dst->block_translation);
    memcpy(dst->block_translation,
           src->block_translation,
           dst->length_of_array * sizeof(*dst->block_translation));

    // New version of btt is not yet stored on disk, so clear the slot that
    // records where the translation itself lives.
    dst->block_translation[RESERVED_BLOCKNUM_TRANSLATION].size = 0;
    dst->block_translation[RESERVED_BLOCKNUM_TRANSLATION].u.diskoff =
        diskoff_unused;
}
268 
get_blocks_in_use_unlocked()269 int64_t block_table::get_blocks_in_use_unlocked() {
270     BLOCKNUM b;
271     struct translation *t = &_current;
272     int64_t num_blocks = 0;
273     {
274         // Reserved blocknums do not get upgraded; They are part of the header.
275         for (b.b = RESERVED_BLOCKNUMS; b.b < t->smallest_never_used_blocknum.b;
276              b.b++) {
277             if (t->block_translation[b.b].size != size_is_free) {
278                 num_blocks++;
279             }
280         }
281     }
282     return num_blocks;
283 }
284 
// Reduce 'smallest_never_used_blocknum.b' by releasing the trailing run of
// free blocknums entirely (instead of keeping them on the free list).
// Doing so requires regenerating the free list, which is O(n) work — so this
// is called only from paths that are already doing O(n) work (checkpoint
// start, which copies the whole translation).
void block_table::_maybe_optimize_translation(struct translation *t) {
    BLOCKNUM b;
    paranoid_invariant(t->smallest_never_used_blocknum.b >= RESERVED_BLOCKNUMS);
    // Calculate how large the free suffix is: scan backwards from the end
    // until we hit a blocknum that is still in use.
    int64_t freed;
    {
        for (b.b = t->smallest_never_used_blocknum.b; b.b > RESERVED_BLOCKNUMS;
             b.b--) {
            if (t->block_translation[b.b - 1].size != size_is_free) {
                break;
            }
        }
        freed = t->smallest_never_used_blocknum.b - b.b;
    }
    if (freed > 0) {
        t->smallest_never_used_blocknum.b = b.b;
        if (t->length_of_array / 4 > t->smallest_never_used_blocknum.b) {
            // We're using more memory than necessary to represent this now.
            // Reduce to double the in-use prefix.
            uint64_t new_length = t->smallest_never_used_blocknum.b * 2;
            XREALLOC_N(new_length, t->block_translation);
            t->length_of_array = new_length;
            // No need to zero anything out.
        }

        // Regenerate the free list from the remaining free slots, since the
        // old list may reference blocknums past the new end.
        t->blocknum_freelist_head.b = freelist_null.b;
        for (b.b = RESERVED_BLOCKNUMS; b.b < t->smallest_never_used_blocknum.b;
             b.b++) {
            if (t->block_translation[b.b].size == size_is_free) {
                t->block_translation[b.b].u.next_free_blocknum =
                    t->blocknum_freelist_head;
                t->blocknum_freelist_head = b;
            }
        }
    }
}
327 
// Begin a checkpoint: snapshot the current translation into inprogress.
// The block table mutex must be held by the caller.
void block_table::note_start_checkpoint_unlocked() {
    toku_mutex_assert_locked(&_mutex);

    // We're going to do O(n) work to copy the translation, so we
    // can afford to do O(n) work by optimizing the translation first.
    _maybe_optimize_translation(&_current);

    // Copy current translation to inprogress translation.
    _copy_translation(&_inprogress, &_current, TRANSLATION_INPROGRESS);

    // Reset the skip flag; note_skipped_checkpoint() may set it later.
    _checkpoint_skipped = false;
}
341 
// Alert the block table that the checkpoint was skipped, e.g. for a
// non-dirty header. note_end_checkpoint() will then discard inprogress
// instead of promoting it.
void block_table::note_skipped_checkpoint() {
    _mutex_lock();
    // A checkpoint must have been started (inprogress exists).
    paranoid_invariant_notnull(_inprogress.block_translation);
    _checkpoint_skipped = true;
    _mutex_unlock();
}
350 
// Purpose: free any disk space used by the previous checkpoint that isn't in
// use by either
//           - current state
//           - in-progress checkpoint
//          then capture inprogress as the new checkpointed translation.
// For each entry in checkpointed btt:
//   if its offset does not match the offset in inprogress
//      assert the offset does not match the offset in current
//      free (offset, len) from the checkpoint
// move inprogress to checkpointed (resetting type)
// inprogress = NULL
void block_table::note_end_checkpoint(int fd) {
    // Free unused blocks
    _mutex_lock();
    uint64_t allocated_limit_at_start = _bt_block_allocator->AllocatedLimit();
    paranoid_invariant_notnull(_inprogress.block_translation);
    if (_checkpoint_skipped) {
        // Checkpoint never happened: throw away the inprogress snapshot.
        toku_free(_inprogress.block_translation);
        memset(&_inprogress, 0, sizeof(_inprogress));
        goto end;
    }

    // Make certain inprogress was allocated space on disk
    // (serialize_translation_to_wbuf must have run).
    invariant(
        _inprogress.block_translation[RESERVED_BLOCKNUM_TRANSLATION].size > 0);
    invariant(
        _inprogress.block_translation[RESERVED_BLOCKNUM_TRANSLATION].u.diskoff >
        0);

    {
        struct translation *t = &_checkpointed;
        for (int64_t i = 0; i < t->length_of_array; i++) {
            struct block_translation_pair *pair = &t->block_translation[i];
            // Free the old checkpoint's block unless inprogress still points
            // at the same disk location.
            if (pair->size > 0 &&
                !_translation_prevents_freeing(
                    &_inprogress, make_blocknum(i), pair)) {
                // If inprogress doesn't reference it, current must not either.
                invariant(!_translation_prevents_freeing(
                              &_current, make_blocknum(i), pair));
                _bt_block_allocator->FreeBlock(pair->u.diskoff, pair->size);
            }
        }
        toku_free(_checkpointed.block_translation);
        // Promote inprogress to checkpointed; inprogress becomes empty.
        _checkpointed = _inprogress;
        _checkpointed.type = TRANSLATION_CHECKPOINTED;
        memset(&_inprogress, 0, sizeof(_inprogress));
        // Freeing blocks above may have lowered the allocated limit.
        _maybe_truncate_file(fd, allocated_limit_at_start);
    }
end:
    _mutex_unlock();
}
401 
_is_valid_blocknum(struct translation * t,BLOCKNUM b)402 bool block_table::_is_valid_blocknum(struct translation *t, BLOCKNUM b) {
403     invariant(t->length_of_array >= t->smallest_never_used_blocknum.b);
404     return b.b >= 0 && b.b < t->smallest_never_used_blocknum.b;
405 }
406 
// Abort (invariant failure) unless b is a valid blocknum in t.
// UU() marks the parameters possibly-unused for builds where paranoid
// invariants compile out.
void block_table::_verify_valid_blocknum(struct translation *UU(t),
                                         BLOCKNUM UU(b)) {
    invariant(_is_valid_blocknum(t, b));
}
411 
_is_valid_freeable_blocknum(struct translation * t,BLOCKNUM b)412 bool block_table::_is_valid_freeable_blocknum(struct translation *t,
413                                               BLOCKNUM b) {
414     invariant(t->length_of_array >= t->smallest_never_used_blocknum.b);
415     return b.b >= RESERVED_BLOCKNUMS && b.b < t->smallest_never_used_blocknum.b;
416 }
417 
// Abort (invariant failure) unless b is a valid, freeable (non-reserved)
// blocknum in t.
void block_table::_verify_valid_freeable_blocknum(struct translation *UU(t),
                                                  BLOCKNUM UU(b)) {
    invariant(_is_valid_freeable_blocknum(t, b));
}
423 
// Return the disk region (offset, size) to the block allocator's free pool,
// under the block table mutex.
// Also used only in ft-serialize-test.
void block_table::block_free(uint64_t offset, uint64_t size) {
    _mutex_lock();
    _bt_block_allocator->FreeBlock(offset, size);
    _mutex_unlock();
}
430 
_calculate_size_on_disk(struct translation * t)431 int64_t block_table::_calculate_size_on_disk(struct translation *t) {
432     return 8 +  // smallest_never_used_blocknum
433            8 +  // blocknum_freelist_head
434            t->smallest_never_used_blocknum.b * 16 +  // Array
435            4;                                        // 4 for checksum
436 }
437 
// We cannot free the disk space allocated to this blocknum if it is still in
// use by the given translation table (i.e. that table records the same disk
// offset for the same blocknum).
bool block_table::_translation_prevents_freeing(
    struct translation *t,
    BLOCKNUM b,
    struct block_translation_pair *old_pair) {
    // t must exist, must cover blocknum b, and must still reference the
    // same disk location as old_pair.
    return t->block_translation && b.b < t->smallest_never_used_blocknum.b &&
           old_pair->u.diskoff == t->block_translation[b.b].u.diskoff;
}
447 
// Give blocknum b a new disk block of the given size, freeing its old block
// if no other translation (inprogress / checkpointed) still references it.
// Requires: _mutex held. On return *offset is the new disk offset, or
// diskoff_unused when size == 0.
void block_table::_realloc_on_disk_internal(BLOCKNUM b,
                                            DISKOFF size,
                                            DISKOFF *offset,
                                            FT ft,
                                            bool for_checkpoint) {
    toku_mutex_assert_locked(&_mutex);
    // The translation is changing, so the (checkpoint or live) header is
    // now dirty.
    ft_set_dirty(ft, for_checkpoint);

    struct translation *t = &_current;
    struct block_translation_pair old_pair = t->block_translation[b.b];
    // Free the old block if it is not still in use by the checkpoint in
    // progress or the previous checkpoint
    bool cannot_free =
        (!for_checkpoint &&
         _translation_prevents_freeing(&_inprogress, b, &old_pair)) ||
        _translation_prevents_freeing(&_checkpointed, b, &old_pair);
    if (!cannot_free && old_pair.u.diskoff != diskoff_unused) {
        _bt_block_allocator->FreeBlock(old_pair.u.diskoff, old_pair.size);
    }

    uint64_t allocator_offset = diskoff_unused;
    t->block_translation[b.b].size = size;
    if (size > 0) {
        // Allocate a new block if the size is greater than 0,
        // if the size is just 0, offset will be set to diskoff_unused
        _bt_block_allocator->AllocBlock(size, &allocator_offset);
    }
    t->block_translation[b.b].u.diskoff = allocator_offset;
    *offset = allocator_offset;

    // Update inprogress btt if appropriate (if called because Pending bit is
    // set).
    if (for_checkpoint) {
        paranoid_invariant(b.b < _inprogress.length_of_array);
        _inprogress.block_translation[b.b] = t->block_translation[b.b];
    }
}
485 
// Grow (preallocate) the file if a write of block_size bytes at block_offset
// would extend past the currently safe file size.
// Requires: holding _mutex. May temporarily drop _mutex around the
// preallocation syscall while holding _safe_file_size_lock.
void block_table::_ensure_safe_write_unlocked(int fd,
                                              DISKOFF block_size,
                                              DISKOFF block_offset) {
    // Requires: holding _mutex
    uint64_t size_needed = block_size + block_offset;
    if (size_needed > _safe_file_size) {
        // Must hold _safe_file_size_lock to change _safe_file_size.
        nb_mutex_lock(&_safe_file_size_lock, &_mutex);
        // Re-check: another thread may have grown the file while we waited.
        if (size_needed > _safe_file_size) {
            _mutex_unlock();

            // Preallocate outside _mutex; the nb_mutex serializes growers.
            int64_t size_after;
            toku_maybe_preallocate_in_file(
                fd, size_needed, _safe_file_size, &size_after);

            _mutex_lock();
            _safe_file_size = size_after;
        }
        nb_mutex_unlock(&_safe_file_size_lock);
    }
}
507 
// Public entry: give blocknum b a new disk block of the given size (freeing
// the old one when safe), then make sure the file is large enough to write
// the block at the returned *offset.
void block_table::realloc_on_disk(BLOCKNUM b,
                                  DISKOFF size,
                                  DISKOFF *offset,
                                  FT ft,
                                  int fd,
                                  bool for_checkpoint) {
    _mutex_lock();
    struct translation *t = &_current;
    _verify_valid_freeable_blocknum(t, b);
    _realloc_on_disk_internal(b, size, offset, ft, for_checkpoint);

    // Grow the file if the new block extends past the safe file size.
    _ensure_safe_write_unlocked(fd, size, *offset);
    _mutex_unlock();
}
522 
_pair_is_unallocated(struct block_translation_pair * pair)523 bool block_table::_pair_is_unallocated(struct block_translation_pair *pair) {
524     return pair->size == 0 && pair->u.diskoff == diskoff_unused;
525 }
526 
// Effect: figure out where to put the inprogress btt on disk, allocate space
// for it there.
//   The space must be 512-byte aligned (both the starting address and the
//   size).
//   As a result, the allocated space may be a little bit bigger (up to the
//   next 512-byte boundary) than the actual btt.
void block_table::_alloc_inprogress_translation_on_disk_unlocked() {
    toku_mutex_assert_locked(&_mutex);

    struct translation *t = &_inprogress;
    paranoid_invariant_notnull(t->block_translation);
    BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION);
    // Each inprogress is allocated only once
    paranoid_invariant(_pair_is_unallocated(&t->block_translation[b.b]));

    // Allocate a new block and record it in the reserved translation slot,
    // so the btt knows where it itself lives on disk.
    int64_t size = _calculate_size_on_disk(t);
    uint64_t offset;
    _bt_block_allocator->AllocBlock(size, &offset);
    t->block_translation[b.b].u.diskoff = offset;
    t->block_translation[b.b].size = size;
}
549 
// Effect: Serializes the blocktable to a wbuf (which starts uninitialized)
//   A clean shutdown runs checkpoint start so that current and inprogress are
//   copies.
//   The resulting wbuf buffer is guaranteed to be 512-byte aligned and the
//   total length is a multiple of 512 (so we pad with zeros at the end if
//   needed)
//   The address is guaranteed to be 512-byte aligned, but the size is not
//   guaranteed.
//   It *is* guaranteed that we can read up to the next 512-byte boundary,
//   however
void block_table::serialize_translation_to_wbuf(int fd,
                                                struct wbuf *w,
                                                int64_t *address,
                                                int64_t *size) {
    _mutex_lock();
    struct translation *t = &_inprogress;

    BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION);
    _alloc_inprogress_translation_on_disk_unlocked();  // The allocated block
                                                       // must be 512-byte
                                                       // aligned to make
                                                       // O_DIRECT happy.
    uint64_t size_translation = _calculate_size_on_disk(t);
    uint64_t size_aligned = roundup_to_multiple(512, size_translation);
    invariant((int64_t)size_translation == t->block_translation[b.b].size);
    {
        // Init wbuf with a 512-aligned buffer, zero-padded past the payload.
        if (0)
            printf(
                "%s:%d writing translation table of size_translation %" PRIu64
                " at %" PRId64 "\n",
                __FILE__,
                __LINE__,
                size_translation,
                t->block_translation[b.b].u.diskoff);
        char *XMALLOC_N_ALIGNED(512, size_aligned, buf);
        for (uint64_t i = size_translation; i < size_aligned; i++)
            buf[i] = 0;  // fill in the end of the buffer with zeros.
        wbuf_init(w, buf, size_aligned);
    }
    // Serialize: header fields, then one (diskoff, size) pair per blocknum.
    wbuf_BLOCKNUM(w, t->smallest_never_used_blocknum);
    wbuf_BLOCKNUM(w, t->blocknum_freelist_head);
    int64_t i;
    for (i = 0; i < t->smallest_never_used_blocknum.b; i++) {
        if (0)
            printf("%s:%d %" PRId64 ",%" PRId64 "\n",
                   __FILE__,
                   __LINE__,
                   t->block_translation[i].u.diskoff,
                   t->block_translation[i].size);
        wbuf_DISKOFF(w, t->block_translation[i].u.diskoff);
        wbuf_DISKOFF(w, t->block_translation[i].size);
    }
    // Trailing checksum covers everything written so far.
    uint32_t checksum = toku_x1764_finish(&w->checksum);
    wbuf_int(w, checksum);
    // Report where the (unpadded) translation will live on disk.
    *address = t->block_translation[b.b].u.diskoff;
    *size = size_translation;
    invariant((*address) % 512 == 0);

    // Make sure the file is big enough to hold the padded buffer.
    _ensure_safe_write_unlocked(fd, size_aligned, *address);
    _mutex_unlock();
}
612 
613 // Perhaps rename: purpose is get disk address of a block, given its blocknum
614 // (blockid?)
_translate_blocknum_to_offset_size_unlocked(BLOCKNUM b,DISKOFF * offset,DISKOFF * size)615 void block_table::_translate_blocknum_to_offset_size_unlocked(BLOCKNUM b,
616                                                               DISKOFF *offset,
617                                                               DISKOFF *size) {
618     struct translation *t = &_current;
619     _verify_valid_blocknum(t, b);
620     if (offset) {
621         *offset = t->block_translation[b.b].u.diskoff;
622     }
623     if (size) {
624         *size = t->block_translation[b.b].size;
625     }
626 }
627 
// Perhaps rename: purpose is get disk address of a block, given its blocknum
// (blockid?). Locked wrapper around the _unlocked variant; either
// out-parameter may be null.
void block_table::translate_blocknum_to_offset_size(BLOCKNUM b,
                                                    DISKOFF *offset,
                                                    DISKOFF *size) {
    _mutex_lock();
    _translate_blocknum_to_offset_size_unlocked(b, offset, size);
    _mutex_unlock();
}
637 
638 // Only called by toku_allocate_blocknum
639 // Effect: expand the array to maintain size invariant
640 // given that one more never-used blocknum will soon be used.
_maybe_expand_translation(struct translation * t)641 void block_table::_maybe_expand_translation(struct translation *t) {
642     if (t->length_of_array <= t->smallest_never_used_blocknum.b) {
643         // expansion is necessary
644         uint64_t new_length = t->smallest_never_used_blocknum.b * 2;
645         XREALLOC_N(new_length, t->block_translation);
646         uint64_t i;
647         for (i = t->length_of_array; i < new_length; i++) {
648             t->block_translation[i].u.next_free_blocknum = freelist_null;
649             t->block_translation[i].size = size_is_free;
650         }
651         t->length_of_array = new_length;
652     }
653 }
654 
// Hand out a blocknum in the CURRENT translation: reuse one from the
// freelist if possible, otherwise take (and advance past) the smallest
// never-used blocknum. Marks the FT header dirty.
// Requires: _mutex held.
void block_table::_allocate_blocknum_unlocked(BLOCKNUM *res, FT ft) {
    toku_mutex_assert_locked(&_mutex);
    BLOCKNUM result;
    struct translation *t = &_current;
    if (t->blocknum_freelist_head.b == freelist_null.b) {
        // no previously used blocknums are available
        // use a never used blocknum
        _maybe_expand_translation(
            t);  // Ensure a never used blocknums is available
        result = t->smallest_never_used_blocknum;
        t->smallest_never_used_blocknum.b++;
    } else {  // reuse a previously used blocknum
        result = t->blocknum_freelist_head;
        // Pop the freelist head; its slot stored the next link.
        BLOCKNUM next = t->block_translation[result.b].u.next_free_blocknum;
        t->blocknum_freelist_head = next;
    }
    // Verify the blocknum is free
    paranoid_invariant(t->block_translation[result.b].size == size_is_free);
    // blocknum is not free anymore: in use, but with no disk block yet
    t->block_translation[result.b].u.diskoff = diskoff_unused;
    t->block_translation[result.b].size = 0;
    _verify_valid_freeable_blocknum(t, result);
    *res = result;
    ft_set_dirty(ft, false);
}
680 
// Locked wrapper: allocate a blocknum in the CURRENT translation.
void block_table::allocate_blocknum(BLOCKNUM *res, FT ft) {
    _mutex_lock();
    _allocate_blocknum_unlocked(res, ft);
    _mutex_unlock();
}
686 
_free_blocknum_in_translation(struct translation * t,BLOCKNUM b)687 void block_table::_free_blocknum_in_translation(struct translation *t,
688                                                 BLOCKNUM b) {
689     _verify_valid_freeable_blocknum(t, b);
690     paranoid_invariant(t->block_translation[b.b].size != size_is_free);
691 
692     t->block_translation[b.b].size = size_is_free;
693     t->block_translation[b.b].u.next_free_blocknum = t->blocknum_freelist_head;
694     t->blocknum_freelist_head = b;
695 }
696 
// Effect: Free a blocknum.
// If the blocknum holds the only reference to a block on disk, free that
// block too. Zeroes the caller's blocknum (*bp) so it cannot be reused.
// Requires: _mutex held.
void block_table::_free_blocknum_unlocked(BLOCKNUM *bp,
                                          FT ft,
                                          bool for_checkpoint) {
    toku_mutex_assert_locked(&_mutex);
    BLOCKNUM b = *bp;
    bp->b = 0;  // Remove caller's reference.

    // Capture the old mapping before the slot is overwritten by the freelist.
    struct block_translation_pair old_pair = _current.block_translation[b.b];

    _free_blocknum_in_translation(&_current, b);
    if (for_checkpoint) {
        paranoid_invariant(ft->checkpoint_header->type ==
                           FT_CHECKPOINT_INPROGRESS);
        // The in-progress checkpoint must also stop referencing b.
        _free_blocknum_in_translation(&_inprogress, b);
    }

    // If the size is 0, no disk block has ever been assigned to this blocknum.
    if (old_pair.size > 0) {
        // Free the old block if it is not still in use by the checkpoint in
        // progress or the previous checkpoint
        bool cannot_free =
            _translation_prevents_freeing(&_inprogress, b, &old_pair) ||
            _translation_prevents_freeing(&_checkpointed, b, &old_pair);
        if (!cannot_free) {
            _bt_block_allocator->FreeBlock(old_pair.u.diskoff, old_pair.size);
        }
    } else {
        // Never-allocated blocknums carry the diskoff_unused sentinel.
        paranoid_invariant(old_pair.size == 0);
        paranoid_invariant(old_pair.u.diskoff == diskoff_unused);
    }
    ft_set_dirty(ft, for_checkpoint);
}
731 
// Effect: Free a blocknum under the blocktable lock.
// Locked wrapper around _free_blocknum_unlocked; *bp is zeroed to drop the
// caller's reference.
void block_table::free_blocknum(BLOCKNUM *bp, FT ft, bool for_checkpoint) {
    _mutex_lock();
    _free_blocknum_unlocked(bp, ft, for_checkpoint);
    _mutex_unlock();
}
737 
// Verify there are no free blocks.
// Asserts that the current translation's blocknum freelist is empty
// (its head is the freelist_null sentinel).
void block_table::verify_no_free_blocknums() {
    invariant(_current.blocknum_freelist_head.b == freelist_null.b);
}
742 
743 // Frees blocknums that have a size of 0 and unused diskoff
744 // Currently used for eliminating unused cached rollback log nodes
free_unused_blocknums(BLOCKNUM root)745 void block_table::free_unused_blocknums(BLOCKNUM root) {
746     _mutex_lock();
747     int64_t smallest = _current.smallest_never_used_blocknum.b;
748     for (int64_t i = RESERVED_BLOCKNUMS; i < smallest; i++) {
749         if (i == root.b) {
750             continue;
751         }
752         BLOCKNUM b = make_blocknum(i);
753         if (_current.block_translation[b.b].size == 0) {
754             invariant(_current.block_translation[b.b].u.diskoff ==
755                       diskoff_unused);
756             _free_blocknum_in_translation(&_current, b);
757         }
758     }
759     _mutex_unlock();
760 }
761 
_no_data_blocks_except_root(BLOCKNUM root)762 bool block_table::_no_data_blocks_except_root(BLOCKNUM root) {
763     bool ok = true;
764     _mutex_lock();
765     int64_t smallest = _current.smallest_never_used_blocknum.b;
766     if (root.b < RESERVED_BLOCKNUMS) {
767         ok = false;
768         goto cleanup;
769     }
770     for (int64_t i = RESERVED_BLOCKNUMS; i < smallest; i++) {
771         if (i == root.b) {
772             continue;
773         }
774         BLOCKNUM b = make_blocknum(i);
775         if (_current.block_translation[b.b].size != size_is_free) {
776             ok = false;
777             goto cleanup;
778         }
779     }
780 cleanup:
781     _mutex_unlock();
782     return ok;
783 }
784 
// Verify there are no data blocks except root.
// TODO(leif): This actually takes a lock, but I don't want to fix all the
// callers right now.
// NOTE(review): root is marked UU() because the argument is presumably
// unused when paranoid invariants are compiled out — confirm against the
// paranoid_invariant definition.
void block_table::verify_no_data_blocks_except_root(BLOCKNUM UU(root)) {
    paranoid_invariant(_no_data_blocks_except_root(root));
}
791 
_blocknum_allocated(BLOCKNUM b)792 bool block_table::_blocknum_allocated(BLOCKNUM b) {
793     _mutex_lock();
794     struct translation *t = &_current;
795     _verify_valid_blocknum(t, b);
796     bool ok = t->block_translation[b.b].size != size_is_free;
797     _mutex_unlock();
798     return ok;
799 }
800 
// Verify a blocknum is currently allocated.
// NOTE(review): b is marked UU() because it is presumably unused when
// paranoid invariants are compiled out — confirm against the
// paranoid_invariant definition.
void block_table::verify_blocknum_allocated(BLOCKNUM UU(b)) {
    paranoid_invariant(_blocknum_allocated(b));
}
805 
806 // Only used by toku_dump_translation table (debug info)
_dump_translation_internal(FILE * f,struct translation * t)807 void block_table::_dump_translation_internal(FILE *f, struct translation *t) {
808     if (t->block_translation) {
809         BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION);
810         fprintf(f, " length_of_array[%" PRId64 "]", t->length_of_array);
811         fprintf(f,
812                 " smallest_never_used_blocknum[%" PRId64 "]",
813                 t->smallest_never_used_blocknum.b);
814         fprintf(f,
815                 " blocknum_free_list_head[%" PRId64 "]",
816                 t->blocknum_freelist_head.b);
817         fprintf(
818             f, " size_on_disk[%" PRId64 "]", t->block_translation[b.b].size);
819         fprintf(f,
820                 " location_on_disk[%" PRId64 "]\n",
821                 t->block_translation[b.b].u.diskoff);
822         int64_t i;
823         for (i = 0; i < t->length_of_array; i++) {
824             fprintf(f,
825                     " %" PRId64 ": %" PRId64 " %" PRId64 "\n",
826                     i,
827                     t->block_translation[i].u.diskoff,
828                     t->block_translation[i].size);
829         }
830         fprintf(f, "\n");
831     } else {
832         fprintf(f, " does not exist\n");
833     }
834 }
835 
836 // Only used by toku_ft_dump which is only for debugging purposes
837 // "pretty" just means we use tabs so we can parse output easier later
dump_translation_table_pretty(FILE * f)838 void block_table::dump_translation_table_pretty(FILE *f) {
839     _mutex_lock();
840     struct translation *t = &_checkpointed;
841     invariant(t->block_translation != nullptr);
842     for (int64_t i = 0; i < t->length_of_array; ++i) {
843         fprintf(f,
844                 "%" PRId64 "\t%" PRId64 "\t%" PRId64 "\n",
845                 i,
846                 t->block_translation[i].u.diskoff,
847                 t->block_translation[i].size);
848     }
849     _mutex_unlock();
850 }
851 
852 // Only used by toku_ft_dump which is only for debugging purposes
dump_translation_table(FILE * f)853 void block_table::dump_translation_table(FILE *f) {
854     _mutex_lock();
855     fprintf(f, "Current block translation:");
856     _dump_translation_internal(f, &_current);
857     fprintf(f, "Checkpoint in progress block translation:");
858     _dump_translation_internal(f, &_inprogress);
859     fprintf(f, "Checkpointed block translation:");
860     _dump_translation_internal(f, &_checkpointed);
861     _mutex_unlock();
862 }
863 
864 // Only used by ftdump
blocknum_dump_translation(BLOCKNUM b)865 void block_table::blocknum_dump_translation(BLOCKNUM b) {
866     _mutex_lock();
867 
868     struct translation *t = &_current;
869     if (b.b < t->length_of_array) {
870         struct block_translation_pair *bx = &t->block_translation[b.b];
871         printf("%" PRId64 ": %" PRId64 " %" PRId64 "\n",
872                b.b,
873                bx->u.diskoff,
874                bx->size);
875     }
876     _mutex_unlock();
877 }
878 
// Must not call this function when anything else is using the blocktable.
// No one may use the blocktable afterwards.
void block_table::destroy(void) {
    // TODO: translation.destroy();
    // Free the translation arrays for all three table versions.
    toku_free(_current.block_translation);
    toku_free(_inprogress.block_translation);
    toku_free(_checkpointed.block_translation);

    // Tear down the block allocator, then the locks that guarded it.
    _bt_block_allocator->Destroy();
    delete _bt_block_allocator;
    toku_mutex_destroy(&_mutex);
    nb_mutex_destroy(&_safe_file_size_lock);
}
892 
_translation_deserialize_from_buffer(struct translation * t,DISKOFF location_on_disk,uint64_t size_on_disk,unsigned char * translation_buffer)893 int block_table::_translation_deserialize_from_buffer(
894     struct translation *t,
895     DISKOFF location_on_disk,
896     uint64_t size_on_disk,
897     // out: buffer with serialized translation
898     unsigned char *translation_buffer) {
899     int r = 0;
900     invariant(location_on_disk != 0);
901     t->type = TRANSLATION_CHECKPOINTED;
902 
903     // check the checksum
904     uint32_t x1764 = toku_x1764_memory(translation_buffer, size_on_disk - 4);
905     uint64_t offset = size_on_disk - 4;
906     uint32_t stored_x1764 = toku_dtoh32(*(int *)(translation_buffer + offset));
907     if (x1764 != stored_x1764) {
908         fprintf(stderr,
909                 "Translation table checksum failure: calc=0x%08x read=0x%08x\n",
910                 x1764,
911                 stored_x1764);
912         r = TOKUDB_BAD_CHECKSUM;
913         goto exit;
914     }
915 
916     struct rbuf rb;
917     rb.buf = translation_buffer;
918     rb.ndone = 0;
919     rb.size = size_on_disk - 4;  // 4==checksum
920 
921     t->smallest_never_used_blocknum = rbuf_blocknum(&rb);
922     t->length_of_array = t->smallest_never_used_blocknum.b;
923     invariant(t->smallest_never_used_blocknum.b >= RESERVED_BLOCKNUMS);
924     t->blocknum_freelist_head = rbuf_blocknum(&rb);
925     XMALLOC_N(t->length_of_array, t->block_translation);
926     for (int64_t i = 0; i < t->length_of_array; i++) {
927         t->block_translation[i].u.diskoff = rbuf_DISKOFF(&rb);
928         t->block_translation[i].size = rbuf_DISKOFF(&rb);
929     }
930     invariant(_calculate_size_on_disk(t) == (int64_t)size_on_disk);
931     invariant(t->block_translation[RESERVED_BLOCKNUM_TRANSLATION].size ==
932               (int64_t)size_on_disk);
933     invariant(t->block_translation[RESERVED_BLOCKNUM_TRANSLATION].u.diskoff ==
934               location_on_disk);
935 
936 exit:
937     return r;
938 }
939 
// Iterate over the translation table of the given type, calling f once per
// blocknum.  A private copy of the table is taken under the lock, so the
// callback runs without holding the blocktable mutex.
// Parameters:
//   type      - which table to walk (current/inprogress/checkpointed)
//   f         - callback invoked as f(blocknum, size, diskoff, extra)
//   extra     - opaque pointer passed through to f
//   data_only - skip the reserved (metadata) blocknums
//   used_only - skip entries with size <= 0 (free or never written)
// Returns EINVAL for an unknown type, otherwise the first nonzero value
// returned by f (0 if f never fails).
int block_table::iterate(enum translation_type type,
                         BLOCKTABLE_CALLBACK f,
                         void *extra,
                         bool data_only,
                         bool used_only) {
    struct translation *src;

    int r = 0;
    switch (type) {
        case TRANSLATION_CURRENT:
            src = &_current;
            break;
        case TRANSLATION_INPROGRESS:
            src = &_inprogress;
            break;
        case TRANSLATION_CHECKPOINTED:
            src = &_checkpointed;
            break;
        default:
            r = EINVAL;
    }

    // Zero-initialized snapshot target; only populated (and its array only
    // freed) when the type was valid.
    struct translation fakecurrent;
    memset(&fakecurrent, 0, sizeof(struct translation));

    struct translation *t = &fakecurrent;
    if (r == 0) {
        _mutex_lock();
        _copy_translation(t, src, TRANSLATION_DEBUG);
        // Copy the reserved translation entry too — presumably
        // _copy_translation does not carry it over for debug copies; verify
        // against its definition.
        t->block_translation[RESERVED_BLOCKNUM_TRANSLATION] =
            src->block_translation[RESERVED_BLOCKNUM_TRANSLATION];
        _mutex_unlock();
        int64_t i;
        for (i = 0; i < t->smallest_never_used_blocknum.b; i++) {
            struct block_translation_pair pair = t->block_translation[i];
            if (data_only && i < RESERVED_BLOCKNUMS)
                continue;
            if (used_only && pair.size <= 0)
                continue;
            r = f(make_blocknum(i), pair.size, pair.u.diskoff, extra);
            if (r != 0)
                break;
        }
        // Release the snapshot's translation array.
        toku_free(t->block_translation);
    }
    return r;
}
987 
// Accumulator used by internal_fragmentation's iterate callback:
// used_space sums the sizes of all used blocks; total_space tracks the
// highest end offset (size + address) seen so far.
typedef struct {
    int64_t used_space;
    int64_t total_space;
} frag_extra;
992 
frag_helper(BLOCKNUM UU (b),int64_t size,int64_t address,void * extra)993 static int frag_helper(BLOCKNUM UU(b),
994                        int64_t size,
995                        int64_t address,
996                        void *extra) {
997     frag_extra *info = (frag_extra *)extra;
998 
999     if (size + address > info->total_space)
1000         info->total_space = size + address;
1001     info->used_space += size;
1002     return 0;
1003 }
1004 
internal_fragmentation(int64_t * total_sizep,int64_t * used_sizep)1005 void block_table::internal_fragmentation(int64_t *total_sizep,
1006                                          int64_t *used_sizep) {
1007     frag_extra info = {0, 0};
1008     int r = iterate(TRANSLATION_CHECKPOINTED, frag_helper, &info, false, true);
1009     invariant_zero(r);
1010 
1011     if (total_sizep)
1012         *total_sizep = info.total_space;
1013     if (used_sizep)
1014         *used_sizep = info.used_space;
1015 }
1016 
// Reallocate on-disk space for the descriptor (its reserved blocknum) and
// return the new location through *offset.
// Requires: the blocktable mutex is held.
void block_table::_realloc_descriptor_on_disk_unlocked(DISKOFF size,
                                                       DISKOFF *offset,
                                                       FT ft) {
    toku_mutex_assert_locked(&_mutex);
    BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR);
    _realloc_on_disk_internal(b, size, offset, ft, false);
}
1024 
// Allocate new on-disk space for the descriptor and return its offset.
// Locked wrapper: reallocates under the mutex, then calls
// _ensure_safe_write_unlocked for the new region — presumably to make the
// file large enough to write `size` bytes at *offset; confirm against its
// definition.
void block_table::realloc_descriptor_on_disk(DISKOFF size,
                                             DISKOFF *offset,
                                             FT ft,
                                             int fd) {
    _mutex_lock();
    _realloc_descriptor_on_disk_unlocked(size, offset, ft);
    _ensure_safe_write_unlocked(fd, size, *offset);
    _mutex_unlock();
}
1034 
// Look up the descriptor's current on-disk offset and size (via its
// reserved blocknum) under the blocktable lock.
void block_table::get_descriptor_offset_size(DISKOFF *offset, DISKOFF *size) {
    _mutex_lock();
    BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_DESCRIPTOR);
    _translate_blocknum_to_offset_size_unlocked(b, offset, size);
    _mutex_unlock();
}
1041 
// Fill in a fragmentation report from all three translation tables.
void block_table::get_fragmentation_unlocked(TOKU_DB_FRAGMENTATION report) {
    // Requires:  blocktable lock is held.
    // Requires:  report->file_size_bytes is already filled in.

    // Count the headers.
    report->data_bytes = BlockAllocator::BLOCK_ALLOCATOR_HEADER_RESERVE;
    report->data_blocks = 1;
    report->checkpoint_bytes_additional =
        BlockAllocator::BLOCK_ALLOCATOR_HEADER_RESERVE;
    report->checkpoint_blocks_additional = 1;

    // Every used block in the current translation counts as live data.
    struct translation *current = &_current;
    for (int64_t i = 0; i < current->length_of_array; i++) {
        struct block_translation_pair *pair = &current->block_translation[i];
        if (pair->size > 0) {
            report->data_bytes += pair->size;
            report->data_blocks++;
        }
    }

    // Checkpointed blocks count as additional checkpoint space unless the
    // current table holds the same blocknum at the same disk offset (i.e.
    // the space is shared with live data, not extra).
    struct translation *checkpointed = &_checkpointed;
    for (int64_t i = 0; i < checkpointed->length_of_array; i++) {
        struct block_translation_pair *pair =
            &checkpointed->block_translation[i];
        if (pair->size > 0 &&
            !(i < current->length_of_array &&
              current->block_translation[i].size > 0 &&
              current->block_translation[i].u.diskoff == pair->u.diskoff)) {
            report->checkpoint_bytes_additional += pair->size;
            report->checkpoint_blocks_additional++;
        }
    }

    // In-progress blocks likewise count as additional space unless shared
    // with either the current or the checkpointed table.
    struct translation *inprogress = &_inprogress;
    for (int64_t i = 0; i < inprogress->length_of_array; i++) {
        struct block_translation_pair *pair = &inprogress->block_translation[i];
        if (pair->size > 0 &&
            !(i < current->length_of_array &&
              current->block_translation[i].size > 0 &&
              current->block_translation[i].u.diskoff == pair->u.diskoff) &&
            !(i < checkpointed->length_of_array &&
              checkpointed->block_translation[i].size > 0 &&
              checkpointed->block_translation[i].u.diskoff ==
                  pair->u.diskoff)) {
            report->checkpoint_bytes_additional += pair->size;
            report->checkpoint_blocks_additional++;
        }
    }

    // Let the allocator fill in the unused-space statistics.
    _bt_block_allocator->UnusedStatistics(report);
}
1093 
get_info64(struct ftinfo64 * s)1094 void block_table::get_info64(struct ftinfo64 *s) {
1095     _mutex_lock();
1096 
1097     struct translation *current = &_current;
1098     s->num_blocks_allocated = current->length_of_array;
1099     s->num_blocks_in_use = 0;
1100     s->size_allocated = 0;
1101     s->size_in_use = 0;
1102 
1103     for (int64_t i = 0; i < current->length_of_array; ++i) {
1104         struct block_translation_pair *block = &current->block_translation[i];
1105         if (block->size != size_is_free) {
1106             ++s->num_blocks_in_use;
1107             s->size_in_use += block->size;
1108             if (block->u.diskoff != diskoff_unused) {
1109                 uint64_t limit = block->u.diskoff + block->size;
1110                 if (limit > s->size_allocated) {
1111                     s->size_allocated = limit;
1112                 }
1113             }
1114         }
1115     }
1116 
1117     _mutex_unlock();
1118 }
1119 
iterate_translation_tables(uint64_t checkpoint_count,int (* iter)(uint64_t checkpoint_count,int64_t total_num_rows,int64_t blocknum,int64_t diskoff,int64_t size,void * extra),void * iter_extra)1120 int block_table::iterate_translation_tables(
1121     uint64_t checkpoint_count,
1122     int (*iter)(uint64_t checkpoint_count,
1123                 int64_t total_num_rows,
1124                 int64_t blocknum,
1125                 int64_t diskoff,
1126                 int64_t size,
1127                 void *extra),
1128     void *iter_extra) {
1129     int error = 0;
1130     _mutex_lock();
1131 
1132     int64_t total_num_rows =
1133         _current.length_of_array + _checkpointed.length_of_array;
1134     for (int64_t i = 0; error == 0 && i < _current.length_of_array; ++i) {
1135         struct block_translation_pair *block = &_current.block_translation[i];
1136         error = iter(checkpoint_count,
1137                      total_num_rows,
1138                      i,
1139                      block->u.diskoff,
1140                      block->size,
1141                      iter_extra);
1142     }
1143     for (int64_t i = 0; error == 0 && i < _checkpointed.length_of_array; ++i) {
1144         struct block_translation_pair *block =
1145             &_checkpointed.block_translation[i];
1146         error = iter(checkpoint_count - 1,
1147                      total_num_rows,
1148                      i,
1149                      block->u.diskoff,
1150                      block->size,
1151                      iter_extra);
1152     }
1153 
1154     _mutex_unlock();
1155     return error;
1156 }
1157