1#!/usr/bin/python -u
2#
3# p7zr library
4#
5# Copyright (c) 2019,2020 Hiroshi Miura <miurahr@linux.com>
6# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
7# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
8# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
9#
10# This library is free software; you can redistribute it and/or
11# modify it under the terms of the GNU Lesser General Public
12# License as published by the Free Software Foundation; either
13# version 2.1 of the License, or (at your option) any later version.
14#
15# This library is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18# Lesser General Public License for more details.
19#
20# You should have received a copy of the GNU Lesser General Public
21# License along with this library; if not, write to the Free Software
22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23#
24#
25"""Read 7zip format archives."""
26import collections.abc
27import datetime
28import errno
29import functools
30import io
31import operator
32import os
33import queue
34import stat
35import sys
36import threading
37from io import BytesIO
38from typing import IO, Any, BinaryIO, Dict, List, Optional, Tuple, Union
39
40from py7zr.archiveinfo import Folder, Header, SignatureHeader
41from py7zr.callbacks import ExtractCallback
42from py7zr.compression import SevenZipCompressor, Worker, get_methods_names
43from py7zr.exceptions import Bad7zFile, InternalError
44from py7zr.helpers import ArchiveTimestamp, MemIO, calculate_crc32, filetime_to_dt
45from py7zr.properties import MAGIC_7Z, READ_BLOCKSIZE, ArchivePassword
46
47if sys.version_info < (3, 6):
48    import contextlib2 as contextlib
49    import pathlib2 as pathlib
50else:
51    import contextlib
52    import pathlib
53
54if sys.platform.startswith('win'):
55    import _winapi
56
# Flag bit in a member's 'attributes' value: when set, the bits above bit 16
# hold a unix st_mode (see ArchiveFile._get_unix_extension).
FILE_ATTRIBUTE_UNIX_EXTENSION = 0x8000
# Mask applied to os.stat_result.st_file_attributes before it is stored as a
# member's 'attributes' on Windows (see SevenZipFile._make_file_info).
FILE_ATTRIBUTE_WINDOWS_MASK = 0x04fff
59
60
class ArchiveFile:
    """Represent the metadata of a single member of an archive.

    It holds file properties: filename, permissions, and type, i.e. whether
    the member is a directory, a link or a normal file.

    Instances of the :class:`ArchiveFile` class are returned by iterating :attr:`files_list` of
    :class:`SevenZipFile` objects.
    Each object stores information about a single member of the 7z archive. Most users use :meth:`extractall()`.

    The class also holds the archive folder (container) in which the member
    is stored.
    """
    def __init__(self, id: int, file_info: Dict[str, Any]) -> None:
        """Wrap ``file_info`` (a dict of member properties) under the index ``id``."""
        self.id = id
        self._file_info = file_info

    def file_properties(self) -> Dict[str, Any]:
        """Return file properties as a dict.

        The following keys are included: 'readonly', 'is_directory',
        'posix_mode', 'archivable', 'emptystream', 'filename', 'creationtime',
        'lastaccesstime', 'lastwritetime', 'attributes'.

        The underlying info dict is updated in place with the derived
        'readonly', 'posix_mode', 'archivable' and 'is_directory' values and
        then returned itself (not a copy).
        """
        properties = self._file_info
        if properties is not None:
            properties['readonly'] = self.readonly
            properties['posix_mode'] = self.posix_mode
            properties['archivable'] = self.archivable
            properties['is_directory'] = self.is_directory
        return properties

    def _get_property(self, key: str) -> Any:
        """Return the stored value for ``key``, or None when it is not present."""
        try:
            return self._file_info[key]
        except KeyError:
            return None

    @property
    def origin(self) -> pathlib.Path:
        """Stored 'origin' value (the source path recorded when archiving), or None."""
        return self._get_property('origin')

    @property
    def folder(self) -> Folder:
        """Stored 'folder' value (the archive folder holding the member), or None."""
        return self._get_property('folder')

    @property
    def filename(self) -> str:
        """return filename of archive file."""
        return self._get_property('filename')

    @property
    def emptystream(self) -> bool:
        """True if file is empty(0-byte file), otherwise False"""
        return self._get_property('emptystream')

    @property
    def uncompressed(self) -> List[int]:
        """Stored 'uncompressed' value, or None when it is not present."""
        return self._get_property('uncompressed')

    @property
    def uncompressed_size(self) -> int:
        """Uncompressed file size.

        The stored 'uncompressed' value is normally a list of sizes, which is
        summed.  A bare integer is returned as it is, and a missing value or an
        empty list counts as 0 instead of raising TypeError.
        """
        sizes = self.uncompressed
        if sizes is None:
            return 0
        if isinstance(sizes, int):
            return sizes
        return functools.reduce(operator.add, sizes, 0)

    @property
    def compressed(self) -> Optional[int]:
        """Compressed size"""
        return self._get_property('compressed')

    def _test_attribute(self, target_bit: int) -> bool:
        """Return True when every bit of ``target_bit`` is set in 'attributes'.

        A member without an 'attributes' value never matches.
        """
        attributes = self._get_property('attributes')
        if attributes is None:
            return False
        return attributes & target_bit == target_bit

    @property
    def archivable(self) -> bool:
        """File has a Windows `archive` flag."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_ARCHIVE)  # type: ignore  # noqa

    @property
    def is_directory(self) -> bool:
        """True if file is a directory, otherwise False."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_DIRECTORY)  # type: ignore  # noqa

    @property
    def readonly(self) -> bool:
        """True if file is readonly, otherwise False."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_READONLY)  # type: ignore  # noqa

    def _get_unix_extension(self) -> Optional[int]:
        """Return the unix mode stored above bit 16 of 'attributes'.

        None is returned when the unix-extension flag is not set.
        """
        attributes = self._get_property('attributes')
        if self._test_attribute(FILE_ATTRIBUTE_UNIX_EXTENSION):
            return attributes >> 16
        return None

    @property
    def is_symlink(self) -> bool:
        """True if file is a symbolic link, otherwise False.

        The unix mode decides when present; otherwise the Windows
        reparse-point flag is used.
        """
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_ISLNK(e)
        return self._test_attribute(stat.FILE_ATTRIBUTE_REPARSE_POINT)  # type: ignore  # noqa

    @property
    def is_junction(self) -> bool:
        """True if file is a junction/reparse point on windows, otherwise False."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_REPARSE_POINT |  # type: ignore  # noqa
                                    stat.FILE_ATTRIBUTE_DIRECTORY)  # type: ignore  # noqa

    @property
    def is_socket(self) -> bool:
        """True if file is a socket, otherwise False."""
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_ISSOCK(e)
        return False

    @property
    def lastwritetime(self) -> Optional[ArchiveTimestamp]:
        """Return last written timestamp of a file."""
        return self._get_property('lastwritetime')

    @property
    def posix_mode(self) -> Optional[int]:
        """
        posix mode when a member has a unix extension property, or None
        :return: Return file stat mode can be set by os.chmod()
        """
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_IMODE(e)
        return None

    @property
    def st_fmt(self) -> Optional[int]:
        """
        :return: Return the portion of the file mode that describes the file type
        """
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_IFMT(e)
        return None
