//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeDeletion varstring
//    kTypeSingleDeletion varstring
//    kTypeRangeDeletion varstring varstring
//    kTypeMerge varstring varstring
//    kTypeColumnFamilyValue varint32 varstring varstring
//    kTypeColumnFamilyDeletion varint32 varstring
//    kTypeColumnFamilySingleDeletion varint32 varstring
//    kTypeColumnFamilyRangeDeletion varint32 varstring varstring
//    kTypeColumnFamilyMerge varint32 varstring varstring
//    kTypeBeginPrepareXID varstring
//    kTypeEndPrepareXID
//    kTypeCommitXID varstring
//    kTypeRollbackXID varstring
//    kTypeBeginPersistedPrepareXID varstring
//    kTypeBeginUnprepareXID varstring
//    kTypeNoop
// varstring :=
//    len: varint32
//    data: uint8[len]
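//
// For illustration (not part of the spec above): a batch at sequence 0
// holding a single Put("k", "v") in the default column family encodes as
//    00 00 00 00 00 00 00 00   sequence: fixed64 (little-endian)
//    01 00 00 00               count: fixed32 (little-endian)
//    01                        kTypeValue tag
//    01 6B                     varstring key ("k")
//    01 76                     varstring value ("v")
// assuming kTypeValue == 0x1 as defined in db/dbformat.h.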

#include "rocksdb/write_batch.h"

#include <map>
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/trim_history_scheduler.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "rocksdb/merge_operator.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"
#include "util/util.h"

namespace ROCKSDB_NAMESPACE {

// anon namespace for file-local types
namespace {

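// DEFERRED indicates that the remaining flags have not yet been computed from
// rep_; WriteBatch::ComputeContentFlags() resolves it lazily via Iterate().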
enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,
  HAS_PUT = 1 << 1,
  HAS_DELETE = 1 << 2,
  HAS_SINGLE_DELETE = 1 << 3,
  HAS_MERGE = 1 << 4,
  HAS_BEGIN_PREPARE = 1 << 5,
  HAS_END_PREPARE = 1 << 6,
  HAS_COMMIT = 1 << 7,
  HAS_ROLLBACK = 1 << 8,
  HAS_DELETE_RANGE = 1 << 9,
  HAS_BLOB_INDEX = 1 << 10,
  HAS_BEGIN_UNPREPARE = 1 << 11,
};

struct BatchContentClassifier : public WriteBatch::Handler {
  uint32_t content_flags = 0;

  Status PutCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_PUT;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE_RANGE;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_MERGE;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_BLOB_INDEX;
    return Status::OK();
  }

  Status MarkBeginPrepare(bool unprepare) override {
    content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
    if (unprepare) {
      content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
    }
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    content_flags |= ContentFlags::HAS_END_PREPARE;
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    content_flags |= ContentFlags::HAS_ROLLBACK;
    return Status::OK();
  }
};

class TimestampAssigner : public WriteBatch::Handler {
 public:
  explicit TimestampAssigner(const Slice& ts)
      : timestamp_(ts), timestamps_(kEmptyTimestampList) {}
  explicit TimestampAssigner(const std::vector<Slice>& ts_list)
      : timestamps_(ts_list) {
    SanityCheck();
  }
  ~TimestampAssigner() override {}

  Status PutCF(uint32_t, const Slice& key, const Slice&) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice& key) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice& key) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice& begin_key,
                       const Slice& end_key) override {
    AssignTimestamp(begin_key);
    AssignTimestamp(end_key);
    ++idx_;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    // TODO (yanqin): support blob db in the future.
    return Status::OK();
  }

  Status MarkBeginPrepare(bool) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

 private:
  void SanityCheck() const {
    assert(!timestamps_.empty());
#ifndef NDEBUG
    const size_t ts_sz = timestamps_[0].size();
    for (size_t i = 1; i != timestamps_.size(); ++i) {
      assert(ts_sz == timestamps_[i].size());
    }
#endif  // !NDEBUG
  }

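  // Overwrites the trailing timestamp bytes of `key` in place. This relies on
  // WriteBatchInternal::Put() and friends having appended timestamp_size_
  // bytes of zero padding after the user key (see below).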
  void AssignTimestamp(const Slice& key) {
    assert(timestamps_.empty() || idx_ < timestamps_.size());
    const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
    size_t ts_sz = ts.size();
    char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
    memcpy(ptr, ts.data(), ts_sz);
  }

  static const std::vector<Slice> kEmptyTimestampList;
  const Slice timestamp_;
  const std::vector<Slice>& timestamps_;
  size_t idx_ = 0;

  // No copy or move.
  TimestampAssigner(const TimestampAssigner&) = delete;
  TimestampAssigner(TimestampAssigner&&) = delete;
  TimestampAssigner& operator=(const TimestampAssigner&) = delete;
  TimestampAssigner& operator=(TimestampAssigner&&) = delete;
};
const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;

}  // anon namespace

struct SavePoints {
  std::stack<SavePoint, autovector<SavePoint>> stack;
};

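// rep_ always begins with the WriteBatchInternal::kHeader (12-byte) header:
// an 8-byte sequence number followed by a 4-byte count.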
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) {
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}

WriteBatch::WriteBatch(const std::string& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(rep),
      timestamp_size_(0) {}

WriteBatch::WriteBatch(std::string&& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(std::move(rep)),
      timestamp_size_(0) {}

WriteBatch::WriteBatch(const WriteBatch& src)
    : wal_term_point_(src.wal_term_point_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(src.rep_),
      timestamp_size_(src.timestamp_size_) {
  if (src.save_points_ != nullptr) {
    save_points_.reset(new SavePoints());
    save_points_->stack = src.save_points_->stack;
  }
}

WriteBatch::WriteBatch(WriteBatch&& src) noexcept
    : save_points_(std::move(src.save_points_)),
      wal_term_point_(std::move(src.wal_term_point_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(std::move(src.rep_)),
      timestamp_size_(src.timestamp_size_) {}

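// Assignment is implemented as destroy-then-reconstruct in place, which
// reuses the copy/move constructors above instead of duplicating their
// member-by-member logic.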
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}

WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}

WriteBatch::~WriteBatch() { }

WriteBatch::Handler::~Handler() { }

void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

bool WriteBatch::Handler::Continue() {
  return true;
}

void WriteBatch::Clear() {
  rep_.clear();
  rep_.resize(WriteBatchInternal::kHeader);

  content_flags_.store(0, std::memory_order_relaxed);

  if (save_points_ != nullptr) {
    while (!save_points_->stack.empty()) {
      save_points_->stack.pop();
    }
  }

  wal_term_point_.clear();
}

uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }

uint32_t WriteBatch::ComputeContentFlags() const {
  auto rv = content_flags_.load(std::memory_order_relaxed);
  if ((rv & ContentFlags::DEFERRED) != 0) {
    BatchContentClassifier classifier;
    Iterate(&classifier);
    rv = classifier.content_flags;

    // this method is conceptually const, because it is performing a lazy
    // computation that doesn't affect the abstract state of the batch.
    // content_flags_ is marked mutable so that we can perform the
    // following assignment
    content_flags_.store(rv, std::memory_order_relaxed);
  }
  return rv;
}

void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_;
}

bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasDeleteRange() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}

bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
  assert(input != nullptr && key != nullptr);
  // Skip tag byte
  input->remove_prefix(1);

  if (cf_record) {
    // Skip column_family bytes
    uint32_t cf;
    if (!GetVarint32(input, &cf)) {
      return false;
    }
  }

  // Extract key
  return GetLengthPrefixedSlice(input, key);
}

bool WriteBatch::HasBeginPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}

bool WriteBatch::HasEndPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}

bool WriteBatch::HasCommit() const {
  return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}

bool WriteBatch::HasRollback() const {
  return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}

Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid) {
  assert(key != nullptr && value != nullptr);
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;  // default
  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      FALLTHROUGH_INTENDED;
    case kTypeValue:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      FALLTHROUGH_INTENDED;
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;
    case kTypeColumnFamilyRangeDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      FALLTHROUGH_INTENDED;
    case kTypeRangeDeletion:
      // for range delete, "key" is begin_key, "value" is end_key
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      break;
    case kTypeColumnFamilyMerge:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      FALLTHROUGH_INTENDED;
    case kTypeMerge:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;
    case kTypeColumnFamilyBlobIndex:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      FALLTHROUGH_INTENDED;
    case kTypeBlobIndex:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      break;
    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
      // This indicates that the prepared batch is also persisted in the db.
      // This is used in WritePreparedTxn
    case kTypeBeginPersistedPrepareXID:
      // This is used in WriteUnpreparedTxn
    case kTypeBeginUnprepareXID:
      break;
    case kTypeEndPrepareXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;
    case kTypeCommitXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Commit XID");
      }
      break;
    case kTypeRollbackXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Rollback XID");
      }
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}

Status WriteBatch::Iterate(Handler* handler) const {
  if (rep_.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
                                     rep_.size());
}

Status WriteBatchInternal::Iterate(const WriteBatch* wb,
                                   WriteBatch::Handler* handler, size_t begin,
                                   size_t end) {
  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
    return Status::Corruption("Invalid start/end bounds for Iterate");
  }
  assert(begin <= end);
  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
  bool whole_batch =
      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());

  Slice key, value, blob, xid;
  // Sometimes a sub-batch starts with a Noop. Such Noops must be excluded
  // from the batch boundary markers; otherwise we would mis-count the number
  // of sub-batches. We do that by checking whether the accumulated batch is
  // empty before seeing the next Noop.
  bool empty_batch = true;
  uint32_t found = 0;
  Status s;
  char tag = 0;
  uint32_t column_family = 0;  // default
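  // A handler may return Status::TryAgain to have the current record
  // re-dispatched once; two consecutive TryAgain results are treated as a
  // bug or corruption (see below).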
  bool last_was_try_again = false;
  bool handler_continue = true;
  while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
    handler_continue = handler->Continue();
    if (!handler_continue) {
      break;
    }

    if (LIKELY(!s.IsTryAgain())) {
      last_was_try_again = false;
      tag = 0;
      column_family = 0;  // default

      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid);
      if (!s.ok()) {
        return s;
      }
    } else {
      assert(s.IsTryAgain());
      assert(!last_was_try_again);  // to detect infinite loop bugs
      if (UNLIKELY(last_was_try_again)) {
        return Status::Corruption(
            "two consecutive TryAgain in WriteBatch handler; this is either a "
            "software bug or data corruption.");
      }
      last_was_try_again = true;
      s = Status::OK();
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        s = handler->DeleteRangeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
        s = handler->PutBlobIndexCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          found++;
        }
        break;
      case kTypeLogData:
        handler->LogData(blob);
        // A batch might have nothing but LogData. It is still a batch.
        empty_batch = false;
        break;
      case kTypeBeginPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        handler->MarkBeginPrepare();
        empty_batch = false;
        if (!handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_after_commit_ is disabled (in "
              "WritePrepared/WriteUnprepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_before_prepare_ is enabled "
              "(in WriteUnprepared mode). If it is not due to corruption, the "
              "WAL must be emptied before changing the WritePolicy.");
        }
        break;
      case kTypeBeginPersistedPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        handler->MarkBeginPrepare();
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
              "is enabled (in default WriteCommitted mode). If it is not due "
              "to corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeBeginUnprepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
        handler->MarkBeginPrepare(true /* unprepared */);
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
              "default WriteCommitted mode). If it is not due to corruption, "
              "the WAL must be emptied before changing the WritePolicy.");
        }
        if (!handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
              "(in WriteCommitted/WritePrepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeEndPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        handler->MarkEndPrepare(xid);
        empty_batch = true;
        break;
      case kTypeCommitXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        handler->MarkCommit(xid);
        empty_batch = true;
        break;
      case kTypeRollbackXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        handler->MarkRollback(xid);
        empty_batch = true;
        break;
      case kTypeNoop:
        handler->MarkNoop(empty_batch);
        empty_batch = true;
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (handler_continue && whole_batch &&
      found != WriteBatchInternal::Count(wb)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  return b->is_latest_persistent_state_;
}

void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
  b->is_latest_persistent_state_ = true;
}

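// Header accessors: bytes 0..7 of rep_ hold the little-endian sequence
// number and bytes 8..11 hold the little-endian record count.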
uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  EncodeFixed32(&b->rep_[8], n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  return WriteBatchInternal::kHeader;
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSlice(&b->rep_, key);
  } else {
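    // Reserve timestamp_size_ bytes of zero padding after the user key;
    // TimestampAssigner::AssignTimestamp() overwrites them in place later.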
    PutVarint32(&b->rep_,
                static_cast<uint32_t>(key.size() + b->timestamp_size_));
    b->rep_.append(key.data(), key.size());
    b->rep_.append(b->timestamp_size_, '\0');
  }
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}

Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
                                                 const SliceParts& value) {
  size_t total_key_bytes = 0;
  for (int i = 0; i < key.num_parts; ++i) {
    total_key_bytes += key.parts[i].size();
  }
  if (total_key_bytes >= size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }

  size_t total_value_bytes = 0;
  for (int i = 0; i < value.num_parts; ++i) {
    total_value_bytes += value.parts[i].size();
  }
  if (total_value_bytes >= size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }
  return Status::OK();
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key, const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSliceParts(&b->rep_, key);
  } else {
    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
  }
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                       const SliceParts& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}

Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  b->rep_.push_back(static_cast<char>(kTypeNoop));
  return Status::OK();
}

Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
                                          bool write_after_commit,
                                          bool unprepared_batch) {
  // a manually constructed batch can only contain one prepare section
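  // rep_[12] is the first byte past the 12-byte header, i.e. the tag of the
  // first record, expected to be the kTypeNoop placeholder (see InsertNoop()).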
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    while (!b->save_points_->stack.empty()) {
      b->save_points_->stack.pop();
    }
  }

  // rewrite noop as begin marker
  b->rep_[12] = static_cast<char>(
      write_after_commit ? kTypeBeginPrepareXID
                         : (unprepared_batch ? kTypeBeginUnprepareXID
                                             : kTypeBeginPersistedPrepareXID));
  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  if (unprepared_batch) {
    b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                                ContentFlags::HAS_BEGIN_UNPREPARE,
                            std::memory_order_relaxed);
  }
  return Status::OK();
}

Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeCommitXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_ROLLBACK,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
                          const SliceParts& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const SliceParts& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const Slice& begin_key,
                                       const Slice& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, begin_key);
  PutLengthPrefixedSlice(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const SliceParts& begin_key,
                               const SliceParts& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
                         const SliceParts& key, const SliceParts& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}

Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key, const Slice& value) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BLOB_INDEX,
                          std::memory_order_relaxed);
  return save.commit();
}

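// Note that LogData records do not increment the batch count and are never
// applied to memtables; Iterate() only forwards them to Handler::LogData().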
Status WriteBatch::PutLogData(const Slice& blob) {
  LocalSavePoint save(this);
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
  return save.commit();
}

void WriteBatch::SetSavePoint() {
  if (save_points_ == nullptr) {
    save_points_.reset(new SavePoints());
  }
  // Record length and count of current batch of writes.
  save_points_->stack.push(SavePoint(
      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
}

Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  SavePoint savepoint = save_points_->stack.top();
  save_points_->stack.pop();

  assert(savepoint.size <= rep_.size());
  assert(static_cast<uint32_t>(savepoint.count) <= Count());

  if (savepoint.size == rep_.size()) {
    // No changes to rollback
  } else if (savepoint.size == 0) {
    // Rollback everything
    Clear();
  } else {
    rep_.resize(savepoint.size);
    WriteBatchInternal::SetCount(this, savepoint.count);
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  }

  return Status::OK();
}

Status WriteBatch::PopSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  save_points_->stack.pop();

  return Status::OK();
}

Status WriteBatch::AssignTimestamp(const Slice& ts) {
  TimestampAssigner ts_assigner(ts);
  return Iterate(&ts_assigner);
}

Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
  TimestampAssigner ts_assigner(ts_list);
  return Iterate(&ts_assigner);
}

class MemTableInserter : public WriteBatch::Handler {

  SequenceNumber sequence_;
  ColumnFamilyMemTables* const cf_mems_;
  FlushScheduler* const flush_scheduler_;
  TrimHistoryScheduler* const trim_history_scheduler_;
  const bool ignore_missing_column_families_;
  const uint64_t recovering_log_number_;
  // log number that all Memtables inserted into should reference
  uint64_t log_number_ref_;
  DBImpl* db_;
  const bool concurrent_memtable_writes_;
  bool post_info_created_;

  bool* has_valid_writes_;
  // On some (!) platforms just default-constructing a map is too expensive in
  // the Write() path, as it causes memory allocations even when the map is
  // never used. Make creation optional, but avoid the extra allocation of a
  // std::unique_ptr by using placement new into aligned storage.
  using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
  using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
  PostMapType mem_post_info_map_;
  // current recovered transaction we are rebuilding (recovery)
  WriteBatch* rebuilding_trx_;
  SequenceNumber rebuilding_trx_seq_;
  // Increase the seq number once per write batch; otherwise increase it once
  // per key.
  bool seq_per_batch_;
  // Whether the memtable write will be done only after the commit
  bool write_after_commit_;
  // Whether memtable write can be done before prepare
  bool write_before_prepare_;
  // Whether this batch was unprepared or not
  bool unprepared_batch_;
  using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
  DupDetector duplicate_detector_;
  bool dup_dectector_on_;

  bool hint_per_batch_;
  bool hint_created_;
  // Hints for this batch
  using HintMap = std::unordered_map<MemTable*, void*>;
  using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
  HintMapType hint_;

  HintMap& GetHintMap() {
    assert(hint_per_batch_);
    if (!hint_created_) {
      new (&hint_) HintMap();
      hint_created_ = true;
    }
    return *reinterpret_cast<HintMap*>(&hint_);
  }

  MemPostInfoMap& GetPostMap() {
    assert(concurrent_memtable_writes_);
    if (!post_info_created_) {
      new (&mem_post_info_map_) MemPostInfoMap();
      post_info_created_ = true;
    }
    return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
  }

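  // Duplicate detection is only needed when the whole batch shares one seq
  // (hence the assert on !write_after_commit_ below); with one seq per key,
  // duplicates are already disambiguated by distinct sequence numbers.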
  bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
    assert(!write_after_commit_);
    assert(rebuilding_trx_ != nullptr);
    if (!dup_dectector_on_) {
      new (&duplicate_detector_) DuplicateDetector(db_);
      dup_dectector_on_ = true;
    }
    return reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
        ->IsDuplicateKeySeq(column_family_id, key, sequence_);
  }

 protected:
  bool WriteBeforePrepare() const override { return write_before_prepare_; }
  bool WriteAfterCommit() const override { return write_after_commit_; }

 public:
  // cf_mems should not be shared with concurrent inserters
  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   TrimHistoryScheduler* trim_history_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   bool* has_valid_writes = nullptr, bool seq_per_batch = false,
                   bool batch_per_txn = true, bool hint_per_batch = false)
      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        trim_history_scheduler_(trim_history_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(static_cast_with_check<DBImpl, DB>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr),
        rebuilding_trx_seq_(0),
        seq_per_batch_(seq_per_batch),
        // Write after commit currently uses one seq per key (instead of per
        // batch). So seq_per_batch being false indicates write_after_commit
        // approach.
        write_after_commit_(!seq_per_batch),
        // WriteUnprepared can write WriteBatches per transaction, so
        // batch_per_txn being false indicates write_before_prepare.
        write_before_prepare_(!batch_per_txn),
        unprepared_batch_(false),
        duplicate_detector_(),
        dup_dectector_on_(false),
        hint_per_batch_(hint_per_batch),
        hint_created_(false) {
    assert(cf_mems_);
  }

  ~MemTableInserter() override {
    if (dup_dectector_on_) {
      reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
          ->~DuplicateDetector();
    }
    if (post_info_created_) {
      reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_)
          ->~MemPostInfoMap();
    }
    if (hint_created_) {
      for (auto iter : GetHintMap()) {
        delete[] reinterpret_cast<char*>(iter.second);
      }
      reinterpret_cast<HintMap*>(&hint_)->~HintMap();
    }
    delete rebuilding_trx_;
  }

  MemTableInserter(const MemTableInserter&) = delete;
  MemTableInserter& operator=(const MemTableInserter&) = delete;

  // The batch seq is regularly restarted; in normal mode it is set when
  // MemTableInserter is constructed in the write thread, and in recovery mode
  // it is set when a batch, which is tagged with seq, is read from the WAL.
  // Within a sequenced batch, which could be a merge of multiple batches, we
  // have two policies to advance the seq: i) seq_per_key (default) and ii)
  // seq_per_batch. To implement the latter we need to mark the boundary
  // between the individual batches: 1) use the terminating markers
  // (kTypeEndPrepareXID, kTypeCommitXID, kTypeRollbackXID) to indicate the
  // boundary, and 2) terminate a batch with kTypeNoop in the absence of a
  // natural boundary marker.
MaybeAdvanceSeq(bool batch_boundry=false)1337   void MaybeAdvanceSeq(bool batch_boundry = false) {
1338     if (batch_boundry == seq_per_batch_) {
1339       sequence_++;
1340     }
1341   }
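
  // Illustrative numbers for MaybeAdvanceSeq (hypothetical batch contents):
  // take a sequenced batch holding sub-batches {Put(a), Put(b)} and {Put(c)},
  // starting at seq 10.
  //   seq_per_key   (seq_per_batch_ == false): Put(a) -> 10, Put(b) -> 11,
  //     Put(c) -> 12; the boundary markers do not advance the seq.
  //   seq_per_batch (seq_per_batch_ == true): Put(a) and Put(b) both get 10;
  //     the boundary advances to 11; Put(c) gets 11; the final boundary
  //     advances to 12.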

  void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }

  SequenceNumber sequence() const { return sequence_; }

  void PostProcess() {
    assert(concurrent_memtable_writes_);
    // If post info was not created there is nothing
    // to process and no need to create it on demand.
    if (post_info_created_) {
      for (auto& pair : GetPostMap()) {
        pair.first->BatchPostProcess(pair.second);
      }
    }
  }

  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    // If we are in a concurrent mode, it is the caller's responsibility
    // to clone the original ColumnFamilyMemTables so that each thread
    // has its own instance.  Otherwise, it must be guaranteed that there
    // is no concurrent access.
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        *s = Status::OK();
      } else {
        *s = Status::InvalidArgument(
            "Invalid column family specified in write batch");
      }
      return false;
    }
    if (recovering_log_number_ != 0 &&
        recovering_log_number_ < cf_mems_->GetLogNumber()) {
      // This can only happen in the recovery environment
      // (recovering_log_number_ is always 0 in the non-recovery, regular write
      // code-path). recovering_log_number_ < cf_mems_->GetLogNumber() means
      // that the column family already contains updates from this log. We
      // can't apply updates twice because of update-in-place or merge
      // workloads -- ignore the update.
      *s = Status::OK();
      return false;
    }

    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }

    if (log_number_ref_ > 0) {
      cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
    }

    return true;
  }
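
  // Example for the recovery check in SeekToColumnFamily above (hypothetical
  // numbers): suppose this CF was flushed through WAL 7, so
  // cf_mems_->GetLogNumber() returns 8. While replaying WAL 5
  // (recovering_log_number_ == 5), 5 < 8 holds and every update for this CF
  // is skipped with Status::OK(); replaying WAL 9 would apply them again.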

  Status PutCFImpl(uint32_t column_family_id, const Slice& key,
                   const Slice& value, ValueType value_type) {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
      return Status::OK();
    }
    // else insert the values into the memtable right away

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundary = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for upcoming
        // rollback/commit.
        WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
        batch_boundary = IsDuplicateKeySeq(column_family_id, key);
      }
      MaybeAdvanceSeq(batch_boundary);
      return seek_status;
    }
    Status ret_status;

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    // inplace_update_support is inconsistent with snapshots, and therefore
    // with any kind of transactions, including the ones that use seq_per_batch
    assert(!seq_per_batch_ || !moptions->inplace_update_support);
    if (!moptions->inplace_update_support) {
      bool mem_res =
          mem->Add(sequence_, value_type, key, value,
                   concurrent_memtable_writes_, get_post_process_info(mem),
                   hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
      if (UNLIKELY(!mem_res)) {
        assert(seq_per_batch_);
        ret_status = Status::TryAgain("key+seq exists");
        const bool BATCH_BOUNDARY = true;
        MaybeAdvanceSeq(BATCH_BOUNDARY);
      }
    } else if (moptions->inplace_callback == nullptr) {
      assert(!concurrent_memtable_writes_);
      mem->Update(sequence_, key, value);
    } else {
      assert(!concurrent_memtable_writes_);
      if (!mem->UpdateCallback(sequence_, key, value)) {
        // key not found in memtable. Do sst get, update, add
        SnapshotImpl read_from_snapshot;
        read_from_snapshot.number_ = sequence_;
        ReadOptions ropts;
        // it's going to be overwritten for sure, so no point caching the data
        // block containing the old version
        ropts.fill_cache = false;
        ropts.snapshot = &read_from_snapshot;

        std::string prev_value;
        std::string merged_value;

        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
        Status s = Status::NotSupported();
        if (db_ != nullptr && recovering_log_number_ == 0) {
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          s = db_->Get(ropts, cf_handle, key, &prev_value);
        }

        char* prev_buffer = const_cast<char*>(prev_value.c_str());
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
        auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
                                                 s.ok() ? &prev_size : nullptr,
                                                 value, &merged_value);
        if (status == UpdateStatus::UPDATED_INPLACE) {
          // prev_value is updated in-place with the final value.
          bool mem_res __attribute__((__unused__));
          mem_res = mem->Add(
              sequence_, value_type, key, Slice(prev_buffer, prev_size));
          assert(mem_res);
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
        } else if (status == UpdateStatus::UPDATED) {
          // merged_value contains the final value.
          bool mem_res __attribute__((__unused__));
          mem_res =
              mem->Add(sequence_, value_type, key, Slice(merged_value));
          assert(mem_res);
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
        }
      }
    }
    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain then let the next attempt add the key to
      // the rebuilding transaction object.
      WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
    }
    // Since all Puts are logged in transaction logs (if enabled), always bump
    // the sequence number, even if the update eventually fails and does not
    // result in a memtable add/update.
    MaybeAdvanceSeq();
    CheckMemtableFull();
    return ret_status;
  }
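
  // A minimal sketch of an inplace_callback consistent with how PutCFImpl
  // above uses it (hypothetical user code, not part of this file):
  //
  //   UpdateStatus CounterCallback(char* existing_value, uint32_t* value_size,
  //                                Slice delta, std::string* merged_value) {
  //     if (existing_value == nullptr) {
  //       // no prior value: emit the final value via merged_value
  //       merged_value->assign(delta.data(), delta.size());
  //       return UpdateStatus::UPDATED;  // PutCFImpl adds *merged_value
  //     }
  //     // ... rewrite the existing buffer in place, shrinking *value_size
  //     // if needed ...
  //     return UpdateStatus::UPDATED_INPLACE;  // PutCFImpl adds the previous
  //                                            // buffer with the new size
  //   }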

  Status PutCF(uint32_t column_family_id, const Slice& key,
               const Slice& value) override {
    return PutCFImpl(column_family_id, key, value, kTypeValue);
  }

  Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
                    const Slice& value, ValueType delete_type) {
    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    bool mem_res =
        mem->Add(sequence_, delete_type, key, value,
                 concurrent_memtable_writes_, get_post_process_info(mem),
                 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    if (UNLIKELY(!mem_res)) {
      assert(seq_per_batch_);
      ret_status = Status::TryAgain("key+seq exists");
      const bool BATCH_BOUNDARY = true;
      MaybeAdvanceSeq(BATCH_BOUNDARY);
    }
    MaybeAdvanceSeq();
    CheckMemtableFull();
    return ret_status;
  }

  Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
      return Status::OK();
    }
    // else insert the values into the memtable right away

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundary = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for upcoming
        // rollback/commit.
        WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
        batch_boundary = IsDuplicateKeySeq(column_family_id, key);
      }
      MaybeAdvanceSeq(batch_boundary);
      return seek_status;
    }

    auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain then let the next attempt add the key to
      // the rebuilding transaction object.
      WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
    }
    return ret_status;
  }

  Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
      return Status::OK();
    }
    // else insert the values into the memtable right away

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundary = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for upcoming
        // rollback/commit.
        WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
                                         key);
        batch_boundary = IsDuplicateKeySeq(column_family_id, key);
      }
      MaybeAdvanceSeq(batch_boundary);
      return seek_status;
    }

    auto ret_status =
        DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain then let the next attempt add the key to
      // the rebuilding transaction object.
      WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
    }
    return ret_status;
  }

  Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
                       const Slice& end_key) override {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                      begin_key, end_key);
      return Status::OK();
    }
    // else insert the values into the memtable right away

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundary = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for upcoming
        // rollback/commit.
        WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                        begin_key, end_key);
        // TODO(myabandeh): when transactional DeleteRange support is added,
        // check if end_key must also be added.
        batch_boundary = IsDuplicateKeySeq(column_family_id, begin_key);
      }
      MaybeAdvanceSeq(batch_boundary);
      return seek_status;
    }
    if (db_ != nullptr) {
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
      if (!cfd->is_delete_range_supported()) {
        return Status::NotSupported(
            std::string("DeleteRange not supported for table type ") +
            cfd->ioptions()->table_factory->Name() + " in CF " +
            cfd->GetName());
      }
    }

    auto ret_status =
        DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain then let the next attempt add the key to
      // the rebuilding transaction object.
      WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                      begin_key, end_key);
    }
    return ret_status;
  }

  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
      return Status::OK();
    }
    // else insert the values into the memtable right away

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundary = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for upcoming
        // rollback/commit.
        WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
                                  value);
        batch_boundary = IsDuplicateKeySeq(column_family_id, key);
      }
      MaybeAdvanceSeq(batch_boundary);
      return seek_status;
    }

    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    bool perform_merge = false;
    assert(!concurrent_memtable_writes_ ||
           moptions->max_successive_merges == 0);

    // If we pass the DB through and options.max_successive_merges is hit
    // during recovery, a Get() will be issued, which will try to acquire the
    // DB mutex and cause a deadlock, as the DB mutex is already held.
    // So we disable merge in recovery.
    if (moptions->max_successive_merges > 0 && db_ != nullptr &&
        recovering_log_number_ == 0) {
      assert(!concurrent_memtable_writes_);
      LookupKey lkey(key, sequence_);

      // Count the number of successive merges at the head
      // of the key in the memtable
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }

    if (perform_merge) {
      // 1) Get the existing value
      std::string get_value;

      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;

      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      db_->Get(read_options, cf_handle, key, &get_value);
      Slice get_value_slice = Slice(get_value);

      // 2) Apply this merge
      auto merge_operator = moptions->merge_operator;
      assert(merge_operator);

      std::string new_value;

      Status merge_status = MergeHelper::TimedFullMerge(
          merge_operator, key, &get_value_slice, {value}, &new_value,
          moptions->info_log, moptions->statistics, Env::Default());

      if (!merge_status.ok()) {
        // Failed to merge!
        // Store the delta in the memtable instead.
        perform_merge = false;
      } else {
        // 3) Add value to memtable
        assert(!concurrent_memtable_writes_);
        bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
        if (UNLIKELY(!mem_res)) {
          assert(seq_per_batch_);
          ret_status = Status::TryAgain("key+seq exists");
          const bool BATCH_BOUNDARY = true;
          MaybeAdvanceSeq(BATCH_BOUNDARY);
        }
      }
    }

    if (!perform_merge) {
      // Add the merge operand to the memtable
      bool mem_res =
          mem->Add(sequence_, kTypeMerge, key, value,
                   concurrent_memtable_writes_, get_post_process_info(mem));
      if (UNLIKELY(!mem_res)) {
        assert(seq_per_batch_);
        ret_status = Status::TryAgain("key+seq exists");
        const bool BATCH_BOUNDARY = true;
        MaybeAdvanceSeq(BATCH_BOUNDARY);
      }
    }

    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain then let the next attempt add the key to
      // the rebuilding transaction object.
      WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
    }
    MaybeAdvanceSeq();
    CheckMemtableFull();
    return ret_status;
  }
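
  // Worked example for the max_successive_merges path in MergeCF above
  // (hypothetical settings): with max_successive_merges == 2 and two
  // kTypeMerge entries for key k already at the head of the memtable,
  // CountSuccessiveMergeEntries() returns 2 >= 2, so the third merge is
  // resolved eagerly: Get() (which already folds the earlier operands into a
  // base value) is combined with the new operand via TimedFullMerge, and the
  // result lands in the memtable as a single kTypeValue entry instead of yet
  // another kTypeMerge delta.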

  Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
                        const Slice& value) override {
    // Same as PutCF except for value type.
    return PutCFImpl(column_family_id, key, value, kTypeBlobIndex);
  }

  void CheckMemtableFull() {
    if (flush_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd != nullptr);
      if (cfd->mem()->ShouldScheduleFlush() &&
          cfd->mem()->MarkFlushScheduled()) {
        // MarkFlushScheduled only returns true if we are the one that
        // should take action, so no need to dedup further
        flush_scheduler_->ScheduleWork(cfd);
      }
    }
    // check if memtable_list size exceeds max_write_buffer_size_to_maintain
    if (trim_history_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();

      assert(cfd);
      assert(cfd->ioptions());

      const size_t size_to_maintain = static_cast<size_t>(
          cfd->ioptions()->max_write_buffer_size_to_maintain);

      if (size_to_maintain > 0) {
        MemTableList* const imm = cfd->imm();
        assert(imm);

        if (imm->HasHistory()) {
          const MemTable* const mem = cfd->mem();
          assert(mem);

          if (mem->ApproximateMemoryUsageFast() +
                      imm->ApproximateMemoryUsageExcludingLast() >=
                  size_to_maintain &&
              imm->MarkTrimHistoryNeeded()) {
            trim_history_scheduler_->ScheduleWork(cfd);
          }
        }
      }
    }
  }
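
  // The trim-history arithmetic in CheckMemtableFull above, with hypothetical
  // numbers: given max_write_buffer_size_to_maintain == 64 << 20, an active
  // memtable at ~20MB plus immutable history (excluding the most recent
  // memtable) at ~48MB gives 68MB >= 64MB, so trimming is scheduled once
  // MarkTrimHistoryNeeded() confirms no one else has claimed the work.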

  // The write batch handler calls MarkBeginPrepare with unprepare set to true
  // if it encounters the kTypeBeginUnprepareXID marker.
  Status MarkBeginPrepare(bool unprepare) override {
    assert(rebuilding_trx_ == nullptr);
    assert(db_);

    if (recovering_log_number_ != 0) {
      // during recovery we rebuild a hollow transaction
      // from all encountered prepare sections of the wal
      if (!db_->allow_2pc()) {
        return Status::NotSupported(
            "WAL contains prepared transactions. Open with "
            "TransactionDB::Open().");
      }

      // we are now iterating through a prepared section
      rebuilding_trx_ = new WriteBatch();
      rebuilding_trx_seq_ = sequence_;
      // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
      // unprepared_batch_ should be false because it is false by default, and
      // gets reset to false in MarkEndPrepare.
      assert(!unprepared_batch_);
      unprepared_batch_ = unprepare;

      if (has_valid_writes_ != nullptr) {
        *has_valid_writes_ = true;
      }
    }

    return Status::OK();
  }

  Status MarkEndPrepare(const Slice& name) override {
    assert(db_);
    assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));

    if (recovering_log_number_ != 0) {
      assert(db_->allow_2pc());
      size_t batch_cnt =
          write_after_commit_
              ? 0  // 0 will disable further checks
              : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
      db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
                                      rebuilding_trx_, rebuilding_trx_seq_,
                                      batch_cnt, unprepared_batch_);
      unprepared_batch_ = false;
      rebuilding_trx_ = nullptr;
    } else {
      assert(rebuilding_trx_ == nullptr);
    }
    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return Status::OK();
  }
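
  // The batch_cnt computation in MarkEndPrepare above, with hypothetical
  // sequences: under seq_per_batch, if the prepared section started at
  // rebuilding_trx_seq_ == 10 and MarkEndPrepare runs with sequence_ == 12,
  // the recovered transaction is registered with batch_cnt == 12 - 10 + 1
  // == 3 sub-batches; under write_after_commit it is registered with 0,
  // which disables the downstream count checks.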

  Status MarkNoop(bool empty_batch) override {
    // A hack in pessimistic transactions could result in a noop at the start
    // of the write batch; it should be ignored.
    if (!empty_batch) {
      // In the absence of Prepare markers, a kTypeNoop tag indicates the end
      // of a batch. This happens when a write batch commits, skipping the
      // prepare phase.
      const bool batch_boundary = true;
      MaybeAdvanceSeq(batch_boundary);
    }
    return Status::OK();
  }

  Status MarkCommit(const Slice& name) override {
    assert(db_);

    Status s;

    if (recovering_log_number_ != 0) {
      // in recovery, when we encounter a commit marker
      // we look this transaction up in our set of rebuilt transactions
      // and commit it.
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // the log containing the prepared section may have
      // been released in the last incarnation because the
      // data was flushed to L0
      if (trx != nullptr) {
        // at this point individual CF lognumbers will prevent
        // duplicate re-insertion of values.
        assert(log_number_ref_ == 0);
        if (write_after_commit_) {
          // write_after_commit_ can only have one batch in trx.
          assert(trx->batches_.size() == 1);
          const auto& batch_info = trx->batches_.begin()->second;
          // all inserts must reference this trx log number
          log_number_ref_ = batch_info.log_number_;
          s = batch_info.batch_->Iterate(this);
          log_number_ref_ = 0;
        }
        // else the values are already inserted before the commit

        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_ != nullptr) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // When writes are not delayed until commit, there is no disconnect
      // between a memtable write and the WAL that supports it. So the commit
      // need not reference any log as the only log on which it depends.
      assert(!write_after_commit_ || log_number_ref_ > 0);
    }
    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return s;
  }
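
  // Recovery timeline for MarkCommit above (hypothetical log numbers): WAL 4
  // holds the BeginPrepare..EndPrepare section of transaction "t1" and WAL 6
  // holds its commit marker. Replaying WAL 4 rebuilds t1; replaying WAL 6
  // finds it via GetRecoveredTransaction("t1"), re-runs its batch through
  // this same inserter with log_number_ref_ == 4 (write_after_commit_ only),
  // and then deletes the recovered transaction.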

  Status MarkRollback(const Slice& name) override {
    assert(db_);

    if (recovering_log_number_ != 0) {
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // the log containing the transaction's prep section
      // may have been released in the previous incarnation
      // because we knew it had been rolled back
      if (trx != nullptr) {
        db_->DeleteRecoveredTransaction(name.ToString());
      }
    } else {
      // in non-recovery mode we simply ignore this tag
    }

    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return Status::OK();
  }

 private:
  MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
    if (!concurrent_memtable_writes_) {
      // No need to batch counters locally if we don't use concurrent mode.
      return nullptr;
    }
    return &GetPostMap()[mem];
  }
};

// This function can only be called in these conditions:
// 1) During Recovery()
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where the memtables have been
//    cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache.
Status WriteBatchInternal::InsertInto(
    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, recovery_log_number, db,
      concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
      batch_per_txn);
  for (auto w : write_group) {
    if (w->CallbackFailed()) {
      continue;
    }
    w->sequence = inserter.sequence();
    if (!w->ShouldWriteToMemtable()) {
      // In seq_per_batch_ mode this advances the seq by one.
      inserter.MaybeAdvanceSeq(true);
      continue;
    }
    SetSequence(w->batch, inserter.sequence());
    inserter.set_log_number_ref(w->log_ref);
    w->status = w->batch->Iterate(&inserter);
    if (!w->status.ok()) {
      return w->status;
    }
    assert(!seq_per_batch || w->batch_cnt != 0);
    assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
  }
  return Status::OK();
}
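
// Example for InsertInto above, with a hypothetical write group: writers w1
// (two keys), w2 (failed callback), and w3 (one key), starting at sequence 10
// under the default seq-per-key policy: w1->sequence == 10 and its keys
// consume 10..11, w2 is skipped without consuming a sequence number, and
// w3->sequence == 12. Under seq_per_batch each writer's batch instead
// consumes w->batch_cnt consecutive sequence numbers.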

Status WriteBatchInternal::InsertInto(
    WriteThread::Writer* writer, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
    bool batch_per_txn, bool hint_per_batch) {
#ifdef NDEBUG
  (void)batch_cnt;
#endif
  assert(writer->ShouldWriteToMemtable());
  MemTableInserter inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, log_number, db,
      concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
      batch_per_txn, hint_per_batch);
  SetSequence(writer->batch, sequence);
  inserter.set_log_number_ref(writer->log_ref);
  Status s = writer->batch->Iterate(&inserter);
  assert(!seq_per_batch || batch_cnt != 0);
  assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}

Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, SequenceNumber* next_seq,
    bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
                            trim_history_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, has_valid_writes,
                            seq_per_batch, batch_per_txn);
  Status s = batch->Iterate(&inserter);
  if (next_seq != nullptr) {
    *next_seq = inserter.sequence();
  }
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}

Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= WriteBatchInternal::kHeader);
  b->rep_.assign(contents.data(), contents.size());
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
                                  const bool wal_only) {
  size_t src_len;
  int src_count;
  uint32_t src_flags;

  const SavePoint& batch_end = src->GetWalTerminationPoint();

  if (wal_only && !batch_end.is_cleared()) {
    src_len = batch_end.size - WriteBatchInternal::kHeader;
    src_count = batch_end.count;
    src_flags = batch_end.content_flags;
  } else {
    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
    src_count = Count(src);
    src_flags = src->content_flags_.load(std::memory_order_relaxed);
  }

  SetCount(dst, Count(dst) + src_count);
  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
  dst->content_flags_.store(
      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
      std::memory_order_relaxed);
  return Status::OK();
}
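
// Example for Append above (hypothetical batch): if src holds five records
// but its WAL termination point (see GetWalTerminationPoint()) was captured
// after the first three, then Append(dst, src, /*wal_only=*/true) copies only
// those three records (batch_end.size bytes minus the header) and their
// content flags, whereas wal_only == false appends all five.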

size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
                                            size_t rightByteSize) {
  if (leftByteSize == 0 || rightByteSize == 0) {
    return leftByteSize + rightByteSize;
  } else {
    return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
  }
}
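
// Worked arithmetic for AppendedByteSize above: each non-empty batch carries
// one kHeader (12 bytes: fixed64 sequence + fixed32 count), so appending a
// 100-byte batch to a 60-byte batch yields 100 + 60 - 12 == 148 bytes; if
// either side is empty, the sizes simply add.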

}  // namespace ROCKSDB_NAMESPACE