# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime

import pytest
import six

from google.api_core import exceptions
from . import _helpers


def test_bucket_create_w_alt_storage_class(storage_client, buckets_to_delete):
    """Create a bucket with the ARCHIVE storage class and read it back."""
    from google.cloud.storage import constants

    bucket_name = _helpers.unique_name("bucket-w-archive")

    # The bucket must not exist before the test creates it.
    with pytest.raises(exceptions.NotFound):
        storage_client.get_bucket(bucket_name)

    bucket = storage_client.bucket(bucket_name)
    bucket.storage_class = constants.ARCHIVE_STORAGE_CLASS

    _helpers.retry_429_503(bucket.create)()
    buckets_to_delete.append(bucket)

    created = storage_client.get_bucket(bucket_name)
    assert created.storage_class == constants.ARCHIVE_STORAGE_CLASS


def test_bucket_lifecycle_rules(storage_client, buckets_to_delete):
    """Create a bucket with two lifecycle rules, then clear them via patch."""
    from google.cloud.storage import constants
    from google.cloud.storage.bucket import LifecycleRuleDelete
    from google.cloud.storage.bucket import LifecycleRuleSetStorageClass

    bucket_name = _helpers.unique_name("w-lifcycle-rules")
    custom_time_before = datetime.date(2018, 8, 1)
    noncurrent_before = datetime.date(2018, 8, 1)

    with pytest.raises(exceptions.NotFound):
        storage_client.get_bucket(bucket_name)

    bucket = storage_client.bucket(bucket_name)
    bucket.add_lifecycle_delete_rule(
        age=42,
        number_of_newer_versions=3,
        days_since_custom_time=2,
        custom_time_before=custom_time_before,
        days_since_noncurrent_time=2,
        noncurrent_time_before=noncurrent_before,
    )
    bucket.add_lifecycle_set_storage_class_rule(
        constants.COLDLINE_STORAGE_CLASS,
        is_live=False,
        matches_storage_class=[constants.NEARLINE_STORAGE_CLASS],
    )

    # Mirror of the two rules added above, built directly from the rule types.
    expected_rules = [
        LifecycleRuleDelete(
            age=42,
            number_of_newer_versions=3,
            days_since_custom_time=2,
            custom_time_before=custom_time_before,
            days_since_noncurrent_time=2,
            noncurrent_time_before=noncurrent_before,
        ),
        LifecycleRuleSetStorageClass(
            constants.COLDLINE_STORAGE_CLASS,
            is_live=False,
            matches_storage_class=[constants.NEARLINE_STORAGE_CLASS],
        ),
    ]

    _helpers.retry_429_503(bucket.create)(location="us")
    buckets_to_delete.append(bucket)

    assert bucket.name == bucket_name
    assert list(bucket.lifecycle_rules) == expected_rules

    # NOTE: ``clear_lifecyle_rules`` (sic) is the method name used by the library.
    bucket.clear_lifecyle_rules()
    bucket.patch()

    assert list(bucket.lifecycle_rules) == []


def test_bucket_update_labels(storage_client, buckets_to_delete):
    """Set, replace and clear bucket labels using ``update`` and ``patch``."""
    bucket_name = _helpers.unique_name("update-labels")
    bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
    buckets_to_delete.append(bucket)
    assert bucket.exists()

    updated_labels = {"test-label": "label-value"}
    bucket.labels = updated_labels
    bucket.update()
    assert bucket.labels == updated_labels

    new_labels = {"another-label": "another-value"}
    bucket.labels = new_labels
    bucket.patch()
    assert bucket.labels == new_labels

    bucket.labels = {}
    # See https://github.com/googleapis/python-storage/issues/541
    retry_400 = _helpers.RetryErrors(exceptions.BadRequest)
    retry_400(bucket.update)()
    assert bucket.labels == {}


