1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "ft/serialize/block_table.h"
40 #include "ft/ft.h"
41 #include "ft/ft-cachetable-wrappers.h"
42 #include "ft/ft-internal.h"
43 #include "ft/logger/log-internal.h"
44 #include "ft/log_header.h"
45 #include "ft/node.h"
46 #include "ft/serialize/ft-serialize.h"
47 #include "ft/serialize/ft_node-serialize.h"
48
49 #include <memory.h>
50 #include <toku_assert.h>
51 #include <portability/toku_atomic.h>
52
53 toku_instr_key *ft_ref_lock_mutex_key;
54
toku_reset_root_xid_that_created(FT ft,TXNID new_root_xid_that_created)55 void toku_reset_root_xid_that_created(FT ft, TXNID new_root_xid_that_created) {
56 // Reset the root_xid_that_created field to the given value.
57 // This redefines which xid created the dictionary.
58
59 // hold lock around setting and clearing of dirty bit
60 // (see cooperative use of dirty bit in ft_begin_checkpoint())
61 toku_ft_lock(ft);
62 ft->h->root_xid_that_created = new_root_xid_that_created;
63 ft->h->set_dirty();
64 toku_ft_unlock(ft);
65 }
66
67 static void
ft_destroy(FT ft)68 ft_destroy(FT ft) {
69 //header and checkpoint_header have same Blocktable pointer
70 //cannot destroy since it is still in use by CURRENT
71 assert(ft->h->type == FT_CURRENT);
72 ft->blocktable.destroy();
73 ft->cmp.destroy();
74 toku_destroy_dbt(&ft->descriptor.dbt);
75 toku_destroy_dbt(&ft->cmp_descriptor.dbt);
76 toku_ft_destroy_reflock(ft);
77 toku_free(ft->h);
78 }
79
// Make a copy of the header for the purpose of a checkpoint
// Not reentrant for a single FT.
// See ft_checkpoint for explanation of why.
// FT lock must be held.
static void
ft_copy_for_checkpoint_unlocked(FT ft, LSN checkpoint_lsn) {
    assert(ft->h->type == FT_CURRENT);
    // Reentrancy would clobber an in-progress checkpoint header.
    assert(ft->checkpoint_header == NULL);

    // Shallow-duplicate the whole current header into a heap copy.
    FT_HEADER XMEMDUP(ch, ft->h);
    ch->type = FT_CHECKPOINT_INPROGRESS; //Different type
    //printf("checkpoint_lsn=%" PRIu64 "\n", checkpoint_lsn.lsn);
    ch->checkpoint_lsn = checkpoint_lsn;

    //ch->blocktable is SHARED between the two headers
    ft->checkpoint_header = ch;
}
97
98 void
toku_ft_free(FT ft)99 toku_ft_free (FT ft) {
100 ft_destroy(ft);
101 toku_free(ft);
102 }
103
toku_ft_init_reflock(FT ft)104 void toku_ft_init_reflock(FT ft) {
105 toku_mutex_init(*ft_ref_lock_mutex_key, &ft->ft_ref_lock, nullptr);
106 }
107
toku_ft_destroy_reflock(FT ft)108 void toku_ft_destroy_reflock(FT ft) { toku_mutex_destroy(&ft->ft_ref_lock); }
109
110 void
toku_ft_grab_reflock(FT ft)111 toku_ft_grab_reflock(FT ft) {
112 toku_mutex_lock(&ft->ft_ref_lock);
113 }
114
115 void
toku_ft_release_reflock(FT ft)116 toku_ft_release_reflock(FT ft) {
117 toku_mutex_unlock(&ft->ft_ref_lock);
118 }
119
120 /////////////////////////////////////////////////////////////////////////
121 // Start of Functions that are callbacks to the cachefule
122 //
123
// maps to cf->log_fassociate_during_checkpoint
// Logs an fassociate record binding this cachefile's FILENUM to its
// name and flags, so recovery can re-associate the file.
static void
ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
    FT ft = (FT) header_v;
    char* fname_in_env = toku_cachefile_fname_in_env(cf);
    BYTESTRING bs = { .len = (uint32_t) strlen(fname_in_env), // don't include the NUL
                      .data = fname_in_env };
    TOKULOGGER logger = toku_cachefile_logger(cf);
    FILENUM filenum = toku_cachefile_filenum(cf);
    bool unlink_on_close = toku_cachefile_is_unlink_on_close(cf);
    toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs, unlink_on_close);
}
136
// Maps to cf->begin_checkpoint_userdata
// Create checkpoint-in-progress versions of header and translation (btt)
// Has access to fd (it is protected).
//
// Not reentrant for a single FT (see ft_checkpoint)
static void ft_begin_checkpoint (LSN checkpoint_lsn, void *header_v) {
    FT ft = (FT) header_v;
    // hold lock around copying and clearing of dirty bit
    toku_ft_lock (ft);
    assert(ft->h->type == FT_CURRENT);
    assert(ft->checkpoint_header == NULL);
    // Snapshot the current header as the checkpoint-in-progress header.
    ft_copy_for_checkpoint_unlocked(ft, checkpoint_lsn);
    ft->h->clear_dirty();             // this is only place this bit is cleared (in currentheader)
    // Tell the block table a checkpoint has begun so it retains the blocks
    // belonging to the checkpointed translation.
    ft->blocktable.note_start_checkpoint_unlocked();
    toku_ft_unlock (ft);
}
153
// #4922: Hack to remove data corruption race condition.
// Reading (and upgrading) a node up to version 19 causes this.
// We COULD skip this if we know that no nodes remained (as of last checkpoint)
// that are below version 19.
// If there are no nodes < version 19 this is harmless (field is unused).
// If there are, this will make certain the value is at least as low as necessary,
// and not much lower. (Too low is good, too high can cause data corruption).
// TODO(yoni): If we ever stop supporting upgrades of nodes < version 19 we can delete this.
// TODO(yoni): If we know no nodes are left to upgrade, we can skip this. (Probably not worth doing).
static void
ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(FT ft) {
    // Only dictionaries originally created before layout version 19 can
    // still contain nodes that consult this field.
    if (ft->h->layout_version_original < FT_LAYOUT_VERSION_19) {
        // Re-copy the live value into the checkpoint header so the value
        // written to disk is current, not the stale snapshot taken at
        // checkpoint begin.
        ft->checkpoint_header->highest_unused_msn_for_upgrade = ft->h->highest_unused_msn_for_upgrade;
    }
}
169
// maps to cf->checkpoint_userdata
// Write checkpoint-in-progress versions of header and translation to disk (really to OS internal buffer).
// Copy current header's version of checkpoint_staging stat64info to checkpoint header.
// Must have access to fd (protected).
// Requires: all pending bits are clear.  This implies that no thread will modify the checkpoint_staging
// version of the stat64info.
//
// No locks are taken for checkpoint_count/lsn because this is single threaded.  Can be called by:
//      - ft_close
//      - end_checkpoint
// checkpoints hold references to FTs and so they cannot be closed during a checkpoint.
// ft_close is not reentrant for a single FT
// end_checkpoint is not reentrant period
static void ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
    FT ft = (FT) header_v;
    FT_HEADER ch = ft->checkpoint_header;
    assert(ch);
    assert(ch->type == FT_CHECKPOINT_INPROGRESS);
    if (ch->dirty()) {            // this is only place this bit is tested (in checkpoint_header)
        TOKULOGGER logger = toku_cachefile_logger(cf);
        if (logger) {
            // Write-ahead rule: the log must be durable up to this
            // checkpoint's LSN before the header referencing it is written.
            toku_logger_fsync_if_lsn_not_fsynced(logger, ch->checkpoint_lsn);
        }
        uint64_t now = (uint64_t) time(NULL);
        ft->h->time_of_last_modification = now;
        ch->time_of_last_modification = now;
        ch->checkpoint_count++;
        ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(ft);
        // Stamp the current logical row count into both headers.
        ch->on_disk_logical_rows =
            ft->h->on_disk_logical_rows = ft->in_memory_logical_rows;

        // write translation and header to disk (or at least to OS internal buffer)
        toku_serialize_ft_to(fd, ch, &ft->blocktable, ft->cf);
        ch->clear_dirty();        // this is only place this bit is cleared (in checkpoint_header)

        // fsync the cachefile
        toku_cachefile_fsync(cf);
        // Only after the fsync do we advance the live header's count/lsn:
        // checkpoint succeeded, next checkpoint will save to the alternate
        // header location.
        ft->h->checkpoint_count++;
        ft->h->checkpoint_lsn = ch->checkpoint_lsn; //Header updated.
    } else {
        // Nothing changed since checkpoint begin; let the block table know
        // the checkpoint was skipped for this file.
        ft->blocktable.note_skipped_checkpoint();
    }
}
213
214 // maps to cf->end_checkpoint_userdata
215 // free unused disk space
216 // (i.e. tell BlockAllocator to liberate blocks used by previous checkpoint).
217 // Must have access to fd (protected)
ft_end_checkpoint(CACHEFILE UU (cf),int fd,void * header_v)218 static void ft_end_checkpoint(CACHEFILE UU(cf), int fd, void *header_v) {
219 FT ft = (FT) header_v;
220 assert(ft->h->type == FT_CURRENT);
221 ft->blocktable.note_end_checkpoint(fd);
222 toku_free(ft->checkpoint_header);
223 ft->checkpoint_header = nullptr;
224 }
225
226 // maps to cf->close_userdata
227 // Has access to fd (it is protected).
ft_close(CACHEFILE cachefile,int fd,void * header_v,bool oplsn_valid,LSN oplsn)228 static void ft_close(CACHEFILE cachefile, int fd, void *header_v, bool oplsn_valid, LSN oplsn) {
229 FT ft = (FT) header_v;
230 assert(ft->h->type == FT_CURRENT);
231 // We already have exclusive access to this field already, so skip the locking.
232 // This should already never fail.
233 invariant(!toku_ft_needed_unlocked(ft));
234 assert(ft->cf == cachefile);
235 TOKULOGGER logger = toku_cachefile_logger(cachefile);
236 LSN lsn = ZERO_LSN;
237 //Get LSN
238 if (oplsn_valid) {
239 //Use recovery-specified lsn
240 lsn = oplsn;
241 //Recovery cannot reduce lsn of a header.
242 if (lsn.lsn < ft->h->checkpoint_lsn.lsn) {
243 lsn = ft->h->checkpoint_lsn;
244 }
245 }
246 else {
247 //Get LSN from logger
248 lsn = ZERO_LSN; // if there is no logger, we use zero for the lsn
249 if (logger) {
250 char* fname_in_env = toku_cachefile_fname_in_env(cachefile);
251 assert(fname_in_env);
252 BYTESTRING bs = {.len=(uint32_t) strlen(fname_in_env), .data=fname_in_env};
253 if (!toku_cachefile_is_skip_log_recover_on_close(cachefile)) {
254 toku_log_fclose(
255 logger,
256 &lsn,
257 ft->h->dirty(),
258 bs,
259 toku_cachefile_filenum(cachefile)); // flush the log on
260 // close (if new header
261 // is being written),
262 // otherwise it might
263 // not make it out.
264 toku_cachefile_do_log_recover_on_close(cachefile);
265 }
266 }
267 }
268 if (ft->h->dirty()) { // this is the only place this bit is tested (in currentheader)
269 bool do_checkpoint = true;
270 if (logger && logger->rollback_cachefile == cachefile) {
271 do_checkpoint = false;
272 }
273 if (do_checkpoint) {
274 ft_begin_checkpoint(lsn, header_v);
275 ft_checkpoint(cachefile, fd, ft);
276 ft_end_checkpoint(cachefile, fd, header_v);
277 assert(!ft->h->dirty()); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
278 }
279 }
280 }
281
282 // maps to cf->free_userdata
ft_free(CACHEFILE cachefile UU (),void * header_v)283 static void ft_free(CACHEFILE cachefile UU(), void *header_v) {
284 FT ft = (FT) header_v;
285 toku_ft_free(ft);
286 }
287
288 // maps to cf->note_pin_by_checkpoint
289 //Must be protected by ydb lock.
290 //Is only called by checkpoint begin, which holds it
ft_note_pin_by_checkpoint(CACHEFILE UU (cachefile),void * header_v)291 static void ft_note_pin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) {
292 // Note: open_close lock is held by checkpoint begin
293 FT ft = (FT) header_v;
294 toku_ft_grab_reflock(ft);
295 assert(!ft->pinned_by_checkpoint);
296 assert(toku_ft_needed_unlocked(ft));
297 ft->pinned_by_checkpoint = true;
298 toku_ft_release_reflock(ft);
299 }
300
301 // Requires: the reflock is held.
unpin_by_checkpoint_callback(FT ft,void * extra)302 static void unpin_by_checkpoint_callback(FT ft, void *extra) {
303 invariant(extra == NULL);
304 invariant(ft->pinned_by_checkpoint);
305 ft->pinned_by_checkpoint = false;
306 }
307
308 // maps to cf->note_unpin_by_checkpoint
309 //Must be protected by ydb lock.
310 //Called by end_checkpoint, which grabs ydb lock around note_unpin
ft_note_unpin_by_checkpoint(CACHEFILE UU (cachefile),void * header_v)311 static void ft_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) {
312 FT ft = (FT) header_v;
313 toku_ft_remove_reference(ft, false, ZERO_LSN, unpin_by_checkpoint_callback, NULL);
314 }
315
316 //
317 // End of Functions that are callbacks to the cachefile
318 /////////////////////////////////////////////////////////////////////////
319
// Create the empty root node of a brand-new tree and insert it into the
// cachetable under the given blocknum.
static void setup_initial_ft_root_node(FT ft, BLOCKNUM blocknum) {
    FTNODE XCALLOC(node);
    // Empty node: height 0, a single partition, current layout version.
    toku_initialize_empty_ftnode(node, blocknum, 0, 1, ft->h->layout_version, ft->h->flags);
    BP_STATE(node,0) = PT_AVAIL;

    uint32_t fullhash = toku_cachetable_hash(ft->cf, blocknum);
    node->fullhash = fullhash;
    toku_cachetable_put(ft->cf, blocknum, fullhash,
                        node, make_ftnode_pair_attr(node),
                        get_write_callbacks_for_node(ft),
                        toku_ftnode_save_ct_pair);
    // The put leaves the pair pinned; unpin so the node is evictable.
    toku_unpin_ftnode(ft, node);
}
333
// Initialize the in-memory parts of a freshly created FT (header already
// built by ft_header_create): comparator, update function, cachefile
// binding, the empty root node, and the cachefile callbacks.
static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
    // fake, prevent unnecessary upgrade logic
    ft->layout_version_read_from_disk = FT_LAYOUT_VERSION;
    ft->checkpoint_header = NULL;

    toku_list_init(&ft->live_ft_handles);

    // intuitively, the comparator points to the FT's cmp descriptor
    ft->cmp.create(options->compare_fun, &ft->cmp_descriptor, options->memcmp_magic);
    ft->update_fun = options->update_fun;

    // If a cachefile was already recorded, it must be the same one.
    if (ft->cf != NULL) {
        assert(ft->cf == cf);
    }
    ft->cf = cf;
    ft->in_memory_stats = ZEROSTATS;

    setup_initial_ft_root_node(ft, ft->h->root_blocknum);
    // Register this FT as the cachefile's userdata, with the checkpoint /
    // close / free callbacks defined above.  Same callback set as
    // toku_read_ft_and_store_in_cachefile() registers for an existing tree.
    toku_cachefile_set_userdata(ft->cf,
                                ft,
                                ft_log_fassociate_during_checkpoint,
                                ft_close,
                                ft_free,
                                ft_checkpoint,
                                ft_begin_checkpoint,
                                ft_end_checkpoint,
                                ft_note_pin_by_checkpoint,
                                ft_note_unpin_by_checkpoint);

    // A new tree should have no free blocknums yet.
    ft->blocktable.verify_no_free_blocknums();
}
365
366
// Build a fresh FT_HEADER (heap-allocated copy of a stack template) for a
// newly created dictionary, populated from the caller's options.
// Returns an owned pointer; the caller (via ft_destroy) frees it.
static FT_HEADER
ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_created)
{
    uint64_t now = (uint64_t) time(NULL);
    struct ft_header h = {
        .type = FT_CURRENT,
        .dirty_ = 0,
        .checkpoint_count = 0,
        .checkpoint_lsn = ZERO_LSN,
        .layout_version = FT_LAYOUT_VERSION,
        .layout_version_original = FT_LAYOUT_VERSION,
        .build_id = BUILD_ID,
        .build_id_original = BUILD_ID,
        .time_of_creation = now,
        .root_xid_that_created = root_xid_that_created,
        .time_of_last_modification = now,
        .time_of_last_verification = 0,
        .root_blocknum = root_blocknum,
        .flags = options->flags,
        .nodesize = options->nodesize,
        .basementnodesize = options->basementnodesize,
        .compression_method = options->compression_method,
        .fanout = options->fanout,
        // One below MIN_MSN; see ft_hack_highest_unused_msn_for_upgrade_*.
        .highest_unused_msn_for_upgrade = { .msn = (MIN_MSN.msn - 1) },
        .max_msn_in_ft = ZERO_MSN,
        .time_of_last_optimize_begin = 0,
        .time_of_last_optimize_end = 0,
        .count_of_optimize_in_progress = 0,
        .count_of_optimize_in_progress_read_from_disk = 0,
        .msn_at_start_of_last_completed_optimize = ZERO_MSN,
        .on_disk_stats = ZEROSTATS,
        .on_disk_logical_rows = 0
    };
    return (FT_HEADER) toku_xmemdup(&h, sizeof h);
}
402
// allocate and initialize a fractal tree.
// On return *ftp owns a fully initialized FT bound to cachefile cf.
void toku_ft_create(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) {
    invariant(ftp);

    FT XCALLOC(ft);
    // Record the creating txn's outermost (parent) id in the header;
    // TXNID_NONE when there is no transaction.
    ft->h = ft_header_create(options, make_blocknum(0), (txn ? txn->txnid.parent_id64: TXNID_NONE));

    toku_ft_init_reflock(ft);

    // Assign blocknum for root block, also dirty the header
    ft->blocktable.create();
    ft->blocktable.allocate_blocknum(&ft->h->root_blocknum, ft);

    // Create the root node and wire up the cachefile callbacks.
    ft_init(ft, options, cf);

    *ftp = ft;
}
420
// TODO: (Zardosht) get rid of ft parameter
int toku_read_ft_and_store_in_cachefile (FT_HANDLE ft_handle, CACHEFILE cf, LSN max_acceptable_lsn, FT *header)
// If the cachefile already has the header, then just get it.
// If the cachefile has not been initialized, then don't modify anything.
// max_acceptable_lsn is the latest acceptable checkpointed version of the file.
// Returns 0 on success, or the error from toku_deserialize_ft_from.
{
    // Fast path: another handle already loaded this FT into the cachefile.
    FT ft = nullptr;
    if ((ft = (FT) toku_cachefile_get_userdata(cf)) != nullptr) {
        *header = ft;
        assert(ft_handle->options.update_fun == ft->update_fun);
        return 0;
    }

    // Slow path: deserialize the FT from the file.
    int fd = toku_cachefile_get_fd(cf);
    const char *fn = toku_cachefile_fname_in_env(cf);
    int r = toku_deserialize_ft_from(fd, fn, max_acceptable_lsn, &ft);
    if (r == TOKUDB_BAD_CHECKSUM) {
        // Header corruption is treated as unrecoverable.
        fprintf(stderr, "Checksum failure while reading header in file %s.\n", toku_cachefile_fname_in_env(cf));
        assert(false);  // make absolutely sure we crash before doing anything else
    } else if (r != 0) {
        return r;
    }

    invariant_notnull(ft);
    // intuitively, the comparator points to the FT's cmp descriptor
    ft->cmp.create(ft_handle->options.compare_fun, &ft->cmp_descriptor, ft_handle->options.memcmp_magic);
    ft->update_fun = ft_handle->options.update_fun;
    ft->cf = cf;
    // Register the same callback set ft_init() uses for a new tree.
    toku_cachefile_set_userdata(cf,
                                reinterpret_cast<void *>(ft),
                                ft_log_fassociate_during_checkpoint,
                                ft_close,
                                ft_free,
                                ft_checkpoint,
                                ft_begin_checkpoint,
                                ft_end_checkpoint,
                                ft_note_pin_by_checkpoint,
                                ft_note_unpin_by_checkpoint);
    *header = ft;
    return 0;
}
462
463 void
toku_ft_note_ft_handle_open(FT ft,FT_HANDLE live)464 toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live) {
465 toku_ft_grab_reflock(ft);
466 live->ft = ft;
467 toku_list_push(&ft->live_ft_handles, &live->live_ft_handle_link);
468 toku_ft_release_reflock(ft);
469 }
470
471 // the reference count for a ft is the number of txn's that
472 // touched it plus the number of open handles plus one if
473 // pinned by a checkpoint.
474 static int
ft_get_reference_count(FT ft)475 ft_get_reference_count(FT ft) {
476 uint32_t pinned_by_checkpoint = ft->pinned_by_checkpoint ? 1 : 0;
477 int num_handles = toku_list_num_elements_est(&ft->live_ft_handles);
478 return pinned_by_checkpoint + ft->num_txns + num_handles;
479 }
480
481 // a ft is needed in memory iff its reference count is non-zero
482 bool
toku_ft_needed_unlocked(FT ft)483 toku_ft_needed_unlocked(FT ft) {
484 return ft_get_reference_count(ft) != 0;
485 }
486
487 // get the reference count and return true if it was 1
488 bool
toku_ft_has_one_reference_unlocked(FT ft)489 toku_ft_has_one_reference_unlocked(FT ft) {
490 return ft_get_reference_count(ft) == 1;
491 }
492
493 // evict a ft from memory by closing its cachefile. any future work
494 // will have to read in the ft in a new cachefile and new FT object.
toku_ft_evict_from_memory(FT ft,bool oplsn_valid,LSN oplsn)495 void toku_ft_evict_from_memory(FT ft, bool oplsn_valid, LSN oplsn) {
496 assert(ft->cf);
497 toku_cachefile_close(&ft->cf, oplsn_valid, oplsn);
498 }
499
500 // Verifies there exists exactly one ft handle and returns it.
toku_ft_get_only_existing_ft_handle(FT ft)501 FT_HANDLE toku_ft_get_only_existing_ft_handle(FT ft) {
502 FT_HANDLE ft_handle_ret = NULL;
503 toku_ft_grab_reflock(ft);
504 assert(toku_list_num_elements_est(&ft->live_ft_handles) == 1);
505 ft_handle_ret = toku_list_struct(toku_list_head(&ft->live_ft_handles), struct ft_handle, live_ft_handle_link);
506 toku_ft_release_reflock(ft);
507 return ft_handle_ret;
508 }
509
// Purpose: set fields in ft_header to capture accountability info for start of HOT optimize.
// Note: HOT accountability variables in header are modified only while holding header lock.
//       (Header lock is really needed for touching the dirty bit, but it's useful and
//       convenient here for keeping the HOT variables threadsafe.)
void
toku_ft_note_hot_begin(FT_HANDLE ft_handle) {
    FT ft = ft_handle->ft;
    time_t now = time(NULL);

    // hold lock around setting and clearing of dirty bit
    // (see cooperative use of dirty bit in ft_begin_checkpoint())
    toku_ft_lock(ft);
    ft->h->time_of_last_optimize_begin = now;
    ft->h->count_of_optimize_in_progress++;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}
