import logging
import math
import operator
import os
from collections import defaultdict
from numbers import Number
import numpy as np
from bokeh.core.properties import without_property_validation
from bokeh.io import curdoc
from bokeh.layouts import column, row
from bokeh.models import (
AdaptiveTicker,
Arrow,
BasicTicker,
BoxSelectTool,
BoxZoomTool,
CDSView,
ColorBar,
ColumnDataSource,
DataRange1d,
GroupFilter,
HoverTool,
NumberFormatter,
NumeralTickFormatter,
OpenURL,
Panel,
PanTool,
Range1d,
ResetTool,
Tabs,
TapTool,
Title,
VeeHead,
WheelZoomTool,
value,
)
from bokeh.models.widgets import DataTable, TableColumn
from bokeh.models.widgets.markups import Div
from bokeh.palettes import Viridis11
from bokeh.plotting import figure
from bokeh.themes import Theme
from bokeh.transform import cumsum, factor_cmap, linear_cmap
from tlz import curry, pipe
from tlz.curried import concat, groupby, map
from tornado import escape
import dask
from dask import config
from dask.utils import format_bytes, format_time, key_split, parse_timedelta
from distributed.dashboard.components import add_periodic_callback
from distributed.dashboard.components.shared import (
DashboardComponent,
ProfileServer,
ProfileTimePlot,
SystemMonitor,
)
from distributed.dashboard.utils import BOKEH_VERSION, PROFILING, transpose, update
from distributed.diagnostics.graph_layout import GraphLayout
from distributed.diagnostics.progress_stream import color_of, progress_quads
from distributed.diagnostics.task_stream import TaskStreamPlugin
from distributed.diagnostics.task_stream import color_of as ts_color_of
from distributed.diagnostics.task_stream import colors as ts_color_lookup
from distributed.metrics import time
from distributed.utils import Log, log_errors
if dask.config.get("distributed.dashboard.export-tool"):
from distributed.dashboard.export_tool import ExportTool
else:
ExportTool = None # type: ignore
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
from jinja2 import Environment, FileSystemLoader
# Jinja2 environment loading templates from ``../../http/templates``
# relative to the directory containing this file.
env = Environment(
    loader=FileSystemLoader(
        os.path.join(os.path.dirname(__file__), "..", "..", "http", "templates")
    )
)
# Bokeh theme loaded from ``theme.yaml`` one directory above this file.
BOKEH_THEME = Theme(
    filename=os.path.join(os.path.dirname(__file__), "..", "theme.yaml")
)
# Keyword arguments unpacked into BasicTicker / AdaptiveTicker by the
# components below: base-1024 ticks with power-of-two mantissas.
TICKS_1024 = {"base": 1024, "mantissas": [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]}
XLABEL_ORIENTATION = -math.pi / 9 # slanted downwards 20 degrees
# Static image paths keyed by top-level module name.
# NOTE(review): not referenced in the visible part of this file; confirm usage.
logos_dict = {
    "numpy": "statics/images/numpy.png",
    "pandas": "statics/images/pandas.png",
    "builtins": "statics/images/python.png",
}
class Occupancy(DashboardComponent):
    """Occupancy (in time) per worker

    One horizontal bar is drawn per worker on a datetime x axis; the bar
    width is the worker's ``occupancy`` converted to milliseconds.  Bars are
    red for workers in ``scheduler.idle``, green for workers in
    ``scheduler.saturated`` and blue otherwise.  Tapping a bar opens
    ``./info/worker/<escaped address>.html``.
    """

    def __init__(self, scheduler, **kwargs):
        """Create the figure; ``kwargs`` are forwarded to ``figure``."""
        with log_errors():
            self.scheduler = scheduler
            # Placeholder rows, replaced by the first successful ``update``
            self.source = ColumnDataSource(
                {
                    "occupancy": [0, 0],
                    "worker": ["a", "b"],
                    "x": [0.0, 0.1],
                    "y": [1, 2],
                    "ms": [1, 2],
                    "color": ["red", "blue"],
                    "escaped_worker": ["a", "b"],
                }
            )

            self.root = figure(
                title="Occupancy",
                tools="",
                toolbar_location="above",
                id="bk-occupancy-plot",
                x_axis_type="datetime",
                min_border_bottom=50,
                **kwargs,
            )
            rect = self.root.rect(
                source=self.source, x="x", width="ms", y="y", height=0.9, color="color"
            )
            rect.nonselection_glyph = None

            self.root.xaxis.minor_tick_line_alpha = 0
            self.root.yaxis.visible = False
            self.root.ygrid.visible = False
            self.root.x_range.start = 0

            tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))

            hover = HoverTool()
            hover.tooltips = "@worker : @occupancy s."
            hover.point_policy = "follow_mouse"
            self.root.add_tools(hover, tap)

    @without_property_validation
    def update(self):
        """Refresh bar geometry, colors and the title from the scheduler.

        The data source is left untouched when there are no workers.
        """
        with log_errors():
            workers = self.scheduler.workers.values()

            y = list(range(len(workers)))
            occupancy = [ws.occupancy for ws in workers]
            ms = [occ * 1000 for occ in occupancy]
            # ``rect`` glyphs are positioned by their center.  For a bar that
            # starts at 0 and is ``ms`` wide the center must be ``ms / 2``;
            # the previous ``occ / 500`` centered the bar near 0 so only half
            # of it was visible to the right of ``x_range.start``.
            x = [width / 2 for width in ms]
            total = sum(occupancy)

            color = []
            for ws in workers:
                if ws in self.scheduler.idle:
                    color.append("red")
                elif ws in self.scheduler.saturated:
                    color.append("green")
                else:
                    color.append("blue")

            if total:
                self.root.title.text = (
                    f"Occupancy -- total time: {format_time(total)} "
                    f"wall time: {format_time(total / self.scheduler.total_nthreads)}"
                )
            else:
                self.root.title.text = "Occupancy"

            if occupancy:
                result = {
                    "occupancy": occupancy,
                    "worker": [ws.address for ws in workers],
                    "ms": ms,
                    "color": color,
                    "escaped_worker": [escape.url_escape(ws.address) for ws in workers],
                    "x": x,
                    "y": y,
                }

                update(self.source, result)
class ProcessingHistogram(DashboardComponent):
    """Histogram of how many tasks are processing on each worker"""

    def __init__(self, scheduler, **kwargs):
        """Create the figure; ``kwargs`` are forwarded to ``figure``."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler
            # Placeholder bins, overwritten by ``update``
            placeholder = {"left": [1, 2], "right": [10, 10], "top": [0, 0]}
            self.source = ColumnDataSource(placeholder)

            self.root = figure(
                title="Tasks Processing (count)",
                id="bk-nprocessing-histogram-plot",
                name="processing",
                y_axis_label="frequency",
                tools="",
                **kwargs,
            )
            plot = self.root
            plot.xaxis.minor_tick_line_alpha = 0
            plot.ygrid.visible = False
            plot.toolbar_location = None

            quad_style = dict(bottom=0, color="deepskyblue", fill_alpha=0.5)
            plot.quad(
                source=self.source, left="left", right="right", top="top", **quad_style
            )

    @without_property_validation
    def update(self):
        """Re-bin the per-worker processing counts into 40 buckets."""
        per_worker = [len(ws.processing) for ws in self.scheduler.workers.values()]
        counts, edges = np.histogram(per_worker, bins=40)
        new_bins = dict(left=edges[:-1], right=edges[1:], top=counts)
        self.source.data.update(new_bins)
def _memory_color(current: int, limit: int) -> str:
"""Dynamic color used by WorkersMemory and ClusterMemory"""
if limit and current > limit:
return "red"
if limit and current > limit / 2:
return "orange"
return "blue"
class ClusterMemory(DashboardComponent):
    """Total memory usage on the cluster

    A single stacked bar with four segments: managed in memory, unmanaged
    (old), unmanaged (recent) and spilled to disk.
    """

    def __init__(self, scheduler, width=600, **kwargs):
        """Create the figure at half of ``width``; ``kwargs`` go to ``figure``."""
        with log_errors():
            self.scheduler = scheduler
            zeros = [0] * 4
            self.source = ColumnDataSource(
                {
                    "width": list(zeros),
                    "x": list(zeros),
                    "y": list(zeros),
                    "color": ["blue", "blue", "blue", "grey"],
                    "alpha": [1, 0.7, 0.4, 1],
                    "proc_memory": list(zeros),
                    "managed": list(zeros),
                    "unmanaged_old": list(zeros),
                    "unmanaged_recent": list(zeros),
                    "spilled": list(zeros),
                }
            )

            plot = figure(
                title="Bytes stored on cluster",
                tools="",
                id="bk-cluster-memory-plot",
                width=int(width / 2),
                name="cluster_memory",
                min_border_bottom=50,
                **kwargs,
            )
            self.root = plot

            bar = plot.rect(
                source=self.source,
                x="x",
                y="y",
                width="width",
                height=0.9,
                color="color",
                alpha="alpha",
            )
            bar.nonselection_glyph = None

            # Byte-oriented x axis
            plot.axis[0].ticker = BasicTicker(**TICKS_1024)
            plot.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            plot.xaxis.major_label_orientation = XLABEL_ORIENTATION
            plot.xaxis.minor_tick_line_alpha = 0
            plot.x_range = Range1d(start=0)
            plot.ygrid.visible = False
            plot.toolbar_location = None
            plot.yaxis.visible = False

            tooltip_markup = """
Process memory (RSS):
@proc_memory{0.00 b}
Managed:
@managed{0.00 b}
Unmanaged (old):
@unmanaged_old{0.00 b}
Unmanaged (recent):
@unmanaged_recent{0.00 b}
Spilled to disk:
@spilled{0.00 b}
"""
            plot.add_tools(
                HoverTool(point_policy="follow_mouse", tooltips=tooltip_markup)
            )

    @without_property_validation
    def update(self):
        """Recompute segment geometry, color, x range and title."""
        with log_errors():
            mem = self.scheduler.memory
            limit = sum(ws.memory_limit for ws in self.scheduler.workers.values())
            bar_color = _memory_color(mem.process, limit)

            segments = [
                mem.managed_in_memory,
                mem.unmanaged_old,
                mem.unmanaged_recent,
                mem.managed_spilled,
            ]
            # rect glyphs are addressed by center: preceding widths plus half
            # of the segment's own width
            centers = []
            for idx, seg in enumerate(segments):
                centers.append(sum(segments[:idx]) + seg / 2)

            result = {
                "width": segments,
                "x": centers,
                "color": [bar_color, bar_color, bar_color, "grey"],
                "proc_memory": [mem.process] * 4,
                "managed": [mem.managed_in_memory] * 4,
                "unmanaged_old": [mem.unmanaged_old] * 4,
                "unmanaged_recent": [mem.unmanaged_recent] * 4,
                "spilled": [mem.managed_spilled] * 4,
            }

            self.root.x_range.end = max(limit, mem.process + mem.managed_spilled)

            heading = f"Bytes stored: {format_bytes(mem.process)}"
            if mem.managed_spilled:
                heading += f" + {format_bytes(mem.managed_spilled)} spilled to disk"
            self.root.title.text = heading

            update(self.source, result)
class WorkersMemory(DashboardComponent):
    """Memory usage for single workers

    Each worker gets one row holding up to four stacked segments: managed in
    memory, unmanaged (old), unmanaged (recent) and spilled to disk.
    Zero-width segments are dropped before the data source is updated.
    """

    def __init__(self, scheduler, width=600, **kwargs):
        """Create the figure at half of ``width``; ``kwargs`` go to ``figure``."""
        with log_errors():
            self.scheduler = scheduler
            columns = (
                "width",
                "x",
                "y",
                "color",
                "alpha",
                "worker",
                "escaped_worker",
                "proc_memory",
                "managed",
                "unmanaged_old",
                "unmanaged_recent",
                "spilled",
            )
            self.source = ColumnDataSource({col: [] for col in columns})

            plot = figure(
                title="Bytes stored per worker",
                tools="",
                id="bk-workers-memory-plot",
                width=int(width / 2),
                name="workers_memory",
                min_border_bottom=50,
                **kwargs,
            )
            self.root = plot

            bars = plot.rect(
                source=self.source,
                x="x",
                y="y",
                width="width",
                height=0.9,
                color="color",
                fill_alpha="alpha",
                line_width=0,
            )
            bars.nonselection_glyph = None

            plot.axis[0].ticker = BasicTicker(**TICKS_1024)
            plot.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            plot.xaxis.major_label_orientation = XLABEL_ORIENTATION
            plot.xaxis.minor_tick_line_alpha = 0
            plot.x_range = Range1d(start=0)
            plot.yaxis.visible = False
            plot.ygrid.visible = False

            plot.add_tools(
                TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))
            )

            plot.toolbar_location = None
            plot.yaxis.visible = False

            tooltip_markup = """
