1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <memory>
40 #include "ft/cachetable/cachetable.h"
41 #include "ft/cachetable/checkpoint.h"
42 #include "ft/ft.h"
43 #include "ft/log_header.h"
44 #include "ft/logger/log-internal.h"
45 #include "ft/logger/logcursor.h"
46 #include "ft/txn/txn_manager.h"
47 #include "util/omt.h"
48 
// When nonzero, recover_env_init and recover_env_cleanup print a
// "function:line" trace line to stderr.
int tokuft_recovery_trace = 0;                    // turn on recovery tracing, default off.

// Define DO_VERIFY_COUNTS to make VERIFY_COUNTS(n) call
// toku_verify_or_set_counts(n, false); otherwise VERIFY_COUNTS(n) is a no-op.
//#define DO_VERIFY_COUNTS
#ifdef DO_VERIFY_COUNTS
#define VERIFY_COUNTS(n) toku_verify_or_set_counts(n, false)
#else
#define VERIFY_COUNTS(n) ((void)0)
#endif

// time in seconds between recovery progress reports
// (toku_recover_txn_progress prints at most once per this many seconds).
#define TOKUFT_RECOVERY_PROGRESS_TIME 15
time_t tokuft_recovery_progress_time = TOKUFT_RECOVERY_PROGRESS_TIME;
61 
// Phases of the recovery log scan.  Recovery scans the log backward (BACKWARD_*)
// until it reaches the begin_checkpoint matching the newest end_checkpoint, then
// turns around and scans forward (FORWARD_*).  Values start at 1 so that they
// index scan_state_strings directly (index 0 is the "?" placeholder).
enum ss {
    BACKWARD_NEWER_CHECKPOINT_END = 1,       // backward scan, no end_checkpoint seen yet
    BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END,   // backward scan, between an end_checkpoint and its begin
    FORWARD_BETWEEN_CHECKPOINT_BEGIN_END,    // forward scan, between that begin and its end
    FORWARD_NEWER_CHECKPOINT_END,            // forward scan, past that end_checkpoint
};
68 
// Progress of the recovery log scan.
struct scan_state {
    enum ss ss;                            // current scan phase
    LSN checkpoint_begin_lsn;              // begin LSN recorded in the end_checkpoint recovery turns around at
    LSN checkpoint_end_lsn;                // LSN of that end_checkpoint record
    uint64_t checkpoint_end_timestamp;     // timestamp of that end_checkpoint record
    uint64_t checkpoint_begin_timestamp;   // timestamp of the matching begin_checkpoint record
    uint32_t checkpoint_num_fassociate;    // fassociate records seen on the forward scan; checked against the end_checkpoint
    uint32_t checkpoint_num_xstillopen;    // xstillopen records seen on the forward scan; checked against the end_checkpoint
    TXNID last_xid;                        // last_xid of the matching begin_checkpoint (TXNID_NONE until it is seen)
};
79 
80 static const char *scan_state_strings[] = {
81     "?", "bw_newer", "bw_between", "fw_between", "fw_newer",
82 };
83 
scan_state_init(struct scan_state * ss)84 static void scan_state_init(struct scan_state *ss) {
85     ss->ss = BACKWARD_NEWER_CHECKPOINT_END;
86     ss->checkpoint_begin_lsn = ZERO_LSN;
87     ss->checkpoint_end_lsn = ZERO_LSN;
88     ss->checkpoint_num_fassociate = 0;
89     ss->checkpoint_num_xstillopen = 0;
90     ss->last_xid = 0;
91 }
92 
scan_state_string(struct scan_state * ss)93 static const char *scan_state_string(struct scan_state *ss) {
94     assert(BACKWARD_NEWER_CHECKPOINT_END <= ss->ss && ss->ss <= FORWARD_NEWER_CHECKPOINT_END);
95     return scan_state_strings[ss->ss];
96 }
97 
// File map tuple: one entry of the recovery file map.  It associates a log
// FILENUM with the FT handle opened for it during recovery.
// NOTE(review): the visible code never stores a NULL ft_handle.
// file_map_tuple_init dereferences it, file_map_close_dictionaries asserts it
// is non-NULL, and the rollback file is opened separately in
// toku_recover_fassociate without being added to the map.  The comment on
// ft_handle below looks stale; confirm before relying on it.
struct file_map_tuple {
    FILENUM filenum;         // map key; ordered by fileid (see file_map_h)
    FT_HANDLE ft_handle;     // NULL ft_handle means it's a rollback file.
    char *iname;             // owned by the tuple; freed by file_map_tuple_destroy
    struct __toku_db fake_db; // zeroed DB whose descriptors point into ft_handle->ft, used for comparisons
};
105 
file_map_tuple_init(struct file_map_tuple * tuple,FILENUM filenum,FT_HANDLE ft_handle,char * iname)106 static void file_map_tuple_init(struct file_map_tuple *tuple, FILENUM filenum, FT_HANDLE ft_handle, char *iname) {
107     tuple->filenum = filenum;
108     tuple->ft_handle = ft_handle;
109     tuple->iname = iname;
110     // use a fake DB for comparisons, using the ft's cmp descriptor
111     memset(&tuple->fake_db, 0, sizeof(tuple->fake_db));
112     tuple->fake_db.cmp_descriptor = &tuple->ft_handle->ft->cmp_descriptor;
113     tuple->fake_db.descriptor = &tuple->ft_handle->ft->descriptor;
114 }
115 
file_map_tuple_destroy(struct file_map_tuple * tuple)116 static void file_map_tuple_destroy(struct file_map_tuple *tuple) {
117     if (tuple->iname) {
118         toku_free(tuple->iname);
119         tuple->iname = NULL;
120     }
121 }
122 
// Map filenum to ft_handle.
// Backed by an OMT of heap-allocated file_map_tuple pointers, kept ordered by
// FILENUM.fileid via the file_map_h heaviside function.
struct file_map {
    toku::omt<struct file_map_tuple *> *filenums;
};
127 
// The recovery environment: the state shared by all recovery log handlers.
// recover_env_init sets it up and recover_env_cleanup tears it down.
struct recover_env {
    DB_ENV *env;                     // environment passed to recover_env_init (handed to keep_cachetable_callback)
    prepared_txn_callback_t    prepared_txn_callback;    // at the end of recovery, all the prepared txns are passed back to the ydb layer to make them into valid transactions.
    keep_cachetable_callback_t keep_cachetable_callback; // after recovery, store the cachetable into the environment.
    CACHETABLE ct;                   // cachetable created by recover_env_init
    TOKULOGGER logger;               // the caller's logger, or one created by recover_env_init
    CHECKPOINTER cp;                 // checkpointer obtained from ct
    ft_compare_func bt_compare;      // key comparator set on dictionaries opened during recovery (unless TOKU_DB_KEYCMP_BUILTIN)
    ft_update_func update_function;  // update callback set on dictionaries opened during recovery, when non-NULL
    generate_row_for_put_func generate_row_for_put; // callback supplied to recover_env_init
    generate_row_for_del_func generate_row_for_del; // callback supplied to recover_env_init
    DBT_ARRAY dest_keys;             // set up by toku_dbt_array_init(..., 1) in recover_env_init
    DBT_ARRAY dest_vals;             // set up by toku_dbt_array_init(..., 1) in recover_env_init
    struct scan_state ss;            // progress of the backward/forward log scan
    struct file_map fmap;            // FILENUM -> dictionary opened during recovery
    bool goforward;                  // set when the backward scan reaches the begin of the newest complete checkpoint
    bool destroy_logger_at_end; // If true then destroy the logger when we are done.  If false then set the logger into write-files mode when we are done with recovery.
};
typedef struct recover_env *RECOVER_ENV;
148 
149 
file_map_init(struct file_map * fmap)150 static void file_map_init(struct file_map *fmap) {
151     XMALLOC(fmap->filenums);
152     fmap->filenums->create();
153 }
154 
file_map_destroy(struct file_map * fmap)155 static void file_map_destroy(struct file_map *fmap) {
156     fmap->filenums->destroy();
157     toku_free(fmap->filenums);
158     fmap->filenums = nullptr;
159 }
160 
file_map_get_num_dictionaries(struct file_map * fmap)161 static uint32_t file_map_get_num_dictionaries(struct file_map *fmap) {
162     return fmap->filenums->size();
163 }
164 
file_map_close_dictionaries(struct file_map * fmap,LSN oplsn)165 static void file_map_close_dictionaries(struct file_map *fmap, LSN oplsn) {
166     int r;
167 
168     while (1) {
169         uint32_t n = fmap->filenums->size();
170         if (n == 0) {
171             break;
172         }
173         struct file_map_tuple *tuple;
174         r = fmap->filenums->fetch(n - 1, &tuple);
175         assert(r == 0);
176         r = fmap->filenums->delete_at(n - 1);
177         assert(r == 0);
178         assert(tuple->ft_handle);
179         // Logging is on again, but we must pass the right LSN into close.
180         if (tuple->ft_handle) { // it's a DB, not a rollback file
181             toku_ft_handle_close_recovery(tuple->ft_handle, oplsn);
182         }
183         file_map_tuple_destroy(tuple);
184         toku_free(tuple);
185     }
186 }
187 
file_map_h(struct file_map_tuple * const & a,const FILENUM & b)188 static int file_map_h(struct file_map_tuple *const &a, const FILENUM &b) {
189     if (a->filenum.fileid < b.fileid) {
190         return -1;
191     } else if (a->filenum.fileid > b.fileid) {
192         return 1;
193     } else {
194         return 0;
195     }
196 }
197 
file_map_insert(struct file_map * fmap,FILENUM fnum,FT_HANDLE ft_handle,char * iname)198 static int file_map_insert (struct file_map *fmap, FILENUM fnum, FT_HANDLE ft_handle, char *iname) {
199     struct file_map_tuple *XMALLOC(tuple);
200     file_map_tuple_init(tuple, fnum, ft_handle, iname);
201     int r = fmap->filenums->insert<FILENUM, file_map_h>(tuple, fnum, nullptr);
202     return r;
203 }
204 
file_map_remove(struct file_map * fmap,FILENUM fnum)205 static void file_map_remove(struct file_map *fmap, FILENUM fnum) {
206     uint32_t idx;
207     struct file_map_tuple *tuple;
208     int r = fmap->filenums->find_zero<FILENUM, file_map_h>(fnum, &tuple, &idx);
209     if (r == 0) {
210         r = fmap->filenums->delete_at(idx);
211         file_map_tuple_destroy(tuple);
212         toku_free(tuple);
213     }
214 }
215 
216 // Look up file info: given FILENUM, return file_map_tuple (or DB_NOTFOUND)
file_map_find(struct file_map * fmap,FILENUM fnum,struct file_map_tuple ** file_map_tuple)217 static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) {
218     uint32_t idx;
219     struct file_map_tuple *tuple;
220     int r = fmap->filenums->find_zero<FILENUM, file_map_h>(fnum, &tuple, &idx);
221     if (r == 0) {
222         assert(tuple->filenum.fileid == fnum.fileid);
223         *file_map_tuple = tuple;
224     } else {
225         assert(r == DB_NOTFOUND);
226     }
227     return r;
228 }
229 
recover_env_init(RECOVER_ENV renv,const char * env_dir,DB_ENV * env,prepared_txn_callback_t prepared_txn_callback,keep_cachetable_callback_t keep_cachetable_callback,TOKULOGGER logger,ft_compare_func bt_compare,ft_update_func update_function,generate_row_for_put_func generate_row_for_put,generate_row_for_del_func generate_row_for_del,size_t cachetable_size)230 static int recover_env_init (RECOVER_ENV renv,
231                              const char *env_dir,
232                              DB_ENV *env,
233                              prepared_txn_callback_t    prepared_txn_callback,
234                              keep_cachetable_callback_t keep_cachetable_callback,
235                              TOKULOGGER logger,
236                              ft_compare_func bt_compare,
237                              ft_update_func update_function,
238                              generate_row_for_put_func generate_row_for_put,
239                              generate_row_for_del_func generate_row_for_del,
240                              size_t cachetable_size) {
241     int r = 0;
242 
243     // If we are passed a logger use it, otherwise create one.
244     renv->destroy_logger_at_end = logger==NULL;
245     if (logger) {
246         renv->logger = logger;
247     } else {
248         r = toku_logger_create(&renv->logger);
249         assert(r == 0);
250     }
251     toku_logger_write_log_files(renv->logger, false);
252     toku_cachetable_create(&renv->ct, cachetable_size ? cachetable_size : 1<<25, (LSN){0}, renv->logger);
253     toku_cachetable_set_env_dir(renv->ct, env_dir);
254     if (keep_cachetable_callback) keep_cachetable_callback(env, renv->ct);
255     toku_logger_set_cachetable(renv->logger, renv->ct);
256     renv->env                      = env;
257     renv->prepared_txn_callback    = prepared_txn_callback;
258     renv->keep_cachetable_callback = keep_cachetable_callback;
259     renv->bt_compare               = bt_compare;
260     renv->update_function          = update_function;
261     renv->generate_row_for_put     = generate_row_for_put;
262     renv->generate_row_for_del     = generate_row_for_del;
263     file_map_init(&renv->fmap);
264     renv->goforward = false;
265     renv->cp = toku_cachetable_get_checkpointer(renv->ct);
266     toku_dbt_array_init(&renv->dest_keys, 1);
267     toku_dbt_array_init(&renv->dest_vals, 1);
268     if (tokuft_recovery_trace)
269         fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__);
270     return r;
271 }
272 
recover_env_cleanup(RECOVER_ENV renv)273 static void recover_env_cleanup (RECOVER_ENV renv) {
274     invariant_zero(renv->fmap.filenums->size());
275     file_map_destroy(&renv->fmap);
276 
277     if (renv->destroy_logger_at_end) {
278         toku_logger_close_rollback(renv->logger);
279         int r = toku_logger_close(&renv->logger);
280         assert(r == 0);
281     } else {
282         toku_logger_write_log_files(renv->logger, true);
283     }
284 
285     if (renv->keep_cachetable_callback) {
286         renv->ct = NULL;
287     } else {
288         toku_cachetable_close(&renv->ct);
289     }
290     toku_dbt_array_destroy(&renv->dest_keys);
291     toku_dbt_array_destroy(&renv->dest_vals);
292 
293     if (tokuft_recovery_trace)
294         fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__);
295 }
296 
recover_state(RECOVER_ENV renv)297 static const char *recover_state(RECOVER_ENV renv) {
298     return scan_state_string(&renv->ss);
299 }
300 
301 // Open the file if it is not already open.  If it is already open, then do nothing.
internal_recover_fopen_or_fcreate(RECOVER_ENV renv,bool must_create,int UU (mode),BYTESTRING * bs_iname,FILENUM filenum,uint32_t treeflags,TOKUTXN txn,uint32_t nodesize,uint32_t basementnodesize,enum toku_compression_method compression_method,LSN max_acceptable_lsn)302 static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, bool must_create, int UU(mode), BYTESTRING *bs_iname, FILENUM filenum, uint32_t treeflags,
303                                               TOKUTXN txn, uint32_t nodesize, uint32_t basementnodesize, enum toku_compression_method compression_method, LSN max_acceptable_lsn) {
304     int r = 0;
305     FT_HANDLE ft_handle = NULL;
306     char *iname = fixup_fname(bs_iname);
307 
308     toku_ft_handle_create(&ft_handle);
309     toku_ft_set_flags(ft_handle, treeflags);
310 
311     if (nodesize != 0) {
312         toku_ft_handle_set_nodesize(ft_handle, nodesize);
313     }
314 
315     if (basementnodesize != 0) {
316         toku_ft_handle_set_basementnodesize(ft_handle, basementnodesize);
317     }
318 
319     if (compression_method != TOKU_DEFAULT_COMPRESSION_METHOD) {
320         toku_ft_handle_set_compression_method(ft_handle, compression_method);
321     }
322 
323     // set the key compare functions
324     if (!(treeflags & TOKU_DB_KEYCMP_BUILTIN) && renv->bt_compare) {
325         toku_ft_set_bt_compare(ft_handle, renv->bt_compare);
326     }
327 
328     if (renv->update_function) {
329         toku_ft_set_update(ft_handle, renv->update_function);
330     }
331 
332     // TODO mode (FUTURE FEATURE)
333     //mode = mode;
334 
335     r = toku_ft_handle_open_recovery(ft_handle, iname, must_create, must_create, renv->ct, txn, filenum, max_acceptable_lsn);
336     if (r != 0) {
337         //Note:  If ft_handle_open fails, then close_ft will NOT write a header to disk.
338         //No need to provide lsn, so use the regular toku_ft_handle_close function
339         toku_ft_handle_close(ft_handle);
340         toku_free(iname);
341         if (r == ENOENT) //Not an error to simply be missing.
342             r = 0;
343         return r;
344     }
345 
346     file_map_insert(&renv->fmap, filenum, ft_handle, iname);
347     return 0;
348 }
349 
toku_recover_begin_checkpoint(struct logtype_begin_checkpoint * l,RECOVER_ENV renv)350 static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
351     int r;
352     TXN_MANAGER mgr = toku_logger_get_txn_manager(renv->logger);
353     switch (renv->ss.ss) {
354     case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
355         assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn);
356         invariant(renv->ss.last_xid == TXNID_NONE);
357         renv->ss.last_xid = l->last_xid;
358         toku_txn_manager_set_last_xid_from_recovered_checkpoint(mgr, l->last_xid);
359 
360         r = 0;
361         break;
362     case FORWARD_NEWER_CHECKPOINT_END:
363         assert(l->lsn.lsn > renv->ss.checkpoint_end_lsn.lsn);
364         // Verify last_xid is no older than the previous begin
365         invariant(l->last_xid >= renv->ss.last_xid);
366         // Verify last_xid is no older than the newest txn
367         invariant(l->last_xid >= toku_txn_manager_get_last_xid(mgr));
368 
369         r = 0; // ignore it (log only has a begin checkpoint)
370         break;
371     default:
372         fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
373         abort();
374         break;
375     }
376     return r;
377 }
378 
toku_recover_backward_begin_checkpoint(struct logtype_begin_checkpoint * l,RECOVER_ENV renv)379 static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
380     int r;
381     time_t tnow = time(NULL);
382     fprintf(stderr, "%.24s PerconaFT recovery bw_begin_checkpoint at %" PRIu64 " timestamp %" PRIu64 " (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, recover_state(renv));
383     switch (renv->ss.ss) {
384     case BACKWARD_NEWER_CHECKPOINT_END:
385         // incomplete checkpoint, nothing to do
386         r = 0;
387         break;
388     case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
389         assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn);
390         renv->ss.ss = FORWARD_BETWEEN_CHECKPOINT_BEGIN_END;
391         renv->ss.checkpoint_begin_timestamp = l->timestamp;
392         renv->goforward = true;
393         tnow = time(NULL);
394         fprintf(stderr, "%.24s PerconaFT recovery turning around at begin checkpoint %" PRIu64 " time %" PRIu64 "\n",
395                 ctime(&tnow), l->lsn.lsn,
396                 renv->ss.checkpoint_end_timestamp - renv->ss.checkpoint_begin_timestamp);
397         r = 0;
398         break;
399     default:
400         fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
401         abort();
402         break;
403     }
404     return r;
405 }
406 
toku_recover_end_checkpoint(struct logtype_end_checkpoint * l,RECOVER_ENV renv)407 static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
408     int r;
409     switch (renv->ss.ss) {
410     case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
411         assert(l->lsn_begin_checkpoint.lsn == renv->ss.checkpoint_begin_lsn.lsn);
412         assert(l->lsn.lsn == renv->ss.checkpoint_end_lsn.lsn);
413         assert(l->num_fassociate_entries == renv->ss.checkpoint_num_fassociate);
414         assert(l->num_xstillopen_entries == renv->ss.checkpoint_num_xstillopen);
415         renv->ss.ss = FORWARD_NEWER_CHECKPOINT_END;
416         r = 0;
417         break;
418     case FORWARD_NEWER_CHECKPOINT_END:
419         assert(0);
420         return 0;
421     default:
422         assert(0);
423         return 0;
424     }
425     return r;
426 }
427 
toku_recover_backward_end_checkpoint(struct logtype_end_checkpoint * l,RECOVER_ENV renv)428 static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
429     time_t tnow = time(NULL);
430     fprintf(stderr, "%.24s PerconaFT recovery bw_end_checkpoint at %" PRIu64 " timestamp %" PRIu64 " xid %" PRIu64 " (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, l->lsn_begin_checkpoint.lsn, recover_state(renv));
431     switch (renv->ss.ss) {
432     case BACKWARD_NEWER_CHECKPOINT_END:
433         renv->ss.ss = BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END;
434         renv->ss.checkpoint_begin_lsn.lsn = l->lsn_begin_checkpoint.lsn;
435         renv->ss.checkpoint_end_lsn.lsn   = l->lsn.lsn;
436         renv->ss.checkpoint_end_timestamp = l->timestamp;
437         return 0;
438     case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
439         fprintf(stderr, "PerconaFT recovery %s:%d Should not see two end_checkpoint log entries without an intervening begin_checkpoint\n", __FILE__, __LINE__);
440         abort();
441     default:
442         break;
443     }
444     fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
445     abort();
446 }
447 
toku_recover_fassociate(struct logtype_fassociate * l,RECOVER_ENV renv)448 static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) {
449     struct file_map_tuple *tuple = NULL;
450     int r = file_map_find(&renv->fmap, l->filenum, &tuple);
451     char *fname = fixup_fname(&l->iname);
452     switch (renv->ss.ss) {
453     case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
454         renv->ss.checkpoint_num_fassociate++;
455         assert(r==DB_NOTFOUND); //Not open
456         // Open it if it exists.
457         // If rollback file, specify which checkpointed version of file we need (not just the latest)
458         // because we cannot use a rollback log that is later than the last complete checkpoint.  See #3113.
459         {
460             bool rollback_file = (0==strcmp(fname, toku_product_name_strings.rollback_cachefile));
461             LSN max_acceptable_lsn = MAX_LSN;
462             if (rollback_file) {
463                 max_acceptable_lsn = renv->ss.checkpoint_begin_lsn;
464                 FT_HANDLE t;
465                 toku_ft_handle_create(&t);
466                 r = toku_ft_handle_open_recovery(t, toku_product_name_strings.rollback_cachefile, false, false, renv->ct, (TOKUTXN)NULL, l->filenum, max_acceptable_lsn);
467                 renv->logger->rollback_cachefile = t->ft->cf;
468                 toku_logger_initialize_rollback_cache(renv->logger, t->ft);
469             } else {
470                 r = internal_recover_fopen_or_fcreate(renv, false, 0, &l->iname, l->filenum, l->treeflags, NULL, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, max_acceptable_lsn);
471                 assert(r==0);
472             }
473         }
474         // try to open the file again and if we get it, restore
475         // the unlink on close bit.
476         int ret;
477         ret = file_map_find(&renv->fmap, l->filenum, &tuple);
478         if (ret == 0 && l->unlink_on_close) {
479             toku_cachefile_unlink_on_close(tuple->ft_handle->ft->cf);
480         }
481         break;
482     case FORWARD_NEWER_CHECKPOINT_END:
483         if (r == 0) { //IF it is open
484             // assert that the filenum maps to the correct iname
485             assert(strcmp(fname, tuple->iname) == 0);
486         }
487         r = 0;
488         break;
489     default:
490         assert(0);
491         return 0;
492     }
493     toku_free(fname);
494 
495     return r;
496 }
497 
toku_recover_backward_fassociate(struct logtype_fassociate * UU (l),RECOVER_ENV UU (renv))498 static int toku_recover_backward_fassociate (struct logtype_fassociate *UU(l), RECOVER_ENV UU(renv)) {
499     // nothing
500     return 0;
501 }
502 
503 static int
recover_transaction(TOKUTXN * txnp,TXNID_PAIR xid,TXNID_PAIR parentxid,TOKULOGGER logger)504 recover_transaction(TOKUTXN *txnp, TXNID_PAIR xid, TXNID_PAIR parentxid, TOKULOGGER logger) {
505     int r;
506 
507     // lookup the parent
508     TOKUTXN parent = NULL;
509     if (!txn_pair_is_none(parentxid)) {
510         toku_txnid2txn(logger, parentxid, &parent);
511         assert(parent!=NULL);
512     }
513     else {
514         invariant(xid.child_id64 == TXNID_NONE);
515     }
516 
517     // create a transaction and bind it to the transaction id
518     TOKUTXN txn = NULL;
519     {
520         //Verify it does not yet exist.
521         toku_txnid2txn(logger, xid, &txn);
522         assert(txn==NULL);
523     }
524     r = toku_txn_begin_with_xid(
525         parent,
526         &txn,
527         logger,
528         xid,
529         TXN_SNAPSHOT_NONE,
530         NULL,
531         true, // for_recovery
532         false // read_only
533         );
534     assert(r == 0);
535     // We only know about it because it was logged.  Restore the log bit.
536     // Logging is 'off' but it will still set the bit.
537     toku_maybe_log_begin_txn_for_write_operation(txn);
538     if (txnp) *txnp = txn;
539     return 0;
540 }
541 
recover_xstillopen_internal(TOKUTXN * txnp,LSN UU (lsn),TXNID_PAIR xid,TXNID_PAIR parentxid,uint64_t rollentry_raw_count,FILENUMS open_filenums,bool force_fsync_on_commit,uint64_t num_rollback_nodes,uint64_t num_rollentries,BLOCKNUM spilled_rollback_head,BLOCKNUM spilled_rollback_tail,BLOCKNUM current_rollback,uint32_t UU (crc),uint32_t UU (len),RECOVER_ENV renv)542 static int recover_xstillopen_internal (TOKUTXN         *txnp,
543                                         LSN           UU(lsn),
544                                         TXNID_PAIR       xid,
545                                         TXNID_PAIR       parentxid,
546                                         uint64_t        rollentry_raw_count,
547                                         FILENUMS         open_filenums,
548                                         bool             force_fsync_on_commit,
549                                         uint64_t        num_rollback_nodes,
550                                         uint64_t        num_rollentries,
551                                         BLOCKNUM         spilled_rollback_head,
552                                         BLOCKNUM         spilled_rollback_tail,
553                                         BLOCKNUM         current_rollback,
554                                         uint32_t     UU(crc),
555                                         uint32_t     UU(len),
556                                         RECOVER_ENV      renv) {
557     int r;
558     *txnp = NULL;
559     switch (renv->ss.ss) {
560     case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: {
561         renv->ss.checkpoint_num_xstillopen++;
562         invariant(renv->ss.last_xid != TXNID_NONE);
563         invariant(xid.parent_id64 <= renv->ss.last_xid);
564         TOKUTXN txn = NULL;
565         { //Create the transaction.
566             r = recover_transaction(&txn, xid, parentxid, renv->logger);
567             assert(r==0);
568             assert(txn!=NULL);
569             *txnp = txn;
570         }
571         { //Recover rest of transaction.
572 #define COPY_TO_INFO(field) .field = field
573             struct txninfo info = {
574                 COPY_TO_INFO(rollentry_raw_count),
575                 .num_fts  = 0,    //Set afterwards
576                 .open_fts = NULL, //Set afterwards
577                 COPY_TO_INFO(force_fsync_on_commit),
578                 COPY_TO_INFO(num_rollback_nodes),
579                 COPY_TO_INFO(num_rollentries),
580                 COPY_TO_INFO(spilled_rollback_head),
581                 COPY_TO_INFO(spilled_rollback_tail),
582                 COPY_TO_INFO(current_rollback)
583             };
584 #undef COPY_TO_INFO
585             //Generate open_fts
586             FT array[open_filenums.num]; //Allocate maximum possible requirement
587             info.open_fts = array;
588             uint32_t i;
589             for (i = 0; i < open_filenums.num; i++) {
590                 //open_filenums.filenums[]
591                 struct file_map_tuple *tuple = NULL;
592                 r = file_map_find(&renv->fmap, open_filenums.filenums[i], &tuple);
593                 if (r==0) {
594                     info.open_fts[info.num_fts++] = tuple->ft_handle->ft;
595                 }
596                 else {
597                     assert(r==DB_NOTFOUND);
598                 }
599             }
600             r = toku_txn_load_txninfo(txn, &info);
601             assert(r==0);
602         }
603         break;
604     }
605     case FORWARD_NEWER_CHECKPOINT_END: {
606         // assert that the transaction exists
607         TOKUTXN txn = NULL;
608         toku_txnid2txn(renv->logger, xid, &txn);
609         r = 0;
610         *txnp = txn;
611         break;
612     }
613     default:
614         assert(0);
615         return 0;
616     }
617     return r;
618 }
619 
toku_recover_xstillopen(struct logtype_xstillopen * l,RECOVER_ENV renv)620 static int toku_recover_xstillopen (struct logtype_xstillopen *l, RECOVER_ENV renv) {
621     TOKUTXN txn;
622     return recover_xstillopen_internal (&txn,
623                                         l->lsn,
624                                         l->xid,
625                                         l->parentxid,
626                                         l->rollentry_raw_count,
627                                         l->open_filenums,
628                                         l->force_fsync_on_commit,
629                                         l->num_rollback_nodes,
630                                         l->num_rollentries,
631                                         l->spilled_rollback_head,
632                                         l->spilled_rollback_tail,
633                                         l->current_rollback,
634                                         l->crc,
635                                         l->len,
636                                         renv);
637 }
638 
toku_recover_xstillopenprepared(struct logtype_xstillopenprepared * l,RECOVER_ENV renv)639 static int toku_recover_xstillopenprepared (struct logtype_xstillopenprepared *l, RECOVER_ENV renv) {
640     TOKUTXN txn;
641     int r = recover_xstillopen_internal (&txn,
642                                          l->lsn,
643                                          l->xid,
644                                          TXNID_PAIR_NONE,
645                                          l->rollentry_raw_count,
646                                          l->open_filenums,
647                                          l->force_fsync_on_commit,
648                                          l->num_rollback_nodes,
649                                          l->num_rollentries,
650                                          l->spilled_rollback_head,
651                                          l->spilled_rollback_tail,
652                                          l->current_rollback,
653                                          l->crc,
654                                          l->len,
655                                          renv);
656     if (r != 0) {
657         goto exit;
658     }
659     switch (renv->ss.ss) {
660         case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: {
661             toku_txn_prepare_txn(txn, l->xa_xid, 0);
662             break;
663         }
664         case FORWARD_NEWER_CHECKPOINT_END: {
665             assert(txn->state == TOKUTXN_PREPARING);
666             break;
667         }
668         default: {
669             assert(0);
670         }
671     }
672 exit:
673     return r;
674 }
675 
toku_recover_backward_xstillopen(struct logtype_xstillopen * UU (l),RECOVER_ENV UU (renv))676 static int toku_recover_backward_xstillopen (struct logtype_xstillopen *UU(l), RECOVER_ENV UU(renv)) {
677     // nothing
678     return 0;
679 }
toku_recover_backward_xstillopenprepared(struct logtype_xstillopenprepared * UU (l),RECOVER_ENV UU (renv))680 static int toku_recover_backward_xstillopenprepared (struct logtype_xstillopenprepared *UU(l), RECOVER_ENV UU(renv)) {
681     // nothing
682     return 0;
683 }
684 
toku_recover_xbegin(struct logtype_xbegin * l,RECOVER_ENV renv)685 static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
686     int r;
687     r = recover_transaction(NULL, l->xid, l->parentxid, renv->logger);
688     return r;
689 }
690 
toku_recover_backward_xbegin(struct logtype_xbegin * UU (l),RECOVER_ENV UU (renv))691 static int toku_recover_backward_xbegin (struct logtype_xbegin *UU(l), RECOVER_ENV UU(renv)) {
692     // nothing
693     return 0;
694 }
695 
// Context passed as the void *extra of toku_recover_txn_progress.
// Callers aggregate-initialize it positionally, so the field order matters.
struct toku_txn_progress_extra {
    time_t tlast;          // time of the last progress report (callers start it at time(NULL))
    LSN lsn;               // LSN of the record being applied; omitted from the report when 0
    const char *type;      // operation name printed in the report (e.g. "commit")
    TXNID_PAIR xid;        // transaction being processed
    uint64_t last_total;   // entries_total from the first callback; 0 until recorded
};
703 
toku_recover_txn_progress(TOKU_TXN_PROGRESS txn_progress,void * extra)704 static void toku_recover_txn_progress(TOKU_TXN_PROGRESS txn_progress, void *extra) {
705     toku_txn_progress_extra *txn_progress_extra = static_cast<toku_txn_progress_extra *>(extra);
706     if (txn_progress_extra->last_total == 0)
707         txn_progress_extra->last_total = txn_progress->entries_total;
708     else
709         assert(txn_progress_extra->last_total == txn_progress->entries_total);
710     time_t tnow = time(NULL);
711     if (tnow - txn_progress_extra->tlast >= tokuft_recovery_progress_time) {
712         txn_progress_extra->tlast = tnow;
713         fprintf(stderr, "%.24s PerconaFT ", ctime(&tnow));
714         if (txn_progress_extra->lsn.lsn != 0)
715             fprintf(stderr, "lsn %" PRIu64 " ", txn_progress_extra->lsn.lsn);
716         fprintf(stderr, "%s xid %" PRIu64 ":%" PRIu64 " ",
717                 txn_progress_extra->type, txn_progress_extra->xid.parent_id64, txn_progress_extra->xid.child_id64);
718         fprintf(stderr, "%" PRIu64 "/%" PRIu64 " ",
719                 txn_progress->entries_processed, txn_progress->entries_total);
720         if (txn_progress->entries_total > 0)
721             fprintf(stderr, "%.0f%% ", ((double) txn_progress->entries_processed / (double) txn_progress->entries_total) * 100.0);
722         fprintf(stderr, "\n");
723     }
724 }
725 
toku_recover_xcommit(struct logtype_xcommit * l,RECOVER_ENV renv)726 static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) {
727     // find the transaction by transaction id
728     TOKUTXN txn = NULL;
729     toku_txnid2txn(renv->logger, l->xid, &txn);
730     assert(txn!=NULL);
731 
732     // commit the transaction
733     toku_txn_progress_extra extra = { time(NULL), l->lsn, "commit", l->xid, 0 };
734     int r = toku_txn_commit_with_lsn(txn, true, l->lsn, toku_recover_txn_progress, &extra);
735     assert(r == 0);
736 
737     // close the transaction
738     toku_txn_close_txn(txn);
739 
740     return 0;
741 }
742 
// No-op: an xcommit record needs no handling in the backward direction.
static int toku_recover_backward_xcommit (struct logtype_xcommit *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
747 
toku_recover_xprepare(struct logtype_xprepare * l,RECOVER_ENV renv)748 static int toku_recover_xprepare (struct logtype_xprepare *l, RECOVER_ENV renv) {
749     // find the transaction by transaction id
750     TOKUTXN txn = NULL;
751     toku_txnid2txn(renv->logger, l->xid, &txn);
752     assert(txn!=NULL);
753 
754     // Save the transaction
755     toku_txn_prepare_txn(txn, l->xa_xid, 0);
756 
757     return 0;
758 }
759 
// No-op: an xprepare record needs no handling in the backward direction.
static int toku_recover_backward_xprepare (struct logtype_xprepare *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
764 
765 
766 
toku_recover_xabort(struct logtype_xabort * l,RECOVER_ENV renv)767 static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
768     int r;
769 
770     // find the transaction by transaction id
771     TOKUTXN txn = NULL;
772     toku_txnid2txn(renv->logger, l->xid, &txn);
773     assert(txn!=NULL);
774 
775     // abort the transaction
776     toku_txn_progress_extra extra = { time(NULL), l->lsn, "abort", l->xid, 0 };
777     r = toku_txn_abort_with_lsn(txn, l->lsn, toku_recover_txn_progress, &extra);
778     assert(r == 0);
779 
780     // close the transaction
781     toku_txn_close_txn(txn);
782 
783     return 0;
784 }
785 
// No-op: an xabort record needs no handling in the backward direction.
static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
790 
791 // fcreate is like fopen except that the file must be created.
toku_recover_fcreate(struct logtype_fcreate * l,RECOVER_ENV renv)792 static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) {
793     int r;
794 
795     TOKUTXN txn = NULL;
796     toku_txnid2txn(renv->logger, l->xid, &txn);
797 
798     // assert that filenum is closed
799     struct file_map_tuple *tuple = NULL;
800     r = file_map_find(&renv->fmap, l->filenum, &tuple);
801     assert(r==DB_NOTFOUND);
802 
803     assert(txn!=NULL);
804 
805     //unlink if it exists (recreate from scratch).
806     char *iname = fixup_fname(&l->iname);
807     char *iname_in_cwd = toku_cachetable_get_fname_in_cwd(renv->ct, iname);
808     r = unlink(iname_in_cwd);
809     if (r != 0) {
810         int er = get_error_errno();
811         if (er != ENOENT) {
812             fprintf(stderr, "PerconaFT recovery %s:%d unlink %s %d\n", __FUNCTION__, __LINE__, iname, er);
813             toku_free(iname);
814             return r;
815         }
816     }
817     assert(0!=strcmp(iname, toku_product_name_strings.rollback_cachefile)); //Creation of rollback cachefile never gets logged.
818     toku_free(iname_in_cwd);
819     toku_free(iname);
820 
821     bool must_create = true;
822     r = internal_recover_fopen_or_fcreate(renv, must_create, l->mode, &l->iname, l->filenum, l->treeflags, txn, l->nodesize, l->basementnodesize, (enum toku_compression_method) l->compression_method, MAX_LSN);
823     return r;
824 }
825 
// No-op: an fcreate record needs no handling in the backward direction.
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
830 
831 
832 
toku_recover_fopen(struct logtype_fopen * l,RECOVER_ENV renv)833 static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
834     int r;
835 
836     // assert that filenum is closed
837     struct file_map_tuple *tuple = NULL;
838     r = file_map_find(&renv->fmap, l->filenum, &tuple);
839     assert(r==DB_NOTFOUND);
840 
841     bool must_create = false;
842     TOKUTXN txn = NULL;
843     char *fname = fixup_fname(&l->iname);
844 
845     assert(0!=strcmp(fname, toku_product_name_strings.rollback_cachefile)); //Rollback cachefile can be opened only via fassociate.
846     r = internal_recover_fopen_or_fcreate(renv, must_create, 0, &l->iname, l->filenum, l->treeflags, txn, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, MAX_LSN);
847 
848     toku_free(fname);
849     return r;
850 }
851 
// No-op: an fopen record needs no handling in the backward direction.
static int toku_recover_backward_fopen (struct logtype_fopen *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
856 
toku_recover_change_fdescriptor(struct logtype_change_fdescriptor * l,RECOVER_ENV renv)857 static int toku_recover_change_fdescriptor (struct logtype_change_fdescriptor *l, RECOVER_ENV renv) {
858     int r;
859     struct file_map_tuple *tuple = NULL;
860     r = file_map_find(&renv->fmap, l->filenum, &tuple);
861     if (r==0) {
862         TOKUTXN txn = NULL;
863         //Maybe do the descriptor (lsn filter)
864         toku_txnid2txn(renv->logger, l->xid, &txn);
865         DBT old_descriptor, new_descriptor;
866         toku_fill_dbt(
867             &old_descriptor,
868             l->old_descriptor.data,
869             l->old_descriptor.len
870             );
871         toku_fill_dbt(
872             &new_descriptor,
873             l->new_descriptor.data,
874             l->new_descriptor.len
875             );
876         toku_ft_change_descriptor(
877             tuple->ft_handle,
878             &old_descriptor,
879             &new_descriptor,
880             false,
881             txn,
882             l->update_cmp_descriptor
883             );
884     }
885     return 0;
886 }
887 
// No-op: a change_fdescriptor record needs no handling in the backward direction.
static int toku_recover_backward_change_fdescriptor (struct logtype_change_fdescriptor *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
891 
892 
893 // if file referred to in l is open, close it
toku_recover_fclose(struct logtype_fclose * l,RECOVER_ENV renv)894 static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV renv) {
895     struct file_map_tuple *tuple = NULL;
896     int r = file_map_find(&renv->fmap, l->filenum, &tuple);
897     if (r == 0) {  // if file is open
898         char *iname = fixup_fname(&l->iname);
899         assert(strcmp(tuple->iname, iname) == 0);  // verify that file_map has same iname as log entry
900 
901         if (0!=strcmp(iname, toku_product_name_strings.rollback_cachefile)) {
902             //Rollback cachefile is closed manually at end of recovery, not here
903             toku_ft_handle_close_recovery(tuple->ft_handle, l->lsn);
904         }
905         file_map_remove(&renv->fmap, l->filenum);
906         toku_free(iname);
907     }
908     return 0;
909 }
910 
// No-op: an fclose record needs no handling in the backward direction.
static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
915 
916 // fdelete is a transactional file delete.
toku_recover_fdelete(struct logtype_fdelete * l,RECOVER_ENV renv)917 static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) {
918     TOKUTXN txn = NULL;
919     toku_txnid2txn(renv->logger, l->xid, &txn);
920     assert(txn != NULL);
921 
922     // if the forward scan in recovery found this file and opened it, we
923     // need to mark the txn to remove the ft on commit. if the file was
924     // not found and not opened, we don't need to do anything - the ft
925     // is already gone, so we're happy.
926     struct file_map_tuple *tuple;
927     int r = file_map_find(&renv->fmap, l->filenum, &tuple);
928     if (r == 0) {
929         toku_ft_unlink_on_commit(tuple->ft_handle, txn);
930     }
931     return 0;
932 }
933 
// No-op: an fdelete record needs no handling in the backward direction.
static int toku_recover_backward_fdelete (struct logtype_fdelete *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
938 
// Replays an frename log entry.
//
// Both inames are resolved against the env's data directory. Depending on
// which files currently exist on disk:
//   - old and new both exist: delete new, rename old -> new, then call
//     toku_fsync_directory on both paths;
//   - only old exists: toku_create_subdirs_if_needed(new), rename old -> new,
//     then call toku_fsync_directory on both paths;
//   - old is missing: nothing is renamed on disk.
// If l->old_filenum is present in the file map, its recorded iname is replaced
// with a copy of l->new_iname. If the txn for l->xid is still live, an frename
// rollback entry is saved via toku_logger_save_rollback_frename.
//
// Returns 0 on success, or 1 if a stat fails with an errno other than ENOENT
// or any delete/subdir-creation/rename/directory-fsync step fails.
static int toku_recover_frename(struct logtype_frename *l, RECOVER_ENV renv) {
    assert(renv);
    assert(renv->env);

    toku_struct_stat stat;
    const char *data_dir = renv->env->get_data_dir(renv->env);
    bool old_exist = true;
    bool new_exist = true;

    assert(data_dir);

    struct file_map_tuple *tuple;

    // Full paths (data_dir + iname); released with toku_free on scope exit.
    std::unique_ptr<char[], decltype(&toku_free)> old_iname_full(
        toku_construct_full_name(2, data_dir, l->old_iname.data), &toku_free);
    std::unique_ptr<char[], decltype(&toku_free)> new_iname_full(
        toku_construct_full_name(2, data_dir, l->new_iname.data), &toku_free);

    // ENOENT means "file absent"; any other stat failure aborts the replay.
    if (toku_stat(old_iname_full.get(), &stat, toku_uninstrumented) == -1) {
        if (ENOENT == errno)
            old_exist = false;
        else
            return 1;
    }

    if (toku_stat(new_iname_full.get(), &stat, toku_uninstrumented) == -1) {
        if (ENOENT == errno)
            new_exist = false;
        else
            return 1;
    }

    // Both old and new files can exist if:
    // - rename() did not complete
    // - fcreate was replayed during recovery
    // The 'stale cachefiles' container cachefile_list::m_stale_fileid holds
    // cachefiles that are closed but not yet evicted. It is keyed by the
    // filesystem-dependent file id, a (device id, inode number) pair. Because
    // the new file is assumed not to have been created yet during recovery,
    // that container can only hold the cachefile of the old file. To preserve
    // the old cachefile's file id (and so its entry in the stale container),
    // the new file is removed and the old file is renamed over it.
    if (old_exist && new_exist &&
        (toku_os_delete(new_iname_full.get()) == -1 ||
         toku_os_rename(old_iname_full.get(), new_iname_full.get()) == -1 ||
         toku_fsync_directory(old_iname_full.get()) == -1 ||
         toku_fsync_directory(new_iname_full.get()) == -1))
        return 1;

    // Rename not yet performed: make sure the destination's subdirectories
    // exist, then rename and sync.
    if (old_exist && !new_exist &&
        (!toku_create_subdirs_if_needed(new_iname_full.get()) ||
         toku_os_rename(old_iname_full.get(), new_iname_full.get()) == -1 ||
         toku_fsync_directory(old_iname_full.get()) == -1 ||
         toku_fsync_directory(new_iname_full.get()) == -1))
        return 1;

    // Keep the file map's iname in sync with the on-disk name.
    if (file_map_find(&renv->fmap, l->old_filenum, &tuple) != DB_NOTFOUND) {
        if (tuple->iname)
            toku_free(tuple->iname);
        tuple->iname = toku_xstrdup(l->new_iname.data);
    }

    TOKUTXN txn = NULL;
    toku_txnid2txn(renv->logger, l->xid, &txn);

    // Only a still-live txn gets the rollback entry.
    if (txn)
        toku_logger_save_rollback_frename(txn, &l->old_iname, &l->new_iname);

    return 0;
}
1010 
// No-op: an frename record needs no handling in the backward direction.
static int toku_recover_backward_frename(struct logtype_frename *UU(l),
                                         RECOVER_ENV UU(renv)) {
    return 0;
}
1016 
toku_recover_enq_insert(struct logtype_enq_insert * l,RECOVER_ENV renv)1017 static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV renv) {
1018     int r;
1019     TOKUTXN txn = NULL;
1020     toku_txnid2txn(renv->logger, l->xid, &txn);
1021     assert(txn!=NULL);
1022     struct file_map_tuple *tuple = NULL;
1023     r = file_map_find(&renv->fmap, l->filenum, &tuple);
1024     if (r==0) {
1025         //Maybe do the insertion if we found the cachefile.
1026         DBT keydbt, valdbt;
1027         toku_fill_dbt(&keydbt, l->key.data, l->key.len);
1028         toku_fill_dbt(&valdbt, l->value.data, l->value.len);
1029         toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, true, l->lsn, false, FT_INSERT);
1030         toku_txn_maybe_note_ft(txn, tuple->ft_handle->ft);
1031     }
1032     return 0;
1033 }
1034 
// No-op: an enq_insert record needs no handling in the backward direction.
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1039 
toku_recover_enq_insert_no_overwrite(struct logtype_enq_insert_no_overwrite * l,RECOVER_ENV renv)1040 static int toku_recover_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *l, RECOVER_ENV renv) {
1041     int r;
1042     TOKUTXN txn = NULL;
1043     toku_txnid2txn(renv->logger, l->xid, &txn);
1044     assert(txn!=NULL);
1045     struct file_map_tuple *tuple = NULL;
1046     r = file_map_find(&renv->fmap, l->filenum, &tuple);
1047     if (r==0) {
1048         //Maybe do the insertion if we found the cachefile.
1049         DBT keydbt, valdbt;
1050         toku_fill_dbt(&keydbt, l->key.data, l->key.len);
1051         toku_fill_dbt(&valdbt, l->value.data, l->value.len);
1052         toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, true, l->lsn, false, FT_INSERT_NO_OVERWRITE);
1053     }
1054     return 0;
1055 }
1056 
// No-op: an enq_insert_no_overwrite record needs no handling in the backward direction.
static int toku_recover_backward_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1061 
toku_recover_enq_delete_any(struct logtype_enq_delete_any * l,RECOVER_ENV renv)1062 static int toku_recover_enq_delete_any (struct logtype_enq_delete_any *l, RECOVER_ENV renv) {
1063     int r;
1064     TOKUTXN txn = NULL;
1065     toku_txnid2txn(renv->logger, l->xid, &txn);
1066     assert(txn!=NULL);
1067     struct file_map_tuple *tuple = NULL;
1068     r = file_map_find(&renv->fmap, l->filenum, &tuple);
1069     if (r==0) {
1070         //Maybe do the deletion if we found the cachefile.
1071         DBT keydbt;
1072         toku_fill_dbt(&keydbt, l->key.data, l->key.len);
1073         toku_ft_maybe_delete(tuple->ft_handle, &keydbt, txn, true, l->lsn, false);
1074     }
1075     return 0;
1076 }
1077 
// No-op: an enq_delete_any record needs no handling in the backward direction.
static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1082 
toku_recover_enq_insert_multiple(struct logtype_enq_insert_multiple * l,RECOVER_ENV renv)1083 static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple *l, RECOVER_ENV renv) {
1084     int r;
1085     TOKUTXN txn = NULL;
1086     toku_txnid2txn(renv->logger, l->xid, &txn);
1087     assert(txn!=NULL);
1088     DB *src_db = NULL;
1089     bool do_inserts = true;
1090     {
1091         struct file_map_tuple *tuple = NULL;
1092         r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
1093         if (l->src_filenum.fileid == FILENUM_NONE.fileid)
1094             assert(r==DB_NOTFOUND);
1095         else {
1096             if (r == 0)
1097                 src_db = &tuple->fake_db;
1098             else
1099                 do_inserts = false; // src file was probably deleted, #3129
1100         }
1101     }
1102 
1103     if (do_inserts) {
1104         DBT src_key, src_val;
1105 
1106         toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
1107         toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
1108 
1109         for (uint32_t file = 0; file < l->dest_filenums.num; file++) {
1110             struct file_map_tuple *tuple = NULL;
1111             r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
1112             if (r==0) {
1113                 // We found the cachefile.  (maybe) Do the insert.
1114                 DB *db = &tuple->fake_db;
1115 
1116                 DBT_ARRAY key_array;
1117                 DBT_ARRAY val_array;
1118                 if (db != src_db) {
1119                     r = renv->generate_row_for_put(db, src_db, &renv->dest_keys, &renv->dest_vals, &src_key, &src_val);
1120                     assert(r==0);
1121                     invariant(renv->dest_keys.size <= renv->dest_keys.capacity);
1122                     invariant(renv->dest_vals.size <= renv->dest_vals.capacity);
1123                     invariant(renv->dest_keys.size == renv->dest_vals.size);
1124                     key_array = renv->dest_keys;
1125                     val_array = renv->dest_vals;
1126                 } else {
1127                     key_array.size = key_array.capacity = 1;
1128                     key_array.dbts = &src_key;
1129 
1130                     val_array.size = val_array.capacity = 1;
1131                     val_array.dbts = &src_val;
1132                 }
1133                 for (uint32_t i = 0; i < key_array.size; i++) {
1134                     toku_ft_maybe_insert(tuple->ft_handle, &key_array.dbts[i], &val_array.dbts[i], txn, true, l->lsn, false, FT_INSERT);
1135                 }
1136             }
1137         }
1138     }
1139 
1140     return 0;
1141 }
1142 
// No-op: an enq_insert_multiple record needs no handling in the backward direction.
static int toku_recover_backward_enq_insert_multiple (struct logtype_enq_insert_multiple *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1147 
toku_recover_enq_delete_multiple(struct logtype_enq_delete_multiple * l,RECOVER_ENV renv)1148 static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple *l, RECOVER_ENV renv) {
1149     int r;
1150     TOKUTXN txn = NULL;
1151     toku_txnid2txn(renv->logger, l->xid, &txn);
1152     assert(txn!=NULL);
1153     DB *src_db = NULL;
1154     bool do_deletes = true;
1155     {
1156         struct file_map_tuple *tuple = NULL;
1157         r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
1158         if (l->src_filenum.fileid == FILENUM_NONE.fileid)
1159             assert(r==DB_NOTFOUND);
1160         else {
1161             if (r == 0) {
1162                 src_db = &tuple->fake_db;
1163             } else {
1164                 do_deletes = false; // src file was probably deleted, #3129
1165             }
1166         }
1167     }
1168 
1169     if (do_deletes) {
1170         DBT src_key, src_val;
1171         toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
1172         toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
1173 
1174         for (uint32_t file = 0; file < l->dest_filenums.num; file++) {
1175             struct file_map_tuple *tuple = NULL;
1176             r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
1177             if (r==0) {
1178                 // We found the cachefile.  (maybe) Do the delete.
1179                 DB *db = &tuple->fake_db;
1180 
1181                 DBT_ARRAY key_array;
1182                 if (db != src_db) {
1183                     r = renv->generate_row_for_del(db, src_db, &renv->dest_keys, &src_key, &src_val);
1184                     assert(r==0);
1185                     invariant(renv->dest_keys.size <= renv->dest_keys.capacity);
1186                     key_array = renv->dest_keys;
1187                 } else {
1188                     key_array.size = key_array.capacity = 1;
1189                     key_array.dbts = &src_key;
1190                 }
1191                 for (uint32_t i = 0; i < key_array.size; i++) {
1192                     toku_ft_maybe_delete(tuple->ft_handle, &key_array.dbts[i], txn, true, l->lsn, false);
1193                 }
1194             }
1195         }
1196     }
1197 
1198     return 0;
1199 }
1200 
// No-op: an enq_delete_multiple record needs no handling in the backward direction.
static int toku_recover_backward_enq_delete_multiple (struct logtype_enq_delete_multiple *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1205 
toku_recover_enq_update(struct logtype_enq_update * l,RECOVER_ENV renv)1206 static int toku_recover_enq_update(struct logtype_enq_update *l, RECOVER_ENV renv) {
1207     int r;
1208     TOKUTXN txn = NULL;
1209     toku_txnid2txn(renv->logger, l->xid, &txn);
1210     assert(txn != NULL);
1211     struct file_map_tuple *tuple = NULL;
1212     r = file_map_find(&renv->fmap, l->filenum, &tuple);
1213     if (r == 0) {
1214         // Maybe do the update if we found the cachefile.
1215         DBT key, extra;
1216         toku_fill_dbt(&key, l->key.data, l->key.len);
1217         toku_fill_dbt(&extra, l->extra.data, l->extra.len);
1218         toku_ft_maybe_update(tuple->ft_handle, &key, &extra, txn, true, l->lsn, false);
1219     }
1220     return 0;
1221 }
1222 
toku_recover_enq_updatebroadcast(struct logtype_enq_updatebroadcast * l,RECOVER_ENV renv)1223 static int toku_recover_enq_updatebroadcast(struct logtype_enq_updatebroadcast *l, RECOVER_ENV renv) {
1224     int r;
1225     TOKUTXN txn = NULL;
1226     toku_txnid2txn(renv->logger, l->xid, &txn);
1227     assert(txn != NULL);
1228     struct file_map_tuple *tuple = NULL;
1229     r = file_map_find(&renv->fmap, l->filenum, &tuple);
1230     if (r == 0) {
1231         // Maybe do the update broadcast if we found the cachefile.
1232         DBT extra;
1233         toku_fill_dbt(&extra, l->extra.data, l->extra.len);
1234         toku_ft_maybe_update_broadcast(tuple->ft_handle, &extra, txn, true,
1235                                             l->lsn, false, l->is_resetting_op);
1236     }
1237     return 0;
1238 }
1239 
// No-op: an enq_update record needs no handling in the backward direction.
static int toku_recover_backward_enq_update(struct logtype_enq_update *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1244 
// No-op: an enq_updatebroadcast record needs no handling in the backward direction.
static int toku_recover_backward_enq_updatebroadcast(struct logtype_enq_updatebroadcast *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1249 
// No-op: comment records carry nothing to replay.
static int toku_recover_comment (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1254 
// No-op: a comment record needs no handling in the backward direction.
static int toku_recover_backward_comment (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1259 
// No-op: a shutdown_up_to_19 record needs no handling in the forward direction.
static int toku_recover_shutdown_up_to_19 (struct logtype_shutdown_up_to_19 *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1264 
// No-op: a shutdown_up_to_19 record needs no handling in the backward direction.
static int toku_recover_backward_shutdown_up_to_19 (struct logtype_shutdown_up_to_19 *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1269 
// No-op: a shutdown record needs no handling in the forward direction.
static int toku_recover_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1274 
// No-op: a shutdown record needs no handling in the backward direction.
static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1279 
toku_recover_load(struct logtype_load * UU (l),RECOVER_ENV UU (renv))1280 static int toku_recover_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) {
1281     TOKUTXN txn = NULL;
1282     toku_txnid2txn(renv->logger, l->xid, &txn);
1283     assert(txn!=NULL);
1284     char *new_iname = fixup_fname(&l->new_iname);
1285 
1286     toku_ft_load_recovery(txn, l->old_filenum, new_iname, 0, 0, (LSN*)NULL);
1287 
1288     toku_free(new_iname);
1289     return 0;
1290 }
1291 
// No-op: a load record needs no handling in the backward direction.
static int toku_recover_backward_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
1296 
1297 // #2954
toku_recover_hot_index(struct logtype_hot_index * UU (l),RECOVER_ENV UU (renv))1298 static int toku_recover_hot_index(struct logtype_hot_index *UU(l), RECOVER_ENV UU(renv)) {
1299     TOKUTXN txn = NULL;
1300     toku_txnid2txn(renv->logger, l->xid, &txn);
1301     assert(txn!=NULL);
1302     // just make an entry in the rollback log
1303     //   - set do_log = 0 -> don't write to recovery log
1304     toku_ft_hot_index_recovery(txn, l->hot_index_filenums, 0, 0, (LSN*)NULL);
1305     return 0;
1306 }
1307 
1308 // #2954
toku_recover_backward_hot_index(struct logtype_hot_index * UU (l),RECOVER_ENV UU (renv))1309 static int toku_recover_backward_hot_index(struct logtype_hot_index *UU(l), RECOVER_ENV UU(renv)) {
1310     // nothing
1311     return 0;
1312 }
1313 
1314 // Effects: If there are no log files, or if there is a clean "shutdown" at
1315 // the end of the log, then we don't need recovery to run.
1316 // Returns: true if we need recovery, otherwise false.
tokuft_needs_recovery(const char * log_dir,bool ignore_log_empty)1317 int tokuft_needs_recovery(const char *log_dir, bool ignore_log_empty) {
1318     int needs_recovery;
1319     int r;
1320     TOKULOGCURSOR logcursor = NULL;
1321 
1322     r = toku_logcursor_create(&logcursor, log_dir);
1323     if (r != 0) {
1324         needs_recovery = true; goto exit;
1325     }
1326 
1327     struct log_entry *le;
1328     le = NULL;
1329     r = toku_logcursor_last(logcursor, &le);
1330     if (r == 0) {
1331         needs_recovery = le->cmd != LT_shutdown;
1332     }
1333     else {
1334         needs_recovery = !(r == DB_NOTFOUND && ignore_log_empty);
1335     }
1336  exit:
1337     if (logcursor) {
1338         r = toku_logcursor_destroy(&logcursor);
1339         assert(r == 0);
1340     }
1341     return needs_recovery;
1342 }
1343 
recover_get_num_live_txns(RECOVER_ENV renv)1344 static uint32_t recover_get_num_live_txns(RECOVER_ENV renv) {
1345     return toku_txn_manager_num_live_root_txns(renv->logger->txn_manager);
1346 }
1347 
is_txn_unprepared(TOKUTXN txn,void * extra)1348 static int is_txn_unprepared(TOKUTXN txn, void* extra) {
1349     TOKUTXN* ptxn = (TOKUTXN *)extra;
1350     if (txn->state != TOKUTXN_PREPARING) {
1351         *ptxn = txn;
1352         return -1; // return -1 to get iterator to return
1353     }
1354     return 0;
1355 }
1356 
find_an_unprepared_txn(RECOVER_ENV renv,TOKUTXN * txnp)1357 static int find_an_unprepared_txn (RECOVER_ENV renv, TOKUTXN *txnp) {
1358     TOKUTXN txn = nullptr;
1359     int r = toku_txn_manager_iter_over_live_root_txns(
1360         renv->logger->txn_manager,
1361         is_txn_unprepared,
1362         &txn
1363         );
1364     assert(r == 0 || r == -1);
1365     if (txn != nullptr) {
1366         *txnp = txn;
1367         return 0;
1368     }
1369     return DB_NOTFOUND;
1370 }
1371 
call_prepare_txn_callback_iter(TOKUTXN txn,void * extra)1372 static int call_prepare_txn_callback_iter(TOKUTXN txn, void* extra) {
1373     RECOVER_ENV* renv = (RECOVER_ENV *)extra;
1374     invariant(txn->state == TOKUTXN_PREPARING);
1375     invariant(txn->child == NULL);
1376     (*renv)->prepared_txn_callback((*renv)->env, txn);
1377     return 0;
1378 }
1379 
recover_abort_live_txn(TOKUTXN txn)1380 static void recover_abort_live_txn(TOKUTXN txn) {
1381     fprintf(stderr, "%s %" PRIu64 "\n", __FUNCTION__, txn->txnid.parent_id64);
1382     // recursively abort all children first
1383     if (txn->child != NULL) {
1384         recover_abort_live_txn(txn->child);
1385     }
1386     // sanity check that the recursive call successfully NULLs out txn->child
1387     invariant(txn->child == NULL);
1388     // abort the transaction
1389     toku_txn_progress_extra extra = { time(NULL), ZERO_LSN, "abort live", txn->txnid, 0 };
1390     int r = toku_txn_abort_txn(txn, toku_recover_txn_progress, &extra);
1391     assert(r == 0);
1392 
1393     // close the transaction
1394     toku_txn_close_txn(txn);
1395 }
1396 
1397 // abort all of the remaining live transactions in descending transaction id order
recover_abort_all_live_txns(RECOVER_ENV renv)1398 static void recover_abort_all_live_txns(RECOVER_ENV renv) {
1399     while (1) {
1400         TOKUTXN txn;
1401         int r = find_an_unprepared_txn(renv, &txn);
1402         if (r==0) {
1403             recover_abort_live_txn(txn);
1404         } else if (r==DB_NOTFOUND) {
1405             break;
1406         } else {
1407             abort();
1408         }
1409     }
1410 
1411     // Now we have only prepared txns.  These prepared txns don't have full DB_TXNs in them, so we need to make some.
1412     int r = toku_txn_manager_iter_over_live_root_txns(
1413         renv->logger->txn_manager,
1414         call_prepare_txn_callback_iter,
1415         &renv
1416         );
1417     assert_zero(r);
1418 }
1419 
recover_trace_le(const char * f,int l,int r,struct log_entry * le)1420 static void recover_trace_le(const char *f, int l, int r, struct log_entry *le) {
1421     if (le) {
1422         LSN thislsn = toku_log_entry_get_lsn(le);
1423         fprintf(stderr, "%s:%d r=%d cmd=%c lsn=%" PRIu64 "\n", f, l, r, le->cmd, thislsn.lsn);
1424     } else
1425         fprintf(stderr, "%s:%d r=%d cmd=?\n", f, l, r);
1426 }
1427 
// For test purposes only.
// Two optional hook functions, each paired with the opaque argument passed to
// it; all start out unset (NULL).
// NOTE(review): presumably installed by test code and invoked at fixed points
// during recovery — confirm against the setters/call sites elsewhere in this file.
static void (*recover_callback_fx)(void*)  = NULL;
static void * recover_callback_args        = NULL;
static void (*recover_callback2_fx)(void*) = NULL;
static void * recover_callback2_args       = NULL;
1433 
1434 
do_recovery(RECOVER_ENV renv,const char * env_dir,const char * log_dir)1435 static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_dir) {
1436     int r;
1437     int rr = 0;
1438     TOKULOGCURSOR logcursor = NULL;
1439     struct log_entry *le = NULL;
1440 
1441     time_t tnow = time(NULL);
1442     fprintf(stderr, "%.24s PerconaFT recovery starting in env %s\n", ctime(&tnow), env_dir);
1443 
1444     char org_wd[1000];
1445     {
1446         char *wd=getcwd(org_wd, sizeof(org_wd));
1447         assert(wd!=0);
1448     }
1449 
1450     r = toku_logger_open(log_dir, renv->logger);
1451     assert(r == 0);
1452 
1453     // grab the last LSN so that it can be restored when the log is restarted
1454     LSN lastlsn = toku_logger_last_lsn(renv->logger);
1455     LSN thislsn;
1456 
1457     // there must be at least one log entry
1458     r = toku_logcursor_create(&logcursor, log_dir);
1459     assert(r == 0);
1460 
1461     r = toku_logcursor_last(logcursor, &le);
1462     if (r != 0) {
1463         if (tokuft_recovery_trace)
1464             fprintf(stderr, "RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
1465         rr = DB_RUNRECOVERY; goto errorexit;
1466     }
1467 
1468     r = toku_logcursor_destroy(&logcursor);
1469     assert(r == 0);
1470 
1471     r = toku_logcursor_create(&logcursor, log_dir);
1472     assert(r == 0);
1473 
1474     {
1475         toku_struct_stat buf;
1476         if (toku_stat(env_dir, &buf, toku_uninstrumented)) {
1477             rr = get_error_errno();
1478             fprintf(stderr,
1479                     "%.24s PerconaFT recovery error: directory does not exist: "
1480                     "%s\n",
1481                     ctime(&tnow),
1482                     env_dir);
1483             goto errorexit;
1484         } else if (!S_ISDIR(buf.st_mode)) {
1485             fprintf(stderr, "%.24s PerconaFT recovery error: this file is supposed to be a directory, but is not: %s\n", ctime(&tnow), env_dir);
1486             rr = ENOTDIR; goto errorexit;
1487         }
1488     }
1489     // scan backwards
1490     scan_state_init(&renv->ss);
1491     tnow = time(NULL);
1492     time_t tlast;
1493     tlast = tnow;
1494     fprintf(stderr, "%.24s PerconaFT recovery scanning backward from %" PRIu64 "\n", ctime(&tnow), lastlsn.lsn);
1495     for (unsigned i=0; 1; i++) {
1496 
1497         // get the previous log entry (first time gets the last one)
1498         le = NULL;
1499         r = toku_logcursor_prev(logcursor, &le);
1500         if (tokuft_recovery_trace)
1501             recover_trace_le(__FUNCTION__, __LINE__, r, le);
1502         if (r != 0) {
1503             if (r == DB_NOTFOUND)
1504                 break;
1505             rr = DB_RUNRECOVERY;
1506             goto errorexit;
1507         }
1508 
1509         // trace progress
1510         if ((i % 1000) == 0) {
1511             tnow = time(NULL);
1512             if (tnow - tlast >= tokuft_recovery_progress_time) {
1513                 thislsn = toku_log_entry_get_lsn(le);
1514                 fprintf(stderr, "%.24s PerconaFT recovery scanning backward from %" PRIu64 " at %" PRIu64 " (%s)\n",
1515                         ctime(&tnow), lastlsn.lsn, thislsn.lsn, recover_state(renv));
1516                 tlast = tnow;
1517             }
1518         }
1519 
1520         // dispatch the log entry handler
1521         assert(renv->ss.ss == BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END ||
1522                renv->ss.ss == BACKWARD_NEWER_CHECKPOINT_END);
1523         logtype_dispatch_assign(le, toku_recover_backward_, r, renv);
1524         if (tokuft_recovery_trace)
1525             recover_trace_le(__FUNCTION__, __LINE__, r, le);
1526         if (r != 0) {
1527             if (tokuft_recovery_trace)
1528                 fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
1529             rr = DB_RUNRECOVERY;
1530             goto errorexit;
1531         }
1532         if (renv->goforward)
1533             break;
1534     }
1535 
1536     // run first callback
1537     if (recover_callback_fx)
1538         recover_callback_fx(recover_callback_args);
1539 
1540     // scan forwards
1541     assert(le);
1542     thislsn = toku_log_entry_get_lsn(le);
1543     tnow = time(NULL);
1544     fprintf(stderr, "%.24s PerconaFT recovery starts scanning forward to %" PRIu64 " from %" PRIu64 " left %" PRIu64 " (%s)\n",
1545             ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv));
1546 
1547     for (unsigned i=0; 1; i++) {
1548 
1549         // trace progress
1550         if ((i % 1000) == 0) {
1551             tnow = time(NULL);
1552             if (tnow - tlast >= tokuft_recovery_progress_time) {
1553                 thislsn = toku_log_entry_get_lsn(le);
1554                 fprintf(stderr, "%.24s PerconaFT recovery scanning forward to %" PRIu64 " at %" PRIu64 " left %" PRIu64 " (%s)\n",
1555                         ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv));
1556                 tlast = tnow;
1557             }
1558         }
1559 
1560         // dispatch the log entry handler (first time calls the forward handler for the log entry at the turnaround
1561         assert(renv->ss.ss == FORWARD_BETWEEN_CHECKPOINT_BEGIN_END ||
1562                renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END);
1563         logtype_dispatch_assign(le, toku_recover_, r, renv);
1564         if (tokuft_recovery_trace)
1565             recover_trace_le(__FUNCTION__, __LINE__, r, le);
1566         if (r != 0) {
1567             if (tokuft_recovery_trace)
1568                 fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
1569             rr = DB_RUNRECOVERY;
1570             goto errorexit;
1571         }
1572 
1573         // get the next log entry
1574         le = NULL;
1575         r = toku_logcursor_next(logcursor, &le);
1576         if (tokuft_recovery_trace)
1577             recover_trace_le(__FUNCTION__, __LINE__, r, le);
1578         if (r != 0) {
1579             if (r == DB_NOTFOUND)
1580                 break;
1581             rr = DB_RUNRECOVERY;
1582             goto errorexit;
1583         }
1584     }
1585 
1586     // verify the final recovery state
1587     assert(renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END);
1588 
1589     r = toku_logcursor_destroy(&logcursor);
1590     assert(r == 0);
1591 
1592     // run second callback
1593     if (recover_callback2_fx)
1594         recover_callback2_fx(recover_callback2_args);
1595 
1596     // restart logging
1597     toku_logger_restart(renv->logger, lastlsn);
1598 
1599     // abort the live transactions
1600     {
1601         uint32_t n = recover_get_num_live_txns(renv);
1602         if (n > 0) {
1603             tnow = time(NULL);
1604             fprintf(stderr, "%.24s PerconaFT recovery has %" PRIu32 " live transaction%s\n", ctime(&tnow), n, n > 1 ? "s" : "");
1605         }
1606     }
1607     recover_abort_all_live_txns(renv);
1608     {
1609         uint32_t n = recover_get_num_live_txns(renv);
1610         if (n > 0) {
1611             tnow = time(NULL);
1612             fprintf(stderr, "%.24s PerconaFT recovery has %" PRIu32 " prepared transaction%s\n", ctime(&tnow), n, n > 1 ? "s" : "");
1613         }
1614     }
1615 
1616     // close the open dictionaries
1617     uint32_t n;
1618     n = file_map_get_num_dictionaries(&renv->fmap);
1619     if (n > 0) {
1620         tnow = time(NULL);
1621         fprintf(stderr, "%.24s PerconaFT recovery closing %" PRIu32 " dictionar%s\n", ctime(&tnow), n, n > 1 ? "ies" : "y");
1622     }
1623     file_map_close_dictionaries(&renv->fmap, lastlsn);
1624 
1625     {
1626         // write a recovery log entry
1627         BYTESTRING recover_comment = { static_cast<uint32_t>(strlen("recover")), (char *) "recover" };
1628         toku_log_comment(renv->logger, NULL, true, 0, recover_comment);
1629     }
1630 
1631     // checkpoint
1632     tnow = time(NULL);
1633     fprintf(stderr, "%.24s PerconaFT recovery making a checkpoint\n", ctime(&tnow));
1634     r = toku_checkpoint(renv->cp, renv->logger, NULL, NULL, NULL, NULL, RECOVERY_CHECKPOINT);
1635     assert(r == 0);
1636     tnow = time(NULL);
1637     fprintf(stderr, "%.24s PerconaFT recovery done\n", ctime(&tnow));
1638 
1639     return 0;
1640 
1641  errorexit:
1642     tnow = time(NULL);
1643     fprintf(stderr, "%.24s PerconaFT recovery failed %d\n", ctime(&tnow), rr);
1644 
1645     if (logcursor) {
1646         r = toku_logcursor_destroy(&logcursor);
1647         assert(r == 0);
1648     }
1649 
1650     return rr;
1651 }
1652 
1653 int
toku_recover_lock(const char * lock_dir,int * lockfd)1654 toku_recover_lock(const char *lock_dir, int *lockfd) {
1655     int e = toku_single_process_lock(lock_dir, "recovery", lockfd);
1656     if (e != 0 && e != ENOENT) {
1657         fprintf(stderr, "Couldn't run recovery because some other process holds the recovery lock\n");
1658     }
1659     return e;
1660 }
1661 
1662 int
toku_recover_unlock(int lockfd)1663 toku_recover_unlock(int lockfd) {
1664     int lockfd_copy = lockfd;
1665     return toku_single_process_unlock(&lockfd_copy);
1666 }
1667 
tokuft_recover(DB_ENV * env,prepared_txn_callback_t prepared_txn_callback,keep_cachetable_callback_t keep_cachetable_callback,TOKULOGGER logger,const char * env_dir,const char * log_dir,ft_compare_func bt_compare,ft_update_func update_function,generate_row_for_put_func generate_row_for_put,generate_row_for_del_func generate_row_for_del,size_t cachetable_size)1668 int tokuft_recover(DB_ENV *env,
1669                    prepared_txn_callback_t    prepared_txn_callback,
1670                    keep_cachetable_callback_t keep_cachetable_callback,
1671                    TOKULOGGER logger,
1672                    const char *env_dir, const char *log_dir,
1673                    ft_compare_func bt_compare,
1674                    ft_update_func update_function,
1675                    generate_row_for_put_func generate_row_for_put,
1676                    generate_row_for_del_func generate_row_for_del,
1677                    size_t cachetable_size) {
1678     int r;
1679     int lockfd = -1;
1680 
1681     r = toku_recover_lock(log_dir, &lockfd);
1682     if (r != 0)
1683         return r;
1684 
1685     int rr = 0;
1686     if (tokuft_needs_recovery(log_dir, false)) {
1687         struct recover_env renv;
1688         r = recover_env_init(&renv,
1689                              env_dir,
1690                              env,
1691                              prepared_txn_callback,
1692                              keep_cachetable_callback,
1693                              logger,
1694                              bt_compare,
1695                              update_function,
1696                              generate_row_for_put,
1697                              generate_row_for_del,
1698                              cachetable_size);
1699         assert(r == 0);
1700 
1701         rr = do_recovery(&renv, env_dir, log_dir);
1702 
1703         recover_env_cleanup(&renv);
1704     }
1705 
1706     r = toku_recover_unlock(lockfd);
1707     if (r != 0)
1708         return r;
1709 
1710     return rr;
1711 }
1712 
1713 // Return 0 if recovery log exists, ENOENT if log is missing
1714 int
tokuft_recover_log_exists(const char * log_dir)1715 tokuft_recover_log_exists(const char * log_dir) {
1716     int r;
1717     TOKULOGCURSOR logcursor;
1718 
1719     r = toku_logcursor_create(&logcursor, log_dir);
1720     if (r == 0) {
1721         int rclose;
1722         r = toku_logcursor_log_exists(logcursor);  // return ENOENT if no log
1723         rclose = toku_logcursor_destroy(&logcursor);
1724         assert(rclose == 0);
1725     }
1726     else
1727         r = ENOENT;
1728 
1729     return r;
1730 }
1731 
toku_recover_set_callback(void (* callback_fx)(void *),void * callback_args)1732 void toku_recover_set_callback (void (*callback_fx)(void*), void* callback_args) {
1733     recover_callback_fx   = callback_fx;
1734     recover_callback_args = callback_args;
1735 }
1736 
toku_recover_set_callback2(void (* callback_fx)(void *),void * callback_args)1737 void toku_recover_set_callback2 (void (*callback_fx)(void*), void* callback_args) {
1738     recover_callback2_fx   = callback_fx;
1739     recover_callback2_args = callback_args;
1740 }
1741