1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/trace/replayer_impl.h"
9 
10 #include <cmath>
11 #include <thread>
12 
13 #include "rocksdb/options.h"
14 #include "rocksdb/slice.h"
15 #include "rocksdb/system_clock.h"
16 #include "util/threadpool_imp.h"
17 
18 namespace ROCKSDB_NAMESPACE {
19 
ReplayerImpl(DB * db,const std::vector<ColumnFamilyHandle * > & handles,std::unique_ptr<TraceReader> && reader)20 ReplayerImpl::ReplayerImpl(DB* db,
21                            const std::vector<ColumnFamilyHandle*>& handles,
22                            std::unique_ptr<TraceReader>&& reader)
23     : Replayer(),
24       trace_reader_(std::move(reader)),
25       prepared_(false),
26       trace_end_(false),
27       header_ts_(0),
28       exec_handler_(TraceRecord::NewExecutionHandler(db, handles)),
29       env_(db->GetEnv()),
30       trace_file_version_(-1) {}
31 
~ReplayerImpl()32 ReplayerImpl::~ReplayerImpl() {
33   exec_handler_.reset();
34   trace_reader_.reset();
35 }
36 
Prepare()37 Status ReplayerImpl::Prepare() {
38   Trace header;
39   int db_version;
40   Status s = ReadHeader(&header);
41   if (!s.ok()) {
42     return s;
43   }
44   s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
45   if (!s.ok()) {
46     return s;
47   }
48   header_ts_ = header.ts;
49   prepared_ = true;
50   trace_end_ = false;
51   return Status::OK();
52 }
53 
Next(std::unique_ptr<TraceRecord> * record)54 Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
55   if (!prepared_) {
56     return Status::Incomplete("Not prepared!");
57   }
58   if (trace_end_) {
59     return Status::Incomplete("Trace end.");
60   }
61 
62   Trace trace;
63   Status s = ReadTrace(&trace);  // ReadTrace is atomic
64   // Reached the trace end.
65   if (s.ok() && trace.type == kTraceEnd) {
66     trace_end_ = true;
67     return Status::Incomplete("Trace end.");
68   }
69   if (!s.ok() || record == nullptr) {
70     return s;
71   }
72 
73   return TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, record);
74 }
75 
Execute(const std::unique_ptr<TraceRecord> & record,std::unique_ptr<TraceRecordResult> * result)76 Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record,
77                              std::unique_ptr<TraceRecordResult>* result) {
78   return record->Accept(exec_handler_.get(), result);
79 }
80 
Replay(const ReplayOptions & options,const std::function<void (Status,std::unique_ptr<TraceRecordResult> &&)> & result_callback)81 Status ReplayerImpl::Replay(
82     const ReplayOptions& options,
83     const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
84         result_callback) {
85   if (options.fast_forward <= 0.0) {
86     return Status::InvalidArgument("Wrong fast forward speed!");
87   }
88 
89   if (!prepared_) {
90     return Status::Incomplete("Not prepared!");
91   }
92   if (trace_end_) {
93     return Status::Incomplete("Trace end.");
94   }
95 
96   Status s = Status::OK();
97 
98   if (options.num_threads <= 1) {
99     // num_threads == 0 or num_threads == 1 uses single thread.
100     std::chrono::system_clock::time_point replay_epoch =
101         std::chrono::system_clock::now();
102 
103     while (s.ok()) {
104       Trace trace;
105       s = ReadTrace(&trace);
106       // If already at trace end, ReadTrace should return Status::Incomplete().
107       if (!s.ok()) {
108         break;
109       }
110 
111       // No need to sleep before breaking the loop if at the trace end.
112       if (trace.type == kTraceEnd) {
113         trace_end_ = true;
114         s = Status::Incomplete("Trace end.");
115         break;
116       }
117 
118       // In single-threaded replay, decode first then sleep.
119       std::unique_ptr<TraceRecord> record;
120       s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, &record);
121       if (!s.ok() && !s.IsNotSupported()) {
122         break;
123       }
124 
125       std::chrono::system_clock::time_point sleep_to =
126           replay_epoch +
127           std::chrono::microseconds(static_cast<uint64_t>(std::llround(
128               1.0 * (trace.ts - header_ts_) / options.fast_forward)));
129       if (sleep_to > std::chrono::system_clock::now()) {
130         std::this_thread::sleep_until(sleep_to);
131       }
132 
133       // Skip unsupported traces, stop for other errors.
134       if (s.IsNotSupported()) {
135         if (result_callback != nullptr) {
136           result_callback(s, nullptr);
137         }
138         s = Status::OK();
139         continue;
140       }
141 
142       if (result_callback == nullptr) {
143         s = Execute(record, nullptr);
144       } else {
145         std::unique_ptr<TraceRecordResult> res;
146         s = Execute(record, &res);
147         result_callback(s, std::move(res));
148       }
149     }
150   } else {
151     // Multi-threaded replay.
152     ThreadPoolImpl thread_pool;
153     thread_pool.SetHostEnv(env_);
154     thread_pool.SetBackgroundThreads(static_cast<int>(options.num_threads));
155 
156     std::mutex mtx;
157     // Background decoding and execution status.
158     Status bg_s = Status::OK();
159     uint64_t last_err_ts = static_cast<uint64_t>(-1);
160     // Callback function used in background work to update bg_s for the ealiest
161     // TraceRecord which has execution error. This is different from the
162     // timestamp of the first execution error (either start or end timestamp).
163     //
164     // Suppose TraceRecord R1, R2, with timestamps T1 < T2. Their execution
165     // timestamps are T1_start, T1_end, T2_start, T2_end.
166     // Single-thread: there must be T1_start < T1_end < T2_start < T2_end.
167     // Multi-thread: T1_start < T2_start may not be enforced. Orders of them are
168     // totally unknown.
169     // In order to report the same `first` error in both single-thread and
170     // multi-thread replay, we can only rely on the TraceRecords' timestamps,
171     // rather than their executin timestamps. Although in single-thread replay,
172     // the first error is also the last error, while in multi-thread replay, the
173     // first error may not be the first error in execution, and it may not be
174     // the last error in exeution as well.
175     auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) {
176       std::lock_guard<std::mutex> gd(mtx);
177       // Only record the first error.
178       if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) {
179         bg_s = err;
180         last_err_ts = err_ts;
181       }
182     };
183 
184     std::chrono::system_clock::time_point replay_epoch =
185         std::chrono::system_clock::now();
186 
187     while (bg_s.ok() && s.ok()) {
188       Trace trace;
189       s = ReadTrace(&trace);
190       // If already at trace end, ReadTrace should return Status::Incomplete().
191       if (!s.ok()) {
192         break;
193       }
194 
195       TraceType trace_type = trace.type;
196 
197       // No need to sleep before breaking the loop if at the trace end.
198       if (trace_type == kTraceEnd) {
199         trace_end_ = true;
200         s = Status::Incomplete("Trace end.");
201         break;
202       }
203 
204       // In multi-threaded replay, sleep first then start decoding and
205       // execution in a thread.
206       std::chrono::system_clock::time_point sleep_to =
207           replay_epoch +
208           std::chrono::microseconds(static_cast<uint64_t>(std::llround(
209               1.0 * (trace.ts - header_ts_) / options.fast_forward)));
210       if (sleep_to > std::chrono::system_clock::now()) {
211         std::this_thread::sleep_until(sleep_to);
212       }
213 
214       if (trace_type == kTraceWrite || trace_type == kTraceGet ||
215           trace_type == kTraceIteratorSeek ||
216           trace_type == kTraceIteratorSeekForPrev ||
217           trace_type == kTraceMultiGet) {
218         std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
219         ra->trace_entry = std::move(trace);
220         ra->handler = exec_handler_.get();
221         ra->trace_file_version = trace_file_version_;
222         ra->error_cb = error_cb;
223         ra->result_cb = result_callback;
224         thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
225                              nullptr, nullptr);
226       } else {
227         // Skip unsupported traces.
228         if (result_callback != nullptr) {
229           result_callback(Status::NotSupported("Unsupported trace type."),
230                           nullptr);
231         }
232       }
233     }
234 
235     thread_pool.WaitForJobsAndJoinAllThreads();
236     if (!bg_s.ok()) {
237       s = bg_s;
238     }
239   }
240 
241   if (s.IsIncomplete()) {
242     // Reaching eof returns Incomplete status at the moment.
243     // Could happen when killing a process without calling EndTrace() API.
244     // TODO: Add better error handling.
245     trace_end_ = true;
246     return Status::OK();
247   }
248   return s;
249 }
250 
GetHeaderTimestamp() const251 uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; }
252 
ReadHeader(Trace * header)253 Status ReplayerImpl::ReadHeader(Trace* header) {
254   assert(header != nullptr);
255   Status s = trace_reader_->Reset();
256   if (!s.ok()) {
257     return s;
258   }
259   std::string encoded_trace;
260   // Read the trace head
261   s = trace_reader_->Read(&encoded_trace);
262   if (!s.ok()) {
263     return s;
264   }
265 
266   return TracerHelper::DecodeHeader(encoded_trace, header);
267 }
268 
ReadTrace(Trace * trace)269 Status ReplayerImpl::ReadTrace(Trace* trace) {
270   assert(trace != nullptr);
271   std::string encoded_trace;
272   // We don't know if TraceReader is implemented thread-safe, so we protect the
273   // reading trace part with a mutex. The decoding part does not need to be
274   // protected since it's local.
275   {
276     std::lock_guard<std::mutex> guard(mutex_);
277     Status s = trace_reader_->Read(&encoded_trace);
278     if (!s.ok()) {
279       return s;
280     }
281   }
282   return TracerHelper::DecodeTrace(encoded_trace, trace);
283 }
284 
BackgroundWork(void * arg)285 void ReplayerImpl::BackgroundWork(void* arg) {
286   std::unique_ptr<ReplayerWorkerArg> ra(
287       reinterpret_cast<ReplayerWorkerArg*>(arg));
288   assert(ra != nullptr);
289 
290   std::unique_ptr<TraceRecord> record;
291   Status s = TracerHelper::DecodeTraceRecord(&(ra->trace_entry),
292                                              ra->trace_file_version, &record);
293   if (!s.ok()) {
294     // Stop the replay
295     if (ra->error_cb != nullptr) {
296       ra->error_cb(s, ra->trace_entry.ts);
297     }
298     // Report the result
299     if (ra->result_cb != nullptr) {
300       ra->result_cb(s, nullptr);
301     }
302     return;
303   }
304 
305   if (ra->result_cb == nullptr) {
306     s = record->Accept(ra->handler, nullptr);
307   } else {
308     std::unique_ptr<TraceRecordResult> res;
309     s = record->Accept(ra->handler, &res);
310     ra->result_cb(s, std::move(res));
311   }
312   record.reset();
313 }
314 
315 }  // namespace ROCKSDB_NAMESPACE
316 #endif  // ROCKSDB_LITE
317