527
528
// Purpose: set fields in ft_header to capture accountability info for end of HOT optimize.
// Note: See note for toku_ft_note_hot_begin().
void
toku_ft_note_hot_complete(FT_HANDLE ft_handle, bool success, MSN msn_at_start_of_hot) {
    FT ft = ft_handle->ft;
    time_t now = time(NULL);

    toku_ft_lock(ft);
    ft->h->count_of_optimize_in_progress--;
    if (success) {
        ft->h->time_of_last_optimize_end = now;
        ft->h->msn_at_start_of_last_completed_optimize = msn_at_start_of_hot;
        // If we just successfully completed an optimization and no other thread is performing
        // an optimization, then the number of optimizations in progress is zero.
        // If there was a crash during a HOT optimization, this is how count_of_optimize_in_progress
        // would be reset to zero on the disk after recovery from that crash.
        if (ft->h->count_of_optimize_in_progress == ft->h->count_of_optimize_in_progress_read_from_disk)
            ft->h->count_of_optimize_in_progress = 0;
    }
    // Header changed; mark dirty under the lock (see note_hot_begin).
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}
551
552
// Initialize an FT struct in place: zero it, build a header from a default
// option set (no comparator/update function, flags 0), then override the
// checkpoint count and LSN with the caller-supplied values.
// NOTE(review): unlike toku_ft_create, this does not create a block table,
// reflock, or root node — callers presumably finish setup elsewhere.
void
toku_ft_init(FT ft,
             BLOCKNUM root_blocknum_on_disk,
             LSN checkpoint_lsn,
             TXNID root_xid_that_created,
             uint32_t target_nodesize,
             uint32_t target_basementnodesize,
             enum toku_compression_method compression_method,
             uint32_t fanout
             )
{
    memset(ft, 0, sizeof *ft);
    struct ft_options options = {
        .nodesize = target_nodesize,
        .basementnodesize = target_basementnodesize,
        .compression_method = compression_method,
        .fanout = fanout,
        .flags = 0,
        .memcmp_magic = 0,
        .compare_fun = NULL,
        .update_fun = NULL
    };
    ft->h = ft_header_create(&options, root_blocknum_on_disk, root_xid_that_created);
    ft->h->checkpoint_count = 1;
    ft->h->checkpoint_lsn = checkpoint_lsn;
}
579
// Open an ft for use by redirect.  The new ft must have the same dict_id as the
// old_ft passed in.  (FILENUM is assigned by the ft_handle_open() function.)
// The new handle copies the old FT's comparator, update function, and tuning
// parameters so the two files are interchangeable.
static int
ft_handle_open_for_redirect(FT_HANDLE *new_ftp, const char *fname_in_env, TOKUTXN txn, FT old_ft) {
    FT_HANDLE ft_handle;
    assert(old_ft->dict_id.dictid != DICTIONARY_ID_NONE.dictid);
    toku_ft_handle_create(&ft_handle);
    // Mirror the old FT's behavioral settings onto the new handle.
    toku_ft_set_bt_compare(ft_handle, old_ft->cmp.get_compare_func());
    toku_ft_set_update(ft_handle, old_ft->update_fun);
    toku_ft_handle_set_nodesize(ft_handle, old_ft->h->nodesize);
    toku_ft_handle_set_basementnodesize(ft_handle, old_ft->h->basementnodesize);
    toku_ft_handle_set_compression_method(ft_handle, old_ft->h->compression_method);
    toku_ft_handle_set_fanout(ft_handle, old_ft->h->fanout);
    CACHETABLE ct = toku_cachefile_get_cachetable(old_ft->cf);
    // Open under the OLD dict_id so the redirected file keeps its identity.
    int r = toku_ft_handle_open_with_dict_id(ft_handle, fname_in_env, 0, 0, ct, txn, old_ft->dict_id);
    if (r != 0) {
        goto cleanup;
    }
    assert(ft_handle->ft->dict_id.dictid == old_ft->dict_id.dictid);
    *new_ftp = ft_handle;

 cleanup:
    // On failure, close the partially-opened handle; on success r == 0 and
    // this is a no-op.
    if (r != 0) {
        toku_ft_handle_close(ft_handle);
    }
    return r;
}
606
// This function performs most of the work to redirect a dictionary to different file.
// It is called for redirect and to abort a redirect.  (This function is almost its own inverse.)
// On success, *dst_ftp is the FT for dst_fname_in_env and every live handle
// that pointed at src_ft now points at it.
static int
dictionary_redirect_internal(const char *dst_fname_in_env, FT src_ft, TOKUTXN txn, FT *dst_ftp) {
    int r;

    FILENUM src_filenum = toku_cachefile_filenum(src_ft->cf);
    FILENUM dst_filenum = FILENUM_NONE;

    FT dst_ft = NULL;
    struct toku_list *list;
    // open a dummy ft based off of
    // dst_fname_in_env to get the header
    // then we will change all the ft's to have
    // their headers point to dst_ft instead of src_ft
    FT_HANDLE tmp_dst_ft = NULL;
    r = ft_handle_open_for_redirect(&tmp_dst_ft, dst_fname_in_env, txn, src_ft);
    if (r != 0) {
        goto cleanup;
    }
    dst_ft = tmp_dst_ft->ft;

    // some sanity checks on dst_filenum
    dst_filenum = toku_cachefile_filenum(dst_ft->cf);
    assert(dst_filenum.fileid!=FILENUM_NONE.fileid);
    assert(dst_filenum.fileid!=src_filenum.fileid); //Cannot be same file.

    // for each live ft_handle, ft_handle->ft is currently src_ft
    // we want to change it to dummy_dst
    toku_ft_grab_reflock(src_ft);
    while (!toku_list_empty(&src_ft->live_ft_handles)) {
        list = src_ft->live_ft_handles.next;
        FT_HANDLE src_handle = NULL;
        src_handle = toku_list_struct(list, struct ft_handle, live_ft_handle_link);

        // Unhook the handle from src_ft and rehook it onto dst_ft.
        toku_list_remove(&src_handle->live_ft_handle_link);

        toku_ft_note_ft_handle_open(dst_ft, src_handle);
        // Give the layer above a chance to react to the redirect.
        if (src_handle->redirect_callback) {
            src_handle->redirect_callback(src_handle, src_handle->redirect_callback_extra);
        }
    }
    assert(dst_ft);
    // making sure that we are not leaking src_ft
    assert(toku_ft_needed_unlocked(src_ft));
    toku_ft_release_reflock(src_ft);

    // Close the temporary handle used to open dst; the moved handles keep
    // dst_ft alive.
    toku_ft_handle_close(tmp_dst_ft);

    *dst_ftp = dst_ft;
cleanup:
    return r;
}
660
661
662
//This is the 'abort redirect' function.  The redirect of old_ft to new_ft was done
//and now must be undone, so here we redirect new_ft back to old_ft.
int
toku_dictionary_redirect_abort(FT old_ft, FT new_ft, TOKUTXN txn) {
    char *old_fname_in_env = toku_cachefile_fname_in_env(old_ft->cf);
    int r;
    {
        // Sanity checks before undoing the redirect.
        FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
        FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
        assert(old_filenum.fileid!=new_filenum.fileid); //Cannot be same file.

        //No living fts in old header.
        toku_ft_grab_reflock(old_ft);
        assert(toku_list_empty(&old_ft->live_ft_handles));
        toku_ft_release_reflock(old_ft);
    }

    FT dst_ft;
    // redirect back from new_ft to old_ft
    r = dictionary_redirect_internal(old_fname_in_env, new_ft, txn, &dst_ft);
    if (r == 0) {
        // The round trip must land back on the original FT.
        assert(dst_ft == old_ft);
    }
    return r;
}
688
689 /****
690 * on redirect or abort:
691 * if redirect txn_note_doing_work(txn)
692 * if redirect connect src ft to txn (txn modified this ft)
693 * for each src ft
694 * open ft to dst file (create new ft struct)
695 * if redirect connect dst ft to txn
696 * redirect db to new ft
697 * redirect cursors to new ft
698 * close all src fts
699 * if redirect make rollback log entry
700 *
701 * on commit:
702 * nothing to do
703 *
704 *****/
705
int
toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKUTXN txn) {
// Input args:
//   new file name for dictionary (relative to env)
//   old_ft_h is a live ft of open handle ({DB, FT_HANDLE} pair) that currently refers to old dictionary file.
//   (old_ft_h may be one of many handles to the dictionary.)
//   txn that created the loader
// Requires:
//   multi operation lock is held.
//   The ft is open.  (which implies there can be no zombies.)
//   The new file must be a valid dictionary.
//   The block size and flags in the new file must match the existing FT.
//   The new file must already have its descriptor in it (and it must match the existing descriptor).
// Effect:
//   Open new FTs (and related header and cachefile) to the new dictionary file with a new FILENUM.
//   Redirect all DBs that point to fts that point to the old file to point to fts that point to the new file.
//   Copy the dictionary id (dict_id) from the header of the original file to the header of the new file.
//   Create a rollback log entry.
//   The original FT, header, cachefile and file remain unchanged.  They will be cleaned up on commit.
//   If the txn aborts, then this operation will be undone.
    int r;

    FT old_ft = old_ft_h->ft;

    // dst file should not be open.  (implies that dst and src are different because src must be open.)
    {
        CACHETABLE ct = toku_cachefile_get_cachetable(old_ft->cf);
        CACHEFILE cf;
        r = toku_cachefile_of_iname_in_env(ct, dst_fname_in_env, &cf);
        if (r==0) {
            // dst is already open -- refuse to redirect onto it.
            r = EINVAL;
            goto cleanup;
        }
        assert(r==ENOENT);
        r = 0;
    }

    if (txn) {
        toku_txn_maybe_note_ft(txn, old_ft);  // mark old ft as touched by this txn
    }

    // Do the actual handle surgery: open dst, move all live handles over.
    FT new_ft;
    r = dictionary_redirect_internal(dst_fname_in_env, old_ft, txn, &new_ft);
    if (r != 0) {
        goto cleanup;
    }

    // make rollback log entry
    if (txn) {
        toku_txn_maybe_note_ft(txn, new_ft);  // mark new ft as touched by this txn

        // There is no recovery log entry for redirect,
        // and rollback log entries are not allowed for read-only transactions.
        // Normally the recovery log entry would ensure the begin was logged.
        if (!txn->begin_was_logged) {
            toku_maybe_log_begin_txn_for_write_operation(txn);
        }
        FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
        FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
        toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum);
    }

