import logging

from distributed.http.prometheus import PrometheusCollector
from distributed.http.utils import RequestHandler


class WorkerMetricCollector(PrometheusCollector):
    """Prometheus collector exposing gauges about a single worker.

    Every value is read from ``self.server`` (the object passed to
    ``__init__``) each time ``collect`` is called, i.e. on every scrape.
    """

    def __init__(self, server):
        super().__init__(server)
        self.logger = logging.getLogger("distributed.dask_worker")
        self.subsystem = "worker"
        # Digest-based gauges need crick. Detect it once here so ``collect``
        # can skip those gauges instead of failing on every scrape.
        self.crick_available = True
        try:
            import crick  # noqa: F401
        except ImportError:
            self.crick_available = False
            self.logger.info(
                "Not all prometheus metrics available are exported. Digest-based metrics require crick to be installed"
            )

    def _digest_median(self, name):
        """Return the median of the worker digest stored under ``name``.

        ``components[1]`` selects one of the digest's component sketches
        (presumably one of its rolling time windows; confirm against
        ``distributed.counter.Digest``). crick's ``TDigest.quantile``
        expects ``q`` in ``[0, 1]``, so the median is ``q=0.5``. A value
        such as ``50`` lies outside that range and does not give the median.
        """
        return self.server.digests[name].components[1].quantile(0.5)

    def collect(self):
        """Yield the current worker gauges as Prometheus metric families."""
        from prometheus_client.core import GaugeMetricFamily

        tasks = GaugeMetricFamily(
            self.build_name("tasks"),
            "Number of tasks at worker.",
            labels=["state"],
        )
        tasks.add_metric(["stored"], len(self.server.data))
        tasks.add_metric(["executing"], self.server.executing_count)
        tasks.add_metric(["ready"], len(self.server.ready))
        tasks.add_metric(["waiting"], self.server.waiting_for_data_count)
        yield tasks

        yield GaugeMetricFamily(
            self.build_name("concurrent_fetch_requests"),
            "Number of open fetch requests to other workers.",
            value=len(self.server.in_flight_workers),
        )

        yield GaugeMetricFamily(
            self.build_name("threads"),
            "Number of worker threads.",
            value=self.server.nthreads,
        )

        yield GaugeMetricFamily(
            self.build_name("latency_seconds"),
            "Latency of worker connection.",
            value=self.server.latency,
        )

        # All digest-based gauges require crick (checked in __init__).
        # NOTE(review): an empty digest is expected to report NaN rather
        # than raise; confirm against crick's TDigest behaviour.
        if self.crick_available:
            yield GaugeMetricFamily(
                self.build_name("tick_duration_median_seconds"),
                "Median tick duration at worker.",
                value=self._digest_median("tick-duration"),
            )

            yield GaugeMetricFamily(
                self.build_name("task_duration_median_seconds"),
                "Median task runtime at worker.",
                value=self._digest_median("task-duration"),
            )

            yield GaugeMetricFamily(
                self.build_name("transfer_bandwidth_median_bytes"),
                "Bandwidth for transfer at worker in Bytes.",
                value=self._digest_median("transfer-bandwidth"),
            )


class PrometheusHandler(RequestHandler):
    """HTTP handler that serves the process-wide Prometheus registry."""

    # Process-wide flag: the collector is registered with the global
    # registry only by the first handler ever constructed.
    _initialized = False

    def __init__(self, *args, **kwargs):
        import prometheus_client

        super().__init__(*args, **kwargs)

        # Tornado builds a new handler per request. Without this guard, each
        # request would try to register another collector in the global
        # REGISTRY.
        # NOTE(review): only the first ``self.server`` seen in this process
        # is ever exported; other servers sharing the process are not.
        if PrometheusHandler._initialized:
            return

        prometheus_client.REGISTRY.register(WorkerMetricCollector(self.server))

        PrometheusHandler._initialized = True

    def get(self):
        """Write the latest exposition of the global registry."""
        import prometheus_client

        self.write(prometheus_client.generate_latest())
        self.set_header("Content-Type", "text/plain; version=0.0.4")