# coding: utf-8
"""0MQ Socket pure Python methods."""

# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.


import errno
import random
import sys
import warnings

import zmq
from zmq.backend import Socket as SocketBase
from .poll import Poller
from . import constants
from .attrsettr import AttributeSetter
from zmq.error import ZMQError, ZMQBindError
from zmq.utils import jsonapi
from zmq.utils.strtypes import bytes, unicode


from .constants import (
    SNDMORE,
    ENOTSUP,
    POLLIN,
    int64_sockopt_names,
    int_sockopt_names,
    bytes_sockopt_names,
    fd_sockopt_names,
    socket_types,
)
import pickle

# Default pickle protocol used by Socket.send_pyobj: prefer the interpreter's
# DEFAULT_PROTOCOL and fall back to HIGHEST_PROTOCOL where it is not defined.
try:
    DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL
except AttributeError:
    DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL


class _SocketContext:
    """Context Manager for socket bind/unbind

    Returned by :meth:`Socket.bind` and :meth:`Socket.connect`.
    Entering yields the socket; exiting undoes the bind/connect
    unless the socket has already been closed.
    """

    def __repr__(self):
        return f"<SocketContext({self.kind}={self.addr!r})>"

    def __init__(self, socket, kind, addr):
        """Record the socket, the operation kind and the address.

        Parameters
        ----------
        socket : Socket
            The socket on which bind/connect was called.
        kind : str
            Either ``"bind"`` or ``"connect"``.
        addr : str
            The address passed to bind/connect.
        """
        # internal invariant: this class is only created by Socket itself
        assert kind in {"bind", "connect"}
        self.socket = socket
        self.kind = kind
        self.addr = addr

    def __enter__(self):
        """Return the socket itself."""
        return self.socket

    def __exit__(self, *args):
        """Unbind or disconnect the address, unless the socket is closed."""
        if self.socket.closed:
            # nothing to undo on a closed socket
            return
        if self.kind == "bind":
            self.socket.unbind(self.addr)
        elif self.kind == "connect":
            self.socket.disconnect(self.addr)


