# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.
# Use of this source code is governed by MIT license that can be
# found in the LICENSE file.

"""
A specialized IO loop on top of asyncore adding support for epoll()
on Linux and kqueue() on OSX/BSD, dramatically increasing performances
offered by base asyncore module.

poll() and select() loops are also reimplemented and are an order of
magnitude faster as they support fd un/registration and modification.

This module is not supposed to be used directly unless you want to
include a new dispatcher which runs within the main FTP server loop,
in which case:
  __________________________________________________________________
 |                      |                                           |
 | INSTEAD OF           | ...USE:                                   |
 |______________________|___________________________________________|
 |                      |                                           |
 | asyncore.dispatcher  | Acceptor (for servers)                    |
 | asyncore.dispatcher  | Connector (for clients)                   |
 | asynchat.async_chat  | AsyncChat (for a full duplex connection)  |
 | asyncore.loop        | FTPServer.serve_forever()                 |
 |______________________|___________________________________________|

asyncore.dispatcher_with_send is not supported, same for "map" argument
for asyncore.loop and asyncore.dispatcher and asynchat.async_chat
constructors.

Follows a server example:

import socket
from pyftpdlib.ioloop import IOLoop, Acceptor, AsyncChat

class Handler(AsyncChat):

    def __init__(self, sock):
        AsyncChat.__init__(self, sock)
        self.push('200 hello\r\n')
        self.close_when_done()

class Server(Acceptor):

    def __init__(self, host, port):
        Acceptor.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)

    def handle_accepted(self, sock, addr):
        Handler(sock)

server = Server('localhost', 8021)
IOLoop.instance().loop()
"""

import asynchat
import asyncore
import errno
import heapq
import os
import select
import socket
import sys
import time
import traceback
try:
    import threading
except ImportError:
    import dummy_threading as threading

from ._compat import callable
from .log import config_logging
from .log import debug
from .log import is_logging_configured
from .log import logger


# Clock used for all scheduling: monotonic when available, else time.time.
timer = getattr(time, 'monotonic', time.time)
_read = asyncore.read
_write = asyncore.write

# These errnos indicate that a connection has been abruptly terminated.
_ERRNOS_DISCONNECTED = set((
    errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN, errno.ECONNABORTED,
    errno.EPIPE, errno.EBADF, errno.ETIMEDOUT))
if hasattr(errno, "WSAECONNRESET"):
    _ERRNOS_DISCONNECTED.add(errno.WSAECONNRESET)
if hasattr(errno, "WSAECONNABORTED"):
    _ERRNOS_DISCONNECTED.add(errno.WSAECONNABORTED)

# These errnos indicate that a non-blocking operation must be retried
# at a later time.
_ERRNOS_RETRY = set((errno.EAGAIN, errno.EWOULDBLOCK))
if hasattr(errno, "WSAEWOULDBLOCK"):
    _ERRNOS_RETRY.add(errno.WSAEWOULDBLOCK)


class RetryError(Exception):
    """Raised by AsyncChat.recv() when the read must be retried later."""


# ===================================================================
# --- scheduler
# ===================================================================

class _Scheduler(object):
    """Run the scheduled functions due to expire soonest (if any)."""

    def __init__(self):
        # the heap used for the scheduled tasks
        self._tasks = []
        # number of cancelled calls which may still sit in the heap
        self._cancellations = 0

    def poll(self):
        """Run the scheduled functions due to expire soonest and
        return the timeout of the next one (if any, else None).
        """
        now = timer()
        calls = []
        while self._tasks:
            if now < self._tasks[0].timeout:
                break
            call = heapq.heappop(self._tasks)
            if call.cancelled:
                self._cancellations -= 1
            else:
                calls.append(call)

        for call in calls:
            if call._repush:
                # reset() was called in the meantime: put the call back
                # into the heap with its new timeout instead of running it
                heapq.heappush(self._tasks, call)
                call._repush = False
                continue
            try:
                call.call()
            except Exception:
                logger.error(traceback.format_exc())

        # remove cancelled tasks and re-heapify the queue if the
        # number of cancelled tasks is more than the half of the
        # entire queue
        if (self._cancellations > 512 and
                self._cancellations > (len(self._tasks) >> 1)):
            debug("re-heapifying %s cancelled tasks" % self._cancellations)
            self.reheapify()

        try:
            return max(0, self._tasks[0].timeout - now)
        except IndexError:
            # no scheduled tasks left
            return None

    def register(self, what):
        """Register a _CallLater instance."""
        heapq.heappush(self._tasks, what)

    def unregister(self, what):
        """Unregister a _CallLater instance.
        The actual unregistration will happen at a later time though.
        """
        self._cancellations += 1

    def reheapify(self):
        """Get rid of cancelled calls and reinitialize the internal heap."""
        self._cancellations = 0
        self._tasks = [x for x in self._tasks if not x.cancelled]
        heapq.heapify(self._tasks)


