# -*- coding: utf-8 -*-
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2018 The Electrum developers
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import base64
import os
import sys
import hashlib
import hmac
from typing import Union

from .util import assert_bytes, InvalidPassword, to_bytes, to_string, WalletFileException, versiontuple
from .i18n import _
from .logging import get_logger


_logger = get_logger(__name__)


# ---------------------------------------------------------------------------
# Optional crypto backends.
#
# Each probe below is best-effort: any failure raised while importing (or
# version-checking) a backend just leaves the corresponding HAS_* flag False.
# NOTE(review): the bare ``except:`` clauses also swallow KeyboardInterrupt /
# SystemExit raised during import. They are kept as-is so that no import-time
# failure of an optional backend can abort module import.
# ---------------------------------------------------------------------------

HAS_PYAES = False
try:
    import pyaes
except:
    pass
else:
    HAS_PYAES = True

HAS_CRYPTODOME = False
MIN_CRYPTODOME_VERSION = "3.7"
try:
    import Cryptodome
    if versiontuple(Cryptodome.__version__) < versiontuple(MIN_CRYPTODOME_VERSION):
        _logger.warning(f"found module 'Cryptodome' but it is too old: {Cryptodome.__version__}<{MIN_CRYPTODOME_VERSION}")
        # jump to the except-branch below so the backend is treated as missing
        raise Exception()
    from Cryptodome.Cipher import ChaCha20_Poly1305 as CD_ChaCha20_Poly1305
    from Cryptodome.Cipher import ChaCha20 as CD_ChaCha20
    from Cryptodome.Cipher import AES as CD_AES
except:
    pass
else:
    HAS_CRYPTODOME = True

HAS_CRYPTOGRAPHY = False
MIN_CRYPTOGRAPHY_VERSION = "2.1"
try:
    import cryptography
    if versiontuple(cryptography.__version__) < versiontuple(MIN_CRYPTOGRAPHY_VERSION):
        _logger.warning(f"found module 'cryptography' but it is too old: {cryptography.__version__}<{MIN_CRYPTOGRAPHY_VERSION}")
        # jump to the except-branch below so the backend is treated as missing
        raise Exception()
    from cryptography import exceptions
    from cryptography.hazmat.primitives.ciphers import Cipher as CG_Cipher
    from cryptography.hazmat.primitives.ciphers import algorithms as CG_algorithms
    from cryptography.hazmat.primitives.ciphers import modes as CG_modes
    from cryptography.hazmat.backends import default_backend as CG_default_backend
    import cryptography.hazmat.primitives.ciphers.aead as CG_aead
except:
    pass
else:
    HAS_CRYPTOGRAPHY = True


# pyaes alone is not sufficient: the ChaCha20 helpers below only have
# Cryptodome / cryptography code paths.
if not (HAS_CRYPTODOME or HAS_CRYPTOGRAPHY):
    sys.exit("Error: at least one of ('pycryptodomex', 'cryptography') needs to be installed.")


class InvalidPadding(Exception):
    """Raised by strip_PKCS7_padding when the padding is malformed."""
    pass


def append_PKCS7_padding(data: bytes) -> bytes:
    """Return `data` padded to a multiple of 16 bytes.

    Between 1 and 16 bytes are appended, each one holding the pad length;
    input that is already a multiple of 16 gets a full extra 16-byte block.
    """
    assert_bytes(data)
    padlen = 16 - (len(data) % 16)
    return data + bytes([padlen]) * padlen


def strip_PKCS7_padding(data: bytes) -> bytes:
    """Remove the padding added by append_PKCS7_padding and return the rest.

    Raises InvalidPadding if `data` is empty, is not a multiple of 16 bytes
    long, or does not end in a consistent run of pad bytes.
    """
    assert_bytes(data)
    if len(data) % 16 != 0 or len(data) == 0:
        raise InvalidPadding("invalid length")
    # the last byte states how many padding bytes were appended
    padlen = data[-1]
    if not (0 < padlen <= 16):
        raise InvalidPadding("invalid padding byte (out of range)")
    # every one of the trailing `padlen` bytes must equal `padlen`
    if any(b != padlen for b in data[-padlen:]):
        raise InvalidPadding("invalid padding byte (inconsistent)")
    return data[0:-padlen]


