#!/usr/bin/python -u
#
# p7zr library
#
# Copyright (c) 2019 Hiroshi Miura <miurahr@linux.com>
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
#

import _hashlib  # type: ignore  # noqa
import ctypes
import os
import pathlib
import platform
import sys
import time as _time
import zlib
from datetime import datetime, timedelta, timezone, tzinfo
from typing import BinaryIO, Optional, Union

import py7zr.win32compat


def calculate_crc32(data: bytes, value: Optional[int] = None, blocksize: int = 1024 * 1024) -> int:
    """Calculate the CRC32 of ``data``, feeding zlib at most ``blocksize`` bytes per call.

    :param data: bytes to checksum; may be empty.
    :param value: running CRC to continue from; ``None`` or ``0`` starts a fresh checksum.
    :param blocksize: number of bytes handed to ``zlib.crc32`` per call; must be positive.
    :return: the CRC32 masked to an unsigned 32-bit integer.
    :raises ValueError: if ``blocksize`` is not positive.
    """
    if blocksize <= 0:
        # A non-positive step would never advance through ``data``.
        raise ValueError('blocksize must be a positive integer.')
    # zlib.crc32(data) is zlib.crc32(data, 0), so a missing start value is simply 0.
    crc = value if value else 0
    for pos in range(0, len(data), blocksize):
        crc = zlib.crc32(data[pos:pos + blocksize], crc)
    return crc & 0xffffffff


def _calculate_key1(password: bytes, cycles: int, salt: bytes, digest: str) -> bytes:
    """Calculate 7zip AES encryption key (pure-Python variant).

    :param password: password bytes.
    :param cycles: power-of-two exponent of the number of hash rounds; ``0x3f`` selects
        the no-hash case where the key is ``salt + password`` zero-padded/truncated to 32 bytes.
    :param salt: salt bytes prepended to the password in every round.
    :param digest: digest name; only ``'sha256'`` is accepted.
    :return: the 32-byte key.
    :raises ValueError: if ``digest`` is not a supported digest name.
    """
    # Must be a real tuple: ``in 'sha256'`` would be a substring test and let
    # values such as 'sha' or '' through.
    if digest not in ('sha256',):
        raise ValueError('Unknown digest method for password protection.')
    assert cycles <= 0x3f
    if cycles == 0x3f:
        return bytes(bytearray(salt + password + bytes(32))[:32])
    m = _hashlib.new(digest)
    for counter in range(1 << cycles):
        # Each round hashes salt + password + 64-bit little-endian round counter.
        m.update(salt + password + counter.to_bytes(8, byteorder='little', signed=False))
    return m.digest()[:32]


def _calculate_key2(password: bytes, cycles: int, salt: bytes, digest: str) -> bytes:
    """Calculate 7zip AES encryption key.

    Same contract as ``_calculate_key1``; it builds the hashed record once in a
    ctypes structure and feeds it to the digest through a memoryview, so no new
    bytes object is created per round.

    :param password: password bytes.
    :param cycles: power-of-two exponent of the number of hash rounds; ``0x3f`` selects
        the no-hash case where the key is ``salt + password`` zero-padded/truncated to 32 bytes.
    :param salt: salt bytes prepended to the password in every round.
    :param digest: digest name; only ``'sha256'`` is accepted.
    :return: the 32-byte key.
    :raises ValueError: if ``digest`` is not a supported digest name.
    """
    # Must be a real tuple: ``in 'sha256'`` would be a substring test and let
    # values such as 'sha' or '' through.
    if digest not in ('sha256',):
        raise ValueError('Unknown digest method for password protection.')
    assert cycles <= 0x3f
    if cycles == 0x3f:
        return bytes(bytearray(salt + password + bytes(32))[:32])
    rounds = 1 << cycles
    m = _hashlib.new(digest)
    length = len(salt) + len(password)

    class RoundBuf(ctypes.LittleEndianStructure):
        # Packed so the 64-bit counter directly follows salt + password.
        _pack_ = 1
        _fields_ = [
            ('saltpassword', ctypes.c_ubyte * length),
            ('round', ctypes.c_uint64)
        ]

    buf = RoundBuf()
    for i, c in enumerate(salt + password):
        buf.saltpassword[i] = c
    buf.round = 0
    mv = memoryview(buf)  # type: ignore # noqa
    while buf.round < rounds:
        # The view reflects the in-place counter update; nothing is re-allocated.
        m.update(mv)
        buf.round += 1
    return m.digest()[:32]


if platform.python_implementation() == "PyPy":
    calculate_key = _calculate_key1  # Avoid https://foss.heptapod.net/pypy/pypy/issues/3209
else:
    calculate_key = _calculate_key2  # ver2 is 1.7-2.0 times faster than ver1


