1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 // WriteBatch::rep_ :=
11 // sequence: fixed64
12 // count: fixed32
13 // data: record[count]
14 // record :=
15 // kTypeValue varstring varstring
16 // kTypeDeletion varstring
17 // kTypeSingleDeletion varstring
18 // kTypeRangeDeletion varstring varstring
19 // kTypeMerge varstring varstring
20 // kTypeColumnFamilyValue varint32 varstring varstring
21 // kTypeColumnFamilyDeletion varint32 varstring
22 // kTypeColumnFamilySingleDeletion varint32 varstring
23 // kTypeColumnFamilyRangeDeletion varint32 varstring varstring
24 // kTypeColumnFamilyMerge varint32 varstring varstring
25 // kTypeBeginPrepareXID varstring
26 // kTypeEndPrepareXID
27 // kTypeCommitXID varstring
28 // kTypeRollbackXID varstring
29 // kTypeBeginPersistedPrepareXID varstring
30 // kTypeBeginUnprepareXID varstring
31 // kTypeNoop
32 // varstring :=
33 // len: varint32
34 // data: uint8[len]
35
36 #include "rocksdb/write_batch.h"
37
38 #include <map>
39 #include <stack>
40 #include <stdexcept>
41 #include <type_traits>
42 #include <unordered_map>
43 #include <vector>
44
45 #include "db/column_family.h"
46 #include "db/db_impl/db_impl.h"
47 #include "db/dbformat.h"
48 #include "db/flush_scheduler.h"
49 #include "db/kv_checksum.h"
50 #include "db/memtable.h"
51 #include "db/merge_context.h"
52 #include "db/snapshot_impl.h"
53 #include "db/trim_history_scheduler.h"
54 #include "db/write_batch_internal.h"
55 #include "monitoring/perf_context_imp.h"
56 #include "monitoring/statistics.h"
57 #include "port/lang.h"
58 #include "rocksdb/merge_operator.h"
59 #include "rocksdb/system_clock.h"
60 #include "util/autovector.h"
61 #include "util/cast_util.h"
62 #include "util/coding.h"
63 #include "util/duplicate_detector.h"
64 #include "util/string_util.h"
65
66 namespace ROCKSDB_NAMESPACE {
67
68 // anon namespace for file-local types
69 namespace {
70
// Bitmask describing which kinds of records a WriteBatch contains.
// Stored in WriteBatch::content_flags_; DEFERRED means the flags have not
// been computed yet and must be derived by scanning the batch.
enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,            // flags unknown; recompute lazily
  HAS_PUT = 1 << 1,             // contains at least one Put record
  HAS_DELETE = 1 << 2,          // contains at least one Delete record
  HAS_SINGLE_DELETE = 1 << 3,   // contains at least one SingleDelete record
  HAS_MERGE = 1 << 4,           // contains at least one Merge record
  HAS_BEGIN_PREPARE = 1 << 5,   // contains a BeginPrepare marker
  HAS_END_PREPARE = 1 << 6,     // contains an EndPrepare marker
  HAS_COMMIT = 1 << 7,          // contains a Commit marker
  HAS_ROLLBACK = 1 << 8,        // contains a Rollback marker
  HAS_DELETE_RANGE = 1 << 9,    // contains at least one DeleteRange record
  HAS_BLOB_INDEX = 1 << 10,     // contains at least one BlobIndex record
  HAS_BEGIN_UNPREPARE = 1 << 11,  // BeginPrepare was an unprepared-txn marker
};
85
86 struct BatchContentClassifier : public WriteBatch::Handler {
87 uint32_t content_flags = 0;
88
PutCFROCKSDB_NAMESPACE::__anon239e65fc0111::BatchContentClassifier89 Status PutCF(uint32_t, const Slice&, const Slice&) override {
90 content_flags |= ContentFlags::HAS_PUT;
91 return Status::OK();
92 }
93
DeleteCFROCKSDB_NAMESPACE::__anon239e65fc0111::BatchContentClassifier94 Status DeleteCF(uint32_t, const Slice&) override {
95 content_flags |= ContentFlags::HAS_DELETE;
96 return Status::OK();
97 }
98
SingleDeleteCFROCKSDB_NAMESPACE::__anon239e65fc0111::BatchContentClassifier99 Status SingleDeleteCF(uint32_t, const Slice&) override {
100 content_flags |= ContentFlags::HAS_SINGLE_DELETE;
101 return Status::OK();
102 }
103
DeleteRangeCFROCKSDB_NAMESPACE::__anon239e65fc0111::BatchContentClassifier104 Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
105 content_flags |= ContentFlags::HAS_DELETE_RANGE;
106 return Status::OK();
107 }
108
MergeCFROCKSDB_NAMESPACE::__anon239e65fc0111::BatchContentClassifier109 Status MergeCF(uint32_t, const Slice&, const Slice&) override {
110 content_flags |= ContentFlags::HAS_MERGE;
111 return Status::OK();
112 }
113
PutBlobIndexCFROCKSDB_NAMESPACE::__anon239e65fc0111::BatchContentClassifier114 Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
115 content_flags |= ContentFlags::HAS_BLOB_INDEX;
116 return Status::OK();
117 }
118
MarkBeginPrepareROCKSDB_NAMESPACE::__anon239e65fc0111::BatchContentClassifier119 Status MarkBeginPrepare(bool unprepare) override {
120 content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
121 if (unprepare) {
122 content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
123 }
124 return Status::OK();
125 }
126
MarkEndPrepareROCKSDB_NAMESPACE::__anon239e65fc0111::BatchContentClassifier127 Status MarkEndPrepare(const Slice&) override {
128 content_flags |= ContentFlags::HAS_END_PREPARE;
129 return Status::OK();
130 }
131
MarkCommitROCKSDB_NAMESPACE::__anon239e65fc0111::BatchContentClassifier132 Status MarkCommit(const Slice&) override {
133 content_flags |= ContentFlags::HAS_COMMIT;
134 return Status::OK();
135 }
136
MarkRollbackROCKSDB_NAMESPACE::__anon239e65fc0111::BatchContentClassifier137 Status MarkRollback(const Slice&) override {
138 content_flags |= ContentFlags::HAS_ROLLBACK;
139 return Status::OK();
140 }
141 };
142
143 class TimestampAssigner : public WriteBatch::Handler {
144 public:
TimestampAssigner(const Slice & ts,WriteBatch::ProtectionInfo * prot_info)145 explicit TimestampAssigner(const Slice& ts,
146 WriteBatch::ProtectionInfo* prot_info)
147 : timestamp_(ts),
148 timestamps_(kEmptyTimestampList),
149 prot_info_(prot_info) {}
TimestampAssigner(const std::vector<Slice> & ts_list,WriteBatch::ProtectionInfo * prot_info)150 explicit TimestampAssigner(const std::vector<Slice>& ts_list,
151 WriteBatch::ProtectionInfo* prot_info)
152 : timestamps_(ts_list), prot_info_(prot_info) {}
~TimestampAssigner()153 ~TimestampAssigner() override {}
154
PutCF(uint32_t,const Slice & key,const Slice &)155 Status PutCF(uint32_t, const Slice& key, const Slice&) override {
156 AssignTimestamp(key);
157 ++idx_;
158 return Status::OK();
159 }
160
DeleteCF(uint32_t,const Slice & key)161 Status DeleteCF(uint32_t, const Slice& key) override {
162 AssignTimestamp(key);
163 ++idx_;
164 return Status::OK();
165 }
166
SingleDeleteCF(uint32_t,const Slice & key)167 Status SingleDeleteCF(uint32_t, const Slice& key) override {
168 AssignTimestamp(key);
169 ++idx_;
170 return Status::OK();
171 }
172
DeleteRangeCF(uint32_t,const Slice & begin_key,const Slice &)173 Status DeleteRangeCF(uint32_t, const Slice& begin_key,
174 const Slice& /* end_key */) override {
175 AssignTimestamp(begin_key);
176 ++idx_;
177 return Status::OK();
178 }
179
MergeCF(uint32_t,const Slice & key,const Slice &)180 Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
181 AssignTimestamp(key);
182 ++idx_;
183 return Status::OK();
184 }
185
PutBlobIndexCF(uint32_t,const Slice &,const Slice &)186 Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
187 // TODO (yanqin): support blob db in the future.
188 return Status::OK();
189 }
190
MarkBeginPrepare(bool)191 Status MarkBeginPrepare(bool) override {
192 // TODO (yanqin): support in the future.
193 return Status::OK();
194 }
195
MarkEndPrepare(const Slice &)196 Status MarkEndPrepare(const Slice&) override {
197 // TODO (yanqin): support in the future.
198 return Status::OK();
199 }
200
MarkCommit(const Slice &)201 Status MarkCommit(const Slice&) override {
202 // TODO (yanqin): support in the future.
203 return Status::OK();
204 }
205
MarkRollback(const Slice &)206 Status MarkRollback(const Slice&) override {
207 // TODO (yanqin): support in the future.
208 return Status::OK();
209 }
210
211 private:
AssignTimestamp(const Slice & key)212 void AssignTimestamp(const Slice& key) {
213 assert(timestamps_.empty() || idx_ < timestamps_.size());
214 const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
215 size_t ts_sz = ts.size();
216 if (ts_sz == 0) {
217 // This key does not have timestamp, so skip.
218 return;
219 }
220 if (prot_info_ != nullptr) {
221 SliceParts old_key(&key, 1);
222 Slice key_no_ts(key.data(), key.size() - ts_sz);
223 std::array<Slice, 2> new_key_cmpts{{key_no_ts, ts}};
224 SliceParts new_key(new_key_cmpts.data(), 2);
225 prot_info_->entries_[idx_].UpdateK(old_key, new_key);
226 }
227 char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
228 memcpy(ptr, ts.data(), ts_sz);
229 }
230
231 static const std::vector<Slice> kEmptyTimestampList;
232 const Slice timestamp_;
233 const std::vector<Slice>& timestamps_;
234 WriteBatch::ProtectionInfo* const prot_info_;
235 size_t idx_ = 0;
236
237 // No copy or move.
238 TimestampAssigner(const TimestampAssigner&) = delete;
239 TimestampAssigner(TimestampAssigner&&) = delete;
240 TimestampAssigner& operator=(const TimestampAssigner&) = delete;
241 TimestampAssigner&& operator=(TimestampAssigner&&) = delete;
242 };
243 const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;
244
245 } // anon namespace
246
// Stack of savepoints recorded by WriteBatch::SetSavePoint(); each entry
// remembers the batch size/count/flags to restore on RollbackToSavePoint().
// Backed by an autovector to avoid heap allocation for shallow stacks.
struct SavePoints {
  std::stack<SavePoint, autovector<SavePoint>> stack;
};
250
WriteBatch(size_t reserved_bytes,size_t max_bytes)251 WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
252 : content_flags_(0), max_bytes_(max_bytes), rep_() {
253 rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
254 ? reserved_bytes
255 : WriteBatchInternal::kHeader);
256 rep_.resize(WriteBatchInternal::kHeader);
257 }
258
WriteBatch(size_t reserved_bytes,size_t max_bytes,size_t protection_bytes_per_key)259 WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes,
260 size_t protection_bytes_per_key)
261 : content_flags_(0), max_bytes_(max_bytes), rep_() {
262 // Currently `protection_bytes_per_key` can only be enabled at 8 bytes per
263 // entry.
264 assert(protection_bytes_per_key == 0 || protection_bytes_per_key == 8);
265 if (protection_bytes_per_key != 0) {
266 prot_info_.reset(new WriteBatch::ProtectionInfo());
267 }
268 rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
269 ? reserved_bytes
270 : WriteBatchInternal::kHeader);
271 rep_.resize(WriteBatchInternal::kHeader);
272 }
273
// Adopts an already-encoded batch representation. The content flags cannot
// be known without scanning, so they are marked DEFERRED and computed
// lazily by ComputeContentFlags().
WriteBatch::WriteBatch(const std::string& rep)
    : content_flags_(ContentFlags::DEFERRED), max_bytes_(0), rep_(rep) {}

