1#!/usr/bin/python -u
2#
# py7zr library
4#
5# Copyright (c) 2019 Hiroshi Miura <miurahr@linux.com>
6# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
7# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
8# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
9#
10# This library is free software; you can redistribute it and/or
11# modify it under the terms of the GNU Lesser General Public
12# License as published by the Free Software Foundation; either
13# version 2.1 of the License, or (at your option) any later version.
14#
15# This library is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18# Lesser General Public License for more details.
19#
20# You should have received a copy of the GNU Lesser General Public
21# License along with this library; if not, write to the Free Software
22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23#
24import bz2
25import io
26import lzma
27import os
28import queue
29import sys
30import threading
31from typing import IO, Any, BinaryIO, Dict, List, Optional, Union
32
33from py7zr import UnsupportedCompressionMethodError
34from py7zr.extra import CopyDecompressor, DeflateDecompressor, ISevenZipDecompressor, ZstdDecompressor
35from py7zr.helpers import MemIO, NullIO, calculate_crc32, readlink
36from py7zr.properties import READ_BLOCKSIZE, ArchivePassword, CompressionMethod
37
38if sys.version_info < (3, 6):
39    import pathlib2 as pathlib
40else:
41    import pathlib
42try:
43    import zstandard as Zstd  # type: ignore
44except ImportError:
45    Zstd = None
46
47
class Worker:
    """Worker that extracts or archives the files described by a 7z header.

    Output targets are registered per file id with :meth:`register_filelike`;
    files without a registered target are still decompressed (into a
    ``NullIO`` sink) so that their data is consumed from the folder stream.
    """

    def __init__(self, files, src_start: int, header) -> None:
        # Maps file id -> registered output target (see register_filelike).
        self.target_filepath = {}  # type: Dict[int, Union[MemIO, pathlib.Path, None]]
        self.files = files
        self.src_start = src_start
        self.header = header

    def extract(self, fp: BinaryIO, parallel: bool, q=None) -> None:
        """Extract every file of the archive, folder by folder.

        :parameter fp: archive source file object.
        :parameter parallel: when True and the archive has several folders,
            each folder is extracted in its own thread; every thread re-opens
            the archive through ``fp.name``.
        :parameter q: optional queue that receives ``('s', name, compressed)``
            and ``('e', name, size)`` progress tuples for each file.
        """
        main_streams = getattr(self.header, 'main_streams', None)
        if main_streams is None:
            # No packed streams at all: only empty-stream entries can exist.
            empty_files = [f for f in self.files if f.emptystream]
            self.extract_single(fp, empty_files, 0, 0, q)
            return

        positions = main_streams.packinfo.packpositions
        src_end = self.src_start + positions[-1]
        numfolders = main_streams.unpackinfo.numfolders
        if numfolders == 1:
            self.extract_single(fp, self.files, self.src_start, src_end, q)
            return

        folders = main_streams.unpackinfo.folders
        empty_files = [f for f in self.files if f.emptystream]
        filename = getattr(fp, 'name', None)
        if not parallel or not isinstance(filename, str):
            # Sequential path. It is also the fallback when the source cannot
            # be re-opened by path (e.g. an in-memory stream without a usable
            # ``name``); previously that case failed inside ``open()``.
            self.extract_single(fp, empty_files, 0, 0, q)
            for i in range(numfolders):
                self.extract_single(fp, folders[i].files, self.src_start + positions[i],
                                    self.src_start + positions[i + 1], q)
            return

        # Passing the path (not an open file) lets extract_single own and
        # close the handle it opens.
        self.extract_single(filename, empty_files, 0, 0, q)
        # NOTE(review): an exception raised inside a worker thread is not
        # re-raised by join(); confirm whether callers need it surfaced.
        extract_threads = []
        for i in range(numfolders):
            p = threading.Thread(target=self.extract_single,
                                 args=(filename, folders[i].files,
                                       self.src_start + positions[i], self.src_start + positions[i + 1], q))
            p.start()
            extract_threads.append(p)
        for p in extract_threads:
            p.join()

    def extract_single(self, fp: Union[BinaryIO, str], files, src_start: int, src_end: int,
                       q: Optional[queue.Queue]) -> None:
        """Extract the files of a single 7zip folder in the calling thread.

        :parameter fp: archive source, either an open binary file object or a
            path string. A path is opened here and closed again before
            returning; a file object passed in is left open for the caller.
        :parameter files: files stored in the folder; ``None`` means nothing to do.
        :parameter src_start: offset in the archive where the folder data starts.
        :parameter src_end: offset in the archive where the folder data ends.
        :parameter q: optional progress queue, see :meth:`extract`.
        """
        if files is None:
            return
        if isinstance(fp, str):
            with open(fp, 'rb') as src:
                self._extract_files(src, files, src_start, src_end, q)
        else:
            self._extract_files(fp, files, src_start, src_end, q)

    def _extract_files(self, fp: BinaryIO, files, src_start: int, src_end: int,
                       q: Optional[queue.Queue]) -> None:
        """Decompress ``files`` in order from ``fp``, starting at ``src_start``."""
        fp.seek(src_start)
        for f in files:
            if q is not None:
                q.put(('s', str(f.filename), str(f.compressed) if f.compressed is not None else '0'))
            fileish = self.target_filepath.get(f.id, None)
            if fileish is not None:
                fileish.parent.mkdir(parents=True, exist_ok=True)
                with fileish.open(mode='wb') as ofp:
                    # For an empty stream, opening in 'wb' mode is enough to
                    # create the empty output.
                    if not f.emptystream:
                        self.decompress(fp, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end)
                        ofp.seek(0)
            elif not f.emptystream:
                # No target registered: the data is still decompressed so the
                # stream advances and the CRC is checked, but output is discarded.
                with NullIO() as ofp:
                    self.decompress(fp, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end)
            if q is not None:
                q.put(('e', str(f.filename), str(f.uncompressed[-1])))

    def decompress(self, fp: BinaryIO, folder, fq: IO[Any],
                   size: int, compressed_size: Optional[int], src_end: int) -> None:
        """Decompress one file from the folder stream into ``fq``.

           :parameter fp: archive source file pointer
           :parameter folder: Folder object that provides the decompressor object.
           :parameter fq: writable output file object
           :parameter size: uncompressed size of target file.
           :parameter compressed_size: compressed size of target file.
           :parameter src_end: end position of the folder
           :returns None
           :raises Exception: when the input is exhausted and the decompressor
               produces no further output although data is still expected.
        """
        assert folder is not None
        out_remaining = size
        decompressor = folder.get_decompressor(compressed_size)
        while out_remaining > 0:
            max_length = min(out_remaining, io.DEFAULT_BUFFER_SIZE)
            rest_size = src_end - fp.tell()
            read_size = min(READ_BLOCKSIZE, rest_size)
            if read_size == 0:
                # Folder input is exhausted; drain what the decompressor still buffers.
                tmp = decompressor.decompress(b'', max_length)
                if len(tmp) == 0:
                    raise Exception("decompression get wrong: no output data.")
            else:
                inp = fp.read(read_size)
                tmp = decompressor.decompress(inp, max_length)
            if len(tmp) > 0 and out_remaining >= len(tmp):
                out_remaining -= len(tmp)
                fq.write(tmp)
        # The CRC is only compared once the whole folder has been read.
        # NOTE(review): a mismatch is only printed, not raised.
        if fp.tell() >= src_end:
            if decompressor.crc is not None and not decompressor.check_crc():
                print('\nCRC error! expected: {}, real: {}'.format(decompressor.crc, decompressor.digest))

    def _find_link_target(self, target):
        """Find the target member of a symlink or hardlink member in the archive.

        Returns the link target relative to the directory of ``target`` when
        the target is itself one of the files of this worker, otherwise the
        normalized link target as read from the filesystem.
        """
        targetname = target.as_posix()  # type: str
        linkname = readlink(targetname)
        # Check windows full path symlinks
        if linkname.startswith("\\\\?\\"):
            linkname = linkname[4:]
        # normalize as posix style
        linkname = pathlib.Path(linkname).as_posix()  # type: str
        for f in self.files:
            if linkname == f.origin.as_posix():
                # FIXME: when API user specify arcname, it will break
                return os.path.relpath(linkname, os.path.dirname(targetname))
        return linkname

    def archive(self, fp: BinaryIO, folder, deref=False):
        """Run archive task for specified 7zip folder.

        Compresses every file of the worker into ``fp`` as one packed stream
        and records digests, sizes and per-file properties in the header.

        :parameter fp: output archive file object.
        :parameter folder: Folder object that provides the compressor and
            receives the total unpacked size.
        :parameter deref: when False, a symlink is stored as its link target
            text instead of the content it points to.
        """
        compressor = folder.get_compressor()
        main_streams = self.header.main_streams
        substreams = main_streams.substreamsinfo
        files_info = self.header.files_info
        main_streams.packinfo.numstreams = 1
        substreams.digests = []
        substreams.digestsdefined = []
        outsize = 0
        num_unpack_streams = 0
        last_file_index = 0
        # Compressed bytes already attributed to the last file that carried
        # data. Kept separately so that trailing empty-stream entries cannot
        # reset it before the final flush is accounted for.
        last_foutsize = 0
        for i, f in enumerate(self.files):
            file_info = f.file_properties()
            files_info.files.append(file_info)
            files_info.emptyfiles.append(f.emptystream)
            foutsize = 0
            if f.is_symlink and not deref:
                last_file_index = i
                num_unpack_streams += 1
                link_target = self._find_link_target(f.origin)  # type: str
                tgt = link_target.encode('utf-8')  # type: bytes
                insize = len(tgt)
                crc = calculate_crc32(tgt, 0)  # type: int
                out = compressor.compress(tgt)
                outsize += len(out)
                foutsize += len(out)
                fp.write(out)
                substreams.digests.append(crc)
                substreams.digestsdefined.append(True)
                substreams.unpacksizes.append(insize)
                files_info.files[i]['maxsize'] = foutsize
                last_foutsize = foutsize
            elif not f.emptystream:
                last_file_index = i
                num_unpack_streams += 1
                insize = 0
                crc = 0
                with f.origin.open(mode='rb') as fd:
                    data = fd.read(READ_BLOCKSIZE)
                    while data:
                        insize += len(data)
                        crc = calculate_crc32(data, crc)
                        out = compressor.compress(data)
                        outsize += len(out)
                        foutsize += len(out)
                        fp.write(out)
                        data = fd.read(READ_BLOCKSIZE)
                    substreams.digests.append(crc)
                    substreams.digestsdefined.append(True)
                    files_info.files[i]['maxsize'] = foutsize
                substreams.unpacksizes.append(insize)
                last_foutsize = foutsize
        # The bytes produced by the final flush are attributed to the last
        # file that carried data, on top of what was already counted for it.
        out = compressor.flush()
        outsize += len(out)
        fp.write(out)
        if len(self.files) > 0:
            files_info.files[last_file_index]['maxsize'] = last_foutsize + len(out)
        # Update size data in header
        main_streams.packinfo.packsizes = [outsize]
        folder.unpacksizes = [sum(substreams.unpacksizes)]
        substreams.num_unpackstreams_folders = [num_unpack_streams]

    # The annotation is quoted so it is not evaluated when the class is defined.
    def register_filelike(self, id: int, fileish: 'Union[MemIO, pathlib.Path, None]') -> None:
        """Register the output target (file-ish object or None) for file ``id``."""
        self.target_filepath[id] = fileish
