1# coding: utf-8
2"""0MQ Socket pure Python methods."""
3
4# Copyright (C) PyZMQ Developers
5# Distributed under the terms of the Modified BSD License.
6
7
8import errno
9import random
10import sys
11import warnings
12
13import zmq
14from zmq.backend import Socket as SocketBase
15from .poll import Poller
16from . import constants
17from .attrsettr import AttributeSetter
18from zmq.error import ZMQError, ZMQBindError
19from zmq.utils import jsonapi
20from zmq.utils.strtypes import bytes, unicode
21
22
23from .constants import (
24    SNDMORE,
25    ENOTSUP,
26    POLLIN,
27    int64_sockopt_names,
28    int_sockopt_names,
29    bytes_sockopt_names,
30    fd_sockopt_names,
31    socket_types,
32)
33import pickle
34
35try:
36    DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL
37except AttributeError:
38    DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL
39
40
class _SocketContext:
    """Context manager that undoes a socket bind or connect on exit.

    Entering the context yields the wrapped socket.  Leaving it calls
    ``socket.unbind(addr)`` when ``kind`` is ``"bind"`` and
    ``socket.disconnect(addr)`` when ``kind`` is ``"connect"``; nothing is
    done if the socket reports itself as closed.

    Parameters
    ----------
    socket :
        The socket the address was bound/connected on.
    kind : str
        Either ``"bind"`` or ``"connect"``.
    addr :
        The address passed to bind/connect, reused for unbind/disconnect.

    Raises
    ------
    ValueError
        If ``kind`` is neither ``"bind"`` nor ``"connect"``.
    """

    def __init__(self, socket, kind, addr):
        # Explicit check rather than ``assert``: assertions are stripped
        # under ``python -O``, which would let an unknown kind through and
        # turn __exit__ into a silent no-op.
        if kind not in ("bind", "connect"):
            raise ValueError(f"kind must be 'bind' or 'connect', not {kind!r}")
        self.socket = socket
        self.kind = kind
        self.addr = addr

    def __repr__(self):
        return f"<SocketContext({self.kind}={self.addr!r})>"

    def __enter__(self):
        return self.socket

    def __exit__(self, *args):
        # a closed socket has nothing left to unbind / disconnect
        if self.socket.closed:
            return
        if self.kind == "bind":
            self.socket.unbind(self.addr)
        elif self.kind == "connect":
            self.socket.disconnect(self.addr)