// Move form of the above; takes ownership of `rep` without copying.
WriteBatch::WriteBatch(std::string&& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(std::move(rep)) {}
281
// Copy constructor: deep-copies the encoded representation, the savepoint
// stack, and the protection info (when present).
WriteBatch::WriteBatch(const WriteBatch& src)
    : wal_term_point_(src.wal_term_point_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(src.rep_) {
  if (src.save_points_ != nullptr) {
    // SavePoints is a plain aggregate; copy-construct it in one step.
    save_points_.reset(new SavePoints(*src.save_points_));
  }
  if (src.prot_info_ != nullptr) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
    prot_info_->entries_ = src.prot_info_->entries_;
  }
}
296
// Move constructor: steals the representation, savepoints, and protection
// info from `src`, leaving it in a valid-but-unspecified state.
WriteBatch::WriteBatch(WriteBatch&& src) noexcept
    : save_points_(std::move(src.save_points_)),
      wal_term_point_(std::move(src.wal_term_point_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      prot_info_(std::move(src.prot_info_)),
      rep_(std::move(src.rep_)) {}
304
// Copy assignment implemented as destroy + placement-new copy construction.
// The self-assignment check is required: destroying *this first would
// otherwise invalidate the source before it is copied.
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}

// Move assignment, using the same destroy + placement-new pattern.
WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}
320
~WriteBatch()321 WriteBatch::~WriteBatch() { }
322
~Handler()323 WriteBatch::Handler::~Handler() { }
324
void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

// Default implementation: never ask to stop iterating the batch early.
bool WriteBatch::Handler::Continue() {
  return true;
}
333
Clear()334 void WriteBatch::Clear() {
335 rep_.clear();
336 rep_.resize(WriteBatchInternal::kHeader);
337
338 content_flags_.store(0, std::memory_order_relaxed);
339
340 if (save_points_ != nullptr) {
341 while (!save_points_->stack.empty()) {
342 save_points_->stack.pop();
343 }
344 }
345
346 if (prot_info_ != nullptr) {
347 prot_info_->entries_.clear();
348 }
349 wal_term_point_.clear();
350 }
351
// Number of entries in the batch, as recorded in the fixed header.
uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }
353
ComputeContentFlags() const354 uint32_t WriteBatch::ComputeContentFlags() const {
355 auto rv = content_flags_.load(std::memory_order_relaxed);
356 if ((rv & ContentFlags::DEFERRED) != 0) {
357 BatchContentClassifier classifier;
358 // Should we handle status here?
359 Iterate(&classifier).PermitUncheckedError();
360 rv = classifier.content_flags;
361
362 // this method is conceptually const, because it is performing a lazy
363 // computation that doesn't affect the abstract state of the batch.
364 // content_flags_ is marked mutable so that we can perform the
365 // following assignment
366 content_flags_.store(rv, std::memory_order_relaxed);
367 }
368 return rv;
369 }
370
// Records the current batch size/count/flags so that everything appended
// after this point can be excluded from the WAL.
void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_;
}
376
GetProtectionBytesPerKey() const377 size_t WriteBatch::GetProtectionBytesPerKey() const {
378 if (prot_info_ != nullptr) {
379 return prot_info_->GetBytesPerKey();
380 }
381 return 0;
382 }
383
// The Has*() predicates below test the (possibly lazily computed) content
// flags; see ComputeContentFlags().

bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasDeleteRange() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}
403
ReadKeyFromWriteBatchEntry(Slice * input,Slice * key,bool cf_record)404 bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
405 assert(input != nullptr && key != nullptr);
406 // Skip tag byte
407 input->remove_prefix(1);
408
409 if (cf_record) {
410 // Skip column_family bytes
411 uint32_t cf;
412 if (!GetVarint32(input, &cf)) {
413 return false;
414 }
415 }
416
417 // Extract key
418 return GetLengthPrefixedSlice(input, key);
419 }
420
// Two-phase-commit marker predicates; same lazy-flag mechanism as the
// Has*() predicates above.

bool WriteBatch::HasBeginPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}

bool WriteBatch::HasEndPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}

bool WriteBatch::HasCommit() const {
  return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}

bool WriteBatch::HasRollback() const {
  return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}
