1#!/usr/bin/python -u 2# 3# p7zr library 4# 5# Copyright (c) 2019,2020 Hiroshi Miura <miurahr@linux.com> 6# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de 7# 7-Zip Copyright (C) 1999-2010 Igor Pavlov 8# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov 9# 10# This library is free software; you can redistribute it and/or 11# modify it under the terms of the GNU Lesser General Public 12# License as published by the Free Software Foundation; either 13# version 2.1 of the License, or (at your option) any later version. 14# 15# This library is distributed in the hope that it will be useful, 16# but WITHOUT ANY WARRANTY; without even the implied warranty of 17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18# Lesser General Public License for more details. 19# 20# You should have received a copy of the GNU Lesser General Public 21# License along with this library; if not, write to the Free Software 22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 23# 24# 25"""Read 7zip format archives.""" 26import collections.abc 27import datetime 28import errno 29import functools 30import io 31import operator 32import os 33import queue 34import stat 35import sys 36import threading 37from io import BytesIO 38from typing import IO, Any, BinaryIO, Dict, List, Optional, Tuple, Union 39 40from py7zr.archiveinfo import Folder, Header, SignatureHeader 41from py7zr.callbacks import ExtractCallback 42from py7zr.compression import SevenZipCompressor, Worker, get_methods_names 43from py7zr.exceptions import Bad7zFile, InternalError 44from py7zr.helpers import ArchiveTimestamp, MemIO, calculate_crc32, filetime_to_dt 45from py7zr.properties import MAGIC_7Z, READ_BLOCKSIZE, ArchivePassword 46 47if sys.version_info < (3, 6): 48 import contextlib2 as contextlib 49 import pathlib2 as pathlib 50else: 51 import contextlib 52 import pathlib 53 54if sys.platform.startswith('win'): 55 import _winapi 56 57FILE_ATTRIBUTE_UNIX_EXTENSION = 0x8000 
FILE_ATTRIBUTE_WINDOWS_MASK = 0x04fff


class ArchiveFile:
    """Represent each files metadata inside archive file.
    It holds file properties; filename, permissions, and type whether
    it is directory, link or normal file.

    Instances of the :class:`ArchiveFile` class are returned by iterating :attr:`files_list` of
    :class:`SevenZipFile` objects.
    Each object stores information about a single member of the 7z archive. Most of users use :meth:`extractall()`.

    The class also hold an archive parameter where file is exist in
    archive file folder(container)."""

    def __init__(self, id: int, file_info: Dict[str, Any]) -> None:
        self.id = id
        self._file_info = file_info

    def file_properties(self) -> Dict[str, Any]:
        """Return file properties as a hash object. Following keys are included: ‘readonly’, ‘is_directory’,
        ‘posix_mode’, ‘archivable’, ‘emptystream’, ‘filename’, ‘creationtime’, ‘lastaccesstime’,
        ‘lastwritetime’, ‘attributes’
        """
        properties = self._file_info
        if properties is not None:
            properties['readonly'] = self.readonly
            properties['posix_mode'] = self.posix_mode
            properties['archivable'] = self.archivable
            properties['is_directory'] = self.is_directory
        return properties

    def _get_property(self, key: str) -> Any:
        # Missing keys are treated as "property not recorded" rather than an error.
        try:
            return self._file_info[key]
        except KeyError:
            return None

    @property
    def origin(self) -> pathlib.Path:
        return self._get_property('origin')

    @property
    def folder(self) -> Folder:
        return self._get_property('folder')

    @property
    def filename(self) -> str:
        """return filename of archive file."""
        return self._get_property('filename')

    @property
    def emptystream(self) -> bool:
        """True if file is empty(0-byte file), otherwise False"""
        return self._get_property('emptystream')

    @property
    def uncompressed(self) -> List[int]:
        # Per-output-stream uncompressed sizes as recorded in the archive header.
        return self._get_property('uncompressed')

    @property
    def uncompressed_size(self) -> int:
        """Uncompressed file size."""
        return functools.reduce(operator.add, self.uncompressed)

    @property
    def compressed(self) -> Optional[int]:
        """Compressed size"""
        return self._get_property('compressed')

    def _test_attribute(self, target_bit: int) -> bool:
        # True only when every bit of target_bit is set in the stored attributes.
        attributes = self._get_property('attributes')
        if attributes is None:
            return False
        return attributes & target_bit == target_bit

    @property
    def archivable(self) -> bool:
        """File has a Windows `archive` flag."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_ARCHIVE)  # type: ignore  # noqa

    @property
    def is_directory(self) -> bool:
        """True if file is a directory, otherwise False."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_DIRECTORY)  # type: ignore  # noqa

    @property
    def readonly(self) -> bool:
        """True if file is readonly, otherwise False."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_READONLY)  # type: ignore  # noqa

    def _get_unix_extension(self) -> Optional[int]:
        # 7z stores unix st_mode in the high 16 bits when the unix-extension flag is set.
        attributes = self._get_property('attributes')
        if self._test_attribute(FILE_ATTRIBUTE_UNIX_EXTENSION):
            return attributes >> 16
        return None

    @property
    def is_symlink(self) -> bool:
        """True if file is a symbolic link, otherwise False."""
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_ISLNK(e)
        return self._test_attribute(stat.FILE_ATTRIBUTE_REPARSE_POINT)  # type: ignore  # noqa

    @property
    def is_junction(self) -> bool:
        """True if file is a junction/reparse point on windows, otherwise False."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_REPARSE_POINT |  # type: ignore  # noqa
                                    stat.FILE_ATTRIBUTE_DIRECTORY)  # type: ignore  # noqa

    @property
    def is_socket(self) -> bool:
        """True if file is a socket, otherwise False."""
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_ISSOCK(e)
        return False

    @property
    def lastwritetime(self) -> Optional[ArchiveTimestamp]:
        """Return last written timestamp of a file."""
        return self._get_property('lastwritetime')

    @property
    def posix_mode(self) -> Optional[int]:
        """
        posix mode when a member has a unix extension property, or None
        :return: Return file stat mode can be set by os.chmod()
        """
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_IMODE(e)
        return None

    @property
    def st_fmt(self) -> Optional[int]:
        """
        :return: Return the portion of the file mode that describes the file type
        """
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_IFMT(e)
        return None


