1"""Copyright 2008 Orbitz WorldWide
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7     http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License."""
14from collections import defaultdict
15from structlog import get_logger
16
17from ..utils import epoch
18
19logger = get_logger()
20
21
22class TimeSeries(list):
23    def __init__(self, name, start, end, step, values, consolidate='average'):
24        list.__init__(self, values)
25        self.name = name
26        self.start = start
27        self.end = end
28        self.step = step
29        self.consolidationFunc = consolidate
30        self.valuesPerPoint = 1
31        self.options = {}
32
33    def __iter__(self):
34        if self.valuesPerPoint > 1:
35            return self.__consolidatingGenerator(list.__iter__(self))
36        else:
37            return list.__iter__(self)
38
39    def consolidate(self, valuesPerPoint):
40        self.valuesPerPoint = int(valuesPerPoint)
41
42    def __consolidatingGenerator(self, gen):
43        buf = []
44        for x in gen:
45            buf.append(x)
46            if len(buf) == self.valuesPerPoint:
47                while None in buf:
48                    buf.remove(None)
49                if buf:
50                    yield self.__consolidate(buf)
51                    buf = []
52                else:
53                    yield None
54        while None in buf:
55            buf.remove(None)
56        if buf:
57            yield self.__consolidate(buf)
58        else:
59            yield None
60        return
61
62    def __consolidate(self, values):
63        usable = [v for v in values if v is not None]
64        if not usable:
65            return None
66        if self.consolidationFunc == 'sum':
67            return sum(usable)
68        if self.consolidationFunc == 'average':
69            return float(sum(usable)) / len(usable)
70        if self.consolidationFunc == 'max':
71            return max(usable)
72        if self.consolidationFunc == 'min':
73            return min(usable)
74        raise Exception(
75            "Invalid consolidation function: '%s'" % self.consolidationFunc)
76
77    def __repr__(self):
78        return 'TimeSeries(name=%s, start=%s, end=%s, step=%s)' % (
79            self.name, self.start, self.end, self.step)
80
81
82class DataStore(object):
83    """
84    Simple object to store results of multi fetches.
85    Also aids in looking up data by pathExpressions.
86    """
87    def __init__(self):
88        self.paths = defaultdict(set)
89        self.data = defaultdict(list)
90
91    def get_paths(self, path_expr):
92        """
93        Returns all paths found for path_expr
94        """
95        return sorted(self.paths[path_expr])
96
97    def add_data(self, path, time_info, data, exprs):
98        """
99        Stores data before it can be put into a time series
100        """
101        # Dont add if empty
102        if not nonempty(data):
103            for d in self.data[path]:
104                if nonempty(d['values']):
105                    return
106
107        # Add data to path
108        for expr in exprs:
109            self.paths[expr].add(path)
110        self.data[path].append({
111            'time_info': time_info,
112            'values': data
113        })
114
115    def get_series_list(self, path_expr):
116        series_list = []
117        for path in self.get_paths(path_expr):
118            for data in self.data.get(path):
119                start, end, step = data['time_info']
120                series = TimeSeries(path, start, end, step, data['values'])
121                series.pathExpression = path_expr
122                series_list.append(series)
123        return series_list
124
125
126def fetchData(requestContext, pathExprs):
127    from ..app import app
128    startTime = int(epoch(requestContext['startTime']))
129    endTime = int(epoch(requestContext['endTime']))
130
131    # Convert to list if given single path
132    if not isinstance(pathExprs, list):
133        pathExprs = [pathExprs]
134
135    data_store = DataStore()
136    multi_nodes = defaultdict(list)
137    single_nodes = []
138
139    path_to_exprs = defaultdict(list)
140
141    # Group nodes that support multiple fetches
142    for pathExpr in pathExprs:
143        for node in app.store.find(pathExpr, startTime, endTime):
144            if not node.is_leaf:
145                continue
146            if node.path not in path_to_exprs:
147                if hasattr(node, '__fetch_multi__'):
148                    multi_nodes[node.__fetch_multi__].append(node)
149                else:
150                    single_nodes.append(node)
151            path_to_exprs[node.path].append(pathExpr)
152
153    # Multi fetches
154    for finder in app.store.finders:
155        if not hasattr(finder, '__fetch_multi__'):
156            continue
157        nodes = multi_nodes[finder.__fetch_multi__]
158        if not nodes:
159            continue
160        time_info, series = finder.fetch_multi(nodes, startTime, endTime)
161        for path, values in series.items():
162            data_store.add_data(path, time_info, values,
163                                path_to_exprs[path])
164
165    # Single fetches
166    fetches = [
167        (node, node.fetch(startTime, endTime)) for node in single_nodes]
168    for node, results in fetches:
169        if not results:
170            logger.info("no results", node=node, start=startTime,
171                        end=endTime)
172            continue
173
174        try:
175            time_info, values = results
176        except ValueError as e:
177            raise Exception("could not parse timeInfo/values from metric "
178                            "'%s': %s" % (node.path, e))
179        data_store.add_data(node.path, time_info, values,
180                            path_to_exprs[node.path])
181
182    return data_store
183
184
185def nonempty(series):
186    for value in series:
187        if value is not None:
188            return True
189    return False
190