1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 // WriteBatch::rep_ :=
11 // sequence: fixed64
12 // count: fixed32
13 // data: record[count]
14 // record :=
15 // kTypeValue varstring varstring
16 // kTypeDeletion varstring
17 // kTypeSingleDeletion varstring
18 // kTypeRangeDeletion varstring varstring
19 // kTypeMerge varstring varstring
20 // kTypeColumnFamilyValue varint32 varstring varstring
21 // kTypeColumnFamilyDeletion varint32 varstring
22 // kTypeColumnFamilySingleDeletion varint32 varstring
23 // kTypeColumnFamilyRangeDeletion varint32 varstring varstring
24 // kTypeColumnFamilyMerge varint32 varstring varstring
25 // kTypeBeginPrepareXID varstring
26 // kTypeEndPrepareXID
27 // kTypeCommitXID varstring
28 // kTypeRollbackXID varstring
29 // kTypeBeginPersistedPrepareXID varstring
30 // kTypeBeginUnprepareXID varstring
31 // kTypeNoop
32 // varstring :=
33 // len: varint32
34 // data: uint8[len]
35
36 #include "rocksdb/write_batch.h"
37
38 #include <map>
39 #include <stack>
40 #include <stdexcept>
41 #include <type_traits>
42 #include <unordered_map>
43 #include <vector>
44
45 #include "db/column_family.h"
46 #include "db/db_impl/db_impl.h"
47 #include "db/dbformat.h"
48 #include "db/flush_scheduler.h"
49 #include "db/kv_checksum.h"
50 #include "db/memtable.h"
51 #include "db/merge_context.h"
52 #include "db/snapshot_impl.h"
53 #include "db/trim_history_scheduler.h"
54 #include "db/write_batch_internal.h"
55 #include "monitoring/perf_context_imp.h"
56 #include "monitoring/statistics.h"
57 #include "port/lang.h"
58 #include "rocksdb/merge_operator.h"
59 #include "rocksdb/system_clock.h"
60 #include "util/autovector.h"
61 #include "util/cast_util.h"
62 #include "util/coding.h"
63 #include "util/duplicate_detector.h"
64 #include "util/string_util.h"
65
66 namespace ROCKSDB_NAMESPACE {
67
68 // anon namespace for file-local types
69 namespace {
70
// Bit flags recording which record types a WriteBatch contains.
// DEFERRED means the flags have not been computed yet for this batch;
// they are derived lazily by scanning the batch (see ComputeContentFlags()).
enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,
  HAS_PUT = 1 << 1,
  HAS_DELETE = 1 << 2,
  HAS_SINGLE_DELETE = 1 << 3,
  HAS_MERGE = 1 << 4,
  HAS_BEGIN_PREPARE = 1 << 5,
  HAS_END_PREPARE = 1 << 6,
  HAS_COMMIT = 1 << 7,
  HAS_ROLLBACK = 1 << 8,
  HAS_DELETE_RANGE = 1 << 9,
  HAS_BLOB_INDEX = 1 << 10,
  HAS_BEGIN_UNPREPARE = 1 << 11,
};
85
// Handler that scans a batch and ORs together a ContentFlags bit for every
// record type encountered. Used by ComputeContentFlags() to materialize the
// flags of a batch that was constructed with DEFERRED flags.
struct BatchContentClassifier : public WriteBatch::Handler {
  uint32_t content_flags = 0;

  Status PutCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_PUT;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE_RANGE;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_MERGE;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_BLOB_INDEX;
    return Status::OK();
  }

  // `unprepare` distinguishes WriteUnprepared-style begin markers.
  Status MarkBeginPrepare(bool unprepare) override {
    content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
    if (unprepare) {
      content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
    }
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    content_flags |= ContentFlags::HAS_END_PREPARE;
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    content_flags |= ContentFlags::HAS_ROLLBACK;
    return Status::OK();
  }
};
142
143 class TimestampAssigner : public WriteBatch::Handler {
144 public:
TimestampAssigner(const Slice & ts,WriteBatch::ProtectionInfo * prot_info)145 explicit TimestampAssigner(const Slice& ts,
146 WriteBatch::ProtectionInfo* prot_info)
147 : timestamp_(ts),
148 timestamps_(kEmptyTimestampList),
149 prot_info_(prot_info) {}
TimestampAssigner(const std::vector<Slice> & ts_list,WriteBatch::ProtectionInfo * prot_info)150 explicit TimestampAssigner(const std::vector<Slice>& ts_list,
151 WriteBatch::ProtectionInfo* prot_info)
152 : timestamps_(ts_list), prot_info_(prot_info) {
153 SanityCheck();
154 }
~TimestampAssigner()155 ~TimestampAssigner() override {}
156
PutCF(uint32_t,const Slice & key,const Slice &)157 Status PutCF(uint32_t, const Slice& key, const Slice&) override {
158 AssignTimestamp(key);
159 ++idx_;
160 return Status::OK();
161 }
162
DeleteCF(uint32_t,const Slice & key)163 Status DeleteCF(uint32_t, const Slice& key) override {
164 AssignTimestamp(key);
165 ++idx_;
166 return Status::OK();
167 }
168
SingleDeleteCF(uint32_t,const Slice & key)169 Status SingleDeleteCF(uint32_t, const Slice& key) override {
170 AssignTimestamp(key);
171 ++idx_;
172 return Status::OK();
173 }
174
DeleteRangeCF(uint32_t,const Slice & begin_key,const Slice &)175 Status DeleteRangeCF(uint32_t, const Slice& begin_key,
176 const Slice& /* end_key */) override {
177 AssignTimestamp(begin_key);
178 ++idx_;
179 return Status::OK();
180 }
181
MergeCF(uint32_t,const Slice & key,const Slice &)182 Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
183 AssignTimestamp(key);
184 ++idx_;
185 return Status::OK();
186 }
187
PutBlobIndexCF(uint32_t,const Slice &,const Slice &)188 Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
189 // TODO (yanqin): support blob db in the future.
190 return Status::OK();
191 }
192
MarkBeginPrepare(bool)193 Status MarkBeginPrepare(bool) override {
194 // TODO (yanqin): support in the future.
195 return Status::OK();
196 }
197
MarkEndPrepare(const Slice &)198 Status MarkEndPrepare(const Slice&) override {
199 // TODO (yanqin): support in the future.
200 return Status::OK();
201 }
202
MarkCommit(const Slice &)203 Status MarkCommit(const Slice&) override {
204 // TODO (yanqin): support in the future.
205 return Status::OK();
206 }
207
MarkRollback(const Slice &)208 Status MarkRollback(const Slice&) override {
209 // TODO (yanqin): support in the future.
210 return Status::OK();
211 }
212
213 private:
SanityCheck() const214 void SanityCheck() const {
215 assert(!timestamps_.empty());
216 #ifndef NDEBUG
217 const size_t ts_sz = timestamps_[0].size();
218 for (size_t i = 1; i != timestamps_.size(); ++i) {
219 assert(ts_sz == timestamps_[i].size());
220 }
221 #endif // !NDEBUG
222 }
223
AssignTimestamp(const Slice & key)224 void AssignTimestamp(const Slice& key) {
225 assert(timestamps_.empty() || idx_ < timestamps_.size());
226 const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
227 size_t ts_sz = ts.size();
228 char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
229 if (prot_info_ != nullptr) {
230 Slice old_ts(ptr, ts_sz), new_ts(ts.data(), ts_sz);
231 prot_info_->entries_[idx_].UpdateT(old_ts, new_ts);
232 }
233 memcpy(ptr, ts.data(), ts_sz);
234 }
235
236 static const std::vector<Slice> kEmptyTimestampList;
237 const Slice timestamp_;
238 const std::vector<Slice>& timestamps_;
239 WriteBatch::ProtectionInfo* const prot_info_;
240 size_t idx_ = 0;
241
242 // No copy or move.
243 TimestampAssigner(const TimestampAssigner&) = delete;
244 TimestampAssigner(TimestampAssigner&&) = delete;
245 TimestampAssigner& operator=(const TimestampAssigner&) = delete;
246 TimestampAssigner&& operator=(TimestampAssigner&&) = delete;
247 };
248 const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;
249
250 } // anon namespace
251
// Wrapper around the save-point stack so WriteBatch can hold it behind a
// unique_ptr and allocate it only when save points are actually used
// (see the copy constructor, which clones it only when non-null).
struct SavePoints {
  std::stack<SavePoint, autovector<SavePoint>> stack;
};
255
// Construct an empty batch. `reserved_bytes` pre-sizes the internal buffer,
// never below the fixed header size (sequence fixed64 + count fixed32; see
// the rep_ layout comment at the top of this file). resize() leaves the
// header zero-initialized.
// NOTE(review): max_bytes_ appears to bound the serialized batch size via
// LocalSavePoint::commit() -- confirm in write_batch_internal.h.
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) {
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}
263
WriteBatch(size_t reserved_bytes,size_t max_bytes,size_t ts_sz)264 WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz)
265 : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
266 rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
267 reserved_bytes : WriteBatchInternal::kHeader);
268 rep_.resize(WriteBatchInternal::kHeader);
269 }
270
// Construct an empty batch with timestamp reservation and optional per-key
// protection (integrity checksums stored in prot_info_).
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz,
                       size_t protection_bytes_per_key)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
  // Currently `protection_bytes_per_key` can only be enabled at 8 bytes per
  // entry.
  assert(protection_bytes_per_key == 0 || protection_bytes_per_key == 8);
  if (protection_bytes_per_key != 0) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
  }
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}
285
// Wrap an existing serialized representation (copied). The record types it
// contains are unknown, so content flags start as DEFERRED and are computed
// lazily on first query (see ComputeContentFlags()).
WriteBatch::WriteBatch(const std::string& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(rep),
      timestamp_size_(0) {}
291
// Same as the const-reference overload above, but takes ownership of the
// serialized representation without copying it.
WriteBatch::WriteBatch(std::string&& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(std::move(rep)),
      timestamp_size_(0) {}
297
// Copy constructor: deep copy. The save-point stack and protection info are
// heap-allocated clones, created only when present in the source.
WriteBatch::WriteBatch(const WriteBatch& src)
    : wal_term_point_(src.wal_term_point_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(src.rep_),
      timestamp_size_(src.timestamp_size_) {
  if (src.save_points_ != nullptr) {
    save_points_.reset(new SavePoints());
    save_points_->stack = src.save_points_->stack;
  }
  if (src.prot_info_ != nullptr) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
    prot_info_->entries_ = src.prot_info_->entries_;
  }
}
313
// Move constructor; noexcept so that containers move WriteBatches on
// reallocation instead of copying them.
WriteBatch::WriteBatch(WriteBatch&& src) noexcept
    : save_points_(std::move(src.save_points_)),
      wal_term_point_(std::move(src.wal_term_point_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      prot_info_(std::move(src.prot_info_)),
      rep_(std::move(src.rep_)),
      timestamp_size_(src.timestamp_size_) {}
322
// Copy assignment implemented as destroy + placement-new copy construction.
// NOTE(review): if the copy constructor threw, *this would be left in a
// destroyed state; this is only safe while construction cannot throw except
// from allocation -- confirm before adding throwing logic to the copy ctor.
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}
330
// Move assignment via destroy + placement-new move construction.
// NOTE(review): unlike the move constructor this is not declared noexcept;
// the body cannot throw (the move ctor is noexcept), so marking it would be
// safe -- but the declaration lives in the public header.
WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}
338
WriteBatch::~WriteBatch() { }

WriteBatch::Handler::~Handler() { }

void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

// Default: keep iterating. Handlers override this to stop iteration early.
bool WriteBatch::Handler::Continue() {
  return true;
}
351
// Reset the batch to the empty state while retaining allocated storage:
// rep_ shrinks back to just the zeroed header (std::string::clear keeps
// capacity), and the save-point / protection-info structures are emptied
// but not deallocated. timestamp_size_ is not reset.
void WriteBatch::Clear() {
  rep_.clear();
  rep_.resize(WriteBatchInternal::kHeader);

  content_flags_.store(0, std::memory_order_relaxed);

  if (save_points_ != nullptr) {
    while (!save_points_->stack.empty()) {
      save_points_->stack.pop();
    }
  }

  if (prot_info_ != nullptr) {
    prot_info_->entries_.clear();
  }
  wal_term_point_.clear();
}
369
// Number of records in the batch, decoded from the fixed32 count header.
uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }
371
// Return the batch's content flags, computing them with a full scan of the
// batch if they are still DEFERRED (e.g. for a batch constructed from a
// serialized rep). The result is cached back into content_flags_.
uint32_t WriteBatch::ComputeContentFlags() const {
  auto rv = content_flags_.load(std::memory_order_relaxed);
  if ((rv & ContentFlags::DEFERRED) != 0) {
    BatchContentClassifier classifier;
    // Should we handle status here?
    Iterate(&classifier).PermitUncheckedError();
    rv = classifier.content_flags;

    // this method is conceptually const, because it is performing a lazy
    // computation that doesn't affect the abstract state of the batch.
    // content_flags_ is marked mutable so that we can perform the
    // following assignment
    content_flags_.store(rv, std::memory_order_relaxed);
  }
  return rv;
}
388
// Snapshot the batch's current size, record count, and content flags into
// wal_term_point_.
// NOTE(review): presumably this marks the point up to which the batch is
// written to the WAL (records appended later would be excluded) -- confirm
// against the consumers of wal_term_point_.
void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_;
}
394
GetProtectionBytesPerKey() const395 size_t WriteBatch::GetProtectionBytesPerKey() const {
396 if (prot_info_ != nullptr) {
397 return prot_info_->GetBytesPerKey();
398 }
399 return 0;
400 }
401
// The Has*() queries below all derive from ComputeContentFlags(), so the
// first call on a DEFERRED batch pays for a full scan of the batch.

bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasDeleteRange() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}
421
ReadKeyFromWriteBatchEntry(Slice * input,Slice * key,bool cf_record)422 bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
423 assert(input != nullptr && key != nullptr);
424 // Skip tag byte
425 input->remove_prefix(1);
426
427 if (cf_record) {
428 // Skip column_family bytes
429 uint32_t cf;
430 if (!GetVarint32(input, &cf)) {
431 return false;
432 }
433 }
434
435 // Extract key
436 return GetLengthPrefixedSlice(input, key);
437 }
438
// Two-phase-commit marker queries; like the record-type queries above these
// go through ComputeContentFlags() and may trigger a lazy scan.

bool WriteBatch::HasBeginPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}

bool WriteBatch::HasEndPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}

bool WriteBatch::HasCommit() const {
  return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}

bool WriteBatch::HasRollback() const {
  return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}
454
ReadRecordFromWriteBatch(Slice * input,char * tag,uint32_t * column_family,Slice * key,Slice * value,Slice * blob,Slice * xid)455 Status ReadRecordFromWriteBatch(Slice* input, char* tag,
456 uint32_t* column_family, Slice* key,
457 Slice* value, Slice* blob, Slice* xid) {
458 assert(key != nullptr && value != nullptr);
459 *tag = (*input)[0];
460 input->remove_prefix(1);
461 *column_family = 0; // default
462 switch (*tag) {
463 case kTypeColumnFamilyValue:
464 if (!GetVarint32(input, column_family)) {
465 return Status::Corruption("bad WriteBatch Put");
466 }
467 FALLTHROUGH_INTENDED;
468 case kTypeValue:
469 if (!GetLengthPrefixedSlice(input, key) ||
470 !GetLengthPrefixedSlice(input, value)) {
471 return Status::Corruption("bad WriteBatch Put");
472 }
473 break;
474 case kTypeColumnFamilyDeletion:
475 case kTypeColumnFamilySingleDeletion:
476 if (!GetVarint32(input, column_family)) {
477 return Status::Corruption("bad WriteBatch Delete");
478 }
479 FALLTHROUGH_INTENDED;
480 case kTypeDeletion:
481 case kTypeSingleDeletion:
482 if (!GetLengthPrefixedSlice(input, key)) {
483 return Status::Corruption("bad WriteBatch Delete");
484 }
485 break;
486 case kTypeColumnFamilyRangeDeletion:
487 if (!GetVarint32(input, column_family)) {
488 return Status::Corruption("bad WriteBatch DeleteRange");
489 }
490 FALLTHROUGH_INTENDED;
491 case kTypeRangeDeletion:
492 // for range delete, "key" is begin_key, "value" is end_key
493 if (!GetLengthPrefixedSlice(input, key) ||
494 !GetLengthPrefixedSlice(input, value)) {
495 return Status::Corruption("bad WriteBatch DeleteRange");
496 }
497 break;
498 case kTypeColumnFamilyMerge:
499 if (!GetVarint32(input, column_family)) {
500 return Status::Corruption("bad WriteBatch Merge");
501 }
502 FALLTHROUGH_INTENDED;
503 case kTypeMerge:
504 if (!GetLengthPrefixedSlice(input, key) ||
505 !GetLengthPrefixedSlice(input, value)) {
506 return Status::Corruption("bad WriteBatch Merge");
507 }
508 break;
509 case kTypeColumnFamilyBlobIndex:
510 if (!GetVarint32(input, column_family)) {
511 return Status::Corruption("bad WriteBatch BlobIndex");
512 }
513 FALLTHROUGH_INTENDED;
514 case kTypeBlobIndex:
515 if (!GetLengthPrefixedSlice(input, key) ||
516 !GetLengthPrefixedSlice(input, value)) {
517 return Status::Corruption("bad WriteBatch BlobIndex");
518 }
519 break;
520 case kTypeLogData:
521 assert(blob != nullptr);
522 if (!GetLengthPrefixedSlice(input, blob)) {
523 return Status::Corruption("bad WriteBatch Blob");
524 }
525 break;
526 case kTypeNoop:
527 case kTypeBeginPrepareXID:
528 // This indicates that the prepared batch is also persisted in the db.
529 // This is used in WritePreparedTxn
530 case kTypeBeginPersistedPrepareXID:
531 // This is used in WriteUnpreparedTxn
532 case kTypeBeginUnprepareXID:
533 break;
534 case kTypeEndPrepareXID:
535 if (!GetLengthPrefixedSlice(input, xid)) {
536 return Status::Corruption("bad EndPrepare XID");
537 }
538 break;
539 case kTypeCommitXID:
540 if (!GetLengthPrefixedSlice(input, xid)) {
541 return Status::Corruption("bad Commit XID");
542 }
543 break;
544 case kTypeRollbackXID:
545 if (!GetLengthPrefixedSlice(input, xid)) {
546 return Status::Corruption("bad Rollback XID");
547 }
548 break;
549 default:
550 return Status::Corruption("unknown WriteBatch tag");
551 }
552 return Status::OK();
553 }
554
// Walk every record in this batch, dispatching each to `handler`. Rejects a
// batch too small to even contain the fixed header.
Status WriteBatch::Iterate(Handler* handler) const {
  if (rep_.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
                                     rep_.size());
}
563
// Iterate over the records in byte range [begin, end) of wb->rep_, invoking
// the matching handler callback for each. Callers normally pass
// [kHeader, rep_.size()) to walk the whole batch; only in that case is the
// record count cross-checked against the header. A handler may return
// Status::TryAgain() to have the same record redelivered once (two
// consecutive TryAgains are treated as corruption).
Status WriteBatchInternal::Iterate(const WriteBatch* wb,
                                   WriteBatch::Handler* handler, size_t begin,
                                   size_t end) {
  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
    return Status::Corruption("Invalid start/end bounds for Iterate");
  }
  assert(begin <= end);
  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
  bool whole_batch =
      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());

  Slice key, value, blob, xid;
  // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
  // the batch boundary symbols otherwise we would mis-count the number of
  // batches. We do that by checking whether the accumulated batch is empty
  // before seeing the next Noop.
  bool empty_batch = true;
  uint32_t found = 0;
  Status s;
  char tag = 0;
  uint32_t column_family = 0;  // default
  bool last_was_try_again = false;
  bool handler_continue = true;
  while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
    handler_continue = handler->Continue();
    if (!handler_continue) {
      break;
    }

    if (LIKELY(!s.IsTryAgain())) {
      // Normal path: decode the next record from the input.
      last_was_try_again = false;
      tag = 0;
      column_family = 0;  // default

      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid);
      if (!s.ok()) {
        return s;
      }
    } else {
      // TryAgain path: redeliver the previously decoded record (tag, key,
      // value, ... are still valid) without consuming more input.
      assert(s.IsTryAgain());
      assert(!last_was_try_again);  // to detect infinite loop bugs
      if (UNLIKELY(last_was_try_again)) {
        return Status::Corruption(
            "two consecutive TryAgain in WriteBatch handler; this is either a "
            "software bug or data corruption.");
      }
      last_was_try_again = true;
      s = Status::OK();
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        s = handler->DeleteRangeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
        s = handler->PutBlobIndexCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          found++;
        }
        break;
      case kTypeLogData:
        handler->LogData(blob);
        // A batch might have nothing but LogData. It is still a batch.
        empty_batch = false;
        break;
      case kTypeBeginPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        // A WriteCommitted-style marker is invalid under the other write
        // policies; reject it with an explanation rather than misapplying.
        if (!handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_after_commit_ is disabled (in "
              "WritePrepared/WriteUnprepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_before_prepare_ is enabled "
              "(in WriteUnprepared mode). If it is not due to corruption, the "
              "WAL must be emptied before changing the WritePolicy.");
        }
        break;
      case kTypeBeginPersistedPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
              "is enabled (in default WriteCommitted mode). If it is not due "
              "to corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeBeginUnprepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
        s = handler->MarkBeginPrepare(true /* unprepared */);
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
              "default WriteCommitted mode). If it is not due to corruption, "
              "the WAL must be emptied before changing the WritePolicy.");
        }
        if (!handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
              "(in WriteCommitted/WritePrepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeEndPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        s = handler->MarkEndPrepare(xid);
        assert(s.ok());
        // End-of-prepare closes the accumulated sub-batch.
        empty_batch = true;
        break;
      case kTypeCommitXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        s = handler->MarkCommit(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeRollbackXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        s = handler->MarkRollback(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeNoop:
        // Pass empty_batch so the handler can tell a boundary Noop from a
        // Noop following real records (see comment above the loop).
        s = handler->MarkNoop(empty_batch);
        assert(s.ok());
        empty_batch = true;
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (handler_continue && whole_batch &&
      found != WriteBatchInternal::Count(wb)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}
774
bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  return b->is_latest_persistent_state_;
}

// NOTE: the "Lastest" spelling is part of the existing API surface and is
// kept as-is for compatibility with callers.
void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
  b->is_latest_persistent_state_ = true;
}

// The record count lives in the fixed32 at byte offset 8 of the header
// (after the fixed64 sequence number -- see the rep_ layout comment at the
// top of this file).
uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  EncodeFixed32(&b->rep_[8], n);
}

