1import logging
2import math
3import operator
4import os
5from collections import defaultdict
6from numbers import Number
7
8import numpy as np
9from bokeh.core.properties import without_property_validation
10from bokeh.io import curdoc
11from bokeh.layouts import column, row
12from bokeh.models import (
13    AdaptiveTicker,
14    Arrow,
15    BasicTicker,
16    BoxSelectTool,
17    BoxZoomTool,
18    CDSView,
19    ColorBar,
20    ColumnDataSource,
21    DataRange1d,
22    GroupFilter,
23    HoverTool,
24    NumberFormatter,
25    NumeralTickFormatter,
26    OpenURL,
27    Panel,
28    PanTool,
29    Range1d,
30    ResetTool,
31    Tabs,
32    TapTool,
33    Title,
34    VeeHead,
35    WheelZoomTool,
36    value,
37)
38from bokeh.models.widgets import DataTable, TableColumn
39from bokeh.models.widgets.markups import Div
40from bokeh.palettes import Viridis11
41from bokeh.plotting import figure
42from bokeh.themes import Theme
43from bokeh.transform import cumsum, factor_cmap, linear_cmap
44from tlz import curry, pipe
45from tlz.curried import concat, groupby, map
46from tornado import escape
47
48import dask
49from dask import config
50from dask.utils import format_bytes, format_time, key_split, parse_timedelta
51
52from distributed.dashboard.components import add_periodic_callback
53from distributed.dashboard.components.shared import (
54    DashboardComponent,
55    ProfileServer,
56    ProfileTimePlot,
57    SystemMonitor,
58)
59from distributed.dashboard.utils import BOKEH_VERSION, PROFILING, transpose, update
60from distributed.diagnostics.graph_layout import GraphLayout
61from distributed.diagnostics.progress_stream import color_of, progress_quads
62from distributed.diagnostics.task_stream import TaskStreamPlugin
63from distributed.diagnostics.task_stream import color_of as ts_color_of
64from distributed.diagnostics.task_stream import colors as ts_color_lookup
65from distributed.metrics import time
66from distributed.utils import Log, log_errors
67
# The export tool is optional: it is imported only when the
# ``distributed.dashboard.export-tool`` config value is truthy, otherwise the
# name is bound to None so that it still exists at module level.
if dask.config.get("distributed.dashboard.export-tool"):
    from distributed.dashboard.export_tool import ExportTool
else:
    ExportTool = None  # type: ignore

logger = logging.getLogger(__name__)

from jinja2 import Environment, FileSystemLoader

# Jinja2 environment whose templates are loaded from ``../../http/templates``
# relative to this file's directory.
env = Environment(
    loader=FileSystemLoader(
        os.path.join(os.path.dirname(__file__), "..", "..", "http", "templates")
    )
)

# Bokeh theme read from ``../theme.yaml`` relative to this file's directory.
BOKEH_THEME = Theme(
    filename=os.path.join(os.path.dirname(__file__), "..", "theme.yaml")
)
# Keyword arguments splatted into BasicTicker / AdaptiveTicker by the byte-valued
# plots in this file: base 1024 with power-of-two mantissas.
TICKS_1024 = {"base": 1024, "mantissas": [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]}
XLABEL_ORIENTATION = -math.pi / 9  # slanted downwards 20 degrees


# Maps a name ("numpy", "pandas", "builtins") to the path of a static image.
# NOTE(review): the consumer is not visible in this part of the file; presumably
# the keys are top-level module names -- confirm against the caller.
logos_dict = {
    "numpy": "statics/images/numpy.png",
    "pandas": "statics/images/pandas.png",
    "builtins": "statics/images/python.png",
}
95
96
class Occupancy(DashboardComponent):
    """Occupancy (in time) per worker

    Draws one horizontal bar per worker whose length is that worker's
    ``occupancy``.  The x axis is a datetime axis, so the bar geometry is
    expressed in milliseconds (``occupancy * 1000``).  Bars are colored by
    whether the scheduler lists the worker as idle (red), saturated (green) or
    neither (blue).  Tapping a bar opens ``./info/worker/<address>.html``.

    Parameters
    ----------
    scheduler
        Object whose ``workers``, ``idle``, ``saturated`` and
        ``total_nthreads`` attributes are read on every ``update``.
    **kwargs
        Forwarded to ``bokeh.plotting.figure``.
    """

    def __init__(self, scheduler, **kwargs):
        with log_errors():
            self.scheduler = scheduler
            # Placeholder rows; they are replaced by the first ``update`` call
            # that sees at least one worker.
            self.source = ColumnDataSource(
                {
                    "occupancy": [0, 0],
                    "worker": ["a", "b"],
                    "x": [0.0, 0.1],
                    "y": [1, 2],
                    "ms": [1, 2],
                    "color": ["red", "blue"],
                    "escaped_worker": ["a", "b"],
                }
            )

            self.root = figure(
                title="Occupancy",
                tools="",
                toolbar_location="above",
                id="bk-occupancy-plot",
                x_axis_type="datetime",
                min_border_bottom=50,
                **kwargs,
            )
            rect = self.root.rect(
                source=self.source, x="x", width="ms", y="y", height=0.9, color="color"
            )
            rect.nonselection_glyph = None

            self.root.xaxis.minor_tick_line_alpha = 0
            self.root.yaxis.visible = False
            self.root.ygrid.visible = False
            self.root.x_range.start = 0

            tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))

            hover = HoverTool()
            hover.tooltips = "@worker : @occupancy s."
            hover.point_policy = "follow_mouse"
            self.root.add_tools(hover, tap)

    @without_property_validation
    def update(self):
        """Refresh the bars and the title from the current scheduler state.

        The data source is only rewritten when there is at least one worker;
        the title is always refreshed.
        """
        with log_errors():
            workers = self.scheduler.workers.values()

            y = list(range(len(workers)))
            occupancy = [ws.occupancy for ws in workers]
            ms = [occ * 1000 for occ in occupancy]
            # ``rect`` glyphs are positioned by their center, so a bar that
            # starts at 0 and is ``ms`` wide must be centered at ``ms / 2``.
            # (Previously this was ``occ / 500``, which centered every bar
            # near 0 and left only half of it inside the visible range.)
            x = [width / 2 for width in ms]
            total = sum(occupancy)

            color = []
            for ws in workers:
                if ws in self.scheduler.idle:
                    color.append("red")
                elif ws in self.scheduler.saturated:
                    color.append("green")
                else:
                    color.append("blue")

            if total:
                self.root.title.text = (
                    f"Occupancy -- total time: {format_time(total)} "
                    f"wall time: {format_time(total / self.scheduler.total_nthreads)}"
                )
            else:
                self.root.title.text = "Occupancy"

            if occupancy:
                result = {
                    "occupancy": occupancy,
                    "worker": [ws.address for ws in workers],
                    "ms": ms,
                    "color": color,
                    "escaped_worker": [escape.url_escape(ws.address) for ws in workers],
                    "x": x,
                    "y": y,
                }

                update(self.source, result)
181
182
class ProcessingHistogram(DashboardComponent):
    """Histogram of the number of tasks each worker is processing

    Parameters
    ----------
    scheduler
        Object whose ``workers`` mapping is read on every ``update``.
    **kwargs
        Forwarded to ``bokeh.plotting.figure``.
    """

    def __init__(self, scheduler, **kwargs):
        with log_errors():
            self.last = 0
            self.scheduler = scheduler

            # Placeholder bins, overwritten by the first ``update``
            placeholder = {"left": [1, 2], "right": [10, 10], "top": [0, 0]}
            self.source = ColumnDataSource(placeholder)

            self.root = fig = figure(
                title="Tasks Processing (count)",
                id="bk-nprocessing-histogram-plot",
                name="processing",
                y_axis_label="frequency",
                tools="",
                **kwargs,
            )

            fig.xaxis.minor_tick_line_alpha = 0
            fig.ygrid.visible = False
            fig.toolbar_location = None

            fig.quad(
                source=self.source,
                left="left",
                right="right",
                bottom=0,
                top="top",
                color="deepskyblue",
                fill_alpha=0.5,
            )

    @without_property_validation
    def update(self):
        """Re-bin the per-worker processing counts into 40 histogram bins."""
        per_worker = [len(ws.processing) for ws in self.scheduler.workers.values()]
        heights, edges = np.histogram(per_worker, bins=40)
        bins = {"left": edges[:-1], "right": edges[1:], "top": heights}
        self.source.data.update(bins)
223
224
225def _memory_color(current: int, limit: int) -> str:
226    """Dynamic color used by WorkersMemory and ClusterMemory"""
227    if limit and current > limit:
228        return "red"
229    if limit and current > limit / 2:
230        return "orange"
231    return "blue"
232
233
class ClusterMemory(DashboardComponent):
    """Total memory usage on the cluster

    A single horizontal bar made of four stacked segments: managed in memory,
    unmanaged (old), unmanaged (recent) and managed spilled.  The first three
    share a color chosen by ``_memory_color``; the spilled segment is grey.

    Parameters
    ----------
    scheduler
        Object whose ``workers`` and ``memory`` attributes are read on update.
    width
        The figure is created half this wide (``int(width / 2)``).
    **kwargs
        Forwarded to ``bokeh.plotting.figure``.
    """

    def __init__(self, scheduler, width=600, **kwargs):
        with log_errors():
            self.scheduler = scheduler

            zeros = [0] * 4
            self.source = ColumnDataSource(
                {
                    "width": list(zeros),
                    "x": list(zeros),
                    "y": list(zeros),
                    "color": ["blue", "blue", "blue", "grey"],
                    "alpha": [1, 0.7, 0.4, 1],
                    "proc_memory": list(zeros),
                    "managed": list(zeros),
                    "unmanaged_old": list(zeros),
                    "unmanaged_recent": list(zeros),
                    "spilled": list(zeros),
                }
            )

            self.root = fig = figure(
                title="Bytes stored on cluster",
                tools="",
                id="bk-cluster-memory-plot",
                width=int(width / 2),
                name="cluster_memory",
                min_border_bottom=50,
                **kwargs,
            )
            segments = fig.rect(
                source=self.source,
                x="x",
                y="y",
                width="width",
                height=0.9,
                color="color",
                alpha="alpha",
            )
            segments.nonselection_glyph = None

            # Byte-valued x axis starting at 0, with power-of-two ticks
            fig.axis[0].ticker = BasicTicker(**TICKS_1024)
            fig.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            fig.xaxis.major_label_orientation = XLABEL_ORIENTATION
            fig.xaxis.minor_tick_line_alpha = 0
            fig.x_range = Range1d(start=0)

            # There is only one row, so the y axis carries no information
            fig.yaxis.visible = False
            fig.ygrid.visible = False
            fig.toolbar_location = None

            hover = HoverTool(
                point_policy="follow_mouse",
                tooltips="""
                            <div>
                                <span style="font-size: 12px; font-weight: bold;">Process memory (RSS):</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@proc_memory{0.00 b}</span>
                            </div>
                            <div style="margin-left: 1em;">
                                <span style="font-size: 12px; font-weight: bold;">Managed:</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@managed{0.00 b}</span>
                            </div>
                            <div style="margin-left: 1em;">
                                <span style="font-size: 12px; font-weight: bold;">Unmanaged (old):</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@unmanaged_old{0.00 b}</span>
                            </div>
                            <div style="margin-left: 1em;">
                                <span style="font-size: 12px; font-weight: bold;">Unmanaged (recent):</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@unmanaged_recent{0.00 b}</span>
                            </div>
                            <div>
                                <span style="font-size: 12px; font-weight: bold;">Spilled to disk:</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@spilled{0.00 b}</span>
                            </div>
                            """,
            )
            fig.add_tools(hover)

    @without_property_validation
    def update(self):
        """Recompute the four segments, the x range end and the title."""
        with log_errors():
            limit = sum(ws.memory_limit for ws in self.scheduler.workers.values())
            mem = self.scheduler.memory
            bar_color = _memory_color(mem.process, limit)

            width = [
                mem.managed_in_memory,
                mem.unmanaged_old,
                mem.unmanaged_recent,
                mem.managed_spilled,
            ]

            # Rects are placed by center: each segment sits after the ones
            # before it, offset by half of its own width.
            centers = []
            offset = 0
            for segment in width:
                centers.append(offset + segment / 2)
                offset += segment

            result = {
                "width": width,
                "x": centers,
                "color": [bar_color] * 3 + ["grey"],
                "proc_memory": [mem.process] * 4,
                "managed": [mem.managed_in_memory] * 4,
                "unmanaged_old": [mem.unmanaged_old] * 4,
                "unmanaged_recent": [mem.unmanaged_recent] * 4,
                "spilled": [mem.managed_spilled] * 4,
            }

            self.root.x_range.end = max(limit, mem.process + mem.managed_spilled)

            parts = [f"Bytes stored: {format_bytes(mem.process)}"]
            if mem.managed_spilled:
                parts.append(f" + {format_bytes(mem.managed_spilled)} spilled to disk")
            self.root.title.text = "".join(parts)

            update(self.source, result)