def test_bucket_get_set_iam_policy(
    storage_client, buckets_to_delete, service_account,
):
    """Get and set a bucket IAM policy that carries a conditional binding.

    Setting the conditional binding is expected to fail first with
    ``PreconditionFailed`` (uniform bucket-level access not enabled) and then
    with ``BadRequest`` (policy version not yet 3) before it succeeds.
    """
    from google.cloud.storage.iam import STORAGE_OBJECT_VIEWER_ROLE

    bucket_name = _helpers.unique_name("iam-policy")
    bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
    buckets_to_delete.append(bucket)
    assert bucket.exists()

    policy_no_version = bucket.get_iam_policy()
    assert policy_no_version.version == 1

    policy = bucket.get_iam_policy(requested_policy_version=3)
    assert policy == policy_no_version

    member = "serviceAccount:{}".format(storage_client.get_service_account_email())

    binding_w_condition = {
        "role": STORAGE_OBJECT_VIEWER_ROLE,
        "members": {member},
        "condition": {
            "title": "always-true",
            "description": "test condition always-true",
            "expression": "true",
        },
    }
    policy.bindings.append(binding_w_condition)

    with pytest.raises(
        exceptions.PreconditionFailed, match="enable uniform bucket-level access"
    ):
        bucket.set_iam_policy(policy)

    bucket.iam_configuration.uniform_bucket_level_access_enabled = True
    bucket.patch()

    policy = bucket.get_iam_policy(requested_policy_version=3)
    policy.bindings.append(binding_w_condition)

    with pytest.raises(exceptions.BadRequest, match="at least 3"):
        bucket.set_iam_policy(policy)

    policy.version = 3
    returned_policy = bucket.set_iam_policy(policy)
    assert returned_policy.version == 3
    assert returned_policy.bindings == policy.bindings

    fetched_policy = bucket.get_iam_policy(requested_policy_version=3)
    assert fetched_policy.bindings == returned_policy.bindings


def test_bucket_crud_w_requester_pays(storage_client, buckets_to_delete, user_project):
    """Exercise get / patch / update / delete on a requester-pays bucket."""
    bucket_name = _helpers.unique_name("w-requester-pays")
    created = _helpers.retry_429_503(storage_client.create_bucket)(
        bucket_name, requester_pays=True
    )
    buckets_to_delete.append(created)
    assert created.name == bucket_name
    assert created.requester_pays

    with_user_project = storage_client.bucket(bucket_name, user_project=user_project,)

    try:
        # Exercise 'buckets.get' w/ userProject.
        assert with_user_project.exists()
        with_user_project.reload()
        assert with_user_project.requester_pays

        # Exercise 'buckets.patch' w/ userProject.
        with_user_project.configure_website(
            main_page_suffix="index.html", not_found_page="404.html"
        )
        with_user_project.patch()
        expected_website = {"mainPageSuffix": "index.html", "notFoundPage": "404.html"}
        assert with_user_project._properties["website"] == expected_website

        # Exercise 'buckets.update' w/ userProject.
        new_labels = {"another-label": "another-value"}
        with_user_project.labels = new_labels
        with_user_project.update()
        assert with_user_project.labels == new_labels

    finally:
        # Exercise 'buckets.delete' w/ userProject.
        with_user_project.delete()
        # Only reached when the delete succeeded, so the fixture does not
        # attempt to delete the bucket a second time.
        buckets_to_delete.remove(created)


def test_bucket_acls_iam_w_user_project(
    storage_client, buckets_to_delete, user_project
):
    """Exercise bucket ACL, default object ACL and IAM calls w/ userProject."""
    bucket_name = _helpers.unique_name("acl-w-user-project")
    created = _helpers.retry_429_503(storage_client.create_bucket)(
        bucket_name, requester_pays=True,
    )
    buckets_to_delete.append(created)

    with_user_project = storage_client.bucket(bucket_name, user_project=user_project)

    # Exercise bucket ACL w/ userProject
    acl = with_user_project.acl
    acl.reload()
    acl.all().grant_read()
    acl.save()
    assert "READER" in acl.all().get_roles()

    del acl.entities["allUsers"]
    acl.save()
    assert not acl.has_entity("allUsers")

    # Exercise default object ACL w/ userProject
    doa = with_user_project.default_object_acl
    doa.reload()
    doa.all().grant_read()
    doa.save()
    assert "READER" in doa.all().get_roles()

    # Exercise IAM w/ userProject
    test_permissions = ["storage.buckets.get"]
    found = with_user_project.test_iam_permissions(test_permissions)
    assert found == test_permissions

    policy = with_user_project.get_iam_policy()
    viewers = policy.setdefault("roles/storage.objectViewer", set())
    viewers.add(policy.all_users())
    with_user_project.set_iam_policy(policy)


