1# -*- coding: utf-8 -*-
2#
3# Electrum - lightweight Bitcoin client
4# Copyright (C) 2018 The Electrum developers
5#
6# Permission is hereby granted, free of charge, to any person
7# obtaining a copy of this software and associated documentation files
8# (the "Software"), to deal in the Software without restriction,
9# including without limitation the rights to use, copy, modify, merge,
10# publish, distribute, sublicense, and/or sell copies of the Software,
11# and to permit persons to whom the Software is furnished to do so,
12# subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be
15# included in all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24# SOFTWARE.
25
26import base64
27import os
28import sys
29import hashlib
30import hmac
31from typing import Union
32
33from .util import assert_bytes, InvalidPassword, to_bytes, to_string, WalletFileException, versiontuple
34from .i18n import _
35from .logging import get_logger
36
37
38_logger = get_logger(__name__)
39
40
# Optional pyaes backend. Any failure raised while importing it (not only
# ImportError) just marks the backend as unavailable.
try:
    import pyaes
    HAS_PYAES = True
except BaseException:  # same breadth as a bare "except:"
    HAS_PYAES = False
48
# Cryptodome backend. It is only accepted from MIN_CRYPTODOME_VERSION onwards;
# an older install is logged and then treated like a missing module.
MIN_CRYPTODOME_VERSION = "3.7"
try:
    import Cryptodome
    if versiontuple(Cryptodome.__version__) < versiontuple(MIN_CRYPTODOME_VERSION):
        _logger.warning(f"found module 'Cryptodome' but it is too old: {Cryptodome.__version__}<{MIN_CRYPTODOME_VERSION}")
        raise Exception()
    from Cryptodome.Cipher import (
        ChaCha20_Poly1305 as CD_ChaCha20_Poly1305,
        ChaCha20 as CD_ChaCha20,
        AES as CD_AES,
    )
    HAS_CRYPTODOME = True
except BaseException:  # same breadth as a bare "except:"
    HAS_CRYPTODOME = False
63
# cryptography backend. It is only accepted from MIN_CRYPTOGRAPHY_VERSION
# onwards; an older install is logged and then treated like a missing module.
MIN_CRYPTOGRAPHY_VERSION = "2.1"
try:
    import cryptography
    if versiontuple(cryptography.__version__) < versiontuple(MIN_CRYPTOGRAPHY_VERSION):
        _logger.warning(f"found module 'cryptography' but it is too old: {cryptography.__version__}<{MIN_CRYPTOGRAPHY_VERSION}")
        raise Exception()
    from cryptography import exceptions
    from cryptography.hazmat.primitives.ciphers import (
        Cipher as CG_Cipher,
        algorithms as CG_algorithms,
        modes as CG_modes,
    )
    from cryptography.hazmat.backends import default_backend as CG_default_backend
    import cryptography.hazmat.primitives.ciphers.aead as CG_aead
    HAS_CRYPTOGRAPHY = True
except BaseException:  # same breadth as a bare "except:"
    HAS_CRYPTOGRAPHY = False
81
82
# Abort at import time when neither Cryptodome nor cryptography is usable.
if not HAS_CRYPTODOME and not HAS_CRYPTOGRAPHY:
    sys.exit("Error: at least one of ('pycryptodomex', 'cryptography') needs to be installed.")
85
86
class InvalidPadding(Exception):
    """Raised by strip_PKCS7_padding when data is not validly PKCS#7 padded."""
89
90
def append_PKCS7_padding(data: bytes) -> bytes:
    """Return *data* padded (PKCS#7 style) to a multiple of 16 bytes.

    Between 1 and 16 bytes are appended, each holding the number of bytes
    added; input that is already block-aligned gains a full 16-byte block.
    """
    assert_bytes(data)
    block_size = 16
    pad_count = block_size - len(data) % block_size
    return data + bytes([pad_count] * pad_count)