201
202
class ArchiveFileList(collections.abc.Iterable):
    """Iterable container of ArchiveFile.

    File info dicts are stored as they are appended; indexing or iterating
    wraps each one in an :class:`ArchiveFile` whose id is the list position
    plus ``offset``.
    """

    def __init__(self, offset: int = 0):
        """Create an empty list whose members are numbered starting at ``offset``."""
        self.files_list = []  # type: List[dict]
        self.index = 0
        self.offset = offset

    def append(self, file_info: Dict[str, Any]) -> None:
        """Add one file info dict to the end of the list."""
        self.files_list.append(file_info)

    def __len__(self) -> int:
        return len(self.files_list)

    def __iter__(self) -> 'ArchiveFileListIterator':
        return ArchiveFileListIterator(self)

    def __getitem__(self, index):
        """Return the member at ``index`` wrapped in an ArchiveFile.

        :raises IndexError: when ``index`` is negative or not below the length.
        """
        # ``>=``: an index equal to the length is already out of range.
        if index < 0 or index >= len(self.files_list):
            raise IndexError('ArchiveFileList index out of range')
        return ArchiveFile(index + self.offset, self.files_list[index])
227
228
class ArchiveFileListIterator(collections.abc.Iterator):
    """Iterator yielding the items of an indexable container from position 0 up to its length."""

    def __init__(self, archive_file_list):
        self._index = 0
        self._archive_file_list = archive_file_list

    def __next__(self) -> ArchiveFile:
        position = self._index
        if position == len(self._archive_file_list):
            raise StopIteration
        item = self._archive_file_list[position]
        # advance only after the lookup succeeded
        self._index = position + 1
        return item
241
242
243# ------------------
244# Exported Classes
245# ------------------
class ArchiveInfo:
    """Hold archive information.

    A plain value holder: every constructor argument is stored unchanged
    under an attribute of the same name.
    """

    def __init__(self, filename, size, header_size, method_names, solid, blocks, uncompressed):
        self.filename = filename
        self.size, self.header_size = size, header_size
        self.method_names = method_names
        self.solid, self.blocks = solid, blocks
        self.uncompressed = uncompressed
257
258
class FileInfo:
    """Hold archived file information.

    A plain value holder: every constructor argument is stored unchanged
    under an attribute of the same name.
    """

    def __init__(self, filename, compressed, uncompressed, archivable, is_directory, creationtime):
        self.filename = filename
        self.compressed, self.uncompressed = compressed, uncompressed
        self.archivable, self.is_directory = archivable, is_directory
        self.creationtime = creationtime
