1# Copyright 2019, OpenCensus Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import itertools
16import logging
17
18from opencensus.common import utils
19from opencensus.common.schedule import PeriodicTask
20from opencensus.trace import execution_context
21
22logger = logging.getLogger(__name__)
23
# Fallback number of seconds between calls, used by PeriodicMetricTask when
# no interval is supplied.
DEFAULT_INTERVAL = 60
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably a number of seconds consumed by callers elsewhere -- confirm.
GRACE_PERIOD = 5
26
27
class TransportError(Exception):
    """Signal that metrics can no longer be transported.

    Raised by the export callback when a metric producer or the exporter is
    no longer available; `PeriodicMetricTask` reacts to it by cancelling
    itself.
    """
30
31
class PeriodicMetricTask(PeriodicTask):
    """Thread that periodically calls a given function.

    Exceptions raised by the function are logged instead of propagated. A
    :class:`TransportError` additionally causes the task to call
    ``self.cancel()``.

    :type interval: int or float
    :param interval: Seconds between calls to the function. Falls back to
        ``DEFAULT_INTERVAL`` when ``None``.

    :type function: function
    :param function: The function to call.

    :type args: list
    :param args: The args passed in while calling `function`.

    :type kwargs: dict
    :param kwargs: The kwargs passed in while calling `function`.

    :type name: str
    :param name: The source of the worker. Used for naming.
    """

    daemon = True

    def __init__(
        self,
        interval=None,
        function=None,
        args=None,
        kwargs=None,
        name=None
    ):
        if interval is None:
            interval = DEFAULT_INTERVAL

        # Keep the raw callable and its arguments so close() can perform a
        # final, direct call outside of the periodic schedule.
        self.func = function
        self.args = args
        self.kwargs = kwargs

        def func(*aa, **kw):
            # Wrapper handed to the base class: it never lets an exception
            # escape from a scheduled call.
            try:
                return self.func(*aa, **kw)
            except TransportError as ex:
                # The transport is gone; stop scheduling further calls.
                logger.exception(ex)
                self.cancel()
            except Exception as ex:
                logger.exception("Error handling metric export: {}".format(ex))

        super(PeriodicMetricTask, self).__init__(
            interval, func, args, kwargs, '{} Worker'.format(name)
        )

    def run(self):
        """Mark the current thread as an exporter thread, then run the task."""
        # Indicate that this thread is an exporter thread.
        # Used to suppress tracking of requests in this thread
        execution_context.set_is_exporter(True)
        super(PeriodicMetricTask, self).run()

    def close(self):
        """Call the function one last time (flush), then cancel the task.

        Any exception raised during the flush is logged and swallowed, so
        ``self.cancel()`` is always reached.
        """
        try:
            # Suppress request tracking on flush
            execution_context.set_is_exporter(True)
            try:
                # args/kwargs default to None in __init__; treat None as
                # "no arguments" instead of failing on the unpacking.
                self.func(*(self.args or ()), **(self.kwargs or {}))
            finally:
                # Clear the flag even when the flush raises, so the calling
                # thread is not left marked as an exporter thread.
                execution_context.set_is_exporter(False)
        except Exception as ex:
            logger.exception("Error handling metric flush: {}".format(ex))
        self.cancel()
96
97
def get_exporter_thread(metric_producers, exporter, interval=None):
    """Start and return a task that periodically exports metrics.

    On every tick the task calls ``get_metrics()`` on each producer, chains
    the results into a single iterable and hands it to
    ``exporter.export_metrics``.

    The producers' ``get_metrics`` and the exporter's ``export_metrics`` are
    held through references obtained from ``utils.get_weakref``. If any of
    them resolves to ``None`` at export time, a :class:`TransportError` is
    raised inside the task.

    :type metric_producers:
    list(:class:`opencensus.metrics.export.metric_producer.MetricProducer`)
    :param metric_producers: The metric producers to collect metrics from.

    :type exporter: :class:`opencensus.stats.base_exporter.MetricsExporter`
    :param exporter: The exporter used to export the collected metrics.

    :type interval: int or float
    :param interval: Seconds between export calls.

    :rtype: :class:`PeriodicTask`
    :return: The already-started task responsible for calling the exporter.

    """
    producer_refs = [
        utils.get_weakref(source.get_metrics) for source in metric_producers
    ]
    exporter_ref = utils.get_weakref(exporter.export_metrics)

    def export_all():
        # Collect from every producer first; all producers are queried before
        # the exporter reference is resolved.
        collected = []
        for ref in producer_refs:
            get_metrics = ref()
            if get_metrics is None:
                raise TransportError("Metric producer is not available")
            collected.append(get_metrics())

        export_metrics = exporter_ref()
        if export_metrics is None:
            raise TransportError("Metric exporter is not available")

        export_metrics(itertools.chain.from_iterable(collected))

    worker_name = exporter.__class__.__name__
    task = PeriodicMetricTask(interval, export_all, name=worker_name)
    task.start()
    return task
146