// The sequence number lives in the fixed64 at byte offset 0 of the header.
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

// The first record always starts immediately after the fixed header.
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  return WriteBatchInternal::kHeader;
}
802
// Append a Put record for (column_family_id, key, value): either
// "kTypeValue varstring varstring" or "kTypeColumnFamilyValue varint32
// varstring varstring" (see the format comment at the top of this file).
// When the batch reserves timestamp bytes, the key's length prefix covers
// key + timestamp_size_ zero bytes of padding, later overwritten in place
// by TimestampAssigner.
// NOTE(review): the length limits here use `>` kMaxUint32 while
// CheckSlicePartsLength uses `>=` -- confirm whether the off-by-one
// difference at exactly kMaxUint32 is intentional.
Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  // LocalSavePoint undoes the partial append if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  std::string timestamp(b->timestamp_size_, '\0');
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSlice(&b->rep_, key);
  } else {
    // Reserve zeroed timestamp bytes after the key, inside the key's
    // length prefix.
    PutVarint32(&b->rep_,
                static_cast<uint32_t>(key.size() + b->timestamp_size_));
    b->rep_.append(key.data(), key.size());
    b->rep_.append(timestamp);
  }
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // Technically the optype could've been `kTypeColumnFamilyValue` with the
    // CF ID encoded in the `WriteBatch`. That distinction is unimportant
    // however since we verify CF ID is correct, as well as all other fields
    // (a missing/extra encoded CF ID would corrupt another field). It is
    // convenient to consolidate on `kTypeValue` here as that is what will be
    // inserted into memtable.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeValue, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
847
// Public Put overload: resolve the column family handle to its numeric ID
// and delegate to the internal encoder.
Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}
853
CheckSlicePartsLength(const SliceParts & key,const SliceParts & value)854 Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
855 const SliceParts& value) {
856 size_t total_key_bytes = 0;
857 for (int i = 0; i < key.num_parts; ++i) {
858 total_key_bytes += key.parts[i].size();
859 }
860 if (total_key_bytes >= size_t{port::kMaxUint32}) {
861 return Status::InvalidArgument("key is too large");
862 }
863
864 size_t total_value_bytes = 0;
865 for (int i = 0; i < value.num_parts; ++i) {
866 total_value_bytes += value.parts[i].size();
867 }
868 if (total_value_bytes >= size_t{port::kMaxUint32}) {
869 return Status::InvalidArgument("value is too large");
870 }
871 return Status::OK();
872 }
873
// SliceParts variant of Put: same record layout as the Slice overload, with
// the key/value each written from multiple fragments. When timestamps are
// reserved, PutLengthPrefixedSlicePartsWithPadding appends the zero padding
// inside the key's length prefix.
Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key, const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  // LocalSavePoint undoes the partial append if commit() fails.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  std::string timestamp(b->timestamp_size_, '\0');
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSliceParts(&b->rep_, key);
  } else {
    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
  }
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeValue, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
909
Put(ColumnFamilyHandle * column_family,const SliceParts & key,const SliceParts & value)910 Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
911 const SliceParts& value) {
912 return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
913 value);
914 }
915
// Appends a kTypeNoop placeholder record. MarkEndPrepare() later rewrites
// this byte (rep_[12], the first record byte after the 12-byte header) into
// the appropriate begin-prepare marker.
Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  b->rep_.push_back(static_cast<char>(kTypeNoop));
  return Status::OK();
}
920
// Closes the prepare section of a two-phase-commit batch: rewrites the
// leading kTypeNoop placeholder (see InsertNoop()) into a begin-prepare
// marker, appends kTypeEndPrepareXID plus the transaction id, and updates
// the content flags.
Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
                                          bool write_after_commit,
                                          bool unprepared_batch) {
  // a manually constructed batch can only contain one prepare section
  // rep_[12] is the first record byte after the 8-byte sequence + 4-byte
  // count header; it must still hold the noop placeholder.
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    while (!b->save_points_->stack.empty()) {
      b->save_points_->stack.pop();
    }
  }

  // rewrite noop as begin marker; the variant encodes whether memtable
  // insertion happens after commit, and whether the batch is unprepared.
  b->rep_[12] = static_cast<char>(
      write_after_commit ? kTypeBeginPrepareXID
                         : (unprepared_batch ? kTypeBeginUnprepareXID
                                             : kTypeBeginPersistedPrepareXID));
  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  if (unprepared_batch) {
    b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                                ContentFlags::HAS_BEGIN_UNPREPARE,
                            std::memory_order_relaxed);
  }
  return Status::OK();
}
952
MarkCommit(WriteBatch * b,const Slice & xid)953 Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
954 b->rep_.push_back(static_cast<char>(kTypeCommitXID));
955 PutLengthPrefixedSlice(&b->rep_, xid);
956 b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
957 ContentFlags::HAS_COMMIT,
958 std::memory_order_relaxed);
959 return Status::OK();
960 }
961
MarkRollback(WriteBatch * b,const Slice & xid)962 Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
963 b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
964 PutLengthPrefixedSlice(&b->rep_, xid);
965 b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
966 ContentFlags::HAS_ROLLBACK,
967 std::memory_order_relaxed);
968 return Status::OK();
969 }
970
// Serializes one Delete record into b->rep_:
//   kTypeDeletion varstring                      (default family)
//   kTypeColumnFamilyDeletion varint32 varstring (other families)
Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key) {
  // LocalSavePoint rolls rep_/count/flags back if commit() is not reached.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  // All-zero placeholder timestamp; presumably rewritten later via
  // AssignTimestamp()/AssignTimestamps() -- TODO confirm.
  std::string timestamp(b->timestamp_size_, '\0');
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSlice(&b->rep_, key);
  } else {
    // The length prefix covers key + timestamp, so the padded zero bytes
    // become part of the stored key.
    PutVarint32(&b->rep_,
                static_cast<uint32_t>(key.size() + b->timestamp_size_));
    b->rep_.append(key.data(), key.size());
    b->rep_.append(timestamp);
  }
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, "" /* value */, kTypeDeletion, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1003
Delete(ColumnFamilyHandle * column_family,const Slice & key)1004 Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
1005 return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
1006 key);
1007 }
1008
// SliceParts variant of Delete; same record layout as the Slice overload.
Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  // LocalSavePoint rolls rep_/count/flags back if commit() is not reached.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  // All-zero placeholder timestamp; presumably rewritten later via
  // AssignTimestamp()/AssignTimestamps() -- TODO confirm.
  std::string timestamp(b->timestamp_size_, '\0');
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSliceParts(&b->rep_, key);
  } else {
    // Reserve timestamp_size_ zero bytes after the user key.
    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
  }
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key,
                         SliceParts(nullptr /* _parts */, 0 /* _num_parts */),
                         kTypeDeletion, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1040