cleanup:
    return r;
}
771
772 // Insert reference to transaction into ft
773 void
toku_ft_add_txn_ref(FT ft)774 toku_ft_add_txn_ref(FT ft) {
775 toku_ft_grab_reflock(ft);
776 ++ft->num_txns;
777 toku_ft_release_reflock(ft);
778 }
779
780 static void
remove_txn_ref_callback(FT ft,void * UU (context))781 remove_txn_ref_callback(FT ft, void *UU(context)) {
782 invariant(ft->num_txns > 0);
783 --ft->num_txns;
784 }
785
786 void
toku_ft_remove_txn_ref(FT ft)787 toku_ft_remove_txn_ref(FT ft) {
788 toku_ft_remove_reference(ft, false, ZERO_LSN, remove_txn_ref_callback, NULL);
789 }
790
toku_calculate_root_offset_pointer(FT ft,CACHEKEY * root_key,uint32_t * roothash)791 void toku_calculate_root_offset_pointer (
792 FT ft,
793 CACHEKEY* root_key,
794 uint32_t *roothash
795 )
796 {
797 *roothash = toku_cachetable_hash(ft->cf, ft->h->root_blocknum);
798 *root_key = ft->h->root_blocknum;
799 }
800
toku_ft_set_new_root_blocknum(FT ft,CACHEKEY new_root_key)801 void toku_ft_set_new_root_blocknum(
802 FT ft,
803 CACHEKEY new_root_key
804 )
805 {
806 ft->h->root_blocknum = new_root_key;
807 }
808
toku_ft_checkpoint_lsn(FT ft)809 LSN toku_ft_checkpoint_lsn(FT ft) {
810 return ft->h->checkpoint_lsn;
811 }
812
813 void
toku_ft_stat64(FT ft,struct ftstat64_s * s)814 toku_ft_stat64 (FT ft, struct ftstat64_s *s) {
815 s->fsize = toku_cachefile_size(ft->cf);
816 // just use the in memory stats from the header
817 // prevent appearance of negative numbers for numrows, numbytes
818 // if the logical count was never properly re-counted on an upgrade,
819 // return the existing physical count instead.
820 int64_t n;
821 if (ft->in_memory_logical_rows == (uint64_t)-1) {
822 n = ft->in_memory_stats.numrows;
823 } else {
824 n = ft->in_memory_logical_rows;
825 }
826 if (n < 0) {
827 n = 0;
828 }
829 s->nkeys = s->ndata = n;
830 n = ft->in_memory_stats.numbytes;
831 if (n < 0) {
832 n = 0;
833 }
834 s->dsize = n;
835 s->create_time_sec = ft->h->time_of_creation;
836 s->modify_time_sec = ft->h->time_of_last_modification;
837 s->verify_time_sec = ft->h->time_of_last_verification;
838 }
839
toku_ft_get_fractal_tree_info64(FT ft,struct ftinfo64 * info)840 void toku_ft_get_fractal_tree_info64(FT ft, struct ftinfo64 *info) {
841 ft->blocktable.get_info64(info);
842 }
843
toku_ft_iterate_fractal_tree_block_map(FT ft,int (* iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void *),void * iter_extra)844 int toku_ft_iterate_fractal_tree_block_map(FT ft, int (*iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*), void *iter_extra) {
845 uint64_t this_checkpoint_count = ft->h->checkpoint_count;
846 return ft->blocktable.iterate_translation_tables(this_checkpoint_count, iter, iter_extra);
847 }
848
849 void
toku_ft_update_descriptor(FT ft,DESCRIPTOR desc)850 toku_ft_update_descriptor(FT ft, DESCRIPTOR desc)
851 // Effect: Changes the descriptor in a tree (log the change, make sure it makes it to disk eventually).
852 // requires: the ft is fully user-opened with a valid cachefile.
853 // descriptor updates cannot happen in parallel for an FT
854 // (ydb layer uses a row lock to enforce this)
855 {
856 assert(ft->cf);
857 int fd = toku_cachefile_get_fd(ft->cf);
858 toku_ft_update_descriptor_with_fd(ft, desc, fd);
859 }
860
861 // upadate the descriptor for an ft and serialize it using
862 // the given descriptor instead of reading the descriptor
863 // from the ft's cachefile. we do this so serialize code can
864 // update a descriptor before the ft is fully opened and has
865 // a valid cachefile.
866 void
toku_ft_update_descriptor_with_fd(FT ft,DESCRIPTOR desc,int fd)867 toku_ft_update_descriptor_with_fd(FT ft, DESCRIPTOR desc, int fd) {
868 // the checksum is four bytes, so that's where the magic number comes from
869 // make space for the new descriptor and write it out to disk
870 DISKOFF offset, size;
871 size = toku_serialize_descriptor_size(desc) + 4;
872 ft->blocktable.realloc_descriptor_on_disk(size, &offset, ft, fd);
873 toku_serialize_descriptor_contents_to_fd(fd, desc, offset);
874
875 // cleanup the old descriptor and set the in-memory descriptor to the new one
876 toku_destroy_dbt(&ft->descriptor.dbt);
877 toku_clone_dbt(&ft->descriptor.dbt, desc->dbt);
878 }
879
toku_ft_update_cmp_descriptor(FT ft)880 void toku_ft_update_cmp_descriptor(FT ft) {
881 // cleanup the old cmp descriptor and clone it as the in-memory descriptor
882 toku_destroy_dbt(&ft->cmp_descriptor.dbt);
883 toku_clone_dbt(&ft->cmp_descriptor.dbt, ft->descriptor.dbt);
884 }
885
toku_ft_get_descriptor(FT_HANDLE ft_handle)886 DESCRIPTOR toku_ft_get_descriptor(FT_HANDLE ft_handle) {
887 return &ft_handle->ft->descriptor;
888 }
889
toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle)890 DESCRIPTOR toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle) {
891 return &ft_handle->ft->cmp_descriptor;
892 }
893
toku_ft_update_stats(STAT64INFO headerstats,STAT64INFO_S delta)894 void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
895 (void) toku_sync_fetch_and_add(&(headerstats->numrows), delta.numrows);
896 (void) toku_sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes);
897 }
898
toku_ft_decrease_stats(STAT64INFO headerstats,STAT64INFO_S delta)899 void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
900 (void) toku_sync_fetch_and_sub(&(headerstats->numrows), delta.numrows);
901 (void) toku_sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes);
902 }
903
// Apply delta to the in-memory logical row count, unless the count is
// in its "unknown" sentinel state.
void toku_ft_adjust_logical_row_count(FT ft, int64_t delta) {
    // In order to make sure that the correct count is returned from
    // toku_ft_stat64, the ft->(in_memory|on_disk)_logical_rows _MUST_NOT_ be
    // modified from anywhere else from here with the exceptions of
    // serializing in a header, initializing a new header and analyzing
    // an index for a logical_row count.
    // The gist is that on an index upgrade, all logical_rows values
    // in the ft header are set to -1 until an analyze can reset it to an
    // accurate value. Until then, the physical count from in_memory_stats
    // must be returned in toku_ft_stat64.
    if (delta != 0 && ft->in_memory_logical_rows != (uint64_t)-1) {
        toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), delta);
        // If the adjustment happened to land exactly on the (uint64_t)-1
        // sentinel, nudge the count off it so it is not mistaken for the
        // "never analyzed" state. NOTE(review): the check-then-add pair is
        // not atomic as a whole; concurrent adjusters could race here.
        if (ft->in_memory_logical_rows == (uint64_t)-1) {
            toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), 1);
        }
    }
}
921
// Drop one reference on the ft via remove_ref(ft, extra). If that was
// the last reference, close the underlying header (evicting the ft from
// memory). oplsn/oplsn_valid are passed through to the eviction path;
// a valid oplsn indicates we are running during recovery.
void toku_ft_remove_reference(
    FT ft,
    bool oplsn_valid,
    LSN oplsn,
    remove_ft_ref_callback remove_ref,
    void *extra) {

    toku_ft_grab_reflock(ft);
    if (toku_ft_has_one_reference_unlocked(ft)) {
        // Possibly the last reference. We must take the open/close lock
        // before the reflock (lock ordering), so release the reflock,
        // acquire open/close, and re-acquire the reflock before deciding.
        toku_ft_release_reflock(ft);

        toku_ft_open_close_lock();
        toku_ft_grab_reflock(ft);

        remove_ref(ft, extra);
        bool needed = toku_ft_needed_unlocked(ft);
        toku_ft_release_reflock(ft);

        // if we're running during recovery, we must close the underlying ft.
        // we know we're running in recovery if we were passed a valid lsn.
        if (oplsn_valid) {
            assert(!needed);
        }
        if (!needed) {
            // close header
            toku_ft_evict_from_memory(ft, oplsn_valid, oplsn);
        }

        toku_ft_open_close_unlock();
    }
    else {
        // Not the last reference: just drop it under the reflock.
        remove_ref(ft, extra);
        toku_ft_release_reflock(ft);
    }
}
957
// Set the target node size (bytes) in the header and mark the header
// dirty so the change reaches disk at the next checkpoint.
void toku_ft_set_nodesize(FT ft, unsigned int nodesize) {
    toku_ft_lock(ft);
    ft->h->nodesize = nodesize;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}
