1from __future__ import absolute_import
2
3import platform
4import struct
5
6import pytest
7from kafka.vendor.six.moves import range
8
9from kafka.codec import (
10    has_snappy, has_gzip, has_lz4,
11    gzip_encode, gzip_decode,
12    snappy_encode, snappy_decode,
13    lz4_encode, lz4_decode,
14    lz4_encode_old_kafka, lz4_decode_old_kafka,
15)
16
17from test.fixtures import random_string
18
19
def test_gzip():
    """Round-trip 1000 random 100-character payloads through gzip_encode/gzip_decode."""
    for _ in range(1000):
        payload = random_string(100).encode('utf-8')
        assert payload == gzip_decode(gzip_encode(payload))
25
26
@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy():
    """Round-trip 1000 random 100-character payloads through snappy_encode/snappy_decode."""
    for _ in range(1000):
        original = random_string(100).encode('utf-8')
        restored = snappy_decode(snappy_encode(original))
        assert original == restored
33
34
@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy_detect_xerial():
    """Check that ``_detect_xerial_stream`` recognises only xerial-framed input.

    A payload starting with the xerial magic plus two 4-byte ints, or the
    default output of ``snappy_encode``, must be detected. Empty, truncated,
    wrong-magic, and raw (``xerial_compatible=False``) snappy payloads must not
    be detected.
    """
    # Import the private helper directly. Reaching it as ``kafka.codec`` via
    # the top-level package only works if the submodule has already been
    # imported somewhere else.
    from kafka.codec import _detect_xerial_stream

    # Valid xerial header followed by arbitrary trailing bytes.
    header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
    # Wrong leading magic byte (\x01 instead of \x82).
    false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
    # snappy_encode's default output is expected to be xerial-framed.
    default_snappy = snappy_encode(b'foobar' * 50)
    # Raw (non-xerial) snappy whose plaintext happens to contain "SNAPPY".
    raw_snappy = snappy_encode(b'SNAPPY' * 50, xerial_compatible=False)
    # Shorter than any xerial header.
    short_data = b'\x01\x02\x03\x04'

    assert _detect_xerial_stream(header) is True
    assert _detect_xerial_stream(b'') is False
    assert _detect_xerial_stream(b'\x00') is False
    assert _detect_xerial_stream(false_header) is False
    assert _detect_xerial_stream(default_snappy) is True
    assert _detect_xerial_stream(raw_snappy) is False
    assert _detect_xerial_stream(short_data) is False
53
54
@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy_decode_xerial():
    """Decode a hand-built xerial stream made of two raw snappy blocks.

    After the 16-byte xerial header, each block is prefixed with its length
    packed as ``struct`` format ``'!i'`` (network byte order, signed 32-bit).
    The decoded result must be the concatenation of both plaintexts.
    """
    header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
    first_block = snappy_encode(b'SNAPPY' * 50, xerial_compatible=False)
    second_block = snappy_encode(b'XERIAL' * 50, xerial_compatible=False)

    # Use parentheses rather than backslash continuations. A trailing "\"
    # would silently join whatever line follows it.
    to_test = (
        header
        + struct.pack('!i', len(first_block)) + first_block
        + struct.pack('!i', len(second_block)) + second_block
    )

    assert snappy_decode(to_test) == (b'SNAPPY' * 50) + (b'XERIAL' * 50)
68
69
@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy_encode_xerial():
    """Xerial-encode 600 bytes in 300-byte blocks and compare to a golden stream.

    The expected output is the 16-byte xerial header followed by two
    length-prefixed blocks. Each prefix is 0x18 (24), the size of the raw
    snappy block that follows it.
    """
    plaintext = (b'SNAPPY' * 50) + (b'XERIAL' * 50)

    expected = b''.join([
        b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01',
        b'\x00\x00\x00\x18',
        b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00',
        b'\x00\x00\x00\x18',
        b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00',
    ])

    assert snappy_encode(plaintext, xerial_compatible=True, xerial_blocksize=300) == expected
84
85
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
                    reason="python-lz4 crashes on old versions of pypy")
def test_lz4():
    """Round-trip 1000 random 100-character payloads through lz4_encode/lz4_decode."""
    for _ in range(1000):
        data = random_string(100).encode('utf-8')
        decoded = lz4_decode(lz4_encode(data))
        # Compare lengths first so a size mismatch fails with a short message.
        assert len(data) == len(decoded)
        assert data == decoded
94
95
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
                    reason="python-lz4 crashes on old versions of pypy")
def test_lz4_old():
    """Round-trip 1000 random payloads through the old-kafka lz4 encode/decode pair."""
    for _ in range(1000):
        data = random_string(100).encode('utf-8')
        decoded = lz4_decode_old_kafka(lz4_encode_old_kafka(data))
        # Compare lengths first so a size mismatch fails with a short message.
        assert len(data) == len(decoded)
        assert data == decoded
104
105
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
                    reason="python-lz4 crashes on old versions of pypy")
def test_lz4_incremental():
    """Round-trip payloads large enough to span multiple lz4 blocks.

    Each payload is a random 100-character string repeated 50000 times
    (5,000,000 bytes). That exceeds the 4 MB maximum single lz4 block size,
    so encoding and decoding must handle more than one block.
    """
    # Every iteration already exercises the multi-block path, and each one
    # compresses and decompresses ~5 MB. A small number of repetitions is
    # enough; 1000 iterations meant ~5 GB of codec work per test run.
    for _ in range(10):
        b1 = random_string(100).encode('utf-8') * 50000
        b2 = lz4_decode(lz4_encode(b1))
        # Compare lengths first so a size mismatch fails without diffing 5 MB.
        assert len(b1) == len(b2)
        assert b1 == b2
116