1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "trace_replay/block_cache_tracer.h"
7 
8 #include <cinttypes>
9 #include <cstdio>
10 #include <cstdlib>
11 
12 #include "db/db_impl/db_impl.h"
13 #include "db/dbformat.h"
14 #include "rocksdb/slice.h"
15 #include "util/coding.h"
16 #include "util/hash.h"
17 #include "util/string_util.h"
18 
19 namespace ROCKSDB_NAMESPACE {
20 
21 namespace {
22 const unsigned int kCharSize = 1;
23 
ShouldTrace(const Slice & block_key,const TraceOptions & trace_options)24 bool ShouldTrace(const Slice& block_key, const TraceOptions& trace_options) {
25   if (trace_options.sampling_frequency == 0 ||
26       trace_options.sampling_frequency == 1) {
27     return true;
28   }
29   // We use spatial downsampling so that we have a complete access history for a
30   // block.
31   return 0 == fastrange64(GetSliceNPHash64(block_key),
32                           trace_options.sampling_frequency);
33 }
34 }  // namespace
35 
// Time unit conversion factors.
const uint64_t kMicrosInSecond = 1000000;
const uint64_t kSecondInMinute = 60;
const uint64_t kSecondInHour = 60 * 60;
39 const std::string BlockCacheTraceHelper::kUnknownColumnFamilyName =
40     "UnknownColumnFamily";
41 const uint64_t BlockCacheTraceHelper::kReservedGetId = 0;
42 
IsGetOrMultiGetOnDataBlock(TraceType block_type,TableReaderCaller caller)43 bool BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(
44     TraceType block_type, TableReaderCaller caller) {
45   return (block_type == TraceType::kBlockTraceDataBlock) &&
46          IsGetOrMultiGet(caller);
47 }
48 
IsGetOrMultiGet(TableReaderCaller caller)49 bool BlockCacheTraceHelper::IsGetOrMultiGet(TableReaderCaller caller) {
50   return caller == TableReaderCaller::kUserGet ||
51          caller == TableReaderCaller::kUserMultiGet;
52 }
53 
IsUserAccess(TableReaderCaller caller)54 bool BlockCacheTraceHelper::IsUserAccess(TableReaderCaller caller) {
55   return caller == TableReaderCaller::kUserGet ||
56          caller == TableReaderCaller::kUserMultiGet ||
57          caller == TableReaderCaller::kUserIterator ||
58          caller == TableReaderCaller::kUserApproximateSize ||
59          caller == TableReaderCaller::kUserVerifyChecksum;
60 }
61 
ComputeRowKey(const BlockCacheTraceRecord & access)62 std::string BlockCacheTraceHelper::ComputeRowKey(
63     const BlockCacheTraceRecord& access) {
64   if (!IsGetOrMultiGet(access.caller)) {
65     return "";
66   }
67   Slice key = ExtractUserKey(access.referenced_key);
68   return std::to_string(access.sst_fd_number) + "_" + key.ToString();
69 }
70 
GetTableId(const BlockCacheTraceRecord & access)71 uint64_t BlockCacheTraceHelper::GetTableId(
72     const BlockCacheTraceRecord& access) {
73   if (!IsGetOrMultiGet(access.caller) || access.referenced_key.size() < 4) {
74     return 0;
75   }
76   return static_cast<uint64_t>(DecodeFixed32(access.referenced_key.data())) + 1;
77 }
78 
GetSequenceNumber(const BlockCacheTraceRecord & access)79 uint64_t BlockCacheTraceHelper::GetSequenceNumber(
80     const BlockCacheTraceRecord& access) {
81   if (!IsGetOrMultiGet(access.caller)) {
82     return 0;
83   }
84   return access.get_from_user_specified_snapshot == Boolean::kFalse
85              ? 0
86              : 1 + GetInternalKeySeqno(access.referenced_key);
87 }
88 
GetBlockOffsetInFile(const BlockCacheTraceRecord & access)89 uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile(
90     const BlockCacheTraceRecord& access) {
91   Slice input(access.block_key);
92   uint64_t offset = 0;
93   while (true) {
94     uint64_t tmp = 0;
95     if (GetVarint64(&input, &tmp)) {
96       offset = tmp;
97     } else {
98       break;
99     }
100   }
101   return offset;
102 }
103 
BlockCacheTraceWriter(Env * env,const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)104 BlockCacheTraceWriter::BlockCacheTraceWriter(
105     Env* env, const TraceOptions& trace_options,
106     std::unique_ptr<TraceWriter>&& trace_writer)
107     : env_(env),
108       trace_options_(trace_options),
109       trace_writer_(std::move(trace_writer)) {}
110 
WriteBlockAccess(const BlockCacheTraceRecord & record,const Slice & block_key,const Slice & cf_name,const Slice & referenced_key)111 Status BlockCacheTraceWriter::WriteBlockAccess(
112     const BlockCacheTraceRecord& record, const Slice& block_key,
113     const Slice& cf_name, const Slice& referenced_key) {
114   uint64_t trace_file_size = trace_writer_->GetFileSize();
115   if (trace_file_size > trace_options_.max_trace_file_size) {
116     return Status::OK();
117   }
118   Trace trace;
119   trace.ts = record.access_timestamp;
120   trace.type = record.block_type;
121   PutLengthPrefixedSlice(&trace.payload, block_key);
122   PutFixed64(&trace.payload, record.block_size);
123   PutFixed64(&trace.payload, record.cf_id);
124   PutLengthPrefixedSlice(&trace.payload, cf_name);
125   PutFixed32(&trace.payload, record.level);
126   PutFixed64(&trace.payload, record.sst_fd_number);
127   trace.payload.push_back(record.caller);
128   trace.payload.push_back(record.is_cache_hit);
129   trace.payload.push_back(record.no_insert);
130   if (BlockCacheTraceHelper::IsGetOrMultiGet(record.caller)) {
131     PutFixed64(&trace.payload, record.get_id);
132     trace.payload.push_back(record.get_from_user_specified_snapshot);
133     PutLengthPrefixedSlice(&trace.payload, referenced_key);
134   }
135   if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record.block_type,
136                                                         record.caller)) {
137     PutFixed64(&trace.payload, record.referenced_data_size);
138     PutFixed64(&trace.payload, record.num_keys_in_block);
139     trace.payload.push_back(record.referenced_key_exist_in_block);
140   }
141   std::string encoded_trace;
142   TracerHelper::EncodeTrace(trace, &encoded_trace);
143   return trace_writer_->Write(encoded_trace);
144 }
145 
WriteHeader()146 Status BlockCacheTraceWriter::WriteHeader() {
147   Trace trace;
148   trace.ts = env_->NowMicros();
149   trace.type = TraceType::kTraceBegin;
150   PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
151   PutFixed32(&trace.payload, kMajorVersion);
152   PutFixed32(&trace.payload, kMinorVersion);
153   std::string encoded_trace;
154   TracerHelper::EncodeTrace(trace, &encoded_trace);
155   return trace_writer_->Write(encoded_trace);
156 }
157 
BlockCacheTraceReader(std::unique_ptr<TraceReader> && reader)158 BlockCacheTraceReader::BlockCacheTraceReader(
159     std::unique_ptr<TraceReader>&& reader)
160     : trace_reader_(std::move(reader)) {}
161 
ReadHeader(BlockCacheTraceHeader * header)162 Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) {
163   assert(header != nullptr);
164   std::string encoded_trace;
165   Status s = trace_reader_->Read(&encoded_trace);
166   if (!s.ok()) {
167     return s;
168   }
169   Trace trace;
170   s = TracerHelper::DecodeTrace(encoded_trace, &trace);
171   if (!s.ok()) {
172     return s;
173   }
174   header->start_time = trace.ts;
175   Slice enc_slice = Slice(trace.payload);
176   Slice magnic_number;
177   if (!GetLengthPrefixedSlice(&enc_slice, &magnic_number)) {
178     return Status::Corruption(
179         "Corrupted header in the trace file: Failed to read the magic number.");
180   }
181   if (magnic_number.ToString() != kTraceMagic) {
182     return Status::Corruption(
183         "Corrupted header in the trace file: Magic number does not match.");
184   }
185   if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) {
186     return Status::Corruption(
187         "Corrupted header in the trace file: Failed to read rocksdb major "
188         "version number.");
189   }
190   if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) {
191     return Status::Corruption(
192         "Corrupted header in the trace file: Failed to read rocksdb minor "
193         "version number.");
194   }
195   // We should have retrieved all information in the header.
196   if (!enc_slice.empty()) {
197     return Status::Corruption(
198         "Corrupted header in the trace file: The length of header is too "
199         "long.");
200   }
201   return Status::OK();
202 }
203 
ReadAccess(BlockCacheTraceRecord * record)204 Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
205   assert(record);
206   std::string encoded_trace;
207   Status s = trace_reader_->Read(&encoded_trace);
208   if (!s.ok()) {
209     return s;
210   }
211   Trace trace;
212   s = TracerHelper::DecodeTrace(encoded_trace, &trace);
213   if (!s.ok()) {
214     return s;
215   }
216   record->access_timestamp = trace.ts;
217   record->block_type = trace.type;
218   Slice enc_slice = Slice(trace.payload);
219 
220   Slice block_key;
221   if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) {
222     return Status::Incomplete(
223         "Incomplete access record: Failed to read block key.");
224   }
225   record->block_key = block_key.ToString();
226   if (!GetFixed64(&enc_slice, &record->block_size)) {
227     return Status::Incomplete(
228         "Incomplete access record: Failed to read block size.");
229   }
230   if (!GetFixed64(&enc_slice, &record->cf_id)) {
231     return Status::Incomplete(
232         "Incomplete access record: Failed to read column family ID.");
233   }
234   Slice cf_name;
235   if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) {
236     return Status::Incomplete(
237         "Incomplete access record: Failed to read column family name.");
238   }
239   record->cf_name = cf_name.ToString();
240   if (!GetFixed32(&enc_slice, &record->level)) {
241     return Status::Incomplete(
242         "Incomplete access record: Failed to read level.");
243   }
244   if (!GetFixed64(&enc_slice, &record->sst_fd_number)) {
245     return Status::Incomplete(
246         "Incomplete access record: Failed to read SST file number.");
247   }
248   if (enc_slice.empty()) {
249     return Status::Incomplete(
250         "Incomplete access record: Failed to read caller.");
251   }
252   record->caller = static_cast<TableReaderCaller>(enc_slice[0]);
253   enc_slice.remove_prefix(kCharSize);
254   if (enc_slice.empty()) {
255     return Status::Incomplete(
256         "Incomplete access record: Failed to read is_cache_hit.");
257   }
258   record->is_cache_hit = static_cast<Boolean>(enc_slice[0]);
259   enc_slice.remove_prefix(kCharSize);
260   if (enc_slice.empty()) {
261     return Status::Incomplete(
262         "Incomplete access record: Failed to read no_insert.");
263   }
264   record->no_insert = static_cast<Boolean>(enc_slice[0]);
265   enc_slice.remove_prefix(kCharSize);
266   if (BlockCacheTraceHelper::IsGetOrMultiGet(record->caller)) {
267     if (!GetFixed64(&enc_slice, &record->get_id)) {
268       return Status::Incomplete(
269           "Incomplete access record: Failed to read the get id.");
270     }
271     if (enc_slice.empty()) {
272       return Status::Incomplete(
273           "Incomplete access record: Failed to read "
274           "get_from_user_specified_snapshot.");
275     }
276     record->get_from_user_specified_snapshot =
277         static_cast<Boolean>(enc_slice[0]);
278     enc_slice.remove_prefix(kCharSize);
279     Slice referenced_key;
280     if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) {
281       return Status::Incomplete(
282           "Incomplete access record: Failed to read the referenced key.");
283     }
284     record->referenced_key = referenced_key.ToString();
285   }
286   if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record->block_type,
287                                                         record->caller)) {
288     if (!GetFixed64(&enc_slice, &record->referenced_data_size)) {
289       return Status::Incomplete(
290           "Incomplete access record: Failed to read the referenced data size.");
291     }
292     if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) {
293       return Status::Incomplete(
294           "Incomplete access record: Failed to read the number of keys in the "
295           "block.");
296     }
297     if (enc_slice.empty()) {
298       return Status::Incomplete(
299           "Incomplete access record: Failed to read "
300           "referenced_key_exist_in_block.");
301     }
302     record->referenced_key_exist_in_block = static_cast<Boolean>(enc_slice[0]);
303   }
304   return Status::OK();
305 }
306 
~BlockCacheHumanReadableTraceWriter()307 BlockCacheHumanReadableTraceWriter::~BlockCacheHumanReadableTraceWriter() {
308   if (human_readable_trace_file_writer_) {
309     human_readable_trace_file_writer_->Flush();
310     human_readable_trace_file_writer_->Close();
311   }
312 }
313 
NewWritableFile(const std::string & human_readable_trace_file_path,ROCKSDB_NAMESPACE::Env * env)314 Status BlockCacheHumanReadableTraceWriter::NewWritableFile(
315     const std::string& human_readable_trace_file_path,
316     ROCKSDB_NAMESPACE::Env* env) {
317   if (human_readable_trace_file_path.empty()) {
318     return Status::InvalidArgument(
319         "The provided human_readable_trace_file_path is null.");
320   }
321   return env->NewWritableFile(human_readable_trace_file_path,
322                               &human_readable_trace_file_writer_, EnvOptions());
323 }
324 
WriteHumanReadableTraceRecord(const BlockCacheTraceRecord & access,uint64_t block_id,uint64_t get_key_id)325 Status BlockCacheHumanReadableTraceWriter::WriteHumanReadableTraceRecord(
326     const BlockCacheTraceRecord& access, uint64_t block_id,
327     uint64_t get_key_id) {
328   if (!human_readable_trace_file_writer_) {
329     return Status::OK();
330   }
331   int ret = snprintf(
332       trace_record_buffer_, sizeof(trace_record_buffer_),
333       "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32
334       ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64
335       ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n",
336       access.access_timestamp, block_id, access.block_type, access.block_size,
337       access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number,
338       access.caller, access.no_insert, access.get_id, get_key_id,
339       access.referenced_data_size, access.is_cache_hit,
340       access.referenced_key_exist_in_block, access.num_keys_in_block,
341       BlockCacheTraceHelper::GetTableId(access),
342       BlockCacheTraceHelper::GetSequenceNumber(access),
343       static_cast<uint64_t>(access.block_key.size()),
344       static_cast<uint64_t>(access.referenced_key.size()),
345       BlockCacheTraceHelper::GetBlockOffsetInFile(access));
346   if (ret < 0) {
347     return Status::IOError("failed to format the output");
348   }
349   std::string printout(trace_record_buffer_);
350   return human_readable_trace_file_writer_->Append(printout);
351 }
352 
BlockCacheHumanReadableTraceReader(const std::string & trace_file_path)353 BlockCacheHumanReadableTraceReader::BlockCacheHumanReadableTraceReader(
354     const std::string& trace_file_path)
355     : BlockCacheTraceReader(/*trace_reader=*/nullptr) {
356   human_readable_trace_reader_.open(trace_file_path, std::ifstream::in);
357 }
358 
~BlockCacheHumanReadableTraceReader()359 BlockCacheHumanReadableTraceReader::~BlockCacheHumanReadableTraceReader() {
360   human_readable_trace_reader_.close();
361 }
362 
ReadHeader(BlockCacheTraceHeader *)363 Status BlockCacheHumanReadableTraceReader::ReadHeader(
364     BlockCacheTraceHeader* /*header*/) {
365   return Status::OK();
366 }
367 
ReadAccess(BlockCacheTraceRecord * record)368 Status BlockCacheHumanReadableTraceReader::ReadAccess(
369     BlockCacheTraceRecord* record) {
370   std::string line;
371   if (!std::getline(human_readable_trace_reader_, line)) {
372     return Status::Incomplete("No more records to read.");
373   }
374   std::stringstream ss(line);
375   std::vector<std::string> record_strs;
376   while (ss.good()) {
377     std::string substr;
378     getline(ss, substr, ',');
379     record_strs.push_back(substr);
380   }
381   if (record_strs.size() != 21) {
382     return Status::Incomplete("Records format is wrong.");
383   }
384 
385   record->access_timestamp = ParseUint64(record_strs[0]);
386   uint64_t block_key = ParseUint64(record_strs[1]);
387   record->block_type = static_cast<TraceType>(ParseUint64(record_strs[2]));
388   record->block_size = ParseUint64(record_strs[3]);
389   record->cf_id = ParseUint64(record_strs[4]);
390   record->cf_name = record_strs[5];
391   record->level = static_cast<uint32_t>(ParseUint64(record_strs[6]));
392   record->sst_fd_number = ParseUint64(record_strs[7]);
393   record->caller = static_cast<TableReaderCaller>(ParseUint64(record_strs[8]));
394   record->no_insert = static_cast<Boolean>(ParseUint64(record_strs[9]));
395   record->get_id = ParseUint64(record_strs[10]);
396   uint64_t get_key_id = ParseUint64(record_strs[11]);
397 
398   record->referenced_data_size = ParseUint64(record_strs[12]);
399   record->is_cache_hit = static_cast<Boolean>(ParseUint64(record_strs[13]));
400   record->referenced_key_exist_in_block =
401       static_cast<Boolean>(ParseUint64(record_strs[14]));
402   record->num_keys_in_block = ParseUint64(record_strs[15]);
403   uint64_t table_id = ParseUint64(record_strs[16]);
404   if (table_id > 0) {
405     // Decrement since valid table id in the trace file equals traced table id
406     // + 1.
407     table_id -= 1;
408   }
409   uint64_t get_sequence_number = ParseUint64(record_strs[17]);
410   if (get_sequence_number > 0) {
411     record->get_from_user_specified_snapshot = Boolean::kTrue;
412     // Decrement since valid seq number in the trace file equals traced seq
413     // number + 1.
414     get_sequence_number -= 1;
415   }
416   uint64_t block_key_size = ParseUint64(record_strs[18]);
417   uint64_t get_key_size = ParseUint64(record_strs[19]);
418   uint64_t block_offset = ParseUint64(record_strs[20]);
419 
420   std::string tmp_block_key;
421   PutVarint64(&tmp_block_key, block_key);
422   PutVarint64(&tmp_block_key, block_offset);
423   // Append 1 until the size is the same as traced block key size.
424   while (record->block_key.size() < block_key_size - tmp_block_key.size()) {
425     record->block_key += "1";
426   }
427   record->block_key += tmp_block_key;
428 
429   if (get_key_id != 0) {
430     std::string tmp_get_key;
431     PutFixed64(&tmp_get_key, get_key_id);
432     PutFixed64(&tmp_get_key, get_sequence_number << 8);
433     PutFixed32(&record->referenced_key, static_cast<uint32_t>(table_id));
434     // Append 1 until the size is the same as traced key size.
435     while (record->referenced_key.size() < get_key_size - tmp_get_key.size()) {
436       record->referenced_key += "1";
437     }
438     record->referenced_key += tmp_get_key;
439   }
440   return Status::OK();
441 }
442 
BlockCacheTracer()443 BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
444 
~BlockCacheTracer()445 BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
446 
StartTrace(Env * env,const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)447 Status BlockCacheTracer::StartTrace(
448     Env* env, const TraceOptions& trace_options,
449     std::unique_ptr<TraceWriter>&& trace_writer) {
450   InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
451   if (writer_.load()) {
452     return Status::Busy();
453   }
454   get_id_counter_.store(1);
455   trace_options_ = trace_options;
456   writer_.store(
457       new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer)));
458   return writer_.load()->WriteHeader();
459 }
460 
EndTrace()461 void BlockCacheTracer::EndTrace() {
462   InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
463   if (!writer_.load()) {
464     return;
465   }
466   delete writer_.load();
467   writer_.store(nullptr);
468 }
469 
WriteBlockAccess(const BlockCacheTraceRecord & record,const Slice & block_key,const Slice & cf_name,const Slice & referenced_key)470 Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record,
471                                           const Slice& block_key,
472                                           const Slice& cf_name,
473                                           const Slice& referenced_key) {
474   if (!writer_.load() || !ShouldTrace(block_key, trace_options_)) {
475     return Status::OK();
476   }
477   InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
478   if (!writer_.load()) {
479     return Status::OK();
480   }
481   return writer_.load()->WriteBlockAccess(record, block_key, cf_name,
482                                           referenced_key);
483 }
484 
NextGetId()485 uint64_t BlockCacheTracer::NextGetId() {
486   if (!writer_.load(std::memory_order_relaxed)) {
487     return BlockCacheTraceHelper::kReservedGetId;
488   }
489   uint64_t prev_value = get_id_counter_.fetch_add(1);
490   if (prev_value == BlockCacheTraceHelper::kReservedGetId) {
491     // fetch and add again.
492     return get_id_counter_.fetch_add(1);
493   }
494   return prev_value;
495 }
496 
497 }  // namespace ROCKSDB_NAMESPACE
498