//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeDeletion varstring
//    kTypeSingleDeletion varstring
//    kTypeRangeDeletion varstring varstring
//    kTypeMerge varstring varstring
//    kTypeColumnFamilyValue varint32 varstring varstring
//    kTypeColumnFamilyDeletion varint32 varstring
//    kTypeColumnFamilySingleDeletion varint32 varstring
//    kTypeColumnFamilyRangeDeletion varint32 varstring varstring
//    kTypeColumnFamilyMerge varint32 varstring varstring
//    kTypeBeginPrepareXID
//    kTypeEndPrepareXID varstring
//    kTypeCommitXID varstring
//    kTypeRollbackXID varstring
//    kTypeBeginPersistedPrepareXID
//    kTypeBeginUnprepareXID
//    kTypeNoop
// varstring :=
//    len: varint32
//    data: uint8[len]
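//
// Example (illustrative): with the default column family, a Put of key "k"
// and value "v1" is encoded as the single record
//    0x01 0x01 'k' 0x02 'v' '1'
// i.e. the kTypeValue tag, then each varstring as a varint32 length followed
// by that many bytes.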

#include "rocksdb/write_batch.h"

#include <map>
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/kv_checksum.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/trim_history_scheduler.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/lang.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/system_clock.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

// anon namespace for file-local types
namespace {

enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,
  HAS_PUT = 1 << 1,
  HAS_DELETE = 1 << 2,
  HAS_SINGLE_DELETE = 1 << 3,
  HAS_MERGE = 1 << 4,
  HAS_BEGIN_PREPARE = 1 << 5,
  HAS_END_PREPARE = 1 << 6,
  HAS_COMMIT = 1 << 7,
  HAS_ROLLBACK = 1 << 8,
  HAS_DELETE_RANGE = 1 << 9,
  HAS_BLOB_INDEX = 1 << 10,
  HAS_BEGIN_UNPREPARE = 1 << 11,
};
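
// These flags are OR'd into WriteBatch::content_flags_ as records are
// appended. DEFERRED means the flags are not known yet and must be derived
// by scanning the batch with the classifier below (see
// ComputeContentFlags()).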

struct BatchContentClassifier : public WriteBatch::Handler {
  uint32_t content_flags = 0;

  Status PutCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_PUT;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE_RANGE;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_MERGE;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_BLOB_INDEX;
    return Status::OK();
  }

  Status MarkBeginPrepare(bool unprepare) override {
    content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
    if (unprepare) {
      content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
    }
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    content_flags |= ContentFlags::HAS_END_PREPARE;
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    content_flags |= ContentFlags::HAS_ROLLBACK;
    return Status::OK();
  }
};

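// TimestampAssigner walks a batch and overwrites the trailing timestamp
// bytes of each key in place with the given timestamp(s). It relies on each
// key having been encoded with padding reserved for the timestamp (see the
// Put/Delete paths below that append timestamp padding).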
class TimestampAssigner : public WriteBatch::Handler {
 public:
  explicit TimestampAssigner(const Slice& ts,
                             WriteBatch::ProtectionInfo* prot_info)
      : timestamp_(ts),
        timestamps_(kEmptyTimestampList),
        prot_info_(prot_info) {}
  explicit TimestampAssigner(const std::vector<Slice>& ts_list,
                             WriteBatch::ProtectionInfo* prot_info)
      : timestamps_(ts_list), prot_info_(prot_info) {
    SanityCheck();
  }
  ~TimestampAssigner() override {}

  Status PutCF(uint32_t, const Slice& key, const Slice&) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice& key) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice& key) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice& begin_key,
                       const Slice& /* end_key */) override {
    AssignTimestamp(begin_key);
    ++idx_;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    // TODO (yanqin): support blob db in the future.
    return Status::OK();
  }

  Status MarkBeginPrepare(bool) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

 private:
  void SanityCheck() const {
    assert(!timestamps_.empty());
#ifndef NDEBUG
    const size_t ts_sz = timestamps_[0].size();
    for (size_t i = 1; i != timestamps_.size(); ++i) {
      assert(ts_sz == timestamps_[i].size());
    }
#endif  // !NDEBUG
  }

  void AssignTimestamp(const Slice& key) {
    assert(timestamps_.empty() || idx_ < timestamps_.size());
    const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
    size_t ts_sz = ts.size();
    // The key was encoded with ts_sz bytes of padding at the end; overwrite
    // that padding with the actual timestamp.
    char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
    if (prot_info_ != nullptr) {
      Slice old_ts(ptr, ts_sz), new_ts(ts.data(), ts_sz);
      prot_info_->entries_[idx_].UpdateT(old_ts, new_ts);
    }
    memcpy(ptr, ts.data(), ts_sz);
  }

  static const std::vector<Slice> kEmptyTimestampList;
  const Slice timestamp_;
  const std::vector<Slice>& timestamps_;
  WriteBatch::ProtectionInfo* const prot_info_;
  size_t idx_ = 0;

  // No copy or move.
  TimestampAssigner(const TimestampAssigner&) = delete;
  TimestampAssigner(TimestampAssigner&&) = delete;
  TimestampAssigner& operator=(const TimestampAssigner&) = delete;
  TimestampAssigner& operator=(TimestampAssigner&&) = delete;
};
const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;

}  // anon namespace

struct SavePoints {
  std::stack<SavePoint, autovector<SavePoint>> stack;
};

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) {
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz,
                       size_t protection_bytes_per_key)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
  // Currently `protection_bytes_per_key` can only be enabled at 8 bytes per
  // entry.
  assert(protection_bytes_per_key == 0 || protection_bytes_per_key == 8);
  if (protection_bytes_per_key != 0) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
  }
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}
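
// Usage sketch (illustrative, matching the constructor above): a batch with
// 8-byte per-key protection info enabled:
//   WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */,
//                 0 /* ts_sz */, 8 /* protection_bytes_per_key */);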

WriteBatch::WriteBatch(const std::string& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(rep),
      timestamp_size_(0) {}

WriteBatch::WriteBatch(std::string&& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(std::move(rep)),
      timestamp_size_(0) {}

WriteBatch::WriteBatch(const WriteBatch& src)
    : wal_term_point_(src.wal_term_point_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(src.rep_),
      timestamp_size_(src.timestamp_size_) {
  if (src.save_points_ != nullptr) {
    save_points_.reset(new SavePoints());
    save_points_->stack = src.save_points_->stack;
  }
  if (src.prot_info_ != nullptr) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
    prot_info_->entries_ = src.prot_info_->entries_;
  }
}

WriteBatch::WriteBatch(WriteBatch&& src) noexcept
    : save_points_(std::move(src.save_points_)),
      wal_term_point_(std::move(src.wal_term_point_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      prot_info_(std::move(src.prot_info_)),
      rep_(std::move(src.rep_)),
      timestamp_size_(src.timestamp_size_) {}

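// Copy- and move-assignment are implemented by destroying *this and
// re-constructing it in place from src; the self-assignment check makes the
// destroy-then-construct sequence safe.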
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}

WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}

WriteBatch::~WriteBatch() {}

WriteBatch::Handler::~Handler() {}

void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

bool WriteBatch::Handler::Continue() {
  return true;
}

void WriteBatch::Clear() {
  rep_.clear();
  rep_.resize(WriteBatchInternal::kHeader);

  content_flags_.store(0, std::memory_order_relaxed);

  if (save_points_ != nullptr) {
    while (!save_points_->stack.empty()) {
      save_points_->stack.pop();
    }
  }

  if (prot_info_ != nullptr) {
    prot_info_->entries_.clear();
  }
  wal_term_point_.clear();
}

uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }

uint32_t WriteBatch::ComputeContentFlags() const {
  auto rv = content_flags_.load(std::memory_order_relaxed);
  if ((rv & ContentFlags::DEFERRED) != 0) {
    BatchContentClassifier classifier;
    // Should we handle status here?
    Iterate(&classifier).PermitUncheckedError();
    rv = classifier.content_flags;

    // This method is conceptually const, because it is performing a lazy
    // computation that doesn't affect the abstract state of the batch.
    // content_flags_ is marked mutable so that we can perform the
    // following assignment.
    content_flags_.store(rv, std::memory_order_relaxed);
  }
  return rv;
}

void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_;
}

size_t WriteBatch::GetProtectionBytesPerKey() const {
  if (prot_info_ != nullptr) {
    return prot_info_->GetBytesPerKey();
  }
  return 0;
}

bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasDeleteRange() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}

bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
  assert(input != nullptr && key != nullptr);
  // Skip tag byte
  input->remove_prefix(1);

  if (cf_record) {
    // Skip column_family bytes
    uint32_t cf;
    if (!GetVarint32(input, &cf)) {
      return false;
    }
  }

  // Extract key
  return GetLengthPrefixedSlice(input, key);
}

bool WriteBatch::HasBeginPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}

bool WriteBatch::HasEndPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}

bool WriteBatch::HasCommit() const {
  return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}

bool WriteBatch::HasRollback() const {
  return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}

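// Parses one record from the front of *input, advancing it past the record.
// The tag and whichever fields are present for that tag are returned through
// the out parameters; *column_family is 0 when the record carries no column
// family id.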
Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid) {
  assert(key != nullptr && value != nullptr);
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;  // default
  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      FALLTHROUGH_INTENDED;
    case kTypeValue:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      FALLTHROUGH_INTENDED;
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;
    case kTypeColumnFamilyRangeDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      FALLTHROUGH_INTENDED;
    case kTypeRangeDeletion:
      // for range delete, "key" is begin_key, "value" is end_key
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      break;
    case kTypeColumnFamilyMerge:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      FALLTHROUGH_INTENDED;
    case kTypeMerge:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;
    case kTypeColumnFamilyBlobIndex:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      FALLTHROUGH_INTENDED;
    case kTypeBlobIndex:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      break;
    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
      // This indicates that the prepared batch is also persisted in the db.
      // This is used in WritePreparedTxn
    case kTypeBeginPersistedPrepareXID:
      // This is used in WriteUnpreparedTxn
    case kTypeBeginUnprepareXID:
      break;
    case kTypeEndPrepareXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;
    case kTypeCommitXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Commit XID");
      }
      break;
    case kTypeRollbackXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Rollback XID");
      }
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}

Status WriteBatch::Iterate(Handler* handler) const {
  if (rep_.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
                                     rep_.size());
}

Status WriteBatchInternal::Iterate(const WriteBatch* wb,
                                   WriteBatch::Handler* handler, size_t begin,
                                   size_t end) {
  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
    return Status::Corruption("Invalid start/end bounds for Iterate");
  }
  assert(begin <= end);
  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
  bool whole_batch =
      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());

  Slice key, value, blob, xid;
  // Sometimes a sub-batch starts with a Noop. We should not treat such a Noop
  // as a batch boundary marker, otherwise we would mis-count the number of
  // sub-batches. We detect this case by checking whether the accumulated
  // batch is still empty when the next Noop is seen.
  bool empty_batch = true;
  uint32_t found = 0;
  Status s;
  char tag = 0;
  uint32_t column_family = 0;  // default
  bool last_was_try_again = false;
  bool handler_continue = true;
  while ((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain())) {
    handler_continue = handler->Continue();
    if (!handler_continue) {
      break;
    }

    if (LIKELY(!s.IsTryAgain())) {
      last_was_try_again = false;
      tag = 0;
      column_family = 0;  // default

      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid);
      if (!s.ok()) {
        return s;
      }
    } else {
      assert(s.IsTryAgain());
      assert(!last_was_try_again);  // to detect infinite loop bugs
      if (UNLIKELY(last_was_try_again)) {
        return Status::Corruption(
            "two consecutive TryAgain in WriteBatch handler; this is either a "
            "software bug or data corruption.");
      }
      last_was_try_again = true;
      s = Status::OK();
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        s = handler->DeleteRangeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
        s = handler->PutBlobIndexCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          found++;
        }
        break;
      case kTypeLogData:
        handler->LogData(blob);
        // A batch might have nothing but LogData. It is still a batch.
        empty_batch = false;
        break;
      case kTypeBeginPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        if (!handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_after_commit_ is disabled (in "
              "WritePrepared/WriteUnprepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_before_prepare_ is enabled "
              "(in WriteUnprepared mode). If it is not due to corruption, the "
              "WAL must be emptied before changing the WritePolicy.");
        }
        break;
      case kTypeBeginPersistedPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
              "is enabled (in default WriteCommitted mode). If it is not due "
              "to corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeBeginUnprepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
        s = handler->MarkBeginPrepare(true /* unprepared */);
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
              "default WriteCommitted mode). If it is not due to corruption, "
              "the WAL must be emptied before changing the WritePolicy.");
        }
        if (!handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
              "(in WriteCommitted/WritePrepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeEndPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        s = handler->MarkEndPrepare(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeCommitXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        s = handler->MarkCommit(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeRollbackXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        s = handler->MarkRollback(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeNoop:
        s = handler->MarkNoop(empty_batch);
        assert(s.ok());
        empty_batch = true;
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (handler_continue && whole_batch &&
      found != WriteBatchInternal::Count(wb)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  return b->is_latest_persistent_state_;
}

void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
  b->is_latest_persistent_state_ = true;
}

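// The fixed batch header occupies WriteBatchInternal::kHeader == 12 bytes: an
// 8-byte little-endian sequence number followed by a 4-byte little-endian
// entry count (see the rep_ grammar at the top of this file).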
uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  EncodeFixed32(&b->rep_[8], n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  return WriteBatchInternal::kHeader;
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
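  // When user timestamps are enabled, encode the key with timestamp_size_
  // zero bytes of padding appended; AssignTimestamp()/AssignTimestamps()
  // later overwrite that padding in place with the real timestamp.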
  std::string timestamp(b->timestamp_size_, '\0');
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSlice(&b->rep_, key);
  } else {
    PutVarint32(&b->rep_,
                static_cast<uint32_t>(key.size() + b->timestamp_size_));
    b->rep_.append(key.data(), key.size());
    b->rep_.append(timestamp);
  }
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // Technically the optype could've been `kTypeColumnFamilyValue` with the
    // CF ID encoded in the `WriteBatch`. That distinction is unimportant
    // however since we verify CF ID is correct, as well as all other fields
    // (a missing/extra encoded CF ID would corrupt another field). It is
    // convenient to consolidate on `kTypeValue` here as that is what will be
    // inserted into memtable.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeValue, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}

Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
                                                 const SliceParts& value) {
  size_t total_key_bytes = 0;
  for (int i = 0; i < key.num_parts; ++i) {
    total_key_bytes += key.parts[i].size();
  }
  if (total_key_bytes >= size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }

  size_t total_value_bytes = 0;
  for (int i = 0; i < value.num_parts; ++i) {
    total_value_bytes += value.parts[i].size();
  }
  if (total_value_bytes >= size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }
  return Status::OK();
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key, const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  std::string timestamp(b->timestamp_size_, '\0');
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSliceParts(&b->rep_, key);
  } else {
    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
  }
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeValue, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                       const SliceParts& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}

Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  b->rep_.push_back(static_cast<char>(kTypeNoop));
  return Status::OK();
}

Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
                                          bool write_after_commit,
                                          bool unprepared_batch) {
  // a manually constructed batch can only contain one prepare section
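  // (rep_[12] is the first record tag, immediately after the 12-byte header;
  // a kTypeNoop placeholder is typically placed there via InsertNoop().)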
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    while (!b->save_points_->stack.empty()) {
      b->save_points_->stack.pop();
    }
  }

  // rewrite noop as begin marker
  b->rep_[12] = static_cast<char>(
      write_after_commit ? kTypeBeginPrepareXID
                         : (unprepared_batch ? kTypeBeginUnprepareXID
                                             : kTypeBeginPersistedPrepareXID));
  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  if (unprepared_batch) {
    b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                                ContentFlags::HAS_BEGIN_UNPREPARE,
                            std::memory_order_relaxed);
  }
  return Status::OK();
}

Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeCommitXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_ROLLBACK,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  std::string timestamp(b->timestamp_size_, '\0');
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSlice(&b->rep_, key);
  } else {
    PutVarint32(&b->rep_,
                static_cast<uint32_t>(key.size() + b->timestamp_size_));
    b->rep_.append(key.data(), key.size());
    b->rep_.append(timestamp);
  }
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, "" /* value */, kTypeDeletion, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  std::string timestamp(b->timestamp_size_, '\0');
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSliceParts(&b->rep_, key);
  } else {
    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
  }
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key,
                         SliceParts(nullptr /* _parts */, 0 /* _num_parts */),
                         kTypeDeletion, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
                          const SliceParts& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVOT(key, "" /* value */,
                                                          kTypeSingleDeletion,
                                                          "" /* timestamp */)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key,
                         SliceParts(nullptr /* _parts */,
                                    0 /* _num_parts */) /* value */,
                         kTypeSingleDeletion, "" /* timestamp */)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const SliceParts& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const Slice& begin_key,
                                       const Slice& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, begin_key);
  PutLengthPrefixedSlice(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVOT(begin_key, end_key,
                                                          kTypeRangeDeletion,
                                                          "" /* timestamp */)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVOT(begin_key, end_key,
                                                          kTypeRangeDeletion,
                                                          "" /* timestamp */)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const SliceParts& begin_key,
                               const SliceParts& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeMerge, "" /* timestamp */)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeMerge, "" /* timestamp */)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
                         const SliceParts& key, const SliceParts& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}

Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key, const Slice& value) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BLOB_INDEX,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeBlobIndex, "" /* timestamp */)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::PutLogData(const Slice& blob) {
  LocalSavePoint save(this);
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
  return save.commit();
}

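// Usage sketch (illustrative): save points nest and roll back in LIFO order.
//   WriteBatch wb;
//   wb.Put("k", "v");
//   wb.SetSavePoint();
//   wb.Delete("k");
//   wb.RollbackToSavePoint();  // drops the Delete; the Put remains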
SetSavePoint()1297 void WriteBatch::SetSavePoint() {
1298   if (save_points_ == nullptr) {
1299     save_points_.reset(new SavePoints());
1300   }
1301   // Record length and count of current batch of writes.
1302   save_points_->stack.push(SavePoint(
1303       GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
1304 }
1305 
RollbackToSavePoint()1306 Status WriteBatch::RollbackToSavePoint() {
1307   if (save_points_ == nullptr || save_points_->stack.size() == 0) {
1308     return Status::NotFound();
1309   }
1310 
1311   // Pop the most recent savepoint off the stack
1312   SavePoint savepoint = save_points_->stack.top();
1313   save_points_->stack.pop();
1314 
1315   assert(savepoint.size <= rep_.size());
1316   assert(static_cast<uint32_t>(savepoint.count) <= Count());
1317 
1318   if (savepoint.size == rep_.size()) {
1319     // No changes to rollback
1320   } else if (savepoint.size == 0) {
1321     // Rollback everything
1322     Clear();
1323   } else {
1324     rep_.resize(savepoint.size);
1325     if (prot_info_ != nullptr) {
1326       prot_info_->entries_.resize(savepoint.count);
1327     }
1328     WriteBatchInternal::SetCount(this, savepoint.count);
1329     content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
1330   }
1331 
1332   return Status::OK();
1333 }
1334 
PopSavePoint()1335 Status WriteBatch::PopSavePoint() {
1336   if (save_points_ == nullptr || save_points_->stack.size() == 0) {
1337     return Status::NotFound();
1338   }
1339 
1340   // Pop the most recent savepoint off the stack
1341   save_points_->stack.pop();
1342 
1343   return Status::OK();
1344 }
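// Minimal usage sketch (assumes the default column family; not part of this
// translation unit): savepoints nest, and rolling back truncates rep_ to the
// recorded size while restoring the count and content flags.
//
//   WriteBatch batch;
//   batch.Put("k1", "v1");
//   batch.SetSavePoint();
//   batch.Put("k2", "v2");
//   batch.RollbackToSavePoint();  // batch contains only k1 again
//   batch.PopSavePoint();         // Status::NotFound(): stack is empty now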

Status WriteBatch::AssignTimestamp(const Slice& ts) {
  TimestampAssigner ts_assigner(ts, prot_info_.get());
  return Iterate(&ts_assigner);
}

Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
  TimestampAssigner ts_assigner(ts_list, prot_info_.get());
  return Iterate(&ts_assigner);
}

class MemTableInserter : public WriteBatch::Handler {
  SequenceNumber sequence_;
  ColumnFamilyMemTables* const cf_mems_;
  FlushScheduler* const flush_scheduler_;
  TrimHistoryScheduler* const trim_history_scheduler_;
  const bool ignore_missing_column_families_;
  const uint64_t recovering_log_number_;
  // log number that all Memtables inserted into should reference
  uint64_t log_number_ref_;
  DBImpl* db_;
  const bool concurrent_memtable_writes_;
  bool post_info_created_;
  const WriteBatch::ProtectionInfo* prot_info_;
  size_t prot_info_idx_;

  bool* has_valid_writes_;
  // On some (!) platforms default-constructing a map is too expensive in the
  // Write() path because it allocates memory even when the map goes unused.
  // Make creation optional, but avoid the extra allocation that a
  // std::unique_ptr would incur.
  using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
  using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
  PostMapType mem_post_info_map_;
  // current recovered transaction we are rebuilding (recovery)
  WriteBatch* rebuilding_trx_;
  SequenceNumber rebuilding_trx_seq_;
  // Increase the seq number once per write batch. Otherwise increase it once
  // per key.
  bool seq_per_batch_;
  // Whether the memtable write will be done only after the commit
  bool write_after_commit_;
  // Whether the memtable write can be done before prepare
  bool write_before_prepare_;
  // Whether this batch was unprepared or not
  bool unprepared_batch_;
  using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
  DupDetector duplicate_detector_;
  bool dup_detector_on_;

  bool hint_per_batch_;
  bool hint_created_;
  // Hints for this batch
  using HintMap = std::unordered_map<MemTable*, void*>;
  using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
  HintMapType hint_;

  HintMap& GetHintMap() {
    assert(hint_per_batch_);
    if (!hint_created_) {
      new (&hint_) HintMap();
      hint_created_ = true;
    }
    return *reinterpret_cast<HintMap*>(&hint_);
  }

  MemPostInfoMap& GetPostMap() {
    assert(concurrent_memtable_writes_);
    if (!post_info_created_) {
      new (&mem_post_info_map_) MemPostInfoMap();
      post_info_created_ = true;
    }
    return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
  }
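
  // The two accessors above (and IsDuplicateKeySeq() below) share a lazy
  // initialization pattern: reserve raw aligned storage as a member,
  // placement-new the object on first use, and destroy it manually in the
  // destructor. A standalone sketch of the idea (hypothetical names, not
  // part of this class):
  //
  //   struct Lazy {
  //     using M = std::map<int, int>;
  //     std::aligned_storage<sizeof(M)>::type storage;
  //     bool created = false;
  //     M& Get() {
  //       if (!created) {
  //         new (&storage) M();  // placement new into raw storage
  //         created = true;
  //       }
  //       return *reinterpret_cast<M*>(&storage);
  //     }
  //     ~Lazy() {
  //       if (created) {
  //         reinterpret_cast<M*>(&storage)->~M();  // manual destruction
  //       }
  //     }
  //   };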

  bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
    assert(!write_after_commit_);
    assert(rebuilding_trx_ != nullptr);
    if (!dup_detector_on_) {
      new (&duplicate_detector_) DuplicateDetector(db_);
      dup_detector_on_ = true;
    }
    return reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
        ->IsDuplicateKeySeq(column_family_id, key, sequence_);
  }

  const ProtectionInfoKVOTC64* NextProtectionInfo() {
    const ProtectionInfoKVOTC64* res = nullptr;
    if (prot_info_ != nullptr) {
      assert(prot_info_idx_ < prot_info_->entries_.size());
      res = &prot_info_->entries_[prot_info_idx_];
      ++prot_info_idx_;
    }
    return res;
  }

 protected:
  bool WriteBeforePrepare() const override { return write_before_prepare_; }
  bool WriteAfterCommit() const override { return write_after_commit_; }

 public:
  // cf_mems should not be shared with concurrent inserters
  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   TrimHistoryScheduler* trim_history_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   const WriteBatch::ProtectionInfo* prot_info,
                   bool* has_valid_writes = nullptr, bool seq_per_batch = false,
                   bool batch_per_txn = true, bool hint_per_batch = false)
      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        trim_history_scheduler_(trim_history_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(static_cast_with_check<DBImpl>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        prot_info_(prot_info),
        prot_info_idx_(0),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr),
        rebuilding_trx_seq_(0),
        seq_per_batch_(seq_per_batch),
        // Write-after-commit currently uses one seq per key (instead of per
        // batch), so seq_per_batch being false indicates the
        // write_after_commit approach.
        write_after_commit_(!seq_per_batch),
        // WriteUnprepared can write WriteBatches per transaction, so
        // batch_per_txn being false indicates write_before_prepare.
        write_before_prepare_(!batch_per_txn),
        unprepared_batch_(false),
        duplicate_detector_(),
        dup_detector_on_(false),
        hint_per_batch_(hint_per_batch),
        hint_created_(false) {
    assert(cf_mems_);
  }

  ~MemTableInserter() override {
    if (dup_detector_on_) {
      reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
          ->~DuplicateDetector();
    }
    if (post_info_created_) {
      reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_)
          ->~MemPostInfoMap();
    }
    if (hint_created_) {
      for (auto iter : GetHintMap()) {
        delete[] reinterpret_cast<char*>(iter.second);
      }
      reinterpret_cast<HintMap*>(&hint_)->~HintMap();
    }
    delete rebuilding_trx_;
  }

  MemTableInserter(const MemTableInserter&) = delete;
  MemTableInserter& operator=(const MemTableInserter&) = delete;

  // The batch seq is regularly restarted; in normal mode it is set when
  // MemTableInserter is constructed in the write thread, and in recovery mode
  // it is set when a batch, which is tagged with its seq, is read from the
  // WAL. Within a sequenced batch, which could be a merge of multiple batches,
  // we have two policies for advancing the seq: i) seq_per_key (default) and
  // ii) seq_per_batch. To implement the latter we need to mark the boundary
  // between the individual batches. The approach is this: 1) Use the
  // terminating markers to indicate the boundary (kTypeEndPrepareXID,
  // kTypeCommitXID, kTypeRollbackXID); 2) Terminate a batch with kTypeNoop in
  // the absence of a natural boundary marker.
  void MaybeAdvanceSeq(bool batch_boundary = false) {
    if (batch_boundary == seq_per_batch_) {
      sequence_++;
    }
  }
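
  // Worked example (illustrative): consider a group of two batches, each
  // with two keys followed by a terminating marker, starting at seq 1. Under
  // seq_per_key, MaybeAdvanceSeq() increments on every key and skips the
  // boundary, so the keys consume seqs {1, 2} and {3, 4}. Under
  // seq_per_batch, it increments only at the boundary markers, so both keys
  // of the first batch share seq 1 and both keys of the second share seq 2.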

  void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
  void set_prot_info(const WriteBatch::ProtectionInfo* prot_info) {
    prot_info_ = prot_info;
    prot_info_idx_ = 0;
  }

  SequenceNumber sequence() const { return sequence_; }

  void PostProcess() {
    assert(concurrent_memtable_writes_);
    // If post info was not created, there is nothing to process and no need
    // to create it on demand.
    if (post_info_created_) {
      for (auto& pair : GetPostMap()) {
        pair.first->BatchPostProcess(pair.second);
      }
    }
  }

  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    // If we are in a concurrent mode, it is the caller's responsibility
    // to clone the original ColumnFamilyMemTables so that each thread
    // has its own instance. Otherwise, it must be guaranteed that there
    // is no concurrent access.
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        *s = Status::OK();
      } else {
        *s = Status::InvalidArgument(
            "Invalid column family specified in write batch");
      }
      return false;
    }
    if (recovering_log_number_ != 0 &&
        recovering_log_number_ < cf_mems_->GetLogNumber()) {
      // This can be true only in the recovery environment
      // (recovering_log_number_ is always 0 in the non-recovery, regular
      // write code-path). If recovering_log_number_ <
      // cf_mems_->GetLogNumber(), the column family already contains updates
      // from this log. We can't apply updates twice because of
      // update-in-place or merge workloads -- ignore the update.
      *s = Status::OK();
      return false;
    }

    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }

    if (log_number_ref_ > 0) {
      cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
    }

    return true;
  }
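
  // Illustrative recovery scenario: suppose a column family was flushed
  // while WAL 7 was live, so its recorded log number is 8 (the first log
  // that may still hold unflushed data for it). When recovery later replays
  // WAL 5, recovering_log_number_ (5) < GetLogNumber() (8), so every update
  // for this column family in that WAL is skipped here as already persisted
  // in SST files.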

  Status PutCFImpl(uint32_t column_family_id, const Slice& key,
                   const Slice& value, ValueType value_type,
                   const ProtectionInfoKVOTS64* kv_prot_info) {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      return WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key,
                                     value);
      // else insert the values into the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed, so there is no need to insert, but we
        // still need to keep track of the keys for an upcoming
        // rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
        ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
                                             key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    // inplace_update_support is inconsistent with snapshots, and therefore
    // with any kind of transactions, including the ones that use
    // seq_per_batch.
    assert(!seq_per_batch_ || !moptions->inplace_update_support);
    if (!moptions->inplace_update_support) {
      ret_status =
          mem->Add(sequence_, value_type, key, value, kv_prot_info,
                   concurrent_memtable_writes_, get_post_process_info(mem),
                   hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    } else if (moptions->inplace_callback == nullptr) {
      assert(!concurrent_memtable_writes_);
      ret_status = mem->Update(sequence_, key, value, kv_prot_info);
    } else {
      assert(!concurrent_memtable_writes_);
      ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info);
      if (ret_status.IsNotFound()) {
        // Key not found in memtable: do SST get, update, add.
        SnapshotImpl read_from_snapshot;
        read_from_snapshot.number_ = sequence_;
        ReadOptions ropts;
        // It's going to be overwritten for sure, so there is no point caching
        // the data block containing the old version.
        ropts.fill_cache = false;
        ropts.snapshot = &read_from_snapshot;

        std::string prev_value;
        std::string merged_value;

        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
        Status get_status = Status::NotSupported();
        if (db_ != nullptr && recovering_log_number_ == 0) {
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          get_status = db_->Get(ropts, cf_handle, key, &prev_value);
        }
        // Intentionally overwrites the `NotFound` in `ret_status`.
        if (!get_status.ok() && !get_status.IsNotFound()) {
          ret_status = get_status;
        } else {
          ret_status = Status::OK();
        }
        if (ret_status.ok()) {
          UpdateStatus update_status;
          char* prev_buffer = const_cast<char*>(prev_value.c_str());
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
          if (get_status.ok()) {
            update_status = moptions->inplace_callback(prev_buffer, &prev_size,
                                                       value, &merged_value);
          } else {
            update_status = moptions->inplace_callback(
                nullptr /* existing_value */, nullptr /* existing_value_size */,
                value, &merged_value);
          }
          if (update_status == UpdateStatus::UPDATED_INPLACE) {
            assert(get_status.ok());
            if (kv_prot_info != nullptr) {
              ProtectionInfoKVOTS64 updated_kv_prot_info(*kv_prot_info);
              updated_kv_prot_info.UpdateV(value,
                                           Slice(prev_buffer, prev_size));
              // prev_value is updated in-place with the final value.
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(prev_buffer, prev_size),
                                    &updated_kv_prot_info);
            } else {
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(prev_buffer, prev_size),
                                    nullptr /* kv_prot_info */);
            }
            if (ret_status.ok()) {
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          } else if (update_status == UpdateStatus::UPDATED) {
            if (kv_prot_info != nullptr) {
              ProtectionInfoKVOTS64 updated_kv_prot_info(*kv_prot_info);
              updated_kv_prot_info.UpdateV(value, merged_value);
              // merged_value contains the final value.
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(merged_value), &updated_kv_prot_info);
            } else {
              // merged_value contains the final value.
              ret_status =
                  mem->Add(sequence_, value_type, key, Slice(merged_value),
                           nullptr /* kv_prot_info */);
            }
            if (ret_status.ok()) {
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          }
        }
      }
    }
    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // Optimize for non-recovery mode. If `ret_status` is `TryAgain` then the
    // next (successful) try will add the key to the rebuilding transaction
    // object. If `ret_status` is another non-OK `Status`, then the
    // `rebuilding_trx_` will be thrown away. So we only need to add to it
    // when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
                                           key, value);
    }
    return ret_status;
  }

  Status PutCF(uint32_t column_family_id, const Slice& key,
               const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    if (kv_prot_info != nullptr) {
      // The memtable needs the seqno and doesn't need the CF ID.
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      return PutCFImpl(column_family_id, key, value, kTypeValue,
                       &mem_kv_prot_info);
    }
    return PutCFImpl(column_family_id, key, value, kTypeValue,
                     nullptr /* kv_prot_info */);
  }

  Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
                    const Slice& value, ValueType delete_type,
                    const ProtectionInfoKVOTS64* kv_prot_info) {
    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    ret_status =
        mem->Add(sequence_, delete_type, key, value, kv_prot_info,
                 concurrent_memtable_writes_, get_post_process_info(mem),
                 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    return ret_status;
  }

  Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
      // else insert the values into the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed, so there is no need to insert, but we
        // still need to keep track of the keys for an upcoming
        // rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
        ret_status =
            WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }

    ColumnFamilyData* cfd = cf_mems_->current();
    assert(!cfd || cfd->user_comparator());
    const size_t ts_sz = (cfd && cfd->user_comparator())
                             ? cfd->user_comparator()->timestamp_size()
                             : 0;
    const ValueType delete_type =
        (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      mem_kv_prot_info.UpdateO(kTypeDeletion, delete_type);
      ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
                              &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
                              nullptr /* kv_prot_info */);
    }
    // Optimize for non-recovery mode. If `ret_status` is `TryAgain` then the
    // next (successful) try will add the key to the rebuilding transaction
    // object. If `ret_status` is another non-OK `Status`, then the
    // `rebuilding_trx_` will be thrown away. So we only need to add to it
    // when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      ret_status =
          WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
    }
    return ret_status;
  }

  Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      return WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                              column_family_id, key);
      // else insert the values into the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed, so there is no need to insert, but we
        // still need to keep track of the keys for an upcoming
        // rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
        ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                                      column_family_id, key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = DeleteImpl(column_family_id, key, Slice(),
                              kTypeSingleDeletion, &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, key, Slice(),
                              kTypeSingleDeletion, nullptr /* kv_prot_info */);
    }
    // Optimize for non-recovery mode. If `ret_status` is `TryAgain` then the
    // next (successful) try will add the key to the rebuilding transaction
    // object. If `ret_status` is another non-OK `Status`, then the
    // `rebuilding_trx_` will be thrown away. So we only need to add to it
    // when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                                    column_family_id, key);
    }
    return ret_status;
  }

  Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
                       const Slice& end_key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                             begin_key, end_key);
      // else insert the values into the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed, so there is no need to insert, but we
        // still need to keep track of the keys for an upcoming
        // rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
        ret_status = WriteBatchInternal::DeleteRange(
            rebuilding_trx_, column_family_id, begin_key, end_key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, begin_key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    if (db_ != nullptr) {
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      auto* cfd =
          static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd();
      if (!cfd->is_delete_range_supported()) {
        // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
        ret_status.PermitUncheckedError();
        return Status::NotSupported(
            std::string("DeleteRange not supported for table type ") +
            cfd->ioptions()->table_factory->Name() + " in CF " +
            cfd->GetName());
      }
      int cmp = cfd->user_comparator()->Compare(begin_key, end_key);
      if (cmp > 0) {
        // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
        ret_status.PermitUncheckedError();
        // It's an empty range where the endpoints appear mistaken. Don't
        // bother applying it to the DB, and return an error to the user.
        return Status::InvalidArgument("end key comes before start key");
      } else if (cmp == 0) {
        // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
        ret_status.PermitUncheckedError();
        // It's an empty range. Don't bother applying it to the DB.
        return Status::OK();
      }
    }

    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = DeleteImpl(column_family_id, begin_key, end_key,
                              kTypeRangeDeletion, &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, begin_key, end_key,
                              kTypeRangeDeletion, nullptr /* kv_prot_info */);
    }
    // Optimize for non-recovery mode. If `ret_status` is `TryAgain` then the
    // next (successful) try will add the key to the rebuilding transaction
    // object. If `ret_status` is another non-OK `Status`, then the
    // `rebuilding_trx_` will be thrown away. So we only need to add to it
    // when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      ret_status = WriteBatchInternal::DeleteRange(
          rebuilding_trx_, column_family_id, begin_key, end_key);
    }
    return ret_status;
  }

  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
                                       value);
      // else insert the values into the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed, so there is no need to insert, but we
        // still need to keep track of the keys for an upcoming
        // rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
        ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
                                               column_family_id, key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    if (moptions->merge_operator == nullptr) {
      return Status::InvalidArgument(
          "Merge requires `ColumnFamilyOptions::merge_operator != nullptr`");
    }
    bool perform_merge = false;
    assert(!concurrent_memtable_writes_ ||
           moptions->max_successive_merges == 0);

    // If we pass the DB through and options.max_successive_merges is hit
    // during recovery, a Get() will be issued that tries to acquire the DB
    // mutex and causes a deadlock, as the DB mutex is already held. So we
    // disable merge in recovery.
    if (moptions->max_successive_merges > 0 && db_ != nullptr &&
        recovering_log_number_ == 0) {
      assert(!concurrent_memtable_writes_);
      LookupKey lkey(key, sequence_);

      // Count the number of successive merges at the head
      // of the key in the memtable.
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }
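
    // Worked example (illustrative): with max_successive_merges == 2, two
    // Merge() calls on the same key leave two merge operands at the head of
    // the key's history in the memtable. The third Merge() finds
    // num_merges == 2 >= 2, so perform_merge becomes true and the operand is
    // folded into a full value below instead of being appended as a third
    // operand.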

    if (perform_merge) {
      // 1) Get the existing value.
      std::string get_value;

      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;

      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      Status get_status = db_->Get(read_options, cf_handle, key, &get_value);
      if (!get_status.ok()) {
        // Failed to read a key we know exists. Store the delta in the
        // memtable instead.
        perform_merge = false;
      } else {
        Slice get_value_slice = Slice(get_value);

        // 2) Apply this merge.
        auto merge_operator = moptions->merge_operator;
        assert(merge_operator);

        std::string new_value;
        Status merge_status = MergeHelper::TimedFullMerge(
            merge_operator, key, &get_value_slice, {value}, &new_value,
            moptions->info_log, moptions->statistics,
            SystemClock::Default().get());

        if (!merge_status.ok()) {
          // Failed to merge! Store the delta in the memtable instead.
          perform_merge = false;
        } else {
          // 3) Add the value to the memtable.
          assert(!concurrent_memtable_writes_);
          if (kv_prot_info != nullptr) {
            auto merged_kv_prot_info =
                kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
            merged_kv_prot_info.UpdateV(value, new_value);
            merged_kv_prot_info.UpdateO(kTypeMerge, kTypeValue);
            ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
                                  &merged_kv_prot_info);
          } else {
            ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
                                  nullptr /* kv_prot_info */);
          }
        }
      }
    }

    if (!perform_merge) {
      assert(ret_status.ok());
      // Add the merge operand to the memtable.
      if (kv_prot_info != nullptr) {
        auto mem_kv_prot_info =
            kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
        ret_status =
            mem->Add(sequence_, kTypeMerge, key, value, &mem_kv_prot_info,
                     concurrent_memtable_writes_, get_post_process_info(mem));
      } else {
        ret_status = mem->Add(
            sequence_, kTypeMerge, key, value, nullptr /* kv_prot_info */,
            concurrent_memtable_writes_, get_post_process_info(mem));
      }
    }

    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // Optimize for non-recovery mode. If `ret_status` is `TryAgain` then the
    // next (successful) try will add the key to the rebuilding transaction
    // object. If `ret_status` is another non-OK `Status`, then the
    // `rebuilding_trx_` will be thrown away. So we only need to add to it
    // when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
                                             key, value);
    }
    return ret_status;
  }

  Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
                        const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    if (kv_prot_info != nullptr) {
      // The memtable needs the seqno and doesn't need the CF ID.
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      // Same as PutCF except for the value type.
      return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
                       &mem_kv_prot_info);
    } else {
      return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
                       nullptr /* kv_prot_info */);
    }
  }

  void CheckMemtableFull() {
    if (flush_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd != nullptr);
      if (cfd->mem()->ShouldScheduleFlush() &&
          cfd->mem()->MarkFlushScheduled()) {
        // MarkFlushScheduled only returns true if we are the one that
        // should take action, so there is no need to dedup further.
        flush_scheduler_->ScheduleWork(cfd);
      }
    }
    // Check if the memtable_list size exceeds
    // max_write_buffer_size_to_maintain.
    if (trim_history_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();

      assert(cfd);
      assert(cfd->ioptions());

      const size_t size_to_maintain = static_cast<size_t>(
          cfd->ioptions()->max_write_buffer_size_to_maintain);

      if (size_to_maintain > 0) {
        MemTableList* const imm = cfd->imm();
        assert(imm);

        if (imm->HasHistory()) {
          const MemTable* const mem = cfd->mem();
          assert(mem);

          if (mem->ApproximateMemoryUsageFast() +
                      imm->ApproximateMemoryUsageExcludingLast() >=
                  size_to_maintain &&
              imm->MarkTrimHistoryNeeded()) {
            trim_history_scheduler_->ScheduleWork(cfd);
          }
        }
      }
    }
  }
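
  // Worked example (illustrative; the accounting details are approximate):
  // with max_write_buffer_size_to_maintain set to 64 MB, an active memtable
  // at ~20 MB, and retained history at ~50 MB (excluding the most recent
  // immutable memtable), the sum of ~70 MB >= 64 MB marks the column family
  // so the trim-history scheduler can drop the oldest retained memtables.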

  // The write batch handler calls MarkBeginPrepare with unprepare set to true
  // if it encounters the kTypeBeginUnprepareXID marker.
  Status MarkBeginPrepare(bool unprepare) override {
    assert(rebuilding_trx_ == nullptr);
    assert(db_);

    if (recovering_log_number_ != 0) {
      // During recovery we rebuild a hollow transaction
      // from all encountered prepare sections of the WAL.
      if (db_->allow_2pc() == false) {
        return Status::NotSupported(
            "WAL contains prepared transactions. Open with "
            "TransactionDB::Open().");
      }

      // We are now iterating through a prepared section.
      rebuilding_trx_ = new WriteBatch();
      rebuilding_trx_seq_ = sequence_;
      // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
      // unprepared_batch_ should be false because it is false by default, and
      // gets reset to false in MarkEndPrepare.
      assert(!unprepared_batch_);
      unprepared_batch_ = unprepare;

      if (has_valid_writes_ != nullptr) {
        *has_valid_writes_ = true;
      }
    }

    return Status::OK();
  }

  Status MarkEndPrepare(const Slice& name) override {
    assert(db_);
    assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));

    if (recovering_log_number_ != 0) {
      assert(db_->allow_2pc());
      size_t batch_cnt =
          write_after_commit_
              ? 0  // 0 will disable further checks
              : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
      db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
                                      rebuilding_trx_, rebuilding_trx_seq_,
                                      batch_cnt, unprepared_batch_);
      unprepared_batch_ = false;
      rebuilding_trx_ = nullptr;
    } else {
      assert(rebuilding_trx_ == nullptr);
    }
    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return Status::OK();
  }

  Status MarkNoop(bool empty_batch) override {
    // A hack in pessimistic transactions can result in a noop at the start
    // of the write batch; it should be ignored.
    if (!empty_batch) {
      // In the absence of Prepare markers, a kTypeNoop tag indicates the end
      // of a batch. This happens when a write batch commits while skipping
      // the prepare phase.
      const bool batch_boundary = true;
      MaybeAdvanceSeq(batch_boundary);
    }
    return Status::OK();
  }

  Status MarkCommit(const Slice& name) override {
    assert(db_);

    Status s;

    if (recovering_log_number_ != 0) {
      // In recovery, when we encounter a commit marker
      // we look this transaction up in our set of rebuilt transactions
      // and commit it.
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // The log containing the prepared section may have
      // been released in the last incarnation because the
      // data was flushed to L0.
      if (trx != nullptr) {
        // At this point individual CF log numbers will prevent
        // duplicate re-insertion of values.
        assert(log_number_ref_ == 0);
        if (write_after_commit_) {
          // write_after_commit_ can only have one batch in trx.
          assert(trx->batches_.size() == 1);
          const auto& batch_info = trx->batches_.begin()->second;
          // All inserts must reference this trx log number.
          log_number_ref_ = batch_info.log_number_;
          s = batch_info.batch_->Iterate(this);
          log_number_ref_ = 0;
        }
        // else the values are already inserted before the commit

        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_ != nullptr) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // When writes are not delayed until commit, there is no disconnect
      // between a memtable write and the WAL that supports it. So the commit
      // need not reference any log as the only log on which it depends.
      assert(!write_after_commit_ || log_number_ref_ > 0);
    }
    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return s;
  }

  Status MarkRollback(const Slice& name) override {
    assert(db_);

    if (recovering_log_number_ != 0) {
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // The log containing the transaction's prepare section
      // may have been released in the previous incarnation
      // because we knew it had been rolled back.
      if (trx != nullptr) {
        db_->DeleteRecoveredTransaction(name.ToString());
      }
    } else {
      // In non-recovery mode we simply ignore this tag.
    }

    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return Status::OK();
  }

 private:
  MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
    if (!concurrent_memtable_writes_) {
      // No need to batch counters locally if we don't use concurrent mode.
      return nullptr;
    }
    return &GetPostMap()[mem];
  }
};
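
// Illustrative sketch (not part of this translation unit): MemTableInserter
// is one implementation of WriteBatch::Handler, the visitor that
// WriteBatch::Iterate() drives over the serialized records in rep_. A
// trivial handler that merely counts puts might look like:
//
//   struct PutCounter : public WriteBatch::Handler {
//     size_t puts = 0;
//     Status PutCF(uint32_t /*cf*/, const Slice& /*k*/,
//                  const Slice& /*v*/) override {
//       ++puts;
//       return Status::OK();
//     }
//   };
//
//   PutCounter counter;
//   Status s = batch.Iterate(&counter);  // replays each record in order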

// This function can only be called in these conditions:
// 1) During Recovery()
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables have been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache.
Status WriteBatchInternal::InsertInto(
    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, recovery_log_number, db,
      concurrent_memtable_writes, nullptr /* prot_info */,
      nullptr /* has_valid_writes */, seq_per_batch, batch_per_txn);
  for (auto w : write_group) {
    if (w->CallbackFailed()) {
      continue;
    }
    w->sequence = inserter.sequence();
    if (!w->ShouldWriteToMemtable()) {
      // In seq_per_batch_ mode this advances the seq by one.
      inserter.MaybeAdvanceSeq(true);
      continue;
    }
    SetSequence(w->batch, inserter.sequence());
    inserter.set_log_number_ref(w->log_ref);
    inserter.set_prot_info(w->batch->prot_info_.get());
    w->status = w->batch->Iterate(&inserter);
    if (!w->status.ok()) {
      return w->status;
    }
    assert(!seq_per_batch || w->batch_cnt != 0);
    assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
  }
  return Status::OK();
}

Status WriteBatchInternal::InsertInto(
    WriteThread::Writer* writer, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
    bool batch_per_txn, bool hint_per_batch) {
#ifdef NDEBUG
  (void)batch_cnt;
#endif
  assert(writer->ShouldWriteToMemtable());
  MemTableInserter inserter(sequence, memtables, flush_scheduler,
                            trim_history_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, nullptr /* prot_info */,
                            nullptr /* has_valid_writes */, seq_per_batch,
                            batch_per_txn, hint_per_batch);
  SetSequence(writer->batch, sequence);
  inserter.set_log_number_ref(writer->log_ref);
  inserter.set_prot_info(writer->batch->prot_info_.get());
  Status s = writer->batch->Iterate(&inserter);
  assert(!seq_per_batch || batch_cnt != 0);
  assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}

Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, SequenceNumber* next_seq,
    bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
                            trim_history_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, batch->prot_info_.get(),
                            has_valid_writes, seq_per_batch, batch_per_txn);
  Status s = batch->Iterate(&inserter);
  if (next_seq != nullptr) {
    *next_seq = inserter.sequence();
  }
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}

Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= WriteBatchInternal::kHeader);
  assert(b->prot_info_ == nullptr);
  b->rep_.assign(contents.data(), contents.size());
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
                                  const bool wal_only) {
  assert(dst->Count() == 0 ||
         (dst->prot_info_ == nullptr) == (src->prot_info_ == nullptr));
  size_t src_len;
  int src_count;
  uint32_t src_flags;

  const SavePoint& batch_end = src->GetWalTerminationPoint();

  if (wal_only && !batch_end.is_cleared()) {
    src_len = batch_end.size - WriteBatchInternal::kHeader;
    src_count = batch_end.count;
    src_flags = batch_end.content_flags;
  } else {
    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
    src_count = Count(src);
    src_flags = src->content_flags_.load(std::memory_order_relaxed);
  }

  if (dst->prot_info_ != nullptr) {
    std::copy(src->prot_info_->entries_.begin(),
              src->prot_info_->entries_.begin() + src_count,
              std::back_inserter(dst->prot_info_->entries_));
  } else if (src->prot_info_ != nullptr) {
    dst->prot_info_.reset(new WriteBatch::ProtectionInfo(*src->prot_info_));
  }
  SetCount(dst, Count(dst) + src_count);
  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
  dst->content_flags_.store(
      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
      std::memory_order_relaxed);
  return Status::OK();
}
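
// Illustrative sketch (not part of this translation unit): Append()
// concatenates the source's records after the destination's, skipping the
// source's header (sequence + count), and the destination's count and
// content flags absorb the source's.
//
//   WriteBatch a, b;
//   a.Put("k1", "v1");                          // a.Count() == 1
//   b.Put("k2", "v2");
//   b.Delete("k3");                             // b.Count() == 2
//   Status s = WriteBatchInternal::Append(&a, &b, /*wal_only=*/false);
//   // a.Count() == 3; a's data is a's records followed by b's records.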

size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
                                            size_t rightByteSize) {
  if (leftByteSize == 0 || rightByteSize == 0) {
    return leftByteSize + rightByteSize;
  } else {
    return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
  }
}
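
// Worked example (illustrative, assuming the 12-byte header: an 8-byte
// sequence number plus a 4-byte count, matching the rep_ layout above):
// appending a 30-byte batch to a 20-byte batch yields 20 + 30 - 12 = 38
// bytes, since the appended batch's header is dropped by Append(). If either
// size is 0 there is nothing to merge, so the sizes are simply summed.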

}  // namespace ROCKSDB_NAMESPACE