1from __future__ import absolute_import 2 3import platform 4import struct 5 6import pytest 7from kafka.vendor.six.moves import range 8 9from kafka.codec import ( 10 has_snappy, has_gzip, has_lz4, 11 gzip_encode, gzip_decode, 12 snappy_encode, snappy_decode, 13 lz4_encode, lz4_decode, 14 lz4_encode_old_kafka, lz4_decode_old_kafka, 15) 16 17from test.fixtures import random_string 18 19 20def test_gzip(): 21 for i in range(1000): 22 b1 = random_string(100).encode('utf-8') 23 b2 = gzip_decode(gzip_encode(b1)) 24 assert b1 == b2 25 26 27@pytest.mark.skipif(not has_snappy(), reason="Snappy not available") 28def test_snappy(): 29 for i in range(1000): 30 b1 = random_string(100).encode('utf-8') 31 b2 = snappy_decode(snappy_encode(b1)) 32 assert b1 == b2 33 34 35@pytest.mark.skipif(not has_snappy(), reason="Snappy not available") 36def test_snappy_detect_xerial(): 37 import kafka as kafka1 38 _detect_xerial_stream = kafka1.codec._detect_xerial_stream 39 40 header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes' 41 false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01' 42 default_snappy = snappy_encode(b'foobar' * 50) 43 random_snappy = snappy_encode(b'SNAPPY' * 50, xerial_compatible=False) 44 short_data = b'\x01\x02\x03\x04' 45 46 assert _detect_xerial_stream(header) is True 47 assert _detect_xerial_stream(b'') is False 48 assert _detect_xerial_stream(b'\x00') is False 49 assert _detect_xerial_stream(false_header) is False 50 assert _detect_xerial_stream(default_snappy) is True 51 assert _detect_xerial_stream(random_snappy) is False 52 assert _detect_xerial_stream(short_data) is False 53 54 55@pytest.mark.skipif(not has_snappy(), reason="Snappy not available") 56def test_snappy_decode_xerial(): 57 header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' 58 random_snappy = snappy_encode(b'SNAPPY' * 50, xerial_compatible=False) 59 block_len = len(random_snappy) 60 random_snappy2 = snappy_encode(b'XERIAL' * 50, xerial_compatible=False) 61 block_len2 = len(random_snappy2) 62 63 to_test = header \ 64 + struct.pack('!i', block_len) + random_snappy \ 65 + struct.pack('!i', block_len2) + random_snappy2 \ 66 67 assert snappy_decode(to_test) == (b'SNAPPY' * 50) + (b'XERIAL' * 50) 68 69 70@pytest.mark.skipif(not has_snappy(), reason="Snappy not available") 71def test_snappy_encode_xerial(): 72 to_ensure = ( 73 b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' 74 b'\x00\x00\x00\x18' 75 b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' 76 b'\x00\x00\x00\x18' 77 b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' 78 ) 79 80 to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50) 81 82 compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300) 83 assert compressed == to_ensure 84 85 86@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy', 87 reason="python-lz4 crashes on old versions of pypy") 88def test_lz4(): 89 for i in range(1000): 90 b1 = random_string(100).encode('utf-8') 91 b2 = lz4_decode(lz4_encode(b1)) 92 assert len(b1) == len(b2) 93 assert b1 == b2 94 95 96@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy', 97 reason="python-lz4 crashes on old versions of pypy") 98def test_lz4_old(): 99 for i in range(1000): 100 b1 = random_string(100).encode('utf-8') 101 b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1)) 102 assert len(b1) == len(b2) 103 assert b1 == b2 104 105 106@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy', 107 reason="python-lz4 crashes on old versions of pypy") 108def test_lz4_incremental(): 109 for i in range(1000): 110 # lz4 max single block size is 4MB 111 # make sure we test with multiple-blocks 112 b1 = random_string(100).encode('utf-8') * 50000 113 b2 = lz4_decode(lz4_encode(b1)) 114 assert len(b1) == len(b2) 115 assert b1 == b2 116