1#!/usr/bin/python -u
2#
# py7zr library
4#
5# Copyright (c) 2019 Hiroshi Miura <miurahr@linux.com>
6# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
7#
8# This library is free software; you can redistribute it and/or
9# modify it under the terms of the GNU Lesser General Public
10# License as published by the Free Software Foundation; either
11# version 2.1 of the License, or (at your option) any later version.
12#
13# This library is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16# Lesser General Public License for more details.
17#
18# You should have received a copy of the GNU Lesser General Public
19# License along with this library; if not, write to the Free Software
20# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21#
22#
23
24import _hashlib  # type: ignore  # noqa
25import ctypes
26import os
27import pathlib
28import platform
29import sys
30import time as _time
31import zlib
32from datetime import datetime, timedelta, timezone, tzinfo
33from typing import BinaryIO, Optional, Union
34
35import py7zr.win32compat
36
37
def calculate_crc32(data: bytes, value: Optional[int] = None, blocksize: int = 1024 * 1024) -> int:
    """Calculate CRC32 of a bytes-like object of arbitrary length.

    The data is handed to zlib.crc32() in chunks of at most ``blocksize`` bytes.

    data: the bytes-like object to checksum.
    value: a running CRC32 to continue from; ``None`` (or 0) starts a fresh checksum.
    blocksize: maximum chunk size passed to zlib in one call; must be positive.

    Returns the checksum masked to an unsigned 32-bit integer.
    Raises ValueError when ``blocksize`` is not positive.
    """
    if blocksize <= 0:
        # A non-positive step would never advance through the data.
        raise ValueError('blocksize must be a positive integer.')
    # zlib.crc32() starts from 0 by default, so None and 0 are the same seed.
    crc = value if value else 0
    # Slice a memoryview instead of the data itself so chunking does not copy.
    view = memoryview(data)
    for pos in range(0, len(view), blocksize):
        crc = zlib.crc32(view[pos:pos + blocksize], crc)
    return crc & 0xffffffff
51
52
def _calculate_key1(password: bytes, cycles: int, salt: bytes, digest: str) -> bytes:
    """Calculate 7zip AES encryption key (pure Python variant).

    password: password bytes mixed into the key.
    cycles: exponent of the round count (``1 << cycles`` rounds); the value
        0x3f selects the unhashed key instead.
    salt: salt bytes placed in front of the password.
    digest: name of the digest method; only 'sha256' is accepted.

    Returns the 32 byte key.
    Raises ValueError when ``digest`` is not 'sha256'.
    """
    # Compare for equality: ``digest not in ('sha256')`` is a substring test
    # against a plain string and lets values such as '' or 'sha' through.
    if digest != 'sha256':
        raise ValueError('Unknown digest method for password protection.')
    assert cycles <= 0x3f
    if cycles == 0x3f:
        # No hashing: salt + password, zero padded and cut to 32 bytes.
        return bytes((salt + password + bytes(32))[:32])
    rounds = 1 << cycles
    m = _hashlib.new(digest)
    seed = salt + password  # identical in every round, build it once
    for counter in range(rounds):
        # Each round feeds salt, password and the 8 byte little-endian counter.
        m.update(seed + counter.to_bytes(8, byteorder='little', signed=False))
    return m.digest()[:32]
