1#-------------------------------------------------------------------------
2# Copyright (c) Microsoft.  All rights reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#   http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#--------------------------------------------------------------------------
15import threading
16
17from time import sleep
18from .._error import _ERROR_NO_SINGLE_THREAD_CHUNKING
19
def _download_file_chunks(file_service, share_name, directory_name, file_name,
                          download_size, block_size, progress, start_range, end_range,
                          stream, max_connections, progress_callback, validate_content,
                          timeout, operation_context):
    """Download a byte range of a file in parallel chunks, writing it to ``stream``.

    A ``_FileChunkDownloader`` is built from the arguments and its
    ``process_chunk`` method is run on a thread pool, once per offset produced
    by ``get_chunk_offsets()``.

    :param file_service: object whose ``_get_file`` method fetches each chunk.
    :param share_name: passed through to every chunk request.
    :param directory_name: passed through to every chunk request.
    :param file_name: passed through to every chunk request.
    :param download_size: total reported as the second argument of
        ``progress_callback``.
    :param block_size: maximum number of bytes fetched per chunk request.
    :param progress: starting value of the running total reported to
        ``progress_callback`` (presumably bytes already downloaded by the
        caller -- confirm against the caller).
    :param start_range: offset of the first byte to download.
    :param end_range: offset one past the last byte to download (chunk
        offsets are generated while they are ``< end_range``).
    :param stream: destination supporting ``tell()``, ``seek()`` and
        ``write()``; data is written relative to its position at call time.
    :param max_connections: number of worker threads; must be greater than 1.
    :param progress_callback: ``None`` or a callable invoked as
        ``progress_callback(current, download_size)`` after each chunk.
    :param validate_content: passed through to every chunk request.
    :param timeout: passed through to every chunk request.
    :param operation_context: passed through to every chunk request as
        ``_context``.
    :raises ValueError: if ``max_connections`` is 1 or less.
    """
    if max_connections <= 1:
        raise ValueError(_ERROR_NO_SINGLE_THREAD_CHUNKING.format('file'))

    downloader = _FileChunkDownloader(
        file_service,
        share_name,
        directory_name,
        file_name,
        download_size,
        block_size,
        progress,
        start_range,
        end_range,
        stream,
        progress_callback,
        validate_content,
        timeout,
        operation_context,
    )

    # Imported here rather than at module level, as in the original code
    # (presumably so the dependency is only needed when chunking in parallel).
    import concurrent.futures

    # The ``with`` block guarantees the pool is shut down -- and that no worker
    # is still writing to ``stream`` -- before this function returns or raises.
    with concurrent.futures.ThreadPoolExecutor(max_connections) as executor:
        # process_chunk returns nothing useful; the results are consumed only
        # so that an exception raised in a worker propagates to the caller.
        for _ in executor.map(downloader.process_chunk, downloader.get_chunk_offsets()):
            pass
47
class _FileChunkDownloader(object):
    """Fetches one file range chunk by chunk and writes it into a stream.

    Stream writes and progress updates are each guarded by their own
    ``threading.Lock`` so ``process_chunk`` can be called from several
    threads at once.
    """

    def __init__(self, file_service, share_name, directory_name, file_name,
                 download_size, chunk_size, progress, start_range, end_range,
                 stream, progress_callback, validate_content, timeout, operation_context):
        """Store the download parameters and remember the stream's current position."""
        # Identity of the file and the service used to fetch it.
        self.file_service = file_service
        self.share_name = share_name
        self.directory_name = directory_name
        self.file_name = file_name

        # Options forwarded untouched to every chunk request.
        self.validate_content = validate_content
        self.timeout = timeout
        self.operation_context = operation_context

        # Range to fetch: [start_index, file_end), in pieces of chunk_size.
        self.chunk_size = chunk_size
        self.download_size = download_size
        self.start_index = start_range
        self.file_end = end_range

        # Destination; chunks land relative to where the stream is right now.
        self.stream = stream
        self.stream_start = stream.tell()
        self.stream_lock = threading.Lock()

        # Progress reporting state.
        self.progress_callback = progress_callback
        self.progress_total = progress
        self.progress_lock = threading.Lock()

    def get_chunk_offsets(self):
        """Yield the start offset of each chunk, from ``start_index`` up to ``file_end``."""
        offset = self.start_index
        while offset < self.file_end:
            yield offset
            offset += self.chunk_size

    def process_chunk(self, chunk_start):
        """Download the chunk beginning at ``chunk_start``, store it and report progress."""
        # The final chunk is clipped so it never runs past the end of the range.
        chunk_end = min(chunk_start + self.chunk_size, self.file_end)

        response = self._download_chunk(chunk_start, chunk_end)
        payload = response.content

        byte_count = chunk_end - chunk_start
        if byte_count <= 0:
            return

        self._write_to_stream(payload, chunk_start)
        self._update_progress(byte_count)

    def _update_progress(self, length):
        """Add ``length`` to the running total and notify the callback, if there is one."""
        if self.progress_callback is None:
            return

        # The callback runs while the lock is held, so calls do not overlap.
        with self.progress_lock:
            self.progress_total += length
            self.progress_callback(self.progress_total, self.download_size)

    def _write_to_stream(self, chunk_data, chunk_start):
        """Write ``chunk_data`` at the stream position matching ``chunk_start``."""
        # Translate the file offset into an offset from the stream's start position.
        target = self.stream_start + (chunk_start - self.start_index)
        with self.stream_lock:
            self.stream.seek(target)
            self.stream.write(chunk_data)

    def _download_chunk(self, chunk_start, chunk_end):
        """Request the bytes from ``chunk_start`` up to, but excluding, ``chunk_end``."""
        # chunk_end is exclusive here; the request is given the last byte index.
        last_byte = chunk_end - 1
        return self.file_service._get_file(
            self.share_name,
            self.directory_name,
            self.file_name,
            start_range=chunk_start,
            end_range=last_byte,
            validate_content=self.validate_content,
            timeout=self.timeout,
            _context=self.operation_context
        )