1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #pragma once 7 #ifndef ROCKSDB_LITE 8 9 #include <list> 10 #include <map> 11 #include <queue> 12 #include <set> 13 #include <utility> 14 #include <vector> 15 16 #include "rocksdb/env.h" 17 #include "rocksdb/trace_reader_writer.h" 18 #include "rocksdb/write_batch.h" 19 #include "trace_replay/trace_replay.h" 20 21 namespace rocksdb { 22 23 class DBImpl; 24 class WriteBatch; 25 26 enum TraceOperationType : int { 27 kGet = 0, 28 kPut = 1, 29 kDelete = 2, 30 kSingleDelete = 3, 31 kRangeDelete = 4, 32 kMerge = 5, 33 kIteratorSeek = 6, 34 kIteratorSeekForPrev = 7, 35 kTaTypeNum = 8 36 }; 37 38 struct TraceUnit { 39 uint64_t ts; 40 uint32_t type; 41 uint32_t cf_id; 42 size_t value_size; 43 std::string key; 44 }; 45 46 struct TypeCorrelation { 47 uint64_t count; 48 uint64_t total_ts; 49 }; 50 51 struct StatsUnit { 52 uint64_t key_id; 53 uint64_t access_count; 54 uint64_t latest_ts; 55 uint64_t succ_count; // current only used to count Get if key found 56 uint32_t cf_id; 57 size_t value_size; 58 std::vector<TypeCorrelation> v_correlation; 59 }; 60 61 class AnalyzerOptions { 62 public: 63 std::vector<std::vector<int>> correlation_map; 64 std::vector<std::pair<int, int>> correlation_list; 65 66 AnalyzerOptions(); 67 68 ~AnalyzerOptions(); 69 70 void SparseCorrelationInput(const std::string& in_str); 71 }; 72 73 // Note that, for the variable names in the trace_analyzer, 74 // Starting with 'a_' means the variable is used for 'accessed_keys'. 75 // Starting with 'w_' means it is used for 'the whole key space'. 76 // Ending with '_f' means a file write or reader pointer. 77 // For example, 'a_count' means 'accessed_keys_count', 78 // 'w_key_f' means 'whole_key_space_file'. 79 80 struct TraceStats { 81 uint32_t cf_id; 82 std::string cf_name; 83 uint64_t a_count; 84 uint64_t a_succ_count; 85 uint64_t a_key_id; 86 uint64_t a_key_size_sqsum; 87 uint64_t a_key_size_sum; 88 uint64_t a_key_mid; 89 uint64_t a_value_size_sqsum; 90 uint64_t a_value_size_sum; 91 uint64_t a_value_mid; 92 uint32_t a_peak_qps; 93 double a_ave_qps; 94 std::map<std::string, StatsUnit> a_key_stats; 95 std::map<uint64_t, uint64_t> a_count_stats; 96 std::map<uint64_t, uint64_t> a_key_size_stats; 97 std::map<uint64_t, uint64_t> a_value_size_stats; 98 std::map<uint32_t, uint32_t> a_qps_stats; 99 std::map<uint32_t, std::map<std::string, uint32_t>> a_qps_prefix_stats; 100 std::priority_queue<std::pair<uint64_t, std::string>, 101 std::vector<std::pair<uint64_t, std::string>>, 102 std::greater<std::pair<uint64_t, std::string>>> 103 top_k_queue; 104 std::priority_queue<std::pair<uint64_t, std::string>, 105 std::vector<std::pair<uint64_t, std::string>>, 106 std::greater<std::pair<uint64_t, std::string>>> 107 top_k_prefix_access; 108 std::priority_queue<std::pair<double, std::string>, 109 std::vector<std::pair<double, std::string>>, 110 std::greater<std::pair<double, std::string>>> 111 top_k_prefix_ave; 112 std::priority_queue<std::pair<uint32_t, uint32_t>, 113 std::vector<std::pair<uint32_t, uint32_t>>, 114 std::greater<std::pair<uint32_t, uint32_t>>> 115 top_k_qps_sec; 116 std::list<TraceUnit> time_series; 117 std::vector<std::pair<uint64_t, uint64_t>> correlation_output; 118 std::map<uint32_t, uint64_t> uni_key_num; 119 120 std::unique_ptr<rocksdb::WritableFile> time_series_f; 121 std::unique_ptr<rocksdb::WritableFile> a_key_f; 122 std::unique_ptr<rocksdb::WritableFile> a_count_dist_f; 123 std::unique_ptr<rocksdb::WritableFile> a_prefix_cut_f; 124 std::unique_ptr<rocksdb::WritableFile> a_value_size_f; 125 std::unique_ptr<rocksdb::WritableFile> a_key_size_f; 126 std::unique_ptr<rocksdb::WritableFile> a_key_num_f; 127 std::unique_ptr<rocksdb::WritableFile> a_qps_f; 128 std::unique_ptr<rocksdb::WritableFile> a_top_qps_prefix_f; 129 std::unique_ptr<rocksdb::WritableFile> w_key_f; 130 std::unique_ptr<rocksdb::WritableFile> w_prefix_cut_f; 131 132 TraceStats(); 133 ~TraceStats(); 134 TraceStats(const TraceStats&) = delete; 135 TraceStats& operator=(const TraceStats&) = delete; 136 TraceStats(TraceStats&&) = default; 137 TraceStats& operator=(TraceStats&&) = default; 138 }; 139 140 struct TypeUnit { 141 std::string type_name; 142 bool enabled; 143 uint64_t total_keys; 144 uint64_t total_access; 145 uint64_t total_succ_access; 146 uint32_t sample_count; 147 std::map<uint32_t, TraceStats> stats; 148 TypeUnit() = default; 149 ~TypeUnit() = default; 150 TypeUnit(const TypeUnit&) = delete; 151 TypeUnit& operator=(const TypeUnit&) = delete; 152 TypeUnit(TypeUnit&&) = default; 153 TypeUnit& operator=(TypeUnit&&) = default; 154 }; 155 156 struct CfUnit { 157 uint32_t cf_id; 158 uint64_t w_count; // total keys in this cf if we use the whole key space 159 uint64_t a_count; // the total keys in this cf that are accessed 160 std::map<uint64_t, uint64_t> w_key_size_stats; // whole key space key size 161 // statistic this cf 162 std::map<uint32_t, uint32_t> cf_qps; 163 }; 164 165 class TraceAnalyzer { 166 public: 167 TraceAnalyzer(std::string& trace_path, std::string& output_path, 168 AnalyzerOptions _analyzer_opts); 169 ~TraceAnalyzer(); 170 171 Status PrepareProcessing(); 172 173 Status StartProcessing(); 174 175 Status MakeStatistics(); 176 177 Status ReProcessing(); 178 179 Status EndProcessing(); 180 181 Status WriteTraceUnit(TraceUnit& unit); 182 183 // The trace processing functions for different type 184 Status HandleGet(uint32_t column_family_id, const std::string& key, 185 const uint64_t& ts, const uint32_t& get_ret); 186 Status HandlePut(uint32_t column_family_id, const Slice& key, 187 const Slice& value); 188 Status HandleDelete(uint32_t column_family_id, const Slice& key); 189 Status HandleSingleDelete(uint32_t column_family_id, const Slice& key); 190 Status HandleDeleteRange(uint32_t column_family_id, const Slice& begin_key, 191 const Slice& end_key); 192 Status HandleMerge(uint32_t column_family_id, const Slice& key, 193 const Slice& value); 194 Status HandleIter(uint32_t column_family_id, const std::string& key, 195 const uint64_t& ts, TraceType& trace_type); GetTaVector()196 std::vector<TypeUnit>& GetTaVector() { return ta_; } 197 198 private: 199 rocksdb::Env* env_; 200 EnvOptions env_options_; 201 std::unique_ptr<TraceReader> trace_reader_; 202 size_t offset_; 203 char buffer_[1024]; 204 uint64_t c_time_; 205 std::string trace_name_; 206 std::string output_path_; 207 AnalyzerOptions analyzer_opts_; 208 uint64_t total_requests_; 209 uint64_t total_access_keys_; 210 uint64_t total_gets_; 211 uint64_t total_writes_; 212 uint64_t trace_create_time_; 213 uint64_t begin_time_; 214 uint64_t end_time_; 215 uint64_t time_series_start_; 216 uint32_t sample_max_; 217 uint32_t cur_time_sec_; 218 std::unique_ptr<rocksdb::WritableFile> trace_sequence_f_; // readable trace 219 std::unique_ptr<rocksdb::WritableFile> qps_f_; // overall qps 220 std::unique_ptr<rocksdb::WritableFile> cf_qps_f_; // The qps of each CF> 221 std::vector<TypeUnit> ta_; // The main statistic collecting data structure 222 std::map<uint32_t, CfUnit> cfs_; // All the cf_id appears in this trace; 223 std::vector<uint32_t> qps_peak_; 224 std::vector<double> qps_ave_; 225 226 Status ReadTraceHeader(Trace* header); 227 Status ReadTraceFooter(Trace* footer); 228 Status ReadTraceRecord(Trace* trace); 229 Status KeyStatsInsertion(const uint32_t& type, const uint32_t& cf_id, 230 const std::string& key, const size_t value_size, 231 const uint64_t ts); 232 Status StatsUnitCorrelationUpdate(StatsUnit& unit, const uint32_t& type, 233 const uint64_t& ts, const std::string& key); 234 Status OpenStatsOutputFiles(const std::string& type, TraceStats& new_stats); 235 Status CreateOutputFile(const std::string& type, const std::string& cf_name, 236 const std::string& ending, 237 std::unique_ptr<rocksdb::WritableFile>* f_ptr); 238 void CloseOutputFiles(); 239 240 void PrintStatistics(); 241 Status TraceUnitWriter(std::unique_ptr<rocksdb::WritableFile>& f_ptr, 242 TraceUnit& unit); 243 Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id, 244 const std::string& key, const size_t value_size, 245 const uint64_t ts); 246 Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats); 247 Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit); 248 Status MakeStatisticQPS(); 249 }; 250 251 // write bach handler to be used for WriteBache iterator 252 // when processing the write trace 253 class TraceWriteHandler : public WriteBatch::Handler { 254 public: TraceWriteHandler()255 TraceWriteHandler() { ta_ptr = nullptr; } TraceWriteHandler(TraceAnalyzer * _ta_ptr)256 explicit TraceWriteHandler(TraceAnalyzer* _ta_ptr) { ta_ptr = _ta_ptr; } ~TraceWriteHandler()257 ~TraceWriteHandler() {} 258 PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)259 virtual Status PutCF(uint32_t column_family_id, const Slice& key, 260 const Slice& value) override { 261 return ta_ptr->HandlePut(column_family_id, key, value); 262 } DeleteCF(uint32_t column_family_id,const Slice & key)263 virtual Status DeleteCF(uint32_t column_family_id, 264 const Slice& key) override { 265 return ta_ptr->HandleDelete(column_family_id, key); 266 } SingleDeleteCF(uint32_t column_family_id,const Slice & key)267 virtual Status SingleDeleteCF(uint32_t column_family_id, 268 const Slice& key) override { 269 return ta_ptr->HandleSingleDelete(column_family_id, key); 270 } DeleteRangeCF(uint32_t column_family_id,const Slice & begin_key,const Slice & end_key)271 virtual Status DeleteRangeCF(uint32_t column_family_id, 272 const Slice& begin_key, 273 const Slice& end_key) override { 274 return ta_ptr->HandleDeleteRange(column_family_id, begin_key, end_key); 275 } MergeCF(uint32_t column_family_id,const Slice & key,const Slice & value)276 virtual Status MergeCF(uint32_t column_family_id, const Slice& key, 277 const Slice& value) override { 278 return ta_ptr->HandleMerge(column_family_id, key, value); 279 } 280 281 private: 282 TraceAnalyzer* ta_ptr; 283 }; 284 285 int trace_analyzer_tool(int argc, char** argv); 286 287 } // namespace rocksdb 288 289 #endif // ROCKSDB_LITE 290