1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "trace_replay/trace_replay.h"
7 
8 #include <chrono>
9 #include <sstream>
10 #include <thread>
11 #include "db/db_impl/db_impl.h"
12 #include "rocksdb/slice.h"
13 #include "rocksdb/write_batch.h"
14 #include "util/coding.h"
15 #include "util/string_util.h"
16 #include "util/threadpool_imp.h"
17 
18 namespace ROCKSDB_NAMESPACE {
19 
// Magic string placed at the start of every trace file header record; checked
// by Replayer::ReadHeader() to confirm a file is a RocksDB trace.
const std::string kTraceMagic = "feedcafedeadbeef";
21 
22 namespace {
EncodeCFAndKey(std::string * dst,uint32_t cf_id,const Slice & key)23 void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
24   PutFixed32(dst, cf_id);
25   PutLengthPrefixedSlice(dst, key);
26 }
27 
DecodeCFAndKey(std::string & buffer,uint32_t * cf_id,Slice * key)28 void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
29   Slice buf(buffer);
30   GetFixed32(&buf, cf_id);
31   GetLengthPrefixedSlice(&buf, key);
32 }
33 }  // namespace
34 
// Serializes `trace` into *encoded_trace using the on-disk record layout:
//   fixed64 timestamp | 1-byte type | fixed32 payload size | payload bytes.
// The field order here is the wire format; DecodeTrace must mirror it.
void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
  assert(encoded_trace);
  PutFixed64(encoded_trace, trace.ts);
  encoded_trace->push_back(trace.type);
  PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
  encoded_trace->append(trace.payload);
}
42 
// Parses one encoded trace record (see EncodeTrace for the layout) into
// *trace. Returns Status::Incomplete when the record is too short to hold
// the timestamp or the type+length fields.
// NOTE(review): the fixed32 payload-length field is skipped, not read or
// validated — the payload is simply taken to be all remaining bytes.
Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
                                 Trace* trace) {
  assert(trace != nullptr);
  Slice enc_slice = Slice(encoded_trace);
  if (!GetFixed64(&enc_slice, &trace->ts)) {
    return Status::Incomplete("Decode trace string failed");
  }
  if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
    return Status::Incomplete("Decode trace string failed");
  }
  trace->type = static_cast<TraceType>(enc_slice[0]);
  // Skip past the type byte and the (unvalidated) payload-length field.
  enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
  trace->payload = enc_slice.ToString();
  return Status::OK();
}
58 
// Constructs a tracer that appends encoded trace records through
// `trace_writer`. The header record is written immediately.
// NOTE(review): the WriteHeader() status is discarded, so a failing writer
// is only surfaced by later Write()/Get() calls.
Tracer::Tracer(Env* env, const TraceOptions& trace_options,
               std::unique_ptr<TraceWriter>&& trace_writer)
    : env_(env),
      trace_options_(trace_options),
      trace_writer_(std::move(trace_writer)),
      trace_request_count_ (0) {
  WriteHeader();
}
67 
// Destroys the underlying TraceWriter. Close()/WriteFooter() is NOT called
// automatically; callers must call Close() to get a footer record.
Tracer::~Tracer() { trace_writer_.reset(); }
69 
Write(WriteBatch * write_batch)70 Status Tracer::Write(WriteBatch* write_batch) {
71   TraceType trace_type = kTraceWrite;
72   if (ShouldSkipTrace(trace_type)) {
73     return Status::OK();
74   }
75   Trace trace;
76   trace.ts = env_->NowMicros();
77   trace.type = trace_type;
78   trace.payload = write_batch->Data();
79   return WriteTrace(trace);
80 }
81 
Get(ColumnFamilyHandle * column_family,const Slice & key)82 Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
83   TraceType trace_type = kTraceGet;
84   if (ShouldSkipTrace(trace_type)) {
85     return Status::OK();
86   }
87   Trace trace;
88   trace.ts = env_->NowMicros();
89   trace.type = trace_type;
90   EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
91   return WriteTrace(trace);
92 }
93 
IteratorSeek(const uint32_t & cf_id,const Slice & key)94 Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
95   TraceType trace_type = kTraceIteratorSeek;
96   if (ShouldSkipTrace(trace_type)) {
97     return Status::OK();
98   }
99   Trace trace;
100   trace.ts = env_->NowMicros();
101   trace.type = trace_type;
102   EncodeCFAndKey(&trace.payload, cf_id, key);
103   return WriteTrace(trace);
104 }
105 
IteratorSeekForPrev(const uint32_t & cf_id,const Slice & key)106 Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
107   TraceType trace_type = kTraceIteratorSeekForPrev;
108   if (ShouldSkipTrace(trace_type)) {
109     return Status::OK();
110   }
111   Trace trace;
112   trace.ts = env_->NowMicros();
113   trace.type = trace_type;
114   EncodeCFAndKey(&trace.payload, cf_id, key);
115   return WriteTrace(trace);
116 }
117 
ShouldSkipTrace(const TraceType & trace_type)118 bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
119   if (IsTraceFileOverMax()) {
120     return true;
121   }
122   if ((trace_options_.filter & kTraceFilterGet
123     && trace_type == kTraceGet)
124    || (trace_options_.filter & kTraceFilterWrite
125     && trace_type == kTraceWrite)) {
126     return true;
127   }
128   ++trace_request_count_;
129   if (trace_request_count_ < trace_options_.sampling_frequency) {
130     return true;
131   }
132   trace_request_count_ = 0;
133   return false;
134 }
135 
IsTraceFileOverMax()136 bool Tracer::IsTraceFileOverMax() {
137   uint64_t trace_file_size = trace_writer_->GetFileSize();
138   return (trace_file_size > trace_options_.max_trace_file_size);
139 }
140 
WriteHeader()141 Status Tracer::WriteHeader() {
142   std::ostringstream s;
143   s << kTraceMagic << "\t"
144     << "Trace Version: 0.1\t"
145     << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
146     << "Format: Timestamp OpType Payload\n";
147   std::string header(s.str());
148 
149   Trace trace;
150   trace.ts = env_->NowMicros();
151   trace.type = kTraceBegin;
152   trace.payload = header;
153   return WriteTrace(trace);
154 }
155 
WriteFooter()156 Status Tracer::WriteFooter() {
157   Trace trace;
158   trace.ts = env_->NowMicros();
159   trace.type = kTraceEnd;
160   trace.payload = "";
161   return WriteTrace(trace);
162 }
163 
WriteTrace(const Trace & trace)164 Status Tracer::WriteTrace(const Trace& trace) {
165   std::string encoded_trace;
166   TracerHelper::EncodeTrace(trace, &encoded_trace);
167   return trace_writer_->Write(Slice(encoded_trace));
168 }
169 
// Finalizes the trace by appending the end-of-trace footer record.
Status Tracer::Close() { return WriteFooter(); }
171 
Replayer(DB * db,const std::vector<ColumnFamilyHandle * > & handles,std::unique_ptr<TraceReader> && reader)172 Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
173                    std::unique_ptr<TraceReader>&& reader)
174     : trace_reader_(std::move(reader)) {
175   assert(db != nullptr);
176   db_ = static_cast<DBImpl*>(db->GetRootDB());
177   env_ = Env::Default();
178   for (ColumnFamilyHandle* cfh : handles) {
179     cf_map_[cfh->GetID()] = cfh;
180   }
181   fast_forward_ = 1;
182 }
183 
// Destroys the underlying TraceReader.
Replayer::~Replayer() { trace_reader_.reset(); }
185 
SetFastForward(uint32_t fast_forward)186 Status Replayer::SetFastForward(uint32_t fast_forward) {
187   Status s;
188   if (fast_forward < 1) {
189     s = Status::InvalidArgument("Wrong fast forward speed!");
190   } else {
191     fast_forward_ = fast_forward;
192     s = Status::OK();
193   }
194   return s;
195 }
196 
// Single-threaded replay: reads trace records sequentially and re-executes
// each one against db_, sleeping so that the wall-clock offset from replay
// start matches the recorded timestamp offset divided by fast_forward_.
// Returns OK on a clean end-of-trace (or EOF), a Corruption status on an
// unknown column family ID, or the first read/decode error otherwise.
Status Replayer::Replay() {
  Status s;
  Trace header;
  s = ReadHeader(&header);
  if (!s.ok()) {
    return s;
  }

  // header.ts is the trace epoch; every record is paced relative to it.
  std::chrono::system_clock::time_point replay_epoch =
      std::chrono::system_clock::now();
  WriteOptions woptions;
  ReadOptions roptions;
  Trace trace;
  uint64_t ops = 0;
  Iterator* single_iter = nullptr;
  while (s.ok()) {
    trace.reset();
    s = ReadTrace(&trace);
    if (!s.ok()) {
      break;
    }

    // Wait until this record's (scaled) point in time is reached.
    std::this_thread::sleep_until(
        replay_epoch +
        std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
    if (trace.type == kTraceWrite) {
      WriteBatch batch(trace.payload);
      // NOTE(review): the Write status is discarded; a failed replayed write
      // is silently dropped.
      db_->Write(woptions, &batch);
      ops++;
    } else if (trace.type == kTraceGet) {
      uint32_t cf_id = 0;
      Slice key;
      DecodeCFAndKey(trace.payload, &cf_id, &key);
      // cf_id 0 is the default column family; any other ID must be present
      // in the handle map supplied at construction.
      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }

      std::string value;
      if (cf_id == 0) {
        db_->Get(roptions, key, &value);
      } else {
        db_->Get(roptions, cf_map_[cf_id], key, &value);
      }
      ops++;
    } else if (trace.type == kTraceIteratorSeek) {
      uint32_t cf_id = 0;
      Slice key;
      DecodeCFAndKey(trace.payload, &cf_id, &key);
      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }

      // A fresh iterator is created and destroyed for each recorded seek.
      if (cf_id == 0) {
        single_iter = db_->NewIterator(roptions);
      } else {
        single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
      }
      single_iter->Seek(key);
      ops++;
      delete single_iter;
    } else if (trace.type == kTraceIteratorSeekForPrev) {
      // Currently, only support to call the Seek()
      uint32_t cf_id = 0;
      Slice key;
      DecodeCFAndKey(trace.payload, &cf_id, &key);
      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }

      if (cf_id == 0) {
        single_iter = db_->NewIterator(roptions);
      } else {
        single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
      }
      single_iter->SeekForPrev(key);
      ops++;
      delete single_iter;
    } else if (trace.type == kTraceEnd) {
      // Do nothing for now.
      // TODO: Add some validations later.
      break;
    }
  }

  if (s.IsIncomplete()) {
    // Reaching eof returns Incomplete status at the moment.
    // Could happen when killing a process without calling EndTrace() API.
    // TODO: Add better error handling.
    return Status::OK();
  }
  return s;
}
289 
// The trace can be replayed with multiple threads by configuring the number
// of threads in the thread pool. Trace records are read from the trace file
// sequentially and the corresponding queries are scheduled in the task
// queue based on the timestamp. Currently, we support Write_batch (Put,
// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
MultiThreadReplay(uint32_t threads_num)295 Status Replayer::MultiThreadReplay(uint32_t threads_num) {
296   Status s;
297   Trace header;
298   s = ReadHeader(&header);
299   if (!s.ok()) {
300     return s;
301   }
302 
303   ThreadPoolImpl thread_pool;
304   thread_pool.SetHostEnv(env_);
305 
306   if (threads_num > 1) {
307     thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
308   } else {
309     thread_pool.SetBackgroundThreads(1);
310   }
311 
312   std::chrono::system_clock::time_point replay_epoch =
313       std::chrono::system_clock::now();
314   WriteOptions woptions;
315   ReadOptions roptions;
316   uint64_t ops = 0;
317   while (s.ok()) {
318     std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
319     ra->db = db_;
320     s = ReadTrace(&(ra->trace_entry));
321     if (!s.ok()) {
322       break;
323     }
324     ra->woptions = woptions;
325     ra->roptions = roptions;
326 
327     std::this_thread::sleep_until(
328         replay_epoch + std::chrono::microseconds(
329                            (ra->trace_entry.ts - header.ts) / fast_forward_));
330     if (ra->trace_entry.type == kTraceWrite) {
331       thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
332                            nullptr);
333       ops++;
334     } else if (ra->trace_entry.type == kTraceGet) {
335       thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
336                            nullptr);
337       ops++;
338     } else if (ra->trace_entry.type == kTraceIteratorSeek) {
339       thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
340                            nullptr);
341       ops++;
342     } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
343       thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
344                            nullptr, nullptr);
345       ops++;
346     } else if (ra->trace_entry.type == kTraceEnd) {
347       // Do nothing for now.
348       // TODO: Add some validations later.
349       break;
350     } else {
351       // Other trace entry types that are not implemented for replay.
352       // To finish the replay, we continue the process.
353       continue;
354     }
355   }
356 
357   if (s.IsIncomplete()) {
358     // Reaching eof returns Incomplete status at the moment.
359     // Could happen when killing a process without calling EndTrace() API.
360     // TODO: Add better error handling.
361     s = Status::OK();
362   }
363   thread_pool.JoinAllThreads();
364   return s;
365 }
366 
ReadHeader(Trace * header)367 Status Replayer::ReadHeader(Trace* header) {
368   assert(header != nullptr);
369   Status s = ReadTrace(header);
370   if (!s.ok()) {
371     return s;
372   }
373   if (header->type != kTraceBegin) {
374     return Status::Corruption("Corrupted trace file. Incorrect header.");
375   }
376   if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
377     return Status::Corruption("Corrupted trace file. Incorrect magic.");
378   }
379 
380   return s;
381 }
382 
ReadFooter(Trace * footer)383 Status Replayer::ReadFooter(Trace* footer) {
384   assert(footer != nullptr);
385   Status s = ReadTrace(footer);
386   if (!s.ok()) {
387     return s;
388   }
389   if (footer->type != kTraceEnd) {
390     return Status::Corruption("Corrupted trace file. Incorrect footer.");
391   }
392 
393   // TODO: Add more validations later
394   return s;
395 }
396 
ReadTrace(Trace * trace)397 Status Replayer::ReadTrace(Trace* trace) {
398   assert(trace != nullptr);
399   std::string encoded_trace;
400   Status s = trace_reader_->Read(&encoded_trace);
401   if (!s.ok()) {
402     return s;
403   }
404   return TracerHelper::DecodeTrace(encoded_trace, trace);
405 }
406 
BGWorkGet(void * arg)407 void Replayer::BGWorkGet(void* arg) {
408   std::unique_ptr<ReplayerWorkerArg> ra(
409       reinterpret_cast<ReplayerWorkerArg*>(arg));
410   auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
411       ra->cf_map);
412   uint32_t cf_id = 0;
413   Slice key;
414   DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
415   if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
416     return;
417   }
418 
419   std::string value;
420   if (cf_id == 0) {
421     ra->db->Get(ra->roptions, key, &value);
422   } else {
423     ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value);
424   }
425 
426   return;
427 }
428 
BGWorkWriteBatch(void * arg)429 void Replayer::BGWorkWriteBatch(void* arg) {
430   std::unique_ptr<ReplayerWorkerArg> ra(
431       reinterpret_cast<ReplayerWorkerArg*>(arg));
432   WriteBatch batch(ra->trace_entry.payload);
433   ra->db->Write(ra->woptions, &batch);
434   return;
435 }
436 
BGWorkIterSeek(void * arg)437 void Replayer::BGWorkIterSeek(void* arg) {
438   std::unique_ptr<ReplayerWorkerArg> ra(
439       reinterpret_cast<ReplayerWorkerArg*>(arg));
440   auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
441       ra->cf_map);
442   uint32_t cf_id = 0;
443   Slice key;
444   DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
445   if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
446     return;
447   }
448 
449   std::string value;
450   Iterator* single_iter = nullptr;
451   if (cf_id == 0) {
452     single_iter = ra->db->NewIterator(ra->roptions);
453   } else {
454     single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
455   }
456   single_iter->Seek(key);
457   delete single_iter;
458   return;
459 }
460 
BGWorkIterSeekForPrev(void * arg)461 void Replayer::BGWorkIterSeekForPrev(void* arg) {
462   std::unique_ptr<ReplayerWorkerArg> ra(
463       reinterpret_cast<ReplayerWorkerArg*>(arg));
464   auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
465       ra->cf_map);
466   uint32_t cf_id = 0;
467   Slice key;
468   DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
469   if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
470     return;
471   }
472 
473   std::string value;
474   Iterator* single_iter = nullptr;
475   if (cf_id == 0) {
476     single_iter = ra->db->NewIterator(ra->roptions);
477   } else {
478     single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
479   }
480   single_iter->SeekForPrev(key);
481   delete single_iter;
482   return;
483 }
484 
485 }  // namespace ROCKSDB_NAMESPACE
486