######################################################################
#
# File: test_bucket.py
#
# Copyright 2018 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
"""
Unit tests for b2.bucket, exercised against the RawSimulator so no
network access is needed.
"""

from __future__ import absolute_import, division

import os
import platform

import six
from nose import SkipTest

from b2.api import B2Api
from b2.bucket import LargeFileUploadState
from b2.download_dest import DownloadDestBytes, PreSeekedDownloadDest
from b2.exception import AlreadyFailed, B2Error, InvalidAuthToken, InvalidRange, InvalidUploadSource, MaxRetriesExceeded
from b2.file_version import FileVersionInfo
from b2.part import Part
from b2.progress import AbstractProgressListener
from b2.raw_simulator import RawSimulator
from b2.transferer.parallel import ParallelDownloader
from b2.transferer.simple import SimpleDownloader
from b2.upload_source import UploadSourceBytes
from b2.utils import hex_sha1_of_bytes, TempDir

from .stub_account_info import StubAccountInfo
from .test_base import TestBase

try:
    import unittest.mock as mock
except ImportError:
    import mock


def write_file(path, data):
    """Write the bytes `data` to a new file at `path`."""
    with open(path, 'wb') as f:
        f.write(data)


class StubProgressListener(AbstractProgressListener):
    """
    Implementation of a progress listener that remembers what calls were made,
    and returns them as a short string to use in unit tests.

    For a total byte count of 100, and updates at 33 and 66, the returned
    string looks like: "100: 33 66"
    """

    def __init__(self):
        self.total = None
        self.history = []
        self.last_byte_count = 0

    def get_history(self):
        """Return all recorded events as one space-separated string."""
        return ' '.join(self.history)

    def set_total_bytes(self, total_byte_count):
        # Must be called exactly once, before any bytes_completed() calls.
        assert total_byte_count is not None
        assert self.total is None, 'set_total_bytes called twice'
        self.total = total_byte_count
        assert len(self.history) == 0, self.history
        self.history.append('%d:' % (total_byte_count,))

    def bytes_completed(self, byte_count):
        # Progress must be monotonically non-decreasing.
        assert byte_count >= self.last_byte_count
        self.last_byte_count = byte_count
        self.history.append(str(byte_count))

    def is_valid(self):
        """True iff the last progress update reached the declared total."""
        return self.total == self.last_byte_count

    def close(self):
        self.history.append('closed')


class CanRetry(B2Error):
    """
    An exception that can be retryable, or not.
    """

    def __init__(self, can_retry):
        super(CanRetry, self).__init__(None, None, None, None, None)
        self.can_retry = can_retry

    def should_retry_upload(self):
        return self.can_retry


class TestCaseWithBucket(TestBase):
    """Base class that sets up a simulated account with one public bucket."""

    def setUp(self):
        self.bucket_name = 'my-bucket'
        self.simulator = RawSimulator()
        self.account_info = StubAccountInfo()
        self.api = B2Api(self.account_info, raw_api=self.simulator)
        (self.account_id, self.master_key) = self.simulator.create_account()
        self.api.authorize_account('production', self.account_id, self.master_key)
        self.api_url = self.account_info.get_api_url()
        self.account_auth_token = self.account_info.get_account_auth_token()
        # Reuse self.bucket_name instead of repeating the literal, so the
        # two stay in sync if the name ever changes.
        self.bucket = self.api.create_bucket(self.bucket_name, 'allPublic')
        self.bucket_id = self.bucket.id_

    def assertBucketContents(self, expected, *args, **kwargs):
        """
        *args and **kwargs are passed to self.bucket.ls()
        """
        actual = [
            (info.file_name, info.size, info.action, folder)
            for (info, folder) in self.bucket.ls(*args, **kwargs)
        ]
        self.assertEqual(expected, actual)


class TestReauthorization(TestCaseWithBucket):
    def testCreateBucket(self):
        # Wrapper that raises InvalidAuthToken on the first call only,
        # to verify that the API layer re-authorizes and retries.
        class InvalidAuthTokenWrapper(object):
            def __init__(self, original_function):
                self.__original_function = original_function
                self.__name__ = original_function.__name__
                self.__called = False

            def __call__(self, *args, **kwargs):
                if self.__called:
                    return self.__original_function(*args, **kwargs)
                self.__called = True
                raise InvalidAuthToken('message', 401)

        self.simulator.create_bucket = InvalidAuthTokenWrapper(self.simulator.create_bucket)
        self.bucket = self.api.create_bucket('your-bucket', 'allPublic')


