# Tests for msgpack extension types: packing and unpacking raw ext payloads,
# round-tripping an application-defined type through ``default``/``ext_hook``,
# and checking the output produced when a ``default`` hook is supplied.
from __future__ import print_function

import array
import sys

import msgpack
from msgpack import ExtType

# Python 3 has no separate ``long`` type; alias it to ``int`` so the tests
# below can use ``long`` on both major versions.  ``sys.version_info`` is
# compared instead of the free-form ``sys.version`` string.
if sys.version_info[0] >= 3:
    long = int


def test_pack_ext_type():
    """Check the bytes ``Packer.pack_ext_type`` emits for each ext size class.

    Every case packs a payload with type code 0x42 and compares the result
    against the expected header followed by the type code and the payload.
    """

    def p(s):
        # Use a fresh packer per payload so no state leaks between cases.
        packer = msgpack.Packer()
        packer.pack_ext_type(0x42, s)
        return packer.bytes()

    assert p(b"A") == b"\xd4\x42A"  # fixext 1
    assert p(b"AB") == b"\xd5\x42AB"  # fixext 2
    assert p(b"ABCD") == b"\xd6\x42ABCD"  # fixext 4
    assert p(b"ABCDEFGH") == b"\xd7\x42ABCDEFGH"  # fixext 8
    assert p(b"A" * 16) == b"\xd8\x42" + b"A" * 16  # fixext 16
    assert p(b"ABC") == b"\xc7\x03\x42ABC"  # ext 8
    assert p(b"A" * 0x0123) == b"\xc8\x01\x23\x42" + b"A" * 0x0123  # ext 16
    assert (
        p(b"A" * 0x00012345) == b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345
    )  # ext 32


def test_unpack_ext_type():
    """Check that ``unpackb`` decodes each ext size class into an ``ExtType``.

    The inputs are the same byte sequences asserted in
    ``test_pack_ext_type``; each must decode to ``ExtType(0x42, payload)``.
    """

    def check(b, expected):
        assert msgpack.unpackb(b) == expected

    check(b"\xd4\x42A", ExtType(0x42, b"A"))  # fixext 1
    check(b"\xd5\x42AB", ExtType(0x42, b"AB"))  # fixext 2
    check(b"\xd6\x42ABCD", ExtType(0x42, b"ABCD"))  # fixext 4
    check(b"\xd7\x42ABCDEFGH", ExtType(0x42, b"ABCDEFGH"))  # fixext 8
    check(b"\xd8\x42" + b"A" * 16, ExtType(0x42, b"A" * 16))  # fixext 16
    check(b"\xc7\x03\x42ABC", ExtType(0x42, b"ABC"))  # ext 8
    check(b"\xc8\x01\x23\x42" + b"A" * 0x0123, ExtType(0x42, b"A" * 0x0123))  # ext 16
    check(
        b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345,
        ExtType(0x42, b"A" * 0x00012345),
    )  # ext 32


def test_extension_type():
    """Round-trip an ``array.array`` through ``default`` and ``ext_hook``.

    ``default`` converts the array into an ``ExtType`` with code 123 and
    ``ext_hook`` rebuilds a ``"d"`` array from the payload; the unpacked
    object must compare equal to the original list.
    """

    def default(obj):
        print("default called", obj)
        if isinstance(obj, array.array):
            typecode = 123  # application specific typecode
            try:
                data = obj.tobytes()
            except AttributeError:
                # Fallback for arrays without ``tobytes`` (mirrors the PY2
                # fallback in ``ext_hook`` below).
                data = obj.tostring()
            return ExtType(typecode, data)
        raise TypeError("Unknown type object %r" % (obj,))

    def ext_hook(code, data):
        print("ext_hook called", code, data)
        assert code == 123
        obj = array.array("d")
        try:
            obj.frombytes(data)
        except AttributeError:  # PY2
            obj.fromstring(data)
        return obj

    obj = [42, b"hello", array.array("d", [1.1, 2.2, 3.3])]
    s = msgpack.packb(obj, default=default)
    obj2 = msgpack.unpackb(s, ext_hook=ext_hook)
    assert obj == obj2


def test_overriding_hooks():
    """Check packing with a ``default`` hook against a pre-converted reference.

    The reference output is produced by applying ``default`` to the value by
    hand and packing the result without a hook; packing the raw object with
    ``default=default`` must yield the same bytes.
    """

    def default(obj):
        if isinstance(obj, long):
            return {"__type__": "long", "__data__": str(obj)}
        else:
            return obj

    # The value is larger than 2**64.
    # NOTE(review): presumably that is what makes the packer fall back to
    # ``default`` for it -- confirm against the msgpack documentation.
    obj = {"testval": long(1823746192837461928374619)}
    refobj = {"testval": default(obj["testval"])}
    refout = msgpack.packb(refobj)
    assert isinstance(refout, (str, bytes))
    testout = msgpack.packb(obj, default=default)

    assert refout == testout