Worker:
@worker
Process memory (RSS):
@proc_memory{0.00 b}
Managed:
@managed{0.00 b}
Unmanaged (old):
@unmanaged_old{0.00 b}
Unmanaged (recent):
@unmanaged_recent{0.00 b}
Spilled to disk:
@spilled{0.00 b}
"""
            plot.add_tools(
                HoverTool(point_policy="follow_mouse", tooltips=tooltip_markup)
            )

    @without_property_validation
    def update(self):
        """Rebuild all segments from the current per-worker memory readings."""

        def repeat4(values) -> list:
            # every worker contributes four rows, one per segment
            return [v for v in values for _ in range(4)]

        with log_errors():
            workers = self.scheduler.workers.values()

            widths = []
            centers = []
            colors = []
            x_end = 0

            process_col = []
            managed_col = []
            spilled_col = []
            old_col = []
            recent_col = []

            for ws in workers:
                mem = ws.memory
                limit = getattr(ws, "memory_limit", 0)
                x_end = max(x_end, limit, mem.process + mem.managed_spilled)
                worker_color = _memory_color(mem.process, limit)

                segments = [
                    mem.managed_in_memory,
                    mem.unmanaged_old,
                    mem.unmanaged_recent,
                    mem.managed_spilled,
                ]
                widths.extend(segments)
                # center = widths of the preceding segments + half own width
                centers.extend(
                    sum(segments[:pos]) + segments[pos] / 2 for pos in range(4)
                )
                colors.extend([worker_color, worker_color, worker_color, "grey"])

                # memory info
                process_col.append(mem.process)
                managed_col.append(mem.managed_in_memory)
                old_col.append(mem.unmanaged_old)
                recent_col.append(mem.unmanaged_recent)
                spilled_col.append(mem.managed_spilled)

            full = {
                "width": widths,
                "x": centers,
                "color": colors,
                "alpha": [1, 0.7, 0.4, 1] * len(workers),
                "worker": repeat4(ws.address for ws in workers),
                "escaped_worker": repeat4(
                    escape.url_escape(ws.address) for ws in workers
                ),
                "y": repeat4(range(len(workers))),
                "proc_memory": repeat4(process_col),
                "managed": repeat4(managed_col),
                "unmanaged_old": repeat4(old_col),
                "unmanaged_recent": repeat4(recent_col),
                "spilled": repeat4(spilled_col),
            }
            # Remove rectangles with width=0
            result = {}
            for column, values in full.items():
                result[column] = [
                    item for item, seg_width in zip(values, widths) if seg_width
                ]

            self.root.x_range.end = x_end
            update(self.source, result)
class WorkersMemoryHistogram(DashboardComponent):
    """Histogram of memory usage, showing how many workers there are in each bucket of
    usage. Replaces the per-worker graph when there are >= 50 workers.
    """

    def __init__(self, scheduler, **kwargs):
        """Create the figure; ``kwargs`` are forwarded to ``figure``."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler
            # Placeholder bins, overwritten by ``update``
            placeholder = {"left": [1, 2], "right": [10, 10], "top": [0, 0]}
            self.source = ColumnDataSource(placeholder)

            plot = figure(
                title="Bytes stored per worker",
                name="workers_memory",
                id="bk-workers-memory-histogram-plot",
                y_axis_label="frequency",
                tools="",
                **kwargs,
            )
            self.root = plot

            plot.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            plot.xaxis.ticker = AdaptiveTicker(**TICKS_1024)
            plot.xaxis.major_label_orientation = XLABEL_ORIENTATION

            plot.xaxis.minor_tick_line_alpha = 0
            plot.ygrid.visible = False

            plot.toolbar_location = None

            quad_style = dict(bottom=0, color="deepskyblue", fill_alpha=0.5)
            plot.quad(
                source=self.source, left="left", right="right", top="top", **quad_style
            )

    @without_property_validation
    def update(self):
        """Re-bin each worker's ``metrics["memory"]`` into 40 buckets."""
        readings = [ws.metrics["memory"] for ws in self.scheduler.workers.values()]
        counts, edges = np.histogram(np.asarray(readings), bins=40)
        update(self.source, {"left": edges[:-1], "right": edges[1:], "top": counts})
class BandwidthTypes(DashboardComponent):
    """Bar chart showing bandwidth per type"""

    def __init__(self, scheduler, **kwargs):
        """Create the figure; ``kwargs`` are forwarded to ``figure``."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler
            # Placeholder rows, replaced by ``update``
            placeholder = {
                "bandwidth": [1, 2],
                "bandwidth-half": [0.5, 1],
                "type": ["a", "b"],
                "bandwidth_text": ["1", "2"],
            }
            self.source = ColumnDataSource(placeholder)

            plot = figure(
                title="Bandwidth by Type",
                tools="",
                id="bk-bandwidth-type-plot",
                name="bandwidth_type_histogram",
                y_range=["a", "b"],
                **kwargs,
            )
            self.root = plot
            plot.xaxis.major_label_orientation = -0.5

            # bars are addressed by center, hence the "half" column
            bars = plot.rect(
                source=self.source,
                x="bandwidth-half",
                y="type",
                width="bandwidth",
                height=0.9,
                color="blue",
            )
            plot.x_range.start = 0
            plot.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            plot.xaxis.ticker = AdaptiveTicker(**TICKS_1024)
            bars.nonselection_glyph = None

            plot.xaxis.minor_tick_line_alpha = 0
            plot.ygrid.visible = False

            plot.toolbar_location = None

            tooltip = HoverTool()
            tooltip.tooltips = "@type: @bandwidth_text / s"
            tooltip.point_policy = "follow_mouse"
            plot.add_tools(tooltip)

    @without_property_validation
    def update(self):
        """Refresh the bars from ``scheduler.bandwidth_types``."""
        with log_errors():
            by_type = self.scheduler.bandwidth_types
            self.root.y_range.factors = sorted(by_type)

            rates = list(by_type.values())
            result = {
                "bandwidth": rates,
                "bandwidth-half": [rate / 2 for rate in by_type.values()],
                "type": list(by_type.keys()),
                "bandwidth_text": [format_bytes(rate) for rate in by_type.values()],
            }
            overall = format_bytes(self.scheduler.bandwidth)
            self.root.title.text = "Bandwidth: " + overall

            update(self.source, result)
class BandwidthWorkers(DashboardComponent):
    """Heat map of measured bandwidth between pairs of workers

    The x axis lists source workers and the y axis destination workers; each
    cell is colored by the bandwidth recorded in
    ``scheduler.bandwidth_workers`` for that (source, destination) pair.
    """

    def __init__(self, scheduler, **kwargs):
        """Create the figure; ``kwargs`` are forwarded to ``figure``."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler
            # Placeholder rows, replaced by the first non-empty ``update``
            self.source = ColumnDataSource(
                {
                    "bandwidth": [1, 2],
                    "source": ["a", "b"],
                    "destination": ["a", "b"],
                    "bandwidth_text": ["1", "2"],
                }
            )

            # Two-digit hex channel values from "ff" down to "40"; used for
            # both red and green with blue fixed at "FF".
            values = [hex(x)[2:] for x in range(64, 256)][::-1]
            mapper = linear_cmap(
                field_name="bandwidth",
                palette=["#" + x + x + "FF" for x in values],
                low=0,
                high=1,
            )

            self.root = figure(
                title="Bandwidth by Worker",
                tools="",
                id="bk-bandwidth-worker-plot",
                name="bandwidth_worker_heatmap",
                x_range=["a", "b"],
                y_range=["a", "b"],
                **kwargs,
            )
            self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
            self.root.rect(
                source=self.source,
                x="source",
                y="destination",
                color=mapper,
                height=1,
                width=1,
            )

            # Kept so ``update`` can rescale the upper bound of the color map
            self.color_map = mapper["transform"]
            color_bar = ColorBar(
                color_mapper=self.color_map,
                label_standoff=12,
                border_line_color=None,
                location=(0, 0),
            )
            color_bar.formatter = NumeralTickFormatter(format="0.0 b")
            color_bar.ticker = AdaptiveTicker(**TICKS_1024)
            self.root.add_layout(color_bar, "right")

            self.root.toolbar_location = None

            hover = HoverTool()
            hover.tooltips = """
Source: @source
Destination: @destination
Bandwidth: @bandwidth_text / s
"""
            hover.point_policy = "follow_mouse"
            self.root.add_tools(hover)

    @without_property_validation
    def update(self):
        """Refresh the heat map; does nothing while no bandwidth is recorded."""
        with log_errors():
            bw = self.scheduler.bandwidth_workers
            if not bw:
                return

            def name(address):
                # Prefer the worker's name; fall back to the raw address for
                # unknown workers and for workers without a name
                try:
                    ws = self.scheduler.workers[address]
                except KeyError:
                    return address
                if ws.name is not None:
                    return str(ws.name)
                return address

            # ``bandwidths`` rather than ``value`` so the ``value`` imported
            # from bokeh.models is not shadowed
            x, y, bandwidths = zip(
                *((name(a), name(b), c) for (a, b), c in bw.items())
            )

            self.color_map.high = max(bandwidths)

            factors = sorted(set(x + y))
            self.root.x_range.factors = factors
            self.root.y_range.factors = factors[::-1]

            result = {
                "source": x,
                "destination": y,
                "bandwidth": bandwidths,
                # plain comprehension: does not depend on ``map`` being the
                # curried variant imported at module level
                "bandwidth_text": [format_bytes(b) for b in bandwidths],
            }
            self.root.title.text = "Bandwidth: " + format_bytes(
                self.scheduler.bandwidth
            )

            update(self.source, result)
class WorkerNetworkBandwidth(DashboardComponent):
    """Worker network bandwidth chart

    Plots horizontal bars with the read_bytes and write_bytes worker state
    """

    def __init__(self, scheduler, **kwargs):
        """Create the network and disk figures; ``kwargs`` go to both."""
        with log_errors():
            self.scheduler = scheduler
            column_names = (
                "y_read",
                "y_write",
                "x_read",
                "x_write",
                "x_read_disk",
                "x_write_disk",
            )
            self.source = ColumnDataSource({col: [] for col in column_names})

            def populate(fig, read_field, write_field):
                # One red "read" bar and one blue "write" bar per worker,
                # followed by the byte-axis styling shared by both figures
                for y_field, x_field, fill, label in (
                    ("y_read", read_field, "red", "read"),
                    ("y_write", write_field, "blue", "write"),
                ):
                    fig.hbar(
                        y=y_field,
                        right=x_field,
                        line_color=None,
                        left=0,
                        height=0.5,
                        fill_color=fill,
                        legend_label=label,
                        source=self.source,
                    )

                fig.axis[0].ticker = BasicTicker(**TICKS_1024)
                fig.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
                fig.xaxis.major_label_orientation = XLABEL_ORIENTATION
                fig.xaxis.minor_tick_line_alpha = 0
                fig.x_range = Range1d(start=0)
                fig.yaxis.visible = False
                fig.ygrid.visible = False
                fig.toolbar_location = None

            self.bandwidth = figure(
                title="Worker Network Bandwidth",
                tools="",
                id="bk-worker-net-bandwidth",
                name="worker_network_bandwidth",
                **kwargs,
            )
            populate(self.bandwidth, "x_read", "x_write")

            self.disk = figure(
                title="Workers Disk",
                tools="",
                id="bk-workers-disk",
                name="worker_disk",
                **kwargs,
            )
            populate(self.disk, "x_read_disk", "x_write_disk")

    @without_property_validation
    def update(self):
        """Refresh bar lengths and both x ranges from worker metrics."""
        with log_errors():
            workers = self.scheduler.workers.values()
            count = len(workers)

            gap = 0.1
            y_read = [idx + 0.75 + idx * gap for idx in range(count)]
            y_write = [idx + 0.25 + idx * gap for idx in range(count)]

            x_read = [ws.metrics["read_bytes"] for ws in workers]
            x_write = [ws.metrics["write_bytes"] for ws in workers]
            x_read_disk = [ws.metrics["read_bytes_disk"] for ws in workers]
            x_write_disk = [ws.metrics["write_bytes_disk"] for ws in workers]

            floor = 100_000_000
            if not self.scheduler.workers:
                self.bandwidth.x_range.end = floor
                self.disk.x_range.end = floor
            else:
                # never below the floor; otherwise shrink by at most 5% per call
                self.bandwidth.x_range.end = max(
                    max(x_read),
                    max(x_write),
                    floor,
                    0.95 * self.bandwidth.x_range.end,
                )
                self.disk.x_range.end = max(
                    max(x_read_disk),
                    max(x_write_disk),
                    floor,
                    0.95 * self.disk.x_range.end,
                )

            update(
                self.source,
                {
                    "y_read": y_read,
                    "y_write": y_write,
                    "x_read": x_read,
                    "x_write": x_write,
                    "x_read_disk": x_read_disk,
                    "x_write_disk": x_write_disk,
                },
            )