class TestListParts(TestCaseWithBucket):
    def testEmpty(self):
        file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {})
        self.assertEqual([], list(self.bucket.list_parts(file1.file_id, batch_size=1)))

    def testThree(self):
        file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {})
        content = six.b('hello world')
        content_sha1 = hex_sha1_of_bytes(content)
        large_file_upload_state = mock.MagicMock()
        large_file_upload_state.has_error.return_value = False
        self.bucket._upload_part(
            file1.file_id, 1, (0, 11), UploadSourceBytes(content), large_file_upload_state
        )
        self.bucket._upload_part(
            file1.file_id, 2, (0, 11), UploadSourceBytes(content), large_file_upload_state
        )
        self.bucket._upload_part(
            file1.file_id, 3, (0, 11), UploadSourceBytes(content), large_file_upload_state
        )
        expected_parts = [
            Part('9999', 1, 11, content_sha1),
            Part('9999', 2, 11, content_sha1),
            Part('9999', 3, 11, content_sha1),
        ]
        self.assertEqual(expected_parts, list(self.bucket.list_parts(file1.file_id, batch_size=1)))


class TestUploadPart(TestCaseWithBucket):
    def test_error_in_state(self):
        file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {})
        content = six.b('hello world')
        file_progress_listener = mock.MagicMock()
        large_file_upload_state = LargeFileUploadState(file_progress_listener)
        large_file_upload_state.set_error('test error')
        # Use assertRaises, consistent with the other tests in this file,
        # instead of the try/except/fail pattern.
        with self.assertRaises(AlreadyFailed):
            self.bucket._upload_part(
                file1.file_id, 1, (0, 11), UploadSourceBytes(content), large_file_upload_state
            )


class TestListUnfinished(TestCaseWithBucket):
    def test_empty(self):
        self.assertEqual([], list(self.bucket.list_unfinished_large_files()))

    def test_one(self):
        file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {})
        self.assertEqual([file1], list(self.bucket.list_unfinished_large_files()))

    def test_three(self):
        file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {})
        file2 = self.bucket.start_large_file('file2.txt', 'text/plain', {})
        file3 = self.bucket.start_large_file('file3.txt', 'text/plain', {})
        self.assertEqual(
            [file1, file2, file3], list(self.bucket.list_unfinished_large_files(batch_size=1))
        )

    def _make_file(self, file_id, file_name):
        return self.bucket.start_large_file(file_name, 'text/plain', {})


class TestLs(TestCaseWithBucket):
    def test_empty(self):
        self.assertEqual([], list(self.bucket.ls('foo')))

    def test_one_file_at_root(self):
        data = six.b('hello world')
        self.bucket.upload_bytes(data, 'hello.txt')
        expected = [('hello.txt', 11, 'upload', None)]
        self.assertBucketContents(expected, '')

    def test_three_files_at_root(self):
        data = six.b('hello world')
        self.bucket.upload_bytes(data, 'a')
        self.bucket.upload_bytes(data, 'bb')
        self.bucket.upload_bytes(data, 'ccc')
        expected = [
            ('a', 11, 'upload', None), ('bb', 11, 'upload', None), ('ccc', 11, 'upload', None)
        ]
        self.assertBucketContents(expected, '')

    def test_three_files_in_dir(self):
        data = six.b('hello world')
        self.bucket.upload_bytes(data, 'a')
        self.bucket.upload_bytes(data, 'bb/1')
        self.bucket.upload_bytes(data, 'bb/2/sub1')
        self.bucket.upload_bytes(data, 'bb/2/sub2')
        self.bucket.upload_bytes(data, 'bb/3')
        self.bucket.upload_bytes(data, 'ccc')
        expected = [
            ('bb/1', 11, 'upload', None), ('bb/2/sub1', 11, 'upload', 'bb/2/'),
            ('bb/3', 11, 'upload', None)
        ]
        self.assertBucketContents(expected, 'bb', fetch_count=1)

    def test_three_files_multiple_versions(self):
        data = six.b('hello world')
        self.bucket.upload_bytes(data, 'a')
        self.bucket.upload_bytes(data, 'bb/1')
        self.bucket.upload_bytes(data, 'bb/2')
        self.bucket.upload_bytes(data, 'bb/2')
        self.bucket.upload_bytes(data, 'bb/2')
        self.bucket.upload_bytes(data, 'bb/3')
        self.bucket.upload_bytes(data, 'ccc')
        expected = [
            ('9998', 'bb/1', 11, 'upload', None),
            ('9995', 'bb/2', 11, 'upload', None),
            ('9996', 'bb/2', 11, 'upload', None),
            ('9997', 'bb/2', 11, 'upload', None),
            ('9994', 'bb/3', 11, 'upload', None),
        ]
        actual = [
            (info.id_, info.file_name, info.size, info.action, folder)
            for (info, folder) in self.bucket.ls('bb', show_versions=True, fetch_count=1)
        ]
        self.assertEqual(expected, actual)

    def test_started_large_file(self):
        self.bucket.start_large_file('hello.txt')
        expected = [('hello.txt', 0, 'start', None)]
        self.assertBucketContents(expected, '', show_versions=True)

    def test_hidden_file(self):
        data = six.b('hello world')
        self.bucket.upload_bytes(data, 'hello.txt')
        self.bucket.hide_file('hello.txt')
        expected = [('hello.txt', 0, 'hide', None), ('hello.txt', 11, 'upload', None)]
        self.assertBucketContents(expected, '', show_versions=True)

    def test_delete_file_version(self):
        data = six.b('hello world')
        self.bucket.upload_bytes(data, 'hello.txt')

        files = self.bucket.list_file_names('hello.txt', 1)['files']
        file_dict = files[0]
        file_id = file_dict['fileId']

        data = six.b('hello new world')
        self.bucket.upload_bytes(data, 'hello.txt')
        self.bucket.delete_file_version(file_id, 'hello.txt')

        expected = [('hello.txt', 15, 'upload', None)]
        self.assertBucketContents(expected, '', show_versions=True)


