1# -*- coding: utf-8 -*- 2 3"""Extremely simple pure-Python implementation of coroutine-style 4asynchronous socket I/O. Inspired by, but inferior to, Eventlet. 5Bluelet can also be thought of as a less-terrible replacement for 6asyncore. 7 8Bluelet: easy concurrency without all the messy parallelism. 9""" 10from __future__ import division, absolute_import, print_function 11 12import six 13import socket 14import select 15import sys 16import types 17import errno 18import traceback 19import time 20import collections 21 22 23# Basic events used for thread scheduling. 24 25class Event(object): 26 """Just a base class identifying Bluelet events. An event is an 27 object yielded from a Bluelet thread coroutine to suspend operation 28 and communicate with the scheduler. 29 """ 30 pass 31 32 33class WaitableEvent(Event): 34 """A waitable event is one encapsulating an action that can be 35 waited for using a select() call. That is, it's an event with an 36 associated file descriptor. 37 """ 38 def waitables(self): 39 """Return "waitable" objects to pass to select(). Should return 40 three iterables for input readiness, output readiness, and 41 exceptional conditions (i.e., the three lists passed to 42 select()). 43 """ 44 return (), (), () 45 46 def fire(self): 47 """Called when an associated file descriptor becomes ready 48 (i.e., is returned from a select() call). 49 """ 50 pass 51 52 53class ValueEvent(Event): 54 """An event that does nothing but return a fixed value.""" 55 def __init__(self, value): 56 self.value = value 57 58 59class ExceptionEvent(Event): 60 """Raise an exception at the yield point. Used internally.""" 61 def __init__(self, exc_info): 62 self.exc_info = exc_info 63 64 65class SpawnEvent(Event): 66 """Add a new coroutine thread to the scheduler.""" 67 def __init__(self, coro): 68 self.spawned = coro 69 70 71class JoinEvent(Event): 72 """Suspend the thread until the specified child thread has 73 completed. 74 """ 75 def __init__(self, child): 76 self.child = child 77 78 79class KillEvent(Event): 80 """Unschedule a child thread.""" 81 def __init__(self, child): 82 self.child = child 83 84 85class DelegationEvent(Event): 86 """Suspend execution of the current thread, start a new thread and, 87 once the child thread finished, return control to the parent 88 thread. 89 """ 90 def __init__(self, coro): 91 self.spawned = coro 92 93 94class ReturnEvent(Event): 95 """Return a value the current thread's delegator at the point of 96 delegation. Ends the current (delegate) thread. 97 """ 98 def __init__(self, value): 99 self.value = value 100 101 102class SleepEvent(WaitableEvent): 103 """Suspend the thread for a given duration. 104 """ 105 def __init__(self, duration): 106 self.wakeup_time = time.time() + duration 107 108 def time_left(self): 109 return max(self.wakeup_time - time.time(), 0.0) 110 111 112class ReadEvent(WaitableEvent): 113 """Reads from a file-like object.""" 114 def __init__(self, fd, bufsize): 115 self.fd = fd 116 self.bufsize = bufsize 117 118 def waitables(self): 119 return (self.fd,), (), () 120 121 def fire(self): 122 return self.fd.read(self.bufsize) 123 124 125class WriteEvent(WaitableEvent): 126 """Writes to a file-like object.""" 127 def __init__(self, fd, data): 128 self.fd = fd 129 self.data = data 130 131 def waitable(self): 132 return (), (self.fd,), () 133 134 def fire(self): 135 self.fd.write(self.data) 136 137 138# Core logic for executing and scheduling threads. 139 140def _event_select(events): 141 """Perform a select() over all the Events provided, returning the 142 ones ready to be fired. Only WaitableEvents (including SleepEvents) 143 matter here; all other events are ignored (and thus postponed). 144 """ 145 # Gather waitables and wakeup times. 146 waitable_to_event = {} 147 rlist, wlist, xlist = [], [], [] 148 earliest_wakeup = None 149 for event in events: 150 if isinstance(event, SleepEvent): 151 if not earliest_wakeup: 152 earliest_wakeup = event.wakeup_time 153 else: 154 earliest_wakeup = min(earliest_wakeup, event.wakeup_time) 155 elif isinstance(event, WaitableEvent): 156 r, w, x = event.waitables() 157 rlist += r 158 wlist += w 159 xlist += x 160 for waitable in r: 161 waitable_to_event[('r', waitable)] = event 162 for waitable in w: 163 waitable_to_event[('w', waitable)] = event 164 for waitable in x: 165 waitable_to_event[('x', waitable)] = event 166 167 # If we have a any sleeping threads, determine how long to sleep. 168 if earliest_wakeup: 169 timeout = max(earliest_wakeup - time.time(), 0.0) 170 else: 171 timeout = None 172 173 # Perform select() if we have any waitables. 174 if rlist or wlist or xlist: 175 rready, wready, xready = select.select(rlist, wlist, xlist, timeout) 176 else: 177 rready, wready, xready = (), (), () 178 if timeout: 179 time.sleep(timeout) 180 181 # Gather ready events corresponding to the ready waitables. 182 ready_events = set() 183 for ready in rready: 184 ready_events.add(waitable_to_event[('r', ready)]) 185 for ready in wready: 186 ready_events.add(waitable_to_event[('w', ready)]) 187 for ready in xready: 188 ready_events.add(waitable_to_event[('x', ready)]) 189 190 # Gather any finished sleeps. 191 for event in events: 192 if isinstance(event, SleepEvent) and event.time_left() == 0.0: 193 ready_events.add(event) 194 195 return ready_events 196 197 198class ThreadException(Exception): 199 def __init__(self, coro, exc_info): 200 self.coro = coro 201 self.exc_info = exc_info 202 203 def reraise(self): 204 six.reraise(self.exc_info[0], self.exc_info[1], self.exc_info[2]) 205 206 207SUSPENDED = Event() # Special sentinel placeholder for suspended threads. 208 209 210class Delegated(Event): 211 """Placeholder indicating that a thread has delegated execution to a 212 different thread. 213 """ 214 def __init__(self, child): 215 self.child = child 216 217 218def run(root_coro): 219 """Schedules a coroutine, running it to completion. This 220 encapsulates the Bluelet scheduler, which the root coroutine can 221 add to by spawning new coroutines. 222 """ 223 # The "threads" dictionary keeps track of all the currently- 224 # executing and suspended coroutines. It maps coroutines to their 225 # currently "blocking" event. The event value may be SUSPENDED if 226 # the coroutine is waiting on some other condition: namely, a 227 # delegated coroutine or a joined coroutine. In this case, the 228 # coroutine should *also* appear as a value in one of the below 229 # dictionaries `delegators` or `joiners`. 230 threads = {root_coro: ValueEvent(None)} 231 232 # Maps child coroutines to delegating parents. 233 delegators = {} 234 235 # Maps child coroutines to joining (exit-waiting) parents. 236 joiners = collections.defaultdict(list) 237 238 def complete_thread(coro, return_value): 239 """Remove a coroutine from the scheduling pool, awaking 240 delegators and joiners as necessary and returning the specified 241 value to any delegating parent. 242 """ 243 del threads[coro] 244 245 # Resume delegator. 246 if coro in delegators: 247 threads[delegators[coro]] = ValueEvent(return_value) 248 del delegators[coro] 249 250 # Resume joiners. 251 if coro in joiners: 252 for parent in joiners[coro]: 253 threads[parent] = ValueEvent(None) 254 del joiners[coro] 255 256 def advance_thread(coro, value, is_exc=False): 257 """After an event is fired, run a given coroutine associated with 258 it in the threads dict until it yields again. If the coroutine 259 exits, then the thread is removed from the pool. If the coroutine 260 raises an exception, it is reraised in a ThreadException. If 261 is_exc is True, then the value must be an exc_info tuple and the 262 exception is thrown into the coroutine. 263 """ 264 try: 265 if is_exc: 266 next_event = coro.throw(*value) 267 else: 268 next_event = coro.send(value) 269 except StopIteration: 270 # Thread is done. 271 complete_thread(coro, None) 272 except BaseException: 273 # Thread raised some other exception. 274 del threads[coro] 275 raise ThreadException(coro, sys.exc_info()) 276 else: 277 if isinstance(next_event, types.GeneratorType): 278 # Automatically invoke sub-coroutines. (Shorthand for 279 # explicit bluelet.call().) 280 next_event = DelegationEvent(next_event) 281 threads[coro] = next_event 282 283 def kill_thread(coro): 284 """Unschedule this thread and its (recursive) delegates. 285 """ 286 # Collect all coroutines in the delegation stack. 287 coros = [coro] 288 while isinstance(threads[coro], Delegated): 289 coro = threads[coro].child 290 coros.append(coro) 291 292 # Complete each coroutine from the top to the bottom of the 293 # stack. 294 for coro in reversed(coros): 295 complete_thread(coro, None) 296 297 # Continue advancing threads until root thread exits. 298 exit_te = None 299 while threads: 300 try: 301 # Look for events that can be run immediately. Continue 302 # running immediate events until nothing is ready. 303 while True: 304 have_ready = False 305 for coro, event in list(threads.items()): 306 if isinstance(event, SpawnEvent): 307 threads[event.spawned] = ValueEvent(None) # Spawn. 308 advance_thread(coro, None) 309 have_ready = True 310 elif isinstance(event, ValueEvent): 311 advance_thread(coro, event.value) 312 have_ready = True 313 elif isinstance(event, ExceptionEvent): 314 advance_thread(coro, event.exc_info, True) 315 have_ready = True 316 elif isinstance(event, DelegationEvent): 317 threads[coro] = Delegated(event.spawned) # Suspend. 318 threads[event.spawned] = ValueEvent(None) # Spawn. 319 delegators[event.spawned] = coro 320 have_ready = True 321 elif isinstance(event, ReturnEvent): 322 # Thread is done. 323 complete_thread(coro, event.value) 324 have_ready = True 325 elif isinstance(event, JoinEvent): 326 threads[coro] = SUSPENDED # Suspend. 327 joiners[event.child].append(coro) 328 have_ready = True 329 elif isinstance(event, KillEvent): 330 threads[coro] = ValueEvent(None) 331 kill_thread(event.child) 332 have_ready = True 333 334 # Only start the select when nothing else is ready. 335 if not have_ready: 336 break 337 338 # Wait and fire. 339 event2coro = dict((v, k) for k, v in threads.items()) 340 for event in _event_select(threads.values()): 341 # Run the IO operation, but catch socket errors. 342 try: 343 value = event.fire() 344 except socket.error as exc: 345 if isinstance(exc.args, tuple) and \ 346 exc.args[0] == errno.EPIPE: 347 # Broken pipe. Remote host disconnected. 348 pass 349 else: 350 traceback.print_exc() 351 # Abort the coroutine. 352 threads[event2coro[event]] = ReturnEvent(None) 353 else: 354 advance_thread(event2coro[event], value) 355 356 except ThreadException as te: 357 # Exception raised from inside a thread. 358 event = ExceptionEvent(te.exc_info) 359 if te.coro in delegators: 360 # The thread is a delegate. Raise exception in its 361 # delegator. 362 threads[delegators[te.coro]] = event 363 del delegators[te.coro] 364 else: 365 # The thread is root-level. Raise in client code. 366 exit_te = te 367 break 368 369 except BaseException: 370 # For instance, KeyboardInterrupt during select(). Raise 371 # into root thread and terminate others. 372 threads = {root_coro: ExceptionEvent(sys.exc_info())} 373 374 # If any threads still remain, kill them. 375 for coro in threads: 376 coro.close() 377 378 # If we're exiting with an exception, raise it in the client. 379 if exit_te: 380 exit_te.reraise() 381 382 383# Sockets and their associated events. 384 385class SocketClosedError(Exception): 386 pass 387 388 389class Listener(object): 390 """A socket wrapper object for listening sockets. 391 """ 392 def __init__(self, host, port): 393 """Create a listening socket on the given hostname and port. 394 """ 395 self._closed = False 396 self.host = host 397 self.port = port 398 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 399 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 400 self.sock.bind((host, port)) 401 self.sock.listen(5) 402 403 def accept(self): 404 """An event that waits for a connection on the listening socket. 405 When a connection is made, the event returns a Connection 406 object. 407 """ 408 if self._closed: 409 raise SocketClosedError() 410 return AcceptEvent(self) 411 412 def close(self): 413 """Immediately close the listening socket. (Not an event.) 414 """ 415 self._closed = True 416 self.sock.close() 417 418 419class Connection(object): 420 """A socket wrapper object for connected sockets. 421 """ 422 def __init__(self, sock, addr): 423 self.sock = sock 424 self.addr = addr 425 self._buf = b'' 426 self._closed = False 427 428 def close(self): 429 """Close the connection.""" 430 self._closed = True 431 self.sock.close() 432 433 def recv(self, size): 434 """Read at most size bytes of data from the socket.""" 435 if self._closed: 436 raise SocketClosedError() 437 438 if self._buf: 439 # We already have data read previously. 440 out = self._buf[:size] 441 self._buf = self._buf[size:] 442 return ValueEvent(out) 443 else: 444 return ReceiveEvent(self, size) 445 446 def send(self, data): 447 """Sends data on the socket, returning the number of bytes 448 successfully sent. 449 """ 450 if self._closed: 451 raise SocketClosedError() 452 return SendEvent(self, data) 453 454 def sendall(self, data): 455 """Send all of data on the socket.""" 456 if self._closed: 457 raise SocketClosedError() 458 return SendEvent(self, data, True) 459 460 def readline(self, terminator=b"\n", bufsize=1024): 461 """Reads a line (delimited by terminator) from the socket.""" 462 if self._closed: 463 raise SocketClosedError() 464 465 while True: 466 if terminator in self._buf: 467 line, self._buf = self._buf.split(terminator, 1) 468 line += terminator 469 yield ReturnEvent(line) 470 break 471 data = yield ReceiveEvent(self, bufsize) 472 if data: 473 self._buf += data 474 else: 475 line = self._buf 476 self._buf = b'' 477 yield ReturnEvent(line) 478 break 479 480 481class AcceptEvent(WaitableEvent): 482 """An event for Listener objects (listening sockets) that suspends 483 execution until the socket gets a connection. 484 """ 485 def __init__(self, listener): 486 self.listener = listener 487 488 def waitables(self): 489 return (self.listener.sock,), (), () 490 491 def fire(self): 492 sock, addr = self.listener.sock.accept() 493 return Connection(sock, addr) 494 495 496class ReceiveEvent(WaitableEvent): 497 """An event for Connection objects (connected sockets) for 498 asynchronously reading data. 499 """ 500 def __init__(self, conn, bufsize): 501 self.conn = conn 502 self.bufsize = bufsize 503 504 def waitables(self): 505 return (self.conn.sock,), (), () 506 507 def fire(self): 508 return self.conn.sock.recv(self.bufsize) 509 510 511class SendEvent(WaitableEvent): 512 """An event for Connection objects (connected sockets) for 513 asynchronously writing data. 514 """ 515 def __init__(self, conn, data, sendall=False): 516 self.conn = conn 517 self.data = data 518 self.sendall = sendall 519 520 def waitables(self): 521 return (), (self.conn.sock,), () 522 523 def fire(self): 524 if self.sendall: 525 return self.conn.sock.sendall(self.data) 526 else: 527 return self.conn.sock.send(self.data) 528 529 530# Public interface for threads; each returns an event object that 531# can immediately be "yield"ed. 532 533def null(): 534 """Event: yield to the scheduler without doing anything special. 535 """ 536 return ValueEvent(None) 537 538 539def spawn(coro): 540 """Event: add another coroutine to the scheduler. Both the parent 541 and child coroutines run concurrently. 542 """ 543 if not isinstance(coro, types.GeneratorType): 544 raise ValueError(u'%s is not a coroutine' % coro) 545 return SpawnEvent(coro) 546 547 548def call(coro): 549 """Event: delegate to another coroutine. The current coroutine 550 is resumed once the sub-coroutine finishes. If the sub-coroutine 551 returns a value using end(), then this event returns that value. 552 """ 553 if not isinstance(coro, types.GeneratorType): 554 raise ValueError(u'%s is not a coroutine' % coro) 555 return DelegationEvent(coro) 556 557 558def end(value=None): 559 """Event: ends the coroutine and returns a value to its 560 delegator. 561 """ 562 return ReturnEvent(value) 563 564 565def read(fd, bufsize=None): 566 """Event: read from a file descriptor asynchronously.""" 567 if bufsize is None: 568 # Read all. 569 def reader(): 570 buf = [] 571 while True: 572 data = yield read(fd, 1024) 573 if not data: 574 break 575 buf.append(data) 576 yield ReturnEvent(''.join(buf)) 577 return DelegationEvent(reader()) 578 579 else: 580 return ReadEvent(fd, bufsize) 581 582 583def write(fd, data): 584 """Event: write to a file descriptor asynchronously.""" 585 return WriteEvent(fd, data) 586 587 588def connect(host, port): 589 """Event: connect to a network address and return a Connection 590 object for communicating on the socket. 591 """ 592 addr = (host, port) 593 sock = socket.create_connection(addr) 594 return ValueEvent(Connection(sock, addr)) 595 596 597def sleep(duration): 598 """Event: suspend the thread for ``duration`` seconds. 599 """ 600 return SleepEvent(duration) 601 602 603def join(coro): 604 """Suspend the thread until another, previously `spawn`ed thread 605 completes. 606 """ 607 return JoinEvent(coro) 608 609 610def kill(coro): 611 """Halt the execution of a different `spawn`ed thread. 612 """ 613 return KillEvent(coro) 614 615 616# Convenience function for running socket servers. 617 618def server(host, port, func): 619 """A coroutine that runs a network server. Host and port specify the 620 listening address. func should be a coroutine that takes a single 621 parameter, a Connection object. The coroutine is invoked for every 622 incoming connection on the listening socket. 623 """ 624 def handler(conn): 625 try: 626 yield func(conn) 627 finally: 628 conn.close() 629 630 listener = Listener(host, port) 631 try: 632 while True: 633 conn = yield listener.accept() 634 yield spawn(handler(conn)) 635 except KeyboardInterrupt: 636 pass 637 finally: 638 listener.close() 639