//  Copyright (c) 2018-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/range_tombstone_fragmenter.h"

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <functional>
#include <set>

#include "util/autovector.h"
#include "util/kv_map.h"
#include "util/vector_iterator.h"

namespace ROCKSDB_NAMESPACE {

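// Builds the fragmented tombstone list from an iterator over unfragmented
// range tombstones. A first pass below checks whether the input is already
// sorted by start key; if it is not, the keys and values are copied into a
// VectorIterator (which sorts them on construction) before fragmentation.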
FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
    std::unique_ptr<InternalIterator> unfragmented_tombstones,
    const InternalKeyComparator& icmp, bool for_compaction,
    const std::vector<SequenceNumber>& snapshots) {
  if (unfragmented_tombstones == nullptr) {
    return;
  }
  bool is_sorted = true;
  InternalKey pinned_last_start_key;
  Slice last_start_key;
  num_unfragmented_tombstones_ = 0;
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next(), num_unfragmented_tombstones_++) {
    if (num_unfragmented_tombstones_ > 0 &&
        icmp.Compare(last_start_key, unfragmented_tombstones->key()) > 0) {
      is_sorted = false;
      break;
    }
    if (unfragmented_tombstones->IsKeyPinned()) {
      last_start_key = unfragmented_tombstones->key();
    } else {
      pinned_last_start_key.DecodeFrom(unfragmented_tombstones->key());
      last_start_key = pinned_last_start_key.Encode();
    }
  }
  if (is_sorted) {
    FragmentTombstones(std::move(unfragmented_tombstones), icmp, for_compaction,
                       snapshots);
    return;
  }

  // Sort the tombstones before fragmenting them.
  std::vector<std::string> keys, values;
  keys.reserve(num_unfragmented_tombstones_);
  values.reserve(num_unfragmented_tombstones_);
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next()) {
    keys.emplace_back(unfragmented_tombstones->key().data(),
                      unfragmented_tombstones->key().size());
    values.emplace_back(unfragmented_tombstones->value().data(),
                        unfragmented_tombstones->value().size());
  }
  // VectorIterator implicitly sorts by key during construction.
  auto iter = std::unique_ptr<VectorIterator>(
      new VectorIterator(std::move(keys), std::move(values), &icmp));
  FragmentTombstones(std::move(iter), icmp, for_compaction, snapshots);
}

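// Illustrative sketch of the fragmentation done below (hypothetical input, not
// taken from any particular test): given the unfragmented tombstones
// [a, e) @ seqnum 3 and [c, g) @ seqnum 5, the loop emits the non-overlapping
// fragments
//   [a, c) -> {3},  [c, e) -> {5, 3},  [e, g) -> {5}
// so that each fragment records every seqnum whose tombstone covers its range.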
void FragmentedRangeTombstoneList::FragmentTombstones(
    std::unique_ptr<InternalIterator> unfragmented_tombstones,
    const InternalKeyComparator& icmp, bool for_compaction,
    const std::vector<SequenceNumber>& snapshots) {
  Slice cur_start_key(nullptr, 0);
  auto cmp = ParsedInternalKeyComparator(&icmp);

  // Stores the end keys and sequence numbers of range tombstones with a start
  // key less than or equal to cur_start_key. Provides an ordering by end key
  // for use in flush_current_tombstones.
  std::set<ParsedInternalKey, ParsedInternalKeyComparator> cur_end_keys(cmp);

  // Given the next start key in unfragmented_tombstones,
  // flush_current_tombstones writes every tombstone fragment that starts
  // and ends with a key before next_start_key, and starts with a key greater
  // than or equal to cur_start_key.
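  // For example (hypothetical state): with cur_start_key = "c",
  // cur_end_keys = {"e" @ 5, "g" @ 3}, and next_start_key = "f", the lambda
  // flushes the fragments [c, e) -> {5, 3} and [e, f) -> {3}, and leaves
  // "g" @ 3 in the working set for later fragments.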
  auto flush_current_tombstones = [&](const Slice& next_start_key) {
    auto it = cur_end_keys.begin();
    bool reached_next_start_key = false;
    for (; it != cur_end_keys.end() && !reached_next_start_key; ++it) {
      Slice cur_end_key = it->user_key;
      if (icmp.user_comparator()->Compare(cur_start_key, cur_end_key) == 0) {
        // Empty tombstone.
        continue;
      }
      if (icmp.user_comparator()->Compare(next_start_key, cur_end_key) <= 0) {
        // All of the end keys in [it, cur_end_keys.end()) are after
        // next_start_key, so the tombstones they represent can be used in
        // fragments that start with keys greater than or equal to
        // next_start_key. However, the end keys we already passed will not be
        // used in any more tombstone fragments.
        //
        // Remove the fully fragmented tombstones and stop iteration after a
        // final round of flushing to preserve the tombstones we can create more
        // fragments from.
        reached_next_start_key = true;
        cur_end_keys.erase(cur_end_keys.begin(), it);
        cur_end_key = next_start_key;
      }

      // Flush a range tombstone fragment [cur_start_key, cur_end_key), which
      // should not overlap with the last-flushed tombstone fragment.
      assert(tombstones_.empty() ||
             icmp.user_comparator()->Compare(tombstones_.back().end_key,
                                             cur_start_key) <= 0);

      // Sort the sequence numbers of the tombstones being fragmented in
      // descending order, and then flush them in that order.
      autovector<SequenceNumber> seqnums_to_flush;
      for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) {
        seqnums_to_flush.push_back(flush_it->sequence);
      }
      std::sort(seqnums_to_flush.begin(), seqnums_to_flush.end(),
                std::greater<SequenceNumber>());

      size_t start_idx = tombstone_seqs_.size();
      size_t end_idx = start_idx + seqnums_to_flush.size();

      if (for_compaction) {
        // Drop all tombstone seqnums that are not preserved by a snapshot.
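        // Worked example (hypothetical values): with snapshots = {10, 20} and
        // seqnums_to_flush = {25, 15, 12, 5}, the loop keeps 25 (the newest
        // seqnum, visible to reads with no snapshot), 15 (the newest seqnum
        // visible to snapshot 20), and 5 (the newest seqnum visible to
        // snapshot 10); 12 is dropped because every reader that could see it
        // is already served by a newer preserved seqnum.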
        SequenceNumber next_snapshot = kMaxSequenceNumber;
        for (auto seq : seqnums_to_flush) {
          if (seq <= next_snapshot) {
            // This seqnum is visible at next_snapshot, so it must be preserved.
            tombstone_seqs_.push_back(seq);
            seq_set_.insert(seq);
            auto upper_bound_it =
                std::lower_bound(snapshots.begin(), snapshots.end(), seq);
            if (upper_bound_it == snapshots.begin()) {
              // This seqnum is the topmost one visible to the earliest
              // snapshot. None of the seqnums below it will be visible, so we
              // can skip them.
              break;
            }
            next_snapshot = *std::prev(upper_bound_it);
          }
        }
        end_idx = tombstone_seqs_.size();
      } else {
        // The fragmentation is being done for reads, so preserve all seqnums.
        tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
                               seqnums_to_flush.end());
        seq_set_.insert(seqnums_to_flush.begin(), seqnums_to_flush.end());
      }

      assert(start_idx < end_idx);
      tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx, end_idx);

      cur_start_key = cur_end_key;
    }
    if (!reached_next_start_key) {
      // There is a gap between the last flushed tombstone fragment and
      // the next tombstone's start key. Remove all the end keys in
      // the working set, since we have fully fragmented their corresponding
      // tombstones.
      cur_end_keys.clear();
    }
    cur_start_key = next_start_key;
  };

  pinned_iters_mgr_.StartPinning();

  bool no_tombstones = true;
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next()) {
    const Slice& ikey = unfragmented_tombstones->key();
    Slice tombstone_start_key = ExtractUserKey(ikey);
    SequenceNumber tombstone_seq = GetInternalKeySeqno(ikey);
    if (!unfragmented_tombstones->IsKeyPinned()) {
      pinned_slices_.emplace_back(tombstone_start_key.data(),
                                  tombstone_start_key.size());
      tombstone_start_key = pinned_slices_.back();
    }
    no_tombstones = false;

    Slice tombstone_end_key = unfragmented_tombstones->value();
    if (!unfragmented_tombstones->IsValuePinned()) {
      pinned_slices_.emplace_back(tombstone_end_key.data(),
                                  tombstone_end_key.size());
      tombstone_end_key = pinned_slices_.back();
    }
    if (!cur_end_keys.empty() && icmp.user_comparator()->Compare(
                                     cur_start_key, tombstone_start_key) != 0) {
      // The start key has changed. Flush all tombstones that start before
      // this new start key.
      flush_current_tombstones(tombstone_start_key);
    }
    cur_start_key = tombstone_start_key;

    cur_end_keys.emplace(tombstone_end_key, tombstone_seq, kTypeRangeDeletion);
  }
  if (!cur_end_keys.empty()) {
    ParsedInternalKey last_end_key = *std::prev(cur_end_keys.end());
    flush_current_tombstones(last_end_key.user_key);
  }

  if (!no_tombstones) {
    pinned_iters_mgr_.PinIterator(unfragmented_tombstones.release(),
                                  false /* arena */);
  }
}

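// Reports whether any fragment carries a seqnum in the inclusive range
// [lower, upper]. Illustrative example (hypothetical contents): with
// seq_set_ = {3, 7, 12}, ContainsRange(4, 10) returns true (it finds 7),
// while ContainsRange(8, 11) returns false.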
bool FragmentedRangeTombstoneList::ContainsRange(SequenceNumber lower,
                                                 SequenceNumber upper) const {
  auto seq_it = seq_set_.lower_bound(lower);
  return seq_it != seq_set_.end() && *seq_it <= upper;
}

FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
    const FragmentedRangeTombstoneList* tombstones,
    const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
    SequenceNumber _lower_bound)
    : tombstone_start_cmp_(icmp.user_comparator()),
      tombstone_end_cmp_(icmp.user_comparator()),
      icmp_(&icmp),
      ucmp_(icmp.user_comparator()),
      tombstones_(tombstones),
      upper_bound_(_upper_bound),
      lower_bound_(_lower_bound) {
  assert(tombstones_ != nullptr);
  Invalidate();
}

FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
    const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
    const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
    SequenceNumber _lower_bound)
    : tombstone_start_cmp_(icmp.user_comparator()),
      tombstone_end_cmp_(icmp.user_comparator()),
      icmp_(&icmp),
      ucmp_(icmp.user_comparator()),
      tombstones_ref_(tombstones),
      tombstones_(tombstones_ref_.get()),
      upper_bound_(_upper_bound),
      lower_bound_(_lower_bound) {
  assert(tombstones_ != nullptr);
  Invalidate();
}

void FragmentedRangeTombstoneIterator::SeekToFirst() {
  pos_ = tombstones_->begin();
  seq_pos_ = tombstones_->seq_begin();
}

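// The "Top" positioning methods place seq_pos_ at the newest seqnum in the
// fragment that is visible at upper_bound_. Seqnums within a fragment are
// stored in descending order, so std::lower_bound with std::greater finds the
// first element that is <= upper_bound_, i.e. the largest visible seqnum.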
void FragmentedRangeTombstoneIterator::SeekToTopFirst() {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = tombstones_->begin();
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanForwardToVisibleTombstone();
}

void FragmentedRangeTombstoneIterator::SeekToLast() {
  pos_ = std::prev(tombstones_->end());
  seq_pos_ = std::prev(tombstones_->seq_end());
}

void FragmentedRangeTombstoneIterator::SeekToTopLast() {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = std::prev(tombstones_->end());
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanBackwardToVisibleTombstone();
}

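// Positions the iterator at the first fragment whose end key is after target
// and that has a seqnum visible in [lower_bound_, upper_bound_]; if that
// fragment also starts at or before target, it covers target.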
void FragmentedRangeTombstoneIterator::Seek(const Slice& target) {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  SeekToCoveringTombstone(target);
  ScanForwardToVisibleTombstone();
}

void FragmentedRangeTombstoneIterator::SeekForPrev(const Slice& target) {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  SeekForPrevToCoveringTombstone(target);
  ScanBackwardToVisibleTombstone();
}

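// Positions pos_ at the first fragment whose end key is after target, which is
// the covering fragment if one exists. Illustrative example (hypothetical
// fragments): with fragments [a, c), [c, e), and [g, h), a target of "d" lands
// on [c, e) (which covers it), while a target of "f" lands on [g, h) (which
// does not).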
void FragmentedRangeTombstoneIterator::SeekToCoveringTombstone(
    const Slice& target) {
  pos_ = std::upper_bound(tombstones_->begin(), tombstones_->end(), target,
                          tombstone_end_cmp_);
  if (pos_ == tombstones_->end()) {
    // All tombstones end before target.
    seq_pos_ = tombstones_->seq_end();
    return;
  }
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
}

void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone(
    const Slice& target) {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = std::upper_bound(tombstones_->begin(), tombstones_->end(), target,
                          tombstone_start_cmp_);
  if (pos_ == tombstones_->begin()) {
    // All tombstones start after target.
    Invalidate();
    return;
  }
  --pos_;
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
}

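// Advances pos_ past fragments with no seqnum visible in
// [lower_bound_, upper_bound_]: seq_pos_ at the fragment's seq_end_idx means
// every seqnum in the fragment exceeds upper_bound_, and *seq_pos_ below
// lower_bound_ means the newest visible candidate is still too old.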
void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() {
  while (pos_ != tombstones_->end() &&
         (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx) ||
          *seq_pos_ < lower_bound_)) {
    ++pos_;
    if (pos_ == tombstones_->end()) {
      Invalidate();
      return;
    }
    seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                                tombstones_->seq_iter(pos_->seq_end_idx),
                                upper_bound_, std::greater<SequenceNumber>());
  }
}

void FragmentedRangeTombstoneIterator::ScanBackwardToVisibleTombstone() {
  while (pos_ != tombstones_->end() &&
         (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx) ||
          *seq_pos_ < lower_bound_)) {
    if (pos_ == tombstones_->begin()) {
      Invalidate();
      return;
    }
    --pos_;
    seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                                tombstones_->seq_iter(pos_->seq_end_idx),
                                upper_bound_, std::greater<SequenceNumber>());
  }
}

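// Next() steps through every (fragment, seqnum) pair, moving to the next
// fragment once the current fragment's seqnums are exhausted; TopNext() steps
// fragment by fragment, landing on each fragment's newest visible seqnum.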
void FragmentedRangeTombstoneIterator::Next() {
  ++seq_pos_;
  if (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
    ++pos_;
  }
}

void FragmentedRangeTombstoneIterator::TopNext() {
  ++pos_;
  if (pos_ == tombstones_->end()) {
    return;
  }
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanForwardToVisibleTombstone();
}

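// Prev() is the mirror of Next(): after stepping seq_pos_ back, if it now
// points at the last seqnum of the preceding fragment (index
// pos_->seq_start_idx - 1), pos_ is stepped back as well.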
void FragmentedRangeTombstoneIterator::Prev() {
  if (seq_pos_ == tombstones_->seq_begin()) {
    Invalidate();
    return;
  }
  --seq_pos_;
  if (pos_ == tombstones_->end() ||
      seq_pos_ == tombstones_->seq_iter(pos_->seq_start_idx - 1)) {
    --pos_;
  }
}

void FragmentedRangeTombstoneIterator::TopPrev() {
  if (pos_ == tombstones_->begin()) {
    Invalidate();
    return;
  }
  --pos_;
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanBackwardToVisibleTombstone();
}

bool FragmentedRangeTombstoneIterator::Valid() const {
  return tombstones_ != nullptr && pos_ != tombstones_->end();
}

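// Returns the newest seqnum at or below this iterator's upper bound among the
// seqnums of the fragment covering target_user_key, or 0 if no fragment covers
// it (or none of its seqnums are visible at the upper bound).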
SequenceNumber FragmentedRangeTombstoneIterator::MaxCoveringTombstoneSeqnum(
    const Slice& target_user_key) {
  SeekToCoveringTombstone(target_user_key);
  return ValidPos() && ucmp_->Compare(start_key(), target_user_key) <= 0 ? seq()
                                                                         : 0;
}

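// Splits this iterator into one iterator per snapshot stripe. For example
// (hypothetical snapshots), with snapshots = {10, 20} the stripes are
// [0, 10], [11, 20], and [21, kMaxSequenceNumber]; an iterator keyed by the
// stripe's upper bound is created for every stripe that actually contains a
// tombstone seqnum.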
std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
FragmentedRangeTombstoneIterator::SplitBySnapshot(
    const std::vector<SequenceNumber>& snapshots) {
  std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
      splits;
  SequenceNumber lower = 0;
  SequenceNumber upper;
  for (size_t i = 0; i <= snapshots.size(); i++) {
    if (i >= snapshots.size()) {
      upper = kMaxSequenceNumber;
    } else {
      upper = snapshots[i];
    }
    if (tombstones_->ContainsRange(lower, upper)) {
      splits.emplace(upper, std::unique_ptr<FragmentedRangeTombstoneIterator>(
                                new FragmentedRangeTombstoneIterator(
                                    tombstones_, *icmp_, upper, lower)));
    }
    lower = upper + 1;
  }
  return splits;
}

}  // namespace ROCKSDB_NAMESPACE