class TestUpload(TestCaseWithBucket):
    def test_upload_bytes(self):
        data = six.b('hello world')
        file_info = self.bucket.upload_bytes(data, 'file1')
        self.assertTrue(isinstance(file_info, FileVersionInfo))
        self._check_file_contents('file1', data)

    def test_upload_bytes_progress(self):
        data = six.b('hello world')
        progress_listener = StubProgressListener()
        self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
        self.assertEqual("11: 11 closed", progress_listener.get_history())

    def test_upload_local_file(self):
        with TempDir() as d:
            path = os.path.join(d, 'file1')
            data = six.b('hello world')
            write_file(path, data)
            self.bucket.upload_local_file(path, 'file1')
            self._check_file_contents('file1', data)

    def test_upload_fifo(self):
        if platform.system().lower().startswith('java'):
            raise SkipTest('in Jython 2.7.1b3 there is no os.mkfifo()')
        with TempDir() as d:
            path = os.path.join(d, 'file1')
            os.mkfifo(path)
            with self.assertRaises(InvalidUploadSource):
                self.bucket.upload_local_file(path, 'file1')

    def test_upload_dead_symlink(self):
        with TempDir() as d:
            path = os.path.join(d, 'file1')
            os.symlink('non-existing', path)
            with self.assertRaises(InvalidUploadSource):
                self.bucket.upload_local_file(path, 'file1')

    def test_upload_one_retryable_error(self):
        self.simulator.set_upload_errors([CanRetry(True)])
        data = six.b('hello world')
        self.bucket.upload_bytes(data, 'file1')

    def test_upload_file_one_fatal_error(self):
        self.simulator.set_upload_errors([CanRetry(False)])
        data = six.b('hello world')
        with self.assertRaises(CanRetry):
            self.bucket.upload_bytes(data, 'file1')

    def test_upload_file_too_many_retryable_errors(self):
        self.simulator.set_upload_errors([CanRetry(True)] * 6)
        data = six.b('hello world')
        with self.assertRaises(MaxRetriesExceeded):
            self.bucket.upload_bytes(data, 'file1')

    def test_upload_large(self):
        data = self._make_data(self.simulator.MIN_PART_SIZE * 3)
        progress_listener = StubProgressListener()
        self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
        self._check_file_contents('file1', data)
        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())

    def test_upload_large_resume(self):
        part_size = self.simulator.MIN_PART_SIZE
        data = self._make_data(part_size * 3)
        large_file_id = self._start_large_file('file1')
        self._upload_part(large_file_id, 1, data[:part_size])
        progress_listener = StubProgressListener()
        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
        self.assertEqual(large_file_id, file_info.id_)
        self._check_file_contents('file1', data)
        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())

    def test_upload_large_resume_no_parts(self):
        part_size = self.simulator.MIN_PART_SIZE
        data = self._make_data(part_size * 3)
        large_file_id = self._start_large_file('file1')
        progress_listener = StubProgressListener()
        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
        self.assertNotEqual(large_file_id, file_info.id_)  # it's not a match if there are no parts
        self._check_file_contents('file1', data)
        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())

    def test_upload_large_resume_all_parts_there(self):
        part_size = self.simulator.MIN_PART_SIZE
        data = self._make_data(part_size * 3)
        large_file_id = self._start_large_file('file1')
        self._upload_part(large_file_id, 1, data[:part_size])
        self._upload_part(large_file_id, 2, data[part_size:2 * part_size])
        self._upload_part(large_file_id, 3, data[2 * part_size:])
        progress_listener = StubProgressListener()
        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
        self.assertEqual(large_file_id, file_info.id_)
        self._check_file_contents('file1', data)
        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())

    def test_upload_large_resume_part_does_not_match(self):
        part_size = self.simulator.MIN_PART_SIZE
        data = self._make_data(part_size * 3)
        large_file_id = self._start_large_file('file1')
        self._upload_part(large_file_id, 3, data[:part_size])  # wrong part number for this data
        progress_listener = StubProgressListener()
        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
        self.assertNotEqual(large_file_id, file_info.id_)
        self._check_file_contents('file1', data)
        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())

    def test_upload_large_resume_wrong_part_size(self):
        part_size = self.simulator.MIN_PART_SIZE
        data = self._make_data(part_size * 3)
        large_file_id = self._start_large_file('file1')
        self._upload_part(large_file_id, 1, data[:part_size + 1])  # one byte too much
        progress_listener = StubProgressListener()
        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
        self.assertNotEqual(large_file_id, file_info.id_)
        self._check_file_contents('file1', data)
        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())

    def test_upload_large_resume_file_info(self):
        part_size = self.simulator.MIN_PART_SIZE
        data = self._make_data(part_size * 3)
        large_file_id = self._start_large_file('file1', {'property': 'value1'})
        self._upload_part(large_file_id, 1, data[:part_size])
        progress_listener = StubProgressListener()
        file_info = self.bucket.upload_bytes(
            data, 'file1', progress_listener=progress_listener, file_infos={'property': 'value1'}
        )
        self.assertEqual(large_file_id, file_info.id_)
        self._check_file_contents('file1', data)
        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())

    def test_upload_large_resume_file_info_does_not_match(self):
        part_size = self.simulator.MIN_PART_SIZE
        data = self._make_data(part_size * 3)
        large_file_id = self._start_large_file('file1', {'property': 'value1'})
        self._upload_part(large_file_id, 1, data[:part_size])
        progress_listener = StubProgressListener()
        file_info = self.bucket.upload_bytes(
            data, 'file1', progress_listener=progress_listener, file_infos={'property': 'value2'}
        )
        self.assertNotEqual(large_file_id, file_info.id_)
        self._check_file_contents('file1', data)
        self.assertEqual("600: 200 400 600 closed", progress_listener.get_history())

    def _start_large_file(self, file_name, file_info=None):
        # Start a large file directly through the simulator and return its id.
        if file_info is None:
            file_info = {}
        large_file_info = self.simulator.start_large_file(
            self.api_url, self.account_auth_token, self.bucket_id, file_name, None, file_info
        )
        return large_file_info['fileId']

    def _upload_part(self, large_file_id, part_number, part_data):
        # Upload one part directly through the simulator (bypasses Bucket).
        part_stream = six.BytesIO(part_data)
        upload_info = self.simulator.get_upload_part_url(
            self.api_url, self.account_auth_token, large_file_id
        )
        self.simulator.upload_part(
            upload_info['uploadUrl'], upload_info['authorizationToken'], part_number,
            len(part_data), hex_sha1_of_bytes(part_data), part_stream
        )

    def _check_file_contents(self, file_name, expected_contents):
        # Download the file and compare its bytes to what was uploaded.
        download = DownloadDestBytes()
        self.bucket.download_file_by_name(file_name, download)
        self.assertEqual(expected_contents, download.get_bytes_written())

    def _make_data(self, approximate_length):
        """
        Generate a sequence of bytes to use in testing an upload.
        Don't repeat a short pattern, so we're sure that the different
        parts of a large file are actually different.

        Returns bytes.
        """
        fragments = []
        so_far = 0
        while so_far < approximate_length:
            fragment = ('%d:' % so_far).encode('utf-8')
            so_far += len(fragment)
            fragments.append(fragment)
        return six.b('').join(fragments)