class _CallLater(object):
    """Container object which instance is returned by ioloop.call_later()."""

    __slots__ = ('_delay', '_target', '_args', '_kwargs', '_errback', '_sched',
                 '_repush', 'timeout', 'cancelled')

    def __init__(self, seconds, target, *args, **kwargs):
        """Schedule target(*args, **kwargs) to run after `seconds`.

        The special '_scheduler' (mandatory) and '_errback' (optional)
        keyword arguments are consumed here and not passed to target.
        """
        assert callable(target), "%s is not callable" % target
        assert sys.maxsize >= seconds >= 0, \
            "%s is not greater than or equal to 0 seconds" % seconds
        self._delay = seconds
        self._target = target
        self._args = args
        self._kwargs = kwargs
        self._errback = kwargs.pop('_errback', None)
        self._sched = kwargs.pop('_scheduler')
        self._repush = False
        # seconds from the epoch at which to call the function
        if not seconds:
            self.timeout = 0
        else:
            self.timeout = timer() + self._delay
        self.cancelled = False
        self._sched.register(self)

    def __lt__(self, other):
        return self.timeout < other.timeout

    def __le__(self, other):
        return self.timeout <= other.timeout

    def __repr__(self):
        if self._target is None:
            sig = object.__repr__(self)
        else:
            sig = repr(self._target)
        sig += ' args=%s, kwargs=%s, cancelled=%s, secs=%s' % (
            self._args or '[]', self._kwargs or '{}', self.cancelled,
            self._delay)
        return '<%s>' % sig

    __str__ = __repr__

    def _post_call(self, exc):
        """Hook run after the target was called; a one-shot call
        cancels itself.
        """
        if not self.cancelled:
            self.cancel()

    def call(self):
        """Call this scheduled function.

        If the target raises and an errback was provided the errback
        is called (with no arguments), else the exception propagates.
        """
        assert not self.cancelled, "already cancelled"
        exc = None
        try:
            self._target(*self._args, **self._kwargs)
        except Exception as err:
            exc = err
            if self._errback is not None:
                self._errback()
            else:
                raise
        finally:
            self._post_call(exc)

    def reset(self):
        """Reschedule this call resetting the current countdown."""
        assert not self.cancelled, "already cancelled"
        self.timeout = timer() + self._delay
        self._repush = True

    def cancel(self):
        """Unschedule this call."""
        if not self.cancelled:
            self.cancelled = True
            # drop references so that they can be garbage collected
            self._target = self._args = self._kwargs = self._errback = None
            self._sched.unregister(self)


class _CallEvery(_CallLater):
    """Container object which instance is returned by IOLoop.call_every()."""

    def _post_call(self, exc):
        """Reschedule the call, unless it raised or was cancelled."""
        if not self.cancelled:
            if exc:
                self.cancel()
            else:
                self.timeout = timer() + self._delay
                self._sched.register(self)


