1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #pragma once 7 #ifndef ROCKSDB_LITE 8 9 #include <atomic> 10 #include <functional> 11 #include <memory> 12 #include <mutex> 13 #include <unordered_map> 14 15 #include "rocksdb/db.h" 16 #include "rocksdb/env.h" 17 #include "rocksdb/status.h" 18 #include "rocksdb/trace_reader_writer.h" 19 #include "rocksdb/trace_record.h" 20 #include "rocksdb/trace_record_result.h" 21 #include "rocksdb/utilities/replayer.h" 22 #include "trace_replay/trace_replay.h" 23 24 namespace ROCKSDB_NAMESPACE { 25 26 class ReplayerImpl : public Replayer { 27 public: 28 ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles, 29 std::unique_ptr<TraceReader>&& reader); 30 ~ReplayerImpl() override; 31 32 using Replayer::Prepare; 33 Status Prepare() override; 34 35 using Replayer::Next; 36 Status Next(std::unique_ptr<TraceRecord>* record) override; 37 38 using Replayer::Execute; 39 Status Execute(const std::unique_ptr<TraceRecord>& record, 40 std::unique_ptr<TraceRecordResult>* result) override; 41 42 using Replayer::Replay; 43 Status Replay( 44 const ReplayOptions& options, 45 const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>& 46 result_callback) override; 47 48 using Replayer::GetHeaderTimestamp; 49 uint64_t GetHeaderTimestamp() const override; 50 51 private: 52 Status ReadHeader(Trace* header); 53 Status ReadTrace(Trace* trace); 54 55 // Generic function to execute a Trace in a thread pool. 56 static void BackgroundWork(void* arg); 57 58 std::unique_ptr<TraceReader> trace_reader_; 59 std::mutex mutex_; 60 std::atomic<bool> prepared_; 61 std::atomic<bool> trace_end_; 62 uint64_t header_ts_; 63 std::unique_ptr<TraceRecord::Handler> exec_handler_; 64 Env* env_; 65 // When reading the trace header, the trace file version can be parsed. 66 // Replayer will use different decode method to get the trace content based 67 // on different trace file version. 68 int trace_file_version_; 69 }; 70 71 // Arguments passed to BackgroundWork() for replaying in a thread pool. 72 struct ReplayerWorkerArg { 73 Trace trace_entry; 74 int trace_file_version; 75 // Handler to execute TraceRecord. 76 TraceRecord::Handler* handler; 77 // Callback function to report the error status and the timestamp of the 78 // TraceRecord (not the start/end timestamp of executing the TraceRecord). 79 std::function<void(Status, uint64_t)> error_cb; 80 // Callback function to report the trace execution status and operation 81 // execution status/result(s). 82 std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)> result_cb; 83 }; 84 85 } // namespace ROCKSDB_NAMESPACE 86 #endif // ROCKSDB_LITE 87