1"""Classes for running 0MQ Devices in the background."""
2
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
5
6
7import time
8from threading import Thread
9from multiprocessing import Process
10from typing import Any
11
12from zmq import device, QUEUE, REQ, Context, ETERM, ZMQBindError, ZMQError
13
14
15class Device:
16    """A 0MQ Device to be run in the background.
17
18    You do not pass Socket instances to this, but rather Socket types::
19
20        Device(device_type, in_socket_type, out_socket_type)
21
22    For instance::
23
24        dev = Device(zmq.QUEUE, zmq.DEALER, zmq.ROUTER)
25
26    Similar to zmq.device, but socket types instead of sockets themselves are
27    passed, and the sockets are created in the work thread, to avoid issues
28    with thread safety. As a result, additional bind_{in|out} and
29    connect_{in|out} methods and setsockopt_{in|out} allow users to specify
30    connections for the sockets.
31
32    Parameters
33    ----------
34    device_type : int
35        The 0MQ Device type
36    {in|out}_type : int
37        zmq socket types, to be passed later to context.socket(). e.g.
38        zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used
39        for both in_socket and out_socket.
40
41    Methods
42    -------
43    bind_{in_out}(iface)
44        passthrough for ``{in|out}_socket.bind(iface)``, to be called in the thread
45    connect_{in_out}(iface)
46        passthrough for ``{in|out}_socket.connect(iface)``, to be called in the
47        thread
48    setsockopt_{in_out}(opt,value)
49        passthrough for ``{in|out}_socket.setsockopt(opt, value)``, to be called in
50        the thread
51
52    Attributes
53    ----------
54    daemon : int
55        sets whether the thread should be run as a daemon
56        Default is true, because if it is false, the thread will not
57        exit unless it is killed
58    context_factory : callable (class attribute)
59        Function for creating the Context. This will be Context.instance
60        in ThreadDevices, and Context in ProcessDevices.  The only reason
61        it is not instance() in ProcessDevices is that there may be a stale
62        Context instance already initialized, and the forked environment
63        should *never* try to use it.
64    """
65
66    context_factory = Context.instance
67    """Callable that returns a context. Typically either Context.instance or Context,
68    depending on whether the device should share the global instance or not.
69    """
70
71    def __init__(self, device_type=QUEUE, in_type=None, out_type=None):
72        self.device_type = device_type
73        if in_type is None:
74            raise TypeError("in_type must be specified")
75        if out_type is None:
76            raise TypeError("out_type must be specified")
77        self.in_type = in_type
78        self.out_type = out_type
79        self._in_binds = []
80        self._in_connects = []
81        self._in_sockopts = []
82        self._out_binds = []
83        self._out_connects = []
84        self._out_sockopts = []
85        self._random_addrs = []
86        self.daemon = True
87        self.done = False
88
89    def bind_in(self, addr):
90        """Enqueue ZMQ address for binding on in_socket.
91
92        See zmq.Socket.bind for details.
93        """
94        self._in_binds.append(addr)
95
96    def bind_in_to_random_port(self, addr, *args, **kwargs):
97        """Enqueue a random port on the given interface for binding on
98        in_socket.
99
100        See zmq.Socket.bind_to_random_port for details.
101
102        .. versionadded:: 18.0
103        """
104        port = self._reserve_random_port(addr, *args, **kwargs)
105
106        self.bind_in('%s:%i' % (addr, port))
107
108        return port
109
110    def connect_in(self, addr):
111        """Enqueue ZMQ address for connecting on in_socket.
112
113        See zmq.Socket.connect for details.
114        """
115        self._in_connects.append(addr)
116
117    def setsockopt_in(self, opt, value):
118        """Enqueue setsockopt(opt, value) for in_socket
119
120        See zmq.Socket.setsockopt for details.
121        """
122        self._in_sockopts.append((opt, value))
123
124    def bind_out(self, addr):
125        """Enqueue ZMQ address for binding on out_socket.
126
127        See zmq.Socket.bind for details.
128        """
129        self._out_binds.append(addr)
130
131    def bind_out_to_random_port(self, addr, *args, **kwargs):
132        """Enqueue a random port on the given interface for binding on
133        out_socket.
134
135        See zmq.Socket.bind_to_random_port for details.
136
137        .. versionadded:: 18.0
138        """
139        port = self._reserve_random_port(addr, *args, **kwargs)
140
141        self.bind_out('%s:%i' % (addr, port))
142
143        return port
144
145    def connect_out(self, addr):
146        """Enqueue ZMQ address for connecting on out_socket.
147
148        See zmq.Socket.connect for details.
149        """
150        self._out_connects.append(addr)
151
152    def setsockopt_out(self, opt, value):
153        """Enqueue setsockopt(opt, value) for out_socket
154
155        See zmq.Socket.setsockopt for details.
156        """
157        self._out_sockopts.append((opt, value))
158
159    def _reserve_random_port(self, addr, *args, **kwargs):
160        ctx = Context()
161
162        binder = ctx.socket(REQ)
163
164        for i in range(5):
165            port = binder.bind_to_random_port(addr, *args, **kwargs)
166
167            new_addr = '%s:%i' % (addr, port)
168
169            if new_addr in self._random_addrs:
170                continue
171            else:
172                break
173        else:
174            raise ZMQBindError("Could not reserve random port.")
175
176        self._random_addrs.append(new_addr)
177
178        binder.close()
179
180        return port
181
182    def _setup_sockets(self):
183        ctx = self.context_factory()
184
185        self._context = ctx
186
187        # create the sockets
188        ins = ctx.socket(self.in_type)
189        if self.out_type < 0:
190            outs = ins
191        else:
192            outs = ctx.socket(self.out_type)
193
194        # set sockopts (must be done first, in case of zmq.IDENTITY)
195        for opt, value in self._in_sockopts:
196            ins.setsockopt(opt, value)
197        for opt, value in self._out_sockopts:
198            outs.setsockopt(opt, value)
199
200        for iface in self._in_binds:
201            ins.bind(iface)
202        for iface in self._out_binds:
203            outs.bind(iface)
204
205        for iface in self._in_connects:
206            ins.connect(iface)
207        for iface in self._out_connects:
208            outs.connect(iface)
209
210        return ins, outs
211
212    def run_device(self):
213        """The runner method.
214
215        Do not call me directly, instead call ``self.start()``, just like a Thread.
216        """
217        ins, outs = self._setup_sockets()
218        device(self.device_type, ins, outs)
219
220    def run(self):
221        """wrap run_device in try/catch ETERM"""
222        try:
223            self.run_device()
224        except ZMQError as e:
225            if e.errno == ETERM:
226                # silence TERM errors, because this should be a clean shutdown
227                pass
228            else:
229                raise
230        finally:
231            self.done = True
232
233    def start(self):
234        """Start the device. Override me in subclass for other launchers."""
235        return self.run()
236
237    def join(self, timeout=None):
238        """wait for me to finish, like Thread.join.
239
240        Reimplemented appropriately by subclasses."""
241        tic = time.time()
242        toc = tic
243        while not self.done and not (timeout is not None and toc - tic > timeout):
244            time.sleep(0.001)
245            toc = time.time()
246
247
248class BackgroundDevice(Device):
249    """Base class for launching Devices in background processes and threads."""
250
251    launcher: Any = None
252    _launch_class: Any = None
253
254    def start(self):
255        self.launcher = self._launch_class(target=self.run)
256        self.launcher.daemon = self.daemon
257        return self.launcher.start()
258
259    def join(self, timeout=None):
260        return self.launcher.join(timeout=timeout)
261
262
263class ThreadDevice(BackgroundDevice):
264    """A Device that will be run in a background Thread.
265
266    See Device for details.
267    """
268
269    _launch_class = Thread
270
271
272class ProcessDevice(BackgroundDevice):
273    """A Device that will be run in a background Process.
274
275    See Device for details.
276    """
277
278    _launch_class = Process
279    context_factory = Context
280    """Callable that returns a context. Typically either Context.instance or Context,
281    depending on whether the device should share the global instance or not.
282    """
283
284
285__all__ = ['Device', 'ThreadDevice', 'ProcessDevice']
286