Delete(ColumnFamilyHandle * column_family,const SliceParts & key)1041 Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
1042 const SliceParts& key) {
1043 return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
1044 key);
1045 }
1046
// Serializes one SingleDelete record into b->rep_:
//   kTypeSingleDeletion varstring                      (default family)
//   kTypeColumnFamilySingleDeletion varint32 varstring (other families)
// NOTE(review): unlike Put()/Delete(), no timestamp padding is reserved here
// even when b->timestamp_size_ > 0 -- confirm SingleDelete is not expected
// to support user-defined timestamps in this version.
Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key) {
  // LocalSavePoint rolls rep_/count/flags back if commit() is not reached.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVOT(key, "" /* value */,
                                                          kTypeSingleDeletion,
                                                          "" /* timestamp */)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}
1073
SingleDelete(ColumnFamilyHandle * column_family,const Slice & key)1074 Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
1075 const Slice& key) {
1076 return WriteBatchInternal::SingleDelete(
1077 this, GetColumnFamilyID(column_family), key);
1078 }
1079
// SliceParts variant of SingleDelete; same record layout as the Slice
// overload. As there, no timestamp padding is reserved.
Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  // LocalSavePoint rolls rep_/count/flags back if commit() is not reached.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key,
                         SliceParts(nullptr /* _parts */,
                                    0 /* _num_parts */) /* value */,
                         kTypeSingleDeletion, "" /* timestamp */)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1108
SingleDelete(ColumnFamilyHandle * column_family,const SliceParts & key)1109 Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
1110 const SliceParts& key) {
1111 return WriteBatchInternal::SingleDelete(
1112 this, GetColumnFamilyID(column_family), key);
1113 }
1114
// Serializes one DeleteRange record into b->rep_:
//   kTypeRangeDeletion varstring varstring                      (default)
//   kTypeColumnFamilyRangeDeletion varint32 varstring varstring (other CFs)
// The end key is stored in the value position of the record.
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const Slice& begin_key,
                                       const Slice& end_key) {
  // LocalSavePoint rolls rep_/count/flags back if commit() is not reached.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, begin_key);
  PutLengthPrefixedSlice(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVOT(begin_key, end_key,
                                                          kTypeRangeDeletion,
                                                          "" /* timestamp */)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}
1143
DeleteRange(ColumnFamilyHandle * column_family,const Slice & begin_key,const Slice & end_key)1144 Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
1145 const Slice& begin_key, const Slice& end_key) {
1146 return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
1147 begin_key, end_key);
1148 }
1149
// SliceParts variant of DeleteRange; same record layout as the Slice
// overload, with the end key stored in the value position.
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  // LocalSavePoint rolls rep_/count/flags back if commit() is not reached.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVOT(begin_key, end_key,
                                                          kTypeRangeDeletion,
                                                          "" /* timestamp */)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}
1178
DeleteRange(ColumnFamilyHandle * column_family,const SliceParts & begin_key,const SliceParts & end_key)1179 Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
1180 const SliceParts& begin_key,
1181 const SliceParts& end_key) {
1182 return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
1183 begin_key, end_key);
1184 }
1185
// Serializes one Merge record into b->rep_:
//   kTypeMerge varstring varstring                      (default family)
//   kTypeColumnFamilyMerge varint32 varstring varstring (other families)
Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
  // NOTE(review): these bounds use `>` while CheckSlicePartsLength() rejects
  // with `>=`, so size == kMaxUint32 is accepted here but rejected by the
  // SliceParts overloads -- confirm which boundary is intended.
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  // LocalSavePoint rolls rep_/count/flags back if commit() is not reached.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeMerge, "" /* timestamp */)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1218
Merge(ColumnFamilyHandle * column_family,const Slice & key,const Slice & value)1219 Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
1220 const Slice& value) {
1221 return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
1222 value);
1223 }
1224
// SliceParts variant of Merge; same record layout as the Slice overload.
// No timestamp padding is reserved for merge records.
Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  // LocalSavePoint rolls rep_/count/flags back if commit() is not reached.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeMerge, "" /* timestamp */)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1256
Merge(ColumnFamilyHandle * column_family,const SliceParts & key,const SliceParts & value)1257 Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
1258 const SliceParts& key, const SliceParts& value) {
1259 return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
1260 value);
1261 }
1262
// Serializes one BlobIndex record (key -> blob reference) into b->rep_:
//   kTypeBlobIndex varstring varstring                      (default family)
//   kTypeColumnFamilyBlobIndex varint32 varstring varstring (other families)
Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key, const Slice& value) {
  // LocalSavePoint rolls rep_/count/flags back if commit() is not reached.
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BLOB_INDEX,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVOT()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeBlobIndex, "" /* timestamp */)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
1289
PutLogData(const Slice & blob)1290 Status WriteBatch::PutLogData(const Slice& blob) {
1291 LocalSavePoint save(this);
1292 rep_.push_back(static_cast<char>(kTypeLogData));
1293 PutLengthPrefixedSlice(&rep_, blob);
1294 return save.commit();
1295 }
1296
SetSavePoint()1297 void WriteBatch::SetSavePoint() {
1298 if (save_points_ == nullptr) {
1299 save_points_.reset(new SavePoints());
1300 }
1301 // Record length and count of current batch of writes.
1302 save_points_->stack.push(SavePoint(
1303 GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
1304 }
1305
// Restores the batch to the state captured by the most recent SetSavePoint():
// truncates rep_, trims per-key protection info, and restores the entry count
// and content flags. Returns Status::NotFound() when no savepoint exists.
Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  SavePoint savepoint = save_points_->stack.top();
  save_points_->stack.pop();

  assert(savepoint.size <= rep_.size());
  assert(static_cast<uint32_t>(savepoint.count) <= Count());

  if (savepoint.size == rep_.size()) {
    // No changes to rollback
  } else if (savepoint.size == 0) {
    // Rollback everything
    Clear();
  } else {
    rep_.resize(savepoint.size);
    if (prot_info_ != nullptr) {
      // One protection entry per key, so truncating to `count` matches the
      // truncated records.
      prot_info_->entries_.resize(savepoint.count);
    }
    WriteBatchInternal::SetCount(this, savepoint.count);
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  }

  return Status::OK();
}
1334
PopSavePoint()1335 Status WriteBatch::PopSavePoint() {
1336 if (save_points_ == nullptr || save_points_->stack.size() == 0) {
1337 return Status::NotFound();
1338 }
1339
1340 // Pop the most recent savepoint off the stack
1341 save_points_->stack.pop();
1342
1343 return Status::OK();
1344 }
1345
AssignTimestamp(const Slice & ts)1346 Status WriteBatch::AssignTimestamp(const Slice& ts) {
1347 TimestampAssigner ts_assigner(ts, prot_info_.get());
1348 return Iterate(&ts_assigner);
1349 }
1350
AssignTimestamps(const std::vector<Slice> & ts_list)1351 Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
1352 TimestampAssigner ts_assigner(ts_list, prot_info_.get());
1353 return Iterate(&ts_assigner);
1354 }
1355
1356 class MemTableInserter : public WriteBatch::Handler {
1357
1358 SequenceNumber sequence_;
1359 ColumnFamilyMemTables* const cf_mems_;
1360 FlushScheduler* const flush_scheduler_;
1361 TrimHistoryScheduler* const trim_history_scheduler_;
1362 const bool ignore_missing_column_families_;
1363 const uint64_t recovering_log_number_;
1364 // log number that all Memtables inserted into should reference
1365 uint64_t log_number_ref_;
1366 DBImpl* db_;
1367 const bool concurrent_memtable_writes_;
1368 bool post_info_created_;
1369 const WriteBatch::ProtectionInfo* prot_info_;
1370 size_t prot_info_idx_;
1371
1372 bool* has_valid_writes_;
1373 // On some (!) platforms just default creating
1374 // a map is too expensive in the Write() path as they
1375 // cause memory allocations though unused.
1376 // Make creation optional but do not incur
1377 // std::unique_ptr additional allocation
1378 using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
1379 using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
1380 PostMapType mem_post_info_map_;
1381 // current recovered transaction we are rebuilding (recovery)
1382 WriteBatch* rebuilding_trx_;
1383 SequenceNumber rebuilding_trx_seq_;
1384 // Increase seq number once per each write batch. Otherwise increase it once
1385 // per key.
1386 bool seq_per_batch_;
1387 // Whether the memtable write will be done only after the commit
1388 bool write_after_commit_;
1389 // Whether memtable write can be done before prepare
1390 bool write_before_prepare_;
1391 // Whether this batch was unprepared or not
1392 bool unprepared_batch_;
1393 using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
1394 DupDetector duplicate_detector_;
1395 bool dup_dectector_on_;
1396
1397 bool hint_per_batch_;
1398 bool hint_created_;
1399 // Hints for this batch
1400 using HintMap = std::unordered_map<MemTable*, void*>;
1401 using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
1402 HintMapType hint_;
1403
GetHintMap()1404 HintMap& GetHintMap() {
1405 assert(hint_per_batch_);
1406 if (!hint_created_) {
1407 new (&hint_) HintMap();
1408 hint_created_ = true;
1409 }
1410 return *reinterpret_cast<HintMap*>(&hint_);
1411 }
1412
GetPostMap()1413 MemPostInfoMap& GetPostMap() {
1414 assert(concurrent_memtable_writes_);
1415 if(!post_info_created_) {
1416 new (&mem_post_info_map_) MemPostInfoMap();
1417 post_info_created_ = true;
1418 }
1419 return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
1420 }
1421
  // Delegates to DuplicateDetector::IsDuplicateKeySeq for (cf id, key) at the
  // current sequence number; used while rebuilding transactions in recovery.
  // The detector is placement-new'd into aligned storage on first use.
  // (The member names carry a historical typo: "dectector".)
  bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
    assert(!write_after_commit_);
    assert(rebuilding_trx_ != nullptr);
    if (!dup_dectector_on_) {
      new (&duplicate_detector_) DuplicateDetector(db_);
      dup_dectector_on_ = true;
    }
    return reinterpret_cast<DuplicateDetector*>
        (&duplicate_detector_)->IsDuplicateKeySeq(column_family_id, key, sequence_);
  }