269
270
271class SevenZipFile(contextlib.AbstractContextManager):
272    """The SevenZipFile Class provides an interface to 7z archives."""
273
274    def __init__(self, file: Union[BinaryIO, str, pathlib.Path], mode: str = 'r',
275                 *, filters: Optional[str] = None, dereference=False, password: Optional[str] = None) -> None:
276        if mode not in ('r', 'w', 'x', 'a'):
277            raise ValueError("ZipFile requires mode 'r', 'w', 'x', or 'a'")
278        if password is not None:
279            if mode not in ('r'):
280                raise NotImplementedError("It has not been implemented to create archive with password.")
281            ArchivePassword(password)
282            self.password_protected = True
283        else:
284            self.password_protected = False
285        # Check if we were passed a file-like object or not
286        if isinstance(file, str):
287            self._filePassed = False  # type: bool
288            self.filename = file  # type: str
289            if mode == 'r':
290                self.fp = open(file, 'rb')  # type: BinaryIO
291            elif mode == 'w':
292                self.fp = open(file, 'w+b')
293            elif mode == 'x':
294                self.fp = open(file, 'x+b')
295            elif mode == 'a':
296                self.fp = open(file, 'r+b')
297            else:
298                raise ValueError("File open error.")
299            self.mode = mode
300        elif isinstance(file, pathlib.Path):
301            self._filePassed = False
302            self.filename = str(file)
303            if mode == 'r':
304                self.fp = file.open(mode='rb')  # type: ignore  # noqa   # typeshed issue: 2911
305            elif mode == 'w':
306                self.fp = file.open(mode='w+b')  # type: ignore  # noqa
307            elif mode == 'x':
308                self.fp = file.open(mode='x+b')  # type: ignore  # noqa
309            elif mode == 'a':
310                self.fp = file.open(mode='r+b')  # type: ignore  # noqa
311            else:
312                raise ValueError("File open error.")
313            self.mode = mode
314        elif isinstance(file, io.IOBase):
315            self._filePassed = True
316            self.fp = file
317            self.filename = getattr(file, 'name', None)
318            self.mode = mode  # type: ignore  #noqa
319        else:
320            raise TypeError("invalid file: {}".format(type(file)))
321        self._fileRefCnt = 1
322        try:
323            if mode == "r":
324                self._real_get_contents(self.fp)
325                self._reset_worker()
326            elif mode in 'w':
327                # FIXME: check filters here
328                self.folder = self._create_folder(filters)
329                self.files = ArchiveFileList()
330                self._prepare_write()
331                self._reset_worker()
332            elif mode in 'x':
333                raise NotImplementedError
334            elif mode == 'a':
335                raise NotImplementedError
336            else:
337                raise ValueError("Mode must be 'r', 'w', 'x', or 'a'")
338        except Exception as e:
339            self._fpclose()
340            raise e
341        self.encoded_header_mode = False
342        self._dict = {}  # type: Dict[str, IO[Any]]
343        self.dereference = dereference
344        self.reporterd = None  # type: Optional[threading.Thread]
345        self.q = queue.Queue()  # type: queue.Queue[Any]
346
    def __enter__(self):
        """Enter the ``with`` block; the archive object itself is the context value."""
        return self