63
64
65class Socket(SocketBase, AttributeSetter):
66    """The ZMQ socket object
67
68    To create a Socket, first create a Context::
69
70        ctx = zmq.Context.instance()
71
72    then call ``ctx.socket(socket_type)``::
73
74        s = ctx.socket(zmq.ROUTER)
75
76    """
77
78    _shadow = False
79    _monitor_socket = None
80    _type_name = 'UNKNOWN'
81
82    def __init__(self, *a, **kw):
83        super().__init__(*a, **kw)
84        if 'shadow' in kw:
85            self._shadow = True
86        else:
87            self._shadow = False
88        try:
89            socket_type = self.get(zmq.TYPE)
90        except Exception:
91            pass
92        else:
93            self._type_name = socket_types.get(socket_type, str(socket_type))
94
95    def __del__(self):
96        if not self._shadow and not self.closed:
97            warnings.warn(
98                f"unclosed socket {self}",
99                ResourceWarning,
100                stacklevel=2,
101                source=self,
102            )
103            self.close()
104
105    _repr_cls = "zmq.Socket"
106
107    def __repr__(self):
108        cls = self.__class__
109        # look up _repr_cls on exact class, not inherited
110        _repr_cls = cls.__dict__.get("_repr_cls", None)
111        if _repr_cls is None:
112            _repr_cls = f"{cls.__module__}.{cls.__name__}"
113
114        closed = ' closed' if self._closed else ''
115
116        return f"<{_repr_cls}(zmq.{self._type_name}) at {hex(id(self))}{closed}>"
117
118    # socket as context manager:
119    def __enter__(self):
120        """Sockets are context managers
121
122        .. versionadded:: 14.4
123        """
124        return self
125
126    def __exit__(self, *args, **kwargs):
127        self.close()
128
129    # -------------------------------------------------------------------------
130    # Socket creation
131    # -------------------------------------------------------------------------
132
133    def __copy__(self, memo=None):
134        """Copying a Socket creates a shadow copy"""
135        return self.__class__.shadow(self.underlying)
136
137    __deepcopy__ = __copy__
138
139    @classmethod
140    def shadow(cls, address):
141        """Shadow an existing libzmq socket
142
143        address is the integer address of the libzmq socket
144        or an FFI pointer to it.
145
146        .. versionadded:: 14.1
147        """
148        from zmq.utils.interop import cast_int_addr
149
150        address = cast_int_addr(address)
151        return cls(shadow=address)
152
153    def close(self, linger=None):
154        """
155        Close the socket.
156
157        If linger is specified, LINGER sockopt will be set prior to closing.
158
159        Note: closing a zmq Socket may not close the underlying sockets
160        if there are undelivered messages.
161        Only after all messages are delivered or discarded by reaching the socket's LINGER timeout
162        (default: forever)
163        will the underlying sockets be closed.
164
165        This can be called to close the socket by hand. If this is not
166        called, the socket will automatically be closed when it is
167        garbage collected.
168        """
169        if self.context:
170            self.context._rm_socket(self)
171        super(Socket, self).close(linger=linger)
172
173    # -------------------------------------------------------------------------
174    # Connect/Bind context managers
175    # -------------------------------------------------------------------------
176
177    def _connect_cm(self, addr):
178        """Context manager to disconnect on exit
179
180        .. versionadded:: 20.0
181        """
182        return _SocketContext(self, 'connect', addr)
183
184    def _bind_cm(self, addr):
185        """Context manager to unbind on exit
186
187        .. versionadded:: 20.0
188        """
189        return _SocketContext(self, 'bind', addr)
190
191    def bind(self, addr):
192        """s.bind(addr)
193
194        Bind the socket to an address.
195
196        This causes the socket to listen on a network port. Sockets on the
197        other side of this connection will use ``Socket.connect(addr)`` to
198        connect to this socket.
199
200        Returns a context manager which will call unbind on exit.
201
202        .. versionadded:: 20.0
203            Can be used as a context manager.
204
205        Parameters
206        ----------
207        addr : str
208            The address string. This has the form 'protocol://interface:port',
209            for example 'tcp://127.0.0.1:5555'. Protocols supported include
210            tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
211            encoded to utf-8 first.
212
213        """
214        super().bind(addr)
215        return self._bind_cm(addr)
216
217    def connect(self, addr):
218        """s.connect(addr)
219
220        Connect to a remote 0MQ socket.
221
222        Returns a context manager which will call disconnect on exit.
223
224        .. versionadded:: 20.0
225            Can be used as a context manager.
226
227        Parameters
228        ----------
229        addr : str
230            The address string. This has the form 'protocol://interface:port',
231            for example 'tcp://127.0.0.1:5555'. Protocols supported are
232            tcp, upd, pgm, inproc and ipc. If the address is unicode, it is
233            encoded to utf-8 first.
234
235        """
236        super().connect(addr)
237        return self._connect_cm(addr)
238
239    # -------------------------------------------------------------------------
240    # Deprecated aliases
241    # -------------------------------------------------------------------------
242
243    @property
244    def socket_type(self) -> int:
245        warnings.warn(
246            "Socket.socket_type is deprecated, use Socket.type", DeprecationWarning
247        )
248        return self.type
249
250    # -------------------------------------------------------------------------
251    # Hooks for sockopt completion
252    # -------------------------------------------------------------------------
253
254    def __dir__(self):
255        keys = dir(self.__class__)
256        for collection in (
257            bytes_sockopt_names,
258            int_sockopt_names,
259            int64_sockopt_names,
260            fd_sockopt_names,
261        ):
262            keys.extend(collection)
263        return keys
264
265    # -------------------------------------------------------------------------
266    # Getting/Setting options
267    # -------------------------------------------------------------------------
268    setsockopt = SocketBase.set
269    getsockopt = SocketBase.get
270
271    def __setattr__(self, key, value):
272        """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method"""
273        if key in self.__dict__:
274            object.__setattr__(self, key, value)
275            return
276        _key = key.lower()
277        if _key in ('subscribe', 'unsubscribe'):
278
279            if isinstance(value, unicode):
280                value = value.encode('utf8')
281            if _key == 'subscribe':
282                self.set(zmq.SUBSCRIBE, value)
283            else:
284                self.set(zmq.UNSUBSCRIBE, value)
285            return
286        super(Socket, self).__setattr__(key, value)
287
288    def fileno(self):
289        """Return edge-triggered file descriptor for this socket.
290
291        This is a read-only edge-triggered file descriptor for both read and write events on this socket.
292        It is important that all available events be consumed when an event is detected,
293        otherwise the read event will not trigger again.
294
295        .. versionadded:: 17.0
296        """
297        return self.FD
298
299    def subscribe(self, topic):
300        """Subscribe to a topic
301
302        Only for SUB sockets.
303
304        .. versionadded:: 15.3
305        """
306        if isinstance(topic, unicode):
307            topic = topic.encode('utf8')
308        self.set(zmq.SUBSCRIBE, topic)
309
310    def unsubscribe(self, topic):
311        """Unsubscribe from a topic
312
313        Only for SUB sockets.
314
315        .. versionadded:: 15.3
316        """
317        if isinstance(topic, unicode):
318            topic = topic.encode('utf8')
319        self.set(zmq.UNSUBSCRIBE, topic)
320
321    def set_string(self, option, optval, encoding='utf-8'):
322        """Set socket options with a unicode object.
323
324        This is simply a wrapper for setsockopt to protect from encoding ambiguity.
325
326        See the 0MQ documentation for details on specific options.
327
328        Parameters
329        ----------
330        option : int
331            The name of the option to set. Can be any of: SUBSCRIBE,
332            UNSUBSCRIBE, IDENTITY
333        optval : str
334            The value of the option to set.
335        encoding : str
336            The encoding to be used, default is utf8
337        """
338        if not isinstance(optval, unicode):
339            raise TypeError("unicode strings only")
340        return self.set(option, optval.encode(encoding))
341
342    setsockopt_unicode = setsockopt_string = set_string
343
344    def get_string(self, option, encoding='utf-8'):
345        """Get the value of a socket option.
346
347        See the 0MQ documentation for details on specific options.
348
349        Parameters
350        ----------
351        option : int
352            The option to retrieve.
353
354        Returns
355        -------
356        optval : str
357            The value of the option as a unicode string.
358        """
359
360        if option not in constants.bytes_sockopts:
361            raise TypeError("option %i will not return a string to be decoded" % option)
362        return self.getsockopt(option).decode(encoding)
363
364    getsockopt_unicode = getsockopt_string = get_string
365
366    def bind_to_random_port(self, addr, min_port=49152, max_port=65536, max_tries=100):
367        """Bind this socket to a random port in a range.
368
369        If the port range is unspecified, the system will choose the port.
370
371        Parameters
372        ----------
373        addr : str
374            The address string without the port to pass to ``Socket.bind()``.
375        min_port : int, optional
376            The minimum port in the range of ports to try (inclusive).
377        max_port : int, optional
378            The maximum port in the range of ports to try (exclusive).
379        max_tries : int, optional
380            The maximum number of bind attempts to make.
381
382        Returns
383        -------
384        port : int
385            The port the socket was bound to.
386
387        Raises
388        ------
389        ZMQBindError
390            if `max_tries` reached before successful bind
391        """
392        if (
393            hasattr(constants, 'LAST_ENDPOINT')
394            and min_port == 49152
395            and max_port == 65536
396        ):
397            # if LAST_ENDPOINT is supported, and min_port / max_port weren't specified,
398            # we can bind to port 0 and let the OS do the work
399            self.bind("%s:*" % addr)
400            url = self.last_endpoint.decode('ascii', 'replace')
401            _, port_s = url.rsplit(':', 1)
402            return int(port_s)
403
404        for i in range(max_tries):
405            try:
406                port = random.randrange(min_port, max_port)
407                self.bind('%s:%s' % (addr, port))
408            except ZMQError as exception:
409                en = exception.errno
410                if en == zmq.EADDRINUSE:
411                    continue
412                elif sys.platform == 'win32' and en == errno.EACCES:
413                    continue
414                else:
415                    raise
416            else:
417                return port
418        raise ZMQBindError("Could not bind socket to random port.")
419
420    def get_hwm(self):
421        """Get the High Water Mark.
422
423        On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM
424        """
425        major = zmq.zmq_version_info()[0]
426        if major >= 3:
427            # return sndhwm, fallback on rcvhwm
428            try:
429                return self.getsockopt(zmq.SNDHWM)
430            except zmq.ZMQError:
431                pass
432
433            return self.getsockopt(zmq.RCVHWM)
434        else:
435            return self.getsockopt(zmq.HWM)
436
437    def set_hwm(self, value):
438        """Set the High Water Mark.
439
440        On libzmq ≥ 3, this sets both SNDHWM and RCVHWM
441
442
443        .. warning::
444
445            New values only take effect for subsequent socket
446            bind/connects.
447        """
448        major = zmq.zmq_version_info()[0]
449        if major >= 3:
450            raised = None
451            try:
452                self.sndhwm = value
453            except Exception as e:
454                raised = e
455            try:
456                self.rcvhwm = value
457            except Exception as e:
458                raised = e
459
460            if raised:
461                raise raised
462        else:
463            return self.setsockopt(zmq.HWM, value)
464
465    hwm = property(
466        get_hwm,
467        set_hwm,
468        None,
469        """Property for High Water Mark.
470
471        Setting hwm sets both SNDHWM and RCVHWM as appropriate.
472        It gets SNDHWM if available, otherwise RCVHWM.
473        """,
474    )
475
476    # -------------------------------------------------------------------------
477    # Sending and receiving messages
478    # -------------------------------------------------------------------------
479
480    def send(self, data, flags=0, copy=True, track=False, routing_id=None, group=None):
481        """Send a single zmq message frame on this socket.
482
483        This queues the message to be sent by the IO thread at a later time.
484
485        With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
486        otherwise, this waits until space is available.
487        See :class:`Poller` for more general non-blocking I/O.
488
489        Parameters
490        ----------
491        data : bytes, Frame, memoryview
492            The content of the message. This can be any object that provides
493            the Python buffer API (i.e. `memoryview(data)` can be called).
494        flags : int
495            0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
496        copy : bool
497            Should the message be sent in a copying or non-copying manner.
498        track : bool
499            Should the message be tracked for notification that ZMQ has
500            finished with it? (ignored if copy=True)
501        routing_id : int
502            For use with SERVER sockets
503        group : str
504            For use with RADIO sockets
505
506        Returns
507        -------
508        None : if `copy` or not track
509            None if message was sent, raises an exception otherwise.
510        MessageTracker : if track and not copy
511            a MessageTracker object, whose `pending` property will
512            be True until the send is completed.
513
514        Raises
515        ------
516        TypeError
517            If a unicode object is passed
518        ValueError
519            If `track=True`, but an untracked Frame is passed.
520        ZMQError
521            If the send does not succeed for any reason (including
522            if NOBLOCK is set and the outgoing queue is full).
523
524
525        .. versionchanged:: 17.0
526
527            DRAFT support for routing_id and group arguments.
528        """
529        if routing_id is not None:
530            if not isinstance(data, zmq.Frame):
531                data = zmq.Frame(
532                    data,
533                    track=track,
534                    copy=copy or None,
535                    copy_threshold=self.copy_threshold,
536                )
537            data.routing_id = routing_id
538        if group is not None:
539            if not isinstance(data, zmq.Frame):
540                data = zmq.Frame(
541                    data,
542                    track=track,
543                    copy=copy or None,
544                    copy_threshold=self.copy_threshold,
545                )
546            data.group = group
547        return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
548
549    def send_multipart(self, msg_parts, flags=0, copy=True, track=False, **kwargs):
550        """Send a sequence of buffers as a multipart message.
551
552        The zmq.SNDMORE flag is added to all msg parts before the last.
553
554        Parameters
555        ----------
556        msg_parts : iterable
557            A sequence of objects to send as a multipart message. Each element
558            can be any sendable object (Frame, bytes, buffer-providers)
559        flags : int, optional
560            Any valid flags for :func:`Socket.send`.
561            SNDMORE is added automatically for frames before the last.
562        copy : bool, optional
563            Should the frame(s) be sent in a copying or non-copying manner.
564            If copy=False, frames smaller than self.copy_threshold bytes
565            will be copied anyway.
566        track : bool, optional
567            Should the frame(s) be tracked for notification that ZMQ has
568            finished with it (ignored if copy=True).
569
570        Returns
571        -------
572        None : if copy or not track
573        MessageTracker : if track and not copy
574            a MessageTracker object, whose `pending` property will
575            be True until the last send is completed.
576        """
577        # typecheck parts before sending:
578        for i, msg in enumerate(msg_parts):
579            if isinstance(msg, (zmq.Frame, bytes, memoryview)):
580                continue
581            try:
582                memoryview(msg)
583            except Exception:
584                rmsg = repr(msg)
585                if len(rmsg) > 32:
586                    rmsg = rmsg[:32] + '...'
587                raise TypeError(
588                    "Frame %i (%s) does not support the buffer interface."
589                    % (
590                        i,
591                        rmsg,
592                    )
593                )
594        for msg in msg_parts[:-1]:
595            self.send(msg, SNDMORE | flags, copy=copy, track=track)
596        # Send the last part without the extra SNDMORE flag.
597        return self.send(msg_parts[-1], flags, copy=copy, track=track)
598
599    def recv_multipart(self, flags=0, copy=True, track=False):
600        """Receive a multipart message as a list of bytes or Frame objects
601
602        Parameters
603        ----------
604        flags : int, optional
605            Any valid flags for :func:`Socket.recv`.
606        copy : bool, optional
607            Should the message frame(s) be received in a copying or non-copying manner?
608            If False a Frame object is returned for each part, if True a copy of
609            the bytes is made for each frame.
610        track : bool, optional
611            Should the message frame(s) be tracked for notification that ZMQ has
612            finished with it? (ignored if copy=True)
613
614        Returns
615        -------
616        msg_parts : list
617            A list of frames in the multipart message; either Frames or bytes,
618            depending on `copy`.
619
620        Raises
621        ------
622        ZMQError
623            for any of the reasons :func:`~Socket.recv` might fail
624        """
625        parts = [self.recv(flags, copy=copy, track=track)]
626        # have first part already, only loop while more to receive
627        while self.getsockopt(zmq.RCVMORE):
628            part = self.recv(flags, copy=copy, track=track)
629            parts.append(part)
630
631        return parts
632
633    def _deserialize(self, recvd, load):
634        """Deserialize a received message
635
636        Override in subclass (e.g. Futures) if recvd is not the raw bytes.
637
638        The default implementation expects bytes and returns the deserialized message immediately.
639
640        Parameters
641        ----------
642
643        load: callable
644            Callable that deserializes bytes
645        recvd:
646            The object returned by self.recv
647
648        """
649        return load(recvd)
650
651    def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
652        """Send a message with a custom serialization function.
653
654        .. versionadded:: 17
655
656        Parameters
657        ----------
658        msg : The message to be sent. Can be any object serializable by `serialize`.
659        serialize : callable
660            The serialization function to use.
661            serialize(msg) should return an iterable of sendable message frames
662            (e.g. bytes objects), which will be passed to send_multipart.
663        flags : int, optional
664            Any valid flags for :func:`Socket.send`.
665        copy : bool, optional
666            Whether to copy the frames.
667
668        """
669        frames = serialize(msg)
670        return self.send_multipart(frames, flags=flags, copy=copy, **kwargs)
671
672    def recv_serialized(self, deserialize, flags=0, copy=True):
673        """Receive a message with a custom deserialization function.
674
675        .. versionadded:: 17
676
677        Parameters
678        ----------
679        deserialize : callable
680            The deserialization function to use.
681            deserialize will be called with one argument: the list of frames
682            returned by recv_multipart() and can return any object.
683        flags : int, optional
684            Any valid flags for :func:`Socket.recv`.
685        copy : bool, optional
686            Whether to recv bytes or Frame objects.
687
688        Returns
689        -------
690        obj : object
691            The object returned by the deserialization function.
692
693        Raises
694        ------
695        ZMQError
696            for any of the reasons :func:`~Socket.recv` might fail
697        """
698        frames = self.recv_multipart(flags=flags, copy=copy)
699        return self._deserialize(frames, deserialize)
700
701    def send_string(self, u, flags=0, copy=True, encoding='utf-8', **kwargs):
702        """Send a Python unicode string as a message with an encoding.
703
704        0MQ communicates with raw bytes, so you must encode/decode
705        text (str) around 0MQ.
706
707        Parameters
708        ----------
709        u : str
710            The unicode string to send.
711        flags : int, optional
712            Any valid flags for :func:`Socket.send`.
713        encoding : str [default: 'utf-8']
714            The encoding to be used
715        """
716        if not isinstance(u, str):
717            raise TypeError("str objects only")
718        return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)
719
720    send_unicode = send_string
721
722    def recv_string(self, flags=0, encoding='utf-8'):
723        """Receive a unicode string, as sent by send_string.
724
725        Parameters
726        ----------
727        flags : int
728            Any valid flags for :func:`Socket.recv`.
729        encoding : str [default: 'utf-8']
730            The encoding to be used
731
732        Returns
733        -------
734        s : str
735            The Python unicode string that arrives as encoded bytes.
736
737        Raises
738        ------
739        ZMQError
740            for any of the reasons :func:`~Socket.recv` might fail
741        """
742        msg = self.recv(flags=flags)
743        return self._deserialize(msg, lambda buf: buf.decode(encoding))
744
745    recv_unicode = recv_string
746
747    def send_pyobj(self, obj, flags=0, protocol=DEFAULT_PROTOCOL, **kwargs):
748        """Send a Python object as a message using pickle to serialize.
749
750        Parameters
751        ----------
752        obj : Python object
753            The Python object to send.
754        flags : int
755            Any valid flags for :func:`Socket.send`.
756        protocol : int
757            The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
758            where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
759        """
760        msg = pickle.dumps(obj, protocol)
761        return self.send(msg, flags=flags, **kwargs)
762
763    def recv_pyobj(self, flags=0):
764        """Receive a Python object as a message using pickle to serialize.
765
766        Parameters
767        ----------
768        flags : int
769            Any valid flags for :func:`Socket.recv`.
770
771        Returns
772        -------
773        obj : Python object
774            The Python object that arrives as a message.
775
776        Raises
777        ------
778        ZMQError
779            for any of the reasons :func:`~Socket.recv` might fail
780        """
781        msg = self.recv(flags)
782        return self._deserialize(msg, pickle.loads)
783
784    def send_json(self, obj, flags=0, **kwargs):
785        """Send a Python object as a message using json to serialize.
786
787        Keyword arguments are passed on to json.dumps
788
789        Parameters
790        ----------
791        obj : Python object
792            The Python object to send
793        flags : int
794            Any valid flags for :func:`Socket.send`
795        """
796        send_kwargs = {}
797        for key in ('routing_id', 'group'):
798            if key in kwargs:
799                send_kwargs[key] = kwargs.pop(key)
800        msg = jsonapi.dumps(obj, **kwargs)
801        return self.send(msg, flags=flags, **send_kwargs)
802
803    def recv_json(self, flags=0, **kwargs):
804        """Receive a Python object as a message using json to serialize.
805
806        Keyword arguments are passed on to json.loads
807
808        Parameters
809        ----------
810        flags : int
811            Any valid flags for :func:`Socket.recv`.
812
813        Returns
814        -------
815        obj : Python object
816            The Python object that arrives as a message.
817
818        Raises
819        ------
820        ZMQError
821            for any of the reasons :func:`~Socket.recv` might fail
822        """
823        msg = self.recv(flags)
824        return self._deserialize(msg, lambda buf: jsonapi.loads(buf, **kwargs))
825
826    _poller_class = Poller
827
828    def poll(self, timeout=None, flags=POLLIN):
829        """Poll the socket for events.
830        See :class:`Poller` to wait for multiple sockets at once.
831
832        Parameters
833        ----------
834        timeout : int [default: None]
835            The timeout (in milliseconds) to wait for an event. If unspecified
836            (or specified None), will wait forever for an event.
837        flags : int [default: POLLIN]
838            POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.
839
840        Returns
841        -------
842        event_mask : int
843            The poll event mask (POLLIN, POLLOUT),
844            0 if the timeout was reached without an event.
845        """
846
847        if self.closed:
848            raise ZMQError(ENOTSUP)
849
850        p = self._poller_class()
851        p.register(self, flags)
852        evts = dict(p.poll(timeout))
853        # return 0 if no events, otherwise return event bitfield
854        return evts.get(self, 0)
855
856    def get_monitor_socket(self, events=None, addr=None):
857        """Return a connected PAIR socket ready to receive the event notifications.
858
859        .. versionadded:: libzmq-4.0
860        .. versionadded:: 14.0
861
862        Parameters
863        ----------
864        events : int [default: ZMQ_EVENT_ALL]
865            The bitmask defining which events are wanted.
866        addr :  string [default: None]
867            The optional endpoint for the monitoring sockets.
868
869        Returns
870        -------
871        socket :  (PAIR)
872            The socket is already connected and ready to receive messages.
873        """
874        # safe-guard, method only available on libzmq >= 4
875        if zmq.zmq_version_info() < (4,):
876            raise NotImplementedError(
877                "get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version()
878            )
879
880        # if already monitoring, return existing socket
881        if self._monitor_socket:
882            if self._monitor_socket.closed:
883                self._monitor_socket = None
884            else:
885                return self._monitor_socket
886
887        if addr is None:
888            # create endpoint name from internal fd
889            addr = "inproc://monitor.s-%d" % self.FD
890        if events is None:
891            # use all events
892            events = zmq.EVENT_ALL
893        # attach monitoring socket
894        self.monitor(addr, events)
895        # create new PAIR socket and connect it
896        self._monitor_socket = self.context.socket(zmq.PAIR)
897        self._monitor_socket.connect(addr)
898        return self._monitor_socket
899
900    def disable_monitor(self):
901        """Shutdown the PAIR socket (created using get_monitor_socket)
902        that is serving socket events.
903
904        .. versionadded:: 14.4
905        """
906        self._monitor_socket = None
907        self.monitor(None, 0)
908
909
910__all__ = ['Socket']
911