class SystemTimeseries(DashboardComponent):
    """Timeseries for worker network bandwidth, cpu, memory and disk.

    bandwidth: plots the average of read_bytes and write_bytes for the workers
    as a function of time.
    cpu: plots the average of cpu for the workers as a function of time.
    memory: plots the average of memory for the workers as a function of time.
    disk: plots the average of read_bytes_disk and write_bytes_disk for the workers
    as a function of time.

    The metrics plotted come from the aggregation of
    ws.metrics["val"] for ws in scheduler.workers.values() divided by the
    number of workers.
    """

    def __init__(self, scheduler, **kwargs):
        """Create the four figures; ``kwargs`` are forwarded to each ``figure``."""
        with log_errors():
            self.scheduler = scheduler
            self.source = ColumnDataSource(
                {
                    "time": [],
                    "read_bytes": [],
                    "write_bytes": [],
                    "cpu": [],
                    "memory": [],
                    "read_bytes_disk": [],
                    "write_bytes_disk": [],
                }
            )

            # Seed the source with one sample
            update(self.source, self.get_data())

            # A single range object shared by all four figures
            x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)
            tools = "reset, xpan, xwheel_zoom"

            self.bandwidth = figure(
                title="Workers Network Bandwidth",
                x_axis_type="datetime",
                tools=tools,
                x_range=x_range,
                id="bk-worker-network-bandwidth-ts",
                name="worker_network_bandwidth-timeseries",
                **kwargs,
            )

            self.bandwidth.line(
                source=self.source,
                x="time",
                y="read_bytes",
                color="red",
                legend_label="read (mean)",
            )
            self.bandwidth.line(
                source=self.source,
                x="time",
                y="write_bytes",
                color="blue",
                legend_label="write (mean)",
            )

            self.bandwidth.legend.location = "top_left"
            self.bandwidth.yaxis.axis_label = "bytes / second"
            self.bandwidth.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
            self.bandwidth.y_range.start = 0
            self.bandwidth.yaxis.minor_tick_line_alpha = 0
            self.bandwidth.xgrid.visible = False

            self.cpu = figure(
                title="Workers CPU",
                x_axis_type="datetime",
                tools=tools,
                x_range=x_range,
                id="bk-worker-cpu-ts",
                name="worker_cpu-timeseries",
                **kwargs,
            )

            self.cpu.line(
                source=self.source,
                x="time",
                y="cpu",
            )
            self.cpu.yaxis.axis_label = "Utilization"
            self.cpu.y_range.start = 0
            self.cpu.yaxis.minor_tick_line_alpha = 0
            self.cpu.xgrid.visible = False

            self.memory = figure(
                title="Workers Memory",
                x_axis_type="datetime",
                tools=tools,
                x_range=x_range,
                id="bk-worker-memory-ts",
                name="worker_memory-timeseries",
                **kwargs,
            )

            self.memory.line(
                source=self.source,
                x="time",
                y="memory",
            )
            self.memory.yaxis.axis_label = "Bytes"
            self.memory.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
            self.memory.y_range.start = 0
            self.memory.yaxis.minor_tick_line_alpha = 0
            self.memory.xgrid.visible = False

            self.disk = figure(
                title="Workers Disk",
                x_axis_type="datetime",
                tools=tools,
                x_range=x_range,
                id="bk-worker-disk-ts",
                name="worker_disk-timeseries",
                **kwargs,
            )

            self.disk.line(
                source=self.source,
                x="time",
                y="read_bytes_disk",
                color="red",
                legend_label="read (mean)",
            )
            self.disk.line(
                source=self.source,
                x="time",
                y="write_bytes_disk",
                color="blue",
                legend_label="write (mean)",
            )

            self.disk.legend.location = "top_left"
            self.disk.yaxis.axis_label = "bytes / second"
            self.disk.yaxis[0].formatter = NumeralTickFormatter(format="0.0b")
            self.disk.y_range.start = 0
            self.disk.yaxis.minor_tick_line_alpha = 0
            self.disk.xgrid.visible = False

    def get_data(self):
        """Return one sample of per-worker averages.

        Returns
        -------
        dict
            Maps each column of ``self.source`` to a one-element list.
            ``"time"`` is the mean of ``ws.metrics["time"]`` multiplied by
            1000; every other entry is the mean of the matching worker
            metric.  All values are 0 when there are no workers.
        """
        workers = self.scheduler.workers.values()

        read_bytes = 0
        write_bytes = 0
        cpu = 0
        memory = 0
        read_bytes_disk = 0
        write_bytes_disk = 0
        # Named ``timestamp`` so the module-level ``time`` function imported
        # from distributed.metrics is not shadowed
        timestamp = 0

        for ws in workers:
            read_bytes += ws.metrics["read_bytes"]
            write_bytes += ws.metrics["write_bytes"]
            cpu += ws.metrics["cpu"]
            memory += ws.metrics["memory"]
            read_bytes_disk += ws.metrics["read_bytes_disk"]
            write_bytes_disk += ws.metrics["write_bytes_disk"]
            timestamp += ws.metrics["time"]

        # use `or` to avoid ZeroDivision when no workers
        divisor = len(workers) or 1

        result = {
            "time": [timestamp / divisor * 1000],
            "read_bytes": [read_bytes / divisor],
            "write_bytes": [write_bytes / divisor],
            "cpu": [cpu / divisor],
            "memory": [memory / divisor],
            "read_bytes_disk": [read_bytes_disk / divisor],
            "write_bytes_disk": [write_bytes_disk / divisor],
        }
        return result

    @without_property_validation
    def update(self):
        """Stream one new sample and rescale the cpu and memory y ranges."""
        with log_errors():
            # keep at most the latest 1000 samples
            self.source.stream(self.get_data(), 1000)

            if self.scheduler.workers:
                y_end_cpu = sum(
                    ws.nthreads or 1 for ws in self.scheduler.workers.values()
                ) / len(self.scheduler.workers.values())
                y_end_mem = sum(
                    ws.memory_limit for ws in self.scheduler.workers.values()
                ) / len(self.scheduler.workers.values())
            else:
                y_end_cpu = 1
                y_end_mem = 100_000_000

            self.cpu.y_range.end = y_end_cpu * 100
            self.memory.y_range.end = y_end_mem
class ComputePerKey(DashboardComponent):
    """Bar chart and pie chart of compute time per task key prefix"""

    def __init__(self, scheduler, **kwargs):
        """Create both charts in a ``Tabs`` root; ``kwargs`` go to each figure."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler

            if TaskStreamPlugin.name not in self.scheduler.plugins:
                self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler))

            # Placeholder rows, replaced by ``update``
            self.compute_source = ColumnDataSource(
                data={
                    "times": [0.2, 0.1],
                    "formatted_time": ["0.2 ms", "2.8 us"],
                    "angles": [3.14, 0.785],
                    "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]],
                    "names": ["sum", "sum_partial"],
                }
            )

            # ---- bar chart -------------------------------------------------
            bar_fig = figure(
                title="Compute Time Per Task",
                tools="",
                id="bk-Compute-by-key-plot",
                name="compute_time_per_key",
                x_range=["a", "b"],
                **kwargs,
            )

            bars = bar_fig.vbar(
                source=self.compute_source,
                x="names",
                top="times",
                width=0.7,
                color="color",
            )

            bar_fig.y_range.start = 0
            bar_fig.yaxis.axis_label = "Time (s)"
            bar_fig.yaxis[0].formatter = NumeralTickFormatter(format="0")
            bar_fig.yaxis.ticker = AdaptiveTicker(**TICKS_1024)
            bar_fig.xaxis.major_label_orientation = XLABEL_ORIENTATION
            bars.nonselection_glyph = None

            bar_fig.xaxis.minor_tick_line_alpha = 0
            bar_fig.xgrid.visible = False

            bar_fig.toolbar_location = None

            bar_hover = HoverTool()
            bar_hover.tooltips = """
Name: @names
Time: @formatted_time
"""
            bar_hover.point_policy = "follow_mouse"
            bar_fig.add_tools(bar_hover)

            bar_note = Title(
                text="Note: tasks less than 2% of max are not displayed",
                text_font_style="italic",
            )
            bar_fig.add_layout(bar_note, "below")

            self.fig = bar_fig
            bar_tab = Panel(child=bar_fig, title="Bar Chart")

            # ---- pie chart -------------------------------------------------
            pie_fig = figure(
                title="Compute Time Per Task",
                tools="",
                id="bk-Compute-by-key-pie",
                name="compute_time_per_key-pie",
                x_range=(-0.5, 1.0),
                **kwargs,
            )

            pie_fig.wedge(
                x=0,
                y=1,
                radius=0.4,
                start_angle=cumsum("angles", include_zero=True),
                end_angle=cumsum("angles"),
                line_color="white",
                fill_color="color",
                legend_field="names",
                source=self.compute_source,
            )

            pie_fig.axis.axis_label = None
            pie_fig.axis.visible = False
            pie_fig.grid.grid_line_color = None
            pie_note = Title(
                text="Note: tasks less than 2% of max are not displayed",
                text_font_style="italic",
            )
            pie_fig.add_layout(pie_note, "below")

            pie_hover = HoverTool()
            pie_hover.tooltips = """
Name: @names
Time: @formatted_time
"""
            pie_hover.point_policy = "follow_mouse"
            pie_fig.add_tools(pie_hover)
            self.wedge_fig = pie_fig
            pie_tab = Panel(child=pie_fig, title="Pie Chart")

            self.root = Tabs(tabs=[bar_tab, pie_tab])

    @without_property_validation
    def update(self):
        """Recompute per-prefix compute totals and refresh both charts."""
        with log_errors():
            totals = defaultdict(float)

            for prefix_key, prefix in self.scheduler.task_prefixes.items():
                label = key_split(prefix_key)
                for action, seconds in prefix.all_durations.items():
                    if action != "compute":
                        continue
                    totals[label] += seconds

            # order by largest time first
            ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)

            # keep only time which are 2% of max or greater
            if ranked:
                cutoff = ranked[0][1] * 0.02
                ranked = [(label, secs) for label, secs in ranked if secs > cutoff]

            names = [label for label, _ in ranked]
            colors = [ts_color_of(label) for label in names]
            durations = [secs for _, secs in ranked]

            grand_total = 0
            for secs in durations:
                grand_total += secs

            # each wedge's share of the full circle
            angles = [secs / grand_total * 2 * math.pi for secs in durations]

            self.fig.x_range.factors = names

            update(
                self.compute_source,
                dict(
                    angles=angles,
                    times=durations,
                    color=colors,
                    names=names,
                    formatted_time=[format_time(secs) for secs in durations],
                ),
            )
class AggregateAction(DashboardComponent):
    """Bar chart showing total time spent per action, summed over key prefixes"""

    def __init__(self, scheduler, **kwargs):
        """Create the figure; ``kwargs`` are forwarded to ``figure``."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler

            if TaskStreamPlugin.name not in self.scheduler.plugins:
                self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler))

            # Placeholder rows, replaced by ``update``
            self.action_source = ColumnDataSource(
                data={
                    "times": [0.2, 0.1],
                    "formatted_time": ["0.2 ms", "2.8 us"],
                    "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]],
                    "names": ["transfer", "compute"],
                }
            )

            plot = figure(
                title="Aggregate Per Action",
                tools="",
                id="bk-aggregate-per-action-plot",
                name="aggregate_per_action",
                x_range=["a", "b"],
                **kwargs,
            )
            self.root = plot

            bars = plot.vbar(
                source=self.action_source,
                x="names",
                top="times",
                width=0.7,
                color="color",
            )

            plot.y_range.start = 0
            plot.yaxis[0].formatter = NumeralTickFormatter(format="0")
            plot.yaxis.axis_label = "Time (s)"
            plot.yaxis.ticker = AdaptiveTicker(**TICKS_1024)
            plot.xaxis.major_label_orientation = XLABEL_ORIENTATION
            plot.xaxis.major_label_text_font_size = "16px"
            bars.nonselection_glyph = None

            plot.xaxis.minor_tick_line_alpha = 0
            plot.xgrid.visible = False

            plot.toolbar_location = None

            tooltip = HoverTool()
            tooltip.tooltips = """
Name: @names
Time: @formatted_time
"""
            tooltip.point_policy = "follow_mouse"
            plot.add_tools(tooltip)

    @without_property_validation
    def update(self):
        """Recompute per-action totals and refresh the bars."""
        with log_errors():
            totals = defaultdict(float)
            for prefix in self.scheduler.task_prefixes.values():
                for action, seconds in prefix.all_durations.items():
                    totals[action] += seconds

            # order by largest time first
            ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)

            names = []
            colors = []
            durations = []
            for action, seconds in ranked:
                names.append(action)
                # "compute" gets a fixed color; every other action is looked
                # up in the task-stream color table
                colors.append(
                    "purple" if action == "compute" else ts_color_lookup[action]
                )
                durations.append(seconds)

            self.root.x_range.factors = names
            self.root.title.text = "Aggregate Time Per Action"

            update(
                self.action_source,
                dict(
                    times=durations,
                    color=colors,
                    names=names,
                    formatted_time=[format_time(seconds) for seconds in durations],
                ),
            )