def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
    """AES-CBC encrypt `data` (after PKCS7 padding) with `key` and `iv`.

    The first available backend is used, in the order
    Cryptodome, cryptography, pyaes. Returns the raw ciphertext (the IV is
    not included). Raises Exception if no backend is available.
    """
    assert_bytes(key, iv, data)
    data = append_PKCS7_padding(data)
    if HAS_CRYPTODOME:
        e = CD_AES.new(key, CD_AES.MODE_CBC, iv).encrypt(data)
    elif HAS_CRYPTOGRAPHY:
        cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
        encryptor = cipher.encryptor()
        e = encryptor.update(data) + encryptor.finalize()
    elif HAS_PYAES:
        aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
        # padding was already applied above, so pyaes must not pad again
        aes = pyaes.Encrypter(aes_cbc, padding=pyaes.PADDING_NONE)
        e = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
    else:
        raise Exception("no AES backend found")
    return e


def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
    """AES-CBC decrypt `data` with `key` and `iv`, then strip PKCS7 padding.

    The first available backend is used, in the order
    Cryptodome, cryptography, pyaes.

    Raises InvalidPassword if the decrypted bytes do not carry valid padding,
    and Exception if no backend is available. Errors raised by the backend
    itself (e.g. for malformed key/iv/data) are propagated unchanged.
    """
    assert_bytes(key, iv, data)
    if HAS_CRYPTODOME:
        cipher = CD_AES.new(key, CD_AES.MODE_CBC, iv)
        data = cipher.decrypt(data)
    elif HAS_CRYPTOGRAPHY:
        cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
        decryptor = cipher.decryptor()
        data = decryptor.update(data) + decryptor.finalize()
    elif HAS_PYAES:
        aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
        # padding is stripped manually below, so pyaes must not touch it
        aes = pyaes.Decrypter(aes_cbc, padding=pyaes.PADDING_NONE)
        data = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
    else:
        raise Exception("no AES backend found")
    try:
        return strip_PKCS7_padding(data)
    except InvalidPadding as e:
        # bad padding is reported to callers as a wrong password
        raise InvalidPassword() from e


def EncodeAES_base64(secret: bytes, msg: bytes) -> bytes:
    """Returns base64 encoded ciphertext."""
    e = EncodeAES_bytes(secret, msg)
    return base64.b64encode(e)


def EncodeAES_bytes(secret: bytes, msg: bytes) -> bytes:
    """Encrypt `msg` with AES-CBC under key `secret` using a fresh random IV.

    Returns the 16-byte IV followed by the ciphertext.
    """
    assert_bytes(msg)
    iv = bytes(os.urandom(16))
    ct = aes_encrypt_with_iv(secret, iv, msg)
    return iv + ct


def DecodeAES_base64(secret: bytes, ciphertext_b64: Union[bytes, str]) -> bytes:
    """Base64-decode `ciphertext_b64` and decrypt it with DecodeAES_bytes."""
    ciphertext = bytes(base64.b64decode(ciphertext_b64))
    return DecodeAES_bytes(secret, ciphertext)


def DecodeAES_bytes(secret: bytes, ciphertext: bytes) -> bytes:
    """Decrypt data laid out as produced by EncodeAES_bytes (IV + ciphertext).

    The first 16 bytes are taken as the IV, the remainder as the AES-CBC
    ciphertext. See aes_decrypt_with_iv for the exceptions raised.
    """
    assert_bytes(ciphertext)
    iv, e = ciphertext[:16], ciphertext[16:]
    s = aes_decrypt_with_iv(secret, iv, e)
    return s


# Version used when encrypting new data (see pw_encode_with_version_and_mac).
PW_HASH_VERSION_LATEST = 1
# Versions this code recognises; anything else -> UnexpectedPasswordHashVersion.
KNOWN_PW_HASH_VERSIONS = (1, 2,)
# Versions this code can actually hash; a known-but-unsupported version
# -> UnsupportedPasswordHashVersion (see _hash_password).
SUPPORTED_PW_HASH_VERSIONS = (1,)
assert PW_HASH_VERSION_LATEST in KNOWN_PW_HASH_VERSIONS
assert PW_HASH_VERSION_LATEST in SUPPORTED_PW_HASH_VERSIONS


