1# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.
2# Use of this source code is governed by MIT license that can be
3# found in the LICENSE file.
4
5"""
A specialized IO loop on top of asyncore adding support for epoll()
on Linux and kqueue() on OSX/BSD, dramatically increasing the
performance offered by the base asyncore module.
9
10poll() and select() loops are also reimplemented and are an order of
11magnitude faster as they support fd un/registration and modification.
12
13This module is not supposed to be used directly unless you want to
14include a new dispatcher which runs within the main FTP server loop,
15in which case:
16  __________________________________________________________________
17 |                      |                                           |
18 | INSTEAD OF           | ...USE:                                   |
19 |______________________|___________________________________________|
20 |                      |                                           |
 | asyncore.dispatcher  | Acceptor (for servers)                    |
 | asyncore.dispatcher  | Connector (for clients)                   |
 | asynchat.async_chat  | AsyncChat (for a full duplex connection ) |
 | asyncore.loop        | FTPServer.serve_forever()                 |
25 |______________________|___________________________________________|
26
27asyncore.dispatcher_with_send is not supported, same for "map" argument
28for asyncore.loop and asyncore.dispatcher and asynchat.async_chat
29constructors.
30
31Follows a server example:
32
33import socket
34from pyftpdlib.ioloop import IOLoop, Acceptor, AsyncChat
35
36class Handler(AsyncChat):
37
38    def __init__(self, sock):
39        AsyncChat.__init__(self, sock)
40        self.push('200 hello\r\n')
41        self.close_when_done()
42
43class Server(Acceptor):
44
45    def __init__(self, host, port):
46        Acceptor.__init__(self)
47        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
48        self.set_reuse_addr()
49        self.bind((host, port))
50        self.listen(5)
51
52    def handle_accepted(self, sock, addr):
53        Handler(sock)
54
55server = Server('localhost', 8021)
56IOLoop.instance().loop()
57"""
58
59import asynchat
60import asyncore
61import errno
62import heapq
63import os
64import select
65import socket
66import sys
67import time
68import traceback
69try:
70    import threading
71except ImportError:
72    import dummy_threading as threading
73
74from ._compat import callable
75from .log import config_logging
76from .log import debug
77from .log import is_logging_configured
78from .log import logger
79
80
# Clock used for all scheduling: time.monotonic when the time module
# provides it, time.time otherwise.
timer = getattr(time, 'monotonic', time.time)
# Module-level aliases of asyncore's read/write event helpers, called
# by the pollers below for every ready handler.
_read = asyncore.read
_write = asyncore.write
84
# Error codes meaning that the connection has been abruptly terminated.
# The WSA* variants are only added when the errno module defines them.
_ERRNOS_DISCONNECTED = set([
    errno.ECONNRESET,
    errno.ENOTCONN,
    errno.ESHUTDOWN,
    errno.ECONNABORTED,
    errno.EPIPE,
    errno.EBADF,
    errno.ETIMEDOUT,
])
_ERRNOS_DISCONNECTED.update(
    getattr(errno, name)
    for name in ("WSAECONNRESET", "WSAECONNABORTED")
    if hasattr(errno, name))

# Error codes meaning that a non-blocking operation could not complete
# right now and must be retried at a later time.
_ERRNOS_RETRY = set([errno.EAGAIN, errno.EWOULDBLOCK])
_ERRNOS_RETRY.update(
    getattr(errno, name)
    for name in ("WSAEWOULDBLOCK", )
    if hasattr(errno, name))
99
100
class RetryError(Exception):
    """Signal that a non-blocking operation must be retried later."""
