1"""The IPython Controller Hub with 0MQ
2
3This is the master object that handles connections from engines and clients,
4and monitors traffic through the various queues.
5"""
6
7# Copyright (c) IPython Development Team.
8# Distributed under the terms of the Modified BSD License.
9
10from __future__ import print_function
11
12import json
13import os
14import sys
15import time
16
17from tornado.gen import coroutine, maybe_future
18import zmq
19from zmq.eventloop.zmqstream import ZMQStream
20
21# internal:
22from ipython_genutils.importstring import import_item
23from ..util import extract_dates
24from jupyter_client.localinterfaces import localhost
25from ipython_genutils.py3compat import cast_bytes, unicode_type, iteritems, buffer_to_bytes_py2
26from traitlets import (
27    HasTraits, Any, Instance, Integer, Unicode, Dict, Set, Tuple,
28    DottedObjectName, observe
29)
30
31from ipyparallel import error, util
32from ipyparallel.factory import RegistrationFactory
33
34from jupyter_client.session import SessionFactory
35
36from .heartmonitor import HeartMonitor
37
38
39def _passer(*args, **kwargs):
40    return
41
42def _printer(*args, **kwargs):
43    print (args)
44    print (kwargs)
45
def empty_record():
    """Return a new TaskRecord dict containing every record key.

    Every field is None except the captured ``stdout``/``stderr`` text,
    which starts as an empty string. A fresh dict is built on each call.
    """
    record = dict.fromkeys((
        'msg_id', 'header', 'metadata', 'content', 'buffers',
        'submitted', 'client_uuid', 'engine_uuid',
        'started', 'completed', 'resubmitted', 'received',
        'result_header', 'result_metadata', 'result_content', 'result_buffers',
        'queue', 'execute_input', 'execute_result', 'error',
    ))
    record['stdout'] = ''
    record['stderr'] = ''
    return record
72
def init_record(msg):
    """Build a fresh TaskRecord dict from a deserialized request message.

    The identifying fields (msg_id, header, content, metadata, buffers and
    the submission date taken from the header) are copied from ``msg``.
    Every result/bookkeeping field starts as None, and the captured
    ``stdout``/``stderr`` text starts as an empty string.
    """
    hdr = msg['header']
    record = {}
    record['msg_id'] = hdr['msg_id']
    record['header'] = hdr
    record['content'] = msg['content']
    record['metadata'] = msg['metadata']
    record['buffers'] = msg['buffers']
    record['submitted'] = util.ensure_timezone(hdr['date'])
    for blank_key in ('client_uuid', 'engine_uuid', 'started', 'completed',
                      'resubmitted', 'received', 'result_header',
                      'result_metadata', 'result_content', 'result_buffers',
                      'queue', 'execute_input', 'execute_result', 'error'):
        record[blank_key] = None
    record['stdout'] = ''
    record['stderr'] = ''
    return record
100
101
class EngineConnector(HasTraits):
    """Bookkeeping object for the zmq connections of one engine.

    Traits:
    id (int): engine ID
    uuid (unicode): engine UUID
    pending: set of msg_ids
    stallback: tornado timeout for stalled registration
    """

    # integer engine ID (defaults to 0)
    id = Integer(default_value=0)
    # engine UUID string
    uuid = Unicode()
    # set of msg_ids
    pending = Set()
    # timeout handle for a stalled registration (any object)
    stallback = Any()
