1#!/usr/bin/env python
2# coding: utf-8
3from __future__ import absolute_import, division, print_function, unicode_literals
4
5from collections import OrderedDict
6from io import BytesIO
7import struct
8import sys
9
10import pytest
11from pytest import raises, xfail
12
13from msgpack import packb, unpackb, Unpacker, Packer, pack
14
15
16def check(data, use_list=False):
17    re = unpackb(packb(data), use_list=use_list, strict_map_key=False)
18    assert re == data
19
20
21def testPack():
22    test_data = [
23        0,
24        1,
25        127,
26        128,
27        255,
28        256,
29        65535,
30        65536,
31        4294967295,
32        4294967296,
33        -1,
34        -32,
35        -33,
36        -128,
37        -129,
38        -32768,
39        -32769,
40        -4294967296,
41        -4294967297,
42        1.0,
43        b"",
44        b"a",
45        b"a" * 31,
46        b"a" * 32,
47        None,
48        True,
49        False,
50        (),
51        ((),),
52        ((), None),
53        {None: 0},
54        (1 << 23),
55    ]
56    for td in test_data:
57        check(td)
58
59
60def testPackUnicode():
61    test_data = ["", "abcd", ["defgh"], "Русский текст"]
62    for td in test_data:
63        re = unpackb(packb(td), use_list=1, raw=False)
64        assert re == td
65        packer = Packer()
66        data = packer.pack(td)
67        re = Unpacker(BytesIO(data), raw=False, use_list=1).unpack()
68        assert re == td
69
70
71def testPackBytes():
72    test_data = [b"", b"abcd", (b"defgh",)]
73    for td in test_data:
74        check(td)
75
76
77def testPackByteArrays():
78    test_data = [bytearray(b""), bytearray(b"abcd"), (bytearray(b"defgh"),)]
79    for td in test_data:
80        check(td)
81
82
83@pytest.mark.skipif(
84    sys.version_info < (3, 0), reason="Python 2 passes invalid surrogates"
85)
86def testIgnoreUnicodeErrors():
87    re = unpackb(
88        packb(b"abc\xeddef", use_bin_type=False), raw=False, unicode_errors="ignore"
89    )
90    assert re == "abcdef"
91
92
93def testStrictUnicodeUnpack():
94    packed = packb(b"abc\xeddef", use_bin_type=False)
95    with pytest.raises(UnicodeDecodeError):
96        unpackb(packed, raw=False, use_list=1)
97
98
99@pytest.mark.skipif(
100    sys.version_info < (3, 0), reason="Python 2 passes invalid surrogates"
101)
102def testIgnoreErrorsPack():
103    re = unpackb(
104        packb("abc\uDC80\uDCFFdef", use_bin_type=True, unicode_errors="ignore"),
105        raw=False,
106        use_list=1,
107    )
108    assert re == "abcdef"
109
110
111def testDecodeBinary():
112    re = unpackb(packb(b"abc"), use_list=1)
113    assert re == b"abc"
114
115
116def testPackFloat():
117    assert packb(1.0, use_single_float=True) == b"\xca" + struct.pack(str(">f"), 1.0)
118    assert packb(1.0, use_single_float=False) == b"\xcb" + struct.pack(str(">d"), 1.0)
119
120
121def testArraySize(sizes=[0, 5, 50, 1000]):
122    bio = BytesIO()
123    packer = Packer()
124    for size in sizes:
125        bio.write(packer.pack_array_header(size))
126        for i in range(size):
127            bio.write(packer.pack(i))
128
129    bio.seek(0)
130    unpacker = Unpacker(bio, use_list=1)
131    for size in sizes:
132        assert unpacker.unpack() == list(range(size))
133
134
135def test_manualreset(sizes=[0, 5, 50, 1000]):
136    packer = Packer(autoreset=False)
137    for size in sizes:
138        packer.pack_array_header(size)
139        for i in range(size):
140            packer.pack(i)
141
142    bio = BytesIO(packer.bytes())
143    unpacker = Unpacker(bio, use_list=1)
144    for size in sizes:
145        assert unpacker.unpack() == list(range(size))
146
147    packer.reset()
148    assert packer.bytes() == b""
149
150
151def testMapSize(sizes=[0, 5, 50, 1000]):
152    bio = BytesIO()
153    packer = Packer()
154    for size in sizes:
155        bio.write(packer.pack_map_header(size))
156        for i in range(size):
157            bio.write(packer.pack(i))  # key
158            bio.write(packer.pack(i * 2))  # value
159
160    bio.seek(0)
161    unpacker = Unpacker(bio, strict_map_key=False)
162    for size in sizes:
163        assert unpacker.unpack() == dict((i, i * 2) for i in range(size))
164
165
166def test_odict():
167    seq = [(b"one", 1), (b"two", 2), (b"three", 3), (b"four", 4)]
168    od = OrderedDict(seq)
169    assert unpackb(packb(od), use_list=1) == dict(seq)
170
171    def pair_hook(seq):
172        return list(seq)
173
174    assert unpackb(packb(od), object_pairs_hook=pair_hook, use_list=1) == seq
175
176
177def test_pairlist():
178    pairlist = [(b"a", 1), (2, b"b"), (b"foo", b"bar")]
179    packer = Packer()
180    packed = packer.pack_map_pairs(pairlist)
181    unpacked = unpackb(packed, object_pairs_hook=list, strict_map_key=False)
182    assert pairlist == unpacked
183
184
185def test_get_buffer():
186    packer = Packer(autoreset=0, use_bin_type=True)
187    packer.pack([1, 2])
188    strm = BytesIO()
189    strm.write(packer.getbuffer())
190    written = strm.getvalue()
191
192    expected = packb([1, 2], use_bin_type=True)
193    assert written == expected
194