103
104
105# ===================================================================
106# --- scheduler
107# ===================================================================
108
class _Scheduler(object):
    """Keep track of the scheduled calls and run the ones which are due.

    Calls are stored in a heap and compared with each other through
    their own ordering methods. A cancelled call is not removed from
    the heap right away: unregister() only counts it, and it is dropped
    either when poll() pops it or when reheapify() purges the heap.
    """

    def __init__(self):
        # the heap used for the scheduled tasks
        self._tasks = []
        # how many cancellations have been reported via unregister()
        self._cancellations = 0

    def poll(self):
        """Run the scheduled functions due to expire soonest and
        return the timeout of the next one (if any, else None).

        Exceptions raised by a scheduled function are logged and do
        not prevent the remaining due functions from running.
        """
        now = timer()
        calls = []
        # Pop everything which is due; cancelled entries are discarded.
        while self._tasks:
            if now < self._tasks[0].timeout:
                break
            call = heapq.heappop(self._tasks)
            if call.cancelled:
                self._cancellations -= 1
            else:
                calls.append(call)

        for call in calls:
            if call.cancelled:
                # Cancelled by a function which ran earlier in this
                # same batch, after it had been popped from the heap.
                # Calling it would only trip its "already cancelled"
                # assertion and log a bogus traceback. cancel() has
                # bumped the counter through unregister(), but the call
                # is no longer in the heap, so undo that increment.
                self._cancellations -= 1
                continue
            if call._repush:
                # reset() was invoked: put the call back with its new
                # timeout instead of running it
                heapq.heappush(self._tasks, call)
                call._repush = False
                continue
            try:
                call.call()
            except Exception:
                logger.error(traceback.format_exc())

        # remove cancelled tasks and re-heapify the queue if the
        # number of cancelled tasks is more than the half of the
        # entire queue
        if (self._cancellations > 512 and
                self._cancellations > (len(self._tasks) >> 1)):
            debug("re-heapifying %s cancelled tasks" % self._cancellations)
            self.reheapify()

        if self._tasks:
            return max(0, self._tasks[0].timeout - now)
        return None

    def register(self, what):
        """Register a _CallLater instance."""
        heapq.heappush(self._tasks, what)

    def unregister(self, what):
        """Unregister a _CallLater instance.
        The actual unregistration will happen at a later time though.
        """
        self._cancellations += 1

    def reheapify(self):
        """Get rid of cancelled calls and reinitialize the internal heap."""
        self._cancellations = 0
        self._tasks = [x for x in self._tasks if not x.cancelled]
        heapq.heapify(self._tasks)
170
171
class _CallLater(object):
    """Handle of a scheduled call, as returned by ioloop.call_later().

    The instance registers itself against the scheduler passed as the
    mandatory '_scheduler' keyword argument. An optional '_errback'
    keyword argument is a callable invoked, with no arguments, when the
    target raises an exception. Neither keyword is passed to the target.
    """

    __slots__ = ('_delay', '_target', '_args', '_kwargs', '_errback', '_sched',
                 '_repush', 'timeout', 'cancelled')

    def __init__(self, seconds, target, *args, **kwargs):
        assert callable(target), "%s is not callable" % target
        assert sys.maxsize >= seconds >= 0, \
            "%s is not greater than or equal to 0 seconds" % seconds
        # strip the reserved keywords before storing the target's kwargs
        errback = kwargs.pop('_errback', None)
        scheduler = kwargs.pop('_scheduler')
        self._delay = seconds
        self._target = target
        self._args = args
        self._kwargs = kwargs
        self._errback = errback
        self._sched = scheduler
        self._repush = False
        # moment (on the timer() clock) at which to call the function;
        # a delay of 0 means "as soon as possible"
        self.timeout = (timer() + self._delay) if seconds else 0
        self.cancelled = False
        self._sched.register(self)

    def __lt__(self, other):
        return self.timeout < other.timeout

    def __le__(self, other):
        return self.timeout <= other.timeout

    def __repr__(self):
        if self._target is None:
            head = object.__repr__(self)
        else:
            head = repr(self._target)
        details = ' args=%s, kwargs=%s, cancelled=%s, secs=%s' % (
            self._args or '[]', self._kwargs or '{}', self.cancelled,
            self._delay)
        return '<%s>' % (head + details)

    __str__ = __repr__

    def _post_call(self, exc):
        # a one-shot call is done once it has run, whatever the outcome
        if not self.cancelled:
            self.cancel()

    def call(self):
        """Call this scheduled function.

        If the function raises and an errback was given, the errback
        is called instead of propagating; otherwise the exception is
        re-raised. _post_call() runs in both cases.
        """
        assert not self.cancelled, "already cancelled"
        failure = None
        try:
            self._target(*self._args, **self._kwargs)
        except Exception as err:
            failure = err
            if self._errback is None:
                raise
            self._errback()
        finally:
            self._post_call(failure)

    def reset(self):
        """Reschedule this call resetting the current countdown."""
        assert not self.cancelled, "already cancelled"
        self.timeout = timer() + self._delay
        # tells the scheduler to push the call back instead of running it
        self._repush = True

    def cancel(self):
        """Unschedule this call."""
        if self.cancelled:
            return
        self.cancelled = True
        # drop the references held on behalf of the caller
        self._target = self._args = self._kwargs = self._errback = None
        self._sched.unregister(self)
246
247
class _CallEvery(_CallLater):
    """Handle of a periodic call, as returned by IOLoop.call_every()."""

    def _post_call(self, exc):
        # Nothing to do if the call was cancelled while running.
        if self.cancelled:
            return
        if exc:
            # the target raised: stop repeating
            self.cancel()
            return
        # schedule the next run, one full delay from now
        self.timeout = timer() + self._delay
        self._sched.register(self)