class MemoryByKey(DashboardComponent):
    """Bar chart showing memory use by key prefix"""

    def __init__(self, scheduler, **kwargs):
        """Create the figure; ``kwargs`` are forwarded to ``figure``."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler
            placeholder_nbytes = [100, 1000]
            # Placeholder rows, replaced by ``update``.  ``nbytes_text`` is
            # included so that every column referenced by the hover tooltip
            # exists from construction, not only after the first ``update``.
            self.source = ColumnDataSource(
                {
                    "name": ["a", "b"],
                    "nbytes": placeholder_nbytes,
                    "nbytes_text": [format_bytes(n) for n in placeholder_nbytes],
                    "count": [1, 2],
                    "color": ["blue", "blue"],
                }
            )

            self.root = figure(
                title="Memory Use",
                tools="",
                id="bk-memory-by-key-plot",
                name="memory_by_key",
                x_range=["a", "b"],
                **kwargs,
            )
            rect = self.root.vbar(
                source=self.source, x="name", top="nbytes", width=0.9, color="color"
            )
            self.root.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
            self.root.yaxis.ticker = AdaptiveTicker(**TICKS_1024)
            self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
            rect.nonselection_glyph = None

            self.root.xaxis.minor_tick_line_alpha = 0
            self.root.ygrid.visible = False

            self.root.toolbar_location = None

            # The earlier one-line tooltip assignment was dead code: it was
            # overwritten by this template before ever being used.
            hover = HoverTool()
            hover.tooltips = """
