//  Copyright (c) 2018-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/range_tombstone_fragmenter.h"

#include <algorithm>
#include <functional>
#include <set>

#include <cinttypes>
#include <cstdio>

#include "util/autovector.h"
#include "util/kv_map.h"
#include "util/vector_iterator.h"

namespace ROCKSDB_NAMESPACE {

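// Builds the fragmented tombstone list from an iterator over unfragmented
// range tombstones, whose keys hold the tombstone start keys and whose values
// hold the end keys. The input is first checked for sortedness by start key;
// if it is not already sorted, the tombstones are copied into a
// VectorIterator (which sorts them) before fragmentation.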
FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
    std::unique_ptr<InternalIterator> unfragmented_tombstones,
    const InternalKeyComparator& icmp, bool for_compaction,
    const std::vector<SequenceNumber>& snapshots) {
  if (unfragmented_tombstones == nullptr) {
    return;
  }
  bool is_sorted = true;
  int num_tombstones = 0;
  InternalKey pinned_last_start_key;
  Slice last_start_key;
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next(), num_tombstones++) {
    if (num_tombstones > 0 &&
        icmp.Compare(last_start_key, unfragmented_tombstones->key()) > 0) {
      is_sorted = false;
      break;
    }
    if (unfragmented_tombstones->IsKeyPinned()) {
      last_start_key = unfragmented_tombstones->key();
    } else {
      pinned_last_start_key.DecodeFrom(unfragmented_tombstones->key());
      last_start_key = pinned_last_start_key.Encode();
    }
  }
  if (is_sorted) {
    FragmentTombstones(std::move(unfragmented_tombstones), icmp, for_compaction,
                       snapshots);
    return;
  }

  // Sort the tombstones before fragmenting them.
  std::vector<std::string> keys, values;
  keys.reserve(num_tombstones);
  values.reserve(num_tombstones);
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next()) {
    keys.emplace_back(unfragmented_tombstones->key().data(),
                      unfragmented_tombstones->key().size());
    values.emplace_back(unfragmented_tombstones->value().data(),
                        unfragmented_tombstones->value().size());
  }
  // VectorIterator implicitly sorts by key during construction.
  auto iter = std::unique_ptr<VectorIterator>(
      new VectorIterator(std::move(keys), std::move(values), &icmp));
  FragmentTombstones(std::move(iter), icmp, for_compaction, snapshots);
}

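// Fragmentation example (illustrative): given the overlapping tombstones
// [a, e) at seqnum 4 and [c, g) at seqnum 6, FragmentTombstones produces the
// non-overlapping fragments
//   [a, c) -> {4}, [c, e) -> {6, 4}, [e, g) -> {6},
// where each fragment records, in descending order, every seqnum whose
// tombstone covers it.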
void FragmentedRangeTombstoneList::FragmentTombstones(
    std::unique_ptr<InternalIterator> unfragmented_tombstones,
    const InternalKeyComparator& icmp, bool for_compaction,
    const std::vector<SequenceNumber>& snapshots) {
  Slice cur_start_key(nullptr, 0);
  auto cmp = ParsedInternalKeyComparator(&icmp);

  // Stores the end keys and sequence numbers of range tombstones with a start
  // key less than or equal to cur_start_key. Provides an ordering by end key
  // for use in flush_current_tombstones.
  std::set<ParsedInternalKey, ParsedInternalKeyComparator> cur_end_keys(cmp);

  // Given the next start key in unfragmented_tombstones,
  // flush_current_tombstones writes every tombstone fragment that starts
  // and ends with a key before next_start_key, and starts with a key greater
  // than or equal to cur_start_key.
  auto flush_current_tombstones = [&](const Slice& next_start_key) {
    auto it = cur_end_keys.begin();
    bool reached_next_start_key = false;
    for (; it != cur_end_keys.end() && !reached_next_start_key; ++it) {
      Slice cur_end_key = it->user_key;
      if (icmp.user_comparator()->Compare(cur_start_key, cur_end_key) == 0) {
        // Empty tombstone.
        continue;
      }
      if (icmp.user_comparator()->Compare(next_start_key, cur_end_key) <= 0) {
        // All of the end keys in [it, cur_end_keys.end()) are after
        // next_start_key, so the tombstones they represent can be used in
        // fragments that start with keys greater than or equal to
        // next_start_key. However, the end keys we already passed will not be
        // used in any more tombstone fragments.
        //
        // Remove the fully fragmented tombstones and stop iteration after a
        // final round of flushing to preserve the tombstones we can create
        // more fragments from.
        reached_next_start_key = true;
        cur_end_keys.erase(cur_end_keys.begin(), it);
        cur_end_key = next_start_key;
      }

      // Flush a range tombstone fragment [cur_start_key, cur_end_key), which
      // should not overlap with the last-flushed tombstone fragment.
      assert(tombstones_.empty() ||
             icmp.user_comparator()->Compare(tombstones_.back().end_key,
                                             cur_start_key) <= 0);

      // Sort the sequence numbers of the tombstones being fragmented in
      // descending order, and then flush them in that order.
      autovector<SequenceNumber> seqnums_to_flush;
      for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) {
        seqnums_to_flush.push_back(flush_it->sequence);
      }
      std::sort(seqnums_to_flush.begin(), seqnums_to_flush.end(),
                std::greater<SequenceNumber>());
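      // Keeping each fragment's seqnums in descending order lets the
      // iterators below locate the newest seqnum visible under a given upper
      // bound with a single std::lower_bound over
      // [seq_start_idx, seq_end_idx).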

      size_t start_idx = tombstone_seqs_.size();
      size_t end_idx = start_idx + seqnums_to_flush.size();

      if (for_compaction) {
        // Drop all tombstone seqnums that are not preserved by a snapshot.
        SequenceNumber next_snapshot = kMaxSequenceNumber;
        for (auto seq : seqnums_to_flush) {
          if (seq <= next_snapshot) {
            // This seqnum is visible to the snapshot next_snapshot (or, when
            // next_snapshot is kMaxSequenceNumber, to readers above all
            // snapshots), so it must be preserved.
            tombstone_seqs_.push_back(seq);
            seq_set_.insert(seq);
            auto upper_bound_it =
                std::lower_bound(snapshots.begin(), snapshots.end(), seq);
            if (upper_bound_it == snapshots.begin()) {
              // This seqnum is the topmost one visible to the earliest
              // snapshot. None of the seqnums below it will be visible, so we
              // can skip them.
              break;
            }
            next_snapshot = *std::prev(upper_bound_it);
          }
        }
        end_idx = tombstone_seqs_.size();
      } else {
        // The fragmentation is being done for reads, so preserve all seqnums.
        tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
                               seqnums_to_flush.end());
        seq_set_.insert(seqnums_to_flush.begin(), seqnums_to_flush.end());
      }

      assert(start_idx < end_idx);
      tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx, end_idx);

      cur_start_key = cur_end_key;
    }
    if (!reached_next_start_key) {
      // There is a gap between the last flushed tombstone fragment and
      // the next tombstone's start key. Remove all the end keys in
      // the working set, since we have fully fragmented their corresponding
      // tombstones.
      cur_end_keys.clear();
    }
    cur_start_key = next_start_key;
  };

  pinned_iters_mgr_.StartPinning();

  bool no_tombstones = true;
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next()) {
    const Slice& ikey = unfragmented_tombstones->key();
    Slice tombstone_start_key = ExtractUserKey(ikey);
    SequenceNumber tombstone_seq = GetInternalKeySeqno(ikey);
    if (!unfragmented_tombstones->IsKeyPinned()) {
      pinned_slices_.emplace_back(tombstone_start_key.data(),
                                  tombstone_start_key.size());
      tombstone_start_key = pinned_slices_.back();
    }
    no_tombstones = false;

    Slice tombstone_end_key = unfragmented_tombstones->value();
    if (!unfragmented_tombstones->IsValuePinned()) {
      pinned_slices_.emplace_back(tombstone_end_key.data(),
                                  tombstone_end_key.size());
      tombstone_end_key = pinned_slices_.back();
    }
    if (!cur_end_keys.empty() && icmp.user_comparator()->Compare(
                                     cur_start_key, tombstone_start_key) != 0) {
      // The start key has changed. Flush all tombstones that start before
      // this new start key.
      flush_current_tombstones(tombstone_start_key);
    }
    cur_start_key = tombstone_start_key;

    cur_end_keys.emplace(tombstone_end_key, tombstone_seq, kTypeRangeDeletion);
  }
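  // Flush the remaining tombstones in the working set by passing the largest
  // end key seen as the next start key; this emits the final fragments
  // covering everything up to that end key.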
  if (!cur_end_keys.empty()) {
    ParsedInternalKey last_end_key = *std::prev(cur_end_keys.end());
    flush_current_tombstones(last_end_key.user_key);
  }

  if (!no_tombstones) {
    pinned_iters_mgr_.PinIterator(unfragmented_tombstones.release(),
                                  false /* arena */);
  }
}

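// Returns true iff some fragment stores a sequence number in the inclusive
// range [lower, upper], i.e. the list contains a tombstone relevant to that
// snapshot stripe.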
bool FragmentedRangeTombstoneList::ContainsRange(SequenceNumber lower,
                                                 SequenceNumber upper) const {
  auto seq_it = seq_set_.lower_bound(lower);
  return seq_it != seq_set_.end() && *seq_it <= upper;
}

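// Both constructors restrict the iterator to tombstone seqnums in
// [_lower_bound, _upper_bound]; the shared_ptr overload additionally keeps
// the underlying FragmentedRangeTombstoneList alive for the iterator's
// lifetime via tombstones_ref_.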
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
    const FragmentedRangeTombstoneList* tombstones,
    const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
    SequenceNumber _lower_bound)
    : tombstone_start_cmp_(icmp.user_comparator()),
      tombstone_end_cmp_(icmp.user_comparator()),
      icmp_(&icmp),
      ucmp_(icmp.user_comparator()),
      tombstones_(tombstones),
      upper_bound_(_upper_bound),
      lower_bound_(_lower_bound) {
  assert(tombstones_ != nullptr);
  Invalidate();
}

FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
    const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
    const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
    SequenceNumber _lower_bound)
    : tombstone_start_cmp_(icmp.user_comparator()),
      tombstone_end_cmp_(icmp.user_comparator()),
      icmp_(&icmp),
      ucmp_(icmp.user_comparator()),
      tombstones_ref_(tombstones),
      tombstones_(tombstones_ref_.get()),
      upper_bound_(_upper_bound),
      lower_bound_(_lower_bound) {
  assert(tombstones_ != nullptr);
  Invalidate();
}

void FragmentedRangeTombstoneIterator::SeekToFirst() {
  pos_ = tombstones_->begin();
  seq_pos_ = tombstones_->seq_begin();
}

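// The "Top" seek and step variants visit each fragment at most once, at its
// newest seqnum visible under upper_bound_, whereas SeekToFirst/SeekToLast
// and Next/Prev walk every (fragment, seqnum) pair.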
void FragmentedRangeTombstoneIterator::SeekToTopFirst() {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = tombstones_->begin();
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanForwardToVisibleTombstone();
}

void FragmentedRangeTombstoneIterator::SeekToLast() {
  pos_ = std::prev(tombstones_->end());
  seq_pos_ = std::prev(tombstones_->seq_end());
}

void FragmentedRangeTombstoneIterator::SeekToTopLast() {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = std::prev(tombstones_->end());
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanBackwardToVisibleTombstone();
}

void FragmentedRangeTombstoneIterator::Seek(const Slice& target) {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  SeekToCoveringTombstone(target);
  ScanForwardToVisibleTombstone();
}

void FragmentedRangeTombstoneIterator::SeekForPrev(const Slice& target) {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  SeekForPrevToCoveringTombstone(target);
  ScanBackwardToVisibleTombstone();
}

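// SeekToCoveringTombstone positions the iterator at the first fragment whose
// end key is strictly after target (the only fragment that can possibly cover
// target, since fragments are sorted and non-overlapping), and at that
// fragment's newest seqnum not exceeding upper_bound_.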
void FragmentedRangeTombstoneIterator::SeekToCoveringTombstone(
    const Slice& target) {
  pos_ = std::upper_bound(tombstones_->begin(), tombstones_->end(), target,
                          tombstone_end_cmp_);
  if (pos_ == tombstones_->end()) {
    // All tombstones end before target.
    seq_pos_ = tombstones_->seq_end();
    return;
  }
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
}

void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone(
    const Slice& target) {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = std::upper_bound(tombstones_->begin(), tombstones_->end(), target,
                          tombstone_start_cmp_);
  if (pos_ == tombstones_->begin()) {
    // All tombstones start after target.
    Invalidate();
    return;
  }
  --pos_;
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
}

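// Skips forward past fragments that have no seqnum in
// [lower_bound_, upper_bound_], invalidating the iterator if none remain.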
void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() {
  while (pos_ != tombstones_->end() &&
         (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx) ||
          *seq_pos_ < lower_bound_)) {
    ++pos_;
    if (pos_ == tombstones_->end()) {
      Invalidate();
      return;
    }
    seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                                tombstones_->seq_iter(pos_->seq_end_idx),
                                upper_bound_, std::greater<SequenceNumber>());
  }
}

void FragmentedRangeTombstoneIterator::ScanBackwardToVisibleTombstone() {
  while (pos_ != tombstones_->end() &&
         (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx) ||
          *seq_pos_ < lower_bound_)) {
    if (pos_ == tombstones_->begin()) {
      Invalidate();
      return;
    }
    --pos_;
    seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                                tombstones_->seq_iter(pos_->seq_end_idx),
                                upper_bound_, std::greater<SequenceNumber>());
  }
}

void FragmentedRangeTombstoneIterator::Next() {
  ++seq_pos_;
  if (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
    ++pos_;
  }
}

void FragmentedRangeTombstoneIterator::TopNext() {
  ++pos_;
  if (pos_ == tombstones_->end()) {
    return;
  }
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanForwardToVisibleTombstone();
}

void FragmentedRangeTombstoneIterator::Prev() {
  if (seq_pos_ == tombstones_->seq_begin()) {
    Invalidate();
    return;
  }
  --seq_pos_;
  if (pos_ == tombstones_->end() ||
      seq_pos_ == tombstones_->seq_iter(pos_->seq_start_idx - 1)) {
    --pos_;
  }
}

void FragmentedRangeTombstoneIterator::TopPrev() {
  if (pos_ == tombstones_->begin()) {
    Invalidate();
    return;
  }
  --pos_;
  seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
                              tombstones_->seq_iter(pos_->seq_end_idx),
                              upper_bound_, std::greater<SequenceNumber>());
  ScanBackwardToVisibleTombstone();
}

bool FragmentedRangeTombstoneIterator::Valid() const {
  return tombstones_ != nullptr && pos_ != tombstones_->end();
}

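// Returns the newest tombstone seqnum (not exceeding the iterator's upper
// bound) whose fragment covers target_user_key, or 0 if no such tombstone
// exists. The explicit start_key() check is needed because
// SeekToCoveringTombstone only guarantees the fragment ends after the target.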
SequenceNumber FragmentedRangeTombstoneIterator::MaxCoveringTombstoneSeqnum(
    const Slice& target_user_key) {
  SeekToCoveringTombstone(target_user_key);
  return ValidPos() && ucmp_->Compare(start_key(), target_user_key) <= 0 ? seq()
                                                                         : 0;
}

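// Splits the tombstones into one iterator per snapshot stripe, keyed by the
// stripe's inclusive upper bound. For example, with snapshots {10, 20} the
// candidate stripes are [0, 10], [11, 20], and [21, kMaxSequenceNumber];
// stripes containing no tombstone seqnums are skipped.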
std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
FragmentedRangeTombstoneIterator::SplitBySnapshot(
    const std::vector<SequenceNumber>& snapshots) {
  std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
      splits;
  SequenceNumber lower = 0;
  SequenceNumber upper;
  for (size_t i = 0; i <= snapshots.size(); i++) {
    if (i >= snapshots.size()) {
      upper = kMaxSequenceNumber;
    } else {
      upper = snapshots[i];
    }
    if (tombstones_->ContainsRange(lower, upper)) {
      splits.emplace(upper, std::unique_ptr<FragmentedRangeTombstoneIterator>(
                                new FragmentedRangeTombstoneIterator(
                                    tombstones_, *icmp_, upper, lower)));
    }
    lower = upper + 1;
  }
  return splits;
}

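// Usage sketch (illustrative only; `range_del_iter`, `icmp`, `read_seq`, and
// `user_key` are assumed to come from the caller, e.g. a memtable's
// range-deletion iterator, the column family's comparator, and the read
// snapshot):
//
//   auto fragmented = std::make_shared<FragmentedRangeTombstoneList>(
//       std::move(range_del_iter), icmp, false /* for_compaction */,
//       std::vector<SequenceNumber>{} /* snapshots */);
//   FragmentedRangeTombstoneIterator iter(fragmented, icmp,
//                                         read_seq /* upper bound */,
//                                         0 /* lower bound */);
//   SequenceNumber covering_seq = iter.MaxCoveringTombstoneSeqnum(user_key);
//   // covering_seq > 0 iff a visible range tombstone covers user_key.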
}  // namespace ROCKSDB_NAMESPACE