258
259
class _IOLoop(object):
    """Base class of all pollers; the one picked for this platform is
    later exposed as IOLoop.

    It owns the fd -> handler map (socket_map) and the scheduler of
    delayed calls (sched). Subclasses implement register(),
    unregister(), modify() and poll().
    """

    READ = 1
    WRITE = 2
    _instance = None
    _lock = threading.Lock()
    _started_once = False

    def __init__(self):
        self.socket_map = {}
        self.sched = _Scheduler()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __repr__(self):
        klass = self.__class__
        name = klass.__module__ + "." + klass.__name__
        counters = "(fds=%s, tasks=%s)" % (
            len(self.socket_map), len(self.sched._tasks))
        return '<%s at %#x>' % (name + ' ' + counters, id(self))

    __str__ = __repr__

    @classmethod
    def instance(cls):
        """Return a global IOLoop instance."""
        # The instance is created lazily; the check is repeated once
        # the lock is held so that it gets created only once.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def register(self, fd, instance, events):
        """Register a fd, handled by instance for the given events."""
        raise NotImplementedError('must be implemented in subclass')

    def unregister(self, fd):
        """Unregister fd."""
        raise NotImplementedError('must be implemented in subclass')

    def modify(self, fd, events):
        """Changes the events assigned for fd."""
        raise NotImplementedError('must be implemented in subclass')

    def poll(self, timeout):
        """Poll once.  The subclass overriding this method is supposed
        to poll over the registered handlers and the scheduled functions
        and then return.
        """
        raise NotImplementedError('must be implemented in subclass')

    def loop(self, timeout=None, blocking=True):
        """Start the asynchronous IO loop.

         - (float) timeout: the timeout passed to the underlying
           multiplex syscall (select(), epoll() etc.). In blocking
           mode, when None, the timeout returned by the scheduler
           (time left before the next scheduled call) is used instead.

         - (bool) blocking: if True keep polling for as long as
           socket_map is not empty, running the scheduled functions
           after every poll.
           If False poll only once (if there are registered handlers)
           and, if there are scheduled functions, run the due ones and
           return the timeout of the next scheduled call (if any,
           else None).
        """
        if not _IOLoop._started_once:
            _IOLoop._started_once = True
            # The user hasn't configured logging. We want to log by
            # default, so configure logging ourselves so that it will
            # print to stderr.
            if not is_logging_configured():
                config_logging()

        if not blocking:
            scheduler = self.sched
            if self.socket_map:
                self.poll(timeout)
            if scheduler._tasks:
                return scheduler.poll()
            return None

        # localize variable access to minimize overhead
        io_poll = self.poll
        handlers = self.socket_map
        run_scheduled = self.sched.poll

        if timeout is None:
            # sleep in the poller just until the next scheduled call
            next_timeout = None
            while handlers:
                io_poll(next_timeout)
                next_timeout = run_scheduled()
        else:
            while handlers:
                io_poll(timeout)
                run_scheduled()

    def call_later(self, seconds, target, *args, **kwargs):
        """Calls a function at a later time.
        It can be used to asynchronously schedule a call within the polling
        loop without blocking it. The instance returned is an object that
        can be used to cancel or reschedule the call.

         - (int) seconds: the number of seconds to wait
         - (obj) target: the callable object to call later
         - args: the arguments to call it with
         - kwargs: the keyword arguments to call it with; a special
           '_errback' parameter can be passed: it is a callable
           called in case target function raises an exception.
        """
        kwargs['_scheduler'] = self.sched
        return _CallLater(seconds, target, *args, **kwargs)

    def call_every(self, seconds, target, *args, **kwargs):
        """Schedules the given callback to be called periodically."""
        kwargs['_scheduler'] = self.sched
        return _CallEvery(seconds, target, *args, **kwargs)

    def close(self):
        """Closes the IOLoop, freeing any resources used."""
        debug("closing IOLoop", self)
        # a later instance() call will create a brand new loop
        self.__class__._instance = None

        # free connections, in ascending fd order; a failure to close
        # one handler must not prevent the others from being closed
        handlers = list(self.socket_map.values())
        handlers.sort(key=lambda handler: handler._fileno)
        for handler in handlers:
            try:
                handler.close()
            except OSError as err:
                # EBADF is silently ignored
                if err.errno != errno.EBADF:
                    logger.error(traceback.format_exc())
            except Exception:
                logger.error(traceback.format_exc())
        self.socket_map.clear()

        # free scheduled functions
        for task in self.sched._tasks:
            try:
                if not task.cancelled:
                    task.cancel()
            except Exception:
                logger.error(traceback.format_exc())
        del self.sched._tasks[:]
