1#!/usr/bin/env python
2"""
A multi-heart Heartbeat system using PUB and ROUTER sockets. Pings are sent out on the PUB,
and hearts are tracked based on their DEALER identities.
5"""
6
7# Copyright (c) IPython Development Team.
8# Distributed under the terms of the Modified BSD License.
9
10from __future__ import print_function
11import time
12import uuid
13
14import zmq
15from zmq.devices import ThreadDevice, ThreadMonitoredQueue
16
17from traitlets.config.configurable import LoggingConfigurable
18from ipython_genutils.py3compat import str_to_bytes
19from traitlets import Set, Instance, Float, Integer, Dict, Bool
20
21from ipyparallel.util import log_errors, ioloop
22
class Heart(object):
    """A basic heart object for responding to a HeartMonitor.

    This is a simple wrapper with defaults for the most common
    Device model for responding to heartbeats.

    It builds a threaded zmq device that connects an input socket
    (SUB by default) to an output socket (DEALER by default): a
    ``ThreadDevice`` of type ``zmq.FORWARDER`` when no monitor address is
    given, otherwise a ``ThreadMonitoredQueue`` that is also connected to
    ``mon_addr``.

    You can specify the DEALER's IDENTITY via the optional ``heart_id``
    argument; when omitted, a random 16-byte identity is generated.  The
    identity in use is available afterwards as ``self.id``.
    """
    device = None
    id = None

    def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB,
                 out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None):
        """Configure (but do not start) the heart's device.

        in_addr: address the input socket connects to
        out_addr: address the output socket connects to
        mon_addr: optional address for the monitor socket; selects the
            monitored-queue device when given
        in_type, out_type, mon_type: zmq socket types for the three sockets
        heart_id: bytes used as the output socket's IDENTITY
        """
        if mon_addr is None:
            self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        else:
            self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"")
        # do not allow the device to share global Context.instance,
        # which is the default behavior in pyzmq > 2.1.10
        self.device.context_factory = zmq.Context

        self.device.daemon = True
        self.device.connect_in(in_addr)
        self.device.connect_out(out_addr)
        if mon_addr is not None:
            self.device.connect_mon(mon_addr)
        if in_type == zmq.SUB:
            # an empty prefix subscribes the SUB socket to every message
            self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
        if heart_id is None:
            heart_id = uuid.uuid4().bytes
            # ZeroMQ reserves identities that begin with a zero byte for its
            # own use, so re-draw in the rare case the random id starts
            # with one.
            while heart_id[:1] == b"\x00":
                heart_id = uuid.uuid4().bytes
        self.device.setsockopt_out(zmq.IDENTITY, heart_id)
        self.id = heart_id

    def start(self):
        """Start the underlying device and return whatever its start() returns."""
        return self.device.start()
