######################################################################
#
# File: b2sdk/encryption/setting.py
#
# Copyright 2021 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

import enum
import logging
from typing import Optional, Union
import urllib
import urllib.parse

from ..http_constants import SSE_C_KEY_ID_FILE_INFO_KEY_NAME, SSE_C_KEY_ID_HEADER
from ..utils import b64_of_bytes, md5_of_bytes
from .types import ENCRYPTION_MODES_WITH_MANDATORY_ALGORITHM, ENCRYPTION_MODES_WITH_MANDATORY_KEY
from .types import EncryptionAlgorithm, EncryptionMode

logger = logging.getLogger(__name__)


class _UnknownKeyId(enum.Enum):
    """The purpose of this enum is to provide a sentinel that can be used with type annotations."""
    unknown_key_id = 0


UNKNOWN_KEY_ID = _UnknownKeyId.unknown_key_id
"""
Value for EncryptionKey.key_id that signifies that the key id may or may not be defined. Useful when establishing
encryption settings based on user input (and not based on B2 cloud data).
"""


class EncryptionKey:
    """
    Hold information about encryption key: the key itself, and its id. The id may be None, if it's not set
    in encrypted file's fileInfo, or UNKNOWN_KEY_ID when that information is missing.
    The secret may be None, if encryption metadata is read from the server.
    """
    # Placeholder shown instead of the real secret in repr() and in as_dict()
    # when the secret is not known.
    SECRET_REPR = '******'

    def __init__(self, secret: Optional[bytes], key_id: Union[str, None, _UnknownKeyId]):
        """
        :param secret: raw key bytes, or None when the secret is not known
        :param key_id: key identifier, None when no id is set, or UNKNOWN_KEY_ID
        """
        self.secret = secret
        self.key_id = key_id

    def __eq__(self, other):
        """
        Two keys are equal when both their secrets and their key ids are equal.
        """
        # Let Python fall back to its default handling for unrelated types
        # instead of failing with AttributeError on a missing attribute.
        if not isinstance(other, EncryptionKey):
            return NotImplemented
        return self.secret == other.secret and self.key_id == other.key_id

    def __repr__(self):
        # Never print the secret itself: only show whether one is present.
        key_repr = self.SECRET_REPR
        if self.secret is None:
            key_repr = None

        if self.key_id is UNKNOWN_KEY_ID:
            key_id_repr = 'unknown'
        else:
            key_id_repr = repr(self.key_id)
        return '<%s(%s, %s)>' % (self.__class__.__name__, key_repr, key_id_repr)

    def as_dict(self):
        """
        Dump EncryptionKey as dict for serializing to json for requests.

        When the secret is not known, both values are the SECRET_REPR placeholder.
        """
        if self.secret is not None:
            return {
                'customerKey': self.key_b64(),
                'customerKeyMd5': self.key_md5(),
            }
        return {
            'customerKey': self.SECRET_REPR,
            'customerKeyMd5': self.SECRET_REPR,
        }

    def key_b64(self):
        """Return the result of ``b64_of_bytes`` applied to the secret."""
        return b64_of_bytes(self.secret)

    def key_md5(self):
        """Return the result of ``b64_of_bytes`` applied to ``md5_of_bytes`` of the secret."""
        return b64_of_bytes(md5_of_bytes(self.secret))


