# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=no-self-use

import asyncio
from asyncio import Lock
from itertools import islice
import threading

from math import ceil

from . import encode_base64, url_quote
from .request_handlers import get_length
from .response_handlers import return_response_headers
from .encryption import get_blob_encryptor_and_padder
from .uploads import SubStream, IterStreamer  # pylint: disable=unused-import


_LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024
_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.'


async def _parallel_uploads(uploader, pending, running):
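    """Keep a fixed-size window of chunk uploads in flight.

    ``pending`` is an iterator of work items and ``running`` holds the
    futures already scheduled. Each time a future completes, one new item
    is scheduled from ``pending``; once ``pending`` is exhausted, the
    remaining futures are drained. Returns the collected results.
    """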
    range_ids = []
    while True:
        # Wait for some upload to finish before adding a new one
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        range_ids.extend([chunk.result() for chunk in done])
        try:
            for _ in range(len(done)):
                next_chunk = next(pending)
                running.add(asyncio.ensure_future(uploader(next_chunk)))
        except StopIteration:
            break

    # Wait for the remaining uploads to finish
    if running:
        done, _running = await asyncio.wait(running)
        range_ids.extend([chunk.result() for chunk in done])
    return range_ids


async def upload_data_chunks(
        service=None,
        uploader_class=None,
        total_size=None,
        chunk_size=None,
        max_concurrency=None,
        stream=None,
        encryption_options=None,
        **kwargs):
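    """Upload a stream in fixed-size chunks using the given uploader class.

    Usage sketch (illustrative only; ``client`` is assumed to be a
    generated service client exposing the coroutines the uploader class
    calls, e.g. ``stage_block`` for ``BlockBlobChunkUploader``)::

        block_ids = await upload_data_chunks(
            service=client,
            uploader_class=BlockBlobChunkUploader,
            total_size=len(data),
            chunk_size=4 * 1024 * 1024,
            max_concurrency=4,
            stream=io.BytesIO(data))
    """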

    if encryption_options:
        encryptor, padder = get_blob_encryptor_and_padder(
            encryption_options.get('cek'),
            encryption_options.get('vector'),
            uploader_class is not PageBlobChunkUploader)
        kwargs['encryptor'] = encryptor
        kwargs['padder'] = padder

    parallel = max_concurrency > 1
    if parallel and 'modified_access_conditions' in kwargs:
        # Access conditions do not work with parallelism
        kwargs['modified_access_conditions'] = None

    uploader = uploader_class(
        service=service,
        total_size=total_size,
        chunk_size=chunk_size,
        stream=stream,
        parallel=parallel,
        **kwargs)

    if parallel:
        upload_tasks = uploader.get_chunk_streams()
        running_futures = [
            asyncio.ensure_future(uploader.process_chunk(u))
            for u in islice(upload_tasks, 0, max_concurrency)
        ]
        range_ids = await _parallel_uploads(uploader.process_chunk, upload_tasks, running_futures)
    else:
        range_ids = []
        for chunk in uploader.get_chunk_streams():
            range_ids.append(await uploader.process_chunk(chunk))

    if any(range_ids):
        return [r[1] for r in sorted(range_ids, key=lambda r: r[0])]
    return uploader.response_headers


async def upload_substream_blocks(
        service=None,
        uploader_class=None,
        total_size=None,
        chunk_size=None,
        max_concurrency=None,
        stream=None,
        **kwargs):
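    """Upload a seekable stream as substream blocks, windowed by max_concurrency.

    Unlike upload_data_chunks, the source is not buffered into memory;
    each block is a SubStream view over a byte range of the stream.
    """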
    parallel = max_concurrency > 1
    if parallel and 'modified_access_conditions' in kwargs:
        # Access conditions do not work with parallelism
        kwargs['modified_access_conditions'] = None
    uploader = uploader_class(
        service=service,
        total_size=total_size,
        chunk_size=chunk_size,
        stream=stream,
        parallel=parallel,
        **kwargs)

    if parallel:
        upload_tasks = uploader.get_substream_blocks()
        running_futures = [
            asyncio.ensure_future(uploader.process_substream_block(u))
            for u in islice(upload_tasks, 0, max_concurrency)
        ]
        range_ids = await _parallel_uploads(uploader.process_substream_block, upload_tasks, running_futures)
    else:
        range_ids = []
        for block in uploader.get_substream_blocks():
            range_ids.append(await uploader.process_substream_block(block))
    return sorted(range_ids)


class _ChunkUploader(object):  # pylint: disable=too-many-instance-attributes
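    """Base class for chunked uploads: slices or buffers the source stream,
    tracks progress, and delegates the actual service call to subclasses
    via _upload_chunk / _upload_substream_block."""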

    def __init__(self, service, total_size, chunk_size, stream, parallel, encryptor=None, padder=None, **kwargs):
        self.service = service
        self.total_size = total_size
        self.chunk_size = chunk_size
        self.stream = stream
        self.parallel = parallel

        # Stream management (the threading lock is handed to SubStream readers)
        self.stream_start = stream.tell() if parallel else None
        self.stream_lock = threading.Lock() if parallel else None

        # Progress feedback
        self.progress_total = 0
        self.progress_lock = Lock() if parallel else None

        # Encryption
        self.encryptor = encryptor
        self.padder = padder
        self.response_headers = None
        self.etag = None
        self.last_modified = None
        self.request_options = kwargs

    def get_chunk_streams(self):
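        """Yield (offset, bytes) chunks read from the stream.

        Reads accumulate until a full chunk or the end of the stream is
        buffered; padding and encryption are applied per chunk when
        configured.
        """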
        index = 0
        while True:
            data = b''
            read_size = self.chunk_size

            # Buffer until we either reach the end of the stream or get a whole chunk.
            while True:
                if self.total_size:
                    read_size = min(self.chunk_size - len(data), self.total_size - (index + len(data)))
                temp = self.stream.read(read_size)
                if not isinstance(temp, bytes):
                    raise TypeError('Blob data should be of type bytes.')
                data += temp or b""

                # We have read an empty string and so are at the end
                # of the buffer or we have read a full chunk.
                if temp == b'' or len(data) == self.chunk_size:
                    break

            if len(data) == self.chunk_size:
                if self.padder:
                    data = self.padder.update(data)
                if self.encryptor:
                    data = self.encryptor.update(data)
                yield index, data
            else:
                if self.padder:
                    data = self.padder.update(data) + self.padder.finalize()
                if self.encryptor:
                    data = self.encryptor.update(data) + self.encryptor.finalize()
                if data:
                    yield index, data
                break
            index += len(data)

    async def process_chunk(self, chunk_data):
        chunk_offset, chunk_bytes = chunk_data
        return await self._upload_chunk_with_progress(chunk_offset, chunk_bytes)

    async def _update_progress(self, length):
        if self.progress_lock is not None:
            async with self.progress_lock:
                self.progress_total += length
        else:
            self.progress_total += length

    async def _upload_chunk(self, chunk_offset, chunk_data):
        raise NotImplementedError("Must be implemented by child class.")

    async def _upload_chunk_with_progress(self, chunk_offset, chunk_data):
        range_id = await self._upload_chunk(chunk_offset, chunk_data)
        await self._update_progress(len(chunk_data))
        return range_id

    def get_substream_blocks(self):
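        """Yield (block_id, SubStream) pairs covering the whole stream.

        Block ids are zero-padded so that lexicographic order matches byte
        order, which is what lets callers sort the returned range ids
        directly.
        """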
        assert self.chunk_size is not None
        lock = self.stream_lock
        blob_length = self.total_size

        if blob_length is None:
            blob_length = get_length(self.stream)
            if blob_length is None:
                raise ValueError("Unable to determine content length of upload data.")

        blocks = int(ceil(blob_length / (self.chunk_size * 1.0)))
        last_block_size = self.chunk_size if blob_length % self.chunk_size == 0 else blob_length % self.chunk_size
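        # e.g. blob_length=10MiB, chunk_size=4MiB -> blocks=3 with sizes 4MiB, 4MiB, 2MiB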

        for i in range(blocks):
            index = i * self.chunk_size
            length = last_block_size if i == blocks - 1 else self.chunk_size
            yield ('BlockId{0:05d}'.format(i), SubStream(self.stream, index, length, lock))

    async def process_substream_block(self, block_data):
        return await self._upload_substream_block_with_progress(block_data[0], block_data[1])

    async def _upload_substream_block(self, block_id, block_stream):
        raise NotImplementedError("Must be implemented by child class.")

    async def _upload_substream_block_with_progress(self, block_id, block_stream):
        range_id = await self._upload_substream_block(block_id, block_stream)
        await self._update_progress(len(block_stream))
        return range_id

    def set_response_properties(self, resp):
        self.etag = resp.etag
        self.last_modified = resp.last_modified


class BlockBlobChunkUploader(_ChunkUploader):
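    """Stages each chunk as an uncommitted block via stage_block; the
    resulting block ids are committed by the caller."""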

    def __init__(self, *args, **kwargs):
        kwargs.pop('modified_access_conditions', None)
        super(BlockBlobChunkUploader, self).__init__(*args, **kwargs)
        self.current_length = None

    async def _upload_chunk(self, chunk_offset, chunk_data):
        # TODO: This is incorrect, but works with recording.
        index = '{0:032d}'.format(chunk_offset)
        block_id = encode_base64(url_quote(encode_base64(index)))
        await self.service.stage_block(
            block_id,
            len(chunk_data),
            chunk_data,
            data_stream_total=self.total_size,
            upload_stream_current=self.progress_total,
            **self.request_options)
        return index, block_id

    async def _upload_substream_block(self, block_id, block_stream):
        try:
            await self.service.stage_block(
                block_id,
                len(block_stream),
                block_stream,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options)
        finally:
            block_stream.close()
        return block_id


class PageBlobChunkUploader(_ChunkUploader):  # pylint: disable=abstract-method
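    """Uploads chunks as page ranges via upload_pages; all-zero chunks are
    skipped, since unwritten pages already read back as zeros."""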

    def _is_chunk_empty(self, chunk_data):
        # Read until a non-zero byte is encountered; if the end is reached
        # without returning, chunk_data is all zeros. (Iterating bytes
        # yields ints, so comparing against 0 is sufficient.)
        for each_byte in chunk_data:
            if each_byte != 0:
                return False
        return True

    async def _upload_chunk(self, chunk_offset, chunk_data):
        # Avoid uploading empty pages
        if not self._is_chunk_empty(chunk_data):
            chunk_end = chunk_offset + len(chunk_data) - 1
            content_range = 'bytes={0}-{1}'.format(chunk_offset, chunk_end)
            computed_md5 = None
            self.response_headers = await self.service.upload_pages(
                chunk_data,
                content_length=len(chunk_data),
                transactional_content_md5=computed_md5,
                range=content_range,
                cls=return_response_headers,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options)

            if not self.parallel and self.request_options.get('modified_access_conditions'):
                self.request_options['modified_access_conditions'].if_match = self.response_headers['etag']


class AppendBlobChunkUploader(_ChunkUploader):  # pylint: disable=abstract-method
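    """Appends chunks via append_block; after the first block, the append
    position precondition is derived from the returned blob_append_offset
    so each chunk lands at its expected offset."""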

    def __init__(self, *args, **kwargs):
        super(AppendBlobChunkUploader, self).__init__(*args, **kwargs)
        self.current_length = None

    async def _upload_chunk(self, chunk_offset, chunk_data):
        if self.current_length is None:
            self.response_headers = await self.service.append_block(
                chunk_data,
                content_length=len(chunk_data),
                cls=return_response_headers,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options)
            self.current_length = int(self.response_headers['blob_append_offset'])
        else:
            self.request_options['append_position_access_conditions'].append_position = \
                self.current_length + chunk_offset
            self.response_headers = await self.service.append_block(
                chunk_data,
                content_length=len(chunk_data),
                cls=return_response_headers,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options)


class FileChunkUploader(_ChunkUploader):  # pylint: disable=abstract-method
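    """Uploads file-share chunks via upload_range and reports the written
    byte range back as the range id."""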

    async def _upload_chunk(self, chunk_offset, chunk_data):
        chunk_end = chunk_offset + len(chunk_data) - 1
        response = await self.service.upload_range(
            chunk_data,
            chunk_offset,
            chunk_end,
            data_stream_total=self.total_size,
            upload_stream_current=self.progress_total,
            **self.request_options
        )
        range_id = 'bytes={0}-{1}'.format(chunk_offset, chunk_end)
        return range_id, response