1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 #ifndef ROCKSDB_LITE
8 
9 #include <list>
10 #include <map>
11 #include <queue>
12 #include <set>
13 #include <utility>
14 #include <vector>
15 
16 #include "rocksdb/env.h"
17 #include "rocksdb/trace_reader_writer.h"
18 #include "rocksdb/write_batch.h"
19 #include "trace_replay/trace_replay.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
23 class DBImpl;
24 class WriteBatch;
25 
// Operation kinds distinguished by the trace analyzer. The underlying type is
// fixed to int and every enumerator carries an explicit value, so the
// numbering does not shift silently when the list is edited.
enum TraceOperationType : int {
  kGet = 0,                  // Get
  kPut = 1,                  // Put
  kDelete = 2,               // Delete
  kSingleDelete = 3,         // SingleDelete
  kRangeDelete = 4,          // DeleteRange
  kMerge = 5,                // Merge
  kIteratorSeek = 6,         // iterator Seek
  kIteratorSeekForPrev = 7,  // iterator SeekForPrev
  // Number of operation kinds listed above; not an operation itself.
  // NOTE(review): presumably used to size per-type tables -- confirm.
  kTaTypeNum = 8
};
37 
// A single traced operation in flattened form. Plain aggregate: nothing is
// initialized by default, so callers must fill in every field they read.
struct TraceUnit {
  uint64_t ts;        // timestamp of the operation (unit not visible here)
  uint32_t type;      // operation kind; presumably a TraceOperationType value
  uint32_t cf_id;     // id of the column family the operation targets
  size_t value_size;  // size of the value associated with the operation
  std::string key;    // key the operation refers to
};
45 
// Accumulator for one correlation entry: an occurrence counter plus a running
// timestamp total. Plain aggregate with no default initialization.
struct TypeCorrelation {
  uint64_t count;     // number of occurrences recorded
  uint64_t total_ts;  // accumulated timestamp value for those occurrences
};
50 
51 struct StatsUnit {
52   uint64_t key_id;
53   uint64_t access_count;
54   uint64_t latest_ts;
55   uint64_t succ_count;  // current only used to count Get if key found
56   uint32_t cf_id;
57   size_t value_size;
58   std::vector<TypeCorrelation> v_correlation;
59 };
60 
// Options handed to TraceAnalyzer. The visible state describes which
// operation-type correlations are of interest.
class AnalyzerOptions {
 public:
  AnalyzerOptions();
  ~AnalyzerOptions();

  // Takes the textual correlation specification `in_str`.
  // NOTE(review): the definition is not in this header; presumably it parses
  // `in_str` and fills correlation_map / correlation_list -- confirm.
  void SparseCorrelationInput(const std::string& in_str);

