1"""Selector Utilities.""" 2from __future__ import absolute_import, unicode_literals 3 4import errno 5import math 6import select as __select__ 7import socket 8import sys 9 10from numbers import Integral 11 12from . import fileno 13from .compat import detect_environment 14 15__all__ = ('poll',) 16 17_selectf = __select__.select 18_selecterr = __select__.error 19xpoll = getattr(__select__, 'poll', None) 20epoll = getattr(__select__, 'epoll', None) 21kqueue = getattr(__select__, 'kqueue', None) 22kevent = getattr(__select__, 'kevent', None) 23KQ_EV_ADD = getattr(__select__, 'KQ_EV_ADD', 1) 24KQ_EV_DELETE = getattr(__select__, 'KQ_EV_DELETE', 2) 25KQ_EV_ENABLE = getattr(__select__, 'KQ_EV_ENABLE', 4) 26KQ_EV_CLEAR = getattr(__select__, 'KQ_EV_CLEAR', 32) 27KQ_EV_ERROR = getattr(__select__, 'KQ_EV_ERROR', 16384) 28KQ_EV_EOF = getattr(__select__, 'KQ_EV_EOF', 32768) 29KQ_FILTER_READ = getattr(__select__, 'KQ_FILTER_READ', -1) 30KQ_FILTER_WRITE = getattr(__select__, 'KQ_FILTER_WRITE', -2) 31KQ_FILTER_AIO = getattr(__select__, 'KQ_FILTER_AIO', -3) 32KQ_FILTER_VNODE = getattr(__select__, 'KQ_FILTER_VNODE', -4) 33KQ_FILTER_PROC = getattr(__select__, 'KQ_FILTER_PROC', -5) 34KQ_FILTER_SIGNAL = getattr(__select__, 'KQ_FILTER_SIGNAL', -6) 35KQ_FILTER_TIMER = getattr(__select__, 'KQ_FILTER_TIMER', -7) 36KQ_NOTE_LOWAT = getattr(__select__, 'KQ_NOTE_LOWAT', 1) 37KQ_NOTE_DELETE = getattr(__select__, 'KQ_NOTE_DELETE', 1) 38KQ_NOTE_WRITE = getattr(__select__, 'KQ_NOTE_WRITE', 2) 39KQ_NOTE_EXTEND = getattr(__select__, 'KQ_NOTE_EXTEND', 4) 40KQ_NOTE_ATTRIB = getattr(__select__, 'KQ_NOTE_ATTRIB', 8) 41KQ_NOTE_LINK = getattr(__select__, 'KQ_NOTE_LINK', 16) 42KQ_NOTE_RENAME = getattr(__select__, 'KQ_NOTE_RENAME', 32) 43KQ_NOTE_REVOKE = getattr(__select__, 'KQ_NOTE_REVOKE', 64) 44POLLIN = getattr(__select__, 'POLLIN', 1) 45POLLOUT = getattr(__select__, 'POLLOUT', 4) 46POLLERR = getattr(__select__, 'POLLERR', 8) 47POLLHUP = getattr(__select__, 'POLLHUP', 16) 48POLLNVAL = getattr(__select__, 'POLLNVAL', 32) 49 50READ = POLL_READ = 0x001 51WRITE = POLL_WRITE = 0x004 52ERR = POLL_ERR = 0x008 | 0x010 53 54try: 55 SELECT_BAD_FD = {errno.EBADF, errno.WSAENOTSOCK} 56except AttributeError: 57 SELECT_BAD_FD = {errno.EBADF} 58 59 60class _epoll(object): 61 62 def __init__(self): 63 self._epoll = epoll() 64 65 def register(self, fd, events): 66 try: 67 self._epoll.register(fd, events) 68 except Exception as exc: 69 if getattr(exc, 'errno', None) != errno.EEXIST: 70 raise 71 return fd 72 73 def unregister(self, fd): 74 try: 75 self._epoll.unregister(fd) 76 except (socket.error, ValueError, KeyError, TypeError): 77 pass 78 except (IOError, OSError) as exc: 79 if getattr(exc, 'errno', None) not in (errno.ENOENT, errno.EPERM): 80 raise 81 82 def poll(self, timeout): 83 try: 84 return self._epoll.poll(timeout if timeout is not None else -1) 85 except Exception as exc: 86 if getattr(exc, 'errno', None) != errno.EINTR: 87 raise 88 89 def close(self): 90 self._epoll.close() 91 92 93class _kqueue(object): 94 w_fflags = (KQ_NOTE_WRITE | KQ_NOTE_EXTEND | 95 KQ_NOTE_ATTRIB | KQ_NOTE_DELETE) 96 97 def __init__(self): 98 self._kqueue = kqueue() 99 self._active = {} 100 self.on_file_change = None 101 self._kcontrol = self._kqueue.control 102 103 def register(self, fd, events): 104 self._control(fd, events, KQ_EV_ADD) 105 self._active[fd] = events 106 return fd 107 108 def unregister(self, fd): 109 events = self._active.pop(fd, None) 110 if events: 111 try: 112 self._control(fd, events, KQ_EV_DELETE) 113 except socket.error: 114 pass 115 116 def watch_file(self, fd): 117 ev = kevent(fd, 118 filter=KQ_FILTER_VNODE, 119 flags=KQ_EV_ADD | KQ_EV_ENABLE | KQ_EV_CLEAR, 120 fflags=self.w_fflags) 121 self._kcontrol([ev], 0) 122 123 def unwatch_file(self, fd): 124 ev = kevent(fd, 125 filter=KQ_FILTER_VNODE, 126 flags=KQ_EV_DELETE, 127 fflags=self.w_fflags) 128 self._kcontrol([ev], 0) 129 130 def _control(self, fd, events, flags): 131 if not events: 132 return 133 kevents = [] 134 if events & WRITE: 135 kevents.append(kevent(fd, 136 filter=KQ_FILTER_WRITE, 137 flags=flags)) 138 if not kevents or events & READ: 139 kevents.append( 140 kevent(fd, filter=KQ_FILTER_READ, flags=flags), 141 ) 142 control = self._kcontrol 143 for e in kevents: 144 try: 145 control([e], 0) 146 except ValueError: 147 pass 148 149 def poll(self, timeout): 150 try: 151 kevents = self._kcontrol(None, 1000, timeout) 152 except Exception as exc: 153 if getattr(exc, 'errno', None) == errno.EINTR: 154 return 155 raise 156 events, file_changes = {}, [] 157 for k in kevents: 158 fd = k.ident 159 if k.filter == KQ_FILTER_READ: 160 events[fd] = events.get(fd, 0) | READ 161 elif k.filter == KQ_FILTER_WRITE: 162 if k.flags & KQ_EV_EOF: 163 events[fd] = ERR 164 else: 165 events[fd] = events.get(fd, 0) | WRITE 166 elif k.filter == KQ_EV_ERROR: 167 events[fd] = events.get(fd, 0) | ERR 168 elif k.filter == KQ_FILTER_VNODE: 169 if k.fflags & KQ_NOTE_DELETE: 170 self.unregister(fd) 171 file_changes.append(k) 172 if file_changes: 173 self.on_file_change(file_changes) 174 return list(events.items()) 175 176 def close(self): 177 self._kqueue.close() 178 179 180class _poll(object): 181 182 def __init__(self): 183 self._poller = xpoll() 184 self._quick_poll = self._poller.poll 185 self._quick_register = self._poller.register 186 self._quick_unregister = self._poller.unregister 187 188 def register(self, fd, events): 189 fd = fileno(fd) 190 poll_flags = 0 191 if events & ERR: 192 poll_flags |= POLLERR 193 if events & WRITE: 194 poll_flags |= POLLOUT 195 if events & READ: 196 poll_flags |= POLLIN 197 self._quick_register(fd, poll_flags) 198 return fd 199 200 def unregister(self, fd): 201 try: 202 fd = fileno(fd) 203 except socket.error as exc: 204 # we don't know the previous fd of this object 205 # but it will be removed by the next poll iteration. 206 if getattr(exc, 'errno', None) in SELECT_BAD_FD: 207 return fd 208 raise 209 self._quick_unregister(fd) 210 return fd 211 212 def poll(self, timeout, round=math.ceil, 213 POLLIN=POLLIN, POLLOUT=POLLOUT, POLLERR=POLLERR, 214 READ=READ, WRITE=WRITE, ERR=ERR, Integral=Integral): 215 timeout = 0 if timeout and timeout < 0 else round((timeout or 0) * 1e3) 216 try: 217 event_list = self._quick_poll(timeout) 218 except (_selecterr, socket.error) as exc: 219 if getattr(exc, 'errno', None) == errno.EINTR: 220 return 221 raise 222 223 ready = [] 224 for fd, event in event_list: 225 events = 0 226 if event & POLLIN: 227 events |= READ 228 if event & POLLOUT: 229 events |= WRITE 230 if event & POLLERR or event & POLLNVAL or event & POLLHUP: 231 events |= ERR 232 assert events 233 if not isinstance(fd, Integral): 234 fd = fd.fileno() 235 ready.append((fd, events)) 236 return ready 237 238 def close(self): 239 self._poller = None 240 241 242class _select(object): 243 244 def __init__(self): 245 self._all = (self._rfd, 246 self._wfd, 247 self._efd) = set(), set(), set() 248 249 def register(self, fd, events): 250 fd = fileno(fd) 251 if events & ERR: 252 self._efd.add(fd) 253 if events & WRITE: 254 self._wfd.add(fd) 255 if events & READ: 256 self._rfd.add(fd) 257 return fd 258 259 def _remove_bad(self): 260 for fd in self._rfd | self._wfd | self._efd: 261 try: 262 _selectf([fd], [], [], 0) 263 except (_selecterr, socket.error) as exc: 264 if getattr(exc, 'errno', None) in SELECT_BAD_FD: 265 self.unregister(fd) 266 267 def unregister(self, fd): 268 try: 269 fd = fileno(fd) 270 except socket.error as exc: 271 # we don't know the previous fd of this object 272 # but it will be removed by the next poll iteration. 273 if getattr(exc, 'errno', None) in SELECT_BAD_FD: 274 return 275 raise 276 self._rfd.discard(fd) 277 self._wfd.discard(fd) 278 self._efd.discard(fd) 279 280 def poll(self, timeout): 281 try: 282 read, write, error = _selectf( 283 self._rfd, self._wfd, self._efd, timeout, 284 ) 285 except (_selecterr, socket.error) as exc: 286 if getattr(exc, 'errno', None) == errno.EINTR: 287 return 288 elif getattr(exc, 'errno', None) in SELECT_BAD_FD: 289 return self._remove_bad() 290 raise 291 292 events = {} 293 for fd in read: 294 if not isinstance(fd, Integral): 295 fd = fd.fileno() 296 events[fd] = events.get(fd, 0) | READ 297 for fd in write: 298 if not isinstance(fd, Integral): 299 fd = fd.fileno() 300 events[fd] = events.get(fd, 0) | WRITE 301 for fd in error: 302 if not isinstance(fd, Integral): 303 fd = fd.fileno() 304 events[fd] = events.get(fd, 0) | ERR 305 return list(events.items()) 306 307 def close(self): 308 self._rfd.clear() 309 self._wfd.clear() 310 self._efd.clear() 311 312 313def _get_poller(): 314 if detect_environment() != 'default': 315 # greenlet 316 return _select 317 elif epoll: 318 # Py2.6+ Linux 319 return _epoll 320 elif kqueue and 'netbsd' in sys.platform: 321 return _kqueue 322 elif xpoll: 323 return _poll 324 else: 325 return _select 326 327 328def poll(*args, **kwargs): 329 """Create new poller instance.""" 330 return _get_poller()(*args, **kwargs) 331