1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/pessimistic_transaction_db.h"
9
10 #include <cinttypes>
11 #include <string>
12 #include <unordered_set>
13 #include <vector>
14
15 #include "db/db_impl/db_impl.h"
16 #include "logging/logging.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/options.h"
19 #include "rocksdb/utilities/transaction_db.h"
20 #include "test_util/sync_point.h"
21 #include "util/cast_util.h"
22 #include "util/mutexlock.h"
23 #include "utilities/transactions/pessimistic_transaction.h"
24 #include "utilities/transactions/transaction_db_mutex_impl.h"
25 #include "utilities/transactions/write_prepared_txn_db.h"
26 #include "utilities/transactions/write_unprepared_txn_db.h"
27
28 namespace ROCKSDB_NAMESPACE {
29
PessimisticTransactionDB(DB * db,const TransactionDBOptions & txn_db_options)30 PessimisticTransactionDB::PessimisticTransactionDB(
31 DB* db, const TransactionDBOptions& txn_db_options)
32 : TransactionDB(db),
33 db_impl_(static_cast_with_check<DBImpl>(db)),
34 txn_db_options_(txn_db_options),
35 lock_manager_(NewLockManager(this, txn_db_options)) {
36 assert(db_impl_ != nullptr);
37 info_log_ = db_impl_->GetDBOptions().info_log;
38 }
39
40 // Support initiliazing PessimisticTransactionDB from a stackable db
41 //
42 // PessimisticTransactionDB
43 // ^ ^
44 // | |
45 // | +
46 // | StackableDB
47 // | ^
48 // | |
49 // + +
50 // DBImpl
51 // ^
52 // |(inherit)
53 // +
54 // DB
55 //
PessimisticTransactionDB(StackableDB * db,const TransactionDBOptions & txn_db_options)56 PessimisticTransactionDB::PessimisticTransactionDB(
57 StackableDB* db, const TransactionDBOptions& txn_db_options)
58 : TransactionDB(db),
59 db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())),
60 txn_db_options_(txn_db_options),
61 lock_manager_(NewLockManager(this, txn_db_options)) {
62 assert(db_impl_ != nullptr);
63 }
64
~PessimisticTransactionDB()65 PessimisticTransactionDB::~PessimisticTransactionDB() {
66 while (!transactions_.empty()) {
67 delete transactions_.begin()->second;
68 // TODO(myabandeh): this seems to be an unsafe approach as it is not quite
69 // clear whether delete would also remove the entry from transactions_.
70 }
71 }
72
VerifyCFOptions(const ColumnFamilyOptions &)73 Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) {
74 return Status::OK();
75 }
76
Initialize(const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles)77 Status PessimisticTransactionDB::Initialize(
78 const std::vector<size_t>& compaction_enabled_cf_indices,
79 const std::vector<ColumnFamilyHandle*>& handles) {
80 for (auto cf_ptr : handles) {
81 AddColumnFamily(cf_ptr);
82 }
83 // Verify cf options
84 for (auto handle : handles) {
85 ColumnFamilyDescriptor cfd;
86 Status s = handle->GetDescriptor(&cfd);
87 if (!s.ok()) {
88 return s;
89 }
90 s = VerifyCFOptions(cfd.options);
91 if (!s.ok()) {
92 return s;
93 }
94 }
95
96 // Re-enable compaction for the column families that initially had
97 // compaction enabled.
98 std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
99 compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
100 for (auto index : compaction_enabled_cf_indices) {
101 compaction_enabled_cf_handles.push_back(handles[index]);
102 }
103
104 Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
105
106 // create 'real' transactions from recovered shell transactions
107 auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
108 assert(dbimpl != nullptr);
109 auto rtrxs = dbimpl->recovered_transactions();
110
111 for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
112 auto recovered_trx = it->second;
113 assert(recovered_trx);
114 assert(recovered_trx->batches_.size() == 1);
115 const auto& seq = recovered_trx->batches_.begin()->first;
116 const auto& batch_info = recovered_trx->batches_.begin()->second;
117 assert(batch_info.log_number_);
118 assert(recovered_trx->name_.length());
119
120 WriteOptions w_options;
121 w_options.sync = true;
122 TransactionOptions t_options;
123 // This would help avoiding deadlock for keys that although exist in the WAL
124 // did not go through concurrency control. This includes the merge that
125 // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
126 // there is a conflict between the keys of two transactions that must be
127 // avoided, it is already avoided by the application, MyRocks, before the
128 // restart (ii) application, MyRocks, guarntees to rollback/commit the
129 // recovered transactions before new transactions start.
130 t_options.skip_concurrency_control = true;
131
132 Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
133 assert(real_trx);
134 real_trx->SetLogNumber(batch_info.log_number_);
135 assert(seq != kMaxSequenceNumber);
136 if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
137 real_trx->SetId(seq);
138 }
139
140 s = real_trx->SetName(recovered_trx->name_);
141 if (!s.ok()) {
142 break;
143 }
144
145 s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
146 // WriteCommitted set this to to disable this check that is specific to
147 // WritePrepared txns
148 assert(batch_info.batch_cnt_ == 0 ||
149 real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
150 real_trx->SetState(Transaction::PREPARED);
151 if (!s.ok()) {
152 break;
153 }
154 }
155 if (s.ok()) {
156 dbimpl->DeleteAllRecoveredTransactions();
157 }
158 return s;
159 }
160
BeginTransaction(const WriteOptions & write_options,const TransactionOptions & txn_options,Transaction * old_txn)161 Transaction* WriteCommittedTxnDB::BeginTransaction(
162 const WriteOptions& write_options, const TransactionOptions& txn_options,
163 Transaction* old_txn) {
164 if (old_txn != nullptr) {
165 ReinitializeTransaction(old_txn, write_options, txn_options);
166 return old_txn;
167 } else {
168 return new WriteCommittedTxn(this, write_options, txn_options);
169 }
170 }
171
ValidateTxnDBOptions(const TransactionDBOptions & txn_db_options)172 TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
173 const TransactionDBOptions& txn_db_options) {
174 TransactionDBOptions validated = txn_db_options;
175
176 if (txn_db_options.num_stripes == 0) {
177 validated.num_stripes = 1;
178 }
179
180 return validated;
181 }
182
Open(const Options & options,const TransactionDBOptions & txn_db_options,const std::string & dbname,TransactionDB ** dbptr)183 Status TransactionDB::Open(const Options& options,
184 const TransactionDBOptions& txn_db_options,
185 const std::string& dbname, TransactionDB** dbptr) {
186 DBOptions db_options(options);
187 ColumnFamilyOptions cf_options(options);
188 std::vector<ColumnFamilyDescriptor> column_families;
189 column_families.push_back(
190 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
191 std::vector<ColumnFamilyHandle*> handles;
192 Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
193 column_families, &handles, dbptr);
194 if (s.ok()) {
195 assert(handles.size() == 1);
196 // i can delete the handle since DBImpl is always holding a reference to
197 // default column family
198 delete handles[0];
199 }
200
201 return s;
202 }
203
Open(const DBOptions & db_options,const TransactionDBOptions & txn_db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,TransactionDB ** dbptr)204 Status TransactionDB::Open(
205 const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
206 const std::string& dbname,
207 const std::vector<ColumnFamilyDescriptor>& column_families,
208 std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
209 Status s;
210 DB* db = nullptr;
211 if (txn_db_options.write_policy == WRITE_COMMITTED &&
212 db_options.unordered_write) {
213 return Status::NotSupported(
214 "WRITE_COMMITTED is incompatible with unordered_writes");
215 }
216 if (txn_db_options.write_policy == WRITE_UNPREPARED &&
217 db_options.unordered_write) {
218 // TODO(lth): support it
219 return Status::NotSupported(
220 "WRITE_UNPREPARED is currently incompatible with unordered_writes");
221 }
222 if (txn_db_options.write_policy == WRITE_PREPARED &&
223 db_options.unordered_write && !db_options.two_write_queues) {
224 return Status::NotSupported(
225 "WRITE_PREPARED is incompatible with unordered_writes if "
226 "two_write_queues is not enabled.");
227 }
228
229 std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
230 std::vector<size_t> compaction_enabled_cf_indices;
231 DBOptions db_options_2pc = db_options;
232 PrepareWrap(&db_options_2pc, &column_families_copy,
233 &compaction_enabled_cf_indices);
234 const bool use_seq_per_batch =
235 txn_db_options.write_policy == WRITE_PREPARED ||
236 txn_db_options.write_policy == WRITE_UNPREPARED;
237 const bool use_batch_per_txn =
238 txn_db_options.write_policy == WRITE_COMMITTED ||
239 txn_db_options.write_policy == WRITE_PREPARED;
240 s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
241 use_seq_per_batch, use_batch_per_txn);
242 if (s.ok()) {
243 ROCKS_LOG_WARN(db->GetDBOptions().info_log,
244 "Transaction write_policy is %" PRId32,
245 static_cast<int>(txn_db_options.write_policy));
246 s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
247 dbptr);
248 }
249 if (!s.ok()) {
250 // just in case it was not deleted (and not set to nullptr).
251 delete db;
252 }
253 return s;
254 }
255
PrepareWrap(DBOptions * db_options,std::vector<ColumnFamilyDescriptor> * column_families,std::vector<size_t> * compaction_enabled_cf_indices)256 void TransactionDB::PrepareWrap(
257 DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
258 std::vector<size_t>* compaction_enabled_cf_indices) {
259 compaction_enabled_cf_indices->clear();
260
261 // Enable MemTable History if not already enabled
262 for (size_t i = 0; i < column_families->size(); i++) {
263 ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
264
265 if (cf_options->max_write_buffer_size_to_maintain == 0 &&
266 cf_options->max_write_buffer_number_to_maintain == 0) {
267 // Setting to -1 will set the History size to
268 // max_write_buffer_number * write_buffer_size.
269 cf_options->max_write_buffer_size_to_maintain = -1;
270 }
271 if (!cf_options->disable_auto_compactions) {
272 // Disable compactions momentarily to prevent race with DB::Open
273 cf_options->disable_auto_compactions = true;
274 compaction_enabled_cf_indices->push_back(i);
275 }
276 }
277 db_options->allow_2pc = true;
278 }
279
280 namespace {
281 template <typename DBType>
WrapAnotherDBInternal(DBType * db,const TransactionDBOptions & txn_db_options,const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles,TransactionDB ** dbptr)282 Status WrapAnotherDBInternal(
283 DBType* db, const TransactionDBOptions& txn_db_options,
284 const std::vector<size_t>& compaction_enabled_cf_indices,
285 const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
286 assert(db != nullptr);
287 assert(dbptr != nullptr);
288 *dbptr = nullptr;
289 std::unique_ptr<PessimisticTransactionDB> txn_db;
290 switch (txn_db_options.write_policy) {
291 case WRITE_UNPREPARED:
292 txn_db.reset(new WriteUnpreparedTxnDB(
293 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
294 break;
295 case WRITE_PREPARED:
296 txn_db.reset(new WritePreparedTxnDB(
297 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
298 break;
299 case WRITE_COMMITTED:
300 default:
301 txn_db.reset(new WriteCommittedTxnDB(
302 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
303 }
304 txn_db->UpdateCFComparatorMap(handles);
305 Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
306 // In case of a failure at this point, db is deleted via the txn_db destructor
307 // and set to nullptr.
308 if (s.ok()) {
309 *dbptr = txn_db.release();
310 }
311 return s;
312 }
313 } // namespace
314
WrapDB(DB * db,const TransactionDBOptions & txn_db_options,const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles,TransactionDB ** dbptr)315 Status TransactionDB::WrapDB(
316 // make sure this db is already opened with memtable history enabled,
317 // auto compaction distabled and 2 phase commit enabled
318 DB* db, const TransactionDBOptions& txn_db_options,
319 const std::vector<size_t>& compaction_enabled_cf_indices,
320 const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
321 return WrapAnotherDBInternal(db, txn_db_options,
322 compaction_enabled_cf_indices, handles, dbptr);
323 }
324
WrapStackableDB(StackableDB * db,const TransactionDBOptions & txn_db_options,const std::vector<size_t> & compaction_enabled_cf_indices,const std::vector<ColumnFamilyHandle * > & handles,TransactionDB ** dbptr)325 Status TransactionDB::WrapStackableDB(
326 // make sure this stackable_db is already opened with memtable history
327 // enabled, auto compaction distabled and 2 phase commit enabled
328 StackableDB* db, const TransactionDBOptions& txn_db_options,
329 const std::vector<size_t>& compaction_enabled_cf_indices,
330 const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
331 return WrapAnotherDBInternal(db, txn_db_options,
332 compaction_enabled_cf_indices, handles, dbptr);
333 }
334
335 // Let LockManager know that this column family exists so it can
336 // allocate a LockMap for it.
AddColumnFamily(const ColumnFamilyHandle * handle)337 void PessimisticTransactionDB::AddColumnFamily(
338 const ColumnFamilyHandle* handle) {
339 lock_manager_->AddColumnFamily(handle);
340 }
341
CreateColumnFamily(const ColumnFamilyOptions & options,const std::string & column_family_name,ColumnFamilyHandle ** handle)342 Status PessimisticTransactionDB::CreateColumnFamily(
343 const ColumnFamilyOptions& options, const std::string& column_family_name,
344 ColumnFamilyHandle** handle) {
345 InstrumentedMutexLock l(&column_family_mutex_);
346 Status s = VerifyCFOptions(options);
347 if (!s.ok()) {
348 return s;
349 }
350
351 s = db_->CreateColumnFamily(options, column_family_name, handle);
352 if (s.ok()) {
353 lock_manager_->AddColumnFamily(*handle);
354 UpdateCFComparatorMap(*handle);
355 }
356
357 return s;
358 }
359
360 // Let LockManager know that it can deallocate the LockMap for this
361 // column family.
DropColumnFamily(ColumnFamilyHandle * column_family)362 Status PessimisticTransactionDB::DropColumnFamily(
363 ColumnFamilyHandle* column_family) {
364 InstrumentedMutexLock l(&column_family_mutex_);
365
366 Status s = db_->DropColumnFamily(column_family);
367 if (s.ok()) {
368 lock_manager_->RemoveColumnFamily(column_family);
369 }
370
371 return s;
372 }
373
TryLock(PessimisticTransaction * txn,uint32_t cfh_id,const std::string & key,bool exclusive)374 Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
375 uint32_t cfh_id,
376 const std::string& key,
377 bool exclusive) {
378 return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
379 }
380
TryRangeLock(PessimisticTransaction * txn,uint32_t cfh_id,const Endpoint & start_endp,const Endpoint & end_endp)381 Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn,
382 uint32_t cfh_id,
383 const Endpoint& start_endp,
384 const Endpoint& end_endp) {
385 return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, GetEnv(),
386 /*exclusive=*/true);
387 }
388
UnLock(PessimisticTransaction * txn,const LockTracker & keys)389 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
390 const LockTracker& keys) {
391 lock_manager_->UnLock(txn, keys, GetEnv());
392 }
393
UnLock(PessimisticTransaction * txn,uint32_t cfh_id,const std::string & key)394 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
395 uint32_t cfh_id, const std::string& key) {
396 lock_manager_->UnLock(txn, cfh_id, key, GetEnv());
397 }
398
399 // Used when wrapping DB write operations in a transaction
BeginInternalTransaction(const WriteOptions & options)400 Transaction* PessimisticTransactionDB::BeginInternalTransaction(
401 const WriteOptions& options) {
402 TransactionOptions txn_options;
403 Transaction* txn = BeginTransaction(options, txn_options, nullptr);
404
405 // Use default timeout for non-transactional writes
406 txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
407 return txn;
408 }
409
410 // All user Put, Merge, Delete, and Write requests must be intercepted to make
411 // sure that they lock all keys that they are writing to avoid causing conflicts
412 // with any concurrent transactions. The easiest way to do this is to wrap all
413 // write operations in a transaction.
414 //
415 // Put(), Merge(), and Delete() only lock a single key per call. Write() will
416 // sort its keys before locking them. This guarantees that TransactionDB write
417 // methods cannot deadlock with each other (but still could deadlock with a
418 // Transaction).
Put(const WriteOptions & options,ColumnFamilyHandle * column_family,const Slice & key,const Slice & val)419 Status PessimisticTransactionDB::Put(const WriteOptions& options,
420 ColumnFamilyHandle* column_family,
421 const Slice& key, const Slice& val) {
422 Status s;
423
424 Transaction* txn = BeginInternalTransaction(options);
425 txn->DisableIndexing();
426
427 // Since the client didn't create a transaction, they don't care about
428 // conflict checking for this write. So we just need to do PutUntracked().
429 s = txn->PutUntracked(column_family, key, val);
430
431 if (s.ok()) {
432 s = txn->Commit();
433 }
434
435 delete txn;
436
437 return s;
438 }
439
Delete(const WriteOptions & wopts,ColumnFamilyHandle * column_family,const Slice & key)440 Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
441 ColumnFamilyHandle* column_family,
442 const Slice& key) {
443 Status s;
444
445 Transaction* txn = BeginInternalTransaction(wopts);
446 txn->DisableIndexing();
447
448 // Since the client didn't create a transaction, they don't care about
449 // conflict checking for this write. So we just need to do
450 // DeleteUntracked().
451 s = txn->DeleteUntracked(column_family, key);
452
453 if (s.ok()) {
454 s = txn->Commit();
455 }
456
457 delete txn;
458
459 return s;
460 }
461
SingleDelete(const WriteOptions & wopts,ColumnFamilyHandle * column_family,const Slice & key)462 Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
463 ColumnFamilyHandle* column_family,
464 const Slice& key) {
465 Status s;
466
467 Transaction* txn = BeginInternalTransaction(wopts);
468 txn->DisableIndexing();
469
470 // Since the client didn't create a transaction, they don't care about
471 // conflict checking for this write. So we just need to do
472 // SingleDeleteUntracked().
473 s = txn->SingleDeleteUntracked(column_family, key);
474
475 if (s.ok()) {
476 s = txn->Commit();
477 }
478
479 delete txn;
480
481 return s;
482 }
483
Merge(const WriteOptions & options,ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)484 Status PessimisticTransactionDB::Merge(const WriteOptions& options,
485 ColumnFamilyHandle* column_family,
486 const Slice& key, const Slice& value) {
487 Status s;
488
489 Transaction* txn = BeginInternalTransaction(options);
490 txn->DisableIndexing();
491
492 // Since the client didn't create a transaction, they don't care about
493 // conflict checking for this write. So we just need to do
494 // MergeUntracked().
495 s = txn->MergeUntracked(column_family, key, value);
496
497 if (s.ok()) {
498 s = txn->Commit();
499 }
500
501 delete txn;
502
503 return s;
504 }
505
Write(const WriteOptions & opts,WriteBatch * updates)506 Status PessimisticTransactionDB::Write(const WriteOptions& opts,
507 WriteBatch* updates) {
508 return WriteWithConcurrencyControl(opts, updates);
509 }
510
Write(const WriteOptions & opts,WriteBatch * updates)511 Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
512 WriteBatch* updates) {
513 if (txn_db_options_.skip_concurrency_control) {
514 return db_impl_->Write(opts, updates);
515 } else {
516 return WriteWithConcurrencyControl(opts, updates);
517 }
518 }
519
Write(const WriteOptions & opts,const TransactionDBWriteOptimizations & optimizations,WriteBatch * updates)520 Status WriteCommittedTxnDB::Write(
521 const WriteOptions& opts,
522 const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
523 if (optimizations.skip_concurrency_control) {
524 return db_impl_->Write(opts, updates);
525 } else {
526 return WriteWithConcurrencyControl(opts, updates);
527 }
528 }
529
InsertExpirableTransaction(TransactionID tx_id,PessimisticTransaction * tx)530 void PessimisticTransactionDB::InsertExpirableTransaction(
531 TransactionID tx_id, PessimisticTransaction* tx) {
532 assert(tx->GetExpirationTime() > 0);
533 std::lock_guard<std::mutex> lock(map_mutex_);
534 expirable_transactions_map_.insert({tx_id, tx});
535 }
536
RemoveExpirableTransaction(TransactionID tx_id)537 void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
538 std::lock_guard<std::mutex> lock(map_mutex_);
539 expirable_transactions_map_.erase(tx_id);
540 }
541
TryStealingExpiredTransactionLocks(TransactionID tx_id)542 bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
543 TransactionID tx_id) {
544 std::lock_guard<std::mutex> lock(map_mutex_);
545
546 auto tx_it = expirable_transactions_map_.find(tx_id);
547 if (tx_it == expirable_transactions_map_.end()) {
548 return true;
549 }
550 PessimisticTransaction& tx = *(tx_it->second);
551 return tx.TryStealingLocks();
552 }
553
ReinitializeTransaction(Transaction * txn,const WriteOptions & write_options,const TransactionOptions & txn_options)554 void PessimisticTransactionDB::ReinitializeTransaction(
555 Transaction* txn, const WriteOptions& write_options,
556 const TransactionOptions& txn_options) {
557 auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
558
559 txn_impl->Reinitialize(this, write_options, txn_options);
560 }
561
GetTransactionByName(const TransactionName & name)562 Transaction* PessimisticTransactionDB::GetTransactionByName(
563 const TransactionName& name) {
564 std::lock_guard<std::mutex> lock(name_map_mutex_);
565 auto it = transactions_.find(name);
566 if (it == transactions_.end()) {
567 return nullptr;
568 } else {
569 return it->second;
570 }
571 }
572
GetAllPreparedTransactions(std::vector<Transaction * > * transv)573 void PessimisticTransactionDB::GetAllPreparedTransactions(
574 std::vector<Transaction*>* transv) {
575 assert(transv);
576 transv->clear();
577 std::lock_guard<std::mutex> lock(name_map_mutex_);
578 for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
579 if (it->second->GetState() == Transaction::PREPARED) {
580 transv->push_back(it->second);
581 }
582 }
583 }
584
GetLockStatusData()585 LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() {
586 return lock_manager_->GetPointLockStatus();
587 }
588
GetDeadlockInfoBuffer()589 std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
590 return lock_manager_->GetDeadlockInfoBuffer();
591 }
592
SetDeadlockInfoBufferSize(uint32_t target_size)593 void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
594 lock_manager_->Resize(target_size);
595 }
596
RegisterTransaction(Transaction * txn)597 void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
598 assert(txn);
599 assert(txn->GetName().length() > 0);
600 assert(GetTransactionByName(txn->GetName()) == nullptr);
601 assert(txn->GetState() == Transaction::STARTED);
602 std::lock_guard<std::mutex> lock(name_map_mutex_);
603 transactions_[txn->GetName()] = txn;
604 }
605
UnregisterTransaction(Transaction * txn)606 void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
607 assert(txn);
608 std::lock_guard<std::mutex> lock(name_map_mutex_);
609 auto it = transactions_.find(txn->GetName());
610 assert(it != transactions_.end());
611 transactions_.erase(it);
612 }
613
614 } // namespace ROCKSDB_NAMESPACE
615 #endif // ROCKSDB_LITE
616