1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 #ifndef ROCKSDB_LITE
8 
9 #include <atomic>
10 #include <functional>
11 #include <memory>
12 #include <mutex>
13 #include <unordered_map>
14 
15 #include "rocksdb/db.h"
16 #include "rocksdb/env.h"
17 #include "rocksdb/status.h"
18 #include "rocksdb/trace_reader_writer.h"
19 #include "rocksdb/trace_record.h"
20 #include "rocksdb/trace_record_result.h"
21 #include "rocksdb/utilities/replayer.h"
22 #include "trace_replay/trace_replay.h"
23 
24 namespace ROCKSDB_NAMESPACE {
25 
26 class ReplayerImpl : public Replayer {
27  public:
28   ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
29                std::unique_ptr<TraceReader>&& reader);
30   ~ReplayerImpl() override;
31 
32   using Replayer::Prepare;
33   Status Prepare() override;
34 
35   using Replayer::Next;
36   Status Next(std::unique_ptr<TraceRecord>* record) override;
37 
38   using Replayer::Execute;
39   Status Execute(const std::unique_ptr<TraceRecord>& record,
40                  std::unique_ptr<TraceRecordResult>* result) override;
41 
42   using Replayer::Replay;
43   Status Replay(
44       const ReplayOptions& options,
45       const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
46           result_callback) override;
47 
48   using Replayer::GetHeaderTimestamp;
49   uint64_t GetHeaderTimestamp() const override;
50 
51  private:
52   Status ReadHeader(Trace* header);
53   Status ReadTrace(Trace* trace);
54 
55   // Generic function to execute a Trace in a thread pool.
56   static void BackgroundWork(void* arg);
57 
58   std::unique_ptr<TraceReader> trace_reader_;
59   std::mutex mutex_;
60   std::atomic<bool> prepared_;
61   std::atomic<bool> trace_end_;
62   uint64_t header_ts_;
63   std::unique_ptr<TraceRecord::Handler> exec_handler_;
64   Env* env_;
65   // When reading the trace header, the trace file version can be parsed.
66   // Replayer will use different decode method to get the trace content based
67   // on different trace file version.
68   int trace_file_version_;
69 };
70 
71 // Arguments passed to BackgroundWork() for replaying in a thread pool.
72 struct ReplayerWorkerArg {
73   Trace trace_entry;
74   int trace_file_version;
75   // Handler to execute TraceRecord.
76   TraceRecord::Handler* handler;
77   // Callback function to report the error status and the timestamp of the
78   // TraceRecord (not the start/end timestamp of executing the TraceRecord).
79   std::function<void(Status, uint64_t)> error_cb;
80   // Callback function to report the trace execution status and operation
81   // execution status/result(s).
82   std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)> result_cb;
83 };
84 
85 }  // namespace ROCKSDB_NAMESPACE
86 #endif  // ROCKSDB_LITE
87