Name: @name
Bytes: @nbytes_text
Count: @count objects
"""
            hover.point_policy = "follow_mouse"
            self.root.add_tools(hover)

    @without_property_validation
    def update(self):
        """Recount objects and bytes per key prefix across all workers."""
        with log_errors():
            counts = defaultdict(int)
            nbytes = defaultdict(int)
            for ws in self.scheduler.workers.values():
                for ts in ws.has_what:
                    ks = key_split(ts.key)
                    counts[ks] += 1
                    nbytes[ks] += ts.nbytes

            names = sorted(counts)
            self.root.x_range.factors = names
            result = {
                "name": names,
                "count": [counts[name] for name in names],
                "nbytes": [nbytes[name] for name in names],
                "nbytes_text": [format_bytes(nbytes[name]) for name in names],
                "color": [color_of(name) for name in names],
            }
            self.root.title.text = "Total Use: " + format_bytes(sum(nbytes.values()))

            update(self.source, result)
class CurrentLoad(DashboardComponent):
    """Tasks and CPU usage on each worker

    Exposes two figures, ``processing_figure`` and ``cpu_figure``, that share
    one data source with a row per worker.
    """

    def __init__(self, scheduler, width=600, **kwargs):
        """Create both figures at half of ``width``; ``kwargs`` go to each."""
        with log_errors():
            self.last = 0
            self.scheduler = scheduler
            column_names = (
                "nprocessing",
                "nprocessing-half",
                "nprocessing-color",
                "cpu",
                "cpu-half",
                "y",
                "worker",
                "escaped_worker",
            )
            self.source = ColumnDataSource({col: [] for col in column_names})
            half_width = int(width / 2)

            processing = figure(
                title="Tasks Processing",
                tools="",
                id="bk-nprocessing-plot",
                name="processing",
                width=half_width,
                min_border_bottom=50,
                **kwargs,
            )
            # bars are addressed by center, hence the "-half" columns
            task_bars = processing.rect(
                source=self.source,
                x="nprocessing-half",
                y="y",
                width="nprocessing",
                height=0.9,
                color="nprocessing-color",
            )
            processing.x_range.start = 0
            task_bars.nonselection_glyph = None

            cpu = figure(
                title="CPU Utilization",
                tools="",
                id="bk-cpu-worker-plot",
                width=half_width,
                name="cpu_hist",
                x_range=(0, 100),
                min_border_bottom=50,
                **kwargs,
            )
            cpu_bars = cpu.rect(
                source=self.source,
                x="cpu-half",
                y="y",
                width="cpu",
                height=0.9,
                color="blue",
            )
            cpu_bars.nonselection_glyph = None

            # Styling and tap-to-open behaviour shared by both figures
            for plot in (processing, cpu):
                plot.xaxis.minor_tick_line_alpha = 0
                plot.yaxis.visible = False
                plot.ygrid.visible = False

                plot.add_tools(
                    TapTool(
                        callback=OpenURL(url="./info/worker/@escaped_worker.html")
                    )
                )

                plot.toolbar_location = None

            task_hover = HoverTool()
            task_hover.tooltips = "@worker : @nprocessing tasks"
            task_hover.point_policy = "follow_mouse"
            processing.add_tools(task_hover)

            cpu_hover = HoverTool()
            cpu_hover.tooltips = "@worker : @cpu %"
            cpu_hover.point_policy = "follow_mouse"
            cpu.add_tools(cpu_hover)

            self.processing_figure = processing
            self.cpu_figure = cpu

    @without_property_validation
    def update(self):
        """Refresh both figures.

        Skipped when no worker is processing anything and less than one
        second has passed since the previous refresh.
        """
        with log_errors():
            workers = self.scheduler.workers.values()
            now = time()
            busy = any(ws.processing for ws in workers)
            if not busy and now < self.last + 1:
                return
            self.last = now

            cpu = [int(ws.metrics["cpu"]) for ws in workers]
            task_counts = [len(ws.processing) for ws in workers]

            def state_color(ws):
                # idle takes precedence over saturated
                if ws in self.scheduler.idle:
                    return "red"
                if ws in self.scheduler.saturated:
                    return "green"
                return "blue"

            result = {
                "cpu": cpu,
                "cpu-half": [pct / 2 for pct in cpu],
                "nprocessing": task_counts,
                "nprocessing-half": [count / 2 for count in task_counts],
                "nprocessing-color": [state_color(ws) for ws in workers],
                "worker": [ws.address for ws in workers],
                "escaped_worker": [escape.url_escape(ws.address) for ws in workers],
                "y": list(range(len(workers))),
            }

            # cpu axis spans 100% per thread of the widest worker
            max_threads = 1
            if self.scheduler.workers:
                max_threads = max(ws.nthreads or 1 for ws in workers)

            self.cpu_figure.x_range.end = max_threads * 100

            update(self.source, result)
class StealingTimeSeries(DashboardComponent):
    """Line chart of the number of idle and saturated workers over time."""

    def __init__(self, scheduler, **kwargs):
        """Build the figure; ``kwargs`` are forwarded to ``figure``."""
        self.scheduler = scheduler

        # Two placeholder points so the data range is defined from the start.
        initial = {
            "time": [time() * 1000, time() * 1000 + 1],
            "idle": [0, 0],
            "saturated": [0, 0],
        }
        self.source = ColumnDataSource(initial)

        following_range = DataRange1d(
            follow="end", follow_interval=20000, range_padding=0
        )
        plot = figure(
            title="Idle and Saturated Workers Over Time",
            x_axis_type="datetime",
            tools="",
            x_range=following_range,
            **kwargs,
        )
        for column, line_color in (("idle", "red"), ("saturated", "green")):
            plot.line(source=self.source, x="time", y=column, color=line_color)
        plot.yaxis.minor_tick_line_color = None
        plot.add_tools(
            ResetTool(), PanTool(dimensions="width"), WheelZoomTool(dimensions="width")
        )
        self.root = plot

    @without_property_validation
    def update(self):
        """Append one sample of the current idle/saturated worker counts."""
        with log_errors():
            sample = {
                "time": [time() * 1000],
                "idle": [len(self.scheduler.idle)],
                "saturated": [len(self.scheduler.saturated)],
            }

            def push():
                # keep at most 10000 samples in the source
                self.source.stream(sample, 10000)

            if PROFILING:
                curdoc().add_next_tick_callback(push)
            else:
                push()
class StealingEvents(DashboardComponent):
    """Scatter plot of work-stealing events taken from the scheduler log.

    Every glyph summarizes the messages of one log entry that share a level:
    its color encodes the level and its size the total duration.
    """

    def __init__(self, scheduler, **kwargs):
        """Build the figure; ``kwargs`` are forwarded to ``figure``."""
        self.scheduler = scheduler
        self.steal = scheduler.extensions["stealing"]
        # Number of "stealing" events already consumed by ``update``.
        self.last = 0
        self.source = ColumnDataSource(
            {
                "time": [time() - 20, time()],
                "level": [0, 15],
                "color": ["white", "white"],
                "duration": [0, 0],
                "radius": [1, 1],
                "cost_factor": [0, 10],
                "count": [1, 1],
            }
        )

        x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)

        self.root = figure(
            title="Stealing Events",
            x_axis_type="datetime",
            tools="",
            x_range=x_range,
            **kwargs,
        )

        self.root.circle(
            source=self.source,
            x="time",
            y="level",
            color="color",
            size="radius",
            alpha=0.5,
        )
        self.root.yaxis.axis_label = "Level"

        hover = HoverTool()
        hover.tooltips = "Level: @level, Duration: @duration, Count: @count, Cost factor: @cost_factor"
        hover.point_policy = "follow_mouse"

        self.root.add_tools(
            hover,
            ResetTool(),
            PanTool(dimensions="width"),
            WheelZoomTool(dimensions="width"),
        )

    def convert(self, msgs):
        """Convert a group of log messages into a single glyph

        Parameters
        ----------
        msgs
            Non-empty sequence of 8-tuples
            ``(time, level, key, duration, sat, occ_sat, idl, occ_idl)``.
            ``update`` passes groups that share the same ``level``.

        Returns
        -------
        dict
            One value per column of ``self.source``. ``time`` and ``level``
            are taken from the last message of the group.
        """
        total_duration = 0
        for msg in msgs:
            timestamp, level, key, duration, sat, occ_sat, idl, occ_idl = msg
            total_duration += duration

        try:
            color = Viridis11[level]
        except (KeyError, IndexError):
            # levels outside the palette fall back to black
            color = "black"

        # Size grows with the square root of the duration, capped at 10.
        radius = math.sqrt(min(total_duration, 10)) * 30 + 2

        d = {
            "time": timestamp * 1000,
            "level": level,
            "count": len(msgs),
            "color": color,
            "duration": total_duration,
            "radius": radius,
            "cost_factor": self.steal.cost_multipliers[level],
        }

        return d

    @without_property_validation
    def update(self):
        """Stream glyphs for the stealing events logged since the last call."""
        with log_errors():
            log = self.scheduler.get_events(topic="stealing")
            # Use the running event counter rather than the length of the
            # retained log: if the log is bounded its length stops growing
            # and new events would never be detected again.
            current = self.scheduler.event_counts["stealing"]
            # Never look further back than what is still retained.
            n = min(current - self.last, len(log))

            # Only list payloads carry stealing messages; newest first.
            log = [log[-i][1] for i in range(1, n + 1) if isinstance(log[-i][1], list)]
            self.last = current

            if log:
                new = pipe(
                    log,
                    map(groupby(1)),  # group each entry's messages by level
                    map(dict.values),
                    concat,
                    map(self.convert),
                    list,
                    transpose,
                )
                if PROFILING:
                    curdoc().add_next_tick_callback(
                        lambda: self.source.stream(new, 10000)
                    )
                else:
                    self.source.stream(new, 10000)
class Events(DashboardComponent):
    """Scatter plot of the events logged on the scheduler under one topic.

    Every distinct ``action`` gets its own y level, assigned in order of
    first appearance, and a color from ``color_of``.
    """

    def __init__(self, scheduler, name, height=150, **kwargs):
        """
        Parameters
        ----------
        scheduler
            Scheduler whose ``events`` and ``event_counts`` are read.
        name
            Event topic to display; also used as the figure title.
        height : int
            Figure height.
        **kwargs
            Forwarded to ``figure``.
        """
        self.scheduler = scheduler
        self.action_ys = {}
        # Value of ``event_counts[name]`` at the end of the last update.
        self.last = 0
        self.name = name
        self.source = ColumnDataSource(
            {"time": [], "action": [], "hover": [], "y": [], "color": []}
        )

        x_range = DataRange1d(follow="end", follow_interval=200000)

        self.root = figure(
            title=name,
            x_axis_type="datetime",
            height=height,
            tools="",
            x_range=x_range,
            **kwargs,
        )

        self.root.circle(
            source=self.source,
            x="time",
            y="y",
            color="color",
            size=50,
            alpha=0.5,
            **{"legend_field" if BOKEH_VERSION >= "1.4" else "legend": "action"},
        )
        self.root.yaxis.axis_label = "Action"
        self.root.legend.location = "top_left"

        hover = HoverTool()
        # The tooltip is rendered as HTML, so the line break between the two
        # fields must be a <br> tag; a raw newline inside a single-quoted
        # literal is a syntax error.
        hover.tooltips = "@action<br>@hover"
        hover.point_policy = "follow_mouse"

        self.root.add_tools(
            hover,
            ResetTool(),
            PanTool(dimensions="width"),
            WheelZoomTool(dimensions="width"),
        )

    @without_property_validation
    def update(self):
        """Stream the events logged since the previous call (newest first)."""
        with log_errors():
            log = self.scheduler.events[self.name]
            count = self.scheduler.event_counts[self.name]
            # Clamp to what is still retained so that a burst larger than
            # the log cannot index past its start.
            n = min(count - self.last, len(log))
            log = [log[-i] for i in range(1, n + 1)]
            self.last = count

            if log:
                actions = []
                times = []
                hovers = []
                ys = []
                colors = []
                for msg_time, msg in log:
                    times.append(msg_time * 1000)
                    action = msg["action"]
                    actions.append(action)
                    # First occurrence of an action claims the next y level.
                    if action not in self.action_ys:
                        self.action_ys[action] = len(self.action_ys)
                    ys.append(self.action_ys[action])
                    colors.append(color_of(action))
                    hovers.append("TODO")

                new = {
                    "time": times,
                    "action": actions,
                    "hover": hovers,
                    "y": ys,
                    "color": colors,
                }

                if PROFILING:
                    curdoc().add_next_tick_callback(
                        lambda: self.source.stream(new, 10000)
                    )
                else:
                    self.source.stream(new, 10000)
class TaskStream(DashboardComponent):
    """Live task stream fed by the scheduler's ``TaskStreamPlugin``.

    New rectangles are pulled from the plugin on every ``update`` and
    streamed into the source created by ``task_stream_figure``.
    """

    def __init__(self, scheduler, n_rectangles=1000, clear_interval="20s", **kwargs):
        """
        Parameters
        ----------
        scheduler
            Scheduler to observe. A ``TaskStreamPlugin`` is registered on it
            if none is present yet.
        n_rectangles : int
            Rollover passed to ``source.stream``; also how far back from the
            plugin's current index the first update starts.
        clear_interval
            Parsed with ``parse_timedelta``. ``update`` only considers
            clearing the plot when more than this has elapsed since the
            last streamed data.
        **kwargs
            Forwarded to ``task_stream_figure``.
        """
        self.scheduler = scheduler
        # Subtracted from every rectangle start before it is streamed.
        self.offset = 0

        if TaskStreamPlugin.name not in self.scheduler.plugins:
            self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler))
        self.plugin = self.scheduler.plugins[TaskStreamPlugin.name]

        # Position in the plugin's buffer up to which data was consumed.
        self.index = max(0, self.plugin.index - n_rectangles)
        # Passed to ``plugin.rectangles`` on every update.
        # NOTE(review): presumably the plugin fills this in place — confirm.
        self.workers = dict()
        self.n_rectangles = n_rectangles
        clear_interval = parse_timedelta(clear_interval, default="ms")
        self.clear_interval = clear_interval
        self.last = 0
        # ``time()`` of the last update that streamed data.
        self.last_seen = 0

        self.source, self.root = task_stream_figure(clear_interval, **kwargs)

        # Required for update callback
        self.task_stream_index = [0]

    @without_property_validation
    def update(self):
        """Stream the rectangles recorded by the plugin since the last call."""
        # Nothing new was recorded since the previous update.
        if self.index == self.plugin.index:
            return
        with log_errors():
            if self.index and len(self.source.data["start"]):
                start = min(self.source.data["start"])
                duration = max(self.source.data["duration"])
                # NOTE(review): the division by 1000 suggests source values
                # are in ms while the plugin expects seconds — confirm.
                boundary = (self.offset + start - duration) / 1000
            else:
                boundary = self.offset
            rectangles = self.plugin.rectangles(
                istart=self.index, workers=self.workers, start_boundary=boundary
            )
            n = len(rectangles["name"])
            self.index = self.plugin.index

            if not rectangles["start"]:
                return

            # If it has been a while since we've updated the plot
            if time() > self.last_seen + self.clear_interval:
                new_start = min(rectangles["start"]) - self.offset
                old_start = min(self.source.data["start"])
                old_end = max(
                    map(
                        operator.add,
                        self.source.data["start"],
                        self.source.data["duration"],
                    )
                )

                # Fraction of the old time window, averaged over the known
                # workers, that is covered by task rectangles.
                density = (
                    sum(self.source.data["duration"])
                    / len(self.workers)
                    / (old_end - old_start)
                )

                # If whitespace is more than 3x the old width
                if (new_start - old_end) > (old_end - old_start) * 2 or density < 0.05:
                    self.source.data.update({k: [] for k in rectangles})  # clear
                    self.offset = min(rectangles["start"])  # redefine offset

            # Starts are stored relative to the current offset.
            rectangles["start"] = [x - self.offset for x in rectangles["start"]]
            self.last_seen = time()

            # Convert to numpy for serialization speed
            # NOTE(review): ``np`` is the numpy module and therefore always
            # truthy; the second half of the condition never filters.
            if n >= 10 and np:
                for k, v in rectangles.items():
                    if isinstance(v[0], Number):
                        rectangles[k] = np.array(v)

            if PROFILING:
                curdoc().add_next_tick_callback(
                    lambda: self.source.stream(rectangles, self.n_rectangles)
                )
            else:
                self.source.stream(rectangles, self.n_rectangles)
def task_stream_figure(clear_interval="20s", **kwargs):
    """
    Create the data source and figure of a task stream plot.

    ``clear_interval`` is parsed with ``parse_timedelta`` and only used to
    place the single invisible placeholder rectangle the source starts with.
    kwargs are applied to the bokeh.models.plots.Plot constructor

    Returns the ``(source, figure)`` pair.
    """
    clear_interval = parse_timedelta(clear_interval, default="ms")

    # One fully transparent placeholder row so every column exists.
    placeholder = {
        "start": [time() - clear_interval],
        "duration": [0.1],
        "key": ["start"],
        "name": ["start"],
        "color": ["white"],
        "duration_text": ["100 ms"],
        "worker": ["foo"],
        "y": [0],
        "worker_thread": [1],
        "alpha": [0.0],
    }
    source = ColumnDataSource(data=placeholder)

    root = figure(
        name="task_stream",
        title="Task Stream",
        id="bk-task-stream-plot",
        x_range=DataRange1d(range_padding=0),
        y_range=DataRange1d(range_padding=0),
        toolbar_location="above",
        x_axis_type="datetime",
        y_axis_location=None,
        tools="",
        min_border_bottom=50,
        **kwargs,
    )

    bars = root.rect(
        source=source,
        x="start",
        y="y",
        width="duration",
        height=0.4,
        fill_color="color",
        line_color="color",
        line_alpha=0.6,
        fill_alpha="alpha",
        line_width=3,
    )
    bars.nonselection_glyph = None

    # Hide the y axis decorations and the vertical grid lines.
    y_axis = root.yaxis
    y_axis.major_label_text_alpha = 0
    y_axis.minor_tick_line_alpha = 0
    y_axis.major_tick_line_alpha = 0
    root.xgrid.visible = False

    tools = [
        HoverTool(
            point_policy="follow_mouse",
            tooltips="\n@name:\n@duration_text\n",
        ),
        # Clicking a rectangle opens the profile page for that task name.
        TapTool(callback=OpenURL(url="./profile?key=@name")),
        BoxZoomTool(),
        ResetTool(),
        PanTool(dimensions="width"),
        WheelZoomTool(dimensions="width"),
    ]
    root.add_tools(*tools)

    # ExportTool is None unless enabled in the dask config.
    if ExportTool:
        export = ExportTool()
        export.register_plot(root)
        root.add_tools(export)

    return source, root
class TaskGraph(DashboardComponent):
    """
    A dynamic node-link diagram for the task graph on the scheduler

    Node positions come from a ``GraphLayout`` plugin that this component
    registers on the scheduler.

    See also the GraphLayout diagnostic at
    distributed/diagnostics/graph_layout.py
    """

    def __init__(self, scheduler, **kwargs):
        """Register the layout plugin and build the figure.

        ``kwargs`` are forwarded to ``figure``.
        """
        self.scheduler = scheduler
        self.layout = GraphLayout(scheduler)
        scheduler.add_plugin(self.layout)
        self.invisible_count = 0  # number of invisible nodes

        self.node_source = ColumnDataSource(
            {"x": [], "y": [], "name": [], "state": [], "visible": [], "key": []}
        )
        self.edge_source = ColumnDataSource({"x": [], "y": [], "visible": []})

        # Only rows whose "visible" column equals the string "True" are drawn.
        def only_visible(source):
            return CDSView(
                source=source,
                filters=[GroupFilter(column_name="visible", group="True")],
            )

        node_view = only_visible(self.node_source)
        edge_view = only_visible(self.edge_source)

        node_colors = factor_cmap(
            "state",
            factors=["waiting", "processing", "memory", "released", "erred"],
            palette=["gray", "green", "red", "blue", "black"],
        )

        self.root = figure(title="Task Graph", **kwargs)
        self.subtitle = Title(text=" ", text_font_style="italic")
        self.root.add_layout(self.subtitle, "above")

        self.root.multi_line(
            xs="x",
            ys="y",
            source=self.edge_source,
            line_width=1,
            view=edge_view,
            color="black",
            alpha=0.3,
        )

        legend_kwarg = "legend_field" if BOKEH_VERSION >= "1.4" else "legend"
        nodes = self.root.square(
            x="x",
            y="y",
            size=10,
            color=node_colors,
            source=self.node_source,
            view=node_view,
            **{legend_kwarg: "state"},
        )
        self.root.xgrid.grid_line_color = None
        self.root.ygrid.grid_line_color = None

        hover = HoverTool(
            point_policy="follow_mouse",
            tooltips="@name: @state",
            renderers=[nodes],
        )
        tap = TapTool(callback=OpenURL(url="info/task/@key.html"), renderers=[nodes])
        nodes.nonselection_glyph = None
        self.root.add_tools(hover, tap)

        self.max_items = config.get("distributed.dashboard.graph-max-items", 5000)

    @without_property_validation
    def update(self):
        """Synchronize both data sources with the layout plugin."""
        with log_errors():
            # With too many tasks on the scheduler this component is
            # disabled so that neither scheduler nor client is overloaded.
            # Once the count drops below the threshold the data is filled
            # in again as usual.
            if len(self.scheduler.tasks) > self.max_items:
                self.subtitle.text = "Scheduler has too many tasks to display."
                for container in (self.node_source, self.edge_source):
                    container.data = {col: [] for col in container.column_names}
                return

            # Occasionally rebuild the column data source from scratch to
            # get rid of old (invisible) nodes.
            rebuild = self.invisible_count > len(self.node_source.data["x"]) / 2
            if rebuild:
                self.layout.reset_index()
                self.invisible_count = 0

            # Take over the pending nodes and edges and empty the queues.
            new, self.layout.new = self.layout.new, []
            new_edges = self.layout.new_edges
            self.layout.new_edges = []

            self.add_new_nodes_edges(new, new_edges, update=rebuild)
            self.patch_updates()

            if self.scheduler.tasks:
                self.subtitle.text = " "
            else:
                self.subtitle.text = "Scheduler is empty."

    @without_property_validation
    def add_new_nodes_edges(self, new, new_edges, update=False):
        """Add nodes and edges to the sources.

        ``new`` holds task keys and ``new_edges`` holds ``(a, b)`` key pairs.
        With ``update=True``, or while the node source is still empty, the
        source data is replaced; otherwise the rows are streamed.
        """
        if not (new or update):
            return

        node_key = []
        node_x = []
        node_y = []
        node_state = []
        node_name = []
        edge_x = []
        edge_y = []

        x = self.layout.x
        y = self.layout.y
        tasks = self.scheduler.tasks

        for key in new:
            try:
                task = tasks[key]
            except KeyError:
                # task is no longer known to the scheduler
                continue
            xx, yy = x[key], y[key]
            node_key.append(escape.url_escape(key))
            node_x.append(xx)
            node_y.append(yy)
            node_state.append(task.state)
            node_name.append(task.prefix.name)

        for a, b in new_edges:
            try:
                edge_x.append([x[a], x[b]])
                edge_y.append([y[a], y[b]])
            except KeyError:
                # an endpoint has no layout position
                pass

        node = {
            "x": node_x,
            "y": node_y,
            "state": node_state,
            "name": node_name,
            "key": node_key,
            "visible": ["True"] * len(node_x),
        }
        edge = {"x": edge_x, "y": edge_y, "visible": ["True"] * len(edge_x)}

        if update or not len(self.node_source.data["x"]):
            # see https://github.com/bokeh/bokeh/issues/7523
            self.node_source.data.update(node)
            self.edge_source.data.update(edge)
        else:
            self.node_source.stream(node)
            self.edge_source.stream(edge)

    @without_property_validation
    def patch_updates(self):
        """
        Small updates like color changes or lost nodes from task transitions
        """
        node_count = len(self.node_source.data["x"])
        edge_count = len(self.edge_source.data["x"])

        # Every queue holds (row index, value) pairs; pairs that point past
        # the rows currently in the source are dropped.
        if self.layout.state_updates:
            pending = self.layout.state_updates
            self.layout.state_updates = []
            patches = [(i, c) for i, c in pending if i < node_count]
            self.node_source.patch({"state": patches})

        if self.layout.visible_updates:
            pending = self.layout.visible_updates
            self.layout.visible_updates = []
            patches = [(i, c) for i, c in pending if i < node_count]
            self.node_source.patch({"visible": patches})
            self.invisible_count += len(patches)

        if self.layout.visible_edge_updates:
            pending = self.layout.visible_edge_updates
            self.layout.visible_edge_updates = []
            patches = [(i, c) for i, c in pending if i < edge_count]
            self.edge_source.patch({"visible": patches})

    def __del__(self):
        # Unregister the layout plugin that __init__ added to the scheduler.
        self.scheduler.remove_plugin(name=self.layout.name)
class TaskGroupGraph(DashboardComponent):
    """
    Task Group Graph

    Creates a graph layout for the TaskGroups on the scheduler. Every
    TaskGroup is assigned an (x, y) location derived from its dependencies,
    and the layout is recomputed whenever the set of TaskGroups changes.

    Each task group node encodes information about task progress, memory,
    and output type into glyphs, and has a hover tooltip with more detailed
    information on name, computation time, memory, and task status.
    """

    def __init__(self, scheduler, **kwargs):
        """Build sources, glyphs and tools; ``kwargs`` go to ``figure``."""
        self.scheduler = scheduler

        self.nodes_layout = {}
        self.arrows_layout = {}

        # transition_counter seen by the last update; -1 forces the first.
        self.old_counter = -1

        node_columns = (
            "x",
            "y",
            "w_box",
            "h_box",
            "name",
            "tot_tasks",
            "color",
            "x_start",
            "x_end",
            "y_start",
            "y_end",
            "x_end_progress",
            "mem_alpha",
            "node_line_width",
            "comp_tasks",
            "url_logo",
            "x_logo",
            "y_logo",
            "w_logo",
            "h_logo",
            "in_processing",
            "in_memory",
            "in_released",
            "in_erred",
            "compute_time",
            "memory",
        )
        self.nodes_source = ColumnDataSource({col: [] for col in node_columns})
        self.arrows_source = ColumnDataSource(
            {col: [] for col in ("xs", "ys", "xe", "ye")}
        )

        self.root = figure(title="Task Groups Graph", match_aspect=True, **kwargs)
        self.root.axis.visible = False
        self.subtitle = Title(text=" ", text_font_style="italic")
        self.root.add_layout(self.subtitle, "above")

        # main box of every task group
        boxes = self.root.rect(
            x="x",
            y="y",
            width="w_box",
            height="h_box",
            color="color",
            fill_alpha="mem_alpha",
            line_color="black",
            line_width="node_line_width",
            source=self.nodes_source,
        )

        # task group logo
        self.root.image_url(
            url="url_logo",
            x="x_logo",
            y="y_logo",
            w="w_logo",
            h="h_logo",
            anchor="center",
            source=self.nodes_source,
        )

        # outline of the progress bar
        self.root.quad(
            left="x_start",
            right="x_end",
            bottom="y_start",
            top="y_end",
            color=None,
            line_color="black",
            source=self.nodes_source,
        )

        # filled part of the progress bar
        self.root.quad(
            left="x_start",
            right="x_end_progress",
            bottom="y_start",
            top="y_end",
            color="color",
            line_color=None,
            fill_alpha=0.6,
            source=self.nodes_source,
        )

        self.arrows = Arrow(
            end=VeeHead(size=8),
            line_color="black",
            line_alpha=0.5,
            line_width=1,
            x_start="xs",
            y_start="ys",
            x_end="xe",
            y_end="ye",
            source=self.arrows_source,
        )
        self.root.add_layout(self.arrows)

        self.root.xgrid.grid_line_color = None
        self.root.ygrid.grid_line_color = None
        self.root.x_range.range_padding = 0.5
        self.root.y_range.range_padding = 0.5

        tooltip = (
            "\n"
            "Name:\n"
            "@name\n"
            "Compute time:\n"
            "@compute_time\n"
            "Memory:\n"
            "@memory\n"
            "Tasks:\n"
            "@tot_tasks\n"
            "Completed:\n"
            "@comp_tasks\n"
            "Processing:\n"
            "@in_processing\n"
            "In memory:\n"
            "@in_memory\n"
            "Erred:\n"
            "@in_erred\n"
            "Released:\n"
            "@in_released\n"
        )
        hover = HoverTool(
            point_policy="follow_mouse",
            tooltips=tooltip,
            renderers=[boxes],
        )
        self.root.add_tools(hover)

    @without_property_validation
    def update_layout(self):
        """Compute grid positions for all task groups.

        Returns ``(nodes_layout, arrows_layout)``: the first maps a group
        key to ``{"x": ..., "y": ...}``, the second maps it to the start
        (``"nstart"``) and end (``"nend"``) groups of its incoming arrows.
        """
        with log_errors():
            groups = self.scheduler.task_groups

            # Dependencies per task group. Some task groups list themselves
            # as a dependency; those self-references are removed.
            dependencies = {
                name: {dep.name for dep in tg.dependencies if dep.name != name}
                for name, tg in groups.items()
            }

            order = dask.order.order(
                dsk={group.name: 1 for group in groups.values()},
                dependencies=dependencies,
            )
            ordered = sorted(groups, key=order.get)

            xs = {}
            ys = {}
            locations = set()
            nodes_layout = {}
            arrows_layout = {}
            for tg in ordered:
                deps = dependencies[tg]
                if deps:
                    # one column to the right of the right-most dependency
                    x = max(xs[dep] for dep in deps) + 1
                    y = max(ys[dep] for dep in deps)
                    same_row = len({ys[dep] for dep in deps}) == 1
                    if len(deps) > 1 and same_row:
                        y += 1
                else:
                    x = 0
                    y = max(ys.values()) + 1 if ys else 0

                while (x, y) in locations:  # avoid collisions by moving up
                    y += 1
                locations.add((x, y))

                xs[tg], ys[tg] = x, y

                # position of the node, later copied to the data source
                nodes_layout[tg] = {"x": xs[tg], "y": ys[tg]}

                # one arrow from every dependency to this group
                arrows_layout[tg] = {
                    "nstart": deps,
                    "nend": [tg] * len(deps),
                }

            return nodes_layout, arrows_layout

    def compute_size(self, x, min_box, max_box):
        """Map ``x`` linearly so that ``min_box`` -> 0.4 and ``max_box`` -> 0.8."""
        lower = 0.4
        upper = 0.8
        slope = (upper - lower) / (max_box - min_box)
        return slope * (x - min_box) + lower

    @without_property_validation
    def update(self):
        """Rebuild node and arrow data if any task transitioned since last call."""
        counter = self.scheduler.transition_counter
        if counter == self.old_counter:
            return
        self.old_counter = counter

        groups = self.scheduler.task_groups

        self.subtitle.text = " " if groups else "Scheduler is empty."

        # Recompute the layout only when the set of groups changed.
        if self.nodes_layout.keys() != groups.keys():
            self.nodes_layout, self.arrows_layout = self.update_layout()

        nodes_data = {
            col: []
            for col in (
                "x",
                "y",
                "w_box",
                "h_box",
                "name",
                "color",
                "tot_tasks",
                "x_start",
                "x_end",
                "y_start",
                "y_end",
                "x_end_progress",
                "mem_alpha",
                "node_line_width",
                "comp_tasks",
                "url_logo",
                "x_logo",
                "y_logo",
                "w_logo",
                "h_logo",
                "in_processing",
                "in_memory",
                "in_released",
                "in_erred",
                "compute_time",
                "memory",
            )
        }
        arrows_data = {col: [] for col in ("xs", "ys", "xe", "ye")}

        # Distinct durations / sizes of the groups that report both.
        durations = set()
        nbytes = set()
        for tg in groups.values():
            if tg.duration and tg.nbytes_total:
                durations.add(tg.duration)
                nbytes.add(tg.nbytes_total)

        durations_min = min(durations, default=0)
        durations_max = max(durations, default=0)
        nbytes_min = min(nbytes, default=0)
        nbytes_max = max(nbytes, default=0)

        # First pass: box dimensions. All of them are needed before the
        # arrows can be placed.
        box_dim = {}
        for key, tg in groups.items():
            comp_tasks = (
                tg.states["released"] + tg.states["memory"] + tg.states["erred"]
            )
            tot_tasks = sum(tg.states.values())

            scalable = (
                tg.duration
                and tg.nbytes_total
                and comp_tasks
                and len(durations) > 1
                and len(nbytes) > 1
            )
            if scalable:
                # width scales with duration
                width_box = self.compute_size(
                    tg.duration / comp_tasks * tot_tasks,
                    min_box=durations_min / comp_tasks * tot_tasks,
                    max_box=durations_max / comp_tasks * tot_tasks,
                )
                # height scales with memory
                height_box = self.compute_size(
                    tg.nbytes_total / comp_tasks * tot_tasks,
                    min_box=nbytes_min / comp_tasks * tot_tasks,
                    max_box=nbytes_max / comp_tasks * tot_tasks,
                )
            else:
                width_box = 0.6
                height_box = width_box / 2

            box_dim[key] = {"width": width_box, "height": height_box}

        # Second pass: one row per task group.
        for key, tg in groups.items():
            x = self.nodes_layout[key]["x"]
            y = self.nodes_layout[key]["y"]
            width = box_dim[key]["width"]
            height = box_dim[key]["height"]

            states = tg.states
            comp_tasks = states["released"] + states["memory"] + states["erred"]
            tot_tasks = sum(states.values())

            def share(count):
                # "<count> (<percentage of all tasks in the group> %)"
                return f"{count} ({count / tot_tasks * 100:.0f} %)"

            # main boxes layout
            nodes_data["x"].append(x)
            nodes_data["y"].append(y)
            nodes_data["w_box"].append(width)
            nodes_data["h_box"].append(height)

            nodes_data["name"].append(tg.prefix.name)
            nodes_data["color"].append(color_of(tg.prefix.name))
            nodes_data["tot_tasks"].append(tot_tasks)

            # memory alpha is scaled by 0.4, otherwise it gets too dark
            nodes_data["mem_alpha"].append(states["memory"] / tot_tasks * 0.4)

            # thicker outline while the group has processing tasks
            nodes_data["node_line_width"].append(5 if states["processing"] else 1)

            # progress bar
            left = x - width / 2
            bottom = y - height / 2
            nodes_data["x_start"].append(left)
            nodes_data["x_end"].append(x + width / 2)
            nodes_data["y_start"].append(bottom)
            nodes_data["y_end"].append(bottom + height * 0.4)
            nodes_data["x_end_progress"].append(left + width * comp_tasks / tot_tasks)

            # arrows: from the right edge of every dependency ...
            for k in self.arrows_layout[key]["nstart"]:
                arrows_data["xs"].append(
                    self.nodes_layout[k]["x"] + box_dim[k]["width"] / 2
                )
                arrows_data["ys"].append(self.nodes_layout[k]["y"])
            # ... to the left edge of this group
            for k in self.arrows_layout[key]["nend"]:
                arrows_data["xe"].append(
                    self.nodes_layout[k]["x"] - box_dim[k]["width"] / 2
                )
                arrows_data["ye"].append(self.nodes_layout[k]["y"])

            # logo: only for groups with exactly one output type
            url_logo = ""
            if len(tg.types) == 1:
                logo_type = next(iter(tg.types)).split(".")[0]
                try:
                    url_logo = logos_dict[logo_type]
                except KeyError:
                    url_logo = ""

            nodes_data["url_logo"].append(url_logo)
            nodes_data["x_logo"].append(x + width / 3)
            nodes_data["y_logo"].append(y + height / 3)

            ratio = width / height
            if ratio > 1:
                logo_height = height * 0.3
                logo_width = width * 0.3 / ratio
            else:
                logo_height = height * 0.3 * ratio
                logo_width = width * 0.3
            nodes_data["h_logo"].append(logo_height)
            nodes_data["w_logo"].append(logo_width)

            # compute_time and memory
            nodes_data["compute_time"].append(format_time(tg.duration))
            nodes_data["memory"].append(format_bytes(tg.nbytes_total))

            # per-state counts shown in the hover tooltip
            nodes_data["comp_tasks"].append(share(comp_tasks))
            nodes_data["in_processing"].append(share(states["processing"]))
            nodes_data["in_memory"].append(share(states["memory"]))
            nodes_data["in_released"].append(share(states["released"]))
            nodes_data["in_erred"].append(share(states["erred"]))

        self.nodes_source.data.update(nodes_data)
        self.arrows_source.data.update(arrows_data)
class TaskProgress(DashboardComponent):
    """Progress bars per task type"""

    def __init__(self, scheduler, **kwargs):
        """Build the stacked progress bars; ``kwargs`` go to ``figure``."""
        self.scheduler = scheduler

        empty_state = {
            name: {} for name in ("all", "memory", "erred", "released", "processing")
        }
        self.source = ColumnDataSource(data=progress_quads(empty_state))

        self.root = figure(
            id="bk-task-progress-plot",
            title="Progress",
            name="task_progress",
            x_range=DataRange1d(range_padding=0),
            y_range=Range1d(-8, 0),
            toolbar_location=None,
            tools="",
            min_border_bottom=50,
            **kwargs,
        )
        self.root.line(  # just to define early ranges
            x=[0, 0.9], y=[-1, 0], line_color="#FFFFFF", alpha=0.0
        )

        # Every bar is a stack of quads that share source and vertical extent.
        bar = {"source": self.source, "top": "top", "bottom": "bottom"}

        # background: full width of the bar
        self.root.quad(
            left="left",
            right="right",
            fill_color="#aaaaaa",
            line_color="#aaaaaa",
            fill_alpha=0.1,
            line_alpha=0.3,
            **bar,
        )
        # released
        self.root.quad(
            left="left",
            right="released-loc",
            fill_color="color",
            line_color="color",
            fill_alpha=0.6,
            **bar,
        )
        # in memory
        self.root.quad(
            left="released-loc",
            right="memory-loc",
            fill_color="color",
            line_color="color",
            fill_alpha=1.0,
            **bar,
        )
        # erred
        self.root.quad(
            left="memory-loc",
            right="erred-loc",
            fill_color="black",
            fill_alpha=0.5,
            line_alpha=0,
            **bar,
        )
        # processing
        self.root.quad(
            left="erred-loc",
            right="processing-loc",
            fill_color="gray",
            fill_alpha=0.35,
            line_alpha=0,
            **bar,
        )

        # label on the left, completion text on the right
        self.root.text(
            source=self.source,
            text="show-name",
            y="bottom",
            x="left",
            x_offset=5,
            text_font_size=value("10pt"),
        )
        self.root.text(
            source=self.source,
            text="done",
            y="bottom",
            x="right",
            x_offset=-5,
            text_align="right",
            text_font_size=value("10pt"),
        )

        self.root.ygrid.visible = False
        self.root.yaxis.minor_tick_line_alpha = 0
        self.root.yaxis.visible = False
        self.root.xgrid.visible = False
        self.root.xaxis.minor_tick_line_alpha = 0
        self.root.xaxis.visible = False

        tooltip = (
            "\n"
            "Name:\n"
            "@name\n"
            "All:\n"
            "@all\n"
            "Memory:\n"
            "@memory\n"
            "Erred:\n"
            "@erred\n"
            "Ready:\n"
            "@processing\n"
        )
        self.root.add_tools(HoverTool(point_policy="follow_mouse", tooltips=tooltip))

    @without_property_validation
    def update(self):
        """Recompute the bars and the title from the scheduler's task prefixes."""
        with log_errors():
            tracked = ("memory", "erred", "released", "processing", "waiting")
            state = {name: {} for name in tracked}

            # Only prefixes with at least one task in a tracked state are shown.
            for tp in self.scheduler.task_prefixes.values():
                active_states = tp.active_states
                if any(active_states.get(name) for name in tracked):
                    for name in tracked:
                        state[name][tp.name] = active_states[name]

            state["all"] = {
                prefix: sum(state[name][prefix] for name in tracked)
                for prefix in state["memory"]
            }

            # Nothing to show now and nothing shown before: leave as is.
            if not state["all"] and not len(self.source.data["all"]):
                return

            update(self.source, progress_quads(state))

            totals = {
                name: sum(state[name].values())
                for name in ["all", "memory", "erred", "released", "waiting"]
            }
            # processing is whatever remains after the other states
            accounted = sum(count for name, count in totals.items() if name != "all")
            totals["processing"] = totals["all"] - accounted

            self.root.title.text = (
                "Progress -- total: %(all)s, "
                "in-memory: %(memory)s, processing: %(processing)s, "
                "waiting: %(waiting)s, "
                "erred: %(erred)s" % totals
            )
