1# Copyright 2019, OpenCensus Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import itertools 16import logging 17 18from opencensus.common import utils 19from opencensus.common.schedule import PeriodicTask 20from opencensus.trace import execution_context 21 22logger = logging.getLogger(__name__) 23 24DEFAULT_INTERVAL = 60 25GRACE_PERIOD = 5 26 27 28class TransportError(Exception): 29 pass 30 31 32class PeriodicMetricTask(PeriodicTask): 33 """Thread that periodically calls a given function. 34 35 :type interval: int or float 36 :param interval: Seconds between calls to the function. 37 38 :type function: function 39 :param function: The function to call. 40 41 :type args: list 42 :param args: The args passed in while calling `function`. 43 44 :type kwargs: dict 45 :param args: The kwargs passed in while calling `function`. 46 47 :type name: str 48 :param name: The source of the worker. Used for naming. 49 """ 50 51 daemon = True 52 53 def __init__( 54 self, 55 interval=None, 56 function=None, 57 args=None, 58 kwargs=None, 59 name=None 60 ): 61 if interval is None: 62 interval = DEFAULT_INTERVAL 63 64 self.func = function 65 self.args = args 66 self.kwargs = kwargs 67 68 def func(*aa, **kw): 69 try: 70 return self.func(*aa, **kw) 71 except TransportError as ex: 72 logger.exception(ex) 73 self.cancel() 74 except Exception as ex: 75 logger.exception("Error handling metric export: {}".format(ex)) 76 77 super(PeriodicMetricTask, self).__init__( 78 interval, func, args, kwargs, '{} Worker'.format(name) 79 ) 80 81 def run(self): 82 # Indicate that this thread is an exporter thread. 83 # Used to suppress tracking of requests in this thread 84 execution_context.set_is_exporter(True) 85 super(PeriodicMetricTask, self).run() 86 87 def close(self): 88 try: 89 # Suppress request tracking on flush 90 execution_context.set_is_exporter(True) 91 self.func(*self.args, **self.kwargs) 92 execution_context.set_is_exporter(False) 93 except Exception as ex: 94 logger.exception("Error handling metric flush: {}".format(ex)) 95 self.cancel() 96 97 98def get_exporter_thread(metric_producers, exporter, interval=None): 99 """Get a running task that periodically exports metrics. 100 101 Get a `PeriodicTask` that periodically calls: 102 103 export(itertools.chain(*all_gets)) 104 105 where all_gets is the concatenation of all metrics produced by the metric 106 producers in metric_producers, each calling metric_producer.get_metrics() 107 108 :type metric_producers: 109 list(:class:`opencensus.metrics.export.metric_producer.MetricProducer`) 110 :param metric_producers: The list of metric producers to use to get metrics 111 112 :type exporter: :class:`opencensus.stats.base_exporter.MetricsExporter` 113 :param exporter: The exporter to use to export metrics. 114 115 :type interval: int or float 116 :param interval: Seconds between export calls. 117 118 :rtype: :class:`PeriodicTask` 119 :return: A running thread responsible calling the exporter. 120 121 """ 122 weak_gets = [utils.get_weakref(producer.get_metrics) 123 for producer in metric_producers] 124 weak_export = utils.get_weakref(exporter.export_metrics) 125 126 def export_all(): 127 all_gets = [] 128 for weak_get in weak_gets: 129 get = weak_get() 130 if get is None: 131 raise TransportError("Metric producer is not available") 132 all_gets.append(get()) 133 export = weak_export() 134 if export is None: 135 raise TransportError("Metric exporter is not available") 136 137 export(itertools.chain(*all_gets)) 138 139 tt = PeriodicMetricTask( 140 interval, 141 export_all, 142 name=exporter.__class__.__name__ 143 ) 144 tt.start() 145 return tt 146