1# Copyright (c) 2010, 2015 Nicira, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at:
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import errno
16import os
17
18import select
19import socket
20import sys
21
22import ovs.timeval
23import ovs.vlog
24
25if sys.platform == "win32":
26    import ovs.winutils as winutils
27
28try:
29    from OpenSSL import SSL
30except ImportError:
31    SSL = None
32
33try:
34    import eventlet.patcher
35
36    def _using_eventlet_green_select():
37        return eventlet.patcher.is_monkey_patched(select)
38except:
39    def _using_eventlet_green_select():
40        return False
41
42vlog = ovs.vlog.Vlog("poller")
43
44POLLIN = 0x001
45POLLOUT = 0x004
46POLLERR = 0x008
47POLLHUP = 0x010
48POLLNVAL = 0x020
49
50
51# eventlet/gevent doesn't support select.poll. If select.poll is used,
52# python interpreter is blocked as a whole instead of switching from the
53# current thread that is about to block to other runnable thread.
54# So emulate select.poll by select.select because using python means that
55# performance isn't so important.
56class _SelectSelect(object):
57    """ select.poll emulation by using select.select.
58    Only register and poll are needed at the moment.
59    """
60    def __init__(self):
61        self.rlist = []
62        self.wlist = []
63        self.xlist = []
64
65    def register(self, fd, events):
66        if isinstance(fd, socket.socket):
67            fd = fd.fileno()
68        if SSL and isinstance(fd, SSL.Connection):
69            fd = fd.fileno()
70
71        if sys.platform != 'win32':
72            # Skip this on Windows, it also register events
73            assert isinstance(fd, int)
74        if events & POLLIN:
75            self.rlist.append(fd)
76            events &= ~POLLIN
77        if events & POLLOUT:
78            self.wlist.append(fd)
79            events &= ~POLLOUT
80        if events:
81            self.xlist.append(fd)
82
83    def poll(self, timeout):
84        # XXX workaround a bug in eventlet
85        # see https://github.com/eventlet/eventlet/pull/25
86        if timeout == 0 and _using_eventlet_green_select():
87            timeout = 0.1
88        if sys.platform == 'win32':
89            events = self.rlist + self.wlist + self.xlist
90            if not events:
91                return []
92            if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
93                raise WindowsError("Cannot handle more than maximum wait"
94                                   "objects\n")
95
96            # win32event.INFINITE timeout is -1
97            # timeout must be an int number, expressed in ms
98            if timeout == 0.1:
99                timeout = 100
100            else:
101                timeout = int(timeout)
102
103            # Wait until any of the events is set to signaled
104            try:
105                retval = winutils.win32event.WaitForMultipleObjects(
106                    events,
107                    False,  # Wait all
108                    timeout)
109            except winutils.pywintypes.error:
110                    return [(0, POLLERR)]
111
112            if retval == winutils.winerror.WAIT_TIMEOUT:
113                return []
114
115            if events[retval] in self.rlist:
116                revent = POLLIN
117            elif events[retval] in self.wlist:
118                revent = POLLOUT
119            else:
120                revent = POLLERR
121
122            return [(events[retval], revent)]
123        else:
124            if timeout == -1:
125                # epoll uses -1 for infinite timeout, select uses None.
126                timeout = None
127            else:
128                timeout = float(timeout) / 1000
129            rlist, wlist, xlist = select.select(self.rlist,
130                                                self.wlist,
131                                                self.xlist,
132                                                timeout)
133            events_dict = {}
134            for fd in rlist:
135                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
136            for fd in wlist:
137                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
138            for fd in xlist:
139                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
140                                                            POLLHUP |
141                                                            POLLNVAL)
142            return list(events_dict.items())
143
144
145SelectPoll = _SelectSelect
146# If eventlet/gevent isn't used, we can use select.poll by replacing
147# _SelectPoll with select.poll class
148# _SelectPoll = select.poll
149
150
151class Poller(object):
152    """High-level wrapper around the "poll" system call.
153
154    Intended usage is for the program's main loop to go about its business
155    servicing whatever events it needs to.  Then, when it runs out of immediate
156    tasks, it calls each subordinate module or object's "wait" function, which
157    in turn calls one (or more) of the functions Poller.fd_wait(),
158    Poller.immediate_wake(), and Poller.timer_wait() to register to be awakened
159    when the appropriate event occurs.  Then the main loop calls
160    Poller.block(), which blocks until one of the registered events happens."""
161
162    def __init__(self):
163        self.__reset()
164
165    def fd_wait(self, fd, events):
166        """Registers 'fd' as waiting for the specified 'events' (which should
167        be select.POLLIN or select.POLLOUT or their bitwise-OR).  The following
168        call to self.block() will wake up when 'fd' becomes ready for one or
169        more of the requested events.
170
171        The event registration is one-shot: only the following call to
172        self.block() is affected.  The event will need to be re-registered
173        after self.block() is called if it is to persist.
174
175        'fd' may be an integer file descriptor or an object with a fileno()
176        method that returns an integer file descriptor."""
177        self.poll.register(fd, events)
178
179    def __timer_wait(self, msec):
180        if self.timeout < 0 or msec < self.timeout:
181            self.timeout = msec
182
183    def timer_wait(self, msec):
184        """Causes the following call to self.block() to block for no more than
185        'msec' milliseconds.  If 'msec' is nonpositive, the following call to
186        self.block() will not block at all.
187
188        The timer registration is one-shot: only the following call to
189        self.block() is affected.  The timer will need to be re-registered
190        after self.block() is called if it is to persist."""
191        if msec <= 0:
192            self.immediate_wake()
193        else:
194            self.__timer_wait(msec)
195
196    def timer_wait_until(self, msec):
197        """Causes the following call to self.block() to wake up when the
198        current time, as returned by ovs.timeval.msec(), reaches 'msec' or
199        later.  If 'msec' is earlier than the current time, the following call
200        to self.block() will not block at all.
201
202        The timer registration is one-shot: only the following call to
203        self.block() is affected.  The timer will need to be re-registered
204        after self.block() is called if it is to persist."""
205        now = ovs.timeval.msec()
206        if msec <= now:
207            self.immediate_wake()
208        else:
209            self.__timer_wait(msec - now)
210
211    def immediate_wake(self):
212        """Causes the following call to self.block() to wake up immediately,
213        without blocking."""
214        self.timeout = 0
215
216    def block(self):
217        """Blocks until one or more of the events registered with
218        self.fd_wait() occurs, or until the minimum duration registered with
219        self.timer_wait() elapses, or not at all if self.immediate_wake() has
220        been called."""
221        try:
222            try:
223                events = self.poll.poll(self.timeout)
224                self.__log_wakeup(events)
225            except OSError as e:
226                """ On Windows, the select function from poll raises OSError
227                exception if the polled array is empty."""
228                if e.errno != errno.EINTR:
229                    vlog.err("poll: %s" % os.strerror(e.errno))
230            except select.error as e:
231                # XXX rate-limit
232                error, msg = e
233                if error != errno.EINTR:
234                    vlog.err("poll: %s" % e[1])
235        finally:
236            self.__reset()
237
238    def __log_wakeup(self, events):
239        if not events:
240            vlog.dbg("%d-ms timeout" % self.timeout)
241        else:
242            for fd, revents in events:
243                if revents != 0:
244                    s = ""
245                    if revents & POLLIN:
246                        s += "[POLLIN]"
247                    if revents & POLLOUT:
248                        s += "[POLLOUT]"
249                    if revents & POLLERR:
250                        s += "[POLLERR]"
251                    if revents & POLLHUP:
252                        s += "[POLLHUP]"
253                    if revents & POLLNVAL:
254                        s += "[POLLNVAL]"
255                    vlog.dbg("%s on fd %d" % (s, fd))
256
257    def __reset(self):
258        self.poll = SelectPoll()
259        self.timeout = -1
260