57
58
class HeartMonitor(LoggingConfigurable):
    """A basic HeartMonitor class

    A ping carrying the monitor's current ``lifetime`` is sent on
    ``pingstream`` every ``period`` milliseconds.  Replies arriving on
    ``pongstream`` are matched against that value, and the identities that
    answered are compared with the known hearts on the next beat to detect
    new hearts and failed ones.

    pingstream: a PUB stream
    pongstream: an ROUTER stream
    period: the period of the heartbeat in milliseconds
    """

    debug = Bool(False, config=True,
        help="""Whether to include every heartbeat in debugging output.

        Has to be set explicitly, because there will be *a lot* of output.
        """
    )
    period = Integer(3000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        '(in ms)',
    )
    max_heartmonitor_misses = Integer(10, config=True,
        help='Allowed consecutive missed pings from controller Hub to engine before unregistering.',
    )

    pingstream = Instance('zmq.eventloop.zmqstream.ZMQStream', allow_none=True)
    pongstream = Instance('zmq.eventloop.zmqstream.ZMQStream', allow_none=True)
    loop = Instance('tornado.ioloop.IOLoop')

    def _loop_default(self):
        """Default ``loop`` to the current IOLoop."""
        return ioloop.IOLoop.current()

    # not settable:
    hearts = Set()  # identities of the hearts currently being tracked
    responses = Set()  # identities that answered since the last beat
    on_probation = Dict()  # heart identity -> consecutive missed beats
    last_ping = Float(0)  # lifetime value that was sent with the previous ping
    _new_handlers = Set()  # callables invoked with each newly seen heart
    _failure_handlers = Set()  # callables invoked with each failed heart
    lifetime = Float(0)  # accumulated seconds between beats; also the ping payload
    tic = Float(0)  # time.time() of the most recent beat (or of start())

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)

        # every message received on the pong stream is handled as a pong
        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        """Begin calling beat() every ``period`` milliseconds."""
        self.tic = time.time()
        self.caller = ioloop.PeriodicCallback(self.beat, self.period)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s", handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s", handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """Evaluate the round that just ended and send the next ping.

        Hearts that answered but were unknown are registered, known hearts
        that did not answer have their miss count updated (and are failed
        once it exceeds ``max_heartmonitor_misses``), and a new ping with
        the updated ``lifetime`` is published.
        """
        # flush the pong stream first so pending pongs are handled before
        # this round is evaluated
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        if self.debug:
            self.log.debug("heartbeat::sending %s", self.lifetime)
        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        newhearts = self.responses.difference(goodhearts)
        for heart in newhearts:
            self.handle_new_heart(heart)
        heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
                                                         self.hearts)
        for failure in heartfailures:
            self.handle_heart_failure(failure)
        self.on_probation = on_probation
        # start the next round with an empty response set
        self.responses = set()
        self.pingstream.send(str_to_bytes(str(self.lifetime)))
        # flush stream to force immediate socket send
        self.pingstream.flush()

    def _check_missed(self, missed_beats, on_probation, hearts):
        """Update heartbeats on probation, identifying any that have too many misses.

        missed_beats: identities that did not answer the last ping
        on_probation: mapping of identity -> misses counted so far
        hearts: identities currently tracked; entries of ``missed_beats``
            that are not in it are ignored

        Returns ``(failures, new_probation)``: the list of hearts whose miss
        count now exceeds ``max_heartmonitor_misses``, and a fresh mapping
        holding the updated counts of the remaining missed hearts.  Hearts
        absent from ``missed_beats`` are not carried over, so their count
        starts again from zero on a later miss.
        """
        failures = []
        new_probation = {}
        for cur_heart in (b for b in missed_beats if b in hearts):
            miss_count = on_probation.get(cur_heart, 0) + 1
            self.log.info("heartbeat::missed %s : %s", cur_heart, miss_count)
            if miss_count > self.max_heartmonitor_misses:
                failures.append(cur_heart)
            else:
                new_probation[cur_heart] = miss_count
        return failures, new_probation

    def handle_new_heart(self, heart):
        """Notify the new-heart handlers (or log if there are none), then track the heart."""
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            self.log.info("heartbeat::yay, got new heart %s!", heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Notify the failure handlers (or log if there are none), then stop tracking the heart.

        An exception raised by a handler is logged and does not prevent the
        remaining handlers from running or the heart from being removed.
        """
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception:
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
        else:
            self.log.info("heartbeat::Heart %s failed :(", heart)
        try:
            self.hearts.remove(heart)
        except KeyError:
            self.log.info("heartbeat:: %s has already been removed.", heart)

    @log_errors
    def handle_pong(self, msg):
        """a heart just beat

        ``msg[0]`` is used as the heart's identity and ``msg[1]`` is compared
        with the payload of the current and of the previous ping.  A reply to
        either one records the heart as having responded; anything else is
        logged as a bad heartbeat and ignored.
        """
        current = str_to_bytes(str(self.lifetime))
        last = str_to_bytes(str(self.last_ping))
        if msg[1] == current:
            delta = time.time() - self.tic
            if self.debug:
                self.log.debug("heartbeat::heart %r took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        elif msg[1] == last:
            # reply to the previous ping: include the time between the two pings
            delta = time.time() - self.tic + (self.lifetime - self.last_ping)
            self.log.warning("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        else:
            self.log.warning("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
195
196