1# Copyright (c) 2010, 2015 Nicira, Inc. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at: 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import errno 16import os 17 18import select 19import socket 20import sys 21 22import ovs.timeval 23import ovs.vlog 24 25if sys.platform == "win32": 26 import ovs.winutils as winutils 27 28try: 29 from OpenSSL import SSL 30except ImportError: 31 SSL = None 32 33try: 34 import eventlet.patcher 35 36 def _using_eventlet_green_select(): 37 return eventlet.patcher.is_monkey_patched(select) 38except: 39 def _using_eventlet_green_select(): 40 return False 41 42vlog = ovs.vlog.Vlog("poller") 43 44POLLIN = 0x001 45POLLOUT = 0x004 46POLLERR = 0x008 47POLLHUP = 0x010 48POLLNVAL = 0x020 49 50 51# eventlet/gevent doesn't support select.poll. If select.poll is used, 52# python interpreter is blocked as a whole instead of switching from the 53# current thread that is about to block to other runnable thread. 54# So emulate select.poll by select.select because using python means that 55# performance isn't so important. 56class _SelectSelect(object): 57 """ select.poll emulation by using select.select. 58 Only register and poll are needed at the moment. 59 """ 60 def __init__(self): 61 self.rlist = [] 62 self.wlist = [] 63 self.xlist = [] 64 65 def register(self, fd, events): 66 if isinstance(fd, socket.socket): 67 fd = fd.fileno() 68 if SSL and isinstance(fd, SSL.Connection): 69 fd = fd.fileno() 70 71 if sys.platform != 'win32': 72 # Skip this on Windows, it also register events 73 assert isinstance(fd, int) 74 if events & POLLIN: 75 self.rlist.append(fd) 76 events &= ~POLLIN 77 if events & POLLOUT: 78 self.wlist.append(fd) 79 events &= ~POLLOUT 80 if events: 81 self.xlist.append(fd) 82 83 def poll(self, timeout): 84 # XXX workaround a bug in eventlet 85 # see https://github.com/eventlet/eventlet/pull/25 86 if timeout == 0 and _using_eventlet_green_select(): 87 timeout = 0.1 88 if sys.platform == 'win32': 89 events = self.rlist + self.wlist + self.xlist 90 if not events: 91 return [] 92 if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS: 93 raise WindowsError("Cannot handle more than maximum wait" 94 "objects\n") 95 96 # win32event.INFINITE timeout is -1 97 # timeout must be an int number, expressed in ms 98 if timeout == 0.1: 99 timeout = 100 100 else: 101 timeout = int(timeout) 102 103 # Wait until any of the events is set to signaled 104 try: 105 retval = winutils.win32event.WaitForMultipleObjects( 106 events, 107 False, # Wait all 108 timeout) 109 except winutils.pywintypes.error: 110 return [(0, POLLERR)] 111 112 if retval == winutils.winerror.WAIT_TIMEOUT: 113 return [] 114 115 if events[retval] in self.rlist: 116 revent = POLLIN 117 elif events[retval] in self.wlist: 118 revent = POLLOUT 119 else: 120 revent = POLLERR 121 122 return [(events[retval], revent)] 123 else: 124 if timeout == -1: 125 # epoll uses -1 for infinite timeout, select uses None. 126 timeout = None 127 else: 128 timeout = float(timeout) / 1000 129 rlist, wlist, xlist = select.select(self.rlist, 130 self.wlist, 131 self.xlist, 132 timeout) 133 events_dict = {} 134 for fd in rlist: 135 events_dict[fd] = events_dict.get(fd, 0) | POLLIN 136 for fd in wlist: 137 events_dict[fd] = events_dict.get(fd, 0) | POLLOUT 138 for fd in xlist: 139 events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | 140 POLLHUP | 141 POLLNVAL) 142 return list(events_dict.items()) 143 144 145SelectPoll = _SelectSelect 146# If eventlet/gevent isn't used, we can use select.poll by replacing 147# _SelectPoll with select.poll class 148# _SelectPoll = select.poll 149 150 151class Poller(object): 152 """High-level wrapper around the "poll" system call. 153 154 Intended usage is for the program's main loop to go about its business 155 servicing whatever events it needs to. Then, when it runs out of immediate 156 tasks, it calls each subordinate module or object's "wait" function, which 157 in turn calls one (or more) of the functions Poller.fd_wait(), 158 Poller.immediate_wake(), and Poller.timer_wait() to register to be awakened 159 when the appropriate event occurs. Then the main loop calls 160 Poller.block(), which blocks until one of the registered events happens.""" 161 162 def __init__(self): 163 self.__reset() 164 165 def fd_wait(self, fd, events): 166 """Registers 'fd' as waiting for the specified 'events' (which should 167 be select.POLLIN or select.POLLOUT or their bitwise-OR). The following 168 call to self.block() will wake up when 'fd' becomes ready for one or 169 more of the requested events. 170 171 The event registration is one-shot: only the following call to 172 self.block() is affected. The event will need to be re-registered 173 after self.block() is called if it is to persist. 174 175 'fd' may be an integer file descriptor or an object with a fileno() 176 method that returns an integer file descriptor.""" 177 self.poll.register(fd, events) 178 179 def __timer_wait(self, msec): 180 if self.timeout < 0 or msec < self.timeout: 181 self.timeout = msec 182 183 def timer_wait(self, msec): 184 """Causes the following call to self.block() to block for no more than 185 'msec' milliseconds. If 'msec' is nonpositive, the following call to 186 self.block() will not block at all. 187 188 The timer registration is one-shot: only the following call to 189 self.block() is affected. The timer will need to be re-registered 190 after self.block() is called if it is to persist.""" 191 if msec <= 0: 192 self.immediate_wake() 193 else: 194 self.__timer_wait(msec) 195 196 def timer_wait_until(self, msec): 197 """Causes the following call to self.block() to wake up when the 198 current time, as returned by ovs.timeval.msec(), reaches 'msec' or 199 later. If 'msec' is earlier than the current time, the following call 200 to self.block() will not block at all. 201 202 The timer registration is one-shot: only the following call to 203 self.block() is affected. The timer will need to be re-registered 204 after self.block() is called if it is to persist.""" 205 now = ovs.timeval.msec() 206 if msec <= now: 207 self.immediate_wake() 208 else: 209 self.__timer_wait(msec - now) 210 211 def immediate_wake(self): 212 """Causes the following call to self.block() to wake up immediately, 213 without blocking.""" 214 self.timeout = 0 215 216 def block(self): 217 """Blocks until one or more of the events registered with 218 self.fd_wait() occurs, or until the minimum duration registered with 219 self.timer_wait() elapses, or not at all if self.immediate_wake() has 220 been called.""" 221 try: 222 try: 223 events = self.poll.poll(self.timeout) 224 self.__log_wakeup(events) 225 except OSError as e: 226 """ On Windows, the select function from poll raises OSError 227 exception if the polled array is empty.""" 228 if e.errno != errno.EINTR: 229 vlog.err("poll: %s" % os.strerror(e.errno)) 230 except select.error as e: 231 # XXX rate-limit 232 error, msg = e 233 if error != errno.EINTR: 234 vlog.err("poll: %s" % e[1]) 235 finally: 236 self.__reset() 237 238 def __log_wakeup(self, events): 239 if not events: 240 vlog.dbg("%d-ms timeout" % self.timeout) 241 else: 242 for fd, revents in events: 243 if revents != 0: 244 s = "" 245 if revents & POLLIN: 246 s += "[POLLIN]" 247 if revents & POLLOUT: 248 s += "[POLLOUT]" 249 if revents & POLLERR: 250 s += "[POLLERR]" 251 if revents & POLLHUP: 252 s += "[POLLHUP]" 253 if revents & POLLNVAL: 254 s += "[POLLNVAL]" 255 vlog.dbg("%s on fd %d" % (s, fd)) 256 257 def __reset(self): 258 self.poll = SelectPoll() 259 self.timeout = -1 260