"""Classes for running 0MQ Devices in the background."""

# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.


import time
from threading import Thread
from multiprocessing import Process
from typing import Any

from zmq import device, QUEUE, REQ, Context, ETERM, ZMQBindError, ZMQError


class Device:
    """A 0MQ Device to be run in the background.

    You do not pass Socket instances to this, but rather Socket types::

        Device(device_type, in_socket_type, out_socket_type)

    For instance::

        dev = Device(zmq.QUEUE, zmq.DEALER, zmq.ROUTER)

    Similar to zmq.device, but socket types instead of sockets themselves are
    passed, and the sockets are created in the work thread, to avoid issues
    with thread safety. As a result, additional bind_{in|out} and
    connect_{in|out} methods and setsockopt_{in|out} allow users to specify
    connections for the sockets.

    Parameters
    ----------
    device_type : int
        The 0MQ Device type
    {in|out}_type : int
        zmq socket types, to be passed later to context.socket(). e.g.
        zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used
        for both in_socket and out_socket.

    Methods
    -------
    bind_{in|out}(iface)
        passthrough for ``{in|out}_socket.bind(iface)``, to be called in the thread
    connect_{in|out}(iface)
        passthrough for ``{in|out}_socket.connect(iface)``, to be called in the
        thread
    setsockopt_{in|out}(opt,value)
        passthrough for ``{in|out}_socket.setsockopt(opt, value)``, to be called in
        the thread

    Attributes
    ----------
    daemon : bool
        sets whether the thread should be run as a daemon
        Default is true, because if it is false, the thread will not
        exit unless it is killed
    context_factory : callable (class attribute)
        Function for creating the Context. This will be Context.instance
        in ThreadDevices, and Context in ProcessDevices.  The only reason
        it is not instance() in ProcessDevices is that there may be a stale
        Context instance already initialized, and the forked environment
        should *never* try to use it.
    """

    context_factory = Context.instance
    """Callable that returns a context. Typically either Context.instance or Context,
    depending on whether the device should share the global instance or not.
    """

    def __init__(self, device_type=QUEUE, in_type=None, out_type=None):
        """Record the device and socket types; no sockets are created here.

        Raises
        ------
        TypeError
            If ``in_type`` or ``out_type`` is not given.
        """
        self.device_type = device_type
        if in_type is None:
            raise TypeError("in_type must be specified")
        if out_type is None:
            raise TypeError("out_type must be specified")
        self.in_type = in_type
        self.out_type = out_type
        # Queued operations, replayed on the real sockets by _setup_sockets().
        self._in_binds = []
        self._in_connects = []
        self._in_sockopts = []
        self._out_binds = []
        self._out_connects = []
        self._out_sockopts = []
        # Addresses already handed out by _reserve_random_port().
        self._random_addrs = []
        self.daemon = True
        self.done = False

    def bind_in(self, addr):
        """Enqueue ZMQ address for binding on in_socket.

        See zmq.Socket.bind for details.
        """
        self._in_binds.append(addr)

    def bind_in_to_random_port(self, addr, *args, **kwargs):
        """Enqueue a random port on the given interface for binding on
        in_socket.

        See zmq.Socket.bind_to_random_port for details.

        Returns the reserved port number.

        .. versionadded:: 18.0
        """
        port = self._reserve_random_port(addr, *args, **kwargs)

        self.bind_in(f'{addr}:{port}')

        return port

    def connect_in(self, addr):
        """Enqueue ZMQ address for connecting on in_socket.

        See zmq.Socket.connect for details.
        """
        self._in_connects.append(addr)

    def setsockopt_in(self, opt, value):
        """Enqueue setsockopt(opt, value) for in_socket

        See zmq.Socket.setsockopt for details.
        """
        self._in_sockopts.append((opt, value))

    def bind_out(self, addr):
        """Enqueue ZMQ address for binding on out_socket.

        See zmq.Socket.bind for details.
        """
        self._out_binds.append(addr)

    def bind_out_to_random_port(self, addr, *args, **kwargs):
        """Enqueue a random port on the given interface for binding on
        out_socket.

        See zmq.Socket.bind_to_random_port for details.

        Returns the reserved port number.

        .. versionadded:: 18.0
        """
        port = self._reserve_random_port(addr, *args, **kwargs)

        self.bind_out(f'{addr}:{port}')

        return port

    def connect_out(self, addr):
        """Enqueue ZMQ address for connecting on out_socket.

        See zmq.Socket.connect for details.
        """
        self._out_connects.append(addr)

    def setsockopt_out(self, opt, value):
        """Enqueue setsockopt(opt, value) for out_socket

        See zmq.Socket.setsockopt for details.
        """
        self._out_sockopts.append((opt, value))

    def _reserve_random_port(self, addr, *args, **kwargs):
        """Pick a random port on ``addr`` not already reserved by this device.

        A throwaway socket in a private context is bound to a random port
        (extra arguments are forwarded to ``bind_to_random_port``); up to
        five attempts are made to obtain an address that is not yet in
        ``self._random_addrs``.

        Raises
        ------
        ZMQBindError
            If all five attempts return an already-reserved address.
        """
        # Context managers guarantee that the temporary socket is closed and
        # the temporary context terminated on every exit path, including
        # when binding raises or no unused port is found.
        with Context() as ctx:
            with ctx.socket(REQ) as binder:
                for _ in range(5):
                    port = binder.bind_to_random_port(addr, *args, **kwargs)

                    new_addr = f'{addr}:{port}'

                    if new_addr not in self._random_addrs:
                        break
                else:
                    raise ZMQBindError("Could not reserve random port.")

                self._random_addrs.append(new_addr)

        return port

    def _setup_sockets(self):
        """Create the sockets and replay the queued sockopts/binds/connects.

        Returns the ``(in_socket, out_socket)`` pair; both are the same
        socket when ``out_type`` is negative.
        """
        ctx = self.context_factory()

        self._context = ctx

        # create the sockets
        ins = ctx.socket(self.in_type)
        if self.out_type < 0:
            outs = ins
        else:
            outs = ctx.socket(self.out_type)

        # set sockopts (must be done first, in case of zmq.IDENTITY)
        for opt, value in self._in_sockopts:
            ins.setsockopt(opt, value)
        for opt, value in self._out_sockopts:
            outs.setsockopt(opt, value)

        for iface in self._in_binds:
            ins.bind(iface)
        for iface in self._out_binds:
            outs.bind(iface)

        for iface in self._in_connects:
            ins.connect(iface)
        for iface in self._out_connects:
            outs.connect(iface)

        return ins, outs

    def run_device(self):
        """The runner method.

        Do not call me directly, instead call ``self.start()``, just like a Thread.
        """
        ins, outs = self._setup_sockets()
        device(self.device_type, ins, outs)

    def run(self):
        """wrap run_device in try/catch ETERM"""
        try:
            self.run_device()
        except ZMQError as e:
            if e.errno == ETERM:
                # silence TERM errors, because this should be a clean shutdown
                pass
            else:
                raise
        finally:
            # Always flag completion so that join() can return.
            self.done = True

    def start(self):
        """Start the device. Override me in subclass for other launchers."""
        return self.run()

    def join(self, timeout=None):
        """wait for me to finish, like Thread.join.

        Polls ``self.done`` until it is set or, when ``timeout`` (seconds)
        is not None, until more than ``timeout`` seconds have elapsed.

        Reimplemented appropriately by subclasses."""
        # Use the monotonic clock so that wall-clock adjustments cannot
        # shorten or extend the wait.
        tic = time.monotonic()
        toc = tic
        while not self.done and not (timeout is not None and toc - tic > timeout):
            time.sleep(0.001)
            toc = time.monotonic()


class BackgroundDevice(Device):
    """Base class for launching Devices in background processes and threads."""

    # The Thread/Process object created by start(); None until then.
    launcher: Any = None
    # Class used to launch ``run`` in the background; set by subclasses.
    _launch_class: Any = None

    def start(self):
        """Launch ``self.run`` in a new ``_launch_class`` instance and start it."""
        self.launcher = self._launch_class(target=self.run)
        self.launcher.daemon = self.daemon
        return self.launcher.start()

    def join(self, timeout=None):
        """Wait for the launcher to finish; delegates to ``launcher.join``."""
        return self.launcher.join(timeout=timeout)


class ThreadDevice(BackgroundDevice):
    """A Device that will be run in a background Thread.

    See Device for details.
    """

    _launch_class = Thread


class ProcessDevice(BackgroundDevice):
    """A Device that will be run in a background Process.

    See Device for details.
    """

    _launch_class = Process
    context_factory = Context
    """Callable that returns a context. Typically either Context.instance or Context,
    depending on whether the device should share the global instance or not.
    """


__all__ = ['Device', 'ThreadDevice', 'ProcessDevice']