1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "table/get_context.h"
7 #include "db/merge_helper.h"
8 #include "db/pinned_iterators_manager.h"
9 #include "db/read_callback.h"
10 #include "monitoring/file_read_sample.h"
11 #include "monitoring/perf_context_imp.h"
12 #include "monitoring/statistics.h"
13 #include "rocksdb/env.h"
14 #include "rocksdb/merge_operator.h"
15 #include "rocksdb/statistics.h"
16 
17 namespace ROCKSDB_NAMESPACE {
18 
19 namespace {
20 
appendToReplayLog(std::string * replay_log,ValueType type,Slice value)21 void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
22 #ifndef ROCKSDB_LITE
23   if (replay_log) {
24     if (replay_log->empty()) {
25       // Optimization: in the common case of only one operation in the
26       // log, we allocate the exact amount of space needed.
27       replay_log->reserve(1 + VarintLength(value.size()) + value.size());
28     }
29     replay_log->push_back(type);
30     PutLengthPrefixedSlice(replay_log, value);
31   }
32 #else
33   (void)replay_log;
34   (void)type;
35   (void)value;
36 #endif  // ROCKSDB_LITE
37 }
38 
39 }  // namespace
40 
GetContext(const Comparator * ucmp,const MergeOperator * merge_operator,Logger * logger,Statistics * statistics,GetState init_state,const Slice & user_key,PinnableSlice * pinnable_val,bool * value_found,MergeContext * merge_context,bool do_merge,SequenceNumber * _max_covering_tombstone_seq,Env * env,SequenceNumber * seq,PinnedIteratorsManager * _pinned_iters_mgr,ReadCallback * callback,bool * is_blob_index,uint64_t tracing_get_id)41 GetContext::GetContext(
42     const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
43     Statistics* statistics, GetState init_state, const Slice& user_key,
44     PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context,
45     bool do_merge, SequenceNumber* _max_covering_tombstone_seq, Env* env,
46     SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr,
47     ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id)
48     : ucmp_(ucmp),
49       merge_operator_(merge_operator),
50       logger_(logger),
51       statistics_(statistics),
52       state_(init_state),
53       user_key_(user_key),
54       pinnable_val_(pinnable_val),
55       value_found_(value_found),
56       merge_context_(merge_context),
57       max_covering_tombstone_seq_(_max_covering_tombstone_seq),
58       env_(env),
59       seq_(seq),
60       replay_log_(nullptr),
61       pinned_iters_mgr_(_pinned_iters_mgr),
62       callback_(callback),
63       do_merge_(do_merge),
64       is_blob_index_(is_blob_index),
65       tracing_get_id_(tracing_get_id) {
66   if (seq_) {
67     *seq_ = kMaxSequenceNumber;
68   }
69   sample_ = should_sample_file_read();
70 }
71 
72 // Called from TableCache::Get and Table::Get when file/block in which
73 // key may exist are not there in TableCache/BlockCache respectively. In this
74 // case we can't guarantee that key does not exist and are not permitted to do
75 // IO to be certain.Set the status=kFound and value_found=false to let the
76 // caller know that key may exist but is not there in memory
MarkKeyMayExist()77 void GetContext::MarkKeyMayExist() {
78   state_ = kFound;
79   if (value_found_ != nullptr) {
80     *value_found_ = false;
81   }
82 }
83 
SaveValue(const Slice & value,SequenceNumber)84 void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) {
85   assert(state_ == kNotFound);
86   appendToReplayLog(replay_log_, kTypeValue, value);
87 
88   state_ = kFound;
89   if (LIKELY(pinnable_val_ != nullptr)) {
90     pinnable_val_->PinSelf(value);
91   }
92 }
93 
ReportCounters()94 void GetContext::ReportCounters() {
95   if (get_context_stats_.num_cache_hit > 0) {
96     RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit);
97   }
98   if (get_context_stats_.num_cache_index_hit > 0) {
99     RecordTick(statistics_, BLOCK_CACHE_INDEX_HIT,
100                get_context_stats_.num_cache_index_hit);
101   }
102   if (get_context_stats_.num_cache_data_hit > 0) {
103     RecordTick(statistics_, BLOCK_CACHE_DATA_HIT,
104                get_context_stats_.num_cache_data_hit);
105   }
106   if (get_context_stats_.num_cache_filter_hit > 0) {
107     RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT,
108                get_context_stats_.num_cache_filter_hit);
109   }
110   if (get_context_stats_.num_cache_compression_dict_hit > 0) {
111     RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT,
112                get_context_stats_.num_cache_compression_dict_hit);
113   }
114   if (get_context_stats_.num_cache_index_miss > 0) {
115     RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS,
116                get_context_stats_.num_cache_index_miss);
117   }
118   if (get_context_stats_.num_cache_filter_miss > 0) {
119     RecordTick(statistics_, BLOCK_CACHE_FILTER_MISS,
120                get_context_stats_.num_cache_filter_miss);
121   }
122   if (get_context_stats_.num_cache_data_miss > 0) {
123     RecordTick(statistics_, BLOCK_CACHE_DATA_MISS,
124                get_context_stats_.num_cache_data_miss);
125   }
126   if (get_context_stats_.num_cache_compression_dict_miss > 0) {
127     RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS,
128                get_context_stats_.num_cache_compression_dict_miss);
129   }
130   if (get_context_stats_.num_cache_bytes_read > 0) {
131     RecordTick(statistics_, BLOCK_CACHE_BYTES_READ,
132                get_context_stats_.num_cache_bytes_read);
133   }
134   if (get_context_stats_.num_cache_miss > 0) {
135     RecordTick(statistics_, BLOCK_CACHE_MISS,
136                get_context_stats_.num_cache_miss);
137   }
138   if (get_context_stats_.num_cache_add > 0) {
139     RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add);
140   }
141   if (get_context_stats_.num_cache_bytes_write > 0) {
142     RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE,
143                get_context_stats_.num_cache_bytes_write);
144   }
145   if (get_context_stats_.num_cache_index_add > 0) {
146     RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD,
147                get_context_stats_.num_cache_index_add);
148   }
149   if (get_context_stats_.num_cache_index_bytes_insert > 0) {
150     RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT,
151                get_context_stats_.num_cache_index_bytes_insert);
152   }
153   if (get_context_stats_.num_cache_data_add > 0) {
154     RecordTick(statistics_, BLOCK_CACHE_DATA_ADD,
155                get_context_stats_.num_cache_data_add);
156   }
157   if (get_context_stats_.num_cache_data_bytes_insert > 0) {
158     RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT,
159                get_context_stats_.num_cache_data_bytes_insert);
160   }
161   if (get_context_stats_.num_cache_filter_add > 0) {
162     RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD,
163                get_context_stats_.num_cache_filter_add);
164   }
165   if (get_context_stats_.num_cache_filter_bytes_insert > 0) {
166     RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
167                get_context_stats_.num_cache_filter_bytes_insert);
168   }
169   if (get_context_stats_.num_cache_compression_dict_add > 0) {
170     RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
171                get_context_stats_.num_cache_compression_dict_add);
172   }
173   if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
174     RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
175                get_context_stats_.num_cache_compression_dict_bytes_insert);
176   }
177 }
178 
SaveValue(const ParsedInternalKey & parsed_key,const Slice & value,bool * matched,Cleanable * value_pinner)179 bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
180                            const Slice& value, bool* matched,
181                            Cleanable* value_pinner) {
182   assert(matched);
183   assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
184          merge_context_ != nullptr);
185   if (ucmp_->CompareWithoutTimestamp(parsed_key.user_key, user_key_) == 0) {
186     *matched = true;
187     // If the value is not in the snapshot, skip it
188     if (!CheckCallback(parsed_key.sequence)) {
189       return true;  // to continue to the next seq
190     }
191 
192     appendToReplayLog(replay_log_, parsed_key.type, value);
193 
194     if (seq_ != nullptr) {
195       // Set the sequence number if it is uninitialized
196       if (*seq_ == kMaxSequenceNumber) {
197         *seq_ = parsed_key.sequence;
198       }
199     }
200 
201     auto type = parsed_key.type;
202     // Key matches. Process it
203     if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
204         max_covering_tombstone_seq_ != nullptr &&
205         *max_covering_tombstone_seq_ > parsed_key.sequence) {
206       type = kTypeRangeDeletion;
207     }
208     switch (type) {
209       case kTypeValue:
210       case kTypeBlobIndex:
211         assert(state_ == kNotFound || state_ == kMerge);
212         if (type == kTypeBlobIndex && is_blob_index_ == nullptr) {
213           // Blob value not supported. Stop.
214           state_ = kBlobIndex;
215           return false;
216         }
217         if (kNotFound == state_) {
218           state_ = kFound;
219           if (do_merge_) {
220             if (LIKELY(pinnable_val_ != nullptr)) {
221               if (LIKELY(value_pinner != nullptr)) {
222                 // If the backing resources for the value are provided, pin them
223                 pinnable_val_->PinSlice(value, value_pinner);
224               } else {
225                 TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
226                                          this);
227 
228                 // Otherwise copy the value
229                 pinnable_val_->PinSelf(value);
230               }
231             }
232           } else {
233             // It means this function is called as part of DB GetMergeOperands
234             // API and the current value should be part of
235             // merge_context_->operand_list
236             push_operand(value, value_pinner);
237           }
238         } else if (kMerge == state_) {
239           assert(merge_operator_ != nullptr);
240           state_ = kFound;
241           if (do_merge_) {
242             if (LIKELY(pinnable_val_ != nullptr)) {
243               Status merge_status = MergeHelper::TimedFullMerge(
244                   merge_operator_, user_key_, &value,
245                   merge_context_->GetOperands(), pinnable_val_->GetSelf(),
246                   logger_, statistics_, env_);
247               pinnable_val_->PinSelf();
248               if (!merge_status.ok()) {
249                 state_ = kCorrupt;
250               }
251             }
252           } else {
253             // It means this function is called as part of DB GetMergeOperands
254             // API and the current value should be part of
255             // merge_context_->operand_list
256             push_operand(value, value_pinner);
257           }
258         }
259         if (is_blob_index_ != nullptr) {
260           *is_blob_index_ = (type == kTypeBlobIndex);
261         }
262         return false;
263 
264       case kTypeDeletion:
265       case kTypeSingleDeletion:
266       case kTypeRangeDeletion:
267         // TODO(noetzli): Verify correctness once merge of single-deletes
268         // is supported
269         assert(state_ == kNotFound || state_ == kMerge);
270         if (kNotFound == state_) {
271           state_ = kDeleted;
272         } else if (kMerge == state_) {
273           state_ = kFound;
274           if (LIKELY(pinnable_val_ != nullptr)) {
275             if (do_merge_) {
276               Status merge_status = MergeHelper::TimedFullMerge(
277                   merge_operator_, user_key_, nullptr,
278                   merge_context_->GetOperands(), pinnable_val_->GetSelf(),
279                   logger_, statistics_, env_);
280               pinnable_val_->PinSelf();
281               if (!merge_status.ok()) {
282                 state_ = kCorrupt;
283               }
284             }
285             // If do_merge_ = false then the current value shouldn't be part of
286             // merge_context_->operand_list
287           }
288         }
289         return false;
290 
291       case kTypeMerge:
292         assert(state_ == kNotFound || state_ == kMerge);
293         state_ = kMerge;
294         // value_pinner is not set from plain_table_reader.cc for example.
295         push_operand(value, value_pinner);
296         if (do_merge_ && merge_operator_ != nullptr &&
297             merge_operator_->ShouldMerge(
298                 merge_context_->GetOperandsDirectionBackward())) {
299           state_ = kFound;
300           if (LIKELY(pinnable_val_ != nullptr)) {
301             // do_merge_ = true this is the case where this function is called
302             // as part of DB Get API hence merge operators should be merged.
303             if (do_merge_) {
304               Status merge_status = MergeHelper::TimedFullMerge(
305                   merge_operator_, user_key_, nullptr,
306                   merge_context_->GetOperands(), pinnable_val_->GetSelf(),
307                   logger_, statistics_, env_);
308               pinnable_val_->PinSelf();
309               if (!merge_status.ok()) {
310                 state_ = kCorrupt;
311               }
312             }
313           }
314           return false;
315         }
316         return true;
317 
318       default:
319         assert(false);
320         break;
321     }
322   }
323 
324   // state_ could be Corrupt, merge or notfound
325   return false;
326 }
327 
push_operand(const Slice & value,Cleanable * value_pinner)328 void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
329   if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
330       value_pinner != nullptr) {
331     value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
332     merge_context_->PushOperand(value, true /*value_pinned*/);
333   } else {
334     merge_context_->PushOperand(value, false);
335   }
336 }
337 
replayGetContextLog(const Slice & replay_log,const Slice & user_key,GetContext * get_context,Cleanable * value_pinner)338 void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
339                          GetContext* get_context, Cleanable* value_pinner) {
340 #ifndef ROCKSDB_LITE
341   Slice s = replay_log;
342   while (s.size()) {
343     auto type = static_cast<ValueType>(*s.data());
344     s.remove_prefix(1);
345     Slice value;
346     bool ret = GetLengthPrefixedSlice(&s, &value);
347     assert(ret);
348     (void)ret;
349 
350     bool dont_care __attribute__((__unused__));
351     // Since SequenceNumber is not stored and unknown, we will use
352     // kMaxSequenceNumber.
353     get_context->SaveValue(
354         ParsedInternalKey(user_key, kMaxSequenceNumber, type), value,
355         &dont_care, value_pinner);
356   }
357 #else   // ROCKSDB_LITE
358   (void)replay_log;
359   (void)user_key;
360   (void)get_context;
361   (void)value_pinner;
362   assert(false);
363 #endif  // ROCKSDB_LITE
364 }
365 
366 }  // namespace ROCKSDB_NAMESPACE
367