403
404
405# ===================================================================
406# --- select() - POSIX / Windows
407# ===================================================================
408
class Select(_IOLoop):
    """select()-based poller."""

    def __init__(self):
        _IOLoop.__init__(self)
        # fds watched for reading and for writing respectively
        self._r = []
        self._w = []

    def register(self, fd, instance, events):
        # an already known fd is left untouched
        if fd in self.socket_map:
            return
        self.socket_map[fd] = instance
        if events & self.READ:
            self._r.append(fd)
        if events & self.WRITE:
            self._w.append(fd)

    def unregister(self, fd):
        try:
            del self.socket_map[fd]
        except KeyError:
            debug("call: unregister(); fd was no longer in socket_map", self)
        # drop the fd from whichever list it is in
        for watched in (self._r, self._w):
            if fd in watched:
                watched.remove(fd)

    def modify(self, fd, events):
        inst = self.socket_map.get(fd)
        if inst is None:
            debug("call: modify(); fd was no longer in socket_map", self)
            return
        # re-register from scratch with the new set of events
        self.unregister(fd)
        self.register(fd, inst, events)

    def poll(self, timeout):
        try:
            readable, writable, _ = select.select(
                self._r, self._w, [], timeout)
        except select.error as err:
            # interrupted by a signal: give up this round
            if getattr(err, "errno", None) == errno.EINTR:
                return
            raise

        # A handler may be gone by the time its fd is processed, hence
        # the lookup through get() and the None check.
        lookup = self.socket_map.get
        for fd in readable:
            handler = lookup(fd)
            if handler is not None and handler.readable():
                _read(handler)
        for fd in writable:
            handler = lookup(fd)
            if handler is not None and handler.writable():
                _write(handler)
463
464
465# ===================================================================
466# --- poll() / epoll()
467# ===================================================================
468
class _BasePollEpoll(_IOLoop):
    """This is common to both poll() (UNIX), epoll() (Linux) and
    /dev/poll (Solaris) implementations which share almost the same
    interface.
    Not supposed to be used directly.

    Subclasses are expected to define the READ, WRITE and _ERROR event
    masks and a _poller class attribute: the callable which creates
    the underlying poller object.
    """

    def __init__(self):
        _IOLoop.__init__(self)
        # replace the factory (class attribute) with an actual poller
        # object (instance attribute)
        self._poller = self._poller()

    def register(self, fd, instance, events):
        """Register fd against the poller for the given events and
        bind it to instance. EEXIST from the poller is ignored.
        """
        try:
            self._poller.register(fd, events)
        except EnvironmentError as err:
            if err.errno == errno.EEXIST:
                debug("call: register(); poller raised EEXIST; ignored", self)
            else:
                raise
        self.socket_map[fd] = instance

    def unregister(self, fd):
        """Forget about fd. ENOENT and EBADF from the poller are
        ignored; the poller is not touched if fd was not in socket_map.
        """
        try:
            del self.socket_map[fd]
        except KeyError:
            debug("call: unregister(); fd was no longer in socket_map", self)
        else:
            try:
                self._poller.unregister(fd)
            except EnvironmentError as err:
                if err.errno in (errno.ENOENT, errno.EBADF):
                    debug("call: unregister(); poller returned %r; "
                          "ignoring it" % err, self)
                else:
                    raise

    def modify(self, fd, events):
        """Change the events fd is watched for. If the poller does not
        know fd (ENOENT) but socket_map does, fd is registered again.
        """
        try:
            self._poller.modify(fd, events)
        except OSError as err:
            if err.errno == errno.ENOENT and fd in self.socket_map:
                # XXX - see:
                # https://github.com/giampaolo/pyftpdlib/issues/329
                instance = self.socket_map[fd]
                self.register(fd, instance, events)
            else:
                raise

    def poll(self, timeout):
        """Poll once and dispatch the events to the handlers.

        timeout is passed to the underlying poller as is, except for
        None which is turned into -1 (wait indefinitely). A timeout
        of 0 is preserved: it asks the poller not to block.
        """
        if timeout is None:
            timeout = -1  # -1 waits indefinitely
        try:
            events = self._poller.poll(timeout)
        except (IOError, select.error) as err:
            # for epoll() and poll() respectively; errno is read via
            # getattr() as done by the select() poller
            if getattr(err, "errno", None) == errno.EINTR:
                return
            raise
        # localize variable access to minimize overhead
        smap_get = self.socket_map.get
        for fd, event in events:
            inst = smap_get(fd)
            if inst is None:
                # the handler went away in the meantime
                continue
            if event & self._ERROR and not event & self.READ:
                # error condition with nothing left to read
                inst.handle_close()
            else:
                if event & self.READ:
                    if inst.readable():
                        _read(inst)
                if event & self.WRITE:
                    if inst.writable():
                        _write(inst)