1432
NextProtectionInfo()1433 const ProtectionInfoKVOTC64* NextProtectionInfo() {
1434 const ProtectionInfoKVOTC64* res = nullptr;
1435 if (prot_info_ != nullptr) {
1436 assert(prot_info_idx_ < prot_info_->entries_.size());
1437 res = &prot_info_->entries_[prot_info_idx_];
1438 ++prot_info_idx_;
1439 }
1440 return res;
1441 }
1442
1443 protected:
  // WriteBatch::Handler policy overrides; the flags are derived from
  // batch_per_txn / seq_per_batch in the constructor's initializer list.
  bool WriteBeforePrepare() const override { return write_before_prepare_; }
  bool WriteAfterCommit() const override { return write_after_commit_; }
1446
1447 public:
1448 // cf_mems should not be shared with concurrent inserters
  // cf_mems should not be shared with concurrent inserters
  //
  // _sequence: starting sequence number for this batch (see MaybeAdvanceSeq).
  // recovering_log_number: non-zero only during WAL recovery; lets
  //   SeekToColumnFamily() skip updates already flushed by a column family.
  // prot_info: optional per-key integrity protection for the batch.
  // has_valid_writes: optional out-flag set when any update actually applies.
  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   TrimHistoryScheduler* trim_history_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   const WriteBatch::ProtectionInfo* prot_info,
                   bool* has_valid_writes = nullptr, bool seq_per_batch = false,
                   bool batch_per_txn = true, bool hint_per_batch = false)
      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        trim_history_scheduler_(trim_history_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(static_cast_with_check<DBImpl>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        prot_info_(prot_info),
        prot_info_idx_(0),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr),
        rebuilding_trx_seq_(0),
        seq_per_batch_(seq_per_batch),
        // Write after commit currently uses one seq per key (instead of per
        // batch). So seq_per_batch being false indicates write_after_commit
        // approach.
        write_after_commit_(!seq_per_batch),
        // WriteUnprepared can write WriteBatches per transaction, so
        // batch_per_txn being false indicates write_before_prepare.
        write_before_prepare_(!batch_per_txn),
        unprepared_batch_(false),
        duplicate_detector_(),
        dup_dectector_on_(false),
        hint_per_batch_(hint_per_batch),
        hint_created_(false) {
    assert(cf_mems_);
  }
1488
  ~MemTableInserter() override {
    // Objects placement-new'd into aligned storage must be destroyed
    // manually, and only if they were actually constructed.
    if (dup_dectector_on_) {
      reinterpret_cast<DuplicateDetector*>
          (&duplicate_detector_)->~DuplicateDetector();
    }
    if (post_info_created_) {
      reinterpret_cast<MemPostInfoMap*>
          (&mem_post_info_map_)->~MemPostInfoMap();
    }
    if (hint_created_) {
      // Hint values are raw char[] buffers owned by this inserter.
      for (auto iter : GetHintMap()) {
        delete[] reinterpret_cast<char*>(iter.second);
      }
      reinterpret_cast<HintMap*>(&hint_)->~HintMap();
    }
    // deleting nullptr is a no-op when no transaction was being rebuilt
    delete rebuilding_trx_;
  }
1506
  // Non-copyable: the inserter owns placement-new'd aligned-storage members
  // and a raw rebuilding_trx_ pointer.
  MemTableInserter(const MemTableInserter&) = delete;
  MemTableInserter& operator=(const MemTableInserter&) = delete;
1509
1510 // The batch seq is regularly restarted; In normal mode it is set when
1511 // MemTableInserter is constructed in the write thread and in recovery mode it
1512 // is set when a batch, which is tagged with seq, is read from the WAL.
1513 // Within a sequenced batch, which could be a merge of multiple batches, we
1514 // have two policies to advance the seq: i) seq_per_key (default) and ii)
1515 // seq_per_batch. To implement the latter we need to mark the boundary between
1516 // the individual batches. The approach is this: 1) Use the terminating
1517 // markers to indicate the boundary (kTypeEndPrepareXID, kTypeCommitXID,
1518 // kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in the absence of a
1519 // natural boundary marker.
MaybeAdvanceSeq(bool batch_boundry=false)1520 void MaybeAdvanceSeq(bool batch_boundry = false) {
1521 if (batch_boundry == seq_per_batch_) {
1522 sequence_++;
1523 }
1524 }
1525
  // Sets the WAL number that memtables inserted into should reference
  // (consumed by SeekToColumnFamily() via RefLogContainingPrepSection).
  void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
  // Switches to a new batch's protection info and resets the entry cursor
  // used by NextProtectionInfo().
  void set_prot_info(const WriteBatch::ProtectionInfo* prot_info) {
    prot_info_ = prot_info;
    prot_info_idx_ = 0;
  }
1531
  // Current sequence number (advanced by MaybeAdvanceSeq()).
  SequenceNumber sequence() const { return sequence_; }
1533
PostProcess()1534 void PostProcess() {
1535 assert(concurrent_memtable_writes_);
1536 // If post info was not created there is nothing
1537 // to process and no need to create on demand
1538 if(post_info_created_) {
1539 for (auto& pair : GetPostMap()) {
1540 pair.first->BatchPostProcess(pair.second);
1541 }
1542 }
1543 }
1544
  // Positions cf_mems_ at `column_family_id`. Returns true when the caller
  // should apply the update to the memtable; returns false (with *s set to
  // OK or InvalidArgument) when the update must be skipped: unknown column
  // family, or recovery of a log the family has already flushed past.
  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    // If we are in a concurrent mode, it is the caller's responsibility
    // to clone the original ColumnFamilyMemTables so that each thread
    // has its own instance. Otherwise, it must be guaranteed that there
    // is no concurrent access
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        *s = Status::OK();
      } else {
        *s = Status::InvalidArgument(
            "Invalid column family specified in write batch");
      }
      return false;
    }
    if (recovering_log_number_ != 0 &&
        recovering_log_number_ < cf_mems_->GetLogNumber()) {
      // This is true only in recovery environment (recovering_log_number_ is
      // always 0 in
      // non-recovery, regular write code-path)
      // * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means that
      // column
      // family already contains updates from this log. We can't apply updates
      // twice because of update-in-place or merge workloads -- ignore the
      // update
      *s = Status::OK();
      return false;
    }

    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }

    if (log_number_ref_ > 0) {
      // Pin the WAL section containing this batch's prepare records.
      cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
    }

    return true;
  }
