#!/usr/bin/python -u
#
# p7zr library
#
# Copyright (c) 2019 Hiroshi Miura <miurahr@linux.com>
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
import bz2
import io
import lzma
import os
import queue
import sys
import threading
from typing import IO, Any, BinaryIO, Dict, List, Optional, Union

from py7zr import UnsupportedCompressionMethodError
from py7zr.extra import CopyDecompressor, DeflateDecompressor, ISevenZipDecompressor, ZstdDecompressor
from py7zr.helpers import MemIO, NullIO, calculate_crc32, readlink
from py7zr.properties import READ_BLOCKSIZE, ArchivePassword, CompressionMethod

if sys.version_info < (3, 6):
    import pathlib2 as pathlib
else:
    import pathlib
try:
    import zstandard as Zstd  # type: ignore
except ImportError:
    Zstd = None


class Worker:
    """Extract worker class to invoke handler.

    Holds the archive file list, the offset at which packed data starts and
    the parsed header, plus a mapping from file id to the output target
    registered through :meth:`register_filelike`.
    """

    def __init__(self, files, src_start: int, header) -> None:
        self.target_filepath = {}  # type: Dict[int, Union[MemIO, pathlib.Path, None]]
        self.files = files
        self.src_start = src_start
        self.header = header

    def extract(self, fp: BinaryIO, parallel: bool, q=None) -> None:
        """Extract worker method to handle 7zip folder and decompress each files.

        :parameter fp: archive source file pointer
        :parameter parallel: when true and the archive has several folders,
            start one thread per folder; each thread re-opens the archive by
            the file name taken from ``fp.name``.
        :parameter q: optional queue that receives progress tuples from
            :meth:`extract_single`.
        """
        main_streams = getattr(self.header, 'main_streams', None)
        if main_streams is None:
            # Header carries no packed streams: only empty-stream entries can be produced.
            empty_files = [f for f in self.files if f.emptystream]
            self.extract_single(fp, empty_files, 0, 0, q)
            return

        numfolders = main_streams.unpackinfo.numfolders
        if numfolders == 1:
            src_end = self.src_start + main_streams.packinfo.packpositions[-1]
            self.extract_single(fp, self.files, self.src_start, src_end, q)
            return

        folders = main_streams.unpackinfo.folders
        positions = main_streams.packinfo.packpositions
        empty_files = [f for f in self.files if f.emptystream]
        filename = getattr(fp, 'name', None)
        # Worker threads can only re-open the archive from a real path string
        # (extract_single opens ``str`` arguments only).  Without one, fall
        # back to sequential extraction instead of failing.
        if parallel and isinstance(filename, str):
            # Separate handle so that the caller's ``fp`` position is left untouched;
            # the context manager closes it once the empty entries are created.
            with open(filename, 'rb') as empty_fp:
                self.extract_single(empty_fp, empty_files, 0, 0, q)
            extract_threads = []
            for i in range(numfolders):
                p = threading.Thread(target=self.extract_single,
                                     args=(filename, folders[i].files,
                                           self.src_start + positions[i], self.src_start + positions[i + 1], q))
                p.start()
                extract_threads.append(p)
            for p in extract_threads:
                p.join()
        else:
            self.extract_single(fp, empty_files, 0, 0, q)
            for i in range(numfolders):
                self.extract_single(fp, folders[i].files, self.src_start + positions[i],
                                    self.src_start + positions[i + 1], q)

    def extract_single(self, fp: Union[BinaryIO, str], files, src_start: int, src_end: int,
                       q: Optional[queue.Queue]) -> None:
        """Single thread extractor that takes file lists in single 7zip folder.

        :parameter fp: archive source file pointer, or a path string; a path is
            opened here in binary mode and closed again before returning.
        :parameter files: entries to extract; ``None`` makes the call a no-op.
        :parameter src_start: offset to seek to before reading packed data.
        :parameter src_end: end position of the folder.
        :parameter q: optional queue; receives an ``('s', name, compressed)``
            tuple before and an ``('e', name, uncompressed)`` tuple after each entry.
        """
        if files is None:
            return
        opened_here = isinstance(fp, str)
        if opened_here:
            fp = open(fp, 'rb')
        try:
            fp.seek(src_start)
            for f in files:
                if q is not None:
                    q.put(('s', str(f.filename), str(f.compressed) if f.compressed is not None else '0'))
                fileish = self.target_filepath.get(f.id, None)
                if fileish is not None:
                    fileish.parent.mkdir(parents=True, exist_ok=True)
                    with fileish.open(mode='wb') as ofp:
                        # For an empty stream, opening in 'wb' already created the empty file.
                        if not f.emptystream:
                            # extract to file
                            self.decompress(fp, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end)
                            ofp.seek(0)
                elif not f.emptystream:
                    # No target registered: read and discard the data, but still check crc.
                    with NullIO() as ofp:
                        self.decompress(fp, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end)
                if q is not None:
                    q.put(('e', str(f.filename), str(f.uncompressed[-1])))
        finally:
            # Only close handles this method opened; a caller-supplied fp stays open.
            if opened_here:
                fp.close()

    def decompress(self, fp: BinaryIO, folder, fq: IO[Any],
                   size: int, compressed_size: Optional[int], src_end: int) -> None:
        """decompressor wrapper called from extract method.

           :parameter fp: archive source file pointer
           :parameter folder: Folder object that have decompressor object.
           :parameter fq: output file object
           :parameter size: uncompressed size of target file.
           :parameter compressed_size: compressed size of target file.
           :parameter src_end: end position of the folder
           :returns None
           :raises Exception: when the source is exhausted and the decompressor
               produces no more output although data is still expected.
        """
        assert folder is not None
        out_remaining = size
        decompressor = folder.get_decompressor(compressed_size)
        while out_remaining > 0:
            max_length = min(out_remaining, io.DEFAULT_BUFFER_SIZE)
            rest_size = src_end - fp.tell()
            read_size = min(READ_BLOCKSIZE, rest_size)
            if read_size == 0:
                # Source exhausted: drain what the decompressor still buffers.
                tmp = decompressor.decompress(b'', max_length)
                if len(tmp) == 0:
                    raise Exception("decompression get wrong: no output data.")
            else:
                inp = fp.read(read_size)
                tmp = decompressor.decompress(inp, max_length)
            if len(tmp) > 0 and out_remaining >= len(tmp):
                out_remaining -= len(tmp)
                fq.write(tmp)
            if out_remaining <= 0:
                break
        # The CRC is compared only once the folder's packed data is fully consumed.
        if fp.tell() >= src_end:
            if decompressor.crc is not None and not decompressor.check_crc():
                # NOTE(review): a mismatch is only reported on stdout, not raised.
                print('\nCRC error! expected: {}, real: {}'.format(decompressor.crc, decompressor.digest))
        return

    def _find_link_target(self, target):
        """Find the target member of a symlink or hardlink member in the archive.

        :parameter target: path object of the link on disk.
        :returns: the link destination relative to the link's directory when the
            destination is itself one of ``self.files``, otherwise the link
            destination in posix style.
        """
        targetname = target.as_posix()  # type: str
        linkname = readlink(targetname)
        # Check windows full path symlinks
        if linkname.startswith("\\\\?\\"):
            linkname = linkname[4:]
        # normalize as posix style
        linkname = pathlib.Path(linkname).as_posix()
        for candidate in self.files:
            if linkname == candidate.origin.as_posix():
                # FIXME: when API user specify arcname, it will break
                return os.path.relpath(linkname, os.path.dirname(targetname))
        return linkname

    def archive(self, fp: BinaryIO, folder, deref=False):
        """Run archive task for specified 7zip folder.

        Compresses every entry of ``self.files`` with the folder's compressor,
        writes the output to ``fp`` and records sizes and CRC digests in
        ``self.header``.

        :parameter fp: output archive file pointer
        :parameter folder: Folder object that provides the compressor
        :parameter deref: when false, a symlink entry is stored as its link
            target text instead of the content it points to.
        """
        compressor = folder.get_compressor()
        main_streams = self.header.main_streams
        substreams = main_streams.substreamsinfo
        files_info = self.header.files_info
        outsize = 0
        main_streams.packinfo.numstreams = 1
        num_unpack_streams = 0
        substreams.digests = []
        substreams.digestsdefined = []
        last_file_index = 0
        foutsize = 0
        for i, f in enumerate(self.files):
            file_info = f.file_properties()
            files_info.files.append(file_info)
            files_info.emptyfiles.append(f.emptystream)
            foutsize = 0
            if f.is_symlink and not deref:
                last_file_index = i
                num_unpack_streams += 1
                link_target = self._find_link_target(f.origin)  # type: str
                tgt = link_target.encode('utf-8')  # type: bytes
                insize = len(tgt)
                crc = calculate_crc32(tgt, 0)  # type: int
                out = compressor.compress(tgt)
                outsize += len(out)
                foutsize += len(out)
                fp.write(out)
                substreams.digests.append(crc)
                substreams.digestsdefined.append(True)
                substreams.unpacksizes.append(insize)
                files_info.files[i]['maxsize'] = foutsize
            elif not f.emptystream:
                last_file_index = i
                num_unpack_streams += 1
                insize = 0
                with f.origin.open(mode='rb') as fd:
                    data = fd.read(READ_BLOCKSIZE)
                    insize += len(data)
                    crc = 0
                    while data:
                        crc = calculate_crc32(data, crc)
                        out = compressor.compress(data)
                        outsize += len(out)
                        foutsize += len(out)
                        fp.write(out)
                        data = fd.read(READ_BLOCKSIZE)
                        insize += len(data)
                    substreams.digests.append(crc)
                    substreams.digestsdefined.append(True)
                    files_info.files[i]['maxsize'] = foutsize
                    substreams.unpacksizes.append(insize)
        # The loop never breaks, so the flush always runs once after all entries.
        out = compressor.flush()
        outsize += len(out)
        foutsize += len(out)
        fp.write(out)
        if len(self.files) > 0:
            # NOTE(review): foutsize is reset for every entry, so if the last
            # entry is an empty stream this stores only the flush size for the
            # last non-empty entry - confirm this is intended.
            files_info.files[last_file_index]['maxsize'] = foutsize
        # Update size data in header
        main_streams.packinfo.packsizes = [outsize]
        folder.unpacksizes = [sum(substreams.unpacksizes)]
        substreams.num_unpackstreams_folders = [num_unpack_streams]

    def register_filelike(self, id: int, fileish: Union[MemIO, pathlib.Path, None]) -> None:
        """register file-ish to worker."""
        self.target_filepath[id] = fileish


