from __future__ import print_function import array import msgpack from msgpack import ExtType def test_pack_ext_type(): def p(s): packer = msgpack.Packer() packer.pack_ext_type(0x42, s) return packer.bytes() assert p(b"A") == b"\xd4\x42A" # fixext 1 assert p(b"AB") == b"\xd5\x42AB" # fixext 2 assert p(b"ABCD") == b"\xd6\x42ABCD" # fixext 4 assert p(b"ABCDEFGH") == b"\xd7\x42ABCDEFGH" # fixext 8 assert p(b"A" * 16) == b"\xd8\x42" + b"A" * 16 # fixext 16 assert p(b"ABC") == b"\xc7\x03\x42ABC" # ext 8 assert p(b"A" * 0x0123) == b"\xc8\x01\x23\x42" + b"A" * 0x0123 # ext 16 assert ( p(b"A" * 0x00012345) == b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345 ) # ext 32 def test_unpack_ext_type(): def check(b, expected): assert msgpack.unpackb(b) == expected check(b"\xd4\x42A", ExtType(0x42, b"A")) # fixext 1 check(b"\xd5\x42AB", ExtType(0x42, b"AB")) # fixext 2 check(b"\xd6\x42ABCD", ExtType(0x42, b"ABCD")) # fixext 4 check(b"\xd7\x42ABCDEFGH", ExtType(0x42, b"ABCDEFGH")) # fixext 8 check(b"\xd8\x42" + b"A" * 16, ExtType(0x42, b"A" * 16)) # fixext 16 check(b"\xc7\x03\x42ABC", ExtType(0x42, b"ABC")) # ext 8 check(b"\xc8\x01\x23\x42" + b"A" * 0x0123, ExtType(0x42, b"A" * 0x0123)) # ext 16 check( b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345, ExtType(0x42, b"A" * 0x00012345), ) # ext 32 def test_extension_type(): def default(obj): print("default called", obj) if isinstance(obj, array.array): typecode = 123 # application specific typecode try: data = obj.tobytes() except AttributeError: data = obj.tostring() return ExtType(typecode, data) raise TypeError("Unknown type object %r" % (obj,)) def ext_hook(code, data): print("ext_hook called", code, data) assert code == 123 obj = array.array("d") try: obj.frombytes(data) except AttributeError: # PY2 obj.fromstring(data) return obj obj = [42, b"hello", array.array("d", [1.1, 2.2, 3.3])] s = msgpack.packb(obj, default=default) obj2 = msgpack.unpackb(s, ext_hook=ext_hook) assert obj == obj2 import sys if sys.version > "3": long = int 
def test_overriding_hooks():
    """Packing with a ``default`` hook must match packing the pre-converted value.

    The hook maps ``long`` instances to a tagged dict. The test packs that
    dict directly to get reference bytes, then packs the raw value with
    ``default=`` and expects identical output.
    """

    def default(obj):
        # Anything that is not a ``long`` is handed back untouched.
        if not isinstance(obj, long):
            return obj
        return {"__type__": "long", "__data__": str(obj)}

    # NOTE(review): presumably chosen to exceed the integer range msgpack can
    # encode natively, so that the hook is actually invoked -- confirm.
    big_value = long(1823746192837461928374619)
    payload = {"testval": big_value}

    # Reference bytes: apply the hook by hand, then pack without any hook.
    expected = msgpack.packb({"testval": default(payload["testval"])})
    assert isinstance(expected, (str, bytes))

    actual = msgpack.packb(payload, default=default)
    assert expected == actual