class ArchiveFileList(collections.abc.Iterable):
    """Iteratable container of ArchiveFile."""

    def __init__(self, offset: int = 0):
        self.files_list = []  # type: List[dict]
        self.index = 0
        self.offset = offset  # id offset applied when wrapping entries as ArchiveFile

    def append(self, file_info: Dict[str, Any]) -> None:
        self.files_list.append(file_info)

    def __len__(self) -> int:
        return len(self.files_list)

    def __iter__(self) -> 'ArchiveFileListIterator':
        return ArchiveFileListIterator(self)

    def __getitem__(self, index):
        # BUGFIX: guard was `index > len(...)`, letting index == len fall
        # through to the list access; `>=` rejects it explicitly.
        if index >= len(self.files_list):
            raise IndexError
        if index < 0:
            raise IndexError
        res = ArchiveFile(index + self.offset, self.files_list[index])
        return res


class ArchiveFileListIterator(collections.abc.Iterator):
    """Iterator yielding ArchiveFile objects from an ArchiveFileList."""

    def __init__(self, archive_file_list):
        self._archive_file_list = archive_file_list
        self._index = 0

    def __next__(self) -> ArchiveFile:
        if self._index == len(self._archive_file_list):
            raise StopIteration
        res = self._archive_file_list[self._index]
        self._index += 1
        return res


# ------------------
# Exported Classes
# ------------------
class ArchiveInfo:
    """Hold archive information"""

    def __init__(self, filename, size, header_size, method_names, solid, blocks, uncompressed):
        self.filename = filename
        self.size = size
        self.header_size = header_size
        self.method_names = method_names
        self.solid = solid
        self.blocks = blocks
        self.uncompressed = uncompressed


class FileInfo:
    """Hold archived file information."""

    def __init__(self, filename, compressed, uncompressed, archivable, is_directory, creationtime):
        self.filename = filename
        self.compressed = compressed
        self.uncompressed = uncompressed
        self.archivable = archivable
        self.is_directory = is_directory
        self.creationtime = creationtime


