1#!/usr/bin/env python 2# 3# Copyright 2012 Facebook 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may 6# not use this file except in compliance with the License. You may obtain 7# a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations 15# under the License. 16"""KQueue-based IOLoop implementation for BSD/Mac systems.""" 17# pylint: skip-file 18from __future__ import absolute_import, division, print_function 19 20import select 21 22from salt.ext.tornado.ioloop import IOLoop, PollIOLoop 23 24assert hasattr(select, 'kqueue'), 'kqueue not supported' 25 26 27class _KQueue(object): 28 """A kqueue-based event loop for BSD/Mac systems.""" 29 def __init__(self): 30 self._kqueue = select.kqueue() 31 self._active = {} 32 33 def fileno(self): 34 return self._kqueue.fileno() 35 36 def close(self): 37 self._kqueue.close() 38 39 def register(self, fd, events): 40 if fd in self._active: 41 raise IOError("fd %s already registered" % fd) 42 self._control(fd, events, select.KQ_EV_ADD) 43 self._active[fd] = events 44 45 def modify(self, fd, events): 46 self.unregister(fd) 47 self.register(fd, events) 48 49 def unregister(self, fd): 50 events = self._active.pop(fd) 51 self._control(fd, events, select.KQ_EV_DELETE) 52 53 def _control(self, fd, events, flags): 54 kevents = [] 55 if events & IOLoop.WRITE: 56 kevents.append(select.kevent( 57 fd, filter=select.KQ_FILTER_WRITE, flags=flags)) 58 if events & IOLoop.READ: 59 kevents.append(select.kevent( 60 fd, filter=select.KQ_FILTER_READ, flags=flags)) 61 # Even though control() takes a list, it seems to return EINVAL 62 # on Mac OS X (10.6) when there is more than one event in the list. 63 for kevent in kevents: 64 self._kqueue.control([kevent], 0) 65 66 def poll(self, timeout): 67 kevents = self._kqueue.control(None, 1000, timeout) 68 events = {} 69 for kevent in kevents: 70 fd = kevent.ident 71 if kevent.filter == select.KQ_FILTER_READ: 72 events[fd] = events.get(fd, 0) | IOLoop.READ 73 if kevent.filter == select.KQ_FILTER_WRITE: 74 if kevent.flags & select.KQ_EV_EOF: 75 # If an asynchronous connection is refused, kqueue 76 # returns a write event with the EOF flag set. 77 # Turn this into an error for consistency with the 78 # other IOLoop implementations. 79 # Note that for read events, EOF may be returned before 80 # all data has been consumed from the socket buffer, 81 # so we only check for EOF on write events. 82 events[fd] = IOLoop.ERROR 83 else: 84 events[fd] = events.get(fd, 0) | IOLoop.WRITE 85 if kevent.flags & select.KQ_EV_ERROR: 86 events[fd] = events.get(fd, 0) | IOLoop.ERROR 87 return events.items() 88 89 90class KQueueIOLoop(PollIOLoop): 91 def initialize(self, **kwargs): 92 super(KQueueIOLoop, self).initialize(impl=_KQueue(), **kwargs) 93