def test_bucket_acls_w_metageneration_match(storage_client, buckets_to_delete):
    """Save bucket and default-object ACLs under ``if_metageneration_match``."""
    wrong_metageneration_number = 9
    bucket_name = _helpers.unique_name("acl-w-metageneration-match")
    bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
    buckets_to_delete.append(bucket)

    # Exercise bucket ACL with metageneration match
    acl = bucket.acl
    acl.group("cloud-developer-relations@google.com").grant_read()
    bucket.reload()

    with pytest.raises(exceptions.PreconditionFailed):
        acl.save(if_metageneration_match=wrong_metageneration_number)
        # Not reached when ``save`` raises as expected.
        assert (
            "READER"
            not in acl.group("cloud-developer-relations@google.com").get_roles()
        )

    acl.save(if_metageneration_match=bucket.metageneration)
    assert "READER" in acl.group("cloud-developer-relations@google.com").get_roles()

    # Exercise default object ACL w/ metageneration match
    doa = bucket.default_object_acl
    doa.group("cloud-developer-relations@google.com").grant_owner()
    bucket.reload()

    with pytest.raises(exceptions.PreconditionFailed):
        doa.save(if_metageneration_match=wrong_metageneration_number)
        # Not reached when ``save`` raises as expected.
        assert (
            "OWNER" not in doa.group("cloud-developer-relations@google.com").get_roles()
        )

    doa.save(if_metageneration_match=bucket.metageneration)
    assert "OWNER" in doa.group("cloud-developer-relations@google.com").get_roles()


def test_bucket_copy_blob(
    storage_client, buckets_to_delete, blobs_to_delete, user_project,
):
    """Copy a blob within a bucket and verify the copy's contents."""
    payload = b"DEADBEEF"
    bucket_name = _helpers.unique_name("copy-blob")
    created = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
    buckets_to_delete.append(created)
    assert created.name == bucket_name

    blob = created.blob("CloudLogo")
    blob.upload_from_string(payload)
    blobs_to_delete.append(blob)

    new_blob = _helpers.retry_bad_copy(created.copy_blob)(
        blob, created, "CloudLogoCopy"
    )
    blobs_to_delete.append(new_blob)

    copied_contents = new_blob.download_as_bytes()
    assert copied_contents == payload


def test_bucket_copy_blob_w_user_project(
    storage_client, buckets_to_delete, blobs_to_delete, user_project,
):
    """Copy a blob within a requester-pays bucket using ``user_project``."""
    payload = b"DEADBEEF"
    bucket_name = _helpers.unique_name("copy-w-requester-pays")
    created = _helpers.retry_429_503(storage_client.create_bucket)(
        bucket_name, requester_pays=True
    )
    buckets_to_delete.append(created)
    assert created.name == bucket_name
    assert created.requester_pays

    blob = created.blob("simple")
    blob.upload_from_string(payload)
    blobs_to_delete.append(blob)

    with_user_project = storage_client.bucket(bucket_name, user_project=user_project)

    new_blob = _helpers.retry_bad_copy(with_user_project.copy_blob)(
        blob, with_user_project, "simple-copy"
    )
    blobs_to_delete.append(new_blob)

    assert new_blob.download_as_bytes() == payload


def test_bucket_copy_blob_w_generation_match(
    storage_client, buckets_to_delete, blobs_to_delete,
):
    """Copy a blob with ``if_source_generation_match`` set to its generation."""
    payload = b"DEADBEEF"
    bucket_name = _helpers.unique_name("generation-match")
    created = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
    buckets_to_delete.append(created)
    assert created.name == bucket_name

    blob = created.blob("simple")
    blob.upload_from_string(payload)
    blobs_to_delete.append(blob)

    dest_bucket = storage_client.bucket(bucket_name)

    new_blob = dest_bucket.copy_blob(
        blob, dest_bucket, "simple-copy", if_source_generation_match=blob.generation,
    )
    blobs_to_delete.append(new_blob)

    assert new_blob.download_as_bytes() == payload


