# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
# This source code is licensed under both the GPLv2 (found in the
# COPYING file in the root directory) and Apache 2.0 License
# (found in the LICENSE.Apache file in the root directory).

from advisor.db_log_parser import Log
from advisor.db_timeseries_parser import TimeSeriesData, NO_ENTITY
import copy
import glob
import re
import subprocess
import time


class LogStatsParser(TimeSeriesData):
    """Builds per-statistic timeseries from the periodic 'STATISTICS:' dumps
    that Rocksdb writes into its LOG files.

    The resulting data lives in self.keys_ts (populated by fetch_timeseries)
    keyed as keys_ts[NO_ENTITY][stat_name][timestamp] = value.
    """

    # Marker string that identifies a statistics-dump log entry.
    STATS = 'STATISTICS:'

    @staticmethod
    def parse_log_line_for_stats(log_line):
        """Parse one stats line into a dict of {stat_name: float value}.

        Example stat line (from LOG file):
        "rocksdb.db.get.micros P50 : 8.4 P95 : 21.8 P99 : 33.9 P100 : 92.0"
        returns:
        {'rocksdb.db.get.micros.p50': 8.4, 'rocksdb.db.get.micros.p95': 21.8,
         'rocksdb.db.get.micros.p99': 33.9, 'rocksdb.db.get.micros.p100': 92.0}
        """
        token_list = log_line.strip().split()
        # Fix: a blank/whitespace-only line would otherwise raise IndexError
        # on token_list[0]; it simply carries no stats.
        if not token_list:
            return {}
        # token_list = ['rocksdb.db.get.micros', 'P50', ':', '8.4', 'P95',
        # ':', '21.8', 'P99', ':', '33.9', 'P100', ':', '92.0']
        stat_prefix = token_list[0] + '.'  # 'rocksdb.db.get.micros.'
        stat_values = [
            token
            for token in token_list[1:]
            if token != ':'
        ]
        # stat_values = ['P50', '8.4', 'P95', '21.8', 'P99', '33.9', 'P100',
        # '92.0']
        stat_dict = {}
        for ix, metric in enumerate(stat_values):
            if ix % 2 == 0:
                # even index -> metric name suffix (e.g. 'P50')
                stat_name = stat_prefix + metric
                stat_name = stat_name.lower()  # Note: case insensitive names
            else:
                # odd index -> the value belonging to the preceding name
                stat_dict[stat_name] = float(metric)
        return stat_dict

    def __init__(self, logs_path_prefix, stats_freq_sec):
        """
        logs_path_prefix: path prefix used to glob for LOG files
        stats_freq_sec: frequency (seconds) at which Rocksdb dumps statistics
        """
        super().__init__()
        self.logs_file_prefix = logs_path_prefix
        self.stats_freq_sec = stats_freq_sec
        self.duration_sec = 60

    def get_keys_from_conditions(self, conditions):
        """Return the (lower-cased) stat names needed by the conditions."""
        # Note: case insensitive stat names
        reqd_stats = []
        for cond in conditions:
            for key in cond.keys:
                key = key.lower()
                # some keys are prepended with '[]' for OdsStatsFetcher to
                # replace this with the appropriate key_prefix, remove these
                # characters here since the LogStatsParser does not need
                # a prefix
                if key.startswith('[]'):
                    reqd_stats.append(key[2:])
                else:
                    reqd_stats.append(key)
        return reqd_stats

    def add_to_timeseries(self, log, reqd_stats):
        """Parse one statistics-dump Log entry and record the required stats.

        This method takes in the Log object that contains the Rocksdb stats
        and a list of required stats, then it parses the stats line by line
        to fetch required stats and add them to the keys_ts object.
        Example: reqd_stats = ['rocksdb.block.cache.hit.count',
        'rocksdb.db.get.micros.p99']
        Let log.get_message() return the following string:
        "[WARN] [db/db_impl.cc:485] STATISTICS:\n
        rocksdb.block.cache.miss COUNT : 1459\n
        rocksdb.block.cache.hit COUNT : 37\n
        ...
        rocksdb.db.get.micros P50 : 15.6 P95 : 39.7 P99 : 62.6 P100 : 148.0\n
        ..."
        """
        new_lines = log.get_message().split('\n')
        # let log_ts = 1532518219
        log_ts = log.get_timestamp()
        # example updates to keys_ts:
        # keys_ts[NO_ENTITY]['rocksdb.db.get.micros.p99'][1532518219] = 62.6
        # keys_ts[NO_ENTITY]['rocksdb.block.cache.hit.count'][1532518219] = 37
        for line in new_lines[1:]:  # new_lines[0] does not contain any stats
            stats_on_line = self.parse_log_line_for_stats(line)
            for stat in stats_on_line:
                if stat in reqd_stats:
                    if stat not in self.keys_ts[NO_ENTITY]:
                        self.keys_ts[NO_ENTITY][stat] = {}
                    self.keys_ts[NO_ENTITY][stat][log_ts] = stats_on_line[stat]

    def fetch_timeseries(self, reqd_stats):
        """Parse the Rocksdb LOG file(s) and generate a timeseries for each
        of the statistics in the list reqd_stats, stored in self.keys_ts.
        """
        self.keys_ts = {NO_ENTITY: {}}
        for file_name in glob.glob(self.logs_file_prefix + '*'):
            # TODO(poojam23): find a way to distinguish between 'old' log
            # files from current and previous experiments, present in the
            # same directory
            if re.search('old', file_name, re.IGNORECASE):
                continue
            with open(file_name, 'r') as db_logs:
                new_log = None
                for line in db_logs:
                    if Log.is_new_log(line):
                        if (
                            new_log and
                            re.search(self.STATS, new_log.get_message())
                        ):
                            self.add_to_timeseries(new_log, reqd_stats)
                        new_log = Log(line, column_families=[])
                    elif new_log:
                        # To account for logs split into multiple lines.
                        # Fix: guard on new_log; previously a file whose
                        # first line is a continuation line raised
                        # AttributeError on None.
                        new_log.append_message(line)
                # Check for the last log in the file.
                if new_log and re.search(self.STATS, new_log.get_message()):
                    self.add_to_timeseries(new_log, reqd_stats)


