1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "trace_replay/block_cache_tracer.h"
7
8 #include <cinttypes>
9 #include <cstdio>
10 #include <cstdlib>
11
12 #include "db/db_impl/db_impl.h"
13 #include "db/dbformat.h"
14 #include "rocksdb/slice.h"
15 #include "util/coding.h"
16 #include "util/hash.h"
17 #include "util/string_util.h"
18
19 namespace ROCKSDB_NAMESPACE {
20
21 namespace {
22 const unsigned int kCharSize = 1;
23
ShouldTrace(const Slice & block_key,const TraceOptions & trace_options)24 bool ShouldTrace(const Slice& block_key, const TraceOptions& trace_options) {
25 if (trace_options.sampling_frequency == 0 ||
26 trace_options.sampling_frequency == 1) {
27 return true;
28 }
29 // We use spatial downsampling so that we have a complete access history for a
30 // block.
31 return 0 == fastrange64(GetSliceNPHash64(block_key),
32 trace_options.sampling_frequency);
33 }
34 } // namespace
35
36 const uint64_t kMicrosInSecond = 1000 * 1000;
37 const uint64_t kSecondInMinute = 60;
38 const uint64_t kSecondInHour = 3600;
39 const std::string BlockCacheTraceHelper::kUnknownColumnFamilyName =
40 "UnknownColumnFamily";
41 const uint64_t BlockCacheTraceHelper::kReservedGetId = 0;
42
IsGetOrMultiGetOnDataBlock(TraceType block_type,TableReaderCaller caller)43 bool BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(
44 TraceType block_type, TableReaderCaller caller) {
45 return (block_type == TraceType::kBlockTraceDataBlock) &&
46 IsGetOrMultiGet(caller);
47 }
48
IsGetOrMultiGet(TableReaderCaller caller)49 bool BlockCacheTraceHelper::IsGetOrMultiGet(TableReaderCaller caller) {
50 return caller == TableReaderCaller::kUserGet ||
51 caller == TableReaderCaller::kUserMultiGet;
52 }
53
IsUserAccess(TableReaderCaller caller)54 bool BlockCacheTraceHelper::IsUserAccess(TableReaderCaller caller) {
55 return caller == TableReaderCaller::kUserGet ||
56 caller == TableReaderCaller::kUserMultiGet ||
57 caller == TableReaderCaller::kUserIterator ||
58 caller == TableReaderCaller::kUserApproximateSize ||
59 caller == TableReaderCaller::kUserVerifyChecksum;
60 }
61
ComputeRowKey(const BlockCacheTraceRecord & access)62 std::string BlockCacheTraceHelper::ComputeRowKey(
63 const BlockCacheTraceRecord& access) {
64 if (!IsGetOrMultiGet(access.caller)) {
65 return "";
66 }
67 Slice key = ExtractUserKey(access.referenced_key);
68 return std::to_string(access.sst_fd_number) + "_" + key.ToString();
69 }
70
GetTableId(const BlockCacheTraceRecord & access)71 uint64_t BlockCacheTraceHelper::GetTableId(
72 const BlockCacheTraceRecord& access) {
73 if (!IsGetOrMultiGet(access.caller) || access.referenced_key.size() < 4) {
74 return 0;
75 }
76 return static_cast<uint64_t>(DecodeFixed32(access.referenced_key.data())) + 1;
77 }
78
GetSequenceNumber(const BlockCacheTraceRecord & access)79 uint64_t BlockCacheTraceHelper::GetSequenceNumber(
80 const BlockCacheTraceRecord& access) {
81 if (!IsGetOrMultiGet(access.caller)) {
82 return 0;
83 }
84 return access.get_from_user_specified_snapshot == Boolean::kFalse
85 ? 0
86 : 1 + GetInternalKeySeqno(access.referenced_key);
87 }
88
GetBlockOffsetInFile(const BlockCacheTraceRecord & access)89 uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile(
90 const BlockCacheTraceRecord& access) {
91 Slice input(access.block_key);
92 uint64_t offset = 0;
93 while (true) {
94 uint64_t tmp = 0;
95 if (GetVarint64(&input, &tmp)) {
96 offset = tmp;
97 } else {
98 break;
99 }
100 }
101 return offset;
102 }
103
BlockCacheTraceWriter(Env * env,const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)104 BlockCacheTraceWriter::BlockCacheTraceWriter(
105 Env* env, const TraceOptions& trace_options,
106 std::unique_ptr<TraceWriter>&& trace_writer)
107 : env_(env),
108 trace_options_(trace_options),
109 trace_writer_(std::move(trace_writer)) {}
110
WriteBlockAccess(const BlockCacheTraceRecord & record,const Slice & block_key,const Slice & cf_name,const Slice & referenced_key)111 Status BlockCacheTraceWriter::WriteBlockAccess(
112 const BlockCacheTraceRecord& record, const Slice& block_key,
113 const Slice& cf_name, const Slice& referenced_key) {
114 uint64_t trace_file_size = trace_writer_->GetFileSize();
115 if (trace_file_size > trace_options_.max_trace_file_size) {
116 return Status::OK();
117 }
118 Trace trace;
119 trace.ts = record.access_timestamp;
120 trace.type = record.block_type;
121 PutLengthPrefixedSlice(&trace.payload, block_key);
122 PutFixed64(&trace.payload, record.block_size);
123 PutFixed64(&trace.payload, record.cf_id);
124 PutLengthPrefixedSlice(&trace.payload, cf_name);
125 PutFixed32(&trace.payload, record.level);
126 PutFixed64(&trace.payload, record.sst_fd_number);
127 trace.payload.push_back(record.caller);
128 trace.payload.push_back(record.is_cache_hit);
129 trace.payload.push_back(record.no_insert);
130 if (BlockCacheTraceHelper::IsGetOrMultiGet(record.caller)) {
131 PutFixed64(&trace.payload, record.get_id);
132 trace.payload.push_back(record.get_from_user_specified_snapshot);
133 PutLengthPrefixedSlice(&trace.payload, referenced_key);
134 }
135 if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record.block_type,
136 record.caller)) {
137 PutFixed64(&trace.payload, record.referenced_data_size);
138 PutFixed64(&trace.payload, record.num_keys_in_block);
139 trace.payload.push_back(record.referenced_key_exist_in_block);
140 }
141 std::string encoded_trace;
142 TracerHelper::EncodeTrace(trace, &encoded_trace);
143 return trace_writer_->Write(encoded_trace);
144 }
145
WriteHeader()146 Status BlockCacheTraceWriter::WriteHeader() {
147 Trace trace;
148 trace.ts = env_->NowMicros();
149 trace.type = TraceType::kTraceBegin;
150 PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
151 PutFixed32(&trace.payload, kMajorVersion);
152 PutFixed32(&trace.payload, kMinorVersion);
153 std::string encoded_trace;
154 TracerHelper::EncodeTrace(trace, &encoded_trace);
155 return trace_writer_->Write(encoded_trace);
156 }
157
BlockCacheTraceReader(std::unique_ptr<TraceReader> && reader)158 BlockCacheTraceReader::BlockCacheTraceReader(
159 std::unique_ptr<TraceReader>&& reader)
160 : trace_reader_(std::move(reader)) {}
161
ReadHeader(BlockCacheTraceHeader * header)162 Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) {
163 assert(header != nullptr);
164 std::string encoded_trace;
165 Status s = trace_reader_->Read(&encoded_trace);
166 if (!s.ok()) {
167 return s;
168 }
169 Trace trace;
170 s = TracerHelper::DecodeTrace(encoded_trace, &trace);
171 if (!s.ok()) {
172 return s;
173 }
174 header->start_time = trace.ts;
175 Slice enc_slice = Slice(trace.payload);
176 Slice magnic_number;
177 if (!GetLengthPrefixedSlice(&enc_slice, &magnic_number)) {
178 return Status::Corruption(
179 "Corrupted header in the trace file: Failed to read the magic number.");
180 }
181 if (magnic_number.ToString() != kTraceMagic) {
182 return Status::Corruption(
183 "Corrupted header in the trace file: Magic number does not match.");
184 }
185 if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) {
186 return Status::Corruption(
187 "Corrupted header in the trace file: Failed to read rocksdb major "
188 "version number.");
189 }
190 if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) {
191 return Status::Corruption(
192 "Corrupted header in the trace file: Failed to read rocksdb minor "
193 "version number.");
194 }
195 // We should have retrieved all information in the header.
196 if (!enc_slice.empty()) {
197 return Status::Corruption(
198 "Corrupted header in the trace file: The length of header is too "
199 "long.");
200 }
201 return Status::OK();
202 }
203
ReadAccess(BlockCacheTraceRecord * record)204 Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
205 assert(record);
206 std::string encoded_trace;
207 Status s = trace_reader_->Read(&encoded_trace);
208 if (!s.ok()) {
209 return s;
210 }
211 Trace trace;
212 s = TracerHelper::DecodeTrace(encoded_trace, &trace);
213 if (!s.ok()) {
214 return s;
215 }
216 record->access_timestamp = trace.ts;
217 record->block_type = trace.type;
218 Slice enc_slice = Slice(trace.payload);
219
220 Slice block_key;
221 if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) {
222 return Status::Incomplete(
223 "Incomplete access record: Failed to read block key.");
224 }
225 record->block_key = block_key.ToString();
226 if (!GetFixed64(&enc_slice, &record->block_size)) {
227 return Status::Incomplete(
228 "Incomplete access record: Failed to read block size.");
229 }
230 if (!GetFixed64(&enc_slice, &record->cf_id)) {
231 return Status::Incomplete(
232 "Incomplete access record: Failed to read column family ID.");
233 }
234 Slice cf_name;
235 if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) {
236 return Status::Incomplete(
237 "Incomplete access record: Failed to read column family name.");
238 }
239 record->cf_name = cf_name.ToString();
240 if (!GetFixed32(&enc_slice, &record->level)) {
241 return Status::Incomplete(
242 "Incomplete access record: Failed to read level.");
243 }
244 if (!GetFixed64(&enc_slice, &record->sst_fd_number)) {
245 return Status::Incomplete(
246 "Incomplete access record: Failed to read SST file number.");
247 }
248 if (enc_slice.empty()) {
249 return Status::Incomplete(
250 "Incomplete access record: Failed to read caller.");
251 }
252 record->caller = static_cast<TableReaderCaller>(enc_slice[0]);
253 enc_slice.remove_prefix(kCharSize);
254 if (enc_slice.empty()) {
255 return Status::Incomplete(
256 "Incomplete access record: Failed to read is_cache_hit.");
257 }
258 record->is_cache_hit = static_cast<Boolean>(enc_slice[0]);
259 enc_slice.remove_prefix(kCharSize);
260 if (enc_slice.empty()) {
261 return Status::Incomplete(
262 "Incomplete access record: Failed to read no_insert.");
263 }
264 record->no_insert = static_cast<Boolean>(enc_slice[0]);
265 enc_slice.remove_prefix(kCharSize);
266 if (BlockCacheTraceHelper::IsGetOrMultiGet(record->caller)) {
267 if (!GetFixed64(&enc_slice, &record->get_id)) {
268 return Status::Incomplete(
269 "Incomplete access record: Failed to read the get id.");
270 }
271 if (enc_slice.empty()) {
272 return Status::Incomplete(
273 "Incomplete access record: Failed to read "
274 "get_from_user_specified_snapshot.");
275 }
276 record->get_from_user_specified_snapshot =
277 static_cast<Boolean>(enc_slice[0]);
278 enc_slice.remove_prefix(kCharSize);
279 Slice referenced_key;
280 if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) {
281 return Status::Incomplete(
282 "Incomplete access record: Failed to read the referenced key.");
283 }
284 record->referenced_key = referenced_key.ToString();
285 }
286 if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record->block_type,
287 record->caller)) {
288 if (!GetFixed64(&enc_slice, &record->referenced_data_size)) {
289 return Status::Incomplete(
290 "Incomplete access record: Failed to read the referenced data size.");
291 }
292 if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) {
293 return Status::Incomplete(
294 "Incomplete access record: Failed to read the number of keys in the "
295 "block.");
296 }
297 if (enc_slice.empty()) {
298 return Status::Incomplete(
299 "Incomplete access record: Failed to read "
300 "referenced_key_exist_in_block.");
301 }
302 record->referenced_key_exist_in_block = static_cast<Boolean>(enc_slice[0]);
303 }
304 return Status::OK();
305 }
306
~BlockCacheHumanReadableTraceWriter()307 BlockCacheHumanReadableTraceWriter::~BlockCacheHumanReadableTraceWriter() {
308 if (human_readable_trace_file_writer_) {
309 human_readable_trace_file_writer_->Flush();
310 human_readable_trace_file_writer_->Close();
311 }
312 }
313
NewWritableFile(const std::string & human_readable_trace_file_path,ROCKSDB_NAMESPACE::Env * env)314 Status BlockCacheHumanReadableTraceWriter::NewWritableFile(
315 const std::string& human_readable_trace_file_path,
316 ROCKSDB_NAMESPACE::Env* env) {
317 if (human_readable_trace_file_path.empty()) {
318 return Status::InvalidArgument(
319 "The provided human_readable_trace_file_path is null.");
320 }
321 return env->NewWritableFile(human_readable_trace_file_path,
322 &human_readable_trace_file_writer_, EnvOptions());
323 }
324
WriteHumanReadableTraceRecord(const BlockCacheTraceRecord & access,uint64_t block_id,uint64_t get_key_id)325 Status BlockCacheHumanReadableTraceWriter::WriteHumanReadableTraceRecord(
326 const BlockCacheTraceRecord& access, uint64_t block_id,
327 uint64_t get_key_id) {
328 if (!human_readable_trace_file_writer_) {
329 return Status::OK();
330 }
331 int ret = snprintf(
332 trace_record_buffer_, sizeof(trace_record_buffer_),
333 "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32
334 ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64
335 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n",
336 access.access_timestamp, block_id, access.block_type, access.block_size,
337 access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number,
338 access.caller, access.no_insert, access.get_id, get_key_id,
339 access.referenced_data_size, access.is_cache_hit,
340 access.referenced_key_exist_in_block, access.num_keys_in_block,
341 BlockCacheTraceHelper::GetTableId(access),
342 BlockCacheTraceHelper::GetSequenceNumber(access),
343 static_cast<uint64_t>(access.block_key.size()),
344 static_cast<uint64_t>(access.referenced_key.size()),
345 BlockCacheTraceHelper::GetBlockOffsetInFile(access));
346 if (ret < 0) {
347 return Status::IOError("failed to format the output");
348 }
349 std::string printout(trace_record_buffer_);
350 return human_readable_trace_file_writer_->Append(printout);
351 }
352
BlockCacheHumanReadableTraceReader(const std::string & trace_file_path)353 BlockCacheHumanReadableTraceReader::BlockCacheHumanReadableTraceReader(
354 const std::string& trace_file_path)
355 : BlockCacheTraceReader(/*trace_reader=*/nullptr) {
356 human_readable_trace_reader_.open(trace_file_path, std::ifstream::in);
357 }
358
~BlockCacheHumanReadableTraceReader()359 BlockCacheHumanReadableTraceReader::~BlockCacheHumanReadableTraceReader() {
360 human_readable_trace_reader_.close();
361 }
362
ReadHeader(BlockCacheTraceHeader *)363 Status BlockCacheHumanReadableTraceReader::ReadHeader(
364 BlockCacheTraceHeader* /*header*/) {
365 return Status::OK();
366 }
367
ReadAccess(BlockCacheTraceRecord * record)368 Status BlockCacheHumanReadableTraceReader::ReadAccess(
369 BlockCacheTraceRecord* record) {
370 std::string line;
371 if (!std::getline(human_readable_trace_reader_, line)) {
372 return Status::Incomplete("No more records to read.");
373 }
374 std::stringstream ss(line);
375 std::vector<std::string> record_strs;
376 while (ss.good()) {
377 std::string substr;
378 getline(ss, substr, ',');
379 record_strs.push_back(substr);
380 }
381 if (record_strs.size() != 21) {
382 return Status::Incomplete("Records format is wrong.");
383 }
384
385 record->access_timestamp = ParseUint64(record_strs[0]);
386 uint64_t block_key = ParseUint64(record_strs[1]);
387 record->block_type = static_cast<TraceType>(ParseUint64(record_strs[2]));
388 record->block_size = ParseUint64(record_strs[3]);
389 record->cf_id = ParseUint64(record_strs[4]);
390 record->cf_name = record_strs[5];
391 record->level = static_cast<uint32_t>(ParseUint64(record_strs[6]));
392 record->sst_fd_number = ParseUint64(record_strs[7]);
393 record->caller = static_cast<TableReaderCaller>(ParseUint64(record_strs[8]));
394 record->no_insert = static_cast<Boolean>(ParseUint64(record_strs[9]));
395 record->get_id = ParseUint64(record_strs[10]);
396 uint64_t get_key_id = ParseUint64(record_strs[11]);
397
398 record->referenced_data_size = ParseUint64(record_strs[12]);
399 record->is_cache_hit = static_cast<Boolean>(ParseUint64(record_strs[13]));
400 record->referenced_key_exist_in_block =
401 static_cast<Boolean>(ParseUint64(record_strs[14]));
402 record->num_keys_in_block = ParseUint64(record_strs[15]);
403 uint64_t table_id = ParseUint64(record_strs[16]);
404 if (table_id > 0) {
405 // Decrement since valid table id in the trace file equals traced table id
406 // + 1.
407 table_id -= 1;
408 }
409 uint64_t get_sequence_number = ParseUint64(record_strs[17]);
410 if (get_sequence_number > 0) {
411 record->get_from_user_specified_snapshot = Boolean::kTrue;
412 // Decrement since valid seq number in the trace file equals traced seq
413 // number + 1.
414 get_sequence_number -= 1;
415 }
416 uint64_t block_key_size = ParseUint64(record_strs[18]);
417 uint64_t get_key_size = ParseUint64(record_strs[19]);
418 uint64_t block_offset = ParseUint64(record_strs[20]);
419
420 std::string tmp_block_key;
421 PutVarint64(&tmp_block_key, block_key);
422 PutVarint64(&tmp_block_key, block_offset);
423 // Append 1 until the size is the same as traced block key size.
424 while (record->block_key.size() < block_key_size - tmp_block_key.size()) {
425 record->block_key += "1";
426 }
427 record->block_key += tmp_block_key;
428
429 if (get_key_id != 0) {
430 std::string tmp_get_key;
431 PutFixed64(&tmp_get_key, get_key_id);
432 PutFixed64(&tmp_get_key, get_sequence_number << 8);
433 PutFixed32(&record->referenced_key, static_cast<uint32_t>(table_id));
434 // Append 1 until the size is the same as traced key size.
435 while (record->referenced_key.size() < get_key_size - tmp_get_key.size()) {
436 record->referenced_key += "1";
437 }
438 record->referenced_key += tmp_get_key;
439 }
440 return Status::OK();
441 }
442
BlockCacheTracer()443 BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
444
~BlockCacheTracer()445 BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
446
StartTrace(Env * env,const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)447 Status BlockCacheTracer::StartTrace(
448 Env* env, const TraceOptions& trace_options,
449 std::unique_ptr<TraceWriter>&& trace_writer) {
450 InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
451 if (writer_.load()) {
452 return Status::Busy();
453 }
454 get_id_counter_.store(1);
455 trace_options_ = trace_options;
456 writer_.store(
457 new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer)));
458 return writer_.load()->WriteHeader();
459 }
460
EndTrace()461 void BlockCacheTracer::EndTrace() {
462 InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
463 if (!writer_.load()) {
464 return;
465 }
466 delete writer_.load();
467 writer_.store(nullptr);
468 }
469
WriteBlockAccess(const BlockCacheTraceRecord & record,const Slice & block_key,const Slice & cf_name,const Slice & referenced_key)470 Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record,
471 const Slice& block_key,
472 const Slice& cf_name,
473 const Slice& referenced_key) {
474 if (!writer_.load() || !ShouldTrace(block_key, trace_options_)) {
475 return Status::OK();
476 }
477 InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
478 if (!writer_.load()) {
479 return Status::OK();
480 }
481 return writer_.load()->WriteBlockAccess(record, block_key, cf_name,
482 referenced_key);
483 }
484
NextGetId()485 uint64_t BlockCacheTracer::NextGetId() {
486 if (!writer_.load(std::memory_order_relaxed)) {
487 return BlockCacheTraceHelper::kReservedGetId;
488 }
489 uint64_t prev_value = get_id_counter_.fetch_add(1);
490 if (prev_value == BlockCacheTraceHelper::kReservedGetId) {
491 // fetch and add again.
492 return get_id_counter_.fetch_add(1);
493 }
494 return prev_value;
495 }
496
497 } // namespace ROCKSDB_NAMESPACE
498