115
116
117_db_shortcuts = {
118    'sqlitedb' : 'ipyparallel.controller.sqlitedb.SQLiteDB',
119    'mongodb'  : 'ipyparallel.controller.mongodb.MongoDB',
120    'dictdb'   : 'ipyparallel.controller.dictdb.DictDB',
121    'nodb'     : 'ipyparallel.controller.dictdb.NoDB',
122}
123
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub.

    Holds the configurable ports, IPs and transports for every Hub channel.
    :meth:`init_hub` binds the corresponding sockets, connects the DB
    backend and builds the :class:`Hub` from them; :meth:`start` starts the
    heartmonitor.
    """

    # port-pairs for monitoredqueues:
    hb = Tuple(Integer(), Integer(), config=True,
        help="""PUB/ROUTER Port pair for Engine heartbeats""")

    def _hb_default(self):
        return tuple(util.select_random_ports(2))

    mux = Tuple(Integer(), Integer(), config=True,
        help="""Client/Engine Port pair for MUX queue""")

    def _mux_default(self):
        return tuple(util.select_random_ports(2))

    task = Tuple(Integer(), Integer(), config=True,
        help="""Client/Engine Port pair for Task queue""")

    def _task_default(self):
        return tuple(util.select_random_ports(2))

    control = Tuple(Integer(), Integer(), config=True,
        help="""Client/Engine Port pair for Control queue""")

    def _control_default(self):
        return tuple(util.select_random_ports(2))

    iopub = Tuple(Integer(), Integer(), config=True,
        help="""Client/Engine Port pair for IOPub relay""")

    def _iopub_default(self):
        return tuple(util.select_random_ports(2))

    # single ports:
    mon_port = Integer(config=True,
        help="""Monitor (SUB) port for queue traffic""")

    def _mon_port_default(self):
        return util.select_random_ports(1)[0]

    notifier_port = Integer(config=True,
        help="""PUB port for sending engine status notifications""")

    def _notifier_port_default(self):
        return util.select_random_ports(1)[0]

    engine_ip = Unicode(config=True,
        help="IP on which to listen for engine connections. [default: loopback]")

    def _engine_ip_default(self):
        return localhost()

    engine_transport = Unicode('tcp', config=True,
        help="0MQ transport for engine connections. [default: tcp]")

    client_ip = Unicode(config=True,
        help="IP on which to listen for client connections. [default: loopback]")
    client_transport = Unicode('tcp', config=True,
        help="0MQ transport for client connections. [default : tcp]")

    monitor_ip = Unicode(config=True,
        help="IP on which to listen for monitor messages. [default: loopback]")
    monitor_transport = Unicode('tcp', config=True,
        help="0MQ transport for monitor messages. [default : tcp]")

    # client and monitor IPs default to the same loopback address as engines
    _client_ip_default = _monitor_ip_default = _engine_ip_default

    # full monitor URL, recomputed by _update_monitor_url
    monitor_url = Unicode('')

    db_class = DottedObjectName('DictDB',
        config=True, help="""The class to use for the DB backend

        Options include:

        SQLiteDB: SQLite
        MongoDB : use MongoDB
        DictDB  : in-memory storage (default; fastest, but be mindful of memory growth of the Hub)
        NoDB    : disable database altogether

        """)

    registration_timeout = Integer(0, config=True,
        help="Engine registration timeout in seconds [default: max(30,"
             "10*heartmonitor.period)]" )

    def _registration_timeout_default(self):
        if self.heartmonitor is None:
            # early initialization: init_hub has not created the heartmonitor
            # yet. NOTE(review): traitlets caches dynamic defaults once read,
            # so confirm this 0 is never read before init_hub runs.
            return 0
        # heartmonitor period is in milliseconds, so 10x in seconds is .01
        return max(30, int(.01 * self.heartmonitor.period))

    # not configurable
    db = Instance('ipyparallel.controller.dictdb.BaseDB', allow_none=True)
    heartmonitor = Instance('ipyparallel.controller.heartmonitor.HeartMonitor', allow_none=True)

    @observe('ip')
    def _ip_changed(self, change):
        """Propagate a new ``ip`` to the engine, client and monitor IPs."""
        new = change['new']
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        """Rebuild monitor_url from monitor_transport, monitor_ip and mon_port."""
        self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)

    @observe('transport')
    def _transport_changed(self, change):
        """Propagate a new ``transport`` to the engine, client and monitor transports."""
        new = change['new']
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()

    def construct(self):
        self.init_hub()

    def start(self):
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def client_url(self, channel):
        """return full zmq url for a named client channel"""
        return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])

    def engine_url(self, channel):
        """return full zmq url for a named engine channel"""
        return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])

    def init_hub(self):
        """construct Hub object

        Builds the engine/client connection dicts, binds the registration,
        heartbeat, notification and monitor sockets, connects the DB backend
        and the resubmit stream, and stores the resulting Hub on ``self.hub``.
        """

        ctx = self.context
        loop = self.loop
        if 'TaskScheduler.scheme_name' in self.config:
            scheme = self.config.TaskScheduler.scheme_name
        else:
            from .scheduler import TaskScheduler
            scheme = TaskScheduler.scheme_name.default_value

        # build connection dicts
        engine = self.engine_info = {
            'interface'     : "%s://%s" % (self.engine_transport, self.engine_ip),
            'registration'  : self.regport,
            'control'       : self.control[1],
            'mux'           : self.mux[1],
            'hb_ping'       : self.hb[0],
            'hb_pong'       : self.hb[1],
            'task'          : self.task[1],
            'iopub'         : self.iopub[1],
            }

        client = self.client_info = {
            'interface'     : "%s://%s" % (self.client_transport, self.client_ip),
            'registration'  : self.regport,
            'control'       : self.control[0],
            'mux'           : self.mux[0],
            'task'          : self.task[0],
            'task_scheme'   : scheme,
            'iopub'         : self.iopub[0],
            'notification'  : self.notifier_port,
            }

        self.log.debug("Hub engine addrs: %s", self.engine_info)
        self.log.debug("Hub client addrs: %s", self.client_info)

        # Registrar socket
        q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
        util.set_hwm(q, 0)
        q.bind(self.client_url('registration'))
        self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
        if self.client_ip != self.engine_ip:
            # engines listen on a different interface: bind there too
            q.bind(self.engine_url('registration'))
            self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))

        ### Engine connections ###

        # heartbeat
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(self.engine_url('hb_ping'))
        hrep = ctx.socket(zmq.ROUTER)
        util.set_hwm(hrep, 0)
        hrep.bind(self.engine_url('hb_pong'))
        self.heartmonitor = HeartMonitor(loop=loop, parent=self, log=self.log,
                                pingstream=ZMQStream(hpub,loop),
                                pongstream=ZMQStream(hrep,loop)
                            )

        ### Client connections ###

        # Notifier socket
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(self.client_url('notification'))

        ### build and launch the queues ###

        # monitor socket
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
        self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
        self.db = import_item(str(db_class))(session=self.session.session,
                                            parent=self, log=self.log)
        time.sleep(.25)

        # resubmit stream
        r = ZMQStream(ctx.socket(zmq.DEALER), loop)
        url = util.disambiguate_url(self.client_url('task'))
        r.connect(url)

        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, resubmit=r, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                log=self.log, registration_timeout=self.registration_timeout,
                parent=self,
                )
349
350
class Hub(SessionFactory):
    """The IPython Controller Hub with 0MQ connections

    Dispatches client queries and engine registration requests, and tracks
    the state of every request by watching the monitored queue traffic.

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: Session object
    monitor: ZMQStream receiving monitored queue and IOPub traffic (SUB)
    query: ZMQStream for engine registration and client queries requests (ROUTER)
    heartmonitor: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    resubmit: ZMQStream connected to the client-side task queue, used for
                resubmitting tasks (DEALER); its replies are ignored
    db: DB backend in which task records are stored and looked up
    engine_info: dict of zmq connection information for engines to connect
                to the queues.
    client_info: dict of zmq connection information for clients to connect
                to the queues.
    """

    engine_state_file = Unicode()

    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict()
    by_ident=Dict()  # engine zmq identity (bytes) -> engine ID
    engines=Dict()  # engine ID -> connector object (has .uuid)
    clients=Dict()
    hearts=Dict()  # heart identity -> engine ID
    pending=Set()  # msg_ids submitted but not yet completed
    queues=Dict()  # pending msg_ids keyed by engine_id
    tasks=Dict() # msg_ids of tasks assigned to an engine, keyed by engine_id
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # every completed msg_id, across all engines
    unassigned=Set() # set of task msg_ids not yet assigned a destination
    incoming_registrations=Dict()  # heart identity -> connector (has .id) for registrations in progress
    registration_timeout=Integer()
    _idcounter=Integer(0)  # next engine ID handed out by new_engine_id
    distributed_scheduler = Any()

    # objects from constructor:
    query=Instance(ZMQStream, allow_none=True)
    monitor=Instance(ZMQStream, allow_none=True)
    notifier=Instance(ZMQStream, allow_none=True)
    resubmit=Instance(ZMQStream, allow_none=True)
    heartmonitor=Instance(HeartMonitor, allow_none=True)
    db=Instance(object, allow_none=True)
    client_info=Dict()
    engine_info=Dict()

401
402    def __init__(self, **kwargs):
403        """
404        # universal:
405        loop: IOLoop for creating future connections
406        session: streamsession for sending serialized data
407        # engine:
408        queue: ZMQStream for monitoring queue messages
409        query: ZMQStream for engine+client registration and client requests
410        heartbeat: HeartMonitor object for tracking engines
411        # extra:
412        db: ZMQStream for db connection (NotImplemented)
413        engine_info: zmq address/protocol dict for engine connections
414        client_info: zmq address/protocol dict for client connections
415        """
416
417        super(Hub, self).__init__(**kwargs)
418
419        # register our callbacks
420        self.query.on_recv(self.dispatch_query)
421        self.monitor.on_recv(self.dispatch_monitor_traffic)
422
423        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
424        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
425
426        self.monitor_handlers = {b'in' : self.save_queue_request,
427                                b'out': self.save_queue_result,
428                                b'intask': self.save_task_request,
429                                b'outtask': self.save_task_result,
430                                b'tracktask': self.save_task_destination,
431                                b'incontrol': _passer,
432                                b'outcontrol': _passer,
433                                b'iopub': self.monitor_iopub_message,
434        }
435
436        self.query_handlers = {'queue_request': self.queue_status,
437                                'result_request': self.get_results,
438                                'history_request': self.get_history,
439                                'db_request': self.db_query,
440                                'purge_request': self.purge_results,
441                                'load_request': self.check_load,
442                                'resubmit_request': self.resubmit_task,
443                                'shutdown_request': self.shutdown_request,
444                                'registration_request' : self.register_engine,
445                                'unregistration_request' : self.unregister_engine,
446                                'connection_request': self.connection_request,
447                                'become_dask_request': self.become_dask,
448                                'stop_distributed_request': self.stop_distributed,
449        }
450
451        # ignore resubmit replies
452        self.resubmit.on_recv(lambda msg: None, copy=False)
453
454        self.log.info("hub::created hub")
455
456    def new_engine_id(self, requested_id=None):
457        """generate a new engine integer id.
458
459        No longer reuse old ids, just count from 0.
460
461        If an id is requested and available, use that.
462        Otherwise, use the counter.
463        """
464        if requested_id is not None:
465            if requested_id in self.engines:
466                self.log.warning("Engine id %s in use by engine with uuid=%s",
467                     requested_id, self.engines[requested_id].uuid)
468            elif requested_id in {
469                ec.id for ec in self.incoming_registrations.values()
470            }:
471                self.log.warning("Engine id %s registration already pending", requested_id)
472            else:
473                self._idcounter = max(requested_id + 1, self._idcounter)
474                return requested_id
475        newid = self._idcounter
476        self._idcounter += 1
477        return newid
478
479    #-----------------------------------------------------------------------------
480    # message validation
481    #-----------------------------------------------------------------------------
482
483    def _validate_targets(self, targets):
484        """turn any valid targets argument into a list of integer ids"""
485        if targets is None:
486            # default to all
487            return self.ids
488
489        if isinstance(targets, (int,str,unicode_type)):
490            # only one target specified
491            targets = [targets]
492        _targets = []
493        for t in targets:
494            # map raw identities to ids
495            if isinstance(t, (str,unicode_type)):
496                t = self.by_ident.get(cast_bytes(t), t)
497            _targets.append(t)
498        targets = _targets
499        bad_targets = [ t for t in targets if t not in self.ids ]
500        if bad_targets:
501            raise IndexError("No Such Engine: %r" % bad_targets)
502        if not targets:
503            raise IndexError("No Engines Registered")
504        return targets
505
506    #-----------------------------------------------------------------------------
507    # dispatch methods (1 per stream)
508    #-----------------------------------------------------------------------------
509
510
511    @util.log_errors
512    def dispatch_monitor_traffic(self, msg):
513        """all ME and Task queue messages come through here, as well as
514        IOPub traffic."""
515        self.log.debug("monitor traffic: %r", msg[0])
516        switch = msg[0]
517        try:
518            idents, msg = self.session.feed_identities(msg[1:])
519        except ValueError:
520            idents=[]
521        if not idents:
522            self.log.error("Monitor message without topic: %r", msg)
523            return
524        handler = self.monitor_handlers.get(switch, None)
525        if handler is not None:
526            handler(idents, msg)
527        else:
528            self.log.error("Unrecognized monitor topic: %r", switch)
529
530    @util.log_errors
531    @coroutine
532    def dispatch_query(self, msg):
533        """Route registration requests and queries from clients."""
534        try:
535            idents, msg = self.session.feed_identities(msg)
536        except ValueError:
537            idents = []
538        if not idents:
539            self.log.error("Bad Query Message: %r", msg)
540            return
541        client_id = idents[0]
542        try:
543            msg = self.session.deserialize(msg, content=True)
544        except Exception:
545            content = error.wrap_exception()
546            self.log.error("Bad Query Message: %r", msg, exc_info=True)
547            self.session.send(self.query, "hub_error", ident=client_id,
548                    content=content, parent=msg)
549            return
550        # print client_id, header, parent, content
551        #switch on message type:
552        msg_type = msg['header']['msg_type']
553        self.log.info("client::client %r requested %r", client_id, msg_type)
554        handler = self.query_handlers.get(msg_type, None)
555        try:
556            if handler is None:
557                raise KeyError("Bad Message Type: %r" % msg_type)
558        except:
559            content = error.wrap_exception()
560            self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
561            self.session.send(self.query, "hub_error", ident=client_id,
562                    content=content, parent=msg)
563            return
564
565        try:
566            f = handler(idents, msg)
567            if f:
568                yield maybe_future(f)
569        except Exception:
570            content = error.wrap_exception()
571            self.log.error("Error handling request: %r", msg_type, exc_info=True)
572            self.session.send(self.query, "hub_error", ident=client_id,
573                    content=content, parent=msg)
574
575    #---------------------------------------------------------------------------
576    # handler methods (1 per event)
577    #---------------------------------------------------------------------------
578
579    #----------------------- Heartbeat --------------------------------------
580
581    def handle_new_heart(self, heart):
582        """handler to attach to heartbeater.
583        Called when a new heart starts to beat.
584        Triggers completion of registration."""
585        self.log.debug("heartbeat::handle_new_heart(%r)", heart)
586        if heart not in self.incoming_registrations:
587            self.log.info("heartbeat::ignoring new heart: %r", heart)
588        else:
589            self.finish_registration(heart)
590
591
592    def handle_heart_failure(self, heart):
593        """handler to attach to heartbeater.
594        called when a previously registered heart fails to respond to beat request.
595        triggers unregistration"""
596        self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
597        eid = self.hearts.get(heart, None)
598        if eid is None:
599            self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
600        else:
601            uuid = self.engines[eid].uuid
602            self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
603
604    #----------------------- MUX Queue Traffic ------------------------------
605
606    def save_queue_request(self, idents, msg):
607        if len(idents) < 2:
608            self.log.error("invalid identity prefix: %r", idents)
609            return
610        queue_id, client_id = idents[:2]
611        try:
612            msg = self.session.deserialize(msg)
613        except Exception:
614            self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
615            return
616
617        eid = self.by_ident.get(queue_id, None)
618        if eid is None:
619            self.log.error("queue::target %r not registered", queue_id)
620            self.log.debug("queue::    valid are: %r", self.by_ident.keys())
621            return
622        record = init_record(msg)
623        msg_id = record['msg_id']
624        self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
625        # Unicode in records
626        record['engine_uuid'] = queue_id.decode('ascii')
627        record['client_uuid'] = msg['header']['session']
628        record['queue'] = 'mux'
629
630        try:
631            # it's posible iopub arrived first:
632            existing = self.db.get_record(msg_id)
633            for key,evalue in iteritems(existing):
634                rvalue = record.get(key, None)
635                if evalue and rvalue and evalue != rvalue:
636                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
637                elif evalue and not rvalue:
638                    record[key] = evalue
639            try:
640                self.db.update_record(msg_id, record)
641            except Exception:
642                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
643        except KeyError:
644            try:
645                self.db.add_record(msg_id, record)
646            except Exception:
647                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
648
649
650        self.pending.add(msg_id)
651        self.queues[eid].append(msg_id)
652
653    def save_queue_result(self, idents, msg):
654        if len(idents) < 2:
655            self.log.error("invalid identity prefix: %r", idents)
656            return
657
658        client_id, queue_id = idents[:2]
659        try:
660            msg = self.session.deserialize(msg)
661        except Exception:
662            self.log.error("queue::engine %r sent invalid message to %r: %r",
663                    queue_id, client_id, msg, exc_info=True)
664            return
665
666        eid = self.by_ident.get(queue_id, None)
667        if eid is None:
668            self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
669            return
670
671        parent = msg['parent_header']
672        if not parent:
673            return
674        msg_id = parent['msg_id']
675        if msg_id in self.pending:
676            self.pending.remove(msg_id)
677            self.all_completed.add(msg_id)
678            self.queues[eid].remove(msg_id)
679            self.completed[eid].append(msg_id)
680            self.log.info("queue::request %r completed on %s", msg_id, eid)
681        elif msg_id not in self.all_completed:
682            # it could be a result from a dead engine that died before delivering the
683            # result
684            self.log.warn("queue:: unknown msg finished %r", msg_id)
685            return
686        # update record anyway, because the unregistration could have been premature
687        rheader = msg['header']
688        md = msg['metadata']
689        completed = util.ensure_timezone(rheader['date'])
690        started = extract_dates(md.get('started', None))
691        result = {
692            'result_header' : rheader,
693            'result_metadata': md,
694            'result_content': msg['content'],
695            'received': util.utcnow(),
696            'started' : started,
697            'completed' : completed
698        }
699
700        result['result_buffers'] = msg['buffers']
701        try:
702            self.db.update_record(msg_id, result)
703        except Exception:
704            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
705
706
707    #--------------------- Task Queue Traffic ------------------------------
708
709    def save_task_request(self, idents, msg):
710        """Save the submission of a task."""
711        client_id = idents[0]
712
713        try:
714            msg = self.session.deserialize(msg)
715        except Exception:
716            self.log.error("task::client %r sent invalid task message: %r",
717                    client_id, msg, exc_info=True)
718            return
719        record = init_record(msg)
720
721        record['client_uuid'] = msg['header']['session']
722        record['queue'] = 'task'
723        header = msg['header']
724        msg_id = header['msg_id']
725        self.pending.add(msg_id)
726        self.unassigned.add(msg_id)
727        try:
728            # it's posible iopub arrived first:
729            existing = self.db.get_record(msg_id)
730            if existing['resubmitted']:
731                for key in ('submitted', 'client_uuid', 'buffers'):
732                    # don't clobber these keys on resubmit
733                    # submitted and client_uuid should be different
734                    # and buffers might be big, and shouldn't have changed
735                    record.pop(key)
736                    # still check content,header which should not change
737                    # but are not expensive to compare as buffers
738
739            for key,evalue in iteritems(existing):
740                if key.endswith('buffers'):
741                    # don't compare buffers
742                    continue
743                rvalue = record.get(key, None)
744                if evalue and rvalue and evalue != rvalue:
745                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
746                elif evalue and not rvalue:
747                    record[key] = evalue
748            try:
749                self.db.update_record(msg_id, record)
750            except Exception:
751                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
752        except KeyError:
753            try:
754                self.db.add_record(msg_id, record)
755            except Exception:
756                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
757        except Exception:
758            self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
759
760    def save_task_result(self, idents, msg):
761        """save the result of a completed task."""
762        client_id = idents[0]
763        try:
764            msg = self.session.deserialize(msg)
765        except Exception:
766            self.log.error("task::invalid task result message send to %r: %r",
767                    client_id, msg, exc_info=True)
768            return
769
770        parent = msg['parent_header']
771        if not parent:
772            # print msg
773            self.log.warn("Task %r had no parent!", msg)
774            return
775        msg_id = parent['msg_id']
776        if msg_id in self.unassigned:
777            self.unassigned.remove(msg_id)
778
779        header = msg['header']
780        md = msg['metadata']
781        engine_uuid = md.get('engine', u'')
782        eid = self.by_ident.get(cast_bytes(engine_uuid), None)
783
784        status = md.get('status', None)
785
786        if msg_id in self.pending:
787            self.log.info("task::task %r finished on %s", msg_id, eid)
788            self.pending.remove(msg_id)
789            self.all_completed.add(msg_id)
790            if eid is not None:
791                if status != 'aborted':
792                    self.completed[eid].append(msg_id)
793                if msg_id in self.tasks[eid]:
794                    self.tasks[eid].remove(msg_id)
795            completed = util.ensure_timezone(header['date'])
796            started = extract_dates(md.get('started', None))
797            result = {
798                'result_header' : header,
799                'result_metadata': msg['metadata'],
800                'result_content': msg['content'],
801                'started' : started,
802                'completed' : completed,
803                'received' : util.utcnow(),
804                'engine_uuid': engine_uuid,
805            }
806
807            result['result_buffers'] = msg['buffers']
808            try:
809                self.db.update_record(msg_id, result)
810            except Exception:
811                self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
812
813        else:
814            self.log.debug("task::unknown task %r finished", msg_id)
815
816    def save_task_destination(self, idents, msg):
817        try:
818            msg = self.session.deserialize(msg, content=True)
819        except Exception:
820            self.log.error("task::invalid task tracking message", exc_info=True)
821            return
822        content = msg['content']
823        # print (content)
824        msg_id = content['msg_id']
825        engine_uuid = content['engine_id']
826        eid = self.by_ident[cast_bytes(engine_uuid)]
827
828        self.log.info("task::task %r arrived on %r", msg_id, eid)
829        if msg_id in self.unassigned:
830            self.unassigned.remove(msg_id)
831        # else:
832        #     self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
833
834        self.tasks[eid].append(msg_id)
835        try:
836            self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
837        except Exception:
838            self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
839
840
841    #--------------------- IOPub Traffic ------------------------------
842
843    def monitor_iopub_message(self, topics, msg):
844        '''intercept iopub traffic so events can be acted upon'''
845        try:
846            msg = self.session.deserialize(msg, content=True)
847        except Exception:
848            self.log.error("iopub::invalid IOPub message", exc_info=True)
849            return
850
851        msg_type = msg['header']['msg_type']
852        if msg_type == 'shutdown_reply':
853            session = msg['header']['session']
854            eid = self.by_ident.get(session, None)
855            uuid = self.engines[eid].uuid
856            self.unregister_engine(ident='shutdown_reply',
857                                   msg=dict(content=dict(id=eid, queue=uuid)))
858
859        if msg_type not in ('status', 'shutdown_reply', ):
860            self.save_iopub_message(topics, msg)
861
862    def save_iopub_message(self, topics, msg):
863        """save an iopub message into the db"""
864        parent = msg['parent_header']
865        if not parent:
866            self.log.debug("iopub::IOPub message lacks parent: %r", msg)
867            return
868        msg_id = parent['msg_id']
869        msg_type = msg['header']['msg_type']
870        content = msg['content']
871
872        # ensure msg_id is in db
873        try:
874            rec = self.db.get_record(msg_id)
875        except KeyError:
876            rec = None
877
878        # stream
879        d = {}
880        if msg_type == 'stream':
881            name = content['name']
882            s = '' if rec is None else rec[name]
883            d[name] = s + content['text']
884        elif msg_type == 'error':
885            d['error'] = content
886        elif msg_type == 'execute_input':
887            d['execute_input'] = content['code']
888        elif msg_type in ('display_data', 'execute_result'):
889            d[msg_type] = content
890        elif msg_type == 'data_pub':
891            self.log.info("ignored data_pub message for %s" % msg_id)
892        else:
893            self.log.warn("unhandled iopub msg_type: %r", msg_type)
894
895        if not d:
896            return
897
898        if rec is None:
899            # new record
900            rec = empty_record()
901            rec['msg_id'] = msg_id
902            rec.update(d)
903            d = rec
904            update_record = self.db.add_record
905        else:
906            update_record = self.db.update_record
907
908        try:
909            update_record(msg_id, d)
910        except Exception:
911            self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
912
913
914
915    #-------------------------------------------------------------------------
916    # Registration requests
917    #-------------------------------------------------------------------------
918
919    def connection_request(self, client_id, msg):
920        """Reply with connection addresses for clients."""
921        self.log.info("client::client %r connected", client_id)
922        content = dict(status='ok')
923        jsonable = {}
924        for k,v in iteritems(self.keytable):
925                jsonable[str(k)] = v
926        content['engines'] = jsonable
927        self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
928
    def register_engine(self, reg, msg):
        """Register a new engine (first half of registration).

        Reads the engine ``uuid`` from the request and allocates an engine id
        via ``new_engine_id``. It then rejects the uuid if it is already
        registered (``by_ident``) or pending in ``incoming_registrations``,
        and sends a ``registration_reply`` back to ``reg``.

        On success the engine is parked in ``incoming_registrations``:
        - if its heart is already being monitored, ``finish_registration`` is
          called immediately;
        - otherwise a ``registration_timeout`` timer is scheduled that calls
          ``_purge_stalled_registration`` for this heart.

        Parameters
        ----------
        reg :
            Routing identity passed as ``ident`` to ``session.send``.
        msg : dict
            Deserialized request; ``msg['content']['uuid']`` is required and
            ``msg['content'].get('id')`` is forwarded to ``new_engine_id``.

        Returns
        -------
        The allocated engine id. It is returned even when registration
        failed. Returns None if the request had no ``uuid``.
        """
        content = msg['content']
        try:
            uuid = content['uuid']
        except KeyError:
            self.log.error("registration::queue not specified", exc_info=True)
            return

        eid = self.new_engine_id(content.get('id'))

        self.log.debug("registration::register_engine(%i, %r)", eid, uuid)

        content = dict(id=eid,status='ok',hb_period=self.heartmonitor.period)
        # reject uuids that are already registered or already pending registration
        # (on failure ``content`` is replaced by the wrapped exception)
        if cast_bytes(uuid) in self.by_ident:
            try:
                raise KeyError("uuid %r in use" % uuid)
            except:
                content = error.wrap_exception()
                self.log.error("uuid %r in use", uuid, exc_info=True)
        else:
            # NOTE(review): keys of incoming_registrations are bytes hearts
            # (cast_bytes(uuid) / uuid.encode('ascii')), so if ``uuid`` is text
            # (presumably, after JSON deserialization on Python 3) ``uuid == h``
            # never matches and the ``ec.uuid`` comparison below is what
            # detects the clash -- confirm.
            for h, ec in iteritems(self.incoming_registrations):
                if uuid == h:
                    try:
                        raise KeyError("heart_id %r in use" % uuid)
                    except:
                        self.log.error("heart_id %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break
                elif uuid == ec.uuid:
                    try:
                        raise KeyError("uuid %r in use" % uuid)
                    except:
                        self.log.error("uuid %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break

        # the return value of send is bound to ``msg`` but not used afterwards
        msg = self.session.send(self.query, "registration_reply",
                content=content,
                ident=reg)

        heart = cast_bytes(uuid)

        if content['status'] == 'ok':
            if heart in self.heartmonitor.hearts:
                # already beating
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
                self.finish_registration(heart)
            else:
                # wait for the heart; drop the registration if it never beats
                # within registration_timeout
                purge = lambda : self._purge_stalled_registration(heart)
                t = self.loop.add_timeout(
                    self.loop.time() + self.registration_timeout,
                    purge,
                )
                # keep the timer handle so finish_registration can cancel it
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=t)
        else:
            self.log.error("registration::registration %i failed: %r", eid, content['evalue'])

        return eid
989
990    def unregister_engine(self, ident, msg):
991        """Unregister an engine that explicitly requested to leave."""
992        try:
993            eid = msg['content']['id']
994        except:
995            self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
996            return
997        self.log.info("registration::unregister_engine(%r)", eid)
998
999        uuid = self.keytable[eid]
1000        content=dict(id=eid, uuid=uuid)
1001
1002        #stop the heartbeats
1003        self.hearts.pop(uuid, None)
1004        self.heartmonitor.responses.discard(uuid)
1005        self.heartmonitor.hearts.discard(uuid)
1006
1007        self.loop.add_timeout(
1008            self.loop.time() + self.registration_timeout,
1009            lambda : self._handle_stranded_msgs(eid, uuid),
1010        )
1011
1012        # cleanup mappings
1013        self.by_ident.pop(uuid, None)
1014        self.engines.pop(eid, None)
1015        self.keytable.pop(eid, None)
1016
1017        self._save_engine_state()
1018
1019        if self.notifier:
1020            self.session.send(self.notifier, "unregistration_notification", content=content)
1021
1022    def _handle_stranded_msgs(self, eid, uuid):
1023        """Handle messages known to be on an engine when the engine unregisters.
1024
1025        It is possible that this will fire prematurely - that is, an engine will
1026        go down after completing a result, and the client will be notified
1027        that the result failed and later receive the actual result.
1028        """
1029
1030        outstanding = self.queues[eid]
1031
1032        for msg_id in outstanding:
1033            self.pending.remove(msg_id)
1034            self.all_completed.add(msg_id)
1035            try:
1036                raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
1037            except:
1038                content = error.wrap_exception()
1039            # build a fake header:
1040            header = {}
1041            header['engine'] = uuid
1042            header['date'] = util.utcnow()
1043            rec = dict(result_content=content, result_header=header, result_buffers=[])
1044            rec['completed'] = util.ensure_timezone(header['date'])
1045            rec['engine_uuid'] = uuid
1046            try:
1047                self.db.update_record(msg_id, rec)
1048            except Exception:
1049                self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1050
1051
1052    def finish_registration(self, heart):
1053        """Second half of engine registration, called after our HeartMonitor
1054        has received a beat from the Engine's Heart."""
1055        try:
1056            ec = self.incoming_registrations.pop(heart)
1057        except KeyError:
1058            self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1059            return
1060        self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1061        if ec.stallback is not None:
1062            self.loop.remove_timeout(ec.stallback)
1063        eid = ec.id
1064        self.ids.add(eid)
1065        self.keytable[eid] = ec.uuid
1066        self.engines[eid] = ec
1067        self.by_ident[cast_bytes(ec.uuid)] = ec.id
1068        self.queues[eid] = list()
1069        self.tasks[eid] = list()
1070        self.completed[eid] = list()
1071        self.hearts[heart] = eid
1072        content = dict(id=eid, uuid=self.engines[eid].uuid)
1073        if self.notifier:
1074            self.session.send(self.notifier, "registration_notification", content=content)
1075        self.log.info("engine::Engine Connected: %i", eid)
1076
1077        self._save_engine_state()
1078
1079    def _purge_stalled_registration(self, heart):
1080        if heart in self.incoming_registrations:
1081            ec = self.incoming_registrations.pop(heart)
1082            self.log.info("registration::purging stalled registration: %i", ec.id)
1083        else:
1084            pass
1085
1086    #-------------------------------------------------------------------------
1087    # Engine State
1088    #-------------------------------------------------------------------------
1089
1090
1091    def _cleanup_engine_state_file(self):
1092        """cleanup engine state mapping"""
1093
1094        if os.path.exists(self.engine_state_file):
1095            self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1096            try:
1097                os.remove(self.engine_state_file)
1098            except IOError:
1099                self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1100
1101
1102    def _save_engine_state(self):
1103        """save engine mapping to JSON file"""
1104        if not self.engine_state_file:
1105            return
1106        self.log.debug("save engine state to %s" % self.engine_state_file)
1107        state = {}
1108        engines = {}
1109        for eid, ec in self.engines.items():
1110            engines[eid] = ec.uuid
1111
1112        state['engines'] = engines
1113
1114        state['next_id'] = self._idcounter
1115
1116        with open(self.engine_state_file, 'w') as f:
1117            json.dump(state, f)
1118
1119
1120    def _load_engine_state(self):
1121        """load engine mapping from JSON file"""
1122        if not os.path.exists(self.engine_state_file):
1123            return
1124
1125        self.log.info("loading engine state from %s" % self.engine_state_file)
1126
1127        with open(self.engine_state_file) as f:
1128            state = json.load(f)
1129
1130        save_notifier = self.notifier
1131        self.notifier = None
1132        for eid, uuid in iteritems(state['engines']):
1133            heart = uuid.encode('ascii')
1134            # start with this heart as current and beating:
1135            self.heartmonitor.responses.add(heart)
1136            self.heartmonitor.hearts.add(heart)
1137
1138            self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1139            self.finish_registration(heart)
1140
1141        self.notifier = save_notifier
1142
1143        self._idcounter = state['next_id']
1144
1145    #-------------------------------------------------------------------------
1146    # Client Requests
1147    #-------------------------------------------------------------------------
1148
1149    def shutdown_request(self, client_id, msg):
1150        """handle shutdown request."""
1151        self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id, parent=msg)
1152        # also notify other clients of shutdown
1153        self.session.send(self.notifier, 'shutdown_notification', content={'status': 'ok'}, parent=msg)
1154        self.loop.add_timeout(self.loop.time() + 1, self._shutdown)
1155
1156    def _shutdown(self):
1157        self.log.info("hub::hub shutting down.")
1158        time.sleep(0.1)
1159        sys.exit(0)
1160
1161
1162    def check_load(self, client_id, msg):
1163        content = msg['content']
1164        try:
1165            targets = content['targets']
1166            targets = self._validate_targets(targets)
1167        except:
1168            content = error.wrap_exception()
1169            self.session.send(self.query, "hub_error",
1170                    content=content, ident=client_id, parent=msg)
1171            return
1172
1173        content = dict(status='ok')
1174        # loads = {}
1175        for t in targets:
1176            content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1177        self.session.send(self.query, "load_reply", content=content, ident=client_id, parent=msg)
1178
1179
1180    def queue_status(self, client_id, msg):
1181        """Return the Queue status of one or more targets.
1182
1183        If verbose, return the msg_ids, else return len of each type.
1184
1185        Keys:
1186
1187        * queue (pending MUX jobs)
1188        * tasks (pending Task jobs)
1189        * completed (finished jobs from both queues)
1190        """
1191        content = msg['content']
1192        targets = content['targets']
1193        try:
1194            targets = self._validate_targets(targets)
1195        except:
1196            content = error.wrap_exception()
1197            self.session.send(self.query, "hub_error",
1198                    content=content, ident=client_id, parent=msg)
1199            return
1200        verbose = content.get('verbose', False)
1201        content = dict(status='ok')
1202        for t in targets:
1203            queue = self.queues[t]
1204            completed = self.completed[t]
1205            tasks = self.tasks[t]
1206            if not verbose:
1207                queue = len(queue)
1208                completed = len(completed)
1209                tasks = len(tasks)
1210            content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1211        content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1212        # print (content)
1213        self.session.send(self.query, "queue_reply", content=content, ident=client_id, parent=msg)
1214
    def purge_results(self, client_id, msg):
        """Purge task records from the task database and reply with ``purge_reply``.

        * ``msg_ids == 'all'``: drop every record whose ``completed`` is set.
        * otherwise:
          - if any listed msg_id is still pending, reply with an error;
          - else drop the listed records;
          - then, if that succeeded, drop the completed records of each engine
            listed in ``engine_ids``.

        ``engine_ids`` is only consulted in the second case. Failures are
        logged and turned into an error reply via ``error.wrap_exception``;
        processing stops at the first failing engine.
        """
        content = msg['content']
        self.log.info("Dropping records with %s", content)
        msg_ids = content.get('msg_ids', [])
        reply = dict(status='ok')
        if msg_ids == 'all':
            try:
                self.db.drop_matching_records(dict(completed={'$ne':None}))
            except Exception:
                reply = error.wrap_exception()
                self.log.exception("Error dropping records")
        else:
            # refuse to drop records of tasks that are still pending
            pending = [m for m in msg_ids if (m in self.pending)]
            if pending:
                # raise/except so that error.wrap_exception() is called from
                # inside an except block, as everywhere else in this file
                try:
                    raise IndexError("msg pending: %r" % pending[0])
                except:
                    reply = error.wrap_exception()
                    self.log.exception("Error dropping records")
            else:
                try:
                    self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
                except Exception:
                    reply = error.wrap_exception()
                    self.log.exception("Error dropping records")

            if reply['status'] == 'ok':
                eids = content.get('engine_ids', [])
                for eid in eids:
                    if eid not in self.engines:
                        # NOTE(review): "%i" % eid raises TypeError for a
                        # non-integer eid; the bare except then wraps that
                        # TypeError instead of the IndexError.
                        try:
                            raise IndexError("No such engine: %i" % eid)
                        except:
                            reply = error.wrap_exception()
                            self.log.exception("Error dropping records")
                        break
                    uid = self.engines[eid].uuid
                    # only completed records; in-flight ones are kept
                    try:
                        self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
                    except Exception:
                        reply = error.wrap_exception()
                        self.log.exception("Error dropping records")
                        break

        self.session.send(self.query, 'purge_reply', content=reply, ident=client_id, parent=msg)
