1# ------------------------------------------------------------------------- 2# Copyright (c) Microsoft Corporation. All rights reserved. 3# Licensed under the MIT License. See License.txt in the project root for 4# license information. 5# -------------------------------------------------------------------------- 6# pylint: disable=no-self-use 7 8from io import SEEK_SET, UnsupportedOperation 9from typing import Optional, Union, Any, TypeVar, TYPE_CHECKING # pylint: disable=unused-import 10 11import six 12from azure.core.exceptions import ResourceExistsError, ResourceModifiedError 13 14from ._shared.response_handlers import ( 15 process_storage_error, 16 return_response_headers) 17from ._shared.models import StorageErrorCode 18from ._shared.uploads import ( 19 upload_data_chunks, 20 upload_substream_blocks, 21 BlockBlobChunkUploader, 22 PageBlobChunkUploader, 23 AppendBlobChunkUploader) 24from ._shared.encryption import generate_blob_encryption_data, encrypt_blob 25from ._generated.models import ( 26 StorageErrorException, 27 BlockLookupList, 28 AppendPositionAccessConditions, 29 ModifiedAccessConditions, 30) 31 32if TYPE_CHECKING: 33 from datetime import datetime # pylint: disable=unused-import 34 BlobLeaseClient = TypeVar("BlobLeaseClient") 35 36_LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024 37_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.' 
def _convert_mod_error(error):
    """Translate a ResourceModifiedError from a conditional upload into a
    ResourceExistsError ("blob already exists").

    The upload helpers below set ``If-None-Match: '*'`` when ``overwrite`` is
    False, so a ConditionNotMet failure from the service really means the
    target blob already exists. Always raises; never returns.
    """
    # Rewrite both the human-readable text and the service error-code token
    # embedded in the message before re-raising under the new type.
    message = error.message.replace(
        "The condition specified using HTTP conditional header(s) is not met.",
        "The specified blob already exists.")
    message = message.replace("ConditionNotMet", "BlobAlreadyExists")
    overwrite_error = ResourceExistsError(
        message=message,
        response=error.response,
        error=error)
    overwrite_error.error_code = StorageErrorCode.blob_already_exists
    raise overwrite_error


def _any_conditions(modified_access_conditions=None, **kwargs):  # pylint: disable=unused-argument
    """Return True if the caller supplied any access precondition
    (If-Modified-Since / If-Unmodified-Since / If-None-Match / If-Match).

    NOTE(review): assumes ``modified_access_conditions`` is a populated
    conditions object, not None — callers pass it through **kwargs; confirm
    every caller builds one before invoking the upload helpers.
    """
    return any([
        modified_access_conditions.if_modified_since,
        modified_access_conditions.if_unmodified_since,
        modified_access_conditions.if_none_match,
        modified_access_conditions.if_match
    ])


