1"""The IPython Controller Hub with 0MQ 2 3This is the master object that handles connections from engines and clients, 4and monitors traffic through the various queues. 5""" 6 7# Copyright (c) IPython Development Team. 8# Distributed under the terms of the Modified BSD License. 9 10from __future__ import print_function 11 12import json 13import os 14import sys 15import time 16 17from tornado.gen import coroutine, maybe_future 18import zmq 19from zmq.eventloop.zmqstream import ZMQStream 20 21# internal: 22from ipython_genutils.importstring import import_item 23from ..util import extract_dates 24from jupyter_client.localinterfaces import localhost 25from ipython_genutils.py3compat import cast_bytes, unicode_type, iteritems, buffer_to_bytes_py2 26from traitlets import ( 27 HasTraits, Any, Instance, Integer, Unicode, Dict, Set, Tuple, 28 DottedObjectName, observe 29) 30 31from ipyparallel import error, util 32from ipyparallel.factory import RegistrationFactory 33 34from jupyter_client.session import SessionFactory 35 36from .heartmonitor import HeartMonitor 37 38 39def _passer(*args, **kwargs): 40 return 41 42def _printer(*args, **kwargs): 43 print (args) 44 print (kwargs) 45 46def empty_record(): 47 """Return an empty dict with all record keys.""" 48 return { 49 'msg_id' : None, 50 'header' : None, 51 'metadata' : None, 52 'content': None, 53 'buffers': None, 54 'submitted': None, 55 'client_uuid' : None, 56 'engine_uuid' : None, 57 'started': None, 58 'completed': None, 59 'resubmitted': None, 60 'received': None, 61 'result_header' : None, 62 'result_metadata' : None, 63 'result_content' : None, 64 'result_buffers' : None, 65 'queue' : None, 66 'execute_input' : None, 67 'execute_result': None, 68 'error': None, 69 'stdout': '', 70 'stderr': '', 71 } 72 73def init_record(msg): 74 """Initialize a TaskRecord based on a request.""" 75 header = msg['header'] 76 return { 77 'msg_id' : header['msg_id'], 78 'header' : header, 79 'content': msg['content'], 80 'metadata': msg['metadata'], 81 'buffers': msg['buffers'], 82 'submitted': util.ensure_timezone(header['date']), 83 'client_uuid' : None, 84 'engine_uuid' : None, 85 'started': None, 86 'completed': None, 87 'resubmitted': None, 88 'received': None, 89 'result_header' : None, 90 'result_metadata': None, 91 'result_content' : None, 92 'result_buffers' : None, 93 'queue' : None, 94 'execute_input' : None, 95 'execute_result': None, 96 'error': None, 97 'stdout': '', 98 'stderr': '', 99 } 100 101 102class EngineConnector(HasTraits): 103 """A simple object for accessing the various zmq connections of an object. 104 Attributes are: 105 id (int): engine ID 106 uuid (unicode): engine UUID 107 pending: set of msg_ids 108 stallback: tornado timeout for stalled registration 109 """ 110 111 id = Integer(0) 112 uuid = Unicode() 113 pending = Set() 114 stallback = Any() 115 116 117_db_shortcuts = { 118 'sqlitedb' : 'ipyparallel.controller.sqlitedb.SQLiteDB', 119 'mongodb' : 'ipyparallel.controller.mongodb.MongoDB', 120 'dictdb' : 'ipyparallel.controller.dictdb.DictDB', 121 'nodb' : 'ipyparallel.controller.dictdb.NoDB', 122} 123 124class HubFactory(RegistrationFactory): 125 """The Configurable for setting up a Hub.""" 126 127 # port-pairs for monitoredqueues: 128 hb = Tuple(Integer(), Integer(), config=True, 129 help="""PUB/ROUTER Port pair for Engine heartbeats""") 130 def _hb_default(self): 131 return tuple(util.select_random_ports(2)) 132 133 mux = Tuple(Integer(), Integer(), config=True, 134 help="""Client/Engine Port pair for MUX queue""") 135 136 def _mux_default(self): 137 return tuple(util.select_random_ports(2)) 138 139 task = Tuple(Integer(), Integer(), config=True, 140 help="""Client/Engine Port pair for Task queue""") 141 def _task_default(self): 142 return tuple(util.select_random_ports(2)) 143 144 control = Tuple(Integer(), Integer(), config=True, 145 help="""Client/Engine Port pair for Control queue""") 146 147 def _control_default(self): 148 return tuple(util.select_random_ports(2)) 149 150 iopub = Tuple(Integer(), Integer(), config=True, 151 help="""Client/Engine Port pair for IOPub relay""") 152 153 def _iopub_default(self): 154 return tuple(util.select_random_ports(2)) 155 156 # single ports: 157 mon_port = Integer(config=True, 158 help="""Monitor (SUB) port for queue traffic""") 159 160 def _mon_port_default(self): 161 return util.select_random_ports(1)[0] 162 163 notifier_port = Integer(config=True, 164 help="""PUB port for sending engine status notifications""") 165 166 def _notifier_port_default(self): 167 return util.select_random_ports(1)[0] 168 169 engine_ip = Unicode(config=True, 170 help="IP on which to listen for engine connections. [default: loopback]") 171 def _engine_ip_default(self): 172 return localhost() 173 engine_transport = Unicode('tcp', config=True, 174 help="0MQ transport for engine connections. [default: tcp]") 175 176 client_ip = Unicode(config=True, 177 help="IP on which to listen for client connections. [default: loopback]") 178 client_transport = Unicode('tcp', config=True, 179 help="0MQ transport for client connections. [default : tcp]") 180 181 monitor_ip = Unicode(config=True, 182 help="IP on which to listen for monitor messages. [default: loopback]") 183 monitor_transport = Unicode('tcp', config=True, 184 help="0MQ transport for monitor messages. [default : tcp]") 185 186 _client_ip_default = _monitor_ip_default = _engine_ip_default 187 188 189 monitor_url = Unicode('') 190 191 db_class = DottedObjectName('DictDB', 192 config=True, help="""The class to use for the DB backend 193 194 Options include: 195 196 SQLiteDB: SQLite 197 MongoDB : use MongoDB 198 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub) 199 NoDB : disable database altogether (default) 200 201 """) 202 203 registration_timeout = Integer(0, config=True, 204 help="Engine registration timeout in seconds [default: max(30," 205 "10*heartmonitor.period)]" ) 206 207 def _registration_timeout_default(self): 208 if self.heartmonitor is None: 209 # early initialization, this value will be ignored 210 return 0 211 # heartmonitor period is in milliseconds, so 10x in seconds is .01 212 return max(30, int(.01 * self.heartmonitor.period)) 213 214 # not configurable 215 db = Instance('ipyparallel.controller.dictdb.BaseDB', allow_none=True) 216 heartmonitor = Instance('ipyparallel.controller.heartmonitor.HeartMonitor', allow_none=True) 217 218 @observe('ip') 219 def _ip_changed(self, change): 220 new = change['new'] 221 self.engine_ip = new 222 self.client_ip = new 223 self.monitor_ip = new 224 self._update_monitor_url() 225 226 def _update_monitor_url(self): 227 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port) 228 229 @observe('transport') 230 def _transport_changed(self, change): 231 new = change['new'] 232 self.engine_transport = new 233 self.client_transport = new 234 self.monitor_transport = new 235 self._update_monitor_url() 236 237 def __init__(self, **kwargs): 238 super(HubFactory, self).__init__(**kwargs) 239 self._update_monitor_url() 240 241 242 def construct(self): 243 self.init_hub() 244 245 def start(self): 246 self.heartmonitor.start() 247 self.log.info("Heartmonitor started") 248 249 def client_url(self, channel): 250 """return full zmq url for a named client channel""" 251 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel]) 252 253 def engine_url(self, channel): 254 """return full zmq url for a named engine channel""" 255 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel]) 256 257 def init_hub(self): 258 """construct Hub object""" 259 260 ctx = self.context 261 loop = self.loop 262 if 'TaskScheduler.scheme_name' in self.config: 263 scheme = self.config.TaskScheduler.scheme_name 264 else: 265 from .scheduler import TaskScheduler 266 scheme = TaskScheduler.scheme_name.default_value 267 268 # build connection dicts 269 engine = self.engine_info = { 270 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip), 271 'registration' : self.regport, 272 'control' : self.control[1], 273 'mux' : self.mux[1], 274 'hb_ping' : self.hb[0], 275 'hb_pong' : self.hb[1], 276 'task' : self.task[1], 277 'iopub' : self.iopub[1], 278 } 279 280 client = self.client_info = { 281 'interface' : "%s://%s" % (self.client_transport, self.client_ip), 282 'registration' : self.regport, 283 'control' : self.control[0], 284 'mux' : self.mux[0], 285 'task' : self.task[0], 286 'task_scheme' : scheme, 287 'iopub' : self.iopub[0], 288 'notification' : self.notifier_port, 289 } 290 291 self.log.debug("Hub engine addrs: %s", self.engine_info) 292 self.log.debug("Hub client addrs: %s", self.client_info) 293 294 # Registrar socket 295 q = ZMQStream(ctx.socket(zmq.ROUTER), loop) 296 util.set_hwm(q, 0) 297 q.bind(self.client_url('registration')) 298 self.log.info("Hub listening on %s for registration.", self.client_url('registration')) 299 if self.client_ip != self.engine_ip: 300 q.bind(self.engine_url('registration')) 301 self.log.info("Hub listening on %s for registration.", self.engine_url('registration')) 302 303 ### Engine connections ### 304 305 # heartbeat 306 hpub = ctx.socket(zmq.PUB) 307 hpub.bind(self.engine_url('hb_ping')) 308 hrep = ctx.socket(zmq.ROUTER) 309 util.set_hwm(hrep, 0) 310 hrep.bind(self.engine_url('hb_pong')) 311 self.heartmonitor = HeartMonitor(loop=loop, parent=self, log=self.log, 312 pingstream=ZMQStream(hpub,loop), 313 pongstream=ZMQStream(hrep,loop) 314 ) 315 316 ### Client connections ### 317 318 # Notifier socket 319 n = ZMQStream(ctx.socket(zmq.PUB), loop) 320 n.bind(self.client_url('notification')) 321 322 ### build and launch the queues ### 323 324 # monitor socket 325 sub = ctx.socket(zmq.SUB) 326 sub.setsockopt(zmq.SUBSCRIBE, b"") 327 sub.bind(self.monitor_url) 328 sub.bind('inproc://monitor') 329 sub = ZMQStream(sub, loop) 330 331 # connect the db 332 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class) 333 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1])) 334 self.db = import_item(str(db_class))(session=self.session.session, 335 parent=self, log=self.log) 336 time.sleep(.25) 337 338 # resubmit stream 339 r = ZMQStream(ctx.socket(zmq.DEALER), loop) 340 url = util.disambiguate_url(self.client_url('task')) 341 r.connect(url) 342 343 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, 344 query=q, notifier=n, resubmit=r, db=self.db, 345 engine_info=self.engine_info, client_info=self.client_info, 346 log=self.log, registration_timeout=self.registration_timeout, 347 parent=self, 348 ) 349 350 351class Hub(SessionFactory): 352 """The IPython Controller Hub with 0MQ connections 353 354 Parameters 355 ========== 356 loop: zmq IOLoop instance 357 session: Session object 358 <removed> context: zmq context for creating new connections (?) 359 queue: ZMQStream for monitoring the command queue (SUB) 360 query: ZMQStream for engine registration and client queries requests (ROUTER) 361 heartbeat: HeartMonitor object checking the pulse of the engines 362 notifier: ZMQStream for broadcasting engine registration changes (PUB) 363 db: connection to db for out of memory logging of commands 364 NotImplemented 365 engine_info: dict of zmq connection information for engines to connect 366 to the queues. 367 client_info: dict of zmq connection information for engines to connect 368 to the queues. 369 """ 370 371 engine_state_file = Unicode() 372 373 # internal data structures: 374 ids=Set() # engine IDs 375 keytable=Dict() 376 by_ident=Dict() 377 engines=Dict() 378 clients=Dict() 379 hearts=Dict() 380 pending=Set() 381 queues=Dict() # pending msg_ids keyed by engine_id 382 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id 383 completed=Dict() # completed msg_ids keyed by engine_id 384 all_completed=Set() # completed msg_ids keyed by engine_id 385 unassigned=Set() # set of task msg_ds not yet assigned a destination 386 incoming_registrations=Dict() 387 registration_timeout=Integer() 388 _idcounter=Integer(0) 389 distributed_scheduler = Any() 390 391 # objects from constructor: 392 query=Instance(ZMQStream, allow_none=True) 393 monitor=Instance(ZMQStream, allow_none=True) 394 notifier=Instance(ZMQStream, allow_none=True) 395 resubmit=Instance(ZMQStream, allow_none=True) 396 heartmonitor=Instance(HeartMonitor, allow_none=True) 397 db=Instance(object, allow_none=True) 398 client_info=Dict() 399 engine_info=Dict() 400 401 402 def __init__(self, **kwargs): 403 """ 404 # universal: 405 loop: IOLoop for creating future connections 406 session: streamsession for sending serialized data 407 # engine: 408 queue: ZMQStream for monitoring queue messages 409 query: ZMQStream for engine+client registration and client requests 410 heartbeat: HeartMonitor object for tracking engines 411 # extra: 412 db: ZMQStream for db connection (NotImplemented) 413 engine_info: zmq address/protocol dict for engine connections 414 client_info: zmq address/protocol dict for client connections 415 """ 416 417 super(Hub, self).__init__(**kwargs) 418 419 # register our callbacks 420 self.query.on_recv(self.dispatch_query) 421 self.monitor.on_recv(self.dispatch_monitor_traffic) 422 423 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) 424 self.heartmonitor.add_new_heart_handler(self.handle_new_heart) 425 426 self.monitor_handlers = {b'in' : self.save_queue_request, 427 b'out': self.save_queue_result, 428 b'intask': self.save_task_request, 429 b'outtask': self.save_task_result, 430 b'tracktask': self.save_task_destination, 431 b'incontrol': _passer, 432 b'outcontrol': _passer, 433 b'iopub': self.monitor_iopub_message, 434 } 435 436 self.query_handlers = {'queue_request': self.queue_status, 437 'result_request': self.get_results, 438 'history_request': self.get_history, 439 'db_request': self.db_query, 440 'purge_request': self.purge_results, 441 'load_request': self.check_load, 442 'resubmit_request': self.resubmit_task, 443 'shutdown_request': self.shutdown_request, 444 'registration_request' : self.register_engine, 445 'unregistration_request' : self.unregister_engine, 446 'connection_request': self.connection_request, 447 'become_dask_request': self.become_dask, 448 'stop_distributed_request': self.stop_distributed, 449 } 450 451 # ignore resubmit replies 452 self.resubmit.on_recv(lambda msg: None, copy=False) 453 454 self.log.info("hub::created hub") 455 456 def new_engine_id(self, requested_id=None): 457 """generate a new engine integer id. 458 459 No longer reuse old ids, just count from 0. 460 461 If an id is requested and available, use that. 462 Otherwise, use the counter. 463 """ 464 if requested_id is not None: 465 if requested_id in self.engines: 466 self.log.warning("Engine id %s in use by engine with uuid=%s", 467 requested_id, self.engines[requested_id].uuid) 468 elif requested_id in { 469 ec.id for ec in self.incoming_registrations.values() 470 }: 471 self.log.warning("Engine id %s registration already pending", requested_id) 472 else: 473 self._idcounter = max(requested_id + 1, self._idcounter) 474 return requested_id 475 newid = self._idcounter 476 self._idcounter += 1 477 return newid 478 479 #----------------------------------------------------------------------------- 480 # message validation 481 #----------------------------------------------------------------------------- 482 483 def _validate_targets(self, targets): 484 """turn any valid targets argument into a list of integer ids""" 485 if targets is None: 486 # default to all 487 return self.ids 488 489 if isinstance(targets, (int,str,unicode_type)): 490 # only one target specified 491 targets = [targets] 492 _targets = [] 493 for t in targets: 494 # map raw identities to ids 495 if isinstance(t, (str,unicode_type)): 496 t = self.by_ident.get(cast_bytes(t), t) 497 _targets.append(t) 498 targets = _targets 499 bad_targets = [ t for t in targets if t not in self.ids ] 500 if bad_targets: 501 raise IndexError("No Such Engine: %r" % bad_targets) 502 if not targets: 503 raise IndexError("No Engines Registered") 504 return targets 505 506 #----------------------------------------------------------------------------- 507 # dispatch methods (1 per stream) 508 #----------------------------------------------------------------------------- 509 510 511 @util.log_errors 512 def dispatch_monitor_traffic(self, msg): 513 """all ME and Task queue messages come through here, as well as 514 IOPub traffic.""" 515 self.log.debug("monitor traffic: %r", msg[0]) 516 switch = msg[0] 517 try: 518 idents, msg = self.session.feed_identities(msg[1:]) 519 except ValueError: 520 idents=[] 521 if not idents: 522 self.log.error("Monitor message without topic: %r", msg) 523 return 524 handler = self.monitor_handlers.get(switch, None) 525 if handler is not None: 526 handler(idents, msg) 527 else: 528 self.log.error("Unrecognized monitor topic: %r", switch) 529 530 @util.log_errors 531 @coroutine 532 def dispatch_query(self, msg): 533 """Route registration requests and queries from clients.""" 534 try: 535 idents, msg = self.session.feed_identities(msg) 536 except ValueError: 537 idents = [] 538 if not idents: 539 self.log.error("Bad Query Message: %r", msg) 540 return 541 client_id = idents[0] 542 try: 543 msg = self.session.deserialize(msg, content=True) 544 except Exception: 545 content = error.wrap_exception() 546 self.log.error("Bad Query Message: %r", msg, exc_info=True) 547 self.session.send(self.query, "hub_error", ident=client_id, 548 content=content, parent=msg) 549 return 550 # print client_id, header, parent, content 551 #switch on message type: 552 msg_type = msg['header']['msg_type'] 553 self.log.info("client::client %r requested %r", client_id, msg_type) 554 handler = self.query_handlers.get(msg_type, None) 555 try: 556 if handler is None: 557 raise KeyError("Bad Message Type: %r" % msg_type) 558 except: 559 content = error.wrap_exception() 560 self.log.error("Bad Message Type: %r", msg_type, exc_info=True) 561 self.session.send(self.query, "hub_error", ident=client_id, 562 content=content, parent=msg) 563 return 564 565 try: 566 f = handler(idents, msg) 567 if f: 568 yield maybe_future(f) 569 except Exception: 570 content = error.wrap_exception() 571 self.log.error("Error handling request: %r", msg_type, exc_info=True) 572 self.session.send(self.query, "hub_error", ident=client_id, 573 content=content, parent=msg) 574 575 #--------------------------------------------------------------------------- 576 # handler methods (1 per event) 577 #--------------------------------------------------------------------------- 578 579 #----------------------- Heartbeat -------------------------------------- 580 581 def handle_new_heart(self, heart): 582 """handler to attach to heartbeater. 583 Called when a new heart starts to beat. 584 Triggers completion of registration.""" 585 self.log.debug("heartbeat::handle_new_heart(%r)", heart) 586 if heart not in self.incoming_registrations: 587 self.log.info("heartbeat::ignoring new heart: %r", heart) 588 else: 589 self.finish_registration(heart) 590 591 592 def handle_heart_failure(self, heart): 593 """handler to attach to heartbeater. 594 called when a previously registered heart fails to respond to beat request. 595 triggers unregistration""" 596 self.log.debug("heartbeat::handle_heart_failure(%r)", heart) 597 eid = self.hearts.get(heart, None) 598 if eid is None: 599 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart) 600 else: 601 uuid = self.engines[eid].uuid 602 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid))) 603 604 #----------------------- MUX Queue Traffic ------------------------------ 605 606 def save_queue_request(self, idents, msg): 607 if len(idents) < 2: 608 self.log.error("invalid identity prefix: %r", idents) 609 return 610 queue_id, client_id = idents[:2] 611 try: 612 msg = self.session.deserialize(msg) 613 except Exception: 614 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True) 615 return 616 617 eid = self.by_ident.get(queue_id, None) 618 if eid is None: 619 self.log.error("queue::target %r not registered", queue_id) 620 self.log.debug("queue:: valid are: %r", self.by_ident.keys()) 621 return 622 record = init_record(msg) 623 msg_id = record['msg_id'] 624 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid) 625 # Unicode in records 626 record['engine_uuid'] = queue_id.decode('ascii') 627 record['client_uuid'] = msg['header']['session'] 628 record['queue'] = 'mux' 629 630 try: 631 # it's posible iopub arrived first: 632 existing = self.db.get_record(msg_id) 633 for key,evalue in iteritems(existing): 634 rvalue = record.get(key, None) 635 if evalue and rvalue and evalue != rvalue: 636 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue) 637 elif evalue and not rvalue: 638 record[key] = evalue 639 try: 640 self.db.update_record(msg_id, record) 641 except Exception: 642 self.log.error("DB Error updating record %r", msg_id, exc_info=True) 643 except KeyError: 644 try: 645 self.db.add_record(msg_id, record) 646 except Exception: 647 self.log.error("DB Error adding record %r", msg_id, exc_info=True) 648 649 650 self.pending.add(msg_id) 651 self.queues[eid].append(msg_id) 652 653 def save_queue_result(self, idents, msg): 654 if len(idents) < 2: 655 self.log.error("invalid identity prefix: %r", idents) 656 return 657 658 client_id, queue_id = idents[:2] 659 try: 660 msg = self.session.deserialize(msg) 661 except Exception: 662 self.log.error("queue::engine %r sent invalid message to %r: %r", 663 queue_id, client_id, msg, exc_info=True) 664 return 665 666 eid = self.by_ident.get(queue_id, None) 667 if eid is None: 668 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id) 669 return 670 671 parent = msg['parent_header'] 672 if not parent: 673 return 674 msg_id = parent['msg_id'] 675 if msg_id in self.pending: 676 self.pending.remove(msg_id) 677 self.all_completed.add(msg_id) 678 self.queues[eid].remove(msg_id) 679 self.completed[eid].append(msg_id) 680 self.log.info("queue::request %r completed on %s", msg_id, eid) 681 elif msg_id not in self.all_completed: 682 # it could be a result from a dead engine that died before delivering the 683 # result 684 self.log.warn("queue:: unknown msg finished %r", msg_id) 685 return 686 # update record anyway, because the unregistration could have been premature 687 rheader = msg['header'] 688 md = msg['metadata'] 689 completed = util.ensure_timezone(rheader['date']) 690 started = extract_dates(md.get('started', None)) 691 result = { 692 'result_header' : rheader, 693 'result_metadata': md, 694 'result_content': msg['content'], 695 'received': util.utcnow(), 696 'started' : started, 697 'completed' : completed 698 } 699 700 result['result_buffers'] = msg['buffers'] 701 try: 702 self.db.update_record(msg_id, result) 703 except Exception: 704 self.log.error("DB Error updating record %r", msg_id, exc_info=True) 705 706 707 #--------------------- Task Queue Traffic ------------------------------ 708 709 def save_task_request(self, idents, msg): 710 """Save the submission of a task.""" 711 client_id = idents[0] 712 713 try: 714 msg = self.session.deserialize(msg) 715 except Exception: 716 self.log.error("task::client %r sent invalid task message: %r", 717 client_id, msg, exc_info=True) 718 return 719 record = init_record(msg) 720 721 record['client_uuid'] = msg['header']['session'] 722 record['queue'] = 'task' 723 header = msg['header'] 724 msg_id = header['msg_id'] 725 self.pending.add(msg_id) 726 self.unassigned.add(msg_id) 727 try: 728 # it's posible iopub arrived first: 729 existing = self.db.get_record(msg_id) 730 if existing['resubmitted']: 731 for key in ('submitted', 'client_uuid', 'buffers'): 732 # don't clobber these keys on resubmit 733 # submitted and client_uuid should be different 734 # and buffers might be big, and shouldn't have changed 735 record.pop(key) 736 # still check content,header which should not change 737 # but are not expensive to compare as buffers 738 739 for key,evalue in iteritems(existing): 740 if key.endswith('buffers'): 741 # don't compare buffers 742 continue 743 rvalue = record.get(key, None) 744 if evalue and rvalue and evalue != rvalue: 745 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue) 746 elif evalue and not rvalue: 747 record[key] = evalue 748 try: 749 self.db.update_record(msg_id, record) 750 except Exception: 751 self.log.error("DB Error updating record %r", msg_id, exc_info=True) 752 except KeyError: 753 try: 754 self.db.add_record(msg_id, record) 755 except Exception: 756 self.log.error("DB Error adding record %r", msg_id, exc_info=True) 757 except Exception: 758 self.log.error("DB Error saving task request %r", msg_id, exc_info=True) 759 760 def save_task_result(self, idents, msg): 761 """save the result of a completed task.""" 762 client_id = idents[0] 763 try: 764 msg = self.session.deserialize(msg) 765 except Exception: 766 self.log.error("task::invalid task result message send to %r: %r", 767 client_id, msg, exc_info=True) 768 return 769 770 parent = msg['parent_header'] 771 if not parent: 772 # print msg 773 self.log.warn("Task %r had no parent!", msg) 774 return 775 msg_id = parent['msg_id'] 776 if msg_id in self.unassigned: 777 self.unassigned.remove(msg_id) 778 779 header = msg['header'] 780 md = msg['metadata'] 781 engine_uuid = md.get('engine', u'') 782 eid = self.by_ident.get(cast_bytes(engine_uuid), None) 783 784 status = md.get('status', None) 785 786 if msg_id in self.pending: 787 self.log.info("task::task %r finished on %s", msg_id, eid) 788 self.pending.remove(msg_id) 789 self.all_completed.add(msg_id) 790 if eid is not None: 791 if status != 'aborted': 792 self.completed[eid].append(msg_id) 793 if msg_id in self.tasks[eid]: 794 self.tasks[eid].remove(msg_id) 795 completed = util.ensure_timezone(header['date']) 796 started = extract_dates(md.get('started', None)) 797 result = { 798 'result_header' : header, 799 'result_metadata': msg['metadata'], 800 'result_content': msg['content'], 801 'started' : started, 802 'completed' : completed, 803 'received' : util.utcnow(), 804 'engine_uuid': engine_uuid, 805 } 806 807 result['result_buffers'] = msg['buffers'] 808 try: 809 self.db.update_record(msg_id, result) 810 except Exception: 811 self.log.error("DB Error saving task request %r", msg_id, exc_info=True) 812 813 else: 814 self.log.debug("task::unknown task %r finished", msg_id) 815 816 def save_task_destination(self, idents, msg): 817 try: 818 msg = self.session.deserialize(msg, content=True) 819 except Exception: 820 self.log.error("task::invalid task tracking message", exc_info=True) 821 return 822 content = msg['content'] 823 # print (content) 824 msg_id = content['msg_id'] 825 engine_uuid = content['engine_id'] 826 eid = self.by_ident[cast_bytes(engine_uuid)] 827 828 self.log.info("task::task %r arrived on %r", msg_id, eid) 829 if msg_id in self.unassigned: 830 self.unassigned.remove(msg_id) 831 # else: 832 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id)) 833 834 self.tasks[eid].append(msg_id) 835 try: 836 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid)) 837 except Exception: 838 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True) 839 840 841 #--------------------- IOPub Traffic ------------------------------ 842 843 def monitor_iopub_message(self, topics, msg): 844 '''intercept iopub traffic so events can be acted upon''' 845 try: 846 msg = self.session.deserialize(msg, content=True) 847 except Exception: 848 self.log.error("iopub::invalid IOPub message", exc_info=True) 849 return 850 851 msg_type = msg['header']['msg_type'] 852 if msg_type == 'shutdown_reply': 853 session = msg['header']['session'] 854 eid = self.by_ident.get(session, None) 855 uuid = self.engines[eid].uuid 856 self.unregister_engine(ident='shutdown_reply', 857 msg=dict(content=dict(id=eid, queue=uuid))) 858 859 if msg_type not in ('status', 'shutdown_reply', ): 860 self.save_iopub_message(topics, msg) 861 862 def save_iopub_message(self, topics, msg): 863 """save an iopub message into the db""" 864 parent = msg['parent_header'] 865 if not parent: 866 self.log.debug("iopub::IOPub message lacks parent: %r", msg) 867 return 868 msg_id = parent['msg_id'] 869 msg_type = msg['header']['msg_type'] 870 content = msg['content'] 871 872 # ensure msg_id is in db 873 try: 874 rec = self.db.get_record(msg_id) 875 except KeyError: 876 rec = None 877 878 # stream 879 d = {} 880 if msg_type == 'stream': 881 name = content['name'] 882 s = '' if rec is None else rec[name] 883 d[name] = s + content['text'] 884 elif msg_type == 'error': 885 d['error'] = content 886 elif msg_type == 'execute_input': 887 d['execute_input'] = content['code'] 888 elif msg_type in ('display_data', 'execute_result'): 889 d[msg_type] = content 890 elif msg_type == 'data_pub': 891 self.log.info("ignored data_pub message for %s" % msg_id) 892 else: 893 self.log.warn("unhandled iopub msg_type: %r", msg_type) 894 895 if not d: 896 return 897 898 if rec is None: 899 # new record 900 rec = empty_record() 901 rec['msg_id'] = msg_id 902 rec.update(d) 903 d = rec 904 update_record = self.db.add_record 905 else: 906 update_record = self.db.update_record 907 908 try: 909 update_record(msg_id, d) 910 except Exception: 911 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True) 912 913 914 915 #------------------------------------------------------------------------- 916 # Registration requests 917 #------------------------------------------------------------------------- 918 919 def connection_request(self, client_id, msg): 920 """Reply with connection addresses for clients.""" 921 self.log.info("client::client %r connected", client_id) 922 content = dict(status='ok') 923 jsonable = {} 924 for k,v in iteritems(self.keytable): 925 jsonable[str(k)] = v 926 content['engines'] = jsonable 927 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) 928 929 def register_engine(self, reg, msg): 930 """Register a new engine.""" 931 content = msg['content'] 932 try: 933 uuid = content['uuid'] 934 except KeyError: 935 self.log.error("registration::queue not specified", exc_info=True) 936 return 937 938 eid = self.new_engine_id(content.get('id')) 939 940 self.log.debug("registration::register_engine(%i, %r)", eid, uuid) 941 942 content = dict(id=eid,status='ok',hb_period=self.heartmonitor.period) 943 # check if requesting available IDs: 944 if cast_bytes(uuid) in self.by_ident: 945 try: 946 raise KeyError("uuid %r in use" % uuid) 947 except: 948 content = error.wrap_exception() 949 self.log.error("uuid %r in use", uuid, exc_info=True) 950 else: 951 for h, ec in iteritems(self.incoming_registrations): 952 if uuid == h: 953 try: 954 raise KeyError("heart_id %r in use" % uuid) 955 except: 956 self.log.error("heart_id %r in use", uuid, exc_info=True) 957 content = error.wrap_exception() 958 break 959 elif uuid == ec.uuid: 960 try: 961 raise KeyError("uuid %r in use" % uuid) 962 except: 963 self.log.error("uuid %r in use", uuid, exc_info=True) 964 content = error.wrap_exception() 965 break 966 967 msg = self.session.send(self.query, "registration_reply", 968 content=content, 969 ident=reg) 970 971 heart = cast_bytes(uuid) 972 973 if content['status'] == 'ok': 974 if heart in self.heartmonitor.hearts: 975 # already beating 976 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid) 977 self.finish_registration(heart) 978 else: 979 purge = lambda : self._purge_stalled_registration(heart) 980 t = self.loop.add_timeout( 981 self.loop.time() + self.registration_timeout, 982 purge, 983 ) 984 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=t) 985 else: 986 self.log.error("registration::registration %i failed: %r", eid, content['evalue']) 987 988 return eid 989 990 def unregister_engine(self, ident, msg): 991 """Unregister an engine that explicitly requested to leave.""" 992 try: 993 eid = msg['content']['id'] 994 except: 995 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True) 996 return 997 self.log.info("registration::unregister_engine(%r)", eid) 998 999 uuid = self.keytable[eid] 1000 content=dict(id=eid, uuid=uuid) 1001 1002 #stop the heartbeats 1003 self.hearts.pop(uuid, None) 1004 self.heartmonitor.responses.discard(uuid) 1005 self.heartmonitor.hearts.discard(uuid) 1006 1007 self.loop.add_timeout( 1008 self.loop.time() + self.registration_timeout, 1009 lambda : self._handle_stranded_msgs(eid, uuid), 1010 ) 1011 1012 # cleanup mappings 1013 self.by_ident.pop(uuid, None) 1014 self.engines.pop(eid, None) 1015 self.keytable.pop(eid, None) 1016 1017 self._save_engine_state() 1018 1019 if self.notifier: 1020 self.session.send(self.notifier, "unregistration_notification", content=content) 1021 1022 def _handle_stranded_msgs(self, eid, uuid): 1023 """Handle messages known to be on an engine when the engine unregisters. 1024 1025 It is possible that this will fire prematurely - that is, an engine will 1026 go down after completing a result, and the client will be notified 1027 that the result failed and later receive the actual result. 1028 """ 1029 1030 outstanding = self.queues[eid] 1031 1032 for msg_id in outstanding: 1033 self.pending.remove(msg_id) 1034 self.all_completed.add(msg_id) 1035 try: 1036 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id)) 1037 except: 1038 content = error.wrap_exception() 1039 # build a fake header: 1040 header = {} 1041 header['engine'] = uuid 1042 header['date'] = util.utcnow() 1043 rec = dict(result_content=content, result_header=header, result_buffers=[]) 1044 rec['completed'] = util.ensure_timezone(header['date']) 1045 rec['engine_uuid'] = uuid 1046 try: 1047 self.db.update_record(msg_id, rec) 1048 except Exception: 1049 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True) 1050 1051 1052 def finish_registration(self, heart): 1053 """Second half of engine registration, called after our HeartMonitor 1054 has received a beat from the Engine's Heart.""" 1055 try: 1056 ec = self.incoming_registrations.pop(heart) 1057 except KeyError: 1058 self.log.error("registration::tried to finish nonexistant registration", exc_info=True) 1059 return 1060 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) 1061 if ec.stallback is not None: 1062 self.loop.remove_timeout(ec.stallback) 1063 eid = ec.id 1064 self.ids.add(eid) 1065 self.keytable[eid] = ec.uuid 1066 self.engines[eid] = ec 1067 self.by_ident[cast_bytes(ec.uuid)] = ec.id 1068 self.queues[eid] = list() 1069 self.tasks[eid] = list() 1070 self.completed[eid] = list() 1071 self.hearts[heart] = eid 1072 content = dict(id=eid, uuid=self.engines[eid].uuid) 1073 if self.notifier: 1074 self.session.send(self.notifier, "registration_notification", content=content) 1075 self.log.info("engine::Engine Connected: %i", eid) 1076 1077 self._save_engine_state() 1078 1079 def _purge_stalled_registration(self, heart): 1080 if heart in self.incoming_registrations: 1081 ec = self.incoming_registrations.pop(heart) 1082 self.log.info("registration::purging stalled registration: %i", ec.id) 1083 else: 1084 pass 1085 1086 #------------------------------------------------------------------------- 1087 # Engine State 1088 #------------------------------------------------------------------------- 1089 1090 1091 def _cleanup_engine_state_file(self): 1092 """cleanup engine state mapping""" 1093 1094 if os.path.exists(self.engine_state_file): 1095 self.log.debug("cleaning up engine state: %s", self.engine_state_file) 1096 try: 1097 os.remove(self.engine_state_file) 1098 except IOError: 1099 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True) 1100 1101 1102 def _save_engine_state(self): 1103 """save engine mapping to JSON file""" 1104 if not self.engine_state_file: 1105 return 1106 self.log.debug("save engine state to %s" % self.engine_state_file) 1107 state = {} 1108 engines = {} 1109 for eid, ec in self.engines.items(): 1110 engines[eid] = ec.uuid 1111 1112 state['engines'] = engines 1113 1114 state['next_id'] = self._idcounter 1115 1116 with open(self.engine_state_file, 'w') as f: 1117 json.dump(state, f) 1118 1119 1120 def _load_engine_state(self): 1121 """load engine mapping from JSON file""" 1122 if not os.path.exists(self.engine_state_file): 1123 return 1124 1125 self.log.info("loading engine state from %s" % self.engine_state_file) 1126 1127 with open(self.engine_state_file) as f: 1128 state = json.load(f) 1129 1130 save_notifier = self.notifier 1131 self.notifier = None 1132 for eid, uuid in iteritems(state['engines']): 1133 heart = uuid.encode('ascii') 1134 # start with this heart as current and beating: 1135 self.heartmonitor.responses.add(heart) 1136 self.heartmonitor.hearts.add(heart) 1137 1138 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid) 1139 self.finish_registration(heart) 1140 1141 self.notifier = save_notifier 1142 1143 self._idcounter = state['next_id'] 1144 1145 #------------------------------------------------------------------------- 1146 # Client Requests 1147 #------------------------------------------------------------------------- 1148 1149 def shutdown_request(self, client_id, msg): 1150 """handle shutdown request.""" 1151 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id, parent=msg) 1152 # also notify other clients of shutdown 1153 self.session.send(self.notifier, 'shutdown_notification', content={'status': 'ok'}, parent=msg) 1154 self.loop.add_timeout(self.loop.time() + 1, self._shutdown) 1155 1156 def _shutdown(self): 1157 self.log.info("hub::hub shutting down.") 1158 time.sleep(0.1) 1159 sys.exit(0) 1160 1161 1162 def check_load(self, client_id, msg): 1163 content = msg['content'] 1164 try: 1165 targets = content['targets'] 1166 targets = self._validate_targets(targets) 1167 except: 1168 content = error.wrap_exception() 1169 self.session.send(self.query, "hub_error", 1170 content=content, ident=client_id, parent=msg) 1171 return 1172 1173 content = dict(status='ok') 1174 # loads = {} 1175 for t in targets: 1176 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t]) 1177 self.session.send(self.query, "load_reply", content=content, ident=client_id, parent=msg) 1178 1179 1180 def queue_status(self, client_id, msg): 1181 """Return the Queue status of one or more targets. 1182 1183 If verbose, return the msg_ids, else return len of each type. 1184 1185 Keys: 1186 1187 * queue (pending MUX jobs) 1188 * tasks (pending Task jobs) 1189 * completed (finished jobs from both queues) 1190 """ 1191 content = msg['content'] 1192 targets = content['targets'] 1193 try: 1194 targets = self._validate_targets(targets) 1195 except: 1196 content = error.wrap_exception() 1197 self.session.send(self.query, "hub_error", 1198 content=content, ident=client_id, parent=msg) 1199 return 1200 verbose = content.get('verbose', False) 1201 content = dict(status='ok') 1202 for t in targets: 1203 queue = self.queues[t] 1204 completed = self.completed[t] 1205 tasks = self.tasks[t] 1206 if not verbose: 1207 queue = len(queue) 1208 completed = len(completed) 1209 tasks = len(tasks) 1210 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} 1211 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned) 1212 # print (content) 1213 self.session.send(self.query, "queue_reply", content=content, ident=client_id, parent=msg) 1214 1215 def purge_results(self, client_id, msg): 1216 """Purge results from memory. This method is more valuable before we move 1217 to a DB based message storage mechanism.""" 1218 content = msg['content'] 1219 self.log.info("Dropping records with %s", content) 1220 msg_ids = content.get('msg_ids', []) 1221 reply = dict(status='ok') 1222 if msg_ids == 'all': 1223 try: 1224 self.db.drop_matching_records(dict(completed={'$ne':None})) 1225 except Exception: 1226 reply = error.wrap_exception() 1227 self.log.exception("Error dropping records") 1228 else: 1229 pending = [m for m in msg_ids if (m in self.pending)] 1230 if pending: 1231 try: 1232 raise IndexError("msg pending: %r" % pending[0]) 1233 except: 1234 reply = error.wrap_exception() 1235 self.log.exception("Error dropping records") 1236 else: 1237 try: 1238 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids})) 1239 except Exception: 1240 reply = error.wrap_exception() 1241 self.log.exception("Error dropping records") 1242 1243 if reply['status'] == 'ok': 1244 eids = content.get('engine_ids', []) 1245 for eid in eids: 1246 if eid not in self.engines: 1247 try: 1248 raise IndexError("No such engine: %i" % eid) 1249 except: 1250 reply = error.wrap_exception() 1251 self.log.exception("Error dropping records") 1252 break 1253 uid = self.engines[eid].uuid 1254 try: 1255 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) 1256 except Exception: 1257 reply = error.wrap_exception() 1258 self.log.exception("Error dropping records") 1259 break 1260 1261 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id, parent=msg) 1262 1263 def resubmit_task(self, client_id, msg): 1264 """Resubmit one or more tasks.""" 1265 parent = msg 1266 def finish(reply): 1267 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id, parent=parent) 1268 1269 content = msg['content'] 1270 msg_ids = content['msg_ids'] 1271 reply = dict(status='ok') 1272 try: 1273 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[ 1274 'header', 'content', 'buffers']) 1275 except Exception: 1276 self.log.error('db::db error finding tasks to resubmit', exc_info=True) 1277 return finish(error.wrap_exception()) 1278 1279 # validate msg_ids 1280 found_ids = [ rec['msg_id'] for rec in records ] 1281 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ] 1282 if len(records) > len(msg_ids): 1283 try: 1284 raise RuntimeError("DB appears to be in an inconsistent state." 1285 "More matching records were found than should exist") 1286 except Exception: 1287 self.log.exception("Failed to resubmit task") 1288 return finish(error.wrap_exception()) 1289 elif len(records) < len(msg_ids): 1290 missing = [ m for m in msg_ids if m not in found_ids ] 1291 try: 1292 raise KeyError("No such msg(s): %r" % missing) 1293 except KeyError: 1294 self.log.exception("Failed to resubmit task") 1295 return finish(error.wrap_exception()) 1296 elif pending_ids: 1297 pass 1298 # no need to raise on resubmit of pending task, now that we 1299 # resubmit under new ID, but do we want to raise anyway? 1300 # msg_id = invalid_ids[0] 1301 # try: 1302 # raise ValueError("Task(s) %r appears to be inflight" % ) 1303 # except Exception: 1304 # return finish(error.wrap_exception()) 1305 1306 # mapping of original IDs to resubmitted IDs 1307 resubmitted = {} 1308 1309 # send the messages 1310 for rec in records: 1311 header = rec['header'] 1312 msg = self.session.msg(header['msg_type'], parent=header) 1313 msg_id = msg['msg_id'] 1314 msg['content'] = rec['content'] 1315 1316 # use the old header, but update msg_id and timestamp 1317 fresh = msg['header'] 1318 header['msg_id'] = fresh['msg_id'] 1319 header['date'] = fresh['date'] 1320 msg['header'] = header 1321 1322 self.session.send(self.resubmit, msg, buffers=rec['buffers']) 1323 1324 resubmitted[rec['msg_id']] = msg_id 1325 self.pending.add(msg_id) 1326 msg['buffers'] = rec['buffers'] 1327 try: 1328 self.db.add_record(msg_id, init_record(msg)) 1329 except Exception: 1330 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) 1331 return finish(error.wrap_exception()) 1332 1333 finish(dict(status='ok', resubmitted=resubmitted)) 1334 1335 # store the new IDs in the Task DB 1336 for msg_id, resubmit_id in iteritems(resubmitted): 1337 try: 1338 self.db.update_record(msg_id, {'resubmitted' : resubmit_id}) 1339 except Exception: 1340 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) 1341 1342 1343 def _extract_record(self, rec): 1344 """decompose a TaskRecord dict into subsection of reply for get_result""" 1345 io_dict = {} 1346 for key in ('execute_input', 'execute_result', 'error', 'stdout', 'stderr'): 1347 io_dict[key] = rec[key] 1348 content = { 1349 'header': rec['header'], 1350 'metadata': rec['metadata'], 1351 'result_metadata': rec['result_metadata'], 1352 'result_header' : rec['result_header'], 1353 'result_content': rec['result_content'], 1354 'received' : rec['received'], 1355 'io' : io_dict, 1356 } 1357 if rec['result_buffers']: 1358 buffers = list(map(buffer_to_bytes_py2, rec['result_buffers'])) 1359 else: 1360 buffers = [] 1361 1362 return content, buffers 1363 1364 def get_results(self, client_id, msg): 1365 """Get the result of 1 or more messages.""" 1366 content = msg['content'] 1367 msg_ids = sorted(set(content['msg_ids'])) 1368 statusonly = content.get('status_only', False) 1369 pending = [] 1370 completed = [] 1371 content = dict(status='ok') 1372 content['pending'] = pending 1373 content['completed'] = completed 1374 buffers = [] 1375 if not statusonly: 1376 try: 1377 matches = self.db.find_records(dict(msg_id={'$in':msg_ids})) 1378 # turn match list into dict, for faster lookup 1379 records = {} 1380 for rec in matches: 1381 records[rec['msg_id']] = rec 1382 except Exception: 1383 content = error.wrap_exception() 1384 self.log.exception("Failed to get results") 1385 self.session.send(self.query, "result_reply", content=content, 1386 parent=msg, ident=client_id) 1387 return 1388 else: 1389 records = {} 1390 for msg_id in msg_ids: 1391 if msg_id in self.pending: 1392 pending.append(msg_id) 1393 elif msg_id in self.all_completed: 1394 completed.append(msg_id) 1395 if not statusonly: 1396 c,bufs = self._extract_record(records[msg_id]) 1397 content[msg_id] = c 1398 buffers.extend(bufs) 1399 elif msg_id in records: 1400 if rec['completed']: 1401 completed.append(msg_id) 1402 c,bufs = self._extract_record(records[msg_id]) 1403 content[msg_id] = c 1404 buffers.extend(bufs) 1405 else: 1406 pending.append(msg_id) 1407 else: 1408 try: 1409 raise KeyError('No such message: '+msg_id) 1410 except: 1411 content = error.wrap_exception() 1412 break 1413 self.session.send(self.query, "result_reply", content=content, 1414 parent=msg, ident=client_id, 1415 buffers=buffers) 1416 1417 def get_history(self, client_id, msg): 1418 """Get a list of all msg_ids in our DB records""" 1419 try: 1420 msg_ids = self.db.get_history() 1421 except Exception as e: 1422 content = error.wrap_exception() 1423 self.log.exception("Failed to get history") 1424 else: 1425 content = dict(status='ok', history=msg_ids) 1426 1427 self.session.send(self.query, "history_reply", content=content, 1428 parent=msg, ident=client_id) 1429 1430 def db_query(self, client_id, msg): 1431 """Perform a raw query on the task record database.""" 1432 content = msg['content'] 1433 query = extract_dates(content.get('query', {})) 1434 keys = content.get('keys', None) 1435 buffers = [] 1436 empty = list() 1437 try: 1438 records = self.db.find_records(query, keys) 1439 except Exception as e: 1440 content = error.wrap_exception() 1441 self.log.exception("DB query failed") 1442 else: 1443 # extract buffers from reply content: 1444 if keys is not None: 1445 buffer_lens = [] if 'buffers' in keys else None 1446 result_buffer_lens = [] if 'result_buffers' in keys else None 1447 else: 1448 buffer_lens = None 1449 result_buffer_lens = None 1450 1451 for rec in records: 1452 # buffers may be None, so double check 1453 b = rec.pop('buffers', empty) or empty 1454 if buffer_lens is not None: 1455 buffer_lens.append(len(b)) 1456 buffers.extend(b) 1457 rb = rec.pop('result_buffers', empty) or empty 1458 if result_buffer_lens is not None: 1459 result_buffer_lens.append(len(rb)) 1460 buffers.extend(rb) 1461 content = dict(status='ok', records=records, buffer_lens=buffer_lens, 1462 result_buffer_lens=result_buffer_lens) 1463 # self.log.debug (content) 1464 self.session.send(self.query, "db_reply", content=content, 1465 parent=msg, ident=client_id, 1466 buffers=buffers) 1467 1468 @coroutine 1469 def become_dask(self, client_id, msg): 1470 """Start a dask.distributed Scheduler.""" 1471 if self.distributed_scheduler is None: 1472 kwargs = msg['content'].get('scheduler_args', {}) 1473 self.log.info("Becoming dask.distributed scheduler: %s", kwargs) 1474 from distributed import Scheduler 1475 self.distributed_scheduler = scheduler = Scheduler(**kwargs) 1476 yield scheduler.start() 1477 content = { 1478 'status': 'ok', 1479 'ip': self.distributed_scheduler.ip, 1480 'port': self.distributed_scheduler.port, 1481 } 1482 self.log.info("dask.distributed scheduler running at {ip}:{port}".format(**content)) 1483 self.session.send(self.query, "become_dask_reply", content=content, 1484 parent=msg, ident=client_id, 1485 ) 1486 1487 1488 def stop_distributed(self, client_id, msg): 1489 """Start a dask.distributed Scheduler.""" 1490 if self.distributed_scheduler is not None: 1491 self.log.info("Stopping dask.distributed scheduler") 1492 self.distributed_scheduler.close() 1493 self.distributed_scheduler = None 1494 else: 1495 self.log.info("No dask.distributed scheduler running.") 1496 content = {'status': 'ok'} 1497 self.session.send(self.query, "stop_distributed_reply", content=content, 1498 parent=msg, ident=client_id, 1499 ) 1500 1501