"""Backport of ``asyncio.StreamReader.readuntil()`` from Python 3.5.2.

The functions below take the stream reader as ``self`` so that they can be
attached to ``asyncio.StreamReader`` (monkey patching) on interpreters whose
asyncio does not provide ``readuntil()`` yet.
"""
import asyncio
import types

# ``asyncio.coroutine`` was removed in Python 3.11.  Fall back to
# ``types.coroutine`` there so that importing this module never fails; the
# decorated generators stay usable with both ``yield from`` and ``await``.
try:
    _coroutine = asyncio.coroutine
except AttributeError:
    _coroutine = types.coroutine


class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: read bytes string before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """
    def __init__(self, partial, expected):
        super().__init__("%d bytes read on a total of %r expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The default reduction re-creates the exception from ``self.args``
        # (only the formatted message), which does not match ``__init__`` and
        # makes copy/pickle fail.  Rebuild from the real constructor
        # arguments instead.
        return type(self), (self.partial, self.expected)


class LimitOverrunError(Exception):
    """Reached the buffer limit while looking for a separator.

    Attributes:
    - consumed: total number of to be consumed bytes.
    """
    def __init__(self, message, consumed):
        super().__init__(message)
        self.consumed = consumed

    def __reduce__(self):
        # ``self.args`` only holds the message; add ``consumed`` so that the
        # exception can be copied and pickled.
        return type(self), (self.args[0], self.consumed)


@_coroutine
def _wait_for_data(self, func_name):
    """Wait until feed_data() or feed_eof() is called.

    If stream was paused, automatically resume it.

    ``func_name`` is the name of the calling read method; it is only used
    in the error message.

    Raises RuntimeError if another coroutine is already waiting for data
    on this reader.
    """
    # StreamReader uses a future to link the protocol feed_data() method
    # to a read coroutine. Running two read coroutines at the same time
    # would have an unexpected behaviour. It would not be possible to know
    # which coroutine would get the next data.
    if self._waiter is not None:
        raise RuntimeError('%s() called while another coroutine is '
                           'already waiting for incoming data' % func_name)

    assert not self._eof, '_wait_for_data after EOF'

    # Waiting for data while paused will make deadlock, so prevent it.
    if self._paused:
        self._paused = False
        self._transport.resume_reading()

    self._waiter = asyncio.futures.Future(loop=self._loop)
    try:
        yield from self._waiter
    finally:
        # Always release the slot, even on cancellation or error, so that a
        # later read is not rejected by the check above.
        self._waiter = None


@_coroutine
def readuntil(self, separator=b'\n'):
    """Read data from the stream until ``separator`` is found.

    On success, the data and separator will be removed from the
    internal buffer (consumed). Returned data will include the
    separator at the end.

    Configured stream limit is used to check result. Limit sets the
    maximal length of data that can be returned, not counting the
    separator.

    If an EOF occurs and the complete separator is still not found,
    an IncompleteReadError exception will be raised, and the internal
    buffer will be reset.  The IncompleteReadError.partial attribute
    may contain the separator partially.

    If the data cannot be read because of over limit, a
    LimitOverrunError exception will be raised, and the data
    will be left in the internal buffer, so it can be read again.

    Raises ValueError if ``separator`` is empty, and re-raises the
    exception stored on the reader, if any, before reading.
    """
    seplen = len(separator)
    if seplen == 0:
        raise ValueError('Separator should be at least one-byte string')

    if self._exception is not None:
        raise self._exception

    # After an unsuccessful search, everything in the buffer except the
    # last ``seplen - 1`` bytes is known not to contain the start of a
    # complete separator, so the next search can skip it.  Corner cases
    # with separator='SEPARATOR':
    # * we have received an almost complete separator (without the last
    #   byte), i.e. buffer='some textSEPARATO'.  The last
    #   len(separator) - 1 bytes must be kept for the next search;
    #   everything before them can be skipped.
    # * the last byte of the buffer is the first byte of the separator,
    #   i.e. buffer='abcdefghijklmnopqrS'.  Everything except that last
    #   byte could be skipped, but detecting this requires analysing which
    #   trailing bytes match a partial separator (slow and/or needs an
    #   FSM).  This implementation is not optimal here: it rescans some
    #   data that is known not to belong to the separator.  In the real
    #   world separators are not long enough for this to matter.

    # `offset` is the number of bytes from the beginning of the buffer
    # where there is no occurrence of `separator`.
    offset = 0

    # Loop until we find `separator` in the buffer, exceed the buffer size,
    # or an EOF has happened.
    while True:
        buflen = len(self._buffer)

        # Check if we now have enough data in the buffer for `separator` to
        # fit.
        if buflen - offset >= seplen:
            isep = self._buffer.find(separator, offset)

            if isep != -1:
                # `separator` is in the buffer. `isep` will be used later
                # to retrieve the data.
                break

            # See the comment above the loop for an explanation.
            offset = buflen + 1 - seplen
            if offset > self._limit:
                raise LimitOverrunError(
                    'Separator is not found, and chunk exceed the limit',
                    offset)

        # Complete message (with full separator) may be present in buffer
        # even when EOF flag is set. This may happen when the last chunk
        # adds data which makes separator be found. That's why we check for
        # EOF *after* inspecting the buffer.
        if self._eof:
            chunk = bytes(self._buffer)
            self._buffer.clear()
            raise IncompleteReadError(chunk, None)

        # _wait_for_data() will resume reading if stream was paused.
        yield from self._wait_for_data('readuntil')

    if isep > self._limit:
        raise LimitOverrunError(
            'Separator is found, but chunk is longer than limit', isep)

    chunk = self._buffer[:isep + seplen]
    del self._buffer[:isep + seplen]
    self._maybe_resume_transport()
    return bytes(chunk)