238
239
class SevenZipDecompressor:
    """Decompressor configured for the coder chain of one 7zip folder.

    A folder whose coders are all known to the ``lzma`` module is handled by a
    raw ``lzma.LZMADecompressor``; otherwise the first coder selects one of the
    alternative decompressors (bzip2, deflate, copy, zstandard).
    """

    lzma_methods_map = {
        CompressionMethod.LZMA: lzma.FILTER_LZMA1,
        CompressionMethod.LZMA2: lzma.FILTER_LZMA2,
        CompressionMethod.DELTA: lzma.FILTER_DELTA,
        CompressionMethod.P7Z_BCJ: lzma.FILTER_X86,
        CompressionMethod.BCJ_ARM: lzma.FILTER_ARM,
        CompressionMethod.BCJ_ARMT: lzma.FILTER_ARMTHUMB,
        CompressionMethod.BCJ_IA64: lzma.FILTER_IA64,
        CompressionMethod.BCJ_PPC: lzma.FILTER_POWERPC,
        CompressionMethod.BCJ_SPARC: lzma.FILTER_SPARC,
    }

    # Local ids for the methods that are not handled by the lzma module.
    FILTER_BZIP2 = 0x31
    FILTER_ZIP = 0x32
    FILTER_COPY = 0x33
    FILTER_AES = 0x34
    FILTER_ZSTD = 0x35
    alt_methods_map = {
        CompressionMethod.MISC_BZIP2: FILTER_BZIP2,
        CompressionMethod.MISC_DEFLATE: FILTER_ZIP,
        CompressionMethod.COPY: FILTER_COPY,
        CompressionMethod.CRYPT_AES256_SHA256: FILTER_AES,
        CompressionMethod.MISC_ZSTD: FILTER_ZSTD,
    }

    def __init__(self, coders: List[Dict[str, Any]], size: int, crc: Optional[int]) -> None:
        """Pick and build the underlying decompressor for ``coders``.

        :parameter coders: coder descriptions of the folder.
        :parameter size: stored as ``input_size``.
        :parameter crc: expected CRC of the decompressed data, or None to skip
            digest calculation.
        """
        self.input_size = size
        self.consumed = 0  # type: int
        self.crc = crc
        self.digest = None  # type: Optional[int]
        if self._check_lzma_coders(coders):
            configure = self._set_lzma_decompressor
        else:
            configure = self._set_alternative_decompressor
        configure(coders)

    def _check_lzma_coders(self, coders: List[Dict[str, Any]]) -> bool:
        """Return True when every coder maps to an lzma filter id."""
        return all(self.lzma_methods_map.get(coder['method'], None) is not None for coder in coders)

    def _set_lzma_decompressor(self, coders: List[Dict[str, Any]]) -> None:
        """Build a raw LZMA decompressor whose filter chain is the coders in reverse order."""
        chain = []  # type: List[Dict[str, Any]]
        for coder in coders:
            if not (coder['numinstreams'] == 1 and coder['numoutstreams'] == 1):
                raise UnsupportedCompressionMethodError('Only a simple compression method is currently supported.')
            filter_id = self.lzma_methods_map.get(coder['method'], None)
            if filter_id is None:
                raise UnsupportedCompressionMethodError
            properties = coder.get('properties', None)
            if properties is None:
                spec = {'id': filter_id}
            else:
                spec = lzma._decode_filter_properties(filter_id, properties)  # type: ignore
            # Each coder is placed in front of the ones seen so far.
            chain.insert(0, spec)
        self.decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=chain)  # type: Union[bz2.BZ2Decompressor, lzma.LZMADecompressor, ISevenZipDecompressor]  # noqa

    def _set_alternative_decompressor(self, coders: List[Dict[str, Any]]) -> None:
        """Select a non-lzma decompressor based on the first coder only."""
        filter_id = self.alt_methods_map.get(coders[0]['method'], None)
        factories = {
            self.FILTER_BZIP2: bz2.BZ2Decompressor,
            self.FILTER_ZIP: DeflateDecompressor,
            self.FILTER_COPY: CopyDecompressor,
        }
        if Zstd:
            # Only offered when the optional zstandard module was importable.
            factories[self.FILTER_ZSTD] = ZstdDecompressor
        factory = factories.get(filter_id)
        if factory is None:
            raise UnsupportedCompressionMethodError
        self.decompressor = factory()

    def decompress(self, data: bytes, max_length: Optional[int] = None) -> bytes:
        """Decompress ``data`` and fold the output into the running digest.

        ``max_length`` is forwarded to the underlying decompressor only when
        it is given.
        """
        self.consumed += len(data)
        if max_length is None:
            chunk = self.decompressor.decompress(data)
        else:
            chunk = self.decompressor.decompress(data, max_length=max_length)
        # The digest is calculated over the uncompressed data.
        if self.crc is not None:
            self.digest = calculate_crc32(chunk, self.digest)
        return chunk

    def check_crc(self):
        """Return True when the running digest equals the expected CRC."""
        return self.crc == self.digest