def test_bucket_copy_blob_w_metageneration_match(
    storage_client, buckets_to_delete, blobs_to_delete,
):
    """Copy a blob with ``if_source_metageneration_match`` set."""
    payload = b"DEADBEEF"
    bucket_name = _helpers.unique_name("generation-match")
    created = _helpers.retry_429_503(storage_client.create_bucket)(
        bucket_name, requester_pays=True
    )
    buckets_to_delete.append(created)
    assert created.name == bucket_name

    blob = created.blob("simple")
    blob.upload_from_string(payload)
    blobs_to_delete.append(blob)

    dest_bucket = storage_client.bucket(bucket_name)

    new_blob = dest_bucket.copy_blob(
        blob,
        dest_bucket,
        "simple-copy",
        if_source_metageneration_match=blob.metageneration,
    )
    blobs_to_delete.append(new_blob)

    assert new_blob.download_as_bytes() == payload


def test_bucket_get_blob_with_user_project(
    storage_client, buckets_to_delete, blobs_to_delete, user_project,
):
    """Fetch missing and existing blobs from a requester-pays bucket."""
    blob_name = "blob-name"
    payload = b"DEADBEEF"
    bucket_name = _helpers.unique_name("w-requester-pays")
    created = _helpers.retry_429_503(storage_client.create_bucket)(
        bucket_name, requester_pays=True
    )
    buckets_to_delete.append(created)
    assert created.name == bucket_name
    assert created.requester_pays

    with_user_project = storage_client.bucket(bucket_name, user_project=user_project)

    assert with_user_project.get_blob("nonesuch") is None

    to_add = created.blob(blob_name)
    to_add.upload_from_string(payload)
    blobs_to_delete.append(to_add)

    found = with_user_project.get_blob(blob_name)
    assert found.download_as_bytes() == payload


@_helpers.retry_failures
def test_bucket_list_blobs(listable_bucket, listable_filenames):
    """List every blob in the listable bucket and compare names."""
    all_blobs = list(listable_bucket.list_blobs())
    assert sorted(blob.name for blob in all_blobs) == sorted(listable_filenames)


@_helpers.retry_failures
def test_bucket_list_blobs_w_user_project(
    storage_client, listable_bucket, listable_filenames, user_project,
):
    """List every blob through a bucket handle that sets ``user_project``."""
    with_user_project = storage_client.bucket(
        listable_bucket.name, user_project=user_project
    )
    all_blobs = list(with_user_project.list_blobs())
    assert sorted(blob.name for blob in all_blobs) == sorted(listable_filenames)


@_helpers.retry_failures
def test_bucket_list_blobs_paginated(listable_bucket, listable_filenames):
    """Page through blobs, lifting ``max_results`` to reach the last page."""
    truncation_size = 1
    count = len(listable_filenames) - truncation_size
    iterator = listable_bucket.list_blobs(max_results=count)
    page_iter = iterator.pages

    page1 = six.next(page_iter)
    blobs = list(page1)
    assert len(blobs) == count
    assert iterator.next_page_token is not None
    # Technically the iterator is exhausted.
    assert iterator.num_results == iterator.max_results
    # But we modify the iterator to continue paging after
    # artificially stopping after ``count`` items.
    iterator.max_results = None

    page2 = six.next(page_iter)
    last_blobs = list(page2)
    assert len(last_blobs) == truncation_size