436
// Decodes one record from `*input` (which must point at a record tag),
// advancing the slice past the record. Outputs:
//   *tag           - the record's value-type tag byte
//   *column_family - varint32 CF id for kTypeColumnFamily* tags, else 0
//   *key/*value    - length-prefixed payload slices, per record type
//   *blob          - payload of a kTypeLogData record
//   *xid           - transaction id for EndPrepare/Commit/Rollback markers
// Returns Corruption if the record is truncated or the tag is unknown.
// Note the FALLTHROUGH from each kTypeColumnFamily* case into the plain
// case: the CF-prefixed variants share the payload layout of the base type.
Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid) {
  assert(key != nullptr && value != nullptr);
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;  // default
  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      FALLTHROUGH_INTENDED;
    case kTypeValue:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      FALLTHROUGH_INTENDED;
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;
    case kTypeColumnFamilyRangeDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      FALLTHROUGH_INTENDED;
    case kTypeRangeDeletion:
      // for range delete, "key" is begin_key, "value" is end_key
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      break;
    case kTypeColumnFamilyMerge:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      FALLTHROUGH_INTENDED;
    case kTypeMerge:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;
    case kTypeColumnFamilyBlobIndex:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      FALLTHROUGH_INTENDED;
    case kTypeBlobIndex:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      break;
    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
      // This indicates that the prepared batch is also persisted in the db.
      // This is used in WritePreparedTxn
    case kTypeBeginPersistedPrepareXID:
      // This is used in WriteUnpreparedTxn
    case kTypeBeginUnprepareXID:
      // Marker-only records: no payload to decode.
      break;
    case kTypeEndPrepareXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;
    case kTypeCommitXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Commit XID");
      }
      break;
    case kTypeRollbackXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Rollback XID");
      }
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}
536
// Replays every record in this batch through `handler`. Rejects batches
// shorter than the fixed header before delegating to the internal iterator.
Status WriteBatch::Iterate(Handler* handler) const {
  if (rep_.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
                                     rep_.size());
}
545
// Decodes the records of `wb` in the byte range [begin, end) and dispatches
// each to the corresponding callback on `handler`. Supports the handler
// returning TryAgain to have the same record re-dispatched once. When the
// whole batch is iterated, the number of counted records is validated
// against the count stored in the header.
Status WriteBatchInternal::Iterate(const WriteBatch* wb,
                                   WriteBatch::Handler* handler, size_t begin,
                                   size_t end) {
  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
    return Status::Corruption("Invalid start/end bounds for Iterate");
  }
  assert(begin <= end);
  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
  // Only a full-batch iteration can be checked against the header count.
  bool whole_batch =
      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());

  Slice key, value, blob, xid;
  // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
  // the batch boundary symbols otherwise we would mis-count the number of
  // batches. We do that by checking whether the accumulated batch is empty
  // before seeing the next Noop.
  bool empty_batch = true;
  uint32_t found = 0;
  Status s;
  char tag = 0;
  uint32_t column_family = 0;  // default
  bool last_was_try_again = false;
  bool handler_continue = true;
  while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
    handler_continue = handler->Continue();
    if (!handler_continue) {
      break;
    }

    if (LIKELY(!s.IsTryAgain())) {
      // Normal path: decode the next record.
      last_was_try_again = false;
      tag = 0;
      column_family = 0;  // default

      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid);
      if (!s.ok()) {
        return s;
      }
    } else {
      // TryAgain path: re-dispatch the previously decoded record without
      // consuming more input. Two TryAgains in a row is treated as a bug.
      assert(s.IsTryAgain());
      assert(!last_was_try_again);  // to detect infinite loop bugs
      if (UNLIKELY(last_was_try_again)) {
        return Status::Corruption(
            "two consecutive TryAgain in WriteBatch handler; this is either a "
            "software bug or data corruption.");
      }
      last_was_try_again = true;
      s = Status::OK();
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        s = handler->DeleteRangeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
        s = handler->PutBlobIndexCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          found++;
        }
        break;
      case kTypeLogData:
        handler->LogData(blob);
        // A batch might have nothing but LogData. It is still a batch.
        empty_batch = false;
        break;
      case kTypeBeginPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        // This tag is only valid under the WriteCommitted policy; reject it
        // if the handler indicates a different write policy is active.
        if (!handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_after_commit_ is disabled (in "
              "WritePrepared/WriteUnprepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_before_prepare_ is enabled "
              "(in WriteUnprepared mode). If it is not due to corruption, the "
              "WAL must be emptied before changing the WritePolicy.");
        }
        break;
      case kTypeBeginPersistedPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
              "is enabled (in default WriteCommitted mode). If it is not due "
              "to corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeBeginUnprepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
        s = handler->MarkBeginPrepare(true /* unprepared */);
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
              "default WriteCommitted mode). If it is not due to corruption, "
              "the WAL must be emptied before changing the WritePolicy.");
        }
        if (!handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
              "(in WriteCommitted/WritePrepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeEndPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        s = handler->MarkEndPrepare(xid);
        assert(s.ok());
        // Markers reset the sub-batch accumulation; see comment above.
        empty_batch = true;
        break;
      case kTypeCommitXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        s = handler->MarkCommit(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeRollbackXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        s = handler->MarkRollback(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeNoop:
        // Tell the handler whether this Noop follows an empty sub-batch so
        // it can decide whether it marks a batch boundary.
        s = handler->MarkNoop(empty_batch);
        assert(s.ok());
        empty_batch = true;
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (handler_continue && whole_batch &&
      found != WriteBatchInternal::Count(wb)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}
756
// -- Accessors for batch-internal state and the fixed 12-byte header
//    (8-byte sequence number followed by 4-byte entry count). --

bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  return b->is_latest_persistent_state_;
}

void WriteBatchInternal::SetAsLatestPersistentState(WriteBatch* b) {
  b->is_latest_persistent_state_ = true;
}

// Entry count, stored as fixed32 at byte offset 8 of the header.
uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  EncodeFixed32(&b->rep_[8], n);
}

// Starting sequence number, stored as fixed64 at byte offset 0.
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

// Byte offset of the first record; always just past the fixed header.
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  return WriteBatchInternal::kHeader;
}
784
// Appends a Put(key, value) record for `column_family_id` to b->rep_,
// bumping the header count, content flags, and (when enabled) the per-key
// protection info. LocalSavePoint rolls everything back if appending would
// exceed the batch's max_bytes_ limit.
Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  // Key/value lengths are encoded as varint32, so each must fit in 32 bits.
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  // The default column family omits the varint32 CF id from the record.
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // Technically the optype could've been `kTypeColumnFamilyValue` with the
    // CF ID encoded in the `WriteBatch`. That distinction is unimportant
    // however since we verify CF ID is correct, as well as all other fields
    // (a missing/extra encoded CF ID would corrupt another field). It is
    // convenient to consolidate on `kTypeValue` here as that is what will be
    // inserted into memtable.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeValue)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}
820
// Public Put: resolves the column family handle to its numeric id and
// delegates to the internal encoder.
Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}
826
CheckSlicePartsLength(const SliceParts & key,const SliceParts & value)827 Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
828 const SliceParts& value) {
829 size_t total_key_bytes = 0;
830 for (int i = 0; i < key.num_parts; ++i) {
831 total_key_bytes += key.parts[i].size();
832 }
833 if (total_key_bytes >= size_t{port::kMaxUint32}) {
834 return Status::InvalidArgument("key is too large");
835 }
836
837 size_t total_value_bytes = 0;
838 for (int i = 0; i < value.num_parts; ++i) {
839 total_value_bytes += value.parts[i].size();
840 }
841 if (total_value_bytes >= size_t{port::kMaxUint32}) {
842 return Status::InvalidArgument("value is too large");
843 }
844 return Status::OK();
845 }
846
// SliceParts overload of Put: identical encoding to the Slice overload, but
// the key and value are each concatenated from multiple fragments without an
// intermediate copy.
Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key, const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  // The default column family omits the varint32 CF id from the record.
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeValue)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}
876
// Public SliceParts Put: resolves the column family handle to its numeric id
// and delegates to the internal encoder.
Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                       const SliceParts& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}
882
// Appends a Noop record. Used as a placeholder that MarkEndPrepare() later
// rewrites into a BeginPrepare marker; does not affect the entry count.
Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  b->rep_.push_back(static_cast<char>(kTypeNoop));
  return Status::OK();
}
887
// Closes a prepare section: rewrites the Noop placeholder at the first
// record position into the appropriate BeginPrepare marker, then appends an
// EndPrepare marker carrying the transaction id `xid`. The marker variant
// depends on the write policy (write_after_commit / unprepared_batch).
Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
                                          bool write_after_commit,
                                          bool unprepared_batch) {
  // a manually constructed batch can only contain one prepare section
  // (byte 12 is the first record byte, right after the 12-byte header).
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    while (!b->save_points_->stack.empty()) {
      b->save_points_->stack.pop();
    }
  }

  // rewrite noop as begin marker
  b->rep_[12] = static_cast<char>(
      write_after_commit ? kTypeBeginPrepareXID
                         : (unprepared_batch ? kTypeBeginUnprepareXID
                                             : kTypeBeginPersistedPrepareXID));
  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  if (unprepared_batch) {
    b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                                ContentFlags::HAS_BEGIN_UNPREPARE,
                            std::memory_order_relaxed);
  }
  return Status::OK();
}
919
// Appends a kTypeCommitXID record carrying the transaction id `xid`.
Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeCommitXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}
928
// Appends a kTypeRollbackXID record carrying the transaction id `xid`.
Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_ROLLBACK,
                          std::memory_order_relaxed);
  return Status::OK();
}
937
// Appends a Delete record for `key` in `column_family_id` to b->rep_.
Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key) {
  // LocalSavePoint rolls the partially-written record back if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // Deletions have no value; an empty slice stands in for it.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, "" /* value */, kTypeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
962
Delete(ColumnFamilyHandle * column_family,const Slice & key)963 Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
964 return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
965 key);
966 }
967
// Appends a Delete record (SliceParts key form) for `column_family_id`.
Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  // LocalSavePoint rolls the partially-written record back if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // Deletions have no value; an empty SliceParts stands in for it.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key,
                        SliceParts(nullptr /* _parts */, 0 /* _num_parts */),
                        kTypeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
