1# -*- coding: utf-8 -*-
2
3"""Extremely simple pure-Python implementation of coroutine-style
4asynchronous socket I/O. Inspired by, but inferior to, Eventlet.
5Bluelet can also be thought of as a less-terrible replacement for
6asyncore.
7
8Bluelet: easy concurrency without all the messy parallelism.
9"""
10from __future__ import division, absolute_import, print_function
11
12import six
13import socket
14import select
15import sys
16import types
17import errno
18import traceback
19import time
20import collections
21
22
23# Basic events used for thread scheduling.
24
25class Event(object):
26    """Just a base class identifying Bluelet events. An event is an
27    object yielded from a Bluelet thread coroutine to suspend operation
28    and communicate with the scheduler.
29    """
30    pass
31
32
33class WaitableEvent(Event):
34    """A waitable event is one encapsulating an action that can be
35    waited for using a select() call. That is, it's an event with an
36    associated file descriptor.
37    """
38    def waitables(self):
39        """Return "waitable" objects to pass to select(). Should return
40        three iterables for input readiness, output readiness, and
41        exceptional conditions (i.e., the three lists passed to
42        select()).
43        """
44        return (), (), ()
45
46    def fire(self):
47        """Called when an associated file descriptor becomes ready
48        (i.e., is returned from a select() call).
49        """
50        pass
51
52
53class ValueEvent(Event):
54    """An event that does nothing but return a fixed value."""
55    def __init__(self, value):
56        self.value = value
57
58
59class ExceptionEvent(Event):
60    """Raise an exception at the yield point. Used internally."""
61    def __init__(self, exc_info):
62        self.exc_info = exc_info
63
64
65class SpawnEvent(Event):
66    """Add a new coroutine thread to the scheduler."""
67    def __init__(self, coro):
68        self.spawned = coro
69
70
71class JoinEvent(Event):
72    """Suspend the thread until the specified child thread has
73    completed.
74    """
75    def __init__(self, child):
76        self.child = child
77
78
79class KillEvent(Event):
80    """Unschedule a child thread."""
81    def __init__(self, child):
82        self.child = child
83
84
85class DelegationEvent(Event):
86    """Suspend execution of the current thread, start a new thread and,
87    once the child thread finished, return control to the parent
88    thread.
89    """
90    def __init__(self, coro):
91        self.spawned = coro
92
93
94class ReturnEvent(Event):
95    """Return a value the current thread's delegator at the point of
96    delegation. Ends the current (delegate) thread.
97    """
98    def __init__(self, value):
99        self.value = value
100
101
102class SleepEvent(WaitableEvent):
103    """Suspend the thread for a given duration.
104    """
105    def __init__(self, duration):
106        self.wakeup_time = time.time() + duration
107
108    def time_left(self):
109        return max(self.wakeup_time - time.time(), 0.0)
110
111
112class ReadEvent(WaitableEvent):
113    """Reads from a file-like object."""
114    def __init__(self, fd, bufsize):
115        self.fd = fd
116        self.bufsize = bufsize
117
118    def waitables(self):
119        return (self.fd,), (), ()
120
121    def fire(self):
122        return self.fd.read(self.bufsize)
123
124
125class WriteEvent(WaitableEvent):
126    """Writes to a file-like object."""
127    def __init__(self, fd, data):
128        self.fd = fd
129        self.data = data
130
131    def waitable(self):
132        return (), (self.fd,), ()
133
134    def fire(self):
135        self.fd.write(self.data)
136
137
138# Core logic for executing and scheduling threads.
139
140def _event_select(events):
141    """Perform a select() over all the Events provided, returning the
142    ones ready to be fired. Only WaitableEvents (including SleepEvents)
143    matter here; all other events are ignored (and thus postponed).
144    """
145    # Gather waitables and wakeup times.
146    waitable_to_event = {}
147    rlist, wlist, xlist = [], [], []
148    earliest_wakeup = None
149    for event in events:
150        if isinstance(event, SleepEvent):
151            if not earliest_wakeup:
152                earliest_wakeup = event.wakeup_time
153            else:
154                earliest_wakeup = min(earliest_wakeup, event.wakeup_time)
155        elif isinstance(event, WaitableEvent):
156            r, w, x = event.waitables()
157            rlist += r
158            wlist += w
159            xlist += x
160            for waitable in r:
161                waitable_to_event[('r', waitable)] = event
162            for waitable in w:
163                waitable_to_event[('w', waitable)] = event
164            for waitable in x:
165                waitable_to_event[('x', waitable)] = event
166
167    # If we have a any sleeping threads, determine how long to sleep.
168    if earliest_wakeup:
169        timeout = max(earliest_wakeup - time.time(), 0.0)
170    else:
171        timeout = None
172
173    # Perform select() if we have any waitables.
174    if rlist or wlist or xlist:
175        rready, wready, xready = select.select(rlist, wlist, xlist, timeout)
176    else:
177        rready, wready, xready = (), (), ()
178        if timeout:
179            time.sleep(timeout)
180
181    # Gather ready events corresponding to the ready waitables.
182    ready_events = set()
183    for ready in rready:
184        ready_events.add(waitable_to_event[('r', ready)])
185    for ready in wready:
186        ready_events.add(waitable_to_event[('w', ready)])
187    for ready in xready:
188        ready_events.add(waitable_to_event[('x', ready)])
189
190    # Gather any finished sleeps.
191    for event in events:
192        if isinstance(event, SleepEvent) and event.time_left() == 0.0:
193            ready_events.add(event)
194
195    return ready_events
196
197
198class ThreadException(Exception):
199    def __init__(self, coro, exc_info):
200        self.coro = coro
201        self.exc_info = exc_info
202
203    def reraise(self):
204        six.reraise(self.exc_info[0], self.exc_info[1], self.exc_info[2])
205
206
207SUSPENDED = Event()  # Special sentinel placeholder for suspended threads.
208
209
210class Delegated(Event):
211    """Placeholder indicating that a thread has delegated execution to a
212    different thread.
213    """
214    def __init__(self, child):
215        self.child = child
216
217
218def run(root_coro):
219    """Schedules a coroutine, running it to completion. This
220    encapsulates the Bluelet scheduler, which the root coroutine can
221    add to by spawning new coroutines.
222    """
223    # The "threads" dictionary keeps track of all the currently-
224    # executing and suspended coroutines. It maps coroutines to their
225    # currently "blocking" event. The event value may be SUSPENDED if
226    # the coroutine is waiting on some other condition: namely, a
227    # delegated coroutine or a joined coroutine. In this case, the
228    # coroutine should *also* appear as a value in one of the below
229    # dictionaries `delegators` or `joiners`.
230    threads = {root_coro: ValueEvent(None)}
231
232    # Maps child coroutines to delegating parents.
233    delegators = {}
234
235    # Maps child coroutines to joining (exit-waiting) parents.
236    joiners = collections.defaultdict(list)
237
238    def complete_thread(coro, return_value):
239        """Remove a coroutine from the scheduling pool, awaking
240        delegators and joiners as necessary and returning the specified
241        value to any delegating parent.
242        """
243        del threads[coro]
244
245        # Resume delegator.
246        if coro in delegators:
247            threads[delegators[coro]] = ValueEvent(return_value)
248            del delegators[coro]
249
250        # Resume joiners.
251        if coro in joiners:
252            for parent in joiners[coro]:
253                threads[parent] = ValueEvent(None)
254            del joiners[coro]
255
256    def advance_thread(coro, value, is_exc=False):
257        """After an event is fired, run a given coroutine associated with
258        it in the threads dict until it yields again. If the coroutine
259        exits, then the thread is removed from the pool. If the coroutine
260        raises an exception, it is reraised in a ThreadException. If
261        is_exc is True, then the value must be an exc_info tuple and the
262        exception is thrown into the coroutine.
263        """
264        try:
265            if is_exc:
266                next_event = coro.throw(*value)
267            else:
268                next_event = coro.send(value)
269        except StopIteration:
270            # Thread is done.
271            complete_thread(coro, None)
272        except BaseException:
273            # Thread raised some other exception.
274            del threads[coro]
275            raise ThreadException(coro, sys.exc_info())
276        else:
277            if isinstance(next_event, types.GeneratorType):
278                # Automatically invoke sub-coroutines. (Shorthand for
279                # explicit bluelet.call().)
280                next_event = DelegationEvent(next_event)
281            threads[coro] = next_event
282
283    def kill_thread(coro):
284        """Unschedule this thread and its (recursive) delegates.
285        """
286        # Collect all coroutines in the delegation stack.
287        coros = [coro]
288        while isinstance(threads[coro], Delegated):
289            coro = threads[coro].child
290            coros.append(coro)
291
292        # Complete each coroutine from the top to the bottom of the
293        # stack.
294        for coro in reversed(coros):
295            complete_thread(coro, None)
296
297    # Continue advancing threads until root thread exits.
298    exit_te = None
299    while threads:
300        try:
301            # Look for events that can be run immediately. Continue
302            # running immediate events until nothing is ready.
303            while True:
304                have_ready = False
305                for coro, event in list(threads.items()):
306                    if isinstance(event, SpawnEvent):
307                        threads[event.spawned] = ValueEvent(None)  # Spawn.
308                        advance_thread(coro, None)
309                        have_ready = True
310                    elif isinstance(event, ValueEvent):
311                        advance_thread(coro, event.value)
312                        have_ready = True
313                    elif isinstance(event, ExceptionEvent):
314                        advance_thread(coro, event.exc_info, True)
315                        have_ready = True
316                    elif isinstance(event, DelegationEvent):
317                        threads[coro] = Delegated(event.spawned)  # Suspend.
318                        threads[event.spawned] = ValueEvent(None)  # Spawn.
319                        delegators[event.spawned] = coro
320                        have_ready = True
321                    elif isinstance(event, ReturnEvent):
322                        # Thread is done.
323                        complete_thread(coro, event.value)
324                        have_ready = True
325                    elif isinstance(event, JoinEvent):
326                        threads[coro] = SUSPENDED  # Suspend.
327                        joiners[event.child].append(coro)
328                        have_ready = True
329                    elif isinstance(event, KillEvent):
330                        threads[coro] = ValueEvent(None)
331                        kill_thread(event.child)
332                        have_ready = True
333
334                # Only start the select when nothing else is ready.
335                if not have_ready:
336                    break
337
338            # Wait and fire.
339            event2coro = dict((v, k) for k, v in threads.items())
340            for event in _event_select(threads.values()):
341                # Run the IO operation, but catch socket errors.
342                try:
343                    value = event.fire()
344                except socket.error as exc:
345                    if isinstance(exc.args, tuple) and \
346                            exc.args[0] == errno.EPIPE:
347                        # Broken pipe. Remote host disconnected.
348                        pass
349                    else:
350                        traceback.print_exc()
351                    # Abort the coroutine.
352                    threads[event2coro[event]] = ReturnEvent(None)
353                else:
354                    advance_thread(event2coro[event], value)
355
356        except ThreadException as te:
357            # Exception raised from inside a thread.
358            event = ExceptionEvent(te.exc_info)
359            if te.coro in delegators:
360                # The thread is a delegate. Raise exception in its
361                # delegator.
362                threads[delegators[te.coro]] = event
363                del delegators[te.coro]
364            else:
365                # The thread is root-level. Raise in client code.
366                exit_te = te
367                break
368
369        except BaseException:
370            # For instance, KeyboardInterrupt during select(). Raise
371            # into root thread and terminate others.
372            threads = {root_coro: ExceptionEvent(sys.exc_info())}
373
374    # If any threads still remain, kill them.
375    for coro in threads:
376        coro.close()
377
378    # If we're exiting with an exception, raise it in the client.
379    if exit_te:
380        exit_te.reraise()
381
382
383# Sockets and their associated events.
384
385class SocketClosedError(Exception):
386    pass
387
388
389class Listener(object):
390    """A socket wrapper object for listening sockets.
391    """
392    def __init__(self, host, port):
393        """Create a listening socket on the given hostname and port.
394        """
395        self._closed = False
396        self.host = host
397        self.port = port
398        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
399        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
400        self.sock.bind((host, port))
401        self.sock.listen(5)
402
403    def accept(self):
404        """An event that waits for a connection on the listening socket.
405        When a connection is made, the event returns a Connection
406        object.
407        """
408        if self._closed:
409            raise SocketClosedError()
410        return AcceptEvent(self)
411
412    def close(self):
413        """Immediately close the listening socket. (Not an event.)
414        """
415        self._closed = True
416        self.sock.close()
417
418
419class Connection(object):
420    """A socket wrapper object for connected sockets.
421    """
422    def __init__(self, sock, addr):
423        self.sock = sock
424        self.addr = addr
425        self._buf = b''
426        self._closed = False
427
428    def close(self):
429        """Close the connection."""
430        self._closed = True
431        self.sock.close()
432
433    def recv(self, size):
434        """Read at most size bytes of data from the socket."""
435        if self._closed:
436            raise SocketClosedError()
437
438        if self._buf:
439            # We already have data read previously.
440            out = self._buf[:size]
441            self._buf = self._buf[size:]
442            return ValueEvent(out)
443        else:
444            return ReceiveEvent(self, size)
445
446    def send(self, data):
447        """Sends data on the socket, returning the number of bytes
448        successfully sent.
449        """
450        if self._closed:
451            raise SocketClosedError()
452        return SendEvent(self, data)
453
454    def sendall(self, data):
455        """Send all of data on the socket."""
456        if self._closed:
457            raise SocketClosedError()
458        return SendEvent(self, data, True)
459
460    def readline(self, terminator=b"\n", bufsize=1024):
461        """Reads a line (delimited by terminator) from the socket."""
462        if self._closed:
463            raise SocketClosedError()
464
465        while True:
466            if terminator in self._buf:
467                line, self._buf = self._buf.split(terminator, 1)
468                line += terminator
469                yield ReturnEvent(line)
470                break
471            data = yield ReceiveEvent(self, bufsize)
472            if data:
473                self._buf += data
474            else:
475                line = self._buf
476                self._buf = b''
477                yield ReturnEvent(line)
478                break
479
480
481class AcceptEvent(WaitableEvent):
482    """An event for Listener objects (listening sockets) that suspends
483    execution until the socket gets a connection.
484    """
485    def __init__(self, listener):
486        self.listener = listener
487
488    def waitables(self):
489        return (self.listener.sock,), (), ()
490
491    def fire(self):
492        sock, addr = self.listener.sock.accept()
493        return Connection(sock, addr)
494
495
496class ReceiveEvent(WaitableEvent):
497    """An event for Connection objects (connected sockets) for
498    asynchronously reading data.
499    """
500    def __init__(self, conn, bufsize):
501        self.conn = conn
502        self.bufsize = bufsize
503
504    def waitables(self):
505        return (self.conn.sock,), (), ()
506
507    def fire(self):
508        return self.conn.sock.recv(self.bufsize)
509
510
511class SendEvent(WaitableEvent):
512    """An event for Connection objects (connected sockets) for
513    asynchronously writing data.
514    """
515    def __init__(self, conn, data, sendall=False):
516        self.conn = conn
517        self.data = data
518        self.sendall = sendall
519
520    def waitables(self):
521        return (), (self.conn.sock,), ()
522
523    def fire(self):
524        if self.sendall:
525            return self.conn.sock.sendall(self.data)
526        else:
527            return self.conn.sock.send(self.data)
528
529
530# Public interface for threads; each returns an event object that
531# can immediately be "yield"ed.
532
533def null():
534    """Event: yield to the scheduler without doing anything special.
535    """
536    return ValueEvent(None)
537
538
539def spawn(coro):
540    """Event: add another coroutine to the scheduler. Both the parent
541    and child coroutines run concurrently.
542    """
543    if not isinstance(coro, types.GeneratorType):
544        raise ValueError(u'%s is not a coroutine' % coro)
545    return SpawnEvent(coro)
546
547
548def call(coro):
549    """Event: delegate to another coroutine. The current coroutine
550    is resumed once the sub-coroutine finishes. If the sub-coroutine
551    returns a value using end(), then this event returns that value.
552    """
553    if not isinstance(coro, types.GeneratorType):
554        raise ValueError(u'%s is not a coroutine' % coro)
555    return DelegationEvent(coro)
556
557
558def end(value=None):
559    """Event: ends the coroutine and returns a value to its
560    delegator.
561    """
562    return ReturnEvent(value)
563
564
565def read(fd, bufsize=None):
566    """Event: read from a file descriptor asynchronously."""
567    if bufsize is None:
568        # Read all.
569        def reader():
570            buf = []
571            while True:
572                data = yield read(fd, 1024)
573                if not data:
574                    break
575                buf.append(data)
576            yield ReturnEvent(''.join(buf))
577        return DelegationEvent(reader())
578
579    else:
580        return ReadEvent(fd, bufsize)
581
582
583def write(fd, data):
584    """Event: write to a file descriptor asynchronously."""
585    return WriteEvent(fd, data)
586
587
588def connect(host, port):
589    """Event: connect to a network address and return a Connection
590    object for communicating on the socket.
591    """
592    addr = (host, port)
593    sock = socket.create_connection(addr)
594    return ValueEvent(Connection(sock, addr))
595
596
597def sleep(duration):
598    """Event: suspend the thread for ``duration`` seconds.
599    """
600    return SleepEvent(duration)
601
602
603def join(coro):
604    """Suspend the thread until another, previously `spawn`ed thread
605    completes.
606    """
607    return JoinEvent(coro)
608
609
610def kill(coro):
611    """Halt the execution of a different `spawn`ed thread.
612    """
613    return KillEvent(coro)
614
615
616# Convenience function for running socket servers.
617
618def server(host, port, func):
619    """A coroutine that runs a network server. Host and port specify the
620    listening address. func should be a coroutine that takes a single
621    parameter, a Connection object. The coroutine is invoked for every
622    incoming connection on the listening socket.
623    """
624    def handler(conn):
625        try:
626            yield func(conn)
627        finally:
628            conn.close()
629
630    listener = Listener(host, port)
631    try:
632        while True:
633            conn = yield listener.accept()
634            yield spawn(handler(conn))
635    except KeyboardInterrupt:
636        pass
637    finally:
638        listener.close()
639