def filetime_to_dt(ft):
    """Convert Windows NTFS file time into python datetime object.

    :param ft: integer count of 100-nanosecond intervals (FILETIME).
    :return: timezone-aware UTC ``datetime``; sub-microsecond precision is floored away.
    """
    EPOCH_AS_FILETIME = 116444736000000000
    us = (ft - EPOCH_AS_FILETIME) // 10  # 100 ns units -> microseconds
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(microseconds=us)


ZERO = timedelta(0)
HOUR = timedelta(hours=1)
SECOND = timedelta(seconds=1)

# A class capturing the platform's idea of local time.
# (May result in wrong values on historical times in
# timezones where UTC offset and/or the DST rules had
# changed in the past.)

STDOFFSET = timedelta(seconds=-_time.timezone)
if _time.daylight:
    DSTOFFSET = timedelta(seconds=-_time.altzone)
else:
    DSTOFFSET = STDOFFSET

DSTDIFF = DSTOFFSET - STDOFFSET


class LocalTimezone(tzinfo):
    """tzinfo built from the ``time`` module's local-timezone data."""

    def fromutc(self, dt):
        """Convert ``dt``, holding UTC fields and tagged with this tzinfo, to local time."""
        assert dt.tzinfo is self
        stamp = (dt - datetime(1970, 1, 1, tzinfo=self)) // SECOND
        args = _time.localtime(stamp)[:6]
        dst_diff = DSTDIFF // SECOND
        # Detect fold: the same wall-clock reading already occurred dst_diff
        # seconds earlier.  Both sides are cut to six fields because a 6-tuple
        # never equals a full struct_time, and a zero DST difference is skipped
        # because it would compare the instant with itself.
        fold = bool(dst_diff) and args == _time.localtime(stamp - dst_diff)[:6]
        return datetime(*args, microsecond=dt.microsecond, tzinfo=self, fold=int(fold))

    def utcoffset(self, dt):
        """Return the DST or standard UTC offset, whichever applies to ``dt``."""
        if self._isdst(dt):
            return DSTOFFSET
        else:
            return STDOFFSET

    def dst(self, dt):
        """Return the DST adjustment for ``dt`` (zero when DST is not in effect)."""
        if self._isdst(dt):
            return DSTDIFF
        else:
            return ZERO

    def tzname(self, dt):
        """Return the platform's timezone name matching the DST state of ``dt``."""
        return _time.tzname[self._isdst(dt)]

    def _isdst(self, dt):
        """Return True when the platform reports DST for the wall-clock fields of ``dt``."""
        tt = (dt.year, dt.month, dt.day,
              dt.hour, dt.minute, dt.second,
              dt.weekday(), 0, 0)
        # Round-trip through mktime/localtime so the C library decides tm_isdst.
        stamp = _time.mktime(tt)
        tt = _time.localtime(stamp)
        return tt.tm_isdst > 0


Local = LocalTimezone()
# Seconds added to a FILETIME-in-seconds value (epoch 1601-01-01) to obtain a Unix timestamp.
TIMESTAMP_ADJUST = -11644473600


class UTC(tzinfo):
    """UTC"""

    def utcoffset(self, dt):
        return ZERO

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return ZERO

    def __call__(self):
        """Return the instance itself, so an instance can stand in for the class."""
        return self

    # Former (misspelled) name of ``__call__``, kept for backward compatibility.
    _call__ = __call__


class ArchiveTimestamp(int):
    """Windows FILETIME timestamp."""

    def __repr__(self):
        return '%s(%d)' % (type(self).__name__, self)

    def totimestamp(self) -> float:
        """Convert 7z FILETIME to Python timestamp."""
        # FILETIME is 100-nanosecond intervals since 1601/01/01 (UTC)
        return (self / 10000000.0) + TIMESTAMP_ADJUST

    def as_datetime(self):
        """Convert FILETIME to Python datetime object."""
        return datetime.fromtimestamp(self.totimestamp(), UTC())

    @staticmethod
    def from_datetime(val):
        """Build an ArchiveTimestamp from a Unix timestamp or a ``datetime``.

        :param val: seconds since the Unix epoch (int or float), or a ``datetime``
            object, which is converted with ``datetime.timestamp()`` first.
        :return: the corresponding FILETIME value as an ``ArchiveTimestamp``.
        """
        if isinstance(val, datetime):
            val = val.timestamp()
        return ArchiveTimestamp((val - TIMESTAMP_ADJUST) * 10000000.0)