95
96
def strip_PKCS7_padding(data: bytes) -> bytes:
    """Remove PKCS#7 padding (16-byte blocks) from *data*.

    Raises:
        InvalidPadding: if *data* is empty, is not a multiple of 16 bytes
            long, or does not end in a consistent run of padding bytes.
    """
    assert_bytes(data)
    if not data or len(data) % 16:
        raise InvalidPadding("invalid length")
    pad_count = data[-1]
    if pad_count < 1 or pad_count > 16:
        raise InvalidPadding("invalid padding byte (out of range)")
    # every one of the trailing pad_count bytes must equal pad_count
    if any(byte != pad_count for byte in data[-pad_count:]):
        raise InvalidPadding("invalid padding byte (inconsistent)")
    return data[:-pad_count]
108
109
def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
    """PKCS#7-pad *data* and encrypt it with AES in CBC mode.

    The first available backend is used, tried in the order
    Cryptodome, cryptography, pyaes.

    Raises:
        Exception: if none of the backends is available.
    """
    assert_bytes(key, iv, data)
    padded = append_PKCS7_padding(data)
    if HAS_CRYPTODOME:
        return CD_AES.new(key, CD_AES.MODE_CBC, iv).encrypt(padded)
    if HAS_CRYPTOGRAPHY:
        cbc = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
        enc = cbc.encryptor()
        return enc.update(padded) + enc.finalize()
    if HAS_PYAES:
        # padding was already applied above, so pyaes must not add its own
        mode = pyaes.AESModeOfOperationCBC(key, iv=iv)
        feeder = pyaes.Encrypter(mode, padding=pyaes.PADDING_NONE)
        return feeder.feed(padded) + feeder.feed()  # empty feed() flushes buffer
    raise Exception("no AES backend found")
126
127
def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
    """Decrypt AES-CBC ciphertext and strip its PKCS#7 padding.

    The first available backend is used, tried in the order
    Cryptodome, cryptography, pyaes.

    Raises:
        InvalidPassword: if the decrypted data does not end in valid PKCS#7
            padding; the underlying InvalidPadding is attached as the cause.
        Exception: if none of the backends is available.
    """
    assert_bytes(key, iv, data)
    if HAS_CRYPTODOME:
        cipher = CD_AES.new(key, CD_AES.MODE_CBC, iv)
        data = cipher.decrypt(data)
    elif HAS_CRYPTOGRAPHY:
        cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
        decryptor = cipher.decryptor()
        data = decryptor.update(data) + decryptor.finalize()
    elif HAS_PYAES:
        # padding is stripped below, so pyaes must not try to remove it
        aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
        aes = pyaes.Decrypter(aes_cbc, padding=pyaes.PADDING_NONE)
        data = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
    else:
        raise Exception("no AES backend found")
    try:
        return strip_PKCS7_padding(data)
    except InvalidPadding as e:
        # chain the cause explicitly, as the other InvalidPassword
        # conversions in this module do
        raise InvalidPassword() from e
147
148
def EncodeAES_base64(secret: bytes, msg: bytes) -> bytes:
    """Encrypt *msg* via EncodeAES_bytes and return the result base64 encoded."""
    return base64.b64encode(EncodeAES_bytes(secret, msg))
153
154
def EncodeAES_bytes(secret: bytes, msg: bytes) -> bytes:
    """Encrypt *msg* with AES-CBC under key *secret*, using a fresh random IV.

    Returns the 16-byte IV followed by the ciphertext.
    """
    assert_bytes(msg)
    random_iv = bytes(os.urandom(16))
    return random_iv + aes_encrypt_with_iv(secret, random_iv, msg)
160
161
def DecodeAES_base64(secret: bytes, ciphertext_b64: Union[bytes, str]) -> bytes:
    """Base64-decode *ciphertext_b64* and decrypt it via DecodeAES_bytes."""
    raw = bytes(base64.b64decode(ciphertext_b64))
    return DecodeAES_bytes(secret, raw)
165
166
def DecodeAES_bytes(secret: bytes, ciphertext: bytes) -> bytes:
    """Decrypt data laid out as a 16-byte IV followed by AES-CBC ciphertext."""
    assert_bytes(ciphertext)
    return aes_decrypt_with_iv(secret, ciphertext[:16], ciphertext[16:])
