# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=no-self-use

import asyncio
from asyncio import Lock
from itertools import islice
import threading

from math import ceil

import six

from . import encode_base64, url_quote
from .request_handlers import get_length
from .response_handlers import return_response_headers
from .encryption import get_blob_encryptor_and_padder
from .uploads import SubStream, IterStreamer  # pylint: disable=unused-import


_LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024
_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.'


async def _parallel_uploads(uploader, pending, running):
    """Keep a sliding window of uploads in flight until ``pending`` is drained.

    Each time some futures complete, the same number of new items is pulled
    from ``pending`` and scheduled, so the number of concurrent uploads stays
    at the size of the initial ``running`` collection.

    :param uploader: Coroutine function invoked with each item taken from ``pending``.
    :param pending: Iterator over the items that have not been scheduled yet.
    :param running: Collection of futures that were already scheduled by the caller.
    :returns: List with the result of every future, in no guaranteed order.
    """
    range_ids = []
    if not running:
        # asyncio.wait() raises ValueError for an empty collection. The callers
        # in this module seed ``running`` from ``pending``, so nothing scheduled
        # means there was nothing to upload.
        return range_ids

    while True:
        # Wait for some upload to finish before adding a new one
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        range_ids.extend(chunk.result() for chunk in done)
        try:
            for _ in range(len(done)):
                next_chunk = next(pending)
                running.add(asyncio.ensure_future(uploader(next_chunk)))
        except StopIteration:
            break

    # Wait for the remaining uploads to finish
    if running:
        done, _running = await asyncio.wait(running)
        range_ids.extend(chunk.result() for chunk in done)
    return range_ids


async def upload_data_chunks(
        service=None,
        uploader_class=None,
        total_size=None,
        chunk_size=None,
        max_concurrency=None,
        stream=None,
        encryption_options=None,
        **kwargs):
    """Upload ``stream`` in chunks produced by ``uploader.get_chunk_streams()``.

    :param service: Client object handed to the uploader; the uploaders call
        its upload operation for every chunk.
    :param uploader_class: ``_ChunkUploader`` subclass to instantiate.
    :param total_size: Number of bytes to upload, if known.
    :param chunk_size: Maximum size of a single chunk.
    :param max_concurrency: Uploads run concurrently only when this is greater
        than 1; ``None`` is treated as sequential.
    :param stream: Readable object supplying the data.
    :param encryption_options: Optional mapping; its ``'cek'`` and ``'vector'``
        entries are used to build the encryptor and padder.
    :param kwargs: Extra options forwarded to the uploader.
    :returns: When the chunk uploads produced truthy results, the second
        element of each result ordered by its first element; otherwise the
        uploader's ``response_headers``.
    """
    if encryption_options:
        # NOTE(review): the third argument is False only for page blobs;
        # presumably it toggles padding -- confirm against the encryption helper.
        encryptor, padder = get_blob_encryptor_and_padder(
            encryption_options.get('cek'),
            encryption_options.get('vector'),
            uploader_class is not PageBlobChunkUploader)
        kwargs['encryptor'] = encryptor
        kwargs['padder'] = padder

    # ``max_concurrency`` defaults to None, which cannot be compared to an int.
    parallel = max_concurrency is not None and max_concurrency > 1
    if parallel and 'modified_access_conditions' in kwargs:
        # Access conditions do not work with parallelism
        kwargs['modified_access_conditions'] = None

    uploader = uploader_class(
        service=service,
        total_size=total_size,
        chunk_size=chunk_size,
        stream=stream,
        parallel=parallel,
        **kwargs)

    if parallel:
        upload_tasks = uploader.get_chunk_streams()
        # Schedule the first ``max_concurrency`` chunks; the rest are started
        # by _parallel_uploads as earlier ones complete.
        running_futures = [
            asyncio.ensure_future(uploader.process_chunk(u))
            for u in islice(upload_tasks, 0, max_concurrency)
        ]
        range_ids = await _parallel_uploads(uploader.process_chunk, upload_tasks, running_futures)
    else:
        range_ids = []
        for chunk in uploader.get_chunk_streams():
            range_ids.append(await uploader.process_chunk(chunk))

    if any(range_ids):
        return [r[1] for r in sorted(range_ids, key=lambda r: r[0])]
    return uploader.response_headers


