/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "ft/cachetable/checkpoint.h"
#include "ft/ft.h"
#include "ft/logger/log-internal.h"
#include "ft/ule.h"
#include "ft/txn/rollback-apply.h"
#include "ft/txn/txn.h"
#include "ft/txn/txn_manager.h"
#include "util/status.h"

toku_instr_key *txn_lock_mutex_key;
toku_instr_key *txn_state_lock_mutex_key;
toku_instr_key *result_state_cond_key;

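// Copy the global transaction status counters (the ones bumped via
// TXN_STATUS_INC below) into the caller-supplied struct.  Rough usage sketch,
// assuming TXN_STATUS_S is the struct type behind the TXN_STATUS pointer
// typedef:
//
//     TXN_STATUS_S status;
//     toku_txn_get_status(&status);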
void toku_txn_get_status(TXN_STATUS s) {
    txn_status.init();
    *s = txn_status;
}

void
toku_txn_lock(TOKUTXN txn)
{
    toku_mutex_lock(&txn->txn_lock);
}

void
toku_txn_unlock(TOKUTXN txn)
{
    toku_mutex_unlock(&txn->txn_lock);
}

uint64_t
toku_txn_get_root_id(TOKUTXN txn)
{
    return txn->txnid.parent_id64;
}

bool txn_declared_read_only(TOKUTXN txn) {
    return txn->declared_read_only;
}

int
toku_txn_begin_txn (
    DB_TXN  *container_db_txn,
    TOKUTXN parent_tokutxn,
    TOKUTXN *tokutxn,
    TOKULOGGER logger,
    TXN_SNAPSHOT_TYPE snapshot_type,
    bool read_only
    )
{
    int r = toku_txn_begin_with_xid(
        parent_tokutxn,
        tokutxn,
        logger,
        TXNID_PAIR_NONE,
        snapshot_type,
        container_db_txn,
        false, // for_recovery
        read_only
        );
    return r;
}


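// Build the nested-transaction XIDS stack for txn: start from the parent's
// stack (or the root stack when there is no parent) and append this txn's
// own id.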
static void
txn_create_xids(TOKUTXN txn, TOKUTXN parent) {
    XIDS xids;
    XIDS parent_xids;
    if (parent == NULL) {
        parent_xids = toku_xids_get_root_xids();
    } else {
        parent_xids = parent->xids;
    }
    toku_xids_create_unknown_child(parent_xids, &xids);
    TXNID finalized_xid = (parent == NULL) ? txn->txnid.parent_id64 : txn->txnid.child_id64;
    toku_xids_finalize_with_child(xids, finalized_xid);
    txn->xids = xids;
}

// Allocate and initialize a txn
static void toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn, bool for_recovery, bool read_only);

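// Begin a transaction, optionally as a child of `parent`.  An explicit xid is
// only meaningful during recovery; otherwise it must be TXNID_PAIR_NONE, as
// the asserts below enforce.  A rough sketch of beginning a root txn outside
// of recovery (the snapshot type value is an illustrative choice, and logger
// is assumed to be an already-open TOKULOGGER):
//
//     TOKUTXN txn = nullptr;
//     int r = toku_txn_begin_with_xid(nullptr, &txn, logger, TXNID_PAIR_NONE,
//                                     TXN_SNAPSHOT_ROOT, nullptr,
//                                     false /*for_recovery*/,
//                                     false /*read_only*/);
//     assert(r == 0);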
int
toku_txn_begin_with_xid (
    TOKUTXN parent,
    TOKUTXN *txnp,
    TOKULOGGER logger,
    TXNID_PAIR xid,
    TXN_SNAPSHOT_TYPE snapshot_type,
    DB_TXN *container_db_txn,
    bool for_recovery,
    bool read_only
    )
{
    int r = 0;
    TOKUTXN txn;
    // check for case where we are trying to
    // create too many nested transactions
    if (!read_only && parent && !toku_xids_can_create_child(parent->xids)) {
        r = EINVAL;
        goto exit;
    }
    if (read_only && parent) {
        invariant(txn_declared_read_only(parent));
    }
    toku_txn_create_txn(&txn, parent, logger, snapshot_type, container_db_txn, for_recovery, read_only);
    // txnid64, snapshot_txnid64
    // will be set in here.
    if (for_recovery) {
        if (parent == NULL) {
            invariant(xid.child_id64 == TXNID_NONE);
            toku_txn_manager_start_txn_for_recovery(
                txn,
                logger->txn_manager,
                xid.parent_id64
                );
        }
        else {
            parent->child_manager->start_child_txn_for_recovery(txn, parent, xid);
        }
    }
    else {
        assert(xid.parent_id64 == TXNID_NONE);
        assert(xid.child_id64 == TXNID_NONE);
        if (parent == NULL) {
            toku_txn_manager_start_txn(
                txn,
                logger->txn_manager,
                snapshot_type,
                read_only
                );
        }
        else {
            parent->child_manager->start_child_txn(txn, parent);
            toku_txn_manager_handle_snapshot_create_for_child_txn(
                txn,
                logger->txn_manager,
                snapshot_type
                );
        }
    }
    if (!read_only) {
        // this call will set txn->xids
        txn_create_xids(txn, parent);
    }
    toku_unsafe_set(txnp, txn);
exit:
    return r;
}

DB_TXN *
toku_txn_get_container_db_txn (TOKUTXN tokutxn) {
    DB_TXN * container = tokutxn->container_db_txn;
    return container;
}

void toku_txn_set_container_db_txn (TOKUTXN tokutxn, DB_TXN *container) {
    tokutxn->container_db_txn = container;
}

static void invalidate_xa_xid (TOKU_XA_XID *xid) {
    TOKU_ANNOTATE_NEW_MEMORY(xid, sizeof(*xid)); // consider it to be all invalid for valgrind
    xid->formatID = -1; // According to the XA spec, -1 means "invalid data"
}

static void toku_txn_create_txn (
    TOKUTXN *tokutxn,
    TOKUTXN parent_tokutxn,
    TOKULOGGER logger,
    TXN_SNAPSHOT_TYPE snapshot_type,
    DB_TXN *container_db_txn,
    bool for_recovery,
    bool read_only
    )
{
    assert(logger->rollback_cachefile);

    omt<FT> open_fts;
    open_fts.create_no_array();

    struct txn_roll_info roll_info = {
        .num_rollback_nodes = 0,
        .num_rollentries = 0,
        .num_rollentries_processed = 0,
        .rollentry_raw_count = 0,
        .spilled_rollback_head = ROLLBACK_NONE,
        .spilled_rollback_tail = ROLLBACK_NONE,
        .current_rollback = ROLLBACK_NONE,
    };

    static txn_child_manager tcm;

    struct tokutxn new_txn = {
        .txnid = {.parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE },
        .snapshot_txnid64 = TXNID_NONE,
        .snapshot_type = for_recovery ? TXN_SNAPSHOT_NONE : snapshot_type,
        .for_recovery = for_recovery,
        .logger = logger,
        .parent = parent_tokutxn,
        .child = NULL,
        .child_manager_s = tcm,
        .child_manager = NULL,
        .container_db_txn = container_db_txn,
        .live_root_txn_list = nullptr,
        .xids = NULL,
        .snapshot_next = NULL,
        .snapshot_prev = NULL,
        .begin_was_logged = false,
        .declared_read_only = read_only,
        .do_fsync = false,
        .force_fsync_on_commit = false,
        .do_fsync_lsn = ZERO_LSN,
        .xa_xid = {0, 0, 0, ""},
        .progress_poll_fun = NULL,
        .progress_poll_fun_extra = NULL,

        // You cannot initialize txn_lock with TOKU_MUTEX_INITIALIZER, because
        // we will initialize it in the code below, and it cannot already
        // be initialized at that point.  Also, in general, you don't
        // get to use PTHREAD_MUTEX_INITIALIZER (which is what is inside
        // TOKU_MUTEX_INITIALIZER) except in static variables, and this
        // is initializing an auto variable.
        //
        // And we cannot simply avoid initializing these fields
        // because, although that avoids -Wmissing-field-initializer
        // errors under gcc, it triggers other errors about non-trivial
        // designated initializers not being supported.

        .txn_lock = ZERO_MUTEX_INITIALIZER,   // Not TOKU_MUTEX_INITIALIZER
        .open_fts = open_fts,
        .roll_info = roll_info,
        .state_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER
        .state_cond = ZERO_COND_INITIALIZER,  // Not TOKU_COND_INITIALIZER
        .state = TOKUTXN_LIVE,
        .num_pin = 0,
        .client_id = 0,
        .client_extra = nullptr,
        .start_time = time(NULL),
    };

    TOKUTXN result = NULL;
    XMEMDUP(result, &new_txn);
    invalidate_xa_xid(&result->xa_xid);
    if (parent_tokutxn == NULL) {
        result->child_manager = &result->child_manager_s;
        result->child_manager->init(result);
    } else {
        result->child_manager = parent_tokutxn->child_manager;
    }

    toku_mutex_init(*txn_lock_mutex_key, &result->txn_lock, nullptr);

    toku_pthread_mutexattr_t attr;
    toku_mutexattr_init(&attr);
    toku_mutexattr_settype(&attr, TOKU_MUTEX_ADAPTIVE);
    toku_mutex_init(*txn_state_lock_mutex_key, &result->state_lock, &attr);
    toku_mutexattr_destroy(&attr);

    toku_cond_init(*result_state_cond_key, &result->state_cond, nullptr);

    *tokutxn = result;

    if (read_only) {
        TXN_STATUS_INC(TXN_READ_BEGIN, 1);
    }
    else {
        TXN_STATUS_INC(TXN_BEGIN, 1);
    }
}

void
toku_txn_update_xids_in_txn(TOKUTXN txn, TXNID xid)
{
    // these should not have been set yet
    invariant(txn->txnid.parent_id64 == TXNID_NONE);
    invariant(txn->txnid.child_id64 == TXNID_NONE);
    txn->txnid.parent_id64 = xid;
    txn->txnid.child_id64 = TXNID_NONE;
}

// Used during recovery to restore a transaction's state.
int
toku_txn_load_txninfo (TOKUTXN txn, struct txninfo *info) {
    txn->roll_info.rollentry_raw_count = info->rollentry_raw_count;
    uint32_t i;
    for (i = 0; i < info->num_fts; i++) {
        FT ft = info->open_fts[i];
        toku_txn_maybe_note_ft(txn, ft);
    }
    txn->force_fsync_on_commit = info->force_fsync_on_commit;
    txn->roll_info.num_rollback_nodes = info->num_rollback_nodes;
    txn->roll_info.num_rollentries = info->num_rollentries;

    txn->roll_info.spilled_rollback_head = info->spilled_rollback_head;
    txn->roll_info.spilled_rollback_tail = info->spilled_rollback_tail;
    txn->roll_info.current_rollback = info->current_rollback;
    return 0;
}

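// A rough sketch of the commit sequence as driven by the layer above (the
// exact call sites are an assumption, not something this file defines):
//
//     int r = toku_txn_commit_txn(txn, nosync, poll, poll_extra);
//     bool do_fsync; LSN fsync_lsn;
//     toku_txn_get_fsync_info(txn, &do_fsync, &fsync_lsn);
//     toku_txn_maybe_fsync_log(txn->logger, fsync_lsn, do_fsync);
//     toku_txn_close_txn(txn);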
int toku_txn_commit_txn(TOKUTXN txn, int nosync,
                        TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Doesn't close the txn, just performs the commit operations.
{
    return toku_txn_commit_with_lsn(txn, nosync, ZERO_LSN,
                                    poll, poll_extra);
}

struct xcommit_info {
    int r;
    TOKUTXN txn;
};

static void txn_note_commit(TOKUTXN txn) {
    // Purpose:
    //  Delay until any indexer is done pinning this transaction.
    //  Update status of a transaction from live->committing (or prepared->committing)
    //  Do so in a thread-safe manner that does not conflict with hot indexing or
    //  begin checkpoint.
    if (toku_txn_is_read_only(txn)) {
        // Neither hot indexing nor checkpoint do any work with readonly txns,
        // so we can skip taking the txn_manager lock here.
        invariant(txn->state==TOKUTXN_LIVE);
        txn->state = TOKUTXN_COMMITTING;
        goto done;
    }
    if (txn->state==TOKUTXN_PREPARING) {
        invalidate_xa_xid(&txn->xa_xid);
    }
    // for hot indexing, if hot index is processing
    // this transaction in some leafentry, then we cannot change
    // the state to commit or abort until
    // hot index is done with that leafentry
    toku_txn_lock_state(txn);
    while (txn->num_pin > 0) {
        toku_cond_wait(
            &txn->state_cond,
            &txn->state_lock
            );
    }
    txn->state = TOKUTXN_COMMITTING;
    toku_txn_unlock_state(txn);
done:
    return;
}

int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
                             TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
{
    // there should be no child when we commit or abort a TOKUTXN
    invariant(txn->child == NULL);
    txn_note_commit(txn);

    // Child transactions do not actually 'commit'.  They promote their
    // changes to the parent, so there is no need to fsync if this txn has a
    // parent.  The do_fsync state is captured in the txn for the
    // toku_txn_maybe_fsync_log function.  Additionally, if the transaction
    // was first prepared, we do not need to fsync because the prepare already
    // caused an fsync of the log, so an additional fsync is unnecessary.  We
    // rely on the client running recovery to properly recommit this
    // transaction if the commit does not make it to disk.  In the case of
    // MySQL, that would be the binary log.
    txn->do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries>0));

    txn->progress_poll_fun = poll;
    txn->progress_poll_fun_extra = poll_extra;

    if (!toku_txn_is_read_only(txn)) {
        toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid);
    }
    // If !txn->begin_was_logged, we could skip toku_rollback_commit
    // but it's cheap (only a number of function calls that return immediately)
    // since there were no writes.  Skipping it would mean we would need to be careful
    // in case we added any additional required cleanup into those functions in the future.
    int r = toku_rollback_commit(txn, oplsn);
    TXN_STATUS_INC(TXN_COMMIT, 1);
    return r;
}

int toku_txn_abort_txn(TOKUTXN txn,
                       TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Doesn't close the txn, just performs the abort operations.
{
    return toku_txn_abort_with_lsn(txn, ZERO_LSN, poll, poll_extra);
}

static void txn_note_abort(TOKUTXN txn) {
    // Purpose:
    //  Delay until any indexer is done pinning this transaction.
    //  Update status of a transaction from live->aborting (or prepared->aborting)
    //  Do so in a thread-safe manner that does not conflict with hot indexing or
    //  begin checkpoint.
    if (toku_txn_is_read_only(txn)) {
        // Neither hot indexing nor checkpoint do any work with readonly txns,
        // so we can skip taking the state lock here.
        invariant(txn->state==TOKUTXN_LIVE);
        txn->state = TOKUTXN_ABORTING;
        goto done;
    }
    if (txn->state==TOKUTXN_PREPARING) {
        invalidate_xa_xid(&txn->xa_xid);
    }
    // for hot indexing, if hot index is processing
    // this transaction in some leafentry, then we cannot change
    // the state to commit or abort until
    // hot index is done with that leafentry
    toku_txn_lock_state(txn);
    while (txn->num_pin > 0) {
        toku_cond_wait(
            &txn->state_cond,
            &txn->state_lock
            );
    }
    txn->state = TOKUTXN_ABORTING;
    toku_txn_unlock_state(txn);
done:
    return;
}

int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn,
                            TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
{
    // there should be no child when we commit or abort a TOKUTXN
    invariant(txn->child == NULL);
    txn_note_abort(txn);

    txn->progress_poll_fun = poll;
    txn->progress_poll_fun_extra = poll_extra;
    txn->do_fsync = false;

    if (!toku_txn_is_read_only(txn)) {
        toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid);
    }
    // If !txn->begin_was_logged, we could skip toku_rollback_abort
    // but it's cheap (only a number of function calls that return immediately)
    // since there were no writes.  Skipping it would mean we would need to be careful
    // in case we added any additional required cleanup into those functions in the future.
    int r = toku_rollback_abort(txn, oplsn);
    TXN_STATUS_INC(TXN_ABORT, 1);
    return r;
}

static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) {
    TOKU_ANNOTATE_NEW_MEMORY(dest, sizeof(*dest));
    dest->formatID     = source->formatID;
    dest->gtrid_length = source->gtrid_length;
    dest->bqual_length = source->bqual_length;
    memcpy(dest->data, source->data, source->gtrid_length+source->bqual_length);
}

void toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid, int nosync) {
    if (txn->parent || toku_txn_is_read_only(txn)) {
        // We do not prepare children.
        //
        // Read-only transactions look the same whether they commit or abort,
        // so XA guarantees come for free.  No need to pay the overhead of prepare.
        return;
    }
    assert(txn->state==TOKUTXN_LIVE);
    // This state transition must be protected against begin_checkpoint.
    // Therefore, the caller must hold the multi-operation lock.
    toku_txn_lock_state(txn);
    txn->state = TOKUTXN_PREPARING;
    toku_txn_unlock_state(txn);
    // Do we need to do an fsync?
    txn->do_fsync = txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries>0);
    copy_xid(&txn->xa_xid, xa_xid);
    // This list will go away with #4683, so we won't need the ydb lock for this anymore.
    toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid, xa_xid);
}

void toku_txn_get_prepared_xa_xid (TOKUTXN txn, TOKU_XA_XID *xid) {
    copy_xid(xid, &txn->xa_xid);
}

int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
    return toku_txn_manager_recover_root_txn(
        logger->txn_manager,
        preplist,
        count,
        retp,
        flags
        );
}

void toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, bool do_fsync) {
    if (logger && do_fsync) {
        toku_logger_fsync_if_lsn_not_fsynced(logger, do_fsync_lsn);
    }
}

void toku_txn_get_fsync_info(TOKUTXN ttxn, bool* do_fsync, LSN* do_fsync_lsn) {
    *do_fsync = ttxn->do_fsync;
    *do_fsync_lsn = ttxn->do_fsync_lsn;
}

void toku_txn_close_txn(TOKUTXN txn) {
    toku_txn_complete_txn(txn);
    toku_txn_destroy_txn(txn);
}

int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn);
int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const UU(txn))
// Effect:  This function is called on every open FT that a transaction used.
//  This function removes the transaction from that FT.
{
    toku_ft_remove_txn_ref(h);

    return 0;
}

// for every ft in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
    txn->open_fts.iterate<struct tokutxn, remove_txn>(txn);
}

void toku_txn_complete_txn(TOKUTXN txn) {
    assert(txn->roll_info.spilled_rollback_head.b == ROLLBACK_NONE.b);
    assert(txn->roll_info.spilled_rollback_tail.b == ROLLBACK_NONE.b);
    assert(txn->roll_info.current_rollback.b == ROLLBACK_NONE.b);
    assert(txn->num_pin == 0);
    assert(txn->state == TOKUTXN_COMMITTING || txn->state == TOKUTXN_ABORTING || txn->state == TOKUTXN_PREPARING);
    if (txn->parent) {
        toku_txn_manager_handle_snapshot_destroy_for_child_txn(
            txn,
            txn->logger->txn_manager,
            txn->snapshot_type
            );
        txn->parent->child_manager->finish_child_txn(txn);
    }
    else {
        toku_txn_manager_finish_txn(txn->logger->txn_manager, txn);
        txn->child_manager->destroy();
    }
    // note that here is another place we depend on
    // this function being called with the multi operation lock
    note_txn_closing(txn);
}

void toku_txn_destroy_txn(TOKUTXN txn) {
    txn->open_fts.destroy();
    if (txn->xids) {
        toku_xids_destroy(&txn->xids);
    }
    toku_mutex_destroy(&txn->txn_lock);
    toku_mutex_destroy(&txn->state_lock);
    toku_cond_destroy(&txn->state_cond);
    toku_free(txn);
}

XIDS toku_txn_get_xids (TOKUTXN txn) {
    if (txn==0) return toku_xids_get_root_xids();
    else return txn->xids;
}

void toku_txn_force_fsync_on_commit(TOKUTXN txn) {
    txn->force_fsync_on_commit = true;
}

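// Return the oldest TXNID on this txn's live-root-txn list, or TXNID_NONE if
// the list is empty.  (Element 0 is taken as the oldest, i.e. the list is
// assumed to be kept in ascending TXNID order by the txn manager.)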
TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) {
    TXNID xid;
    if (txn->live_root_txn_list->size()>0) {
        int r = txn->live_root_txn_list->fetch(0, &xid);
        assert_zero(r);
    }
    else {
        xid = TXNID_NONE;
    }
    return xid;
}

bool toku_is_txn_in_live_root_txn_list(const xid_omt_t &live_root_txn_list, TXNID xid) {
    TXNID txnid;
    bool retval = false;
    int r = live_root_txn_list.find_zero<TXNID, toku_find_xid_by_xid>(xid, &txnid, nullptr);
    if (r==0) {
        invariant(txnid == xid);
        retval = true;
    }
    else {
        invariant(r==DB_NOTFOUND);
    }
    return retval;
}

TOKUTXN_STATE
toku_txn_get_state(TOKUTXN txn) {
    return txn->state;
}

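// Begin records are logged lazily: nothing is written to the log when a txn
// is begun, and the xbegin record is emitted only when the txn (and,
// recursively, its ancestors) first performs a write.  Read-only transactions
// therefore never appear in the log at all.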
static void
maybe_log_begin_txn_for_write_operation_unlocked(TOKUTXN txn) {
    // We now hold the lock.
    if (txn->begin_was_logged) {
        return;
    }
    TOKUTXN parent;
    parent = txn->parent;
    TXNID_PAIR xid;
    xid = txn->txnid;
    TXNID_PAIR pxid;
    pxid = TXNID_PAIR_NONE;
    if (parent) {
        // Recursively log parent first if necessary.
        // Transactions cannot do work if they have children,
        // so the lowest level child's lock is sufficient for ancestors.
        maybe_log_begin_txn_for_write_operation_unlocked(parent);
        pxid = parent->txnid;
    }

    toku_log_xbegin(txn->logger, NULL, 0, xid, pxid);
    txn->begin_was_logged = true;
}

void
toku_maybe_log_begin_txn_for_write_operation(TOKUTXN txn) {
    toku_txn_lock(txn);
    maybe_log_begin_txn_for_write_operation_unlocked(txn);
    toku_txn_unlock(txn);
}

bool
toku_txn_is_read_only(TOKUTXN txn) {
    // No need to recursively check children because parents are
    // recursively logged before children.
    if (!txn->begin_was_logged) {
        // Did no work.
        invariant(txn->roll_info.num_rollentries == 0);
        invariant(txn->do_fsync_lsn.lsn == ZERO_LSN.lsn);
        invariant(txn->open_fts.size() == 0);
        invariant(txn->num_pin==0);
        return true;
    }
    return false;
}

// needed for hot indexing
void toku_txn_lock_state(TOKUTXN txn) {
    toku_mutex_lock(&txn->state_lock);
}
void toku_txn_unlock_state(TOKUTXN txn) {
    toku_mutex_unlock(&txn->state_lock);
}


// prevents a client thread from transitioning txn from LIVE|PREPARING -> COMMITTING|ABORTING
// hot indexing may need a transaction to stay in the LIVE|PREPARING state while it processes
// a leafentry.
void toku_txn_pin_live_txn_unlocked(TOKUTXN txn) {
    assert(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING);
    assert(!toku_txn_is_read_only(txn));
    txn->num_pin++;
}

// allows a client thread to go back to being able to transition txn
// from LIVE|PREPARING -> COMMITTING|ABORTING
void toku_txn_unpin_live_txn(TOKUTXN txn) {
    assert(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING);
    assert(txn->num_pin > 0);
    toku_txn_lock_state(txn);
    txn->num_pin--;
    if (txn->num_pin == 0) {
        toku_cond_broadcast(&txn->state_cond);
    }
    toku_txn_unlock_state(txn);
}

bool toku_txn_has_spilled_rollback(TOKUTXN txn) {
    return txn_has_spilled_rollback_logs(txn);
}

void toku_txn_get_client_id(TOKUTXN txn, uint64_t *client_id, void **client_extra) {
    if (client_id) *client_id = txn->client_id;
    if (client_extra) *client_extra = txn->client_extra;
}

void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id, void *client_extra) {
    txn->client_id = client_id;
    txn->client_extra = client_extra;
}

time_t toku_txn_get_start_time(struct tokutxn *txn) {
    return txn->start_time;
}

extern uint force_recovery;
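// MVCC visibility check: decide whether a value written by `txnid` is visible
// to the snapshot held by `txn`.  Returns TOKUDB_ACCEPT when the writer
// committed before this txn's snapshot and was not still live in it (or when
// the writer is txn's own root), and 0 otherwise.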
int toku_txn_reads_txnid(TXNID txnid, TOKUTXN txn, bool is_provisional UU()) {
    if (force_recovery) {
        return TOKUDB_ACCEPT;
    }
    int r = 0;
    TXNID oldest_live_in_snapshot = toku_get_oldest_in_live_root_txn_list(txn);
    if (oldest_live_in_snapshot == TXNID_NONE && txnid < txn->snapshot_txnid64) {
        r = TOKUDB_ACCEPT;
    } else if (txnid < oldest_live_in_snapshot || txnid == txn->txnid.parent_id64) {
        r = TOKUDB_ACCEPT;
    } else if (txnid > txn->snapshot_txnid64 || toku_is_txn_in_live_root_txn_list(*txn->live_root_txn_list, txnid)) {
        r = 0;
    } else {
        r = TOKUDB_ACCEPT;
    }
    return r;
}

int toku_txn_discard_txn(TOKUTXN txn) {
    int r = toku_rollback_discard(txn);
    return r;
}

#include <toku_race_tools.h>
void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void);
void toku_txn_status_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&txn_status, sizeof txn_status);
}