class SevenZipFile(contextlib.AbstractContextManager):
    """The SevenZipFile Class provides an interface to 7z archives."""

    def __init__(self, file: Union[BinaryIO, str, pathlib.Path], mode: str = 'r',
                 *, filters: Optional[str] = None, dereference=False, password: Optional[str] = None) -> None:
        if mode not in ('r', 'w', 'x', 'a'):
            raise ValueError("ZipFile requires mode 'r', 'w', 'x', or 'a'")
        if password is not None:
            # was `mode not in ('r')` -- accidental string membership test; same behavior
            if mode != 'r':
                raise NotImplementedError("It has not been implemented to create archive with password.")
            ArchivePassword(password)
            self.password_protected = True
        else:
            self.password_protected = False
        # Check if we were passed a file-like object or not
        if isinstance(file, str):
            self._filePassed = False  # type: bool
            self.filename = file  # type: str
            if mode == 'r':
                self.fp = open(file, 'rb')  # type: BinaryIO
            elif mode == 'w':
                self.fp = open(file, 'w+b')
            elif mode == 'x':
                self.fp = open(file, 'x+b')
            elif mode == 'a':
                self.fp = open(file, 'r+b')
            else:
                raise ValueError("File open error.")
            self.mode = mode
        elif isinstance(file, pathlib.Path):
            self._filePassed = False
            self.filename = str(file)
            if mode == 'r':
                self.fp = file.open(mode='rb')  # type: ignore  # noqa # typeshed issue: 2911
            elif mode == 'w':
                self.fp = file.open(mode='w+b')  # type: ignore  # noqa
            elif mode == 'x':
                self.fp = file.open(mode='x+b')  # type: ignore  # noqa
            elif mode == 'a':
                self.fp = file.open(mode='r+b')  # type: ignore  # noqa
            else:
                raise ValueError("File open error.")
            self.mode = mode
        elif isinstance(file, io.IOBase):
            self._filePassed = True
            self.fp = file
            self.filename = getattr(file, 'name', None)
            self.mode = mode  # type: ignore  # noqa
        else:
            raise TypeError("invalid file: {}".format(type(file)))
        self._fileRefCnt = 1
        try:
            if mode == "r":
                self._real_get_contents(self.fp)
                self._reset_worker()
            elif mode == 'w':
                # FIXME: check filters here
                self.folder = self._create_folder(filters)
                self.files = ArchiveFileList()
                self._prepare_write()
                self._reset_worker()
            elif mode == 'x':
                raise NotImplementedError
            elif mode == 'a':
                raise NotImplementedError
            else:
                raise ValueError("Mode must be 'r', 'w', 'x', or 'a'")
        except Exception:
            # Close the fp we may have opened before propagating; bare raise
            # keeps the original traceback.
            self._fpclose()
            raise
        self.encoded_header_mode = False
        self._dict = {}  # type: Dict[str, IO[Any]]
        self.dereference = dereference
        self.reporterd = None  # type: Optional[threading.Thread]
        self.q = queue.Queue()  # type: queue.Queue[Any]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _create_folder(self, filters):
        # Build a single solid folder (7z compression unit) for write mode.
        folder = Folder()
        folder.compressor = SevenZipCompressor(filters)
        folder.coders = folder.compressor.coders
        folder.solid = True
        folder.digestdefined = False
        folder.bindpairs = []
        folder.totalin = 1
        folder.totalout = 1
        return folder

    def _fpclose(self) -> None:
        # Close the underlying file only when we own it (not caller-passed).
        assert self._fileRefCnt > 0
        self._fileRefCnt -= 1
        if not self._fileRefCnt and not self._filePassed:
            self.fp.close()

    def _real_get_contents(self, fp: BinaryIO) -> None:
        """Read signature and archive headers, then populate self.files."""
        if not self._check_7zfile(fp):
            raise Bad7zFile('not a 7z file')
        self.sig_header = SignatureHeader.retrieve(self.fp)
        self.afterheader = self.fp.tell()
        buffer = self._read_header_data()
        header = Header.retrieve(self.fp, buffer, self.afterheader)
        if header is None:
            return
        self.header = header
        buffer.close()
        self.files = ArchiveFileList()
        if getattr(self.header, 'files_info', None) is not None:
            self._filelist_retrieve()

    def _read_header_data(self) -> BytesIO:
        """Read the next-header block and verify its CRC32."""
        self.fp.seek(self.sig_header.nextheaderofs, os.SEEK_CUR)
        buffer = io.BytesIO(self.fp.read(self.sig_header.nextheadersize))
        if self.sig_header.nextheadercrc != calculate_crc32(buffer.getvalue()):
            raise Bad7zFile('invalid header data')
        return buffer

    class ParseStatus:
        # Cursor state while walking files/folders/streams of the header.
        def __init__(self, src_pos=0):
            self.src_pos = src_pos
            self.folder = 0  # 7zip folder where target stored
            self.outstreams = 0  # output stream count
            self.input = 0  # unpack stream count in each folder
            self.stream = 0  # target input stream position

    def _gen_filename(self) -> str:
        # compressed file is stored without a name, generate one
        try:
            basefilename = self.filename
        except AttributeError:
            # 7z archive file doesn't have a name
            return 'contents'
        else:
            if basefilename is not None:
                fn, ext = os.path.splitext(os.path.basename(basefilename))
                return fn
            else:
                return 'contents'

    def _get_fileinfo_sizes(self, pstat, subinfo, packinfo, folder, packsizes, unpacksizes, file_in_solid, numinstreams):
        """Compute (maxsize, compressed, uncompressed, packsize, solid) for the current file."""
        if pstat.input == 0:
            folder.solid = subinfo.num_unpackstreams_folders[pstat.folder] > 1
        maxsize = (folder.solid and packinfo.packsizes[pstat.stream]) or None
        uncompressed = unpacksizes[pstat.outstreams]
        if not isinstance(uncompressed, (list, tuple)):
            uncompressed = [uncompressed] * len(folder.coders)
        if file_in_solid > 0:
            compressed = None  # followers in a solid block share the leader's packed size
        elif pstat.stream < len(packsizes):  # file is compressed
            compressed = packsizes[pstat.stream]
        else:  # file is not compressed
            compressed = uncompressed
        packsize = packsizes[pstat.stream:pstat.stream + numinstreams]
        return maxsize, compressed, uncompressed, packsize, folder.solid

    def _filelist_retrieve(self) -> None:
        """Walk header metadata and build self.files with per-file size/folder info."""
        # Initialize references for convenience
        if hasattr(self.header, 'main_streams') and self.header.main_streams is not None:
            folders = self.header.main_streams.unpackinfo.folders
            packinfo = self.header.main_streams.packinfo
            subinfo = self.header.main_streams.substreamsinfo
            packsizes = packinfo.packsizes
            unpacksizes = subinfo.unpacksizes if subinfo.unpacksizes is not None else [x.unpacksizes for x in folders]
        else:
            subinfo = None
            folders = None
            packinfo = None
            packsizes = []
            unpacksizes = [0]

        pstat = self.ParseStatus()
        pstat.src_pos = self.afterheader
        file_in_solid = 0

        for file_id, file_info in enumerate(self.header.files_info.files):
            if not file_info['emptystream'] and folders is not None:
                folder = folders[pstat.folder]
                numinstreams = max([coder.get('numinstreams', 1) for coder in folder.coders])
                (maxsize, compressed, uncompressed,
                 packsize, solid) = self._get_fileinfo_sizes(pstat, subinfo, packinfo, folder, packsizes,
                                                             unpacksizes, file_in_solid, numinstreams)
                pstat.input += 1
                folder.solid = solid
                file_info['folder'] = folder
                file_info['maxsize'] = maxsize
                file_info['compressed'] = compressed
                file_info['uncompressed'] = uncompressed
                file_info['packsizes'] = packsize
                if subinfo.digestsdefined[pstat.outstreams]:
                    file_info['digest'] = subinfo.digests[pstat.outstreams]
                if folder is None:
                    pstat.src_pos += file_info['compressed']
                else:
                    if folder.solid:
                        file_in_solid += 1
                    pstat.outstreams += 1
                    if folder.files is None:
                        folder.files = ArchiveFileList(offset=file_id)
                    folder.files.append(file_info)
                    # Advance to the next folder once all its unpack streams are consumed.
                    if pstat.input >= subinfo.num_unpackstreams_folders[pstat.folder]:
                        file_in_solid = 0
                        pstat.src_pos += sum(packinfo.packsizes[pstat.stream:pstat.stream + numinstreams])
                        pstat.folder += 1
                        pstat.stream += numinstreams
                        pstat.input = 0
            else:
                file_info['folder'] = None
                file_info['maxsize'] = 0
                file_info['compressed'] = 0
                file_info['uncompressed'] = [0]
                file_info['packsizes'] = [0]

            if 'filename' not in file_info:
                file_info['filename'] = self._gen_filename()
            self.files.append(file_info)

    def _num_files(self) -> int:
        if getattr(self.header, 'files_info', None) is not None:
            return len(self.header.files_info.files)
        return 0

    def _set_file_property(self, outfilename: pathlib.Path, properties: Dict[str, Any]) -> None:
        """Apply timestamps and permission bits to an extracted file."""
        # creation time
        creationtime = ArchiveTimestamp(properties['lastwritetime']).totimestamp()
        if creationtime is not None:
            os.utime(str(outfilename), times=(creationtime, creationtime))
        if os.name == 'posix':
            st_mode = properties['posix_mode']
            if st_mode is not None:
                outfilename.chmod(st_mode)
                return
        # fallback: only set readonly if specified
        if properties['readonly'] and not properties['is_directory']:
            ro_mask = 0o777 ^ (stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
            outfilename.chmod(outfilename.stat().st_mode & ro_mask)

    def _reset_decompressor(self) -> None:
        if self.header.main_streams is not None and self.header.main_streams.unpackinfo.numfolders > 0:
            for i, folder in enumerate(self.header.main_streams.unpackinfo.folders):
                folder.decompressor = None

    def _reset_worker(self) -> None:
        """Seek to where archive data start in archive and recreate new worker."""
        self.fp.seek(self.afterheader)
        self.worker = Worker(self.files, self.afterheader, self.header)

    def set_encoded_header_mode(self, mode: bool) -> None:
        self.encoded_header_mode = mode

    @staticmethod
    def _check_7zfile(fp: Union[BinaryIO, io.BufferedReader]) -> bool:
        # Peek the magic bytes and rewind so the caller sees an unchanged position.
        result = MAGIC_7Z == fp.read(len(MAGIC_7Z))[:len(MAGIC_7Z)]
        fp.seek(-len(MAGIC_7Z), 1)
        return result

    def _get_method_names(self) -> str:
        methods_names = []  # type: List[str]
        for folder in self.header.main_streams.unpackinfo.folders:
            methods_names += get_methods_names(folder.coders)
        return ', '.join(x for x in methods_names)

    def _test_digest_raw(self, pos: int, size: int, crc: int) -> bool:
        """CRC-check `size` raw bytes starting at `pos` against `crc`."""
        self.fp.seek(pos)
        remaining_size = size
        digest = None
        while remaining_size > 0:
            block = min(READ_BLOCKSIZE, remaining_size)
            digest = calculate_crc32(self.fp.read(block), digest)
            remaining_size -= block
        return digest == crc

    def _test_pack_digest(self) -> bool:
        self._reset_worker()
        crcs = self.header.main_streams.packinfo.crcs
        if crcs is not None and len(crcs) > 0:
            # check packed stream's crc
            for i, p in enumerate(self.header.main_streams.packinfo.packpositions):
                if not self._test_digest_raw(p, self.header.main_streams.packinfo.packsizes[i], crcs[i]):
                    return False
        return True

    def _test_unpack_digest(self) -> bool:
        self._reset_worker()
        for f in self.files:
            self.worker.register_filelike(f.id, None)
        try:
            self.worker.extract(self.fp, parallel=(not self.password_protected))  # TODO: print progress
        except Bad7zFile:
            return False
        else:
            return True

    def _test_digests(self) -> bool:
        if self._test_pack_digest():
            if self._test_unpack_digest():
                return True
        return False

    def _prepare_write(self) -> None:
        self.sig_header = SignatureHeader()
        self.sig_header._write_skelton(self.fp)
        self.afterheader = self.fp.tell()
        self.folder.totalin = 1
        self.folder.totalout = 1
        self.folder.bindpairs = []
        self.folder.unpacksizes = []
        self.header = Header.build_header([self.folder])

    def _write_archive(self):
        self.worker.archive(self.fp, self.folder, deref=self.dereference)
        # Write header and update signature header
        (header_pos, header_len, header_crc) = self.header.write(self.fp, self.afterheader,
                                                                 encoded=self.encoded_header_mode)
        self.sig_header.nextheaderofs = header_pos - self.afterheader
        self.sig_header.calccrc(header_len, header_crc)
        self.sig_header.write(self.fp)
        return

    def _is_solid(self):
        # Solid archive: at least one folder holds more than one unpack stream.
        for f in self.header.main_streams.substreamsinfo.num_unpackstreams_folders:
            if f > 1:
                return True
        return False

    def _var_release(self):
        # Drop references to large parsed structures after close().
        self._dict = None
        self.files = None
        self.folder = None
        self.header = None
        self.worker = None
        self.sig_header = None

    @staticmethod
    def _make_file_info(target: pathlib.Path, arcname: Optional[str] = None, dereference=False) -> Dict[str, Any]:
        """Build the file_info dict (name, attributes, sizes, times) for one write target."""
        f = {}  # type: Dict[str, Any]
        f['origin'] = target
        if arcname is not None:
            f['filename'] = pathlib.Path(arcname).as_posix()
        else:
            f['filename'] = target.as_posix()
        if os.name == 'nt':
            fstat = target.lstat()
            if target.is_symlink():
                if dereference:
                    fstat = target.stat()
                    if stat.S_ISDIR(fstat.st_mode):
                        f['emptystream'] = True
                        f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK  # type: ignore  # noqa
                    else:
                        f['emptystream'] = False
                        f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE  # type: ignore  # noqa
                        f['uncompressed'] = fstat.st_size
                else:
                    f['emptystream'] = False
                    f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK  # type: ignore  # noqa
                    # f['attributes'] |= stat.FILE_ATTRIBUTE_REPARSE_POINT  # type: ignore  # noqa
            elif target.is_dir():
                f['emptystream'] = True
                f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK  # type: ignore  # noqa
            elif target.is_file():
                f['emptystream'] = False
                f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE  # type: ignore  # noqa
                f['uncompressed'] = fstat.st_size
        else:
            fstat = target.lstat()
            if target.is_symlink():
                if dereference:
                    fstat = target.stat()
                    if stat.S_ISDIR(fstat.st_mode):
                        f['emptystream'] = True
                        f['attributes'] = stat.FILE_ATTRIBUTE_DIRECTORY  # type: ignore  # noqa
                        f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFDIR << 16)
                        f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
                    else:
                        f['emptystream'] = False
                        f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE  # type: ignore  # noqa
                        f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IMODE(fstat.st_mode) << 16)
                else:
                    # store the symlink itself: mark reparse point + unix S_IFLNK
                    f['emptystream'] = False
                    f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE | stat.FILE_ATTRIBUTE_REPARSE_POINT  # type: ignore  # noqa
                    f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFLNK << 16)
                    f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
            elif target.is_dir():
                f['emptystream'] = True
                f['attributes'] = stat.FILE_ATTRIBUTE_DIRECTORY  # type: ignore  # noqa
                f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFDIR << 16)
                f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
            elif target.is_file():
                f['emptystream'] = False
                f['uncompressed'] = fstat.st_size
                f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE  # type: ignore  # noqa
                f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IMODE(fstat.st_mode) << 16)

        f['creationtime'] = fstat.st_ctime
        f['lastwritetime'] = fstat.st_mtime
        f['lastaccesstime'] = fstat.st_atime
        return f

    # --------------------------------------------------------------------------
    # The public methods which SevenZipFile provides:
    def getnames(self) -> List[str]:
        """Return the members of the archive as a list of their names. It has
        the same order as the list returned by getmembers().
        """
        return list(map(lambda x: x.filename, self.files))

    def archiveinfo(self) -> ArchiveInfo:
        """Return an ArchiveInfo summary for the whole archive."""
        fstat = os.stat(self.filename)
        uncompressed = 0
        for f in self.files:
            uncompressed += f.uncompressed_size
        return ArchiveInfo(self.filename, fstat.st_size, self.header.size, self._get_method_names(),
                           self._is_solid(), len(self.header.main_streams.unpackinfo.folders),
                           uncompressed)

    def list(self) -> List[FileInfo]:
        """Returns contents information """
        alist = []  # type: List[FileInfo]
        creationtime = None  # type: Optional[datetime.datetime]
        for f in self.files:
            if f.lastwritetime is not None:
                creationtime = filetime_to_dt(f.lastwritetime)
            alist.append(FileInfo(f.filename, f.compressed, f.uncompressed_size, f.archivable, f.is_directory,
                                  creationtime))
        return alist

    def test(self) -> bool:
        """Test archive using CRC digests."""
        return self._test_digests()

    def readall(self) -> Optional[Dict[str, IO[Any]]]:
        """Extract every member into in-memory buffers keyed by filename."""
        return self._extract(path=None, return_dict=True)

    def extractall(self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None) -> None:
        """Extract all members from the archive to the current working
        directory and set owner, modification time and permissions on
        directories afterwards. `path' specifies a different directory
        to extract to.
        """
        self._extract(path=path, return_dict=False, callback=callback)

    def read(self, targets: Optional[List[str]] = None) -> Optional[Dict[str, IO[Any]]]:
        """Extract selected members into in-memory buffers keyed by filename."""
        return self._extract(path=None, targets=targets, return_dict=True)

    def extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None) -> None:
        """Extract selected members to `path` (or the current directory)."""
        self._extract(path, targets, return_dict=False)

    def _extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None,
                 return_dict: bool = False, callback: Optional[ExtractCallback] = None) -> Optional[Dict[str, IO[Any]]]:
        """Shared extraction engine behind readall/extractall/read/extract."""
        if callback is not None and not isinstance(callback, ExtractCallback):
            raise ValueError('Callback specified is not a subclass of py7zr.callbacks.ExtractCallback class')
        elif callback is not None:
            self.reporterd = threading.Thread(target=self.reporter, args=(callback,), daemon=True)
            self.reporterd.start()
        target_junction = []  # type: List[pathlib.Path]
        target_sym = []  # type: List[pathlib.Path]
        target_files = []  # type: List[Tuple[pathlib.Path, Dict[str, Any]]]
        target_dirs = []  # type: List[pathlib.Path]
        if path is not None:
            if isinstance(path, str):
                path = pathlib.Path(path)
            try:
                if not path.exists():
                    path.mkdir(parents=True)
                else:
                    pass
            except OSError as e:
                if e.errno == errno.EEXIST and path.is_dir():
                    pass
                else:
                    raise
        fnames = []  # type: List[str]  # check duplicated filename in one archive?
        self.q.put(('pre', None, None))
        for f in self.files:
            # TODO: sanity check
            # check whether f.filename with invalid characters: '../'
            if f.filename.startswith('../'):
                raise Bad7zFile
            # When archive has a multiple files which have same name
            # To guarantee order of archive, multi-thread decompression becomes off.
            # Currently always overwrite by latter archives.
            # TODO: provide option to select overwrite or skip.
            if f.filename not in fnames:
                outname = f.filename
            else:
                i = 0
                while True:
                    outname = f.filename + '_%d' % i
                    if outname not in fnames:
                        break
                    i += 1  # BUGFIX: counter was never incremented -> infinite loop on 3rd duplicate
            fnames.append(outname)
            if path is not None:
                outfilename = path.joinpath(outname)
            else:
                outfilename = pathlib.Path(outname)
            if os.name == 'nt':
                if outfilename.is_absolute():
                    # hack for microsoft windows path length limit < 255
                    outfilename = pathlib.WindowsPath('\\\\?\\' + str(outfilename))
            if targets is not None and f.filename not in targets:
                self.worker.register_filelike(f.id, None)
                continue
            if f.is_directory:
                if not outfilename.exists():
                    target_dirs.append(outfilename)
                    target_files.append((outfilename, f.file_properties()))
                else:
                    pass
            elif f.is_socket:
                pass  # sockets cannot be restored from an archive
            elif return_dict:
                fname = outfilename.as_posix()
                _buf = io.BytesIO()
                self._dict[fname] = _buf
                self.worker.register_filelike(f.id, MemIO(_buf))
            elif f.is_symlink:
                target_sym.append(outfilename)
                try:
                    if outfilename.exists():
                        outfilename.unlink()
                except OSError as ose:
                    if ose.errno not in [errno.ENOENT]:
                        raise
                self.worker.register_filelike(f.id, outfilename)
            elif f.is_junction:
                target_junction.append(outfilename)
                self.worker.register_filelike(f.id, outfilename)
            else:
                self.worker.register_filelike(f.id, outfilename)
                target_files.append((outfilename, f.file_properties()))
        for target_dir in sorted(target_dirs):
            try:
                target_dir.mkdir()
            except FileExistsError:
                if target_dir.is_dir():
                    # skip rare case
                    pass
                elif target_dir.is_file():
                    raise Exception("Directory name is existed as a normal file.")
                else:
                    raise Exception("Directory making fails on unknown condition.")

        if callback is not None:
            self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed), q=self.q)
        else:
            self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed))

        self.q.put(('post', None, None))
        if return_dict:
            return self._dict
        else:
            # create symbolic links on target path as a working directory.
            # if path is None, work on current working directory.
            for t in target_sym:
                sym_dst = t.resolve()
                with sym_dst.open('rb') as b:
                    sym_src = b.read().decode(encoding='utf-8')  # symlink target name stored in utf-8
                sym_dst.unlink()  # unlink after close().
                sym_dst.symlink_to(pathlib.Path(sym_src))
            # create junction point only on windows platform
            if sys.platform.startswith('win'):
                for t in target_junction:
                    junction_dst = t.resolve()
                    with junction_dst.open('rb') as b:
                        junction_target = pathlib.Path(b.read().decode(encoding='utf-8'))
                        junction_dst.unlink()
                        _winapi.CreateJunction(junction_target, str(junction_dst))  # type: ignore  # noqa
            # set file properties
            for o, p in target_files:
                self._set_file_property(o, p)
            return None

    def reporter(self, callback: ExtractCallback):
        """Daemon loop draining the progress queue into callback methods; exits on None."""
        while True:
            try:
                item: Optional[Tuple[str, str, str]] = self.q.get(timeout=1)
            except queue.Empty:
                pass
            else:
                if item is None:
                    break
                elif item[0] == 's':
                    callback.report_start(item[1], item[2])
                elif item[0] == 'e':
                    callback.report_end(item[1], item[2])
                elif item[0] == 'pre':
                    callback.report_start_preparation()
                elif item[0] == 'post':
                    callback.report_postprocess()
                elif item[0] == 'w':
                    callback.report_warning(item[1])
                else:
                    pass
                self.q.task_done()

    def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None):
        """Write files in target path into archive."""
        if isinstance(path, str):
            path = pathlib.Path(path)
        if not path.exists():
            raise ValueError("specified path does not exist.")
        if path.is_dir() or path.is_file():
            self._writeall(path, arcname)
        else:
            raise ValueError("specified path is not a directory or a file")

    def _writeall(self, path, arcname):
        # Recursively register path (and its children for directories) for archiving.
        try:
            if path.is_symlink() and not self.dereference:
                self.write(path, arcname)
            elif path.is_file():
                self.write(path, arcname)
            elif path.is_dir():
                if not path.samefile('.'):
                    self.write(path, arcname)
                for nm in sorted(os.listdir(str(path))):
                    arc = os.path.join(arcname, nm) if arcname is not None else None
                    self._writeall(path.joinpath(nm), arc)
            else:
                return  # pathlib ignores ELOOP and return False for is_*().
        except OSError as ose:
            if self.dereference and ose.errno in [errno.ELOOP]:
                return  # ignore ELOOP here, this resulted to stop looped symlink reference.
            elif self.dereference and sys.platform == 'win32' and ose.errno in [errno.ENOENT]:
                return  # ignore ENOENT which is happened when a case of ELOOP on windows.
            else:
                raise

    def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None):
        """Write single target file into archive."""
        if isinstance(file, str):
            path = pathlib.Path(file)
        elif isinstance(file, pathlib.Path):
            path = file
        else:
            raise ValueError("Unsupported file type.")
        file_info = self._make_file_info(path, arcname, self.dereference)
        self.files.append(file_info)

    def close(self):
        """Flush all the data into archive and close it.
        When close py7zr start reading target and writing actual archive file.
        """
        if 'w' in self.mode:
            self._write_archive()
        if 'r' in self.mode:
            if self.reporterd is not None:
                self.q.put_nowait(None)
                self.reporterd.join(1)
                if self.reporterd.is_alive():
                    raise InternalError("Progress report thread terminate error.")
                self.reporterd = None
        self._fpclose()
        self._var_release()

    def reset(self) -> None:
        """When read mode, it reset file pointer, decompress worker and decompressor"""
        if self.mode == 'r':
            self._reset_worker()
            self._reset_decompressor()