347
348
class WorkersMemory(DashboardComponent):
    """Memory usage for single workers

    One row per worker, each made of up to four stacked segments: managed in
    memory, unmanaged (old), unmanaged (recent) and managed spilled (grey).
    Zero-width segments are dropped before the data source is updated.
    Tapping a segment opens ``./info/worker/<address>.html``.

    Parameters
    ----------
    scheduler
        Object whose ``workers`` mapping is read on every ``update``.
    width
        The figure is created half this wide (``int(width / 2)``).
    **kwargs
        Forwarded to ``bokeh.plotting.figure``.
    """

    def __init__(self, scheduler, width=600, **kwargs):
        with log_errors():
            self.scheduler = scheduler

            column_names = (
                "width",
                "x",
                "y",
                "color",
                "alpha",
                "worker",
                "escaped_worker",
                "proc_memory",
                "managed",
                "unmanaged_old",
                "unmanaged_recent",
                "spilled",
            )
            # Start empty: there are no workers to draw until the first update
            self.source = ColumnDataSource({name: [] for name in column_names})

            self.root = fig = figure(
                title="Bytes stored per worker",
                tools="",
                id="bk-workers-memory-plot",
                width=int(width / 2),
                name="workers_memory",
                min_border_bottom=50,
                **kwargs,
            )
            segments = fig.rect(
                source=self.source,
                x="x",
                y="y",
                width="width",
                height=0.9,
                color="color",
                fill_alpha="alpha",
                line_width=0,
            )
            segments.nonselection_glyph = None

            # Byte-valued x axis starting at 0, with power-of-two ticks
            fig.axis[0].ticker = BasicTicker(**TICKS_1024)
            fig.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            fig.xaxis.major_label_orientation = XLABEL_ORIENTATION
            fig.xaxis.minor_tick_line_alpha = 0
            fig.x_range = Range1d(start=0)
            fig.yaxis.visible = False
            fig.ygrid.visible = False
            fig.toolbar_location = None

            tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))

            hover = HoverTool(
                point_policy="follow_mouse",
                tooltips="""
                            <div>
                                <span style="font-size: 12px; font-weight: bold;">Worker:</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@worker</span>
                            </div>
                            <div>
                                <span style="font-size: 12px; font-weight: bold;">Process memory (RSS):</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@proc_memory{0.00 b}</span>
                            </div>
                            <div style="margin-left: 1em;">
                                <span style="font-size: 12px; font-weight: bold;">Managed:</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@managed{0.00 b}</span>
                            </div>
                            <div style="margin-left: 1em;">
                                <span style="font-size: 12px; font-weight: bold;">Unmanaged (old):</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@unmanaged_old{0.00 b}</span>
                            </div>
                            <div style="margin-left: 1em;">
                                <span style="font-size: 12px; font-weight: bold;">Unmanaged (recent):</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@unmanaged_recent{0.00 b}</span>
                            </div>
                            <div>
                                <span style="font-size: 12px; font-weight: bold;">Spilled to disk:</span>&nbsp;
                                <span style="font-size: 10px; font-family: Monaco, monospace;">@spilled{0.00 b}</span>
                            </div>
                            """,
            )
            fig.add_tools(tap, hover)

    @without_property_validation
    def update(self):
        """Rebuild every worker's segments and the shared x range end."""

        def repeat4(values) -> list:
            # Each worker contributes four rectangles, so per-worker values
            # are repeated four times to line up with the segment columns.
            return [v for v in values for _ in range(4)]

        with log_errors():
            workers = self.scheduler.workers.values()

            widths = []
            centers = []
            colors = []
            procmemory = []
            managed = []
            unmanaged_old = []
            unmanaged_recent = []
            spilled = []
            x_end = 0

            for ws in workers:
                mem = ws.memory
                limit = getattr(ws, "memory_limit", 0)
                x_end = max(x_end, limit, mem.process + mem.managed_spilled)
                bar_color = _memory_color(mem.process, limit)

                segments = [
                    mem.managed_in_memory,
                    mem.unmanaged_old,
                    mem.unmanaged_recent,
                    mem.managed_spilled,
                ]
                # Rects are placed by center: stack each segment after the
                # previous ones of the same worker.
                offset = 0
                for segment in segments:
                    centers.append(offset + segment / 2)
                    offset += segment
                widths.extend(segments)
                colors.extend([bar_color] * 3 + ["grey"])

                # Values shown in the hover tooltip
                procmemory.append(mem.process)
                managed.append(segments[0])
                unmanaged_old.append(segments[1])
                unmanaged_recent.append(segments[2])
                spilled.append(segments[3])

            n_workers = len(workers)
            columns = {
                "width": widths,
                "x": centers,
                "color": colors,
                "alpha": [1, 0.7, 0.4, 1] * n_workers,
                "worker": repeat4(ws.address for ws in workers),
                "escaped_worker": repeat4(
                    escape.url_escape(ws.address) for ws in workers
                ),
                "y": repeat4(range(n_workers)),
                "proc_memory": repeat4(procmemory),
                "managed": repeat4(managed),
                "unmanaged_old": repeat4(unmanaged_old),
                "unmanaged_recent": repeat4(unmanaged_recent),
                "spilled": repeat4(spilled),
            }

            # Remove rectangles with width=0
            keep = [bool(w) for w in widths]
            result = {
                name: [item for item, kept in zip(column, keep) if kept]
                for name, column in columns.items()
            }

            self.root.x_range.end = x_end
            update(self.source, result)
506
507
class WorkersMemoryHistogram(DashboardComponent):
    """Histogram of memory usage, showing how many workers there are in each bucket of
    usage. Replaces the per-worker graph when there are >= 50 workers.

    Parameters
    ----------
    scheduler
        Object whose ``workers`` mapping is read on every ``update``.
    **kwargs
        Forwarded to ``bokeh.plotting.figure``.
    """

    def __init__(self, scheduler, **kwargs):
        with log_errors():
            self.last = 0
            self.scheduler = scheduler

            # Placeholder bins, overwritten by the first ``update``
            placeholder = {"left": [1, 2], "right": [10, 10], "top": [0, 0]}
            self.source = ColumnDataSource(placeholder)

            self.root = fig = figure(
                title="Bytes stored per worker",
                name="workers_memory",
                id="bk-workers-memory-histogram-plot",
                y_axis_label="frequency",
                tools="",
                **kwargs,
            )

            # Byte-valued x axis with power-of-two ticks
            fig.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            fig.xaxis.ticker = AdaptiveTicker(**TICKS_1024)
            fig.xaxis.major_label_orientation = XLABEL_ORIENTATION
            fig.xaxis.minor_tick_line_alpha = 0

            fig.ygrid.visible = False
            fig.toolbar_location = None

            fig.quad(
                source=self.source,
                left="left",
                right="right",
                bottom=0,
                top="top",
                color="deepskyblue",
                fill_alpha=0.5,
            )

    @without_property_validation
    def update(self):
        """Re-bin each worker's ``metrics["memory"]`` into 40 histogram bins."""
        per_worker = [ws.metrics["memory"] for ws in self.scheduler.workers.values()]
        heights, edges = np.histogram(np.asarray(per_worker), bins=40)
        update(self.source, {"left": edges[:-1], "right": edges[1:], "top": heights})
557
558
class BandwidthTypes(DashboardComponent):
    """Bar chart showing bandwidth per type

    One horizontal bar per key of ``scheduler.bandwidth_types``; the title
    shows the overall ``scheduler.bandwidth``.

    Parameters
    ----------
    scheduler
        Object whose ``bandwidth_types`` and ``bandwidth`` are read on update.
    **kwargs
        Forwarded to ``bokeh.plotting.figure``.
    """

    def __init__(self, scheduler, **kwargs):
        with log_errors():
            self.last = 0
            self.scheduler = scheduler

            # Placeholder rows, overwritten by the first ``update``
            self.source = ColumnDataSource(
                {
                    "bandwidth": [1, 2],
                    "bandwidth-half": [0.5, 1],
                    "type": ["a", "b"],
                    "bandwidth_text": ["1", "2"],
                }
            )

            self.root = fig = figure(
                title="Bandwidth by Type",
                tools="",
                id="bk-bandwidth-type-plot",
                name="bandwidth_type_histogram",
                y_range=["a", "b"],
                **kwargs,
            )
            fig.xaxis.major_label_orientation = -0.5

            # Rects are placed by center, hence the "bandwidth-half" column
            bars = fig.rect(
                source=self.source,
                x="bandwidth-half",
                y="type",
                width="bandwidth",
                height=0.9,
                color="blue",
            )
            bars.nonselection_glyph = None

            fig.x_range.start = 0
            fig.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            fig.xaxis.ticker = AdaptiveTicker(**TICKS_1024)
            fig.xaxis.minor_tick_line_alpha = 0
            fig.ygrid.visible = False
            fig.toolbar_location = None

            hover = HoverTool()
            hover.tooltips = "@type: @bandwidth_text / s"
            hover.point_policy = "follow_mouse"
            fig.add_tools(hover)

    @without_property_validation
    def update(self):
        """Refresh the bars, the categorical y range and the title."""
        with log_errors():
            by_type = self.scheduler.bandwidth_types
            self.root.y_range.factors = sorted(by_type)

            rates = list(by_type.values())
            result = {
                "bandwidth": rates,
                "bandwidth-half": [rate / 2 for rate in rates],
                "type": list(by_type),
                "bandwidth_text": [format_bytes(rate) for rate in rates],
            }

            overall = format_bytes(self.scheduler.bandwidth)
            self.root.title.text = "Bandwidth: " + overall
            update(self.source, result)
622
623
class BandwidthWorkers(DashboardComponent):
    """Heatmap of bandwidth between pairs of workers

    Each cell is one ``(source, destination)`` key of
    ``scheduler.bandwidth_workers``, colored by its bandwidth value.  Workers
    are labelled by ``str(ws.name)`` when the address is still known to the
    scheduler and the name is not None, and by the raw address otherwise.
    The title shows the overall ``scheduler.bandwidth``.

    Parameters
    ----------
    scheduler
        Object whose ``bandwidth_workers``, ``workers`` and ``bandwidth`` are
        read on every ``update``.
    **kwargs
        Forwarded to ``bokeh.plotting.figure``.
    """

    def __init__(self, scheduler, **kwargs):
        with log_errors():
            self.last = 0
            self.scheduler = scheduler
            # Placeholder rows, overwritten by the first non-empty ``update``
            self.source = ColumnDataSource(
                {
                    "bandwidth": [1, 2],
                    "source": ["a", "b"],
                    "destination": ["a", "b"],
                    "bandwidth_text": ["1", "2"],
                }
            )

            # Palette entries are "#RRGGFF" with RR == GG running from ff down
            # to 40 as the bandwidth grows.
            levels = [hex(x)[2:] for x in range(64, 256)][::-1]
            mapper = linear_cmap(
                field_name="bandwidth",
                palette=["#" + x + x + "FF" for x in levels],
                low=0,
                high=1,
            )

            self.root = figure(
                title="Bandwidth by Worker",
                tools="",
                id="bk-bandwidth-worker-plot",
                name="bandwidth_worker_heatmap",
                x_range=["a", "b"],
                y_range=["a", "b"],
                **kwargs,
            )
            self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
            self.root.rect(
                source=self.source,
                x="source",
                y="destination",
                color=mapper,
                height=1,
                width=1,
            )

            # Kept so that ``update`` can rescale the upper bound of the colors
            self.color_map = mapper["transform"]
            color_bar = ColorBar(
                color_mapper=self.color_map,
                label_standoff=12,
                border_line_color=None,
                location=(0, 0),
            )
            color_bar.formatter = NumeralTickFormatter(format="0.0 b")
            color_bar.ticker = AdaptiveTicker(**TICKS_1024)
            self.root.add_layout(color_bar, "right")

            self.root.toolbar_location = None

            hover = HoverTool()
            hover.tooltips = """
            <div>
                <p><b>Source:</b> @source </p>
                <p><b>Destination:</b> @destination </p>
                <p><b>Bandwidth:</b> @bandwidth_text / s</p>
            </div>
            """
            hover.point_policy = "follow_mouse"
            self.root.add_tools(hover)

    @without_property_validation
    def update(self):
        """Refresh the cells, both categorical ranges, the color scale and title.

        Does nothing while ``scheduler.bandwidth_workers`` is empty.
        """
        with log_errors():
            bw = self.scheduler.bandwidth_workers
            if not bw:
                return

            def name(address):
                # Label for one endpoint: the worker's name when available,
                # the address for unknown or unnamed workers.
                try:
                    ws = self.scheduler.workers[address]
                except KeyError:
                    return address
                if ws.name is not None:
                    return str(ws.name)
                return address

            # ``rates`` rather than ``value``: the latter would shadow the
            # ``value`` helper imported from bokeh.models at module level.
            x, y, rates = zip(*((name(a), name(b), c) for (a, b), c in bw.items()))

            self.color_map.high = max(rates)

            factors = sorted(set(x + y))
            self.root.x_range.factors = factors
            self.root.y_range.factors = factors[::-1]

            result = {
                "source": x,
                "destination": y,
                "bandwidth": rates,
                "bandwidth_text": [format_bytes(rate) for rate in rates],
            }
            self.root.title.text = "Bandwidth: " + format_bytes(
                self.scheduler.bandwidth
            )
            update(self.source, result)