async def upload_substream_blocks(
        service=None,
        uploader_class=None,
        total_size=None,
        chunk_size=None,
        max_concurrency=None,
        stream=None,
        **kwargs):
    """Upload ``stream`` as blocks produced by ``uploader.get_substream_blocks()``.

    :param service: Client object handed to the uploader.
    :param uploader_class: ``_ChunkUploader`` subclass to instantiate.
    :param total_size: Number of bytes to upload, if known.
    :param chunk_size: Size of every block except possibly the last one.
    :param max_concurrency: Uploads run concurrently only when this is greater
        than 1; ``None`` is treated as sequential.
    :param stream: Object supplying the data; it is wrapped in ``SubStream`` views.
    :param kwargs: Extra options forwarded to the uploader.
    :returns: Sorted list of the values returned for each block.
    """
    # ``max_concurrency`` defaults to None, which cannot be compared to an int.
    parallel = max_concurrency is not None and max_concurrency > 1
    if parallel and 'modified_access_conditions' in kwargs:
        # Access conditions do not work with parallelism
        kwargs['modified_access_conditions'] = None
    uploader = uploader_class(
        service=service,
        total_size=total_size,
        chunk_size=chunk_size,
        stream=stream,
        parallel=parallel,
        **kwargs)

    if parallel:
        upload_tasks = uploader.get_substream_blocks()
        running_futures = [
            asyncio.ensure_future(uploader.process_substream_block(u))
            for u in islice(upload_tasks, 0, max_concurrency)
        ]
        range_ids = await _parallel_uploads(uploader.process_substream_block, upload_tasks, running_futures)
    else:
        range_ids = []
        for block in uploader.get_substream_blocks():
            range_ids.append(await uploader.process_substream_block(block))
    return sorted(range_ids)


