1# -*- coding: utf-8 -*- 2# Copyright 2013 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Integration tests for cp command.""" 16 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import print_function 20from __future__ import unicode_literals 21 22import ast 23import base64 24import binascii 25import datetime 26import gzip 27import hashlib 28import logging 29import os 30import pickle 31import pkgutil 32import random 33import re 34import stat 35import string 36import sys 37import threading 38 39from apitools.base.py import exceptions as apitools_exceptions 40import boto 41from boto import storage_uri 42from boto.exception import ResumableTransferDisposition 43from boto.exception import StorageResponseError 44from boto.storage_uri import BucketStorageUri 45from gslib.cloud_api import ResumableUploadStartOverException 46from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD 47from gslib.cs_api_map import ApiSelector 48from gslib.daisy_chain_wrapper import _DEFAULT_DOWNLOAD_CHUNK_SIZE 49from gslib.discard_messages_queue import DiscardMessagesQueue 50from gslib.gcs_json_api import GcsJsonApi 51from gslib.parallel_tracker_file import ObjectFromTracker 52from gslib.parallel_tracker_file import WriteParallelUploadTrackerFile 53from gslib.project_id import PopulateProjectId 54from gslib.storage_url import StorageUrlFromString 55from gslib.tests.rewrite_helper import EnsureRewriteResumeCallbackHandler 56from gslib.tests.rewrite_helper import HaltingRewriteCallbackHandler 57from gslib.tests.rewrite_helper import RewriteHaltException 58import gslib.tests.testcase as testcase 59from gslib.tests.testcase.base import NotParallelizable 60from gslib.tests.testcase.integration_testcase import SkipForGS 61from gslib.tests.testcase.integration_testcase import SkipForS3 62from gslib.tests.testcase.integration_testcase import SkipForXML 63from gslib.tests.testcase.integration_testcase import SkipForJSON 64from gslib.tests.util import BuildErrorRegex 65from gslib.tests.util import GenerationFromURI as urigen 66from gslib.tests.util import HaltingCopyCallbackHandler 67from gslib.tests.util import HaltOneComponentCopyCallbackHandler 68from gslib.tests.util import HAS_GS_PORT 69from gslib.tests.util import HAS_S3_CREDS 70from gslib.tests.util import ObjectToURI as suri 71from gslib.tests.util import ORPHANED_FILE 72from gslib.tests.util import POSIX_GID_ERROR 73from gslib.tests.util import POSIX_INSUFFICIENT_ACCESS_ERROR 74from gslib.tests.util import POSIX_MODE_ERROR 75from gslib.tests.util import POSIX_UID_ERROR 76from gslib.tests.util import SequentialAndParallelTransfer 77from gslib.tests.util import SetBotoConfigForTest 78from gslib.tests.util import TailSet 79from gslib.tests.util import TEST_ENCRYPTION_KEY1 80from gslib.tests.util import TEST_ENCRYPTION_KEY1_SHA256_B64 81from gslib.tests.util import TEST_ENCRYPTION_KEY2 82from gslib.tests.util import TEST_ENCRYPTION_KEY3 83from gslib.tests.util import unittest 84from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages 85from gslib.tracker_file import DeleteTrackerFile 86from gslib.tracker_file import GetRewriteTrackerFilePath 87from gslib.tracker_file import GetSlicedDownloadTrackerFilePaths 88from gslib.ui_controller import BytesToFixedWidthString 89from gslib.utils import hashing_helper 90from gslib.utils.boto_util import UsingCrcmodExtension 91from gslib.utils.constants import START_CALLBACK_PER_BYTES 92from gslib.utils.constants import UTF8 93from gslib.utils.copy_helper import GetTrackerFilePath 94from gslib.utils.copy_helper import PARALLEL_UPLOAD_STATIC_SALT 95from gslib.utils.copy_helper import PARALLEL_UPLOAD_TEMP_NAMESPACE 96from gslib.utils.copy_helper import TrackerFileType 97from gslib.utils.hashing_helper import CalculateB64EncodedMd5FromContents 98from gslib.utils.hashing_helper import CalculateMd5FromContents 99from gslib.utils.posix_util import GID_ATTR 100from gslib.utils.posix_util import MODE_ATTR 101from gslib.utils.posix_util import NA_ID 102from gslib.utils.posix_util import NA_MODE 103from gslib.utils.posix_util import UID_ATTR 104from gslib.utils.posix_util import ValidateFilePermissionAccess 105from gslib.utils.posix_util import ValidatePOSIXMode 106from gslib.utils.retry_util import Retry 107from gslib.utils.system_util import IS_WINDOWS 108from gslib.utils.text_util import get_random_ascii_chars 109from gslib.utils.unit_util import EIGHT_MIB 110from gslib.utils.unit_util import HumanReadableToBytes 111from gslib.utils.unit_util import MakeHumanReadable 112from gslib.utils.unit_util import ONE_KIB 113from gslib.utils.unit_util import ONE_MIB 114import six 115from six.moves import http_client 116from six.moves import range 117from six.moves import xrange 118 119if six.PY3: 120 long = int # pylint: disable=redefined-builtin,invalid-name 121 122# These POSIX-specific variables aren't defined for Windows. 123# pylint: disable=g-import-not-at-top 124if not IS_WINDOWS: 125 from gslib.tests import util 126 from gslib.tests.util import DEFAULT_MODE 127 from gslib.tests.util import GetInvalidGid 128 from gslib.tests.util import GetNonPrimaryGid 129 from gslib.tests.util import GetPrimaryGid 130 from gslib.tests.util import INVALID_UID 131 from gslib.tests.util import USER_ID 132# pylint: enable=g-import-not-at-top 133 134 135def TestCpMvPOSIXBucketToLocalErrors(cls, bucket_uri, obj, tmpdir, is_cp=True): 136 """Helper function for preserve_posix_errors tests in test_cp and test_mv. 137 138 Args: 139 cls: An instance of either TestCp or TestMv. 140 bucket_uri: The uri of the bucket that the object is in. 141 obj: The object to run the tests on. 142 tmpdir: The local file path to cp to. 143 is_cp: Whether or not the calling test suite is cp or mv. 144 """ 145 error = 'error' 146 # A dict of test_name: attrs_dict. 147 # attrs_dict holds the different attributes that we want for the object in a 148 # specific test. 149 # To minimize potential test flakes from the system's GID mapping changing 150 # mid-test, we use the GID-related methods that fetch GID info each time, 151 # rather than reusing the LazyWrapper-wrapped constants across operations. 152 test_params = { 153 'test1': { 154 MODE_ATTR: '333', 155 error: POSIX_MODE_ERROR 156 }, 157 'test2': { 158 GID_ATTR: GetInvalidGid, 159 error: POSIX_GID_ERROR 160 }, 161 'test3': { 162 GID_ATTR: GetInvalidGid, 163 MODE_ATTR: '420', 164 error: POSIX_GID_ERROR 165 }, 166 'test4': { 167 UID_ATTR: INVALID_UID, 168 error: POSIX_UID_ERROR 169 }, 170 'test5': { 171 UID_ATTR: INVALID_UID, 172 MODE_ATTR: '530', 173 error: POSIX_UID_ERROR 174 }, 175 'test6': { 176 UID_ATTR: INVALID_UID, 177 GID_ATTR: GetInvalidGid, 178 error: POSIX_UID_ERROR 179 }, 180 'test7': { 181 UID_ATTR: INVALID_UID, 182 GID_ATTR: GetInvalidGid, 183 MODE_ATTR: '640', 184 error: POSIX_UID_ERROR 185 }, 186 'test8': { 187 UID_ATTR: INVALID_UID, 188 GID_ATTR: GetPrimaryGid, 189 error: POSIX_UID_ERROR 190 }, 191 'test9': { 192 UID_ATTR: INVALID_UID, 193 GID_ATTR: GetNonPrimaryGid, 194 error: POSIX_UID_ERROR 195 }, 196 'test10': { 197 UID_ATTR: INVALID_UID, 198 GID_ATTR: GetPrimaryGid, 199 MODE_ATTR: '640', 200 error: POSIX_UID_ERROR 201 }, 202 'test11': { 203 UID_ATTR: INVALID_UID, 204 GID_ATTR: GetNonPrimaryGid, 205 MODE_ATTR: '640', 206 error: POSIX_UID_ERROR 207 }, 208 'test12': { 209 UID_ATTR: USER_ID, 210 GID_ATTR: GetInvalidGid, 211 error: POSIX_GID_ERROR 212 }, 213 'test13': { 214 UID_ATTR: USER_ID, 215 GID_ATTR: GetInvalidGid, 216 MODE_ATTR: '640', 217 error: POSIX_GID_ERROR 218 }, 219 'test14': { 220 GID_ATTR: GetPrimaryGid, 221 MODE_ATTR: '240', 222 error: POSIX_INSUFFICIENT_ACCESS_ERROR 223 } 224 } 225 # The first variable below can be used to help debug the test if there is a 226 # problem. 227 for test_name, attrs_dict in six.iteritems(test_params): 228 cls.ClearPOSIXMetadata(obj) 229 230 # Attributes default to None if they are not in attrs_dict; some attrs are 231 # functions or LazyWrapper objects that should be called. 232 uid = attrs_dict.get(UID_ATTR) 233 if uid is not None and callable(uid): 234 uid = uid() 235 236 gid = attrs_dict.get(GID_ATTR) 237 if gid is not None and callable(gid): 238 gid = gid() 239 240 mode = attrs_dict.get(MODE_ATTR) 241 242 cls.SetPOSIXMetadata(cls.default_provider, 243 bucket_uri.bucket_name, 244 obj.object_name, 245 uid=uid, 246 gid=gid, 247 mode=mode) 248 stderr = cls.RunGsUtil([ 249 'cp' if is_cp else 'mv', '-P', 250 suri(bucket_uri, obj.object_name), tmpdir 251 ], 252 expected_status=1, 253 return_stderr=True) 254 cls.assertIn( 255 ORPHANED_FILE, stderr, 256 'Error during test "%s": %s not found in stderr:\n%s' % 257 (test_name, ORPHANED_FILE, stderr)) 258 error_regex = BuildErrorRegex(obj, attrs_dict.get(error)) 259 cls.assertTrue( 260 error_regex.search(stderr), 261 'Test %s did not match expected error; could not find a match for ' 262 '%s\n\nin stderr:\n%s' % (test_name, error_regex.pattern, stderr)) 263 listing1 = TailSet(suri(bucket_uri), cls.FlatListBucket(bucket_uri)) 264 listing2 = TailSet(tmpdir, cls.FlatListDir(tmpdir)) 265 # Bucket should have un-altered content. 266 cls.assertEquals(listing1, set(['/%s' % obj.object_name])) 267 # Dir should have un-altered content. 268 cls.assertEquals(listing2, set([''])) 269 270 271def TestCpMvPOSIXBucketToLocalNoErrors(cls, bucket_uri, tmpdir, is_cp=True): 272 """Helper function for preserve_posix_no_errors tests in test_cp and test_mv. 273 274 Args: 275 cls: An instance of either TestCp or TestMv. 276 bucket_uri: The uri of the bucket that the object is in. 277 tmpdir: The local file path to cp to. 278 is_cp: Whether or not the calling test suite is cp or mv. 279 """ 280 primary_gid = os.stat(tmpdir).st_gid 281 non_primary_gid = util.GetNonPrimaryGid() 282 test_params = { 283 'obj1': { 284 GID_ATTR: primary_gid 285 }, 286 'obj2': { 287 GID_ATTR: non_primary_gid 288 }, 289 'obj3': { 290 GID_ATTR: primary_gid, 291 MODE_ATTR: '440' 292 }, 293 'obj4': { 294 GID_ATTR: non_primary_gid, 295 MODE_ATTR: '444' 296 }, 297 'obj5': { 298 UID_ATTR: USER_ID 299 }, 300 'obj6': { 301 UID_ATTR: USER_ID, 302 MODE_ATTR: '420' 303 }, 304 'obj7': { 305 UID_ATTR: USER_ID, 306 GID_ATTR: primary_gid 307 }, 308 'obj8': { 309 UID_ATTR: USER_ID, 310 GID_ATTR: non_primary_gid 311 }, 312 'obj9': { 313 UID_ATTR: USER_ID, 314 GID_ATTR: primary_gid, 315 MODE_ATTR: '433' 316 }, 317 'obj10': { 318 UID_ATTR: USER_ID, 319 GID_ATTR: non_primary_gid, 320 MODE_ATTR: '442' 321 } 322 } 323 for obj_name, attrs_dict in six.iteritems(test_params): 324 uid = attrs_dict.get(UID_ATTR) 325 gid = attrs_dict.get(GID_ATTR) 326 mode = attrs_dict.get(MODE_ATTR) 327 cls.CreateObject(bucket_uri=bucket_uri, 328 object_name=obj_name, 329 contents=obj_name.encode(UTF8), 330 uid=uid, 331 gid=gid, 332 mode=mode) 333 for obj_name in six.iterkeys(test_params): 334 # Move objects one at a time to avoid listing consistency. 335 cls.RunGsUtil( 336 ['cp' if is_cp else 'mv', '-P', 337 suri(bucket_uri, obj_name), tmpdir]) 338 listing = TailSet(tmpdir, cls.FlatListDir(tmpdir)) 339 cls.assertEquals( 340 listing, 341 set([ 342 '/obj1', '/obj2', '/obj3', '/obj4', '/obj5', '/obj6', '/obj7', 343 '/obj8', '/obj9', '/obj10' 344 ])) 345 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj1'), 346 gid=primary_gid, 347 mode=DEFAULT_MODE) 348 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj2'), 349 gid=non_primary_gid, 350 mode=DEFAULT_MODE) 351 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj3'), 352 gid=primary_gid, 353 mode=0o440) 354 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj4'), 355 gid=non_primary_gid, 356 mode=0o444) 357 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj5'), 358 uid=USER_ID, 359 gid=primary_gid, 360 mode=DEFAULT_MODE) 361 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj6'), 362 uid=USER_ID, 363 gid=primary_gid, 364 mode=0o420) 365 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj7'), 366 uid=USER_ID, 367 gid=primary_gid, 368 mode=DEFAULT_MODE) 369 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj8'), 370 uid=USER_ID, 371 gid=non_primary_gid, 372 mode=DEFAULT_MODE) 373 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj9'), 374 uid=USER_ID, 375 gid=primary_gid, 376 mode=0o433) 377 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj10'), 378 uid=USER_ID, 379 gid=non_primary_gid, 380 mode=0o442) 381 382 383def TestCpMvPOSIXLocalToBucketNoErrors(cls, bucket_uri, is_cp=True): 384 """Helper function for testing local to bucket POSIX preservation. 385 386 Args: 387 cls: An instance of either TestCp or TestMv. 388 bucket_uri: The uri of the bucket to cp/mv to. 389 is_cp: Whether or not the calling test suite is cp or mv. 390 """ 391 primary_gid = os.getgid() 392 non_primary_gid = util.GetNonPrimaryGid() 393 test_params = { 394 'obj1': { 395 GID_ATTR: primary_gid 396 }, 397 'obj2': { 398 GID_ATTR: non_primary_gid 399 }, 400 'obj3': { 401 GID_ATTR: primary_gid, 402 MODE_ATTR: '440' 403 }, 404 'obj4': { 405 GID_ATTR: non_primary_gid, 406 MODE_ATTR: '444' 407 }, 408 'obj5': { 409 UID_ATTR: USER_ID 410 }, 411 'obj6': { 412 UID_ATTR: USER_ID, 413 MODE_ATTR: '420' 414 }, 415 'obj7': { 416 UID_ATTR: USER_ID, 417 GID_ATTR: primary_gid 418 }, 419 'obj8': { 420 UID_ATTR: USER_ID, 421 GID_ATTR: non_primary_gid 422 }, 423 'obj9': { 424 UID_ATTR: USER_ID, 425 GID_ATTR: primary_gid, 426 MODE_ATTR: '433' 427 }, 428 'obj10': { 429 UID_ATTR: USER_ID, 430 GID_ATTR: non_primary_gid, 431 MODE_ATTR: '442' 432 } 433 } 434 for obj_name, attrs_dict in six.iteritems(test_params): 435 uid = attrs_dict.get(UID_ATTR, NA_ID) 436 gid = attrs_dict.get(GID_ATTR, NA_ID) 437 mode = attrs_dict.get(MODE_ATTR, NA_MODE) 438 if mode != NA_MODE: 439 ValidatePOSIXMode(int(mode, 8)) 440 ValidateFilePermissionAccess(obj_name, 441 uid=uid, 442 gid=int(gid), 443 mode=int(mode)) 444 fpath = cls.CreateTempFile(contents=b'foo', uid=uid, gid=gid, mode=mode) 445 cls.RunGsUtil( 446 ['cp' if is_cp else 'mv', '-P', fpath, 447 suri(bucket_uri, obj_name)]) 448 if uid != NA_ID: 449 cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name, 450 UID_ATTR, str(uid)) 451 if gid != NA_ID: 452 cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name, 453 GID_ATTR, str(gid)) 454 if mode != NA_MODE: 455 cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name, 456 MODE_ATTR, str(mode)) 457 458 459def _ReadContentsFromFifo(fifo_path, list_for_output): 460 with open(fifo_path, 'rb') as f: 461 list_for_output.append(f.read()) 462 463 464def _WriteContentsToFifo(contents, fifo_path): 465 with open(fifo_path, 'wb') as f: 466 f.write(contents) 467 468 469class _JSONForceHTTPErrorCopyCallbackHandler(object): 470 """Test callback handler that raises an arbitrary HTTP error exception.""" 471 472 def __init__(self, startover_at_byte, http_error_num): 473 self._startover_at_byte = startover_at_byte 474 self._http_error_num = http_error_num 475 self.started_over_once = False 476 477 # pylint: disable=invalid-name 478 def call(self, total_bytes_transferred, total_size): 479 """Forcibly exits if the transfer has passed the halting point.""" 480 if (total_bytes_transferred >= self._startover_at_byte and 481 not self.started_over_once): 482 sys.stderr.write('Forcing HTTP error %s after byte %s. ' 483 '%s/%s transferred.\r\n' % 484 (self._http_error_num, self._startover_at_byte, 485 MakeHumanReadable(total_bytes_transferred), 486 MakeHumanReadable(total_size))) 487 self.started_over_once = True 488 raise apitools_exceptions.HttpError({'status': self._http_error_num}, 489 None, None) 490 491 492class _XMLResumableUploadStartOverCopyCallbackHandler(object): 493 """Test callback handler that raises start-over exception during upload.""" 494 495 def __init__(self, startover_at_byte): 496 self._startover_at_byte = startover_at_byte 497 self.started_over_once = False 498 499 # pylint: disable=invalid-name 500 def call(self, total_bytes_transferred, total_size): 501 """Forcibly exits if the transfer has passed the halting point.""" 502 if (total_bytes_transferred >= self._startover_at_byte and 503 not self.started_over_once): 504 sys.stderr.write( 505 'Forcing ResumableUpload start over error after byte %s. ' 506 '%s/%s transferred.\r\n' % 507 (self._startover_at_byte, MakeHumanReadable(total_bytes_transferred), 508 MakeHumanReadable(total_size))) 509 self.started_over_once = True 510 raise boto.exception.ResumableUploadException( 511 'Forcing upload start over', ResumableTransferDisposition.START_OVER) 512 513 514class _DeleteBucketThenStartOverCopyCallbackHandler(object): 515 """Test callback handler that deletes bucket then raises start-over.""" 516 517 def __init__(self, startover_at_byte, bucket_uri): 518 self._startover_at_byte = startover_at_byte 519 self._bucket_uri = bucket_uri 520 self.started_over_once = False 521 522 # pylint: disable=invalid-name 523 def call(self, total_bytes_transferred, total_size): 524 """Forcibly exits if the transfer has passed the halting point.""" 525 if (total_bytes_transferred >= self._startover_at_byte and 526 not self.started_over_once): 527 sys.stderr.write('Deleting bucket (%s)' % (self._bucket_uri.bucket_name)) 528 529 @Retry(StorageResponseError, tries=5, timeout_secs=1) 530 def DeleteBucket(): 531 bucket_list = list(self._bucket_uri.list_bucket(all_versions=True)) 532 for k in bucket_list: 533 self._bucket_uri.get_bucket().delete_key(k.name, 534 version_id=k.version_id) 535 self._bucket_uri.delete_bucket() 536 537 DeleteBucket() 538 sys.stderr.write( 539 'Forcing ResumableUpload start over error after byte %s. ' 540 '%s/%s transferred.\r\n' % 541 (self._startover_at_byte, MakeHumanReadable(total_bytes_transferred), 542 MakeHumanReadable(total_size))) 543 self.started_over_once = True 544 raise ResumableUploadStartOverException('Artificially forcing start-over') 545 546 547class _ResumableUploadRetryHandler(object): 548 """Test callback handler for causing retries during a resumable transfer.""" 549 550 def __init__(self, 551 retry_at_byte, 552 exception_to_raise, 553 exc_args, 554 num_retries=1): 555 self._retry_at_byte = retry_at_byte 556 self._exception_to_raise = exception_to_raise 557 self._exception_args = exc_args 558 self._num_retries = num_retries 559 560 self._retries_made = 0 561 562 # pylint: disable=invalid-name 563 def call(self, total_bytes_transferred, unused_total_size): 564 """Cause a single retry at the retry point.""" 565 if (total_bytes_transferred >= self._retry_at_byte and 566 self._retries_made < self._num_retries): 567 self._retries_made += 1 568 raise self._exception_to_raise(*self._exception_args) 569 570 571class TestCp(testcase.GsUtilIntegrationTestCase): 572 """Integration tests for cp command.""" 573 574 # For tests that artificially halt, we need to ensure at least one callback 575 # occurs. 576 halt_size = START_CALLBACK_PER_BYTES * 2 577 578 def _get_test_file(self, name): 579 contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name) 580 return self.CreateTempFile(file_name=name, contents=contents) 581 582 def _CpWithFifoViaGsUtilAndAppendOutputToList(self, src_path_tuple, dst_path, 583 list_for_return_value, 584 **kwargs): 585 arg_list = ['cp'] 586 arg_list.extend(src_path_tuple) 587 arg_list.append(dst_path) 588 # Append stderr, stdout, or return status (if specified in kwargs) to the 589 # given list. 590 list_for_return_value.append(self.RunGsUtil(arg_list, **kwargs)) 591 592 @SequentialAndParallelTransfer 593 def test_noclobber(self): 594 key_uri = self.CreateObject(contents=b'foo') 595 fpath = self.CreateTempFile(contents=b'bar') 596 stderr = self.RunGsUtil( 597 ['cp', '-n', fpath, suri(key_uri)], return_stderr=True) 598 self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr) 599 self.assertEqual(key_uri.get_contents_as_string(), b'foo') 600 stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath], 601 return_stderr=True) 602 with open(fpath, 'rb') as f: 603 self.assertIn('Skipping existing item: %s' % suri(f), stderr) 604 self.assertEqual(f.read(), b'bar') 605 606 @SequentialAndParallelTransfer 607 def test_noclobber_different_size(self): 608 key_uri = self.CreateObject(contents=b'foo') 609 fpath = self.CreateTempFile(contents=b'quux') 610 stderr = self.RunGsUtil( 611 ['cp', '-n', fpath, suri(key_uri)], return_stderr=True) 612 self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr) 613 self.assertEqual(key_uri.get_contents_as_string(), b'foo') 614 stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath], 615 return_stderr=True) 616 with open(fpath, 'rb') as f: 617 self.assertIn('Skipping existing item: %s' % suri(f), stderr) 618 self.assertEqual(f.read(), b'quux') 619 620 def test_dest_bucket_not_exist(self): 621 fpath = self.CreateTempFile(contents=b'foo') 622 invalid_bucket_uri = ('%s://%s' % 623 (self.default_provider, self.nonexistent_bucket_name)) 624 # TODO(b/135780661): Remove retry after bug resolved 625 @Retry(AssertionError, tries=3, timeout_secs=1) 626 def _Check(): 627 stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri], 628 expected_status=1, 629 return_stderr=True) 630 self.assertIn('does not exist', stderr) 631 632 _Check() 633 634 def test_copy_in_cloud_noclobber(self): 635 bucket1_uri = self.CreateBucket() 636 bucket2_uri = self.CreateBucket() 637 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents=b'foo') 638 stderr = self.RunGsUtil( 639 ['cp', suri(key_uri), suri(bucket2_uri)], return_stderr=True) 640 # Rewrite API may output an additional 'Copying' progress notification. 641 self.assertGreaterEqual(stderr.count('Copying'), 1) 642 self.assertLessEqual(stderr.count('Copying'), 2) 643 stderr = self.RunGsUtil( 644 ['cp', '-n', suri(key_uri), 645 suri(bucket2_uri)], return_stderr=True) 646 self.assertIn( 647 'Skipping existing item: %s' % suri(bucket2_uri, key_uri.object_name), 648 stderr) 649 650 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 651 @SequentialAndParallelTransfer 652 def test_cp_from_local_file_to_fifo(self): 653 contents = b'bar' 654 fifo_path = self.CreateTempFifo() 655 file_path = self.CreateTempFile(contents=contents) 656 list_for_output = [] 657 658 read_thread = threading.Thread(target=_ReadContentsFromFifo, 659 args=(fifo_path, list_for_output)) 660 read_thread.start() 661 write_thread = threading.Thread( 662 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 663 args=((file_path,), fifo_path, [])) 664 write_thread.start() 665 write_thread.join(120) 666 read_thread.join(120) 667 if not list_for_output: 668 self.fail('Reading/writing to the fifo timed out.') 669 self.assertEqual(list_for_output[0].strip(), contents) 670 671 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 672 @SequentialAndParallelTransfer 673 def test_cp_from_one_object_to_fifo(self): 674 fifo_path = self.CreateTempFifo() 675 bucket_uri = self.CreateBucket() 676 contents = b'bar' 677 obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents) 678 list_for_output = [] 679 680 read_thread = threading.Thread(target=_ReadContentsFromFifo, 681 args=(fifo_path, list_for_output)) 682 read_thread.start() 683 write_thread = threading.Thread( 684 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 685 args=((suri(obj_uri),), fifo_path, [])) 686 write_thread.start() 687 write_thread.join(120) 688 read_thread.join(120) 689 if not list_for_output: 690 self.fail('Reading/writing to the fifo timed out.') 691 self.assertEqual(list_for_output[0].strip(), contents) 692 693 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 694 @SequentialAndParallelTransfer 695 def test_cp_from_multiple_objects_to_fifo(self): 696 fifo_path = self.CreateTempFifo() 697 bucket_uri = self.CreateBucket() 698 contents1 = b'foo and bar' 699 contents2 = b'baz and qux' 700 obj1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents1) 701 obj2_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents2) 702 list_for_output = [] 703 704 read_thread = threading.Thread(target=_ReadContentsFromFifo, 705 args=(fifo_path, list_for_output)) 706 read_thread.start() 707 write_thread = threading.Thread( 708 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 709 args=((suri(obj1_uri), suri(obj2_uri)), fifo_path, [])) 710 write_thread.start() 711 write_thread.join(120) 712 read_thread.join(120) 713 if not list_for_output: 714 self.fail('Reading/writing to the fifo timed out.') 715 self.assertIn(contents1, list_for_output[0]) 716 self.assertIn(contents2, list_for_output[0]) 717 718 @SequentialAndParallelTransfer 719 def test_streaming(self): 720 bucket_uri = self.CreateBucket() 721 stderr = self.RunGsUtil( 722 ['cp', '-', '%s' % suri(bucket_uri, 'foo')], 723 stdin='bar', 724 return_stderr=True) 725 self.assertIn('Copying from <STDIN>', stderr) 726 key_uri = bucket_uri.clone_replace_name('foo') 727 self.assertEqual(key_uri.get_contents_as_string(), b'bar') 728 729 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 730 @SequentialAndParallelTransfer 731 def test_streaming_from_fifo_to_object(self): 732 bucket_uri = self.CreateBucket() 733 fifo_path = self.CreateTempFifo() 734 object_name = 'foo' 735 object_contents = b'bar' 736 list_for_output = [] 737 738 # Start writer in the background, which won't finish until a corresponding 739 # read operation is performed on the fifo. 740 write_thread = threading.Thread(target=_WriteContentsToFifo, 741 args=(object_contents, fifo_path)) 742 write_thread.start() 743 # The fifo requires both a pending read and write before either operation 744 # will complete. Regardless of which operation occurs first, the 745 # corresponding subsequent operation will unblock the first one. 746 # We run gsutil in a thread so that it can timeout rather than hang forever 747 # if the write thread fails. 748 read_thread = threading.Thread( 749 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 750 args=((fifo_path,), suri(bucket_uri, object_name), list_for_output), 751 kwargs={'return_stderr': True}) 752 read_thread.start() 753 754 read_thread.join(120) 755 write_thread.join(120) 756 if not list_for_output: 757 self.fail('Reading/writing to the fifo timed out.') 758 self.assertIn('Copying from named pipe', list_for_output[0]) 759 760 key_uri = bucket_uri.clone_replace_name(object_name) 761 self.assertEqual(key_uri.get_contents_as_string(), object_contents) 762 763 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 764 @SequentialAndParallelTransfer 765 def test_streaming_from_fifo_to_stdout(self): 766 fifo_path = self.CreateTempFifo() 767 contents = b'bar' 768 list_for_output = [] 769 770 write_thread = threading.Thread(target=_WriteContentsToFifo, 771 args=(contents, fifo_path)) 772 write_thread.start() 773 read_thread = threading.Thread( 774 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 775 args=((fifo_path,), '-', list_for_output), 776 kwargs={'return_stdout': True}) 777 read_thread.start() 778 read_thread.join(120) 779 write_thread.join(120) 780 if not list_for_output: 781 self.fail('Reading/writing to the fifo timed out.') 782 self.assertEqual(list_for_output[0].strip().encode('ascii'), contents) 783 784 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 785 @SequentialAndParallelTransfer 786 def test_streaming_from_stdout_to_fifo(self): 787 fifo_path = self.CreateTempFifo() 788 contents = b'bar' 789 list_for_output = [] 790 list_for_gsutil_output = [] 791 792 read_thread = threading.Thread(target=_ReadContentsFromFifo, 793 args=(fifo_path, list_for_output)) 794 read_thread.start() 795 write_thread = threading.Thread( 796 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 797 args=(('-',), fifo_path, list_for_gsutil_output), 798 kwargs={ 799 'return_stderr': True, 800 'stdin': contents 801 }) 802 write_thread.start() 803 write_thread.join(120) 804 read_thread.join(120) 805 if not list_for_output: 806 self.fail('Reading/writing to the fifo timed out.') 807 self.assertEqual(list_for_output[0].strip(), contents) 808 809 def test_streaming_multiple_arguments(self): 810 bucket_uri = self.CreateBucket() 811 stderr = self.RunGsUtil(['cp', '-', '-', suri(bucket_uri)], 812 stdin='bar', 813 return_stderr=True, 814 expected_status=1) 815 self.assertIn('Multiple URL strings are not supported with streaming', 816 stderr) 817 818 # TODO: Implement a way to test both with and without using magic file. 819 820 @SequentialAndParallelTransfer 821 def test_detect_content_type(self): 822 """Tests local detection of content type.""" 823 bucket_uri = self.CreateBucket() 824 dsturi = suri(bucket_uri, 'foo') 825 826 self.RunGsUtil(['cp', self._get_test_file('test.mp3'), dsturi]) 827 828 # Use @Retry as hedge against bucket listing eventual consistency. 829 @Retry(AssertionError, tries=3, timeout_secs=1) 830 def _Check1(): 831 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 832 if IS_WINDOWS: 833 self.assertTrue( 834 re.search(r'Content-Type:\s+audio/x-mpg', stdout) or 835 re.search(r'Content-Type:\s+audio/mpeg', stdout)) 836 else: 837 self.assertRegex(stdout, r'Content-Type:\s+audio/mpeg') 838 839 _Check1() 840 841 self.RunGsUtil(['cp', self._get_test_file('test.gif'), dsturi]) 842 843 # Use @Retry as hedge against bucket listing eventual consistency. 844 @Retry(AssertionError, tries=3, timeout_secs=1) 845 def _Check2(): 846 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 847 self.assertRegex(stdout, r'Content-Type:\s+image/gif') 848 849 _Check2() 850 851 def test_content_type_override_default(self): 852 """Tests overriding content type with the default value.""" 853 bucket_uri = self.CreateBucket() 854 dsturi = suri(bucket_uri, 'foo') 855 856 self.RunGsUtil( 857 ['-h', 'Content-Type:', 'cp', 858 self._get_test_file('test.mp3'), dsturi]) 859 860 # Use @Retry as hedge against bucket listing eventual consistency. 861 @Retry(AssertionError, tries=3, timeout_secs=1) 862 def _Check1(): 863 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 864 self.assertRegex(stdout, r'Content-Type:\s+application/octet-stream') 865 866 _Check1() 867 868 self.RunGsUtil( 869 ['-h', 'Content-Type:', 'cp', 870 self._get_test_file('test.gif'), dsturi]) 871 872 # Use @Retry as hedge against bucket listing eventual consistency. 873 @Retry(AssertionError, tries=3, timeout_secs=1) 874 def _Check2(): 875 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 876 self.assertRegex(stdout, r'Content-Type:\s+application/octet-stream') 877 878 _Check2() 879 880 def test_content_type_override(self): 881 """Tests overriding content type with a value.""" 882 bucket_uri = self.CreateBucket() 883 dsturi = suri(bucket_uri, 'foo') 884 885 self.RunGsUtil([ 886 '-h', 'Content-Type:text/plain', 'cp', 887 self._get_test_file('test.mp3'), dsturi 888 ]) 889 890 # Use @Retry as hedge against bucket listing eventual consistency. 891 @Retry(AssertionError, tries=3, timeout_secs=1) 892 def _Check1(): 893 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 894 self.assertRegex(stdout, r'Content-Type:\s+text/plain') 895 896 _Check1() 897 898 self.RunGsUtil([ 899 '-h', 'Content-Type:text/plain', 'cp', 900 self._get_test_file('test.gif'), dsturi 901 ]) 902 903 # Use @Retry as hedge against bucket listing eventual consistency. 904 @Retry(AssertionError, tries=3, timeout_secs=1) 905 def _Check2(): 906 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 907 self.assertRegex(stdout, r'Content-Type:\s+text/plain') 908 909 _Check2() 910 911 @unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.') 912 @SequentialAndParallelTransfer 913 def test_magicfile_override(self): 914 """Tests content type override with magicfile value.""" 915 bucket_uri = self.CreateBucket() 916 dsturi = suri(bucket_uri, 'foo') 917 fpath = self.CreateTempFile(contents=b'foo/bar\n') 918 self.RunGsUtil(['cp', fpath, dsturi]) 919 920 # Use @Retry as hedge against bucket listing eventual consistency. 921 @Retry(AssertionError, tries=3, timeout_secs=1) 922 def _Check1(): 923 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 924 use_magicfile = boto.config.getbool('GSUtil', 'use_magicfile', False) 925 content_type = ('text/plain' 926 if use_magicfile else 'application/octet-stream') 927 self.assertRegex(stdout, r'Content-Type:\s+%s' % content_type) 928 929 _Check1() 930 931 @SequentialAndParallelTransfer 932 def test_content_type_mismatches(self): 933 """Tests overriding content type when it does not match the file type.""" 934 bucket_uri = self.CreateBucket() 935 dsturi = suri(bucket_uri, 'foo') 936 fpath = self.CreateTempFile(contents=b'foo/bar\n') 937 938 self.RunGsUtil([ 939 '-h', 'Content-Type:image/gif', 'cp', 940 self._get_test_file('test.mp3'), dsturi 941 ]) 942 943 # Use @Retry as hedge against bucket listing eventual consistency. 944 @Retry(AssertionError, tries=3, timeout_secs=1) 945 def _Check1(): 946 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 947 self.assertRegex(stdout, r'Content-Type:\s+image/gif') 948 949 _Check1() 950 951 self.RunGsUtil([ 952 '-h', 'Content-Type:image/gif', 'cp', 953 self._get_test_file('test.gif'), dsturi 954 ]) 955 956 # Use @Retry as hedge against bucket listing eventual consistency. 957 @Retry(AssertionError, tries=3, timeout_secs=1) 958 def _Check2(): 959 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 960 self.assertRegex(stdout, r'Content-Type:\s+image/gif') 961 962 _Check2() 963 964 self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', fpath, dsturi]) 965 966 # Use @Retry as hedge against bucket listing eventual consistency. 967 @Retry(AssertionError, tries=3, timeout_secs=1) 968 def _Check3(): 969 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 970 self.assertRegex(stdout, r'Content-Type:\s+image/gif') 971 972 _Check3() 973 974 @SequentialAndParallelTransfer 975 def test_content_type_header_case_insensitive(self): 976 """Tests that content type header is treated with case insensitivity.""" 977 bucket_uri = self.CreateBucket() 978 dsturi = suri(bucket_uri, 'foo') 979 fpath = self._get_test_file('test.gif') 980 981 self.RunGsUtil(['-h', 'content-Type:text/plain', 'cp', fpath, dsturi]) 982 983 # Use @Retry as hedge against bucket listing eventual consistency. 984 @Retry(AssertionError, tries=3, timeout_secs=1) 985 def _Check1(): 986 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 987 self.assertRegex(stdout, r'Content-Type:\s+text/plain') 988 self.assertNotRegex(stdout, r'image/gif') 989 990 _Check1() 991 992 self.RunGsUtil([ 993 '-h', 'CONTENT-TYPE:image/gif', '-h', 'content-type:image/gif', 'cp', 994 fpath, dsturi 995 ]) 996 997 # Use @Retry as hedge against bucket listing eventual consistency. 998 @Retry(AssertionError, tries=3, timeout_secs=1) 999 def _Check2(): 1000 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 1001 self.assertRegex(stdout, r'Content-Type:\s+image/gif') 1002 self.assertNotRegex(stdout, r'image/gif,\s*image/gif') 1003 1004 _Check2() 1005 1006 @SequentialAndParallelTransfer 1007 def test_other_headers(self): 1008 """Tests that non-content-type headers are applied successfully on copy.""" 1009 bucket_uri = self.CreateBucket() 1010 dst_uri = suri(bucket_uri, 'foo') 1011 fpath = self._get_test_file('test.gif') 1012 1013 self.RunGsUtil([ 1014 '-h', 'Cache-Control:public,max-age=12', '-h', 1015 'x-%s-meta-1:abcd' % self.provider_custom_meta, 'cp', fpath, dst_uri 1016 ]) 1017 1018 stdout = self.RunGsUtil(['ls', '-L', dst_uri], return_stdout=True) 1019 self.assertRegex(stdout, r'Cache-Control\s*:\s*public,max-age=12') 1020 self.assertRegex(stdout, r'Metadata:\s*1:\s*abcd') 1021 1022 dst_uri2 = suri(bucket_uri, 'bar') 1023 self.RunGsUtil(['cp', dst_uri, dst_uri2]) 1024 # Ensure metadata was preserved across copy. 1025 stdout = self.RunGsUtil(['ls', '-L', dst_uri2], return_stdout=True) 1026 self.assertRegex(stdout, r'Cache-Control\s*:\s*public,max-age=12') 1027 self.assertRegex(stdout, r'Metadata:\s*1:\s*abcd') 1028 1029 @SequentialAndParallelTransfer 1030 def test_request_reason_header(self): 1031 """Test that x-goog-request-header can be set using the environment variable.""" 1032 os.environ['CLOUDSDK_CORE_REQUEST_REASON'] = 'b/this_is_env_reason' 1033 bucket_uri = self.CreateBucket() 1034 dst_uri = suri(bucket_uri, 'foo') 1035 fpath = self._get_test_file('test.gif') 1036 # Ensure x-goog-request-header is set in cp command 1037 stderr = self.RunGsUtil(['-D', 'cp', fpath, dst_uri], return_stderr=True) 1038 self.assertRegex(stderr, 1039 r'\'x-goog-request-reason\': \'b/this_is_env_reason\'') 1040 # Ensure x-goog-request-header is set in ls command 1041 stderr = self.RunGsUtil(['-D', 'ls', '-L', dst_uri], return_stderr=True) 1042 self.assertRegex(stderr, 1043 r'\'x-goog-request-reason\': \'b/this_is_env_reason\'') 1044 1045 @SequentialAndParallelTransfer 1046 @SkipForXML('XML APIs use a different debug log format.') 1047 def test_request_reason_header_persists_multiple_requests_json(self): 1048 """Test that x-goog-request-header works when cp sends multiple requests.""" 1049 os.environ['CLOUDSDK_CORE_REQUEST_REASON'] = 'b/this_is_env_reason' 1050 bucket_uri = self.CreateBucket() 1051 dst_uri = suri(bucket_uri, 'foo') 1052 fpath = self._get_test_file('test.gif') 1053 1054 boto_config_for_test = ('GSUtil', 'resumable_threshold', '0') 1055 with SetBotoConfigForTest([boto_config_for_test]): 1056 stderr = self.RunGsUtil(['-D', 'cp', fpath, dst_uri], return_stderr=True) 1057 1058 # PUT follows GET request. Both need the request-reason header. 1059 reason_regex = (r'Making http GET[\s\S]*' 1060 r'x-goog-request-reason\': \'b/this_is_env_reason[\s\S]*' 1061 r'send: (b\')?PUT[\s\S]*x-goog-request-reason:' 1062 r' b/this_is_env_reason') 1063 self.assertRegex(stderr, reason_regex) 1064 1065 @SequentialAndParallelTransfer 1066 @SkipForJSON('JSON API uses a different debug log format.') 1067 def test_request_reason_header_persists_multiple_requests_xml(self): 1068 """Test that x-goog-request-header works when cp sends multiple requests.""" 1069 os.environ['CLOUDSDK_CORE_REQUEST_REASON'] = 'b/this_is_env_reason' 1070 bucket_uri = self.CreateBucket() 1071 dst_uri = suri(bucket_uri, 'foo') 1072 fpath = self._get_test_file('test.gif') 1073 1074 boto_config_for_test = ('GSUtil', 'resumable_threshold', '0') 1075 with SetBotoConfigForTest([boto_config_for_test]): 1076 stderr = self.RunGsUtil(['-D', 'cp', fpath, dst_uri], return_stderr=True) 1077 1078 reason_regex = ( 1079 r'Final headers: \{[\s\S]*\'' 1080 r'x-goog-request-reason\': \'b/this_is_env_reason\'[\s\S]*}') 1081 1082 # Pattern should match twice since two requests should have a reason header. 1083 self.assertRegex(stderr, reason_regex + r'[\s\S]*' + reason_regex) 1084 1085 @SequentialAndParallelTransfer 1086 def test_versioning(self): 1087 """Tests copy with versioning.""" 1088 bucket_uri = self.CreateVersionedBucket() 1089 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data2') 1090 k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data1') 1091 g1 = urigen(k2_uri) 1092 self.RunGsUtil(['cp', suri(k1_uri), suri(k2_uri)]) 1093 k2_uri = bucket_uri.clone_replace_name(k2_uri.object_name) 1094 k2_uri = bucket_uri.clone_replace_key(k2_uri.get_key()) 1095 g2 = urigen(k2_uri) 1096 k2_uri.set_contents_from_string('data3') 1097 g3 = urigen(k2_uri) 1098 1099 fpath = self.CreateTempFile() 1100 # Check to make sure current version is data3. 1101 self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath]) 1102 with open(fpath, 'rb') as f: 1103 self.assertEqual(f.read(), b'data3') 1104 1105 # Check contents of all three versions 1106 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), fpath]) 1107 with open(fpath, 'rb') as f: 1108 self.assertEqual(f.read(), b'data1') 1109 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g2), fpath]) 1110 with open(fpath, 'rb') as f: 1111 self.assertEqual(f.read(), b'data2') 1112 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g3), fpath]) 1113 with open(fpath, 'rb') as f: 1114 self.assertEqual(f.read(), b'data3') 1115 1116 # Copy first version to current and verify. 1117 self.RunGsUtil( 1118 ['cp', 1119 '%s#%s' % (k2_uri.versionless_uri, g1), k2_uri.versionless_uri]) 1120 self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath]) 1121 with open(fpath, 'rb') as f: 1122 self.assertEqual(f.read(), b'data1') 1123 1124 # Attempt to specify a version-specific URI for destination. 1125 stderr = self.RunGsUtil(['cp', fpath, k2_uri.uri], 1126 return_stderr=True, 1127 expected_status=1) 1128 self.assertIn('cannot be the destination for gsutil cp', stderr) 1129 1130 def test_versioning_no_parallelism(self): 1131 """Tests that copy all-versions errors when parallelism is enabled.""" 1132 # TODO(b/135780661): Remove retry after bug resolved 1133 @Retry(AssertionError, tries=3, timeout_secs=1) 1134 def _Check(): 1135 stderr = self.RunGsUtil([ 1136 '-m', 'cp', '-A', 1137 suri(self.nonexistent_bucket_name, 'foo'), 1138 suri(self.nonexistent_bucket_name, 'bar') 1139 ], 1140 expected_status=1, 1141 return_stderr=True) 1142 self.assertIn('-m option is not supported with the cp -A flag', stderr) 1143 1144 _Check() 1145 1146 @SkipForS3('S3 lists versioned objects in reverse timestamp order.') 1147 def test_recursive_copying_versioned_bucket(self): 1148 """Tests cp -R with versioned buckets.""" 1149 bucket1_uri = self.CreateVersionedBucket() 1150 bucket2_uri = self.CreateVersionedBucket() 1151 bucket3_uri = self.CreateVersionedBucket() 1152 1153 # Write two versions of an object to the bucket1. 1154 v1_uri = self.CreateObject(bucket_uri=bucket1_uri, 1155 object_name='k', 1156 contents=b'data0') 1157 self.CreateObject(bucket_uri=bucket1_uri, 1158 object_name='k', 1159 contents=b'longer_data1', 1160 gs_idempotent_generation=urigen(v1_uri)) 1161 1162 self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True) 1163 self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True) 1164 self.AssertNObjectsInBucket(bucket3_uri, 0, versioned=True) 1165 1166 # Recursively copy to second versioned bucket. 1167 # -A flag should copy all versions in order. 1168 self.RunGsUtil( 1169 ['cp', '-R', '-A', 1170 suri(bucket1_uri, '*'), 1171 suri(bucket2_uri)]) 1172 1173 # Use @Retry as hedge against bucket listing eventual consistency. 1174 @Retry(AssertionError, tries=3, timeout_secs=1) 1175 def _Check2(): 1176 """Validates the results of the cp -R.""" 1177 listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)], 1178 return_stdout=True).split('\n') 1179 listing2 = self.RunGsUtil(['ls', '-la', suri(bucket2_uri)], 1180 return_stdout=True).split('\n') 1181 # 2 lines of listing output, 1 summary line, 1 empty line from \n split. 1182 self.assertEquals(len(listing1), 4) 1183 self.assertEquals(len(listing2), 4) 1184 1185 # First object in each bucket should match in size and version-less name. 1186 size1, _, uri_str1, _ = listing1[0].split() 1187 self.assertEquals(size1, str(len('data0'))) 1188 self.assertEquals(storage_uri(uri_str1).object_name, 'k') 1189 size2, _, uri_str2, _ = listing2[0].split() 1190 self.assertEquals(size2, str(len('data0'))) 1191 self.assertEquals(storage_uri(uri_str2).object_name, 'k') 1192 1193 # Similarly for second object in each bucket. 1194 size1, _, uri_str1, _ = listing1[1].split() 1195 self.assertEquals(size1, str(len('longer_data1'))) 1196 self.assertEquals(storage_uri(uri_str1).object_name, 'k') 1197 size2, _, uri_str2, _ = listing2[1].split() 1198 self.assertEquals(size2, str(len('longer_data1'))) 1199 self.assertEquals(storage_uri(uri_str2).object_name, 'k') 1200 1201 _Check2() 1202 1203 # Recursively copy to second versioned bucket with no -A flag. 1204 # This should copy only the live object. 1205 self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), suri(bucket3_uri)]) 1206 1207 # Use @Retry as hedge against bucket listing eventual consistency. 1208 @Retry(AssertionError, tries=3, timeout_secs=1) 1209 def _Check3(): 1210 """Validates the results of the cp -R.""" 1211 listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)], 1212 return_stdout=True).split('\n') 1213 listing2 = self.RunGsUtil(['ls', '-la', suri(bucket3_uri)], 1214 return_stdout=True).split('\n') 1215 # 2 lines of listing output, 1 summary line, 1 empty line from \n split. 1216 self.assertEquals(len(listing1), 4) 1217 # 1 lines of listing output, 1 summary line, 1 empty line from \n split. 1218 self.assertEquals(len(listing2), 3) 1219 1220 # Live (second) object in bucket 1 should match the single live object. 1221 size1, _, uri_str1, _ = listing2[0].split() 1222 self.assertEquals(size1, str(len('longer_data1'))) 1223 self.assertEquals(storage_uri(uri_str1).object_name, 'k') 1224 1225 _Check3() 1226 1227 @SequentialAndParallelTransfer 1228 @SkipForS3('Preconditions not supported for S3.') 1229 def test_cp_generation_zero_match(self): 1230 """Tests that cp handles an object-not-exists precondition header.""" 1231 bucket_uri = self.CreateBucket() 1232 fpath1 = self.CreateTempFile(contents=b'data1') 1233 # Match 0 means only write the object if it doesn't already exist. 1234 gen_match_header = 'x-goog-if-generation-match:0' 1235 1236 # First copy should succeed. 1237 # TODO: This can fail (rarely) if the server returns a 5xx but actually 1238 # commits the bytes. If we add restarts on small uploads, handle this 1239 # case. 1240 self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(bucket_uri)]) 1241 1242 # Second copy should fail with a precondition error. 1243 stderr = self.RunGsUtil( 1244 ['-h', gen_match_header, 'cp', fpath1, 1245 suri(bucket_uri)], 1246 return_stderr=True, 1247 expected_status=1) 1248 self.assertIn('PreconditionException', stderr) 1249 1250 @SequentialAndParallelTransfer 1251 @SkipForS3('Preconditions not supported for S3.') 1252 def test_cp_v_generation_match(self): 1253 """Tests that cp -v option handles the if-generation-match header.""" 1254 bucket_uri = self.CreateVersionedBucket() 1255 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data1') 1256 g1 = k1_uri.generation 1257 1258 tmpdir = self.CreateTempDir() 1259 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data2') 1260 1261 gen_match_header = 'x-goog-if-generation-match:%s' % g1 1262 # First copy should succeed. 1263 self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(k1_uri)]) 1264 1265 # Second copy should fail the precondition. 1266 stderr = self.RunGsUtil( 1267 ['-h', gen_match_header, 'cp', fpath1, 1268 suri(k1_uri)], 1269 return_stderr=True, 1270 expected_status=1) 1271 1272 self.assertIn('PreconditionException', stderr) 1273 1274 # Specifiying a generation with -n should fail before the request hits the 1275 # server. 1276 stderr = self.RunGsUtil( 1277 ['-h', gen_match_header, 'cp', '-n', fpath1, 1278 suri(k1_uri)], 1279 return_stderr=True, 1280 expected_status=1) 1281 1282 self.assertIn('ArgumentException', stderr) 1283 self.assertIn( 1284 'Specifying x-goog-if-generation-match is not supported ' 1285 'with cp -n', stderr) 1286 1287 @SequentialAndParallelTransfer 1288 def test_cp_nv(self): 1289 """Tests that cp -nv works when skipping existing file.""" 1290 bucket_uri = self.CreateVersionedBucket() 1291 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data1') 1292 1293 tmpdir = self.CreateTempDir() 1294 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data2') 1295 1296 # First copy should succeed. 1297 self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)]) 1298 1299 # Second copy should skip copying. 1300 stderr = self.RunGsUtil( 1301 ['cp', '-nv', fpath1, suri(k1_uri)], return_stderr=True) 1302 self.assertIn('Skipping existing item:', stderr) 1303 1304 @SequentialAndParallelTransfer 1305 @SkipForS3('S3 lists versioned objects in reverse timestamp order.') 1306 def test_cp_v_option(self): 1307 """"Tests that cp -v returns the created object's version-specific URI.""" 1308 bucket_uri = self.CreateVersionedBucket() 1309 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data1') 1310 k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data2') 1311 1312 # Case 1: Upload file to object using one-shot PUT. 1313 tmpdir = self.CreateTempDir() 1314 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data1') 1315 self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri) 1316 1317 # Case 2: Upload file to object using resumable upload. 1318 size_threshold = ONE_KIB 1319 boto_config_for_test = ('GSUtil', 'resumable_threshold', 1320 str(size_threshold)) 1321 with SetBotoConfigForTest([boto_config_for_test]): 1322 file_as_string = os.urandom(size_threshold) 1323 tmpdir = self.CreateTempDir() 1324 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=file_as_string) 1325 self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri) 1326 1327 # Case 3: Upload stream to object. 1328 self._run_cp_minus_v_test('-v', '-', k2_uri.uri) 1329 1330 # Case 4: Download object to file. For this case we just expect output of 1331 # gsutil cp -v to be the URI of the file. 1332 tmpdir = self.CreateTempDir() 1333 fpath1 = self.CreateTempFile(tmpdir=tmpdir) 1334 dst_uri = storage_uri(fpath1) 1335 stderr = self.RunGsUtil( 1336 ['cp', '-v', suri(k1_uri), suri(dst_uri)], return_stderr=True) 1337 # TODO: Add ordering assertion (should be in stderr.split('\n)[-2]) back 1338 # once both the creation and status messages are handled by the UI thread. 1339 self.assertIn('Created: %s\n' % dst_uri.uri, stderr) 1340 1341 # Case 5: Daisy-chain from object to object. 1342 self._run_cp_minus_v_test('-Dv', k1_uri.uri, k2_uri.uri) 1343 1344 # Case 6: Copy object to object in-the-cloud. 1345 self._run_cp_minus_v_test('-v', k1_uri.uri, k2_uri.uri) 1346 1347 def _run_cp_minus_v_test(self, opt, src_str, dst_str): 1348 """Runs cp -v with the options and validates the results.""" 1349 stderr = self.RunGsUtil(['cp', opt, src_str, dst_str], return_stderr=True) 1350 match = re.search(r'Created: (.*)\n', stderr) 1351 self.assertIsNotNone(match) 1352 created_uri = match.group(1) 1353 1354 # Use @Retry as hedge against bucket listing eventual consistency. 1355 @Retry(AssertionError, tries=3, timeout_secs=1) 1356 def _Check1(): 1357 stdout = self.RunGsUtil(['ls', '-a', dst_str], return_stdout=True) 1358 lines = stdout.split('\n') 1359 # Final (most recent) object should match the "Created:" URI. This is 1360 # in second-to-last line (last line is '\n'). 1361 self.assertGreater(len(lines), 2) 1362 self.assertEqual(created_uri, lines[-2]) 1363 1364 _Check1() 1365 1366 @SequentialAndParallelTransfer 1367 def test_stdin_args(self): 1368 """Tests cp with the -I option.""" 1369 tmpdir = self.CreateTempDir() 1370 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data1') 1371 fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data2') 1372 bucket_uri = self.CreateBucket() 1373 self.RunGsUtil(['cp', '-I', suri(bucket_uri)], 1374 stdin='\n'.join((fpath1, fpath2))) 1375 1376 # Use @Retry as hedge against bucket listing eventual consistency. 1377 @Retry(AssertionError, tries=3, timeout_secs=1) 1378 def _Check1(): 1379 stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True) 1380 self.assertIn(os.path.basename(fpath1), stdout) 1381 self.assertIn(os.path.basename(fpath2), stdout) 1382 self.assertNumLines(stdout, 2) 1383 1384 _Check1() 1385 1386 def test_cross_storage_class_cloud_cp(self): 1387 bucket1_uri = self.CreateBucket(storage_class='standard') 1388 bucket2_uri = self.CreateBucket( 1389 storage_class='durable_reduced_availability') 1390 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents=b'foo') 1391 # Server now allows copy-in-the-cloud across storage classes. 1392 self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)]) 1393 1394 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') 1395 def test_cross_provider_cp(self): 1396 s3_bucket = self.CreateBucket(provider='s3') 1397 gs_bucket = self.CreateBucket(provider='gs') 1398 s3_key = self.CreateObject(bucket_uri=s3_bucket, contents=b'foo') 1399 gs_key = self.CreateObject(bucket_uri=gs_bucket, contents=b'bar') 1400 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) 1401 self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)]) 1402 1403 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') 1404 @unittest.skip('This test performs a large copy but remains here for ' 1405 'debugging purposes.') 1406 def test_cross_provider_large_cp(self): 1407 s3_bucket = self.CreateBucket(provider='s3') 1408 gs_bucket = self.CreateBucket(provider='gs') 1409 s3_key = self.CreateObject(bucket_uri=s3_bucket, 1410 contents=b'f' * 1024 * 1024) 1411 gs_key = self.CreateObject(bucket_uri=gs_bucket, 1412 contents=b'b' * 1024 * 1024) 1413 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) 1414 self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)]) 1415 with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', str(ONE_KIB)), 1416 ('GSUtil', 'json_resumable_chunk_size', 1417 str(ONE_KIB * 256))]): 1418 # Ensure copy also works across json upload chunk boundaries. 1419 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) 1420 1421 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') 1422 def test_gs_to_s3_multipart_cp(self): 1423 """Ensure daisy_chain works for an object that is downloaded in 2 parts.""" 1424 s3_bucket = self.CreateBucket(provider='s3') 1425 gs_bucket = self.CreateBucket(provider='gs', prefer_json_api=True) 1426 num_bytes = int(_DEFAULT_DOWNLOAD_CHUNK_SIZE * 1.1) 1427 gs_key = self.CreateObject(bucket_uri=gs_bucket, 1428 contents=b'b' * num_bytes, 1429 prefer_json_api=True) 1430 self.RunGsUtil([ 1431 '-o', 's3:use-sigv4=True', '-o', 's3:host=s3.amazonaws.com', 'cp', 1432 suri(gs_key), 1433 suri(s3_bucket) 1434 ]) 1435 1436 @unittest.skip('This test is slow due to creating many objects, ' 1437 'but remains here for debugging purposes.') 1438 def test_daisy_chain_cp_file_sizes(self): 1439 """Ensure daisy chain cp works with a wide of file sizes.""" 1440 bucket_uri = self.CreateBucket() 1441 bucket2_uri = self.CreateBucket() 1442 exponent_cap = 28 # Up to 256 MiB in size. 1443 for i in range(exponent_cap): 1444 one_byte_smaller = 2**i - 1 1445 normal = 2**i 1446 one_byte_larger = 2**i + 1 1447 self.CreateObject(bucket_uri=bucket_uri, contents=b'a' * one_byte_smaller) 1448 self.CreateObject(bucket_uri=bucket_uri, contents=b'b' * normal) 1449 self.CreateObject(bucket_uri=bucket_uri, contents=b'c' * one_byte_larger) 1450 1451 self.AssertNObjectsInBucket(bucket_uri, exponent_cap * 3) 1452 self.RunGsUtil( 1453 ['-m', 'cp', '-D', 1454 suri(bucket_uri, '**'), 1455 suri(bucket2_uri)]) 1456 1457 self.AssertNObjectsInBucket(bucket2_uri, exponent_cap * 3) 1458 1459 def test_daisy_chain_cp(self): 1460 """Tests cp with the -D option.""" 1461 bucket1_uri = self.CreateBucket(storage_class='standard') 1462 bucket2_uri = self.CreateBucket( 1463 storage_class='durable_reduced_availability') 1464 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents=b'foo') 1465 # Set some headers on source object so we can verify that headers are 1466 # presereved by daisy-chain copy. 1467 self.RunGsUtil([ 1468 'setmeta', '-h', 'Cache-Control:public,max-age=12', '-h', 1469 'Content-Type:image/gif', '-h', 1470 'x-%s-meta-1:abcd' % self.provider_custom_meta, 1471 suri(key_uri) 1472 ]) 1473 # Set public-read (non-default) ACL so we can verify that cp -D -p works. 1474 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) 1475 acl_json = self.RunGsUtil(['acl', 'get', suri(key_uri)], return_stdout=True) 1476 # Perform daisy-chain copy and verify that source object headers and ACL 1477 # were preserved. Also specify -n option to test that gsutil correctly 1478 # removes the x-goog-if-generation-match:0 header that was set at uploading 1479 # time when updating the ACL. 1480 stderr = self.RunGsUtil( 1481 ['cp', '-Dpn', suri(key_uri), 1482 suri(bucket2_uri)], return_stderr=True) 1483 self.assertNotIn('Copy-in-the-cloud disallowed', stderr) 1484 1485 @Retry(AssertionError, tries=3, timeout_secs=1) 1486 def _Check(): 1487 uri = suri(bucket2_uri, key_uri.object_name) 1488 stdout = self.RunGsUtil(['ls', '-L', uri], return_stdout=True) 1489 self.assertRegex(stdout, r'Cache-Control:\s+public,max-age=12') 1490 self.assertRegex(stdout, r'Content-Type:\s+image/gif') 1491 self.assertRegex(stdout, r'Metadata:\s+1:\s+abcd') 1492 new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True) 1493 self.assertEqual(acl_json, new_acl_json) 1494 1495 _Check() 1496 1497 @unittest.skipUnless( 1498 not HAS_GS_PORT, 'gs_port is defined in config which can cause ' 1499 'problems when uploading and downloading to the same local host port') 1500 def test_daisy_chain_cp_download_failure(self): 1501 """Tests cp with the -D option when the download thread dies.""" 1502 bucket1_uri = self.CreateBucket() 1503 bucket2_uri = self.CreateBucket() 1504 key_uri = self.CreateObject(bucket_uri=bucket1_uri, 1505 contents=b'a' * self.halt_size) 1506 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 1507 test_callback_file = self.CreateTempFile( 1508 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 1509 with SetBotoConfigForTest([boto_config_for_test]): 1510 stderr = self.RunGsUtil([ 1511 'cp', '--testcallbackfile', test_callback_file, '-D', 1512 suri(key_uri), 1513 suri(bucket2_uri) 1514 ], 1515 expected_status=1, 1516 return_stderr=True) 1517 # Should have three exception traces; one from the download thread and 1518 # two from the upload thread (expection message is repeated in main's 1519 # _OutputAndExit). 1520 self.assertEqual( 1521 stderr.count( 1522 'ResumableDownloadException: Artifically halting download'), 3) 1523 1524 def test_streaming_gzip_upload(self): 1525 """Tests error when compression flag is requested on a streaming source.""" 1526 bucket_uri = self.CreateBucket() 1527 stderr = self.RunGsUtil( 1528 ['cp', '-Z', '-', suri(bucket_uri, 'foo')], 1529 return_stderr=True, 1530 expected_status=1, 1531 stdin='streaming data') 1532 self.assertIn( 1533 'gzip compression is not currently supported on streaming uploads', 1534 stderr) 1535 1536 def test_seek_ahead_upload_cp(self): 1537 """Tests that the seek-ahead iterator estimates total upload work.""" 1538 tmpdir = self.CreateTempDir(test_files=3) 1539 bucket_uri = self.CreateBucket() 1540 1541 with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '1'), 1542 ('GSUtil', 'task_estimation_force', 'True')]): 1543 stderr = self.RunGsUtil( 1544 ['-m', 'cp', '-r', tmpdir, suri(bucket_uri)], return_stderr=True) 1545 self.assertIn( 1546 'Estimated work for this command: objects: 3, total size: 18', stderr) 1547 1548 with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '0'), 1549 ('GSUtil', 'task_estimation_force', 'True')]): 1550 stderr = self.RunGsUtil( 1551 ['-m', 'cp', '-r', tmpdir, suri(bucket_uri)], return_stderr=True) 1552 self.assertNotIn('Estimated work', stderr) 1553 1554 def test_seek_ahead_download_cp(self): 1555 tmpdir = self.CreateTempDir() 1556 bucket_uri = self.CreateBucket(test_objects=3) 1557 self.AssertNObjectsInBucket(bucket_uri, 3) 1558 1559 with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '1'), 1560 ('GSUtil', 'task_estimation_force', 'True')]): 1561 stderr = self.RunGsUtil( 1562 ['-m', 'cp', '-r', suri(bucket_uri), tmpdir], return_stderr=True) 1563 self.assertIn( 1564 'Estimated work for this command: objects: 3, total size: 18', stderr) 1565 1566 with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '0'), 1567 ('GSUtil', 'task_estimation_force', 'True')]): 1568 stderr = self.RunGsUtil( 1569 ['-m', 'cp', '-r', suri(bucket_uri), tmpdir], return_stderr=True) 1570 self.assertNotIn('Estimated work', stderr) 1571 1572 def test_canned_acl_cp(self): 1573 """Tests copying with a canned ACL.""" 1574 bucket1_uri = self.CreateBucket() 1575 bucket2_uri = self.CreateBucket() 1576 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents=b'foo') 1577 self.RunGsUtil( 1578 ['cp', '-a', 'public-read', 1579 suri(key_uri), 1580 suri(bucket2_uri)]) 1581 # Set public-read on the original key after the copy so we can compare 1582 # the ACLs. 1583 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) 1584 public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)], 1585 return_stdout=True) 1586 1587 @Retry(AssertionError, tries=3, timeout_secs=1) 1588 def _Check(): 1589 uri = suri(bucket2_uri, key_uri.object_name) 1590 new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True) 1591 self.assertEqual(public_read_acl, new_acl_json) 1592 1593 _Check() 1594 1595 @SequentialAndParallelTransfer 1596 def test_canned_acl_upload(self): 1597 """Tests uploading a file with a canned ACL.""" 1598 bucket1_uri = self.CreateBucket() 1599 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents=b'foo') 1600 # Set public-read on the object so we can compare the ACLs. 1601 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) 1602 public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)], 1603 return_stdout=True) 1604 1605 file_name = 'bar' 1606 fpath = self.CreateTempFile(file_name=file_name, contents=b'foo') 1607 self.RunGsUtil(['cp', '-a', 'public-read', fpath, suri(bucket1_uri)]) 1608 new_acl_json = self.RunGsUtil( 1609 ['acl', 'get', suri(bucket1_uri, file_name)], return_stdout=True) 1610 self.assertEqual(public_read_acl, new_acl_json) 1611 1612 resumable_size = ONE_KIB 1613 boto_config_for_test = ('GSUtil', 'resumable_threshold', 1614 str(resumable_size)) 1615 with SetBotoConfigForTest([boto_config_for_test]): 1616 resumable_file_name = 'resumable_bar' 1617 resumable_contents = os.urandom(resumable_size) 1618 resumable_fpath = self.CreateTempFile(file_name=resumable_file_name, 1619 contents=resumable_contents) 1620 self.RunGsUtil( 1621 ['cp', '-a', 'public-read', resumable_fpath, 1622 suri(bucket1_uri)]) 1623 new_resumable_acl_json = self.RunGsUtil( 1624 ['acl', 'get', suri(bucket1_uri, resumable_file_name)], 1625 return_stdout=True) 1626 self.assertEqual(public_read_acl, new_resumable_acl_json) 1627 1628 def test_cp_key_to_local_stream(self): 1629 bucket_uri = self.CreateBucket() 1630 contents = b'foo' 1631 key_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents) 1632 stdout = self.RunGsUtil(['cp', suri(key_uri), '-'], return_stdout=True) 1633 self.assertIn(contents, stdout.encode('ascii')) 1634 1635 def test_cp_local_file_to_local_stream(self): 1636 contents = b'content' 1637 fpath = self.CreateTempFile(contents=contents) 1638 stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True) 1639 self.assertIn(contents, stdout.encode(UTF8)) 1640 1641 @SequentialAndParallelTransfer 1642 def test_cp_zero_byte_file(self): 1643 dst_bucket_uri = self.CreateBucket() 1644 src_dir = self.CreateTempDir() 1645 fpath = os.path.join(src_dir, 'zero_byte') 1646 with open(fpath, 'w') as unused_out_file: 1647 pass # Write a zero byte file 1648 self.RunGsUtil(['cp', fpath, suri(dst_bucket_uri)]) 1649 1650 @Retry(AssertionError, tries=3, timeout_secs=1) 1651 def _Check1(): 1652 stdout = self.RunGsUtil(['ls', suri(dst_bucket_uri)], return_stdout=True) 1653 self.assertIn(os.path.basename(fpath), stdout) 1654 1655 _Check1() 1656 1657 download_path = os.path.join(src_dir, 'zero_byte_download') 1658 self.RunGsUtil(['cp', suri(dst_bucket_uri, 'zero_byte'), download_path]) 1659 self.assertTrue(os.stat(download_path)) 1660 1661 def test_copy_bucket_to_bucket(self): 1662 """Tests recursively copying from bucket to bucket. 1663 1664 This should produce identically named objects (and not, in particular, 1665 destination objects named by the version-specific URI from source objects). 1666 """ 1667 src_bucket_uri = self.CreateVersionedBucket() 1668 dst_bucket_uri = self.CreateVersionedBucket() 1669 self.CreateObject(bucket_uri=src_bucket_uri, 1670 object_name='obj0', 1671 contents=b'abc') 1672 self.CreateObject(bucket_uri=src_bucket_uri, 1673 object_name='obj1', 1674 contents=b'def') 1675 1676 # Use @Retry as hedge against bucket listing eventual consistency. 1677 @Retry(AssertionError, tries=3, timeout_secs=1) 1678 def _CopyAndCheck(): 1679 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), suri(dst_bucket_uri)]) 1680 stdout = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri], 1681 return_stdout=True) 1682 self.assertIn( 1683 '%s%s/obj0\n' % (dst_bucket_uri, src_bucket_uri.bucket_name), stdout) 1684 self.assertIn( 1685 '%s%s/obj1\n' % (dst_bucket_uri, src_bucket_uri.bucket_name), stdout) 1686 1687 _CopyAndCheck() 1688 1689 @SkipForGS('Only s3 V4 signatures error on location mismatches.') 1690 def test_copy_bucket_to_bucket_with_location_redirect(self): 1691 # cp uses a sender function that raises an exception on location mismatches, 1692 # instead of returning a response. This integration test ensures retries 1693 # from exceptions work correctly. 1694 1695 src_bucket_region = 'ap-east-1' 1696 dest_bucket_region = 'us-east-2' 1697 src_bucket_host = 's3.%s.amazonaws.com' % src_bucket_region 1698 dest_bucket_host = 's3.%s.amazonaws.com' % dest_bucket_region 1699 client_host = 's3.eu-west-1.amazonaws.com' 1700 1701 with SetBotoConfigForTest([('s3', 'host', src_bucket_host)]): 1702 src_bucket_uri = self.CreateBucket(location=src_bucket_region) 1703 self.CreateObject(bucket_uri=src_bucket_uri, 1704 object_name='obj0', 1705 contents=b'abc') 1706 self.CreateObject(bucket_uri=src_bucket_uri, 1707 object_name='obj1', 1708 contents=b'def') 1709 1710 with SetBotoConfigForTest([('s3', 'host', dest_bucket_host)]): 1711 dst_bucket_uri = self.CreateBucket(location=dest_bucket_region) 1712 1713 # Use @Retry as hedge against bucket listing eventual consistency. 1714 @Retry(AssertionError, tries=3, timeout_secs=1) 1715 def _CopyAndCheck(): 1716 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), suri(dst_bucket_uri)]) 1717 stdout = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri], 1718 return_stdout=True) 1719 self.assertIn( 1720 '%s%s/obj0\n' % (dst_bucket_uri, src_bucket_uri.bucket_name), stdout) 1721 self.assertIn( 1722 '%s%s/obj1\n' % (dst_bucket_uri, src_bucket_uri.bucket_name), stdout) 1723 1724 with SetBotoConfigForTest([('s3', 'host', client_host)]): 1725 _CopyAndCheck() 1726 1727 def test_copy_bucket_to_dir(self): 1728 """Tests recursively copying from bucket to a directory. 1729 1730 This should produce identically named objects (and not, in particular, 1731 destination objects named by the version- specific URI from source objects). 1732 """ 1733 src_bucket_uri = self.CreateBucket() 1734 dst_dir = self.CreateTempDir() 1735 self.CreateObject(bucket_uri=src_bucket_uri, 1736 object_name='obj0', 1737 contents=b'abc') 1738 self.CreateObject(bucket_uri=src_bucket_uri, 1739 object_name='obj1', 1740 contents=b'def') 1741 1742 # Use @Retry as hedge against bucket listing eventual consistency. 1743 @Retry(AssertionError, tries=3, timeout_secs=1) 1744 def _CopyAndCheck(): 1745 """Copies the bucket recursively and validates the results.""" 1746 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir]) 1747 dir_list = [] 1748 for dirname, _, filenames in os.walk(dst_dir): 1749 for filename in filenames: 1750 dir_list.append(os.path.join(dirname, filename)) 1751 dir_list = sorted(dir_list) 1752 self.assertEqual(len(dir_list), 2) 1753 self.assertEqual( 1754 os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj0'), 1755 dir_list[0]) 1756 self.assertEqual( 1757 os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj1'), 1758 dir_list[1]) 1759 1760 _CopyAndCheck() 1761 1762 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') 1763 def test_copy_object_to_dir_s3_v4(self): 1764 """Tests copying object from s3 to local dir with v4 signature. 1765 1766 Regions like us-east2 accept only V4 signature, hence we will create 1767 the bucket in us-east2 region to enforce testing with V4 signature. 1768 """ 1769 src_bucket_uri = self.CreateBucket(provider='s3', location='us-east-2') 1770 dst_dir = self.CreateTempDir() 1771 self.CreateObject(bucket_uri=src_bucket_uri, 1772 object_name='obj0', 1773 contents=b'abc') 1774 self.CreateObject(bucket_uri=src_bucket_uri, 1775 object_name='obj1', 1776 contents=b'def') 1777 1778 # Use @Retry as hedge against bucket listing eventual consistency. 1779 @Retry(AssertionError, tries=3, timeout_secs=1) 1780 def _CopyAndCheck(): 1781 """Copies the bucket recursively and validates the results.""" 1782 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir]) 1783 dir_list = [] 1784 for dirname, _, filenames in os.walk(dst_dir): 1785 for filename in filenames: 1786 dir_list.append(os.path.join(dirname, filename)) 1787 dir_list = sorted(dir_list) 1788 self.assertEqual(len(dir_list), 2) 1789 self.assertEqual( 1790 os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj0'), 1791 dir_list[0]) 1792 self.assertEqual( 1793 os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj1'), 1794 dir_list[1]) 1795 1796 _CopyAndCheck() 1797 1798 @SkipForS3('The boto lib used for S3 does not handle objects ' 1799 'starting with slashes if we use V4 signature') 1800 def test_recursive_download_with_leftover_slash_only_dir_placeholder(self): 1801 """Tests that we correctly handle leftover dir placeholders.""" 1802 src_bucket_uri = self.CreateBucket() 1803 dst_dir = self.CreateTempDir() 1804 self.CreateObject(bucket_uri=src_bucket_uri, 1805 object_name='obj0', 1806 contents=b'abc') 1807 self.CreateObject(bucket_uri=src_bucket_uri, 1808 object_name='obj1', 1809 contents=b'def') 1810 1811 # Create a placeholder like what can be left over by web GUI tools. 1812 key_uri = src_bucket_uri.clone_replace_name('/') 1813 key_uri.set_contents_from_string('') 1814 self.AssertNObjectsInBucket(src_bucket_uri, 3) 1815 1816 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir]) 1817 dir_list = [] 1818 for dirname, _, filenames in os.walk(dst_dir): 1819 for filename in filenames: 1820 dir_list.append(os.path.join(dirname, filename)) 1821 dir_list = sorted(dir_list) 1822 self.assertEqual(len(dir_list), 2) 1823 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj0'), 1824 dir_list[0]) 1825 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj1'), 1826 dir_list[1]) 1827 1828 def test_recursive_download_with_leftover_dir_placeholder(self): 1829 """Tests that we correctly handle leftover dir placeholders.""" 1830 src_bucket_uri = self.CreateBucket() 1831 dst_dir = self.CreateTempDir() 1832 self.CreateObject(bucket_uri=src_bucket_uri, 1833 object_name='obj0', 1834 contents=b'abc') 1835 self.CreateObject(bucket_uri=src_bucket_uri, 1836 object_name='obj1', 1837 contents=b'def') 1838 1839 # Create a placeholder like what can be left over by web GUI tools. 1840 key_uri = src_bucket_uri.clone_replace_name('foo/') 1841 key_uri.set_contents_from_string('') 1842 self.AssertNObjectsInBucket(src_bucket_uri, 3) 1843 1844 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir]) 1845 dir_list = [] 1846 for dirname, _, filenames in os.walk(dst_dir): 1847 for filename in filenames: 1848 dir_list.append(os.path.join(dirname, filename)) 1849 dir_list = sorted(dir_list) 1850 self.assertEqual(len(dir_list), 2) 1851 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj0'), 1852 dir_list[0]) 1853 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj1'), 1854 dir_list[1]) 1855 1856 def test_copy_quiet(self): 1857 bucket_uri = self.CreateBucket() 1858 key_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'foo') 1859 stderr = self.RunGsUtil( 1860 ['-q', 'cp', 1861 suri(key_uri), 1862 suri(bucket_uri.clone_replace_name('o2'))], 1863 return_stderr=True) 1864 self.assertEqual(stderr.count('Copying '), 0) 1865 1866 def test_cp_md5_match(self): 1867 """Tests that the uploaded object has the expected MD5. 1868 1869 Note that while this does perform a file to object upload, MD5's are 1870 not supported for composite objects so we don't use the decorator in this 1871 case. 1872 """ 1873 bucket_uri = self.CreateBucket() 1874 fpath = self.CreateTempFile(contents=b'bar') 1875 with open(fpath, 'rb') as f_in: 1876 md5 = binascii.unhexlify(CalculateMd5FromContents(f_in)) 1877 try: 1878 encoded_bytes = base64.encodebytes(md5) 1879 except AttributeError: 1880 # For Python 2 compatability. 1881 encoded_bytes = base64.encodestring(md5) 1882 file_md5 = encoded_bytes.rstrip(b'\n') 1883 self.RunGsUtil(['cp', fpath, suri(bucket_uri)]) 1884 1885 # Use @Retry as hedge against bucket listing eventual consistency. 1886 @Retry(AssertionError, tries=3, timeout_secs=1) 1887 def _Check1(): 1888 stdout = self.RunGsUtil(['ls', '-L', suri(bucket_uri)], 1889 return_stdout=True) 1890 self.assertRegex( 1891 stdout, r'Hash\s+\(md5\):\s+%s' % re.escape(file_md5.decode('ascii'))) 1892 1893 _Check1() 1894 1895 @unittest.skipIf(IS_WINDOWS, 1896 'Unicode handling on Windows requires mods to site-packages') 1897 @SequentialAndParallelTransfer 1898 def test_cp_manifest_upload_unicode(self): 1899 return self._ManifestUpload('foo-unicöde'.encode(UTF8), 1900 'bar-unicöde'.encode(UTF8), 1901 'manifest-unicöde'.encode(UTF8)) 1902 1903 @SequentialAndParallelTransfer 1904 def test_cp_manifest_upload(self): 1905 """Tests uploading with a mnifest file.""" 1906 return self._ManifestUpload('foo', 'bar', 'manifest') 1907 1908 def _ManifestUpload(self, file_name, object_name, manifest_name): 1909 """Tests uploading with a manifest file.""" 1910 bucket_uri = self.CreateBucket() 1911 dsturi = suri(bucket_uri, object_name) 1912 1913 fpath = self.CreateTempFile(file_name=file_name, contents=b'bar') 1914 logpath = self.CreateTempFile(file_name=manifest_name, contents=b'') 1915 # Ensure the file is empty. 1916 open(logpath, 'w').close() 1917 self.RunGsUtil(['cp', '-L', logpath, fpath, dsturi]) 1918 1919 with open(logpath, 'r') as f: 1920 lines = f.readlines() 1921 if six.PY2: 1922 lines = [six.text_type(line, UTF8) for line in lines] 1923 1924 self.assertEqual(len(lines), 2) 1925 1926 expected_headers = [ 1927 'Source', 'Destination', 'Start', 'End', 'Md5', 'UploadId', 1928 'Source Size', 'Bytes Transferred', 'Result', 'Description' 1929 ] 1930 self.assertEqual(expected_headers, lines[0].strip().split(',')) 1931 results = lines[1].strip().split(',') 1932 1933 results = dict(zip(expected_headers, results)) 1934 1935 self.assertEqual( 1936 results['Source'], 1937 'file://' + fpath, 1938 ) 1939 self.assertEqual( 1940 results['Destination'], 1941 dsturi, 1942 ) 1943 1944 date_format = '%Y-%m-%dT%H:%M:%S.%fZ' 1945 start_date = datetime.datetime.strptime(results['Start'], date_format) 1946 end_date = datetime.datetime.strptime(results['End'], date_format) 1947 self.assertEqual(end_date > start_date, True) 1948 1949 if self.RunGsUtil == testcase.GsUtilIntegrationTestCase.RunGsUtil: 1950 # Check that we didn't do automatic parallel uploads - compose doesn't 1951 # calculate the MD5 hash. Since RunGsUtil is overriden in 1952 # TestCpParallelUploads to force parallel uploads, we can check which 1953 # method was used. 1954 self.assertEqual(results['Md5'], 'rL0Y20zC+Fzt72VPzMSk2A==') 1955 1956 self.assertEqual(int(results['Source Size']), 3) 1957 self.assertEqual(int(results['Bytes Transferred']), 3) 1958 self.assertEqual(results['Result'], 'OK') 1959 1960 @SequentialAndParallelTransfer 1961 def test_cp_manifest_download(self): 1962 """Tests downloading with a manifest file.""" 1963 key_uri = self.CreateObject(contents=b'foo') 1964 fpath = self.CreateTempFile(contents=b'') 1965 logpath = self.CreateTempFile(contents=b'') 1966 # Ensure the file is empty. 1967 open(logpath, 'w').close() 1968 self.RunGsUtil( 1969 ['cp', '-L', logpath, suri(key_uri), fpath], return_stdout=True) 1970 with open(logpath, 'r') as f: 1971 lines = f.readlines() 1972 if six.PY3: 1973 decode_lines = [] 1974 for line in lines: 1975 if line.startswith("b'"): 1976 some_strs = line.split(',') 1977 line_parts = [] 1978 for some_str in some_strs: 1979 if some_str.startswith("b'"): 1980 line_parts.append(ast.literal_eval(some_str).decode(UTF8)) 1981 else: 1982 line_parts.append(some_str) 1983 decode_lines.append(','.join(line_parts)) 1984 else: 1985 decode_lines.append(line) 1986 lines = decode_lines 1987 self.assertEqual(len(lines), 2) 1988 1989 expected_headers = [ 1990 'Source', 'Destination', 'Start', 'End', 'Md5', 'UploadId', 1991 'Source Size', 'Bytes Transferred', 'Result', 'Description' 1992 ] 1993 self.assertEqual(expected_headers, lines[0].strip().split(',')) 1994 results = lines[1].strip().split(',') 1995 self.assertEqual(results[0][:5], '%s://' % self.default_provider) # source 1996 self.assertEqual(results[1][:7], 'file://') # destination 1997 date_format = '%Y-%m-%dT%H:%M:%S.%fZ' 1998 start_date = datetime.datetime.strptime(results[2], date_format) 1999 end_date = datetime.datetime.strptime(results[3], date_format) 2000 self.assertEqual(end_date > start_date, True) 2001 self.assertEqual(int(results[6]), 3) # Source Size 2002 # Bytes transferred might be more than 3 if the file was gzipped, since 2003 # the minimum gzip header is 10 bytes. 2004 self.assertGreaterEqual(int(results[7]), 3) # Bytes Transferred 2005 self.assertEqual(results[8], 'OK') # Result 2006 2007 @SequentialAndParallelTransfer 2008 def test_copy_unicode_non_ascii_filename(self): 2009 key_uri = self.CreateObject() 2010 # Try with and without resumable upload threshold, to ensure that each 2011 # scenario works. In particular, resumable uploads have tracker filename 2012 # logic. 2013 file_contents = b'x' * START_CALLBACK_PER_BYTES * 2 2014 fpath = self.CreateTempFile(file_name='Аудиоархив', contents=file_contents) 2015 with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', '1')]): 2016 # fpath_bytes = fpath.encode(UTF8) 2017 self.RunGsUtil(['cp', fpath, suri(key_uri)], return_stderr=True) 2018 stdout = self.RunGsUtil(['cat', suri(key_uri)], return_stdout=True) 2019 self.assertEquals(stdout.encode('ascii'), file_contents) 2020 with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', 2021 str(START_CALLBACK_PER_BYTES * 3))]): 2022 self.RunGsUtil(['cp', fpath, suri(key_uri)], return_stderr=True) 2023 stdout = self.RunGsUtil(['cat', suri(key_uri)], return_stdout=True) 2024 self.assertEquals(stdout.encode('ascii'), file_contents) 2025 2026 # Note: We originally one time implemented a test 2027 # (test_copy_invalid_unicode_filename) that invalid unicode filenames were 2028 # skipped, but it turns out os.walk() on macOS doesn't have problems with 2029 # such files (so, failed that test). Given that, we decided to remove the 2030 # test. 2031 2032 @SequentialAndParallelTransfer 2033 def test_gzip_upload_and_download(self): 2034 bucket_uri = self.CreateBucket() 2035 contents = b'x' * 10000 2036 tmpdir = self.CreateTempDir() 2037 self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents) 2038 self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents) 2039 self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents) 2040 # Test that copying specifying only 2 of the 3 prefixes gzips the correct 2041 # files, and test that including whitespace in the extension list works. 2042 self.RunGsUtil([ 2043 'cp', '-z', 'js, html', 2044 os.path.join(tmpdir, 'test.*'), 2045 suri(bucket_uri) 2046 ]) 2047 self.AssertNObjectsInBucket(bucket_uri, 3) 2048 uri1 = suri(bucket_uri, 'test.html') 2049 uri2 = suri(bucket_uri, 'test.js') 2050 uri3 = suri(bucket_uri, 'test.txt') 2051 stdout = self.RunGsUtil(['stat', uri1], return_stdout=True) 2052 self.assertRegex(stdout, r'Content-Encoding:\s+gzip') 2053 stdout = self.RunGsUtil(['stat', uri2], return_stdout=True) 2054 self.assertRegex(stdout, r'Content-Encoding:\s+gzip') 2055 stdout = self.RunGsUtil(['stat', uri3], return_stdout=True) 2056 self.assertNotRegex(stdout, r'Content-Encoding:\s+gzip') 2057 fpath4 = self.CreateTempFile() 2058 for uri in (uri1, uri2, uri3): 2059 self.RunGsUtil(['cp', uri, suri(fpath4)]) 2060 with open(fpath4, 'rb') as f: 2061 self.assertEqual(f.read(), contents) 2062 2063 @SkipForS3('No compressed transport encoding support for S3.') 2064 @SkipForXML('No compressed transport encoding support for the XML API.') 2065 @SequentialAndParallelTransfer 2066 def test_gzip_transport_encoded_upload_and_download(self): 2067 """Test gzip encoded files upload correctly. 2068 2069 This checks that files are not tagged with a gzip content encoding and 2070 that the contents of the files are uncompressed in GCS. This test uses the 2071 -j flag to target specific extensions. 2072 """ 2073 2074 def _create_test_data(): # pylint: disable=invalid-name 2075 """Setup the bucket and local data to test with. 2076 2077 Returns: 2078 Triplet containing the following values: 2079 bucket_uri: String URI of cloud storage bucket to upload mock data 2080 to. 2081 tmpdir: String, path of a temporary directory to write mock data to. 2082 local_uris: Tuple of three strings; each is the file path to a file 2083 containing mock data. 2084 """ 2085 bucket_uri = self.CreateBucket() 2086 contents = b'x' * 10000 2087 tmpdir = self.CreateTempDir() 2088 2089 local_uris = [] 2090 for filename in ('test.html', 'test.js', 'test.txt'): 2091 local_uris.append( 2092 self.CreateTempFile(file_name=filename, 2093 tmpdir=tmpdir, 2094 contents=contents)) 2095 2096 return (bucket_uri, tmpdir, local_uris) 2097 2098 def _upload_test_data(tmpdir, bucket_uri): # pylint: disable=invalid-name 2099 """Upload local test data. 2100 2101 Args: 2102 tmpdir: String, path of a temporary directory to write mock data to. 2103 bucket_uri: String URI of cloud storage bucket to upload mock data to. 2104 2105 Returns: 2106 stderr: String output from running the gsutil command to upload mock 2107 data. 2108 """ 2109 stderr = self.RunGsUtil([ 2110 '-D', 'cp', '-j', 'js, html', 2111 os.path.join(tmpdir, 'test*'), 2112 suri(bucket_uri) 2113 ], 2114 return_stderr=True) 2115 self.AssertNObjectsInBucket(bucket_uri, 3) 2116 return stderr 2117 2118 def _assert_sent_compressed(local_uris, stderr): # pylint: disable=invalid-name 2119 """Ensure the correct files were marked for compression. 2120 2121 Args: 2122 local_uris: Tuple of three strings; each is the file path to a file 2123 containing mock data. 2124 stderr: String output from running the gsutil command to upload mock 2125 data. 2126 """ 2127 local_uri_html, local_uri_js, local_uri_txt = local_uris 2128 assert_base_string = 'Using compressed transport encoding for file://{}.' 2129 self.assertIn(assert_base_string.format(local_uri_html), stderr) 2130 self.assertIn(assert_base_string.format(local_uri_js), stderr) 2131 self.assertNotIn(assert_base_string.format(local_uri_txt), stderr) 2132 2133 def _assert_stored_uncompressed(bucket_uri, contents=b'x' * 10000): # pylint: disable=invalid-name 2134 """Ensure the files are not compressed when they are stored in the bucket. 2135 2136 Args: 2137 bucket_uri: String with URI for bucket containing uploaded test data. 2138 contents: Byte string that are stored in each file in the bucket. 2139 """ 2140 local_uri_html = suri(bucket_uri, 'test.html') 2141 local_uri_js = suri(bucket_uri, 'test.js') 2142 local_uri_txt = suri(bucket_uri, 'test.txt') 2143 fpath4 = self.CreateTempFile() 2144 2145 for uri in (local_uri_html, local_uri_js, local_uri_txt): 2146 stdout = self.RunGsUtil(['stat', uri], return_stdout=True) 2147 self.assertNotRegex(stdout, r'Content-Encoding:\s+gzip') 2148 self.RunGsUtil(['cp', uri, suri(fpath4)]) 2149 with open(fpath4, 'rb') as f: 2150 self.assertEqual(f.read(), contents) 2151 2152 # Get mock data, run tests 2153 bucket_uri, tmpdir, local_uris = _create_test_data() 2154 stderr = _upload_test_data(tmpdir, bucket_uri) 2155 _assert_sent_compressed(local_uris, stderr) 2156 _assert_stored_uncompressed(bucket_uri) 2157 2158 @SkipForS3('No compressed transport encoding support for S3.') 2159 @SkipForXML('No compressed transport encoding support for the XML API.') 2160 @SequentialAndParallelTransfer 2161 def test_gzip_transport_encoded_parallel_upload_non_resumable(self): 2162 """Test non resumable, gzip encoded files upload correctly in parallel. 2163 2164 This test generates a small amount of data (e.g. 100 chars) to upload. 2165 Due to the small size, it will be below the resumable threshold, 2166 and test the behavior of non-resumable uploads. 2167 """ 2168 # Setup the bucket and local data. 2169 bucket_uri = self.CreateBucket() 2170 contents = b'x' * 100 2171 tmpdir = self.CreateTempDir(test_files=10, contents=contents) 2172 # Upload the data. 2173 with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2174 ]): 2175 stderr = self.RunGsUtil( 2176 ['-D', '-m', 'cp', '-J', '-r', tmpdir, 2177 suri(bucket_uri)], 2178 return_stderr=True) 2179 # Ensure all objects are uploaded. 2180 self.AssertNObjectsInBucket(bucket_uri, 10) 2181 # Ensure the progress logger sees a gzip encoding. 2182 self.assertIn('send: Using gzip transport encoding for the request.', 2183 stderr) 2184 2185 @SkipForS3('No compressed transport encoding support for S3.') 2186 @SkipForXML('No compressed transport encoding support for the XML API.') 2187 @SequentialAndParallelTransfer 2188 def test_gzip_transport_encoded_parallel_upload_resumable(self): 2189 """Test resumable, gzip encoded files upload correctly in parallel. 2190 2191 This test generates a large amount of data (e.g. halt_size amount of chars) 2192 to upload. Due to the large size, it will be above the resumable threshold, 2193 and test the behavior of resumable uploads. 2194 """ 2195 # Setup the bucket and local data. 2196 bucket_uri = self.CreateBucket() 2197 contents = get_random_ascii_chars(size=self.halt_size) 2198 tmpdir = self.CreateTempDir(test_files=10, contents=contents) 2199 # Upload the data. 2200 with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2201 ]): 2202 stderr = self.RunGsUtil( 2203 ['-D', '-m', 'cp', '-J', '-r', tmpdir, 2204 suri(bucket_uri)], 2205 return_stderr=True) 2206 # Ensure all objects are uploaded. 2207 self.AssertNObjectsInBucket(bucket_uri, 10) 2208 # Ensure the progress logger sees a gzip encoding. 2209 self.assertIn('send: Using gzip transport encoding for the request.', 2210 stderr) 2211 2212 @SequentialAndParallelTransfer 2213 def test_gzip_all_upload_and_download(self): 2214 bucket_uri = self.CreateBucket() 2215 contents = b'x' * 10000 2216 tmpdir = self.CreateTempDir() 2217 self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents) 2218 self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents) 2219 self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents) 2220 self.CreateTempFile(file_name='test', tmpdir=tmpdir, contents=contents) 2221 # Test that all files are compressed. 2222 self.RunGsUtil( 2223 ['cp', '-Z', 2224 os.path.join(tmpdir, 'test*'), 2225 suri(bucket_uri)]) 2226 self.AssertNObjectsInBucket(bucket_uri, 4) 2227 uri1 = suri(bucket_uri, 'test.html') 2228 uri2 = suri(bucket_uri, 'test.js') 2229 uri3 = suri(bucket_uri, 'test.txt') 2230 uri4 = suri(bucket_uri, 'test') 2231 stdout = self.RunGsUtil(['stat', uri1], return_stdout=True) 2232 self.assertRegex(stdout, r'Content-Encoding:\s+gzip') 2233 stdout = self.RunGsUtil(['stat', uri2], return_stdout=True) 2234 self.assertRegex(stdout, r'Content-Encoding:\s+gzip') 2235 stdout = self.RunGsUtil(['stat', uri3], return_stdout=True) 2236 self.assertRegex(stdout, r'Content-Encoding:\s+gzip') 2237 stdout = self.RunGsUtil(['stat', uri4], return_stdout=True) 2238 self.assertRegex(stdout, r'Content-Encoding:\s+gzip') 2239 fpath4 = self.CreateTempFile() 2240 for uri in (uri1, uri2, uri3, uri4): 2241 self.RunGsUtil(['cp', uri, suri(fpath4)]) 2242 with open(fpath4, 'rb') as f: 2243 self.assertEqual(f.read(), contents) 2244 2245 @SkipForS3('No compressed transport encoding support for S3.') 2246 @SkipForXML('No compressed transport encoding support for the XML API.') 2247 @SequentialAndParallelTransfer 2248 def test_gzip_transport_encoded_all_upload_and_download(self): 2249 """Test gzip encoded files upload correctly. 2250 2251 This checks that files are not tagged with a gzip content encoding and 2252 that the contents of the files are uncompressed in GCS. This test uses the 2253 -J flag to target all files. 2254 """ 2255 # Setup the bucket and local data. 2256 bucket_uri = self.CreateBucket() 2257 contents = b'x' * 10000 2258 tmpdir = self.CreateTempDir() 2259 local_uri1 = self.CreateTempFile(file_name='test.txt', 2260 tmpdir=tmpdir, 2261 contents=contents) 2262 local_uri2 = self.CreateTempFile(file_name='test', 2263 tmpdir=tmpdir, 2264 contents=contents) 2265 # Upload the data. 2266 stderr = self.RunGsUtil( 2267 ['-D', 'cp', '-J', 2268 os.path.join(tmpdir, 'test*'), 2269 suri(bucket_uri)], 2270 return_stderr=True) 2271 self.AssertNObjectsInBucket(bucket_uri, 2) 2272 # Ensure the correct files were marked for compression. 2273 self.assertIn( 2274 'Using compressed transport encoding for file://%s.' % (local_uri1), 2275 stderr) 2276 self.assertIn( 2277 'Using compressed transport encoding for file://%s.' % (local_uri2), 2278 stderr) 2279 # Ensure the progress logger sees a gzip encoding. 2280 self.assertIn('send: Using gzip transport encoding for the request.', 2281 stderr) 2282 # Ensure the files do not have a stored encoding of gzip and are stored 2283 # uncompressed. 2284 remote_uri1 = suri(bucket_uri, 'test.txt') 2285 remote_uri2 = suri(bucket_uri, 'test') 2286 fpath4 = self.CreateTempFile() 2287 for uri in (remote_uri1, remote_uri2): 2288 stdout = self.RunGsUtil(['stat', uri], return_stdout=True) 2289 self.assertNotRegex(stdout, r'Content-Encoding:\s+gzip') 2290 self.RunGsUtil(['cp', uri, suri(fpath4)]) 2291 with open(fpath4, 'rb') as f: 2292 self.assertEqual(f.read(), contents) 2293 2294 def test_both_gzip_options_error(self): 2295 """Test that mixing compression flags error.""" 2296 cases = ( 2297 # Test with -Z and -z 2298 ['cp', '-Z', '-z', 'html, js', 'a.js', 'b.js'], 2299 # Same test, but with arguments in the opposite order. 2300 ['cp', '-z', 'html, js', '-Z', 'a.js', 'b.js']) 2301 2302 for case in cases: 2303 stderr = self.RunGsUtil(case, return_stderr=True, expected_status=1) 2304 self.assertIn('CommandException', stderr) 2305 self.assertIn( 2306 'Specifying both the -z and -Z options together is invalid.', stderr) 2307 2308 def test_both_gzip_transport_encoding_options_error(self): 2309 """Test that mixing transport encoding flags error.""" 2310 cases = ( 2311 # Test with -J and -j 2312 ['cp', '-J', '-j', 'html, js', 'a.js', 'b.js'], 2313 # Same test, but with arguments in the opposite order. 2314 ['cp', '-j', 'html, js', '-J', 'a.js', 'b.js']) 2315 2316 for case in cases: 2317 stderr = self.RunGsUtil(case, return_stderr=True, expected_status=1) 2318 self.assertIn('CommandException', stderr) 2319 self.assertIn( 2320 'Specifying both the -j and -J options together is invalid.', stderr) 2321 2322 def test_combined_gzip_options_error(self): 2323 """Test that mixing transport encoding and compression flags error.""" 2324 cases = (['cp', '-Z', '-j', 'html, js', 'a.js', 2325 'b.js'], ['cp', '-J', '-z', 'html, js', 'a.js', 2326 'b.js'], ['cp', '-j', 'html, js', '-Z', 'a.js', 'b.js'], 2327 ['cp', '-z', 'html, js', '-J', 'a.js', 'b.js']) 2328 2329 for case in cases: 2330 stderr = self.RunGsUtil(case, return_stderr=True, expected_status=1) 2331 self.assertIn('CommandException', stderr) 2332 self.assertIn( 2333 'Specifying both the -j/-J and -z/-Z options together is invalid.', 2334 stderr) 2335 2336 def test_upload_with_subdir_and_unexpanded_wildcard(self): 2337 fpath1 = self.CreateTempFile(file_name=('tmp', 'x', 'y', 'z')) 2338 bucket_uri = self.CreateBucket() 2339 wildcard_uri = '%s*' % fpath1[:-5] 2340 stderr = self.RunGsUtil( 2341 ['cp', '-R', wildcard_uri, suri(bucket_uri)], return_stderr=True) 2342 self.assertIn('Copying file:', stderr) 2343 self.AssertNObjectsInBucket(bucket_uri, 1) 2344 2345 @SequentialAndParallelTransfer 2346 def test_cp_object_ending_with_slash(self): 2347 """Tests that cp works with object names ending with slash.""" 2348 tmpdir = self.CreateTempDir() 2349 bucket_uri = self.CreateBucket() 2350 self.CreateObject(bucket_uri=bucket_uri, 2351 object_name='abc/', 2352 contents=b'dir') 2353 self.CreateObject(bucket_uri=bucket_uri, 2354 object_name='abc/def', 2355 contents=b'def') 2356 self.AssertNObjectsInBucket(bucket_uri, 2) 2357 self.RunGsUtil(['cp', '-R', suri(bucket_uri), tmpdir]) 2358 # Check that files in the subdir got copied even though subdir object 2359 # download was skipped. 2360 with open(os.path.join(tmpdir, bucket_uri.bucket_name, 'abc', 'def')) as f: 2361 self.assertEquals('def', '\n'.join(f.readlines())) 2362 2363 def test_cp_without_read_access(self): 2364 """Tests that cp fails without read access to the object.""" 2365 # TODO: With 401's triggering retries in apitools, this test will take 2366 # a long time. Ideally, make apitools accept a num_retries config for this 2367 # until we stop retrying the 401's. 2368 bucket_uri = self.CreateBucket() 2369 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'foo') 2370 2371 # Use @Retry as hedge against bucket listing eventual consistency. 2372 self.AssertNObjectsInBucket(bucket_uri, 1) 2373 2374 if self.default_provider == 's3': 2375 expected_error_regex = r'AccessDenied' 2376 else: 2377 expected_error_regex = r'Anonymous \S+ do(es)? not have' 2378 2379 with self.SetAnonymousBotoCreds(): 2380 stderr = self.RunGsUtil(['cp', suri(object_uri), 'foo'], 2381 return_stderr=True, 2382 expected_status=1) 2383 self.assertRegex(stderr, expected_error_regex) 2384 2385 @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.') 2386 def test_cp_minus_r_minus_e(self): 2387 """Tests that cp -e -r ignores symlinks when recursing.""" 2388 bucket_uri = self.CreateBucket() 2389 tmpdir = self.CreateTempDir() 2390 # Create a valid file, since cp expects to copy at least one source URL 2391 # successfully. 2392 self.CreateTempFile(tmpdir=tmpdir, contents=b'foo') 2393 subdir = os.path.join(tmpdir, 'subdir') 2394 os.mkdir(subdir) 2395 os.mkdir(os.path.join(tmpdir, 'missing')) 2396 # Create a blank directory that is a broken symlink to ensure that we 2397 # don't fail recursive enumeration with a bad symlink. 2398 os.symlink(os.path.join(tmpdir, 'missing'), os.path.join(subdir, 'missing')) 2399 os.rmdir(os.path.join(tmpdir, 'missing')) 2400 self.RunGsUtil(['cp', '-r', '-e', tmpdir, suri(bucket_uri)]) 2401 2402 @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.') 2403 def test_cp_minus_e(self): 2404 fpath_dir = self.CreateTempDir() 2405 fpath1 = self.CreateTempFile(tmpdir=fpath_dir) 2406 fpath2 = os.path.join(fpath_dir, 'cp_minus_e') 2407 bucket_uri = self.CreateBucket() 2408 os.symlink(fpath1, fpath2) 2409 # We also use -c to continue on errors. One of the expanded glob entries 2410 # should be the symlinked file, which should throw a CommandException since 2411 # no valid (non-symlinked) files could be found at that path; we don't want 2412 # the command to terminate if that's the first file we attempt to copy. 2413 stderr = self.RunGsUtil([ 2414 'cp', '-e', '-c', 2415 '%s%s*' % (fpath_dir, os.path.sep), 2416 suri(bucket_uri, 'files') 2417 ], 2418 return_stderr=True) 2419 self.assertIn('Copying file', stderr) 2420 self.assertIn('Skipping symbolic link', stderr) 2421 2422 # Ensure that top-level arguments are ignored if they are symlinks. The file 2423 # at fpath1 should be successfully copied, then copying the symlink at 2424 # fpath2 should fail. 2425 stderr = self.RunGsUtil( 2426 ['cp', '-e', '-r', fpath1, fpath2, 2427 suri(bucket_uri, 'files')], 2428 return_stderr=True, 2429 expected_status=1) 2430 self.assertIn('Copying file', stderr) 2431 self.assertIn('Skipping symbolic link', stderr) 2432 self.assertIn('CommandException: No URLs matched: %s' % fpath2, stderr) 2433 2434 def test_cp_multithreaded_wildcard(self): 2435 """Tests that cp -m works with a wildcard.""" 2436 num_test_files = 5 2437 tmp_dir = self.CreateTempDir(test_files=num_test_files) 2438 bucket_uri = self.CreateBucket() 2439 wildcard_uri = '%s%s*' % (tmp_dir, os.sep) 2440 self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)]) 2441 self.AssertNObjectsInBucket(bucket_uri, num_test_files) 2442 2443 @SequentialAndParallelTransfer 2444 def test_cp_duplicate_source_args(self): 2445 """Tests that cp -m works when a source argument is provided twice.""" 2446 object_contents = b'edge' 2447 object_uri = self.CreateObject(object_name='foo', contents=object_contents) 2448 tmp_dir = self.CreateTempDir() 2449 self.RunGsUtil(['-m', 'cp', suri(object_uri), suri(object_uri), tmp_dir]) 2450 with open(os.path.join(tmp_dir, 'foo'), 'rb') as in_fp: 2451 contents = in_fp.read() 2452 # Contents should be not duplicated. 2453 self.assertEqual(contents, object_contents) 2454 2455 @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.') 2456 @SequentialAndParallelTransfer 2457 def test_cp_download_encrypted_object(self): 2458 """Tests downloading an encrypted object.""" 2459 if self.test_api == ApiSelector.XML: 2460 return unittest.skip( 2461 'gsutil does not support encryption with the XML API') 2462 object_contents = b'bar' 2463 object_uri = self.CreateObject(object_name='foo', 2464 contents=object_contents, 2465 encryption_key=TEST_ENCRYPTION_KEY1) 2466 fpath = self.CreateTempFile() 2467 boto_config_for_test = [('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 2468 2469 with SetBotoConfigForTest(boto_config_for_test): 2470 self.RunGsUtil(['cp', suri(object_uri), suri(fpath)]) 2471 with open(fpath, 'rb') as f: 2472 self.assertEqual(f.read(), object_contents) 2473 2474 # If multiple keys are supplied and one is correct, download should succeed. 2475 fpath2 = self.CreateTempFile() 2476 boto_config_for_test2 = [ 2477 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY3), 2478 ('GSUtil', 'decryption_key1', TEST_ENCRYPTION_KEY2), 2479 ('GSUtil', 'decryption_key2', TEST_ENCRYPTION_KEY1) 2480 ] 2481 2482 with SetBotoConfigForTest(boto_config_for_test2): 2483 self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)]) 2484 with open(fpath2, 'rb') as f: 2485 self.assertEqual(f.read(), object_contents) 2486 2487 @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.') 2488 @SequentialAndParallelTransfer 2489 def test_cp_download_encrypted_object_without_key(self): 2490 """Tests downloading an encrypted object without the necessary key.""" 2491 if self.test_api == ApiSelector.XML: 2492 return unittest.skip( 2493 'gsutil does not support encryption with the XML API') 2494 object_contents = b'bar' 2495 object_uri = self.CreateObject(object_name='foo', 2496 contents=object_contents, 2497 encryption_key=TEST_ENCRYPTION_KEY1) 2498 fpath = self.CreateTempFile() 2499 2500 stderr = self.RunGsUtil( 2501 ['cp', suri(object_uri), suri(fpath)], 2502 expected_status=1, 2503 return_stderr=True) 2504 self.assertIn( 2505 'Missing decryption key with SHA256 hash %s' % 2506 TEST_ENCRYPTION_KEY1_SHA256_B64, stderr) 2507 2508 @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.') 2509 @SequentialAndParallelTransfer 2510 def test_cp_upload_encrypted_object(self): 2511 """Tests uploading an encrypted object.""" 2512 if self.test_api == ApiSelector.XML: 2513 return unittest.skip( 2514 'gsutil does not support encryption with the XML API') 2515 bucket_uri = self.CreateBucket() 2516 object_uri = suri(bucket_uri, 'foo') 2517 file_contents = b'bar' 2518 fpath = self.CreateTempFile(contents=file_contents, file_name='foo') 2519 2520 boto_config_for_test = [('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 2521 2522 # Uploading the object should succeed. 2523 with SetBotoConfigForTest(boto_config_for_test): 2524 self.RunGsUtil(['cp', suri(fpath), suri(bucket_uri)]) 2525 2526 self.AssertObjectUsesCSEK(object_uri, TEST_ENCRYPTION_KEY1) 2527 2528 with SetBotoConfigForTest(boto_config_for_test): 2529 # Reading the object back should succeed. 2530 fpath2 = self.CreateTempFile() 2531 self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), suri(fpath2)]) 2532 with open(fpath2, 'rb') as f: 2533 self.assertEqual(f.read(), file_contents) 2534 2535 @SkipForS3('No resumable upload or encryption support for S3.') 2536 def test_cp_resumable_upload_encrypted_object_break(self): 2537 """Tests that an encrypted upload resumes after a connection break.""" 2538 if self.test_api == ApiSelector.XML: 2539 return unittest.skip( 2540 'gsutil does not support encryption with the XML API') 2541 bucket_uri = self.CreateBucket() 2542 object_uri_str = suri(bucket_uri, 'foo') 2543 fpath = self.CreateTempFile(contents=b'a' * self.halt_size) 2544 boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2545 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 2546 test_callback_file = self.CreateTempFile( 2547 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 2548 2549 with SetBotoConfigForTest(boto_config_for_test): 2550 stderr = self.RunGsUtil([ 2551 'cp', '--testcallbackfile', test_callback_file, fpath, object_uri_str 2552 ], 2553 expected_status=1, 2554 return_stderr=True) 2555 self.assertIn('Artifically halting upload', stderr) 2556 stderr = self.RunGsUtil(['cp', fpath, object_uri_str], return_stderr=True) 2557 self.assertIn('Resuming upload', stderr) 2558 stdout = self.RunGsUtil(['stat', object_uri_str], return_stdout=True) 2559 with open(fpath, 'rb') as fp: 2560 self.assertIn(CalculateB64EncodedMd5FromContents(fp), stdout) 2561 2562 self.AssertObjectUsesCSEK(object_uri_str, TEST_ENCRYPTION_KEY1) 2563 2564 @SkipForS3('No resumable upload or encryption support for S3.') 2565 def test_cp_resumable_upload_encrypted_object_different_key(self): 2566 """Tests that an encrypted upload resume uses original encryption key.""" 2567 if self.test_api == ApiSelector.XML: 2568 return unittest.skip( 2569 'gsutil does not support encryption with the XML API') 2570 bucket_uri = self.CreateBucket() 2571 object_uri_str = suri(bucket_uri, 'foo') 2572 file_contents = b'a' * self.halt_size 2573 fpath = self.CreateTempFile(contents=file_contents) 2574 boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2575 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 2576 test_callback_file = self.CreateTempFile( 2577 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 2578 2579 with SetBotoConfigForTest(boto_config_for_test): 2580 stderr = self.RunGsUtil([ 2581 'cp', '--testcallbackfile', test_callback_file, fpath, object_uri_str 2582 ], 2583 expected_status=1, 2584 return_stderr=True) 2585 self.assertIn('Artifically halting upload', stderr) 2586 2587 # Resume the upload with multiple keys, including the original. 2588 boto_config_for_test2 = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2589 ('GSUtil', 'decryption_key1', 2590 TEST_ENCRYPTION_KEY2), 2591 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 2592 2593 with SetBotoConfigForTest(boto_config_for_test2): 2594 stderr = self.RunGsUtil(['cp', fpath, object_uri_str], return_stderr=True) 2595 self.assertIn('Resuming upload', stderr) 2596 2597 # Object should have the original key. 2598 self.AssertObjectUsesCSEK(object_uri_str, TEST_ENCRYPTION_KEY1) 2599 2600 @SkipForS3('No resumable upload or encryption support for S3.') 2601 def test_cp_resumable_upload_encrypted_object_missing_key(self): 2602 """Tests that an encrypted upload does not resume without original key.""" 2603 if self.test_api == ApiSelector.XML: 2604 return unittest.skip( 2605 'gsutil does not support encryption with the XML API') 2606 bucket_uri = self.CreateBucket() 2607 object_uri_str = suri(bucket_uri, 'foo') 2608 file_contents = b'a' * self.halt_size 2609 fpath = self.CreateTempFile(contents=file_contents) 2610 boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2611 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 2612 test_callback_file = self.CreateTempFile( 2613 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 2614 2615 with SetBotoConfigForTest(boto_config_for_test): 2616 stderr = self.RunGsUtil([ 2617 'cp', '--testcallbackfile', test_callback_file, fpath, object_uri_str 2618 ], 2619 expected_status=1, 2620 return_stderr=True) 2621 self.assertIn('Artifically halting upload', stderr) 2622 2623 # Resume the upload without the original key. 2624 boto_config_for_test2 = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2625 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)] 2626 2627 with SetBotoConfigForTest(boto_config_for_test2): 2628 stderr = self.RunGsUtil(['cp', fpath, object_uri_str], return_stderr=True) 2629 self.assertNotIn('Resuming upload', stderr) 2630 self.assertIn('does not match current encryption key', stderr) 2631 self.assertIn('Restarting upload from scratch', stderr) 2632 2633 # Object should have the new key. 2634 self.AssertObjectUsesCSEK(object_uri_str, TEST_ENCRYPTION_KEY2) 2635 2636 def _ensure_object_unencrypted(self, object_uri_str): 2637 """Strongly consistent check that the object is unencrypted.""" 2638 stdout = self.RunGsUtil(['stat', object_uri_str], return_stdout=True) 2639 self.assertNotIn('Encryption Key', stdout) 2640 2641 @SkipForS3('No resumable upload support for S3.') 2642 def test_cp_resumable_upload_break(self): 2643 """Tests that an upload can be resumed after a connection break.""" 2644 bucket_uri = self.CreateBucket() 2645 fpath = self.CreateTempFile(contents=b'a' * self.halt_size) 2646 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2647 test_callback_file = self.CreateTempFile( 2648 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 2649 2650 with SetBotoConfigForTest([boto_config_for_test]): 2651 stderr = self.RunGsUtil([ 2652 'cp', '--testcallbackfile', test_callback_file, fpath, 2653 suri(bucket_uri) 2654 ], 2655 expected_status=1, 2656 return_stderr=True) 2657 self.assertIn('Artifically halting upload', stderr) 2658 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2659 return_stderr=True) 2660 self.assertIn('Resuming upload', stderr) 2661 2662 @SkipForS3('No compressed transport encoding support for S3.') 2663 @SkipForXML('No compressed transport encoding support for the XML API.') 2664 @SequentialAndParallelTransfer 2665 def test_cp_resumable_upload_gzip_encoded_break(self): 2666 """Tests that a gzip encoded upload can be resumed.""" 2667 # Setup the bucket and local data. File contents are randomized to prevent 2668 # them from compressing below the resumable-threshold and failing the test. 2669 bucket_uri = self.CreateBucket() 2670 contents = get_random_ascii_chars(size=self.halt_size) 2671 local_uri = self.CreateTempFile(file_name='test.txt', contents=contents) 2672 # Configure boto 2673 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2674 test_callback_file = self.CreateTempFile( 2675 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 2676 2677 with SetBotoConfigForTest([boto_config_for_test]): 2678 stderr = self.RunGsUtil([ 2679 '-D', 'cp', '-J', '--testcallbackfile', test_callback_file, local_uri, 2680 suri(bucket_uri) 2681 ], 2682 expected_status=1, 2683 return_stderr=True) 2684 # Ensure the progress logger sees a gzip encoding. 2685 self.assertIn('send: Using gzip transport encoding for the request.', 2686 stderr) 2687 self.assertIn('Artifically halting upload', stderr) 2688 stderr = self.RunGsUtil(['-D', 'cp', '-J', local_uri, 2689 suri(bucket_uri)], 2690 return_stderr=True) 2691 self.assertIn('Resuming upload', stderr) 2692 # Ensure the progress logger is still seeing a gzip encoding. 2693 self.assertIn('send: Using gzip transport encoding for the request.', 2694 stderr) 2695 # Ensure the files do not have a stored encoding of gzip and are stored 2696 # uncompressed. 2697 temp_uri = self.CreateTempFile() 2698 remote_uri = suri(bucket_uri, 'test.txt') 2699 stdout = self.RunGsUtil(['stat', remote_uri], return_stdout=True) 2700 self.assertNotRegex(stdout, r'Content-Encoding:\s+gzip') 2701 self.RunGsUtil(['cp', remote_uri, suri(temp_uri)]) 2702 with open(temp_uri, 'rb') as f: 2703 self.assertEqual(f.read(), contents) 2704 2705 @SkipForS3('No resumable upload support for S3.') 2706 def test_cp_resumable_upload_retry(self): 2707 """Tests that a resumable upload completes with one retry.""" 2708 bucket_uri = self.CreateBucket() 2709 fpath = self.CreateTempFile(contents=b'a' * self.halt_size) 2710 # TODO: Raising an httplib or socket error blocks bucket teardown 2711 # in JSON for 60-120s on a multiprocessing lock acquire. Figure out why; 2712 # until then, raise an apitools retryable exception. 2713 if self.test_api == ApiSelector.XML: 2714 test_callback_file = self.CreateTempFile(contents=pickle.dumps( 2715 _ResumableUploadRetryHandler(5, http_client.BadStatusLine, ( 2716 'unused',)))) 2717 else: 2718 test_callback_file = self.CreateTempFile(contents=pickle.dumps( 2719 _ResumableUploadRetryHandler( 2720 5, apitools_exceptions.BadStatusCodeError, ('unused', 'unused', 2721 'unused')))) 2722 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2723 with SetBotoConfigForTest([boto_config_for_test]): 2724 stderr = self.RunGsUtil([ 2725 '-D', 'cp', '--testcallbackfile', test_callback_file, fpath, 2726 suri(bucket_uri) 2727 ], 2728 return_stderr=1) 2729 if self.test_api == ApiSelector.XML: 2730 self.assertIn('Got retryable failure', stderr) 2731 else: 2732 self.assertIn('Retrying', stderr) 2733 2734 @SkipForS3('No resumable upload support for S3.') 2735 def test_cp_resumable_streaming_upload_retry(self): 2736 """Tests that a streaming resumable upload completes with one retry.""" 2737 if self.test_api == ApiSelector.XML: 2738 return unittest.skip('XML does not support resumable streaming uploads.') 2739 bucket_uri = self.CreateBucket() 2740 2741 test_callback_file = self.CreateTempFile(contents=pickle.dumps( 2742 _ResumableUploadRetryHandler(5, apitools_exceptions.BadStatusCodeError, 2743 ('unused', 'unused', 'unused')))) 2744 # Need to reduce the JSON chunk size since streaming uploads buffer a 2745 # full chunk. 2746 boto_configs_for_test = [('GSUtil', 'json_resumable_chunk_size', 2747 str(256 * ONE_KIB)), ('Boto', 'num_retries', '2')] 2748 with SetBotoConfigForTest(boto_configs_for_test): 2749 stderr = self.RunGsUtil([ 2750 '-D', 'cp', '--testcallbackfile', test_callback_file, '-', 2751 suri(bucket_uri, 'foo') 2752 ], 2753 stdin='a' * 512 * ONE_KIB, 2754 return_stderr=1) 2755 self.assertIn('Retrying', stderr) 2756 2757 @SkipForS3('preserve_acl flag not supported for S3.') 2758 def test_cp_preserve_no_owner(self): 2759 bucket_uri = self.CreateBucket() 2760 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'foo') 2761 # Anonymous user can read the object and write to the bucket, but does 2762 # not own the object. 2763 self.RunGsUtil(['acl', 'ch', '-u', 'AllUsers:R', suri(object_uri)]) 2764 self.RunGsUtil(['acl', 'ch', '-u', 'AllUsers:W', suri(bucket_uri)]) 2765 with self.SetAnonymousBotoCreds(): 2766 stderr = self.RunGsUtil( 2767 ['cp', '-p', suri(object_uri), 2768 suri(bucket_uri, 'foo')], 2769 return_stderr=True, 2770 expected_status=1) 2771 self.assertIn('OWNER permission is required for preserving ACLs', stderr) 2772 2773 @SkipForS3('No resumable upload support for S3.') 2774 def test_cp_progress_callbacks(self): 2775 bucket_uri = self.CreateBucket() 2776 final_size_string = BytesToFixedWidthString(1024**2) 2777 final_progress_callback = final_size_string + '/' + final_size_string 2778 fpath = self.CreateTempFile(contents=b'a' * ONE_MIB, file_name='foo') 2779 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2780 with SetBotoConfigForTest([boto_config_for_test]): 2781 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2782 return_stderr=True) 2783 self.assertEquals(1, stderr.count(final_progress_callback)) 2784 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(2 * ONE_MIB)) 2785 with SetBotoConfigForTest([boto_config_for_test]): 2786 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2787 return_stderr=True) 2788 self.assertEquals(1, stderr.count(final_progress_callback)) 2789 stderr = self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), fpath], 2790 return_stderr=True) 2791 self.assertEquals(1, stderr.count(final_progress_callback)) 2792 2793 @SkipForS3('No resumable upload support for S3.') 2794 def test_cp_resumable_upload(self): 2795 """Tests that a basic resumable upload completes successfully.""" 2796 bucket_uri = self.CreateBucket() 2797 fpath = self.CreateTempFile(contents=b'a' * self.halt_size) 2798 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2799 with SetBotoConfigForTest([boto_config_for_test]): 2800 self.RunGsUtil(['cp', fpath, suri(bucket_uri)]) 2801 2802 @SkipForS3('No resumable upload support for S3.') 2803 def test_resumable_upload_break_leaves_tracker(self): 2804 """Tests that a tracker file is created with a resumable upload.""" 2805 bucket_uri = self.CreateBucket() 2806 fpath = self.CreateTempFile(file_name='foo', contents=b'a' * self.halt_size) 2807 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2808 with SetBotoConfigForTest([boto_config_for_test]): 2809 tracker_filename = GetTrackerFilePath( 2810 StorageUrlFromString(suri(bucket_uri, 'foo')), TrackerFileType.UPLOAD, 2811 self.test_api) 2812 test_callback_file = self.CreateTempFile( 2813 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 2814 try: 2815 stderr = self.RunGsUtil([ 2816 'cp', '--testcallbackfile', test_callback_file, fpath, 2817 suri(bucket_uri, 'foo') 2818 ], 2819 expected_status=1, 2820 return_stderr=True) 2821 self.assertIn('Artifically halting upload', stderr) 2822 self.assertTrue(os.path.exists(tracker_filename), 2823 'Tracker file %s not present.' % tracker_filename) 2824 # Test the permissions 2825 if os.name == 'posix': 2826 mode = oct(stat.S_IMODE(os.stat(tracker_filename).st_mode)) 2827 # Assert that only user has read/write permission 2828 self.assertEqual(oct(0o600), mode) 2829 finally: 2830 DeleteTrackerFile(tracker_filename) 2831 2832 @SkipForS3('No resumable upload support for S3.') 2833 def test_cp_resumable_upload_break_file_size_change(self): 2834 """Tests a resumable upload where the uploaded file changes size. 2835 2836 This should fail when we read the tracker data. 2837 """ 2838 bucket_uri = self.CreateBucket() 2839 tmp_dir = self.CreateTempDir() 2840 fpath = self.CreateTempFile(file_name='foo', 2841 tmpdir=tmp_dir, 2842 contents=b'a' * self.halt_size) 2843 test_callback_file = self.CreateTempFile( 2844 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 2845 2846 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2847 with SetBotoConfigForTest([boto_config_for_test]): 2848 stderr = self.RunGsUtil([ 2849 'cp', '--testcallbackfile', test_callback_file, fpath, 2850 suri(bucket_uri) 2851 ], 2852 expected_status=1, 2853 return_stderr=True) 2854 self.assertIn('Artifically halting upload', stderr) 2855 fpath = self.CreateTempFile(file_name='foo', 2856 tmpdir=tmp_dir, 2857 contents=b'a' * self.halt_size * 2) 2858 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2859 expected_status=1, 2860 return_stderr=True) 2861 self.assertIn('ResumableUploadAbortException', stderr) 2862 2863 @SkipForS3('No resumable upload support for S3.') 2864 def test_cp_resumable_upload_break_file_content_change(self): 2865 """Tests a resumable upload where the uploaded file changes content.""" 2866 if self.test_api == ApiSelector.XML: 2867 return unittest.skip( 2868 'XML doesn\'t make separate HTTP calls at fixed-size boundaries for ' 2869 'resumable uploads, so we can\'t guarantee that the server saves a ' 2870 'specific part of the upload.') 2871 bucket_uri = self.CreateBucket() 2872 tmp_dir = self.CreateTempDir() 2873 fpath = self.CreateTempFile(file_name='foo', 2874 tmpdir=tmp_dir, 2875 contents=b'a' * ONE_KIB * 512) 2876 test_callback_file = self.CreateTempFile(contents=pickle.dumps( 2877 HaltingCopyCallbackHandler(True, 2878 int(ONE_KIB) * 384))) 2879 resumable_threshold_for_test = ('GSUtil', 'resumable_threshold', 2880 str(ONE_KIB)) 2881 resumable_chunk_size_for_test = ('GSUtil', 'json_resumable_chunk_size', 2882 str(ONE_KIB * 256)) 2883 with SetBotoConfigForTest( 2884 [resumable_threshold_for_test, resumable_chunk_size_for_test]): 2885 stderr = self.RunGsUtil([ 2886 'cp', '--testcallbackfile', test_callback_file, fpath, 2887 suri(bucket_uri) 2888 ], 2889 expected_status=1, 2890 return_stderr=True) 2891 self.assertIn('Artifically halting upload', stderr) 2892 fpath = self.CreateTempFile(file_name='foo', 2893 tmpdir=tmp_dir, 2894 contents=b'b' * ONE_KIB * 512) 2895 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2896 expected_status=1, 2897 return_stderr=True) 2898 self.assertIn('doesn\'t match cloud-supplied digest', stderr) 2899 2900 @SkipForS3('No resumable upload support for S3.') 2901 def test_cp_resumable_upload_break_file_smaller_size(self): 2902 """Tests a resumable upload where the uploaded file changes content. 2903 2904 This should fail hash validation. 2905 """ 2906 bucket_uri = self.CreateBucket() 2907 tmp_dir = self.CreateTempDir() 2908 fpath = self.CreateTempFile(file_name='foo', 2909 tmpdir=tmp_dir, 2910 contents=b'a' * ONE_KIB * 512) 2911 test_callback_file = self.CreateTempFile(contents=pickle.dumps( 2912 HaltingCopyCallbackHandler(True, 2913 int(ONE_KIB) * 384))) 2914 resumable_threshold_for_test = ('GSUtil', 'resumable_threshold', 2915 str(ONE_KIB)) 2916 resumable_chunk_size_for_test = ('GSUtil', 'json_resumable_chunk_size', 2917 str(ONE_KIB * 256)) 2918 with SetBotoConfigForTest( 2919 [resumable_threshold_for_test, resumable_chunk_size_for_test]): 2920 stderr = self.RunGsUtil([ 2921 'cp', '--testcallbackfile', test_callback_file, fpath, 2922 suri(bucket_uri) 2923 ], 2924 expected_status=1, 2925 return_stderr=True) 2926 self.assertIn('Artifically halting upload', stderr) 2927 fpath = self.CreateTempFile(file_name='foo', 2928 tmpdir=tmp_dir, 2929 contents=b'a' * ONE_KIB) 2930 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2931 expected_status=1, 2932 return_stderr=True) 2933 self.assertIn('ResumableUploadAbortException', stderr) 2934 2935 @SkipForS3('No resumable upload support for S3.') 2936 def test_cp_composite_encrypted_upload_resume(self): 2937 """Tests that an encrypted composite upload resumes successfully.""" 2938 if self.test_api == ApiSelector.XML: 2939 return unittest.skip( 2940 'gsutil does not support encryption with the XML API') 2941 bucket_uri = self.CreateBucket() 2942 dst_url = StorageUrlFromString(suri(bucket_uri, 'foo')) 2943 2944 file_contents = b'foobar' 2945 file_name = 'foobar' 2946 source_file = self.CreateTempFile(contents=file_contents, 2947 file_name=file_name) 2948 src_url = StorageUrlFromString(source_file) 2949 2950 # Simulate an upload that had occurred by writing a tracker file 2951 # that points to a previously uploaded component. 2952 tracker_file_name = GetTrackerFilePath(dst_url, 2953 TrackerFileType.PARALLEL_UPLOAD, 2954 self.test_api, src_url) 2955 tracker_prefix = '123' 2956 2957 # Create component 0 to be used in the resume; it must match the name 2958 # that will be generated in copy_helper, so we use the same scheme. 2959 encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + source_file).encode(UTF8) 2960 content_md5 = hashlib.md5() 2961 content_md5.update(encoded_name) 2962 digest = content_md5.hexdigest() 2963 component_object_name = (tracker_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE + 2964 digest + '_0') 2965 2966 component_size = 3 2967 object_uri = self.CreateObject(bucket_uri=bucket_uri, 2968 object_name=component_object_name, 2969 contents=file_contents[:component_size], 2970 encryption_key=TEST_ENCRYPTION_KEY1) 2971 existing_component = ObjectFromTracker(component_object_name, 2972 str(object_uri.generation)) 2973 existing_components = [existing_component] 2974 enc_key_sha256 = TEST_ENCRYPTION_KEY1_SHA256_B64 2975 2976 WriteParallelUploadTrackerFile(tracker_file_name, 2977 tracker_prefix, 2978 existing_components, 2979 encryption_key_sha256=enc_key_sha256) 2980 2981 try: 2982 # Now "resume" the upload using the original encryption key. 2983 with SetBotoConfigForTest([ 2984 ('GSUtil', 'parallel_composite_upload_threshold', '1'), 2985 ('GSUtil', 'parallel_composite_upload_component_size', 2986 str(component_size)), 2987 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1) 2988 ]): 2989 stderr = self.RunGsUtil( 2990 ['cp', source_file, suri(bucket_uri, 'foo')], return_stderr=True) 2991 self.assertIn('Found 1 existing temporary components to reuse.', stderr) 2992 self.assertFalse( 2993 os.path.exists(tracker_file_name), 2994 'Tracker file %s should have been deleted.' % tracker_file_name) 2995 read_contents = self.RunGsUtil(['cat', suri(bucket_uri, 'foo')], 2996 return_stdout=True) 2997 self.assertEqual(read_contents.encode('ascii'), file_contents) 2998 finally: 2999 # Clean up if something went wrong. 3000 DeleteTrackerFile(tracker_file_name) 3001 3002 @SkipForS3('No resumable upload support for S3.') 3003 def test_cp_composite_encrypted_upload_restart(self): 3004 """Tests that encrypted composite upload restarts given a different key.""" 3005 if self.test_api == ApiSelector.XML: 3006 return unittest.skip( 3007 'gsutil does not support encryption with the XML API') 3008 bucket_uri = self.CreateBucket() 3009 dst_url = StorageUrlFromString(suri(bucket_uri, 'foo')) 3010 3011 file_contents = b'foobar' 3012 source_file = self.CreateTempFile(contents=file_contents, file_name='foo') 3013 src_url = StorageUrlFromString(source_file) 3014 3015 # Simulate an upload that had occurred by writing a tracker file. 3016 tracker_file_name = GetTrackerFilePath(dst_url, 3017 TrackerFileType.PARALLEL_UPLOAD, 3018 self.test_api, src_url) 3019 tracker_prefix = '123' 3020 existing_component_name = 'foo_1' 3021 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3022 object_name='foo_1', 3023 contents=b'foo', 3024 encryption_key=TEST_ENCRYPTION_KEY1) 3025 existing_component = ObjectFromTracker(existing_component_name, 3026 str(object_uri.generation)) 3027 existing_components = [existing_component] 3028 enc_key_sha256 = TEST_ENCRYPTION_KEY1_SHA256_B64 3029 WriteParallelUploadTrackerFile(tracker_file_name, tracker_prefix, 3030 existing_components, 3031 enc_key_sha256.decode('ascii')) 3032 3033 try: 3034 # Now "resume" the upload using the original encryption key. 3035 with SetBotoConfigForTest([ 3036 ('GSUtil', 'parallel_composite_upload_threshold', '1'), 3037 ('GSUtil', 'parallel_composite_upload_component_size', '3'), 3038 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2) 3039 ]): 3040 stderr = self.RunGsUtil( 3041 ['cp', source_file, suri(bucket_uri, 'foo')], return_stderr=True) 3042 self.assertIn( 3043 'does not match current encryption key. ' 3044 'Deleting old components and restarting upload', stderr) 3045 self.assertNotIn('existing temporary components to reuse.', stderr) 3046 self.assertFalse( 3047 os.path.exists(tracker_file_name), 3048 'Tracker file %s should have been deleted.' % tracker_file_name) 3049 read_contents = self.RunGsUtil(['cat', suri(bucket_uri, 'foo')], 3050 return_stdout=True) 3051 self.assertEqual(read_contents.encode('ascii'), file_contents) 3052 finally: 3053 # Clean up if something went wrong. 3054 DeleteTrackerFile(tracker_file_name) 3055 3056 # This temporarily changes the tracker directory to unwritable which 3057 # interferes with any parallel running tests that use the tracker directory. 3058 @NotParallelizable 3059 @SkipForS3('No resumable upload support for S3.') 3060 @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') 3061 @SequentialAndParallelTransfer 3062 def test_cp_unwritable_tracker_file(self): 3063 """Tests a resumable upload with an unwritable tracker file.""" 3064 bucket_uri = self.CreateBucket() 3065 tracker_filename = GetTrackerFilePath( 3066 StorageUrlFromString(suri(bucket_uri, 'foo')), TrackerFileType.UPLOAD, 3067 self.test_api) 3068 tracker_dir = os.path.dirname(tracker_filename) 3069 fpath = self.CreateTempFile(file_name='foo', contents=b'a' * ONE_KIB) 3070 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3071 save_mod = os.stat(tracker_dir).st_mode 3072 3073 try: 3074 os.chmod(tracker_dir, 0) 3075 with SetBotoConfigForTest([boto_config_for_test]): 3076 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 3077 expected_status=1, 3078 return_stderr=True) 3079 self.assertIn('Couldn\'t write tracker file', stderr) 3080 finally: 3081 os.chmod(tracker_dir, save_mod) 3082 if os.path.exists(tracker_filename): 3083 os.unlink(tracker_filename) 3084 3085 # This temporarily changes the tracker directory to unwritable which 3086 # interferes with any parallel running tests that use the tracker directory. 3087 @NotParallelizable 3088 @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') 3089 @SequentialAndParallelTransfer 3090 def test_cp_unwritable_tracker_file_download(self): 3091 """Tests downloads with an unwritable tracker file.""" 3092 object_uri = self.CreateObject(contents=b'foo' * ONE_KIB) 3093 tracker_filename = GetTrackerFilePath( 3094 StorageUrlFromString(suri(object_uri)), TrackerFileType.DOWNLOAD, 3095 self.test_api) 3096 tracker_dir = os.path.dirname(tracker_filename) 3097 fpath = self.CreateTempFile() 3098 save_mod = os.stat(tracker_dir).st_mode 3099 3100 try: 3101 os.chmod(tracker_dir, 0) 3102 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(EIGHT_MIB)) 3103 with SetBotoConfigForTest([boto_config_for_test]): 3104 # Should succeed because we are below the threshold. 3105 self.RunGsUtil(['cp', suri(object_uri), fpath]) 3106 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3107 with SetBotoConfigForTest([boto_config_for_test]): 3108 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3109 expected_status=1, 3110 return_stderr=True) 3111 self.assertIn('Couldn\'t write tracker file', stderr) 3112 finally: 3113 os.chmod(tracker_dir, save_mod) 3114 if os.path.exists(tracker_filename): 3115 os.unlink(tracker_filename) 3116 3117 def _test_cp_resumable_download_break_helper(self, 3118 boto_config, 3119 encryption_key=None): 3120 """Helper function for different modes of resumable download break. 3121 3122 Args: 3123 boto_config: List of boto configuration tuples for use with 3124 SetBotoConfigForTest. 3125 encryption_key: Base64 encryption key for object encryption (if any). 3126 """ 3127 bucket_uri = self.CreateBucket() 3128 file_contents = b'a' * self.halt_size 3129 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3130 object_name='foo', 3131 contents=file_contents, 3132 encryption_key=encryption_key) 3133 fpath = self.CreateTempFile() 3134 test_callback_file = self.CreateTempFile( 3135 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3136 3137 with SetBotoConfigForTest(boto_config): 3138 stderr = self.RunGsUtil([ 3139 'cp', '--testcallbackfile', test_callback_file, 3140 suri(object_uri), fpath 3141 ], 3142 expected_status=1, 3143 return_stderr=True) 3144 self.assertIn('Artifically halting download.', stderr) 3145 tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath), 3146 TrackerFileType.DOWNLOAD, 3147 self.test_api) 3148 self.assertTrue(os.path.isfile(tracker_filename)) 3149 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3150 return_stderr=True) 3151 self.assertIn('Resuming download', stderr) 3152 with open(fpath, 'rb') as f: 3153 self.assertEqual(f.read(), file_contents, 'File contents differ') 3154 3155 def test_cp_resumable_download_break(self): 3156 """Tests that a download can be resumed after a connection break.""" 3157 self._test_cp_resumable_download_break_helper([ 3158 ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3159 ]) 3160 3161 @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.') 3162 def test_cp_resumable_encrypted_download_break(self): 3163 """Tests that an encrypted download resumes after a connection break.""" 3164 if self.test_api == ApiSelector.XML: 3165 return unittest.skip( 3166 'gsutil does not support encryption with the XML API') 3167 self._test_cp_resumable_download_break_helper( 3168 [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 3169 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)], 3170 encryption_key=TEST_ENCRYPTION_KEY1) 3171 3172 @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.') 3173 def test_cp_resumable_encrypted_download_key_rotation(self): 3174 """Tests that a download restarts with a rotated encryption key.""" 3175 if self.test_api == ApiSelector.XML: 3176 return unittest.skip( 3177 'gsutil does not support encryption with the XML API') 3178 bucket_uri = self.CreateBucket() 3179 file_contents = b'a' * self.halt_size 3180 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3181 object_name='foo', 3182 contents=file_contents, 3183 encryption_key=TEST_ENCRYPTION_KEY1) 3184 fpath = self.CreateTempFile() 3185 test_callback_file = self.CreateTempFile( 3186 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3187 3188 boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 3189 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 3190 3191 with SetBotoConfigForTest(boto_config_for_test): 3192 stderr = self.RunGsUtil([ 3193 'cp', '--testcallbackfile', test_callback_file, 3194 suri(object_uri), fpath 3195 ], 3196 expected_status=1, 3197 return_stderr=True) 3198 self.assertIn('Artifically halting download.', stderr) 3199 tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath), 3200 TrackerFileType.DOWNLOAD, 3201 self.test_api) 3202 self.assertTrue(os.path.isfile(tracker_filename)) 3203 3204 # After simulated connection break, rotate the key on the object. 3205 boto_config_for_test2 = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 3206 ('GSUtil', 'decryption_key1', 3207 TEST_ENCRYPTION_KEY1), 3208 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)] 3209 with SetBotoConfigForTest(boto_config_for_test2): 3210 self.RunGsUtil(['rewrite', '-k', suri(object_uri)]) 3211 3212 # Now resume the download using only the new encryption key. Since its 3213 # generation changed, we must restart it. 3214 boto_config_for_test3 = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 3215 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)] 3216 with SetBotoConfigForTest(boto_config_for_test3): 3217 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3218 return_stderr=True) 3219 self.assertIn('Restarting download', stderr) 3220 with open(fpath, 'rb') as f: 3221 self.assertEqual(f.read(), file_contents, 'File contents differ') 3222 3223 @SequentialAndParallelTransfer 3224 def test_cp_resumable_download_etag_differs(self): 3225 """Tests that download restarts the file when the source object changes. 3226 3227 This causes the etag not to match. 3228 """ 3229 bucket_uri = self.CreateBucket() 3230 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3231 object_name='foo', 3232 contents=b'abc' * self.halt_size) 3233 fpath = self.CreateTempFile() 3234 test_callback_file = self.CreateTempFile( 3235 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3236 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3237 with SetBotoConfigForTest([boto_config_for_test]): 3238 # This will create a tracker file with an ETag. 3239 stderr = self.RunGsUtil([ 3240 'cp', '--testcallbackfile', test_callback_file, 3241 suri(object_uri), fpath 3242 ], 3243 expected_status=1, 3244 return_stderr=True) 3245 self.assertIn('Artifically halting download.', stderr) 3246 # Create a new object with different contents - it should have a 3247 # different ETag since the content has changed. 3248 object_uri = self.CreateObject( 3249 bucket_uri=bucket_uri, 3250 object_name='foo', 3251 contents=b'b' * self.halt_size, 3252 gs_idempotent_generation=object_uri.generation) 3253 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3254 return_stderr=True) 3255 self.assertNotIn('Resuming download', stderr) 3256 3257 # TODO: Enable this test for sequential downloads when their tracker files are 3258 # modified to contain the source object generation. 3259 @unittest.skipUnless(UsingCrcmodExtension(), 3260 'Sliced download requires fast crcmod.') 3261 @SkipForS3('No sliced download support for S3.') 3262 def test_cp_resumable_download_generation_differs(self): 3263 """Tests that a resumable download restarts if the generation differs.""" 3264 bucket_uri = self.CreateBucket() 3265 file_contents = b'abcd' * self.halt_size 3266 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3267 object_name='foo', 3268 contents=file_contents) 3269 fpath = self.CreateTempFile() 3270 3271 test_callback_file = self.CreateTempFile( 3272 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3273 3274 boto_config_for_test = [ 3275 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 3276 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 3277 ('GSUtil', 'sliced_object_download_max_components', '3') 3278 ] 3279 3280 with SetBotoConfigForTest(boto_config_for_test): 3281 stderr = self.RunGsUtil([ 3282 'cp', '--testcallbackfile', test_callback_file, 3283 suri(object_uri), 3284 suri(fpath) 3285 ], 3286 return_stderr=True, 3287 expected_status=1) 3288 self.assertIn('Artifically halting download.', stderr) 3289 3290 # Overwrite the object with an identical object, increasing 3291 # the generation but leaving other metadata the same. 3292 identical_file = self.CreateTempFile(contents=file_contents) 3293 self.RunGsUtil(['cp', suri(identical_file), suri(object_uri)]) 3294 3295 stderr = self.RunGsUtil( 3296 ['cp', suri(object_uri), suri(fpath)], return_stderr=True) 3297 self.assertIn('Restarting download from scratch', stderr) 3298 with open(fpath, 'rb') as f: 3299 self.assertEqual(f.read(), file_contents, 'File contents differ') 3300 3301 def test_cp_resumable_download_file_larger(self): 3302 """Tests download deletes the tracker file when existing file is larger.""" 3303 bucket_uri = self.CreateBucket() 3304 fpath = self.CreateTempFile() 3305 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3306 object_name='foo', 3307 contents=b'a' * self.halt_size) 3308 test_callback_file = self.CreateTempFile( 3309 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3310 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3311 with SetBotoConfigForTest([boto_config_for_test]): 3312 stderr = self.RunGsUtil([ 3313 'cp', '--testcallbackfile', test_callback_file, 3314 suri(object_uri), fpath 3315 ], 3316 expected_status=1, 3317 return_stderr=True) 3318 self.assertIn('Artifically halting download.', stderr) 3319 with open(fpath + '_.gstmp', 'w') as larger_file: 3320 for _ in range(self.halt_size * 2): 3321 larger_file.write('a') 3322 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3323 expected_status=1, 3324 return_stderr=True) 3325 self.assertNotIn('Resuming download', stderr) 3326 self.assertIn('Deleting tracker file', stderr) 3327 3328 def test_cp_resumable_download_content_differs(self): 3329 """Tests that we do not re-download when tracker file matches existing file. 3330 3331 We only compare size, not contents, so re-download should not occur even 3332 though the contents are technically different. However, hash validation on 3333 the file should still occur and we will delete the file then because 3334 the hashes differ. 3335 """ 3336 bucket_uri = self.CreateBucket() 3337 tmp_dir = self.CreateTempDir() 3338 fpath = self.CreateTempFile(tmpdir=tmp_dir) 3339 temp_download_file = fpath + '_.gstmp' 3340 with open(temp_download_file, 'w') as fp: 3341 fp.write('abcd' * ONE_KIB) 3342 3343 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3344 object_name='foo', 3345 contents=b'efgh' * ONE_KIB) 3346 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) 3347 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) 3348 self.assertIsNotNone(etag_match, 'Could not get object ETag') 3349 self.assertEqual(len(etag_match.groups()), 1, 3350 'Did not match expected single ETag') 3351 etag = etag_match.group(1) 3352 3353 tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath), 3354 TrackerFileType.DOWNLOAD, 3355 self.test_api) 3356 try: 3357 with open(tracker_filename, 'w') as tracker_fp: 3358 tracker_fp.write(etag) 3359 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3360 with SetBotoConfigForTest([boto_config_for_test]): 3361 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3362 return_stderr=True, 3363 expected_status=1) 3364 self.assertIn('Download already complete', stderr) 3365 self.assertIn('doesn\'t match cloud-supplied digest', stderr) 3366 # File and tracker file should be deleted. 3367 self.assertFalse(os.path.isfile(temp_download_file)) 3368 self.assertFalse(os.path.isfile(tracker_filename)) 3369 # Permanent file should not have been created. 3370 self.assertFalse(os.path.isfile(fpath)) 3371 finally: 3372 if os.path.exists(tracker_filename): 3373 os.unlink(tracker_filename) 3374 3375 def test_cp_resumable_download_content_matches(self): 3376 """Tests download no-ops when tracker file matches existing file.""" 3377 bucket_uri = self.CreateBucket() 3378 tmp_dir = self.CreateTempDir() 3379 fpath = self.CreateTempFile(tmpdir=tmp_dir) 3380 matching_contents = b'abcd' * ONE_KIB 3381 temp_download_file = fpath + '_.gstmp' 3382 with open(temp_download_file, 'wb') as fp: 3383 fp.write(matching_contents) 3384 3385 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3386 object_name='foo', 3387 contents=matching_contents) 3388 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) 3389 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) 3390 self.assertIsNotNone(etag_match, 'Could not get object ETag') 3391 self.assertEqual(len(etag_match.groups()), 1, 3392 'Did not match expected single ETag') 3393 etag = etag_match.group(1) 3394 tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath), 3395 TrackerFileType.DOWNLOAD, 3396 self.test_api) 3397 with open(tracker_filename, 'w') as tracker_fp: 3398 tracker_fp.write(etag) 3399 try: 3400 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3401 with SetBotoConfigForTest([boto_config_for_test]): 3402 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3403 return_stderr=True) 3404 self.assertIn('Download already complete', stderr) 3405 # Tracker file should be removed after successful hash validation. 3406 self.assertFalse(os.path.isfile(tracker_filename)) 3407 finally: 3408 if os.path.exists(tracker_filename): 3409 os.unlink(tracker_filename) 3410 3411 def test_cp_resumable_download_tracker_file_not_matches(self): 3412 """Tests that download overwrites when tracker file etag does not match.""" 3413 bucket_uri = self.CreateBucket() 3414 tmp_dir = self.CreateTempDir() 3415 fpath = self.CreateTempFile(tmpdir=tmp_dir, contents=b'abcd' * ONE_KIB) 3416 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3417 object_name='foo', 3418 contents=b'efgh' * ONE_KIB) 3419 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) 3420 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) 3421 self.assertIsNotNone(etag_match, 'Could not get object ETag') 3422 self.assertEqual(len(etag_match.groups()), 1, 3423 'Did not match regex for exactly one object ETag') 3424 etag = etag_match.group(1) 3425 etag += 'nonmatching' 3426 tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath), 3427 TrackerFileType.DOWNLOAD, 3428 self.test_api) 3429 with open(tracker_filename, 'w') as tracker_fp: 3430 tracker_fp.write(etag) 3431 try: 3432 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3433 with SetBotoConfigForTest([boto_config_for_test]): 3434 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3435 return_stderr=True) 3436 self.assertNotIn('Resuming download', stderr) 3437 # Ensure the file was overwritten. 3438 with open(fpath, 'r') as in_fp: 3439 contents = in_fp.read() 3440 self.assertEqual( 3441 contents, 'efgh' * ONE_KIB, 3442 'File not overwritten when it should have been ' 3443 'due to a non-matching tracker file.') 3444 self.assertFalse(os.path.isfile(tracker_filename)) 3445 finally: 3446 if os.path.exists(tracker_filename): 3447 os.unlink(tracker_filename) 3448 3449 def test_cp_double_gzip(self): 3450 """Tests that upload and download of a doubly-gzipped file succeeds.""" 3451 bucket_uri = self.CreateBucket() 3452 fpath = self.CreateTempFile(file_name='looks-zipped.gz', contents=b'foo') 3453 self.RunGsUtil([ 3454 '-h', 'content-type:application/gzip', 'cp', '-Z', 3455 suri(fpath), 3456 suri(bucket_uri, 'foo') 3457 ]) 3458 self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), fpath]) 3459 3460 @SkipForS3('No compressed transport encoding support for S3.') 3461 @SkipForXML('No compressed transport encoding support for the XML API.') 3462 @SequentialAndParallelTransfer 3463 def test_cp_double_gzip_transport_encoded(self): 3464 """Tests that upload and download of a doubly-gzipped file succeeds.""" 3465 bucket_uri = self.CreateBucket() 3466 fpath = self.CreateTempFile(file_name='looks-zipped.gz', contents=b'foo') 3467 stderr = self.RunGsUtil([ 3468 '-D', '-h', 'content-type:application/gzip', 'cp', '-J', 3469 suri(fpath), 3470 suri(bucket_uri, 'foo') 3471 ], 3472 return_stderr=True) 3473 # Ensure the progress logger sees a gzip encoding. 3474 self.assertIn('send: Using gzip transport encoding for the request.', 3475 stderr) 3476 self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), fpath]) 3477 3478 @SequentialAndParallelTransfer 3479 def test_cp_resumable_download_gzip(self): 3480 """Tests that download can be resumed successfully with a gzipped file.""" 3481 # Generate some reasonably incompressible data. This compresses to a bit 3482 # around 128K in practice, but we assert specifically below that it is 3483 # larger than self.halt_size to guarantee that we can halt the download 3484 # partway through. 3485 object_uri = self.CreateObject() 3486 random.seed(0) 3487 contents = str([ 3488 random.choice(string.ascii_letters) for _ in xrange(self.halt_size) 3489 ]).encode('ascii') 3490 random.seed() # Reset the seed for any other tests. 3491 fpath1 = self.CreateTempFile(file_name='unzipped.txt', contents=contents) 3492 self.RunGsUtil(['cp', '-z', 'txt', suri(fpath1), suri(object_uri)]) 3493 3494 # Use @Retry as hedge against bucket listing eventual consistency. 3495 @Retry(AssertionError, tries=3, timeout_secs=1) 3496 def _GetObjectSize(): 3497 stdout = self.RunGsUtil(['du', suri(object_uri)], return_stdout=True) 3498 size_match = re.search(r'(\d+)\s+.*', stdout) 3499 self.assertIsNotNone(size_match, 'Could not get object size') 3500 self.assertEqual(len(size_match.groups()), 1, 3501 'Did not match regex for exactly one object size.') 3502 return long(size_match.group(1)) 3503 3504 object_size = _GetObjectSize() 3505 self.assertGreaterEqual( 3506 object_size, self.halt_size, 3507 'Compresed object size was not large enough to ' 3508 'allow for a halted download, so the test results ' 3509 'would be invalid. Please increase the compressed ' 3510 'object size in the test.') 3511 fpath2 = self.CreateTempFile() 3512 test_callback_file = self.CreateTempFile( 3513 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3514 3515 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3516 with SetBotoConfigForTest([boto_config_for_test]): 3517 stderr = self.RunGsUtil([ 3518 'cp', '--testcallbackfile', test_callback_file, 3519 suri(object_uri), 3520 suri(fpath2) 3521 ], 3522 return_stderr=True, 3523 expected_status=1) 3524 self.assertIn('Artifically halting download.', stderr) 3525 self.assertIn('Downloading to temp gzip filename', stderr) 3526 3527 # Tracker files will have different names depending on if we are 3528 # downloading sequentially or in parallel. 3529 sliced_download_threshold = HumanReadableToBytes( 3530 boto.config.get('GSUtil', 'sliced_object_download_threshold', 3531 DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD)) 3532 sliced_download = (len(contents) > sliced_download_threshold and 3533 sliced_download_threshold > 0 and 3534 UsingCrcmodExtension()) 3535 if sliced_download: 3536 trackerfile_type = TrackerFileType.SLICED_DOWNLOAD 3537 else: 3538 trackerfile_type = TrackerFileType.DOWNLOAD 3539 tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath2), 3540 trackerfile_type, self.test_api) 3541 3542 # We should have a temporary gzipped file, a tracker file, and no 3543 # final file yet. 3544 self.assertTrue(os.path.isfile(tracker_filename)) 3545 self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2)) 3546 stderr = self.RunGsUtil( 3547 ['cp', suri(object_uri), suri(fpath2)], return_stderr=True) 3548 self.assertIn('Resuming download', stderr) 3549 with open(fpath2, 'rb') as f: 3550 self.assertEqual(f.read(), contents, 'File contents did not match.') 3551 self.assertFalse(os.path.isfile(tracker_filename)) 3552 self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2)) 3553 3554 def _GetFaviconFile(self): 3555 # Make a temp file from favicon.ico.gz. Finding the location of our test 3556 # data varies depending on how/where gsutil was installed, so we get the 3557 # data via pkgutil and use this workaround. 3558 if not hasattr(self, 'test_data_favicon_file'): 3559 contents = pkgutil.get_data('gslib', 'tests/test_data/favicon.ico.gz') 3560 self.test_data_favicon_file = self.CreateTempFile(contents=contents) 3561 return self.test_data_favicon_file 3562 3563 def test_cp_download_transfer_encoded(self): 3564 """Tests chunked transfer encoded download handling. 3565 3566 Tests that download works correctly with a gzipped chunked transfer-encoded 3567 object (which therefore lacks Content-Length) of a size that gets fetched 3568 in a single chunk (exercising downloading of objects lacking a length 3569 response header). 3570 """ 3571 # Upload a file / content-encoding / content-type that triggers this flow. 3572 # Note: We need to use the file with pre-zipped format and manually set the 3573 # content-encoding and content-type because the Python gzip module (used by 3574 # gsutil cp -Z) won't reproduce the bytes that trigger this problem. 3575 bucket_uri = self.CreateBucket() 3576 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo') 3577 input_filename = self._GetFaviconFile() 3578 self.RunGsUtil([ 3579 '-h', 'Content-Encoding:gzip', '-h', 'Content-Type:image/x-icon', 'cp', 3580 suri(input_filename), 3581 suri(object_uri) 3582 ]) 3583 # Compute the MD5 of the uncompressed bytes. 3584 with gzip.open(input_filename) as fp: 3585 hash_dict = {'md5': hashlib.md5()} 3586 hashing_helper.CalculateHashesFromContents(fp, hash_dict) 3587 in_file_md5 = hash_dict['md5'].digest() 3588 3589 # Downloading this file triggers the flow. 3590 fpath2 = self.CreateTempFile() 3591 self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)]) 3592 # Compute MD5 of the downloaded (uncompressed) file, and validate it. 3593 with open(fpath2, 'rb') as fp: 3594 hash_dict = {'md5': hashlib.md5()} 3595 hashing_helper.CalculateHashesFromContents(fp, hash_dict) 3596 out_file_md5 = hash_dict['md5'].digest() 3597 self.assertEqual(in_file_md5, out_file_md5) 3598 3599 @SequentialAndParallelTransfer 3600 def test_cp_resumable_download_check_hashes_never(self): 3601 """Tests that resumble downloads work with check_hashes = never.""" 3602 bucket_uri = self.CreateBucket() 3603 contents = b'abcd' * self.halt_size 3604 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3605 object_name='foo', 3606 contents=contents) 3607 fpath = self.CreateTempFile() 3608 test_callback_file = self.CreateTempFile( 3609 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3610 3611 boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 3612 ('GSUtil', 'check_hashes', 'never')] 3613 with SetBotoConfigForTest(boto_config_for_test): 3614 stderr = self.RunGsUtil([ 3615 'cp', '--testcallbackfile', test_callback_file, 3616 suri(object_uri), fpath 3617 ], 3618 expected_status=1, 3619 return_stderr=True) 3620 self.assertIn('Artifically halting download.', stderr) 3621 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3622 return_stderr=True) 3623 self.assertIn('Resuming download', stderr) 3624 self.assertIn('Found no hashes to validate object downloaded', stderr) 3625 with open(fpath, 'rb') as f: 3626 self.assertEqual(f.read(), contents, 'File contents did not match.') 3627 3628 @SkipForS3('No resumable upload support for S3.') 3629 def test_cp_resumable_upload_bucket_deleted(self): 3630 """Tests that a not found exception is raised if bucket no longer exists.""" 3631 bucket_uri = self.CreateBucket() 3632 fpath = self.CreateTempFile(contents=b'a' * 2 * ONE_KIB) 3633 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3634 test_callback_file = self.CreateTempFile(contents=pickle.dumps( 3635 _DeleteBucketThenStartOverCopyCallbackHandler(5, bucket_uri))) 3636 3637 with SetBotoConfigForTest([boto_config_for_test]): 3638 stderr = self.RunGsUtil([ 3639 'cp', '--testcallbackfile', test_callback_file, fpath, 3640 suri(bucket_uri) 3641 ], 3642 return_stderr=True, 3643 expected_status=1) 3644 self.assertIn('Deleting bucket', stderr) 3645 self.assertIn('bucket does not exist', stderr) 3646 3647 @SkipForS3('No sliced download support for S3.') 3648 def test_cp_sliced_download(self): 3649 """Tests that sliced object download works in the general case.""" 3650 bucket_uri = self.CreateBucket() 3651 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3652 object_name='foo', 3653 contents=b'abc' * ONE_KIB) 3654 fpath = self.CreateTempFile() 3655 3656 # Force fast crcmod to return True to test the basic sliced download 3657 # scenario, ensuring that if the user installs crcmod, it will work. 3658 boto_config_for_test = [ 3659 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 3660 ('GSUtil', 'test_assume_fast_crcmod', 'True'), 3661 ('GSUtil', 'sliced_object_download_threshold', str(ONE_KIB)), 3662 ('GSUtil', 'sliced_object_download_max_components', '3') 3663 ] 3664 3665 with SetBotoConfigForTest(boto_config_for_test): 3666 self.RunGsUtil(['cp', suri(object_uri), fpath]) 3667 3668 # Each tracker file should have been deleted. 3669 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 3670 StorageUrlFromString(fpath), self.test_api) 3671 for tracker_filename in tracker_filenames: 3672 self.assertFalse(os.path.isfile(tracker_filename)) 3673 3674 with open(fpath, 'rb') as f: 3675 self.assertEqual(f.read(), b'abc' * ONE_KIB, 'File contents differ') 3676 3677 @unittest.skipUnless(UsingCrcmodExtension(), 3678 'Sliced download requires fast crcmod.') 3679 @SkipForS3('No sliced download support for S3.') 3680 def test_cp_unresumable_sliced_download(self): 3681 """Tests sliced download works when resumability is disabled.""" 3682 bucket_uri = self.CreateBucket() 3683 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3684 object_name='foo', 3685 contents=b'abcd' * self.halt_size) 3686 fpath = self.CreateTempFile() 3687 test_callback_file = self.CreateTempFile( 3688 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3689 3690 boto_config_for_test = [ 3691 ('GSUtil', 'resumable_threshold', str(self.halt_size * 5)), 3692 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 3693 ('GSUtil', 'sliced_object_download_max_components', '4') 3694 ] 3695 3696 with SetBotoConfigForTest(boto_config_for_test): 3697 stderr = self.RunGsUtil([ 3698 'cp', '--testcallbackfile', test_callback_file, 3699 suri(object_uri), 3700 suri(fpath) 3701 ], 3702 return_stderr=True, 3703 expected_status=1) 3704 self.assertIn('not downloaded successfully', stderr) 3705 # Temporary download file should exist. 3706 self.assertTrue(os.path.isfile(fpath + '_.gstmp')) 3707 3708 # No tracker files should exist. 3709 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 3710 StorageUrlFromString(fpath), self.test_api) 3711 for tracker_filename in tracker_filenames: 3712 self.assertFalse(os.path.isfile(tracker_filename)) 3713 3714 # Perform the entire download, without resuming. 3715 with SetBotoConfigForTest(boto_config_for_test): 3716 stderr = self.RunGsUtil( 3717 ['cp', suri(object_uri), suri(fpath)], return_stderr=True) 3718 self.assertNotIn('Resuming download', stderr) 3719 # Temporary download file should have been deleted. 3720 self.assertFalse(os.path.isfile(fpath + '_.gstmp')) 3721 with open(fpath, 'rb') as f: 3722 self.assertEqual(f.read(), b'abcd' * self.halt_size, 3723 'File contents differ') 3724 3725 @unittest.skipUnless(UsingCrcmodExtension(), 3726 'Sliced download requires fast crcmod.') 3727 @SkipForS3('No sliced download support for S3.') 3728 def test_cp_sliced_download_resume(self): 3729 """Tests that sliced object download is resumable.""" 3730 bucket_uri = self.CreateBucket() 3731 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3732 object_name='foo', 3733 contents=b'abc' * self.halt_size) 3734 fpath = self.CreateTempFile() 3735 test_callback_file = self.CreateTempFile( 3736 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3737 3738 boto_config_for_test = [ 3739 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 3740 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 3741 ('GSUtil', 'sliced_object_download_max_components', '3') 3742 ] 3743 3744 with SetBotoConfigForTest(boto_config_for_test): 3745 stderr = self.RunGsUtil([ 3746 'cp', '--testcallbackfile', test_callback_file, 3747 suri(object_uri), 3748 suri(fpath) 3749 ], 3750 return_stderr=True, 3751 expected_status=1) 3752 self.assertIn('not downloaded successfully', stderr) 3753 3754 # Each tracker file should exist. 3755 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 3756 StorageUrlFromString(fpath), self.test_api) 3757 for tracker_filename in tracker_filenames: 3758 self.assertTrue(os.path.isfile(tracker_filename)) 3759 3760 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3761 return_stderr=True) 3762 self.assertIn('Resuming download', stderr) 3763 3764 # Each tracker file should have been deleted. 3765 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 3766 StorageUrlFromString(fpath), self.test_api) 3767 for tracker_filename in tracker_filenames: 3768 self.assertFalse(os.path.isfile(tracker_filename)) 3769 3770 with open(fpath, 'rb') as f: 3771 self.assertEqual(f.read(), b'abc' * self.halt_size, 3772 'File contents differ') 3773 3774 @unittest.skipUnless(UsingCrcmodExtension(), 3775 'Sliced download requires fast crcmod.') 3776 @SkipForS3('No sliced download support for S3.') 3777 def test_cp_sliced_download_partial_resume(self): 3778 """Test sliced download resumability when some components are finished.""" 3779 bucket_uri = self.CreateBucket() 3780 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3781 object_name='foo', 3782 contents=b'abc' * self.halt_size) 3783 fpath = self.CreateTempFile() 3784 test_callback_file = self.CreateTempFile( 3785 contents=pickle.dumps(HaltOneComponentCopyCallbackHandler(5))) 3786 3787 boto_config_for_test = [ 3788 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 3789 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 3790 ('GSUtil', 'sliced_object_download_max_components', '3') 3791 ] 3792 3793 with SetBotoConfigForTest(boto_config_for_test): 3794 stderr = self.RunGsUtil([ 3795 'cp', '--testcallbackfile', test_callback_file, 3796 suri(object_uri), 3797 suri(fpath) 3798 ], 3799 return_stderr=True, 3800 expected_status=1) 3801 self.assertIn('not downloaded successfully', stderr) 3802 3803 # Each tracker file should exist. 3804 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 3805 StorageUrlFromString(fpath), self.test_api) 3806 for tracker_filename in tracker_filenames: 3807 self.assertTrue(os.path.isfile(tracker_filename)) 3808 3809 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3810 return_stderr=True) 3811 self.assertIn('Resuming download', stderr) 3812 self.assertIn('Download already complete', stderr) 3813 3814 # Each tracker file should have been deleted. 3815 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 3816 StorageUrlFromString(fpath), self.test_api) 3817 for tracker_filename in tracker_filenames: 3818 self.assertFalse(os.path.isfile(tracker_filename)) 3819 3820 with open(fpath, 'rb') as f: 3821 self.assertEqual(f.read(), b'abc' * self.halt_size, 3822 'File contents differ') 3823 3824 @unittest.skipUnless(UsingCrcmodExtension(), 3825 'Sliced download requires fast crcmod.') 3826 @SkipForS3('No sliced download support for S3.') 3827 def test_cp_sliced_download_resume_content_differs(self): 3828 """Tests differing file contents are detected by sliced downloads.""" 3829 bucket_uri = self.CreateBucket() 3830 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3831 object_name='foo', 3832 contents=b'abc' * self.halt_size) 3833 fpath = self.CreateTempFile(contents=b'') 3834 test_callback_file = self.CreateTempFile( 3835 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3836 3837 boto_config_for_test = [ 3838 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 3839 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 3840 ('GSUtil', 'sliced_object_download_max_components', '3') 3841 ] 3842 3843 with SetBotoConfigForTest(boto_config_for_test): 3844 stderr = self.RunGsUtil([ 3845 'cp', '--testcallbackfile', test_callback_file, 3846 suri(object_uri), 3847 suri(fpath) 3848 ], 3849 return_stderr=True, 3850 expected_status=1) 3851 self.assertIn('not downloaded successfully', stderr) 3852 3853 # Temporary download file should exist. 3854 self.assertTrue(os.path.isfile(fpath + '_.gstmp')) 3855 3856 # Each tracker file should exist. 3857 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 3858 StorageUrlFromString(fpath), self.test_api) 3859 for tracker_filename in tracker_filenames: 3860 self.assertTrue(os.path.isfile(tracker_filename)) 3861 3862 with open(fpath + '_.gstmp', 'r+b') as f: 3863 f.write(b'altered file contents') 3864 3865 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3866 return_stderr=True, 3867 expected_status=1) 3868 self.assertIn('Resuming download', stderr) 3869 self.assertIn('doesn\'t match cloud-supplied digest', stderr) 3870 self.assertIn('HashMismatchException: crc32c', stderr) 3871 3872 # Each tracker file should have been deleted. 3873 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 3874 StorageUrlFromString(fpath), self.test_api) 3875 for tracker_filename in tracker_filenames: 3876 self.assertFalse(os.path.isfile(tracker_filename)) 3877 3878 # Temporary file should have been deleted due to hash mismatch. 3879 self.assertFalse(os.path.isfile(fpath + '_.gstmp')) 3880 # Final file should not exist. 3881 self.assertFalse(os.path.isfile(fpath)) 3882 3883 @unittest.skipUnless(UsingCrcmodExtension(), 3884 'Sliced download requires fast crcmod.') 3885 @SkipForS3('No sliced download support for S3.') 3886 def test_cp_sliced_download_component_size_changed(self): 3887 """Tests sliced download doesn't break when the boto config changes. 3888 3889 If the number of components used changes cross-process, the download should 3890 be restarted. 3891 """ 3892 bucket_uri = self.CreateBucket() 3893 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3894 object_name='foo', 3895 contents=b'abcd' * self.halt_size) 3896 fpath = self.CreateTempFile() 3897 test_callback_file = self.CreateTempFile( 3898 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3899 3900 boto_config_for_test = [ 3901 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 3902 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 3903 ('GSUtil', 'sliced_object_download_component_size', 3904 str(self.halt_size // 4)), 3905 ('GSUtil', 'sliced_object_download_max_components', '4') 3906 ] 3907 3908 with SetBotoConfigForTest(boto_config_for_test): 3909 stderr = self.RunGsUtil([ 3910 'cp', '--testcallbackfile', test_callback_file, 3911 suri(object_uri), 3912 suri(fpath) 3913 ], 3914 return_stderr=True, 3915 expected_status=1) 3916 self.assertIn('not downloaded successfully', stderr) 3917 3918 boto_config_for_test = [ 3919 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 3920 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 3921 ('GSUtil', 'sliced_object_download_component_size', 3922 str(self.halt_size // 2)), 3923 ('GSUtil', 'sliced_object_download_max_components', '2') 3924 ] 3925 3926 with SetBotoConfigForTest(boto_config_for_test): 3927 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3928 return_stderr=True) 3929 self.assertIn('Sliced download tracker file doesn\'t match ', stderr) 3930 self.assertIn('Restarting download from scratch', stderr) 3931 self.assertNotIn('Resuming download', stderr) 3932 3933 @unittest.skipUnless(UsingCrcmodExtension(), 3934 'Sliced download requires fast crcmod.') 3935 @SkipForS3('No sliced download support for S3.') 3936 def test_cp_sliced_download_disabled_cross_process(self): 3937 """Tests temporary files are not orphaned if sliced download is disabled. 3938 3939 Specifically, temporary files should be deleted when the corresponding 3940 non-sliced download is completed. 3941 """ 3942 bucket_uri = self.CreateBucket() 3943 object_uri = self.CreateObject(bucket_uri=bucket_uri, 3944 object_name='foo', 3945 contents=b'abcd' * self.halt_size) 3946 fpath = self.CreateTempFile() 3947 test_callback_file = self.CreateTempFile( 3948 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 3949 3950 boto_config_for_test = [ 3951 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 3952 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 3953 ('GSUtil', 'sliced_object_download_max_components', '4') 3954 ] 3955 3956 with SetBotoConfigForTest(boto_config_for_test): 3957 stderr = self.RunGsUtil([ 3958 'cp', '--testcallbackfile', test_callback_file, 3959 suri(object_uri), 3960 suri(fpath) 3961 ], 3962 return_stderr=True, 3963 expected_status=1) 3964 self.assertIn('not downloaded successfully', stderr) 3965 # Temporary download file should exist. 3966 self.assertTrue(os.path.isfile(fpath + '_.gstmp')) 3967 3968 # Each tracker file should exist. 3969 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 3970 StorageUrlFromString(fpath), self.test_api) 3971 for tracker_filename in tracker_filenames: 3972 self.assertTrue(os.path.isfile(tracker_filename)) 3973 3974 # Disable sliced downloads by increasing the threshold 3975 boto_config_for_test = [ 3976 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 3977 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size * 5)), 3978 ('GSUtil', 'sliced_object_download_max_components', '4') 3979 ] 3980 3981 with SetBotoConfigForTest(boto_config_for_test): 3982 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3983 return_stderr=True) 3984 self.assertNotIn('Resuming download', stderr) 3985 # Temporary download file should have been deleted. 3986 self.assertFalse(os.path.isfile(fpath + '_.gstmp')) 3987 3988 # Each tracker file should have been deleted. 3989 for tracker_filename in tracker_filenames: 3990 self.assertFalse(os.path.isfile(tracker_filename)) 3991 with open(fpath, 'rb') as f: 3992 self.assertEqual(f.read(), b'abcd' * self.halt_size) 3993 3994 @SkipForS3('No resumable upload support for S3.') 3995 def test_cp_resumable_upload_start_over_http_error(self): 3996 for start_over_error in ( 3997 403, # If user doesn't have storage.buckets.get access to dest bucket. 3998 404, # If the dest bucket exists, but the dest object does not. 3999 410): # If the service tells us to restart the upload from scratch. 4000 self.start_over_error_test_helper(start_over_error) 4001 4002 def start_over_error_test_helper(self, http_error_num): 4003 bucket_uri = self.CreateBucket() 4004 # The object contents need to be fairly large to avoid the race condition 4005 # where the contents finish uploading before we artifically halt the copy. 4006 rand_chars = get_random_ascii_chars(size=(ONE_MIB * 4)) 4007 fpath = self.CreateTempFile(contents=rand_chars) 4008 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 4009 if self.test_api == ApiSelector.JSON: 4010 test_callback_file = self.CreateTempFile( 4011 contents=pickle.dumps(_JSONForceHTTPErrorCopyCallbackHandler(5, 404))) 4012 elif self.test_api == ApiSelector.XML: 4013 test_callback_file = self.CreateTempFile(contents=pickle.dumps( 4014 _XMLResumableUploadStartOverCopyCallbackHandler(5))) 4015 4016 with SetBotoConfigForTest([boto_config_for_test]): 4017 stderr = self.RunGsUtil([ 4018 'cp', '--testcallbackfile', test_callback_file, fpath, 4019 suri(bucket_uri) 4020 ], 4021 return_stderr=True) 4022 self.assertIn('Restarting upload of', stderr) 4023 4024 def test_cp_minus_c(self): 4025 bucket_uri = self.CreateBucket() 4026 object_uri = self.CreateObject(bucket_uri=bucket_uri, 4027 object_name='foo', 4028 contents=b'foo') 4029 self.RunGsUtil([ 4030 'cp', '-c', 4031 suri(bucket_uri) + '/foo2', 4032 suri(object_uri), 4033 suri(bucket_uri) + '/dir/' 4034 ], 4035 expected_status=1) 4036 self.RunGsUtil(['stat', '%s/dir/foo' % suri(bucket_uri)]) 4037 4038 def test_rewrite_cp(self): 4039 """Tests the JSON Rewrite API.""" 4040 if self.test_api == ApiSelector.XML: 4041 return unittest.skip('Rewrite API is only supported in JSON.') 4042 bucket_uri = self.CreateBucket() 4043 object_uri = self.CreateObject(bucket_uri=bucket_uri, 4044 object_name='foo', 4045 contents=b'bar') 4046 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), 4047 DiscardMessagesQueue(), self.default_provider) 4048 key = object_uri.get_key() 4049 src_obj_metadata = apitools_messages.Object(name=key.name, 4050 bucket=key.bucket.name, 4051 contentType=key.content_type) 4052 dst_obj_metadata = apitools_messages.Object( 4053 bucket=src_obj_metadata.bucket, 4054 name=self.MakeTempName('object'), 4055 contentType=src_obj_metadata.contentType) 4056 gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata) 4057 self.assertEqual( 4058 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, 4059 src_obj_metadata.name, 4060 fields=['customerEncryption', 4061 'md5Hash']).md5Hash, 4062 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, 4063 dst_obj_metadata.name, 4064 fields=['customerEncryption', 4065 'md5Hash']).md5Hash, 4066 'Error: Rewritten object\'s hash doesn\'t match source object.') 4067 4068 def test_rewrite_cp_resume(self): 4069 """Tests the JSON Rewrite API, breaking and resuming via a tracker file.""" 4070 if self.test_api == ApiSelector.XML: 4071 return unittest.skip('Rewrite API is only supported in JSON.') 4072 bucket_uri = self.CreateBucket() 4073 # Second bucket needs to be a different storage class so the service 4074 # actually rewrites the bytes. 4075 bucket_uri2 = self.CreateBucket( 4076 storage_class='durable_reduced_availability') 4077 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we 4078 # need 2 response from the service: 1 success, 1 failure prior to 4079 # completion. 4080 object_uri = self.CreateObject(bucket_uri=bucket_uri, 4081 object_name='foo', 4082 contents=(b'12' * ONE_MIB) + b'bar', 4083 prefer_json_api=True) 4084 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), 4085 DiscardMessagesQueue(), self.default_provider) 4086 key = object_uri.get_key() 4087 src_obj_metadata = apitools_messages.Object(name=key.name, 4088 bucket=key.bucket.name, 4089 contentType=key.content_type, 4090 etag=key.etag.strip('"\'')) 4091 dst_obj_name = self.MakeTempName('object') 4092 dst_obj_metadata = apitools_messages.Object( 4093 bucket=bucket_uri2.bucket_name, 4094 name=dst_obj_name, 4095 contentType=src_obj_metadata.contentType) 4096 tracker_file_name = GetRewriteTrackerFilePath(src_obj_metadata.bucket, 4097 src_obj_metadata.name, 4098 dst_obj_metadata.bucket, 4099 dst_obj_metadata.name, 4100 self.test_api) 4101 try: 4102 try: 4103 gsutil_api.CopyObject(src_obj_metadata, 4104 dst_obj_metadata, 4105 progress_callback=HaltingRewriteCallbackHandler( 4106 ONE_MIB * 2).call, 4107 max_bytes_per_call=ONE_MIB) 4108 self.fail('Expected RewriteHaltException.') 4109 except RewriteHaltException: 4110 pass 4111 4112 # Tracker file should be left over. 4113 self.assertTrue(os.path.exists(tracker_file_name)) 4114 4115 # Now resume. Callback ensures we didn't start over. 4116 gsutil_api.CopyObject( 4117 src_obj_metadata, 4118 dst_obj_metadata, 4119 progress_callback=EnsureRewriteResumeCallbackHandler(ONE_MIB * 4120 2).call, 4121 max_bytes_per_call=ONE_MIB) 4122 4123 # Copy completed; tracker file should be deleted. 4124 self.assertFalse(os.path.exists(tracker_file_name)) 4125 4126 self.assertEqual( 4127 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, 4128 src_obj_metadata.name, 4129 fields=['customerEncryption', 4130 'md5Hash']).md5Hash, 4131 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, 4132 dst_obj_metadata.name, 4133 fields=['customerEncryption', 4134 'md5Hash']).md5Hash, 4135 'Error: Rewritten object\'s hash doesn\'t match source object.') 4136 finally: 4137 # Clean up if something went wrong. 4138 DeleteTrackerFile(tracker_file_name) 4139 4140 def test_rewrite_cp_resume_source_changed(self): 4141 """Tests that Rewrite starts over when the source object has changed.""" 4142 if self.test_api == ApiSelector.XML: 4143 return unittest.skip('Rewrite API is only supported in JSON.') 4144 bucket_uri = self.CreateBucket() 4145 # Second bucket needs to be a different storage class so the service 4146 # actually rewrites the bytes. 4147 bucket_uri2 = self.CreateBucket( 4148 storage_class='durable_reduced_availability') 4149 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we 4150 # need 2 response from the service: 1 success, 1 failure prior to 4151 # completion. 4152 object_uri = self.CreateObject(bucket_uri=bucket_uri, 4153 object_name='foo', 4154 contents=(b'12' * ONE_MIB) + b'bar', 4155 prefer_json_api=True) 4156 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), 4157 DiscardMessagesQueue(), self.default_provider) 4158 key = object_uri.get_key() 4159 src_obj_metadata = apitools_messages.Object(name=key.name, 4160 bucket=key.bucket.name, 4161 contentType=key.content_type, 4162 etag=key.etag.strip('"\'')) 4163 dst_obj_name = self.MakeTempName('object') 4164 dst_obj_metadata = apitools_messages.Object( 4165 bucket=bucket_uri2.bucket_name, 4166 name=dst_obj_name, 4167 contentType=src_obj_metadata.contentType) 4168 tracker_file_name = GetRewriteTrackerFilePath(src_obj_metadata.bucket, 4169 src_obj_metadata.name, 4170 dst_obj_metadata.bucket, 4171 dst_obj_metadata.name, 4172 self.test_api) 4173 try: 4174 try: 4175 gsutil_api.CopyObject(src_obj_metadata, 4176 dst_obj_metadata, 4177 progress_callback=HaltingRewriteCallbackHandler( 4178 ONE_MIB * 2).call, 4179 max_bytes_per_call=ONE_MIB) 4180 self.fail('Expected RewriteHaltException.') 4181 except RewriteHaltException: 4182 pass 4183 # Overwrite the original object. 4184 object_uri2 = self.CreateObject(bucket_uri=bucket_uri, 4185 object_name='foo', 4186 contents=b'bar', 4187 prefer_json_api=True) 4188 key2 = object_uri2.get_key() 4189 src_obj_metadata2 = apitools_messages.Object( 4190 name=key2.name, 4191 bucket=key2.bucket.name, 4192 contentType=key2.content_type, 4193 etag=key2.etag.strip('"\'')) 4194 4195 # Tracker file for original object should still exist. 4196 self.assertTrue(os.path.exists(tracker_file_name)) 4197 4198 # Copy the new object. 4199 gsutil_api.CopyObject(src_obj_metadata2, 4200 dst_obj_metadata, 4201 max_bytes_per_call=ONE_MIB) 4202 4203 # Copy completed; original tracker file should be deleted. 4204 self.assertFalse(os.path.exists(tracker_file_name)) 4205 4206 self.assertEqual( 4207 gsutil_api.GetObjectMetadata(src_obj_metadata2.bucket, 4208 src_obj_metadata2.name, 4209 fields=['customerEncryption', 4210 'md5Hash']).md5Hash, 4211 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, 4212 dst_obj_metadata.name, 4213 fields=['customerEncryption', 4214 'md5Hash']).md5Hash, 4215 'Error: Rewritten object\'s hash doesn\'t match source object.') 4216 finally: 4217 # Clean up if something went wrong. 4218 DeleteTrackerFile(tracker_file_name) 4219 4220 def test_rewrite_cp_resume_command_changed(self): 4221 """Tests that Rewrite starts over when the arguments changed.""" 4222 if self.test_api == ApiSelector.XML: 4223 return unittest.skip('Rewrite API is only supported in JSON.') 4224 bucket_uri = self.CreateBucket() 4225 # Second bucket needs to be a different storage class so the service 4226 # actually rewrites the bytes. 4227 bucket_uri2 = self.CreateBucket( 4228 storage_class='durable_reduced_availability') 4229 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we 4230 # need 2 response from the service: 1 success, 1 failure prior to 4231 # completion. 4232 object_uri = self.CreateObject(bucket_uri=bucket_uri, 4233 object_name='foo', 4234 contents=(b'12' * ONE_MIB) + b'bar', 4235 prefer_json_api=True) 4236 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), 4237 DiscardMessagesQueue(), self.default_provider) 4238 key = object_uri.get_key() 4239 src_obj_metadata = apitools_messages.Object(name=key.name, 4240 bucket=key.bucket.name, 4241 contentType=key.content_type, 4242 etag=key.etag.strip('"\'')) 4243 dst_obj_name = self.MakeTempName('object') 4244 dst_obj_metadata = apitools_messages.Object( 4245 bucket=bucket_uri2.bucket_name, 4246 name=dst_obj_name, 4247 contentType=src_obj_metadata.contentType) 4248 tracker_file_name = GetRewriteTrackerFilePath(src_obj_metadata.bucket, 4249 src_obj_metadata.name, 4250 dst_obj_metadata.bucket, 4251 dst_obj_metadata.name, 4252 self.test_api) 4253 try: 4254 try: 4255 gsutil_api.CopyObject(src_obj_metadata, 4256 dst_obj_metadata, 4257 canned_acl='private', 4258 progress_callback=HaltingRewriteCallbackHandler( 4259 ONE_MIB * 2).call, 4260 max_bytes_per_call=ONE_MIB) 4261 self.fail('Expected RewriteHaltException.') 4262 except RewriteHaltException: 4263 pass 4264 4265 # Tracker file for original object should still exist. 4266 self.assertTrue(os.path.exists(tracker_file_name)) 4267 4268 # Copy the same object but with different call parameters. 4269 gsutil_api.CopyObject(src_obj_metadata, 4270 dst_obj_metadata, 4271 canned_acl='public-read', 4272 max_bytes_per_call=ONE_MIB) 4273 4274 # Copy completed; original tracker file should be deleted. 4275 self.assertFalse(os.path.exists(tracker_file_name)) 4276 4277 new_obj_metadata = gsutil_api.GetObjectMetadata( 4278 dst_obj_metadata.bucket, 4279 dst_obj_metadata.name, 4280 fields=['acl', 'customerEncryption', 'md5Hash']) 4281 self.assertEqual( 4282 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, 4283 src_obj_metadata.name, 4284 fields=['customerEncryption', 4285 'md5Hash']).md5Hash, 4286 new_obj_metadata.md5Hash, 4287 'Error: Rewritten object\'s hash doesn\'t match source object.') 4288 # New object should have a public-read ACL from the second command. 4289 found_public_acl = False 4290 for acl_entry in new_obj_metadata.acl: 4291 if acl_entry.entity == 'allUsers': 4292 found_public_acl = True 4293 self.assertTrue(found_public_acl, 4294 'New object was not written with a public ACL.') 4295 finally: 4296 # Clean up if something went wrong. 4297 DeleteTrackerFile(tracker_file_name) 4298 4299 @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.') 4300 @unittest.skipUnless(UsingCrcmodExtension(), 'Test requires fast crcmod.') 4301 def test_cp_preserve_posix_bucket_to_dir_no_errors(self): 4302 """Tests use of the -P flag with cp from a bucket to a local dir. 4303 4304 Specifically tests combinations of POSIX attributes in metadata that will 4305 pass validation. 4306 """ 4307 bucket_uri = self.CreateBucket() 4308 tmpdir = self.CreateTempDir() 4309 TestCpMvPOSIXBucketToLocalNoErrors(self, bucket_uri, tmpdir, is_cp=True) 4310 4311 @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.') 4312 def test_cp_preserve_posix_bucket_to_dir_errors(self): 4313 """Tests use of the -P flag with cp from a bucket to a local dir. 4314 4315 Specifically, combinations of POSIX attributes in metadata that will fail 4316 validation. 4317 """ 4318 bucket_uri = self.CreateBucket() 4319 tmpdir = self.CreateTempDir() 4320 4321 obj = self.CreateObject(bucket_uri=bucket_uri, 4322 object_name='obj', 4323 contents=b'obj') 4324 TestCpMvPOSIXBucketToLocalErrors(self, bucket_uri, obj, tmpdir, is_cp=True) 4325 4326 @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.') 4327 def test_cp_preseve_posix_dir_to_bucket_no_errors(self): 4328 """Tests use of the -P flag with cp from a local dir to a bucket.""" 4329 bucket_uri = self.CreateBucket() 4330 TestCpMvPOSIXLocalToBucketNoErrors(self, bucket_uri, is_cp=True) 4331 4332 def test_cp_minus_s_to_non_cloud_dest_fails(self): 4333 """Test that cp -s operations to a non-cloud destination are prevented.""" 4334 local_file = self.CreateTempFile(contents=b'foo') 4335 dest_dir = self.CreateTempDir() 4336 stderr = self.RunGsUtil(['cp', '-s', 'standard', local_file, dest_dir], 4337 expected_status=1, 4338 return_stderr=True) 4339 self.assertIn('Cannot specify storage class for a non-cloud destination:', 4340 stderr) 4341 4342 # TODO: Remove @skip annotation from this test once we upgrade to the Boto 4343 # version that parses the storage class header for HEAD Object responses. 4344 @SkipForXML('Need Boto version > 2.46.1') 4345 def test_cp_specify_nondefault_storage_class(self): 4346 bucket_uri = self.CreateBucket() 4347 object_uri = self.CreateObject(bucket_uri=bucket_uri, 4348 object_name='foo', 4349 contents=b'foo') 4350 object2_suri = suri(object_uri) + 'bar' 4351 # Specify storage class name as mixed case here to ensure that it 4352 # gets normalized to uppercase (S3 would return an error otherwise), and 4353 # that using the normalized case is accepted by each API. 4354 nondefault_storage_class = { 4355 's3': 'Standard_iA', 4356 'gs': 'durable_REDUCED_availability' 4357 } 4358 storage_class = nondefault_storage_class[self.default_provider] 4359 self.RunGsUtil(['cp', '-s', storage_class, suri(object_uri), object2_suri]) 4360 stdout = self.RunGsUtil(['stat', object2_suri], return_stdout=True) 4361 self.assertRegexpMatchesWithFlags(stdout, 4362 r'Storage class:\s+%s' % storage_class, 4363 flags=re.IGNORECASE) 4364 4365 @SkipForS3('Test uses gs-specific storage classes.') 4366 def test_cp_sets_correct_dest_storage_class(self): 4367 """Tests that object storage class is set correctly with and without -s.""" 4368 # Use a non-default storage class as the default for the bucket. 4369 bucket_uri = self.CreateBucket(storage_class='nearline') 4370 # Ensure storage class is set correctly for a local-to-cloud copy. 4371 local_fname = 'foo-orig' 4372 local_fpath = self.CreateTempFile(contents=b'foo', file_name=local_fname) 4373 foo_cloud_suri = suri(bucket_uri) + '/' + local_fname 4374 self.RunGsUtil(['cp', '-s', 'standard', local_fpath, foo_cloud_suri]) 4375 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 4376 stdout = self.RunGsUtil(['stat', foo_cloud_suri], return_stdout=True) 4377 self.assertRegexpMatchesWithFlags(stdout, 4378 r'Storage class:\s+STANDARD', 4379 flags=re.IGNORECASE) 4380 4381 # Ensure storage class is set correctly for a cloud-to-cloud copy when no 4382 # destination storage class is specified. 4383 foo_nl_suri = suri(bucket_uri) + '/foo-nl' 4384 self.RunGsUtil(['cp', foo_cloud_suri, foo_nl_suri]) 4385 # TODO: Remove with-clause after adding storage class parsing in Boto. 4386 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 4387 stdout = self.RunGsUtil(['stat', foo_nl_suri], return_stdout=True) 4388 self.assertRegexpMatchesWithFlags(stdout, 4389 r'Storage class:\s+NEARLINE', 4390 flags=re.IGNORECASE) 4391 4392 # Ensure storage class is set correctly for a cloud-to-cloud copy when a 4393 # non-bucket-default storage class is specified. 4394 foo_std_suri = suri(bucket_uri) + '/foo-std' 4395 self.RunGsUtil(['cp', '-s', 'standard', foo_nl_suri, foo_std_suri]) 4396 # TODO: Remove with-clause after adding storage class parsing in Boto. 4397 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 4398 stdout = self.RunGsUtil(['stat', foo_std_suri], return_stdout=True) 4399 self.assertRegexpMatchesWithFlags(stdout, 4400 r'Storage class:\s+STANDARD', 4401 flags=re.IGNORECASE) 4402 4403 def authorize_project_to_use_testing_kms_key( 4404 self, key_name=testcase.KmsTestingResources.CONSTANT_KEY_NAME): 4405 # Make sure our keyRing and cryptoKey exist. 4406 keyring_fqn = self.kms_api.CreateKeyRing( 4407 PopulateProjectId(None), 4408 testcase.KmsTestingResources.KEYRING_NAME, 4409 location=testcase.KmsTestingResources.KEYRING_LOCATION) 4410 key_fqn = self.kms_api.CreateCryptoKey(keyring_fqn, key_name) 4411 # Make sure that the service account for our default project is authorized 4412 # to use our test KMS key. 4413 self.RunGsUtil(['kms', 'authorize', '-k', key_fqn]) 4414 return key_fqn 4415 4416 @SkipForS3('Test uses gs-specific KMS encryption') 4417 def test_kms_key_correctly_applied_to_dst_obj_from_src_with_no_key(self): 4418 bucket_uri = self.CreateBucket() 4419 obj1_name = 'foo' 4420 obj2_name = 'bar' 4421 key_fqn = self.authorize_project_to_use_testing_kms_key() 4422 4423 # Create the unencrypted object, then copy it, specifying a KMS key for the 4424 # new object. 4425 obj_uri = self.CreateObject(bucket_uri=bucket_uri, 4426 object_name=obj1_name, 4427 contents=b'foo') 4428 with SetBotoConfigForTest([('GSUtil', 'encryption_key', key_fqn)]): 4429 self.RunGsUtil( 4430 ['cp', suri(obj_uri), 4431 '%s/%s' % (suri(bucket_uri), obj2_name)]) 4432 4433 # Make sure the new object is encrypted with the specified KMS key. 4434 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 4435 self.AssertObjectUsesCMEK('%s/%s' % (suri(bucket_uri), obj2_name), 4436 key_fqn) 4437 4438 @SkipForS3('Test uses gs-specific KMS encryption') 4439 def test_kms_key_correctly_applied_to_dst_obj_from_local_file(self): 4440 bucket_uri = self.CreateBucket() 4441 fpath = self.CreateTempFile(contents=b'abcd') 4442 obj_name = 'foo' 4443 obj_suri = suri(bucket_uri) + '/' + obj_name 4444 key_fqn = self.authorize_project_to_use_testing_kms_key() 4445 4446 with SetBotoConfigForTest([('GSUtil', 'encryption_key', key_fqn)]): 4447 self.RunGsUtil(['cp', fpath, obj_suri]) 4448 4449 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 4450 self.AssertObjectUsesCMEK(obj_suri, key_fqn) 4451 4452 @SkipForS3('Test uses gs-specific KMS encryption') 4453 def test_kms_key_works_with_resumable_upload(self): 4454 resumable_threshold = 1024 * 1024 # 1M 4455 bucket_uri = self.CreateBucket() 4456 fpath = self.CreateTempFile(contents=b'a' * resumable_threshold) 4457 obj_name = 'foo' 4458 obj_suri = suri(bucket_uri) + '/' + obj_name 4459 key_fqn = self.authorize_project_to_use_testing_kms_key() 4460 4461 with SetBotoConfigForTest([('GSUtil', 'encryption_key', key_fqn), 4462 ('GSUtil', 'resumable_threshold', 4463 str(resumable_threshold))]): 4464 self.RunGsUtil(['cp', fpath, obj_suri]) 4465 4466 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 4467 self.AssertObjectUsesCMEK(obj_suri, key_fqn) 4468 4469 @SkipForS3('Test uses gs-specific KMS encryption') 4470 def test_kms_key_correctly_applied_to_dst_obj_from_src_with_diff_key(self): 4471 bucket_uri = self.CreateBucket() 4472 obj1_name = 'foo' 4473 obj2_name = 'bar' 4474 key1_fqn = self.authorize_project_to_use_testing_kms_key() 4475 key2_fqn = self.authorize_project_to_use_testing_kms_key( 4476 key_name=testcase.KmsTestingResources.CONSTANT_KEY_NAME2) 4477 obj1_suri = suri( 4478 self.CreateObject(bucket_uri=bucket_uri, 4479 object_name=obj1_name, 4480 contents=b'foo', 4481 kms_key_name=key1_fqn)) 4482 4483 # Copy the object to the same bucket, specifying a different key to be used. 4484 obj2_suri = '%s/%s' % (suri(bucket_uri), obj2_name) 4485 with SetBotoConfigForTest([('GSUtil', 'encryption_key', key2_fqn)]): 4486 self.RunGsUtil(['cp', obj1_suri, obj2_suri]) 4487 4488 # Ensure the new object has the different key. 4489 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 4490 self.AssertObjectUsesCMEK(obj2_suri, key2_fqn) 4491 4492 @SkipForS3('Test uses gs-specific KMS encryption') 4493 @SkipForXML('Copying KMS-encrypted objects prohibited with XML API') 4494 def test_kms_key_not_applied_to_nonkms_dst_obj_from_src_with_kms_key(self): 4495 bucket_uri = self.CreateBucket() 4496 obj1_name = 'foo' 4497 obj2_name = 'bar' 4498 key1_fqn = self.authorize_project_to_use_testing_kms_key() 4499 obj1_suri = suri( 4500 self.CreateObject(bucket_uri=bucket_uri, 4501 object_name=obj1_name, 4502 contents=b'foo', 4503 kms_key_name=key1_fqn)) 4504 4505 # Copy the object to the same bucket, not specifying any KMS key. 4506 obj2_suri = '%s/%s' % (suri(bucket_uri), obj2_name) 4507 self.RunGsUtil(['cp', obj1_suri, obj2_suri]) 4508 4509 # Ensure the new object has no KMS key. 4510 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 4511 self.AssertObjectUnencrypted(obj2_suri) 4512 4513 @unittest.skipUnless( 4514 IS_WINDOWS, 4515 'Only Windows paths need to be normalized to use backslashes instead of ' 4516 'forward slashes.') 4517 def test_windows_path_with_back_and_forward_slash_is_normalized(self): 4518 # Prior to this test and its corresponding fix, running 4519 # `gsutil cp dir/./file gs://bucket` would result in an object whose name 4520 # was "dir/./file", rather than just "file", as Windows tried to split on 4521 # the path component separator "\" intead of "/". 4522 tmp_dir = self.CreateTempDir() 4523 self.CreateTempFile(tmpdir=tmp_dir, file_name='obj1', contents=b'foo') 4524 bucket_uri = self.CreateBucket() 4525 self.RunGsUtil(['cp', '%s\\./obj1' % tmp_dir, suri(bucket_uri)]) 4526 # If the destination path was not created correctly, this stat call should 4527 # fail with a non-zero exit code because the specified object won't exist. 4528 self.RunGsUtil(['stat', '%s/obj1' % suri(bucket_uri)]) 4529 4530 def test_cp_minus_m_streaming_upload(self): 4531 """Tests that cp -m - anything is disallowed.""" 4532 stderr = self.RunGsUtil(['-m', 'cp', '-', 'file'], 4533 return_stderr=True, 4534 expected_status=1) 4535 self.assertIn( 4536 'CommandException: Cannot upload from a stream when using gsutil -m', 4537 stderr) 4538 4539 4540class TestCpUnitTests(testcase.GsUtilUnitTestCase): 4541 """Unit tests for gsutil cp.""" 4542 4543 def testDownloadWithNoHashAvailable(self): 4544 """Tests a download with no valid server-supplied hash.""" 4545 # S3 should have a special message for non-MD5 etags. 4546 bucket_uri = self.CreateBucket(provider='s3') 4547 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'foo') 4548 object_uri.get_key().etag = '12345' # Not an MD5 4549 dst_dir = self.CreateTempDir() 4550 4551 log_handler = self.RunCommand('cp', [suri(object_uri), dst_dir], 4552 return_log_handler=True) 4553 warning_messages = log_handler.messages['warning'] 4554 self.assertEquals(2, len(warning_messages)) 4555 self.assertRegex( 4556 warning_messages[0], r'Non-MD5 etag \(12345\) present for key .*, ' 4557 r'data integrity checks are not possible') 4558 self.assertIn('Integrity cannot be assured', warning_messages[1]) 4559 4560 def test_object_and_prefix_same_name(self): 4561 bucket_uri = self.CreateBucket() 4562 object_uri = self.CreateObject(bucket_uri=bucket_uri, 4563 object_name='foo', 4564 contents=b'foo') 4565 self.CreateObject(bucket_uri=bucket_uri, 4566 object_name='foo/bar', 4567 contents=b'bar') 4568 fpath = self.CreateTempFile() 4569 # MockKey doesn't support hash_algs, so the MD5 will not match. 4570 with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]): 4571 self.RunCommand('cp', [suri(object_uri), fpath]) 4572 with open(fpath, 'rb') as f: 4573 self.assertEqual(f.read(), b'foo') 4574 4575 def test_cp_upload_respects_no_hashes(self): 4576 bucket_uri = self.CreateBucket() 4577 fpath = self.CreateTempFile(contents=b'abcd') 4578 with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]): 4579 log_handler = self.RunCommand('cp', [fpath, suri(bucket_uri)], 4580 return_log_handler=True) 4581 warning_messages = log_handler.messages['warning'] 4582 self.assertEquals(1, len(warning_messages)) 4583 self.assertIn('Found no hashes to validate object upload', 4584 warning_messages[0]) 4585