1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
#include "trace_replay/trace_replay.h"

#include <cctype>
#include <chrono>
#include <limits>
#include <memory>
#include <sstream>
#include <thread>

#include "db/db_impl/db_impl.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/string_util.h"
#include "util/threadpool_imp.h"
22 
23 namespace ROCKSDB_NAMESPACE {
24 
// Magic string written at the start of the trace header payload and checked
// again when a header is read back.
const std::string kTraceMagic("feedcafedeadbeef");
26 
27 namespace {
DecodeCFAndKey(std::string & buffer,uint32_t * cf_id,Slice * key)28 void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
29   Slice buf(buffer);
30   GetFixed32(&buf, cf_id);
31   GetLengthPrefixedSlice(&buf, key);
32 }
33 }  // namespace
34 
ParseVersionStr(std::string & v_string,int * v_num)35 Status TracerHelper::ParseVersionStr(std::string& v_string, int* v_num) {
36   if (v_string.find_first_of('.') == std::string::npos ||
37       v_string.find_first_of('.') != v_string.find_last_of('.')) {
38     return Status::Corruption(
39         "Corrupted trace file. Incorrect version format.");
40   }
41   int tmp_num = 0;
42   for (int i = 0; i < static_cast<int>(v_string.size()); i++) {
43     if (v_string[i] == '.') {
44       continue;
45     } else if (isdigit(v_string[i])) {
46       tmp_num = tmp_num * 10 + (v_string[i] - '0');
47     } else {
48       return Status::Corruption(
49           "Corrupted trace file. Incorrect version format");
50     }
51   }
52   *v_num = tmp_num;
53   return Status::OK();
54 }
55 
ParseTraceHeader(const Trace & header,int * trace_version,int * db_version)56 Status TracerHelper::ParseTraceHeader(const Trace& header, int* trace_version,
57                                       int* db_version) {
58   std::vector<std::string> s_vec;
59   int begin = 0, end;
60   for (int i = 0; i < 3; i++) {
61     assert(header.payload.find("\t", begin) != std::string::npos);
62     end = static_cast<int>(header.payload.find("\t", begin));
63     s_vec.push_back(header.payload.substr(begin, end - begin));
64     begin = end + 1;
65   }
66 
67   std::string t_v_str, db_v_str;
68   assert(s_vec.size() == 3);
69   assert(s_vec[1].find("Trace Version: ") != std::string::npos);
70   t_v_str = s_vec[1].substr(15);
71   assert(s_vec[2].find("RocksDB Version: ") != std::string::npos);
72   db_v_str = s_vec[2].substr(17);
73 
74   Status s;
75   s = ParseVersionStr(t_v_str, trace_version);
76   if (s != Status::OK()) {
77     return s;
78   }
79   s = ParseVersionStr(db_v_str, db_version);
80   return s;
81 }
82 
EncodeTrace(const Trace & trace,std::string * encoded_trace)83 void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
84   assert(encoded_trace);
85   PutFixed64(encoded_trace, trace.ts);
86   encoded_trace->push_back(trace.type);
87   PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
88   encoded_trace->append(trace.payload);
89 }
90 
DecodeTrace(const std::string & encoded_trace,Trace * trace)91 Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
92                                  Trace* trace) {
93   assert(trace != nullptr);
94   Slice enc_slice = Slice(encoded_trace);
95   if (!GetFixed64(&enc_slice, &trace->ts)) {
96     return Status::Incomplete("Decode trace string failed");
97   }
98   if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
99     return Status::Incomplete("Decode trace string failed");
100   }
101   trace->type = static_cast<TraceType>(enc_slice[0]);
102   enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
103   trace->payload = enc_slice.ToString();
104   return Status::OK();
105 }
106 
SetPayloadMap(uint64_t & payload_map,const TracePayloadType payload_type)107 bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
108                                  const TracePayloadType payload_type) {
109   uint64_t old_state = payload_map;
110   uint64_t tmp = 1;
111   payload_map |= (tmp << payload_type);
112   return old_state != payload_map;
113 }
114 
DecodeWritePayload(Trace * trace,WritePayload * write_payload)115 void TracerHelper::DecodeWritePayload(Trace* trace,
116                                       WritePayload* write_payload) {
117   assert(write_payload != nullptr);
118   Slice buf(trace->payload);
119   GetFixed64(&buf, &trace->payload_map);
120   int64_t payload_map = static_cast<int64_t>(trace->payload_map);
121   while (payload_map) {
122     // Find the rightmost set bit.
123     uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
124     switch (set_pos) {
125       case TracePayloadType::kWriteBatchData:
126         GetLengthPrefixedSlice(&buf, &(write_payload->write_batch_data));
127         break;
128       default:
129         assert(false);
130     }
131     // unset the rightmost bit.
132     payload_map &= (payload_map - 1);
133   }
134 }
135 
DecodeGetPayload(Trace * trace,GetPayload * get_payload)136 void TracerHelper::DecodeGetPayload(Trace* trace, GetPayload* get_payload) {
137   assert(get_payload != nullptr);
138   Slice buf(trace->payload);
139   GetFixed64(&buf, &trace->payload_map);
140   int64_t payload_map = static_cast<int64_t>(trace->payload_map);
141   while (payload_map) {
142     // Find the rightmost set bit.
143     uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
144     switch (set_pos) {
145       case TracePayloadType::kGetCFID:
146         GetFixed32(&buf, &(get_payload->cf_id));
147         break;
148       case TracePayloadType::kGetKey:
149         GetLengthPrefixedSlice(&buf, &(get_payload->get_key));
150         break;
151       default:
152         assert(false);
153     }
154     // unset the rightmost bit.
155     payload_map &= (payload_map - 1);
156   }
157 }
158 
DecodeIterPayload(Trace * trace,IterPayload * iter_payload)159 void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) {
160   assert(iter_payload != nullptr);
161   Slice buf(trace->payload);
162   GetFixed64(&buf, &trace->payload_map);
163   int64_t payload_map = static_cast<int64_t>(trace->payload_map);
164   while (payload_map) {
165     // Find the rightmost set bit.
166     uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
167     switch (set_pos) {
168       case TracePayloadType::kIterCFID:
169         GetFixed32(&buf, &(iter_payload->cf_id));
170         break;
171       case TracePayloadType::kIterKey:
172         GetLengthPrefixedSlice(&buf, &(iter_payload->iter_key));
173         break;
174       case TracePayloadType::kIterLowerBound:
175         GetLengthPrefixedSlice(&buf, &(iter_payload->lower_bound));
176         break;
177       case TracePayloadType::kIterUpperBound:
178         GetLengthPrefixedSlice(&buf, &(iter_payload->upper_bound));
179         break;
180       default:
181         assert(false);
182     }
183     // unset the rightmost bit.
184     payload_map &= (payload_map - 1);
185   }
186 }
187 
Tracer(SystemClock * clock,const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)188 Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
189                std::unique_ptr<TraceWriter>&& trace_writer)
190     : clock_(clock),
191       trace_options_(trace_options),
192       trace_writer_(std::move(trace_writer)),
193       trace_request_count_(0) {
194   // TODO: What if this fails?
195   WriteHeader().PermitUncheckedError();
196 }
197 
~Tracer()198 Tracer::~Tracer() { trace_writer_.reset(); }
199 
Write(WriteBatch * write_batch)200 Status Tracer::Write(WriteBatch* write_batch) {
201   TraceType trace_type = kTraceWrite;
202   if (ShouldSkipTrace(trace_type)) {
203     return Status::OK();
204   }
205   Trace trace;
206   trace.ts = clock_->NowMicros();
207   trace.type = trace_type;
208   TracerHelper::SetPayloadMap(trace.payload_map,
209                               TracePayloadType::kWriteBatchData);
210   PutFixed64(&trace.payload, trace.payload_map);
211   PutLengthPrefixedSlice(&trace.payload, Slice(write_batch->Data()));
212   return WriteTrace(trace);
213 }
214 
Get(ColumnFamilyHandle * column_family,const Slice & key)215 Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
216   TraceType trace_type = kTraceGet;
217   if (ShouldSkipTrace(trace_type)) {
218     return Status::OK();
219   }
220   Trace trace;
221   trace.ts = clock_->NowMicros();
222   trace.type = trace_type;
223   // Set the payloadmap of the struct member that will be encoded in the
224   // payload.
225   TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetCFID);
226   TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetKey);
227   // Encode the Get struct members into payload. Make sure add them in order.
228   PutFixed64(&trace.payload, trace.payload_map);
229   PutFixed32(&trace.payload, column_family->GetID());
230   PutLengthPrefixedSlice(&trace.payload, key);
231   return WriteTrace(trace);
232 }
233 
IteratorSeek(const uint32_t & cf_id,const Slice & key,const Slice & lower_bound,const Slice upper_bound)234 Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key,
235                             const Slice& lower_bound, const Slice upper_bound) {
236   TraceType trace_type = kTraceIteratorSeek;
237   if (ShouldSkipTrace(trace_type)) {
238     return Status::OK();
239   }
240   Trace trace;
241   trace.ts = clock_->NowMicros();
242   trace.type = trace_type;
243   // Set the payloadmap of the struct member that will be encoded in the
244   // payload.
245   TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
246   TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
247   if (lower_bound.size() > 0) {
248     TracerHelper::SetPayloadMap(trace.payload_map,
249                                 TracePayloadType::kIterLowerBound);
250   }
251   if (upper_bound.size() > 0) {
252     TracerHelper::SetPayloadMap(trace.payload_map,
253                                 TracePayloadType::kIterUpperBound);
254   }
255   // Encode the Iterator struct members into payload. Make sure add them in
256   // order.
257   PutFixed64(&trace.payload, trace.payload_map);
258   PutFixed32(&trace.payload, cf_id);
259   PutLengthPrefixedSlice(&trace.payload, key);
260   if (lower_bound.size() > 0) {
261     PutLengthPrefixedSlice(&trace.payload, lower_bound);
262   }
263   if (upper_bound.size() > 0) {
264     PutLengthPrefixedSlice(&trace.payload, upper_bound);
265   }
266   return WriteTrace(trace);
267 }
268 
IteratorSeekForPrev(const uint32_t & cf_id,const Slice & key,const Slice & lower_bound,const Slice upper_bound)269 Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
270                                    const Slice& lower_bound,
271                                    const Slice upper_bound) {
272   TraceType trace_type = kTraceIteratorSeekForPrev;
273   if (ShouldSkipTrace(trace_type)) {
274     return Status::OK();
275   }
276   Trace trace;
277   trace.ts = clock_->NowMicros();
278   trace.type = trace_type;
279   // Set the payloadmap of the struct member that will be encoded in the
280   // payload.
281   TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
282   TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
283   if (lower_bound.size() > 0) {
284     TracerHelper::SetPayloadMap(trace.payload_map,
285                                 TracePayloadType::kIterLowerBound);
286   }
287   if (upper_bound.size() > 0) {
288     TracerHelper::SetPayloadMap(trace.payload_map,
289                                 TracePayloadType::kIterUpperBound);
290   }
291   // Encode the Iterator struct members into payload. Make sure add them in
292   // order.
293   PutFixed64(&trace.payload, trace.payload_map);
294   PutFixed32(&trace.payload, cf_id);
295   PutLengthPrefixedSlice(&trace.payload, key);
296   if (lower_bound.size() > 0) {
297     PutLengthPrefixedSlice(&trace.payload, lower_bound);
298   }
299   if (upper_bound.size() > 0) {
300     PutLengthPrefixedSlice(&trace.payload, upper_bound);
301   }
302   return WriteTrace(trace);
303 }
304 
ShouldSkipTrace(const TraceType & trace_type)305 bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
306   if (IsTraceFileOverMax()) {
307     return true;
308   }
309   if ((trace_options_.filter & kTraceFilterGet
310     && trace_type == kTraceGet)
311    || (trace_options_.filter & kTraceFilterWrite
312     && trace_type == kTraceWrite)) {
313     return true;
314   }
315   ++trace_request_count_;
316   if (trace_request_count_ < trace_options_.sampling_frequency) {
317     return true;
318   }
319   trace_request_count_ = 0;
320   return false;
321 }
322 
IsTraceFileOverMax()323 bool Tracer::IsTraceFileOverMax() {
324   uint64_t trace_file_size = trace_writer_->GetFileSize();
325   return (trace_file_size > trace_options_.max_trace_file_size);
326 }
327 
WriteHeader()328 Status Tracer::WriteHeader() {
329   std::ostringstream s;
330   s << kTraceMagic << "\t"
331     << "Trace Version: " << kTraceFileMajorVersion << "."
332     << kTraceFileMinorVersion << "\t"
333     << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
334     << "Format: Timestamp OpType Payload\n";
335   std::string header(s.str());
336 
337   Trace trace;
338   trace.ts = clock_->NowMicros();
339   trace.type = kTraceBegin;
340   trace.payload = header;
341   return WriteTrace(trace);
342 }
343 
WriteFooter()344 Status Tracer::WriteFooter() {
345   Trace trace;
346   trace.ts = clock_->NowMicros();
347   trace.type = kTraceEnd;
348   TracerHelper::SetPayloadMap(trace.payload_map,
349                               TracePayloadType::kEmptyPayload);
350   trace.payload = "";
351   return WriteTrace(trace);
352 }
353 
WriteTrace(const Trace & trace)354 Status Tracer::WriteTrace(const Trace& trace) {
355   std::string encoded_trace;
356   TracerHelper::EncodeTrace(trace, &encoded_trace);
357   return trace_writer_->Write(Slice(encoded_trace));
358 }
359 
Close()360 Status Tracer::Close() { return WriteFooter(); }
361 
Replayer(DB * db,const std::vector<ColumnFamilyHandle * > & handles,std::unique_ptr<TraceReader> && reader)362 Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
363                    std::unique_ptr<TraceReader>&& reader)
364     : trace_reader_(std::move(reader)) {
365   assert(db != nullptr);
366   db_ = static_cast<DBImpl*>(db->GetRootDB());
367   env_ = Env::Default();
368   for (ColumnFamilyHandle* cfh : handles) {
369     cf_map_[cfh->GetID()] = cfh;
370   }
371   fast_forward_ = 1;
372 }
373 
~Replayer()374 Replayer::~Replayer() { trace_reader_.reset(); }
375 
SetFastForward(uint32_t fast_forward)376 Status Replayer::SetFastForward(uint32_t fast_forward) {
377   Status s;
378   if (fast_forward < 1) {
379     s = Status::InvalidArgument("Wrong fast forward speed!");
380   } else {
381     fast_forward_ = fast_forward;
382     s = Status::OK();
383   }
384   return s;
385 }
386 
Replay()387 Status Replayer::Replay() {
388   Status s;
389   Trace header;
390   int db_version;
391   s = ReadHeader(&header);
392   if (!s.ok()) {
393     return s;
394   }
395   s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
396   if (!s.ok()) {
397     return s;
398   }
399 
400   std::chrono::system_clock::time_point replay_epoch =
401       std::chrono::system_clock::now();
402   WriteOptions woptions;
403   ReadOptions roptions;
404   Trace trace;
405   uint64_t ops = 0;
406   Iterator* single_iter = nullptr;
407   while (s.ok()) {
408     trace.reset();
409     s = ReadTrace(&trace);
410     if (!s.ok()) {
411       break;
412     }
413 
414     std::this_thread::sleep_until(
415         replay_epoch +
416         std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
417     if (trace.type == kTraceWrite) {
418       if (trace_file_version_ < 2) {
419         WriteBatch batch(trace.payload);
420         db_->Write(woptions, &batch);
421       } else {
422         WritePayload w_payload;
423         TracerHelper::DecodeWritePayload(&trace, &w_payload);
424         WriteBatch batch(w_payload.write_batch_data.ToString());
425         db_->Write(woptions, &batch);
426       }
427       ops++;
428     } else if (trace.type == kTraceGet) {
429       GetPayload get_payload;
430       get_payload.cf_id = 0;
431       get_payload.get_key = 0;
432       if (trace_file_version_ < 2) {
433         DecodeCFAndKey(trace.payload, &get_payload.cf_id, &get_payload.get_key);
434       } else {
435         TracerHelper::DecodeGetPayload(&trace, &get_payload);
436       }
437       if (get_payload.cf_id > 0 &&
438           cf_map_.find(get_payload.cf_id) == cf_map_.end()) {
439         return Status::Corruption("Invalid Column Family ID.");
440       }
441 
442       std::string value;
443       if (get_payload.cf_id == 0) {
444         db_->Get(roptions, get_payload.get_key, &value);
445       } else {
446         db_->Get(roptions, cf_map_[get_payload.cf_id], get_payload.get_key,
447                  &value);
448       }
449       ops++;
450     } else if (trace.type == kTraceIteratorSeek) {
451       // Currently, we only support to call Seek. The Next() and Prev() is not
452       // supported.
453       IterPayload iter_payload;
454       iter_payload.cf_id = 0;
455       if (trace_file_version_ < 2) {
456         DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
457                        &iter_payload.iter_key);
458       } else {
459         TracerHelper::DecodeIterPayload(&trace, &iter_payload);
460       }
461       if (iter_payload.cf_id > 0 &&
462           cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
463         return Status::Corruption("Invalid Column Family ID.");
464       }
465 
466       if (iter_payload.cf_id == 0) {
467         single_iter = db_->NewIterator(roptions);
468       } else {
469         single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
470       }
471       single_iter->Seek(iter_payload.iter_key);
472       ops++;
473       delete single_iter;
474     } else if (trace.type == kTraceIteratorSeekForPrev) {
475       // Currently, we only support to call SeekForPrev. The Next() and Prev()
476       // is not supported.
477       IterPayload iter_payload;
478       iter_payload.cf_id = 0;
479       if (trace_file_version_ < 2) {
480         DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
481                        &iter_payload.iter_key);
482       } else {
483         TracerHelper::DecodeIterPayload(&trace, &iter_payload);
484       }
485       if (iter_payload.cf_id > 0 &&
486           cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
487         return Status::Corruption("Invalid Column Family ID.");
488       }
489 
490       if (iter_payload.cf_id == 0) {
491         single_iter = db_->NewIterator(roptions);
492       } else {
493         single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
494       }
495       single_iter->SeekForPrev(iter_payload.iter_key);
496       ops++;
497       delete single_iter;
498     } else if (trace.type == kTraceEnd) {
499       // Do nothing for now.
500       // TODO: Add some validations later.
501       break;
502     }
503   }
504 
505   if (s.IsIncomplete()) {
506     // Reaching eof returns Incomplete status at the moment.
507     // Could happen when killing a process without calling EndTrace() API.
508     // TODO: Add better error handling.
509     return Status::OK();
510   }
511   return s;
512 }
513 
// The trace can be replayed with multiple threads by configuring the number
// of threads in the thread pool. Trace records are read from the trace file
// sequentially and the corresponding queries are scheduled in the task
// queue based on the timestamp. Currently, we support Write_batch (Put,
// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
MultiThreadReplay(uint32_t threads_num)519 Status Replayer::MultiThreadReplay(uint32_t threads_num) {
520   Status s;
521   Trace header;
522   int db_version;
523   s = ReadHeader(&header);
524   if (!s.ok()) {
525     return s;
526   }
527   s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
528   if (!s.ok()) {
529     return s;
530   }
531   ThreadPoolImpl thread_pool;
532   thread_pool.SetHostEnv(env_);
533 
534   if (threads_num > 1) {
535     thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
536   } else {
537     thread_pool.SetBackgroundThreads(1);
538   }
539 
540   std::chrono::system_clock::time_point replay_epoch =
541       std::chrono::system_clock::now();
542   WriteOptions woptions;
543   ReadOptions roptions;
544   uint64_t ops = 0;
545   while (s.ok()) {
546     std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
547     ra->db = db_;
548     s = ReadTrace(&(ra->trace_entry));
549     if (!s.ok()) {
550       break;
551     }
552     ra->cf_map = &cf_map_;
553     ra->woptions = woptions;
554     ra->roptions = roptions;
555     ra->trace_file_version = trace_file_version_;
556 
557     std::this_thread::sleep_until(
558         replay_epoch + std::chrono::microseconds(
559                            (ra->trace_entry.ts - header.ts) / fast_forward_));
560     if (ra->trace_entry.type == kTraceWrite) {
561       thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
562                            nullptr);
563       ops++;
564     } else if (ra->trace_entry.type == kTraceGet) {
565       thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
566                            nullptr);
567       ops++;
568     } else if (ra->trace_entry.type == kTraceIteratorSeek) {
569       thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
570                            nullptr);
571       ops++;
572     } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
573       thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
574                            nullptr, nullptr);
575       ops++;
576     } else if (ra->trace_entry.type == kTraceEnd) {
577       // Do nothing for now.
578       // TODO: Add some validations later.
579       break;
580     } else {
581       // Other trace entry types that are not implemented for replay.
582       // To finish the replay, we continue the process.
583       continue;
584     }
585   }
586 
587   if (s.IsIncomplete()) {
588     // Reaching eof returns Incomplete status at the moment.
589     // Could happen when killing a process without calling EndTrace() API.
590     // TODO: Add better error handling.
591     s = Status::OK();
592   }
593   thread_pool.JoinAllThreads();
594   return s;
595 }
596 
ReadHeader(Trace * header)597 Status Replayer::ReadHeader(Trace* header) {
598   assert(header != nullptr);
599   std::string encoded_trace;
600   // Read the trace head
601   Status s = trace_reader_->Read(&encoded_trace);
602   if (!s.ok()) {
603     return s;
604   }
605 
606   s = TracerHelper::DecodeTrace(encoded_trace, header);
607 
608   if (header->type != kTraceBegin) {
609     return Status::Corruption("Corrupted trace file. Incorrect header.");
610   }
611   if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
612     return Status::Corruption("Corrupted trace file. Incorrect magic.");
613   }
614 
615   return s;
616 }
617 
ReadFooter(Trace * footer)618 Status Replayer::ReadFooter(Trace* footer) {
619   assert(footer != nullptr);
620   Status s = ReadTrace(footer);
621   if (!s.ok()) {
622     return s;
623   }
624   if (footer->type != kTraceEnd) {
625     return Status::Corruption("Corrupted trace file. Incorrect footer.");
626   }
627 
628   // TODO: Add more validations later
629   return s;
630 }
631 
ReadTrace(Trace * trace)632 Status Replayer::ReadTrace(Trace* trace) {
633   assert(trace != nullptr);
634   std::string encoded_trace;
635   Status s = trace_reader_->Read(&encoded_trace);
636   if (!s.ok()) {
637     return s;
638   }
639   return TracerHelper::DecodeTrace(encoded_trace, trace);
640 }
641 
BGWorkGet(void * arg)642 void Replayer::BGWorkGet(void* arg) {
643   std::unique_ptr<ReplayerWorkerArg> ra(
644       reinterpret_cast<ReplayerWorkerArg*>(arg));
645   assert(ra != nullptr);
646   auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
647       ra->cf_map);
648   GetPayload get_payload;
649   get_payload.cf_id = 0;
650   if (ra->trace_file_version < 2) {
651     DecodeCFAndKey(ra->trace_entry.payload, &get_payload.cf_id,
652                    &get_payload.get_key);
653   } else {
654     TracerHelper::DecodeGetPayload(&(ra->trace_entry), &get_payload);
655   }
656   if (get_payload.cf_id > 0 &&
657       cf_map->find(get_payload.cf_id) == cf_map->end()) {
658     return;
659   }
660 
661   std::string value;
662   if (get_payload.cf_id == 0) {
663     ra->db->Get(ra->roptions, get_payload.get_key, &value);
664   } else {
665     ra->db->Get(ra->roptions, (*cf_map)[get_payload.cf_id], get_payload.get_key,
666                 &value);
667   }
668   return;
669 }
670 
BGWorkWriteBatch(void * arg)671 void Replayer::BGWorkWriteBatch(void* arg) {
672   std::unique_ptr<ReplayerWorkerArg> ra(
673       reinterpret_cast<ReplayerWorkerArg*>(arg));
674   assert(ra != nullptr);
675 
676   if (ra->trace_file_version < 2) {
677     WriteBatch batch(ra->trace_entry.payload);
678     ra->db->Write(ra->woptions, &batch);
679   } else {
680     WritePayload w_payload;
681     TracerHelper::DecodeWritePayload(&(ra->trace_entry), &w_payload);
682     WriteBatch batch(w_payload.write_batch_data.ToString());
683     ra->db->Write(ra->woptions, &batch);
684   }
685   return;
686 }
687 
BGWorkIterSeek(void * arg)688 void Replayer::BGWorkIterSeek(void* arg) {
689   std::unique_ptr<ReplayerWorkerArg> ra(
690       reinterpret_cast<ReplayerWorkerArg*>(arg));
691   assert(ra != nullptr);
692   auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
693       ra->cf_map);
694   IterPayload iter_payload;
695   iter_payload.cf_id = 0;
696 
697   if (ra->trace_file_version < 2) {
698     DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
699                    &iter_payload.iter_key);
700   } else {
701     TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
702   }
703   if (iter_payload.cf_id > 0 &&
704       cf_map->find(iter_payload.cf_id) == cf_map->end()) {
705     return;
706   }
707 
708   Iterator* single_iter = nullptr;
709   if (iter_payload.cf_id == 0) {
710     single_iter = ra->db->NewIterator(ra->roptions);
711   } else {
712     single_iter =
713         ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
714   }
715   single_iter->Seek(iter_payload.iter_key);
716   delete single_iter;
717   return;
718 }
719 
BGWorkIterSeekForPrev(void * arg)720 void Replayer::BGWorkIterSeekForPrev(void* arg) {
721   std::unique_ptr<ReplayerWorkerArg> ra(
722       reinterpret_cast<ReplayerWorkerArg*>(arg));
723   assert(ra != nullptr);
724   auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
725       ra->cf_map);
726   IterPayload iter_payload;
727   iter_payload.cf_id = 0;
728 
729   if (ra->trace_file_version < 2) {
730     DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
731                    &iter_payload.iter_key);
732   } else {
733     TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
734   }
735   if (iter_payload.cf_id > 0 &&
736       cf_map->find(iter_payload.cf_id) == cf_map->end()) {
737     return;
738   }
739 
740   Iterator* single_iter = nullptr;
741   if (iter_payload.cf_id == 0) {
742     single_iter = ra->db->NewIterator(ra->roptions);
743   } else {
744     single_iter =
745         ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
746   }
747   single_iter->SeekForPrev(iter_payload.iter_key);
748   delete single_iter;
749   return;
750 }
751 
752 }  // namespace ROCKSDB_NAMESPACE
753