  // Two-dimensional table of ints.
  // NOTE(review): presumably indexed by a pair of operation types -- confirm.
  std::vector<std::vector<int>> correlation_map;
  // Flat list of int pairs.
  // NOTE(review): presumably the (type, type) pairs to correlate -- confirm.
  std::vector<std::pair<int, int>> correlation_list;
};
72 
// Note that, for the variable names in the trace_analyzer:
// Starting with 'a_' means the variable is used for 'accessed_keys'.
// Starting with 'w_' means it is used for 'the whole key space'.
// Ending with '_f' means a file writer or reader pointer.
// For example, 'a_count' means 'accessed_keys_count',
// 'w_key_f' means 'whole_key_space_file'.
79 
80 struct TraceStats {
81   uint32_t cf_id;
82   std::string cf_name;
83   uint64_t a_count;
84   uint64_t a_succ_count;
85   uint64_t a_key_id;
86   uint64_t a_key_size_sqsum;
87   uint64_t a_key_size_sum;
88   uint64_t a_key_mid;
89   uint64_t a_value_size_sqsum;
90   uint64_t a_value_size_sum;
91   uint64_t a_value_mid;
92   uint32_t a_peak_qps;
93   double a_ave_qps;
94   std::map<std::string, StatsUnit> a_key_stats;
95   std::map<uint64_t, uint64_t> a_count_stats;
96   std::map<uint64_t, uint64_t> a_key_size_stats;
97   std::map<uint64_t, uint64_t> a_value_size_stats;
98   std::map<uint32_t, uint32_t> a_qps_stats;
99   std::map<uint32_t, std::map<std::string, uint32_t>> a_qps_prefix_stats;
100   std::priority_queue<std::pair<uint64_t, std::string>,
101                       std::vector<std::pair<uint64_t, std::string>>,
102                       std::greater<std::pair<uint64_t, std::string>>>
103       top_k_queue;
104   std::priority_queue<std::pair<uint64_t, std::string>,
105                       std::vector<std::pair<uint64_t, std::string>>,
106                       std::greater<std::pair<uint64_t, std::string>>>
107       top_k_prefix_access;
108   std::priority_queue<std::pair<double, std::string>,
109                       std::vector<std::pair<double, std::string>>,
110                       std::greater<std::pair<double, std::string>>>
111       top_k_prefix_ave;
112   std::priority_queue<std::pair<uint32_t, uint32_t>,
113                       std::vector<std::pair<uint32_t, uint32_t>>,
114                       std::greater<std::pair<uint32_t, uint32_t>>>
115       top_k_qps_sec;
116   std::list<TraceUnit> time_series;
117   std::vector<std::pair<uint64_t, uint64_t>> correlation_output;
118   std::map<uint32_t, uint64_t> uni_key_num;
119 
120   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> time_series_f;
121   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_f;
122   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_count_dist_f;
123   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_prefix_cut_f;
124   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_value_size_f;
125   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_size_f;
126   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_num_f;
127   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_qps_f;
128   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_top_qps_prefix_f;
129   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> w_key_f;
130   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> w_prefix_cut_f;
131 
132   TraceStats();
133   ~TraceStats();
134   TraceStats(const TraceStats&) = delete;
135   TraceStats& operator=(const TraceStats&) = delete;
136   TraceStats(TraceStats&&) = default;
137   TraceStats& operator=(TraceStats&&) = default;
138 };
139 
140 struct TypeUnit {
141   std::string type_name;
142   bool enabled;
143   uint64_t total_keys;
144   uint64_t total_access;
145   uint64_t total_succ_access;
146   uint32_t sample_count;
147   std::map<uint32_t, TraceStats> stats;
148   TypeUnit() = default;
149   ~TypeUnit() = default;
150   TypeUnit(const TypeUnit&) = delete;
151   TypeUnit& operator=(const TypeUnit&) = delete;
152   TypeUnit(TypeUnit&&) = default;
153   TypeUnit& operator=(TypeUnit&&) = default;
154 };
155 
// Bookkeeping for one column family that appears in the trace. Plain
// aggregate with no default initialization of the scalar fields.
struct CfUnit {
  uint32_t cf_id;    // id of the column family
  uint64_t w_count;  // total keys in this cf when the whole key space is used
  uint64_t a_count;  // total keys in this cf that are accessed
  // Key-size statistics of this cf over the whole key space.
  std::map<uint64_t, uint64_t> w_key_size_stats;
  // QPS table of this cf.
  // NOTE(review): presumably maps a second offset to a request count --
  // confirm against the implementation.
  std::map<uint32_t, uint32_t> cf_qps;
};
164 
165 class TraceAnalyzer {
166  public:
167   TraceAnalyzer(std::string& trace_path, std::string& output_path,
168                 AnalyzerOptions _analyzer_opts);
169   ~TraceAnalyzer();
170 
171   Status PrepareProcessing();
172 
173   Status StartProcessing();
174 
175   Status MakeStatistics();
176 
177   Status ReProcessing();
178 
179   Status EndProcessing();
180 
181   Status WriteTraceUnit(TraceUnit& unit);
182 
183   // The trace  processing functions for different type
184   Status HandleGet(uint32_t column_family_id, const std::string& key,
185                    const uint64_t& ts, const uint32_t& get_ret);
186   Status HandlePut(uint32_t column_family_id, const Slice& key,
187                    const Slice& value);
188   Status HandleDelete(uint32_t column_family_id, const Slice& key);
189   Status HandleSingleDelete(uint32_t column_family_id, const Slice& key);
190   Status HandleDeleteRange(uint32_t column_family_id, const Slice& begin_key,
191                            const Slice& end_key);
192   Status HandleMerge(uint32_t column_family_id, const Slice& key,
193                      const Slice& value);
194   Status HandleIter(uint32_t column_family_id, const std::string& key,
195                     const uint64_t& ts, TraceType& trace_type);
GetTaVector()196   std::vector<TypeUnit>& GetTaVector() { return ta_; }
197 
198  private:
199   ROCKSDB_NAMESPACE::Env* env_;
200   EnvOptions env_options_;
201   std::unique_ptr<TraceReader> trace_reader_;
202   size_t offset_;
203   char buffer_[1024];
204   uint64_t c_time_;
205   std::string trace_name_;
206   std::string output_path_;
207   AnalyzerOptions analyzer_opts_;
208   uint64_t total_requests_;
209   uint64_t total_access_keys_;
210   uint64_t total_gets_;
211   uint64_t total_writes_;
212   uint64_t trace_create_time_;
213   uint64_t begin_time_;
214   uint64_t end_time_;
215   uint64_t time_series_start_;
216   uint32_t sample_max_;
217   uint32_t cur_time_sec_;
218   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>
219       trace_sequence_f_;                                    // readable trace
220   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> qps_f_;  // overall qps
221   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>
222       cf_qps_f_;              // The qps of each CF>
223   std::vector<TypeUnit> ta_;  // The main statistic collecting data structure
224   std::map<uint32_t, CfUnit> cfs_;  // All the cf_id appears in this trace;
225   std::vector<uint32_t> qps_peak_;
226   std::vector<double> qps_ave_;
227 
228   Status ReadTraceHeader(Trace* header);
229   Status ReadTraceFooter(Trace* footer);
230   Status ReadTraceRecord(Trace* trace);
231   Status KeyStatsInsertion(const uint32_t& type, const uint32_t& cf_id,
232                            const std::string& key, const size_t value_size,
233                            const uint64_t ts);
234   Status StatsUnitCorrelationUpdate(StatsUnit& unit, const uint32_t& type,
235                                     const uint64_t& ts, const std::string& key);
236   Status OpenStatsOutputFiles(const std::string& type, TraceStats& new_stats);
237   Status CreateOutputFile(
238       const std::string& type, const std::string& cf_name,
239       const std::string& ending,
240       std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>* f_ptr);
241   void CloseOutputFiles();
242 
243   void PrintStatistics();
244   Status TraceUnitWriter(
245       std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>& f_ptr, TraceUnit& unit);
246   Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id,
247                             const std::string& key, const size_t value_size,
248                             const uint64_t ts);
249   Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
250   Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
251   Status MakeStatisticQPS();
252 };
253 
254 // write bach handler to be used for WriteBache iterator
255 // when processing the write trace
256 class TraceWriteHandler : public WriteBatch::Handler {
257  public:
TraceWriteHandler()258   TraceWriteHandler() { ta_ptr = nullptr; }
TraceWriteHandler(TraceAnalyzer * _ta_ptr)259   explicit TraceWriteHandler(TraceAnalyzer* _ta_ptr) { ta_ptr = _ta_ptr; }
~TraceWriteHandler()260   ~TraceWriteHandler() {}
261 
PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)262   virtual Status PutCF(uint32_t column_family_id, const Slice& key,
263                        const Slice& value) override {
264     return ta_ptr->HandlePut(column_family_id, key, value);
265   }
DeleteCF(uint32_t column_family_id,const Slice & key)266   virtual Status DeleteCF(uint32_t column_family_id,
267                           const Slice& key) override {
268     return ta_ptr->HandleDelete(column_family_id, key);
269   }
SingleDeleteCF(uint32_t column_family_id,const Slice & key)270   virtual Status SingleDeleteCF(uint32_t column_family_id,
271                                 const Slice& key) override {
272     return ta_ptr->HandleSingleDelete(column_family_id, key);
273   }
DeleteRangeCF(uint32_t column_family_id,const Slice & begin_key,const Slice & end_key)274   virtual Status DeleteRangeCF(uint32_t column_family_id,
275                                const Slice& begin_key,
276                                const Slice& end_key) override {
277     return ta_ptr->HandleDeleteRange(column_family_id, begin_key, end_key);
278   }
MergeCF(uint32_t column_family_id,const Slice & key,const Slice & value)279   virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
280                          const Slice& value) override {
281     return ta_ptr->HandleMerge(column_family_id, key, value);
282   }
283 
284  private:
285   TraceAnalyzer* ta_ptr;
286 };
287 
// Entry point of the trace analyzer tool. Takes the command-line argument
// count and argument vector and returns an int.
// NOTE(review): presumably the return value is the process exit code --
// confirm against the definition, which is not in this header.
int trace_analyzer_tool(int argc, char* argv[]);
289 
290 }  // namespace ROCKSDB_NAMESPACE
291 
292 #endif  // ROCKSDB_LITE
293