994
Delete(ColumnFamilyHandle * column_family,const SliceParts & key)995 Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
996 const SliceParts& key) {
997 return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
998 key);
999 }
1000
// Appends a SingleDelete record for `key` in `column_family_id`.
Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key) {
  // LocalSavePoint rolls the partially-written record back if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // Single deletions have no value; an empty slice stands in for it.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, "" /* value */, kTypeSingleDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1026
SingleDelete(ColumnFamilyHandle * column_family,const Slice & key)1027 Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
1028 const Slice& key) {
1029 return WriteBatchInternal::SingleDelete(
1030 this, GetColumnFamilyID(column_family), key);
1031 }
1032
// Appends a SingleDelete record (SliceParts key form) for `column_family_id`.
Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  // LocalSavePoint rolls the partially-written record back if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // Single deletions have no value; an empty SliceParts stands in for it.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key,
                        SliceParts(nullptr /* _parts */,
                                   0 /* _num_parts */) /* value */,
                        kTypeSingleDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1061
SingleDelete(ColumnFamilyHandle * column_family,const SliceParts & key)1062 Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
1063 const SliceParts& key) {
1064 return WriteBatchInternal::SingleDelete(
1065 this, GetColumnFamilyID(column_family), key);
1066 }
1067
// Appends a DeleteRange record [begin_key, end_key) for `column_family_id`.
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const Slice& begin_key,
                                       const Slice& end_key) {
  // LocalSavePoint rolls the partially-written record back if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, begin_key);
  PutLengthPrefixedSlice(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(begin_key, end_key, kTypeRangeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1095
DeleteRange(ColumnFamilyHandle * column_family,const Slice & begin_key,const Slice & end_key)1096 Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
1097 const Slice& begin_key, const Slice& end_key) {
1098 return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
1099 begin_key, end_key);
1100 }
1101
// Appends a DeleteRange record (SliceParts form) for `column_family_id`.
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  // LocalSavePoint rolls the partially-written record back if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(begin_key, end_key, kTypeRangeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1129
DeleteRange(ColumnFamilyHandle * column_family,const SliceParts & begin_key,const SliceParts & end_key)1130 Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
1131 const SliceParts& begin_key,
1132 const SliceParts& end_key) {
1133 return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
1134 begin_key, end_key);
1135 }
1136
// Appends a Merge record for `key`/`value` in `column_family_id`.
Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
  // Both key and value lengths must fit in the varint32 length prefix.
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  // LocalSavePoint rolls the partially-written record back if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeMerge)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}
1168
Merge(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)1169 Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
1170 const Slice& value) {
1171 return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
1172 value);
1173 }
1174
// Appends a Merge record (SliceParts form) for `column_family_id`.
Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  // Reject keys/values whose total size cannot be length-prefix encoded.
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  // LocalSavePoint rolls the partially-written record back if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeMerge)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}
1205
Merge(ColumnFamilyHandle * column_family,const SliceParts & key,const SliceParts & value)1206 Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
1207 const SliceParts& key, const SliceParts& value) {
1208 return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
1209 value);
1210 }
1211
// Appends a BlobIndex record for `key`/`value` in `column_family_id`.
Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key, const Slice& value) {
  // LocalSavePoint rolls the partially-written record back if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BLOB_INDEX,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, value, kTypeBlobIndex)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1238
// Appends a kTypeLogData record carrying `blob`. Note that this does not bump
// the batch's entry count (no SetCount call) and sets no content flags.
Status WriteBatch::PutLogData(const Slice& blob) {
  LocalSavePoint save(this);
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
  return save.commit();
}
1245
SetSavePoint()1246 void WriteBatch::SetSavePoint() {
1247 if (save_points_ == nullptr) {
1248 save_points_.reset(new SavePoints());
1249 }
1250 // Record length and count of current batch of writes.
1251 save_points_->stack.push(SavePoint(
1252 GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
1253 }
1254
// Restores the batch to the state captured by the most recent SetSavePoint():
// truncates rep_, the per-key protection info, the entry count, and the
// content flags. Returns NotFound when no savepoint is active.
Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  SavePoint savepoint = save_points_->stack.top();
  save_points_->stack.pop();

  // A savepoint can never describe more data than the batch currently holds.
  assert(savepoint.size <= rep_.size());
  assert(static_cast<uint32_t>(savepoint.count) <= Count());

  if (savepoint.size == rep_.size()) {
    // No changes to rollback
  } else if (savepoint.size == 0) {
    // Rollback everything
    Clear();
  } else {
    rep_.resize(savepoint.size);
    if (prot_info_ != nullptr) {
      // One protection-info entry per key: trim to the saved key count.
      prot_info_->entries_.resize(savepoint.count);
    }
    WriteBatchInternal::SetCount(this, savepoint.count);
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  }

  return Status::OK();
}
1283
PopSavePoint()1284 Status WriteBatch::PopSavePoint() {
1285 if (save_points_ == nullptr || save_points_->stack.size() == 0) {
1286 return Status::NotFound();
1287 }
1288
1289 // Pop the most recent savepoint off the stack
1290 save_points_->stack.pop();
1291
1292 return Status::OK();
1293 }
1294
AssignTimestamp(const Slice & ts)1295 Status WriteBatch::AssignTimestamp(const Slice& ts) {
1296 TimestampAssigner ts_assigner(ts, prot_info_.get());
1297 return Iterate(&ts_assigner);
1298 }
1299
AssignTimestamps(const std::vector<Slice> & ts_list)1300 Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
1301 TimestampAssigner ts_assigner(ts_list, prot_info_.get());
1302 return Iterate(&ts_assigner);
1303 }
1304
1305 class MemTableInserter : public WriteBatch::Handler {
1306
1307 SequenceNumber sequence_;
1308 ColumnFamilyMemTables* const cf_mems_;
1309 FlushScheduler* const flush_scheduler_;
1310 TrimHistoryScheduler* const trim_history_scheduler_;
1311 const bool ignore_missing_column_families_;
1312 const uint64_t recovering_log_number_;
1313 // log number that all Memtables inserted into should reference
1314 uint64_t log_number_ref_;
1315 DBImpl* db_;
1316 const bool concurrent_memtable_writes_;
1317 bool post_info_created_;
1318 const WriteBatch::ProtectionInfo* prot_info_;
1319 size_t prot_info_idx_;
1320
1321 bool* has_valid_writes_;
1322 // On some (!) platforms just default creating
1323 // a map is too expensive in the Write() path as they
1324 // cause memory allocations though unused.
1325 // Make creation optional but do not incur
1326 // std::unique_ptr additional allocation
1327 using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
1328 using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
1329 PostMapType mem_post_info_map_;
1330 // current recovered transaction we are rebuilding (recovery)
1331 WriteBatch* rebuilding_trx_;
1332 SequenceNumber rebuilding_trx_seq_;
1333 // Increase seq number once per each write batch. Otherwise increase it once
1334 // per key.
1335 bool seq_per_batch_;
1336 // Whether the memtable write will be done only after the commit
1337 bool write_after_commit_;
1338 // Whether memtable write can be done before prepare
1339 bool write_before_prepare_;
1340 // Whether this batch was unprepared or not
1341 bool unprepared_batch_;
1342 using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
1343 DupDetector duplicate_detector_;
1344 bool dup_dectector_on_;
1345
1346 bool hint_per_batch_;
1347 bool hint_created_;
1348 // Hints for this batch
1349 using HintMap = std::unordered_map<MemTable*, void*>;
1350 using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
1351 HintMapType hint_;
1352
  // Lazily constructs the per-batch hint map inside its aligned storage on
  // first use. Only valid when hint_per_batch_ is enabled; the destructor
  // tears the map down iff hint_created_ was set.
  HintMap& GetHintMap() {
    assert(hint_per_batch_);
    if (!hint_created_) {
      new (&hint_) HintMap();
      hint_created_ = true;
    }
    return *reinterpret_cast<HintMap*>(&hint_);
  }
1361
  // Lazily constructs the per-memtable post-process-info map inside its
  // aligned storage on first use. Only valid for concurrent memtable writes;
  // the destructor tears the map down iff post_info_created_ was set.
  MemPostInfoMap& GetPostMap() {
    assert(concurrent_memtable_writes_);
    if (!post_info_created_) {
      new (&mem_post_info_map_) MemPostInfoMap();
      post_info_created_ = true;
    }
    return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
  }
1370
  // Reports whether (column_family_id, key) was already seen at the current
  // sequence number, lazily constructing the DuplicateDetector in its aligned
  // storage on first use. Only used while rebuilding a recovered transaction.
  bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
    assert(!write_after_commit_);
    assert(rebuilding_trx_ != nullptr);
    if (!dup_dectector_on_) {
      new (&duplicate_detector_) DuplicateDetector(db_);
      dup_dectector_on_ = true;
    }
    return reinterpret_cast<DuplicateDetector*>
      (&duplicate_detector_)->IsDuplicateKeySeq(column_family_id, key, sequence_);
  }
1381
NextProtectionInfo()1382 const ProtectionInfoKVOC64* NextProtectionInfo() {
1383 const ProtectionInfoKVOC64* res = nullptr;
1384 if (prot_info_ != nullptr) {
1385 assert(prot_info_idx_ < prot_info_->entries_.size());
1386 res = &prot_info_->entries_[prot_info_idx_];
1387 ++prot_info_idx_;
1388 }
1389 return res;
1390 }
1391
1392 protected:
  // True when memtable inserts may happen before prepare (batch_per_txn off).
  bool WriteBeforePrepare() const override { return write_before_prepare_; }
  // True when memtable inserts happen only after commit (seq_per_batch off).
  bool WriteAfterCommit() const override { return write_after_commit_; }
1395
1396 public:
1397 // cf_mems should not be shared with concurrent inserters
  // cf_mems should not be shared with concurrent inserters.
  // _sequence is the starting sequence number for this batch's inserts;
  // recovering_log_number is non-zero only during WAL recovery; prot_info is
  // the batch's optional per-key protection info (may be nullptr).
  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   TrimHistoryScheduler* trim_history_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   const WriteBatch::ProtectionInfo* prot_info,
                   bool* has_valid_writes = nullptr, bool seq_per_batch = false,
                   bool batch_per_txn = true, bool hint_per_batch = false)
      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        trim_history_scheduler_(trim_history_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(static_cast_with_check<DBImpl>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        prot_info_(prot_info),
        prot_info_idx_(0),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr),
        rebuilding_trx_seq_(0),
        seq_per_batch_(seq_per_batch),
        // Write after commit currently uses one seq per key (instead of per
        // batch). So seq_per_batch being false indicates write_after_commit
        // approach.
        write_after_commit_(!seq_per_batch),
        // WriteUnprepared can write WriteBatches per transaction, so
        // batch_per_txn being false indicates write_before_prepare.
        write_before_prepare_(!batch_per_txn),
        unprepared_batch_(false),
        duplicate_detector_(),
        dup_dectector_on_(false),
        hint_per_batch_(hint_per_batch),
        hint_created_(false) {
    assert(cf_mems_);
  }
1437
  // Manually destroys each lazily placement-new'ed object living in aligned
  // storage (duplicate detector, post-process map, hint map) iff its
  // corresponding "created" flag was set, then frees any in-progress
  // rebuilding transaction.
  ~MemTableInserter() override {
    if (dup_dectector_on_) {
      reinterpret_cast<DuplicateDetector*>
        (&duplicate_detector_)->~DuplicateDetector();
    }
    if (post_info_created_) {
      reinterpret_cast<MemPostInfoMap*>
        (&mem_post_info_map_)->~MemPostInfoMap();
    }
    if (hint_created_) {
      // Hint buffers were allocated as raw char arrays; release them before
      // destroying the map itself.
      for (auto iter : GetHintMap()) {
        delete[] reinterpret_cast<char*>(iter.second);
      }
      reinterpret_cast<HintMap*>(&hint_)->~HintMap();
    }
    delete rebuilding_trx_;
  }
1455
1456 MemTableInserter(const MemTableInserter&) = delete;
1457 MemTableInserter& operator=(const MemTableInserter&) = delete;
1458
1459 // The batch seq is regularly restarted; In normal mode it is set when
1460 // MemTableInserter is constructed in the write thread and in recovery mode it
1461 // is set when a batch, which is tagged with seq, is read from the WAL.
1462 // Within a sequenced batch, which could be a merge of multiple batches, we
1463 // have two policies to advance the seq: i) seq_per_key (default) and ii)
1464 // seq_per_batch. To implement the latter we need to mark the boundary between
1465 // the individual batches. The approach is this: 1) Use the terminating
1466 // markers to indicate the boundary (kTypeEndPrepareXID, kTypeCommitXID,
1467 // kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in the absence of a
1468 // natural boundary marker.
MaybeAdvanceSeq(bool batch_boundry=false)1469 void MaybeAdvanceSeq(bool batch_boundry = false) {
1470 if (batch_boundry == seq_per_batch_) {
1471 sequence_++;
1472 }
1473 }
1474
  // Sets the WAL number that inserted memtables should reference (see
  // log_number_ref_; used via RefLogContainingPrepSection in SeekToColumnFamily).
  void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
  // Installs the protection info of the next batch to be inserted and resets
  // the per-key cursor used by NextProtectionInfo().
  void set_prot_info(const WriteBatch::ProtectionInfo* prot_info) {
    prot_info_ = prot_info;
    prot_info_idx_ = 0;
  }
1480
  // Current sequence number (advanced by MaybeAdvanceSeq during insertion).
  SequenceNumber sequence() const { return sequence_; }
1482
PostProcess()1483 void PostProcess() {
1484 assert(concurrent_memtable_writes_);
1485 // If post info was not created there is nothing
1486 // to process and no need to create on demand
1487 if(post_info_created_) {
1488 for (auto& pair : GetPostMap()) {
1489 pair.first->BatchPostProcess(pair.second);
1490 }
1491 }
1492 }
1493
  // Positions cf_mems_ on `column_family_id`. Returns true when the caller
  // should proceed with the memtable insert; returns false (with *s set) when
  // the CF is missing or its updates from this WAL were already applied.
  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    // If we are in a concurrent mode, it is the caller's responsibility
    // to clone the original ColumnFamilyMemTables so that each thread
    // has its own instance. Otherwise, it must be guaranteed that there
    // is no concurrent access
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        // Dropped/missing CF is tolerated: report OK but skip the insert.
        *s = Status::OK();
      } else {
        *s = Status::InvalidArgument(
            "Invalid column family specified in write batch");
      }
      return false;
    }
    if (recovering_log_number_ != 0 &&
        recovering_log_number_ < cf_mems_->GetLogNumber()) {
      // This is true only in recovery environment (recovering_log_number_ is
      // always 0 in
      // non-recovery, regular write code-path)
      // * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means that
      // column
      // family already contains updates from this log. We can't apply updates
      // twice because of update-in-place or merge workloads -- ignore the
      // update
      *s = Status::OK();
      return false;
    }

    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }

    if (log_number_ref_ > 0) {
      // Pin the WAL holding this txn's prepare section for this memtable.
      cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
    }

    return true;
  }