class UnexpectedPasswordHashVersion(InvalidPassword, WalletFileException):
    """The password hash version is not in KNOWN_PW_HASH_VERSIONS."""

    def __init__(self, version):
        # the offending version number; shown by __str__
        self.version = version

    def __str__(self):
        return "{unexpected}: {version}\n{instruction}".format(
            unexpected=_("Unexpected password hash version"),
            version=self.version,
            instruction=_('You are most likely using an outdated version of Electrum. Please update.'))


class UnsupportedPasswordHashVersion(InvalidPassword, WalletFileException):
    """The password hash version is not in SUPPORTED_PW_HASH_VERSIONS."""

    def __init__(self, version):
        # the offending version number; shown by __str__
        self.version = version

    def __str__(self):
        return "{unsupported}: {version}\n{instruction}".format(
            unsupported=_("Unsupported password hash version"),
            version=self.version,
            instruction=f"To open this wallet, try 'git checkout password_v{self.version}'.\n"
                        "Alternatively, restore from seed.")


def _hash_password(password: Union[bytes, str], *, version: int) -> bytes:
    """Derive the encryption key from `password` for the given hash version.

    Version 1 is the double SHA-256 (sha256d) of the password bytes.

    Raises UnsupportedPasswordHashVersion if `version` is not in
    SUPPORTED_PW_HASH_VERSIONS.
    """
    pw = to_bytes(password, 'utf8')
    if version not in SUPPORTED_PW_HASH_VERSIONS:
        raise UnsupportedPasswordHashVersion(version)
    if version == 1:
        return sha256d(pw)
    else:
        # Only reachable if a version is listed as supported without a branch
        # above handling it.
        assert version not in KNOWN_PW_HASH_VERSIONS
        raise UnexpectedPasswordHashVersion(version)


def _pw_encode_raw(data: bytes, password: Union[bytes, str], *, version: int) -> bytes:
    """Encrypt `data` with a key derived from `password`; returns raw bytes.

    The result has the layout produced by EncodeAES_bytes (IV + ciphertext).
    Raises UnexpectedPasswordHashVersion / UnsupportedPasswordHashVersion for
    unknown / unsupported `version` values.
    """
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    # derive key from password
    secret = _hash_password(password, version=version)
    # encrypt given data
    ciphertext = EncodeAES_bytes(secret, data)
    return ciphertext


def _pw_decode_raw(data_bytes: bytes, password: Union[bytes, str], *, version: int) -> bytes:
    """Decrypt raw bytes produced by _pw_encode_raw.

    Raises UnexpectedPasswordHashVersion / UnsupportedPasswordHashVersion for
    unknown / unsupported `version` values, and InvalidPassword if decryption
    fails for any reason.
    """
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    # derive key from password
    secret = _hash_password(password, version=version)
    # decrypt given data
    try:
        d = DecodeAES_bytes(secret, data_bytes)
    except Exception as e:
        # any failure here (bad padding, malformed input rejected by the
        # backend, ...) is reported uniformly as a wrong password
        raise InvalidPassword() from e
    return d


def pw_encode_bytes(data: bytes, password: Union[bytes, str], *, version: int) -> str:
    """plaintext bytes -> base64 ciphertext"""
    ciphertext = _pw_encode_raw(data, password, version=version)
    ciphertext_b64 = base64.b64encode(ciphertext)
    return ciphertext_b64.decode('utf8')


def pw_decode_bytes(data: str, password: Union[bytes, str], *, version: int) -> bytes:
    """base64 ciphertext -> plaintext bytes"""
    # validate the version before attempting to base64-decode the input
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    data_bytes = bytes(base64.b64decode(data))
    return _pw_decode_raw(data_bytes, password, version=version)


