1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <memory.h>
40 #include <ctype.h>
41 #include <limits.h>
42 #include <unistd.h>
43 
44 #include "ft/serialize/block_table.h"
45 #include "ft/ft.h"
46 #include "ft/logger/log-internal.h"
47 #include "ft/txn/txn_manager.h"
48 #include "ft/txn/rollback_log_node_cache.h"
49 
50 #include "util/status.h"
51 
// Nesting counter: toku_logger_open_rollback() increments it on entry and
// decrements it on exit, so it is nonzero while a rollback file is being opened.
int writing_rollback = 0;
extern "C" {
  // Exported with C linkage; not read anywhere in this part of the file.
  // NOTE(review): presumably set by the embedding server to request forced
  // recovery -- confirm against the code that consumes it.
  uint force_recovery = 0;
}

// Log format version that open_logfile() writes into each log file header.
static const int log_format_version = TOKU_LOG_VERSION;

// Instrumentation keys handed to toku_mutex_init(), toku_cond_init() and
// toku_os_open() below.  They are dereferenced there, so they must be set
// before a logger is created or opened (they are not assigned in this file section).
toku_instr_key *result_output_condition_lock_mutex_key;
toku_instr_key *result_output_condition_key;
toku_instr_key *tokudb_file_log_key;
62 
63 static int open_logfile(TOKULOGGER logger);
64 static void logger_write_buffer(TOKULOGGER logger, LSN *fsynced_lsn);
65 static void delete_logfile(TOKULOGGER logger,
66                            long long index,
67                            uint32_t version);
68 static void grab_output(TOKULOGGER logger, LSN *fsynced_lsn);
69 static void release_output(TOKULOGGER logger, LSN fsynced_lsn);
70 
// Write `len` bytes of `data` to `outf` as a double-quoted string.
// A double quote, a backslash and a newline are written as \" \\ and \n;
// any other printable byte is written as-is, and every remaining byte is
// written as a backslash followed by three octal digits.
static void toku_print_bytes (FILE *outf, uint32_t len, char *data) {
    fprintf(outf, "\"");
    for (uint32_t i = 0; i < len; i++) {
        // The <ctype.h> classifiers are only defined for values representable
        // as unsigned char (or EOF).  Plain char may be signed, so normalize
        // the byte once before classifying or printing it.
        const unsigned char c = (unsigned char) data[i];
        switch (c) {
        case '"':  fprintf(outf, "\\\""); break;
        case '\\': fprintf(outf, "\\\\"); break;
        case '\n': fprintf(outf, "\\n");  break;
        default:
            if (isprint(c)) fprintf(outf, "%c", c);
            else fprintf(outf, "\\%03o", c);
        }
    }
    fprintf(outf, "\"");
}
86 
is_a_logfile_any_version(const char * name,uint64_t * number_result,uint32_t * version_of_log)87 static bool is_a_logfile_any_version (const char *name, uint64_t *number_result, uint32_t *version_of_log) {
88     bool rval = true;
89     uint64_t result;
90     int n;
91     int r;
92     uint32_t version;
93     r = sscanf(name, "log%" SCNu64 ".tokulog%" SCNu32 "%n", &result, &version, &n);
94     if (r!=2 || name[n]!='\0' || version <= TOKU_LOG_VERSION_1) {
95         //Version 1 does NOT append 'version' to end of '.tokulog'
96         version = TOKU_LOG_VERSION_1;
97         r = sscanf(name, "log%" SCNu64 ".tokulog%n", &result, &n);
98         if (r!=1 || name[n]!='\0') {
99             rval = false;
100         }
101     }
102     if (rval) {
103         *number_result  = result;
104         *version_of_log = version;
105     }
106 
107     return rval;
108 }
109 
110 // added for #2424, improved for #2521
is_a_logfile(const char * name,long long * number_result)111 static bool is_a_logfile (const char *name, long long *number_result) {
112     bool rval;
113     uint64_t result;
114     uint32_t version;
115     rval = is_a_logfile_any_version(name, &result, &version);
116     if (rval && version != TOKU_LOG_VERSION)
117         rval = false;
118     if (rval)
119         *number_result = result;
120     return rval;
121 }
122 
123 
124 // TODO: can't fail
toku_logger_create(TOKULOGGER * resultp)125 int toku_logger_create (TOKULOGGER *resultp) {
126     TOKULOGGER CALLOC(result);
127     if (result==0) return get_error_errno();
128     result->is_open=false;
129     result->write_log_files = true;
130     result->trim_log_files = true;
131     result->directory=0;
132     // fd is uninitialized on purpose
133     // ct is uninitialized on purpose
134     result->lg_max = 100<<20; // 100MB default
135     // lsn is uninitialized
136     result->inbuf  = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
137     result->outbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, (char *) toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
138     // written_lsn is uninitialized
139     // fsynced_lsn is uninitialized
140     result->last_completed_checkpoint_lsn = ZERO_LSN;
141     // next_log_file_number is uninitialized
142     // n_in_file is uninitialized
143     result->write_block_size = FT_DEFAULT_NODE_SIZE; // default logging size is the same as the default ft block size
144     toku_logfilemgr_create(&result->logfilemgr);
145     *resultp = result;
146     ml_init(&result->input_lock);
147     toku_mutex_init(*result_output_condition_lock_mutex_key,
148                     &result->output_condition_lock,
149                     nullptr);
150     toku_cond_init(
151         *result_output_condition_key, &result->output_condition, nullptr);
152     result->rollback_cachefile = NULL;
153     result->output_is_available = true;
154     toku_txn_manager_init(&result->txn_manager);
155     return 0;
156 }
157 
fsync_logdir(TOKULOGGER logger)158 static void fsync_logdir(TOKULOGGER logger) {
159     toku_fsync_dirfd_without_accounting(logger->dir);
160 }
161 
open_logdir(TOKULOGGER logger,const char * directory)162 static int open_logdir(TOKULOGGER logger, const char *directory) {
163     if (toku_os_is_absolute_name(directory)) {
164         logger->directory = toku_strdup(directory);
165     } else {
166         char cwdbuf[PATH_MAX];
167         char *cwd = getcwd(cwdbuf, PATH_MAX);
168         if (cwd == NULL)
169             return -1;
170         char *MALLOC_N(strlen(cwd) + strlen(directory) + 2, new_log_dir);
171         if (new_log_dir == NULL) {
172             return -2;
173         }
174         sprintf(new_log_dir, "%s/%s", cwd, directory);
175         logger->directory = new_log_dir;
176     }
177     if (logger->directory==0) return get_error_errno();
178 
179     logger->dir = opendir(logger->directory);
180     if ( logger->dir == NULL ) return -1;
181     return 0;
182 }
183 
close_logdir(TOKULOGGER logger)184 static int close_logdir(TOKULOGGER logger) {
185     return closedir(logger->dir);
186 }
187 
188 int
toku_logger_open_with_last_xid(const char * directory,TOKULOGGER logger,TXNID last_xid)189 toku_logger_open_with_last_xid(const char *directory, TOKULOGGER logger, TXNID last_xid) {
190     if (logger->is_open) return EINVAL;
191 
192     int r;
193     TXNID last_xid_if_clean_shutdown = TXNID_NONE;
194     r = toku_logfilemgr_init(logger->logfilemgr, directory, &last_xid_if_clean_shutdown);
195     if ( r!=0 )
196         return r;
197     logger->lsn = toku_logfilemgr_get_last_lsn(logger->logfilemgr);
198     logger->written_lsn = logger->lsn;
199     logger->fsynced_lsn = logger->lsn;
200     logger->inbuf.max_lsn_in_buf  = logger->lsn;
201     logger->outbuf.max_lsn_in_buf = logger->lsn;
202 
203     // open directory, save pointer for fsyncing t:2445
204     r = open_logdir(logger, directory);
205     if (r!=0) return r;
206 
207     long long nexti;
208     r = toku_logger_find_next_unused_log_file(logger->directory, &nexti);
209     if (r!=0) return r;
210 
211     logger->next_log_file_number = nexti;
212     r = open_logfile(logger);
213     if (r!=0) return r;
214     if (last_xid == TXNID_NONE) {
215         last_xid = last_xid_if_clean_shutdown;
216     }
217     toku_txn_manager_set_last_xid_from_logger(logger->txn_manager, last_xid);
218 
219     logger->is_open = true;
220     return 0;
221 }
222 
toku_logger_open(const char * directory,TOKULOGGER logger)223 int toku_logger_open (const char *directory, TOKULOGGER logger) {
224     return toku_logger_open_with_last_xid(directory, logger, TXNID_NONE);
225 }
226 
toku_logger_rollback_is_open(TOKULOGGER logger)227 bool toku_logger_rollback_is_open (TOKULOGGER logger) {
228     return logger->rollback_cachefile != NULL;
229 }
230 
231 #define MAX_CACHED_ROLLBACK_NODES 4096
232 
toku_logger_initialize_rollback_cache(TOKULOGGER logger,FT ft)233 void toku_logger_initialize_rollback_cache(TOKULOGGER logger, FT ft) {
234     ft->blocktable.free_unused_blocknums(ft->h->root_blocknum);
235     logger->rollback_cache.init(MAX_CACHED_ROLLBACK_NODES);
236 }
237 
toku_logger_open_rollback(TOKULOGGER logger,CACHETABLE cachetable,bool create)238 int toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, bool create) {
239     writing_rollback++;
240     assert(logger->is_open);
241     assert(!logger->rollback_cachefile);
242 
243     FT_HANDLE ft_handle = nullptr;   // Note, there is no DB associated with this FT.
244     toku_ft_handle_create(&ft_handle);
245     int r = toku_ft_handle_open(ft_handle, toku_product_name_strings.rollback_cachefile, create, create, cachetable, nullptr);
246     if (r == 0) {
247         FT ft = ft_handle->ft;
248         logger->rollback_cachefile = ft->cf;
249         toku_logger_initialize_rollback_cache(logger, ft_handle->ft);
250 
251         // Verify it is empty
252         // Must have no data blocks (rollback logs or otherwise).
253         ft->blocktable.verify_no_data_blocks_except_root(ft->h->root_blocknum);
254         bool is_empty = toku_ft_is_empty_fast(ft_handle);
255         assert(is_empty);
256     } else {
257         toku_ft_handle_close(ft_handle);
258     }
259     writing_rollback--;
260     return r;
261 }
262 
263 
264 //  Requires: Rollback cachefile can only be closed immediately after a checkpoint,
265 //            so it will always be clean (!h->dirty) when about to be closed.
266 //            Rollback log can only be closed when there are no open transactions,
267 //            so it will always be empty (no data blocks) when about to be closed.
toku_logger_close_rollback_check_empty(TOKULOGGER logger,bool clean_shutdown)268 void toku_logger_close_rollback_check_empty(TOKULOGGER logger, bool clean_shutdown) {
269     CACHEFILE cf = logger->rollback_cachefile;  // stored in logger at rollback cachefile open
270     if (cf) {
271         FT_HANDLE ft_to_close;
272         {   //Find "ft_to_close"
273             logger->rollback_cache.destroy();
274             FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
275             if (clean_shutdown) {
276                 //Verify it is safe to close it.
277                 assert(!ft->h->dirty());  //Must not be dirty.
278                 ft->blocktable.free_unused_blocknums(ft->h->root_blocknum);
279                 // Must have no data blocks (rollback logs or otherwise).
280                 ft->blocktable.verify_no_data_blocks_except_root(ft->h->root_blocknum);
281                 assert(!ft->h->dirty());
282             } else {
283                 ft->h->clear_dirty();
284             }
285             ft_to_close = toku_ft_get_only_existing_ft_handle(ft);
286             if (clean_shutdown) {
287                 bool is_empty;
288                 is_empty = toku_ft_is_empty_fast(ft_to_close);
289                 assert(is_empty);
290                 assert(!ft->h->dirty()); // it should not have been dirtied by the toku_ft_is_empty test.
291             }
292         }
293 
294         toku_ft_handle_close(ft_to_close);
295         //Set as dealt with already.
296         logger->rollback_cachefile = NULL;
297     }
298 }
299 
toku_logger_close_rollback(TOKULOGGER logger)300 void toku_logger_close_rollback(TOKULOGGER logger) {
301     toku_logger_close_rollback_check_empty(logger, true);
302 }
303 
304 // No locks held on entry
305 // No locks held on exit.
306 // No locks are needed, since you cannot legally close the log concurrently with doing anything else.
307 // TODO: can't fail
toku_logger_close(TOKULOGGER * loggerp)308 int toku_logger_close(TOKULOGGER *loggerp) {
309     int r;
310     TOKULOGGER logger = *loggerp;
311     if (!logger->is_open) {
312         goto is_closed;
313     }
314     ml_lock(&logger->input_lock);
315     LSN fsynced_lsn;
316     grab_output(logger, &fsynced_lsn);
317     logger_write_buffer(logger, &fsynced_lsn);
318     if (logger->fd!=-1) {
319         if (logger->write_log_files) {
320             toku_file_fsync_without_accounting(logger->fd);
321         }
322         r = toku_os_close(logger->fd);
323         assert(r == 0);
324     }
325     r = close_logdir(logger);
326     assert(r == 0);
327     logger->fd=-1;
328     release_output(logger, fsynced_lsn);
329 
330 is_closed:
331     toku_free(logger->inbuf.buf);
332     toku_free(logger->outbuf.buf);
333     // before destroying locks they must be left in the unlocked state.
334     ml_destroy(&logger->input_lock);
335     toku_mutex_destroy(&logger->output_condition_lock);
336     toku_cond_destroy(&logger->output_condition);
337     toku_txn_manager_destroy(logger->txn_manager);
338     if (logger->directory) toku_free(logger->directory);
339     toku_logfilemgr_destroy(&logger->logfilemgr);
340     toku_free(logger);
341     *loggerp=0;
342     return 0;
343 }
344 
toku_logger_shutdown(TOKULOGGER logger)345 void toku_logger_shutdown(TOKULOGGER logger) {
346     if (logger->is_open) {
347         TXN_MANAGER mgr = logger->txn_manager;
348         if (toku_txn_manager_num_live_root_txns(mgr) == 0) {
349             TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
350             toku_log_shutdown(logger, NULL, true, 0, last_xid);
351         }
352     }
353 }
354 
close_and_open_logfile(TOKULOGGER logger,LSN * fsynced_lsn)355 static int close_and_open_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
356 // Effect: close the current file, and open the next one.
357 // Entry: This thread has permission to modify the output.
358 // Exit:  This thread has permission to modify the output.
359 {
360     int r;
361     if (logger->write_log_files) {
362         toku_file_fsync_without_accounting(logger->fd);
363         *fsynced_lsn = logger->written_lsn;
364         toku_logfilemgr_update_last_lsn(logger->logfilemgr,
365                                         logger->written_lsn);  // fixes t:2294
366     }
367     r = toku_os_close(logger->fd);
368 
369     if (r != 0)
370         return get_error_errno();
371     return open_logfile(logger);
372 }
373 
// Return the larger of two ints (b when they are equal).
static int
max_int (int a, int b)
{
    return (a > b) ? a : b;
}
380 
381 // ***********************************************************
382 // output mutex/condition manipulation routines
383 // ***********************************************************
384 
385 static void
wait_till_output_available(TOKULOGGER logger)386 wait_till_output_available (TOKULOGGER logger)
387 // Effect: Wait until output becomes available.
388 // Implementation hint: Use a pthread_cond_wait.
389 // Entry: Holds the output_condition_lock (but not the inlock)
390 // Exit: Holds the output_condition_lock and logger->output_is_available
391 //
392 {
393     tokutime_t t0 = toku_time_now();
394     while (!logger->output_is_available) {
395         toku_cond_wait(&logger->output_condition, &logger->output_condition_lock);
396     }
397     if (tokutime_to_seconds(toku_time_now() - t0) >= 0.100) {
398         logger->num_wait_buf_long++;
399     }
400 }
401 
402 static void
grab_output(TOKULOGGER logger,LSN * fsynced_lsn)403 grab_output(TOKULOGGER logger, LSN *fsynced_lsn)
404 // Effect: Wait until output becomes available and get permission to modify output.
405 // Entry: Holds no lock (including not holding the input lock, since we never hold both at once).
406 // Exit:  Hold permission to modify output (but none of the locks).
407 {
408     toku_mutex_lock(&logger->output_condition_lock);
409     wait_till_output_available(logger);
410     logger->output_is_available = false;
411     if (fsynced_lsn) {
412         *fsynced_lsn = logger->fsynced_lsn;
413     }
414     toku_mutex_unlock(&logger->output_condition_lock);
415 }
416 
417 static bool
wait_till_output_already_written_or_output_buffer_available(TOKULOGGER logger,LSN lsn,LSN * fsynced_lsn)418 wait_till_output_already_written_or_output_buffer_available (TOKULOGGER logger, LSN lsn, LSN *fsynced_lsn)
419 // Effect: Wait until either the output is available or the lsn has been written.
420 //  Return true iff the lsn has been written.
421 //  If returning true, then on exit we don't hold output permission.
422 //  If returning false, then on exit we do hold output permission.
423 // Entry: Hold no locks.
424 // Exit: Hold the output permission if returns false.
425 {
426     bool result;
427     toku_mutex_lock(&logger->output_condition_lock);
428     while (1) {
429         if (logger->fsynced_lsn.lsn >= lsn.lsn) { // we can look at the fsynced lsn since we have the lock.
430             result = true;
431             break;
432         }
433         if (logger->output_is_available) {
434             logger->output_is_available = false;
435             result = false;
436             break;
437         }
438         // otherwise wait for a good time to look again.
439         toku_cond_wait(&logger->output_condition, &logger->output_condition_lock);
440     }
441     *fsynced_lsn = logger->fsynced_lsn;
442     toku_mutex_unlock(&logger->output_condition_lock);
443     return result;
444 }
445 
446 static void
release_output(TOKULOGGER logger,LSN fsynced_lsn)447 release_output (TOKULOGGER logger, LSN fsynced_lsn)
448 // Effect: Release output permission.
449 // Entry: Holds output permissions, but no locks.
450 // Exit: Holds neither locks nor output permission.
451 {
452     toku_mutex_lock(&logger->output_condition_lock);
453     logger->output_is_available = true;
454     if (logger->fsynced_lsn.lsn < fsynced_lsn.lsn) {
455         logger->fsynced_lsn = fsynced_lsn;
456     }
457     toku_cond_broadcast(&logger->output_condition);
458     toku_mutex_unlock(&logger->output_condition_lock);
459 }
460 
461 static void
swap_inbuf_outbuf(TOKULOGGER logger)462 swap_inbuf_outbuf (TOKULOGGER logger)
463 // Effect: Swap the inbuf and outbuf
464 // Entry and exit: Hold the input lock and permission to modify output.
465 {
466     struct logbuf tmp = logger->inbuf;
467     logger->inbuf = logger->outbuf;
468     logger->outbuf = tmp;
469     assert(logger->inbuf.n_in_buf == 0);
470 }
471 
472 static void
write_outbuf_to_logfile(TOKULOGGER logger,LSN * fsynced_lsn)473 write_outbuf_to_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
474 // Effect:  Write the contents of outbuf to logfile.  Don't necessarily fsync (but it might, in which case fynced_lsn is updated).
475 //  If the logfile gets too big, open the next one (that's the case where an fsync might happen).
476 // Entry and exit: Holds permission to modify output (and doesn't let it go, so it's ok to also hold the inlock).
477 {
478     if (logger->outbuf.n_in_buf>0) {
479         // Write the outbuf to disk, take accounting measurements
480         tokutime_t io_t0 = toku_time_now();
481         toku_os_full_write(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
482         tokutime_t io_t1 = toku_time_now();
483         logger->num_writes_to_disk++;
484         logger->bytes_written_to_disk += logger->outbuf.n_in_buf;
485         logger->time_spent_writing_to_disk += (io_t1 - io_t0);
486 
487         assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
488         logger->written_lsn = logger->outbuf.max_lsn_in_buf;
489         logger->n_in_file += logger->outbuf.n_in_buf;
490         logger->outbuf.n_in_buf = 0;
491     }
492     // If the file got too big, then open a new file.
493     if (logger->n_in_file > logger->lg_max) {
494         int r = close_and_open_logfile(logger, fsynced_lsn);
495         assert_zero(r);
496     }
497 }
498 
499 void
toku_logger_make_space_in_inbuf(TOKULOGGER logger,int n_bytes_needed)500 toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed)
501 // Entry: Holds the inlock
502 // Exit:  Holds the inlock
503 // Effect: Upon exit, the inlock is held and there are at least n_bytes_needed in the buffer.
504 //  May release the inlock (and then reacquire it), so this is not atomic.
505 //  May obtain the output lock and output permission (but if it does so, it will have released the inlock, since we don't hold both locks at once).
506 //   (But may hold output permission and inlock at the same time.)
507 // Implementation hint: Makes space in the inbuf, possibly by writing the inbuf to disk or increasing the size of the inbuf.  There might not be an fsync.
508 // Arguments:  logger:         the logger (side effects)
509 //             n_bytes_needed: how many bytes to make space for.
510 {
511     if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
512         return;
513     }
514     ml_unlock(&logger->input_lock);
515     LSN fsynced_lsn;
516     grab_output(logger, &fsynced_lsn);
517 
518     ml_lock(&logger->input_lock);
519     // Some other thread may have written the log out while we didn't have the lock.  If we have space now, then be happy.
520     if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
521         release_output(logger, fsynced_lsn);
522         return;
523     }
524     if (logger->inbuf.n_in_buf > 0) {
525         // There isn't enough space, and there is something in the buffer, so write the inbuf.
526         swap_inbuf_outbuf(logger);
527 
528         // Don't release the inlock in this case, because we don't want to get starved.
529         write_outbuf_to_logfile(logger, &fsynced_lsn);
530     }
531     // the inbuf is empty.  Make it big enough (just in case it is somehow smaller than a single log entry).
532     if (n_bytes_needed > logger->inbuf.buf_size) {
533         assert(n_bytes_needed < (1<<30)); // it seems unlikely to work if a logentry gets that big.
534         int new_size = max_int(logger->inbuf.buf_size * 2, n_bytes_needed); // make it at least twice as big, and big enough for n_bytes
535         assert(new_size < (1<<30));
536         XREALLOC_N(new_size, logger->inbuf.buf);
537         logger->inbuf.buf_size = new_size;
538     }
539     release_output(logger, fsynced_lsn);
540 }
541 
toku_logger_fsync(TOKULOGGER logger)542 void toku_logger_fsync(TOKULOGGER logger)
543 // Effect: This is the exported fsync used by ydb.c for env_log_flush.  Group commit doesn't have to work.
544 // Entry: Holds no locks
545 // Exit: Holds no locks
546 // Implementation note:  Acquire the output condition lock, then the output permission, then release the output condition lock, then get the input lock.
547 // Then release everything.  Hold the input lock while reading the current max lsn in buf to make drd happy that there is no data race.
548 {
549     ml_lock(&logger->input_lock);
550     const LSN max_lsn_in_buf = logger->inbuf.max_lsn_in_buf;
551     ml_unlock(&logger->input_lock);
552 
553     toku_logger_maybe_fsync(logger, max_lsn_in_buf, true, false);
554 }
555 
toku_logger_fsync_if_lsn_not_fsynced(TOKULOGGER logger,LSN lsn)556 void toku_logger_fsync_if_lsn_not_fsynced (TOKULOGGER logger, LSN lsn) {
557     if (logger->write_log_files) {
558         toku_logger_maybe_fsync(logger, lsn, true, false);
559     }
560 }
561 
toku_logger_is_open(TOKULOGGER logger)562 int toku_logger_is_open(TOKULOGGER logger) {
563     if (logger==0) return 0;
564     return logger->is_open;
565 }
566 
toku_logger_set_cachetable(TOKULOGGER logger,CACHETABLE ct)567 void toku_logger_set_cachetable (TOKULOGGER logger, CACHETABLE ct) {
568     logger->ct = ct;
569 }
570 
toku_logger_set_lg_max(TOKULOGGER logger,uint32_t lg_max)571 int toku_logger_set_lg_max(TOKULOGGER logger, uint32_t lg_max) {
572     if (logger==0) return EINVAL; // no logger
573     if (logger->is_open) return EINVAL;
574     if (lg_max>(1<<30)) return EINVAL; // too big
575     logger->lg_max = lg_max;
576     return 0;
577 }
toku_logger_get_lg_max(TOKULOGGER logger,uint32_t * lg_maxp)578 int toku_logger_get_lg_max(TOKULOGGER logger, uint32_t *lg_maxp) {
579     if (logger==0) return EINVAL; // no logger
580     *lg_maxp = logger->lg_max;
581     return 0;
582 }
583 
toku_logger_set_lg_bsize(TOKULOGGER logger,uint32_t bsize)584 int toku_logger_set_lg_bsize(TOKULOGGER logger, uint32_t bsize) {
585     if (logger==0) return EINVAL; // no logger
586     if (logger->is_open) return EINVAL;
587     if (bsize<=0 || bsize>(1<<30)) return EINVAL;
588     logger->write_block_size = bsize;
589     return 0;
590 }
591 
toku_logger_find_next_unused_log_file(const char * directory,long long * result)592 int toku_logger_find_next_unused_log_file(const char *directory, long long *result)
593 // This is called during logger initialalization, and no locks are required.
594 {
595     DIR *d=opendir(directory);
596     long long maxf=-1; *result = maxf;
597     struct dirent *de;
598     if (d==0) return get_error_errno();
599     while ((de=readdir(d))) {
600         if (de==0) return get_error_errno();
601         long long thisl = -1;
602         if ( is_a_logfile(de->d_name, &thisl) ) {
603             if ((long long)thisl > maxf) maxf = thisl;
604         }
605     }
606     *result=maxf+1;
607     int r = closedir(d);
608     return r;
609 }
610 
// TODO: Put this in portability layer when ready
// in: file pathname that may have a dirname prefix
// return: file leaf name, i.e. a pointer into `pathname` just past its last
//         '/', or `pathname` itself when it contains no '/'
static char * fileleafname(char *pathname) {
    char *last_slash = strrchr(pathname, '/');
    return last_slash ? last_slash + 1 : pathname;
}
623 
logfilenamecompare(const void * ap,const void * bp)624 static int logfilenamecompare (const void *ap, const void *bp) {
625     char *a=*(char**)ap;
626     char *a_leafname = fileleafname(a);
627     char *b=*(char**)bp;
628     char * b_leafname = fileleafname(b);
629     int rval;
630     bool valid;
631     uint64_t num_a = 0;  // placate compiler
632     uint64_t num_b = 0;
633     uint32_t ver_a = 0;
634     uint32_t ver_b = 0;
635     valid = is_a_logfile_any_version(a_leafname, &num_a, &ver_a);
636     invariant(valid);
637     valid = is_a_logfile_any_version(b_leafname, &num_b, &ver_b);
638     invariant(valid);
639     if (ver_a < ver_b) rval = -1;
640     else if (ver_a > ver_b) rval = +1;
641     else if (num_a < num_b) rval = -1;
642     else if (num_a > num_b) rval = +1;
643     else rval = 0;
644     return rval;
645 }
646 
647 // Return the log files in sorted order
648 // Return a null_terminated array of strings, and also return the number of strings in the array.
649 // Requires: Race conditions must be dealt with by caller.  Either call during initialization or grab the output permission.
toku_logger_find_logfiles(const char * directory,char *** resultp,int * n_logfiles)650 int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_logfiles)
651 {
652     int result_limit=2;
653     int n_results=0;
654     char **MALLOC_N(result_limit, result);
655     assert(result!= NULL);
656     struct dirent *de;
657     DIR *d=opendir(directory);
658     if (d==0) {
659         int er = get_error_errno();
660         toku_free(result);
661         return er;
662     }
663     int dirnamelen = strlen(directory);
664     while ((de=readdir(d))) {
665         uint64_t thisl;
666         uint32_t version_ignore;
667         if ( !(is_a_logfile_any_version(de->d_name, &thisl, &version_ignore)) ) continue; //#2424: Skip over files that don't match the exact logfile template
668         if (n_results+1>=result_limit) {
669             result_limit*=2;
670             XREALLOC_N(result_limit, result);
671         }
672         int fnamelen = dirnamelen + strlen(de->d_name) + 2; // One for the slash and one for the trailing NUL.
673         char *XMALLOC_N(fnamelen, fname);
674         snprintf(fname, fnamelen, "%s/%s", directory, de->d_name);
675         result[n_results++] = fname;
676     }
677     // Return them in increasing order.  Set width to allow for newer log file names ("xxx.tokulog13")
678     // which are one character longer than old log file names ("xxx.tokulog2").  The comparison function
679     // won't look beyond the terminating NUL, so an extra character in the comparison string doesn't matter.
680     // Allow room for terminating NUL after "xxx.tokulog13" even if result[0] is of form "xxx.tokulog2."
681     int width = sizeof(result[0]+2);
682     qsort(result, n_results, width, logfilenamecompare);
683     *resultp    = result;
684     *n_logfiles = n_results;
685     result[n_results]=0; // make a trailing null
686     return d ? closedir(d) : 0;
687 }
688 
toku_logger_free_logfiles(char ** logfiles,int n_logfiles)689 void toku_logger_free_logfiles(char **logfiles, int n_logfiles) {
690     for (int i = 0; i < n_logfiles; i++)
691         toku_free(logfiles[i]);
692     toku_free(logfiles);
693 }
694 
static int open_logfile (TOKULOGGER logger)
// Effect: Open the output file for the logger and write the 12-byte file header
//  (the 8 bytes "tokulogg" followed by the 4-byte log format version).
//  When logger->write_log_files is set, a new file "<directory>/log<number>.tokulog<version>"
//  is created, the next log file number is advanced, and the file is registered with the
//  log file manager.  Otherwise DEV_NULL_FILE is opened and nothing is registered.
// Returns: 0 on success, or the value of get_error_errno() if the open fails.
// Entry and Exit: This thread has permission to modify the output.
{
    // directory length plus 50 bytes of headroom for "/log", the file number, ".tokulog", the version and the NUL
    int fnamelen = strlen(logger->directory)+50;
    char fname[fnamelen];
    snprintf(fname,
             fnamelen,
             "%s/log%012lld.tokulog%d",
             logger->directory,
             logger->next_log_file_number,
             TOKU_LOG_VERSION);
    // remember the number used in fname; next_log_file_number is incremented below
    long long index = logger->next_log_file_number;
    if (logger->write_log_files) {
        // O_EXCL: the open fails if a file with this name already exists
        logger->fd =
            toku_os_open(fname,
                         O_CREAT + O_WRONLY + O_TRUNC + O_EXCL + O_BINARY,
                         S_IRUSR + S_IWUSR,
                         *tokudb_file_log_key);
        if (logger->fd == -1) {
            return get_error_errno();
        }
        fsync_logdir(logger);
        logger->next_log_file_number++;
    } else {
        logger->fd = toku_os_open(
            DEV_NULL_FILE, O_WRONLY + O_BINARY, S_IWUSR, *tokudb_file_log_key);
        if (logger->fd == -1) {
            return get_error_errno();
        }
    }
    // file header: 8 bytes of magic, then 4 bytes of version
    toku_os_full_write(logger->fd, "tokulogg", 8);
    int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order
    toku_os_full_write(logger->fd, &version_l, 4);
    if ( logger->write_log_files ) {
        // register the new file (number, largest lsn so far, version) with the log file manager
        TOKULOGFILEINFO XMALLOC(lf_info);
        lf_info->index = index;
        lf_info->maxlsn = logger->written_lsn;
        lf_info->version = TOKU_LOG_VERSION;
        toku_logfilemgr_add_logfile_info(logger->logfilemgr, lf_info);
    }
    logger->fsynced_lsn = logger->written_lsn;
    // the 12 header bytes written above
    logger->n_in_file = 12;
    return 0;
}
739 
delete_logfile(TOKULOGGER logger,long long index,uint32_t version)740 static void delete_logfile(TOKULOGGER logger, long long index, uint32_t version)
741 // Entry and Exit: This thread has permission to modify the output.
742 {
743     int fnamelen = strlen(logger->directory)+50;
744     char fname[fnamelen];
745     snprintf(fname, fnamelen, "%s/log%012lld.tokulog%d", logger->directory, index, version);
746     int r = remove(fname);
747     invariant_zero(r);
748 }
749 
toku_logger_maybe_trim_log(TOKULOGGER logger,LSN trim_lsn)750 void toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn)
751 // On entry and exit: No logger locks held.
752 // Acquires and releases output permission.
753 {
754     LSN fsynced_lsn;
755     grab_output(logger, &fsynced_lsn);
756     TOKULOGFILEMGR lfm = logger->logfilemgr;
757     int n_logfiles = toku_logfilemgr_num_logfiles(lfm);
758 
759     TOKULOGFILEINFO lf_info = NULL;
760 
761     if ( logger->write_log_files && logger->trim_log_files) {
762         while ( n_logfiles > 1 ) { // don't delete current logfile
763             uint32_t log_version;
764             lf_info = toku_logfilemgr_get_oldest_logfile_info(lfm);
765             log_version = lf_info->version;
766             if ( lf_info->maxlsn.lsn >= trim_lsn.lsn ) {
767                 // file contains an open LSN, can't delete this or any newer log files
768                 break;
769             }
770             // need to save copy - toku_logfilemgr_delete_oldest_logfile_info free's the lf_info
771             long index = lf_info->index;
772             toku_logfilemgr_delete_oldest_logfile_info(lfm);
773             n_logfiles--;
774             delete_logfile(logger, index, log_version);
775         }
776     }
777     release_output(logger, fsynced_lsn);
778 }
779 
toku_logger_write_log_files(TOKULOGGER logger,bool write_log_files)780 void toku_logger_write_log_files (TOKULOGGER logger, bool write_log_files)
781 // Called only during initialization (or just after recovery), so no locks are needed.
782 {
783     logger->write_log_files = write_log_files;
784 }
785 
toku_logger_trim_log_files(TOKULOGGER logger,bool trim_log_files)786 void toku_logger_trim_log_files (TOKULOGGER logger, bool trim_log_files)
787 // Called only during initialization, so no locks are needed.
788 {
789     logger->trim_log_files = trim_log_files;
790 }
791 
toku_logger_txns_exist(TOKULOGGER logger)792 bool toku_logger_txns_exist(TOKULOGGER logger)
793 // Called during close of environment to ensure that transactions don't exist
794 {
795     return toku_txn_manager_txns_exist(logger->txn_manager);
796 }
797 
798 
void toku_logger_maybe_fsync(TOKULOGGER logger, LSN lsn, int do_fsync, bool holds_input_lock)
// Effect: If do_fsync is nonzero, then make sure that the log is flushed and synced at least up to lsn.
//  If do_fsync is zero, the only effect is to release the input lock (when held).
// Entry: Holds input lock iff 'holds_input_lock'.  The log entry has already been written to the input buffer.
// Exit:  Holds no locks.
// The input lock may be released and then reacquired.  Thus this function does not run atomically with respect to other threads.
{
    if (holds_input_lock) {
        ml_unlock(&logger->input_lock);
    }
    if (do_fsync) {
        // reacquire the locks (acquire output permission first)
        LSN  fsynced_lsn;
        bool already_done = wait_till_output_already_written_or_output_buffer_available(logger, lsn, &fsynced_lsn);
        if (already_done) {
            // nothing left to do; note that release_output is not called on this path
            return;
        }

        // otherwise we now own the output permission, and our lsn has not been output yet.

        ml_lock(&logger->input_lock);

        swap_inbuf_outbuf(logger);

        ml_unlock(&logger->input_lock); // release the input lock now, so other threads can fill the inbuf.  (Thus enabling group commit.)

        write_outbuf_to_logfile(logger, &fsynced_lsn);
        if (fsynced_lsn.lsn < lsn.lsn) {
            // write_outbuf_to_logfile may already have fsynced far enough; we get here only when it has not.
            toku_file_fsync_without_accounting(logger->fd);
            assert(fsynced_lsn.lsn <= logger->written_lsn.lsn);
            // everything written so far has now been synced
            fsynced_lsn = logger->written_lsn;
        }
        // the last lsn is only accessed while holding output permission or else when the log file is old.
        if (logger->write_log_files) {
            toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);
        }
        release_output(logger, fsynced_lsn);
    }
}
838 
static void
logger_write_buffer(TOKULOGGER logger, LSN *fsynced_lsn)
// Entry:  Holds the input lock and permission to modify output.
// Exit:   Holds only the permission to modify output.
// Effect:  Swap the input and output buffers, release the input lock, and write the output buffer to the log file.
//  If logger->write_log_files is set, then also fsync the file and update the log file manager's last lsn.
//  (There is no do_fsync argument; the fsync depends only on write_log_files.)
// Note: Only called during single-threaded activity from toku_logger_restart, so locks aren't really needed.
{
    swap_inbuf_outbuf(logger);
    ml_unlock(&logger->input_lock);
    write_outbuf_to_logfile(logger, fsynced_lsn);
    if (logger->write_log_files) {
        toku_file_fsync_without_accounting(logger->fd);
        toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);  // t:2294
    }
}
854 
int toku_logger_restart(TOKULOGGER logger, LSN lastlsn)
// Effect: Flush and close the current log output, reset the logger's LSNs to lastlsn,
//  turn on write_log_files and trim_log_files, and open a new log file.
// Returns: the result of open_logfile (0 on success).
// Entry and exit: Holds no locks (this is called only during single-threaded activity, such as initial start).
{
    int r;

    // flush out the log buffer
    LSN fsynced_lsn;
    grab_output(logger, &fsynced_lsn);
    ml_lock(&logger->input_lock);
    // logger_write_buffer releases the input lock; output permission is still held afterwards
    logger_write_buffer(logger, &fsynced_lsn);

    // close the log file
    if (logger->write_log_files) {  // fsyncs don't work to /dev/null
        toku_file_fsync_without_accounting(logger->fd);
    }
    r = toku_os_close(logger->fd);
    assert(r == 0);
    logger->fd = -1;

    // reset the LSN's to the lastlsn when the logger was opened
    logger->lsn = logger->written_lsn = logger->fsynced_lsn = lastlsn;
    // from here on real log files are written and old ones may be trimmed
    logger->write_log_files = true;
    logger->trim_log_files = true;

    // open a new log file
    r = open_logfile(logger);
    // output permission is released whether or not the open succeeded
    release_output(logger, fsynced_lsn);
    return r;
}
884 
885 // fname is the iname
toku_logger_log_fcreate(TOKUTXN txn,const char * fname,FILENUM filenum,uint32_t mode,uint32_t treeflags,uint32_t nodesize,uint32_t basementnodesize,enum toku_compression_method compression_method)886 void toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, uint32_t mode,
887         uint32_t treeflags, uint32_t nodesize, uint32_t basementnodesize,
888         enum toku_compression_method compression_method) {
889     if (txn) {
890         BYTESTRING bs_fname = { .len = (uint32_t) strlen(fname), .data = (char *) fname };
891         // fsync log on fcreate
892         toku_log_fcreate (txn->logger, (LSN*)0, 1, txn, toku_txn_get_txnid(txn), filenum,
893                 bs_fname, mode, treeflags, nodesize, basementnodesize, compression_method);
894     }
895 }
896 
897 
898 // We only do fdelete on open ft's, so we pass the filenum here
toku_logger_log_fdelete(TOKUTXN txn,FILENUM filenum)899 void toku_logger_log_fdelete (TOKUTXN txn, FILENUM filenum) {
900     if (txn) {
901         //No fsync.
902         toku_log_fdelete (txn->logger, (LSN*)0, 0, txn, toku_txn_get_txnid(txn), filenum);
903     }
904 }
905 
906 
907 
908 /* fopen isn't really an action.  It's just for bookkeeping.  We need to know the filename that goes with a filenum. */
toku_logger_log_fopen(TOKUTXN txn,const char * fname,FILENUM filenum,uint32_t treeflags)909 void toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum, uint32_t treeflags) {
910     if (txn) {
911         BYTESTRING bs;
912         bs.len = strlen(fname);
913         bs.data = (char*)fname;
914         toku_log_fopen (txn->logger, (LSN*)0, 0, bs, filenum, treeflags);
915     }
916 }
917 
// Read one byte from f into *v without touching any checksum or length counter.
// Returns 0 on success, or -1 (leaving *v untouched) if fgetc reports EOF.
static int toku_fread_uint8_t_nocrclen (FILE *f, uint8_t *v) {
    const int ch = fgetc(f);
    if (ch != EOF) {
        *v = (uint8_t) ch;
        return 0;
    }
    return -1;
}
925 
toku_fread_uint8_t(FILE * f,uint8_t * v,struct x1764 * mm,uint32_t * len)926 int toku_fread_uint8_t (FILE *f, uint8_t *v, struct x1764 *mm, uint32_t *len) {
927     int vi=fgetc(f);
928     if (vi==EOF) return -1;
929     uint8_t vc=(uint8_t)vi;
930     toku_x1764_add(mm, &vc, 1);
931     (*len)++;
932     *v = vc;
933     return 0;
934 }
935 
toku_fread_uint32_t_nocrclen(FILE * f,uint32_t * v)936 int toku_fread_uint32_t_nocrclen (FILE *f, uint32_t *v) {
937     uint32_t result;
938     uint8_t *cp = (uint8_t*)&result;
939     int r;
940     r = toku_fread_uint8_t_nocrclen (f, cp+0); if (r!=0) return r;
941     r = toku_fread_uint8_t_nocrclen (f, cp+1); if (r!=0) return r;
942     r = toku_fread_uint8_t_nocrclen (f, cp+2); if (r!=0) return r;
943     r = toku_fread_uint8_t_nocrclen (f, cp+3); if (r!=0) return r;
944     *v = toku_dtoh32(result);
945 
946     return 0;
947 }
toku_fread_uint32_t(FILE * f,uint32_t * v,struct x1764 * checksum,uint32_t * len)948 int toku_fread_uint32_t (FILE *f, uint32_t *v, struct x1764 *checksum, uint32_t *len) {
949     uint32_t result;
950     uint8_t *cp = (uint8_t*)&result;
951     int r;
952     r = toku_fread_uint8_t (f, cp+0, checksum, len); if(r!=0) return r;
953     r = toku_fread_uint8_t (f, cp+1, checksum, len); if(r!=0) return r;
954     r = toku_fread_uint8_t (f, cp+2, checksum, len); if(r!=0) return r;
955     r = toku_fread_uint8_t (f, cp+3, checksum, len); if(r!=0) return r;
956     *v = toku_dtoh32(result);
957     return 0;
958 }
959 
toku_fread_uint64_t(FILE * f,uint64_t * v,struct x1764 * checksum,uint32_t * len)960 int toku_fread_uint64_t (FILE *f, uint64_t *v, struct x1764 *checksum, uint32_t *len) {
961     uint32_t v1,v2;
962     int r;
963     r=toku_fread_uint32_t(f, &v1, checksum, len);    if (r!=0) return r;
964     r=toku_fread_uint32_t(f, &v2, checksum, len);    if (r!=0) return r;
965     *v = (((uint64_t)v1)<<32 ) | ((uint64_t)v2);
966     return 0;
967 }
968 
toku_fread_bool(FILE * f,bool * v,struct x1764 * mm,uint32_t * len)969 int toku_fread_bool (FILE *f, bool *v, struct x1764 *mm, uint32_t *len) {
970     uint8_t iv;
971     int r = toku_fread_uint8_t(f, &iv, mm, len);
972     if (r == 0) {
973         *v = (iv!=0);
974     }
975     return r;
976 }
977 
toku_fread_LSN(FILE * f,LSN * lsn,struct x1764 * checksum,uint32_t * len)978 int toku_fread_LSN     (FILE *f, LSN *lsn, struct x1764 *checksum, uint32_t *len) {
979     return toku_fread_uint64_t (f, &lsn->lsn, checksum, len);
980 }
981 
toku_fread_BLOCKNUM(FILE * f,BLOCKNUM * b,struct x1764 * checksum,uint32_t * len)982 int toku_fread_BLOCKNUM (FILE *f, BLOCKNUM *b, struct x1764 *checksum, uint32_t *len) {
983     return toku_fread_uint64_t (f, (uint64_t*)&b->b, checksum, len);
984 }
985 
toku_fread_FILENUM(FILE * f,FILENUM * filenum,struct x1764 * checksum,uint32_t * len)986 int toku_fread_FILENUM (FILE *f, FILENUM *filenum, struct x1764 *checksum, uint32_t *len) {
987     return toku_fread_uint32_t (f, &filenum->fileid, checksum, len);
988 }
989 
toku_fread_TXNID(FILE * f,TXNID * txnid,struct x1764 * checksum,uint32_t * len)990 int toku_fread_TXNID   (FILE *f, TXNID *txnid, struct x1764 *checksum, uint32_t *len) {
991     return toku_fread_uint64_t (f, txnid, checksum, len);
992 }
993 
toku_fread_TXNID_PAIR(FILE * f,TXNID_PAIR * txnid,struct x1764 * checksum,uint32_t * len)994 int toku_fread_TXNID_PAIR   (FILE *f, TXNID_PAIR *txnid, struct x1764 *checksum, uint32_t *len) {
995     TXNID parent;
996     TXNID child;
997     int r;
998     r = toku_fread_TXNID(f, &parent, checksum, len); if (r != 0) { return r; }
999     r = toku_fread_TXNID(f, &child, checksum, len);  if (r != 0) { return r; }
1000     txnid->parent_id64 = parent;
1001     txnid->child_id64 = child;
1002     return 0;
1003 }
1004 
1005 
toku_fread_XIDP(FILE * f,XIDP * xidp,struct x1764 * checksum,uint32_t * len)1006 int toku_fread_XIDP    (FILE *f, XIDP *xidp, struct x1764 *checksum, uint32_t *len) {
1007     // These reads are verbose because XA defined the fields as "long", but we use 4 bytes, 1 byte and 1 byte respectively.
1008     TOKU_XA_XID *XMALLOC(xid);
1009     {
1010         uint32_t formatID;
1011         int r = toku_fread_uint32_t(f, &formatID,     checksum, len);
1012         if (r!=0) return r;
1013         xid->formatID = formatID;
1014     }
1015     {
1016         uint8_t gtrid_length;
1017         int r = toku_fread_uint8_t (f, &gtrid_length, checksum, len);
1018         if (r!=0) return r;
1019         xid->gtrid_length = gtrid_length;
1020     }
1021     {
1022         uint8_t bqual_length;
1023         int r = toku_fread_uint8_t (f, &bqual_length, checksum, len);
1024         if (r!=0) return r;
1025         xid->bqual_length = bqual_length;
1026     }
1027     for (int i=0; i< xid->gtrid_length + xid->bqual_length; i++) {
1028         uint8_t byte;
1029         int r = toku_fread_uint8_t(f, &byte, checksum, len);
1030         if (r!=0) return r;
1031         xid->data[i] = byte;
1032     }
1033     *xidp = xid;
1034     return 0;
1035 }
1036 
1037 // fills in the bs with malloced data.
toku_fread_BYTESTRING(FILE * f,BYTESTRING * bs,struct x1764 * checksum,uint32_t * len)1038 int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, struct x1764 *checksum, uint32_t *len) {
1039     int r=toku_fread_uint32_t(f, (uint32_t*)&bs->len, checksum, len);
1040     if (r!=0) return r;
1041     XMALLOC_N(bs->len, bs->data);
1042     uint32_t i;
1043     for (i=0; i<bs->len; i++) {
1044         r=toku_fread_uint8_t(f, (uint8_t*)&bs->data[i], checksum, len);
1045         if (r!=0) {
1046             toku_free(bs->data);
1047             bs->data=0;
1048             return r;
1049         }
1050     }
1051     return 0;
1052 }
1053 
1054 // fills in the fs with malloced data.
toku_fread_FILENUMS(FILE * f,FILENUMS * fs,struct x1764 * checksum,uint32_t * len)1055 int toku_fread_FILENUMS (FILE *f, FILENUMS *fs, struct x1764 *checksum, uint32_t *len) {
1056     int r=toku_fread_uint32_t(f, (uint32_t*)&fs->num, checksum, len);
1057     if (r!=0) return r;
1058     XMALLOC_N(fs->num, fs->filenums);
1059     uint32_t i;
1060     for (i=0; i<fs->num; i++) {
1061         r=toku_fread_FILENUM (f, &fs->filenums[i], checksum, len);
1062         if (r!=0) {
1063             toku_free(fs->filenums);
1064             fs->filenums=0;
1065             return r;
1066         }
1067     }
1068     return 0;
1069 }
1070 
toku_logprint_LSN(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1071 int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1072     LSN v;
1073     int r = toku_fread_LSN(inf, &v, checksum, len);
1074     if (r!=0) return r;
1075     fprintf(outf, " %s=%" PRIu64, fieldname, v.lsn);
1076     return 0;
1077 }
1078 
toku_logprint_TXNID(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1079 int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1080     TXNID v;
1081     int r = toku_fread_TXNID(inf, &v, checksum, len);
1082     if (r!=0) return r;
1083     fprintf(outf, " %s=%" PRIu64, fieldname, v);
1084     return 0;
1085 }
1086 
toku_logprint_TXNID_PAIR(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1087 int toku_logprint_TXNID_PAIR (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1088     TXNID_PAIR v;
1089     int r = toku_fread_TXNID_PAIR(inf, &v, checksum, len);
1090     if (r!=0) return r;
1091     fprintf(outf, " %s=%" PRIu64 ",%" PRIu64, fieldname, v.parent_id64, v.child_id64);
1092     return 0;
1093 }
1094 
toku_logprint_XIDP(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1095 int toku_logprint_XIDP (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1096     XIDP vp;
1097     int r = toku_fread_XIDP(inf, &vp, checksum, len);
1098     if (r!=0) return r;
1099     fprintf(outf, " %s={formatID=0x%lx gtrid_length=%ld bqual_length=%ld data=", fieldname, vp->formatID, vp->gtrid_length, vp->bqual_length);
1100     toku_print_bytes(outf, vp->gtrid_length + vp->bqual_length, vp->data);
1101     fprintf(outf, "}");
1102     toku_free(vp);
1103     return 0;
1104 }
1105 
toku_logprint_uint8_t(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1106 int toku_logprint_uint8_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1107     uint8_t v;
1108     int r = toku_fread_uint8_t(inf, &v, checksum, len);
1109     if (r!=0) return r;
1110     fprintf(outf, " %s=%d", fieldname, v);
1111     if (format) fprintf(outf, format, v);
1112     else if (v=='\'') fprintf(outf, "('\'')");
1113     else if (isprint(v)) fprintf(outf, "('%c')", v);
1114     else {}/*nothing*/
1115     return 0;
1116 }
1117 
toku_logprint_uint32_t(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1118 int toku_logprint_uint32_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1119     uint32_t v;
1120     int r = toku_fread_uint32_t(inf, &v, checksum, len);
1121     if (r!=0) return r;
1122     fprintf(outf, " %s=", fieldname);
1123     fprintf(outf, format ? format : "%d", v);
1124     return 0;
1125 }
1126 
toku_logprint_uint64_t(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1127 int toku_logprint_uint64_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1128     uint64_t v;
1129     int r = toku_fread_uint64_t(inf, &v, checksum, len);
1130     if (r!=0) return r;
1131     fprintf(outf, " %s=", fieldname);
1132     fprintf(outf, format ? format : "%" PRId64, v);
1133     return 0;
1134 }
1135 
toku_logprint_bool(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1136 int toku_logprint_bool (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1137     bool v;
1138     int r = toku_fread_bool(inf, &v, checksum, len);
1139     if (r!=0) return r;
1140     fprintf(outf, " %s=%s", fieldname, v ? "true" : "false");
1141     return 0;
1142 
1143 }
1144 
toku_print_BYTESTRING(FILE * outf,uint32_t len,char * data)1145 void toku_print_BYTESTRING (FILE *outf, uint32_t len, char *data) {
1146     fprintf(outf, "{len=%u data=", len);
1147     toku_print_bytes(outf, len, data);
1148     fprintf(outf, "}");
1149 
1150 }
1151 
toku_logprint_BYTESTRING(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1152 int toku_logprint_BYTESTRING (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1153     BYTESTRING bs;
1154     int r = toku_fread_BYTESTRING(inf, &bs, checksum, len);
1155     if (r!=0) return r;
1156     fprintf(outf, " %s=", fieldname);
1157     toku_print_BYTESTRING(outf, bs.len, bs.data);
1158     toku_free(bs.data);
1159     return 0;
1160 }
1161 
toku_logprint_BLOCKNUM(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1162 int toku_logprint_BLOCKNUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1163     return toku_logprint_uint64_t(outf, inf, fieldname, checksum, len, format);
1164 
1165 }
1166 
toku_logprint_FILENUM(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1167 int toku_logprint_FILENUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format) {
1168     return toku_logprint_uint32_t(outf, inf, fieldname, checksum, len, format);
1169 
1170 }
1171 
1172 static void
toku_print_FILENUMS(FILE * outf,uint32_t num,FILENUM * filenums)1173 toku_print_FILENUMS (FILE *outf, uint32_t num, FILENUM *filenums) {
1174     fprintf(outf, "{num=%u filenums=\"", num);
1175     uint32_t i;
1176     for (i=0; i<num; i++) {
1177         if (i>0)
1178             fprintf(outf, ",");
1179         fprintf(outf, "0x%" PRIx32, filenums[i].fileid);
1180     }
1181     fprintf(outf, "\"}");
1182 
1183 }
1184 
toku_logprint_FILENUMS(FILE * outf,FILE * inf,const char * fieldname,struct x1764 * checksum,uint32_t * len,const char * format)1185 int toku_logprint_FILENUMS (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, uint32_t *len, const char *format __attribute__((__unused__))) {
1186     FILENUMS bs;
1187     int r = toku_fread_FILENUMS(inf, &bs, checksum, len);
1188     if (r!=0) return r;
1189     fprintf(outf, " %s=", fieldname);
1190     toku_print_FILENUMS(outf, bs.num, bs.filenums);
1191     toku_free(bs.filenums);
1192     return 0;
1193 }
1194 
toku_read_and_print_logmagic(FILE * f,uint32_t * versionp)1195 int toku_read_and_print_logmagic (FILE *f, uint32_t *versionp) {
1196     {
1197         char magic[8];
1198         int r=fread(magic, 1, 8, f);
1199         if (r!=8) {
1200             return DB_BADFORMAT;
1201         }
1202         if (memcmp(magic, "tokulogg", 8)!=0) {
1203             return DB_BADFORMAT;
1204         }
1205     }
1206     {
1207         int version;
1208             int r=fread(&version, 1, 4, f);
1209         if (r!=4) {
1210             return DB_BADFORMAT;
1211         }
1212         printf("tokulog v.%u\n", toku_ntohl(version));
1213         //version MUST be in network order regardless of disk order
1214         *versionp=toku_ntohl(version);
1215     }
1216     return 0;
1217 }
1218 
toku_read_logmagic(FILE * f,uint32_t * versionp)1219 int toku_read_logmagic (FILE *f, uint32_t *versionp) {
1220     {
1221         char magic[8];
1222         int r=fread(magic, 1, 8, f);
1223         if (r!=8) {
1224             return DB_BADFORMAT;
1225         }
1226         if (memcmp(magic, "tokulogg", 8)!=0) {
1227             return DB_BADFORMAT;
1228         }
1229     }
1230     {
1231         int version;
1232             int r=fread(&version, 1, 4, f);
1233         if (r!=4) {
1234             return DB_BADFORMAT;
1235         }
1236         *versionp=toku_ntohl(version);
1237     }
1238     return 0;
1239 }
1240 
toku_txn_get_txnid(TOKUTXN txn)1241 TXNID_PAIR toku_txn_get_txnid (TOKUTXN txn) {
1242     TXNID_PAIR tp = { .parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE};
1243     if (txn==0) return tp;
1244     else return txn->txnid;
1245 }
1246 
toku_logger_last_lsn(TOKULOGGER logger)1247 LSN toku_logger_last_lsn(TOKULOGGER logger) {
1248     return logger->lsn;
1249 }
1250 
toku_txn_logger(TOKUTXN txn)1251 TOKULOGGER toku_txn_logger (TOKUTXN txn) {
1252     return txn ? txn->logger : 0;
1253 }
1254 
toku_txnid2txn(TOKULOGGER logger,TXNID_PAIR txnid,TOKUTXN * result)1255 void toku_txnid2txn(TOKULOGGER logger, TXNID_PAIR txnid, TOKUTXN *result) {
1256     TOKUTXN root_txn = NULL;
1257     toku_txn_manager_suspend(logger->txn_manager);
1258     toku_txn_manager_id2txn_unlocked(logger->txn_manager, txnid, &root_txn);
1259     if (root_txn == NULL || root_txn->txnid.child_id64 == txnid.child_id64) {
1260         *result = root_txn;
1261     }
1262     else if (root_txn != NULL) {
1263         root_txn->child_manager->suspend();
1264         root_txn->child_manager->find_tokutxn_by_xid_unlocked(txnid, result);
1265         root_txn->child_manager->resume();
1266     }
1267     toku_txn_manager_resume(logger->txn_manager);
1268 }
1269 
1270 // Find the earliest LSN in a log.  No locks are needed.
peek_at_log(TOKULOGGER logger,char * filename,LSN * first_lsn)1271 static int peek_at_log(TOKULOGGER logger, char *filename, LSN *first_lsn) {
1272     int fd = toku_os_open(
1273         filename, O_RDONLY + O_BINARY, S_IRUSR, *tokudb_file_log_key);
1274     if (fd < 0) {
1275         int er = get_error_errno();
1276         if (logger->write_log_files)
1277             printf("couldn't open: %s\n", strerror(er));
1278         return er;
1279     }
1280     enum { SKIP = 12+1+4 }; // read the 12 byte header, the first message, and the first len
1281     unsigned char header[SKIP+8];
1282     int r = read(fd, header, SKIP+8);
1283     if (r!=SKIP+8) return 0; // cannot determine that it's archivable, so we'll assume no.  If a later-log is archivable is then this one will be too.
1284 
1285     uint64_t lsn;
1286     {
1287         struct rbuf rb;
1288         rb.buf   = header+SKIP;
1289         rb.size  = 8;
1290         rb.ndone = 0;
1291         lsn = rbuf_ulonglong(&rb);
1292     }
1293 
1294     r = toku_os_close(fd);
1295 
1296     if (r != 0) {
1297         return 0;
1298     }
1299 
1300     first_lsn->lsn = lsn;
1301     return 0;
1302 }
1303 
1304 // Return a malloc'd array of malloc'd strings which are the filenames that can be archived.
1305 // Output permission are obtained briefly so we can get a list of the log files without conflicting.
toku_logger_log_archive(TOKULOGGER logger,char *** logs_p,int flags)1306 int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
1307     if (flags!=0) return EINVAL; // don't know what to do.
1308     int all_n_logs;
1309     int i;
1310     char **all_logs;
1311     int n_logfiles;
1312     LSN fsynced_lsn;
1313     grab_output(logger, &fsynced_lsn);
1314     int r = toku_logger_find_logfiles (logger->directory, &all_logs, &n_logfiles);
1315     release_output(logger, fsynced_lsn);
1316     if (r!=0) return r;
1317 
1318     for (i=0; all_logs[i]; i++);
1319     all_n_logs=i;
1320     // get them into increasing order
1321     qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare);
1322 
1323     LSN save_lsn = logger->last_completed_checkpoint_lsn;
1324 
1325     // Now starting at the last one, look for archivable ones.
1326     // Count the total number of bytes, because we have to return a single big array.  (That's the BDB interface.  Bleah...)
1327     LSN earliest_lsn_in_logfile={(unsigned long long)(-1LL)};
1328     r = peek_at_log(logger, all_logs[all_n_logs-1], &earliest_lsn_in_logfile); // try to find the lsn that's in the most recent log
1329     if (earliest_lsn_in_logfile.lsn <= save_lsn.lsn) {
1330         i=all_n_logs-1;
1331     } else {
1332         for (i=all_n_logs-2; i>=0; i--) { // start at all_n_logs-2 because we never archive the most recent log
1333             r = peek_at_log(logger, all_logs[i], &earliest_lsn_in_logfile);
1334             if (r!=0) continue; // In case of error, just keep going
1335 
1336             if (earliest_lsn_in_logfile.lsn <= save_lsn.lsn) {
1337                 break;
1338             }
1339         }
1340     }
1341 
1342     // all log files up to, but but not including, i can be archived.
1343     int n_to_archive=i;
1344     int count_bytes=0;
1345     for (i=0; i<n_to_archive; i++) {
1346         count_bytes+=1+strlen(all_logs[i]);
1347     }
1348     char **result;
1349     if (i==0) {
1350         result=0;
1351     } else {
1352         CAST_FROM_VOIDP(result, toku_xmalloc((1+n_to_archive)*sizeof(*result) + count_bytes));
1353         char  *base = (char*)(result+1+n_to_archive);
1354         for (i=0; i<n_to_archive; i++) {
1355             int len=1+strlen(all_logs[i]);
1356             result[i]=base;
1357             memcpy(base, all_logs[i], len);
1358             base+=len;
1359         }
1360         result[n_to_archive]=0;
1361     }
1362     for (i=0; all_logs[i]; i++) {
1363         toku_free(all_logs[i]);
1364     }
1365     toku_free(all_logs);
1366     *logs_p = result;
1367     return 0;
1368 }
1369 
1370 
toku_logger_txn_parent(TOKUTXN txn)1371 TOKUTXN toku_logger_txn_parent (TOKUTXN txn) {
1372     return txn->parent;
1373 }
1374 
toku_logger_note_checkpoint(TOKULOGGER logger,LSN lsn)1375 void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn) {
1376     logger->last_completed_checkpoint_lsn = lsn;
1377 }
1378 
// Effect: Copy the logger status into *statp.
//  log_status.init() is called on every invocation; when logger is non-null, the status entries
//  below are refreshed from the logger's counters before the copy.  When logger is null,
//  log_status is copied as it stands after init().
void
toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS statp) {
    log_status.init();
    if (logger) {
        LOG_STATUS_VAL(LOGGER_NEXT_LSN)    = logger->lsn.lsn;
        LOG_STATUS_VAL(LOGGER_NUM_WRITES)  = logger->num_writes_to_disk;
        LOG_STATUS_VAL(LOGGER_BYTES_WRITTEN)  = logger->bytes_written_to_disk;
        // No compression on logfiles so the uncompressed size is just number of bytes written
        LOG_STATUS_VAL(LOGGER_UNCOMPRESSED_BYTES_WRITTEN)  = logger->bytes_written_to_disk;
        LOG_STATUS_VAL(LOGGER_TOKUTIME_WRITES) = logger->time_spent_writing_to_disk;
        LOG_STATUS_VAL(LOGGER_WAIT_BUF_LONG) = logger->num_wait_buf_long;
    }
    // NOTE(review): log_status is not defined in this block; it looks like shared state - confirm whether callers need to serialize.
    *statp = log_status;
}
1393 
1394 
1395 
1396 //////////////////////////////////////////////////////////////////////////////////////////////////////
1397 // Used for upgrade:
1398 // if any valid log files exist in log_dir, then
1399 //   set *found_any_logs to true and set *version_found to version number of latest log
1400 int
toku_get_version_of_logs_on_disk(const char * log_dir,bool * found_any_logs,uint32_t * version_found)1401 toku_get_version_of_logs_on_disk(const char *log_dir, bool *found_any_logs, uint32_t *version_found) {
1402     bool found = false;
1403     uint32_t highest_version = 0;
1404     int r = 0;
1405 
1406     struct dirent *de;
1407     DIR *d=opendir(log_dir);
1408     if (d==NULL) {
1409         r = get_error_errno();
1410     }
1411     else {
1412         // Examine every file in the directory and find highest version
1413         while ((de=readdir(d))) {
1414             uint32_t this_log_version;
1415             uint64_t this_log_number;
1416             bool is_log = is_a_logfile_any_version(de->d_name, &this_log_number, &this_log_version);
1417             if (is_log) {
1418                 if (!found) {  // first log file found
1419                     found = true;
1420                     highest_version = this_log_version;
1421                 }
1422                 else
1423                     highest_version = highest_version > this_log_version ? highest_version : this_log_version;
1424             }
1425         }
1426         int r2 = closedir(d);
1427         if (r==0) r = r2;
1428     }
1429     if (r==0) {
1430         *found_any_logs = found;
1431         if (found)
1432             *version_found = highest_version;
1433     }
1434     return r;
1435 }
1436 
toku_logger_get_txn_manager(TOKULOGGER logger)1437 TXN_MANAGER toku_logger_get_txn_manager(TOKULOGGER logger) {
1438     return logger->txn_manager;
1439 }
1440