1# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13"""Unit tests for the binary event stream decoder. """
14import pytest
15
16from tests import mock
17
18from botocore.parsers import EventStreamXMLParser
19from botocore.eventstream import (
20    EventStreamMessage, MessagePrelude, EventStreamBuffer,
21    ChecksumMismatch, InvalidPayloadLength, InvalidHeadersLength,
22    DuplicateHeader, EventStreamHeaderParser, DecodeUtils, EventStream,
23    NoInitialResponseError
24)
25from botocore.exceptions import EventStreamError
26
# Smallest fixture: a 16-byte encoded message (total_length=0x10) with no
# headers and no payload, paired with the message it is expected to decode to.
EMPTY_MESSAGE = (
    b'\x00\x00\x00\x10\x00\x00\x00\x00\x05\xc2H\xeb}\x98\xc8\xff',
    EventStreamMessage(
        prelude=MessagePrelude(
            total_length=0x10,
            headers_length=0,
            crc=0x05c248eb,
        ),
        headers={},
        payload=b'',
        crc=0x7d98c8ff,
    )
)
40
# Encoded message with a single header named 'byte' and an empty payload;
# the header value is expected to decode to -1.
INT8_HEADER = (
    (
        b"\x00\x00\x00\x17\x00\x00\x00\x07)\x86\x01X\x04"
        b"byte\x02\xff\xc2\xf8i\xdc"
    ),
    EventStreamMessage(
        prelude=MessagePrelude(
            total_length=0x17,
            headers_length=0x7,
            crc=0x29860158,
        ),
        headers={'byte': -1},
        payload=b'',
        crc=0xc2f869dc,
    )
)
57
# Encoded message with a single header named 'short' and an empty payload;
# the header value is expected to decode to -1.
INT16_HEADER = (
    (
        b"\x00\x00\x00\x19\x00\x00\x00\tq\x0e\x92>\x05"
        b"short\x03\xff\xff\xb2|\xb6\xcc"
    ),
    EventStreamMessage(
        prelude=MessagePrelude(
            total_length=0x19,
            headers_length=0x9,
            crc=0x710e923e,
        ),
        headers={'short': -1},
        payload=b'',
        crc=0xb27cb6cc,
    )
)
74
# Encoded message with a single header named 'integer' and an empty payload;
# the header value is expected to decode to -1.
INT32_HEADER = (
    (
        b"\x00\x00\x00\x1d\x00\x00\x00\r\x83\xe3\xf0\xe7\x07"
        b"integer\x04\xff\xff\xff\xff\x8b\x8e\x12\xeb"
    ),
    EventStreamMessage(
        prelude=MessagePrelude(
            total_length=0x1D,
            headers_length=0xD,
            crc=0x83e3f0e7,
        ),
        headers={'integer': -1},
        payload=b'',
        crc=0x8b8e12eb,
    )
)
91
# Encoded message with a single header named 'long' and an empty payload;
# the header value is expected to decode to -1.
INT64_HEADER = (
    (
        b"\x00\x00\x00\x1e\x00\x00\x00\x0e]J\xdb\x8d\x04"
        b"long\x05\xff\xff\xff\xff\xff\xff\xff\xffK\xc22\xda"
    ),
    EventStreamMessage(
        prelude=MessagePrelude(
            total_length=0x1E,
            headers_length=0xE,
            crc=0x5d4adb8d,
        ),
        headers={'long': -1},
        payload=b'',
        crc=0x4bc232da,
    )
)
108
# Encoded message that carries a payload but no headers (headers_length=0),
# paired with the message it is expected to decode to.
PAYLOAD_NO_HEADERS = (
    b"\x00\x00\x00\x1d\x00\x00\x00\x00\xfdR\x8cZ{'foo':'bar'}\xc3e96",
    EventStreamMessage(
        prelude=MessagePrelude(
            total_length=0x1d,
            headers_length=0,
            crc=0xfd528c5a,
        ),
        headers={},
        payload=b"{'foo':'bar'}",
        crc=0xc3653936,
    )
)
122
# Encoded message with one string header ('content-type') and a payload,
# paired with the message it is expected to decode to. Also used as the
# non-error input for the to_response_dict test.
PAYLOAD_ONE_STR_HEADER = (
    (b"\x00\x00\x00=\x00\x00\x00 \x07\xfd\x83\x96\x0ccontent-type\x07\x00\x10"
     b"application/json{'foo':'bar'}\x8d\x9c\x08\xb1"),
    EventStreamMessage(
        prelude=MessagePrelude(
            total_length=0x3d,
            headers_length=0x20,
            crc=0x07fd8396,
        ),
        headers={'content-type': 'application/json'},
        payload=b"{'foo':'bar'}",
        crc=0x8d9c08b1,
    )
)
137
# Encoded message with ten headers named '0' through '9' and an empty
# payload. The expected decoded values cover booleans, integers, byte
# strings and a text string.
ALL_HEADERS_TYPES = (
    (b"\x00\x00\x00\x62\x00\x00\x00\x52\x03\xb5\xcb\x9c"
     b"\x010\x00\x011\x01\x012\x02\x02\x013\x03\x00\x03"
     b"\x014\x04\x00\x00\x00\x04\x015\x05\x00\x00\x00\x00\x00\x00\x00\x05"
     b"\x016\x06\x00\x05bytes\x017\x07\x00\x04utf8"
     b"\x018\x08\x00\x00\x00\x00\x00\x00\x00\x08\x019\x090123456789abcdef"
     b"\x63\x35\x36\x71"),
    EventStreamMessage(
        prelude=MessagePrelude(
            total_length=0x62,
            headers_length=0x52,
            crc=0x03b5cb9c,
        ),
        headers={
            '0': True,
            '1': False,
            '2': 0x02,
            '3': 0x03,
            '4': 0x04,
            '5': 0x05,
            '6': b'bytes',
            '7': u'utf8',
            '8': 0x08,
            '9': b'0123456789abcdef',
        },
        payload=b"",
        crc=0x63353671,
    )
)
167
# Encoded message whose ':message-type' header is 'error', with
# ':error-code' and ':error-message' headers and an empty payload. The tests
# in this module expect it to map to status code 400 and to raise
# EventStreamError when iterated through EventStream.
ERROR_EVENT_MESSAGE = (
    (b"\x00\x00\x00\x52\x00\x00\x00\x42\xbf\x23\x63\x7e"
     b"\x0d:message-type\x07\x00\x05error"
     b"\x0b:error-code\x07\x00\x04code"
     b"\x0e:error-message\x07\x00\x07message"
     b"\x6b\x6c\xea\x3d"),
    EventStreamMessage(
        prelude=MessagePrelude(
            total_length=0x52,
            headers_length=0x42,
            crc=0xbf23637e,
        ),
        headers={
            ':message-type': 'error',
            ':error-code': 'code',
            ':error-message': 'message',
        },
        payload=b'',
        crc=0x6b6cea3d,
    )
)
189
# Well-formed fixtures: each entry is a 2-tuple of (encoded message bytes,
# expected decoded EventStreamMessage). Used to parametrize the positive
# decoding tests.
POSITIVE_CASES = [
    EMPTY_MESSAGE,
    INT8_HEADER,
    INT16_HEADER,
    INT32_HEADER,
    INT64_HEADER,
    PAYLOAD_NO_HEADERS,
    PAYLOAD_ONE_STR_HEADER,
    ALL_HEADERS_TYPES,
    ERROR_EVENT_MESSAGE,
]
202
# Same bytes as the PAYLOAD_ONE_STR_HEADER encoding except that bytes 4-7
# (b"\x00\x00\x00 " there) are replaced with b"\xFF\x00\x01\x02"; decoding
# is expected to raise InvalidHeadersLength.
CORRUPTED_HEADER_LENGTH = (
    (b"\x00\x00\x00=\xFF\x00\x01\x02\x07\xfd\x83\x96\x0ccontent-type\x07\x00"
     b"\x10application/json{'foo':'bar'}\x8d\x9c\x08\xb1"),
    InvalidHeadersLength
)
208
# Same bytes as the PAYLOAD_ONE_STR_HEADER encoding except that the header
# name reads 'content+type' instead of 'content-type'; decoding is expected
# to raise ChecksumMismatch.
CORRUPTED_HEADERS = (
    (b"\x00\x00\x00=\x00\x00\x00 \x07\xfd\x83\x96\x0ccontent+type\x07\x00\x10"
     b"application/json{'foo':'bar'}\x8d\x9c\x08\xb1"),
    ChecksumMismatch
)
214
# Same bytes as the PAYLOAD_NO_HEADERS encoding except that the first byte
# is \x01 instead of \x00; decoding is expected to raise
# InvalidPayloadLength.
CORRUPTED_LENGTH = (
    b"\x01\x00\x00\x1d\x00\x00\x00\x00\xfdR\x8cZ{'foo':'bar'}\xc3e96",
    InvalidPayloadLength
)
219
# Same bytes as the PAYLOAD_NO_HEADERS encoding except that the payload's
# closing '}' is replaced with \x8d; decoding is expected to raise
# ChecksumMismatch.
CORRUPTED_PAYLOAD = (
    b"\x00\x00\x00\x1d\x00\x00\x00\x00\xfdR\x8cZ{'foo':'bar'\x8d\xc3e96",
    ChecksumMismatch
)
224
# Encoded message whose header section repeats the same "\x04test\x04asdf"
# entry twice; decoding is expected to raise DuplicateHeader.
DUPLICATE_HEADER = (
    (b"\x00\x00\x00\x24\x00\x00\x00\x14\x4b\xb9\x82\xd0"
     b"\x04test\x04asdf\x04test\x04asdf\xf3\xf4\x75\x63"),
    DuplicateHeader
)
230
# Malformed fixtures: each entry is a 2-tuple of (encoded message bytes,
# exception class decoding is expected to raise). Used to parametrize the
# negative decoding tests.
NEGATIVE_CASES = [
    CORRUPTED_LENGTH,
    CORRUPTED_PAYLOAD,
    CORRUPTED_HEADERS,
    CORRUPTED_HEADER_LENGTH,
    DUPLICATE_HEADER,
]
239
240
def assert_message_equal(message_a, message_b):
    """Assert that two messages agree on every prelude and message field.

    Compares the prelude's ``total_length``, ``headers_length`` and ``crc``
    first, then the message's ``headers``, ``payload`` and ``crc``.

    :raises AssertionError: on the first field that differs.
    """
    prelude_a = message_a.prelude
    prelude_b = message_b.prelude
    for field in ('total_length', 'headers_length', 'crc'):
        assert getattr(prelude_a, field) == getattr(prelude_b, field), field
    for field in ('headers', 'payload', 'crc'):
        assert getattr(message_a, field) == getattr(message_b, field), field