68
69
def _calculate_key2(password: bytes, cycles: int, salt: bytes, digest: str) -> bytes:
    """Calculate 7zip AES encryption key (ctypes variant).

    It utilizes a ctypes structure exposed through a memoryview, so the
    per-round hash input is updated in place instead of being rebuilt.

    password: password bytes mixed into the key.
    cycles: exponent of the round count (``1 << cycles`` rounds); the value
        0x3f selects the unhashed key instead.
    salt: salt bytes placed in front of the password.
    digest: name of the digest method; only 'sha256' is accepted.

    Returns the 32 byte key.
    Raises ValueError when ``digest`` is not 'sha256'.
    """
    # Compare for equality: ``digest not in ('sha256')`` is a substring test
    # against a plain string and lets values such as '' or 'sha' through.
    if digest != 'sha256':
        raise ValueError('Unknown digest method for password protection.')
    assert cycles <= 0x3f
    if cycles == 0x3f:
        # No hashing: salt + password, zero padded and cut to 32 bytes.
        return bytes((salt + password + bytes(32))[:32])
    rounds = 1 << cycles
    m = _hashlib.new(digest)
    seed = salt + password
    length = len(seed)

    class RoundBuf(ctypes.LittleEndianStructure):
        # Packed layout: the salt+password bytes directly followed by the
        # 64 bit little-endian round counter.
        _pack_ = 1
        _fields_ = [
            ('saltpassword', ctypes.c_ubyte * length),
            ('round', ctypes.c_uint64)
        ]

    buf = RoundBuf()
    for i, c in enumerate(seed):
        buf.saltpassword[i] = c
    buf.round = 0
    mv = memoryview(buf)  # type: ignore # noqa
    while buf.round < rounds:
        # The view reflects the structure, so only the counter changes per round.
        m.update(mv)
        buf.round += 1
    return m.digest()[:32]
100
101
# Select the key calculation routine for the running interpreter.
# PyPy uses the pure Python version to avoid
# https://foss.heptapod.net/pypy/pypy/issues/3209; everywhere else the
# ctypes version is used because it is 1.7-2.0 times faster.
calculate_key = _calculate_key1 if platform.python_implementation() == "PyPy" else _calculate_key2
106
107
def filetime_to_dt(ft):
    """Convert Windows NTFS file time into a timezone-aware (UTC) datetime object."""
    # File time value of 1970-01-01; dividing the remainder by 10 yields microseconds.
    unix_epoch_as_filetime = 116444736000000000
    unix_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    elapsed = timedelta(microseconds=(ft - unix_epoch_as_filetime) // 10)
    return unix_epoch + elapsed
113
114
ZERO = timedelta(0)
HOUR = timedelta(hours=1)
SECOND = timedelta(seconds=1)

# Offsets describing the platform's idea of local time, as reported by
# the time module.
# (May result in wrong values on historical times in
#  timezones where UTC offset and/or the DST rules had
#  changed in the past.)

STDOFFSET = timedelta(seconds=-_time.timezone)
# Without daylight saving time the DST offset is the standard offset.
DSTOFFSET = timedelta(seconds=-_time.altzone) if _time.daylight else STDOFFSET

DSTDIFF = DSTOFFSET - STDOFFSET
131
132
class LocalTimezone(tzinfo):
    """tzinfo implementation following the platform's idea of local time.

    Offsets are taken from the module level STDOFFSET, DSTOFFSET and DSTDIFF
    values; whether DST applies is decided by the time module.
    """

    def fromutc(self, dt):
        """Return the local time for ``dt``, whose fields hold a UTC time.

        ``dt`` must carry this very tzinfo instance.
        """
        assert dt.tzinfo is self
        # Both operands share the same tzinfo, so this is the plain number of
        # seconds between the fields of dt and 1970-01-01.
        stamp = (dt - datetime(1970, 1, 1, tzinfo=self)) // SECOND
        args = _time.localtime(stamp)[:6]
        # localtime() has second resolution; carry the microseconds over.
        # No fold information is attached to the result.
        return datetime(*args, microsecond=dt.microsecond, tzinfo=self)

    def utcoffset(self, dt):
        """Return DSTOFFSET when DST is in effect for ``dt``, else STDOFFSET."""
        if self._isdst(dt):
            return DSTOFFSET
        else:
            return STDOFFSET

    def dst(self, dt):
        """Return DSTDIFF when DST is in effect for ``dt``, else ZERO."""
        if self._isdst(dt):
            return DSTDIFF
        else:
            return ZERO

    def tzname(self, dt):
        """Return the matching entry of time.tzname for ``dt``."""
        # The boolean DST flag indexes the (standard, DST) name pair.
        return _time.tzname[self._isdst(dt)]

    def _isdst(self, dt):
        """Tell whether the platform reports DST for the wall clock time ``dt``."""
        # Round-trip the fields through mktime()/localtime() and read the
        # tm_isdst flag the platform filled in.
        tt = (dt.year, dt.month, dt.day,
              dt.hour, dt.minute, dt.second,
              dt.weekday(), 0, 0)
        stamp = _time.mktime(tt)
        tt = _time.localtime(stamp)
        return tt.tm_isdst > 0
166
167
# Module level LocalTimezone instance.
Local = LocalTimezone()
# Offset in seconds added to FILETIME based seconds (counted from 1601/01/01)
# to obtain a Python timestamp; see ArchiveTimestamp.totimestamp().
TIMESTAMP_ADJUST = -11644473600
170
171
class UTC(tzinfo):
    """UTC timezone with a zero offset and no daylight saving time.

    Instances are callable and return themselves.
    """

    def utcoffset(self, dt):
        """Return the zero offset."""
        return ZERO

    def tzname(self, dt):
        """Return the fixed name "UTC"."""
        return "UTC"

    def dst(self, dt):
        """Return the zero DST adjustment."""
        return ZERO

    def __call__(self):
        """Return this instance."""
        return self

    # The method used to be spelled ``_call__``, which never made instances
    # callable; the old name stays available for existing callers.
    _call__ = __call__
186
187
class ArchiveTimestamp(int):
    """Windows FILETIME timestamp."""

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%d)' % (cls_name, self)

    def totimestamp(self) -> float:
        """Convert 7z FILETIME to Python timestamp."""
        # FILETIME is 100-nanosecond intervals since 1601/01/01 (UTC),
        # so scale to seconds first and shift the origin afterwards.
        seconds = self / 10000000.0
        return seconds + TIMESTAMP_ADJUST

    def as_datetime(self):
        """Convert FILETIME to Python datetime object."""
        stamp = self.totimestamp()
        return datetime.fromtimestamp(stamp, UTC())

    @staticmethod
    def from_datetime(val):
        """Create an ArchiveTimestamp from ``val``.

        ``val`` is used arithmetically, mirroring totimestamp() in reverse;
        presumably it is a timestamp in seconds - verify against callers.
        """
        ticks = (val - TIMESTAMP_ADJUST) * 10000000.0
        return ArchiveTimestamp(ticks)