540
541
542# ===================================================================
543# --- poll() - POSIX
544# ===================================================================
545
if hasattr(select, 'poll'):

    class Poll(_BasePollEpoll):
        """poll() based poller."""

        READ = select.POLLIN
        WRITE = select.POLLOUT
        _ERROR = select.POLLERR | select.POLLHUP | select.POLLNVAL
        _poller = select.poll

        def modify(self, fd, events):
            """Change the events fd is watched for by unregistering
            and registering it again. Raise KeyError if fd is unknown.
            """
            inst = self.socket_map[fd]
            self.unregister(fd)
            self.register(fd, inst, events)

        def poll(self, timeout):
            """Same as the base class method except that timeout,
            given in seconds, is converted to milliseconds first.
            """
            # poll() timeout is expressed in milliseconds
            if timeout is not None:
                millis = int(timeout * 1000)
                # Round up rather than truncate: a positive timeout
                # shorter than 1 ms must not turn into 0, which the
                # base class treats like "no timeout".
                if millis < timeout * 1000:
                    millis += 1
                timeout = millis
            _BasePollEpoll.poll(self, timeout)
566
567
568# ===================================================================
569# --- /dev/poll - Solaris (introduced in python 3.3)
570# ===================================================================
571
if hasattr(select, 'devpoll'):  # pragma: no cover

    class DevPoll(_BasePollEpoll):
        """/dev/poll based poller (introduced in python 3.3)."""

        READ = select.POLLIN
        WRITE = select.POLLOUT
        _ERROR = select.POLLERR | select.POLLHUP | select.POLLNVAL
        _poller = select.devpoll

        # introduced in python 3.4
        if hasattr(select.devpoll, 'fileno'):
            def fileno(self):
                """Return devpoll() fd."""
                return self._poller.fileno()

        def modify(self, fd, events):
            """Change the events fd is watched for by unregistering
            and registering it again. Raise KeyError if fd is unknown.
            """
            inst = self.socket_map[fd]
            self.unregister(fd)
            self.register(fd, inst, events)

        def poll(self, timeout):
            """Same as the base class method except that timeout,
            given in seconds, is converted to milliseconds first.
            """
            # /dev/poll timeout is expressed in milliseconds
            if timeout is not None:
                millis = int(timeout * 1000)
                # Round up rather than truncate: a positive timeout
                # shorter than 1 ms must not turn into 0, which the
                # base class treats like "no timeout".
                if millis < timeout * 1000:
                    millis += 1
                timeout = millis
            _BasePollEpoll.poll(self, timeout)

        # introduced in python 3.4
        if hasattr(select.devpoll, 'close'):
            def close(self):
                """Close the loop, then the devpoll object itself."""
                _IOLoop.close(self)
                self._poller.close()
604
605
606# ===================================================================
607# --- epoll() - Linux
608# ===================================================================
609
if hasattr(select, 'epoll'):

    class Epoll(_BasePollEpoll):
        """epoll() based poller."""

        READ = select.EPOLLIN
        WRITE = select.EPOLLOUT
        _ERROR = select.EPOLLERR | select.EPOLLHUP
        _poller = select.epoll

        def fileno(self):
            """Return the fd of the underlying epoll object."""
            epoll_obj = self._poller
            return epoll_obj.fileno()

        def close(self):
            """Close the loop first, then the epoll object itself."""
            epoll_obj = self._poller
            _IOLoop.close(self)
            epoll_obj.close()