329
330
class SevenZipCompressor:
    """Compressor configured for one 7zip folder.

    Wraps a raw ``lzma.LZMACompressor`` and exposes, in ``coders``, the coder
    descriptions that correspond to the filter chain in use.
    """

    __slots__ = ['filters', 'compressor', 'coders']

    # Reverse lookup: lzma filter id -> 7zip compression method.
    lzma_methods_map_r = {
        lzma.FILTER_LZMA2: CompressionMethod.LZMA2,
        lzma.FILTER_DELTA: CompressionMethod.DELTA,
        lzma.FILTER_X86: CompressionMethod.P7Z_BCJ,
    }

    def __init__(self, filters=None):
        """Create the compressor; without ``filters`` LZMA2 with preset 7 (extreme) is used."""
        if filters is not None:
            self.filters = filters
        else:
            self.filters = [{"id": lzma.FILTER_LZMA2, "preset": 7 | lzma.PRESET_EXTREME}, ]
        self.compressor = lzma.LZMACompressor(format=lzma.FORMAT_RAW, filters=self.filters)
        self.coders = []
        for spec in self.filters:
            # A None entry ends the list of filters to describe.
            if spec is None:
                break
            self.coders.append({
                'method': self.lzma_methods_map_r[spec['id']],
                'properties': lzma._encode_filter_properties(spec),
                'numinstreams': 1,
                'numoutstreams': 1,
            })

    def compress(self, data):
        """Feed ``data`` to the compressor and return the bytes it emits."""
        return self.compressor.compress(data)

    def flush(self):
        """Finish compression and return the remaining output bytes."""
        return self.compressor.flush()
