#!/usr/bin/env python3

##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Contains Codecs for Python Avro.

Note that the word "codecs" means "compression/decompression algorithms" in the
Avro world (https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
so don't confuse it with Python's standard "codecs" module, which is mainly for
converting between character encodings (https://docs.python.org/3/library/codecs.html).
"""

import abc
import binascii
import io
import struct
import zlib
from typing import Dict, Tuple, Type

import avro.errors
import avro.io

#
# Constants
#
STRUCT_CRC32 = struct.Struct(">I")  # big-endian unsigned int

# Size in bytes of the CRC32 checksum that trails every snappy block.
_CRC32_SIZE = STRUCT_CRC32.size


def _check_crc32(bytes_: bytes, checksum: bytes) -> None:
    """Raise AvroException unless ``checksum`` is the big-endian CRC32 of ``bytes_``."""
    # binascii.crc32 may be signed on very old Pythons; mask to 32 unsigned bits.
    if binascii.crc32(bytes_) & 0xFFFFFFFF != STRUCT_CRC32.unpack(checksum)[0]:
        raise avro.errors.AvroException("Checksum failure")


# Optional compression backends: each codec class below is only defined
# when its backing library is importable.
try:
    import bz2

    has_bzip2 = True
except ImportError:
    has_bzip2 = False
try:
    import snappy

    has_snappy = True
except ImportError:
    has_snappy = False
try:
    import zstandard as zstd

    has_zstandard = True
except ImportError:
    has_zstandard = False


class Codec(abc.ABC):
    """Abstract base class for all Avro codec classes."""

    @staticmethod
    @abc.abstractmethod
    def compress(data: bytes) -> Tuple[bytes, int]:
        """Compress the passed data.

        :param data: a byte string to be compressed
        :type data: bytes

        :rtype: tuple
        :return: compressed data and its length
        """

    @staticmethod
    @abc.abstractmethod
    def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
        """Read compressed data via the passed BinaryDecoder and decompress it.

        :param readers_decoder: a BinaryDecoder object currently being used for
                                reading an object container file
        :type readers_decoder: avro.io.BinaryDecoder

        :rtype: avro.io.BinaryDecoder
        :return: a newly instantiated BinaryDecoder object that contains the
                 decompressed data which is wrapped by an io.BytesIO
        """


class NullCodec(Codec):
    """The "null" codec: blocks are stored uncompressed."""

    @staticmethod
    def compress(data: bytes) -> Tuple[bytes, int]:
        return data, len(data)

    @staticmethod
    def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
        # Skip the block's length prefix; the uncompressed data that follows
        # is then read directly from the caller's decoder.
        readers_decoder.skip_long()
        return readers_decoder


class DeflateCodec(Codec):
    """The "deflate" codec: raw DEFLATE data with no zlib header or trailer."""

    @staticmethod
    def compress(data: bytes) -> Tuple[bytes, int]:
        # A negative wbits (-15 = log2 of the 32 KiB window) makes zlib emit a
        # raw deflate stream. Slicing zlib.compress() output is error-prone:
        # the zlib wrapper is a 2-byte header plus a 4-byte Adler-32 trailer,
        # so trimming only the last byte leaves 3 checksum bytes behind.
        compressor = zlib.compressobj(wbits=-15)
        compressed_data = compressor.compress(data) + compressor.flush()
        return compressed_data, len(compressed_data)

    @staticmethod
    def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
        # Compressed data is stored as (length, data), which
        # corresponds to how the "bytes" type is encoded.
        data = readers_decoder.read_bytes()
        # -15 is the log of the window size; negative indicates
        # "raw" (no zlib headers) decompression. See zlib.h.
        uncompressed = zlib.decompress(data, -15)
        return avro.io.BinaryDecoder(io.BytesIO(uncompressed))


if has_bzip2:

    class BZip2Codec(Codec):
        """The "bzip2" codec, backed by the standard-library bz2 module."""

        @staticmethod
        def compress(data: bytes) -> Tuple[bytes, int]:
            compressed_data = bz2.compress(data)
            return compressed_data, len(compressed_data)

        @staticmethod
        def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
            # Stored as (length, data), i.e. the Avro "bytes" encoding.
            data = readers_decoder.read_bytes()
            uncompressed = bz2.decompress(data)
            return avro.io.BinaryDecoder(io.BytesIO(uncompressed))


if has_snappy:

    class SnappyCodec(Codec):
        """The "snappy" codec: snappy data followed by a CRC32 of the original bytes."""

        @staticmethod
        def compress(data: bytes) -> Tuple[bytes, int]:
            compressed_data = snappy.compress(data)
            # A 4-byte, big-endian CRC32 checksum of the *uncompressed* data
            compressed_data += STRUCT_CRC32.pack(binascii.crc32(data) & 0xFFFFFFFF)
            return compressed_data, len(compressed_data)

        @staticmethod
        def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
            # The stored length covers the compressed data plus the trailing
            # 4-byte CRC32 checksum, so it can never legitimately be shorter
            # than the checksum itself.
            length = readers_decoder.read_long()
            if length < _CRC32_SIZE:
                raise avro.errors.AvroException(
                    f"Snappy block length {length} is too short to hold a {_CRC32_SIZE}-byte checksum"
                )
            data = readers_decoder.read(length - _CRC32_SIZE)
            uncompressed = snappy.decompress(data)
            checksum = readers_decoder.read(_CRC32_SIZE)
            _check_crc32(uncompressed, checksum)
            return avro.io.BinaryDecoder(io.BytesIO(uncompressed))


if has_zstandard:

    class ZstandardCodec(Codec):
        """The "zstandard" codec, backed by the zstandard package."""

        @staticmethod
        def compress(data: bytes) -> Tuple[bytes, int]:
            compressed_data = zstd.ZstdCompressor().compress(data)
            return compressed_data, len(compressed_data)

        @staticmethod
        def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
            # Stored as (length, data), i.e. the Avro "bytes" encoding.
            data = readers_decoder.read_bytes()
            uncompressed = bytearray()
            dctx = zstd.ZstdDecompressor()
            # Decompress through a stream reader in fixed-size chunks rather
            # than a single one-shot decompress call.
            with dctx.stream_reader(io.BytesIO(data)) as reader:
                while True:
                    chunk = reader.read(16384)
                    if not chunk:
                        break
                    uncompressed.extend(chunk)
            return avro.io.BinaryDecoder(io.BytesIO(uncompressed))


# Registry of the codecs defined above (only those whose backends imported),
# keyed by their Avro codec name: the class name minus "Codec", lowercased,
# e.g. DeflateCodec -> "deflate".
KNOWN_CODECS: Dict[str, Type[Codec]] = {
    name[: -len("Codec")].lower(): class_
    for name, class_ in globals().items()
    if isinstance(class_, type) and issubclass(class_, Codec) and class_ is not Codec and name.endswith("Codec")
}


def get_codec(codec_name: str) -> Type[Codec]:
    """Return the codec class registered under ``codec_name``.

    :param codec_name: an Avro codec name such as "null" or "deflate"
    :raises avro.errors.UnsupportedCodec: if no such codec is available,
        either because the name is unknown or its backend is not installed
    """
    try:
        return KNOWN_CODECS[codec_name]
    except KeyError:
        # The KeyError adds nothing beyond the message below.
        raise avro.errors.UnsupportedCodec(f"Unsupported codec: {codec_name}. (Is it installed?)") from None