def upload_block_blob(  # pylint: disable=too-many-locals
        client=None,
        data=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Upload data as a block blob, choosing among three strategies:

    * a single put when the (encryption-adjusted) size is known and fits
      within ``blob_settings.max_single_put_size``;
    * chunked upload via ``upload_data_chunks`` (buffered) when the stream
      is not seekable, content validation or encryption is required, or the
      block size is below ``min_large_block_upload_threshold``;
    * parallel sub-stream upload via ``upload_substream_blocks`` otherwise;

    the chunked paths finish with a ``commit_block_list`` call.

    :returns: Response headers dict (via ``return_response_headers``).
    :raises ResourceExistsError: when ``overwrite`` is False and the blob
        already exists (translated by ``_convert_mod_error``).
    """
    try:
        # Without overwrite, fail if the blob exists — unless the caller
        # supplied their own preconditions, which then take precedence.
        if not overwrite and not _any_conditions(**kwargs):
            kwargs['modified_access_conditions'].if_none_match = '*'
        adjusted_count = length
        # Encryption pads the payload out to the next 16-byte boundary
        # (1..16 extra bytes — presumably the padding applied by
        # encrypt_blob; TODO confirm), so advertise the padded length.
        if (encryption_options.get('key') is not None) and (adjusted_count is not None):
            adjusted_count += (16 - (length % 16))
        blob_headers = kwargs.pop('blob_headers', None)
        tier = kwargs.pop('standard_blob_tier', None)
        blob_tags_string = kwargs.pop('blob_tags_string', None)

        # Do single put if the size is smaller than or equal config.max_single_put_size
        if adjusted_count is not None and (adjusted_count <= blob_settings.max_single_put_size):
            try:
                # Materialize the payload; a text-mode stream yields str and
                # is rejected here.
                data = data.read(length)
                if not isinstance(data, six.binary_type):
                    raise TypeError('Blob data should be of type bytes.')
            except AttributeError:
                # 'data' has no .read — it is already raw bytes; use as-is.
                pass
            if encryption_options.get('key'):
                encryption_data, data = encrypt_blob(data, encryption_options['key'])
                headers['x-ms-meta-encryptiondata'] = encryption_data
            return client.upload(
                data,
                content_length=adjusted_count,
                blob_http_headers=blob_headers,
                headers=headers,
                cls=return_response_headers,
                validate_content=validate_content,
                data_stream_total=adjusted_count,
                upload_stream_current=0,
                tier=tier.value if tier else None,
                blob_tags_string=blob_tags_string,
                **kwargs)

        # Fall back to the buffered (read-into-memory) chunk uploader when
        # sub-stream upload is not possible: explicit byte-buffer mode,
        # content validation, client-side encryption, small block sizes, or
        # a stream that cannot seek/tell.
        use_original_upload_path = blob_settings.use_byte_buffer or \
            validate_content or encryption_options.get('required') or \
            blob_settings.max_block_size < blob_settings.min_large_block_upload_threshold or \
            hasattr(stream, 'seekable') and not stream.seekable() or \
            not hasattr(stream, 'seek') or not hasattr(stream, 'tell')

        if use_original_upload_path:
            if encryption_options.get('key'):
                # Generate per-blob key material up front; chunk uploaders
                # read 'cek'/'vector' out of encryption_options.
                cek, iv, encryption_data = generate_blob_encryption_data(encryption_options['key'])
                headers['x-ms-meta-encryptiondata'] = encryption_data
                encryption_options['cek'] = cek
                encryption_options['vector'] = iv
            block_ids = upload_data_chunks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                encryption_options=encryption_options,
                **kwargs
            )
        else:
            block_ids = upload_substream_blocks(
                service=client,
                uploader_class=BlockBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                **kwargs
            )

        # Commit every staged block as the new blob content.
        block_lookup = BlockLookupList(committed=[], uncommitted=[], latest=[])
        block_lookup.latest = block_ids
        return client.commit_block_list(
            block_lookup,
            blob_http_headers=blob_headers,
            cls=return_response_headers,
            validate_content=validate_content,
            headers=headers,
            tier=tier.value if tier else None,
            blob_tags_string=blob_tags_string,
            **kwargs)
    except StorageErrorException as error:
        try:
            process_storage_error(error)
        except ResourceModifiedError as mod_error:
            # Our injected If-None-Match:'*' failing means the blob exists;
            # report that instead of a generic precondition failure.
            if not overwrite:
                _convert_mod_error(mod_error)
            raise


def upload_page_blob(
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Upload data as a page blob: create the (fixed-size) blob, then upload
    its pages in chunks.

    Page blobs require an explicit, 512-byte-aligned length.

    :returns: Response headers dict from create (empty blob) or from the
        chunk upload.
    :raises ValueError: when ``length`` is missing, negative, or unaligned.
    :raises ResourceExistsError: when ``overwrite`` is False and the blob
        already exists.
    """
    try:
        # Without overwrite, fail if the blob exists — unless the caller
        # supplied their own preconditions.
        if not overwrite and not _any_conditions(**kwargs):
            kwargs['modified_access_conditions'].if_none_match = '*'
        if length is None or length < 0:
            raise ValueError("A content length must be specified for a Page Blob.")
        if length % 512 != 0:
            raise ValueError("Invalid page blob size: {0}. "
                             "The size must be aligned to a 512-byte boundary.".format(length))
        if kwargs.get('premium_page_blob_tier'):
            premium_page_blob_tier = kwargs.pop('premium_page_blob_tier')
            try:
                # Enum tier — send its value.
                headers['x-ms-access-tier'] = premium_page_blob_tier.value
            except AttributeError:
                # Caller passed a raw string tier.
                headers['x-ms-access-tier'] = premium_page_blob_tier
        if encryption_options and encryption_options.get('data'):
            headers['x-ms-meta-encryptiondata'] = encryption_options['data']
        blob_tags_string = kwargs.pop('blob_tags_string', None)

        # Create the empty page blob at its full size first.
        response = client.create(
            content_length=0,
            blob_content_length=length,
            blob_sequence_number=None,
            blob_http_headers=kwargs.pop('blob_headers', None),
            blob_tags_string=blob_tags_string,
            cls=return_response_headers,
            headers=headers,
            **kwargs)
        if length == 0:
            return response

        # The create succeeded: pin all subsequent page uploads to the etag
        # it returned (replaces any caller preconditions, which the create
        # already enforced).
        kwargs['modified_access_conditions'] = ModifiedAccessConditions(if_match=response['etag'])
        return upload_data_chunks(
            service=client,
            uploader_class=PageBlobChunkUploader,
            total_size=length,
            chunk_size=blob_settings.max_page_size,
            stream=stream,
            max_concurrency=max_concurrency,
            validate_content=validate_content,
            encryption_options=encryption_options,
            **kwargs)

    except StorageErrorException as error:
        try:
            process_storage_error(error)
        except ResourceModifiedError as mod_error:
            if not overwrite:
                _convert_mod_error(mod_error)
            raise


def upload_append_blob(  # pylint: disable=unused-argument
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        headers=None,
        validate_content=None,
        max_concurrency=None,
        blob_settings=None,
        encryption_options=None,
        **kwargs):
    """Append data in chunks to an append blob.

    When ``overwrite`` is True the blob is (re)created first. Otherwise the
    helper appends optimistically and, on a 404 (blob does not exist),
    rewinds the stream, creates the blob, and retries the append once.

    :returns: Response headers dict from the chunk upload, or {} when there
        is nothing to append.
    """
    try:
        if length == 0:
            return {}
        blob_headers = kwargs.pop('blob_headers', None)
        append_conditions = AppendPositionAccessConditions(
            max_size=kwargs.pop('maxsize_condition', None),
            append_position=None)
        blob_tags_string = kwargs.pop('blob_tags_string', None)

        try:
            if overwrite:
                client.create(
                    content_length=0,
                    blob_http_headers=blob_headers,
                    headers=headers,
                    blob_tags_string=blob_tags_string,
                    **kwargs)
            return upload_data_chunks(
                service=client,
                uploader_class=AppendBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                append_position_access_conditions=append_conditions,
                **kwargs)
        except StorageErrorException as error:
            # Only a missing blob (404) is retried; everything else
            # propagates to the outer handler.
            if error.response.status_code != 404:
                raise
            # rewind the request body if it is a stream
            if hasattr(stream, 'read'):
                try:
                    # attempt to rewind the body to the initial position
                    stream.seek(0, SEEK_SET)
                except UnsupportedOperation:
                    # if body is not seekable, then retry would not work
                    raise error
            # Blob did not exist: create it, then retry the append once.
            client.create(
                content_length=0,
                blob_http_headers=blob_headers,
                headers=headers,
                blob_tags_string=blob_tags_string,
                **kwargs)
            return upload_data_chunks(
                service=client,
                uploader_class=AppendBlobChunkUploader,
                total_size=length,
                chunk_size=blob_settings.max_block_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                append_position_access_conditions=append_conditions,
                **kwargs)
    except StorageErrorException as error:
        process_storage_error(error)