362
363
def get_methods_names(coders: List[dict]) -> List[str]:
    """Return human readable method names for specified coders.

    :parameter coders: coder descriptions; each must provide a ``'method'`` entry.
    :returns: one display name per coder, in the same order.
    :raises UnsupportedCompressionMethodError: when a coder uses a method that
        has no display name.
    """
    methods_name_map = {
        CompressionMethod.LZMA2: "LZMA2",
        CompressionMethod.LZMA: "LZMA",
        CompressionMethod.DELTA: "delta",
        CompressionMethod.P7Z_BCJ: "BCJ",
        CompressionMethod.BCJ_ARM: "BCJ(ARM)",
        CompressionMethod.BCJ_ARMT: "BCJ(ARMT)",
        CompressionMethod.BCJ_IA64: "BCJ(IA64)",
        CompressionMethod.BCJ_PPC: "BCJ(POWERPC)",
        CompressionMethod.BCJ_SPARC: "BCJ(SPARC)",
        CompressionMethod.CRYPT_AES256_SHA256: "7zAES",
        # Methods handled by the alternative decompressors in this module;
        # without these entries, naming the methods of such an archive raised
        # although the archive itself can be decompressed.
        CompressionMethod.MISC_BZIP2: "BZip2",
        CompressionMethod.MISC_DEFLATE: "DEFLATE",
        CompressionMethod.COPY: "COPY",
        CompressionMethod.MISC_ZSTD: "ZStandard",
    }
    methods_names = []  # type: List[str]
    for coder in coders:
        try:
            methods_names.append(methods_name_map[coder['method']])
        except KeyError:
            raise UnsupportedCompressionMethodError("Unknown method {}".format(coder['method']))
    return methods_names
385