@_helpers.retry_failures
def test_bucket_list_blobs_paginated_w_offset(listable_bucket, listable_filenames):
    """Page through blobs bounded by ``start_offset`` / ``end_offset``."""
    truncation_size = 1
    inclusive_start_offset = listable_filenames[1]
    exclusive_end_offset = listable_filenames[-1]
    desired_files = listable_filenames[1:-1]
    count = len(desired_files) - truncation_size
    iterator = listable_bucket.list_blobs(
        max_results=count,
        start_offset=inclusive_start_offset,
        end_offset=exclusive_end_offset,
    )
    page_iter = iterator.pages

    page1 = six.next(page_iter)
    blobs = list(page1)
    assert len(blobs) == count
    assert blobs[0].name == desired_files[0]
    assert iterator.next_page_token is not None
    # Technically the iterator is exhausted.
    assert iterator.num_results == iterator.max_results
    # But we modify the iterator to continue paging after
    # artificially stopping after ``count`` items.
    iterator.max_results = None

    page2 = six.next(page_iter)
    last_blobs = list(page2)
    assert len(last_blobs) == truncation_size
    assert last_blobs[-1].name == desired_files[-1]


@_helpers.retry_failures
def test_blob_exists_hierarchy(hierarchy_bucket, hierarchy_filenames):
    """Every filename in the hierarchy fixture exists as a blob."""
    for filename in hierarchy_filenames:
        blob = hierarchy_bucket.blob(filename)
        assert blob.exists()


@_helpers.retry_failures
def test_bucket_list_blobs_hierarchy_root_level(hierarchy_bucket, hierarchy_filenames):
    """List with a ``/`` delimiter and no prefix."""
    expected_names = ["file01.txt"]
    expected_prefixes = {"parent/"}

    iterator = hierarchy_bucket.list_blobs(delimiter="/")
    page = six.next(iterator.pages)
    blobs = list(page)

    assert [blob.name for blob in blobs] == expected_names
    assert iterator.next_page_token is None
    assert iterator.prefixes == expected_prefixes


@_helpers.retry_failures
def test_bucket_list_blobs_hierarchy_first_level(hierarchy_bucket, hierarchy_filenames):
    """List with a ``/`` delimiter under the ``parent/`` prefix."""
    expected_names = ["parent/", "parent/file11.txt"]
    expected_prefixes = {"parent/child/"}

    iterator = hierarchy_bucket.list_blobs(delimiter="/", prefix="parent/")
    page = six.next(iterator.pages)
    blobs = list(page)

    assert [blob.name for blob in blobs] == expected_names
    assert iterator.next_page_token is None
    assert iterator.prefixes == expected_prefixes


@_helpers.retry_failures
def test_bucket_list_blobs_hierarchy_second_level(
    hierarchy_bucket, hierarchy_filenames
):
    """List with a ``/`` delimiter under the ``parent/child/`` prefix."""
    expected_names = ["parent/child/file21.txt", "parent/child/file22.txt"]
    expected_prefixes = {"parent/child/grand/", "parent/child/other/"}

    iterator = hierarchy_bucket.list_blobs(delimiter="/", prefix="parent/child/")
    page = six.next(iterator.pages)
    blobs = list(page)
    assert [blob.name for blob in blobs] == expected_names
    assert iterator.next_page_token is None
    assert iterator.prefixes == expected_prefixes


@_helpers.retry_failures
def test_bucket_list_blobs_hierarchy_third_level(hierarchy_bucket, hierarchy_filenames):
    """List with a ``/`` delimiter under the ``parent/child/grand/`` prefix."""
    # Pseudo-hierarchy can be arbitrarily deep, subject to the limit
    # of 1024 characters in the UTF-8 encoded name:
    # https://cloud.google.com/storage/docs/bucketnaming#objectnames
    # Exercise a layer deeper to illustrate this.
    expected_names = ["parent/child/grand/file31.txt"]
    expected_prefixes = set()

    iterator = hierarchy_bucket.list_blobs(delimiter="/", prefix="parent/child/grand/")
    page = six.next(iterator.pages)
    blobs = list(page)

    assert [blob.name for blob in blobs] == expected_names
    assert iterator.next_page_token is None
    assert iterator.prefixes == expected_prefixes