def pw_encode_with_version_and_mac(data: bytes, password: Union[bytes, str]) -> str:
    """plaintext bytes -> base64 ciphertext

    The base64 payload is: 1 version byte + (IV + ciphertext) + 4 checksum
    bytes, where the checksum is the first 4 bytes of sha256(plaintext).
    """
    # https://crypto.stackexchange.com/questions/202/should-we-mac-then-encrypt-or-encrypt-then-mac
    # Encrypt-and-MAC. The MAC will be used to detect invalid passwords
    version = PW_HASH_VERSION_LATEST
    mac = sha256(data)[0:4]
    ciphertext = _pw_encode_raw(data, password, version=version)
    ciphertext_b64 = base64.b64encode(bytes([version]) + ciphertext + mac)
    return ciphertext_b64.decode('utf8')


def pw_decode_with_version_and_mac(data: str, password: Union[bytes, str]) -> bytes:
    """base64 ciphertext -> plaintext bytes

    Inverse of pw_encode_with_version_and_mac.

    Raises UnexpectedPasswordHashVersion / UnsupportedPasswordHashVersion if
    the embedded version byte is unknown / unsupported, and InvalidPassword
    if the payload is empty, cannot be decrypted, or the checksum of the
    decrypted plaintext does not match.
    """
    data_bytes = bytes(base64.b64decode(data))
    if not data_bytes:
        # No version byte to read. Report this like any other undecryptable
        # payload instead of leaking an IndexError.
        raise InvalidPassword()
    version = int(data_bytes[0])
    encrypted = data_bytes[1:-4]
    mac = data_bytes[-4:]
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    decrypted = _pw_decode_raw(encrypted, password, version=version)
    # constant-time comparison of the 4-byte checksum
    if not hmac.compare_digest(sha256(decrypted)[0:4], mac):
        raise InvalidPassword()
    return decrypted


def pw_encode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
    """plaintext str -> base64 ciphertext

    If `password` is falsy (None, '' or b''), `data` is returned unchanged.
    NOTE(review): pw_decode only skips decryption when the password is None,
    so data "encoded" with an empty password is not round-tripped by
    pw_decode with that same empty password. Confirm whether callers rely
    on this.
    """
    if not password:
        return data
    plaintext_bytes = to_bytes(data, "utf8")
    return pw_encode_bytes(plaintext_bytes, password, version=version)


def pw_decode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
    """base64 ciphertext -> plaintext str

    If `password` is None, `data` is returned unchanged. Raises
    InvalidPassword if decryption fails or the plaintext is not valid UTF-8.
    """
    if password is None:
        return data
    plaintext_bytes = pw_decode_bytes(data, password, version=version)
    try:
        plaintext_str = to_string(plaintext_bytes, "utf8")
    except UnicodeDecodeError as e:
        # undecodable plaintext is reported as a wrong password
        raise InvalidPassword() from e
    return plaintext_str


def sha256(x: Union[bytes, str]) -> bytes:
    """Return the SHA-256 digest of `x` (converted with to_bytes(x, 'utf8'))."""
    x = to_bytes(x, 'utf8')
    return bytes(hashlib.sha256(x).digest())


def sha256d(x: Union[bytes, str]) -> bytes:
    """Return the double SHA-256 digest of `x`, i.e. sha256(sha256(x))."""
    x = to_bytes(x, 'utf8')
    out = bytes(sha256(sha256(x)))
    return out


def hash_160(x: bytes) -> bytes:
    """Return ripemd(sha256(x))."""
    return ripemd(sha256(x))


def ripemd(x):
    """Return the RIPEMD-160 digest of `x`.

    Uses hashlib when it provides 'ripemd160'; on any failure falls back to
    the bundled `ripemd` module of this package.
    """
    try:
        md = hashlib.new('ripemd160')
        md.update(x)
        return md.digest()
    except BaseException:
        # ripemd160 is not guaranteed to be available in hashlib on all platforms.
        # Historically, our Android builds had hashlib/openssl which did not have it.
        # see https://github.com/spesmilo/electrum/issues/7093
        # We bundle a pure python implementation as fallback that gets used now:
        from . import ripemd
        md = ripemd.new(x)
        return md.digest()


def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
    """Return the HMAC of `msg` under `key`, using `digest` as hash function."""
    if hasattr(hmac, 'digest'):
        # requires python 3.7+; faster
        return hmac.digest(key, msg, digest)
    else:
        return hmac.new(key, msg, digest).digest()


