1"""Poller Components for asynchronous file and socket I/O. 2 3This module contains Poller components that enable polling of file or socket 4descriptors for read/write events. Pollers: 5- Select 6- Poll 7- EPoll 8""" 9 10import os 11import select 12import platform 13from errno import EBADF, EINTR 14from select import error as SelectError 15from socket import error as SocketError, create_connection, \ 16 socket as create_socket, AF_INET, SOCK_STREAM, socket 17from threading import Thread 18from circuits.core.handlers import handler 19 20from .events import Event 21from .components import BaseComponent 22 23 24class _read(Event): 25 26 """_read Event""" 27 28 29class _write(Event): 30 31 """_write Event""" 32 33 34class _error(Event): 35 36 """_error Event""" 37 38 39class _disconnect(Event): 40 41 """_disconnect Event""" 42 43 44class BasePoller(BaseComponent): 45 46 channel = None 47 48 def __init__(self, channel=channel): 49 super(BasePoller, self).__init__(channel=channel) 50 51 self._read = [] 52 self._write = [] 53 self._targets = {} 54 55 self._ctrl_recv, self._ctrl_send = self._create_control_con() 56 57 def _create_control_con(self): 58 if platform.system() == "Linux": 59 return os.pipe() 60 server = create_socket(AF_INET, SOCK_STREAM) 61 server.bind(("localhost", 0)) 62 server.listen(1) 63 res_list = [] 64 65 def accept(): 66 sock, _ = server.accept() 67 sock.setblocking(False) 68 res_list.append(sock) 69 at = Thread(target=accept) 70 at.start() 71 clnt_sock = create_connection(server.getsockname()) 72 at.join() 73 return (res_list[0], clnt_sock) 74 75 @handler("generate_events", priority=-9) 76 def _on_generate_events(self, event): 77 """ 78 Pollers have slightly higher priority than the default handler 79 from Manager to ensure that they are invoked before the 80 default handler. They act as event filters to avoid the additional 81 invocation of the default handler which would be unnecessary 82 overhead. 83 """ 84 85 event.stop() 86 self._generate_events(event) 87 88 def resume(self): 89 if isinstance(self._ctrl_send, socket): 90 self._ctrl_send.send(b"\0") 91 else: 92 os.write(self._ctrl_send, b"\0") 93 94 def _read_ctrl(self): 95 try: 96 if isinstance(self._ctrl_recv, socket): 97 return self._ctrl_recv.recv(1) 98 else: 99 return os.read(self._ctrl_recv, 1) 100 except: 101 return b"\0" 102 103 def addReader(self, source, fd): 104 channel = getattr(source, "channel", "*") 105 self._read.append(fd) 106 self._targets[fd] = channel 107 108 def addWriter(self, source, fd): 109 channel = getattr(source, "channel", "*") 110 self._write.append(fd) 111 self._targets[fd] = channel 112 113 def removeReader(self, fd): 114 if fd in self._read: 115 self._read.remove(fd) 116 if not (fd in self._read or fd in self._write) and fd in self._targets: 117 del self._targets[fd] 118 119 def removeWriter(self, fd): 120 if fd in self._write: 121 self._write.remove(fd) 122 if not (fd in self._read or fd in self._write) and fd in self._targets: 123 del self._targets[fd] 124 125 def isReading(self, fd): 126 return fd in self._read 127 128 def isWriting(self, fd): 129 return fd in self._write 130 131 def discard(self, fd): 132 if fd in self._read: 133 self._read.remove(fd) 134 if fd in self._write: 135 self._write.remove(fd) 136 if fd in self._targets: 137 del self._targets[fd] 138 139 def getTarget(self, fd): 140 return self._targets.get(fd, self.parent) 141 142 143class Select(BasePoller): 144 145 """Select(...) -> new Select Poller Component 146 147 Creates a new Select Poller Component that uses the select poller 148 implementation. This poller is not recommended but is available for legacy 149 reasons as most systems implement select-based polling for backwards 150 compatibility. 151 """ 152 153 channel = "select" 154 155 def __init__(self, channel=channel): 156 super(Select, self).__init__(channel=channel) 157 158 self._read.append(self._ctrl_recv) 159 160 def _preenDescriptors(self): 161 for socks in (self._read[:], self._write[:]): 162 for sock in socks: 163 try: 164 select.select([sock], [sock], [sock], 0) 165 except Exception: 166 self.discard(sock) 167 168 def _generate_events(self, event): 169 try: 170 if not any([self._read, self._write]): 171 return 172 timeout = event.time_left 173 if timeout < 0: 174 r, w, _ = select.select(self._read, self._write, []) 175 else: 176 r, w, _ = select.select(self._read, self._write, [], timeout) 177 except ValueError as e: 178 # Possibly a file descriptor has gone negative? 179 return self._preenDescriptors() 180 except TypeError as e: 181 # Something *totally* invalid (object w/o fileno, non-integral 182 # result) was passed 183 return self._preenDescriptors() 184 except (SelectError, SocketError, IOError) as e: 185 # select(2) encountered an error 186 if e.args[0] in (0, 2): 187 # windows does this if it got an empty list 188 if (not self._read) and (not self._write): 189 return 190 else: 191 raise 192 elif e.args[0] == EINTR: 193 return 194 elif e.args[0] == EBADF: 195 return self._preenDescriptors() 196 else: 197 # OK, I really don't know what's going on. Blow up. 198 raise 199 200 for sock in w: 201 if self.isWriting(sock): 202 self.fire(_write(sock), self.getTarget(sock)) 203 204 for sock in r: 205 if sock == self._ctrl_recv: 206 self._read_ctrl() 207 continue 208 if self.isReading(sock): 209 self.fire(_read(sock), self.getTarget(sock)) 210 211 212class Poll(BasePoller): 213 214 """Poll(...) -> new Poll Poller Component 215 216 Creates a new Poll Poller Component that uses the poll poller 217 implementation. 218 """ 219 220 channel = "poll" 221 222 def __init__(self, channel=channel): 223 super(Poll, self).__init__(channel=channel) 224 225 self._map = {} 226 self._poller = select.poll() 227 228 self._disconnected_flag = ( 229 select.POLLHUP 230 | select.POLLERR 231 | select.POLLNVAL 232 ) 233 234 self._read.append(self._ctrl_recv) 235 self._updateRegistration(self._ctrl_recv) 236 237 def _updateRegistration(self, fd): 238 fileno = fd.fileno() if not isinstance(fd, int) else fd 239 240 try: 241 self._poller.unregister(fileno) 242 except (KeyError, ValueError): 243 pass 244 245 mask = 0 246 247 if fd in self._read: 248 mask = mask | select.POLLIN 249 if fd in self._write: 250 mask = mask | select.POLLOUT 251 252 if mask: 253 self._poller.register(fd, mask) 254 self._map[fileno] = fd 255 else: 256 super(Poll, self).discard(fd) 257 try: 258 del self._map[fileno] 259 except KeyError: 260 pass 261 262 def addReader(self, source, fd): 263 super(Poll, self).addReader(source, fd) 264 self._updateRegistration(fd) 265 266 def addWriter(self, source, fd): 267 super(Poll, self).addWriter(source, fd) 268 self._updateRegistration(fd) 269 270 def removeReader(self, fd): 271 super(Poll, self).removeReader(fd) 272 self._updateRegistration(fd) 273 274 def removeWriter(self, fd): 275 super(Poll, self).removeWriter(fd) 276 self._updateRegistration(fd) 277 278 def discard(self, fd): 279 super(Poll, self).discard(fd) 280 self._updateRegistration(fd) 281 282 def _generate_events(self, event): 283 try: 284 timeout = event.time_left 285 if timeout < 0: 286 l = self._poller.poll() 287 else: 288 l = self._poller.poll(1000 * timeout) 289 except SelectError as e: 290 if e.args[0] == EINTR: 291 return 292 else: 293 raise 294 295 for fileno, event in l: 296 self._process(fileno, event) 297 298 def _process(self, fileno, event): 299 if fileno not in self._map: 300 return 301 302 fd = self._map[fileno] 303 if fd == self._ctrl_recv: 304 self._read_ctrl() 305 return 306 307 if event & self._disconnected_flag and not (event & select.POLLIN): 308 self.fire(_disconnect(fd), self.getTarget(fd)) 309 self._poller.unregister(fileno) 310 super(Poll, self).discard(fd) 311 del self._map[fileno] 312 else: 313 try: 314 if event & select.POLLIN: 315 self.fire(_read(fd), self.getTarget(fd)) 316 if event & select.POLLOUT: 317 self.fire(_write(fd), self.getTarget(fd)) 318 except Exception as e: 319 self.fire(_error(fd, e), self.getTarget(fd)) 320 self.fire(_disconnect(fd), self.getTarget(fd)) 321 self._poller.unregister(fileno) 322 super(Poll, self).discard(fd) 323 del self._map[fileno] 324 325 326class EPoll(BasePoller): 327 328 """EPoll(...) -> new EPoll Poller Component 329 330 Creates a new EPoll Poller Component that uses the epoll poller 331 implementation. 332 """ 333 334 channel = "epoll" 335 336 def __init__(self, channel=channel): 337 super(EPoll, self).__init__(channel=channel) 338 339 self._map = {} 340 self._poller = select.epoll() 341 342 self._disconnected_flag = (select.EPOLLHUP | select.EPOLLERR) 343 344 self._read.append(self._ctrl_recv) 345 self._updateRegistration(self._ctrl_recv) 346 347 def _updateRegistration(self, fd): 348 try: 349 fileno = fd.fileno() if not isinstance(fd, int) else fd 350 self._poller.unregister(fileno) 351 except (SocketError, IOError, ValueError) as e: 352 if e.args[0] == EBADF: 353 keys = [k for k, v in list(self._map.items()) if v == fd] 354 for key in keys: 355 del self._map[key] 356 357 mask = 0 358 359 if fd in self._read: 360 mask = mask | select.EPOLLIN 361 if fd in self._write: 362 mask = mask | select.EPOLLOUT 363 364 if mask: 365 self._poller.register(fd, mask) 366 self._map[fileno] = fd 367 else: 368 super(EPoll, self).discard(fd) 369 370 def addReader(self, source, fd): 371 super(EPoll, self).addReader(source, fd) 372 self._updateRegistration(fd) 373 374 def addWriter(self, source, fd): 375 super(EPoll, self).addWriter(source, fd) 376 self._updateRegistration(fd) 377 378 def removeReader(self, fd): 379 super(EPoll, self).removeReader(fd) 380 self._updateRegistration(fd) 381 382 def removeWriter(self, fd): 383 super(EPoll, self).removeWriter(fd) 384 self._updateRegistration(fd) 385 386 def discard(self, fd): 387 super(EPoll, self).discard(fd) 388 self._updateRegistration(fd) 389 390 def _generate_events(self, event): 391 try: 392 timeout = event.time_left 393 if timeout < 0: 394 l = self._poller.poll() 395 else: 396 l = self._poller.poll(timeout) 397 except IOError as e: 398 if e.args[0] == EINTR: 399 return 400 except SelectError as e: 401 if e.args[0] == EINTR: 402 return 403 else: 404 raise 405 406 for fileno, event in l: 407 self._process(fileno, event) 408 409 def _process(self, fileno, event): 410 if fileno not in self._map: 411 return 412 413 fd = self._map[fileno] 414 if fd == self._ctrl_recv: 415 self._read_ctrl() 416 return 417 418 if event & self._disconnected_flag and not (event & select.POLLIN): 419 self.fire(_disconnect(fd), self.getTarget(fd)) 420 self._poller.unregister(fileno) 421 super(EPoll, self).discard(fd) 422 del self._map[fileno] 423 else: 424 try: 425 if event & select.EPOLLIN: 426 self.fire(_read(fd), self.getTarget(fd)) 427 if event & select.EPOLLOUT: 428 self.fire(_write(fd), self.getTarget(fd)) 429 except Exception as e: 430 self.fire(_error(fd, e), self.getTarget(fd)) 431 self.fire(_disconnect(fd), self.getTarget(fd)) 432 self._poller.unregister(fileno) 433 super(EPoll, self).discard(fd) 434 del self._map[fileno] 435 436 437class KQueue(BasePoller): 438 439 """KQueue(...) -> new KQueue Poller Component 440 441 Creates a new KQueue Poller Component that uses the kqueue poller 442 implementation. 443 """ 444 445 channel = "kqueue" 446 447 def __init__(self, channel=channel): 448 super(KQueue, self).__init__(channel=channel) 449 self._map = {} 450 self._poller = select.kqueue() 451 452 self._read.append(self._ctrl_recv) 453 self._map[self._ctrl_recv.fileno()] = self._ctrl_recv 454 self._poller.control( 455 [ 456 select.kevent( 457 self._ctrl_recv, select.KQ_FILTER_READ, select.KQ_EV_ADD 458 ) 459 ], 0 460 ) 461 462 def addReader(self, source, sock): 463 super(KQueue, self).addReader(source, sock) 464 self._map[sock.fileno()] = sock 465 self._poller.control( 466 [select.kevent(sock, select.KQ_FILTER_READ, select.KQ_EV_ADD)], 0 467 ) 468 469 def addWriter(self, source, sock): 470 super(KQueue, self).addWriter(source, sock) 471 self._map[sock.fileno()] = sock 472 self._poller.control( 473 [select.kevent(sock, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)], 0 474 ) 475 476 def removeReader(self, sock): 477 super(KQueue, self).removeReader(sock) 478 self._poller.control( 479 [ 480 select.kevent(sock, select.KQ_FILTER_READ, select.KQ_EV_DELETE) 481 ], 482 0 483 ) 484 485 def removeWriter(self, sock): 486 super(KQueue, self).removeWriter(sock) 487 self._poller.control( 488 [select.kevent(sock, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 489 0 490 ) 491 492 def discard(self, sock): 493 super(KQueue, self).discard(sock) 494 del self._map[sock.fileno()] 495 self._poller.control( 496 [ 497 select.kevent( 498 sock, 499 select.KQ_FILTER_WRITE | select.KQ_FILTER_READ, 500 select.KQ_EV_DELETE 501 ) 502 ], 503 0 504 ) 505 506 def _generate_events(self, event): 507 try: 508 timeout = event.time_left 509 if timeout < 0: 510 l = self._poller.control(None, 1000) 511 else: 512 l = self._poller.control(None, 1000, timeout) 513 except SelectError as e: 514 if e[0] == EINTR: 515 return 516 else: 517 raise 518 519 for event in l: 520 self._process(event) 521 522 def _process(self, event): 523 if event.ident not in self._map: 524 # shouldn't happen ? 525 # we unregister the socket since we don't care about it anymore 526 self._poller.control( 527 [ 528 select.kevent( 529 event.ident, event.filter, select.KQ_EV_DELETE 530 ) 531 ], 532 0 533 ) 534 535 return 536 537 sock = self._map[event.ident] 538 if sock == self._ctrl_recv: 539 self._read_ctrl() 540 return 541 542 if event.flags & select.KQ_EV_ERROR: 543 self.fire(_error(sock, "error"), self.getTarget(sock)) 544 elif event.flags & select.KQ_EV_EOF: 545 self.fire(_disconnect(sock), self.getTarget(sock)) 546 elif event.filter == select.KQ_FILTER_WRITE: 547 self.fire(_write(sock), self.getTarget(sock)) 548 elif event.filter == select.KQ_FILTER_READ: 549 self.fire(_read(sock), self.getTarget(sock)) 550 551Poller = Select 552 553__all__ = ("BasePoller", "Poller", "Select", "Poll", "EPoll", "KQueue") 554