1 // Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/range_tombstone_fragmenter.h"
7
8 #include <algorithm>
9 #include <cinttypes>
10 #include <cstdio>
11 #include <functional>
12 #include <set>
13
14 #include "util/autovector.h"
15 #include "util/kv_map.h"
16 #include "util/vector_iterator.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
// Builds a fragmented tombstone list from a stream of unfragmented range
// tombstones, where each entry maps a start internal key to an end user key
// (the iterator's value). If the input is already sorted by start key it is
// fragmented directly; otherwise the tombstones are copied into vectors,
// sorted via VectorIterator, and then fragmented.
FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
    std::unique_ptr<InternalIterator> unfragmented_tombstones,
    const InternalKeyComparator& icmp, bool for_compaction,
    const std::vector<SequenceNumber>& snapshots) {
  if (unfragmented_tombstones == nullptr) {
    return;
  }
  bool is_sorted = true;
  InternalKey pinned_last_start_key;
  Slice last_start_key;
  num_unfragmented_tombstones_ = 0;
  // First pass: count the tombstones and detect whether they are already
  // sorted by start (internal) key.
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next(), num_unfragmented_tombstones_++) {
    if (num_unfragmented_tombstones_ > 0 &&
        icmp.Compare(last_start_key, unfragmented_tombstones->key()) > 0) {
      is_sorted = false;
      break;
    }
    if (unfragmented_tombstones->IsKeyPinned()) {
      // The iterator guarantees the key slice stays valid across Next(), so
      // we can keep a reference without copying.
      last_start_key = unfragmented_tombstones->key();
    } else {
      // Copy the key so the comparison above remains valid after Next().
      pinned_last_start_key.DecodeFrom(unfragmented_tombstones->key());
      last_start_key = pinned_last_start_key.Encode();
    }
  }
  if (is_sorted) {
    FragmentTombstones(std::move(unfragmented_tombstones), icmp, for_compaction,
                       snapshots);
    return;
  }

  // Sort the tombstones before fragmenting them.
  // NOTE(review): the first pass breaks early on the first out-of-order key,
  // so num_unfragmented_tombstones_ may undercount here and these reserves
  // are only a lower bound — confirm if an exact count is ever required.
  std::vector<std::string> keys, values;
  keys.reserve(num_unfragmented_tombstones_);
  values.reserve(num_unfragmented_tombstones_);
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next()) {
    keys.emplace_back(unfragmented_tombstones->key().data(),
                      unfragmented_tombstones->key().size());
    values.emplace_back(unfragmented_tombstones->value().data(),
                        unfragmented_tombstones->value().size());
  }
  // VectorIterator implicitly sorts by key during construction.
  auto iter = std::unique_ptr<VectorIterator>(
      new VectorIterator(std::move(keys), std::move(values), &icmp));
  FragmentTombstones(std::move(iter), icmp, for_compaction, snapshots);
}
67
// Converts sorted, possibly-overlapping tombstones into non-overlapping
// fragments. Each fragment in tombstones_ covers [start_key, end_key) and
// references a contiguous, descending-sorted slice of tombstone_seqs_
// ([seq_start_idx, seq_end_idx)) holding the seqnums of every input
// tombstone overlapping that range. For compactions, seqnums not visible to
// any snapshot are dropped.
void FragmentedRangeTombstoneList::FragmentTombstones(
    std::unique_ptr<InternalIterator> unfragmented_tombstones,
    const InternalKeyComparator& icmp, bool for_compaction,
    const std::vector<SequenceNumber>& snapshots) {
  Slice cur_start_key(nullptr, 0);
  auto cmp = ParsedInternalKeyComparator(&icmp);

  // Stores the end keys and sequence numbers of range tombstones with a start
  // key less than or equal to cur_start_key. Provides an ordering by end key
  // for use in flush_current_tombstones.
  std::set<ParsedInternalKey, ParsedInternalKeyComparator> cur_end_keys(cmp);

  // Given the next start key in unfragmented_tombstones,
  // flush_current_tombstones writes every tombstone fragment that starts
  // and ends with a key before next_start_key, and starts with a key greater
  // than or equal to cur_start_key.
  auto flush_current_tombstones = [&](const Slice& next_start_key) {
    auto it = cur_end_keys.begin();
    bool reached_next_start_key = false;
    // Each iteration emits one fragment [cur_start_key, cur_end_key) and then
    // advances cur_start_key to cur_end_key.
    for (; it != cur_end_keys.end() && !reached_next_start_key; ++it) {
      Slice cur_end_key = it->user_key;
      if (icmp.user_comparator()->Compare(cur_start_key, cur_end_key) == 0) {
        // Empty tombstone.
        continue;
      }
      if (icmp.user_comparator()->Compare(next_start_key, cur_end_key) <= 0) {
        // All of the end keys in [it, cur_end_keys.end()) are after
        // next_start_key, so the tombstones they represent can be used in
        // fragments that start with keys greater than or equal to
        // next_start_key. However, the end keys we already passed will not be
        // used in any more tombstone fragments.
        //
        // Remove the fully fragmented tombstones and stop iteration after a
        // final round of flushing to preserve the tombstones we can create more
        // fragments from.
        reached_next_start_key = true;
        cur_end_keys.erase(cur_end_keys.begin(), it);
        cur_end_key = next_start_key;
      }

      // Flush a range tombstone fragment [cur_start_key, cur_end_key), which
      // should not overlap with the last-flushed tombstone fragment.
      assert(tombstones_.empty() ||
             icmp.user_comparator()->Compare(tombstones_.back().end_key,
                                             cur_start_key) <= 0);

      // Sort the sequence numbers of the tombstones being fragmented in
      // descending order, and then flush them in that order.
      autovector<SequenceNumber> seqnums_to_flush;
      for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) {
        seqnums_to_flush.push_back(flush_it->sequence);
      }
      std::sort(seqnums_to_flush.begin(), seqnums_to_flush.end(),
                std::greater<SequenceNumber>());

      size_t start_idx = tombstone_seqs_.size();
      size_t end_idx = start_idx + seqnums_to_flush.size();

      if (for_compaction) {
        // Drop all tombstone seqnums that are not preserved by a snapshot.
        // seqnums_to_flush is descending, so for each snapshot stripe we keep
        // only the topmost (newest) seqnum visible in that stripe.
        SequenceNumber next_snapshot = kMaxSequenceNumber;
        for (auto seq : seqnums_to_flush) {
          if (seq <= next_snapshot) {
            // This seqnum is visible by a lower snapshot.
            tombstone_seqs_.push_back(seq);
            seq_set_.insert(seq);
            auto upper_bound_it =
                std::lower_bound(snapshots.begin(), snapshots.end(), seq);
            if (upper_bound_it == snapshots.begin()) {
              // This seqnum is the topmost one visible by the earliest
              // snapshot. None of the seqnums below it will be visible, so we
              // can skip them.
              break;
            }
            next_snapshot = *std::prev(upper_bound_it);
          }
        }
        end_idx = tombstone_seqs_.size();
      } else {
        // The fragmentation is being done for reads, so preserve all seqnums.
        tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
                               seqnums_to_flush.end());
        seq_set_.insert(seqnums_to_flush.begin(), seqnums_to_flush.end());
      }

      assert(start_idx < end_idx);
      tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx, end_idx);

      cur_start_key = cur_end_key;
    }
    if (!reached_next_start_key) {
      // There is a gap between the last flushed tombstone fragment and
      // the next tombstone's start key. Remove all the end keys in
      // the working set, since we have fully fragmented their corresponding
      // tombstones.
      cur_end_keys.clear();
    }
    cur_start_key = next_start_key;
  };

  pinned_iters_mgr_.StartPinning();

  bool no_tombstones = true;
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next()) {
    const Slice& ikey = unfragmented_tombstones->key();
    Slice tombstone_start_key = ExtractUserKey(ikey);
    SequenceNumber tombstone_seq = GetInternalKeySeqno(ikey);
    if (!unfragmented_tombstones->IsKeyPinned()) {
      // Copy unpinned key data into pinned_slices_ so the Slice stays valid
      // for the lifetime of this list.
      pinned_slices_.emplace_back(tombstone_start_key.data(),
                                  tombstone_start_key.size());
      tombstone_start_key = pinned_slices_.back();
    }
    no_tombstones = false;

    Slice tombstone_end_key = unfragmented_tombstones->value();
    if (!unfragmented_tombstones->IsValuePinned()) {
      pinned_slices_.emplace_back(tombstone_end_key.data(),
                                  tombstone_end_key.size());
      tombstone_end_key = pinned_slices_.back();
    }
    if (!cur_end_keys.empty() && icmp.user_comparator()->Compare(
                                     cur_start_key, tombstone_start_key) != 0) {
      // The start key has changed. Flush all tombstones that start before
      // this new start key.
      flush_current_tombstones(tombstone_start_key);
    }
    cur_start_key = tombstone_start_key;

    cur_end_keys.emplace(tombstone_end_key, tombstone_seq, kTypeRangeDeletion);
  }
  // Flush the remaining working set; no later tombstone can start before the
  // largest end key seen, so use it as the terminating "next start key".
  if (!cur_end_keys.empty()) {
    ParsedInternalKey last_end_key = *std::prev(cur_end_keys.end());
    flush_current_tombstones(last_end_key.user_key);
  }

  if (!no_tombstones) {
    // Keep the source iterator alive: pinned keys/values may still point
    // into its buffers.
    pinned_iters_mgr_.PinIterator(unfragmented_tombstones.release(),
                                  false /* arena */);
  }
}
209
ContainsRange(SequenceNumber lower,SequenceNumber upper) const210 bool FragmentedRangeTombstoneList::ContainsRange(SequenceNumber lower,
211 SequenceNumber upper) const {
212 auto seq_it = seq_set_.lower_bound(lower);
213 return seq_it != seq_set_.end() && *seq_it <= upper;
214 }
215
// Constructs an iterator over `tombstones` without taking ownership; the
// caller must keep the list alive for the iterator's lifetime. Only seqnums
// in [_lower_bound, _upper_bound] are visible through the Top*/Seek* APIs.
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
    const FragmentedRangeTombstoneList* tombstones,
    const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
    SequenceNumber _lower_bound)
    : tombstone_start_cmp_(icmp.user_comparator()),
      tombstone_end_cmp_(icmp.user_comparator()),
      icmp_(&icmp),
      ucmp_(icmp.user_comparator()),
      tombstones_(tombstones),
      upper_bound_(_upper_bound),
      lower_bound_(_lower_bound) {
  assert(tombstones_ != nullptr);
  // Start in an invalid position; callers must Seek*() before use.
  Invalidate();
}
230
// Same as the raw-pointer constructor, but shares ownership of the tombstone
// list via tombstones_ref_, so the list outlives this iterator.
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
    const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
    const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
    SequenceNumber _lower_bound)
    : tombstone_start_cmp_(icmp.user_comparator()),
      tombstone_end_cmp_(icmp.user_comparator()),
      icmp_(&icmp),
      ucmp_(icmp.user_comparator()),
      tombstones_ref_(tombstones),
      tombstones_(tombstones_ref_.get()),
      upper_bound_(_upper_bound),
      lower_bound_(_lower_bound) {
  assert(tombstones_ != nullptr);
  // Start in an invalid position; callers must Seek*() before use.
  Invalidate();
}
246
SeekToFirst()247 void FragmentedRangeTombstoneIterator::SeekToFirst() {
248 pos_ = tombstones_->begin();
249 seq_pos_ = tombstones_->seq_begin();
250 }
251
// Positions the iterator at the first fragment with a seqnum visible under
// [lower_bound_, upper_bound_], skipping fragments with no visible seqnum.
void FragmentedRangeTombstoneIterator::SeekToTopFirst() {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = tombstones_->begin();
  // A fragment's seqnums are stored in descending order, so searching with
  // std::greater finds the first (largest) seqnum <= upper_bound_.
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanForwardToVisibleTombstone();
}
263
SeekToLast()264 void FragmentedRangeTombstoneIterator::SeekToLast() {
265 pos_ = std::prev(tombstones_->end());
266 seq_pos_ = std::prev(tombstones_->seq_end());
267 }
268
// Positions the iterator at the last fragment with a seqnum visible under
// [lower_bound_, upper_bound_], scanning backward past invisible fragments.
void FragmentedRangeTombstoneIterator::SeekToTopLast() {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = std::prev(tombstones_->end());
  // Seqnums within a fragment are descending; find the largest one that is
  // <= upper_bound_.
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanBackwardToVisibleTombstone();
}
280
Seek(const Slice & target)281 void FragmentedRangeTombstoneIterator::Seek(const Slice& target) {
282 if (tombstones_->empty()) {
283 Invalidate();
284 return;
285 }
286 SeekToCoveringTombstone(target);
287 ScanForwardToVisibleTombstone();
288 }
289
SeekForPrev(const Slice & target)290 void FragmentedRangeTombstoneIterator::SeekForPrev(const Slice& target) {
291 if (tombstones_->empty()) {
292 Invalidate();
293 return;
294 }
295 SeekForPrevToCoveringTombstone(target);
296 ScanBackwardToVisibleTombstone();
297 }
298
// Positions pos_ at the first fragment whose end key is strictly after
// `target` — the only candidate that could cover target — and seq_pos_ at
// its largest seqnum <= upper_bound_. The found fragment may still start
// after target; callers compare start_key() when exact coverage matters.
void FragmentedRangeTombstoneIterator::SeekToCoveringTombstone(
    const Slice& target) {
  pos_ = std::upper_bound(tombstones_->begin(), tombstones_->end(), target,
                          tombstone_end_cmp_);
  if (pos_ == tombstones_->end()) {
    // All tombstones end before target.
    seq_pos_ = tombstones_->seq_end();
    return;
  }
  // Seqnums within a fragment are descending; find the largest visible one.
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
}
312
// Positions pos_ at the last fragment whose start key is at or before
// `target`, and seq_pos_ at that fragment's largest seqnum <= upper_bound_.
// Invalidates if every fragment starts after target.
void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone(
    const Slice& target) {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  // First fragment starting strictly after target...
  pos_ = std::upper_bound(tombstones_->begin(), tombstones_->end(), target,
                          tombstone_start_cmp_);
  if (pos_ == tombstones_->begin()) {
    // All tombstones start after target.
    Invalidate();
    return;
  }
  // ...so its predecessor is the last fragment starting at or before target.
  --pos_;
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
}
331
// Advances pos_ forward while the current fragment has no seqnum visible in
// [lower_bound_, upper_bound_] (seq_pos_ ran off the fragment's seqnum
// range, or the found seqnum is below lower_bound_). Invalidates when the
// end of the list is reached.
void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() {
  while (pos_ != tombstones_->end() &&
         (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx) ||
          *seq_pos_ < lower_bound_)) {
    ++pos_;
    if (pos_ == tombstones_->end()) {
      Invalidate();
      return;
    }
    // Re-derive the largest seqnum <= upper_bound_ for the new fragment.
    seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                                tombstones_->seq_iter(pos_->seq_end_idx),
                                upper_bound_, std::greater<SequenceNumber>());
  }
}
346
// Moves pos_ backward while the current fragment has no seqnum visible in
// [lower_bound_, upper_bound_]. Invalidates when the beginning of the list
// is reached without finding a visible fragment.
void FragmentedRangeTombstoneIterator::ScanBackwardToVisibleTombstone() {
  while (pos_ != tombstones_->end() &&
         (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx) ||
          *seq_pos_ < lower_bound_)) {
    if (pos_ == tombstones_->begin()) {
      Invalidate();
      return;
    }
    --pos_;
    // Re-derive the largest seqnum <= upper_bound_ for the new fragment.
    seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                                tombstones_->seq_iter(pos_->seq_end_idx),
                                upper_bound_, std::greater<SequenceNumber>());
  }
}
361
// Advances to the next (fragment, seqnum) pair, ignoring seqnum bounds:
// steps to the next seqnum within the current fragment, and moves to the
// next fragment once this fragment's seqnums are exhausted. TopNext is the
// bounds-aware variant.
void FragmentedRangeTombstoneIterator::Next() {
  ++seq_pos_;
  if (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
    ++pos_;
  }
}
368
// Advances to the next fragment with a visible seqnum (one fragment per
// step, rather than one seqnum per step as in Next()).
void FragmentedRangeTombstoneIterator::TopNext() {
  ++pos_;
  if (pos_ == tombstones_->end()) {
    return;
  }
  // Largest seqnum in the new fragment that is <= upper_bound_.
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanForwardToVisibleTombstone();
}
379
// Moves to the previous (fragment, seqnum) pair, ignoring seqnum bounds.
// All fragments' seqnums live contiguously in one sequence, so stepping
// seq_pos_ back past the current fragment's first seqnum (onto index
// seq_start_idx - 1) means we crossed into the previous fragment.
void FragmentedRangeTombstoneIterator::Prev() {
  if (seq_pos_ == tombstones_->seq_begin()) {
    // Already at the global first seqnum; nothing before it.
    Invalidate();
    return;
  }
  --seq_pos_;
  if (pos_ == tombstones_->end() ||
      seq_pos_ == tombstones_->seq_iter(pos_->seq_start_idx - 1)) {
    --pos_;
  }
}
391
// Moves to the previous fragment with a visible seqnum (one fragment per
// step, rather than one seqnum per step as in Prev()).
void FragmentedRangeTombstoneIterator::TopPrev() {
  if (pos_ == tombstones_->begin()) {
    Invalidate();
    return;
  }
  --pos_;
  // Largest seqnum in the new fragment that is <= upper_bound_.
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanBackwardToVisibleTombstone();
}
403
Valid() const404 bool FragmentedRangeTombstoneIterator::Valid() const {
405 return tombstones_ != nullptr && pos_ != tombstones_->end();
406 }
407
MaxCoveringTombstoneSeqnum(const Slice & target_user_key)408 SequenceNumber FragmentedRangeTombstoneIterator::MaxCoveringTombstoneSeqnum(
409 const Slice& target_user_key) {
410 SeekToCoveringTombstone(target_user_key);
411 return ValidPos() && ucmp_->Compare(start_key(), target_user_key) <= 0 ? seq()
412 : 0;
413 }
414
415 std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
SplitBySnapshot(const std::vector<SequenceNumber> & snapshots)416 FragmentedRangeTombstoneIterator::SplitBySnapshot(
417 const std::vector<SequenceNumber>& snapshots) {
418 std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
419 splits;
420 SequenceNumber lower = 0;
421 SequenceNumber upper;
422 for (size_t i = 0; i <= snapshots.size(); i++) {
423 if (i >= snapshots.size()) {
424 upper = kMaxSequenceNumber;
425 } else {
426 upper = snapshots[i];
427 }
428 if (tombstones_->ContainsRange(lower, upper)) {
429 splits.emplace(upper, std::unique_ptr<FragmentedRangeTombstoneIterator>(
430 new FragmentedRangeTombstoneIterator(
431 tombstones_, *icmp_, upper, lower)));
432 }
433 lower = upper + 1;
434 }
435 return splits;
436 }
437
438 } // namespace ROCKSDB_NAMESPACE
439