class WorkerTable(DashboardComponent):
    """Status of the current workers

    The root is a column made of a CPU strip plot, a memory strip plot and
    a table with one row per worker plus a leading "Total" row. A second
    table with additional worker metrics is appended when any were found
    at construction time.
    """

    # Metrics that are never shown in the extra table.
    excluded_names = {
        "executing",
        "in_flight",
        "in_memory",
        "ready",
        "time",
        "spilled_nbytes",
    }

    def __init__(self, scheduler, width=800, **kwargs):
        """
        Parameters
        ----------
        scheduler
            Scheduler whose ``workers`` are displayed.
        width : int
            Width of both tables and both plots.
        **kwargs
            Forwarded to both ``figure`` calls; a ``sizing_mode`` entry is
            also applied to the enclosing column layout.
        """
        self.scheduler = scheduler
        self.names = [
            "name",
            "address",
            "nthreads",
            "cpu",
            "memory",
            "memory_limit",
            "memory_percent",
            "memory_managed",
            "memory_unmanaged_old",
            "memory_unmanaged_recent",
            "memory_spilled",
            "num_fds",
            "read_bytes",
            "write_bytes",
            "cpu_fraction",
        ]
        workers = self.scheduler.workers.values()
        # Scalar metrics reported by the current workers that are neither
        # standard columns nor explicitly excluded.
        self.extra_names = sorted(
            {
                m
                for ws in workers
                for m, v in ws.metrics.items()
                if m not in self.names and isinstance(v, (str, int, float))
            }
            - self.excluded_names
        )

        # The main table shows every standard column except cpu_fraction,
        # which only feeds the CPU plot.
        table_names = [name for name in self.names if name != "cpu_fraction"]

        column_title_renames = {
            "memory_limit": "limit",
            "memory_percent": "memory %",
            "memory_managed": "managed",
            "memory_unmanaged_old": "unmanaged old",
            "memory_unmanaged_recent": "unmanaged recent",
            "memory_spilled": "spilled",
            "num_fds": "# fds",
            "read_bytes": "read",
            "write_bytes": "write",
        }

        self.source = ColumnDataSource({k: [] for k in self.names})

        columns = {
            name: TableColumn(field=name, title=column_title_renames.get(name, name))
            for name in table_names
        }

        formatters = {
            "cpu": NumberFormatter(format="0 %"),
            "memory_percent": NumberFormatter(format="0.0 %"),
            "memory": NumberFormatter(format="0.0 b"),
            "memory_limit": NumberFormatter(format="0.0 b"),
            "memory_managed": NumberFormatter(format="0.0 b"),
            "memory_unmanaged_old": NumberFormatter(format="0.0 b"),
            "memory_unmanaged_recent": NumberFormatter(format="0.0 b"),
            "memory_spilled": NumberFormatter(format="0.0 b"),
            "read_bytes": NumberFormatter(format="0 b"),
            "write_bytes": NumberFormatter(format="0 b"),
            "num_fds": NumberFormatter(format="0"),
            "nthreads": NumberFormatter(format="0"),
        }

        table = DataTable(
            source=self.source,
            columns=[columns[n] for n in table_names],
            reorderable=True,
            sortable=True,
            width=width,
            index_position=None,
        )

        # table.columns is in table_names order, so pair them up directly
        # instead of searching for the index of every name.
        for name, table_column in zip(table_names, table.columns):
            if name in formatters:
                table_column.formatter = formatters[name]

        extra_names = ["name", "address"] + self.extra_names
        extra_columns = {
            name: TableColumn(field=name, title=column_title_renames.get(name, name))
            for name in extra_names
        }

        extra_table = DataTable(
            source=self.source,
            columns=[extra_columns[n] for n in extra_names],
            reorderable=True,
            sortable=True,
            width=width,
            index_position=None,
        )

        hover = HoverTool(
            point_policy="follow_mouse",
            tooltips="\nWorker (@name):\n@memory_percent{0.0 %}\n",
        )

        mem_plot = figure(
            title="Memory Use (%)",
            toolbar_location=None,
            x_range=(0, 1),
            y_range=(-0.1, 0.1),
            height=60,
            width=width,
            tools="",
            min_border_right=0,
            **kwargs,
        )
        mem_plot.circle(
            source=self.source, x="memory_percent", y=0, size=10, fill_alpha=0.5
        )
        mem_plot.ygrid.visible = False
        mem_plot.yaxis.minor_tick_line_alpha = 0
        mem_plot.xaxis.visible = False
        mem_plot.yaxis.visible = False
        mem_plot.add_tools(hover, BoxSelectTool())

        hover = HoverTool(
            point_policy="follow_mouse",
            tooltips="\nWorker (@name):\n@cpu_fraction{0 %}\n",
        )

        cpu_plot = figure(
            title="CPU Use (%)",
            toolbar_location=None,
            x_range=(0, 1),
            y_range=(-0.1, 0.1),
            height=60,
            width=width,
            tools="",
            min_border_right=0,
            **kwargs,
        )
        cpu_plot.circle(
            source=self.source, x="cpu_fraction", y=0, size=10, fill_alpha=0.5
        )
        cpu_plot.ygrid.visible = False
        cpu_plot.yaxis.minor_tick_line_alpha = 0
        cpu_plot.xaxis.visible = False
        cpu_plot.yaxis.visible = False
        cpu_plot.add_tools(hover, BoxSelectTool())
        self.cpu_plot = cpu_plot

        if "sizing_mode" in kwargs:
            sizing_mode = {"sizing_mode": kwargs["sizing_mode"]}
        else:
            sizing_mode = {}

        components = [cpu_plot, mem_plot, table]
        if self.extra_names:
            components.append(extra_table)

        self.root = column(*components, id="bk-worker-table", **sizing_mode)

    @without_property_validation
    def update(self):
        """Refill the source with one row per worker and a leading total row."""
        all_names = self.names + self.extra_names
        workers = self.scheduler.workers

        data = {name: [] for name in all_names}
        ordered = sorted(workers.values(), key=lambda ws: str(ws.name))
        for i, ws in enumerate(ordered):
            minfo = ws.memory

            # Start from the raw metrics, then overwrite the derived columns.
            for name in all_names:
                data[name].append(ws.metrics.get(name, None))
            data["name"][-1] = ws.name if ws.name is not None else i
            data["address"][-1] = ws.address
            if ws.memory_limit:
                data["memory_percent"][-1] = ws.metrics["memory"] / ws.memory_limit
            else:
                data["memory_percent"][-1] = ""
            data["memory_limit"][-1] = ws.memory_limit
            data["memory_managed"][-1] = minfo.managed_in_memory
            data["memory_unmanaged_old"][-1] = minfo.unmanaged_old
            data["memory_unmanaged_recent"][-1] = minfo.unmanaged_recent
            data["memory_spilled"][-1] = minfo.managed_spilled
            data["cpu"][-1] = ws.metrics["cpu"] / 100.0
            data["cpu_fraction"][-1] = ws.metrics["cpu"] / 100.0 / ws.nthreads
            data["nthreads"][-1] = ws.nthreads

        # Prepend the "Total" row.
        worker_states = list(workers.values())
        for name in all_names:
            if name == "name":
                data[name].insert(0, f"Total ({len(data[name])})")
                continue
            try:
                if len(worker_states) == 0:
                    total_data = None
                elif name == "memory_percent":
                    total_mem = sum(ws.memory_limit for ws in worker_states)
                    total_data = (
                        (sum(ws.metrics["memory"] for ws in worker_states) / total_mem)
                        if total_mem
                        else ""
                    )
                elif name == "cpu":
                    # mean CPU over workers
                    total_data = (
                        sum(ws.metrics["cpu"] for ws in worker_states)
                        / 100
                        / len(worker_states)
                    )
                elif name == "cpu_fraction":
                    # CPU relative to the total number of threads
                    total_data = (
                        sum(ws.metrics["cpu"] for ws in worker_states)
                        / 100
                        / sum(ws.nthreads for ws in worker_states)
                    )
                else:
                    total_data = sum(data[name])

                data[name].insert(0, total_data)
            except TypeError:
                # column values that cannot be summed (e.g. strings, None)
                data[name].insert(0, None)

        self.source.data.update(data)