1584
PutCFImpl(uint32_t column_family_id,const Slice & key,const Slice & value,ValueType value_type,const ProtectionInfoKVOTS64 * kv_prot_info)1585 Status PutCFImpl(uint32_t column_family_id, const Slice& key,
1586 const Slice& value, ValueType value_type,
1587 const ProtectionInfoKVOTS64* kv_prot_info) {
1588 // optimize for non-recovery mode
1589 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
1590 // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
1591 return WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key,
1592 value);
1593 // else insert the values to the memtable right away
1594 }
1595
1596 Status ret_status;
1597 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
1598 if (ret_status.ok() && rebuilding_trx_ != nullptr) {
1599 assert(!write_after_commit_);
1600 // The CF is probably flushed and hence no need for insert but we still
1601 // need to keep track of the keys for upcoming rollback/commit.
1602 // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
1603 ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
1604 key, value);
1605 if (ret_status.ok()) {
1606 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
1607 }
1608 } else if (ret_status.ok()) {
1609 MaybeAdvanceSeq(false /* batch_boundary */);
1610 }
1611 return ret_status;
1612 }
1613 assert(ret_status.ok());
1614
1615 MemTable* mem = cf_mems_->GetMemTable();
1616 auto* moptions = mem->GetImmutableMemTableOptions();
1617 // inplace_update_support is inconsistent with snapshots, and therefore with
1618 // any kind of transactions including the ones that use seq_per_batch
1619 assert(!seq_per_batch_ || !moptions->inplace_update_support);
1620 if (!moptions->inplace_update_support) {
1621 ret_status =
1622 mem->Add(sequence_, value_type, key, value, kv_prot_info,
1623 concurrent_memtable_writes_, get_post_process_info(mem),
1624 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
1625 } else if (moptions->inplace_callback == nullptr) {
1626 assert(!concurrent_memtable_writes_);
1627 ret_status = mem->Update(sequence_, key, value, kv_prot_info);
1628 } else {
1629 assert(!concurrent_memtable_writes_);
1630 ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info);
1631 if (ret_status.IsNotFound()) {
1632 // key not found in memtable. Do sst get, update, add
1633 SnapshotImpl read_from_snapshot;
1634 read_from_snapshot.number_ = sequence_;
1635 ReadOptions ropts;
1636 // it's going to be overwritten for sure, so no point caching data block
1637 // containing the old version
1638 ropts.fill_cache = false;
1639 ropts.snapshot = &read_from_snapshot;
1640
1641 std::string prev_value;
1642 std::string merged_value;
1643
1644 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
1645 Status get_status = Status::NotSupported();
1646 if (db_ != nullptr && recovering_log_number_ == 0) {
1647 if (cf_handle == nullptr) {
1648 cf_handle = db_->DefaultColumnFamily();
1649 }
1650 get_status = db_->Get(ropts, cf_handle, key, &prev_value);
1651 }
1652 // Intentionally overwrites the `NotFound` in `ret_status`.
1653 if (!get_status.ok() && !get_status.IsNotFound()) {
1654 ret_status = get_status;
1655 } else {
1656 ret_status = Status::OK();
1657 }
1658 if (ret_status.ok()) {
1659 UpdateStatus update_status;
1660 char* prev_buffer = const_cast<char*>(prev_value.c_str());
1661 uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
1662 if (get_status.ok()) {
1663 update_status = moptions->inplace_callback(prev_buffer, &prev_size,
1664 value, &merged_value);
1665 } else {
1666 update_status = moptions->inplace_callback(
1667 nullptr /* existing_value */, nullptr /* existing_value_size */,
1668 value, &merged_value);
1669 }
1670 if (update_status == UpdateStatus::UPDATED_INPLACE) {
1671 assert(get_status.ok());
1672 if (kv_prot_info != nullptr) {
1673 ProtectionInfoKVOTS64 updated_kv_prot_info(*kv_prot_info);
1674 updated_kv_prot_info.UpdateV(value,
1675 Slice(prev_buffer, prev_size));
1676 // prev_value is updated in-place with final value.
1677 ret_status = mem->Add(sequence_, value_type, key,
1678 Slice(prev_buffer, prev_size),
1679 &updated_kv_prot_info);
1680 } else {
1681 ret_status = mem->Add(sequence_, value_type, key,
1682 Slice(prev_buffer, prev_size),
1683 nullptr /* kv_prot_info */);
1684 }
1685 if (ret_status.ok()) {
1686 RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
1687 }
1688 } else if (update_status == UpdateStatus::UPDATED) {
1689 if (kv_prot_info != nullptr) {
1690 ProtectionInfoKVOTS64 updated_kv_prot_info(*kv_prot_info);
1691 updated_kv_prot_info.UpdateV(value, merged_value);
1692 // merged_value contains the final value.
1693 ret_status = mem->Add(sequence_, value_type, key,
1694 Slice(merged_value), &updated_kv_prot_info);
1695 } else {
1696 // merged_value contains the final value.
1697 ret_status =
1698 mem->Add(sequence_, value_type, key, Slice(merged_value),
1699 nullptr /* kv_prot_info */);
1700 }
1701 if (ret_status.ok()) {
1702 RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
1703 }
1704 }
1705 }
1706 }
1707 }
1708 if (UNLIKELY(ret_status.IsTryAgain())) {
1709 assert(seq_per_batch_);
1710 const bool kBatchBoundary = true;
1711 MaybeAdvanceSeq(kBatchBoundary);
1712 } else if (ret_status.ok()) {
1713 MaybeAdvanceSeq();
1714 CheckMemtableFull();
1715 }
1716 // optimize for non-recovery mode
1717 // If `ret_status` is `TryAgain` then the next (successful) try will add
1718 // the key to the rebuilding transaction object. If `ret_status` is
1719 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
1720 // away. So we only need to add to it when `ret_status.ok()`.
1721 if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
1722 assert(!write_after_commit_);
1723 // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
1724 ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
1725 key, value);
1726 }
1727 return ret_status;
1728 }
1729
PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)1730 Status PutCF(uint32_t column_family_id, const Slice& key,
1731 const Slice& value) override {
1732 const auto* kv_prot_info = NextProtectionInfo();
1733 if (kv_prot_info != nullptr) {
1734 // Memtable needs seqno, doesn't need CF ID
1735 auto mem_kv_prot_info =
1736 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
1737 return PutCFImpl(column_family_id, key, value, kTypeValue,
1738 &mem_kv_prot_info);
1739 }
1740 return PutCFImpl(column_family_id, key, value, kTypeValue,
1741 nullptr /* kv_prot_info */);
1742 }
1743
DeleteImpl(uint32_t,const Slice & key,const Slice & value,ValueType delete_type,const ProtectionInfoKVOTS64 * kv_prot_info)1744 Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
1745 const Slice& value, ValueType delete_type,
1746 const ProtectionInfoKVOTS64* kv_prot_info) {
1747 Status ret_status;
1748 MemTable* mem = cf_mems_->GetMemTable();
1749 ret_status =
1750 mem->Add(sequence_, delete_type, key, value, kv_prot_info,
1751 concurrent_memtable_writes_, get_post_process_info(mem),
1752 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
1753 if (UNLIKELY(ret_status.IsTryAgain())) {
1754 assert(seq_per_batch_);
1755 const bool kBatchBoundary = true;
1756 MaybeAdvanceSeq(kBatchBoundary);
1757 } else if (ret_status.ok()) {
1758 MaybeAdvanceSeq();
1759 CheckMemtableFull();
1760 }
1761 return ret_status;
1762 }
1763
  // Handler callback for a Delete record: inserts a tombstone for `key` into
  // the current memtable. When the column family's user comparator carries a
  // timestamp, the tombstone is written as kTypeDeletionWithTimestamp.
  Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
      // else insert the values to the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
        ret_status =
            WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }

    // Select the tombstone type based on whether the CF comparator has a
    // non-zero timestamp size.
    ColumnFamilyData* cfd = cf_mems_->current();
    assert(!cfd || cfd->user_comparator());
    const size_t ts_sz = (cfd && cfd->user_comparator())
                             ? cfd->user_comparator()->timestamp_size()
                             : 0;
    const ValueType delete_type =
        (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      // The protection info was computed against kTypeDeletion; re-key it to
      // the tombstone type actually being written.
      mem_kv_prot_info.UpdateO(kTypeDeletion, delete_type);
      ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
                              &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
                              nullptr /* kv_prot_info */);
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      ret_status =
          WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
    }
    return ret_status;
  }
