1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "table/get_context.h"
7
8 #include "db/blob/blob_fetcher.h"
9 #include "db/merge_helper.h"
10 #include "db/pinned_iterators_manager.h"
11 #include "db/read_callback.h"
12 #include "monitoring/file_read_sample.h"
13 #include "monitoring/perf_context_imp.h"
14 #include "monitoring/statistics.h"
15 #include "rocksdb/merge_operator.h"
16 #include "rocksdb/statistics.h"
17 #include "rocksdb/system_clock.h"
18
19 namespace ROCKSDB_NAMESPACE {
20
21 namespace {
22
appendToReplayLog(std::string * replay_log,ValueType type,Slice value)23 void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
24 #ifndef ROCKSDB_LITE
25 if (replay_log) {
26 if (replay_log->empty()) {
27 // Optimization: in the common case of only one operation in the
28 // log, we allocate the exact amount of space needed.
29 replay_log->reserve(1 + VarintLength(value.size()) + value.size());
30 }
31 replay_log->push_back(type);
32 PutLengthPrefixedSlice(replay_log, value);
33 }
34 #else
35 (void)replay_log;
36 (void)type;
37 (void)value;
38 #endif // ROCKSDB_LITE
39 }
40
41 } // namespace
42
GetContext(const Comparator * ucmp,const MergeOperator * merge_operator,Logger * logger,Statistics * statistics,GetState init_state,const Slice & user_key,PinnableSlice * pinnable_val,std::string * timestamp,bool * value_found,MergeContext * merge_context,bool do_merge,SequenceNumber * _max_covering_tombstone_seq,SystemClock * clock,SequenceNumber * seq,PinnedIteratorsManager * _pinned_iters_mgr,ReadCallback * callback,bool * is_blob_index,uint64_t tracing_get_id,BlobFetcher * blob_fetcher)43 GetContext::GetContext(const Comparator* ucmp,
44 const MergeOperator* merge_operator, Logger* logger,
45 Statistics* statistics, GetState init_state,
46 const Slice& user_key, PinnableSlice* pinnable_val,
47 std::string* timestamp, bool* value_found,
48 MergeContext* merge_context, bool do_merge,
49 SequenceNumber* _max_covering_tombstone_seq,
50 SystemClock* clock, SequenceNumber* seq,
51 PinnedIteratorsManager* _pinned_iters_mgr,
52 ReadCallback* callback, bool* is_blob_index,
53 uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
54 : ucmp_(ucmp),
55 merge_operator_(merge_operator),
56 logger_(logger),
57 statistics_(statistics),
58 state_(init_state),
59 user_key_(user_key),
60 pinnable_val_(pinnable_val),
61 timestamp_(timestamp),
62 value_found_(value_found),
63 merge_context_(merge_context),
64 max_covering_tombstone_seq_(_max_covering_tombstone_seq),
65 clock_(clock),
66 seq_(seq),
67 replay_log_(nullptr),
68 pinned_iters_mgr_(_pinned_iters_mgr),
69 callback_(callback),
70 do_merge_(do_merge),
71 is_blob_index_(is_blob_index),
72 tracing_get_id_(tracing_get_id),
73 blob_fetcher_(blob_fetcher) {
74 if (seq_) {
75 *seq_ = kMaxSequenceNumber;
76 }
77 sample_ = should_sample_file_read();
78 }
79
GetContext(const Comparator * ucmp,const MergeOperator * merge_operator,Logger * logger,Statistics * statistics,GetState init_state,const Slice & user_key,PinnableSlice * pinnable_val,bool * value_found,MergeContext * merge_context,bool do_merge,SequenceNumber * _max_covering_tombstone_seq,SystemClock * clock,SequenceNumber * seq,PinnedIteratorsManager * _pinned_iters_mgr,ReadCallback * callback,bool * is_blob_index,uint64_t tracing_get_id,BlobFetcher * blob_fetcher)80 GetContext::GetContext(
81 const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
82 Statistics* statistics, GetState init_state, const Slice& user_key,
83 PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context,
84 bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
85 SystemClock* clock, SequenceNumber* seq,
86 PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
87 bool* is_blob_index, uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
88 : GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
89 pinnable_val, nullptr, value_found, merge_context, do_merge,
90 _max_covering_tombstone_seq, clock, seq, _pinned_iters_mgr,
91 callback, is_blob_index, tracing_get_id, blob_fetcher) {}
92
93 // Called from TableCache::Get and Table::Get when file/block in which
94 // key may exist are not there in TableCache/BlockCache respectively. In this
95 // case we can't guarantee that key does not exist and are not permitted to do
96 // IO to be certain.Set the status=kFound and value_found=false to let the
97 // caller know that key may exist but is not there in memory
MarkKeyMayExist()98 void GetContext::MarkKeyMayExist() {
99 state_ = kFound;
100 if (value_found_ != nullptr) {
101 *value_found_ = false;
102 }
103 }
104
SaveValue(const Slice & value,SequenceNumber)105 void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) {
106 assert(state_ == kNotFound);
107 appendToReplayLog(replay_log_, kTypeValue, value);
108
109 state_ = kFound;
110 if (LIKELY(pinnable_val_ != nullptr)) {
111 pinnable_val_->PinSelf(value);
112 }
113 }
114
ReportCounters()115 void GetContext::ReportCounters() {
116 if (get_context_stats_.num_cache_hit > 0) {
117 RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit);
118 }
119 if (get_context_stats_.num_cache_index_hit > 0) {
120 RecordTick(statistics_, BLOCK_CACHE_INDEX_HIT,
121 get_context_stats_.num_cache_index_hit);
122 }
123 if (get_context_stats_.num_cache_data_hit > 0) {
124 RecordTick(statistics_, BLOCK_CACHE_DATA_HIT,
125 get_context_stats_.num_cache_data_hit);
126 }
127 if (get_context_stats_.num_cache_filter_hit > 0) {
128 RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT,
129 get_context_stats_.num_cache_filter_hit);
130 }
131 if (get_context_stats_.num_cache_compression_dict_hit > 0) {
132 RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT,
133 get_context_stats_.num_cache_compression_dict_hit);
134 }
135 if (get_context_stats_.num_cache_index_miss > 0) {
136 RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS,
137 get_context_stats_.num_cache_index_miss);
138 }
139 if (get_context_stats_.num_cache_filter_miss > 0) {
140 RecordTick(statistics_, BLOCK_CACHE_FILTER_MISS,
141 get_context_stats_.num_cache_filter_miss);
142 }
143 if (get_context_stats_.num_cache_data_miss > 0) {
144 RecordTick(statistics_, BLOCK_CACHE_DATA_MISS,
145 get_context_stats_.num_cache_data_miss);
146 }
147 if (get_context_stats_.num_cache_compression_dict_miss > 0) {
148 RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS,
149 get_context_stats_.num_cache_compression_dict_miss);
150 }
151 if (get_context_stats_.num_cache_bytes_read > 0) {
152 RecordTick(statistics_, BLOCK_CACHE_BYTES_READ,
153 get_context_stats_.num_cache_bytes_read);
154 }
155 if (get_context_stats_.num_cache_miss > 0) {
156 RecordTick(statistics_, BLOCK_CACHE_MISS,
157 get_context_stats_.num_cache_miss);
158 }
159 if (get_context_stats_.num_cache_add > 0) {
160 RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add);
161 }
162 if (get_context_stats_.num_cache_add_redundant > 0) {
163 RecordTick(statistics_, BLOCK_CACHE_ADD_REDUNDANT,
164 get_context_stats_.num_cache_add_redundant);
165 }
166 if (get_context_stats_.num_cache_bytes_write > 0) {
167 RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE,
168 get_context_stats_.num_cache_bytes_write);
169 }
170 if (get_context_stats_.num_cache_index_add > 0) {
171 RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD,
172 get_context_stats_.num_cache_index_add);
173 }
174 if (get_context_stats_.num_cache_index_add_redundant > 0) {
175 RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD_REDUNDANT,
176 get_context_stats_.num_cache_index_add_redundant);
177 }
178 if (get_context_stats_.num_cache_index_bytes_insert > 0) {
179 RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT,
180 get_context_stats_.num_cache_index_bytes_insert);
181 }
182 if (get_context_stats_.num_cache_data_add > 0) {
183 RecordTick(statistics_, BLOCK_CACHE_DATA_ADD,
184 get_context_stats_.num_cache_data_add);
185 }
186 if (get_context_stats_.num_cache_data_add_redundant > 0) {
187 RecordTick(statistics_, BLOCK_CACHE_DATA_ADD_REDUNDANT,
188 get_context_stats_.num_cache_data_add_redundant);
189 }
190 if (get_context_stats_.num_cache_data_bytes_insert > 0) {
191 RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT,
192 get_context_stats_.num_cache_data_bytes_insert);
193 }
194 if (get_context_stats_.num_cache_filter_add > 0) {
195 RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD,
196 get_context_stats_.num_cache_filter_add);
197 }
198 if (get_context_stats_.num_cache_filter_add_redundant > 0) {
199 RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD_REDUNDANT,
200 get_context_stats_.num_cache_filter_add_redundant);
201 }
202 if (get_context_stats_.num_cache_filter_bytes_insert > 0) {
203 RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
204 get_context_stats_.num_cache_filter_bytes_insert);
205 }
206 if (get_context_stats_.num_cache_compression_dict_add > 0) {
207 RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
208 get_context_stats_.num_cache_compression_dict_add);
209 }
210 if (get_context_stats_.num_cache_compression_dict_add_redundant > 0) {
211 RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
212 get_context_stats_.num_cache_compression_dict_add_redundant);
213 }
214 if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
215 RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
216 get_context_stats_.num_cache_compression_dict_bytes_insert);
217 }
218 }
219
SaveValue(const ParsedInternalKey & parsed_key,const Slice & value,bool * matched,Cleanable * value_pinner)220 bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
221 const Slice& value, bool* matched,
222 Cleanable* value_pinner) {
223 assert(matched);
224 assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
225 merge_context_ != nullptr);
226 if (ucmp_->EqualWithoutTimestamp(parsed_key.user_key, user_key_)) {
227 *matched = true;
228 // If the value is not in the snapshot, skip it
229 if (!CheckCallback(parsed_key.sequence)) {
230 return true; // to continue to the next seq
231 }
232
233 appendToReplayLog(replay_log_, parsed_key.type, value);
234
235 if (seq_ != nullptr) {
236 // Set the sequence number if it is uninitialized
237 if (*seq_ == kMaxSequenceNumber) {
238 *seq_ = parsed_key.sequence;
239 }
240 }
241
242 auto type = parsed_key.type;
243 // Key matches. Process it
244 if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
245 max_covering_tombstone_seq_ != nullptr &&
246 *max_covering_tombstone_seq_ > parsed_key.sequence) {
247 type = kTypeRangeDeletion;
248 }
249 switch (type) {
250 case kTypeValue:
251 case kTypeBlobIndex:
252 assert(state_ == kNotFound || state_ == kMerge);
253 if (type == kTypeBlobIndex && is_blob_index_ == nullptr) {
254 // Blob value not supported. Stop.
255 state_ = kUnexpectedBlobIndex;
256 return false;
257 }
258 if (is_blob_index_ != nullptr) {
259 *is_blob_index_ = (type == kTypeBlobIndex);
260 }
261 if (kNotFound == state_) {
262 state_ = kFound;
263 if (do_merge_) {
264 if (LIKELY(pinnable_val_ != nullptr)) {
265 if (LIKELY(value_pinner != nullptr)) {
266 // If the backing resources for the value are provided, pin them
267 pinnable_val_->PinSlice(value, value_pinner);
268 } else {
269 TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
270 this);
271 // Otherwise copy the value
272 pinnable_val_->PinSelf(value);
273 }
274 }
275 } else {
276 // It means this function is called as part of DB GetMergeOperands
277 // API and the current value should be part of
278 // merge_context_->operand_list
279 if (is_blob_index_ != nullptr && *is_blob_index_) {
280 PinnableSlice pin_val;
281 if (GetBlobValue(value, &pin_val) == false) {
282 return false;
283 }
284 Slice blob_value(pin_val);
285 push_operand(blob_value, nullptr);
286 } else {
287 push_operand(value, value_pinner);
288 }
289 }
290 } else if (kMerge == state_) {
291 assert(merge_operator_ != nullptr);
292 if (is_blob_index_ != nullptr && *is_blob_index_) {
293 PinnableSlice pin_val;
294 if (GetBlobValue(value, &pin_val) == false) {
295 return false;
296 }
297 Slice blob_value(pin_val);
298 state_ = kFound;
299 if (do_merge_) {
300 Merge(&blob_value);
301 } else {
302 // It means this function is called as part of DB GetMergeOperands
303 // API and the current value should be part of
304 // merge_context_->operand_list
305 push_operand(blob_value, nullptr);
306 }
307 } else {
308 state_ = kFound;
309 if (do_merge_) {
310 Merge(&value);
311 } else {
312 // It means this function is called as part of DB GetMergeOperands
313 // API and the current value should be part of
314 // merge_context_->operand_list
315 push_operand(value, value_pinner);
316 }
317 }
318 }
319 if (state_ == kFound) {
320 size_t ts_sz = ucmp_->timestamp_size();
321 if (ts_sz > 0 && timestamp_ != nullptr) {
322 Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
323 timestamp_->assign(ts.data(), ts.size());
324 }
325 }
326 return false;
327
328 case kTypeDeletion:
329 case kTypeDeletionWithTimestamp:
330 case kTypeSingleDeletion:
331 case kTypeRangeDeletion:
332 // TODO(noetzli): Verify correctness once merge of single-deletes
333 // is supported
334 assert(state_ == kNotFound || state_ == kMerge);
335 if (kNotFound == state_) {
336 state_ = kDeleted;
337 } else if (kMerge == state_) {
338 state_ = kFound;
339 Merge(nullptr);
340 // If do_merge_ = false then the current value shouldn't be part of
341 // merge_context_->operand_list
342 }
343 return false;
344
345 case kTypeMerge:
346 assert(state_ == kNotFound || state_ == kMerge);
347 state_ = kMerge;
348 // value_pinner is not set from plain_table_reader.cc for example.
349 push_operand(value, value_pinner);
350 if (do_merge_ && merge_operator_ != nullptr &&
351 merge_operator_->ShouldMerge(
352 merge_context_->GetOperandsDirectionBackward())) {
353 state_ = kFound;
354 Merge(nullptr);
355 return false;
356 }
357 return true;
358
359 default:
360 assert(false);
361 break;
362 }
363 }
364
365 // state_ could be Corrupt, merge or notfound
366 return false;
367 }
368
Merge(const Slice * value)369 void GetContext::Merge(const Slice* value) {
370 if (LIKELY(pinnable_val_ != nullptr)) {
371 if (do_merge_) {
372 Status merge_status = MergeHelper::TimedFullMerge(
373 merge_operator_, user_key_, value, merge_context_->GetOperands(),
374 pinnable_val_->GetSelf(), logger_, statistics_, clock_);
375 pinnable_val_->PinSelf();
376 if (!merge_status.ok()) {
377 state_ = kCorrupt;
378 }
379 }
380 }
381 }
382
GetBlobValue(const Slice & blob_index,PinnableSlice * blob_value)383 bool GetContext::GetBlobValue(const Slice& blob_index,
384 PinnableSlice* blob_value) {
385 Status status = blob_fetcher_->FetchBlob(user_key_, blob_index, blob_value);
386 if (!status.ok()) {
387 if (status.IsIncomplete()) {
388 MarkKeyMayExist();
389 return false;
390 }
391 state_ = kCorrupt;
392 return false;
393 }
394 *is_blob_index_ = false;
395 return true;
396 }
397
push_operand(const Slice & value,Cleanable * value_pinner)398 void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
399 if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
400 value_pinner != nullptr) {
401 value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
402 merge_context_->PushOperand(value, true /*value_pinned*/);
403 } else {
404 merge_context_->PushOperand(value, false);
405 }
406 }
407
replayGetContextLog(const Slice & replay_log,const Slice & user_key,GetContext * get_context,Cleanable * value_pinner)408 void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
409 GetContext* get_context, Cleanable* value_pinner) {
410 #ifndef ROCKSDB_LITE
411 Slice s = replay_log;
412 while (s.size()) {
413 auto type = static_cast<ValueType>(*s.data());
414 s.remove_prefix(1);
415 Slice value;
416 bool ret = GetLengthPrefixedSlice(&s, &value);
417 assert(ret);
418 (void)ret;
419
420 bool dont_care __attribute__((__unused__));
421 // Since SequenceNumber is not stored and unknown, we will use
422 // kMaxSequenceNumber.
423 get_context->SaveValue(
424 ParsedInternalKey(user_key, kMaxSequenceNumber, type), value,
425 &dont_care, value_pinner);
426 }
427 #else // ROCKSDB_LITE
428 (void)replay_log;
429 (void)user_key;
430 (void)get_context;
431 (void)value_pinner;
432 assert(false);
433 #endif // ROCKSDB_LITE
434 }
435
436 } // namespace ROCKSDB_NAMESPACE
437