172
173
# Password hash versions:
#  - LATEST is the version written by pw_encode_with_version_and_mac
#  - KNOWN versions are recognised; anything else raises
#    UnexpectedPasswordHashVersion
#  - SUPPORTED versions can actually be hashed by _hash_password; anything
#    else raises UnsupportedPasswordHashVersion
PW_HASH_VERSION_LATEST = 1
KNOWN_PW_HASH_VERSIONS = (1, 2)
SUPPORTED_PW_HASH_VERSIONS = (1,)
# the latest version must be both known and supported
assert all(
    PW_HASH_VERSION_LATEST in versions
    for versions in (KNOWN_PW_HASH_VERSIONS, SUPPORTED_PW_HASH_VERSIONS)
)
179
180
class UnexpectedPasswordHashVersion(InvalidPassword, WalletFileException):
    """Raised for a password hash version that is not in KNOWN_PW_HASH_VERSIONS."""

    def __init__(self, version):
        # the offending version; shown in the message built by __str__
        self.version = version

    def __str__(self):
        headline = _("Unexpected password hash version")
        advice = _('You are most likely using an outdated version of Electrum. Please update.')
        return f"{headline}: {self.version}\n{advice}"
190
191
class UnsupportedPasswordHashVersion(InvalidPassword, WalletFileException):
    """Raised for a password hash version that is not in SUPPORTED_PW_HASH_VERSIONS."""

    def __init__(self, version):
        # the offending version; shown in the message built by __str__
        self.version = version

    def __str__(self):
        headline = _("Unsupported password hash version")
        return (
            f"{headline}: {self.version}\n"
            f"To open this wallet, try 'git checkout password_v{self.version}'.\n"
            "Alternatively, restore from seed."
        )
202
203
def _hash_password(password: Union[bytes, str], *, version: int) -> bytes:
    """Derive the encryption key from *password* for the given hash version.

    Version 1 is sha256d of the utf8-encoded password.

    Raises:
        UnsupportedPasswordHashVersion: version not in SUPPORTED_PW_HASH_VERSIONS.
        UnexpectedPasswordHashVersion: a supported version that has no
            implementation here.
    """
    encoded_pw = to_bytes(password, 'utf8')
    if version not in SUPPORTED_PW_HASH_VERSIONS:
        raise UnsupportedPasswordHashVersion(version)
    if version != 1:
        # a version that is listed as supported but has no branch here
        assert version not in KNOWN_PW_HASH_VERSIONS
        raise UnexpectedPasswordHashVersion(version)
    return sha256d(encoded_pw)
213
214
def _pw_encode_raw(data: bytes, password: Union[bytes, str], *, version: int) -> bytes:
    """Encrypt *data* with a key derived from *password*; returns raw bytes.

    Raises:
        UnexpectedPasswordHashVersion: version not in KNOWN_PW_HASH_VERSIONS.
    """
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    key = _hash_password(password, version=version)
    return EncodeAES_bytes(key, data)
223
224
def _pw_decode_raw(data_bytes: bytes, password: Union[bytes, str], *, version: int) -> bytes:
    """Decrypt raw ciphertext with a key derived from *password*.

    Raises:
        UnexpectedPasswordHashVersion: version not in KNOWN_PW_HASH_VERSIONS.
        InvalidPassword: for any exception raised while decrypting.
    """
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    key = _hash_password(password, version=version)
    try:
        return DecodeAES_bytes(key, data_bytes)
    except Exception as e:
        # every decryption failure is reported as a wrong password
        raise InvalidPassword() from e
236
237
def pw_encode_bytes(data: bytes, password: Union[bytes, str], *, version: int) -> str:
    """plaintext bytes -> base64 ciphertext"""
    raw = _pw_encode_raw(data, password, version=version)
    return base64.b64encode(raw).decode('utf8')
243
244
def pw_decode_bytes(data: str, password: Union[bytes, str], *, version:int) -> bytes:
    """base64 ciphertext -> plaintext bytes"""
    # reject unknown versions before even looking at the payload
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    raw = bytes(base64.b64decode(data))
    return _pw_decode_raw(raw, password, version=version)
251
252
def pw_encode_with_version_and_mac(data: bytes, password: Union[bytes, str]) -> str:
    """plaintext bytes -> base64 ciphertext

    The encoded payload is: 1 version byte (PW_HASH_VERSION_LATEST),
    the ciphertext, then the first 4 bytes of sha256 of the plaintext.
    """
    # https://crypto.stackexchange.com/questions/202/should-we-mac-then-encrypt-or-encrypt-then-mac
    # Encrypt-and-MAC. The MAC will be used to detect invalid passwords
    version = PW_HASH_VERSION_LATEST
    check = sha256(data)[0:4]
    encrypted = _pw_encode_raw(data, password, version=version)
    payload = bytes([version]) + encrypted + check
    return base64.b64encode(payload).decode('utf8')
