1######################################################################
2#
3# File: b2sdk/encryption/setting.py
4#
5# Copyright 2021 Backblaze Inc. All Rights Reserved.
6#
7# License https://www.backblaze.com/using_b2_code.html
8#
9######################################################################
10
11import enum
12import logging
13from typing import Optional, Union
14import urllib
15import urllib.parse
16
17from ..http_constants import SSE_C_KEY_ID_FILE_INFO_KEY_NAME, SSE_C_KEY_ID_HEADER
18from ..utils import b64_of_bytes, md5_of_bytes
19from .types import ENCRYPTION_MODES_WITH_MANDATORY_ALGORITHM, ENCRYPTION_MODES_WITH_MANDATORY_KEY
20from .types import EncryptionAlgorithm, EncryptionMode
21
22logger = logging.getLogger(__name__)
23
24
25class _UnknownKeyId(enum.Enum):
26    """The purpose of this enum is to provide a sentinel that can be used with type annotations."""
27    unknown_key_id = 0
28
29
30UNKNOWN_KEY_ID = _UnknownKeyId.unknown_key_id
31"""
32Value for EncryptionKey.key_id that signifies that the key id may or may not be defined. Useful when establishing
33encryption settings based on user input (and not based on B2 cloud data).
34"""
35
36
37class EncryptionKey:
38    """
39    Hold information about encryption key: the key itself, and its id. The id may be None, if it's not set
40    in encrypted file's fileInfo, or UNKNOWN_KEY_ID when that information is missing.
41    The secret may be None, if encryption metadata is read from the server.
42    """
43    SECRET_REPR = '******'
44
45    def __init__(self, secret: Optional[bytes], key_id: Union[str, None, _UnknownKeyId]):
46        self.secret = secret
47        self.key_id = key_id
48
49    def __eq__(self, other):
50        return self.secret == other.secret and self.key_id == other.key_id
51
52    def __repr__(self):
53        key_repr = self.SECRET_REPR
54        if self.secret is None:
55            key_repr = None
56
57        if self.key_id is UNKNOWN_KEY_ID:
58            key_id_repr = 'unknown'
59        else:
60            key_id_repr = repr(self.key_id)
61        return '<%s(%s, %s)>' % (self.__class__.__name__, key_repr, key_id_repr)
62
63    def as_dict(self):
64        """
65        Dump EncryptionKey as dict for serializing a to json for requests.
66        """
67        if self.secret is not None:
68            return {
69                'customerKey': self.key_b64(),
70                'customerKeyMd5': self.key_md5(),
71            }
72        return {
73            'customerKey': self.SECRET_REPR,
74            'customerKeyMd5': self.SECRET_REPR,
75        }
76
77    def key_b64(self):
78        return b64_of_bytes(self.secret)
79
80    def key_md5(self):
81        return b64_of_bytes(md5_of_bytes(self.secret))
82
83
84class EncryptionSetting:
85    """
86    Hold information about encryption mode, algorithm and key (for bucket default,
87    file version info or even upload)
88    """
89
90    def __init__(
91        self,
92        mode: EncryptionMode,
93        algorithm: EncryptionAlgorithm = None,
94        key: EncryptionKey = None,
95    ):
96        """
97        :param b2sdk.v2.EncryptionMode mode: encryption mode
98        :param b2sdk.v2.EncryptionAlgorithm algorithm: encryption algorithm
99        :param b2sdk.v2.EncryptionKey key: encryption key object for SSE-C
100        """
101        self.mode = mode
102        self.algorithm = algorithm
103        self.key = key
104        if self.mode == EncryptionMode.NONE and (self.algorithm or self.key):
105            raise ValueError("cannot specify algorithm or key for 'plaintext' encryption mode")
106        if self.mode in ENCRYPTION_MODES_WITH_MANDATORY_ALGORITHM and not self.algorithm:
107            raise ValueError('must specify algorithm for encryption mode %s' % (self.mode,))
108        if self.mode in ENCRYPTION_MODES_WITH_MANDATORY_KEY and not self.key:
109            raise ValueError(
110                'must specify key for encryption mode %s and algorithm %s' %
111                (self.mode, self.algorithm)
112            )
113
114    def __eq__(self, other):
115        if other is None:
116            raise ValueError('cannot compare a known encryption setting to an unknown one')
117        return self.mode == other.mode and self.algorithm == other.algorithm and self.key == other.key
118
119    def serialize_to_json_for_request(self):
120        if self.key and self.key.secret is None:
121            raise ValueError('cannot use an unknown key in requests')
122        return self.as_dict()
123
124    def as_dict(self):
125        """
126        Represent the setting as a dict, for example:
127
128        .. code-block:: python
129
130            {
131                'mode': 'SSE-C',
132                'algorithm': 'AES256',
133                'customerKey': 'U3hWbVlxM3Q2djl5JEImRSlIQE1jUWZUalduWnI0dTc=',
134                'customerKeyMd5': 'SWx9GFv5BTT1jdwf48Bx+Q=='
135            }
136
137        .. code-block:: python
138
139            {
140                'mode': 'SSE-B2',
141                'algorithm': 'AES256'
142            }
143
144        or
145
146        .. code-block:: python
147
148            {
149                'mode': 'none'
150            }
151        """
152        result = {'mode': self.mode.value}
153        if self.algorithm is not None:
154            result['algorithm'] = self.algorithm.value
155        if self.mode == EncryptionMode.SSE_C:
156            result.update(self.key.as_dict())
157        return result
158
159    def add_to_upload_headers(self, headers):
160        if self.mode == EncryptionMode.NONE:
161            # as of 2021-03-16, server always fails it
162            headers['X-Bz-Server-Side-Encryption'] = self.mode.name
163        elif self.mode == EncryptionMode.SSE_B2:
164            self._add_sse_b2_headers(headers)
165        elif self.mode == EncryptionMode.SSE_C:
166            if self.key.key_id is UNKNOWN_KEY_ID:
167                raise ValueError('Cannot upload a file with an unknown key id')
168            self._add_sse_c_headers(headers)
169            if self.key.key_id is not None:
170                header = SSE_C_KEY_ID_HEADER
171                if headers.get(header) is not None and headers[header] != self.key.key_id:
172                    raise ValueError(
173                        'Ambiguous key id set: "%s" in headers and "%s" in %s' %
174                        (headers[header], self.key.key_id, self.__class__.__name__)
175                    )
176                headers[header] = urllib.parse.quote(str(self.key.key_id))
177        else:
178            raise NotImplementedError('unsupported encryption setting: %s' % (self,))
179
180    def add_to_download_headers(self, headers):
181        if self.mode == EncryptionMode.NONE:
182            return
183        elif self.mode == EncryptionMode.SSE_B2:
184            self._add_sse_b2_headers(headers)
185        elif self.mode == EncryptionMode.SSE_C:
186            self._add_sse_c_headers(headers)
187        else:
188            raise NotImplementedError('unsupported encryption setting: %s' % (self,))
189
190    def _add_sse_b2_headers(self, headers):
191        headers['X-Bz-Server-Side-Encryption'] = self.algorithm.name
192
193    def _add_sse_c_headers(self, headers):
194        if self.key.secret is None:
195            raise ValueError('Cannot use an unknown key in http headers')
196        headers['X-Bz-Server-Side-Encryption-Customer-Algorithm'] = self.algorithm.name
197        headers['X-Bz-Server-Side-Encryption-Customer-Key'] = self.key.key_b64()
198        headers['X-Bz-Server-Side-Encryption-Customer-Key-Md5'] = self.key.key_md5()
199
200    def add_key_id_to_file_info(self, file_info: Optional[dict]):
201        if self.key is None or self.key.key_id is None:
202            return file_info
203        if self.key.key_id is UNKNOWN_KEY_ID:
204            raise ValueError('Cannot add an unknown key id to file info')
205        if file_info is None:
206            file_info = {}
207        if file_info.get(SSE_C_KEY_ID_FILE_INFO_KEY_NAME) is not None and file_info[
208            SSE_C_KEY_ID_FILE_INFO_KEY_NAME] != self.key.key_id:
209            raise ValueError(
210                'Ambiguous key id set: "%s" in file_info and "%s" in %s' % (
211                    file_info[SSE_C_KEY_ID_FILE_INFO_KEY_NAME], self.key.key_id,
212                    self.__class__.__name__
213                )
214            )
215        file_info[SSE_C_KEY_ID_FILE_INFO_KEY_NAME] = self.key.key_id
216        return file_info
217
218    def __repr__(self):
219        return '<%s(%s, %s, %s)>' % (self.__class__.__name__, self.mode, self.algorithm, self.key)
220
221
222class EncryptionSettingFactory:
223    # 2021-03-17: for the bucket the response of the server is:
224    # if authorized to read:
225    #    "mode": "none"
226    #    or
227    #    "mode": "SSE-B2"
228    # if not authorized to read:
229    #    isClientAuthorizedToRead is False and there is no value, so no mode
230    #
231    # BUT file_version (get_file_info, list_file_versions, upload_file etc)
232    # if the file is encrypted, then
233    #     "serverSideEncryption": {"algorithm": "AES256", "mode": "SSE-B2"},
234    #     or
235    #     "serverSideEncryption": {"algorithm": "AES256", "mode": "SSE-C"},
236    # if the file is not encrypted, then "serverSideEncryption" is not present at all
237    @classmethod
238    def from_file_version_dict(cls, file_version_dict: dict) -> EncryptionSetting:
239        """
240        Returns EncryptionSetting for the given file_version_dict retrieved from the api
241
242        .. code-block:: python
243
244            ...
245            "serverSideEncryption": {"algorithm": "AES256", "mode": "SSE-B2"},
246            "fileInfo": {"sse_c_key_id": "key-identifier"}
247            ...
248
249        """
250        sse = file_version_dict.get('serverSideEncryption')
251        if sse is None:
252            return EncryptionSetting(EncryptionMode.NONE)
253        key_id = None
254        file_info = file_version_dict.get('fileInfo')
255        if file_info is not None and SSE_C_KEY_ID_FILE_INFO_KEY_NAME in file_info:
256            key_id = urllib.parse.unquote(file_info[SSE_C_KEY_ID_FILE_INFO_KEY_NAME])
257
258        return cls._from_value_dict(sse, key_id=key_id)
259
260    @classmethod
261    def from_bucket_dict(cls, bucket_dict: dict) -> Optional[EncryptionSetting]:
262        """
263        Returns EncryptionSetting for the given bucket dict retrieved from the api, or None if unauthorized
264
265        Example inputs:
266
267        .. code-block:: python
268
269            ...
270            "defaultServerSideEncryption": {
271                "isClientAuthorizedToRead" : true,
272                "value": {
273                  "algorithm" : "AES256",
274                  "mode" : "SSE-B2"
275                }
276            }
277            ...
278
279        unset:
280
281        .. code-block:: python
282
283             ...
284            "defaultServerSideEncryption": {
285                "isClientAuthorizedToRead" : true,
286                "value": {
287                  "mode" : "none"
288                }
289            }
290            ...
291
292        unknown:
293
294        .. code-block:: python
295
296            ...
297            "defaultServerSideEncryption": {
298                "isClientAuthorizedToRead" : false
299            }
300            ...
301
302        """
303        default_sse = bucket_dict.get(
304            'defaultServerSideEncryption',
305            {'isClientAuthorizedToRead': False},
306        )
307
308        if not default_sse['isClientAuthorizedToRead']:
309            return EncryptionSetting(EncryptionMode.UNKNOWN)
310
311        assert 'value' in default_sse
312        return cls._from_value_dict(default_sse['value'])
313
314    @classmethod
315    def _from_value_dict(cls, value_dict, key_id=None):
316        kwargs = {}
317        if value_dict is None:
318            kwargs['mode'] = EncryptionMode.NONE
319        else:
320            mode = EncryptionMode(value_dict['mode'] or 'none')
321            kwargs['mode'] = mode
322
323            algorithm = value_dict.get('algorithm')
324            if algorithm is not None:
325                kwargs['algorithm'] = EncryptionAlgorithm(algorithm)
326
327            if mode == EncryptionMode.SSE_C:
328                kwargs['key'] = EncryptionKey(key_id=key_id, secret=None)
329
330        return EncryptionSetting(**kwargs)
331
332    @classmethod
333    def from_response_headers(cls, headers):
334        if 'X-Bz-Server-Side-Encryption' in headers:
335            mode = EncryptionMode.SSE_B2
336            algorithm = EncryptionAlgorithm(headers['X-Bz-Server-Side-Encryption'])
337            return EncryptionSetting(mode, algorithm)
338        if 'X-Bz-Server-Side-Encryption-Customer-Algorithm' in headers:
339            mode = EncryptionMode.SSE_C
340            algorithm = EncryptionAlgorithm(
341                headers['X-Bz-Server-Side-Encryption-Customer-Algorithm']
342            )
343            key_id = headers.get(SSE_C_KEY_ID_HEADER)
344            key = EncryptionKey(secret=None, key_id=key_id)
345            return EncryptionSetting(mode, algorithm, key)
346        return EncryptionSetting(EncryptionMode.NONE)
347
348
349SSE_NONE = EncryptionSetting(mode=EncryptionMode.NONE,)
350"""
351Commonly used "no encryption" setting
352"""
353
354SSE_B2_AES = EncryptionSetting(
355    mode=EncryptionMode.SSE_B2,
356    algorithm=EncryptionAlgorithm.AES256,
357)
358"""
359Commonly used SSE-B2 setting
360"""
361