725
726
class WorkerNetworkBandwidth(DashboardComponent):
    """Worker network bandwidth chart

    Builds two figures that share one data source: ``self.bandwidth`` plots the
    ``read_bytes`` / ``write_bytes`` worker metrics and ``self.disk`` plots
    ``read_bytes_disk`` / ``write_bytes_disk``.  Every worker gets a red
    "read" bar and a blue "write" bar.

    Parameters
    ----------
    scheduler
        Object whose ``workers`` mapping is read on every ``update``.
    **kwargs
        Forwarded to both ``bokeh.plotting.figure`` calls.
    """

    def __init__(self, scheduler, **kwargs):
        with log_errors():
            self.scheduler = scheduler

            column_names = (
                "y_read",
                "y_write",
                "x_read",
                "x_write",
                "x_read_disk",
                "x_write_disk",
            )
            self.source = ColumnDataSource({name: [] for name in column_names})

            def add_bars(fig, read_column, write_column):
                # read bar first, then write bar, both growing from x=0
                for y, right, fill, label in (
                    ("y_read", read_column, "red", "read"),
                    ("y_write", write_column, "blue", "write"),
                ):
                    fig.hbar(
                        y=y,
                        right=right,
                        line_color=None,
                        left=0,
                        height=0.5,
                        fill_color=fill,
                        legend_label=label,
                        source=self.source,
                    )

            def style_axes(fig):
                # Byte-valued x axis starting at 0; no y axis, grid or toolbar
                fig.axis[0].ticker = BasicTicker(**TICKS_1024)
                fig.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
                fig.xaxis.major_label_orientation = XLABEL_ORIENTATION
                fig.xaxis.minor_tick_line_alpha = 0
                fig.x_range = Range1d(start=0)
                fig.yaxis.visible = False
                fig.ygrid.visible = False
                fig.toolbar_location = None

            self.bandwidth = figure(
                title="Worker Network Bandwidth",
                tools="",
                id="bk-worker-net-bandwidth",
                name="worker_network_bandwidth",
                **kwargs,
            )
            add_bars(self.bandwidth, "x_read", "x_write")
            style_axes(self.bandwidth)

            self.disk = figure(
                title="Workers Disk",
                tools="",
                id="bk-workers-disk",
                name="worker_disk",
                **kwargs,
            )
            add_bars(self.disk, "x_read_disk", "x_write_disk")
            style_axes(self.disk)

    @without_property_validation
    def update(self):
        """Refresh the bars and the x range end of both figures.

        With workers present, each range end is the largest of the current
        values, 100_000_000 and 95% of the previous end; without workers it is
        reset to 100_000_000.
        """
        with log_errors():
            workers = self.scheduler.workers.values()

            h = 0.1
            rows = range(len(workers))
            y_read = [i + 0.75 + i * h for i in rows]
            y_write = [i + 0.25 + i * h for i in rows]

            metrics = [ws.metrics for ws in workers]
            x_read = [m["read_bytes"] for m in metrics]
            x_write = [m["write_bytes"] for m in metrics]
            x_read_disk = [m["read_bytes_disk"] for m in metrics]
            x_write_disk = [m["write_bytes_disk"] for m in metrics]

            floor = 100_000_000
            if not self.scheduler.workers:
                self.bandwidth.x_range.end = floor
                self.disk.x_range.end = floor
            else:
                self.bandwidth.x_range.end = max(
                    max(x_read),
                    max(x_write),
                    floor,
                    0.95 * self.bandwidth.x_range.end,
                )
                self.disk.x_range.end = max(
                    max(x_read_disk),
                    max(x_write_disk),
                    floor,
                    0.95 * self.disk.x_range.end,
                )

            update(
                self.source,
                {
                    "y_read": y_read,
                    "y_write": y_write,
                    "x_read": x_read,
                    "x_write": x_write,
                    "x_read_disk": x_read_disk,
                    "x_write_disk": x_write_disk,
                },
            )
877
878
class SystemTimeseries(DashboardComponent):
    """Time series of worker network bandwidth, CPU, memory and disk.

    Four figures share one data source and one x-range:

    bandwidth: mean of ``read_bytes`` and ``write_bytes`` over the workers.
    cpu: mean of ``cpu`` over the workers.
    memory: mean of ``memory`` over the workers.
    disk: mean of ``read_bytes_disk`` and ``write_bytes_disk`` over the workers.

    Each plotted value is the sum of ``ws.metrics[<name>]`` for ``ws`` in
    ``scheduler.workers.values()`` divided by the number of workers.
    """

    def __init__(self, scheduler, **kwargs):
        """Build the four figures; ``kwargs`` are forwarded to every ``figure``."""
        with log_errors():
            self.scheduler = scheduler
            self.source = ColumnDataSource(
                {
                    column: []
                    for column in (
                        "time",
                        "read_bytes",
                        "write_bytes",
                        "cpu",
                        "memory",
                        "read_bytes_disk",
                        "write_bytes_disk",
                    )
                }
            )

            # Put a first sample into the source before any figure is built.
            update(self.source, self.get_data())

            x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)
            tools = "reset, xpan, xwheel_zoom"

            def timeseries_figure(title, fig_id, name):
                # Every figure is given the same x-range object and tool set.
                return figure(
                    title=title,
                    x_axis_type="datetime",
                    tools=tools,
                    x_range=x_range,
                    id=fig_id,
                    name=name,
                    **kwargs,
                )

            def read_write_lines(fig, read_column, write_column):
                # Red line for the read column, blue line for the write column.
                for column, colour, label in (
                    (read_column, "red", "read (mean)"),
                    (write_column, "blue", "write (mean)"),
                ):
                    fig.line(
                        source=self.source,
                        x="time",
                        y=column,
                        color=colour,
                        legend_label=label,
                    )

            def finish_axes(fig):
                # Styling common to all four figures.
                fig.y_range.start = 0
                fig.yaxis.minor_tick_line_alpha = 0
                fig.xgrid.visible = False

            self.bandwidth = bandwidth = timeseries_figure(
                "Workers Network Bandwidth",
                "bk-worker-network-bandwidth-ts",
                "worker_network_bandwidth-timeseries",
            )
            read_write_lines(bandwidth, "read_bytes", "write_bytes")
            bandwidth.legend.location = "top_left"
            bandwidth.yaxis.axis_label = "bytes / second"
            bandwidth.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
            finish_axes(bandwidth)

            self.cpu = cpu = timeseries_figure(
                "Workers CPU",
                "bk-worker-cpu-ts",
                "worker_cpu-timeseries",
            )
            cpu.line(source=self.source, x="time", y="cpu")
            cpu.yaxis.axis_label = "Utilization"
            finish_axes(cpu)

            self.memory = memory = timeseries_figure(
                "Workers Memory",
                "bk-worker-memory-ts",
                "worker_memory-timeseries",
            )
            memory.line(source=self.source, x="time", y="memory")
            memory.yaxis.axis_label = "Bytes"
            memory.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
            finish_axes(memory)

            self.disk = disk = timeseries_figure(
                "Workers Disk",
                "bk-worker-disk-ts",
                "worker_disk-timeseries",
            )
            read_write_lines(disk, "read_bytes_disk", "write_bytes_disk")
            disk.legend.location = "top_left"
            disk.yaxis.axis_label = "bytes / second"
            disk.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
            finish_axes(disk)

    def get_data(self):
        """Return one sample: the mean of each metric over all workers.

        The result maps every column name of ``self.source`` to a
        one-element list.  ``time`` is the mean of the workers' ``time``
        metric multiplied by 1000.  With no workers every value is 0.
        """
        worker_states = self.scheduler.workers.values()

        metric_names = (
            "read_bytes",
            "write_bytes",
            "cpu",
            "memory",
            "read_bytes_disk",
            "write_bytes_disk",
            "time",
        )
        totals = dict.fromkeys(metric_names, 0)
        for ws in worker_states:
            for metric in metric_names:
                totals[metric] += ws.metrics[metric]

        # Fall back to 1 so that an empty cluster does not divide by zero.
        divisor = len(worker_states) or 1

        sample = {"time": [totals["time"] / divisor * 1000]}
        for metric in metric_names[:-1]:
            sample[metric] = [totals[metric] / divisor]
        return sample

    @without_property_validation
    def update(self):
        """Append a new sample and rescale the CPU and memory y-axes."""
        with log_errors():
            self.source.stream(self.get_data(), 1000)

            if not self.scheduler.workers:
                cpu_ceiling = 1
                memory_ceiling = 100_000_000
            else:
                worker_states = self.scheduler.workers.values()
                count = len(worker_states)
                # Mean thread count (at least 1 per worker) and mean memory limit.
                cpu_ceiling = sum(ws.nthreads or 1 for ws in worker_states) / count
                memory_ceiling = sum(ws.memory_limit for ws in worker_states) / count

            self.cpu.y_range.end = cpu_ceiling * 100
            self.memory.y_range.end = memory_ceiling
1067
1068
class ComputePerKey(DashboardComponent):
    """Compute time per key prefix, as a bar chart tab and a pie chart tab"""

    def __init__(self, scheduler, **kwargs):
        """Build both charts; ``kwargs`` are forwarded to each ``figure``."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler

            # Register the task-stream plugin unless the scheduler already has it.
            if TaskStreamPlugin.name not in self.scheduler.plugins:
                self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler))

            # Placeholder rows shown until ``update`` supplies real data.
            self.compute_source = ColumnDataSource(
                data={
                    "times": [0.2, 0.1],
                    "formatted_time": ["0.2 ms", "2.8 us"],
                    "angles": [3.14, 0.785],
                    "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]],
                    "names": ["sum", "sum_partial"],
                }
            )

            tooltip_html = """
            <div>
                <p><b>Name:</b> @names</p>
                <p><b>Time:</b> @formatted_time</p>
            </div>
            """

            def follow_mouse_hover():
                # A fresh hover tool per figure, both with the same tooltip.
                tool = HoverTool()
                tool.tooltips = tooltip_html
                tool.point_policy = "follow_mouse"
                return tool

            def footnote():
                # A fresh italic note per figure, placed below the plot.
                return Title(
                    text="Note: tasks less than 2% of max are not displayed",
                    text_font_style="italic",
                )

            # --- bar chart tab ---
            bars = figure(
                title="Compute Time Per Task",
                tools="",
                id="bk-Compute-by-key-plot",
                name="compute_time_per_key",
                x_range=["a", "b"],
                **kwargs,
            )
            bar_glyph = bars.vbar(
                source=self.compute_source,
                x="names",
                top="times",
                width=0.7,
                color="color",
            )

            bars.y_range.start = 0
            bars.yaxis.axis_label = "Time (s)"
            bars.yaxis[0].formatter = NumeralTickFormatter(format="0")
            bars.yaxis.ticker = AdaptiveTicker(**TICKS_1024)
            bars.xaxis.major_label_orientation = XLABEL_ORIENTATION
            bar_glyph.nonselection_glyph = None

            bars.xaxis.minor_tick_line_alpha = 0
            bars.xgrid.visible = False
            bars.toolbar_location = None

            bars.add_tools(follow_mouse_hover())
            bars.add_layout(footnote(), "below")

            self.fig = bars
            bar_tab = Panel(child=bars, title="Bar Chart")

            # --- pie chart tab ---
            pie = figure(
                title="Compute Time Per Task",
                tools="",
                id="bk-Compute-by-key-pie",
                name="compute_time_per_key-pie",
                x_range=(-0.5, 1.0),
                **kwargs,
            )
            pie.wedge(
                x=0,
                y=1,
                radius=0.4,
                start_angle=cumsum("angles", include_zero=True),
                end_angle=cumsum("angles"),
                line_color="white",
                fill_color="color",
                legend_field="names",
                source=self.compute_source,
            )

            pie.axis.axis_label = None
            pie.axis.visible = False
            pie.grid.grid_line_color = None
            pie.add_layout(footnote(), "below")
            pie.add_tools(follow_mouse_hover())

            self.wedge_fig = pie
            pie_tab = Panel(child=pie, title="Pie Chart")

            self.root = Tabs(tabs=[bar_tab, pie_tab])

    @without_property_validation
    def update(self):
        """Recompute the per-name compute totals and refresh both charts.

        Nothing is changed when no task prefix has recorded a "compute"
        duration.
        """
        with log_errors():
            totals = defaultdict(float)
            for prefix, group in self.scheduler.task_prefixes.items():
                label = key_split(prefix)
                for action, seconds in group.all_durations.items():
                    if action == "compute":
                        totals[label] += seconds

            # Largest total first.
            ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
            if not ranked:
                return

            # Keep only names whose total is strictly above 2% of the largest.
            cutoff = ranked[0][1] * 0.02
            names = []
            colors = []
            durations = []
            grand_total = 0
            for label, seconds in ranked:
                if seconds > cutoff:
                    names.append(label)
                    colors.append(ts_color_of(label))
                    durations.append(seconds)
                    grand_total += seconds

            # Each wedge's angle is its share of the full circle.
            angles = [t / grand_total * 2 * math.pi for t in durations]

            self.fig.x_range.factors = names

            update(
                self.compute_source,
                dict(
                    angles=angles,
                    times=durations,
                    color=colors,
                    names=names,
                    formatted_time=[format_time(t) for t in durations],
                ),
            )
1229
1230
class AggregateAction(DashboardComponent):
    """Bar chart of total time per action, summed over all task prefixes"""

    def __init__(self, scheduler, **kwargs):
        """Build the bar chart; ``kwargs`` are forwarded to ``figure``."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler

            # Register the task-stream plugin unless the scheduler already has it.
            if TaskStreamPlugin.name not in self.scheduler.plugins:
                self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler))

            # Placeholder rows shown until ``update`` supplies real data.
            self.action_source = ColumnDataSource(
                data={
                    "times": [0.2, 0.1],
                    "formatted_time": ["0.2 ms", "2.8 us"],
                    "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]],
                    "names": ["transfer", "compute"],
                }
            )

            plot = figure(
                title="Aggregate Per Action",
                tools="",
                id="bk-aggregate-per-action-plot",
                name="aggregate_per_action",
                x_range=["a", "b"],
                **kwargs,
            )
            self.root = plot

            bar_glyph = plot.vbar(
                source=self.action_source,
                x="names",
                top="times",
                width=0.7,
                color="color",
            )

            plot.y_range.start = 0
            plot.yaxis[0].formatter = NumeralTickFormatter(format="0")
            plot.yaxis.axis_label = "Time (s)"
            plot.yaxis.ticker = AdaptiveTicker(**TICKS_1024)
            plot.xaxis.major_label_orientation = XLABEL_ORIENTATION
            plot.xaxis.major_label_text_font_size = "16px"
            bar_glyph.nonselection_glyph = None

            plot.xaxis.minor_tick_line_alpha = 0
            plot.xgrid.visible = False
            plot.toolbar_location = None

            tooltip_tool = HoverTool()
            tooltip_tool.tooltips = """
            <div>
                <p><b>Name:</b> @names</p>
                <p><b>Time:</b> @formatted_time</p>
            </div>
            """
            tooltip_tool.point_policy = "follow_mouse"
            plot.add_tools(tooltip_tool)

    @without_property_validation
    def update(self):
        """Re-sum the durations per action and refresh the bars."""
        with log_errors():
            totals = defaultdict(float)
            for group in self.scheduler.task_prefixes.values():
                for action, seconds in group.all_durations.items():
                    totals[action] += seconds

            # Largest total first.
            ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)

            names = [action for action, _ in ranked]
            durations = [seconds for _, seconds in ranked]
            # "compute" is drawn purple; other actions use the task-stream colours.
            colors = [
                "purple" if action == "compute" else ts_color_lookup[action]
                for action in names
            ]

            self.root.x_range.factors = names
            self.root.title.text = "Aggregate Time Per Action"

            update(
                self.action_source,
                dict(
                    times=durations,
                    color=colors,
                    names=names,
                    formatted_time=[format_time(t) for t in durations],
                ),
            )
