1import logging
2
3from distributed.http.prometheus import PrometheusCollector
4from distributed.http.utils import RequestHandler
5
6
class WorkerMetricCollector(PrometheusCollector):
    """Prometheus collector that reports gauges read from ``self.server``.

    The always-available gauges cover task counts per state, open fetch
    requests, thread count and connection latency.  Three additional
    median gauges are derived from ``self.server.digests`` and are only
    emitted when the optional ``crick`` package could be imported.
    """

    def __init__(self, server):
        """Set up the collector and probe for the optional ``crick`` package.

        Parameters
        ----------
        server
            Object forwarded to the parent collector; ``collect`` later
            reads its attributes through ``self.server``.
        """
        super().__init__(server)
        self.logger = logging.getLogger("distributed.dask_worker")
        self.subsystem = "worker"
        # The import is only a probe: its outcome decides whether the
        # digest-based gauges are exported by ``collect``.
        try:
            import crick  # noqa: F401
        except ImportError:
            self.crick_available = False
            self.logger.info(
                "Not all prometheus metrics available are exported. Digest-based metrics require crick to be installed"
            )
        else:
            self.crick_available = True

    def collect(self):
        """Yield one ``GaugeMetricFamily`` per exported worker metric.

        The values are read from ``self.server`` lazily, i.e. each gauge
        is computed right before it is yielded.
        """
        from prometheus_client.core import GaugeMetricFamily

        # One labelled gauge holding a sample per task state.
        task_gauge = GaugeMetricFamily(
            self.build_name("tasks"),
            "Number of tasks at worker.",
            labels=["state"],
        )
        state_counts = (
            ("stored", len(self.server.data)),
            ("executing", self.server.executing_count),
            ("ready", len(self.server.ready)),
            ("waiting", self.server.waiting_for_data_count),
        )
        for state, count in state_counts:
            task_gauge.add_metric([state], count)
        yield task_gauge

        fetch_name = self.build_name("concurrent_fetch_requests")
        open_fetches = len(self.server.in_flight_workers)
        yield GaugeMetricFamily(
            fetch_name,
            "Number of open fetch requests to other workers.",
            value=open_fetches,
        )

        threads_name = self.build_name("threads")
        thread_count = self.server.nthreads
        yield GaugeMetricFamily(
            threads_name,
            "Number of worker threads.",
            value=thread_count,
        )

        latency_name = self.build_name("latency_seconds")
        latency = self.server.latency
        yield GaugeMetricFamily(
            latency_name,
            "Latency of worker connection.",
            value=latency,
        )

        # All metrics using digests require crick to be installed.
        if not self.crick_available:
            return

        # (metric name, help text, key into ``self.server.digests``)
        # NOTE(review): the original author states these gauges export NaN
        # when the corresponding digest has nothing to report -- confirm
        # against crick's quantile behavior.
        digest_gauges = (
            (
                "tick_duration_median_seconds",
                "Median tick duration at worker.",
                "tick-duration",
            ),
            (
                "task_duration_median_seconds",
                "Median task runtime at worker.",
                "task-duration",
            ),
            (
                "transfer_bandwidth_median_bytes",
                "Bandwidth for transfer at worker in Bytes.",
                "transfer-bandwidth",
            ),
        )
        for metric, description, digest_key in digest_gauges:
            gauge_name = self.build_name(metric)
            digest = self.server.digests[digest_key]
            # The median is taken from the second entry of ``components``.
            median = digest.components[1].quantile(50)
            yield GaugeMetricFamily(gauge_name, description, value=median)
75
76
class PrometheusHandler(RequestHandler):
    """Request handler that serves the default Prometheus registry as text.

    A ``WorkerMetricCollector`` is registered with
    ``prometheus_client.REGISTRY`` the first time a handler instance is
    created; the class-level ``_initialized`` flag prevents any later
    instance from registering a second collector.
    """

    # Shared across all instances: True once a collector was registered.
    _initialized = False

    def __init__(self, *args, **kwargs):
        """Initialise the handler and register the collector on first use.

        All positional and keyword arguments are passed through to the
        parent handler unchanged.
        """
        import prometheus_client

        super().__init__(*args, **kwargs)

        if not PrometheusHandler._initialized:
            collector = WorkerMetricCollector(self.server)
            prometheus_client.REGISTRY.register(collector)
            # Flip the flag only after registration succeeded.
            PrometheusHandler._initialized = True

    def get(self):
        """Write the latest metrics from the default registry to the response."""
        import prometheus_client

        payload = prometheus_client.generate_latest()
        self.write(payload)
        self.set_header("Content-Type", "text/plain; version=0.0.4")
97