1#!/usr/bin/env python 2""" 3A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB, 4and hearts are tracked based on their DEALER identities. 5""" 6 7# Copyright (c) IPython Development Team. 8# Distributed under the terms of the Modified BSD License. 9 10from __future__ import print_function 11import time 12import uuid 13 14import zmq 15from zmq.devices import ThreadDevice, ThreadMonitoredQueue 16 17from traitlets.config.configurable import LoggingConfigurable 18from ipython_genutils.py3compat import str_to_bytes 19from traitlets import Set, Instance, Float, Integer, Dict, Bool 20 21from ipyparallel.util import log_errors, ioloop 22 23class Heart(object): 24 """A basic heart object for responding to a HeartMonitor. 25 This is a simple wrapper with defaults for the most common 26 Device model for responding to heartbeats. 27 28 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using 29 SUB/DEALER for in/out. 30 31 You can specify the DEALER's IDENTITY via the optional heart_id argument.""" 32 device=None 33 id=None 34 def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None): 35 if mon_addr is None: 36 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) 37 else: 38 self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"") 39 # do not allow the device to share global Context.instance, 40 # which is the default behavior in pyzmq > 2.1.10 41 self.device.context_factory = zmq.Context 42 43 self.device.daemon=True 44 self.device.connect_in(in_addr) 45 self.device.connect_out(out_addr) 46 if mon_addr is not None: 47 self.device.connect_mon(mon_addr) 48 if in_type == zmq.SUB: 49 self.device.setsockopt_in(zmq.SUBSCRIBE, b"") 50 if heart_id is None: 51 heart_id = uuid.uuid4().bytes 52 self.device.setsockopt_out(zmq.IDENTITY, heart_id) 53 self.id = heart_id 54 55 def start(self): 56 return self.device.start() 57 
class HeartMonitor(LoggingConfigurable):
    """A basic HeartMonitor class.

    Every ``period`` milliseconds it publishes a ping (the current
    ``lifetime``, rendered as bytes) on ``pingstream``. It collects replies
    on ``pongstream``, where the first frame of each reply is the heart's
    identity and the second echoes the ping it answers.

    pingstream: a PUB stream
    pongstream: a ROUTER stream
    period: the period of the heartbeat in milliseconds
    """

    debug = Bool(False, config=True,
        help="""Whether to include every heartbeat in debugging output.

        Has to be set explicitly, because there will be *a lot* of output.
        """
    )
    period = Integer(3000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        '(in ms)',
    )
    max_heartmonitor_misses = Integer(10, config=True,
        help='Allowed consecutive missed pings from controller Hub to engine before unregistering.',
    )

    pingstream = Instance('zmq.eventloop.zmqstream.ZMQStream', allow_none=True)
    pongstream = Instance('zmq.eventloop.zmqstream.ZMQStream', allow_none=True)
    loop = Instance('tornado.ioloop.IOLoop')

    def _loop_default(self):
        return ioloop.IOLoop.current()

    # not settable:
    # identities of hearts currently considered alive
    hearts = Set()
    # identities that answered since the last beat()
    responses = Set()
    # identity -> number of consecutive missed pings
    on_probation = Dict()
    # value of ``lifetime`` when the previous ping was sent
    last_ping = Float(0)
    _new_handlers = Set()
    _failure_handlers = Set()
    # seconds accumulated since start(); also used as the ping payload
    lifetime = Float(0)
    # wall-clock time (time.time()) of the most recent beat
    tic = Float(0)

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)

        # pongstream must be supplied; every reply goes to handle_pong
        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        """Start pinging every ``period`` milliseconds via a PeriodicCallback."""
        self.tic = time.time()
        self.caller = ioloop.PeriodicCallback(self.beat, self.period)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s", handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s", handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """Evaluate the replies to the previous ping, then send the next ping.

        Hearts that replied but are not yet tracked are passed to
        handle_new_heart. Tracked hearts that did not reply have their miss
        count updated by _check_missed; those over the limit are passed to
        handle_heart_failure.
        """
        # handle any pongs already received before evaluating this round
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        if self.debug:
            self.log.debug("heartbeat::sending %s", self.lifetime)
        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        newhearts = self.responses.difference(goodhearts)
        for heart in newhearts:
            self.handle_new_heart(heart)
        heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
                                                         self.hearts)
        for failure in heartfailures:
            self.handle_heart_failure(failure)
        self.on_probation = on_probation
        self.responses = set()
        self.pingstream.send(str_to_bytes(str(self.lifetime)))
        # flush stream to force immediate socket send
        self.pingstream.flush()

    def _check_missed(self, missed_beats, on_probation, hearts):
        """Update heartbeats on probation, identifying any that have too many misses.

        Returns ``(failures, new_probation)``. ``failures`` is a list of
        hearts whose consecutive miss count exceeds
        ``max_heartmonitor_misses``. ``new_probation`` maps each remaining
        missed heart to its updated miss count. Hearts that did not miss this
        round are dropped from probation.
        """
        failures = []
        new_probation = {}
        for cur_heart in (b for b in missed_beats if b in hearts):
            miss_count = on_probation.get(cur_heart, 0) + 1
            self.log.info("heartbeat::missed %s : %s", cur_heart, miss_count)
            if miss_count > self.max_heartmonitor_misses:
                failures.append(cur_heart)
            else:
                new_probation[cur_heart] = miss_count
        return failures, new_probation

    def handle_new_heart(self, heart):
        """Start tracking ``heart``, notifying any new-heart handlers first.

        A handler that raises is logged and skipped. Letting the exception
        escape would abort beat() midway, so this period's ping would not be
        sent and stale responses would not be cleared. This mirrors
        handle_heart_failure.
        """
        if self._new_handlers:
            for handler in self._new_handlers:
                try:
                    handler(heart)
                except Exception:
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
        else:
            self.log.info("heartbeat::yay, got new heart %s!", heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Stop tracking ``heart``, notifying any failure handlers first.

        A handler that raises is logged and skipped. A heart that is
        already untracked is logged rather than raising KeyError.
        """
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception:
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
        else:
            self.log.info("heartbeat::Heart %s failed :(", heart)
        try:
            self.hearts.remove(heart)
        except KeyError:
            self.log.info("heartbeat:: %s has already been removed.", heart)

    @log_errors
    def handle_pong(self, msg):
        """a heart just beat

        ``msg[0]`` is the heart's identity and ``msg[1]`` the ping it echoes.
        A reply to the current ping or to the previous one (a late reply)
        marks the heart as responsive. Anything else is logged and ignored.
        """
        current = str_to_bytes(str(self.lifetime))
        last = str_to_bytes(str(self.last_ping))
        if msg[1] == current:
            delta = time.time() - self.tic
            if self.debug:
                self.log.debug("heartbeat::heart %r took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        elif msg[1] == last:
            # reply to the previous ping: measure from when that ping was sent
            delta = time.time() - self.tic + (self.lifetime - self.last_ping)
            self.log.warning("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        else:
            self.log.warning("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)