def islink(path):
    """
    Cross-platform islink implementation.
    Supports Windows NT symbolic links and reparse points.

    On Python 3.8+, on non-Windows platforms, or on Windows with a major
    version below 6, this is exactly ``os.path.islink(path)``.  Otherwise a
    path that ``py7zr.win32compat.is_reparse_point`` reports as a reparse
    point is treated as not being a symbolic link.
    """
    is_symlink = os.path.islink(path)
    if sys.version_info >= (3, 8) or sys.platform != "win32" or sys.getwindowsversion()[0] < 6:
        return is_symlink
    # special check for directory junctions which py38 does.
    if is_symlink:
        if py7zr.win32compat.is_reparse_point(path):
            is_symlink = False
    return is_symlink


def readlink(path: Union[str, pathlib.Path], *, dir_fd=None) -> Union[str, pathlib.Path]:
    """
    Cross-platform compat implementation of os.readlink and Path.readlink().
    Supports Windows NT symbolic links and reparse points.
    When called with path argument as pathlike(str), return result as a pathlike(str).
    When called with Path object, return also Path object.

    On Python 3.9+ the call is delegated to ``Path.readlink()`` (Path argument
    without ``dir_fd``) or to ``os.readlink()`` and the result is returned as is.
    On Python 3.8, or on older Python off Windows, a bytes result from
    ``os.readlink()`` is decoded with ``os.fsdecode()`` before being returned.
    On older Python on Windows the lookup is done by ``py7zr.win32compat.readlink``
    and ``dir_fd`` is not used.

    :raises OSError: (errno 22) on the Windows fallback path when ``path`` does not exist.
    """
    is_path_pathlib = isinstance(path, pathlib.Path)
    if sys.version_info >= (3, 9):
        if is_path_pathlib and dir_fd is None:
            return path.readlink()
        else:
            return os.readlink(path, dir_fd=dir_fd)
    elif sys.version_info >= (3, 8) or sys.platform != "win32":
        res = os.readlink(path, dir_fd=dir_fd)
        # Hack to handle a wrong type of results
        if isinstance(res, bytes):
            res = os.fsdecode(res)
        if is_path_pathlib:
            return pathlib.Path(res)
        else:
            return res
    elif not os.path.exists(str(path)):
        raise OSError(22, 'Invalid argument', path)
    return py7zr.win32compat.readlink(path)


class MemIO:
    """pathlib.Path-like IO class to write memory(io.Bytes)"""

    def __init__(self, buf: BinaryIO):
        self._buf = buf

    def write(self, data: bytes) -> int:
        return self._buf.write(data)

    def read(self, length: Optional[int] = None) -> bytes:
        if length is not None:
            return self._buf.read(length)
        else:
            return self._buf.read()

    def close(self) -> None:
        # The wrapped buffer is rewound instead of closed, so it stays readable.
        self._buf.seek(0)

    def flush(self) -> None:
        pass

    def seek(self, position: int) -> None:
        self._buf.seek(position)

    def open(self, mode=None):
        return self

    @property
    def parent(self):
        return self

    def mkdir(self, parents=None, exist_ok=False):
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


class NullIO:
    """pathlib.Path-like IO class of /dev/null"""

    def __init__(self):
        pass

    def write(self, data):
        # Data is discarded; report it as fully written.
        return len(data)

    def read(self, length=None):
        # A sized read yields ``length`` zero bytes; an unsized read yields nothing.
        if length is not None:
            return bytes(length)
        else:
            return b''

    def close(self):
        pass

    def flush(self):
        pass

    def open(self, mode=None):
        return self

    @property
    def parent(self):
        return self

    def mkdir(self, parents=None, exist_ok=False):
        # Accepts the same arguments as MemIO.mkdir so both stand-ins can be
        # called the same way; there is nothing to create.
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


class BufferOverflow(Exception):
    """Raised when data does not fit into a Buffer."""
    pass


class Buffer:
    """Fixed-capacity byte buffer exposing its current content as ``view``."""

    def __init__(self, size: int = 16):
        self._size = size
        self._buf = bytearray(size)
        self._buflen = 0
        self.view = memoryview(self._buf[0:0])

    def add(self, data: Union[bytes, bytearray, memoryview]):
        """Append ``data``; raise BufferOverflow if it would exceed the capacity."""
        length = len(data)
        if length + self._buflen > self._size:
            raise BufferOverflow()
        self._buf[self._buflen:self._buflen + length] = data
        self._buflen += length
        # Slicing a bytearray copies, so ``view`` is a snapshot of the content at this point.
        self.view = memoryview(self._buf[0:self._buflen])

    def reset(self) -> None:
        """Mark the buffer as empty."""
        self._buflen = 0
        self.view = memoryview(self._buf[0:0])

    def set(self, data: Union[bytes, bytearray, memoryview]) -> None:
        """Replace the content with ``data``; raise BufferOverflow if it exceeds the capacity."""
        length = len(data)
        if length > self._size:
            raise BufferOverflow()
        self._buf[0:length] = data
        self._buflen = length
        self.view = memoryview(self._buf[0:length])

    def __len__(self) -> int:
        return self._buflen