964
// Read the header's target node size (bytes) under the ft lock.
void toku_ft_get_nodesize(FT ft, unsigned int *nodesize) {
    toku_ft_lock(ft);
    *nodesize = ft->h->nodesize;
    toku_ft_unlock(ft);
}
970
// Set the target basement node size (bytes) in the header and mark the
// header dirty so the change reaches disk at the next checkpoint.
void toku_ft_set_basementnodesize(FT ft, unsigned int basementnodesize) {
    toku_ft_lock(ft);
    ft->h->basementnodesize = basementnodesize;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}
977
// Read the header's target basement node size (bytes) under the ft lock.
void toku_ft_get_basementnodesize(FT ft, unsigned int *basementnodesize) {
    toku_ft_lock(ft);
    *basementnodesize = ft->h->basementnodesize;
    toku_ft_unlock(ft);
}
983
// Set the compression method in the header and mark the header dirty so
// the change reaches disk at the next checkpoint.
void toku_ft_set_compression_method(FT ft, enum toku_compression_method method) {
    toku_ft_lock(ft);
    ft->h->compression_method = method;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}
990
// Read the header's compression method under the ft lock.
void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp) {
    toku_ft_lock(ft);
    *methodp = ft->h->compression_method;
    toku_ft_unlock(ft);
}
996
// Set the tree fanout in the header and mark the header dirty so the
// change reaches disk at the next checkpoint.
void toku_ft_set_fanout(FT ft, unsigned int fanout) {
    toku_ft_lock(ft);
    ft->h->fanout = fanout;
    ft->h->set_dirty();
    toku_ft_unlock(ft);
}
1003
// Read the header's tree fanout under the ft lock.
void toku_ft_get_fanout(FT ft, unsigned int *fanout) {
    toku_ft_lock(ft);
    *fanout = ft->h->fanout;
    toku_ft_unlock(ft);
}
1009
// Mark the ft as a blackhole: any message injections will be a no-op.
// There is no way to clear the flag once set.
void toku_ft_set_blackhole(FT_HANDLE ft_handle) {
    ft_handle->ft->blackhole = true;
}
1014
// Accumulator threaded through the block-table iteration that computes
// garbage statistics (see toku_ft_get_garbage).
struct garbage_helper_extra {
    FT ft;                // tree being scanned
    size_t total_space;   // bytes accounted to leaf entries (incl. keys)
    size_t used_space;    // bytes still live (latest, non-deleted values)
};
1020
1021 static int
garbage_leafentry_helper(const void * key UU (),const uint32_t keylen,const LEAFENTRY & le,uint32_t UU (idx),struct garbage_helper_extra * const info)1022 garbage_leafentry_helper(const void* key UU(), const uint32_t keylen, const LEAFENTRY & le, uint32_t UU(idx), struct garbage_helper_extra * const info) {
1023 //TODO #warning need to reanalyze for split
1024 info->total_space += leafentry_disksize(le) + keylen + sizeof(keylen);
1025 if (!le_latest_is_del(le)) {
1026 info->used_space += LE_CLEAN_MEMSIZE(le_latest_vallen(le)) + keylen + sizeof(keylen);
1027 }
1028 return 0;
1029 }
1030
1031 static int
garbage_helper(BLOCKNUM blocknum,int64_t UU (size),int64_t UU (address),void * extra)1032 garbage_helper(BLOCKNUM blocknum, int64_t UU(size), int64_t UU(address), void *extra) {
1033 struct garbage_helper_extra *CAST_FROM_VOIDP(info, extra);
1034 FTNODE node;
1035 FTNODE_DISK_DATA ndd;
1036 ftnode_fetch_extra bfe;
1037 bfe.create_for_full_read(info->ft);
1038 int fd = toku_cachefile_get_fd(info->ft->cf);
1039 int r = toku_deserialize_ftnode_from(fd, blocknum, 0, &node, &ndd, &bfe);
1040 if (r != 0) {
1041 goto no_node;
1042 }
1043 if (node->height > 0) {
1044 goto exit;
1045 }
1046 for (int i = 0; i < node->n_children; ++i) {
1047 bn_data* bd = BLB_DATA(node, i);
1048 r = bd->iterate<struct garbage_helper_extra, garbage_leafentry_helper>(info);
1049 if (r != 0) {
1050 goto exit;
1051 }
1052 }
1053 {
1054 float a = info->used_space, b=info->total_space;
1055 float percentage = (1 - (a / b)) * 100;
1056 printf("LeafNode# %d has %d BasementNodes and %2.1f%% of the allocated space is garbage\n", (int)blocknum.b, node->n_children, percentage);
1057 }
1058 exit:
1059 toku_ftnode_free(&node);
1060 toku_free(ndd);
1061 no_node:
1062 return r;
1063 }
1064
// Report total vs. used leaf space for the ft, by scanning every leaf
// block reachable from the checkpointed translation table.
void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space) {
    // Effect: Iterates the FT's blocktable and calculates the total and used space for leaf blocks.
    // Note: It is ok to call this function concurrently with reads/writes to the table since
    // the blocktable lock is held, which means no new allocations or file writes can occur.
    invariant_notnull(total_space);
    invariant_notnull(used_space);
    struct garbage_helper_extra info = {
        .ft = ft,
        .total_space = 0,
        .used_space = 0
    };
    ft->blocktable.iterate(block_table::TRANSLATION_CHECKPOINTED, garbage_helper, &info, true, true);
    *total_space = info.total_space;
    *used_space = info.used_space;
}
1080
1081
// The build system must supply the source revision; fail loudly if not.
#if !defined(TOKUDB_REVISION)
#error
#endif

