#-------------------------------------------------------------------------
# Copyright (c) Microsoft. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#--------------------------------------------------------------------------
import threading

from time import sleep
from .._error import _ERROR_NO_SINGLE_THREAD_CHUNKING


def _download_file_chunks(file_service, share_name, directory_name, file_name,
                          download_size, block_size, progress, start_range, end_range,
                          stream, max_connections, progress_callback, validate_content,
                          timeout, operation_context):
    """Download the range [start_range, end_range) of a file in parallel chunks.

    The range is split into pieces of ``block_size`` bytes; each piece is
    fetched on a worker thread and written into ``stream`` at the offset
    matching its position in the range.

    :param file_service: object exposing ``_get_file``, used to fetch chunks.
    :param share_name: forwarded to ``file_service._get_file``.
    :param directory_name: forwarded to ``file_service._get_file``.
    :param file_name: forwarded to ``file_service._get_file``.
    :param download_size: total passed as second argument to
        ``progress_callback``.
    :param block_size: size in bytes of each chunk.
    :param progress: initial value of the running progress counter.
    :param start_range: first byte offset to download (inclusive).
    :param end_range: byte offset at which the download stops (exclusive).
    :param stream: destination; must support ``tell``, ``seek`` and ``write``.
    :param max_connections: number of worker threads; must be greater than 1.
    :param progress_callback: optional callable ``(current, total)`` invoked
        after each chunk is written; may be ``None``.
    :param validate_content: forwarded to ``file_service._get_file``.
    :param timeout: forwarded to ``file_service._get_file``.
    :param operation_context: forwarded to ``file_service._get_file`` as
        ``_context``.
    :raises ValueError: if ``max_connections`` is 1 or less.
    """
    if max_connections <= 1:
        raise ValueError(_ERROR_NO_SINGLE_THREAD_CHUNKING.format('file'))

    downloader = _FileChunkDownloader(
        file_service,
        share_name,
        directory_name,
        file_name,
        download_size,
        block_size,
        progress,
        start_range,
        end_range,
        stream,
        progress_callback,
        validate_content,
        timeout,
        operation_context,
    )

    # Function-local import: merely importing this module does not require
    # concurrent.futures to be available.
    import concurrent.futures

    # The context manager shuts the pool down on exit, so worker threads are
    # not leaked and no worker is still writing to ``stream`` once this
    # function returns or raises.
    with concurrent.futures.ThreadPoolExecutor(max_connections) as executor:
        # Draining the iterator waits for every chunk and re-raises the first
        # exception raised by a worker.
        for _ in executor.map(downloader.process_chunk, downloader.get_chunk_offsets()):
            pass


class _FileChunkDownloader(object):
    """Fetches individual chunks of a file and writes them into a shared stream.

    ``process_chunk`` is intended to be called concurrently; writes to the
    stream and updates to the progress counter are each guarded by a lock.
    """

    def __init__(self, file_service, share_name, directory_name, file_name,
                 download_size, chunk_size, progress, start_range, end_range,
                 stream, progress_callback, validate_content, timeout, operation_context):
        self.file_service = file_service
        self.share_name = share_name
        self.directory_name = directory_name
        self.file_name = file_name
        self.chunk_size = chunk_size

        self.download_size = download_size
        self.start_index = start_range
        # Exclusive upper bound of the range being downloaded.
        self.file_end = end_range

        self.stream = stream
        # Chunks are written relative to wherever the stream was positioned
        # when the downloader was created.
        self.stream_start = stream.tell()
        self.stream_lock = threading.Lock()
        self.progress_callback = progress_callback
        self.progress_total = progress
        self.progress_lock = threading.Lock()
        self.validate_content = validate_content
        self.timeout = timeout
        self.operation_context = operation_context

    def get_chunk_offsets(self):
        """Yield the start offset of each chunk in [start_index, file_end)."""
        index = self.start_index
        while index < self.file_end:
            yield index
            index += self.chunk_size

    def process_chunk(self, chunk_start):
        """Download the chunk beginning at ``chunk_start`` and write it out.

        The final chunk is clamped so it never extends past ``file_end``.
        """
        chunk_end = min(chunk_start + self.chunk_size, self.file_end)

        chunk_data = self._download_chunk(chunk_start, chunk_end).content
        length = chunk_end - chunk_start
        if length > 0:
            self._write_to_stream(chunk_data, chunk_start)
            self._update_progress(length)

    def _update_progress(self, length):
        """Add ``length`` to the running total and notify the callback, if any."""
        if self.progress_callback is not None:
            with self.progress_lock:
                self.progress_total += length
                total = self.progress_total
            # Invoked outside the lock so the callback does not block other
            # workers from updating the counter.
            self.progress_callback(total, self.download_size)

    def _write_to_stream(self, chunk_data, chunk_start):
        """Write ``chunk_data`` at the stream position matching ``chunk_start``."""
        # seek + write must happen atomically with respect to other workers.
        with self.stream_lock:
            self.stream.seek(self.stream_start + (chunk_start - self.start_index))
            self.stream.write(chunk_data)

    def _download_chunk(self, chunk_start, chunk_end):
        """Fetch bytes [chunk_start, chunk_end) via ``file_service._get_file``."""
        return self.file_service._get_file(
            self.share_name,
            self.directory_name,
            self.file_name,
            start_range=chunk_start,
            # chunk_end is exclusive here; the request is sent with an
            # end offset one lower.
            end_range=chunk_end - 1,
            validate_content=self.validate_content,
            timeout=self.timeout,
            _context=self.operation_context
        )