1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <my_global.h>
40 #include <memory.h>
41 #include <ctype.h>
42 #include <limits.h>
43 #include <unistd.h>
44 
45 #include "ft/serialize/block_table.h"
46 #include "ft/ft.h"
47 #include "ft/logger/log-internal.h"
48 #include "ft/txn/txn_manager.h"
49 #include "ft/txn/rollback_log_node_cache.h"
50 
51 #include "util/status.h"
52 
53 int writing_rollback = 0;
54 extern "C" {
55   uint force_recovery = 0;
56 }
57 
58 static const int log_format_version = TOKU_LOG_VERSION;
59 
60 toku_instr_key *result_output_condition_lock_mutex_key;
61 toku_instr_key *result_output_condition_key;
62 toku_instr_key *tokudb_file_log_key;
63 
64 static int open_logfile(TOKULOGGER logger);
65 static void logger_write_buffer(TOKULOGGER logger, LSN *fsynced_lsn);
66 static void delete_logfile(TOKULOGGER logger,
67                            long long index,
68                            uint32_t version);
69 static void grab_output(TOKULOGGER logger, LSN *fsynced_lsn);
70 static void release_output(TOKULOGGER logger, LSN fsynced_lsn);
71 
// Effect: Print the len bytes starting at data to outf as a double-quoted,
//  escaped string.  A double quote and a backslash are each preceded by a
//  backslash, a newline is written as \n, any other printable byte is copied
//  through unchanged, and every remaining byte is written as a backslash
//  followed by three octal digits.
// Arguments:  outf: stream to print to
//             len:  number of bytes to print (data need not be NUL terminated)
//             data: the bytes to print
static void toku_print_bytes (FILE *outf, uint32_t len, char *data) {
    fputc('"', outf);
    for (uint32_t i = 0; i < len; i++) {
        // Classify the byte as an unsigned char: passing a negative plain
        // char to isprint() is undefined behavior.
        const unsigned char c = (unsigned char) data[i];
        switch (c) {
        case '"':  fputs("\\\"", outf); break;
        case '\\': fputs("\\\\", outf); break;
        case '\n': fputs("\\n", outf);  break;
        default:
            if (isprint(c)) {
                fputc(c, outf);
            } else {
                fprintf(outf, "\\%03o", c);
            }
            break;
        }
    }
    fputc('"', outf);
}
87 
is_a_logfile_any_version(const char * name,uint64_t * number_result,uint32_t * version_of_log)88 static bool is_a_logfile_any_version (const char *name, uint64_t *number_result, uint32_t *version_of_log) {
89     bool rval = true;
90     uint64_t result;
91     int n;
92     int r;
93     uint32_t version;
94     r = sscanf(name, "log%" SCNu64 ".tokulog%" SCNu32 "%n", &result, &version, &n);
95     if (r!=2 || name[n]!='\0' || version <= TOKU_LOG_VERSION_1) {
96         //Version 1 does NOT append 'version' to end of '.tokulog'
97         version = TOKU_LOG_VERSION_1;
98         r = sscanf(name, "log%" SCNu64 ".tokulog%n", &result, &n);
99         if (r!=1 || name[n]!='\0') {
100             rval = false;
101         }
102     }
103     if (rval) {
104         *number_result  = result;
105         *version_of_log = version;
106     }
107 
108     return rval;
109 }
110 
111 // added for #2424, improved for #2521
is_a_logfile(const char * name,long long * number_result)112 static bool is_a_logfile (const char *name, long long *number_result) {
113     bool rval;
114     uint64_t result;
115     uint32_t version;
116     rval = is_a_logfile_any_version(name, &result, &version);
117     if (rval && version != TOKU_LOG_VERSION)
118         rval = false;
119     if (rval)
120         *number_result = result;
121     return rval;
122 }
123 
124 
125 // TODO: can't fail
toku_logger_create(TOKULOGGER * resultp)126 int toku_logger_create (TOKULOGGER *resultp) {
127     TOKULOGGER CALLOC(result);
128     if (result==0) return get_error_errno();
129     result->is_open=false;
130     result->write_log_files = true;
131     result->trim_log_files = true;
132     result->directory=0;
133     // fd is uninitialized on purpose
134     // ct is uninitialized on purpose
135     result->lg_max = 100<<20; // 100MB default
136     // lsn is uninitialized
137     result->inbuf  = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
138     result->outbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
139     // written_lsn is uninitialized
140     // fsynced_lsn is uninitialized
141     result->last_completed_checkpoint_lsn = ZERO_LSN;
142     // next_log_file_number is uninitialized
143     // n_in_file is uninitialized
144     result->write_block_size = FT_DEFAULT_NODE_SIZE; // default logging size is the same as the default ft block size
145     toku_logfilemgr_create(&result->logfilemgr);
146     *resultp = result;
147     ml_init(&result->input_lock);
148     toku_mutex_init(*result_output_condition_lock_mutex_key,
149                     &result->output_condition_lock,
150                     nullptr);
151     toku_cond_init(
152         *result_output_condition_key, &result->output_condition, nullptr);
153     result->rollback_cachefile = NULL;
154     result->output_is_available = true;
155     toku_txn_manager_init(&result->txn_manager);
156     return 0;
157 }
158 
fsync_logdir(TOKULOGGER logger)159 static void fsync_logdir(TOKULOGGER logger) {
160     toku_fsync_dirfd_without_accounting(logger->dir);
161 }
162 
open_logdir(TOKULOGGER logger,const char * directory)163 static int open_logdir(TOKULOGGER logger, const char *directory) {
164     if (toku_os_is_absolute_name(directory)) {
165         logger->directory = toku_strdup(directory);
166     } else {
167         char cwdbuf[PATH_MAX];
168         char *cwd = getcwd(cwdbuf, PATH_MAX);
169         if (cwd == NULL)
170             return -1;
171         char *MALLOC_N(strlen(cwd) + strlen(directory) + 2, new_log_dir);
172         if (new_log_dir == NULL) {
173             return -2;
174         }
175         sprintf(new_log_dir, "%s/%s", cwd, directory);
176         logger->directory = new_log_dir;
177     }
178     if (logger->directory==0) return get_error_errno();
179 
180     logger->dir = opendir(logger->directory);
181     if ( logger->dir == NULL ) return -1;
182     return 0;
183 }
184 
close_logdir(TOKULOGGER logger)185 static int close_logdir(TOKULOGGER logger) {
186     return closedir(logger->dir);
187 }
188 
189 int
toku_logger_open_with_last_xid(const char * directory,TOKULOGGER logger,TXNID last_xid)190 toku_logger_open_with_last_xid(const char *directory, TOKULOGGER logger, TXNID last_xid) {
191     if (logger->is_open) return EINVAL;
192 
193     int r;
194     TXNID last_xid_if_clean_shutdown = TXNID_NONE;
195     r = toku_logfilemgr_init(logger->logfilemgr, directory, &last_xid_if_clean_shutdown);
196     if ( r!=0 )
197         return r;
198     logger->lsn = toku_logfilemgr_get_last_lsn(logger->logfilemgr);
199     logger->written_lsn = logger->lsn;
200     logger->fsynced_lsn = logger->lsn;
201     logger->inbuf.max_lsn_in_buf  = logger->lsn;
202     logger->outbuf.max_lsn_in_buf = logger->lsn;
203 
204     // open directory, save pointer for fsyncing t:2445
205     r = open_logdir(logger, directory);
206     if (r!=0) return r;
207 
208     long long nexti;
209     r = toku_logger_find_next_unused_log_file(logger->directory, &nexti);
210     if (r!=0) return r;
211 
212     logger->next_log_file_number = nexti;
213     r = open_logfile(logger);
214     if (r!=0) return r;
215     if (last_xid == TXNID_NONE) {
216         last_xid = last_xid_if_clean_shutdown;
217     }
218     toku_txn_manager_set_last_xid_from_logger(logger->txn_manager, last_xid);
219 
220     logger->is_open = true;
221     return 0;
222 }
223 
toku_logger_open(const char * directory,TOKULOGGER logger)224 int toku_logger_open (const char *directory, TOKULOGGER logger) {
225     return toku_logger_open_with_last_xid(directory, logger, TXNID_NONE);
226 }
227 
toku_logger_rollback_is_open(TOKULOGGER logger)228 bool toku_logger_rollback_is_open (TOKULOGGER logger) {
229     return logger->rollback_cachefile != NULL;
230 }
231 
232 #define MAX_CACHED_ROLLBACK_NODES 4096
233 
toku_logger_initialize_rollback_cache(TOKULOGGER logger,FT ft)234 void toku_logger_initialize_rollback_cache(TOKULOGGER logger, FT ft) {
235     ft->blocktable.free_unused_blocknums(ft->h->root_blocknum);
236     logger->rollback_cache.init(MAX_CACHED_ROLLBACK_NODES);
237 }
238 
toku_logger_open_rollback(TOKULOGGER logger,CACHETABLE cachetable,bool create)239 int toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, bool create) {
240     writing_rollback++;
241     assert(logger->is_open);
242     assert(!logger->rollback_cachefile);
243 
244     FT_HANDLE ft_handle = nullptr;   // Note, there is no DB associated with this FT.
245     toku_ft_handle_create(&ft_handle);
246     int r = toku_ft_handle_open(ft_handle, toku_product_name_strings.rollback_cachefile, create, create, cachetable, nullptr);
247     if (r == 0) {
248         FT ft = ft_handle->ft;
249         logger->rollback_cachefile = ft->cf;
250         toku_logger_initialize_rollback_cache(logger, ft_handle->ft);
251 
252         // Verify it is empty
253         // Must have no data blocks (rollback logs or otherwise).
254         ft->blocktable.verify_no_data_blocks_except_root(ft->h->root_blocknum);
255         bool is_empty = toku_ft_is_empty_fast(ft_handle);
256         assert(is_empty);
257     } else {
258         toku_ft_handle_close(ft_handle);
259     }
260     writing_rollback--;
261     return r;
262 }
263 
264 
265 //  Requires: Rollback cachefile can only be closed immediately after a checkpoint,
266 //            so it will always be clean (!h->dirty) when about to be closed.
267 //            Rollback log can only be closed when there are no open transactions,
268 //            so it will always be empty (no data blocks) when about to be closed.
toku_logger_close_rollback_check_empty(TOKULOGGER logger,bool clean_shutdown)269 void toku_logger_close_rollback_check_empty(TOKULOGGER logger, bool clean_shutdown) {
270     CACHEFILE cf = logger->rollback_cachefile;  // stored in logger at rollback cachefile open
271     if (cf) {
272         FT_HANDLE ft_to_close;
273         {   //Find "ft_to_close"
274             logger->rollback_cache.destroy();
275             FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
276             if (clean_shutdown) {
277                 //Verify it is safe to close it.
278                 assert(!ft->h->dirty());  //Must not be dirty.
279                 ft->blocktable.free_unused_blocknums(ft->h->root_blocknum);
280                 // Must have no data blocks (rollback logs or otherwise).
281                 ft->blocktable.verify_no_data_blocks_except_root(ft->h->root_blocknum);
282                 assert(!ft->h->dirty());
283             } else {
284                 ft->h->clear_dirty();
285             }
286             ft_to_close = toku_ft_get_only_existing_ft_handle(ft);
287             if (clean_shutdown) {
288                 bool is_empty;
289                 is_empty = toku_ft_is_empty_fast(ft_to_close);
290                 assert(is_empty);
291                 assert(!ft->h->dirty()); // it should not have been dirtied by the toku_ft_is_empty test.
292             }
293         }
294 
295         toku_ft_handle_close(ft_to_close);
296         //Set as dealt with already.
297         logger->rollback_cachefile = NULL;
298     }
299 }
300 
toku_logger_close_rollback(TOKULOGGER logger)301 void toku_logger_close_rollback(TOKULOGGER logger) {
302     toku_logger_close_rollback_check_empty(logger, true);
303 }
304 
305 // No locks held on entry
306 // No locks held on exit.
307 // No locks are needed, since you cannot legally close the log concurrently with doing anything else.
308 // TODO: can't fail
toku_logger_close(TOKULOGGER * loggerp)309 int toku_logger_close(TOKULOGGER *loggerp) {
310     int r;
311     TOKULOGGER logger = *loggerp;
312     if (!logger->is_open) {
313         goto is_closed;
314     }
315     ml_lock(&logger->input_lock);
316     LSN fsynced_lsn;
317     grab_output(logger, &fsynced_lsn);
318     logger_write_buffer(logger, &fsynced_lsn);
319     if (logger->fd!=-1) {
320         if (logger->write_log_files) {
321             toku_file_fsync_without_accounting(logger->fd);
322         }
323         r = toku_os_close(logger->fd);
324         assert(r == 0);
325     }
326     r = close_logdir(logger);
327     assert(r == 0);
328     logger->fd=-1;
329     release_output(logger, fsynced_lsn);
330 
331 is_closed:
332     toku_free(logger->inbuf.buf);
333     toku_free(logger->outbuf.buf);
334     // before destroying locks they must be left in the unlocked state.
335     ml_destroy(&logger->input_lock);
336     toku_mutex_destroy(&logger->output_condition_lock);
337     toku_cond_destroy(&logger->output_condition);
338     toku_txn_manager_destroy(logger->txn_manager);
339     if (logger->directory) toku_free(logger->directory);
340     toku_logfilemgr_destroy(&logger->logfilemgr);
341     toku_free(logger);
342     *loggerp=0;
343     return 0;
344 }
345 
toku_logger_shutdown(TOKULOGGER logger)346 void toku_logger_shutdown(TOKULOGGER logger) {
347     if (logger->is_open) {
348         TXN_MANAGER mgr = logger->txn_manager;
349         if (toku_txn_manager_num_live_root_txns(mgr) == 0) {
350             TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
351             toku_log_shutdown(logger, NULL, true, 0, last_xid);
352         }
353     }
354 }
355 
close_and_open_logfile(TOKULOGGER logger,LSN * fsynced_lsn)356 static int close_and_open_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
357 // Effect: close the current file, and open the next one.
358 // Entry: This thread has permission to modify the output.
359 // Exit:  This thread has permission to modify the output.
360 {
361     int r;
362     if (logger->write_log_files) {
363         toku_file_fsync_without_accounting(logger->fd);
364         *fsynced_lsn = logger->written_lsn;
365         toku_logfilemgr_update_last_lsn(logger->logfilemgr,
366                                         logger->written_lsn);  // fixes t:2294
367     }
368     r = toku_os_close(logger->fd);
369 
370     if (r != 0)
371         return get_error_errno();
372     return open_logfile(logger);
373 }
374 
// Returns: the larger of a and b.
static int
max_int (int a, int b)
{
    return (a > b) ? a : b;
}
381 
382 // ***********************************************************
383 // output mutex/condition manipulation routines
384 // ***********************************************************
385 
386 static void
wait_till_output_available(TOKULOGGER logger)387 wait_till_output_available (TOKULOGGER logger)
388 // Effect: Wait until output becomes available.
389 // Implementation hint: Use a pthread_cond_wait.
390 // Entry: Holds the output_condition_lock (but not the inlock)
391 // Exit: Holds the output_condition_lock and logger->output_is_available
392 //
393 {
394     tokutime_t t0 = toku_time_now();
395     while (!logger->output_is_available) {
396         toku_cond_wait(&logger->output_condition, &logger->output_condition_lock);
397     }
398     if (tokutime_to_seconds(toku_time_now() - t0) >= 0.100) {
399         logger->num_wait_buf_long++;
400     }
401 }
402 
403 static void
grab_output(TOKULOGGER logger,LSN * fsynced_lsn)404 grab_output(TOKULOGGER logger, LSN *fsynced_lsn)
405 // Effect: Wait until output becomes available and get permission to modify output.
406 // Entry: Holds no lock (including not holding the input lock, since we never hold both at once).
407 // Exit:  Hold permission to modify output (but none of the locks).
408 {
409     toku_mutex_lock(&logger->output_condition_lock);
410     wait_till_output_available(logger);
411     logger->output_is_available = false;
412     if (fsynced_lsn) {
413         *fsynced_lsn = logger->fsynced_lsn;
414     }
415     toku_mutex_unlock(&logger->output_condition_lock);
416 }
417 
418 static bool
wait_till_output_already_written_or_output_buffer_available(TOKULOGGER logger,LSN lsn,LSN * fsynced_lsn)419 wait_till_output_already_written_or_output_buffer_available (TOKULOGGER logger, LSN lsn, LSN *fsynced_lsn)
420 // Effect: Wait until either the output is available or the lsn has been written.
421 //  Return true iff the lsn has been written.
422 //  If returning true, then on exit we don't hold output permission.
423 //  If returning false, then on exit we do hold output permission.
424 // Entry: Hold no locks.
425 // Exit: Hold the output permission if returns false.
426 {
427     bool result;
428     toku_mutex_lock(&logger->output_condition_lock);
429     while (1) {
430         if (logger->fsynced_lsn.lsn >= lsn.lsn) { // we can look at the fsynced lsn since we have the lock.
431             result = true;
432             break;
433         }
434         if (logger->output_is_available) {
435             logger->output_is_available = false;
436             result = false;
437             break;
438         }
439         // otherwise wait for a good time to look again.
440         toku_cond_wait(&logger->output_condition, &logger->output_condition_lock);
441     }
442     *fsynced_lsn = logger->fsynced_lsn;
443     toku_mutex_unlock(&logger->output_condition_lock);
444     return result;
445 }
446 
447 static void
release_output(TOKULOGGER logger,LSN fsynced_lsn)448 release_output (TOKULOGGER logger, LSN fsynced_lsn)
449 // Effect: Release output permission.
450 // Entry: Holds output permissions, but no locks.
451 // Exit: Holds neither locks nor output permission.
452 {
453     toku_mutex_lock(&logger->output_condition_lock);
454     logger->output_is_available = true;
455     if (logger->fsynced_lsn.lsn < fsynced_lsn.lsn) {
456         logger->fsynced_lsn = fsynced_lsn;
457     }
458     toku_cond_broadcast(&logger->output_condition);
459     toku_mutex_unlock(&logger->output_condition_lock);
460 }
461 
462 static void
swap_inbuf_outbuf(TOKULOGGER logger)463 swap_inbuf_outbuf (TOKULOGGER logger)
464 // Effect: Swap the inbuf and outbuf
465 // Entry and exit: Hold the input lock and permission to modify output.
466 {
467     struct logbuf tmp = logger->inbuf;
468     logger->inbuf = logger->outbuf;
469     logger->outbuf = tmp;
470     assert(logger->inbuf.n_in_buf == 0);
471 }
472 
473 static void
write_outbuf_to_logfile(TOKULOGGER logger,LSN * fsynced_lsn)474 write_outbuf_to_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
475 // Effect:  Write the contents of outbuf to logfile.  Don't necessarily fsync (but it might, in which case fynced_lsn is updated).
476 //  If the logfile gets too big, open the next one (that's the case where an fsync might happen).
477 // Entry and exit: Holds permission to modify output (and doesn't let it go, so it's ok to also hold the inlock).
478 {
479     if (logger->outbuf.n_in_buf>0) {
480         // Write the outbuf to disk, take accounting measurements
481         tokutime_t io_t0 = toku_time_now();
482         toku_os_full_write(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
483         tokutime_t io_t1 = toku_time_now();
484         logger->num_writes_to_disk++;
485         logger->bytes_written_to_disk += logger->outbuf.n_in_buf;
486         logger->time_spent_writing_to_disk += (io_t1 - io_t0);
487 
488         assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
489         logger->written_lsn = logger->outbuf.max_lsn_in_buf;
490         logger->n_in_file += logger->outbuf.n_in_buf;
491         logger->outbuf.n_in_buf = 0;
492     }
493     // If the file got too big, then open a new file.
494     if (logger->n_in_file > logger->lg_max) {
495         int r = close_and_open_logfile(logger, fsynced_lsn);
496         assert_zero(r);
497     }
498 }
499 
500 void
toku_logger_make_space_in_inbuf(TOKULOGGER logger,int n_bytes_needed)501 toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed)
502 // Entry: Holds the inlock
503 // Exit:  Holds the inlock
504 // Effect: Upon exit, the inlock is held and there are at least n_bytes_needed in the buffer.
505 //  May release the inlock (and then reacquire it), so this is not atomic.
506 //  May obtain the output lock and output permission (but if it does so, it will have released the inlock, since we don't hold both locks at once).
507 //   (But may hold output permission and inlock at the same time.)
508 // Implementation hint: Makes space in the inbuf, possibly by writing the inbuf to disk or increasing the size of the inbuf.  There might not be an fsync.
509 // Arguments:  logger:         the logger (side effects)
510 //             n_bytes_needed: how many bytes to make space for.
511 {
512     if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
513         return;
514     }
515     ml_unlock(&logger->input_lock);
516     LSN fsynced_lsn;
517     grab_output(logger, &fsynced_lsn);
518 
519     ml_lock(&logger->input_lock);
520     // Some other thread may have written the log out while we didn't have the lock.  If we have space now, then be happy.
521     if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
522         release_output(logger, fsynced_lsn);
523         return;
524     }
525     if (logger->inbuf.n_in_buf > 0) {
526         // There isn't enough space, and there is something in the buffer, so write the inbuf.
527         swap_inbuf_outbuf(logger);
528 
529         // Don't release the inlock in this case, because we don't want to get starved.
530         write_outbuf_to_logfile(logger, &fsynced_lsn);
531     }
532     // the inbuf is empty.  Make it big enough (just in case it is somehow smaller than a single log entry).
533     if (n_bytes_needed > logger->inbuf.buf_size) {
534         assert(n_bytes_needed < (1<<30)); // it seems unlikely to work if a logentry gets that big.
535         int new_size = max_int(logger->inbuf.buf_size * 2, n_bytes_needed); // make it at least twice as big, and big enough for n_bytes
536         assert(new_size < (1<<30));
537         XREALLOC_N(new_size, logger->inbuf.buf);
538         logger->inbuf.buf_size = new_size;
539     }
540     release_output(logger, fsynced_lsn);
541 }
542 
toku_logger_fsync(TOKULOGGER logger)543 void toku_logger_fsync(TOKULOGGER logger)
544 // Effect: This is the exported fsync used by ydb.c for env_log_flush.  Group commit doesn't have to work.
545 // Entry: Holds no locks
546 // Exit: Holds no locks
547 // Implementation note:  Acquire the output condition lock, then the output permission, then release the output condition lock, then get the input lock.
548 // Then release everything.  Hold the input lock while reading the current max lsn in buf to make drd happy that there is no data race.
549 {
550     ml_lock(&logger->input_lock);
551     const LSN max_lsn_in_buf = logger->inbuf.max_lsn_in_buf;
552     ml_unlock(&logger->input_lock);
553 
554     toku_logger_maybe_fsync(logger, max_lsn_in_buf, true, false);
555 }
556 
toku_logger_fsync_if_lsn_not_fsynced(TOKULOGGER logger,LSN lsn)557 void toku_logger_fsync_if_lsn_not_fsynced (TOKULOGGER logger, LSN lsn) {
558     if (logger->write_log_files) {
559         toku_logger_maybe_fsync(logger, lsn, true, false);
560     }
561 }
562 
toku_logger_is_open(TOKULOGGER logger)563 int toku_logger_is_open(TOKULOGGER logger) {
564     if (logger==0) return 0;
565     return logger->is_open;
566 }
567 
toku_logger_set_cachetable(TOKULOGGER logger,CACHETABLE ct)568 void toku_logger_set_cachetable (TOKULOGGER logger, CACHETABLE ct) {
569     logger->ct = ct;
570 }
571 
toku_logger_set_lg_max(TOKULOGGER logger,uint32_t lg_max)572 int toku_logger_set_lg_max(TOKULOGGER logger, uint32_t lg_max) {
573     if (logger==0) return EINVAL; // no logger
574     if (logger->is_open) return EINVAL;
575     if (lg_max>(1<<30)) return EINVAL; // too big
576     logger->lg_max = lg_max;
577     return 0;
578 }
toku_logger_get_lg_max(TOKULOGGER logger,uint32_t * lg_maxp)579 int toku_logger_get_lg_max(TOKULOGGER logger, uint32_t *lg_maxp) {
580     if (logger==0) return EINVAL; // no logger
581     *lg_maxp = logger->lg_max;
582     return 0;
583 }
584 
toku_logger_set_lg_bsize(TOKULOGGER logger,uint32_t bsize)585 int toku_logger_set_lg_bsize(TOKULOGGER logger, uint32_t bsize) {
586     if (logger==0) return EINVAL; // no logger
587     if (logger->is_open) return EINVAL;
588     if (bsize<=0 || bsize>(1<<30)) return EINVAL;
589     logger->write_block_size = bsize;
590     return 0;
591 }
592 
toku_logger_find_next_unused_log_file(const char * directory,long long * result)593 int toku_logger_find_next_unused_log_file(const char *directory, long long *result)
594 // This is called during logger initialalization, and no locks are required.
595 {
596     DIR *d=opendir(directory);
597     long long maxf=-1; *result = maxf;
598     struct dirent *de;
599     if (d==0) return get_error_errno();
600     while ((de=readdir(d))) {
601         if (de==0) return get_error_errno();
602         long long thisl = -1;
603         if ( is_a_logfile(de->d_name, &thisl) ) {
604             if ((long long)thisl > maxf) maxf = thisl;
605         }
606     }
607     *result=maxf+1;
608     int r = closedir(d);
609     return r;
610 }
611 
// TODO: Put this in portability layer when ready
// in: file pathname that may have a dirname prefix
// return: file leaf name, i.e. a pointer into pathname just past its last
//         '/', or pathname itself when it contains no '/'
static char * fileleafname(char *pathname) {
    char *last_slash = strrchr(pathname, '/');
    return (last_slash != NULL) ? last_slash + 1 : pathname;
}
624 
logfilenamecompare(const void * ap,const void * bp)625 static int logfilenamecompare (const void *ap, const void *bp) {
626     char *a=*(char**)ap;
627     char *a_leafname = fileleafname(a);
628     char *b=*(char**)bp;
629     char * b_leafname = fileleafname(b);
630     int rval;
631     bool valid;
632     uint64_t num_a = 0;  // placate compiler
633     uint64_t num_b = 0;
634     uint32_t ver_a = 0;
635     uint32_t ver_b = 0;
636     valid = is_a_logfile_any_version(a_leafname, &num_a, &ver_a);
637     invariant(valid);
638     valid = is_a_logfile_any_version(b_leafname, &num_b, &ver_b);
639     invariant(valid);
640     if (ver_a < ver_b) rval = -1;
641     else if (ver_a > ver_b) rval = +1;
642     else if (num_a < num_b) rval = -1;
643     else if (num_a > num_b) rval = +1;
644     else rval = 0;
645     return rval;
646 }
647 
648 // Return the log files in sorted order
649 // Return a null_terminated array of strings, and also return the number of strings in the array.
650 // Requires: Race conditions must be dealt with by caller.  Either call during initialization or grab the output permission.
toku_logger_find_logfiles(const char * directory,char *** resultp,int * n_logfiles)651 int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_logfiles)
652 {
653     int result_limit=2;
654     int n_results=0;
655     char **MALLOC_N(result_limit, result);
656     assert(result!= NULL);
657     struct dirent *de;
658     DIR *d=opendir(directory);
659     if (d==0) {
660         int er = get_error_errno();
661         toku_free(result);
662         return er;
663     }
664     int dirnamelen = strlen(directory);
665     while ((de=readdir(d))) {
666         uint64_t thisl;
667         uint32_t version_ignore;
668         if ( !(is_a_logfile_any_version(de->d_name, &thisl, &version_ignore)) ) continue; //#2424: Skip over files that don't match the exact logfile template
669         if (n_results+1>=result_limit) {
670             result_limit*=2;
671             XREALLOC_N(result_limit, result);
672         }
673         int fnamelen = dirnamelen + strlen(de->d_name) + 2; // One for the slash and one for the trailing NUL.
674         char *XMALLOC_N(fnamelen, fname);
675         snprintf(fname, fnamelen, "%s/%s", directory, de->d_name);
676         result[n_results++] = fname;
677     }
678     // Return them in increasing order.
679     qsort(result, n_results, sizeof(result[0]), logfilenamecompare);
680     *resultp    = result;
681     *n_logfiles = n_results;
682     result[n_results]=0; // make a trailing null
683     return d ? closedir(d) : 0;
684 }
685 
toku_logger_free_logfiles(char ** logfiles,int n_logfiles)686 void toku_logger_free_logfiles(char **logfiles, int n_logfiles) {
687     for (int i = 0; i < n_logfiles; i++)
688         toku_free(logfiles[i]);
689     toku_free(logfiles);
690 }
691 
open_logfile(TOKULOGGER logger)692 static int open_logfile (TOKULOGGER logger)
693 // Entry and Exit: This thread has permission to modify the output.
694 {
695     int fnamelen = strlen(logger->directory)+50;
696     char fname[fnamelen];
697     snprintf(fname,
698              fnamelen,
699              "%s/log%012lld.tokulog%d",
700              logger->directory,
701              logger->next_log_file_number,
702              TOKU_LOG_VERSION);
703     long long index = logger->next_log_file_number;
704     if (logger->write_log_files) {
705         logger->fd =
706             toku_os_open(fname,
707                          O_CREAT + O_WRONLY + O_TRUNC + O_EXCL + O_BINARY,
708                          S_IRUSR + S_IWUSR,
709                          *tokudb_file_log_key);
710         if (logger->fd == -1) {
711             return get_error_errno();
712         }
713         fsync_logdir(logger);
714         logger->next_log_file_number++;
715     } else {
716         logger->fd = toku_os_open(
717             DEV_NULL_FILE, O_WRONLY + O_BINARY, S_IWUSR, *tokudb_file_log_key);
718         if (logger->fd == -1) {
719             return get_error_errno();
720         }
721     }
722     toku_os_full_write(logger->fd, "tokulogg", 8);
723     int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order
724     toku_os_full_write(logger->fd, &version_l, 4);
725     if ( logger->write_log_files ) {
726         TOKULOGFILEINFO XMALLOC(lf_info);
727         lf_info->index = index;
728         lf_info->maxlsn = logger->written_lsn;
729         lf_info->version = TOKU_LOG_VERSION;
730         toku_logfilemgr_add_logfile_info(logger->logfilemgr, lf_info);
731     }
732     logger->fsynced_lsn = logger->written_lsn;
733     logger->n_in_file = 12;
734     return 0;
735 }
736 
delete_logfile(TOKULOGGER logger,long long index,uint32_t version)737 static void delete_logfile(TOKULOGGER logger, long long index, uint32_t version)
738 // Entry and Exit: This thread has permission to modify the output.
739 {
740     int fnamelen = strlen(logger->directory)+50;
741     char fname[fnamelen];
742     snprintf(fname, fnamelen, "%s/log%012lld.tokulog%d", logger->directory, index, version);
743     int r = remove(fname);
744     invariant_zero(r);
745 }
746 
toku_logger_maybe_trim_log(TOKULOGGER logger,LSN trim_lsn)747 void toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn)
748 // On entry and exit: No logger locks held.
749 // Acquires and releases output permission.
750 {
751     LSN fsynced_lsn;
752     grab_output(logger, &fsynced_lsn);
753     TOKULOGFILEMGR lfm = logger->logfilemgr;
754     int n_logfiles = toku_logfilemgr_num_logfiles(lfm);
755 
756     TOKULOGFILEINFO lf_info = NULL;
757 
758     if ( logger->write_log_files && logger->trim_log_files) {
759         while ( n_logfiles > 1 ) { // don't delete current logfile
760             uint32_t log_version;
761             lf_info = toku_logfilemgr_get_oldest_logfile_info(lfm);
762             log_version = lf_info->version;
763             if ( lf_info->maxlsn.lsn >= trim_lsn.lsn ) {
764                 // file contains an open LSN, can't delete this or any newer log files
765                 break;
766             }
767             // need to save copy - toku_logfilemgr_delete_oldest_logfile_info free's the lf_info
768             long index = lf_info->index;
769             toku_logfilemgr_delete_oldest_logfile_info(lfm);
770             n_logfiles--;
771             delete_logfile(logger, index, log_version);
772         }
773     }
774     release_output(logger, fsynced_lsn);
775 }
776 
void toku_logger_write_log_files (TOKULOGGER logger, bool write_log_files)
// Effect: Store the write_log_files flag on the logger.
// Called only during initialization (or just after recovery), so no locks are needed.
{
    logger->write_log_files = write_log_files;
}
782 
void toku_logger_trim_log_files (TOKULOGGER logger, bool trim_log_files)
// Effect: Store the trim_log_files flag on the logger.  toku_logger_maybe_trim_log
//  deletes old log files only when this flag (and write_log_files) is set.
// Called only during initialization, so no locks are needed.
{
    logger->trim_log_files = trim_log_files;
}
788 
bool toku_logger_txns_exist(TOKULOGGER logger)
// Effect: Return whatever toku_txn_manager_txns_exist reports for the logger's txn_manager.
// Called during close of environment to ensure that transactions don't exist
{
    return toku_txn_manager_txns_exist(logger->txn_manager);
}
794 
795 
toku_logger_maybe_fsync(TOKULOGGER logger,LSN lsn,int do_fsync,bool holds_input_lock)796 void toku_logger_maybe_fsync(TOKULOGGER logger, LSN lsn, int do_fsync, bool holds_input_lock)
797 // Effect: If fsync is nonzero, then make sure that the log is flushed and synced at least up to lsn.
798 // Entry: Holds input lock iff 'holds_input_lock'.  The log entry has already been written to the input buffer.
799 // Exit:  Holds no locks.
800 // The input lock may be released and then reacquired.  Thus this function does not run atomically with respect to other threads.
801 {
802     if (holds_input_lock) {
803         ml_unlock(&logger->input_lock);
804     }
805     if (do_fsync) {
806         // reacquire the locks (acquire output permission first)
807         LSN  fsynced_lsn;
808         bool already_done = wait_till_output_already_written_or_output_buffer_available(logger, lsn, &fsynced_lsn);
809         if (already_done) {
810             return;
811         }
812 
813         // otherwise we now own the output permission, and our lsn isn't outputed.
814 
815         ml_lock(&logger->input_lock);
816 
817         swap_inbuf_outbuf(logger);
818 
819         ml_unlock(&logger->input_lock); // release the input lock now, so other threads can fill the inbuf.  (Thus enabling group commit.)
820 
821         write_outbuf_to_logfile(logger, &fsynced_lsn);
822         if (fsynced_lsn.lsn < lsn.lsn) {
823             // it may have gotten fsynced by the write_outbuf_to_logfile.
824             toku_file_fsync_without_accounting(logger->fd);
825             assert(fsynced_lsn.lsn <= logger->written_lsn.lsn);
826             fsynced_lsn = logger->written_lsn;
827         }
828         // the last lsn is only accessed while holding output permission or else when the log file is old.
829         if (logger->write_log_files) {
830             toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);
831         }
832         release_output(logger, fsynced_lsn);
833     }
834 }
835 
836 static void
logger_write_buffer(TOKULOGGER logger,LSN * fsynced_lsn)837 logger_write_buffer(TOKULOGGER logger, LSN *fsynced_lsn)
838 // Entry:  Holds the input lock and permission to modify output.
839 // Exit:   Holds only the permission to modify output.
840 // Effect:  Write the buffers to the output.  If DO_FSYNC is true, then fsync.
841 // Note: Only called during single-threaded activity from toku_logger_restart, so locks aren't really needed.
842 {
843     swap_inbuf_outbuf(logger);
844     ml_unlock(&logger->input_lock);
845     write_outbuf_to_logfile(logger, fsynced_lsn);
846     if (logger->write_log_files) {
847         toku_file_fsync_without_accounting(logger->fd);
848         toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);  // t:2294
849     }
850 }
851 
toku_logger_restart(TOKULOGGER logger,LSN lastlsn)852 int toku_logger_restart(TOKULOGGER logger, LSN lastlsn)
853 // Entry and exit: Holds no locks (this is called only during single-threaded activity, such as initial start).
854 {
855     int r;
856 
857     // flush out the log buffer
858     LSN fsynced_lsn;
859     grab_output(logger, &fsynced_lsn);
860     ml_lock(&logger->input_lock);
861     logger_write_buffer(logger, &fsynced_lsn);
862 
863     // close the log file
864     if (logger->write_log_files) {  // fsyncs don't work to /dev/null
865         toku_file_fsync_without_accounting(logger->fd);
866     }
867     r = toku_os_close(logger->fd);
868     assert(r == 0);
869     logger->fd = -1;
870 
871     // reset the LSN's to the lastlsn when the logger was opened
872     logger->lsn = logger->written_lsn = logger->fsynced_lsn = lastlsn;
873     logger->write_log_files = true;
874     logger->trim_log_files = true;
875 
876     // open a new log file
877     r = open_logfile(logger);
878     release_output(logger, fsynced_lsn);
879     return r;
880 }
881 
882 // fname is the iname
toku_logger_log_fcreate(TOKUTXN txn,const char * fname,FILENUM filenum,uint32_t mode,uint32_t treeflags,uint32_t nodesize,uint32_t basementnodesize,enum toku_compression_method compression_method)883 void toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, uint32_t mode,
884         uint32_t treeflags, uint32_t nodesize, uint32_t basementnodesize,
885         enum toku_compression_method compression_method) {
886     if (txn) {
887         BYTESTRING bs_fname = { .len = (uint32_t) strlen(fname), .data = (char *) fname };
888         // fsync log on fcreate
889         toku_log_fcreate (txn->logger, (LSN*)0, 1, txn, toku_txn_get_txnid(txn), filenum,
890                 bs_fname, mode, treeflags, nodesize, basementnodesize, compression_method);
891     }
892 }
893 
894 
895 // We only do fdelete on open ft's, so we pass the filenum here
toku_logger_log_fdelete(TOKUTXN txn,FILENUM filenum)896 void toku_logger_log_fdelete (TOKUTXN txn, FILENUM filenum) {
897     if (txn) {
898         //No fsync.
899         toku_log_fdelete (txn->logger, (LSN*)0, 0, txn, toku_txn_get_txnid(txn), filenum);
900     }
901 }
902 
903 
904 
905 /* fopen isn't really an action.  It's just for bookkeeping.  We need to know the filename that goes with a filenum. */
toku_logger_log_fopen(TOKUTXN txn,const char * fname,FILENUM filenum,uint32_t treeflags)906 void toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum, uint32_t treeflags) {
907     if (txn) {
908         BYTESTRING bs;
909         bs.len = strlen(fname);
910         bs.data = (char*)fname;
911         toku_log_fopen (txn->logger, (LSN*)0, 0, bs, filenum, treeflags);
912     }
913 }
914 
// Read one byte from f into *v without touching any checksum or length counter.
// Returns 0 on success, or -1 when fgetc reports EOF (in which case *v is left alone).
static int toku_fread_uint8_t_nocrclen (FILE *f, uint8_t *v) {
    const int ch = fgetc(f);
    if (ch != EOF) {
        *v = (uint8_t) ch;
        return 0;
    }
    return -1;
}
922 
toku_fread_uint8_t(FILE * f,uint8_t * v,struct x1764 * mm,uint32_t * len)923 int toku_fread_uint8_t (FILE *f, uint8_t *v, struct x1764 *mm, uint32_t *len) {
924     int vi=fgetc(f);
925     if (vi==EOF) return -1;
926     uint8_t vc=(uint8_t)vi;
927     toku_x1764_add(mm, &vc, 1);
928     (*len)++;
929     *v = vc;
930     return 0;
931 }
932 
toku_fread_uint32_t_nocrclen(FILE * f,uint32_t * v)933 int toku_fread_uint32_t_nocrclen (FILE *f, uint32_t *v) {
934     uint32_t result;
935     uint8_t *cp = (uint8_t*)&result;
936     int r;
937     r = toku_fread_uint8_t_nocrclen (f, cp+0); if (r!=0) return r;
938     r = toku_fread_uint8_t_nocrclen (f, cp+1); if (r!=0) return r;
939     r = toku_fread_uint8_t_nocrclen (f, cp+2); if (r!=0) return r;
940     r = toku_fread_uint8_t_nocrclen (f, cp+3); if (r!=0) return r;
941     *v = toku_dtoh32(result);
942 
943     return 0;
944 }
toku_fread_uint32_t(FILE * f,uint32_t * v,struct x1764 * checksum,uint32_t * len)945 int toku_fread_uint32_t (FILE *f, uint32_t *v, struct x1764 *checksum, uint32_t *len) {
946     uint32_t result;
947     uint8_t *cp = (uint8_t*)&result;
948     int r;
949     r = toku_fread_uint8_t (f, cp+0, checksum, len); if(r!=0) return r;
950     r = toku_fread_uint8_t (f, cp+1, checksum, len); if(r!=0) return r;
951     r = toku_fread_uint8_t (f, cp+2, checksum, len); if(r!=0) return r;
952     r = toku_fread_uint8_t (f, cp+3, checksum, len); if(r!=0) return r;
953     *v = toku_dtoh32(result);
954     return 0;
955 }
956 
toku_fread_uint64_t(FILE * f,uint64_t * v,struct x1764 * checksum,uint32_t * len)957 int toku_fread_uint64_t (FILE *f, uint64_t *v, struct x1764 *checksum, uint32_t *len) {
958     uint32_t v1,v2;
959     int r;
960     r=toku_fread_uint32_t(f, &v1, checksum, len);    if (r!=0) return r;
961     r=toku_fread_uint32_t(f, &v2, checksum, len);    if (r!=0) return r;
962     *v = (((uint64_t)v1)<<32 ) | ((uint64_t)v2);
963     return 0;
964 }
965 
toku_fread_bool(FILE * f,bool * v,struct x1764 * mm,uint32_t * len)966 int toku_fread_bool (FILE *f, bool *v, struct x1764 *mm, uint32_t *len) {
967     uint8_t iv;
968     int r = toku_fread_uint8_t(f, &iv, mm, len);
969     if (r == 0) {
970         *v = (iv!=0);
971     }
972     return r;
973 }
974 
// Read an LSN: delegates to toku_fread_uint64_t, storing into lsn->lsn.
// Returns that function's result.
int toku_fread_LSN     (FILE *f, LSN *lsn, struct x1764 *checksum, uint32_t *len) {
    return toku_fread_uint64_t (f, &lsn->lsn, checksum, len);
}
978 
// Read a BLOCKNUM: delegates to toku_fread_uint64_t, storing into b->b through a uint64_t* cast.
// Returns that function's result.
int toku_fread_BLOCKNUM (FILE *f, BLOCKNUM *b, struct x1764 *checksum, uint32_t *len) {
    return toku_fread_uint64_t (f, (uint64_t*)&b->b, checksum, len);
}
982 
// Read a FILENUM: delegates to toku_fread_uint32_t, storing into filenum->fileid.
// Returns that function's result.
int toku_fread_FILENUM (FILE *f, FILENUM *filenum, struct x1764 *checksum, uint32_t *len) {
    return toku_fread_uint32_t (f, &filenum->fileid, checksum, len);
}
986 
// Read a TXNID: delegates to toku_fread_uint64_t, storing into *txnid.
// Returns that function's result.
int toku_fread_TXNID   (FILE *f, TXNID *txnid, struct x1764 *checksum, uint32_t *len) {
    return toku_fread_uint64_t (f, txnid, checksum, len);
}
990 
toku_fread_TXNID_PAIR(FILE * f,TXNID_PAIR * txnid,struct x1764 * checksum,uint32_t * len)991 int toku_fread_TXNID_PAIR   (FILE *f, TXNID_PAIR *txnid, struct x1764 *checksum, uint32_t *len) {
992     TXNID parent;
993     TXNID child;
994     int r;
995     r = toku_fread_TXNID(f, &parent, checksum, len); if (r != 0) { return r; }
996     r = toku_fread_TXNID(f, &child, checksum, len);  if (r != 0) { return r; }
997     txnid->parent_id64 = parent;
998     txnid->child_id64 = child;
999     return 0;
1000 }
1001 
1002 
toku_fread_XIDP(FILE * f,XIDP * xidp,struct x1764 * checksum,uint32_t * len)1003 int toku_fread_XIDP    (FILE *f, XIDP *xidp, struct x1764 *checksum, uint32_t *len) {
1004     // These reads are verbose because XA defined the fields as "long", but we use 4 bytes, 1 byte and 1 byte respectively.
1005     TOKU_XA_XID *XMALLOC(xid);
1006     {
1007         uint32_t formatID;
1008         int r = toku_fread_uint32_t(f, &formatID,     checksum, len);
1009         if (r!=0) return r;
1010         xid->formatID = formatID;
1011     }
1012     {
1013         uint8_t gtrid_length;
1014         int r = toku_fread_uint8_t (f, &gtrid_length, checksum, len);
1015         if (r!=0) return r;
1016         xid->gtrid_length = gtrid_length;
1017     }
1018     {
1019         uint8_t bqual_length;
1020         int r = toku_fread_uint8_t (f, &bqual_length, checksum, len);
1021         if (r!=0) return r;
1022         xid->bqual_length = bqual_length;
1023     }
1024     for (int i=0; i< xid->gtrid_length + xid->bqual_length; i++) {
1025         uint8_t byte;
1026         int r = toku_fread_uint8_t(f, &byte, checksum, len);
1027         if (r!=0) return r;
1028         xid->data[i] = byte;
1029     }
1030     *xidp = xid;
1031     return 0;
1032 }
1033 
1034 // fills in the bs with malloced data.
toku_fread_BYTESTRING(FILE * f,BYTESTRING * bs,struct x1764 * checksum,uint32_t * len)1035 int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, struct x1764 *checksum, uint32_t *len) {
1036     int r=toku_fread_uint32_t(f, (uint32_t*)&bs->len, checksum, len);
1037     if (r!=0) return r;
1038     XMALLOC_N(bs->len, bs->data);
1039     uint32_t i;
1040     for (i=0; i<bs->len; i++) {
1041         r=toku_fread_uint8_t(f, (uint8_t*)&bs->data[i], checksum, len);
1042         if (r!=0) {
1043             toku_free(bs->data);
1044             bs->data=0;
1045             return r;
1046         }
1047     }
1048     return 0;
1049 }
1050 
1051 // fills in the fs with malloced data.
toku_fread_FILENUMS(FILE * f,FILENUMS * fs,struct x1764 * checksum,uint32_t * len)1052 int toku_fread_FILENUMS (FILE *f, FILENUMS *fs, struct x1764 *checksum, uint32_t *len) {
1053     int r=toku_fread_uint32_t(f, (uint32_t*)&fs->num, checksum, len);
1054     if (r!=0) return r;
1055     XMALLOC_N(fs->num, fs->filenums);
1056     uint32_t i;
1057     for (i=0; i<fs->num; i++) {
1058         r=toku_fread_FILENUM (f, &fs->filenums[i], checksum, len);
1059         if (r!=0) {
1060             toku_free(fs->filenums);
1061             fs->filenums=0;
1062             return r;
1063         }
1064     }
1065     return 0;
1066 }
1067 
toku_logprint_LSN(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1068 int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1069     LSN v;
1070     int r = toku_fread_LSN(inf, &v, checksum, len);
1071     if (r!=0) return r;
1072     fprintf(outf, " %s=%" PRIu64, fieldname, v.lsn);
1073     return 0;
1074 }
1075 
toku_logprint_TXNID(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1076 int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1077     TXNID v;
1078     int r = toku_fread_TXNID(inf, &v, checksum, len);
1079     if (r!=0) return r;
1080     fprintf(outf, " %s=%" PRIu64, fieldname, v);
1081     return 0;
1082 }
1083 
toku_logprint_TXNID_PAIR(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1084 int toku_logprint_TXNID_PAIR (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1085     TXNID_PAIR v;
1086     int r = toku_fread_TXNID_PAIR(inf, &v, checksum, len);
1087     if (r!=0) return r;
1088     fprintf(outf, " %s=%" PRIu64 ",%" PRIu64, fieldname, v.parent_id64, v.child_id64);
1089     return 0;
1090 }
1091 
toku_logprint_XIDP(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1092 int toku_logprint_XIDP (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1093     XIDP vp;
1094     int r = toku_fread_XIDP(inf, &vp, checksum, len);
1095     if (r!=0) return r;
1096     fprintf(outf, " %s={formatID=0x%lx gtrid_length=%ld bqual_length=%ld data=", fieldname, vp->formatID, vp->gtrid_length, vp->bqual_length);
1097     toku_print_bytes(outf, vp->gtrid_length + vp->bqual_length, vp->data);
1098     fprintf(outf, "}");
1099     toku_free(vp);
1100     return 0;
1101 }
1102 
toku_logprint_uint8_t(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1103 int toku_logprint_uint8_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1104     uint8_t v;
1105     int r = toku_fread_uint8_t(inf, &v, checksum, len);
1106     if (r!=0) return r;
1107     fprintf(outf, " %s=%d", fieldname, v);
1108     if (format) fprintf(outf, format, v);
1109     else if (v=='\'') fprintf(outf, "('\'')");
1110     else if (isprint(v)) fprintf(outf, "('%c')", v);
1111     else {}/*nothing*/
1112     return 0;
1113 }
1114 
toku_logprint_uint32_t(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1115 int toku_logprint_uint32_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1116     uint32_t v;
1117     int r = toku_fread_uint32_t(inf, &v, checksum, len);
1118     if (r!=0) return r;
1119     fprintf(outf, " %s=", fieldname);
1120     fprintf(outf, format ? format : "%d", v);
1121     return 0;
1122 }
1123 
toku_logprint_uint64_t(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1124 int toku_logprint_uint64_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1125     uint64_t v;
1126     int r = toku_fread_uint64_t(inf, &v, checksum, len);
1127     if (r!=0) return r;
1128     fprintf(outf, " %s=", fieldname);
1129     fprintf(outf, format ? format : "%" PRId64, v);
1130     return 0;
1131 }
1132 
toku_logprint_bool(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1133 int toku_logprint_bool (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1134     bool v;
1135     int r = toku_fread_bool(inf, &v, checksum, len);
1136     if (r!=0) return r;
1137     fprintf(outf, " %s=%s", fieldname, v ? "true" : "false");
1138     return 0;
1139 
1140 }
1141 
// Print a byte string to outf as "{len=<len> data=<bytes>}".
// The bytes themselves are rendered by toku_print_bytes.
void toku_print_BYTESTRING (FILE *outf, uint32_t len, char *data) {
    fprintf(outf, "{len=%u data=", len);
    toku_print_bytes(outf, len, data);
    fprintf(outf, "}");

}
1148 
toku_logprint_BYTESTRING(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1149 int toku_logprint_BYTESTRING (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1150     BYTESTRING bs;
1151     int r = toku_fread_BYTESTRING(inf, &bs, checksum, len);
1152     if (r!=0) return r;
1153     fprintf(outf, " %s=", fieldname);
1154     toku_print_BYTESTRING(outf, bs.len, bs.data);
1155     toku_free(bs.data);
1156     return 0;
1157 }
1158 
// Print a BLOCKNUM field: forwards every argument to toku_logprint_uint64_t and returns its result.
int toku_logprint_BLOCKNUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
    return toku_logprint_uint64_t(outf, inf, fieldname, checksum, len, format);

}
1163 
// Print a FILENUM field: forwards every argument to toku_logprint_uint32_t and returns its result.
int toku_logprint_FILENUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
    return toku_logprint_uint32_t(outf, inf, fieldname, checksum, len, format);

}
1168 
1169 static void
toku_print_FILENUMS(FILE * outf,uint32_t num,FILENUM * filenums)1170 toku_print_FILENUMS (FILE *outf, uint32_t num, FILENUM *filenums) {
1171     fprintf(outf, "{num=%u filenums=\"", num);
1172     uint32_t i;
1173     for (i=0; i<num; i++) {
1174         if (i>0)
1175             fprintf(outf, ",");
1176         fprintf(outf, "0x%" PRIx32, filenums[i].fileid);
1177     }
1178     fprintf(outf, "\"}");
1179 
1180 }
1181 
toku_logprint_FILENUMS(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1182 int toku_logprint_FILENUMS (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1183     FILENUMS bs;
1184     int r = toku_fread_FILENUMS(inf, &bs, checksum, len);
1185     if (r!=0) return r;
1186     fprintf(outf, " %s=", fieldname);
1187     toku_print_FILENUMS(outf, bs.num, bs.filenums);
1188     toku_free(bs.filenums);
1189     return 0;
1190 }
1191 
toku_read_and_print_logmagic(FILE * f,uint32_t * versionp)1192 int toku_read_and_print_logmagic (FILE *f, uint32_t *versionp) {
1193     {
1194         char magic[8];
1195         int r=fread(magic, 1, 8, f);
1196         if (r!=8) {
1197             return DB_BADFORMAT;
1198         }
1199         if (memcmp(magic, "tokulogg", 8)!=0) {
1200             return DB_BADFORMAT;
1201         }
1202     }
1203     {
1204         int version;
1205             int r=fread(&version, 1, 4, f);
1206         if (r!=4) {
1207             return DB_BADFORMAT;
1208         }
1209         printf("tokulog v.%u\n", toku_ntohl(version));
1210         //version MUST be in network order regardless of disk order
1211         *versionp=toku_ntohl(version);
1212     }
1213     return 0;
1214 }
1215 
toku_read_logmagic(FILE * f,uint32_t * versionp)1216 int toku_read_logmagic (FILE *f, uint32_t *versionp) {
1217     {
1218         char magic[8];
1219         int r=fread(magic, 1, 8, f);
1220         if (r!=8) {
1221             return DB_BADFORMAT;
1222         }
1223         if (memcmp(magic, "tokulogg", 8)!=0) {
1224             return DB_BADFORMAT;
1225         }
1226     }
1227     {
1228         int version;
1229             int r=fread(&version, 1, 4, f);
1230         if (r!=4) {
1231             return DB_BADFORMAT;
1232         }
1233         *versionp=toku_ntohl(version);
1234     }
1235     return 0;
1236 }
1237 
toku_txn_get_txnid(TOKUTXN txn)1238 TXNID_PAIR toku_txn_get_txnid (TOKUTXN txn) {
1239     TXNID_PAIR tp = { .parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE};
1240     if (txn==0) return tp;
1241     else return txn->txnid;
1242 }
1243 
// Return the logger's current lsn field.  No lock is taken here.
LSN toku_logger_last_lsn(TOKULOGGER logger) {
    return logger->lsn;
}
1247 
// Return the logger attached to txn, or 0 when txn is null.
TOKULOGGER toku_txn_logger (TOKUTXN txn) {
    return txn ? txn->logger : 0;
}
1251 
toku_txnid2txn(TOKULOGGER logger,TXNID_PAIR txnid,TOKUTXN * result)1252 void toku_txnid2txn(TOKULOGGER logger, TXNID_PAIR txnid, TOKUTXN *result) {
1253     TOKUTXN root_txn = NULL;
1254     toku_txn_manager_suspend(logger->txn_manager);
1255     toku_txn_manager_id2txn_unlocked(logger->txn_manager, txnid, &root_txn);
1256     if (root_txn == NULL || root_txn->txnid.child_id64 == txnid.child_id64) {
1257         *result = root_txn;
1258     }
1259     else if (root_txn != NULL) {
1260         root_txn->child_manager->suspend();
1261         root_txn->child_manager->find_tokutxn_by_xid_unlocked(txnid, result);
1262         root_txn->child_manager->resume();
1263     }
1264     toku_txn_manager_resume(logger->txn_manager);
1265 }
1266 
1267 // Find the earliest LSN in a log.  No locks are needed.
peek_at_log(TOKULOGGER logger,char * filename,LSN * first_lsn)1268 static int peek_at_log(TOKULOGGER logger, char *filename, LSN *first_lsn) {
1269     int fd = toku_os_open(
1270         filename, O_RDONLY + O_BINARY, S_IRUSR, *tokudb_file_log_key);
1271     if (fd < 0) {
1272         int er = get_error_errno();
1273         if (logger->write_log_files)
1274             printf("couldn't open: %s\n", strerror(er));
1275         return er;
1276     }
1277     enum { SKIP = 12+1+4 }; // read the 12 byte header, the first message, and the first len
1278     unsigned char header[SKIP+8];
1279     int r = read(fd, header, SKIP+8);
1280     if (r!=SKIP+8) return 0; // cannot determine that it's archivable, so we'll assume no.  If a later-log is archivable is then this one will be too.
1281 
1282     uint64_t lsn;
1283     {
1284         struct rbuf rb;
1285         rb.buf   = header+SKIP;
1286         rb.size  = 8;
1287         rb.ndone = 0;
1288         lsn = rbuf_ulonglong(&rb);
1289     }
1290 
1291     r = toku_os_close(fd);
1292 
1293     if (r != 0) {
1294         return 0;
1295     }
1296 
1297     first_lsn->lsn = lsn;
1298     return 0;
1299 }
1300 
1301 // Return a malloc'd array of malloc'd strings which are the filenames that can be archived.
// Output permission is obtained briefly so we can get a list of the log files without conflicting.
toku_logger_log_archive(TOKULOGGER logger,char *** logs_p,int flags)1303 int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
1304     if (flags!=0) return EINVAL; // don't know what to do.
1305     int all_n_logs;
1306     int i;
1307     char **all_logs;
1308     int n_logfiles;
1309     LSN fsynced_lsn;
1310     grab_output(logger, &fsynced_lsn);
1311     int r = toku_logger_find_logfiles (logger->directory, &all_logs, &n_logfiles);
1312     release_output(logger, fsynced_lsn);
1313     if (r!=0) return r;
1314 
1315     for (i=0; all_logs[i]; i++);
1316     all_n_logs=i;
1317     // get them into increasing order
1318     qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare);
1319 
1320     LSN save_lsn = logger->last_completed_checkpoint_lsn;
1321 
1322     // Now starting at the last one, look for archivable ones.
1323     // Count the total number of bytes, because we have to return a single big array.  (That's the BDB interface.  Bleah...)
1324     LSN earliest_lsn_in_logfile={(unsigned long long)(-1LL)};
1325     r = peek_at_log(logger, all_logs[all_n_logs-1], &earliest_lsn_in_logfile); // try to find the lsn that's in the most recent log
1326     if (earliest_lsn_in_logfile.lsn <= save_lsn.lsn) {
1327         i=all_n_logs-1;
1328     } else {
1329         for (i=all_n_logs-2; i>=0; i--) { // start at all_n_logs-2 because we never archive the most recent log
1330             r = peek_at_log(logger, all_logs[i], &earliest_lsn_in_logfile);
1331             if (r!=0) continue; // In case of error, just keep going
1332 
1333             if (earliest_lsn_in_logfile.lsn <= save_lsn.lsn) {
1334                 break;
1335             }
1336         }
1337     }
1338 
1339     // all log files up to, but but not including, i can be archived.
1340     int n_to_archive=i;
1341     int count_bytes=0;
1342     for (i=0; i<n_to_archive; i++) {
1343         count_bytes+=1+strlen(all_logs[i]);
1344     }
1345     char **result;
1346     if (i==0) {
1347         result=0;
1348     } else {
1349         CAST_FROM_VOIDP(result, toku_xmalloc((1+n_to_archive)*sizeof(*result) + count_bytes));
1350         char  *base = (char*)(result+1+n_to_archive);
1351         for (i=0; i<n_to_archive; i++) {
1352             int len=1+strlen(all_logs[i]);
1353             result[i]=base;
1354             memcpy(base, all_logs[i], len);
1355             base+=len;
1356         }
1357         result[n_to_archive]=0;
1358     }
1359     for (i=0; all_logs[i]; i++) {
1360         toku_free(all_logs[i]);
1361     }
1362     toku_free(all_logs);
1363     *logs_p = result;
1364     return 0;
1365 }
1366 
1367 
// Return txn's parent field.  txn is dereferenced unconditionally, so it must not be null.
TOKUTXN toku_logger_txn_parent (TOKUTXN txn) {
    return txn->parent;
}
1371 
// Record lsn as the logger's last completed checkpoint LSN.
// toku_logger_log_archive reads this value to decide which log files can be archived.
void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn) {
    logger->last_completed_checkpoint_lsn = lsn;
}
1375 
// Copy the logger status into *statp.
// log_status.init() is called first; then, when logger is non-null, the status values are
// filled from the logger's counters.  With a null logger, *statp receives log_status as
// init() left it.
void
toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS statp) {
    log_status.init();
    if (logger) {
        LOG_STATUS_VAL(LOGGER_NEXT_LSN)    = logger->lsn.lsn;
        LOG_STATUS_VAL(LOGGER_NUM_WRITES)  = logger->num_writes_to_disk;
        LOG_STATUS_VAL(LOGGER_BYTES_WRITTEN)  = logger->bytes_written_to_disk;
        // No compression on logfiles so the uncompressed size is just number of bytes written
        LOG_STATUS_VAL(LOGGER_UNCOMPRESSED_BYTES_WRITTEN)  = logger->bytes_written_to_disk;
        LOG_STATUS_VAL(LOGGER_TOKUTIME_WRITES) = logger->time_spent_writing_to_disk;
        LOG_STATUS_VAL(LOGGER_WAIT_BUF_LONG) = logger->num_wait_buf_long;
    }
    *statp = log_status;
}
1390 
1391 
1392 
1393 //////////////////////////////////////////////////////////////////////////////////////////////////////
1394 // Used for upgrade:
1395 // if any valid log files exist in log_dir, then
1396 //   set *found_any_logs to true and set *version_found to version number of latest log
1397 int
toku_get_version_of_logs_on_disk(const char * log_dir,bool * found_any_logs,uint32_t * version_found)1398 toku_get_version_of_logs_on_disk(const char *log_dir, bool *found_any_logs, uint32_t *version_found) {
1399     bool found = false;
1400     uint32_t highest_version = 0;
1401     int r = 0;
1402 
1403     struct dirent *de;
1404     DIR *d=opendir(log_dir);
1405     if (d==NULL) {
1406         r = get_error_errno();
1407     }
1408     else {
1409         // Examine every file in the directory and find highest version
1410         while ((de=readdir(d))) {
1411             uint32_t this_log_version;
1412             uint64_t this_log_number;
1413             bool is_log = is_a_logfile_any_version(de->d_name, &this_log_number, &this_log_version);
1414             if (is_log) {
1415                 if (!found) {  // first log file found
1416                     found = true;
1417                     highest_version = this_log_version;
1418                 }
1419                 else
1420                     highest_version = highest_version > this_log_version ? highest_version : this_log_version;
1421             }
1422         }
1423         int r2 = closedir(d);
1424         if (r==0) r = r2;
1425     }
1426     if (r==0) {
1427         *found_any_logs = found;
1428         if (found)
1429             *version_found = highest_version;
1430     }
1431     return r;
1432 }
1433 
// Return the logger's txn_manager field.
TXN_MANAGER toku_logger_get_txn_manager(TOKULOGGER logger) {
    return logger->txn_manager;
}
1437