1533
  // Shared implementation for put-like records (kTypeValue, kTypeBlobIndex):
  // either records the KV into the rebuilding transaction (recovery of 2PC
  // batches) or inserts it into the target memtable, honoring the memtable's
  // in-place-update options. Advances the sequence number per the
  // seq-advance policy on success or on a TryAgain batch boundary.
  Status PutCFImpl(uint32_t column_family_id, const Slice& key,
                   const Slice& value, ValueType value_type,
                   const ProtectionInfoKVOS64* kv_prot_info) {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key,
                                     value);
      // else insert the values to the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
                                             key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    // inplace_update_support is inconsistent with snapshots, and therefore with
    // any kind of transactions including the ones that use seq_per_batch
    assert(!seq_per_batch_ || !moptions->inplace_update_support);
    if (!moptions->inplace_update_support) {
      // Common path: plain memtable insert.
      ret_status =
          mem->Add(sequence_, value_type, key, value, kv_prot_info,
                   concurrent_memtable_writes_, get_post_process_info(mem),
                   hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    } else if (moptions->inplace_callback == nullptr) {
      assert(!concurrent_memtable_writes_);
      ret_status = mem->Update(sequence_, key, value, kv_prot_info);
    } else {
      assert(!concurrent_memtable_writes_);
      ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info);
      if (ret_status.IsNotFound()) {
        // key not found in memtable. Do sst get, update, add
        SnapshotImpl read_from_snapshot;
        read_from_snapshot.number_ = sequence_;
        ReadOptions ropts;
        // it's going to be overwritten for sure, so no point caching data block
        // containing the old version
        ropts.fill_cache = false;
        ropts.snapshot = &read_from_snapshot;

        std::string prev_value;
        std::string merged_value;

        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
        Status get_status = Status::NotSupported();
        if (db_ != nullptr && recovering_log_number_ == 0) {
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          get_status = db_->Get(ropts, cf_handle, key, &prev_value);
        }
        // Intentionally overwrites the `NotFound` in `ret_status`.
        if (!get_status.ok() && !get_status.IsNotFound()) {
          ret_status = get_status;
        } else {
          ret_status = Status::OK();
        }
        if (ret_status.ok()) {
          UpdateStatus update_status;
          char* prev_buffer = const_cast<char*>(prev_value.c_str());
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
          if (get_status.ok()) {
            update_status = moptions->inplace_callback(prev_buffer, &prev_size,
                                                       value, &merged_value);
          } else {
            // No existing value: let the callback produce one from scratch.
            update_status = moptions->inplace_callback(
                nullptr /* existing_value */, nullptr /* existing_value_size */,
                value, &merged_value);
          }
          if (update_status == UpdateStatus::UPDATED_INPLACE) {
            assert(get_status.ok());
            if (kv_prot_info != nullptr) {
              ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
              updated_kv_prot_info.UpdateV(value,
                                           Slice(prev_buffer, prev_size));
              // prev_value is updated in-place with final value.
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(prev_buffer, prev_size),
                                    &updated_kv_prot_info);
            } else {
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(prev_buffer, prev_size),
                                    nullptr /* kv_prot_info */);
            }
            if (ret_status.ok()) {
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          } else if (update_status == UpdateStatus::UPDATED) {
            if (kv_prot_info != nullptr) {
              ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
              updated_kv_prot_info.UpdateV(value, merged_value);
              // merged_value contains the final value.
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(merged_value), &updated_kv_prot_info);
            } else {
              // merged_value contains the final value.
              ret_status =
                  mem->Add(sequence_, value_type, key, Slice(merged_value),
                           nullptr /* kv_prot_info */);
            }
            if (ret_status.ok()) {
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          }
        }
      }
    }
    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
                                           key, value);
    }
    return ret_status;
  }
