1# -*- coding: utf-8 -*- 2# Copyright 2013 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Integration tests for cp command.""" 16 17from __future__ import absolute_import 18 19import base64 20import binascii 21import datetime 22from hashlib import md5 23import httplib 24import logging 25import os 26import pickle 27import pkgutil 28import random 29import re 30import string 31import sys 32import threading 33 34from apitools.base.py import exceptions as apitools_exceptions 35import boto 36from boto import storage_uri 37from boto.exception import ResumableTransferDisposition 38from boto.exception import StorageResponseError 39from boto.storage_uri import BucketStorageUri 40import crcmod 41 42from gslib.cloud_api import ResumableUploadStartOverException 43from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD 44from gslib.copy_helper import GetTrackerFilePath 45from gslib.copy_helper import PARALLEL_UPLOAD_STATIC_SALT 46from gslib.copy_helper import PARALLEL_UPLOAD_TEMP_NAMESPACE 47from gslib.copy_helper import TrackerFileType 48from gslib.cs_api_map import ApiSelector 49from gslib.gcs_json_api import GcsJsonApi 50from gslib.hashing_helper import CalculateB64EncodedMd5FromContents 51from gslib.hashing_helper import CalculateMd5FromContents 52from gslib.parallel_tracker_file import ObjectFromTracker 53from gslib.parallel_tracker_file import WriteParallelUploadTrackerFile 54from gslib.posix_util import GID_ATTR 55from gslib.posix_util import MODE_ATTR 56from gslib.posix_util import NA_ID 57from gslib.posix_util import NA_MODE 58from gslib.posix_util import UID_ATTR 59from gslib.posix_util import ValidateFilePermissionAccess 60from gslib.posix_util import ValidatePOSIXMode 61from gslib.storage_url import StorageUrlFromString 62from gslib.tests.rewrite_helper import EnsureRewriteResumeCallbackHandler 63from gslib.tests.rewrite_helper import HaltingRewriteCallbackHandler 64from gslib.tests.rewrite_helper import RewriteHaltException 65import gslib.tests.testcase as testcase 66from gslib.tests.testcase.base import NotParallelizable 67from gslib.tests.testcase.integration_testcase import SkipForS3 68from gslib.tests.testcase.integration_testcase import SkipForXML 69from gslib.tests.util import BuildErrorRegex 70from gslib.tests.util import GenerationFromURI as urigen 71from gslib.tests.util import HaltingCopyCallbackHandler 72from gslib.tests.util import HaltOneComponentCopyCallbackHandler 73from gslib.tests.util import HAS_GS_PORT 74from gslib.tests.util import HAS_S3_CREDS 75from gslib.tests.util import ObjectToURI as suri 76from gslib.tests.util import ORPHANED_FILE 77from gslib.tests.util import POSIX_GID_ERROR 78from gslib.tests.util import POSIX_INSUFFICIENT_ACCESS_ERROR 79from gslib.tests.util import POSIX_MODE_ERROR 80from gslib.tests.util import POSIX_UID_ERROR 81from gslib.tests.util import SequentialAndParallelTransfer 82from gslib.tests.util import SetBotoConfigForTest 83from gslib.tests.util import TailSet 84from gslib.tests.util import TEST_ENCRYPTION_KEY1 85from gslib.tests.util import TEST_ENCRYPTION_KEY1_SHA256_B64 86from gslib.tests.util import TEST_ENCRYPTION_KEY2 87from gslib.tests.util import TEST_ENCRYPTION_KEY3 88from gslib.tests.util import unittest 89from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages 90from gslib.tracker_file import DeleteTrackerFile 91from gslib.tracker_file import GetRewriteTrackerFilePath 92from gslib.tracker_file import GetSlicedDownloadTrackerFilePaths 93from gslib.ui_controller import BytesToFixedWidthString 94from gslib.util import DiscardMessagesQueue 95from gslib.util import EIGHT_MIB 96from gslib.util import HumanReadableToBytes 97from gslib.util import IS_WINDOWS 98from gslib.util import MakeHumanReadable 99from gslib.util import ONE_KIB 100from gslib.util import ONE_MIB 101from gslib.util import Retry 102from gslib.util import START_CALLBACK_PER_BYTES 103from gslib.util import UsingCrcmodExtension 104from gslib.util import UTF8 105 106# These POSIX-specific variables aren't defined for Windows. 107# pylint: disable=g-import-not-at-top 108if not IS_WINDOWS: 109 from gslib.tests.util import DEFAULT_MODE 110 from gslib.tests.util import INVALID_GID 111 from gslib.tests.util import INVALID_UID 112 from gslib.tests.util import NON_PRIMARY_GID 113 from gslib.tests.util import PRIMARY_GID 114 from gslib.tests.util import USER_ID 115# pylint: enable=g-import-not-at-top 116 117 118def TestCpMvPOSIXBucketToLocalErrors(cls, bucket_uri, obj, tmpdir, is_cp=True): 119 """Helper function for preserve_posix_errors tests in test_cp and test_mv. 120 121 Args: 122 cls: An instance of either TestCp or TestMv. 123 bucket_uri: The uri of the bucket that the object is in. 124 obj: The object to run the tests on. 125 tmpdir: The local file path to cp to. 126 is_cp: Whether or not the calling test suite is cp or mv. 127 """ 128 error = 'error' 129 # A dict of test_name: attrs_dict. 130 # attrs_dict holds the different attributes that we want for the object in a 131 # specific test. 132 test_params = {'test1': {MODE_ATTR: '333', error: POSIX_MODE_ERROR}, 133 'test2': {GID_ATTR: INVALID_GID(), error: POSIX_GID_ERROR}, 134 'test3': {GID_ATTR: INVALID_GID(), MODE_ATTR: '420', 135 error: POSIX_GID_ERROR}, 136 'test4': {UID_ATTR: INVALID_UID(), error: POSIX_UID_ERROR}, 137 'test5': {UID_ATTR: INVALID_UID(), MODE_ATTR: '530', 138 error: POSIX_UID_ERROR}, 139 'test6': {UID_ATTR: INVALID_UID(), GID_ATTR: INVALID_GID(), 140 error: POSIX_UID_ERROR}, 141 'test7': {UID_ATTR: INVALID_UID(), GID_ATTR: INVALID_GID(), 142 MODE_ATTR: '640', error: POSIX_UID_ERROR}, 143 'test8': {UID_ATTR: INVALID_UID(), GID_ATTR: PRIMARY_GID, 144 error: POSIX_UID_ERROR}, 145 'test9': {UID_ATTR: INVALID_UID(), GID_ATTR: NON_PRIMARY_GID(), 146 error: POSIX_UID_ERROR}, 147 'test10': {UID_ATTR: INVALID_UID(), GID_ATTR: PRIMARY_GID, 148 MODE_ATTR: '640', error: POSIX_UID_ERROR}, 149 'test11': {UID_ATTR: INVALID_UID(), 150 GID_ATTR: NON_PRIMARY_GID(), 151 MODE_ATTR: '640', error: POSIX_UID_ERROR}, 152 'test12': {UID_ATTR: USER_ID, GID_ATTR: INVALID_GID(), 153 error: POSIX_GID_ERROR}, 154 'test13': {UID_ATTR: USER_ID, GID_ATTR: INVALID_GID(), 155 MODE_ATTR: '640', error: POSIX_GID_ERROR}, 156 'test14': {GID_ATTR: PRIMARY_GID, MODE_ATTR: '240', 157 error: POSIX_INSUFFICIENT_ACCESS_ERROR}} 158 # The first variable below can be used to help debug the test if there is a 159 # problem. 160 for test_name, attrs_dict in test_params.iteritems(): 161 cls.ClearPOSIXMetadata(obj) 162 163 # Attributes default to None if they are not in attrs_dict. 164 uid = attrs_dict.get(UID_ATTR) 165 gid = attrs_dict.get(GID_ATTR) 166 mode = attrs_dict.get(MODE_ATTR) 167 cls.SetPOSIXMetadata(cls.default_provider, bucket_uri.bucket_name, 168 obj.object_name, uid=uid, gid=gid, mode=mode) 169 stderr = cls.RunGsUtil(['cp' if is_cp else 'mv', '-P', 170 suri(bucket_uri, obj.object_name), tmpdir], 171 expected_status=1, return_stderr=True) 172 cls.assertIn(ORPHANED_FILE, stderr, '%s not found in stderr\n%s' 173 % (ORPHANED_FILE, stderr)) 174 error_regex = BuildErrorRegex(obj, attrs_dict.get(error)) 175 cls.assertTrue( 176 error_regex.search(stderr), 177 'Test %s did not match expected error; could not find a match for ' 178 '%s\n\nin stderr:\n%s' % (test_name, error_regex.pattern, stderr)) 179 listing1 = TailSet(suri(bucket_uri), cls.FlatListBucket(bucket_uri)) 180 listing2 = TailSet(tmpdir, cls.FlatListDir(tmpdir)) 181 # Bucket should have un-altered content. 182 cls.assertEquals(listing1, set(['/%s' % obj.object_name])) 183 # Dir should have un-altered content. 184 cls.assertEquals(listing2, set([''])) 185 186 187def TestCpMvPOSIXBucketToLocalNoErrors(cls, bucket_uri, tmpdir, is_cp=True): 188 """Helper function for preserve_posix_no_errors tests in test_cp and test_mv. 189 190 Args: 191 cls: An instance of either TestCp or TestMv. 192 bucket_uri: The uri of the bucket that the object is in. 193 tmpdir: The local file path to cp to. 194 is_cp: Whether or not the calling test suite is cp or mv. 195 """ 196 test_params = {'obj1': {GID_ATTR: PRIMARY_GID}, 197 'obj2': {GID_ATTR: NON_PRIMARY_GID()}, 198 'obj3': {GID_ATTR: PRIMARY_GID, MODE_ATTR: '440'}, 199 'obj4': {GID_ATTR: NON_PRIMARY_GID(), MODE_ATTR: '444'}, 200 'obj5': {UID_ATTR: USER_ID}, 201 'obj6': {UID_ATTR: USER_ID, MODE_ATTR: '420'}, 202 'obj7': {UID_ATTR: USER_ID, GID_ATTR: PRIMARY_GID}, 203 'obj8': {UID_ATTR: USER_ID, GID_ATTR: NON_PRIMARY_GID()}, 204 'obj9': {UID_ATTR: USER_ID, GID_ATTR: PRIMARY_GID, 205 MODE_ATTR: '433'}, 206 'obj10': {UID_ATTR: USER_ID, GID_ATTR: NON_PRIMARY_GID(), 207 MODE_ATTR: '442'}} 208 for obj_name, attrs_dict in test_params.iteritems(): 209 uid = attrs_dict.get(UID_ATTR) 210 gid = attrs_dict.get(GID_ATTR) 211 mode = attrs_dict.get(MODE_ATTR) 212 cls.CreateObject(bucket_uri=bucket_uri, object_name=obj_name, 213 contents=obj_name, uid=uid, gid=gid, mode=mode) 214 for obj_name in test_params.iterkeys(): 215 # Move objects one at a time to avoid listing consistency. 216 cls.RunGsUtil(['cp' if is_cp else 'mv', '-P', suri(bucket_uri, obj_name), 217 tmpdir]) 218 listing = TailSet(tmpdir, cls.FlatListDir(tmpdir)) 219 cls.assertEquals(listing, set(['/obj1', '/obj2', '/obj3', '/obj4', '/obj5', 220 '/obj6', '/obj7', '/obj8', '/obj9', '/obj10'])) 221 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj1'), 222 gid=PRIMARY_GID, mode=DEFAULT_MODE) 223 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj2'), 224 gid=NON_PRIMARY_GID(), mode=DEFAULT_MODE) 225 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj3'), 226 gid=PRIMARY_GID, mode=0o440) 227 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj4'), 228 gid=NON_PRIMARY_GID(), mode=0o444) 229 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj5'), 230 uid=USER_ID, gid=PRIMARY_GID, 231 mode=DEFAULT_MODE) 232 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj6'), 233 uid=USER_ID, gid=PRIMARY_GID, mode=0o420) 234 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj7'), 235 uid=USER_ID, gid=PRIMARY_GID, 236 mode=DEFAULT_MODE) 237 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj8'), 238 uid=USER_ID, gid=NON_PRIMARY_GID(), 239 mode=DEFAULT_MODE) 240 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj9'), 241 uid=USER_ID, gid=PRIMARY_GID, mode=0o433) 242 cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj10'), 243 uid=USER_ID, gid=NON_PRIMARY_GID(), 244 mode=0o442) 245 246 247def TestCpMvPOSIXLocalToBucketNoErrors(cls, bucket_uri, is_cp=True): 248 """Helper function for testing local to bucket POSIX preservation.""" 249 test_params = {'obj1': {GID_ATTR: PRIMARY_GID}, 250 'obj2': {GID_ATTR: NON_PRIMARY_GID()}, 251 'obj3': {GID_ATTR: PRIMARY_GID, MODE_ATTR: '440'}, 252 'obj4': {GID_ATTR: NON_PRIMARY_GID(), MODE_ATTR: '444'}, 253 'obj5': {UID_ATTR: USER_ID}, 254 'obj6': {UID_ATTR: USER_ID, MODE_ATTR: '420'}, 255 'obj7': {UID_ATTR: USER_ID, GID_ATTR: PRIMARY_GID}, 256 'obj8': {UID_ATTR: USER_ID, GID_ATTR: NON_PRIMARY_GID()}, 257 'obj9': {UID_ATTR: USER_ID, GID_ATTR: PRIMARY_GID, 258 MODE_ATTR: '433'}, 259 'obj10': {UID_ATTR: USER_ID, GID_ATTR: NON_PRIMARY_GID(), 260 MODE_ATTR: '442'}} 261 for obj_name, attrs_dict in test_params.iteritems(): 262 uid = attrs_dict.get(UID_ATTR, NA_ID) 263 gid = attrs_dict.get(GID_ATTR, NA_ID) 264 mode = attrs_dict.get(MODE_ATTR, NA_MODE) 265 if mode != NA_MODE: 266 ValidatePOSIXMode(int(mode, 8)) 267 ValidateFilePermissionAccess(obj_name, uid=uid, gid=gid, mode=mode) 268 fpath = cls.CreateTempFile(contents='foo', uid=uid, gid=gid, mode=mode) 269 cls.RunGsUtil(['cp' if is_cp else 'mv', '-P', fpath, 270 suri(bucket_uri, obj_name)]) 271 if uid != NA_ID: 272 cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name, 273 UID_ATTR, str(uid)) 274 if gid != NA_ID: 275 cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name, 276 GID_ATTR, str(gid)) 277 if mode != NA_MODE: 278 cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name, 279 MODE_ATTR, str(mode)) 280 281 282def _ReadContentsFromFifo(fifo_path, list_for_output): 283 with open(fifo_path, 'rb') as f: 284 list_for_output.append(f.read()) 285 286 287def _WriteContentsToFifo(contents, fifo_path): 288 with open(fifo_path, 'wb') as f: 289 f.write(contents) 290 291 292class _JSONForceHTTPErrorCopyCallbackHandler(object): 293 """Test callback handler that raises an arbitrary HTTP error exception.""" 294 295 def __init__(self, startover_at_byte, http_error_num): 296 self._startover_at_byte = startover_at_byte 297 self._http_error_num = http_error_num 298 self.started_over_once = False 299 300 # pylint: disable=invalid-name 301 def call(self, total_bytes_transferred, total_size): 302 """Forcibly exits if the transfer has passed the halting point.""" 303 if (total_bytes_transferred >= self._startover_at_byte 304 and not self.started_over_once): 305 sys.stderr.write( 306 'Forcing HTTP error %s after byte %s. ' 307 '%s/%s transferred.\r\n' % ( 308 self._http_error_num, 309 self._startover_at_byte, 310 MakeHumanReadable(total_bytes_transferred), 311 MakeHumanReadable(total_size))) 312 self.started_over_once = True 313 raise apitools_exceptions.HttpError( 314 {'status': self._http_error_num}, None, None) 315 316 317class _XMLResumableUploadStartOverCopyCallbackHandler(object): 318 """Test callback handler that raises start-over exception during upload.""" 319 320 def __init__(self, startover_at_byte): 321 self._startover_at_byte = startover_at_byte 322 self.started_over_once = False 323 324 # pylint: disable=invalid-name 325 def call(self, total_bytes_transferred, total_size): 326 """Forcibly exits if the transfer has passed the halting point.""" 327 if (total_bytes_transferred >= self._startover_at_byte 328 and not self.started_over_once): 329 sys.stderr.write( 330 'Forcing ResumableUpload start over error after byte %s. ' 331 '%s/%s transferred.\r\n' % ( 332 self._startover_at_byte, 333 MakeHumanReadable(total_bytes_transferred), 334 MakeHumanReadable(total_size))) 335 self.started_over_once = True 336 raise boto.exception.ResumableUploadException( 337 'Forcing upload start over', 338 ResumableTransferDisposition.START_OVER) 339 340 341class _DeleteBucketThenStartOverCopyCallbackHandler(object): 342 """Test callback handler that deletes bucket then raises start-over.""" 343 344 def __init__(self, startover_at_byte, bucket_uri): 345 self._startover_at_byte = startover_at_byte 346 self._bucket_uri = bucket_uri 347 self.started_over_once = False 348 349 # pylint: disable=invalid-name 350 def call(self, total_bytes_transferred, total_size): 351 """Forcibly exits if the transfer has passed the halting point.""" 352 if (total_bytes_transferred >= self._startover_at_byte 353 and not self.started_over_once): 354 sys.stderr.write('Deleting bucket (%s)' %(self._bucket_uri.bucket_name)) 355 356 @Retry(StorageResponseError, tries=5, timeout_secs=1) 357 def DeleteBucket(): 358 bucket_list = list(self._bucket_uri.list_bucket(all_versions=True)) 359 for k in bucket_list: 360 self._bucket_uri.get_bucket().delete_key(k.name, 361 version_id=k.version_id) 362 self._bucket_uri.delete_bucket() 363 364 DeleteBucket() 365 sys.stderr.write( 366 'Forcing ResumableUpload start over error after byte %s. ' 367 '%s/%s transferred.\r\n' % ( 368 self._startover_at_byte, 369 MakeHumanReadable(total_bytes_transferred), 370 MakeHumanReadable(total_size))) 371 self.started_over_once = True 372 raise ResumableUploadStartOverException( 373 'Artificially forcing start-over') 374 375 376class _ResumableUploadRetryHandler(object): 377 """Test callback handler for causing retries during a resumable transfer.""" 378 379 def __init__(self, retry_at_byte, exception_to_raise, exc_args, 380 num_retries=1): 381 self._retry_at_byte = retry_at_byte 382 self._exception_to_raise = exception_to_raise 383 self._exception_args = exc_args 384 self._num_retries = num_retries 385 386 self._retries_made = 0 387 388 # pylint: disable=invalid-name 389 def call(self, total_bytes_transferred, unused_total_size): 390 """Cause a single retry at the retry point.""" 391 if (total_bytes_transferred >= self._retry_at_byte 392 and self._retries_made < self._num_retries): 393 self._retries_made += 1 394 raise self._exception_to_raise(*self._exception_args) 395 396 397class TestCp(testcase.GsUtilIntegrationTestCase): 398 """Integration tests for cp command.""" 399 400 # For tests that artificially halt, we need to ensure at least one callback 401 # occurs. 402 halt_size = START_CALLBACK_PER_BYTES * 2 403 404 def _get_test_file(self, name): 405 contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name) 406 return self.CreateTempFile(file_name=name, contents=contents) 407 408 def _CpWithFifoViaGsUtilAndAppendOutputToList( 409 self, src_path_tuple, dst_path, list_for_return_value, **kwargs): 410 arg_list = ['cp'] 411 arg_list.extend(src_path_tuple) 412 arg_list.append(dst_path) 413 # Append stderr, stdout, or return status (if specified in kwargs) to the 414 # given list. 415 list_for_return_value.append( 416 self.RunGsUtil(arg_list, **kwargs)) 417 418 @SequentialAndParallelTransfer 419 def test_noclobber(self): 420 key_uri = self.CreateObject(contents='foo') 421 fpath = self.CreateTempFile(contents='bar') 422 stderr = self.RunGsUtil(['cp', '-n', fpath, suri(key_uri)], 423 return_stderr=True) 424 self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr) 425 self.assertEqual(key_uri.get_contents_as_string(), 'foo') 426 stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath], 427 return_stderr=True) 428 with open(fpath, 'r') as f: 429 self.assertIn('Skipping existing item: %s' % suri(f), stderr) 430 self.assertEqual(f.read(), 'bar') 431 432 def test_dest_bucket_not_exist(self): 433 fpath = self.CreateTempFile(contents='foo') 434 invalid_bucket_uri = ( 435 '%s://%s' % (self.default_provider, self.nonexistent_bucket_name)) 436 stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri], 437 expected_status=1, return_stderr=True) 438 self.assertIn('does not exist', stderr) 439 440 def test_copy_in_cloud_noclobber(self): 441 bucket1_uri = self.CreateBucket() 442 bucket2_uri = self.CreateBucket() 443 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') 444 stderr = self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)], 445 return_stderr=True) 446 # Rewrite API may output an additional 'Copying' progress notification. 447 self.assertGreaterEqual(stderr.count('Copying'), 1) 448 self.assertLessEqual(stderr.count('Copying'), 2) 449 stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), suri(bucket2_uri)], 450 return_stderr=True) 451 self.assertIn('Skipping existing item: %s' % 452 suri(bucket2_uri, key_uri.object_name), stderr) 453 454 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 455 @SequentialAndParallelTransfer 456 def test_cp_from_local_file_to_fifo(self): 457 contents = 'bar' 458 fifo_path = self.CreateTempFifo() 459 file_path = self.CreateTempFile(contents=contents) 460 list_for_output = [] 461 462 read_thread = threading.Thread( 463 target=_ReadContentsFromFifo, 464 args=(fifo_path, list_for_output)) 465 read_thread.start() 466 write_thread = threading.Thread( 467 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 468 args=((file_path,), fifo_path, [])) 469 write_thread.start() 470 write_thread.join(120) 471 read_thread.join(120) 472 if not list_for_output: 473 self.fail('Reading/writing to the fifo timed out.') 474 self.assertEqual(list_for_output[0].strip(), contents) 475 476 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 477 @SequentialAndParallelTransfer 478 def test_cp_from_one_object_to_fifo(self): 479 fifo_path = self.CreateTempFifo() 480 bucket_uri = self.CreateBucket() 481 contents = 'bar' 482 obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents) 483 list_for_output = [] 484 485 read_thread = threading.Thread( 486 target=_ReadContentsFromFifo, 487 args=(fifo_path, list_for_output)) 488 read_thread.start() 489 write_thread = threading.Thread( 490 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 491 args=((suri(obj_uri),), fifo_path, [])) 492 write_thread.start() 493 write_thread.join(120) 494 read_thread.join(120) 495 if not list_for_output: 496 self.fail('Reading/writing to the fifo timed out.') 497 self.assertEqual(list_for_output[0].strip(), contents) 498 499 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 500 @SequentialAndParallelTransfer 501 def test_cp_from_multiple_objects_to_fifo(self): 502 fifo_path = self.CreateTempFifo() 503 bucket_uri = self.CreateBucket() 504 contents1 = 'foo and bar' 505 contents2 = 'baz and qux' 506 obj1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents1) 507 obj2_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents2) 508 list_for_output = [] 509 510 read_thread = threading.Thread( 511 target=_ReadContentsFromFifo, 512 args=(fifo_path, list_for_output)) 513 read_thread.start() 514 write_thread = threading.Thread( 515 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 516 args=((suri(obj1_uri), suri(obj2_uri)), fifo_path, [])) 517 write_thread.start() 518 write_thread.join(120) 519 read_thread.join(120) 520 if not list_for_output: 521 self.fail('Reading/writing to the fifo timed out.') 522 self.assertIn(contents1, list_for_output[0]) 523 self.assertIn(contents2, list_for_output[0]) 524 525 @SequentialAndParallelTransfer 526 def test_streaming(self): 527 bucket_uri = self.CreateBucket() 528 stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')], 529 stdin='bar', return_stderr=True) 530 self.assertIn('Copying from <STDIN>', stderr) 531 key_uri = bucket_uri.clone_replace_name('foo') 532 self.assertEqual(key_uri.get_contents_as_string(), 'bar') 533 534 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 535 @SequentialAndParallelTransfer 536 def test_streaming_from_fifo_to_object(self): 537 bucket_uri = self.CreateBucket() 538 fifo_path = self.CreateTempFifo() 539 object_name = 'foo' 540 object_contents = 'bar' 541 list_for_output = [] 542 543 # Start writer in the background, which won't finish until a corresponding 544 # read operation is performed on the fifo. 545 write_thread = threading.Thread( 546 target=_WriteContentsToFifo, 547 args=(object_contents, fifo_path)) 548 write_thread.start() 549 # The fifo requires both a pending read and write before either operation 550 # will complete. Regardless of which operation occurs first, the 551 # corresponding subsequent operation will unblock the first one. 552 # We run gsutil in a thread so that it can timeout rather than hang forever 553 # if the write thread fails. 554 read_thread = threading.Thread( 555 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 556 args=((fifo_path,), suri(bucket_uri, object_name), list_for_output), 557 kwargs={'return_stderr': True}) 558 read_thread.start() 559 560 read_thread.join(120) 561 write_thread.join(120) 562 if not list_for_output: 563 self.fail('Reading/writing to the fifo timed out.') 564 self.assertIn('Copying from named pipe', list_for_output[0]) 565 566 key_uri = bucket_uri.clone_replace_name(object_name) 567 self.assertEqual(key_uri.get_contents_as_string(), object_contents) 568 569 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 570 @SequentialAndParallelTransfer 571 def test_streaming_from_fifo_to_stdout(self): 572 fifo_path = self.CreateTempFifo() 573 contents = 'bar' 574 list_for_output = [] 575 576 write_thread = threading.Thread( 577 target=_WriteContentsToFifo, 578 args=(contents, fifo_path)) 579 write_thread.start() 580 read_thread = threading.Thread( 581 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 582 args=((fifo_path,), '-', list_for_output), 583 kwargs={'return_stdout': True}) 584 read_thread.start() 585 read_thread.join(120) 586 write_thread.join(120) 587 if not list_for_output: 588 self.fail('Reading/writing to the fifo timed out.') 589 self.assertEqual(list_for_output[0].strip(), contents) 590 591 @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.') 592 @SequentialAndParallelTransfer 593 def test_streaming_from_stdout_to_fifo(self): 594 fifo_path = self.CreateTempFifo() 595 contents = 'bar' 596 list_for_output = [] 597 list_for_gsutil_output = [] 598 599 read_thread = threading.Thread( 600 target=_ReadContentsFromFifo, 601 args=(fifo_path, list_for_output)) 602 read_thread.start() 603 write_thread = threading.Thread( 604 target=self._CpWithFifoViaGsUtilAndAppendOutputToList, 605 args=(('-',), fifo_path, list_for_gsutil_output), 606 kwargs={'return_stderr': True, 'stdin': contents}) 607 write_thread.start() 608 write_thread.join(120) 609 read_thread.join(120) 610 if not list_for_output: 611 self.fail('Reading/writing to the fifo timed out.') 612 self.assertEqual(list_for_output[0].strip(), contents) 613 614 def test_streaming_multiple_arguments(self): 615 bucket_uri = self.CreateBucket() 616 stderr = self.RunGsUtil(['cp', '-', '-', suri(bucket_uri)], 617 stdin='bar', return_stderr=True, expected_status=1) 618 self.assertIn('Multiple URL strings are not supported with streaming', 619 stderr) 620 621 # TODO: Implement a way to test both with and without using magic file. 622 623 @SequentialAndParallelTransfer 624 def test_detect_content_type(self): 625 """Tests local detection of content type.""" 626 bucket_uri = self.CreateBucket() 627 dsturi = suri(bucket_uri, 'foo') 628 629 self.RunGsUtil(['cp', self._get_test_file('test.mp3'), dsturi]) 630 631 # Use @Retry as hedge against bucket listing eventual consistency. 632 @Retry(AssertionError, tries=3, timeout_secs=1) 633 def _Check1(): 634 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 635 if IS_WINDOWS: 636 self.assertTrue( 637 re.search(r'Content-Type:\s+audio/x-mpg', stdout) or 638 re.search(r'Content-Type:\s+audio/mpeg', stdout)) 639 else: 640 self.assertRegexpMatches(stdout, r'Content-Type:\s+audio/mpeg') 641 _Check1() 642 643 self.RunGsUtil(['cp', self._get_test_file('test.gif'), dsturi]) 644 645 # Use @Retry as hedge against bucket listing eventual consistency. 646 @Retry(AssertionError, tries=3, timeout_secs=1) 647 def _Check2(): 648 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 649 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') 650 _Check2() 651 652 def test_content_type_override_default(self): 653 """Tests overriding content type with the default value.""" 654 bucket_uri = self.CreateBucket() 655 dsturi = suri(bucket_uri, 'foo') 656 657 self.RunGsUtil(['-h', 'Content-Type:', 'cp', 658 self._get_test_file('test.mp3'), dsturi]) 659 660 # Use @Retry as hedge against bucket listing eventual consistency. 661 @Retry(AssertionError, tries=3, timeout_secs=1) 662 def _Check1(): 663 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 664 self.assertRegexpMatches(stdout, 665 r'Content-Type:\s+application/octet-stream') 666 _Check1() 667 668 self.RunGsUtil(['-h', 'Content-Type:', 'cp', 669 self._get_test_file('test.gif'), dsturi]) 670 671 # Use @Retry as hedge against bucket listing eventual consistency. 672 @Retry(AssertionError, tries=3, timeout_secs=1) 673 def _Check2(): 674 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 675 self.assertRegexpMatches(stdout, 676 r'Content-Type:\s+application/octet-stream') 677 _Check2() 678 679 def test_content_type_override(self): 680 """Tests overriding content type with a value.""" 681 bucket_uri = self.CreateBucket() 682 dsturi = suri(bucket_uri, 'foo') 683 684 self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp', 685 self._get_test_file('test.mp3'), dsturi]) 686 687 # Use @Retry as hedge against bucket listing eventual consistency. 688 @Retry(AssertionError, tries=3, timeout_secs=1) 689 def _Check1(): 690 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 691 self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') 692 _Check1() 693 694 self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp', 695 self._get_test_file('test.gif'), dsturi]) 696 697 # Use @Retry as hedge against bucket listing eventual consistency. 698 @Retry(AssertionError, tries=3, timeout_secs=1) 699 def _Check2(): 700 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 701 self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') 702 _Check2() 703 704 @unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.') 705 @SequentialAndParallelTransfer 706 def test_magicfile_override(self): 707 """Tests content type override with magicfile value.""" 708 bucket_uri = self.CreateBucket() 709 dsturi = suri(bucket_uri, 'foo') 710 fpath = self.CreateTempFile(contents='foo/bar\n') 711 self.RunGsUtil(['cp', fpath, dsturi]) 712 713 # Use @Retry as hedge against bucket listing eventual consistency. 714 @Retry(AssertionError, tries=3, timeout_secs=1) 715 def _Check1(): 716 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 717 use_magicfile = boto.config.getbool('GSUtil', 'use_magicfile', False) 718 content_type = ('text/plain' if use_magicfile 719 else 'application/octet-stream') 720 self.assertRegexpMatches(stdout, r'Content-Type:\s+%s' % content_type) 721 _Check1() 722 723 @SequentialAndParallelTransfer 724 def test_content_type_mismatches(self): 725 """Tests overriding content type when it does not match the file type.""" 726 bucket_uri = self.CreateBucket() 727 dsturi = suri(bucket_uri, 'foo') 728 fpath = self.CreateTempFile(contents='foo/bar\n') 729 730 self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', 731 self._get_test_file('test.mp3'), dsturi]) 732 733 # Use @Retry as hedge against bucket listing eventual consistency. 734 @Retry(AssertionError, tries=3, timeout_secs=1) 735 def _Check1(): 736 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 737 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') 738 _Check1() 739 740 self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', 741 self._get_test_file('test.gif'), dsturi]) 742 743 # Use @Retry as hedge against bucket listing eventual consistency. 744 @Retry(AssertionError, tries=3, timeout_secs=1) 745 def _Check2(): 746 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 747 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') 748 _Check2() 749 750 self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', fpath, dsturi]) 751 752 # Use @Retry as hedge against bucket listing eventual consistency. 753 @Retry(AssertionError, tries=3, timeout_secs=1) 754 def _Check3(): 755 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 756 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') 757 _Check3() 758 759 @SequentialAndParallelTransfer 760 def test_content_type_header_case_insensitive(self): 761 """Tests that content type header is treated with case insensitivity.""" 762 bucket_uri = self.CreateBucket() 763 dsturi = suri(bucket_uri, 'foo') 764 fpath = self._get_test_file('test.gif') 765 766 self.RunGsUtil(['-h', 'content-Type:text/plain', 'cp', 767 fpath, dsturi]) 768 769 # Use @Retry as hedge against bucket listing eventual consistency. 770 @Retry(AssertionError, tries=3, timeout_secs=1) 771 def _Check1(): 772 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 773 self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') 774 self.assertNotRegexpMatches(stdout, r'image/gif') 775 _Check1() 776 777 self.RunGsUtil(['-h', 'CONTENT-TYPE:image/gif', 778 '-h', 'content-type:image/gif', 779 'cp', fpath, dsturi]) 780 781 # Use @Retry as hedge against bucket listing eventual consistency. 782 @Retry(AssertionError, tries=3, timeout_secs=1) 783 def _Check2(): 784 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) 785 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') 786 self.assertNotRegexpMatches(stdout, r'image/gif,\s*image/gif') 787 _Check2() 788 789 @SequentialAndParallelTransfer 790 def test_other_headers(self): 791 """Tests that non-content-type headers are applied successfully on copy.""" 792 bucket_uri = self.CreateBucket() 793 dst_uri = suri(bucket_uri, 'foo') 794 fpath = self._get_test_file('test.gif') 795 796 self.RunGsUtil(['-h', 'Cache-Control:public,max-age=12', 797 '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta, 'cp', 798 fpath, dst_uri]) 799 800 stdout = self.RunGsUtil(['ls', '-L', dst_uri], return_stdout=True) 801 self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12') 802 self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd') 803 804 dst_uri2 = suri(bucket_uri, 'bar') 805 self.RunGsUtil(['cp', dst_uri, dst_uri2]) 806 # Ensure metadata was preserved across copy. 807 stdout = self.RunGsUtil(['ls', '-L', dst_uri2], return_stdout=True) 808 self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12') 809 self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd') 810 811 @SequentialAndParallelTransfer 812 def test_versioning(self): 813 """Tests copy with versioning.""" 814 bucket_uri = self.CreateVersionedBucket() 815 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2') 816 k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') 817 g1 = urigen(k2_uri) 818 self.RunGsUtil(['cp', suri(k1_uri), suri(k2_uri)]) 819 k2_uri = bucket_uri.clone_replace_name(k2_uri.object_name) 820 k2_uri = bucket_uri.clone_replace_key(k2_uri.get_key()) 821 g2 = urigen(k2_uri) 822 k2_uri.set_contents_from_string('data3') 823 g3 = urigen(k2_uri) 824 825 fpath = self.CreateTempFile() 826 # Check to make sure current version is data3. 827 self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath]) 828 with open(fpath, 'r') as f: 829 self.assertEqual(f.read(), 'data3') 830 831 # Check contents of all three versions 832 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), fpath]) 833 with open(fpath, 'r') as f: 834 self.assertEqual(f.read(), 'data1') 835 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g2), fpath]) 836 with open(fpath, 'r') as f: 837 self.assertEqual(f.read(), 'data2') 838 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g3), fpath]) 839 with open(fpath, 'r') as f: 840 self.assertEqual(f.read(), 'data3') 841 842 # Copy first version to current and verify. 843 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), 844 k2_uri.versionless_uri]) 845 self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath]) 846 with open(fpath, 'r') as f: 847 self.assertEqual(f.read(), 'data1') 848 849 # Attempt to specify a version-specific URI for destination. 850 stderr = self.RunGsUtil(['cp', fpath, k2_uri.uri], return_stderr=True, 851 expected_status=1) 852 self.assertIn('cannot be the destination for gsutil cp', stderr) 853 854 def test_versioning_no_parallelism(self): 855 """Tests that copy all-versions errors when parallelism is enabled.""" 856 stderr = self.RunGsUtil( 857 ['-m', 'cp', '-A', suri(self.nonexistent_bucket_name, 'foo'), 858 suri(self.nonexistent_bucket_name, 'bar')], 859 expected_status=1, return_stderr=True) 860 self.assertIn('-m option is not supported with the cp -A flag', stderr) 861 862 @SkipForS3('S3 lists versioned objects in reverse timestamp order.') 863 def test_recursive_copying_versioned_bucket(self): 864 """Tests cp -R with versioned buckets.""" 865 bucket1_uri = self.CreateVersionedBucket() 866 bucket2_uri = self.CreateVersionedBucket() 867 bucket3_uri = self.CreateVersionedBucket() 868 869 # Write two versions of an object to the bucket1. 870 v1_uri = self.CreateObject(bucket_uri=bucket1_uri, object_name='k', 871 contents='data0') 872 self.CreateObject(bucket_uri=bucket1_uri, object_name='k', 873 contents='longer_data1', 874 gs_idempotent_generation=urigen(v1_uri)) 875 876 self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True) 877 self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True) 878 self.AssertNObjectsInBucket(bucket3_uri, 0, versioned=True) 879 880 # Recursively copy to second versioned bucket. 881 # -A flag should copy all versions in order. 882 self.RunGsUtil(['cp', '-R', '-A', suri(bucket1_uri, '*'), 883 suri(bucket2_uri)]) 884 885 # Use @Retry as hedge against bucket listing eventual consistency. 886 @Retry(AssertionError, tries=3, timeout_secs=1) 887 def _Check2(): 888 """Validates the results of the cp -R.""" 889 listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)], 890 return_stdout=True).split('\n') 891 listing2 = self.RunGsUtil(['ls', '-la', suri(bucket2_uri)], 892 return_stdout=True).split('\n') 893 # 2 lines of listing output, 1 summary line, 1 empty line from \n split. 894 self.assertEquals(len(listing1), 4) 895 self.assertEquals(len(listing2), 4) 896 897 # First object in each bucket should match in size and version-less name. 898 size1, _, uri_str1, _ = listing1[0].split() 899 self.assertEquals(size1, str(len('data0'))) 900 self.assertEquals(storage_uri(uri_str1).object_name, 'k') 901 size2, _, uri_str2, _ = listing2[0].split() 902 self.assertEquals(size2, str(len('data0'))) 903 self.assertEquals(storage_uri(uri_str2).object_name, 'k') 904 905 # Similarly for second object in each bucket. 906 size1, _, uri_str1, _ = listing1[1].split() 907 self.assertEquals(size1, str(len('longer_data1'))) 908 self.assertEquals(storage_uri(uri_str1).object_name, 'k') 909 size2, _, uri_str2, _ = listing2[1].split() 910 self.assertEquals(size2, str(len('longer_data1'))) 911 self.assertEquals(storage_uri(uri_str2).object_name, 'k') 912 _Check2() 913 914 # Recursively copy to second versioned bucket with no -A flag. 915 # This should copy only the live object. 916 self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), 917 suri(bucket3_uri)]) 918 919 # Use @Retry as hedge against bucket listing eventual consistency. 920 @Retry(AssertionError, tries=3, timeout_secs=1) 921 def _Check3(): 922 """Validates the results of the cp -R.""" 923 listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)], 924 return_stdout=True).split('\n') 925 listing2 = self.RunGsUtil(['ls', '-la', suri(bucket3_uri)], 926 return_stdout=True).split('\n') 927 # 2 lines of listing output, 1 summary line, 1 empty line from \n split. 928 self.assertEquals(len(listing1), 4) 929 # 1 lines of listing output, 1 summary line, 1 empty line from \n split. 930 self.assertEquals(len(listing2), 3) 931 932 # Live (second) object in bucket 1 should match the single live object. 933 size1, _, uri_str1, _ = listing2[0].split() 934 self.assertEquals(size1, str(len('longer_data1'))) 935 self.assertEquals(storage_uri(uri_str1).object_name, 'k') 936 _Check3() 937 938 @SequentialAndParallelTransfer 939 @SkipForS3('Preconditions not supported for S3.') 940 def test_cp_generation_zero_match(self): 941 """Tests that cp handles an object-not-exists precondition header.""" 942 bucket_uri = self.CreateBucket() 943 fpath1 = self.CreateTempFile(contents='data1') 944 # Match 0 means only write the object if it doesn't already exist. 945 gen_match_header = 'x-goog-if-generation-match:0' 946 947 # First copy should succeed. 948 # TODO: This can fail (rarely) if the server returns a 5xx but actually 949 # commits the bytes. If we add restarts on small uploads, handle this 950 # case. 951 self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(bucket_uri)]) 952 953 # Second copy should fail with a precondition error. 954 stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, 955 suri(bucket_uri)], 956 return_stderr=True, expected_status=1) 957 self.assertIn('PreconditionException', stderr) 958 959 @SequentialAndParallelTransfer 960 @SkipForS3('Preconditions not supported for S3.') 961 def test_cp_v_generation_match(self): 962 """Tests that cp -v option handles the if-generation-match header.""" 963 bucket_uri = self.CreateVersionedBucket() 964 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') 965 g1 = k1_uri.generation 966 967 tmpdir = self.CreateTempDir() 968 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') 969 970 gen_match_header = 'x-goog-if-generation-match:%s' % g1 971 # First copy should succeed. 972 self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(k1_uri)]) 973 974 # Second copy should fail the precondition. 975 stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, 976 suri(k1_uri)], 977 return_stderr=True, expected_status=1) 978 979 self.assertIn('PreconditionException', stderr) 980 981 # Specifiying a generation with -n should fail before the request hits the 982 # server. 983 stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', '-n', fpath1, 984 suri(k1_uri)], 985 return_stderr=True, expected_status=1) 986 987 self.assertIn('ArgumentException', stderr) 988 self.assertIn('Specifying x-goog-if-generation-match is not supported ' 989 'with cp -n', stderr) 990 991 @SequentialAndParallelTransfer 992 def test_cp_nv(self): 993 """Tests that cp -nv works when skipping existing file.""" 994 bucket_uri = self.CreateVersionedBucket() 995 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') 996 997 tmpdir = self.CreateTempDir() 998 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') 999 1000 # First copy should succeed. 1001 self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)]) 1002 1003 # Second copy should skip copying. 1004 stderr = self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)], 1005 return_stderr=True) 1006 self.assertIn('Skipping existing item:', stderr) 1007 1008 @SequentialAndParallelTransfer 1009 @SkipForS3('S3 lists versioned objects in reverse timestamp order.') 1010 def test_cp_v_option(self): 1011 """"Tests that cp -v returns the created object's version-specific URI.""" 1012 bucket_uri = self.CreateVersionedBucket() 1013 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') 1014 k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2') 1015 1016 # Case 1: Upload file to object using one-shot PUT. 1017 tmpdir = self.CreateTempDir() 1018 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1') 1019 self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri) 1020 1021 # Case 2: Upload file to object using resumable upload. 1022 size_threshold = ONE_KIB 1023 boto_config_for_test = ('GSUtil', 'resumable_threshold', 1024 str(size_threshold)) 1025 with SetBotoConfigForTest([boto_config_for_test]): 1026 file_as_string = os.urandom(size_threshold) 1027 tmpdir = self.CreateTempDir() 1028 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=file_as_string) 1029 self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri) 1030 1031 # Case 3: Upload stream to object. 1032 self._run_cp_minus_v_test('-v', '-', k2_uri.uri) 1033 1034 # Case 4: Download object to file. For this case we just expect output of 1035 # gsutil cp -v to be the URI of the file. 1036 tmpdir = self.CreateTempDir() 1037 fpath1 = self.CreateTempFile(tmpdir=tmpdir) 1038 dst_uri = storage_uri(fpath1) 1039 stderr = self.RunGsUtil(['cp', '-v', suri(k1_uri), suri(dst_uri)], 1040 return_stderr=True) 1041 # TODO: Add ordering assertion (should be in stderr.split('\n)[-2]) back 1042 # once both the creation and status messages are handled by the UI thread. 1043 self.assertIn('Created: %s\n' % dst_uri.uri, stderr) 1044 1045 # Case 5: Daisy-chain from object to object. 1046 self._run_cp_minus_v_test('-Dv', k1_uri.uri, k2_uri.uri) 1047 1048 # Case 6: Copy object to object in-the-cloud. 1049 self._run_cp_minus_v_test('-v', k1_uri.uri, k2_uri.uri) 1050 1051 def _run_cp_minus_v_test(self, opt, src_str, dst_str): 1052 """Runs cp -v with the options and validates the results.""" 1053 stderr = self.RunGsUtil(['cp', opt, src_str, dst_str], return_stderr=True) 1054 match = re.search(r'Created: (.*)\n', stderr) 1055 self.assertIsNotNone(match) 1056 created_uri = match.group(1) 1057 1058 # Use @Retry as hedge against bucket listing eventual consistency. 1059 @Retry(AssertionError, tries=3, timeout_secs=1) 1060 def _Check1(): 1061 stdout = self.RunGsUtil(['ls', '-a', dst_str], return_stdout=True) 1062 lines = stdout.split('\n') 1063 # Final (most recent) object should match the "Created:" URI. This is 1064 # in second-to-last line (last line is '\n'). 1065 self.assertGreater(len(lines), 2) 1066 self.assertEqual(created_uri, lines[-2]) 1067 _Check1() 1068 1069 @SequentialAndParallelTransfer 1070 def test_stdin_args(self): 1071 """Tests cp with the -I option.""" 1072 tmpdir = self.CreateTempDir() 1073 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1') 1074 fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') 1075 bucket_uri = self.CreateBucket() 1076 self.RunGsUtil(['cp', '-I', suri(bucket_uri)], 1077 stdin='\n'.join((fpath1, fpath2))) 1078 1079 # Use @Retry as hedge against bucket listing eventual consistency. 1080 @Retry(AssertionError, tries=3, timeout_secs=1) 1081 def _Check1(): 1082 stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True) 1083 self.assertIn(os.path.basename(fpath1), stdout) 1084 self.assertIn(os.path.basename(fpath2), stdout) 1085 self.assertNumLines(stdout, 2) 1086 _Check1() 1087 1088 def test_cross_storage_class_cloud_cp(self): 1089 bucket1_uri = self.CreateBucket(storage_class='standard') 1090 bucket2_uri = self.CreateBucket( 1091 storage_class='durable_reduced_availability') 1092 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') 1093 # Server now allows copy-in-the-cloud across storage classes. 1094 self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)]) 1095 1096 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') 1097 def test_cross_provider_cp(self): 1098 s3_bucket = self.CreateBucket(provider='s3') 1099 gs_bucket = self.CreateBucket(provider='gs') 1100 s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='foo') 1101 gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='bar') 1102 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) 1103 self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)]) 1104 1105 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') 1106 @unittest.skip('This test performs a large copy but remains here for ' 1107 'debugging purposes.') 1108 def test_cross_provider_large_cp(self): 1109 s3_bucket = self.CreateBucket(provider='s3') 1110 gs_bucket = self.CreateBucket(provider='gs') 1111 s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='f'*1024*1024) 1112 gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='b'*1024*1024) 1113 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) 1114 self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)]) 1115 with SetBotoConfigForTest([ 1116 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 1117 ('GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))]): 1118 # Ensure copy also works across json upload chunk boundaries. 1119 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) 1120 1121 @unittest.skip('This test is slow due to creating many objects, ' 1122 'but remains here for debugging purposes.') 1123 def test_daisy_chain_cp_file_sizes(self): 1124 """Ensure daisy chain cp works with a wide of file sizes.""" 1125 bucket_uri = self.CreateBucket() 1126 bucket2_uri = self.CreateBucket() 1127 exponent_cap = 28 # Up to 256 MiB in size. 1128 for i in range(exponent_cap): 1129 one_byte_smaller = 2**i - 1 1130 normal = 2**i 1131 one_byte_larger = 2**i + 1 1132 self.CreateObject(bucket_uri=bucket_uri, contents='a'*one_byte_smaller) 1133 self.CreateObject(bucket_uri=bucket_uri, contents='b'*normal) 1134 self.CreateObject(bucket_uri=bucket_uri, contents='c'*one_byte_larger) 1135 1136 self.AssertNObjectsInBucket(bucket_uri, exponent_cap*3) 1137 self.RunGsUtil(['-m', 'cp', '-D', suri(bucket_uri, '**'), 1138 suri(bucket2_uri)]) 1139 1140 self.AssertNObjectsInBucket(bucket2_uri, exponent_cap*3) 1141 1142 def test_daisy_chain_cp(self): 1143 """Tests cp with the -D option.""" 1144 bucket1_uri = self.CreateBucket(storage_class='standard') 1145 bucket2_uri = self.CreateBucket( 1146 storage_class='durable_reduced_availability') 1147 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') 1148 # Set some headers on source object so we can verify that headers are 1149 # presereved by daisy-chain copy. 1150 self.RunGsUtil(['setmeta', '-h', 'Cache-Control:public,max-age=12', 1151 '-h', 'Content-Type:image/gif', 1152 '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta, 1153 suri(key_uri)]) 1154 # Set public-read (non-default) ACL so we can verify that cp -D -p works. 1155 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) 1156 acl_json = self.RunGsUtil(['acl', 'get', suri(key_uri)], return_stdout=True) 1157 # Perform daisy-chain copy and verify that source object headers and ACL 1158 # were preserved. Also specify -n option to test that gsutil correctly 1159 # removes the x-goog-if-generation-match:0 header that was set at uploading 1160 # time when updating the ACL. 1161 stderr = self.RunGsUtil(['cp', '-Dpn', suri(key_uri), suri(bucket2_uri)], 1162 return_stderr=True) 1163 self.assertNotIn('Copy-in-the-cloud disallowed', stderr) 1164 1165 @Retry(AssertionError, tries=3, timeout_secs=1) 1166 def _Check(): 1167 uri = suri(bucket2_uri, key_uri.object_name) 1168 stdout = self.RunGsUtil(['ls', '-L', uri], return_stdout=True) 1169 self.assertRegexpMatches(stdout, r'Cache-Control:\s+public,max-age=12') 1170 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') 1171 self.assertRegexpMatches(stdout, r'Metadata:\s+1:\s+abcd') 1172 new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True) 1173 self.assertEqual(acl_json, new_acl_json) 1174 _Check() 1175 1176 @unittest.skipUnless( 1177 not HAS_GS_PORT, 'gs_port is defined in config which can cause ' 1178 'problems when uploading and downloading to the same local host port') 1179 def test_daisy_chain_cp_download_failure(self): 1180 """Tests cp with the -D option when the download thread dies.""" 1181 bucket1_uri = self.CreateBucket() 1182 bucket2_uri = self.CreateBucket() 1183 key_uri = self.CreateObject(bucket_uri=bucket1_uri, 1184 contents='a' * self.halt_size) 1185 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 1186 test_callback_file = self.CreateTempFile( 1187 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 1188 with SetBotoConfigForTest([boto_config_for_test]): 1189 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 1190 '-D', suri(key_uri), suri(bucket2_uri)], 1191 expected_status=1, return_stderr=True) 1192 # Should have three exception traces; one from the download thread and 1193 # two from the upload thread (expection message is repeated in main's 1194 # _OutputAndExit). 1195 self.assertEqual(stderr.count( 1196 'ResumableDownloadException: Artifically halting download'), 3) 1197 1198 def test_streaming_gzip_upload(self): 1199 """Tests error when compression flag is requested on a streaming source.""" 1200 bucket_uri = self.CreateBucket() 1201 stderr = self.RunGsUtil(['cp', '-Z', '-', suri(bucket_uri, 'foo')], 1202 return_stderr=True, expected_status=1, 1203 stdin='streaming data') 1204 self.assertIn( 1205 'gzip compression is not currently supported on streaming uploads', 1206 stderr) 1207 1208 def test_seek_ahead_upload_cp(self): 1209 """Tests that the seek-ahead iterator estimates total upload work.""" 1210 tmpdir = self.CreateTempDir(test_files=3) 1211 bucket_uri = self.CreateBucket() 1212 1213 with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '1'), 1214 ('GSUtil', 'task_estimation_force', 'True')]): 1215 stderr = self.RunGsUtil(['-m', 'cp', '-r', tmpdir, suri(bucket_uri)], 1216 return_stderr=True) 1217 self.assertIn( 1218 'Estimated work for this command: objects: 3, total size: 18', 1219 stderr) 1220 1221 with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '0'), 1222 ('GSUtil', 'task_estimation_force', 'True')]): 1223 stderr = self.RunGsUtil(['-m', 'cp', '-r', tmpdir, suri(bucket_uri)], 1224 return_stderr=True) 1225 self.assertNotIn('Estimated work', stderr) 1226 1227 def test_seek_ahead_download_cp(self): 1228 tmpdir = self.CreateTempDir() 1229 bucket_uri = self.CreateBucket(test_objects=3) 1230 self.AssertNObjectsInBucket(bucket_uri, 3) 1231 1232 with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '1'), 1233 ('GSUtil', 'task_estimation_force', 'True')]): 1234 stderr = self.RunGsUtil(['-m', 'cp', '-r', suri(bucket_uri), tmpdir], 1235 return_stderr=True) 1236 self.assertIn( 1237 'Estimated work for this command: objects: 3, total size: 18', 1238 stderr) 1239 1240 with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '0'), 1241 ('GSUtil', 'task_estimation_force', 'True')]): 1242 stderr = self.RunGsUtil(['-m', 'cp', '-r', suri(bucket_uri), tmpdir], 1243 return_stderr=True) 1244 self.assertNotIn('Estimated work', stderr) 1245 1246 def test_canned_acl_cp(self): 1247 """Tests copying with a canned ACL.""" 1248 bucket1_uri = self.CreateBucket() 1249 bucket2_uri = self.CreateBucket() 1250 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') 1251 self.RunGsUtil(['cp', '-a', 'public-read', suri(key_uri), 1252 suri(bucket2_uri)]) 1253 # Set public-read on the original key after the copy so we can compare 1254 # the ACLs. 1255 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) 1256 public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)], 1257 return_stdout=True) 1258 1259 @Retry(AssertionError, tries=3, timeout_secs=1) 1260 def _Check(): 1261 uri = suri(bucket2_uri, key_uri.object_name) 1262 new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True) 1263 self.assertEqual(public_read_acl, new_acl_json) 1264 _Check() 1265 1266 @SequentialAndParallelTransfer 1267 def test_canned_acl_upload(self): 1268 """Tests uploading a file with a canned ACL.""" 1269 bucket1_uri = self.CreateBucket() 1270 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') 1271 # Set public-read on the object so we can compare the ACLs. 1272 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) 1273 public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)], 1274 return_stdout=True) 1275 1276 file_name = 'bar' 1277 fpath = self.CreateTempFile(file_name=file_name, contents='foo') 1278 self.RunGsUtil(['cp', '-a', 'public-read', fpath, suri(bucket1_uri)]) 1279 new_acl_json = self.RunGsUtil(['acl', 'get', suri(bucket1_uri, file_name)], 1280 return_stdout=True) 1281 self.assertEqual(public_read_acl, new_acl_json) 1282 1283 resumable_size = ONE_KIB 1284 boto_config_for_test = ('GSUtil', 'resumable_threshold', 1285 str(resumable_size)) 1286 with SetBotoConfigForTest([boto_config_for_test]): 1287 resumable_file_name = 'resumable_bar' 1288 resumable_contents = os.urandom(resumable_size) 1289 resumable_fpath = self.CreateTempFile( 1290 file_name=resumable_file_name, contents=resumable_contents) 1291 self.RunGsUtil(['cp', '-a', 'public-read', resumable_fpath, 1292 suri(bucket1_uri)]) 1293 new_resumable_acl_json = self.RunGsUtil( 1294 ['acl', 'get', suri(bucket1_uri, resumable_file_name)], 1295 return_stdout=True) 1296 self.assertEqual(public_read_acl, new_resumable_acl_json) 1297 1298 def test_cp_key_to_local_stream(self): 1299 bucket_uri = self.CreateBucket() 1300 contents = 'foo' 1301 key_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents) 1302 stdout = self.RunGsUtil(['cp', suri(key_uri), '-'], return_stdout=True) 1303 self.assertIn(contents, stdout) 1304 1305 def test_cp_local_file_to_local_stream(self): 1306 contents = 'content' 1307 fpath = self.CreateTempFile(contents=contents) 1308 stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True) 1309 self.assertIn(contents, stdout) 1310 1311 @SequentialAndParallelTransfer 1312 def test_cp_zero_byte_file(self): 1313 dst_bucket_uri = self.CreateBucket() 1314 src_dir = self.CreateTempDir() 1315 fpath = os.path.join(src_dir, 'zero_byte') 1316 with open(fpath, 'w') as unused_out_file: 1317 pass # Write a zero byte file 1318 self.RunGsUtil(['cp', fpath, suri(dst_bucket_uri)]) 1319 1320 @Retry(AssertionError, tries=3, timeout_secs=1) 1321 def _Check1(): 1322 stdout = self.RunGsUtil(['ls', suri(dst_bucket_uri)], return_stdout=True) 1323 self.assertIn(os.path.basename(fpath), stdout) 1324 _Check1() 1325 1326 download_path = os.path.join(src_dir, 'zero_byte_download') 1327 self.RunGsUtil(['cp', suri(dst_bucket_uri, 'zero_byte'), download_path]) 1328 self.assertTrue(os.stat(download_path)) 1329 1330 def test_copy_bucket_to_bucket(self): 1331 """Tests recursively copying from bucket to bucket. 1332 1333 This should produce identically named objects (and not, in particular, 1334 destination objects named by the version-specific URI from source objects). 1335 """ 1336 src_bucket_uri = self.CreateVersionedBucket() 1337 dst_bucket_uri = self.CreateVersionedBucket() 1338 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', 1339 contents='abc') 1340 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', 1341 contents='def') 1342 1343 # Use @Retry as hedge against bucket listing eventual consistency. 1344 @Retry(AssertionError, tries=3, timeout_secs=1) 1345 def _CopyAndCheck(): 1346 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), 1347 suri(dst_bucket_uri)]) 1348 stdout = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri], 1349 return_stdout=True) 1350 self.assertIn('%s%s/obj0\n' % (dst_bucket_uri, 1351 src_bucket_uri.bucket_name), stdout) 1352 self.assertIn('%s%s/obj1\n' % (dst_bucket_uri, 1353 src_bucket_uri.bucket_name), stdout) 1354 _CopyAndCheck() 1355 1356 def test_copy_bucket_to_dir(self): 1357 """Tests recursively copying from bucket to a directory. 1358 1359 This should produce identically named objects (and not, in particular, 1360 destination objects named by the version- specific URI from source objects). 1361 """ 1362 src_bucket_uri = self.CreateBucket() 1363 dst_dir = self.CreateTempDir() 1364 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', 1365 contents='abc') 1366 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', 1367 contents='def') 1368 1369 # Use @Retry as hedge against bucket listing eventual consistency. 1370 @Retry(AssertionError, tries=3, timeout_secs=1) 1371 def _CopyAndCheck(): 1372 """Copies the bucket recursively and validates the results.""" 1373 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir]) 1374 dir_list = [] 1375 for dirname, _, filenames in os.walk(dst_dir): 1376 for filename in filenames: 1377 dir_list.append(os.path.join(dirname, filename)) 1378 dir_list = sorted(dir_list) 1379 self.assertEqual(len(dir_list), 2) 1380 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 1381 'obj0'), dir_list[0]) 1382 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 1383 'obj1'), dir_list[1]) 1384 _CopyAndCheck() 1385 1386 def test_recursive_download_with_leftover_dir_placeholder(self): 1387 """Tests that we correctly handle leftover dir placeholders.""" 1388 src_bucket_uri = self.CreateBucket() 1389 dst_dir = self.CreateTempDir() 1390 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', 1391 contents='abc') 1392 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', 1393 contents='def') 1394 1395 # Create a placeholder like what can be left over by web GUI tools. 1396 key_uri = src_bucket_uri.clone_replace_name('/') 1397 key_uri.set_contents_from_string('') 1398 self.AssertNObjectsInBucket(src_bucket_uri, 3) 1399 1400 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir]) 1401 dir_list = [] 1402 for dirname, _, filenames in os.walk(dst_dir): 1403 for filename in filenames: 1404 dir_list.append(os.path.join(dirname, filename)) 1405 dir_list = sorted(dir_list) 1406 self.assertEqual(len(dir_list), 2) 1407 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 1408 'obj0'), dir_list[0]) 1409 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 1410 'obj1'), dir_list[1]) 1411 1412 def test_copy_quiet(self): 1413 bucket_uri = self.CreateBucket() 1414 key_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') 1415 stderr = self.RunGsUtil(['-q', 'cp', suri(key_uri), 1416 suri(bucket_uri.clone_replace_name('o2'))], 1417 return_stderr=True) 1418 self.assertEqual(stderr.count('Copying '), 0) 1419 1420 def test_cp_md5_match(self): 1421 """Tests that the uploaded object has the expected MD5. 1422 1423 Note that while this does perform a file to object upload, MD5's are 1424 not supported for composite objects so we don't use the decorator in this 1425 case. 1426 """ 1427 bucket_uri = self.CreateBucket() 1428 fpath = self.CreateTempFile(contents='bar') 1429 with open(fpath, 'r') as f_in: 1430 file_md5 = base64.encodestring(binascii.unhexlify( 1431 CalculateMd5FromContents(f_in))).rstrip('\n') 1432 self.RunGsUtil(['cp', fpath, suri(bucket_uri)]) 1433 1434 # Use @Retry as hedge against bucket listing eventual consistency. 1435 @Retry(AssertionError, tries=3, timeout_secs=1) 1436 def _Check1(): 1437 stdout = self.RunGsUtil(['ls', '-L', suri(bucket_uri)], 1438 return_stdout=True) 1439 self.assertRegexpMatches(stdout, 1440 r'Hash\s+\(md5\):\s+%s' % re.escape(file_md5)) 1441 _Check1() 1442 1443 @unittest.skipIf(IS_WINDOWS, 1444 'Unicode handling on Windows requires mods to site-packages') 1445 @SequentialAndParallelTransfer 1446 def test_cp_manifest_upload_unicode(self): 1447 return self._ManifestUpload('foo-unicöde', 'bar-unicöde', 1448 'manifest-unicöde') 1449 1450 @SequentialAndParallelTransfer 1451 def test_cp_manifest_upload(self): 1452 """Tests uploading with a mnifest file.""" 1453 return self._ManifestUpload('foo', 'bar', 'manifest') 1454 1455 def _ManifestUpload(self, file_name, object_name, manifest_name): 1456 """Tests uploading with a manifest file.""" 1457 bucket_uri = self.CreateBucket() 1458 dsturi = suri(bucket_uri, object_name) 1459 1460 fpath = self.CreateTempFile(file_name=file_name, contents='bar') 1461 logpath = self.CreateTempFile(file_name=manifest_name, contents='') 1462 # Ensure the file is empty. 1463 open(logpath, 'w').close() 1464 self.RunGsUtil(['cp', '-L', logpath, fpath, dsturi]) 1465 with open(logpath, 'r') as f: 1466 lines = f.readlines() 1467 self.assertEqual(len(lines), 2) 1468 1469 expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5', 1470 'UploadId', 'Source Size', 'Bytes Transferred', 1471 'Result', 'Description'] 1472 self.assertEqual(expected_headers, lines[0].strip().split(',')) 1473 results = lines[1].strip().split(',') 1474 self.assertEqual(results[0][:7], 'file://') # source 1475 self.assertEqual(results[1][:5], '%s://' % 1476 self.default_provider) # destination 1477 date_format = '%Y-%m-%dT%H:%M:%S.%fZ' 1478 start_date = datetime.datetime.strptime(results[2], date_format) 1479 end_date = datetime.datetime.strptime(results[3], date_format) 1480 self.assertEqual(end_date > start_date, True) 1481 if self.RunGsUtil == testcase.GsUtilIntegrationTestCase.RunGsUtil: 1482 # Check that we didn't do automatic parallel uploads - compose doesn't 1483 # calculate the MD5 hash. Since RunGsUtil is overriden in 1484 # TestCpParallelUploads to force parallel uploads, we can check which 1485 # method was used. 1486 self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5 1487 self.assertEqual(int(results[6]), 3) # Source Size 1488 self.assertEqual(int(results[7]), 3) # Bytes Transferred 1489 self.assertEqual(results[8], 'OK') # Result 1490 1491 @SequentialAndParallelTransfer 1492 def test_cp_manifest_download(self): 1493 """Tests downloading with a manifest file.""" 1494 key_uri = self.CreateObject(contents='foo') 1495 fpath = self.CreateTempFile(contents='') 1496 logpath = self.CreateTempFile(contents='') 1497 # Ensure the file is empty. 1498 open(logpath, 'w').close() 1499 self.RunGsUtil(['cp', '-L', logpath, suri(key_uri), fpath], 1500 return_stdout=True) 1501 with open(logpath, 'r') as f: 1502 lines = f.readlines() 1503 self.assertEqual(len(lines), 2) 1504 1505 expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5', 1506 'UploadId', 'Source Size', 'Bytes Transferred', 1507 'Result', 'Description'] 1508 self.assertEqual(expected_headers, lines[0].strip().split(',')) 1509 results = lines[1].strip().split(',') 1510 self.assertEqual(results[0][:5], '%s://' % 1511 self.default_provider) # source 1512 self.assertEqual(results[1][:7], 'file://') # destination 1513 date_format = '%Y-%m-%dT%H:%M:%S.%fZ' 1514 start_date = datetime.datetime.strptime(results[2], date_format) 1515 end_date = datetime.datetime.strptime(results[3], date_format) 1516 self.assertEqual(end_date > start_date, True) 1517 self.assertEqual(int(results[6]), 3) # Source Size 1518 # Bytes transferred might be more than 3 if the file was gzipped, since 1519 # the minimum gzip header is 10 bytes. 1520 self.assertGreaterEqual(int(results[7]), 3) # Bytes Transferred 1521 self.assertEqual(results[8], 'OK') # Result 1522 1523 @SequentialAndParallelTransfer 1524 def test_copy_unicode_non_ascii_filename(self): 1525 key_uri = self.CreateObject() 1526 # Try with and without resumable upload threshold, to ensure that each 1527 # scenario works. In particular, resumable uploads have tracker filename 1528 # logic. 1529 file_contents = 'x' * START_CALLBACK_PER_BYTES * 2 1530 fpath = self.CreateTempFile(file_name=u'Аудиоархив', 1531 contents=file_contents) 1532 with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', '1')]): 1533 fpath_bytes = fpath.encode(UTF8) 1534 self.RunGsUtil(['cp', fpath_bytes, suri(key_uri)], return_stderr=True) 1535 stdout = self.RunGsUtil(['cat', suri(key_uri)], return_stdout=True) 1536 self.assertEquals(stdout, file_contents) 1537 with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', 1538 str(START_CALLBACK_PER_BYTES * 3))]): 1539 fpath_bytes = fpath.encode(UTF8) 1540 self.RunGsUtil(['cp', fpath_bytes, suri(key_uri)], return_stderr=True) 1541 stdout = self.RunGsUtil(['cat', suri(key_uri)], return_stdout=True) 1542 self.assertEquals(stdout, file_contents) 1543 1544 # Note: We originally one time implemented a test 1545 # (test_copy_invalid_unicode_filename) that invalid unicode filenames were 1546 # skipped, but it turns out os.walk() on MacOS doesn't have problems with 1547 # such files (so, failed that test). Given that, we decided to remove the 1548 # test. 1549 1550 @SequentialAndParallelTransfer 1551 def test_gzip_upload_and_download(self): 1552 bucket_uri = self.CreateBucket() 1553 contents = 'x' * 10000 1554 tmpdir = self.CreateTempDir() 1555 self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents) 1556 self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents) 1557 self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents) 1558 # Test that copying specifying only 2 of the 3 prefixes gzips the correct 1559 # files, and test that including whitespace in the extension list works. 1560 self.RunGsUtil(['cp', '-z', 'js, html', 1561 os.path.join(tmpdir, 'test.*'), suri(bucket_uri)]) 1562 self.AssertNObjectsInBucket(bucket_uri, 3) 1563 uri1 = suri(bucket_uri, 'test.html') 1564 uri2 = suri(bucket_uri, 'test.js') 1565 uri3 = suri(bucket_uri, 'test.txt') 1566 stdout = self.RunGsUtil(['stat', uri1], return_stdout=True) 1567 self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') 1568 stdout = self.RunGsUtil(['stat', uri2], return_stdout=True) 1569 self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') 1570 stdout = self.RunGsUtil(['stat', uri3], return_stdout=True) 1571 self.assertNotRegexpMatches(stdout, r'Content-Encoding:\s+gzip') 1572 fpath4 = self.CreateTempFile() 1573 for uri in (uri1, uri2, uri3): 1574 self.RunGsUtil(['cp', uri, suri(fpath4)]) 1575 with open(fpath4, 'r') as f: 1576 self.assertEqual(f.read(), contents) 1577 1578 @SequentialAndParallelTransfer 1579 def test_gzip_all_upload_and_download(self): 1580 bucket_uri = self.CreateBucket() 1581 contents = 'x' * 10000 1582 tmpdir = self.CreateTempDir() 1583 self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents) 1584 self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents) 1585 self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents) 1586 self.CreateTempFile(file_name='test', tmpdir=tmpdir, contents=contents) 1587 # Test that all files are compressed. 1588 self.RunGsUtil(['cp', '-Z', 1589 os.path.join(tmpdir, 'test*'), suri(bucket_uri)]) 1590 self.AssertNObjectsInBucket(bucket_uri, 4) 1591 uri1 = suri(bucket_uri, 'test.html') 1592 uri2 = suri(bucket_uri, 'test.js') 1593 uri3 = suri(bucket_uri, 'test.txt') 1594 uri4 = suri(bucket_uri, 'test') 1595 stdout = self.RunGsUtil(['stat', uri1], return_stdout=True) 1596 self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') 1597 stdout = self.RunGsUtil(['stat', uri2], return_stdout=True) 1598 self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') 1599 stdout = self.RunGsUtil(['stat', uri3], return_stdout=True) 1600 self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') 1601 stdout = self.RunGsUtil(['stat', uri4], return_stdout=True) 1602 self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') 1603 fpath4 = self.CreateTempFile() 1604 for uri in (uri1, uri2, uri3, uri4): 1605 self.RunGsUtil(['cp', uri, suri(fpath4)]) 1606 with open(fpath4, 'r') as f: 1607 self.assertEqual(f.read(), contents) 1608 1609 def test_both_gzip_options_error(self): 1610 # Test with -Z and -z 1611 stderr = self.RunGsUtil(['cp', '-Z', '-z', 'html, js', 'a.js', 'b.js'], 1612 return_stderr=True, expected_status=1) 1613 1614 self.assertIn('CommandException', stderr) 1615 self.assertIn('Specifying both the -z and -Z options together is invalid.', 1616 stderr) 1617 1618 # Same test, but with arguments in the opposite order. 1619 stderr = self.RunGsUtil(['cp', '-z', 'html, js', '-Z', 'a.js', 'b.js'], 1620 return_stderr=True, expected_status=1) 1621 1622 self.assertIn('CommandException', stderr) 1623 self.assertIn('Specifying both the -z and -Z options together is invalid.', 1624 stderr) 1625 1626 def test_upload_with_subdir_and_unexpanded_wildcard(self): 1627 fpath1 = self.CreateTempFile(file_name=('tmp', 'x', 'y', 'z')) 1628 bucket_uri = self.CreateBucket() 1629 wildcard_uri = '%s*' % fpath1[:-5] 1630 stderr = self.RunGsUtil(['cp', '-R', wildcard_uri, suri(bucket_uri)], 1631 return_stderr=True) 1632 self.assertIn('Copying file:', stderr) 1633 self.AssertNObjectsInBucket(bucket_uri, 1) 1634 1635 @SequentialAndParallelTransfer 1636 def test_cp_object_ending_with_slash(self): 1637 """Tests that cp works with object names ending with slash.""" 1638 tmpdir = self.CreateTempDir() 1639 bucket_uri = self.CreateBucket() 1640 self.CreateObject(bucket_uri=bucket_uri, 1641 object_name='abc/', 1642 contents='dir') 1643 self.CreateObject(bucket_uri=bucket_uri, 1644 object_name='abc/def', 1645 contents='def') 1646 self.AssertNObjectsInBucket(bucket_uri, 2) 1647 self.RunGsUtil(['cp', '-R', suri(bucket_uri), tmpdir]) 1648 # Check that files in the subdir got copied even though subdir object 1649 # download was skipped. 1650 with open(os.path.join(tmpdir, bucket_uri.bucket_name, 'abc', 'def')) as f: 1651 self.assertEquals('def', '\n'.join(f.readlines())) 1652 1653 def test_cp_without_read_access(self): 1654 """Tests that cp fails without read access to the object.""" 1655 # TODO: With 401's triggering retries in apitools, this test will take 1656 # a long time. Ideally, make apitools accept a num_retries config for this 1657 # until we stop retrying the 401's. 1658 bucket_uri = self.CreateBucket() 1659 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') 1660 1661 # Use @Retry as hedge against bucket listing eventual consistency. 1662 self.AssertNObjectsInBucket(bucket_uri, 1) 1663 1664 if self.default_provider == 's3': 1665 expected_error_regex = r'AccessDenied' 1666 else: 1667 expected_error_regex = r'Anonymous user(s)? do(es)? not have' 1668 1669 with self.SetAnonymousBotoCreds(): 1670 stderr = self.RunGsUtil(['cp', suri(object_uri), 'foo'], 1671 return_stderr=True, expected_status=1) 1672 self.assertRegexpMatches(stderr, expected_error_regex) 1673 1674 @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.') 1675 def test_cp_minus_r_minus_e(self): 1676 """Tests that cp -e -r ignores symlinks when recursing.""" 1677 bucket_uri = self.CreateBucket() 1678 tmpdir = self.CreateTempDir() 1679 # Create a valid file, since cp expects to copy at least one source URL 1680 # successfully. 1681 self.CreateTempFile(tmpdir=tmpdir, contents='foo') 1682 subdir = os.path.join(tmpdir, 'subdir') 1683 os.mkdir(subdir) 1684 os.mkdir(os.path.join(tmpdir, 'missing')) 1685 # Create a blank directory that is a broken symlink to ensure that we 1686 # don't fail recursive enumeration with a bad symlink. 1687 os.symlink(os.path.join(tmpdir, 'missing'), 1688 os.path.join(subdir, 'missing')) 1689 os.rmdir(os.path.join(tmpdir, 'missing')) 1690 self.RunGsUtil(['cp', '-r', '-e', tmpdir, suri(bucket_uri)]) 1691 1692 @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.') 1693 def test_cp_minus_e(self): 1694 fpath_dir = self.CreateTempDir() 1695 fpath1 = self.CreateTempFile(tmpdir=fpath_dir) 1696 fpath2 = os.path.join(fpath_dir, 'cp_minus_e') 1697 bucket_uri = self.CreateBucket() 1698 os.symlink(fpath1, fpath2) 1699 stderr = self.RunGsUtil( 1700 ['cp', '-e', '%s%s*' % (fpath_dir, os.path.sep), 1701 suri(bucket_uri, 'files')], 1702 return_stderr=True) 1703 self.assertIn('Copying file', stderr) 1704 self.assertIn('Skipping symbolic link', stderr) 1705 1706 # Ensure that top-level arguments are ignored if they are symlinks. 1707 stderr = self.RunGsUtil( 1708 ['cp', '-e', fpath1, fpath2, suri(bucket_uri, 'files')], 1709 return_stderr=True, expected_status=1) 1710 self.assertIn('Copying file', stderr) 1711 self.assertIn('Skipping symbolic link', stderr) 1712 self.assertIn('CommandException: No URLs matched: %s' % fpath2, stderr) 1713 1714 def test_cp_multithreaded_wildcard(self): 1715 """Tests that cp -m works with a wildcard.""" 1716 num_test_files = 5 1717 tmp_dir = self.CreateTempDir(test_files=num_test_files) 1718 bucket_uri = self.CreateBucket() 1719 wildcard_uri = '%s%s*' % (tmp_dir, os.sep) 1720 self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)]) 1721 self.AssertNObjectsInBucket(bucket_uri, num_test_files) 1722 1723 @SequentialAndParallelTransfer 1724 def test_cp_duplicate_source_args(self): 1725 """Tests that cp -m works when a source argument is provided twice.""" 1726 object_contents = 'edge' 1727 object_uri = self.CreateObject(object_name='foo', contents=object_contents) 1728 tmp_dir = self.CreateTempDir() 1729 self.RunGsUtil(['-m', 'cp', suri(object_uri), suri(object_uri), tmp_dir]) 1730 with open(os.path.join(tmp_dir, 'foo'), 'r') as in_fp: 1731 contents = in_fp.read() 1732 # Contents should be not duplicated. 1733 self.assertEqual(contents, object_contents) 1734 1735 @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.') 1736 @SequentialAndParallelTransfer 1737 def test_cp_download_encrypted_object(self): 1738 """Tests downloading an encrypted object.""" 1739 if self.test_api == ApiSelector.XML: 1740 return unittest.skip( 1741 'gsutil does not support encryption with the XML API') 1742 object_contents = 'bar' 1743 object_uri = self.CreateObject(object_name='foo', contents=object_contents, 1744 encryption_key=TEST_ENCRYPTION_KEY1) 1745 fpath = self.CreateTempFile() 1746 boto_config_for_test = [('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 1747 1748 with SetBotoConfigForTest(boto_config_for_test): 1749 self.RunGsUtil(['cp', suri(object_uri), suri(fpath)]) 1750 with open(fpath, 'r') as f: 1751 self.assertEqual(f.read(), object_contents) 1752 1753 # If multiple keys are supplied and one is correct, download should succeed. 1754 fpath2 = self.CreateTempFile() 1755 boto_config_for_test2 = [ 1756 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY3), 1757 ('GSUtil', 'decryption_key1', TEST_ENCRYPTION_KEY2), 1758 ('GSUtil', 'decryption_key2', TEST_ENCRYPTION_KEY1)] 1759 1760 with SetBotoConfigForTest(boto_config_for_test2): 1761 self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)]) 1762 with open(fpath2, 'r') as f: 1763 self.assertEqual(f.read(), object_contents) 1764 1765 @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.') 1766 @SequentialAndParallelTransfer 1767 def test_cp_download_encrypted_object_without_key(self): 1768 """Tests downloading an encrypted object without the necessary key.""" 1769 if self.test_api == ApiSelector.XML: 1770 return unittest.skip( 1771 'gsutil does not support encryption with the XML API') 1772 object_contents = 'bar' 1773 object_uri = self.CreateObject(object_name='foo', contents=object_contents, 1774 encryption_key=TEST_ENCRYPTION_KEY1) 1775 fpath = self.CreateTempFile() 1776 1777 stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)], 1778 expected_status=1, return_stderr=True) 1779 self.assertIn('Missing decryption key with SHA256 hash %s' % 1780 TEST_ENCRYPTION_KEY1_SHA256_B64, stderr) 1781 1782 @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.') 1783 @SequentialAndParallelTransfer 1784 def test_cp_upload_encrypted_object(self): 1785 """Tests uploading an encrypted object.""" 1786 if self.test_api == ApiSelector.XML: 1787 return unittest.skip( 1788 'gsutil does not support encryption with the XML API') 1789 bucket_uri = self.CreateBucket() 1790 object_uri = suri(bucket_uri, 'foo') 1791 file_contents = 'bar' 1792 fpath = self.CreateTempFile(contents=file_contents, file_name='foo') 1793 1794 boto_config_for_test = [('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 1795 1796 # Uploading the object should succeed. 1797 with SetBotoConfigForTest(boto_config_for_test): 1798 self.RunGsUtil(['cp', suri(fpath), suri(bucket_uri)]) 1799 1800 self.AssertObjectUsesEncryptionKey(object_uri, TEST_ENCRYPTION_KEY1) 1801 1802 with SetBotoConfigForTest(boto_config_for_test): 1803 # Reading the object back should succeed. 1804 fpath2 = self.CreateTempFile() 1805 self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), suri(fpath2)]) 1806 with open(fpath2, 'r') as f: 1807 self.assertEqual(f.read(), file_contents) 1808 1809 @SkipForS3('No resumable upload or encryption support for S3.') 1810 def test_cp_resumable_upload_encrypted_object_break(self): 1811 """Tests that an encrypted upload resumes after a connection break.""" 1812 if self.test_api == ApiSelector.XML: 1813 return unittest.skip( 1814 'gsutil does not support encryption with the XML API') 1815 bucket_uri = self.CreateBucket() 1816 object_uri_str = suri(bucket_uri, 'foo') 1817 fpath = self.CreateTempFile(contents='a' * self.halt_size) 1818 boto_config_for_test = [ 1819 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 1820 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 1821 test_callback_file = self.CreateTempFile( 1822 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 1823 1824 with SetBotoConfigForTest(boto_config_for_test): 1825 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 1826 fpath, object_uri_str], 1827 expected_status=1, return_stderr=True) 1828 self.assertIn('Artifically halting upload', stderr) 1829 stderr = self.RunGsUtil(['cp', fpath, object_uri_str], 1830 return_stderr=True) 1831 self.assertIn('Resuming upload', stderr) 1832 stdout = self.RunGsUtil(['stat', object_uri_str], return_stdout=True) 1833 with open(fpath, 'rb') as fp: 1834 self.assertIn(CalculateB64EncodedMd5FromContents(fp), stdout) 1835 1836 self.AssertObjectUsesEncryptionKey(object_uri_str, 1837 TEST_ENCRYPTION_KEY1) 1838 1839 @SkipForS3('No resumable upload or encryption support for S3.') 1840 def test_cp_resumable_upload_encrypted_object_different_key(self): 1841 """Tests that an encrypted upload resume uses original encryption key.""" 1842 if self.test_api == ApiSelector.XML: 1843 return unittest.skip( 1844 'gsutil does not support encryption with the XML API') 1845 bucket_uri = self.CreateBucket() 1846 object_uri_str = suri(bucket_uri, 'foo') 1847 file_contents = 'a' * self.halt_size 1848 fpath = self.CreateTempFile(contents=file_contents) 1849 boto_config_for_test = [ 1850 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 1851 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 1852 test_callback_file = self.CreateTempFile( 1853 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 1854 1855 with SetBotoConfigForTest(boto_config_for_test): 1856 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 1857 fpath, object_uri_str], 1858 expected_status=1, return_stderr=True) 1859 self.assertIn('Artifically halting upload', stderr) 1860 1861 # Resume the upload with multiple keys, including the original. 1862 boto_config_for_test2 = [ 1863 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 1864 ('GSUtil', 'decryption_key1', TEST_ENCRYPTION_KEY2), 1865 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 1866 1867 with SetBotoConfigForTest(boto_config_for_test2): 1868 stderr = self.RunGsUtil(['cp', fpath, object_uri_str], 1869 return_stderr=True) 1870 self.assertIn('Resuming upload', stderr) 1871 1872 # Object should have the original key. 1873 self.AssertObjectUsesEncryptionKey(object_uri_str, 1874 TEST_ENCRYPTION_KEY1) 1875 1876 @SkipForS3('No resumable upload or encryption support for S3.') 1877 def test_cp_resumable_upload_encrypted_object_missing_key(self): 1878 """Tests that an encrypted upload does not resume without original key.""" 1879 if self.test_api == ApiSelector.XML: 1880 return unittest.skip( 1881 'gsutil does not support encryption with the XML API') 1882 bucket_uri = self.CreateBucket() 1883 object_uri_str = suri(bucket_uri, 'foo') 1884 file_contents = 'a' * self.halt_size 1885 fpath = self.CreateTempFile(contents=file_contents) 1886 boto_config_for_test = [ 1887 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 1888 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 1889 test_callback_file = self.CreateTempFile( 1890 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 1891 1892 with SetBotoConfigForTest(boto_config_for_test): 1893 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 1894 fpath, object_uri_str], 1895 expected_status=1, return_stderr=True) 1896 self.assertIn('Artifically halting upload', stderr) 1897 1898 # Resume the upload without the original key. 1899 boto_config_for_test2 = [ 1900 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 1901 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)] 1902 1903 with SetBotoConfigForTest(boto_config_for_test2): 1904 stderr = self.RunGsUtil(['cp', fpath, object_uri_str], 1905 return_stderr=True) 1906 self.assertNotIn('Resuming upload', stderr) 1907 self.assertIn('does not match current encryption key', stderr) 1908 self.assertIn('Restarting upload from scratch', stderr) 1909 1910 # Object should have the new key. 1911 self.AssertObjectUsesEncryptionKey(object_uri_str, 1912 TEST_ENCRYPTION_KEY2) 1913 1914 def _ensure_object_unencrypted(self, object_uri_str): 1915 """Strongly consistent check that the object is unencrypted.""" 1916 stdout = self.RunGsUtil(['stat', object_uri_str], return_stdout=True) 1917 self.assertNotIn('Encryption Key', stdout) 1918 1919 @SkipForS3('No resumable upload support for S3.') 1920 def test_cp_resumable_upload_break(self): 1921 """Tests that an upload can be resumed after a connection break.""" 1922 bucket_uri = self.CreateBucket() 1923 fpath = self.CreateTempFile(contents='a' * self.halt_size) 1924 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 1925 test_callback_file = self.CreateTempFile( 1926 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 1927 1928 with SetBotoConfigForTest([boto_config_for_test]): 1929 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 1930 fpath, suri(bucket_uri)], 1931 expected_status=1, return_stderr=True) 1932 self.assertIn('Artifically halting upload', stderr) 1933 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 1934 return_stderr=True) 1935 self.assertIn('Resuming upload', stderr) 1936 1937 @SkipForS3('No resumable upload support for S3.') 1938 def test_cp_resumable_upload_retry(self): 1939 """Tests that a resumable upload completes with one retry.""" 1940 bucket_uri = self.CreateBucket() 1941 fpath = self.CreateTempFile(contents='a' * self.halt_size) 1942 # TODO: Raising an httplib or socket error blocks bucket teardown 1943 # in JSON for 60-120s on a multiprocessing lock acquire. Figure out why; 1944 # until then, raise an apitools retryable exception. 1945 if self.test_api == ApiSelector.XML: 1946 test_callback_file = self.CreateTempFile( 1947 contents=pickle.dumps(_ResumableUploadRetryHandler( 1948 5, httplib.BadStatusLine, ('unused',)))) 1949 else: 1950 test_callback_file = self.CreateTempFile( 1951 contents=pickle.dumps(_ResumableUploadRetryHandler( 1952 5, apitools_exceptions.BadStatusCodeError, 1953 ('unused', 'unused', 'unused')))) 1954 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 1955 with SetBotoConfigForTest([boto_config_for_test]): 1956 stderr = self.RunGsUtil(['-D', 'cp', '--testcallbackfile', 1957 test_callback_file, fpath, suri(bucket_uri)], 1958 return_stderr=1) 1959 if self.test_api == ApiSelector.XML: 1960 self.assertIn('Got retryable failure', stderr) 1961 else: 1962 self.assertIn('Retrying', stderr) 1963 1964 @SkipForS3('No resumable upload support for S3.') 1965 def test_cp_resumable_streaming_upload_retry(self): 1966 """Tests that a streaming resumable upload completes with one retry.""" 1967 if self.test_api == ApiSelector.XML: 1968 return unittest.skip('XML does not support resumable streaming uploads.') 1969 bucket_uri = self.CreateBucket() 1970 1971 test_callback_file = self.CreateTempFile( 1972 contents=pickle.dumps(_ResumableUploadRetryHandler( 1973 5, apitools_exceptions.BadStatusCodeError, 1974 ('unused', 'unused', 'unused')))) 1975 # Need to reduce the JSON chunk size since streaming uploads buffer a 1976 # full chunk. 1977 boto_configs_for_test = [('GSUtil', 'json_resumable_chunk_size', 1978 str(256 * ONE_KIB)), 1979 ('Boto', 'num_retries', '2')] 1980 with SetBotoConfigForTest(boto_configs_for_test): 1981 stderr = self.RunGsUtil( 1982 ['-D', 'cp', '--testcallbackfile', test_callback_file, '-', 1983 suri(bucket_uri, 'foo')], 1984 stdin='a' * 512 * ONE_KIB, return_stderr=1) 1985 self.assertIn('Retrying', stderr) 1986 1987 @SkipForS3('preserve_acl flag not supported for S3.') 1988 def test_cp_preserve_no_owner(self): 1989 bucket_uri = self.CreateBucket() 1990 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') 1991 # Anonymous user can read the object and write to the bucket, but does 1992 # not own the object. 1993 self.RunGsUtil(['acl', 'ch', '-u', 'AllUsers:R', suri(object_uri)]) 1994 self.RunGsUtil(['acl', 'ch', '-u', 'AllUsers:W', suri(bucket_uri)]) 1995 with self.SetAnonymousBotoCreds(): 1996 stderr = self.RunGsUtil(['cp', '-p', suri(object_uri), 1997 suri(bucket_uri, 'foo')], 1998 return_stderr=True, expected_status=1) 1999 self.assertIn('OWNER permission is required for preserving ACLs', stderr) 2000 2001 @SkipForS3('No resumable upload support for S3.') 2002 def test_cp_progress_callbacks(self): 2003 bucket_uri = self.CreateBucket() 2004 final_size_string = BytesToFixedWidthString(1024**2) 2005 final_progress_callback = final_size_string+'/'+final_size_string 2006 fpath = self.CreateTempFile(contents='a'*ONE_MIB, file_name='foo') 2007 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2008 with SetBotoConfigForTest([boto_config_for_test]): 2009 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2010 return_stderr=True) 2011 self.assertEquals(1, stderr.count(final_progress_callback)) 2012 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(2 * ONE_MIB)) 2013 with SetBotoConfigForTest([boto_config_for_test]): 2014 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2015 return_stderr=True) 2016 self.assertEquals(1, stderr.count(final_progress_callback)) 2017 stderr = self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), fpath], 2018 return_stderr=True) 2019 self.assertEquals(1, stderr.count(final_progress_callback)) 2020 2021 @SkipForS3('No resumable upload support for S3.') 2022 def test_cp_resumable_upload(self): 2023 """Tests that a basic resumable upload completes successfully.""" 2024 bucket_uri = self.CreateBucket() 2025 fpath = self.CreateTempFile(contents='a' * self.halt_size) 2026 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2027 with SetBotoConfigForTest([boto_config_for_test]): 2028 self.RunGsUtil(['cp', fpath, suri(bucket_uri)]) 2029 2030 @SkipForS3('No resumable upload support for S3.') 2031 def test_resumable_upload_break_leaves_tracker(self): 2032 """Tests that a tracker file is created with a resumable upload.""" 2033 bucket_uri = self.CreateBucket() 2034 fpath = self.CreateTempFile(file_name='foo', 2035 contents='a' * self.halt_size) 2036 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2037 with SetBotoConfigForTest([boto_config_for_test]): 2038 tracker_filename = GetTrackerFilePath( 2039 StorageUrlFromString(suri(bucket_uri, 'foo')), 2040 TrackerFileType.UPLOAD, self.test_api) 2041 test_callback_file = self.CreateTempFile( 2042 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 2043 try: 2044 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2045 fpath, suri(bucket_uri, 'foo')], 2046 expected_status=1, return_stderr=True) 2047 self.assertIn('Artifically halting upload', stderr) 2048 self.assertTrue(os.path.exists(tracker_filename), 2049 'Tracker file %s not present.' % tracker_filename) 2050 finally: 2051 DeleteTrackerFile(tracker_filename) 2052 2053 @SkipForS3('No resumable upload support for S3.') 2054 def test_cp_resumable_upload_break_file_size_change(self): 2055 """Tests a resumable upload where the uploaded file changes size. 2056 2057 This should fail when we read the tracker data. 2058 """ 2059 bucket_uri = self.CreateBucket() 2060 tmp_dir = self.CreateTempDir() 2061 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, 2062 contents='a' * self.halt_size) 2063 test_callback_file = self.CreateTempFile( 2064 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5))) 2065 2066 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2067 with SetBotoConfigForTest([boto_config_for_test]): 2068 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2069 fpath, suri(bucket_uri)], 2070 expected_status=1, return_stderr=True) 2071 self.assertIn('Artifically halting upload', stderr) 2072 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, 2073 contents='a' * self.halt_size * 2) 2074 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2075 expected_status=1, return_stderr=True) 2076 self.assertIn('ResumableUploadAbortException', stderr) 2077 2078 @SkipForS3('No resumable upload support for S3.') 2079 def test_cp_resumable_upload_break_file_content_change(self): 2080 """Tests a resumable upload where the uploaded file changes content.""" 2081 if self.test_api == ApiSelector.XML: 2082 return unittest.skip( 2083 'XML doesn\'t make separate HTTP calls at fixed-size boundaries for ' 2084 'resumable uploads, so we can\'t guarantee that the server saves a ' 2085 'specific part of the upload.') 2086 bucket_uri = self.CreateBucket() 2087 tmp_dir = self.CreateTempDir() 2088 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, 2089 contents='a' * ONE_KIB * 512) 2090 test_callback_file = self.CreateTempFile( 2091 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 2092 int(ONE_KIB) * 384))) 2093 resumable_threshold_for_test = ( 2094 'GSUtil', 'resumable_threshold', str(ONE_KIB)) 2095 resumable_chunk_size_for_test = ( 2096 'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256)) 2097 with SetBotoConfigForTest([resumable_threshold_for_test, 2098 resumable_chunk_size_for_test]): 2099 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2100 fpath, suri(bucket_uri)], 2101 expected_status=1, return_stderr=True) 2102 self.assertIn('Artifically halting upload', stderr) 2103 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, 2104 contents='b' * ONE_KIB * 512) 2105 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2106 expected_status=1, return_stderr=True) 2107 self.assertIn('doesn\'t match cloud-supplied digest', stderr) 2108 2109 @SkipForS3('No resumable upload support for S3.') 2110 def test_cp_resumable_upload_break_file_smaller_size(self): 2111 """Tests a resumable upload where the uploaded file changes content. 2112 2113 This should fail hash validation. 2114 """ 2115 bucket_uri = self.CreateBucket() 2116 tmp_dir = self.CreateTempDir() 2117 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, 2118 contents='a' * ONE_KIB * 512) 2119 test_callback_file = self.CreateTempFile( 2120 contents=pickle.dumps(HaltingCopyCallbackHandler(True, 2121 int(ONE_KIB) * 384))) 2122 resumable_threshold_for_test = ( 2123 'GSUtil', 'resumable_threshold', str(ONE_KIB)) 2124 resumable_chunk_size_for_test = ( 2125 'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256)) 2126 with SetBotoConfigForTest([resumable_threshold_for_test, 2127 resumable_chunk_size_for_test]): 2128 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2129 fpath, suri(bucket_uri)], 2130 expected_status=1, return_stderr=True) 2131 self.assertIn('Artifically halting upload', stderr) 2132 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, 2133 contents='a' * ONE_KIB) 2134 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2135 expected_status=1, return_stderr=True) 2136 self.assertIn('ResumableUploadAbortException', stderr) 2137 2138 @SkipForS3('No resumable upload support for S3.') 2139 def test_cp_composite_encrypted_upload_resume(self): 2140 """Tests that an encrypted composite upload resumes successfully.""" 2141 if self.test_api == ApiSelector.XML: 2142 return unittest.skip( 2143 'gsutil does not support encryption with the XML API') 2144 bucket_uri = self.CreateBucket() 2145 dst_url = StorageUrlFromString(suri(bucket_uri, 'foo')) 2146 2147 file_contents = 'foobar' 2148 source_file = self.CreateTempFile( 2149 contents=file_contents, file_name=file_contents) 2150 src_url = StorageUrlFromString(source_file) 2151 2152 # Simulate an upload that had occurred by writing a tracker file 2153 # that points to a previously uploaded component. 2154 tracker_file_name = GetTrackerFilePath( 2155 dst_url, TrackerFileType.PARALLEL_UPLOAD, self.test_api, src_url) 2156 tracker_prefix = '123' 2157 2158 # Create component 0 to be used in the resume; it must match the name 2159 # that will be generated in copy_helper, so we use the same scheme. 2160 encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + source_file).encode(UTF8) 2161 content_md5 = md5() 2162 content_md5.update(encoded_name) 2163 digest = content_md5.hexdigest() 2164 component_object_name = (tracker_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE + 2165 digest + '_0') 2166 2167 component_size = 3 2168 object_uri = self.CreateObject( 2169 bucket_uri=bucket_uri, object_name=component_object_name, 2170 contents=file_contents[:component_size], 2171 encryption_key=TEST_ENCRYPTION_KEY1) 2172 existing_component = ObjectFromTracker(component_object_name, 2173 str(object_uri.generation)) 2174 existing_components = [existing_component] 2175 enc_key_sha256 = TEST_ENCRYPTION_KEY1_SHA256_B64 2176 2177 WriteParallelUploadTrackerFile( 2178 tracker_file_name, tracker_prefix, existing_components, 2179 encryption_key_sha256=enc_key_sha256) 2180 2181 try: 2182 # Now "resume" the upload using the original encryption key. 2183 with SetBotoConfigForTest([ 2184 ('GSUtil', 'parallel_composite_upload_threshold', '1'), 2185 ('GSUtil', 'parallel_composite_upload_component_size', 2186 str(component_size)), 2187 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]): 2188 stderr = self.RunGsUtil(['cp', source_file, suri(bucket_uri, 'foo')], 2189 return_stderr=True) 2190 self.assertIn('Found 1 existing temporary components to reuse.', stderr) 2191 self.assertFalse( 2192 os.path.exists(tracker_file_name), 2193 'Tracker file %s should have been deleted.' % tracker_file_name) 2194 read_contents = self.RunGsUtil(['cat', suri(bucket_uri, 'foo')], 2195 return_stdout=True) 2196 self.assertEqual(read_contents, file_contents) 2197 finally: 2198 # Clean up if something went wrong. 2199 DeleteTrackerFile(tracker_file_name) 2200 2201 @SkipForS3('No resumable upload support for S3.') 2202 def test_cp_composite_encrypted_upload_restart(self): 2203 """Tests that encrypted composite upload restarts given a different key.""" 2204 if self.test_api == ApiSelector.XML: 2205 return unittest.skip( 2206 'gsutil does not support encryption with the XML API') 2207 bucket_uri = self.CreateBucket() 2208 dst_url = StorageUrlFromString(suri(bucket_uri, 'foo')) 2209 2210 file_contents = 'foobar' 2211 source_file = self.CreateTempFile(contents=file_contents, file_name='foo') 2212 src_url = StorageUrlFromString(source_file) 2213 2214 # Simulate an upload that had occurred by writing a tracker file. 2215 tracker_file_name = GetTrackerFilePath( 2216 dst_url, TrackerFileType.PARALLEL_UPLOAD, self.test_api, src_url) 2217 tracker_prefix = '123' 2218 existing_component_name = 'foo_1' 2219 object_uri = self.CreateObject( 2220 bucket_uri=bucket_uri, object_name='foo_1', 2221 contents='foo', encryption_key=TEST_ENCRYPTION_KEY1) 2222 existing_component = ObjectFromTracker(existing_component_name, 2223 str(object_uri.generation)) 2224 existing_components = [existing_component] 2225 enc_key_sha256 = TEST_ENCRYPTION_KEY1_SHA256_B64 2226 WriteParallelUploadTrackerFile( 2227 tracker_file_name, tracker_prefix, existing_components, enc_key_sha256) 2228 2229 try: 2230 # Now "resume" the upload using the original encryption key. 2231 with SetBotoConfigForTest([ 2232 ('GSUtil', 'parallel_composite_upload_threshold', '1'), 2233 ('GSUtil', 'parallel_composite_upload_component_size', '3'), 2234 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)]): 2235 stderr = self.RunGsUtil(['cp', source_file, suri(bucket_uri, 'foo')], 2236 return_stderr=True) 2237 self.assertIn('does not match current encryption key. ' 2238 'Deleting old components and restarting upload', stderr) 2239 self.assertNotIn('existing temporary components to reuse.', stderr) 2240 self.assertFalse( 2241 os.path.exists(tracker_file_name), 2242 'Tracker file %s should have been deleted.' % tracker_file_name) 2243 read_contents = self.RunGsUtil(['cat', suri(bucket_uri, 'foo')], 2244 return_stdout=True) 2245 self.assertEqual(read_contents, file_contents) 2246 finally: 2247 # Clean up if something went wrong. 2248 DeleteTrackerFile(tracker_file_name) 2249 2250 # This temporarily changes the tracker directory to unwritable which 2251 # interferes with any parallel running tests that use the tracker directory. 2252 @NotParallelizable 2253 @SkipForS3('No resumable upload support for S3.') 2254 @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') 2255 @SequentialAndParallelTransfer 2256 def test_cp_unwritable_tracker_file(self): 2257 """Tests a resumable upload with an unwritable tracker file.""" 2258 bucket_uri = self.CreateBucket() 2259 tracker_filename = GetTrackerFilePath( 2260 StorageUrlFromString(suri(bucket_uri, 'foo')), 2261 TrackerFileType.UPLOAD, self.test_api) 2262 tracker_dir = os.path.dirname(tracker_filename) 2263 fpath = self.CreateTempFile(file_name='foo', contents='a' * ONE_KIB) 2264 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2265 save_mod = os.stat(tracker_dir).st_mode 2266 2267 try: 2268 os.chmod(tracker_dir, 0) 2269 with SetBotoConfigForTest([boto_config_for_test]): 2270 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], 2271 expected_status=1, return_stderr=True) 2272 self.assertIn('Couldn\'t write tracker file', stderr) 2273 finally: 2274 os.chmod(tracker_dir, save_mod) 2275 if os.path.exists(tracker_filename): 2276 os.unlink(tracker_filename) 2277 2278 # This temporarily changes the tracker directory to unwritable which 2279 # interferes with any parallel running tests that use the tracker directory. 2280 @NotParallelizable 2281 @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') 2282 @SequentialAndParallelTransfer 2283 def test_cp_unwritable_tracker_file_download(self): 2284 """Tests downloads with an unwritable tracker file.""" 2285 object_uri = self.CreateObject(contents='foo' * ONE_KIB) 2286 tracker_filename = GetTrackerFilePath( 2287 StorageUrlFromString(suri(object_uri)), 2288 TrackerFileType.DOWNLOAD, self.test_api) 2289 tracker_dir = os.path.dirname(tracker_filename) 2290 fpath = self.CreateTempFile() 2291 save_mod = os.stat(tracker_dir).st_mode 2292 2293 try: 2294 os.chmod(tracker_dir, 0) 2295 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(EIGHT_MIB)) 2296 with SetBotoConfigForTest([boto_config_for_test]): 2297 # Should succeed because we are below the threshold. 2298 self.RunGsUtil(['cp', suri(object_uri), fpath]) 2299 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2300 with SetBotoConfigForTest([boto_config_for_test]): 2301 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2302 expected_status=1, return_stderr=True) 2303 self.assertIn('Couldn\'t write tracker file', stderr) 2304 finally: 2305 os.chmod(tracker_dir, save_mod) 2306 if os.path.exists(tracker_filename): 2307 os.unlink(tracker_filename) 2308 2309 def _test_cp_resumable_download_break_helper(self, boto_config, 2310 encryption_key=None): 2311 """Helper function for different modes of resumable download break. 2312 2313 Args: 2314 boto_config: List of boto configuration tuples for use with 2315 SetBotoConfigForTest. 2316 encryption_key: Base64 encryption key for object encryption (if any). 2317 """ 2318 bucket_uri = self.CreateBucket() 2319 file_contents = 'a' * self.halt_size 2320 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2321 contents=file_contents, 2322 encryption_key=encryption_key) 2323 fpath = self.CreateTempFile() 2324 test_callback_file = self.CreateTempFile( 2325 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2326 2327 with SetBotoConfigForTest(boto_config): 2328 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2329 suri(object_uri), fpath], 2330 expected_status=1, return_stderr=True) 2331 self.assertIn('Artifically halting download.', stderr) 2332 tracker_filename = GetTrackerFilePath( 2333 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) 2334 self.assertTrue(os.path.isfile(tracker_filename)) 2335 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2336 return_stderr=True) 2337 self.assertIn('Resuming download', stderr) 2338 with open(fpath, 'r') as f: 2339 self.assertEqual(f.read(), file_contents, 'File contents differ') 2340 2341 def test_cp_resumable_download_break(self): 2342 """Tests that a download can be resumed after a connection break.""" 2343 self._test_cp_resumable_download_break_helper( 2344 [('GSUtil', 'resumable_threshold', str(ONE_KIB))]) 2345 2346 @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.') 2347 def test_cp_resumable_encrypted_download_break(self): 2348 """Tests that an encrypted download resumes after a connection break.""" 2349 if self.test_api == ApiSelector.XML: 2350 return unittest.skip( 2351 'gsutil does not support encryption with the XML API') 2352 self._test_cp_resumable_download_break_helper( 2353 [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2354 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)], 2355 encryption_key=TEST_ENCRYPTION_KEY1) 2356 2357 @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.') 2358 def test_cp_resumable_encrypted_download_key_rotation(self): 2359 """Tests that a download restarts with a rotated encryption key.""" 2360 if self.test_api == ApiSelector.XML: 2361 return unittest.skip( 2362 'gsutil does not support encryption with the XML API') 2363 bucket_uri = self.CreateBucket() 2364 file_contents = 'a' * self.halt_size 2365 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2366 contents=file_contents, 2367 encryption_key=TEST_ENCRYPTION_KEY1) 2368 fpath = self.CreateTempFile() 2369 test_callback_file = self.CreateTempFile( 2370 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2371 2372 boto_config_for_test = [ 2373 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2374 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)] 2375 2376 with SetBotoConfigForTest(boto_config_for_test): 2377 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2378 suri(object_uri), fpath], 2379 expected_status=1, return_stderr=True) 2380 self.assertIn('Artifically halting download.', stderr) 2381 tracker_filename = GetTrackerFilePath( 2382 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) 2383 self.assertTrue(os.path.isfile(tracker_filename)) 2384 2385 # After simulated connection break, rotate the key on the object. 2386 boto_config_for_test2 = [ 2387 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2388 ('GSUtil', 'decryption_key1', TEST_ENCRYPTION_KEY1), 2389 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)] 2390 with SetBotoConfigForTest(boto_config_for_test2): 2391 self.RunGsUtil(['rewrite', '-k', suri(object_uri)]) 2392 2393 # Now resume the download using only the new encryption key. Since its 2394 # generation changed, we must restart it. 2395 boto_config_for_test3 = [ 2396 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2397 ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)] 2398 with SetBotoConfigForTest(boto_config_for_test3): 2399 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2400 return_stderr=True) 2401 self.assertIn('Restarting download', stderr) 2402 with open(fpath, 'r') as f: 2403 self.assertEqual(f.read(), file_contents, 'File contents differ') 2404 2405 @SequentialAndParallelTransfer 2406 def test_cp_resumable_download_etag_differs(self): 2407 """Tests that download restarts the file when the source object changes. 2408 2409 This causes the etag not to match. 2410 """ 2411 bucket_uri = self.CreateBucket() 2412 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2413 contents='abc' * self.halt_size) 2414 fpath = self.CreateTempFile() 2415 test_callback_file = self.CreateTempFile( 2416 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2417 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2418 with SetBotoConfigForTest([boto_config_for_test]): 2419 # This will create a tracker file with an ETag. 2420 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2421 suri(object_uri), fpath], 2422 expected_status=1, return_stderr=True) 2423 self.assertIn('Artifically halting download.', stderr) 2424 # Create a new object with different contents - it should have a 2425 # different ETag since the content has changed. 2426 object_uri = self.CreateObject( 2427 bucket_uri=bucket_uri, object_name='foo', 2428 contents='b' * self.halt_size, 2429 gs_idempotent_generation=object_uri.generation) 2430 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2431 return_stderr=True) 2432 self.assertNotIn('Resuming download', stderr) 2433 2434 # TODO: Enable this test for sequential downloads when their tracker files are 2435 # modified to contain the source object generation. 2436 @unittest.skipUnless(UsingCrcmodExtension(crcmod), 2437 'Sliced download requires fast crcmod.') 2438 @SkipForS3('No sliced download support for S3.') 2439 def test_cp_resumable_download_generation_differs(self): 2440 """Tests that a resumable download restarts if the generation differs.""" 2441 bucket_uri = self.CreateBucket() 2442 file_contents = 'abcd' * self.halt_size 2443 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2444 contents=file_contents) 2445 fpath = self.CreateTempFile() 2446 2447 test_callback_file = self.CreateTempFile( 2448 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2449 2450 boto_config_for_test = [ 2451 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 2452 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 2453 ('GSUtil', 'sliced_object_download_max_components', '3')] 2454 2455 with SetBotoConfigForTest(boto_config_for_test): 2456 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2457 suri(object_uri), suri(fpath)], 2458 return_stderr=True, expected_status=1) 2459 self.assertIn('Artifically halting download.', stderr) 2460 2461 # Overwrite the object with an identical object, increasing 2462 # the generation but leaving other metadata the same. 2463 identical_file = self.CreateTempFile(contents=file_contents) 2464 self.RunGsUtil(['cp', suri(identical_file), suri(object_uri)]) 2465 2466 stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)], 2467 return_stderr=True) 2468 self.assertIn('Restarting download from scratch', stderr) 2469 with open(fpath, 'r') as f: 2470 self.assertEqual(f.read(), file_contents, 'File contents differ') 2471 2472 def test_cp_resumable_download_file_larger(self): 2473 """Tests download deletes the tracker file when existing file is larger.""" 2474 bucket_uri = self.CreateBucket() 2475 fpath = self.CreateTempFile() 2476 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2477 contents='a' * self.halt_size) 2478 test_callback_file = self.CreateTempFile( 2479 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2480 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2481 with SetBotoConfigForTest([boto_config_for_test]): 2482 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2483 suri(object_uri), fpath], 2484 expected_status=1, return_stderr=True) 2485 self.assertIn('Artifically halting download.', stderr) 2486 with open(fpath + '_.gstmp', 'w') as larger_file: 2487 for _ in range(self.halt_size * 2): 2488 larger_file.write('a') 2489 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2490 expected_status=1, return_stderr=True) 2491 self.assertNotIn('Resuming download', stderr) 2492 self.assertIn('Deleting tracker file', stderr) 2493 2494 def test_cp_resumable_download_content_differs(self): 2495 """Tests that we do not re-download when tracker file matches existing file. 2496 2497 We only compare size, not contents, so re-download should not occur even 2498 though the contents are technically different. However, hash validation on 2499 the file should still occur and we will delete the file then because 2500 the hashes differ. 2501 """ 2502 bucket_uri = self.CreateBucket() 2503 tmp_dir = self.CreateTempDir() 2504 fpath = self.CreateTempFile(tmpdir=tmp_dir) 2505 temp_download_file = fpath + '_.gstmp' 2506 with open(temp_download_file, 'w') as fp: 2507 fp.write('abcd' * ONE_KIB) 2508 2509 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2510 contents='efgh' * ONE_KIB) 2511 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) 2512 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) 2513 self.assertIsNotNone(etag_match, 'Could not get object ETag') 2514 self.assertEqual(len(etag_match.groups()), 1, 2515 'Did not match expected single ETag') 2516 etag = etag_match.group(1) 2517 2518 tracker_filename = GetTrackerFilePath( 2519 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) 2520 try: 2521 with open(tracker_filename, 'w') as tracker_fp: 2522 tracker_fp.write(etag) 2523 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2524 with SetBotoConfigForTest([boto_config_for_test]): 2525 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2526 return_stderr=True, expected_status=1) 2527 self.assertIn('Download already complete', stderr) 2528 self.assertIn('doesn\'t match cloud-supplied digest', stderr) 2529 # File and tracker file should be deleted. 2530 self.assertFalse(os.path.isfile(temp_download_file)) 2531 self.assertFalse(os.path.isfile(tracker_filename)) 2532 # Permanent file should not have been created. 2533 self.assertFalse(os.path.isfile(fpath)) 2534 finally: 2535 if os.path.exists(tracker_filename): 2536 os.unlink(tracker_filename) 2537 2538 def test_cp_resumable_download_content_matches(self): 2539 """Tests download no-ops when tracker file matches existing file.""" 2540 bucket_uri = self.CreateBucket() 2541 tmp_dir = self.CreateTempDir() 2542 fpath = self.CreateTempFile(tmpdir=tmp_dir) 2543 matching_contents = 'abcd' * ONE_KIB 2544 temp_download_file = fpath + '_.gstmp' 2545 with open(temp_download_file, 'w') as fp: 2546 fp.write(matching_contents) 2547 2548 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2549 contents=matching_contents) 2550 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) 2551 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) 2552 self.assertIsNotNone(etag_match, 'Could not get object ETag') 2553 self.assertEqual(len(etag_match.groups()), 1, 2554 'Did not match expected single ETag') 2555 etag = etag_match.group(1) 2556 tracker_filename = GetTrackerFilePath( 2557 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) 2558 with open(tracker_filename, 'w') as tracker_fp: 2559 tracker_fp.write(etag) 2560 try: 2561 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2562 with SetBotoConfigForTest([boto_config_for_test]): 2563 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2564 return_stderr=True) 2565 self.assertIn('Download already complete', stderr) 2566 # Tracker file should be removed after successful hash validation. 2567 self.assertFalse(os.path.isfile(tracker_filename)) 2568 finally: 2569 if os.path.exists(tracker_filename): 2570 os.unlink(tracker_filename) 2571 2572 def test_cp_resumable_download_tracker_file_not_matches(self): 2573 """Tests that download overwrites when tracker file etag does not match.""" 2574 bucket_uri = self.CreateBucket() 2575 tmp_dir = self.CreateTempDir() 2576 fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB) 2577 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2578 contents='efgh' * ONE_KIB) 2579 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) 2580 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) 2581 self.assertIsNotNone(etag_match, 'Could not get object ETag') 2582 self.assertEqual(len(etag_match.groups()), 1, 2583 'Did not match regex for exactly one object ETag') 2584 etag = etag_match.group(1) 2585 etag += 'nonmatching' 2586 tracker_filename = GetTrackerFilePath( 2587 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) 2588 with open(tracker_filename, 'w') as tracker_fp: 2589 tracker_fp.write(etag) 2590 try: 2591 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2592 with SetBotoConfigForTest([boto_config_for_test]): 2593 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2594 return_stderr=True) 2595 self.assertNotIn('Resuming download', stderr) 2596 # Ensure the file was overwritten. 2597 with open(fpath, 'r') as in_fp: 2598 contents = in_fp.read() 2599 self.assertEqual(contents, 'efgh' * ONE_KIB, 2600 'File not overwritten when it should have been ' 2601 'due to a non-matching tracker file.') 2602 self.assertFalse(os.path.isfile(tracker_filename)) 2603 finally: 2604 if os.path.exists(tracker_filename): 2605 os.unlink(tracker_filename) 2606 2607 def test_cp_double_gzip(self): 2608 """Tests that upload and download of a doubly-gzipped file succeeds.""" 2609 bucket_uri = self.CreateBucket() 2610 fpath = self.CreateTempFile(file_name='looks-zipped.gz', contents='foo') 2611 self.RunGsUtil(['-h', 'content-type:application/gzip', 'cp', '-Z', 2612 suri(fpath), suri(bucket_uri, 'foo')]) 2613 self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), fpath]) 2614 2615 @SequentialAndParallelTransfer 2616 def test_cp_resumable_download_gzip(self): 2617 """Tests that download can be resumed successfully with a gzipped file.""" 2618 # Generate some reasonably incompressible data. This compresses to a bit 2619 # around 128K in practice, but we assert specifically below that it is 2620 # larger than self.halt_size to guarantee that we can halt the download 2621 # partway through. 2622 object_uri = self.CreateObject() 2623 random.seed(0) 2624 contents = str([random.choice(string.ascii_letters) 2625 for _ in xrange(self.halt_size)]) 2626 random.seed() # Reset the seed for any other tests. 2627 fpath1 = self.CreateTempFile(file_name='unzipped.txt', contents=contents) 2628 self.RunGsUtil(['cp', '-z', 'txt', suri(fpath1), suri(object_uri)]) 2629 2630 # Use @Retry as hedge against bucket listing eventual consistency. 2631 @Retry(AssertionError, tries=3, timeout_secs=1) 2632 def _GetObjectSize(): 2633 stdout = self.RunGsUtil(['du', suri(object_uri)], return_stdout=True) 2634 size_match = re.search(r'(\d+)\s+.*', stdout) 2635 self.assertIsNotNone(size_match, 'Could not get object size') 2636 self.assertEqual(len(size_match.groups()), 1, 2637 'Did not match regex for exactly one object size.') 2638 return long(size_match.group(1)) 2639 2640 object_size = _GetObjectSize() 2641 self.assertGreaterEqual(object_size, self.halt_size, 2642 'Compresed object size was not large enough to ' 2643 'allow for a halted download, so the test results ' 2644 'would be invalid. Please increase the compressed ' 2645 'object size in the test.') 2646 fpath2 = self.CreateTempFile() 2647 test_callback_file = self.CreateTempFile( 2648 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2649 2650 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2651 with SetBotoConfigForTest([boto_config_for_test]): 2652 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2653 suri(object_uri), suri(fpath2)], 2654 return_stderr=True, expected_status=1) 2655 self.assertIn('Artifically halting download.', stderr) 2656 self.assertIn('Downloading to temp gzip filename', stderr) 2657 2658 # Tracker files will have different names depending on if we are 2659 # downloading sequentially or in parallel. 2660 sliced_download_threshold = HumanReadableToBytes( 2661 boto.config.get('GSUtil', 'sliced_object_download_threshold', 2662 DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD)) 2663 sliced_download = (len(contents) > sliced_download_threshold 2664 and sliced_download_threshold > 0 2665 and UsingCrcmodExtension(crcmod)) 2666 if sliced_download: 2667 trackerfile_type = TrackerFileType.SLICED_DOWNLOAD 2668 else: 2669 trackerfile_type = TrackerFileType.DOWNLOAD 2670 tracker_filename = GetTrackerFilePath( 2671 StorageUrlFromString(fpath2), trackerfile_type, self.test_api) 2672 2673 # We should have a temporary gzipped file, a tracker file, and no 2674 # final file yet. 2675 self.assertTrue(os.path.isfile(tracker_filename)) 2676 self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2)) 2677 stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)], 2678 return_stderr=True) 2679 self.assertIn('Resuming download', stderr) 2680 with open(fpath2, 'r') as f: 2681 self.assertEqual(f.read(), contents, 'File contents did not match.') 2682 self.assertFalse(os.path.isfile(tracker_filename)) 2683 self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2)) 2684 2685 @SequentialAndParallelTransfer 2686 def test_cp_resumable_download_check_hashes_never(self): 2687 """Tests that resumble downloads work with check_hashes = never.""" 2688 bucket_uri = self.CreateBucket() 2689 contents = 'abcd' * self.halt_size 2690 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2691 contents=contents) 2692 fpath = self.CreateTempFile() 2693 test_callback_file = self.CreateTempFile( 2694 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2695 2696 boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2697 ('GSUtil', 'check_hashes', 'never')] 2698 with SetBotoConfigForTest(boto_config_for_test): 2699 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2700 suri(object_uri), fpath], 2701 expected_status=1, return_stderr=True) 2702 self.assertIn('Artifically halting download.', stderr) 2703 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2704 return_stderr=True) 2705 self.assertIn('Resuming download', stderr) 2706 self.assertIn('Found no hashes to validate object downloaded', stderr) 2707 with open(fpath, 'r') as f: 2708 self.assertEqual(f.read(), contents, 'File contents did not match.') 2709 2710 @SkipForS3('No resumable upload support for S3.') 2711 def test_cp_resumable_upload_bucket_deleted(self): 2712 """Tests that a not found exception is raised if bucket no longer exists.""" 2713 bucket_uri = self.CreateBucket() 2714 fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB) 2715 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 2716 test_callback_file = self.CreateTempFile( 2717 contents=pickle.dumps( 2718 _DeleteBucketThenStartOverCopyCallbackHandler(5, bucket_uri))) 2719 2720 with SetBotoConfigForTest([boto_config_for_test]): 2721 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2722 fpath, suri(bucket_uri)], return_stderr=True, 2723 expected_status=1) 2724 self.assertIn('Deleting bucket', stderr) 2725 self.assertIn('bucket does not exist', stderr) 2726 2727 @SkipForS3('No sliced download support for S3.') 2728 def test_cp_sliced_download(self): 2729 """Tests that sliced object download works in the general case.""" 2730 bucket_uri = self.CreateBucket() 2731 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2732 contents='abc' * ONE_KIB) 2733 fpath = self.CreateTempFile() 2734 2735 # Force fast crcmod to return True to test the basic sliced download 2736 # scenario, ensuring that if the user installs crcmod, it will work. 2737 boto_config_for_test = [ 2738 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), 2739 ('GSUtil', 'test_assume_fast_crcmod', 'True'), 2740 ('GSUtil', 'sliced_object_download_threshold', str(ONE_KIB)), 2741 ('GSUtil', 'sliced_object_download_max_components', '3')] 2742 2743 with SetBotoConfigForTest(boto_config_for_test): 2744 self.RunGsUtil(['cp', suri(object_uri), fpath]) 2745 2746 # Each tracker file should have been deleted. 2747 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 2748 StorageUrlFromString(fpath), self.test_api) 2749 for tracker_filename in tracker_filenames: 2750 self.assertFalse(os.path.isfile(tracker_filename)) 2751 2752 with open(fpath, 'r') as f: 2753 self.assertEqual(f.read(), 'abc' * ONE_KIB, 'File contents differ') 2754 2755 @unittest.skipUnless(UsingCrcmodExtension(crcmod), 2756 'Sliced download requires fast crcmod.') 2757 @SkipForS3('No sliced download support for S3.') 2758 def test_cp_unresumable_sliced_download(self): 2759 """Tests sliced download works when resumability is disabled.""" 2760 bucket_uri = self.CreateBucket() 2761 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2762 contents='abcd' * self.halt_size) 2763 fpath = self.CreateTempFile() 2764 test_callback_file = self.CreateTempFile( 2765 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2766 2767 boto_config_for_test = [ 2768 ('GSUtil', 'resumable_threshold', str(self.halt_size*5)), 2769 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 2770 ('GSUtil', 'sliced_object_download_max_components', '4')] 2771 2772 with SetBotoConfigForTest(boto_config_for_test): 2773 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2774 suri(object_uri), suri(fpath)], 2775 return_stderr=True, expected_status=1) 2776 self.assertIn('not downloaded successfully', stderr) 2777 # Temporary download file should exist. 2778 self.assertTrue(os.path.isfile(fpath + '_.gstmp')) 2779 2780 # No tracker files should exist. 2781 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 2782 StorageUrlFromString(fpath), self.test_api) 2783 for tracker_filename in tracker_filenames: 2784 self.assertFalse(os.path.isfile(tracker_filename)) 2785 2786 # Perform the entire download, without resuming. 2787 with SetBotoConfigForTest(boto_config_for_test): 2788 stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)], 2789 return_stderr=True) 2790 self.assertNotIn('Resuming download', stderr) 2791 # Temporary download file should have been deleted. 2792 self.assertFalse(os.path.isfile(fpath + '_.gstmp')) 2793 with open(fpath, 'r') as f: 2794 self.assertEqual(f.read(), 'abcd' * self.halt_size, 2795 'File contents differ') 2796 2797 @unittest.skipUnless(UsingCrcmodExtension(crcmod), 2798 'Sliced download requires fast crcmod.') 2799 @SkipForS3('No sliced download support for S3.') 2800 def test_cp_sliced_download_resume(self): 2801 """Tests that sliced object download is resumable.""" 2802 bucket_uri = self.CreateBucket() 2803 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2804 contents='abc' * self.halt_size) 2805 fpath = self.CreateTempFile() 2806 test_callback_file = self.CreateTempFile( 2807 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2808 2809 boto_config_for_test = [ 2810 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 2811 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 2812 ('GSUtil', 'sliced_object_download_max_components', '3')] 2813 2814 with SetBotoConfigForTest(boto_config_for_test): 2815 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2816 suri(object_uri), suri(fpath)], 2817 return_stderr=True, expected_status=1) 2818 self.assertIn('not downloaded successfully', stderr) 2819 2820 # Each tracker file should exist. 2821 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 2822 StorageUrlFromString(fpath), self.test_api) 2823 for tracker_filename in tracker_filenames: 2824 self.assertTrue(os.path.isfile(tracker_filename)) 2825 2826 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2827 return_stderr=True) 2828 self.assertIn('Resuming download', stderr) 2829 2830 # Each tracker file should have been deleted. 2831 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 2832 StorageUrlFromString(fpath), self.test_api) 2833 for tracker_filename in tracker_filenames: 2834 self.assertFalse(os.path.isfile(tracker_filename)) 2835 2836 with open(fpath, 'r') as f: 2837 self.assertEqual(f.read(), 'abc' * self.halt_size, 2838 'File contents differ') 2839 2840 @unittest.skipUnless(UsingCrcmodExtension(crcmod), 2841 'Sliced download requires fast crcmod.') 2842 @SkipForS3('No sliced download support for S3.') 2843 def test_cp_sliced_download_partial_resume(self): 2844 """Test sliced download resumability when some components are finished.""" 2845 bucket_uri = self.CreateBucket() 2846 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2847 contents='abc' * self.halt_size) 2848 fpath = self.CreateTempFile() 2849 test_callback_file = self.CreateTempFile( 2850 contents=pickle.dumps(HaltOneComponentCopyCallbackHandler(5))) 2851 2852 boto_config_for_test = [ 2853 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 2854 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 2855 ('GSUtil', 'sliced_object_download_max_components', '3')] 2856 2857 with SetBotoConfigForTest(boto_config_for_test): 2858 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2859 suri(object_uri), suri(fpath)], 2860 return_stderr=True, expected_status=1) 2861 self.assertIn('not downloaded successfully', stderr) 2862 2863 # Each tracker file should exist. 2864 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 2865 StorageUrlFromString(fpath), self.test_api) 2866 for tracker_filename in tracker_filenames: 2867 self.assertTrue(os.path.isfile(tracker_filename)) 2868 2869 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2870 return_stderr=True) 2871 self.assertIn('Resuming download', stderr) 2872 self.assertIn('Download already complete', stderr) 2873 2874 # Each tracker file should have been deleted. 2875 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 2876 StorageUrlFromString(fpath), self.test_api) 2877 for tracker_filename in tracker_filenames: 2878 self.assertFalse(os.path.isfile(tracker_filename)) 2879 2880 with open(fpath, 'r') as f: 2881 self.assertEqual(f.read(), 'abc' * self.halt_size, 2882 'File contents differ') 2883 2884 @unittest.skipUnless(UsingCrcmodExtension(crcmod), 2885 'Sliced download requires fast crcmod.') 2886 @SkipForS3('No sliced download support for S3.') 2887 def test_cp_sliced_download_resume_content_differs(self): 2888 """Tests differing file contents are detected by sliced downloads.""" 2889 bucket_uri = self.CreateBucket() 2890 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2891 contents='abc' * self.halt_size) 2892 fpath = self.CreateTempFile(contents='') 2893 test_callback_file = self.CreateTempFile( 2894 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2895 2896 boto_config_for_test = [ 2897 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 2898 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 2899 ('GSUtil', 'sliced_object_download_max_components', '3')] 2900 2901 with SetBotoConfigForTest(boto_config_for_test): 2902 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2903 suri(object_uri), suri(fpath)], 2904 return_stderr=True, expected_status=1) 2905 self.assertIn('not downloaded successfully', stderr) 2906 2907 # Temporary download file should exist. 2908 self.assertTrue(os.path.isfile(fpath + '_.gstmp')) 2909 2910 # Each tracker file should exist. 2911 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 2912 StorageUrlFromString(fpath), self.test_api) 2913 for tracker_filename in tracker_filenames: 2914 self.assertTrue(os.path.isfile(tracker_filename)) 2915 2916 with open(fpath + '_.gstmp', 'r+b') as f: 2917 f.write('altered file contents') 2918 2919 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2920 return_stderr=True, expected_status=1) 2921 self.assertIn('Resuming download', stderr) 2922 self.assertIn('doesn\'t match cloud-supplied digest', stderr) 2923 self.assertIn('HashMismatchException: crc32c', stderr) 2924 2925 # Each tracker file should have been deleted. 2926 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 2927 StorageUrlFromString(fpath), self.test_api) 2928 for tracker_filename in tracker_filenames: 2929 self.assertFalse(os.path.isfile(tracker_filename)) 2930 2931 # Temporary file should have been deleted due to hash mismatch. 2932 self.assertFalse(os.path.isfile(fpath + '_.gstmp')) 2933 # Final file should not exist. 2934 self.assertFalse(os.path.isfile(fpath)) 2935 2936 @unittest.skipUnless(UsingCrcmodExtension(crcmod), 2937 'Sliced download requires fast crcmod.') 2938 @SkipForS3('No sliced download support for S3.') 2939 def test_cp_sliced_download_component_size_changed(self): 2940 """Tests sliced download doesn't break when the boto config changes. 2941 2942 If the number of components used changes cross-process, the download should 2943 be restarted. 2944 """ 2945 bucket_uri = self.CreateBucket() 2946 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2947 contents='abcd' * self.halt_size) 2948 fpath = self.CreateTempFile() 2949 test_callback_file = self.CreateTempFile( 2950 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2951 2952 boto_config_for_test = [ 2953 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 2954 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 2955 ('GSUtil', 'sliced_object_download_component_size', 2956 str(self.halt_size//4)), 2957 ('GSUtil', 'sliced_object_download_max_components', '4')] 2958 2959 with SetBotoConfigForTest(boto_config_for_test): 2960 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 2961 suri(object_uri), suri(fpath)], 2962 return_stderr=True, expected_status=1) 2963 self.assertIn('not downloaded successfully', stderr) 2964 2965 boto_config_for_test = [ 2966 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 2967 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 2968 ('GSUtil', 'sliced_object_download_component_size', 2969 str(self.halt_size//2)), 2970 ('GSUtil', 'sliced_object_download_max_components', '2')] 2971 2972 with SetBotoConfigForTest(boto_config_for_test): 2973 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 2974 return_stderr=True) 2975 self.assertIn('Sliced download tracker file doesn\'t match ', stderr) 2976 self.assertIn('Restarting download from scratch', stderr) 2977 self.assertNotIn('Resuming download', stderr) 2978 2979 @unittest.skipUnless(UsingCrcmodExtension(crcmod), 2980 'Sliced download requires fast crcmod.') 2981 @SkipForS3('No sliced download support for S3.') 2982 def test_cp_sliced_download_disabled_cross_process(self): 2983 """Tests temporary files are not orphaned if sliced download is disabled. 2984 2985 Specifically, temporary files should be deleted when the corresponding 2986 non-sliced download is completed. 2987 """ 2988 bucket_uri = self.CreateBucket() 2989 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 2990 contents='abcd' * self.halt_size) 2991 fpath = self.CreateTempFile() 2992 test_callback_file = self.CreateTempFile( 2993 contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5))) 2994 2995 boto_config_for_test = [ 2996 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 2997 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), 2998 ('GSUtil', 'sliced_object_download_max_components', '4')] 2999 3000 with SetBotoConfigForTest(boto_config_for_test): 3001 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 3002 suri(object_uri), suri(fpath)], 3003 return_stderr=True, expected_status=1) 3004 self.assertIn('not downloaded successfully', stderr) 3005 # Temporary download file should exist. 3006 self.assertTrue(os.path.isfile(fpath + '_.gstmp')) 3007 3008 # Each tracker file should exist. 3009 tracker_filenames = GetSlicedDownloadTrackerFilePaths( 3010 StorageUrlFromString(fpath), self.test_api) 3011 for tracker_filename in tracker_filenames: 3012 self.assertTrue(os.path.isfile(tracker_filename)) 3013 3014 # Disable sliced downloads by increasing the threshold 3015 boto_config_for_test = [ 3016 ('GSUtil', 'resumable_threshold', str(self.halt_size)), 3017 ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size*5)), 3018 ('GSUtil', 'sliced_object_download_max_components', '4')] 3019 3020 with SetBotoConfigForTest(boto_config_for_test): 3021 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], 3022 return_stderr=True) 3023 self.assertNotIn('Resuming download', stderr) 3024 # Temporary download file should have been deleted. 3025 self.assertFalse(os.path.isfile(fpath + '_.gstmp')) 3026 3027 # Each tracker file should have been deleted. 3028 for tracker_filename in tracker_filenames: 3029 self.assertFalse(os.path.isfile(tracker_filename)) 3030 with open(fpath, 'r') as f: 3031 self.assertEqual(f.read(), 'abcd' * self.halt_size) 3032 3033 @SkipForS3('No resumable upload support for S3.') 3034 def test_cp_resumable_upload_start_over_http_error(self): 3035 for start_over_error in (404, 410): 3036 self.start_over_error_test_helper(start_over_error) 3037 3038 def start_over_error_test_helper(self, http_error_num): 3039 bucket_uri = self.CreateBucket() 3040 fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB) 3041 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) 3042 if self.test_api == ApiSelector.JSON: 3043 test_callback_file = self.CreateTempFile( 3044 contents=pickle.dumps(_JSONForceHTTPErrorCopyCallbackHandler(5, 404))) 3045 elif self.test_api == ApiSelector.XML: 3046 test_callback_file = self.CreateTempFile( 3047 contents=pickle.dumps( 3048 _XMLResumableUploadStartOverCopyCallbackHandler(5))) 3049 3050 with SetBotoConfigForTest([boto_config_for_test]): 3051 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, 3052 fpath, suri(bucket_uri)], return_stderr=True) 3053 self.assertIn('Restarting upload from scratch', stderr) 3054 3055 def test_cp_minus_c(self): 3056 bucket_uri = self.CreateBucket() 3057 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 3058 contents='foo') 3059 self.RunGsUtil( 3060 ['cp', '-c', suri(bucket_uri) + '/foo2', suri(object_uri), 3061 suri(bucket_uri) + '/dir/'], 3062 expected_status=1) 3063 self.RunGsUtil(['stat', '%s/dir/foo' % suri(bucket_uri)]) 3064 3065 def test_rewrite_cp(self): 3066 """Tests the JSON Rewrite API.""" 3067 if self.test_api == ApiSelector.XML: 3068 return unittest.skip('Rewrite API is only supported in JSON.') 3069 bucket_uri = self.CreateBucket() 3070 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 3071 contents='bar') 3072 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), 3073 DiscardMessagesQueue(), self.default_provider) 3074 key = object_uri.get_key() 3075 src_obj_metadata = apitools_messages.Object( 3076 name=key.name, bucket=key.bucket.name, contentType=key.content_type) 3077 dst_obj_metadata = apitools_messages.Object( 3078 bucket=src_obj_metadata.bucket, 3079 name=self.MakeTempName('object'), 3080 contentType=src_obj_metadata.contentType) 3081 gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata) 3082 self.assertEqual( 3083 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, 3084 src_obj_metadata.name, 3085 fields=['customerEncryption', 3086 'md5Hash']).md5Hash, 3087 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, 3088 dst_obj_metadata.name, 3089 fields=['customerEncryption', 3090 'md5Hash']).md5Hash, 3091 'Error: Rewritten object\'s hash doesn\'t match source object.') 3092 3093 def test_rewrite_cp_resume(self): 3094 """Tests the JSON Rewrite API, breaking and resuming via a tracker file.""" 3095 if self.test_api == ApiSelector.XML: 3096 return unittest.skip('Rewrite API is only supported in JSON.') 3097 bucket_uri = self.CreateBucket() 3098 # Second bucket needs to be a different storage class so the service 3099 # actually rewrites the bytes. 3100 bucket_uri2 = self.CreateBucket( 3101 storage_class='durable_reduced_availability') 3102 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we 3103 # need 2 response from the service: 1 success, 1 failure prior to 3104 # completion. 3105 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 3106 contents=('12'*ONE_MIB) + 'bar', 3107 prefer_json_api=True) 3108 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), 3109 DiscardMessagesQueue(), self.default_provider) 3110 key = object_uri.get_key() 3111 src_obj_metadata = apitools_messages.Object( 3112 name=key.name, bucket=key.bucket.name, contentType=key.content_type, 3113 etag=key.etag.strip('"\'')) 3114 dst_obj_name = self.MakeTempName('object') 3115 dst_obj_metadata = apitools_messages.Object( 3116 bucket=bucket_uri2.bucket_name, 3117 name=dst_obj_name, 3118 contentType=src_obj_metadata.contentType) 3119 tracker_file_name = GetRewriteTrackerFilePath( 3120 src_obj_metadata.bucket, src_obj_metadata.name, 3121 dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) 3122 try: 3123 try: 3124 gsutil_api.CopyObject( 3125 src_obj_metadata, dst_obj_metadata, 3126 progress_callback=HaltingRewriteCallbackHandler(ONE_MIB*2).call, 3127 max_bytes_per_call=ONE_MIB) 3128 self.fail('Expected RewriteHaltException.') 3129 except RewriteHaltException: 3130 pass 3131 3132 # Tracker file should be left over. 3133 self.assertTrue(os.path.exists(tracker_file_name)) 3134 3135 # Now resume. Callback ensures we didn't start over. 3136 gsutil_api.CopyObject( 3137 src_obj_metadata, dst_obj_metadata, 3138 progress_callback=EnsureRewriteResumeCallbackHandler(ONE_MIB*2).call, 3139 max_bytes_per_call=ONE_MIB) 3140 3141 # Copy completed; tracker file should be deleted. 3142 self.assertFalse(os.path.exists(tracker_file_name)) 3143 3144 self.assertEqual( 3145 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, 3146 src_obj_metadata.name, 3147 fields=['customerEncryption', 3148 'md5Hash']).md5Hash, 3149 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, 3150 dst_obj_metadata.name, 3151 fields=['customerEncryption', 3152 'md5Hash']).md5Hash, 3153 'Error: Rewritten object\'s hash doesn\'t match source object.') 3154 finally: 3155 # Clean up if something went wrong. 3156 DeleteTrackerFile(tracker_file_name) 3157 3158 def test_rewrite_cp_resume_source_changed(self): 3159 """Tests that Rewrite starts over when the source object has changed.""" 3160 if self.test_api == ApiSelector.XML: 3161 return unittest.skip('Rewrite API is only supported in JSON.') 3162 bucket_uri = self.CreateBucket() 3163 # Second bucket needs to be a different storage class so the service 3164 # actually rewrites the bytes. 3165 bucket_uri2 = self.CreateBucket( 3166 storage_class='durable_reduced_availability') 3167 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we 3168 # need 2 response from the service: 1 success, 1 failure prior to 3169 # completion. 3170 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 3171 contents=('12'*ONE_MIB) + 'bar', 3172 prefer_json_api=True) 3173 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), 3174 DiscardMessagesQueue(), self.default_provider) 3175 key = object_uri.get_key() 3176 src_obj_metadata = apitools_messages.Object( 3177 name=key.name, bucket=key.bucket.name, contentType=key.content_type, 3178 etag=key.etag.strip('"\'')) 3179 dst_obj_name = self.MakeTempName('object') 3180 dst_obj_metadata = apitools_messages.Object( 3181 bucket=bucket_uri2.bucket_name, 3182 name=dst_obj_name, 3183 contentType=src_obj_metadata.contentType) 3184 tracker_file_name = GetRewriteTrackerFilePath( 3185 src_obj_metadata.bucket, src_obj_metadata.name, 3186 dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) 3187 try: 3188 try: 3189 gsutil_api.CopyObject( 3190 src_obj_metadata, dst_obj_metadata, 3191 progress_callback=HaltingRewriteCallbackHandler(ONE_MIB*2).call, 3192 max_bytes_per_call=ONE_MIB) 3193 self.fail('Expected RewriteHaltException.') 3194 except RewriteHaltException: 3195 pass 3196 # Overwrite the original object. 3197 object_uri2 = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 3198 contents='bar', prefer_json_api=True) 3199 key2 = object_uri2.get_key() 3200 src_obj_metadata2 = apitools_messages.Object( 3201 name=key2.name, bucket=key2.bucket.name, 3202 contentType=key2.content_type, etag=key2.etag.strip('"\'')) 3203 3204 # Tracker file for original object should still exist. 3205 self.assertTrue(os.path.exists(tracker_file_name)) 3206 3207 # Copy the new object. 3208 gsutil_api.CopyObject(src_obj_metadata2, dst_obj_metadata, 3209 max_bytes_per_call=ONE_MIB) 3210 3211 # Copy completed; original tracker file should be deleted. 3212 self.assertFalse(os.path.exists(tracker_file_name)) 3213 3214 self.assertEqual( 3215 gsutil_api.GetObjectMetadata(src_obj_metadata2.bucket, 3216 src_obj_metadata2.name, 3217 fields=['customerEncryption', 3218 'md5Hash']).md5Hash, 3219 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, 3220 dst_obj_metadata.name, 3221 fields=['customerEncryption', 3222 'md5Hash']).md5Hash, 3223 'Error: Rewritten object\'s hash doesn\'t match source object.') 3224 finally: 3225 # Clean up if something went wrong. 3226 DeleteTrackerFile(tracker_file_name) 3227 3228 def test_rewrite_cp_resume_command_changed(self): 3229 """Tests that Rewrite starts over when the arguments changed.""" 3230 if self.test_api == ApiSelector.XML: 3231 return unittest.skip('Rewrite API is only supported in JSON.') 3232 bucket_uri = self.CreateBucket() 3233 # Second bucket needs to be a different storage class so the service 3234 # actually rewrites the bytes. 3235 bucket_uri2 = self.CreateBucket( 3236 storage_class='durable_reduced_availability') 3237 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we 3238 # need 2 response from the service: 1 success, 1 failure prior to 3239 # completion. 3240 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 3241 contents=('12'*ONE_MIB) + 'bar', 3242 prefer_json_api=True) 3243 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), 3244 DiscardMessagesQueue(), self.default_provider) 3245 key = object_uri.get_key() 3246 src_obj_metadata = apitools_messages.Object( 3247 name=key.name, bucket=key.bucket.name, contentType=key.content_type, 3248 etag=key.etag.strip('"\'')) 3249 dst_obj_name = self.MakeTempName('object') 3250 dst_obj_metadata = apitools_messages.Object( 3251 bucket=bucket_uri2.bucket_name, 3252 name=dst_obj_name, 3253 contentType=src_obj_metadata.contentType) 3254 tracker_file_name = GetRewriteTrackerFilePath( 3255 src_obj_metadata.bucket, src_obj_metadata.name, 3256 dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) 3257 try: 3258 try: 3259 gsutil_api.CopyObject( 3260 src_obj_metadata, dst_obj_metadata, canned_acl='private', 3261 progress_callback=HaltingRewriteCallbackHandler(ONE_MIB*2).call, 3262 max_bytes_per_call=ONE_MIB) 3263 self.fail('Expected RewriteHaltException.') 3264 except RewriteHaltException: 3265 pass 3266 3267 # Tracker file for original object should still exist. 3268 self.assertTrue(os.path.exists(tracker_file_name)) 3269 3270 # Copy the same object but with different call parameters. 3271 gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata, 3272 canned_acl='public-read', 3273 max_bytes_per_call=ONE_MIB) 3274 3275 # Copy completed; original tracker file should be deleted. 3276 self.assertFalse(os.path.exists(tracker_file_name)) 3277 3278 new_obj_metadata = gsutil_api.GetObjectMetadata( 3279 dst_obj_metadata.bucket, dst_obj_metadata.name, 3280 fields=['acl', 'customerEncryption', 'md5Hash']) 3281 self.assertEqual( 3282 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, 3283 src_obj_metadata.name, 3284 fields=['customerEncryption', 3285 'md5Hash']).md5Hash, 3286 new_obj_metadata.md5Hash, 3287 'Error: Rewritten object\'s hash doesn\'t match source object.') 3288 # New object should have a public-read ACL from the second command. 3289 found_public_acl = False 3290 for acl_entry in new_obj_metadata.acl: 3291 if acl_entry.entity == 'allUsers': 3292 found_public_acl = True 3293 self.assertTrue(found_public_acl, 3294 'New object was not written with a public ACL.') 3295 finally: 3296 # Clean up if something went wrong. 3297 DeleteTrackerFile(tracker_file_name) 3298 3299 @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.') 3300 @unittest.skipUnless(UsingCrcmodExtension(crcmod), 3301 'Test requires fast crcmod.') 3302 def test_cp_preserve_posix_bucket_to_dir_no_errors(self): 3303 """Tests use of the -P flag with cp from a bucket to a local dir. 3304 3305 Specifically tests combinations of POSIX attributes in metadata that will 3306 pass validation. 3307 """ 3308 bucket_uri = self.CreateBucket() 3309 tmpdir = self.CreateTempDir() 3310 TestCpMvPOSIXBucketToLocalNoErrors(self, bucket_uri, tmpdir, is_cp=True) 3311 3312 @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.') 3313 def test_cp_preserve_posix_bucket_to_dir_errors(self): 3314 """Tests use of the -P flag with cp from a bucket to a local dir. 3315 3316 Specifically, combinations of POSIX attributes in metadata that will fail 3317 validation. 3318 """ 3319 bucket_uri = self.CreateBucket() 3320 tmpdir = self.CreateTempDir() 3321 3322 obj = self.CreateObject(bucket_uri=bucket_uri, object_name='obj', 3323 contents='obj') 3324 TestCpMvPOSIXBucketToLocalErrors(self, bucket_uri, obj, tmpdir, is_cp=True) 3325 3326 @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.') 3327 def test_cp_preseve_posix_dir_to_bucket_no_errors(self): 3328 """Tests use of the -P flag with cp from a local dir to a bucket.""" 3329 bucket_uri = self.CreateBucket() 3330 TestCpMvPOSIXLocalToBucketNoErrors(self, bucket_uri, is_cp=True) 3331 3332 def test_cp_minus_s_to_non_cloud_dest_fails(self): 3333 """Test that cp -s operations to a non-cloud destination are prevented.""" 3334 local_file = self.CreateTempFile(contents='foo') 3335 dest_dir = self.CreateTempDir() 3336 stderr = self.RunGsUtil(['cp', '-s', 'standard', local_file, dest_dir], 3337 expected_status=1, return_stderr=True) 3338 self.assertIn( 3339 'Cannot specify storage class for a non-cloud destination:', stderr) 3340 3341 # TODO: Remove @skip annotation from this test once we upgrade to the Boto 3342 # version that parses the storage class header for HEAD Object responses. 3343 @SkipForXML('Need Boto version > 2.46.1') 3344 def test_cp_specify_nondefault_storage_class(self): 3345 bucket_uri = self.CreateBucket() 3346 object_uri = self.CreateObject( 3347 bucket_uri=bucket_uri, object_name='foo', contents='foo') 3348 object2_suri = suri(object_uri) + 'bar' 3349 # Specify storage class name as mixed case here to ensure that it 3350 # gets normalized to uppercase (S3 would return an error otherwise), and 3351 # that using the normalized case is accepted by each API. 3352 nondefault_storage_class = { 3353 's3': 'Standard_iA', 3354 'gs':'durable_REDUCED_availability' 3355 } 3356 storage_class = nondefault_storage_class[self.default_provider] 3357 self.RunGsUtil(['cp', '-s', storage_class, suri(object_uri), object2_suri]) 3358 stdout = self.RunGsUtil(['stat', object2_suri], return_stdout=True) 3359 self.assertRegexpMatchesWithFlags( 3360 stdout, r'Storage class:\s+%s' % storage_class, flags=re.IGNORECASE) 3361 3362 @SkipForS3('Test uses gs-specific storage classes.') 3363 def test_cp_sets_correct_dest_storage_class(self): 3364 """Tests that object storage class is set correctly with and without -s.""" 3365 # Use a non-default storage class as the default for the bucket. 3366 bucket_uri = self.CreateBucket(storage_class='nearline') 3367 # Ensure storage class is set correctly for a local-to-cloud copy. 3368 local_fname = 'foo-orig' 3369 local_fpath = self.CreateTempFile(contents='foo', file_name=local_fname) 3370 foo_cloud_suri = suri(bucket_uri) + '/' + local_fname 3371 self.RunGsUtil(['cp', '-s', 'standard', local_fpath, foo_cloud_suri]) 3372 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 3373 stdout = self.RunGsUtil(['stat', foo_cloud_suri], return_stdout=True) 3374 self.assertRegexpMatchesWithFlags( 3375 stdout, r'Storage class:\s+STANDARD', flags=re.IGNORECASE) 3376 3377 # Ensure storage class is set correctly for a cloud-to-cloud copy when no 3378 # destination storage class is specified. 3379 foo_nl_suri = suri(bucket_uri) + '/foo-nl' 3380 self.RunGsUtil(['cp', foo_cloud_suri, foo_nl_suri]) 3381 # TODO: Remove with-clause after adding storage class parsing in Boto. 3382 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 3383 stdout = self.RunGsUtil(['stat', foo_nl_suri], return_stdout=True) 3384 self.assertRegexpMatchesWithFlags( 3385 stdout, r'Storage class:\s+NEARLINE', flags=re.IGNORECASE) 3386 3387 # Ensure storage class is set correctly for a cloud-to-cloud copy when a 3388 # non-bucket-default storage class is specified. 3389 foo_std_suri = suri(bucket_uri) + '/foo-std' 3390 self.RunGsUtil(['cp', '-s', 'standard', foo_nl_suri, foo_std_suri]) 3391 # TODO: Remove with-clause after adding storage class parsing in Boto. 3392 with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]): 3393 stdout = self.RunGsUtil(['stat', foo_std_suri], return_stdout=True) 3394 self.assertRegexpMatchesWithFlags( 3395 stdout, r'Storage class:\s+STANDARD', flags=re.IGNORECASE) 3396 3397 3398class TestCpUnitTests(testcase.GsUtilUnitTestCase): 3399 """Unit tests for gsutil cp.""" 3400 3401 def testDownloadWithNoHashAvailable(self): 3402 """Tests a download with no valid server-supplied hash.""" 3403 # S3 should have a special message for non-MD5 etags. 3404 bucket_uri = self.CreateBucket(provider='s3') 3405 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') 3406 object_uri.get_key().etag = '12345' # Not an MD5 3407 dst_dir = self.CreateTempDir() 3408 3409 log_handler = self.RunCommand( 3410 'cp', [suri(object_uri), dst_dir], return_log_handler=True) 3411 warning_messages = log_handler.messages['warning'] 3412 self.assertEquals(2, len(warning_messages)) 3413 self.assertRegexpMatches( 3414 warning_messages[0], 3415 r'Non-MD5 etag \(12345\) present for key .*, ' 3416 r'data integrity checks are not possible') 3417 self.assertIn('Integrity cannot be assured', warning_messages[1]) 3418 3419 def test_object_and_prefix_same_name(self): 3420 bucket_uri = self.CreateBucket() 3421 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', 3422 contents='foo') 3423 self.CreateObject(bucket_uri=bucket_uri, 3424 object_name='foo/bar', contents='bar') 3425 fpath = self.CreateTempFile() 3426 # MockKey doesn't support hash_algs, so the MD5 will not match. 3427 with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]): 3428 self.RunCommand('cp', [suri(object_uri), fpath]) 3429 with open(fpath, 'r') as f: 3430 self.assertEqual(f.read(), 'foo') 3431 3432 def test_cp_upload_respects_no_hashes(self): 3433 bucket_uri = self.CreateBucket() 3434 fpath = self.CreateTempFile(contents='abcd') 3435 with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]): 3436 log_handler = self.RunCommand('cp', [fpath, suri(bucket_uri)], 3437 return_log_handler=True) 3438 warning_messages = log_handler.messages['warning'] 3439 self.assertEquals(1, len(warning_messages)) 3440 self.assertIn('Found no hashes to validate object upload', 3441 warning_messages[0]) 3442