249
250
def test_partial_message():
    """Ensure a message split across two ``add_data`` calls is decoded.

    The first chunk on its own must yield no messages; once the remainder
    is added the buffer must yield exactly one message equal to the
    expected decoding of ``EMPTY_MESSAGE``.
    """
    data = EMPTY_MESSAGE[0]
    event_buffer = EventStreamBuffer()
    # Arbitrary split point one byte short of the full 16-byte message, so
    # the first chunk alone is an incomplete message.
    mid_point = 15
    event_buffer.add_data(data[:mid_point])
    # Nothing may be yielded until the rest of the message arrives.
    assert list(event_buffer) == []
    event_buffer.add_data(data[mid_point:])
    messages = list(event_buffer)
    # Require exactly one message: looping over the buffer directly would
    # pass vacuously if it never yielded anything.
    assert len(messages) == 1
    assert_message_equal(messages[0], EMPTY_MESSAGE[1])
263
264
def check_message_decodes(encoded, decoded):
    """Decode ``encoded`` on a fresh buffer and compare with ``decoded``.

    Asserts that the buffer yields exactly one message and that it matches
    ``decoded`` field by field. Any exception raised while decoding
    propagates to the caller.
    """
    decoder = EventStreamBuffer()
    decoder.add_data(encoded)
    decoded_messages = [message for message in decoder]
    assert len(decoded_messages) == 1
    only_message = decoded_messages[0]
    assert_message_equal(only_message, decoded)
