1# -*- coding: utf-8 -*-
2"""
3hyper/http20/stream
4~~~~~~~~~~~~~~~~~~~
5
6Objects that make up the stream-level abstraction of hyper's HTTP/2 support.
7
8These objects are not expected to be part of the public HTTP/2 API: they're
9intended purely for use inside hyper's HTTP/2 abstraction.
10
11Conceptually, a single HTTP/2 connection is made up of many streams: each
12stream is an independent, bi-directional sequence of HTTP headers and data.
13Each stream is identified by a monotonically increasing integer, assigned to
14the stream by the endpoint that initiated the stream.
15"""
16from ..h2 import exceptions as h2Exceptions
17
18from ..common.headers import HTTPHeaderMap
19from .util import h2_safe_headers
20import logging
21
22log = logging.getLogger(__name__)
23
# Define the largest chunk of data we'll send in one go. Realistically, we
# should take the MSS into account but that's pretty dull, so let's just say
# 1kB and call it a day.
MAX_CHUNK = 1024


class Stream(object):
    """
    A single HTTP/2 stream.

    A stream is an independent, bi-directional sequence of HTTP headers and
    data. Each stream is identified by a single integer. From a HTTP
    perspective, a stream _approximately_ matches a single request-response
    pair.
    """
    def __init__(self,
                 stream_id,
                 window_manager,
                 connection,
                 send_outstanding_data,
                 recv_cb,
                 close_cb):
        # The stream's identifier, assigned by the initiating endpoint.
        self.stream_id = stream_id

        # The headers to be sent on the request.
        self.headers = HTTPHeaderMap()

        # Set to a key-value set of the response headers once their
        # HEADERS..CONTINUATION frame sequence finishes.
        self.response_headers = None

        # Set to a key-value set of the response trailers once their
        # HEADERS..CONTINUATION frame sequence finishes.
        self.response_trailers = None

        # A dict mapping the promised stream ID of a pushed resource to a
        # key-value set of its request headers. Entries are added once their
        # PUSH_PROMISE..CONTINUATION frame sequence finishes.
        self.promised_headers = {}

        # Unconsumed response data chunks. Empties after every call to _read().
        self.data = []

        # Whether the remote side has completed the stream.
        self.remote_closed = False

        # Whether we have closed the stream.
        self.local_closed = False

        # There are two flow control windows: one for data we're sending,
        # one for data being sent to us. This manages the inbound one.
        self._in_window_manager = window_manager

        # Save off a reference to the state machine wrapped with lock.
        self._conn = connection

        # Save off the connection-level callbacks.
        self._send_outstanding_data = send_outstanding_data
        self._recv_cb = recv_cb
        self._close_cb = close_cb

    def add_header(self, name, value, replace=False):
        """
        Adds a single HTTP header to the headers to be sent on the request.
        """
        if not replace:
            self.headers[name] = value
        else:
            self.headers.replace(name, value)

    def send_headers(self, end_stream=False):
        """
        Sends the complete saved header block on the stream.
        """
        headers = self.get_headers()
        with self._conn as conn:
            conn.send_headers(self.stream_id, headers, end_stream)
        self._send_outstanding_data()

        if end_stream:
            self.local_closed = True

    def send_data(self, data, final):
        """
        Send some data on the stream. If this is the end of the data to be
        sent, the ``final`` flag _must_ be set to True. If no data is to be
        sent, set ``data`` to ``None``.
        """
        # A utility iterator for file objects: reads MAX_CHUNK-sized pieces
        # until a short read signals EOF.
        def file_iterator(fobj):
            while True:
                piece = fobj.read(MAX_CHUNK)
                yield piece
                if len(piece) < MAX_CHUNK:
                    break

        # The docstring promises that ``None`` means "no data"; normalise it
        # here so the slicing logic below doesn't choke on len(None).
        if data is None:
            data = b''

        # Build the appropriate iterator for the data, in chunks of MAX_CHUNK.
        if hasattr(data, 'read'):
            chunks = file_iterator(data)
        else:
            chunks = (data[i:i + MAX_CHUNK]
                      for i in range(0, len(data), MAX_CHUNK))

        # Iterate with a one-chunk lookahead so we know which chunk is the
        # last one: only that chunk may carry END_STREAM. Detecting the end
        # by a short chunk (the old behaviour) fails when the data length is
        # an exact multiple of MAX_CHUNK, and when there is no data at all.
        previous = None
        for chunk in chunks:
            if previous is not None:
                self._send_chunk(previous, False)
            previous = chunk

        if previous is None:
            if not final:
                # Nothing to send and the stream stays open: no frame needed.
                return
            # Nothing to send at all: emit one empty DATA frame so that
            # ``final`` can still close the stream.
            previous = b''

        self._send_chunk(previous, final)

    def _read(self, amt=None):
        """
        Read data from the stream. Unlike a normal read behaviour, this
        function returns _at least_ ``amt`` data, but may return more.
        """
        def buffered_bytes(chunks):
            return sum(map(len, chunks))

        # Keep reading until the stream is closed or we get enough data.
        while (not self.remote_closed and
                (amt is None or buffered_bytes(self.data) < amt)):
            self._recv_cb(stream_id=self.stream_id)

        result = b''.join(self.data)
        self.data = []
        return result

    def _read_one_frame(self):
        """
        Reads a single data frame from the stream and returns it.
        """
        # Keep reading until the stream is closed or we have a data frame.
        while not self.remote_closed and not self.data:
            self._recv_cb(stream_id=self.stream_id)

        try:
            return self.data.pop(0)
        except IndexError:
            # Stream closed with no buffered data remaining.
            return None

    def receive_response(self, event):
        """
        Receive response headers.
        """
        # TODO: If this is called while we're still sending data, we may want
        # to stop sending that data and check the response. Early responses to
        # big uploads are almost always a problem.
        self.response_headers = HTTPHeaderMap(event.headers)

    def receive_trailers(self, event):
        """
        Receive response trailers.
        """
        self.response_trailers = HTTPHeaderMap(event.headers)

    def receive_push(self, event):
        """
        Receive the request headers for a pushed stream.
        """
        self.promised_headers[event.pushed_stream_id] = event.headers

    def receive_data(self, event):
        """
        Receive a chunk of data.
        """
        size = event.flow_controlled_length
        increment = self._in_window_manager._handle_frame(size)

        # Append the data to the buffer.
        self.data.append(event.data)

        if increment:
            try:
                with self._conn as conn:
                    conn.increment_flow_control_window(
                        increment, stream_id=self.stream_id
                    )
            except h2Exceptions.StreamClosedError:
                # We haven't got to it yet, but the stream is already
                # closed. We don't need to increment the window in this
                # case!
                pass
            else:
                self._send_outstanding_data()

    def receive_end_stream(self, event):
        """
        All of the data is returned now.
        """
        self.remote_closed = True

    def receive_reset(self, event):
        """
        Stream forcefully reset.
        """
        self.remote_closed = True
        self._close_cb(self.stream_id)

    def get_headers(self):
        """
        Provides the headers to the connection object.
        """
        # Strip any headers invalid in H2.
        return h2_safe_headers(self.headers)

    def getheaders(self):
        """
        Once all data has been sent on this connection, returns a key-value set
        of the headers of the response to the original request.
        """
        # Keep reading until all headers are received.
        while self.response_headers is None:
            self._recv_cb(stream_id=self.stream_id)

        # Find the Content-Length header if present.
        self._in_window_manager.document_size = (
            int(self.response_headers.get(b'content-length', [0])[0])
        )

        return self.response_headers

    def gettrailers(self):
        """
        Once all data has been sent on this connection, returns a key-value set
        of the trailers of the response to the original request.

        .. warning:: Note that this method requires that the stream is
                     totally exhausted. This means that, if you have not
                     completely read from the stream, all stream data will be
                     read into memory.

        :returns: The key-value set of the trailers, or ``None`` if no trailers
                  were sent.
        """
        # Keep reading until the stream is done.
        while not self.remote_closed:
            self._recv_cb(stream_id=self.stream_id)

        return self.response_trailers

    def get_pushes(self, capture_all=False):
        """
        Returns a generator that yields push promises from the server. Note
        that this method is not idempotent; promises returned in one call will
        not be returned in subsequent calls. Iterating through generators
        returned by multiple calls to this method simultaneously results in
        undefined behavior.

        :param capture_all: If ``False``, the generator will yield all buffered
            push promises without blocking. If ``True``, the generator will
            first yield all buffered push promises, then yield additional ones
            as they arrive, and terminate when the original stream closes.
        """
        while True:
            for pair in self.promised_headers.items():
                yield pair
            self.promised_headers = {}
            if not capture_all or self.remote_closed:
                break
            self._recv_cb(stream_id=self.stream_id)

    def close(self, error_code=None):
        """
        Closes the stream. If the stream is currently open, attempts to close
        it as gracefully as possible.

        :param error_code: (optional) The error code to reset the stream with.
        :returns: Nothing.
        """
        # FIXME: I think this is overbroad, but for now it's probably ok.
        if not (self.remote_closed and self.local_closed):
            try:
                with self._conn as conn:
                    conn.reset_stream(self.stream_id, error_code or 0)
            except h2Exceptions.ProtocolError:
                # If for any reason we can't reset the stream, just
                # tolerate it.
                pass
            else:
                self._send_outstanding_data(tolerate_peer_gone=True)
            self.remote_closed = True
            self.local_closed = True

        self._close_cb(self.stream_id)

    @property
    def _out_flow_control_window(self):
        """
        The size of our outbound flow control window.
        """
        with self._conn as conn:
            return conn.local_flow_control_window(self.stream_id)

    def _send_chunk(self, data, final):
        """
        Implements most of the sending logic.

        Takes a single chunk of size at most MAX_CHUNK, wraps it in a frame,
        and sends it. Sets the END_STREAM flag when ``final`` is True: callers
        must pass ``final=True`` only for the very last chunk of the stream.
        """
        # If we don't fit in the connection window, try popping frames off the
        # connection in hope that one might be a window update frame.
        while len(data) > self._out_flow_control_window:
            self._recv_cb()

        # The caller (send_data) only flags the genuinely last chunk, so
        # ``final`` maps directly onto END_STREAM.
        end_stream = bool(final)

        # Send the frame and decrement the flow control window.
        with self._conn as conn:
            conn.send_data(
                stream_id=self.stream_id, data=data, end_stream=end_stream
            )
        self._send_outstanding_data()

        if end_stream:
            self.local_closed = True
342