/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.

PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.

PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "ft/cachetable/checkpoint.h"
#include "ft/ft.h"
#include "ft/logger/log-internal.h"
#include "ft/ule.h"
#include "ft/txn/rollback-apply.h"
#include "ft/txn/txn.h"
#include "ft/txn/txn_manager.h"
#include "util/status.h"

toku_instr_key *txn_lock_mutex_key;
toku_instr_key *txn_state_lock_mutex_key;
toku_instr_key *result_state_cond_key;
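
// Snapshot the engine-wide transaction status counters into the
// caller-provided structure, initializing them first if needed.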
void toku_txn_get_status(TXN_STATUS s) {
    txn_status.init();
    *s = txn_status;
}

void
toku_txn_lock(TOKUTXN txn)
{
    toku_mutex_lock(&txn->txn_lock);
}

void
toku_txn_unlock(TOKUTXN txn)
{
    toku_mutex_unlock(&txn->txn_lock);
}

uint64_t
toku_txn_get_root_id(TOKUTXN txn)
{
    return txn->txnid.parent_id64;
}

bool txn_declared_read_only(TOKUTXN txn) {
    return txn->declared_read_only;
}

int
toku_txn_begin_txn (
    DB_TXN *container_db_txn,
    TOKUTXN parent_tokutxn,
    TOKUTXN *tokutxn,
    TOKULOGGER logger,
    TXN_SNAPSHOT_TYPE snapshot_type,
    bool read_only
    )
{
    int r = toku_txn_begin_with_xid(
        parent_tokutxn,
        tokutxn,
        logger,
        TXNID_PAIR_NONE,
        snapshot_type,
        container_db_txn,
        false, // for_recovery
        read_only
        );
    return r;
}
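
// Illustrative lifecycle sketch (not taken from the real callers in the ydb
// layer; error handling, locking, and the DB_TXN/container plumbing are
// assumed to be set up elsewhere, and TXN_SNAPSHOT_ROOT is assumed to be the
// desired snapshot type):
//
//     TOKUTXN txn = nullptr;
//     int r = toku_txn_begin_txn(container_db_txn, nullptr /* parent */, &txn,
//                                logger, TXN_SNAPSHOT_ROOT, false /* read_only */);
//     if (r == 0) {
//         // ... do work under txn ...
//         r = toku_txn_commit_txn(txn, false /* nosync */, nullptr, nullptr);
//         bool do_fsync; LSN fsync_lsn;
//         toku_txn_get_fsync_info(txn, &do_fsync, &fsync_lsn);
//         toku_txn_close_txn(txn);
//         toku_txn_maybe_fsync_log(logger, fsync_lsn, do_fsync);
//     }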

static void
txn_create_xids(TOKUTXN txn, TOKUTXN parent) {
    XIDS xids;
    XIDS parent_xids;
    if (parent == NULL) {
        parent_xids = toku_xids_get_root_xids();
    } else {
        parent_xids = parent->xids;
    }
    toku_xids_create_unknown_child(parent_xids, &xids);
    TXNID finalized_xid = (parent == NULL) ? txn->txnid.parent_id64 : txn->txnid.child_id64;
    toku_xids_finalize_with_child(xids, finalized_xid);
    txn->xids = xids;
}

// Allocate and initialize a txn
static void toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn, bool for_recovery, bool read_only);

int
toku_txn_begin_with_xid (
    TOKUTXN parent,
    TOKUTXN *txnp,
    TOKULOGGER logger,
    TXNID_PAIR xid,
    TXN_SNAPSHOT_TYPE snapshot_type,
    DB_TXN *container_db_txn,
    bool for_recovery,
    bool read_only
    )
{
    int r = 0;
    TOKUTXN txn;
    // check for the case where we are trying to
    // create too many nested transactions
    if (!read_only && parent && !toku_xids_can_create_child(parent->xids)) {
        r = EINVAL;
        goto exit;
    }
    if (read_only && parent) {
        invariant(txn_declared_read_only(parent));
    }
    toku_txn_create_txn(&txn, parent, logger, snapshot_type, container_db_txn, for_recovery, read_only);
    // txnid64 and snapshot_txnid64 will be set in here.
    if (for_recovery) {
        if (parent == NULL) {
            invariant(xid.child_id64 == TXNID_NONE);
            toku_txn_manager_start_txn_for_recovery(
                txn,
                logger->txn_manager,
                xid.parent_id64
                );
        }
        else {
            parent->child_manager->start_child_txn_for_recovery(txn, parent, xid);
        }
    }
    else {
        assert(xid.parent_id64 == TXNID_NONE);
        assert(xid.child_id64 == TXNID_NONE);
        if (parent == NULL) {
            toku_txn_manager_start_txn(
                txn,
                logger->txn_manager,
                snapshot_type,
                read_only
                );
        }
        else {
            parent->child_manager->start_child_txn(txn, parent);
            toku_txn_manager_handle_snapshot_create_for_child_txn(
                txn,
                logger->txn_manager,
                snapshot_type
                );
        }
    }
    if (!read_only) {
        // this call will set txn->xids
        txn_create_xids(txn, parent);
    }
    toku_unsafe_set(txnp, txn);
exit:
    return r;
}

DB_TXN *
toku_txn_get_container_db_txn (TOKUTXN tokutxn) {
    DB_TXN *container = tokutxn->container_db_txn;
    return container;
}

void toku_txn_set_container_db_txn (TOKUTXN tokutxn, DB_TXN *container) {
    tokutxn->container_db_txn = container;
}

static void invalidate_xa_xid (TOKU_XA_XID *xid) {
    TOKU_ANNOTATE_NEW_MEMORY(xid, sizeof(*xid)); // consider it to be all invalid for valgrind
    xid->formatID = -1; // According to the XA spec, -1 means "invalid data"
}

static void toku_txn_create_txn (
    TOKUTXN *tokutxn,
    TOKUTXN parent_tokutxn,
    TOKULOGGER logger,
    TXN_SNAPSHOT_TYPE snapshot_type,
    DB_TXN *container_db_txn,
    bool for_recovery,
    bool read_only
    )
{
    assert(logger->rollback_cachefile);

    omt<FT> open_fts;
    open_fts.create_no_array();

    struct txn_roll_info roll_info = {
        .num_rollback_nodes = 0,
        .num_rollentries = 0,
        .num_rollentries_processed = 0,
        .rollentry_raw_count = 0,
        .spilled_rollback_head = ROLLBACK_NONE,
        .spilled_rollback_tail = ROLLBACK_NONE,
        .current_rollback = ROLLBACK_NONE,
    };

    static txn_child_manager tcm;

    struct tokutxn new_txn = {
        .txnid = {.parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE },
        .snapshot_txnid64 = TXNID_NONE,
        .snapshot_type = for_recovery ? TXN_SNAPSHOT_NONE : snapshot_type,
        .for_recovery = for_recovery,
        .logger = logger,
        .parent = parent_tokutxn,
        .child = NULL,
        .child_manager_s = tcm,
        .child_manager = NULL,
        .container_db_txn = container_db_txn,
        .live_root_txn_list = nullptr,
        .xids = NULL,
        .snapshot_next = NULL,
        .snapshot_prev = NULL,
        .begin_was_logged = false,
        .declared_read_only = read_only,
        .do_fsync = false,
        .force_fsync_on_commit = false,
        .do_fsync_lsn = ZERO_LSN,
        .xa_xid = {0, 0, 0, ""},
        .progress_poll_fun = NULL,
        .progress_poll_fun_extra = NULL,

        // You cannot initialize txn_lock with TOKU_MUTEX_INITIALIZER, because we
        // will initialize it in the code below, and it cannot already
        // be initialized at that point. Also, in general, you don't
        // get to use PTHREAD_MUTEX_INITIALIZER (which is what is inside
        // TOKU_MUTEX_INITIALIZER) except in static variables, and this
        // is initializing an auto variable.
        //
        // And we cannot simply avoid initializing these fields
        // because, although it avoids -Wmissing-field-initializers
        // errors under gcc, it gets other errors about non-trivial
        // designated initializers not being supported.

        .txn_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER
        .open_fts = open_fts,
        .roll_info = roll_info,
        .state_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER
        .state_cond = ZERO_COND_INITIALIZER, // Not TOKU_COND_INITIALIZER
        .state = TOKUTXN_LIVE,
        .num_pin = 0,
        .client_id = 0,
        .client_extra = nullptr,
        .start_time = time(NULL),
    };

    TOKUTXN result = NULL;
    XMEMDUP(result, &new_txn);
    invalidate_xa_xid(&result->xa_xid);
    if (parent_tokutxn == NULL) {
        result->child_manager = &result->child_manager_s;
        result->child_manager->init(result);
    }
    else {
        result->child_manager = parent_tokutxn->child_manager;
    }

    toku_mutex_init(*txn_lock_mutex_key, &result->txn_lock, nullptr);

    toku_pthread_mutexattr_t attr;
    toku_mutexattr_init(&attr);
    toku_mutexattr_settype(&attr, TOKU_MUTEX_ADAPTIVE);
    toku_mutex_init(*txn_state_lock_mutex_key, &result->state_lock, &attr);
    toku_mutexattr_destroy(&attr);

    toku_cond_init(*result_state_cond_key, &result->state_cond, nullptr);

    *tokutxn = result;

    if (read_only) {
        TXN_STATUS_INC(TXN_READ_BEGIN, 1);
    }
    else {
        TXN_STATUS_INC(TXN_BEGIN, 1);
    }
}

void
toku_txn_update_xids_in_txn(TOKUTXN txn, TXNID xid)
{
    // these should not have been set yet
    invariant(txn->txnid.parent_id64 == TXNID_NONE);
    invariant(txn->txnid.child_id64 == TXNID_NONE);
    txn->txnid.parent_id64 = xid;
    txn->txnid.child_id64 = TXNID_NONE;
}

// Used during recovery to restore a transaction's state from a txninfo record.
int
toku_txn_load_txninfo (TOKUTXN txn, struct txninfo *info) {
    txn->roll_info.rollentry_raw_count = info->rollentry_raw_count;
    uint32_t i;
    for (i = 0; i < info->num_fts; i++) {
        FT ft = info->open_fts[i];
        toku_txn_maybe_note_ft(txn, ft);
    }
    txn->force_fsync_on_commit = info->force_fsync_on_commit;
    txn->roll_info.num_rollback_nodes = info->num_rollback_nodes;
    txn->roll_info.num_rollentries = info->num_rollentries;

    txn->roll_info.spilled_rollback_head = info->spilled_rollback_head;
    txn->roll_info.spilled_rollback_tail = info->spilled_rollback_tail;
    txn->roll_info.current_rollback = info->current_rollback;
    return 0;
}

int toku_txn_commit_txn(TOKUTXN txn, int nosync,
                        TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Doesn't close the txn, just performs the commit operations.
{
    return toku_txn_commit_with_lsn(txn, nosync, ZERO_LSN,
                                    poll, poll_extra);
}

struct xcommit_info {
    int r;
    TOKUTXN txn;
};

static void txn_note_commit(TOKUTXN txn) {
    // Purpose:
    //  Delay until any indexer is done pinning this transaction.
    //  Update the status of the transaction from live->committing (or prepared->committing).
    //  Do so in a thread-safe manner that does not conflict with hot indexing or
    //  begin checkpoint.
    if (toku_txn_is_read_only(txn)) {
        // Neither hot indexing nor checkpoint do any work with readonly txns,
        // so we can skip taking the txn_manager lock here.
        invariant(txn->state == TOKUTXN_LIVE);
        txn->state = TOKUTXN_COMMITTING;
        goto done;
    }
    if (txn->state == TOKUTXN_PREPARING) {
        invalidate_xa_xid(&txn->xa_xid);
    }
    // For hot indexing: if the hot indexer is processing this transaction in
    // some leafentry, then we cannot change the state to committing or
    // aborting until the indexer is done with that leafentry.
    toku_txn_lock_state(txn);
    while (txn->num_pin > 0) {
        toku_cond_wait(
            &txn->state_cond,
            &txn->state_lock
            );
    }
    txn->state = TOKUTXN_COMMITTING;
    toku_txn_unlock_state(txn);
done:
    return;
}

int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
                             TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
{
    // there should be no child when we commit or abort a TOKUTXN
    invariant(txn->child == NULL);
    txn_note_commit(txn);

    // Child transactions do not actually 'commit'. They promote their
    // changes to the parent, so there is no need to fsync if this txn has a
    // parent. The do_fsync state is captured in the txn for the
    // toku_txn_maybe_fsync_log function.
    // Additionally, if the transaction was first prepared, we do not need to
    // fsync because the prepare already caused an fsync of the log, so no
    // additional fsync is needed here. We rely on the client running
    // recovery to properly recommit this transaction if the commit
    // does not make it to disk. In the case of MySQL, that would be the
    // binary log.
    txn->do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries > 0));

    txn->progress_poll_fun = poll;
    txn->progress_poll_fun_extra = poll_extra;

    if (!toku_txn_is_read_only(txn)) {
        toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid);
    }
    // If !txn->begin_was_logged, we could skip toku_rollback_commit,
    // but it's cheap (only a handful of function calls that return immediately)
    // since there were no writes. Skipping it would mean we would need to be careful
    // in case any additional required cleanup were added to those functions in the future.
    int r = toku_rollback_commit(txn, oplsn);
    TXN_STATUS_INC(TXN_COMMIT, 1);
    return r;
}

int toku_txn_abort_txn(TOKUTXN txn,
                       TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Doesn't close the txn, just performs the abort operations.
{
    return toku_txn_abort_with_lsn(txn, ZERO_LSN, poll, poll_extra);
}

static void txn_note_abort(TOKUTXN txn) {
    // Purpose:
    //  Delay until any indexer is done pinning this transaction.
    //  Update the status of the transaction from live->aborting (or prepared->aborting).
    //  Do so in a thread-safe manner that does not conflict with hot indexing or
    //  begin checkpoint.
    if (toku_txn_is_read_only(txn)) {
        // Neither hot indexing nor checkpoint do any work with readonly txns,
        // so we can skip taking the state lock here.
        invariant(txn->state == TOKUTXN_LIVE);
        txn->state = TOKUTXN_ABORTING;
        goto done;
    }
    if (txn->state == TOKUTXN_PREPARING) {
        invalidate_xa_xid(&txn->xa_xid);
    }
    // For hot indexing: if the hot indexer is processing this transaction in
    // some leafentry, then we cannot change the state to committing or
    // aborting until the indexer is done with that leafentry.
    toku_txn_lock_state(txn);
    while (txn->num_pin > 0) {
        toku_cond_wait(
            &txn->state_cond,
            &txn->state_lock
            );
    }
    txn->state = TOKUTXN_ABORTING;
    toku_txn_unlock_state(txn);
done:
    return;
}

int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn,
                            TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
{
    // there should be no child when we commit or abort a TOKUTXN
    invariant(txn->child == NULL);
    txn_note_abort(txn);

    txn->progress_poll_fun = poll;
    txn->progress_poll_fun_extra = poll_extra;
    txn->do_fsync = false;

    if (!toku_txn_is_read_only(txn)) {
        toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid);
    }
    // If !txn->begin_was_logged, we could skip toku_rollback_abort,
    // but it's cheap (only a handful of function calls that return immediately)
    // since there were no writes. Skipping it would mean we would need to be careful
    // in case any additional required cleanup were added to those functions in the future.
    int r = toku_rollback_abort(txn, oplsn);
    TXN_STATUS_INC(TXN_ABORT, 1);
    return r;
}

static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) {
    TOKU_ANNOTATE_NEW_MEMORY(dest, sizeof(*dest));
    dest->formatID = source->formatID;
    dest->gtrid_length = source->gtrid_length;
    dest->bqual_length = source->bqual_length;
    memcpy(dest->data, source->data, source->gtrid_length + source->bqual_length);
}

void toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid, int nosync) {
    if (txn->parent || toku_txn_is_read_only(txn)) {
        // We do not prepare children.
        //
        // Read-only transactions look the same whether they commit or abort,
        // so the XA guarantees come for free; there is no need to pay the
        // overhead of prepare.
        return;
    }
    assert(txn->state == TOKUTXN_LIVE);
    // This state transition must be protected against begin_checkpoint.
    // Therefore, the caller must hold the multi-operation (mo) lock.
    toku_txn_lock_state(txn);
    txn->state = TOKUTXN_PREPARING;
    toku_txn_unlock_state(txn);
    // Do we need to do an fsync?
    txn->do_fsync = txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries > 0);
    copy_xid(&txn->xa_xid, xa_xid);
    // This list will go away with #4683, so we won't need the ydb lock for this anymore.
    toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid, xa_xid);
}
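
// Two-phase commit sketch (illustration only -- the real driver lives in the
// ydb/XA layer and handles locking and error paths; the xid contents below are
// made up for the example):
//
//     TOKU_XA_XID xid;
//     xid.formatID = 1;
//     xid.gtrid_length = 8;
//     xid.bqual_length = 0;
//     memcpy(xid.data, "gtrid-01", 8);
//     toku_txn_prepare_txn(txn, &xid, false /* nosync */);
//     toku_txn_maybe_fsync_log(txn->logger, txn->do_fsync_lsn, txn->do_fsync);
//     // ... later, once the coordinator decides ...
//     toku_txn_commit_txn(txn, false, nullptr, nullptr);  // or toku_txn_abort_txn(txn, nullptr, nullptr)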

void toku_txn_get_prepared_xa_xid (TOKUTXN txn, TOKU_XA_XID *xid) {
    copy_xid(xid, &txn->xa_xid);
}

int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
    return toku_txn_manager_recover_root_txn(
        logger->txn_manager,
        preplist,
        count,
        retp,
        flags
        );
}

void toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, bool do_fsync) {
    if (logger && do_fsync) {
        toku_logger_fsync_if_lsn_not_fsynced(logger, do_fsync_lsn);
    }
}

void toku_txn_get_fsync_info(TOKUTXN ttxn, bool *do_fsync, LSN *do_fsync_lsn) {
    *do_fsync = ttxn->do_fsync;
    *do_fsync_lsn = ttxn->do_fsync_lsn;
}

void toku_txn_close_txn(TOKUTXN txn) {
    toku_txn_complete_txn(txn);
    toku_txn_destroy_txn(txn);
}

int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn);
int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const UU(txn))
// Effect: This function is called on every open FT that a transaction used.
//  This function removes the transaction from that FT.
{
    toku_ft_remove_txn_ref(h);

    return 0;
}

// for every ft in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
    txn->open_fts.iterate<struct tokutxn, remove_txn>(txn);
}

void toku_txn_complete_txn(TOKUTXN txn) {
    assert(txn->roll_info.spilled_rollback_head.b == ROLLBACK_NONE.b);
    assert(txn->roll_info.spilled_rollback_tail.b == ROLLBACK_NONE.b);
    assert(txn->roll_info.current_rollback.b == ROLLBACK_NONE.b);
    assert(txn->num_pin == 0);
    assert(txn->state == TOKUTXN_COMMITTING || txn->state == TOKUTXN_ABORTING || txn->state == TOKUTXN_PREPARING);
    if (txn->parent) {
        toku_txn_manager_handle_snapshot_destroy_for_child_txn(
            txn,
            txn->logger->txn_manager,
            txn->snapshot_type
            );
        txn->parent->child_manager->finish_child_txn(txn);
    }
    else {
        toku_txn_manager_finish_txn(txn->logger->txn_manager, txn);
        txn->child_manager->destroy();
    }
    // note that here is another place we depend on
    // this function being called with the multi operation lock
    note_txn_closing(txn);
}

void toku_txn_destroy_txn(TOKUTXN txn) {
    txn->open_fts.destroy();
    if (txn->xids) {
        toku_xids_destroy(&txn->xids);
    }
    toku_mutex_destroy(&txn->txn_lock);
    toku_mutex_destroy(&txn->state_lock);
    toku_cond_destroy(&txn->state_cond);
    toku_free(txn);
}

XIDS toku_txn_get_xids (TOKUTXN txn) {
    if (txn == 0) return toku_xids_get_root_xids();
    else return txn->xids;
}

void toku_txn_force_fsync_on_commit(TOKUTXN txn) {
    txn->force_fsync_on_commit = true;
}

TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) {
    TXNID xid;
    if (txn->live_root_txn_list->size() > 0) {
        int r = txn->live_root_txn_list->fetch(0, &xid);
        assert_zero(r);
    }
    else {
        xid = TXNID_NONE;
    }
    return xid;
}

bool toku_is_txn_in_live_root_txn_list(const xid_omt_t &live_root_txn_list, TXNID xid) {
    TXNID txnid;
    bool retval = false;
    int r = live_root_txn_list.find_zero<TXNID, toku_find_xid_by_xid>(xid, &txnid, nullptr);
    if (r == 0) {
        invariant(txnid == xid);
        retval = true;
    }
    else {
        invariant(r == DB_NOTFOUND);
    }
    return retval;
}

TOKUTXN_STATE
toku_txn_get_state(TOKUTXN txn) {
    return txn->state;
}
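
// Lazily log the xbegin record for this transaction (and, recursively, for any
// not-yet-logged ancestors) the first time it performs a write operation.
// Must be called with txn->txn_lock held; toku_maybe_log_begin_txn_for_write_operation
// below is the locking wrapper.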
static void
maybe_log_begin_txn_for_write_operation_unlocked(TOKUTXN txn) {
    // We now hold the lock.
    if (txn->begin_was_logged) {
        return;
    }
    TOKUTXN parent;
    parent = txn->parent;
    TXNID_PAIR xid;
    xid = txn->txnid;
    TXNID_PAIR pxid;
    pxid = TXNID_PAIR_NONE;
    if (parent) {
        // Recursively log the parent first if necessary.
        // Transactions cannot do work if they have children,
        // so the lowest-level child's lock is sufficient for ancestors.
        maybe_log_begin_txn_for_write_operation_unlocked(parent);
        pxid = parent->txnid;
    }

    toku_log_xbegin(txn->logger, NULL, 0, xid, pxid);
    txn->begin_was_logged = true;
}

void
toku_maybe_log_begin_txn_for_write_operation(TOKUTXN txn) {
    toku_txn_lock(txn);
    maybe_log_begin_txn_for_write_operation_unlocked(txn);
    toku_txn_unlock(txn);
}

bool
toku_txn_is_read_only(TOKUTXN txn) {
    // No need to recursively check children because parents are
    // recursively logged before children.
    if (!txn->begin_was_logged) {
        // Did no work.
        invariant(txn->roll_info.num_rollentries == 0);
        invariant(txn->do_fsync_lsn.lsn == ZERO_LSN.lsn);
        invariant(txn->open_fts.size() == 0);
        invariant(txn->num_pin == 0);
        return true;
    }
    return false;
}

// needed for hot indexing
void toku_txn_lock_state(TOKUTXN txn) {
    toku_mutex_lock(&txn->state_lock);
}
void toku_txn_unlock_state(TOKUTXN txn) {
    toku_mutex_unlock(&txn->state_lock);
}


// prevents a client thread from transitioning txn from LIVE|PREPARING -> COMMITTING|ABORTING
// hot indexing may need a transaction to stay in the LIVE|PREPARING state while it processes
// a leafentry.
void toku_txn_pin_live_txn_unlocked(TOKUTXN txn) {
    assert(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING);
    assert(!toku_txn_is_read_only(txn));
    txn->num_pin++;
}

// allows a client thread to go back to being able to transition txn
// from LIVE|PREPARING -> COMMITTING|ABORTING
void toku_txn_unpin_live_txn(TOKUTXN txn) {
    assert(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING);
    assert(txn->num_pin > 0);
    toku_txn_lock_state(txn);
    txn->num_pin--;
    if (txn->num_pin == 0) {
        toku_cond_broadcast(&txn->state_cond);
    }
    toku_txn_unlock_state(txn);
}

bool toku_txn_has_spilled_rollback(TOKUTXN txn) {
    return txn_has_spilled_rollback_logs(txn);
}

void toku_txn_get_client_id(TOKUTXN txn, uint64_t *client_id, void **client_extra) {
    if (client_id) *client_id = txn->client_id;
    if (client_extra) *client_extra = txn->client_extra;
}

void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id, void *client_extra) {
    txn->client_id = client_id;
    txn->client_extra = client_extra;
}

time_t toku_txn_get_start_time(struct tokutxn *txn) {
    return txn->start_time;
}

extern uint force_recovery;
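
// Decide whether a value written by transaction `txnid` is visible to `txn`'s
// snapshot.  Returns TOKUDB_ACCEPT if it is (the writer committed before the
// snapshot was taken, or is this txn's own root), and 0 if it is not (the
// writer is newer than the snapshot, or was still live when the snapshot was
// taken).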
int toku_txn_reads_txnid(TXNID txnid, TOKUTXN txn, bool is_provisional UU()) {
    if (force_recovery) {
        return TOKUDB_ACCEPT;
    }
    int r = 0;
    TXNID oldest_live_in_snapshot = toku_get_oldest_in_live_root_txn_list(txn);
    if (oldest_live_in_snapshot == TXNID_NONE && txnid < txn->snapshot_txnid64) {
        r = TOKUDB_ACCEPT;
    } else if (txnid < oldest_live_in_snapshot || txnid == txn->txnid.parent_id64) {
        r = TOKUDB_ACCEPT;
    } else if (txnid > txn->snapshot_txnid64 || toku_is_txn_in_live_root_txn_list(*txn->live_root_txn_list, txnid)) {
        r = 0;
    } else {
        r = TOKUDB_ACCEPT;
    }
    return r;
}

int toku_txn_discard_txn(TOKUTXN txn) {
    int r = toku_rollback_discard(txn);
    return r;
}

#include <toku_race_tools.h>
void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void);
void toku_txn_status_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&txn_status, sizeof txn_status);
}