1325
1326
class MemoryByKey(DashboardComponent):
    """Bar chart showing memory use by key prefix"""

    def __init__(self, scheduler, **kwargs):
        """Build the bar chart; ``kwargs`` are forwarded to ``figure``."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler
            # Placeholder rows shown until ``update`` supplies real data.
            self.source = ColumnDataSource(
                {
                    "name": ["a", "b"],
                    "nbytes": [100, 1000],
                    "count": [1, 2],
                    "color": ["blue", "blue"],
                }
            )

            self.root = figure(
                title="Memory Use",
                tools="",
                id="bk-memory-by-key-plot",
                name="memory_by_key",
                x_range=["a", "b"],
                **kwargs,
            )
            rect = self.root.vbar(
                source=self.source, x="name", top="nbytes", width=0.9, color="color"
            )
            self.root.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            self.root.yaxis.ticker = AdaptiveTicker(**TICKS_1024)
            self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
            rect.nonselection_glyph = None

            self.root.xaxis.minor_tick_line_alpha = 0
            self.root.ygrid.visible = False

            self.root.toolbar_location = None

            # ``nbytes_text`` is not in the placeholder source; ``update``
            # supplies that column.
            hover = HoverTool()
            hover.tooltips = """
            <div>
                <p><b>Name:</b> @name</p>
                <p><b>Bytes:</b> @nbytes_text </p>
                <p><b>Count:</b> @count objects </p>
            </div>
            """
            hover.point_policy = "follow_mouse"
            self.root.add_tools(hover)

    @without_property_validation
    def update(self):
        """Recount stored tasks and bytes per key prefix and refresh the bars.

        Every task in each worker's ``has_what`` is counted once per worker
        that holds it; the figure title shows the sum over all prefixes.
        """
        with log_errors():
            counts = defaultdict(int)
            nbytes = defaultdict(int)
            for ws in self.scheduler.workers.values():
                for ts in ws.has_what:
                    ks = key_split(ts.key)
                    counts[ks] += 1
                    nbytes[ks] += ts.nbytes

            # Alphabetical order keeps the bar positions stable between refreshes.
            names = sorted(counts)
            self.root.x_range.factors = names
            result = {
                "name": names,
                "count": [counts[name] for name in names],
                "nbytes": [nbytes[name] for name in names],
                "nbytes_text": [format_bytes(nbytes[name]) for name in names],
                "color": [color_of(name) for name in names],
            }
            self.root.title.text = "Total Use: " + format_bytes(sum(nbytes.values()))

            update(self.source, result)
1399
1400
class CurrentLoad(DashboardComponent):
    """Tasks and CPU usage on each worker"""

    def __init__(self, scheduler, width=600, **kwargs):
        """Build the two side-by-side figures.

        Each figure gets half of ``width``; ``kwargs`` are forwarded to both
        ``figure`` calls.  The figures are exposed as ``processing_figure``
        and ``cpu_figure``.
        """
        with log_errors():
            self.last = 0
            self.scheduler = scheduler
            self.source = ColumnDataSource(
                {
                    "nprocessing": [],
                    "nprocessing-half": [],
                    "nprocessing-color": [],
                    "cpu": [],
                    "cpu-half": [],
                    "y": [],
                    "worker": [],
                    "escaped_worker": [],
                }
            )
            processing = figure(
                title="Tasks Processing",
                tools="",
                id="bk-nprocessing-plot",
                name="processing",
                width=int(width / 2),
                min_border_bottom=50,
                **kwargs,
            )
            # ``rect`` is positioned by its centre, so a bar running from 0 to
            # ``nprocessing`` is centred at ``nprocessing-half``.
            rect = processing.rect(
                source=self.source,
                x="nprocessing-half",
                y="y",
                width="nprocessing",
                height=0.9,
                color="nprocessing-color",
            )
            processing.x_range.start = 0
            rect.nonselection_glyph = None

            cpu = figure(
                title="CPU Utilization",
                tools="",
                id="bk-cpu-worker-plot",
                width=int(width / 2),
                name="cpu_hist",
                x_range=(0, 100),
                min_border_bottom=50,
                **kwargs,
            )
            rect = cpu.rect(
                source=self.source,
                x="cpu-half",
                y="y",
                width="cpu",
                height=0.9,
                color="blue",
            )
            rect.nonselection_glyph = None

            for fig in (processing, cpu):
                fig.xaxis.minor_tick_line_alpha = 0
                fig.yaxis.visible = False
                fig.ygrid.visible = False

                # Tapping a bar opens that worker's info page.
                tap = TapTool(
                    callback=OpenURL(url="./info/worker/@escaped_worker.html")
                )
                fig.add_tools(tap)

                fig.toolbar_location = None

            hover = HoverTool()
            hover.tooltips = "@worker : @nprocessing tasks"
            hover.point_policy = "follow_mouse"
            processing.add_tools(hover)

            hover = HoverTool()
            hover.tooltips = "@worker : @cpu %"
            hover.point_policy = "follow_mouse"
            cpu.add_tools(hover)

            self.processing_figure = processing
            self.cpu_figure = cpu

    @without_property_validation
    def update(self):
        """Refresh the per-worker task counts and CPU readings.

        The refresh is skipped when no worker is processing anything and
        ``time()`` has advanced by less than 1 since the last refresh.
        """
        with log_errors():
            workers = self.scheduler.workers.values()
            now = time()
            if not any(ws.processing for ws in workers) and now < self.last + 1:
                return
            self.last = now

            cpu = [int(ws.metrics["cpu"]) for ws in workers]
            nprocessing = [len(ws.processing) for ws in workers]

            # Idle workers are red, saturated workers green, all others blue.
            nprocessing_color = []
            for ws in workers:
                if ws in self.scheduler.idle:
                    nprocessing_color.append("red")
                elif ws in self.scheduler.saturated:
                    nprocessing_color.append("green")
                else:
                    nprocessing_color.append("blue")

            result = {
                "cpu": cpu,
                "cpu-half": [c / 2 for c in cpu],
                "nprocessing": nprocessing,
                "nprocessing-half": [count / 2 for count in nprocessing],
                "nprocessing-color": nprocessing_color,
                "worker": [ws.address for ws in workers],
                "escaped_worker": [escape.url_escape(ws.address) for ws in workers],
                "y": list(range(len(workers))),
            }

            # Scale the CPU axis to 100% per thread of the widest worker.
            if self.scheduler.workers:
                xrange = max(ws.nthreads or 1 for ws in workers)
            else:
                xrange = 1
            self.cpu_figure.x_range.end = xrange * 100

            update(self.source, result)
1525
1526
class StealingTimeSeries(DashboardComponent):
    """Line chart of the number of idle and saturated workers over time"""

    def __init__(self, scheduler, **kwargs):
        """Build the chart; ``kwargs`` are forwarded to ``figure``."""
        self.scheduler = scheduler
        # Two zero-valued seed samples, one time unit apart.
        self.source = ColumnDataSource(
            dict(
                time=[time() * 1000, time() * 1000 + 1],
                idle=[0, 0],
                saturated=[0, 0],
            )
        )

        follow_range = DataRange1d(
            follow="end", follow_interval=20000, range_padding=0
        )

        plot = figure(
            title="Idle and Saturated Workers Over Time",
            x_axis_type="datetime",
            tools="",
            x_range=follow_range,
            **kwargs,
        )
        self.root = plot

        # Red line for idle workers, green line for saturated workers.
        for column, colour in (("idle", "red"), ("saturated", "green")):
            plot.line(source=self.source, x="time", y=column, color=colour)
        plot.yaxis.minor_tick_line_color = None

        # Reset plus horizontal-only pan and wheel zoom.
        horizontal_tools = [
            ResetTool(),
            PanTool(dimensions="width"),
            WheelZoomTool(dimensions="width"),
        ]
        plot.add_tools(*horizontal_tools)

    @without_property_validation
    def update(self):
        """Stream one sample with the current idle and saturated counts."""
        with log_errors():
            sample = {
                "time": [time() * 1000],
                "idle": [len(self.scheduler.idle)],
                "saturated": [len(self.scheduler.saturated)],
            }
            if not PROFILING:
                self.source.stream(sample, 10000)
            else:
                # While profiling, the stream call is deferred to the next tick.
                curdoc().add_next_tick_callback(
                    lambda: self.source.stream(sample, 10000)
                )
1569
1570
class StealingEvents(DashboardComponent):
    """Scatter plot of work-stealing events, one circle per group of events"""

    def __init__(self, scheduler, **kwargs):
        """Build the plot; ``kwargs`` are forwarded to ``figure``."""
        self.scheduler = scheduler
        self.steal = scheduler.extensions["stealing"]
        # Value of the "stealing" event counter at the last update.
        self.last = 0
        # Two white placeholder points at levels 0 and 15.  Their timestamps
        # are in milliseconds, like the values produced by ``convert``.
        now = time()
        self.source = ColumnDataSource(
            {
                "time": [(now - 20) * 1000, now * 1000],
                "level": [0, 15],
                "color": ["white", "white"],
                "duration": [0, 0],
                "radius": [1, 1],
                "cost_factor": [0, 10],
                "count": [1, 1],
            }
        )

        x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)

        self.root = figure(
            title="Stealing Events",
            x_axis_type="datetime",
            tools="",
            x_range=x_range,
            **kwargs,
        )

        self.root.circle(
            source=self.source,
            x="time",
            y="level",
            color="color",
            size="radius",
            alpha=0.5,
        )
        self.root.yaxis.axis_label = "Level"

        hover = HoverTool()
        hover.tooltips = "Level: @level, Duration: @duration, Count: @count, Cost factor: @cost_factor"
        hover.point_policy = "follow_mouse"

        self.root.add_tools(
            hover,
            ResetTool(),
            PanTool(dimensions="width"),
            WheelZoomTool(dimensions="width"),
        )

    def convert(self, msgs):
        """Convert a group of log messages to one glyph

        Parameters
        ----------
        msgs : sequence of 8-tuples
            ``(time, level, key, duration, sat, occ_sat, idl, occ_idl)``.
            Must be non-empty; the timestamp and level are taken from the
            last message and the durations are summed over all of them.

        Returns
        -------
        dict
            One value per column of ``self.source``.
        """
        total_duration = 0
        for msg in msgs:
            timestamp, level, key, duration, sat, occ_sat, idl, occ_idl = msg
            total_duration += duration

        # Levels outside the palette are drawn black.
        try:
            color = Viridis11[level]
        except (KeyError, IndexError):
            color = "black"

        # The circle grows with the square root of the duration, which is
        # capped at 10.
        radius = math.sqrt(min(total_duration, 10)) * 30 + 2

        d = {
            "time": timestamp * 1000,
            "level": level,
            "count": len(msgs),
            "color": color,
            "duration": total_duration,
            "radius": radius,
            "cost_factor": self.steal.cost_multipliers[level],
        }

        return d

    @without_property_validation
    def update(self):
        """Stream the stealing events logged since the previous update."""
        with log_errors():
            log = self.scheduler.get_events(topic="stealing")
            # Use the scheduler's running event counter, not the length of the
            # stored log: if the log is a bounded buffer its length stops
            # growing once full and new events would never be detected.
            current = self.scheduler.event_counts["stealing"]
            # Never look further back than the log actually holds.
            n = min(current - self.last, len(log))

            # Newest first; only entries whose payload is a list are plotted.
            log = [log[-i][1] for i in range(1, n + 1) if isinstance(log[-i][1], list)]
            self.last = current

            if log:
                # Group each payload's messages by level (item 1), turn every
                # group into one glyph, then transpose into columns.
                new = pipe(
                    log,
                    map(groupby(1)),
                    map(dict.values),
                    concat,
                    map(self.convert),
                    list,
                    transpose,
                )
                if PROFILING:
                    curdoc().add_next_tick_callback(
                        lambda: self.source.stream(new, 10000)
                    )
                else:
                    self.source.stream(new, 10000)
1671
1672
class Events(DashboardComponent):
    """Scatter plot of the events the scheduler logged under one topic.

    Each event becomes a circle placed at its timestamp on the x axis and
    on a row that is unique to its ``action`` on the y axis.

    Parameters
    ----------
    scheduler
        Object exposing ``events`` and ``event_counts`` mappings keyed by
        topic name.
    name : str
        Topic to display; also used as the figure title.
    height : int
        Passed to the figure as its ``height``.
    **kwargs
        Forwarded to ``bokeh.plotting.figure``.
    """

    def __init__(self, scheduler, name, height=150, **kwargs):
        self.scheduler = scheduler
        self.action_ys = dict()  # action -> y row, assigned on first sight
        self.last = 0  # value of the topic's event counter at the last update
        self.name = name
        self.source = ColumnDataSource(
            {"time": [], "action": [], "hover": [], "y": [], "color": []}
        )

        x_range = DataRange1d(follow="end", follow_interval=200000)

        self.root = figure(
            title=name,
            x_axis_type="datetime",
            height=height,
            tools="",
            x_range=x_range,
            **kwargs,
        )

        self.root.circle(
            source=self.source,
            x="time",
            y="y",
            color="color",
            size=50,
            alpha=0.5,
            **{"legend_field" if BOKEH_VERSION >= "1.4" else "legend": "action"},
        )
        self.root.yaxis.axis_label = "Action"
        self.root.legend.location = "top_left"

        hover = HoverTool()
        hover.tooltips = "@action<br>@hover"
        hover.point_policy = "follow_mouse"

        self.root.add_tools(
            hover,
            ResetTool(),
            PanTool(dimensions="width"),
            WheelZoomTool(dimensions="width"),
        )

    @without_property_validation
    def update(self):
        """Stream the events recorded since the previous call into the plot."""
        with log_errors():
            log = self.scheduler.events[self.name]
            current = self.scheduler.event_counts[self.name]

            # The counter may be ahead of what the log still holds (for
            # instance if the log is a bounded container -- not visible from
            # here).  Clamp so that ``log[-i]`` below can never run off the
            # front of the log and raise IndexError.
            n = min(current - self.last, len(log))
            self.last = current

            # Newest event first.
            recent = [log[-i] for i in range(1, n + 1)]
            if not recent:
                return

            actions = []
            times = []
            hovers = []
            ys = []
            colors = []
            for msg_time, msg in recent:
                action = msg["action"]
                times.append(msg_time * 1000)  # datetime axis is fed time * 1000
                actions.append(action)
                # First occurrence of an action claims the next free row.
                ys.append(self.action_ys.setdefault(action, len(self.action_ys)))
                colors.append(color_of(action))
                hovers.append("TODO")

            new = {
                "time": times,
                "action": actions,
                "hover": hovers,
                "y": ys,
                "color": colors,
            }

            if PROFILING:
                curdoc().add_next_tick_callback(
                    lambda: self.source.stream(new, 10000)
                )
            else:
                self.source.stream(new, 10000)
1758
1759
class TaskStream(DashboardComponent):
    """Rolling task stream plot fed by the scheduler's ``TaskStreamPlugin``.

    Parameters
    ----------
    scheduler
        Scheduler whose plugin registry holds (or will receive) the
        ``TaskStreamPlugin``.
    n_rectangles : int
        Rollover used when streaming rectangles into the data source; the
        plot also starts this many entries behind the plugin's current index.
    clear_interval : str or number
        Parsed with ``parse_timedelta(..., default="ms")``.  When no update
        has been drawn for longer than this, the plot is checked for being
        stale and possibly cleared.
    **kwargs
        Forwarded to ``task_stream_figure``.
    """

    def __init__(self, scheduler, n_rectangles=1000, clear_interval="20s", **kwargs):
        self.scheduler = scheduler
        self.offset = 0

        # Install the plugin only if nobody registered it before us.
        if TaskStreamPlugin.name not in self.scheduler.plugins:
            self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler))
        self.plugin = self.scheduler.plugins[TaskStreamPlugin.name]

        self.index = max(0, self.plugin.index - n_rectangles)
        self.workers = {}
        self.n_rectangles = n_rectangles
        self.clear_interval = parse_timedelta(clear_interval, default="ms")
        self.last = 0
        self.last_seen = 0

        self.source, self.root = task_stream_figure(self.clear_interval, **kwargs)

        # Required for update callback
        self.task_stream_index = [0]

    @without_property_validation
    def update(self):
        """Pull rectangles the plugin recorded since ``self.index`` and stream them."""
        if self.index == self.plugin.index:
            return  # nothing new was recorded
        with log_errors():
            if self.index and len(self.source.data["start"]):
                earliest = min(self.source.data["start"])
                longest = max(self.source.data["duration"])
                boundary = (self.offset + earliest - longest) / 1000
            else:
                boundary = self.offset

            rectangles = self.plugin.rectangles(
                istart=self.index, workers=self.workers, start_boundary=boundary
            )
            count = len(rectangles["name"])
            self.index = self.plugin.index

            if not rectangles["start"]:
                return

            # If it has been a while since we've updated the plot
            if time() > self.last_seen + self.clear_interval:
                first_new = min(rectangles["start"])
                new_start = first_new - self.offset

                starts = self.source.data["start"]
                durations = self.source.data["duration"]
                old_start = min(starts)
                old_end = max(s + d for s, d in zip(starts, durations))
                old_width = old_end - old_start

                # Share of the plotted window that is covered by tasks,
                # averaged over the known workers.
                density = sum(durations) / len(self.workers) / old_width

                # If whitespace is more than 3x the old width
                too_far = (new_start - old_end) > old_width * 2
                if too_far or density < 0.05:
                    self.source.data.update({k: [] for k in rectangles})  # clear
                    self.offset = first_new  # redefine offset

            shift = self.offset
            rectangles["start"] = [x - shift for x in rectangles["start"]]
            self.last_seen = time()

            # Convert to numpy for serialization speed
            if count >= 10 and np:
                rectangles.update(
                    {
                        k: np.array(v)
                        for k, v in rectangles.items()
                        if isinstance(v[0], Number)
                    }
                )

            if PROFILING:
                curdoc().add_next_tick_callback(
                    lambda: self.source.stream(rectangles, self.n_rectangles)
                )
            else:
                self.source.stream(rectangles, self.n_rectangles)
1840
1841
def task_stream_figure(clear_interval="20s", **kwargs):
    """
    Build the data source and bokeh figure behind the task stream plot

    ``clear_interval`` is parsed with ``parse_timedelta(..., default="ms")``
    and only used to back-date the placeholder rectangle the source starts
    with.  kwargs are applied to the bokeh figure constructor.

    Returns the ``(source, figure)`` pair.
    """
    interval = parse_timedelta(clear_interval, default="ms")

    # A single fully transparent rectangle so the source is never empty.
    placeholder = {
        "start": [time() - interval],
        "duration": [0.1],
        "key": ["start"],
        "name": ["start"],
        "color": ["white"],
        "duration_text": ["100 ms"],
        "worker": ["foo"],
        "y": [0],
        "worker_thread": [1],
        "alpha": [0.0],
    }
    source = ColumnDataSource(data=placeholder)

    root = figure(
        name="task_stream",
        title="Task Stream",
        id="bk-task-stream-plot",
        x_range=DataRange1d(range_padding=0),
        y_range=DataRange1d(range_padding=0),
        toolbar_location="above",
        x_axis_type="datetime",
        y_axis_location=None,
        tools="",
        min_border_bottom=50,
        **kwargs,
    )

    glyph = root.rect(
        source=source,
        x="start",
        y="y",
        width="duration",
        height=0.4,
        fill_color="color",
        line_color="color",
        line_alpha=0.6,
        fill_alpha="alpha",
        line_width=3,
    )
    glyph.nonselection_glyph = None

    # Keep the y axis but make its labels and ticks invisible.
    y_axis = root.yaxis
    y_axis.major_label_text_alpha = 0
    y_axis.minor_tick_line_alpha = 0
    y_axis.major_tick_line_alpha = 0
    root.xgrid.visible = False

    hover = HoverTool(
        point_policy="follow_mouse",
        tooltips="""
            <div>
                <span style="font-size: 12px; font-weight: bold;">@name:</span>&nbsp;
                <span style="font-size: 10px; font-family: Monaco, monospace;">@duration_text</span>
            </div>
            """,
    )

    # Tapping a rectangle opens the profile page for that task name.
    tap = TapTool(callback=OpenURL(url="./profile?key=@name"))

    tools = [
        hover,
        tap,
        BoxZoomTool(),
        ResetTool(),
        PanTool(dimensions="width"),
        WheelZoomTool(dimensions="width"),
    ]
    root.add_tools(*tools)

    if ExportTool:
        export = ExportTool()
        export.register_plot(root)
        root.add_tools(export)

    return source, root
1925
1926
class TaskGraph(DashboardComponent):
    """
    Live node-link diagram of the tasks held by the scheduler

    Node positions and incremental updates come from a GraphLayout plugin
    that this component registers on the scheduler; see
    distributed/diagnostics/graph_layout.py
    """

    def __init__(self, scheduler, **kwargs):
        self.scheduler = scheduler
        self.layout = GraphLayout(scheduler)
        scheduler.add_plugin(self.layout)
        self.invisible_count = 0  # number of invisible nodes

        self.node_source = ColumnDataSource(
            dict(x=[], y=[], name=[], state=[], visible=[], key=[])
        )
        self.edge_source = ColumnDataSource(dict(x=[], y=[], visible=[]))

        def visible_only(source):
            # Rows stay in the source; the view hides those not marked "True".
            return CDSView(
                source=source,
                filters=[GroupFilter(column_name="visible", group="True")],
            )

        node_view = visible_only(self.node_source)
        edge_view = visible_only(self.edge_source)

        node_colors = factor_cmap(
            "state",
            factors=["waiting", "processing", "memory", "released", "erred"],
            palette=["gray", "green", "red", "blue", "black"],
        )

        self.root = figure(title="Task Graph", **kwargs)
        self.subtitle = Title(text=" ", text_font_style="italic")
        self.root.add_layout(self.subtitle, "above")

        self.root.multi_line(
            xs="x",
            ys="y",
            source=self.edge_source,
            line_width=1,
            view=edge_view,
            color="black",
            alpha=0.3,
        )

        legend_kwarg = "legend_field" if BOKEH_VERSION >= "1.4" else "legend"
        nodes = self.root.square(
            x="x",
            y="y",
            size=10,
            color=node_colors,
            source=self.node_source,
            view=node_view,
            **{legend_kwarg: "state"},
        )
        self.root.xgrid.grid_line_color = None
        self.root.ygrid.grid_line_color = None

        hover = HoverTool(
            point_policy="follow_mouse",
            tooltips="<b>@name</b>: @state",
            renderers=[nodes],
        )
        tap = TapTool(callback=OpenURL(url="info/task/@key.html"), renderers=[nodes])
        nodes.nonselection_glyph = None
        self.root.add_tools(hover, tap)
        self.max_items = config.get("distributed.dashboard.graph-max-items", 5000)

    @without_property_validation
    def update(self):
        """Bring the plot in line with the layout plugin's pending changes."""
        with log_errors():
            # If there are too many tasks in the scheduler we'll disable this
            # component to not overload scheduler or client. Once we drop
            # below the threshold, the data is filled up again as usual
            if len(self.scheduler.tasks) > self.max_items:
                self.subtitle.text = "Scheduler has too many tasks to display."
                for source in (self.node_source, self.edge_source):
                    source.data = {col: [] for col in source.column_names}
                return

            # occasionally reset the column data source to remove old nodes
            reset = self.invisible_count > len(self.node_source.data["x"]) / 2
            if reset:
                self.layout.reset_index()
                self.invisible_count = 0

            # Take ownership of the pending additions and leave empty lists
            # behind for the plugin to fill.
            pending_nodes = self.layout.new
            self.layout.new = []
            pending_edges = self.layout.new_edges
            self.layout.new_edges = []

            self.add_new_nodes_edges(pending_nodes, pending_edges, update=reset)

            self.patch_updates()

            empty = len(self.scheduler.tasks) == 0
            self.subtitle.text = "Scheduler is empty." if empty else " "

    @without_property_validation
    def add_new_nodes_edges(self, new, new_edges, update=False):
        """
        Add nodes for the keys in ``new`` and lines for ``new_edges``

        With ``update=True``, or while the node source is still empty, the
        columns are replaced; otherwise the rows are streamed onto the end.
        """
        if not (new or update):
            return

        xs = self.layout.x
        ys = self.layout.y
        tasks = self.scheduler.tasks

        node_key, node_x, node_y, node_state, node_name = [], [], [], [], []
        for key in new:
            try:
                task = tasks[key]
            except KeyError:
                # no longer known to the scheduler
                continue
            xx = xs[key]
            yy = ys[key]
            node_key.append(escape.url_escape(key))
            node_x.append(xx)
            node_y.append(yy)
            node_state.append(task.state)
            node_name.append(task.prefix.name)

        edge_x, edge_y = [], []
        for src, dst in new_edges:
            try:
                edge_x.append([xs[src], xs[dst]])
                edge_y.append([ys[src], ys[dst]])
            except KeyError:
                continue

        node = {
            "x": node_x,
            "y": node_y,
            "state": node_state,
            "name": node_name,
            "key": node_key,
            "visible": ["True"] * len(node_x),
        }
        edge = {"x": edge_x, "y": edge_y, "visible": ["True"] * len(edge_x)}

        replace = update or not len(self.node_source.data["x"])
        if replace:
            # see https://github.com/bokeh/bokeh/issues/7523
            self.node_source.data.update(node)
            self.edge_source.data.update(edge)
        else:
            self.node_source.stream(node)
            self.edge_source.stream(edge)

    @without_property_validation
    def patch_updates(self):
        """
        Small updates like color changes or lost nodes from task transitions
        """
        n_nodes = len(self.node_source.data["x"])
        n_edges = len(self.edge_source.data["x"])

        def within(pairs, limit):
            # Keep only patches whose row index exists in the source.
            return [(i, c) for i, c in pairs if i < limit]

        if self.layout.state_updates:
            pending = self.layout.state_updates
            self.layout.state_updates = []
            self.node_source.patch({"state": within(pending, n_nodes)})

        if self.layout.visible_updates:
            hidden = within(self.layout.visible_updates, n_nodes)
            self.layout.visible_updates = []
            self.node_source.patch({"visible": hidden})
            self.invisible_count += len(hidden)

        if self.layout.visible_edge_updates:
            hidden_edges = within(self.layout.visible_edge_updates, n_edges)
            self.layout.visible_edge_updates = []
            self.edge_source.patch({"visible": hidden_edges})

    def __del__(self):
        self.scheduler.remove_plugin(name=self.layout.name)