349
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the archive when the ``with`` block ends.

        The exception arguments are ignored and nothing is returned, so an
        exception raised inside the block keeps propagating.
        """
        self.close()
352
353    def _create_folder(self, filters):
354        folder = Folder()
355        folder.compressor = SevenZipCompressor(filters)
356        folder.coders = folder.compressor.coders
357        folder.solid = True
358        folder.digestdefined = False
359        folder.bindpairs = []
360        folder.totalin = 1
361        folder.totalout = 1
362        return folder
363
364    def _fpclose(self) -> None:
365        assert self._fileRefCnt > 0
366        self._fileRefCnt -= 1
367        if not self._fileRefCnt and not self._filePassed:
368            self.fp.close()
369
370    def _real_get_contents(self, fp: BinaryIO) -> None:
371        if not self._check_7zfile(fp):
372            raise Bad7zFile('not a 7z file')
373        self.sig_header = SignatureHeader.retrieve(self.fp)
374        self.afterheader = self.fp.tell()
375        buffer = self._read_header_data()
376        header = Header.retrieve(self.fp, buffer, self.afterheader)
377        if header is None:
378            return
379        self.header = header
380        buffer.close()
381        self.files = ArchiveFileList()
382        if getattr(self.header, 'files_info', None) is not None:
383            self._filelist_retrieve()
384
385    def _read_header_data(self) -> BytesIO:
386        self.fp.seek(self.sig_header.nextheaderofs, os.SEEK_CUR)
387        buffer = io.BytesIO(self.fp.read(self.sig_header.nextheadersize))
388        if self.sig_header.nextheadercrc != calculate_crc32(buffer.getvalue()):
389            raise Bad7zFile('invalid header data')
390        return buffer
391
392    class ParseStatus:
393        def __init__(self, src_pos=0):
394            self.src_pos = src_pos
395            self.folder = 0  # 7zip folder where target stored
396            self.outstreams = 0  # output stream count
397            self.input = 0  # unpack stream count in each folder
398            self.stream = 0  # target input stream position
399
400    def _gen_filename(self) -> str:
401        # compressed file is stored without a name, generate one
402        try:
403            basefilename = self.filename
404        except AttributeError:
405            # 7z archive file doesn't have a name
406            return 'contents'
407        else:
408            if basefilename is not None:
409                fn, ext = os.path.splitext(os.path.basename(basefilename))
410                return fn
411            else:
412                return 'contents'
413
414    def _get_fileinfo_sizes(self, pstat, subinfo, packinfo, folder, packsizes, unpacksizes, file_in_solid, numinstreams):
415        if pstat.input == 0:
416            folder.solid = subinfo.num_unpackstreams_folders[pstat.folder] > 1
417        maxsize = (folder.solid and packinfo.packsizes[pstat.stream]) or None
418        uncompressed = unpacksizes[pstat.outstreams]
419        if not isinstance(uncompressed, (list, tuple)):
420            uncompressed = [uncompressed] * len(folder.coders)
421        if file_in_solid > 0:
422            compressed = None
423        elif pstat.stream < len(packsizes):  # file is compressed
424            compressed = packsizes[pstat.stream]
425        else:  # file is not compressed
426            compressed = uncompressed
427        packsize = packsizes[pstat.stream:pstat.stream + numinstreams]
428        return maxsize, compressed, uncompressed, packsize, folder.solid
429
    def _filelist_retrieve(self) -> None:
        """Fill ``self.files`` from the entries of ``self.header.files_info.files``.

        Every entry dict is extended in place with the keys 'folder',
        'maxsize', 'compressed', 'uncompressed' and 'packsizes' (plus 'digest'
        when one is defined) and then appended to ``self.files``.  Entries
        flagged 'emptystream', and all entries when the header has no main
        streams, get no folder and zeroed sizes.  Entries without a
        'filename' receive a generated one.
        """
        # Initialize references for convenience
        if hasattr(self.header, 'main_streams') and self.header.main_streams is not None:
            folders = self.header.main_streams.unpackinfo.folders
            packinfo = self.header.main_streams.packinfo
            subinfo = self.header.main_streams.substreamsinfo
            packsizes = packinfo.packsizes
            # fall back to the per-folder unpack sizes when the substreams info has none
            unpacksizes = subinfo.unpacksizes if subinfo.unpacksizes is not None else [x.unpacksizes for x in folders]
        else:
            # no main streams: every entry is handled by the else branch of the loop below
            subinfo = None
            folders = None
            packinfo = None
            packsizes = []
            unpacksizes = [0]

        pstat = self.ParseStatus()
        pstat.src_pos = self.afterheader
        file_in_solid = 0  # files already assigned to the current solid folder

        for file_id, file_info in enumerate(self.header.files_info.files):
            if not file_info['emptystream'] and folders is not None:
                folder = folders[pstat.folder]
                numinstreams = max([coder.get('numinstreams', 1) for coder in folder.coders])
                (maxsize, compressed, uncompressed,
                 packsize, solid) = self._get_fileinfo_sizes(pstat, subinfo, packinfo, folder, packsizes,
                                                             unpacksizes, file_in_solid, numinstreams)
                pstat.input += 1
                folder.solid = solid
                file_info['folder'] = folder
                file_info['maxsize'] = maxsize
                file_info['compressed'] = compressed
                file_info['uncompressed'] = uncompressed
                file_info['packsizes'] = packsize
                if subinfo.digestsdefined[pstat.outstreams]:
                    file_info['digest'] = subinfo.digests[pstat.outstreams]
                # NOTE(review): folder was taken from folders[pstat.folder] above, so this
                # branch only runs if that list contains a None entry.
                if folder is None:
                    pstat.src_pos += file_info['compressed']
                else:
                    if folder.solid:
                        file_in_solid += 1
                    pstat.outstreams += 1
                    if folder.files is None:
                        # first file of this folder: its members are numbered from this file's index
                        folder.files = ArchiveFileList(offset=file_id)
                    folder.files.append(file_info)
                    if pstat.input >= subinfo.num_unpackstreams_folders[pstat.folder]:
                        # all unpack streams of this folder are assigned: move on to the
                        # next folder and skip over the pack streams it used
                        file_in_solid = 0
                        pstat.src_pos += sum(packinfo.packsizes[pstat.stream:pstat.stream + numinstreams])
                        pstat.folder += 1
                        pstat.stream += numinstreams
                        pstat.input = 0
            else:
                file_info['folder'] = None
                file_info['maxsize'] = 0
                file_info['compressed'] = 0
                file_info['uncompressed'] = [0]
                file_info['packsizes'] = [0]

            if 'filename' not in file_info:
                file_info['filename'] = self._gen_filename()
            self.files.append(file_info)
490
491    def _num_files(self) -> int:
492        if getattr(self.header, 'files_info', None) is not None:
493            return len(self.header.files_info.files)
494        return 0
495
496    def _set_file_property(self, outfilename: pathlib.Path, properties: Dict[str, Any]) -> None:
497        # creation time
498        creationtime = ArchiveTimestamp(properties['lastwritetime']).totimestamp()
499        if creationtime is not None:
500            os.utime(str(outfilename), times=(creationtime, creationtime))
501        if os.name == 'posix':
502            st_mode = properties['posix_mode']
503            if st_mode is not None:
504                outfilename.chmod(st_mode)
505                return
506        # fallback: only set readonly if specified
507        if properties['readonly'] and not properties['is_directory']:
508            ro_mask = 0o777 ^ (stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
509            outfilename.chmod(outfilename.stat().st_mode & ro_mask)
510
511    def _reset_decompressor(self) -> None:
512        if self.header.main_streams is not None and self.header.main_streams.unpackinfo.numfolders > 0:
513            for i, folder in enumerate(self.header.main_streams.unpackinfo.folders):
514                folder.decompressor = None
515
516    def _reset_worker(self) -> None:
517        """Seek to where archive data start in archive and recreate new worker."""
518        self.fp.seek(self.afterheader)
519        self.worker = Worker(self.files, self.afterheader, self.header)
520
    def set_encoded_header_mode(self, mode: bool) -> None:
        """Store the flag that is passed as ``encoded`` to the header writer when the archive is written."""
        self.encoded_header_mode = mode
523
524    @staticmethod
525    def _check_7zfile(fp: Union[BinaryIO, io.BufferedReader]) -> bool:
526        result = MAGIC_7Z == fp.read(len(MAGIC_7Z))[:len(MAGIC_7Z)]
527        fp.seek(-len(MAGIC_7Z), 1)
528        return result
529
530    def _get_method_names(self) -> str:
531        methods_names = []  # type: List[str]
532        for folder in self.header.main_streams.unpackinfo.folders:
533            methods_names += get_methods_names(folder.coders)
534        return ', '.join(x for x in methods_names)
535
536    def _test_digest_raw(self, pos: int, size: int, crc: int) -> bool:
537        self.fp.seek(pos)
538        remaining_size = size
539        digest = None
540        while remaining_size > 0:
541            block = min(READ_BLOCKSIZE, remaining_size)
542            digest = calculate_crc32(self.fp.read(block), digest)
543            remaining_size -= block
544        return digest == crc
545
546    def _test_pack_digest(self) -> bool:
547        self._reset_worker()
548        crcs = self.header.main_streams.packinfo.crcs
549        if crcs is not None and len(crcs) > 0:
550            # check packed stream's crc
551            for i, p in enumerate(self.header.main_streams.packinfo.packpositions):
552                if not self._test_digest_raw(p, self.header.main_streams.packinfo.packsizes[i], crcs[i]):
553                    return False
554        return True
555
556    def _test_unpack_digest(self) -> bool:
557        self._reset_worker()
558        for f in self.files:
559            self.worker.register_filelike(f.id, None)
560        try:
561            self.worker.extract(self.fp, parallel=(not self.password_protected))  # TODO: print progress
562        except Bad7zFile:
563            return False
564        else:
565            return True
566
567    def _test_digests(self) -> bool:
568        if self._test_pack_digest():
569            if self._test_unpack_digest():
570                return True
571        return False
572
573    def _prepare_write(self) -> None:
574        self.sig_header = SignatureHeader()
575        self.sig_header._write_skelton(self.fp)
576        self.afterheader = self.fp.tell()
577        self.folder.totalin = 1
578        self.folder.totalout = 1
579        self.folder.bindpairs = []
580        self.folder.unpacksizes = []
581        self.header = Header.build_header([self.folder])
582
583    def _write_archive(self):
584        self.worker.archive(self.fp, self.folder, deref=self.dereference)
585        # Write header and update signature header
586        (header_pos, header_len, header_crc) = self.header.write(self.fp, self.afterheader,
587                                                                 encoded=self.encoded_header_mode)
588        self.sig_header.nextheaderofs = header_pos - self.afterheader
589        self.sig_header.calccrc(header_len, header_crc)
590        self.sig_header.write(self.fp)
591        return
592
593    def _is_solid(self):
594        for f in self.header.main_streams.substreamsinfo.num_unpackstreams_folders:
595            if f > 1:
596                return True
597        return False
598
599    def _var_release(self):
600        self._dict = None
601        self.files = None
602        self.folder = None
603        self.header = None
604        self.worker = None
605        self.sig_header = None
606
607    @staticmethod
608    def _make_file_info(target: pathlib.Path, arcname: Optional[str] = None, dereference=False) -> Dict[str, Any]:
609        f = {}  # type: Dict[str, Any]
610        f['origin'] = target
611        if arcname is not None:
612            f['filename'] = pathlib.Path(arcname).as_posix()
613        else:
614            f['filename'] = target.as_posix()
615        if os.name == 'nt':
616            fstat = target.lstat()
617            if target.is_symlink():
618                if dereference:
619                    fstat = target.stat()
620                    if stat.S_ISDIR(fstat.st_mode):
621                        f['emptystream'] = True
622                        f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK  # type: ignore  # noqa
623                    else:
624                        f['emptystream'] = False
625                        f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE  # type: ignore  # noqa
626                        f['uncompressed'] = fstat.st_size
627                else:
628                    f['emptystream'] = False
629                    f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK  # type: ignore  # noqa
630                    # f['attributes'] |= stat.FILE_ATTRIBUTE_REPARSE_POINT  # type: ignore  # noqa
631            elif target.is_dir():
632                f['emptystream'] = True
633                f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK  # type: ignore  # noqa
634            elif target.is_file():
635                f['emptystream'] = False
636                f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE  # type: ignore  # noqa
637                f['uncompressed'] = fstat.st_size
638        else:
639            fstat = target.lstat()
640            if target.is_symlink():
641                if dereference:
642                    fstat = target.stat()
643                    if stat.S_ISDIR(fstat.st_mode):
644                        f['emptystream'] = True
645                        f['attributes'] = stat.FILE_ATTRIBUTE_DIRECTORY  # type: ignore  # noqa
646                        f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFDIR << 16)
647                        f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
648                    else:
649                        f['emptystream'] = False
650                        f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE  # type: ignore  # noqa
651                        f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IMODE(fstat.st_mode) << 16)
652                else:
653                    f['emptystream'] = False
654                    f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE | stat.FILE_ATTRIBUTE_REPARSE_POINT # type: ignore  # noqa
655                    f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFLNK << 16)
656                    f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
657            elif target.is_dir():
658                f['emptystream'] = True
659                f['attributes'] = stat.FILE_ATTRIBUTE_DIRECTORY  # type: ignore  # noqa
660                f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFDIR << 16)
661                f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
662            elif target.is_file():
663                f['emptystream'] = False
664                f['uncompressed'] = fstat.st_size
665                f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE  # type: ignore  # noqa
666                f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IMODE(fstat.st_mode) << 16)
667
668        f['creationtime'] = fstat.st_ctime
669        f['lastwritetime'] = fstat.st_mtime
670        f['lastaccesstime'] = fstat.st_atime
671        return f
672
673    # --------------------------------------------------------------------------
674    # The public methods which SevenZipFile provides:
675    def getnames(self) -> List[str]:
676        """Return the members of the archive as a list of their names. It has
677           the same order as the list returned by getmembers().
678        """
679        return list(map(lambda x: x.filename, self.files))
680
681    def archiveinfo(self) -> ArchiveInfo:
682        fstat = os.stat(self.filename)
683        uncompressed = 0
684        for f in self.files:
685            uncompressed += f.uncompressed_size
686        return ArchiveInfo(self.filename, fstat.st_size, self.header.size, self._get_method_names(),
687                           self._is_solid(), len(self.header.main_streams.unpackinfo.folders),
688                           uncompressed)
689
690    def list(self) -> List[FileInfo]:
691        """Returns contents information """
692        alist = []  # type: List[FileInfo]
693        creationtime = None  # type: Optional[datetime.datetime]
694        for f in self.files:
695            if f.lastwritetime is not None:
696                creationtime = filetime_to_dt(f.lastwritetime)
697            alist.append(FileInfo(f.filename, f.compressed, f.uncompressed_size, f.archivable, f.is_directory,
698                                  creationtime))
699        return alist
700
    def test(self) -> bool:
        """Test archive using CRC digests.

        :return: True when both the packed-stream and the unpack checks pass.
        """
        return self._test_digests()
704
    def readall(self) -> Optional[Dict[str, IO[Any]]]:
        """Extract every member without a destination path and return the resulting dict.

        Runs the extraction with ``path=None`` and ``return_dict=True``.
        """
        return self._extract(path=None, return_dict=True)
707
    def extractall(self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None) -> None:
        """Extract all members from the archive to the current working
           directory and set owner, modification time and permissions on
           directories afterwards. `path` specifies a different directory
           to extract to.

           :param path: destination directory, as a ``str`` or path object.
           :param callback: optional ExtractCallback instance; any other
               non-None value makes the extraction raise ValueError.
        """
        self._extract(path=path, return_dict=False, callback=callback)
715
    def read(self, targets: Optional[List[str]] = None) -> Optional[Dict[str, IO[Any]]]:
        """Extract members without a destination path and return the resulting dict.

        :param targets: file names of the members to read; members whose name
            is not listed are skipped.  None selects every member.
        """
        return self._extract(path=None, targets=targets, return_dict=True)
718
    def extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None) -> None:
        """Extract members of the archive to ``path``.

        :param path: destination directory, as a ``str`` or path object; None
            means no destination directory is prepended to member names.
        :param targets: file names of the members to extract; members whose
            name is not listed are skipped.  None selects every member.
        """
        self._extract(path, targets, return_dict=False)
721
722    def _extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None,
723                 return_dict: bool = False, callback: Optional[ExtractCallback] = None) -> Optional[Dict[str, IO[Any]]]:
724        if callback is not None and not isinstance(callback, ExtractCallback):
725            raise ValueError('Callback specified is not a subclass of py7zr.callbacks.ExtractCallback class')
726        elif callback is not None:
727            self.reporterd = threading.Thread(target=self.reporter, args=(callback,), daemon=True)
728            self.reporterd.start()
729        target_junction = []  # type: List[pathlib.Path]
730        target_sym = []  # type: List[pathlib.Path]
731        target_files = []  # type: List[Tuple[pathlib.Path, Dict[str, Any]]]
732        target_dirs = []  # type: List[pathlib.Path]
733        if path is not None:
734            if isinstance(path, str):
735                path = pathlib.Path(path)
736            try:
737                if not path.exists():
738                    path.mkdir(parents=True)
739                else:
740                    pass
741            except OSError as e:
742                if e.errno == errno.EEXIST and path.is_dir():
743                    pass
744                else:
745                    raise e
746        fnames = []  # type: List[str]  # check duplicated filename in one archive?
747        self.q.put(('pre', None, None))
748        for f in self.files:
749            # TODO: sanity check
750            # check whether f.filename with invalid characters: '../'
751            if f.filename.startswith('../'):
752                raise Bad7zFile
753            # When archive has a multiple files which have same name
754            # To guarantee order of archive, multi-thread decompression becomes off.
755            # Currently always overwrite by latter archives.
756            # TODO: provide option to select overwrite or skip.
757            if f.filename not in fnames:
758                outname = f.filename
759            else:
760                i = 0
761                while True:
762                    outname = f.filename + '_%d' % i
763                    if outname not in fnames:
764                        break
765            fnames.append(outname)
766            if path is not None:
767                outfilename = path.joinpath(outname)
768            else:
769                outfilename = pathlib.Path(outname)
770            if os.name == 'nt':
771                if outfilename.is_absolute():
772                    # hack for microsoft windows path length limit < 255
773                    outfilename = pathlib.WindowsPath('\\\\?\\' + str(outfilename))
774            if targets is not None and f.filename not in targets:
775                self.worker.register_filelike(f.id, None)
776                continue
777            if f.is_directory:
778                if not outfilename.exists():
779                    target_dirs.append(outfilename)
780                    target_files.append((outfilename, f.file_properties()))
781                else:
782                    pass
783            elif f.is_socket:
784                pass
785            elif return_dict:
786                fname = outfilename.as_posix()
787                _buf = io.BytesIO()
788                self._dict[fname] = _buf
789                self.worker.register_filelike(f.id, MemIO(_buf))
790            elif f.is_symlink:
791                target_sym.append(outfilename)
792                try:
793                    if outfilename.exists():
794                        outfilename.unlink()
795                except OSError as ose:
796                    if ose.errno not in [errno.ENOENT]:
797                        raise
798                self.worker.register_filelike(f.id, outfilename)
799            elif f.is_junction:
800                target_junction.append(outfilename)
801                self.worker.register_filelike(f.id, outfilename)
802            else:
803                self.worker.register_filelike(f.id, outfilename)
804                target_files.append((outfilename, f.file_properties()))
805        for target_dir in sorted(target_dirs):
806            try:
807                target_dir.mkdir()
808            except FileExistsError:
809                if target_dir.is_dir():
810                    # skip rare case
811                    pass
812                elif target_dir.is_file():
813                    raise Exception("Directory name is existed as a normal file.")
814                else:
815                    raise Exception("Directory making fails on unknown condition.")
816
817        if callback is not None:
818            self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed), q=self.q)
819        else:
820            self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed))
821
822        self.q.put(('post', None, None))
823        if return_dict:
824            return self._dict
825        else:
826            # create symbolic links on target path as a working directory.
827            # if path is None, work on current working directory.
828            for t in target_sym:
829                sym_dst = t.resolve()
830                with sym_dst.open('rb') as b:
831                    sym_src = b.read().decode(encoding='utf-8')  # symlink target name stored in utf-8
832                sym_dst.unlink()  # unlink after close().
833                sym_dst.symlink_to(pathlib.Path(sym_src))
834            # create junction point only on windows platform
835            if sys.platform.startswith('win'):
836                for t in target_junction:
837                    junction_dst = t.resolve()
838                    with junction_dst.open('rb') as b:
839                        junction_target = pathlib.Path(b.read().decode(encoding='utf-8'))
840                        junction_dst.unlink()
841                        _winapi.CreateJunction(junction_target, str(junction_dst))  # type: ignore  # noqa
842            # set file properties
843            for o, p in target_files:
844                self._set_file_property(o, p)
845            return None
846
847    def reporter(self, callback: ExtractCallback):
848        while True:
849            try:
850                item: Optional[Tuple[str, str, str]] = self.q.get(timeout=1)
851            except queue.Empty:
852                pass
853            else:
854                if item is None:
855                    break
856                elif item[0] == 's':
857                    callback.report_start(item[1], item[2])
858                elif item[0] == 'e':
859                    callback.report_end(item[1], item[2])
860                elif item[0] == 'pre':
861                    callback.report_start_preparation()
862                elif item[0] == 'post':
863                    callback.report_postprocess()
864                elif item[0] == 'w':
865                    callback.report_warning(item[1])
866                else:
867                    pass
868                self.q.task_done()
869
870    def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None):
871        """Write files in target path into archive."""
872        if isinstance(path, str):
873            path = pathlib.Path(path)
874        if not path.exists():
875            raise ValueError("specified path does not exist.")
876        if path.is_dir() or path.is_file():
877            self._writeall(path, arcname)
878        else:
879            raise ValueError("specified path is not a directory or a file")
880
881    def _writeall(self, path, arcname):
882        try:
883            if path.is_symlink() and not self.dereference:
884                self.write(path, arcname)
885            elif path.is_file():
886                self.write(path, arcname)
887            elif path.is_dir():
888                if not path.samefile('.'):
889                    self.write(path, arcname)
890                for nm in sorted(os.listdir(str(path))):
891                    arc = os.path.join(arcname, nm) if arcname is not None else None
892                    self._writeall(path.joinpath(nm), arc)
893            else:
894                return  # pathlib ignores ELOOP and return False for is_*().
895        except OSError as ose:
896            if self.dereference and ose.errno in [errno.ELOOP]:
897                return  # ignore ELOOP here, this resulted to stop looped symlink reference.
898            elif self.dereference and sys.platform == 'win32' and ose.errno in [errno.ENOENT]:
899                return  # ignore ENOENT which is happened when a case of ELOOP on windows.
900            else:
901                raise
902
903    def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None):
904        """Write single target file into archive(Not implemented yet)."""
905        if isinstance(file, str):
906            path = pathlib.Path(file)
907        elif isinstance(file, pathlib.Path):
908            path = file
909        else:
910            raise ValueError("Unsupported file type.")
911        file_info = self._make_file_info(path, arcname, self.dereference)
912        self.files.append(file_info)
913
914    def close(self):
915        """Flush all the data into archive and close it.
916        When close py7zr start reading target and writing actual archive file.
917        """
918        if 'w' in self.mode:
919            self._write_archive()
920        if 'r' in self.mode:
921            if self.reporterd is not None:
922                self.q.put_nowait(None)
923                self.reporterd.join(1)
924                if self.reporterd.is_alive():
925                    raise InternalError("Progress report thread terminate error.")
926                self.reporterd = None
927        self._fpclose()
928        self._var_release()
929
930    def reset(self) -> None:
931        """When read mode, it reset file pointer, decompress worker and decompressor"""
932        if self.mode == 'r':
933            self._reset_worker()
934            self._reset_decompressor()
935
936
937# --------------------
938# exported functions
939# --------------------
def is_7zfile(file: Union[BinaryIO, str, pathlib.Path]) -> bool:
    """Tell whether ``file`` passes the 7z signature check.

    :param file: a readable ``io.IOBase`` object, a file name, or a
        pathlib.Path.
    :return: the result of the signature check; False when the file cannot be
        opened or read because of an OSError.
    :raises TypeError: when ``file`` is of none of the accepted types.
    """
    try:
        if isinstance(file, io.IOBase) and hasattr(file, "read"):
            return SevenZipFile._check_7zfile(file)  # type: ignore  # noqa
        if isinstance(file, str):
            with open(file, 'rb') as stream:
                return SevenZipFile._check_7zfile(stream)
        # PosixPath and WindowsPath are subclasses of Path, one check covers them.
        if isinstance(file, pathlib.Path):
            with file.open(mode='rb') as stream:  # type: ignore  # noqa
                return SevenZipFile._check_7zfile(stream)
    except OSError:
        return False
    raise TypeError('invalid type: file should be str, pathlib.Path or BinaryIO, but {}'.format(type(file)))
960
961
def unpack_7zarchive(archive, path, extra=None):
    """Function for registering with shutil.register_unpack_format()

    Opens ``archive``, extracts every member into ``path`` and closes the
    archive again, also when extraction raises.

    :param archive: archive to open, passed to SevenZipFile.
    :param path: directory to extract into.
    :param extra: accepted for the shutil callback signature; unused.
    """
    arc = SevenZipFile(archive)
    try:
        arc.extractall(path)
    finally:
        # Release the file handle even when extraction fails.
        arc.close()
967
968
def pack_7zarchive(base_name, base_dir, owner=None, group=None, dry_run=None, logger=None):
    """Function for registering with shutil.register_archive_format()

    Creates ``<base_name>.7z`` and writes ``base_dir`` into it.

    :param base_name: archive file name without the ``.7z`` extension.
    :param base_dir: file or directory to add to the archive.
    :param owner: accepted for the shutil callback signature; unused.
    :param group: accepted for the shutil callback signature; unused.
    :param dry_run: accepted for the shutil callback signature; unused.
    :param logger: accepted for the shutil callback signature; unused.
    :return: name of the archive file that was created; shutil.make_archive()
        hands this value back to its caller.
    """
    target_name = '{}.7z'.format(base_name)
    archive = SevenZipFile(target_name, mode='w')
    archive.writeall(path=base_dir)
    archive.close()
    return target_name
975