//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeDeletion varstring
//    kTypeSingleDeletion varstring
//    kTypeRangeDeletion varstring varstring
//    kTypeMerge varstring varstring
//    kTypeColumnFamilyValue varint32 varstring varstring
//    kTypeColumnFamilyDeletion varint32 varstring
//    kTypeColumnFamilySingleDeletion varint32 varstring
//    kTypeColumnFamilyRangeDeletion varint32 varstring varstring
//    kTypeColumnFamilyMerge varint32 varstring varstring
//    kTypeBeginPrepareXID
//    kTypeEndPrepareXID varstring
//    kTypeCommitXID varstring
//    kTypeRollbackXID varstring
//    kTypeBeginPersistedPrepareXID
//    kTypeBeginUnprepareXID
//    kTypeNoop
// varstring :=
//    len: varint32
//    data: uint8[len]
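//
// For illustration (the actual tag byte values live in db/dbformat.h): a
// batch holding a single Put("key1", "val1") in the default column family,
// with sequence number 100, would serialize as
//    fixed64(100) fixed32(1) kTypeValue varstring("key1") varstring("val1")
// where each varstring is a varint32 length followed by that many bytes.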

#include "rocksdb/write_batch.h"

#include <map>
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/kv_checksum.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/trim_history_scheduler.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/lang.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/system_clock.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

// anon namespace for file-local types
namespace {

enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,
  HAS_PUT = 1 << 1,
  HAS_DELETE = 1 << 2,
  HAS_SINGLE_DELETE = 1 << 3,
  HAS_MERGE = 1 << 4,
  HAS_BEGIN_PREPARE = 1 << 5,
  HAS_END_PREPARE = 1 << 6,
  HAS_COMMIT = 1 << 7,
  HAS_ROLLBACK = 1 << 8,
  HAS_DELETE_RANGE = 1 << 9,
  HAS_BLOB_INDEX = 1 << 10,
  HAS_BEGIN_UNPREPARE = 1 << 11,
};

struct BatchContentClassifier : public WriteBatch::Handler {
  uint32_t content_flags = 0;

  Status PutCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_PUT;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE_RANGE;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_MERGE;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_BLOB_INDEX;
    return Status::OK();
  }

  Status MarkBeginPrepare(bool unprepare) override {
    content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
    if (unprepare) {
      content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
    }
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    content_flags |= ContentFlags::HAS_END_PREPARE;
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    content_flags |= ContentFlags::HAS_ROLLBACK;
    return Status::OK();
  }
};

class TimestampAssigner : public WriteBatch::Handler {
 public:
  explicit TimestampAssigner(const Slice& ts,
                             WriteBatch::ProtectionInfo* prot_info)
      : timestamp_(ts),
        timestamps_(kEmptyTimestampList),
        prot_info_(prot_info) {}
  explicit TimestampAssigner(const std::vector<Slice>& ts_list,
                             WriteBatch::ProtectionInfo* prot_info)
      : timestamps_(ts_list), prot_info_(prot_info) {}
  ~TimestampAssigner() override {}

  Status PutCF(uint32_t, const Slice& key, const Slice&) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice& key) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice& key) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice& begin_key,
                       const Slice& /* end_key */) override {
    AssignTimestamp(begin_key);
    ++idx_;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    // TODO (yanqin): support blob db in the future.
    return Status::OK();
  }

  Status MarkBeginPrepare(bool) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

 private:
  void AssignTimestamp(const Slice& key) {
    assert(timestamps_.empty() || idx_ < timestamps_.size());
    const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
    size_t ts_sz = ts.size();
    if (ts_sz == 0) {
      // This key does not have a timestamp, so skip it.
      return;
    }
    if (prot_info_ != nullptr) {
      SliceParts old_key(&key, 1);
      Slice key_no_ts(key.data(), key.size() - ts_sz);
      std::array<Slice, 2> new_key_cmpts{{key_no_ts, ts}};
      SliceParts new_key(new_key_cmpts.data(), 2);
      prot_info_->entries_[idx_].UpdateK(old_key, new_key);
    }
    char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
    memcpy(ptr, ts.data(), ts_sz);
  }

  static const std::vector<Slice> kEmptyTimestampList;
  const Slice timestamp_;
  const std::vector<Slice>& timestamps_;
  WriteBatch::ProtectionInfo* const prot_info_;
  size_t idx_ = 0;

  // No copy or move.
  TimestampAssigner(const TimestampAssigner&) = delete;
  TimestampAssigner(TimestampAssigner&&) = delete;
  TimestampAssigner& operator=(const TimestampAssigner&) = delete;
  TimestampAssigner& operator=(TimestampAssigner&&) = delete;
};
const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;

}  // anon namespace

struct SavePoints {
  std::stack<SavePoint, autovector<SavePoint>> stack;
};

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
    : content_flags_(0), max_bytes_(max_bytes), rep_() {
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes,
                       size_t protection_bytes_per_key)
    : content_flags_(0), max_bytes_(max_bytes), rep_() {
  // Currently `protection_bytes_per_key` can only be enabled at 8 bytes per
  // entry.
  assert(protection_bytes_per_key == 0 || protection_bytes_per_key == 8);
  if (protection_bytes_per_key != 0) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
  }
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}
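// Example (illustrative): enabling 8-byte per-key protection when
// constructing a batch, using the constructor above:
//   WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
//                    8 /* protection_bytes_per_key */);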

WriteBatch::WriteBatch(const std::string& rep)
    : content_flags_(ContentFlags::DEFERRED), max_bytes_(0), rep_(rep) {}

WriteBatch::WriteBatch(std::string&& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(std::move(rep)) {}

WriteBatch::WriteBatch(const WriteBatch& src)
    : wal_term_point_(src.wal_term_point_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(src.rep_) {
  if (src.save_points_ != nullptr) {
    save_points_.reset(new SavePoints());
    save_points_->stack = src.save_points_->stack;
  }
  if (src.prot_info_ != nullptr) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
    prot_info_->entries_ = src.prot_info_->entries_;
  }
}

WriteBatch::WriteBatch(WriteBatch&& src) noexcept
    : save_points_(std::move(src.save_points_)),
      wal_term_point_(std::move(src.wal_term_point_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      prot_info_(std::move(src.prot_info_)),
      rep_(std::move(src.rep_)) {}

WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}

WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}

WriteBatch::~WriteBatch() {}

WriteBatch::Handler::~Handler() {}

void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

bool WriteBatch::Handler::Continue() { return true; }

void WriteBatch::Clear() {
  rep_.clear();
  rep_.resize(WriteBatchInternal::kHeader);

  content_flags_.store(0, std::memory_order_relaxed);

  if (save_points_ != nullptr) {
    while (!save_points_->stack.empty()) {
      save_points_->stack.pop();
    }
  }

  if (prot_info_ != nullptr) {
    prot_info_->entries_.clear();
  }
  wal_term_point_.clear();
}

uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }

uint32_t WriteBatch::ComputeContentFlags() const {
  auto rv = content_flags_.load(std::memory_order_relaxed);
  if ((rv & ContentFlags::DEFERRED) != 0) {
    BatchContentClassifier classifier;
    // Should we handle status here?
    Iterate(&classifier).PermitUncheckedError();
    rv = classifier.content_flags;

    // this method is conceptually const, because it is performing a lazy
    // computation that doesn't affect the abstract state of the batch.
    // content_flags_ is marked mutable so that we can perform the
    // following assignment
    content_flags_.store(rv, std::memory_order_relaxed);
  }
  return rv;
}

void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_;
}

size_t WriteBatch::GetProtectionBytesPerKey() const {
  if (prot_info_ != nullptr) {
    return prot_info_->GetBytesPerKey();
  }
  return 0;
}

bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasDeleteRange() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}

bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
  assert(input != nullptr && key != nullptr);
  // Skip tag byte
  input->remove_prefix(1);

  if (cf_record) {
    // Skip column_family bytes
    uint32_t cf;
    if (!GetVarint32(input, &cf)) {
      return false;
    }
  }

  // Extract key
  return GetLengthPrefixedSlice(input, key);
}

bool WriteBatch::HasBeginPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}

bool WriteBatch::HasEndPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}

bool WriteBatch::HasCommit() const {
  return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}

bool WriteBatch::HasRollback() const {
  return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}

Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid) {
  assert(key != nullptr && value != nullptr);
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;  // default
  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      FALLTHROUGH_INTENDED;
    case kTypeValue:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      FALLTHROUGH_INTENDED;
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;
    case kTypeColumnFamilyRangeDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      FALLTHROUGH_INTENDED;
    case kTypeRangeDeletion:
      // for range delete, "key" is begin_key, "value" is end_key
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      break;
    case kTypeColumnFamilyMerge:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      FALLTHROUGH_INTENDED;
    case kTypeMerge:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;
    case kTypeColumnFamilyBlobIndex:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      FALLTHROUGH_INTENDED;
    case kTypeBlobIndex:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      break;
    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
      // This indicates that the prepared batch is also persisted in the db.
      // This is used in WritePreparedTxn
    case kTypeBeginPersistedPrepareXID:
      // This is used in WriteUnpreparedTxn
    case kTypeBeginUnprepareXID:
      break;
    case kTypeEndPrepareXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;
    case kTypeCommitXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Commit XID");
      }
      break;
    case kTypeRollbackXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Rollback XID");
      }
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}

Status WriteBatch::Iterate(Handler* handler) const {
  if (rep_.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
                                     rep_.size());
}
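// Example (illustrative sketch): a custom Handler that counts the Put records
// in a hypothetical batch `batch` via Iterate(), overriding only PutCF():
//   struct PutCounter : public WriteBatch::Handler {
//     size_t puts = 0;
//     Status PutCF(uint32_t, const Slice&, const Slice&) override {
//       ++puts;
//       return Status::OK();
//     }
//   };
//   PutCounter counter;
//   Status s = batch.Iterate(&counter);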

Status WriteBatchInternal::Iterate(const WriteBatch* wb,
                                   WriteBatch::Handler* handler, size_t begin,
                                   size_t end) {
  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
    return Status::Corruption("Invalid start/end bounds for Iterate");
  }
  assert(begin <= end);
  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
  bool whole_batch =
      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());

  Slice key, value, blob, xid;
  // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
  // the batch boundary symbols otherwise we would mis-count the number of
  // batches. We do that by checking whether the accumulated batch is empty
  // before seeing the next Noop.
  bool empty_batch = true;
  uint32_t found = 0;
  Status s;
  char tag = 0;
  uint32_t column_family = 0;  // default
  bool last_was_try_again = false;
  bool handler_continue = true;
  while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
    handler_continue = handler->Continue();
    if (!handler_continue) {
      break;
    }

    if (LIKELY(!s.IsTryAgain())) {
      last_was_try_again = false;
      tag = 0;
      column_family = 0;  // default

      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid);
      if (!s.ok()) {
        return s;
      }
    } else {
      assert(s.IsTryAgain());
      assert(!last_was_try_again);  // to detect infinite loop bugs
      if (UNLIKELY(last_was_try_again)) {
        return Status::Corruption(
            "two consecutive TryAgain in WriteBatch handler; this is either a "
            "software bug or data corruption.");
      }
      last_was_try_again = true;
      s = Status::OK();
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        s = handler->DeleteRangeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
        s = handler->PutBlobIndexCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          found++;
        }
        break;
      case kTypeLogData:
        handler->LogData(blob);
        // A batch might have nothing but LogData. It is still a batch.
        empty_batch = false;
        break;
      case kTypeBeginPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        if (!handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_after_commit_ is disabled (in "
              "WritePrepared/WriteUnprepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_before_prepare_ is enabled "
              "(in WriteUnprepared mode). If it is not due to corruption, the "
              "WAL must be emptied before changing the WritePolicy.");
        }
        break;
      case kTypeBeginPersistedPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
              "is enabled (in default WriteCommitted mode). If it is not due "
              "to corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeBeginUnprepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
        s = handler->MarkBeginPrepare(true /* unprepared */);
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
              "default WriteCommitted mode). If it is not due to corruption, "
              "the WAL must be emptied before changing the WritePolicy.");
        }
        if (!handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
              "(in WriteCommitted/WritePrepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeEndPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        s = handler->MarkEndPrepare(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeCommitXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        s = handler->MarkCommit(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeRollbackXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        s = handler->MarkRollback(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeNoop:
        s = handler->MarkNoop(empty_batch);
        assert(s.ok());
        empty_batch = true;
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (handler_continue && whole_batch &&
      found != WriteBatchInternal::Count(wb)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  return b->is_latest_persistent_state_;
}

void WriteBatchInternal::SetAsLatestPersistentState(WriteBatch* b) {
  b->is_latest_persistent_state_ = true;
}

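// The 12-byte batch header is an 8-byte fixed64 sequence number followed by a
// 4-byte fixed32 entry count (see the rep_ grammar at the top of this file),
// hence the offsets 0 and 8 used below.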
uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  EncodeFixed32(&b->rep_[8], n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  return WriteBatchInternal::kHeader;
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // Technically the optype could've been `kTypeColumnFamilyValue` with the
    // CF ID encoded in the `WriteBatch`. That distinction is unimportant
    // however since we verify CF ID is correct, as well as all other fields
    // (a missing/extra encoded CF ID would corrupt another field). It is
    // convenient to consolidate on `kTypeValue` here as that is what will be
    // inserted into memtable.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeValue)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}

Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
                                                 const SliceParts& value) {
  size_t total_key_bytes = 0;
  for (int i = 0; i < key.num_parts; ++i) {
    total_key_bytes += key.parts[i].size();
  }
  if (total_key_bytes >= size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }

  size_t total_value_bytes = 0;
  for (int i = 0; i < value.num_parts; ++i) {
    total_value_bytes += value.parts[i].size();
  }
  if (total_value_bytes >= size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }
  return Status::OK();
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key, const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeValue)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                       const SliceParts& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}

Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  b->rep_.push_back(static_cast<char>(kTypeNoop));
  return Status::OK();
}

Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
                                          bool write_after_commit,
                                          bool unprepared_batch) {
  // a manually constructed batch can only contain one prepare section
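  // (rep_[12] is the type tag of the first record, i.e. the first byte past
  // the 12-byte header of sequence number and count)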
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    while (!b->save_points_->stack.empty()) {
      b->save_points_->stack.pop();
    }
  }

  // rewrite noop as begin marker
  b->rep_[12] = static_cast<char>(
      write_after_commit ? kTypeBeginPrepareXID
                         : (unprepared_batch ? kTypeBeginUnprepareXID
                                             : kTypeBeginPersistedPrepareXID));
  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  if (unprepared_batch) {
    b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                                ContentFlags::HAS_BEGIN_UNPREPARE,
                            std::memory_order_relaxed);
  }
  return Status::OK();
}

Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeCommitXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_ROLLBACK,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, "" /* value */, kTypeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key,
                        SliceParts(nullptr /* _parts */, 0 /* _num_parts */),
                        kTypeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
                          const SliceParts& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, "" /* value */, kTypeSingleDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key,
                        SliceParts(nullptr /* _parts */,
                                   0 /* _num_parts */) /* value */,
                        kTypeSingleDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const SliceParts& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const Slice& begin_key,
                                       const Slice& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, begin_key);
  PutLengthPrefixedSlice(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(begin_key, end_key, kTypeRangeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(begin_key, end_key, kTypeRangeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const SliceParts& begin_key,
                               const SliceParts& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeMerge)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeMerge)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
                         const SliceParts& key, const SliceParts& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}

Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key, const Slice& value) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BLOB_INDEX,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, value, kTypeBlobIndex)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::PutLogData(const Slice& blob) {
  LocalSavePoint save(this);
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
  return save.commit();
}

void WriteBatch::SetSavePoint() {
  if (save_points_ == nullptr) {
    save_points_.reset(new SavePoints());
  }
  // Record length and count of current batch of writes.
  save_points_->stack.push(SavePoint(
      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
}

Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  SavePoint savepoint = save_points_->stack.top();
  save_points_->stack.pop();

  assert(savepoint.size <= rep_.size());
  assert(static_cast<uint32_t>(savepoint.count) <= Count());

  if (savepoint.size == rep_.size()) {
    // No changes to rollback
  } else if (savepoint.size == 0) {
    // Rollback everything
    Clear();
  } else {
    rep_.resize(savepoint.size);
    if (prot_info_ != nullptr) {
      prot_info_->entries_.resize(savepoint.count);
    }
    WriteBatchInternal::SetCount(this, savepoint.count);
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  }

  return Status::OK();
}

Status WriteBatch::PopSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  save_points_->stack.pop();

  return Status::OK();
}
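// Example usage of savepoints (illustrative sketch, assuming the default
// column family Put() overload declared in rocksdb/write_batch.h):
//   WriteBatch batch;
//   batch.Put("k1", "v1");
//   batch.SetSavePoint();
//   batch.Put("k2", "v2");
//   batch.RollbackToSavePoint();  // the batch now contains only "k1"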

Status WriteBatch::AssignTimestamp(const Slice& ts) {
  TimestampAssigner ts_assigner(ts, prot_info_.get());
  return Iterate(&ts_assigner);
}

Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
  TimestampAssigner ts_assigner(ts_list, prot_info_.get());
  return Iterate(&ts_assigner);
}
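// Illustrative sketch: since TimestampAssigner::AssignTimestamp() overwrites
// the last ts.size() bytes of each key in place, callers are expected to have
// written keys with a placeholder suffix of the column family's timestamp
// size. E.g., with a hypothetical 8-byte timestamp:
//   WriteBatch batch;
//   std::string key_with_ts = user_key + std::string(8, '\0');
//   batch.Put(cf_handle, key_with_ts, value);
//   batch.AssignTimestamp(ts);  // ts: a Slice holding 8 timestamp bytes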

class MemTableInserter : public WriteBatch::Handler {

  SequenceNumber sequence_;
  ColumnFamilyMemTables* const cf_mems_;
  FlushScheduler* const flush_scheduler_;
  TrimHistoryScheduler* const trim_history_scheduler_;
  const bool ignore_missing_column_families_;
  const uint64_t recovering_log_number_;
  // log number that all memtables inserted into should reference
  uint64_t log_number_ref_;
  DBImpl* db_;
  const bool concurrent_memtable_writes_;
  bool post_info_created_;
  const WriteBatch::ProtectionInfo* prot_info_;
  size_t prot_info_idx_;

  bool* has_valid_writes_;
  // On some platforms, default-constructing a map is too expensive in the
  // Write() path, as it allocates memory even when the map goes unused.
  // Make creation optional, without incurring the extra allocation a
  // std::unique_ptr would require.
  using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
  using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
  PostMapType mem_post_info_map_;
  // current recovered transaction we are rebuilding (recovery)
  WriteBatch* rebuilding_trx_;
  SequenceNumber rebuilding_trx_seq_;
  // Increase the seq number once per write batch. Otherwise increase it once
  // per key.
  bool seq_per_batch_;
  // Whether the memtable write will be done only after the commit
  bool write_after_commit_;
  // Whether the memtable write can be done before prepare
  bool write_before_prepare_;
  // Whether this batch was unprepared or not
  bool unprepared_batch_;
  using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
  DupDetector duplicate_detector_;
  bool dup_detector_on_;

  bool hint_per_batch_;
  bool hint_created_;
  // Hints for this batch
  using HintMap = std::unordered_map<MemTable*, void*>;
  using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
  HintMapType hint_;

  HintMap& GetHintMap() {
    assert(hint_per_batch_);
    if (!hint_created_) {
      new (&hint_) HintMap();
      hint_created_ = true;
    }
    return *reinterpret_cast<HintMap*>(&hint_);
  }

  MemPostInfoMap& GetPostMap() {
    assert(concurrent_memtable_writes_);
    if (!post_info_created_) {
      new (&mem_post_info_map_) MemPostInfoMap();
      post_info_created_ = true;
    }
    return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
  }

  bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
    assert(!write_after_commit_);
    assert(rebuilding_trx_ != nullptr);
    if (!dup_detector_on_) {
      new (&duplicate_detector_) DuplicateDetector(db_);
      dup_detector_on_ = true;
    }
    return reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
        ->IsDuplicateKeySeq(column_family_id, key, sequence_);
  }

  const ProtectionInfoKVOC64* NextProtectionInfo() {
    const ProtectionInfoKVOC64* res = nullptr;
    if (prot_info_ != nullptr) {
      assert(prot_info_idx_ < prot_info_->entries_.size());
      res = &prot_info_->entries_[prot_info_idx_];
      ++prot_info_idx_;
    }
    return res;
  }

 protected:
  bool WriteBeforePrepare() const override { return write_before_prepare_; }
  bool WriteAfterCommit() const override { return write_after_commit_; }

 public:
  // cf_mems should not be shared with concurrent inserters
  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   TrimHistoryScheduler* trim_history_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   const WriteBatch::ProtectionInfo* prot_info,
                   bool* has_valid_writes = nullptr, bool seq_per_batch = false,
                   bool batch_per_txn = true, bool hint_per_batch = false)
      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        trim_history_scheduler_(trim_history_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(static_cast_with_check<DBImpl>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        prot_info_(prot_info),
        prot_info_idx_(0),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr),
        rebuilding_trx_seq_(0),
        seq_per_batch_(seq_per_batch),
        // Write-after-commit currently uses one seq per key (instead of per
        // batch), so seq_per_batch being false indicates the
        // write_after_commit approach.
        write_after_commit_(!seq_per_batch),
        // WriteUnprepared can write WriteBatches per transaction, so
        // batch_per_txn being false indicates write_before_prepare.
        write_before_prepare_(!batch_per_txn),
        unprepared_batch_(false),
        duplicate_detector_(),
        dup_detector_on_(false),
        hint_per_batch_(hint_per_batch),
        hint_created_(false) {
    assert(cf_mems_);
  }

  ~MemTableInserter() override {
    if (dup_detector_on_) {
      reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
          ->~DuplicateDetector();
    }
    if (post_info_created_) {
      reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_)
          ->~MemPostInfoMap();
    }
    if (hint_created_) {
      for (auto iter : GetHintMap()) {
        delete[] reinterpret_cast<char*>(iter.second);
      }
      reinterpret_cast<HintMap*>(&hint_)->~HintMap();
    }
    delete rebuilding_trx_;
  }

  MemTableInserter(const MemTableInserter&) = delete;
  MemTableInserter& operator=(const MemTableInserter&) = delete;

  // The batch seq is regularly restarted; in normal mode it is set when
  // MemTableInserter is constructed in the write thread, and in recovery mode
  // it is set when a batch, which is tagged with seq, is read from the WAL.
  // Within a sequenced batch, which could be a merge of multiple batches, we
  // have two policies to advance the seq: i) seq_per_key (default) and ii)
  // seq_per_batch. To implement the latter we need to mark the boundary
  // between the individual batches. The approach is this: 1) Use the
  // terminating markers to indicate the boundary (kTypeEndPrepareXID,
  // kTypeCommitXID, kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in
  // the absence of a natural boundary marker.
  void MaybeAdvanceSeq(bool batch_boundary = false) {
    if (batch_boundary == seq_per_batch_) {
      sequence_++;
    }
  }
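
  // Illustrative note (not part of the library): MaybeAdvanceSeq bumps the
  // sequence exactly when the record's role matches the configured policy:
  //
  //   seq_per_batch_ | batch_boundary | sequence_ advances?
  //   ---------------+----------------+--------------------
  //   false          | false          | yes (one seq per key)
  //   false          | true           | no
  //   true           | false          | no
  //   true           | true           | yes (one seq per batch)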

  void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
  void set_prot_info(const WriteBatch::ProtectionInfo* prot_info) {
    prot_info_ = prot_info;
    prot_info_idx_ = 0;
  }

  SequenceNumber sequence() const { return sequence_; }

  void PostProcess() {
    assert(concurrent_memtable_writes_);
    // If post info was not created there is nothing
    // to process and no need to create it on demand
    if (post_info_created_) {
      for (auto& pair : GetPostMap()) {
        pair.first->BatchPostProcess(pair.second);
      }
    }
  }

  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    // If we are in a concurrent mode, it is the caller's responsibility
    // to clone the original ColumnFamilyMemTables so that each thread
    // has its own instance.  Otherwise, it must be guaranteed that there
    // is no concurrent access.
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        *s = Status::OK();
      } else {
        *s = Status::InvalidArgument(
            "Invalid column family specified in write batch");
      }
      return false;
    }
    if (recovering_log_number_ != 0 &&
        recovering_log_number_ < cf_mems_->GetLogNumber()) {
      // This is true only in the recovery environment
      // (recovering_log_number_ is always 0 in the non-recovery, regular
      // write code-path). If recovering_log_number_ <
      // cf_mems_->GetLogNumber(), the column family already contains updates
      // from this log. We can't apply updates twice because of
      // update-in-place or merge workloads -- ignore the update.
      *s = Status::OK();
      return false;
    }

    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }

    if (log_number_ref_ > 0) {
      cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
    }

    return true;
  }
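
  // Illustrative example (not part of the library): suppose a column family
  // was flushed while WAL 7 was live, so its stored log number is 7. When
  // recovery later replays WAL 5, recovering_log_number_ (5) is below the
  // CF's log number (7), meaning every update in that WAL is already
  // persisted in SST files for this CF, and the replay is skipped above.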

  Status PutCFImpl(uint32_t column_family_id, const Slice& key,
                   const Slice& value, ValueType value_type,
                   const ProtectionInfoKVOS64* kv_prot_info) {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key,
                                     value);
      // else insert the values into the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for upcoming
        // rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
                                             key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    // inplace_update_support is inconsistent with snapshots, and therefore
    // with any kind of transactions, including the ones that use
    // seq_per_batch
    assert(!seq_per_batch_ || !moptions->inplace_update_support);
    if (!moptions->inplace_update_support) {
      ret_status =
          mem->Add(sequence_, value_type, key, value, kv_prot_info,
                   concurrent_memtable_writes_, get_post_process_info(mem),
                   hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    } else if (moptions->inplace_callback == nullptr) {
      assert(!concurrent_memtable_writes_);
      ret_status = mem->Update(sequence_, key, value, kv_prot_info);
    } else {
      assert(!concurrent_memtable_writes_);
      ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info);
      if (ret_status.IsNotFound()) {
        // key not found in memtable. Do sst get, update, add
        SnapshotImpl read_from_snapshot;
        read_from_snapshot.number_ = sequence_;
        ReadOptions ropts;
        // it's going to be overwritten for sure, so no point caching the data
        // block containing the old version
        ropts.fill_cache = false;
        ropts.snapshot = &read_from_snapshot;

        std::string prev_value;
        std::string merged_value;

        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
        Status get_status = Status::NotSupported();
        if (db_ != nullptr && recovering_log_number_ == 0) {
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          get_status = db_->Get(ropts, cf_handle, key, &prev_value);
        }
        // Intentionally overwrites the `NotFound` in `ret_status`.
        if (!get_status.ok() && !get_status.IsNotFound()) {
          ret_status = get_status;
        } else {
          ret_status = Status::OK();
        }
        if (ret_status.ok()) {
          UpdateStatus update_status;
          char* prev_buffer = const_cast<char*>(prev_value.c_str());
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
          if (get_status.ok()) {
            update_status = moptions->inplace_callback(prev_buffer, &prev_size,
                                                       value, &merged_value);
          } else {
            update_status = moptions->inplace_callback(
                nullptr /* existing_value */, nullptr /* existing_value_size */,
                value, &merged_value);
          }
          if (update_status == UpdateStatus::UPDATED_INPLACE) {
            assert(get_status.ok());
            if (kv_prot_info != nullptr) {
              ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
              updated_kv_prot_info.UpdateV(value,
                                           Slice(prev_buffer, prev_size));
              // prev_value is updated in-place with the final value.
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(prev_buffer, prev_size),
                                    &updated_kv_prot_info);
            } else {
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(prev_buffer, prev_size),
                                    nullptr /* kv_prot_info */);
            }
            if (ret_status.ok()) {
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          } else if (update_status == UpdateStatus::UPDATED) {
            if (kv_prot_info != nullptr) {
              ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
              updated_kv_prot_info.UpdateV(value, merged_value);
              // merged_value contains the final value.
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(merged_value), &updated_kv_prot_info);
            } else {
              // merged_value contains the final value.
              ret_status =
                  mem->Add(sequence_, value_type, key, Slice(merged_value),
                           nullptr /* kv_prot_info */);
            }
            if (ret_status.ok()) {
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          }
        }
      }
    }
    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
                                           key, value);
    }
    return ret_status;
  }
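
  // Illustrative summary (not part of the library) of the insert paths taken
  // by PutCFImpl above:
  //
  //   inplace_update_support == false      -> mem->Add() (plain append)
  //   inplace_update_support, no callback  -> mem->Update() (overwrite slot)
  //   inplace_update_support + callback    -> mem->UpdateCallback(), falling
  //       back to Get() + inplace_callback() + mem->Add() when the key is
  //       not found in the memtable.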

  Status PutCF(uint32_t column_family_id, const Slice& key,
               const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    if (kv_prot_info != nullptr) {
      // Memtable needs seqno, doesn't need CF ID
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      return PutCFImpl(column_family_id, key, value, kTypeValue,
                       &mem_kv_prot_info);
    }
    return PutCFImpl(column_family_id, key, value, kTypeValue,
                     nullptr /* kv_prot_info */);
  }

  Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
                    const Slice& value, ValueType delete_type,
                    const ProtectionInfoKVOS64* kv_prot_info) {
    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    ret_status =
        mem->Add(sequence_, delete_type, key, value, kv_prot_info,
                 concurrent_memtable_writes_, get_post_process_info(mem),
                 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    return ret_status;
  }

  Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
      // else insert the values into the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for upcoming
        // rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status =
            WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }

    ColumnFamilyData* cfd = cf_mems_->current();
    assert(!cfd || cfd->user_comparator());
    const size_t ts_sz = (cfd && cfd->user_comparator())
                             ? cfd->user_comparator()->timestamp_size()
                             : 0;
    const ValueType delete_type =
        (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      mem_kv_prot_info.UpdateO(kTypeDeletion, delete_type);
      ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
                              &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
                              nullptr /* kv_prot_info */);
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status =
          WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
    }
    return ret_status;
  }

  Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
                                              key);
      // else insert the values into the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for upcoming
        // rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                                      column_family_id, key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = DeleteImpl(column_family_id, key, Slice(),
                              kTypeSingleDeletion, &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, key, Slice(),
                              kTypeSingleDeletion, nullptr /* kv_prot_info */);
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                                    column_family_id, key);
    }
    return ret_status;
  }

  Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
                       const Slice& end_key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                             begin_key, end_key);
      // else insert the values into the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for upcoming
        // rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status = WriteBatchInternal::DeleteRange(
            rebuilding_trx_, column_family_id, begin_key, end_key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, begin_key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    if (db_ != nullptr) {
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      auto* cfd =
          static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd();
      if (!cfd->is_delete_range_supported()) {
        // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
        ret_status.PermitUncheckedError();
        return Status::NotSupported(
            std::string("DeleteRange not supported for table type ") +
            cfd->ioptions()->table_factory->Name() + " in CF " +
            cfd->GetName());
      }
      int cmp = cfd->user_comparator()->Compare(begin_key, end_key);
      if (cmp > 0) {
        // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
        ret_status.PermitUncheckedError();
        // The range is empty and its endpoints appear mistaken. Don't bother
        // applying it to the DB, and return an error to the user.
        return Status::InvalidArgument("end key comes before start key");
      } else if (cmp == 0) {
        // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
        ret_status.PermitUncheckedError();
        // It's an empty range. Don't bother applying it to the DB.
        return Status::OK();
      }
    }

    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = DeleteImpl(column_family_id, begin_key, end_key,
                              kTypeRangeDeletion, &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, begin_key, end_key,
                              kTypeRangeDeletion, nullptr /* kv_prot_info */);
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status = WriteBatchInternal::DeleteRange(
          rebuilding_trx_, column_family_id, begin_key, end_key);
    }
    return ret_status;
  }
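
  // Illustrative example (not part of the library): the checks above mean
  // DeleteRange("b", "a") returns InvalidArgument (end before start) and
  // DeleteRange("a", "a") is accepted but silently dropped (empty range);
  // only begin < end actually inserts a kTypeRangeDeletion entry.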

  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
                                       value);
      // else insert the values into the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for upcoming
        // rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
                                               column_family_id, key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    if (moptions->merge_operator == nullptr) {
      return Status::InvalidArgument(
          "Merge requires `ColumnFamilyOptions::merge_operator != nullptr`");
    }
    bool perform_merge = false;
    assert(!concurrent_memtable_writes_ ||
           moptions->max_successive_merges == 0);

    // If we pass the DB through and options.max_successive_merges is hit
    // during recovery, a Get() will be issued, which will try to acquire the
    // DB mutex and cause a deadlock, as the DB mutex is already held.
    // So we disable merge in recovery.
    if (moptions->max_successive_merges > 0 && db_ != nullptr &&
        recovering_log_number_ == 0) {
      assert(!concurrent_memtable_writes_);
      LookupKey lkey(key, sequence_);

      // Count the number of successive merges at the head
      // of the key in the memtable
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }

    if (perform_merge) {
      // 1) Get the existing value
      std::string get_value;

      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;

      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      Status get_status = db_->Get(read_options, cf_handle, key, &get_value);
      if (!get_status.ok()) {
        // Failed to read a key we know exists. Store the delta in memtable.
        perform_merge = false;
      } else {
        Slice get_value_slice = Slice(get_value);

        // 2) Apply this merge
        auto merge_operator = moptions->merge_operator;
        assert(merge_operator);

        std::string new_value;
        Status merge_status = MergeHelper::TimedFullMerge(
            merge_operator, key, &get_value_slice, {value}, &new_value,
            moptions->info_log, moptions->statistics,
            SystemClock::Default().get());

        if (!merge_status.ok()) {
          // Failed to merge! Store the delta in memtable.
          perform_merge = false;
        } else {
          // 3) Add value to memtable
          assert(!concurrent_memtable_writes_);
          if (kv_prot_info != nullptr) {
            auto merged_kv_prot_info =
                kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
            merged_kv_prot_info.UpdateV(value, new_value);
            merged_kv_prot_info.UpdateO(kTypeMerge, kTypeValue);
            ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
                                  &merged_kv_prot_info);
          } else {
            ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
                                  nullptr /* kv_prot_info */);
          }
        }
      }
    }

    if (!perform_merge) {
      assert(ret_status.ok());
      // Add merge operand to memtable
      if (kv_prot_info != nullptr) {
        auto mem_kv_prot_info =
            kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
        ret_status =
            mem->Add(sequence_, kTypeMerge, key, value, &mem_kv_prot_info,
                     concurrent_memtable_writes_, get_post_process_info(mem));
      } else {
        ret_status = mem->Add(
            sequence_, kTypeMerge, key, value, nullptr /* kv_prot_info */,
            concurrent_memtable_writes_, get_post_process_info(mem));
      }
    }

    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
                                             key, value);
    }
    return ret_status;
  }
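
  // Illustrative example (not part of the library): with
  // max_successive_merges = 2, a third consecutive Merge() on the same key
  // triggers the eager path above: the current value is read back, the merge
  // operator is applied, and the result is stored as a kTypeValue instead of
  // queueing yet another kTypeMerge operand.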

  Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
                        const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    if (kv_prot_info != nullptr) {
      // Memtable needs seqno, doesn't need CF ID
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      // Same as PutCF except for value type.
      return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
                       &mem_kv_prot_info);
    } else {
      return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
                       nullptr /* kv_prot_info */);
    }
  }

  void CheckMemtableFull() {
    if (flush_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd != nullptr);
      if (cfd->mem()->ShouldScheduleFlush() &&
          cfd->mem()->MarkFlushScheduled()) {
        // MarkFlushScheduled only returns true if we are the one that
        // should take action, so no need to dedup further
        flush_scheduler_->ScheduleWork(cfd);
      }
    }
    // check if memtable_list size exceeds max_write_buffer_size_to_maintain
    if (trim_history_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();

      assert(cfd);
      assert(cfd->ioptions());

      const size_t size_to_maintain = static_cast<size_t>(
          cfd->ioptions()->max_write_buffer_size_to_maintain);

      if (size_to_maintain > 0) {
        MemTableList* const imm = cfd->imm();
        assert(imm);

        if (imm->HasHistory()) {
          const MemTable* const mem = cfd->mem();
          assert(mem);

          if (mem->ApproximateMemoryUsageFast() +
                      imm->ApproximateMemoryUsageExcludingLast() >=
                  size_to_maintain &&
              imm->MarkTrimHistoryNeeded()) {
            trim_history_scheduler_->ScheduleWork(cfd);
          }
        }
      }
    }
  }
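
  // Illustrative example (not part of the library): with
  // max_write_buffer_size_to_maintain = 64MB, if the active memtable holds
  // ~40MB and the immutable list (excluding its most recent entry) holds
  // ~30MB, the 70MB total crosses the threshold and a history trim is
  // scheduled for the column family.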

  // The write batch handler calls MarkBeginPrepare with unprepare set to true
  // if it encounters the kTypeBeginUnprepareXID marker.
  Status MarkBeginPrepare(bool unprepare) override {
    assert(rebuilding_trx_ == nullptr);
    assert(db_);

    if (recovering_log_number_ != 0) {
      // during recovery we rebuild a hollow transaction
      // from all encountered prepare sections of the wal
      if (db_->allow_2pc() == false) {
        return Status::NotSupported(
            "WAL contains prepared transactions. Open with "
            "TransactionDB::Open().");
      }

      // we are now iterating through a prepared section
      rebuilding_trx_ = new WriteBatch();
      rebuilding_trx_seq_ = sequence_;
      // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
      // unprepared_batch_ should be false because it is false by default, and
      // gets reset to false in MarkEndPrepare.
      assert(!unprepared_batch_);
      unprepared_batch_ = unprepare;

      if (has_valid_writes_ != nullptr) {
        *has_valid_writes_ = true;
      }
    }

    return Status::OK();
  }

  Status MarkEndPrepare(const Slice& name) override {
    assert(db_);
    assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));

    if (recovering_log_number_ != 0) {
      assert(db_->allow_2pc());
      size_t batch_cnt =
          write_after_commit_
              ? 0  // 0 will disable further checks
              : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
      db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
                                      rebuilding_trx_, rebuilding_trx_seq_,
                                      batch_cnt, unprepared_batch_);
      unprepared_batch_ = false;
      rebuilding_trx_ = nullptr;
    } else {
      assert(rebuilding_trx_ == nullptr);
    }
    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return Status::OK();
  }

  Status MarkNoop(bool empty_batch) override {
    // A hack in pessimistic transactions can result in a noop at the start
    // of the write batch; it should be ignored.
    if (!empty_batch) {
      // In the absence of Prepare markers, a kTypeNoop tag indicates the end
      // of a batch. This happens when a write batch commits while skipping
      // the prepare phase.
      const bool batch_boundary = true;
      MaybeAdvanceSeq(batch_boundary);
    }
    return Status::OK();
  }
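
  // Illustrative example (not part of the library): in seq_per_batch mode a
  // batch committed without a prepare phase is terminated by a kTypeNoop
  // record; when that record is not the leading entry of the batch,
  // MarkNoop(false) treats it as a batch boundary and advances the sequence.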

  Status MarkCommit(const Slice& name) override {
    assert(db_);

    Status s;

    if (recovering_log_number_ != 0) {
      // in recovery, when we encounter a commit marker
      // we look up this transaction in our set of rebuilt transactions
      // and commit.
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // the log containing the prepared section may have
      // been released in the last incarnation because the
      // data was flushed to L0
      if (trx != nullptr) {
        // at this point individual CF lognumbers will prevent
        // duplicate re-insertion of values.
        assert(log_number_ref_ == 0);
        if (write_after_commit_) {
          // write_after_commit_ can only have one batch in trx.
          assert(trx->batches_.size() == 1);
          const auto& batch_info = trx->batches_.begin()->second;
          // all inserts must reference this trx log number
          log_number_ref_ = batch_info.log_number_;
          s = batch_info.batch_->Iterate(this);
          log_number_ref_ = 0;
        }
        // else the values are already inserted before the commit

        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_ != nullptr) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // When writes are not delayed until commit, there is no disconnect
      // between a memtable write and the WAL that supports it, so the commit
      // does not need to reference any particular log.
      assert(!write_after_commit_ || log_number_ref_ > 0);
    }
    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return s;
  }

  Status MarkRollback(const Slice& name) override {
    assert(db_);

    if (recovering_log_number_ != 0) {
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // the log containing the transaction's prep section
      // may have been released in the previous incarnation
      // because we knew it had been rolled back
      if (trx != nullptr) {
        db_->DeleteRecoveredTransaction(name.ToString());
      }
    } else {
      // in non-recovery mode we simply ignore this tag
    }

    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return Status::OK();
  }

 private:
  MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
    if (!concurrent_memtable_writes_) {
      // No need to batch counters locally if we don't use concurrent mode.
      return nullptr;
    }
    return &GetPostMap()[mem];
  }
};

// This function can only be called in these conditions:
// 1) During Recovery()
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables have been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache
Status WriteBatchInternal::InsertInto(
    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, recovery_log_number, db,
      concurrent_memtable_writes, nullptr /* prot_info */,
      nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
  for (auto w : write_group) {
    if (w->CallbackFailed()) {
      continue;
    }
    w->sequence = inserter.sequence();
    if (!w->ShouldWriteToMemtable()) {
      // In seq_per_batch_ mode this advances the seq by one.
      inserter.MaybeAdvanceSeq(true);
      continue;
    }
    SetSequence(w->batch, inserter.sequence());
    inserter.set_log_number_ref(w->log_ref);
    inserter.set_prot_info(w->batch->prot_info_.get());
    w->status = w->batch->Iterate(&inserter);
    if (!w->status.ok()) {
      return w->status;
    }
    assert(!seq_per_batch || w->batch_cnt != 0);
    assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
  }
  return Status::OK();
}
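
// Illustrative example (not part of the library): for a write group of three
// writers starting at sequence 100 in the default seq-per-key mode, where
// writer 2 has a failed callback, the loop above assigns seq 100 to writer 1
// (advancing by its key count, say 2), skips writer 2 without consuming a
// sequence number, and assigns seq 102 to writer 3.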

Status WriteBatchInternal::InsertInto(
    WriteThread::Writer* writer, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
    bool batch_per_txn, bool hint_per_batch) {
#ifdef NDEBUG
  (void)batch_cnt;
#endif
  assert(writer->ShouldWriteToMemtable());
  MemTableInserter inserter(sequence, memtables, flush_scheduler,
                            trim_history_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, nullptr /* prot_info */,
                            nullptr /*has_valid_writes*/, seq_per_batch,
                            batch_per_txn, hint_per_batch);
  SetSequence(writer->batch, sequence);
  inserter.set_log_number_ref(writer->log_ref);
  inserter.set_prot_info(writer->batch->prot_info_.get());
  Status s = writer->batch->Iterate(&inserter);
  assert(!seq_per_batch || batch_cnt != 0);
  assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}

Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, SequenceNumber* next_seq,
    bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
                            trim_history_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, batch->prot_info_.get(),
                            has_valid_writes, seq_per_batch, batch_per_txn);
  Status s = batch->Iterate(&inserter);
  if (next_seq != nullptr) {
    *next_seq = inserter.sequence();
  }
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}

Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= WriteBatchInternal::kHeader);
  assert(b->prot_info_ == nullptr);
  b->rep_.assign(contents.data(), contents.size());
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
                                  const bool wal_only) {
  assert(dst->Count() == 0 ||
         (dst->prot_info_ == nullptr) == (src->prot_info_ == nullptr));
  size_t src_len;
  int src_count;
  uint32_t src_flags;

  const SavePoint& batch_end = src->GetWalTerminationPoint();

  if (wal_only && !batch_end.is_cleared()) {
    src_len = batch_end.size - WriteBatchInternal::kHeader;
    src_count = batch_end.count;
    src_flags = batch_end.content_flags;
  } else {
    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
    src_count = Count(src);
    src_flags = src->content_flags_.load(std::memory_order_relaxed);
  }

  if (dst->prot_info_ != nullptr) {
    std::copy(src->prot_info_->entries_.begin(),
              src->prot_info_->entries_.begin() + src_count,
              std::back_inserter(dst->prot_info_->entries_));
  } else if (src->prot_info_ != nullptr) {
    dst->prot_info_.reset(new WriteBatch::ProtectionInfo(*src->prot_info_));
  }
  SetCount(dst, Count(dst) + src_count);
  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
  dst->content_flags_.store(
      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
      std::memory_order_relaxed);
  return Status::OK();
}

size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
                                            size_t rightByteSize) {
  if (leftByteSize == 0 || rightByteSize == 0) {
    return leftByteSize + rightByteSize;
  } else {
    return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
  }
}
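
// Illustrative example (not part of the library): kHeader is 12 bytes (an
// 8-byte sequence plus a 4-byte count). Appending a 50-byte batch to a
// 100-byte batch therefore yields 100 + 50 - 12 = 138 bytes, since the
// source batch's header is dropped when its records are spliced onto the
// destination.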

}  // namespace ROCKSDB_NAMESPACE