/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "ft/serialize/block_table.h"
#include "ft/ft.h"
#include "ft/ft-cachetable-wrappers.h"
#include "ft/ft-internal.h"
#include "ft/logger/log-internal.h"
#include "ft/log_header.h"
#include "ft/node.h"
#include "ft/serialize/ft-serialize.h"
#include "ft/serialize/ft_node-serialize.h"

#include <memory.h>
#include <toku_assert.h>
#include <portability/toku_atomic.h>

toku_instr_key *ft_ref_lock_mutex_key;

void toku_reset_root_xid_that_created(FT ft, TXNID new_root_xid_that_created) {
    // Reset the root_xid_that_created field to the given value.
    // This redefines which xid created the dictionary.

    // hold lock around setting and clearing of dirty bit
    // (see cooperative use of dirty bit in ft_begin_checkpoint())
    toku_ft_lock(ft);
    ft->h->root_xid_that_created = new_root_xid_that_created;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}

static void
ft_destroy(FT ft) {
    //header and checkpoint_header have same Blocktable pointer
    //cannot destroy since it is still in use by CURRENT
    assert(ft->h->type == FT_CURRENT);
    ft->blocktable.destroy();
    ft->cmp.destroy();
    toku_destroy_dbt(&ft->descriptor.dbt);
    toku_destroy_dbt(&ft->cmp_descriptor.dbt);
    toku_ft_destroy_reflock(ft);
    toku_free(ft->h);
}

// Make a copy of the header for the purpose of a checkpoint
// Not reentrant for a single FT.
// See ft_checkpoint for explanation of why
// FT lock must be held.
static void
ft_copy_for_checkpoint_unlocked(FT ft, LSN checkpoint_lsn) {
    assert(ft->h->type == FT_CURRENT);
    assert(ft->checkpoint_header == NULL);

    FT_HEADER XMEMDUP(ch, ft->h);
    ch->type = FT_CHECKPOINT_INPROGRESS; //Different type
    //printf("checkpoint_lsn=%" PRIu64 "\n", checkpoint_lsn.lsn);
    ch->checkpoint_lsn = checkpoint_lsn;

    //ch->blocktable is SHARED between the two headers
    ft->checkpoint_header = ch;
}

void
toku_ft_free (FT ft) {
    ft_destroy(ft);
    toku_free(ft);
}

void toku_ft_init_reflock(FT ft) {
    toku_mutex_init(*ft_ref_lock_mutex_key, &ft->ft_ref_lock, nullptr);
}

void toku_ft_destroy_reflock(FT ft) { toku_mutex_destroy(&ft->ft_ref_lock); }

void
toku_ft_grab_reflock(FT ft) {
    toku_mutex_lock(&ft->ft_ref_lock);
}

void
toku_ft_release_reflock(FT ft) {
    toku_mutex_unlock(&ft->ft_ref_lock);
}

/////////////////////////////////////////////////////////////////////////
// Start of Functions that are callbacks to the cachefile
//

// maps to cf->log_fassociate_during_checkpoint
static void
ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
    FT ft = (FT) header_v;
    char* fname_in_env = toku_cachefile_fname_in_env(cf);
    BYTESTRING bs = { .len = (uint32_t) strlen(fname_in_env), // don't include the NUL
                      .data = fname_in_env };
    TOKULOGGER logger = toku_cachefile_logger(cf);
    FILENUM filenum = toku_cachefile_filenum(cf);
    bool unlink_on_close = toku_cachefile_is_unlink_on_close(cf);
    toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs, unlink_on_close);
}

// Maps to cf->begin_checkpoint_userdata
// Create checkpoint-in-progress versions of header and translation (btt)
// Has access to fd (it is protected).
//
// Not reentrant for a single FT (see ft_checkpoint)
static void ft_begin_checkpoint (LSN checkpoint_lsn, void *header_v) {
    FT ft = (FT) header_v;
    // hold lock around copying and clearing of dirty bit
    toku_ft_lock (ft);
    assert(ft->h->type == FT_CURRENT);
    assert(ft->checkpoint_header == NULL);
    ft_copy_for_checkpoint_unlocked(ft, checkpoint_lsn);
    ft->h->clear_dirty();             // this is only place this bit is cleared        (in currentheader)
    ft->blocktable.note_start_checkpoint_unlocked();
    toku_ft_unlock (ft);
}

// #4922: Hack to remove data corruption race condition.
// Reading (and upgrading) a node up to version 19 causes this.
// We COULD skip this if we know that no nodes remained (as of last checkpoint)
// that are below version 19.
// If there are no nodes < version 19 this is harmless (field is unused).
// If there are, this will make certain the value is at least as low as necessary,
// and not much lower.  (Too low is good, too high can cause data corruption).
// TODO(yoni): If we ever stop supporting upgrades of nodes < version 19 we can delete this.
// TODO(yoni): If we know no nodes are left to upgrade, we can skip this. (Probably not worth doing).
static void
ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(FT ft) {
    if (ft->h->layout_version_original < FT_LAYOUT_VERSION_19) {
        ft->checkpoint_header->highest_unused_msn_for_upgrade = ft->h->highest_unused_msn_for_upgrade;
    }
}

// maps to cf->checkpoint_userdata
// Write checkpoint-in-progress versions of header and translation to disk (really to OS internal buffer).
// Copy current header's version of checkpoint_staging stat64info to checkpoint header.
// Must have access to fd (protected).
// Requires: all pending bits are clear.  This implies that no thread will modify the checkpoint_staging
// version of the stat64info.
//
// No locks are taken for checkpoint_count/lsn because this is single threaded.  Can be called by:
//  - ft_close
//  - end_checkpoint
// checkpoints hold references to FTs and so they cannot be closed during a checkpoint.
// ft_close is not reentrant for a single FT
// end_checkpoint is not reentrant period
static void ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
    FT ft = (FT) header_v;
    FT_HEADER ch = ft->checkpoint_header;
    assert(ch);
    assert(ch->type == FT_CHECKPOINT_INPROGRESS);
    if (ch->dirty()) {            // this is only place this bit is tested (in checkpoint_header)
        TOKULOGGER logger = toku_cachefile_logger(cf);
        if (logger) {
            toku_logger_fsync_if_lsn_not_fsynced(logger, ch->checkpoint_lsn);
        }
        uint64_t now = (uint64_t) time(NULL);
        ft->h->time_of_last_modification = now;
        ch->time_of_last_modification = now;
        ch->checkpoint_count++;
        ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(ft);
        ch->on_disk_logical_rows =
            ft->h->on_disk_logical_rows = ft->in_memory_logical_rows;

        // write translation and header to disk (or at least to OS internal buffer)
        toku_serialize_ft_to(fd, ch, &ft->blocktable, ft->cf);
        ch->clear_dirty();                      // this is only place this bit is cleared (in checkpoint_header)

        // fsync the cachefile
        toku_cachefile_fsync(cf);
        ft->h->checkpoint_count++;        // checkpoint succeeded, next checkpoint will save to alternate header location
        ft->h->checkpoint_lsn = ch->checkpoint_lsn;  //Header updated.
    } else {
        ft->blocktable.note_skipped_checkpoint();
    }
}

// maps to cf->end_checkpoint_userdata
// free unused disk space
// (i.e. tell BlockAllocator to liberate blocks used by previous checkpoint).
// Must have access to fd (protected)
static void ft_end_checkpoint(CACHEFILE UU(cf), int fd, void *header_v) {
    FT ft = (FT) header_v;
    assert(ft->h->type == FT_CURRENT);
    ft->blocktable.note_end_checkpoint(fd);
    toku_free(ft->checkpoint_header);
    ft->checkpoint_header = nullptr;
}
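
// For a single FT the three checkpoint callbacks above run in this order,
// either through the cachefile callbacks during a system checkpoint or
// directly from ft_close() below:
//
//     ft_begin_checkpoint(lsn, ft);   // clone the header, clear the dirty bit
//     ft_checkpoint(cf, fd, ft);      // serialize the clone, fsync the cachefile
//     ft_end_checkpoint(cf, fd, ft);  // free the previous checkpoint's blocks, drop the clone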

// maps to cf->close_userdata
// Has access to fd (it is protected).
static void ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN oplsn) {
    FT ft = (FT) header_v;
    assert(ft->h->type == FT_CURRENT);
    // We already have exclusive access to this field, so skip the locking.
    // This should never fail.
    invariant(!toku_ft_needed_unlocked(ft));
    assert(ft->cf == cachefile);
    TOKULOGGER logger = toku_cachefile_logger(cachefile);
    LSN lsn = ZERO_LSN;
    //Get LSN
    if (oplsn_valid) {
        //Use recovery-specified lsn
        lsn = oplsn;
        //Recovery cannot reduce lsn of a header.
        if (lsn.lsn < ft->h->checkpoint_lsn.lsn) {
            lsn = ft->h->checkpoint_lsn;
        }
    }
    else {
        //Get LSN from logger
        lsn = ZERO_LSN; // if there is no logger, we use zero for the lsn
        if (logger) {
            char* fname_in_env = toku_cachefile_fname_in_env(cachefile);
            assert(fname_in_env);
            BYTESTRING bs = {.len=(uint32_t) strlen(fname_in_env), .data=fname_in_env};
            if (!toku_cachefile_is_skip_log_recover_on_close(cachefile)) {
                toku_log_fclose(
                    logger,
                    &lsn,
                    ft->h->dirty(),
                    bs,
                    toku_cachefile_filenum(cachefile));  // flush the log on
                                                         // close (if new header
                                                         // is being written),
                                                         // otherwise it might
                                                         // not make it out.
                toku_cachefile_do_log_recover_on_close(cachefile);
            }
        }
    }
    if (ft->h->dirty()) {               // this is the only place this bit is tested (in currentheader)
        bool do_checkpoint = true;
        if (logger && logger->rollback_cachefile == cachefile) {
            do_checkpoint = false;
        }
        if (do_checkpoint) {
            ft_begin_checkpoint(lsn, header_v);
            ft_checkpoint(cachefile, fd, ft);
            ft_end_checkpoint(cachefile, fd, header_v);
            assert(!ft->h->dirty()); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
        }
    }
}

// maps to cf->free_userdata
static void ft_free(CACHEFILE cachefile UU(), void *header_v) {
    FT ft = (FT) header_v;
    toku_ft_free(ft);
}

// maps to cf->note_pin_by_checkpoint
//Must be protected by ydb lock.
//Is only called by checkpoint begin, which holds it
static void ft_note_pin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) {
    // Note: open_close lock is held by checkpoint begin
    FT ft = (FT) header_v;
    toku_ft_grab_reflock(ft);
    assert(!ft->pinned_by_checkpoint);
    assert(toku_ft_needed_unlocked(ft));
    ft->pinned_by_checkpoint = true;
    toku_ft_release_reflock(ft);
}

// Requires: the reflock is held.
static void unpin_by_checkpoint_callback(FT ft, void *extra) {
    invariant(extra == NULL);
    invariant(ft->pinned_by_checkpoint);
    ft->pinned_by_checkpoint = false;
}

// maps to cf->note_unpin_by_checkpoint
//Must be protected by ydb lock.
//Called by end_checkpoint, which grabs ydb lock around note_unpin
static void ft_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) {
    FT ft = (FT) header_v;
    toku_ft_remove_reference(ft, false, ZERO_LSN, unpin_by_checkpoint_callback, NULL);
}

//
// End of Functions that are callbacks to the cachefile
/////////////////////////////////////////////////////////////////////////

static void setup_initial_ft_root_node(FT ft, BLOCKNUM blocknum) {
    FTNODE XCALLOC(node);
    toku_initialize_empty_ftnode(node, blocknum, 0, 1, ft->h->layout_version, ft->h->flags);
    BP_STATE(node,0) = PT_AVAIL;

    uint32_t fullhash = toku_cachetable_hash(ft->cf, blocknum);
    node->fullhash = fullhash;
    toku_cachetable_put(ft->cf, blocknum, fullhash,
                        node, make_ftnode_pair_attr(node),
                        get_write_callbacks_for_node(ft),
                        toku_ftnode_save_ct_pair);
    toku_unpin_ftnode(ft, node);
}

static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
    // fake, prevent unnecessary upgrade logic
    ft->layout_version_read_from_disk = FT_LAYOUT_VERSION;
    ft->checkpoint_header = NULL;

    toku_list_init(&ft->live_ft_handles);

    // intuitively, the comparator points to the FT's cmp descriptor
    ft->cmp.create(options->compare_fun, &ft->cmp_descriptor, options->memcmp_magic);
    ft->update_fun = options->update_fun;

    if (ft->cf != NULL) {
        assert(ft->cf == cf);
    }
    ft->cf = cf;
    ft->in_memory_stats = ZEROSTATS;

    setup_initial_ft_root_node(ft, ft->h->root_blocknum);
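    // Register this FT as the cachefile's userdata, together with its
    // lifecycle callbacks; toku_read_ft_and_store_in_cachefile() below installs
    // the same callback set when an existing FT is read back from disk.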
    toku_cachefile_set_userdata(ft->cf,
                                ft,
                                ft_log_fassociate_during_checkpoint,
                                ft_close,
                                ft_free,
                                ft_checkpoint,
                                ft_begin_checkpoint,
                                ft_end_checkpoint,
                                ft_note_pin_by_checkpoint,
                                ft_note_unpin_by_checkpoint);

    ft->blocktable.verify_no_free_blocknums();
}


static FT_HEADER
ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_created)
{
    uint64_t now = (uint64_t) time(NULL);
    struct ft_header h = {
        .type = FT_CURRENT,
        .dirty_ = 0,
        .checkpoint_count = 0,
        .checkpoint_lsn = ZERO_LSN,
        .layout_version = FT_LAYOUT_VERSION,
        .layout_version_original = FT_LAYOUT_VERSION,
        .build_id = BUILD_ID,
        .build_id_original = BUILD_ID,
        .time_of_creation = now,
        .root_xid_that_created = root_xid_that_created,
        .time_of_last_modification = now,
        .time_of_last_verification = 0,
        .root_blocknum = root_blocknum,
        .flags = options->flags,
        .nodesize = options->nodesize,
        .basementnodesize = options->basementnodesize,
        .compression_method = options->compression_method,
        .fanout = options->fanout,
        .highest_unused_msn_for_upgrade = { .msn = (MIN_MSN.msn - 1) },
        .max_msn_in_ft = ZERO_MSN,
        .time_of_last_optimize_begin = 0,
        .time_of_last_optimize_end = 0,
        .count_of_optimize_in_progress = 0,
        .count_of_optimize_in_progress_read_from_disk = 0,
        .msn_at_start_of_last_completed_optimize = ZERO_MSN,
        .on_disk_stats = ZEROSTATS,
        .on_disk_logical_rows = 0
    };
    return (FT_HEADER) toku_xmemdup(&h, sizeof h);
}

// allocate and initialize a fractal tree.
void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
    invariant(ftp);

    FT XCALLOC(ft);
    ft->h = ft_header_create(options, make_blocknum(0), (txn ? txn->txnid.parent_id64: TXNID_NONE));

    toku_ft_init_reflock(ft);

    // Assign blocknum for root block, also dirty the header
    ft->blocktable.create();
    ft->blocktable.allocate_blocknum(&ft->h->root_blocknum, ft);

    ft_init(ft, options, cf);

    *ftp = ft;
}

// TODO: (Zardosht) get rid of ft parameter
int toku_read_ft_and_store_in_cachefile (FT_HANDLE ft_handle, CACHEFILE cf, LSN max_acceptable_lsn, FT *header)
// If the cachefile already has the header, then just get it.
// If the cachefile has not been initialized, then don't modify anything.
// max_acceptable_lsn is the latest acceptable checkpointed version of the file.
{
    FT ft = nullptr;
    if ((ft = (FT) toku_cachefile_get_userdata(cf)) != nullptr) {
        *header = ft;
        assert(ft_handle->options.update_fun == ft->update_fun);
        return 0;
    }

    int fd = toku_cachefile_get_fd(cf);
    const char *fn = toku_cachefile_fname_in_env(cf);
    int r = toku_deserialize_ft_from(fd, fn, max_acceptable_lsn, &ft);
    if (r == TOKUDB_BAD_CHECKSUM) {
        fprintf(stderr, "Checksum failure while reading header in file %s.\n", toku_cachefile_fname_in_env(cf));
        assert(false);  // make absolutely sure we crash before doing anything else
    } else if (r != 0) {
        return r;
    }

    invariant_notnull(ft);
    // intuitively, the comparator points to the FT's cmp descriptor
    ft->cmp.create(ft_handle->options.compare_fun, &ft->cmp_descriptor, ft_handle->options.memcmp_magic);
    ft->update_fun = ft_handle->options.update_fun;
    ft->cf = cf;
    toku_cachefile_set_userdata(cf,
                                reinterpret_cast<void *>(ft),
                                ft_log_fassociate_during_checkpoint,
                                ft_close,
                                ft_free,
                                ft_checkpoint,
                                ft_begin_checkpoint,
                                ft_end_checkpoint,
                                ft_note_pin_by_checkpoint,
                                ft_note_unpin_by_checkpoint);
    *header = ft;
    return 0;
}

void
toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live) {
    toku_ft_grab_reflock(ft);
    live->ft = ft;
    toku_list_push(&ft->live_ft_handles, &live->live_ft_handle_link);
    toku_ft_release_reflock(ft);
}

// the reference count for a ft is the number of txn's that
// touched it plus the number of open handles plus one if
// pinned by a checkpoint.
static int
ft_get_reference_count(FT ft) {
    uint32_t pinned_by_checkpoint = ft->pinned_by_checkpoint ? 1 : 0;
    int num_handles = toku_list_num_elements_est(&ft->live_ft_handles);
    return pinned_by_checkpoint + ft->num_txns + num_handles;
}
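
// For example, an FT touched by two transactions, with one open handle, that
// is currently pinned by a checkpoint has a reference count of 2 + 1 + 1 = 4.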

// a ft is needed in memory iff its reference count is non-zero
bool
toku_ft_needed_unlocked(FT ft) {
    return ft_get_reference_count(ft) != 0;
}

// get the reference count and return true if it was 1
bool
toku_ft_has_one_reference_unlocked(FT ft) {
    return ft_get_reference_count(ft) == 1;
}

// evict a ft from memory by closing its cachefile. any future work
// will have to read in the ft in a new cachefile and new FT object.
void toku_ft_evict_from_memory(FT ft, bool oplsn_valid, LSN oplsn) {
    assert(ft->cf);
    toku_cachefile_close(&ft->cf, oplsn_valid, oplsn);
}

// Verifies there exists exactly one ft handle and returns it.
FT_HANDLE toku_ft_get_only_existing_ft_handle(FT ft) {
    FT_HANDLE ft_handle_ret = NULL;
    toku_ft_grab_reflock(ft);
    assert(toku_list_num_elements_est(&ft->live_ft_handles) == 1);
    ft_handle_ret = toku_list_struct(toku_list_head(&ft->live_ft_handles), struct ft_handle, live_ft_handle_link);
    toku_ft_release_reflock(ft);
    return ft_handle_ret;
}

// Purpose: set fields in ft_header to capture accountability info for start of HOT optimize.
// Note: HOT accountability variables in header are modified only while holding header lock.
//       (Header lock is really needed for touching the dirty bit, but it's useful and
//       convenient here for keeping the HOT variables threadsafe.)
void
toku_ft_note_hot_begin(FT_HANDLE ft_handle) {
    FT ft = ft_handle->ft;
    time_t now = time(NULL);

    // hold lock around setting and clearing of dirty bit
    // (see cooperative use of dirty bit in ft_begin_checkpoint())
    toku_ft_lock(ft);
    ft->h->time_of_last_optimize_begin = now;
    ft->h->count_of_optimize_in_progress++;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}


// Purpose: set fields in ft_header to capture accountability info for end of HOT optimize.
// Note: See note for toku_ft_note_hot_begin().
void
toku_ft_note_hot_complete(FT_HANDLE ft_handle, bool success, MSN msn_at_start_of_hot) {
    FT ft = ft_handle->ft;
    time_t now = time(NULL);

    toku_ft_lock(ft);
    ft->h->count_of_optimize_in_progress--;
    if (success) {
        ft->h->time_of_last_optimize_end = now;
        ft->h->msn_at_start_of_last_completed_optimize = msn_at_start_of_hot;
        // If we just successfully completed an optimization and no other thread is performing
        // an optimization, then the number of optimizations in progress is zero.
        // If there was a crash during a HOT optimization, this is how count_of_optimize_in_progress
        // would be reset to zero on the disk after recovery from that crash.
        if (ft->h->count_of_optimize_in_progress == ft->h->count_of_optimize_in_progress_read_from_disk)
            ft->h->count_of_optimize_in_progress = 0;
    }
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}


void
toku_ft_init(FT ft,
             BLOCKNUM root_blocknum_on_disk,
             LSN checkpoint_lsn,
             TXNID root_xid_that_created,
             uint32_t target_nodesize,
             uint32_t target_basementnodesize,
             enum toku_compression_method compression_method,
             uint32_t fanout
             )
{
    memset(ft, 0, sizeof *ft);
    struct ft_options options = {
        .nodesize = target_nodesize,
        .basementnodesize = target_basementnodesize,
        .compression_method = compression_method,
        .fanout = fanout,
        .flags = 0,
        .memcmp_magic = 0,
        .compare_fun = NULL,
        .update_fun = NULL
    };
    ft->h = ft_header_create(&options, root_blocknum_on_disk, root_xid_that_created);
    ft->h->checkpoint_count = 1;
    ft->h->checkpoint_lsn   = checkpoint_lsn;
}

// Open an ft for use by redirect.  The new ft must have the same dict_id as the old_ft passed in.  (FILENUM is assigned by the ft_handle_open() function.)
static int
ft_handle_open_for_redirect(FT_HANDLE *new_ftp, const char *fname_in_env, TOKUTXN txn, FT old_ft) {
    FT_HANDLE ft_handle;
    assert(old_ft->dict_id.dictid != DICTIONARY_ID_NONE.dictid);
    toku_ft_handle_create(&ft_handle);
    toku_ft_set_bt_compare(ft_handle, old_ft->cmp.get_compare_func());
    toku_ft_set_update(ft_handle, old_ft->update_fun);
    toku_ft_handle_set_nodesize(ft_handle, old_ft->h->nodesize);
    toku_ft_handle_set_basementnodesize(ft_handle, old_ft->h->basementnodesize);
    toku_ft_handle_set_compression_method(ft_handle, old_ft->h->compression_method);
    toku_ft_handle_set_fanout(ft_handle, old_ft->h->fanout);
    CACHETABLE ct = toku_cachefile_get_cachetable(old_ft->cf);
    int r = toku_ft_handle_open_with_dict_id(ft_handle, fname_in_env, 0, 0, ct, txn, old_ft->dict_id);
    if (r != 0) {
        goto cleanup;
    }
    assert(ft_handle->ft->dict_id.dictid == old_ft->dict_id.dictid);
    *new_ftp = ft_handle;

 cleanup:
    if (r != 0) {
        toku_ft_handle_close(ft_handle);
    }
    return r;
}

// This function performs most of the work to redirect a dictionary to a different file.
// It is called for redirect and to abort a redirect.  (This function is almost its own inverse.)
static int
dictionary_redirect_internal(const char *dst_fname_in_env, FT src_ft, TOKUTXN txn, FT *dst_ftp) {
    int r;

    FILENUM src_filenum = toku_cachefile_filenum(src_ft->cf);
    FILENUM dst_filenum = FILENUM_NONE;

    FT dst_ft = NULL;
    struct toku_list *list;
    // open a dummy ft based off of
    // dst_fname_in_env to get the header
    // then we will change all the ft's to have
    // their headers point to dst_ft instead of src_ft
    FT_HANDLE tmp_dst_ft = NULL;
    r = ft_handle_open_for_redirect(&tmp_dst_ft, dst_fname_in_env, txn, src_ft);
    if (r != 0) {
        goto cleanup;
    }
    dst_ft = tmp_dst_ft->ft;

    // some sanity checks on dst_filenum
    dst_filenum = toku_cachefile_filenum(dst_ft->cf);
    assert(dst_filenum.fileid!=FILENUM_NONE.fileid);
    assert(dst_filenum.fileid!=src_filenum.fileid); //Cannot be same file.

    // for each live ft_handle, ft_handle->ft is currently src_ft
    // we want to change it to dummy_dst
    toku_ft_grab_reflock(src_ft);
    while (!toku_list_empty(&src_ft->live_ft_handles)) {
        list = src_ft->live_ft_handles.next;
        FT_HANDLE src_handle = NULL;
        src_handle = toku_list_struct(list, struct ft_handle, live_ft_handle_link);

        toku_list_remove(&src_handle->live_ft_handle_link);

        toku_ft_note_ft_handle_open(dst_ft, src_handle);
        if (src_handle->redirect_callback) {
            src_handle->redirect_callback(src_handle, src_handle->redirect_callback_extra);
        }
    }
    assert(dst_ft);
    // making sure that we are not leaking src_ft
    assert(toku_ft_needed_unlocked(src_ft));
    toku_ft_release_reflock(src_ft);

    toku_ft_handle_close(tmp_dst_ft);

    *dst_ftp = dst_ft;
cleanup:
    return r;
}



//This is the 'abort redirect' function.  The redirect of old_ft to new_ft was done
//and now must be undone, so here we redirect new_ft back to old_ft.
int
toku_dictionary_redirect_abort(FT old_ft, FT new_ft, TOKUTXN txn) {
    char *old_fname_in_env = toku_cachefile_fname_in_env(old_ft->cf);
    int r;
    {
        FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
        FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
        assert(old_filenum.fileid!=new_filenum.fileid); //Cannot be same file.

        //No living fts in old header.
        toku_ft_grab_reflock(old_ft);
        assert(toku_list_empty(&old_ft->live_ft_handles));
        toku_ft_release_reflock(old_ft);
    }

    FT dst_ft;
    // redirect back from new_ft to old_ft
    r = dictionary_redirect_internal(old_fname_in_env, new_ft, txn, &dst_ft);
    if (r == 0) {
        assert(dst_ft == old_ft);
    }
    return r;
}

/****
 * on redirect or abort:
 *  if redirect txn_note_doing_work(txn)
 *  if redirect connect src ft to txn (txn modified this ft)
 *  for each src ft
 *    open ft to dst file (create new ft struct)
 *    if redirect connect dst ft to txn
 *    redirect db to new ft
 *    redirect cursors to new ft
 *  close all src fts
 *  if redirect make rollback log entry
 *
 * on commit:
 *   nothing to do
 *
 *****/

int
toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKUTXN txn) {
// Input args:
//   new file name for dictionary (relative to env)
//   old_ft_h is a live ft handle ({DB, FT_HANDLE} pair) that currently refers to the old dictionary file.
//   (old_ft_h may be one of many handles to the dictionary.)
//   txn that created the loader
// Requires:
//   multi operation lock is held.
//   The ft is open.  (which implies there can be no zombies.)
//   The new file must be a valid dictionary.
//   The block size and flags in the new file must match the existing FT.
//   The new file must already have its descriptor in it (and it must match the existing descriptor).
// Effect:
//   Open new FTs (and related header and cachefile) to the new dictionary file with a new FILENUM.
//   Redirect all DBs that point to fts that point to the old file to point to fts that point to the new file.
//   Copy the dictionary id (dict_id) from the header of the original file to the header of the new file.
//   Create a rollback log entry.
//   The original FT, header, cachefile and file remain unchanged.  They will be cleaned up on commit.
//   If the txn aborts, then this operation will be undone
    int r;

    FT old_ft = old_ft_h->ft;

    // dst file should not be open.  (implies that dst and src are different because src must be open.)
    {
        CACHETABLE ct = toku_cachefile_get_cachetable(old_ft->cf);
        CACHEFILE cf;
        r = toku_cachefile_of_iname_in_env(ct, dst_fname_in_env, &cf);
        if (r==0) {
            r = EINVAL;
            goto cleanup;
        }
        assert(r==ENOENT);
        r = 0;
    }

    if (txn) {
        toku_txn_maybe_note_ft(txn, old_ft);  // mark old ft as touched by this txn
    }

    FT new_ft;
    r = dictionary_redirect_internal(dst_fname_in_env, old_ft, txn, &new_ft);
    if (r != 0) {
        goto cleanup;
    }

    // make rollback log entry
    if (txn) {
        toku_txn_maybe_note_ft(txn, new_ft); // mark new ft as touched by this txn

        // There is no recovery log entry for redirect,
        // and rollback log entries are not allowed for read-only transactions.
        // Normally the recovery log entry would ensure the begin was logged.
        if (!txn->begin_was_logged) {
          toku_maybe_log_begin_txn_for_write_operation(txn);
        }
        FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
        FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
        toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum);
    }

cleanup:
    return r;
}

// Insert reference to transaction into ft
void
toku_ft_add_txn_ref(FT ft) {
    toku_ft_grab_reflock(ft);
    ++ft->num_txns;
    toku_ft_release_reflock(ft);
}

static void
remove_txn_ref_callback(FT ft, void *UU(context)) {
    invariant(ft->num_txns > 0);
    --ft->num_txns;
}

void
toku_ft_remove_txn_ref(FT ft) {
    toku_ft_remove_reference(ft, false, ZERO_LSN, remove_txn_ref_callback, NULL);
}

void toku_calculate_root_offset_pointer (
    FT ft,
    CACHEKEY* root_key,
    uint32_t *roothash
    )
{
    *roothash = toku_cachetable_hash(ft->cf, ft->h->root_blocknum);
    *root_key = ft->h->root_blocknum;
}

void toku_ft_set_new_root_blocknum(
    FT ft,
    CACHEKEY new_root_key
    )
{
    ft->h->root_blocknum = new_root_key;
}

LSN toku_ft_checkpoint_lsn(FT ft) {
    return ft->h->checkpoint_lsn;
}

void
toku_ft_stat64 (FT ft, struct ftstat64_s *s) {
    s->fsize = toku_cachefile_size(ft->cf);
    // just use the in memory stats from the header
    // prevent appearance of negative numbers for numrows, numbytes
    // if the logical count was never properly re-counted on an upgrade,
    // return the existing physical count instead.
    int64_t n;
    if (ft->in_memory_logical_rows == (uint64_t)-1) {
        n = ft->in_memory_stats.numrows;
    } else {
        n = ft->in_memory_logical_rows;
    }
    if (n < 0) {
        n = 0;
    }
    s->nkeys = s->ndata = n;
    n = ft->in_memory_stats.numbytes;
    if (n < 0) {
        n = 0;
    }
    s->dsize = n;
    s->create_time_sec = ft->h->time_of_creation;
    s->modify_time_sec = ft->h->time_of_last_modification;
    s->verify_time_sec = ft->h->time_of_last_verification;
}

void toku_ft_get_fractal_tree_info64(FT ft, struct ftinfo64 *info) {
    ft->blocktable.get_info64(info);
}

int toku_ft_iterate_fractal_tree_block_map(FT ft, int (*iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*), void *iter_extra) {
    uint64_t this_checkpoint_count = ft->h->checkpoint_count;
    return ft->blocktable.iterate_translation_tables(this_checkpoint_count, iter, iter_extra);
}

void
toku_ft_update_descriptor(FT ft, DESCRIPTOR desc)
// Effect: Changes the descriptor in a tree (log the change, make sure it makes it to disk eventually).
// requires: the ft is fully user-opened with a valid cachefile.
//           descriptor updates cannot happen in parallel for an FT
//           (ydb layer uses a row lock to enforce this)
{
    assert(ft->cf);
    int fd = toku_cachefile_get_fd(ft->cf);
    toku_ft_update_descriptor_with_fd(ft, desc, fd);
}

// update the descriptor for an ft and serialize it using
// the given file descriptor instead of getting the fd from
// the ft's cachefile. we do this so serialize code can
// update a descriptor before the ft is fully opened and has
// a valid cachefile.
void
toku_ft_update_descriptor_with_fd(FT ft, DESCRIPTOR desc, int fd) {
    // the checksum is four bytes, so that's where the magic number comes from
    // make space for the new descriptor and write it out to disk
    DISKOFF offset, size;
    size = toku_serialize_descriptor_size(desc) + 4;
    ft->blocktable.realloc_descriptor_on_disk(size, &offset, ft, fd);
    toku_serialize_descriptor_contents_to_fd(fd, desc, offset);

    // cleanup the old descriptor and set the in-memory descriptor to the new one
    toku_destroy_dbt(&ft->descriptor.dbt);
    toku_clone_dbt(&ft->descriptor.dbt, desc->dbt);
}

void toku_ft_update_cmp_descriptor(FT ft) {
    // cleanup the old cmp descriptor and clone it as the in-memory descriptor
    toku_destroy_dbt(&ft->cmp_descriptor.dbt);
    toku_clone_dbt(&ft->cmp_descriptor.dbt, ft->descriptor.dbt);
}

DESCRIPTOR toku_ft_get_descriptor(FT_HANDLE ft_handle) {
    return &ft_handle->ft->descriptor;
}

DESCRIPTOR toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle) {
    return &ft_handle->ft->cmp_descriptor;
}

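// Atomically fold a stats delta into a shared STAT64INFO. Updates use
// fetch-and-add rather than the FT lock, so concurrent callers are safe.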
void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
    (void) toku_sync_fetch_and_add(&(headerstats->numrows),  delta.numrows);
    (void) toku_sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes);
}

void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
    (void) toku_sync_fetch_and_sub(&(headerstats->numrows),  delta.numrows);
    (void) toku_sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes);
}

void toku_ft_adjust_logical_row_count(FT ft, int64_t delta) {
    // In order to make sure that the correct count is returned from
    // toku_ft_stat64, the ft->(in_memory|on_disk)_logical_rows _MUST_NOT_ be
    // modified anywhere other than here, with the exceptions of
    // serializing in a header, initializing a new header, and analyzing
    // an index for a logical_row count.
    // The gist is that on an index upgrade, all logical_rows values
    // in the ft header are set to -1 until an analyze can reset them to an
    // accurate value. Until then, the physical count from in_memory_stats
    // must be returned in toku_ft_stat64.
    if (delta != 0 && ft->in_memory_logical_rows != (uint64_t)-1) {
        toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), delta);
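        // If the adjusted count lands exactly on the (uint64_t)-1 "unknown"
        // sentinel, nudge it (wrapping to 0) so toku_ft_stat64 does not
        // misread it as uninitialized.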
        if (ft->in_memory_logical_rows == (uint64_t)-1) {
            toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), 1);
        }
    }
}

void toku_ft_remove_reference(
    FT ft,
    bool oplsn_valid,
    LSN oplsn,
    remove_ft_ref_callback remove_ref,
    void *extra) {

    toku_ft_grab_reflock(ft);
    if (toku_ft_has_one_reference_unlocked(ft)) {
        toku_ft_release_reflock(ft);

        toku_ft_open_close_lock();
        toku_ft_grab_reflock(ft);

        remove_ref(ft, extra);
        bool needed = toku_ft_needed_unlocked(ft);
        toku_ft_release_reflock(ft);

        // if we're running during recovery, we must close the underlying ft.
        // we know we're running in recovery if we were passed a valid lsn.
        if (oplsn_valid) {
            assert(!needed);
        }
        if (!needed) {
            // close header
            toku_ft_evict_from_memory(ft, oplsn_valid, oplsn);
        }

        toku_ft_open_close_unlock();
    }
    else {
        remove_ref(ft, extra);
        toku_ft_release_reflock(ft);
    }
}

void toku_ft_set_nodesize(FT ft, unsigned int nodesize) {
    toku_ft_lock(ft);
    ft->h->nodesize = nodesize;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}

void toku_ft_get_nodesize(FT ft, unsigned int *nodesize) {
    toku_ft_lock(ft);
    *nodesize = ft->h->nodesize;
    toku_ft_unlock(ft);
}

void toku_ft_set_basementnodesize(FT ft, unsigned int basementnodesize) {
    toku_ft_lock(ft);
    ft->h->basementnodesize = basementnodesize;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}

void toku_ft_get_basementnodesize(FT ft, unsigned int *basementnodesize) {
    toku_ft_lock(ft);
    *basementnodesize = ft->h->basementnodesize;
    toku_ft_unlock(ft);
}

void toku_ft_set_compression_method(FT ft, enum toku_compression_method method) {
    toku_ft_lock(ft);
    ft->h->compression_method = method;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}

void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp) {
    toku_ft_lock(ft);
    *methodp = ft->h->compression_method;
    toku_ft_unlock(ft);
}

void toku_ft_set_fanout(FT ft, unsigned int fanout) {
    toku_ft_lock(ft);
    ft->h->fanout = fanout;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}

void toku_ft_get_fanout(FT ft, unsigned int *fanout) {
    toku_ft_lock(ft);
    *fanout = ft->h->fanout;
    toku_ft_unlock(ft);
}

// mark the ft as a blackhole. any message injections will be a no op.
void toku_ft_set_blackhole(FT_HANDLE ft_handle) {
    ft_handle->ft->blackhole = true;
}

struct garbage_helper_extra {
    FT ft;
    size_t total_space;
    size_t used_space;
};

static int
garbage_leafentry_helper(const void* key UU(), const uint32_t keylen, const LEAFENTRY & le, uint32_t UU(idx), struct garbage_helper_extra * const info) {
    //TODO #warning need to reanalyze for split
    info->total_space += leafentry_disksize(le) + keylen + sizeof(keylen);
    if (!le_latest_is_del(le)) {
        info->used_space += LE_CLEAN_MEMSIZE(le_latest_vallen(le)) + keylen + sizeof(keylen);
    }
    return 0;
}

static int
garbage_helper(BLOCKNUM blocknum, int64_t UU(size), int64_t UU(address), void *extra) {
    struct garbage_helper_extra *CAST_FROM_VOIDP(info, extra);
    FTNODE node;
    FTNODE_DISK_DATA ndd;
    ftnode_fetch_extra bfe;
    bfe.create_for_full_read(info->ft);
    int fd = toku_cachefile_get_fd(info->ft->cf);
    int r = toku_deserialize_ftnode_from(fd, blocknum, 0, &node, &ndd, &bfe);
    if (r != 0) {
        goto no_node;
    }
    if (node->height > 0) {
        goto exit;
    }
    for (int i = 0; i < node->n_children; ++i) {
        bn_data* bd = BLB_DATA(node, i);
        r = bd->iterate<struct garbage_helper_extra, garbage_leafentry_helper>(info);
        if (r != 0) {
            goto exit;
        }
    }
    {
        float a = info->used_space, b=info->total_space;
        float percentage = (1 - (a / b)) * 100;
        printf("LeafNode# %d has %d BasementNodes and %2.1f%% of the allocated space is garbage\n", (int)blocknum.b, node->n_children, percentage);
    }
exit:
    toku_ftnode_free(&node);
    toku_free(ndd);
no_node:
    return r;
}

void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space) {
// Effect: Iterates the FT's blocktable and calculates the total and used space for leaf blocks.
// Note: It is ok to call this function concurrently with reads/writes to the table since
//       the blocktable lock is held, which means no new allocations or file writes can occur.
    invariant_notnull(total_space);
    invariant_notnull(used_space);
    struct garbage_helper_extra info = {
        .ft = ft,
        .total_space = 0,
        .used_space = 0
    };
    ft->blocktable.iterate(block_table::TRANSLATION_CHECKPOINTED, garbage_helper, &info, true, true);
    *total_space = info.total_space;
    *used_space = info.used_space;
}
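
// A hypothetical caller could derive the garbage fraction from the two outputs,
// mirroring the per-node percentage printed by garbage_helper() above:
//
//     uint64_t total, used;
//     toku_ft_get_garbage(ft, &total, &used);
//     double garbage_fraction = total ? 1.0 - ((double)used / (double)total) : 0.0;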


#if !defined(TOKUDB_REVISION)
#error
#endif

#define xstr(X) str(X)
#define str(X) #X
#define static_version_string xstr(DB_VERSION_MAJOR) "." \
                              xstr(DB_VERSION_MINOR) "." \
                              xstr(DB_VERSION_PATCH) " build " \
                              xstr(TOKUDB_REVISION)
struct toku_product_name_strings_struct toku_product_name_strings;

char toku_product_name[TOKU_MAX_PRODUCT_NAME_LENGTH];
void tokuft_update_product_name_strings(void) {
    // DO ALL STRINGS HERE.. maybe have a separate FT layer version as well
    {
        int n = snprintf(toku_product_name_strings.db_version,
                         sizeof(toku_product_name_strings.db_version),
                         "%s %s", toku_product_name, static_version_string);
        assert(n >= 0);
        assert((unsigned)n < sizeof(toku_product_name_strings.db_version));
    }
    {
        int n = snprintf(toku_product_name_strings.fileopsdirectory,
                         sizeof(toku_product_name_strings.fileopsdirectory),
                         "%s.directory", toku_product_name);
        assert(n >= 0);
        assert((unsigned)n < sizeof(toku_product_name_strings.fileopsdirectory));
    }
    {
        int n = snprintf(toku_product_name_strings.environmentdictionary,
                         sizeof(toku_product_name_strings.environmentdictionary),
                         "%s.environment", toku_product_name);
        assert(n >= 0);
        assert((unsigned)n < sizeof(toku_product_name_strings.environmentdictionary));
    }
    {
        int n = snprintf(toku_product_name_strings.rollback_cachefile,
                         sizeof(toku_product_name_strings.rollback_cachefile),
                         "%s.rollback", toku_product_name);
        assert(n >= 0);
        assert((unsigned)n < sizeof(toku_product_name_strings.rollback_cachefile));
    }
    {
        int n = snprintf(toku_product_name_strings.single_process_lock,
                         sizeof(toku_product_name_strings.single_process_lock),
                         "__%s_lock_dont_delete_me", toku_product_name);
        assert(n >= 0);
        assert((unsigned)n < sizeof(toku_product_name_strings.single_process_lock));
    }
}
#undef xstr
#undef str

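// Acquire the single-process lock file for lock_dir. The file name is built
// from toku_product_name_strings.single_process_lock, so it has the form
// "<lock_dir>/__<toku_product_name>_lock_dont_delete_me_<which>".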
int
toku_single_process_lock(const char *lock_dir, const char *which, int *lockfd) {
    if (!lock_dir)
        return ENOENT;
    int namelen=strlen(lock_dir)+strlen(which);
    char lockfname[namelen+sizeof("/_") + strlen(toku_product_name_strings.single_process_lock)];

    int l = snprintf(lockfname, sizeof(lockfname), "%s/%s_%s",
                     lock_dir, toku_product_name_strings.single_process_lock, which);
    assert(l+1 == (signed)(sizeof(lockfname)));
    *lockfd = toku_os_lock_file(lockfname);
    if (*lockfd < 0) {
        int e = get_error_errno();
        fprintf(stderr, "Couldn't start tokuft because some other tokuft process is using the same directory [%s] for [%s]\n", lock_dir, which);
        return e;
    }
    return 0;
}

int
toku_single_process_unlock(int *lockfd) {
    int fd = *lockfd;
    *lockfd = -1;
    if (fd>=0) {
        int r = toku_os_unlock_file(fd);
        if (r != 0)
            return get_error_errno();
    }
    return 0;
}

int tokuft_num_envs = 0;
int
db_env_set_toku_product_name(const char *name) {
    if (tokuft_num_envs > 0) {
        return EINVAL;
    }
    if (!name || strlen(name) < 1) {
        return EINVAL;
    }
    if (strlen(name) >= sizeof(toku_product_name)) {
        return ENAMETOOLONG;
    }
    if (strncmp(toku_product_name, name, sizeof(toku_product_name))) {
        strcpy(toku_product_name, name);
        tokuft_update_product_name_strings();
    }
    return 0;
}