class SchedulerLogs:
    """Dashboard component exposing the scheduler's logs in a ``Div``.

    The log lines returned by ``scheduler.get_logs()`` are joined with
    newlines, rendered through ``Log(...)._repr_html_()`` and stored as the
    text of ``self.root``.
    """

    def __init__(self, scheduler):
        # get_logs() yields (level, line) pairs; only the line text is shown
        lines = [line for _level, line in scheduler.get_logs()]
        html = Log("\n".join(lines))._repr_html_()

        css = {
            "width": "100%",
            "height": "100%",
            "max-width": "1920px",
            "max-height": "1080px",
            "padding": "12px",
            "border": "1px solid lightgray",
            "box-shadow": "inset 1px 0 8px 0 lightgray",
            "overflow": "auto",
        }
        self.root = Div(text=html, style=css)
def systemmonitor_doc(scheduler, extra, doc):
    """Fill ``doc`` with the scheduler system-monitor page.

    Creates a ``SystemMonitor`` for ``scheduler``, registers it as a periodic
    callback on ``doc`` (interval argument 500), adds its root to the
    document and applies the ``simple.html`` template, the ``extra``
    template variables and the dashboard theme.
    """
    with log_errors():
        monitor = SystemMonitor(scheduler, sizing_mode="stretch_both")
        doc.title = "Dask: Scheduler System Monitor"
        add_periodic_callback(doc, monitor, 500)

        doc.add_root(monitor.root)
        simple_template = env.get_template("simple.html")
        doc.template = simple_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