@_helpers.retry_failures
def test_bucket_list_blobs_hierarchy_w_include_trailing_delimiter(
    hierarchy_bucket, hierarchy_filenames,
):
    """List the root level with ``include_trailing_delimiter=True``."""
    expected_names = ["file01.txt", "parent/"]
    expected_prefixes = {"parent/"}

    iterator = hierarchy_bucket.list_blobs(
        delimiter="/", include_trailing_delimiter=True
    )
    page = six.next(iterator.pages)
    blobs = list(page)

    assert [blob.name for blob in blobs] == expected_names
    assert iterator.next_page_token is None
    assert iterator.prefixes == expected_prefixes


def test_bucket_w_retention_period(
    storage_client, buckets_to_delete, blobs_to_delete,
):
    """Set and then clear a bucket retention period, checking blob deletion."""
    period_secs = 10
    bucket_name = _helpers.unique_name("w-retention-period")
    bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
    buckets_to_delete.append(bucket)

    bucket.retention_period = period_secs
    bucket.default_event_based_hold = False
    bucket.patch()

    assert bucket.retention_period == period_secs
    assert isinstance(bucket.retention_policy_effective_time, datetime.datetime)
    assert not bucket.default_event_based_hold
    assert not bucket.retention_policy_locked

    blob_name = "test-blob"
    payload = b"DEADBEEF"
    blob = bucket.blob(blob_name)
    blob.upload_from_string(payload)

    blobs_to_delete.append(blob)

    other = bucket.get_blob(blob_name)

    assert not other.event_based_hold
    assert not other.temporary_hold
    assert isinstance(other.retention_expiration_time, datetime.datetime)

    # Deleting while the retention period is set is expected to be refused.
    with pytest.raises(exceptions.Forbidden):
        other.delete()

    bucket.retention_period = None
    bucket.patch()

    assert bucket.retention_period is None
    assert bucket.retention_policy_effective_time is None
    assert not bucket.default_event_based_hold
    assert not bucket.retention_policy_locked

    _helpers.retry_no_event_based_hold(other.reload)()

    assert not other.event_based_hold
    assert not other.temporary_hold
    assert other.retention_expiration_time is None

    other.delete()
    # The blob was deleted here, so drop it from the cleanup list.
    blobs_to_delete.pop()


def test_bucket_w_default_event_based_hold(
    storage_client, buckets_to_delete, blobs_to_delete,
):
    """Toggle the bucket's default event-based hold and check new blobs."""
    bucket_name = _helpers.unique_name("w-def-ebh")
    bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
    buckets_to_delete.append(bucket)

    bucket.default_event_based_hold = True
    bucket.patch()

    assert bucket.default_event_based_hold
    assert bucket.retention_period is None
    assert bucket.retention_policy_effective_time is None
    assert not bucket.retention_policy_locked

    blob_name = "test-blob"
    payload = b"DEADBEEF"
    blob = bucket.blob(blob_name)
    blob.upload_from_string(payload)

    blobs_to_delete.append(blob)

    other = bucket.get_blob(blob_name)

    assert other.event_based_hold
    assert not other.temporary_hold
    assert other.retention_expiration_time is None

    # Deleting while the event-based hold is set is expected to be refused.
    with pytest.raises(exceptions.Forbidden):
        other.delete()

    other.event_based_hold = False
    other.patch()
    other.delete()

    bucket.default_event_based_hold = False
    bucket.patch()

    assert not bucket.default_event_based_hold
    assert bucket.retention_period is None
    assert bucket.retention_policy_effective_time is None
    assert not bucket.retention_policy_locked

    # Re-upload under the same name now that the default hold is off.
    blob.upload_from_string(payload)

    # https://github.com/googleapis/python-storage/issues/435
    if blob.event_based_hold:
        _helpers.retry_no_event_based_hold(blob.reload)()

    assert not blob.event_based_hold
    assert not blob.temporary_hold
    assert blob.retention_expiration_time is None

    blob.delete()
    # The blob was deleted here, so drop it from the cleanup list.
    blobs_to_delete.pop()