class SevenZipDecompressor:
    """Main decompressor object which is properly configured and bind to each 7zip folder.
    because 7zip folder can have a custom compression method"""

    # Methods that can be handled by a raw ``lzma`` filter chain.
    lzma_methods_map = {
        CompressionMethod.LZMA: lzma.FILTER_LZMA1,
        CompressionMethod.LZMA2: lzma.FILTER_LZMA2,
        CompressionMethod.DELTA: lzma.FILTER_DELTA,
        CompressionMethod.P7Z_BCJ: lzma.FILTER_X86,
        CompressionMethod.BCJ_ARM: lzma.FILTER_ARM,
        CompressionMethod.BCJ_ARMT: lzma.FILTER_ARMTHUMB,
        CompressionMethod.BCJ_IA64: lzma.FILTER_IA64,
        CompressionMethod.BCJ_PPC: lzma.FILTER_POWERPC,
        CompressionMethod.BCJ_SPARC: lzma.FILTER_SPARC,
    }

    # Internal ids for methods that need a decompressor other than ``lzma``.
    FILTER_BZIP2 = 0x31
    FILTER_ZIP = 0x32
    FILTER_COPY = 0x33
    FILTER_AES = 0x34
    FILTER_ZSTD = 0x35
    alt_methods_map = {
        CompressionMethod.MISC_BZIP2: FILTER_BZIP2,
        CompressionMethod.MISC_DEFLATE: FILTER_ZIP,
        CompressionMethod.COPY: FILTER_COPY,
        CompressionMethod.CRYPT_AES256_SHA256: FILTER_AES,
        CompressionMethod.MISC_ZSTD: FILTER_ZSTD,
    }

    def __init__(self, coders: List[Dict[str, Any]], size: int, crc: Optional[int]) -> None:
        """Select and build the decompressor for the given coder chain.

        :parameter coders: coder descriptions of the folder; each is a dict
            with at least a ``'method'`` key.
        :parameter size: stored as ``input_size``.
        :parameter crc: expected CRC32 of the decompressed data, or ``None``
            to skip digest calculation.
        :raises UnsupportedCompressionMethodError: when no decompressor is
            available for the coders.
        """
        self.input_size = size
        self.consumed = 0  # type: int
        self.crc = crc
        self.digest = None  # type: Optional[int]
        if self._check_lzma_coders(coders):
            self._set_lzma_decompressor(coders)
        else:
            self._set_alternative_decompressor(coders)

    def _check_lzma_coders(self, coders: List[Dict[str, Any]]) -> bool:
        """Return True when every coder's method has an ``lzma`` filter id."""
        return all(self.lzma_methods_map.get(coder['method'], None) is not None for coder in coders)

    def _set_lzma_decompressor(self, coders: List[Dict[str, Any]]) -> None:
        """Build a raw-format ``lzma.LZMADecompressor`` from the coder chain."""
        filters = []  # type: List[Dict[str, Any]]
        for coder in coders:
            if coder['numinstreams'] != 1 or coder['numoutstreams'] != 1:
                raise UnsupportedCompressionMethodError('Only a simple compression method is currently supported.')
            filter_id = self.lzma_methods_map.get(coder['method'], None)
            if filter_id is None:
                raise UnsupportedCompressionMethodError
            properties = coder.get('properties', None)
            if properties is not None:
                lzma_filter = lzma._decode_filter_properties(filter_id, properties)  # type: ignore
            else:
                lzma_filter = {'id': filter_id}
            # Each filter is inserted at the front, so the chain ends up in reverse coder order.
            filters.insert(0, lzma_filter)
        self.decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=filters)  # type: Union[bz2.BZ2Decompressor, lzma.LZMADecompressor, ISevenZipDecompressor]  # noqa

    def _set_alternative_decompressor(self, coders: List[Dict[str, Any]]) -> None:
        """Pick a non-lzma decompressor; only the first coder's method is consulted."""
        filter_id = self.alt_methods_map.get(coders[0]['method'], None)
        if filter_id == self.FILTER_BZIP2:
            self.decompressor = bz2.BZ2Decompressor()
        elif filter_id == self.FILTER_ZIP:
            self.decompressor = DeflateDecompressor()
        elif filter_id == self.FILTER_COPY:
            self.decompressor = CopyDecompressor()
        elif filter_id == self.FILTER_ZSTD and Zstd:
            # Zstd is None when the optional ``zstandard`` import failed.
            self.decompressor = ZstdDecompressor()
        else:
            raise UnsupportedCompressionMethodError

    def decompress(self, data: bytes, max_length: Optional[int] = None) -> bytes:
        """Decompress ``data`` and update the running digest.

        :parameter data: compressed input; its length is added to ``consumed``.
        :parameter max_length: passed to the underlying decompressor as
            ``max_length`` when not ``None``.
        :returns: the decompressed bytes.
        """
        self.consumed += len(data)
        if max_length is not None:
            folder_data = self.decompressor.decompress(data, max_length=max_length)
        else:
            folder_data = self.decompressor.decompress(data)
        # calculate CRC with uncompressed data
        if self.crc is not None:
            self.digest = calculate_crc32(folder_data, self.digest)
        return folder_data

    def check_crc(self):
        """Return True when the running digest equals the expected CRC."""
        return self.crc == self.digest