// Stringize helpers: xstr expands its argument before str stringizes it.
#define xstr(X) str(X)
#define str(X) #X
// "MAJOR.MINOR.PATCH build REVISION", assembled at compile time.
#define static_version_string xstr(DB_VERSION_MAJOR) "." \
                              xstr(DB_VERSION_MINOR) "." \
                              xstr(DB_VERSION_PATCH) " build " \
                              xstr(TOKUDB_REVISION)
// Derived product-name strings, rebuilt by tokuft_update_product_name_strings.
struct toku_product_name_strings_struct toku_product_name_strings;

// Base product name; all derived file/lock names are formatted from it.
char toku_product_name[TOKU_MAX_PRODUCT_NAME_LENGTH];
tokuft_update_product_name_strings(void)1095 void tokuft_update_product_name_strings(void) {
1096 // DO ALL STRINGS HERE.. maybe have a separate FT layer version as well
1097 {
1098 int n = snprintf(toku_product_name_strings.db_version,
1099 sizeof(toku_product_name_strings.db_version),
1100 "%s %s", toku_product_name, static_version_string);
1101 assert(n >= 0);
1102 assert((unsigned)n < sizeof(toku_product_name_strings.db_version));
1103 }
1104 {
1105 int n = snprintf(toku_product_name_strings.fileopsdirectory,
1106 sizeof(toku_product_name_strings.fileopsdirectory),
1107 "%s.directory", toku_product_name);
1108 assert(n >= 0);
1109 assert((unsigned)n < sizeof(toku_product_name_strings.fileopsdirectory));
1110 }
1111 {
1112 int n = snprintf(toku_product_name_strings.environmentdictionary,
1113 sizeof(toku_product_name_strings.environmentdictionary),
1114 "%s.environment", toku_product_name);
1115 assert(n >= 0);
1116 assert((unsigned)n < sizeof(toku_product_name_strings.environmentdictionary));
1117 }
1118 {
1119 int n = snprintf(toku_product_name_strings.rollback_cachefile,
1120 sizeof(toku_product_name_strings.rollback_cachefile),
1121 "%s.rollback", toku_product_name);
1122 assert(n >= 0);
1123 assert((unsigned)n < sizeof(toku_product_name_strings.rollback_cachefile));
1124 }
1125 {
1126 int n = snprintf(toku_product_name_strings.single_process_lock,
1127 sizeof(toku_product_name_strings.single_process_lock),
1128 "__%s_lock_dont_delete_me", toku_product_name);
1129 assert(n >= 0);
1130 assert((unsigned)n < sizeof(toku_product_name_strings.single_process_lock));
1131 }
1132 }
1133 #undef xstr
1134 #undef str
1135
1136 int
toku_single_process_lock(const char * lock_dir,const char * which,int * lockfd)1137 toku_single_process_lock(const char *lock_dir, const char *which, int *lockfd) {
1138 if (!lock_dir)
1139 return ENOENT;
1140 int namelen=strlen(lock_dir)+strlen(which);
1141 char lockfname[namelen+sizeof("/_") + strlen(toku_product_name_strings.single_process_lock)];
1142
1143 int l = snprintf(lockfname, sizeof(lockfname), "%s/%s_%s",
1144 lock_dir, toku_product_name_strings.single_process_lock, which);
1145 assert(l+1 == (signed)(sizeof(lockfname)));
1146 *lockfd = toku_os_lock_file(lockfname);
1147 if (*lockfd < 0) {
1148 int e = get_error_errno();
1149 fprintf(stderr, "Couldn't start tokuft because some other tokuft process is using the same directory [%s] for [%s]\n", lock_dir, which);
1150 return e;
1151 }
1152 return 0;
1153 }
1154
1155 int
toku_single_process_unlock(int * lockfd)1156 toku_single_process_unlock(int *lockfd) {
1157 int fd = *lockfd;
1158 *lockfd = -1;
1159 if (fd>=0) {
1160 int r = toku_os_unlock_file(fd);
1161 if (r != 0)
1162 return get_error_errno();
1163 }
1164 return 0;
1165 }
1166
1167 int tokuft_num_envs = 0;
1168 int
db_env_set_toku_product_name(const char * name)1169 db_env_set_toku_product_name(const char *name) {
1170 if (tokuft_num_envs > 0) {
1171 return EINVAL;
1172 }
1173 if (!name || strlen(name) < 1) {
1174 return EINVAL;
1175 }
1176 if (strlen(name) >= sizeof(toku_product_name)) {
1177 return ENAMETOOLONG;
1178 }
1179 if (strncmp(toku_product_name, name, sizeof(toku_product_name))) {
1180 strcpy(toku_product_name, name);
1181 tokuft_update_product_name_strings();
1182 }
1183 return 0;
1184 }
1185
1186