2110
2111
class TaskGroupGraph(DashboardComponent):
    """
    Task Group Graph

    Creates a graph layout for TaskGroups on the scheduler.  It assigns
    (x, y) locations to all the TaskGroups and lays them out according
    to their dependencies. The layout gets recomputed whenever the set of
    TaskGroups known to the scheduler changes.

    Each task group node encodes information about task progress, memory,
    and output type into glyphs, as well as a hover tooltip with more detailed
    information on name, computation time, memory, and task status.
    """

    # Columns of ``nodes_source``; every column holds one entry per task group.
    _NODE_COLUMNS = (
        "x",
        "y",
        "w_box",
        "h_box",
        "name",
        "tot_tasks",
        "color",
        "x_start",
        "x_end",
        "y_start",
        "y_end",
        "x_end_progress",
        "mem_alpha",
        "node_line_width",
        "comp_tasks",
        "url_logo",
        "x_logo",
        "y_logo",
        "w_logo",
        "h_logo",
        "in_processing",
        "in_memory",
        "in_released",
        "in_erred",
        "compute_time",
        "memory",
    )

    # Columns of ``arrows_source``; one entry per dependency arrow.
    _ARROW_COLUMNS = ("xs", "ys", "xe", "ye")

    def __init__(self, scheduler, **kwargs):
        self.scheduler = scheduler

        self.nodes_layout = {}
        self.arrows_layout = {}

        # -1 never equals a real counter value, so the first update always runs
        self.old_counter = -1

        self.nodes_source = ColumnDataSource({col: [] for col in self._NODE_COLUMNS})
        self.arrows_source = ColumnDataSource({col: [] for col in self._ARROW_COLUMNS})

        self.root = figure(title="Task Groups Graph", match_aspect=True, **kwargs)
        self.root.axis.visible = False
        self.subtitle = Title(text=" ", text_font_style="italic")
        self.root.add_layout(self.subtitle, "above")

        rect = self.root.rect(
            x="x",
            y="y",
            width="w_box",
            height="h_box",
            color="color",
            fill_alpha="mem_alpha",
            line_color="black",
            line_width="node_line_width",
            source=self.nodes_source,
        )

        # plot tg logo
        self.root.image_url(
            url="url_logo",
            x="x_logo",
            y="y_logo",
            w="w_logo",
            h="h_logo",
            anchor="center",
            source=self.nodes_source,
        )

        # progress bar plain box
        self.root.quad(
            left="x_start",
            right="x_end",
            bottom="y_start",
            top="y_end",
            color=None,
            line_color="black",
            source=self.nodes_source,
        )

        # progress bar
        self.root.quad(
            left="x_start",
            right="x_end_progress",
            bottom="y_start",
            top="y_end",
            color="color",
            line_color=None,
            fill_alpha=0.6,
            source=self.nodes_source,
        )

        self.arrows = Arrow(
            end=VeeHead(size=8),
            line_color="black",
            line_alpha=0.5,
            line_width=1,
            x_start="xs",
            y_start="ys",
            x_end="xe",
            y_end="ye",
            source=self.arrows_source,
        )
        self.root.add_layout(self.arrows)

        self.root.xgrid.grid_line_color = None
        self.root.ygrid.grid_line_color = None
        self.root.x_range.range_padding = 0.5
        self.root.y_range.range_padding = 0.5

        hover = HoverTool(
            point_policy="follow_mouse",
            tooltips="""
                <div>
                    <span style="font-size: 12px; font-weight: bold;">Name:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span>
                </div>
                <div>
                    <span style="font-size: 12px; font-weight: bold;">Compute time:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@compute_time</span>
                </div>
                <div>
                    <span style="font-size: 12px; font-weight: bold;">Memory:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@memory</span>
                </div>
                <div>
                    <span style="font-size: 12px; font-weight: bold;">Tasks:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@tot_tasks</span>
                </div>
                <div style="margin-left: 2em;">
                    <span style="font-size: 12px; font-weight: bold;">Completed:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@comp_tasks</span>
                </div>
                <div style="margin-left: 2em;">
                    <span style="font-size: 12px; font-weight: bold;">Processing:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@in_processing</span>
                </div>
                <div style="margin-left: 2em;">
                    <span style="font-size: 12px; font-weight: bold;">In memory:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@in_memory</span>
                </div>
                <div style="margin-left: 2em;">
                    <span style="font-size: 12px; font-weight: bold;">Erred:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@in_erred</span>
                </div>
                <div style="margin-left: 2em;">
                    <span style="font-size: 12px; font-weight: bold;">Released:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@in_released</span>
                </div>
                """,
            renderers=[rect],
        )

        self.root.add_tools(hover)

    @without_property_validation
    def update_layout(self):
        """Compute grid positions and arrow endpoints for all task groups.

        Returns
        -------
        nodes_layout : dict
            ``{group name: {"x": column, "y": row}}``
        arrows_layout : dict
            ``{group name: {"nstart": dependency names, "nend": [name, ...]}}``
            where ``nend`` repeats the group's own name once per dependency.
        """
        with log_errors():
            task_groups = self.scheduler.task_groups

            # Get dependencies per task group.
            # In some cases there are tg that have themselves as dependencies,
            # we remove those.  Dependencies on groups the scheduler no longer
            # tracks are dropped as well: they have no position, so keeping
            # them would raise KeyError when placing the dependent group.
            dependencies = {
                k: {
                    ds.name
                    for ds in ts.dependencies
                    if ds.name != k and ds.name in task_groups
                }
                for k, ts in task_groups.items()
            }

            order = dask.order.order(
                dsk={group.name: 1 for group in task_groups.values()},
                dependencies=dependencies,
            )

            ordered = sorted(task_groups, key=order.get)

            xs = {}
            ys = {}
            locations = set()
            nodes_layout = {}
            arrows_layout = {}
            for tg in ordered:
                deps = dependencies[tg]
                if deps:
                    # one column right of the right-most dependency, on the
                    # row of the top-most one
                    x = max(xs[dep] for dep in deps) + 1
                    y = max(ys[dep] for dep in deps)
                    # several dependencies sharing a single row: step up one
                    if len(deps) > 1 and len({ys[dep] for dep in deps}) == 1:
                        y += 1
                else:
                    # groups without dependencies stack up in the first column
                    x = 0
                    y = max(ys.values()) + 1 if ys else 0

                while (x, y) in locations:  # avoid collisions by moving up
                    y += 1

                locations.add((x, y))

                xs[tg], ys[tg] = x, y

                # info needed for node layout to column data source
                nodes_layout[tg] = {"x": x, "y": y}

                # info needed for arrow layout
                arrows_layout[tg] = {
                    "nstart": deps,
                    "nend": [tg] * len(deps),
                }

            return nodes_layout, arrows_layout

    def compute_size(self, x, min_box, max_box):
        """Map ``x`` linearly so ``min_box`` -> 0.4 and ``max_box`` -> 0.8.

        When ``min_box == max_box`` there is no range to scale over; the
        midpoint of the output interval is returned instead of dividing by
        zero.
        """
        start = 0.4
        end = 0.8

        if max_box == min_box:
            return (start + end) / 2

        return (end - start) / (max_box - min_box) * (x - min_box) + start

    @staticmethod
    def _with_share(count, total):
        """Format ``count`` followed by its percentage of ``total`` (0 if ``total`` is 0)."""
        share = count / total * 100 if total else 0
        return f"{count} ({share:.0f} %)"

    @without_property_validation
    def update(self):
        """Recompute every node and arrow and push them to the data sources.

        Does nothing when the scheduler's ``transition_counter`` has not
        changed since the previous call.
        """
        if self.scheduler.transition_counter == self.old_counter:
            return
        self.old_counter = self.scheduler.transition_counter

        with log_errors():
            task_groups = self.scheduler.task_groups

            self.subtitle.text = " " if task_groups else "Scheduler is empty."

            if self.nodes_layout.keys() != task_groups.keys():
                self.nodes_layout, self.arrows_layout = self.update_layout()

            nodes_data = {col: [] for col in self._NODE_COLUMNS}
            arrows_data = {col: [] for col in self._ARROW_COLUMNS}

            # Distinct durations / sizes over the groups that report both;
            # they define the range that box sizes are scaled over.
            durations = set()
            nbytes = set()
            for tg in task_groups.values():
                if tg.duration and tg.nbytes_total:
                    durations.add(tg.duration)
                    nbytes.add(tg.nbytes_total)

            durations_min = min(durations, default=0)
            durations_max = max(durations, default=0)
            nbytes_min = min(nbytes, default=0)
            nbytes_max = max(nbytes, default=0)

            # First pass: task counts and box size of every group.  Arrows
            # need the box width of *other* groups, so this has to be
            # complete before any node is emitted.
            counts = {}
            box_dim = {}
            for key, tg in task_groups.items():
                comp_tasks = (
                    tg.states["released"] + tg.states["memory"] + tg.states["erred"]
                )
                tot_tasks = sum(tg.states.values())
                counts[key] = (comp_tasks, tot_tasks)

                # compute width and height of boxes
                if (
                    tg.duration
                    and tg.nbytes_total
                    and comp_tasks
                    and len(durations) > 1
                    and len(nbytes) > 1
                ):
                    # scale duration (width)
                    width_box = self.compute_size(
                        tg.duration / comp_tasks * tot_tasks,
                        min_box=durations_min / comp_tasks * tot_tasks,
                        max_box=durations_max / comp_tasks * tot_tasks,
                    )

                    # need to scale memory (height)
                    height_box = self.compute_size(
                        tg.nbytes_total / comp_tasks * tot_tasks,
                        min_box=nbytes_min / comp_tasks * tot_tasks,
                        max_box=nbytes_max / comp_tasks * tot_tasks,
                    )
                else:
                    width_box = 0.6
                    height_box = width_box / 2

                box_dim[key] = {"width": width_box, "height": height_box}

            # Second pass: one row per group plus its incoming arrows.
            for key, tg in task_groups.items():
                x = self.nodes_layout[key]["x"]
                y = self.nodes_layout[key]["y"]
                width = box_dim[key]["width"]
                height = box_dim[key]["height"]
                comp_tasks, tot_tasks = counts[key]

                # Guard the ratios against a group reporting no tasks at all.
                if tot_tasks:
                    memory_share = tg.states["memory"] / tot_tasks
                    progress_width = width * comp_tasks / tot_tasks
                else:
                    memory_share = 0.0
                    progress_width = 0.0

                # LOGOS: only shown when the group has exactly one output type
                if len(tg.types) == 1:
                    logo_type = next(iter(tg.types)).split(".")[0]
                    url_logo = logos_dict.get(logo_type, "")
                else:
                    url_logo = ""

                # fit the logo into 30 % of the box's shorter side
                ratio = width / height
                if ratio > 1:
                    h_logo = height * 0.3
                    w_logo = width * 0.3 / ratio
                else:
                    h_logo = height * 0.3 * ratio
                    w_logo = width * 0.3

                node = {
                    # main boxes layout
                    "x": x,
                    "y": y,
                    "w_box": width,
                    "h_box": height,
                    "name": tg.prefix.name,
                    "color": color_of(tg.prefix.name),
                    "tot_tasks": tot_tasks,
                    # memory alpha factor by 0.4, otherwise it gets too dark
                    "mem_alpha": memory_share * 0.4,
                    # thicker outline while tasks are processing
                    "node_line_width": 5 if tg.states["processing"] else 1,
                    # progress bar: bottom 40 % of the box
                    "x_start": x - width / 2,
                    "x_end": x + width / 2,
                    "y_start": y - height / 2,
                    "y_end": y - height / 2 + height * 0.4,
                    "x_end_progress": x - width / 2 + progress_width,
                    # logo
                    "url_logo": url_logo,
                    "x_logo": x + width / 3,
                    "y_logo": y + height / 3,
                    "w_logo": w_logo,
                    "h_logo": h_logo,
                    # compute_time and memory
                    "compute_time": format_time(tg.duration),
                    "memory": format_bytes(tg.nbytes_total),
                    # task status shown in the hover tooltip
                    "comp_tasks": self._with_share(comp_tasks, tot_tasks),
                    "in_processing": self._with_share(
                        tg.states["processing"], tot_tasks
                    ),
                    "in_memory": self._with_share(tg.states["memory"], tot_tasks),
                    "in_released": self._with_share(tg.states["released"], tot_tasks),
                    "in_erred": self._with_share(tg.states["erred"], tot_tasks),
                }
                for column, cell in node.items():
                    nodes_data[column].append(cell)

                # arrows: right edge of each dependency -> left edge of this box
                arrow = self.arrows_layout[key]
                arrows_data["xs"] += [
                    self.nodes_layout[k]["x"] + box_dim[k]["width"] / 2
                    for k in arrow["nstart"]
                ]
                arrows_data["ys"] += [self.nodes_layout[k]["y"] for k in arrow["nstart"]]
                arrows_data["xe"] += [
                    self.nodes_layout[k]["x"] - box_dim[k]["width"] / 2
                    for k in arrow["nend"]
                ]
                arrows_data["ye"] += [self.nodes_layout[k]["y"] for k in arrow["nend"]]

            self.nodes_source.data.update(nodes_data)
            self.arrows_source.data.update(arrows_data)
