# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
# This source code is licensed under both the GPLv2 (found in the
# COPYING file in the root directory) and Apache 2.0 License
# (found in the LICENSE.Apache file in the root directory).

from advisor.bench_runner import BenchmarkRunner
from advisor.db_log_parser import DataSource, DatabaseLogs, NO_COL_FAMILY
from advisor.db_stats_fetcher import (
    LogStatsParser, OdsStatsFetcher, DatabasePerfContext
)
import shutil
import subprocess
import time


'''
NOTE: This is not thread-safe, because the output file is simply overwritten.
'''


class DBBenchRunner(BenchmarkRunner):
    """Runs db_bench experiments and parses their output into data sources.

    The benchmark's stdout/stderr are redirected to fixed temp files
    (OUTPUT_FILE / ERROR_FILE) which are overwritten on every run, hence the
    module-level thread-safety note.
    """
    OUTPUT_FILE = "temp/dbbench_out.tmp"
    ERROR_FILE = "temp/dbbench_err.tmp"
    DB_PATH = "DB path"          # marker prefix of the db-path line in output
    THROUGHPUT = "ops/sec"       # token preceded by the throughput value
    PERF_CON = " PERF_CONTEXT:"  # marker line preceding perf-context stats

    @staticmethod
    def is_metric_better(new_metric, old_metric):
        """Return True if new_metric is at least as good as old_metric.

        For db_bench, 'throughput' is the metric returned by run_experiment,
        and higher throughput is better.
        """
        return new_metric >= old_metric

    @staticmethod
    def get_opt_args_str(misc_options_dict):
        """Convert a dict of options into a command-line arguments string.

        Given a dictionary of options and their values, return a string of
        the form " --<option>=<value> ..." that can be appended to a
        db_bench command line. Options with falsy values are skipped.
        """
        optional_args_str = ""
        for option_name, option_value in misc_options_dict.items():
            if option_value:
                optional_args_str += (
                    " --" + option_name + "=" + str(option_value)
                )
        return optional_args_str

    def __init__(self, positional_args, ods_args=None):
        """Parse the runner's configuration.

        positional_args: list of the form
            [db_bench_binary, benchmark, "<option>=<value>", ...]
        where any elements beyond the first two are extra db_bench
        command-line options.
        ods_args: optional dict configuring the ODS stats fetcher
            (keys used later: 'client_script', 'entity', 'key_prefix').
        """
        self.db_bench_binary = positional_args[0]
        self.benchmark = positional_args[1]
        # BUG FIX: default to an empty list instead of None so that
        # _build_experiment_command can iterate over db_bench_args without
        # raising TypeError when no extra options were supplied.
        self.db_bench_args = []
        if len(positional_args) > 2:
            # options list with each option given as "<option>=<value>"
            self.db_bench_args = positional_args[2:]
        # save ods_args, if provided
        self.ods_args = ods_args

    def _parse_output(self, get_perf_context=False):
        '''
        Parse the db_bench output file into a dict keyed by THROUGHPUT,
        DB_PATH and PERF_CON (any of which may remain None if absent).

        Sample db_bench output after running 'readwhilewriting' benchmark:
        DB path: [/tmp/rocksdbtest-155919/dbbench]\n
        readwhilewriting : 16.582 micros/op 60305 ops/sec; 4.2 MB/s (3433828\
        of 5427999 found)\n
        PERF_CONTEXT:\n
        user_key_comparison_count = 500466712, block_cache_hit_count = ...\n
        '''
        output = {
            self.THROUGHPUT: None, self.DB_PATH: None, self.PERF_CON: None
        }
        perf_context_begins = False
        with open(self.OUTPUT_FILE, 'r') as fp:
            for line in fp:
                if line.startswith(self.benchmark):
                    # line from sample output:
                    # readwhilewriting : 16.582 micros/op 60305 ops/sec; \
                    # 4.2 MB/s (3433828 of 5427999 found)\n
                    print(line)  # print output of the benchmark run
                    token_list = line.strip().split()
                    for ix, token in enumerate(token_list):
                        if token.startswith(self.THROUGHPUT):
                            # in above example, throughput = 60305 ops/sec;
                            # the value precedes the "ops/sec" token
                            output[self.THROUGHPUT] = (
                                float(token_list[ix - 1])
                            )
                            break
                elif get_perf_context and line.startswith(self.PERF_CON):
                    # the following lines in the output contain perf context
                    # statistics (refer example above)
                    perf_context_begins = True
                elif get_perf_context and perf_context_begins:
                    # Sample perf_context output:
                    # user_key_comparison_count = 500, block_cache_hit_count =\
                    # 468, block_read_count = 580, block_read_byte = 445, ...
                    token_list = line.strip().split(',')
                    # token_list = ['user_key_comparison_count = 500',
                    # 'block_cache_hit_count = 468','block_read_count = 580'...
                    perf_context = {
                        tk.split('=')[0].strip(): tk.split('=')[1].strip()
                        for tk in token_list
                        if tk
                    }
                    # TODO(poojam23): this is a hack and should be replaced
                    # with the timestamp that db_bench will provide per printed
                    # perf_context
                    timestamp = int(time.time())
                    perf_context_ts = {}
                    for stat in perf_context.keys():
                        perf_context_ts[stat] = {
                            timestamp: int(perf_context[stat])
                        }
                    output[self.PERF_CON] = perf_context_ts
                    # only one perf_context line is expected after the marker
                    perf_context_begins = False
                elif line.startswith(self.DB_PATH):
                    # line from sample output:
                    # DB path: [/tmp/rocksdbtest-155919/dbbench]\n
                    output[self.DB_PATH] = (
                        line.split('[')[1].split(']')[0]
                    )
        return output

    def get_log_options(self, db_options, db_path):
        """Return (logs_file_prefix, stats_freq_sec) for the Rocksdb LOG.

        Fetches the location of the LOG file and the frequency at which
        stats are dumped into it, from the provided DatabaseOptions object.
        """
        log_dir_path = None
        stats_freq_sec = None
        logs_file_prefix = None

        # fetch frequency at which the stats are dumped in the Rocksdb logs
        dump_period = 'DBOptions.stats_dump_period_sec'
        # fetch the directory, if specified, in which the Rocksdb logs are
        # dumped, by default logs are dumped in same location as database
        log_dir = 'DBOptions.db_log_dir'
        log_options = db_options.get_options([dump_period, log_dir])
        if dump_period in log_options:
            stats_freq_sec = int(log_options[dump_period][NO_COL_FAMILY])
        if log_dir in log_options:
            log_dir_path = log_options[log_dir][NO_COL_FAMILY]

        # NOTE(review): get_info_log_file_name is presumably inherited from
        # BenchmarkRunner — confirm it handles log_dir_path=None
        log_file_name = DBBenchRunner.get_info_log_file_name(
            log_dir_path, db_path
        )

        if not log_dir_path:
            log_dir_path = db_path
        if not log_dir_path.endswith('/'):
            log_dir_path += '/'

        logs_file_prefix = log_dir_path + log_file_name
        return (logs_file_prefix, stats_freq_sec)

    def _get_options_command_line_args_str(self, curr_options):
        '''
        This method uses the provided Rocksdb OPTIONS to create a string of
        command-line arguments for db_bench.
        The --options_file argument is always given and the options that are
        not supported by the OPTIONS file are given as separate arguments.
        '''
        optional_args_str = DBBenchRunner.get_opt_args_str(
            curr_options.get_misc_options()
        )
        # generate an options configuration file
        options_file = curr_options.generate_options_config(nonce='12345')
        optional_args_str += " --options_file=" + options_file
        return optional_args_str

    def _setup_db_before_experiment(self, curr_options, db_path):
        """Wipe db_path and repopulate it with the fillrandom benchmark."""
        # remove destination directory if it already exists
        # NOTE(review): ignore_errors=True already suppresses OSError, so
        # this handler is effectively dead code; kept for safety
        try:
            shutil.rmtree(db_path, ignore_errors=True)
        except OSError as e:
            print('Error: rmdir ' + e.filename + ' ' + e.strerror)
        # setup database with a million keys using the fillrandom benchmark
        command = "%s --benchmarks=fillrandom --db=%s --num=1000000" % (
            self.db_bench_binary, db_path
        )
        args_str = self._get_options_command_line_args_str(curr_options)
        command += args_str
        self._run_command(command)

    def _build_experiment_command(self, curr_options, db_path):
        """Build the full db_bench command line for the chosen benchmark."""
        command = "%s --benchmarks=%s --statistics --perf_level=3 --db=%s" % (
            self.db_bench_binary, self.benchmark, db_path
        )
        # fetch the command-line arguments string for providing Rocksdb options
        args_str = self._get_options_command_line_args_str(curr_options)
        # handle the command-line args passed in the constructor, these
        # arguments are specific to db_bench
        for cmd_line_arg in self.db_bench_args:
            args_str += (" --" + cmd_line_arg)
        command += args_str
        return command

    def _run_command(self, command):
        """Run command in a shell, redirecting its stdout/stderr to the
        temp output/error files (overwriting any previous contents)."""
        print('executing... - ' + command)
        # context managers guarantee the file handles are closed even if
        # subprocess.call raises (the original leaked them on error)
        with open(self.OUTPUT_FILE, "w+") as out_file, \
                open(self.ERROR_FILE, "w+") as err_file:
            subprocess.call(
                command, shell=True, stdout=out_file, stderr=err_file
            )

    def run_experiment(self, db_options, db_path):
        """Run the configured benchmark and return (data_sources, throughput).

        data_sources maps DataSource.Type to the list of sources built from
        this run (OPTIONS, LOG, and time-series stats, plus ODS if
        configured); throughput is the parsed ops/sec value (or None).
        """
        # setup the Rocksdb database before running experiment
        self._setup_db_before_experiment(db_options, db_path)
        # get the command to run the experiment
        command = self._build_experiment_command(db_options, db_path)
        experiment_start_time = int(time.time())
        # run experiment
        self._run_command(command)
        experiment_end_time = int(time.time())
        # parse the db_bench experiment output
        parsed_output = self._parse_output(get_perf_context=True)

        # get the log files path prefix and frequency at which Rocksdb stats
        # are dumped in the logs
        logs_file_prefix, stats_freq_sec = self.get_log_options(
            db_options, parsed_output[self.DB_PATH]
        )
        # create the RocksDB LOGS object
        db_logs = DatabaseLogs(
            logs_file_prefix, db_options.get_column_families()
        )
        # Create the Log STATS object
        db_log_stats = LogStatsParser(logs_file_prefix, stats_freq_sec)
        # Create the PerfContext STATS object
        db_perf_context = DatabasePerfContext(
            parsed_output[self.PERF_CON], 0, False
        )
        # create the data-sources dictionary
        data_sources = {
            DataSource.Type.DB_OPTIONS: [db_options],
            DataSource.Type.LOG: [db_logs],
            DataSource.Type.TIME_SERIES: [db_log_stats, db_perf_context]
        }
        # Create the ODS STATS object
        if self.ods_args:
            key_prefix = ''
            if 'key_prefix' in self.ods_args:
                key_prefix = self.ods_args['key_prefix']
            data_sources[DataSource.Type.TIME_SERIES].append(OdsStatsFetcher(
                self.ods_args['client_script'],
                self.ods_args['entity'],
                experiment_start_time,
                experiment_end_time,
                key_prefix
            ))
        # return the experiment's data-sources and throughput
        return data_sources, parsed_output[self.THROUGHPUT]