class _IOLoop(object):
    """Base class which will later be referred as IOLoop."""

    READ = 1
    WRITE = 2
    _instance = None
    _lock = threading.Lock()
    _started_once = False

    def __init__(self):
        # maps file descriptors to the instances handling them
        self.socket_map = {}
        self.sched = _Scheduler()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __repr__(self):
        status = [self.__class__.__module__ + "." + self.__class__.__name__]
        status.append("(fds=%s, tasks=%s)" % (
            len(self.socket_map), len(self.sched._tasks)))
        return '<%s at %#x>' % (' '.join(status), id(self))

    __str__ = __repr__

    @classmethod
    def instance(cls):
        """Return a global IOLoop instance."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def register(self, fd, instance, events):
        """Register a fd, handled by instance for the given events."""
        raise NotImplementedError('must be implemented in subclass')

    def unregister(self, fd):
        """Unregister fd."""
        raise NotImplementedError('must be implemented in subclass')

    def modify(self, fd, events):
        """Changes the events assigned for fd."""
        raise NotImplementedError('must be implemented in subclass')

    def poll(self, timeout):
        """Poll once.  The subclass overriding this method is supposed
        to poll over the registered handlers and the scheduled functions
        and then return.
        """
        raise NotImplementedError('must be implemented in subclass')

    def loop(self, timeout=None, blocking=True):
        """Start the asynchronous IO loop.

         - (float) timeout: the timeout passed to the underlying
           multiplex syscall (select(), epoll() etc.).

         - (bool) blocking: if True poll repeatedly, as long as there
           are registered handlers and/or scheduled functions.
           If False poll only once and return the timeout of the next
           scheduled call (if any, else None).
        """
        if not _IOLoop._started_once:
            _IOLoop._started_once = True
            if not is_logging_configured():
                # If we get to this point it means the user hasn't
                # configured logging. We want to log by default so
                # we configure logging ourselves so that it will
                # print to stderr.
                config_logging()

        if blocking:
            # localize variable access to minimize overhead
            poll = self.poll
            socket_map = self.socket_map
            sched_poll = self.sched.poll

            if timeout is not None:
                while socket_map:
                    poll(timeout)
                    sched_poll()
            else:
                # wait no longer than the next scheduled call requires
                soonest_timeout = None
                while socket_map:
                    poll(soonest_timeout)
                    soonest_timeout = sched_poll()
        else:
            sched = self.sched
            if self.socket_map:
                self.poll(timeout)
            if sched._tasks:
                return sched.poll()

    def call_later(self, seconds, target, *args, **kwargs):
        """Calls a function at a later time.
        It can be used to asynchronously schedule a call within the polling
        loop without blocking it. The instance returned is an object that
        can be used to cancel or reschedule the call.

         - (int) seconds: the number of seconds to wait
         - (obj) target: the callable object to call later
         - args: the arguments to call it with
         - kwargs: the keyword arguments to call it with; a special
           '_errback' parameter can be passed: it is a callable
           called in case target function raises an exception.
        """
        kwargs['_scheduler'] = self.sched
        return _CallLater(seconds, target, *args, **kwargs)

    def call_every(self, seconds, target, *args, **kwargs):
        """Schedules the given callback to be called periodically."""
        kwargs['_scheduler'] = self.sched
        return _CallEvery(seconds, target, *args, **kwargs)

    def close(self):
        """Closes the IOLoop, freeing any resources used."""
        debug("closing IOLoop", self)
        self.__class__._instance = None

        # free connections
        instances = sorted(self.socket_map.values(), key=lambda x: x._fileno)
        for inst in instances:
            try:
                inst.close()
            except OSError as err:
                if err.errno != errno.EBADF:
                    logger.error(traceback.format_exc())
            except Exception:
                logger.error(traceback.format_exc())
        self.socket_map.clear()

        # free scheduled functions
        for x in self.sched._tasks:
            try:
                if not x.cancelled:
                    x.cancel()
            except Exception:
                logger.error(traceback.format_exc())
        del self.sched._tasks[:]


# ===================================================================
# --- select() - POSIX / Windows
# ===================================================================

class Select(_IOLoop):
    """select()-based poller."""

    def __init__(self):
        _IOLoop.__init__(self)
        # fds watched for reading / writing respectively
        self._r = []
        self._w = []

    def register(self, fd, instance, events):
        """Register fd for the given events, unless already registered."""
        if fd not in self.socket_map:
            self.socket_map[fd] = instance
            if events & self.READ:
                self._r.append(fd)
            if events & self.WRITE:
                self._w.append(fd)

    def unregister(self, fd):
        """Remove fd from the socket map and from both fd lists."""
        try:
            del self.socket_map[fd]
        except KeyError:
            debug("call: unregister(); fd was no longer in socket_map", self)
        for l in (self._r, self._w):
            try:
                l.remove(fd)
            except ValueError:
                pass

    def modify(self, fd, events):
        """Change the events fd is watched for by re-registering it."""
        inst = self.socket_map.get(fd)
        if inst is not None:
            self.unregister(fd)
            self.register(fd, inst, events)
        else:
            debug("call: modify(); fd was no longer in socket_map", self)

    def poll(self, timeout):
        """Call select() once and dispatch read / write events."""
        try:
            r, w, e = select.select(self._r, self._w, [], timeout)
        except select.error as err:
            if getattr(err, "errno", None) == errno.EINTR:
                return
            raise

        smap_get = self.socket_map.get
        for fd in r:
            obj = smap_get(fd)
            if obj is None or not obj.readable():
                continue
            _read(obj)
        for fd in w:
            obj = smap_get(fd)
            if obj is None or not obj.writable():
                continue
            _write(obj)


# ===================================================================
# --- poll() / epoll()
# ===================================================================

class _BasePollEpoll(_IOLoop):
    """This is common to both poll() (UNIX), epoll() (Linux) and
    /dev/poll (Solaris) implementations which share almost the same
    interface.
    Not supposed to be used directly.
    """

    def __init__(self):
        _IOLoop.__init__(self)
        # replace the poller factory (class attribute set by the
        # subclass) with an actual poller object on the instance
        self._poller = self._poller()

    def register(self, fd, instance, events):
        """Register fd against the poller and map it to instance."""
        try:
            self._poller.register(fd, events)
        except EnvironmentError as err:
            if err.errno == errno.EEXIST:
                debug("call: register(); poller raised EEXIST; ignored", self)
            else:
                raise
        self.socket_map[fd] = instance

    def unregister(self, fd):
        """Remove fd from the socket map and from the poller."""
        try:
            del self.socket_map[fd]
        except KeyError:
            debug("call: unregister(); fd was no longer in socket_map", self)
        else:
            try:
                self._poller.unregister(fd)
            except EnvironmentError as err:
                if err.errno in (errno.ENOENT, errno.EBADF):
                    debug("call: unregister(); poller returned %r; "
                          "ignoring it" % err, self)
                else:
                    raise

    def modify(self, fd, events):
        """Change the events fd is registered for."""
        try:
            self._poller.modify(fd, events)
        except OSError as err:
            if err.errno == errno.ENOENT and fd in self.socket_map:
                # XXX - see:
                # https://github.com/giampaolo/pyftpdlib/issues/329
                instance = self.socket_map[fd]
                self.register(fd, instance, events)
            else:
                raise

    def poll(self, timeout):
        """Poll once and dispatch the returned events.

        A timeout of None waits indefinitely, 0 returns immediately.
        """
        # Only None is translated into -1 (wait indefinitely).  A
        # timeout of 0 must reach the poller untouched so that it
        # results in a non-blocking poll: "timeout or -1" used to turn
        # it into an indefinite wait.
        if timeout is None:
            timeout = -1
        try:
            events = self._poller.poll(timeout)
        except (IOError, select.error) as err:
            # for epoll() and poll() respectively
            if getattr(err, "errno", None) == errno.EINTR:
                return
            raise
        # localize variable access to minimize overhead
        smap_get = self.socket_map.get
        for fd, event in events:
            inst = smap_get(fd)
            if inst is None:
                continue
            if event & self._ERROR and not event & self.READ:
                inst.handle_close()
            else:
                if event & self.READ:
                    if inst.readable():
                        _read(inst)
                if event & self.WRITE:
                    if inst.writable():
                        _write(inst)


# ===================================================================
# --- poll() - POSIX
# ===================================================================

if hasattr(select, 'poll'):

    class Poll(_BasePollEpoll):
        """poll() based poller."""

        READ = select.POLLIN
        WRITE = select.POLLOUT
        _ERROR = select.POLLERR | select.POLLHUP | select.POLLNVAL
        _poller = select.poll

        def modify(self, fd, events):
            """Change the events for fd by re-registering it."""
            inst = self.socket_map[fd]
            self.unregister(fd)
            self.register(fd, inst, events)

        def poll(self, timeout):
            """Poll once; timeout is expressed in seconds."""
            # poll() timeout is expressed in milliseconds
            if timeout is not None:
                timeout = int(timeout * 1000)
            _BasePollEpoll.poll(self, timeout)


# ===================================================================
# --- /dev/poll - Solaris (introduced in python 3.3)
# ===================================================================

if hasattr(select, 'devpoll'):  # pragma: no cover

    class DevPoll(_BasePollEpoll):
        """/dev/poll based poller (introduced in python 3.3)."""

        READ = select.POLLIN
        WRITE = select.POLLOUT
        _ERROR = select.POLLERR | select.POLLHUP | select.POLLNVAL
        _poller = select.devpoll

        # introduced in python 3.4
        if hasattr(select.devpoll, 'fileno'):
            def fileno(self):
                """Return devpoll() fd."""
                return self._poller.fileno()

        def modify(self, fd, events):
            """Change the events for fd by re-registering it."""
            inst = self.socket_map[fd]
            self.unregister(fd)
            self.register(fd, inst, events)

        def poll(self, timeout):
            """Poll once; timeout is expressed in seconds."""
            # /dev/poll timeout is expressed in milliseconds
            if timeout is not None:
                timeout = int(timeout * 1000)
            _BasePollEpoll.poll(self, timeout)

        # introduced in python 3.4
        if hasattr(select.devpoll, 'close'):
            def close(self):
                """Close the IOLoop and the underlying poller."""
                _IOLoop.close(self)
                self._poller.close()


# ===================================================================
# --- epoll() - Linux
# ===================================================================

if hasattr(select, 'epoll'):

    class Epoll(_BasePollEpoll):
        """epoll() based poller."""

        READ = select.EPOLLIN
        WRITE = select.EPOLLOUT
        _ERROR = select.EPOLLERR | select.EPOLLHUP
        _poller = select.epoll

        def fileno(self):
            """Return epoll() fd."""
            return self._poller.fileno()

        def close(self):
            """Close the IOLoop and the underlying epoll object."""
            _IOLoop.close(self)
            self._poller.close()


# ===================================================================
# --- kqueue() - BSD / OSX
# ===================================================================

if hasattr(select, 'kqueue'):  # pragma: no cover

    class Kqueue(_IOLoop):
        """kqueue() based poller."""

        def __init__(self):
            _IOLoop.__init__(self)
            self._kqueue = select.kqueue()
            # maps fds to the events they are currently registered for
            self._active = {}

        def fileno(self):
            """Return kqueue() fd."""
            return self._kqueue.fileno()

        def close(self):
            """Close the IOLoop and the underlying kqueue object."""
            _IOLoop.close(self)
            self._kqueue.close()

        def register(self, fd, instance, events):
            """Register fd for the given events and map it to instance."""
            self.socket_map[fd] = instance
            try:
                self._control(fd, events, select.KQ_EV_ADD)
            except EnvironmentError as err:
                if err.errno == errno.EEXIST:
                    debug("call: register(); poller raised EEXIST; ignored",
                          self)
                else:
                    raise
            self._active[fd] = events

        def unregister(self, fd):
            """Remove fd from the socket map and from the kqueue."""
            try:
                del self.socket_map[fd]
                events = self._active.pop(fd)
            except KeyError:
                pass
            else:
                try:
                    self._control(fd, events, select.KQ_EV_DELETE)
                except EnvironmentError as err:
                    if err.errno in (errno.ENOENT, errno.EBADF):
                        debug("call: unregister(); poller returned %r; "
                              "ignoring it" % err, self)
                    else:
                        raise

        def modify(self, fd, events):
            """Change the events for fd by re-registering it."""
            instance = self.socket_map[fd]
            self.unregister(fd)
            self.register(fd, instance, events)

        def _control(self, fd, events, flags):
            """Apply flags (add / delete) to the kevents matching events."""
            kevents = []
            if events & self.WRITE:
                kevents.append(select.kevent(
                    fd, filter=select.KQ_FILTER_WRITE, flags=flags))
            if events & self.READ or not kevents:
                # always read when there is not a write
                kevents.append(select.kevent(
                    fd, filter=select.KQ_FILTER_READ, flags=flags))
            # even though control() takes a list, it seems to return
            # EINVAL on Mac OS X (10.6) when there is more than one
            # event in the list
            for kevent in kevents:
                self._kqueue.control([kevent], 0)

        # localize variable access to minimize overhead
        def poll(self,
                 timeout,
                 _len=len,
                 _READ=select.KQ_FILTER_READ,
                 _WRITE=select.KQ_FILTER_WRITE,
                 _EOF=select.KQ_EV_EOF,
                 _ERROR=select.KQ_EV_ERROR):
            """Poll once and dispatch the returned kevents.

            Only `timeout` is meant to be passed by callers; the other
            parameters are default-bound constants.
            """
            try:
                kevents = self._kqueue.control(None, _len(self.socket_map),
                                               timeout)
            except OSError as err:
                if err.errno == errno.EINTR:
                    return
                raise
            for kevent in kevents:
                inst = self.socket_map.get(kevent.ident)
                if inst is None:
                    continue
                if kevent.filter == _READ:
                    if inst.readable():
                        _read(inst)
                if kevent.filter == _WRITE:
                    if kevent.flags & _EOF:
                        # If an asynchronous connection is refused,
                        # kqueue returns a write event with the EOF
                        # flag set.
                        # Note that for read events, EOF may be returned
                        # before all data has been consumed from the
                        # socket buffer, so we only check for EOF on
                        # write events.
                        inst.handle_close()
                    else:
                        if inst.writable():
                            _write(inst)
                if kevent.flags & _ERROR:
                    inst.handle_close()


# ===================================================================
# --- choose the better poller for this platform
# ===================================================================

if hasattr(select, 'epoll'):      # epoll() - Linux
    IOLoop = Epoll
elif hasattr(select, 'kqueue'):   # kqueue() - BSD / OSX
    IOLoop = Kqueue
elif hasattr(select, 'devpoll'):  # /dev/poll - Solaris
    IOLoop = DevPoll
elif hasattr(select, 'poll'):     # poll() - POSIX
    IOLoop = Poll
else:                             # select() - POSIX and Windows
    IOLoop = Select


# ===================================================================
# --- asyncore dispatchers
# ===================================================================

# these are overridden in order to register() and unregister()
# file descriptors against the new pollers


class AsyncChat(asynchat.async_chat):
    """Same as asynchat.async_chat, only working with the new IO poller
    and being more clever in avoid registering for read events when
    it shouldn't.
    """

    def __init__(self, sock=None, ioloop=None):
        """Set up the handler.

         - sock: an optional, already existing socket.
         - ioloop: the IO loop to register against; defaults to the
           global IOLoop instance.
        """
        self.ioloop = ioloop or IOLoop.instance()
        self._wanted_io_events = self.ioloop.READ
        self._current_io_events = self.ioloop.READ
        self._closed = False
        self._closing = False
        self._fileno = sock.fileno() if sock else None
        # scheduled calls to cancel() on close()
        self._tasks = []
        asynchat.async_chat.__init__(self, sock)

    # --- IO loop related methods

    def add_channel(self, map=None, events=None):
        """Register this handler against the IO loop for the given
        events (READ if not specified).  "map" is ignored.
        """
        # fd 0 is a valid descriptor, hence the explicit None check
        assert self._fileno is not None, repr(self._fileno)
        events = events if events is not None else self.ioloop.READ
        self.ioloop.register(self._fileno, self, events)
        self._wanted_io_events = events
        self._current_io_events = events

    def del_channel(self, map=None):
        """Unregister this handler from the IO loop.  "map" is ignored."""
        if self._fileno is not None:
            self.ioloop.unregister(self._fileno)

    def modify_ioloop_events(self, events, logdebug=False):
        """Change the IO events this handler is registered for,
        registering it again if it is no longer in the socket map.
        Does nothing (except logging) if the handler is closed.
        """
        if not self._closed:
            # fd 0 is a valid descriptor, hence the explicit None check
            assert self._fileno is not None, repr(self._fileno)
            if self._fileno not in self.ioloop.socket_map:
                debug(
                    "call: modify_ioloop_events(), fd was no longer in "
                    "socket_map, had to register() it again", inst=self)
                self.add_channel(events=events)
            else:
                if events != self._current_io_events:
                    if logdebug:
                        if events == self.ioloop.READ:
                            ev = "R"
                        elif events == self.ioloop.WRITE:
                            ev = "W"
                        elif events == self.ioloop.READ | self.ioloop.WRITE:
                            ev = "RW"
                        else:
                            ev = events
                        debug("call: IOLoop.modify(); setting %r IO events" % (
                            ev), self)
                    self.ioloop.modify(self._fileno, events)
            self._current_io_events = events
        else:
            debug(
                "call: modify_ioloop_events(), handler had already been "
                "close()d, skipping modify()", inst=self)

    # --- utils

    def call_later(self, seconds, target, *args, **kwargs):
        """Same as self.ioloop.call_later but also cancel()s the
        scheduled function on close().
        """
        if '_errback' not in kwargs and hasattr(self, 'handle_error'):
            kwargs['_errback'] = self.handle_error
        callback = self.ioloop.call_later(seconds, target, *args, **kwargs)
        self._tasks.append(callback)
        return callback

    # --- overridden asynchat methods

    def connect(self, addr):
        """Connect to addr, after switching to WRITE events."""
        self.modify_ioloop_events(self.ioloop.WRITE)
        asynchat.async_chat.connect(self, addr)

    def connect_af_unspecified(self, addr, source_address=None):
        """Same as connect() but guesses address family from addr.
        Return the address family just determined.
        """
        assert self.socket is None
        host, port = addr
        err = "getaddrinfo() returned an empty list"
        info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                                  socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
        for res in info:
            self.socket = None
            af, socktype, proto, canonname, sa = res
            try:
                self.create_socket(af, socktype)
                if source_address:
                    if source_address[0].startswith('::ffff:'):
                        # In this scenario, the server has an IPv6 socket, but
                        # the remote client is using IPv4 and its address is
                        # represented as an IPv4-mapped IPv6 address which
                        # looks like this ::ffff:151.12.5.65, see:
                        # http://en.wikipedia.org/wiki/IPv6\
                        #     IPv4-mapped_addresses
                        # http://tools.ietf.org/html/rfc3493.html#section-3.7
                        # We truncate the first bytes to make it look like a
                        # common IPv4 address.
                        source_address = (source_address[0][7:],
                                          source_address[1])
                    self.bind(source_address)
                self.connect((host, port))
            except socket.error as _:
                err = _
                if self.socket is not None:
                    self.socket.close()
                    self.del_channel()
                    self.socket = None
                continue
            break
        if self.socket is None:
            self.del_channel()
            raise socket.error(err)
        return af

    # send() and recv() overridden as a fix around various bugs:
    # - http://bugs.python.org/issue1736101
    # - https://github.com/giampaolo/pyftpdlib/issues/104
    # - https://github.com/giampaolo/pyftpdlib/issues/109

    def send(self, data):
        """Send data and return the number of bytes sent; return 0 if
        the operation must be retried or the peer disconnected (in
        which case handle_close() is called).
        """
        try:
            return self.socket.send(data)
        except socket.error as err:
            debug("call: send(), err: %s" % err, inst=self)
            if err.errno in _ERRNOS_RETRY:
                return 0
            elif err.errno in _ERRNOS_DISCONNECTED:
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to buffer_size bytes.  Return b'' (after calling
        handle_close()) if the peer disconnected; raise RetryError if
        the operation must be retried later.
        """
        try:
            data = self.socket.recv(buffer_size)
        except socket.error as err:
            debug("call: recv(), err: %s" % err, inst=self)
            if err.errno in _ERRNOS_DISCONNECTED:
                self.handle_close()
                return b''
            elif err.errno in _ERRNOS_RETRY:
                raise RetryError
            else:
                raise
        else:
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return b''
            else:
                return data

    def handle_read(self):
        """Same as asynchat's handle_read(), ignoring RetryError."""
        try:
            asynchat.async_chat.handle_read(self)
        except RetryError:
            # This can be raised by (the overridden) recv().
            pass

    def initiate_send(self):
        """Same as asynchat's initiate_send(), then adjust the IO
        events depending on whether there is still data to send.
        """
        asynchat.async_chat.initiate_send(self)
        if not self._closed:
            # if there's still data to send we want to be ready
            # for writing, else we're only intereseted in reading
            if not self.producer_fifo:
                wanted = self.ioloop.READ
            else:
                # In FTPHandler, we also want to listen for user input
                # hence the READ. DTPHandler has its own initiate_send()
                # which will either READ or WRITE.
                wanted = self.ioloop.READ | self.ioloop.WRITE
            if self._wanted_io_events != wanted:
                self.ioloop.modify(self._fileno, wanted)
                self._wanted_io_events = wanted
        else:
            debug("call: initiate_send(); called with no connection",
                  inst=self)

    def close_when_done(self):
        """Close right away if there is no pending data to send, else
        once all pending data has been sent.
        """
        if len(self.producer_fifo) == 0:
            self.handle_close()
        else:
            self._closing = True
            asynchat.async_chat.close_when_done(self)

    def close(self):
        """Close the connection and cancel the calls scheduled via
        call_later().  Calling it more than once is a no-op.
        """
        if not self._closed:
            self._closed = True
            try:
                asynchat.async_chat.close(self)
            finally:
                for fun in self._tasks:
                    try:
                        fun.cancel()
                    except Exception:
                        logger.error(traceback.format_exc())
                self._tasks = []
                self._closed = True
                self._closing = False
                self.connected = False


