1######################################################################
2#
3# File: test_bucket.py
4#
5# Copyright 2018 Backblaze Inc. All Rights Reserved.
6#
7# License https://www.backblaze.com/using_b2_code.html
8#
9######################################################################
10
11from __future__ import absolute_import, division
12
13from nose import SkipTest
14import os
15import platform
16
17import six
18
19from .stub_account_info import StubAccountInfo
20from .test_base import TestBase
21from b2.api import B2Api
22from b2.bucket import LargeFileUploadState
23from b2.download_dest import DownloadDestBytes, PreSeekedDownloadDest
24from b2.exception import AlreadyFailed, B2Error, InvalidAuthToken, InvalidRange, InvalidUploadSource, MaxRetriesExceeded
25from b2.file_version import FileVersionInfo
26from b2.part import Part
27from b2.progress import AbstractProgressListener
28from b2.raw_simulator import RawSimulator
29from b2.transferer.parallel import ParallelDownloader
30from b2.transferer.simple import SimpleDownloader
31from b2.upload_source import UploadSourceBytes
32from b2.utils import hex_sha1_of_bytes, TempDir
33
34try:
35    import unittest.mock as mock
36except ImportError:
37    import mock
38
39
40def write_file(path, data):
41    with open(path, 'wb') as f:
42        f.write(data)
43
44
45class StubProgressListener(AbstractProgressListener):
46    """
47    Implementation of a progress listener that remembers what calls were made,
48    and returns them as a short string to use in unit tests.
49
50    For a total byte count of 100, and updates at 33 and 66, the returned
51    string looks like: "100: 33 66"
52    """
53
54    def __init__(self):
55        self.total = None
56        self.history = []
57        self.last_byte_count = 0
58
59    def get_history(self):
60        return ' '.join(self.history)
61
62    def set_total_bytes(self, total_byte_count):
63        assert total_byte_count is not None
64        assert self.total is None, 'set_total_bytes called twice'
65        self.total = total_byte_count
66        assert len(self.history) == 0, self.history
67        self.history.append('%d:' % (total_byte_count,))
68
69    def bytes_completed(self, byte_count):
70        assert byte_count >= self.last_byte_count
71        self.last_byte_count = byte_count
72        self.history.append(str(byte_count))
73
74    def is_valid(self):
75        return self.total == self.last_byte_count
76
77    def close(self):
78        self.history.append('closed')
79
80
81class CanRetry(B2Error):
82    """
83    An exception that can be retryable, or not.
84    """
85
86    def __init__(self, can_retry):
87        super(CanRetry, self).__init__(None, None, None, None, None)
88        self.can_retry = can_retry
89
90    def should_retry_upload(self):
91        return self.can_retry
92
93
94class TestCaseWithBucket(TestBase):
95    def setUp(self):
96        self.bucket_name = 'my-bucket'
97        self.simulator = RawSimulator()
98        self.account_info = StubAccountInfo()
99        self.api = B2Api(self.account_info, raw_api=self.simulator)
100        (self.account_id, self.master_key) = self.simulator.create_account()
101        self.api.authorize_account('production', self.account_id, self.master_key)
102        self.api_url = self.account_info.get_api_url()
103        self.account_auth_token = self.account_info.get_account_auth_token()
104        self.bucket = self.api.create_bucket('my-bucket', 'allPublic')
105        self.bucket_id = self.bucket.id_
106
107    def assertBucketContents(self, expected, *args, **kwargs):
108        """
109        *args and **kwargs are passed to self.bucket.ls()
110        """
111        actual = [
112            (info.file_name, info.size, info.action, folder)
113            for (info, folder) in self.bucket.ls(*args, **kwargs)
114        ]
115        self.assertEqual(expected, actual)
116
117
118class TestReauthorization(TestCaseWithBucket):
119    def testCreateBucket(self):
120        class InvalidAuthTokenWrapper(object):
121            def __init__(self, original_function):
122                self.__original_function = original_function
123                self.__name__ = original_function.__name__
124                self.__called = False
125
126            def __call__(self, *args, **kwargs):
127                if self.__called:
128                    return self.__original_function(*args, **kwargs)
129                self.__called = True
130                raise InvalidAuthToken('message', 401)
131
132        self.simulator.create_bucket = InvalidAuthTokenWrapper(self.simulator.create_bucket)
133        self.bucket = self.api.create_bucket('your-bucket', 'allPublic')
134
135
136class TestListParts(TestCaseWithBucket):
137    def testEmpty(self):
138        file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {})
139        self.assertEqual([], list(self.bucket.list_parts(file1.file_id, batch_size=1)))
140
141    def testThree(self):
142        file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {})
143        content = six.b('hello world')
144        content_sha1 = hex_sha1_of_bytes(content)
145        large_file_upload_state = mock.MagicMock()
146        large_file_upload_state.has_error.return_value = False
147        self.bucket._upload_part(
148            file1.file_id, 1, (0, 11), UploadSourceBytes(content), large_file_upload_state
149        )
150        self.bucket._upload_part(
151            file1.file_id, 2, (0, 11), UploadSourceBytes(content), large_file_upload_state
152        )
153        self.bucket._upload_part(
154            file1.file_id, 3, (0, 11), UploadSourceBytes(content), large_file_upload_state
155        )
156        expected_parts = [
157            Part('9999', 1, 11, content_sha1),
158            Part('9999', 2, 11, content_sha1),
159            Part('9999', 3, 11, content_sha1),
160        ]
161        self.assertEqual(expected_parts, list(self.bucket.list_parts(file1.file_id, batch_size=1)))
162
163
164class TestUploadPart(TestCaseWithBucket):
165    def test_error_in_state(self):
166        file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {})
167        content = six.b('hello world')
168        file_progress_listener = mock.MagicMock()
169        large_file_upload_state = LargeFileUploadState(file_progress_listener)
170        large_file_upload_state.set_error('test error')
171        try:
172            self.bucket._upload_part(
173                file1.file_id, 1, (0, 11), UploadSourceBytes(content), large_file_upload_state
174            )
175            self.fail('should have thrown')
176        except AlreadyFailed:
177            pass
178
179
180class TestListUnfinished(TestCaseWithBucket):
181    def test_empty(self):
182        self.assertEqual([], list(self.bucket.list_unfinished_large_files()))
183
184    def test_one(self):
185        file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {})
186        self.assertEqual([file1], list(self.bucket.list_unfinished_large_files()))
187
188    def test_three(self):
189        file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {})
190        file2 = self.bucket.start_large_file('file2.txt', 'text/plain', {})
191        file3 = self.bucket.start_large_file('file3.txt', 'text/plain', {})
192        self.assertEqual(
193            [file1, file2, file3], list(self.bucket.list_unfinished_large_files(batch_size=1))
194        )
195
196    def _make_file(self, file_id, file_name):
197        return self.bucket.start_large_file(file_name, 'text/plain', {})
198
199
200class TestLs(TestCaseWithBucket):
201    def test_empty(self):
202        self.assertEqual([], list(self.bucket.ls('foo')))
203
204    def test_one_file_at_root(self):
205        data = six.b('hello world')
206        self.bucket.upload_bytes(data, 'hello.txt')
207        expected = [('hello.txt', 11, 'upload', None)]
208        self.assertBucketContents(expected, '')
209
210    def test_three_files_at_root(self):
211        data = six.b('hello world')
212        self.bucket.upload_bytes(data, 'a')
213        self.bucket.upload_bytes(data, 'bb')
214        self.bucket.upload_bytes(data, 'ccc')
215        expected = [
216            ('a', 11, 'upload', None), ('bb', 11, 'upload', None), ('ccc', 11, 'upload', None)
217        ]
218        self.assertBucketContents(expected, '')
219
220    def test_three_files_in_dir(self):
221        data = six.b('hello world')
222        self.bucket.upload_bytes(data, 'a')
223        self.bucket.upload_bytes(data, 'bb/1')
224        self.bucket.upload_bytes(data, 'bb/2/sub1')
225        self.bucket.upload_bytes(data, 'bb/2/sub2')
226        self.bucket.upload_bytes(data, 'bb/3')
227        self.bucket.upload_bytes(data, 'ccc')
228        expected = [
229            ('bb/1', 11, 'upload', None), ('bb/2/sub1', 11, 'upload', 'bb/2/'),
230            ('bb/3', 11, 'upload', None)
231        ]
232        self.assertBucketContents(expected, 'bb', fetch_count=1)
233
234    def test_three_files_multiple_versions(self):
235        data = six.b('hello world')
236        self.bucket.upload_bytes(data, 'a')
237        self.bucket.upload_bytes(data, 'bb/1')
238        self.bucket.upload_bytes(data, 'bb/2')
239        self.bucket.upload_bytes(data, 'bb/2')
240        self.bucket.upload_bytes(data, 'bb/2')
241        self.bucket.upload_bytes(data, 'bb/3')
242        self.bucket.upload_bytes(data, 'ccc')
243        expected = [
244            ('9998', 'bb/1', 11, 'upload', None),
245            ('9995', 'bb/2', 11, 'upload', None),
246            ('9996', 'bb/2', 11, 'upload', None),
247            ('9997', 'bb/2', 11, 'upload', None),
248            ('9994', 'bb/3', 11, 'upload', None),
249        ]
250        actual = [
251            (info.id_, info.file_name, info.size, info.action, folder)
252            for (info, folder) in self.bucket.ls('bb', show_versions=True, fetch_count=1)
253        ]
254        self.assertEqual(expected, actual)
255
256    def test_started_large_file(self):
257        self.bucket.start_large_file('hello.txt')
258        expected = [('hello.txt', 0, 'start', None)]
259        self.assertBucketContents(expected, '', show_versions=True)
260
261    def test_hidden_file(self):
262        data = six.b('hello world')
263        self.bucket.upload_bytes(data, 'hello.txt')
264        self.bucket.hide_file('hello.txt')
265        expected = [('hello.txt', 0, 'hide', None), ('hello.txt', 11, 'upload', None)]
266        self.assertBucketContents(expected, '', show_versions=True)
267
268    def test_delete_file_version(self):
269        data = six.b('hello world')
270        self.bucket.upload_bytes(data, 'hello.txt')
271
272        files = self.bucket.list_file_names('hello.txt', 1)['files']
273        file_dict = files[0]
274        file_id = file_dict['fileId']
275
276        data = six.b('hello new world')
277        self.bucket.upload_bytes(data, 'hello.txt')
278        self.bucket.delete_file_version(file_id, 'hello.txt')
279
280        expected = [('hello.txt', 15, 'upload', None)]
281        self.assertBucketContents(expected, '', show_versions=True)
282
283
284class TestUpload(TestCaseWithBucket):
285    def test_upload_bytes(self):
286        data = six.b('hello world')
287        file_info = self.bucket.upload_bytes(data, 'file1')
288        self.assertTrue(isinstance(file_info, FileVersionInfo))
289        self._check_file_contents('file1', data)
290
291    def test_upload_bytes_progress(self):
292        data = six.b('hello world')
293        progress_listener = StubProgressListener()
294        self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
295        self.assertEqual("11: 11 closed", progress_listener.get_history())
296
297    def test_upload_local_file(self):
298        with TempDir() as d:
299            path = os.path.join(d, 'file1')
300            data = six.b('hello world')
301            write_file(path, data)
302            self.bucket.upload_local_file(path, 'file1')
303            self._check_file_contents('file1', data)
304
305    def test_upload_fifo(self):
306        if platform.system().lower().startswith('java'):
307            raise SkipTest('in Jython 2.7.1b3 there is no os.mkfifo()')
308        with TempDir() as d:
309            path = os.path.join(d, 'file1')
310            os.mkfifo(path)
311            with self.assertRaises(InvalidUploadSource):
312                self.bucket.upload_local_file(path, 'file1')
313
314    def test_upload_dead_symlink(self):
315        with TempDir() as d:
316            path = os.path.join(d, 'file1')
317            os.symlink('non-existing', path)
318            with self.assertRaises(InvalidUploadSource):
319                self.bucket.upload_local_file(path, 'file1')
320
321    def test_upload_one_retryable_error(self):
322        self.simulator.set_upload_errors([CanRetry(True)])
323        data = six.b('hello world')
324        self.bucket.upload_bytes(data, 'file1')
325
326    def test_upload_file_one_fatal_error(self):
327        self.simulator.set_upload_errors([CanRetry(False)])
328        data = six.b('hello world')
329        with self.assertRaises(CanRetry):
330            self.bucket.upload_bytes(data, 'file1')
331
332    def test_upload_file_too_many_retryable_errors(self):
333        self.simulator.set_upload_errors([CanRetry(True)] * 6)
334        data = six.b('hello world')
335        with self.assertRaises(MaxRetriesExceeded):
336            self.bucket.upload_bytes(data, 'file1')
337
338    def test_upload_large(self):
339        data = self._make_data(self.simulator.MIN_PART_SIZE * 3)
340        progress_listener = StubProgressListener()
341        self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
342        self._check_file_contents('file1', data)
343        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())
344
345    def test_upload_large_resume(self):
346        part_size = self.simulator.MIN_PART_SIZE
347        data = self._make_data(part_size * 3)
348        large_file_id = self._start_large_file('file1')
349        self._upload_part(large_file_id, 1, data[:part_size])
350        progress_listener = StubProgressListener()
351        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
352        self.assertEqual(large_file_id, file_info.id_)
353        self._check_file_contents('file1', data)
354        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())
355
356    def test_upload_large_resume_no_parts(self):
357        part_size = self.simulator.MIN_PART_SIZE
358        data = self._make_data(part_size * 3)
359        large_file_id = self._start_large_file('file1')
360        progress_listener = StubProgressListener()
361        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
362        self.assertNotEqual(large_file_id, file_info.id_)  # it's not a match if there are no parts
363        self._check_file_contents('file1', data)
364        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())
365
366    def test_upload_large_resume_all_parts_there(self):
367        part_size = self.simulator.MIN_PART_SIZE
368        data = self._make_data(part_size * 3)
369        large_file_id = self._start_large_file('file1')
370        self._upload_part(large_file_id, 1, data[:part_size])
371        self._upload_part(large_file_id, 2, data[part_size:2 * part_size])
372        self._upload_part(large_file_id, 3, data[2 * part_size:])
373        progress_listener = StubProgressListener()
374        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
375        self.assertEqual(large_file_id, file_info.id_)
376        self._check_file_contents('file1', data)
377        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())
378
379    def test_upload_large_resume_part_does_not_match(self):
380        part_size = self.simulator.MIN_PART_SIZE
381        data = self._make_data(part_size * 3)
382        large_file_id = self._start_large_file('file1')
383        self._upload_part(large_file_id, 3, data[:part_size])  # wrong part number for this data
384        progress_listener = StubProgressListener()
385        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
386        self.assertNotEqual(large_file_id, file_info.id_)
387        self._check_file_contents('file1', data)
388        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())
389
390    def test_upload_large_resume_wrong_part_size(self):
391        part_size = self.simulator.MIN_PART_SIZE
392        data = self._make_data(part_size * 3)
393        large_file_id = self._start_large_file('file1')
394        self._upload_part(large_file_id, 1, data[:part_size + 1])  # one byte to much
395        progress_listener = StubProgressListener()
396        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
397        self.assertNotEqual(large_file_id, file_info.id_)
398        self._check_file_contents('file1', data)
399        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())
400
401    def test_upload_large_resume_file_info(self):
402        part_size = self.simulator.MIN_PART_SIZE
403        data = self._make_data(part_size * 3)
404        large_file_id = self._start_large_file('file1', {'property': 'value1'})
405        self._upload_part(large_file_id, 1, data[:part_size])
406        progress_listener = StubProgressListener()
407        file_info = self.bucket.upload_bytes(
408            data, 'file1', progress_listener=progress_listener, file_infos={'property': 'value1'}
409        )
410        self.assertEqual(large_file_id, file_info.id_)
411        self._check_file_contents('file1', data)
412        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())
413
414    def test_upload_large_resume_file_info_does_not_match(self):
415        part_size = self.simulator.MIN_PART_SIZE
416        data = self._make_data(part_size * 3)
417        large_file_id = self._start_large_file('file1', {'property': 'value1'})
418        self._upload_part(large_file_id, 1, data[:part_size])
419        progress_listener = StubProgressListener()
420        file_info = self.bucket.upload_bytes(
421            data, 'file1', progress_listener=progress_listener, file_infos={'property': 'value2'}
422        )
423        self.assertNotEqual(large_file_id, file_info.id_)
424        self._check_file_contents('file1', data)
425        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())
426
427    def _start_large_file(self, file_name, file_info=None):
428        if file_info is None:
429            file_info = {}
430        large_file_info = self.simulator.start_large_file(
431            self.api_url, self.account_auth_token, self.bucket_id, file_name, None, file_info
432        )
433        return large_file_info['fileId']
434
435    def _upload_part(self, large_file_id, part_number, part_data):
436        part_stream = six.BytesIO(part_data)
437        upload_info = self.simulator.get_upload_part_url(
438            self.api_url, self.account_auth_token, large_file_id
439        )
440        self.simulator.upload_part(
441            upload_info['uploadUrl'], upload_info['authorizationToken'], part_number,
442            len(part_data), hex_sha1_of_bytes(part_data), part_stream
443        )
444
445    def _check_file_contents(self, file_name, expected_contents):
446        download = DownloadDestBytes()
447        self.bucket.download_file_by_name(file_name, download)
448        self.assertEqual(expected_contents, download.get_bytes_written())
449
450    def _make_data(self, approximate_length):
451        """
452        Generate a sequence of bytes to use in testing an upload.
453        Don't repeat a short pattern, so we're sure that the different
454        parts of a large file are actually different.
455
456        Returns bytes.
457        """
458        fragments = []
459        so_far = 0
460        while so_far < approximate_length:
461            fragment = ('%d:' % so_far).encode('utf-8')
462            so_far += len(fragment)
463            fragments.append(fragment)
464        return six.b('').join(fragments)
465
466
467class DownloadTests(object):
468    def setUp(self):
469        super(DownloadTests, self).setUp()
470        self.file_info = self.bucket.upload_bytes(six.b('hello world'), 'file1')
471        self.download_dest = DownloadDestBytes()
472        self.progress_listener = StubProgressListener()
473
474    def _verify(self, expected_result):
475        assert self.download_dest.get_bytes_written() == six.b(expected_result)
476        assert self.progress_listener.is_valid()
477
478    def test_download_by_id_no_progress(self):
479        self.bucket.download_file_by_id(self.file_info.id_, self.download_dest)
480
481    def test_download_by_name_no_progress(self):
482        self.bucket.download_file_by_name('file1', self.download_dest)
483
484    def test_download_by_name_progress(self):
485        self.bucket.download_file_by_name('file1', self.download_dest, self.progress_listener)
486        self._verify('hello world')
487
488    def test_download_by_id_progress(self):
489        self.bucket.download_file_by_id(
490            self.file_info.id_, self.download_dest, self.progress_listener
491        )
492        self._verify('hello world')
493
494    def test_download_by_id_progress_partial(self):
495        self.bucket.download_file_by_id(
496            self.file_info.id_, self.download_dest, self.progress_listener, range_=(3, 9)
497        )
498        self._verify('lo worl')
499
500    def test_download_by_id_progress_exact_range(self):
501        self.bucket.download_file_by_id(
502            self.file_info.id_, self.download_dest, self.progress_listener, range_=(0, 10)
503        )
504        self._verify('hello world')
505
506    def test_download_by_id_progress_range_one_off(self):
507        with self.assertRaises(
508            InvalidRange,
509            msg='A range of 0-11 was requested (size of 12), but cloud could only serve 11 of that',
510        ):
511            self.bucket.download_file_by_id(
512                self.file_info.id_,
513                self.download_dest,
514                self.progress_listener,
515                range_=(0, 11),
516            )
517
518    def test_download_by_id_progress_partial_inplace_overwrite(self):
519        # LOCAL is
520        # 12345678901234567890
521        #
522        # and then:
523        #
524        # hello world
525        #    |||||||
526        #    |||||||
527        #    vvvvvvv
528        #
529        # 123lo worl1234567890
530
531        with TempDir() as d:
532            path = os.path.join(d, 'file2')
533            download_dest = PreSeekedDownloadDest(seek_target=3, local_file_path=path)
534            data = six.b('12345678901234567890')
535            write_file(path, data)
536            self.bucket.download_file_by_id(
537                self.file_info.id_,
538                download_dest,
539                self.progress_listener,
540                range_=(3, 9),
541            )
542            self._check_local_file_contents(path, six.b('123lo worl1234567890'))
543
544    def test_download_by_id_progress_partial_shifted_overwrite(self):
545        # LOCAL is
546        # 12345678901234567890
547        #
548        # and then:
549        #
550        # hello world
551        #    |||||||
552        #    \\\\\\\
553        #     \\\\\\\
554        #      \\\\\\\
555        #       \\\\\\\
556        #        \\\\\\\
557        #        |||||||
558        #        vvvvvvv
559        #
560        # 1234567lo worl567890
561
562        with TempDir() as d:
563            path = os.path.join(d, 'file2')
564            download_dest = PreSeekedDownloadDest(seek_target=7, local_file_path=path)
565            data = six.b('12345678901234567890')
566            write_file(path, data)
567            self.bucket.download_file_by_id(
568                self.file_info.id_,
569                download_dest,
570                self.progress_listener,
571                range_=(3, 9),
572            )
573            self._check_local_file_contents(path, six.b('1234567lo worl567890'))
574
575    def _check_local_file_contents(self, path, expected_contents):
576        with open(path, 'rb') as f:
577            contents = f.read()
578            self.assertEqual(contents, expected_contents)
579
580
581class TestDownloadDefault(DownloadTests, TestCaseWithBucket):
582    pass
583
584
585class TestDownloadSimple(DownloadTests, TestCaseWithBucket):
586    def setUp(self):
587        super(TestDownloadSimple, self).setUp()
588        self.bucket.api.transferer.strategies = [SimpleDownloader(force_chunk_size=20,)]
589
590
591class TestDownloadParallel(DownloadTests, TestCaseWithBucket):
592    def setUp(self):
593        super(TestDownloadParallel, self).setUp()
594        self.bucket.api.transferer.strategies = [
595            ParallelDownloader(
596                force_chunk_size=2,
597                max_streams=999,
598                min_part_size=2,
599            )
600        ]
601