# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=no-self-use
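"""Internal helpers for uploading block, page, and append blobs.

These functions are implementation details of the blob upload paths and are
not part of the public API.
"""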

from io import SEEK_SET, UnsupportedOperation
from typing import Optional, Union, Any, TypeVar, TYPE_CHECKING # pylint: disable=unused-import

import six
from azure.core.exceptions import ResourceExistsError, ResourceModifiedError

from ._shared.response_handlers import (
    process_storage_error,
    return_response_headers)
from ._shared.models import StorageErrorCode
from ._shared.uploads import (
    upload_data_chunks,
    upload_substream_blocks,
    BlockBlobChunkUploader,
    PageBlobChunkUploader,
    AppendBlobChunkUploader)
from ._shared.encryption import generate_blob_encryption_data, encrypt_blob
from ._generated.models import (
    StorageErrorException,
    BlockLookupList,
    AppendPositionAccessConditions,
    ModifiedAccessConditions,
)

if TYPE_CHECKING:
    from datetime import datetime # pylint: disable=unused-import
    BlobLeaseClient = TypeVar("BlobLeaseClient")

_LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024
_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.'


def _convert_mod_error(error):
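    """Convert a ResourceModifiedError raised by an if-none-match='*' upload
    into a ResourceExistsError, since the failed condition here means the
    blob already exists and overwrite was not requested."""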
    message = error.message.replace(
        "The condition specified using HTTP conditional header(s) is not met.",
        "The specified blob already exists.")
    message = message.replace("ConditionNotMet", "BlobAlreadyExists")
    overwrite_error = ResourceExistsError(
        message=message,
        response=error.response,
        error=error)
    overwrite_error.error_code = StorageErrorCode.blob_already_exists
    raise overwrite_error


def _any_conditions(modified_access_conditions=None, **kwargs):  # pylint: disable=unused-argument
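    """Return True if any modified-access condition is set on the request.

    Callers in this module always supply modified_access_conditions through
    kwargs, so no None check is made here.
    """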
    return any([
        modified_access_conditions.if_modified_since,
        modified_access_conditions.if_unmodified_since,
        modified_access_conditions.if_none_match,
        modified_access_conditions.if_match
    ])


def upload_block_blob(  # pylint: disable=too-many-locals
        client=None,
        data=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
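    """Upload data as a block blob.

    Payloads smaller than blob_settings.max_single_put_size go out in a
    single Put Blob call; larger payloads are uploaded as blocks and then
    committed with Put Block List. When overwrite is False, a conflict with
    an existing blob is surfaced as ResourceExistsError.
    """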
    try:
        if not overwrite and not _any_conditions(**kwargs):
            kwargs['modified_access_conditions'].if_none_match = '*'
        adjusted_count = length
        if (encryption_options.get('key') is not None) and (adjusted_count is not None):
            # Client-side encryption pads the payload to the next 16-byte
            # boundary (a full extra block when already aligned), so adjust
            # the announced content length to match.
            adjusted_count += (16 - (length % 16))
        blob_headers = kwargs.pop('blob_headers', None)
        tier = kwargs.pop('standard_blob_tier', None)

        # Do single put if the size is smaller than config.max_single_put_size
        if adjusted_count is not None and (adjusted_count < blob_settings.max_single_put_size):
            try:
                data = data.read(length)
                if not isinstance(data, six.binary_type):
                    raise TypeError('Blob data should be of type bytes.')
            except AttributeError:
                pass
            if encryption_options.get('key'):
                encryption_data, data = encrypt_blob(data, encryption_options['key'])
                headers['x-ms-meta-encryptiondata'] = encryption_data
            return client.upload(
                data,
                content_length=adjusted_count,
                blob_http_headers=blob_headers,
                headers=headers,
                cls=return_response_headers,
                validate_content=validate_content,
                data_stream_total=adjusted_count,
                upload_stream_current=0,
                tier=tier.value if tier else None,
                **kwargs)

        # Substream uploads read blocks directly from the source stream; fall
        # back to buffered chunk uploads when the stream is not seekable or
        # when validation or encryption requires buffering.
        use_original_upload_path = blob_settings.use_byte_buffer or \
            validate_content or encryption_options.get('required') or \
            blob_settings.max_block_size < blob_settings.min_large_block_upload_threshold or \
            (hasattr(stream, 'seekable') and not stream.seekable()) or \
            not hasattr(stream, 'seek') or not hasattr(stream, 'tell')

        if use_original_upload_path:
            if encryption_options.get('key'):
                cek, iv, encryption_data = generate_blob_encryption_data(encryption_options['key'])
                headers['x-ms-meta-encryptiondata'] = encryption_data
                encryption_options['cek'] = cek
                encryption_options['vector'] = iv
            block_ids = upload_data_chunks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                encryption_options=encryption_options,
                **kwargs
            )
        else:
            block_ids = upload_substream_blocks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                **kwargs
            )

        block_lookup = BlockLookupList(committed=[], uncommitted=[], latest=block_ids)
        return client.commit_block_list(
            block_lookup,
            blob_http_headers=blob_headers,
            cls=return_response_headers,
            validate_content=validate_content,
            headers=headers,
            tier=tier.value if tier else None,
            **kwargs)
    except StorageErrorException as error:
        try:
            process_storage_error(error)
        except ResourceModifiedError as mod_error:
            if not overwrite:
                _convert_mod_error(mod_error)
            raise


def upload_page_blob(
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
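    """Create a page blob of the given size and upload the data in pages.

    A non-negative, 512-byte-aligned length is required. Page uploads carry
    an if-match condition on the etag returned by the create call, so a
    concurrent overwrite fails rather than interleaving pages.
    """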
    try:
        if not overwrite and not _any_conditions(**kwargs):
            kwargs['modified_access_conditions'].if_none_match = '*'
        if length is None or length < 0:
            raise ValueError("A content length must be specified for a Page Blob.")
        if length % 512 != 0:
            raise ValueError("Invalid page blob size: {0}. "
                             "The size must be aligned to a 512-byte boundary.".format(length))
        if kwargs.get('premium_page_blob_tier'):
            premium_page_blob_tier = kwargs.pop('premium_page_blob_tier')
            try:
                headers['x-ms-access-tier'] = premium_page_blob_tier.value
            except AttributeError:
                # The tier was passed as a plain string rather than an enum.
                headers['x-ms-access-tier'] = premium_page_blob_tier
        if encryption_options and encryption_options.get('data'):
            headers['x-ms-meta-encryptiondata'] = encryption_options['data']
        response = client.create(
            content_length=0,
            blob_content_length=length,
            blob_sequence_number=None,
            blob_http_headers=kwargs.pop('blob_headers', None),
            cls=return_response_headers,
            headers=headers,
            **kwargs)
        if length == 0:
            return response

        # Pin subsequent page uploads to the blob just created so a concurrent
        # overwrite fails instead of interleaving pages.
        kwargs['modified_access_conditions'] = ModifiedAccessConditions(if_match=response['etag'])
        return upload_data_chunks(
            service=client,
            uploader_class=PageBlobChunkUploader,
            total_size=length,
            chunk_size=blob_settings.max_page_size,
            stream=stream,
            max_concurrency=max_concurrency,
            validate_content=validate_content,
            encryption_options=encryption_options,
            **kwargs)

    except StorageErrorException as error:
        try:
            process_storage_error(error)
        except ResourceModifiedError as mod_error:
            if not overwrite:
                _convert_mod_error(mod_error)
            raise


def upload_append_blob(  # pylint: disable=unused-argument
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
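    """Append data to an append blob in blocks.

    With overwrite=True the blob is (re)created first. Otherwise the append
    is attempted directly; a 404 means the blob does not exist yet, in which
    case it is created and the upload retried once.
    """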
    try:
        if length == 0:
            return {}
        blob_headers = kwargs.pop('blob_headers', None)
        append_conditions = AppendPositionAccessConditions(
            max_size=kwargs.pop('maxsize_condition', None),
            append_position=None)
        try:
            if overwrite:
                client.create(
                    content_length=0,
                    blob_http_headers=blob_headers,
                    headers=headers,
                    **kwargs)
            return upload_data_chunks(
                service=client,
                uploader_class=AppendBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                append_position_access_conditions=append_conditions,
                **kwargs)
        except StorageErrorException as error:
            if error.response.status_code != 404:
                raise
            # The blob does not exist yet: rewind the request body if it is a
            # stream, create the blob, and retry the upload once.
            if hasattr(stream, 'read'):
                try:
                    # attempt to rewind the body to the initial position
                    stream.seek(0, SEEK_SET)
                except UnsupportedOperation:
                    # if body is not seekable, then retry would not work
                    raise error
            client.create(
                content_length=0,
                blob_http_headers=blob_headers,
                headers=headers,
                **kwargs)
            return upload_data_chunks(
                service=client,
                uploader_class=AppendBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                append_position_access_conditions=append_conditions,
                **kwargs)
    except StorageErrorException as error:
        process_storage_error(error)