import errno
import select
import sys
from functools import partial

try:
    from time import monotonic
except ImportError:
    from time import time as monotonic

__all__ = ["NoWayToWaitForSocketError", "wait_for_read", "wait_for_write"]


class NoWayToWaitForSocketError(Exception):
    """Raised when neither a working poll() nor select() is available."""


# How should we wait on sockets?
#
# There are two types of APIs you can use for waiting on sockets: the fancy
# modern stateful APIs like epoll/kqueue, and the older stateless APIs like
# select/poll. The stateful APIs are more efficient when you have lots of
# sockets to keep track of, because you can set them up once and then use them
# lots of times. But we only ever want to wait on a single socket at a time
# and don't want to keep track of state, so the stateless APIs are actually
# more efficient. So we want to use select() or poll().
#
# Now, how do we choose between select() and poll()? On traditional Unixes,
# select() has a strange calling convention that makes it slow, or fail
# altogether, for high-numbered file descriptors. The point of poll() is to fix
# that, so on Unixes, we prefer poll().
#
# On Windows, there is no poll() (or at least Python doesn't provide a wrapper
# for it), but that's OK, because on Windows, select() doesn't have this
# strange calling convention; plain select() works fine.
#
# So: on Windows we use select(), and everywhere else we use poll(). We also
# fall back to select() in case poll() is somehow broken or missing.

if sys.version_info >= (3, 5):
    # Modern Python, that retries syscalls by default
    def _retry_on_intr(fn, timeout):
        """Call ``fn(timeout)`` once and return its result.

        No retry loop is needed here: on these Python versions interrupted
        syscalls are retried by default.
        """
        return fn(timeout)


else:
    # Old and broken Pythons.
    def _retry_on_intr(fn, timeout):
        """Call ``fn(timeout)``, retrying whenever it fails with EINTR.

        ``timeout`` is in seconds, or None to wait forever. On each retry the
        timeout is shrunk to whatever is left until the original deadline
        (never below zero). Errors other than EINTR are re-raised unchanged.
        """
        if timeout is None:
            deadline = float("inf")
        else:
            deadline = monotonic() + timeout

        while True:
            try:
                return fn(timeout)
            # OSError for 3 <= pyver < 3.5, select.error for pyver <= 2.7
            except (OSError, select.error) as e:
                # 'e.args[0]' incantation works for both OSError and select.error
                if e.args[0] != errno.EINTR:
                    raise
                else:
                    timeout = deadline - monotonic()
                    if timeout < 0:
                        timeout = 0
                    # An infinite deadline means "no timeout"; hand None back
                    # to fn rather than float("inf").
                    if timeout == float("inf"):
                        timeout = None
                    continue


def select_wait_for_socket(sock, read=False, write=False, timeout=None):
    """Wait on ``sock`` using select.select().

    :param sock: the socket to wait on.
    :param read: wait until the socket is readable.
    :param write: wait until the socket is writable.
    :param timeout: seconds to wait, or None to wait indefinitely.
    :returns: True if select() reported the socket in any of its result
        lists, False if the timeout expired first.
    :raises RuntimeError: if neither ``read`` nor ``write`` is set.
    """
    if not read and not write:
        raise RuntimeError("must specify at least one of read=True, write=True")
    rcheck = []
    wcheck = []
    if read:
        rcheck.append(sock)
    if write:
        wcheck.append(sock)
    # When doing a non-blocking connect, most systems signal success by
    # marking the socket writable. Windows, though, signals success by marking
    # it as "exceptional". We paper over the difference by checking the write
    # sockets for both conditions. (The stdlib selectors module does the same
    # thing.)
    fn = partial(select.select, rcheck, wcheck, wcheck)
    rready, wready, xready = _retry_on_intr(fn, timeout)
    return bool(rready or wready or xready)


def poll_wait_for_socket(sock, read=False, write=False, timeout=None):
    """Wait on ``sock`` using select.poll().

    :param sock: the socket to wait on.
    :param read: wait for POLLIN.
    :param write: wait for POLLOUT.
    :param timeout: seconds to wait, or None to wait indefinitely.
    :returns: True if poll() reported any event for the socket, False if the
        timeout expired first.
    :raises RuntimeError: if neither ``read`` nor ``write`` is set.
    """
    if not read and not write:
        raise RuntimeError("must specify at least one of read=True, write=True")
    mask = 0
    if read:
        mask |= select.POLLIN
    if write:
        mask |= select.POLLOUT
    poll_obj = select.poll()
    poll_obj.register(sock, mask)

    # For some reason, poll() takes timeout in milliseconds
    def do_poll(t):
        if t is not None:
            t *= 1000
        return poll_obj.poll(t)

    return bool(_retry_on_intr(do_poll, timeout))


def null_wait_for_socket(*args, **kwargs):
    """Fallback used when no waiting primitive exists; always raises.

    :raises NoWayToWaitForSocketError: unconditionally.
    """
    raise NoWayToWaitForSocketError("no select-equivalent available")


def _have_working_poll():
    """Return True if select.poll exists and a zero-timeout poll succeeds."""
    # Apparently some systems have a select.poll that fails as soon as you try
    # to use it, either due to strange configuration or broken monkeypatching
    # from libraries like eventlet/greenlet.
    try:
        poll_obj = select.poll()
        _retry_on_intr(poll_obj.poll, 0)
    except (AttributeError, OSError):
        return False
    else:
        return True


def wait_for_socket(*args, **kwargs):
    """Pick a wait implementation on first use, then delegate to it.

    The first call rebinds the module-level name ``wait_for_socket`` to
    ``poll_wait_for_socket``, ``select_wait_for_socket`` or
    ``null_wait_for_socket`` (in that order of preference), so later calls go
    straight to the chosen implementation. Arguments and return value are
    those of the chosen implementation.
    """
    # We delay choosing which implementation to use until the first time we're
    # called. We could do it at import time, but then we might make the wrong
    # decision if someone goes wild with monkeypatching select.poll after
    # we're imported.
    global wait_for_socket
    if _have_working_poll():
        wait_for_socket = poll_wait_for_socket
    elif hasattr(select, "select"):
        wait_for_socket = select_wait_for_socket
    else:  # Platform-specific: Appengine.
        wait_for_socket = null_wait_for_socket
    return wait_for_socket(*args, **kwargs)


def wait_for_read(sock, timeout=None):
    """Waits for reading to be available on a given socket.
    Returns True if the socket is readable, or False if the timeout expired.
    """
    return wait_for_socket(sock, read=True, timeout=timeout)


def wait_for_write(sock, timeout=None):
    """Waits for writing to be available on a given socket.
    Returns True if the socket is writable, or False if the timeout expired.
    """
    return wait_for_socket(sock, write=True, timeout=timeout)