272
273
@pytest.mark.parametrize("encoded, decoded", POSITIVE_CASES)
def test_positive_cases(encoded, decoded):
    """Each positive fixture decodes to its expected message."""
    check_message_decodes(encoded=encoded, decoded=decoded)
278
279
def test_all_positive_cases():
    """Test all positive cases can be decoded on the same buffer.

    Every encoded fixture is appended to a single buffer; the buffer must
    then yield one message per fixture, in order, each equal to the
    fixture's expected decoding.
    """
    event_buffer = EventStreamBuffer()
    # Add all positive test cases to the same buffer.
    for encoded, _ in POSITIVE_CASES:
        event_buffer.add_data(encoded)
    expected_messages = [decoded for _, decoded in POSITIVE_CASES]
    decoded_messages = list(event_buffer)
    # zip() stops at the shorter sequence, so check the count explicitly;
    # otherwise missing messages would go unnoticed.
    assert len(decoded_messages) == len(expected_messages)
    for decoded, expected in zip(decoded_messages, expected_messages):
        assert_message_equal(decoded, expected)
293
294
@pytest.mark.parametrize("encoded, exception", NEGATIVE_CASES)
def test_negative_cases(encoded, exception):
    """Each negative fixture raises its expected exception when decoded."""
    # No decoded message is supplied: the call is expected to raise.
    with pytest.raises(exception):
        check_message_decodes(encoded=encoded, decoded=None)
