1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
#include <memory.h>
#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
43
44 #include "ft/serialize/block_table.h"
45 #include "ft/ft.h"
46 #include "ft/logger/log-internal.h"
47 #include "ft/txn/txn_manager.h"
48 #include "ft/txn/rollback_log_node_cache.h"
49
50 #include "util/status.h"
51
// Counter incremented on entry to toku_logger_open_rollback() and decremented
// again before it returns; it is nonzero only while that call is in progress.
int writing_rollback = 0;
extern "C" {
// Exported with C linkage. It is not read or written in the visible part of this file.
uint force_recovery = 0;
}

// Log format version. open_logfile() writes it, in network byte order, into
// the header of every logfile it creates.
static const int log_format_version = TOKU_LOG_VERSION;

// Instrumentation keys. They are dereferenced and passed to toku_mutex_init(),
// toku_cond_init() and toku_os_open() respectively.
// NOTE(review): they are only declared here; presumably they are assigned
// elsewhere before the first logger is created -- confirm.
toku_instr_key *result_output_condition_lock_mutex_key;
toku_instr_key *result_output_condition_key;
toku_instr_key *tokudb_file_log_key;

// Forward declarations of file-local helpers that are used before their definitions.
static int open_logfile(TOKULOGGER logger);
static void logger_write_buffer(TOKULOGGER logger, LSN *fsynced_lsn);
static void delete_logfile(TOKULOGGER logger,
                           long long index,
                           uint32_t version);
