1# -------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See License.txt in the project root for
4# license information.
5# --------------------------------------------------------------------------
6# pylint: disable=no-self-use
7
8from io import SEEK_SET, UnsupportedOperation
9from typing import Optional, Union, Any, TypeVar, TYPE_CHECKING # pylint: disable=unused-import
10
11import six
12from azure.core.exceptions import ResourceExistsError, ResourceModifiedError
13
14from ._shared.response_handlers import (
15    process_storage_error,
16    return_response_headers)
17from ._shared.models import StorageErrorCode
18from ._shared.uploads import (
19    upload_data_chunks,
20    upload_substream_blocks,
21    BlockBlobChunkUploader,
22    PageBlobChunkUploader,
23    AppendBlobChunkUploader)
24from ._shared.encryption import generate_blob_encryption_data, encrypt_blob
25from ._generated.models import (
26    StorageErrorException,
27    BlockLookupList,
28    AppendPositionAccessConditions,
29    ModifiedAccessConditions,
30)
31
32if TYPE_CHECKING:
33    from datetime import datetime # pylint: disable=unused-import
34    BlobLeaseClient = TypeVar("BlobLeaseClient")
35
36_LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024
37_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.'
38
39
def _convert_mod_error(error):
    """Re-raise a ConditionNotMet error as a blob-already-exists error.

    Used when this module injected If-None-Match='*' to guard against
    overwriting: the service's ConditionNotMet response then really means
    the blob already exists, so the message and error code are rewritten
    accordingly before raising.

    :param error: The original ResourceModifiedError from the service.
    :raises ResourceExistsError: Always.
    """
    rewritten = error.message.replace(
        "The condition specified using HTTP conditional header(s) is not met.",
        "The specified blob already exists.").replace(
            "ConditionNotMet", "BlobAlreadyExists")
    exists_error = ResourceExistsError(
        message=rewritten,
        response=error.response,
        error=error)
    exists_error.error_code = StorageErrorCode.blob_already_exists
    raise exists_error