2558
2559
class TaskProgress(DashboardComponent):
    """Stacked progress bars, one per task prefix known to the scheduler.

    The bar geometry comes from ``progress_quads``; this class only wires the
    resulting columns to glyphs and keeps the figure title in sync with the
    per-state totals.
    """

    def __init__(self, scheduler, **kwargs):
        """Build the figure and its (initially empty) data source.

        Parameters
        ----------
        scheduler
            Object exposing ``task_prefixes`` (read in :meth:`update`).
        **kwargs
            Forwarded verbatim to ``bokeh.plotting.figure``.
        """
        self.scheduler = scheduler

        # Seed the source with the columns ``progress_quads`` yields for an
        # empty state so every glyph below can bind to its field names.
        empty_state = {
            "all": {},
            "memory": {},
            "erred": {},
            "released": {},
            "processing": {},
        }
        self.source = ColumnDataSource(data=progress_quads(empty_state))

        plot = figure(
            id="bk-task-progress-plot",
            title="Progress",
            name="task_progress",
            x_range=DataRange1d(range_padding=0),
            y_range=Range1d(-8, 0),
            toolbar_location=None,
            tools="",
            min_border_bottom=50,
            **kwargs,
        )
        self.root = plot

        # Invisible line whose only job is to define the ranges early.
        plot.line(x=[0, 0.9], y=[-1, 0], line_color="#FFFFFF", alpha=0.0)

        # Every bar segment shares the same source and vertical extent.
        bar = {"source": self.source, "top": "top", "bottom": "bottom"}

        # Faint full-width background bar.
        plot.quad(
            left="left",
            right="right",
            fill_color="#aaaaaa",
            line_color="#aaaaaa",
            fill_alpha=0.1,
            line_alpha=0.3,
            **bar,
        )
        # Segment ending at "released-loc" (lighter shade of the bar color).
        plot.quad(
            left="left",
            right="released-loc",
            fill_color="color",
            line_color="color",
            fill_alpha=0.6,
            **bar,
        )
        # Segment from "released-loc" to "memory-loc" (full bar color).
        plot.quad(
            left="released-loc",
            right="memory-loc",
            fill_color="color",
            line_color="color",
            fill_alpha=1.0,
            **bar,
        )
        # Segment from "memory-loc" to "erred-loc" (black).
        plot.quad(
            left="memory-loc",
            right="erred-loc",
            fill_color="black",
            fill_alpha=0.5,
            line_alpha=0,
            **bar,
        )
        # Segment from "erred-loc" to "processing-loc" (gray).
        plot.quad(
            left="erred-loc",
            right="processing-loc",
            fill_color="gray",
            fill_alpha=0.35,
            line_alpha=0,
            **bar,
        )

        # Labels: name anchored at the left edge, "done" text at the right.
        label_size = value("10pt")
        plot.text(
            source=self.source,
            text="show-name",
            y="bottom",
            x="left",
            x_offset=5,
            text_font_size=label_size,
        )
        plot.text(
            source=self.source,
            text="done",
            y="bottom",
            x="right",
            x_offset=-5,
            text_align="right",
            text_font_size=label_size,
        )

        # Hide grids and axes in both directions.
        for grid, axis in ((plot.ygrid, plot.yaxis), (plot.xgrid, plot.xaxis)):
            grid.visible = False
            axis.minor_tick_line_alpha = 0
            axis.visible = False

        hover = HoverTool(
            point_policy="follow_mouse",
            tooltips="""
                <div>
                    <span style="font-size: 14px; font-weight: bold;">Name:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span>
                </div>
                <div>
                    <span style="font-size: 14px; font-weight: bold;">All:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@all</span>
                </div>
                <div>
                    <span style="font-size: 14px; font-weight: bold;">Memory:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@memory</span>
                </div>
                <div>
                    <span style="font-size: 14px; font-weight: bold;">Erred:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@erred</span>
                </div>
                <div>
                    <span style="font-size: 14px; font-weight: bold;">Ready:</span>&nbsp;
                    <span style="font-size: 10px; font-family: Monaco, monospace;">@processing</span>
                </div>
                """,
        )
        plot.add_tools(hover)

    @without_property_validation
    def update(self):
        """Recompute the bars and the title from ``scheduler.task_prefixes``.

        Prefixes without any task in a tracked state are left out. When there
        is nothing to show and the source is already empty, the method
        returns without touching the plot.
        """
        with log_errors():
            tracked = ("memory", "erred", "released", "processing", "waiting")
            state = {status: {} for status in tracked}

            for prefix in self.scheduler.task_prefixes.values():
                counts = prefix.active_states
                if not any(counts.get(status) for status in tracked):
                    continue
                for status in tracked:
                    state[status][prefix.name] = counts[status]

            # "all" is the per-prefix sum over every tracked state.
            state["all"] = {
                name: sum(per_status[name] for per_status in state.values())
                for name in state["memory"]
            }

            if not (state["all"] or len(self.source.data["all"])):
                return

            update(self.source, progress_quads(state))

            totals = {
                key: sum(state[key].values())
                for key in ["all", "memory", "erred", "released", "waiting"]
            }
            # "processing" is derived as whatever the other totals leave over.
            accounted_for = sum(
                count for key, count in totals.items() if key != "all"
            )
            totals["processing"] = totals["all"] - accounted_for

            title_template = (
                "Progress -- total: %(all)s, "
                "in-memory: %(memory)s, processing: %(processing)s, "
                "waiting: %(waiting)s, "
                "erred: %(erred)s"
            )
            self.root.title.text = title_template % totals