300
301
def test_header_parser():
    """The header parser decodes a header block covering every value type."""
    raw_headers = (
        b"\x010\x00\x011\x01\x012\x02\x02\x013\x03\x00\x03"
        b"\x014\x04\x00\x00\x00\x04\x015\x05\x00\x00\x00\x00\x00\x00\x00\x05"
        b"\x016\x06\x00\x05bytes\x017\x07\x00\x04utf8"
        b"\x018\x08\x00\x00\x00\x00\x00\x00\x00\x08\x019\x090123456789abcdef"
    )

    parsed = EventStreamHeaderParser().parse(raw_headers)

    # Headers are named '0' through '9'.
    assert parsed == {
        '0': True,
        '1': False,
        '2': 0x02,
        '3': 0x03,
        '4': 0x04,
        '5': 0x05,
        '6': b'bytes',
        '7': u'utf8',
        '8': 0x08,
        '9': b'0123456789abcdef',
    }
327
328
def test_message_prelude_properties():
    """Test that properties derived from the prelude fields are correct."""
    # Total length: 40, headers length: 15; the crc value is a placeholder.
    prelude = MessagePrelude(
        total_length=40,
        headers_length=15,
        crc=0x00000000,
    )
    derived = (
        prelude.payload_length,
        prelude.headers_end,
        prelude.payload_end,
    )
    assert derived == (9, 27, 36)
336
337
def test_message_to_response_dict():
    """A non-error message converts to a response dict with status 200."""
    message = PAYLOAD_ONE_STR_HEADER[1]
    response = message.to_response_dict()

    assert response['status_code'] == 200
    assert response['headers'] == {'content-type': 'application/json'}
    assert response['body'] == b"{'foo':'bar'}"
345
346
def test_message_to_response_dict_error():
    """An error message converts to a response dict with status 400."""
    message = ERROR_EVENT_MESSAGE[1]
    response = message.to_response_dict()

    assert response['status_code'] == 400
    assert response['headers'] == {
        ':message-type': 'error',
        ':error-code': 'code',
        ':error-message': 'message',
    }
    assert response['body'] == b''
357
358
def test_unpack_uint8():
    """unpack_uint8 reads one byte as an unsigned value."""
    value, consumed = DecodeUtils.unpack_uint8(b'\xDE')
    assert (consumed, value) == (1, 0xDE)
363
364
def test_unpack_uint32():
    """unpack_uint32 reads four bytes as an unsigned value."""
    value, consumed = DecodeUtils.unpack_uint32(b'\xDE\xAD\xBE\xEF')
    assert (consumed, value) == (4, 0xDEADBEEF)
369
370
def test_unpack_int8():
    """unpack_int8 reads one byte as a signed value."""
    value, consumed = DecodeUtils.unpack_int8(b'\xFE')
    assert (consumed, value) == (1, -2)
375
376
def test_unpack_int16():
    """unpack_int16 reads two bytes as a signed value."""
    value, consumed = DecodeUtils.unpack_int16(b'\xFF\xFE')
    assert (consumed, value) == (2, -2)
381
382
def test_unpack_int32():
    """unpack_int32 reads four bytes as a signed value."""
    value, consumed = DecodeUtils.unpack_int32(b'\xFF\xFF\xFF\xFE')
    assert (consumed, value) == (4, -2)
387
388
def test_unpack_int64():
    """unpack_int64 reads eight bytes as a signed value."""
    encoded = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFE'
    value, consumed = DecodeUtils.unpack_int64(encoded)
    assert (consumed, value) == (8, -2)
394
395
def test_unpack_array_short():
    """unpack_byte_array, called with defaults, handles a 2-byte length."""
    encoded = b'\x00\x10application/json'
    value, consumed = DecodeUtils.unpack_byte_array(encoded)
    # Two length bytes plus the 16-byte array.
    assert (consumed, value) == (18, b'application/json')
401
402
def test_unpack_byte_array_int():
    """unpack_byte_array honours ``length_byte_size=4``."""
    encoded = b'\x00\x00\x00\x10application/json'
    value, consumed = DecodeUtils.unpack_byte_array(
        encoded, length_byte_size=4)
    # Four length bytes plus the 16-byte array.
    assert (consumed, value) == (20, b'application/json')
