1#!/usr/bin/env python 2# coding: utf-8 3from __future__ import absolute_import, division, print_function, unicode_literals 4 5from collections import OrderedDict 6from io import BytesIO 7import struct 8import sys 9 10import pytest 11from pytest import raises, xfail 12 13from msgpack import packb, unpackb, Unpacker, Packer, pack 14 15 16def check(data, use_list=False): 17 re = unpackb(packb(data), use_list=use_list, strict_map_key=False) 18 assert re == data 19 20 21def testPack(): 22 test_data = [ 23 0, 24 1, 25 127, 26 128, 27 255, 28 256, 29 65535, 30 65536, 31 4294967295, 32 4294967296, 33 -1, 34 -32, 35 -33, 36 -128, 37 -129, 38 -32768, 39 -32769, 40 -4294967296, 41 -4294967297, 42 1.0, 43 b"", 44 b"a", 45 b"a" * 31, 46 b"a" * 32, 47 None, 48 True, 49 False, 50 (), 51 ((),), 52 ((), None), 53 {None: 0}, 54 (1 << 23), 55 ] 56 for td in test_data: 57 check(td) 58 59 60def testPackUnicode(): 61 test_data = ["", "abcd", ["defgh"], "Русский текст"] 62 for td in test_data: 63 re = unpackb(packb(td), use_list=1, raw=False) 64 assert re == td 65 packer = Packer() 66 data = packer.pack(td) 67 re = Unpacker(BytesIO(data), raw=False, use_list=1).unpack() 68 assert re == td 69 70 71def testPackBytes(): 72 test_data = [b"", b"abcd", (b"defgh",)] 73 for td in test_data: 74 check(td) 75 76 77def testPackByteArrays(): 78 test_data = [bytearray(b""), bytearray(b"abcd"), (bytearray(b"defgh"),)] 79 for td in test_data: 80 check(td) 81 82 83@pytest.mark.skipif( 84 sys.version_info < (3, 0), reason="Python 2 passes invalid surrogates" 85) 86def testIgnoreUnicodeErrors(): 87 re = unpackb( 88 packb(b"abc\xeddef", use_bin_type=False), raw=False, unicode_errors="ignore" 89 ) 90 assert re == "abcdef" 91 92 93def testStrictUnicodeUnpack(): 94 packed = packb(b"abc\xeddef", use_bin_type=False) 95 with pytest.raises(UnicodeDecodeError): 96 unpackb(packed, raw=False, use_list=1) 97 98 99@pytest.mark.skipif( 100 sys.version_info < (3, 0), reason="Python 2 passes invalid surrogates" 101) 102def testIgnoreErrorsPack(): 103 re = unpackb( 104 packb("abc\uDC80\uDCFFdef", use_bin_type=True, unicode_errors="ignore"), 105 raw=False, 106 use_list=1, 107 ) 108 assert re == "abcdef" 109 110 111def testDecodeBinary(): 112 re = unpackb(packb(b"abc"), use_list=1) 113 assert re == b"abc" 114 115 116def testPackFloat(): 117 assert packb(1.0, use_single_float=True) == b"\xca" + struct.pack(str(">f"), 1.0) 118 assert packb(1.0, use_single_float=False) == b"\xcb" + struct.pack(str(">d"), 1.0) 119 120 121def testArraySize(sizes=[0, 5, 50, 1000]): 122 bio = BytesIO() 123 packer = Packer() 124 for size in sizes: 125 bio.write(packer.pack_array_header(size)) 126 for i in range(size): 127 bio.write(packer.pack(i)) 128 129 bio.seek(0) 130 unpacker = Unpacker(bio, use_list=1) 131 for size in sizes: 132 assert unpacker.unpack() == list(range(size)) 133 134 135def test_manualreset(sizes=[0, 5, 50, 1000]): 136 packer = Packer(autoreset=False) 137 for size in sizes: 138 packer.pack_array_header(size) 139 for i in range(size): 140 packer.pack(i) 141 142 bio = BytesIO(packer.bytes()) 143 unpacker = Unpacker(bio, use_list=1) 144 for size in sizes: 145 assert unpacker.unpack() == list(range(size)) 146 147 packer.reset() 148 assert packer.bytes() == b"" 149 150 151def testMapSize(sizes=[0, 5, 50, 1000]): 152 bio = BytesIO() 153 packer = Packer() 154 for size in sizes: 155 bio.write(packer.pack_map_header(size)) 156 for i in range(size): 157 bio.write(packer.pack(i)) # key 158 bio.write(packer.pack(i * 2)) # value 159 160 bio.seek(0) 161 unpacker = Unpacker(bio, strict_map_key=False) 162 for size in sizes: 163 assert unpacker.unpack() == dict((i, i * 2) for i in range(size)) 164 165 166def test_odict(): 167 seq = [(b"one", 1), (b"two", 2), (b"three", 3), (b"four", 4)] 168 od = OrderedDict(seq) 169 assert unpackb(packb(od), use_list=1) == dict(seq) 170 171 def pair_hook(seq): 172 return list(seq) 173 174 assert unpackb(packb(od), object_pairs_hook=pair_hook, use_list=1) == seq 175 176 177def test_pairlist(): 178 pairlist = [(b"a", 1), (2, b"b"), (b"foo", b"bar")] 179 packer = Packer() 180 packed = packer.pack_map_pairs(pairlist) 181 unpacked = unpackb(packed, object_pairs_hook=list, strict_map_key=False) 182 assert pairlist == unpacked 183 184 185def test_get_buffer(): 186 packer = Packer(autoreset=0, use_bin_type=True) 187 packer.pack([1, 2]) 188 strm = BytesIO() 189 strm.write(packer.getbuffer()) 190 written = strm.getvalue() 191 192 expected = packb([1, 2], use_bin_type=True) 193 assert written == expected 194