1262
    def resubmit_task(self, client_id, msg):
        """Resubmit one or more tasks under new msg_ids.

        Loads the stored header, content and buffers of each requested msg_id
        from the DB and checks the number of matches. Each task is then re-sent
        on ``self.resubmit``: it keeps its original header but gets a fresh
        ``msg_id`` and ``date``. The new id is marked pending and gets its own
        DB record.

        Replies ``resubmit_reply`` with ``resubmitted`` mapping original ->
        new msg_id, then stores that mapping on the original records under
        ``resubmitted``. Lookup failures, unknown msg_ids, an inconsistent DB
        or a failure to add a record produce an error reply instead.
        """
        # keep the request around: ``msg`` is rebound inside the send loop below
        parent = msg
        def finish(reply):
            self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id, parent=parent)

        content = msg['content']
        msg_ids = content['msg_ids']
        # NOTE(review): ``reply`` is never used below
        reply = dict(status='ok')
        try:
            records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
                'header', 'content', 'buffers'])
        except Exception:
            self.log.error('db::db error finding tasks to resubmit', exc_info=True)
            return finish(error.wrap_exception())

        # validate msg_ids
        found_ids = [ rec['msg_id'] for rec in records ]
        pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
        if len(records) > len(msg_ids):
            try:
                raise RuntimeError("DB appears to be in an inconsistent state."
                    "More matching records were found than should exist")
            except Exception:
                self.log.exception("Failed to resubmit task")
                return finish(error.wrap_exception())
        elif len(records) < len(msg_ids):
            missing = [ m for m in msg_ids if m not in found_ids ]
            try:
                raise KeyError("No such msg(s): %r" % missing)
            except KeyError:
                self.log.exception("Failed to resubmit task")
                return finish(error.wrap_exception())
        elif pending_ids:
            pass
            # no need to raise on resubmit of pending task, now that we
            # resubmit under new ID, but do we want to raise anyway?
            # msg_id = invalid_ids[0]
            # try:
            #     raise ValueError("Task(s) %r appears to be inflight" % )
            # except Exception:
            #     return finish(error.wrap_exception())

        # mapping of original IDs to resubmitted IDs
        resubmitted = {}

        # send the messages
        for rec in records:
            header = rec['header']
            # a fresh message supplies the new msg_id/date; its parent is the
            # original request header
            msg = self.session.msg(header['msg_type'], parent=header)
            msg_id = msg['msg_id']
            msg['content'] = rec['content']

            # use the old header, but update msg_id and timestamp
            # (this mutates rec['header'] in place)
            fresh = msg['header']
            header['msg_id'] = fresh['msg_id']
            header['date'] = fresh['date']
            msg['header'] = header

            self.session.send(self.resubmit, msg, buffers=rec['buffers'])

            resubmitted[rec['msg_id']] = msg_id
            self.pending.add(msg_id)
            msg['buffers'] = rec['buffers']
            # NOTE(review): on failure here, messages already sent above stay
            # pending although the client receives an error reply
            try:
                self.db.add_record(msg_id, init_record(msg))
            except Exception:
                self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
                return finish(error.wrap_exception())

        finish(dict(status='ok', resubmitted=resubmitted))

        # store the new IDs in the Task DB
        for msg_id, resubmit_id in iteritems(resubmitted):
            try:
                self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
            except Exception:
                self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1341