# --------------------
# exported functions
# --------------------
def is_7zfile(file: Union[BinaryIO, str, pathlib.Path]) -> bool:
    """Quickly see if a file is a 7Z file by checking the magic number.
    The file argument may be a filename or file-like object too.
    """
    result = False
    try:
        if isinstance(file, io.IOBase) and hasattr(file, "read"):
            result = SevenZipFile._check_7zfile(file)  # type: ignore  # noqa
        elif isinstance(file, str):
            with open(file, 'rb') as fp:
                result = SevenZipFile._check_7zfile(fp)
        elif isinstance(file, pathlib.Path) or isinstance(file, pathlib.PosixPath) or \
                isinstance(file, pathlib.WindowsPath):
            with file.open(mode='rb') as fp:  # type: ignore  # noqa
                result = SevenZipFile._check_7zfile(fp)
        else:
            raise TypeError('invalid type: file should be str, pathlib.Path or BinaryIO, but {}'.format(type(file)))
    except OSError:
        pass
    return result


def unpack_7zarchive(archive, path, extra=None):
    """Function for registering with shutil.register_unpack_format()"""
    arc = SevenZipFile(archive)
    arc.extractall(path)
    arc.close()


def pack_7zarchive(base_name, base_dir, owner=None, group=None, dry_run=None, logger=None):
    """Function for registering with shutil.register_archive_format()"""
    target_name = '{}.7z'.format(base_name)
    archive = SevenZipFile(target_name, mode='w')
    archive.writeall(path=base_dir)
    archive.close()