1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/transactions/pessimistic_transaction_db.h"
9 
10 #include <cinttypes>
11 #include <string>
12 #include <unordered_set>
13 #include <vector>
14 
15 #include "db/db_impl/db_impl.h"
16 #include "logging/logging.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/options.h"
19 #include "rocksdb/utilities/transaction_db.h"
20 #include "test_util/sync_point.h"
21 #include "util/cast_util.h"
22 #include "util/mutexlock.h"
23 #include "utilities/transactions/pessimistic_transaction.h"
24 #include "utilities/transactions/transaction_db_mutex_impl.h"
25 #include "utilities/transactions/write_prepared_txn_db.h"
26 #include "utilities/transactions/write_unprepared_txn_db.h"
27 
28 namespace ROCKSDB_NAMESPACE {
29 
PessimisticTransactionDB(DB * db,const TransactionDBOptions & txn_db_options)30 PessimisticTransactionDB::PessimisticTransactionDB(
31     DB* db, const TransactionDBOptions& txn_db_options)
32     : TransactionDB(db),
33       db_impl_(static_cast_with_check<DBImpl>(db)),
34       txn_db_options_(txn_db_options),
35       lock_manager_(NewLockManager(this, txn_db_options)) {
36   assert(db_impl_ != nullptr);
37   info_log_ = db_impl_->GetDBOptions().info_log;
38 }
39 
40 // Support initiliazing PessimisticTransactionDB from a stackable db
41 //
42 //    PessimisticTransactionDB
43 //     ^        ^
44 //     |        |
45 //     |        +
46 //     |   StackableDB
47 //     |   ^
48 //     |   |
49 //     +   +
50 //     DBImpl
51 //       ^
52 //       |(inherit)
53 //       +
54 //       DB
55 //
PessimisticTransactionDB(StackableDB * db,const TransactionDBOptions & txn_db_options)56 PessimisticTransactionDB::PessimisticTransactionDB(
57     StackableDB* db, const TransactionDBOptions& txn_db_options)
58     : TransactionDB(db),
59       db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())),
60       txn_db_options_(txn_db_options),
61       lock_manager_(NewLockManager(this, txn_db_options)) {
62   assert(db_impl_ != nullptr);
63 }
64 
~PessimisticTransactionDB()65 PessimisticTransactionDB::~PessimisticTransactionDB() {
66   while (!transactions_.empty()) {
67     delete transactions_.begin()->second;
68     // TODO(myabandeh): this seems to be an unsafe approach as it is not quite
69     // clear whether delete would also remove the entry from transactions_.
70   }
71 }
72 
VerifyCFOptions(const ColumnFamilyOptions &)73 Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) {
74   return Status::OK();
75 }
76 
Initialize(const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles)77 Status PessimisticTransactionDB::Initialize(
78     const std::vector<size_t>& compaction_enabled_cf_indices,
79     const std::vector<ColumnFamilyHandle*>& handles) {
80   for (auto cf_ptr : handles) {
81     AddColumnFamily(cf_ptr);
82   }
83   // Verify cf options
84   for (auto handle : handles) {
85     ColumnFamilyDescriptor cfd;
86     Status s = handle->GetDescriptor(&cfd);
87     if (!s.ok()) {
88       return s;
89     }
90     s = VerifyCFOptions(cfd.options);
91     if (!s.ok()) {
92       return s;
93     }
94   }
95 
96   // Re-enable compaction for the column families that initially had
97   // compaction enabled.
98   std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
99   compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
100   for (auto index : compaction_enabled_cf_indices) {
101     compaction_enabled_cf_handles.push_back(handles[index]);
102   }
103 
104   Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
105 
106   // create 'real' transactions from recovered shell transactions
107   auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
108   assert(dbimpl != nullptr);
109   auto rtrxs = dbimpl->recovered_transactions();
110 
111   for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
112     auto recovered_trx = it->second;
113     assert(recovered_trx);
114     assert(recovered_trx->batches_.size() == 1);
115     const auto& seq = recovered_trx->batches_.begin()->first;
116     const auto& batch_info = recovered_trx->batches_.begin()->second;
117     assert(batch_info.log_number_);
118     assert(recovered_trx->name_.length());
119 
120     WriteOptions w_options;
121     w_options.sync = true;
122     TransactionOptions t_options;
123     // This would help avoiding deadlock for keys that although exist in the WAL
124     // did not go through concurrency control. This includes the merge that
125     // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
126     // there is a conflict between the keys of two transactions that must be
127     // avoided, it is already avoided by the application, MyRocks, before the
128     // restart (ii) application, MyRocks, guarntees to rollback/commit the
129     // recovered transactions before new transactions start.
130     t_options.skip_concurrency_control = true;
131 
132     Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
133     assert(real_trx);
134     real_trx->SetLogNumber(batch_info.log_number_);
135     assert(seq != kMaxSequenceNumber);
136     if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
137       real_trx->SetId(seq);
138     }
139 
140     s = real_trx->SetName(recovered_trx->name_);
141     if (!s.ok()) {
142       break;
143     }
144 
145     s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
146     // WriteCommitted set this to to disable this check that is specific to
147     // WritePrepared txns
148     assert(batch_info.batch_cnt_ == 0 ||
149            real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
150     real_trx->SetState(Transaction::PREPARED);
151     if (!s.ok()) {
152       break;
153     }
154   }
155   if (s.ok()) {
156     dbimpl->DeleteAllRecoveredTransactions();
157   }
158   return s;
159 }
160 
BeginTransaction(const WriteOptions & write_options,const TransactionOptions & txn_options,Transaction * old_txn)161 Transaction* WriteCommittedTxnDB::BeginTransaction(
162     const WriteOptions& write_options, const TransactionOptions& txn_options,
163     Transaction* old_txn) {
164   if (old_txn != nullptr) {
165     ReinitializeTransaction(old_txn, write_options, txn_options);
166     return old_txn;
167   } else {
168     return new WriteCommittedTxn(this, write_options, txn_options);
169   }
170 }
171 
// Returns a copy of `txn_db_options` in which a num_stripes of 0 has been
// replaced by 1. All other fields are copied unchanged.
TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
    const TransactionDBOptions& txn_db_options) {
  TransactionDBOptions result = txn_db_options;
  if (result.num_stripes == 0) {
    result.num_stripes = 1;
  }
  return result;
}
182 
Open(const Options & options,const TransactionDBOptions & txn_db_options,const std::string & dbname,TransactionDB ** dbptr)183 Status TransactionDB::Open(const Options& options,
184                            const TransactionDBOptions& txn_db_options,
185                            const std::string& dbname, TransactionDB** dbptr) {
186   DBOptions db_options(options);
187   ColumnFamilyOptions cf_options(options);
188   std::vector<ColumnFamilyDescriptor> column_families;
189   column_families.push_back(
190       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
191   std::vector<ColumnFamilyHandle*> handles;
192   Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
193                                  column_families, &handles, dbptr);
194   if (s.ok()) {
195     assert(handles.size() == 1);
196     // i can delete the handle since DBImpl is always holding a reference to
197     // default column family
198     delete handles[0];
199   }
200 
201   return s;
202 }
203 
Open(const DBOptions & db_options,const TransactionDBOptions & txn_db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,TransactionDB ** dbptr)204 Status TransactionDB::Open(
205     const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
206     const std::string& dbname,
207     const std::vector<ColumnFamilyDescriptor>& column_families,
208     std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
209   Status s;
210   DB* db = nullptr;
211   if (txn_db_options.write_policy == WRITE_COMMITTED &&
212       db_options.unordered_write) {
213     return Status::NotSupported(
214         "WRITE_COMMITTED is incompatible with unordered_writes");
215   }
216   if (txn_db_options.write_policy == WRITE_UNPREPARED &&
217       db_options.unordered_write) {
218     // TODO(lth): support it
219     return Status::NotSupported(
220         "WRITE_UNPREPARED is currently incompatible with unordered_writes");
221   }
222   if (txn_db_options.write_policy == WRITE_PREPARED &&
223       db_options.unordered_write && !db_options.two_write_queues) {
224     return Status::NotSupported(
225         "WRITE_PREPARED is incompatible with unordered_writes if "
226         "two_write_queues is not enabled.");
227   }
228 
229   std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
230   std::vector<size_t> compaction_enabled_cf_indices;
231   DBOptions db_options_2pc = db_options;
232   PrepareWrap(&db_options_2pc, &column_families_copy,
233               &compaction_enabled_cf_indices);
234   const bool use_seq_per_batch =
235       txn_db_options.write_policy == WRITE_PREPARED ||
236       txn_db_options.write_policy == WRITE_UNPREPARED;
237   const bool use_batch_per_txn =
238       txn_db_options.write_policy == WRITE_COMMITTED ||
239       txn_db_options.write_policy == WRITE_PREPARED;
240   s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
241                    use_seq_per_batch, use_batch_per_txn);
242   if (s.ok()) {
243     ROCKS_LOG_WARN(db->GetDBOptions().info_log,
244                    "Transaction write_policy is %" PRId32,
245                    static_cast<int>(txn_db_options.write_policy));
246     s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
247                dbptr);
248   }
249   if (!s.ok()) {
250     // just in case it was not deleted (and not set to nullptr).
251     delete db;
252   }
253   return s;
254 }
255 
PrepareWrap(DBOptions * db_options,std::vector<ColumnFamilyDescriptor> * column_families,std::vector<size_t> * compaction_enabled_cf_indices)256 void TransactionDB::PrepareWrap(
257     DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
258     std::vector<size_t>* compaction_enabled_cf_indices) {
259   compaction_enabled_cf_indices->clear();
260 
261   // Enable MemTable History if not already enabled
262   for (size_t i = 0; i < column_families->size(); i++) {
263     ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
264 
265     if (cf_options->max_write_buffer_size_to_maintain == 0 &&
266         cf_options->max_write_buffer_number_to_maintain == 0) {
267       // Setting to -1 will set the History size to
268       // max_write_buffer_number * write_buffer_size.
269       cf_options->max_write_buffer_size_to_maintain = -1;
270     }
271     if (!cf_options->disable_auto_compactions) {
272       // Disable compactions momentarily to prevent race with DB::Open
273       cf_options->disable_auto_compactions = true;
274       compaction_enabled_cf_indices->push_back(i);
275     }
276   }
277   db_options->allow_2pc = true;
278 }
279 
280 namespace {
281 template <typename DBType>
WrapAnotherDBInternal(DBType * db,const TransactionDBOptions & txn_db_options,const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles,TransactionDB ** dbptr)282 Status WrapAnotherDBInternal(
283     DBType* db, const TransactionDBOptions& txn_db_options,
284     const std::vector<size_t>& compaction_enabled_cf_indices,
285     const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
286   assert(db != nullptr);
287   assert(dbptr != nullptr);
288   *dbptr = nullptr;
289   std::unique_ptr<PessimisticTransactionDB> txn_db;
290   switch (txn_db_options.write_policy) {
291     case WRITE_UNPREPARED:
292       txn_db.reset(new WriteUnpreparedTxnDB(
293           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
294       break;
295     case WRITE_PREPARED:
296       txn_db.reset(new WritePreparedTxnDB(
297           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
298       break;
299     case WRITE_COMMITTED:
300     default:
301       txn_db.reset(new WriteCommittedTxnDB(
302           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
303   }
304   txn_db->UpdateCFComparatorMap(handles);
305   Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
306   // In case of a failure at this point, db is deleted via the txn_db destructor
307   // and set to nullptr.
308   if (s.ok()) {
309     *dbptr = txn_db.release();
310   }
311   return s;
312 }
313 }  // namespace
314 
WrapDB(DB * db,const TransactionDBOptions & txn_db_options,const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles,TransactionDB ** dbptr)315 Status TransactionDB::WrapDB(
316     // make sure this db is already opened with memtable history enabled,
317     // auto compaction distabled and 2 phase commit enabled
318     DB* db, const TransactionDBOptions& txn_db_options,
319     const std::vector<size_t>& compaction_enabled_cf_indices,
320     const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
321   return WrapAnotherDBInternal(db, txn_db_options,
322                                compaction_enabled_cf_indices, handles, dbptr);
323 }
324 
WrapStackableDB(StackableDB * db,const TransactionDBOptions & txn_db_options,const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles,TransactionDB ** dbptr)325 Status TransactionDB::WrapStackableDB(
326     // make sure this stackable_db is already opened with memtable history
327     // enabled, auto compaction distabled and 2 phase commit enabled
328     StackableDB* db, const TransactionDBOptions& txn_db_options,
329     const std::vector<size_t>& compaction_enabled_cf_indices,
330     const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
331   return WrapAnotherDBInternal(db, txn_db_options,
332                                compaction_enabled_cf_indices, handles, dbptr);
333 }
334 
335 // Let LockManager know that this column family exists so it can
336 // allocate a LockMap for it.
AddColumnFamily(const ColumnFamilyHandle * handle)337 void PessimisticTransactionDB::AddColumnFamily(
338     const ColumnFamilyHandle* handle) {
339   lock_manager_->AddColumnFamily(handle);
340 }
341 
CreateColumnFamily(const ColumnFamilyOptions & options,const std::string & column_family_name,ColumnFamilyHandle ** handle)342 Status PessimisticTransactionDB::CreateColumnFamily(
343     const ColumnFamilyOptions& options, const std::string& column_family_name,
344     ColumnFamilyHandle** handle) {
345   InstrumentedMutexLock l(&column_family_mutex_);
346   Status s = VerifyCFOptions(options);
347   if (!s.ok()) {
348     return s;
349   }
350 
351   s = db_->CreateColumnFamily(options, column_family_name, handle);
352   if (s.ok()) {
353     lock_manager_->AddColumnFamily(*handle);
354     UpdateCFComparatorMap(*handle);
355   }
356 
357   return s;
358 }
359 
360 // Let LockManager know that it can deallocate the LockMap for this
361 // column family.
DropColumnFamily(ColumnFamilyHandle * column_family)362 Status PessimisticTransactionDB::DropColumnFamily(
363     ColumnFamilyHandle* column_family) {
364   InstrumentedMutexLock l(&column_family_mutex_);
365 
366   Status s = db_->DropColumnFamily(column_family);
367   if (s.ok()) {
368     lock_manager_->RemoveColumnFamily(column_family);
369   }
370 
371   return s;
372 }
373 
TryLock(PessimisticTransaction * txn,uint32_t cfh_id,const std::string & key,bool exclusive)374 Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
375                                          uint32_t cfh_id,
376                                          const std::string& key,
377                                          bool exclusive) {
378   return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
379 }
380 
TryRangeLock(PessimisticTransaction * txn,uint32_t cfh_id,const Endpoint & start_endp,const Endpoint & end_endp)381 Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn,
382                                               uint32_t cfh_id,
383                                               const Endpoint& start_endp,
384                                               const Endpoint& end_endp) {
385   return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, GetEnv(),
386                                 /*exclusive=*/true);
387 }
388 
UnLock(PessimisticTransaction * txn,const LockTracker & keys)389 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
390                                       const LockTracker& keys) {
391   lock_manager_->UnLock(txn, keys, GetEnv());
392 }
393 
UnLock(PessimisticTransaction * txn,uint32_t cfh_id,const std::string & key)394 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
395                                       uint32_t cfh_id, const std::string& key) {
396   lock_manager_->UnLock(txn, cfh_id, key, GetEnv());
397 }
398 
399 // Used when wrapping DB write operations in a transaction
BeginInternalTransaction(const WriteOptions & options)400 Transaction* PessimisticTransactionDB::BeginInternalTransaction(
401     const WriteOptions& options) {
402   TransactionOptions txn_options;
403   Transaction* txn = BeginTransaction(options, txn_options, nullptr);
404 
405   // Use default timeout for non-transactional writes
406   txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
407   return txn;
408 }
409 
410 // All user Put, Merge, Delete, and Write requests must be intercepted to make
411 // sure that they lock all keys that they are writing to avoid causing conflicts
412 // with any concurrent transactions. The easiest way to do this is to wrap all
413 // write operations in a transaction.
414 //
415 // Put(), Merge(), and Delete() only lock a single key per call.  Write() will
416 // sort its keys before locking them.  This guarantees that TransactionDB write
417 // methods cannot deadlock with each other (but still could deadlock with a
418 // Transaction).
Put(const WriteOptions & options,ColumnFamilyHandle * column_family,const Slice & key,const Slice & val)419 Status PessimisticTransactionDB::Put(const WriteOptions& options,
420                                      ColumnFamilyHandle* column_family,
421                                      const Slice& key, const Slice& val) {
422   Status s;
423 
424   Transaction* txn = BeginInternalTransaction(options);
425   txn->DisableIndexing();
426 
427   // Since the client didn't create a transaction, they don't care about
428   // conflict checking for this write.  So we just need to do PutUntracked().
429   s = txn->PutUntracked(column_family, key, val);
430 
431   if (s.ok()) {
432     s = txn->Commit();
433   }
434 
435   delete txn;
436 
437   return s;
438 }
439 
Delete(const WriteOptions & wopts,ColumnFamilyHandle * column_family,const Slice & key)440 Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
441                                         ColumnFamilyHandle* column_family,
442                                         const Slice& key) {
443   Status s;
444 
445   Transaction* txn = BeginInternalTransaction(wopts);
446   txn->DisableIndexing();
447 
448   // Since the client didn't create a transaction, they don't care about
449   // conflict checking for this write.  So we just need to do
450   // DeleteUntracked().
451   s = txn->DeleteUntracked(column_family, key);
452 
453   if (s.ok()) {
454     s = txn->Commit();
455   }
456 
457   delete txn;
458 
459   return s;
460 }
461 
SingleDelete(const WriteOptions & wopts,ColumnFamilyHandle * column_family,const Slice & key)462 Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
463                                               ColumnFamilyHandle* column_family,
464                                               const Slice& key) {
465   Status s;
466 
467   Transaction* txn = BeginInternalTransaction(wopts);
468   txn->DisableIndexing();
469 
470   // Since the client didn't create a transaction, they don't care about
471   // conflict checking for this write.  So we just need to do
472   // SingleDeleteUntracked().
473   s = txn->SingleDeleteUntracked(column_family, key);
474 
475   if (s.ok()) {
476     s = txn->Commit();
477   }
478 
479   delete txn;
480 
481   return s;
482 }
483 
Merge(const WriteOptions & options,ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)484 Status PessimisticTransactionDB::Merge(const WriteOptions& options,
485                                        ColumnFamilyHandle* column_family,
486                                        const Slice& key, const Slice& value) {
487   Status s;
488 
489   Transaction* txn = BeginInternalTransaction(options);
490   txn->DisableIndexing();
491 
492   // Since the client didn't create a transaction, they don't care about
493   // conflict checking for this write.  So we just need to do
494   // MergeUntracked().
495   s = txn->MergeUntracked(column_family, key, value);
496 
497   if (s.ok()) {
498     s = txn->Commit();
499   }
500 
501   delete txn;
502 
503   return s;
504 }
505 
Write(const WriteOptions & opts,WriteBatch * updates)506 Status PessimisticTransactionDB::Write(const WriteOptions& opts,
507                                        WriteBatch* updates) {
508   return WriteWithConcurrencyControl(opts, updates);
509 }
510 
Write(const WriteOptions & opts,WriteBatch * updates)511 Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
512                                   WriteBatch* updates) {
513   if (txn_db_options_.skip_concurrency_control) {
514     return db_impl_->Write(opts, updates);
515   } else {
516     return WriteWithConcurrencyControl(opts, updates);
517   }
518 }
519 
Write(const WriteOptions & opts,const TransactionDBWriteOptimizations & optimizations,WriteBatch * updates)520 Status WriteCommittedTxnDB::Write(
521     const WriteOptions& opts,
522     const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
523   if (optimizations.skip_concurrency_control) {
524     return db_impl_->Write(opts, updates);
525   } else {
526     return WriteWithConcurrencyControl(opts, updates);
527   }
528 }
529 
InsertExpirableTransaction(TransactionID tx_id,PessimisticTransaction * tx)530 void PessimisticTransactionDB::InsertExpirableTransaction(
531     TransactionID tx_id, PessimisticTransaction* tx) {
532   assert(tx->GetExpirationTime() > 0);
533   std::lock_guard<std::mutex> lock(map_mutex_);
534   expirable_transactions_map_.insert({tx_id, tx});
535 }
536 
RemoveExpirableTransaction(TransactionID tx_id)537 void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
538   std::lock_guard<std::mutex> lock(map_mutex_);
539   expirable_transactions_map_.erase(tx_id);
540 }
541 
TryStealingExpiredTransactionLocks(TransactionID tx_id)542 bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
543     TransactionID tx_id) {
544   std::lock_guard<std::mutex> lock(map_mutex_);
545 
546   auto tx_it = expirable_transactions_map_.find(tx_id);
547   if (tx_it == expirable_transactions_map_.end()) {
548     return true;
549   }
550   PessimisticTransaction& tx = *(tx_it->second);
551   return tx.TryStealingLocks();
552 }
553 
ReinitializeTransaction(Transaction * txn,const WriteOptions & write_options,const TransactionOptions & txn_options)554 void PessimisticTransactionDB::ReinitializeTransaction(
555     Transaction* txn, const WriteOptions& write_options,
556     const TransactionOptions& txn_options) {
557   auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
558 
559   txn_impl->Reinitialize(this, write_options, txn_options);
560 }
561 
GetTransactionByName(const TransactionName & name)562 Transaction* PessimisticTransactionDB::GetTransactionByName(
563     const TransactionName& name) {
564   std::lock_guard<std::mutex> lock(name_map_mutex_);
565   auto it = transactions_.find(name);
566   if (it == transactions_.end()) {
567     return nullptr;
568   } else {
569     return it->second;
570   }
571 }
572 
GetAllPreparedTransactions(std::vector<Transaction * > * transv)573 void PessimisticTransactionDB::GetAllPreparedTransactions(
574     std::vector<Transaction*>* transv) {
575   assert(transv);
576   transv->clear();
577   std::lock_guard<std::mutex> lock(name_map_mutex_);
578   for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
579     if (it->second->GetState() == Transaction::PREPARED) {
580       transv->push_back(it->second);
581     }
582   }
583 }
584 
GetLockStatusData()585 LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() {
586   return lock_manager_->GetPointLockStatus();
587 }
588 
GetDeadlockInfoBuffer()589 std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
590   return lock_manager_->GetDeadlockInfoBuffer();
591 }
592 
SetDeadlockInfoBufferSize(uint32_t target_size)593 void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
594   lock_manager_->Resize(target_size);
595 }
596 
RegisterTransaction(Transaction * txn)597 void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
598   assert(txn);
599   assert(txn->GetName().length() > 0);
600   assert(GetTransactionByName(txn->GetName()) == nullptr);
601   assert(txn->GetState() == Transaction::STARTED);
602   std::lock_guard<std::mutex> lock(name_map_mutex_);
603   transactions_[txn->GetName()] = txn;
604 }
605 
UnregisterTransaction(Transaction * txn)606 void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
607   assert(txn);
608   std::lock_guard<std::mutex> lock(name_map_mutex_);
609   auto it = transactions_.find(txn->GetName());
610   assert(it != transactions_.end());
611   transactions_.erase(it);
612 }
613 
614 }  // namespace ROCKSDB_NAMESPACE
615 #endif  // ROCKSDB_LITE
616