1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 /* rollback and rollforward routines. */
40 
41 #include <memory>
42 #include "ft/ft-ops.h"
43 #include "ft/ft.h"
44 #include "ft/log_header.h"
45 #include "ft/logger/log-internal.h"
46 #include "ft/txn/rollback-apply.h"
47 #include "ft/txn/xids.h"
48 
49 // functionality provided by roll.c is exposed by an autogenerated
50 // header file, logheader.h
51 //
52 // this (poorly) explains the absence of "roll.h"
53 
54 // these flags control whether or not we send commit messages for
55 // various operations
56 
57 // When a transaction is committed, should we send a FT_COMMIT message
58 // for each FT_INSERT message sent earlier by the transaction?
59 #define TOKU_DO_COMMIT_CMD_INSERT 0
60 
61 // When a transaction is committed, should we send a FT_COMMIT message
62 // for each FT_DELETE_ANY message sent earlier by the transaction?
63 #define TOKU_DO_COMMIT_CMD_DELETE 1
64 
65 // When a transaction is committed, should we send a FT_COMMIT message
66 // for each FT_UPDATE message sent earlier by the transaction?
67 #define TOKU_DO_COMMIT_CMD_UPDATE 0
68 
69 int
toku_commit_fdelete(FILENUM filenum,TOKUTXN txn,LSN UU (oplsn))70 toku_commit_fdelete (FILENUM    filenum,
71                      TOKUTXN    txn,
72                      LSN        UU(oplsn)) //oplsn is the lsn of the commit
73 {
74     int r;
75     CACHEFILE cf;
76     CACHETABLE ct = txn->logger->ct;
77 
78     // Try to get the cachefile for this filenum. A missing file on recovery
79     // is not an error, but a missing file outside of recovery is.
80     r = toku_cachefile_of_filenum(ct, filenum, &cf);
81     if (r == ENOENT) {
82         assert(txn->for_recovery);
83         r = 0;
84         goto done;
85     }
86     assert_zero(r);
87 
88     // bug fix for #4718
89     // bug was introduced in with fix for #3590
90     // Before Maxwell (and fix for #3590),
91     // the recovery log was fsynced after the xcommit was loged but
92     // before we processed rollback entries and before we released
93     // the row locks (in the lock tree). Due to performance concerns,
94     // the fsync was moved to after the release of row locks, which comes
95     // after processing rollback entries. As a result, we may be unlinking a file
96     // here as part of a transactoin that may abort if we do not fsync the log.
97     // So, we fsync the log here.
98     if (txn->logger) {
99         toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn);
100     }
101 
102     // Mark the cachefile as unlink on close. There are two ways for close
103     // to be eventually called on the cachefile:
104     //
105     // - when this txn completes, it will release a reference on the
106     // ft and close it, UNLESS it was pinned by checkpoint
107     // - if the cf was pinned by checkpoint, an unpin will release the
108     // final reference and call close. it must be the final reference
109     // since this txn has exclusive access to dictionary (by the
110     // directory row lock for its dname) and we would not get this
111     // far if there were other live handles.
112     toku_cachefile_unlink_on_close(cf);
113 done:
114     return r;
115 }
116 
117 int
toku_rollback_fdelete(FILENUM UU (filenum),TOKUTXN UU (txn),LSN UU (oplsn))118 toku_rollback_fdelete (FILENUM    UU(filenum),
119                        TOKUTXN    UU(txn),
120                        LSN        UU(oplsn)) //oplsn is the lsn of the abort
121 {
122     //Rolling back an fdelete is an no-op.
123     return 0;
124 }
125 
126 int
toku_commit_fcreate(FILENUM UU (filenum),BYTESTRING UU (bs_fname),TOKUTXN UU (txn),LSN UU (oplsn))127 toku_commit_fcreate (FILENUM UU(filenum),
128                      BYTESTRING UU(bs_fname),
129                      TOKUTXN    UU(txn),
130                      LSN        UU(oplsn))
131 {
132     return 0;
133 }
134 
135 int
toku_rollback_fcreate(FILENUM filenum,BYTESTRING UU (bs_fname),TOKUTXN txn,LSN UU (oplsn))136 toku_rollback_fcreate (FILENUM    filenum,
137                        BYTESTRING UU(bs_fname),
138                        TOKUTXN    txn,
139                        LSN        UU(oplsn))
140 {
141     int r;
142     CACHEFILE cf;
143     CACHETABLE ct = txn->logger->ct;
144 
145     // Try to get the cachefile for this filenum. A missing file on recovery
146     // is not an error, but a missing file outside of recovery is.
147     r = toku_cachefile_of_filenum(ct, filenum, &cf);
148     if (r == ENOENT) {
149         r = 0;
150         goto done;
151     }
152     assert_zero(r);
153 
154     // Mark the cachefile as unlink on close. There are two ways for close
155     // to be eventually called on the cachefile:
156     //
157     // - when this txn completes, it will release a reference on the
158     // ft and close it, UNLESS it was pinned by checkpoint
159     // - if the cf was pinned by checkpoint, an unpin will release the
160     // final reference and call close. it must be the final reference
161     // since this txn has exclusive access to dictionary (by the
162     // directory row lock for its dname) and we would not get this
163     // far if there were other live handles.
164     toku_cachefile_unlink_on_close(cf);
165     toku_cachefile_skip_log_recover_on_close(cf);
166 done:
167     return 0;
168 }
169 
toku_commit_frename(BYTESTRING,BYTESTRING,TOKUTXN,LSN UU (oplsn))170 int toku_commit_frename(BYTESTRING /* old_name */,
171                         BYTESTRING /* new_iname */,
172                         TOKUTXN /* txn */,
173                         LSN UU(oplsn)) {
174     return 0;
175 }
176 
toku_rollback_frename(BYTESTRING old_iname,BYTESTRING new_iname,TOKUTXN txn,LSN UU (oplsn))177 int toku_rollback_frename(BYTESTRING old_iname,
178                           BYTESTRING new_iname,
179                           TOKUTXN txn,
180                           LSN UU(oplsn)) {
181     assert(txn);
182     assert(txn->logger);
183     assert(txn->logger->ct);
184 
185     CACHETABLE cachetable = txn->logger->ct;
186 
187     toku_struct_stat stat;
188     bool old_exist = true;
189     bool new_exist = true;
190 
191     std::unique_ptr<char[], decltype(&toku_free)> old_iname_full(
192         toku_cachetable_get_fname_in_cwd(cachetable, old_iname.data),
193         &toku_free);
194     std::unique_ptr<char[], decltype(&toku_free)> new_iname_full(
195         toku_cachetable_get_fname_in_cwd(cachetable, new_iname.data),
196         &toku_free);
197 
198     if (toku_stat(old_iname_full.get(), &stat, toku_uninstrumented) == -1) {
199         if (ENOENT == errno)
200             old_exist = false;
201         else
202             return 1;
203     }
204 
205     if (toku_stat(new_iname_full.get(), &stat, toku_uninstrumented) == -1) {
206         if (ENOENT == errno || ENAMETOOLONG == errno)
207             new_exist = false;
208         else
209             return 1;
210     }
211 
212     // Both old and new files can exist if:
213     // - rename() is not completed
214     // - fcreate was replayed during recovery
215     // 'Stalled cachefiles' container cachefile_list::m_stale_fileid contains
216     // closed but not yet evicted cachefiles and the key of this container is
217     // fs-dependent file id - (device id, inode number) pair. To preserve the
218     // new cachefile
219     // file's id and keep it in 'stalled cachefiles' container the old file is
220     // removed
221     // and the new file is renamed.
222     if (old_exist && new_exist &&
223         (toku_os_delete(old_iname_full.get()) == -1 ||
224          toku_os_rename(new_iname_full.get(), old_iname_full.get()) == -1 ||
225          toku_fsync_directory(new_iname_full.get()) == -1 ||
226          toku_fsync_directory(old_iname_full.get()) == -1))
227         return 1;
228 
229     if (!old_exist && new_exist &&
230         (!toku_create_subdirs_if_needed(old_iname_full.get()) ||
231          toku_os_rename(new_iname_full.get(), old_iname_full.get()) == -1 ||
232          toku_fsync_directory(new_iname_full.get()) == -1 ||
233          toku_fsync_directory(old_iname_full.get()) == -1))
234         return 1;
235 
236     // it's ok if both files do not exist on recovery
237     if (!old_exist && !new_exist)
238         assert(txn->for_recovery);
239 
240     CACHEFILE cf;
241     int r = toku_cachefile_of_iname_in_env(cachetable, new_iname.data, &cf);
242     if (r != ENOENT) {
243         char *old_fname_in_cf = toku_cachefile_fname_in_env(cf);
244         toku_cachefile_set_fname_in_env(cf, toku_xstrdup(old_iname.data));
245         toku_free(old_fname_in_cf);
246         // There is at least one case when fclose logging cause error:
247         // 1) start transaction
248         // 2) create ft 'a'(write "fcreate" in recovery log)
249         // 3) rename ft 'a' to 'b'(write "frename" in recovery log)
250         // 4) abort transaction:
251         //    a) rollback rename ft (renames 'b' to 'a')
252         //    b) rollback create ft (removes 'a'):
253         //       invokes toku_cachefile_unlink_on_close - lazy unlink on file
254         //       close,
255         //       it just sets corresponding flag in cachefile object
256         //    c) write "unlink" for 'a' in recovery log
257         //       (when transaction is aborted all locks are released,
258         //       when file lock is released the file is closed and unlinked if
259         //       corresponding flag is set in cachefile object)
260         // 5) crash
261         //
262         // After this we have the following records in recovery log:
263         // - create ft 'a',
264         // - rename 'a' to 'b',
265         // - unlink 'a'
266         //
267         // On recovery:
268         // - create 'a'
269         // - rename 'a' to 'b'
270         // - unlink 'a' - as 'a' file does not exist we have crash on assert
271         // here
272         //
273         // There is no need to write "unlink" in recovery log in (4a) because
274         // 'a' will be removed
275         // on transaction rollback on recovery.
276         toku_cachefile_skip_log_recover_on_close(cf);
277     }
278 
279     return 0;
280 }
281 
282 int find_ft_from_filenum (const FT &ft, const FILENUM &filenum);
find_ft_from_filenum(const FT & ft,const FILENUM & filenum)283 int find_ft_from_filenum (const FT &ft, const FILENUM &filenum) {
284     FILENUM thisfnum = toku_cachefile_filenum(ft->cf);
285     if (thisfnum.fileid<filenum.fileid) return -1;
286     if (thisfnum.fileid>filenum.fileid) return +1;
287     return 0;
288 }
289 
290 // Input arg reset_root_xid_that_created true means that this operation has changed the definition of this dictionary.
291 // (Example use is for schema change committed with txn that inserted cmdupdatebroadcast message.)
292 // The oplsn argument is ZERO_LSN for normal operation.  When this function is called for recovery, it has the LSN of
293 // the operation (insert, delete, update, etc).
do_insertion(enum ft_msg_type type,FILENUM filenum,BYTESTRING key,BYTESTRING * data,TOKUTXN txn,LSN oplsn,bool reset_root_xid_that_created)294 static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key, BYTESTRING *data, TOKUTXN txn, LSN oplsn,
295                          bool reset_root_xid_that_created) {
296     int r = 0;
297     //printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data);
298     FT ft = nullptr;
299     r = txn->open_fts.find_zero<FILENUM, find_ft_from_filenum>(filenum, &ft, NULL);
300     if (r == DB_NOTFOUND) {
301         assert(txn->for_recovery);
302         r = 0;
303         goto done;
304     }
305     assert(r==0);
306 
307     if (oplsn.lsn != 0) {  // if we are executing the recovery algorithm
308         LSN treelsn = toku_ft_checkpoint_lsn(ft);
309         if (oplsn.lsn <= treelsn.lsn) {  // if operation was already applied to tree ...
310             r = 0;                       // ... do not apply it again.
311             goto done;
312         }
313     }
314 
315     DBT key_dbt,data_dbt;
316     XIDS xids;
317     xids = toku_txn_get_xids(txn);
318     {
319         const DBT *kdbt = key.len > 0 ? toku_fill_dbt(&key_dbt, key.data, key.len) :
320                                         toku_init_dbt(&key_dbt);
321         const DBT *vdbt = data ? toku_fill_dbt(&data_dbt, data->data, data->len) :
322                                  toku_init_dbt(&data_dbt);
323         ft_msg msg(kdbt, vdbt, type, ZERO_MSN, xids);
324 
325         TXN_MANAGER txn_manager = toku_logger_get_txn_manager(txn->logger);
326         txn_manager_state txn_state_for_gc(txn_manager);
327 
328         TXNID oldest_referenced_xid_estimate = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
329         txn_gc_info gc_info(&txn_state_for_gc,
330                             oldest_referenced_xid_estimate,
331                             // no messages above us, we can implicitly promote uxrs based on this xid
332                             oldest_referenced_xid_estimate,
333                             !txn->for_recovery);
334         toku_ft_root_put_msg(ft, msg, &gc_info);
335         if (reset_root_xid_that_created) {
336             TXNID new_root_xid_that_created = toku_xids_get_outermost_xid(xids);
337             toku_reset_root_xid_that_created(ft, new_root_xid_that_created);
338         }
339     }
340 done:
341     return r;
342 }
343 
344 
do_nothing_with_filenum(TOKUTXN UU (txn),FILENUM UU (filenum))345 static int do_nothing_with_filenum(TOKUTXN UU(txn), FILENUM UU(filenum)) {
346     return 0;
347 }
348 
349 
toku_commit_cmdinsert(FILENUM filenum,BYTESTRING UU (key),TOKUTXN txn,LSN UU (oplsn))350 int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING UU(key), TOKUTXN txn, LSN UU(oplsn)) {
351 #if TOKU_DO_COMMIT_CMD_INSERT
352     return do_insertion (FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, false);
353 #else
354     return do_nothing_with_filenum(txn, filenum);
355 #endif
356 }
357 
358 int
toku_rollback_cmdinsert(FILENUM filenum,BYTESTRING key,TOKUTXN txn,LSN oplsn)359 toku_rollback_cmdinsert (FILENUM    filenum,
360                          BYTESTRING key,
361                          TOKUTXN    txn,
362                          LSN        oplsn)
363 {
364     return do_insertion (FT_ABORT_ANY, filenum, key, 0, txn, oplsn, false);
365 }
366 
367 int
toku_commit_cmdupdate(FILENUM filenum,BYTESTRING UU (key),TOKUTXN txn,LSN UU (oplsn))368 toku_commit_cmdupdate(FILENUM    filenum,
369                       BYTESTRING UU(key),
370                       TOKUTXN    txn,
371                       LSN        UU(oplsn))
372 {
373 #if TOKU_DO_COMMIT_CMD_UPDATE
374     return do_insertion(FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, false);
375 #else
376     return do_nothing_with_filenum(txn, filenum);
377 #endif
378 }
379 
380 int
toku_rollback_cmdupdate(FILENUM filenum,BYTESTRING key,TOKUTXN txn,LSN oplsn)381 toku_rollback_cmdupdate(FILENUM    filenum,
382                         BYTESTRING key,
383                         TOKUTXN    txn,
384                         LSN        oplsn)
385 {
386     return do_insertion(FT_ABORT_ANY, filenum, key, 0, txn, oplsn, false);
387 }
388 
389 int
toku_commit_cmdupdatebroadcast(FILENUM filenum,bool is_resetting_op,TOKUTXN txn,LSN oplsn)390 toku_commit_cmdupdatebroadcast(FILENUM    filenum,
391                                bool       is_resetting_op,
392                                TOKUTXN    txn,
393                                LSN        oplsn)
394 {
395     // if is_resetting_op, reset root_xid_that_created in
396     // relevant ft.
397     bool reset_root_xid_that_created = (is_resetting_op ? true : false);
398     const enum ft_msg_type msg_type = (is_resetting_op
399                                         ? FT_COMMIT_BROADCAST_ALL
400                                         : FT_COMMIT_BROADCAST_TXN);
401     BYTESTRING nullkey = { 0, NULL };
402     return do_insertion(msg_type, filenum, nullkey, 0, txn, oplsn, reset_root_xid_that_created);
403 }
404 
405 int
toku_rollback_cmdupdatebroadcast(FILENUM filenum,bool UU (is_resetting_op),TOKUTXN txn,LSN oplsn)406 toku_rollback_cmdupdatebroadcast(FILENUM    filenum,
407                                  bool       UU(is_resetting_op),
408                                  TOKUTXN    txn,
409                                  LSN        oplsn)
410 {
411     BYTESTRING nullkey = { 0, NULL };
412     return do_insertion(FT_ABORT_BROADCAST_TXN, filenum, nullkey, 0, txn, oplsn, false);
413 }
414 
415 int
toku_commit_cmddelete(FILENUM filenum,BYTESTRING key,TOKUTXN txn,LSN oplsn)416 toku_commit_cmddelete (FILENUM    filenum,
417                        BYTESTRING key,
418                        TOKUTXN    txn,
419                        LSN        oplsn)
420 {
421 #if TOKU_DO_COMMIT_CMD_DELETE
422     return do_insertion (FT_COMMIT_ANY, filenum, key, 0, txn, oplsn, false);
423 #else
424     key = key; oplsn = oplsn;
425     return do_nothing_with_filenum(txn, filenum);
426 #endif
427 }
428 
429 int
toku_rollback_cmddelete(FILENUM filenum,BYTESTRING key,TOKUTXN txn,LSN oplsn)430 toku_rollback_cmddelete (FILENUM    filenum,
431                          BYTESTRING key,
432                          TOKUTXN    txn,
433                          LSN        oplsn)
434 {
435     return do_insertion (FT_ABORT_ANY, filenum, key, 0, txn, oplsn, false);
436 }
437 
438 static int
toku_apply_rollinclude(TXNID_PAIR xid,uint64_t num_nodes,BLOCKNUM spilled_head,BLOCKNUM spilled_tail,TOKUTXN txn,LSN oplsn,apply_rollback_item func)439 toku_apply_rollinclude (TXNID_PAIR      xid,
440                         uint64_t   num_nodes,
441                         BLOCKNUM   spilled_head,
442                         BLOCKNUM   spilled_tail,
443                         TOKUTXN    txn,
444                         LSN        oplsn,
445                         apply_rollback_item func) {
446     int r = 0;
447     struct roll_entry *item;
448 
449     BLOCKNUM next_log      = spilled_tail;
450     uint64_t last_sequence = num_nodes;
451 
452     bool found_head = false;
453     assert(next_log.b != ROLLBACK_NONE.b);
454     while (next_log.b != ROLLBACK_NONE.b) {
455         //pin log
456         ROLLBACK_LOG_NODE log;
457         toku_get_and_pin_rollback_log(txn, next_log, &log);
458         toku_rollback_verify_contents(log, xid, last_sequence - 1);
459         last_sequence = log->sequence;
460 
461         toku_maybe_prefetch_previous_rollback_log(txn, log);
462 
463         while ((item=log->newest_logentry)) {
464             log->newest_logentry = item->prev;
465             r = func(txn, item, oplsn);
466             if (r!=0) return r;
467         }
468         if (next_log.b == spilled_head.b) {
469             assert(!found_head);
470             found_head = true;
471             assert(log->sequence == 0);
472         }
473         next_log      = log->previous;
474         {
475             //Clean up transaction structure to prevent
476             //toku_txn_close from double-freeing
477             spilled_tail      = next_log;
478             if (found_head) {
479                 assert(next_log.b == ROLLBACK_NONE.b);
480                 spilled_head      = next_log;
481             }
482         }
483         toku_rollback_log_unpin_and_remove(txn, log);
484     }
485     return r;
486 }
487 
488 int
toku_commit_rollinclude(TXNID_PAIR xid,uint64_t num_nodes,BLOCKNUM spilled_head,BLOCKNUM spilled_tail,TOKUTXN txn,LSN oplsn)489 toku_commit_rollinclude (TXNID_PAIR      xid,
490                          uint64_t   num_nodes,
491                          BLOCKNUM   spilled_head,
492                          BLOCKNUM   spilled_tail,
493                          TOKUTXN    txn,
494                          LSN        oplsn) {
495     int r;
496     r = toku_apply_rollinclude(xid, num_nodes,
497                                spilled_head,
498                                spilled_tail,
499                                txn, oplsn,
500                                toku_commit_rollback_item);
501     return r;
502 }
503 
504 int
toku_rollback_rollinclude(TXNID_PAIR xid,uint64_t num_nodes,BLOCKNUM spilled_head,BLOCKNUM spilled_tail,TOKUTXN txn,LSN oplsn)505 toku_rollback_rollinclude (TXNID_PAIR      xid,
506                            uint64_t   num_nodes,
507                            BLOCKNUM   spilled_head,
508                            BLOCKNUM   spilled_tail,
509                            TOKUTXN    txn,
510                            LSN        oplsn) {
511     int r;
512     r = toku_apply_rollinclude(xid, num_nodes,
513                                spilled_head,
514                                spilled_tail,
515                                txn, oplsn,
516                                toku_abort_rollback_item);
517     return r;
518 }
519 
520 int
toku_commit_load(FILENUM old_filenum,BYTESTRING UU (new_iname),TOKUTXN txn,LSN UU (oplsn))521 toku_commit_load (FILENUM    old_filenum,
522                   BYTESTRING UU(new_iname),
523                   TOKUTXN    txn,
524                   LSN        UU(oplsn))
525 {
526     int r;
527     CACHEFILE old_cf;
528     CACHETABLE ct = txn->logger->ct;
529 
530     // To commit a dictionary load, we delete the old file
531     //
532     // Try to get the cachefile for the old filenum. A missing file on recovery
533     // is not an error, but a missing file outside of recovery is.
534     r = toku_cachefile_of_filenum(ct, old_filenum, &old_cf);
535     if (r == ENOENT) {
536         invariant(txn->for_recovery);
537         r = 0;
538         goto done;
539     }
540     lazy_assert(r == 0);
541 
542     // bug fix for #4718
543     // bug was introduced in with fix for #3590
544     // Before Maxwell (and fix for #3590),
545     // the recovery log was fsynced after the xcommit was loged but
546     // before we processed rollback entries and before we released
547     // the row locks (in the lock tree). Due to performance concerns,
548     // the fsync was moved to after the release of row locks, which comes
549     // after processing rollback entries. As a result, we may be unlinking a file
550     // here as part of a transactoin that may abort if we do not fsync the log.
551     // So, we fsync the log here.
552     if (txn->logger) {
553         toku_logger_fsync_if_lsn_not_fsynced(txn->logger, txn->do_fsync_lsn);
554     }
555 
556     // TODO: Zardosht
557     // Explain why this condition is valid, because I forget.
558     if (!toku_cachefile_is_unlink_on_close(old_cf)) {
559         toku_cachefile_unlink_on_close(old_cf);
560     }
561 done:
562     return r;
563 }
564 
565 int
toku_rollback_load(FILENUM UU (old_filenum),BYTESTRING new_iname,TOKUTXN txn,LSN UU (oplsn))566 toku_rollback_load (FILENUM    UU(old_filenum),
567                     BYTESTRING new_iname,
568                     TOKUTXN    txn,
569                     LSN        UU(oplsn))
570 {
571     int r;
572     CACHEFILE new_cf;
573     CACHETABLE ct = txn->logger->ct;
574 
575     // To rollback a dictionary load, we delete the new file.
576     // Try to get the cachefile for the new fname.
577     char *fname_in_env = fixup_fname(&new_iname);
578     r = toku_cachefile_of_iname_in_env(ct, fname_in_env, &new_cf);
579     if (r == ENOENT) {
580         // It's possible the new iname was never created, so just try to
581         // unlink it if it's there and ignore the error if it's not.
582         char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(ct, fname_in_env);
583         r = unlink(fname_in_cwd);
584         assert(r == 0 || get_error_errno() == ENOENT);
585         toku_free(fname_in_cwd);
586         r = 0;
587     } else {
588         assert_zero(r);
589         toku_cachefile_unlink_on_close(new_cf);
590     }
591     toku_free(fname_in_env);
592     return r;
593 }
594 
595 //2954
596 int
toku_commit_hot_index(FILENUMS UU (hot_index_filenums),TOKUTXN UU (txn),LSN UU (oplsn))597 toku_commit_hot_index (FILENUMS UU(hot_index_filenums),
598                        TOKUTXN  UU(txn),
599                        LSN      UU(oplsn))
600 {
601     // nothing
602     return 0;
603 }
604 
605 int
toku_rollback_hot_index(FILENUMS UU (hot_index_filenums),TOKUTXN UU (txn),LSN UU (oplsn))606 toku_rollback_hot_index (FILENUMS UU(hot_index_filenums),
607                          TOKUTXN  UU(txn),
608                          LSN      UU(oplsn))
609 {
610     return 0;
611 }
612 
613 int
toku_commit_dictionary_redirect(FILENUM UU (old_filenum),FILENUM UU (new_filenum),TOKUTXN UU (txn),LSN UU (oplsn))614 toku_commit_dictionary_redirect (FILENUM UU(old_filenum),
615                                  FILENUM UU(new_filenum),
616                                  TOKUTXN UU(txn),
617                                  LSN     UU(oplsn)) //oplsn is the lsn of the commit
618 {
619     //Redirect only has meaning during normal operation (NOT during recovery).
620     if (!txn->for_recovery) {
621         //NO-OP
622     }
623     return 0;
624 }
625 
626 int
toku_rollback_dictionary_redirect(FILENUM old_filenum,FILENUM new_filenum,TOKUTXN txn,LSN UU (oplsn))627 toku_rollback_dictionary_redirect (FILENUM old_filenum,
628                                    FILENUM new_filenum,
629                                    TOKUTXN txn,
630                                    LSN     UU(oplsn)) //oplsn is the lsn of the abort
631 {
632     int r = 0;
633     //Redirect only has meaning during normal operation (NOT during recovery).
634     if (!txn->for_recovery) {
635         CACHEFILE new_cf = NULL;
636         r = toku_cachefile_of_filenum(txn->logger->ct, new_filenum, &new_cf);
637         assert(r == 0);
638         FT CAST_FROM_VOIDP(new_ft, toku_cachefile_get_userdata(new_cf));
639 
640         CACHEFILE old_cf = NULL;
641         r = toku_cachefile_of_filenum(txn->logger->ct, old_filenum, &old_cf);
642         assert(r == 0);
643         FT CAST_FROM_VOIDP(old_ft, toku_cachefile_get_userdata(old_cf));
644 
645         //Redirect back from new to old.
646         r = toku_dictionary_redirect_abort(old_ft, new_ft, txn);
647         assert(r==0);
648     }
649     return r;
650 }
651 
652 int
toku_commit_change_fdescriptor(FILENUM filenum,BYTESTRING UU (old_descriptor),TOKUTXN txn,LSN UU (oplsn))653 toku_commit_change_fdescriptor(FILENUM    filenum,
654                                BYTESTRING UU(old_descriptor),
655                                TOKUTXN    txn,
656                                LSN        UU(oplsn))
657 {
658     return do_nothing_with_filenum(txn, filenum);
659 }
660 
661 int
toku_rollback_change_fdescriptor(FILENUM filenum,BYTESTRING old_descriptor,TOKUTXN txn,LSN UU (oplsn))662 toku_rollback_change_fdescriptor(FILENUM    filenum,
663                                BYTESTRING old_descriptor,
664                                TOKUTXN    txn,
665                                LSN        UU(oplsn))
666 {
667     CACHEFILE cf;
668     int r;
669     r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
670     if (r == ENOENT) { //Missing file on recovered transaction is not an error
671         assert(txn->for_recovery);
672         r = 0;
673         goto done;
674     }
675     // file must be open, because the txn that created it opened it and
676     // noted it,
677     assert(r == 0);
678 
679     FT ft;
680     ft = NULL;
681     r = txn->open_fts.find_zero<FILENUM, find_ft_from_filenum>(filenum, &ft, NULL);
682     assert(r == 0);
683 
684     DESCRIPTOR_S d;
685     toku_fill_dbt(&d.dbt,  old_descriptor.data,  old_descriptor.len);
686     toku_ft_update_descriptor(ft, &d);
687 done:
688     return r;
689 }
690 
691 
692 
693