class DatabasePerfContext(TimeSeriesData):
    """TimeSeriesData source backed by an in-memory PerfContext dump."""
    # TODO(poojam23): check if any benchrunner provides PerfContext sampled
    # at regular intervals

    def __init__(self, perf_context_ts, stats_freq_sec, cumulative):
        '''
        perf_context_ts is expected to be in the following format:
        Dict[metric, Dict[timestamp, value]], where for
        each (metric, timestamp) pair, the value is database-wide (i.e.
        summed over all the threads involved)
        if stats_freq_sec == 0, per-metric only one value is reported
        if cumulative is True the metrics are running totals and are
        converted to per-interval values via unaccumulate_metrics()
        '''
        super().__init__()
        self.stats_freq_sec = stats_freq_sec
        self.keys_ts = {NO_ENTITY: perf_context_ts}
        if cumulative:
            self.unaccumulate_metrics()

    def unaccumulate_metrics(self):
        """Convert cumulative metrics into disjoint per-interval values.

        If the perf context metrics provided are cumulative in nature, this
        method can be used to convert them to a disjoint format: each
        timestamp ends up holding (value[ts] - value[previous_ts]).
        Raises ValueError if any difference is negative, i.e. the input was
        not actually cumulative.
        """
        epoch_ts = copy.deepcopy(self.keys_ts)
        for stat in self.keys_ts[NO_ENTITY]:
            # walk timestamps from newest to oldest so each subtraction uses
            # the still-cumulative older value
            timeseries = sorted(
                list(self.keys_ts[NO_ENTITY][stat].keys()), reverse=True
            )
            if len(timeseries) < 2:
                continue
            for ix, ts in enumerate(timeseries[:-1]):
                epoch_ts[NO_ENTITY][stat][ts] = (
                    epoch_ts[NO_ENTITY][stat][ts] -
                    epoch_ts[NO_ENTITY][stat][timeseries[ix+1]]
                )
                if epoch_ts[NO_ENTITY][stat][ts] < 0:
                    raise ValueError('DBPerfContext: really cumulative?')
            # drop the smallest timestamp in the timeseries for this metric;
            # it has no predecessor to subtract
            epoch_ts[NO_ENTITY][stat].pop(timeseries[-1])
        self.keys_ts = epoch_ts

    def get_keys_from_conditions(self, conditions):
        """Return the (lower-cased) stat names needed by the conditions."""
        reqd_stats = []
        for cond in conditions:
            reqd_stats.extend([key.lower() for key in cond.keys])
        return reqd_stats

    def fetch_timeseries(self, statistics):
        # this method is redundant for DatabasePerfContext because the
        # __init__ does the job of populating 'keys_ts'
        pass


