1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "table/get_context.h"
7 
8 #include "db/blob/blob_fetcher.h"
9 #include "db/merge_helper.h"
10 #include "db/pinned_iterators_manager.h"
11 #include "db/read_callback.h"
12 #include "monitoring/file_read_sample.h"
13 #include "monitoring/perf_context_imp.h"
14 #include "monitoring/statistics.h"
15 #include "rocksdb/merge_operator.h"
16 #include "rocksdb/statistics.h"
17 #include "rocksdb/system_clock.h"
18 
19 namespace ROCKSDB_NAMESPACE {
20 
21 namespace {
22 
appendToReplayLog(std::string * replay_log,ValueType type,Slice value)23 void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
24 #ifndef ROCKSDB_LITE
25   if (replay_log) {
26     if (replay_log->empty()) {
27       // Optimization: in the common case of only one operation in the
28       // log, we allocate the exact amount of space needed.
29       replay_log->reserve(1 + VarintLength(value.size()) + value.size());
30     }
31     replay_log->push_back(type);
32     PutLengthPrefixedSlice(replay_log, value);
33   }
34 #else
35   (void)replay_log;
36   (void)type;
37   (void)value;
38 #endif  // ROCKSDB_LITE
39 }
40 
41 }  // namespace
42 
GetContext(const Comparator * ucmp,const MergeOperator * merge_operator,Logger * logger,Statistics * statistics,GetState init_state,const Slice & user_key,PinnableSlice * pinnable_val,std::string * timestamp,bool * value_found,MergeContext * merge_context,bool do_merge,SequenceNumber * _max_covering_tombstone_seq,SystemClock * clock,SequenceNumber * seq,PinnedIteratorsManager * _pinned_iters_mgr,ReadCallback * callback,bool * is_blob_index,uint64_t tracing_get_id,BlobFetcher * blob_fetcher)43 GetContext::GetContext(const Comparator* ucmp,
44                        const MergeOperator* merge_operator, Logger* logger,
45                        Statistics* statistics, GetState init_state,
46                        const Slice& user_key, PinnableSlice* pinnable_val,
47                        std::string* timestamp, bool* value_found,
48                        MergeContext* merge_context, bool do_merge,
49                        SequenceNumber* _max_covering_tombstone_seq,
50                        SystemClock* clock, SequenceNumber* seq,
51                        PinnedIteratorsManager* _pinned_iters_mgr,
52                        ReadCallback* callback, bool* is_blob_index,
53                        uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
54     : ucmp_(ucmp),
55       merge_operator_(merge_operator),
56       logger_(logger),
57       statistics_(statistics),
58       state_(init_state),
59       user_key_(user_key),
60       pinnable_val_(pinnable_val),
61       timestamp_(timestamp),
62       value_found_(value_found),
63       merge_context_(merge_context),
64       max_covering_tombstone_seq_(_max_covering_tombstone_seq),
65       clock_(clock),
66       seq_(seq),
67       replay_log_(nullptr),
68       pinned_iters_mgr_(_pinned_iters_mgr),
69       callback_(callback),
70       do_merge_(do_merge),
71       is_blob_index_(is_blob_index),
72       tracing_get_id_(tracing_get_id),
73       blob_fetcher_(blob_fetcher) {
74   if (seq_) {
75     *seq_ = kMaxSequenceNumber;
76   }
77   sample_ = should_sample_file_read();
78 }
79 
GetContext(const Comparator * ucmp,const MergeOperator * merge_operator,Logger * logger,Statistics * statistics,GetState init_state,const Slice & user_key,PinnableSlice * pinnable_val,bool * value_found,MergeContext * merge_context,bool do_merge,SequenceNumber * _max_covering_tombstone_seq,SystemClock * clock,SequenceNumber * seq,PinnedIteratorsManager * _pinned_iters_mgr,ReadCallback * callback,bool * is_blob_index,uint64_t tracing_get_id,BlobFetcher * blob_fetcher)80 GetContext::GetContext(
81     const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
82     Statistics* statistics, GetState init_state, const Slice& user_key,
83     PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context,
84     bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
85     SystemClock* clock, SequenceNumber* seq,
86     PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
87     bool* is_blob_index, uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
88     : GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
89                  pinnable_val, nullptr, value_found, merge_context, do_merge,
90                  _max_covering_tombstone_seq, clock, seq, _pinned_iters_mgr,
91                  callback, is_blob_index, tracing_get_id, blob_fetcher) {}
92 
93 // Called from TableCache::Get and Table::Get when file/block in which
94 // key may exist are not there in TableCache/BlockCache respectively. In this
95 // case we can't guarantee that key does not exist and are not permitted to do
96 // IO to be certain.Set the status=kFound and value_found=false to let the
97 // caller know that key may exist but is not there in memory
MarkKeyMayExist()98 void GetContext::MarkKeyMayExist() {
99   state_ = kFound;
100   if (value_found_ != nullptr) {
101     *value_found_ = false;
102   }
103 }
104 
SaveValue(const Slice & value,SequenceNumber)105 void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) {
106   assert(state_ == kNotFound);
107   appendToReplayLog(replay_log_, kTypeValue, value);
108 
109   state_ = kFound;
110   if (LIKELY(pinnable_val_ != nullptr)) {
111     pinnable_val_->PinSelf(value);
112   }
113 }
114 
ReportCounters()115 void GetContext::ReportCounters() {
116   if (get_context_stats_.num_cache_hit > 0) {
117     RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit);
118   }
119   if (get_context_stats_.num_cache_index_hit > 0) {
120     RecordTick(statistics_, BLOCK_CACHE_INDEX_HIT,
121                get_context_stats_.num_cache_index_hit);
122   }
123   if (get_context_stats_.num_cache_data_hit > 0) {
124     RecordTick(statistics_, BLOCK_CACHE_DATA_HIT,
125                get_context_stats_.num_cache_data_hit);
126   }
127   if (get_context_stats_.num_cache_filter_hit > 0) {
128     RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT,
129                get_context_stats_.num_cache_filter_hit);
130   }
131   if (get_context_stats_.num_cache_compression_dict_hit > 0) {
132     RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT,
133                get_context_stats_.num_cache_compression_dict_hit);
134   }
135   if (get_context_stats_.num_cache_index_miss > 0) {
136     RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS,
137                get_context_stats_.num_cache_index_miss);
138   }
139   if (get_context_stats_.num_cache_filter_miss > 0) {
140     RecordTick(statistics_, BLOCK_CACHE_FILTER_MISS,
141                get_context_stats_.num_cache_filter_miss);
142   }
143   if (get_context_stats_.num_cache_data_miss > 0) {
144     RecordTick(statistics_, BLOCK_CACHE_DATA_MISS,
145                get_context_stats_.num_cache_data_miss);
146   }
147   if (get_context_stats_.num_cache_compression_dict_miss > 0) {
148     RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS,
149                get_context_stats_.num_cache_compression_dict_miss);
150   }
151   if (get_context_stats_.num_cache_bytes_read > 0) {
152     RecordTick(statistics_, BLOCK_CACHE_BYTES_READ,
153                get_context_stats_.num_cache_bytes_read);
154   }
155   if (get_context_stats_.num_cache_miss > 0) {
156     RecordTick(statistics_, BLOCK_CACHE_MISS,
157                get_context_stats_.num_cache_miss);
158   }
159   if (get_context_stats_.num_cache_add > 0) {
160     RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add);
161   }
162   if (get_context_stats_.num_cache_add_redundant > 0) {
163     RecordTick(statistics_, BLOCK_CACHE_ADD_REDUNDANT,
164                get_context_stats_.num_cache_add_redundant);
165   }
166   if (get_context_stats_.num_cache_bytes_write > 0) {
167     RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE,
168                get_context_stats_.num_cache_bytes_write);
169   }
170   if (get_context_stats_.num_cache_index_add > 0) {
171     RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD,
172                get_context_stats_.num_cache_index_add);
173   }
174   if (get_context_stats_.num_cache_index_add_redundant > 0) {
175     RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD_REDUNDANT,
176                get_context_stats_.num_cache_index_add_redundant);
177   }
178   if (get_context_stats_.num_cache_index_bytes_insert > 0) {
179     RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT,
180                get_context_stats_.num_cache_index_bytes_insert);
181   }
182   if (get_context_stats_.num_cache_data_add > 0) {
183     RecordTick(statistics_, BLOCK_CACHE_DATA_ADD,
184                get_context_stats_.num_cache_data_add);
185   }
186   if (get_context_stats_.num_cache_data_add_redundant > 0) {
187     RecordTick(statistics_, BLOCK_CACHE_DATA_ADD_REDUNDANT,
188                get_context_stats_.num_cache_data_add_redundant);
189   }
190   if (get_context_stats_.num_cache_data_bytes_insert > 0) {
191     RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT,
192                get_context_stats_.num_cache_data_bytes_insert);
193   }
194   if (get_context_stats_.num_cache_filter_add > 0) {
195     RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD,
196                get_context_stats_.num_cache_filter_add);
197   }
198   if (get_context_stats_.num_cache_filter_add_redundant > 0) {
199     RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD_REDUNDANT,
200                get_context_stats_.num_cache_filter_add_redundant);
201   }
202   if (get_context_stats_.num_cache_filter_bytes_insert > 0) {
203     RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
204                get_context_stats_.num_cache_filter_bytes_insert);
205   }
206   if (get_context_stats_.num_cache_compression_dict_add > 0) {
207     RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
208                get_context_stats_.num_cache_compression_dict_add);
209   }
210   if (get_context_stats_.num_cache_compression_dict_add_redundant > 0) {
211     RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
212                get_context_stats_.num_cache_compression_dict_add_redundant);
213   }
214   if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
215     RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
216                get_context_stats_.num_cache_compression_dict_bytes_insert);
217   }
218 }
219 
// Processes one internal-key entry encountered while looking up user_key_.
//
// If parsed_key's user key differs from user_key_ (compared without
// timestamps), nothing is modified and false is returned. Otherwise
// *matched is set to true and the entry is applied according to its type:
//   - kTypeValue / kTypeBlobIndex: normally ends in kFound, either directly
//     or by merging with operands already collected while in kMerge.
//   - deletion types, or a value/merge/blob entry whose sequence number is
//     below *max_covering_tombstone_seq_: kDeleted, or a pending merge is
//     finished without a base value.
//   - kTypeMerge: the operand is collected and the state stays kMerge unless
//     the merge operator's ShouldMerge() asks to merge now.
//
// Returns true when the caller should continue with the next entry for this
// key. That happens when the entry was rejected by the read callback, or when
// it was a merge operand that did not finish the merge. Returns false
// otherwise.
//
// value_pinner, when non-null, is used to pin `value` instead of copying it
// where possible.
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
                           const Slice& value, bool* matched,
                           Cleanable* value_pinner) {
  assert(matched);
  // Any path that collects merge operands needs a MergeContext.
  assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
         merge_context_ != nullptr);
  if (ucmp_->EqualWithoutTimestamp(parsed_key.user_key, user_key_)) {
    *matched = true;
    // If the value is not in the snapshot, skip it
    if (!CheckCallback(parsed_key.sequence)) {
      return true;  // to continue to the next seq
    }

    // Record the entry so the lookup can later be replayed through
    // replayGetContextLog(). This is a no-op when replay_log_ is null.
    appendToReplayLog(replay_log_, parsed_key.type, value);

    if (seq_ != nullptr) {
      // Set the sequence number if it is uninitialized
      if (*seq_ == kMaxSequenceNumber) {
        *seq_ = parsed_key.sequence;
      }
    }

    auto type = parsed_key.type;
    // Key matches. Process it
    // A value, merge operand or blob reference whose sequence number is
    // below *max_covering_tombstone_seq_ is hidden by a range tombstone.
    // Handle it exactly like a deletion.
    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
        max_covering_tombstone_seq_ != nullptr &&
        *max_covering_tombstone_seq_ > parsed_key.sequence) {
      type = kTypeRangeDeletion;
    }
    switch (type) {
      case kTypeValue:
      case kTypeBlobIndex:
        assert(state_ == kNotFound || state_ == kMerge);
        if (type == kTypeBlobIndex && is_blob_index_ == nullptr) {
          // Blob value not supported. Stop.
          state_ = kUnexpectedBlobIndex;
          return false;
        }
        if (is_blob_index_ != nullptr) {
          *is_blob_index_ = (type == kTypeBlobIndex);
        }
        if (kNotFound == state_) {
          state_ = kFound;
          if (do_merge_) {
            // Regular Get. The value (or, for kTypeBlobIndex, the blob
            // reference itself, with *is_blob_index_ == true) is handed to
            // the caller without resolving the blob here.
            if (LIKELY(pinnable_val_ != nullptr)) {
              if (LIKELY(value_pinner != nullptr)) {
                // If the backing resources for the value are provided, pin them
                pinnable_val_->PinSlice(value, value_pinner);
              } else {
                TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
                                         this);
                // Otherwise copy the value
                pinnable_val_->PinSelf(value);
              }
            }
          } else {
            // It means this function is called as part of DB GetMergeOperands
            // API and the current value should be part of
            // merge_context_->operand_list
            if (is_blob_index_ != nullptr && *is_blob_index_) {
              // Operands must be real values, so resolve the blob first.
              // On failure GetBlobValue() has already set state_.
              PinnableSlice pin_val;
              if (GetBlobValue(value, &pin_val) == false) {
                return false;
              }
              Slice blob_value(pin_val);
              // pin_val is local, so the operand must not be marked pinned.
              push_operand(blob_value, nullptr);
            } else {
              push_operand(value, value_pinner);
            }
          }
        } else if (kMerge == state_) {
          // The value is the base for the operands collected so far.
          assert(merge_operator_ != nullptr);
          if (is_blob_index_ != nullptr && *is_blob_index_) {
            // Resolve the blob before merging. On failure GetBlobValue()
            // has already set state_.
            PinnableSlice pin_val;
            if (GetBlobValue(value, &pin_val) == false) {
              return false;
            }
            Slice blob_value(pin_val);
            state_ = kFound;
            if (do_merge_) {
              Merge(&blob_value);
            } else {
              // It means this function is called as part of DB GetMergeOperands
              // API and the current value should be part of
              // merge_context_->operand_list
              push_operand(blob_value, nullptr);
            }
          } else {
            state_ = kFound;
            if (do_merge_) {
              Merge(&value);
            } else {
              // It means this function is called as part of DB GetMergeOperands
              // API and the current value should be part of
              // merge_context_->operand_list
              push_operand(value, value_pinner);
            }
          }
        }
        // Report the matching key's timestamp when the comparator uses
        // timestamps and the caller asked for it. Merge() may have moved
        // state_ to kCorrupt, in which case nothing is reported.
        if (state_ == kFound) {
          size_t ts_sz = ucmp_->timestamp_size();
          if (ts_sz > 0 && timestamp_ != nullptr) {
            Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
            timestamp_->assign(ts.data(), ts.size());
          }
        }
        return false;

      case kTypeDeletion:
      case kTypeDeletionWithTimestamp:
      case kTypeSingleDeletion:
      case kTypeRangeDeletion:
        // TODO(noetzli): Verify correctness once merge of single-deletes
        // is supported
        assert(state_ == kNotFound || state_ == kMerge);
        if (kNotFound == state_) {
          state_ = kDeleted;
        } else if (kMerge == state_) {
          // Finish the pending merge with no base value.
          state_ = kFound;
          Merge(nullptr);
          // If do_merge_ = false then the current value shouldn't be part of
          // merge_context_->operand_list
        }
        return false;

      case kTypeMerge:
        assert(state_ == kNotFound || state_ == kMerge);
        state_ = kMerge;
        // value_pinner is not set from plain_table_reader.cc for example.
        push_operand(value, value_pinner);
        // The merge operator may ask to merge now with the operands seen so
        // far instead of continuing to search for a base value.
        if (do_merge_ && merge_operator_ != nullptr &&
            merge_operator_->ShouldMerge(
                merge_context_->GetOperandsDirectionBackward())) {
          state_ = kFound;
          Merge(nullptr);
          return false;
        }
        return true;

      default:
        assert(false);
        break;
    }
  }

  // Reached when the user key does not match, or (in release builds) for an
  // unexpected value type.
  // state_ could be Corrupt, merge or notfound
  return false;
}
368 
Merge(const Slice * value)369 void GetContext::Merge(const Slice* value) {
370   if (LIKELY(pinnable_val_ != nullptr)) {
371     if (do_merge_) {
372       Status merge_status = MergeHelper::TimedFullMerge(
373           merge_operator_, user_key_, value, merge_context_->GetOperands(),
374           pinnable_val_->GetSelf(), logger_, statistics_, clock_);
375       pinnable_val_->PinSelf();
376       if (!merge_status.ok()) {
377         state_ = kCorrupt;
378       }
379     }
380   }
381 }
382 
GetBlobValue(const Slice & blob_index,PinnableSlice * blob_value)383 bool GetContext::GetBlobValue(const Slice& blob_index,
384                               PinnableSlice* blob_value) {
385   Status status = blob_fetcher_->FetchBlob(user_key_, blob_index, blob_value);
386   if (!status.ok()) {
387     if (status.IsIncomplete()) {
388       MarkKeyMayExist();
389       return false;
390     }
391     state_ = kCorrupt;
392     return false;
393   }
394   *is_blob_index_ = false;
395   return true;
396 }
397 
push_operand(const Slice & value,Cleanable * value_pinner)398 void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
399   if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
400       value_pinner != nullptr) {
401     value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
402     merge_context_->PushOperand(value, true /*value_pinned*/);
403   } else {
404     merge_context_->PushOperand(value, false);
405   }
406 }
407 
replayGetContextLog(const Slice & replay_log,const Slice & user_key,GetContext * get_context,Cleanable * value_pinner)408 void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
409                          GetContext* get_context, Cleanable* value_pinner) {
410 #ifndef ROCKSDB_LITE
411   Slice s = replay_log;
412   while (s.size()) {
413     auto type = static_cast<ValueType>(*s.data());
414     s.remove_prefix(1);
415     Slice value;
416     bool ret = GetLengthPrefixedSlice(&s, &value);
417     assert(ret);
418     (void)ret;
419 
420     bool dont_care __attribute__((__unused__));
421     // Since SequenceNumber is not stored and unknown, we will use
422     // kMaxSequenceNumber.
423     get_context->SaveValue(
424         ParsedInternalKey(user_key, kMaxSequenceNumber, type), value,
425         &dont_care, value_pinner);
426   }
427 #else   // ROCKSDB_LITE
428   (void)replay_log;
429   (void)user_key;
430   (void)get_context;
431   (void)value_pinner;
432   assert(false);
433 #endif  // ROCKSDB_LITE
434 }
435 
436 }  // namespace ROCKSDB_NAMESPACE
437