# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=no-self-use

from io import SEEK_SET, UnsupportedOperation
from typing import Optional, Union, Any, TypeVar, TYPE_CHECKING # pylint: disable=unused-import

import six
from azure.core.exceptions import ResourceModifiedError

from .._shared.response_handlers import (
    process_storage_error,
    return_response_headers)
from .._shared.uploads_async import (
    upload_data_chunks,
    upload_substream_blocks,
    BlockBlobChunkUploader,
    PageBlobChunkUploader,
    AppendBlobChunkUploader)
from .._shared.encryption import generate_blob_encryption_data, encrypt_blob
from .._generated.models import (
    StorageErrorException,
    BlockLookupList,
    AppendPositionAccessConditions,
    ModifiedAccessConditions,
)
from .._upload_helpers import _convert_mod_error, _any_conditions

if TYPE_CHECKING:
    from datetime import datetime # pylint: disable=unused-import
    BlobLeaseClient = TypeVar("BlobLeaseClient")


def _set_header(headers, name, value):
    """Store ``value`` under ``name`` in ``headers`` and return the mapping.

    The upload helpers default ``headers`` to ``None``; a dict is created
    only when a header actually has to be written. A mapping supplied by the
    caller is mutated in place and returned unchanged in identity.
    """
    if headers is None:
        headers = {}
    headers[name] = value
    return headers