408
409
def test_unpack_utf8_string():
    """unpack_utf8_string decodes a length-prefixed UTF-8 string."""
    raw_text = b'\xe6\x97\xa5\xe6\x9c\xac\xe8\xaa\x9e'
    length_prefix = b'\x00\x09'
    value, consumed = DecodeUtils.unpack_utf8_string(length_prefix + raw_text)
    # Two length bytes plus nine bytes of encoded text.
    assert consumed == 11
    assert value == raw_text.decode('utf-8')
417
418
def test_unpack_prelude():
    """unpack_prelude returns the three prelude values and bytes consumed."""
    encoded = b'\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03'
    expected = ((1, 2, 3), 12)
    assert DecodeUtils.unpack_prelude(encoded) == expected
423
424
def create_mock_raw_stream(*data):
    """Build a mock raw stream whose ``stream()`` yields the given chunks.

    :param data: chunks to yield, in order, each time ``stream()`` is
        called.
    :returns: a ``mock.Mock`` whose ``stream`` attribute is a generator
        function over ``data``.
    """
    def replay_chunks():
        yield from data

    stream_double = mock.Mock()
    stream_double.stream = replay_chunks
    return stream_double
433
434
def test_event_stream_wrapper_iteration():
    """Iterating EventStream parses one event split across two chunks."""
    chunks = (
        b"\x00\x00\x00+\x00\x00\x00\x0e4\x8b\xec{\x08event-id\x04\x00",
        b"\x00\xa0\x0c{'foo':'bar'}\xd3\x89\x02\x85",
    )
    shape = mock.Mock()
    xml_parser = mock.Mock(spec=EventStreamXMLParser)
    stream = EventStream(
        create_mock_raw_stream(*chunks), shape, xml_parser, '')

    assert len(list(stream)) == 1

    # The parser must have received the decoded message as a response dict.
    xml_parser.parse.assert_called_with(
        {
            'headers': {'event-id': 0x0000a00c},
            'body': b"{'foo':'bar'}",
            'status_code': 200,
        },
        shape,
    )
452
453
def test_eventstream_wrapper_iteration_error():
    """Iterating over an error message raises EventStreamError."""
    xml_parser = mock.Mock(spec=EventStreamXMLParser)
    xml_parser.parse.return_value = {}
    shape = mock.Mock()
    stream = EventStream(
        create_mock_raw_stream(ERROR_EVENT_MESSAGE[0]), shape, xml_parser, '')
    with pytest.raises(EventStreamError):
        list(stream)
462
463
def test_event_stream_wrapper_close():
    """Closing the EventStream closes the underlying raw stream once."""
    underlying = mock.Mock()
    EventStream(underlying, None, None, '').close()
    underlying.close.assert_called_once_with()
469
470
def test_event_stream_initial_response():
    """get_initial_response returns the initial-response event."""
    chunks = (
        b'\x00\x00\x00~\x00\x00\x00O\xc5\xa3\xdd\xc6\r:message-type\x07\x00',
        b'\x05event\x0b:event-type\x07\x00\x10initial-response\r:content-type',
        b'\x07\x00\ttext/json{"InitialResponse": "sometext"}\xf6\x98$\x83'
    )
    xml_parser = mock.Mock(spec=EventStreamXMLParser)
    shape = mock.Mock()
    stream = EventStream(
        create_mock_raw_stream(*chunks), shape, xml_parser, '')

    initial = stream.get_initial_response()

    assert initial.headers == {
        ':message-type': 'event',
        ':event-type': 'initial-response',
        ':content-type': 'text/json',
    }
    assert initial.payload == b'{"InitialResponse": "sometext"}'
489
490
def test_event_stream_initial_response_wrong_type():
    """get_initial_response raises if the first event is another type."""
    chunks = (
        b"\x00\x00\x00+\x00\x00\x00\x0e4\x8b\xec{\x08event-id\x04\x00",
        b"\x00\xa0\x0c{'foo':'bar'}\xd3\x89\x02\x85",
    )
    xml_parser = mock.Mock(spec=EventStreamXMLParser)
    shape = mock.Mock()
    stream = EventStream(
        create_mock_raw_stream(*chunks), shape, xml_parser, '')
    with pytest.raises(NoInitialResponseError):
        stream.get_initial_response()
501
502
def test_event_stream_initial_response_no_event():
    """get_initial_response raises if the stream carries no event."""
    # The raw stream yields a single empty chunk.
    empty_stream = create_mock_raw_stream(b'')
    xml_parser = mock.Mock(spec=EventStreamXMLParser)
    shape = mock.Mock()
    stream = EventStream(empty_stream, shape, xml_parser, '')
    with pytest.raises(NoInitialResponseError):
        stream.get_initial_response()
510