1821
SingleDeleteCF(uint32_t column_family_id,const Slice & key)1822 Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
1823 const auto* kv_prot_info = NextProtectionInfo();
1824 // optimize for non-recovery mode
1825 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
1826 // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
1827 return WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
1828 key);
1829 // else insert the values to the memtable right away
1830 }
1831
1832 Status ret_status;
1833 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
1834 if (ret_status.ok() && rebuilding_trx_ != nullptr) {
1835 assert(!write_after_commit_);
1836 // The CF is probably flushed and hence no need for insert but we still
1837 // need to keep track of the keys for upcoming rollback/commit.
1838 // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
1839 ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
1840 column_family_id, key);
1841 if (ret_status.ok()) {
1842 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
1843 }
1844 } else if (ret_status.ok()) {
1845 MaybeAdvanceSeq(false /* batch_boundary */);
1846 }
1847 return ret_status;
1848 }
1849 assert(ret_status.ok());
1850
1851 if (kv_prot_info != nullptr) {
1852 auto mem_kv_prot_info =
1853 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
1854 ret_status = DeleteImpl(column_family_id, key, Slice(),
1855 kTypeSingleDeletion, &mem_kv_prot_info);
1856 } else {
1857 ret_status = DeleteImpl(column_family_id, key, Slice(),
1858 kTypeSingleDeletion, nullptr /* kv_prot_info */);
1859 }
1860 // optimize for non-recovery mode
1861 // If `ret_status` is `TryAgain` then the next (successful) try will add
1862 // the key to the rebuilding transaction object. If `ret_status` is
1863 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
1864 // away. So we only need to add to it when `ret_status.ok()`.
1865 if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
1866 assert(!write_after_commit_);
1867 // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
1868 ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
1869 column_family_id, key);
1870 }
1871 return ret_status;
1872 }
1873
DeleteRangeCF(uint32_t column_family_id,const Slice & begin_key,const Slice & end_key)1874 Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
1875 const Slice& end_key) override {
1876 const auto* kv_prot_info = NextProtectionInfo();
1877 // optimize for non-recovery mode
1878 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
1879 // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
1880 return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
1881 begin_key, end_key);
1882 // else insert the values to the memtable right away
1883 }
1884
1885 Status ret_status;
1886 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
1887 if (ret_status.ok() && rebuilding_trx_ != nullptr) {
1888 assert(!write_after_commit_);
1889 // The CF is probably flushed and hence no need for insert but we still
1890 // need to keep track of the keys for upcoming rollback/commit.
1891 // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
1892 ret_status = WriteBatchInternal::DeleteRange(
1893 rebuilding_trx_, column_family_id, begin_key, end_key);
1894 if (ret_status.ok()) {
1895 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, begin_key));
1896 }
1897 } else if (ret_status.ok()) {
1898 MaybeAdvanceSeq(false /* batch_boundary */);
1899 }
1900 return ret_status;
1901 }
1902 assert(ret_status.ok());
1903
1904 if (db_ != nullptr) {
1905 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
1906 if (cf_handle == nullptr) {
1907 cf_handle = db_->DefaultColumnFamily();
1908 }
1909 auto* cfd =
1910 static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd();
1911 if (!cfd->is_delete_range_supported()) {
1912 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
1913 ret_status.PermitUncheckedError();
1914 return Status::NotSupported(
1915 std::string("DeleteRange not supported for table type ") +
1916 cfd->ioptions()->table_factory->Name() + " in CF " +
1917 cfd->GetName());
1918 }
1919 int cmp = cfd->user_comparator()->Compare(begin_key, end_key);
1920 if (cmp > 0) {
1921 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
1922 ret_status.PermitUncheckedError();
1923 // It's an empty range where endpoints appear mistaken. Don't bother
1924 // applying it to the DB, and return an error to the user.
1925 return Status::InvalidArgument("end key comes before start key");
1926 } else if (cmp == 0) {
1927 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
1928 ret_status.PermitUncheckedError();
1929 // It's an empty range. Don't bother applying it to the DB.
1930 return Status::OK();
1931 }
1932 }
1933
1934 if (kv_prot_info != nullptr) {
1935 auto mem_kv_prot_info =
1936 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
1937 ret_status = DeleteImpl(column_family_id, begin_key, end_key,
1938 kTypeRangeDeletion, &mem_kv_prot_info);
1939 } else {
1940 ret_status = DeleteImpl(column_family_id, begin_key, end_key,
1941 kTypeRangeDeletion, nullptr /* kv_prot_info */);
1942 }
1943 // optimize for non-recovery mode
1944 // If `ret_status` is `TryAgain` then the next (successful) try will add
1945 // the key to the rebuilding transaction object. If `ret_status` is
1946 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
1947 // away. So we only need to add to it when `ret_status.ok()`.
1948 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1949 assert(!write_after_commit_);
1950 // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
1951 ret_status = WriteBatchInternal::DeleteRange(
1952 rebuilding_trx_, column_family_id, begin_key, end_key);
1953 }
1954 return ret_status;
1955 }
1956
  // Handler callback for a Merge record. Normally appends the operand to the
  // memtable as kTypeMerge; when max_successive_merges is reached (and we are
  // not recovering), the merge is applied eagerly via Get + TimedFullMerge
  // and the result is written as a plain kTypeValue entry instead.
  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
                                       value);
      // else insert the values to the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
        ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
                                               column_family_id, key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    if (moptions->merge_operator == nullptr) {
      return Status::InvalidArgument(
          "Merge requires `ColumnFamilyOptions::merge_operator != nullptr`");
    }
    bool perform_merge = false;
    // The eager-merge path below is single-writer only, so
    // max_successive_merges must be 0 under concurrent memtable writes.
    assert(!concurrent_memtable_writes_ ||
           moptions->max_successive_merges == 0);

    // If we pass DB through and options.max_successive_merges is hit
    // during recovery, Get() will be issued which will try to acquire
    // DB mutex and cause deadlock, as DB mutex is already held.
    // So we disable merge in recovery
    if (moptions->max_successive_merges > 0 && db_ != nullptr &&
        recovering_log_number_ == 0) {
      assert(!concurrent_memtable_writes_);
      LookupKey lkey(key, sequence_);

      // Count the number of successive merges at the head
      // of the key in the memtable
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }

    if (perform_merge) {
      // 1) Get the existing value
      std::string get_value;

      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;

      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      Status get_status = db_->Get(read_options, cf_handle, key, &get_value);
      if (!get_status.ok()) {
        // Failed to read a key we know exists. Store the delta in memtable.
        perform_merge = false;
      } else {
        Slice get_value_slice = Slice(get_value);

        // 2) Apply this merge
        auto merge_operator = moptions->merge_operator;
        assert(merge_operator);

        std::string new_value;
        Status merge_status = MergeHelper::TimedFullMerge(
            merge_operator, key, &get_value_slice, {value}, &new_value,
            moptions->info_log, moptions->statistics,
            SystemClock::Default().get());

        if (!merge_status.ok()) {
          // Failed to merge!
          // Store the delta in memtable
          perform_merge = false;
        } else {
          // 3) Add value to memtable
          assert(!concurrent_memtable_writes_);
          if (kv_prot_info != nullptr) {
            auto merged_kv_prot_info =
                kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
            // The stored entry is the merged value, re-typed as a plain Put.
            merged_kv_prot_info.UpdateV(value, new_value);
            merged_kv_prot_info.UpdateO(kTypeMerge, kTypeValue);
            ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
                                  &merged_kv_prot_info);
          } else {
            ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
                                  nullptr /* kv_prot_info */);
          }
        }
      }
    }

    if (!perform_merge) {
      assert(ret_status.ok());
      // Add merge operand to memtable
      if (kv_prot_info != nullptr) {
        auto mem_kv_prot_info =
            kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
        ret_status =
            mem->Add(sequence_, kTypeMerge, key, value, &mem_kv_prot_info,
                     concurrent_memtable_writes_, get_post_process_info(mem));
      } else {
        ret_status = mem->Add(
            sequence_, kTypeMerge, key, value, nullptr /* kv_prot_info */,
            concurrent_memtable_writes_, get_post_process_info(mem));
      }
    }

    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOTS64`.
      ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
                                             key, value);
    }
    return ret_status;
  }
2106
PutBlobIndexCF(uint32_t column_family_id,const Slice & key,const Slice & value)2107 Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
2108 const Slice& value) override {
2109 const auto* kv_prot_info = NextProtectionInfo();
2110 if (kv_prot_info != nullptr) {
2111 // Memtable needs seqno, doesn't need CF ID
2112 auto mem_kv_prot_info =
2113 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
2114 // Same as PutCF except for value type.
2115 return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
2116 &mem_kv_prot_info);
2117 } else {
2118 return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
2119 nullptr /* kv_prot_info */);
2120 }
2121 }
2122
CheckMemtableFull()2123 void CheckMemtableFull() {
2124 if (flush_scheduler_ != nullptr) {
2125 auto* cfd = cf_mems_->current();
2126 assert(cfd != nullptr);
2127 if (cfd->mem()->ShouldScheduleFlush() &&
2128 cfd->mem()->MarkFlushScheduled()) {
2129 // MarkFlushScheduled only returns true if we are the one that
2130 // should take action, so no need to dedup further
2131 flush_scheduler_->ScheduleWork(cfd);
2132 }
2133 }
2134 // check if memtable_list size exceeds max_write_buffer_size_to_maintain
2135 if (trim_history_scheduler_ != nullptr) {
2136 auto* cfd = cf_mems_->current();
2137
2138 assert(cfd);
2139 assert(cfd->ioptions());
2140
2141 const size_t size_to_maintain = static_cast<size_t>(
2142 cfd->ioptions()->max_write_buffer_size_to_maintain);
2143
2144 if (size_to_maintain > 0) {
2145 MemTableList* const imm = cfd->imm();
2146 assert(imm);
2147
2148 if (imm->HasHistory()) {
2149 const MemTable* const mem = cfd->mem();
2150 assert(mem);
2151
2152 if (mem->ApproximateMemoryUsageFast() +
2153 imm->ApproximateMemoryUsageExcludingLast() >=
2154 size_to_maintain &&
2155 imm->MarkTrimHistoryNeeded()) {
2156 trim_history_scheduler_->ScheduleWork(cfd);
2157 }
2158 }
2159 }
2160 }
2161 }
2162
2163 // The write batch handler calls MarkBeginPrepare with unprepare set to true
2164 // if it encounters the kTypeBeginUnprepareXID marker.
MarkBeginPrepare(bool unprepare)2165 Status MarkBeginPrepare(bool unprepare) override {
2166 assert(rebuilding_trx_ == nullptr);
2167 assert(db_);
2168
2169 if (recovering_log_number_ != 0) {
2170 // during recovery we rebuild a hollow transaction
2171 // from all encountered prepare sections of the wal
2172 if (db_->allow_2pc() == false) {
2173 return Status::NotSupported(
2174 "WAL contains prepared transactions. Open with "
2175 "TransactionDB::Open().");
2176 }
2177
2178 // we are now iterating through a prepared section
2179 rebuilding_trx_ = new WriteBatch();
2180 rebuilding_trx_seq_ = sequence_;
2181 // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
2182 // unprepared_batch_ should be false because it is false by default, and
2183 // gets reset to false in MarkEndPrepare.
2184 assert(!unprepared_batch_);
2185 unprepared_batch_ = unprepare;
2186
2187 if (has_valid_writes_ != nullptr) {
2188 *has_valid_writes_ = true;
2189 }
2190 }
2191
2192 return Status::OK();
2193 }
2194
MarkEndPrepare(const Slice & name)2195 Status MarkEndPrepare(const Slice& name) override {
2196 assert(db_);
2197 assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));
2198
2199 if (recovering_log_number_ != 0) {
2200 assert(db_->allow_2pc());
2201 size_t batch_cnt =
2202 write_after_commit_
2203 ? 0 // 0 will disable further checks
2204 : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
2205 db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
2206 rebuilding_trx_, rebuilding_trx_seq_,
2207 batch_cnt, unprepared_batch_);
2208 unprepared_batch_ = false;
2209 rebuilding_trx_ = nullptr;
2210 } else {
2211 assert(rebuilding_trx_ == nullptr);
2212 }
2213 const bool batch_boundry = true;
2214 MaybeAdvanceSeq(batch_boundry);
2215
2216 return Status::OK();
2217 }
2218
MarkNoop(bool empty_batch)2219 Status MarkNoop(bool empty_batch) override {
2220 // A hack in pessimistic transaction could result into a noop at the start
2221 // of the write batch, that should be ignored.
2222 if (!empty_batch) {
2223 // In the absence of Prepare markers, a kTypeNoop tag indicates the end of
2224 // a batch. This happens when write batch commits skipping the prepare
2225 // phase.
2226 const bool batch_boundry = true;
2227 MaybeAdvanceSeq(batch_boundry);
2228 }
2229 return Status::OK();
2230 }
2231
  // Handles a commit marker. During recovery, looks up the rebuilt
  // transaction by XID and -- in write-after-commit mode -- replays its batch
  // through this inserter (otherwise the values were already inserted at
  // prepare time). In all cases a batch boundary is recorded.
  Status MarkCommit(const Slice& name) override {
    assert(db_);

    Status s;

    if (recovering_log_number_ != 0) {
      // in recovery when we encounter a commit marker
      // we lookup this transaction in our set of rebuilt transactions
      // and commit.
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // the log containing the prepared section may have
      // been released in the last incarnation because the
      // data was flushed to L0
      if (trx != nullptr) {
        // at this point individual CF lognumbers will prevent
        // duplicate re-insertion of values.
        assert(log_number_ref_ == 0);
        if (write_after_commit_) {
          // write_after_commit_ can only have one batch in trx.
          assert(trx->batches_.size() == 1);
          const auto& batch_info = trx->batches_.begin()->second;
          // all inserts must reference this trx log number
          log_number_ref_ = batch_info.log_number_;
          // Re-enter this handler to replay the prepared batch's records.
          s = batch_info.batch_->Iterate(this);
          log_number_ref_ = 0;
        }
        // else the values are already inserted before the commit

        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_ != nullptr) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // When writes are not delayed until commit, there is no disconnect
      // between a memtable write and the WAL that supports it. So the commit
      // need not reference any log as the only log to which it depends.
      assert(!write_after_commit_ || log_number_ref_ > 0);
    }
    const bool batch_boundry = true;
    MaybeAdvanceSeq(batch_boundry);

    return s;
  }
2279
MarkRollback(const Slice & name)2280 Status MarkRollback(const Slice& name) override {
2281 assert(db_);
2282
2283 if (recovering_log_number_ != 0) {
2284 auto trx = db_->GetRecoveredTransaction(name.ToString());
2285
2286 // the log containing the transactions prep section
2287 // may have been released in the previous incarnation
2288 // because we knew it had been rolled back
2289 if (trx != nullptr) {
2290 db_->DeleteRecoveredTransaction(name.ToString());
2291 }
2292 } else {
2293 // in non recovery we simply ignore this tag
2294 }
2295
2296 const bool batch_boundry = true;
2297 MaybeAdvanceSeq(batch_boundry);
2298
2299 return Status::OK();
2300 }
2301
2302 private:
get_post_process_info(MemTable * mem)2303 MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
2304 if (!concurrent_memtable_writes_) {
2305 // No need to batch counters locally if we don't use concurrent mode.
2306 return nullptr;
2307 }
2308 return &GetPostMap()[mem];
2309 }
2310 };
2311
2312 // This function can only be called in these conditions:
2313 // 1) During Recovery()
2314 // 2) During Write(), in a single-threaded write thread
2315 // 3) During Write(), in a concurrent context where memtables has been cloned
2316 // The reason is that it calls memtables->Seek(), which has a stateful cache
InsertInto(WriteThread::WriteGroup & write_group,SequenceNumber sequence,ColumnFamilyMemTables * memtables,FlushScheduler * flush_scheduler,TrimHistoryScheduler * trim_history_scheduler,bool ignore_missing_column_families,uint64_t recovery_log_number,DB * db,bool concurrent_memtable_writes,bool seq_per_batch,bool batch_per_txn)2317 Status WriteBatchInternal::InsertInto(
2318 WriteThread::WriteGroup& write_group, SequenceNumber sequence,
2319 ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
2320 TrimHistoryScheduler* trim_history_scheduler,
2321 bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
2322 bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
2323 MemTableInserter inserter(
2324 sequence, memtables, flush_scheduler, trim_history_scheduler,
2325 ignore_missing_column_families, recovery_log_number, db,
2326 concurrent_memtable_writes, nullptr /* prot_info */,
2327 nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
2328 for (auto w : write_group) {
2329 if (w->CallbackFailed()) {
2330 continue;
2331 }
2332 w->sequence = inserter.sequence();
2333 if (!w->ShouldWriteToMemtable()) {
2334 // In seq_per_batch_ mode this advances the seq by one.
2335 inserter.MaybeAdvanceSeq(true);
2336 continue;
2337 }
2338 SetSequence(w->batch, inserter.sequence());
2339 inserter.set_log_number_ref(w->log_ref);
2340 inserter.set_prot_info(w->batch->prot_info_.get());
2341 w->status = w->batch->Iterate(&inserter);
2342 if (!w->status.ok()) {
2343 return w->status;
2344 }
2345 assert(!seq_per_batch || w->batch_cnt != 0);
2346 assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
2347 }
2348 return Status::OK();
2349 }
2350
InsertInto(WriteThread::Writer * writer,SequenceNumber sequence,ColumnFamilyMemTables * memtables,FlushScheduler * flush_scheduler,TrimHistoryScheduler * trim_history_scheduler,bool ignore_missing_column_families,uint64_t log_number,DB * db,bool concurrent_memtable_writes,bool seq_per_batch,size_t batch_cnt,bool batch_per_txn,bool hint_per_batch)2351 Status WriteBatchInternal::InsertInto(
2352 WriteThread::Writer* writer, SequenceNumber sequence,
2353 ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
2354 TrimHistoryScheduler* trim_history_scheduler,
2355 bool ignore_missing_column_families, uint64_t log_number, DB* db,
2356 bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
2357 bool batch_per_txn, bool hint_per_batch) {
2358 #ifdef NDEBUG
2359 (void)batch_cnt;
2360 #endif
2361 assert(writer->ShouldWriteToMemtable());
2362 MemTableInserter inserter(sequence, memtables, flush_scheduler,
2363 trim_history_scheduler,
2364 ignore_missing_column_families, log_number, db,
2365 concurrent_memtable_writes, nullptr /* prot_info */,
2366 nullptr /*has_valid_writes*/, seq_per_batch,
2367 batch_per_txn, hint_per_batch);
2368 SetSequence(writer->batch, sequence);
2369 inserter.set_log_number_ref(writer->log_ref);
2370 inserter.set_prot_info(writer->batch->prot_info_.get());
2371 Status s = writer->batch->Iterate(&inserter);
2372 assert(!seq_per_batch || batch_cnt != 0);
2373 assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
2374 if (concurrent_memtable_writes) {
2375 inserter.PostProcess();
2376 }
2377 return s;
2378 }
2379
InsertInto(const WriteBatch * batch,ColumnFamilyMemTables * memtables,FlushScheduler * flush_scheduler,TrimHistoryScheduler * trim_history_scheduler,bool ignore_missing_column_families,uint64_t log_number,DB * db,bool concurrent_memtable_writes,SequenceNumber * next_seq,bool * has_valid_writes,bool seq_per_batch,bool batch_per_txn)2380 Status WriteBatchInternal::InsertInto(
2381 const WriteBatch* batch, ColumnFamilyMemTables* memtables,
2382 FlushScheduler* flush_scheduler,
2383 TrimHistoryScheduler* trim_history_scheduler,
2384 bool ignore_missing_column_families, uint64_t log_number, DB* db,
2385 bool concurrent_memtable_writes, SequenceNumber* next_seq,
2386 bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
2387 MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
2388 trim_history_scheduler,
2389 ignore_missing_column_families, log_number, db,
2390 concurrent_memtable_writes, batch->prot_info_.get(),
2391 has_valid_writes, seq_per_batch, batch_per_txn);
2392 Status s = batch->Iterate(&inserter);
2393 if (next_seq != nullptr) {
2394 *next_seq = inserter.sequence();
2395 }
2396 if (concurrent_memtable_writes) {
2397 inserter.PostProcess();
2398 }
2399 return s;
2400 }
2401
// Overwrites `b`'s entire serialized representation (header included) with
// `contents`. The content flags are reset to DEFERRED so they can be
// recomputed from the new representation.
Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= WriteBatchInternal::kHeader);
  // Existing per-key protection info cannot describe arbitrary new contents.
  assert(b->prot_info_ == nullptr);
  b->rep_.assign(contents.data(), contents.size());
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
  return Status::OK();
}
2409
// Appends `src`'s records (its header excluded) to `dst`. When `wal_only` is
// set and `src` has a WAL termination point, only the prefix up to that point
// is appended. Per-key protection info entries are carried over when tracked.
Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
                                  const bool wal_only) {
  // If `dst` already has entries, both batches must agree on whether
  // protection info is tracked.
  assert(dst->Count() == 0 ||
         (dst->prot_info_ == nullptr) == (src->prot_info_ == nullptr));
  size_t src_len;
  int src_count;
  uint32_t src_flags;

  const SavePoint& batch_end = src->GetWalTerminationPoint();

  if (wal_only && !batch_end.is_cleared()) {
    // Copy only up to the WAL termination point.
    src_len = batch_end.size - WriteBatchInternal::kHeader;
    src_count = batch_end.count;
    src_flags = batch_end.content_flags;
  } else {
    // Copy the whole source batch.
    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
    src_count = Count(src);
    src_flags = src->content_flags_.load(std::memory_order_relaxed);
  }

  if (dst->prot_info_ != nullptr) {
    // Append protection info for exactly the records being copied.
    std::copy(src->prot_info_->entries_.begin(),
              src->prot_info_->entries_.begin() + src_count,
              std::back_inserter(dst->prot_info_->entries_));
  } else if (src->prot_info_ != nullptr) {
    // Per the assert above, `dst` is empty here, so adopt a copy of `src`'s
    // protection info wholesale.
    dst->prot_info_.reset(new WriteBatch::ProtectionInfo(*src->prot_info_));
  }
  SetCount(dst, Count(dst) + src_count);
  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  // Skip `src`'s header; `dst` already carries one.
  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
  // Merge the content-type flags of the two batches.
  dst->content_flags_.store(
      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
      std::memory_order_relaxed);
  return Status::OK();
}
2445
AppendedByteSize(size_t leftByteSize,size_t rightByteSize)2446 size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
2447 size_t rightByteSize) {
2448 if (leftByteSize == 0 || rightByteSize == 0) {
2449 return leftByteSize + rightByteSize;
2450 } else {
2451 return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
2452 }
2453 }
2454
2455 } // namespace ROCKSDB_NAMESPACE
2456