262
263
def pw_decode_with_version_and_mac(data: str, password: Union[bytes, str]) -> bytes:
    """base64 ciphertext -> plaintext bytes

    Expects the layout written by pw_encode_with_version_and_mac: 1 version
    byte, the ciphertext, then a 4-byte check value (the first 4 bytes of
    sha256 of the plaintext).

    Raises:
        UnexpectedPasswordHashVersion: version byte not in KNOWN_PW_HASH_VERSIONS.
        InvalidPassword: decryption failed, or the check value does not
            match the decrypted data.
    """
    data_bytes = bytes(base64.b64decode(data))
    version = int(data_bytes[0])
    encrypted = data_bytes[1:-4]
    mac = data_bytes[-4:]
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    decrypted = _pw_decode_raw(encrypted, password, version=version)
    # compare in constant time so the check does not leak how many leading
    # bytes matched
    if not hmac.compare_digest(sha256(decrypted)[0:4], mac):
        raise InvalidPassword()
    return decrypted
276
277
def pw_encode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
    """plaintext str -> base64 ciphertext

    When *password* is falsy (None or empty), *data* is returned unchanged,
    i.e. unencrypted.
    """
    if password:
        return pw_encode_bytes(to_bytes(data, "utf8"), password, version=version)
    return data
284
285
def pw_decode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
    """base64 ciphertext -> plaintext str

    When *password* is None, *data* is returned unchanged. Any other
    password, including an empty one, is used for decryption.

    Raises:
        InvalidPassword: if the decrypted bytes are not valid utf8 (and for
            the failures reported by pw_decode_bytes).
    """
    if password is None:
        return data
    raw = pw_decode_bytes(data, password, version=version)
    try:
        return to_string(raw, "utf8")
    except UnicodeDecodeError as e:
        # garbage plaintext is taken as a sign of a wrong password
        raise InvalidPassword() from e
296
297
def sha256(x: Union[bytes, str]) -> bytes:
    """Return the SHA-256 digest of *x*, which is first passed through to_bytes (utf8)."""
    hasher = hashlib.sha256(to_bytes(x, 'utf8'))
    return bytes(hasher.digest())
301
302
def sha256d(x: Union[bytes, str]) -> bytes:
    """Return the double SHA-256 digest of *x*, i.e. sha256(sha256(x))."""
    inner = sha256(to_bytes(x, 'utf8'))
    return bytes(sha256(inner))
307
308
def hash_160(x: bytes) -> bytes:
    """Return ripemd(sha256(x))."""
    sha_digest = sha256(x)
    return ripemd(sha_digest)
311
def ripemd(x: bytes) -> bytes:
    """Return the RIPEMD-160 digest of *x*.

    hashlib is tried first; if that fails, the pure python implementation
    bundled with this package is used instead.
    """
    try:
        md = hashlib.new('ripemd160')
        md.update(x)
        return md.digest()
    except Exception:
        # Catch Exception rather than BaseException, so that KeyboardInterrupt
        # and SystemExit propagate instead of silently triggering the fallback.
        #
        # ripemd160 is not guaranteed to be available in hashlib on all platforms.
        # Historically, our Android builds had hashlib/openssl which did not have it.
        # see https://github.com/spesmilo/electrum/issues/7093
        # We bundle a pure python implementation as fallback that gets used now:
        from . import ripemd
        md = ripemd.new(x)
        return md.digest()
325
def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
    """Return the HMAC of *msg* under *key* for the given digest, in one call."""
    one_call = getattr(hmac, 'digest', None)
    if one_call is None:
        # hmac.digest requires python 3.7+
        return hmac.new(key, msg, digest).digest()
    # faster than going through an hmac object
    return one_call(key, msg, digest)