async def upload_block_blob(  # pylint: disable=too-many-locals
        client=None,
        data=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Upload a block blob with a single put, or as blocks plus a commit.

    :param client: Operations object providing ``upload`` and
        ``commit_block_list`` coroutines.
    :param data: Payload for the single-put path. If it has a ``read``
        method, ``read(length)`` is called and must return bytes.
    :param stream: Source stream for the chunked path.
    :param length: Payload size in bytes, or None when unknown (None always
        selects the chunked path).
    :param overwrite: When falsy and no access condition is reported by
        ``_any_conditions``, ``if_none_match='*'`` is set on
        ``kwargs['modified_access_conditions']``.
    :param headers: Extra request headers, or None. The dict is mutated when
        encryption metadata has to be attached.
    :param validate_content: Forwarded to the service calls and uploaders.
    :param max_concurrency: Forwarded to the chunk uploaders.
    :param blob_settings: Object exposing ``max_single_put_size``,
        ``use_byte_buffer``, ``max_block_size`` and
        ``min_large_block_upload_threshold``.
    :param encryption_options: Dict with optional ``key`` and ``required``
        entries, or None for no encryption. ``cek`` and ``vector`` entries
        are added to it on the encrypted chunked path.
    :returns: The result of ``client.upload`` or ``client.commit_block_list``
        (both invoked with ``cls=return_response_headers``).
    :raises TypeError: If ``data.read(length)`` does not return bytes.
    """
    try:
        if not overwrite and not _any_conditions(**kwargs):
            kwargs['modified_access_conditions'].if_none_match = '*'

        # encryption_options defaults to None, so read it defensively instead
        # of calling .get() on it directly (upload_page_blob guards the same way).
        encryption_key = encryption_options.get('key') if encryption_options else None
        encryption_required = encryption_options.get('required') if encryption_options else None

        adjusted_count = length
        if (encryption_key is not None) and (adjusted_count is not None):
            # Round up to the next 16-byte boundary; an already aligned
            # length still grows by 16 (presumably cipher padding -- confirm).
            adjusted_count += (16 - (length % 16))
        blob_headers = kwargs.pop('blob_headers', None)
        tier = kwargs.pop('standard_blob_tier', None)

        # Do single put if the size is smaller than config.max_single_put_size
        if adjusted_count is not None and (adjusted_count < blob_settings.max_single_put_size):
            try:
                data = data.read(length)
                if not isinstance(data, six.binary_type):
                    raise TypeError('Blob data should be of type bytes.')
            except AttributeError:
                # data has no read(); it is used as given.
                pass
            if encryption_key:
                encryption_data, data = encrypt_blob(data, encryption_key)
                headers = _set_header(headers, 'x-ms-meta-encryptiondata', encryption_data)
            return await client.upload(
                data,
                content_length=adjusted_count,
                blob_http_headers=blob_headers,
                headers=headers,
                cls=return_response_headers,
                validate_content=validate_content,
                data_stream_total=adjusted_count,
                upload_stream_current=0,
                tier=tier.value if tier else None,
                **kwargs)

        # Use upload_data_chunks when any setting asks for it or when the
        # stream does not offer seekable()/seek()/tell(); otherwise use
        # upload_substream_blocks.
        use_original_upload_path = blob_settings.use_byte_buffer or \
            validate_content or encryption_required or \
            blob_settings.max_block_size < blob_settings.min_large_block_upload_threshold or \
            (hasattr(stream, 'seekable') and not stream.seekable()) or \
            not hasattr(stream, 'seek') or not hasattr(stream, 'tell')

        if use_original_upload_path:
            if encryption_key:
                cek, iv, encryption_data = generate_blob_encryption_data(encryption_key)
                headers = _set_header(headers, 'x-ms-meta-encryptiondata', encryption_data)
                encryption_options['cek'] = cek
                encryption_options['vector'] = iv
            block_ids = await upload_data_chunks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                encryption_options=encryption_options,
                **kwargs
            )
        else:
            block_ids = await upload_substream_blocks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                **kwargs
            )

        block_lookup = BlockLookupList(committed=[], uncommitted=[], latest=[])
        block_lookup.latest = block_ids
        return await client.commit_block_list(
            block_lookup,
            blob_http_headers=blob_headers,
            cls=return_response_headers,
            validate_content=validate_content,
            headers=headers,
            tier=tier.value if tier else None,
            **kwargs)
    except StorageErrorException as error:
        try:
            process_storage_error(error)
        except ResourceModifiedError as mod_error:
            # Without overwrite, give _convert_mod_error the chance to raise
            # its own error first; otherwise re-raise the modified error.
            if not overwrite:
                _convert_mod_error(mod_error)
            raise


async def upload_page_blob(
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Create a page blob of ``length`` bytes and upload its pages.

    :param client: Operations object providing the ``create`` coroutine.
    :param stream: Source stream for the page data.
    :param length: Blob size in bytes; required, non-negative and a multiple
        of 512.
    :param overwrite: When falsy and no access condition is reported by
        ``_any_conditions``, ``if_none_match='*'`` is set on
        ``kwargs['modified_access_conditions']``.
    :param headers: Extra request headers, or None. The dict is mutated when
        a tier or encryption metadata has to be attached.
    :param validate_content: Forwarded to the chunk uploader.
    :param max_concurrency: Forwarded to the chunk uploader.
    :param blob_settings: Object exposing ``max_page_size``.
    :param encryption_options: Optional dict; its ``data`` entry, when
        truthy, is sent as the ``x-ms-meta-encryptiondata`` header.
    :returns: The ``client.create`` response when ``length`` is 0, otherwise
        the result of ``upload_data_chunks``.
    :raises ValueError: If ``length`` is None, negative, or not 512-aligned.
    """
    try:
        if not overwrite and not _any_conditions(**kwargs):
            kwargs['modified_access_conditions'].if_none_match = '*'
        if length is None or length < 0:
            raise ValueError("A content length must be specified for a Page Blob.")
        if length % 512 != 0:
            raise ValueError("Invalid page blob size: {0}. "
                             "The size must be aligned to a 512-byte boundary.".format(length))
        if kwargs.get('premium_page_blob_tier'):
            premium_page_blob_tier = kwargs.pop('premium_page_blob_tier')
            # Accept either an object carrying .value or a plain value.
            try:
                tier_value = premium_page_blob_tier.value
            except AttributeError:
                tier_value = premium_page_blob_tier
            headers = _set_header(headers, 'x-ms-access-tier', tier_value)
        if encryption_options and encryption_options.get('data'):
            headers = _set_header(headers, 'x-ms-meta-encryptiondata', encryption_options['data'])
        response = await client.create(
            content_length=0,
            blob_content_length=length,
            blob_sequence_number=None,
            blob_http_headers=kwargs.pop('blob_headers', None),
            cls=return_response_headers,
            headers=headers,
            **kwargs)
        if length == 0:
            return response

        # Condition the page uploads on the etag returned by create.
        kwargs['modified_access_conditions'] = ModifiedAccessConditions(if_match=response['etag'])
        return await upload_data_chunks(
            service=client,
            uploader_class=PageBlobChunkUploader,
            total_size=length,
            chunk_size=blob_settings.max_page_size,
            stream=stream,
            max_concurrency=max_concurrency,
            validate_content=validate_content,
            encryption_options=encryption_options,
            **kwargs)

    except StorageErrorException as error:
        try:
            process_storage_error(error)
        except ResourceModifiedError as mod_error:
            # Without overwrite, give _convert_mod_error the chance to raise
            # its own error first; otherwise re-raise the modified error.
            if not overwrite:
                _convert_mod_error(mod_error)
            raise


async def upload_append_blob(  # pylint: disable=unused-argument
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Append the contents of ``stream`` to an append blob.

    With ``overwrite`` the blob is created first. Without it the data is
    appended directly; if the service answers 404 the stream is rewound, the
    blob is created, and the upload is retried once.

    :param client: Operations object providing the ``create`` coroutine.
    :param stream: Source stream; must support ``seek`` for the 404 retry.
    :param length: Payload size in bytes; 0 returns ``{}`` without any
        service call.
    :param overwrite: Create the blob before appending.
    :param headers: Extra request headers forwarded to ``client.create``.
    :param validate_content: Forwarded to the chunk uploader.
    :param max_concurrency: Forwarded to the chunk uploader.
    :param blob_settings: Object exposing ``max_block_size``.
    :param encryption_options: Accepted for signature parity; not used.
    :returns: ``{}`` for an empty payload, otherwise the result of
        ``upload_data_chunks``.
    """
    try:
        if length == 0:
            return {}
        blob_headers = kwargs.pop('blob_headers', None)
        append_conditions = AppendPositionAccessConditions(
            max_size=kwargs.pop('maxsize_condition', None),
            append_position=None)
        try:
            if overwrite:
                await client.create(
                    content_length=0,
                    blob_http_headers=blob_headers,
                    headers=headers,
                    **kwargs)
            return await upload_data_chunks(
                service=client,
                uploader_class=AppendBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                append_position_access_conditions=append_conditions,
                **kwargs)
        except StorageErrorException as error:
            # Only a 404 is retried (after creating the blob below).
            if error.response.status_code != 404:
                raise
            # rewind the request body if it is a stream
            if hasattr(stream, 'read'):
                try:
                    # attempt to rewind the body to the initial position
                    stream.seek(0, SEEK_SET)
                except UnsupportedOperation:
                    # if body is not seekable, then retry would not work
                    raise error
            await client.create(
                content_length=0,
                blob_http_headers=blob_headers,
                headers=headers,
                **kwargs)
            return await upload_data_chunks(
                service=client,
                uploader_class=AppendBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                append_position_access_conditions=append_conditions,
                **kwargs)
    except StorageErrorException as error:
        process_storage_error(error)