1"""Poller Components for asynchronous file and socket I/O.
2
3This module contains Poller components that enable polling of file or socket
4descriptors for read/write events. Pollers:
5- Select
6- Poll
7- EPoll
8"""
9
10import os
11import select
12import platform
13from errno import EBADF, EINTR
14from select import error as SelectError
15from socket import error as SocketError, create_connection, \
16    socket as create_socket, AF_INET, SOCK_STREAM, socket
17from threading import Thread
18from circuits.core.handlers import handler
19
20from .events import Event
21from .components import BaseComponent
22
23
24class _read(Event):
25
26    """_read Event"""
27
28
29class _write(Event):
30
31    """_write Event"""
32
33
34class _error(Event):
35
36    """_error Event"""
37
38
39class _disconnect(Event):
40
41    """_disconnect Event"""
42
43
44class BasePoller(BaseComponent):
45
46    channel = None
47
48    def __init__(self, channel=channel):
49        super(BasePoller, self).__init__(channel=channel)
50
51        self._read = []
52        self._write = []
53        self._targets = {}
54
55        self._ctrl_recv, self._ctrl_send = self._create_control_con()
56
57    def _create_control_con(self):
58        if platform.system() == "Linux":
59            return os.pipe()
60        server = create_socket(AF_INET, SOCK_STREAM)
61        server.bind(("localhost", 0))
62        server.listen(1)
63        res_list = []
64
65        def accept():
66            sock, _ = server.accept()
67            sock.setblocking(False)
68            res_list.append(sock)
69        at = Thread(target=accept)
70        at.start()
71        clnt_sock = create_connection(server.getsockname())
72        at.join()
73        return (res_list[0], clnt_sock)
74
75    @handler("generate_events", priority=-9)
76    def _on_generate_events(self, event):
77        """
78        Pollers have slightly higher priority than the default handler
79        from Manager to ensure that they are invoked before the
80        default handler. They act as event filters to avoid the additional
81        invocation of the default handler which would be unnecessary
82        overhead.
83        """
84
85        event.stop()
86        self._generate_events(event)
87
88    def resume(self):
89        if isinstance(self._ctrl_send, socket):
90            self._ctrl_send.send(b"\0")
91        else:
92            os.write(self._ctrl_send, b"\0")
93
94    def _read_ctrl(self):
95        try:
96            if isinstance(self._ctrl_recv, socket):
97                return self._ctrl_recv.recv(1)
98            else:
99                return os.read(self._ctrl_recv, 1)
100        except:
101            return b"\0"
102
103    def addReader(self, source, fd):
104        channel = getattr(source, "channel", "*")
105        self._read.append(fd)
106        self._targets[fd] = channel
107
108    def addWriter(self, source, fd):
109        channel = getattr(source, "channel", "*")
110        self._write.append(fd)
111        self._targets[fd] = channel
112
113    def removeReader(self, fd):
114        if fd in self._read:
115            self._read.remove(fd)
116        if not (fd in self._read or fd in self._write) and fd in self._targets:
117            del self._targets[fd]
118
119    def removeWriter(self, fd):
120        if fd in self._write:
121            self._write.remove(fd)
122        if not (fd in self._read or fd in self._write) and fd in self._targets:
123            del self._targets[fd]
124
125    def isReading(self, fd):
126        return fd in self._read
127
128    def isWriting(self, fd):
129        return fd in self._write
130
131    def discard(self, fd):
132        if fd in self._read:
133            self._read.remove(fd)
134        if fd in self._write:
135            self._write.remove(fd)
136        if fd in self._targets:
137            del self._targets[fd]
138
139    def getTarget(self, fd):
140        return self._targets.get(fd, self.parent)
141
142
143class Select(BasePoller):
144
145    """Select(...) -> new Select Poller Component
146
147    Creates a new Select Poller Component that uses the select poller
148    implementation. This poller is not recommended but is available for legacy
149    reasons as most systems implement select-based polling for backwards
150    compatibility.
151    """
152
153    channel = "select"
154
155    def __init__(self, channel=channel):
156        super(Select, self).__init__(channel=channel)
157
158        self._read.append(self._ctrl_recv)
159
160    def _preenDescriptors(self):
161        for socks in (self._read[:], self._write[:]):
162            for sock in socks:
163                try:
164                    select.select([sock], [sock], [sock], 0)
165                except Exception:
166                    self.discard(sock)
167
168    def _generate_events(self, event):
169        try:
170            if not any([self._read, self._write]):
171                return
172            timeout = event.time_left
173            if timeout < 0:
174                r, w, _ = select.select(self._read, self._write, [])
175            else:
176                r, w, _ = select.select(self._read, self._write, [], timeout)
177        except ValueError as e:
178            # Possibly a file descriptor has gone negative?
179            return self._preenDescriptors()
180        except TypeError as e:
181            # Something *totally* invalid (object w/o fileno, non-integral
182            # result) was passed
183            return self._preenDescriptors()
184        except (SelectError, SocketError, IOError) as e:
185            # select(2) encountered an error
186            if e.args[0] in (0, 2):
187                # windows does this if it got an empty list
188                if (not self._read) and (not self._write):
189                    return
190                else:
191                    raise
192            elif e.args[0] == EINTR:
193                return
194            elif e.args[0] == EBADF:
195                return self._preenDescriptors()
196            else:
197                # OK, I really don't know what's going on.  Blow up.
198                raise
199
200        for sock in w:
201            if self.isWriting(sock):
202                self.fire(_write(sock), self.getTarget(sock))
203
204        for sock in r:
205            if sock == self._ctrl_recv:
206                self._read_ctrl()
207                continue
208            if self.isReading(sock):
209                self.fire(_read(sock), self.getTarget(sock))
210
211
212class Poll(BasePoller):
213
214    """Poll(...) -> new Poll Poller Component
215
216    Creates a new Poll Poller Component that uses the poll poller
217    implementation.
218    """
219
220    channel = "poll"
221
222    def __init__(self, channel=channel):
223        super(Poll, self).__init__(channel=channel)
224
225        self._map = {}
226        self._poller = select.poll()
227
228        self._disconnected_flag = (
229            select.POLLHUP
230            | select.POLLERR
231            | select.POLLNVAL
232        )
233
234        self._read.append(self._ctrl_recv)
235        self._updateRegistration(self._ctrl_recv)
236
237    def _updateRegistration(self, fd):
238        fileno = fd.fileno() if not isinstance(fd, int) else fd
239
240        try:
241            self._poller.unregister(fileno)
242        except (KeyError, ValueError):
243            pass
244
245        mask = 0
246
247        if fd in self._read:
248            mask = mask | select.POLLIN
249        if fd in self._write:
250            mask = mask | select.POLLOUT
251
252        if mask:
253            self._poller.register(fd, mask)
254            self._map[fileno] = fd
255        else:
256            super(Poll, self).discard(fd)
257            try:
258                del self._map[fileno]
259            except KeyError:
260                pass
261
262    def addReader(self, source, fd):
263        super(Poll, self).addReader(source, fd)
264        self._updateRegistration(fd)
265
266    def addWriter(self, source, fd):
267        super(Poll, self).addWriter(source, fd)
268        self._updateRegistration(fd)
269
270    def removeReader(self, fd):
271        super(Poll, self).removeReader(fd)
272        self._updateRegistration(fd)
273
274    def removeWriter(self, fd):
275        super(Poll, self).removeWriter(fd)
276        self._updateRegistration(fd)
277
278    def discard(self, fd):
279        super(Poll, self).discard(fd)
280        self._updateRegistration(fd)
281
282    def _generate_events(self, event):
283        try:
284            timeout = event.time_left
285            if timeout < 0:
286                l = self._poller.poll()
287            else:
288                l = self._poller.poll(1000 * timeout)
289        except SelectError as e:
290            if e.args[0] == EINTR:
291                return
292            else:
293                raise
294
295        for fileno, event in l:
296            self._process(fileno, event)
297
298    def _process(self, fileno, event):
299        if fileno not in self._map:
300            return
301
302        fd = self._map[fileno]
303        if fd == self._ctrl_recv:
304            self._read_ctrl()
305            return
306
307        if event & self._disconnected_flag and not (event & select.POLLIN):
308            self.fire(_disconnect(fd), self.getTarget(fd))
309            self._poller.unregister(fileno)
310            super(Poll, self).discard(fd)
311            del self._map[fileno]
312        else:
313            try:
314                if event & select.POLLIN:
315                    self.fire(_read(fd), self.getTarget(fd))
316                if event & select.POLLOUT:
317                    self.fire(_write(fd), self.getTarget(fd))
318            except Exception as e:
319                self.fire(_error(fd, e), self.getTarget(fd))
320                self.fire(_disconnect(fd), self.getTarget(fd))
321                self._poller.unregister(fileno)
322                super(Poll, self).discard(fd)
323                del self._map[fileno]
324
325
326class EPoll(BasePoller):
327
328    """EPoll(...) -> new EPoll Poller Component
329
330    Creates a new EPoll Poller Component that uses the epoll poller
331    implementation.
332    """
333
334    channel = "epoll"
335
336    def __init__(self, channel=channel):
337        super(EPoll, self).__init__(channel=channel)
338
339        self._map = {}
340        self._poller = select.epoll()
341
342        self._disconnected_flag = (select.EPOLLHUP | select.EPOLLERR)
343
344        self._read.append(self._ctrl_recv)
345        self._updateRegistration(self._ctrl_recv)
346
347    def _updateRegistration(self, fd):
348        try:
349            fileno = fd.fileno() if not isinstance(fd, int) else fd
350            self._poller.unregister(fileno)
351        except (SocketError, IOError, ValueError) as e:
352            if e.args[0] == EBADF:
353                keys = [k for k, v in list(self._map.items()) if v == fd]
354                for key in keys:
355                    del self._map[key]
356
357        mask = 0
358
359        if fd in self._read:
360            mask = mask | select.EPOLLIN
361        if fd in self._write:
362            mask = mask | select.EPOLLOUT
363
364        if mask:
365            self._poller.register(fd, mask)
366            self._map[fileno] = fd
367        else:
368            super(EPoll, self).discard(fd)
369
370    def addReader(self, source, fd):
371        super(EPoll, self).addReader(source, fd)
372        self._updateRegistration(fd)
373
374    def addWriter(self, source, fd):
375        super(EPoll, self).addWriter(source, fd)
376        self._updateRegistration(fd)
377
378    def removeReader(self, fd):
379        super(EPoll, self).removeReader(fd)
380        self._updateRegistration(fd)
381
382    def removeWriter(self, fd):
383        super(EPoll, self).removeWriter(fd)
384        self._updateRegistration(fd)
385
386    def discard(self, fd):
387        super(EPoll, self).discard(fd)
388        self._updateRegistration(fd)
389
390    def _generate_events(self, event):
391        try:
392            timeout = event.time_left
393            if timeout < 0:
394                l = self._poller.poll()
395            else:
396                l = self._poller.poll(timeout)
397        except IOError as e:
398            if e.args[0] == EINTR:
399                return
400        except SelectError as e:
401            if e.args[0] == EINTR:
402                return
403            else:
404                raise
405
406        for fileno, event in l:
407            self._process(fileno, event)
408
409    def _process(self, fileno, event):
410        if fileno not in self._map:
411            return
412
413        fd = self._map[fileno]
414        if fd == self._ctrl_recv:
415            self._read_ctrl()
416            return
417
418        if event & self._disconnected_flag and not (event & select.POLLIN):
419            self.fire(_disconnect(fd), self.getTarget(fd))
420            self._poller.unregister(fileno)
421            super(EPoll, self).discard(fd)
422            del self._map[fileno]
423        else:
424            try:
425                if event & select.EPOLLIN:
426                    self.fire(_read(fd), self.getTarget(fd))
427                if event & select.EPOLLOUT:
428                    self.fire(_write(fd), self.getTarget(fd))
429            except Exception as e:
430                self.fire(_error(fd, e), self.getTarget(fd))
431                self.fire(_disconnect(fd), self.getTarget(fd))
432                self._poller.unregister(fileno)
433                super(EPoll, self).discard(fd)
434                del self._map[fileno]
435
436
437class KQueue(BasePoller):
438
439    """KQueue(...) -> new KQueue Poller Component
440
441    Creates a new KQueue Poller Component that uses the kqueue poller
442    implementation.
443    """
444
445    channel = "kqueue"
446
447    def __init__(self, channel=channel):
448        super(KQueue, self).__init__(channel=channel)
449        self._map = {}
450        self._poller = select.kqueue()
451
452        self._read.append(self._ctrl_recv)
453        self._map[self._ctrl_recv.fileno()] = self._ctrl_recv
454        self._poller.control(
455            [
456                select.kevent(
457                    self._ctrl_recv, select.KQ_FILTER_READ, select.KQ_EV_ADD
458                )
459            ], 0
460        )
461
462    def addReader(self, source, sock):
463        super(KQueue, self).addReader(source, sock)
464        self._map[sock.fileno()] = sock
465        self._poller.control(
466            [select.kevent(sock, select.KQ_FILTER_READ, select.KQ_EV_ADD)], 0
467        )
468
469    def addWriter(self, source, sock):
470        super(KQueue, self).addWriter(source, sock)
471        self._map[sock.fileno()] = sock
472        self._poller.control(
473            [select.kevent(sock, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)], 0
474        )
475
476    def removeReader(self, sock):
477        super(KQueue, self).removeReader(sock)
478        self._poller.control(
479            [
480                select.kevent(sock, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
481            ],
482            0
483        )
484
485    def removeWriter(self, sock):
486        super(KQueue, self).removeWriter(sock)
487        self._poller.control(
488            [select.kevent(sock, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)],
489            0
490        )
491
492    def discard(self, sock):
493        super(KQueue, self).discard(sock)
494        del self._map[sock.fileno()]
495        self._poller.control(
496            [
497                select.kevent(
498                    sock,
499                    select.KQ_FILTER_WRITE | select.KQ_FILTER_READ,
500                    select.KQ_EV_DELETE
501                )
502            ],
503            0
504        )
505
506    def _generate_events(self, event):
507        try:
508            timeout = event.time_left
509            if timeout < 0:
510                l = self._poller.control(None, 1000)
511            else:
512                l = self._poller.control(None, 1000, timeout)
513        except SelectError as e:
514            if e[0] == EINTR:
515                return
516            else:
517                raise
518
519        for event in l:
520            self._process(event)
521
522    def _process(self, event):
523        if event.ident not in self._map:
524            # shouldn't happen ?
525            # we unregister the socket since we don't care about it anymore
526            self._poller.control(
527                [
528                    select.kevent(
529                        event.ident, event.filter, select.KQ_EV_DELETE
530                    )
531                ],
532                0
533            )
534
535            return
536
537        sock = self._map[event.ident]
538        if sock == self._ctrl_recv:
539            self._read_ctrl()
540            return
541
542        if event.flags & select.KQ_EV_ERROR:
543            self.fire(_error(sock, "error"), self.getTarget(sock))
544        elif event.flags & select.KQ_EV_EOF:
545            self.fire(_disconnect(sock), self.getTarget(sock))
546        elif event.filter == select.KQ_FILTER_WRITE:
547            self.fire(_write(sock), self.getTarget(sock))
548        elif event.filter == select.KQ_FILTER_READ:
549            self.fire(_read(sock), self.getTarget(sock))
550
551Poller = Select
552
553__all__ = ("BasePoller", "Poller", "Select", "Poll", "EPoll", "KQueue")
554