1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include <toku_stdint.h>
40
41 #include "ft/serialize/block_table.h"
42 #include "ft/ft.h"
43 #include "ft/logger/log-internal.h"
44 #include "ft/txn/rollback-ct-callbacks.h"
45
46 extern int writing_rollback;
47
rollback_unpin_remove_callback(CACHEKEY * cachekey,bool for_checkpoint,void * extra)48 static void rollback_unpin_remove_callback(CACHEKEY* cachekey, bool for_checkpoint, void* extra) {
49 FT CAST_FROM_VOIDP(ft, extra);
50 ft->blocktable.free_blocknum(cachekey, ft, for_checkpoint);
51 }
52
toku_rollback_log_unpin_and_remove(TOKUTXN txn,ROLLBACK_LOG_NODE log)53 void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
54 int r;
55 CACHEFILE cf = txn->logger->rollback_cachefile;
56 FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
57 r = toku_cachetable_unpin_and_remove (cf, log->ct_pair, rollback_unpin_remove_callback, ft);
58 assert(r == 0);
59 }
60
61 int
toku_find_xid_by_xid(const TXNID & xid,const TXNID & xidfind)62 toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind) {
63 if (xid<xidfind) return -1;
64 if (xid>xidfind) return +1;
65 return 0;
66 }
67
68 // TODO: fix this name
69 // toku_rollback_malloc
toku_malloc_in_rollback(ROLLBACK_LOG_NODE log,size_t size)70 void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
71 return log->rollentry_arena.malloc_from_arena(size);
72 }
73
74 // TODO: fix this name
75 // toku_rollback_memdup
toku_memdup_in_rollback(ROLLBACK_LOG_NODE log,const void * v,size_t len)76 void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len) {
77 void *r = toku_malloc_in_rollback(log, len);
78 memcpy(r, v, len);
79 return r;
80 }
81
make_rollback_pair_attr(long size)82 static inline PAIR_ATTR make_rollback_pair_attr(long size) {
83 PAIR_ATTR result={
84 .size = size,
85 .nonleaf_size = 0,
86 .leaf_size = 0,
87 .rollback_size = size,
88 .cache_pressure_size = 0,
89 .is_valid = true
90 };
91 return result;
92 }
93
94 PAIR_ATTR
rollback_memory_size(ROLLBACK_LOG_NODE log)95 rollback_memory_size(ROLLBACK_LOG_NODE log) {
96 size_t size = sizeof(*log);
97 size += log->rollentry_arena.total_footprint();
98 return make_rollback_pair_attr(size);
99 }
100
toku_rollback_node_save_ct_pair(CACHEKEY UU (key),void * value_data,PAIR p)101 static void toku_rollback_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
102 ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data);
103 log->ct_pair = p;
104 }
105
106 //
107 // initializes an empty rollback log node
108 // Does not touch the blocknum, that is the
109 // responsibility of the caller
110 //
rollback_empty_log_init(ROLLBACK_LOG_NODE log)111 void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
112 // Having a txnid set to TXNID_NONE is how we determine if the
113 // rollback log node is empty or in use.
114 log->txnid.parent_id64 = TXNID_NONE;
115 log->txnid.child_id64 = TXNID_NONE;
116
117 log->layout_version = FT_LAYOUT_VERSION;
118 log->layout_version_original = FT_LAYOUT_VERSION;
119 log->layout_version_read_from_disk = FT_LAYOUT_VERSION;
120 log->dirty = true;
121 log->sequence = 0;
122 log->previous = make_blocknum(0);
123 log->oldest_logentry = NULL;
124 log->newest_logentry = NULL;
125 log->rollentry_arena.create(0);
126 log->rollentry_resident_bytecount = 0;
127 }
128
rollback_initialize_for_txn(ROLLBACK_LOG_NODE log,TOKUTXN txn,BLOCKNUM previous)129 static void rollback_initialize_for_txn(
130 ROLLBACK_LOG_NODE log,
131 TOKUTXN txn,
132 BLOCKNUM previous
133 )
134 {
135 log->txnid = txn->txnid;
136 log->sequence = txn->roll_info.num_rollback_nodes++;
137 log->previous = previous;
138 log->oldest_logentry = NULL;
139 log->newest_logentry = NULL;
140 log->rollentry_arena.create(1024);
141 log->rollentry_resident_bytecount = 0;
142 log->dirty = true;
143 }
144
145 // TODO: fix this name
make_rollback_log_empty(ROLLBACK_LOG_NODE log)146 void make_rollback_log_empty(ROLLBACK_LOG_NODE log) {
147 log->rollentry_arena.destroy();
148 rollback_empty_log_init(log);
149 }
150
151 // create and pin a new rollback log node. chain it to the other rollback nodes
152 // by providing a previous blocknum and assigning the new rollback log
153 // node the next sequence number
rollback_log_create(TOKUTXN txn,BLOCKNUM previous,ROLLBACK_LOG_NODE * result)154 static void rollback_log_create (
155 TOKUTXN txn,
156 BLOCKNUM previous,
157 ROLLBACK_LOG_NODE *result
158 )
159 {
160 writing_rollback++;
161 ROLLBACK_LOG_NODE XMALLOC(log);
162 rollback_empty_log_init(log);
163
164 CACHEFILE cf = txn->logger->rollback_cachefile;
165 FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
166 rollback_initialize_for_txn(log, txn, previous);
167 ft->blocktable.allocate_blocknum(&log->blocknum, ft);
168 const uint32_t hash = toku_cachetable_hash(ft->cf, log->blocknum);
169 *result = log;
170 toku_cachetable_put(cf, log->blocknum, hash,
171 log, rollback_memory_size(log),
172 get_write_callbacks_for_rollback_log(ft),
173 toku_rollback_node_save_ct_pair);
174 txn->roll_info.current_rollback = log->blocknum;
175 writing_rollback --;
176 }
177
toku_rollback_log_unpin(TOKUTXN txn,ROLLBACK_LOG_NODE log)178 void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
179 int r;
180 CACHEFILE cf = txn->logger->rollback_cachefile;
181 r = toku_cachetable_unpin(
182 cf,
183 log->ct_pair,
184 (enum cachetable_dirty)log->dirty,
185 rollback_memory_size(log)
186 );
187 assert(r == 0);
188 }
189
190 //Requires: log is pinned
191 // log is current
192 //After:
193 // Maybe there is no current after (if it spilled)
toku_maybe_spill_rollbacks(TOKUTXN txn,ROLLBACK_LOG_NODE log)194 void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
195 if (log->rollentry_resident_bytecount > txn->logger->write_block_size) {
196 assert(log->blocknum.b == txn->roll_info.current_rollback.b);
197 //spill
198 if (!txn_has_spilled_rollback_logs(txn)) {
199 //First spilled. Copy to head.
200 txn->roll_info.spilled_rollback_head = txn->roll_info.current_rollback;
201 }
202 //Unconditionally copy to tail. Old tail does not need to be cached anymore.
203 txn->roll_info.spilled_rollback_tail = txn->roll_info.current_rollback;
204
205 txn->roll_info.current_rollback = ROLLBACK_NONE;
206 }
207 }
208
209 int find_filenum (const FT &h, const FT &hfind);
find_filenum(const FT & h,const FT & hfind)210 int find_filenum (const FT &h, const FT &hfind) {
211 FILENUM fnum = toku_cachefile_filenum(h->cf);
212 FILENUM fnumfind = toku_cachefile_filenum(hfind->cf);
213 if (fnum.fileid<fnumfind.fileid) return -1;
214 if (fnum.fileid>fnumfind.fileid) return +1;
215 return 0;
216 }
217
218 //Notify a transaction that it has touched an ft.
toku_txn_maybe_note_ft(TOKUTXN txn,FT ft)219 void toku_txn_maybe_note_ft (TOKUTXN txn, FT ft) {
220 toku_txn_lock(txn);
221 FT ftv;
222 uint32_t idx;
223 int r = txn->open_fts.find_zero<FT, find_filenum>(ft, &ftv, &idx);
224 if (r == 0) {
225 // already there
226 assert(ftv == ft);
227 goto exit;
228 }
229 r = txn->open_fts.insert_at(ft, idx);
230 assert_zero(r);
231 // TODO(leif): if there's anything that locks the reflock and then
232 // the txn lock, this may deadlock, because it grabs the reflock.
233 toku_ft_add_txn_ref(ft);
234 exit:
235 toku_txn_unlock(txn);
236 }
237
238 // Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
toku_logger_txn_rollback_stats(TOKUTXN txn,struct txn_stat * txn_stat)239 int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat)
240 {
241 toku_txn_lock(txn);
242 txn_stat->rollback_raw_count = txn->roll_info.rollentry_raw_count;
243 txn_stat->rollback_num_entries = txn->roll_info.num_rollentries;
244 toku_txn_unlock(txn);
245 return 0;
246 }
247
toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn,ROLLBACK_LOG_NODE log)248 void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
249 //Currently processing 'log'. Prefetch the next (previous) log node.
250
251 BLOCKNUM name = log->previous;
252 int r = 0;
253 if (name.b != ROLLBACK_NONE.b) {
254 CACHEFILE cf = txn->logger->rollback_cachefile;
255 uint32_t hash = toku_cachetable_hash(cf, name);
256 FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
257 bool doing_prefetch = false;
258 r = toku_cachefile_prefetch(cf, name, hash,
259 get_write_callbacks_for_rollback_log(h),
260 toku_rollback_fetch_callback,
261 toku_rollback_pf_req_callback,
262 toku_rollback_pf_callback,
263 h,
264 &doing_prefetch);
265 assert(r == 0);
266 }
267 }
268
toku_rollback_verify_contents(ROLLBACK_LOG_NODE log,TXNID_PAIR txnid,uint64_t sequence)269 void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log,
270 TXNID_PAIR txnid, uint64_t sequence)
271 {
272 assert(log->txnid.parent_id64 == txnid.parent_id64);
273 assert(log->txnid.child_id64 == txnid.child_id64);
274 assert(log->sequence == sequence);
275 }
276
toku_get_and_pin_rollback_log(TOKUTXN txn,BLOCKNUM blocknum,ROLLBACK_LOG_NODE * log)277 void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log) {
278 void * value;
279 CACHEFILE cf = txn->logger->rollback_cachefile;
280 FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
281 uint32_t hash = toku_cachetable_hash(cf, blocknum);
282 int r = toku_cachetable_get_and_pin_with_dep_pairs(cf, blocknum, hash,
283 &value,
284 get_write_callbacks_for_rollback_log(h),
285 toku_rollback_fetch_callback,
286 toku_rollback_pf_req_callback,
287 toku_rollback_pf_callback,
288 PL_WRITE_CHEAP, // lock_type
289 h,
290 0, NULL, NULL
291 );
292 assert(r == 0);
293 ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
294 assert(pinned_log->blocknum.b == blocknum.b);
295 *log = pinned_log;
296 }
297
toku_get_and_pin_rollback_log_for_new_entry(TOKUTXN txn,ROLLBACK_LOG_NODE * log)298 void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) {
299 ROLLBACK_LOG_NODE pinned_log = NULL;
300 invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions
301 if (txn_has_current_rollback_log(txn)) {
302 toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &pinned_log);
303 toku_rollback_verify_contents(pinned_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1);
304 } else {
305 // For each transaction, we try to acquire the first rollback log
306 // from the rollback log node cache, so that we avoid
307 // putting something new into the cachetable. However,
308 // if transaction has spilled rollbacks, that means we
309 // have already done a lot of work for this transaction,
310 // and subsequent rollback log nodes are created
311 // and put into the cachetable. The idea is for
312 // transactions that don't do a lot of work to (hopefully)
313 // get a rollback log node from a cache, as opposed to
314 // taking the more expensive route of creating a new one.
315 if (!txn_has_spilled_rollback_logs(txn)) {
316 txn->logger->rollback_cache.get_rollback_log_node(txn, &pinned_log);
317 if (pinned_log != NULL) {
318 rollback_initialize_for_txn(
319 pinned_log,
320 txn,
321 txn->roll_info.spilled_rollback_tail
322 );
323 txn->roll_info.current_rollback = pinned_log->blocknum;
324 }
325 }
326 if (pinned_log == NULL) {
327 rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, &pinned_log);
328 }
329 }
330 assert(pinned_log->txnid.parent_id64 == txn->txnid.parent_id64);
331 assert(pinned_log->txnid.child_id64 == txn->txnid.child_id64);
332 assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
333 *log = pinned_log;
334 }
335