1# -*- coding: utf-8 -*-
2# Copyright 2013 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Integration tests for cp command."""
16
17from __future__ import absolute_import
18
19import base64
20import binascii
21import datetime
22from hashlib import md5
23import httplib
24import logging
25import os
26import pickle
27import pkgutil
28import random
29import re
30import string
31import sys
32import threading
33
34from apitools.base.py import exceptions as apitools_exceptions
35import boto
36from boto import storage_uri
37from boto.exception import ResumableTransferDisposition
38from boto.exception import StorageResponseError
39from boto.storage_uri import BucketStorageUri
40import crcmod
41
42from gslib.cloud_api import ResumableUploadStartOverException
43from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
44from gslib.copy_helper import GetTrackerFilePath
45from gslib.copy_helper import PARALLEL_UPLOAD_STATIC_SALT
46from gslib.copy_helper import PARALLEL_UPLOAD_TEMP_NAMESPACE
47from gslib.copy_helper import TrackerFileType
48from gslib.cs_api_map import ApiSelector
49from gslib.gcs_json_api import GcsJsonApi
50from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
51from gslib.hashing_helper import CalculateMd5FromContents
52from gslib.parallel_tracker_file import ObjectFromTracker
53from gslib.parallel_tracker_file import WriteParallelUploadTrackerFile
54from gslib.posix_util import GID_ATTR
55from gslib.posix_util import MODE_ATTR
56from gslib.posix_util import NA_ID
57from gslib.posix_util import NA_MODE
58from gslib.posix_util import UID_ATTR
59from gslib.posix_util import ValidateFilePermissionAccess
60from gslib.posix_util import ValidatePOSIXMode
61from gslib.storage_url import StorageUrlFromString
62from gslib.tests.rewrite_helper import EnsureRewriteResumeCallbackHandler
63from gslib.tests.rewrite_helper import HaltingRewriteCallbackHandler
64from gslib.tests.rewrite_helper import RewriteHaltException
65import gslib.tests.testcase as testcase
66from gslib.tests.testcase.base import NotParallelizable
67from gslib.tests.testcase.integration_testcase import SkipForS3
68from gslib.tests.testcase.integration_testcase import SkipForXML
69from gslib.tests.util import BuildErrorRegex
70from gslib.tests.util import GenerationFromURI as urigen
71from gslib.tests.util import HaltingCopyCallbackHandler
72from gslib.tests.util import HaltOneComponentCopyCallbackHandler
73from gslib.tests.util import HAS_GS_PORT
74from gslib.tests.util import HAS_S3_CREDS
75from gslib.tests.util import ObjectToURI as suri
76from gslib.tests.util import ORPHANED_FILE
77from gslib.tests.util import POSIX_GID_ERROR
78from gslib.tests.util import POSIX_INSUFFICIENT_ACCESS_ERROR
79from gslib.tests.util import POSIX_MODE_ERROR
80from gslib.tests.util import POSIX_UID_ERROR
81from gslib.tests.util import SequentialAndParallelTransfer
82from gslib.tests.util import SetBotoConfigForTest
83from gslib.tests.util import TailSet
84from gslib.tests.util import TEST_ENCRYPTION_KEY1
85from gslib.tests.util import TEST_ENCRYPTION_KEY1_SHA256_B64
86from gslib.tests.util import TEST_ENCRYPTION_KEY2
87from gslib.tests.util import TEST_ENCRYPTION_KEY3
88from gslib.tests.util import unittest
89from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
90from gslib.tracker_file import DeleteTrackerFile
91from gslib.tracker_file import GetRewriteTrackerFilePath
92from gslib.tracker_file import GetSlicedDownloadTrackerFilePaths
93from gslib.ui_controller import BytesToFixedWidthString
94from gslib.util import DiscardMessagesQueue
95from gslib.util import EIGHT_MIB
96from gslib.util import HumanReadableToBytes
97from gslib.util import IS_WINDOWS
98from gslib.util import MakeHumanReadable
99from gslib.util import ONE_KIB
100from gslib.util import ONE_MIB
101from gslib.util import Retry
102from gslib.util import START_CALLBACK_PER_BYTES
103from gslib.util import UsingCrcmodExtension
104from gslib.util import UTF8
105
106# These POSIX-specific variables aren't defined for Windows.
107# pylint: disable=g-import-not-at-top
108if not IS_WINDOWS:
109  from gslib.tests.util import DEFAULT_MODE
110  from gslib.tests.util import INVALID_GID
111  from gslib.tests.util import INVALID_UID
112  from gslib.tests.util import NON_PRIMARY_GID
113  from gslib.tests.util import PRIMARY_GID
114  from gslib.tests.util import USER_ID
115# pylint: enable=g-import-not-at-top
116
117
118def TestCpMvPOSIXBucketToLocalErrors(cls, bucket_uri, obj, tmpdir, is_cp=True):
119  """Helper function for preserve_posix_errors tests in test_cp and test_mv.
120
121  Args:
122    cls: An instance of either TestCp or TestMv.
123    bucket_uri: The uri of the bucket that the object is in.
124    obj: The object to run the tests on.
125    tmpdir: The local file path to cp to.
126    is_cp: Whether or not the calling test suite is cp or mv.
127  """
128  error = 'error'
129  # A dict of test_name: attrs_dict.
130  # attrs_dict holds the different attributes that we want for the object in a
131  # specific test.
132  test_params = {'test1': {MODE_ATTR: '333', error: POSIX_MODE_ERROR},
133                 'test2': {GID_ATTR: INVALID_GID(), error: POSIX_GID_ERROR},
134                 'test3': {GID_ATTR: INVALID_GID(), MODE_ATTR: '420',
135                           error: POSIX_GID_ERROR},
136                 'test4': {UID_ATTR: INVALID_UID(), error: POSIX_UID_ERROR},
137                 'test5': {UID_ATTR: INVALID_UID(), MODE_ATTR: '530',
138                           error: POSIX_UID_ERROR},
139                 'test6': {UID_ATTR: INVALID_UID(), GID_ATTR: INVALID_GID(),
140                           error: POSIX_UID_ERROR},
141                 'test7': {UID_ATTR: INVALID_UID(), GID_ATTR: INVALID_GID(),
142                           MODE_ATTR: '640', error: POSIX_UID_ERROR},
143                 'test8': {UID_ATTR: INVALID_UID(), GID_ATTR: PRIMARY_GID,
144                           error: POSIX_UID_ERROR},
145                 'test9': {UID_ATTR: INVALID_UID(), GID_ATTR: NON_PRIMARY_GID(),
146                           error: POSIX_UID_ERROR},
147                 'test10': {UID_ATTR: INVALID_UID(), GID_ATTR: PRIMARY_GID,
148                            MODE_ATTR: '640', error: POSIX_UID_ERROR},
149                 'test11': {UID_ATTR: INVALID_UID(),
150                            GID_ATTR: NON_PRIMARY_GID(),
151                            MODE_ATTR: '640', error: POSIX_UID_ERROR},
152                 'test12': {UID_ATTR: USER_ID, GID_ATTR: INVALID_GID(),
153                            error: POSIX_GID_ERROR},
154                 'test13': {UID_ATTR: USER_ID, GID_ATTR: INVALID_GID(),
155                            MODE_ATTR: '640', error: POSIX_GID_ERROR},
156                 'test14': {GID_ATTR: PRIMARY_GID, MODE_ATTR: '240',
157                            error: POSIX_INSUFFICIENT_ACCESS_ERROR}}
158  # The first variable below can be used to help debug the test if there is a
159  # problem.
160  for test_name, attrs_dict in test_params.iteritems():
161    cls.ClearPOSIXMetadata(obj)
162
163    # Attributes default to None if they are not in attrs_dict.
164    uid = attrs_dict.get(UID_ATTR)
165    gid = attrs_dict.get(GID_ATTR)
166    mode = attrs_dict.get(MODE_ATTR)
167    cls.SetPOSIXMetadata(cls.default_provider, bucket_uri.bucket_name,
168                         obj.object_name, uid=uid, gid=gid, mode=mode)
169    stderr = cls.RunGsUtil(['cp' if is_cp else 'mv', '-P',
170                            suri(bucket_uri, obj.object_name), tmpdir],
171                           expected_status=1, return_stderr=True)
172    cls.assertIn(ORPHANED_FILE, stderr, '%s not found in stderr\n%s'
173                 % (ORPHANED_FILE, stderr))
174    error_regex = BuildErrorRegex(obj, attrs_dict.get(error))
175    cls.assertTrue(
176        error_regex.search(stderr),
177        'Test %s did not match expected error; could not find a match for '
178        '%s\n\nin stderr:\n%s' % (test_name, error_regex.pattern, stderr))
179    listing1 = TailSet(suri(bucket_uri), cls.FlatListBucket(bucket_uri))
180    listing2 = TailSet(tmpdir, cls.FlatListDir(tmpdir))
181    # Bucket should have un-altered content.
182    cls.assertEquals(listing1, set(['/%s' % obj.object_name]))
183    # Dir should have un-altered content.
184    cls.assertEquals(listing2, set(['']))
185
186
187def TestCpMvPOSIXBucketToLocalNoErrors(cls, bucket_uri, tmpdir, is_cp=True):
188  """Helper function for preserve_posix_no_errors tests in test_cp and test_mv.
189
190  Args:
191    cls: An instance of either TestCp or TestMv.
192    bucket_uri: The uri of the bucket that the object is in.
193    tmpdir: The local file path to cp to.
194    is_cp: Whether or not the calling test suite is cp or mv.
195  """
196  test_params = {'obj1': {GID_ATTR: PRIMARY_GID},
197                 'obj2': {GID_ATTR: NON_PRIMARY_GID()},
198                 'obj3': {GID_ATTR: PRIMARY_GID, MODE_ATTR: '440'},
199                 'obj4': {GID_ATTR: NON_PRIMARY_GID(), MODE_ATTR: '444'},
200                 'obj5': {UID_ATTR: USER_ID},
201                 'obj6': {UID_ATTR: USER_ID, MODE_ATTR: '420'},
202                 'obj7': {UID_ATTR: USER_ID, GID_ATTR: PRIMARY_GID},
203                 'obj8': {UID_ATTR: USER_ID, GID_ATTR: NON_PRIMARY_GID()},
204                 'obj9': {UID_ATTR: USER_ID, GID_ATTR: PRIMARY_GID,
205                          MODE_ATTR: '433'},
206                 'obj10': {UID_ATTR: USER_ID, GID_ATTR: NON_PRIMARY_GID(),
207                           MODE_ATTR: '442'}}
208  for obj_name, attrs_dict in test_params.iteritems():
209    uid = attrs_dict.get(UID_ATTR)
210    gid = attrs_dict.get(GID_ATTR)
211    mode = attrs_dict.get(MODE_ATTR)
212    cls.CreateObject(bucket_uri=bucket_uri, object_name=obj_name,
213                     contents=obj_name, uid=uid, gid=gid, mode=mode)
214  for obj_name in test_params.iterkeys():
215    # Move objects one at a time to avoid listing consistency.
216    cls.RunGsUtil(['cp' if is_cp else 'mv', '-P', suri(bucket_uri, obj_name),
217                   tmpdir])
218  listing = TailSet(tmpdir, cls.FlatListDir(tmpdir))
219  cls.assertEquals(listing, set(['/obj1', '/obj2', '/obj3', '/obj4', '/obj5',
220                                 '/obj6', '/obj7', '/obj8', '/obj9', '/obj10']))
221  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj1'),
222                                  gid=PRIMARY_GID, mode=DEFAULT_MODE)
223  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj2'),
224                                  gid=NON_PRIMARY_GID(), mode=DEFAULT_MODE)
225  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj3'),
226                                  gid=PRIMARY_GID, mode=0o440)
227  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj4'),
228                                  gid=NON_PRIMARY_GID(), mode=0o444)
229  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj5'),
230                                  uid=USER_ID, gid=PRIMARY_GID,
231                                  mode=DEFAULT_MODE)
232  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj6'),
233                                  uid=USER_ID, gid=PRIMARY_GID, mode=0o420)
234  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj7'),
235                                  uid=USER_ID, gid=PRIMARY_GID,
236                                  mode=DEFAULT_MODE)
237  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj8'),
238                                  uid=USER_ID, gid=NON_PRIMARY_GID(),
239                                  mode=DEFAULT_MODE)
240  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj9'),
241                                  uid=USER_ID, gid=PRIMARY_GID, mode=0o433)
242  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj10'),
243                                  uid=USER_ID, gid=NON_PRIMARY_GID(),
244                                  mode=0o442)
245
246
247def TestCpMvPOSIXLocalToBucketNoErrors(cls, bucket_uri, is_cp=True):
248  """Helper function for testing local to bucket POSIX preservation."""
249  test_params = {'obj1': {GID_ATTR: PRIMARY_GID},
250                 'obj2': {GID_ATTR: NON_PRIMARY_GID()},
251                 'obj3': {GID_ATTR: PRIMARY_GID, MODE_ATTR: '440'},
252                 'obj4': {GID_ATTR: NON_PRIMARY_GID(), MODE_ATTR: '444'},
253                 'obj5': {UID_ATTR: USER_ID},
254                 'obj6': {UID_ATTR: USER_ID, MODE_ATTR: '420'},
255                 'obj7': {UID_ATTR: USER_ID, GID_ATTR: PRIMARY_GID},
256                 'obj8': {UID_ATTR: USER_ID, GID_ATTR: NON_PRIMARY_GID()},
257                 'obj9': {UID_ATTR: USER_ID, GID_ATTR: PRIMARY_GID,
258                          MODE_ATTR: '433'},
259                 'obj10': {UID_ATTR: USER_ID, GID_ATTR: NON_PRIMARY_GID(),
260                           MODE_ATTR: '442'}}
261  for obj_name, attrs_dict in test_params.iteritems():
262    uid = attrs_dict.get(UID_ATTR, NA_ID)
263    gid = attrs_dict.get(GID_ATTR, NA_ID)
264    mode = attrs_dict.get(MODE_ATTR, NA_MODE)
265    if mode != NA_MODE:
266      ValidatePOSIXMode(int(mode, 8))
267    ValidateFilePermissionAccess(obj_name, uid=uid, gid=gid, mode=mode)
268    fpath = cls.CreateTempFile(contents='foo', uid=uid, gid=gid, mode=mode)
269    cls.RunGsUtil(['cp' if is_cp else 'mv', '-P', fpath,
270                   suri(bucket_uri, obj_name)])
271    if uid != NA_ID:
272      cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name,
273                                      UID_ATTR, str(uid))
274    if gid != NA_ID:
275      cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name,
276                                      GID_ATTR, str(gid))
277    if mode != NA_MODE:
278      cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name,
279                                      MODE_ATTR, str(mode))
280
281
282def _ReadContentsFromFifo(fifo_path, list_for_output):
283  with open(fifo_path, 'rb') as f:
284    list_for_output.append(f.read())
285
286
287def _WriteContentsToFifo(contents, fifo_path):
288  with open(fifo_path, 'wb') as f:
289    f.write(contents)
290
291
292class _JSONForceHTTPErrorCopyCallbackHandler(object):
293  """Test callback handler that raises an arbitrary HTTP error exception."""
294
295  def __init__(self, startover_at_byte, http_error_num):
296    self._startover_at_byte = startover_at_byte
297    self._http_error_num = http_error_num
298    self.started_over_once = False
299
300  # pylint: disable=invalid-name
301  def call(self, total_bytes_transferred, total_size):
302    """Forcibly exits if the transfer has passed the halting point."""
303    if (total_bytes_transferred >= self._startover_at_byte
304        and not self.started_over_once):
305      sys.stderr.write(
306          'Forcing HTTP error %s after byte %s. '
307          '%s/%s transferred.\r\n' % (
308              self._http_error_num,
309              self._startover_at_byte,
310              MakeHumanReadable(total_bytes_transferred),
311              MakeHumanReadable(total_size)))
312      self.started_over_once = True
313      raise apitools_exceptions.HttpError(
314          {'status': self._http_error_num}, None, None)
315
316
317class _XMLResumableUploadStartOverCopyCallbackHandler(object):
318  """Test callback handler that raises start-over exception during upload."""
319
320  def __init__(self, startover_at_byte):
321    self._startover_at_byte = startover_at_byte
322    self.started_over_once = False
323
324  # pylint: disable=invalid-name
325  def call(self, total_bytes_transferred, total_size):
326    """Forcibly exits if the transfer has passed the halting point."""
327    if (total_bytes_transferred >= self._startover_at_byte
328        and not self.started_over_once):
329      sys.stderr.write(
330          'Forcing ResumableUpload start over error after byte %s. '
331          '%s/%s transferred.\r\n' % (
332              self._startover_at_byte,
333              MakeHumanReadable(total_bytes_transferred),
334              MakeHumanReadable(total_size)))
335      self.started_over_once = True
336      raise boto.exception.ResumableUploadException(
337          'Forcing upload start over',
338          ResumableTransferDisposition.START_OVER)
339
340
341class _DeleteBucketThenStartOverCopyCallbackHandler(object):
342  """Test callback handler that deletes bucket then raises start-over."""
343
344  def __init__(self, startover_at_byte, bucket_uri):
345    self._startover_at_byte = startover_at_byte
346    self._bucket_uri = bucket_uri
347    self.started_over_once = False
348
349  # pylint: disable=invalid-name
350  def call(self, total_bytes_transferred, total_size):
351    """Forcibly exits if the transfer has passed the halting point."""
352    if (total_bytes_transferred >= self._startover_at_byte
353        and not self.started_over_once):
354      sys.stderr.write('Deleting bucket (%s)' %(self._bucket_uri.bucket_name))
355
356      @Retry(StorageResponseError, tries=5, timeout_secs=1)
357      def DeleteBucket():
358        bucket_list = list(self._bucket_uri.list_bucket(all_versions=True))
359        for k in bucket_list:
360          self._bucket_uri.get_bucket().delete_key(k.name,
361                                                   version_id=k.version_id)
362        self._bucket_uri.delete_bucket()
363
364      DeleteBucket()
365      sys.stderr.write(
366          'Forcing ResumableUpload start over error after byte %s. '
367          '%s/%s transferred.\r\n' % (
368              self._startover_at_byte,
369              MakeHumanReadable(total_bytes_transferred),
370              MakeHumanReadable(total_size)))
371      self.started_over_once = True
372      raise ResumableUploadStartOverException(
373          'Artificially forcing start-over')
374
375
376class _ResumableUploadRetryHandler(object):
377  """Test callback handler for causing retries during a resumable transfer."""
378
379  def __init__(self, retry_at_byte, exception_to_raise, exc_args,
380               num_retries=1):
381    self._retry_at_byte = retry_at_byte
382    self._exception_to_raise = exception_to_raise
383    self._exception_args = exc_args
384    self._num_retries = num_retries
385
386    self._retries_made = 0
387
388  # pylint: disable=invalid-name
389  def call(self, total_bytes_transferred, unused_total_size):
390    """Cause a single retry at the retry point."""
391    if (total_bytes_transferred >= self._retry_at_byte
392        and self._retries_made < self._num_retries):
393      self._retries_made += 1
394      raise self._exception_to_raise(*self._exception_args)
395
396
397class TestCp(testcase.GsUtilIntegrationTestCase):
398  """Integration tests for cp command."""
399
400  # For tests that artificially halt, we need to ensure at least one callback
401  # occurs.
402  halt_size = START_CALLBACK_PER_BYTES * 2
403
404  def _get_test_file(self, name):
405    contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name)
406    return self.CreateTempFile(file_name=name, contents=contents)
407
408  def _CpWithFifoViaGsUtilAndAppendOutputToList(
409      self, src_path_tuple, dst_path, list_for_return_value, **kwargs):
410    arg_list = ['cp']
411    arg_list.extend(src_path_tuple)
412    arg_list.append(dst_path)
413    # Append stderr, stdout, or return status (if specified in kwargs) to the
414    # given list.
415    list_for_return_value.append(
416        self.RunGsUtil(arg_list, **kwargs))
417
418  @SequentialAndParallelTransfer
419  def test_noclobber(self):
420    key_uri = self.CreateObject(contents='foo')
421    fpath = self.CreateTempFile(contents='bar')
422    stderr = self.RunGsUtil(['cp', '-n', fpath, suri(key_uri)],
423                            return_stderr=True)
424    self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr)
425    self.assertEqual(key_uri.get_contents_as_string(), 'foo')
426    stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath],
427                            return_stderr=True)
428    with open(fpath, 'r') as f:
429      self.assertIn('Skipping existing item: %s' % suri(f), stderr)
430      self.assertEqual(f.read(), 'bar')
431
432  def test_dest_bucket_not_exist(self):
433    fpath = self.CreateTempFile(contents='foo')
434    invalid_bucket_uri = (
435        '%s://%s' % (self.default_provider, self.nonexistent_bucket_name))
436    stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri],
437                            expected_status=1, return_stderr=True)
438    self.assertIn('does not exist', stderr)
439
440  def test_copy_in_cloud_noclobber(self):
441    bucket1_uri = self.CreateBucket()
442    bucket2_uri = self.CreateBucket()
443    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
444    stderr = self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)],
445                            return_stderr=True)
446    # Rewrite API may output an additional 'Copying' progress notification.
447    self.assertGreaterEqual(stderr.count('Copying'), 1)
448    self.assertLessEqual(stderr.count('Copying'), 2)
449    stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), suri(bucket2_uri)],
450                            return_stderr=True)
451    self.assertIn('Skipping existing item: %s' %
452                  suri(bucket2_uri, key_uri.object_name), stderr)
453
454  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
455  @SequentialAndParallelTransfer
456  def test_cp_from_local_file_to_fifo(self):
457    contents = 'bar'
458    fifo_path = self.CreateTempFifo()
459    file_path = self.CreateTempFile(contents=contents)
460    list_for_output = []
461
462    read_thread = threading.Thread(
463        target=_ReadContentsFromFifo,
464        args=(fifo_path, list_for_output))
465    read_thread.start()
466    write_thread = threading.Thread(
467        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
468        args=((file_path,), fifo_path, []))
469    write_thread.start()
470    write_thread.join(120)
471    read_thread.join(120)
472    if not list_for_output:
473      self.fail('Reading/writing to the fifo timed out.')
474    self.assertEqual(list_for_output[0].strip(), contents)
475
476  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
477  @SequentialAndParallelTransfer
478  def test_cp_from_one_object_to_fifo(self):
479    fifo_path = self.CreateTempFifo()
480    bucket_uri = self.CreateBucket()
481    contents = 'bar'
482    obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents)
483    list_for_output = []
484
485    read_thread = threading.Thread(
486        target=_ReadContentsFromFifo,
487        args=(fifo_path, list_for_output))
488    read_thread.start()
489    write_thread = threading.Thread(
490        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
491        args=((suri(obj_uri),), fifo_path, []))
492    write_thread.start()
493    write_thread.join(120)
494    read_thread.join(120)
495    if not list_for_output:
496      self.fail('Reading/writing to the fifo timed out.')
497    self.assertEqual(list_for_output[0].strip(), contents)
498
499  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
500  @SequentialAndParallelTransfer
501  def test_cp_from_multiple_objects_to_fifo(self):
502    fifo_path = self.CreateTempFifo()
503    bucket_uri = self.CreateBucket()
504    contents1 = 'foo and bar'
505    contents2 = 'baz and qux'
506    obj1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents1)
507    obj2_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents2)
508    list_for_output = []
509
510    read_thread = threading.Thread(
511        target=_ReadContentsFromFifo,
512        args=(fifo_path, list_for_output))
513    read_thread.start()
514    write_thread = threading.Thread(
515        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
516        args=((suri(obj1_uri), suri(obj2_uri)), fifo_path, []))
517    write_thread.start()
518    write_thread.join(120)
519    read_thread.join(120)
520    if not list_for_output:
521      self.fail('Reading/writing to the fifo timed out.')
522    self.assertIn(contents1, list_for_output[0])
523    self.assertIn(contents2, list_for_output[0])
524
525  @SequentialAndParallelTransfer
526  def test_streaming(self):
527    bucket_uri = self.CreateBucket()
528    stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')],
529                            stdin='bar', return_stderr=True)
530    self.assertIn('Copying from <STDIN>', stderr)
531    key_uri = bucket_uri.clone_replace_name('foo')
532    self.assertEqual(key_uri.get_contents_as_string(), 'bar')
533
534  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
535  @SequentialAndParallelTransfer
536  def test_streaming_from_fifo_to_object(self):
537    bucket_uri = self.CreateBucket()
538    fifo_path = self.CreateTempFifo()
539    object_name = 'foo'
540    object_contents = 'bar'
541    list_for_output = []
542
543    # Start writer in the background, which won't finish until a corresponding
544    # read operation is performed on the fifo.
545    write_thread = threading.Thread(
546        target=_WriteContentsToFifo,
547        args=(object_contents, fifo_path))
548    write_thread.start()
549    # The fifo requires both a pending read and write before either operation
550    # will complete. Regardless of which operation occurs first, the
551    # corresponding subsequent operation will unblock the first one.
552    # We run gsutil in a thread so that it can timeout rather than hang forever
553    # if the write thread fails.
554    read_thread = threading.Thread(
555        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
556        args=((fifo_path,), suri(bucket_uri, object_name), list_for_output),
557        kwargs={'return_stderr': True})
558    read_thread.start()
559
560    read_thread.join(120)
561    write_thread.join(120)
562    if not list_for_output:
563      self.fail('Reading/writing to the fifo timed out.')
564    self.assertIn('Copying from named pipe', list_for_output[0])
565
566    key_uri = bucket_uri.clone_replace_name(object_name)
567    self.assertEqual(key_uri.get_contents_as_string(), object_contents)
568
569  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
570  @SequentialAndParallelTransfer
571  def test_streaming_from_fifo_to_stdout(self):
572    fifo_path = self.CreateTempFifo()
573    contents = 'bar'
574    list_for_output = []
575
576    write_thread = threading.Thread(
577        target=_WriteContentsToFifo,
578        args=(contents, fifo_path))
579    write_thread.start()
580    read_thread = threading.Thread(
581        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
582        args=((fifo_path,), '-', list_for_output),
583        kwargs={'return_stdout': True})
584    read_thread.start()
585    read_thread.join(120)
586    write_thread.join(120)
587    if not list_for_output:
588      self.fail('Reading/writing to the fifo timed out.')
589    self.assertEqual(list_for_output[0].strip(), contents)
590
591  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
592  @SequentialAndParallelTransfer
593  def test_streaming_from_stdout_to_fifo(self):
594    fifo_path = self.CreateTempFifo()
595    contents = 'bar'
596    list_for_output = []
597    list_for_gsutil_output = []
598
599    read_thread = threading.Thread(
600        target=_ReadContentsFromFifo,
601        args=(fifo_path, list_for_output))
602    read_thread.start()
603    write_thread = threading.Thread(
604        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
605        args=(('-',), fifo_path, list_for_gsutil_output),
606        kwargs={'return_stderr': True, 'stdin': contents})
607    write_thread.start()
608    write_thread.join(120)
609    read_thread.join(120)
610    if not list_for_output:
611      self.fail('Reading/writing to the fifo timed out.')
612    self.assertEqual(list_for_output[0].strip(), contents)
613
614  def test_streaming_multiple_arguments(self):
615    bucket_uri = self.CreateBucket()
616    stderr = self.RunGsUtil(['cp', '-', '-', suri(bucket_uri)],
617                            stdin='bar', return_stderr=True, expected_status=1)
618    self.assertIn('Multiple URL strings are not supported with streaming',
619                  stderr)
620
621  # TODO: Implement a way to test both with and without using magic file.
622
623  @SequentialAndParallelTransfer
624  def test_detect_content_type(self):
625    """Tests local detection of content type."""
626    bucket_uri = self.CreateBucket()
627    dsturi = suri(bucket_uri, 'foo')
628
629    self.RunGsUtil(['cp', self._get_test_file('test.mp3'), dsturi])
630
631    # Use @Retry as hedge against bucket listing eventual consistency.
632    @Retry(AssertionError, tries=3, timeout_secs=1)
633    def _Check1():
634      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
635      if IS_WINDOWS:
636        self.assertTrue(
637            re.search(r'Content-Type:\s+audio/x-mpg', stdout) or
638            re.search(r'Content-Type:\s+audio/mpeg', stdout))
639      else:
640        self.assertRegexpMatches(stdout, r'Content-Type:\s+audio/mpeg')
641    _Check1()
642
643    self.RunGsUtil(['cp', self._get_test_file('test.gif'), dsturi])
644
645    # Use @Retry as hedge against bucket listing eventual consistency.
646    @Retry(AssertionError, tries=3, timeout_secs=1)
647    def _Check2():
648      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
649      self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
650    _Check2()
651
652  def test_content_type_override_default(self):
653    """Tests overriding content type with the default value."""
654    bucket_uri = self.CreateBucket()
655    dsturi = suri(bucket_uri, 'foo')
656
657    self.RunGsUtil(['-h', 'Content-Type:', 'cp',
658                    self._get_test_file('test.mp3'), dsturi])
659
660    # Use @Retry as hedge against bucket listing eventual consistency.
661    @Retry(AssertionError, tries=3, timeout_secs=1)
662    def _Check1():
663      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
664      self.assertRegexpMatches(stdout,
665                               r'Content-Type:\s+application/octet-stream')
666    _Check1()
667
668    self.RunGsUtil(['-h', 'Content-Type:', 'cp',
669                    self._get_test_file('test.gif'), dsturi])
670
671    # Use @Retry as hedge against bucket listing eventual consistency.
672    @Retry(AssertionError, tries=3, timeout_secs=1)
673    def _Check2():
674      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
675      self.assertRegexpMatches(stdout,
676                               r'Content-Type:\s+application/octet-stream')
677    _Check2()
678
679  def test_content_type_override(self):
680    """Tests overriding content type with a value."""
681    bucket_uri = self.CreateBucket()
682    dsturi = suri(bucket_uri, 'foo')
683
684    self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp',
685                    self._get_test_file('test.mp3'), dsturi])
686
687    # Use @Retry as hedge against bucket listing eventual consistency.
688    @Retry(AssertionError, tries=3, timeout_secs=1)
689    def _Check1():
690      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
691      self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain')
692    _Check1()
693
694    self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp',
695                    self._get_test_file('test.gif'), dsturi])
696
697    # Use @Retry as hedge against bucket listing eventual consistency.
698    @Retry(AssertionError, tries=3, timeout_secs=1)
699    def _Check2():
700      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
701      self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain')
702    _Check2()
703
704  @unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.')
705  @SequentialAndParallelTransfer
706  def test_magicfile_override(self):
707    """Tests content type override with magicfile value."""
708    bucket_uri = self.CreateBucket()
709    dsturi = suri(bucket_uri, 'foo')
710    fpath = self.CreateTempFile(contents='foo/bar\n')
711    self.RunGsUtil(['cp', fpath, dsturi])
712
713    # Use @Retry as hedge against bucket listing eventual consistency.
714    @Retry(AssertionError, tries=3, timeout_secs=1)
715    def _Check1():
716      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
717      use_magicfile = boto.config.getbool('GSUtil', 'use_magicfile', False)
718      content_type = ('text/plain' if use_magicfile
719                      else 'application/octet-stream')
720      self.assertRegexpMatches(stdout, r'Content-Type:\s+%s' % content_type)
721    _Check1()
722
723  @SequentialAndParallelTransfer
724  def test_content_type_mismatches(self):
725    """Tests overriding content type when it does not match the file type."""
726    bucket_uri = self.CreateBucket()
727    dsturi = suri(bucket_uri, 'foo')
728    fpath = self.CreateTempFile(contents='foo/bar\n')
729
730    self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp',
731                    self._get_test_file('test.mp3'), dsturi])
732
733    # Use @Retry as hedge against bucket listing eventual consistency.
734    @Retry(AssertionError, tries=3, timeout_secs=1)
735    def _Check1():
736      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
737      self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
738    _Check1()
739
740    self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp',
741                    self._get_test_file('test.gif'), dsturi])
742
743    # Use @Retry as hedge against bucket listing eventual consistency.
744    @Retry(AssertionError, tries=3, timeout_secs=1)
745    def _Check2():
746      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
747      self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
748    _Check2()
749
750    self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', fpath, dsturi])
751
752    # Use @Retry as hedge against bucket listing eventual consistency.
753    @Retry(AssertionError, tries=3, timeout_secs=1)
754    def _Check3():
755      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
756      self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
757    _Check3()
758
759  @SequentialAndParallelTransfer
760  def test_content_type_header_case_insensitive(self):
761    """Tests that content type header is treated with case insensitivity."""
762    bucket_uri = self.CreateBucket()
763    dsturi = suri(bucket_uri, 'foo')
764    fpath = self._get_test_file('test.gif')
765
766    self.RunGsUtil(['-h', 'content-Type:text/plain', 'cp',
767                    fpath, dsturi])
768
769    # Use @Retry as hedge against bucket listing eventual consistency.
770    @Retry(AssertionError, tries=3, timeout_secs=1)
771    def _Check1():
772      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
773      self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain')
774      self.assertNotRegexpMatches(stdout, r'image/gif')
775    _Check1()
776
777    self.RunGsUtil(['-h', 'CONTENT-TYPE:image/gif',
778                    '-h', 'content-type:image/gif',
779                    'cp', fpath, dsturi])
780
781    # Use @Retry as hedge against bucket listing eventual consistency.
782    @Retry(AssertionError, tries=3, timeout_secs=1)
783    def _Check2():
784      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
785      self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
786      self.assertNotRegexpMatches(stdout, r'image/gif,\s*image/gif')
787    _Check2()
788
789  @SequentialAndParallelTransfer
790  def test_other_headers(self):
791    """Tests that non-content-type headers are applied successfully on copy."""
792    bucket_uri = self.CreateBucket()
793    dst_uri = suri(bucket_uri, 'foo')
794    fpath = self._get_test_file('test.gif')
795
796    self.RunGsUtil(['-h', 'Cache-Control:public,max-age=12',
797                    '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta, 'cp',
798                    fpath, dst_uri])
799
800    stdout = self.RunGsUtil(['ls', '-L', dst_uri], return_stdout=True)
801    self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12')
802    self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd')
803
804    dst_uri2 = suri(bucket_uri, 'bar')
805    self.RunGsUtil(['cp', dst_uri, dst_uri2])
806    # Ensure metadata was preserved across copy.
807    stdout = self.RunGsUtil(['ls', '-L', dst_uri2], return_stdout=True)
808    self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12')
809    self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd')
810
811  @SequentialAndParallelTransfer
812  def test_versioning(self):
813    """Tests copy with versioning."""
814    bucket_uri = self.CreateVersionedBucket()
815    k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2')
816    k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
817    g1 = urigen(k2_uri)
818    self.RunGsUtil(['cp', suri(k1_uri), suri(k2_uri)])
819    k2_uri = bucket_uri.clone_replace_name(k2_uri.object_name)
820    k2_uri = bucket_uri.clone_replace_key(k2_uri.get_key())
821    g2 = urigen(k2_uri)
822    k2_uri.set_contents_from_string('data3')
823    g3 = urigen(k2_uri)
824
825    fpath = self.CreateTempFile()
826    # Check to make sure current version is data3.
827    self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath])
828    with open(fpath, 'r') as f:
829      self.assertEqual(f.read(), 'data3')
830
831    # Check contents of all three versions
832    self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), fpath])
833    with open(fpath, 'r') as f:
834      self.assertEqual(f.read(), 'data1')
835    self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g2), fpath])
836    with open(fpath, 'r') as f:
837      self.assertEqual(f.read(), 'data2')
838    self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g3), fpath])
839    with open(fpath, 'r') as f:
840      self.assertEqual(f.read(), 'data3')
841
842    # Copy first version to current and verify.
843    self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1),
844                    k2_uri.versionless_uri])
845    self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath])
846    with open(fpath, 'r') as f:
847      self.assertEqual(f.read(), 'data1')
848
849    # Attempt to specify a version-specific URI for destination.
850    stderr = self.RunGsUtil(['cp', fpath, k2_uri.uri], return_stderr=True,
851                            expected_status=1)
852    self.assertIn('cannot be the destination for gsutil cp', stderr)
853
854  def test_versioning_no_parallelism(self):
855    """Tests that copy all-versions errors when parallelism is enabled."""
856    stderr = self.RunGsUtil(
857        ['-m', 'cp', '-A', suri(self.nonexistent_bucket_name, 'foo'),
858         suri(self.nonexistent_bucket_name, 'bar')],
859        expected_status=1, return_stderr=True)
860    self.assertIn('-m option is not supported with the cp -A flag', stderr)
861
862  @SkipForS3('S3 lists versioned objects in reverse timestamp order.')
863  def test_recursive_copying_versioned_bucket(self):
864    """Tests cp -R with versioned buckets."""
865    bucket1_uri = self.CreateVersionedBucket()
866    bucket2_uri = self.CreateVersionedBucket()
867    bucket3_uri = self.CreateVersionedBucket()
868
869    # Write two versions of an object to the bucket1.
870    v1_uri = self.CreateObject(bucket_uri=bucket1_uri, object_name='k',
871                               contents='data0')
872    self.CreateObject(bucket_uri=bucket1_uri, object_name='k',
873                      contents='longer_data1',
874                      gs_idempotent_generation=urigen(v1_uri))
875
876    self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True)
877    self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True)
878    self.AssertNObjectsInBucket(bucket3_uri, 0, versioned=True)
879
880    # Recursively copy to second versioned bucket.
881    # -A flag should copy all versions in order.
882    self.RunGsUtil(['cp', '-R', '-A', suri(bucket1_uri, '*'),
883                    suri(bucket2_uri)])
884
885    # Use @Retry as hedge against bucket listing eventual consistency.
886    @Retry(AssertionError, tries=3, timeout_secs=1)
887    def _Check2():
888      """Validates the results of the cp -R."""
889      listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)],
890                                return_stdout=True).split('\n')
891      listing2 = self.RunGsUtil(['ls', '-la', suri(bucket2_uri)],
892                                return_stdout=True).split('\n')
893      # 2 lines of listing output, 1 summary line, 1 empty line from \n split.
894      self.assertEquals(len(listing1), 4)
895      self.assertEquals(len(listing2), 4)
896
897      # First object in each bucket should match in size and version-less name.
898      size1, _, uri_str1, _ = listing1[0].split()
899      self.assertEquals(size1, str(len('data0')))
900      self.assertEquals(storage_uri(uri_str1).object_name, 'k')
901      size2, _, uri_str2, _ = listing2[0].split()
902      self.assertEquals(size2, str(len('data0')))
903      self.assertEquals(storage_uri(uri_str2).object_name, 'k')
904
905      # Similarly for second object in each bucket.
906      size1, _, uri_str1, _ = listing1[1].split()
907      self.assertEquals(size1, str(len('longer_data1')))
908      self.assertEquals(storage_uri(uri_str1).object_name, 'k')
909      size2, _, uri_str2, _ = listing2[1].split()
910      self.assertEquals(size2, str(len('longer_data1')))
911      self.assertEquals(storage_uri(uri_str2).object_name, 'k')
912    _Check2()
913
914    # Recursively copy to second versioned bucket with no -A flag.
915    # This should copy only the live object.
916    self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'),
917                    suri(bucket3_uri)])
918
919    # Use @Retry as hedge against bucket listing eventual consistency.
920    @Retry(AssertionError, tries=3, timeout_secs=1)
921    def _Check3():
922      """Validates the results of the cp -R."""
923      listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)],
924                                return_stdout=True).split('\n')
925      listing2 = self.RunGsUtil(['ls', '-la', suri(bucket3_uri)],
926                                return_stdout=True).split('\n')
927      # 2 lines of listing output, 1 summary line, 1 empty line from \n split.
928      self.assertEquals(len(listing1), 4)
929      # 1 lines of listing output, 1 summary line, 1 empty line from \n split.
930      self.assertEquals(len(listing2), 3)
931
932      # Live (second) object in bucket 1 should match the single live object.
933      size1, _, uri_str1, _ = listing2[0].split()
934      self.assertEquals(size1, str(len('longer_data1')))
935      self.assertEquals(storage_uri(uri_str1).object_name, 'k')
936    _Check3()
937
938  @SequentialAndParallelTransfer
939  @SkipForS3('Preconditions not supported for S3.')
940  def test_cp_generation_zero_match(self):
941    """Tests that cp handles an object-not-exists precondition header."""
942    bucket_uri = self.CreateBucket()
943    fpath1 = self.CreateTempFile(contents='data1')
944    # Match 0 means only write the object if it doesn't already exist.
945    gen_match_header = 'x-goog-if-generation-match:0'
946
947    # First copy should succeed.
948    # TODO: This can fail (rarely) if the server returns a 5xx but actually
949    # commits the bytes. If we add restarts on small uploads, handle this
950    # case.
951    self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(bucket_uri)])
952
953    # Second copy should fail with a precondition error.
954    stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1,
955                             suri(bucket_uri)],
956                            return_stderr=True, expected_status=1)
957    self.assertIn('PreconditionException', stderr)
958
959  @SequentialAndParallelTransfer
960  @SkipForS3('Preconditions not supported for S3.')
961  def test_cp_v_generation_match(self):
962    """Tests that cp -v option handles the if-generation-match header."""
963    bucket_uri = self.CreateVersionedBucket()
964    k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
965    g1 = k1_uri.generation
966
967    tmpdir = self.CreateTempDir()
968    fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2')
969
970    gen_match_header = 'x-goog-if-generation-match:%s' % g1
971    # First copy should succeed.
972    self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(k1_uri)])
973
974    # Second copy should fail the precondition.
975    stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1,
976                             suri(k1_uri)],
977                            return_stderr=True, expected_status=1)
978
979    self.assertIn('PreconditionException', stderr)
980
981    # Specifiying a generation with -n should fail before the request hits the
982    # server.
983    stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', '-n', fpath1,
984                             suri(k1_uri)],
985                            return_stderr=True, expected_status=1)
986
987    self.assertIn('ArgumentException', stderr)
988    self.assertIn('Specifying x-goog-if-generation-match is not supported '
989                  'with cp -n', stderr)
990
991  @SequentialAndParallelTransfer
992  def test_cp_nv(self):
993    """Tests that cp -nv works when skipping existing file."""
994    bucket_uri = self.CreateVersionedBucket()
995    k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
996
997    tmpdir = self.CreateTempDir()
998    fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2')
999
1000    # First copy should succeed.
1001    self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)])
1002
1003    # Second copy should skip copying.
1004    stderr = self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)],
1005                            return_stderr=True)
1006    self.assertIn('Skipping existing item:', stderr)
1007
1008  @SequentialAndParallelTransfer
1009  @SkipForS3('S3 lists versioned objects in reverse timestamp order.')
1010  def test_cp_v_option(self):
1011    """"Tests that cp -v returns the created object's version-specific URI."""
1012    bucket_uri = self.CreateVersionedBucket()
1013    k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
1014    k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2')
1015
1016    # Case 1: Upload file to object using one-shot PUT.
1017    tmpdir = self.CreateTempDir()
1018    fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1')
1019    self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri)
1020
1021    # Case 2: Upload file to object using resumable upload.
1022    size_threshold = ONE_KIB
1023    boto_config_for_test = ('GSUtil', 'resumable_threshold',
1024                            str(size_threshold))
1025    with SetBotoConfigForTest([boto_config_for_test]):
1026      file_as_string = os.urandom(size_threshold)
1027      tmpdir = self.CreateTempDir()
1028      fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=file_as_string)
1029      self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri)
1030
1031    # Case 3: Upload stream to object.
1032    self._run_cp_minus_v_test('-v', '-', k2_uri.uri)
1033
1034    # Case 4: Download object to file. For this case we just expect output of
1035    # gsutil cp -v to be the URI of the file.
1036    tmpdir = self.CreateTempDir()
1037    fpath1 = self.CreateTempFile(tmpdir=tmpdir)
1038    dst_uri = storage_uri(fpath1)
1039    stderr = self.RunGsUtil(['cp', '-v', suri(k1_uri), suri(dst_uri)],
1040                            return_stderr=True)
1041    # TODO: Add ordering assertion (should be in stderr.split('\n)[-2]) back
1042    # once both the creation and status messages are handled by the UI thread.
1043    self.assertIn('Created: %s\n' % dst_uri.uri, stderr)
1044
1045    # Case 5: Daisy-chain from object to object.
1046    self._run_cp_minus_v_test('-Dv', k1_uri.uri, k2_uri.uri)
1047
1048    # Case 6: Copy object to object in-the-cloud.
1049    self._run_cp_minus_v_test('-v', k1_uri.uri, k2_uri.uri)
1050
1051  def _run_cp_minus_v_test(self, opt, src_str, dst_str):
1052    """Runs cp -v with the options and validates the results."""
1053    stderr = self.RunGsUtil(['cp', opt, src_str, dst_str], return_stderr=True)
1054    match = re.search(r'Created: (.*)\n', stderr)
1055    self.assertIsNotNone(match)
1056    created_uri = match.group(1)
1057
1058    # Use @Retry as hedge against bucket listing eventual consistency.
1059    @Retry(AssertionError, tries=3, timeout_secs=1)
1060    def _Check1():
1061      stdout = self.RunGsUtil(['ls', '-a', dst_str], return_stdout=True)
1062      lines = stdout.split('\n')
1063      # Final (most recent) object should match the "Created:" URI. This is
1064      # in second-to-last line (last line is '\n').
1065      self.assertGreater(len(lines), 2)
1066      self.assertEqual(created_uri, lines[-2])
1067    _Check1()
1068
1069  @SequentialAndParallelTransfer
1070  def test_stdin_args(self):
1071    """Tests cp with the -I option."""
1072    tmpdir = self.CreateTempDir()
1073    fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1')
1074    fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents='data2')
1075    bucket_uri = self.CreateBucket()
1076    self.RunGsUtil(['cp', '-I', suri(bucket_uri)],
1077                   stdin='\n'.join((fpath1, fpath2)))
1078
1079    # Use @Retry as hedge against bucket listing eventual consistency.
1080    @Retry(AssertionError, tries=3, timeout_secs=1)
1081    def _Check1():
1082      stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)
1083      self.assertIn(os.path.basename(fpath1), stdout)
1084      self.assertIn(os.path.basename(fpath2), stdout)
1085      self.assertNumLines(stdout, 2)
1086    _Check1()
1087
1088  def test_cross_storage_class_cloud_cp(self):
1089    bucket1_uri = self.CreateBucket(storage_class='standard')
1090    bucket2_uri = self.CreateBucket(
1091        storage_class='durable_reduced_availability')
1092    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
1093    # Server now allows copy-in-the-cloud across storage classes.
1094    self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)])
1095
1096  @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
1097  def test_cross_provider_cp(self):
1098    s3_bucket = self.CreateBucket(provider='s3')
1099    gs_bucket = self.CreateBucket(provider='gs')
1100    s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='foo')
1101    gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='bar')
1102    self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
1103    self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)])
1104
1105  @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
1106  @unittest.skip('This test performs a large copy but remains here for '
1107                 'debugging purposes.')
1108  def test_cross_provider_large_cp(self):
1109    s3_bucket = self.CreateBucket(provider='s3')
1110    gs_bucket = self.CreateBucket(provider='gs')
1111    s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='f'*1024*1024)
1112    gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='b'*1024*1024)
1113    self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
1114    self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)])
1115    with SetBotoConfigForTest([
1116        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
1117        ('GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))]):
1118      # Ensure copy also works across json upload chunk boundaries.
1119      self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
1120
1121  @unittest.skip('This test is slow due to creating many objects, '
1122                 'but remains here for debugging purposes.')
1123  def test_daisy_chain_cp_file_sizes(self):
1124    """Ensure daisy chain cp works with a wide of file sizes."""
1125    bucket_uri = self.CreateBucket()
1126    bucket2_uri = self.CreateBucket()
1127    exponent_cap = 28  # Up to 256 MiB in size.
1128    for i in range(exponent_cap):
1129      one_byte_smaller = 2**i - 1
1130      normal = 2**i
1131      one_byte_larger = 2**i + 1
1132      self.CreateObject(bucket_uri=bucket_uri, contents='a'*one_byte_smaller)
1133      self.CreateObject(bucket_uri=bucket_uri, contents='b'*normal)
1134      self.CreateObject(bucket_uri=bucket_uri, contents='c'*one_byte_larger)
1135
1136    self.AssertNObjectsInBucket(bucket_uri, exponent_cap*3)
1137    self.RunGsUtil(['-m', 'cp', '-D', suri(bucket_uri, '**'),
1138                    suri(bucket2_uri)])
1139
1140    self.AssertNObjectsInBucket(bucket2_uri, exponent_cap*3)
1141
1142  def test_daisy_chain_cp(self):
1143    """Tests cp with the -D option."""
1144    bucket1_uri = self.CreateBucket(storage_class='standard')
1145    bucket2_uri = self.CreateBucket(
1146        storage_class='durable_reduced_availability')
1147    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
1148    # Set some headers on source object so we can verify that headers are
1149    # presereved by daisy-chain copy.
1150    self.RunGsUtil(['setmeta', '-h', 'Cache-Control:public,max-age=12',
1151                    '-h', 'Content-Type:image/gif',
1152                    '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta,
1153                    suri(key_uri)])
1154    # Set public-read (non-default) ACL so we can verify that cp -D -p works.
1155    self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)])
1156    acl_json = self.RunGsUtil(['acl', 'get', suri(key_uri)], return_stdout=True)
1157    # Perform daisy-chain copy and verify that source object headers and ACL
1158    # were preserved. Also specify -n option to test that gsutil correctly
1159    # removes the x-goog-if-generation-match:0 header that was set at uploading
1160    # time when updating the ACL.
1161    stderr = self.RunGsUtil(['cp', '-Dpn', suri(key_uri), suri(bucket2_uri)],
1162                            return_stderr=True)
1163    self.assertNotIn('Copy-in-the-cloud disallowed', stderr)
1164
1165    @Retry(AssertionError, tries=3, timeout_secs=1)
1166    def _Check():
1167      uri = suri(bucket2_uri, key_uri.object_name)
1168      stdout = self.RunGsUtil(['ls', '-L', uri], return_stdout=True)
1169      self.assertRegexpMatches(stdout, r'Cache-Control:\s+public,max-age=12')
1170      self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
1171      self.assertRegexpMatches(stdout, r'Metadata:\s+1:\s+abcd')
1172      new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True)
1173      self.assertEqual(acl_json, new_acl_json)
1174    _Check()
1175
1176  @unittest.skipUnless(
1177      not HAS_GS_PORT, 'gs_port is defined in config which can cause '
1178      'problems when uploading and downloading to the same local host port')
1179  def test_daisy_chain_cp_download_failure(self):
1180    """Tests cp with the -D option when the download thread dies."""
1181    bucket1_uri = self.CreateBucket()
1182    bucket2_uri = self.CreateBucket()
1183    key_uri = self.CreateObject(bucket_uri=bucket1_uri,
1184                                contents='a' * self.halt_size)
1185    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
1186    test_callback_file = self.CreateTempFile(
1187        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
1188    with SetBotoConfigForTest([boto_config_for_test]):
1189      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
1190                               '-D', suri(key_uri), suri(bucket2_uri)],
1191                              expected_status=1, return_stderr=True)
1192      # Should have three exception traces; one from the download thread and
1193      # two from the upload thread (expection message is repeated in main's
1194      # _OutputAndExit).
1195      self.assertEqual(stderr.count(
1196          'ResumableDownloadException: Artifically halting download'), 3)
1197
1198  def test_streaming_gzip_upload(self):
1199    """Tests error when compression flag is requested on a streaming source."""
1200    bucket_uri = self.CreateBucket()
1201    stderr = self.RunGsUtil(['cp', '-Z', '-', suri(bucket_uri, 'foo')],
1202                            return_stderr=True, expected_status=1,
1203                            stdin='streaming data')
1204    self.assertIn(
1205        'gzip compression is not currently supported on streaming uploads',
1206        stderr)
1207
1208  def test_seek_ahead_upload_cp(self):
1209    """Tests that the seek-ahead iterator estimates total upload work."""
1210    tmpdir = self.CreateTempDir(test_files=3)
1211    bucket_uri = self.CreateBucket()
1212
1213    with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '1'),
1214                               ('GSUtil', 'task_estimation_force', 'True')]):
1215      stderr = self.RunGsUtil(['-m', 'cp', '-r', tmpdir, suri(bucket_uri)],
1216                              return_stderr=True)
1217      self.assertIn(
1218          'Estimated work for this command: objects: 3, total size: 18',
1219          stderr)
1220
1221    with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '0'),
1222                               ('GSUtil', 'task_estimation_force', 'True')]):
1223      stderr = self.RunGsUtil(['-m', 'cp', '-r', tmpdir, suri(bucket_uri)],
1224                              return_stderr=True)
1225      self.assertNotIn('Estimated work', stderr)
1226
1227  def test_seek_ahead_download_cp(self):
1228    tmpdir = self.CreateTempDir()
1229    bucket_uri = self.CreateBucket(test_objects=3)
1230    self.AssertNObjectsInBucket(bucket_uri, 3)
1231
1232    with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '1'),
1233                               ('GSUtil', 'task_estimation_force', 'True')]):
1234      stderr = self.RunGsUtil(['-m', 'cp', '-r', suri(bucket_uri), tmpdir],
1235                              return_stderr=True)
1236      self.assertIn(
1237          'Estimated work for this command: objects: 3, total size: 18',
1238          stderr)
1239
1240    with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '0'),
1241                               ('GSUtil', 'task_estimation_force', 'True')]):
1242      stderr = self.RunGsUtil(['-m', 'cp', '-r', suri(bucket_uri), tmpdir],
1243                              return_stderr=True)
1244      self.assertNotIn('Estimated work', stderr)
1245
1246  def test_canned_acl_cp(self):
1247    """Tests copying with a canned ACL."""
1248    bucket1_uri = self.CreateBucket()
1249    bucket2_uri = self.CreateBucket()
1250    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
1251    self.RunGsUtil(['cp', '-a', 'public-read', suri(key_uri),
1252                    suri(bucket2_uri)])
1253    # Set public-read on the original key after the copy so we can compare
1254    # the ACLs.
1255    self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)])
1256    public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)],
1257                                     return_stdout=True)
1258
1259    @Retry(AssertionError, tries=3, timeout_secs=1)
1260    def _Check():
1261      uri = suri(bucket2_uri, key_uri.object_name)
1262      new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True)
1263      self.assertEqual(public_read_acl, new_acl_json)
1264    _Check()
1265
1266  @SequentialAndParallelTransfer
1267  def test_canned_acl_upload(self):
1268    """Tests uploading a file with a canned ACL."""
1269    bucket1_uri = self.CreateBucket()
1270    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo')
1271    # Set public-read on the object so we can compare the ACLs.
1272    self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)])
1273    public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)],
1274                                     return_stdout=True)
1275
1276    file_name = 'bar'
1277    fpath = self.CreateTempFile(file_name=file_name, contents='foo')
1278    self.RunGsUtil(['cp', '-a', 'public-read', fpath, suri(bucket1_uri)])
1279    new_acl_json = self.RunGsUtil(['acl', 'get', suri(bucket1_uri, file_name)],
1280                                  return_stdout=True)
1281    self.assertEqual(public_read_acl, new_acl_json)
1282
1283    resumable_size = ONE_KIB
1284    boto_config_for_test = ('GSUtil', 'resumable_threshold',
1285                            str(resumable_size))
1286    with SetBotoConfigForTest([boto_config_for_test]):
1287      resumable_file_name = 'resumable_bar'
1288      resumable_contents = os.urandom(resumable_size)
1289      resumable_fpath = self.CreateTempFile(
1290          file_name=resumable_file_name, contents=resumable_contents)
1291      self.RunGsUtil(['cp', '-a', 'public-read', resumable_fpath,
1292                      suri(bucket1_uri)])
1293      new_resumable_acl_json = self.RunGsUtil(
1294          ['acl', 'get', suri(bucket1_uri, resumable_file_name)],
1295          return_stdout=True)
1296      self.assertEqual(public_read_acl, new_resumable_acl_json)
1297
1298  def test_cp_key_to_local_stream(self):
1299    bucket_uri = self.CreateBucket()
1300    contents = 'foo'
1301    key_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents)
1302    stdout = self.RunGsUtil(['cp', suri(key_uri), '-'], return_stdout=True)
1303    self.assertIn(contents, stdout)
1304
1305  def test_cp_local_file_to_local_stream(self):
1306    contents = 'content'
1307    fpath = self.CreateTempFile(contents=contents)
1308    stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True)
1309    self.assertIn(contents, stdout)
1310
1311  @SequentialAndParallelTransfer
1312  def test_cp_zero_byte_file(self):
1313    dst_bucket_uri = self.CreateBucket()
1314    src_dir = self.CreateTempDir()
1315    fpath = os.path.join(src_dir, 'zero_byte')
1316    with open(fpath, 'w') as unused_out_file:
1317      pass  # Write a zero byte file
1318    self.RunGsUtil(['cp', fpath, suri(dst_bucket_uri)])
1319
1320    @Retry(AssertionError, tries=3, timeout_secs=1)
1321    def _Check1():
1322      stdout = self.RunGsUtil(['ls', suri(dst_bucket_uri)], return_stdout=True)
1323      self.assertIn(os.path.basename(fpath), stdout)
1324    _Check1()
1325
1326    download_path = os.path.join(src_dir, 'zero_byte_download')
1327    self.RunGsUtil(['cp', suri(dst_bucket_uri, 'zero_byte'), download_path])
1328    self.assertTrue(os.stat(download_path))
1329
1330  def test_copy_bucket_to_bucket(self):
1331    """Tests recursively copying from bucket to bucket.
1332
1333    This should produce identically named objects (and not, in particular,
1334    destination objects named by the version-specific URI from source objects).
1335    """
1336    src_bucket_uri = self.CreateVersionedBucket()
1337    dst_bucket_uri = self.CreateVersionedBucket()
1338    self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0',
1339                      contents='abc')
1340    self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1',
1341                      contents='def')
1342
1343    # Use @Retry as hedge against bucket listing eventual consistency.
1344    @Retry(AssertionError, tries=3, timeout_secs=1)
1345    def _CopyAndCheck():
1346      self.RunGsUtil(['cp', '-R', suri(src_bucket_uri),
1347                      suri(dst_bucket_uri)])
1348      stdout = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri],
1349                              return_stdout=True)
1350      self.assertIn('%s%s/obj0\n' % (dst_bucket_uri,
1351                                     src_bucket_uri.bucket_name), stdout)
1352      self.assertIn('%s%s/obj1\n' % (dst_bucket_uri,
1353                                     src_bucket_uri.bucket_name), stdout)
1354    _CopyAndCheck()
1355
1356  def test_copy_bucket_to_dir(self):
1357    """Tests recursively copying from bucket to a directory.
1358
1359    This should produce identically named objects (and not, in particular,
1360    destination objects named by the version- specific URI from source objects).
1361    """
1362    src_bucket_uri = self.CreateBucket()
1363    dst_dir = self.CreateTempDir()
1364    self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0',
1365                      contents='abc')
1366    self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1',
1367                      contents='def')
1368
1369    # Use @Retry as hedge against bucket listing eventual consistency.
1370    @Retry(AssertionError, tries=3, timeout_secs=1)
1371    def _CopyAndCheck():
1372      """Copies the bucket recursively and validates the results."""
1373      self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir])
1374      dir_list = []
1375      for dirname, _, filenames in os.walk(dst_dir):
1376        for filename in filenames:
1377          dir_list.append(os.path.join(dirname, filename))
1378      dir_list = sorted(dir_list)
1379      self.assertEqual(len(dir_list), 2)
1380      self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name,
1381                                    'obj0'), dir_list[0])
1382      self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name,
1383                                    'obj1'), dir_list[1])
1384    _CopyAndCheck()
1385
1386  def test_recursive_download_with_leftover_dir_placeholder(self):
1387    """Tests that we correctly handle leftover dir placeholders."""
1388    src_bucket_uri = self.CreateBucket()
1389    dst_dir = self.CreateTempDir()
1390    self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0',
1391                      contents='abc')
1392    self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1',
1393                      contents='def')
1394
1395    # Create a placeholder like what can be left over by web GUI tools.
1396    key_uri = src_bucket_uri.clone_replace_name('/')
1397    key_uri.set_contents_from_string('')
1398    self.AssertNObjectsInBucket(src_bucket_uri, 3)
1399
1400    self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir])
1401    dir_list = []
1402    for dirname, _, filenames in os.walk(dst_dir):
1403      for filename in filenames:
1404        dir_list.append(os.path.join(dirname, filename))
1405    dir_list = sorted(dir_list)
1406    self.assertEqual(len(dir_list), 2)
1407    self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name,
1408                                  'obj0'), dir_list[0])
1409    self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name,
1410                                  'obj1'), dir_list[1])
1411
1412  def test_copy_quiet(self):
1413    bucket_uri = self.CreateBucket()
1414    key_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
1415    stderr = self.RunGsUtil(['-q', 'cp', suri(key_uri),
1416                             suri(bucket_uri.clone_replace_name('o2'))],
1417                            return_stderr=True)
1418    self.assertEqual(stderr.count('Copying '), 0)
1419
1420  def test_cp_md5_match(self):
1421    """Tests that the uploaded object has the expected MD5.
1422
1423    Note that while this does perform a file to object upload, MD5's are
1424    not supported for composite objects so we don't use the decorator in this
1425    case.
1426    """
1427    bucket_uri = self.CreateBucket()
1428    fpath = self.CreateTempFile(contents='bar')
1429    with open(fpath, 'r') as f_in:
1430      file_md5 = base64.encodestring(binascii.unhexlify(
1431          CalculateMd5FromContents(f_in))).rstrip('\n')
1432    self.RunGsUtil(['cp', fpath, suri(bucket_uri)])
1433
1434    # Use @Retry as hedge against bucket listing eventual consistency.
1435    @Retry(AssertionError, tries=3, timeout_secs=1)
1436    def _Check1():
1437      stdout = self.RunGsUtil(['ls', '-L', suri(bucket_uri)],
1438                              return_stdout=True)
1439      self.assertRegexpMatches(stdout,
1440                               r'Hash\s+\(md5\):\s+%s' % re.escape(file_md5))
1441    _Check1()
1442
1443  @unittest.skipIf(IS_WINDOWS,
1444                   'Unicode handling on Windows requires mods to site-packages')
1445  @SequentialAndParallelTransfer
1446  def test_cp_manifest_upload_unicode(self):
1447    return self._ManifestUpload('foo-unicöde', 'bar-unicöde',
1448                                'manifest-unicöde')
1449
1450  @SequentialAndParallelTransfer
1451  def test_cp_manifest_upload(self):
1452    """Tests uploading with a mnifest file."""
1453    return self._ManifestUpload('foo', 'bar', 'manifest')
1454
1455  def _ManifestUpload(self, file_name, object_name, manifest_name):
1456    """Tests uploading with a manifest file."""
1457    bucket_uri = self.CreateBucket()
1458    dsturi = suri(bucket_uri, object_name)
1459
1460    fpath = self.CreateTempFile(file_name=file_name, contents='bar')
1461    logpath = self.CreateTempFile(file_name=manifest_name, contents='')
1462    # Ensure the file is empty.
1463    open(logpath, 'w').close()
1464    self.RunGsUtil(['cp', '-L', logpath, fpath, dsturi])
1465    with open(logpath, 'r') as f:
1466      lines = f.readlines()
1467    self.assertEqual(len(lines), 2)
1468
1469    expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5',
1470                        'UploadId', 'Source Size', 'Bytes Transferred',
1471                        'Result', 'Description']
1472    self.assertEqual(expected_headers, lines[0].strip().split(','))
1473    results = lines[1].strip().split(',')
1474    self.assertEqual(results[0][:7], 'file://')  # source
1475    self.assertEqual(results[1][:5], '%s://' %
1476                     self.default_provider)      # destination
1477    date_format = '%Y-%m-%dT%H:%M:%S.%fZ'
1478    start_date = datetime.datetime.strptime(results[2], date_format)
1479    end_date = datetime.datetime.strptime(results[3], date_format)
1480    self.assertEqual(end_date > start_date, True)
1481    if self.RunGsUtil == testcase.GsUtilIntegrationTestCase.RunGsUtil:
1482      # Check that we didn't do automatic parallel uploads - compose doesn't
1483      # calculate the MD5 hash. Since RunGsUtil is overriden in
1484      # TestCpParallelUploads to force parallel uploads, we can check which
1485      # method was used.
1486      self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==')  # md5
1487    self.assertEqual(int(results[6]), 3)  # Source Size
1488    self.assertEqual(int(results[7]), 3)  # Bytes Transferred
1489    self.assertEqual(results[8], 'OK')  # Result
1490
1491  @SequentialAndParallelTransfer
1492  def test_cp_manifest_download(self):
1493    """Tests downloading with a manifest file."""
1494    key_uri = self.CreateObject(contents='foo')
1495    fpath = self.CreateTempFile(contents='')
1496    logpath = self.CreateTempFile(contents='')
1497    # Ensure the file is empty.
1498    open(logpath, 'w').close()
1499    self.RunGsUtil(['cp', '-L', logpath, suri(key_uri), fpath],
1500                   return_stdout=True)
1501    with open(logpath, 'r') as f:
1502      lines = f.readlines()
1503    self.assertEqual(len(lines), 2)
1504
1505    expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5',
1506                        'UploadId', 'Source Size', 'Bytes Transferred',
1507                        'Result', 'Description']
1508    self.assertEqual(expected_headers, lines[0].strip().split(','))
1509    results = lines[1].strip().split(',')
1510    self.assertEqual(results[0][:5], '%s://' %
1511                     self.default_provider)      # source
1512    self.assertEqual(results[1][:7], 'file://')  # destination
1513    date_format = '%Y-%m-%dT%H:%M:%S.%fZ'
1514    start_date = datetime.datetime.strptime(results[2], date_format)
1515    end_date = datetime.datetime.strptime(results[3], date_format)
1516    self.assertEqual(end_date > start_date, True)
1517    self.assertEqual(int(results[6]), 3)  # Source Size
1518    # Bytes transferred might be more than 3 if the file was gzipped, since
1519    # the minimum gzip header is 10 bytes.
1520    self.assertGreaterEqual(int(results[7]), 3)  # Bytes Transferred
1521    self.assertEqual(results[8], 'OK')  # Result
1522
1523  @SequentialAndParallelTransfer
1524  def test_copy_unicode_non_ascii_filename(self):
1525    key_uri = self.CreateObject()
1526    # Try with and without resumable upload threshold, to ensure that each
1527    # scenario works. In particular, resumable uploads have tracker filename
1528    # logic.
1529    file_contents = 'x' * START_CALLBACK_PER_BYTES * 2
1530    fpath = self.CreateTempFile(file_name=u'Аудиоархив',
1531                                contents=file_contents)
1532    with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', '1')]):
1533      fpath_bytes = fpath.encode(UTF8)
1534      self.RunGsUtil(['cp', fpath_bytes, suri(key_uri)], return_stderr=True)
1535      stdout = self.RunGsUtil(['cat', suri(key_uri)], return_stdout=True)
1536      self.assertEquals(stdout, file_contents)
1537    with SetBotoConfigForTest([('GSUtil', 'resumable_threshold',
1538                                str(START_CALLBACK_PER_BYTES * 3))]):
1539      fpath_bytes = fpath.encode(UTF8)
1540      self.RunGsUtil(['cp', fpath_bytes, suri(key_uri)], return_stderr=True)
1541      stdout = self.RunGsUtil(['cat', suri(key_uri)], return_stdout=True)
1542      self.assertEquals(stdout, file_contents)
1543
1544  # Note: We originally one time implemented a test
1545  # (test_copy_invalid_unicode_filename) that invalid unicode filenames were
1546  # skipped, but it turns out os.walk() on MacOS doesn't have problems with
1547  # such files (so, failed that test). Given that, we decided to remove the
1548  # test.
1549
1550  @SequentialAndParallelTransfer
1551  def test_gzip_upload_and_download(self):
1552    bucket_uri = self.CreateBucket()
1553    contents = 'x' * 10000
1554    tmpdir = self.CreateTempDir()
1555    self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents)
1556    self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents)
1557    self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents)
1558    # Test that copying specifying only 2 of the 3 prefixes gzips the correct
1559    # files, and test that including whitespace in the extension list works.
1560    self.RunGsUtil(['cp', '-z', 'js, html',
1561                    os.path.join(tmpdir, 'test.*'), suri(bucket_uri)])
1562    self.AssertNObjectsInBucket(bucket_uri, 3)
1563    uri1 = suri(bucket_uri, 'test.html')
1564    uri2 = suri(bucket_uri, 'test.js')
1565    uri3 = suri(bucket_uri, 'test.txt')
1566    stdout = self.RunGsUtil(['stat', uri1], return_stdout=True)
1567    self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip')
1568    stdout = self.RunGsUtil(['stat', uri2], return_stdout=True)
1569    self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip')
1570    stdout = self.RunGsUtil(['stat', uri3], return_stdout=True)
1571    self.assertNotRegexpMatches(stdout, r'Content-Encoding:\s+gzip')
1572    fpath4 = self.CreateTempFile()
1573    for uri in (uri1, uri2, uri3):
1574      self.RunGsUtil(['cp', uri, suri(fpath4)])
1575      with open(fpath4, 'r') as f:
1576        self.assertEqual(f.read(), contents)
1577
1578  @SequentialAndParallelTransfer
1579  def test_gzip_all_upload_and_download(self):
1580    bucket_uri = self.CreateBucket()
1581    contents = 'x' * 10000
1582    tmpdir = self.CreateTempDir()
1583    self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents)
1584    self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents)
1585    self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents)
1586    self.CreateTempFile(file_name='test', tmpdir=tmpdir, contents=contents)
1587    # Test that all files are compressed.
1588    self.RunGsUtil(['cp', '-Z',
1589                    os.path.join(tmpdir, 'test*'), suri(bucket_uri)])
1590    self.AssertNObjectsInBucket(bucket_uri, 4)
1591    uri1 = suri(bucket_uri, 'test.html')
1592    uri2 = suri(bucket_uri, 'test.js')
1593    uri3 = suri(bucket_uri, 'test.txt')
1594    uri4 = suri(bucket_uri, 'test')
1595    stdout = self.RunGsUtil(['stat', uri1], return_stdout=True)
1596    self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip')
1597    stdout = self.RunGsUtil(['stat', uri2], return_stdout=True)
1598    self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip')
1599    stdout = self.RunGsUtil(['stat', uri3], return_stdout=True)
1600    self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip')
1601    stdout = self.RunGsUtil(['stat', uri4], return_stdout=True)
1602    self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip')
1603    fpath4 = self.CreateTempFile()
1604    for uri in (uri1, uri2, uri3, uri4):
1605      self.RunGsUtil(['cp', uri, suri(fpath4)])
1606      with open(fpath4, 'r') as f:
1607        self.assertEqual(f.read(), contents)
1608
1609  def test_both_gzip_options_error(self):
1610    # Test with -Z and -z
1611    stderr = self.RunGsUtil(['cp', '-Z', '-z', 'html, js', 'a.js', 'b.js'],
1612                            return_stderr=True, expected_status=1)
1613
1614    self.assertIn('CommandException', stderr)
1615    self.assertIn('Specifying both the -z and -Z options together is invalid.',
1616                  stderr)
1617
1618    # Same test, but with arguments in the opposite order.
1619    stderr = self.RunGsUtil(['cp', '-z', 'html, js', '-Z', 'a.js', 'b.js'],
1620                            return_stderr=True, expected_status=1)
1621
1622    self.assertIn('CommandException', stderr)
1623    self.assertIn('Specifying both the -z and -Z options together is invalid.',
1624                  stderr)
1625
1626  def test_upload_with_subdir_and_unexpanded_wildcard(self):
1627    fpath1 = self.CreateTempFile(file_name=('tmp', 'x', 'y', 'z'))
1628    bucket_uri = self.CreateBucket()
1629    wildcard_uri = '%s*' % fpath1[:-5]
1630    stderr = self.RunGsUtil(['cp', '-R', wildcard_uri, suri(bucket_uri)],
1631                            return_stderr=True)
1632    self.assertIn('Copying file:', stderr)
1633    self.AssertNObjectsInBucket(bucket_uri, 1)
1634
1635  @SequentialAndParallelTransfer
1636  def test_cp_object_ending_with_slash(self):
1637    """Tests that cp works with object names ending with slash."""
1638    tmpdir = self.CreateTempDir()
1639    bucket_uri = self.CreateBucket()
1640    self.CreateObject(bucket_uri=bucket_uri,
1641                      object_name='abc/',
1642                      contents='dir')
1643    self.CreateObject(bucket_uri=bucket_uri,
1644                      object_name='abc/def',
1645                      contents='def')
1646    self.AssertNObjectsInBucket(bucket_uri, 2)
1647    self.RunGsUtil(['cp', '-R', suri(bucket_uri), tmpdir])
1648    # Check that files in the subdir got copied even though subdir object
1649    # download was skipped.
1650    with open(os.path.join(tmpdir, bucket_uri.bucket_name, 'abc', 'def')) as f:
1651      self.assertEquals('def', '\n'.join(f.readlines()))
1652
1653  def test_cp_without_read_access(self):
1654    """Tests that cp fails without read access to the object."""
1655    # TODO: With 401's triggering retries in apitools, this test will take
1656    # a long time.  Ideally, make apitools accept a num_retries config for this
1657    # until we stop retrying the 401's.
1658    bucket_uri = self.CreateBucket()
1659    object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
1660
1661    # Use @Retry as hedge against bucket listing eventual consistency.
1662    self.AssertNObjectsInBucket(bucket_uri, 1)
1663
1664    if self.default_provider == 's3':
1665      expected_error_regex = r'AccessDenied'
1666    else:
1667      expected_error_regex = r'Anonymous user(s)? do(es)? not have'
1668
1669    with self.SetAnonymousBotoCreds():
1670      stderr = self.RunGsUtil(['cp', suri(object_uri), 'foo'],
1671                              return_stderr=True, expected_status=1)
1672      self.assertRegexpMatches(stderr, expected_error_regex)
1673
1674  @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.')
1675  def test_cp_minus_r_minus_e(self):
1676    """Tests that cp -e -r ignores symlinks when recursing."""
1677    bucket_uri = self.CreateBucket()
1678    tmpdir = self.CreateTempDir()
1679    # Create a valid file, since cp expects to copy at least one source URL
1680    # successfully.
1681    self.CreateTempFile(tmpdir=tmpdir, contents='foo')
1682    subdir = os.path.join(tmpdir, 'subdir')
1683    os.mkdir(subdir)
1684    os.mkdir(os.path.join(tmpdir, 'missing'))
1685    # Create a blank directory that is a broken symlink to ensure that we
1686    # don't fail recursive enumeration with a bad symlink.
1687    os.symlink(os.path.join(tmpdir, 'missing'),
1688               os.path.join(subdir, 'missing'))
1689    os.rmdir(os.path.join(tmpdir, 'missing'))
1690    self.RunGsUtil(['cp', '-r', '-e', tmpdir, suri(bucket_uri)])
1691
1692  @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.')
1693  def test_cp_minus_e(self):
1694    fpath_dir = self.CreateTempDir()
1695    fpath1 = self.CreateTempFile(tmpdir=fpath_dir)
1696    fpath2 = os.path.join(fpath_dir, 'cp_minus_e')
1697    bucket_uri = self.CreateBucket()
1698    os.symlink(fpath1, fpath2)
1699    stderr = self.RunGsUtil(
1700        ['cp', '-e', '%s%s*' % (fpath_dir, os.path.sep),
1701         suri(bucket_uri, 'files')],
1702        return_stderr=True)
1703    self.assertIn('Copying file', stderr)
1704    self.assertIn('Skipping symbolic link', stderr)
1705
1706    # Ensure that top-level arguments are ignored if they are symlinks.
1707    stderr = self.RunGsUtil(
1708        ['cp', '-e', fpath1, fpath2, suri(bucket_uri, 'files')],
1709        return_stderr=True, expected_status=1)
1710    self.assertIn('Copying file', stderr)
1711    self.assertIn('Skipping symbolic link', stderr)
1712    self.assertIn('CommandException: No URLs matched: %s' % fpath2, stderr)
1713
1714  def test_cp_multithreaded_wildcard(self):
1715    """Tests that cp -m works with a wildcard."""
1716    num_test_files = 5
1717    tmp_dir = self.CreateTempDir(test_files=num_test_files)
1718    bucket_uri = self.CreateBucket()
1719    wildcard_uri = '%s%s*' % (tmp_dir, os.sep)
1720    self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)])
1721    self.AssertNObjectsInBucket(bucket_uri, num_test_files)
1722
1723  @SequentialAndParallelTransfer
1724  def test_cp_duplicate_source_args(self):
1725    """Tests that cp -m works when a source argument is provided twice."""
1726    object_contents = 'edge'
1727    object_uri = self.CreateObject(object_name='foo', contents=object_contents)
1728    tmp_dir = self.CreateTempDir()
1729    self.RunGsUtil(['-m', 'cp', suri(object_uri), suri(object_uri), tmp_dir])
1730    with open(os.path.join(tmp_dir, 'foo'), 'r') as in_fp:
1731      contents = in_fp.read()
1732      # Contents should be not duplicated.
1733      self.assertEqual(contents, object_contents)
1734
1735  @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.')
1736  @SequentialAndParallelTransfer
1737  def test_cp_download_encrypted_object(self):
1738    """Tests downloading an encrypted object."""
1739    if self.test_api == ApiSelector.XML:
1740      return unittest.skip(
1741          'gsutil does not support encryption with the XML API')
1742    object_contents = 'bar'
1743    object_uri = self.CreateObject(object_name='foo', contents=object_contents,
1744                                   encryption_key=TEST_ENCRYPTION_KEY1)
1745    fpath = self.CreateTempFile()
1746    boto_config_for_test = [('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
1747
1748    with SetBotoConfigForTest(boto_config_for_test):
1749      self.RunGsUtil(['cp', suri(object_uri), suri(fpath)])
1750    with open(fpath, 'r') as f:
1751      self.assertEqual(f.read(), object_contents)
1752
1753    # If multiple keys are supplied and one is correct, download should succeed.
1754    fpath2 = self.CreateTempFile()
1755    boto_config_for_test2 = [
1756        ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY3),
1757        ('GSUtil', 'decryption_key1', TEST_ENCRYPTION_KEY2),
1758        ('GSUtil', 'decryption_key2', TEST_ENCRYPTION_KEY1)]
1759
1760    with SetBotoConfigForTest(boto_config_for_test2):
1761      self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)])
1762    with open(fpath2, 'r') as f:
1763      self.assertEqual(f.read(), object_contents)
1764
1765  @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.')
1766  @SequentialAndParallelTransfer
1767  def test_cp_download_encrypted_object_without_key(self):
1768    """Tests downloading an encrypted object without the necessary key."""
1769    if self.test_api == ApiSelector.XML:
1770      return unittest.skip(
1771          'gsutil does not support encryption with the XML API')
1772    object_contents = 'bar'
1773    object_uri = self.CreateObject(object_name='foo', contents=object_contents,
1774                                   encryption_key=TEST_ENCRYPTION_KEY1)
1775    fpath = self.CreateTempFile()
1776
1777    stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)],
1778                            expected_status=1, return_stderr=True)
1779    self.assertIn('Missing decryption key with SHA256 hash %s' %
1780                  TEST_ENCRYPTION_KEY1_SHA256_B64, stderr)
1781
1782  @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.')
1783  @SequentialAndParallelTransfer
1784  def test_cp_upload_encrypted_object(self):
1785    """Tests uploading an encrypted object."""
1786    if self.test_api == ApiSelector.XML:
1787      return unittest.skip(
1788          'gsutil does not support encryption with the XML API')
1789    bucket_uri = self.CreateBucket()
1790    object_uri = suri(bucket_uri, 'foo')
1791    file_contents = 'bar'
1792    fpath = self.CreateTempFile(contents=file_contents, file_name='foo')
1793
1794    boto_config_for_test = [('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
1795
1796    # Uploading the object should succeed.
1797    with SetBotoConfigForTest(boto_config_for_test):
1798      self.RunGsUtil(['cp', suri(fpath), suri(bucket_uri)])
1799
1800    self.AssertObjectUsesEncryptionKey(object_uri, TEST_ENCRYPTION_KEY1)
1801
1802    with SetBotoConfigForTest(boto_config_for_test):
1803      # Reading the object back should succeed.
1804      fpath2 = self.CreateTempFile()
1805      self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), suri(fpath2)])
1806      with open(fpath2, 'r') as f:
1807        self.assertEqual(f.read(), file_contents)
1808
1809  @SkipForS3('No resumable upload or encryption support for S3.')
1810  def test_cp_resumable_upload_encrypted_object_break(self):
1811    """Tests that an encrypted upload resumes after a connection break."""
1812    if self.test_api == ApiSelector.XML:
1813      return unittest.skip(
1814          'gsutil does not support encryption with the XML API')
1815    bucket_uri = self.CreateBucket()
1816    object_uri_str = suri(bucket_uri, 'foo')
1817    fpath = self.CreateTempFile(contents='a' * self.halt_size)
1818    boto_config_for_test = [
1819        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
1820        ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
1821    test_callback_file = self.CreateTempFile(
1822        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
1823
1824    with SetBotoConfigForTest(boto_config_for_test):
1825      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
1826                               fpath, object_uri_str],
1827                              expected_status=1, return_stderr=True)
1828      self.assertIn('Artifically halting upload', stderr)
1829      stderr = self.RunGsUtil(['cp', fpath, object_uri_str],
1830                              return_stderr=True)
1831      self.assertIn('Resuming upload', stderr)
1832      stdout = self.RunGsUtil(['stat', object_uri_str], return_stdout=True)
1833      with open(fpath, 'rb') as fp:
1834        self.assertIn(CalculateB64EncodedMd5FromContents(fp), stdout)
1835
1836    self.AssertObjectUsesEncryptionKey(object_uri_str,
1837                                       TEST_ENCRYPTION_KEY1)
1838
1839  @SkipForS3('No resumable upload or encryption support for S3.')
1840  def test_cp_resumable_upload_encrypted_object_different_key(self):
1841    """Tests that an encrypted upload resume uses original encryption key."""
1842    if self.test_api == ApiSelector.XML:
1843      return unittest.skip(
1844          'gsutil does not support encryption with the XML API')
1845    bucket_uri = self.CreateBucket()
1846    object_uri_str = suri(bucket_uri, 'foo')
1847    file_contents = 'a' * self.halt_size
1848    fpath = self.CreateTempFile(contents=file_contents)
1849    boto_config_for_test = [
1850        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
1851        ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
1852    test_callback_file = self.CreateTempFile(
1853        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
1854
1855    with SetBotoConfigForTest(boto_config_for_test):
1856      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
1857                               fpath, object_uri_str],
1858                              expected_status=1, return_stderr=True)
1859      self.assertIn('Artifically halting upload', stderr)
1860
1861    # Resume the upload with multiple keys, including the original.
1862    boto_config_for_test2 = [
1863        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
1864        ('GSUtil', 'decryption_key1', TEST_ENCRYPTION_KEY2),
1865        ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
1866
1867    with SetBotoConfigForTest(boto_config_for_test2):
1868      stderr = self.RunGsUtil(['cp', fpath, object_uri_str],
1869                              return_stderr=True)
1870      self.assertIn('Resuming upload', stderr)
1871
1872    # Object should have the original key.
1873    self.AssertObjectUsesEncryptionKey(object_uri_str,
1874                                       TEST_ENCRYPTION_KEY1)
1875
1876  @SkipForS3('No resumable upload or encryption support for S3.')
1877  def test_cp_resumable_upload_encrypted_object_missing_key(self):
1878    """Tests that an encrypted upload does not resume without original key."""
1879    if self.test_api == ApiSelector.XML:
1880      return unittest.skip(
1881          'gsutil does not support encryption with the XML API')
1882    bucket_uri = self.CreateBucket()
1883    object_uri_str = suri(bucket_uri, 'foo')
1884    file_contents = 'a' * self.halt_size
1885    fpath = self.CreateTempFile(contents=file_contents)
1886    boto_config_for_test = [
1887        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
1888        ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
1889    test_callback_file = self.CreateTempFile(
1890        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
1891
1892    with SetBotoConfigForTest(boto_config_for_test):
1893      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
1894                               fpath, object_uri_str],
1895                              expected_status=1, return_stderr=True)
1896      self.assertIn('Artifically halting upload', stderr)
1897
1898    # Resume the upload without the original key.
1899    boto_config_for_test2 = [
1900        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
1901        ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)]
1902
1903    with SetBotoConfigForTest(boto_config_for_test2):
1904      stderr = self.RunGsUtil(['cp', fpath, object_uri_str],
1905                              return_stderr=True)
1906      self.assertNotIn('Resuming upload', stderr)
1907      self.assertIn('does not match current encryption key', stderr)
1908      self.assertIn('Restarting upload from scratch', stderr)
1909
1910      # Object should have the new key.
1911      self.AssertObjectUsesEncryptionKey(object_uri_str,
1912                                         TEST_ENCRYPTION_KEY2)
1913
1914  def _ensure_object_unencrypted(self, object_uri_str):
1915    """Strongly consistent check that the object is unencrypted."""
1916    stdout = self.RunGsUtil(['stat', object_uri_str], return_stdout=True)
1917    self.assertNotIn('Encryption Key', stdout)
1918
1919  @SkipForS3('No resumable upload support for S3.')
1920  def test_cp_resumable_upload_break(self):
1921    """Tests that an upload can be resumed after a connection break."""
1922    bucket_uri = self.CreateBucket()
1923    fpath = self.CreateTempFile(contents='a' * self.halt_size)
1924    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
1925    test_callback_file = self.CreateTempFile(
1926        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
1927
1928    with SetBotoConfigForTest([boto_config_for_test]):
1929      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
1930                               fpath, suri(bucket_uri)],
1931                              expected_status=1, return_stderr=True)
1932      self.assertIn('Artifically halting upload', stderr)
1933      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
1934                              return_stderr=True)
1935      self.assertIn('Resuming upload', stderr)
1936
1937  @SkipForS3('No resumable upload support for S3.')
1938  def test_cp_resumable_upload_retry(self):
1939    """Tests that a resumable upload completes with one retry."""
1940    bucket_uri = self.CreateBucket()
1941    fpath = self.CreateTempFile(contents='a' * self.halt_size)
1942    # TODO: Raising an httplib or socket error blocks bucket teardown
1943    # in JSON for 60-120s on a multiprocessing lock acquire. Figure out why;
1944    # until then, raise an apitools retryable exception.
1945    if self.test_api == ApiSelector.XML:
1946      test_callback_file = self.CreateTempFile(
1947          contents=pickle.dumps(_ResumableUploadRetryHandler(
1948              5, httplib.BadStatusLine, ('unused',))))
1949    else:
1950      test_callback_file = self.CreateTempFile(
1951          contents=pickle.dumps(_ResumableUploadRetryHandler(
1952              5, apitools_exceptions.BadStatusCodeError,
1953              ('unused', 'unused', 'unused'))))
1954    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
1955    with SetBotoConfigForTest([boto_config_for_test]):
1956      stderr = self.RunGsUtil(['-D', 'cp', '--testcallbackfile',
1957                               test_callback_file, fpath, suri(bucket_uri)],
1958                              return_stderr=1)
1959      if self.test_api == ApiSelector.XML:
1960        self.assertIn('Got retryable failure', stderr)
1961      else:
1962        self.assertIn('Retrying', stderr)
1963
1964  @SkipForS3('No resumable upload support for S3.')
1965  def test_cp_resumable_streaming_upload_retry(self):
1966    """Tests that a streaming resumable upload completes with one retry."""
1967    if self.test_api == ApiSelector.XML:
1968      return unittest.skip('XML does not support resumable streaming uploads.')
1969    bucket_uri = self.CreateBucket()
1970
1971    test_callback_file = self.CreateTempFile(
1972        contents=pickle.dumps(_ResumableUploadRetryHandler(
1973            5, apitools_exceptions.BadStatusCodeError,
1974            ('unused', 'unused', 'unused'))))
1975    # Need to reduce the JSON chunk size since streaming uploads buffer a
1976    # full chunk.
1977    boto_configs_for_test = [('GSUtil', 'json_resumable_chunk_size',
1978                              str(256 * ONE_KIB)),
1979                             ('Boto', 'num_retries', '2')]
1980    with SetBotoConfigForTest(boto_configs_for_test):
1981      stderr = self.RunGsUtil(
1982          ['-D', 'cp', '--testcallbackfile', test_callback_file, '-',
1983           suri(bucket_uri, 'foo')],
1984          stdin='a' * 512 * ONE_KIB, return_stderr=1)
1985      self.assertIn('Retrying', stderr)
1986
1987  @SkipForS3('preserve_acl flag not supported for S3.')
1988  def test_cp_preserve_no_owner(self):
1989    bucket_uri = self.CreateBucket()
1990    object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
1991    # Anonymous user can read the object and write to the bucket, but does
1992    # not own the object.
1993    self.RunGsUtil(['acl', 'ch', '-u', 'AllUsers:R', suri(object_uri)])
1994    self.RunGsUtil(['acl', 'ch', '-u', 'AllUsers:W', suri(bucket_uri)])
1995    with self.SetAnonymousBotoCreds():
1996      stderr = self.RunGsUtil(['cp', '-p', suri(object_uri),
1997                               suri(bucket_uri, 'foo')],
1998                              return_stderr=True, expected_status=1)
1999      self.assertIn('OWNER permission is required for preserving ACLs', stderr)
2000
2001  @SkipForS3('No resumable upload support for S3.')
2002  def test_cp_progress_callbacks(self):
2003    bucket_uri = self.CreateBucket()
2004    final_size_string = BytesToFixedWidthString(1024**2)
2005    final_progress_callback = final_size_string+'/'+final_size_string
2006    fpath = self.CreateTempFile(contents='a'*ONE_MIB, file_name='foo')
2007    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2008    with SetBotoConfigForTest([boto_config_for_test]):
2009      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2010                              return_stderr=True)
2011      self.assertEquals(1, stderr.count(final_progress_callback))
2012    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(2 * ONE_MIB))
2013    with SetBotoConfigForTest([boto_config_for_test]):
2014      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2015                              return_stderr=True)
2016      self.assertEquals(1, stderr.count(final_progress_callback))
2017    stderr = self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), fpath],
2018                            return_stderr=True)
2019    self.assertEquals(1, stderr.count(final_progress_callback))
2020
2021  @SkipForS3('No resumable upload support for S3.')
2022  def test_cp_resumable_upload(self):
2023    """Tests that a basic resumable upload completes successfully."""
2024    bucket_uri = self.CreateBucket()
2025    fpath = self.CreateTempFile(contents='a' * self.halt_size)
2026    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2027    with SetBotoConfigForTest([boto_config_for_test]):
2028      self.RunGsUtil(['cp', fpath, suri(bucket_uri)])
2029
2030  @SkipForS3('No resumable upload support for S3.')
2031  def test_resumable_upload_break_leaves_tracker(self):
2032    """Tests that a tracker file is created with a resumable upload."""
2033    bucket_uri = self.CreateBucket()
2034    fpath = self.CreateTempFile(file_name='foo',
2035                                contents='a' * self.halt_size)
2036    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2037    with SetBotoConfigForTest([boto_config_for_test]):
2038      tracker_filename = GetTrackerFilePath(
2039          StorageUrlFromString(suri(bucket_uri, 'foo')),
2040          TrackerFileType.UPLOAD, self.test_api)
2041      test_callback_file = self.CreateTempFile(
2042          contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
2043      try:
2044        stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2045                                 fpath, suri(bucket_uri, 'foo')],
2046                                expected_status=1, return_stderr=True)
2047        self.assertIn('Artifically halting upload', stderr)
2048        self.assertTrue(os.path.exists(tracker_filename),
2049                        'Tracker file %s not present.' % tracker_filename)
2050      finally:
2051        DeleteTrackerFile(tracker_filename)
2052
2053  @SkipForS3('No resumable upload support for S3.')
2054  def test_cp_resumable_upload_break_file_size_change(self):
2055    """Tests a resumable upload where the uploaded file changes size.
2056
2057    This should fail when we read the tracker data.
2058    """
2059    bucket_uri = self.CreateBucket()
2060    tmp_dir = self.CreateTempDir()
2061    fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
2062                                contents='a' * self.halt_size)
2063    test_callback_file = self.CreateTempFile(
2064        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
2065
2066    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2067    with SetBotoConfigForTest([boto_config_for_test]):
2068      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2069                               fpath, suri(bucket_uri)],
2070                              expected_status=1, return_stderr=True)
2071      self.assertIn('Artifically halting upload', stderr)
2072      fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
2073                                  contents='a' * self.halt_size * 2)
2074      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2075                              expected_status=1, return_stderr=True)
2076      self.assertIn('ResumableUploadAbortException', stderr)
2077
2078  @SkipForS3('No resumable upload support for S3.')
2079  def test_cp_resumable_upload_break_file_content_change(self):
2080    """Tests a resumable upload where the uploaded file changes content."""
2081    if self.test_api == ApiSelector.XML:
2082      return unittest.skip(
2083          'XML doesn\'t make separate HTTP calls at fixed-size boundaries for '
2084          'resumable uploads, so we can\'t guarantee that the server saves a '
2085          'specific part of the upload.')
2086    bucket_uri = self.CreateBucket()
2087    tmp_dir = self.CreateTempDir()
2088    fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
2089                                contents='a' * ONE_KIB * 512)
2090    test_callback_file = self.CreateTempFile(
2091        contents=pickle.dumps(HaltingCopyCallbackHandler(True,
2092                                                         int(ONE_KIB) * 384)))
2093    resumable_threshold_for_test = (
2094        'GSUtil', 'resumable_threshold', str(ONE_KIB))
2095    resumable_chunk_size_for_test = (
2096        'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))
2097    with SetBotoConfigForTest([resumable_threshold_for_test,
2098                               resumable_chunk_size_for_test]):
2099      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2100                               fpath, suri(bucket_uri)],
2101                              expected_status=1, return_stderr=True)
2102      self.assertIn('Artifically halting upload', stderr)
2103      fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
2104                                  contents='b' * ONE_KIB * 512)
2105      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2106                              expected_status=1, return_stderr=True)
2107      self.assertIn('doesn\'t match cloud-supplied digest', stderr)
2108
2109  @SkipForS3('No resumable upload support for S3.')
2110  def test_cp_resumable_upload_break_file_smaller_size(self):
2111    """Tests a resumable upload where the uploaded file changes content.
2112
2113    This should fail hash validation.
2114    """
2115    bucket_uri = self.CreateBucket()
2116    tmp_dir = self.CreateTempDir()
2117    fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
2118                                contents='a' * ONE_KIB * 512)
2119    test_callback_file = self.CreateTempFile(
2120        contents=pickle.dumps(HaltingCopyCallbackHandler(True,
2121                                                         int(ONE_KIB) * 384)))
2122    resumable_threshold_for_test = (
2123        'GSUtil', 'resumable_threshold', str(ONE_KIB))
2124    resumable_chunk_size_for_test = (
2125        'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))
2126    with SetBotoConfigForTest([resumable_threshold_for_test,
2127                               resumable_chunk_size_for_test]):
2128      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2129                               fpath, suri(bucket_uri)],
2130                              expected_status=1, return_stderr=True)
2131      self.assertIn('Artifically halting upload', stderr)
2132      fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
2133                                  contents='a' * ONE_KIB)
2134      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2135                              expected_status=1, return_stderr=True)
2136      self.assertIn('ResumableUploadAbortException', stderr)
2137
2138  @SkipForS3('No resumable upload support for S3.')
2139  def test_cp_composite_encrypted_upload_resume(self):
2140    """Tests that an encrypted composite upload resumes successfully."""
2141    if self.test_api == ApiSelector.XML:
2142      return unittest.skip(
2143          'gsutil does not support encryption with the XML API')
2144    bucket_uri = self.CreateBucket()
2145    dst_url = StorageUrlFromString(suri(bucket_uri, 'foo'))
2146
2147    file_contents = 'foobar'
2148    source_file = self.CreateTempFile(
2149        contents=file_contents, file_name=file_contents)
2150    src_url = StorageUrlFromString(source_file)
2151
2152    # Simulate an upload that had occurred by writing a tracker file
2153    # that points to a previously uploaded component.
2154    tracker_file_name = GetTrackerFilePath(
2155        dst_url, TrackerFileType.PARALLEL_UPLOAD, self.test_api, src_url)
2156    tracker_prefix = '123'
2157
2158    # Create component 0 to be used in the resume; it must match the name
2159    # that will be generated in copy_helper, so we use the same scheme.
2160    encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + source_file).encode(UTF8)
2161    content_md5 = md5()
2162    content_md5.update(encoded_name)
2163    digest = content_md5.hexdigest()
2164    component_object_name = (tracker_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
2165                             digest + '_0')
2166
2167    component_size = 3
2168    object_uri = self.CreateObject(
2169        bucket_uri=bucket_uri, object_name=component_object_name,
2170        contents=file_contents[:component_size],
2171        encryption_key=TEST_ENCRYPTION_KEY1)
2172    existing_component = ObjectFromTracker(component_object_name,
2173                                           str(object_uri.generation))
2174    existing_components = [existing_component]
2175    enc_key_sha256 = TEST_ENCRYPTION_KEY1_SHA256_B64
2176
2177    WriteParallelUploadTrackerFile(
2178        tracker_file_name, tracker_prefix, existing_components,
2179        encryption_key_sha256=enc_key_sha256)
2180
2181    try:
2182      # Now "resume" the upload using the original encryption key.
2183      with SetBotoConfigForTest([
2184          ('GSUtil', 'parallel_composite_upload_threshold', '1'),
2185          ('GSUtil', 'parallel_composite_upload_component_size',
2186           str(component_size)),
2187          ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]):
2188        stderr = self.RunGsUtil(['cp', source_file, suri(bucket_uri, 'foo')],
2189                                return_stderr=True)
2190        self.assertIn('Found 1 existing temporary components to reuse.', stderr)
2191        self.assertFalse(
2192            os.path.exists(tracker_file_name),
2193            'Tracker file %s should have been deleted.' % tracker_file_name)
2194        read_contents = self.RunGsUtil(['cat', suri(bucket_uri, 'foo')],
2195                                       return_stdout=True)
2196        self.assertEqual(read_contents, file_contents)
2197    finally:
2198      # Clean up if something went wrong.
2199      DeleteTrackerFile(tracker_file_name)
2200
2201  @SkipForS3('No resumable upload support for S3.')
2202  def test_cp_composite_encrypted_upload_restart(self):
2203    """Tests that encrypted composite upload restarts given a different key."""
2204    if self.test_api == ApiSelector.XML:
2205      return unittest.skip(
2206          'gsutil does not support encryption with the XML API')
2207    bucket_uri = self.CreateBucket()
2208    dst_url = StorageUrlFromString(suri(bucket_uri, 'foo'))
2209
2210    file_contents = 'foobar'
2211    source_file = self.CreateTempFile(contents=file_contents, file_name='foo')
2212    src_url = StorageUrlFromString(source_file)
2213
2214    # Simulate an upload that had occurred by writing a tracker file.
2215    tracker_file_name = GetTrackerFilePath(
2216        dst_url, TrackerFileType.PARALLEL_UPLOAD, self.test_api, src_url)
2217    tracker_prefix = '123'
2218    existing_component_name = 'foo_1'
2219    object_uri = self.CreateObject(
2220        bucket_uri=bucket_uri, object_name='foo_1',
2221        contents='foo', encryption_key=TEST_ENCRYPTION_KEY1)
2222    existing_component = ObjectFromTracker(existing_component_name,
2223                                           str(object_uri.generation))
2224    existing_components = [existing_component]
2225    enc_key_sha256 = TEST_ENCRYPTION_KEY1_SHA256_B64
2226    WriteParallelUploadTrackerFile(
2227        tracker_file_name, tracker_prefix, existing_components, enc_key_sha256)
2228
2229    try:
2230      # Now "resume" the upload using the original encryption key.
2231      with SetBotoConfigForTest([
2232          ('GSUtil', 'parallel_composite_upload_threshold', '1'),
2233          ('GSUtil', 'parallel_composite_upload_component_size', '3'),
2234          ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)]):
2235        stderr = self.RunGsUtil(['cp', source_file, suri(bucket_uri, 'foo')],
2236                                return_stderr=True)
2237        self.assertIn('does not match current encryption key. '
2238                      'Deleting old components and restarting upload', stderr)
2239        self.assertNotIn('existing temporary components to reuse.', stderr)
2240        self.assertFalse(
2241            os.path.exists(tracker_file_name),
2242            'Tracker file %s should have been deleted.' % tracker_file_name)
2243        read_contents = self.RunGsUtil(['cat', suri(bucket_uri, 'foo')],
2244                                       return_stdout=True)
2245        self.assertEqual(read_contents, file_contents)
2246    finally:
2247      # Clean up if something went wrong.
2248      DeleteTrackerFile(tracker_file_name)
2249
2250  # This temporarily changes the tracker directory to unwritable which
2251  # interferes with any parallel running tests that use the tracker directory.
2252  @NotParallelizable
2253  @SkipForS3('No resumable upload support for S3.')
2254  @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
2255  @SequentialAndParallelTransfer
2256  def test_cp_unwritable_tracker_file(self):
2257    """Tests a resumable upload with an unwritable tracker file."""
2258    bucket_uri = self.CreateBucket()
2259    tracker_filename = GetTrackerFilePath(
2260        StorageUrlFromString(suri(bucket_uri, 'foo')),
2261        TrackerFileType.UPLOAD, self.test_api)
2262    tracker_dir = os.path.dirname(tracker_filename)
2263    fpath = self.CreateTempFile(file_name='foo', contents='a' * ONE_KIB)
2264    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2265    save_mod = os.stat(tracker_dir).st_mode
2266
2267    try:
2268      os.chmod(tracker_dir, 0)
2269      with SetBotoConfigForTest([boto_config_for_test]):
2270        stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2271                                expected_status=1, return_stderr=True)
2272        self.assertIn('Couldn\'t write tracker file', stderr)
2273    finally:
2274      os.chmod(tracker_dir, save_mod)
2275      if os.path.exists(tracker_filename):
2276        os.unlink(tracker_filename)
2277
2278  # This temporarily changes the tracker directory to unwritable which
2279  # interferes with any parallel running tests that use the tracker directory.
2280  @NotParallelizable
2281  @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
2282  @SequentialAndParallelTransfer
2283  def test_cp_unwritable_tracker_file_download(self):
2284    """Tests downloads with an unwritable tracker file."""
2285    object_uri = self.CreateObject(contents='foo' * ONE_KIB)
2286    tracker_filename = GetTrackerFilePath(
2287        StorageUrlFromString(suri(object_uri)),
2288        TrackerFileType.DOWNLOAD, self.test_api)
2289    tracker_dir = os.path.dirname(tracker_filename)
2290    fpath = self.CreateTempFile()
2291    save_mod = os.stat(tracker_dir).st_mode
2292
2293    try:
2294      os.chmod(tracker_dir, 0)
2295      boto_config_for_test = ('GSUtil', 'resumable_threshold', str(EIGHT_MIB))
2296      with SetBotoConfigForTest([boto_config_for_test]):
2297        # Should succeed because we are below the threshold.
2298        self.RunGsUtil(['cp', suri(object_uri), fpath])
2299      boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2300      with SetBotoConfigForTest([boto_config_for_test]):
2301        stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2302                                expected_status=1, return_stderr=True)
2303        self.assertIn('Couldn\'t write tracker file', stderr)
2304    finally:
2305      os.chmod(tracker_dir, save_mod)
2306      if os.path.exists(tracker_filename):
2307        os.unlink(tracker_filename)
2308
2309  def _test_cp_resumable_download_break_helper(self, boto_config,
2310                                               encryption_key=None):
2311    """Helper function for different modes of resumable download break.
2312
2313    Args:
2314      boto_config: List of boto configuration tuples for use with
2315          SetBotoConfigForTest.
2316      encryption_key: Base64 encryption key for object encryption (if any).
2317    """
2318    bucket_uri = self.CreateBucket()
2319    file_contents = 'a' * self.halt_size
2320    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2321                                   contents=file_contents,
2322                                   encryption_key=encryption_key)
2323    fpath = self.CreateTempFile()
2324    test_callback_file = self.CreateTempFile(
2325        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2326
2327    with SetBotoConfigForTest(boto_config):
2328      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2329                               suri(object_uri), fpath],
2330                              expected_status=1, return_stderr=True)
2331      self.assertIn('Artifically halting download.', stderr)
2332      tracker_filename = GetTrackerFilePath(
2333          StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api)
2334      self.assertTrue(os.path.isfile(tracker_filename))
2335      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2336                              return_stderr=True)
2337      self.assertIn('Resuming download', stderr)
2338    with open(fpath, 'r') as f:
2339      self.assertEqual(f.read(), file_contents, 'File contents differ')
2340
2341  def test_cp_resumable_download_break(self):
2342    """Tests that a download can be resumed after a connection break."""
2343    self._test_cp_resumable_download_break_helper(
2344        [('GSUtil', 'resumable_threshold', str(ONE_KIB))])
2345
2346  @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.')
2347  def test_cp_resumable_encrypted_download_break(self):
2348    """Tests that an encrypted download resumes after a connection break."""
2349    if self.test_api == ApiSelector.XML:
2350      return unittest.skip(
2351          'gsutil does not support encryption with the XML API')
2352    self._test_cp_resumable_download_break_helper(
2353        [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2354         ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)],
2355        encryption_key=TEST_ENCRYPTION_KEY1)
2356
2357  @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.')
2358  def test_cp_resumable_encrypted_download_key_rotation(self):
2359    """Tests that a download restarts with a rotated encryption key."""
2360    if self.test_api == ApiSelector.XML:
2361      return unittest.skip(
2362          'gsutil does not support encryption with the XML API')
2363    bucket_uri = self.CreateBucket()
2364    file_contents = 'a' * self.halt_size
2365    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2366                                   contents=file_contents,
2367                                   encryption_key=TEST_ENCRYPTION_KEY1)
2368    fpath = self.CreateTempFile()
2369    test_callback_file = self.CreateTempFile(
2370        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2371
2372    boto_config_for_test = [
2373        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2374        ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
2375
2376    with SetBotoConfigForTest(boto_config_for_test):
2377      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2378                               suri(object_uri), fpath],
2379                              expected_status=1, return_stderr=True)
2380      self.assertIn('Artifically halting download.', stderr)
2381      tracker_filename = GetTrackerFilePath(
2382          StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api)
2383      self.assertTrue(os.path.isfile(tracker_filename))
2384
2385    # After simulated connection break, rotate the key on the object.
2386    boto_config_for_test2 = [
2387        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2388        ('GSUtil', 'decryption_key1', TEST_ENCRYPTION_KEY1),
2389        ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)]
2390    with SetBotoConfigForTest(boto_config_for_test2):
2391      self.RunGsUtil(['rewrite', '-k', suri(object_uri)])
2392
2393    # Now resume the download using only the new encryption key. Since its
2394    # generation changed, we must restart it.
2395    boto_config_for_test3 = [
2396        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2397        ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)]
2398    with SetBotoConfigForTest(boto_config_for_test3):
2399      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2400                              return_stderr=True)
2401      self.assertIn('Restarting download', stderr)
2402    with open(fpath, 'r') as f:
2403      self.assertEqual(f.read(), file_contents, 'File contents differ')
2404
2405  @SequentialAndParallelTransfer
2406  def test_cp_resumable_download_etag_differs(self):
2407    """Tests that download restarts the file when the source object changes.
2408
2409    This causes the etag not to match.
2410    """
2411    bucket_uri = self.CreateBucket()
2412    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2413                                   contents='abc' * self.halt_size)
2414    fpath = self.CreateTempFile()
2415    test_callback_file = self.CreateTempFile(
2416        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2417    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2418    with SetBotoConfigForTest([boto_config_for_test]):
2419      # This will create a tracker file with an ETag.
2420      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2421                               suri(object_uri), fpath],
2422                              expected_status=1, return_stderr=True)
2423      self.assertIn('Artifically halting download.', stderr)
2424      # Create a new object with different contents - it should have a
2425      # different ETag since the content has changed.
2426      object_uri = self.CreateObject(
2427          bucket_uri=bucket_uri, object_name='foo',
2428          contents='b' * self.halt_size,
2429          gs_idempotent_generation=object_uri.generation)
2430      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2431                              return_stderr=True)
2432      self.assertNotIn('Resuming download', stderr)
2433
2434  # TODO: Enable this test for sequential downloads when their tracker files are
2435  # modified to contain the source object generation.
2436  @unittest.skipUnless(UsingCrcmodExtension(crcmod),
2437                       'Sliced download requires fast crcmod.')
2438  @SkipForS3('No sliced download support for S3.')
2439  def test_cp_resumable_download_generation_differs(self):
2440    """Tests that a resumable download restarts if the generation differs."""
2441    bucket_uri = self.CreateBucket()
2442    file_contents = 'abcd' * self.halt_size
2443    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2444                                   contents=file_contents)
2445    fpath = self.CreateTempFile()
2446
2447    test_callback_file = self.CreateTempFile(
2448        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2449
2450    boto_config_for_test = [
2451        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
2452        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
2453        ('GSUtil', 'sliced_object_download_max_components', '3')]
2454
2455    with SetBotoConfigForTest(boto_config_for_test):
2456      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2457                               suri(object_uri), suri(fpath)],
2458                              return_stderr=True, expected_status=1)
2459      self.assertIn('Artifically halting download.', stderr)
2460
2461      # Overwrite the object with an identical object, increasing
2462      # the generation but leaving other metadata the same.
2463      identical_file = self.CreateTempFile(contents=file_contents)
2464      self.RunGsUtil(['cp', suri(identical_file), suri(object_uri)])
2465
2466      stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)],
2467                              return_stderr=True)
2468      self.assertIn('Restarting download from scratch', stderr)
2469      with open(fpath, 'r') as f:
2470        self.assertEqual(f.read(), file_contents, 'File contents differ')
2471
2472  def test_cp_resumable_download_file_larger(self):
2473    """Tests download deletes the tracker file when existing file is larger."""
2474    bucket_uri = self.CreateBucket()
2475    fpath = self.CreateTempFile()
2476    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2477                                   contents='a' * self.halt_size)
2478    test_callback_file = self.CreateTempFile(
2479        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2480    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2481    with SetBotoConfigForTest([boto_config_for_test]):
2482      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2483                               suri(object_uri), fpath],
2484                              expected_status=1, return_stderr=True)
2485      self.assertIn('Artifically halting download.', stderr)
2486      with open(fpath + '_.gstmp', 'w') as larger_file:
2487        for _ in range(self.halt_size * 2):
2488          larger_file.write('a')
2489      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2490                              expected_status=1, return_stderr=True)
2491      self.assertNotIn('Resuming download', stderr)
2492      self.assertIn('Deleting tracker file', stderr)
2493
2494  def test_cp_resumable_download_content_differs(self):
2495    """Tests that we do not re-download when tracker file matches existing file.
2496
2497    We only compare size, not contents, so re-download should not occur even
2498    though the contents are technically different. However, hash validation on
2499    the file should still occur and we will delete the file then because
2500    the hashes differ.
2501    """
2502    bucket_uri = self.CreateBucket()
2503    tmp_dir = self.CreateTempDir()
2504    fpath = self.CreateTempFile(tmpdir=tmp_dir)
2505    temp_download_file = fpath + '_.gstmp'
2506    with open(temp_download_file, 'w') as fp:
2507      fp.write('abcd' * ONE_KIB)
2508
2509    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2510                                   contents='efgh' * ONE_KIB)
2511    stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
2512    etag_match = re.search(r'\s*ETag:\s*(.*)', stdout)
2513    self.assertIsNotNone(etag_match, 'Could not get object ETag')
2514    self.assertEqual(len(etag_match.groups()), 1,
2515                     'Did not match expected single ETag')
2516    etag = etag_match.group(1)
2517
2518    tracker_filename = GetTrackerFilePath(
2519        StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api)
2520    try:
2521      with open(tracker_filename, 'w') as tracker_fp:
2522        tracker_fp.write(etag)
2523      boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2524      with SetBotoConfigForTest([boto_config_for_test]):
2525        stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2526                                return_stderr=True, expected_status=1)
2527        self.assertIn('Download already complete', stderr)
2528        self.assertIn('doesn\'t match cloud-supplied digest', stderr)
2529        # File and tracker file should be deleted.
2530        self.assertFalse(os.path.isfile(temp_download_file))
2531        self.assertFalse(os.path.isfile(tracker_filename))
2532        # Permanent file should not have been created.
2533        self.assertFalse(os.path.isfile(fpath))
2534    finally:
2535      if os.path.exists(tracker_filename):
2536        os.unlink(tracker_filename)
2537
2538  def test_cp_resumable_download_content_matches(self):
2539    """Tests download no-ops when tracker file matches existing file."""
2540    bucket_uri = self.CreateBucket()
2541    tmp_dir = self.CreateTempDir()
2542    fpath = self.CreateTempFile(tmpdir=tmp_dir)
2543    matching_contents = 'abcd' * ONE_KIB
2544    temp_download_file = fpath + '_.gstmp'
2545    with open(temp_download_file, 'w') as fp:
2546      fp.write(matching_contents)
2547
2548    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2549                                   contents=matching_contents)
2550    stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
2551    etag_match = re.search(r'\s*ETag:\s*(.*)', stdout)
2552    self.assertIsNotNone(etag_match, 'Could not get object ETag')
2553    self.assertEqual(len(etag_match.groups()), 1,
2554                     'Did not match expected single ETag')
2555    etag = etag_match.group(1)
2556    tracker_filename = GetTrackerFilePath(
2557        StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api)
2558    with open(tracker_filename, 'w') as tracker_fp:
2559      tracker_fp.write(etag)
2560    try:
2561      boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2562      with SetBotoConfigForTest([boto_config_for_test]):
2563        stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2564                                return_stderr=True)
2565        self.assertIn('Download already complete', stderr)
2566        # Tracker file should be removed after successful hash validation.
2567        self.assertFalse(os.path.isfile(tracker_filename))
2568    finally:
2569      if os.path.exists(tracker_filename):
2570        os.unlink(tracker_filename)
2571
2572  def test_cp_resumable_download_tracker_file_not_matches(self):
2573    """Tests that download overwrites when tracker file etag does not match."""
2574    bucket_uri = self.CreateBucket()
2575    tmp_dir = self.CreateTempDir()
2576    fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB)
2577    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2578                                   contents='efgh' * ONE_KIB)
2579    stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
2580    etag_match = re.search(r'\s*ETag:\s*(.*)', stdout)
2581    self.assertIsNotNone(etag_match, 'Could not get object ETag')
2582    self.assertEqual(len(etag_match.groups()), 1,
2583                     'Did not match regex for exactly one object ETag')
2584    etag = etag_match.group(1)
2585    etag += 'nonmatching'
2586    tracker_filename = GetTrackerFilePath(
2587        StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api)
2588    with open(tracker_filename, 'w') as tracker_fp:
2589      tracker_fp.write(etag)
2590    try:
2591      boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2592      with SetBotoConfigForTest([boto_config_for_test]):
2593        stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2594                                return_stderr=True)
2595        self.assertNotIn('Resuming download', stderr)
2596        # Ensure the file was overwritten.
2597        with open(fpath, 'r') as in_fp:
2598          contents = in_fp.read()
2599          self.assertEqual(contents, 'efgh' * ONE_KIB,
2600                           'File not overwritten when it should have been '
2601                           'due to a non-matching tracker file.')
2602        self.assertFalse(os.path.isfile(tracker_filename))
2603    finally:
2604      if os.path.exists(tracker_filename):
2605        os.unlink(tracker_filename)
2606
2607  def test_cp_double_gzip(self):
2608    """Tests that upload and download of a doubly-gzipped file succeeds."""
2609    bucket_uri = self.CreateBucket()
2610    fpath = self.CreateTempFile(file_name='looks-zipped.gz', contents='foo')
2611    self.RunGsUtil(['-h', 'content-type:application/gzip', 'cp', '-Z',
2612                    suri(fpath), suri(bucket_uri, 'foo')])
2613    self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), fpath])
2614
2615  @SequentialAndParallelTransfer
2616  def test_cp_resumable_download_gzip(self):
2617    """Tests that download can be resumed successfully with a gzipped file."""
2618    # Generate some reasonably incompressible data.  This compresses to a bit
2619    # around 128K in practice, but we assert specifically below that it is
2620    # larger than self.halt_size to guarantee that we can halt the download
2621    # partway through.
2622    object_uri = self.CreateObject()
2623    random.seed(0)
2624    contents = str([random.choice(string.ascii_letters)
2625                    for _ in xrange(self.halt_size)])
2626    random.seed()  # Reset the seed for any other tests.
2627    fpath1 = self.CreateTempFile(file_name='unzipped.txt', contents=contents)
2628    self.RunGsUtil(['cp', '-z', 'txt', suri(fpath1), suri(object_uri)])
2629
2630    # Use @Retry as hedge against bucket listing eventual consistency.
2631    @Retry(AssertionError, tries=3, timeout_secs=1)
2632    def _GetObjectSize():
2633      stdout = self.RunGsUtil(['du', suri(object_uri)], return_stdout=True)
2634      size_match = re.search(r'(\d+)\s+.*', stdout)
2635      self.assertIsNotNone(size_match, 'Could not get object size')
2636      self.assertEqual(len(size_match.groups()), 1,
2637                       'Did not match regex for exactly one object size.')
2638      return long(size_match.group(1))
2639
2640    object_size = _GetObjectSize()
2641    self.assertGreaterEqual(object_size, self.halt_size,
2642                            'Compresed object size was not large enough to '
2643                            'allow for a halted download, so the test results '
2644                            'would be invalid. Please increase the compressed '
2645                            'object size in the test.')
2646    fpath2 = self.CreateTempFile()
2647    test_callback_file = self.CreateTempFile(
2648        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2649
2650    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2651    with SetBotoConfigForTest([boto_config_for_test]):
2652      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2653                               suri(object_uri), suri(fpath2)],
2654                              return_stderr=True, expected_status=1)
2655      self.assertIn('Artifically halting download.', stderr)
2656      self.assertIn('Downloading to temp gzip filename', stderr)
2657
2658      # Tracker files will have different names depending on if we are
2659      # downloading sequentially or in parallel.
2660      sliced_download_threshold = HumanReadableToBytes(
2661          boto.config.get('GSUtil', 'sliced_object_download_threshold',
2662                          DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))
2663      sliced_download = (len(contents) > sliced_download_threshold
2664                         and sliced_download_threshold > 0
2665                         and UsingCrcmodExtension(crcmod))
2666      if sliced_download:
2667        trackerfile_type = TrackerFileType.SLICED_DOWNLOAD
2668      else:
2669        trackerfile_type = TrackerFileType.DOWNLOAD
2670      tracker_filename = GetTrackerFilePath(
2671          StorageUrlFromString(fpath2), trackerfile_type, self.test_api)
2672
2673      # We should have a temporary gzipped file, a tracker file, and no
2674      # final file yet.
2675      self.assertTrue(os.path.isfile(tracker_filename))
2676      self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2))
2677      stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)],
2678                              return_stderr=True)
2679      self.assertIn('Resuming download', stderr)
2680      with open(fpath2, 'r') as f:
2681        self.assertEqual(f.read(), contents, 'File contents did not match.')
2682      self.assertFalse(os.path.isfile(tracker_filename))
2683      self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2))
2684
2685  @SequentialAndParallelTransfer
2686  def test_cp_resumable_download_check_hashes_never(self):
2687    """Tests that resumble downloads work with check_hashes = never."""
2688    bucket_uri = self.CreateBucket()
2689    contents = 'abcd' * self.halt_size
2690    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2691                                   contents=contents)
2692    fpath = self.CreateTempFile()
2693    test_callback_file = self.CreateTempFile(
2694        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2695
2696    boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2697                            ('GSUtil', 'check_hashes', 'never')]
2698    with SetBotoConfigForTest(boto_config_for_test):
2699      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2700                               suri(object_uri), fpath],
2701                              expected_status=1, return_stderr=True)
2702      self.assertIn('Artifically halting download.', stderr)
2703      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2704                              return_stderr=True)
2705      self.assertIn('Resuming download', stderr)
2706      self.assertIn('Found no hashes to validate object downloaded', stderr)
2707      with open(fpath, 'r') as f:
2708        self.assertEqual(f.read(), contents, 'File contents did not match.')
2709
2710  @SkipForS3('No resumable upload support for S3.')
2711  def test_cp_resumable_upload_bucket_deleted(self):
2712    """Tests that a not found exception is raised if bucket no longer exists."""
2713    bucket_uri = self.CreateBucket()
2714    fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB)
2715    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2716    test_callback_file = self.CreateTempFile(
2717        contents=pickle.dumps(
2718            _DeleteBucketThenStartOverCopyCallbackHandler(5, bucket_uri)))
2719
2720    with SetBotoConfigForTest([boto_config_for_test]):
2721      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2722                               fpath, suri(bucket_uri)], return_stderr=True,
2723                              expected_status=1)
2724    self.assertIn('Deleting bucket', stderr)
2725    self.assertIn('bucket does not exist', stderr)
2726
2727  @SkipForS3('No sliced download support for S3.')
2728  def test_cp_sliced_download(self):
2729    """Tests that sliced object download works in the general case."""
2730    bucket_uri = self.CreateBucket()
2731    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2732                                   contents='abc' * ONE_KIB)
2733    fpath = self.CreateTempFile()
2734
2735    # Force fast crcmod to return True to test the basic sliced download
2736    # scenario, ensuring that if the user installs crcmod, it will work.
2737    boto_config_for_test = [
2738        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2739        ('GSUtil', 'test_assume_fast_crcmod', 'True'),
2740        ('GSUtil', 'sliced_object_download_threshold', str(ONE_KIB)),
2741        ('GSUtil', 'sliced_object_download_max_components', '3')]
2742
2743    with SetBotoConfigForTest(boto_config_for_test):
2744      self.RunGsUtil(['cp', suri(object_uri), fpath])
2745
2746      # Each tracker file should have been deleted.
2747      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
2748          StorageUrlFromString(fpath), self.test_api)
2749      for tracker_filename in tracker_filenames:
2750        self.assertFalse(os.path.isfile(tracker_filename))
2751
2752      with open(fpath, 'r') as f:
2753        self.assertEqual(f.read(), 'abc' * ONE_KIB, 'File contents differ')
2754
2755  @unittest.skipUnless(UsingCrcmodExtension(crcmod),
2756                       'Sliced download requires fast crcmod.')
2757  @SkipForS3('No sliced download support for S3.')
2758  def test_cp_unresumable_sliced_download(self):
2759    """Tests sliced download works when resumability is disabled."""
2760    bucket_uri = self.CreateBucket()
2761    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2762                                   contents='abcd' * self.halt_size)
2763    fpath = self.CreateTempFile()
2764    test_callback_file = self.CreateTempFile(
2765        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2766
2767    boto_config_for_test = [
2768        ('GSUtil', 'resumable_threshold', str(self.halt_size*5)),
2769        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
2770        ('GSUtil', 'sliced_object_download_max_components', '4')]
2771
2772    with SetBotoConfigForTest(boto_config_for_test):
2773      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2774                               suri(object_uri), suri(fpath)],
2775                              return_stderr=True, expected_status=1)
2776      self.assertIn('not downloaded successfully', stderr)
2777      # Temporary download file should exist.
2778      self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
2779
2780      # No tracker files should exist.
2781      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
2782          StorageUrlFromString(fpath), self.test_api)
2783      for tracker_filename in tracker_filenames:
2784        self.assertFalse(os.path.isfile(tracker_filename))
2785
2786    # Perform the entire download, without resuming.
2787    with SetBotoConfigForTest(boto_config_for_test):
2788      stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)],
2789                              return_stderr=True)
2790      self.assertNotIn('Resuming download', stderr)
2791      # Temporary download file should have been deleted.
2792      self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
2793      with open(fpath, 'r') as f:
2794        self.assertEqual(f.read(), 'abcd' * self.halt_size,
2795                         'File contents differ')
2796
2797  @unittest.skipUnless(UsingCrcmodExtension(crcmod),
2798                       'Sliced download requires fast crcmod.')
2799  @SkipForS3('No sliced download support for S3.')
2800  def test_cp_sliced_download_resume(self):
2801    """Tests that sliced object download is resumable."""
2802    bucket_uri = self.CreateBucket()
2803    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2804                                   contents='abc' * self.halt_size)
2805    fpath = self.CreateTempFile()
2806    test_callback_file = self.CreateTempFile(
2807        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2808
2809    boto_config_for_test = [
2810        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
2811        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
2812        ('GSUtil', 'sliced_object_download_max_components', '3')]
2813
2814    with SetBotoConfigForTest(boto_config_for_test):
2815      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2816                               suri(object_uri), suri(fpath)],
2817                              return_stderr=True, expected_status=1)
2818      self.assertIn('not downloaded successfully', stderr)
2819
2820      # Each tracker file should exist.
2821      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
2822          StorageUrlFromString(fpath), self.test_api)
2823      for tracker_filename in tracker_filenames:
2824        self.assertTrue(os.path.isfile(tracker_filename))
2825
2826      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2827                              return_stderr=True)
2828      self.assertIn('Resuming download', stderr)
2829
2830      # Each tracker file should have been deleted.
2831      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
2832          StorageUrlFromString(fpath), self.test_api)
2833      for tracker_filename in tracker_filenames:
2834        self.assertFalse(os.path.isfile(tracker_filename))
2835
2836      with open(fpath, 'r') as f:
2837        self.assertEqual(f.read(), 'abc' * self.halt_size,
2838                         'File contents differ')
2839
2840  @unittest.skipUnless(UsingCrcmodExtension(crcmod),
2841                       'Sliced download requires fast crcmod.')
2842  @SkipForS3('No sliced download support for S3.')
2843  def test_cp_sliced_download_partial_resume(self):
2844    """Test sliced download resumability when some components are finished."""
2845    bucket_uri = self.CreateBucket()
2846    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2847                                   contents='abc' * self.halt_size)
2848    fpath = self.CreateTempFile()
2849    test_callback_file = self.CreateTempFile(
2850        contents=pickle.dumps(HaltOneComponentCopyCallbackHandler(5)))
2851
2852    boto_config_for_test = [
2853        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
2854        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
2855        ('GSUtil', 'sliced_object_download_max_components', '3')]
2856
2857    with SetBotoConfigForTest(boto_config_for_test):
2858      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2859                               suri(object_uri), suri(fpath)],
2860                              return_stderr=True, expected_status=1)
2861      self.assertIn('not downloaded successfully', stderr)
2862
2863      # Each tracker file should exist.
2864      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
2865          StorageUrlFromString(fpath), self.test_api)
2866      for tracker_filename in tracker_filenames:
2867        self.assertTrue(os.path.isfile(tracker_filename))
2868
2869      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2870                              return_stderr=True)
2871      self.assertIn('Resuming download', stderr)
2872      self.assertIn('Download already complete', stderr)
2873
2874      # Each tracker file should have been deleted.
2875      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
2876          StorageUrlFromString(fpath), self.test_api)
2877      for tracker_filename in tracker_filenames:
2878        self.assertFalse(os.path.isfile(tracker_filename))
2879
2880      with open(fpath, 'r') as f:
2881        self.assertEqual(f.read(), 'abc' * self.halt_size,
2882                         'File contents differ')
2883
2884  @unittest.skipUnless(UsingCrcmodExtension(crcmod),
2885                       'Sliced download requires fast crcmod.')
2886  @SkipForS3('No sliced download support for S3.')
2887  def test_cp_sliced_download_resume_content_differs(self):
2888    """Tests differing file contents are detected by sliced downloads."""
2889    bucket_uri = self.CreateBucket()
2890    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2891                                   contents='abc' * self.halt_size)
2892    fpath = self.CreateTempFile(contents='')
2893    test_callback_file = self.CreateTempFile(
2894        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2895
2896    boto_config_for_test = [
2897        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
2898        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
2899        ('GSUtil', 'sliced_object_download_max_components', '3')]
2900
2901    with SetBotoConfigForTest(boto_config_for_test):
2902      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2903                               suri(object_uri), suri(fpath)],
2904                              return_stderr=True, expected_status=1)
2905      self.assertIn('not downloaded successfully', stderr)
2906
2907      # Temporary download file should exist.
2908      self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
2909
2910      # Each tracker file should exist.
2911      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
2912          StorageUrlFromString(fpath), self.test_api)
2913      for tracker_filename in tracker_filenames:
2914        self.assertTrue(os.path.isfile(tracker_filename))
2915
2916      with open(fpath + '_.gstmp', 'r+b') as f:
2917        f.write('altered file contents')
2918
2919      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2920                              return_stderr=True, expected_status=1)
2921      self.assertIn('Resuming download', stderr)
2922      self.assertIn('doesn\'t match cloud-supplied digest', stderr)
2923      self.assertIn('HashMismatchException: crc32c', stderr)
2924
2925      # Each tracker file should have been deleted.
2926      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
2927          StorageUrlFromString(fpath), self.test_api)
2928      for tracker_filename in tracker_filenames:
2929        self.assertFalse(os.path.isfile(tracker_filename))
2930
2931      # Temporary file should have been deleted due to hash mismatch.
2932      self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
2933      # Final file should not exist.
2934      self.assertFalse(os.path.isfile(fpath))
2935
2936  @unittest.skipUnless(UsingCrcmodExtension(crcmod),
2937                       'Sliced download requires fast crcmod.')
2938  @SkipForS3('No sliced download support for S3.')
2939  def test_cp_sliced_download_component_size_changed(self):
2940    """Tests sliced download doesn't break when the boto config changes.
2941
2942    If the number of components used changes cross-process, the download should
2943    be restarted.
2944    """
2945    bucket_uri = self.CreateBucket()
2946    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2947                                   contents='abcd' * self.halt_size)
2948    fpath = self.CreateTempFile()
2949    test_callback_file = self.CreateTempFile(
2950        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2951
2952    boto_config_for_test = [
2953        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
2954        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
2955        ('GSUtil', 'sliced_object_download_component_size',
2956         str(self.halt_size//4)),
2957        ('GSUtil', 'sliced_object_download_max_components', '4')]
2958
2959    with SetBotoConfigForTest(boto_config_for_test):
2960      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
2961                               suri(object_uri), suri(fpath)],
2962                              return_stderr=True, expected_status=1)
2963      self.assertIn('not downloaded successfully', stderr)
2964
2965    boto_config_for_test = [
2966        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
2967        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
2968        ('GSUtil', 'sliced_object_download_component_size',
2969         str(self.halt_size//2)),
2970        ('GSUtil', 'sliced_object_download_max_components', '2')]
2971
2972    with SetBotoConfigForTest(boto_config_for_test):
2973      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
2974                              return_stderr=True)
2975      self.assertIn('Sliced download tracker file doesn\'t match ', stderr)
2976      self.assertIn('Restarting download from scratch', stderr)
2977      self.assertNotIn('Resuming download', stderr)
2978
2979  @unittest.skipUnless(UsingCrcmodExtension(crcmod),
2980                       'Sliced download requires fast crcmod.')
2981  @SkipForS3('No sliced download support for S3.')
2982  def test_cp_sliced_download_disabled_cross_process(self):
2983    """Tests temporary files are not orphaned if sliced download is disabled.
2984
2985    Specifically, temporary files should be deleted when the corresponding
2986    non-sliced download is completed.
2987    """
2988    bucket_uri = self.CreateBucket()
2989    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
2990                                   contents='abcd' * self.halt_size)
2991    fpath = self.CreateTempFile()
2992    test_callback_file = self.CreateTempFile(
2993        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
2994
2995    boto_config_for_test = [
2996        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
2997        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
2998        ('GSUtil', 'sliced_object_download_max_components', '4')]
2999
3000    with SetBotoConfigForTest(boto_config_for_test):
3001      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
3002                               suri(object_uri), suri(fpath)],
3003                              return_stderr=True, expected_status=1)
3004      self.assertIn('not downloaded successfully', stderr)
3005      # Temporary download file should exist.
3006      self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
3007
3008      # Each tracker file should exist.
3009      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
3010          StorageUrlFromString(fpath), self.test_api)
3011      for tracker_filename in tracker_filenames:
3012        self.assertTrue(os.path.isfile(tracker_filename))
3013
3014    # Disable sliced downloads by increasing the threshold
3015    boto_config_for_test = [
3016        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
3017        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size*5)),
3018        ('GSUtil', 'sliced_object_download_max_components', '4')]
3019
3020    with SetBotoConfigForTest(boto_config_for_test):
3021      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3022                              return_stderr=True)
3023      self.assertNotIn('Resuming download', stderr)
3024      # Temporary download file should have been deleted.
3025      self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
3026
3027      # Each tracker file should have been deleted.
3028      for tracker_filename in tracker_filenames:
3029        self.assertFalse(os.path.isfile(tracker_filename))
3030      with open(fpath, 'r') as f:
3031        self.assertEqual(f.read(), 'abcd' * self.halt_size)
3032
3033  @SkipForS3('No resumable upload support for S3.')
3034  def test_cp_resumable_upload_start_over_http_error(self):
3035    for start_over_error in (404, 410):
3036      self.start_over_error_test_helper(start_over_error)
3037
3038  def start_over_error_test_helper(self, http_error_num):
3039    bucket_uri = self.CreateBucket()
3040    fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB)
3041    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3042    if self.test_api == ApiSelector.JSON:
3043      test_callback_file = self.CreateTempFile(
3044          contents=pickle.dumps(_JSONForceHTTPErrorCopyCallbackHandler(5, 404)))
3045    elif self.test_api == ApiSelector.XML:
3046      test_callback_file = self.CreateTempFile(
3047          contents=pickle.dumps(
3048              _XMLResumableUploadStartOverCopyCallbackHandler(5)))
3049
3050    with SetBotoConfigForTest([boto_config_for_test]):
3051      stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
3052                               fpath, suri(bucket_uri)], return_stderr=True)
3053      self.assertIn('Restarting upload from scratch', stderr)
3054
3055  def test_cp_minus_c(self):
3056    bucket_uri = self.CreateBucket()
3057    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
3058                                   contents='foo')
3059    self.RunGsUtil(
3060        ['cp', '-c', suri(bucket_uri) + '/foo2', suri(object_uri),
3061         suri(bucket_uri) + '/dir/'],
3062        expected_status=1)
3063    self.RunGsUtil(['stat', '%s/dir/foo' % suri(bucket_uri)])
3064
3065  def test_rewrite_cp(self):
3066    """Tests the JSON Rewrite API."""
3067    if self.test_api == ApiSelector.XML:
3068      return unittest.skip('Rewrite API is only supported in JSON.')
3069    bucket_uri = self.CreateBucket()
3070    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
3071                                   contents='bar')
3072    gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
3073                            DiscardMessagesQueue(), self.default_provider)
3074    key = object_uri.get_key()
3075    src_obj_metadata = apitools_messages.Object(
3076        name=key.name, bucket=key.bucket.name, contentType=key.content_type)
3077    dst_obj_metadata = apitools_messages.Object(
3078        bucket=src_obj_metadata.bucket,
3079        name=self.MakeTempName('object'),
3080        contentType=src_obj_metadata.contentType)
3081    gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata)
3082    self.assertEqual(
3083        gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
3084                                     src_obj_metadata.name,
3085                                     fields=['customerEncryption',
3086                                             'md5Hash']).md5Hash,
3087        gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
3088                                     dst_obj_metadata.name,
3089                                     fields=['customerEncryption',
3090                                             'md5Hash']).md5Hash,
3091        'Error: Rewritten object\'s hash doesn\'t match source object.')
3092
3093  def test_rewrite_cp_resume(self):
3094    """Tests the JSON Rewrite API, breaking and resuming via a tracker file."""
3095    if self.test_api == ApiSelector.XML:
3096      return unittest.skip('Rewrite API is only supported in JSON.')
3097    bucket_uri = self.CreateBucket()
3098    # Second bucket needs to be a different storage class so the service
3099    # actually rewrites the bytes.
3100    bucket_uri2 = self.CreateBucket(
3101        storage_class='durable_reduced_availability')
3102    # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
3103    # need 2 response from the service: 1 success, 1 failure prior to
3104    # completion.
3105    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
3106                                   contents=('12'*ONE_MIB) + 'bar',
3107                                   prefer_json_api=True)
3108    gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
3109                            DiscardMessagesQueue(), self.default_provider)
3110    key = object_uri.get_key()
3111    src_obj_metadata = apitools_messages.Object(
3112        name=key.name, bucket=key.bucket.name, contentType=key.content_type,
3113        etag=key.etag.strip('"\''))
3114    dst_obj_name = self.MakeTempName('object')
3115    dst_obj_metadata = apitools_messages.Object(
3116        bucket=bucket_uri2.bucket_name,
3117        name=dst_obj_name,
3118        contentType=src_obj_metadata.contentType)
3119    tracker_file_name = GetRewriteTrackerFilePath(
3120        src_obj_metadata.bucket, src_obj_metadata.name,
3121        dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api)
3122    try:
3123      try:
3124        gsutil_api.CopyObject(
3125            src_obj_metadata, dst_obj_metadata,
3126            progress_callback=HaltingRewriteCallbackHandler(ONE_MIB*2).call,
3127            max_bytes_per_call=ONE_MIB)
3128        self.fail('Expected RewriteHaltException.')
3129      except RewriteHaltException:
3130        pass
3131
3132      # Tracker file should be left over.
3133      self.assertTrue(os.path.exists(tracker_file_name))
3134
3135      # Now resume. Callback ensures we didn't start over.
3136      gsutil_api.CopyObject(
3137          src_obj_metadata, dst_obj_metadata,
3138          progress_callback=EnsureRewriteResumeCallbackHandler(ONE_MIB*2).call,
3139          max_bytes_per_call=ONE_MIB)
3140
3141      # Copy completed; tracker file should be deleted.
3142      self.assertFalse(os.path.exists(tracker_file_name))
3143
3144      self.assertEqual(
3145          gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
3146                                       src_obj_metadata.name,
3147                                       fields=['customerEncryption',
3148                                               'md5Hash']).md5Hash,
3149          gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
3150                                       dst_obj_metadata.name,
3151                                       fields=['customerEncryption',
3152                                               'md5Hash']).md5Hash,
3153          'Error: Rewritten object\'s hash doesn\'t match source object.')
3154    finally:
3155      # Clean up if something went wrong.
3156      DeleteTrackerFile(tracker_file_name)
3157
3158  def test_rewrite_cp_resume_source_changed(self):
3159    """Tests that Rewrite starts over when the source object has changed."""
3160    if self.test_api == ApiSelector.XML:
3161      return unittest.skip('Rewrite API is only supported in JSON.')
3162    bucket_uri = self.CreateBucket()
3163    # Second bucket needs to be a different storage class so the service
3164    # actually rewrites the bytes.
3165    bucket_uri2 = self.CreateBucket(
3166        storage_class='durable_reduced_availability')
3167    # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
3168    # need 2 response from the service: 1 success, 1 failure prior to
3169    # completion.
3170    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
3171                                   contents=('12'*ONE_MIB) + 'bar',
3172                                   prefer_json_api=True)
3173    gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
3174                            DiscardMessagesQueue(), self.default_provider)
3175    key = object_uri.get_key()
3176    src_obj_metadata = apitools_messages.Object(
3177        name=key.name, bucket=key.bucket.name, contentType=key.content_type,
3178        etag=key.etag.strip('"\''))
3179    dst_obj_name = self.MakeTempName('object')
3180    dst_obj_metadata = apitools_messages.Object(
3181        bucket=bucket_uri2.bucket_name,
3182        name=dst_obj_name,
3183        contentType=src_obj_metadata.contentType)
3184    tracker_file_name = GetRewriteTrackerFilePath(
3185        src_obj_metadata.bucket, src_obj_metadata.name,
3186        dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api)
3187    try:
3188      try:
3189        gsutil_api.CopyObject(
3190            src_obj_metadata, dst_obj_metadata,
3191            progress_callback=HaltingRewriteCallbackHandler(ONE_MIB*2).call,
3192            max_bytes_per_call=ONE_MIB)
3193        self.fail('Expected RewriteHaltException.')
3194      except RewriteHaltException:
3195        pass
3196      # Overwrite the original object.
3197      object_uri2 = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
3198                                      contents='bar', prefer_json_api=True)
3199      key2 = object_uri2.get_key()
3200      src_obj_metadata2 = apitools_messages.Object(
3201          name=key2.name, bucket=key2.bucket.name,
3202          contentType=key2.content_type, etag=key2.etag.strip('"\''))
3203
3204      # Tracker file for original object should still exist.
3205      self.assertTrue(os.path.exists(tracker_file_name))
3206
3207      # Copy the new object.
3208      gsutil_api.CopyObject(src_obj_metadata2, dst_obj_metadata,
3209                            max_bytes_per_call=ONE_MIB)
3210
3211      # Copy completed; original tracker file should be deleted.
3212      self.assertFalse(os.path.exists(tracker_file_name))
3213
3214      self.assertEqual(
3215          gsutil_api.GetObjectMetadata(src_obj_metadata2.bucket,
3216                                       src_obj_metadata2.name,
3217                                       fields=['customerEncryption',
3218                                               'md5Hash']).md5Hash,
3219          gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
3220                                       dst_obj_metadata.name,
3221                                       fields=['customerEncryption',
3222                                               'md5Hash']).md5Hash,
3223          'Error: Rewritten object\'s hash doesn\'t match source object.')
3224    finally:
3225      # Clean up if something went wrong.
3226      DeleteTrackerFile(tracker_file_name)
3227
3228  def test_rewrite_cp_resume_command_changed(self):
3229    """Tests that Rewrite starts over when the arguments changed."""
3230    if self.test_api == ApiSelector.XML:
3231      return unittest.skip('Rewrite API is only supported in JSON.')
3232    bucket_uri = self.CreateBucket()
3233    # Second bucket needs to be a different storage class so the service
3234    # actually rewrites the bytes.
3235    bucket_uri2 = self.CreateBucket(
3236        storage_class='durable_reduced_availability')
3237    # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
3238    # need 2 response from the service: 1 success, 1 failure prior to
3239    # completion.
3240    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
3241                                   contents=('12'*ONE_MIB) + 'bar',
3242                                   prefer_json_api=True)
3243    gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
3244                            DiscardMessagesQueue(), self.default_provider)
3245    key = object_uri.get_key()
3246    src_obj_metadata = apitools_messages.Object(
3247        name=key.name, bucket=key.bucket.name, contentType=key.content_type,
3248        etag=key.etag.strip('"\''))
3249    dst_obj_name = self.MakeTempName('object')
3250    dst_obj_metadata = apitools_messages.Object(
3251        bucket=bucket_uri2.bucket_name,
3252        name=dst_obj_name,
3253        contentType=src_obj_metadata.contentType)
3254    tracker_file_name = GetRewriteTrackerFilePath(
3255        src_obj_metadata.bucket, src_obj_metadata.name,
3256        dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api)
3257    try:
3258      try:
3259        gsutil_api.CopyObject(
3260            src_obj_metadata, dst_obj_metadata, canned_acl='private',
3261            progress_callback=HaltingRewriteCallbackHandler(ONE_MIB*2).call,
3262            max_bytes_per_call=ONE_MIB)
3263        self.fail('Expected RewriteHaltException.')
3264      except RewriteHaltException:
3265        pass
3266
3267      # Tracker file for original object should still exist.
3268      self.assertTrue(os.path.exists(tracker_file_name))
3269
3270      # Copy the same object but with different call parameters.
3271      gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata,
3272                            canned_acl='public-read',
3273                            max_bytes_per_call=ONE_MIB)
3274
3275      # Copy completed; original tracker file should be deleted.
3276      self.assertFalse(os.path.exists(tracker_file_name))
3277
3278      new_obj_metadata = gsutil_api.GetObjectMetadata(
3279          dst_obj_metadata.bucket, dst_obj_metadata.name,
3280          fields=['acl', 'customerEncryption', 'md5Hash'])
3281      self.assertEqual(
3282          gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
3283                                       src_obj_metadata.name,
3284                                       fields=['customerEncryption',
3285                                               'md5Hash']).md5Hash,
3286          new_obj_metadata.md5Hash,
3287          'Error: Rewritten object\'s hash doesn\'t match source object.')
3288      # New object should have a public-read ACL from the second command.
3289      found_public_acl = False
3290      for acl_entry in new_obj_metadata.acl:
3291        if acl_entry.entity == 'allUsers':
3292          found_public_acl = True
3293      self.assertTrue(found_public_acl,
3294                      'New object was not written with a public ACL.')
3295    finally:
3296      # Clean up if something went wrong.
3297      DeleteTrackerFile(tracker_file_name)
3298
3299  @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.')
3300  @unittest.skipUnless(UsingCrcmodExtension(crcmod),
3301                       'Test requires fast crcmod.')
3302  def test_cp_preserve_posix_bucket_to_dir_no_errors(self):
3303    """Tests use of the -P flag with cp from a bucket to a local dir.
3304
3305    Specifically tests combinations of POSIX attributes in metadata that will
3306    pass validation.
3307    """
3308    bucket_uri = self.CreateBucket()
3309    tmpdir = self.CreateTempDir()
3310    TestCpMvPOSIXBucketToLocalNoErrors(self, bucket_uri, tmpdir, is_cp=True)
3311
3312  @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.')
3313  def test_cp_preserve_posix_bucket_to_dir_errors(self):
3314    """Tests use of the -P flag with cp from a bucket to a local dir.
3315
3316    Specifically, combinations of POSIX attributes in metadata that will fail
3317    validation.
3318    """
3319    bucket_uri = self.CreateBucket()
3320    tmpdir = self.CreateTempDir()
3321
3322    obj = self.CreateObject(bucket_uri=bucket_uri, object_name='obj',
3323                            contents='obj')
3324    TestCpMvPOSIXBucketToLocalErrors(self, bucket_uri, obj, tmpdir, is_cp=True)
3325
3326  @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.')
3327  def test_cp_preseve_posix_dir_to_bucket_no_errors(self):
3328    """Tests use of the -P flag with cp from a local dir to a bucket."""
3329    bucket_uri = self.CreateBucket()
3330    TestCpMvPOSIXLocalToBucketNoErrors(self, bucket_uri, is_cp=True)
3331
3332  def test_cp_minus_s_to_non_cloud_dest_fails(self):
3333    """Test that cp -s operations to a non-cloud destination are prevented."""
3334    local_file = self.CreateTempFile(contents='foo')
3335    dest_dir = self.CreateTempDir()
3336    stderr = self.RunGsUtil(['cp', '-s', 'standard', local_file, dest_dir],
3337                            expected_status=1, return_stderr=True)
3338    self.assertIn(
3339        'Cannot specify storage class for a non-cloud destination:', stderr)
3340
3341  # TODO: Remove @skip annotation from this test once we upgrade to the Boto
3342  # version that parses the storage class header for HEAD Object responses.
3343  @SkipForXML('Need Boto version > 2.46.1')
3344  def test_cp_specify_nondefault_storage_class(self):
3345    bucket_uri = self.CreateBucket()
3346    object_uri = self.CreateObject(
3347        bucket_uri=bucket_uri, object_name='foo', contents='foo')
3348    object2_suri = suri(object_uri) + 'bar'
3349    # Specify storage class name as mixed case here to ensure that it
3350    # gets normalized to uppercase (S3 would return an error otherwise), and
3351    # that using the normalized case is accepted by each API.
3352    nondefault_storage_class = {
3353        's3': 'Standard_iA',
3354        'gs':'durable_REDUCED_availability'
3355    }
3356    storage_class = nondefault_storage_class[self.default_provider]
3357    self.RunGsUtil(['cp', '-s', storage_class, suri(object_uri), object2_suri])
3358    stdout = self.RunGsUtil(['stat', object2_suri], return_stdout=True)
3359    self.assertRegexpMatchesWithFlags(
3360        stdout, r'Storage class:\s+%s' % storage_class, flags=re.IGNORECASE)
3361
3362  @SkipForS3('Test uses gs-specific storage classes.')
3363  def test_cp_sets_correct_dest_storage_class(self):
3364    """Tests that object storage class is set correctly with and without -s."""
3365    # Use a non-default storage class as the default for the bucket.
3366    bucket_uri = self.CreateBucket(storage_class='nearline')
3367    # Ensure storage class is set correctly for a local-to-cloud copy.
3368    local_fname = 'foo-orig'
3369    local_fpath = self.CreateTempFile(contents='foo', file_name=local_fname)
3370    foo_cloud_suri = suri(bucket_uri) + '/' + local_fname
3371    self.RunGsUtil(['cp', '-s', 'standard', local_fpath, foo_cloud_suri])
3372    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
3373      stdout = self.RunGsUtil(['stat', foo_cloud_suri], return_stdout=True)
3374    self.assertRegexpMatchesWithFlags(
3375        stdout, r'Storage class:\s+STANDARD', flags=re.IGNORECASE)
3376
3377    # Ensure storage class is set correctly for a cloud-to-cloud copy when no
3378    # destination storage class is specified.
3379    foo_nl_suri = suri(bucket_uri) + '/foo-nl'
3380    self.RunGsUtil(['cp', foo_cloud_suri, foo_nl_suri])
3381    # TODO: Remove with-clause after adding storage class parsing in Boto.
3382    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
3383      stdout = self.RunGsUtil(['stat', foo_nl_suri], return_stdout=True)
3384    self.assertRegexpMatchesWithFlags(
3385        stdout, r'Storage class:\s+NEARLINE', flags=re.IGNORECASE)
3386
3387    # Ensure storage class is set correctly for a cloud-to-cloud copy when a
3388    # non-bucket-default storage class is specified.
3389    foo_std_suri = suri(bucket_uri) + '/foo-std'
3390    self.RunGsUtil(['cp', '-s', 'standard', foo_nl_suri, foo_std_suri])
3391    # TODO: Remove with-clause after adding storage class parsing in Boto.
3392    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
3393      stdout = self.RunGsUtil(['stat', foo_std_suri], return_stdout=True)
3394    self.assertRegexpMatchesWithFlags(
3395        stdout, r'Storage class:\s+STANDARD', flags=re.IGNORECASE)
3396
3397
3398class TestCpUnitTests(testcase.GsUtilUnitTestCase):
3399  """Unit tests for gsutil cp."""
3400
3401  def testDownloadWithNoHashAvailable(self):
3402    """Tests a download with no valid server-supplied hash."""
3403    # S3 should have a special message for non-MD5 etags.
3404    bucket_uri = self.CreateBucket(provider='s3')
3405    object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
3406    object_uri.get_key().etag = '12345'  # Not an MD5
3407    dst_dir = self.CreateTempDir()
3408
3409    log_handler = self.RunCommand(
3410        'cp', [suri(object_uri), dst_dir], return_log_handler=True)
3411    warning_messages = log_handler.messages['warning']
3412    self.assertEquals(2, len(warning_messages))
3413    self.assertRegexpMatches(
3414        warning_messages[0],
3415        r'Non-MD5 etag \(12345\) present for key .*, '
3416        r'data integrity checks are not possible')
3417    self.assertIn('Integrity cannot be assured', warning_messages[1])
3418
3419  def test_object_and_prefix_same_name(self):
3420    bucket_uri = self.CreateBucket()
3421    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
3422                                   contents='foo')
3423    self.CreateObject(bucket_uri=bucket_uri,
3424                      object_name='foo/bar', contents='bar')
3425    fpath = self.CreateTempFile()
3426    # MockKey doesn't support hash_algs, so the MD5 will not match.
3427    with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]):
3428      self.RunCommand('cp', [suri(object_uri), fpath])
3429    with open(fpath, 'r') as f:
3430      self.assertEqual(f.read(), 'foo')
3431
3432  def test_cp_upload_respects_no_hashes(self):
3433    bucket_uri = self.CreateBucket()
3434    fpath = self.CreateTempFile(contents='abcd')
3435    with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]):
3436      log_handler = self.RunCommand('cp', [fpath, suri(bucket_uri)],
3437                                    return_log_handler=True)
3438    warning_messages = log_handler.messages['warning']
3439    self.assertEquals(1, len(warning_messages))
3440    self.assertIn('Found no hashes to validate object upload',
3441                  warning_messages[0])
3442