2735
2736
class WorkerTable(DashboardComponent):
    """Status of the current workers

    The root is a column holding a thin CPU-use plot, a thin memory-use plot
    and a text-based table with one row per worker, preceded by a totals row.
    When the workers known at construction time report additional scalar
    metrics, a second table showing those extra columns is appended.
    """

    # Worker metrics that never get a column in the extra-metrics table.
    excluded_names = {
        "executing",
        "in_flight",
        "in_memory",
        "ready",
        "time",
        "spilled_nbytes",
    }

    def __init__(self, scheduler, width=800, **kwargs):
        """Build the data source, the table(s) and the two usage plots.

        Parameters
        ----------
        scheduler
            Object exposing ``workers``, a mapping whose values provide
            ``metrics``, ``memory``, ``memory_limit``, ``name``, ``address``
            and ``nthreads``.
        width : int
            Width given to the tables and to both plots.
        **kwargs
            Forwarded to both ``figure`` calls; ``sizing_mode``, if present,
            is also applied to the root column.
        """
        self.scheduler = scheduler

        # Columns of the main table, in display order.
        table_names = [
            "name",
            "address",
            "nthreads",
            "cpu",
            "memory",
            "memory_limit",
            "memory_percent",
            "memory_managed",
            "memory_unmanaged_old",
            "memory_unmanaged_recent",
            "memory_spilled",
            "num_fds",
            "read_bytes",
            "write_bytes",
        ]
        # "cpu_fraction" only drives the CPU plot; it has no table column.
        self.names = table_names + ["cpu_fraction"]

        # Scalar metrics reported by the current workers that are neither
        # built-in columns nor explicitly excluded.
        self.extra_names = sorted(
            {
                m
                for ws in self.scheduler.workers.values()
                for m, v in ws.metrics.items()
                if m not in self.names and isinstance(v, (str, int, float))
            }
            - self.excluded_names
        )

        column_title_renames = {
            "memory_limit": "limit",
            "memory_percent": "memory %",
            "memory_managed": "managed",
            "memory_unmanaged_old": "unmanaged old",
            "memory_unmanaged_recent": "unmanaged recent",
            "memory_spilled": "spilled",
            "num_fds": "# fds",
            "read_bytes": "read",
            "write_bytes": "write",
        }

        formatters = {
            "cpu": NumberFormatter(format="0 %"),
            "memory_percent": NumberFormatter(format="0.0 %"),
            "memory": NumberFormatter(format="0.0 b"),
            "memory_limit": NumberFormatter(format="0.0 b"),
            "memory_managed": NumberFormatter(format="0.0 b"),
            "memory_unmanaged_old": NumberFormatter(format="0.0 b"),
            "memory_unmanaged_recent": NumberFormatter(format="0.0 b"),
            "memory_spilled": NumberFormatter(format="0.0 b"),
            "read_bytes": NumberFormatter(format="0 b"),
            "write_bytes": NumberFormatter(format="0 b"),
            "num_fds": NumberFormatter(format="0"),
            "nthreads": NumberFormatter(format="0"),
        }

        self.source = ColumnDataSource({k: [] for k in self.names})

        def make_columns(fields):
            # One TableColumn per field, titled with its display name.
            return [
                TableColumn(field=f, title=column_title_renames.get(f, f))
                for f in fields
            ]

        def make_table(columns):
            return DataTable(
                source=self.source,
                columns=columns,
                reorderable=True,
                sortable=True,
                width=width,
                index_position=None,
            )

        # Only the main table gets number formatters.
        main_columns = make_columns(table_names)
        for col in main_columns:
            if col.field in formatters:
                col.formatter = formatters[col.field]
        table = make_table(main_columns)

        mem_plot = self._usage_plot(
            "Memory Use (%)",
            "memory_percent",
            """
                <div>
                  <span style="font-size: 10px; font-family: Monaco, monospace;">Worker (@name): </span>
                  <span style="font-size: 10px; font-family: Monaco, monospace;">@memory_percent{0.0 %}</span>
                </div>
                """,
            width,
            kwargs,
        )
        cpu_plot = self._usage_plot(
            "CPU Use (%)",
            "cpu_fraction",
            """
                <div>
                  <span style="font-size: 10px; font-family: Monaco, monospace;">Worker (@name): </span>
                  <span style="font-size: 10px; font-family: Monaco, monospace;">@cpu_fraction{0 %}</span>
                </div>
                """,
            width,
            kwargs,
        )
        self.cpu_plot = cpu_plot

        components = [cpu_plot, mem_plot, table]
        if self.extra_names:
            # The extra table is only built when it will actually be shown.
            extra_fields = ["name", "address"] + self.extra_names
            components.append(make_table(make_columns(extra_fields)))

        if "sizing_mode" in kwargs:
            sizing_mode = {"sizing_mode": kwargs["sizing_mode"]}
        else:
            sizing_mode = {}

        self.root = column(*components, id="bk-worker-table", **sizing_mode)

    def _usage_plot(self, title, field, tooltips, width, figure_kwargs):
        """Return a thin strip plot with one circle per row at x=``field``.

        ``figure_kwargs`` is passed as a plain dict (not ``**``) so that no
        caller-supplied key can collide with this helper's own parameters.
        """
        plot = figure(
            title=title,
            toolbar_location=None,
            x_range=(0, 1),
            y_range=(-0.1, 0.1),
            height=60,
            width=width,
            tools="",
            min_border_right=0,
            **figure_kwargs,
        )
        plot.circle(source=self.source, x=field, y=0, size=10, fill_alpha=0.5)
        plot.ygrid.visible = False
        plot.yaxis.minor_tick_line_alpha = 0
        plot.xaxis.visible = False
        plot.yaxis.visible = False
        hover = HoverTool(point_policy="follow_mouse", tooltips=tooltips)
        plot.add_tools(hover, BoxSelectTool())
        return plot

    def _total(self, name, values):
        """Return the totals-row entry for column ``name``.

        ``values`` is the list of per-worker entries already collected for
        that column. Returns ``None`` when there are no workers, and ``""``
        for ``memory_percent`` when the summed memory limit is falsy.

        Raises
        ------
        TypeError
            When the entries cannot be summed (e.g. strings or ``None``);
            the caller turns that into a ``None`` entry.
        """
        workers = self.scheduler.workers
        if len(workers) == 0:
            return None
        if name == "memory_percent":
            total_limit = sum(ws.memory_limit for ws in workers.values())
            if not total_limit:
                return ""
            return sum(ws.metrics["memory"] for ws in workers.values()) / total_limit
        if name == "cpu":
            # Mean CPU use per worker, scaled from percent to a fraction.
            total_cpu = sum(ws.metrics["cpu"] for ws in workers.values())
            return total_cpu / 100 / len(workers)
        if name == "cpu_fraction":
            # CPU use per thread across the whole cluster.
            total_cpu = sum(ws.metrics["cpu"] for ws in workers.values())
            return total_cpu / 100 / sum(ws.nthreads for ws in workers.values())
        return sum(values)

    @without_property_validation
    def update(self):
        """Refresh the data source from the scheduler's current workers.

        Rows are ordered by ``str(ws.name)``; a totals row is then inserted
        at position 0 of every column.
        """
        all_names = self.names + self.extra_names
        data = {name: [] for name in all_names}

        ordered = sorted(
            self.scheduler.workers.values(), key=lambda ws: str(ws.name)
        )
        for i, ws in enumerate(ordered):
            minfo = ws.memory
            metrics = ws.metrics

            # Start from the raw metrics, then overwrite the derived columns.
            for name in all_names:
                data[name].append(metrics.get(name, None))
            data["name"][-1] = ws.name if ws.name is not None else i
            data["address"][-1] = ws.address
            if ws.memory_limit:
                data["memory_percent"][-1] = metrics["memory"] / ws.memory_limit
            else:
                data["memory_percent"][-1] = ""
            data["memory_limit"][-1] = ws.memory_limit
            data["memory_managed"][-1] = minfo.managed_in_memory
            data["memory_unmanaged_old"][-1] = minfo.unmanaged_old
            data["memory_unmanaged_recent"][-1] = minfo.unmanaged_recent
            data["memory_spilled"][-1] = minfo.managed_spilled
            data["cpu"][-1] = metrics["cpu"] / 100.0
            data["cpu_fraction"][-1] = metrics["cpu"] / 100.0 / ws.nthreads
            data["nthreads"][-1] = ws.nthreads

        for name in all_names:
            if name == "name":
                data[name].insert(0, f"Total ({len(data[name])})")
                continue
            try:
                total_data = self._total(name, data[name])
            except TypeError:
                # Non-summable column (strings, missing metrics): no total.
                total_data = None
            data[name].insert(0, total_data)

        self.source.data.update(data)