class OdsStatsFetcher(TimeSeriesData):
    """Fetches stat timeseries from the ODS service via an external CLI
    client (either 'rapido' or 'ods', selected by the client string).
    """
    # class constants
    OUTPUT_FILE = 'temp/stats_out.tmp'
    ERROR_FILE = 'temp/stats_err.tmp'
    RAPIDO_COMMAND = "%s --entity=%s --key=%s --tstart=%s --tend=%s --showtime"

    # static methods
    @staticmethod
    def _get_string_in_quotes(value):
        """Wrap value in double quotes for use in a shell command string."""
        return '"' + str(value) + '"'

    @staticmethod
    def _get_time_value_pair(pair_string):
        """Parse '[ts, value]' into [int(ts), float(value)].

        example pair_string: '[1532544591, 97.3653601828]'
        """
        pair_string = pair_string.replace('[', '')
        pair_string = pair_string.replace(']', '')
        pair = pair_string.split(',')
        first = int(pair[0].strip())
        second = float(pair[1].strip())
        return [first, second]

    @staticmethod
    def _get_ods_cli_stime(start_time):
        """Convert an epoch start_time into the ODS CLI relative form,
        e.g. '3600_s' meaning 'starting 3600 seconds ago'.
        """
        diff = int(time.time() - int(start_time))
        stime = str(diff) + '_s'
        return stime

    def __init__(
        self, client, entities, start_time, end_time, key_prefix=None
    ):
        """
        client: path/name of the CLI tool; must contain 'rapido' or 'ods'
        entities: entity name(s) to query
        start_time, end_time: epoch bounds of the window to fetch
        key_prefix: optional prefix substituted for '[]' in condition keys
        """
        super().__init__()
        self.client = client
        self.entities = entities
        self.start_time = start_time
        self.end_time = end_time
        self.key_prefix = key_prefix
        self.stats_freq_sec = 60
        self.duration_sec = 60

    def execute_script(self, command):
        """Run command in a shell, capturing stdout/stderr to temp files."""
        print('executing...')
        print(command)
        # Fix: use context managers so the file handles are closed even if
        # subprocess.call raises; the originals leaked on exception.
        # NOTE(review): shell=True with an interpolated command string is
        # shell-injection prone; inputs here come from internal config, but
        # confirm before exposing to untrusted callers.
        with open(self.OUTPUT_FILE, "w+") as out_file, \
                open(self.ERROR_FILE, "w+") as err_file:
            subprocess.call(
                command, shell=True, stdout=out_file, stderr=err_file
            )

    def parse_rapido_output(self):
        """Populate keys_ts from the rapido client's output file.

        Output looks like the following:
        <entity_name>\t<key_name>\t[[ts, value], [ts, value], ...]
        ts = timestamp; value = value of key_name in entity_name at time ts
        """
        self.keys_ts = {}
        with open(self.OUTPUT_FILE, 'r') as fp:
            for line in fp:
                token_list = line.strip().split('\t')
                entity = token_list[0]
                key = token_list[1]
                if entity not in self.keys_ts:
                    self.keys_ts[entity] = {}
                if key not in self.keys_ts[entity]:
                    self.keys_ts[entity][key] = {}
                list_of_lists = [
                    self._get_time_value_pair(pair_string)
                    for pair_string in token_list[2].split('],')
                ]
                value = {pair[0]: pair[1] for pair in list_of_lists}
                self.keys_ts[entity][key] = value

    def parse_ods_output(self):
        """Populate keys_ts from the ods client's output file.

        Output looks like the following:
        <entity_name>\t<key_name>\t<timestamp>\t<value>
        there is one line per (entity_name, key_name, timestamp)
        """
        self.keys_ts = {}
        with open(self.OUTPUT_FILE, 'r') as fp:
            for line in fp:
                token_list = line.split()
                entity = token_list[0]
                if entity not in self.keys_ts:
                    self.keys_ts[entity] = {}
                key = token_list[1]
                if key not in self.keys_ts[entity]:
                    self.keys_ts[entity][key] = {}
                # Fix: convert to int/float so the stored types match
                # parse_rapido_output; previously raw strings were stored,
                # breaking numeric use of the timeseries downstream.
                self.keys_ts[entity][key][int(token_list[2])] = float(
                    token_list[3]
                )

    def fetch_timeseries(self, statistics):
        """Fetch the timeseries of the required stats from the ODS service
        and populate the 'keys_ts' object appropriately.
        """
        print('OdsStatsFetcher: fetching ' + str(statistics))
        if re.search('rapido', self.client, re.IGNORECASE):
            command = self.RAPIDO_COMMAND % (
                self.client,
                self._get_string_in_quotes(self.entities),
                self._get_string_in_quotes(','.join(statistics)),
                self._get_string_in_quotes(self.start_time),
                self._get_string_in_quotes(self.end_time)
            )
            # Run the tool and fetch the time-series data
            self.execute_script(command)
            # Parse output and populate the 'keys_ts' map
            self.parse_rapido_output()
        elif re.search('ods', self.client, re.IGNORECASE):
            command = (
                self.client + ' ' +
                '--stime=' + self._get_ods_cli_stime(self.start_time) + ' ' +
                self._get_string_in_quotes(self.entities) + ' ' +
                self._get_string_in_quotes(','.join(statistics))
            )
            # Run the tool and fetch the time-series data
            self.execute_script(command)
            # Parse output and populate the 'keys_ts' map
            self.parse_ods_output()
        # NOTE(review): if client matches neither tool, keys_ts is left
        # unpopulated (original behavior preserved).

    def get_keys_from_conditions(self, conditions):
        """Translate condition keys into the ODS key names to fetch,
        applying the '[]' -> key_prefix substitution where requested.
        """
        reqd_stats = []
        for cond in conditions:
            for key in cond.keys:
                use_prefix = False
                if key.startswith('[]'):
                    use_prefix = True
                    key = key[2:]
                # TODO(poojam23): this is very hacky and needs to be improved
                if key.startswith("rocksdb"):
                    key += ".60"
                if use_prefix:
                    if not self.key_prefix:
                        print('Warning: OdsStatsFetcher might need key prefix')
                        print('for the key: ' + key)
                    else:
                        key = self.key_prefix + "." + key
                reqd_stats.append(key)
        return reqd_stats

    def fetch_rate_url(self, entities, keys, window_len, percent, display):
        # type: (List[str], List[str], str, str, bool) -> str
        """Ask the client tool for a URL showing the rate-transformed
        timeseries of the given keys, and return that URL (first line of
        the tool's output).
        """
        transform_desc = (
            "rate(" + str(window_len) + ",duration=" + str(self.duration_sec)
        )
        if percent:
            transform_desc = transform_desc + ",%)"
        else:
            transform_desc = transform_desc + ")"
        if re.search('rapido', self.client, re.IGNORECASE):
            command = self.RAPIDO_COMMAND + " --transform=%s --url=%s"
            command = command % (
                self.client,
                self._get_string_in_quotes(','.join(entities)),
                self._get_string_in_quotes(','.join(keys)),
                self._get_string_in_quotes(self.start_time),
                self._get_string_in_quotes(self.end_time),
                self._get_string_in_quotes(transform_desc),
                self._get_string_in_quotes(display)
            )
        elif re.search('ods', self.client, re.IGNORECASE):
            command = (
                self.client + ' ' +
                '--stime=' + self._get_ods_cli_stime(self.start_time) + ' ' +
                '--fburlonly ' +
                self._get_string_in_quotes(entities) + ' ' +
                self._get_string_in_quotes(','.join(keys)) + ' ' +
                self._get_string_in_quotes(transform_desc)
            )
        else:
            # Fix: previously an unrecognized client fell through to an
            # undefined 'command' and raised NameError; fail explicitly.
            raise ValueError(
                'OdsStatsFetcher: unsupported client ' + str(self.client)
            )
        self.execute_script(command)
        url = ""
        with open(self.OUTPUT_FILE, 'r') as fp:
            url = fp.readline()
        return url