1# 2# Copyright 2012 Facebook 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may 5# not use this file except in compliance with the License. You may obtain 6# a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations 14# under the License. 15"""KQueue-based IOLoop implementation for BSD/Mac systems.""" 16from __future__ import absolute_import, division, print_function 17 18import select 19 20from tornado.ioloop import IOLoop, PollIOLoop 21 22assert hasattr(select, 'kqueue'), 'kqueue not supported' 23 24 25class _KQueue(object): 26 """A kqueue-based event loop for BSD/Mac systems.""" 27 def __init__(self): 28 self._kqueue = select.kqueue() 29 self._active = {} 30 31 def fileno(self): 32 return self._kqueue.fileno() 33 34 def close(self): 35 self._kqueue.close() 36 37 def register(self, fd, events): 38 if fd in self._active: 39 raise IOError("fd %s already registered" % fd) 40 self._control(fd, events, select.KQ_EV_ADD) 41 self._active[fd] = events 42 43 def modify(self, fd, events): 44 self.unregister(fd) 45 self.register(fd, events) 46 47 def unregister(self, fd): 48 events = self._active.pop(fd) 49 self._control(fd, events, select.KQ_EV_DELETE) 50 51 def _control(self, fd, events, flags): 52 kevents = [] 53 if events & IOLoop.WRITE: 54 kevents.append(select.kevent( 55 fd, filter=select.KQ_FILTER_WRITE, flags=flags)) 56 if events & IOLoop.READ: 57 kevents.append(select.kevent( 58 fd, filter=select.KQ_FILTER_READ, flags=flags)) 59 # Even though control() takes a list, it seems to return EINVAL 60 # on Mac OS X (10.6) when there is more than one event in the list. 61 for kevent in kevents: 62 self._kqueue.control([kevent], 0) 63 64 def poll(self, timeout): 65 kevents = self._kqueue.control(None, 1000, timeout) 66 events = {} 67 for kevent in kevents: 68 fd = kevent.ident 69 if kevent.filter == select.KQ_FILTER_READ: 70 events[fd] = events.get(fd, 0) | IOLoop.READ 71 if kevent.filter == select.KQ_FILTER_WRITE: 72 if kevent.flags & select.KQ_EV_EOF: 73 # If an asynchronous connection is refused, kqueue 74 # returns a write event with the EOF flag set. 75 # Turn this into an error for consistency with the 76 # other IOLoop implementations. 77 # Note that for read events, EOF may be returned before 78 # all data has been consumed from the socket buffer, 79 # so we only check for EOF on write events. 80 events[fd] = IOLoop.ERROR 81 else: 82 events[fd] = events.get(fd, 0) | IOLoop.WRITE 83 if kevent.flags & select.KQ_EV_ERROR: 84 events[fd] = events.get(fd, 0) | IOLoop.ERROR 85 return events.items() 86 87 88class KQueueIOLoop(PollIOLoop): 89 def initialize(self, **kwargs): 90 super(KQueueIOLoop, self).initialize(impl=_KQueue(), **kwargs) 91