1######################################################################
2#
3# File: b2sdk/raw_simulator.py
4#
5# Copyright 2021 Backblaze Inc. All Rights Reserved.
6#
7# License https://www.backblaze.com/using_b2_code.html
8#
9######################################################################
10
11from typing import Optional
12import collections
13import io
14import logging
15import random
16import re
17import time
18import threading
19from contextlib import contextmanager
20
21from requests.structures import CaseInsensitiveDict
22
23from .b2http import ResponseContextManager
24from .encryption.setting import EncryptionMode, EncryptionSetting
25from .exception import (
26    BadJson,
27    BadRequest,
28    BadUploadUrl,
29    ChecksumMismatch,
30    Conflict,
31    DuplicateBucketName,
32    FileNotPresent,
33    FileSha1Mismatch,
34    InvalidAuthToken,
35    InvalidMetadataDirective,
36    MissingPart,
37    NonExistentBucket,
38    PartSha1Mismatch,
39    Unauthorized,
40    UnsatisfiableRange,
41    SSECKeyError,
42)
43from .file_lock import BucketRetentionSetting, FileRetentionSetting, NO_RETENTION_BUCKET_SETTING, LegalHold
44from .raw_api import AbstractRawApi, MetadataDirectiveMode, ALL_CAPABILITIES
45from .utils import (
46    b2_url_decode,
47    b2_url_encode,
48    ConcurrentUsedAuthTokenGuard,
49    hex_sha1_of_bytes,
50)
51from b2sdk.http_constants import FILE_INFO_HEADER_PREFIX, HEX_DIGITS_AT_END
52from .stream.hashing import StreamWithHash
53
54logger = logging.getLogger(__name__)
55
56
57def get_bytes_range(data_bytes, bytes_range):
58    """ Slice bytes array using bytes range """
59    if bytes_range is None:
60        return data_bytes
61    if bytes_range[0] > bytes_range[1]:
62        raise UnsatisfiableRange()
63    if bytes_range[0] < 0:
64        raise UnsatisfiableRange()
65    if bytes_range[1] > len(data_bytes):
66        raise UnsatisfiableRange()
67    return data_bytes[bytes_range[0]:bytes_range[1]]
68
69
70class KeySimulator(object):
71    """
72    Hold information about one application key, which can be either
73    a master application key, or one created with create_key().
74    """
75
76    def __init__(
77        self, account_id, name, application_key_id, key, capabilities, expiration_timestamp_or_none,
78        bucket_id_or_none, bucket_name_or_none, name_prefix_or_none
79    ):
80        self.name = name
81        self.account_id = account_id
82        self.application_key_id = application_key_id
83        self.key = key
84        self.capabilities = capabilities
85        self.expiration_timestamp_or_none = expiration_timestamp_or_none
86        self.bucket_id_or_none = bucket_id_or_none
87        self.name_prefix_or_none = name_prefix_or_none
88
89    def as_key(self):
90        return dict(
91            accountId=self.account_id,
92            bucketId=self.bucket_id_or_none,
93            applicationKeyId=self.application_key_id,
94            capabilities=self.capabilities,
95            expirationTimestamp=self.expiration_timestamp_or_none and
96            self.expiration_timestamp_or_none * 1000,
97            keyName=self.name,
98            namePrefix=self.name_prefix_or_none,
99        )
100
101    def as_created_key(self):
102        """
103        Return the dict returned by b2_create_key.
104
105        This is just like the one for b2_list_keys, but also includes the secret key.
106        """
107        result = self.as_key()
108        result['applicationKey'] = self.key
109        return result
110
111    def get_allowed(self):
112        """
113        Return the 'allowed' structure to include in the response from b2_authorize_account.
114        """
115        return dict(
116            bucketId=self.bucket_id_or_none,
117            capabilities=self.capabilities,
118            namePrefix=self.name_prefix_or_none,
119        )
120
121
122class PartSimulator(object):
123    def __init__(self, file_id, part_number, content_length, content_sha1, part_data):
124        self.file_id = file_id
125        self.part_number = part_number
126        self.content_length = content_length
127        self.content_sha1 = content_sha1
128        self.part_data = part_data
129
130    def as_list_parts_dict(self):
131        return dict(
132            fileId=self.file_id,
133            partNumber=self.part_number,
134            contentLength=self.content_length,
135            contentSha1=self.content_sha1
136        )  # yapf: disable
137
138
139class FileSimulator(object):
140    """
141    One of three: an unfinished large file, a finished file, or a deletion marker.
142    """
143
144    CHECK_ENCRYPTION = True
145    SPECIAL_FILE_INFOS = {  # when downloading, these file info keys are translated to specialized headers
146        'b2-content-disposition': 'Content-Disposition',
147        'b2-content-language': 'Content-Language',
148        'b2-expires': 'Expires',
149        'b2-cache-control': 'Cache-Control',
150        'b2-content-encoding': 'Content-Encoding',
151    }
152
153    def __init__(
154        self,
155        account_id,
156        bucket,
157        file_id,
158        action,
159        name,
160        content_type,
161        content_sha1,
162        file_info,
163        data_bytes,
164        upload_timestamp,
165        range_=None,
166        server_side_encryption: Optional[EncryptionSetting] = None,
167        file_retention: Optional[FileRetentionSetting] = None,
168        legal_hold: LegalHold = LegalHold.UNSET,
169    ):
170        if action == 'hide':
171            assert server_side_encryption is None
172        else:
173            assert server_side_encryption is not None
174        self.account_id = account_id
175        self.bucket = bucket
176        self.file_id = file_id
177        self.action = action
178        self.name = name
179        if data_bytes is not None:
180            self.content_length = len(data_bytes)
181        self.content_type = content_type
182        self.content_sha1 = content_sha1
183        if content_sha1 and content_sha1 != 'none' and len(content_sha1) != 40:
184            raise ValueError(content_sha1)
185        self.file_info = file_info
186        self.data_bytes = data_bytes
187        self.upload_timestamp = upload_timestamp
188        self.range_ = range_
189        self.server_side_encryption = server_side_encryption
190        self.file_retention = file_retention
191        self.legal_hold = legal_hold if legal_hold is not None else LegalHold.UNSET
192
193        if action == 'start':
194            self.parts = []
195
196    @classmethod
197    @contextmanager
198    def dont_check_encryption(cls):
199        cls.CHECK_ENCRYPTION = False
200        yield
201        cls.CHECK_ENCRYPTION = True
202
203    def sort_key(self):
204        """
205        Return a key that can be used to sort the files in a
206        bucket in the order that b2_list_file_versions returns them.
207        """
208        return (self.name, self.file_id)
209
210    def as_download_headers(self, account_auth_token_or_none, range_=None):
211        if self.data_bytes is None:
212            content_length = 0
213        elif range_ is not None:
214            if range_[1] >= len(self.data_bytes):  # requested too much
215                content_length = len(self.data_bytes)
216            else:
217                content_length = range_[1] - range_[0] + 1
218        else:
219            content_length = len(self.data_bytes)
220        headers = CaseInsensitiveDict(
221            {
222                'content-length': content_length,
223                'content-type': self.content_type,
224                'x-bz-content-sha1': self.content_sha1,
225                'x-bz-upload-timestamp': self.upload_timestamp,
226                'x-bz-file-id': self.file_id,
227                'x-bz-file-name': self.name,
228            }
229        )
230        for key, value in self.file_info.items():
231            key_lower = key.lower()
232            if key_lower in self.SPECIAL_FILE_INFOS:
233                headers[self.SPECIAL_FILE_INFOS[key_lower]] = value
234            else:
235                headers[FILE_INFO_HEADER_PREFIX + key] = value
236
237        if account_auth_token_or_none is not None and self.bucket.is_file_lock_enabled:
238            not_permitted = []
239
240            if not self.is_allowed_to_read_file_retention(account_auth_token_or_none):
241                not_permitted.append('X-Bz-File-Retention-Mode')
242                not_permitted.append('X-Bz-File-Retain-Until-Timestamp')
243            else:
244                if self.file_retention is not None:
245                    self.file_retention.add_to_to_upload_headers(headers)
246
247            if not self.is_allowed_to_read_file_legal_hold(account_auth_token_or_none):
248                not_permitted.append('X-Bz-File-Legal-Hold')
249            else:
250                headers['X-Bz-File-Legal-Hold'] = self.legal_hold and 'on' or 'off'
251
252            if not_permitted:
253                headers['X-Bz-Client-Unauthorized-To-Read'] = ','.join(not_permitted)
254
255        if self.server_side_encryption is not None:
256            if self.server_side_encryption.mode == EncryptionMode.SSE_B2:
257                headers['X-Bz-Server-Side-Encryption'] = self.server_side_encryption.algorithm.value
258            elif self.server_side_encryption.mode == EncryptionMode.SSE_C:
259                headers['X-Bz-Server-Side-Encryption-Customer-Algorithm'
260                       ] = self.server_side_encryption.algorithm.value
261                headers['X-Bz-Server-Side-Encryption-Customer-Key-Md5'
262                       ] = self.server_side_encryption.key.key_md5()
263            elif self.server_side_encryption.mode in (EncryptionMode.NONE, EncryptionMode.UNKNOWN):
264                pass
265            else:
266                raise ValueError(
267                    'Unsupported encryption mode: %s' % (self.server_side_encryption.mode,)
268                )
269
270        if range_ is not None:
271            headers['Content-Range'] = 'bytes %d-%d/%d' % (
272                range_[0], range_[0] + content_length - 1, len(self.data_bytes)
273            )  # yapf: disable
274        return headers
275
276    def as_upload_result(self, account_auth_token):
277        result = dict(
278            fileId=self.file_id,
279            fileName=self.name,
280            accountId=self.account_id,
281            bucketId=self.bucket.bucket_id,
282            contentLength=len(self.data_bytes) if self.data_bytes is not None else 0,
283            contentType=self.content_type,
284            contentSha1=self.content_sha1,
285            fileInfo=self.file_info,
286            action=self.action,
287            uploadTimestamp=self.upload_timestamp,
288        )  # yapf: disable
289        if self.server_side_encryption is not None:
290            result['serverSideEncryption'
291                  ] = self.server_side_encryption.serialize_to_json_for_request()
292        result['fileRetention'] = self._file_retention_dict(account_auth_token)
293        result['legalHold'] = self._legal_hold_dict(account_auth_token)
294        return result
295
296    def as_list_files_dict(self, account_auth_token):
297        result = dict(
298            accountId=self.account_id,
299            bucketId=self.bucket.bucket_id,
300            fileId=self.file_id,
301            fileName=self.name,
302            contentLength=len(self.data_bytes) if self.data_bytes is not None else 0,
303            contentType=self.content_type,
304            contentSha1=self.content_sha1,
305            fileInfo=self.file_info,
306            action=self.action,
307            uploadTimestamp=self.upload_timestamp,
308        )  # yapf: disable
309        if self.server_side_encryption is not None:
310            result['serverSideEncryption'
311                  ] = self.server_side_encryption.serialize_to_json_for_request()
312        result['fileRetention'] = self._file_retention_dict(account_auth_token)
313        result['legalHold'] = self._legal_hold_dict(account_auth_token)
314        return result
315
316    def is_allowed_to_read_file_retention(self, account_auth_token):
317        return self.bucket._check_capability(account_auth_token, 'readFileRetentions')
318
319    def is_allowed_to_read_file_legal_hold(self, account_auth_token):
320        return self.bucket._check_capability(account_auth_token, 'readFileLegalHolds')
321
322    def as_start_large_file_result(self, account_auth_token):
323        result = dict(
324            fileId=self.file_id,
325            fileName=self.name,
326            accountId=self.account_id,
327            bucketId=self.bucket.bucket_id,
328            contentType=self.content_type,
329            fileInfo=self.file_info,
330            uploadTimestamp=self.upload_timestamp,
331        )  # yapf: disable
332        if self.server_side_encryption is not None:
333            result['serverSideEncryption'
334                  ] = self.server_side_encryption.serialize_to_json_for_request()
335        result['fileRetention'] = self._file_retention_dict(account_auth_token)
336        result['legalHold'] = self._legal_hold_dict(account_auth_token)
337        return result
338
339    def _file_retention_dict(self, account_auth_token):
340        if not self.is_allowed_to_read_file_retention(account_auth_token):
341            return {
342                'isClientAuthorizedToRead': False,
343                'value': None,
344            }
345
346        file_lock_configuration = {'isClientAuthorizedToRead': True}
347        if self.file_retention is None:
348            file_lock_configuration['value'] = {'mode': None}
349        else:
350            file_lock_configuration['value'] = {'mode': self.file_retention.mode.value}
351            if self.file_retention.retain_until is not None:
352                file_lock_configuration['value']['retainUntilTimestamp'
353                                                ] = self.file_retention.retain_until
354        return file_lock_configuration
355
356    def _legal_hold_dict(self, account_auth_token):
357        if not self.is_allowed_to_read_file_legal_hold(account_auth_token):
358            return {
359                'isClientAuthorizedToRead': False,
360                'value': None,
361            }
362        return {
363            'isClientAuthorizedToRead': True,
364            'value': self.legal_hold.value,
365        }
366
367    def add_part(self, part_number, part):
368        while len(self.parts) < part_number + 1:
369            self.parts.append(None)
370        self.parts[part_number] = part
371
372    def finish(self, part_sha1_array):
373        last_part_number = max(part.part_number for part in self.parts if part is not None)
374        for part_number in range(1, last_part_number + 1):
375            if self.parts[part_number] is None:
376                raise MissingPart(part_number)
377        my_part_sha1_array = [
378            self.parts[part_number].content_sha1 for part_number in range(1, last_part_number + 1)
379        ]
380        if part_sha1_array != my_part_sha1_array:
381            raise ChecksumMismatch(
382                'sha1', expected=str(part_sha1_array), actual=str(my_part_sha1_array)
383            )
384        self.data_bytes = b''.join(
385            self.parts[part_number].part_data for part_number in range(1, last_part_number + 1)
386        )
387        self.content_length = len(self.data_bytes)
388        self.action = 'upload'
389
390    def is_visible(self):
391        """
392        Does this file show up in b2_list_file_names?
393        """
394        return self.action == 'upload'
395
396    def list_parts(self, start_part_number, max_part_count):
397        start_part_number = start_part_number or 1
398        max_part_count = max_part_count or 100
399        parts = [
400            part.as_list_parts_dict()
401            for part in self.parts if part is not None and start_part_number <= part.part_number
402        ]
403        if len(parts) <= max_part_count:
404            next_part_number = None
405        else:
406            next_part_number = parts[max_part_count]['partNumber']
407            parts = parts[:max_part_count]
408        return dict(parts=parts, nextPartNumber=next_part_number)
409
410    def check_encryption(self, request_encryption: Optional[EncryptionSetting]):
411        if not self.CHECK_ENCRYPTION:
412            return
413        file_mode, file_secret = self._get_encryption_mode_and_secret(self.server_side_encryption)
414        request_mode, request_secret = self._get_encryption_mode_and_secret(request_encryption)
415
416        if file_mode in (None, EncryptionMode.NONE):
417            assert request_mode in (None, EncryptionMode.NONE)
418        elif file_mode == EncryptionMode.SSE_B2:
419            assert request_mode in (None, EncryptionMode.NONE, EncryptionMode.SSE_B2)
420        elif file_mode == EncryptionMode.SSE_C:
421            if request_mode != EncryptionMode.SSE_C or file_secret != request_secret:
422                raise SSECKeyError()
423        else:
424            raise ValueError('Unsupported EncryptionMode: %s' % (file_mode))
425
426    def _get_encryption_mode_and_secret(self, encryption: Optional[EncryptionSetting]):
427        if encryption is None:
428            return None, None
429        mode = encryption.mode
430        if encryption.key is None:
431            secret = None
432        else:
433            secret = encryption.key.secret
434        return mode, secret
435
436
437FakeRequest = collections.namedtuple('FakeRequest', 'url headers')
438
439
440class FakeResponse(object):
441    def __init__(self, account_auth_token_or_none, file_sim, url, range_=None):
442        self.data_bytes = file_sim.data_bytes
443        self.headers = file_sim.as_download_headers(account_auth_token_or_none, range_)
444        self.url = url
445        self.range_ = range_
446        if range_ is not None:
447            self.data_bytes = self.data_bytes[range_[0]:range_[1] + 1]
448
449    def iter_content(self, chunk_size=1):
450        start = 0
451        rnd = random.Random(self.url)
452        while start <= len(self.data_bytes):
453            time.sleep(rnd.random() * 0.01)
454            yield self.data_bytes[start:start + chunk_size]
455            start += chunk_size
456
457    @property
458    def request(self):
459        headers = CaseInsensitiveDict()
460        if self.range_ is not None:
461            headers['Range'] = '%s-%s' % self.range_
462        return FakeRequest(self.url, headers)
463
464    def close(self):
465        pass
466
467
468class BucketSimulator(object):
469
470    # File IDs start at 9999 and count down, so they sort in the order
471    # returned by list_file_versions. The IDs are strings.
472    FIRST_FILE_NUMBER = 9999
473
474    FIRST_FILE_ID = str(FIRST_FILE_NUMBER)
475
476    FILE_SIMULATOR_CLASS = FileSimulator
477    RESPONSE_CLASS = FakeResponse
478
479    def __init__(
480        self,
481        api,
482        account_id,
483        bucket_id,
484        bucket_name,
485        bucket_type,
486        bucket_info=None,
487        cors_rules=None,
488        lifecycle_rules=None,
489        options_set=None,
490        default_server_side_encryption=None,
491        is_file_lock_enabled: Optional[bool] = None,
492    ):
493        assert bucket_type in ['allPrivate', 'allPublic']
494        self.api = api
495        self.account_id = account_id
496        self.bucket_name = bucket_name
497        self.bucket_id = bucket_id
498        self.bucket_type = bucket_type
499        self.bucket_info = bucket_info or {}
500        self.cors_rules = cors_rules or []
501        self.lifecycle_rules = lifecycle_rules or []
502        self.options_set = options_set or set()
503        self.revision = 1
504        self.upload_url_counter = iter(range(200))
505        # File IDs count down, so that the most recent will come first when they are sorted.
506        self.file_id_counter = iter(range(self.FIRST_FILE_NUMBER, 0, -1))
507        self.upload_timestamp_counter = iter(range(5000, 9999))
508        self.file_id_to_file = dict()
509        # It would be nice to use an OrderedDict for this, but 2.6 doesn't have it.
510        self.file_name_and_id_to_file = dict()
511        if default_server_side_encryption is None:
512            default_server_side_encryption = EncryptionSetting(mode=EncryptionMode.NONE)
513        self.default_server_side_encryption = default_server_side_encryption
514        self.is_file_lock_enabled = is_file_lock_enabled
515        self.default_retention = NO_RETENTION_BUCKET_SETTING
516
517    def is_allowed_to_read_bucket_encryption_setting(self, account_auth_token):
518        return self._check_capability(account_auth_token, 'readBucketEncryption')
519
520    def is_allowed_to_read_bucket_retention(self, account_auth_token):
521        return self._check_capability(account_auth_token, 'readBucketRetentions')
522
523    def _check_capability(self, account_auth_token, capability):
524        try:
525            key = self.api.auth_token_to_key[account_auth_token]
526        except KeyError:
527            # looks like it's an upload token
528            # fortunately BucketSimulator makes it easy to retrieve the true account_auth_token
529            # from an upload url
530            real_auth_token = account_auth_token.split('/')[-1]
531            key = self.api.auth_token_to_key[real_auth_token]
532        capabilities = key.get_allowed()['capabilities']
533        return capability in capabilities
534
535    def bucket_dict(self, account_auth_token):
536        default_sse = {'isClientAuthorizedToRead': False}
537        if self.is_allowed_to_read_bucket_encryption_setting(account_auth_token):
538            default_sse['isClientAuthorizedToRead'] = True
539            default_sse['value'] = {'mode': self.default_server_side_encryption.mode.value}
540            if self.default_server_side_encryption.algorithm is not None:
541                default_sse['value']['algorithm'
542                                    ] = self.default_server_side_encryption.algorithm.value
543        else:
544            default_sse['value'] = {'mode': EncryptionMode.UNKNOWN.value}
545
546        if self.is_allowed_to_read_bucket_retention(account_auth_token):
547            file_lock_configuration = {
548                'isClientAuthorizedToRead': True,
549                'value': {
550                    'defaultRetention': {
551                        'mode': self.default_retention.mode.value,
552                        'period': self.default_retention.period.as_dict() if self.default_retention.period else None,
553                    },
554                    'isFileLockEnabled': self.is_file_lock_enabled,
555                },
556            }  # yapf: disable
557        else:
558            file_lock_configuration = {'isClientAuthorizedToRead': False, 'value': None}
559        return dict(
560            accountId=self.account_id,
561            bucketName=self.bucket_name,
562            bucketId=self.bucket_id,
563            bucketType=self.bucket_type,
564            bucketInfo=self.bucket_info,
565            corsRules=self.cors_rules,
566            lifecycleRules=self.lifecycle_rules,
567            options=self.options_set,
568            revision=self.revision,
569            defaultServerSideEncryption=default_sse,
570            fileLockConfiguration=file_lock_configuration,
571        )
572
573    def cancel_large_file(self, file_id):
574        file_sim = self.file_id_to_file[file_id]
575        key = (file_sim.name, file_id)
576        del self.file_name_and_id_to_file[key]
577        del self.file_id_to_file[file_id]
578        return dict(
579            accountId=self.account_id,
580            bucketId=self.bucket_id,
581            fileId=file_id,
582            fileName=file_sim.name
583        )  # yapf: disable
584
585    def delete_file_version(self, file_id, file_name):
586        key = (file_name, file_id)
587        file_sim = self.file_name_and_id_to_file[key]
588        del self.file_name_and_id_to_file[key]
589        del self.file_id_to_file[file_id]
590        return dict(fileId=file_id, fileName=file_name, uploadTimestamp=file_sim.upload_timestamp)
591
592    def download_file_by_id(
593        self,
594        account_auth_token_or_none,
595        file_id,
596        url,
597        range_=None,
598        encryption: Optional[EncryptionSetting] = None,
599    ):
600        file_sim = self.file_id_to_file[file_id]
601        file_sim.check_encryption(encryption)
602        return self._download_file_sim(account_auth_token_or_none, file_sim, url, range_=range_)
603
604    def download_file_by_name(
605        self,
606        account_auth_token_or_none,
607        file_name,
608        url,
609        range_=None,
610        encryption: Optional[EncryptionSetting] = None,
611    ):
612        files = self.list_file_names(self.api.current_token, file_name,
613                                     1)['files']  # token is not important here
614        if len(files) == 0:
615            raise FileNotPresent(file_id_or_name=file_name)
616        file_dict = files[0]
617        if file_dict['fileName'] != file_name or file_dict['action'] != 'upload':
618            raise FileNotPresent(file_id_or_name=file_name)
619        file_sim = self.file_name_and_id_to_file[(file_name, file_dict['fileId'])]
620        file_sim.check_encryption(encryption)
621        return self._download_file_sim(account_auth_token_or_none, file_sim, url, range_=range_)
622
623    def _download_file_sim(self, account_auth_token_or_none, file_sim, url, range_=None):
624        return ResponseContextManager(
625            self.RESPONSE_CLASS(
626                account_auth_token_or_none,
627                file_sim,
628                url,
629                range_,
630            )
631        )
632
633    def finish_large_file(self, account_auth_token, file_id, part_sha1_array):
634        file_sim = self.file_id_to_file[file_id]
635        file_sim.finish(part_sha1_array)
636        return file_sim.as_upload_result(account_auth_token)
637
638    def get_file_info_by_id(self, account_auth_token, file_id):
639        return self.file_id_to_file[file_id].as_upload_result(account_auth_token)
640
641    def get_file_info_by_name(self, account_auth_token, file_name):
642        for ((name, id), file) in self.file_name_and_id_to_file.items():
643            if file_name == name:
644                return file.as_download_headers(account_auth_token_or_none=account_auth_token)
645        raise FileNotPresent(file_id_or_name=file_name, bucket_name=self.bucket_name)
646
647    def get_upload_url(self, account_auth_token):
648        upload_id = next(self.upload_url_counter)
649        upload_url = 'https://upload.example.com/%s/%d/%s' % (
650            self.bucket_id, upload_id, account_auth_token
651        )
652        return dict(bucketId=self.bucket_id, uploadUrl=upload_url, authorizationToken=upload_url)
653
654    def get_upload_part_url(self, account_auth_token, file_id):
655        upload_url = 'https://upload.example.com/part/%s/%d/%s' % (
656            file_id, random.randint(1, 10**9), account_auth_token
657        )
658        return dict(bucketId=self.bucket_id, uploadUrl=upload_url, authorizationToken=upload_url)
659
660    def hide_file(self, account_auth_token, file_name):
661        file_id = self._next_file_id()
662        file_sim = self.FILE_SIMULATOR_CLASS(
663            self.account_id, self, file_id, 'hide', file_name, None, "none", {}, b'',
664            next(self.upload_timestamp_counter)
665        )
666        self.file_id_to_file[file_id] = file_sim
667        self.file_name_and_id_to_file[file_sim.sort_key()] = file_sim
668        return file_sim.as_list_files_dict(account_auth_token)
669
670    def update_file_retention(
671        self,
672        account_auth_token,
673        file_id,
674        file_name,
675        file_retention: FileRetentionSetting,
676        bypass_governance: bool = False,
677    ):
678        file_sim = self.file_id_to_file[file_id]
679        assert self.is_file_lock_enabled
680        assert file_sim.name == file_name
681        # TODO: check bypass etc
682        file_sim.file_retention = file_retention
683        return {
684            'fileId': file_id,
685            'fileName': file_name,
686            'fileRetention': file_sim.file_retention.serialize_to_json_for_request(),
687        }
688
689    def update_file_legal_hold(
690        self,
691        account_auth_token,
692        file_id,
693        file_name,
694        legal_hold: LegalHold,
695    ):
696        file_sim = self.file_id_to_file[file_id]
697        assert self.is_file_lock_enabled
698        assert file_sim.name == file_name
699        file_sim.legal_hold = legal_hold
700        return {
701            'fileId': file_id,
702            'fileName': file_name,
703            'legalHold': legal_hold.to_server(),
704        }
705
706    def copy_file(
707        self,
708        file_id,
709        new_file_name,
710        bytes_range=None,
711        metadata_directive=None,
712        content_type=None,
713        file_info=None,
714        destination_bucket_id=None,
715        destination_server_side_encryption: Optional[EncryptionSetting] = None,
716        source_server_side_encryption: Optional[EncryptionSetting] = None,
717        file_retention: Optional[FileRetentionSetting] = None,
718        legal_hold: Optional[LegalHold] = None,
719    ):
720        if metadata_directive is not None:
721            assert metadata_directive in tuple(MetadataDirectiveMode)
722            if metadata_directive is MetadataDirectiveMode.COPY and (
723                content_type is not None or file_info is not None
724            ):
725                raise InvalidMetadataDirective(
726                    'content_type and file_info should be None when metadata_directive is COPY'
727                )
728            elif metadata_directive is MetadataDirectiveMode.REPLACE and content_type is None:
729                raise InvalidMetadataDirective(
730                    'content_type cannot be None when metadata_directive is REPLACE'
731                )
732
733        file_sim = self.file_id_to_file[file_id]
734        file_sim.check_encryption(source_server_side_encryption)
735        new_file_id = self._next_file_id()
736
737        data_bytes = get_bytes_range(file_sim.data_bytes, bytes_range)
738
739        destination_bucket = self.api.bucket_id_to_bucket.get(destination_bucket_id, self)
740        sse = destination_server_side_encryption or self.default_server_side_encryption
741        copy_file_sim = self.FILE_SIMULATOR_CLASS(
742            self.account_id,
743            destination_bucket,
744            new_file_id,
745            'copy',
746            new_file_name,
747            file_sim.content_type,
748            file_sim.content_sha1,
749            file_sim.file_info,
750            data_bytes,
751            next(self.upload_timestamp_counter),
752            server_side_encryption=sse,
753            file_retention=file_retention,
754            legal_hold=legal_hold,
755        )
756
757        if metadata_directive is MetadataDirectiveMode.REPLACE:
758            copy_file_sim.content_type = content_type
759            copy_file_sim.file_info = file_info or file_sim.file_info
760
761        return copy_file_sim
762
763    def list_file_names(
764        self,
765        account_auth_token,
766        start_file_name=None,
767        max_file_count=None,
768        prefix=None,
769    ):
770        assert prefix is None or start_file_name is None or start_file_name.startswith(prefix
771                                                                                      ), locals()
772        start_file_name = start_file_name or ''
773        max_file_count = max_file_count or 100
774        result_files = []
775        next_file_name = None
776        prev_file_name = None
777        for key in sorted(self.file_name_and_id_to_file):
778            (file_name, file_id) = key
779            assert file_id
780            if start_file_name <= file_name and file_name != prev_file_name:
781                if prefix is not None and not file_name.startswith(prefix):
782                    break
783                prev_file_name = file_name
784                file_sim = self.file_name_and_id_to_file[key]
785                if file_sim.is_visible():
786                    result_files.append(file_sim.as_list_files_dict(account_auth_token))
787                    if len(result_files) == max_file_count:
788                        next_file_name = file_sim.name + ' '
789                        break
790        return dict(files=result_files, nextFileName=next_file_name)
791
792    def list_file_versions(
793        self,
794        account_auth_token,
795        start_file_name=None,
796        start_file_id=None,
797        max_file_count=None,
798        prefix=None,
799    ):
800        assert prefix is None or start_file_name is None or start_file_name.startswith(prefix
801                                                                                      ), locals()
802        start_file_name = start_file_name or ''
803        start_file_id = start_file_id or ''
804        max_file_count = max_file_count or 100
805        result_files = []
806        next_file_name = None
807        next_file_id = None
808        for key in sorted(self.file_name_and_id_to_file):
809            (file_name, file_id) = key
810            if (start_file_name < file_name) or (
811                start_file_name == file_name and
812                (start_file_id == '' or int(start_file_id) <= int(file_id))
813            ):
814                file_sim = self.file_name_and_id_to_file[key]
815                if prefix is not None and not file_name.startswith(prefix):
816                    break
817                result_files.append(file_sim.as_list_files_dict(account_auth_token))
818                if len(result_files) == max_file_count:
819                    next_file_name = file_sim.name
820                    next_file_id = str(int(file_id) + 1)
821                    break
822        return dict(files=result_files, nextFileName=next_file_name, nextFileId=next_file_id)
823
824    def list_parts(self, file_id, start_part_number, max_part_count):
825        file_sim = self.file_id_to_file[file_id]
826        return file_sim.list_parts(start_part_number, max_part_count)
827
828    def list_unfinished_large_files(
829        self, account_auth_token, start_file_id=None, max_file_count=None, prefix=None
830    ):
831        start_file_id = start_file_id or self.FIRST_FILE_ID
832        max_file_count = max_file_count or 100
833        all_unfinished_ids = set(
834            k for (k, v) in self.file_id_to_file.items()
835            if v.action == 'start' and k <= start_file_id and
836            (prefix is None or v.name.startswith(prefix))
837        )
838        ids_in_order = sorted(all_unfinished_ids, reverse=True)
839
840        file_dict_list = [
841            file_sim.as_start_large_file_result(account_auth_token)
842            for file_sim in (
843                self.file_id_to_file[file_id] for file_id in ids_in_order[:max_file_count]
844            )
845        ]  # yapf: disable
846        next_file_id = None
847        if len(file_dict_list) == max_file_count:
848            next_file_id = str(int(file_dict_list[-1]['fileId']) - 1)
849        return dict(files=file_dict_list, nextFileId=next_file_id)
850
851    def start_large_file(
852        self,
853        account_auth_token,
854        file_name,
855        content_type,
856        file_info,
857        server_side_encryption: Optional[EncryptionSetting] = None,
858        file_retention: Optional[FileRetentionSetting] = None,
859        legal_hold: Optional[LegalHold] = None,
860    ):
861        file_id = self._next_file_id()
862        sse = server_side_encryption or self.default_server_side_encryption
863        if sse:  # FIXME: remove this part when RawApi<->Encryption adapters are implemented properly
864            file_info = sse.add_key_id_to_file_info(file_info)
865        file_sim = self.FILE_SIMULATOR_CLASS(
866            self.account_id, self, file_id, 'start', file_name, content_type, 'none',
867            file_info, None, next(self.upload_timestamp_counter), server_side_encryption=sse,
868            file_retention=file_retention, legal_hold=legal_hold,
869        )  # yapf: disable
870        self.file_id_to_file[file_id] = file_sim
871        self.file_name_and_id_to_file[file_sim.sort_key()] = file_sim
872        return file_sim.as_start_large_file_result(account_auth_token)
873
874    def _update_bucket(
875        self,
876        bucket_type=None,
877        bucket_info=None,
878        cors_rules=None,
879        lifecycle_rules=None,
880        if_revision_is: Optional[int] = None,
881        default_server_side_encryption: Optional[EncryptionSetting] = None,
882        default_retention: Optional[BucketRetentionSetting] = None,
883    ):
884        if if_revision_is is not None and self.revision != if_revision_is:
885            raise Conflict()
886
887        if bucket_type is not None:
888            self.bucket_type = bucket_type
889        if bucket_info is not None:
890            self.bucket_info = bucket_info
891        if cors_rules is not None:
892            self.cors_rules = cors_rules
893        if lifecycle_rules is not None:
894            self.lifecycle_rules = lifecycle_rules
895        if default_server_side_encryption is not None:
896            self.default_server_side_encryption = default_server_side_encryption
897        if default_retention:
898            self.default_retention = default_retention
899        self.revision += 1
900        return self.bucket_dict(self.api.current_token)
901
902    def upload_file(
903        self,
904        upload_id,
905        upload_auth_token,
906        file_name,
907        content_length,
908        content_type,
909        content_sha1,
910        file_infos,
911        data_stream,
912        server_side_encryption: Optional[EncryptionSetting] = None,
913        file_retention: Optional[FileRetentionSetting] = None,
914        legal_hold: Optional[LegalHold] = None,
915    ):
916        data_bytes = self._simulate_chunked_post(data_stream, content_length)
917        assert len(data_bytes) == content_length
918        if content_sha1 == HEX_DIGITS_AT_END:
919            content_sha1 = data_bytes[-40:].decode()
920            data_bytes = data_bytes[0:-40]
921            content_length -= 40
922        elif len(content_sha1) != 40:
923            raise ValueError(content_sha1)
924        computed_sha1 = hex_sha1_of_bytes(data_bytes)
925        if content_sha1 != computed_sha1:
926            raise FileSha1Mismatch(file_name)
927        file_id = self._next_file_id()
928
929        encryption = server_side_encryption or self.default_server_side_encryption
930        if encryption:  # FIXME: remove this part when RawApi<->Encryption adapters are implemented properly
931            file_infos = encryption.add_key_id_to_file_info(file_infos)
932
933        file_sim = self.FILE_SIMULATOR_CLASS(
934            self.account_id,
935            self,
936            file_id,
937            'upload',
938            file_name,
939            content_type,
940            content_sha1,
941            file_infos,
942            data_bytes,
943            next(self.upload_timestamp_counter),
944            server_side_encryption=encryption,
945            file_retention=file_retention,
946            legal_hold=legal_hold,
947        )
948        self.file_id_to_file[file_id] = file_sim
949        self.file_name_and_id_to_file[file_sim.sort_key()] = file_sim
950        return file_sim.as_upload_result(upload_auth_token)
951
952    def upload_part(
953        self,
954        file_id,
955        part_number,
956        content_length,
957        sha1_sum,
958        input_stream,
959        server_side_encryption: Optional[EncryptionSetting] = None,
960    ):
961        file_sim = self.file_id_to_file[file_id]
962        part_data = self._simulate_chunked_post(input_stream, content_length)
963        assert len(part_data) == content_length
964        if sha1_sum == HEX_DIGITS_AT_END:
965            sha1_sum = part_data[-40:].decode()
966            part_data = part_data[0:-40]
967            content_length -= 40
968        computed_sha1 = hex_sha1_of_bytes(part_data)
969        if sha1_sum != computed_sha1:
970            raise PartSha1Mismatch(file_id)
971        part = PartSimulator(file_sim.file_id, part_number, content_length, sha1_sum, part_data)
972        file_sim.add_part(part_number, part)
973        result = dict(
974            fileId=file_id,
975            partNumber=part_number,
976            contentLength=content_length,
977            contentSha1=sha1_sum,
978        )  # yapf: disable
979        if server_side_encryption is not None:
980            result['serverSideEncryption'] = server_side_encryption.serialize_to_json_for_request()
981        return result
982
983    def _simulate_chunked_post(
984        self, stream, content_length, min_chunks=4, max_chunk_size=8096, simulate_retry=True
985    ):
986        chunk_size = max_chunk_size
987        chunks_num = self._chunks_number(content_length, chunk_size)
988        if chunks_num < min_chunks:
989            chunk_size = max(content_length // min_chunks, 1)
990        loop_count = 2 if simulate_retry else 1
991        stream_data = None
992        for _ in range(loop_count):
993            chunks = []
994            stream.seek(0)  # we always do this in `do_post` in `b2http` so we want it here *always*
995            while True:
996                data = stream.read(chunk_size)
997                chunks.append(data)
998                if not data:
999                    break
1000            _stream_data = b''.join(chunks)
1001            if stream_data is not None:
1002                assert _stream_data == stream_data
1003            stream_data = _stream_data
1004        return stream_data
1005
1006    def _chunks_number(self, content_length, chunk_size):
1007        chunks_number = content_length // chunk_size
1008        if content_length % chunk_size > 0:
1009            chunks_number = chunks_number + 1
1010        return chunks_number
1011
1012    def _next_file_id(self):
1013        return str(next(self.file_id_counter))
1014
1015
1016class RawSimulator(AbstractRawApi):
1017    """
1018    Implement the same interface as B2RawHTTPApi by simulating all of the
1019    calls and keeping state in memory.
1020
1021    The intended use for this class is for unit tests that test things
1022    built on top of B2RawHTTPApi.
1023    """
1024
1025    BUCKET_SIMULATOR_CLASS = BucketSimulator
1026    API_URL = 'http://api.example.com'
1027    S3_API_URL = 'http://s3.api.example.com'
1028    DOWNLOAD_URL = 'http://download.example.com'
1029
1030    MIN_PART_SIZE = 200
1031
1032    # This is the maximum duration in seconds that an application key can be valid (1000 days).
1033    MAX_DURATION_IN_SECONDS = 86400000
1034
1035    UPLOAD_PART_MATCHER = re.compile('https://upload.example.com/part/([^/]*)')
1036    UPLOAD_URL_MATCHER = re.compile(r'https://upload.example.com/([^/]*)/([^/]*)')
1037    DOWNLOAD_URL_MATCHER = re.compile(
1038        DOWNLOAD_URL + '(?:' + '|'.join(
1039            (
1040                '/b2api/v[0-9]+/b2_download_file_by_id\?fileId=(?P<file_id>[^/]+)',
1041                '/file/(?P<bucket_name>[^/]+)/(?P<file_name>.+)',
1042            )
1043        ) + ')$'
1044    )  # yapf: disable
1045
1046    def __init__(self, b2_http=None):
1047        # Map from application_key_id to KeySimulator.
1048        # The entry for the master application key ID is for the master application
1049        # key for the account, and the entries with non-master application keys
1050        # are for keys created b2 createKey().
1051        self.key_id_to_key = dict()
1052
1053        # Map from auth token to the KeySimulator for it.
1054        self.auth_token_to_key = dict()
1055
1056        # Set of auth tokens that have expired
1057        self.expired_auth_tokens = set()
1058
1059        # Map from auth token to a lock that upload procedure acquires
1060        # when utilizing the token
1061        self.currently_used_auth_tokens = collections.defaultdict(threading.Lock)
1062
1063        # Counter for generating auth tokens.
1064        self.auth_token_counter = 0
1065
1066        # Counter for generating account IDs an their matching master application keys.
1067        self.account_counter = 0
1068
1069        self.bucket_name_to_bucket = dict()
1070        self.bucket_id_to_bucket = dict()
1071        self.bucket_id_counter = iter(range(100))
1072        self.file_id_to_bucket_id = {}
1073        self.all_application_keys = []
1074        self.app_key_counter = 0
1075        self.upload_errors = []
1076
1077    def expire_auth_token(self, auth_token):
1078        """
1079        Simulate the auth token expiring.
1080
1081        The next call that tries to use this auth token will get an
1082        auth_token_expired error.
1083        """
1084        assert auth_token in self.auth_token_to_key
1085        self.expired_auth_tokens.add(auth_token)
1086
1087    def create_account(self):
1088        """
1089        Return (accountId, masterApplicationKey) for a newly created account.
1090        """
1091        # Pick the IDs for the account and the key
1092        account_id = 'account-%d' % (self.account_counter,)
1093        master_key = 'masterKey-%d' % (self.account_counter,)
1094        self.account_counter += 1
1095
1096        # Create the key
1097        self.key_id_to_key[account_id] = KeySimulator(
1098            account_id=account_id,
1099            name='master',
1100            application_key_id=account_id,
1101            key=master_key,
1102            capabilities=ALL_CAPABILITIES,
1103            expiration_timestamp_or_none=None,
1104            bucket_id_or_none=None,
1105            bucket_name_or_none=None,
1106            name_prefix_or_none=None,
1107        )
1108
1109        # Return the info
1110        return (account_id, master_key)
1111
1112    def set_upload_errors(self, errors):
1113        """
1114        Store a sequence of exceptions to raise on upload.  Each one will
1115        be raised in turn, until they are all gone.  Then the next upload
1116        will succeed.
1117        """
1118        assert len(self.upload_errors) == 0
1119        self.upload_errors = errors
1120
1121    def authorize_account(self, realm_url, application_key_id, application_key):
1122        key_sim = self.key_id_to_key.get(application_key_id)
1123        if key_sim is None:
1124            raise InvalidAuthToken('application key ID not valid', 'unauthorized')
1125        if application_key != key_sim.key:
1126            raise InvalidAuthToken('secret key is wrong', 'unauthorized')
1127        auth_token = 'auth_token_%d' % (self.auth_token_counter,)
1128        self.current_token = auth_token
1129        self.auth_token_counter += 1
1130        self.auth_token_to_key[auth_token] = key_sim
1131        allowed = key_sim.get_allowed()
1132        bucketId = allowed.get('bucketId')
1133        if (bucketId is not None) and (bucketId in self.bucket_id_to_bucket):
1134            allowed['bucketName'] = self.bucket_id_to_bucket[bucketId].bucket_name
1135        else:
1136            allowed['bucketName'] = None
1137        return dict(
1138            accountId=key_sim.account_id,
1139            authorizationToken=auth_token,
1140            apiUrl=self.API_URL,
1141            downloadUrl=self.DOWNLOAD_URL,
1142            recommendedPartSize=self.MIN_PART_SIZE,
1143            absoluteMinimumPartSize=self.MIN_PART_SIZE,
1144            allowed=allowed,
1145            s3ApiUrl=self.S3_API_URL,
1146        )
1147
1148    def cancel_large_file(self, api_url, account_auth_token, file_id):
1149        bucket_id = self.file_id_to_bucket_id[file_id]
1150        bucket = self._get_bucket_by_id(bucket_id)
1151        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'writeFiles')
1152        return bucket.cancel_large_file(file_id)
1153
1154    def create_bucket(
1155        self,
1156        api_url,
1157        account_auth_token,
1158        account_id,
1159        bucket_name,
1160        bucket_type,
1161        bucket_info=None,
1162        cors_rules=None,
1163        lifecycle_rules=None,
1164        default_server_side_encryption: Optional[EncryptionSetting] = None,
1165        is_file_lock_enabled: Optional[bool] = None,
1166    ):
1167        if not re.match(r'^[-a-zA-Z0-9]*$', bucket_name):
1168            raise BadJson('illegal bucket name: ' + bucket_name)
1169        self._assert_account_auth(api_url, account_auth_token, account_id, 'writeBuckets')
1170        if bucket_name in self.bucket_name_to_bucket:
1171            raise DuplicateBucketName(bucket_name)
1172        bucket_id = 'bucket_' + str(next(self.bucket_id_counter))
1173        bucket = self.BUCKET_SIMULATOR_CLASS(
1174            self,
1175            account_id,
1176            bucket_id,
1177            bucket_name,
1178            bucket_type,
1179            bucket_info,
1180            cors_rules,
1181            lifecycle_rules,
1182            # watch out for options!
1183            default_server_side_encryption=default_server_side_encryption,
1184            is_file_lock_enabled=is_file_lock_enabled,
1185        )
1186        self.bucket_name_to_bucket[bucket_name] = bucket
1187        self.bucket_id_to_bucket[bucket_id] = bucket
1188        return bucket.bucket_dict(account_auth_token)  # TODO it should be an object, right?
1189
1190    def create_key(
1191        self,
1192        api_url,
1193        account_auth_token,
1194        account_id,
1195        capabilities,
1196        key_name,
1197        valid_duration_seconds,
1198        bucket_id,
1199        name_prefix,
1200    ):
1201        if not re.match(r'^[A-Za-z0-9-]{1,100}$', key_name):
1202            raise BadJson('illegal key name: ' + key_name)
1203        if valid_duration_seconds is not None:
1204            if valid_duration_seconds < 1 or valid_duration_seconds > self.MAX_DURATION_IN_SECONDS:
1205                raise BadJson(
1206                    'valid duration must be greater than 0, and less than 1000 days in seconds'
1207                )
1208        self._assert_account_auth(api_url, account_auth_token, account_id, 'writeKeys')
1209
1210        if valid_duration_seconds is None:
1211            expiration_timestamp_or_none = None
1212        else:
1213            expiration_timestamp_or_none = int(time.time() + valid_duration_seconds)
1214
1215        index = self.app_key_counter
1216        self.app_key_counter += 1
1217        application_key_id = 'appKeyId%d' % (index,)
1218        app_key = 'appKey%d' % (index,)
1219        if bucket_id is None:
1220            bucket_name_or_none = None
1221        else:
1222            bucket_name_or_none = self._get_bucket_by_id(bucket_id).bucket_name
1223        key_sim = KeySimulator(
1224            account_id=account_id,
1225            name=key_name,
1226            application_key_id=application_key_id,
1227            key=app_key,
1228            capabilities=capabilities,
1229            expiration_timestamp_or_none=expiration_timestamp_or_none,
1230            bucket_id_or_none=bucket_id,
1231            bucket_name_or_none=bucket_name_or_none,
1232            name_prefix_or_none=name_prefix,
1233        )
1234        self.key_id_to_key[application_key_id] = key_sim
1235        self.all_application_keys.append(key_sim)
1236        return key_sim.as_created_key()
1237
1238    def delete_file_version(self, api_url, account_auth_token, file_id, file_name):
1239        bucket_id = self.file_id_to_bucket_id[file_id]
1240        bucket = self._get_bucket_by_id(bucket_id)
1241        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'deleteFiles')
1242        return bucket.delete_file_version(file_id, file_name)
1243
1244    def update_file_retention(
1245        self,
1246        api_url,
1247        account_auth_token,
1248        file_id,
1249        file_name,
1250        file_retention: FileRetentionSetting,
1251        bypass_governance: bool = False,
1252    ):
1253        bucket_id = self.file_id_to_bucket_id[file_id]
1254        bucket = self._get_bucket_by_id(bucket_id)
1255        return bucket.update_file_retention(
1256            account_auth_token,
1257            file_id,
1258            file_name,
1259            file_retention,
1260            bypass_governance,
1261        )
1262
1263    def update_file_legal_hold(
1264        self,
1265        api_url,
1266        account_auth_token,
1267        file_id,
1268        file_name,
1269        legal_hold: bool,
1270    ):
1271        bucket_id = self.file_id_to_bucket_id[file_id]
1272        bucket = self._get_bucket_by_id(bucket_id)
1273        return bucket.update_file_legal_hold(
1274            account_auth_token,
1275            file_id,
1276            file_name,
1277            legal_hold,
1278        )
1279
1280    def delete_bucket(self, api_url, account_auth_token, account_id, bucket_id):
1281        self._assert_account_auth(api_url, account_auth_token, account_id, 'deleteBuckets')
1282        bucket = self._get_bucket_by_id(bucket_id)
1283        del self.bucket_name_to_bucket[bucket.bucket_name]
1284        del self.bucket_id_to_bucket[bucket_id]
1285        return bucket.bucket_dict(account_auth_token)
1286
1287    def download_file_from_url(
1288        self,
1289        account_auth_token_or_none,
1290        url,
1291        range_=None,
1292        encryption: Optional[EncryptionSetting] = None
1293    ):
1294        # TODO: check auth token if bucket is not public
1295        matcher = self.DOWNLOAD_URL_MATCHER.match(url)
1296        assert matcher is not None, url
1297        groupdict = matcher.groupdict()
1298        file_id = groupdict['file_id']
1299        bucket_name = groupdict['bucket_name']
1300        file_name = groupdict['file_name']
1301        if file_id is not None:
1302            bucket_id = self.file_id_to_bucket_id[file_id]
1303            bucket = self._get_bucket_by_id(bucket_id)
1304            return bucket.download_file_by_id(
1305                account_auth_token_or_none,
1306                file_id,
1307                range_=range_,
1308                url=url,
1309                encryption=encryption,
1310            )
1311        elif bucket_name is not None and file_name is not None:
1312            bucket = self._get_bucket_by_name(bucket_name)
1313            return bucket.download_file_by_name(
1314                account_auth_token_or_none,
1315                b2_url_decode(file_name),
1316                range_=range_,
1317                url=url,
1318                encryption=encryption,
1319            )
1320        else:
1321            assert False
1322
1323    def delete_key(self, api_url, account_auth_token, application_key_id):
1324        assert api_url == self.API_URL
1325        key_sim = self.key_id_to_key.pop(application_key_id, None)
1326        if key_sim is None:
1327            raise BadRequest(
1328                'application key does not exist: %s' % (application_key_id,),
1329                'bad_request',
1330            )
1331        return key_sim.as_key()
1332
1333    def finish_large_file(self, api_url, account_auth_token, file_id, part_sha1_array):
1334        bucket_id = self.file_id_to_bucket_id[file_id]
1335        bucket = self._get_bucket_by_id(bucket_id)
1336        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'writeFiles')
1337        return bucket.finish_large_file(account_auth_token, file_id, part_sha1_array)
1338
1339    def get_download_authorization(
1340        self, api_url, account_auth_token, bucket_id, file_name_prefix, valid_duration_in_seconds
1341    ):
1342        bucket = self._get_bucket_by_id(bucket_id)
1343        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'shareFiles')
1344        return {
1345            'bucketId':
1346                bucket_id,
1347            'fileNamePrefix':
1348                file_name_prefix,
1349            'authorizationToken':
1350                'fake_download_auth_token_%s_%s_%d' % (
1351                    bucket_id,
1352                    b2_url_encode(file_name_prefix),
1353                    valid_duration_in_seconds,
1354                )
1355        }
1356
1357    def get_file_info_by_id(self, api_url, account_auth_token, file_id):
1358        bucket_id = self.file_id_to_bucket_id[file_id]
1359        bucket = self._get_bucket_by_id(bucket_id)
1360        return bucket.get_file_info_by_id(account_auth_token, file_id)
1361
1362    def get_file_info_by_name(self, api_url, account_auth_token, bucket_name, file_name):
1363        bucket = self._get_bucket_by_name(bucket_name)
1364        info = bucket.get_file_info_by_name(account_auth_token, file_name)
1365        return info
1366
1367    def get_upload_url(self, api_url, account_auth_token, bucket_id):
1368        bucket = self._get_bucket_by_id(bucket_id)
1369        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'writeFiles')
1370        return self._get_bucket_by_id(bucket_id).get_upload_url(account_auth_token)
1371
1372    def get_upload_part_url(self, api_url, account_auth_token, file_id):
1373        bucket_id = self.file_id_to_bucket_id[file_id]
1374        bucket = self._get_bucket_by_id(bucket_id)
1375        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'writeFiles')
1376        return self._get_bucket_by_id(bucket_id).get_upload_part_url(account_auth_token, file_id)
1377
1378    def hide_file(self, api_url, account_auth_token, bucket_id, file_name):
1379        bucket = self._get_bucket_by_id(bucket_id)
1380        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'writeFiles')
1381        response = bucket.hide_file(account_auth_token, file_name)
1382        self.file_id_to_bucket_id[response['fileId']] = bucket_id
1383        return response
1384
1385    def copy_file(
1386        self,
1387        api_url,
1388        account_auth_token,
1389        source_file_id,
1390        new_file_name,
1391        bytes_range=None,
1392        metadata_directive=None,
1393        content_type=None,
1394        file_info=None,
1395        destination_bucket_id=None,
1396        destination_server_side_encryption=None,
1397        source_server_side_encryption=None,
1398        file_retention: Optional[FileRetentionSetting] = None,
1399        legal_hold: Optional[LegalHold] = None,
1400    ):
1401        bucket_id = self.file_id_to_bucket_id[source_file_id]
1402        bucket = self._get_bucket_by_id(bucket_id)
1403        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'writeFiles')
1404        copy_file_sim = bucket.copy_file(
1405            source_file_id,
1406            new_file_name,
1407            bytes_range,
1408            metadata_directive,
1409            content_type,
1410            file_info,
1411            destination_bucket_id,
1412            destination_server_side_encryption=destination_server_side_encryption,
1413            source_server_side_encryption=source_server_side_encryption,
1414            file_retention=file_retention,
1415            legal_hold=legal_hold,
1416        )
1417
1418        if destination_bucket_id:
1419            # TODO: Handle and raise proper exception after server docs get updated
1420            dest_bucket = self.bucket_id_to_bucket[destination_bucket_id]
1421            assert dest_bucket.account_id == bucket.account_id
1422        else:
1423            dest_bucket = bucket
1424
1425        dest_bucket.file_id_to_file[copy_file_sim.file_id] = copy_file_sim
1426        dest_bucket.file_name_and_id_to_file[copy_file_sim.sort_key()] = copy_file_sim
1427
1428        return copy_file_sim.as_upload_result(account_auth_token)
1429
1430    def copy_part(
1431        self,
1432        api_url,
1433        account_auth_token,
1434        source_file_id,
1435        large_file_id,
1436        part_number,
1437        bytes_range=None,
1438        destination_server_side_encryption: Optional[EncryptionSetting] = None,
1439        source_server_side_encryption: Optional[EncryptionSetting] = None,
1440    ):
1441        if destination_server_side_encryption is not None and destination_server_side_encryption.mode == EncryptionMode.SSE_B2:
1442            raise ValueError(
1443                'unsupported sse mode for copy_part!'
1444            )  # SSE-B2 is only to be marked in b2_start_large_file
1445        src_bucket_id = self.file_id_to_bucket_id[source_file_id]
1446        src_bucket = self._get_bucket_by_id(src_bucket_id)
1447        dest_bucket_id = self.file_id_to_bucket_id[large_file_id]
1448        dest_bucket = self._get_bucket_by_id(dest_bucket_id)
1449
1450        self._assert_account_auth(api_url, account_auth_token, dest_bucket.account_id, 'writeFiles')
1451
1452        file_sim = src_bucket.file_id_to_file[source_file_id]
1453        file_sim.check_encryption(source_server_side_encryption)
1454        data_bytes = get_bytes_range(file_sim.data_bytes, bytes_range)
1455
1456        data_stream = StreamWithHash(io.BytesIO(data_bytes), len(data_bytes))
1457        content_length = len(data_stream)
1458        sha1_sum = HEX_DIGITS_AT_END
1459
1460        return dest_bucket.upload_part(
1461            large_file_id,
1462            part_number,
1463            content_length,
1464            sha1_sum,
1465            data_stream,
1466            server_side_encryption=destination_server_side_encryption,
1467        )
1468
1469    def list_buckets(
1470        self, api_url, account_auth_token, account_id, bucket_id=None, bucket_name=None
1471    ):
1472        # First, map the bucket name to a bucket_id, so that we can check auth.
1473        if bucket_name is None:
1474            bucket_id_for_auth = bucket_id
1475        else:
1476            bucket_id_for_auth = self._get_bucket_id_or_none_for_bucket_name(bucket_name)
1477        self._assert_account_auth(
1478            api_url, account_auth_token, account_id, 'listBuckets', bucket_id_for_auth
1479        )
1480
1481        # Do the query
1482        sorted_buckets = [
1483            self.bucket_name_to_bucket[name] for name in sorted(self.bucket_name_to_bucket)
1484        ]
1485        bucket_list = [
1486            bucket.bucket_dict(account_auth_token)
1487            for bucket in sorted_buckets if self._bucket_matches(bucket, bucket_id, bucket_name)
1488        ]
1489        return dict(buckets=bucket_list)
1490
1491    def _get_bucket_id_or_none_for_bucket_name(self, bucket_name):
1492        for bucket in self.bucket_name_to_bucket.values():
1493            if bucket.bucket_name == bucket_name:
1494                return bucket.bucket_id
1495
1496    def _bucket_matches(self, bucket, bucket_id, bucket_name):
1497        return (
1498            (bucket_id is None or bucket.bucket_id == bucket_id) and
1499            (bucket_name is None or bucket.bucket_name == bucket_name)
1500        )
1501
1502    def list_file_names(
1503        self,
1504        api_url,
1505        account_auth_token,
1506        bucket_id,
1507        start_file_name=None,
1508        max_file_count=None,
1509        prefix=None,
1510    ):
1511        bucket = self._get_bucket_by_id(bucket_id)
1512        self._assert_account_auth(
1513            api_url,
1514            account_auth_token,
1515            bucket.account_id,
1516            'listFiles',
1517            bucket_id=bucket_id,
1518            file_name=prefix,
1519        )
1520        return bucket.list_file_names(account_auth_token, start_file_name, max_file_count, prefix)
1521
1522    def list_file_versions(
1523        self,
1524        api_url,
1525        account_auth_token,
1526        bucket_id,
1527        start_file_name=None,
1528        start_file_id=None,
1529        max_file_count=None,
1530        prefix=None,
1531    ):
1532        bucket = self._get_bucket_by_id(bucket_id)
1533        self._assert_account_auth(
1534            api_url,
1535            account_auth_token,
1536            bucket.account_id,
1537            'listFiles',
1538            bucket_id=bucket_id,
1539            file_name=prefix,
1540        )
1541        return bucket.list_file_versions(
1542            account_auth_token,
1543            start_file_name,
1544            start_file_id,
1545            max_file_count,
1546            prefix,
1547        )
1548
1549    def list_keys(
1550        self,
1551        api_url,
1552        account_auth_token,
1553        account_id,
1554        max_key_count=1000,
1555        start_application_key_id=None
1556    ):
1557        self._assert_account_auth(api_url, account_auth_token, account_id, 'listKeys')
1558        next_application_key_id = None
1559        all_keys_sorted = sorted(self.all_application_keys, key=lambda key: key.application_key_id)
1560        if start_application_key_id is None:
1561            keys = all_keys_sorted[:max_key_count]
1562            if max_key_count < len(all_keys_sorted):
1563                next_application_key_id = all_keys_sorted[max_key_count].application_key_id
1564        else:
1565            keys = []
1566            got_already = 0
1567            for ind, key in enumerate(all_keys_sorted):
1568                if key.application_key_id >= start_application_key_id:
1569                    keys.append(key)
1570                    got_already += 1
1571                    if got_already == max_key_count:
1572                        if ind < len(all_keys_sorted) - 1:
1573                            next_application_key_id = all_keys_sorted[ind + 1].application_key_id
1574                        break
1575
1576        key_dicts = map(lambda key: key.as_key(), keys)
1577        return dict(keys=list(key_dicts), nextApplicationKeyId=next_application_key_id)
1578
1579    def list_parts(self, api_url, account_auth_token, file_id, start_part_number, max_part_count):
1580        bucket_id = self.file_id_to_bucket_id[file_id]
1581        bucket = self._get_bucket_by_id(bucket_id)
1582        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'writeFiles')
1583        return bucket.list_parts(file_id, start_part_number, max_part_count)
1584
1585    def list_unfinished_large_files(
1586        self,
1587        api_url,
1588        account_auth_token,
1589        bucket_id,
1590        start_file_id=None,
1591        max_file_count=None,
1592        prefix=None
1593    ):
1594        bucket = self._get_bucket_by_id(bucket_id)
1595        self._assert_account_auth(
1596            api_url, account_auth_token, bucket.account_id, 'listFiles', file_name=prefix
1597        )
1598        start_file_id = start_file_id or ''
1599        max_file_count = max_file_count or 100
1600        return bucket.list_unfinished_large_files(
1601            account_auth_token, start_file_id, max_file_count, prefix
1602        )
1603
1604    def start_large_file(
1605        self,
1606        api_url,
1607        account_auth_token,
1608        bucket_id,
1609        file_name,
1610        content_type,
1611        file_info,
1612        server_side_encryption: Optional[EncryptionSetting] = None,
1613        file_retention: Optional[FileRetentionSetting] = None,
1614        legal_hold: Optional[LegalHold] = None,
1615    ):
1616        bucket = self._get_bucket_by_id(bucket_id)
1617        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'writeFiles')
1618        result = bucket.start_large_file(
1619            account_auth_token,
1620            file_name,
1621            content_type,
1622            file_info,
1623            server_side_encryption,
1624            file_retention,
1625            legal_hold,
1626        )
1627        self.file_id_to_bucket_id[result['fileId']] = bucket_id
1628
1629        return result
1630
1631    def update_bucket(
1632        self,
1633        api_url,
1634        account_auth_token,
1635        account_id,
1636        bucket_id,
1637        bucket_type=None,
1638        bucket_info=None,
1639        cors_rules=None,
1640        lifecycle_rules=None,
1641        if_revision_is=None,
1642        default_server_side_encryption: Optional[EncryptionSetting] = None,
1643        default_retention: Optional[BucketRetentionSetting] = None,
1644    ):
1645        assert bucket_type or bucket_info or cors_rules or lifecycle_rules or default_server_side_encryption
1646        bucket = self._get_bucket_by_id(bucket_id)
1647        self._assert_account_auth(api_url, account_auth_token, bucket.account_id, 'writeBuckets')
1648        return bucket._update_bucket(
1649            bucket_type=bucket_type,
1650            bucket_info=bucket_info,
1651            cors_rules=cors_rules,
1652            lifecycle_rules=lifecycle_rules,
1653            if_revision_is=if_revision_is,
1654            default_server_side_encryption=default_server_side_encryption,
1655            default_retention=default_retention,
1656        )
1657
1658    def upload_file(
1659        self,
1660        upload_url,
1661        upload_auth_token,
1662        file_name,
1663        content_length,
1664        content_type,
1665        content_sha1,
1666        file_infos,
1667        data_stream,
1668        server_side_encryption: Optional[EncryptionSetting] = None,
1669        file_retention: Optional[FileRetentionSetting] = None,
1670        legal_hold: Optional[LegalHold] = None,
1671    ):
1672        with ConcurrentUsedAuthTokenGuard(
1673            self.currently_used_auth_tokens[upload_auth_token], upload_auth_token
1674        ):
1675            assert upload_url == upload_auth_token
1676            url_match = self.UPLOAD_URL_MATCHER.match(upload_url)
1677            if url_match is None:
1678                raise BadUploadUrl(upload_url)
1679            if self.upload_errors:
1680                raise self.upload_errors.pop(0)
1681            bucket_id, upload_id = url_match.groups()
1682            bucket = self._get_bucket_by_id(bucket_id)
1683            if server_side_encryption is not None:
1684                assert server_side_encryption.mode in (
1685                    EncryptionMode.NONE, EncryptionMode.SSE_B2, EncryptionMode.SSE_C
1686                )
1687                file_infos = server_side_encryption.add_key_id_to_file_info(file_infos)
1688            response = bucket.upload_file(
1689                upload_id,
1690                upload_auth_token,
1691                file_name,
1692                content_length,
1693                content_type,
1694                content_sha1,
1695                file_infos,
1696                data_stream,
1697                server_side_encryption,
1698                file_retention,
1699                legal_hold,
1700            )
1701            file_id = response['fileId']
1702            self.file_id_to_bucket_id[file_id] = bucket_id
1703
1704        return response
1705
1706    def upload_part(
1707        self,
1708        upload_url,
1709        upload_auth_token,
1710        part_number,
1711        content_length,
1712        sha1_sum,
1713        input_stream,
1714        server_side_encryption: Optional[EncryptionSetting] = None,
1715    ):
1716        with ConcurrentUsedAuthTokenGuard(
1717            self.currently_used_auth_tokens[upload_auth_token], upload_auth_token
1718        ):
1719            url_match = self.UPLOAD_PART_MATCHER.match(upload_url)
1720            if url_match is None:
1721                raise BadUploadUrl(upload_url)
1722            file_id = url_match.group(1)
1723            bucket_id = self.file_id_to_bucket_id[file_id]
1724            bucket = self._get_bucket_by_id(bucket_id)
1725            part = bucket.upload_part(
1726                file_id, part_number, content_length, sha1_sum, input_stream, server_side_encryption
1727            )
1728        return part
1729
1730    def _assert_account_auth(
1731        self, api_url, account_auth_token, account_id, capability, bucket_id=None, file_name=None
1732    ):
1733        key_sim = self.auth_token_to_key.get(account_auth_token)
1734        assert key_sim is not None
1735        assert api_url == self.API_URL
1736        assert account_id == key_sim.account_id
1737        if account_auth_token in self.expired_auth_tokens:
1738            raise InvalidAuthToken('auth token expired', 'auth_token_expired')
1739        if capability not in key_sim.capabilities:
1740            raise Unauthorized('', 'unauthorized')
1741        if key_sim.bucket_id_or_none is not None and key_sim.bucket_id_or_none != bucket_id:
1742            raise Unauthorized('', 'unauthorized')
1743        if key_sim.name_prefix_or_none is not None:
1744            if file_name is not None and not file_name.startswith(key_sim.name_prefix_or_none):
1745                raise Unauthorized('', 'unauthorized')
1746
1747    def _get_bucket_by_id(self, bucket_id):
1748        if bucket_id not in self.bucket_id_to_bucket:
1749            raise NonExistentBucket(bucket_id)
1750        return self.bucket_id_to_bucket[bucket_id]
1751
1752    def _get_bucket_by_name(self, bucket_name):
1753        if bucket_name not in self.bucket_name_to_bucket:
1754            raise NonExistentBucket(bucket_name)
1755        return self.bucket_name_to_bucket[bucket_name]
1756