51
52
53def _any_conditions(modified_access_conditions=None, **kwargs):  # pylint: disable=unused-argument
54    return any([
55        modified_access_conditions.if_modified_since,
56        modified_access_conditions.if_unmodified_since,
57        modified_access_conditions.if_none_match,
58        modified_access_conditions.if_match
59    ])
60
61
def upload_block_blob(  # pylint: disable=too-many-locals
        client=None,
        data=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Upload data as a block blob.

    Chooses between a single Put Blob call (payload no larger than
    ``blob_settings.max_single_put_size``) and a chunked upload followed by
    a Put Block List commit.

    :param client: Generated block-blob operations client (upload /
        commit_block_list).
    :param data: Payload for the single-put path (bytes, or an object with
        ``read``).
    :param stream: Stream consumed by the chunked upload paths.
    :param length: Payload size in bytes, or None when unknown (forces the
        chunked path).
    :param overwrite: When falsy, an If-None-Match='*' condition is injected
        so the request fails if the blob already exists.
    :param headers: Mutable raw-header dict; client-side encryption metadata
        is injected here when enabled.
    :param validate_content: Whether to validate content (MD5) per request.
    :param max_concurrency: Maximum number of parallel chunk uploads.
    :param blob_settings: Upload-related client configuration
        (max_single_put_size, max_block_size, min_large_block_upload_threshold,
        use_byte_buffer).
    :param encryption_options: Dict of client-side encryption settings
        ('key', 'required'); receives 'cek'/'vector' on the buffered chunk path.
    :returns: Parsed response headers from the final service call.
    """
    try:
        # Only inject the anti-overwrite condition when the caller supplied
        # no access conditions of their own.
        if not overwrite and not _any_conditions(**kwargs):
            kwargs['modified_access_conditions'].if_none_match = '*'
        adjusted_count = length
        # Client-side encryption pads the payload up to a whole 16-byte AES
        # block, so the transmitted content length grows accordingly.
        if (encryption_options.get('key') is not None) and (adjusted_count is not None):
            adjusted_count += (16 - (length % 16))
        blob_headers = kwargs.pop('blob_headers', None)
        tier = kwargs.pop('standard_blob_tier', None)
        blob_tags_string = kwargs.pop('blob_tags_string', None)

        # Do single put if the size is smaller than or equal config.max_single_put_size
        if adjusted_count is not None and (adjusted_count <= blob_settings.max_single_put_size):
            try:
                # Drain a file-like payload into bytes; plain bytes lack
                # .read and fall through via AttributeError.
                data = data.read(length)
                if not isinstance(data, six.binary_type):
                    raise TypeError('Blob data should be of type bytes.')
            except AttributeError:
                pass
            if encryption_options.get('key'):
                # Encrypt in one shot and record the metadata the service
                # needs to decrypt later.
                encryption_data, data = encrypt_blob(data, encryption_options['key'])
                headers['x-ms-meta-encryptiondata'] = encryption_data
            return client.upload(
                data,
                content_length=adjusted_count,
                blob_http_headers=blob_headers,
                headers=headers,
                cls=return_response_headers,
                validate_content=validate_content,
                data_stream_total=adjusted_count,
                upload_stream_current=0,
                tier=tier.value if tier else None,
                blob_tags_string=blob_tags_string,
                **kwargs)

        # The substream path needs a seekable stream and no per-chunk
        # transformation; any of these conditions forces the buffered
        # ("original") chunked path instead.
        use_original_upload_path = blob_settings.use_byte_buffer or \
            validate_content or encryption_options.get('required') or \
            blob_settings.max_block_size < blob_settings.min_large_block_upload_threshold or \
            hasattr(stream, 'seekable') and not stream.seekable() or \
            not hasattr(stream, 'seek') or not hasattr(stream, 'tell')

        if use_original_upload_path:
            if encryption_options.get('key'):
                # Generate the content-encryption key and IV once; chunk
                # uploaders pick them up from encryption_options.
                cek, iv, encryption_data = generate_blob_encryption_data(encryption_options['key'])
                headers['x-ms-meta-encryptiondata'] = encryption_data
                encryption_options['cek'] = cek
                encryption_options['vector'] = iv
            block_ids = upload_data_chunks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                encryption_options=encryption_options,
                **kwargs
            )
        else:
            block_ids = upload_substream_blocks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                **kwargs
            )

        # Commit the uploaded blocks (all listed as 'latest') to form the
        # final blob content.
        block_lookup = BlockLookupList(committed=[], uncommitted=[], latest=[])
        block_lookup.latest = block_ids
        return client.commit_block_list(
            block_lookup,
            blob_http_headers=blob_headers,
            cls=return_response_headers,
            validate_content=validate_content,
            headers=headers,
            tier=tier.value if tier else None,
            blob_tags_string=blob_tags_string,
            **kwargs)
    except StorageErrorException as error:
        try:
            process_storage_error(error)
        except ResourceModifiedError as mod_error:
            # A ConditionNotMet triggered by our injected If-None-Match='*'
            # really means the blob already exists; re-raise it as such.
            if not overwrite:
                _convert_mod_error(mod_error)
            raise
161
162
def upload_page_blob(
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Create a page blob and upload its content in page-sized chunks.

    :param client: Generated page-blob operations client (create / pages).
    :param stream: Stream providing the page data.
    :param length: Total blob size in bytes; required and must be a multiple
        of 512.
    :param overwrite: When falsy, an If-None-Match='*' condition is injected
        so creation fails if the blob already exists.
    :param headers: Mutable raw-header dict; access-tier and encryption
        metadata headers are injected here.
    :param validate_content: Whether to validate content (MD5) per request.
    :param max_concurrency: Maximum number of parallel chunk uploads.
    :param blob_settings: Upload-related client configuration (max_page_size).
    :param encryption_options: Dict of client-side encryption settings; its
        'data' entry (if any) is sent as encryption metadata.
    :returns: Parsed response headers from the create call (empty blob) or
        from the chunked page upload.
    :raises ValueError: If length is missing, negative, or not 512-aligned.
    """
    try:
        # Only inject the anti-overwrite condition when the caller supplied
        # no access conditions of their own.
        if not overwrite and not _any_conditions(**kwargs):
            kwargs['modified_access_conditions'].if_none_match = '*'
        if length is None or length < 0:
            raise ValueError("A content length must be specified for a Page Blob.")
        if length % 512 != 0:
            raise ValueError("Invalid page blob size: {0}. "
                             "The size must be aligned to a 512-byte boundary.".format(length))
        if kwargs.get('premium_page_blob_tier'):
            premium_page_blob_tier = kwargs.pop('premium_page_blob_tier')
            try:
                # Enum-style tiers expose .value; raw strings are sent as-is.
                headers['x-ms-access-tier'] = premium_page_blob_tier.value
            except AttributeError:
                headers['x-ms-access-tier'] = premium_page_blob_tier
        if encryption_options and encryption_options.get('data'):
            headers['x-ms-meta-encryptiondata'] = encryption_options['data']
        blob_tags_string = kwargs.pop('blob_tags_string', None)

        # Page blobs are created empty at their full size, then filled with
        # Put Page calls.
        response = client.create(
            content_length=0,
            blob_content_length=length,
            blob_sequence_number=None,
            blob_http_headers=kwargs.pop('blob_headers', None),
            blob_tags_string=blob_tags_string,
            cls=return_response_headers,
            headers=headers,
            **kwargs)
        if length == 0:
            return response

        # Condition subsequent page uploads on the etag from the create, so
        # a concurrent modification of the blob fails the upload.
        kwargs['modified_access_conditions'] = ModifiedAccessConditions(if_match=response['etag'])
        return upload_data_chunks(
            service=client,
            uploader_class=PageBlobChunkUploader,
            total_size=length,
            chunk_size=blob_settings.max_page_size,
            stream=stream,
            max_concurrency=max_concurrency,
            validate_content=validate_content,
            encryption_options=encryption_options,
            **kwargs)

    except StorageErrorException as error:
        try:
            process_storage_error(error)
        except ResourceModifiedError as mod_error:
            # A ConditionNotMet triggered by our injected If-None-Match='*'
            # really means the blob already exists; re-raise it as such.
            if not overwrite:
                _convert_mod_error(mod_error)
            raise
223
224
def upload_append_blob(  # pylint: disable=unused-argument
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Upload data to an append blob in chunks, creating the blob as needed.

    When ``overwrite`` is set the blob is (re)created first. Otherwise the
    chunks are appended to the existing blob; if the blob does not exist
    (404) the stream is rewound, the blob is created, and the upload is
    retried once.

    :param client: Generated append-blob operations client (create / append).
    :param stream: Stream providing the data to append.
    :param length: Total number of bytes to upload; 0 short-circuits to ``{}``.
    :param overwrite: Whether to (re)create the blob before appending.
    :param headers: Raw HTTP headers forwarded to the service calls.
    :param validate_content: Whether to validate content (MD5) per request.
    :param max_concurrency: Maximum number of parallel chunk uploads.
    :param blob_settings: Upload-related client configuration (max_block_size).
    :param encryption_options: Unused here; kept so all upload helpers share
        a uniform signature.
    :returns: Parsed response headers from the final append call, or ``{}``
        when there is nothing to upload.
    """
    try:
        if length == 0:
            return {}
        blob_headers = kwargs.pop('blob_headers', None)
        append_conditions = AppendPositionAccessConditions(
            max_size=kwargs.pop('maxsize_condition', None),
            append_position=None)
        blob_tags_string = kwargs.pop('blob_tags_string', None)

        def _create_blob():
            # Create an empty append blob carrying the caller's headers/tags.
            client.create(
                content_length=0,
                blob_http_headers=blob_headers,
                headers=headers,
                blob_tags_string=blob_tags_string,
                **kwargs)

        def _upload_chunks():
            # Append the stream contents chunk by chunk.
            return upload_data_chunks(
                service=client,
                uploader_class=AppendBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                append_position_access_conditions=append_conditions,
                **kwargs)

        try:
            if overwrite:
                _create_blob()
            return _upload_chunks()
        except StorageErrorException as error:
            if error.response.status_code != 404:
                raise
            # Blob does not exist yet: rewind the request body if it is a
            # stream, then create the blob and retry the upload once.
            if hasattr(stream, 'read'):
                try:
                    # attempt to rewind the body to the initial position
                    stream.seek(0, SEEK_SET)
                except UnsupportedOperation:
                    # if body is not seekable, then retry would not work
                    raise error
            _create_blob()
            return _upload_chunks()
    except StorageErrorException as error:
        process_storage_error(error)
292