1#!/usr/bin/env python3
2
3##
4# Licensed to the Apache Software Foundation (ASF) under one
5# or more contributor license agreements.  See the NOTICE file
6# distributed with this work for additional information
7# regarding copyright ownership.  The ASF licenses this file
8# to you under the Apache License, Version 2.0 (the
9# "License"); you may not use this file except in compliance
10# with the License.  You may obtain a copy of the License at
11#
12# https://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS,
16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
19
20"""
21Contains Codecs for Python Avro.
22
23Note that the word "codecs" means "compression/decompression algorithms" in the
24Avro world (https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
so don't confuse it with Python's "codecs" module, which is mainly for
converting between character encodings (https://docs.python.org/3/library/codecs.html).
27"""
28
29import abc
30import binascii
31import io
32import struct
33import zlib
34from typing import Dict, Tuple, Type
35
36import avro.errors
37import avro.io
38
39#
40# Constants
41#
# Pre-compiled struct format for a 4-byte, big-endian unsigned int.
# Used in this module to pack and unpack CRC32 checksums.
STRUCT_CRC32 = struct.Struct(">I")  # big-endian unsigned int
43
44
def _check_crc32(bytes_: bytes, checksum: bytes) -> None:
    """Verify that ``checksum`` is the CRC32 of ``bytes_``.

    :param bytes_: the data whose CRC32 is computed
    :param checksum: the expected CRC32, packed as a big-endian unsigned int

    :raises avro.errors.AvroException: if the computed and expected values differ
    """
    # Mask to 32 bits so the value is compared as an unsigned integer.
    actual = binascii.crc32(bytes_) & 0xFFFFFFFF
    (expected,) = STRUCT_CRC32.unpack(checksum)
    if actual == expected:
        return
    raise avro.errors.AvroException("Checksum failure")
48
49
# Probe for the optional compression libraries.  Each ``has_*`` flag records
# whether the corresponding import succeeded, and gates the definition of the
# matching codec class further down in this module.
try:
    import bz2
except ImportError:
    has_bzip2 = False
else:
    has_bzip2 = True

try:
    import snappy
except ImportError:
    has_snappy = False
else:
    has_snappy = True

try:
    import zstandard as zstd
except ImportError:
    has_zstandard = False
else:
    has_zstandard = True
68
69
class Codec(abc.ABC):
    """Abstract base class for all Avro codec classes.

    Both operations are declared as static methods, so they can be called on
    the codec class itself.
    """

    @staticmethod
    @abc.abstractmethod
    def compress(data: bytes) -> Tuple[bytes, int]:
        """Compress the passed data.

        :param data: a byte string to be compressed
        :type data: bytes

        :rtype: tuple
        :return: compressed data and its length
        """

    @staticmethod
    @abc.abstractmethod
    def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
        """Read compressed data via the passed BinaryDecoder and decompress it.

        :param readers_decoder: a BinaryDecoder object currently being used for
                                reading an object container file
        :type readers_decoder: avro.io.BinaryDecoder

        :rtype: avro.io.BinaryDecoder
        :return: a BinaryDecoder object from which the decompressed data can
                 be read (the compressing codecs in this module wrap the
                 decompressed bytes in an io.BytesIO)
        """
98
99
class NullCodec(Codec):
    """Pass-through codec: data is stored without any compression."""

    @staticmethod
    def compress(data: bytes) -> Tuple[bytes, int]:
        """Return ``data`` unchanged, paired with its length.

        :param data: the bytes to store

        :rtype: tuple
        :return: the same bytes and their length
        """
        size = len(data)
        return data, size

    @staticmethod
    def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
        """Skip one long on the decoder and hand the same decoder back.

        :param readers_decoder: the decoder positioned at the start of a block

        :rtype: avro.io.BinaryDecoder
        :return: the passed decoder itself, not a new one
        """
        # Presumably the skipped long is the block's byte-length prefix, which
        # the other codecs read before the payload; nothing needs decoding here.
        readers_decoder.skip_long()
        return readers_decoder
109
110
class DeflateCodec(Codec):
    """Codec that stores blocks as raw deflate data, without zlib framing."""

    @staticmethod
    def compress(data: bytes) -> Tuple[bytes, int]:
        """Compress ``data`` into a raw deflate stream.

        :param data: the bytes to compress

        :rtype: tuple
        :return: the raw deflate bytes and their length
        """
        # A negative wbits makes zlib emit a raw deflate stream, i.e. without
        # the 2-byte zlib header and the 4-byte Adler-32 trailer.  Slicing the
        # output of zlib.compress() with [2:-1] removes the header but only one
        # of the four trailer bytes, which leaves three stray checksum bytes
        # after the deflate stream.
        compressor = zlib.compressobj(wbits=-15)
        compressed_data = compressor.compress(data) + compressor.flush()
        return compressed_data, len(compressed_data)

    @staticmethod
    def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
        """Read one deflate-compressed block and return a decoder over its contents.

        :param readers_decoder: the decoder positioned at the start of a block

        :rtype: avro.io.BinaryDecoder
        :return: a new BinaryDecoder wrapping the decompressed bytes in an io.BytesIO
        """
        # Compressed data is stored as (length, data), which
        # corresponds to how the "bytes" type is encoded.
        data = readers_decoder.read_bytes()
        # -15 is the log of the window size; negative indicates
        # "raw" (no zlib headers) decompression.  See zlib.h.
        uncompressed = zlib.decompress(data, -15)
        return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