def stealing_doc(scheduler, extra, doc):
    """Fill ``doc`` with the work-stealing page.

    The page shows an ``Occupancy`` plot next to a column holding the
    ``StealingTimeSeries`` and ``StealingEvents`` plots.  The events plot is
    given the time-series plot's ``x_range`` so both use the same range
    object.  All three components are registered as periodic callbacks with
    an interval argument of 500.
    """
    with log_errors():
        occupancy = Occupancy(scheduler)
        time_series = StealingTimeSeries(scheduler)
        steal_events = StealingEvents(scheduler)
        # Reuse the time-series x range for the events figure
        steal_events.root.x_range = time_series.root.x_range

        doc.title = "Dask: Work Stealing"
        for component in (occupancy, time_series, steal_events):
            add_periodic_callback(doc, component, 500)

        stealing_column = column(
            time_series.root,
            steal_events.root,
            sizing_mode="stretch_both",
        )
        layout = row(occupancy.root, stealing_column)
        doc.add_root(layout)

        simple_template = env.get_template("simple.html")
        doc.template = simple_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
def events_doc(scheduler, extra, doc):
    """Fill ``doc`` with the scheduler events page.

    Builds an ``Events`` component for the ``"all"`` topic with height 250,
    updates it once, registers it as a periodic callback (interval argument
    500) and adds it to ``doc`` inside a ``scale_width`` column.
    """
    with log_errors():
        event_plot = Events(scheduler, "all", height=250)
        event_plot.update()
        add_periodic_callback(doc, event_plot, 500)

        doc.title = "Dask: Scheduler Events"
        layout = column(event_plot.root, sizing_mode="scale_width")
        doc.add_root(layout)

        simple_template = env.get_template("simple.html")
        doc.template = simple_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
def workers_doc(scheduler, extra, doc):
    """Fill ``doc`` with the workers table page.

    Builds a ``WorkerTable`` for ``scheduler``, updates it once, registers it
    as a periodic callback (interval argument 500) and adds its root to
    ``doc`` with the ``simple.html`` template and dashboard theme.
    """
    with log_errors():
        worker_table = WorkerTable(scheduler)
        worker_table.update()
        add_periodic_callback(doc, worker_table, 500)

        doc.title = "Dask: Workers"
        doc.add_root(worker_table.root)

        simple_template = env.get_template("simple.html")
        doc.template = simple_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
def tasks_doc(scheduler, extra, doc):
    """Fill ``doc`` with the full-page task stream.

    The number of rectangles is read from the Dask config key
    ``distributed.scheduler.dashboard.tasks.task-stream-length``; the stream
    is created with ``clear_interval="60s"`` and registered as a periodic
    callback with an interval argument of 5000.
    """
    with log_errors():
        stream_length = dask.config.get(
            "distributed.scheduler.dashboard.tasks.task-stream-length"
        )
        stream = TaskStream(
            scheduler,
            n_rectangles=stream_length,
            clear_interval="60s",
            sizing_mode="stretch_both",
        )
        stream.update()
        add_periodic_callback(doc, stream, 5000)

        doc.title = "Dask: Task Stream"
        doc.add_root(stream.root)

        simple_template = env.get_template("simple.html")
        doc.template = simple_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
def graph_doc(scheduler, extra, doc):
    """Fill ``doc`` with the task graph page.

    Builds a ``TaskGraph`` for ``scheduler``, updates it once, registers it
    as a periodic callback (interval argument 200) and adds its root to
    ``doc`` with the ``simple.html`` template and dashboard theme.
    """
    with log_errors():
        task_graph = TaskGraph(scheduler, sizing_mode="stretch_both")
        doc.title = "Dask: Task Graph"

        task_graph.update()
        add_periodic_callback(doc, task_graph, 200)
        doc.add_root(task_graph.root)

        simple_template = env.get_template("simple.html")
        doc.template = simple_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
def tg_graph_doc(scheduler, extra, doc):
    """Fill ``doc`` with the task-groups graph page.

    Builds a ``TaskGroupGraph`` for ``scheduler``, updates it once, registers
    it as a periodic callback (interval argument 200) and adds its root to
    ``doc`` with the ``simple.html`` template and dashboard theme.
    """
    with log_errors():
        group_graph = TaskGroupGraph(scheduler, sizing_mode="stretch_both")
        doc.title = "Dask: Task Groups Graph"

        group_graph.update()
        add_periodic_callback(doc, group_graph, 200)
        doc.add_root(group_graph.root)

        simple_template = env.get_template("simple.html")
        doc.template = simple_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
def status_doc(scheduler, extra, doc):
    """Fill ``doc`` with the main status page.

    Roots are added in this order: cluster memory, per-worker memory, a
    ``Tabs`` widget (Processing / CPU / Occupancy), the task stream and the
    task progress bars.  With more than 100 workers the per-worker memory
    and processing plots are replaced by their histogram variants.  Every
    component is updated once and registered as a periodic callback with an
    interval argument of 100.
    """
    with log_errors():
        cluster_memory = ClusterMemory(scheduler, sizing_mode="stretch_both")
        cluster_memory.update()
        add_periodic_callback(doc, cluster_memory, 100)
        doc.add_root(cluster_memory.root)

        # Per-worker plots for small clusters, histograms for large ones
        if len(scheduler.workers) > 100:
            workers_memory = WorkersMemoryHistogram(
                scheduler, sizing_mode="stretch_both"
            )
            processing = ProcessingHistogram(scheduler, sizing_mode="stretch_both")
            processing_root = processing.root
        else:
            workers_memory = WorkersMemory(scheduler, sizing_mode="stretch_both")
            processing = CurrentLoad(scheduler, sizing_mode="stretch_both")
            processing_root = processing.processing_figure

        current_load = CurrentLoad(scheduler, sizing_mode="stretch_both")
        occupancy = Occupancy(scheduler, sizing_mode="stretch_both")
        cpu_root = current_load.cpu_figure
        occupancy_root = occupancy.root

        live_components = (workers_memory, processing, current_load, occupancy)
        for component in live_components:
            component.update()
        for component in live_components:
            add_periodic_callback(doc, component, 100)

        doc.add_root(workers_memory.root)

        panels = [
            Panel(child=processing_root, title="Processing"),
            Panel(child=cpu_root, title="CPU"),
            Panel(child=occupancy_root, title="Occupancy"),
        ]
        doc.add_root(Tabs(tabs=panels, name="processing_tabs"))

        stream_length = dask.config.get(
            "distributed.scheduler.dashboard.status.task-stream-length"
        )
        task_stream = TaskStream(
            scheduler,
            n_rectangles=stream_length,
            clear_interval="5s",
            sizing_mode="stretch_both",
        )
        task_stream.update()
        add_periodic_callback(doc, task_stream, 100)
        doc.add_root(task_stream.root)

        task_progress = TaskProgress(scheduler, sizing_mode="stretch_both")
        task_progress.update()
        add_periodic_callback(doc, task_progress, 100)
        doc.add_root(task_progress.root)

        doc.title = "Dask: Status"
        doc.theme = BOKEH_THEME
        status_template = env.get_template("status.html")
        doc.template = status_template
        doc.template_variables.update(extra)
@curry
def individual_doc(cls, interval, scheduler, extra, doc, fig_attr="root", **kwargs):
    """Fill ``doc`` with a single stand-alone dashboard component.

    ``cls`` is instantiated with ``scheduler``, ``sizing_mode="stretch_both"``
    and ``**kwargs``, updated once and registered as a periodic callback
    using ``interval``.  The attribute named by ``fig_attr`` is added as the
    document root.  ``extra`` is accepted but not used here.
    """
    with log_errors():
        component = cls(scheduler, sizing_mode="stretch_both", **kwargs)
        component.update()
        add_periodic_callback(doc, component, interval)

        root_model = getattr(component, fig_attr)
        doc.add_root(root_model)
        doc.theme = BOKEH_THEME
def individual_profile_doc(scheduler, extra, doc):
    """Fill ``doc`` with a stand-alone ``ProfileTimePlot``.

    The plot's root is added to ``doc``, an update is triggered and the
    dashboard theme is applied.  ``extra`` is accepted but not used here.
    """
    with log_errors():
        profile_plot = ProfileTimePlot(
            scheduler, sizing_mode="stretch_both", doc=doc
        )
        doc.add_root(profile_plot.root)
        profile_plot.trigger_update()
        doc.theme = BOKEH_THEME
def individual_profile_server_doc(scheduler, extra, doc):
    """Fill ``doc`` with a stand-alone ``ProfileServer`` plot.

    The plot's root is added to ``doc``, an update is triggered and the
    dashboard theme is applied.  ``extra`` is accepted but not used here.
    """
    with log_errors():
        server_profile = ProfileServer(
            scheduler, sizing_mode="stretch_both", doc=doc
        )
        doc.add_root(server_profile.root)
        server_profile.trigger_update()
        doc.theme = BOKEH_THEME
def profile_doc(scheduler, extra, doc):
    """Fill ``doc`` with the templated profile page.

    Adds a ``ProfileTimePlot`` root to ``doc``, applies the ``simple.html``
    template, the ``extra`` template variables and the dashboard theme, and
    then triggers the plot's first update.
    """
    with log_errors():
        doc.title = "Dask: Profile"
        profile_plot = ProfileTimePlot(
            scheduler, sizing_mode="stretch_both", doc=doc
        )
        doc.add_root(profile_plot.root)

        simple_template = env.get_template("simple.html")
        doc.template = simple_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME

        profile_plot.trigger_update()
def profile_server_doc(scheduler, extra, doc):
    """Fill ``doc`` with the templated event-loop profile page.

    Adds a ``ProfileServer`` root to ``doc``, applies the ``simple.html``
    template, the ``extra`` template variables and the dashboard theme, and
    then triggers the plot's first update.
    """
    with log_errors():
        doc.title = "Dask: Profile of Event Loop"
        server_profile = ProfileServer(
            scheduler, sizing_mode="stretch_both", doc=doc
        )
        doc.add_root(server_profile.root)

        simple_template = env.get_template("simple.html")
        doc.template = simple_template
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME

        server_profile.trigger_update()