# -*- coding: utf-8 -*-
"""
hyper/http20/stream
~~~~~~~~~~~~~~~~~~~

Objects that make up the stream-level abstraction of hyper's HTTP/2 support.

These objects are not expected to be part of the public HTTP/2 API: they're
intended purely for use inside hyper's HTTP/2 abstraction.

Conceptually, a single HTTP/2 connection is made up of many streams: each
stream is an independent, bi-directional sequence of HTTP headers and data.
Each stream is identified by a monotonically increasing integer, assigned to
the stream by the endpoint that initiated the stream.
"""
from ..h2 import exceptions as h2Exceptions

from ..common.headers import HTTPHeaderMap
from .util import h2_safe_headers
import logging

log = logging.getLogger(__name__)

# Define the largest chunk of data we'll send in one go. Realistically, we
# should take the MSS into account but that's pretty dull, so let's just say
# 1kB and call it a day.
MAX_CHUNK = 1024


class Stream(object):
    """
    A single HTTP/2 stream.

    A stream is an independent, bi-directional sequence of HTTP headers and
    data. Each stream is identified by a single integer. From a HTTP
    perspective, a stream _approximately_ matches a single request-response
    pair.
    """
    def __init__(self,
                 stream_id,
                 window_manager,
                 connection,
                 send_outstanding_data,
                 recv_cb,
                 close_cb):
        self.stream_id = stream_id
        self.headers = HTTPHeaderMap()

        # Set to a key-value set of the response headers once their
        # HEADERS..CONTINUATION frame sequence finishes.
        self.response_headers = None

        # Set to a key-value set of the response trailers once their
        # HEADERS..CONTINUATION frame sequence finishes.
        self.response_trailers = None

        # A dict mapping the promised stream ID of a pushed resource to a
        # key-value set of its request headers. Entries are added once their
        # PUSH_PROMISE..CONTINUATION frame sequence finishes.
        self.promised_headers = {}

        # Unconsumed response data chunks. Empties after every call to _read().
        self.data = []

        # Whether the remote side has completed the stream.
        self.remote_closed = False

        # Whether we have closed the stream.
        self.local_closed = False

        # There are two flow control windows: one for data we're sending,
        # one for data being sent to us. The inbound one is tracked by the
        # window manager; the outbound one is queried from the connection
        # state machine on demand (see _out_flow_control_window).
        self._in_window_manager = window_manager

        # Save off a reference to the state machine wrapped with lock.
        self._conn = connection

        # Save off the callbacks used to flush pending outgoing data, to read
        # more from the connection, and to report (with our stream ID) that
        # this stream has closed.
        self._send_outstanding_data = send_outstanding_data
        self._recv_cb = recv_cb
        self._close_cb = close_cb

    def add_header(self, name, value, replace=False):
        """
        Adds a single HTTP header to the headers to be sent on the request.

        :param replace: If True, replace any existing values for ``name``
            instead of adding another value alongside them.
        """
        if not replace:
            self.headers[name] = value
        else:
            self.headers.replace(name, value)

    def send_headers(self, end_stream=False):
        """
        Sends the complete saved header block on the stream.

        :param end_stream: If True, the HEADERS frame also ends the stream
            from our side and the stream is marked locally closed.
        """
        headers = self.get_headers()
        with self._conn as conn:
            conn.send_headers(self.stream_id, headers, end_stream)
        self._send_outstanding_data()

        if end_stream:
            self.local_closed = True

    def send_data(self, data, final):
        """
        Send some data on the stream. If this is the end of the data to be
        sent, the ``final`` flag _must_ be set to True. If no data is to be
        sent, set ``data`` to ``None``.

        :param data: A sized, sliceable buffer (e.g. a bytestring), a
            file-like object with a ``read`` method, or ``None``.
        :param final: Whether this is the last data for the stream. When
            True, END_STREAM is set on the last DATA frame sent; if there is
            no data at all, a single empty DATA frame carrying END_STREAM is
            sent so the stream is still closed from our side.
        """
        def iter_chunks(body):
            # Yield the body as non-empty pieces of at most MAX_CHUNK bytes.
            if body is None:
                return
            if hasattr(body, 'read'):
                # Only an empty read marks EOF: a short read from some
                # file-like objects does not mean there is nothing left.
                while True:
                    chunk = body.read(MAX_CHUNK)
                    if not chunk:
                        return
                    yield chunk
            else:
                for start in range(0, len(body), MAX_CHUNK):
                    yield body[start:start + MAX_CHUNK]

        chunks = iter_chunks(data)
        pending = next(chunks, None)

        if pending is None:
            # Nothing to send. A final call must still carry END_STREAM,
            # otherwise the stream would never be closed from our side.
            if final:
                self._send_chunk(b'', True)
            return

        # Look one chunk ahead so we always know which chunk is the last one;
        # only that chunk may carry END_STREAM. (Inferring this from chunk
        # size breaks when the body length is an exact multiple of MAX_CHUNK.)
        for chunk in chunks:
            self._send_chunk(pending, False)
            pending = chunk
        self._send_chunk(pending, final)

    def _read(self, amt=None):
        """
        Read data from the stream. Unlike a normal read behaviour, this
        function returns _at least_ ``amt`` data, but may return more. Less
        is returned only if the remote side closes the stream first; with
        ``amt=None`` everything up to the end of the stream is returned.
        """
        def buffered_length():
            return sum(len(chunk) for chunk in self.data)

        # Keep reading until the stream is closed or we get enough data.
        while (not self.remote_closed and
               (amt is None or buffered_length() < amt)):
            self._recv_cb(stream_id=self.stream_id)

        result = b''.join(self.data)
        self.data = []
        return result

    def _read_one_frame(self):
        """
        Reads a single data frame from the stream and returns it.

        :returns: The oldest buffered data chunk, or ``None`` if the remote
            side closed the stream with no data left.
        """
        # Keep reading until the stream is closed or we have a data frame.
        while not self.remote_closed and not self.data:
            self._recv_cb(stream_id=self.stream_id)

        try:
            return self.data.pop(0)
        except IndexError:
            return None

    def receive_response(self, event):
        """
        Receive response headers.
        """
        # TODO: If this is called while we're still sending data, we may want
        # to stop sending that data and check the response. Early responses to
        # big uploads are almost always a problem.
        self.response_headers = HTTPHeaderMap(event.headers)

    def receive_trailers(self, event):
        """
        Receive response trailers.
        """
        self.response_trailers = HTTPHeaderMap(event.headers)

    def receive_push(self, event):
        """
        Receive the request headers for a pushed stream.
        """
        self.promised_headers[event.pushed_stream_id] = event.headers

    def receive_data(self, event):
        """
        Receive a chunk of data.

        Buffers the data and, if the inbound window manager asks for it,
        sends a WINDOW_UPDATE for this stream.
        """
        size = event.flow_controlled_length
        increment = self._in_window_manager._handle_frame(size)

        # Append the data to the buffer.
        self.data.append(event.data)

        if increment:
            try:
                with self._conn as conn:
                    conn.increment_flow_control_window(
                        increment, stream_id=self.stream_id
                    )
            except h2Exceptions.StreamClosedError:
                # We haven't got to it yet, but the stream is already
                # closed. We don't need to increment the window in this
                # case!
                pass
            else:
                self._send_outstanding_data()

    def receive_end_stream(self, event):
        """
        All of the data is returned now.
        """
        self.remote_closed = True

    def receive_reset(self, event):
        """
        Stream forcefully reset.
        """
        self.remote_closed = True
        self._close_cb(self.stream_id)

    def get_headers(self):
        """
        Provides the headers to the connection object.
        """
        # Strip any headers invalid in H2.
        return h2_safe_headers(self.headers)

    def getheaders(self):
        """
        Once all data has been sent on this connection, returns a key-value set
        of the headers of the response to the original request.
        """
        # Keep reading until all headers are received.
        while self.response_headers is None:
            self._recv_cb(stream_id=self.stream_id)

        # Find the Content-Length header if present. The lookup yields a
        # sequence of values (note the ``[0]`` default), so take the first.
        self._in_window_manager.document_size = (
            int(self.response_headers.get(b'content-length', [0])[0])
        )

        return self.response_headers

    def gettrailers(self):
        """
        Once all data has been sent on this connection, returns a key-value set
        of the trailers of the response to the original request.

        .. warning:: Note that this method requires that the stream is
                     totally exhausted. This means that, if you have not
                     completely read from the stream, all stream data will be
                     read into memory.

        :returns: The key-value set of the trailers, or ``None`` if no trailers
                  were sent.
        """
        # Keep reading until the stream is done.
        while not self.remote_closed:
            self._recv_cb(stream_id=self.stream_id)

        return self.response_trailers

    def get_pushes(self, capture_all=False):
        """
        Returns a generator that yields push promises from the server. Note
        that this method is not idempotent; promises returned in one call will
        not be returned in subsequent calls. Iterating through generators
        returned by multiple calls to this method simultaneously results in
        undefined behavior.

        :param capture_all: If ``False``, the generator will yield all buffered
            push promises without blocking. If ``True``, the generator will
            first yield all buffered push promises, then yield additional ones
            as they arrive, and terminate when the original stream closes.
        """
        while True:
            # Remove each promise *before* yielding it. While suspended at the
            # yield, the consumer may drive the connection, which can add
            # entries to self.promised_headers (via receive_push). Popping
            # one entry at a time avoids mutating a dict under iteration,
            # never re-yields a promise, and never drops an un-yielded one.
            while self.promised_headers:
                promised_id = next(iter(self.promised_headers))
                yield promised_id, self.promised_headers.pop(promised_id)
            if not capture_all or self.remote_closed:
                break
            self._recv_cb(stream_id=self.stream_id)

    def close(self, error_code=None):
        """
        Closes the stream. If the stream is currently open, attempts to close
        it as gracefully as possible.

        :param error_code: (optional) The error code to reset the stream with.
        :returns: Nothing.
        """
        # FIXME: I think this is overbroad, but for now it's probably ok.
        if not (self.remote_closed and self.local_closed):
            try:
                with self._conn as conn:
                    conn.reset_stream(self.stream_id, error_code or 0)
            except h2Exceptions.ProtocolError:
                # If for any reason we can't reset the stream, just
                # tolerate it.
                pass
            else:
                self._send_outstanding_data(tolerate_peer_gone=True)
            self.remote_closed = True
            self.local_closed = True

        self._close_cb(self.stream_id)

    @property
    def _out_flow_control_window(self):
        """
        The size of our outbound flow control window.
        """

        with self._conn as conn:
            return conn.local_flow_control_window(self.stream_id)

    def _send_chunk(self, data, final):
        """
        Implements most of the sending logic.

        Takes a single chunk of size at most MAX_CHUNK, wraps it in a frame and
        sends it. Before sending, it keeps reading from the connection until
        the outbound flow control window can hold the chunk. If ``final`` is
        True the frame carries END_STREAM and the stream is marked locally
        closed, so callers must pass ``final=True`` only for the very last
        chunk of the body.
        """
        # If we don't fit in the connection window, try popping frames off the
        # connection in hope that one might be a window update frame.
        while len(data) > self._out_flow_control_window:
            self._recv_cb()

        end_stream = bool(final)

        # Send the frame and decrement the flow control window.
        with self._conn as conn:
            conn.send_data(
                stream_id=self.stream_id, data=data, end_stream=end_stream
            )
        self._send_outstanding_data()

        if end_stream:
            self.local_closed = True