1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: 3 #ident "$Id$" 4 /*====== 5 This file is part of PerconaFT. 6 7 8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. 9 10 PerconaFT is free software: you can redistribute it and/or modify 11 it under the terms of the GNU General Public License, version 2, 12 as published by the Free Software Foundation. 13 14 PerconaFT is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. 21 22 ---------------------------------------- 23 24 PerconaFT is free software: you can redistribute it and/or modify 25 it under the terms of the GNU Affero General Public License, version 3, 26 as published by the Free Software Foundation. 27 28 PerconaFT is distributed in the hope that it will be useful, 29 but WITHOUT ANY WARRANTY; without even the implied warranty of 30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 31 GNU Affero General Public License for more details. 32 33 You should have received a copy of the GNU Affero General Public License 34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. 35 ======= */ 36 37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." 38 39 #include <my_global.h> 40 #include <memory.h> 41 #include <ctype.h> 42 #include <limits.h> 43 #include <unistd.h> 44 45 #include "ft/serialize/block_table.h" 46 #include "ft/ft.h" 47 #include "ft/logger/log-internal.h" 48 #include "ft/txn/txn_manager.h" 49 #include "ft/txn/rollback_log_node_cache.h" 50 51 #include "util/status.h" 52 53 int writing_rollback = 0; 54 extern "C" { 55 uint force_recovery = 0; 56 } 57 58 static const int log_format_version = TOKU_LOG_VERSION; 59 60 toku_instr_key *result_output_condition_lock_mutex_key; 61 toku_instr_key *result_output_condition_key; 62 toku_instr_key *tokudb_file_log_key; 63 64 static int open_logfile(TOKULOGGER logger); 65 static void logger_write_buffer(TOKULOGGER logger, LSN *fsynced_lsn); 66 static void delete_logfile(TOKULOGGER logger, 67 long long index, 68 uint32_t version); 69 static void grab_output(TOKULOGGER logger, LSN *fsynced_lsn); 70 static void release_output(TOKULOGGER logger, LSN fsynced_lsn); 71 72 static void toku_print_bytes (FILE *outf, uint32_t len, char *data) { 73 fprintf(outf, "\""); 74 uint32_t i; 75 for (i=0; i<len; i++) { 76 switch (data[i]) { 77 case '"': fprintf(outf, "\\\""); break; 78 case '\\': fprintf(outf, "\\\\"); break; 79 case '\n': fprintf(outf, "\\n"); break; 80 default: 81 if (isprint(data[i])) fprintf(outf, "%c", data[i]); 82 else fprintf(outf, "\\%03o", (unsigned char)(data[i])); 83 } 84 } 85 fprintf(outf, "\""); 86 } 87 88 static bool is_a_logfile_any_version (const char *name, uint64_t *number_result, uint32_t *version_of_log) { 89 bool rval = true; 90 uint64_t result; 91 int n; 92 int r; 93 uint32_t version; 94 r = sscanf(name, "log%" SCNu64 ".tokulog%" SCNu32 "%n", &result, &version, &n); 95 if (r!=2 || name[n]!='\0' || version <= TOKU_LOG_VERSION_1) { 96 //Version 1 does NOT append 'version' to end of '.tokulog' 97 version = TOKU_LOG_VERSION_1; 98 r = sscanf(name, "log%" SCNu64 ".tokulog%n", &result, &n); 99 if (r!=1 || name[n]!='\0') { 100 rval = false; 101 } 102 } 103 if (rval) { 104 *number_result = result; 105 *version_of_log = version; 106 } 107 108 return rval; 109 } 110 111 // added for #2424, improved for #2521 112 static bool is_a_logfile (const char *name, long long *number_result) { 113 bool rval; 114 uint64_t result; 115 uint32_t version; 116 rval = is_a_logfile_any_version(name, &result, &version); 117 if (rval && version != TOKU_LOG_VERSION) 118 rval = false; 119 if (rval) 120 *number_result = result; 121 return rval; 122 } 123 124 125 // TODO: can't fail 126 int toku_logger_create (TOKULOGGER *resultp) { 127 TOKULOGGER CALLOC(result); 128 if (result==0) return get_error_errno(); 129 result->is_open=false; 130 result->write_log_files = true; 131 result->trim_log_files = true; 132 result->directory=0; 133 // fd is uninitialized on purpose 134 // ct is uninitialized on purpose 135 result->lg_max = 100<<20; // 100MB default 136 // lsn is uninitialized 137 result->inbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN}; 138 result->outbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN}; 139 // written_lsn is uninitialized 140 // fsynced_lsn is uninitialized 141 result->last_completed_checkpoint_lsn = ZERO_LSN; 142 // next_log_file_number is uninitialized 143 // n_in_file is uninitialized 144 result->write_block_size = FT_DEFAULT_NODE_SIZE; // default logging size is the same as the default ft block size 145 toku_logfilemgr_create(&result->logfilemgr); 146 *resultp = result; 147 ml_init(&result->input_lock); 148 toku_mutex_init(*result_output_condition_lock_mutex_key, 149 &result->output_condition_lock, 150 nullptr); 151 toku_cond_init( 152 *result_output_condition_key, &result->output_condition, nullptr); 153 result->rollback_cachefile = NULL; 154 result->output_is_available = true; 155 toku_txn_manager_init(&result->txn_manager); 156 return 0; 157 } 158 159 static void fsync_logdir(TOKULOGGER logger) { 160 toku_fsync_dirfd_without_accounting(logger->dir); 161 } 162 163 static int open_logdir(TOKULOGGER logger, const char *directory) { 164 if (toku_os_is_absolute_name(directory)) { 165 logger->directory = toku_strdup(directory); 166 } else { 167 char cwdbuf[PATH_MAX]; 168 char *cwd = getcwd(cwdbuf, PATH_MAX); 169 if (cwd == NULL) 170 return -1; 171 char *MALLOC_N(strlen(cwd) + strlen(directory) + 2, new_log_dir); 172 if (new_log_dir == NULL) { 173 return -2; 174 } 175 sprintf(new_log_dir, "%s/%s", cwd, directory); 176 logger->directory = new_log_dir; 177 } 178 if (logger->directory==0) return get_error_errno(); 179 180 logger->dir = opendir(logger->directory); 181 if ( logger->dir == NULL ) return -1; 182 return 0; 183 } 184 185 static int close_logdir(TOKULOGGER logger) { 186 return closedir(logger->dir); 187 } 188 189 int 190 toku_logger_open_with_last_xid(const char *directory, TOKULOGGER logger, TXNID last_xid) { 191 if (logger->is_open) return EINVAL; 192 193 int r; 194 TXNID last_xid_if_clean_shutdown = TXNID_NONE; 195 r = toku_logfilemgr_init(logger->logfilemgr, directory, &last_xid_if_clean_shutdown); 196 if ( r!=0 ) 197 return r; 198 logger->lsn = toku_logfilemgr_get_last_lsn(logger->logfilemgr); 199 logger->written_lsn = logger->lsn; 200 logger->fsynced_lsn = logger->lsn; 201 logger->inbuf.max_lsn_in_buf = logger->lsn; 202 logger->outbuf.max_lsn_in_buf = logger->lsn; 203 204 // open directory, save pointer for fsyncing t:2445 205 r = open_logdir(logger, directory); 206 if (r!=0) return r; 207 208 long long nexti; 209 r = toku_logger_find_next_unused_log_file(logger->directory, &nexti); 210 if (r!=0) return r; 211 212 logger->next_log_file_number = nexti; 213 r = open_logfile(logger); 214 if (r!=0) return r; 215 if (last_xid == TXNID_NONE) { 216 last_xid = last_xid_if_clean_shutdown; 217 } 218 toku_txn_manager_set_last_xid_from_logger(logger->txn_manager, last_xid); 219 220 logger->is_open = true; 221 return 0; 222 } 223 224 int toku_logger_open (const char *directory, TOKULOGGER logger) { 225 return toku_logger_open_with_last_xid(directory, logger, TXNID_NONE); 226 } 227 228 bool toku_logger_rollback_is_open (TOKULOGGER logger) { 229 return logger->rollback_cachefile != NULL; 230 } 231 232 #define MAX_CACHED_ROLLBACK_NODES 4096 233 234 void toku_logger_initialize_rollback_cache(TOKULOGGER logger, FT ft) { 235 ft->blocktable.free_unused_blocknums(ft->h->root_blocknum); 236 logger->rollback_cache.init(MAX_CACHED_ROLLBACK_NODES); 237 } 238 239 int toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, bool create) { 240 writing_rollback++; 241 assert(logger->is_open); 242 assert(!logger->rollback_cachefile); 243 244 FT_HANDLE ft_handle = nullptr; // Note, there is no DB associated with this FT. 245 toku_ft_handle_create(&ft_handle); 246 int r = toku_ft_handle_open(ft_handle, toku_product_name_strings.rollback_cachefile, create, create, cachetable, nullptr); 247 if (r == 0) { 248 FT ft = ft_handle->ft; 249 logger->rollback_cachefile = ft->cf; 250 toku_logger_initialize_rollback_cache(logger, ft_handle->ft); 251 252 // Verify it is empty 253 // Must have no data blocks (rollback logs or otherwise). 254 ft->blocktable.verify_no_data_blocks_except_root(ft->h->root_blocknum); 255 bool is_empty = toku_ft_is_empty_fast(ft_handle); 256 assert(is_empty); 257 } else { 258 toku_ft_handle_close(ft_handle); 259 } 260 writing_rollback--; 261 return r; 262 } 263 264 265 // Requires: Rollback cachefile can only be closed immediately after a checkpoint, 266 // so it will always be clean (!h->dirty) when about to be closed. 267 // Rollback log can only be closed when there are no open transactions, 268 // so it will always be empty (no data blocks) when about to be closed. 269 void toku_logger_close_rollback_check_empty(TOKULOGGER logger, bool clean_shutdown) { 270 CACHEFILE cf = logger->rollback_cachefile; // stored in logger at rollback cachefile open 271 if (cf) { 272 FT_HANDLE ft_to_close; 273 { //Find "ft_to_close" 274 logger->rollback_cache.destroy(); 275 FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf)); 276 if (clean_shutdown) { 277 //Verify it is safe to close it. 278 assert(!ft->h->dirty()); //Must not be dirty. 279 ft->blocktable.free_unused_blocknums(ft->h->root_blocknum); 280 // Must have no data blocks (rollback logs or otherwise). 281 ft->blocktable.verify_no_data_blocks_except_root(ft->h->root_blocknum); 282 assert(!ft->h->dirty()); 283 } else { 284 ft->h->clear_dirty(); 285 } 286 ft_to_close = toku_ft_get_only_existing_ft_handle(ft); 287 if (clean_shutdown) { 288 bool is_empty; 289 is_empty = toku_ft_is_empty_fast(ft_to_close); 290 assert(is_empty); 291 assert(!ft->h->dirty()); // it should not have been dirtied by the toku_ft_is_empty test. 292 } 293 } 294 295 toku_ft_handle_close(ft_to_close); 296 //Set as dealt with already. 297 logger->rollback_cachefile = NULL; 298 } 299 } 300 301 void toku_logger_close_rollback(TOKULOGGER logger) { 302 toku_logger_close_rollback_check_empty(logger, true); 303 } 304 305 // No locks held on entry 306 // No locks held on exit. 307 // No locks are needed, since you cannot legally close the log concurrently with doing anything else. 308 // TODO: can't fail 309 int toku_logger_close(TOKULOGGER *loggerp) { 310 int r; 311 TOKULOGGER logger = *loggerp; 312 if (!logger->is_open) { 313 goto is_closed; 314 } 315 ml_lock(&logger->input_lock); 316 LSN fsynced_lsn; 317 grab_output(logger, &fsynced_lsn); 318 logger_write_buffer(logger, &fsynced_lsn); 319 if (logger->fd!=-1) { 320 if (logger->write_log_files) { 321 toku_file_fsync_without_accounting(logger->fd); 322 } 323 r = toku_os_close(logger->fd); 324 assert(r == 0); 325 } 326 r = close_logdir(logger); 327 assert(r == 0); 328 logger->fd=-1; 329 release_output(logger, fsynced_lsn); 330 331 is_closed: 332 toku_free(logger->inbuf.buf); 333 toku_free(logger->outbuf.buf); 334 // before destroying locks they must be left in the unlocked state. 335 ml_destroy(&logger->input_lock); 336 toku_mutex_destroy(&logger->output_condition_lock); 337 toku_cond_destroy(&logger->output_condition); 338 toku_txn_manager_destroy(logger->txn_manager); 339 if (logger->directory) toku_free(logger->directory); 340 toku_logfilemgr_destroy(&logger->logfilemgr); 341 toku_free(logger); 342 *loggerp=0; 343 return 0; 344 } 345 346 void toku_logger_shutdown(TOKULOGGER logger) { 347 if (logger->is_open) { 348 TXN_MANAGER mgr = logger->txn_manager; 349 if (toku_txn_manager_num_live_root_txns(mgr) == 0) { 350 TXNID last_xid = toku_txn_manager_get_last_xid(mgr); 351 toku_log_shutdown(logger, NULL, true, 0, last_xid); 352 } 353 } 354 } 355 356 static int close_and_open_logfile (TOKULOGGER logger, LSN *fsynced_lsn) 357 // Effect: close the current file, and open the next one. 358 // Entry: This thread has permission to modify the output. 359 // Exit: This thread has permission to modify the output. 360 { 361 int r; 362 if (logger->write_log_files) { 363 toku_file_fsync_without_accounting(logger->fd); 364 *fsynced_lsn = logger->written_lsn; 365 toku_logfilemgr_update_last_lsn(logger->logfilemgr, 366 logger->written_lsn); // fixes t:2294 367 } 368 r = toku_os_close(logger->fd); 369 370 if (r != 0) 371 return get_error_errno(); 372 return open_logfile(logger); 373 } 374 375 static int 376 max_int (int a, int b) 377 { 378 if (a>b) return a; 379 return b; 380 } 381 382 // *********************************************************** 383 // output mutex/condition manipulation routines 384 // *********************************************************** 385 386 static void 387 wait_till_output_available (TOKULOGGER logger) 388 // Effect: Wait until output becomes available. 389 // Implementation hint: Use a pthread_cond_wait. 390 // Entry: Holds the output_condition_lock (but not the inlock) 391 // Exit: Holds the output_condition_lock and logger->output_is_available 392 // 393 { 394 tokutime_t t0 = toku_time_now(); 395 while (!logger->output_is_available) { 396 toku_cond_wait(&logger->output_condition, &logger->output_condition_lock); 397 } 398 if (tokutime_to_seconds(toku_time_now() - t0) >= 0.100) { 399 logger->num_wait_buf_long++; 400 } 401 } 402 403 static void 404 grab_output(TOKULOGGER logger, LSN *fsynced_lsn) 405 // Effect: Wait until output becomes available and get permission to modify output. 406 // Entry: Holds no lock (including not holding the input lock, since we never hold both at once). 407 // Exit: Hold permission to modify output (but none of the locks). 408 { 409 toku_mutex_lock(&logger->output_condition_lock); 410 wait_till_output_available(logger); 411 logger->output_is_available = false; 412 if (fsynced_lsn) { 413 *fsynced_lsn = logger->fsynced_lsn; 414 } 415 toku_mutex_unlock(&logger->output_condition_lock); 416 } 417 418 static bool 419 wait_till_output_already_written_or_output_buffer_available (TOKULOGGER logger, LSN lsn, LSN *fsynced_lsn) 420 // Effect: Wait until either the output is available or the lsn has been written. 421 // Return true iff the lsn has been written. 422 // If returning true, then on exit we don't hold output permission. 423 // If returning false, then on exit we do hold output permission. 424 // Entry: Hold no locks. 425 // Exit: Hold the output permission if returns false. 426 { 427 bool result; 428 toku_mutex_lock(&logger->output_condition_lock); 429 while (1) { 430 if (logger->fsynced_lsn.lsn >= lsn.lsn) { // we can look at the fsynced lsn since we have the lock. 431 result = true; 432 break; 433 } 434 if (logger->output_is_available) { 435 logger->output_is_available = false; 436 result = false; 437 break; 438 } 439 // otherwise wait for a good time to look again. 440 toku_cond_wait(&logger->output_condition, &logger->output_condition_lock); 441 } 442 *fsynced_lsn = logger->fsynced_lsn; 443 toku_mutex_unlock(&logger->output_condition_lock); 444 return result; 445 } 446 447 static void 448 release_output (TOKULOGGER logger, LSN fsynced_lsn) 449 // Effect: Release output permission. 450 // Entry: Holds output permissions, but no locks. 451 // Exit: Holds neither locks nor output permission. 452 { 453 toku_mutex_lock(&logger->output_condition_lock); 454 logger->output_is_available = true; 455 if (logger->fsynced_lsn.lsn < fsynced_lsn.lsn) { 456 logger->fsynced_lsn = fsynced_lsn; 457 } 458 toku_cond_broadcast(&logger->output_condition); 459 toku_mutex_unlock(&logger->output_condition_lock); 460 } 461 462 static void 463 swap_inbuf_outbuf (TOKULOGGER logger) 464 // Effect: Swap the inbuf and outbuf 465 // Entry and exit: Hold the input lock and permission to modify output. 466 { 467 struct logbuf tmp = logger->inbuf; 468 logger->inbuf = logger->outbuf; 469 logger->outbuf = tmp; 470 assert(logger->inbuf.n_in_buf == 0); 471 } 472 473 static void 474 write_outbuf_to_logfile (TOKULOGGER logger, LSN *fsynced_lsn) 475 // Effect: Write the contents of outbuf to logfile. Don't necessarily fsync (but it might, in which case fynced_lsn is updated). 476 // If the logfile gets too big, open the next one (that's the case where an fsync might happen). 477 // Entry and exit: Holds permission to modify output (and doesn't let it go, so it's ok to also hold the inlock). 478 { 479 if (logger->outbuf.n_in_buf>0) { 480 // Write the outbuf to disk, take accounting measurements 481 tokutime_t io_t0 = toku_time_now(); 482 toku_os_full_write(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf); 483 tokutime_t io_t1 = toku_time_now(); 484 logger->num_writes_to_disk++; 485 logger->bytes_written_to_disk += logger->outbuf.n_in_buf; 486 logger->time_spent_writing_to_disk += (io_t1 - io_t0); 487 488 assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written. 489 logger->written_lsn = logger->outbuf.max_lsn_in_buf; 490 logger->n_in_file += logger->outbuf.n_in_buf; 491 logger->outbuf.n_in_buf = 0; 492 } 493 // If the file got too big, then open a new file. 494 if (logger->n_in_file > logger->lg_max) { 495 int r = close_and_open_logfile(logger, fsynced_lsn); 496 assert_zero(r); 497 } 498 } 499 500 void 501 toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed) 502 // Entry: Holds the inlock 503 // Exit: Holds the inlock 504 // Effect: Upon exit, the inlock is held and there are at least n_bytes_needed in the buffer. 505 // May release the inlock (and then reacquire it), so this is not atomic. 506 // May obtain the output lock and output permission (but if it does so, it will have released the inlock, since we don't hold both locks at once). 507 // (But may hold output permission and inlock at the same time.) 508 // Implementation hint: Makes space in the inbuf, possibly by writing the inbuf to disk or increasing the size of the inbuf. There might not be an fsync. 509 // Arguments: logger: the logger (side effects) 510 // n_bytes_needed: how many bytes to make space for. 511 { 512 if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) { 513 return; 514 } 515 ml_unlock(&logger->input_lock); 516 LSN fsynced_lsn; 517 grab_output(logger, &fsynced_lsn); 518 519 ml_lock(&logger->input_lock); 520 // Some other thread may have written the log out while we didn't have the lock. If we have space now, then be happy. 521 if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) { 522 release_output(logger, fsynced_lsn); 523 return; 524 } 525 if (logger->inbuf.n_in_buf > 0) { 526 // There isn't enough space, and there is something in the buffer, so write the inbuf. 527 swap_inbuf_outbuf(logger); 528 529 // Don't release the inlock in this case, because we don't want to get starved. 530 write_outbuf_to_logfile(logger, &fsynced_lsn); 531 } 532 // the inbuf is empty. Make it big enough (just in case it is somehow smaller than a single log entry). 533 if (n_bytes_needed > logger->inbuf.buf_size) { 534 assert(n_bytes_needed < (1<<30)); // it seems unlikely to work if a logentry gets that big. 535 int new_size = max_int(logger->inbuf.buf_size * 2, n_bytes_needed); // make it at least twice as big, and big enough for n_bytes 536 assert(new_size < (1<<30)); 537 XREALLOC_N(new_size, logger->inbuf.buf); 538 logger->inbuf.buf_size = new_size; 539 } 540 release_output(logger, fsynced_lsn); 541 } 542 543 void toku_logger_fsync(TOKULOGGER logger) 544 // Effect: This is the exported fsync used by ydb.c for env_log_flush. Group commit doesn't have to work. 545 // Entry: Holds no locks 546 // Exit: Holds no locks 547 // Implementation note: Acquire the output condition lock, then the output permission, then release the output condition lock, then get the input lock. 548 // Then release everything. Hold the input lock while reading the current max lsn in buf to make drd happy that there is no data race. 549 { 550 ml_lock(&logger->input_lock); 551 const LSN max_lsn_in_buf = logger->inbuf.max_lsn_in_buf; 552 ml_unlock(&logger->input_lock); 553 554 toku_logger_maybe_fsync(logger, max_lsn_in_buf, true, false); 555 } 556 557 void toku_logger_fsync_if_lsn_not_fsynced (TOKULOGGER logger, LSN lsn) { 558 if (logger->write_log_files) { 559 toku_logger_maybe_fsync(logger, lsn, true, false); 560 } 561 } 562 563 int toku_logger_is_open(TOKULOGGER logger) { 564 if (logger==0) return 0; 565 return logger->is_open; 566 } 567 568 void toku_logger_set_cachetable (TOKULOGGER logger, CACHETABLE ct) { 569 logger->ct = ct; 570 } 571 572 int toku_logger_set_lg_max(TOKULOGGER logger, uint32_t lg_max) { 573 if (logger==0) return EINVAL; // no logger 574 if (logger->is_open) return EINVAL; 575 if (lg_max>(1<<30)) return EINVAL; // too big 576 logger->lg_max = lg_max; 577 return 0; 578 } 579 int toku_logger_get_lg_max(TOKULOGGER logger, uint32_t *lg_maxp) { 580 if (logger==0) return EINVAL; // no logger 581 *lg_maxp = logger->lg_max; 582 return 0; 583 } 584 585 int toku_logger_set_lg_bsize(TOKULOGGER logger, uint32_t bsize) { 586 if (logger==0) return EINVAL; // no logger 587 if (logger->is_open) return EINVAL; 588 if (bsize<=0 || bsize>(1<<30)) return EINVAL; 589 logger->write_block_size = bsize; 590 return 0; 591 } 592 593 int toku_logger_find_next_unused_log_file(const char *directory, long long *result) 594 // This is called during logger initialalization, and no locks are required. 595 { 596 DIR *d=opendir(directory); 597 long long maxf=-1; *result = maxf; 598 struct dirent *de; 599 if (d==0) return get_error_errno(); 600 while ((de=readdir(d))) { 601 if (de==0) return get_error_errno(); 602 long long thisl = -1; 603 if ( is_a_logfile(de->d_name, &thisl) ) { 604 if ((long long)thisl > maxf) maxf = thisl; 605 } 606 } 607 *result=maxf+1; 608 int r = closedir(d); 609 return r; 610 } 611 612 // TODO: Put this in portability layer when ready 613 // in: file pathname that may have a dirname prefix 614 // return: file leaf name 615 static char * fileleafname(char *pathname) { 616 const char delimiter = '/'; 617 char *leafname = strrchr(pathname, delimiter); 618 if (leafname) 619 leafname++; 620 else 621 leafname = pathname; 622 return leafname; 623 } 624 625 static int logfilenamecompare (const void *ap, const void *bp) { 626 char *a=*(char**)ap; 627 char *a_leafname = fileleafname(a); 628 char *b=*(char**)bp; 629 char * b_leafname = fileleafname(b); 630 int rval; 631 bool valid; 632 uint64_t num_a = 0; // placate compiler 633 uint64_t num_b = 0; 634 uint32_t ver_a = 0; 635 uint32_t ver_b = 0; 636 valid = is_a_logfile_any_version(a_leafname, &num_a, &ver_a); 637 invariant(valid); 638 valid = is_a_logfile_any_version(b_leafname, &num_b, &ver_b); 639 invariant(valid); 640 if (ver_a < ver_b) rval = -1; 641 else if (ver_a > ver_b) rval = +1; 642 else if (num_a < num_b) rval = -1; 643 else if (num_a > num_b) rval = +1; 644 else rval = 0; 645 return rval; 646 } 647 648 // Return the log files in sorted order 649 // Return a null_terminated array of strings, and also return the number of strings in the array. 650 // Requires: Race conditions must be dealt with by caller. Either call during initialization or grab the output permission. 651 int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_logfiles) 652 { 653 int result_limit=2; 654 int n_results=0; 655 char **MALLOC_N(result_limit, result); 656 assert(result!= NULL); 657 struct dirent *de; 658 DIR *d=opendir(directory); 659 if (d==0) { 660 int er = get_error_errno(); 661 toku_free(result); 662 return er; 663 } 664 int dirnamelen = strlen(directory); 665 while ((de=readdir(d))) { 666 uint64_t thisl; 667 uint32_t version_ignore; 668 if ( !(is_a_logfile_any_version(de->d_name, &thisl, &version_ignore)) ) continue; //#2424: Skip over files that don't match the exact logfile template 669 if (n_results+1>=result_limit) { 670 result_limit*=2; 671 XREALLOC_N(result_limit, result); 672 } 673 int fnamelen = dirnamelen + strlen(de->d_name) + 2; // One for the slash and one for the trailing NUL. 674 char *XMALLOC_N(fnamelen, fname); 675 snprintf(fname, fnamelen, "%s/%s", directory, de->d_name); 676 result[n_results++] = fname; 677 } 678 // Return them in increasing order. 679 qsort(result, n_results, sizeof(result[0]), logfilenamecompare); 680 *resultp = result; 681 *n_logfiles = n_results; 682 result[n_results]=0; // make a trailing null 683 return d ? closedir(d) : 0; 684 } 685 686 void toku_logger_free_logfiles(char **logfiles, int n_logfiles) { 687 for (int i = 0; i < n_logfiles; i++) 688 toku_free(logfiles[i]); 689 toku_free(logfiles); 690 } 691 692 static int open_logfile (TOKULOGGER logger) 693 // Entry and Exit: This thread has permission to modify the output. 694 { 695 int fnamelen = strlen(logger->directory)+50; 696 char fname[fnamelen]; 697 snprintf(fname, 698 fnamelen, 699 "%s/log%012lld.tokulog%d", 700 logger->directory, 701 logger->next_log_file_number, 702 TOKU_LOG_VERSION); 703 long long index = logger->next_log_file_number; 704 if (logger->write_log_files) { 705 logger->fd = 706 toku_os_open(fname, 707 O_CREAT + O_WRONLY + O_TRUNC + O_EXCL + O_BINARY, 708 S_IRUSR + S_IWUSR, 709 *tokudb_file_log_key); 710 if (logger->fd == -1) { 711 return get_error_errno(); 712 } 713 fsync_logdir(logger); 714 logger->next_log_file_number++; 715 } else { 716 logger->fd = toku_os_open( 717 DEV_NULL_FILE, O_WRONLY + O_BINARY, S_IWUSR, *tokudb_file_log_key); 718 if (logger->fd == -1) { 719 return get_error_errno(); 720 } 721 } 722 toku_os_full_write(logger->fd, "tokulogg", 8); 723 int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order 724 toku_os_full_write(logger->fd, &version_l, 4); 725 if ( logger->write_log_files ) { 726 TOKULOGFILEINFO XMALLOC(lf_info); 727 lf_info->index = index; 728 lf_info->maxlsn = logger->written_lsn; 729 lf_info->version = TOKU_LOG_VERSION; 730 toku_logfilemgr_add_logfile_info(logger->logfilemgr, lf_info); 731 } 732 logger->fsynced_lsn = logger->written_lsn; 733 logger->n_in_file = 12; 734 return 0; 735 } 736 737 static void delete_logfile(TOKULOGGER logger, long long index, uint32_t version) 738 // Entry and Exit: This thread has permission to modify the output. 739 { 740 int fnamelen = strlen(logger->directory)+50; 741 char fname[fnamelen]; 742 snprintf(fname, fnamelen, "%s/log%012lld.tokulog%d", logger->directory, index, version); 743 int r = remove(fname); 744 invariant_zero(r); 745 } 746 747 void toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn) 748 // On entry and exit: No logger locks held. 749 // Acquires and releases output permission. 750 { 751 LSN fsynced_lsn; 752 grab_output(logger, &fsynced_lsn); 753 TOKULOGFILEMGR lfm = logger->logfilemgr; 754 int n_logfiles = toku_logfilemgr_num_logfiles(lfm); 755 756 TOKULOGFILEINFO lf_info = NULL; 757 758 if ( logger->write_log_files && logger->trim_log_files) { 759 while ( n_logfiles > 1 ) { // don't delete current logfile 760 uint32_t log_version; 761 lf_info = toku_logfilemgr_get_oldest_logfile_info(lfm); 762 log_version = lf_info->version; 763 if ( lf_info->maxlsn.lsn >= trim_lsn.lsn ) { 764 // file contains an open LSN, can't delete this or any newer log files 765 break; 766 } 767 // need to save copy - toku_logfilemgr_delete_oldest_logfile_info free's the lf_info 768 long index = lf_info->index; 769 toku_logfilemgr_delete_oldest_logfile_info(lfm); 770 n_logfiles--; 771 delete_logfile(logger, index, log_version); 772 } 773 } 774 release_output(logger, fsynced_lsn); 775 } 776 777 void toku_logger_write_log_files (TOKULOGGER logger, bool write_log_files) 778 // Called only during initialization (or just after recovery), so no locks are needed. 779 { 780 logger->write_log_files = write_log_files; 781 } 782 783 void toku_logger_trim_log_files (TOKULOGGER logger, bool trim_log_files) 784 // Called only during initialization, so no locks are needed. 785 { 786 logger->trim_log_files = trim_log_files; 787 } 788 789 bool toku_logger_txns_exist(TOKULOGGER logger) 790 // Called during close of environment to ensure that transactions don't exist 791 { 792 return toku_txn_manager_txns_exist(logger->txn_manager); 793 } 794 795 796 void toku_logger_maybe_fsync(TOKULOGGER logger, LSN lsn, int do_fsync, bool holds_input_lock) 797 // Effect: If fsync is nonzero, then make sure that the log is flushed and synced at least up to lsn. 798 // Entry: Holds input lock iff 'holds_input_lock'. The log entry has already been written to the input buffer. 799 // Exit: Holds no locks. 800 // The input lock may be released and then reacquired. Thus this function does not run atomically with respect to other threads. 801 { 802 if (holds_input_lock) { 803 ml_unlock(&logger->input_lock); 804 } 805 if (do_fsync) { 806 // reacquire the locks (acquire output permission first) 807 LSN fsynced_lsn; 808 bool already_done = wait_till_output_already_written_or_output_buffer_available(logger, lsn, &fsynced_lsn); 809 if (already_done) { 810 return; 811 } 812 813 // otherwise we now own the output permission, and our lsn isn't outputed. 814 815 ml_lock(&logger->input_lock); 816 817 swap_inbuf_outbuf(logger); 818 819 ml_unlock(&logger->input_lock); // release the input lock now, so other threads can fill the inbuf. (Thus enabling group commit.) 820 821 write_outbuf_to_logfile(logger, &fsynced_lsn); 822 if (fsynced_lsn.lsn < lsn.lsn) { 823 // it may have gotten fsynced by the write_outbuf_to_logfile. 824 toku_file_fsync_without_accounting(logger->fd); 825 assert(fsynced_lsn.lsn <= logger->written_lsn.lsn); 826 fsynced_lsn = logger->written_lsn; 827 } 828 // the last lsn is only accessed while holding output permission or else when the log file is old. 829 if (logger->write_log_files) { 830 toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); 831 } 832 release_output(logger, fsynced_lsn); 833 } 834 } 835 836 static void 837 logger_write_buffer(TOKULOGGER logger, LSN *fsynced_lsn) 838 // Entry: Holds the input lock and permission to modify output. 839 // Exit: Holds only the permission to modify output. 840 // Effect: Write the buffers to the output. If DO_FSYNC is true, then fsync. 841 // Note: Only called during single-threaded activity from toku_logger_restart, so locks aren't really needed. 842 { 843 swap_inbuf_outbuf(logger); 844 ml_unlock(&logger->input_lock); 845 write_outbuf_to_logfile(logger, fsynced_lsn); 846 if (logger->write_log_files) { 847 toku_file_fsync_without_accounting(logger->fd); 848 toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); // t:2294 849 } 850 } 851 852 int toku_logger_restart(TOKULOGGER logger, LSN lastlsn) 853 // Entry and exit: Holds no locks (this is called only during single-threaded activity, such as initial start). 854 { 855 int r; 856 857 // flush out the log buffer 858 LSN fsynced_lsn; 859 grab_output(logger, &fsynced_lsn); 860 ml_lock(&logger->input_lock); 861 logger_write_buffer(logger, &fsynced_lsn); 862 863 // close the log file 864 if (logger->write_log_files) { // fsyncs don't work to /dev/null 865 toku_file_fsync_without_accounting(logger->fd); 866 } 867 r = toku_os_close(logger->fd); 868 assert(r == 0); 869 logger->fd = -1; 870 871 // reset the LSN's to the lastlsn when the logger was opened 872 logger->lsn = logger->written_lsn = logger->fsynced_lsn = lastlsn; 873 logger->write_log_files = true; 874 logger->trim_log_files = true; 875 876 // open a new log file 877 r = open_logfile(logger); 878 release_output(logger, fsynced_lsn); 879 return r; 880 } 881 882 // fname is the iname 883 void toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, uint32_t mode, 884 uint32_t treeflags, uint32_t nodesize, uint32_t basementnodesize, 885 enum toku_compression_method compression_method) { 886 if (txn) { 887 BYTESTRING bs_fname = { .len = (uint32_t) strlen(fname), .data = (char *) fname }; 888 // fsync log on fcreate 889 toku_log_fcreate (txn->logger, (LSN*)0, 1, txn, toku_txn_get_txnid(txn), filenum, 890 bs_fname, mode, treeflags, nodesize, basementnodesize, compression_method); 891 } 892 } 893 894 895 // We only do fdelete on open ft's, so we pass the filenum here 896 void toku_logger_log_fdelete (TOKUTXN txn, FILENUM filenum) { 897 if (txn) { 898 //No fsync. 899 toku_log_fdelete (txn->logger, (LSN*)0, 0, txn, toku_txn_get_txnid(txn), filenum); 900 } 901 } 902 903 904 905 /* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */ 906 void toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum, uint32_t treeflags) { 907 if (txn) { 908 BYTESTRING bs; 909 bs.len = strlen(fname); 910 bs.data = (char*)fname; 911 toku_log_fopen (txn->logger, (LSN*)0, 0, bs, filenum, treeflags); 912 } 913 } 914 915 static int toku_fread_uint8_t_nocrclen (FILE *f, uint8_t *v) { 916 int vi=fgetc(f); 917 if (vi==EOF) return -1; 918 uint8_t vc=(uint8_t)vi; 919 *v = vc; 920 return 0; 921 } 922 923 int toku_fread_uint8_t (FILE *f, uint8_t *v, struct x1764 *mm, uint32_t *len) { 924 int vi=fgetc(f); 925 if (vi==EOF) return -1; 926 uint8_t vc=(uint8_t)vi; 927 toku_x1764_add(mm, &vc, 1); 928 (*len)++; 929 *v = vc; 930 return 0; 931 } 932 933 int toku_fread_uint32_t_nocrclen (FILE *f, uint32_t *v) { 934 uint32_t result; 935 uint8_t *cp = (uint8_t*)&result; 936 int r; 937 r = toku_fread_uint8_t_nocrclen (f, cp+0); if (r!=0) return r; 938 r = toku_fread_uint8_t_nocrclen (f, cp+1); if (r!=0) return r; 939 r = toku_fread_uint8_t_nocrclen (f, cp+2); if (r!=0) return r; 940 r = toku_fread_uint8_t_nocrclen (f, cp+3); if (r!=0) return r; 941 *v = toku_dtoh32(result); 942 943 return 0; 944 } 945 int toku_fread_uint32_t (FILE *f, uint32_t *v, struct x1764 *checksum, uint32_t *len) { 946 uint32_t result; 947 uint8_t *cp = (uint8_t*)&result; 948 int r; 949 r = toku_fread_uint8_t (f, cp+0, checksum, len); if(r!=0) return r; 950 r = toku_fread_uint8_t (f, cp+1, checksum, len); if(r!=0) return r; 951 r = toku_fread_uint8_t (f, cp+2, checksum, len); if(r!=0) return r; 952 r = toku_fread_uint8_t (f, cp+3, checksum, len); if(r!=0) return r; 953 *v = toku_dtoh32(result); 954 return 0; 955 } 956 957 int toku_fread_uint64_t (FILE *f, uint64_t *v, struct x1764 *checksum, uint32_t *len) { 958 uint32_t v1,v2; 959 int r; 960 r=toku_fread_uint32_t(f, &v1, checksum, len); if (r!=0) return r; 961 r=toku_fread_uint32_t(f, &v2, checksum, len); if (r!=0) return r; 962 *v = (((uint64_t)v1)<<32 ) | ((uint64_t)v2); 963 return 0; 964 } 965 966 int toku_fread_bool (FILE *f, bool *v, struct x1764 *mm, uint32_t *len) { 967 uint8_t iv; 968 int r = toku_fread_uint8_t(f, &iv, mm, len); 969 if (r == 0) { 970 *v = (iv!=0); 971 } 972 return r; 973 } 974 975 int toku_fread_LSN (FILE *f, LSN *lsn, struct x1764 *checksum, uint32_t *len) { 976 return toku_fread_uint64_t (f, &lsn->lsn, checksum, len); 977 } 978 979 int toku_fread_BLOCKNUM (FILE *f, BLOCKNUM *b, struct x1764 *checksum, uint32_t *len) { 980 return toku_fread_uint64_t (f, (uint64_t*)&b->b, checksum, len); 981 } 982 983 int toku_fread_FILENUM (FILE *f, FILENUM *filenum, struct x1764 *checksum, uint32_t *len) { 984 return toku_fread_uint32_t (f, &filenum->fileid, checksum, len); 985 } 986 987 int toku_fread_TXNID (FILE *f, TXNID *txnid, struct x1764 *checksum, uint32_t *len) { 988 return toku_fread_uint64_t (f, txnid, checksum, len); 989 } 990 991 int toku_fread_TXNID_PAIR (FILE *f, TXNID_PAIR *txnid, struct x1764 *checksum, uint32_t *len) { 992 TXNID parent; 993 TXNID child; 994 int r; 995 r = toku_fread_TXNID(f, &parent, checksum, len); if (r != 0) { return r; } 996 r = toku_fread_TXNID(f, &child, checksum, len); if (r != 0) { return r; } 997 txnid->parent_id64 = parent; 998 txnid->child_id64 = child; 999 return 0; 1000 } 1001 1002 1003 int toku_fread_XIDP (FILE *f, XIDP *xidp, struct x1764 *checksum, uint32_t *len) { 1004 // These reads are verbose because XA defined the fields as "long", but we use 4 bytes, 1 byte and 1 byte respectively. 1005 TOKU_XA_XID *XMALLOC(xid); 1006 { 1007 uint32_t formatID; 1008 int r = toku_fread_uint32_t(f, &formatID, checksum, len); 1009 if (r!=0) return r; 1010 xid->formatID = formatID; 1011 } 1012 { 1013 uint8_t gtrid_length; 1014 int r = toku_fread_uint8_t (f, >rid_length, checksum, len); 1015 if (r!=0) return r; 1016 xid->gtrid_length = gtrid_length; 1017 } 1018 { 1019 uint8_t bqual_length; 1020 int r = toku_fread_uint8_t (f, &bqual_length, checksum, len); 1021 if (r!=0) return r; 1022 xid->bqual_length = bqual_length; 1023 } 1024 for (int i=0; i< xid->gtrid_length + xid->bqual_length; i++) { 1025 uint8_t byte; 1026 int r = toku_fread_uint8_t(f, &byte, checksum, len); 1027 if (r!=0) return r; 1028 xid->data[i] = byte; 1029 } 1030 *xidp = xid; 1031 return 0; 1032 } 1033 1034 // fills in the bs with malloced data. 1035 int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, struct x1764 *checksum, uint32_t *len) { 1036 int r=toku_fread_uint32_t(f, (uint32_t*)&bs->len, checksum, len); 1037 if (r!=0) return r; 1038 XMALLOC_N(bs->len, bs->data); 1039 uint32_t i; 1040 for (i=0; i<bs->len; i++) { 1041 r=toku_fread_uint8_t(f, (uint8_t*)&bs->data[i], checksum, len); 1042 if (r!=0) { 1043 toku_free(bs->data); 1044 bs->data=0; 1045 return r; 1046 } 1047 } 1048 return 0; 1049 } 1050 1051 // fills in the fs with malloced data. 1052 int toku_fread_FILENUMS (FILE *f, FILENUMS *fs, struct x1764 *checksum, uint32_t *len) { 1053 int r=toku_fread_uint32_t(f, (uint32_t*)&fs->num, checksum, len); 1054 if (r!=0) return r; 1055 XMALLOC_N(fs->num, fs->filenums); 1056 uint32_t i; 1057 for (i=0; i<fs->num; i++) { 1058 r=toku_fread_FILENUM (f, &fs->filenums[i], checksum, len); 1059 if (r!=0) { 1060 toku_free(fs->filenums); 1061 fs->filenums=0; 1062 return r; 1063 } 1064 } 1065 return 0; 1066 } 1067 1068 int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) { 1069 LSN v; 1070 int r = toku_fread_LSN(inf, &v, checksum, len); 1071 if (r!=0) return r; 1072 fprintf(outf, " %s=%" PRIu64, fieldname, v.lsn); 1073 return 0; 1074 } 1075 1076 int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) { 1077 TXNID v; 1078 int r = toku_fread_TXNID(inf, &v, checksum, len); 1079 if (r!=0) return r; 1080 fprintf(outf, " %s=%" PRIu64, fieldname, v); 1081 return 0; 1082 } 1083 1084 int toku_logprint_TXNID_PAIR (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) { 1085 TXNID_PAIR v; 1086 int r = toku_fread_TXNID_PAIR(inf, &v, checksum, len); 1087 if (r!=0) return r; 1088 fprintf(outf, " %s=%" PRIu64 ",%" PRIu64, fieldname, v.parent_id64, v.child_id64); 1089 return 0; 1090 } 1091 1092 int toku_logprint_XIDP (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) { 1093 XIDP vp; 1094 int r = toku_fread_XIDP(inf, &vp, checksum, len); 1095 if (r!=0) return r; 1096 fprintf(outf, " %s={formatID=0x%lx gtrid_length=%ld bqual_length=%ld data=", fieldname, vp->formatID, vp->gtrid_length, vp->bqual_length); 1097 toku_print_bytes(outf, vp->gtrid_length + vp->bqual_length, vp->data); 1098 fprintf(outf, "}"); 1099 toku_free(vp); 1100 return 0; 1101 } 1102 1103 int toku_logprint_uint8_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) { 1104 uint8_t v; 1105 int r = toku_fread_uint8_t(inf, &v, checksum, len); 1106 if (r!=0) return r; 1107 fprintf(outf, " %s=%d", fieldname, v); 1108 if (format) fprintf(outf, format, v); 1109 else if (v=='\'') fprintf(outf, "('\'')"); 1110 else if (isprint(v)) fprintf(outf, "('%c')", v); 1111 else {}/*nothing*/ 1112 return 0; 1113 } 1114 1115 int toku_logprint_uint32_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) { 1116 uint32_t v; 1117 int r = toku_fread_uint32_t(inf, &v, checksum, len); 1118 if (r!=0) return r; 1119 fprintf(outf, " %s=", fieldname); 1120 fprintf(outf, format ? format : "%d", v); 1121 return 0; 1122 } 1123 1124 int toku_logprint_uint64_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) { 1125 uint64_t v; 1126 int r = toku_fread_uint64_t(inf, &v, checksum, len); 1127 if (r!=0) return r; 1128 fprintf(outf, " %s=", fieldname); 1129 fprintf(outf, format ? format : "%" PRId64, v); 1130 return 0; 1131 } 1132 1133 int toku_logprint_bool (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) { 1134 bool v; 1135 int r = toku_fread_bool(inf, &v, checksum, len); 1136 if (r!=0) return r; 1137 fprintf(outf, " %s=%s", fieldname, v ? "true" : "false"); 1138 return 0; 1139 1140 } 1141 1142 void toku_print_BYTESTRING (FILE *outf, uint32_t len, char *data) { 1143 fprintf(outf, "{len=%u data=", len); 1144 toku_print_bytes(outf, len, data); 1145 fprintf(outf, "}"); 1146 1147 } 1148 1149 int toku_logprint_BYTESTRING (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) { 1150 BYTESTRING bs; 1151 int r = toku_fread_BYTESTRING(inf, &bs, checksum, len); 1152 if (r!=0) return r; 1153 fprintf(outf, " %s=", fieldname); 1154 toku_print_BYTESTRING(outf, bs.len, bs.data); 1155 toku_free(bs.data); 1156 return 0; 1157 } 1158 1159 int toku_logprint_BLOCKNUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) { 1160 return toku_logprint_uint64_t(outf, inf, fieldname, checksum, len, format); 1161 1162 } 1163 1164 int toku_logprint_FILENUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) { 1165 return toku_logprint_uint32_t(outf, inf, fieldname, checksum, len, format); 1166 1167 } 1168 1169 static void 1170 toku_print_FILENUMS (FILE *outf, uint32_t num, FILENUM *filenums) { 1171 fprintf(outf, "{num=%u filenums=\"", num); 1172 uint32_t i; 1173 for (i=0; i<num; i++) { 1174 if (i>0) 1175 fprintf(outf, ","); 1176 fprintf(outf, "0x%" PRIx32, filenums[i].fileid); 1177 } 1178 fprintf(outf, "\"}"); 1179 1180 } 1181 1182 int toku_logprint_FILENUMS (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) { 1183 FILENUMS bs; 1184 int r = toku_fread_FILENUMS(inf, &bs, checksum, len); 1185 if (r!=0) return r; 1186 fprintf(outf, " %s=", fieldname); 1187 toku_print_FILENUMS(outf, bs.num, bs.filenums); 1188 toku_free(bs.filenums); 1189 return 0; 1190 } 1191 1192 int toku_read_and_print_logmagic (FILE *f, uint32_t *versionp) { 1193 { 1194 char magic[8]; 1195 int r=fread(magic, 1, 8, f); 1196 if (r!=8) { 1197 return DB_BADFORMAT; 1198 } 1199 if (memcmp(magic, "tokulogg", 8)!=0) { 1200 return DB_BADFORMAT; 1201 } 1202 } 1203 { 1204 int version; 1205 int r=fread(&version, 1, 4, f); 1206 if (r!=4) { 1207 return DB_BADFORMAT; 1208 } 1209 printf("tokulog v.%u\n", toku_ntohl(version)); 1210 //version MUST be in network order regardless of disk order 1211 *versionp=toku_ntohl(version); 1212 } 1213 return 0; 1214 } 1215 1216 int toku_read_logmagic (FILE *f, uint32_t *versionp) { 1217 { 1218 char magic[8]; 1219 int r=fread(magic, 1, 8, f); 1220 if (r!=8) { 1221 return DB_BADFORMAT; 1222 } 1223 if (memcmp(magic, "tokulogg", 8)!=0) { 1224 return DB_BADFORMAT; 1225 } 1226 } 1227 { 1228 int version; 1229 int r=fread(&version, 1, 4, f); 1230 if (r!=4) { 1231 return DB_BADFORMAT; 1232 } 1233 *versionp=toku_ntohl(version); 1234 } 1235 return 0; 1236 } 1237 1238 TXNID_PAIR toku_txn_get_txnid (TOKUTXN txn) { 1239 TXNID_PAIR tp = { .parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE}; 1240 if (txn==0) return tp; 1241 else return txn->txnid; 1242 } 1243 1244 LSN toku_logger_last_lsn(TOKULOGGER logger) { 1245 return logger->lsn; 1246 } 1247 1248 TOKULOGGER toku_txn_logger (TOKUTXN txn) { 1249 return txn ? txn->logger : 0; 1250 } 1251 1252 void toku_txnid2txn(TOKULOGGER logger, TXNID_PAIR txnid, TOKUTXN *result) { 1253 TOKUTXN root_txn = NULL; 1254 toku_txn_manager_suspend(logger->txn_manager); 1255 toku_txn_manager_id2txn_unlocked(logger->txn_manager, txnid, &root_txn); 1256 if (root_txn == NULL || root_txn->txnid.child_id64 == txnid.child_id64) { 1257 *result = root_txn; 1258 } 1259 else if (root_txn != NULL) { 1260 root_txn->child_manager->suspend(); 1261 root_txn->child_manager->find_tokutxn_by_xid_unlocked(txnid, result); 1262 root_txn->child_manager->resume(); 1263 } 1264 toku_txn_manager_resume(logger->txn_manager); 1265 } 1266 1267 // Find the earliest LSN in a log. No locks are needed. 1268 static int peek_at_log(TOKULOGGER logger, char *filename, LSN *first_lsn) { 1269 int fd = toku_os_open( 1270 filename, O_RDONLY + O_BINARY, S_IRUSR, *tokudb_file_log_key); 1271 if (fd < 0) { 1272 int er = get_error_errno(); 1273 if (logger->write_log_files) 1274 printf("couldn't open: %s\n", strerror(er)); 1275 return er; 1276 } 1277 enum { SKIP = 12+1+4 }; // read the 12 byte header, the first message, and the first len 1278 unsigned char header[SKIP+8]; 1279 int r = read(fd, header, SKIP+8); 1280 if (r!=SKIP+8) return 0; // cannot determine that it's archivable, so we'll assume no. If a later-log is archivable is then this one will be too. 1281 1282 uint64_t lsn; 1283 { 1284 struct rbuf rb; 1285 rb.buf = header+SKIP; 1286 rb.size = 8; 1287 rb.ndone = 0; 1288 lsn = rbuf_ulonglong(&rb); 1289 } 1290 1291 r = toku_os_close(fd); 1292 1293 if (r != 0) { 1294 return 0; 1295 } 1296 1297 first_lsn->lsn = lsn; 1298 return 0; 1299 } 1300 1301 // Return a malloc'd array of malloc'd strings which are the filenames that can be archived. 1302 // Output permission are obtained briefly so we can get a list of the log files without conflicting. 1303 int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) { 1304 if (flags!=0) return EINVAL; // don't know what to do. 1305 int all_n_logs; 1306 int i; 1307 char **all_logs; 1308 int n_logfiles; 1309 LSN fsynced_lsn; 1310 grab_output(logger, &fsynced_lsn); 1311 int r = toku_logger_find_logfiles (logger->directory, &all_logs, &n_logfiles); 1312 release_output(logger, fsynced_lsn); 1313 if (r!=0) return r; 1314 1315 for (i=0; all_logs[i]; i++); 1316 all_n_logs=i; 1317 // get them into increasing order 1318 qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare); 1319 1320 LSN save_lsn = logger->last_completed_checkpoint_lsn; 1321 1322 // Now starting at the last one, look for archivable ones. 1323 // Count the total number of bytes, because we have to return a single big array. (That's the BDB interface. Bleah...) 1324 LSN earliest_lsn_in_logfile={(unsigned long long)(-1LL)}; 1325 r = peek_at_log(logger, all_logs[all_n_logs-1], &earliest_lsn_in_logfile); // try to find the lsn that's in the most recent log 1326 if (earliest_lsn_in_logfile.lsn <= save_lsn.lsn) { 1327 i=all_n_logs-1; 1328 } else { 1329 for (i=all_n_logs-2; i>=0; i--) { // start at all_n_logs-2 because we never archive the most recent log 1330 r = peek_at_log(logger, all_logs[i], &earliest_lsn_in_logfile); 1331 if (r!=0) continue; // In case of error, just keep going 1332 1333 if (earliest_lsn_in_logfile.lsn <= save_lsn.lsn) { 1334 break; 1335 } 1336 } 1337 } 1338 1339 // all log files up to, but but not including, i can be archived. 1340 int n_to_archive=i; 1341 int count_bytes=0; 1342 for (i=0; i<n_to_archive; i++) { 1343 count_bytes+=1+strlen(all_logs[i]); 1344 } 1345 char **result; 1346 if (i==0) { 1347 result=0; 1348 } else { 1349 CAST_FROM_VOIDP(result, toku_xmalloc((1+n_to_archive)*sizeof(*result) + count_bytes)); 1350 char *base = (char*)(result+1+n_to_archive); 1351 for (i=0; i<n_to_archive; i++) { 1352 int len=1+strlen(all_logs[i]); 1353 result[i]=base; 1354 memcpy(base, all_logs[i], len); 1355 base+=len; 1356 } 1357 result[n_to_archive]=0; 1358 } 1359 for (i=0; all_logs[i]; i++) { 1360 toku_free(all_logs[i]); 1361 } 1362 toku_free(all_logs); 1363 *logs_p = result; 1364 return 0; 1365 } 1366 1367 1368 TOKUTXN toku_logger_txn_parent (TOKUTXN txn) { 1369 return txn->parent; 1370 } 1371 1372 void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn) { 1373 logger->last_completed_checkpoint_lsn = lsn; 1374 } 1375 1376 void 1377 toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS statp) { 1378 log_status.init(); 1379 if (logger) { 1380 LOG_STATUS_VAL(LOGGER_NEXT_LSN) = logger->lsn.lsn; 1381 LOG_STATUS_VAL(LOGGER_NUM_WRITES) = logger->num_writes_to_disk; 1382 LOG_STATUS_VAL(LOGGER_BYTES_WRITTEN) = logger->bytes_written_to_disk; 1383 // No compression on logfiles so the uncompressed size is just number of bytes written 1384 LOG_STATUS_VAL(LOGGER_UNCOMPRESSED_BYTES_WRITTEN) = logger->bytes_written_to_disk; 1385 LOG_STATUS_VAL(LOGGER_TOKUTIME_WRITES) = logger->time_spent_writing_to_disk; 1386 LOG_STATUS_VAL(LOGGER_WAIT_BUF_LONG) = logger->num_wait_buf_long; 1387 } 1388 *statp = log_status; 1389 } 1390 1391 1392 1393 ////////////////////////////////////////////////////////////////////////////////////////////////////// 1394 // Used for upgrade: 1395 // if any valid log files exist in log_dir, then 1396 // set *found_any_logs to true and set *version_found to version number of latest log 1397 int 1398 toku_get_version_of_logs_on_disk(const char *log_dir, bool *found_any_logs, uint32_t *version_found) { 1399 bool found = false; 1400 uint32_t highest_version = 0; 1401 int r = 0; 1402 1403 struct dirent *de; 1404 DIR *d=opendir(log_dir); 1405 if (d==NULL) { 1406 r = get_error_errno(); 1407 } 1408 else { 1409 // Examine every file in the directory and find highest version 1410 while ((de=readdir(d))) { 1411 uint32_t this_log_version; 1412 uint64_t this_log_number; 1413 bool is_log = is_a_logfile_any_version(de->d_name, &this_log_number, &this_log_version); 1414 if (is_log) { 1415 if (!found) { // first log file found 1416 found = true; 1417 highest_version = this_log_version; 1418 } 1419 else 1420 highest_version = highest_version > this_log_version ? highest_version : this_log_version; 1421 } 1422 } 1423 int r2 = closedir(d); 1424 if (r==0) r = r2; 1425 } 1426 if (r==0) { 1427 *found_any_logs = found; 1428 if (found) 1429 *version_found = highest_version; 1430 } 1431 return r; 1432 } 1433 1434 TXN_MANAGER toku_logger_get_txn_manager(TOKULOGGER logger) { 1435 return logger->txn_manager; 1436 } 1437