class _ChunkUploader(object):  # pylint: disable=too-many-instance-attributes
    """Base class that splits a stream into chunks and uploads them.

    Subclasses implement ``_upload_chunk`` and, where supported,
    ``_upload_substream_block``.
    """

    def __init__(self, service, total_size, chunk_size, stream, parallel, encryptor=None, padder=None, **kwargs):
        """Store the upload configuration.

        :param service: Client object used by subclasses to send data.
        :param total_size: Number of bytes to upload, or a falsy value if unknown.
        :param chunk_size: Maximum size of a single chunk.
        :param stream: Object supplying the data.
        :param parallel: Whether chunks will be uploaded concurrently.
        :param encryptor: Optional object with ``update``/``finalize`` applied to each chunk.
        :param padder: Optional object with ``update``/``finalize`` applied before the encryptor.
        :param kwargs: Kept as ``request_options`` and forwarded to every service call.
        """
        self.service = service
        self.total_size = total_size
        self.chunk_size = chunk_size
        self.stream = stream
        self.parallel = parallel

        # Stream management
        self.stream_start = stream.tell() if parallel else None
        # Thread lock handed to every SubStream created by get_substream_blocks.
        self.stream_lock = threading.Lock() if parallel else None

        # Progress feedback
        self.progress_total = 0
        # asyncio lock guarding updates of progress_total in parallel mode.
        self.progress_lock = Lock() if parallel else None

        # Encryption
        self.encryptor = encryptor
        self.padder = padder
        self.response_headers = None
        self.etag = None
        self.last_modified = None
        self.request_options = kwargs

    def get_chunk_streams(self):
        """Yield ``(index, data)`` tuples read from ``self.stream``.

        ``index`` is the sum of the lengths of the chunks yielded so far,
        measured after padding and encryption. Full chunks go through
        ``update`` only; the final short (or empty) chunk also triggers
        ``finalize`` on the padder and encryptor.

        :raises TypeError: If the stream returns anything other than bytes.
        """
        index = 0
        while True:
            data = b''

            # Buffer until we either reach the end of the stream or get a whole chunk.
            while True:
                # Only ask for what is still missing from this chunk. A stream
                # may return fewer bytes than requested, and asking for a full
                # chunk again would overshoot chunk_size so the loop would not
                # stop before the end of the stream.
                read_size = self.chunk_size - len(data)
                if self.total_size:
                    read_size = min(read_size, self.total_size - (index + len(data)))
                temp = self.stream.read(read_size)
                if not isinstance(temp, six.binary_type):
                    raise TypeError('Blob data should be of type bytes.')
                data += temp or b""

                # We have read an empty string and so are at the end
                # of the buffer or we have read a full chunk.
                if temp == b'' or len(data) == self.chunk_size:
                    break

            if len(data) == self.chunk_size:
                if self.padder:
                    data = self.padder.update(data)
                if self.encryptor:
                    data = self.encryptor.update(data)
                yield index, data
            else:
                # Last chunk: flush whatever the padder/encryptor still hold.
                if self.padder:
                    data = self.padder.update(data) + self.padder.finalize()
                if self.encryptor:
                    data = self.encryptor.update(data) + self.encryptor.finalize()
                if data:
                    yield index, data
                break
            index += len(data)

    async def process_chunk(self, chunk_data):
        """Upload one ``(offset, bytes)`` item produced by ``get_chunk_streams``."""
        chunk_bytes = chunk_data[1]
        chunk_offset = chunk_data[0]
        return await self._upload_chunk_with_progress(chunk_offset, chunk_bytes)

    async def _update_progress(self, length):
        """Add ``length`` to ``progress_total``, under the lock when one exists."""
        if self.progress_lock is not None:
            async with self.progress_lock:
                self.progress_total += length
        else:
            self.progress_total += length

    async def _upload_chunk(self, chunk_offset, chunk_data):
        """Send one chunk; must be overridden."""
        raise NotImplementedError("Must be implemented by child class.")

    async def _upload_chunk_with_progress(self, chunk_offset, chunk_data):
        """Upload a chunk, then record its length as progress."""
        range_id = await self._upload_chunk(chunk_offset, chunk_data)
        await self._update_progress(len(chunk_data))
        return range_id

    def get_substream_blocks(self):
        """Yield ``(block_id, SubStream)`` pairs covering the whole stream.

        The length comes from ``total_size`` or, if that is None, from
        ``get_length(self.stream)``.

        :raises ValueError: If the length of the data cannot be determined.
        """
        assert self.chunk_size is not None
        lock = self.stream_lock
        blob_length = self.total_size

        if blob_length is None:
            blob_length = get_length(self.stream)
            if blob_length is None:
                raise ValueError("Unable to determine content length of upload data.")

        blocks = int(ceil(blob_length / (self.chunk_size * 1.0)))
        # A remainder of zero means the last block is a full one.
        last_block_size = (blob_length % self.chunk_size) or self.chunk_size

        for i in range(blocks):
            index = i * self.chunk_size
            length = last_block_size if i == blocks - 1 else self.chunk_size
            yield ('BlockId{}'.format("%05d" % i), SubStream(self.stream, index, length, lock))

    async def process_substream_block(self, block_data):
        """Upload one ``(block_id, stream)`` item produced by ``get_substream_blocks``."""
        return await self._upload_substream_block_with_progress(block_data[0], block_data[1])

    async def _upload_substream_block(self, block_id, block_stream):
        """Send one substream block; must be overridden."""
        raise NotImplementedError("Must be implemented by child class.")

    async def _upload_substream_block_with_progress(self, block_id, block_stream):
        """Upload a substream block, then record its length as progress."""
        range_id = await self._upload_substream_block(block_id, block_stream)
        await self._update_progress(len(block_stream))
        return range_id

    def set_response_properties(self, resp):
        """Copy ``etag`` and ``last_modified`` from ``resp`` onto the uploader."""
        self.etag = resp.etag
        self.last_modified = resp.last_modified


