1# ------------------------------------------------------------------------- 2# Copyright (c) Microsoft Corporation. All rights reserved. 3# Licensed under the MIT License. See License.txt in the project root for 4# license information. 5# -------------------------------------------------------------------------- 6# pylint: disable=no-self-use 7 8from io import SEEK_SET, UnsupportedOperation 9from typing import Optional, Union, Any, TypeVar, TYPE_CHECKING # pylint: disable=unused-import 10 11import six 12from azure.core.exceptions import ResourceExistsError, ResourceModifiedError 13 14from ._shared.response_handlers import ( 15 process_storage_error, 16 return_response_headers) 17from ._shared.models import StorageErrorCode 18from ._shared.uploads import ( 19 upload_data_chunks, 20 upload_substream_blocks, 21 BlockBlobChunkUploader, 22 PageBlobChunkUploader, 23 AppendBlobChunkUploader) 24from ._shared.encryption import generate_blob_encryption_data, encrypt_blob 25from ._generated.models import ( 26 StorageErrorException, 27 BlockLookupList, 28 AppendPositionAccessConditions, 29 ModifiedAccessConditions, 30) 31 32if TYPE_CHECKING: 33 from datetime import datetime # pylint: disable=unused-import 34 BlobLeaseClient = TypeVar("BlobLeaseClient") 35 36_LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024 37_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.' 


def _convert_mod_error(error):
    """Re-raise a failed-precondition error as a "blob already exists" error.

    The message of ``error`` is rewritten so that the generic conditional-header
    wording and the ``ConditionNotMet`` code read as ``BlobAlreadyExists``.

    :param error: The error whose ``message`` and ``response`` are reused.
    :raises ResourceExistsError: Always; its ``error_code`` is set to
        ``StorageErrorCode.blob_already_exists``.
    """
    message = error.message.replace(
        "The condition specified using HTTP conditional header(s) is not met.",
        "The specified blob already exists.")
    message = message.replace("ConditionNotMet", "BlobAlreadyExists")
    overwrite_error = ResourceExistsError(
        message=message,
        response=error.response,
        error=error)
    overwrite_error.error_code = StorageErrorCode.blob_already_exists
    raise overwrite_error


def _any_conditions(modified_access_conditions=None, **kwargs):  # pylint: disable=unused-argument
    """Tell whether any modified-access condition has been set.

    :param modified_access_conditions: Object exposing ``if_modified_since``,
        ``if_unmodified_since``, ``if_none_match`` and ``if_match``; may be
        ``None``, which counts as "no conditions".
    :returns: ``True`` if at least one of the four attributes is truthy.
    :rtype: bool
    """
    if modified_access_conditions is None:
        # Nothing supplied at all: previously this dereferenced None.
        return False
    return any((
        modified_access_conditions.if_modified_since,
        modified_access_conditions.if_unmodified_since,
        modified_access_conditions.if_none_match,
        modified_access_conditions.if_match,
    ))


def _set_create_only_condition(overwrite, options):
    """Add ``if_none_match='*'`` to ``options`` when overwriting is not allowed.

    Nothing is changed when ``overwrite`` is truthy or when the caller already
    supplied at least one access condition. ``options`` is modified in place.

    :param overwrite: Truthy if an existing blob may be replaced.
    :param dict options: Keyword arguments of the calling upload helper.
    """
    if overwrite or _any_conditions(**options):
        return
    conditions = options.get('modified_access_conditions')
    if conditions is None:
        # No conditions object was passed in, so build one carrying the guard.
        options['modified_access_conditions'] = ModifiedAccessConditions(if_none_match='*')
    else:
        conditions.if_none_match = '*'


def _raise_storage_error(error, overwrite):
    """Pass ``error`` to ``process_storage_error``, adjusting precondition failures.

    When ``process_storage_error`` raises ``ResourceModifiedError`` and
    ``overwrite`` is falsy, the error is converted by ``_convert_mod_error``;
    otherwise it propagates unchanged.

    :param error: The ``StorageErrorException`` caught by the caller.
    :param overwrite: Truthy if an existing blob may be replaced.
    """
    try:
        process_storage_error(error)
    except ResourceModifiedError as mod_error:
        if not overwrite:
            _convert_mod_error(mod_error)
        raise


