1"""The client and server for a basic ping-pong style heartbeat. 2""" 3 4#----------------------------------------------------------------------------- 5# Copyright (C) 2008-2011 The IPython Development Team 6# 7# Distributed under the terms of the BSD License. The full license is in 8# the file COPYING, distributed as part of this software. 9#----------------------------------------------------------------------------- 10 11#----------------------------------------------------------------------------- 12# Imports 13#----------------------------------------------------------------------------- 14 15import errno 16import os 17import socket 18from threading import Thread 19 20import zmq 21 22from jupyter_client.localinterfaces import localhost 23 24#----------------------------------------------------------------------------- 25# Code 26#----------------------------------------------------------------------------- 27 28 29class Heartbeat(Thread): 30 "A simple ping-pong style heartbeat that runs in a thread." 31 32 def __init__(self, context, addr=None): 33 if addr is None: 34 addr = ('tcp', localhost(), 0) 35 Thread.__init__(self) 36 self.context = context 37 self.transport, self.ip, self.port = addr 38 self.original_port = self.port 39 if self.original_port == 0: 40 self.pick_port() 41 self.addr = (self.ip, self.port) 42 self.daemon = True 43 44 def pick_port(self): 45 if self.transport == 'tcp': 46 s = socket.socket() 47 # '*' means all interfaces to 0MQ, which is '' to socket.socket 48 s.bind(('' if self.ip == '*' else self.ip, 0)) 49 self.port = s.getsockname()[1] 50 s.close() 51 elif self.transport == 'ipc': 52 self.port = 1 53 while os.path.exists("%s-%s" % (self.ip, self.port)): 54 self.port = self.port + 1 55 else: 56 raise ValueError("Unrecognized zmq transport: %s" % self.transport) 57 return self.port 58 59 def _try_bind_socket(self): 60 c = ':' if self.transport == 'tcp' else '-' 61 return self.socket.bind('%s://%s' % (self.transport, self.ip) + c + str(self.port)) 62 63 def _bind_socket(self): 64 try: 65 win_in_use = errno.WSAEADDRINUSE 66 except AttributeError: 67 win_in_use = None 68 69 # Try up to 100 times to bind a port when in conflict to avoid 70 # infinite attempts in bad setups 71 max_attempts = 1 if self.original_port else 100 72 for attempt in range(max_attempts): 73 try: 74 self._try_bind_socket() 75 except zmq.ZMQError as ze: 76 if attempt == max_attempts - 1: 77 raise 78 # Raise if we have any error not related to socket binding 79 if ze.errno != errno.EADDRINUSE and ze.errno != win_in_use: 80 raise 81 # Raise if we have any error not related to socket binding 82 if self.original_port == 0: 83 self.pick_port() 84 else: 85 raise 86 else: 87 return 88 89 def run(self): 90 self.socket = self.context.socket(zmq.ROUTER) 91 self.socket.linger = 1000 92 try: 93 self._bind_socket() 94 except Exception: 95 self.socket.close() 96 raise 97 98 while True: 99 try: 100 zmq.device(zmq.QUEUE, self.socket, self.socket) 101 except zmq.ZMQError as e: 102 if e.errno == errno.EINTR: 103 # signal interrupt, resume heartbeat 104 continue 105 elif e.errno == zmq.ETERM: 106 # context terminated, close socket and exit 107 try: 108 self.socket.close() 109 except zmq.ZMQError: 110 # suppress further errors during cleanup 111 # this shouldn't happen, though 112 pass 113 break 114 elif e.errno == zmq.ENOTSOCK: 115 # socket closed elsewhere, exit 116 break 117 else: 118 raise 119 else: 120 break 121