332
333
def chacha20_poly1305_encrypt(
        *,
        key: bytes,
        nonce: bytes,
        associated_data: bytes = None,
        data: bytes
) -> bytes:
    """Encrypt *data* with ChaCha20-Poly1305.

    Requires a 32-byte key and a 12-byte nonce; *associated_data* is
    optional. Returns the ciphertext followed by the MAC tag.

    Raises:
        Exception: if neither Cryptodome nor cryptography is available.
    """
    for buf in (key, nonce, data):
        assert isinstance(buf, (bytes, bytearray))
    assert associated_data is None or isinstance(associated_data, (bytes, bytearray))
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
    if HAS_CRYPTODOME:
        aead = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
        if associated_data is not None:
            aead.update(associated_data)
        encrypted, tag = aead.encrypt_and_digest(plaintext=data)
        return encrypted + tag
    elif HAS_CRYPTOGRAPHY:
        return CG_aead.ChaCha20Poly1305(key).encrypt(nonce, data, associated_data)
    else:
        raise Exception("no chacha20 backend found")
357
358
def chacha20_poly1305_decrypt(
        *,
        key: bytes,
        nonce: bytes,
        associated_data: bytes = None,
        data: bytes
) -> bytes:
    """Decrypt and authenticate ChaCha20-Poly1305 output.

    *data* is the ciphertext with the 16-byte MAC tag appended. Requires a
    32-byte key and a 12-byte nonce; *associated_data* is optional.

    Raises:
        ValueError: if authentication fails (e.g. incorrect MAC).
        Exception: if neither Cryptodome nor cryptography is available.
    """
    for buf in (key, nonce, data):
        assert isinstance(buf, (bytes, bytearray))
    assert associated_data is None or isinstance(associated_data, (bytes, bytearray))
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
    if HAS_CRYPTODOME:
        aead = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
        if associated_data is not None:
            aead.update(associated_data)
        encrypted, tag = data[:-16], data[-16:]
        # raises ValueError if not valid (e.g. incorrect MAC)
        return aead.decrypt_and_verify(ciphertext=encrypted, received_mac_tag=tag)
    elif HAS_CRYPTOGRAPHY:
        aead = CG_aead.ChaCha20Poly1305(key)
        try:
            return aead.decrypt(nonce, data, associated_data)
        except cryptography.exceptions.InvalidTag as e:
            # surface the same exception type as the Cryptodome branch
            raise ValueError("invalid tag") from e
    else:
        raise Exception("no chacha20 backend found")
385
386
def chacha20_encrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
    """Encrypt *data* with the ChaCha20 stream cipher (no authentication).

    Requires a 32-byte key and an 8- or 12-byte nonce.

    Raises:
        Exception: if neither Cryptodome nor cryptography is available.
    """
    for buf in (key, nonce, data):
        assert isinstance(buf, (bytes, bytearray))
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
    if HAS_CRYPTODOME:
        return CD_ChaCha20.new(key=key, nonce=nonce).encrypt(data)
    elif HAS_CRYPTOGRAPHY:
        # cryptography wants 16 byte nonces: left-pad with zero bytes
        padded_nonce = bytes(16 - len(nonce)) + nonce
        stream = CG_Cipher(
            CG_algorithms.ChaCha20(key=key, nonce=padded_nonce),
            mode=None,
            backend=CG_default_backend(),
        )
        return stream.encryptor().update(data)
    else:
        raise Exception("no chacha20 backend found")
403
404
def chacha20_decrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
    """Decrypt *data* with the ChaCha20 stream cipher (no authentication).

    Requires a 32-byte key and an 8- or 12-byte nonce.

    Raises:
        Exception: if neither Cryptodome nor cryptography is available.
    """
    for buf in (key, nonce, data):
        assert isinstance(buf, (bytes, bytearray))
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
    if HAS_CRYPTODOME:
        return CD_ChaCha20.new(key=key, nonce=nonce).decrypt(data)
    elif HAS_CRYPTOGRAPHY:
        # cryptography wants 16 byte nonces: left-pad with zero bytes
        padded_nonce = bytes(16 - len(nonce)) + nonce
        stream = CG_Cipher(
            CG_algorithms.ChaCha20(key=key, nonce=padded_nonce),
            mode=None,
            backend=CG_default_backend(),
        )
        return stream.decryptor().update(data)
    else:
        raise Exception("no chacha20 backend found")
421