1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 #ifndef ROCKSDB_LITE
8 
9 #include <list>
10 #include <map>
11 #include <queue>
12 #include <set>
13 #include <utility>
14 #include <vector>
15 
16 #include "rocksdb/env.h"
17 #include "rocksdb/trace_reader_writer.h"
18 #include "rocksdb/trace_record.h"
19 #include "rocksdb/write_batch.h"
20 #include "trace_replay/trace_replay.h"
21 
22 namespace ROCKSDB_NAMESPACE {
23 
// Stand-in used in place of a value size of 0. Value sizes may serve as
// denominators, so substituting this positive integer avoids a division
// error. Only the declaration lives in this header; the inline note records
// the value given by the original annotation.
extern const size_t kShadowValueSize /* = 10 */;
27 
// Operation categories distinguished by the trace analyzer. The values are
// consecutive and start at 0; kTaTypeNum equals the number of categories
// listed before it.
// NOTE(review): the per-entry notes name the TraceAnalyzer callback that
// appears to correspond to each category; the actual mapping is implemented
// outside this header -- confirm there.
enum TraceOperationType : int {
  kGet = 0,                 // cf. Handle(GetQueryTraceRecord)
  kPut = 1,                 // cf. PutCF
  kDelete = 2,              // cf. DeleteCF
  kSingleDelete = 3,        // cf. SingleDeleteCF
  kRangeDelete = 4,         // cf. DeleteRangeCF
  kMerge = 5,               // cf. MergeCF
  kIteratorSeek = 6,        // cf. Handle(IteratorSeekQueryTraceRecord)
  kIteratorSeekForPrev = 7, // cf. Handle(IteratorSeekQueryTraceRecord)
  kMultiGet = 8,            // cf. Handle(MultiGetQueryTraceRecord)
  kTaTypeNum = 9            // number of categories above, not an operation
};
40 
// One traced operation in plain-data form. Instances are queued in
// TraceStats::time_series and handed to TraceAnalyzer::WriteTraceUnit().
// Member order is part of the layout / aggregate-initialization contract.
struct TraceUnit {
  uint64_t ts;        // timestamp (NOTE(review): unit is not visible here)
  uint32_t type;      // presumably a TraceOperationType value -- confirm
  uint32_t cf_id;     // column family id
  size_t value_size;  // size of the value attached to the operation
  std::string key;    // key the operation touched
};
48 
// Pair of accumulators stored per entry of StatsUnit::v_correlation.
// NOTE(review): presumably `count` is the number of observed occurrences and
// `total_ts` the summed time between them -- confirm against the
// implementation file.
struct TypeCorrelation {
  uint64_t count;
  uint64_t total_ts;
};
53 
54 struct StatsUnit {
55   uint64_t key_id;
56   uint64_t access_count;
57   uint64_t latest_ts;
58   uint64_t succ_count;  // current only used to count Get if key found
59   uint32_t cf_id;
60   size_t value_size;
61   std::vector<TypeCorrelation> v_correlation;
62 };
63 
// Options consumed by TraceAnalyzer (passed by value to its constructor).
// The constructor, destructor and SparseCorrelationInput() are defined
// outside this header.
class AnalyzerOptions {
 public:
  // Correlation settings in matrix form.
  // NOTE(review): presumably indexed by TraceOperationType on both
  // dimensions -- confirm.
  std::vector<std::vector<int>> correlation_map;

  // Correlation settings as a flat list of integer pairs.
  std::vector<std::pair<int, int>> correlation_list;

  AnalyzerOptions();

  ~AnalyzerOptions();

  // Consumes the textual correlation specification `in_str`.
  // NOTE(review): the accepted syntax and which members it fills are not
  // visible from this header.
  void SparseCorrelationInput(const std::string& in_str);
};
75 
// Naming convention for the variables in the trace analyzer:
// A leading 'a_' means the variable is used for 'accessed_keys'.
// A leading 'w_' means it is used for 'the whole key space'.
// A trailing '_f' means a file writer or reader pointer.
// For example, 'a_count' means 'accessed_keys_count', and
// 'w_key_f' means 'whole_key_space_file'.
82 
83 struct TraceStats {
84   uint32_t cf_id;
85   std::string cf_name;
86   uint64_t a_count;
87   uint64_t a_succ_count;
88   uint64_t a_key_id;
89   uint64_t a_key_size_sqsum;
90   uint64_t a_key_size_sum;
91   uint64_t a_key_mid;
92   uint64_t a_value_size_sqsum;
93   uint64_t a_value_size_sum;
94   uint64_t a_value_mid;
95   uint32_t a_peak_qps;
96   double a_ave_qps;
97   std::map<std::string, StatsUnit> a_key_stats;
98   std::map<uint64_t, uint64_t> a_count_stats;
99   std::map<uint64_t, uint64_t> a_key_size_stats;
100   std::map<uint64_t, uint64_t> a_value_size_stats;
101   std::map<uint32_t, uint32_t> a_qps_stats;
102   std::map<uint32_t, std::map<std::string, uint32_t>> a_qps_prefix_stats;
103   std::priority_queue<std::pair<uint64_t, std::string>,
104                       std::vector<std::pair<uint64_t, std::string>>,
105                       std::greater<std::pair<uint64_t, std::string>>>
106       top_k_queue;
107   std::priority_queue<std::pair<uint64_t, std::string>,
108                       std::vector<std::pair<uint64_t, std::string>>,
109                       std::greater<std::pair<uint64_t, std::string>>>
110       top_k_prefix_access;
111   std::priority_queue<std::pair<double, std::string>,
112                       std::vector<std::pair<double, std::string>>,
113                       std::greater<std::pair<double, std::string>>>
114       top_k_prefix_ave;
115   std::priority_queue<std::pair<uint32_t, uint32_t>,
116                       std::vector<std::pair<uint32_t, uint32_t>>,
117                       std::greater<std::pair<uint32_t, uint32_t>>>
118       top_k_qps_sec;
119   std::list<TraceUnit> time_series;
120   std::vector<std::pair<uint64_t, uint64_t>> correlation_output;
121   std::map<uint32_t, uint64_t> uni_key_num;
122 
123   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> time_series_f;
124   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_f;
125   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_count_dist_f;
126   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_prefix_cut_f;
127   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_value_size_f;
128   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_size_f;
129   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_num_f;
130   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_qps_f;
131   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_top_qps_prefix_f;
132   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> w_key_f;
133   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> w_prefix_cut_f;
134 
135   TraceStats();
136   ~TraceStats();
137   TraceStats(const TraceStats&) = delete;
138   TraceStats& operator=(const TraceStats&) = delete;
139   TraceStats(TraceStats&&) = default;
140   TraceStats& operator=(TraceStats&&) = default;
141 };
142 
143 struct TypeUnit {
144   std::string type_name;
145   bool enabled;
146   uint64_t total_keys;
147   uint64_t total_access;
148   uint64_t total_succ_access;
149   uint32_t sample_count;
150   std::map<uint32_t, TraceStats> stats;
151   TypeUnit() = default;
152   ~TypeUnit() = default;
153   TypeUnit(const TypeUnit&) = delete;
154   TypeUnit& operator=(const TypeUnit&) = delete;
155   TypeUnit(TypeUnit&&) = default;
156   TypeUnit& operator=(TypeUnit&&) = default;
157 };
158 
// Per-column-family counters; TraceAnalyzer::cfs_ maps a uint32_t id to one
// CfUnit.
struct CfUnit {
  uint32_t cf_id;
  // Total keys in this cf if the whole key space is used.
  uint64_t w_count;
  // Total keys in this cf that are accessed.
  uint64_t a_count;
  // Key size statistics of the whole key space for this cf.
  std::map<uint64_t, uint64_t> w_key_size_stats;
  // QPS bookkeeping for this cf.
  // NOTE(review): presumably second -> request count -- confirm.
  std::map<uint32_t, uint32_t> cf_qps;
};
167 
168 class TraceAnalyzer : private TraceRecord::Handler,
169                       private WriteBatch::Handler {
170  public:
171   TraceAnalyzer(std::string& trace_path, std::string& output_path,
172                 AnalyzerOptions _analyzer_opts);
173   ~TraceAnalyzer();
174 
175   Status PrepareProcessing();
176 
177   Status StartProcessing();
178 
179   Status MakeStatistics();
180 
181   Status ReProcessing();
182 
183   Status EndProcessing();
184 
185   Status WriteTraceUnit(TraceUnit& unit);
186 
GetTaVector()187   std::vector<TypeUnit>& GetTaVector() { return ta_; }
188 
189  private:
190   using TraceRecord::Handler::Handle;
191   Status Handle(const WriteQueryTraceRecord& record,
192                 std::unique_ptr<TraceRecordResult>* result) override;
193   Status Handle(const GetQueryTraceRecord& record,
194                 std::unique_ptr<TraceRecordResult>* result) override;
195   Status Handle(const IteratorSeekQueryTraceRecord& record,
196                 std::unique_ptr<TraceRecordResult>* result) override;
197   Status Handle(const MultiGetQueryTraceRecord& record,
198                 std::unique_ptr<TraceRecordResult>* result) override;
199 
200   using WriteBatch::Handler::PutCF;
201   Status PutCF(uint32_t column_family_id, const Slice& key,
202                const Slice& value) override;
203 
204   using WriteBatch::Handler::DeleteCF;
205   Status DeleteCF(uint32_t column_family_id, const Slice& key) override;
206 
207   using WriteBatch::Handler::SingleDeleteCF;
208   Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override;
209 
210   using WriteBatch::Handler::DeleteRangeCF;
211   Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
212                        const Slice& end_key) override;
213 
214   using WriteBatch::Handler::MergeCF;
215   Status MergeCF(uint32_t column_family_id, const Slice& key,
216                  const Slice& value) override;
217 
218   // The following hanlders are not implemented, return Status::OK() to avoid
219   // the running time assertion and other irrelevant falures.
220   using WriteBatch::Handler::PutBlobIndexCF;
PutBlobIndexCF(uint32_t,const Slice &,const Slice &)221   Status PutBlobIndexCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
222                         const Slice& /*value*/) override {
223     return Status::OK();
224   }
225 
226   // The default implementation of LogData does nothing.
227   using WriteBatch::Handler::LogData;
LogData(const Slice &)228   void LogData(const Slice& /*blob*/) override {}
229 
230   using WriteBatch::Handler::MarkBeginPrepare;
231   Status MarkBeginPrepare(bool = false) override { return Status::OK(); }
232 
233   using WriteBatch::Handler::MarkEndPrepare;
MarkEndPrepare(const Slice &)234   Status MarkEndPrepare(const Slice& /*xid*/) override { return Status::OK(); }
235 
236   using WriteBatch::Handler::MarkNoop;
MarkNoop(bool)237   Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
238 
239   using WriteBatch::Handler::MarkRollback;
MarkRollback(const Slice &)240   Status MarkRollback(const Slice& /*xid*/) override { return Status::OK(); }
241 
242   using WriteBatch::Handler::MarkCommit;
MarkCommit(const Slice &)243   Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); }
244 
245   // Process each trace operation and output the analysis result to
246   // stdout/files.
247   Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp,
248                               std::vector<uint32_t> cf_ids,
249                               std::vector<Slice> keys,
250                               std::vector<size_t> value_sizes);
251 
252   Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp,
253                               uint32_t cf_id, const Slice& key,
254                               size_t value_size);
255 
256   ROCKSDB_NAMESPACE::Env* env_;
257   EnvOptions env_options_;
258   std::unique_ptr<TraceReader> trace_reader_;
259   size_t offset_;
260   char buffer_[1024];
261   // Timestamp of a WriteBatch, used in its iteration.
262   uint64_t write_batch_ts_;
263   std::string trace_name_;
264   std::string output_path_;
265   AnalyzerOptions analyzer_opts_;
266   uint64_t total_requests_;
267   uint64_t total_access_keys_;
268   uint64_t total_gets_;
269   uint64_t total_writes_;
270   uint64_t total_seeks_;
271   uint64_t total_seek_prevs_;
272   uint64_t total_multigets_;
273   uint64_t trace_create_time_;
274   uint64_t begin_time_;
275   uint64_t end_time_;
276   uint64_t time_series_start_;
277   uint32_t sample_max_;
278   uint32_t cur_time_sec_;
279   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>
280       trace_sequence_f_;                                    // readable trace
281   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> qps_f_;  // overall qps
282   std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>
283       cf_qps_f_;              // The qps of each CF>
284   std::vector<TypeUnit> ta_;  // The main statistic collecting data structure
285   std::map<uint32_t, CfUnit> cfs_;  // All the cf_id appears in this trace;
286   std::vector<uint32_t> qps_peak_;
287   std::vector<double> qps_ave_;
288 
289   Status ReadTraceHeader(Trace* header);
290   Status ReadTraceFooter(Trace* footer);
291   Status ReadTraceRecord(Trace* trace);
292   Status KeyStatsInsertion(const uint32_t& type, const uint32_t& cf_id,
293                            const std::string& key, const size_t value_size,
294                            const uint64_t ts);
295   Status StatsUnitCorrelationUpdate(StatsUnit& unit, const uint32_t& type,
296                                     const uint64_t& ts, const std::string& key);
297   Status OpenStatsOutputFiles(const std::string& type, TraceStats& new_stats);
298   Status CreateOutputFile(
299       const std::string& type, const std::string& cf_name,
300       const std::string& ending,
301       std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>* f_ptr);
302   Status CloseOutputFiles();
303 
304   void PrintStatistics();
305   Status TraceUnitWriter(
306       std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>& f_ptr, TraceUnit& unit);
307   Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id,
308                             const Slice& key, const size_t value_size,
309                             const uint64_t ts);
310   Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
311   Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
312   Status MakeStatisticQPS();
313   int db_version_;
314 };
315 
// Entry point of the trace analyzer tool; takes main()-style arguments and
// is defined outside this header.
// NOTE(review): the meaning of the returned int is not visible here --
// presumably a process exit code; confirm.
int trace_analyzer_tool(int argc, char** argv);
317 
318 }  // namespace ROCKSDB_NAMESPACE
319 
320 #endif  // ROCKSDB_LITE
321