// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeDeletion varstring
//    kTypeSingleDeletion varstring
//    kTypeRangeDeletion varstring varstring
//    kTypeMerge varstring varstring
//    kTypeColumnFamilyValue varint32 varstring varstring
//    kTypeColumnFamilyDeletion varint32 varstring
//    kTypeColumnFamilySingleDeletion varint32 varstring
//    kTypeColumnFamilyRangeDeletion varint32 varstring varstring
//    kTypeColumnFamilyMerge varint32 varstring varstring
//    kTypeBeginPrepareXID varstring
//    kTypeEndPrepareXID
//    kTypeCommitXID varstring
//    kTypeRollbackXID varstring
//    kTypeBeginPersistedPrepareXID varstring
//    kTypeBeginUnprepareXID varstring
//    kTypeNoop
// varstring :=
//    len: varint32
//    data: uint8[len]
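//
// Illustrative example (derived from the grammar above, not normative): a
// batch holding a single Put of ("key", "val") in the default column family
// serializes as
//    [ sequence: 8 bytes ][ count: 4 bytes = 1 ]
//    [ kTypeValue ][ len=3 ]["key"][ len=3 ]["val"]
// i.e. the 12-byte header (WriteBatchInternal::kHeader) followed by one
// record.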

#include "rocksdb/write_batch.h"

#include <map>
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/trim_history_scheduler.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "rocksdb/merge_operator.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"
#include "util/util.h"

namespace ROCKSDB_NAMESPACE {

// anon namespace for file-local types
namespace {

enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,
  HAS_PUT = 1 << 1,
  HAS_DELETE = 1 << 2,
  HAS_SINGLE_DELETE = 1 << 3,
  HAS_MERGE = 1 << 4,
  HAS_BEGIN_PREPARE = 1 << 5,
  HAS_END_PREPARE = 1 << 6,
  HAS_COMMIT = 1 << 7,
  HAS_ROLLBACK = 1 << 8,
  HAS_DELETE_RANGE = 1 << 9,
  HAS_BLOB_INDEX = 1 << 10,
  HAS_BEGIN_UNPREPARE = 1 << 11,
};

struct BatchContentClassifier : public WriteBatch::Handler {
  uint32_t content_flags = 0;

  Status PutCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_PUT;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE_RANGE;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_MERGE;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_BLOB_INDEX;
    return Status::OK();
  }

  Status MarkBeginPrepare(bool unprepare) override {
    content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
    if (unprepare) {
      content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
    }
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    content_flags |= ContentFlags::HAS_END_PREPARE;
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    content_flags |= ContentFlags::HAS_ROLLBACK;
    return Status::OK();
  }
};

class TimestampAssigner : public WriteBatch::Handler {
 public:
  explicit TimestampAssigner(const Slice& ts)
      : timestamp_(ts), timestamps_(kEmptyTimestampList) {}
  explicit TimestampAssigner(const std::vector<Slice>& ts_list)
      : timestamps_(ts_list) {
    SanityCheck();
  }
  ~TimestampAssigner() override {}

  Status PutCF(uint32_t, const Slice& key, const Slice&) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice& key) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice& key) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice& begin_key,
                       const Slice& end_key) override {
    AssignTimestamp(begin_key);
    AssignTimestamp(end_key);
    ++idx_;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
    AssignTimestamp(key);
    ++idx_;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    // TODO (yanqin): support blob db in the future.
    return Status::OK();
  }

  Status MarkBeginPrepare(bool) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    // TODO (yanqin): support in the future.
    return Status::OK();
  }

 private:
  void SanityCheck() const {
    assert(!timestamps_.empty());
#ifndef NDEBUG
    const size_t ts_sz = timestamps_[0].size();
    for (size_t i = 1; i != timestamps_.size(); ++i) {
      assert(ts_sz == timestamps_[i].size());
    }
#endif  // !NDEBUG
  }

  void AssignTimestamp(const Slice& key) {
    assert(timestamps_.empty() || idx_ < timestamps_.size());
    const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
    size_t ts_sz = ts.size();
    char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
    memcpy(ptr, ts.data(), ts_sz);
  }

  static const std::vector<Slice> kEmptyTimestampList;
  const Slice timestamp_;
  const std::vector<Slice>& timestamps_;
  size_t idx_ = 0;

  // No copy or move.
  TimestampAssigner(const TimestampAssigner&) = delete;
  TimestampAssigner(TimestampAssigner&&) = delete;
  TimestampAssigner& operator=(const TimestampAssigner&) = delete;
  TimestampAssigner&& operator=(TimestampAssigner&&) = delete;
};
const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;

}  // anon namespace

struct SavePoints {
  std::stack<SavePoint, autovector<SavePoint>> stack;
};

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) {
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}
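
// Illustrative note (a sketch, not asserted by this file): even an empty
// batch carries the header reserved above, so for a default-constructed
// batch
//
//   WriteBatch b;
//   assert(b.GetDataSize() == WriteBatchInternal::kHeader);  // 12 bytes
//   assert(b.Count() == 0);  // count field is zero-initialized
//
// holds before any Put/Delete/Merge is appended.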

WriteBatch::WriteBatch(const std::string& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(rep),
      timestamp_size_(0) {}

WriteBatch::WriteBatch(std::string&& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(std::move(rep)),
      timestamp_size_(0) {}

WriteBatch::WriteBatch(const WriteBatch& src)
    : wal_term_point_(src.wal_term_point_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(src.rep_),
      timestamp_size_(src.timestamp_size_) {
  if (src.save_points_ != nullptr) {
    save_points_.reset(new SavePoints());
    save_points_->stack = src.save_points_->stack;
  }
}

WriteBatch::WriteBatch(WriteBatch&& src) noexcept
    : save_points_(std::move(src.save_points_)),
      wal_term_point_(std::move(src.wal_term_point_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(std::move(src.rep_)),
      timestamp_size_(src.timestamp_size_) {}

WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}

WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}

WriteBatch::~WriteBatch() {}

WriteBatch::Handler::~Handler() {}

void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

bool WriteBatch::Handler::Continue() {
  return true;
}
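
// Illustrative sketch (not part of this file; the names below are
// hypothetical): a minimal user-defined Handler that tallies Puts and stops
// early. Iterate() consults Continue() before decoding each record.
//
//   class PutCounter : public WriteBatch::Handler {
//    public:
//     size_t puts = 0;
//     Status PutCF(uint32_t, const Slice&, const Slice&) override {
//       ++puts;
//       return Status::OK();
//     }
//     bool Continue() override { return puts < 100; }  // stop after 100 Puts
//   };
//
//   PutCounter counter;
//   Status s = batch.Iterate(&counter);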

void WriteBatch::Clear() {
  rep_.clear();
  rep_.resize(WriteBatchInternal::kHeader);

  content_flags_.store(0, std::memory_order_relaxed);

  if (save_points_ != nullptr) {
    while (!save_points_->stack.empty()) {
      save_points_->stack.pop();
    }
  }

  wal_term_point_.clear();
}

uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }

uint32_t WriteBatch::ComputeContentFlags() const {
  auto rv = content_flags_.load(std::memory_order_relaxed);
  if ((rv & ContentFlags::DEFERRED) != 0) {
    BatchContentClassifier classifier;
    Iterate(&classifier);
    rv = classifier.content_flags;

    // this method is conceptually const, because it is performing a lazy
    // computation that doesn't affect the abstract state of the batch.
    // content_flags_ is marked mutable so that we can perform the
    // following assignment
    content_flags_.store(rv, std::memory_order_relaxed);
  }
  return rv;
}
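
// Illustrative sketch of the lazy classification above (assumes the
// string-based constructor earlier in this file; serialized_rep is a
// hypothetical variable):
//
//   WriteBatch b(serialized_rep);  // content_flags_ starts as DEFERRED
//   bool p = b.HasPut();           // first query runs BatchContentClassifier
//   bool d = b.HasDelete();        // answered from the cached flags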

void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_;
}

bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasDeleteRange() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}

bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
  assert(input != nullptr && key != nullptr);
  // Skip tag byte
  input->remove_prefix(1);

  if (cf_record) {
    // Skip column_family bytes
    uint32_t cf;
    if (!GetVarint32(input, &cf)) {
      return false;
    }
  }

  // Extract key
  return GetLengthPrefixedSlice(input, key);
}

bool WriteBatch::HasBeginPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}

bool WriteBatch::HasEndPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}

bool WriteBatch::HasCommit() const {
  return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}

bool WriteBatch::HasRollback() const {
  return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}

Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid) {
  assert(key != nullptr && value != nullptr);
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;  // default
  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      FALLTHROUGH_INTENDED;
    case kTypeValue:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      FALLTHROUGH_INTENDED;
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;
    case kTypeColumnFamilyRangeDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      FALLTHROUGH_INTENDED;
    case kTypeRangeDeletion:
      // for range delete, "key" is begin_key, "value" is end_key
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      break;
    case kTypeColumnFamilyMerge:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      FALLTHROUGH_INTENDED;
    case kTypeMerge:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;
    case kTypeColumnFamilyBlobIndex:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      FALLTHROUGH_INTENDED;
    case kTypeBlobIndex:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      break;
    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
    // This indicates that the prepared batch is also persisted in the db.
    // This is used in WritePreparedTxn
    case kTypeBeginPersistedPrepareXID:
    // This is used in WriteUnpreparedTxn
    case kTypeBeginUnprepareXID:
      break;
    case kTypeEndPrepareXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;
    case kTypeCommitXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Commit XID");
      }
      break;
    case kTypeRollbackXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Rollback XID");
      }
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}

Status WriteBatch::Iterate(Handler* handler) const {
  if (rep_.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
                                     rep_.size());
}

Status WriteBatchInternal::Iterate(const WriteBatch* wb,
                                   WriteBatch::Handler* handler, size_t begin,
                                   size_t end) {
  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
    return Status::Corruption("Invalid start/end bounds for Iterate");
  }
  assert(begin <= end);
  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
  bool whole_batch =
      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());

  Slice key, value, blob, xid;
  // Sometimes a sub-batch starts with a Noop. We do not want to count such
  // Noops as batch boundary markers; otherwise we would miscount the number
  // of sub-batches. We do that by checking whether the accumulated batch is
  // empty before seeing the next Noop.
  bool empty_batch = true;
  uint32_t found = 0;
  Status s;
  char tag = 0;
  uint32_t column_family = 0;  // default
  bool last_was_try_again = false;
  bool handler_continue = true;
  while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
    handler_continue = handler->Continue();
    if (!handler_continue) {
      break;
    }

    if (LIKELY(!s.IsTryAgain())) {
      last_was_try_again = false;
      tag = 0;
      column_family = 0;  // default

      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid);
      if (!s.ok()) {
        return s;
      }
    } else {
      assert(s.IsTryAgain());
      assert(!last_was_try_again);  // to detect infinite loop bugs
      if (UNLIKELY(last_was_try_again)) {
        return Status::Corruption(
            "two consecutive TryAgain in WriteBatch handler; this is either a "
            "software bug or data corruption.");
      }
      last_was_try_again = true;
      s = Status::OK();
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        s = handler->DeleteRangeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
        s = handler->PutBlobIndexCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          found++;
        }
        break;
      case kTypeLogData:
        handler->LogData(blob);
        // A batch might have nothing but LogData. It is still a batch.
        empty_batch = false;
        break;
      case kTypeBeginPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        handler->MarkBeginPrepare();
        empty_batch = false;
        if (!handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_after_commit_ is disabled (in "
              "WritePrepared/WriteUnprepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_before_prepare_ is enabled "
              "(in WriteUnprepared mode). If it is not due to corruption, the "
              "WAL must be emptied before changing the WritePolicy.");
        }
        break;
      case kTypeBeginPersistedPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        handler->MarkBeginPrepare();
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
              "is enabled (in default WriteCommitted mode). If it is not due "
              "to corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeBeginUnprepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
        handler->MarkBeginPrepare(true /* unprepared */);
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
              "default WriteCommitted mode). If it is not due to corruption, "
              "the WAL must be emptied before changing the WritePolicy.");
        }
        if (!handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
              "(in WriteCommitted/WritePrepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeEndPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        handler->MarkEndPrepare(xid);
        empty_batch = true;
        break;
      case kTypeCommitXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        handler->MarkCommit(xid);
        empty_batch = true;
        break;
      case kTypeRollbackXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        handler->MarkRollback(xid);
        empty_batch = true;
        break;
      case kTypeNoop:
        handler->MarkNoop(empty_batch);
        empty_batch = true;
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (handler_continue && whole_batch &&
      found != WriteBatchInternal::Count(wb)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  return b->is_latest_persistent_state_;
}

void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
  b->is_latest_persistent_state_ = true;
}

uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  EncodeFixed32(&b->rep_[8], n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  return WriteBatchInternal::kHeader;
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSlice(&b->rep_, key);
  } else {
    PutVarint32(&b->rep_,
                static_cast<uint32_t>(key.size() + b->timestamp_size_));
    b->rep_.append(key.data(), key.size());
    b->rep_.append(b->timestamp_size_, '\0');
  }
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  return save.commit();
}
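
// Illustrative sketch of the timestamp-aware encoding above (assuming a
// batch constructed with ts_sz = 8): a Put of key "k" appends the
// length-prefixed key followed by 8 zero bytes of timestamp padding, i.e.
//
//   [ varint32 len = 1 + 8 ][ 'k' ][ 8 x '\0' ]
//
// The padding is later overwritten in place by TimestampAssigner (see
// WriteBatch::AssignTimestamp below).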

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}

Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
                                                 const SliceParts& value) {
  size_t total_key_bytes = 0;
  for (int i = 0; i < key.num_parts; ++i) {
    total_key_bytes += key.parts[i].size();
  }
  if (total_key_bytes >= size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }

  size_t total_value_bytes = 0;
  for (int i = 0; i < value.num_parts; ++i) {
    total_value_bytes += value.parts[i].size();
  }
  if (total_value_bytes >= size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }
  return Status::OK();
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key, const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  if (0 == b->timestamp_size_) {
    PutLengthPrefixedSliceParts(&b->rep_, key);
  } else {
    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
  }
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                       const SliceParts& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}

Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  b->rep_.push_back(static_cast<char>(kTypeNoop));
  return Status::OK();
}

Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
                                          bool write_after_commit,
                                          bool unprepared_batch) {
  // a manually constructed batch can only contain one prepare section
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    while (!b->save_points_->stack.empty()) {
      b->save_points_->stack.pop();
    }
  }

  // rewrite noop as begin marker
  b->rep_[12] = static_cast<char>(
      write_after_commit ? kTypeBeginPrepareXID
                         : (unprepared_batch ? kTypeBeginUnprepareXID
                                             : kTypeBeginPersistedPrepareXID));
  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  if (unprepared_batch) {
    b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                                ContentFlags::HAS_BEGIN_UNPREPARE,
                            std::memory_order_relaxed);
  }
  return Status::OK();
}
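
// Illustrative layout after MarkEndPrepare (a sketch derived from the code
// above, assuming InsertNoop was called right after the 12-byte header):
//
//   [ header: 12 bytes ][ kTypeBegin*PrepareXID (rewritten Noop, offset 12) ]
//   [ ...data records... ][ kTypeEndPrepareXID ][ varstring xid ]
//
// This is why a manually constructed batch can hold only one prepare
// section: there is exactly one Noop slot at offset 12 to rewrite.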

Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeCommitXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_ROLLBACK,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
                          const Slice& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
                          const SliceParts& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const SliceParts& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const Slice& begin_key,
                                       const Slice& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, begin_key);
  PutLengthPrefixedSlice(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const SliceParts& begin_key,
                               const SliceParts& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
                         const SliceParts& key, const SliceParts& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}

Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key, const Slice& value) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BLOB_INDEX,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::PutLogData(const Slice& blob) {
  LocalSavePoint save(this);
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
  return save.commit();
}
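
// Illustrative usage of PutLogData (a sketch, not from this file): log-only
// payloads reach the WAL but are never applied to memtables, and are
// surfaced again only through Handler::LogData during iteration/replay:
//
//   WriteBatch b;
//   b.Put("k", "v");
//   b.PutLogData("replication-metadata");  // not counted in b.Count()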

void WriteBatch::SetSavePoint() {
  if (save_points_ == nullptr) {
    save_points_.reset(new SavePoints());
  }
  // Record length and count of current batch of writes.
  save_points_->stack.push(SavePoint(
      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
}

Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  SavePoint savepoint = save_points_->stack.top();
  save_points_->stack.pop();

  assert(savepoint.size <= rep_.size());
  assert(static_cast<uint32_t>(savepoint.count) <= Count());

  if (savepoint.size == rep_.size()) {
    // No changes to rollback
  } else if (savepoint.size == 0) {
    // Rollback everything
    Clear();
  } else {
    rep_.resize(savepoint.size);
    WriteBatchInternal::SetCount(this, savepoint.count);
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  }

  return Status::OK();
}

Status WriteBatch::PopSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  save_points_->stack.pop();

  return Status::OK();
}
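
// Illustrative savepoint usage (a sketch; mirrors the semantics implemented
// above):
//
//   WriteBatch b;
//   b.Put("a", "1");
//   b.SetSavePoint();             // records size/count/flags
//   b.Put("b", "2");
//   b.RollbackToSavePoint();      // truncates rep_; the Put of "b" is gone
//   assert(b.Count() == 1);
//   Status s = b.PopSavePoint();  // NotFound: the stack is empty again
//   assert(s.IsNotFound());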

Status WriteBatch::AssignTimestamp(const Slice& ts) {
  TimestampAssigner ts_assigner(ts);
  return Iterate(&ts_assigner);
}

Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
  TimestampAssigner ts_assigner(ts_list);
  return Iterate(&ts_assigner);
}
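
// Illustrative sketch (assumes a batch created with a nonzero ts_sz, so that
// Put reserved trailing timestamp padding for every key; see
// WriteBatchInternal::Put above):
//
//   WriteBatch b(/*reserved_bytes=*/0, /*max_bytes=*/0, /*ts_sz=*/8);
//   b.Put("key1", "v1");               // "key1" + 8 bytes of '\0' padding
//   b.Put("key2", "v2");
//   std::string ts(8, '\x01');
//   Status s = b.AssignTimestamp(ts);  // overwrites the padding of both keys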

class MemTableInserter : public WriteBatch::Handler {

  SequenceNumber sequence_;
  ColumnFamilyMemTables* const cf_mems_;
  FlushScheduler* const flush_scheduler_;
  TrimHistoryScheduler* const trim_history_scheduler_;
  const bool ignore_missing_column_families_;
  const uint64_t recovering_log_number_;
  // log number that all Memtables inserted into should reference
  uint64_t log_number_ref_;
  DBImpl* db_;
  const bool concurrent_memtable_writes_;
  bool post_info_created_;

  bool* has_valid_writes_;
  // On some (!) platforms just default-constructing
  // a map is too expensive in the Write() path as it
  // causes memory allocations even when the map goes unused.
  // Make creation optional but do not incur
  // the additional allocation of a std::unique_ptr.
  using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
  using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
  PostMapType mem_post_info_map_;
  // current recovered transaction we are rebuilding (recovery)
  WriteBatch* rebuilding_trx_;
  SequenceNumber rebuilding_trx_seq_;
  // If true, increase the seq number once per write batch; otherwise,
  // increase it once per key.
  bool seq_per_batch_;
  // Whether the memtable write will be done only after the commit
  bool write_after_commit_;
  // Whether memtable write can be done before prepare
  bool write_before_prepare_;
  // Whether this batch was unprepared or not
  bool unprepared_batch_;
  using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
  DupDetector duplicate_detector_;
  bool dup_dectector_on_;

  bool hint_per_batch_;
  bool hint_created_;
  // Hints for this batch
  using HintMap = std::unordered_map<MemTable*, void*>;
  using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
  HintMapType hint_;

  HintMap& GetHintMap() {
    assert(hint_per_batch_);
    if (!hint_created_) {
      new (&hint_) HintMap();
      hint_created_ = true;
    }
    return *reinterpret_cast<HintMap*>(&hint_);
  }

  MemPostInfoMap& GetPostMap() {
    assert(concurrent_memtable_writes_);
    if (!post_info_created_) {
      new (&mem_post_info_map_) MemPostInfoMap();
      post_info_created_ = true;
    }
    return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
  }

  bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
    assert(!write_after_commit_);
    assert(rebuilding_trx_ != nullptr);
    if (!dup_dectector_on_) {
      new (&duplicate_detector_) DuplicateDetector(db_);
      dup_dectector_on_ = true;
    }
    return reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
        ->IsDuplicateKeySeq(column_family_id, key, sequence_);
  }

 protected:
  bool WriteBeforePrepare() const override { return write_before_prepare_; }
  bool WriteAfterCommit() const override { return write_after_commit_; }

 public:
  // cf_mems should not be shared with concurrent inserters
  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   TrimHistoryScheduler* trim_history_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   bool* has_valid_writes = nullptr, bool seq_per_batch = false,
                   bool batch_per_txn = true, bool hint_per_batch = false)
      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        trim_history_scheduler_(trim_history_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(static_cast_with_check<DBImpl, DB>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr),
        rebuilding_trx_seq_(0),
        seq_per_batch_(seq_per_batch),
        // Write after commit currently uses one seq per key (instead of per
        // batch). So seq_per_batch being false indicates write_after_commit
        // approach.
        write_after_commit_(!seq_per_batch),
        // WriteUnprepared can write WriteBatches per transaction, so
        // batch_per_txn being false indicates write_before_prepare.
        write_before_prepare_(!batch_per_txn),
        unprepared_batch_(false),
        duplicate_detector_(),
        dup_dectector_on_(false),
        hint_per_batch_(hint_per_batch),
        hint_created_(false) {
    assert(cf_mems_);
  }

  ~MemTableInserter() override {
    if (dup_dectector_on_) {
      reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
          ->~DuplicateDetector();
    }
    if (post_info_created_) {
      reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_)
          ->~MemPostInfoMap();
    }
    if (hint_created_) {
      for (auto iter : GetHintMap()) {
        delete[] reinterpret_cast<char*>(iter.second);
      }
      reinterpret_cast<HintMap*>(&hint_)->~HintMap();
    }
    delete rebuilding_trx_;
  }

  MemTableInserter(const MemTableInserter&) = delete;
  MemTableInserter& operator=(const MemTableInserter&) = delete;

  // The batch seq is regularly restarted; in normal mode it is set when
  // MemTableInserter is constructed in the write thread and in recovery mode
  // it is set when a batch, which is tagged with seq, is read from the WAL.
  // Within a sequenced batch, which could be a merge of multiple batches, we
  // have two policies to advance the seq: i) seq_per_key (default) and ii)
  // seq_per_batch. To implement the latter we need to mark the boundary
  // between the individual batches. The approach is this: 1) Use the
  // terminating markers to indicate the boundary (kTypeEndPrepareXID,
  // kTypeCommitXID, kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in
  // the absence of a natural boundary marker.
  void MaybeAdvanceSeq(bool batch_boundry = false) {
    if (batch_boundry == seq_per_batch_) {
      sequence_++;
    }
  }
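
  // Illustrative trace of the two policies above (a sketch, not additional
  // behavior): for a merged write of [Put, Put, Noop, Put, CommitXID],
  // seq_per_batch_ == false advances the seq on each Put (per-key policy),
  // while seq_per_batch_ == true advances it only at the boundary markers
  // (the Noop / CommitXID records).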

  void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }

  SequenceNumber sequence() const { return sequence_; }

  void PostProcess() {
    assert(concurrent_memtable_writes_);
    // If post info was not created there is nothing
    // to process and no need to create on demand
    if (post_info_created_) {
      for (auto& pair : GetPostMap()) {
        pair.first->BatchPostProcess(pair.second);
      }
    }
  }

  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    // If we are in a concurrent mode, it is the caller's responsibility
    // to clone the original ColumnFamilyMemTables so that each thread
    // has its own instance. Otherwise, it must be guaranteed that there
    // is no concurrent access
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        *s = Status::OK();
      } else {
        *s = Status::InvalidArgument(
            "Invalid column family specified in write batch");
      }
      return false;
    }
    if (recovering_log_number_ != 0 &&
        recovering_log_number_ < cf_mems_->GetLogNumber()) {
      // This is true only in the recovery environment (recovering_log_number_
      // is always 0 in the non-recovery, regular write code-path).
      // If recovering_log_number_ < cf_mems_->GetLogNumber(), the column
      // family already contains updates from this log. We can't apply updates
      // twice because of update-in-place or merge workloads -- ignore the
      // update.
      *s = Status::OK();
      return false;
    }

    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }

    if (log_number_ref_ > 0) {
      cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
    }

    return true;
  }

  Status PutCFImpl(uint32_t column_family_id, const Slice& key,
                   const Slice& value, ValueType value_type) {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
      return Status::OK();
      // else insert the value into the memtable right away
    }

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundry = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
        batch_boundry = IsDuplicateKeySeq(column_family_id, key);
      }
      MaybeAdvanceSeq(batch_boundry);
      return seek_status;
    }
    Status ret_status;

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    // inplace_update_support is inconsistent with snapshots, and therefore
    // with any kind of transactions including the ones that use seq_per_batch
    assert(!seq_per_batch_ || !moptions->inplace_update_support);
    if (!moptions->inplace_update_support) {
      bool mem_res =
          mem->Add(sequence_, value_type, key, value,
                   concurrent_memtable_writes_, get_post_process_info(mem),
                   hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
      if (UNLIKELY(!mem_res)) {
        assert(seq_per_batch_);
        ret_status = Status::TryAgain("key+seq exists");
        const bool BATCH_BOUNDRY = true;
        MaybeAdvanceSeq(BATCH_BOUNDRY);
      }
    } else if (moptions->inplace_callback == nullptr) {
      assert(!concurrent_memtable_writes_);
      mem->Update(sequence_, key, value);
    } else {
      assert(!concurrent_memtable_writes_);
      if (mem->UpdateCallback(sequence_, key, value)) {
      } else {
        // key not found in memtable. Do sst get, update, add
        SnapshotImpl read_from_snapshot;
        read_from_snapshot.number_ = sequence_;
        ReadOptions ropts;
        // it's going to be overwritten for sure, so no point caching data
        // block containing the old version
        ropts.fill_cache = false;
        ropts.snapshot = &read_from_snapshot;

        std::string prev_value;
        std::string merged_value;

        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
        Status s = Status::NotSupported();
        if (db_ != nullptr && recovering_log_number_ == 0) {
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          s = db_->Get(ropts, cf_handle, key, &prev_value);
        }

        char* prev_buffer = const_cast<char*>(prev_value.c_str());
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
        auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
                                                 s.ok() ? &prev_size : nullptr,
                                                 value, &merged_value);
        if (status == UpdateStatus::UPDATED_INPLACE) {
          // prev_value is updated in-place with final value.
          bool mem_res __attribute__((__unused__));
          mem_res = mem->Add(sequence_, value_type, key,
                             Slice(prev_buffer, prev_size));
          assert(mem_res);
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
        } else if (status == UpdateStatus::UPDATED) {
          // merged_value contains the final value.
          bool mem_res __attribute__((__unused__));
          mem_res = mem->Add(sequence_, value_type, key, Slice(merged_value));
          assert(mem_res);
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
        }
      }
    }
    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain, let the next attempt add the key to the
      // rebuilding transaction object.
      WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
    }
1495 // Since all Puts are logged in transaction logs (if enabled), always bump
1496 // sequence number. Even if the update eventually fails and does not result
1497 // in memtable add/update.
1498 MaybeAdvanceSeq();
1499 CheckMemtableFull();
1500 return ret_status;
1501 }
1502
1503   Status PutCF(uint32_t column_family_id, const Slice& key,
1504 const Slice& value) override {
1505 return PutCFImpl(column_family_id, key, value, kTypeValue);
1506 }
1507
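  // Shared helper for point, single, and range deletions. For range
  // deletions, `value` carries the end key of the range; for point deletions
  // it is empty.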
1508   Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
1509 const Slice& value, ValueType delete_type) {
1510 Status ret_status;
1511 MemTable* mem = cf_mems_->GetMemTable();
1512 bool mem_res =
1513 mem->Add(sequence_, delete_type, key, value,
1514 concurrent_memtable_writes_, get_post_process_info(mem),
1515 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
1516 if (UNLIKELY(!mem_res)) {
1517 assert(seq_per_batch_);
1518 ret_status = Status::TryAgain("key+seq exists");
1519       const bool BATCH_BOUNDARY = true;
1520       MaybeAdvanceSeq(BATCH_BOUNDARY);
1521 }
1522 MaybeAdvanceSeq();
1523 CheckMemtableFull();
1524 return ret_status;
1525 }
1526
1527   Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
1528 // optimize for non-recovery mode
1529 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
1530 WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
1531 return Status::OK();
1532       // else insert the values into the memtable right away
1533 }
1534
1535 Status seek_status;
1536 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
1537       bool batch_boundary = false;
1538       if (rebuilding_trx_ != nullptr) {
1539         assert(!write_after_commit_);
1540         // The CF is probably flushed, so there is no need to insert, but we
1541         // still need to keep track of the keys for upcoming rollback/commit.
1542         WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
1543         batch_boundary = IsDuplicateKeySeq(column_family_id, key);
1544       }
1545       MaybeAdvanceSeq(batch_boundary);
1546 return seek_status;
1547 }
1548
1549 auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
1550 // optimize for non-recovery mode
1551 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1552 assert(!write_after_commit_);
1553       // If the ret_status is TryAgain then let the next try add the key to
1554       // the rebuilding transaction object.
1555 WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
1556 }
1557 return ret_status;
1558 }
1559
1560   Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
1561 // optimize for non-recovery mode
1562 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
1563 WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
1564 return Status::OK();
1565       // else insert the values into the memtable right away
1566 }
1567
1568 Status seek_status;
1569 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
1570       bool batch_boundary = false;
1571       if (rebuilding_trx_ != nullptr) {
1572         assert(!write_after_commit_);
1573         // The CF is probably flushed, so there is no need to insert, but we
1574         // still need to keep track of the keys for upcoming rollback/commit.
1575         WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
1576                                          key);
1577         batch_boundary = IsDuplicateKeySeq(column_family_id, key);
1578       }
1579       MaybeAdvanceSeq(batch_boundary);
1580 return seek_status;
1581 }
1582
1583 auto ret_status =
1584 DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
1585 // optimize for non-recovery mode
1586 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1587 assert(!write_after_commit_);
1588       // If the ret_status is TryAgain then let the next try add the key to
1589       // the rebuilding transaction object.
1590 WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
1591 }
1592 return ret_status;
1593 }
1594
1595   Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
1596 const Slice& end_key) override {
1597 // optimize for non-recovery mode
1598 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
1599 WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
1600 begin_key, end_key);
1601 return Status::OK();
1602       // else insert the values into the memtable right away
1603 }
1604
1605 Status seek_status;
1606 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
1607       bool batch_boundary = false;
1608       if (rebuilding_trx_ != nullptr) {
1609         assert(!write_after_commit_);
1610         // The CF is probably flushed, so there is no need to insert, but we
1611         // still need to keep track of the keys for upcoming rollback/commit.
1612         WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
1613                                         begin_key, end_key);
1614         // TODO(myabandeh): when transactional DeleteRange support is added,
1615         // check if end_key must also be added.
1616         batch_boundary = IsDuplicateKeySeq(column_family_id, begin_key);
1617       }
1618       MaybeAdvanceSeq(batch_boundary);
1619 return seek_status;
1620 }
1621 if (db_ != nullptr) {
1622 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
1623 if (cf_handle == nullptr) {
1624 cf_handle = db_->DefaultColumnFamily();
1625 }
1626 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
1627 if (!cfd->is_delete_range_supported()) {
1628 return Status::NotSupported(
1629 std::string("DeleteRange not supported for table type ") +
1630 cfd->ioptions()->table_factory->Name() + " in CF " +
1631 cfd->GetName());
1632 }
1633 }
1634
1635 auto ret_status =
1636 DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
1637 // optimize for non-recovery mode
1638 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1639 assert(!write_after_commit_);
1640       // If the ret_status is TryAgain then let the next try add the key to
1641       // the rebuilding transaction object.
1642 WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
1643 begin_key, end_key);
1644 }
1645 return ret_status;
1646 }
1647
1648   Status MergeCF(uint32_t column_family_id, const Slice& key,
1649 const Slice& value) override {
1650 // optimize for non-recovery mode
1651 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
1652 WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
1653 return Status::OK();
1654       // else insert the values into the memtable right away
1655 }
1656
1657 Status seek_status;
1658 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
1659       bool batch_boundary = false;
1660       if (rebuilding_trx_ != nullptr) {
1661         assert(!write_after_commit_);
1662         // The CF is probably flushed, so there is no need to insert, but we
1663         // still need to keep track of the keys for upcoming rollback/commit.
1664         WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
1665                                   value);
1666         batch_boundary = IsDuplicateKeySeq(column_family_id, key);
1667       }
1668       MaybeAdvanceSeq(batch_boundary);
1669 return seek_status;
1670 }
1671
1672 Status ret_status;
1673 MemTable* mem = cf_mems_->GetMemTable();
1674 auto* moptions = mem->GetImmutableMemTableOptions();
1675 bool perform_merge = false;
1676 assert(!concurrent_memtable_writes_ ||
1677 moptions->max_successive_merges == 0);
1678
1679     // If we pass the DB through and options.max_successive_merges is hit
1680     // during recovery, a Get() will be issued, which will try to acquire the
1681     // DB mutex and cause a deadlock, as the DB mutex is already held.
1682     // So we disable merge in recovery.
1683 if (moptions->max_successive_merges > 0 && db_ != nullptr &&
1684 recovering_log_number_ == 0) {
1685 assert(!concurrent_memtable_writes_);
1686 LookupKey lkey(key, sequence_);
1687
1688 // Count the number of successive merges at the head
1689 // of the key in the memtable
1690 size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
1691
1692 if (num_merges >= moptions->max_successive_merges) {
1693 perform_merge = true;
1694 }
1695 }
1696
1697 if (perform_merge) {
1698 // 1) Get the existing value
1699 std::string get_value;
1700
1701 // Pass in the sequence number so that we also include previous merge
1702 // operations in the same batch.
1703 SnapshotImpl read_from_snapshot;
1704 read_from_snapshot.number_ = sequence_;
1705 ReadOptions read_options;
1706 read_options.snapshot = &read_from_snapshot;
1707
1708 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
1709 if (cf_handle == nullptr) {
1710 cf_handle = db_->DefaultColumnFamily();
1711 }
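      // Note: the Get() status is ignored; on a miss, get_value stays empty
      // and the merge below runs against an empty existing value.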
1712 db_->Get(read_options, cf_handle, key, &get_value);
1713 Slice get_value_slice = Slice(get_value);
1714
1715 // 2) Apply this merge
1716 auto merge_operator = moptions->merge_operator;
1717 assert(merge_operator);
1718
1719 std::string new_value;
1720
1721 Status merge_status = MergeHelper::TimedFullMerge(
1722 merge_operator, key, &get_value_slice, {value}, &new_value,
1723 moptions->info_log, moptions->statistics, Env::Default());
1724
1725 if (!merge_status.ok()) {
1726 // Failed to merge!
1727 // Store the delta in memtable
1728 perform_merge = false;
1729 } else {
1730 // 3) Add value to memtable
1731 assert(!concurrent_memtable_writes_);
1732 bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
1733 if (UNLIKELY(!mem_res)) {
1734 assert(seq_per_batch_);
1735 ret_status = Status::TryAgain("key+seq exists");
1736           const bool BATCH_BOUNDARY = true;
1737           MaybeAdvanceSeq(BATCH_BOUNDARY);
1738 }
1739 }
1740 }
1741
1742 if (!perform_merge) {
1743 // Add merge operator to memtable
1744 bool mem_res =
1745 mem->Add(sequence_, kTypeMerge, key, value,
1746 concurrent_memtable_writes_, get_post_process_info(mem));
1747 if (UNLIKELY(!mem_res)) {
1748 assert(seq_per_batch_);
1749 ret_status = Status::TryAgain("key+seq exists");
1750         const bool BATCH_BOUNDARY = true;
1751         MaybeAdvanceSeq(BATCH_BOUNDARY);
1752 }
1753 }
1754
1755 // optimize for non-recovery mode
1756 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1757 assert(!write_after_commit_);
1758       // If the ret_status is TryAgain then let the next try add the key to
1759       // the rebuilding transaction object.
1760 WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
1761 }
1762 MaybeAdvanceSeq();
1763 CheckMemtableFull();
1764 return ret_status;
1765 }
1766
1767   Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
1768 const Slice& value) override {
1769 // Same as PutCF except for value type.
1770 return PutCFImpl(column_family_id, key, value, kTypeBlobIndex);
1771 }
1772
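  // Checks whether the active memtable should be flushed or whether the
  // immutable memtable history should be trimmed, scheduling the work only
  // if this thread won the race to mark it.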
1773   void CheckMemtableFull() {
1774 if (flush_scheduler_ != nullptr) {
1775 auto* cfd = cf_mems_->current();
1776 assert(cfd != nullptr);
1777 if (cfd->mem()->ShouldScheduleFlush() &&
1778 cfd->mem()->MarkFlushScheduled()) {
1779 // MarkFlushScheduled only returns true if we are the one that
1780 // should take action, so no need to dedup further
1781 flush_scheduler_->ScheduleWork(cfd);
1782 }
1783 }
1784 // check if memtable_list size exceeds max_write_buffer_size_to_maintain
1785 if (trim_history_scheduler_ != nullptr) {
1786 auto* cfd = cf_mems_->current();
1787
1788 assert(cfd);
1789 assert(cfd->ioptions());
1790
1791 const size_t size_to_maintain = static_cast<size_t>(
1792 cfd->ioptions()->max_write_buffer_size_to_maintain);
1793
1794 if (size_to_maintain > 0) {
1795 MemTableList* const imm = cfd->imm();
1796 assert(imm);
1797
1798 if (imm->HasHistory()) {
1799 const MemTable* const mem = cfd->mem();
1800 assert(mem);
1801
1802 if (mem->ApproximateMemoryUsageFast() +
1803 imm->ApproximateMemoryUsageExcludingLast() >=
1804 size_to_maintain &&
1805 imm->MarkTrimHistoryNeeded()) {
1806 trim_history_scheduler_->ScheduleWork(cfd);
1807 }
1808 }
1809 }
1810 }
1811 }
1812
1813 // The write batch handler calls MarkBeginPrepare with unprepare set to true
1814 // if it encounters the kTypeBeginUnprepareXID marker.
1815   Status MarkBeginPrepare(bool unprepare) override {
1816 assert(rebuilding_trx_ == nullptr);
1817 assert(db_);
1818
1819 if (recovering_log_number_ != 0) {
1820 // during recovery we rebuild a hollow transaction
1821       // from all encountered prepare sections of the WAL
1822       if (!db_->allow_2pc()) {
1823 return Status::NotSupported(
1824 "WAL contains prepared transactions. Open with "
1825 "TransactionDB::Open().");
1826 }
1827
1828 // we are now iterating through a prepared section
1829 rebuilding_trx_ = new WriteBatch();
1830 rebuilding_trx_seq_ = sequence_;
1831 // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
1832 // unprepared_batch_ should be false because it is false by default, and
1833 // gets reset to false in MarkEndPrepare.
1834 assert(!unprepared_batch_);
1835 unprepared_batch_ = unprepare;
1836
1837 if (has_valid_writes_ != nullptr) {
1838 *has_valid_writes_ = true;
1839 }
1840 }
1841
1842 return Status::OK();
1843 }
1844
1845   Status MarkEndPrepare(const Slice& name) override {
1846 assert(db_);
1847 assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));
1848
1849 if (recovering_log_number_ != 0) {
1850 assert(db_->allow_2pc());
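      // In seq_per_batch mode the sequence number was advanced once per
      // sub-batch, so the delta from rebuilding_trx_seq_ gives the sub-batch
      // count of the prepared section.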
1851 size_t batch_cnt =
1852 write_after_commit_
1853 ? 0 // 0 will disable further checks
1854 : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
1855 db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
1856 rebuilding_trx_, rebuilding_trx_seq_,
1857 batch_cnt, unprepared_batch_);
1858 unprepared_batch_ = false;
1859 rebuilding_trx_ = nullptr;
1860 } else {
1861 assert(rebuilding_trx_ == nullptr);
1862 }
1863     const bool batch_boundary = true;
1864     MaybeAdvanceSeq(batch_boundary);
1865
1866 return Status::OK();
1867 }
1868
1869   Status MarkNoop(bool empty_batch) override {
1870     // A hack in pessimistic transactions could result in a noop at the start
1871     // of the write batch; such a noop should be ignored.
1872 if (!empty_batch) {
1873       // In the absence of Prepare markers, a kTypeNoop tag indicates the end
1874       // of a batch. This happens when a write batch commits while skipping
1875       // the prepare phase.
1876       const bool batch_boundary = true;
1877       MaybeAdvanceSeq(batch_boundary);
1878 }
1879 return Status::OK();
1880 }
1881
1882   Status MarkCommit(const Slice& name) override {
1883 assert(db_);
1884
1885 Status s;
1886
1887 if (recovering_log_number_ != 0) {
1888 // in recovery when we encounter a commit marker
1889 // we lookup this transaction in our set of rebuilt transactions
1890 // and commit.
1891 auto trx = db_->GetRecoveredTransaction(name.ToString());
1892
1893 // the log containing the prepared section may have
1894 // been released in the last incarnation because the
1895 // data was flushed to L0
1896 if (trx != nullptr) {
1897         // at this point individual CF log numbers will prevent
1898 // duplicate re-insertion of values.
1899 assert(log_number_ref_ == 0);
1900 if (write_after_commit_) {
1901 // write_after_commit_ can only have one batch in trx.
1902 assert(trx->batches_.size() == 1);
1903 const auto& batch_info = trx->batches_.begin()->second;
1904 // all inserts must reference this trx log number
1905 log_number_ref_ = batch_info.log_number_;
1906 s = batch_info.batch_->Iterate(this);
1907 log_number_ref_ = 0;
1908 }
1909 // else the values are already inserted before the commit
1910
1911 if (s.ok()) {
1912 db_->DeleteRecoveredTransaction(name.ToString());
1913 }
1914 if (has_valid_writes_ != nullptr) {
1915 *has_valid_writes_ = true;
1916 }
1917 }
1918 } else {
1919       // When writes are not delayed until commit, there is no disconnect
1920       // between a memtable write and the WAL that supports it, so the commit
1921       // does not need to reference the log on which it depends.
1922 assert(!write_after_commit_ || log_number_ref_ > 0);
1923 }
1924     const bool batch_boundary = true;
1925     MaybeAdvanceSeq(batch_boundary);
1926
1927 return s;
1928 }
1929
1930   Status MarkRollback(const Slice& name) override {
1931 assert(db_);
1932
1933 if (recovering_log_number_ != 0) {
1934 auto trx = db_->GetRecoveredTransaction(name.ToString());
1935
1936       // the log containing the transaction's prep section
1937 // may have been released in the previous incarnation
1938 // because we knew it had been rolled back
1939 if (trx != nullptr) {
1940 db_->DeleteRecoveredTransaction(name.ToString());
1941 }
1942 } else {
1943 // in non recovery we simply ignore this tag
1944 }
1945
1946     const bool batch_boundary = true;
1947     MaybeAdvanceSeq(batch_boundary);
1948
1949 return Status::OK();
1950 }
1951
1952 private:
1953   MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
1954 if (!concurrent_memtable_writes_) {
1955 // No need to batch counters locally if we don't use concurrent mode.
1956 return nullptr;
1957 }
1958 return &GetPostMap()[mem];
1959 }
1960 };
1961
1962 // This function can only be called in these conditions:
1963 // 1) During Recovery()
1964 // 2) During Write(), in a single-threaded write thread
1965 // 3) During Write(), in a concurrent context where memtables have been cloned
1966 // The reason is that it calls memtables->Seek(), which has a stateful cache
1967 Status WriteBatchInternal::InsertInto(
1968 WriteThread::WriteGroup& write_group, SequenceNumber sequence,
1969 ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
1970 TrimHistoryScheduler* trim_history_scheduler,
1971 bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
1972 bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
1973 MemTableInserter inserter(
1974 sequence, memtables, flush_scheduler, trim_history_scheduler,
1975 ignore_missing_column_families, recovery_log_number, db,
1976 concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
1977 batch_per_txn);
1978 for (auto w : write_group) {
1979 if (w->CallbackFailed()) {
1980 continue;
1981 }
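    // Each writer consumes a contiguous range of sequence numbers; record the
    // first one before replaying the batch into the memtables.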
1982 w->sequence = inserter.sequence();
1983 if (!w->ShouldWriteToMemtable()) {
1984 // In seq_per_batch_ mode this advances the seq by one.
1985 inserter.MaybeAdvanceSeq(true);
1986 continue;
1987 }
1988 SetSequence(w->batch, inserter.sequence());
1989 inserter.set_log_number_ref(w->log_ref);
1990 w->status = w->batch->Iterate(&inserter);
1991 if (!w->status.ok()) {
1992 return w->status;
1993 }
1994 assert(!seq_per_batch || w->batch_cnt != 0);
1995 assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
1996 }
1997 return Status::OK();
1998 }
1999
2000 Status WriteBatchInternal::InsertInto(
2001 WriteThread::Writer* writer, SequenceNumber sequence,
2002 ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
2003 TrimHistoryScheduler* trim_history_scheduler,
2004 bool ignore_missing_column_families, uint64_t log_number, DB* db,
2005 bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
2006 bool batch_per_txn, bool hint_per_batch) {
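// batch_cnt is only referenced by the assertions below, so cast it to void
// in release (NDEBUG) builds to avoid an unused-parameter warning.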
2007 #ifdef NDEBUG
2008 (void)batch_cnt;
2009 #endif
2010 assert(writer->ShouldWriteToMemtable());
2011 MemTableInserter inserter(
2012 sequence, memtables, flush_scheduler, trim_history_scheduler,
2013 ignore_missing_column_families, log_number, db,
2014 concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
2015 batch_per_txn, hint_per_batch);
2016 SetSequence(writer->batch, sequence);
2017 inserter.set_log_number_ref(writer->log_ref);
2018 Status s = writer->batch->Iterate(&inserter);
2019 assert(!seq_per_batch || batch_cnt != 0);
2020 assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
2021 if (concurrent_memtable_writes) {
2022 inserter.PostProcess();
2023 }
2024 return s;
2025 }
2026
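// A minimal usage sketch for the overload below (hypothetical recovery-time
// caller; the variable names are illustrative, not part of this file):
//
//   SequenceNumber next_seq = 0;
//   bool has_valid_writes = false;
//   Status s = WriteBatchInternal::InsertInto(
//       &batch, cf_memtables, &flush_scheduler, &trim_history_scheduler,
//       true /*ignore_missing_column_families*/, wal_number, db,
//       false /*concurrent_memtable_writes*/, &next_seq, &has_valid_writes,
//       seq_per_batch, batch_per_txn);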
2027 Status WriteBatchInternal::InsertInto(
2028 const WriteBatch* batch, ColumnFamilyMemTables* memtables,
2029 FlushScheduler* flush_scheduler,
2030 TrimHistoryScheduler* trim_history_scheduler,
2031 bool ignore_missing_column_families, uint64_t log_number, DB* db,
2032 bool concurrent_memtable_writes, SequenceNumber* next_seq,
2033 bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
2034 MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
2035 trim_history_scheduler,
2036 ignore_missing_column_families, log_number, db,
2037 concurrent_memtable_writes, has_valid_writes,
2038 seq_per_batch, batch_per_txn);
2039 Status s = batch->Iterate(&inserter);
2040 if (next_seq != nullptr) {
2041 *next_seq = inserter.sequence();
2042 }
2043 if (concurrent_memtable_writes) {
2044 inserter.PostProcess();
2045 }
2046 return s;
2047 }
2048
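// Replaces the batch's contents wholesale. Content flags are reset to
// DEFERRED so that they are recomputed lazily from rep_ when next queried.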
2049 Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
2050 assert(contents.size() >= WriteBatchInternal::kHeader);
2051 b->rep_.assign(contents.data(), contents.size());
2052 b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
2053 return Status::OK();
2054 }
2055
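// Appends src's records to dst. When wal_only is set and src has a saved
// WAL termination point, only the prefix of src up to that point is copied.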
2056 Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
2057 const bool wal_only) {
2058 size_t src_len;
2059 int src_count;
2060 uint32_t src_flags;
2061
2062 const SavePoint& batch_end = src->GetWalTerminationPoint();
2063
2064 if (wal_only && !batch_end.is_cleared()) {
2065 src_len = batch_end.size - WriteBatchInternal::kHeader;
2066 src_count = batch_end.count;
2067 src_flags = batch_end.content_flags;
2068 } else {
2069 src_len = src->rep_.size() - WriteBatchInternal::kHeader;
2070 src_count = Count(src);
2071 src_flags = src->content_flags_.load(std::memory_order_relaxed);
2072 }
2073
2074 SetCount(dst, Count(dst) + src_count);
2075 assert(src->rep_.size() >= WriteBatchInternal::kHeader);
2076 dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
2077 dst->content_flags_.store(
2078 dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
2079 std::memory_order_relaxed);
2080 return Status::OK();
2081 }
2082
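// When both batches are non-empty, appending merges them under a single
// header, so the combined size saves one kHeader's worth of bytes.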
2083 size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
2084 size_t rightByteSize) {
2085 if (leftByteSize == 0 || rightByteSize == 0) {
2086 return leftByteSize + rightByteSize;
2087 } else {
2088 return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
2089 }
2090 }
2091
2092 } // namespace ROCKSDB_NAMESPACE
2093