1# Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2#  This source code is licensed under both the GPLv2 (found in the
3#  COPYING file in the root directory) and Apache 2.0 License
4#  (found in the LICENSE.Apache file in the root directory).
5
6from advisor.db_log_parser import Log
7from advisor.db_timeseries_parser import TimeSeriesData, NO_ENTITY
8import copy
9import glob
10import re
11import subprocess
12import time
13
14
class LogStatsParser(TimeSeriesData):
    """Generates timeseries of Rocksdb statistics from the Rocksdb LOG files.

    Rocksdb periodically (every stats_freq_sec) dumps its statistics into
    the LOG file, prefixed by the 'STATISTICS:' marker; this class parses
    those dumps into the inherited 'keys_ts' map.
    """
    STATS = 'STATISTICS:'

    @staticmethod
    def parse_log_line_for_stats(log_line):
        """Parse one statistics line into a dict of {stat_name: float value}.

        Example stat line (from LOG file):
        "rocksdb.db.get.micros P50 : 8.4 P95 : 21.8 P99 : 33.9 P100 : 92.0"
        returns {'rocksdb.db.get.micros.p50': 8.4,
        'rocksdb.db.get.micros.p95': 21.8, 'rocksdb.db.get.micros.p99': 33.9,
        'rocksdb.db.get.micros.p100': 92.0}
        """
        token_list = log_line.strip().split()
        # blank or malformed lines carry no stats; previously an empty line
        # raised IndexError on token_list[0] below
        if len(token_list) < 3:
            return {}
        stat_prefix = token_list[0] + '.'  # 'rocksdb.db.get.micros.'
        # drop the ':' separators, leaving alternating name/value tokens:
        # ['P50', '8.4', 'P95', '21.8', 'P99', '33.9', 'P100', '92.0']
        stat_values = [
            token
            for token in token_list[1:]
            if token != ':'
        ]
        stat_dict = {}
        # pair up (name, value) tokens; a trailing unpaired name is ignored,
        # matching the old even/odd-index behavior
        for suffix, value in zip(stat_values[::2], stat_values[1::2]):
            # Note: case insensitive stat names
            stat_name = (stat_prefix + suffix).lower()
            stat_dict[stat_name] = float(value)
        return stat_dict

    def __init__(self, logs_path_prefix, stats_freq_sec):
        """
        logs_path_prefix: path prefix shared by the LOG files to be parsed
        stats_freq_sec: frequency (in seconds) at which Rocksdb dumps its
        statistics into the LOG file
        """
        super().__init__()
        self.logs_file_prefix = logs_path_prefix
        self.stats_freq_sec = stats_freq_sec
        self.duration_sec = 60

    def get_keys_from_conditions(self, conditions):
        """Return the (lowercased) stat names required by the conditions."""
        # Note: case insensitive stat names
        reqd_stats = []
        for cond in conditions:
            for key in cond.keys:
                key = key.lower()
                # some keys are prepended with '[]' for OdsStatsFetcher to
                # replace this with the appropriate key_prefix, remove these
                # characters here since the LogStatsParser does not need
                # a prefix
                if key.startswith('[]'):
                    reqd_stats.append(key[2:])
                else:
                    reqd_stats.append(key)
        return reqd_stats

    def add_to_timeseries(self, log, reqd_stats):
        # this method takes in the Log object that contains the Rocksdb stats
        # and a list of required stats, then it parses the stats line by line
        # to fetch required stats and add them to the keys_ts object
        # Example: reqd_stats = ['rocksdb.block.cache.hit.count',
        # 'rocksdb.db.get.micros.p99']
        # Let log.get_message() returns following string:
        # "[WARN] [db/db_impl.cc:485] STATISTICS:\n
        # rocksdb.block.cache.miss COUNT : 1459\n
        # rocksdb.block.cache.hit COUNT : 37\n
        # ...
        # rocksdb.db.get.micros P50 : 15.6 P95 : 39.7 P99 : 62.6 P100 : 148.0\n
        # ..."
        new_lines = log.get_message().split('\n')
        # let log_ts = 1532518219
        log_ts = log.get_timestamp()
        # example updates to keys_ts:
        # keys_ts[NO_ENTITY]['rocksdb.db.get.micros.p99'][1532518219] = 62.6
        # keys_ts[NO_ENTITY]['rocksdb.block.cache.hit.count'][1532518219] = 37
        for line in new_lines[1:]:  # new_lines[0] does not contain any stats
            stats_on_line = self.parse_log_line_for_stats(line)
            for stat in stats_on_line:
                if stat in reqd_stats:
                    if stat not in self.keys_ts[NO_ENTITY]:
                        self.keys_ts[NO_ENTITY][stat] = {}
                    self.keys_ts[NO_ENTITY][stat][log_ts] = stats_on_line[stat]

    def fetch_timeseries(self, reqd_stats):
        # this method parses the Rocksdb LOG file and generates timeseries for
        # each of the statistic in the list reqd_stats
        self.keys_ts = {NO_ENTITY: {}}
        for file_name in glob.glob(self.logs_file_prefix + '*'):
            # TODO(poojam23): find a way to distinguish between 'old' log files
            # from current and previous experiments, present in the same
            # directory
            if re.search('old', file_name, re.IGNORECASE):
                continue
            with open(file_name, 'r') as db_logs:
                new_log = None
                for line in db_logs:
                    if Log.is_new_log(line):
                        if (
                            new_log and
                            re.search(self.STATS, new_log.get_message())
                        ):
                            self.add_to_timeseries(new_log, reqd_stats)
                        new_log = Log(line, column_families=[])
                    elif new_log:
                        # To account for logs split into multiple lines; a
                        # continuation line seen before any log header is
                        # skipped (previously it raised AttributeError on
                        # None)
                        new_log.append_message(line)
            # Check for the last log in the file.
            if new_log and re.search(self.STATS, new_log.get_message()):
                self.add_to_timeseries(new_log, reqd_stats)