class EncryptionSetting:
    """
    Hold information about encryption mode, algorithm and key (for bucket default,
    file version info or even upload)
    """

    def __init__(
        self,
        mode: EncryptionMode,
        algorithm: Optional[EncryptionAlgorithm] = None,
        key: Optional[EncryptionKey] = None,
    ):
        """
        :param b2sdk.v2.EncryptionMode mode: encryption mode
        :param b2sdk.v2.EncryptionAlgorithm algorithm: encryption algorithm
        :param b2sdk.v2.EncryptionKey key: encryption key object for SSE-C
        :raises ValueError: if the combination of mode, algorithm and key is not allowed
        """
        self.mode = mode
        self.algorithm = algorithm
        self.key = key
        if self.mode == EncryptionMode.NONE and (self.algorithm or self.key):
            raise ValueError("cannot specify algorithm or key for 'plaintext' encryption mode")
        if self.mode in ENCRYPTION_MODES_WITH_MANDATORY_ALGORITHM and not self.algorithm:
            raise ValueError('must specify algorithm for encryption mode %s' % (self.mode,))
        if self.mode in ENCRYPTION_MODES_WITH_MANDATORY_KEY and not self.key:
            raise ValueError(
                'must specify key for encryption mode %s and algorithm %s' %
                (self.mode, self.algorithm)
            )

    def __eq__(self, other):
        """
        Two settings are equal when mode, algorithm and key are all equal.

        :raises ValueError: if ``other`` is None
        """
        # Comparing against None is rejected loudly rather than answered with False.
        if other is None:
            raise ValueError('cannot compare a known encryption setting to an unknown one')
        # Unrelated types: defer to Python's default handling instead of
        # failing with AttributeError on a missing attribute.
        if not isinstance(other, EncryptionSetting):
            return NotImplemented
        return self.mode == other.mode and self.algorithm == other.algorithm and self.key == other.key

    def serialize_to_json_for_request(self):
        """
        Return :meth:`as_dict`, refusing to serialize a key whose secret is not known.

        :raises ValueError: if a key is set but its secret is None
        """
        if self.key and self.key.secret is None:
            raise ValueError('cannot use an unknown key in requests')
        return self.as_dict()

    def as_dict(self):
        """
        Represent the setting as a dict, for example:

        .. code-block:: python

            {
                'mode': 'SSE-C',
                'algorithm': 'AES256',
                'customerKey': 'U3hWbVlxM3Q2djl5JEImRSlIQE1jUWZUalduWnI0dTc=',
                'customerKeyMd5': 'SWx9GFv5BTT1jdwf48Bx+Q=='
            }

        .. code-block:: python

            {
                'mode': 'SSE-B2',
                'algorithm': 'AES256'
            }

        or

        .. code-block:: python

            {
                'mode': 'none'
            }
        """
        result = {'mode': self.mode.value}
        if self.algorithm is not None:
            result['algorithm'] = self.algorithm.value
        if self.mode == EncryptionMode.SSE_C:
            result.update(self.key.as_dict())
        return result

    def add_to_upload_headers(self, headers):
        """
        Add the encryption headers for an upload to ``headers``, in place.

        For SSE-C with a key id, the key id header is set to the percent-quoted key id.

        :param dict headers: headers to update
        :raises ValueError: if the key id is UNKNOWN_KEY_ID, if the secret is not known,
                            or if ``headers`` already holds a different key id
        :raises NotImplementedError: if the mode is not one of NONE, SSE_B2, SSE_C
        """
        if self.mode == EncryptionMode.NONE:
            # as of 2021-03-16, server always fails it
            headers['X-Bz-Server-Side-Encryption'] = self.mode.name
        elif self.mode == EncryptionMode.SSE_B2:
            self._add_sse_b2_headers(headers)
        elif self.mode == EncryptionMode.SSE_C:
            if self.key.key_id is UNKNOWN_KEY_ID:
                raise ValueError('Cannot upload a file with an unknown key id')
            self._add_sse_c_headers(headers)
            if self.key.key_id is not None:
                header = SSE_C_KEY_ID_HEADER
                quoted_key_id = urllib.parse.quote(str(self.key.key_id))
                existing = headers.get(header)
                # The header is stored quoted, so a value written by a previous call
                # must be recognized in its quoted form as well as in its raw form;
                # otherwise calling this twice with a key id that needs quoting
                # would be reported as a conflict.
                if existing is not None and existing not in (self.key.key_id, quoted_key_id):
                    raise ValueError(
                        'Ambiguous key id set: "%s" in headers and "%s" in %s' %
                        (existing, self.key.key_id, self.__class__.__name__)
                    )
                headers[header] = quoted_key_id
        else:
            raise NotImplementedError('unsupported encryption setting: %s' % (self,))

    def add_to_download_headers(self, headers):
        """
        Add the encryption headers for a download to ``headers``, in place.

        Nothing is added for mode NONE.

        :param dict headers: headers to update
        :raises ValueError: for SSE-C, if the secret is not known
        :raises NotImplementedError: if the mode is not one of NONE, SSE_B2, SSE_C
        """
        if self.mode == EncryptionMode.NONE:
            return
        elif self.mode == EncryptionMode.SSE_B2:
            self._add_sse_b2_headers(headers)
        elif self.mode == EncryptionMode.SSE_C:
            self._add_sse_c_headers(headers)
        else:
            raise NotImplementedError('unsupported encryption setting: %s' % (self,))

    def _add_sse_b2_headers(self, headers):
        """Set the SSE-B2 header to the name of the algorithm."""
        headers['X-Bz-Server-Side-Encryption'] = self.algorithm.name

    def _add_sse_c_headers(self, headers):
        """
        Set the SSE-C algorithm, key and key md5 headers.

        :raises ValueError: if the secret is not known
        """
        if self.key.secret is None:
            raise ValueError('Cannot use an unknown key in http headers')
        headers['X-Bz-Server-Side-Encryption-Customer-Algorithm'] = self.algorithm.name
        headers['X-Bz-Server-Side-Encryption-Customer-Key'] = self.key.key_b64()
        headers['X-Bz-Server-Side-Encryption-Customer-Key-Md5'] = self.key.key_md5()

    def add_key_id_to_file_info(self, file_info: Optional[dict]):
        """
        Store the key id in ``file_info`` and return it.

        ``file_info`` is returned untouched when there is no key or no key id.
        Otherwise the given dict is updated in place; a new dict is created
        when ``file_info`` is None.

        :param file_info: file info dict to update, or None
        :return: the updated file info (may be None if nothing had to be added)
        :raises ValueError: if the key id is UNKNOWN_KEY_ID, or if ``file_info``
                            already holds a different key id
        """
        if self.key is None or self.key.key_id is None:
            return file_info
        if self.key.key_id is UNKNOWN_KEY_ID:
            raise ValueError('Cannot add an unknown key id to file info')
        if file_info is None:
            file_info = {}
        if file_info.get(SSE_C_KEY_ID_FILE_INFO_KEY_NAME) is not None and file_info[
            SSE_C_KEY_ID_FILE_INFO_KEY_NAME] != self.key.key_id:
            raise ValueError(
                'Ambiguous key id set: "%s" in file_info and "%s" in %s' % (
                    file_info[SSE_C_KEY_ID_FILE_INFO_KEY_NAME], self.key.key_id,
                    self.__class__.__name__
                )
            )
        file_info[SSE_C_KEY_ID_FILE_INFO_KEY_NAME] = self.key.key_id
        return file_info

    def __repr__(self):
        return '<%s(%s, %s, %s)>' % (self.__class__.__name__, self.mode, self.algorithm, self.key)