static void grab_output(TOKULOGGER logger, LSN *fsynced_lsn);
static void release_output(TOKULOGGER logger, LSN fsynced_lsn);
70
// Effect: Write the `len` bytes starting at `data` to `outf` as a double-quoted string.
//  A double quote, a backslash and a newline are written as \" \\ and \n.
//  Any other printable byte is written as itself; every remaining byte is
//  written as a backslash followed by three octal digits.
// Arguments:  outf: stream to write to.
//             len:  number of bytes to print.
//             data: the bytes (need not be NUL terminated).
static void toku_print_bytes (FILE *outf, uint32_t len, char *data) {
    fprintf(outf, "\"");
    for (uint32_t i = 0; i < len; i++) {
        // The <ctype.h> functions are only defined for EOF and for values
        // representable as unsigned char.  Plain char may be signed, so
        // classify the byte through unsigned char rather than passing a
        // possibly negative value to isprint().
        const unsigned char c = (unsigned char) data[i];
        switch (c) {
        case '"':  fprintf(outf, "\\\""); break;
        case '\\': fprintf(outf, "\\\\"); break;
        case '\n': fprintf(outf, "\\n");  break;
        default:
            if (isprint(c)) fprintf(outf, "%c", c);
            else            fprintf(outf, "\\%03o", (unsigned) c);
            break;
        }
    }
    fprintf(outf, "\"");
}
86
is_a_logfile_any_version(const char * name,uint64_t * number_result,uint32_t * version_of_log)87 static bool is_a_logfile_any_version (const char *name, uint64_t *number_result, uint32_t *version_of_log) {
88 bool rval = true;
89 uint64_t result;
90 int n;
91 int r;
92 uint32_t version;
93 r = sscanf(name, "log%" SCNu64 ".tokulog%" SCNu32 "%n", &result, &version, &n);
94 if (r!=2 || name[n]!='\0' || version <= TOKU_LOG_VERSION_1) {
95 //Version 1 does NOT append 'version' to end of '.tokulog'
96 version = TOKU_LOG_VERSION_1;
97 r = sscanf(name, "log%" SCNu64 ".tokulog%n", &result, &n);
98 if (r!=1 || name[n]!='\0') {
99 rval = false;
100 }
101 }
102 if (rval) {
103 *number_result = result;
104 *version_of_log = version;
105 }
106
107 return rval;
108 }
109
110 // added for #2424, improved for #2521
is_a_logfile(const char * name,long long * number_result)111 static bool is_a_logfile (const char *name, long long *number_result) {
112 bool rval;
113 uint64_t result;
114 uint32_t version;
115 rval = is_a_logfile_any_version(name, &result, &version);
116 if (rval && version != TOKU_LOG_VERSION)
117 rval = false;
118 if (rval)
119 *number_result = result;
120 return rval;
121 }
122
123
124 // TODO: can't fail
toku_logger_create(TOKULOGGER * resultp)125 int toku_logger_create (TOKULOGGER *resultp) {
126 TOKULOGGER CALLOC(result);
127 if (result==0) return get_error_errno();
128 result->is_open=false;
129 result->write_log_files = true;
130 result->trim_log_files = true;
131 result->directory=0;
132 // fd is uninitialized on purpose
133 // ct is uninitialized on purpose
134 result->lg_max = 100<<20; // 100MB default
135 // lsn is uninitialized
136 result->inbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
137 result->outbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
138 // written_lsn is uninitialized
139 // fsynced_lsn is uninitialized
140 result->last_completed_checkpoint_lsn = ZERO_LSN;
141 // next_log_file_number is uninitialized
142 // n_in_file is uninitialized
143 result->write_block_size = FT_DEFAULT_NODE_SIZE; // default logging size is the same as the default ft block size
144 toku_logfilemgr_create(&result->logfilemgr);
145 *resultp = result;
146 ml_init(&result->input_lock);
147 toku_mutex_init(*result_output_condition_lock_mutex_key,
148 &result->output_condition_lock,
149 nullptr);
150 toku_cond_init(
151 *result_output_condition_key, &result->output_condition, nullptr);
152 result->rollback_cachefile = NULL;
153 result->output_is_available = true;
154 toku_txn_manager_init(&result->txn_manager);
155 return 0;
156 }
157
// Effect: Hand the logger's open log directory (logger->dir, set by open_logdir)
//  to toku_fsync_dirfd_without_accounting().  open_logfile() calls this right
//  after creating a new logfile.
static void fsync_logdir(TOKULOGGER logger) {
    toku_fsync_dirfd_without_accounting(logger->dir);
}
161
open_logdir(TOKULOGGER logger,const char * directory)162 static int open_logdir(TOKULOGGER logger, const char *directory) {
163 if (toku_os_is_absolute_name(directory)) {
164 logger->directory = toku_strdup(directory);
165 } else {
166 char cwdbuf[PATH_MAX];
167 char *cwd = getcwd(cwdbuf, PATH_MAX);
168 if (cwd == NULL)
169 return -1;
170 char *MALLOC_N(strlen(cwd) + strlen(directory) + 2, new_log_dir);
171 if (new_log_dir == NULL) {
172 return -2;
173 }
174 sprintf(new_log_dir, "%s/%s", cwd, directory);
175 logger->directory = new_log_dir;
176 }
177 if (logger->directory==0) return get_error_errno();
178
179 logger->dir = opendir(logger->directory);
180 if ( logger->dir == NULL ) return -1;
181 return 0;
182 }
183
close_logdir(TOKULOGGER logger)184 static int close_logdir(TOKULOGGER logger) {
185 return closedir(logger->dir);
186 }
187
188 int
toku_logger_open_with_last_xid(const char * directory,TOKULOGGER logger,TXNID last_xid)189 toku_logger_open_with_last_xid(const char *directory, TOKULOGGER logger, TXNID last_xid) {
190 if (logger->is_open) return EINVAL;
191
192 int r;
193 TXNID last_xid_if_clean_shutdown = TXNID_NONE;
194 r = toku_logfilemgr_init(logger->logfilemgr, directory, &last_xid_if_clean_shutdown);
195 if ( r!=0 )
196 return r;
197 logger->lsn = toku_logfilemgr_get_last_lsn(logger->logfilemgr);
198 logger->written_lsn = logger->lsn;
199 logger->fsynced_lsn = logger->lsn;
200 logger->inbuf.max_lsn_in_buf = logger->lsn;
201 logger->outbuf.max_lsn_in_buf = logger->lsn;
202
203 // open directory, save pointer for fsyncing t:2445
204 r = open_logdir(logger, directory);
205 if (r!=0) return r;
206
207 long long nexti;
208 r = toku_logger_find_next_unused_log_file(logger->directory, &nexti);
209 if (r!=0) return r;
210
211 logger->next_log_file_number = nexti;
212 r = open_logfile(logger);
213 if (r!=0) return r;
214 if (last_xid == TXNID_NONE) {
215 last_xid = last_xid_if_clean_shutdown;
216 }
217 toku_txn_manager_set_last_xid_from_logger(logger->txn_manager, last_xid);
218
219 logger->is_open = true;
220 return 0;
221 }
222
// Effect: Open the logger on `directory` without supplying a last xid:
//  forwards to toku_logger_open_with_last_xid() with TXNID_NONE.
// Returns: whatever toku_logger_open_with_last_xid() returns.
int toku_logger_open (const char *directory, TOKULOGGER logger) {
    return toku_logger_open_with_last_xid(directory, logger, TXNID_NONE);
}
226
toku_logger_rollback_is_open(TOKULOGGER logger)227 bool toku_logger_rollback_is_open (TOKULOGGER logger) {
228 return logger->rollback_cachefile != NULL;
229 }
230
231 #define MAX_CACHED_ROLLBACK_NODES 4096
232
toku_logger_initialize_rollback_cache(TOKULOGGER logger,FT ft)233 void toku_logger_initialize_rollback_cache(TOKULOGGER logger, FT ft) {
234 ft->blocktable.free_unused_blocknums(ft->h->root_blocknum);
235 logger->rollback_cache.init(MAX_CACHED_ROLLBACK_NODES);
236 }
237
toku_logger_open_rollback(TOKULOGGER logger,CACHETABLE cachetable,bool create)238 int toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, bool create) {
239 writing_rollback++;
240 assert(logger->is_open);
241 assert(!logger->rollback_cachefile);
242
243 FT_HANDLE ft_handle = nullptr; // Note, there is no DB associated with this FT.
244 toku_ft_handle_create(&ft_handle);
245 int r = toku_ft_handle_open(ft_handle, toku_product_name_strings.rollback_cachefile, create, create, cachetable, nullptr);
246 if (r == 0) {
247 FT ft = ft_handle->ft;
248 logger->rollback_cachefile = ft->cf;
249 toku_logger_initialize_rollback_cache(logger, ft_handle->ft);
250
251 // Verify it is empty
252 // Must have no data blocks (rollback logs or otherwise).
253 ft->blocktable.verify_no_data_blocks_except_root(ft->h->root_blocknum);
254 bool is_empty = toku_ft_is_empty_fast(ft_handle);
255 assert(is_empty);
256 } else {
257 toku_ft_handle_close(ft_handle);
258 }
259 writing_rollback--;
260 return r;
261 }
262
263
264 // Requires: Rollback cachefile can only be closed immediately after a checkpoint,
265 // so it will always be clean (!h->dirty) when about to be closed.
266 // Rollback log can only be closed when there are no open transactions,
267 // so it will always be empty (no data blocks) when about to be closed.
toku_logger_close_rollback_check_empty(TOKULOGGER logger,bool clean_shutdown)268 void toku_logger_close_rollback_check_empty(TOKULOGGER logger, bool clean_shutdown) {
269 CACHEFILE cf = logger->rollback_cachefile; // stored in logger at rollback cachefile open
270 if (cf) {
271 FT_HANDLE ft_to_close;
272 { //Find "ft_to_close"
273 logger->rollback_cache.destroy();
274 FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
275 if (clean_shutdown) {
276 //Verify it is safe to close it.
277 assert(!ft->h->dirty()); //Must not be dirty.
278 ft->blocktable.free_unused_blocknums(ft->h->root_blocknum);
279 // Must have no data blocks (rollback logs or otherwise).
280 ft->blocktable.verify_no_data_blocks_except_root(ft->h->root_blocknum);
281 assert(!ft->h->dirty());
282 } else {
283 ft->h->clear_dirty();
284 }
285 ft_to_close = toku_ft_get_only_existing_ft_handle(ft);
286 if (clean_shutdown) {
287 bool is_empty;
288 is_empty = toku_ft_is_empty_fast(ft_to_close);
289 assert(is_empty);
290 assert(!ft->h->dirty()); // it should not have been dirtied by the toku_ft_is_empty test.
291 }
292 }
293
294 toku_ft_handle_close(ft_to_close);
295 //Set as dealt with already.
296 logger->rollback_cachefile = NULL;
297 }
298 }
299
// Effect: Close the rollback file as for a clean shutdown: forwards to
//  toku_logger_close_rollback_check_empty() with clean_shutdown == true, so
//  the clean/empty assertions are enforced.
void toku_logger_close_rollback(TOKULOGGER logger) {
    toku_logger_close_rollback_check_empty(logger, true);
}
303
304 // No locks held on entry
305 // No locks held on exit.
306 // No locks are needed, since you cannot legally close the log concurrently with doing anything else.
307 // TODO: can't fail
toku_logger_close(TOKULOGGER * loggerp)308 int toku_logger_close(TOKULOGGER *loggerp) {
309 int r;
310 TOKULOGGER logger = *loggerp;
311 if (!logger->is_open) {
312 goto is_closed;
313 }
314 ml_lock(&logger->input_lock);
315 LSN fsynced_lsn;
316 grab_output(logger, &fsynced_lsn);
317 logger_write_buffer(logger, &fsynced_lsn);
318 if (logger->fd!=-1) {
319 if (logger->write_log_files) {
320 toku_file_fsync_without_accounting(logger->fd);
321 }
322 r = toku_os_close(logger->fd);
323 assert(r == 0);
324 }
325 r = close_logdir(logger);
326 assert(r == 0);
327 logger->fd=-1;
328 release_output(logger, fsynced_lsn);
329
330 is_closed:
331 toku_free(logger->inbuf.buf);
332 toku_free(logger->outbuf.buf);
333 // before destroying locks they must be left in the unlocked state.
334 ml_destroy(&logger->input_lock);
335 toku_mutex_destroy(&logger->output_condition_lock);
336 toku_cond_destroy(&logger->output_condition);
337 toku_txn_manager_destroy(logger->txn_manager);
338 if (logger->directory) toku_free(logger->directory);
339 toku_logfilemgr_destroy(&logger->logfilemgr);
340 toku_free(logger);
341 *loggerp=0;
342 return 0;
343 }
344
toku_logger_shutdown(TOKULOGGER logger)345 void toku_logger_shutdown(TOKULOGGER logger) {
346 if (logger->is_open) {
347 TXN_MANAGER mgr = logger->txn_manager;
348 if (toku_txn_manager_num_live_root_txns(mgr) == 0) {
349 TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
350 toku_log_shutdown(logger, NULL, true, 0, last_xid);
351 }
352 }
353 }
354
close_and_open_logfile(TOKULOGGER logger,LSN * fsynced_lsn)355 static int close_and_open_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
356 // Effect: close the current file, and open the next one.
357 // Entry: This thread has permission to modify the output.
358 // Exit: This thread has permission to modify the output.
359 {
360 int r;
361 if (logger->write_log_files) {
362 toku_file_fsync_without_accounting(logger->fd);
363 *fsynced_lsn = logger->written_lsn;
364 toku_logfilemgr_update_last_lsn(logger->logfilemgr,
365 logger->written_lsn); // fixes t:2294
366 }
367 r = toku_os_close(logger->fd);
368
369 if (r != 0)
370 return get_error_errno();
371 return open_logfile(logger);
372 }
373
// Returns: the larger of a and b (b when they are equal).
static int
max_int (int a, int b)
{
    return (a > b) ? a : b;
}
380
381 // ***********************************************************
382 // output mutex/condition manipulation routines
383 // ***********************************************************
384
385 static void
wait_till_output_available(TOKULOGGER logger)386 wait_till_output_available (TOKULOGGER logger)
387 // Effect: Wait until output becomes available.
388 // Implementation hint: Use a pthread_cond_wait.
389 // Entry: Holds the output_condition_lock (but not the inlock)
390 // Exit: Holds the output_condition_lock and logger->output_is_available
391 //
392 {
393 tokutime_t t0 = toku_time_now();
394 while (!logger->output_is_available) {
395 toku_cond_wait(&logger->output_condition, &logger->output_condition_lock);
396 }
397 if (tokutime_to_seconds(toku_time_now() - t0) >= 0.100) {
398 logger->num_wait_buf_long++;
399 }
400 }
401
402 static void
grab_output(TOKULOGGER logger,LSN * fsynced_lsn)403 grab_output(TOKULOGGER logger, LSN *fsynced_lsn)
404 // Effect: Wait until output becomes available and get permission to modify output.
405 // Entry: Holds no lock (including not holding the input lock, since we never hold both at once).
406 // Exit: Hold permission to modify output (but none of the locks).
407 {
408 toku_mutex_lock(&logger->output_condition_lock);
409 wait_till_output_available(logger);
410 logger->output_is_available = false;
411 if (fsynced_lsn) {
412 *fsynced_lsn = logger->fsynced_lsn;
413 }
414 toku_mutex_unlock(&logger->output_condition_lock);
415 }
416
417 static bool
wait_till_output_already_written_or_output_buffer_available(TOKULOGGER logger,LSN lsn,LSN * fsynced_lsn)418 wait_till_output_already_written_or_output_buffer_available (TOKULOGGER logger, LSN lsn, LSN *fsynced_lsn)
419 // Effect: Wait until either the output is available or the lsn has been written.
420 // Return true iff the lsn has been written.
421 // If returning true, then on exit we don't hold output permission.
422 // If returning false, then on exit we do hold output permission.
423 // Entry: Hold no locks.
424 // Exit: Hold the output permission if returns false.
425 {
426 bool result;
427 toku_mutex_lock(&logger->output_condition_lock);
428 while (1) {
429 if (logger->fsynced_lsn.lsn >= lsn.lsn) { // we can look at the fsynced lsn since we have the lock.
430 result = true;
431 break;
432 }
433 if (logger->output_is_available) {
434 logger->output_is_available = false;
435 result = false;
436 break;
437 }
438 // otherwise wait for a good time to look again.
439 toku_cond_wait(&logger->output_condition, &logger->output_condition_lock);
440 }
441 *fsynced_lsn = logger->fsynced_lsn;
442 toku_mutex_unlock(&logger->output_condition_lock);
443 return result;
444 }
445
446 static void
release_output(TOKULOGGER logger,LSN fsynced_lsn)447 release_output (TOKULOGGER logger, LSN fsynced_lsn)
448 // Effect: Release output permission.
449 // Entry: Holds output permissions, but no locks.
450 // Exit: Holds neither locks nor output permission.
451 {
452 toku_mutex_lock(&logger->output_condition_lock);
453 logger->output_is_available = true;
454 if (logger->fsynced_lsn.lsn < fsynced_lsn.lsn) {
455 logger->fsynced_lsn = fsynced_lsn;
456 }
457 toku_cond_broadcast(&logger->output_condition);
458 toku_mutex_unlock(&logger->output_condition_lock);
459 }
460
461 static void
swap_inbuf_outbuf(TOKULOGGER logger)462 swap_inbuf_outbuf (TOKULOGGER logger)
463 // Effect: Swap the inbuf and outbuf
464 // Entry and exit: Hold the input lock and permission to modify output.
465 {
466 struct logbuf tmp = logger->inbuf;
467 logger->inbuf = logger->outbuf;
468 logger->outbuf = tmp;
469 assert(logger->inbuf.n_in_buf == 0);
470 }
471
472 static void
write_outbuf_to_logfile(TOKULOGGER logger,LSN * fsynced_lsn)473 write_outbuf_to_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
474 // Effect: Write the contents of outbuf to logfile. Don't necessarily fsync (but it might, in which case fynced_lsn is updated).
475 // If the logfile gets too big, open the next one (that's the case where an fsync might happen).
476 // Entry and exit: Holds permission to modify output (and doesn't let it go, so it's ok to also hold the inlock).
477 {
478 if (logger->outbuf.n_in_buf>0) {
479 // Write the outbuf to disk, take accounting measurements
480 tokutime_t io_t0 = toku_time_now();
481 toku_os_full_write(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
482 tokutime_t io_t1 = toku_time_now();
483 logger->num_writes_to_disk++;
484 logger->bytes_written_to_disk += logger->outbuf.n_in_buf;
485 logger->time_spent_writing_to_disk += (io_t1 - io_t0);
486
487 assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
488 logger->written_lsn = logger->outbuf.max_lsn_in_buf;
489 logger->n_in_file += logger->outbuf.n_in_buf;
490 logger->outbuf.n_in_buf = 0;
491 }
492 // If the file got too big, then open a new file.
493 if (logger->n_in_file > logger->lg_max) {
494 int r = close_and_open_logfile(logger, fsynced_lsn);
495 assert_zero(r);
496 }
497 }
498
void
toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed)
// Entry: Holds the inlock
// Exit:  Holds the inlock
// Effect: Upon exit, the inlock is held and there are at least n_bytes_needed in the buffer.
//  May release the inlock (and then reacquire it), so this is not atomic.
//  May obtain the output lock and output permission (but if it does so, it will have released the inlock, since we don't hold both locks at once).
//   (But may hold output permission and inlock at the same time.)
// Implementation hint: Makes space in the inbuf, possibly by writing the inbuf to disk or increasing the size of the inbuf.  There might not be an fsync.
// Arguments:  logger:         the logger (side effects)
//             n_bytes_needed: how many bytes to make space for.
{
    // Fast path: the request fits, nothing to do and the inlock is never dropped.
    // Note that the test is against LOGGER_MIN_BUF_SIZE, not the buffer's current buf_size.
    if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
        return;
    }
    // Drop the inlock before waiting for output permission, then take it back.
    ml_unlock(&logger->input_lock);
    LSN fsynced_lsn;
    grab_output(logger, &fsynced_lsn);

    ml_lock(&logger->input_lock);
    // Some other thread may have written the log out while we didn't have the lock.  If we have space now, then be happy.
    if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
        release_output(logger, fsynced_lsn);
        return;
    }
    if (logger->inbuf.n_in_buf > 0) {
        // There isn't enough space, and there is something in the buffer, so write the inbuf.
        swap_inbuf_outbuf(logger);

        // Don't release the inlock in this case, because we don't want to get starved.
        write_outbuf_to_logfile(logger, &fsynced_lsn);
    }
    // the inbuf is empty.  Make it big enough (just in case it is somehow smaller than a single log entry).
    if (n_bytes_needed > logger->inbuf.buf_size) {
        assert(n_bytes_needed < (1<<30)); // it seems unlikely to work if a logentry gets that big.
        int new_size = max_int(logger->inbuf.buf_size * 2, n_bytes_needed); // make it at least twice as big, and big enough for n_bytes
        assert(new_size < (1<<30));
        XREALLOC_N(new_size, logger->inbuf.buf);
        logger->inbuf.buf_size = new_size;
    }
    // Hand back output permission (publishing any fsync progress); the inlock stays held.
    release_output(logger, fsynced_lsn);
}
541
toku_logger_fsync(TOKULOGGER logger)542 void toku_logger_fsync(TOKULOGGER logger)
543 // Effect: This is the exported fsync used by ydb.c for env_log_flush. Group commit doesn't have to work.
544 // Entry: Holds no locks
545 // Exit: Holds no locks
546 // Implementation note: Acquire the output condition lock, then the output permission, then release the output condition lock, then get the input lock.
547 // Then release everything. Hold the input lock while reading the current max lsn in buf to make drd happy that there is no data race.
548 {
549 ml_lock(&logger->input_lock);
550 const LSN max_lsn_in_buf = logger->inbuf.max_lsn_in_buf;
551 ml_unlock(&logger->input_lock);
552
553 toku_logger_maybe_fsync(logger, max_lsn_in_buf, true, false);
554 }
555
toku_logger_fsync_if_lsn_not_fsynced(TOKULOGGER logger,LSN lsn)556 void toku_logger_fsync_if_lsn_not_fsynced (TOKULOGGER logger, LSN lsn) {
557 if (logger->write_log_files) {
558 toku_logger_maybe_fsync(logger, lsn, true, false);
559 }
560 }
561
toku_logger_is_open(TOKULOGGER logger)562 int toku_logger_is_open(TOKULOGGER logger) {
563 if (logger==0) return 0;
564 return logger->is_open;
565 }
566
// Effect: Store `ct` in the logger's ct field.  (toku_logger_create leaves that
//  field unset on purpose.)
void toku_logger_set_cachetable (TOKULOGGER logger, CACHETABLE ct) {
    logger->ct = ct;
}
570
toku_logger_set_lg_max(TOKULOGGER logger,uint32_t lg_max)571 int toku_logger_set_lg_max(TOKULOGGER logger, uint32_t lg_max) {
572 if (logger==0) return EINVAL; // no logger
573 if (logger->is_open) return EINVAL;
574 if (lg_max>(1<<30)) return EINVAL; // too big
575 logger->lg_max = lg_max;
576 return 0;
577 }
toku_logger_get_lg_max(TOKULOGGER logger,uint32_t * lg_maxp)578 int toku_logger_get_lg_max(TOKULOGGER logger, uint32_t *lg_maxp) {
579 if (logger==0) return EINVAL; // no logger
580 *lg_maxp = logger->lg_max;
581 return 0;
582 }
583
toku_logger_set_lg_bsize(TOKULOGGER logger,uint32_t bsize)584 int toku_logger_set_lg_bsize(TOKULOGGER logger, uint32_t bsize) {
585 if (logger==0) return EINVAL; // no logger
586 if (logger->is_open) return EINVAL;
587 if (bsize<=0 || bsize>(1<<30)) return EINVAL;
588 logger->write_block_size = bsize;
589 return 0;
590 }
591
toku_logger_find_next_unused_log_file(const char * directory,long long * result)592 int toku_logger_find_next_unused_log_file(const char *directory, long long *result)
593 // This is called during logger initialalization, and no locks are required.
594 {
595 DIR *d=opendir(directory);
596 long long maxf=-1; *result = maxf;
597 struct dirent *de;
598 if (d==0) return get_error_errno();
599 while ((de=readdir(d))) {
600 if (de==0) return get_error_errno();
601 long long thisl = -1;
602 if ( is_a_logfile(de->d_name, &thisl) ) {
603 if ((long long)thisl > maxf) maxf = thisl;
604 }
605 }
606 *result=maxf+1;
607 int r = closedir(d);
608 return r;
609 }
610
// TODO: Put this in portability layer when ready
// in: file pathname that may have a dirname prefix
// return: pointer into `pathname` at the leaf name, i.e. just past the last
//  '/' (or `pathname` itself when it contains no '/')
static char * fileleafname(char *pathname) {
    char *last_slash = strrchr(pathname, '/');
    return (last_slash != NULL) ? last_slash + 1 : pathname;
}
623
logfilenamecompare(const void * ap,const void * bp)624 static int logfilenamecompare (const void *ap, const void *bp) {
625 char *a=*(char**)ap;
626 char *a_leafname = fileleafname(a);
627 char *b=*(char**)bp;
628 char * b_leafname = fileleafname(b);
629 int rval;
630 bool valid;
631 uint64_t num_a = 0; // placate compiler
632 uint64_t num_b = 0;
633 uint32_t ver_a = 0;
634 uint32_t ver_b = 0;
635 valid = is_a_logfile_any_version(a_leafname, &num_a, &ver_a);
636 invariant(valid);
637 valid = is_a_logfile_any_version(b_leafname, &num_b, &ver_b);
638 invariant(valid);
639 if (ver_a < ver_b) rval = -1;
640 else if (ver_a > ver_b) rval = +1;
641 else if (num_a < num_b) rval = -1;
642 else if (num_a > num_b) rval = +1;
643 else rval = 0;
644 return rval;
645 }
646
647 // Return the log files in sorted order
648 // Return a null_terminated array of strings, and also return the number of strings in the array.
649 // Requires: Race conditions must be dealt with by caller. Either call during initialization or grab the output permission.
toku_logger_find_logfiles(const char * directory,char *** resultp,int * n_logfiles)650 int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_logfiles)
651 {
652 int result_limit=2;
653 int n_results=0;
654 char **MALLOC_N(result_limit, result);
655 assert(result!= NULL);
656 struct dirent *de;
657 DIR *d=opendir(directory);
658 if (d==0) {
659 int er = get_error_errno();
660 toku_free(result);
661 return er;
662 }
663 int dirnamelen = strlen(directory);
664 while ((de=readdir(d))) {
665 uint64_t thisl;
666 uint32_t version_ignore;
667 if ( !(is_a_logfile_any_version(de->d_name, &thisl, &version_ignore)) ) continue; //#2424: Skip over files that don't match the exact logfile template
668 if (n_results+1>=result_limit) {
669 result_limit*=2;
670 XREALLOC_N(result_limit, result);
671 }
672 int fnamelen = dirnamelen + strlen(de->d_name) + 2; // One for the slash and one for the trailing NUL.
673 char *XMALLOC_N(fnamelen, fname);
674 snprintf(fname, fnamelen, "%s/%s", directory, de->d_name);
675 result[n_results++] = fname;
676 }
677 // Return them in increasing order. Set width to allow for newer log file names ("xxx.tokulog13")
678 // which are one character longer than old log file names ("xxx.tokulog2"). The comparison function
679 // won't look beyond the terminating NUL, so an extra character in the comparison string doesn't matter.
680 // Allow room for terminating NUL after "xxx.tokulog13" even if result[0] is of form "xxx.tokulog2."
681 int width = sizeof(result[0]+2);
682 qsort(result, n_results, width, logfilenamecompare);
683 *resultp = result;
684 *n_logfiles = n_results;
685 result[n_results]=0; // make a trailing null
686 return d ? closedir(d) : 0;
687 }
688
toku_logger_free_logfiles(char ** logfiles,int n_logfiles)689 void toku_logger_free_logfiles(char **logfiles, int n_logfiles) {
690 for (int i = 0; i < n_logfiles; i++)
691 toku_free(logfiles[i]);
692 toku_free(logfiles);
693 }
694
open_logfile(TOKULOGGER logger)695 static int open_logfile (TOKULOGGER logger)
696 // Entry and Exit: This thread has permission to modify the output.
697 {
698 int fnamelen = strlen(logger->directory)+50;
699 char fname[fnamelen];
700 snprintf(fname,
701 fnamelen,
702 "%s/log%012lld.tokulog%d",
703 logger->directory,
704 logger->next_log_file_number,
705 TOKU_LOG_VERSION);
706 long long index = logger->next_log_file_number;
707 if (logger->write_log_files) {
708 logger->fd =
709 toku_os_open(fname,
710 O_CREAT + O_WRONLY + O_TRUNC + O_EXCL + O_BINARY,
711 S_IRUSR + S_IWUSR,
712 *tokudb_file_log_key);
713 if (logger->fd == -1) {
714 return get_error_errno();
715 }
716 fsync_logdir(logger);
717 logger->next_log_file_number++;
718 } else {
719 logger->fd = toku_os_open(
720 DEV_NULL_FILE, O_WRONLY + O_BINARY, S_IWUSR, *tokudb_file_log_key);
721 if (logger->fd == -1) {
722 return get_error_errno();
723 }
724 }
725 toku_os_full_write(logger->fd, "tokulogg", 8);
726 int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order
727 toku_os_full_write(logger->fd, &version_l, 4);
728 if ( logger->write_log_files ) {
729 TOKULOGFILEINFO XMALLOC(lf_info);
730 lf_info->index = index;
731 lf_info->maxlsn = logger->written_lsn;
732 lf_info->version = TOKU_LOG_VERSION;
733 toku_logfilemgr_add_logfile_info(logger->logfilemgr, lf_info);
734 }
735 logger->fsynced_lsn = logger->written_lsn;
736 logger->n_in_file = 12;
737 return 0;
738 }
739
delete_logfile(TOKULOGGER logger,long long index,uint32_t version)740 static void delete_logfile(TOKULOGGER logger, long long index, uint32_t version)
741 // Entry and Exit: This thread has permission to modify the output.
742 {
743 int fnamelen = strlen(logger->directory)+50;
744 char fname[fnamelen];
745 snprintf(fname, fnamelen, "%s/log%012lld.tokulog%d", logger->directory, index, version);
746 int r = remove(fname);
747 invariant_zero(r);
748 }
749
toku_logger_maybe_trim_log(TOKULOGGER logger,LSN trim_lsn)750 void toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn)
751 // On entry and exit: No logger locks held.
752 // Acquires and releases output permission.
753 {
754 LSN fsynced_lsn;
755 grab_output(logger, &fsynced_lsn);
756 TOKULOGFILEMGR lfm = logger->logfilemgr;
757 int n_logfiles = toku_logfilemgr_num_logfiles(lfm);
758
759 TOKULOGFILEINFO lf_info = NULL;
760
761 if ( logger->write_log_files && logger->trim_log_files) {
762 while ( n_logfiles > 1 ) { // don't delete current logfile
763 uint32_t log_version;
764 lf_info = toku_logfilemgr_get_oldest_logfile_info(lfm);
765 log_version = lf_info->version;
766 if ( lf_info->maxlsn.lsn >= trim_lsn.lsn ) {
767 // file contains an open LSN, can't delete this or any newer log files
768 break;
769 }
770 // need to save copy - toku_logfilemgr_delete_oldest_logfile_info free's the lf_info
771 long index = lf_info->index;
772 toku_logfilemgr_delete_oldest_logfile_info(lfm);
773 n_logfiles--;
774 delete_logfile(logger, index, log_version);
775 }
776 }
777 release_output(logger, fsynced_lsn);
778 }
779
void toku_logger_write_log_files (TOKULOGGER logger, bool write_log_files)
// Effect: Set the logger's write_log_files flag.  Elsewhere in this file the
//  flag selects between creating real logfiles and opening DEV_NULL_FILE, and
//  gates fsyncs and log-file manager updates.
// Called only during initialization (or just after recovery), so no locks are needed.
{
    logger->write_log_files = write_log_files;
}
785
void toku_logger_trim_log_files (TOKULOGGER logger, bool trim_log_files)
// Effect: Set the logger's trim_log_files flag, which toku_logger_maybe_trim_log()
//  checks before deleting any old logfile.
// Called only during initialization, so no locks are needed.
{
    logger->trim_log_files = trim_log_files;
}
791
bool toku_logger_txns_exist(TOKULOGGER logger)
// Returns: the answer of toku_txn_manager_txns_exist() for this logger's transaction manager.
// Called during close of environment to ensure that transactions don't exist
{
    return toku_txn_manager_txns_exist(logger->txn_manager);
}
797
798
void toku_logger_maybe_fsync(TOKULOGGER logger, LSN lsn, int do_fsync, bool holds_input_lock)
// Effect: If fsync is nonzero, then make sure that the log is flushed and synced at least up to lsn.
//  If do_fsync is zero, the only effect is releasing the input lock when it is held.
// Entry: Holds input lock iff 'holds_input_lock'.   The log entry has already been written to the input buffer.
// Exit:  Holds no locks.
// The input lock may be released and then reacquired.  Thus this function does not run atomically with respect to other threads.
{
    if (holds_input_lock) {
        ml_unlock(&logger->input_lock);
    }
    if (do_fsync) {
        // reacquire the locks (acquire output permission first)
        LSN fsynced_lsn;
        bool already_done = wait_till_output_already_written_or_output_buffer_available(logger, lsn, &fsynced_lsn);
        if (already_done) {
            // Another thread already synced past lsn; output permission was not taken.
            return;
        }

        // otherwise we now own the output permission, and our lsn isn't outputed.

        ml_lock(&logger->input_lock);

        swap_inbuf_outbuf(logger);

        ml_unlock(&logger->input_lock); // release the input lock now, so other threads can fill the inbuf.  (Thus enabling group commit.)

        write_outbuf_to_logfile(logger, &fsynced_lsn);
        if (fsynced_lsn.lsn < lsn.lsn) {
            // it may have gotten fsynced by the write_outbuf_to_logfile.
            // It was not (or not far enough), so sync the current file here.
            toku_file_fsync_without_accounting(logger->fd);
            assert(fsynced_lsn.lsn <= logger->written_lsn.lsn);
            fsynced_lsn = logger->written_lsn;
        }
        // the last lsn is only accessed while holding output permission or else when the log file is old.
        if (logger->write_log_files) {
            toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);
        }
        // Publish the new fsynced LSN and wake any waiters.
        release_output(logger, fsynced_lsn);
    }
}
838
839 static void
logger_write_buffer(TOKULOGGER logger,LSN * fsynced_lsn)840 logger_write_buffer(TOKULOGGER logger, LSN *fsynced_lsn)
841 // Entry: Holds the input lock and permission to modify output.
842 // Exit: Holds only the permission to modify output.
843 // Effect: Write the buffers to the output. If DO_FSYNC is true, then fsync.
844 // Note: Only called during single-threaded activity from toku_logger_restart, so locks aren't really needed.
845 {
846 swap_inbuf_outbuf(logger);
847 ml_unlock(&logger->input_lock);
848 write_outbuf_to_logfile(logger, fsynced_lsn);
849 if (logger->write_log_files) {
850 toku_file_fsync_without_accounting(logger->fd);
851 toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); // t:2294
852 }
853 }
854
int toku_logger_restart(TOKULOGGER logger, LSN lastlsn)
// Effect: Flush the buffered log, close the current log file, reset the logger's LSNs to lastlsn,
//         turn log writing and log trimming on, and open a new log file.
// Returns: the result of open_logfile().
// Entry and exit: Holds no locks (this is called only during single-threaded activity, such as initial start).
{
    int r;

    // flush out the log buffer
    // (logger_write_buffer expects the input lock and output permission to be held, and releases the input lock)
    LSN fsynced_lsn;
    grab_output(logger, &fsynced_lsn);
    ml_lock(&logger->input_lock);
    logger_write_buffer(logger, &fsynced_lsn);

    // close the log file
    if (logger->write_log_files) { // fsyncs don't work to /dev/null
        toku_file_fsync_without_accounting(logger->fd);
    }
    r = toku_os_close(logger->fd);
    assert(r == 0);
    logger->fd = -1;  // no log file is open until open_logfile() below

    // reset the LSN's to the lastlsn when the logger was opened
    logger->lsn = logger->written_lsn = logger->fsynced_lsn = lastlsn;
    // from here on the logger writes and trims log files, regardless of the earlier settings
    logger->write_log_files = true;
    logger->trim_log_files = true;

    // open a new log file
    r = open_logfile(logger);
    // output permission is released whether or not the open succeeded
    release_output(logger, fsynced_lsn);
    return r;
}
884
885 // fname is the iname
toku_logger_log_fcreate(TOKUTXN txn,const char * fname,FILENUM filenum,uint32_t mode,uint32_t treeflags,uint32_t nodesize,uint32_t basementnodesize,enum toku_compression_method compression_method)886 void toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, uint32_t mode,
887 uint32_t treeflags, uint32_t nodesize, uint32_t basementnodesize,
888 enum toku_compression_method compression_method) {
889 if (txn) {
890 BYTESTRING bs_fname = { .len = (uint32_t) strlen(fname), .data = (char *) fname };
891 // fsync log on fcreate
892 toku_log_fcreate (txn->logger, (LSN*)0, 1, txn, toku_txn_get_txnid(txn), filenum,
893 bs_fname, mode, treeflags, nodesize, basementnodesize, compression_method);
894 }
895 }
896
897
898 // We only do fdelete on open ft's, so we pass the filenum here
toku_logger_log_fdelete(TOKUTXN txn,FILENUM filenum)899 void toku_logger_log_fdelete (TOKUTXN txn, FILENUM filenum) {
900 if (txn) {
901 //No fsync.
902 toku_log_fdelete (txn->logger, (LSN*)0, 0, txn, toku_txn_get_txnid(txn), filenum);
903 }
904 }
905
906
907
908 /* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */
toku_logger_log_fopen(TOKUTXN txn,const char * fname,FILENUM filenum,uint32_t treeflags)909 void toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum, uint32_t treeflags) {
910 if (txn) {
911 BYTESTRING bs;
912 bs.len = strlen(fname);
913 bs.data = (char*)fname;
914 toku_log_fopen (txn->logger, (LSN*)0, 0, bs, filenum, treeflags);
915 }
916 }
917
// Read one byte from f into *v without touching any checksum or length counter.
// Returns 0 on success, -1 if fgetc() reports EOF (in which case *v is left untouched).
static int toku_fread_uint8_t_nocrclen (FILE *f, uint8_t *v) {
    const int ch = fgetc(f);
    if (ch != EOF) {
        *v = (uint8_t) ch;
        return 0;
    }
    return -1;
}
925
toku_fread_uint8_t(FILE * f,uint8_t * v,struct x1764 * mm,uint32_t * len)926 int toku_fread_uint8_t (FILE *f, uint8_t *v, struct x1764 *mm, uint32_t *len) {
927 int vi=fgetc(f);
928 if (vi==EOF) return -1;
929 uint8_t vc=(uint8_t)vi;
930 toku_x1764_add(mm, &vc, 1);
931 (*len)++;
932 *v = vc;
933 return 0;
934 }
935
toku_fread_uint32_t_nocrclen(FILE * f,uint32_t * v)936 int toku_fread_uint32_t_nocrclen (FILE *f, uint32_t *v) {
937 uint32_t result;
938 uint8_t *cp = (uint8_t*)&result;
939 int r;
940 r = toku_fread_uint8_t_nocrclen (f, cp+0); if (r!=0) return r;
941 r = toku_fread_uint8_t_nocrclen (f, cp+1); if (r!=0) return r;
942 r = toku_fread_uint8_t_nocrclen (f, cp+2); if (r!=0) return r;
943 r = toku_fread_uint8_t_nocrclen (f, cp+3); if (r!=0) return r;
944 *v = toku_dtoh32(result);
945
946 return 0;
947 }
toku_fread_uint32_t(FILE * f,uint32_t * v,struct x1764 * checksum,uint32_t * len)948 int toku_fread_uint32_t (FILE *f, uint32_t *v, struct x1764 *checksum, uint32_t *len) {
949 uint32_t result;
950 uint8_t *cp = (uint8_t*)&result;
951 int r;
952 r = toku_fread_uint8_t (f, cp+0, checksum, len); if(r!=0) return r;
953 r = toku_fread_uint8_t (f, cp+1, checksum, len); if(r!=0) return r;
954 r = toku_fread_uint8_t (f, cp+2, checksum, len); if(r!=0) return r;
955 r = toku_fread_uint8_t (f, cp+3, checksum, len); if(r!=0) return r;
956 *v = toku_dtoh32(result);
957 return 0;
958 }
959
toku_fread_uint64_t(FILE * f,uint64_t * v,struct x1764 * checksum,uint32_t * len)960 int toku_fread_uint64_t (FILE *f, uint64_t *v, struct x1764 *checksum, uint32_t *len) {
961 uint32_t v1,v2;
962 int r;
963 r=toku_fread_uint32_t(f, &v1, checksum, len); if (r!=0) return r;
964 r=toku_fread_uint32_t(f, &v2, checksum, len); if (r!=0) return r;
965 *v = (((uint64_t)v1)<<32 ) | ((uint64_t)v2);
966 return 0;
967 }
968
toku_fread_bool(FILE * f,bool * v,struct x1764 * mm,uint32_t * len)969 int toku_fread_bool (FILE *f, bool *v, struct x1764 *mm, uint32_t *len) {
970 uint8_t iv;
971 int r = toku_fread_uint8_t(f, &iv, mm, len);
972 if (r == 0) {
973 *v = (iv!=0);
974 }
975 return r;
976 }
977
toku_fread_LSN(FILE * f,LSN * lsn,struct x1764 * checksum,uint32_t * len)978 int toku_fread_LSN (FILE *f, LSN *lsn, struct x1764 *checksum, uint32_t *len) {
979 return toku_fread_uint64_t (f, &lsn->lsn, checksum, len);
980 }
981
toku_fread_BLOCKNUM(FILE * f,BLOCKNUM * b,struct x1764 * checksum,uint32_t * len)982 int toku_fread_BLOCKNUM (FILE *f, BLOCKNUM *b, struct x1764 *checksum, uint32_t *len) {
983 return toku_fread_uint64_t (f, (uint64_t*)&b->b, checksum, len);
984 }
985
toku_fread_FILENUM(FILE * f,FILENUM * filenum,struct x1764 * checksum,uint32_t * len)986 int toku_fread_FILENUM (FILE *f, FILENUM *filenum, struct x1764 *checksum, uint32_t *len) {
987 return toku_fread_uint32_t (f, &filenum->fileid, checksum, len);
988 }
989
toku_fread_TXNID(FILE * f,TXNID * txnid,struct x1764 * checksum,uint32_t * len)990 int toku_fread_TXNID (FILE *f, TXNID *txnid, struct x1764 *checksum, uint32_t *len) {
991 return toku_fread_uint64_t (f, txnid, checksum, len);
992 }
993
toku_fread_TXNID_PAIR(FILE * f,TXNID_PAIR * txnid,struct x1764 * checksum,uint32_t * len)994 int toku_fread_TXNID_PAIR (FILE *f, TXNID_PAIR *txnid, struct x1764 *checksum, uint32_t *len) {
995 TXNID parent;
996 TXNID child;
997 int r;
998 r = toku_fread_TXNID(f, &parent, checksum, len); if (r != 0) { return r; }
999 r = toku_fread_TXNID(f, &child, checksum, len); if (r != 0) { return r; }
1000 txnid->parent_id64 = parent;
1001 txnid->child_id64 = child;
1002 return 0;
1003 }
1004
1005
toku_fread_XIDP(FILE * f,XIDP * xidp,struct x1764 * checksum,uint32_t * len)1006 int toku_fread_XIDP (FILE *f, XIDP *xidp, struct x1764 *checksum, uint32_t *len) {
1007 // These reads are verbose because XA defined the fields as "long", but we use 4 bytes, 1 byte and 1 byte respectively.
1008 TOKU_XA_XID *XMALLOC(xid);
1009 {
1010 uint32_t formatID;
1011 int r = toku_fread_uint32_t(f, &formatID, checksum, len);
1012 if (r!=0) return r;
1013 xid->formatID = formatID;
1014 }
1015 {
1016 uint8_t gtrid_length;
1017 int r = toku_fread_uint8_t (f, >rid_length, checksum, len);
1018 if (r!=0) return r;
1019 xid->gtrid_length = gtrid_length;
1020 }
1021 {
1022 uint8_t bqual_length;
1023 int r = toku_fread_uint8_t (f, &bqual_length, checksum, len);
1024 if (r!=0) return r;
1025 xid->bqual_length = bqual_length;
1026 }
1027 for (int i=0; i< xid->gtrid_length + xid->bqual_length; i++) {
1028 uint8_t byte;
1029 int r = toku_fread_uint8_t(f, &byte, checksum, len);
1030 if (r!=0) return r;
1031 xid->data[i] = byte;
1032 }
1033 *xidp = xid;
1034 return 0;
1035 }
1036
1037 // fills in the bs with malloced data.
toku_fread_BYTESTRING(FILE * f,BYTESTRING * bs,struct x1764 * checksum,uint32_t * len)1038 int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, struct x1764 *checksum, uint32_t *len) {
1039 int r=toku_fread_uint32_t(f, (uint32_t*)&bs->len, checksum, len);
1040 if (r!=0) return r;
1041 XMALLOC_N(bs->len, bs->data);
1042 uint32_t i;
1043 for (i=0; i<bs->len; i++) {
1044 r=toku_fread_uint8_t(f, (uint8_t*)&bs->data[i], checksum, len);
1045 if (r!=0) {
1046 toku_free(bs->data);
1047 bs->data=0;
1048 return r;
1049 }
1050 }
1051 return 0;
1052 }
1053
1054 // fills in the fs with malloced data.
toku_fread_FILENUMS(FILE * f,FILENUMS * fs,struct x1764 * checksum,uint32_t * len)1055 int toku_fread_FILENUMS (FILE *f, FILENUMS *fs, struct x1764 *checksum, uint32_t *len) {
1056 int r=toku_fread_uint32_t(f, (uint32_t*)&fs->num, checksum, len);
1057 if (r!=0) return r;
1058 XMALLOC_N(fs->num, fs->filenums);
1059 uint32_t i;
1060 for (i=0; i<fs->num; i++) {
1061 r=toku_fread_FILENUM (f, &fs->filenums[i], checksum, len);
1062 if (r!=0) {
1063 toku_free(fs->filenums);
1064 fs->filenums=0;
1065 return r;
1066 }
1067 }
1068 return 0;
1069 }
1070
toku_logprint_LSN(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1071 int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1072 LSN v;
1073 int r = toku_fread_LSN(inf, &v, checksum, len);
1074 if (r!=0) return r;
1075 fprintf(outf, " %s=%" PRIu64, fieldname, v.lsn);
1076 return 0;
1077 }
1078
toku_logprint_TXNID(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1079 int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1080 TXNID v;
1081 int r = toku_fread_TXNID(inf, &v, checksum, len);
1082 if (r!=0) return r;
1083 fprintf(outf, " %s=%" PRIu64, fieldname, v);
1084 return 0;
1085 }
1086
toku_logprint_TXNID_PAIR(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1087 int toku_logprint_TXNID_PAIR (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1088 TXNID_PAIR v;
1089 int r = toku_fread_TXNID_PAIR(inf, &v, checksum, len);
1090 if (r!=0) return r;
1091 fprintf(outf, " %s=%" PRIu64 ",%" PRIu64, fieldname, v.parent_id64, v.child_id64);
1092 return 0;
1093 }
1094
toku_logprint_XIDP(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1095 int toku_logprint_XIDP (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1096 XIDP vp;
1097 int r = toku_fread_XIDP(inf, &vp, checksum, len);
1098 if (r!=0) return r;
1099 fprintf(outf, " %s={formatID=0x%lx gtrid_length=%ld bqual_length=%ld data=", fieldname, vp->formatID, vp->gtrid_length, vp->bqual_length);
1100 toku_print_bytes(outf, vp->gtrid_length + vp->bqual_length, vp->data);
1101 fprintf(outf, "}");
1102 toku_free(vp);
1103 return 0;
1104 }
1105
toku_logprint_uint8_t(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1106 int toku_logprint_uint8_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1107 uint8_t v;
1108 int r = toku_fread_uint8_t(inf, &v, checksum, len);
1109 if (r!=0) return r;
1110 fprintf(outf, " %s=%d", fieldname, v);
1111 if (format) fprintf(outf, format, v);
1112 else if (v=='\'') fprintf(outf, "('\'')");
1113 else if (isprint(v)) fprintf(outf, "('%c')", v);
1114 else {}/*nothing*/
1115 return 0;
1116 }
1117
toku_logprint_uint32_t(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1118 int toku_logprint_uint32_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1119 uint32_t v;
1120 int r = toku_fread_uint32_t(inf, &v, checksum, len);
1121 if (r!=0) return r;
1122 fprintf(outf, " %s=", fieldname);
1123 fprintf(outf, format ? format : "%d", v);
1124 return 0;
1125 }
1126
toku_logprint_uint64_t(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1127 int toku_logprint_uint64_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1128 uint64_t v;
1129 int r = toku_fread_uint64_t(inf, &v, checksum, len);
1130 if (r!=0) return r;
1131 fprintf(outf, " %s=", fieldname);
1132 fprintf(outf, format ? format : "%" PRId64, v);
1133 return 0;
1134 }
1135
toku_logprint_bool(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1136 int toku_logprint_bool (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1137 bool v;
1138 int r = toku_fread_bool(inf, &v, checksum, len);
1139 if (r!=0) return r;
1140 fprintf(outf, " %s=%s", fieldname, v ? "true" : "false");
1141 return 0;
1142
1143 }
1144
toku_print_BYTESTRING(FILE * outf,uint32_t len,char * data)1145 void toku_print_BYTESTRING (FILE *outf, uint32_t len, char *data) {
1146 fprintf(outf, "{len=%u data=", len);
1147 toku_print_bytes(outf, len, data);
1148 fprintf(outf, "}");
1149
1150 }
1151
toku_logprint_BYTESTRING(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1152 int toku_logprint_BYTESTRING (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1153 BYTESTRING bs;
1154 int r = toku_fread_BYTESTRING(inf, &bs, checksum, len);
1155 if (r!=0) return r;
1156 fprintf(outf, " %s=", fieldname);
1157 toku_print_BYTESTRING(outf, bs.len, bs.data);
1158 toku_free(bs.data);
1159 return 0;
1160 }
1161
toku_logprint_BLOCKNUM(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1162 int toku_logprint_BLOCKNUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1163 return toku_logprint_uint64_t(outf, inf, fieldname, checksum, len, format);
1164
1165 }
1166
toku_logprint_FILENUM(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1167 int toku_logprint_FILENUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1168 return toku_logprint_uint32_t(outf, inf, fieldname, checksum, len, format);
1169
1170 }
1171
1172 static void
toku_print_FILENUMS(FILE * outf,uint32_t num,FILENUM * filenums)1173 toku_print_FILENUMS (FILE *outf, uint32_t num, FILENUM *filenums) {
1174 fprintf(outf, "{num=%u filenums=\"", num);
1175 uint32_t i;
1176 for (i=0; i<num; i++) {
1177 if (i>0)
1178 fprintf(outf, ",");
1179 fprintf(outf, "0x%" PRIx32, filenums[i].fileid);
1180 }
1181 fprintf(outf, "\"}");
1182
1183 }
1184
toku_logprint_FILENUMS(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1185 int toku_logprint_FILENUMS (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1186 FILENUMS bs;
1187 int r = toku_fread_FILENUMS(inf, &bs, checksum, len);
1188 if (r!=0) return r;
1189 fprintf(outf, " %s=", fieldname);
1190 toku_print_FILENUMS(outf, bs.num, bs.filenums);
1191 toku_free(bs.filenums);
1192 return 0;
1193 }
1194
toku_read_and_print_logmagic(FILE * f,uint32_t * versionp)1195 int toku_read_and_print_logmagic (FILE *f, uint32_t *versionp) {
1196 {
1197 char magic[8];
1198 int r=fread(magic, 1, 8, f);
1199 if (r!=8) {
1200 return DB_BADFORMAT;
1201 }
1202 if (memcmp(magic, "tokulogg", 8)!=0) {
1203 return DB_BADFORMAT;
1204 }
1205 }
1206 {
1207 int version;
1208 int r=fread(&version, 1, 4, f);
1209 if (r!=4) {
1210 return DB_BADFORMAT;
1211 }
1212 printf("tokulog v.%u\n", toku_ntohl(version));
1213 //version MUST be in network order regardless of disk order
1214 *versionp=toku_ntohl(version);
1215 }
1216 return 0;
1217 }
1218
toku_read_logmagic(FILE * f,uint32_t * versionp)1219 int toku_read_logmagic (FILE *f, uint32_t *versionp) {
1220 {
1221 char magic[8];
1222 int r=fread(magic, 1, 8, f);
1223 if (r!=8) {
1224 return DB_BADFORMAT;
1225 }
1226 if (memcmp(magic, "tokulogg", 8)!=0) {
1227 return DB_BADFORMAT;
1228 }
1229 }
1230 {
1231 int version;
1232 int r=fread(&version, 1, 4, f);
1233 if (r!=4) {
1234 return DB_BADFORMAT;
1235 }
1236 *versionp=toku_ntohl(version);
1237 }
1238 return 0;
1239 }
1240
toku_txn_get_txnid(TOKUTXN txn)1241 TXNID_PAIR toku_txn_get_txnid (TOKUTXN txn) {
1242 TXNID_PAIR tp = { .parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE};
1243 if (txn==0) return tp;
1244 else return txn->txnid;
1245 }
1246
toku_logger_last_lsn(TOKULOGGER logger)1247 LSN toku_logger_last_lsn(TOKULOGGER logger) {
1248 return logger->lsn;
1249 }
1250
toku_txn_logger(TOKUTXN txn)1251 TOKULOGGER toku_txn_logger (TOKUTXN txn) {
1252 return txn ? txn->logger : 0;
1253 }
1254
toku_txnid2txn(TOKULOGGER logger,TXNID_PAIR txnid,TOKUTXN * result)1255 void toku_txnid2txn(TOKULOGGER logger, TXNID_PAIR txnid, TOKUTXN *result) {
1256 TOKUTXN root_txn = NULL;
1257 toku_txn_manager_suspend(logger->txn_manager);
1258 toku_txn_manager_id2txn_unlocked(logger->txn_manager, txnid, &root_txn);
1259 if (root_txn == NULL || root_txn->txnid.child_id64 == txnid.child_id64) {
1260 *result = root_txn;
1261 }
1262 else if (root_txn != NULL) {
1263 root_txn->child_manager->suspend();
1264 root_txn->child_manager->find_tokutxn_by_xid_unlocked(txnid, result);
1265 root_txn->child_manager->resume();
1266 }
1267 toku_txn_manager_resume(logger->txn_manager);
1268 }
1269
1270 // Find the earliest LSN in a log. No locks are needed.
peek_at_log(TOKULOGGER logger,char * filename,LSN * first_lsn)1271 static int peek_at_log(TOKULOGGER logger, char *filename, LSN *first_lsn) {
1272 int fd = toku_os_open(
1273 filename, O_RDONLY + O_BINARY, S_IRUSR, *tokudb_file_log_key);
1274 if (fd < 0) {
1275 int er = get_error_errno();
1276 if (logger->write_log_files)
1277 printf("couldn't open: %s\n", strerror(er));
1278 return er;
1279 }
1280 enum { SKIP = 12+1+4 }; // read the 12 byte header, the first message, and the first len
1281 unsigned char header[SKIP+8];
1282 int r = read(fd, header, SKIP+8);
1283 if (r!=SKIP+8) return 0; // cannot determine that it's archivable, so we'll assume no. If a later-log is archivable is then this one will be too.
1284
1285 uint64_t lsn;
1286 {
1287 struct rbuf rb;
1288 rb.buf = header+SKIP;
1289 rb.size = 8;
1290 rb.ndone = 0;
1291 lsn = rbuf_ulonglong(&rb);
1292 }
1293
1294 r = toku_os_close(fd);
1295
1296 if (r != 0) {
1297 return 0;
1298 }
1299
1300 first_lsn->lsn = lsn;
1301 return 0;
1302 }
1303
1304 // Return a malloc'd array of malloc'd strings which are the filenames that can be archived.
// Output permission is obtained briefly so we can get a list of the log files without conflicting.
toku_logger_log_archive(TOKULOGGER logger,char *** logs_p,int flags)1306 int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
1307 if (flags!=0) return EINVAL; // don't know what to do.
1308 int all_n_logs;
1309 int i;
1310 char **all_logs;
1311 int n_logfiles;
1312 LSN fsynced_lsn;
1313 grab_output(logger, &fsynced_lsn);
1314 int r = toku_logger_find_logfiles (logger->directory, &all_logs, &n_logfiles);
1315 release_output(logger, fsynced_lsn);
1316 if (r!=0) return r;
1317
1318 for (i=0; all_logs[i]; i++);
1319 all_n_logs=i;
1320 // get them into increasing order
1321 qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare);
1322
1323 LSN save_lsn = logger->last_completed_checkpoint_lsn;
1324
1325 // Now starting at the last one, look for archivable ones.
1326 // Count the total number of bytes, because we have to return a single big array. (That's the BDB interface. Bleah...)
1327 LSN earliest_lsn_in_logfile={(unsigned long long)(-1LL)};
1328 r = peek_at_log(logger, all_logs[all_n_logs-1], &earliest_lsn_in_logfile); // try to find the lsn that's in the most recent log
1329 if (earliest_lsn_in_logfile.lsn <= save_lsn.lsn) {
1330 i=all_n_logs-1;
1331 } else {
1332 for (i=all_n_logs-2; i>=0; i--) { // start at all_n_logs-2 because we never archive the most recent log
1333 r = peek_at_log(logger, all_logs[i], &earliest_lsn_in_logfile);
1334 if (r!=0) continue; // In case of error, just keep going
1335
1336 if (earliest_lsn_in_logfile.lsn <= save_lsn.lsn) {
1337 break;
1338 }
1339 }
1340 }
1341
1342 // all log files up to, but but not including, i can be archived.
1343 int n_to_archive=i;
1344 int count_bytes=0;
1345 for (i=0; i<n_to_archive; i++) {
1346 count_bytes+=1+strlen(all_logs[i]);
1347 }
1348 char **result;
1349 if (i==0) {
1350 result=0;
1351 } else {
1352 CAST_FROM_VOIDP(result, toku_xmalloc((1+n_to_archive)*sizeof(*result) + count_bytes));
1353 char *base = (char*)(result+1+n_to_archive);
1354 for (i=0; i<n_to_archive; i++) {
1355 int len=1+strlen(all_logs[i]);
1356 result[i]=base;
1357 memcpy(base, all_logs[i], len);
1358 base+=len;
1359 }
1360 result[n_to_archive]=0;
1361 }
1362 for (i=0; all_logs[i]; i++) {
1363 toku_free(all_logs[i]);
1364 }
1365 toku_free(all_logs);
1366 *logs_p = result;
1367 return 0;
1368 }
1369
1370
toku_logger_txn_parent(TOKUTXN txn)1371 TOKUTXN toku_logger_txn_parent (TOKUTXN txn) {
1372 return txn->parent;
1373 }
1374
toku_logger_note_checkpoint(TOKULOGGER logger,LSN lsn)1375 void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn) {
1376 logger->last_completed_checkpoint_lsn = lsn;
1377 }
1378
1379 void
toku_logger_get_status(TOKULOGGER logger,LOGGER_STATUS statp)1380 toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS statp) {
1381 log_status.init();
1382 if (logger) {
1383 LOG_STATUS_VAL(LOGGER_NEXT_LSN) = logger->lsn.lsn;
1384 LOG_STATUS_VAL(LOGGER_NUM_WRITES) = logger->num_writes_to_disk;
1385 LOG_STATUS_VAL(LOGGER_BYTES_WRITTEN) = logger->bytes_written_to_disk;
1386 // No compression on logfiles so the uncompressed size is just number of bytes written
1387 LOG_STATUS_VAL(LOGGER_UNCOMPRESSED_BYTES_WRITTEN) = logger->bytes_written_to_disk;
1388 LOG_STATUS_VAL(LOGGER_TOKUTIME_WRITES) = logger->time_spent_writing_to_disk;
1389 LOG_STATUS_VAL(LOGGER_WAIT_BUF_LONG) = logger->num_wait_buf_long;
1390 }
1391 *statp = log_status;
1392 }
1393
1394
1395
1396 //////////////////////////////////////////////////////////////////////////////////////////////////////
1397 // Used for upgrade:
1398 // if any valid log files exist in log_dir, then
1399 // set *found_any_logs to true and set *version_found to version number of latest log
1400 int
toku_get_version_of_logs_on_disk(const char * log_dir,bool * found_any_logs,uint32_t * version_found)1401 toku_get_version_of_logs_on_disk(const char *log_dir, bool *found_any_logs, uint32_t *version_found) {
1402 bool found = false;
1403 uint32_t highest_version = 0;
1404 int r = 0;
1405
1406 struct dirent *de;
1407 DIR *d=opendir(log_dir);
1408 if (d==NULL) {
1409 r = get_error_errno();
1410 }
1411 else {
1412 // Examine every file in the directory and find highest version
1413 while ((de=readdir(d))) {
1414 uint32_t this_log_version;
1415 uint64_t this_log_number;
1416 bool is_log = is_a_logfile_any_version(de->d_name, &this_log_number, &this_log_version);
1417 if (is_log) {
1418 if (!found) { // first log file found
1419 found = true;
1420 highest_version = this_log_version;
1421 }
1422 else
1423 highest_version = highest_version > this_log_version ? highest_version : this_log_version;
1424 }
1425 }
1426 int r2 = closedir(d);
1427 if (r==0) r = r2;
1428 }
1429 if (r==0) {
1430 *found_any_logs = found;
1431 if (found)
1432 *version_found = highest_version;
1433 }
1434 return r;
1435 }
1436
toku_logger_get_txn_manager(TOKULOGGER logger)1437 TXN_MANAGER toku_logger_get_txn_manager(TOKULOGGER logger) {
1438 return logger->txn_manager;
1439 }
1440