120
121
class DatabasePerfContext(TimeSeriesData):
    # TODO(poojam23): check if any benchrunner provides PerfContext sampled at
    # regular intervals
    def __init__(self, perf_context_ts, stats_freq_sec, cumulative):
        """
        perf_context_ts: Dict[metric, Dict[timestamp, value]]; each
        (metric, timestamp) value is database-wide, i.e. already summed
        over all the threads involved
        stats_freq_sec: sampling frequency; 0 means only one value is
        reported per metric
        cumulative: if True, the provided values are running totals and are
        converted here into per-interval deltas
        """
        super().__init__()
        self.stats_freq_sec = stats_freq_sec
        self.keys_ts = {NO_ENTITY: perf_context_ts}
        if cumulative:
            self.unaccumulate_metrics()

    def unaccumulate_metrics(self):
        """Turn cumulative metric values into disjoint per-interval deltas.

        Each timestamp's value becomes its difference from the previous
        timestamp's value; the earliest timestamp, having no predecessor,
        is dropped from the series.
        """
        deltas = copy.deepcopy(self.keys_ts)
        for metric, series in self.keys_ts[NO_ENTITY].items():
            ordered_ts = sorted(series.keys())
            # nothing to diff when the series has fewer than two samples
            if len(ordered_ts) < 2:
                continue
            for prev_ts, curr_ts in zip(ordered_ts, ordered_ts[1:]):
                delta = series[curr_ts] - series[prev_ts]
                # a genuinely cumulative counter can never decrease
                if delta < 0:
                    raise ValueError('DBPerfContext: really cumulative?')
                deltas[NO_ENTITY][metric][curr_ts] = delta
            # drop the smallest timestamp in the timeseries for this metric
            deltas[NO_ENTITY][metric].pop(ordered_ts[0])
        self.keys_ts = deltas

    def get_keys_from_conditions(self, conditions):
        # metric names are matched case-insensitively, hence the lower()
        required = []
        for condition in conditions:
            required.extend(stat.lower() for stat in condition.keys)
        return required

    def fetch_timeseries(self, statistics):
        # nothing to fetch here: __init__ already populated 'keys_ts'
        pass