class Connector(AsyncChat):
    """Same as base AsyncChat and supposed to be used for
    clients.
    """

    def add_channel(self, map=None, events=None):
        """Register for WRITE events, regardless of "events"."""
        AsyncChat.add_channel(self, map=map, events=self.ioloop.WRITE)


class Acceptor(AsyncChat):
    """Same as base AsyncChat and supposed to be used to
    accept new connections.
    """

    def add_channel(self, map=None, events=None):
        """Register for READ events, regardless of "events"."""
        AsyncChat.add_channel(self, map=map, events=self.ioloop.READ)

    def bind_af_unspecified(self, addr):
        """Same as bind() but guesses address family from addr.
        Return the address family just determined.
        """
        assert self.socket is None
        host, port = addr
        if host == "":
            # When using bind() "" is a symbolic name meaning all
            # available interfaces. People might not know we're
            # using getaddrinfo() internally, which uses None
            # instead of "", so we'll make the conversion for them.
            host = None
        err = "getaddrinfo() returned an empty list"
        info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                                  socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
        for res in info:
            self.socket = None
            self.del_channel()
            af, socktype, proto, canonname, sa = res
            try:
                self.create_socket(af, socktype)
                self.set_reuse_addr()
                self.bind(sa)
            except socket.error as _:
                err = _
                if self.socket is not None:
                    self.socket.close()
                    self.del_channel()
                    self.socket = None
                continue
            break
        if self.socket is None:
            self.del_channel()
            raise socket.error(err)
        return af

    def listen(self, num):
        """Start listening for connections."""
        AsyncChat.listen(self, num)
        # XXX - this seems to be necessary, otherwise kqueue.control()
        # won't return listening fd events
        try:
            if isinstance(self.ioloop, Kqueue):
                self.ioloop.modify(self._fileno, self.ioloop.READ)
        except NameError:
            # Kqueue is not defined on this platform
            pass

    def handle_accept(self):
        """Accept a new connection and pass it to handle_accepted()."""
        try:
            sock, addr = self.accept()
        except TypeError:
            # sometimes accept() might return None, see:
            # https://github.com/giampaolo/pyftpdlib/issues/91
            debug("call: handle_accept(); accept() returned None", self)
            return
        except socket.error as err:
            # ECONNABORTED might be thrown on *BSD, see:
            # https://github.com/giampaolo/pyftpdlib/issues/105
            if err.errno != errno.ECONNABORTED:
                raise
            else:
                debug("call: handle_accept(); accept() returned ECONNABORTED",
                      self)
        else:
            # sometimes addr == None instead of (ip, port) (see issue 104)
            if addr is not None:
                self.handle_accepted(sock, addr)

    def handle_accepted(self, sock, addr):
        """Called with every accepted connection; supposed to be
        overridden.  The default implementation closes the socket.
        """
        sock.close()
        self.log_info('unhandled accepted event', 'warning')

    # overridden for convenience; avoid to reuse address on Windows
    if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
        def set_reuse_addr(self):
            pass