1from __future__ import print_function
2import array
3import msgpack
4from msgpack import ExtType
5
6
7def test_pack_ext_type():
8    def p(s):
9        packer = msgpack.Packer()
10        packer.pack_ext_type(0x42, s)
11        return packer.bytes()
12
13    assert p(b"A") == b"\xd4\x42A"  # fixext 1
14    assert p(b"AB") == b"\xd5\x42AB"  # fixext 2
15    assert p(b"ABCD") == b"\xd6\x42ABCD"  # fixext 4
16    assert p(b"ABCDEFGH") == b"\xd7\x42ABCDEFGH"  # fixext 8
17    assert p(b"A" * 16) == b"\xd8\x42" + b"A" * 16  # fixext 16
18    assert p(b"ABC") == b"\xc7\x03\x42ABC"  # ext 8
19    assert p(b"A" * 0x0123) == b"\xc8\x01\x23\x42" + b"A" * 0x0123  # ext 16
20    assert (
21        p(b"A" * 0x00012345) == b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345
22    )  # ext 32
23
24
25def test_unpack_ext_type():
26    def check(b, expected):
27        assert msgpack.unpackb(b) == expected
28
29    check(b"\xd4\x42A", ExtType(0x42, b"A"))  # fixext 1
30    check(b"\xd5\x42AB", ExtType(0x42, b"AB"))  # fixext 2
31    check(b"\xd6\x42ABCD", ExtType(0x42, b"ABCD"))  # fixext 4
32    check(b"\xd7\x42ABCDEFGH", ExtType(0x42, b"ABCDEFGH"))  # fixext 8
33    check(b"\xd8\x42" + b"A" * 16, ExtType(0x42, b"A" * 16))  # fixext 16
34    check(b"\xc7\x03\x42ABC", ExtType(0x42, b"ABC"))  # ext 8
35    check(b"\xc8\x01\x23\x42" + b"A" * 0x0123, ExtType(0x42, b"A" * 0x0123))  # ext 16
36    check(
37        b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345,
38        ExtType(0x42, b"A" * 0x00012345),
39    )  # ext 32
40
41
42def test_extension_type():
43    def default(obj):
44        print("default called", obj)
45        if isinstance(obj, array.array):
46            typecode = 123  # application specific typecode
47            try:
48                data = obj.tobytes()
49            except AttributeError:
50                data = obj.tostring()
51            return ExtType(typecode, data)
52        raise TypeError("Unknown type object %r" % (obj,))
53
54    def ext_hook(code, data):
55        print("ext_hook called", code, data)
56        assert code == 123
57        obj = array.array("d")
58        try:
59            obj.frombytes(data)
60        except AttributeError:  # PY2
61            obj.fromstring(data)
62        return obj
63
64    obj = [42, b"hello", array.array("d", [1.1, 2.2, 3.3])]
65    s = msgpack.packb(obj, default=default)
66    obj2 = msgpack.unpackb(s, ext_hook=ext_hook)
67    assert obj == obj2
68
69
70import sys
71
72if sys.version > "3":
73    long = int
74
75
76def test_overriding_hooks():
77    def default(obj):
78        if isinstance(obj, long):
79            return {"__type__": "long", "__data__": str(obj)}
80        else:
81            return obj
82
83    obj = {"testval": long(1823746192837461928374619)}
84    refobj = {"testval": default(obj["testval"])}
85    refout = msgpack.packb(refobj)
86    assert isinstance(refout, (str, bytes))
87    testout = msgpack.packb(obj, default=default)
88
89    assert refout == testout
90