1678
PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)1679 Status PutCF(uint32_t column_family_id, const Slice& key,
1680 const Slice& value) override {
1681 const auto* kv_prot_info = NextProtectionInfo();
1682 if (kv_prot_info != nullptr) {
1683 // Memtable needs seqno, doesn't need CF ID
1684 auto mem_kv_prot_info =
1685 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
1686 return PutCFImpl(column_family_id, key, value, kTypeValue,
1687 &mem_kv_prot_info);
1688 }
1689 return PutCFImpl(column_family_id, key, value, kTypeValue,
1690 nullptr /* kv_prot_info */);
1691 }
1692
  // Shared implementation for Delete, SingleDelete and DeleteRange: adds one
  // entry of `delete_type` to the current memtable and advances the sequence
  // number on success. For range deletions `value` carries the range end key;
  // point/single deletes pass an empty Slice.
  Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
                    const Slice& value, ValueType delete_type,
                    const ProtectionInfoKVOS64* kv_prot_info) {
    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    ret_status =
        mem->Add(sequence_, delete_type, key, value, kv_prot_info,
                 concurrent_memtable_writes_, get_post_process_info(mem),
                 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    if (UNLIKELY(ret_status.IsTryAgain())) {
      // TryAgain can only happen in seq_per_batch mode; the retry counts as a
      // batch boundary for sequence advancement.
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    return ret_status;
  }
1712
DeleteCF(uint32_t column_family_id,const Slice & key)1713 Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
1714 const auto* kv_prot_info = NextProtectionInfo();
1715 // optimize for non-recovery mode
1716 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
1717 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
1718 return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
1719 // else insert the values to the memtable right away
1720 }
1721
1722 Status ret_status;
1723 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
1724 if (ret_status.ok() && rebuilding_trx_ != nullptr) {
1725 assert(!write_after_commit_);
1726 // The CF is probably flushed and hence no need for insert but we still
1727 // need to keep track of the keys for upcoming rollback/commit.
1728 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
1729 ret_status =
1730 WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
1731 if (ret_status.ok()) {
1732 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
1733 }
1734 } else if (ret_status.ok()) {
1735 MaybeAdvanceSeq(false /* batch_boundary */);
1736 }
1737 return ret_status;
1738 }
1739
1740 ColumnFamilyData* cfd = cf_mems_->current();
1741 assert(!cfd || cfd->user_comparator());
1742 const size_t ts_sz = (cfd && cfd->user_comparator())
1743 ? cfd->user_comparator()->timestamp_size()
1744 : 0;
1745 const ValueType delete_type =
1746 (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
1747 if (kv_prot_info != nullptr) {
1748 auto mem_kv_prot_info =
1749 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
1750 mem_kv_prot_info.UpdateO(kTypeDeletion, delete_type);
1751 ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
1752 &mem_kv_prot_info);
1753 } else {
1754 ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
1755 nullptr /* kv_prot_info */);
1756 }
1757 // optimize for non-recovery mode
1758 // If `ret_status` is `TryAgain` then the next (successful) try will add
1759 // the key to the rebuilding transaction object. If `ret_status` is
1760 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
1761 // away. So we only need to add to it when `ret_status.ok()`.
1762 if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
1763 assert(!write_after_commit_);
1764 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
1765 ret_status =
1766 WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
1767 }
1768 return ret_status;
1769 }
1770
  // Handler for kTypeSingleDeletion / kTypeColumnFamilySingleDeletion
  // records: inserts a SingleDelete tombstone into the memtable and/or the
  // rebuilding transaction, depending on recovery mode.
  Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
                                              key);
      // else insert the values to the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                                      column_family_id, key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    if (kv_prot_info != nullptr) {
      // Memtable needs seqno protection, not CF ID protection.
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = DeleteImpl(column_family_id, key, Slice(),
                              kTypeSingleDeletion, &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, key, Slice(),
                              kTypeSingleDeletion, nullptr /* kv_prot_info */);
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                                    column_family_id, key);
    }
    return ret_status;
  }
