# Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
#  This source code is licensed under both the GPLv2 (found in the
#  COPYING file in the root directory) and Apache 2.0 License
#  (found in the LICENSE.Apache file in the root directory).
5
6from advisor.bench_runner import BenchmarkRunner
7from advisor.db_log_parser import DataSource, DatabaseLogs, NO_COL_FAMILY
8from advisor.db_stats_fetcher import (
9    LogStatsParser, OdsStatsFetcher, DatabasePerfContext
10)
11import shutil
12import subprocess
13import time
14
15
'''
NOTE: This is not thread-safe, because the output file is simply overwritten
each time a benchmark is run.
'''
19
20
class DBBenchRunner(BenchmarkRunner):
    '''
    BenchmarkRunner implementation that drives the db_bench tool: it sets up
    a test database, runs the chosen benchmark, and gathers the resulting
    data sources (Rocksdb LOG stats, perf-context stats, optional ODS stats)
    for the advisor to analyze.
    '''
    OUTPUT_FILE = "temp/dbbench_out.tmp"  # stdout of each db_bench run
    ERROR_FILE = "temp/dbbench_err.tmp"   # stderr of each db_bench run
    DB_PATH = "DB path"          # marker for the db-path line in the output
    THROUGHPUT = "ops/sec"       # token that follows the throughput value
    PERF_CON = " PERF_CONTEXT:"  # marker for the perf-context section

    @staticmethod
    def is_metric_better(new_metric, old_metric):
        '''
        Return True if new_metric is at least as good as old_metric.
        For db_bench, 'throughput' (ops/sec) is the metric returned by
        run_experiment, and higher is better.
        '''
        return new_metric >= old_metric

    @staticmethod
    def get_opt_args_str(misc_options_dict):
        '''
        Given a dictionary of options and their values, return a string that
        can be appended as command-line arguments.
        Example: {'bloom_bits': 2} -> " --bloom_bits=2"
        Options with falsy values are skipped.
        '''
        # join at C speed instead of quadratic string concatenation
        return "".join(
            " --" + option_name + "=" + str(option_value)
            for option_name, option_value in misc_options_dict.items()
            if option_value
        )

    def __init__(self, positional_args, ods_args=None):
        '''
        positional_args: [db_bench binary path, benchmark name,
                          optional "<option>=<value>" strings...]
        ods_args: optional dict with keys 'client_script', 'entity' and
                  (optionally) 'key_prefix' for fetching ODS statistics.
        '''
        # parse positional_args list appropriately
        self.db_bench_binary = positional_args[0]
        self.benchmark = positional_args[1]
        # BUGFIX: default to an empty list (not None) so that
        # _build_experiment_command can always iterate over it without
        # raising TypeError when no extra arguments were provided
        self.db_bench_args = []
        if len(positional_args) > 2:
            # options list with each option given as "<option>=<value>"
            self.db_bench_args = positional_args[2:]
        # save ods_args, if provided
        self.ods_args = ods_args

    def _parse_output(self, get_perf_context=False):
        '''
        Parse the db_bench output captured in OUTPUT_FILE and return a dict
        mapping THROUGHPUT, DB_PATH and PERF_CON to the extracted values
        (a value stays None when it was not found in the output).

        Sample db_bench output after running 'readwhilewriting' benchmark:
        DB path: [/tmp/rocksdbtest-155919/dbbench]\n
        readwhilewriting : 16.582 micros/op 60305 ops/sec; 4.2 MB/s (3433828\
        of 5427999 found)\n
        PERF_CONTEXT:\n
        user_key_comparison_count = 500466712, block_cache_hit_count = ...\n
        '''
        output = {
            self.THROUGHPUT: None, self.DB_PATH: None, self.PERF_CON: None
        }
        perf_context_begins = False
        with open(self.OUTPUT_FILE, 'r') as fp:
            for line in fp:
                if line.startswith(self.benchmark):
                    # line from sample output:
                    # readwhilewriting : 16.582 micros/op 60305 ops/sec; \
                    # 4.2 MB/s (3433828 of 5427999 found)\n
                    print(line)  # print output of the benchmark run
                    token_list = line.strip().split()
                    for ix, token in enumerate(token_list):
                        if token.startswith(self.THROUGHPUT):
                            # in above example, throughput = 60305 ops/sec;
                            # the value precedes the "ops/sec" token
                            output[self.THROUGHPUT] = (
                                float(token_list[ix - 1])
                            )
                            break
                elif get_perf_context and line.startswith(self.PERF_CON):
                    # the following lines in the output contain perf context
                    # statistics (refer example above)
                    perf_context_begins = True
                elif get_perf_context and perf_context_begins:
                    # Sample perf_context output:
                    # user_key_comparison_count = 500, block_cache_hit_count =\
                    # 468, block_read_count = 580, block_read_byte = 445, ...
                    token_list = line.strip().split(',')
                    # token_list = ['user_key_comparison_count = 500',
                    # 'block_cache_hit_count = 468','block_read_count = 580'...
                    perf_context = {
                        tk.split('=')[0].strip(): tk.split('=')[1].strip()
                        for tk in token_list
                        if tk
                    }
                    # TODO(poojam23): this is a hack and should be replaced
                    # with the timestamp that db_bench will provide per printed
                    # perf_context
                    timestamp = int(time.time())
                    # reshape into {stat_name: {timestamp: int_value}} so the
                    # stats look like a (single-point) time series
                    perf_context_ts = {}
                    for stat in perf_context.keys():
                        perf_context_ts[stat] = {
                            timestamp: int(perf_context[stat])
                        }
                    output[self.PERF_CON] = perf_context_ts
                    perf_context_begins = False
                elif line.startswith(self.DB_PATH):
                    # line from sample output:
                    # DB path: [/tmp/rocksdbtest-155919/dbbench]\n
                    output[self.DB_PATH] = (
                        line.split('[')[1].split(']')[0]
                    )
        return output

    def get_log_options(self, db_options, db_path):
        '''
        Return (logs_file_prefix, stats_freq_sec): the location prefix of the
        Rocksdb LOG files and the frequency (seconds) at which stats are
        dumped into them; stats_freq_sec is None when not configured.
        '''
        log_dir_path = None
        stats_freq_sec = None

        # fetch frequency at which the stats are dumped in the Rocksdb logs
        dump_period = 'DBOptions.stats_dump_period_sec'
        # fetch the directory, if specified, in which the Rocksdb logs are
        # dumped, by default logs are dumped in same location as database
        log_dir = 'DBOptions.db_log_dir'
        log_options = db_options.get_options([dump_period, log_dir])
        if dump_period in log_options:
            stats_freq_sec = int(log_options[dump_period][NO_COL_FAMILY])
        if log_dir in log_options:
            log_dir_path = log_options[log_dir][NO_COL_FAMILY]

        # NOTE(review): get_info_log_file_name is not defined in this file —
        # presumably inherited from BenchmarkRunner; verify against that class
        log_file_name = DBBenchRunner.get_info_log_file_name(
            log_dir_path, db_path
        )

        # by default the logs live alongside the database itself
        if not log_dir_path:
            log_dir_path = db_path
        if not log_dir_path.endswith('/'):
            log_dir_path += '/'

        logs_file_prefix = log_dir_path + log_file_name
        return (logs_file_prefix, stats_freq_sec)

    def _get_options_command_line_args_str(self, curr_options):
        '''
        This method uses the provided Rocksdb OPTIONS to create a string of
        command-line arguments for db_bench.
        The --options_file argument is always given and the options that are
        not supported by the OPTIONS file are given as separate arguments.
        '''
        optional_args_str = DBBenchRunner.get_opt_args_str(
            curr_options.get_misc_options()
        )
        # generate an options configuration file
        options_file = curr_options.generate_options_config(nonce='12345')
        optional_args_str += " --options_file=" + options_file
        return optional_args_str

    def _setup_db_before_experiment(self, curr_options, db_path):
        '''
        Recreate the database at db_path from scratch: remove any previous
        contents, then populate it with a million keys using 'fillrandom'.
        '''
        # remove destination directory if it already exists; with
        # ignore_errors=True, rmtree suppresses removal errors itself, so the
        # handler below only catches errors raised before removal begins
        try:
            shutil.rmtree(db_path, ignore_errors=True)
        except OSError as e:
            print('Error: rmdir ' + e.filename + ' ' + e.strerror)
        # setup database with a million keys using the fillrandom benchmark
        command = "%s --benchmarks=fillrandom --db=%s --num=1000000" % (
            self.db_bench_binary, db_path
        )
        args_str = self._get_options_command_line_args_str(curr_options)
        command += args_str
        self._run_command(command)

    def _build_experiment_command(self, curr_options, db_path):
        '''
        Build the full db_bench command line for this experiment: benchmark
        selection, statistics flags, Rocksdb options and any extra db_bench
        arguments passed to the constructor.
        '''
        command = "%s --benchmarks=%s --statistics --perf_level=3 --db=%s" % (
            self.db_bench_binary, self.benchmark, db_path
        )
        # fetch the command-line arguments string for providing Rocksdb options
        args_str = self._get_options_command_line_args_str(curr_options)
        # handle the command-line args passed in the constructor, these
        # arguments are specific to db_bench
        for cmd_line_arg in self.db_bench_args:
            args_str += (" --" + cmd_line_arg)
        command += args_str
        return command

    def _run_command(self, command):
        '''
        Run the given shell command, redirecting its stdout and stderr to
        OUTPUT_FILE and ERROR_FILE (overwriting any previous contents).
        '''
        print('executing... - ' + command)
        # context managers guarantee the file handles are closed even if
        # subprocess.call raises (the original leaked them in that case)
        with open(self.OUTPUT_FILE, "w+") as out_file, \
                open(self.ERROR_FILE, "w+") as err_file:
            subprocess.call(
                command, shell=True, stdout=out_file, stderr=err_file
            )

    def run_experiment(self, db_options, db_path):
        '''
        Set up the database, run the configured benchmark, and return a tuple
        (data_sources, throughput) where data_sources maps DataSource.Type
        values to lists of data-source objects and throughput is the ops/sec
        figure parsed from the db_bench output.
        '''
        # setup the Rocksdb database before running experiment
        self._setup_db_before_experiment(db_options, db_path)
        # get the command to run the experiment
        command = self._build_experiment_command(db_options, db_path)
        experiment_start_time = int(time.time())
        # run experiment
        self._run_command(command)
        experiment_end_time = int(time.time())
        # parse the db_bench experiment output
        parsed_output = self._parse_output(get_perf_context=True)

        # get the log files path prefix and frequency at which Rocksdb stats
        # are dumped in the logs
        logs_file_prefix, stats_freq_sec = self.get_log_options(
            db_options, parsed_output[self.DB_PATH]
        )
        # create the Rocksbd LOGS object
        db_logs = DatabaseLogs(
            logs_file_prefix, db_options.get_column_families()
        )
        # Create the Log STATS object
        db_log_stats = LogStatsParser(logs_file_prefix, stats_freq_sec)
        # Create the PerfContext STATS object
        db_perf_context = DatabasePerfContext(
            parsed_output[self.PERF_CON], 0, False
        )
        # create the data-sources dictionary
        data_sources = {
            DataSource.Type.DB_OPTIONS: [db_options],
            DataSource.Type.LOG: [db_logs],
            DataSource.Type.TIME_SERIES: [db_log_stats, db_perf_context]
        }
        # Create the ODS STATS object, if ODS arguments were provided
        if self.ods_args:
            key_prefix = ''
            if 'key_prefix' in self.ods_args:
                key_prefix = self.ods_args['key_prefix']
            data_sources[DataSource.Type.TIME_SERIES].append(OdsStatsFetcher(
                self.ods_args['client_script'],
                self.ods_args['entity'],
                experiment_start_time,
                experiment_end_time,
                key_prefix
            ))
        # return the experiment's data-sources and throughput
        return data_sources, parsed_output[self.THROUGHPUT]
246