627
628
629# ===================================================================
630# --- kqueue() - BSD / OSX
631# ===================================================================
632
if hasattr(select, 'kqueue'):  # pragma: no cover

    class Kqueue(_IOLoop):
        """kqueue() based poller."""

        def __init__(self):
            _IOLoop.__init__(self)
            self._kqueue = select.kqueue()
            # fd -> events the fd is currently registered for
            self._active = {}

        def fileno(self):
            """Return kqueue() fd."""
            return self._kqueue.fileno()

        def close(self):
            """Close the loop first, then the kqueue object itself."""
            _IOLoop.close(self)
            self._kqueue.close()

        def register(self, fd, instance, events):
            """Bind fd to instance and add it to the kqueue for the
            given events. EEXIST from the kqueue is ignored.
            """
            self.socket_map[fd] = instance
            try:
                self._control(fd, events, select.KQ_EV_ADD)
            except EnvironmentError as err:
                if err.errno != errno.EEXIST:
                    raise
                debug("call: register(); poller raised EEXIST; ignored",
                      self)
            self._active[fd] = events

        def unregister(self, fd):
            """Forget about fd; a no-op if fd is unknown. ENOENT and
            EBADF from the kqueue are ignored.
            """
            try:
                del self.socket_map[fd]
                events = self._active.pop(fd)
            except KeyError:
                return
            try:
                self._control(fd, events, select.KQ_EV_DELETE)
            except EnvironmentError as err:
                if err.errno not in (errno.ENOENT, errno.EBADF):
                    raise
                debug("call: unregister(); poller returned %r; "
                      "ignoring it" % err, self)

        def modify(self, fd, events):
            """Change the events fd is watched for by unregistering
            and registering it again. Raise KeyError if fd is unknown.
            """
            handler = self.socket_map[fd]
            self.unregister(fd)
            self.register(fd, handler, events)

        def _control(self, fd, events, flags):
            """Apply flags to the kqueue filters matching events."""
            filters = []
            if events & self.WRITE:
                filters.append(select.KQ_FILTER_WRITE)
            if events & self.READ or not filters:
                # always read when there is not a write
                filters.append(select.KQ_FILTER_READ)
            changes = [select.kevent(fd, filter=kfilter, flags=flags)
                       for kfilter in filters]
            # even though control() takes a list, it seems to return
            # EINVAL on Mac OS X (10.6) when there is more than one
            # event in the list
            for change in changes:
                self._kqueue.control([change], 0)

        # the keyword arguments localize variable access to minimize
        # overhead; they are not meant to be passed by the caller
        def poll(self,
                 timeout,
                 _len=len,
                 _READ=select.KQ_FILTER_READ,
                 _WRITE=select.KQ_FILTER_WRITE,
                 _EOF=select.KQ_EV_EOF,
                 _ERROR=select.KQ_EV_ERROR):
            """Poll once and dispatch the events to the handlers."""
            try:
                ready = self._kqueue.control(
                    None, _len(self.socket_map), timeout)
            except OSError as err:
                # interrupted by a signal: give up this round
                if err.errno == errno.EINTR:
                    return
                raise
            for kevent in ready:
                handler = self.socket_map.get(kevent.ident)
                if handler is None:
                    continue
                if kevent.filter == _READ:
                    if handler.readable():
                        _read(handler)
                elif kevent.filter == _WRITE:
                    if kevent.flags & _EOF:
                        # If an asynchronous connection is refused,
                        # kqueue returns a write event with the EOF
                        # flag set.
                        # Note that for read events, EOF may be returned
                        # before all data has been consumed from the
                        # socket buffer, so we only check for EOF on
                        # write events.
                        handler.handle_close()
                    elif handler.writable():
                        _write(handler)
                if kevent.flags & _ERROR:
                    handler.handle_close()
736
737
738# ===================================================================
739# --- choose the better poller for this platform
740# ===================================================================
741
# Bind IOLoop to the first poller class supported by this platform's
# select module. Every poller class other than Select is defined only
# if the matching select attribute exists, so each branch names a class
# only after testing for that same attribute.
if hasattr(select, 'epoll'):      # epoll() - Linux
    IOLoop = Epoll
elif hasattr(select, 'kqueue'):   # kqueue() - BSD / OSX
    IOLoop = Kqueue
elif hasattr(select, 'devpoll'):  # /dev/poll - Solaris
    IOLoop = DevPoll
elif hasattr(select, 'poll'):     # poll() - POSIX
    IOLoop = Poll
else:                             # select() - POSIX and Windows
    IOLoop = Select