128
129
if has_bzip2:

    class BZip2Codec(Codec):
        """Codec backed by the ``bz2`` module; defined only when it can be imported."""

        @staticmethod
        def compress(data: bytes) -> Tuple[bytes, int]:
            """Compress ``data`` with bzip2.

            :param data: the bytes to compress

            :rtype: tuple
            :return: the compressed bytes and their length
            """
            packed = bz2.compress(data)
            return packed, len(packed)

        @staticmethod
        def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
            """Read one bzip2-compressed block and return a decoder over its contents.

            :param readers_decoder: the decoder positioned at the start of a block

            :rtype: avro.io.BinaryDecoder
            :return: a new BinaryDecoder wrapping the decompressed bytes in an io.BytesIO
            """
            # The block is a long giving the payload size, followed by the payload.
            block_size = readers_decoder.read_long()
            raw_block = readers_decoder.read(block_size)
            plain = bz2.decompress(raw_block)
            return avro.io.BinaryDecoder(io.BytesIO(plain))
144
145
if has_snappy:

    class SnappyCodec(Codec):
        """Codec backed by ``snappy``; each block carries a trailing CRC32 checksum."""

        @staticmethod
        def compress(data: bytes) -> Tuple[bytes, int]:
            """Compress ``data`` with snappy and append the CRC32 of the input.

            :param data: the bytes to compress

            :rtype: tuple
            :return: the compressed bytes followed by a 4-byte checksum, and
                     the total length of that byte string
            """
            body = snappy.compress(data)
            # The checksum is a 4-byte, big-endian CRC32 of the *uncompressed* data.
            trailer = STRUCT_CRC32.pack(binascii.crc32(data) & 0xFFFFFFFF)
            framed = body + trailer
            return framed, len(framed)

        @staticmethod
        def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
            """Read one snappy-compressed block, verify its checksum and return a decoder.

            :param readers_decoder: the decoder positioned at the start of a block

            :rtype: avro.io.BinaryDecoder
            :return: a new BinaryDecoder wrapping the decompressed bytes in an io.BytesIO
            :raises avro.errors.AvroException: if the stored CRC32 does not
                                               match the decompressed data
            """
            # The stored length covers the payload plus the 4-byte CRC32 checksum.
            framed_size = readers_decoder.read_long()
            payload = readers_decoder.read(framed_size - 4)
            uncompressed = snappy.decompress(payload)
            stored_crc = readers_decoder.read(4)
            _check_crc32(uncompressed, stored_crc)
            return avro.io.BinaryDecoder(io.BytesIO(uncompressed))
165
166
if has_zstandard:

    class ZstandardCodec(Codec):
        """Codec backed by ``zstandard``; defined only when it can be imported."""

        @staticmethod
        def compress(data: bytes) -> Tuple[bytes, int]:
            """Compress ``data`` with a default-configured ZstdCompressor.

            :param data: the bytes to compress

            :rtype: tuple
            :return: the compressed bytes and their length
            """
            compressor = zstd.ZstdCompressor()
            packed = compressor.compress(data)
            return packed, len(packed)

        @staticmethod
        def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
            """Read one zstandard-compressed block and return a decoder over its contents.

            :param readers_decoder: the decoder positioned at the start of a block

            :rtype: avro.io.BinaryDecoder
            :return: a new BinaryDecoder wrapping the decompressed bytes in an io.BytesIO
            """
            # The block is a long giving the payload size, followed by the payload.
            block_size = readers_decoder.read_long()
            source = io.BytesIO(readers_decoder.read(block_size))
            plain = bytearray()
            # Decompress through a stream reader, 16 KiB at a time, until a
            # read comes back empty.
            with zstd.ZstdDecompressor().stream_reader(source) as reader:
                chunk = reader.read(16384)
                while chunk:
                    plain.extend(chunk)
                    chunk = reader.read(16384)
            return avro.io.BinaryDecoder(io.BytesIO(plain))
188
189
# Registry of the codec classes defined in this module, keyed by the class
# name without its "Codec" suffix, lower-cased (e.g. NullCodec -> "null").
# This must stay a comprehension: its loop variables are not bound as module
# globals, so globals() does not change while it is being iterated.
KNOWN_CODECS: Dict[str, Type[Codec]] = {
    identifier[: -len("Codec")].lower(): candidate
    for identifier, candidate in globals().items()
    if identifier.endswith("Codec") and isinstance(candidate, type) and issubclass(candidate, Codec) and candidate != Codec
}
195
196
def get_codec(codec_name: str) -> Type[Codec]:
    """Return the codec class registered under ``codec_name``.

    :param codec_name: a key of KNOWN_CODECS, i.e. the lower-cased codec
                       class name without its "Codec" suffix (e.g. "deflate")
    :type codec_name: str

    :rtype: type
    :return: the matching Codec subclass (the class itself, not an instance)
    :raises avro.errors.UnsupportedCodec: if no codec is registered under
                                          that name, e.g. because its optional
                                          library could not be imported
    """
    try:
        return KNOWN_CODECS[codec_name]
    except KeyError:
        # The KeyError adds nothing beyond the codec name already in the
        # message, so suppress it rather than show a chained traceback.
        raise avro.errors.UnsupportedCodec(f"Unsupported codec: {codec_name}. (Is it installed?)") from None
202