def test_blob_w_temporary_hold(
    storage_client, buckets_to_delete, blobs_to_delete,
):
    """Set and release a temporary hold on a blob, checking deletion."""
    bucket_name = _helpers.unique_name("w-tmp-hold")
    bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
    buckets_to_delete.append(bucket)

    blob_name = "test-blob"
    payload = b"DEADBEEF"
    blob = bucket.blob(blob_name)
    blob.upload_from_string(payload)

    blobs_to_delete.append(blob)

    other = bucket.get_blob(blob_name)
    other.temporary_hold = True
    other.patch()

    assert other.temporary_hold
    assert not other.event_based_hold
    assert other.retention_expiration_time is None

    # Deleting while the temporary hold is set is expected to be refused.
    with pytest.raises(exceptions.Forbidden):
        other.delete()

    other.temporary_hold = False
    other.patch()

    other.delete()
    # The blob was deleted here, so drop it from the cleanup list.
    blobs_to_delete.pop()


def test_bucket_lock_retention_policy(
    storage_client, buckets_to_delete,
):
    """Lock a retention policy and verify the period can no longer be cleared."""
    period_secs = 10
    bucket_name = _helpers.unique_name("loc-ret-policy")
    bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
    buckets_to_delete.append(bucket)

    bucket.retention_period = period_secs
    bucket.patch()

    assert bucket.retention_period == period_secs
    assert isinstance(bucket.retention_policy_effective_time, datetime.datetime)
    assert not bucket.default_event_based_hold
    assert not bucket.retention_policy_locked

    bucket.lock_retention_policy()

    bucket.reload()
    assert bucket.retention_policy_locked

    bucket.retention_period = None
    with pytest.raises(exceptions.Forbidden):
        bucket.patch()


def test_new_bucket_w_ubla(
    storage_client, buckets_to_delete, blobs_to_delete,
):
    """Create a bucket with uniform bucket-level access (UBLA) enabled.

    ACL reloads and saves on both the bucket and a blob are expected to
    raise ``BadRequest``; blob upload / download still works.
    """
    bucket_name = _helpers.unique_name("new-w-ubla")
    bucket = storage_client.bucket(bucket_name)
    bucket.iam_configuration.uniform_bucket_level_access_enabled = True
    _helpers.retry_429_503(bucket.create)()
    buckets_to_delete.append(bucket)

    bucket_acl = bucket.acl
    with pytest.raises(exceptions.BadRequest):
        bucket_acl.reload()

    bucket_acl.loaded = True  # Fake that we somehow loaded the ACL
    bucket_acl.group("cloud-developer-relations@google.com").grant_read()
    with pytest.raises(exceptions.BadRequest):
        bucket_acl.save()

    blob_name = "my-blob.txt"
    blob = bucket.blob(blob_name)
    payload = b"DEADBEEF"
    blob.upload_from_string(payload)
    blobs_to_delete.append(blob)

    found = bucket.get_blob(blob_name)
    assert found.download_as_bytes() == payload

    blob_acl = blob.acl
    with pytest.raises(exceptions.BadRequest):
        blob_acl.reload()

    blob_acl.loaded = True  # Fake that we somehow loaded the ACL
    blob_acl.group("cloud-developer-relations@google.com").grant_read()
    with pytest.raises(exceptions.BadRequest):
        blob_acl.save()


def test_ubla_set_unset_preserves_acls(
    storage_client, buckets_to_delete, blobs_to_delete,
):
    """Enable then disable UBLA and verify bucket and blob ACLs are unchanged."""
    bucket_name = _helpers.unique_name("ubla-acls")
    bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
    buckets_to_delete.append(bucket)

    blob_name = "my-blob.txt"
    blob = bucket.blob(blob_name)
    payload = b"DEADBEEF"
    blob.upload_from_string(payload)
    blobs_to_delete.append(blob)

    # Preserve ACLs before setting UBLA
    bucket_acl_before = list(bucket.acl)
    # Snapshot the blob's own ACL (not the bucket's) so the final blob
    # assertion actually checks the object ACL.
    blob_acl_before = list(blob.acl)

    # Set UBLA
    bucket.iam_configuration.uniform_bucket_level_access_enabled = True
    bucket.patch()

    assert bucket.iam_configuration.uniform_bucket_level_access_enabled

    # While UBLA is set, cannot get / set ACLs
    with pytest.raises(exceptions.BadRequest):
        bucket.acl.reload()

    # Clear UBLA
    bucket.iam_configuration.uniform_bucket_level_access_enabled = False
    bucket.patch()

    # Query ACLs after clearing UBLA
    bucket.acl.reload()
    bucket_acl_after = list(bucket.acl)
    blob.acl.reload()
    blob_acl_after = list(blob.acl)

    assert bucket_acl_before == bucket_acl_after
    assert blob_acl_before == blob_acl_after