1822
DeleteRangeCF(uint32_t column_family_id,const Slice & begin_key,const Slice & end_key)1823 Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
1824 const Slice& end_key) override {
1825 const auto* kv_prot_info = NextProtectionInfo();
1826 // optimize for non-recovery mode
1827 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
1828 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
1829 return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
1830 begin_key, end_key);
1831 // else insert the values to the memtable right away
1832 }
1833
1834 Status ret_status;
1835 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
1836 if (ret_status.ok() && rebuilding_trx_ != nullptr) {
1837 assert(!write_after_commit_);
1838 // The CF is probably flushed and hence no need for insert but we still
1839 // need to keep track of the keys for upcoming rollback/commit.
1840 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
1841 ret_status = WriteBatchInternal::DeleteRange(
1842 rebuilding_trx_, column_family_id, begin_key, end_key);
1843 if (ret_status.ok()) {
1844 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, begin_key));
1845 }
1846 } else if (ret_status.ok()) {
1847 MaybeAdvanceSeq(false /* batch_boundary */);
1848 }
1849 return ret_status;
1850 }
1851 assert(ret_status.ok());
1852
1853 if (db_ != nullptr) {
1854 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
1855 if (cf_handle == nullptr) {
1856 cf_handle = db_->DefaultColumnFamily();
1857 }
1858 auto* cfd =
1859 static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd();
1860 if (!cfd->is_delete_range_supported()) {
1861 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
1862 ret_status.PermitUncheckedError();
1863 return Status::NotSupported(
1864 std::string("DeleteRange not supported for table type ") +
1865 cfd->ioptions()->table_factory->Name() + " in CF " +
1866 cfd->GetName());
1867 }
1868 int cmp = cfd->user_comparator()->Compare(begin_key, end_key);
1869 if (cmp > 0) {
1870 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
1871 ret_status.PermitUncheckedError();
1872 // It's an empty range where endpoints appear mistaken. Don't bother
1873 // applying it to the DB, and return an error to the user.
1874 return Status::InvalidArgument("end key comes before start key");
1875 } else if (cmp == 0) {
1876 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
1877 ret_status.PermitUncheckedError();
1878 // It's an empty range. Don't bother applying it to the DB.
1879 return Status::OK();
1880 }
1881 }
1882
1883 if (kv_prot_info != nullptr) {
1884 auto mem_kv_prot_info =
1885 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
1886 ret_status = DeleteImpl(column_family_id, begin_key, end_key,
1887 kTypeRangeDeletion, &mem_kv_prot_info);
1888 } else {
1889 ret_status = DeleteImpl(column_family_id, begin_key, end_key,
1890 kTypeRangeDeletion, nullptr /* kv_prot_info */);
1891 }
1892 // optimize for non-recovery mode
1893 // If `ret_status` is `TryAgain` then the next (successful) try will add
1894 // the key to the rebuilding transaction object. If `ret_status` is
1895 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
1896 // away. So we only need to add to it when `ret_status.ok()`.
1897 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1898 assert(!write_after_commit_);
1899 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
1900 ret_status = WriteBatchInternal::DeleteRange(
1901 rebuilding_trx_, column_family_id, begin_key, end_key);
1902 }
1903 return ret_status;
1904 }
1905
  // Handler for kTypeMerge / kTypeColumnFamilyMerge records. Normally adds a
  // merge operand to the memtable. When max_successive_merges is reached (and
  // we are not in recovery), it instead reads the current value, applies the
  // merge operator, and writes the merged result back as a plain kTypeValue.
  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
                                       value);
      // else insert the values to the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
                                               column_family_id, key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    if (moptions->merge_operator == nullptr) {
      return Status::InvalidArgument(
          "Merge requires `ColumnFamilyOptions::merge_operator != nullptr`");
    }
    bool perform_merge = false;
    // The eager-merge path below is incompatible with concurrent memtable
    // writes (it issues a Get and a non-concurrent Add).
    assert(!concurrent_memtable_writes_ ||
           moptions->max_successive_merges == 0);

    // If we pass DB through and options.max_successive_merges is hit
    // during recovery, Get() will be issued which will try to acquire
    // DB mutex and cause deadlock, as DB mutex is already held.
    // So we disable merge in recovery
    if (moptions->max_successive_merges > 0 && db_ != nullptr &&
        recovering_log_number_ == 0) {
      assert(!concurrent_memtable_writes_);
      LookupKey lkey(key, sequence_);

      // Count the number of successive merges at the head
      // of the key in the memtable
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }

    if (perform_merge) {
      // 1) Get the existing value
      std::string get_value;

      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;

      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      Status get_status = db_->Get(read_options, cf_handle, key, &get_value);
      if (!get_status.ok()) {
        // Failed to read a key we know exists. Store the delta in memtable.
        perform_merge = false;
      } else {
        Slice get_value_slice = Slice(get_value);

        // 2) Apply this merge
        auto merge_operator = moptions->merge_operator;
        assert(merge_operator);

        std::string new_value;
        Status merge_status = MergeHelper::TimedFullMerge(
            merge_operator, key, &get_value_slice, {value}, &new_value,
            moptions->info_log, moptions->statistics,
            SystemClock::Default().get());

        if (!merge_status.ok()) {
          // Failed to merge!
          // Store the delta in memtable
          perform_merge = false;
        } else {
          // 3) Add value to memtable
          assert(!concurrent_memtable_writes_);
          if (kv_prot_info != nullptr) {
            // The stored value differs from the serialized operand and the op
            // type changes from kTypeMerge to kTypeValue, so update the
            // protection info accordingly before inserting.
            auto merged_kv_prot_info =
                kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
            merged_kv_prot_info.UpdateV(value, new_value);
            merged_kv_prot_info.UpdateO(kTypeMerge, kTypeValue);
            ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
                                  &merged_kv_prot_info);
          } else {
            ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
                                  nullptr /* kv_prot_info */);
          }
        }
      }
    }

    if (!perform_merge) {
      assert(ret_status.ok());
      // Add merge operand to memtable
      if (kv_prot_info != nullptr) {
        auto mem_kv_prot_info =
            kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
        ret_status =
            mem->Add(sequence_, kTypeMerge, key, value, &mem_kv_prot_info,
                     concurrent_memtable_writes_, get_post_process_info(mem));
      } else {
        ret_status = mem->Add(
            sequence_, kTypeMerge, key, value, nullptr /* kv_prot_info */,
            concurrent_memtable_writes_, get_post_process_info(mem));
      }
    }

    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
                                             key, value);
    }
    return ret_status;
  }
2055
PutBlobIndexCF(uint32_t column_family_id,const Slice & key,const Slice & value)2056 Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
2057 const Slice& value) override {
2058 const auto* kv_prot_info = NextProtectionInfo();
2059 if (kv_prot_info != nullptr) {
2060 // Memtable needs seqno, doesn't need CF ID
2061 auto mem_kv_prot_info =
2062 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
2063 // Same as PutCF except for value type.
2064 return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
2065 &mem_kv_prot_info);
2066 } else {
2067 return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
2068 nullptr /* kv_prot_info */);
2069 }
2070 }
2071
  // Invoked after successful memtable inserts. Schedules a flush when the
  // current memtable reports it should be flushed, and schedules history
  // trimming when the memtables' footprint reaches
  // max_write_buffer_size_to_maintain.
  void CheckMemtableFull() {
    if (flush_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd != nullptr);
      if (cfd->mem()->ShouldScheduleFlush() &&
          cfd->mem()->MarkFlushScheduled()) {
        // MarkFlushScheduled only returns true if we are the one that
        // should take action, so no need to dedup further
        flush_scheduler_->ScheduleWork(cfd);
      }
    }
    // check if memtable_list size exceeds max_write_buffer_size_to_maintain
    if (trim_history_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();

      assert(cfd);
      assert(cfd->ioptions());

      const size_t size_to_maintain = static_cast<size_t>(
          cfd->ioptions()->max_write_buffer_size_to_maintain);

      if (size_to_maintain > 0) {
        MemTableList* const imm = cfd->imm();
        assert(imm);

        if (imm->HasHistory()) {
          const MemTable* const mem = cfd->mem();
          assert(mem);

          // Trim when the active memtable plus the immutable list (excluding
          // its last entry, per ApproximateMemoryUsageExcludingLast) hit the
          // limit and we are the first to flag it (MarkTrimHistoryNeeded
          // dedups like MarkFlushScheduled above).
          if (mem->ApproximateMemoryUsageFast() +
                  imm->ApproximateMemoryUsageExcludingLast() >=
                  size_to_maintain &&
              imm->MarkTrimHistoryNeeded()) {
            trim_history_scheduler_->ScheduleWork(cfd);
          }
        }
      }
    }
  }
2111
2112 // The write batch handler calls MarkBeginPrepare with unprepare set to true
2113 // if it encounters the kTypeBeginUnprepareXID marker.
  Status MarkBeginPrepare(bool unprepare) override {
    assert(rebuilding_trx_ == nullptr);
    assert(db_);

    if (recovering_log_number_ != 0) {
      // during recovery we rebuild a hollow transaction
      // from all encountered prepare sections of the wal
      if (db_->allow_2pc() == false) {
        return Status::NotSupported(
            "WAL contains prepared transactions. Open with "
            "TransactionDB::Open().");
      }

      // we are now iterating through a prepared section
      rebuilding_trx_ = new WriteBatch();
      rebuilding_trx_seq_ = sequence_;
      // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
      // unprepared_batch_ should be false because it is false by default, and
      // gets reset to false in MarkEndPrepare.
      assert(!unprepared_batch_);
      unprepared_batch_ = unprepare;

      if (has_valid_writes_ != nullptr) {
        *has_valid_writes_ = true;
      }
    }
    // Outside of recovery this marker requires no action here.

    return Status::OK();
  }