206
207
def islink(path):
    """
    Cross-platform islink implementation.
    Supports Windows NT symbolic links and reparse points.
    """
    link = os.path.islink(path)
    # Only Python older than 3.8 on Windows 6 or later needs the extra check;
    # everywhere else the answer of os.path.islink() is returned as is.
    needs_extra_check = (sys.version_info < (3, 8)
                         and sys.platform == "win32"
                         and sys.getwindowsversion()[0] >= 6)
    if not needs_extra_check:
        return link
    # special check for directory junctions which py38 does.
    if link and py7zr.win32compat.is_reparse_point(path):
        return False
    return link
221
222
def readlink(path: Union[str, bytes, pathlib.Path], *, dir_fd=None) -> Union[str, bytes, pathlib.Path]:
    """
    Cross-platform compat implementation of os.readlink and Path.readlink().
    Supports Windows NT symbolic links and reparse points.
    When called with path argument as pathlike(str), return result as a pathlike(str).
    When called with Path object, return also Path object.
    When called with path argument as bytes, return result as a bytes.

    dir_fd is handed to os.readlink() where that function is used.
    Raises OSError(22) on Windows with Python older than 3.8 when the path
    does not exist.
    """
    is_path_pathlib = isinstance(path, pathlib.Path)
    if sys.version_info >= (3, 9):
        if is_path_pathlib and dir_fd is None:
            return path.readlink()
        # Path.readlink() takes no dir_fd; go through os.readlink() and
        # restore the Path type below.
        res = os.readlink(path, dir_fd=dir_fd)
    elif sys.version_info >= (3, 8) or sys.platform != "win32":
        res = os.readlink(path, dir_fd=dir_fd)
        # Hack to handle a wrong type of results: decode bytes only when
        # the caller did not pass bytes in the first place.
        if isinstance(res, bytes) and not isinstance(path, bytes):
            res = os.fsdecode(res)
    elif not os.path.exists(str(path)):
        raise OSError(22, 'Invalid argument', path)
    else:
        return py7zr.win32compat.readlink(path)
    if is_path_pathlib:
        return pathlib.Path(res)
    return res