class SevenZipCompressor:

    """Main compressor object to configured for each 7zip folder."""

    __slots__ = ['filters', 'compressor', 'coders']

    # Reverse lookup: ``lzma`` filter id -> 7zip compression method.
    lzma_methods_map_r = {
        lzma.FILTER_LZMA2: CompressionMethod.LZMA2,
        lzma.FILTER_DELTA: CompressionMethod.DELTA,
        lzma.FILTER_X86: CompressionMethod.P7Z_BCJ,
    }

    def __init__(self, filters=None):
        """Create a raw-format LZMA compressor and the matching coder list.

        :parameter filters: ``lzma`` filter chain; when ``None`` a single
            LZMA2 filter with preset ``7 | lzma.PRESET_EXTREME`` is used.
        """
        if filters is None:
            self.filters = [{"id": lzma.FILTER_LZMA2, "preset": 7 | lzma.PRESET_EXTREME}, ]
        else:
            self.filters = filters
        self.compressor = lzma.LZMACompressor(format=lzma.FORMAT_RAW, filters=self.filters)
        self.coders = []
        for filter_spec in self.filters:
            if filter_spec is None:
                break
            method = self.lzma_methods_map_r[filter_spec['id']]
            properties = lzma._encode_filter_properties(filter_spec)
            self.coders.append({'method': method, 'properties': properties, 'numinstreams': 1, 'numoutstreams': 1})

    def compress(self, data):
        """Feed ``data`` to the compressor and return the output produced so far."""
        return self.compressor.compress(data)

    def flush(self):
        """Finish compression and return the remaining output."""
        return self.compressor.flush()


