1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/write_unprepared_txn.h"
9 #include "db/db_impl/db_impl.h"
10 #include "util/cast_util.h"
11 #include "utilities/transactions/write_unprepared_txn_db.h"
12 #include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
13
14 namespace ROCKSDB_NAMESPACE {
15
IsVisibleFullCheck(SequenceNumber seq)16 bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
17 // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
18 // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
19 // the prepare_batch_cnt seq nums after it.
20 //
21 // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
22 // large.
23 for (const auto& it : unprep_seqs_) {
24 if (it.first <= seq && seq < it.first + it.second) {
25 return true;
26 }
27 }
28
29 bool snap_released = false;
30 auto ret =
31 db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_, &snap_released);
32 assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
33 snap_released_ |= snap_released;
34 return ret;
35 }
36
WriteUnpreparedTxn(WriteUnpreparedTxnDB * txn_db,const WriteOptions & write_options,const TransactionOptions & txn_options)37 WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
38 const WriteOptions& write_options,
39 const TransactionOptions& txn_options)
40 : WritePreparedTxn(txn_db, write_options, txn_options),
41 wupt_db_(txn_db),
42 last_log_number_(0),
43 recovered_txn_(false),
44 largest_validated_seq_(0) {
45 if (txn_options.write_batch_flush_threshold < 0) {
46 write_batch_flush_threshold_ =
47 txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
48 } else {
49 write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
50 }
51 }
52
~WriteUnpreparedTxn()53 WriteUnpreparedTxn::~WriteUnpreparedTxn() {
54 if (!unprep_seqs_.empty()) {
55 assert(log_number_ > 0);
56 assert(GetId() > 0);
57 assert(!name_.empty());
58
59 // We should rollback regardless of GetState, but some unit tests that
60 // test crash recovery run the destructor assuming that rollback does not
61 // happen, so that rollback during recovery can be exercised.
62 if (GetState() == STARTED || GetState() == LOCKS_STOLEN) {
63 auto s = RollbackInternal();
64 assert(s.ok());
65 if (!s.ok()) {
66 ROCKS_LOG_FATAL(
67 wupt_db_->info_log_,
68 "Rollback of WriteUnprepared transaction failed in destructor: %s",
69 s.ToString().c_str());
70 }
71 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
72 log_number_);
73 }
74 }
75
76 // Clear the tracked locks so that ~PessimisticTransaction does not
77 // try to unlock keys for recovered transactions.
78 if (recovered_txn_) {
79 tracked_locks_->Clear();
80 }
81 }
82
Initialize(const TransactionOptions & txn_options)83 void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
84 PessimisticTransaction::Initialize(txn_options);
85 if (txn_options.write_batch_flush_threshold < 0) {
86 write_batch_flush_threshold_ =
87 txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
88 } else {
89 write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
90 }
91
92 unprep_seqs_.clear();
93 flushed_save_points_.reset(nullptr);
94 unflushed_save_points_.reset(nullptr);
95 recovered_txn_ = false;
96 largest_validated_seq_ = 0;
97 assert(active_iterators_.empty());
98 active_iterators_.clear();
99 untracked_keys_.clear();
100 }
101
HandleWrite(std::function<Status ()> do_write)102 Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) {
103 Status s;
104 if (active_iterators_.empty()) {
105 s = MaybeFlushWriteBatchToDB();
106 if (!s.ok()) {
107 return s;
108 }
109 }
110 s = do_write();
111 if (s.ok()) {
112 if (snapshot_) {
113 largest_validated_seq_ =
114 std::max(largest_validated_seq_, snapshot_->GetSequenceNumber());
115 } else {
116 // TODO(lth): We should use the same number as tracked_at_seq in TryLock,
117 // because what is actually being tracked is the sequence number at which
118 // this key was locked at.
119 largest_validated_seq_ = db_impl_->GetLastPublishedSequence();
120 }
121 }
122 return s;
123 }
124
Put(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value,const bool assume_tracked)125 Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
126 const Slice& key, const Slice& value,
127 const bool assume_tracked) {
128 return HandleWrite([&]() {
129 return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
130 });
131 }
132
Put(ColumnFamilyHandle * column_family,const SliceParts & key,const SliceParts & value,const bool assume_tracked)133 Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
134 const SliceParts& key, const SliceParts& value,
135 const bool assume_tracked) {
136 return HandleWrite([&]() {
137 return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
138 });
139 }
140
Merge(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value,const bool assume_tracked)141 Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
142 const Slice& key, const Slice& value,
143 const bool assume_tracked) {
144 return HandleWrite([&]() {
145 return TransactionBaseImpl::Merge(column_family, key, value,
146 assume_tracked);
147 });
148 }
149
Delete(ColumnFamilyHandle * column_family,const Slice & key,const bool assume_tracked)150 Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
151 const Slice& key, const bool assume_tracked) {
152 return HandleWrite([&]() {
153 return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
154 });
155 }
156
Delete(ColumnFamilyHandle * column_family,const SliceParts & key,const bool assume_tracked)157 Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
158 const SliceParts& key,
159 const bool assume_tracked) {
160 return HandleWrite([&]() {
161 return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
162 });
163 }
164
SingleDelete(ColumnFamilyHandle * column_family,const Slice & key,const bool assume_tracked)165 Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
166 const Slice& key,
167 const bool assume_tracked) {
168 return HandleWrite([&]() {
169 return TransactionBaseImpl::SingleDelete(column_family, key,
170 assume_tracked);
171 });
172 }
173
SingleDelete(ColumnFamilyHandle * column_family,const SliceParts & key,const bool assume_tracked)174 Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
175 const SliceParts& key,
176 const bool assume_tracked) {
177 return HandleWrite([&]() {
178 return TransactionBaseImpl::SingleDelete(column_family, key,
179 assume_tracked);
180 });
181 }
182
183 // WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For
184 // WriteUnprepared, the write batches have already been written into the
185 // database during WAL replay, so all we have to do is just to "retrack" the key
186 // so that rollbacks are possible.
187 //
188 // Calling TryLock instead of TrackKey is also possible, but as an optimization,
189 // recovered transactions do not hold locks on their keys. This follows the
190 // implementation in PessimisticTransactionDB::Initialize where we set
191 // skip_concurrency_control to true.
RebuildFromWriteBatch(WriteBatch * wb)192 Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) {
193 struct TrackKeyHandler : public WriteBatch::Handler {
194 WriteUnpreparedTxn* txn_;
195 bool rollback_merge_operands_;
196
197 TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
198 : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}
199
200 Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
201 txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
202 false /* read_only */, true /* exclusive */);
203 return Status::OK();
204 }
205
206 Status DeleteCF(uint32_t cf, const Slice& key) override {
207 txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
208 false /* read_only */, true /* exclusive */);
209 return Status::OK();
210 }
211
212 Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
213 txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
214 false /* read_only */, true /* exclusive */);
215 return Status::OK();
216 }
217
218 Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
219 if (rollback_merge_operands_) {
220 txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
221 false /* read_only */, true /* exclusive */);
222 }
223 return Status::OK();
224 }
225
226 // Recovered batches do not contain 2PC markers.
227 Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
228
229 Status MarkEndPrepare(const Slice&) override {
230 return Status::InvalidArgument();
231 }
232
233 Status MarkNoop(bool) override { return Status::InvalidArgument(); }
234
235 Status MarkCommit(const Slice&) override {
236 return Status::InvalidArgument();
237 }
238
239 Status MarkRollback(const Slice&) override {
240 return Status::InvalidArgument();
241 }
242 };
243
244 TrackKeyHandler handler(this,
245 wupt_db_->txn_db_options_.rollback_merge_operands);
246 return wb->Iterate(&handler);
247 }
248
MaybeFlushWriteBatchToDB()249 Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
250 const bool kPrepared = true;
251 Status s;
252 if (write_batch_flush_threshold_ > 0 &&
253 write_batch_.GetWriteBatch()->Count() > 0 &&
254 write_batch_.GetDataSize() >
255 static_cast<size_t>(write_batch_flush_threshold_)) {
256 assert(GetState() != PREPARED);
257 s = FlushWriteBatchToDB(!kPrepared);
258 }
259 return s;
260 }
261
FlushWriteBatchToDB(bool prepared)262 Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
263 // If the current write batch contains savepoints, then some special handling
264 // is required so that RollbackToSavepoint can work.
265 //
266 // RollbackToSavepoint is not supported after Prepare() is called, so only do
267 // this for unprepared batches.
268 if (!prepared && unflushed_save_points_ != nullptr &&
269 !unflushed_save_points_->empty()) {
270 return FlushWriteBatchWithSavePointToDB();
271 }
272
273 return FlushWriteBatchToDBInternal(prepared);
274 }
275
// Writes the current write batch to the DB as one batch and records it in
// unprep_seqs_. `prepared` is forwarded to WriteBatchInternal::MarkEndPrepare
// and selects a prepared batch (from PrepareInternal) versus an unprepared,
// mid-transaction flush.
//
// A transaction name is required. In debug builds with
// txn_db_options_.autogenerate_name set, a name "autoxid<N>" is generated;
// otherwise InvalidArgument is returned.
//
// Before writing, keys in the batch that are not in tracked_locks_ are
// appended to untracked_keys_ so that WriteRollbackKeys can also restore
// them. After an unprepared flush, the in-memory write batch is cleared.
Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
  if (name_.empty()) {
    // A prepared flush is expected to come from an already-named transaction.
    assert(!prepared);
#ifndef NDEBUG
    static std::atomic_ullong autogen_id{0};
    // To avoid changing all tests to call SetName, just autogenerate one.
    if (wupt_db_->txn_db_options_.autogenerate_name) {
      auto s =
          SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1)));
      assert(s.ok());
    } else