2143
MarkEndPrepare(const Slice & name)2144 Status MarkEndPrepare(const Slice& name) override {
2145 assert(db_);
2146 assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));
2147
2148 if (recovering_log_number_ != 0) {
2149 assert(db_->allow_2pc());
2150 size_t batch_cnt =
2151 write_after_commit_
2152 ? 0 // 0 will disable further checks
2153 : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
2154 db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
2155 rebuilding_trx_, rebuilding_trx_seq_,
2156 batch_cnt, unprepared_batch_);
2157 unprepared_batch_ = false;
2158 rebuilding_trx_ = nullptr;
2159 } else {
2160 assert(rebuilding_trx_ == nullptr);
2161 }
2162 const bool batch_boundry = true;
2163 MaybeAdvanceSeq(batch_boundry);
2164
2165 return Status::OK();
2166 }
2167
MarkNoop(bool empty_batch)2168 Status MarkNoop(bool empty_batch) override {
2169 // A hack in pessimistic transaction could result into a noop at the start
2170 // of the write batch, that should be ignored.
2171 if (!empty_batch) {
2172 // In the absence of Prepare markers, a kTypeNoop tag indicates the end of
2173 // a batch. This happens when write batch commits skipping the prepare
2174 // phase.
2175 const bool batch_boundry = true;
2176 MaybeAdvanceSeq(batch_boundry);
2177 }
2178 return Status::OK();
2179 }
2180
  // Handler for kTypeCommitXID. During recovery, replays the matching rebuilt
  // transaction's writes (write-after-commit mode only) and removes it from
  // the recovered-transaction set; outside recovery it only marks a batch
  // boundary.
  Status MarkCommit(const Slice& name) override {
    assert(db_);

    Status s;

    if (recovering_log_number_ != 0) {
      // in recovery when we encounter a commit marker
      // we lookup this transaction in our set of rebuilt transactions
      // and commit.
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // the log containing the prepared section may have
      // been released in the last incarnation because the
      // data was flushed to L0
      if (trx != nullptr) {
        // at this point individual CF lognumbers will prevent
        // duplicate re-insertion of values.
        assert(log_number_ref_ == 0);
        if (write_after_commit_) {
          // write_after_commit_ can only have one batch in trx.
          assert(trx->batches_.size() == 1);
          const auto& batch_info = trx->batches_.begin()->second;
          // all inserts must reference this trx log number
          log_number_ref_ = batch_info.log_number_;
          // Re-run this inserter over the prepared batch to apply its writes.
          s = batch_info.batch_->Iterate(this);
          log_number_ref_ = 0;
        }
        // else the values are already inserted before the commit

        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_ != nullptr) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // When writes are not delayed until commit, there is no disconnect
      // between a memtable write and the WAL that supports it. So the commit
      // need not reference any log as the only log to which it depends.
      assert(!write_after_commit_ || log_number_ref_ > 0);
    }
    // A commit marker terminates a sub-batch for sequence accounting.
    const bool batch_boundry = true;
    MaybeAdvanceSeq(batch_boundry);

    return s;
  }
2228
MarkRollback(const Slice & name)2229 Status MarkRollback(const Slice& name) override {
2230 assert(db_);
2231
2232 if (recovering_log_number_ != 0) {
2233 auto trx = db_->GetRecoveredTransaction(name.ToString());
2234
2235 // the log containing the transactions prep section
2236 // may have been released in the previous incarnation
2237 // because we knew it had been rolled back
2238 if (trx != nullptr) {
2239 db_->DeleteRecoveredTransaction(name.ToString());
2240 }
2241 } else {
2242 // in non recovery we simply ignore this tag
2243 }
2244
2245 const bool batch_boundry = true;
2246 MaybeAdvanceSeq(batch_boundry);
2247
2248 return Status::OK();
2249 }
2250
2251 private:
get_post_process_info(MemTable * mem)2252 MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
2253 if (!concurrent_memtable_writes_) {
2254 // No need to batch counters locally if we don't use concurrent mode.
2255 return nullptr;
2256 }
2257 return &GetPostMap()[mem];
2258 }
2259 };
2260
2261 // This function can only be called in these conditions:
2262 // 1) During Recovery()
2263 // 2) During Write(), in a single-threaded write thread
2264 // 3) During Write(), in a concurrent context where memtables has been cloned
2265 // The reason is that it calls memtables->Seek(), which has a stateful cache
Status WriteBatchInternal::InsertInto(
    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
  // A single inserter is shared across the whole write group so the sequence
  // number advances monotonically from one writer's batch to the next.
  MemTableInserter inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, recovery_log_number, db,
      concurrent_memtable_writes, nullptr /* prot_info */,
      nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
  for (auto w : write_group) {
    if (w->CallbackFailed()) {
      // Writers whose pre-write callback failed are skipped entirely.
      continue;
    }
    w->sequence = inserter.sequence();
    if (!w->ShouldWriteToMemtable()) {
      // In seq_per_batch_ mode this advances the seq by one.
      inserter.MaybeAdvanceSeq(true);
      continue;
    }
    SetSequence(w->batch, inserter.sequence());
    inserter.set_log_number_ref(w->log_ref);
    inserter.set_prot_info(w->batch->prot_info_.get());
    w->status = w->batch->Iterate(&inserter);
    if (!w->status.ok()) {
      // Stop at the first failed batch; its status is already recorded on the
      // writer.
      return w->status;
    }
    assert(!seq_per_batch || w->batch_cnt != 0);
    assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
  }
  return Status::OK();
}
2299
// Inserts a single writer's batch into the memtables. Used when each writer
// in a (parallel) write group applies its own batch.
Status WriteBatchInternal::InsertInto(
    WriteThread::Writer* writer, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
    bool batch_per_txn, bool hint_per_batch) {
#ifdef NDEBUG
  (void)batch_cnt;
#endif
  assert(writer->ShouldWriteToMemtable());
  MemTableInserter inserter(sequence, memtables, flush_scheduler,
                            trim_history_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, nullptr /* prot_info */,
                            nullptr /*has_valid_writes*/, seq_per_batch,
                            batch_per_txn, hint_per_batch);
  SetSequence(writer->batch, sequence);
  inserter.set_log_number_ref(writer->log_ref);
  inserter.set_prot_info(writer->batch->prot_info_.get());
  Status s = writer->batch->Iterate(&inserter);
  assert(!seq_per_batch || batch_cnt != 0);
  assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
  if (concurrent_memtable_writes) {
    // Flush the locally batched memtable counters back to the memtables.
    inserter.PostProcess();
  }
  return s;
}
2328
// Inserts a standalone batch (e.g. during WAL recovery). The batch's own
// starting sequence number is used; on return `*next_seq` (if non-null) holds
// the first unused sequence number.
Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, SequenceNumber* next_seq,
    bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
                            trim_history_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, batch->prot_info_.get(),
                            has_valid_writes, seq_per_batch, batch_per_txn);
  Status s = batch->Iterate(&inserter);
  if (next_seq != nullptr) {
    *next_seq = inserter.sequence();
  }
  if (concurrent_memtable_writes) {
    // Flush the locally batched memtable counters back to the memtables.
    inserter.PostProcess();
  }
  return s;
}
2350
// Replaces `b`'s serialized representation wholesale with `contents` (which
// must include a full header). Content flags are marked DEFERRED so they are
// recomputed lazily on first use.
Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= WriteBatchInternal::kHeader);
  // Protection info (if any) would no longer match the new contents.
  assert(b->prot_info_ == nullptr);
  b->rep_.assign(contents.data(), contents.size());
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
  return Status::OK();
}
2358
// Appends the records of `src` to `dst`. With `wal_only` set and a WAL
// termination point present on `src`, only the prefix of `src` up to that
// point is appended.
Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
                                  const bool wal_only) {
  // If `dst` is non-empty, both batches must agree on whether per-entry
  // protection info is tracked; otherwise the entry lists would diverge from
  // the record counts.
  assert(dst->Count() == 0 ||
         (dst->prot_info_ == nullptr) == (src->prot_info_ == nullptr));
  size_t src_len;
  int src_count;
  uint32_t src_flags;

  const SavePoint& batch_end = src->GetWalTerminationPoint();

  if (wal_only && !batch_end.is_cleared()) {
    // Append only up to the WAL termination point of `src`.
    src_len = batch_end.size - WriteBatchInternal::kHeader;
    src_count = batch_end.count;
    src_flags = batch_end.content_flags;
  } else {
    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
    src_count = Count(src);
    src_flags = src->content_flags_.load(std::memory_order_relaxed);
  }

  if (dst->prot_info_ != nullptr) {
    // Copy only the first `src_count` entries, matching the records appended.
    std::copy(src->prot_info_->entries_.begin(),
              src->prot_info_->entries_.begin() + src_count,
              std::back_inserter(dst->prot_info_->entries_));
  } else if (src->prot_info_ != nullptr) {
    // Per the assert above, `dst` must be empty here, so it can adopt a copy
    // of `src`'s protection info outright.
    dst->prot_info_.reset(new WriteBatch::ProtectionInfo(*src->prot_info_));
  }
  SetCount(dst, Count(dst) + src_count);
  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  // Skip `src`'s header bytes: the combined batch keeps a single header.
  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
  dst->content_flags_.store(
      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
      std::memory_order_relaxed);
  return Status::OK();
}
2394
AppendedByteSize(size_t leftByteSize,size_t rightByteSize)2395 size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
2396 size_t rightByteSize) {
2397 if (leftByteSize == 0 || rightByteSize == 0) {
2398 return leftByteSize + rightByteSize;
2399 } else {
2400 return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
2401 }
2402 }
2403
2404 } // namespace ROCKSDB_NAMESPACE
2405