# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=no-self-use

from io import SEEK_SET, UnsupportedOperation
from typing import Optional, Union, Any, TypeVar, TYPE_CHECKING # pylint: disable=unused-import

import six
from azure.core.exceptions import ResourceModifiedError

from .._shared.response_handlers import (
    process_storage_error,
    return_response_headers)
from .._shared.uploads_async import (
    upload_data_chunks,
    upload_substream_blocks,
    BlockBlobChunkUploader,
    PageBlobChunkUploader,
    AppendBlobChunkUploader)
from .._shared.encryption import generate_blob_encryption_data, encrypt_blob
from .._generated.models import (
    StorageErrorException,
    BlockLookupList,
    AppendPositionAccessConditions,
    ModifiedAccessConditions,
)
from .._upload_helpers import _convert_mod_error, _any_conditions

if TYPE_CHECKING:
    from datetime import datetime # pylint: disable=unused-import
    BlobLeaseClient = TypeVar("BlobLeaseClient")


async def upload_block_blob(  # pylint: disable=too-many-locals
        client=None,
        data=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    try:
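        # With overwrite disabled and no caller-supplied access conditions,
        # use If-None-Match: '*' so the service rejects the upload when the
        # blob already exists.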
        if not overwrite and not _any_conditions(**kwargs):
            kwargs['modified_access_conditions'].if_none_match = '*'
        adjusted_count = length
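        # Client-side encryption (AES-CBC with PKCS7 padding) pads the
        # plaintext up to the next 16-byte boundary, so grow the expected
        # content length to match the ciphertext size.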
        if (encryption_options.get('key') is not None) and (adjusted_count is not None):
            adjusted_count += (16 - (length % 16))
        blob_headers = kwargs.pop('blob_headers', None)
        tier = kwargs.pop('standard_blob_tier', None)

        # Do single put if the size is smaller than config.max_single_put_size
        if adjusted_count is not None and (adjusted_count < blob_settings.max_single_put_size):
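            # If the data is file-like, read the payload into memory so it
            # can be sent in a single request.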
            try:
                data = data.read(length)
                if not isinstance(data, six.binary_type):
                    raise TypeError('Blob data should be of type bytes.')
            except AttributeError:
                pass
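            # Encrypt the entire payload up front and store the encryption
            # metadata on the blob so it can be decrypted on download.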
            if encryption_options.get('key'):
                encryption_data, data = encrypt_blob(data, encryption_options['key'])
                headers['x-ms-meta-encryptiondata'] = encryption_data
            return await client.upload(
                data,
                content_length=adjusted_count,
                blob_http_headers=blob_headers,
                headers=headers,
                cls=return_response_headers,
                validate_content=validate_content,
                data_stream_total=adjusted_count,
                upload_stream_current=0,
                tier=tier.value if tier else None,
                **kwargs)

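        # Use the fully buffered chunk uploader when the stream cannot be
        # read from multiple positions in place (non-seekable or missing
        # seek/tell), or when encryption, content validation, or a small
        # block size requires buffering; otherwise stage blocks directly
        # from sub-stream views.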
        use_original_upload_path = blob_settings.use_byte_buffer or \
            validate_content or encryption_options.get('required') or \
            blob_settings.max_block_size < blob_settings.min_large_block_upload_threshold or \
            (hasattr(stream, 'seekable') and not stream.seekable()) or \
            not hasattr(stream, 'seek') or not hasattr(stream, 'tell')

        if use_original_upload_path:
            if encryption_options.get('key'):
                cek, iv, encryption_data = generate_blob_encryption_data(encryption_options['key'])
                headers['x-ms-meta-encryptiondata'] = encryption_data
                encryption_options['cek'] = cek
                encryption_options['vector'] = iv
            block_ids = await upload_data_chunks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                encryption_options=encryption_options,
                **kwargs
            )
        else:
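            # Seekable stream: stage blocks from sub-stream views without
            # buffering each chunk in memory.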
            block_ids = await upload_substream_blocks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                **kwargs
            )

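        # Commit the staged block IDs, in order, to finalize the blob.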
        block_lookup = BlockLookupList(committed=[], uncommitted=[], latest=[])
        block_lookup.latest = block_ids
        return await client.commit_block_list(
            block_lookup,
            blob_http_headers=blob_headers,
            cls=return_response_headers,
            validate_content=validate_content,
            headers=headers,
            tier=tier.value if tier else None,
            **kwargs)
    except StorageErrorException as error:
        try:
            process_storage_error(error)
        except ResourceModifiedError as mod_error:
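            # With overwrite=False, an If-None-Match: '*' failure means the
            # blob already exists; convert the error accordingly.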
            if not overwrite:
                _convert_mod_error(mod_error)
            raise

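# A minimal usage sketch for upload_block_blob (hypothetical caller: the
# `blob_client` object and its private attributes below are assumptions for
# illustration, not part of this module):
#
#     await upload_block_blob(
#         client=blob_client._client.block_blob,
#         data=b"abc",
#         stream=BytesIO(b"abc"),
#         length=3,
#         overwrite=True,
#         headers={},
#         validate_content=False,
#         max_concurrency=1,
#         blob_settings=blob_client._config,
#         encryption_options={'required': False, 'key': None})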

async def upload_page_blob(
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    try:
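        # As with block blobs, overwrite=False without explicit access
        # conditions maps to If-None-Match: '*'.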
        if not overwrite and not _any_conditions(**kwargs):
            kwargs['modified_access_conditions'].if_none_match = '*'
        if length is None or length < 0:
            raise ValueError("A content length must be specified for a Page Blob.")
        if length % 512 != 0:
            raise ValueError("Invalid page blob size: {0}. "
                             "The size must be aligned to a 512-byte boundary.".format(length))
        if kwargs.get('premium_page_blob_tier'):
            premium_page_blob_tier = kwargs.pop('premium_page_blob_tier')
            try:
                # Accept either a PremiumPageBlobTier enum or a plain string.
                headers['x-ms-access-tier'] = premium_page_blob_tier.value
            except AttributeError:
                headers['x-ms-access-tier'] = premium_page_blob_tier
        if encryption_options and encryption_options.get('data'):
            headers['x-ms-meta-encryptiondata'] = encryption_options['data']
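        # Create the (empty) page blob first; page content is uploaded in
        # separate requests below.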
        response = await client.create(
            content_length=0,
            blob_content_length=length,
            blob_sequence_number=None,
            blob_http_headers=kwargs.pop('blob_headers', None),
            cls=return_response_headers,
            headers=headers,
            **kwargs)
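        # A zero-length page blob has no pages to upload.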
        if length == 0:
            return response

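        # Pin the ETag returned by create so that a concurrent modification
        # between the create call and the page uploads fails fast.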
        kwargs['modified_access_conditions'] = ModifiedAccessConditions(if_match=response['etag'])
        return await upload_data_chunks(
            service=client,
            uploader_class=PageBlobChunkUploader,
            total_size=length,
            chunk_size=blob_settings.max_page_size,
            stream=stream,
            max_concurrency=max_concurrency,
            validate_content=validate_content,
            encryption_options=encryption_options,
            **kwargs)

    except StorageErrorException as error:
        try:
            process_storage_error(error)
        except ResourceModifiedError as mod_error:
            if not overwrite:
                _convert_mod_error(mod_error)
            raise


async def upload_append_blob(  # pylint: disable=unused-argument
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    try:
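        # An empty payload appends nothing; skip the service calls entirely.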
        if length == 0:
            return {}
        blob_headers = kwargs.pop('blob_headers', None)
        append_conditions = AppendPositionAccessConditions(
            max_size=kwargs.pop('maxsize_condition', None),
            append_position=None)
        try:
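            # With overwrite, recreate the blob (truncating any existing
            # content) before appending.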
            if overwrite:
                await client.create(
                    content_length=0,
                    blob_http_headers=blob_headers,
                    headers=headers,
                    **kwargs)
            return await upload_data_chunks(
                service=client,
                uploader_class=AppendBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                append_position_access_conditions=append_conditions,
                **kwargs)
        except StorageErrorException as error:
            if error.response.status_code != 404:
                raise
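            # A 404 means the append blob does not exist yet: create it,
            # then retry the append from the start of the stream.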
            # rewind the request body if it is a stream
            if hasattr(stream, 'read'):
                try:
                    # attempt to rewind the body to the initial position
                    stream.seek(0, SEEK_SET)
                except UnsupportedOperation:
                    # if body is not seekable, then retry would not work
                    raise error
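            # Create the blob, then retry the chunked append.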
            await client.create(
                content_length=0,
                blob_http_headers=blob_headers,
                headers=headers,
                **kwargs)
            return await upload_data_chunks(
                service=client,
                uploader_class=AppendBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                append_position_access_conditions=append_conditions,
                **kwargs)
    except StorageErrorException as error:
        process_storage_error(error)