249
250
class MemIO:
    """pathlib.Path-like IO class to write memory(io.Bytes)

    Reads, writes and seeks are forwarded to the wrapped buffer; the
    path-like methods (open, parent, mkdir) are stand-ins.
    """

    def __init__(self, buf: BinaryIO):
        self._buf = buf

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def open(self, mode=None):
        # There is nothing to open; the object acts as its own file handle.
        return self

    @property
    def parent(self):
        return self

    def mkdir(self, parents=None, exist_ok=False):
        return None

    def write(self, data: bytes) -> int:
        written = self._buf.write(data)
        return written

    def read(self, length: Optional[int] = None) -> bytes:
        if length is None:
            return self._buf.read()
        return self._buf.read(length)

    def seek(self, position: int) -> None:
        self._buf.seek(position)

    def flush(self) -> None:
        pass

    def close(self) -> None:
        # The wrapped buffer stays open; it is only rewound to its start.
        self._buf.seek(0)
289
290
class NullIO:
    """pathlib.Path-like IO class of /dev/null

    Written data is discarded. The path-like methods (open, parent, mkdir)
    are stand-ins that accept the same arguments as their MemIO counterparts.
    """

    def __init__(self):
        pass

    def write(self, data):
        """Discard ``data`` and report its length as written."""
        return len(data)

    def read(self, length=None):
        """Return ``length`` zero bytes, or an empty bytes object without a length."""
        if length is not None:
            return bytes(length)
        else:
            return b''

    def close(self):
        pass

    def flush(self):
        pass

    def seek(self, position):
        """Accept and ignore a seek request, like MemIO.seek() returning None."""
        return None

    def open(self, mode=None):
        # There is nothing to open; the object acts as its own file handle.
        return self

    @property
    def parent(self):
        return self

    def mkdir(self, parents=None, exist_ok=False):
        """Do nothing; the arguments are accepted for parity with MemIO.mkdir()."""
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
327
328
class BufferOverflow(Exception):
    """Raised when data does not fit into the fixed capacity of a Buffer."""
331
332
class Buffer:
    """Fixed-capacity byte buffer.

    ``view`` is a memoryview over a copy of the bytes currently stored and
    is replaced on every change; len() gives the number of stored bytes.
    """

    def __init__(self, size: int = 16):
        self._size = size
        self._buf = bytearray(size)
        self._buflen = 0
        self._sync_view()

    def _sync_view(self) -> None:
        # Slicing the bytearray copies, so earlier views keep their content.
        self.view = memoryview(self._buf[0:self._buflen])

    def add(self, data: Union[bytes, bytearray, memoryview]):
        """Append ``data``; raise BufferOverflow when it does not fit."""
        incoming = len(data)
        end = self._buflen + incoming
        if end > self._size:
            raise BufferOverflow()
        self._buf[self._buflen:end] = data
        self._buflen = end
        self._sync_view()

    def reset(self) -> None:
        """Drop the stored bytes."""
        self._buflen = 0
        self._sync_view()

    def set(self, data: Union[bytes, bytearray, memoryview]) -> None:
        """Replace the stored bytes with ``data``; raise BufferOverflow when it does not fit."""
        incoming = len(data)
        if incoming > self._size:
            raise BufferOverflow()
        self._buf[0:incoming] = data
        self._buflen = incoming
        self._sync_view()

    def __len__(self) -> int:
        return self._buflen
363