170
171
class OdsStatsFetcher(TimeSeriesData):
    """Fetches stats timeseries from the ODS service via a CLI client.

    The client (rapido or the ods CLI) is run as a subprocess; its stdout
    is captured in OUTPUT_FILE and parsed into the inherited 'keys_ts'
    map: Dict[entity, Dict[key, Dict[timestamp, value]]].
    """
    # class constants
    OUTPUT_FILE = 'temp/stats_out.tmp'
    ERROR_FILE = 'temp/stats_err.tmp'
    RAPIDO_COMMAND = "%s --entity=%s --key=%s --tstart=%s --tend=%s --showtime"

    # static methods
    @staticmethod
    def _get_string_in_quotes(value):
        # wrap the value in double-quotes so it survives shell word-splitting
        return '"' + str(value) + '"'

    @staticmethod
    def _get_time_value_pair(pair_string):
        """Parse '[ts, value]' into [int(ts), float(value)]."""
        # example pair_string: '[1532544591, 97.3653601828]'
        pair_string = pair_string.replace('[', '')
        pair_string = pair_string.replace(']', '')
        pair = pair_string.split(',')
        first = int(pair[0].strip())
        second = float(pair[1].strip())
        return [first, second]

    @staticmethod
    def _get_ods_cli_stime(start_time):
        # the ods CLI takes a relative start time: '<seconds-before-now>_s'
        diff = int(time.time() - int(start_time))
        stime = str(diff) + '_s'
        return stime

    def __init__(
        self, client, entities, start_time, end_time, key_prefix=None
    ):
        """
        client: the rapido or ods CLI binary used to query ODS
        entities: the entities for which stats are fetched
        start_time, end_time: epoch bounds of the window to fetch
        key_prefix: optional prefix substituted for '[]' in condition keys
        """
        super().__init__()
        self.client = client
        self.entities = entities
        self.start_time = start_time
        self.end_time = end_time
        self.key_prefix = key_prefix
        self.stats_freq_sec = 60
        self.duration_sec = 60

    def execute_script(self, command):
        """Run 'command' in a shell; capture stdout/stderr in temp files."""
        print('executing...')
        print(command)
        # context managers ensure the temp files are closed even if the
        # subprocess call raises (previously they leaked on error)
        # NOTE(review): shell=True on an interpolated command string is
        # injection-prone if entities/keys ever come from untrusted input;
        # they appear to be operator-provided here
        with open(self.OUTPUT_FILE, "w+") as out_file, \
                open(self.ERROR_FILE, "w+") as err_file:
            subprocess.call(
                command, shell=True, stdout=out_file, stderr=err_file
            )

    def parse_rapido_output(self):
        # Output looks like the following:
        # <entity_name>\t<key_name>\t[[ts, value], [ts, value], ...]
        # ts = timestamp; value = value of key_name in entity_name at time ts
        self.keys_ts = {}
        with open(self.OUTPUT_FILE, 'r') as fp:
            for line in fp:
                token_list = line.strip().split('\t')
                entity = token_list[0]
                key = token_list[1]
                if entity not in self.keys_ts:
                    self.keys_ts[entity] = {}
                list_of_lists = [
                    self._get_time_value_pair(pair_string)
                    for pair_string in token_list[2].split('],')
                ]
                # a repeated (entity, key) line overwrites the earlier one;
                # the old per-key '{}' init was dead code and is removed
                self.keys_ts[entity][key] = {
                    pair[0]: pair[1] for pair in list_of_lists
                }

    def parse_ods_output(self):
        # Output looks like the following:
        # <entity_name>\t<key_name>\t<timestamp>\t<value>
        # there is one line per (entity_name, key_name, timestamp)
        self.keys_ts = {}
        with open(self.OUTPUT_FILE, 'r') as fp:
            for line in fp:
                token_list = line.split()
                entity = token_list[0]
                if entity not in self.keys_ts:
                    self.keys_ts[entity] = {}
                key = token_list[1]
                if key not in self.keys_ts[entity]:
                    self.keys_ts[entity][key] = {}
                # NOTE(review): timestamp and value are stored as strings
                # here, unlike parse_rapido_output which converts them to
                # int/float; downstream consumers may depend on this, so it
                # is left unchanged — confirm before unifying
                self.keys_ts[entity][key][token_list[2]] = token_list[3]

    def fetch_timeseries(self, statistics):
        # this method fetches the timeseries of required stats from the ODS
        # service and populates the 'keys_ts' object appropriately
        print('OdsStatsFetcher: fetching ' + str(statistics))
        if re.search('rapido', self.client, re.IGNORECASE):
            command = self.RAPIDO_COMMAND % (
                self.client,
                self._get_string_in_quotes(self.entities),
                self._get_string_in_quotes(','.join(statistics)),
                self._get_string_in_quotes(self.start_time),
                self._get_string_in_quotes(self.end_time)
            )
            # Run the tool and fetch the time-series data
            self.execute_script(command)
            # Parse output and populate the 'keys_ts' map
            self.parse_rapido_output()
        elif re.search('ods', self.client, re.IGNORECASE):
            command = (
                self.client + ' ' +
                '--stime=' + self._get_ods_cli_stime(self.start_time) + ' ' +
                self._get_string_in_quotes(self.entities) + ' ' +
                self._get_string_in_quotes(','.join(statistics))
            )
            # Run the tool and fetch the time-series data
            self.execute_script(command)
            # Parse output and populate the 'keys_ts' map
            self.parse_ods_output()

    def get_keys_from_conditions(self, conditions):
        """Return the ODS key names required by the given conditions."""
        reqd_stats = []
        for cond in conditions:
            for key in cond.keys:
                use_prefix = False
                if key.startswith('[]'):
                    use_prefix = True
                    key = key[2:]
                # TODO(poojam23): this is very hacky and needs to be improved
                if key.startswith("rocksdb"):
                    key += ".60"
                if use_prefix:
                    if not self.key_prefix:
                        print('Warning: OdsStatsFetcher might need key prefix')
                        print('for the key: ' + key)
                    else:
                        key = self.key_prefix + "." + key
                reqd_stats.append(key)
        return reqd_stats

    def fetch_rate_url(self, entities, keys, window_len, percent, display):
        # type: (List[str], List[str], str, str, bool) -> str
        """Run the client to build an ODS URL for the rate transform of keys.

        window_len: width of the rate window; percent: if truthy, request
        the rate as a percentage; display: rapido display argument.
        Returns the first line of the client's output (the URL).
        """
        transform_desc = (
            "rate(" + str(window_len) + ",duration=" + str(self.duration_sec)
        )
        if percent:
            transform_desc = transform_desc + ",%)"
        else:
            transform_desc = transform_desc + ")"
        if re.search('rapido', self.client, re.IGNORECASE):
            command = self.RAPIDO_COMMAND + " --transform=%s --url=%s"
            command = command % (
                self.client,
                self._get_string_in_quotes(','.join(entities)),
                self._get_string_in_quotes(','.join(keys)),
                self._get_string_in_quotes(self.start_time),
                self._get_string_in_quotes(self.end_time),
                self._get_string_in_quotes(transform_desc),
                self._get_string_in_quotes(display)
            )
        elif re.search('ods', self.client, re.IGNORECASE):
            command = (
                self.client + ' ' +
                '--stime=' + self._get_ods_cli_stime(self.start_time) + ' ' +
                '--fburlonly ' +
                self._get_string_in_quotes(entities) + ' ' +
                self._get_string_in_quotes(','.join(keys)) + ' ' +
                self._get_string_in_quotes(transform_desc)
            )
        # NOTE(review): if self.client matches neither 'rapido' nor 'ods',
        # 'command' is unbound and the next line raises UnboundLocalError,
        # whereas fetch_timeseries silently does nothing in that case —
        # confirm which behavior is intended before unifying
        self.execute_script(command)
        url = ""
        with open(self.OUTPUT_FILE, 'r') as fp:
            url = fp.readline()
        return url
339