def get_methods_names(coders: List[dict]) -> List[str]:
    """Return human readable method names for specified coders.

    :parameter coders: coder descriptions; each is a dict with a ``'method'`` key.
    :returns: one name per coder, in the same order.
    :raises UnsupportedCompressionMethodError: when a coder's method has no known name.
    """
    methods_name_map = {
        CompressionMethod.LZMA2: "LZMA2",
        CompressionMethod.LZMA: "LZMA",
        CompressionMethod.DELTA: "delta",
        CompressionMethod.P7Z_BCJ: "BCJ",
        CompressionMethod.BCJ_ARM: "BCJ(ARM)",
        CompressionMethod.BCJ_ARMT: "BCJ(ARMT)",
        CompressionMethod.BCJ_IA64: "BCJ(IA64)",
        CompressionMethod.BCJ_PPC: "BCJ(POWERPC)",
        CompressionMethod.BCJ_SPARC: "BCJ(SPARC)",
        CompressionMethod.CRYPT_AES256_SHA256: "7zAES",
        # Methods handled by SevenZipDecompressor.alt_methods_map must be
        # nameable too, otherwise naming the methods of such archives fails.
        CompressionMethod.COPY: "COPY",
        CompressionMethod.MISC_BZIP2: "BZip2",
        CompressionMethod.MISC_DEFLATE: "DEFLATE",
        CompressionMethod.MISC_ZSTD: "ZStandard",
    }
    methods_names = []  # type: List[str]
    for coder in coders:
        try:
            methods_names.append(methods_name_map[coder['method']])
        except KeyError:
            raise UnsupportedCompressionMethodError("Unknown method {}".format(coder['method']))
    return methods_names