1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "trace_replay/trace_replay.h"
7
8 #include <chrono>
9 #include <sstream>
10 #include <thread>
11
12 #include "db/db_impl/db_impl.h"
13 #include "rocksdb/env.h"
14 #include "rocksdb/options.h"
15 #include "rocksdb/slice.h"
16 #include "rocksdb/system_clock.h"
17 #include "rocksdb/trace_reader_writer.h"
18 #include "rocksdb/write_batch.h"
19 #include "util/coding.h"
20 #include "util/string_util.h"
21 #include "util/threadpool_imp.h"
22
23 namespace ROCKSDB_NAMESPACE {
24
25 const std::string kTraceMagic = "feedcafedeadbeef";
26
27 namespace {
DecodeCFAndKey(std::string & buffer,uint32_t * cf_id,Slice * key)28 void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
29 Slice buf(buffer);
30 GetFixed32(&buf, cf_id);
31 GetLengthPrefixedSlice(&buf, key);
32 }
33 } // namespace
34
ParseVersionStr(std::string & v_string,int * v_num)35 Status TracerHelper::ParseVersionStr(std::string& v_string, int* v_num) {
36 if (v_string.find_first_of('.') == std::string::npos ||
37 v_string.find_first_of('.') != v_string.find_last_of('.')) {
38 return Status::Corruption(
39 "Corrupted trace file. Incorrect version format.");
40 }
41 int tmp_num = 0;
42 for (int i = 0; i < static_cast<int>(v_string.size()); i++) {
43 if (v_string[i] == '.') {
44 continue;
45 } else if (isdigit(v_string[i])) {
46 tmp_num = tmp_num * 10 + (v_string[i] - '0');
47 } else {
48 return Status::Corruption(
49 "Corrupted trace file. Incorrect version format");
50 }
51 }
52 *v_num = tmp_num;
53 return Status::OK();
54 }
55
ParseTraceHeader(const Trace & header,int * trace_version,int * db_version)56 Status TracerHelper::ParseTraceHeader(const Trace& header, int* trace_version,
57 int* db_version) {
58 std::vector<std::string> s_vec;
59 int begin = 0, end;
60 for (int i = 0; i < 3; i++) {
61 assert(header.payload.find("\t", begin) != std::string::npos);
62 end = static_cast<int>(header.payload.find("\t", begin));
63 s_vec.push_back(header.payload.substr(begin, end - begin));
64 begin = end + 1;
65 }
66
67 std::string t_v_str, db_v_str;
68 assert(s_vec.size() == 3);
69 assert(s_vec[1].find("Trace Version: ") != std::string::npos);
70 t_v_str = s_vec[1].substr(15);
71 assert(s_vec[2].find("RocksDB Version: ") != std::string::npos);
72 db_v_str = s_vec[2].substr(17);
73
74 Status s;
75 s = ParseVersionStr(t_v_str, trace_version);
76 if (s != Status::OK()) {
77 return s;
78 }
79 s = ParseVersionStr(db_v_str, db_version);
80 return s;
81 }
82
EncodeTrace(const Trace & trace,std::string * encoded_trace)83 void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
84 assert(encoded_trace);
85 PutFixed64(encoded_trace, trace.ts);
86 encoded_trace->push_back(trace.type);
87 PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
88 encoded_trace->append(trace.payload);
89 }
90
DecodeTrace(const std::string & encoded_trace,Trace * trace)91 Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
92 Trace* trace) {
93 assert(trace != nullptr);
94 Slice enc_slice = Slice(encoded_trace);
95 if (!GetFixed64(&enc_slice, &trace->ts)) {
96 return Status::Incomplete("Decode trace string failed");
97 }
98 if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
99 return Status::Incomplete("Decode trace string failed");
100 }
101 trace->type = static_cast<TraceType>(enc_slice[0]);
102 enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
103 trace->payload = enc_slice.ToString();
104 return Status::OK();
105 }
106
SetPayloadMap(uint64_t & payload_map,const TracePayloadType payload_type)107 bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
108 const TracePayloadType payload_type) {
109 uint64_t old_state = payload_map;
110 uint64_t tmp = 1;
111 payload_map |= (tmp << payload_type);
112 return old_state != payload_map;
113 }
114
DecodeWritePayload(Trace * trace,WritePayload * write_payload)115 void TracerHelper::DecodeWritePayload(Trace* trace,
116 WritePayload* write_payload) {
117 assert(write_payload != nullptr);
118 Slice buf(trace->payload);
119 GetFixed64(&buf, &trace->payload_map);
120 int64_t payload_map = static_cast<int64_t>(trace->payload_map);
121 while (payload_map) {
122 // Find the rightmost set bit.
123 uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
124 switch (set_pos) {
125 case TracePayloadType::kWriteBatchData:
126 GetLengthPrefixedSlice(&buf, &(write_payload->write_batch_data));
127 break;
128 default:
129 assert(false);
130 }
131 // unset the rightmost bit.
132 payload_map &= (payload_map - 1);
133 }
134 }
135
DecodeGetPayload(Trace * trace,GetPayload * get_payload)136 void TracerHelper::DecodeGetPayload(Trace* trace, GetPayload* get_payload) {
137 assert(get_payload != nullptr);
138 Slice buf(trace->payload);
139 GetFixed64(&buf, &trace->payload_map);
140 int64_t payload_map = static_cast<int64_t>(trace->payload_map);
141 while (payload_map) {
142 // Find the rightmost set bit.
143 uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
144 switch (set_pos) {
145 case TracePayloadType::kGetCFID:
146 GetFixed32(&buf, &(get_payload->cf_id));
147 break;
148 case TracePayloadType::kGetKey:
149 GetLengthPrefixedSlice(&buf, &(get_payload->get_key));
150 break;
151 default:
152 assert(false);
153 }
154 // unset the rightmost bit.
155 payload_map &= (payload_map - 1);
156 }
157 }
158
DecodeIterPayload(Trace * trace,IterPayload * iter_payload)159 void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) {
160 assert(iter_payload != nullptr);
161 Slice buf(trace->payload);
162 GetFixed64(&buf, &trace->payload_map);
163 int64_t payload_map = static_cast<int64_t>(trace->payload_map);
164 while (payload_map) {
165 // Find the rightmost set bit.
166 uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
167 switch (set_pos) {
168 case TracePayloadType::kIterCFID:
169 GetFixed32(&buf, &(iter_payload->cf_id));
170 break;
171 case TracePayloadType::kIterKey:
172 GetLengthPrefixedSlice(&buf, &(iter_payload->iter_key));
173 break;
174 case TracePayloadType::kIterLowerBound:
175 GetLengthPrefixedSlice(&buf, &(iter_payload->lower_bound));
176 break;
177 case TracePayloadType::kIterUpperBound:
178 GetLengthPrefixedSlice(&buf, &(iter_payload->upper_bound));
179 break;
180 default:
181 assert(false);
182 }
183 // unset the rightmost bit.
184 payload_map &= (payload_map - 1);
185 }
186 }
187
// Constructs a tracer that timestamps records with `clock` and takes
// ownership of `trace_writer`. The trace-file header record is written
// immediately; its failure status is currently discarded.
Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
               std::unique_ptr<TraceWriter>&& trace_writer)
    : clock_(clock),
      trace_options_(trace_options),
      trace_writer_(std::move(trace_writer)),
      trace_request_count_(0) {
  // TODO: What if this fails?
  WriteHeader().PermitUncheckedError();
}
197
~Tracer()198 Tracer::~Tracer() { trace_writer_.reset(); }
199
Write(WriteBatch * write_batch)200 Status Tracer::Write(WriteBatch* write_batch) {
201 TraceType trace_type = kTraceWrite;
202 if (ShouldSkipTrace(trace_type)) {
203 return Status::OK();
204 }
205 Trace trace;
206 trace.ts = clock_->NowMicros();
207 trace.type = trace_type;
208 TracerHelper::SetPayloadMap(trace.payload_map,
209 TracePayloadType::kWriteBatchData);
210 PutFixed64(&trace.payload, trace.payload_map);
211 PutLengthPrefixedSlice(&trace.payload, Slice(write_batch->Data()));
212 return WriteTrace(trace);
213 }
214
Get(ColumnFamilyHandle * column_family,const Slice & key)215 Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
216 TraceType trace_type = kTraceGet;
217 if (ShouldSkipTrace(trace_type)) {
218 return Status::OK();
219 }
220 Trace trace;
221 trace.ts = clock_->NowMicros();
222 trace.type = trace_type;
223 // Set the payloadmap of the struct member that will be encoded in the
224 // payload.
225 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetCFID);
226 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetKey);
227 // Encode the Get struct members into payload. Make sure add them in order.
228 PutFixed64(&trace.payload, trace.payload_map);
229 PutFixed32(&trace.payload, column_family->GetID());
230 PutLengthPrefixedSlice(&trace.payload, key);
231 return WriteTrace(trace);
232 }
233
IteratorSeek(const uint32_t & cf_id,const Slice & key,const Slice & lower_bound,const Slice upper_bound)234 Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key,
235 const Slice& lower_bound, const Slice upper_bound) {
236 TraceType trace_type = kTraceIteratorSeek;
237 if (ShouldSkipTrace(trace_type)) {
238 return Status::OK();
239 }
240 Trace trace;
241 trace.ts = clock_->NowMicros();
242 trace.type = trace_type;
243 // Set the payloadmap of the struct member that will be encoded in the
244 // payload.
245 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
246 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
247 if (lower_bound.size() > 0) {
248 TracerHelper::SetPayloadMap(trace.payload_map,
249 TracePayloadType::kIterLowerBound);
250 }
251 if (upper_bound.size() > 0) {
252 TracerHelper::SetPayloadMap(trace.payload_map,
253 TracePayloadType::kIterUpperBound);
254 }
255 // Encode the Iterator struct members into payload. Make sure add them in
256 // order.
257 PutFixed64(&trace.payload, trace.payload_map);
258 PutFixed32(&trace.payload, cf_id);
259 PutLengthPrefixedSlice(&trace.payload, key);
260 if (lower_bound.size() > 0) {
261 PutLengthPrefixedSlice(&trace.payload, lower_bound);
262 }
263 if (upper_bound.size() > 0) {
264 PutLengthPrefixedSlice(&trace.payload, upper_bound);
265 }
266 return WriteTrace(trace);
267 }
268
IteratorSeekForPrev(const uint32_t & cf_id,const Slice & key,const Slice & lower_bound,const Slice upper_bound)269 Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
270 const Slice& lower_bound,
271 const Slice upper_bound) {
272 TraceType trace_type = kTraceIteratorSeekForPrev;
273 if (ShouldSkipTrace(trace_type)) {
274 return Status::OK();
275 }
276 Trace trace;
277 trace.ts = clock_->NowMicros();
278 trace.type = trace_type;
279 // Set the payloadmap of the struct member that will be encoded in the
280 // payload.
281 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
282 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
283 if (lower_bound.size() > 0) {
284 TracerHelper::SetPayloadMap(trace.payload_map,
285 TracePayloadType::kIterLowerBound);
286 }
287 if (upper_bound.size() > 0) {
288 TracerHelper::SetPayloadMap(trace.payload_map,
289 TracePayloadType::kIterUpperBound);
290 }
291 // Encode the Iterator struct members into payload. Make sure add them in
292 // order.
293 PutFixed64(&trace.payload, trace.payload_map);
294 PutFixed32(&trace.payload, cf_id);
295 PutLengthPrefixedSlice(&trace.payload, key);
296 if (lower_bound.size() > 0) {
297 PutLengthPrefixedSlice(&trace.payload, lower_bound);
298 }
299 if (upper_bound.size() > 0) {
300 PutLengthPrefixedSlice(&trace.payload, upper_bound);
301 }
302 return WriteTrace(trace);
303 }
304
ShouldSkipTrace(const TraceType & trace_type)305 bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
306 if (IsTraceFileOverMax()) {
307 return true;
308 }
309 if ((trace_options_.filter & kTraceFilterGet
310 && trace_type == kTraceGet)
311 || (trace_options_.filter & kTraceFilterWrite
312 && trace_type == kTraceWrite)) {
313 return true;
314 }
315 ++trace_request_count_;
316 if (trace_request_count_ < trace_options_.sampling_frequency) {
317 return true;
318 }
319 trace_request_count_ = 0;
320 return false;
321 }
322
IsTraceFileOverMax()323 bool Tracer::IsTraceFileOverMax() {
324 uint64_t trace_file_size = trace_writer_->GetFileSize();
325 return (trace_file_size > trace_options_.max_trace_file_size);
326 }
327
WriteHeader()328 Status Tracer::WriteHeader() {
329 std::ostringstream s;
330 s << kTraceMagic << "\t"
331 << "Trace Version: " << kTraceFileMajorVersion << "."
332 << kTraceFileMinorVersion << "\t"
333 << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
334 << "Format: Timestamp OpType Payload\n";
335 std::string header(s.str());
336
337 Trace trace;
338 trace.ts = clock_->NowMicros();
339 trace.type = kTraceBegin;
340 trace.payload = header;
341 return WriteTrace(trace);
342 }
343
WriteFooter()344 Status Tracer::WriteFooter() {
345 Trace trace;
346 trace.ts = clock_->NowMicros();
347 trace.type = kTraceEnd;
348 TracerHelper::SetPayloadMap(trace.payload_map,
349 TracePayloadType::kEmptyPayload);
350 trace.payload = "";
351 return WriteTrace(trace);
352 }
353
WriteTrace(const Trace & trace)354 Status Tracer::WriteTrace(const Trace& trace) {
355 std::string encoded_trace;
356 TracerHelper::EncodeTrace(trace, &encoded_trace);
357 return trace_writer_->Write(Slice(encoded_trace));
358 }
359
Close()360 Status Tracer::Close() { return WriteFooter(); }
361
Replayer(DB * db,const std::vector<ColumnFamilyHandle * > & handles,std::unique_ptr<TraceReader> && reader)362 Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
363 std::unique_ptr<TraceReader>&& reader)
364 : trace_reader_(std::move(reader)) {
365 assert(db != nullptr);
366 db_ = static_cast<DBImpl*>(db->GetRootDB());
367 env_ = Env::Default();
368 for (ColumnFamilyHandle* cfh : handles) {
369 cf_map_[cfh->GetID()] = cfh;
370 }
371 fast_forward_ = 1;
372 }
373
~Replayer()374 Replayer::~Replayer() { trace_reader_.reset(); }
375
SetFastForward(uint32_t fast_forward)376 Status Replayer::SetFastForward(uint32_t fast_forward) {
377 Status s;
378 if (fast_forward < 1) {
379 s = Status::InvalidArgument("Wrong fast forward speed!");
380 } else {
381 fast_forward_ = fast_forward;
382 s = Status::OK();
383 }
384 return s;
385 }
386
// Replays the trace single-threaded, honoring inter-record timing: each
// record is executed no earlier than replay start + (record ts - header ts)
// / fast_forward_. Supports WriteBatch, Get, Iterator Seek/SeekForPrev;
// stops at the kTraceEnd record or on a read/decode error.
Status Replayer::Replay() {
  Status s;
  Trace header;
  int db_version;
  s = ReadHeader(&header);
  if (!s.ok()) {
    return s;
  }
  // Trace-file version controls payload decoding below (legacy vs. v2+).
  s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
  if (!s.ok()) {
    return s;
  }

  std::chrono::system_clock::time_point replay_epoch =
      std::chrono::system_clock::now();
  WriteOptions woptions;
  ReadOptions roptions;
  Trace trace;
  uint64_t ops = 0;
  Iterator* single_iter = nullptr;
  while (s.ok()) {
    trace.reset();
    s = ReadTrace(&trace);
    if (!s.ok()) {
      break;
    }

    // Sleep until this record's scheduled time relative to the replay epoch.
    std::this_thread::sleep_until(
        replay_epoch +
        std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
    if (trace.type == kTraceWrite) {
      if (trace_file_version_ < 2) {
        // Legacy format: the payload is the raw write-batch representation.
        WriteBatch batch(trace.payload);
        // NOTE(review): the Write/Get statuses below are deliberately
        // discarded; replay is best-effort.
        db_->Write(woptions, &batch);
      } else {
        WritePayload w_payload;
        TracerHelper::DecodeWritePayload(&trace, &w_payload);
        WriteBatch batch(w_payload.write_batch_data.ToString());
        db_->Write(woptions, &batch);
      }
      ops++;
    } else if (trace.type == kTraceGet) {
      GetPayload get_payload;
      get_payload.cf_id = 0;
      get_payload.get_key = 0;
      if (trace_file_version_ < 2) {
        DecodeCFAndKey(trace.payload, &get_payload.cf_id, &get_payload.get_key);
      } else {
        TracerHelper::DecodeGetPayload(&trace, &get_payload);
      }
      // cf_id 0 means the default column family; any other ID must be known.
      if (get_payload.cf_id > 0 &&
          cf_map_.find(get_payload.cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }

      std::string value;
      if (get_payload.cf_id == 0) {
        db_->Get(roptions, get_payload.get_key, &value);
      } else {
        db_->Get(roptions, cf_map_[get_payload.cf_id], get_payload.get_key,
                 &value);
      }
      ops++;
    } else if (trace.type == kTraceIteratorSeek) {
      // Currently, we only support to call Seek. The Next() and Prev() is not
      // supported.
      IterPayload iter_payload;
      iter_payload.cf_id = 0;
      if (trace_file_version_ < 2) {
        DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
                       &iter_payload.iter_key);
      } else {
        TracerHelper::DecodeIterPayload(&trace, &iter_payload);
      }
      if (iter_payload.cf_id > 0 &&
          cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }

      if (iter_payload.cf_id == 0) {
        single_iter = db_->NewIterator(roptions);
      } else {
        single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
      }
      single_iter->Seek(iter_payload.iter_key);
      ops++;
      delete single_iter;
    } else if (trace.type == kTraceIteratorSeekForPrev) {
      // Currently, we only support to call SeekForPrev. The Next() and Prev()
      // is not supported.
      IterPayload iter_payload;
      iter_payload.cf_id = 0;
      if (trace_file_version_ < 2) {
        DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
                       &iter_payload.iter_key);
      } else {
        TracerHelper::DecodeIterPayload(&trace, &iter_payload);
      }
      if (iter_payload.cf_id > 0 &&
          cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }

      if (iter_payload.cf_id == 0) {
        single_iter = db_->NewIterator(roptions);
      } else {
        single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
      }
      single_iter->SeekForPrev(iter_payload.iter_key);
      ops++;
      delete single_iter;
    } else if (trace.type == kTraceEnd) {
      // Do nothing for now.
      // TODO: Add some validations later.
      break;
    }
  }

  if (s.IsIncomplete()) {
    // Reaching eof returns Incomplete status at the moment.
    // Could happen when killing a process without calling EndTrace() API.
    // TODO: Add better error handling.
    return Status::OK();
  }
  return s;
}
513
// The trace can be replayed with multiple threads by configuring the number
// of threads in the thread pool. Trace records are read from the trace file
// sequentially and the corresponding queries are scheduled in the task
// queue based on the timestamp. Currently, we support Write_batch (Put,
// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
// Replays the trace using a thread pool of `threads_num` workers (minimum 1).
// Records are read sequentially; each is scheduled onto the pool after its
// timing offset elapses. Ownership of each ReplayerWorkerArg transfers to the
// background worker via release().
Status Replayer::MultiThreadReplay(uint32_t threads_num) {
  Status s;
  Trace header;
  int db_version;
  s = ReadHeader(&header);
  if (!s.ok()) {
    return s;
  }
  s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
  if (!s.ok()) {
    return s;
  }
  ThreadPoolImpl thread_pool;
  thread_pool.SetHostEnv(env_);

  if (threads_num > 1) {
    thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
  } else {
    thread_pool.SetBackgroundThreads(1);
  }

  std::chrono::system_clock::time_point replay_epoch =
      std::chrono::system_clock::now();
  WriteOptions woptions;
  ReadOptions roptions;
  uint64_t ops = 0;
  while (s.ok()) {
    std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
    ra->db = db_;
    s = ReadTrace(&(ra->trace_entry));
    if (!s.ok()) {
      break;
    }
    ra->cf_map = &cf_map_;
    ra->woptions = woptions;
    ra->roptions = roptions;
    ra->trace_file_version = trace_file_version_;

    // Dispatch no earlier than replay start + (record ts - header ts),
    // scaled down by the fast-forward factor.
    std::this_thread::sleep_until(
        replay_epoch + std::chrono::microseconds(
                           (ra->trace_entry.ts - header.ts) / fast_forward_));
    if (ra->trace_entry.type == kTraceWrite) {
      // release(): the BGWork* callback re-wraps and deletes the arg.
      thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
                           nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceGet) {
      thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
                           nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceIteratorSeek) {
      thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
                           nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
      thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
                           nullptr, nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceEnd) {
      // Do nothing for now.
      // TODO: Add some validations later.
      break;
    } else {
      // Other trace entry types that are not implemented for replay.
      // To finish the replay, we continue the process.
      continue;
    }
  }

  if (s.IsIncomplete()) {
    // Reaching eof returns Incomplete status at the moment.
    // Could happen when killing a process without calling EndTrace() API.
    // TODO: Add better error handling.
    s = Status::OK();
  }
  // Wait for all scheduled work before tearing down the pool.
  thread_pool.JoinAllThreads();
  return s;
}
596
ReadHeader(Trace * header)597 Status Replayer::ReadHeader(Trace* header) {
598 assert(header != nullptr);
599 std::string encoded_trace;
600 // Read the trace head
601 Status s = trace_reader_->Read(&encoded_trace);
602 if (!s.ok()) {
603 return s;
604 }
605
606 s = TracerHelper::DecodeTrace(encoded_trace, header);
607
608 if (header->type != kTraceBegin) {
609 return Status::Corruption("Corrupted trace file. Incorrect header.");
610 }
611 if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
612 return Status::Corruption("Corrupted trace file. Incorrect magic.");
613 }
614
615 return s;
616 }
617
ReadFooter(Trace * footer)618 Status Replayer::ReadFooter(Trace* footer) {
619 assert(footer != nullptr);
620 Status s = ReadTrace(footer);
621 if (!s.ok()) {
622 return s;
623 }
624 if (footer->type != kTraceEnd) {
625 return Status::Corruption("Corrupted trace file. Incorrect footer.");
626 }
627
628 // TODO: Add more validations later
629 return s;
630 }
631
ReadTrace(Trace * trace)632 Status Replayer::ReadTrace(Trace* trace) {
633 assert(trace != nullptr);
634 std::string encoded_trace;
635 Status s = trace_reader_->Read(&encoded_trace);
636 if (!s.ok()) {
637 return s;
638 }
639 return TracerHelper::DecodeTrace(encoded_trace, trace);
640 }
641
BGWorkGet(void * arg)642 void Replayer::BGWorkGet(void* arg) {
643 std::unique_ptr<ReplayerWorkerArg> ra(
644 reinterpret_cast<ReplayerWorkerArg*>(arg));
645 assert(ra != nullptr);
646 auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
647 ra->cf_map);
648 GetPayload get_payload;
649 get_payload.cf_id = 0;
650 if (ra->trace_file_version < 2) {
651 DecodeCFAndKey(ra->trace_entry.payload, &get_payload.cf_id,
652 &get_payload.get_key);
653 } else {
654 TracerHelper::DecodeGetPayload(&(ra->trace_entry), &get_payload);
655 }
656 if (get_payload.cf_id > 0 &&
657 cf_map->find(get_payload.cf_id) == cf_map->end()) {
658 return;
659 }
660
661 std::string value;
662 if (get_payload.cf_id == 0) {
663 ra->db->Get(ra->roptions, get_payload.get_key, &value);
664 } else {
665 ra->db->Get(ra->roptions, (*cf_map)[get_payload.cf_id], get_payload.get_key,
666 &value);
667 }
668 return;
669 }
670
BGWorkWriteBatch(void * arg)671 void Replayer::BGWorkWriteBatch(void* arg) {
672 std::unique_ptr<ReplayerWorkerArg> ra(
673 reinterpret_cast<ReplayerWorkerArg*>(arg));
674 assert(ra != nullptr);
675
676 if (ra->trace_file_version < 2) {
677 WriteBatch batch(ra->trace_entry.payload);
678 ra->db->Write(ra->woptions, &batch);
679 } else {
680 WritePayload w_payload;
681 TracerHelper::DecodeWritePayload(&(ra->trace_entry), &w_payload);
682 WriteBatch batch(w_payload.write_batch_data.ToString());
683 ra->db->Write(ra->woptions, &batch);
684 }
685 return;
686 }
687
BGWorkIterSeek(void * arg)688 void Replayer::BGWorkIterSeek(void* arg) {
689 std::unique_ptr<ReplayerWorkerArg> ra(
690 reinterpret_cast<ReplayerWorkerArg*>(arg));
691 assert(ra != nullptr);
692 auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
693 ra->cf_map);
694 IterPayload iter_payload;
695 iter_payload.cf_id = 0;
696
697 if (ra->trace_file_version < 2) {
698 DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
699 &iter_payload.iter_key);
700 } else {
701 TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
702 }
703 if (iter_payload.cf_id > 0 &&
704 cf_map->find(iter_payload.cf_id) == cf_map->end()) {
705 return;
706 }
707
708 Iterator* single_iter = nullptr;
709 if (iter_payload.cf_id == 0) {
710 single_iter = ra->db->NewIterator(ra->roptions);
711 } else {
712 single_iter =
713 ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
714 }
715 single_iter->Seek(iter_payload.iter_key);
716 delete single_iter;
717 return;
718 }
719
BGWorkIterSeekForPrev(void * arg)720 void Replayer::BGWorkIterSeekForPrev(void* arg) {
721 std::unique_ptr<ReplayerWorkerArg> ra(
722 reinterpret_cast<ReplayerWorkerArg*>(arg));
723 assert(ra != nullptr);
724 auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
725 ra->cf_map);
726 IterPayload iter_payload;
727 iter_payload.cf_id = 0;
728
729 if (ra->trace_file_version < 2) {
730 DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
731 &iter_payload.iter_key);
732 } else {
733 TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
734 }
735 if (iter_payload.cf_id > 0 &&
736 cf_map->find(iter_payload.cf_id) == cf_map->end()) {
737 return;
738 }
739
740 Iterator* single_iter = nullptr;
741 if (iter_payload.cf_id == 0) {
742 single_iter = ra->db->NewIterator(ra->roptions);
743 } else {
744 single_iter =
745 ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
746 }
747 single_iter->SeekForPrev(iter_payload.iter_key);
748 delete single_iter;
749 return;
750 }
751
752 } // namespace ROCKSDB_NAMESPACE
753