def upload_block_blob(  # pylint: disable=too-many-locals
        client=None,
        data=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Upload a block blob, either in a single request or as committed blocks.

    A single ``client.upload`` call is used when the (possibly padded) length
    is known and below ``blob_settings.max_single_put_size``. Otherwise the
    content of ``stream`` is uploaded in blocks which are then committed with
    ``client.commit_block_list``.

    :param client: Object providing ``upload`` and ``commit_block_list``.
    :param data: Payload for the single-request path; if it has a ``read``
        method, ``read(length)`` is called and must return bytes.
    :param stream: Source stream for the chunked paths.
    :param length: Number of bytes to upload, or ``None`` if unknown.
    :param overwrite: Truthy if an existing blob may be replaced.
    :param headers: Mutable mapping of request headers; the
        ``x-ms-meta-encryptiondata`` entry is written when a key is configured.
    :param validate_content: Forwarded to the upload calls.
    :param max_concurrency: Forwarded to the chunk uploaders.
    :param blob_settings: Object exposing ``max_single_put_size``,
        ``max_block_size``, ``min_large_block_upload_threshold`` and
        ``use_byte_buffer``.
    :param encryption_options: Optional dict read for ``'key'`` and
        ``'required'``; ``'cek'`` and ``'vector'`` are written to it on the
        chunked path when a key is configured. ``None`` means no encryption.
    :keyword blob_headers: Forwarded as ``blob_http_headers``.
    :keyword standard_blob_tier: Tier whose ``value`` is forwarded as ``tier``.
    :returns: The result of ``client.upload`` or ``client.commit_block_list``.
    :raises TypeError: If ``data.read(length)`` does not return bytes.
    :raises ResourceExistsError: If the precondition failed and ``overwrite``
        is falsy.
    """
    try:
        _set_create_only_condition(overwrite, kwargs)

        # encryption_options defaults to None, so guard before reading from it.
        encryption_key = encryption_options.get('key') if encryption_options else None

        adjusted_count = length
        if encryption_key is not None and adjusted_count is not None:
            # Advance to the next multiple of 16 (a full 16 is added when the
            # length is already aligned).
            # NOTE(review): presumably mirrors the cipher's padding - confirm.
            adjusted_count += 16 - (length % 16)
        blob_headers = kwargs.pop('blob_headers', None)
        tier = kwargs.pop('standard_blob_tier', None)

        # Do single put if the size is smaller than config.max_single_put_size
        if adjusted_count is not None and adjusted_count < blob_settings.max_single_put_size:
            try:
                data = data.read(length)
            except AttributeError:
                # Not a readable object: upload ``data`` as it was given.
                pass
            else:
                if not isinstance(data, six.binary_type):
                    raise TypeError('Blob data should be of type bytes.')
            if encryption_key:
                encryption_data, data = encrypt_blob(data, encryption_key)
                headers['x-ms-meta-encryptiondata'] = encryption_data
            return client.upload(
                data,
                content_length=adjusted_count,
                blob_http_headers=blob_headers,
                headers=headers,
                cls=return_response_headers,
                validate_content=validate_content,
                data_stream_total=adjusted_count,
                upload_stream_current=0,
                tier=tier.value if tier else None,
                **kwargs)

        # The substream uploader is only chosen for a seekable stream with
        # large blocks and no content validation / required encryption.
        use_original_upload_path = blob_settings.use_byte_buffer or \
            validate_content or \
            (encryption_options and encryption_options.get('required')) or \
            blob_settings.max_block_size < blob_settings.min_large_block_upload_threshold or \
            hasattr(stream, 'seekable') and not stream.seekable() or \
            not hasattr(stream, 'seek') or not hasattr(stream, 'tell')

        if use_original_upload_path:
            if encryption_key:
                cek, iv, encryption_data = generate_blob_encryption_data(encryption_key)
                headers['x-ms-meta-encryptiondata'] = encryption_data
                encryption_options['cek'] = cek
                encryption_options['vector'] = iv
            block_ids = upload_data_chunks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                encryption_options=encryption_options,
                **kwargs
            )
        else:
            block_ids = upload_substream_blocks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                **kwargs
            )

        block_lookup = BlockLookupList(committed=[], uncommitted=[], latest=[])
        block_lookup.latest = block_ids
        return client.commit_block_list(
            block_lookup,
            blob_http_headers=blob_headers,
            cls=return_response_headers,
            validate_content=validate_content,
            headers=headers,
            tier=tier.value if tier else None,
            **kwargs)
    except StorageErrorException as error:
        _raise_storage_error(error, overwrite)


def upload_page_blob(
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Create a page blob of ``length`` bytes and upload its pages from ``stream``.

    The blob is created first with ``client.create``; the page uploads are
    then made conditional on the ETag returned by that call.

    :param client: Object providing ``create``.
    :param stream: Source stream for the page data.
    :param length: Blob size in bytes; required, non-negative and a multiple
        of 512.
    :param overwrite: Truthy if an existing blob may be replaced.
    :param headers: Mutable mapping of request headers; ``x-ms-access-tier``
        and ``x-ms-meta-encryptiondata`` may be written to it.
    :param validate_content: Forwarded to ``upload_data_chunks``.
    :param max_concurrency: Forwarded to ``upload_data_chunks``.
    :param blob_settings: Object exposing ``max_page_size``.
    :param encryption_options: Optional dict; its ``'data'`` entry, if truthy,
        is stored in the ``x-ms-meta-encryptiondata`` header.
    :keyword premium_page_blob_tier: Tier (its ``value`` attribute if present,
        else the object itself) stored in the ``x-ms-access-tier`` header.
    :keyword blob_headers: Forwarded as ``blob_http_headers``.
    :returns: The ``client.create`` response when ``length`` is 0, otherwise
        the result of ``upload_data_chunks``.
    :raises ValueError: If ``length`` is missing, negative or not 512-aligned.
    :raises ResourceExistsError: If the precondition failed and ``overwrite``
        is falsy.
    """
    try:
        _set_create_only_condition(overwrite, kwargs)
        if length is None or length < 0:
            raise ValueError("A content length must be specified for a Page Blob.")
        if length % 512 != 0:
            raise ValueError("Invalid page blob size: {0}. "
                             "The size must be aligned to a 512-byte boundary.".format(length))
        if kwargs.get('premium_page_blob_tier'):
            premium_page_blob_tier = kwargs.pop('premium_page_blob_tier')
            try:
                headers['x-ms-access-tier'] = premium_page_blob_tier.value
            except AttributeError:
                # No ``value`` attribute: use the object as given.
                headers['x-ms-access-tier'] = premium_page_blob_tier
        if encryption_options and encryption_options.get('data'):
            headers['x-ms-meta-encryptiondata'] = encryption_options['data']
        response = client.create(
            content_length=0,
            blob_content_length=length,
            blob_sequence_number=None,
            blob_http_headers=kwargs.pop('blob_headers', None),
            cls=return_response_headers,
            headers=headers,
            **kwargs)
        if length == 0:
            return response

        # Tie the page uploads to the blob that was just created.
        kwargs['modified_access_conditions'] = ModifiedAccessConditions(if_match=response['etag'])
        return upload_data_chunks(
            service=client,
            uploader_class=PageBlobChunkUploader,
            total_size=length,
            chunk_size=blob_settings.max_page_size,
            stream=stream,
            max_concurrency=max_concurrency,
            validate_content=validate_content,
            encryption_options=encryption_options,
            **kwargs)

    except StorageErrorException as error:
        _raise_storage_error(error, overwrite)


def upload_append_blob(  # pylint: disable=unused-argument
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Append the content of ``stream`` to an append blob, creating it if needed.

    With ``overwrite`` truthy the blob is (re)created before appending.
    Otherwise the data is appended directly; if the service answers 404, the
    stream is rewound to offset 0, the blob is created, and the upload is
    attempted once more.

    :param client: Object providing ``create``.
    :param stream: Source stream for the data.
    :param length: Number of bytes to upload; ``0`` short-circuits.
    :param overwrite: Truthy to create the blob before appending.
    :param headers: Request headers forwarded to ``client.create``.
    :param validate_content: Forwarded to ``upload_data_chunks``.
    :param max_concurrency: Forwarded to ``upload_data_chunks``.
    :param blob_settings: Object exposing ``max_block_size``.
    :param encryption_options: Unused here.
    :keyword blob_headers: Forwarded as ``blob_http_headers``.
    :keyword maxsize_condition: Used as ``max_size`` of the append conditions.
    :returns: ``{}`` when ``length`` is 0, otherwise the result of
        ``upload_data_chunks``.
    """
    try:
        if length == 0:
            return {}
        blob_headers = kwargs.pop('blob_headers', None)
        append_conditions = AppendPositionAccessConditions(
            max_size=kwargs.pop('maxsize_condition', None),
            append_position=None)

        def create_blob():
            # Create an empty append blob.
            client.create(
                content_length=0,
                blob_http_headers=blob_headers,
                headers=headers,
                **kwargs)

        def append_chunks():
            # Upload the stream content as append blocks.
            return upload_data_chunks(
                service=client,
                uploader_class=AppendBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                append_position_access_conditions=append_conditions,
                **kwargs)

        try:
            if overwrite:
                create_blob()
            return append_chunks()
        except StorageErrorException as error:
            if error.response.status_code != 404:
                raise
            # rewind the request body if it is a stream
            if hasattr(stream, 'read'):
                try:
                    # attempt to rewind the body to the initial position
                    stream.seek(0, SEEK_SET)
                except (UnsupportedOperation, AttributeError):
                    # if body is not seekable (or has no ``seek`` at all),
                    # then retry would not work: surface the service error
                    raise error
            create_blob()
            return append_chunks()
    except StorageErrorException as error:
        process_storage_error(error)