def test_new_bucket_created_w_inherited_pap(
    storage_client, buckets_to_delete, blobs_to_delete,
):
    """Create a bucket with default public access prevention, then enforce it.

    Once enforced, an invalid PAP value and ``make_public`` on the bucket and
    on a blob are all expected to be rejected.
    """
    from google.cloud.storage import constants

    bucket_name = _helpers.unique_name("new-w-pap-inherited")
    bucket = storage_client.bucket(bucket_name)
    bucket.iam_configuration.uniform_bucket_level_access_enabled = True
    # Retry transient 429 / 503 responses like the other tests in this module.
    _helpers.retry_429_503(bucket.create)()
    buckets_to_delete.append(bucket)

    # TODO: Remove unspecified after changeover is complete
    assert bucket.iam_configuration.public_access_prevention in [
        constants.PUBLIC_ACCESS_PREVENTION_UNSPECIFIED,
        constants.PUBLIC_ACCESS_PREVENTION_INHERITED,
    ]

    bucket.iam_configuration.public_access_prevention = (
        constants.PUBLIC_ACCESS_PREVENTION_ENFORCED
    )
    bucket.patch()
    assert (
        bucket.iam_configuration.public_access_prevention
        == constants.PUBLIC_ACCESS_PREVENTION_ENFORCED
    )
    assert bucket.iam_configuration.uniform_bucket_level_access_enabled

    bucket.iam_configuration.uniform_bucket_level_access_enabled = False
    bucket.patch()
    assert (
        bucket.iam_configuration.public_access_prevention
        == constants.PUBLIC_ACCESS_PREVENTION_ENFORCED
    )

    # Only the ``patch`` call is expected to raise; the assignment is local.
    bucket.iam_configuration.public_access_prevention = "unexpected value"
    with pytest.raises(exceptions.BadRequest):
        bucket.patch()

    with pytest.raises(exceptions.PreconditionFailed):
        bucket.make_public()

    blob_name = "my-blob.txt"
    blob = bucket.blob(blob_name)
    payload = b"DEADBEEF"
    blob.upload_from_string(payload)
    # Register the uploaded blob for cleanup, as the other tests do.
    blobs_to_delete.append(blob)

    with pytest.raises(exceptions.PreconditionFailed):
        blob.make_public()


@pytest.mark.skip(reason="Unspecified PAP is changing to inherited")
def test_new_bucket_created_w_enforced_pap(
    storage_client, buckets_to_delete, blobs_to_delete,
):
    """Create a bucket with enforced PAP, then switch it back to inherited."""
    from google.cloud.storage import constants

    bucket_name = _helpers.unique_name("new-w-pap-enforced")
    bucket = storage_client.bucket(bucket_name)
    bucket.iam_configuration.public_access_prevention = (
        constants.PUBLIC_ACCESS_PREVENTION_ENFORCED
    )
    # Retry transient 429 / 503 responses like the other tests in this module.
    _helpers.retry_429_503(bucket.create)()
    buckets_to_delete.append(bucket)

    assert (
        bucket.iam_configuration.public_access_prevention
        == constants.PUBLIC_ACCESS_PREVENTION_ENFORCED
    )

    bucket.iam_configuration.public_access_prevention = (
        constants.PUBLIC_ACCESS_PREVENTION_INHERITED
    )
    bucket.patch()

    # TODO: Remove unspecified after changeover is complete
    assert bucket.iam_configuration.public_access_prevention in [
        constants.PUBLIC_ACCESS_PREVENTION_UNSPECIFIED,
        constants.PUBLIC_ACCESS_PREVENTION_INHERITED,
    ]
    assert not bucket.iam_configuration.uniform_bucket_level_access_enabled