752
753
754# ===================================================================
755# --- asyncore dispatchers
756# ===================================================================
757
758# these are overridden in order to register() and unregister()
759# file descriptors against the new pollers
760
761
762class AsyncChat(asynchat.async_chat):
763    """Same as asynchat.async_chat, only working with the new IO poller
764    and being more clever in avoid registering for read events when
765    it shouldn't.
766    """
767
768    def __init__(self, sock=None, ioloop=None):
769        self.ioloop = ioloop or IOLoop.instance()
770        self._wanted_io_events = self.ioloop.READ
771        self._current_io_events = self.ioloop.READ
772        self._closed = False
773        self._closing = False
774        self._fileno = sock.fileno() if sock else None
775        self._tasks = []
776        asynchat.async_chat.__init__(self, sock)
777
778    # --- IO loop related methods
779
780    def add_channel(self, map=None, events=None):
781        assert self._fileno, repr(self._fileno)
782        events = events if events is not None else self.ioloop.READ
783        self.ioloop.register(self._fileno, self, events)
784        self._wanted_io_events = events
785        self._current_io_events = events
786
787    def del_channel(self, map=None):
788        if self._fileno is not None:
789            self.ioloop.unregister(self._fileno)
790
791    def modify_ioloop_events(self, events, logdebug=False):
792        if not self._closed:
793            assert self._fileno, repr(self._fileno)
794            if self._fileno not in self.ioloop.socket_map:
795                debug(
796                    "call: modify_ioloop_events(), fd was no longer in "
797                    "socket_map, had to register() it again", inst=self)
798                self.add_channel(events=events)
799            else:
800                if events != self._current_io_events:
801                    if logdebug:
802                        if events == self.ioloop.READ:
803                            ev = "R"
804                        elif events == self.ioloop.WRITE:
805                            ev = "W"
806                        elif events == self.ioloop.READ | self.ioloop.WRITE:
807                            ev = "RW"
808                        else:
809                            ev = events
810                        debug("call: IOLoop.modify(); setting %r IO events" % (
811                            ev), self)
812                    self.ioloop.modify(self._fileno, events)
813            self._current_io_events = events
814        else:
815            debug(
816                "call: modify_ioloop_events(), handler had already been "
817                "close()d, skipping modify()", inst=self)
818
819    # --- utils
820
821    def call_later(self, seconds, target, *args, **kwargs):
822        """Same as self.ioloop.call_later but also cancel()s the
823        scheduled function on close().
824        """
825        if '_errback' not in kwargs and hasattr(self, 'handle_error'):
826            kwargs['_errback'] = self.handle_error
827        callback = self.ioloop.call_later(seconds, target, *args, **kwargs)
828        self._tasks.append(callback)
829        return callback
830
831    # --- overridden asynchat methods
832
833    def connect(self, addr):
834        self.modify_ioloop_events(self.ioloop.WRITE)
835        asynchat.async_chat.connect(self, addr)
836
837    def connect_af_unspecified(self, addr, source_address=None):
838        """Same as connect() but guesses address family from addr.
839        Return the address family just determined.
840        """
841        assert self.socket is None
842        host, port = addr
843        err = "getaddrinfo() returned an empty list"
844        info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
845                                  socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
846        for res in info:
847            self.socket = None
848            af, socktype, proto, canonname, sa = res
849            try:
850                self.create_socket(af, socktype)
851                if source_address:
852                    if source_address[0].startswith('::ffff:'):
853                        # In this scenario, the server has an IPv6 socket, but
854                        # the remote client is using IPv4 and its address is
855                        # represented as an IPv4-mapped IPv6 address which
856                        # looks like this ::ffff:151.12.5.65, see:
857                        # http://en.wikipedia.org/wiki/IPv6\
858                        #     IPv4-mapped_addresses
859                        # http://tools.ietf.org/html/rfc3493.html#section-3.7
860                        # We truncate the first bytes to make it look like a
861                        # common IPv4 address.
862                        source_address = (source_address[0][7:],
863                                          source_address[1])
864                    self.bind(source_address)
865                self.connect((host, port))
866            except socket.error as _:
867                err = _
868                if self.socket is not None:
869                    self.socket.close()
870                    self.del_channel()
871                    self.socket = None
872                continue
873            break
874        if self.socket is None:
875            self.del_channel()
876            raise socket.error(err)
877        return af
878
879    # send() and recv() overridden as a fix around various bugs:
880    # - http://bugs.python.org/issue1736101
881    # - https://github.com/giampaolo/pyftpdlib/issues/104
882    # - https://github.com/giampaolo/pyftpdlib/issues/109
883
884    def send(self, data):
885        try:
886            return self.socket.send(data)
887        except socket.error as err:
888            debug("call: send(), err: %s" % err, inst=self)
889            if err.errno in _ERRNOS_RETRY:
890                return 0
891            elif err.errno in _ERRNOS_DISCONNECTED:
892                self.handle_close()
893                return 0
894            else:
895                raise
896
897    def recv(self, buffer_size):
898        try:
899            data = self.socket.recv(buffer_size)
900        except socket.error as err:
901            debug("call: recv(), err: %s" % err, inst=self)
902            if err.errno in _ERRNOS_DISCONNECTED:
903                self.handle_close()
904                return b''
905            elif err.errno in _ERRNOS_RETRY:
906                raise RetryError
907            else:
908                raise
909        else:
910            if not data:
911                # a closed connection is indicated by signaling
912                # a read condition, and having recv() return 0.
913                self.handle_close()
914                return b''
915            else:
916                return data
917
918    def handle_read(self):
919        try:
920            asynchat.async_chat.handle_read(self)
921        except RetryError:
922            # This can be raised by (the overridden) recv().
923            pass
924
925    def initiate_send(self):
926        asynchat.async_chat.initiate_send(self)
927        if not self._closed:
928            # if there's still data to send we want to be ready
929            # for writing, else we're only intereseted in reading
930            if not self.producer_fifo:
931                wanted = self.ioloop.READ
932            else:
933                # In FTPHandler, we also want to listen for user input
934                # hence the READ. DTPHandler has its own initiate_send()
935                # which will either READ or WRITE.
936                wanted = self.ioloop.READ | self.ioloop.WRITE
937            if self._wanted_io_events != wanted:
938                self.ioloop.modify(self._fileno, wanted)
939                self._wanted_io_events = wanted
940        else:
941            debug("call: initiate_send(); called with no connection",
942                  inst=self)
943
944    def close_when_done(self):
945        if len(self.producer_fifo) == 0:
946            self.handle_close()
947        else:
948            self._closing = True
949            asynchat.async_chat.close_when_done(self)
950
951    def close(self):
952        if not self._closed:
953            self._closed = True
954            try:
955                asynchat.async_chat.close(self)
956            finally:
957                for fun in self._tasks:
958                    try:
959                        fun.cancel()
960                    except Exception:
961                        logger.error(traceback.format_exc())
962                self._tasks = []
963                self._closed = True
964                self._closing = False
965                self.connected = False
966
967
class Connector(AsyncChat):
    """AsyncChat flavour supposed to be used for clients: the channel
    is always registered for WRITE events.
    """

    def add_channel(self, map=None, events=None):
        # *events* is ignored: WRITE is always what gets registered.
        write_only = self.ioloop.WRITE
        AsyncChat.add_channel(self, map=map, events=write_only)
