/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <my_global.h>
#include "ft/serialize/block_table.h"
#include "ft/ft.h"
#include "ft/ft-cachetable-wrappers.h"
#include "ft/ft-internal.h"
#include "ft/logger/log-internal.h"
#include "ft/log_header.h"
#include "ft/node.h"
#include "ft/serialize/ft-serialize.h"
#include "ft/serialize/ft_node-serialize.h"

#include <memory.h>
#include <toku_assert.h>
#include <portability/toku_atomic.h>

toku_instr_key *ft_ref_lock_mutex_key;

void toku_reset_root_xid_that_created(FT ft, TXNID new_root_xid_that_created) {
    // Reset the root_xid_that_created field to the given value.
    // This redefines which xid created the dictionary.

    // hold lock around setting and clearing of dirty bit
    // (see cooperative use of dirty bit in ft_begin_checkpoint())
    toku_ft_lock(ft);
    ft->h->root_xid_that_created = new_root_xid_that_created;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}

static void
ft_destroy(FT ft) {
    //header and checkpoint_header have same Blocktable pointer
    //cannot destroy since it is still in use by CURRENT
    assert(ft->h->type == FT_CURRENT);
    ft->blocktable.destroy();
    ft->cmp.destroy();
    toku_destroy_dbt(&ft->descriptor.dbt);
    toku_destroy_dbt(&ft->cmp_descriptor.dbt);
    toku_ft_destroy_reflock(ft);
    toku_free(ft->h);
}

// Make a copy of the header for the purpose of a checkpoint
// Not reentrant for a single FT.
// See ft_checkpoint for explanation of why
// FT lock must be held.
static void
ft_copy_for_checkpoint_unlocked(FT ft, LSN checkpoint_lsn) {
    assert(ft->h->type == FT_CURRENT);
    assert(ft->checkpoint_header == NULL);

    FT_HEADER XMEMDUP(ch, ft->h);
    ch->type = FT_CHECKPOINT_INPROGRESS; //Different type
    //printf("checkpoint_lsn=%" PRIu64 "\n", checkpoint_lsn.lsn);
    ch->checkpoint_lsn = checkpoint_lsn;

    //ch->blocktable is SHARED between the two headers
    ft->checkpoint_header = ch;
}

void
toku_ft_free (FT ft) {
    ft_destroy(ft);
    toku_free(ft);
}

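// The ft_ref_lock mutex protects the FT's reference-accounting state
// (live_ft_handles, num_txns, pinned_by_checkpoint); callers take it via
// toku_ft_grab_reflock()/toku_ft_release_reflock().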
void toku_ft_init_reflock(FT ft) {
    toku_mutex_init(*ft_ref_lock_mutex_key, &ft->ft_ref_lock, nullptr);
}

void toku_ft_destroy_reflock(FT ft) { toku_mutex_destroy(&ft->ft_ref_lock); }

void
toku_ft_grab_reflock(FT ft) {
    toku_mutex_lock(&ft->ft_ref_lock);
}

void
toku_ft_release_reflock(FT ft) {
    toku_mutex_unlock(&ft->ft_ref_lock);
}

/////////////////////////////////////////////////////////////////////////
// Start of Functions that are callbacks to the cachefile
//

// maps to cf->log_fassociate_during_checkpoint
static void
ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
    FT ft = (FT) header_v;
    char* fname_in_env = toku_cachefile_fname_in_env(cf);
    BYTESTRING bs = { .len = (uint32_t) strlen(fname_in_env), // don't include the NUL
                      .data = fname_in_env };
    TOKULOGGER logger = toku_cachefile_logger(cf);
    FILENUM filenum = toku_cachefile_filenum(cf);
    bool unlink_on_close = toku_cachefile_is_unlink_on_close(cf);
    toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs, unlink_on_close);
}

// Maps to cf->begin_checkpoint_userdata
// Create checkpoint-in-progress versions of header and translation (btt)
// Has access to fd (it is protected).
//
// Not reentrant for a single FT (see ft_checkpoint)
static void ft_begin_checkpoint (LSN checkpoint_lsn, void *header_v) {
    FT ft = (FT) header_v;
    // hold lock around copying and clearing of dirty bit
    toku_ft_lock (ft);
    assert(ft->h->type == FT_CURRENT);
    assert(ft->checkpoint_header == NULL);
    ft_copy_for_checkpoint_unlocked(ft, checkpoint_lsn);
    ft->h->clear_dirty();             // this is only place this bit is cleared        (in currentheader)
    ft->blocktable.note_start_checkpoint_unlocked();
    toku_ft_unlock (ft);
}

// #4922: Hack to remove data corruption race condition.
// Reading (and upgrading) a node up to version 19 causes this.
// We COULD skip this if we know that no nodes remained (as of last checkpoint)
// that are below version 19.
// If there are no nodes < version 19 this is harmless (field is unused).
// If there are, this will make certain the value is at least as low as necessary,
// and not much lower.  (Too low is good, too high can cause data corruption).
// TODO(yoni): If we ever stop supporting upgrades of nodes < version 19 we can delete this.
// TODO(yoni): If we know no nodes are left to upgrade, we can skip this. (Probably not worth doing).
static void
ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(FT ft) {
    if (ft->h->layout_version_original < FT_LAYOUT_VERSION_19) {
        ft->checkpoint_header->highest_unused_msn_for_upgrade = ft->h->highest_unused_msn_for_upgrade;
    }
}

// maps to cf->checkpoint_userdata
// Write checkpoint-in-progress versions of header and translation to disk (really to OS internal buffer).
// Copy current header's version of checkpoint_staging stat64info to checkpoint header.
// Must have access to fd (protected).
// Requires: all pending bits are clear.  This implies that no thread will modify the checkpoint_staging
// version of the stat64info.
//
// No locks are taken for checkpoint_count/lsn because this is single threaded.  Can be called by:
//  - ft_close
//  - end_checkpoint
// checkpoints hold references to FTs and so they cannot be closed during a checkpoint.
// ft_close is not reentrant for a single FT
// end_checkpoint is not reentrant period
static void ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
    FT ft = (FT) header_v;
    FT_HEADER ch = ft->checkpoint_header;
    assert(ch);
    assert(ch->type == FT_CHECKPOINT_INPROGRESS);
    if (ch->dirty()) {            // this is only place this bit is tested (in checkpoint_header)
        TOKULOGGER logger = toku_cachefile_logger(cf);
        if (logger) {
            toku_logger_fsync_if_lsn_not_fsynced(logger, ch->checkpoint_lsn);
        }
        uint64_t now = (uint64_t) time(NULL);
        ft->h->time_of_last_modification = now;
        ch->time_of_last_modification = now;
        ch->checkpoint_count++;
        ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(ft);
        ch->on_disk_logical_rows =
            ft->h->on_disk_logical_rows = ft->in_memory_logical_rows;

        // write translation and header to disk (or at least to OS internal buffer)
        toku_serialize_ft_to(fd, ch, &ft->blocktable, ft->cf);
        ch->clear_dirty();                      // this is only place this bit is cleared (in checkpoint_header)

        // fsync the cachefile
        toku_cachefile_fsync(cf);
        ft->h->checkpoint_count++;        // checkpoint succeeded, next checkpoint will save to alternate header location
        ft->h->checkpoint_lsn = ch->checkpoint_lsn;  //Header updated.
    } else {
        ft->blocktable.note_skipped_checkpoint();
    }
}

// maps to cf->end_checkpoint_userdata
// free unused disk space
// (i.e. tell BlockAllocator to liberate blocks used by previous checkpoint).
// Must have access to fd (protected)
static void ft_end_checkpoint(CACHEFILE UU(cf), int fd, void *header_v) {
    FT ft = (FT) header_v;
    assert(ft->h->type == FT_CURRENT);
    ft->blocktable.note_end_checkpoint(fd);
    toku_free(ft->checkpoint_header);
    ft->checkpoint_header = nullptr;
}

// maps to cf->close_userdata
// Has access to fd (it is protected).
static void ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN oplsn) {
    FT ft = (FT) header_v;
    assert(ft->h->type == FT_CURRENT);
    // We already have exclusive access to this field, so skip the locking.
    // This should never fail.
    invariant(!toku_ft_needed_unlocked(ft));
    assert(ft->cf == cachefile);
    TOKULOGGER logger = toku_cachefile_logger(cachefile);
    LSN lsn = ZERO_LSN;
    //Get LSN
    if (oplsn_valid) {
        //Use recovery-specified lsn
        lsn = oplsn;
        //Recovery cannot reduce lsn of a header.
        if (lsn.lsn < ft->h->checkpoint_lsn.lsn) {
            lsn = ft->h->checkpoint_lsn;
        }
    }
    else {
        //Get LSN from logger
        lsn = ZERO_LSN; // if there is no logger, we use zero for the lsn
        if (logger) {
            char* fname_in_env = toku_cachefile_fname_in_env(cachefile);
            assert(fname_in_env);
            BYTESTRING bs = {.len=(uint32_t) strlen(fname_in_env), .data=fname_in_env};
            if (!toku_cachefile_is_skip_log_recover_on_close(cachefile)) {
                toku_log_fclose(
                    logger,
                    &lsn,
                    ft->h->dirty(),
                    bs,
                    toku_cachefile_filenum(cachefile));  // flush the log on
                                                         // close (if new header
                                                         // is being written),
                                                         // otherwise it might
                                                         // not make it out.
                toku_cachefile_do_log_recover_on_close(cachefile);
            }
        }
    }
    if (ft->h->dirty()) {               // this is the only place this bit is tested (in currentheader)
        bool do_checkpoint = true;
        if (logger && logger->rollback_cachefile == cachefile) {
            do_checkpoint = false;
        }
        if (do_checkpoint) {
            ft_begin_checkpoint(lsn, header_v);
            ft_checkpoint(cachefile, fd, ft);
            ft_end_checkpoint(cachefile, fd, header_v);
            assert(!ft->h->dirty()); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
        }
    }
}

// maps to cf->free_userdata
static void ft_free(CACHEFILE cachefile UU(), void *header_v) {
    FT ft = (FT) header_v;
    toku_ft_free(ft);
}

// maps to cf->note_pin_by_checkpoint
//Must be protected by ydb lock.
//Is only called by checkpoint begin, which holds it
static void ft_note_pin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) {
    // Note: open_close lock is held by checkpoint begin
    FT ft = (FT) header_v;
    toku_ft_grab_reflock(ft);
    assert(!ft->pinned_by_checkpoint);
    assert(toku_ft_needed_unlocked(ft));
    ft->pinned_by_checkpoint = true;
    toku_ft_release_reflock(ft);
}

// Requires: the reflock is held.
static void unpin_by_checkpoint_callback(FT ft, void *extra) {
    invariant(extra == NULL);
    invariant(ft->pinned_by_checkpoint);
    ft->pinned_by_checkpoint = false;
}

// maps to cf->note_unpin_by_checkpoint
//Must be protected by ydb lock.
//Called by end_checkpoint, which grabs ydb lock around note_unpin
static void ft_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) {
    FT ft = (FT) header_v;
    toku_ft_remove_reference(ft, false, ZERO_LSN, unpin_by_checkpoint_callback, NULL);
}

//
// End of Functions that are callbacks to the cachefile
/////////////////////////////////////////////////////////////////////////

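// Create an empty node to serve as the new dictionary's root, insert it into
// the cachetable under the given blocknum, and unpin it.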
static void setup_initial_ft_root_node(FT ft, BLOCKNUM blocknum) {
    FTNODE XCALLOC(node);
    toku_initialize_empty_ftnode(node, blocknum, 0, 1, ft->h->layout_version, ft->h->flags);
    BP_STATE(node,0) = PT_AVAIL;

    uint32_t fullhash = toku_cachetable_hash(ft->cf, blocknum);
    node->fullhash = fullhash;
    toku_cachetable_put(ft->cf, blocknum, fullhash,
                        node, make_ftnode_pair_attr(node),
                        get_write_callbacks_for_node(ft),
                        toku_ftnode_save_ct_pair);
    toku_unpin_ftnode(ft, node);
}

static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
    // fake, prevent unnecessary upgrade logic
    ft->layout_version_read_from_disk = FT_LAYOUT_VERSION;
    ft->checkpoint_header = NULL;

    toku_list_init(&ft->live_ft_handles);

    // intuitively, the comparator points to the FT's cmp descriptor
    ft->cmp.create(options->compare_fun, &ft->cmp_descriptor, options->memcmp_magic);
    ft->update_fun = options->update_fun;

    if (ft->cf != NULL) {
        assert(ft->cf == cf);
    }
    ft->cf = cf;
    ft->in_memory_stats = ZEROSTATS;

    setup_initial_ft_root_node(ft, ft->h->root_blocknum);
    toku_cachefile_set_userdata(ft->cf,
                                ft,
                                ft_log_fassociate_during_checkpoint,
                                ft_close,
                                ft_free,
                                ft_checkpoint,
                                ft_begin_checkpoint,
                                ft_end_checkpoint,
                                ft_note_pin_by_checkpoint,
                                ft_note_unpin_by_checkpoint);

    ft->blocktable.verify_no_free_blocknums();
}


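// Build a fresh in-memory header for a new dictionary: current layout version
// and build id, zeroed counters and stats, plus the caller-supplied options
// (flags, nodesize, basementnodesize, compression method, fanout).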
static FT_HEADER
ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_created)
{
    uint64_t now = (uint64_t) time(NULL);
    struct ft_header h = {
        .type = FT_CURRENT,
        .dirty_ = 0,
        .checkpoint_count = 0,
        .checkpoint_lsn = ZERO_LSN,
        .layout_version = FT_LAYOUT_VERSION,
        .layout_version_original = FT_LAYOUT_VERSION,
        .build_id = BUILD_ID,
        .build_id_original = BUILD_ID,
        .time_of_creation = now,
        .root_xid_that_created = root_xid_that_created,
        .time_of_last_modification = now,
        .time_of_last_verification = 0,
        .root_blocknum = root_blocknum,
        .flags = options->flags,
        .nodesize = options->nodesize,
        .basementnodesize = options->basementnodesize,
        .compression_method = options->compression_method,
        .fanout = options->fanout,
        .highest_unused_msn_for_upgrade = { .msn = (MIN_MSN.msn - 1) },
        .max_msn_in_ft = ZERO_MSN,
        .time_of_last_optimize_begin = 0,
        .time_of_last_optimize_end = 0,
        .count_of_optimize_in_progress = 0,
        .count_of_optimize_in_progress_read_from_disk = 0,
        .msn_at_start_of_last_completed_optimize = ZERO_MSN,
        .on_disk_stats = ZEROSTATS,
        .on_disk_logical_rows = 0
    };
    return (FT_HEADER) toku_xmemdup(&h, sizeof h);
}

// allocate and initialize a fractal tree.
void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
    invariant(ftp);

    FT XCALLOC(ft);
    ft->h = ft_header_create(options, make_blocknum(0), (txn ? txn->txnid.parent_id64: TXNID_NONE));

    toku_ft_init_reflock(ft);

    // Assign blocknum for root block, also dirty the header
    ft->blocktable.create();
    ft->blocktable.allocate_blocknum(&ft->h->root_blocknum, ft);

    ft_init(ft, options, cf);

    *ftp = ft;
}

// TODO: (Zardosht) get rid of ft parameter
int toku_read_ft_and_store_in_cachefile (FT_HANDLE ft_handle, CACHEFILE cf, LSN max_acceptable_lsn, FT *header)
// If the cachefile already has the header, then just get it.
// If the cachefile has not been initialized, then don't modify anything.
// max_acceptable_lsn is the latest acceptable checkpointed version of the file.
{
    FT ft = nullptr;
    if ((ft = (FT) toku_cachefile_get_userdata(cf)) != nullptr) {
        *header = ft;
        assert(ft_handle->options.update_fun == ft->update_fun);
        return 0;
    }

    int fd = toku_cachefile_get_fd(cf);
    const char *fn = toku_cachefile_fname_in_env(cf);
    int r = toku_deserialize_ft_from(fd, fn, max_acceptable_lsn, &ft);
    if (r == TOKUDB_BAD_CHECKSUM) {
        fprintf(stderr, "Checksum failure while reading header in file %s.\n", toku_cachefile_fname_in_env(cf));
        assert(false);  // make absolutely sure we crash before doing anything else
    } else if (r != 0) {
        return r;
    }

    invariant_notnull(ft);
    // intuitively, the comparator points to the FT's cmp descriptor
    ft->cmp.create(ft_handle->options.compare_fun, &ft->cmp_descriptor, ft_handle->options.memcmp_magic);
    ft->update_fun = ft_handle->options.update_fun;
    ft->cf = cf;
    toku_cachefile_set_userdata(cf,
                                reinterpret_cast<void *>(ft),
                                ft_log_fassociate_during_checkpoint,
                                ft_close,
                                ft_free,
                                ft_checkpoint,
                                ft_begin_checkpoint,
                                ft_end_checkpoint,
                                ft_note_pin_by_checkpoint,
                                ft_note_unpin_by_checkpoint);
    *header = ft;
    return 0;
}

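// Register a newly opened handle with the FT: link it onto the
// live_ft_handles list (under the reflock) so it counts as a reference.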
void
toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live) {
    toku_ft_grab_reflock(ft);
    live->ft = ft;
    toku_list_push(&ft->live_ft_handles, &live->live_ft_handle_link);
    toku_ft_release_reflock(ft);
}

// the reference count for a ft is the number of txn's that
// touched it plus the number of open handles plus one if
// pinned by a checkpoint.
static int
ft_get_reference_count(FT ft) {
    uint32_t pinned_by_checkpoint = ft->pinned_by_checkpoint ? 1 : 0;
    int num_handles = toku_list_num_elements_est(&ft->live_ft_handles);
    return pinned_by_checkpoint + ft->num_txns + num_handles;
}

// a ft is needed in memory iff its reference count is non-zero
bool
toku_ft_needed_unlocked(FT ft) {
    return ft_get_reference_count(ft) != 0;
}

// get the reference count and return true if it was 1
bool
toku_ft_has_one_reference_unlocked(FT ft) {
    return ft_get_reference_count(ft) == 1;
}

// evict a ft from memory by closing its cachefile. any future work
// will have to read in the ft in a new cachefile and new FT object.
void toku_ft_evict_from_memory(FT ft, bool oplsn_valid, LSN oplsn) {
    assert(ft->cf);
    toku_cachefile_close(&ft->cf, oplsn_valid, oplsn);
}

// Verifies there exists exactly one ft handle and returns it.
FT_HANDLE toku_ft_get_only_existing_ft_handle(FT ft) {
    FT_HANDLE ft_handle_ret = NULL;
    toku_ft_grab_reflock(ft);
    assert(toku_list_num_elements_est(&ft->live_ft_handles) == 1);
    ft_handle_ret = toku_list_struct(toku_list_head(&ft->live_ft_handles), struct ft_handle, live_ft_handle_link);
    toku_ft_release_reflock(ft);
    return ft_handle_ret;
}

// Purpose: set fields in ft_header to capture accountability info for start of HOT optimize.
// Note: HOT accountability variables in header are modified only while holding header lock.
//       (Header lock is really needed for touching the dirty bit, but it's useful and
//       convenient here for keeping the HOT variables threadsafe.)
void
toku_ft_note_hot_begin(FT_HANDLE ft_handle) {
    FT ft = ft_handle->ft;
    time_t now = time(NULL);

    // hold lock around setting and clearing of dirty bit
    // (see cooperative use of dirty bit in ft_begin_checkpoint())
    toku_ft_lock(ft);
    ft->h->time_of_last_optimize_begin = now;
    ft->h->count_of_optimize_in_progress++;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}


// Purpose: set fields in ft_header to capture accountability info for end of HOT optimize.
// Note: See note for toku_ft_note_hot_begin().
void
toku_ft_note_hot_complete(FT_HANDLE ft_handle, bool success, MSN msn_at_start_of_hot) {
    FT ft = ft_handle->ft;
    time_t now = time(NULL);

    toku_ft_lock(ft);
    ft->h->count_of_optimize_in_progress--;
    if (success) {
        ft->h->time_of_last_optimize_end = now;
        ft->h->msn_at_start_of_last_completed_optimize = msn_at_start_of_hot;
        // If we just successfully completed an optimization and no other thread is performing
        // an optimization, then the number of optimizations in progress is zero.
        // If there was a crash during a HOT optimization, this is how count_of_optimize_in_progress
        // would be reset to zero on the disk after recovery from that crash.
        if (ft->h->count_of_optimize_in_progress == ft->h->count_of_optimize_in_progress_read_from_disk)
            ft->h->count_of_optimize_in_progress = 0;
    }
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}


void
toku_ft_init(FT ft,
             BLOCKNUM root_blocknum_on_disk,
             LSN checkpoint_lsn,
             TXNID root_xid_that_created,
             uint32_t target_nodesize,
             uint32_t target_basementnodesize,
             enum toku_compression_method compression_method,
             uint32_t fanout
             )
{
    memset(ft, 0, sizeof *ft);
    struct ft_options options = {
        .nodesize = target_nodesize,
        .basementnodesize = target_basementnodesize,
        .compression_method = compression_method,
        .fanout = fanout,
        .flags = 0,
        .memcmp_magic = 0,
        .compare_fun = NULL,
        .update_fun = NULL
    };
    ft->h = ft_header_create(&options, root_blocknum_on_disk, root_xid_that_created);
    ft->h->checkpoint_count = 1;
    ft->h->checkpoint_lsn   = checkpoint_lsn;
}

// Open an ft for use by redirect.  The new ft must have the same dict_id as the old_ft passed in.  (FILENUM is assigned by the ft_handle_open() function.)
static int
ft_handle_open_for_redirect(FT_HANDLE *new_ftp, const char *fname_in_env, TOKUTXN txn, FT old_ft) {
    FT_HANDLE ft_handle;
    assert(old_ft->dict_id.dictid != DICTIONARY_ID_NONE.dictid);
    toku_ft_handle_create(&ft_handle);
    toku_ft_set_bt_compare(ft_handle, old_ft->cmp.get_compare_func());
    toku_ft_set_update(ft_handle, old_ft->update_fun);
    toku_ft_handle_set_nodesize(ft_handle, old_ft->h->nodesize);
    toku_ft_handle_set_basementnodesize(ft_handle, old_ft->h->basementnodesize);
    toku_ft_handle_set_compression_method(ft_handle, old_ft->h->compression_method);
    toku_ft_handle_set_fanout(ft_handle, old_ft->h->fanout);
    CACHETABLE ct = toku_cachefile_get_cachetable(old_ft->cf);
    int r = toku_ft_handle_open_with_dict_id(ft_handle, fname_in_env, 0, 0, ct, txn, old_ft->dict_id);
    if (r != 0) {
        goto cleanup;
    }
    assert(ft_handle->ft->dict_id.dictid == old_ft->dict_id.dictid);
    *new_ftp = ft_handle;

 cleanup:
    if (r != 0) {
        toku_ft_handle_close(ft_handle);
    }
    return r;
}

// This function performs most of the work to redirect a dictionary to a different file.
// It is called for redirect and to abort a redirect.  (This function is almost its own inverse.)
static int
dictionary_redirect_internal(const char *dst_fname_in_env, FT src_ft, TOKUTXN txn, FT *dst_ftp) {
    int r;

    FILENUM src_filenum = toku_cachefile_filenum(src_ft->cf);
    FILENUM dst_filenum = FILENUM_NONE;

    FT dst_ft = NULL;
    struct toku_list *list;
    // open a dummy ft based off of
    // dst_fname_in_env to get the header
    // then we will change all the ft's to have
    // their headers point to dst_ft instead of src_ft
    FT_HANDLE tmp_dst_ft = NULL;
    r = ft_handle_open_for_redirect(&tmp_dst_ft, dst_fname_in_env, txn, src_ft);
    if (r != 0) {
        goto cleanup;
    }
    dst_ft = tmp_dst_ft->ft;

    // some sanity checks on dst_filenum
    dst_filenum = toku_cachefile_filenum(dst_ft->cf);
    assert(dst_filenum.fileid!=FILENUM_NONE.fileid);
    assert(dst_filenum.fileid!=src_filenum.fileid); //Cannot be same file.

    // for each live ft_handle, ft_handle->ft is currently src_ft
    // we want to change it to dummy_dst
    toku_ft_grab_reflock(src_ft);
    while (!toku_list_empty(&src_ft->live_ft_handles)) {
        list = src_ft->live_ft_handles.next;
        FT_HANDLE src_handle = NULL;
        src_handle = toku_list_struct(list, struct ft_handle, live_ft_handle_link);

        toku_list_remove(&src_handle->live_ft_handle_link);

        toku_ft_note_ft_handle_open(dst_ft, src_handle);
        if (src_handle->redirect_callback) {
            src_handle->redirect_callback(src_handle, src_handle->redirect_callback_extra);
        }
    }
    assert(dst_ft);
    // making sure that we are not leaking src_ft
    assert(toku_ft_needed_unlocked(src_ft));
    toku_ft_release_reflock(src_ft);

    toku_ft_handle_close(tmp_dst_ft);

    *dst_ftp = dst_ft;
cleanup:
    return r;
}



//This is the 'abort redirect' function.  The redirect of old_ft to new_ft was done
//and now must be undone, so here we redirect new_ft back to old_ft.
int
toku_dictionary_redirect_abort(FT old_ft, FT new_ft, TOKUTXN txn) {
    char *old_fname_in_env = toku_cachefile_fname_in_env(old_ft->cf);
    int r;
    {
        FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
        FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
        assert(old_filenum.fileid!=new_filenum.fileid); //Cannot be same file.

        //No living fts in old header.
        toku_ft_grab_reflock(old_ft);
        assert(toku_list_empty(&old_ft->live_ft_handles));
        toku_ft_release_reflock(old_ft);
    }

    FT dst_ft;
    // redirect back from new_ft to old_ft
    r = dictionary_redirect_internal(old_fname_in_env, new_ft, txn, &dst_ft);
    if (r == 0) {
        assert(dst_ft == old_ft);
    }
    return r;
}

/****
 * on redirect or abort:
 *  if redirect txn_note_doing_work(txn)
 *  if redirect connect src ft to txn (txn modified this ft)
 *  for each src ft
 *    open ft to dst file (create new ft struct)
 *    if redirect connect dst ft to txn
 *    redirect db to new ft
 *    redirect cursors to new ft
 *  close all src fts
 *  if redirect make rollback log entry
 *
 * on commit:
 *   nothing to do
 *
 *****/

int
toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKUTXN txn) {
// Input args:
//   new file name for dictionary (relative to env)
//   old_ft_h is a live ft of open handle ({DB, FT_HANDLE} pair) that currently refers to old dictionary file.
//   (old_ft_h may be one of many handles to the dictionary.)
//   txn that created the loader
// Requires:
//   multi operation lock is held.
//   The ft is open.  (which implies there can be no zombies.)
//   The new file must be a valid dictionary.
//   The block size and flags in the new file must match the existing FT.
//   The new file must already have its descriptor in it (and it must match the existing descriptor).
// Effect:
//   Open new FTs (and related header and cachefile) to the new dictionary file with a new FILENUM.
//   Redirect all DBs that point to fts that point to the old file to point to fts that point to the new file.
//   Copy the dictionary id (dict_id) from the header of the original file to the header of the new file.
//   Create a rollback log entry.
//   The original FT, header, cachefile and file remain unchanged.  They will be cleaned up on commit.
//   If the txn aborts, then this operation will be undone
    int r;

    FT old_ft = old_ft_h->ft;

    // dst file should not be open.  (implies that dst and src are different because src must be open.)
    {
        CACHETABLE ct = toku_cachefile_get_cachetable(old_ft->cf);
        CACHEFILE cf;
        r = toku_cachefile_of_iname_in_env(ct, dst_fname_in_env, &cf);
        if (r==0) {
            r = EINVAL;
            goto cleanup;
        }
        assert(r==ENOENT);
        r = 0;
    }

    if (txn) {
        toku_txn_maybe_note_ft(txn, old_ft);  // mark old ft as touched by this txn
    }

    FT new_ft;
    r = dictionary_redirect_internal(dst_fname_in_env, old_ft, txn, &new_ft);
    if (r != 0) {
        goto cleanup;
    }

    // make rollback log entry
    if (txn) {
        toku_txn_maybe_note_ft(txn, new_ft); // mark new ft as touched by this txn

        // There is no recovery log entry for redirect,
        // and rollback log entries are not allowed for read-only transactions.
        // Normally the recovery log entry would ensure the begin was logged.
        if (!txn->begin_was_logged) {
          toku_maybe_log_begin_txn_for_write_operation(txn);
        }
        FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
        FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
        toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum);
    }

cleanup:
    return r;
}

// Insert reference to transaction into ft
void
toku_ft_add_txn_ref(FT ft) {
    toku_ft_grab_reflock(ft);
    ++ft->num_txns;
    toku_ft_release_reflock(ft);
}

static void
remove_txn_ref_callback(FT ft, void *UU(context)) {
    invariant(ft->num_txns > 0);
    --ft->num_txns;
}

void
toku_ft_remove_txn_ref(FT ft) {
    toku_ft_remove_reference(ft, false, ZERO_LSN, remove_txn_ref_callback, NULL);
}

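// Return the root node's blocknum together with its cachetable hash.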
void toku_calculate_root_offset_pointer (
    FT ft,
    CACHEKEY* root_key,
    uint32_t *roothash
    )
{
    *roothash = toku_cachetable_hash(ft->cf, ft->h->root_blocknum);
    *root_key = ft->h->root_blocknum;
}

void toku_ft_set_new_root_blocknum(
    FT ft,
    CACHEKEY new_root_key
    )
{
    ft->h->root_blocknum = new_root_key;
}

LSN toku_ft_checkpoint_lsn(FT ft) {
    return ft->h->checkpoint_lsn;
}

void
toku_ft_stat64 (FT ft, struct ftstat64_s *s) {
    s->fsize = toku_cachefile_size(ft->cf);
    // just use the in memory stats from the header
    // prevent appearance of negative numbers for numrows, numbytes
    // if the logical count was never properly re-counted on an upgrade,
    // return the existing physical count instead.
    int64_t n;
    if (ft->in_memory_logical_rows == (uint64_t)-1) {
        n = ft->in_memory_stats.numrows;
    } else {
        n = ft->in_memory_logical_rows;
    }
    if (n < 0) {
        n = 0;
    }
    s->nkeys = s->ndata = n;
    n = ft->in_memory_stats.numbytes;
    if (n < 0) {
        n = 0;
    }
    s->dsize = n;
    s->create_time_sec = ft->h->time_of_creation;
    s->modify_time_sec = ft->h->time_of_last_modification;
    s->verify_time_sec = ft->h->time_of_last_verification;
}

void toku_ft_get_fractal_tree_info64(FT ft, struct ftinfo64 *info) {
    ft->blocktable.get_info64(info);
}

int toku_ft_iterate_fractal_tree_block_map(FT ft, int (*iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*), void *iter_extra) {
    uint64_t this_checkpoint_count = ft->h->checkpoint_count;
    return ft->blocktable.iterate_translation_tables(this_checkpoint_count, iter, iter_extra);
}

void
toku_ft_update_descriptor(FT ft, DESCRIPTOR desc)
// Effect: Changes the descriptor in a tree (log the change, make sure it makes it to disk eventually).
// requires: the ft is fully user-opened with a valid cachefile.
//           descriptor updates cannot happen in parallel for an FT
//           (ydb layer uses a row lock to enforce this)
{
    assert(ft->cf);
    int fd = toku_cachefile_get_fd(ft->cf);
    toku_ft_update_descriptor_with_fd(ft, desc, fd);
}

// update the descriptor for an ft and serialize it using
// the given fd instead of getting the fd from the ft's
// cachefile. we do this so serialize code can update a
// descriptor before the ft is fully opened and has a
// valid cachefile.
void
toku_ft_update_descriptor_with_fd(FT ft, DESCRIPTOR desc, int fd) {
    // the checksum is four bytes, so that's where the magic number comes from
    // make space for the new descriptor and write it out to disk
    DISKOFF offset, size;
    size = toku_serialize_descriptor_size(desc) + 4;
    ft->blocktable.realloc_descriptor_on_disk(size, &offset, ft, fd);
    toku_serialize_descriptor_contents_to_fd(fd, desc, offset);

    // cleanup the old descriptor and set the in-memory descriptor to the new one
    toku_destroy_dbt(&ft->descriptor.dbt);
    toku_clone_dbt(&ft->descriptor.dbt, desc->dbt);
}

void toku_ft_update_cmp_descriptor(FT ft) {
    // cleanup the old cmp descriptor and clone it as the in-memory descriptor
    toku_destroy_dbt(&ft->cmp_descriptor.dbt);
    toku_clone_dbt(&ft->cmp_descriptor.dbt, ft->descriptor.dbt);
}

DESCRIPTOR toku_ft_get_descriptor(FT_HANDLE ft_handle) {
    return &ft_handle->ft->descriptor;
}

DESCRIPTOR toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle) {
    return &ft_handle->ft->cmp_descriptor;
}

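// Atomically add (or, below, subtract) a delta of row and byte counts
// to a shared STAT64INFO.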
void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
    (void) toku_sync_fetch_and_add(&(headerstats->numrows),  delta.numrows);
    (void) toku_sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes);
}

void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
    (void) toku_sync_fetch_and_sub(&(headerstats->numrows),  delta.numrows);
    (void) toku_sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes);
}

void toku_ft_adjust_logical_row_count(FT ft, int64_t delta) {
    // In order to make sure that the correct count is returned from
    // toku_ft_stat64, the ft->(in_memory|on_disk)_logical_rows _MUST_NOT_ be
    // modified anywhere other than here, with the exceptions of
    // serializing in a header, initializing a new header and analyzing
    // an index for a logical_row count.
    // The gist is that on an index upgrade, all logical_rows values
    // in the ft header are set to -1 until an analyze can reset it to an
    // accurate value. Until then, the physical count from in_memory_stats
    // must be returned in toku_ft_stat64.
    if (delta != 0 && ft->in_memory_logical_rows != (uint64_t)-1) {
        toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), delta);
        if (ft->in_memory_logical_rows == (uint64_t)-1) {
            toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), 1);
        }
    }
}

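// Drop one reference via the remove_ref callback.  If this was the last
// reference, the FT is evicted from memory (closing its cachefile) while
// the open/close lock is held.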
void toku_ft_remove_reference(
    FT ft,
    bool oplsn_valid,
    LSN oplsn,
    remove_ft_ref_callback remove_ref,
    void *extra) {

    toku_ft_grab_reflock(ft);
    if (toku_ft_has_one_reference_unlocked(ft)) {
        toku_ft_release_reflock(ft);

        toku_ft_open_close_lock();
        toku_ft_grab_reflock(ft);

        remove_ref(ft, extra);
        bool needed = toku_ft_needed_unlocked(ft);
        toku_ft_release_reflock(ft);

        // if we're running during recovery, we must close the underlying ft.
        // we know we're running in recovery if we were passed a valid lsn.
        if (oplsn_valid) {
            assert(!needed);
        }
        if (!needed) {
            // close header
            toku_ft_evict_from_memory(ft, oplsn_valid, oplsn);
        }

        toku_ft_open_close_unlock();
    }
    else {
        remove_ref(ft, extra);
        toku_ft_release_reflock(ft);
    }
}

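// Accessors for tunable header fields (nodesize, basementnodesize,
// compression method, fanout).  Setters dirty the header; all of these
// take the FT lock around the field access.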
void toku_ft_set_nodesize(FT ft, unsigned int nodesize) {
    toku_ft_lock(ft);
    ft->h->nodesize = nodesize;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}

void toku_ft_get_nodesize(FT ft, unsigned int *nodesize) {
    toku_ft_lock(ft);
    *nodesize = ft->h->nodesize;
    toku_ft_unlock(ft);
}

void toku_ft_set_basementnodesize(FT ft, unsigned int basementnodesize) {
    toku_ft_lock(ft);
    ft->h->basementnodesize = basementnodesize;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}

void toku_ft_get_basementnodesize(FT ft, unsigned int *basementnodesize) {
    toku_ft_lock(ft);
    *basementnodesize = ft->h->basementnodesize;
    toku_ft_unlock(ft);
}

void toku_ft_set_compression_method(FT ft, enum toku_compression_method method) {
    toku_ft_lock(ft);
    ft->h->compression_method = method;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}

void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp) {
    toku_ft_lock(ft);
    *methodp = ft->h->compression_method;
    toku_ft_unlock(ft);
}

void toku_ft_set_fanout(FT ft, unsigned int fanout) {
    toku_ft_lock(ft);
    ft->h->fanout = fanout;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}

void toku_ft_get_fanout(FT ft, unsigned int *fanout) {
    toku_ft_lock(ft);
    *fanout = ft->h->fanout;
    toku_ft_unlock(ft);
}

// mark the ft as a blackhole. any message injections will be a no op.
void toku_ft_set_blackhole(FT_HANDLE ft_handle) {
    ft_handle->ft->blackhole = true;
}

struct garbage_helper_extra {
    FT ft;
    size_t total_space;
    size_t used_space;
};

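// Per-leafentry callback for the garbage scan: every entry's on-disk size
// counts toward total_space, but only entries whose newest value is not a
// delete also count toward used_space.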
static int
garbage_leafentry_helper(const void* key UU(), const uint32_t keylen, const LEAFENTRY & le, uint32_t UU(idx), struct garbage_helper_extra * const info) {
    //TODO #warning need to reanalyze for split
    info->total_space += leafentry_disksize(le) + keylen + sizeof(keylen);
    if (!le_latest_is_del(le)) {
        info->used_space += LE_CLEAN_MEMSIZE(le_latest_vallen(le)) + keylen + sizeof(keylen);
    }
    return 0;
}

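// Per-block callback for the garbage scan: read the node at blocknum from
// disk and, if it is a leaf, run garbage_leafentry_helper over each
// basement node to accumulate space usage (interior nodes are skipped).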
static int
garbage_helper(BLOCKNUM blocknum, int64_t UU(size), int64_t UU(address), void *extra) {
    struct garbage_helper_extra *CAST_FROM_VOIDP(info, extra);
    FTNODE node;
    FTNODE_DISK_DATA ndd;
    ftnode_fetch_extra bfe;
    bfe.create_for_full_read(info->ft);
    int fd = toku_cachefile_get_fd(info->ft->cf);
    int r = toku_deserialize_ftnode_from(fd, blocknum, 0, &node, &ndd, &bfe);
    if (r != 0) {
        goto no_node;
    }
    if (node->height > 0) {
        goto exit;
    }
    for (int i = 0; i < node->n_children; ++i) {
        bn_data* bd = BLB_DATA(node, i);
        r = bd->iterate<struct garbage_helper_extra, garbage_leafentry_helper>(info);
        if (r != 0) {
            goto exit;
        }
    }
    {
        float a = info->used_space, b=info->total_space;
        float percentage = (1 - (a / b)) * 100;
        printf("LeafNode# %d has %d BasementNodes and %2.1f%% of the allocated space is garbage\n", (int)blocknum.b, node->n_children, percentage);
    }
exit:
    toku_ftnode_free(&node);
    toku_free(ndd);
no_node:
    return r;
}

void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space) {
// Effect: Iterates the FT's blocktable and calculates the total and used space for leaf blocks.
// Note: It is ok to call this function concurrently with reads/writes to the table since
//       the blocktable lock is held, which means no new allocations or file writes can occur.
    invariant_notnull(total_space);
    invariant_notnull(used_space);
    struct garbage_helper_extra info = {
        .ft = ft,
        .total_space = 0,
        .used_space = 0
    };
    ft->blocktable.iterate(block_table::TRANSLATION_CHECKPOINTED, garbage_helper, &info, true, true);
    *total_space = info.total_space;
    *used_space = info.used_space;
}


#if !defined(TOKUDB_REVISION)
#error
#endif

#define xstr(X) str(X)
#define str(X) #X
#define static_version_string xstr(DB_VERSION_MAJOR) "." \
                              xstr(DB_VERSION_MINOR) "." \
                              xstr(DB_VERSION_PATCH) " build " \
                              xstr(TOKUDB_REVISION)
struct toku_product_name_strings_struct toku_product_name_strings;

char toku_product_name[TOKU_MAX_PRODUCT_NAME_LENGTH];
void tokuft_update_product_name_strings(void) {
    // DO ALL STRINGS HERE.. maybe have a separate FT layer version as well
    {
        int n = snprintf(toku_product_name_strings.db_version,
                         sizeof(toku_product_name_strings.db_version),
                         "%s %s", toku_product_name, static_version_string);
        assert(n >= 0);
        assert((unsigned)n < sizeof(toku_product_name_strings.db_version));
    }
    {
        int n = snprintf(toku_product_name_strings.fileopsdirectory,
                         sizeof(toku_product_name_strings.fileopsdirectory),
                         "%s.directory", toku_product_name);
        assert(n >= 0);
        assert((unsigned)n < sizeof(toku_product_name_strings.fileopsdirectory));
    }
    {
        int n = snprintf(toku_product_name_strings.environmentdictionary,
                         sizeof(toku_product_name_strings.environmentdictionary),
                         "%s.environment", toku_product_name);
        assert(n >= 0);
        assert((unsigned)n < sizeof(toku_product_name_strings.environmentdictionary));
    }
    {
        int n = snprintf(toku_product_name_strings.rollback_cachefile,
                         sizeof(toku_product_name_strings.rollback_cachefile),
                         "%s.rollback", toku_product_name);
        assert(n >= 0);
        assert((unsigned)n < sizeof(toku_product_name_strings.rollback_cachefile));
    }
    {
        int n = snprintf(toku_product_name_strings.single_process_lock,
                         sizeof(toku_product_name_strings.single_process_lock),
                         "__%s_lock_dont_delete_me", toku_product_name);
        assert(n >= 0);
        assert((unsigned)n < sizeof(toku_product_name_strings.single_process_lock));
    }
}
#undef xstr
#undef str

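// Acquire (or, below, release) a lock file in lock_dir so that only one
// process uses the directory at a time; on success *lockfd holds the
// lock file's descriptor.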
int
toku_single_process_lock(const char *lock_dir, const char *which, int *lockfd) {
    if (!lock_dir)
        return ENOENT;
    int namelen=strlen(lock_dir)+strlen(which);
    char lockfname[namelen+sizeof("/_") + strlen(toku_product_name_strings.single_process_lock)];

    int l = snprintf(lockfname, sizeof(lockfname), "%s/%s_%s",
                     lock_dir, toku_product_name_strings.single_process_lock, which);
    assert(l+1 == (signed)(sizeof(lockfname)));
    *lockfd = toku_os_lock_file(lockfname);
    if (*lockfd < 0) {
        int e = get_error_errno();
        fprintf(stderr, "Couldn't start tokuft because some other tokuft process is using the same directory [%s] for [%s]\n", lock_dir, which);
        return e;
    }
    return 0;
}

int
toku_single_process_unlock(int *lockfd) {
    int fd = *lockfd;
    *lockfd = -1;
    if (fd>=0) {
        int r = toku_os_unlock_file(fd);
        if (r != 0)
            return get_error_errno();
    }
    return 0;
}

int tokuft_num_envs = 0;
int
db_env_set_toku_product_name(const char *name) {
    if (tokuft_num_envs > 0) {
        return EINVAL;
    }
    if (!name || strlen(name) < 1) {
        return EINVAL;
    }
    if (strlen(name) >= sizeof(toku_product_name)) {
        return ENAMETOOLONG;
    }
    if (strncmp(toku_product_name, name, sizeof(toku_product_name))) {
        strcpy(toku_product_name, name);
        tokuft_update_product_name_strings();
    }
    return 0;
}