#endif
    {
      return Status::InvalidArgument("Cannot write to DB without SetName.");
    }
  }

  // Records every key of the batch that is not covered by tracked_locks_
  // into untracked_keys_, so that rollback restores those keys as well.
  struct UntrackedKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    UntrackedKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    // Appends `key` to untracked_keys_[cf] unless it is already locked.
    Status AddUntrackedKey(uint32_t cf, const Slice& key) {
      auto str = key.ToString();
      PointLockStatus lock_status =
          txn_->tracked_locks_->GetPointLockStatus(cf, str);
      if (!lock_status.locked) {
        txn_->untracked_keys_[cf].push_back(str);
      }
      return Status::OK();
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      return AddUntrackedKey(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    // Merge operands are only recorded when they are rolled back.
    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        return AddUntrackedKey(cf, key);
      }
      return Status::OK();
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  UntrackedKeyHandler handler(
      this, wupt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&handler);
  assert(s.ok());

  // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
  WriteOptions write_options = write_options_;
  // The batch is always written to the WAL, regardless of write_options_.
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool first_prepare_batch = log_number_ == 0;
  // MarkEndPrepare will change Noop marker to the appropriate marker.
  s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
                                         name_, !WRITE_AFTER_COMMIT, !prepared);
  assert(s.ok());
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // AddPrepared should be called in the pre-release callback; otherwise there
  // is a non-zero chance of max advancing past prepare_seq and readers
  // treating the data as committed.
  // Also having it in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  // log_number_ should refer to the oldest log containing uncommitted data
  // from the current transaction. WriteImpl always reports the log it used
  // through last_log_number_. That value is copied into log_number_ below
  // only if log_number_ is not set yet, so an earlier log number is never
  // overwritten.
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, &last_log_number_,
                          /*log ref*/ 0, !DISABLE_MEMTABLE, &seq_used,
                          prepare_batch_cnt_, &add_prepared_callback);
  if (log_number_ == 0) {
    log_number_ = last_log_number_;
  }
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // NOTE(review): the bookkeeping below also runs when WriteImpl failed, in
  // which case seq_used may still be kMaxSequenceNumber. Confirm whether an
  // early return on failure is intended here.
  auto prepare_seq = seq_used;

  // Only call SetId if it hasn't been set yet.
  if (GetId() == 0) {
    SetId(prepare_seq);
  }
  // unprep_seqs_ will also contain prepared seqnos since they are treated in
  // the same way in the prepare/commit callbacks. See the comment on the
  // definition of unprep_seqs_.
  unprep_seqs_[prepare_seq] = prepare_batch_cnt_;

  // Reset transaction state.
  if (!prepared) {
    prepare_batch_cnt_ = 0;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  return s;
}
408
FlushWriteBatchWithSavePointToDB()409 Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
410 assert(unflushed_save_points_ != nullptr &&
411 unflushed_save_points_->size() > 0);
412 assert(save_points_ != nullptr && save_points_->size() > 0);
413 assert(save_points_->size() >= unflushed_save_points_->size());
414
415 // Handler class for creating an unprepared batch from a savepoint.
416 struct SavePointBatchHandler : public WriteBatch::Handler {
417 WriteBatchWithIndex* wb_;
418 const std::map<uint32_t, ColumnFamilyHandle*>& handles_;
419
420 SavePointBatchHandler(
421 WriteBatchWithIndex* wb,
422 const std::map<uint32_t, ColumnFamilyHandle*>& handles)
423 : wb_(wb), handles_(handles) {}
424
425 Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
426 return wb_->Put(handles_.at(cf), key, value);
427 }
428
429 Status DeleteCF(uint32_t cf, const Slice& key) override {
430 return wb_->Delete(handles_.at(cf), key);
431 }
432
433 Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
434 return wb_->SingleDelete(handles_.at(cf), key);
435 }
436
437 Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
438 return wb_->Merge(handles_.at(cf), key, value);
439 }
440
441 // The only expected 2PC marker is the initial Noop marker.
442 Status MarkNoop(bool empty_batch) override {
443 return empty_batch ? Status::OK() : Status::InvalidArgument();
444 }
445
446 Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
447
448 Status MarkEndPrepare(const Slice&) override {
449 return Status::InvalidArgument();
450 }
451
452 Status MarkCommit(const Slice&) override {
453 return Status::InvalidArgument();
454 }
455
456 Status MarkRollback(const Slice&) override {
457 return Status::InvalidArgument();
458 }
459 };
460
461 // The comparator of the default cf is passed in, similar to the
462 // initialization of TransactionBaseImpl::write_batch_. This comparator is
463 // only used if the write batch encounters an invalid cf id, and falls back to
464 // this comparator.
465 WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
466 true, 0);
467 // Swap with write_batch_ so that wb contains the complete write batch. The
468 // actual write batch that will be flushed to DB will be built in
469 // write_batch_, and will be read by FlushWriteBatchToDBInternal.
470 std::swap(wb, write_batch_);
471 TransactionBaseImpl::InitWriteBatch();
472
473 size_t prev_boundary = WriteBatchInternal::kHeader;
474 const bool kPrepared = true;
475 for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) {
476 bool trailing_batch = i == unflushed_save_points_->size();
477 SavePointBatchHandler sp_handler(&write_batch_,
478 *wupt_db_->GetCFHandleMap().get());
479 size_t curr_boundary = trailing_batch ? wb.GetWriteBatch()->GetDataSize()
480 : (*unflushed_save_points_)[i];
481
482 // Construct the partial write batch up to the savepoint.
483 //
484 // Theoretically, a memcpy between the write batches should be sufficient
485 // since the rewriting into the batch should produce the exact same byte
486 // representation. Rebuilding the WriteBatchWithIndex index is still
487 // necessary though, and would imply doing two passes over the batch though.
488 Status s = WriteBatchInternal::Iterate(wb.GetWriteBatch(), &sp_handler,
489 prev_boundary, curr_boundary);
490 if (!s.ok()) {
491 return s;
492 }
493
494 if (write_batch_.GetWriteBatch()->Count() > 0) {
495 // Flush the write batch.
496 s = FlushWriteBatchToDBInternal(!kPrepared);
497 if (!s.ok()) {
498 return s;
499 }
500 }
501
502 if (!trailing_batch) {
503 if (flushed_save_points_ == nullptr) {
504 flushed_save_points_.reset(
505 new autovector<WriteUnpreparedTxn::SavePoint>());
506 }
507 flushed_save_points_->emplace_back(
508 unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));
509 }
510
511 prev_boundary = curr_boundary;
512 const bool kClear = true;
513 TransactionBaseImpl::InitWriteBatch(kClear);
514 }
515
516 unflushed_save_points_->clear();
517 return Status::OK();
518 }
519
PrepareInternal()520 Status WriteUnpreparedTxn::PrepareInternal() {
521 const bool kPrepared = true;
522 return FlushWriteBatchToDB(kPrepared);
523 }
524
CommitWithoutPrepareInternal()525 Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
526 if (unprep_seqs_.empty()) {
527 assert(log_number_ == 0);
528 assert(GetId() == 0);
529 return WritePreparedTxn::CommitWithoutPrepareInternal();
530 }
531
532 // TODO(lth): We should optimize commit without prepare to not perform
533 // a prepare under the hood.
534 auto s = PrepareInternal();
535 if (!s.ok()) {
536 return s;
537 }
538 return CommitInternal();
539 }
540
CommitInternal()541 Status WriteUnpreparedTxn::CommitInternal() {
542 // TODO(lth): Reduce duplicate code with WritePrepared commit logic.
543
544 // We take the commit-time batch and append the Commit marker. The Memtable
545 // will ignore the Commit marker in non-recovery mode
546 WriteBatch* working_batch = GetCommitTimeWriteBatch();
547 const bool empty = working_batch->Count() == 0;
548 auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
549 assert(s.ok());
550
551 const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
552 if (!empty && for_recovery) {
553 // When not writing to memtable, we can still cache the latest write batch.
554 // The cached batch will be written to memtable in WriteRecoverableState
555 // during FlushMemTable
556 WriteBatchInternal::SetAsLatestPersistentState(working_batch);
557 }
558
559 const bool includes_data = !empty && !for_recovery;
560 size_t commit_batch_cnt = 0;
561 if (UNLIKELY(includes_data)) {
562 ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
563 "Duplicate key overhead");
564 SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
565 s = working_batch->Iterate(&counter);
566 assert(s.ok());
567 commit_batch_cnt = counter.BatchCount();
568 }
569 const bool disable_memtable = !includes_data;
570 const bool do_one_write =
571 !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
572
573 WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
574 wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt);
575 const bool kFirstPrepareBatch = true;
576 AddPreparedCallback add_prepared_callback(
577 wpt_db_, db_impl_, commit_batch_cnt,
578 db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
579 PreReleaseCallback* pre_release_callback;
580 if (do_one_write) {
581 pre_release_callback = &update_commit_map;
582 } else {
583 pre_release_callback = &add_prepared_callback;
584 }
585 uint64_t seq_used = kMaxSequenceNumber;
586 // Since the prepared batch is directly written to memtable, there is
587 // already a connection between the memtable and its WAL, so there is no
588 // need to redundantly reference the log that contains the prepared data.
589 const uint64_t zero_log_number = 0ull;
590 size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
591 s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
592 zero_log_number, disable_memtable, &seq_used,
593 batch_cnt, pre_release_callback);
594 assert(!s.ok() || seq_used != kMaxSequenceNumber);
595 const SequenceNumber commit_batch_seq = seq_used;
596 if (LIKELY(do_one_write || !s.ok())) {
597 if (LIKELY(s.ok())) {
598 // Note RemovePrepared should be called after WriteImpl that publishsed
599 // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
600 for (const auto& seq : unprep_seqs_) {
601 wpt_db_->RemovePrepared(seq.first, seq.second);
602 }
603 }
604 if (UNLIKELY(!do_one_write)) {
605 wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
606 }
607 unprep_seqs_.clear();
608 flushed_save_points_.reset(nullptr);
609 unflushed_save_points_.reset(nullptr);
610 return s;
611 } // else do the 2nd write to publish seq
612
613 // Populate unprep_seqs_ with commit_batch_seq, since we treat data in the
614 // commit write batch as just another "unprepared" batch. This will also
615 // update the unprep_seqs_ in the update_commit_map callback.
616 unprep_seqs_[commit_batch_seq] = commit_batch_cnt;
617 WriteUnpreparedCommitEntryPreReleaseCallback
618 update_commit_map_with_commit_batch(wpt_db_, db_impl_, unprep_seqs_, 0);
619
620 // Note: the 2nd write comes with a performance penality. So if we have too
621 // many of commits accompanied with ComitTimeWriteBatch and yet we cannot
622 // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
623 // two_write_queues should be disabled to avoid many additional writes here.
624
625 // Update commit map only from the 2nd queue
626 WriteBatch empty_batch;
627 s = empty_batch.PutLogData(Slice());
628 assert(s.ok());
629 // In the absence of Prepare markers, use Noop as a batch separator
630 s = WriteBatchInternal::InsertNoop(&empty_batch);
631 assert(s.ok());
632 const bool DISABLE_MEMTABLE = true;
633 const size_t ONE_BATCH = 1;
634 const uint64_t NO_REF_LOG = 0;
635 s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
636 NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
637 &update_commit_map_with_commit_batch);
638 assert(!s.ok() || seq_used != kMaxSequenceNumber);
639 // Note RemovePrepared should be called after WriteImpl that publishsed the
640 // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
641 for (const auto& seq : unprep_seqs_) {
642 wpt_db_->RemovePrepared(seq.first, seq.second);
643 }
644 unprep_seqs_.clear();
645 flushed_save_points_.reset(nullptr);
646 unflushed_save_points_.reset(nullptr);
647 return s;
648 }
649
WriteRollbackKeys(const LockTracker & lock_tracker,WriteBatchWithIndex * rollback_batch,ReadCallback * callback,const ReadOptions & roptions)650 Status WriteUnpreparedTxn::WriteRollbackKeys(
651 const LockTracker& lock_tracker, WriteBatchWithIndex* rollback_batch,
652 ReadCallback* callback, const ReadOptions& roptions) {
653 // This assertion can be removed when range lock is supported.
654 assert(lock_tracker.IsPointLockSupported());
655 const auto& cf_map = *wupt_db_->GetCFHandleMap();
656 auto WriteRollbackKey = [&](const std::string& key, uint32_t cfid) {
657 const auto& cf_handle = cf_map.at(cfid);
658 PinnableSlice pinnable_val;
659 bool not_used;
660 DBImpl::GetImplOptions get_impl_options;
661 get_impl_options.column_family = cf_handle;
662 get_impl_options.value = &pinnable_val;
663 get_impl_options.value_found = ¬_used;
664 get_impl_options.callback = callback;
665 auto s = db_impl_->GetImpl(roptions, key, get_impl_options);
666
667 if (s.ok()) {
668 s = rollback_batch->Put(cf_handle, key, pinnable_val);
669 assert(s.ok());
670 } else if (s.IsNotFound()) {
671 s = rollback_batch->Delete(cf_handle, key);
672 assert(s.ok());
673 } else {
674 return s;
675 }
676
677 return Status::OK();
678 };
679
680 std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
681 lock_tracker.GetColumnFamilyIterator());
682 assert(cf_it != nullptr);
683 while (cf_it->HasNext()) {
684 ColumnFamilyId cf = cf_it->Next();
685 std::unique_ptr<LockTracker::KeyIterator> key_it(
686 lock_tracker.GetKeyIterator(cf));
687 assert(key_it != nullptr);
688 while (key_it->HasNext()) {
689 const std::string& key = key_it->Next();
690 auto s = WriteRollbackKey(key, cf);
691 if (!s.ok()) {
692 return s;
693 }
694 }
695 }
696
697 for (const auto& cfkey : untracked_keys_) {
698 const auto cfid = cfkey.first;
699 const auto& keys = cfkey.second;
700 for (const auto& key : keys) {
701 auto s = WriteRollbackKey(key, cfid);
702 if (!s.ok()) {
703 return s;
704 }
705 }
706 }
707
708 return Status::OK();
709 }
710
// Rolls back all data this transaction has written to the DB.
//
// A rollback batch is built by WriteRollbackKeys from tracked_locks_ and
// untracked_keys_. It restores the prior version of every written key, read
// with a WritePreparedTxnReadCallback against the max snapshot. The rollback
// batch is written to the DB, and the batches in unprep_seqs_ are marked
// committed through WriteUnpreparedCommitEntryPreReleaseCallback. Without
// two_write_queues this happens in one write; otherwise a second, empty write
// publishes the sequence numbers.
//
// NOTE(review): if the first write fails, unprep_seqs_ is left intact. If the
// second write fails, unprep_seqs_ is cleared without RemovePrepared being
// called — confirm this is intended.
Status WriteUnpreparedTxn::RollbackInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  WriteBatchWithIndex rollback_batch(
      wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0);
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  Status s;
  auto read_at_seq = kMaxSequenceNumber;
  ReadOptions roptions;
  // Use the max snapshot to prevent the callback's seq from being overridden
  // inside DBImpl::Get.
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
  // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
  // need to read our own writes when reading prior versions of the key for
  // rollback.
  WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
  // TODO(lth): We write rollback batch all in a single batch here, but this
  // should be subdivided into multiple batches as well. In phase 2, when key
  // sets are read from WAL, this will happen naturally.
  s = WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions);
  if (!s.ok()) {
    return s;
  }

  // The Rollback marker will be used as a batch separator
  s = WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
  assert(s.ok());
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  // The rollback batch may contain duplicate keys, because the key sets it is
  // built from are not comparator aware; SubBatchCnt() accounts for them.
  auto rollback_batch_cnt = rollback_batch.SubBatchCnt();
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, ii) adding the commit entry to
  // CommitCache will allow us to benefit from the existing mechanism in
  // CommitCache that keeps an entry evicted due to max advance and yet overlaps
  // with a live snapshot around so that the live snapshot properly skips the
  // entry even if its prepare seq is lower than max_evicted_seq_.
  //
  // TODO(lth): RollbackInternal is conceptually very similar to
  // CommitInternal, with the rollback batch simply taking on the role of
  // CommitTimeWriteBatch. We should be able to merge the two code paths.
  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, rollback_batch_cnt);
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while the rollback
  // batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
                          nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
                          &seq_used, rollback_batch_cnt,
                          do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write for commit

  uint64_t& prepare_seq = seq_used;
  // Populate unprep_seqs_ with rollback_batch_cnt, since we treat data in the
  // rollback write batch as just another "unprepared" batch. This will also
  // update the unprep_seqs_ in the update_commit_map callback.
  unprep_seqs_[prepare_seq] = rollback_batch_cnt;
  WriteUnpreparedCommitEntryPreReleaseCallback
      update_commit_map_with_rollback_batch(wpt_db_, db_impl_, unprep_seqs_, 0);

  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  WriteBatch empty_batch;
  const size_t ONE_BATCH = 1;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_rollback_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Mark the txn as rolled back
  if (s.ok()) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}
