1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "portability/toku_race_tools.h"
40 
41 #include "ft/cachetable/checkpoint.h"
42 #include "ft/logger/log-internal.h"
43 #include "ft/ule.h"
44 #include "ft/txn/txn.h"
45 #include "ft/txn/txn_manager.h"
46 #include "ft/txn/rollback.h"
47 #include "util/omt.h"
48 //this is only for testing
49 
50 static void  (* test_txn_sync_callback) (pthread_t, void *) = NULL;
51 static void * test_txn_sync_callback_extra = NULL;
52 
set_test_txn_sync_callback(void (* cb)(pthread_t,void *),void * extra)53 void set_test_txn_sync_callback(void (*cb) (pthread_t, void *), void *extra) {
54         test_txn_sync_callback = cb;
55         test_txn_sync_callback_extra = extra;
56 }
57 bool garbage_collection_debug = false;
58 
59 toku_instr_key *txn_manager_lock_mutex_key;
60 
txn_records_snapshot(TXN_SNAPSHOT_TYPE snapshot_type,struct tokutxn * parent)61 static bool txn_records_snapshot(TXN_SNAPSHOT_TYPE snapshot_type,
62                                  struct tokutxn *parent) {
63     if (snapshot_type == TXN_COPIES_SNAPSHOT) {
64         return false;
65     }
66     // we need a snapshot if the snapshot type is a child or
67     // if the snapshot type is root and we have no parent.
68     // Cases that we don't need a snapshot: when snapshot type is NONE
69     //  or when it is ROOT and we have a parent
70     return (snapshot_type != TXN_SNAPSHOT_NONE && (parent==NULL || snapshot_type == TXN_SNAPSHOT_CHILD));
71 }
72 
txn_copies_snapshot(TXN_SNAPSHOT_TYPE snapshot_type,struct tokutxn * parent)73 static bool txn_copies_snapshot(TXN_SNAPSHOT_TYPE snapshot_type, struct tokutxn *parent) {
74     return (snapshot_type == TXN_COPIES_SNAPSHOT) || txn_records_snapshot(snapshot_type, parent);
75 }
76 
77 // internal locking functions, should use this instead of accessing lock directly
78 static void txn_manager_lock(TXN_MANAGER txn_manager);
79 static void txn_manager_unlock(TXN_MANAGER txn_manager);
80 
81 #if 0
82 static bool is_txnid_live(TXN_MANAGER txn_manager, TXNID txnid) {
83     TOKUTXN result = NULL;
84     toku_txn_manager_id2txn_unlocked(txn_manager, txnid, &result);
85     return (result != NULL);
86 }
87 #endif
88 
89 //Heaviside function to search through an OMT by a TXNID
90 int find_by_xid (const TOKUTXN &txn, const TXNID &txnidfind);
91 
is_txnid_live(TXN_MANAGER txn_manager,TXNID txnid)92 static bool is_txnid_live(TXN_MANAGER txn_manager, TXNID txnid) {
93     TOKUTXN result = NULL;
94     TXNID_PAIR id = { .parent_id64 = txnid, .child_id64 = TXNID_NONE };
95     toku_txn_manager_id2txn_unlocked(txn_manager, id, &result);
96     return (result != NULL);
97 }
98 
99 static void toku_txn_manager_clone_state_for_gc_unlocked(
100     TXN_MANAGER txn_manager,
101     xid_omt_t* snapshot_xids,
102     rx_omt_t* referenced_xids,
103     xid_omt_t* live_root_txns
104     );
105 
106 static void
verify_snapshot_system(TXN_MANAGER txn_manager UU ())107 verify_snapshot_system(TXN_MANAGER txn_manager UU()) {
108     uint32_t    num_snapshot_txnids = txn_manager->num_snapshots;
109     TXNID       snapshot_txnids[num_snapshot_txnids];
110     TOKUTXN     snapshot_txns[num_snapshot_txnids];
111     uint32_t    num_live_txns = txn_manager->live_root_txns.size();
112     TOKUTXN     live_txns[num_live_txns];
113     uint32_t    num_referenced_xid_tuples = txn_manager->referenced_xids.size();
114     struct      referenced_xid_tuple  *referenced_xid_tuples[num_referenced_xid_tuples];
115 
116     // do this to get an omt of snapshot_txnids
117     xid_omt_t snapshot_txnids_omt;
118     rx_omt_t referenced_xids_omt;
119     xid_omt_t live_root_txns_omt;
120     toku_txn_manager_clone_state_for_gc_unlocked(
121         txn_manager,
122         &snapshot_txnids_omt,
123         &referenced_xids_omt,
124         &live_root_txns_omt
125         );
126 
127     int r;
128     uint32_t i;
129     uint32_t j;
130     //set up arrays for easier access
131     {
132         TOKUTXN curr_txn = txn_manager->snapshot_head;
133         uint32_t curr_index = 0;
134         while (curr_txn != NULL) {
135             snapshot_txns[curr_index] = curr_txn;
136             snapshot_txnids[curr_index] = curr_txn->snapshot_txnid64;
137             curr_txn = curr_txn->snapshot_next;
138             curr_index++;
139         }
140     }
141 
142     for (i = 0; i < num_live_txns; i++) {
143         r = txn_manager->live_root_txns.fetch(i, &live_txns[i]);
144         assert_zero(r);
145     }
146     for (i = 0; i < num_referenced_xid_tuples; i++) {
147         r = txn_manager->referenced_xids.fetch(i, &referenced_xid_tuples[i]);
148         assert_zero(r);
149     }
150 
151     {
152         //Verify snapshot_txnids
153         for (i = 0; i < num_snapshot_txnids; i++) {
154             TXNID snapshot_xid = snapshot_txnids[i];
155             TOKUTXN snapshot_txn = snapshot_txns[i];
156             uint32_t num_live_root_txn_list = snapshot_txn->live_root_txn_list->size();
157             TXNID     live_root_txn_list[num_live_root_txn_list];
158             {
159                 for (j = 0; j < num_live_root_txn_list; j++) {
160                     r = snapshot_txn->live_root_txn_list->fetch(j, &live_root_txn_list[j]);
161                     assert_zero(r);
162                 }
163             }
164             {
165                 // Only committed entries have return a youngest.
166                 TXNID youngest = toku_get_youngest_live_list_txnid_for(
167                     snapshot_xid,
168                     snapshot_txnids_omt,
169                     txn_manager->referenced_xids
170                     );
171                 invariant(youngest == TXNID_NONE);
172             }
173             for (j = 0; j < num_live_root_txn_list; j++) {
174                 TXNID live_xid = live_root_txn_list[j];
175                 invariant(live_xid <= snapshot_xid);
176                 TXNID youngest = toku_get_youngest_live_list_txnid_for(
177                     live_xid,
178                     snapshot_txnids_omt,
179                     txn_manager->referenced_xids
180                     );
181                 if (is_txnid_live(txn_manager, live_xid)) {
182                     // Only committed entries have return a youngest.
183                     invariant(youngest == TXNID_NONE);
184                 }
185                 else {
186                     invariant(youngest != TXNID_NONE);
187                     // A committed entry might have been read-only, in which case it won't return anything.
188                     // This snapshot reads 'live_xid' so it's youngest cannot be older than snapshot_xid.
189                     invariant(youngest >= snapshot_xid);
190                 }
191             }
192         }
193     }
194     {
195         // Verify referenced_xids.
196         for (i = 0; i < num_referenced_xid_tuples; i++) {
197             struct referenced_xid_tuple *tuple = referenced_xid_tuples[i];
198             invariant(tuple->begin_id < tuple->end_id);
199             invariant(tuple->references > 0);
200 
201             {
202                 //verify neither pair->begin_id nor end_id is in live_list
203                 r = txn_manager->live_root_txns.find_zero<TXNID, find_by_xid>(tuple->begin_id, nullptr, nullptr);
204                 invariant(r == DB_NOTFOUND);
205                 r = txn_manager->live_root_txns.find_zero<TXNID, find_by_xid>(tuple->end_id, nullptr, nullptr);
206                 invariant(r == DB_NOTFOUND);
207             }
208             {
209                 //verify neither pair->begin_id nor end_id is in snapshot_xids
210                 TOKUTXN curr_txn = txn_manager->snapshot_head;
211                 uint32_t curr_index = 0;
212                 while (curr_txn != NULL) {
213                     invariant(tuple->begin_id != curr_txn->txnid.parent_id64);
214                     invariant(tuple->end_id != curr_txn->txnid.parent_id64);
215                     curr_txn = curr_txn->snapshot_next;
216                     curr_index++;
217                 }
218             }
219             {
220                 // Verify number of references is correct
221                 uint32_t refs_found = 0;
222                 for (j = 0; j < num_snapshot_txnids; j++) {
223                     TOKUTXN snapshot_txn = snapshot_txns[j];
224                     if (toku_is_txn_in_live_root_txn_list(*snapshot_txn->live_root_txn_list, tuple->begin_id)) {
225                         refs_found++;
226                     }
227                     invariant(!toku_is_txn_in_live_root_txn_list(
228                                 *snapshot_txn->live_root_txn_list,
229                                 tuple->end_id));
230                 }
231                 invariant(refs_found == tuple->references);
232             }
233             {
234                 // Verify youngest makes sense.
235                 TXNID youngest = toku_get_youngest_live_list_txnid_for(
236                     tuple->begin_id,
237                     snapshot_txnids_omt,
238                     txn_manager->referenced_xids
239                     );
240                 invariant(youngest != TXNID_NONE);
241                 invariant(youngest > tuple->begin_id);
242                 invariant(youngest < tuple->end_id);
243                 // Youngest must be found, and must be a snapshot txn
244                 r = snapshot_txnids_omt.find_zero<TXNID, toku_find_xid_by_xid>(youngest, nullptr, nullptr);
245                 invariant_zero(r);
246             }
247         }
248     }
249     snapshot_txnids_omt.destroy();
250     referenced_xids_omt.destroy();
251     live_root_txns_omt.destroy();
252 }
253 
toku_txn_manager_init(TXN_MANAGER * txn_managerp)254 void toku_txn_manager_init(TXN_MANAGER *txn_managerp) {
255     TXN_MANAGER XCALLOC(txn_manager);
256     toku_mutex_init(
257         *txn_manager_lock_mutex_key, &txn_manager->txn_manager_lock, nullptr);
258     txn_manager->live_root_txns.create();
259     txn_manager->live_root_ids.create();
260     txn_manager->snapshot_head = NULL;
261     txn_manager->snapshot_tail = NULL;
262     txn_manager->num_snapshots = 0;
263     txn_manager->referenced_xids.create();
264     txn_manager->last_xid = 0;
265 
266     txn_manager->last_xid_seen_for_recover = TXNID_NONE;
267     txn_manager->last_calculated_oldest_referenced_xid = TXNID_NONE;
268 
269     *txn_managerp = txn_manager;
270 }
271 
toku_txn_manager_destroy(TXN_MANAGER txn_manager)272 void toku_txn_manager_destroy(TXN_MANAGER txn_manager) {
273     toku_mutex_destroy(&txn_manager->txn_manager_lock);
274     invariant(txn_manager->live_root_txns.size() == 0);
275     txn_manager->live_root_txns.destroy();
276     invariant(txn_manager->live_root_ids.size() == 0);
277     txn_manager->live_root_ids.destroy();
278     invariant(txn_manager->snapshot_head == NULL);
279     invariant(txn_manager->referenced_xids.size() == 0);
280     txn_manager->referenced_xids.destroy();
281     toku_free(txn_manager);
282 }
283 
284 TXNID
toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager)285 toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager) {
286     TOKUTXN rtxn = NULL;
287     TXNID rval = TXNID_NONE_LIVING;
288     txn_manager_lock(txn_manager);
289 
290     if (txn_manager->live_root_txns.size() > 0) {
291         int r = txn_manager->live_root_txns.fetch(0, &rtxn);
292         invariant_zero(r);
293     }
294     if (rtxn) {
295         rval = rtxn->txnid.parent_id64;
296     }
297     txn_manager_unlock(txn_manager);
298     return rval;
299 }
300 
toku_txn_manager_get_oldest_referenced_xid_estimate(TXN_MANAGER txn_manager)301 TXNID toku_txn_manager_get_oldest_referenced_xid_estimate(TXN_MANAGER txn_manager) {
302     return toku_unsafe_fetch(&txn_manager->last_calculated_oldest_referenced_xid);
303 }
304 
305 int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids);
live_root_txn_list_iter(const TOKUTXN & live_xid,const uint32_t UU (index),TXNID ** const referenced_xids)306 int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids){
307     (*referenced_xids)[index] = live_xid->txnid.parent_id64;
308     return 0;
309 }
310 
311 
312 // Create list of root transactions that were live when this txn began.
313 static inline void
setup_live_root_txn_list(xid_omt_t * live_root_txnid,xid_omt_t * live_root_txn_list)314 setup_live_root_txn_list(xid_omt_t* live_root_txnid, xid_omt_t* live_root_txn_list) {
315     if (live_root_txnid->size() > 0) {
316         live_root_txn_list->clone(*live_root_txnid);
317     } else {
318         live_root_txn_list->create_no_array();
319     }
320 }
321 
322 //Heaviside function to search through an OMT by a TXNID
323 int
find_by_xid(const TOKUTXN & txn,const TXNID & txnidfind)324 find_by_xid (const TOKUTXN &txn, const TXNID &txnidfind) {
325     if (txn->txnid.parent_id64 < txnidfind) return -1;
326     if (txn->txnid.parent_id64 > txnidfind) return +1;
327     return 0;
328 }
329 
330 static TXNID
max_xid(TXNID a,TXNID b)331 max_xid(TXNID a, TXNID b) {
332     return a < b ? b : a;
333 }
334 
set_oldest_referenced_xid(TXN_MANAGER txn_manager)335 static void set_oldest_referenced_xid(TXN_MANAGER txn_manager) {
336     TXNID oldest_referenced_xid = TXNID_MAX;
337     int r;
338     if (txn_manager->live_root_ids.size() > 0) {
339         r = txn_manager->live_root_ids.fetch(0, &oldest_referenced_xid);
340         // this function should only be called when we know there is at least
341         // one live transaction
342         invariant_zero(r);
343     }
344 
345     if (txn_manager->referenced_xids.size() > 0) {
346         struct referenced_xid_tuple* tuple;
347         r = txn_manager->referenced_xids.fetch(0, &tuple);
348         if (r == 0 && tuple->begin_id < oldest_referenced_xid) {
349             oldest_referenced_xid = tuple->begin_id;
350         }
351     }
352     if (txn_manager->snapshot_head != NULL) {
353         TXNID id = txn_manager->snapshot_head->snapshot_txnid64;
354         if (id < oldest_referenced_xid) {
355             oldest_referenced_xid = id;
356         }
357     }
358     if (txn_manager->last_xid < oldest_referenced_xid) {
359         oldest_referenced_xid = txn_manager->last_xid;
360     }
361     invariant(oldest_referenced_xid != TXNID_MAX);
362     toku_unsafe_set(&txn_manager->last_calculated_oldest_referenced_xid, oldest_referenced_xid);
363 }
364 
365 //Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
366 // template-only function, but must be extern
367 int find_xid (const TOKUTXN &txn, const TOKUTXN &txnfind);
368 int
find_xid(const TOKUTXN & txn,const TOKUTXN & txnfind)369 find_xid (const TOKUTXN &txn, const TOKUTXN &txnfind)
370 {
371     if (txn->txnid.parent_id64 < txnfind->txnid.parent_id64) return -1;
372     if (txn->txnid.parent_id64 > txnfind->txnid.parent_id64) return +1;
373     return 0;
374 }
375 
txn_manager_create_snapshot_unlocked(TXN_MANAGER txn_manager,TOKUTXN txn)376 static inline void txn_manager_create_snapshot_unlocked(
377     TXN_MANAGER txn_manager,
378     TOKUTXN txn
379     )
380 {
381     txn->snapshot_txnid64 = ++txn_manager->last_xid;
382     // Add this txn to the global list of txns that have their own snapshots.
383     // (Note, if a txn is a child that creates its own snapshot, then that child xid
384     // is the xid stored in the global list.)
385     if (txn_manager->snapshot_head == NULL) {
386         invariant(txn_manager->snapshot_tail == NULL);
387         txn_manager->snapshot_head = txn;
388         txn_manager->snapshot_tail = txn;
389     }
390     else {
391         txn_manager->snapshot_tail->snapshot_next = txn;
392         txn->snapshot_prev = txn_manager->snapshot_tail;
393         txn_manager->snapshot_tail = txn;
394     }
395     txn_manager->num_snapshots++;
396 }
397 
398 // template-only function, but must be extern
399 int find_tuple_by_xid (const struct referenced_xid_tuple &tuple, const TXNID &xidfind);
400 int
find_tuple_by_xid(const struct referenced_xid_tuple & tuple,const TXNID & xidfind)401 find_tuple_by_xid (const struct referenced_xid_tuple &tuple, const TXNID &xidfind)
402 {
403     if (tuple.begin_id < xidfind) return -1;
404     if (tuple.begin_id > xidfind) return +1;
405     return 0;
406 }
407 
408 // template-only function, but must be extern
409 int referenced_xids_note_snapshot_txn_end_iter(const TXNID &live_xid, const uint32_t UU(index), rx_omt_t *const referenced_xids)
410     __attribute__((nonnull(3)));
referenced_xids_note_snapshot_txn_end_iter(const TXNID & live_xid,const uint32_t UU (index),rx_omt_t * const referenced_xids)411 int referenced_xids_note_snapshot_txn_end_iter(const TXNID &live_xid, const uint32_t UU(index), rx_omt_t *const referenced_xids)
412 {
413     int r;
414     uint32_t idx;
415     struct referenced_xid_tuple *tuple;
416 
417     r = referenced_xids->find_zero<TXNID, find_tuple_by_xid>(live_xid, &tuple, &idx);
418     if (r == DB_NOTFOUND) {
419         goto done;
420     }
421     invariant_zero(r);
422     invariant(tuple->references > 0);
423     if (--tuple->references == 0) {
424         r = referenced_xids->delete_at(idx);
425         lazy_assert_zero(r);
426     }
427 done:
428     return 0;
429 }
430 
431 // When txn ends, update reverse live list.  To do that, examine each txn in this (closing) txn's live list.
432 static inline int
note_snapshot_txn_end_by_ref_xids(TXN_MANAGER mgr,const xid_omt_t & live_root_txn_list)433 note_snapshot_txn_end_by_ref_xids(TXN_MANAGER mgr, const xid_omt_t &live_root_txn_list) {
434     int r;
435     r = live_root_txn_list.iterate<rx_omt_t, referenced_xids_note_snapshot_txn_end_iter>(&mgr->referenced_xids);
436     invariant_zero(r);
437     return r;
438 }
439 
440 typedef struct snapshot_iter_extra {
441     uint32_t* indexes_to_delete;
442     uint32_t num_indexes;
443     xid_omt_t* live_root_txn_list;
444 } SNAPSHOT_ITER_EXTRA;
445 
446 // template-only function, but must be extern
447 int note_snapshot_txn_end_by_txn_live_list_iter(referenced_xid_tuple* tuple, const uint32_t index, SNAPSHOT_ITER_EXTRA *const sie)
448     __attribute__((nonnull(3)));
note_snapshot_txn_end_by_txn_live_list_iter(referenced_xid_tuple * tuple,const uint32_t index,SNAPSHOT_ITER_EXTRA * const sie)449 int note_snapshot_txn_end_by_txn_live_list_iter(
450     referenced_xid_tuple* tuple,
451     const uint32_t index,
452     SNAPSHOT_ITER_EXTRA *const sie
453     )
454 {
455     int r;
456     uint32_t idx;
457     TXNID txnid;
458     r = sie->live_root_txn_list->find_zero<TXNID, toku_find_xid_by_xid>(tuple->begin_id, &txnid, &idx);
459     if (r == DB_NOTFOUND) {
460         goto done;
461     }
462     invariant_zero(r);
463     invariant(txnid == tuple->begin_id);
464     invariant(tuple->references > 0);
465     if (--tuple->references == 0) {
466         sie->indexes_to_delete[sie->num_indexes] = index;
467         sie->num_indexes++;
468     }
469 done:
470     return 0;
471 }
472 
473 static inline int
note_snapshot_txn_end_by_txn_live_list(TXN_MANAGER mgr,xid_omt_t * live_root_txn_list)474 note_snapshot_txn_end_by_txn_live_list(TXN_MANAGER mgr, xid_omt_t* live_root_txn_list) {
475     uint32_t size = mgr->referenced_xids.size();
476     uint32_t indexes_to_delete[size];
477     SNAPSHOT_ITER_EXTRA sie = { .indexes_to_delete = indexes_to_delete, .num_indexes = 0, .live_root_txn_list = live_root_txn_list};
478     mgr->referenced_xids.iterate_ptr<SNAPSHOT_ITER_EXTRA, note_snapshot_txn_end_by_txn_live_list_iter>(&sie);
479     for (uint32_t i = 0; i < sie.num_indexes; i++) {
480         uint32_t curr_index = sie.indexes_to_delete[sie.num_indexes-i-1];
481         mgr->referenced_xids.delete_at(curr_index);
482     }
483     return 0;
484 }
485 
txn_manager_remove_snapshot_unlocked(TOKUTXN txn,TXN_MANAGER txn_manager)486 static inline void txn_manager_remove_snapshot_unlocked(
487     TOKUTXN txn,
488     TXN_MANAGER txn_manager
489     )
490 {
491     // Remove from linked list of snapshot txns
492     if (txn_manager->snapshot_head == txn) {
493         txn_manager->snapshot_head = txn->snapshot_next;
494     }
495     if (txn_manager->snapshot_tail == txn) {
496         txn_manager->snapshot_tail = txn->snapshot_prev;
497     }
498     if (txn->snapshot_next) {
499         txn->snapshot_next->snapshot_prev = txn->snapshot_prev;
500     }
501     if (txn->snapshot_prev) {
502         txn->snapshot_prev->snapshot_next = txn->snapshot_next;
503     }
504     txn_manager->num_snapshots--;
505     uint32_t ref_xids_size = txn_manager->referenced_xids.size();
506     uint32_t live_list_size = txn->live_root_txn_list->size();
507     if (ref_xids_size > 0 && live_list_size > 0) {
508         if (live_list_size > ref_xids_size && ref_xids_size < 2000) {
509             note_snapshot_txn_end_by_txn_live_list(txn_manager, txn->live_root_txn_list);
510         }
511         else {
512             note_snapshot_txn_end_by_ref_xids(txn_manager, *txn->live_root_txn_list);
513         }
514     }
515 }
516 
inherit_snapshot_from_parent(TOKUTXN child)517 static inline void inherit_snapshot_from_parent(TOKUTXN child) {
518     if (child->parent) {
519         child->snapshot_txnid64 = child->parent->snapshot_txnid64;
520         child->live_root_txn_list = child->parent->live_root_txn_list;
521     }
522 }
toku_txn_manager_handle_snapshot_create_for_child_txn(TOKUTXN txn,TXN_MANAGER txn_manager,TXN_SNAPSHOT_TYPE snapshot_type)523 void toku_txn_manager_handle_snapshot_create_for_child_txn(
524     TOKUTXN txn,
525     TXN_MANAGER txn_manager,
526     TXN_SNAPSHOT_TYPE snapshot_type
527     )
528 {
529     // this is a function for child txns, so just doint a sanity check
530     invariant(txn->parent != NULL);
531     bool copies_snapshot = txn_copies_snapshot(snapshot_type, txn->parent);
532     bool records_snapshot = txn_records_snapshot(snapshot_type, txn->parent);
533     // assert that if records_snapshot is true, then copies_snapshot is true
534     invariant(!records_snapshot || copies_snapshot);
535     if (records_snapshot) {
536         invariant(txn->live_root_txn_list == nullptr);
537         XMALLOC(txn->live_root_txn_list);
538         txn_manager_lock(txn_manager);
539         txn_manager_create_snapshot_unlocked(txn_manager, txn);
540     }
541     else {
542         inherit_snapshot_from_parent(txn);
543     }
544 
545     toku_debug_txn_sync(pthread_self());
546 
547     if (copies_snapshot) {
548 	if(!records_snapshot)
549     	    txn_manager_lock(txn_manager);
550 	setup_live_root_txn_list(&txn_manager->live_root_ids, txn->live_root_txn_list);
551        txn_manager_unlock(txn_manager);
552     }
553 }
554 
toku_txn_manager_handle_snapshot_destroy_for_child_txn(TOKUTXN txn,TXN_MANAGER txn_manager,TXN_SNAPSHOT_TYPE snapshot_type)555 void toku_txn_manager_handle_snapshot_destroy_for_child_txn(
556     TOKUTXN txn,
557     TXN_MANAGER txn_manager,
558     TXN_SNAPSHOT_TYPE snapshot_type
559     )
560 {
561     // this is a function for child txns, so just doint a sanity check
562     invariant(txn->parent != NULL);
563     bool copies_snapshot = txn_copies_snapshot(snapshot_type, txn->parent);
564     bool records_snapshot = txn_records_snapshot(snapshot_type, txn->parent);
565     if (records_snapshot) {
566         txn_manager_lock(txn_manager);
567         txn_manager_remove_snapshot_unlocked(txn, txn_manager);
568         txn_manager_unlock(txn_manager);
569     }
570     if (copies_snapshot) {
571         invariant(txn->live_root_txn_list != nullptr);
572         txn->live_root_txn_list->destroy();
573         toku_free(txn->live_root_txn_list);
574     }
575 }
576 
toku_txn_manager_start_txn_for_recovery(TOKUTXN txn,TXN_MANAGER txn_manager,TXNID xid)577 void toku_txn_manager_start_txn_for_recovery(
578     TOKUTXN txn,
579     TXN_MANAGER txn_manager,
580     TXNID xid
581     )
582 {
583     txn_manager_lock(txn_manager);
584     // using xid that is passed in
585     txn_manager->last_xid = max_xid(txn_manager->last_xid, xid);
586     toku_txn_update_xids_in_txn(txn, xid);
587 
588     uint32_t idx;
589     int r = txn_manager->live_root_txns.find_zero<TOKUTXN, find_xid>(txn, nullptr, &idx);
590     invariant(r == DB_NOTFOUND);
591     r = txn_manager->live_root_txns.insert_at(txn, idx);
592     invariant_zero(r);
593     r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx);
594     invariant_zero(r);
595 
596     txn_manager_unlock(txn_manager);
597 }
598 
toku_txn_manager_start_txn(TOKUTXN txn,TXN_MANAGER txn_manager,TXN_SNAPSHOT_TYPE snapshot_type,bool read_only)599 void toku_txn_manager_start_txn(
600     TOKUTXN txn,
601     TXN_MANAGER txn_manager,
602     TXN_SNAPSHOT_TYPE snapshot_type,
603     bool read_only
604     )
605 {
606     int r;
607     TXNID xid = TXNID_NONE;
608     // if we are running in recovery, we don't need to make snapshots
609     bool copies_snapshot = txn_copies_snapshot(snapshot_type, NULL);
610     bool records_snapshot = txn_records_snapshot(snapshot_type, NULL);
611     // assert that if records_snapshot is true, then copies_snapshot is true
612     invariant(!records_snapshot || copies_snapshot);
613 
614     // perform a malloc outside of the txn_manager lock
615     // will be used in txn_manager_create_snapshot_unlocked below
616     if (copies_snapshot) {
617         invariant(txn->live_root_txn_list == nullptr);
618         XMALLOC(txn->live_root_txn_list);
619     }
620     // the act of getting a transaction ID and adding the
621     // txn to the proper OMTs must be atomic. MVCC depends
622     // on this.
623     txn_manager_lock(txn_manager);
624     if (garbage_collection_debug) {
625         verify_snapshot_system(txn_manager);
626     }
627 
628     //
629     // maintain the data structures necessary for MVCC:
630     //  1. add txn to list of live_root_txns if this is a root transaction
631     //  2. if the transaction is creating a snapshot:
632     //    - create a live list for the transaction
633     //    - add the id to the list of snapshot ids
634     //
635     // The order of operations is important here, and must be taken
636     // into account when the transaction is closed. The txn is added
637     // to the live_root_txns first (if it is a root txn). This has the implication
638     // that a root level snapshot transaction is in its own live list. This fact
639     // is taken into account when the transaction is closed.
640 
641     // add ancestor information, and maintain global live root txn list
642     xid = ++txn_manager->last_xid; // we always need an ID, needed for lock tree
643     toku_txn_update_xids_in_txn(txn, xid);
644     if (!read_only) {
645         uint32_t idx = txn_manager->live_root_txns.size();
646         r = txn_manager->live_root_txns.insert_at(txn, idx);
647         invariant_zero(r);
648         r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx);
649         invariant_zero(r);
650     }
651     set_oldest_referenced_xid(txn_manager);
652 
653     if (records_snapshot) {
654         txn_manager_create_snapshot_unlocked(
655             txn_manager,
656             txn
657             );
658     }
659     if (copies_snapshot) {
660         setup_live_root_txn_list(&txn_manager->live_root_ids, txn->live_root_txn_list);
661     }
662 
663     if (garbage_collection_debug) {
664         verify_snapshot_system(txn_manager);
665     }
666     txn_manager_unlock(txn_manager);
667     return;
668 }
669 
670 TXNID
toku_get_youngest_live_list_txnid_for(TXNID xc,const xid_omt_t & snapshot_txnids,const rx_omt_t & referenced_xids)671 toku_get_youngest_live_list_txnid_for(TXNID xc, const xid_omt_t &snapshot_txnids, const rx_omt_t &referenced_xids) {
672     struct referenced_xid_tuple *tuple;
673     int r;
674     TXNID rval = TXNID_NONE;
675 
676     r = referenced_xids.find_zero<TXNID, find_tuple_by_xid>(xc, &tuple, nullptr);
677     if (r == DB_NOTFOUND) {
678         goto done;
679     }
680     TXNID live;
681 
682     r = snapshot_txnids.find<TXNID, toku_find_xid_by_xid>(tuple->end_id, -1, &live, nullptr);
683     if (r == DB_NOTFOUND) {
684         goto done;
685     }
686     invariant(live < tuple->end_id);
687     if (live > tuple->begin_id) {
688         rval = live;
689     }
690 done:
691     return rval;
692 }
693 
toku_txn_manager_finish_txn(TXN_MANAGER txn_manager,TOKUTXN txn)694 void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
695     int r;
696     invariant(txn->parent == NULL);
697     bool records_snapshot = txn_records_snapshot(txn->snapshot_type, NULL);
698     txn_manager_lock(txn_manager);
699 
700     if (garbage_collection_debug) {
701         verify_snapshot_system(txn_manager);
702     }
703 
704     if (records_snapshot) {
705         txn_manager_remove_snapshot_unlocked(
706             txn,
707             txn_manager
708             );
709     }
710 
711     if (!txn_declared_read_only(txn)) {
712         uint32_t idx;
713         //Remove txn from list of live root txns
714         TOKUTXN txnagain;
715         r = txn_manager->live_root_txns.find_zero<TOKUTXN, find_xid>(txn, &txnagain, &idx);
716         invariant_zero(r);
717         invariant(txn==txnagain);
718 
719         r = txn_manager->live_root_txns.delete_at(idx);
720         invariant_zero(r);
721         r = txn_manager->live_root_ids.delete_at(idx);
722         invariant_zero(r);
723 
724         if (!toku_txn_is_read_only(txn) || garbage_collection_debug) {
725             uint32_t num_references = 0;
726             TOKUTXN curr_txn = txn_manager->snapshot_tail;
727             while(curr_txn != NULL) {
728                 if (curr_txn->snapshot_txnid64 > txn->txnid.parent_id64) {
729                     num_references++;
730                 }
731                 else {
732                     break;
733                 }
734                 curr_txn = curr_txn->snapshot_prev;
735             }
736 
737             if (num_references > 0) {
738                 // This transaction exists in a live list of another transaction.
739                 struct referenced_xid_tuple tuple = {
740                     .begin_id = txn->txnid.parent_id64,
741                     .end_id = ++txn_manager->last_xid,
742                     .references = num_references
743                 };
744                 r = txn_manager->referenced_xids.insert<TXNID, find_tuple_by_xid>(tuple, txn->txnid.parent_id64, nullptr);
745                 lazy_assert_zero(r);
746             }
747         }
748     }
749 
750     if (garbage_collection_debug) {
751         verify_snapshot_system(txn_manager);
752     }
753     txn_manager_unlock(txn_manager);
754 
755     //Cleanup that does not require the txn_manager lock
756     if (txn->live_root_txn_list) {
757         txn->live_root_txn_list->destroy();
758         toku_free(txn->live_root_txn_list);
759     }
760     return;
761 }
762 
toku_txn_manager_clone_state_for_gc_unlocked(TXN_MANAGER txn_manager,xid_omt_t * snapshot_xids,rx_omt_t * referenced_xids,xid_omt_t * live_root_txns)763 static void toku_txn_manager_clone_state_for_gc_unlocked(
764     TXN_MANAGER txn_manager,
765     xid_omt_t* snapshot_xids,
766     rx_omt_t* referenced_xids,
767     xid_omt_t* live_root_txns
768     )
769 {
770     TXNID* snapshot_xids_array = NULL;
771     XMALLOC_N(txn_manager->num_snapshots, snapshot_xids_array);
772     TOKUTXN curr_txn = txn_manager->snapshot_head;
773     uint32_t curr_index = 0;
774     while (curr_txn != NULL) {
775         snapshot_xids_array[curr_index] = curr_txn->snapshot_txnid64;
776         curr_txn = curr_txn->snapshot_next;
777         curr_index++;
778     }
779     snapshot_xids->create_steal_sorted_array(
780         &snapshot_xids_array,
781         txn_manager->num_snapshots,
782         txn_manager->num_snapshots
783         );
784 
785     referenced_xids->clone(txn_manager->referenced_xids);
786     setup_live_root_txn_list(&txn_manager->live_root_ids, live_root_txns);
787 }
788 
toku_txn_manager_clone_state_for_gc(TXN_MANAGER txn_manager,xid_omt_t * snapshot_xids,rx_omt_t * referenced_xids,xid_omt_t * live_root_txns)789 void toku_txn_manager_clone_state_for_gc(
790     TXN_MANAGER txn_manager,
791     xid_omt_t* snapshot_xids,
792     rx_omt_t* referenced_xids,
793     xid_omt_t* live_root_txns
794     )
795 {
796     txn_manager_lock(txn_manager);
797     toku_txn_manager_clone_state_for_gc_unlocked(
798         txn_manager,
799         snapshot_xids,
800         referenced_xids,
801         live_root_txns
802         );
803     txn_manager_unlock(txn_manager);
804 }
805 
init()806 void txn_manager_state::init() {
807     invariant(!initialized);
808     invariant_notnull(txn_manager);
809     toku_txn_manager_clone_state_for_gc(
810         txn_manager,
811         &snapshot_xids,
812         &referenced_xids,
813         &live_root_txns
814         );
815     initialized = true;
816 }
817 
toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager,TXNID_PAIR txnid,TOKUTXN * result)818 void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID_PAIR txnid, TOKUTXN *result) {
819     TOKUTXN txn;
820     int r = txn_manager->live_root_txns.find_zero<TXNID, find_by_xid>(txnid.parent_id64, &txn, nullptr);
821     if (r==0) {
822         assert(txn->txnid.parent_id64 == txnid.parent_id64);
823         *result = txn;
824     }
825     else {
826         assert(r==DB_NOTFOUND);
827         // If there is no txn, then we treat it as the null txn.
828         *result = NULL;
829     }
830 }
831 
toku_txn_manager_get_root_txn_from_xid(TXN_MANAGER txn_manager,TOKU_XA_XID * xid,DB_TXN ** txnp)832 int toku_txn_manager_get_root_txn_from_xid (TXN_MANAGER txn_manager, TOKU_XA_XID *xid, DB_TXN **txnp) {
833     txn_manager_lock(txn_manager);
834     int ret_val = 0;
835     int num_live_txns = txn_manager->live_root_txns.size();
836     for (int i = 0; i < num_live_txns; i++) {
837         TOKUTXN txn;
838         {
839             int r = txn_manager->live_root_txns.fetch(i, &txn);
840             assert_zero(r);
841         }
842         if (txn->xa_xid.formatID     == xid->formatID
843             && txn->xa_xid.gtrid_length == xid->gtrid_length
844             && txn->xa_xid.bqual_length == xid->bqual_length
845             && 0==memcmp(txn->xa_xid.data, xid->data, xid->gtrid_length + xid->bqual_length)) {
846             *txnp = txn->container_db_txn;
847             ret_val = 0;
848             goto exit;
849         }
850     }
851     ret_val = DB_NOTFOUND;
852 exit:
853     txn_manager_unlock(txn_manager);
854     return ret_val;
855 }
856 
toku_txn_manager_num_live_root_txns(TXN_MANAGER txn_manager)857 uint32_t toku_txn_manager_num_live_root_txns(TXN_MANAGER txn_manager) {
858     int ret_val = 0;
859     txn_manager_lock(txn_manager);
860     ret_val = txn_manager->live_root_txns.size();
861     txn_manager_unlock(txn_manager);
862     return ret_val;
863 }
864 
txn_manager_iter(TXN_MANAGER txn_manager,txn_mgr_iter_callback cb,void * extra,bool just_root_txns)865 static int txn_manager_iter(
866     TXN_MANAGER txn_manager,
867     txn_mgr_iter_callback cb,
868     void* extra,
869     bool just_root_txns
870     )
871 {
872     int r = 0;
873     toku_mutex_lock(&txn_manager->txn_manager_lock);
874     uint32_t size = txn_manager->live_root_txns.size();
875     for (uint32_t i = 0; i < size; i++) {
876         TOKUTXN curr_txn = NULL;
877         r = txn_manager->live_root_txns.fetch(i, &curr_txn);
878         assert_zero(r);
879         if (just_root_txns) {
880             r = cb(curr_txn, extra);
881         }
882         else {
883             r = curr_txn->child_manager->iterate(cb, extra);
884         }
885         if (r) {
886             break;
887         }
888     }
889     toku_mutex_unlock(&txn_manager->txn_manager_lock);
890     return r;
891 }
892 
toku_txn_manager_iter_over_live_txns(TXN_MANAGER txn_manager,txn_mgr_iter_callback cb,void * extra)893 int toku_txn_manager_iter_over_live_txns(
894     TXN_MANAGER txn_manager,
895     txn_mgr_iter_callback cb,
896     void* extra
897     )
898 {
899     return txn_manager_iter(
900         txn_manager,
901         cb,
902         extra,
903         false
904         );
905 }
906 
toku_txn_manager_iter_over_live_root_txns(TXN_MANAGER txn_manager,txn_mgr_iter_callback cb,void * extra)907 int toku_txn_manager_iter_over_live_root_txns(
908     TXN_MANAGER txn_manager,
909     txn_mgr_iter_callback cb,
910     void* extra
911     )
912 {
913     return txn_manager_iter(
914         txn_manager,
915         cb,
916         extra,
917         true
918         );
919 }
920 
921 
922 //
923 // This function is called only via env_txn_xa_recover and env_txn_recover.
924 // See comments for those functions to understand assumptions that
925 // can be made when calling this function. Namely, that the system is
926 // quiescant, in that we are right after recovery and before user operations
927 // commence.
928 //
929 // Another key assumption made here is that only root transactions
930 // may be prepared and that child transactions cannot be prepared.
931 // This assumption is made by the fact that we iterate over the live root txns
932 // to find prepared transactions.
933 //
934 // I (Zardosht), don't think we take advantage of this fact, as we are holding
935 // the txn_manager_lock in this function, but in the future we might want
936 // to take these assumptions into account.
937 //
toku_txn_manager_recover_root_txn(TXN_MANAGER txn_manager,struct tokulogger_preplist preplist[],long count,long * retp,uint32_t flags)938 int toku_txn_manager_recover_root_txn (
939     TXN_MANAGER txn_manager,
940     struct tokulogger_preplist preplist[/*count*/],
941     long count,
942     long *retp, /*out*/
943     uint32_t flags
944     )
945 {
946     int ret_val = 0;
947     txn_manager_lock(txn_manager);
948     uint32_t num_txns_returned = 0;
949     // scan through live root txns to find
950     // prepared transactions and return them
951     uint32_t size = txn_manager->live_root_txns.size();
952     if (flags==DB_FIRST) {
953         txn_manager->last_xid_seen_for_recover = TXNID_NONE;
954     }
955     else if (flags!=DB_NEXT) {
956         ret_val = EINVAL;
957         goto exit;
958     }
959     for (uint32_t i = 0; i < size; i++) {
960         TOKUTXN curr_txn = NULL;
961         txn_manager->live_root_txns.fetch(i, &curr_txn);
962         // skip over TOKUTXNs whose txnid64 is too small, meaning
963         // we have already processed them.
964         if (curr_txn->txnid.parent_id64 <= txn_manager->last_xid_seen_for_recover) {
965             continue;
966         }
967         if (curr_txn->state == TOKUTXN_PREPARING) {
968             assert(curr_txn->container_db_txn);
969             preplist[num_txns_returned].txn = curr_txn->container_db_txn;
970             preplist[num_txns_returned].xid = curr_txn->xa_xid;
971             txn_manager->last_xid_seen_for_recover = curr_txn->txnid.parent_id64;
972             num_txns_returned++;
973         }
974         txn_manager->last_xid_seen_for_recover = curr_txn->txnid.parent_id64;
975         // if we found the maximum number of prepared transactions we are
976         // allowed to find, then break
977         if ((long) num_txns_returned >= count) {
978             break;
979         }
980     }
981     invariant((long) num_txns_returned <= count);
982     *retp = num_txns_returned;
983     ret_val = 0;
984 exit:
985     txn_manager_unlock(txn_manager);
986     return ret_val;
987 }
988 
txn_manager_lock(TXN_MANAGER txn_manager)989 static void txn_manager_lock(TXN_MANAGER txn_manager) {
990     toku_mutex_lock(&txn_manager->txn_manager_lock);
991 }
992 
txn_manager_unlock(TXN_MANAGER txn_manager)993 static void txn_manager_unlock(TXN_MANAGER txn_manager) {
994     toku_mutex_unlock(&txn_manager->txn_manager_lock);
995 }
996 
toku_txn_manager_suspend(TXN_MANAGER txn_manager)997 void toku_txn_manager_suspend(TXN_MANAGER txn_manager) {
998     txn_manager_lock(txn_manager);
999 }
1000 
toku_txn_manager_resume(TXN_MANAGER txn_manager)1001 void toku_txn_manager_resume(TXN_MANAGER txn_manager) {
1002     txn_manager_unlock(txn_manager);
1003 }
1004 
1005 void
toku_txn_manager_set_last_xid_from_logger(TXN_MANAGER txn_manager,TXNID last_xid)1006 toku_txn_manager_set_last_xid_from_logger(TXN_MANAGER txn_manager, TXNID last_xid) {
1007     invariant(txn_manager->last_xid == TXNID_NONE);
1008     txn_manager->last_xid = last_xid;
1009 }
1010 
1011 void
toku_txn_manager_set_last_xid_from_recovered_checkpoint(TXN_MANAGER txn_manager,TXNID last_xid)1012 toku_txn_manager_set_last_xid_from_recovered_checkpoint(TXN_MANAGER txn_manager, TXNID last_xid) {
1013     txn_manager->last_xid = last_xid;
1014 }
1015 
1016 TXNID
toku_txn_manager_get_last_xid(TXN_MANAGER mgr)1017 toku_txn_manager_get_last_xid(TXN_MANAGER mgr) {
1018     txn_manager_lock(mgr);
1019     TXNID last_xid = mgr->last_xid;
1020     txn_manager_unlock(mgr);
1021     return last_xid;
1022 }
1023 
1024 bool
toku_txn_manager_txns_exist(TXN_MANAGER mgr)1025 toku_txn_manager_txns_exist(TXN_MANAGER mgr) {
1026     txn_manager_lock(mgr);
1027     bool retval = mgr->live_root_txns.size() > 0;
1028     txn_manager_unlock(mgr);
1029     return retval;
1030 }
1031 
1032 
1033 // Test-only function
1034 void
toku_txn_manager_increase_last_xid(TXN_MANAGER mgr,uint64_t increment)1035 toku_txn_manager_increase_last_xid(TXN_MANAGER mgr, uint64_t increment) {
1036     txn_manager_lock(mgr);
1037     mgr->last_xid += increment;
1038     txn_manager_unlock(mgr);
1039 }
1040 
1041