def chacha20_poly1305_encrypt(
        *,
        key: bytes,
        nonce: bytes,
        associated_data: Union[bytes, None] = None,
        data: bytes
) -> bytes:
    """Encrypt and authenticate `data` with ChaCha20-Poly1305.

    `key` must be 32 bytes and `nonce` 12 bytes. `associated_data`, if given,
    is authenticated but not encrypted. Returns the ciphertext with the
    authentication tag appended.

    Raises Exception if no backend is available.
    """
    assert isinstance(key, (bytes, bytearray))
    assert isinstance(nonce, (bytes, bytearray))
    assert isinstance(associated_data, (bytes, bytearray, type(None)))
    assert isinstance(data, (bytes, bytearray))
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
    if HAS_CRYPTODOME:
        cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
        if associated_data is not None:
            cipher.update(associated_data)
        ciphertext, mac = cipher.encrypt_and_digest(plaintext=data)
        return ciphertext + mac
    if HAS_CRYPTOGRAPHY:
        a = CG_aead.ChaCha20Poly1305(key)
        return a.encrypt(nonce, data, associated_data)
    raise Exception("no chacha20 backend found")


def chacha20_poly1305_decrypt(
        *,
        key: bytes,
        nonce: bytes,
        associated_data: Union[bytes, None] = None,
        data: bytes
) -> bytes:
    """Verify and decrypt `data` produced by chacha20_poly1305_encrypt.

    `key` must be 32 bytes and `nonce` 12 bytes. The last 16 bytes of `data`
    are treated as the authentication tag.

    Raises ValueError if authentication fails (e.g. incorrect MAC), and
    Exception if no backend is available.
    """
    assert isinstance(key, (bytes, bytearray))
    assert isinstance(nonce, (bytes, bytearray))
    assert isinstance(associated_data, (bytes, bytearray, type(None)))
    assert isinstance(data, (bytes, bytearray))
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
    if HAS_CRYPTODOME:
        cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
        if associated_data is not None:
            cipher.update(associated_data)
        # raises ValueError if not valid (e.g. incorrect MAC)
        return cipher.decrypt_and_verify(ciphertext=data[:-16], received_mac_tag=data[-16:])
    if HAS_CRYPTOGRAPHY:
        a = CG_aead.ChaCha20Poly1305(key)
        try:
            return a.decrypt(nonce, data, associated_data)
        except cryptography.exceptions.InvalidTag as e:
            # normalise to the exception type the Cryptodome branch raises
            raise ValueError("invalid tag") from e
    raise Exception("no chacha20 backend found")


def chacha20_encrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
    """Encrypt `data` with the (unauthenticated) ChaCha20 stream cipher.

    `key` must be 32 bytes and `nonce` 8 or 12 bytes.
    Raises Exception if no backend is available.
    """
    assert isinstance(key, (bytes, bytearray))
    assert isinstance(nonce, (bytes, bytearray))
    assert isinstance(data, (bytes, bytearray))
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
    if HAS_CRYPTODOME:
        cipher = CD_ChaCha20.new(key=key, nonce=nonce)
        return cipher.encrypt(data)
    if HAS_CRYPTOGRAPHY:
        # left-pad with zero bytes up to 16 bytes
        nonce = bytes(16 - len(nonce)) + nonce  # cryptography wants 16 byte nonces
        algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
        cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
        encryptor = cipher.encryptor()
        return encryptor.update(data)
    raise Exception("no chacha20 backend found")


def chacha20_decrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
    """Decrypt `data` with the (unauthenticated) ChaCha20 stream cipher.

    `key` must be 32 bytes and `nonce` 8 or 12 bytes.
    Raises Exception if no backend is available.
    """
    assert isinstance(key, (bytes, bytearray))
    assert isinstance(nonce, (bytes, bytearray))
    assert isinstance(data, (bytes, bytearray))
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
    if HAS_CRYPTODOME:
        cipher = CD_ChaCha20.new(key=key, nonce=nonce)
        return cipher.decrypt(data)
    if HAS_CRYPTOGRAPHY:
        # left-pad with zero bytes up to 16 bytes
        nonce = bytes(16 - len(nonce)) + nonce  # cryptography wants 16 byte nonces
        algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
        cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
        decryptor = cipher.decryptor()
        return decryptor.update(data)
    raise Exception("no chacha20 backend found")