1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <toku_stdint.h>
40 
41 #include "ft/serialize/block_table.h"
42 #include "ft/ft.h"
43 #include "ft/logger/log-internal.h"
44 #include "ft/txn/rollback-ct-callbacks.h"
45 
46 extern int writing_rollback;
47 
rollback_unpin_remove_callback(CACHEKEY * cachekey,bool for_checkpoint,void * extra)48 static void rollback_unpin_remove_callback(CACHEKEY* cachekey, bool for_checkpoint, void* extra) {
49     FT CAST_FROM_VOIDP(ft, extra);
50     ft->blocktable.free_blocknum(cachekey, ft, for_checkpoint);
51 }
52 
toku_rollback_log_unpin_and_remove(TOKUTXN txn,ROLLBACK_LOG_NODE log)53 void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
54     int r;
55     CACHEFILE cf = txn->logger->rollback_cachefile;
56     FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
57     r = toku_cachetable_unpin_and_remove (cf, log->ct_pair, rollback_unpin_remove_callback, ft);
58     assert(r == 0);
59 }
60 
61 int
toku_find_xid_by_xid(const TXNID & xid,const TXNID & xidfind)62 toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind) {
63     if (xid<xidfind) return -1;
64     if (xid>xidfind) return +1;
65     return 0;
66 }
67 
68 // TODO: fix this name
69 //       toku_rollback_malloc
toku_malloc_in_rollback(ROLLBACK_LOG_NODE log,size_t size)70 void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
71     return log->rollentry_arena.malloc_from_arena(size);
72 }
73 
74 // TODO: fix this name
75 //       toku_rollback_memdup
toku_memdup_in_rollback(ROLLBACK_LOG_NODE log,const void * v,size_t len)76 void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len) {
77     void *r = toku_malloc_in_rollback(log, len);
78     memcpy(r, v, len);
79     return r;
80 }
81 
make_rollback_pair_attr(long size)82 static inline PAIR_ATTR make_rollback_pair_attr(long size) {
83     PAIR_ATTR result={
84      .size = size,
85      .nonleaf_size = 0,
86      .leaf_size = 0,
87      .rollback_size = size,
88      .cache_pressure_size = 0,
89      .is_valid = true
90     };
91     return result;
92 }
93 
94 PAIR_ATTR
rollback_memory_size(ROLLBACK_LOG_NODE log)95 rollback_memory_size(ROLLBACK_LOG_NODE log) {
96     size_t size = sizeof(*log);
97     size += log->rollentry_arena.total_footprint();
98     return make_rollback_pair_attr(size);
99 }
100 
toku_rollback_node_save_ct_pair(CACHEKEY UU (key),void * value_data,PAIR p)101 static void toku_rollback_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
102     ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data);
103     log->ct_pair = p;
104 }
105 
106 //
107 // initializes an empty rollback log node
108 // Does not touch the blocknum, that is the
109 // responsibility of the caller
110 //
rollback_empty_log_init(ROLLBACK_LOG_NODE log)111 void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
112     // Having a txnid set to TXNID_NONE is how we determine if the
113     // rollback log node is empty or in use.
114     log->txnid.parent_id64 = TXNID_NONE;
115     log->txnid.child_id64 = TXNID_NONE;
116 
117     log->layout_version                = FT_LAYOUT_VERSION;
118     log->layout_version_original       = FT_LAYOUT_VERSION;
119     log->layout_version_read_from_disk = FT_LAYOUT_VERSION;
120     log->dirty = true;
121     log->sequence = 0;
122     log->previous = make_blocknum(0);
123     log->oldest_logentry = NULL;
124     log->newest_logentry = NULL;
125     log->rollentry_arena.create(0);
126     log->rollentry_resident_bytecount = 0;
127 }
128 
rollback_initialize_for_txn(ROLLBACK_LOG_NODE log,TOKUTXN txn,BLOCKNUM previous)129 static void rollback_initialize_for_txn(
130     ROLLBACK_LOG_NODE log,
131     TOKUTXN txn,
132     BLOCKNUM previous
133     )
134 {
135     log->txnid = txn->txnid;
136     log->sequence = txn->roll_info.num_rollback_nodes++;
137     log->previous = previous;
138     log->oldest_logentry = NULL;
139     log->newest_logentry = NULL;
140     log->rollentry_arena.create(1024);
141     log->rollentry_resident_bytecount = 0;
142     log->dirty = true;
143 }
144 
145 // TODO: fix this name
make_rollback_log_empty(ROLLBACK_LOG_NODE log)146 void make_rollback_log_empty(ROLLBACK_LOG_NODE log) {
147     log->rollentry_arena.destroy();
148     rollback_empty_log_init(log);
149 }
150 
151 // create and pin a new rollback log node. chain it to the other rollback nodes
152 // by providing a previous blocknum and assigning the new rollback log
153 // node the next sequence number
rollback_log_create(TOKUTXN txn,BLOCKNUM previous,ROLLBACK_LOG_NODE * result)154 static void rollback_log_create (
155     TOKUTXN txn,
156     BLOCKNUM previous,
157     ROLLBACK_LOG_NODE *result
158     )
159 {
160     writing_rollback++;
161     ROLLBACK_LOG_NODE XMALLOC(log);
162     rollback_empty_log_init(log);
163 
164     CACHEFILE cf = txn->logger->rollback_cachefile;
165     FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
166     rollback_initialize_for_txn(log, txn, previous);
167     ft->blocktable.allocate_blocknum(&log->blocknum, ft);
168     const uint32_t hash = toku_cachetable_hash(ft->cf, log->blocknum);
169     *result = log;
170     toku_cachetable_put(cf, log->blocknum, hash,
171                        log, rollback_memory_size(log),
172                        get_write_callbacks_for_rollback_log(ft),
173                        toku_rollback_node_save_ct_pair);
174     txn->roll_info.current_rollback = log->blocknum;
175     writing_rollback --;
176 }
177 
toku_rollback_log_unpin(TOKUTXN txn,ROLLBACK_LOG_NODE log)178 void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
179     int r;
180     CACHEFILE cf = txn->logger->rollback_cachefile;
181     r = toku_cachetable_unpin(
182         cf,
183         log->ct_pair,
184         (enum cachetable_dirty)log->dirty,
185         rollback_memory_size(log)
186         );
187     assert(r == 0);
188 }
189 
190 //Requires: log is pinned
191 //          log is current
192 //After:
193 //  Maybe there is no current after (if it spilled)
toku_maybe_spill_rollbacks(TOKUTXN txn,ROLLBACK_LOG_NODE log)194 void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
195     if (log->rollentry_resident_bytecount > txn->logger->write_block_size) {
196         assert(log->blocknum.b == txn->roll_info.current_rollback.b);
197         //spill
198         if (!txn_has_spilled_rollback_logs(txn)) {
199             //First spilled.  Copy to head.
200             txn->roll_info.spilled_rollback_head      = txn->roll_info.current_rollback;
201         }
202         //Unconditionally copy to tail.  Old tail does not need to be cached anymore.
203         txn->roll_info.spilled_rollback_tail      = txn->roll_info.current_rollback;
204 
205         txn->roll_info.current_rollback      = ROLLBACK_NONE;
206     }
207 }
208 
209 int find_filenum (const FT &h, const FT &hfind);
find_filenum(const FT & h,const FT & hfind)210 int find_filenum (const FT &h, const FT &hfind) {
211     FILENUM fnum     = toku_cachefile_filenum(h->cf);
212     FILENUM fnumfind = toku_cachefile_filenum(hfind->cf);
213     if (fnum.fileid<fnumfind.fileid) return -1;
214     if (fnum.fileid>fnumfind.fileid) return +1;
215     return 0;
216 }
217 
218 //Notify a transaction that it has touched an ft.
toku_txn_maybe_note_ft(TOKUTXN txn,FT ft)219 void toku_txn_maybe_note_ft (TOKUTXN txn, FT ft) {
220     toku_txn_lock(txn);
221     FT ftv;
222     uint32_t idx;
223     int r = txn->open_fts.find_zero<FT, find_filenum>(ft, &ftv, &idx);
224     if (r == 0) {
225         // already there
226         assert(ftv == ft);
227         goto exit;
228     }
229     r = txn->open_fts.insert_at(ft, idx);
230     assert_zero(r);
231     // TODO(leif): if there's anything that locks the reflock and then
232     // the txn lock, this may deadlock, because it grabs the reflock.
233     toku_ft_add_txn_ref(ft);
234 exit:
235     toku_txn_unlock(txn);
236 }
237 
238 // Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
toku_logger_txn_rollback_stats(TOKUTXN txn,struct txn_stat * txn_stat)239 int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat)
240 {
241     toku_txn_lock(txn);
242     txn_stat->rollback_raw_count = txn->roll_info.rollentry_raw_count;
243     txn_stat->rollback_num_entries = txn->roll_info.num_rollentries;
244     toku_txn_unlock(txn);
245     return 0;
246 }
247 
toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn,ROLLBACK_LOG_NODE log)248 void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
249     //Currently processing 'log'.  Prefetch the next (previous) log node.
250 
251     BLOCKNUM name = log->previous;
252     int r = 0;
253     if (name.b != ROLLBACK_NONE.b) {
254         CACHEFILE cf = txn->logger->rollback_cachefile;
255         uint32_t hash = toku_cachetable_hash(cf, name);
256         FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
257         bool doing_prefetch = false;
258         r = toku_cachefile_prefetch(cf, name, hash,
259                                     get_write_callbacks_for_rollback_log(h),
260                                     toku_rollback_fetch_callback,
261                                     toku_rollback_pf_req_callback,
262                                     toku_rollback_pf_callback,
263                                     h,
264                                     &doing_prefetch);
265         assert(r == 0);
266     }
267 }
268 
toku_rollback_verify_contents(ROLLBACK_LOG_NODE log,TXNID_PAIR txnid,uint64_t sequence)269 void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log,
270         TXNID_PAIR txnid, uint64_t sequence)
271 {
272     assert(log->txnid.parent_id64 == txnid.parent_id64);
273     assert(log->txnid.child_id64 == txnid.child_id64);
274     assert(log->sequence == sequence);
275 }
276 
toku_get_and_pin_rollback_log(TOKUTXN txn,BLOCKNUM blocknum,ROLLBACK_LOG_NODE * log)277 void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log) {
278     void * value;
279     CACHEFILE cf = txn->logger->rollback_cachefile;
280     FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
281     uint32_t hash = toku_cachetable_hash(cf, blocknum);
282     int r = toku_cachetable_get_and_pin_with_dep_pairs(cf, blocknum, hash,
283                                         &value,
284                                         get_write_callbacks_for_rollback_log(h),
285                                         toku_rollback_fetch_callback,
286                                         toku_rollback_pf_req_callback,
287                                         toku_rollback_pf_callback,
288                                         PL_WRITE_CHEAP, // lock_type
289                                         h,
290                                         0, NULL, NULL
291                                         );
292     assert(r == 0);
293     ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
294     assert(pinned_log->blocknum.b == blocknum.b);
295     *log = pinned_log;
296 }
297 
toku_get_and_pin_rollback_log_for_new_entry(TOKUTXN txn,ROLLBACK_LOG_NODE * log)298 void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) {
299     ROLLBACK_LOG_NODE pinned_log = NULL;
300     invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions
301     if (txn_has_current_rollback_log(txn)) {
302         toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &pinned_log);
303         toku_rollback_verify_contents(pinned_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1);
304     } else {
305         // For each transaction, we try to acquire the first rollback log
306         // from the rollback log node cache, so that we avoid
307         // putting something new into the cachetable. However,
308         // if transaction has spilled rollbacks, that means we
309         // have already done a lot of work for this transaction,
310         // and subsequent rollback log nodes are created
311         // and put into the cachetable. The idea is for
312         // transactions that don't do a lot of work to (hopefully)
313         // get a rollback log node from a cache, as opposed to
314         // taking the more expensive route of creating a new one.
315         if (!txn_has_spilled_rollback_logs(txn)) {
316             txn->logger->rollback_cache.get_rollback_log_node(txn, &pinned_log);
317             if (pinned_log != NULL) {
318                 rollback_initialize_for_txn(
319                     pinned_log,
320                     txn,
321                     txn->roll_info.spilled_rollback_tail
322                     );
323                 txn->roll_info.current_rollback = pinned_log->blocknum;
324             }
325         }
326         if (pinned_log == NULL) {
327             rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, &pinned_log);
328         }
329     }
330     assert(pinned_log->txnid.parent_id64 == txn->txnid.parent_id64);
331     assert(pinned_log->txnid.child_id64 == txn->txnid.child_id64);
332     assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
333     *log = pinned_log;
334 }
335