class Socket(SocketBase, AttributeSetter):
    """The ZMQ socket object

    To create a Socket, first create a Context::

        ctx = zmq.Context.instance()

    then call ``ctx.socket(socket_type)``::

        s = ctx.socket(zmq.ROUTER)

    """

    # class-level defaults, overridden per instance in __init__
    _shadow = False
    _monitor_socket = None
    _type_name = 'UNKNOWN'

    def __init__(self, *a, **kw):
        """Create the socket and record shadow status and type name.

        All arguments are forwarded to the backend Socket constructor.
        """
        super().__init__(*a, **kw)
        # sockets created with shadow=... are flagged so that
        # __del__ neither warns about them nor closes them
        self._shadow = 'shadow' in kw
        try:
            socket_type = self.get(zmq.TYPE)
        except Exception:
            # best effort only: keep the class default 'UNKNOWN' for repr
            pass
        else:
            self._type_name = socket_types.get(socket_type, str(socket_type))

    def __del__(self):
        """Warn about and close a non-shadow socket that was never closed."""
        if not self._shadow and not self.closed:
            warnings.warn(
                f"unclosed socket {self}",
                ResourceWarning,
                stacklevel=2,
                source=self,
            )
            self.close()

    _repr_cls = "zmq.Socket"

    def __repr__(self):
        cls = self.__class__
        # look up _repr_cls on exact class, not inherited
        _repr_cls = cls.__dict__.get("_repr_cls", None)
        if _repr_cls is None:
            _repr_cls = f"{cls.__module__}.{cls.__name__}"

        closed = ' closed' if self._closed else ''

        return f"<{_repr_cls}(zmq.{self._type_name}) at {hex(id(self))}{closed}>"

    # socket as context manager:
    def __enter__(self):
        """Sockets are context managers

        .. versionadded:: 14.4
        """
        return self

    def __exit__(self, *args, **kwargs):
        """Close the socket when leaving the ``with`` block."""
        self.close()

    # -------------------------------------------------------------------------
    # Socket creation
    # -------------------------------------------------------------------------

    def __copy__(self, memo=None):
        """Copying a Socket creates a shadow copy"""
        return self.__class__.shadow(self.underlying)

    # deepcopy also produces a shadow (memo is accepted and unused)
    __deepcopy__ = __copy__

    @classmethod
    def shadow(cls, address):
        """Shadow an existing libzmq socket

        address is the integer address of the libzmq socket
        or an FFI pointer to it.

        .. versionadded:: 14.1
        """
        from zmq.utils.interop import cast_int_addr

        address = cast_int_addr(address)
        return cls(shadow=address)

    def close(self, linger=None):
        """
        Close the socket.

        If linger is specified, LINGER sockopt will be set prior to closing.

        Note: closing a zmq Socket may not close the underlying sockets
        if there are undelivered messages.
        Only after all messages are delivered or discarded by reaching the socket's LINGER timeout
        (default: forever)
        will the underlying sockets be closed.

        This can be called to close the socket by hand. If this is not
        called, the socket will automatically be closed when it is
        garbage collected.
        """
        if self.context:
            # deregister from the owning context before closing
            self.context._rm_socket(self)
        super(Socket, self).close(linger=linger)

    # -------------------------------------------------------------------------
    # Connect/Bind context managers
    # -------------------------------------------------------------------------

    def _connect_cm(self, addr):
        """Context manager to disconnect on exit

        .. versionadded:: 20.0
        """
        return _SocketContext(self, 'connect', addr)

    def _bind_cm(self, addr):
        """Context manager to unbind on exit

        .. versionadded:: 20.0
        """
        return _SocketContext(self, 'bind', addr)

    def bind(self, addr):
        """s.bind(addr)

        Bind the socket to an address.

        This causes the socket to listen on a network port. Sockets on the
        other side of this connection will use ``Socket.connect(addr)`` to
        connect to this socket.

        Returns a context manager which will call unbind on exit.

        .. versionadded:: 20.0
            Can be used as a context manager.

        Parameters
        ----------
        addr : str
            The address string. This has the form 'protocol://interface:port',
            for example 'tcp://127.0.0.1:5555'. Protocols supported include
            tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
            encoded to utf-8 first.

        """
        super().bind(addr)
        return self._bind_cm(addr)

    def connect(self, addr):
        """s.connect(addr)

        Connect to a remote 0MQ socket.

        Returns a context manager which will call disconnect on exit.

        .. versionadded:: 20.0
            Can be used as a context manager.

        Parameters
        ----------
        addr : str
            The address string. This has the form 'protocol://interface:port',
            for example 'tcp://127.0.0.1:5555'. Protocols supported are
            tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
            encoded to utf-8 first.

        """
        super().connect(addr)
        return self._connect_cm(addr)

    # -------------------------------------------------------------------------
    # Deprecated aliases
    # -------------------------------------------------------------------------

    @property
    def socket_type(self) -> int:
        """Deprecated alias for :attr:`type`; emits a DeprecationWarning."""
        warnings.warn(
            "Socket.socket_type is deprecated, use Socket.type",
            DeprecationWarning,
            # attribute the warning to the code reading the property
            stacklevel=2,
        )
        return self.type

    # -------------------------------------------------------------------------
    # Hooks for sockopt completion
    # -------------------------------------------------------------------------

    def __dir__(self):
        """Return the class's attribute names plus all socket option names."""
        keys = dir(self.__class__)
        for collection in (
            bytes_sockopt_names,
            int_sockopt_names,
            int64_sockopt_names,
            fd_sockopt_names,
        ):
            keys.extend(collection)
        return keys

    # -------------------------------------------------------------------------
    # Getting/Setting options
    # -------------------------------------------------------------------------
    setsockopt = SocketBase.set
    getsockopt = SocketBase.get

    def __setattr__(self, key, value):
        """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method"""
        if key in self.__dict__:
            # existing instance attributes are plain Python attributes
            object.__setattr__(self, key, value)
            return
        _key = key.lower()
        if _key in ('subscribe', 'unsubscribe'):

            if isinstance(value, unicode):
                value = value.encode('utf8')
            if _key == 'subscribe':
                self.set(zmq.SUBSCRIBE, value)
            else:
                self.set(zmq.UNSUBSCRIBE, value)
            return
        super(Socket, self).__setattr__(key, value)

    def fileno(self):
        """Return edge-triggered file descriptor for this socket.

        This is a read-only edge-triggered file descriptor for both read and write events on this socket.
        It is important that all available events be consumed when an event is detected,
        otherwise the read event will not trigger again.

        .. versionadded:: 17.0
        """
        return self.FD

    def subscribe(self, topic):
        """Subscribe to a topic

        Only for SUB sockets.

        .. versionadded:: 15.3
        """
        if isinstance(topic, unicode):
            topic = topic.encode('utf8')
        self.set(zmq.SUBSCRIBE, topic)

    def unsubscribe(self, topic):
        """Unsubscribe from a topic

        Only for SUB sockets.

        .. versionadded:: 15.3
        """
        if isinstance(topic, unicode):
            topic = topic.encode('utf8')
        self.set(zmq.UNSUBSCRIBE, topic)

    def set_string(self, option, optval, encoding='utf-8'):
        """Set socket options with a unicode object.

        This is simply a wrapper for setsockopt to protect from encoding ambiguity.

        See the 0MQ documentation for details on specific options.

        Parameters
        ----------
        option : int
            The name of the option to set. Can be any of: SUBSCRIBE,
            UNSUBSCRIBE, IDENTITY
        optval : str
            The value of the option to set.
        encoding : str
            The encoding to be used, default is utf8

        Raises
        ------
        TypeError
            If `optval` is not a unicode string.
        """
        if not isinstance(optval, unicode):
            raise TypeError("unicode strings only")
        return self.set(option, optval.encode(encoding))

    setsockopt_unicode = setsockopt_string = set_string

    def get_string(self, option, encoding='utf-8'):
        """Get the value of a socket option.

        See the 0MQ documentation for details on specific options.

        Parameters
        ----------
        option : int
            The option to retrieve.
        encoding : str
            The encoding used to decode the option value, default is utf-8

        Returns
        -------
        optval : str
            The value of the option as a unicode string.

        Raises
        ------
        TypeError
            If `option` is not one of the bytes-valued socket options.
        """

        if option not in constants.bytes_sockopts:
            raise TypeError("option %i will not return a string to be decoded" % option)
        return self.getsockopt(option).decode(encoding)

    getsockopt_unicode = getsockopt_string = get_string

    def bind_to_random_port(self, addr, min_port=49152, max_port=65536, max_tries=100):
        """Bind this socket to a random port in a range.

        If the port range is unspecified, the system will choose the port.

        Parameters
        ----------
        addr : str
            The address string without the port to pass to ``Socket.bind()``.
        min_port : int, optional
            The minimum port in the range of ports to try (inclusive).
        max_port : int, optional
            The maximum port in the range of ports to try (exclusive).
        max_tries : int, optional
            The maximum number of bind attempts to make.

        Returns
        -------
        port : int
            The port the socket was bound to.

        Raises
        ------
        ZMQBindError
            if `max_tries` reached before successful bind
        """
        if (
            hasattr(constants, 'LAST_ENDPOINT')
            and min_port == 49152
            and max_port == 65536
        ):
            # if LAST_ENDPOINT is supported, and min_port / max_port weren't specified,
            # we can bind to port 0 and let the OS do the work
            self.bind("%s:*" % addr)
            url = self.last_endpoint.decode('ascii', 'replace')
            _, port_s = url.rsplit(':', 1)
            return int(port_s)

        for _ in range(max_tries):
            # chosen outside the try: only bind() can raise ZMQError
            port = random.randrange(min_port, max_port)
            try:
                self.bind('%s:%s' % (addr, port))
            except ZMQError as exception:
                en = exception.errno
                if en == zmq.EADDRINUSE:
                    # port taken: try another one
                    continue
                elif sys.platform == 'win32' and en == errno.EACCES:
                    # on Windows, EACCES is also treated as "try another port"
                    continue
                else:
                    raise
            else:
                return port
        raise ZMQBindError("Could not bind socket to random port.")

    def get_hwm(self):
        """Get the High Water Mark.

        On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM
        """
        major = zmq.zmq_version_info()[0]
        if major >= 3:
            # return sndhwm, fallback on rcvhwm
            try:
                return self.getsockopt(zmq.SNDHWM)
            except zmq.ZMQError:
                pass

            return self.getsockopt(zmq.RCVHWM)
        else:
            return self.getsockopt(zmq.HWM)

    def set_hwm(self, value):
        """Set the High Water Mark.

        On libzmq ≥ 3, this sets both SNDHWM and RCVHWM


        .. warning::

            New values only take effect for subsequent socket
            bind/connects.
        """
        major = zmq.zmq_version_info()[0]
        if major >= 3:
            # attempt both options even if the first fails;
            # the most recent failure (if any) is re-raised afterwards
            raised = None
            try:
                self.sndhwm = value
            except Exception as e:
                raised = e
            try:
                self.rcvhwm = value
            except Exception as e:
                raised = e

            if raised:
                raise raised
        else:
            return self.setsockopt(zmq.HWM, value)

    hwm = property(
        get_hwm,
        set_hwm,
        None,
        """Property for High Water Mark.

        Setting hwm sets both SNDHWM and RCVHWM as appropriate.
        It gets SNDHWM if available, otherwise RCVHWM.
        """,
    )

    # -------------------------------------------------------------------------
    # Sending and receiving messages
    # -------------------------------------------------------------------------

    def send(self, data, flags=0, copy=True, track=False, routing_id=None, group=None):
        """Send a single zmq message frame on this socket.

        This queues the message to be sent by the IO thread at a later time.

        With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
        otherwise, this waits until space is available.
        See :class:`Poller` for more general non-blocking I/O.

        Parameters
        ----------
        data : bytes, Frame, memoryview
            The content of the message. This can be any object that provides
            the Python buffer API (i.e. `memoryview(data)` can be called).
        flags : int
            0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
        copy : bool
            Should the message be sent in a copying or non-copying manner.
        track : bool
            Should the message be tracked for notification that ZMQ has
            finished with it? (ignored if copy=True)
        routing_id : int
            For use with SERVER sockets
        group : str
            For use with RADIO sockets

        Returns
        -------
        None : if `copy` or not track
            None if message was sent, raises an exception otherwise.
        MessageTracker : if track and not copy
            a MessageTracker object, whose `pending` property will
            be True until the send is completed.

        Raises
        ------
        TypeError
            If a unicode object is passed
        ValueError
            If `track=True`, but an untracked Frame is passed.
        ZMQError
            If the send does not succeed for any reason (including
            if NOBLOCK is set and the outgoing queue is full).


        .. versionchanged:: 17.0

            DRAFT support for routing_id and group arguments.
        """
        # routing_id and group are attributes of a Frame,
        # so wrap raw data in a Frame before setting them
        if routing_id is not None:
            if not isinstance(data, zmq.Frame):
                data = zmq.Frame(
                    data,
                    track=track,
                    copy=copy or None,
                    copy_threshold=self.copy_threshold,
                )
            data.routing_id = routing_id
        if group is not None:
            if not isinstance(data, zmq.Frame):
                data = zmq.Frame(
                    data,
                    track=track,
                    copy=copy or None,
                    copy_threshold=self.copy_threshold,
                )
            data.group = group
        return super(Socket, self).send(data, flags=flags, copy=copy, track=track)

    def send_multipart(self, msg_parts, flags=0, copy=True, track=False, **kwargs):
        """Send a sequence of buffers as a multipart message.

        The zmq.SNDMORE flag is added to all msg parts before the last.

        Parameters
        ----------
        msg_parts : iterable
            A sequence of objects to send as a multipart message. Each element
            can be any sendable object (Frame, bytes, buffer-providers)
        flags : int, optional
            Any valid flags for :func:`Socket.send`.
            SNDMORE is added automatically for frames before the last.
        copy : bool, optional
            Should the frame(s) be sent in a copying or non-copying manner.
            If copy=False, frames smaller than self.copy_threshold bytes
            will be copied anyway.
        track : bool, optional
            Should the frame(s) be tracked for notification that ZMQ has
            finished with it (ignored if copy=True).
        **kwargs
            Accepted for signature compatibility; not forwarded to
            :meth:`send` by this implementation.

        Returns
        -------
        None : if copy or not track
        MessageTracker : if track and not copy
            a MessageTracker object, whose `pending` property will
            be True until the last send is completed.

        Raises
        ------
        TypeError
            If any part does not support the buffer interface.
            Nothing is sent in that case.
        IndexError
            If `msg_parts` is empty.
        """
        # The parts are iterated once for validation and then sliced/indexed
        # for sending, so one-shot iterables (e.g. generators) must be
        # materialized first. Lists and tuples are used as-is.
        if not isinstance(msg_parts, (list, tuple)):
            msg_parts = list(msg_parts)

        # typecheck parts before sending:
        for i, msg in enumerate(msg_parts):
            if isinstance(msg, (zmq.Frame, bytes, memoryview)):
                continue
            try:
                memoryview(msg)
            except Exception:
                rmsg = repr(msg)
                if len(rmsg) > 32:
                    rmsg = rmsg[:32] + '...'
                raise TypeError(
                    "Frame %i (%s) does not support the buffer interface."
                    % (
                        i,
                        rmsg,
                    )
                )
        for msg in msg_parts[:-1]:
            self.send(msg, SNDMORE | flags, copy=copy, track=track)
        # Send the last part without the extra SNDMORE flag.
        return self.send(msg_parts[-1], flags, copy=copy, track=track)

    def recv_multipart(self, flags=0, copy=True, track=False):
        """Receive a multipart message as a list of bytes or Frame objects

        Parameters
        ----------
        flags : int, optional
            Any valid flags for :func:`Socket.recv`.
        copy : bool, optional
            Should the message frame(s) be received in a copying or non-copying manner?
            If False a Frame object is returned for each part, if True a copy of
            the bytes is made for each frame.
        track : bool, optional
            Should the message frame(s) be tracked for notification that ZMQ has
            finished with it? (ignored if copy=True)

        Returns
        -------
        msg_parts : list
            A list of frames in the multipart message; either Frames or bytes,
            depending on `copy`.

        Raises
        ------
        ZMQError
            for any of the reasons :func:`~Socket.recv` might fail
        """
        parts = [self.recv(flags, copy=copy, track=track)]
        # have first part already, only loop while more to receive
        while self.getsockopt(zmq.RCVMORE):
            part = self.recv(flags, copy=copy, track=track)
            parts.append(part)

        return parts

    def _deserialize(self, recvd, load):
        """Deserialize a received message

        Override in subclass (e.g. Futures) if recvd is not the raw bytes.

        The default implementation expects bytes and returns the deserialized message immediately.

        Parameters
        ----------

        load: callable
            Callable that deserializes bytes
        recvd:
            The object returned by self.recv

        """
        return load(recvd)

    def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
        """Send a message with a custom serialization function.

        .. versionadded:: 17

        Parameters
        ----------
        msg : The message to be sent. Can be any object serializable by `serialize`.
        serialize : callable
            The serialization function to use.
            serialize(msg) should return an iterable of sendable message frames
            (e.g. bytes objects), which will be passed to send_multipart.
        flags : int, optional
            Any valid flags for :func:`Socket.send`.
        copy : bool, optional
            Whether to copy the frames.

        """
        frames = serialize(msg)
        return self.send_multipart(frames, flags=flags, copy=copy, **kwargs)

    def recv_serialized(self, deserialize, flags=0, copy=True):
        """Receive a message with a custom deserialization function.

        .. versionadded:: 17

        Parameters
        ----------
        deserialize : callable
            The deserialization function to use.
            deserialize will be called with one argument: the list of frames
            returned by recv_multipart() and can return any object.
        flags : int, optional
            Any valid flags for :func:`Socket.recv`.
        copy : bool, optional
            Whether to recv bytes or Frame objects.

        Returns
        -------
        obj : object
            The object returned by the deserialization function.

        Raises
        ------
        ZMQError
            for any of the reasons :func:`~Socket.recv` might fail
        """
        frames = self.recv_multipart(flags=flags, copy=copy)
        return self._deserialize(frames, deserialize)

    def send_string(self, u, flags=0, copy=True, encoding='utf-8', **kwargs):
        """Send a Python unicode string as a message with an encoding.

        0MQ communicates with raw bytes, so you must encode/decode
        text (str) around 0MQ.

        Parameters
        ----------
        u : str
            The unicode string to send.
        flags : int, optional
            Any valid flags for :func:`Socket.send`.
        encoding : str [default: 'utf-8']
            The encoding to be used

        Raises
        ------
        TypeError
            If `u` is not a str.
        """
        if not isinstance(u, str):
            raise TypeError("str objects only")
        return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)

    send_unicode = send_string

    def recv_string(self, flags=0, encoding='utf-8'):
        """Receive a unicode string, as sent by send_string.

        Parameters
        ----------
        flags : int
            Any valid flags for :func:`Socket.recv`.
        encoding : str [default: 'utf-8']
            The encoding to be used

        Returns
        -------
        s : str
            The Python unicode string that arrives as encoded bytes.

        Raises
        ------
        ZMQError
            for any of the reasons :func:`~Socket.recv` might fail
        """
        msg = self.recv(flags=flags)
        return self._deserialize(msg, lambda buf: buf.decode(encoding))

    recv_unicode = recv_string

    def send_pyobj(self, obj, flags=0, protocol=DEFAULT_PROTOCOL, **kwargs):
        """Send a Python object as a message using pickle to serialize.

        Additional keyword arguments are passed on to :func:`Socket.send`.

        Parameters
        ----------
        obj : Python object
            The Python object to send.
        flags : int
            Any valid flags for :func:`Socket.send`.
        protocol : int
            The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
            where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
        """
        msg = pickle.dumps(obj, protocol)
        return self.send(msg, flags=flags, **kwargs)

    def recv_pyobj(self, flags=0):
        """Receive a Python object as a message using pickle to serialize.

        .. warning::

            The received bytes are passed to ``pickle.loads``, which can
            execute arbitrary code. Only use this with trusted peers.

        Parameters
        ----------
        flags : int
            Any valid flags for :func:`Socket.recv`.

        Returns
        -------
        obj : Python object
            The Python object that arrives as a message.

        Raises
        ------
        ZMQError
            for any of the reasons :func:`~Socket.recv` might fail
        """
        msg = self.recv(flags)
        # SECURITY: unpickling data from an untrusted peer is unsafe
        return self._deserialize(msg, pickle.loads)

    def send_json(self, obj, flags=0, **kwargs):
        """Send a Python object as a message using json to serialize.

        Keyword arguments are passed on to json.dumps

        Parameters
        ----------
        obj : Python object
            The Python object to send
        flags : int
            Any valid flags for :func:`Socket.send`
        """
        # routing_id and group belong to send(), everything else to dumps()
        send_kwargs = {}
        for key in ('routing_id', 'group'):
            if key in kwargs:
                send_kwargs[key] = kwargs.pop(key)
        msg = jsonapi.dumps(obj, **kwargs)
        return self.send(msg, flags=flags, **send_kwargs)

    def recv_json(self, flags=0, **kwargs):
        """Receive a Python object as a message using json to serialize.

        Keyword arguments are passed on to json.loads

        Parameters
        ----------
        flags : int
            Any valid flags for :func:`Socket.recv`.

        Returns
        -------
        obj : Python object
            The Python object that arrives as a message.

        Raises
        ------
        ZMQError
            for any of the reasons :func:`~Socket.recv` might fail
        """
        msg = self.recv(flags)
        return self._deserialize(msg, lambda buf: jsonapi.loads(buf, **kwargs))

    # Poller implementation used by poll(); kept as a class attribute
    # so it can be replaced on a subclass.
    _poller_class = Poller

    def poll(self, timeout=None, flags=POLLIN):
        """Poll the socket for events.
        See :class:`Poller` to wait for multiple sockets at once.

        Parameters
        ----------
        timeout : int [default: None]
            The timeout (in milliseconds) to wait for an event. If unspecified
            (or specified None), will wait forever for an event.
        flags : int [default: POLLIN]
            POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.

        Returns
        -------
        event_mask : int
            The poll event mask (POLLIN, POLLOUT),
            0 if the timeout was reached without an event.

        Raises
        ------
        ZMQError
            With ENOTSUP if the socket is closed.
        """

        if self.closed:
            raise ZMQError(ENOTSUP)

        p = self._poller_class()
        p.register(self, flags)
        evts = dict(p.poll(timeout))
        # return 0 if no events, otherwise return event bitfield
        return evts.get(self, 0)

    def get_monitor_socket(self, events=None, addr=None):
        """Return a connected PAIR socket ready to receive the event notifications.

        .. versionadded:: libzmq-4.0
        .. versionadded:: 14.0

        Parameters
        ----------
        events : int [default: ZMQ_EVENT_ALL]
            The bitmask defining which events are wanted.
        addr : string [default: None]
            The optional endpoint for the monitoring sockets.

        Returns
        -------
        socket : (PAIR)
            The socket is already connected and ready to receive messages.

        Raises
        ------
        NotImplementedError
            If the linked libzmq is older than version 4.
        """
        # safe-guard, method only available on libzmq >= 4
        if zmq.zmq_version_info() < (4,):
            raise NotImplementedError(
                "get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version()
            )

        # if already monitoring, return existing socket
        if self._monitor_socket:
            if self._monitor_socket.closed:
                self._monitor_socket = None
            else:
                return self._monitor_socket

        if addr is None:
            # create endpoint name from internal fd
            addr = "inproc://monitor.s-%d" % self.FD
        if events is None:
            # use all events
            events = zmq.EVENT_ALL
        # attach monitoring socket
        self.monitor(addr, events)
        # create new PAIR socket and connect it
        self._monitor_socket = self.context.socket(zmq.PAIR)
        self._monitor_socket.connect(addr)
        return self._monitor_socket

    def disable_monitor(self):
        """Shutdown the PAIR socket (created using get_monitor_socket)
        that is serving socket events.

        .. versionadded:: 14.4
        """
        # drop our reference to the monitor socket, then stop monitoring
        self._monitor_socket = None
        self.monitor(None, 0)


__all__ = ['Socket']