3001
3002
class SchedulerLogs:
    """Static ``Div`` showing the scheduler logs rendered as HTML."""

    def __init__(self, scheduler):
        # Each log entry is a two-item record; only the second item (the
        # message) is kept, one per line.
        messages = [message for _, message in scheduler.get_logs()]
        html = Log("\n".join(messages))._repr_html_()

        css = {
            "width": "100%",
            "height": "100%",
            "max-width": "1920px",
            "max-height": "1080px",
            "padding": "12px",
            "border": "1px solid lightgray",
            "box-shadow": "inset 1px 0 8px 0 lightgray",
            "overflow": "auto",
        }
        self.root = Div(text=html, style=css)
3022
3023
def systemmonitor_doc(scheduler, extra, doc):
    """Populate ``doc`` with the scheduler system-monitor page.

    A ``SystemMonitor`` is added as the only root and registered through
    ``add_periodic_callback`` with an interval argument of 500. The page uses
    the ``simple.html`` template, ``extra`` as additional template variables
    and ``BOKEH_THEME``.
    """
    with log_errors():
        monitor = SystemMonitor(scheduler, sizing_mode="stretch_both")
        doc.title = "Dask: Scheduler System Monitor"
        add_periodic_callback(doc, monitor, 500)

        doc.add_root(monitor.root)
        page_template = env.get_template("simple.html")
        doc.template = page_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
3034
3035
def stealing_doc(scheduler, extra, doc):
    """Populate ``doc`` with the work-stealing page.

    The layout is a row: the occupancy plot next to a column holding the
    stealing time series and the stealing events, which share one x range.
    """
    with log_errors():
        occupancy = Occupancy(scheduler)
        time_series = StealingTimeSeries(scheduler)
        steal_events = StealingEvents(scheduler)
        # Link the two stealing plots so they pan/zoom together along x.
        steal_events.root.x_range = time_series.root.x_range
        doc.title = "Dask: Work Stealing"
        for component in (occupancy, time_series, steal_events):
            add_periodic_callback(doc, component, 500)

        stealing_column = column(
            time_series.root,
            steal_events.root,
            sizing_mode="stretch_both",
        )
        doc.add_root(row(occupancy.root, stealing_column))

        doc.template = env.get_template("simple.html")
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
3061
3062
def events_doc(scheduler, extra, doc):
    """Populate ``doc`` with the scheduler events page.

    An ``Events`` component for the ``"all"`` topic is updated once,
    registered for periodic updates (interval argument 500) and shown in a
    width-scaled column.
    """
    with log_errors():
        event_plot = Events(scheduler, "all", height=250)
        event_plot.update()
        add_periodic_callback(doc, event_plot, 500)
        doc.title = "Dask: Scheduler Events"
        layout = column(event_plot.root, sizing_mode="scale_width")
        doc.add_root(layout)
        doc.template = env.get_template("simple.html")
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
3073
3074
def workers_doc(scheduler, extra, doc):
    """Populate ``doc`` with the workers page.

    A ``WorkerTable`` is updated once, registered for periodic updates
    (interval argument 500) and added as the only root.
    """
    with log_errors():
        worker_table = WorkerTable(scheduler)
        worker_table.update()
        add_periodic_callback(doc, worker_table, 500)
        doc.title = "Dask: Workers"
        doc.add_root(worker_table.root)
        page_template = env.get_template("simple.html")
        doc.template = page_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
3085
3086
def tasks_doc(scheduler, extra, doc):
    """Populate ``doc`` with the task-stream page.

    The number of rectangles is read from the dask config key
    ``distributed.scheduler.dashboard.tasks.task-stream-length``. The stream
    is updated once and then registered for periodic updates (interval
    argument 5000).
    """
    with log_errors():
        stream_length = dask.config.get(
            "distributed.scheduler.dashboard.tasks.task-stream-length"
        )
        stream = TaskStream(
            scheduler,
            n_rectangles=stream_length,
            clear_interval="60s",
            sizing_mode="stretch_both",
        )
        stream.update()
        add_periodic_callback(doc, stream, 5000)
        doc.title = "Dask: Task Stream"
        doc.add_root(stream.root)
        doc.template = env.get_template("simple.html")
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
3104
3105
def graph_doc(scheduler, extra, doc):
    """Populate ``doc`` with the task-graph page.

    A ``TaskGraph`` is updated once, registered for periodic updates
    (interval argument 200) and added as the only root.
    """
    with log_errors():
        task_graph = TaskGraph(scheduler, sizing_mode="stretch_both")
        doc.title = "Dask: Task Graph"
        task_graph.update()
        add_periodic_callback(doc, task_graph, 200)
        doc.add_root(task_graph.root)

        page_template = env.get_template("simple.html")
        doc.template = page_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
3117
3118
def tg_graph_doc(scheduler, extra, doc):
    """Populate ``doc`` with the task-groups graph page.

    A ``TaskGroupGraph`` is updated once, registered for periodic updates
    (interval argument 200) and added as the only root.
    """
    with log_errors():
        group_graph = TaskGroupGraph(scheduler, sizing_mode="stretch_both")
        doc.title = "Dask: Task Groups Graph"
        group_graph.update()
        add_periodic_callback(doc, group_graph, 200)
        doc.add_root(group_graph.root)
        page_template = env.get_template("simple.html")
        doc.template = page_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
3129
3130
def status_doc(scheduler, extra, doc):
    """Populate ``doc`` with the main status page.

    Roots are added in this order: cluster memory, per-worker memory, a
    tabbed panel (processing / CPU / occupancy), the task stream and the task
    progress bars. With more than 100 workers the per-worker memory and
    processing plots are replaced by their histogram variants. Every
    component is updated once and registered for periodic updates with an
    interval argument of 100.
    """
    with log_errors():
        stretch = "stretch_both"

        cluster_memory = ClusterMemory(scheduler, sizing_mode=stretch)
        cluster_memory.update()
        add_periodic_callback(doc, cluster_memory, 100)
        doc.add_root(cluster_memory.root)

        if len(scheduler.workers) > 100:
            # Large clusters: aggregate into histograms.
            workers_memory = WorkersMemoryHistogram(scheduler, sizing_mode=stretch)
            processing = ProcessingHistogram(scheduler, sizing_mode=stretch)
            processing_root = processing.root
        else:
            workers_memory = WorkersMemory(scheduler, sizing_mode=stretch)
            processing = CurrentLoad(scheduler, sizing_mode=stretch)
            processing_root = processing.processing_figure

        current_load = CurrentLoad(scheduler, sizing_mode=stretch)
        occupancy = Occupancy(scheduler, sizing_mode=stretch)

        # The figures are picked up before the first update, as before.
        cpu_root = current_load.cpu_figure
        occupancy_root = occupancy.root

        live_components = (workers_memory, processing, current_load, occupancy)
        for component in live_components:
            component.update()
        for component in live_components:
            add_periodic_callback(doc, component, 100)

        doc.add_root(workers_memory.root)

        panels = [
            Panel(child=processing_root, title="Processing"),
            Panel(child=cpu_root, title="CPU"),
            Panel(child=occupancy_root, title="Occupancy"),
        ]
        doc.add_root(Tabs(tabs=panels, name="processing_tabs"))

        stream_length = dask.config.get(
            "distributed.scheduler.dashboard.status.task-stream-length"
        )
        task_stream = TaskStream(
            scheduler,
            n_rectangles=stream_length,
            clear_interval="5s",
            sizing_mode=stretch,
        )
        task_stream.update()
        add_periodic_callback(doc, task_stream, 100)
        doc.add_root(task_stream.root)

        progress = TaskProgress(scheduler, sizing_mode=stretch)
        progress.update()
        add_periodic_callback(doc, progress, 100)
        doc.add_root(progress.root)

        doc.title = "Dask: Status"
        doc.theme = BOKEH_THEME
        doc.template = env.get_template("status.html")
        doc.template_variables.update(extra)
3197
3198
@curry
def individual_doc(cls, interval, scheduler, extra, doc, fig_attr="root", **kwargs):
    """Populate ``doc`` with a single stand-alone dashboard component.

    ``cls`` is instantiated with ``scheduler``, ``sizing_mode="stretch_both"``
    and ``kwargs``, updated once, and registered for periodic updates with
    ``interval``. The attribute named by ``fig_attr`` becomes the document
    root. ``extra`` is accepted but not used here.
    """
    with log_errors():
        component = cls(scheduler, sizing_mode="stretch_both", **kwargs)
        component.update()
        add_periodic_callback(doc, component, interval)
        root_model = getattr(component, fig_attr)
        doc.add_root(root_model)
        doc.theme = BOKEH_THEME
3207
3208
def individual_profile_doc(scheduler, extra, doc):
    """Populate ``doc`` with a stand-alone ``ProfileTimePlot``.

    No template or title is set; ``extra`` is accepted but not used here.
    """
    with log_errors():
        profile_plot = ProfileTimePlot(
            scheduler, sizing_mode="stretch_both", doc=doc
        )
        doc.add_root(profile_plot.root)
        profile_plot.trigger_update()
        doc.theme = BOKEH_THEME
3215
3216
def individual_profile_server_doc(scheduler, extra, doc):
    """Populate ``doc`` with a stand-alone ``ProfileServer`` plot.

    No template or title is set; ``extra`` is accepted but not used here.
    """
    with log_errors():
        server_profile = ProfileServer(
            scheduler, sizing_mode="stretch_both", doc=doc
        )
        doc.add_root(server_profile.root)
        server_profile.trigger_update()
        doc.theme = BOKEH_THEME
3223
3224
def profile_doc(scheduler, extra, doc):
    """Populate ``doc`` with the full profile page.

    A ``ProfileTimePlot`` is added as the only root using the ``simple.html``
    template; its update is triggered once the document is configured.
    """
    with log_errors():
        doc.title = "Dask: Profile"
        profile_plot = ProfileTimePlot(
            scheduler, sizing_mode="stretch_both", doc=doc
        )
        doc.add_root(profile_plot.root)
        page_template = env.get_template("simple.html")
        doc.template = page_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME

        profile_plot.trigger_update()
3235
3236
def profile_server_doc(scheduler, extra, doc):
    """Populate ``doc`` with the event-loop profile page.

    A ``ProfileServer`` plot is added as the only root using the
    ``simple.html`` template; its update is triggered once the document is
    configured.
    """
    with log_errors():
        doc.title = "Dask: Profile of Event Loop"
        server_profile = ProfileServer(
            scheduler, sizing_mode="stretch_both", doc=doc
        )
        doc.add_root(server_profile.root)
        page_template = env.get_template("simple.html")
        doc.template = page_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME

        server_profile.trigger_update()
3247