class DownloadTests(object):
    """Mixin with download tests; combined with TestCaseWithBucket below."""

    def setUp(self):
        super(DownloadTests, self).setUp()
        self.file_info = self.bucket.upload_bytes(six.b('hello world'), 'file1')
        self.download_dest = DownloadDestBytes()
        self.progress_listener = StubProgressListener()

    def _verify(self, expected_result):
        assert self.download_dest.get_bytes_written() == six.b(expected_result)
        assert self.progress_listener.is_valid()

    def test_download_by_id_no_progress(self):
        self.bucket.download_file_by_id(self.file_info.id_, self.download_dest)

    def test_download_by_name_no_progress(self):
        self.bucket.download_file_by_name('file1', self.download_dest)

    def test_download_by_name_progress(self):
        self.bucket.download_file_by_name('file1', self.download_dest, self.progress_listener)
        self._verify('hello world')

    def test_download_by_id_progress(self):
        self.bucket.download_file_by_id(
            self.file_info.id_, self.download_dest, self.progress_listener
        )
        self._verify('hello world')

    def test_download_by_id_progress_partial(self):
        self.bucket.download_file_by_id(
            self.file_info.id_, self.download_dest, self.progress_listener, range_=(3, 9)
        )
        self._verify('lo worl')

    def test_download_by_id_progress_exact_range(self):
        self.bucket.download_file_by_id(
            self.file_info.id_, self.download_dest, self.progress_listener, range_=(0, 10)
        )
        self._verify('hello world')

    def test_download_by_id_progress_range_one_off(self):
        with self.assertRaises(
            InvalidRange,
            msg='A range of 0-11 was requested (size of 12), but cloud could only serve 11 of that',
        ):
            self.bucket.download_file_by_id(
                self.file_info.id_,
                self.download_dest,
                self.progress_listener,
                range_=(0, 11),
            )

    def test_download_by_id_progress_partial_inplace_overwrite(self):
        # LOCAL is
        # 12345678901234567890
        #
        # and then:
        #
        # hello world
        #    |||||||
        #    |||||||
        #    vvvvvvv
        #
        # 123lo worl1234567890

        with TempDir() as d:
            path = os.path.join(d, 'file2')
            download_dest = PreSeekedDownloadDest(seek_target=3, local_file_path=path)
            data = six.b('12345678901234567890')
            write_file(path, data)
            self.bucket.download_file_by_id(
                self.file_info.id_,
                download_dest,
                self.progress_listener,
                range_=(3, 9),
            )
            self._check_local_file_contents(path, six.b('123lo worl1234567890'))

    def test_download_by_id_progress_partial_shifted_overwrite(self):
        # LOCAL is
        # 12345678901234567890
        #
        # and then:
        #
        # hello world
        #    |||||||
        #    \\\\\\\
        #     \\\\\\\
        #      \\\\\\\
        #       \\\\\\\
        #        \\\\\\\
        #        |||||||
        #        vvvvvvv
        #
        # 1234567lo worl567890

        with TempDir() as d:
            path = os.path.join(d, 'file2')
            download_dest = PreSeekedDownloadDest(seek_target=7, local_file_path=path)
            data = six.b('12345678901234567890')
            write_file(path, data)
            self.bucket.download_file_by_id(
                self.file_info.id_,
                download_dest,
                self.progress_listener,
                range_=(3, 9),
            )
            self._check_local_file_contents(path, six.b('1234567lo worl567890'))

    def _check_local_file_contents(self, path, expected_contents):
        with open(path, 'rb') as f:
            contents = f.read()
        self.assertEqual(contents, expected_contents)


class TestDownloadDefault(DownloadTests, TestCaseWithBucket):
    pass


class TestDownloadSimple(DownloadTests, TestCaseWithBucket):
    def setUp(self):
        super(TestDownloadSimple, self).setUp()
        self.bucket.api.transferer.strategies = [SimpleDownloader(force_chunk_size=20,)]


class TestDownloadParallel(DownloadTests, TestCaseWithBucket):
    def setUp(self):
        super(TestDownloadParallel, self).setUp()
        self.bucket.api.transferer.strategies = [
            ParallelDownloader(
                force_chunk_size=2,
                max_streams=999,
                min_part_size=2,
            )
        ]