813
Clear()814 void WriteUnpreparedTxn::Clear() {
815 if (!recovered_txn_) {
816 txn_db_impl_->UnLock(this, *tracked_locks_);
817 }
818 unprep_seqs_.clear();
819 flushed_save_points_.reset(nullptr);
820 unflushed_save_points_.reset(nullptr);
821 recovered_txn_ = false;
822 largest_validated_seq_ = 0;
823 for (auto& it : active_iterators_) {
824 auto bdit = static_cast<BaseDeltaIterator*>(it);
825 bdit->Invalidate(Status::InvalidArgument(
826 "Cannot use iterator after transaction has finished"));
827 }
828 active_iterators_.clear();
829 untracked_keys_.clear();
830 TransactionBaseImpl::Clear();
831 }
832
SetSavePoint()833 void WriteUnpreparedTxn::SetSavePoint() {
834 assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
835 (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
836 (save_points_ ? save_points_->size() : 0));
837 PessimisticTransaction::SetSavePoint();
838 if (unflushed_save_points_ == nullptr) {
839 unflushed_save_points_.reset(new autovector<size_t>());
840 }
841 unflushed_save_points_->push_back(write_batch_.GetDataSize());
842 }
843
RollbackToSavePoint()844 Status WriteUnpreparedTxn::RollbackToSavePoint() {
845 assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
846 (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
847 (save_points_ ? save_points_->size() : 0));
848 if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
849 Status s = PessimisticTransaction::RollbackToSavePoint();
850 assert(!s.IsNotFound());
851 unflushed_save_points_->pop_back();
852 return s;
853 }
854
855 if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
856 return RollbackToSavePointInternal();
857 }
858
859 return Status::NotFound();
860 }
861
RollbackToSavePointInternal()862 Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
863 Status s;
864
865 const bool kClear = true;
866 TransactionBaseImpl::InitWriteBatch(kClear);
867
868 assert(flushed_save_points_->size() > 0);
869 WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back();
870
871 assert(save_points_ != nullptr && save_points_->size() > 0);
872 const LockTracker& tracked_keys = *save_points_->top().new_locks_;
873
874 ReadOptions roptions;
875 roptions.snapshot = top.snapshot_->snapshot();
876 SequenceNumber min_uncommitted =
877 static_cast_with_check<const SnapshotImpl>(roptions.snapshot)
878 ->min_uncommitted_;
879 SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber();
880 WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
881 top.unprep_seqs_,
882 kBackedByDBSnapshot);
883 s = WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions);
884 if (!s.ok()) {
885 return s;
886 }
887
888 const bool kPrepared = true;
889 s = FlushWriteBatchToDBInternal(!kPrepared);
890 if (!s.ok()) {
891 return s;
892 }
893
894 // PessimisticTransaction::RollbackToSavePoint will call also call
895 // RollbackToSavepoint on write_batch_. However, write_batch_ is empty and has
896 // no savepoints because this savepoint has already been flushed. Work around
897 // this by setting a fake savepoint.
898 write_batch_.SetSavePoint();
899 s = PessimisticTransaction::RollbackToSavePoint();
900 assert(s.ok());
901 if (!s.ok()) {
902 return s;
903 }
904
905 flushed_save_points_->pop_back();
906 return s;
907 }
908
PopSavePoint()909 Status WriteUnpreparedTxn::PopSavePoint() {
910 assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
911 (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
912 (save_points_ ? save_points_->size() : 0));
913 if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
914 Status s = PessimisticTransaction::PopSavePoint();
915 assert(!s.IsNotFound());
916 unflushed_save_points_->pop_back();
917 return s;
918 }
919
920 if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
921 // PessimisticTransaction::PopSavePoint will call also call PopSavePoint on
922 // write_batch_. However, write_batch_ is empty and has no savepoints
923 // because this savepoint has already been flushed. Work around this by
924 // setting a fake savepoint.
925 write_batch_.SetSavePoint();
926 Status s = PessimisticTransaction::PopSavePoint();
927 assert(!s.IsNotFound());
928 flushed_save_points_->pop_back();
929 return s;
930 }
931
932 return Status::NotFound();
933 }
934
MultiGet(const ReadOptions & options,ColumnFamilyHandle * column_family,const size_t num_keys,const Slice * keys,PinnableSlice * values,Status * statuses,const bool sorted_input)935 void WriteUnpreparedTxn::MultiGet(const ReadOptions& options,
936 ColumnFamilyHandle* column_family,
937 const size_t num_keys, const Slice* keys,
938 PinnableSlice* values, Status* statuses,
939 const bool sorted_input) {
940 SequenceNumber min_uncommitted, snap_seq;
941 const SnapshotBackup backed_by_snapshot =
942 wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
943 WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
944 unprep_seqs_, backed_by_snapshot);
945 write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
946 keys, values, statuses, sorted_input,
947 &callback);
948 if (UNLIKELY(!callback.valid() ||
949 !wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
950 wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
951 for (size_t i = 0; i < num_keys; i++) {
952 statuses[i] = Status::TryAgain();
953 }
954 }
955 }
956
Get(const ReadOptions & options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)957 Status WriteUnpreparedTxn::Get(const ReadOptions& options,
958 ColumnFamilyHandle* column_family,
959 const Slice& key, PinnableSlice* value) {
960 SequenceNumber min_uncommitted, snap_seq;
961 const SnapshotBackup backed_by_snapshot =
962 wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
963 WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
964 unprep_seqs_, backed_by_snapshot);
965 auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
966 value, &callback);
967 if (LIKELY(callback.valid() &&
968 wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
969 return res;
970 } else {
971 res.PermitUncheckedError();
972 wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
973 return Status::TryAgain();
974 }
975 }
976
977 namespace {
CleanupWriteUnpreparedWBWIIterator(void * arg1,void * arg2)978 static void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) {
979 auto txn = reinterpret_cast<WriteUnpreparedTxn*>(arg1);
980 auto iter = reinterpret_cast<Iterator*>(arg2);
981 txn->RemoveActiveIterator(iter);
982 }
983 } // anonymous namespace
984
GetIterator(const ReadOptions & options)985 Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
986 return GetIterator(options, wupt_db_->DefaultColumnFamily());
987 }
988
GetIterator(const ReadOptions & options,ColumnFamilyHandle * column_family)989 Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
990 ColumnFamilyHandle* column_family) {
991 // Make sure to get iterator from WriteUnprepareTxnDB, not the root db.
992 Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
993 assert(db_iter);
994
995 auto iter = write_batch_.NewIteratorWithBase(column_family, db_iter);
996 active_iterators_.push_back(iter);
997 iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter);
998 return iter;
999 }
1000
ValidateSnapshot(ColumnFamilyHandle * column_family,const Slice & key,SequenceNumber * tracked_at_seq)1001 Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
1002 const Slice& key,
1003 SequenceNumber* tracked_at_seq) {
1004 // TODO(lth): Reduce duplicate code with WritePrepared ValidateSnapshot logic.
1005 assert(snapshot_);
1006
1007 SequenceNumber min_uncommitted =
1008 static_cast_with_check<const SnapshotImpl>(snapshot_.get())
1009 ->min_uncommitted_;
1010 SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
1011 // tracked_at_seq is either max or the last snapshot with which this key was
1012 // trackeed so there is no need to apply the IsInSnapshot to this comparison
1013 // here as tracked_at_seq is not a prepare seq.
1014 if (*tracked_at_seq <= snap_seq) {
1015 // If the key has been previous validated at a sequence number earlier
1016 // than the curent snapshot's sequence number, we already know it has not
1017 // been modified.
1018 return Status::OK();
1019 }
1020
1021 *tracked_at_seq = snap_seq;
1022
1023 ColumnFamilyHandle* cfh =
1024 column_family ? column_family : db_impl_->DefaultColumnFamily();
1025
1026 WriteUnpreparedTxnReadCallback snap_checker(
1027 wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot);
1028 return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
1029 snap_seq, false /* cache_only */,
1030 &snap_checker, min_uncommitted);
1031 }
1032
1033 const std::map<SequenceNumber, size_t>&
GetUnpreparedSequenceNumbers()1034 WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
1035 return unprep_seqs_;
1036 }
1037
1038 } // namespace ROCKSDB_NAMESPACE
1039
1040 #endif // ROCKSDB_LITE
1041