1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/transactions/pessimistic_transaction_db.h"
9 
10 #include <cinttypes>
11 #include <string>
12 #include <unordered_set>
13 #include <vector>
14 
15 #include "db/db_impl/db_impl.h"
16 #include "rocksdb/db.h"
17 #include "rocksdb/options.h"
18 #include "rocksdb/utilities/transaction_db.h"
19 #include "test_util/sync_point.h"
20 #include "util/cast_util.h"
21 #include "util/mutexlock.h"
22 #include "utilities/transactions/pessimistic_transaction.h"
23 #include "utilities/transactions/transaction_db_mutex_impl.h"
24 #include "utilities/transactions/write_prepared_txn_db.h"
25 #include "utilities/transactions/write_unprepared_txn_db.h"
26 
27 namespace ROCKSDB_NAMESPACE {
28 
PessimisticTransactionDB(DB * db,const TransactionDBOptions & txn_db_options)29 PessimisticTransactionDB::PessimisticTransactionDB(
30     DB* db, const TransactionDBOptions& txn_db_options)
31     : TransactionDB(db),
32       db_impl_(static_cast_with_check<DBImpl, DB>(db)),
33       txn_db_options_(txn_db_options),
34       lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
35                 txn_db_options_.max_num_deadlocks,
36                 txn_db_options_.custom_mutex_factory
37                     ? txn_db_options_.custom_mutex_factory
38                     : std::shared_ptr<TransactionDBMutexFactory>(
39                           new TransactionDBMutexFactoryImpl())) {
40   assert(db_impl_ != nullptr);
41   info_log_ = db_impl_->GetDBOptions().info_log;
42 }
43 
// Support initializing PessimisticTransactionDB from a stackable db
45 //
46 //    PessimisticTransactionDB
47 //     ^        ^
48 //     |        |
49 //     |        +
50 //     |   StackableDB
51 //     |   ^
52 //     |   |
53 //     +   +
54 //     DBImpl
55 //       ^
56 //       |(inherit)
57 //       +
58 //       DB
59 //
PessimisticTransactionDB(StackableDB * db,const TransactionDBOptions & txn_db_options)60 PessimisticTransactionDB::PessimisticTransactionDB(
61     StackableDB* db, const TransactionDBOptions& txn_db_options)
62     : TransactionDB(db),
63       db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
64       txn_db_options_(txn_db_options),
65       lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
66                 txn_db_options_.max_num_deadlocks,
67                 txn_db_options_.custom_mutex_factory
68                     ? txn_db_options_.custom_mutex_factory
69                     : std::shared_ptr<TransactionDBMutexFactory>(
70                           new TransactionDBMutexFactoryImpl())) {
71   assert(db_impl_ != nullptr);
72 }
73 
~PessimisticTransactionDB()74 PessimisticTransactionDB::~PessimisticTransactionDB() {
75   while (!transactions_.empty()) {
76     delete transactions_.begin()->second;
77     // TODO(myabandeh): this seems to be an unsafe approach as it is not quite
78     // clear whether delete would also remove the entry from transactions_.
79   }
80 }
81 
VerifyCFOptions(const ColumnFamilyOptions &)82 Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) {
83   return Status::OK();
84 }
85 
Initialize(const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles)86 Status PessimisticTransactionDB::Initialize(
87     const std::vector<size_t>& compaction_enabled_cf_indices,
88     const std::vector<ColumnFamilyHandle*>& handles) {
89   for (auto cf_ptr : handles) {
90     AddColumnFamily(cf_ptr);
91   }
92   // Verify cf options
93   for (auto handle : handles) {
94     ColumnFamilyDescriptor cfd;
95     Status s = handle->GetDescriptor(&cfd);
96     if (!s.ok()) {
97       return s;
98     }
99     s = VerifyCFOptions(cfd.options);
100     if (!s.ok()) {
101       return s;
102     }
103   }
104 
105   // Re-enable compaction for the column families that initially had
106   // compaction enabled.
107   std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
108   compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
109   for (auto index : compaction_enabled_cf_indices) {
110     compaction_enabled_cf_handles.push_back(handles[index]);
111   }
112 
113   Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
114 
115   // create 'real' transactions from recovered shell transactions
116   auto dbimpl = static_cast_with_check<DBImpl, DB>(GetRootDB());
117   assert(dbimpl != nullptr);
118   auto rtrxs = dbimpl->recovered_transactions();
119 
120   for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
121     auto recovered_trx = it->second;
122     assert(recovered_trx);
123     assert(recovered_trx->batches_.size() == 1);
124     const auto& seq = recovered_trx->batches_.begin()->first;
125     const auto& batch_info = recovered_trx->batches_.begin()->second;
126     assert(batch_info.log_number_);
127     assert(recovered_trx->name_.length());
128 
129     WriteOptions w_options;
130     w_options.sync = true;
131     TransactionOptions t_options;
132     // This would help avoiding deadlock for keys that although exist in the WAL
133     // did not go through concurrency control. This includes the merge that
134     // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
135     // there is a conflict between the keys of two transactions that must be
136     // avoided, it is already avoided by the application, MyRocks, before the
137     // restart (ii) application, MyRocks, guarntees to rollback/commit the
138     // recovered transactions before new transactions start.
139     t_options.skip_concurrency_control = true;
140 
141     Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
142     assert(real_trx);
143     real_trx->SetLogNumber(batch_info.log_number_);
144     assert(seq != kMaxSequenceNumber);
145     if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
146       real_trx->SetId(seq);
147     }
148 
149     s = real_trx->SetName(recovered_trx->name_);
150     if (!s.ok()) {
151       break;
152     }
153 
154     s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
155     // WriteCommitted set this to to disable this check that is specific to
156     // WritePrepared txns
157     assert(batch_info.batch_cnt_ == 0 ||
158            real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
159     real_trx->SetState(Transaction::PREPARED);
160     if (!s.ok()) {
161       break;
162     }
163   }
164   if (s.ok()) {
165     dbimpl->DeleteAllRecoveredTransactions();
166   }
167   return s;
168 }
169 
BeginTransaction(const WriteOptions & write_options,const TransactionOptions & txn_options,Transaction * old_txn)170 Transaction* WriteCommittedTxnDB::BeginTransaction(
171     const WriteOptions& write_options, const TransactionOptions& txn_options,
172     Transaction* old_txn) {
173   if (old_txn != nullptr) {
174     ReinitializeTransaction(old_txn, write_options, txn_options);
175     return old_txn;
176   } else {
177     return new WriteCommittedTxn(this, write_options, txn_options);
178   }
179 }
180 
// Returns a copy of `txn_db_options` in which a num_stripes of 0 is replaced
// by 1. All other fields are copied unchanged.
TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
    const TransactionDBOptions& txn_db_options) {
  TransactionDBOptions sanitized(txn_db_options);
  if (sanitized.num_stripes == 0) {
    sanitized.num_stripes = 1;
  }
  return sanitized;
}
191 
Open(const Options & options,const TransactionDBOptions & txn_db_options,const std::string & dbname,TransactionDB ** dbptr)192 Status TransactionDB::Open(const Options& options,
193                            const TransactionDBOptions& txn_db_options,
194                            const std::string& dbname, TransactionDB** dbptr) {
195   DBOptions db_options(options);
196   ColumnFamilyOptions cf_options(options);
197   std::vector<ColumnFamilyDescriptor> column_families;
198   column_families.push_back(
199       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
200   std::vector<ColumnFamilyHandle*> handles;
201   Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
202                                  column_families, &handles, dbptr);
203   if (s.ok()) {
204     assert(handles.size() == 1);
205     // i can delete the handle since DBImpl is always holding a reference to
206     // default column family
207     delete handles[0];
208   }
209 
210   return s;
211 }
212 
Open(const DBOptions & db_options,const TransactionDBOptions & txn_db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,TransactionDB ** dbptr)213 Status TransactionDB::Open(
214     const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
215     const std::string& dbname,
216     const std::vector<ColumnFamilyDescriptor>& column_families,
217     std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
218   Status s;
219   DB* db = nullptr;
220   if (txn_db_options.write_policy == WRITE_COMMITTED &&
221       db_options.unordered_write) {
222     return Status::NotSupported(
223         "WRITE_COMMITTED is incompatible with unordered_writes");
224   }
225   if (txn_db_options.write_policy == WRITE_UNPREPARED &&
226       db_options.unordered_write) {
227     // TODO(lth): support it
228     return Status::NotSupported(
229         "WRITE_UNPREPARED is currently incompatible with unordered_writes");
230   }
231   if (txn_db_options.write_policy == WRITE_PREPARED &&
232       db_options.unordered_write && !db_options.two_write_queues) {
233     return Status::NotSupported(
234         "WRITE_PREPARED is incompatible with unordered_writes if "
235         "two_write_queues is not enabled.");
236   }
237 
238   std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
239   std::vector<size_t> compaction_enabled_cf_indices;
240   DBOptions db_options_2pc = db_options;
241   PrepareWrap(&db_options_2pc, &column_families_copy,
242               &compaction_enabled_cf_indices);
243   const bool use_seq_per_batch =
244       txn_db_options.write_policy == WRITE_PREPARED ||
245       txn_db_options.write_policy == WRITE_UNPREPARED;
246   const bool use_batch_per_txn =
247       txn_db_options.write_policy == WRITE_COMMITTED ||
248       txn_db_options.write_policy == WRITE_PREPARED;
249   s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
250                    use_seq_per_batch, use_batch_per_txn);
251   if (s.ok()) {
252     ROCKS_LOG_WARN(db->GetDBOptions().info_log,
253                    "Transaction write_policy is %" PRId32,
254                    static_cast<int>(txn_db_options.write_policy));
255     s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
256                dbptr);
257   }
258   if (!s.ok()) {
259     // just in case it was not deleted (and not set to nullptr).
260     delete db;
261   }
262   return s;
263 }
264 
PrepareWrap(DBOptions * db_options,std::vector<ColumnFamilyDescriptor> * column_families,std::vector<size_t> * compaction_enabled_cf_indices)265 void TransactionDB::PrepareWrap(
266     DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
267     std::vector<size_t>* compaction_enabled_cf_indices) {
268   compaction_enabled_cf_indices->clear();
269 
270   // Enable MemTable History if not already enabled
271   for (size_t i = 0; i < column_families->size(); i++) {
272     ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
273 
274     if (cf_options->max_write_buffer_size_to_maintain == 0 &&
275         cf_options->max_write_buffer_number_to_maintain == 0) {
276       // Setting to -1 will set the History size to
277       // max_write_buffer_number * write_buffer_size.
278       cf_options->max_write_buffer_size_to_maintain = -1;
279     }
280     if (!cf_options->disable_auto_compactions) {
281       // Disable compactions momentarily to prevent race with DB::Open
282       cf_options->disable_auto_compactions = true;
283       compaction_enabled_cf_indices->push_back(i);
284     }
285   }
286   db_options->allow_2pc = true;
287 }
288 
WrapDB(DB * db,const TransactionDBOptions & txn_db_options,const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles,TransactionDB ** dbptr)289 Status TransactionDB::WrapDB(
290     // make sure this db is already opened with memtable history enabled,
291     // auto compaction distabled and 2 phase commit enabled
292     DB* db, const TransactionDBOptions& txn_db_options,
293     const std::vector<size_t>& compaction_enabled_cf_indices,
294     const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
295   assert(db != nullptr);
296   assert(dbptr != nullptr);
297   *dbptr = nullptr;
298   std::unique_ptr<PessimisticTransactionDB> txn_db;
299   switch (txn_db_options.write_policy) {
300     case WRITE_UNPREPARED:
301       txn_db.reset(new WriteUnpreparedTxnDB(
302           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
303       break;
304     case WRITE_PREPARED:
305       txn_db.reset(new WritePreparedTxnDB(
306           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
307       break;
308     case WRITE_COMMITTED:
309     default:
310       txn_db.reset(new WriteCommittedTxnDB(
311           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
312   }
313   txn_db->UpdateCFComparatorMap(handles);
314   Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
315   // In case of a failure at this point, db is deleted via the txn_db destructor
316   // and set to nullptr.
317   if (s.ok()) {
318     *dbptr = txn_db.release();
319   }
320   return s;
321 }
322 
WrapStackableDB(StackableDB * db,const TransactionDBOptions & txn_db_options,const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles,TransactionDB ** dbptr)323 Status TransactionDB::WrapStackableDB(
324     // make sure this stackable_db is already opened with memtable history
325     // enabled, auto compaction distabled and 2 phase commit enabled
326     StackableDB* db, const TransactionDBOptions& txn_db_options,
327     const std::vector<size_t>& compaction_enabled_cf_indices,
328     const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
329   assert(db != nullptr);
330   assert(dbptr != nullptr);
331   *dbptr = nullptr;
332   std::unique_ptr<PessimisticTransactionDB> txn_db;
333 
334   switch (txn_db_options.write_policy) {
335     case WRITE_UNPREPARED:
336       txn_db.reset(new WriteUnpreparedTxnDB(
337           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
338       break;
339     case WRITE_PREPARED:
340       txn_db.reset(new WritePreparedTxnDB(
341           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
342       break;
343     case WRITE_COMMITTED:
344     default:
345       txn_db.reset(new WriteCommittedTxnDB(
346           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
347   }
348   txn_db->UpdateCFComparatorMap(handles);
349   Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
350   // In case of a failure at this point, db is deleted via the txn_db destructor
351   // and set to nullptr.
352   if (s.ok()) {
353     *dbptr = txn_db.release();
354   }
355   return s;
356 }
357 
358 // Let TransactionLockMgr know that this column family exists so it can
359 // allocate a LockMap for it.
AddColumnFamily(const ColumnFamilyHandle * handle)360 void PessimisticTransactionDB::AddColumnFamily(
361     const ColumnFamilyHandle* handle) {
362   lock_mgr_.AddColumnFamily(handle->GetID());
363 }
364 
CreateColumnFamily(const ColumnFamilyOptions & options,const std::string & column_family_name,ColumnFamilyHandle ** handle)365 Status PessimisticTransactionDB::CreateColumnFamily(
366     const ColumnFamilyOptions& options, const std::string& column_family_name,
367     ColumnFamilyHandle** handle) {
368   InstrumentedMutexLock l(&column_family_mutex_);
369   Status s = VerifyCFOptions(options);
370   if (!s.ok()) {
371     return s;
372   }
373 
374   s = db_->CreateColumnFamily(options, column_family_name, handle);
375   if (s.ok()) {
376     lock_mgr_.AddColumnFamily((*handle)->GetID());
377     UpdateCFComparatorMap(*handle);
378   }
379 
380   return s;
381 }
382 
383 // Let TransactionLockMgr know that it can deallocate the LockMap for this
384 // column family.
DropColumnFamily(ColumnFamilyHandle * column_family)385 Status PessimisticTransactionDB::DropColumnFamily(
386     ColumnFamilyHandle* column_family) {
387   InstrumentedMutexLock l(&column_family_mutex_);
388 
389   Status s = db_->DropColumnFamily(column_family);
390   if (s.ok()) {
391     lock_mgr_.RemoveColumnFamily(column_family->GetID());
392   }
393 
394   return s;
395 }
396 
TryLock(PessimisticTransaction * txn,uint32_t cfh_id,const std::string & key,bool exclusive)397 Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
398                                          uint32_t cfh_id,
399                                          const std::string& key,
400                                          bool exclusive) {
401   return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
402 }
403 
UnLock(PessimisticTransaction * txn,const TransactionKeyMap * keys)404 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
405                                       const TransactionKeyMap* keys) {
406   lock_mgr_.UnLock(txn, keys, GetEnv());
407 }
408 
UnLock(PessimisticTransaction * txn,uint32_t cfh_id,const std::string & key)409 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
410                                       uint32_t cfh_id, const std::string& key) {
411   lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
412 }
413 
414 // Used when wrapping DB write operations in a transaction
BeginInternalTransaction(const WriteOptions & options)415 Transaction* PessimisticTransactionDB::BeginInternalTransaction(
416     const WriteOptions& options) {
417   TransactionOptions txn_options;
418   Transaction* txn = BeginTransaction(options, txn_options, nullptr);
419 
420   // Use default timeout for non-transactional writes
421   txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
422   return txn;
423 }
424 
425 // All user Put, Merge, Delete, and Write requests must be intercepted to make
426 // sure that they lock all keys that they are writing to avoid causing conflicts
427 // with any concurrent transactions. The easiest way to do this is to wrap all
428 // write operations in a transaction.
429 //
430 // Put(), Merge(), and Delete() only lock a single key per call.  Write() will
431 // sort its keys before locking them.  This guarantees that TransactionDB write
432 // methods cannot deadlock with each other (but still could deadlock with a
433 // Transaction).
Put(const WriteOptions & options,ColumnFamilyHandle * column_family,const Slice & key,const Slice & val)434 Status PessimisticTransactionDB::Put(const WriteOptions& options,
435                                      ColumnFamilyHandle* column_family,
436                                      const Slice& key, const Slice& val) {
437   Status s;
438 
439   Transaction* txn = BeginInternalTransaction(options);
440   txn->DisableIndexing();
441 
442   // Since the client didn't create a transaction, they don't care about
443   // conflict checking for this write.  So we just need to do PutUntracked().
444   s = txn->PutUntracked(column_family, key, val);
445 
446   if (s.ok()) {
447     s = txn->Commit();
448   }
449 
450   delete txn;
451 
452   return s;
453 }
454 
Delete(const WriteOptions & wopts,ColumnFamilyHandle * column_family,const Slice & key)455 Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
456                                         ColumnFamilyHandle* column_family,
457                                         const Slice& key) {
458   Status s;
459 
460   Transaction* txn = BeginInternalTransaction(wopts);
461   txn->DisableIndexing();
462 
463   // Since the client didn't create a transaction, they don't care about
464   // conflict checking for this write.  So we just need to do
465   // DeleteUntracked().
466   s = txn->DeleteUntracked(column_family, key);
467 
468   if (s.ok()) {
469     s = txn->Commit();
470   }
471 
472   delete txn;
473 
474   return s;
475 }
476 
SingleDelete(const WriteOptions & wopts,ColumnFamilyHandle * column_family,const Slice & key)477 Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
478                                               ColumnFamilyHandle* column_family,
479                                               const Slice& key) {
480   Status s;
481 
482   Transaction* txn = BeginInternalTransaction(wopts);
483   txn->DisableIndexing();
484 
485   // Since the client didn't create a transaction, they don't care about
486   // conflict checking for this write.  So we just need to do
487   // SingleDeleteUntracked().
488   s = txn->SingleDeleteUntracked(column_family, key);
489 
490   if (s.ok()) {
491     s = txn->Commit();
492   }
493 
494   delete txn;
495 
496   return s;
497 }
498 
Merge(const WriteOptions & options,ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)499 Status PessimisticTransactionDB::Merge(const WriteOptions& options,
500                                        ColumnFamilyHandle* column_family,
501                                        const Slice& key, const Slice& value) {
502   Status s;
503 
504   Transaction* txn = BeginInternalTransaction(options);
505   txn->DisableIndexing();
506 
507   // Since the client didn't create a transaction, they don't care about
508   // conflict checking for this write.  So we just need to do
509   // MergeUntracked().
510   s = txn->MergeUntracked(column_family, key, value);
511 
512   if (s.ok()) {
513     s = txn->Commit();
514   }
515 
516   delete txn;
517 
518   return s;
519 }
520 
Write(const WriteOptions & opts,WriteBatch * updates)521 Status PessimisticTransactionDB::Write(const WriteOptions& opts,
522                                        WriteBatch* updates) {
523   return WriteWithConcurrencyControl(opts, updates);
524 }
525 
Write(const WriteOptions & opts,WriteBatch * updates)526 Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
527                                   WriteBatch* updates) {
528   if (txn_db_options_.skip_concurrency_control) {
529     return db_impl_->Write(opts, updates);
530   } else {
531     return WriteWithConcurrencyControl(opts, updates);
532   }
533 }
534 
Write(const WriteOptions & opts,const TransactionDBWriteOptimizations & optimizations,WriteBatch * updates)535 Status WriteCommittedTxnDB::Write(
536     const WriteOptions& opts,
537     const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
538   if (optimizations.skip_concurrency_control) {
539     return db_impl_->Write(opts, updates);
540   } else {
541     return WriteWithConcurrencyControl(opts, updates);
542   }
543 }
544 
InsertExpirableTransaction(TransactionID tx_id,PessimisticTransaction * tx)545 void PessimisticTransactionDB::InsertExpirableTransaction(
546     TransactionID tx_id, PessimisticTransaction* tx) {
547   assert(tx->GetExpirationTime() > 0);
548   std::lock_guard<std::mutex> lock(map_mutex_);
549   expirable_transactions_map_.insert({tx_id, tx});
550 }
551 
RemoveExpirableTransaction(TransactionID tx_id)552 void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
553   std::lock_guard<std::mutex> lock(map_mutex_);
554   expirable_transactions_map_.erase(tx_id);
555 }
556 
TryStealingExpiredTransactionLocks(TransactionID tx_id)557 bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
558     TransactionID tx_id) {
559   std::lock_guard<std::mutex> lock(map_mutex_);
560 
561   auto tx_it = expirable_transactions_map_.find(tx_id);
562   if (tx_it == expirable_transactions_map_.end()) {
563     return true;
564   }
565   PessimisticTransaction& tx = *(tx_it->second);
566   return tx.TryStealingLocks();
567 }
568 
ReinitializeTransaction(Transaction * txn,const WriteOptions & write_options,const TransactionOptions & txn_options)569 void PessimisticTransactionDB::ReinitializeTransaction(
570     Transaction* txn, const WriteOptions& write_options,
571     const TransactionOptions& txn_options) {
572   auto txn_impl =
573       static_cast_with_check<PessimisticTransaction, Transaction>(txn);
574 
575   txn_impl->Reinitialize(this, write_options, txn_options);
576 }
577 
GetTransactionByName(const TransactionName & name)578 Transaction* PessimisticTransactionDB::GetTransactionByName(
579     const TransactionName& name) {
580   std::lock_guard<std::mutex> lock(name_map_mutex_);
581   auto it = transactions_.find(name);
582   if (it == transactions_.end()) {
583     return nullptr;
584   } else {
585     return it->second;
586   }
587 }
588 
GetAllPreparedTransactions(std::vector<Transaction * > * transv)589 void PessimisticTransactionDB::GetAllPreparedTransactions(
590     std::vector<Transaction*>* transv) {
591   assert(transv);
592   transv->clear();
593   std::lock_guard<std::mutex> lock(name_map_mutex_);
594   for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
595     if (it->second->GetState() == Transaction::PREPARED) {
596       transv->push_back(it->second);
597     }
598   }
599 }
600 
601 TransactionLockMgr::LockStatusData
GetLockStatusData()602 PessimisticTransactionDB::GetLockStatusData() {
603   return lock_mgr_.GetLockStatusData();
604 }
605 
GetDeadlockInfoBuffer()606 std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
607   return lock_mgr_.GetDeadlockInfoBuffer();
608 }
609 
SetDeadlockInfoBufferSize(uint32_t target_size)610 void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
611   lock_mgr_.Resize(target_size);
612 }
613 
RegisterTransaction(Transaction * txn)614 void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
615   assert(txn);
616   assert(txn->GetName().length() > 0);
617   assert(GetTransactionByName(txn->GetName()) == nullptr);
618   assert(txn->GetState() == Transaction::STARTED);
619   std::lock_guard<std::mutex> lock(name_map_mutex_);
620   transactions_[txn->GetName()] = txn;
621 }
622 
UnregisterTransaction(Transaction * txn)623 void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
624   assert(txn);
625   std::lock_guard<std::mutex> lock(name_map_mutex_);
626   auto it = transactions_.find(txn->GetName());
627   assert(it != transactions_.end());
628   transactions_.erase(it);
629 }
630 
631 }  // namespace ROCKSDB_NAMESPACE
632 #endif  // ROCKSDB_LITE
633