class BlockBlobChunkUploader(_ChunkUploader):
    """Uploader that stages every chunk through ``service.stage_block``."""

    def __init__(self, *args, **kwargs):
        # modified_access_conditions is dropped, so it is never forwarded to stage_block.
        kwargs.pop('modified_access_conditions', None)
        super(BlockBlobChunkUploader, self).__init__(*args, **kwargs)
        self.current_length = None

    async def _upload_chunk(self, chunk_offset, chunk_data):
        """Stage one chunk and return ``(index, block_id)``.

        ``index`` is the zero-padded offset, so sorting results by it restores
        stream order.
        """
        # TODO: This is incorrect, but works with recording.
        index = '{0:032d}'.format(chunk_offset)
        block_id = encode_base64(url_quote(encode_base64(index)))
        await self.service.stage_block(
            block_id,
            len(chunk_data),
            chunk_data,
            data_stream_total=self.total_size,
            upload_stream_current=self.progress_total,
            **self.request_options)
        return index, block_id

    async def _upload_substream_block(self, block_id, block_stream):
        """Stage one substream block and return its ``block_id``.

        The substream is closed whether or not staging succeeds.
        """
        try:
            await self.service.stage_block(
                block_id,
                len(block_stream),
                block_stream,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options)
        finally:
            block_stream.close()
        return block_id


class PageBlobChunkUploader(_ChunkUploader):  # pylint: disable=abstract-method
    """Uploader that writes chunks through ``service.upload_pages``."""

    def _is_chunk_empty(self, chunk_data):
        """Return True when ``chunk_data`` contains only zero bytes (or is empty)."""
        # any() stops at the first non-zero byte; going through bytearray keeps
        # the scan out of a per-byte Python loop.
        return not any(bytearray(chunk_data))

    async def _upload_chunk(self, chunk_offset, chunk_data):
        """Upload one page range unless it is all zeros.

        Stores the response headers on ``self.response_headers`` and returns None.
        """
        # avoid uploading the empty pages
        if not self._is_chunk_empty(chunk_data):
            chunk_end = chunk_offset + len(chunk_data) - 1
            content_range = 'bytes={0}-{1}'.format(chunk_offset, chunk_end)
            self.response_headers = await self.service.upload_pages(
                chunk_data,
                content_length=len(chunk_data),
                transactional_content_md5=None,
                range=content_range,
                cls=return_response_headers,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options)

            # In sequential mode, chain the next request on the etag just returned.
            if not self.parallel and self.request_options.get('modified_access_conditions'):
                self.request_options['modified_access_conditions'].if_match = self.response_headers['etag']


class AppendBlobChunkUploader(_ChunkUploader):  # pylint: disable=abstract-method
    """Uploader that appends chunks through ``service.append_block``."""

    def __init__(self, *args, **kwargs):
        super(AppendBlobChunkUploader, self).__init__(*args, **kwargs)
        # Append offset reported by the service for the first chunk; None until then.
        self.current_length = None

    async def _upload_chunk(self, chunk_offset, chunk_data):
        """Append one chunk.

        Stores the response headers on ``self.response_headers`` and returns None.
        """
        first_chunk = self.current_length is None
        if not first_chunk:
            # Later chunks pin the append position to the first chunk's offset
            # plus this chunk's offset.
            self.request_options['append_position_access_conditions'].append_position = \
                self.current_length + chunk_offset
        self.response_headers = await self.service.append_block(
            chunk_data,
            content_length=len(chunk_data),
            cls=return_response_headers,
            data_stream_total=self.total_size,
            upload_stream_current=self.progress_total,
            **self.request_options)
        if first_chunk:
            self.current_length = int(self.response_headers['blob_append_offset'])


class FileChunkUploader(_ChunkUploader):  # pylint: disable=abstract-method
    """Uploader that writes chunks through ``service.upload_range``."""

    async def _upload_chunk(self, chunk_offset, chunk_data):
        """Upload one range and return ``(range_id, response)``.

        ``range_id`` has the form ``'bytes=<start>-<end>'`` with an inclusive end.
        """
        length = len(chunk_data)
        chunk_end = chunk_offset + length - 1
        response = await self.service.upload_range(
            chunk_data,
            chunk_offset,
            length,
            data_stream_total=self.total_size,
            upload_stream_current=self.progress_total,
            **self.request_options
        )
        range_id = 'bytes={0}-{1}'.format(chunk_offset, chunk_end)
        return range_id, response