class EncryptionSettingFactory:
    """
    Build :class:`EncryptionSetting` objects from dicts and headers.
    """

    # 2021-03-17: for the bucket the response of the server is:
    # if authorized to read:
    #    "mode": "none"
    #    or
    #    "mode": "SSE-B2"
    # if not authorized to read:
    #    isClientAuthorizedToRead is False and there is no value, so no mode
    #
    # BUT file_version (get_file_info, list_file_versions, upload_file etc)
    # if the file is encrypted, then
    #     "serverSideEncryption": {"algorithm": "AES256", "mode": "SSE-B2"},
    #     or
    #     "serverSideEncryption": {"algorithm": "AES256", "mode": "SSE-C"},
    # if the file is not encrypted, then "serverSideEncryption" is not present at all
    @classmethod
    def from_file_version_dict(cls, file_version_dict: dict) -> EncryptionSetting:
        """
        Returns EncryptionSetting for the given file_version_dict retrieved from the api

        .. code-block:: python

            ...
            "serverSideEncryption": {"algorithm": "AES256", "mode": "SSE-B2"},
            "fileInfo": {"sse_c_key_id": "key-identifier"}
            ...

        A missing "serverSideEncryption" entry yields a setting with mode NONE.
        """
        sse = file_version_dict.get('serverSideEncryption')
        if sse is None:
            return EncryptionSetting(EncryptionMode.NONE)
        key_id = None
        file_info = file_version_dict.get('fileInfo')
        if file_info is not None and SSE_C_KEY_ID_FILE_INFO_KEY_NAME in file_info:
            key_id = urllib.parse.unquote(file_info[SSE_C_KEY_ID_FILE_INFO_KEY_NAME])

        return cls._from_value_dict(sse, key_id=key_id)

    @classmethod
    def from_bucket_dict(cls, bucket_dict: dict) -> Optional[EncryptionSetting]:
        """
        Returns EncryptionSetting for the given bucket dict retrieved from the api.
        If the client is not authorized to read the setting (or the dict has no
        "defaultServerSideEncryption" entry), a setting with mode
        ``EncryptionMode.UNKNOWN`` is returned.

        Example inputs:

        .. code-block:: python

            ...
            "defaultServerSideEncryption": {
                "isClientAuthorizedToRead" : true,
                "value": {
                    "algorithm" : "AES256",
                    "mode" : "SSE-B2"
                }
            }
            ...

        unset:

        .. code-block:: python

            ...
            "defaultServerSideEncryption": {
                "isClientAuthorizedToRead" : true,
                "value": {
                    "mode" : "none"
                }
            }
            ...

        unknown:

        .. code-block:: python

            ...
            "defaultServerSideEncryption": {
                "isClientAuthorizedToRead" : false
            }
            ...

        """
        # A missing entry is handled the same way as an unreadable one.
        default_sse = bucket_dict.get(
            'defaultServerSideEncryption',
            {'isClientAuthorizedToRead': False},
        )

        if not default_sse['isClientAuthorizedToRead']:
            return EncryptionSetting(EncryptionMode.UNKNOWN)

        assert 'value' in default_sse
        return cls._from_value_dict(default_sse['value'])

    @classmethod
    def _from_value_dict(cls, value_dict, key_id=None):
        """
        Build an EncryptionSetting from a dict holding "mode" and, optionally, "algorithm".

        :param value_dict: dict with the setting, or None for mode NONE
        :param key_id: key id to attach to the (secret-less) key when the mode is SSE-C
        """
        kwargs = {}
        if value_dict is None:
            kwargs['mode'] = EncryptionMode.NONE
        else:
            # An empty or None mode is read as 'none'.
            mode = EncryptionMode(value_dict['mode'] or 'none')
            kwargs['mode'] = mode

            algorithm = value_dict.get('algorithm')
            if algorithm is not None:
                kwargs['algorithm'] = EncryptionAlgorithm(algorithm)

            if mode == EncryptionMode.SSE_C:
                # The dict carries no secret, so the key only holds the id.
                kwargs['key'] = EncryptionKey(key_id=key_id, secret=None)

        return EncryptionSetting(**kwargs)

    @classmethod
    def from_response_headers(cls, headers):
        """
        Returns EncryptionSetting for the given response headers.

        The SSE-B2 header is checked first, then the SSE-C algorithm header;
        when neither is present the result is a setting with mode NONE.
        For SSE-C the key has no secret, only the key id taken from the headers.
        """
        if 'X-Bz-Server-Side-Encryption' in headers:
            mode = EncryptionMode.SSE_B2
            algorithm = EncryptionAlgorithm(headers['X-Bz-Server-Side-Encryption'])
            return EncryptionSetting(mode, algorithm)
        if 'X-Bz-Server-Side-Encryption-Customer-Algorithm' in headers:
            mode = EncryptionMode.SSE_C
            algorithm = EncryptionAlgorithm(
                headers['X-Bz-Server-Side-Encryption-Customer-Algorithm']
            )
            # NOTE(review): the key id is taken verbatim here, while
            # add_to_upload_headers percent-quotes it and from_file_version_dict
            # unquotes it - confirm whether this value needs unquoting too.
            key_id = headers.get(SSE_C_KEY_ID_HEADER)
            key = EncryptionKey(secret=None, key_id=key_id)
            return EncryptionSetting(mode, algorithm, key)
        return EncryptionSetting(EncryptionMode.NONE)


SSE_NONE = EncryptionSetting(mode=EncryptionMode.NONE,)
"""
Commonly used "no encryption" setting
"""

SSE_B2_AES = EncryptionSetting(
    mode=EncryptionMode.SSE_B2,
    algorithm=EncryptionAlgorithm.AES256,
)
"""
Commonly used SSE-B2 setting
"""