975
976
977class Acceptor(AsyncChat):
978    """Same as base AsyncChat and supposed to be used to
979    accept new connections.
980    """
981
982    def add_channel(self, map=None, events=None):
983        AsyncChat.add_channel(self, map=map, events=self.ioloop.READ)
984
985    def bind_af_unspecified(self, addr):
986        """Same as bind() but guesses address family from addr.
987        Return the address family just determined.
988        """
989        assert self.socket is None
990        host, port = addr
991        if host == "":
992            # When using bind() "" is a symbolic name meaning all
993            # available interfaces. People might not know we're
994            # using getaddrinfo() internally, which uses None
995            # instead of "", so we'll make the conversion for them.
996            host = None
997        err = "getaddrinfo() returned an empty list"
998        info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
999                                  socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
1000        for res in info:
1001            self.socket = None
1002            self.del_channel()
1003            af, socktype, proto, canonname, sa = res
1004            try:
1005                self.create_socket(af, socktype)
1006                self.set_reuse_addr()
1007                self.bind(sa)
1008            except socket.error as _:
1009                err = _
1010                if self.socket is not None:
1011                    self.socket.close()
1012                    self.del_channel()
1013                    self.socket = None
1014                continue
1015            break
1016        if self.socket is None:
1017            self.del_channel()
1018            raise socket.error(err)
1019        return af
1020
1021    def listen(self, num):
1022        AsyncChat.listen(self, num)
1023        # XXX - this seems to be necessary, otherwise kqueue.control()
1024        # won't return listening fd events
1025        try:
1026            if isinstance(self.ioloop, Kqueue):
1027                self.ioloop.modify(self._fileno, self.ioloop.READ)
1028        except NameError:
1029            pass
1030
1031    def handle_accept(self):
1032        try:
1033            sock, addr = self.accept()
1034        except TypeError:
1035            # sometimes accept() might return None, see:
1036            # https://github.com/giampaolo/pyftpdlib/issues/91
1037            debug("call: handle_accept(); accept() returned None", self)
1038            return
1039        except socket.error as err:
1040            # ECONNABORTED might be thrown on *BSD, see:
1041            # https://github.com/giampaolo/pyftpdlib/issues/105
1042            if err.errno != errno.ECONNABORTED:
1043                raise
1044            else:
1045                debug("call: handle_accept(); accept() returned ECONNABORTED",
1046                      self)
1047        else:
1048            # sometimes addr == None instead of (ip, port) (see issue 104)
1049            if addr is not None:
1050                self.handle_accepted(sock, addr)
1051
1052    def handle_accepted(self, sock, addr):
1053        sock.close()
1054        self.log_info('unhandled accepted event', 'warning')
1055
1056    # overridden for convenience; avoid to reuse address on Windows
1057    if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
1058        def set_reuse_addr(self):
1059            pass
1060