1from __future__ import absolute_import 2 3import logging 4import threading 5 6from kafka.metrics.metrics_reporter import AbstractMetricsReporter 7 8logger = logging.getLogger(__name__) 9 10 11class DictReporter(AbstractMetricsReporter): 12 """A basic dictionary based metrics reporter. 13 14 Store all metrics in a two level dictionary of category > name > metric. 15 """ 16 def __init__(self, prefix=''): 17 self._lock = threading.Lock() 18 self._prefix = prefix if prefix else '' # never allow None 19 self._store = {} 20 21 def snapshot(self): 22 """ 23 Return a nested dictionary snapshot of all metrics and their 24 values at this time. Example: 25 { 26 'category': { 27 'metric1_name': 42.0, 28 'metric2_name': 'foo' 29 } 30 } 31 """ 32 return dict((category, dict((name, metric.value()) 33 for name, metric in list(metrics.items()))) 34 for category, metrics in 35 list(self._store.items())) 36 37 def init(self, metrics): 38 for metric in metrics: 39 self.metric_change(metric) 40 41 def metric_change(self, metric): 42 with self._lock: 43 category = self.get_category(metric) 44 if category not in self._store: 45 self._store[category] = {} 46 self._store[category][metric.metric_name.name] = metric 47 48 def metric_removal(self, metric): 49 with self._lock: 50 category = self.get_category(metric) 51 metrics = self._store.get(category, {}) 52 removed = metrics.pop(metric.metric_name.name, None) 53 if not metrics: 54 self._store.pop(category, None) 55 return removed 56 57 def get_category(self, metric): 58 """ 59 Return a string category for the metric. 60 61 The category is made up of this reporter's prefix and the 62 metric's group and tags. 63 64 Examples: 65 prefix = 'foo', group = 'bar', tags = {'a': 1, 'b': 2} 66 returns: 'foo.bar.a=1,b=2' 67 68 prefix = 'foo', group = 'bar', tags = None 69 returns: 'foo.bar' 70 71 prefix = None, group = 'bar', tags = None 72 returns: 'bar' 73 """ 74 tags = ','.join('%s=%s' % (k, v) for k, v in 75 sorted(metric.metric_name.tags.items())) 76 return '.'.join(x for x in 77 [self._prefix, metric.metric_name.group, tags] if x) 78 79 def configure(self, configs): 80 pass 81 82 def close(self): 83 pass 84