1342
1343    def _extract_record(self, rec):
1344        """decompose a TaskRecord dict into subsection of reply for get_result"""
1345        io_dict = {}
1346        for key in ('execute_input', 'execute_result', 'error', 'stdout', 'stderr'):
1347                io_dict[key] = rec[key]
1348        content = {
1349            'header': rec['header'],
1350            'metadata': rec['metadata'],
1351            'result_metadata': rec['result_metadata'],
1352            'result_header' : rec['result_header'],
1353            'result_content': rec['result_content'],
1354            'received' : rec['received'],
1355            'io' : io_dict,
1356        }
1357        if rec['result_buffers']:
1358            buffers = list(map(buffer_to_bytes_py2, rec['result_buffers']))
1359        else:
1360            buffers = []
1361
1362        return content, buffers
1363
1364    def get_results(self, client_id, msg):
1365        """Get the result of 1 or more messages."""
1366        content = msg['content']
1367        msg_ids = sorted(set(content['msg_ids']))
1368        statusonly = content.get('status_only', False)
1369        pending = []
1370        completed = []
1371        content = dict(status='ok')
1372        content['pending'] = pending
1373        content['completed'] = completed
1374        buffers = []
1375        if not statusonly:
1376            try:
1377                matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1378                # turn match list into dict, for faster lookup
1379                records = {}
1380                for rec in matches:
1381                    records[rec['msg_id']] = rec
1382            except Exception:
1383                content = error.wrap_exception()
1384                self.log.exception("Failed to get results")
1385                self.session.send(self.query, "result_reply", content=content,
1386                                                    parent=msg, ident=client_id)
1387                return
1388        else:
1389            records = {}
1390        for msg_id in msg_ids:
1391            if msg_id in self.pending:
1392                pending.append(msg_id)
1393            elif msg_id in self.all_completed:
1394                completed.append(msg_id)
1395                if not statusonly:
1396                    c,bufs = self._extract_record(records[msg_id])
1397                    content[msg_id] = c
1398                    buffers.extend(bufs)
1399            elif msg_id in records:
1400                if rec['completed']:
1401                    completed.append(msg_id)
1402                    c,bufs = self._extract_record(records[msg_id])
1403                    content[msg_id] = c
1404                    buffers.extend(bufs)
1405                else:
1406                    pending.append(msg_id)
1407            else:
1408                try:
1409                    raise KeyError('No such message: '+msg_id)
1410                except:
1411                    content = error.wrap_exception()
1412                break
1413        self.session.send(self.query, "result_reply", content=content,
1414                                            parent=msg, ident=client_id,
1415                                            buffers=buffers)
1416
1417    def get_history(self, client_id, msg):
1418        """Get a list of all msg_ids in our DB records"""
1419        try:
1420            msg_ids = self.db.get_history()
1421        except Exception as e:
1422            content = error.wrap_exception()
1423            self.log.exception("Failed to get history")
1424        else:
1425            content = dict(status='ok', history=msg_ids)
1426
1427        self.session.send(self.query, "history_reply", content=content,
1428                                            parent=msg, ident=client_id)
1429
1430    def db_query(self, client_id, msg):
1431        """Perform a raw query on the task record database."""
1432        content = msg['content']
1433        query = extract_dates(content.get('query', {}))
1434        keys = content.get('keys', None)
1435        buffers = []
1436        empty = list()
1437        try:
1438            records = self.db.find_records(query, keys)
1439        except Exception as e:
1440            content = error.wrap_exception()
1441            self.log.exception("DB query failed")
1442        else:
1443            # extract buffers from reply content:
1444            if keys is not None:
1445                buffer_lens = [] if 'buffers' in keys else None
1446                result_buffer_lens = [] if 'result_buffers' in keys else None
1447            else:
1448                buffer_lens = None
1449                result_buffer_lens = None
1450
1451            for rec in records:
1452                # buffers may be None, so double check
1453                b = rec.pop('buffers', empty) or empty
1454                if buffer_lens is not None:
1455                    buffer_lens.append(len(b))
1456                    buffers.extend(b)
1457                rb = rec.pop('result_buffers', empty) or empty
1458                if result_buffer_lens is not None:
1459                    result_buffer_lens.append(len(rb))
1460                    buffers.extend(rb)
1461            content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1462                                    result_buffer_lens=result_buffer_lens)
1463        # self.log.debug (content)
1464        self.session.send(self.query, "db_reply", content=content,
1465                                            parent=msg, ident=client_id,
1466                                            buffers=buffers)
1467
1468    @coroutine
1469    def become_dask(self, client_id, msg):
1470        """Start a dask.distributed Scheduler."""
1471        if self.distributed_scheduler is None:
1472            kwargs = msg['content'].get('scheduler_args', {})
1473            self.log.info("Becoming dask.distributed scheduler: %s", kwargs)
1474            from distributed import Scheduler
1475            self.distributed_scheduler = scheduler = Scheduler(**kwargs)
1476            yield scheduler.start()
1477        content = {
1478            'status': 'ok',
1479            'ip': self.distributed_scheduler.ip,
1480            'port': self.distributed_scheduler.port,
1481        }
1482        self.log.info("dask.distributed scheduler running at {ip}:{port}".format(**content))
1483        self.session.send(self.query, "become_dask_reply", content=content,
1484            parent=msg, ident=client_id,
1485        )
1486
1487
1488    def stop_distributed(self, client_id, msg):
1489        """Start a dask.distributed Scheduler."""
1490        if self.distributed_scheduler is not None:
1491            self.log.info("Stopping dask.distributed scheduler")
1492            self.distributed_scheduler.close()
1493            self.distributed_scheduler = None
1494        else:
1495            self.log.info("No dask.distributed scheduler running.")
1496        content = {'status': 'ok'}
1497        self.session.send(self.query, "stop_distributed_reply", content=content,
1498            parent=msg, ident=client_id,
1499        )
1500
1501