1import logging 2import math 3import operator 4import os 5from collections import defaultdict 6from numbers import Number 7 8import numpy as np 9from bokeh.core.properties import without_property_validation 10from bokeh.io import curdoc 11from bokeh.layouts import column, row 12from bokeh.models import ( 13 AdaptiveTicker, 14 Arrow, 15 BasicTicker, 16 BoxSelectTool, 17 BoxZoomTool, 18 CDSView, 19 ColorBar, 20 ColumnDataSource, 21 DataRange1d, 22 GroupFilter, 23 HoverTool, 24 NumberFormatter, 25 NumeralTickFormatter, 26 OpenURL, 27 Panel, 28 PanTool, 29 Range1d, 30 ResetTool, 31 Tabs, 32 TapTool, 33 Title, 34 VeeHead, 35 WheelZoomTool, 36 value, 37) 38from bokeh.models.widgets import DataTable, TableColumn 39from bokeh.models.widgets.markups import Div 40from bokeh.palettes import Viridis11 41from bokeh.plotting import figure 42from bokeh.themes import Theme 43from bokeh.transform import cumsum, factor_cmap, linear_cmap 44from tlz import curry, pipe 45from tlz.curried import concat, groupby, map 46from tornado import escape 47 48import dask 49from dask import config 50from dask.utils import format_bytes, format_time, key_split, parse_timedelta 51 52from distributed.dashboard.components import add_periodic_callback 53from distributed.dashboard.components.shared import ( 54 DashboardComponent, 55 ProfileServer, 56 ProfileTimePlot, 57 SystemMonitor, 58) 59from distributed.dashboard.utils import BOKEH_VERSION, PROFILING, transpose, update 60from distributed.diagnostics.graph_layout import GraphLayout 61from distributed.diagnostics.progress_stream import color_of, progress_quads 62from distributed.diagnostics.task_stream import TaskStreamPlugin 63from distributed.diagnostics.task_stream import color_of as ts_color_of 64from distributed.diagnostics.task_stream import colors as ts_color_lookup 65from distributed.metrics import time 66from distributed.utils import Log, log_errors 67 68if dask.config.get("distributed.dashboard.export-tool"): 69 from distributed.dashboard.export_tool import ExportTool 70else: 71 ExportTool = None # type: ignore 72 73logger = logging.getLogger(__name__) 74 75from jinja2 import Environment, FileSystemLoader 76 77env = Environment( 78 loader=FileSystemLoader( 79 os.path.join(os.path.dirname(__file__), "..", "..", "http", "templates") 80 ) 81) 82 83BOKEH_THEME = Theme( 84 filename=os.path.join(os.path.dirname(__file__), "..", "theme.yaml") 85) 86TICKS_1024 = {"base": 1024, "mantissas": [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]} 87XLABEL_ORIENTATION = -math.pi / 9 # slanted downwards 20 degrees 88 89 90logos_dict = { 91 "numpy": "statics/images/numpy.png", 92 "pandas": "statics/images/pandas.png", 93 "builtins": "statics/images/python.png", 94} 95 96 97class Occupancy(DashboardComponent): 98 """Occupancy (in time) per worker""" 99 100 def __init__(self, scheduler, **kwargs): 101 with log_errors(): 102 self.scheduler = scheduler 103 self.source = ColumnDataSource( 104 { 105 "occupancy": [0, 0], 106 "worker": ["a", "b"], 107 "x": [0.0, 0.1], 108 "y": [1, 2], 109 "ms": [1, 2], 110 "color": ["red", "blue"], 111 "escaped_worker": ["a", "b"], 112 } 113 ) 114 115 self.root = figure( 116 title="Occupancy", 117 tools="", 118 toolbar_location="above", 119 id="bk-occupancy-plot", 120 x_axis_type="datetime", 121 min_border_bottom=50, 122 **kwargs, 123 ) 124 rect = self.root.rect( 125 source=self.source, x="x", width="ms", y="y", height=0.9, color="color" 126 ) 127 rect.nonselection_glyph = None 128 129 self.root.xaxis.minor_tick_line_alpha = 0 130 self.root.yaxis.visible = False 131 self.root.ygrid.visible = False 132 # fig.xaxis[0].formatter = NumeralTickFormatter(format='0.0s') 133 self.root.x_range.start = 0 134 135 tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html")) 136 137 hover = HoverTool() 138 hover.tooltips = "@worker : @occupancy s." 139 hover.point_policy = "follow_mouse" 140 self.root.add_tools(hover, tap) 141 142 @without_property_validation 143 def update(self): 144 with log_errors(): 145 workers = self.scheduler.workers.values() 146 147 y = list(range(len(workers))) 148 occupancy = [ws.occupancy for ws in workers] 149 ms = [occ * 1000 for occ in occupancy] 150 x = [occ / 500 for occ in occupancy] 151 total = sum(occupancy) 152 color = [] 153 for ws in workers: 154 if ws in self.scheduler.idle: 155 color.append("red") 156 elif ws in self.scheduler.saturated: 157 color.append("green") 158 else: 159 color.append("blue") 160 161 if total: 162 self.root.title.text = ( 163 f"Occupancy -- total time: {format_time(total)} " 164 f"wall time: {format_time(total / self.scheduler.total_nthreads)}" 165 ) 166 else: 167 self.root.title.text = "Occupancy" 168 169 if occupancy: 170 result = { 171 "occupancy": occupancy, 172 "worker": [ws.address for ws in workers], 173 "ms": ms, 174 "color": color, 175 "escaped_worker": [escape.url_escape(ws.address) for ws in workers], 176 "x": x, 177 "y": y, 178 } 179 180 update(self.source, result) 181 182 183class ProcessingHistogram(DashboardComponent): 184 """How many tasks are on each worker""" 185 186 def __init__(self, scheduler, **kwargs): 187 with log_errors(): 188 self.last = 0 189 self.scheduler = scheduler 190 self.source = ColumnDataSource( 191 {"left": [1, 2], "right": [10, 10], "top": [0, 0]} 192 ) 193 194 self.root = figure( 195 title="Tasks Processing (count)", 196 id="bk-nprocessing-histogram-plot", 197 name="processing", 198 y_axis_label="frequency", 199 tools="", 200 **kwargs, 201 ) 202 203 self.root.xaxis.minor_tick_line_alpha = 0 204 self.root.ygrid.visible = False 205 206 self.root.toolbar_location = None 207 208 self.root.quad( 209 source=self.source, 210 left="left", 211 right="right", 212 bottom=0, 213 top="top", 214 color="deepskyblue", 215 fill_alpha=0.5, 216 ) 217 218 @without_property_validation 219 def update(self): 220 L = [len(ws.processing) for ws in self.scheduler.workers.values()] 221 counts, x = np.histogram(L, bins=40) 222 self.source.data.update({"left": x[:-1], "right": x[1:], "top": counts}) 223 224 225def _memory_color(current: int, limit: int) -> str: 226 """Dynamic color used by WorkersMemory and ClusterMemory""" 227 if limit and current > limit: 228 return "red" 229 if limit and current > limit / 2: 230 return "orange" 231 return "blue" 232 233 234class ClusterMemory(DashboardComponent): 235 """Total memory usage on the cluster""" 236 237 def __init__(self, scheduler, width=600, **kwargs): 238 with log_errors(): 239 self.scheduler = scheduler 240 self.source = ColumnDataSource( 241 { 242 "width": [0] * 4, 243 "x": [0] * 4, 244 "y": [0] * 4, 245 "color": ["blue", "blue", "blue", "grey"], 246 "alpha": [1, 0.7, 0.4, 1], 247 "proc_memory": [0] * 4, 248 "managed": [0] * 4, 249 "unmanaged_old": [0] * 4, 250 "unmanaged_recent": [0] * 4, 251 "spilled": [0] * 4, 252 } 253 ) 254 255 self.root = figure( 256 title="Bytes stored on cluster", 257 tools="", 258 id="bk-cluster-memory-plot", 259 width=int(width / 2), 260 name="cluster_memory", 261 min_border_bottom=50, 262 **kwargs, 263 ) 264 rect = self.root.rect( 265 source=self.source, 266 x="x", 267 y="y", 268 width="width", 269 height=0.9, 270 color="color", 271 alpha="alpha", 272 ) 273 rect.nonselection_glyph = None 274 275 self.root.axis[0].ticker = BasicTicker(**TICKS_1024) 276 self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") 277 self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION 278 self.root.xaxis.minor_tick_line_alpha = 0 279 self.root.x_range = Range1d(start=0) 280 self.root.yaxis.visible = False 281 self.root.ygrid.visible = False 282 283 self.root.toolbar_location = None 284 self.root.yaxis.visible = False 285 286 hover = HoverTool( 287 point_policy="follow_mouse", 288 tooltips=""" 289 <div> 290 <span style="font-size: 12px; font-weight: bold;">Process memory (RSS):</span> 291 <span style="font-size: 10px; font-family: Monaco, monospace;">@proc_memory{0.00 b}</span> 292 </div> 293 <div style="margin-left: 1em;"> 294 <span style="font-size: 12px; font-weight: bold;">Managed:</span> 295 <span style="font-size: 10px; font-family: Monaco, monospace;">@managed{0.00 b}</span> 296 </div> 297 <div style="margin-left: 1em;"> 298 <span style="font-size: 12px; font-weight: bold;">Unmanaged (old):</span> 299 <span style="font-size: 10px; font-family: Monaco, monospace;">@unmanaged_old{0.00 b}</span> 300 </div> 301 <div style="margin-left: 1em;"> 302 <span style="font-size: 12px; font-weight: bold;">Unmanaged (recent):</span> 303 <span style="font-size: 10px; font-family: Monaco, monospace;">@unmanaged_recent{0.00 b}</span> 304 </div> 305 <div> 306 <span style="font-size: 12px; font-weight: bold;">Spilled to disk:</span> 307 <span style="font-size: 10px; font-family: Monaco, monospace;">@spilled{0.00 b}</span> 308 </div> 309 """, 310 ) 311 self.root.add_tools(hover) 312 313 @without_property_validation 314 def update(self): 315 with log_errors(): 316 limit = sum(ws.memory_limit for ws in self.scheduler.workers.values()) 317 meminfo = self.scheduler.memory 318 color = _memory_color(meminfo.process, limit) 319 320 width = [ 321 meminfo.managed_in_memory, 322 meminfo.unmanaged_old, 323 meminfo.unmanaged_recent, 324 meminfo.managed_spilled, 325 ] 326 327 result = { 328 "width": width, 329 "x": [sum(width[:i]) + w / 2 for i, w in enumerate(width)], 330 "color": [color, color, color, "grey"], 331 "proc_memory": [meminfo.process] * 4, 332 "managed": [meminfo.managed_in_memory] * 4, 333 "unmanaged_old": [meminfo.unmanaged_old] * 4, 334 "unmanaged_recent": [meminfo.unmanaged_recent] * 4, 335 "spilled": [meminfo.managed_spilled] * 4, 336 } 337 338 x_end = max(limit, meminfo.process + meminfo.managed_spilled) 339 self.root.x_range.end = x_end 340 341 title = f"Bytes stored: {format_bytes(meminfo.process)}" 342 if meminfo.managed_spilled: 343 title += f" + {format_bytes(meminfo.managed_spilled)} spilled to disk" 344 self.root.title.text = title 345 346 update(self.source, result) 347 348 349class WorkersMemory(DashboardComponent): 350 """Memory usage for single workers""" 351 352 def __init__(self, scheduler, width=600, **kwargs): 353 with log_errors(): 354 self.scheduler = scheduler 355 self.source = ColumnDataSource( 356 { 357 "width": [], 358 "x": [], 359 "y": [], 360 "color": [], 361 "alpha": [], 362 "worker": [], 363 "escaped_worker": [], 364 "proc_memory": [], 365 "managed": [], 366 "unmanaged_old": [], 367 "unmanaged_recent": [], 368 "spilled": [], 369 } 370 ) 371 372 self.root = figure( 373 title="Bytes stored per worker", 374 tools="", 375 id="bk-workers-memory-plot", 376 width=int(width / 2), 377 name="workers_memory", 378 min_border_bottom=50, 379 **kwargs, 380 ) 381 rect = self.root.rect( 382 source=self.source, 383 x="x", 384 y="y", 385 width="width", 386 height=0.9, 387 color="color", 388 fill_alpha="alpha", 389 line_width=0, 390 ) 391 rect.nonselection_glyph = None 392 393 self.root.axis[0].ticker = BasicTicker(**TICKS_1024) 394 self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") 395 self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION 396 self.root.xaxis.minor_tick_line_alpha = 0 397 self.root.x_range = Range1d(start=0) 398 self.root.yaxis.visible = False 399 self.root.ygrid.visible = False 400 401 tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html")) 402 self.root.add_tools(tap) 403 404 self.root.toolbar_location = None 405 self.root.yaxis.visible = False 406 407 hover = HoverTool( 408 point_policy="follow_mouse", 409 tooltips=""" 410 <div> 411 <span style="font-size: 12px; font-weight: bold;">Worker:</span> 412 <span style="font-size: 10px; font-family: Monaco, monospace;">@worker</span> 413 </div> 414 <div> 415 <span style="font-size: 12px; font-weight: bold;">Process memory (RSS):</span> 416 <span style="font-size: 10px; font-family: Monaco, monospace;">@proc_memory{0.00 b}</span> 417 </div> 418 <div style="margin-left: 1em;"> 419 <span style="font-size: 12px; font-weight: bold;">Managed:</span> 420 <span style="font-size: 10px; font-family: Monaco, monospace;">@managed{0.00 b}</span> 421 </div> 422 <div style="margin-left: 1em;"> 423 <span style="font-size: 12px; font-weight: bold;">Unmanaged (old):</span> 424 <span style="font-size: 10px; font-family: Monaco, monospace;">@unmanaged_old{0.00 b}</span> 425 </div> 426 <div style="margin-left: 1em;"> 427 <span style="font-size: 12px; font-weight: bold;">Unmanaged (recent):</span> 428 <span style="font-size: 10px; font-family: Monaco, monospace;">@unmanaged_recent{0.00 b}</span> 429 </div> 430 <div> 431 <span style="font-size: 12px; font-weight: bold;">Spilled to disk:</span> 432 <span style="font-size: 10px; font-family: Monaco, monospace;">@spilled{0.00 b}</span> 433 </div> 434 """, 435 ) 436 self.root.add_tools(hover) 437 438 @without_property_validation 439 def update(self): 440 def quadlist(i) -> list: 441 out = [] 442 for ii in i: 443 out += [ii, ii, ii, ii] 444 return out 445 446 with log_errors(): 447 workers = self.scheduler.workers.values() 448 449 width = [] 450 x = [] 451 color = [] 452 max_limit = 0 453 procmemory = [] 454 managed = [] 455 spilled = [] 456 unmanaged_old = [] 457 unmanaged_recent = [] 458 459 for ws in workers: 460 meminfo = ws.memory 461 limit = getattr(ws, "memory_limit", 0) 462 max_limit = max( 463 max_limit, limit, meminfo.process + meminfo.managed_spilled 464 ) 465 color_i = _memory_color(meminfo.process, limit) 466 467 width += [ 468 meminfo.managed_in_memory, 469 meminfo.unmanaged_old, 470 meminfo.unmanaged_recent, 471 meminfo.managed_spilled, 472 ] 473 x += [sum(width[-4:i]) + width[i] / 2 for i in range(-4, 0)] 474 color += [color_i, color_i, color_i, "grey"] 475 476 # memory info 477 procmemory.append(meminfo.process) 478 managed.append(meminfo.managed_in_memory) 479 unmanaged_old.append(meminfo.unmanaged_old) 480 unmanaged_recent.append(meminfo.unmanaged_recent) 481 spilled.append(meminfo.managed_spilled) 482 483 result = { 484 "width": width, 485 "x": x, 486 "color": color, 487 "alpha": [1, 0.7, 0.4, 1] * len(workers), 488 "worker": quadlist(ws.address for ws in workers), 489 "escaped_worker": quadlist( 490 escape.url_escape(ws.address) for ws in workers 491 ), 492 "y": quadlist(range(len(workers))), 493 "proc_memory": quadlist(procmemory), 494 "managed": quadlist(managed), 495 "unmanaged_old": quadlist(unmanaged_old), 496 "unmanaged_recent": quadlist(unmanaged_recent), 497 "spilled": quadlist(spilled), 498 } 499 # Remove rectangles with width=0 500 result = { 501 k: [vi for vi, w in zip(v, width) if w] for k, v in result.items() 502 } 503 504 self.root.x_range.end = max_limit 505 update(self.source, result) 506 507 508class WorkersMemoryHistogram(DashboardComponent): 509 """Histogram of memory usage, showing how many workers there are in each bucket of 510 usage. Replaces the per-worker graph when there are >= 50 workers. 511 """ 512 513 def __init__(self, scheduler, **kwargs): 514 with log_errors(): 515 self.last = 0 516 self.scheduler = scheduler 517 self.source = ColumnDataSource( 518 {"left": [1, 2], "right": [10, 10], "top": [0, 0]} 519 ) 520 521 self.root = figure( 522 title="Bytes stored per worker", 523 name="workers_memory", 524 id="bk-workers-memory-histogram-plot", 525 y_axis_label="frequency", 526 tools="", 527 **kwargs, 528 ) 529 530 self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") 531 self.root.xaxis.ticker = AdaptiveTicker(**TICKS_1024) 532 self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION 533 534 self.root.xaxis.minor_tick_line_alpha = 0 535 self.root.ygrid.visible = False 536 537 self.root.toolbar_location = None 538 539 self.root.quad( 540 source=self.source, 541 left="left", 542 right="right", 543 bottom=0, 544 top="top", 545 color="deepskyblue", 546 fill_alpha=0.5, 547 ) 548 549 @without_property_validation 550 def update(self): 551 nbytes = np.asarray( 552 [ws.metrics["memory"] for ws in self.scheduler.workers.values()] 553 ) 554 counts, x = np.histogram(nbytes, bins=40) 555 d = {"left": x[:-1], "right": x[1:], "top": counts} 556 update(self.source, d) 557 558 559class BandwidthTypes(DashboardComponent): 560 """Bar chart showing bandwidth per type""" 561 562 def __init__(self, scheduler, **kwargs): 563 with log_errors(): 564 self.last = 0 565 self.scheduler = scheduler 566 self.source = ColumnDataSource( 567 { 568 "bandwidth": [1, 2], 569 "bandwidth-half": [0.5, 1], 570 "type": ["a", "b"], 571 "bandwidth_text": ["1", "2"], 572 } 573 ) 574 575 self.root = figure( 576 title="Bandwidth by Type", 577 tools="", 578 id="bk-bandwidth-type-plot", 579 name="bandwidth_type_histogram", 580 y_range=["a", "b"], 581 **kwargs, 582 ) 583 self.root.xaxis.major_label_orientation = -0.5 584 rect = self.root.rect( 585 source=self.source, 586 x="bandwidth-half", 587 y="type", 588 width="bandwidth", 589 height=0.9, 590 color="blue", 591 ) 592 self.root.x_range.start = 0 593 self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") 594 self.root.xaxis.ticker = AdaptiveTicker(**TICKS_1024) 595 rect.nonselection_glyph = None 596 597 self.root.xaxis.minor_tick_line_alpha = 0 598 self.root.ygrid.visible = False 599 600 self.root.toolbar_location = None 601 602 hover = HoverTool() 603 hover.tooltips = "@type: @bandwidth_text / s" 604 hover.point_policy = "follow_mouse" 605 self.root.add_tools(hover) 606 607 @without_property_validation 608 def update(self): 609 with log_errors(): 610 bw = self.scheduler.bandwidth_types 611 self.root.y_range.factors = list(sorted(bw)) 612 result = { 613 "bandwidth": list(bw.values()), 614 "bandwidth-half": [b / 2 for b in bw.values()], 615 "type": list(bw.keys()), 616 "bandwidth_text": [format_bytes(x) for x in bw.values()], 617 } 618 self.root.title.text = "Bandwidth: " + format_bytes( 619 self.scheduler.bandwidth 620 ) 621 update(self.source, result) 622 623 624class BandwidthWorkers(DashboardComponent): 625 """How many tasks are on each worker""" 626 627 def __init__(self, scheduler, **kwargs): 628 with log_errors(): 629 self.last = 0 630 self.scheduler = scheduler 631 self.source = ColumnDataSource( 632 { 633 "bandwidth": [1, 2], 634 "source": ["a", "b"], 635 "destination": ["a", "b"], 636 "bandwidth_text": ["1", "2"], 637 } 638 ) 639 640 values = [hex(x)[2:] for x in range(64, 256)][::-1] 641 mapper = linear_cmap( 642 field_name="bandwidth", 643 palette=["#" + x + x + "FF" for x in values], 644 low=0, 645 high=1, 646 ) 647 648 self.root = figure( 649 title="Bandwidth by Worker", 650 tools="", 651 id="bk-bandwidth-worker-plot", 652 name="bandwidth_worker_heatmap", 653 x_range=["a", "b"], 654 y_range=["a", "b"], 655 **kwargs, 656 ) 657 self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION 658 self.root.rect( 659 source=self.source, 660 x="source", 661 y="destination", 662 color=mapper, 663 height=1, 664 width=1, 665 ) 666 667 self.color_map = mapper["transform"] 668 color_bar = ColorBar( 669 color_mapper=self.color_map, 670 label_standoff=12, 671 border_line_color=None, 672 location=(0, 0), 673 ) 674 color_bar.formatter = NumeralTickFormatter(format="0.0 b") 675 color_bar.ticker = AdaptiveTicker(**TICKS_1024) 676 self.root.add_layout(color_bar, "right") 677 678 self.root.toolbar_location = None 679 680 hover = HoverTool() 681 hover.tooltips = """ 682 <div> 683 <p><b>Source:</b> @source </p> 684 <p><b>Destination:</b> @destination </p> 685 <p><b>Bandwidth:</b> @bandwidth_text / s</p> 686 </div> 687 """ 688 hover.point_policy = "follow_mouse" 689 self.root.add_tools(hover) 690 691 @without_property_validation 692 def update(self): 693 with log_errors(): 694 bw = self.scheduler.bandwidth_workers 695 if not bw: 696 return 697 698 def name(address): 699 try: 700 ws = self.scheduler.workers[address] 701 except KeyError: 702 return address 703 if ws.name is not None: 704 return str(ws.name) 705 return address 706 707 x, y, value = zip(*((name(a), name(b), c) for (a, b), c in bw.items())) 708 709 self.color_map.high = max(value) 710 711 factors = list(sorted(set(x + y))) 712 self.root.x_range.factors = factors 713 self.root.y_range.factors = factors[::-1] 714 715 result = { 716 "source": x, 717 "destination": y, 718 "bandwidth": value, 719 "bandwidth_text": list(map(format_bytes, value)), 720 } 721 self.root.title.text = "Bandwidth: " + format_bytes( 722 self.scheduler.bandwidth 723 ) 724 update(self.source, result) 725 726 727class WorkerNetworkBandwidth(DashboardComponent): 728 """Worker network bandwidth chart 729 730 Plots horizontal bars with the read_bytes and write_bytes worker state 731 """ 732 733 def __init__(self, scheduler, **kwargs): 734 with log_errors(): 735 self.scheduler = scheduler 736 self.source = ColumnDataSource( 737 { 738 "y_read": [], 739 "y_write": [], 740 "x_read": [], 741 "x_write": [], 742 "x_read_disk": [], 743 "x_write_disk": [], 744 } 745 ) 746 747 self.bandwidth = figure( 748 title="Worker Network Bandwidth", 749 tools="", 750 id="bk-worker-net-bandwidth", 751 name="worker_network_bandwidth", 752 **kwargs, 753 ) 754 755 # read_bytes 756 self.bandwidth.hbar( 757 y="y_read", 758 right="x_read", 759 line_color=None, 760 left=0, 761 height=0.5, 762 fill_color="red", 763 legend_label="read", 764 source=self.source, 765 ) 766 767 # write_bytes 768 self.bandwidth.hbar( 769 y="y_write", 770 right="x_write", 771 line_color=None, 772 left=0, 773 height=0.5, 774 fill_color="blue", 775 legend_label="write", 776 source=self.source, 777 ) 778 779 self.bandwidth.axis[0].ticker = BasicTicker(**TICKS_1024) 780 self.bandwidth.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") 781 self.bandwidth.xaxis.major_label_orientation = XLABEL_ORIENTATION 782 self.bandwidth.xaxis.minor_tick_line_alpha = 0 783 self.bandwidth.x_range = Range1d(start=0) 784 self.bandwidth.yaxis.visible = False 785 self.bandwidth.ygrid.visible = False 786 self.bandwidth.toolbar_location = None 787 788 self.disk = figure( 789 title="Workers Disk", 790 tools="", 791 id="bk-workers-disk", 792 name="worker_disk", 793 **kwargs, 794 ) 795 796 # read_bytes_disk 797 self.disk.hbar( 798 y="y_read", 799 right="x_read_disk", 800 line_color=None, 801 left=0, 802 height=0.5, 803 fill_color="red", 804 legend_label="read", 805 source=self.source, 806 ) 807 808 # write_bytes_disk 809 self.disk.hbar( 810 y="y_write", 811 right="x_write_disk", 812 line_color=None, 813 left=0, 814 height=0.5, 815 fill_color="blue", 816 legend_label="write", 817 source=self.source, 818 ) 819 820 self.disk.axis[0].ticker = BasicTicker(**TICKS_1024) 821 self.disk.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") 822 self.disk.xaxis.major_label_orientation = XLABEL_ORIENTATION 823 self.disk.xaxis.minor_tick_line_alpha = 0 824 self.disk.x_range = Range1d(start=0) 825 self.disk.yaxis.visible = False 826 self.disk.ygrid.visible = False 827 self.disk.toolbar_location = None 828 829 @without_property_validation 830 def update(self): 831 with log_errors(): 832 workers = self.scheduler.workers.values() 833 834 h = 0.1 835 y_read = [i + 0.75 + i * h for i in range(len(workers))] 836 y_write = [i + 0.25 + i * h for i in range(len(workers))] 837 838 x_read = [] 839 x_write = [] 840 x_read_disk = [] 841 x_write_disk = [] 842 843 for ws in workers: 844 x_read.append(ws.metrics["read_bytes"]) 845 x_write.append(ws.metrics["write_bytes"]) 846 x_read_disk.append(ws.metrics["read_bytes_disk"]) 847 x_write_disk.append(ws.metrics["write_bytes_disk"]) 848 849 if self.scheduler.workers: 850 self.bandwidth.x_range.end = max( 851 max(x_read), 852 max(x_write), 853 100_000_000, 854 0.95 * self.bandwidth.x_range.end, 855 ) 856 857 self.disk.x_range.end = max( 858 max(x_read_disk), 859 max(x_write_disk), 860 100_000_000, 861 0.95 * self.disk.x_range.end, 862 ) 863 else: 864 self.bandwidth.x_range.end = 100_000_000 865 self.disk.x_range.end = 100_000_000 866 867 result = { 868 "y_read": y_read, 869 "y_write": y_write, 870 "x_read": x_read, 871 "x_write": x_write, 872 "x_read_disk": x_read_disk, 873 "x_write_disk": x_write_disk, 874 } 875 876 update(self.source, result) 877 878 879class SystemTimeseries(DashboardComponent): 880 """Timeseries for worker network bandwidth, cpu, memory and disk. 881 882 bandwidth: plots the average of read_bytes and write_bytes for the workers 883 as a function of time. 884 cpu: plots the average of cpu for the workers as a function of time. 885 memory: plots the average of memory for the workers as a function of time. 886 disk: plots the average of read_bytes_disk and write_bytes_disk for the workers 887 as a function of time. 888 889 The metrics plotted come from the aggregation of 890 from ws.metrics["val"] for ws in scheduler.workers.values() divided by nuber of workers. 891 """ 892 893 def __init__(self, scheduler, **kwargs): 894 with log_errors(): 895 self.scheduler = scheduler 896 self.source = ColumnDataSource( 897 { 898 "time": [], 899 "read_bytes": [], 900 "write_bytes": [], 901 "cpu": [], 902 "memory": [], 903 "read_bytes_disk": [], 904 "write_bytes_disk": [], 905 } 906 ) 907 908 update(self.source, self.get_data()) 909 910 x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0) 911 tools = "reset, xpan, xwheel_zoom" 912 913 self.bandwidth = figure( 914 title="Workers Network Bandwidth", 915 x_axis_type="datetime", 916 tools=tools, 917 x_range=x_range, 918 id="bk-worker-network-bandwidth-ts", 919 name="worker_network_bandwidth-timeseries", 920 **kwargs, 921 ) 922 923 self.bandwidth.line( 924 source=self.source, 925 x="time", 926 y="read_bytes", 927 color="red", 928 legend_label="read (mean)", 929 ) 930 self.bandwidth.line( 931 source=self.source, 932 x="time", 933 y="write_bytes", 934 color="blue", 935 legend_label="write (mean)", 936 ) 937 938 self.bandwidth.legend.location = "top_left" 939 self.bandwidth.yaxis.axis_label = "bytes / second" 940 self.bandwidth.yaxis[0].formatter = NumeralTickFormatter(format="0.0b") 941 self.bandwidth.y_range.start = 0 942 self.bandwidth.yaxis.minor_tick_line_alpha = 0 943 self.bandwidth.xgrid.visible = False 944 945 self.cpu = figure( 946 title="Workers CPU", 947 x_axis_type="datetime", 948 tools=tools, 949 x_range=x_range, 950 id="bk-worker-cpu-ts", 951 name="worker_cpu-timeseries", 952 **kwargs, 953 ) 954 955 self.cpu.line( 956 source=self.source, 957 x="time", 958 y="cpu", 959 ) 960 self.cpu.yaxis.axis_label = "Utilization" 961 self.cpu.y_range.start = 0 962 self.cpu.yaxis.minor_tick_line_alpha = 0 963 self.cpu.xgrid.visible = False 964 965 self.memory = figure( 966 title="Workers Memory", 967 x_axis_type="datetime", 968 tools=tools, 969 x_range=x_range, 970 id="bk-worker-memory-ts", 971 name="worker_memory-timeseries", 972 **kwargs, 973 ) 974 975 self.memory.line( 976 source=self.source, 977 x="time", 978 y="memory", 979 ) 980 self.memory.yaxis.axis_label = "Bytes" 981 self.memory.yaxis[0].formatter = NumeralTickFormatter(format="0.0b") 982 self.memory.y_range.start = 0 983 self.memory.yaxis.minor_tick_line_alpha = 0 984 self.memory.xgrid.visible = False 985 986 self.disk = figure( 987 title="Workers Disk", 988 x_axis_type="datetime", 989 tools=tools, 990 x_range=x_range, 991 id="bk-worker-disk-ts", 992 name="worker_disk-timeseries", 993 **kwargs, 994 ) 995 996 self.disk.line( 997 source=self.source, 998 x="time", 999 y="read_bytes_disk", 1000 color="red", 1001 legend_label="read (mean)", 1002 ) 1003 self.disk.line( 1004 source=self.source, 1005 x="time", 1006 y="write_bytes_disk", 1007 color="blue", 1008 legend_label="write (mean)", 1009 ) 1010 1011 self.disk.legend.location = "top_left" 1012 self.disk.yaxis.axis_label = "bytes / second" 1013 self.disk.yaxis[0].formatter = NumeralTickFormatter(format="0.0b") 1014 self.disk.y_range.start = 0 1015 self.disk.yaxis.minor_tick_line_alpha = 0 1016 self.disk.xgrid.visible = False 1017 1018 def get_data(self): 1019 workers = self.scheduler.workers.values() 1020 1021 read_bytes = 0 1022 write_bytes = 0 1023 cpu = 0 1024 memory = 0 1025 read_bytes_disk = 0 1026 write_bytes_disk = 0 1027 time = 0 1028 for ws in workers: 1029 read_bytes += ws.metrics["read_bytes"] 1030 write_bytes += ws.metrics["write_bytes"] 1031 cpu += ws.metrics["cpu"] 1032 memory += ws.metrics["memory"] 1033 read_bytes_disk += ws.metrics["read_bytes_disk"] 1034 write_bytes_disk += ws.metrics["write_bytes_disk"] 1035 time += ws.metrics["time"] 1036 1037 result = { 1038 # use `or` to avoid ZeroDivision when no workers 1039 "time": [time / (len(workers) or 1) * 1000], 1040 "read_bytes": [read_bytes / (len(workers) or 1)], 1041 "write_bytes": [write_bytes / (len(workers) or 1)], 1042 "cpu": [cpu / (len(workers) or 1)], 1043 "memory": [memory / (len(workers) or 1)], 1044 "read_bytes_disk": [read_bytes_disk / (len(workers) or 1)], 1045 "write_bytes_disk": [write_bytes_disk / (len(workers) or 1)], 1046 } 1047 return result 1048 1049 @without_property_validation 1050 def update(self): 1051 with log_errors(): 1052 self.source.stream(self.get_data(), 1000) 1053 1054 if self.scheduler.workers: 1055 y_end_cpu = sum( 1056 ws.nthreads or 1 for ws in self.scheduler.workers.values() 1057 ) / len(self.scheduler.workers.values()) 1058 y_end_mem = sum( 1059 ws.memory_limit for ws in self.scheduler.workers.values() 1060 ) / len(self.scheduler.workers.values()) 1061 else: 1062 y_end_cpu = 1 1063 y_end_mem = 100_000_000 1064 1065 self.cpu.y_range.end = y_end_cpu * 100 1066 self.memory.y_range.end = y_end_mem 1067 1068 1069class ComputePerKey(DashboardComponent): 1070 """Bar chart showing time spend in action by key prefix""" 1071 1072 def __init__(self, scheduler, **kwargs): 1073 with log_errors(): 1074 self.last = 0 1075 self.scheduler = scheduler 1076 1077 if TaskStreamPlugin.name not in self.scheduler.plugins: 1078 self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler)) 1079 1080 compute_data = { 1081 "times": [0.2, 0.1], 1082 "formatted_time": ["0.2 ms", "2.8 us"], 1083 "angles": [3.14, 0.785], 1084 "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]], 1085 "names": ["sum", "sum_partial"], 1086 } 1087 1088 self.compute_source = ColumnDataSource(data=compute_data) 1089 1090 fig = figure( 1091 title="Compute Time Per Task", 1092 tools="", 1093 id="bk-Compute-by-key-plot", 1094 name="compute_time_per_key", 1095 x_range=["a", "b"], 1096 **kwargs, 1097 ) 1098 1099 rect = fig.vbar( 1100 source=self.compute_source, 1101 x="names", 1102 top="times", 1103 width=0.7, 1104 color="color", 1105 ) 1106 1107 fig.y_range.start = 0 1108 fig.yaxis.axis_label = "Time (s)" 1109 fig.yaxis[0].formatter = NumeralTickFormatter(format="0") 1110 fig.yaxis.ticker = AdaptiveTicker(**TICKS_1024) 1111 fig.xaxis.major_label_orientation = XLABEL_ORIENTATION 1112 rect.nonselection_glyph = None 1113 1114 fig.xaxis.minor_tick_line_alpha = 0 1115 fig.xgrid.visible = False 1116 1117 fig.toolbar_location = None 1118 1119 hover = HoverTool() 1120 hover.tooltips = """ 1121 <div> 1122 <p><b>Name:</b> @names</p> 1123 <p><b>Time:</b> @formatted_time</p> 1124 </div> 1125 """ 1126 hover.point_policy = "follow_mouse" 1127 fig.add_tools(hover) 1128 1129 fig.add_layout( 1130 Title( 1131 text="Note: tasks less than 2% of max are not displayed", 1132 text_font_style="italic", 1133 ), 1134 "below", 1135 ) 1136 1137 self.fig = fig 1138 tab1 = Panel(child=fig, title="Bar Chart") 1139 1140 fig2 = figure( 1141 title="Compute Time Per Task", 1142 tools="", 1143 id="bk-Compute-by-key-pie", 1144 name="compute_time_per_key-pie", 1145 x_range=(-0.5, 1.0), 1146 **kwargs, 1147 ) 1148 1149 fig2.wedge( 1150 x=0, 1151 y=1, 1152 radius=0.4, 1153 start_angle=cumsum("angles", include_zero=True), 1154 end_angle=cumsum("angles"), 1155 line_color="white", 1156 fill_color="color", 1157 legend_field="names", 1158 source=self.compute_source, 1159 ) 1160 1161 fig2.axis.axis_label = None 1162 fig2.axis.visible = False 1163 fig2.grid.grid_line_color = None 1164 fig2.add_layout( 1165 Title( 1166 text="Note: tasks less than 2% of max are not displayed", 1167 text_font_style="italic", 1168 ), 1169 "below", 1170 ) 1171 1172 hover = HoverTool() 1173 hover.tooltips = """ 1174 <div> 1175 <p><b>Name:</b> @names</p> 1176 <p><b>Time:</b> @formatted_time</p> 1177 </div> 1178 """ 1179 hover.point_policy = "follow_mouse" 1180 fig2.add_tools(hover) 1181 self.wedge_fig = fig2 1182 tab2 = Panel(child=fig2, title="Pie Chart") 1183 1184 self.root = Tabs(tabs=[tab1, tab2]) 1185 1186 @without_property_validation 1187 def update(self): 1188 with log_errors(): 1189 compute_times = defaultdict(float) 1190 1191 for key, ts in self.scheduler.task_prefixes.items(): 1192 name = key_split(key) 1193 for action, t in ts.all_durations.items(): 1194 if action == "compute": 1195 compute_times[name] += t 1196 1197 # order by largest time first 1198 compute_times = sorted( 1199 compute_times.items(), key=lambda x: x[1], reverse=True 1200 ) 1201 1202 # keep only time which are 2% of max or greater 1203 if compute_times: 1204 max_time = compute_times[0][1] * 0.02 1205 compute_times = [(n, t) for n, t in compute_times if t > max_time] 1206 compute_colors = list() 1207 compute_names = list() 1208 compute_time = list() 1209 total_time = 0 1210 for name, t in compute_times: 1211 compute_names.append(name) 1212 compute_colors.append(ts_color_of(name)) 1213 compute_time.append(t) 1214 total_time += t 1215 1216 angles = [t / total_time * 2 * math.pi for t in compute_time] 1217 1218 self.fig.x_range.factors = compute_names 1219 1220 compute_result = dict( 1221 angles=angles, 1222 times=compute_time, 1223 color=compute_colors, 1224 names=compute_names, 1225 formatted_time=[format_time(t) for t in compute_time], 1226 ) 1227 1228 update(self.compute_source, compute_result) 1229 1230 1231class AggregateAction(DashboardComponent): 1232 """Bar chart showing time spend in action by key prefix""" 1233 1234 def __init__(self, scheduler, **kwargs): 1235 with log_errors(): 1236 self.last = 0 1237 self.scheduler = scheduler 1238 1239 if TaskStreamPlugin.name not in self.scheduler.plugins: 1240 self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler)) 1241 1242 action_data = { 1243 "times": [0.2, 0.1], 1244 "formatted_time": ["0.2 ms", "2.8 us"], 1245 "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]], 1246 "names": ["transfer", "compute"], 1247 } 1248 1249 self.action_source = ColumnDataSource(data=action_data) 1250 1251 self.root = figure( 1252 title="Aggregate Per Action", 1253 tools="", 1254 id="bk-aggregate-per-action-plot", 1255 name="aggregate_per_action", 1256 x_range=["a", "b"], 1257 **kwargs, 1258 ) 1259 1260 rect = self.root.vbar( 1261 source=self.action_source, 1262 x="names", 1263 top="times", 1264 width=0.7, 1265 color="color", 1266 ) 1267 1268 self.root.y_range.start = 0 1269 self.root.yaxis[0].formatter = NumeralTickFormatter(format="0") 1270 self.root.yaxis.axis_label = "Time (s)" 1271 self.root.yaxis.ticker = AdaptiveTicker(**TICKS_1024) 1272 self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION 1273 self.root.xaxis.major_label_text_font_size = "16px" 1274 rect.nonselection_glyph = None 1275 1276 self.root.xaxis.minor_tick_line_alpha = 0 1277 self.root.xgrid.visible = False 1278 1279 self.root.toolbar_location = None 1280 1281 hover = HoverTool() 1282 hover.tooltips = """ 1283 <div> 1284 <p><b>Name:</b> @names</p> 1285 <p><b>Time:</b> @formatted_time</p> 1286 </div> 1287 """ 1288 hover.point_policy = "follow_mouse" 1289 self.root.add_tools(hover) 1290 1291 @without_property_validation 1292 def update(self): 1293 with log_errors(): 1294 agg_times = defaultdict(float) 1295 1296 for key, ts in self.scheduler.task_prefixes.items(): 1297 for action, t in ts.all_durations.items(): 1298 agg_times[action] += t 1299 1300 # order by largest time first 1301 agg_times = sorted(agg_times.items(), key=lambda x: x[1], reverse=True) 1302 1303 agg_colors = list() 1304 agg_names = list() 1305 agg_time = list() 1306 for action, t in agg_times: 1307 agg_names.append(action) 1308 if action == "compute": 1309 agg_colors.append("purple") 1310 else: 1311 agg_colors.append(ts_color_lookup[action]) 1312 agg_time.append(t) 1313 1314 self.root.x_range.factors = agg_names 1315 self.root.title.text = "Aggregate Time Per Action" 1316 1317 action_result = dict( 1318 times=agg_time, 1319 color=agg_colors, 1320 names=agg_names, 1321 formatted_time=[format_time(t) for t in agg_time], 1322 ) 1323 1324 update(self.action_source, action_result) 1325 1326 1327class MemoryByKey(DashboardComponent): 1328 """Bar chart showing memory use by key prefix""" 1329 1330 def __init__(self, scheduler, **kwargs): 1331 with log_errors(): 1332 self.last = 0 1333 self.scheduler = scheduler 1334 self.source = ColumnDataSource( 1335 { 1336 "name": ["a", "b"], 1337 "nbytes": [100, 1000], 1338 "count": [1, 2], 1339 "color": ["blue", "blue"], 1340 } 1341 ) 1342 1343 self.root = figure( 1344 title="Memory Use", 1345 tools="", 1346 id="bk-memory-by-key-plot", 1347 name="memory_by_key", 1348 x_range=["a", "b"], 1349 **kwargs, 1350 ) 1351 rect = self.root.vbar( 1352 source=self.source, x="name", top="nbytes", width=0.9, color="color" 1353 ) 1354 self.root.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") 1355 self.root.yaxis.ticker = AdaptiveTicker(**TICKS_1024) 1356 self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION 1357 rect.nonselection_glyph = None 1358 1359 self.root.xaxis.minor_tick_line_alpha = 0 1360 self.root.ygrid.visible = False 1361 1362 self.root.toolbar_location = None 1363 1364 hover = HoverTool() 1365 hover.tooltips = "@name: @nbytes_text" 1366 hover.tooltips = """ 1367 <div> 1368 <p><b>Name:</b> @name</p> 1369 <p><b>Bytes:</b> @nbytes_text </p> 1370 <p><b>Count:</b> @count objects </p> 1371 </div> 1372 """ 1373 hover.point_policy = "follow_mouse" 1374 self.root.add_tools(hover) 1375 1376 @without_property_validation 1377 def update(self): 1378 with log_errors(): 1379 counts = defaultdict(int) 1380 nbytes = defaultdict(int) 1381 for ws in self.scheduler.workers.values(): 1382 for ts in ws.has_what: 1383 ks = key_split(ts.key) 1384 counts[ks] += 1 1385 nbytes[ks] += ts.nbytes 1386 1387 names = list(sorted(counts)) 1388 self.root.x_range.factors = names 1389 result = { 1390 "name": names, 1391 "count": [counts[name] for name in names], 1392 "nbytes": [nbytes[name] for name in names], 1393 "nbytes_text": [format_bytes(nbytes[name]) for name in names], 1394 "color": [color_of(name) for name in names], 1395 } 1396 self.root.title.text = "Total Use: " + format_bytes(sum(nbytes.values())) 1397 1398 update(self.source, result) 1399 1400 1401class CurrentLoad(DashboardComponent): 1402 """Tasks and CPU usage on each worker""" 1403 1404 def __init__(self, scheduler, width=600, **kwargs): 1405 with log_errors(): 1406 self.last = 0 1407 self.scheduler = scheduler 1408 self.source = ColumnDataSource( 1409 { 1410 "nprocessing": [], 1411 "nprocessing-half": [], 1412 "nprocessing-color": [], 1413 "cpu": [], 1414 "cpu-half": [], 1415 "y": [], 1416 "worker": [], 1417 "escaped_worker": [], 1418 } 1419 ) 1420 processing = figure( 1421 title="Tasks Processing", 1422 tools="", 1423 id="bk-nprocessing-plot", 1424 name="processing", 1425 width=int(width / 2), 1426 min_border_bottom=50, 1427 **kwargs, 1428 ) 1429 rect = processing.rect( 1430 source=self.source, 1431 x="nprocessing-half", 1432 y="y", 1433 width="nprocessing", 1434 height=0.9, 1435 color="nprocessing-color", 1436 ) 1437 processing.x_range.start = 0 1438 rect.nonselection_glyph = None 1439 1440 cpu = figure( 1441 title="CPU Utilization", 1442 tools="", 1443 id="bk-cpu-worker-plot", 1444 width=int(width / 2), 1445 name="cpu_hist", 1446 x_range=(0, 100), 1447 min_border_bottom=50, 1448 **kwargs, 1449 ) 1450 rect = cpu.rect( 1451 source=self.source, 1452 x="cpu-half", 1453 y="y", 1454 width="cpu", 1455 height=0.9, 1456 color="blue", 1457 ) 1458 rect.nonselection_glyph = None 1459 1460 for fig in (processing, cpu): 1461 fig.xaxis.minor_tick_line_alpha = 0 1462 fig.yaxis.visible = False 1463 fig.ygrid.visible = False 1464 1465 tap = TapTool( 1466 callback=OpenURL(url="./info/worker/@escaped_worker.html") 1467 ) 1468 fig.add_tools(tap) 1469 1470 fig.toolbar_location = None 1471 fig.yaxis.visible = False 1472 1473 hover = HoverTool() 1474 hover.tooltips = "@worker : @nprocessing tasks" 1475 hover.point_policy = "follow_mouse" 1476 processing.add_tools(hover) 1477 1478 hover = HoverTool() 1479 hover.tooltips = "@worker : @cpu %" 1480 hover.point_policy = "follow_mouse" 1481 cpu.add_tools(hover) 1482 1483 self.processing_figure = processing 1484 self.cpu_figure = cpu 1485 1486 @without_property_validation 1487 def update(self): 1488 with log_errors(): 1489 workers = self.scheduler.workers.values() 1490 now = time() 1491 if not any(ws.processing for ws in workers) and now < self.last + 1: 1492 return 1493 self.last = now 1494 1495 cpu = [int(ws.metrics["cpu"]) for ws in workers] 1496 nprocessing = [len(ws.processing) for ws in workers] 1497 1498 nprocessing_color = [] 1499 for ws in workers: 1500 if ws in self.scheduler.idle: 1501 nprocessing_color.append("red") 1502 elif ws in self.scheduler.saturated: 1503 nprocessing_color.append("green") 1504 else: 1505 nprocessing_color.append("blue") 1506 1507 result = { 1508 "cpu": cpu, 1509 "cpu-half": [c / 2 for c in cpu], 1510 "nprocessing": nprocessing, 1511 "nprocessing-half": [np / 2 for np in nprocessing], 1512 "nprocessing-color": nprocessing_color, 1513 "worker": [ws.address for ws in workers], 1514 "escaped_worker": [escape.url_escape(ws.address) for ws in workers], 1515 "y": list(range(len(workers))), 1516 } 1517 1518 if self.scheduler.workers: 1519 xrange = max(ws.nthreads or 1 for ws in workers) 1520 else: 1521 xrange = 1 1522 self.cpu_figure.x_range.end = xrange * 100 1523 1524 update(self.source, result) 1525 1526 1527class StealingTimeSeries(DashboardComponent): 1528 def __init__(self, scheduler, **kwargs): 1529 self.scheduler = scheduler 1530 self.source = ColumnDataSource( 1531 { 1532 "time": [time() * 1000, time() * 1000 + 1], 1533 "idle": [0, 0], 1534 "saturated": [0, 0], 1535 } 1536 ) 1537 1538 x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0) 1539 1540 self.root = figure( 1541 title="Idle and Saturated Workers Over Time", 1542 x_axis_type="datetime", 1543 tools="", 1544 x_range=x_range, 1545 **kwargs, 1546 ) 1547 self.root.line(source=self.source, x="time", y="idle", color="red") 1548 self.root.line(source=self.source, x="time", y="saturated", color="green") 1549 self.root.yaxis.minor_tick_line_color = None 1550 1551 self.root.add_tools( 1552 ResetTool(), PanTool(dimensions="width"), WheelZoomTool(dimensions="width") 1553 ) 1554 1555 @without_property_validation 1556 def update(self): 1557 with log_errors(): 1558 result = { 1559 "time": [time() * 1000], 1560 "idle": [len(self.scheduler.idle)], 1561 "saturated": [len(self.scheduler.saturated)], 1562 } 1563 if PROFILING: 1564 curdoc().add_next_tick_callback( 1565 lambda: self.source.stream(result, 10000) 1566 ) 1567 else: 1568 self.source.stream(result, 10000) 1569 1570 1571class StealingEvents(DashboardComponent): 1572 def __init__(self, scheduler, **kwargs): 1573 self.scheduler = scheduler 1574 self.steal = scheduler.extensions["stealing"] 1575 self.last = 0 1576 self.source = ColumnDataSource( 1577 { 1578 "time": [time() - 20, time()], 1579 "level": [0, 15], 1580 "color": ["white", "white"], 1581 "duration": [0, 0], 1582 "radius": [1, 1], 1583 "cost_factor": [0, 10], 1584 "count": [1, 1], 1585 } 1586 ) 1587 1588 x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0) 1589 1590 self.root = figure( 1591 title="Stealing Events", 1592 x_axis_type="datetime", 1593 tools="", 1594 x_range=x_range, 1595 **kwargs, 1596 ) 1597 1598 self.root.circle( 1599 source=self.source, 1600 x="time", 1601 y="level", 1602 color="color", 1603 size="radius", 1604 alpha=0.5, 1605 ) 1606 self.root.yaxis.axis_label = "Level" 1607 1608 hover = HoverTool() 1609 hover.tooltips = "Level: @level, Duration: @duration, Count: @count, Cost factor: @cost_factor" 1610 hover.point_policy = "follow_mouse" 1611 1612 self.root.add_tools( 1613 hover, 1614 ResetTool(), 1615 PanTool(dimensions="width"), 1616 WheelZoomTool(dimensions="width"), 1617 ) 1618 1619 def convert(self, msgs): 1620 """Convert a log message to a glyph""" 1621 total_duration = 0 1622 for msg in msgs: 1623 time, level, key, duration, sat, occ_sat, idl, occ_idl = msg 1624 total_duration += duration 1625 1626 try: 1627 color = Viridis11[level] 1628 except (KeyError, IndexError): 1629 color = "black" 1630 1631 radius = math.sqrt(min(total_duration, 10)) * 30 + 2 1632 1633 d = { 1634 "time": time * 1000, 1635 "level": level, 1636 "count": len(msgs), 1637 "color": color, 1638 "duration": total_duration, 1639 "radius": radius, 1640 "cost_factor": self.steal.cost_multipliers[level], 1641 } 1642 1643 return d 1644 1645 @without_property_validation 1646 def update(self): 1647 with log_errors(): 1648 log = self.scheduler.get_events(topic="stealing") 1649 current = len(self.scheduler.events["stealing"]) 1650 n = current - self.last 1651 1652 log = [log[-i][1] for i in range(1, n + 1) if isinstance(log[-i][1], list)] 1653 self.last = current 1654 1655 if log: 1656 new = pipe( 1657 log, 1658 map(groupby(1)), 1659 map(dict.values), 1660 concat, 1661 map(self.convert), 1662 list, 1663 transpose, 1664 ) 1665 if PROFILING: 1666 curdoc().add_next_tick_callback( 1667 lambda: self.source.stream(new, 10000) 1668 ) 1669 else: 1670 self.source.stream(new, 10000) 1671 1672 1673class Events(DashboardComponent): 1674 def __init__(self, scheduler, name, height=150, **kwargs): 1675 self.scheduler = scheduler 1676 self.action_ys = dict() 1677 self.last = 0 1678 self.name = name 1679 self.source = ColumnDataSource( 1680 {"time": [], "action": [], "hover": [], "y": [], "color": []} 1681 ) 1682 1683 x_range = DataRange1d(follow="end", follow_interval=200000) 1684 1685 self.root = figure( 1686 title=name, 1687 x_axis_type="datetime", 1688 height=height, 1689 tools="", 1690 x_range=x_range, 1691 **kwargs, 1692 ) 1693 1694 self.root.circle( 1695 source=self.source, 1696 x="time", 1697 y="y", 1698 color="color", 1699 size=50, 1700 alpha=0.5, 1701 **{"legend_field" if BOKEH_VERSION >= "1.4" else "legend": "action"}, 1702 ) 1703 self.root.yaxis.axis_label = "Action" 1704 self.root.legend.location = "top_left" 1705 1706 hover = HoverTool() 1707 hover.tooltips = "@action<br>@hover" 1708 hover.point_policy = "follow_mouse" 1709 1710 self.root.add_tools( 1711 hover, 1712 ResetTool(), 1713 PanTool(dimensions="width"), 1714 WheelZoomTool(dimensions="width"), 1715 ) 1716 1717 @without_property_validation 1718 def update(self): 1719 with log_errors(): 1720 log = self.scheduler.events[self.name] 1721 n = self.scheduler.event_counts[self.name] - self.last 1722 if log: 1723 log = [log[-i] for i in range(1, n + 1)] 1724 self.last = self.scheduler.event_counts[self.name] 1725 1726 if log: 1727 actions = [] 1728 times = [] 1729 hovers = [] 1730 ys = [] 1731 colors = [] 1732 for msg_time, msg in log: 1733 times.append(msg_time * 1000) 1734 action = msg["action"] 1735 actions.append(action) 1736 try: 1737 ys.append(self.action_ys[action]) 1738 except KeyError: 1739 self.action_ys[action] = len(self.action_ys) 1740 ys.append(self.action_ys[action]) 1741 colors.append(color_of(action)) 1742 hovers.append("TODO") 1743 1744 new = { 1745 "time": times, 1746 "action": actions, 1747 "hover": hovers, 1748 "y": ys, 1749 "color": colors, 1750 } 1751 1752 if PROFILING: 1753 curdoc().add_next_tick_callback( 1754 lambda: self.source.stream(new, 10000) 1755 ) 1756 else: 1757 self.source.stream(new, 10000) 1758 1759 1760class TaskStream(DashboardComponent): 1761 def __init__(self, scheduler, n_rectangles=1000, clear_interval="20s", **kwargs): 1762 self.scheduler = scheduler 1763 self.offset = 0 1764 1765 if TaskStreamPlugin.name not in self.scheduler.plugins: 1766 self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler)) 1767 self.plugin = self.scheduler.plugins[TaskStreamPlugin.name] 1768 1769 self.index = max(0, self.plugin.index - n_rectangles) 1770 self.workers = dict() 1771 self.n_rectangles = n_rectangles 1772 clear_interval = parse_timedelta(clear_interval, default="ms") 1773 self.clear_interval = clear_interval 1774 self.last = 0 1775 self.last_seen = 0 1776 1777 self.source, self.root = task_stream_figure(clear_interval, **kwargs) 1778 1779 # Required for update callback 1780 self.task_stream_index = [0] 1781 1782 @without_property_validation 1783 def update(self): 1784 if self.index == self.plugin.index: 1785 return 1786 with log_errors(): 1787 if self.index and len(self.source.data["start"]): 1788 start = min(self.source.data["start"]) 1789 duration = max(self.source.data["duration"]) 1790 boundary = (self.offset + start - duration) / 1000 1791 else: 1792 boundary = self.offset 1793 rectangles = self.plugin.rectangles( 1794 istart=self.index, workers=self.workers, start_boundary=boundary 1795 ) 1796 n = len(rectangles["name"]) 1797 self.index = self.plugin.index 1798 1799 if not rectangles["start"]: 1800 return 1801 1802 # If it has been a while since we've updated the plot 1803 if time() > self.last_seen + self.clear_interval: 1804 new_start = min(rectangles["start"]) - self.offset 1805 old_start = min(self.source.data["start"]) 1806 old_end = max( 1807 map( 1808 operator.add, 1809 self.source.data["start"], 1810 self.source.data["duration"], 1811 ) 1812 ) 1813 1814 density = ( 1815 sum(self.source.data["duration"]) 1816 / len(self.workers) 1817 / (old_end - old_start) 1818 ) 1819 1820 # If whitespace is more than 3x the old width 1821 if (new_start - old_end) > (old_end - old_start) * 2 or density < 0.05: 1822 self.source.data.update({k: [] for k in rectangles}) # clear 1823 self.offset = min(rectangles["start"]) # redefine offset 1824 1825 rectangles["start"] = [x - self.offset for x in rectangles["start"]] 1826 self.last_seen = time() 1827 1828 # Convert to numpy for serialization speed 1829 if n >= 10 and np: 1830 for k, v in rectangles.items(): 1831 if isinstance(v[0], Number): 1832 rectangles[k] = np.array(v) 1833 1834 if PROFILING: 1835 curdoc().add_next_tick_callback( 1836 lambda: self.source.stream(rectangles, self.n_rectangles) 1837 ) 1838 else: 1839 self.source.stream(rectangles, self.n_rectangles) 1840 1841 1842def task_stream_figure(clear_interval="20s", **kwargs): 1843 """ 1844 kwargs are applied to the bokeh.models.plots.Plot constructor 1845 """ 1846 clear_interval = parse_timedelta(clear_interval, default="ms") 1847 1848 source = ColumnDataSource( 1849 data=dict( 1850 start=[time() - clear_interval], 1851 duration=[0.1], 1852 key=["start"], 1853 name=["start"], 1854 color=["white"], 1855 duration_text=["100 ms"], 1856 worker=["foo"], 1857 y=[0], 1858 worker_thread=[1], 1859 alpha=[0.0], 1860 ) 1861 ) 1862 1863 x_range = DataRange1d(range_padding=0) 1864 y_range = DataRange1d(range_padding=0) 1865 1866 root = figure( 1867 name="task_stream", 1868 title="Task Stream", 1869 id="bk-task-stream-plot", 1870 x_range=x_range, 1871 y_range=y_range, 1872 toolbar_location="above", 1873 x_axis_type="datetime", 1874 y_axis_location=None, 1875 tools="", 1876 min_border_bottom=50, 1877 **kwargs, 1878 ) 1879 1880 rect = root.rect( 1881 source=source, 1882 x="start", 1883 y="y", 1884 width="duration", 1885 height=0.4, 1886 fill_color="color", 1887 line_color="color", 1888 line_alpha=0.6, 1889 fill_alpha="alpha", 1890 line_width=3, 1891 ) 1892 rect.nonselection_glyph = None 1893 1894 root.yaxis.major_label_text_alpha = 0 1895 root.yaxis.minor_tick_line_alpha = 0 1896 root.yaxis.major_tick_line_alpha = 0 1897 root.xgrid.visible = False 1898 1899 hover = HoverTool( 1900 point_policy="follow_mouse", 1901 tooltips=""" 1902 <div> 1903 <span style="font-size: 12px; font-weight: bold;">@name:</span> 1904 <span style="font-size: 10px; font-family: Monaco, monospace;">@duration_text</span> 1905 </div> 1906 """, 1907 ) 1908 1909 tap = TapTool(callback=OpenURL(url="./profile?key=@name")) 1910 1911 root.add_tools( 1912 hover, 1913 tap, 1914 BoxZoomTool(), 1915 ResetTool(), 1916 PanTool(dimensions="width"), 1917 WheelZoomTool(dimensions="width"), 1918 ) 1919 if ExportTool: 1920 export = ExportTool() 1921 export.register_plot(root) 1922 root.add_tools(export) 1923 1924 return source, root 1925 1926 1927class TaskGraph(DashboardComponent): 1928 """ 1929 A dynamic node-link diagram for the task graph on the scheduler 1930 1931 See also the GraphLayout diagnostic at 1932 distributed/diagnostics/graph_layout.py 1933 """ 1934 1935 def __init__(self, scheduler, **kwargs): 1936 self.scheduler = scheduler 1937 self.layout = GraphLayout(scheduler) 1938 scheduler.add_plugin(self.layout) 1939 self.invisible_count = 0 # number of invisible nodes 1940 1941 self.node_source = ColumnDataSource( 1942 {"x": [], "y": [], "name": [], "state": [], "visible": [], "key": []} 1943 ) 1944 self.edge_source = ColumnDataSource({"x": [], "y": [], "visible": []}) 1945 1946 node_view = CDSView( 1947 source=self.node_source, 1948 filters=[GroupFilter(column_name="visible", group="True")], 1949 ) 1950 edge_view = CDSView( 1951 source=self.edge_source, 1952 filters=[GroupFilter(column_name="visible", group="True")], 1953 ) 1954 1955 node_colors = factor_cmap( 1956 "state", 1957 factors=["waiting", "processing", "memory", "released", "erred"], 1958 palette=["gray", "green", "red", "blue", "black"], 1959 ) 1960 1961 self.root = figure(title="Task Graph", **kwargs) 1962 self.subtitle = Title(text=" ", text_font_style="italic") 1963 self.root.add_layout(self.subtitle, "above") 1964 1965 self.root.multi_line( 1966 xs="x", 1967 ys="y", 1968 source=self.edge_source, 1969 line_width=1, 1970 view=edge_view, 1971 color="black", 1972 alpha=0.3, 1973 ) 1974 rect = self.root.square( 1975 x="x", 1976 y="y", 1977 size=10, 1978 color=node_colors, 1979 source=self.node_source, 1980 view=node_view, 1981 **{"legend_field" if BOKEH_VERSION >= "1.4" else "legend": "state"}, 1982 ) 1983 self.root.xgrid.grid_line_color = None 1984 self.root.ygrid.grid_line_color = None 1985 1986 hover = HoverTool( 1987 point_policy="follow_mouse", 1988 tooltips="<b>@name</b>: @state", 1989 renderers=[rect], 1990 ) 1991 tap = TapTool(callback=OpenURL(url="info/task/@key.html"), renderers=[rect]) 1992 rect.nonselection_glyph = None 1993 self.root.add_tools(hover, tap) 1994 self.max_items = config.get("distributed.dashboard.graph-max-items", 5000) 1995 1996 @without_property_validation 1997 def update(self): 1998 with log_errors(): 1999 # If there are too many tasks in the scheduler we'll disable this 2000 # compoonents to not overload scheduler or client. Once we drop 2001 # below the threshold, the data is filled up again as usual 2002 if len(self.scheduler.tasks) > self.max_items: 2003 self.subtitle.text = "Scheduler has too many tasks to display." 2004 for container in [self.node_source, self.edge_source]: 2005 container.data = {col: [] for col in container.column_names} 2006 else: 2007 # occasionally reset the column data source to remove old nodes 2008 if self.invisible_count > len(self.node_source.data["x"]) / 2: 2009 self.layout.reset_index() 2010 self.invisible_count = 0 2011 update = True 2012 else: 2013 update = False 2014 2015 new, self.layout.new = self.layout.new, [] 2016 new_edges = self.layout.new_edges 2017 self.layout.new_edges = [] 2018 2019 self.add_new_nodes_edges(new, new_edges, update=update) 2020 2021 self.patch_updates() 2022 2023 if len(self.scheduler.tasks) == 0: 2024 self.subtitle.text = "Scheduler is empty." 2025 else: 2026 self.subtitle.text = " " 2027 2028 @without_property_validation 2029 def add_new_nodes_edges(self, new, new_edges, update=False): 2030 if new or update: 2031 node_key = [] 2032 node_x = [] 2033 node_y = [] 2034 node_state = [] 2035 node_name = [] 2036 edge_x = [] 2037 edge_y = [] 2038 2039 x = self.layout.x 2040 y = self.layout.y 2041 2042 tasks = self.scheduler.tasks 2043 for key in new: 2044 try: 2045 task = tasks[key] 2046 except KeyError: 2047 continue 2048 xx = x[key] 2049 yy = y[key] 2050 node_key.append(escape.url_escape(key)) 2051 node_x.append(xx) 2052 node_y.append(yy) 2053 node_state.append(task.state) 2054 node_name.append(task.prefix.name) 2055 2056 for a, b in new_edges: 2057 try: 2058 edge_x.append([x[a], x[b]]) 2059 edge_y.append([y[a], y[b]]) 2060 except KeyError: 2061 pass 2062 2063 node = { 2064 "x": node_x, 2065 "y": node_y, 2066 "state": node_state, 2067 "name": node_name, 2068 "key": node_key, 2069 "visible": ["True"] * len(node_x), 2070 } 2071 edge = {"x": edge_x, "y": edge_y, "visible": ["True"] * len(edge_x)} 2072 2073 if update or not len(self.node_source.data["x"]): 2074 # see https://github.com/bokeh/bokeh/issues/7523 2075 self.node_source.data.update(node) 2076 self.edge_source.data.update(edge) 2077 else: 2078 self.node_source.stream(node) 2079 self.edge_source.stream(edge) 2080 2081 @without_property_validation 2082 def patch_updates(self): 2083 """ 2084 Small updates like color changes or lost nodes from task transitions 2085 """ 2086 n = len(self.node_source.data["x"]) 2087 m = len(self.edge_source.data["x"]) 2088 2089 if self.layout.state_updates: 2090 state_updates = self.layout.state_updates 2091 self.layout.state_updates = [] 2092 updates = [(i, c) for i, c in state_updates if i < n] 2093 self.node_source.patch({"state": updates}) 2094 2095 if self.layout.visible_updates: 2096 updates = self.layout.visible_updates 2097 updates = [(i, c) for i, c in updates if i < n] 2098 self.layout.visible_updates = [] 2099 self.node_source.patch({"visible": updates}) 2100 self.invisible_count += len(updates) 2101 2102 if self.layout.visible_edge_updates: 2103 updates = self.layout.visible_edge_updates 2104 updates = [(i, c) for i, c in updates if i < m] 2105 self.layout.visible_edge_updates = [] 2106 self.edge_source.patch({"visible": updates}) 2107 2108 def __del__(self): 2109 self.scheduler.remove_plugin(name=self.layout.name) 2110 2111 2112class TaskGroupGraph(DashboardComponent): 2113 """ 2114 Task Group Graph 2115 2116 Creates a graph layout for TaskGroups on the scheduler. It assigns 2117 (x, y) locations to all the TaskGroups and lays them out by according 2118 to their dependencies. The layout gets updated every time that new 2119 TaskGroups are added. 2120 2121 Each task group node incodes information about task progress, memory, 2122 and output type into glyphs, as well as a hover tooltip with more detailed 2123 information on name, computation time, memory, and tasks status. 2124 """ 2125 2126 def __init__(self, scheduler, **kwargs): 2127 self.scheduler = scheduler 2128 2129 self.nodes_layout = {} 2130 self.arrows_layout = {} 2131 2132 self.old_counter = -1 2133 2134 self.nodes_source = ColumnDataSource( 2135 { 2136 "x": [], 2137 "y": [], 2138 "w_box": [], 2139 "h_box": [], 2140 "name": [], 2141 "tot_tasks": [], 2142 "color": [], 2143 "x_start": [], 2144 "x_end": [], 2145 "y_start": [], 2146 "y_end": [], 2147 "x_end_progress": [], 2148 "mem_alpha": [], 2149 "node_line_width": [], 2150 "comp_tasks": [], 2151 "url_logo": [], 2152 "x_logo": [], 2153 "y_logo": [], 2154 "w_logo": [], 2155 "h_logo": [], 2156 "in_processing": [], 2157 "in_memory": [], 2158 "in_released": [], 2159 "in_erred": [], 2160 "compute_time": [], 2161 "memory": [], 2162 } 2163 ) 2164 2165 self.arrows_source = ColumnDataSource({"xs": [], "ys": [], "xe": [], "ye": []}) 2166 2167 self.root = figure(title="Task Groups Graph", match_aspect=True, **kwargs) 2168 self.root.axis.visible = False 2169 self.subtitle = Title(text=" ", text_font_style="italic") 2170 self.root.add_layout(self.subtitle, "above") 2171 2172 rect = self.root.rect( 2173 x="x", 2174 y="y", 2175 width="w_box", 2176 height="h_box", 2177 color="color", 2178 fill_alpha="mem_alpha", 2179 line_color="black", 2180 line_width="node_line_width", 2181 source=self.nodes_source, 2182 ) 2183 2184 # plot tg log 2185 self.root.image_url( 2186 url="url_logo", 2187 x="x_logo", 2188 y="y_logo", 2189 w="w_logo", 2190 h="h_logo", 2191 anchor="center", 2192 source=self.nodes_source, 2193 ) 2194 2195 # progress bar plain box 2196 self.root.quad( 2197 left="x_start", 2198 right="x_end", 2199 bottom="y_start", 2200 top="y_end", 2201 color=None, 2202 line_color="black", 2203 source=self.nodes_source, 2204 ) 2205 2206 # progress bar 2207 self.root.quad( 2208 left="x_start", 2209 right="x_end_progress", 2210 bottom="y_start", 2211 top="y_end", 2212 color="color", 2213 line_color=None, 2214 fill_alpha=0.6, 2215 source=self.nodes_source, 2216 ) 2217 2218 self.arrows = Arrow( 2219 end=VeeHead(size=8), 2220 line_color="black", 2221 line_alpha=0.5, 2222 line_width=1, 2223 x_start="xs", 2224 y_start="ys", 2225 x_end="xe", 2226 y_end="ye", 2227 source=self.arrows_source, 2228 ) 2229 self.root.add_layout(self.arrows) 2230 2231 self.root.xgrid.grid_line_color = None 2232 self.root.ygrid.grid_line_color = None 2233 self.root.x_range.range_padding = 0.5 2234 self.root.y_range.range_padding = 0.5 2235 2236 hover = HoverTool( 2237 point_policy="follow_mouse", 2238 tooltips=""" 2239 <div> 2240 <span style="font-size: 12px; font-weight: bold;">Name:</span> 2241 <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span> 2242 </div> 2243 <div> 2244 <span style="font-size: 12px; font-weight: bold;">Compute time:</span> 2245 <span style="font-size: 10px; font-family: Monaco, monospace;">@compute_time</span> 2246 </div> 2247 <div> 2248 <span style="font-size: 12px; font-weight: bold;">Memory:</span> 2249 <span style="font-size: 10px; font-family: Monaco, monospace;">@memory</span> 2250 </div> 2251 <div> 2252 <span style="font-size: 12px; font-weight: bold;">Tasks:</span> 2253 <span style="font-size: 10px; font-family: Monaco, monospace;">@tot_tasks</span> 2254 </div> 2255 <div style="margin-left: 2em;"> 2256 <span style="font-size: 12px; font-weight: bold;">Completed:</span> 2257 <span style="font-size: 10px; font-family: Monaco, monospace;">@comp_tasks</span> 2258 </div> 2259 <div style="margin-left: 2em;"> 2260 <span style="font-size: 12px; font-weight: bold;">Processing:</span> 2261 <span style="font-size: 10px; font-family: Monaco, monospace;">@in_processing</span> 2262 </div> 2263 <div style="margin-left: 2em;"> 2264 <span style="font-size: 12px; font-weight: bold;">In memory:</span> 2265 <span style="font-size: 10px; font-family: Monaco, monospace;">@in_memory</span> 2266 </div> 2267 <div style="margin-left: 2em;"> 2268 <span style="font-size: 12px; font-weight: bold;">Erred:</span> 2269 <span style="font-size: 10px; font-family: Monaco, monospace;">@in_erred</span> 2270 </div> 2271 <div style="margin-left: 2em;"> 2272 <span style="font-size: 12px; font-weight: bold;">Released:</span> 2273 <span style="font-size: 10px; font-family: Monaco, monospace;">@in_released</span> 2274 </div> 2275 """, 2276 renderers=[rect], 2277 ) 2278 2279 self.root.add_tools(hover) 2280 2281 @without_property_validation 2282 def update_layout(self): 2283 2284 with log_errors(): 2285 # get dependecies per task group 2286 # in some cases there are tg that have themeselves as dependencies, we remove those. 2287 dependencies = { 2288 k: {ds.name for ds in ts.dependencies if ds.name != k} 2289 for k, ts in self.scheduler.task_groups.items() 2290 } 2291 2292 import dask 2293 2294 order = dask.order.order( 2295 dsk={group.name: 1 for k, group in self.scheduler.task_groups.items()}, 2296 dependencies=dependencies, 2297 ) 2298 2299 ordered = sorted(self.scheduler.task_groups, key=order.get) 2300 2301 xs = {} 2302 ys = {} 2303 locations = set() 2304 nodes_layout = {} 2305 arrows_layout = {} 2306 for tg in ordered: 2307 if dependencies[tg]: 2308 x = max(xs[dep] for dep in dependencies[tg]) + 1 2309 y = max(ys[dep] for dep in dependencies[tg]) 2310 if ( 2311 len(dependencies[tg]) > 1 2312 and len({ys[dep] for dep in dependencies[tg]}) == 1 2313 ): 2314 y += 1 2315 else: 2316 x = 0 2317 y = max(ys.values()) + 1 if ys else 0 2318 2319 while (x, y) in locations: # avoid collisions by moving up 2320 y += 1 2321 2322 locations.add((x, y)) 2323 2324 xs[tg], ys[tg] = x, y 2325 2326 # info neded for node layout to coulmn data source 2327 nodes_layout[tg] = {"x": xs[tg], "y": ys[tg]} 2328 2329 # info needed for arrow layout 2330 arrows_layout[tg] = { 2331 "nstart": dependencies[tg], 2332 "nend": [tg] * len(dependencies[tg]), 2333 } 2334 2335 return nodes_layout, arrows_layout 2336 2337 def compute_size(self, x, min_box, max_box): 2338 start = 0.4 2339 end = 0.8 2340 2341 y = (end - start) / (max_box - min_box) * (x - min_box) + start 2342 2343 return y 2344 2345 @without_property_validation 2346 def update(self): 2347 2348 if self.scheduler.transition_counter == self.old_counter: 2349 return 2350 self.old_counter = self.scheduler.transition_counter 2351 2352 if not self.scheduler.task_groups: 2353 self.subtitle.text = "Scheduler is empty." 2354 else: 2355 self.subtitle.text = " " 2356 2357 if self.nodes_layout.keys() != self.scheduler.task_groups.keys(): 2358 self.nodes_layout, self.arrows_layout = self.update_layout() 2359 2360 nodes_data = { 2361 "x": [], 2362 "y": [], 2363 "w_box": [], 2364 "h_box": [], 2365 "name": [], 2366 "color": [], 2367 "tot_tasks": [], 2368 "x_start": [], 2369 "x_end": [], 2370 "y_start": [], 2371 "y_end": [], 2372 "x_end_progress": [], 2373 "mem_alpha": [], 2374 "node_line_width": [], 2375 "comp_tasks": [], 2376 "url_logo": [], 2377 "x_logo": [], 2378 "y_logo": [], 2379 "w_logo": [], 2380 "h_logo": [], 2381 "in_processing": [], 2382 "in_memory": [], 2383 "in_released": [], 2384 "in_erred": [], 2385 "compute_time": [], 2386 "memory": [], 2387 } 2388 2389 arrows_data = { 2390 "xs": [], 2391 "ys": [], 2392 "xe": [], 2393 "ye": [], 2394 } 2395 2396 durations = set() 2397 nbytes = set() 2398 for key, tg in self.scheduler.task_groups.items(): 2399 2400 if tg.duration and tg.nbytes_total: 2401 durations.add(tg.duration) 2402 nbytes.add(tg.nbytes_total) 2403 2404 durations_min = min(durations, default=0) 2405 durations_max = max(durations, default=0) 2406 nbytes_min = min(nbytes, default=0) 2407 nbytes_max = max(nbytes, default=0) 2408 2409 box_dim = {} 2410 for key, tg in self.scheduler.task_groups.items(): 2411 2412 comp_tasks = ( 2413 tg.states["released"] + tg.states["memory"] + tg.states["erred"] 2414 ) 2415 tot_tasks = sum(tg.states.values()) 2416 2417 # compute width and height of boxes 2418 if ( 2419 tg.duration 2420 and tg.nbytes_total 2421 and comp_tasks 2422 and len(durations) > 1 2423 and len(nbytes) > 1 2424 ): 2425 2426 # scale duration (width) 2427 width_box = self.compute_size( 2428 tg.duration / comp_tasks * tot_tasks, 2429 min_box=durations_min / comp_tasks * tot_tasks, 2430 max_box=durations_max / comp_tasks * tot_tasks, 2431 ) 2432 2433 # need to scale memory (height) 2434 height_box = self.compute_size( 2435 tg.nbytes_total / comp_tasks * tot_tasks, 2436 min_box=nbytes_min / comp_tasks * tot_tasks, 2437 max_box=nbytes_max / comp_tasks * tot_tasks, 2438 ) 2439 2440 else: 2441 width_box = 0.6 2442 height_box = width_box / 2 2443 2444 box_dim[key] = {"width": width_box, "height": height_box} 2445 2446 for key, tg in self.scheduler.task_groups.items(): 2447 x = self.nodes_layout[key]["x"] 2448 y = self.nodes_layout[key]["y"] 2449 width = box_dim[key]["width"] 2450 height = box_dim[key]["height"] 2451 2452 # main boxes layout 2453 nodes_data["x"].append(x) 2454 nodes_data["y"].append(y) 2455 nodes_data["w_box"].append(width) 2456 nodes_data["h_box"].append(height) 2457 2458 comp_tasks = ( 2459 tg.states["released"] + tg.states["memory"] + tg.states["erred"] 2460 ) 2461 tot_tasks = sum(tg.states.values()) 2462 2463 nodes_data["name"].append(tg.prefix.name) 2464 2465 nodes_data["color"].append(color_of(tg.prefix.name)) 2466 nodes_data["tot_tasks"].append(tot_tasks) 2467 2468 # memory alpha factor by 0.4 if not get's too dark 2469 nodes_data["mem_alpha"].append( 2470 (tg.states["memory"] / sum(tg.states.values())) * 0.4 2471 ) 2472 2473 # main box line width 2474 if tg.states["processing"]: 2475 nodes_data["node_line_width"].append(5) 2476 else: 2477 nodes_data["node_line_width"].append(1) 2478 2479 # progress bar data update 2480 nodes_data["x_start"].append(x - width / 2) 2481 nodes_data["x_end"].append(x + width / 2) 2482 2483 nodes_data["y_start"].append(y - height / 2) 2484 nodes_data["y_end"].append(y - height / 2 + height * 0.4) 2485 2486 nodes_data["x_end_progress"].append( 2487 x - width / 2 + width * comp_tasks / tot_tasks 2488 ) 2489 2490 # arrows 2491 arrows_data["xs"] += [ 2492 self.nodes_layout[k]["x"] + box_dim[k]["width"] / 2 2493 for k in self.arrows_layout[key]["nstart"] 2494 ] 2495 arrows_data["ys"] += [ 2496 self.nodes_layout[k]["y"] for k in self.arrows_layout[key]["nstart"] 2497 ] 2498 arrows_data["xe"] += [ 2499 self.nodes_layout[k]["x"] - box_dim[k]["width"] / 2 2500 for k in self.arrows_layout[key]["nend"] 2501 ] 2502 arrows_data["ye"] += [ 2503 self.nodes_layout[k]["y"] for k in self.arrows_layout[key]["nend"] 2504 ] 2505 2506 # LOGOS 2507 if len(tg.types) == 1: 2508 logo_type = next(iter(tg.types)).split(".")[0] 2509 try: 2510 url_logo = logos_dict[logo_type] 2511 except KeyError: 2512 url_logo = "" 2513 else: 2514 url_logo = "" 2515 2516 nodes_data["url_logo"].append(url_logo) 2517 2518 nodes_data["x_logo"].append(x + width / 3) 2519 nodes_data["y_logo"].append(y + height / 3) 2520 2521 ratio = width / height 2522 2523 if ratio > 1: 2524 nodes_data["h_logo"].append(height * 0.3) 2525 nodes_data["w_logo"].append(width * 0.3 / ratio) 2526 else: 2527 nodes_data["h_logo"].append(height * 0.3 * ratio) 2528 nodes_data["w_logo"].append(width * 0.3) 2529 2530 # compute_time and memory 2531 nodes_data["compute_time"].append(format_time(tg.duration)) 2532 nodes_data["memory"].append(format_bytes(tg.nbytes_total)) 2533 2534 # Add some status to hover 2535 tasks_processing = tg.states["processing"] 2536 tasks_memory = tg.states["memory"] 2537 tasks_relased = tg.states["released"] 2538 tasks_erred = tg.states["erred"] 2539 2540 nodes_data["comp_tasks"].append( 2541 f"{comp_tasks} ({comp_tasks / tot_tasks * 100:.0f} %)" 2542 ) 2543 nodes_data["in_processing"].append( 2544 f"{tasks_processing} ({tasks_processing/ tot_tasks * 100:.0f} %)" 2545 ) 2546 nodes_data["in_memory"].append( 2547 f"{tasks_memory} ({tasks_memory/ tot_tasks * 100:.0f} %)" 2548 ) 2549 nodes_data["in_released"].append( 2550 f"{tasks_relased} ({tasks_relased/ tot_tasks * 100:.0f} %)" 2551 ) 2552 nodes_data["in_erred"].append( 2553 f"{ tasks_erred} ({tasks_erred/ tot_tasks * 100:.0f} %)" 2554 ) 2555 2556 self.nodes_source.data.update(nodes_data) 2557 self.arrows_source.data.update(arrows_data) 2558 2559 2560class TaskProgress(DashboardComponent): 2561 """Progress bars per task type""" 2562 2563 def __init__(self, scheduler, **kwargs): 2564 self.scheduler = scheduler 2565 2566 data = progress_quads( 2567 dict(all={}, memory={}, erred={}, released={}, processing={}) 2568 ) 2569 self.source = ColumnDataSource(data=data) 2570 2571 x_range = DataRange1d(range_padding=0) 2572 y_range = Range1d(-8, 0) 2573 2574 self.root = figure( 2575 id="bk-task-progress-plot", 2576 title="Progress", 2577 name="task_progress", 2578 x_range=x_range, 2579 y_range=y_range, 2580 toolbar_location=None, 2581 tools="", 2582 min_border_bottom=50, 2583 **kwargs, 2584 ) 2585 self.root.line( # just to define early ranges 2586 x=[0, 0.9], y=[-1, 0], line_color="#FFFFFF", alpha=0.0 2587 ) 2588 self.root.quad( 2589 source=self.source, 2590 top="top", 2591 bottom="bottom", 2592 left="left", 2593 right="right", 2594 fill_color="#aaaaaa", 2595 line_color="#aaaaaa", 2596 fill_alpha=0.1, 2597 line_alpha=0.3, 2598 ) 2599 self.root.quad( 2600 source=self.source, 2601 top="top", 2602 bottom="bottom", 2603 left="left", 2604 right="released-loc", 2605 fill_color="color", 2606 line_color="color", 2607 fill_alpha=0.6, 2608 ) 2609 self.root.quad( 2610 source=self.source, 2611 top="top", 2612 bottom="bottom", 2613 left="released-loc", 2614 right="memory-loc", 2615 fill_color="color", 2616 line_color="color", 2617 fill_alpha=1.0, 2618 ) 2619 self.root.quad( 2620 source=self.source, 2621 top="top", 2622 bottom="bottom", 2623 left="memory-loc", 2624 right="erred-loc", 2625 fill_color="black", 2626 fill_alpha=0.5, 2627 line_alpha=0, 2628 ) 2629 self.root.quad( 2630 source=self.source, 2631 top="top", 2632 bottom="bottom", 2633 left="erred-loc", 2634 right="processing-loc", 2635 fill_color="gray", 2636 fill_alpha=0.35, 2637 line_alpha=0, 2638 ) 2639 self.root.text( 2640 source=self.source, 2641 text="show-name", 2642 y="bottom", 2643 x="left", 2644 x_offset=5, 2645 text_font_size=value("10pt"), 2646 ) 2647 self.root.text( 2648 source=self.source, 2649 text="done", 2650 y="bottom", 2651 x="right", 2652 x_offset=-5, 2653 text_align="right", 2654 text_font_size=value("10pt"), 2655 ) 2656 self.root.ygrid.visible = False 2657 self.root.yaxis.minor_tick_line_alpha = 0 2658 self.root.yaxis.visible = False 2659 self.root.xgrid.visible = False 2660 self.root.xaxis.minor_tick_line_alpha = 0 2661 self.root.xaxis.visible = False 2662 2663 hover = HoverTool( 2664 point_policy="follow_mouse", 2665 tooltips=""" 2666 <div> 2667 <span style="font-size: 14px; font-weight: bold;">Name:</span> 2668 <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span> 2669 </div> 2670 <div> 2671 <span style="font-size: 14px; font-weight: bold;">All:</span> 2672 <span style="font-size: 10px; font-family: Monaco, monospace;">@all</span> 2673 </div> 2674 <div> 2675 <span style="font-size: 14px; font-weight: bold;">Memory:</span> 2676 <span style="font-size: 10px; font-family: Monaco, monospace;">@memory</span> 2677 </div> 2678 <div> 2679 <span style="font-size: 14px; font-weight: bold;">Erred:</span> 2680 <span style="font-size: 10px; font-family: Monaco, monospace;">@erred</span> 2681 </div> 2682 <div> 2683 <span style="font-size: 14px; font-weight: bold;">Ready:</span> 2684 <span style="font-size: 10px; font-family: Monaco, monospace;">@processing</span> 2685 </div> 2686 """, 2687 ) 2688 self.root.add_tools(hover) 2689 2690 @without_property_validation 2691 def update(self): 2692 with log_errors(): 2693 state = { 2694 "memory": {}, 2695 "erred": {}, 2696 "released": {}, 2697 "processing": {}, 2698 "waiting": {}, 2699 } 2700 2701 for tp in self.scheduler.task_prefixes.values(): 2702 active_states = tp.active_states 2703 if any(active_states.get(s) for s in state.keys()): 2704 state["memory"][tp.name] = active_states["memory"] 2705 state["erred"][tp.name] = active_states["erred"] 2706 state["released"][tp.name] = active_states["released"] 2707 state["processing"][tp.name] = active_states["processing"] 2708 state["waiting"][tp.name] = active_states["waiting"] 2709 2710 state["all"] = { 2711 k: sum(v[k] for v in state.values()) for k in state["memory"] 2712 } 2713 2714 if not state["all"] and not len(self.source.data["all"]): 2715 return 2716 2717 d = progress_quads(state) 2718 2719 update(self.source, d) 2720 2721 totals = { 2722 k: sum(state[k].values()) 2723 for k in ["all", "memory", "erred", "released", "waiting"] 2724 } 2725 totals["processing"] = totals["all"] - sum( 2726 v for k, v in totals.items() if k != "all" 2727 ) 2728 2729 self.root.title.text = ( 2730 "Progress -- total: %(all)s, " 2731 "in-memory: %(memory)s, processing: %(processing)s, " 2732 "waiting: %(waiting)s, " 2733 "erred: %(erred)s" % totals 2734 ) 2735 2736 2737class WorkerTable(DashboardComponent): 2738 """Status of the current workers 2739 2740 This is two plots, a text-based table for each host and a thin horizontal 2741 plot laying out hosts by their current memory use. 2742 """ 2743 2744 excluded_names = { 2745 "executing", 2746 "in_flight", 2747 "in_memory", 2748 "ready", 2749 "time", 2750 "spilled_nbytes", 2751 } 2752 2753 def __init__(self, scheduler, width=800, **kwargs): 2754 self.scheduler = scheduler 2755 self.names = [ 2756 "name", 2757 "address", 2758 "nthreads", 2759 "cpu", 2760 "memory", 2761 "memory_limit", 2762 "memory_percent", 2763 "memory_managed", 2764 "memory_unmanaged_old", 2765 "memory_unmanaged_recent", 2766 "memory_spilled", 2767 "num_fds", 2768 "read_bytes", 2769 "write_bytes", 2770 "cpu_fraction", 2771 ] 2772 workers = self.scheduler.workers.values() 2773 self.extra_names = sorted( 2774 { 2775 m 2776 for ws in workers 2777 for m, v in ws.metrics.items() 2778 if m not in self.names and isinstance(v, (str, int, float)) 2779 } 2780 - self.excluded_names 2781 ) 2782 2783 table_names = [ 2784 "name", 2785 "address", 2786 "nthreads", 2787 "cpu", 2788 "memory", 2789 "memory_limit", 2790 "memory_percent", 2791 "memory_managed", 2792 "memory_unmanaged_old", 2793 "memory_unmanaged_recent", 2794 "memory_spilled", 2795 "num_fds", 2796 "read_bytes", 2797 "write_bytes", 2798 ] 2799 column_title_renames = { 2800 "memory_limit": "limit", 2801 "memory_percent": "memory %", 2802 "memory_managed": "managed", 2803 "memory_unmanaged_old": "unmanaged old", 2804 "memory_unmanaged_recent": "unmanaged recent", 2805 "memory_spilled": "spilled", 2806 "num_fds": "# fds", 2807 "read_bytes": "read", 2808 "write_bytes": "write", 2809 } 2810 2811 self.source = ColumnDataSource({k: [] for k in self.names}) 2812 2813 columns = { 2814 name: TableColumn(field=name, title=column_title_renames.get(name, name)) 2815 for name in table_names 2816 } 2817 2818 formatters = { 2819 "cpu": NumberFormatter(format="0 %"), 2820 "memory_percent": NumberFormatter(format="0.0 %"), 2821 "memory": NumberFormatter(format="0.0 b"), 2822 "memory_limit": NumberFormatter(format="0.0 b"), 2823 "memory_managed": NumberFormatter(format="0.0 b"), 2824 "memory_unmanaged_old": NumberFormatter(format="0.0 b"), 2825 "memory_unmanaged_recent": NumberFormatter(format="0.0 b"), 2826 "memory_spilled": NumberFormatter(format="0.0 b"), 2827 "read_bytes": NumberFormatter(format="0 b"), 2828 "write_bytes": NumberFormatter(format="0 b"), 2829 "num_fds": NumberFormatter(format="0"), 2830 "nthreads": NumberFormatter(format="0"), 2831 } 2832 2833 table = DataTable( 2834 source=self.source, 2835 columns=[columns[n] for n in table_names], 2836 reorderable=True, 2837 sortable=True, 2838 width=width, 2839 index_position=None, 2840 ) 2841 2842 for name in table_names: 2843 if name in formatters: 2844 table.columns[table_names.index(name)].formatter = formatters[name] 2845 2846 extra_names = ["name", "address"] + self.extra_names 2847 extra_columns = { 2848 name: TableColumn(field=name, title=column_title_renames.get(name, name)) 2849 for name in extra_names 2850 } 2851 2852 extra_table = DataTable( 2853 source=self.source, 2854 columns=[extra_columns[n] for n in extra_names], 2855 reorderable=True, 2856 sortable=True, 2857 width=width, 2858 index_position=None, 2859 ) 2860 2861 hover = HoverTool( 2862 point_policy="follow_mouse", 2863 tooltips=""" 2864 <div> 2865 <span style="font-size: 10px; font-family: Monaco, monospace;">Worker (@name): </span> 2866 <span style="font-size: 10px; font-family: Monaco, monospace;">@memory_percent{0.0 %}</span> 2867 </div> 2868 """, 2869 ) 2870 2871 mem_plot = figure( 2872 title="Memory Use (%)", 2873 toolbar_location=None, 2874 x_range=(0, 1), 2875 y_range=(-0.1, 0.1), 2876 height=60, 2877 width=width, 2878 tools="", 2879 min_border_right=0, 2880 **kwargs, 2881 ) 2882 mem_plot.circle( 2883 source=self.source, x="memory_percent", y=0, size=10, fill_alpha=0.5 2884 ) 2885 mem_plot.ygrid.visible = False 2886 mem_plot.yaxis.minor_tick_line_alpha = 0 2887 mem_plot.xaxis.visible = False 2888 mem_plot.yaxis.visible = False 2889 mem_plot.add_tools(hover, BoxSelectTool()) 2890 2891 hover = HoverTool( 2892 point_policy="follow_mouse", 2893 tooltips=""" 2894 <div> 2895 <span style="font-size: 10px; font-family: Monaco, monospace;">Worker (@name): </span> 2896 <span style="font-size: 10px; font-family: Monaco, monospace;">@cpu_fraction{0 %}</span> 2897 </div> 2898 """, 2899 ) 2900 2901 cpu_plot = figure( 2902 title="CPU Use (%)", 2903 toolbar_location=None, 2904 x_range=(0, 1), 2905 y_range=(-0.1, 0.1), 2906 height=60, 2907 width=width, 2908 tools="", 2909 min_border_right=0, 2910 **kwargs, 2911 ) 2912 cpu_plot.circle( 2913 source=self.source, x="cpu_fraction", y=0, size=10, fill_alpha=0.5 2914 ) 2915 cpu_plot.ygrid.visible = False 2916 cpu_plot.yaxis.minor_tick_line_alpha = 0 2917 cpu_plot.xaxis.visible = False 2918 cpu_plot.yaxis.visible = False 2919 cpu_plot.add_tools(hover, BoxSelectTool()) 2920 self.cpu_plot = cpu_plot 2921 2922 if "sizing_mode" in kwargs: 2923 sizing_mode = {"sizing_mode": kwargs["sizing_mode"]} 2924 else: 2925 sizing_mode = {} 2926 2927 components = [cpu_plot, mem_plot, table] 2928 if self.extra_names: 2929 components.append(extra_table) 2930 2931 self.root = column(*components, id="bk-worker-table", **sizing_mode) 2932 2933 @without_property_validation 2934 def update(self): 2935 data = {name: [] for name in self.names + self.extra_names} 2936 for i, (addr, ws) in enumerate( 2937 sorted(self.scheduler.workers.items(), key=lambda kv: str(kv[1].name)) 2938 ): 2939 minfo = ws.memory 2940 2941 for name in self.names + self.extra_names: 2942 data[name].append(ws.metrics.get(name, None)) 2943 data["name"][-1] = ws.name if ws.name is not None else i 2944 data["address"][-1] = ws.address 2945 if ws.memory_limit: 2946 data["memory_percent"][-1] = ws.metrics["memory"] / ws.memory_limit 2947 else: 2948 data["memory_percent"][-1] = "" 2949 data["memory_limit"][-1] = ws.memory_limit 2950 data["memory_managed"][-1] = minfo.managed_in_memory 2951 data["memory_unmanaged_old"][-1] = minfo.unmanaged_old 2952 data["memory_unmanaged_recent"][-1] = minfo.unmanaged_recent 2953 data["memory_unmanaged_recent"][-1] = minfo.unmanaged_recent 2954 data["memory_spilled"][-1] = minfo.managed_spilled 2955 data["cpu"][-1] = ws.metrics["cpu"] / 100.0 2956 data["cpu_fraction"][-1] = ws.metrics["cpu"] / 100.0 / ws.nthreads 2957 data["nthreads"][-1] = ws.nthreads 2958 2959 for name in self.names + self.extra_names: 2960 if name == "name": 2961 data[name].insert(0, f"Total ({len(data[name])})") 2962 continue 2963 try: 2964 if len(self.scheduler.workers) == 0: 2965 total_data = None 2966 elif name == "memory_percent": 2967 total_mem = sum( 2968 ws.memory_limit for ws in self.scheduler.workers.values() 2969 ) 2970 total_data = ( 2971 ( 2972 sum( 2973 ws.metrics["memory"] 2974 for ws in self.scheduler.workers.values() 2975 ) 2976 / total_mem 2977 ) 2978 if total_mem 2979 else "" 2980 ) 2981 elif name == "cpu": 2982 total_data = ( 2983 sum(ws.metrics["cpu"] for ws in self.scheduler.workers.values()) 2984 / 100 2985 / len(self.scheduler.workers.values()) 2986 ) 2987 elif name == "cpu_fraction": 2988 total_data = ( 2989 sum(ws.metrics["cpu"] for ws in self.scheduler.workers.values()) 2990 / 100 2991 / sum(ws.nthreads for ws in self.scheduler.workers.values()) 2992 ) 2993 else: 2994 total_data = sum(data[name]) 2995 2996 data[name].insert(0, total_data) 2997 except TypeError: 2998 data[name].insert(0, None) 2999 3000 self.source.data.update(data) 3001 3002 3003class SchedulerLogs: 3004 def __init__(self, scheduler): 3005 logs = Log( 3006 "\n".join(line for level, line in scheduler.get_logs()) 3007 )._repr_html_() 3008 3009 self.root = Div( 3010 text=logs, 3011 style={ 3012 "width": "100%", 3013 "height": "100%", 3014 "max-width": "1920px", 3015 "max-height": "1080px", 3016 "padding": "12px", 3017 "border": "1px solid lightgray", 3018 "box-shadow": "inset 1px 0 8px 0 lightgray", 3019 "overflow": "auto", 3020 }, 3021 ) 3022 3023 3024def systemmonitor_doc(scheduler, extra, doc): 3025 with log_errors(): 3026 sysmon = SystemMonitor(scheduler, sizing_mode="stretch_both") 3027 doc.title = "Dask: Scheduler System Monitor" 3028 add_periodic_callback(doc, sysmon, 500) 3029 3030 doc.add_root(sysmon.root) 3031 doc.template = env.get_template("simple.html") 3032 doc.template_variables.update(extra) 3033 doc.theme = BOKEH_THEME 3034 3035 3036def stealing_doc(scheduler, extra, doc): 3037 with log_errors(): 3038 occupancy = Occupancy(scheduler) 3039 stealing_ts = StealingTimeSeries(scheduler) 3040 stealing_events = StealingEvents(scheduler) 3041 stealing_events.root.x_range = stealing_ts.root.x_range 3042 doc.title = "Dask: Work Stealing" 3043 add_periodic_callback(doc, occupancy, 500) 3044 add_periodic_callback(doc, stealing_ts, 500) 3045 add_periodic_callback(doc, stealing_events, 500) 3046 3047 doc.add_root( 3048 row( 3049 occupancy.root, 3050 column( 3051 stealing_ts.root, 3052 stealing_events.root, 3053 sizing_mode="stretch_both", 3054 ), 3055 ) 3056 ) 3057 3058 doc.template = env.get_template("simple.html") 3059 doc.template_variables.update(extra) 3060 doc.theme = BOKEH_THEME 3061 3062 3063def events_doc(scheduler, extra, doc): 3064 with log_errors(): 3065 events = Events(scheduler, "all", height=250) 3066 events.update() 3067 add_periodic_callback(doc, events, 500) 3068 doc.title = "Dask: Scheduler Events" 3069 doc.add_root(column(events.root, sizing_mode="scale_width")) 3070 doc.template = env.get_template("simple.html") 3071 doc.template_variables.update(extra) 3072 doc.theme = BOKEH_THEME 3073 3074 3075def workers_doc(scheduler, extra, doc): 3076 with log_errors(): 3077 table = WorkerTable(scheduler) 3078 table.update() 3079 add_periodic_callback(doc, table, 500) 3080 doc.title = "Dask: Workers" 3081 doc.add_root(table.root) 3082 doc.template = env.get_template("simple.html") 3083 doc.template_variables.update(extra) 3084 doc.theme = BOKEH_THEME 3085 3086 3087def tasks_doc(scheduler, extra, doc): 3088 with log_errors(): 3089 ts = TaskStream( 3090 scheduler, 3091 n_rectangles=dask.config.get( 3092 "distributed.scheduler.dashboard.tasks.task-stream-length" 3093 ), 3094 clear_interval="60s", 3095 sizing_mode="stretch_both", 3096 ) 3097 ts.update() 3098 add_periodic_callback(doc, ts, 5000) 3099 doc.title = "Dask: Task Stream" 3100 doc.add_root(ts.root) 3101 doc.template = env.get_template("simple.html") 3102 doc.template_variables.update(extra) 3103 doc.theme = BOKEH_THEME 3104 3105 3106def graph_doc(scheduler, extra, doc): 3107 with log_errors(): 3108 graph = TaskGraph(scheduler, sizing_mode="stretch_both") 3109 doc.title = "Dask: Task Graph" 3110 graph.update() 3111 add_periodic_callback(doc, graph, 200) 3112 doc.add_root(graph.root) 3113 3114 doc.template = env.get_template("simple.html") 3115 doc.template_variables.update(extra) 3116 doc.theme = BOKEH_THEME 3117 3118 3119def tg_graph_doc(scheduler, extra, doc): 3120 with log_errors(): 3121 tg_graph = TaskGroupGraph(scheduler, sizing_mode="stretch_both") 3122 doc.title = "Dask: Task Groups Graph" 3123 tg_graph.update() 3124 add_periodic_callback(doc, tg_graph, 200) 3125 doc.add_root(tg_graph.root) 3126 doc.template = env.get_template("simple.html") 3127 doc.template_variables.update(extra) 3128 doc.theme = BOKEH_THEME 3129 3130 3131def status_doc(scheduler, extra, doc): 3132 with log_errors(): 3133 cluster_memory = ClusterMemory(scheduler, sizing_mode="stretch_both") 3134 cluster_memory.update() 3135 add_periodic_callback(doc, cluster_memory, 100) 3136 doc.add_root(cluster_memory.root) 3137 3138 if len(scheduler.workers) <= 100: 3139 workers_memory = WorkersMemory(scheduler, sizing_mode="stretch_both") 3140 processing = CurrentLoad(scheduler, sizing_mode="stretch_both") 3141 3142 processing_root = processing.processing_figure 3143 else: 3144 workers_memory = WorkersMemoryHistogram( 3145 scheduler, sizing_mode="stretch_both" 3146 ) 3147 processing = ProcessingHistogram(scheduler, sizing_mode="stretch_both") 3148 3149 processing_root = processing.root 3150 3151 current_load = CurrentLoad(scheduler, sizing_mode="stretch_both") 3152 occupancy = Occupancy(scheduler, sizing_mode="stretch_both") 3153 3154 cpu_root = current_load.cpu_figure 3155 occupancy_root = occupancy.root 3156 3157 workers_memory.update() 3158 processing.update() 3159 current_load.update() 3160 occupancy.update() 3161 3162 add_periodic_callback(doc, workers_memory, 100) 3163 add_periodic_callback(doc, processing, 100) 3164 add_periodic_callback(doc, current_load, 100) 3165 add_periodic_callback(doc, occupancy, 100) 3166 3167 doc.add_root(workers_memory.root) 3168 3169 tab1 = Panel(child=processing_root, title="Processing") 3170 tab2 = Panel(child=cpu_root, title="CPU") 3171 tab3 = Panel(child=occupancy_root, title="Occupancy") 3172 3173 proc_tabs = Tabs(tabs=[tab1, tab2, tab3], name="processing_tabs") 3174 doc.add_root(proc_tabs) 3175 3176 task_stream = TaskStream( 3177 scheduler, 3178 n_rectangles=dask.config.get( 3179 "distributed.scheduler.dashboard.status.task-stream-length" 3180 ), 3181 clear_interval="5s", 3182 sizing_mode="stretch_both", 3183 ) 3184 task_stream.update() 3185 add_periodic_callback(doc, task_stream, 100) 3186 doc.add_root(task_stream.root) 3187 3188 task_progress = TaskProgress(scheduler, sizing_mode="stretch_both") 3189 task_progress.update() 3190 add_periodic_callback(doc, task_progress, 100) 3191 doc.add_root(task_progress.root) 3192 3193 doc.title = "Dask: Status" 3194 doc.theme = BOKEH_THEME 3195 doc.template = env.get_template("status.html") 3196 doc.template_variables.update(extra) 3197 3198 3199@curry 3200def individual_doc(cls, interval, scheduler, extra, doc, fig_attr="root", **kwargs): 3201 with log_errors(): 3202 fig = cls(scheduler, sizing_mode="stretch_both", **kwargs) 3203 fig.update() 3204 add_periodic_callback(doc, fig, interval) 3205 doc.add_root(getattr(fig, fig_attr)) 3206 doc.theme = BOKEH_THEME 3207 3208 3209def individual_profile_doc(scheduler, extra, doc): 3210 with log_errors(): 3211 prof = ProfileTimePlot(scheduler, sizing_mode="stretch_both", doc=doc) 3212 doc.add_root(prof.root) 3213 prof.trigger_update() 3214 doc.theme = BOKEH_THEME 3215 3216 3217def individual_profile_server_doc(scheduler, extra, doc): 3218 with log_errors(): 3219 prof = ProfileServer(scheduler, sizing_mode="stretch_both", doc=doc) 3220 doc.add_root(prof.root) 3221 prof.trigger_update() 3222 doc.theme = BOKEH_THEME 3223 3224 3225def profile_doc(scheduler, extra, doc): 3226 with log_errors(): 3227 doc.title = "Dask: Profile" 3228 prof = ProfileTimePlot(scheduler, sizing_mode="stretch_both", doc=doc) 3229 doc.add_root(prof.root) 3230 doc.template = env.get_template("simple.html") 3231 doc.template_variables.update(extra) 3232 doc.theme = BOKEH_THEME 3233 3234 prof.trigger_update() 3235 3236 3237def profile_server_doc(scheduler, extra, doc): 3238 with log_errors(): 3239 doc.title = "Dask: Profile of Event Loop" 3240 prof = ProfileServer(scheduler, sizing_mode="stretch_both", doc=doc) 3241 doc.add_root(prof.root) 3242 doc.template = env.get_template("simple.html") 3243 doc.template_variables.update(extra) 3244 doc.theme = BOKEH_THEME 3245 3246 prof.trigger_update() 3247