1# -*- coding: utf-8 -*-
2# Copyright 2013 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Integration tests for cp command."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20from __future__ import unicode_literals
21
22import ast
23import base64
24import binascii
25import datetime
26import gzip
27import hashlib
28import logging
29import os
30import pickle
31import pkgutil
32import random
33import re
34import stat
35import string
36import sys
37import threading
38
39from apitools.base.py import exceptions as apitools_exceptions
40import boto
41from boto import storage_uri
42from boto.exception import ResumableTransferDisposition
43from boto.exception import StorageResponseError
44from boto.storage_uri import BucketStorageUri
45from gslib.cloud_api import ResumableUploadStartOverException
46from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
47from gslib.cs_api_map import ApiSelector
48from gslib.daisy_chain_wrapper import _DEFAULT_DOWNLOAD_CHUNK_SIZE
49from gslib.discard_messages_queue import DiscardMessagesQueue
50from gslib.gcs_json_api import GcsJsonApi
51from gslib.parallel_tracker_file import ObjectFromTracker
52from gslib.parallel_tracker_file import WriteParallelUploadTrackerFile
53from gslib.project_id import PopulateProjectId
54from gslib.storage_url import StorageUrlFromString
55from gslib.tests.rewrite_helper import EnsureRewriteResumeCallbackHandler
56from gslib.tests.rewrite_helper import HaltingRewriteCallbackHandler
57from gslib.tests.rewrite_helper import RewriteHaltException
58import gslib.tests.testcase as testcase
59from gslib.tests.testcase.base import NotParallelizable
60from gslib.tests.testcase.integration_testcase import SkipForGS
61from gslib.tests.testcase.integration_testcase import SkipForS3
62from gslib.tests.testcase.integration_testcase import SkipForXML
63from gslib.tests.testcase.integration_testcase import SkipForJSON
64from gslib.tests.util import BuildErrorRegex
65from gslib.tests.util import GenerationFromURI as urigen
66from gslib.tests.util import HaltingCopyCallbackHandler
67from gslib.tests.util import HaltOneComponentCopyCallbackHandler
68from gslib.tests.util import HAS_GS_PORT
69from gslib.tests.util import HAS_S3_CREDS
70from gslib.tests.util import ObjectToURI as suri
71from gslib.tests.util import ORPHANED_FILE
72from gslib.tests.util import POSIX_GID_ERROR
73from gslib.tests.util import POSIX_INSUFFICIENT_ACCESS_ERROR
74from gslib.tests.util import POSIX_MODE_ERROR
75from gslib.tests.util import POSIX_UID_ERROR
76from gslib.tests.util import SequentialAndParallelTransfer
77from gslib.tests.util import SetBotoConfigForTest
78from gslib.tests.util import TailSet
79from gslib.tests.util import TEST_ENCRYPTION_KEY1
80from gslib.tests.util import TEST_ENCRYPTION_KEY1_SHA256_B64
81from gslib.tests.util import TEST_ENCRYPTION_KEY2
82from gslib.tests.util import TEST_ENCRYPTION_KEY3
83from gslib.tests.util import unittest
84from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
85from gslib.tracker_file import DeleteTrackerFile
86from gslib.tracker_file import GetRewriteTrackerFilePath
87from gslib.tracker_file import GetSlicedDownloadTrackerFilePaths
88from gslib.ui_controller import BytesToFixedWidthString
89from gslib.utils import hashing_helper
90from gslib.utils.boto_util import UsingCrcmodExtension
91from gslib.utils.constants import START_CALLBACK_PER_BYTES
92from gslib.utils.constants import UTF8
93from gslib.utils.copy_helper import GetTrackerFilePath
94from gslib.utils.copy_helper import PARALLEL_UPLOAD_STATIC_SALT
95from gslib.utils.copy_helper import PARALLEL_UPLOAD_TEMP_NAMESPACE
96from gslib.utils.copy_helper import TrackerFileType
97from gslib.utils.hashing_helper import CalculateB64EncodedMd5FromContents
98from gslib.utils.hashing_helper import CalculateMd5FromContents
99from gslib.utils.posix_util import GID_ATTR
100from gslib.utils.posix_util import MODE_ATTR
101from gslib.utils.posix_util import NA_ID
102from gslib.utils.posix_util import NA_MODE
103from gslib.utils.posix_util import UID_ATTR
104from gslib.utils.posix_util import ValidateFilePermissionAccess
105from gslib.utils.posix_util import ValidatePOSIXMode
106from gslib.utils.retry_util import Retry
107from gslib.utils.system_util import IS_WINDOWS
108from gslib.utils.text_util import get_random_ascii_chars
109from gslib.utils.unit_util import EIGHT_MIB
110from gslib.utils.unit_util import HumanReadableToBytes
111from gslib.utils.unit_util import MakeHumanReadable
112from gslib.utils.unit_util import ONE_KIB
113from gslib.utils.unit_util import ONE_MIB
114import six
115from six.moves import http_client
116from six.moves import range
117from six.moves import xrange
118
119if six.PY3:
120  long = int  # pylint: disable=redefined-builtin,invalid-name
121
122# These POSIX-specific variables aren't defined for Windows.
123# pylint: disable=g-import-not-at-top
124if not IS_WINDOWS:
125  from gslib.tests import util
126  from gslib.tests.util import DEFAULT_MODE
127  from gslib.tests.util import GetInvalidGid
128  from gslib.tests.util import GetNonPrimaryGid
129  from gslib.tests.util import GetPrimaryGid
130  from gslib.tests.util import INVALID_UID
131  from gslib.tests.util import USER_ID
132# pylint: enable=g-import-not-at-top
133
134
135def TestCpMvPOSIXBucketToLocalErrors(cls, bucket_uri, obj, tmpdir, is_cp=True):
136  """Helper function for preserve_posix_errors tests in test_cp and test_mv.
137
138  Args:
139    cls: An instance of either TestCp or TestMv.
140    bucket_uri: The uri of the bucket that the object is in.
141    obj: The object to run the tests on.
142    tmpdir: The local file path to cp to.
143    is_cp: Whether or not the calling test suite is cp or mv.
144  """
145  error = 'error'
146  # A dict of test_name: attrs_dict.
147  # attrs_dict holds the different attributes that we want for the object in a
148  # specific test.
149  # To minimize potential test flakes from the system's GID mapping changing
150  # mid-test, we use the GID-related methods that fetch GID info each time,
151  # rather than reusing the LazyWrapper-wrapped constants across operations.
152  test_params = {
153      'test1': {
154          MODE_ATTR: '333',
155          error: POSIX_MODE_ERROR
156      },
157      'test2': {
158          GID_ATTR: GetInvalidGid,
159          error: POSIX_GID_ERROR
160      },
161      'test3': {
162          GID_ATTR: GetInvalidGid,
163          MODE_ATTR: '420',
164          error: POSIX_GID_ERROR
165      },
166      'test4': {
167          UID_ATTR: INVALID_UID,
168          error: POSIX_UID_ERROR
169      },
170      'test5': {
171          UID_ATTR: INVALID_UID,
172          MODE_ATTR: '530',
173          error: POSIX_UID_ERROR
174      },
175      'test6': {
176          UID_ATTR: INVALID_UID,
177          GID_ATTR: GetInvalidGid,
178          error: POSIX_UID_ERROR
179      },
180      'test7': {
181          UID_ATTR: INVALID_UID,
182          GID_ATTR: GetInvalidGid,
183          MODE_ATTR: '640',
184          error: POSIX_UID_ERROR
185      },
186      'test8': {
187          UID_ATTR: INVALID_UID,
188          GID_ATTR: GetPrimaryGid,
189          error: POSIX_UID_ERROR
190      },
191      'test9': {
192          UID_ATTR: INVALID_UID,
193          GID_ATTR: GetNonPrimaryGid,
194          error: POSIX_UID_ERROR
195      },
196      'test10': {
197          UID_ATTR: INVALID_UID,
198          GID_ATTR: GetPrimaryGid,
199          MODE_ATTR: '640',
200          error: POSIX_UID_ERROR
201      },
202      'test11': {
203          UID_ATTR: INVALID_UID,
204          GID_ATTR: GetNonPrimaryGid,
205          MODE_ATTR: '640',
206          error: POSIX_UID_ERROR
207      },
208      'test12': {
209          UID_ATTR: USER_ID,
210          GID_ATTR: GetInvalidGid,
211          error: POSIX_GID_ERROR
212      },
213      'test13': {
214          UID_ATTR: USER_ID,
215          GID_ATTR: GetInvalidGid,
216          MODE_ATTR: '640',
217          error: POSIX_GID_ERROR
218      },
219      'test14': {
220          GID_ATTR: GetPrimaryGid,
221          MODE_ATTR: '240',
222          error: POSIX_INSUFFICIENT_ACCESS_ERROR
223      }
224  }
225  # The first variable below can be used to help debug the test if there is a
226  # problem.
227  for test_name, attrs_dict in six.iteritems(test_params):
228    cls.ClearPOSIXMetadata(obj)
229
230    # Attributes default to None if they are not in attrs_dict; some attrs are
231    # functions or LazyWrapper objects that should be called.
232    uid = attrs_dict.get(UID_ATTR)
233    if uid is not None and callable(uid):
234      uid = uid()
235
236    gid = attrs_dict.get(GID_ATTR)
237    if gid is not None and callable(gid):
238      gid = gid()
239
240    mode = attrs_dict.get(MODE_ATTR)
241
242    cls.SetPOSIXMetadata(cls.default_provider,
243                         bucket_uri.bucket_name,
244                         obj.object_name,
245                         uid=uid,
246                         gid=gid,
247                         mode=mode)
248    stderr = cls.RunGsUtil([
249        'cp' if is_cp else 'mv', '-P',
250        suri(bucket_uri, obj.object_name), tmpdir
251    ],
252                           expected_status=1,
253                           return_stderr=True)
254    cls.assertIn(
255        ORPHANED_FILE, stderr,
256        'Error during test "%s": %s not found in stderr:\n%s' %
257        (test_name, ORPHANED_FILE, stderr))
258    error_regex = BuildErrorRegex(obj, attrs_dict.get(error))
259    cls.assertTrue(
260        error_regex.search(stderr),
261        'Test %s did not match expected error; could not find a match for '
262        '%s\n\nin stderr:\n%s' % (test_name, error_regex.pattern, stderr))
263    listing1 = TailSet(suri(bucket_uri), cls.FlatListBucket(bucket_uri))
264    listing2 = TailSet(tmpdir, cls.FlatListDir(tmpdir))
265    # Bucket should have un-altered content.
266    cls.assertEquals(listing1, set(['/%s' % obj.object_name]))
267    # Dir should have un-altered content.
268    cls.assertEquals(listing2, set(['']))
269
270
271def TestCpMvPOSIXBucketToLocalNoErrors(cls, bucket_uri, tmpdir, is_cp=True):
272  """Helper function for preserve_posix_no_errors tests in test_cp and test_mv.
273
274  Args:
275    cls: An instance of either TestCp or TestMv.
276    bucket_uri: The uri of the bucket that the object is in.
277    tmpdir: The local file path to cp to.
278    is_cp: Whether or not the calling test suite is cp or mv.
279  """
280  primary_gid = os.stat(tmpdir).st_gid
281  non_primary_gid = util.GetNonPrimaryGid()
282  test_params = {
283      'obj1': {
284          GID_ATTR: primary_gid
285      },
286      'obj2': {
287          GID_ATTR: non_primary_gid
288      },
289      'obj3': {
290          GID_ATTR: primary_gid,
291          MODE_ATTR: '440'
292      },
293      'obj4': {
294          GID_ATTR: non_primary_gid,
295          MODE_ATTR: '444'
296      },
297      'obj5': {
298          UID_ATTR: USER_ID
299      },
300      'obj6': {
301          UID_ATTR: USER_ID,
302          MODE_ATTR: '420'
303      },
304      'obj7': {
305          UID_ATTR: USER_ID,
306          GID_ATTR: primary_gid
307      },
308      'obj8': {
309          UID_ATTR: USER_ID,
310          GID_ATTR: non_primary_gid
311      },
312      'obj9': {
313          UID_ATTR: USER_ID,
314          GID_ATTR: primary_gid,
315          MODE_ATTR: '433'
316      },
317      'obj10': {
318          UID_ATTR: USER_ID,
319          GID_ATTR: non_primary_gid,
320          MODE_ATTR: '442'
321      }
322  }
323  for obj_name, attrs_dict in six.iteritems(test_params):
324    uid = attrs_dict.get(UID_ATTR)
325    gid = attrs_dict.get(GID_ATTR)
326    mode = attrs_dict.get(MODE_ATTR)
327    cls.CreateObject(bucket_uri=bucket_uri,
328                     object_name=obj_name,
329                     contents=obj_name.encode(UTF8),
330                     uid=uid,
331                     gid=gid,
332                     mode=mode)
333  for obj_name in six.iterkeys(test_params):
334    # Move objects one at a time to avoid listing consistency.
335    cls.RunGsUtil(
336        ['cp' if is_cp else 'mv', '-P',
337         suri(bucket_uri, obj_name), tmpdir])
338  listing = TailSet(tmpdir, cls.FlatListDir(tmpdir))
339  cls.assertEquals(
340      listing,
341      set([
342          '/obj1', '/obj2', '/obj3', '/obj4', '/obj5', '/obj6', '/obj7',
343          '/obj8', '/obj9', '/obj10'
344      ]))
345  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj1'),
346                                  gid=primary_gid,
347                                  mode=DEFAULT_MODE)
348  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj2'),
349                                  gid=non_primary_gid,
350                                  mode=DEFAULT_MODE)
351  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj3'),
352                                  gid=primary_gid,
353                                  mode=0o440)
354  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj4'),
355                                  gid=non_primary_gid,
356                                  mode=0o444)
357  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj5'),
358                                  uid=USER_ID,
359                                  gid=primary_gid,
360                                  mode=DEFAULT_MODE)
361  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj6'),
362                                  uid=USER_ID,
363                                  gid=primary_gid,
364                                  mode=0o420)
365  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj7'),
366                                  uid=USER_ID,
367                                  gid=primary_gid,
368                                  mode=DEFAULT_MODE)
369  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj8'),
370                                  uid=USER_ID,
371                                  gid=non_primary_gid,
372                                  mode=DEFAULT_MODE)
373  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj9'),
374                                  uid=USER_ID,
375                                  gid=primary_gid,
376                                  mode=0o433)
377  cls.VerifyLocalPOSIXPermissions(os.path.join(tmpdir, 'obj10'),
378                                  uid=USER_ID,
379                                  gid=non_primary_gid,
380                                  mode=0o442)
381
382
383def TestCpMvPOSIXLocalToBucketNoErrors(cls, bucket_uri, is_cp=True):
384  """Helper function for testing local to bucket POSIX preservation.
385
386  Args:
387    cls: An instance of either TestCp or TestMv.
388    bucket_uri: The uri of the bucket to cp/mv to.
389    is_cp: Whether or not the calling test suite is cp or mv.
390  """
391  primary_gid = os.getgid()
392  non_primary_gid = util.GetNonPrimaryGid()
393  test_params = {
394      'obj1': {
395          GID_ATTR: primary_gid
396      },
397      'obj2': {
398          GID_ATTR: non_primary_gid
399      },
400      'obj3': {
401          GID_ATTR: primary_gid,
402          MODE_ATTR: '440'
403      },
404      'obj4': {
405          GID_ATTR: non_primary_gid,
406          MODE_ATTR: '444'
407      },
408      'obj5': {
409          UID_ATTR: USER_ID
410      },
411      'obj6': {
412          UID_ATTR: USER_ID,
413          MODE_ATTR: '420'
414      },
415      'obj7': {
416          UID_ATTR: USER_ID,
417          GID_ATTR: primary_gid
418      },
419      'obj8': {
420          UID_ATTR: USER_ID,
421          GID_ATTR: non_primary_gid
422      },
423      'obj9': {
424          UID_ATTR: USER_ID,
425          GID_ATTR: primary_gid,
426          MODE_ATTR: '433'
427      },
428      'obj10': {
429          UID_ATTR: USER_ID,
430          GID_ATTR: non_primary_gid,
431          MODE_ATTR: '442'
432      }
433  }
434  for obj_name, attrs_dict in six.iteritems(test_params):
435    uid = attrs_dict.get(UID_ATTR, NA_ID)
436    gid = attrs_dict.get(GID_ATTR, NA_ID)
437    mode = attrs_dict.get(MODE_ATTR, NA_MODE)
438    if mode != NA_MODE:
439      ValidatePOSIXMode(int(mode, 8))
440    ValidateFilePermissionAccess(obj_name,
441                                 uid=uid,
442                                 gid=int(gid),
443                                 mode=int(mode))
444    fpath = cls.CreateTempFile(contents=b'foo', uid=uid, gid=gid, mode=mode)
445    cls.RunGsUtil(
446        ['cp' if is_cp else 'mv', '-P', fpath,
447         suri(bucket_uri, obj_name)])
448    if uid != NA_ID:
449      cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name,
450                                      UID_ATTR, str(uid))
451    if gid != NA_ID:
452      cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name,
453                                      GID_ATTR, str(gid))
454    if mode != NA_MODE:
455      cls.VerifyObjectCustomAttribute(bucket_uri.bucket_name, obj_name,
456                                      MODE_ATTR, str(mode))
457
458
459def _ReadContentsFromFifo(fifo_path, list_for_output):
460  with open(fifo_path, 'rb') as f:
461    list_for_output.append(f.read())
462
463
464def _WriteContentsToFifo(contents, fifo_path):
465  with open(fifo_path, 'wb') as f:
466    f.write(contents)
467
468
469class _JSONForceHTTPErrorCopyCallbackHandler(object):
470  """Test callback handler that raises an arbitrary HTTP error exception."""
471
472  def __init__(self, startover_at_byte, http_error_num):
473    self._startover_at_byte = startover_at_byte
474    self._http_error_num = http_error_num
475    self.started_over_once = False
476
477  # pylint: disable=invalid-name
478  def call(self, total_bytes_transferred, total_size):
479    """Forcibly exits if the transfer has passed the halting point."""
480    if (total_bytes_transferred >= self._startover_at_byte and
481        not self.started_over_once):
482      sys.stderr.write('Forcing HTTP error %s after byte %s. '
483                       '%s/%s transferred.\r\n' %
484                       (self._http_error_num, self._startover_at_byte,
485                        MakeHumanReadable(total_bytes_transferred),
486                        MakeHumanReadable(total_size)))
487      self.started_over_once = True
488      raise apitools_exceptions.HttpError({'status': self._http_error_num},
489                                          None, None)
490
491
492class _XMLResumableUploadStartOverCopyCallbackHandler(object):
493  """Test callback handler that raises start-over exception during upload."""
494
495  def __init__(self, startover_at_byte):
496    self._startover_at_byte = startover_at_byte
497    self.started_over_once = False
498
499  # pylint: disable=invalid-name
500  def call(self, total_bytes_transferred, total_size):
501    """Forcibly exits if the transfer has passed the halting point."""
502    if (total_bytes_transferred >= self._startover_at_byte and
503        not self.started_over_once):
504      sys.stderr.write(
505          'Forcing ResumableUpload start over error after byte %s. '
506          '%s/%s transferred.\r\n' %
507          (self._startover_at_byte, MakeHumanReadable(total_bytes_transferred),
508           MakeHumanReadable(total_size)))
509      self.started_over_once = True
510      raise boto.exception.ResumableUploadException(
511          'Forcing upload start over', ResumableTransferDisposition.START_OVER)
512
513
514class _DeleteBucketThenStartOverCopyCallbackHandler(object):
515  """Test callback handler that deletes bucket then raises start-over."""
516
517  def __init__(self, startover_at_byte, bucket_uri):
518    self._startover_at_byte = startover_at_byte
519    self._bucket_uri = bucket_uri
520    self.started_over_once = False
521
522  # pylint: disable=invalid-name
523  def call(self, total_bytes_transferred, total_size):
524    """Forcibly exits if the transfer has passed the halting point."""
525    if (total_bytes_transferred >= self._startover_at_byte and
526        not self.started_over_once):
527      sys.stderr.write('Deleting bucket (%s)' % (self._bucket_uri.bucket_name))
528
529      @Retry(StorageResponseError, tries=5, timeout_secs=1)
530      def DeleteBucket():
531        bucket_list = list(self._bucket_uri.list_bucket(all_versions=True))
532        for k in bucket_list:
533          self._bucket_uri.get_bucket().delete_key(k.name,
534                                                   version_id=k.version_id)
535        self._bucket_uri.delete_bucket()
536
537      DeleteBucket()
538      sys.stderr.write(
539          'Forcing ResumableUpload start over error after byte %s. '
540          '%s/%s transferred.\r\n' %
541          (self._startover_at_byte, MakeHumanReadable(total_bytes_transferred),
542           MakeHumanReadable(total_size)))
543      self.started_over_once = True
544      raise ResumableUploadStartOverException('Artificially forcing start-over')
545
546
547class _ResumableUploadRetryHandler(object):
548  """Test callback handler for causing retries during a resumable transfer."""
549
550  def __init__(self,
551               retry_at_byte,
552               exception_to_raise,
553               exc_args,
554               num_retries=1):
555    self._retry_at_byte = retry_at_byte
556    self._exception_to_raise = exception_to_raise
557    self._exception_args = exc_args
558    self._num_retries = num_retries
559
560    self._retries_made = 0
561
562  # pylint: disable=invalid-name
563  def call(self, total_bytes_transferred, unused_total_size):
564    """Cause a single retry at the retry point."""
565    if (total_bytes_transferred >= self._retry_at_byte and
566        self._retries_made < self._num_retries):
567      self._retries_made += 1
568      raise self._exception_to_raise(*self._exception_args)
569
570
571class TestCp(testcase.GsUtilIntegrationTestCase):
572  """Integration tests for cp command."""
573
574  # For tests that artificially halt, we need to ensure at least one callback
575  # occurs.
576  halt_size = START_CALLBACK_PER_BYTES * 2
577
578  def _get_test_file(self, name):
579    contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name)
580    return self.CreateTempFile(file_name=name, contents=contents)
581
582  def _CpWithFifoViaGsUtilAndAppendOutputToList(self, src_path_tuple, dst_path,
583                                                list_for_return_value,
584                                                **kwargs):
585    arg_list = ['cp']
586    arg_list.extend(src_path_tuple)
587    arg_list.append(dst_path)
588    # Append stderr, stdout, or return status (if specified in kwargs) to the
589    # given list.
590    list_for_return_value.append(self.RunGsUtil(arg_list, **kwargs))
591
592  @SequentialAndParallelTransfer
593  def test_noclobber(self):
594    key_uri = self.CreateObject(contents=b'foo')
595    fpath = self.CreateTempFile(contents=b'bar')
596    stderr = self.RunGsUtil(
597        ['cp', '-n', fpath, suri(key_uri)], return_stderr=True)
598    self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr)
599    self.assertEqual(key_uri.get_contents_as_string(), b'foo')
600    stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath],
601                            return_stderr=True)
602    with open(fpath, 'rb') as f:
603      self.assertIn('Skipping existing item: %s' % suri(f), stderr)
604      self.assertEqual(f.read(), b'bar')
605
606  @SequentialAndParallelTransfer
607  def test_noclobber_different_size(self):
608    key_uri = self.CreateObject(contents=b'foo')
609    fpath = self.CreateTempFile(contents=b'quux')
610    stderr = self.RunGsUtil(
611        ['cp', '-n', fpath, suri(key_uri)], return_stderr=True)
612    self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr)
613    self.assertEqual(key_uri.get_contents_as_string(), b'foo')
614    stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath],
615                            return_stderr=True)
616    with open(fpath, 'rb') as f:
617      self.assertIn('Skipping existing item: %s' % suri(f), stderr)
618      self.assertEqual(f.read(), b'quux')
619
620  def test_dest_bucket_not_exist(self):
621    fpath = self.CreateTempFile(contents=b'foo')
622    invalid_bucket_uri = ('%s://%s' %
623                          (self.default_provider, self.nonexistent_bucket_name))
624    # TODO(b/135780661): Remove retry after bug resolved
625    @Retry(AssertionError, tries=3, timeout_secs=1)
626    def _Check():
627      stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri],
628                              expected_status=1,
629                              return_stderr=True)
630      self.assertIn('does not exist', stderr)
631
632    _Check()
633
634  def test_copy_in_cloud_noclobber(self):
635    bucket1_uri = self.CreateBucket()
636    bucket2_uri = self.CreateBucket()
637    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents=b'foo')
638    stderr = self.RunGsUtil(
639        ['cp', suri(key_uri), suri(bucket2_uri)], return_stderr=True)
640    # Rewrite API may output an additional 'Copying' progress notification.
641    self.assertGreaterEqual(stderr.count('Copying'), 1)
642    self.assertLessEqual(stderr.count('Copying'), 2)
643    stderr = self.RunGsUtil(
644        ['cp', '-n', suri(key_uri),
645         suri(bucket2_uri)], return_stderr=True)
646    self.assertIn(
647        'Skipping existing item: %s' % suri(bucket2_uri, key_uri.object_name),
648        stderr)
649
650  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
651  @SequentialAndParallelTransfer
652  def test_cp_from_local_file_to_fifo(self):
653    contents = b'bar'
654    fifo_path = self.CreateTempFifo()
655    file_path = self.CreateTempFile(contents=contents)
656    list_for_output = []
657
658    read_thread = threading.Thread(target=_ReadContentsFromFifo,
659                                   args=(fifo_path, list_for_output))
660    read_thread.start()
661    write_thread = threading.Thread(
662        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
663        args=((file_path,), fifo_path, []))
664    write_thread.start()
665    write_thread.join(120)
666    read_thread.join(120)
667    if not list_for_output:
668      self.fail('Reading/writing to the fifo timed out.')
669    self.assertEqual(list_for_output[0].strip(), contents)
670
671  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
672  @SequentialAndParallelTransfer
673  def test_cp_from_one_object_to_fifo(self):
674    fifo_path = self.CreateTempFifo()
675    bucket_uri = self.CreateBucket()
676    contents = b'bar'
677    obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents)
678    list_for_output = []
679
680    read_thread = threading.Thread(target=_ReadContentsFromFifo,
681                                   args=(fifo_path, list_for_output))
682    read_thread.start()
683    write_thread = threading.Thread(
684        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
685        args=((suri(obj_uri),), fifo_path, []))
686    write_thread.start()
687    write_thread.join(120)
688    read_thread.join(120)
689    if not list_for_output:
690      self.fail('Reading/writing to the fifo timed out.')
691    self.assertEqual(list_for_output[0].strip(), contents)
692
693  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
694  @SequentialAndParallelTransfer
695  def test_cp_from_multiple_objects_to_fifo(self):
696    fifo_path = self.CreateTempFifo()
697    bucket_uri = self.CreateBucket()
698    contents1 = b'foo and bar'
699    contents2 = b'baz and qux'
700    obj1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents1)
701    obj2_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents2)
702    list_for_output = []
703
704    read_thread = threading.Thread(target=_ReadContentsFromFifo,
705                                   args=(fifo_path, list_for_output))
706    read_thread.start()
707    write_thread = threading.Thread(
708        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
709        args=((suri(obj1_uri), suri(obj2_uri)), fifo_path, []))
710    write_thread.start()
711    write_thread.join(120)
712    read_thread.join(120)
713    if not list_for_output:
714      self.fail('Reading/writing to the fifo timed out.')
715    self.assertIn(contents1, list_for_output[0])
716    self.assertIn(contents2, list_for_output[0])
717
718  @SequentialAndParallelTransfer
719  def test_streaming(self):
720    bucket_uri = self.CreateBucket()
721    stderr = self.RunGsUtil(
722        ['cp', '-', '%s' % suri(bucket_uri, 'foo')],
723        stdin='bar',
724        return_stderr=True)
725    self.assertIn('Copying from <STDIN>', stderr)
726    key_uri = bucket_uri.clone_replace_name('foo')
727    self.assertEqual(key_uri.get_contents_as_string(), b'bar')
728
729  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
730  @SequentialAndParallelTransfer
731  def test_streaming_from_fifo_to_object(self):
732    bucket_uri = self.CreateBucket()
733    fifo_path = self.CreateTempFifo()
734    object_name = 'foo'
735    object_contents = b'bar'
736    list_for_output = []
737
738    # Start writer in the background, which won't finish until a corresponding
739    # read operation is performed on the fifo.
740    write_thread = threading.Thread(target=_WriteContentsToFifo,
741                                    args=(object_contents, fifo_path))
742    write_thread.start()
743    # The fifo requires both a pending read and write before either operation
744    # will complete. Regardless of which operation occurs first, the
745    # corresponding subsequent operation will unblock the first one.
746    # We run gsutil in a thread so that it can timeout rather than hang forever
747    # if the write thread fails.
748    read_thread = threading.Thread(
749        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
750        args=((fifo_path,), suri(bucket_uri, object_name), list_for_output),
751        kwargs={'return_stderr': True})
752    read_thread.start()
753
754    read_thread.join(120)
755    write_thread.join(120)
756    if not list_for_output:
757      self.fail('Reading/writing to the fifo timed out.')
758    self.assertIn('Copying from named pipe', list_for_output[0])
759
760    key_uri = bucket_uri.clone_replace_name(object_name)
761    self.assertEqual(key_uri.get_contents_as_string(), object_contents)
762
763  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
764  @SequentialAndParallelTransfer
765  def test_streaming_from_fifo_to_stdout(self):
766    fifo_path = self.CreateTempFifo()
767    contents = b'bar'
768    list_for_output = []
769
770    write_thread = threading.Thread(target=_WriteContentsToFifo,
771                                    args=(contents, fifo_path))
772    write_thread.start()
773    read_thread = threading.Thread(
774        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
775        args=((fifo_path,), '-', list_for_output),
776        kwargs={'return_stdout': True})
777    read_thread.start()
778    read_thread.join(120)
779    write_thread.join(120)
780    if not list_for_output:
781      self.fail('Reading/writing to the fifo timed out.')
782    self.assertEqual(list_for_output[0].strip().encode('ascii'), contents)
783
784  @unittest.skipIf(IS_WINDOWS, 'os.mkfifo not available on Windows.')
785  @SequentialAndParallelTransfer
786  def test_streaming_from_stdout_to_fifo(self):
787    fifo_path = self.CreateTempFifo()
788    contents = b'bar'
789    list_for_output = []
790    list_for_gsutil_output = []
791
792    read_thread = threading.Thread(target=_ReadContentsFromFifo,
793                                   args=(fifo_path, list_for_output))
794    read_thread.start()
795    write_thread = threading.Thread(
796        target=self._CpWithFifoViaGsUtilAndAppendOutputToList,
797        args=(('-',), fifo_path, list_for_gsutil_output),
798        kwargs={
799            'return_stderr': True,
800            'stdin': contents
801        })
802    write_thread.start()
803    write_thread.join(120)
804    read_thread.join(120)
805    if not list_for_output:
806      self.fail('Reading/writing to the fifo timed out.')
807    self.assertEqual(list_for_output[0].strip(), contents)
808
809  def test_streaming_multiple_arguments(self):
810    bucket_uri = self.CreateBucket()
811    stderr = self.RunGsUtil(['cp', '-', '-', suri(bucket_uri)],
812                            stdin='bar',
813                            return_stderr=True,
814                            expected_status=1)
815    self.assertIn('Multiple URL strings are not supported with streaming',
816                  stderr)
817
818  # TODO: Implement a way to test both with and without using magic file.
819
820  @SequentialAndParallelTransfer
821  def test_detect_content_type(self):
822    """Tests local detection of content type."""
823    bucket_uri = self.CreateBucket()
824    dsturi = suri(bucket_uri, 'foo')
825
826    self.RunGsUtil(['cp', self._get_test_file('test.mp3'), dsturi])
827
828    # Use @Retry as hedge against bucket listing eventual consistency.
829    @Retry(AssertionError, tries=3, timeout_secs=1)
830    def _Check1():
831      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
832      if IS_WINDOWS:
833        self.assertTrue(
834            re.search(r'Content-Type:\s+audio/x-mpg', stdout) or
835            re.search(r'Content-Type:\s+audio/mpeg', stdout))
836      else:
837        self.assertRegex(stdout, r'Content-Type:\s+audio/mpeg')
838
839    _Check1()
840
841    self.RunGsUtil(['cp', self._get_test_file('test.gif'), dsturi])
842
843    # Use @Retry as hedge against bucket listing eventual consistency.
844    @Retry(AssertionError, tries=3, timeout_secs=1)
845    def _Check2():
846      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
847      self.assertRegex(stdout, r'Content-Type:\s+image/gif')
848
849    _Check2()
850
851  def test_content_type_override_default(self):
852    """Tests overriding content type with the default value."""
853    bucket_uri = self.CreateBucket()
854    dsturi = suri(bucket_uri, 'foo')
855
856    self.RunGsUtil(
857        ['-h', 'Content-Type:', 'cp',
858         self._get_test_file('test.mp3'), dsturi])
859
860    # Use @Retry as hedge against bucket listing eventual consistency.
861    @Retry(AssertionError, tries=3, timeout_secs=1)
862    def _Check1():
863      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
864      self.assertRegex(stdout, r'Content-Type:\s+application/octet-stream')
865
866    _Check1()
867
868    self.RunGsUtil(
869        ['-h', 'Content-Type:', 'cp',
870         self._get_test_file('test.gif'), dsturi])
871
872    # Use @Retry as hedge against bucket listing eventual consistency.
873    @Retry(AssertionError, tries=3, timeout_secs=1)
874    def _Check2():
875      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
876      self.assertRegex(stdout, r'Content-Type:\s+application/octet-stream')
877
878    _Check2()
879
880  def test_content_type_override(self):
881    """Tests overriding content type with a value."""
882    bucket_uri = self.CreateBucket()
883    dsturi = suri(bucket_uri, 'foo')
884
885    self.RunGsUtil([
886        '-h', 'Content-Type:text/plain', 'cp',
887        self._get_test_file('test.mp3'), dsturi
888    ])
889
890    # Use @Retry as hedge against bucket listing eventual consistency.
891    @Retry(AssertionError, tries=3, timeout_secs=1)
892    def _Check1():
893      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
894      self.assertRegex(stdout, r'Content-Type:\s+text/plain')
895
896    _Check1()
897
898    self.RunGsUtil([
899        '-h', 'Content-Type:text/plain', 'cp',
900        self._get_test_file('test.gif'), dsturi
901    ])
902
903    # Use @Retry as hedge against bucket listing eventual consistency.
904    @Retry(AssertionError, tries=3, timeout_secs=1)
905    def _Check2():
906      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
907      self.assertRegex(stdout, r'Content-Type:\s+text/plain')
908
909    _Check2()
910
911  @unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.')
912  @SequentialAndParallelTransfer
913  def test_magicfile_override(self):
914    """Tests content type override with magicfile value."""
915    bucket_uri = self.CreateBucket()
916    dsturi = suri(bucket_uri, 'foo')
917    fpath = self.CreateTempFile(contents=b'foo/bar\n')
918    self.RunGsUtil(['cp', fpath, dsturi])
919
920    # Use @Retry as hedge against bucket listing eventual consistency.
921    @Retry(AssertionError, tries=3, timeout_secs=1)
922    def _Check1():
923      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
924      use_magicfile = boto.config.getbool('GSUtil', 'use_magicfile', False)
925      content_type = ('text/plain'
926                      if use_magicfile else 'application/octet-stream')
927      self.assertRegex(stdout, r'Content-Type:\s+%s' % content_type)
928
929    _Check1()
930
931  @SequentialAndParallelTransfer
932  def test_content_type_mismatches(self):
933    """Tests overriding content type when it does not match the file type."""
934    bucket_uri = self.CreateBucket()
935    dsturi = suri(bucket_uri, 'foo')
936    fpath = self.CreateTempFile(contents=b'foo/bar\n')
937
938    self.RunGsUtil([
939        '-h', 'Content-Type:image/gif', 'cp',
940        self._get_test_file('test.mp3'), dsturi
941    ])
942
943    # Use @Retry as hedge against bucket listing eventual consistency.
944    @Retry(AssertionError, tries=3, timeout_secs=1)
945    def _Check1():
946      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
947      self.assertRegex(stdout, r'Content-Type:\s+image/gif')
948
949    _Check1()
950
951    self.RunGsUtil([
952        '-h', 'Content-Type:image/gif', 'cp',
953        self._get_test_file('test.gif'), dsturi
954    ])
955
956    # Use @Retry as hedge against bucket listing eventual consistency.
957    @Retry(AssertionError, tries=3, timeout_secs=1)
958    def _Check2():
959      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
960      self.assertRegex(stdout, r'Content-Type:\s+image/gif')
961
962    _Check2()
963
964    self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', fpath, dsturi])
965
966    # Use @Retry as hedge against bucket listing eventual consistency.
967    @Retry(AssertionError, tries=3, timeout_secs=1)
968    def _Check3():
969      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
970      self.assertRegex(stdout, r'Content-Type:\s+image/gif')
971
972    _Check3()
973
974  @SequentialAndParallelTransfer
975  def test_content_type_header_case_insensitive(self):
976    """Tests that content type header is treated with case insensitivity."""
977    bucket_uri = self.CreateBucket()
978    dsturi = suri(bucket_uri, 'foo')
979    fpath = self._get_test_file('test.gif')
980
981    self.RunGsUtil(['-h', 'content-Type:text/plain', 'cp', fpath, dsturi])
982
983    # Use @Retry as hedge against bucket listing eventual consistency.
984    @Retry(AssertionError, tries=3, timeout_secs=1)
985    def _Check1():
986      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
987      self.assertRegex(stdout, r'Content-Type:\s+text/plain')
988      self.assertNotRegex(stdout, r'image/gif')
989
990    _Check1()
991
992    self.RunGsUtil([
993        '-h', 'CONTENT-TYPE:image/gif', '-h', 'content-type:image/gif', 'cp',
994        fpath, dsturi
995    ])
996
997    # Use @Retry as hedge against bucket listing eventual consistency.
998    @Retry(AssertionError, tries=3, timeout_secs=1)
999    def _Check2():
1000      stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
1001      self.assertRegex(stdout, r'Content-Type:\s+image/gif')
1002      self.assertNotRegex(stdout, r'image/gif,\s*image/gif')
1003
1004    _Check2()
1005
1006  @SequentialAndParallelTransfer
1007  def test_other_headers(self):
1008    """Tests that non-content-type headers are applied successfully on copy."""
1009    bucket_uri = self.CreateBucket()
1010    dst_uri = suri(bucket_uri, 'foo')
1011    fpath = self._get_test_file('test.gif')
1012
1013    self.RunGsUtil([
1014        '-h', 'Cache-Control:public,max-age=12', '-h',
1015        'x-%s-meta-1:abcd' % self.provider_custom_meta, 'cp', fpath, dst_uri
1016    ])
1017
1018    stdout = self.RunGsUtil(['ls', '-L', dst_uri], return_stdout=True)
1019    self.assertRegex(stdout, r'Cache-Control\s*:\s*public,max-age=12')
1020    self.assertRegex(stdout, r'Metadata:\s*1:\s*abcd')
1021
1022    dst_uri2 = suri(bucket_uri, 'bar')
1023    self.RunGsUtil(['cp', dst_uri, dst_uri2])
1024    # Ensure metadata was preserved across copy.
1025    stdout = self.RunGsUtil(['ls', '-L', dst_uri2], return_stdout=True)
1026    self.assertRegex(stdout, r'Cache-Control\s*:\s*public,max-age=12')
1027    self.assertRegex(stdout, r'Metadata:\s*1:\s*abcd')
1028
1029  @SequentialAndParallelTransfer
1030  def test_request_reason_header(self):
1031    """Test that x-goog-request-header can be set using the environment variable."""
1032    os.environ['CLOUDSDK_CORE_REQUEST_REASON'] = 'b/this_is_env_reason'
1033    bucket_uri = self.CreateBucket()
1034    dst_uri = suri(bucket_uri, 'foo')
1035    fpath = self._get_test_file('test.gif')
1036    # Ensure x-goog-request-header is set in cp command
1037    stderr = self.RunGsUtil(['-D', 'cp', fpath, dst_uri], return_stderr=True)
1038    self.assertRegex(stderr,
1039                     r'\'x-goog-request-reason\': \'b/this_is_env_reason\'')
1040    # Ensure x-goog-request-header is set in ls command
1041    stderr = self.RunGsUtil(['-D', 'ls', '-L', dst_uri], return_stderr=True)
1042    self.assertRegex(stderr,
1043                     r'\'x-goog-request-reason\': \'b/this_is_env_reason\'')
1044
1045  @SequentialAndParallelTransfer
1046  @SkipForXML('XML APIs use a different debug log format.')
1047  def test_request_reason_header_persists_multiple_requests_json(self):
1048    """Test that x-goog-request-header works when cp sends multiple requests."""
1049    os.environ['CLOUDSDK_CORE_REQUEST_REASON'] = 'b/this_is_env_reason'
1050    bucket_uri = self.CreateBucket()
1051    dst_uri = suri(bucket_uri, 'foo')
1052    fpath = self._get_test_file('test.gif')
1053
1054    boto_config_for_test = ('GSUtil', 'resumable_threshold', '0')
1055    with SetBotoConfigForTest([boto_config_for_test]):
1056      stderr = self.RunGsUtil(['-D', 'cp', fpath, dst_uri], return_stderr=True)
1057
1058    # PUT follows GET request. Both need the request-reason header.
1059    reason_regex = (r'Making http GET[\s\S]*'
1060                    r'x-goog-request-reason\': \'b/this_is_env_reason[\s\S]*'
1061                    r'send: (b\')?PUT[\s\S]*x-goog-request-reason:'
1062                    r' b/this_is_env_reason')
1063    self.assertRegex(stderr, reason_regex)
1064
1065  @SequentialAndParallelTransfer
1066  @SkipForJSON('JSON API uses a different debug log format.')
1067  def test_request_reason_header_persists_multiple_requests_xml(self):
1068    """Test that x-goog-request-header works when cp sends multiple requests."""
1069    os.environ['CLOUDSDK_CORE_REQUEST_REASON'] = 'b/this_is_env_reason'
1070    bucket_uri = self.CreateBucket()
1071    dst_uri = suri(bucket_uri, 'foo')
1072    fpath = self._get_test_file('test.gif')
1073
1074    boto_config_for_test = ('GSUtil', 'resumable_threshold', '0')
1075    with SetBotoConfigForTest([boto_config_for_test]):
1076      stderr = self.RunGsUtil(['-D', 'cp', fpath, dst_uri], return_stderr=True)
1077
1078    reason_regex = (
1079        r'Final headers: \{[\s\S]*\''
1080        r'x-goog-request-reason\': \'b/this_is_env_reason\'[\s\S]*}')
1081
1082    # Pattern should match twice since two requests should have a reason header.
1083    self.assertRegex(stderr, reason_regex + r'[\s\S]*' + reason_regex)
1084
1085  @SequentialAndParallelTransfer
1086  def test_versioning(self):
1087    """Tests copy with versioning."""
1088    bucket_uri = self.CreateVersionedBucket()
1089    k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data2')
1090    k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data1')
1091    g1 = urigen(k2_uri)
1092    self.RunGsUtil(['cp', suri(k1_uri), suri(k2_uri)])
1093    k2_uri = bucket_uri.clone_replace_name(k2_uri.object_name)
1094    k2_uri = bucket_uri.clone_replace_key(k2_uri.get_key())
1095    g2 = urigen(k2_uri)
1096    k2_uri.set_contents_from_string('data3')
1097    g3 = urigen(k2_uri)
1098
1099    fpath = self.CreateTempFile()
1100    # Check to make sure current version is data3.
1101    self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath])
1102    with open(fpath, 'rb') as f:
1103      self.assertEqual(f.read(), b'data3')
1104
1105    # Check contents of all three versions
1106    self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), fpath])
1107    with open(fpath, 'rb') as f:
1108      self.assertEqual(f.read(), b'data1')
1109    self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g2), fpath])
1110    with open(fpath, 'rb') as f:
1111      self.assertEqual(f.read(), b'data2')
1112    self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g3), fpath])
1113    with open(fpath, 'rb') as f:
1114      self.assertEqual(f.read(), b'data3')
1115
1116    # Copy first version to current and verify.
1117    self.RunGsUtil(
1118        ['cp',
1119         '%s#%s' % (k2_uri.versionless_uri, g1), k2_uri.versionless_uri])
1120    self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath])
1121    with open(fpath, 'rb') as f:
1122      self.assertEqual(f.read(), b'data1')
1123
1124    # Attempt to specify a version-specific URI for destination.
1125    stderr = self.RunGsUtil(['cp', fpath, k2_uri.uri],
1126                            return_stderr=True,
1127                            expected_status=1)
1128    self.assertIn('cannot be the destination for gsutil cp', stderr)
1129
1130  def test_versioning_no_parallelism(self):
1131    """Tests that copy all-versions errors when parallelism is enabled."""
1132    # TODO(b/135780661): Remove retry after bug resolved
1133    @Retry(AssertionError, tries=3, timeout_secs=1)
1134    def _Check():
1135      stderr = self.RunGsUtil([
1136          '-m', 'cp', '-A',
1137          suri(self.nonexistent_bucket_name, 'foo'),
1138          suri(self.nonexistent_bucket_name, 'bar')
1139      ],
1140                              expected_status=1,
1141                              return_stderr=True)
1142      self.assertIn('-m option is not supported with the cp -A flag', stderr)
1143
1144    _Check()
1145
1146  @SkipForS3('S3 lists versioned objects in reverse timestamp order.')
1147  def test_recursive_copying_versioned_bucket(self):
1148    """Tests cp -R with versioned buckets."""
1149    bucket1_uri = self.CreateVersionedBucket()
1150    bucket2_uri = self.CreateVersionedBucket()
1151    bucket3_uri = self.CreateVersionedBucket()
1152
1153    # Write two versions of an object to the bucket1.
1154    v1_uri = self.CreateObject(bucket_uri=bucket1_uri,
1155                               object_name='k',
1156                               contents=b'data0')
1157    self.CreateObject(bucket_uri=bucket1_uri,
1158                      object_name='k',
1159                      contents=b'longer_data1',
1160                      gs_idempotent_generation=urigen(v1_uri))
1161
1162    self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True)
1163    self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True)
1164    self.AssertNObjectsInBucket(bucket3_uri, 0, versioned=True)
1165
1166    # Recursively copy to second versioned bucket.
1167    # -A flag should copy all versions in order.
1168    self.RunGsUtil(
1169        ['cp', '-R', '-A',
1170         suri(bucket1_uri, '*'),
1171         suri(bucket2_uri)])
1172
1173    # Use @Retry as hedge against bucket listing eventual consistency.
1174    @Retry(AssertionError, tries=3, timeout_secs=1)
1175    def _Check2():
1176      """Validates the results of the cp -R."""
1177      listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)],
1178                                return_stdout=True).split('\n')
1179      listing2 = self.RunGsUtil(['ls', '-la', suri(bucket2_uri)],
1180                                return_stdout=True).split('\n')
1181      # 2 lines of listing output, 1 summary line, 1 empty line from \n split.
1182      self.assertEquals(len(listing1), 4)
1183      self.assertEquals(len(listing2), 4)
1184
1185      # First object in each bucket should match in size and version-less name.
1186      size1, _, uri_str1, _ = listing1[0].split()
1187      self.assertEquals(size1, str(len('data0')))
1188      self.assertEquals(storage_uri(uri_str1).object_name, 'k')
1189      size2, _, uri_str2, _ = listing2[0].split()
1190      self.assertEquals(size2, str(len('data0')))
1191      self.assertEquals(storage_uri(uri_str2).object_name, 'k')
1192
1193      # Similarly for second object in each bucket.
1194      size1, _, uri_str1, _ = listing1[1].split()
1195      self.assertEquals(size1, str(len('longer_data1')))
1196      self.assertEquals(storage_uri(uri_str1).object_name, 'k')
1197      size2, _, uri_str2, _ = listing2[1].split()
1198      self.assertEquals(size2, str(len('longer_data1')))
1199      self.assertEquals(storage_uri(uri_str2).object_name, 'k')
1200
1201    _Check2()
1202
1203    # Recursively copy to second versioned bucket with no -A flag.
1204    # This should copy only the live object.
1205    self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), suri(bucket3_uri)])
1206
1207    # Use @Retry as hedge against bucket listing eventual consistency.
1208    @Retry(AssertionError, tries=3, timeout_secs=1)
1209    def _Check3():
1210      """Validates the results of the cp -R."""
1211      listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)],
1212                                return_stdout=True).split('\n')
1213      listing2 = self.RunGsUtil(['ls', '-la', suri(bucket3_uri)],
1214                                return_stdout=True).split('\n')
1215      # 2 lines of listing output, 1 summary line, 1 empty line from \n split.
1216      self.assertEquals(len(listing1), 4)
1217      # 1 lines of listing output, 1 summary line, 1 empty line from \n split.
1218      self.assertEquals(len(listing2), 3)
1219
1220      # Live (second) object in bucket 1 should match the single live object.
1221      size1, _, uri_str1, _ = listing2[0].split()
1222      self.assertEquals(size1, str(len('longer_data1')))
1223      self.assertEquals(storage_uri(uri_str1).object_name, 'k')
1224
1225    _Check3()
1226
1227  @SequentialAndParallelTransfer
1228  @SkipForS3('Preconditions not supported for S3.')
1229  def test_cp_generation_zero_match(self):
1230    """Tests that cp handles an object-not-exists precondition header."""
1231    bucket_uri = self.CreateBucket()
1232    fpath1 = self.CreateTempFile(contents=b'data1')
1233    # Match 0 means only write the object if it doesn't already exist.
1234    gen_match_header = 'x-goog-if-generation-match:0'
1235
1236    # First copy should succeed.
1237    # TODO: This can fail (rarely) if the server returns a 5xx but actually
1238    # commits the bytes. If we add restarts on small uploads, handle this
1239    # case.
1240    self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(bucket_uri)])
1241
1242    # Second copy should fail with a precondition error.
1243    stderr = self.RunGsUtil(
1244        ['-h', gen_match_header, 'cp', fpath1,
1245         suri(bucket_uri)],
1246        return_stderr=True,
1247        expected_status=1)
1248    self.assertIn('PreconditionException', stderr)
1249
1250  @SequentialAndParallelTransfer
1251  @SkipForS3('Preconditions not supported for S3.')
1252  def test_cp_v_generation_match(self):
1253    """Tests that cp -v option handles the if-generation-match header."""
1254    bucket_uri = self.CreateVersionedBucket()
1255    k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data1')
1256    g1 = k1_uri.generation
1257
1258    tmpdir = self.CreateTempDir()
1259    fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data2')
1260
1261    gen_match_header = 'x-goog-if-generation-match:%s' % g1
1262    # First copy should succeed.
1263    self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(k1_uri)])
1264
1265    # Second copy should fail the precondition.
1266    stderr = self.RunGsUtil(
1267        ['-h', gen_match_header, 'cp', fpath1,
1268         suri(k1_uri)],
1269        return_stderr=True,
1270        expected_status=1)
1271
1272    self.assertIn('PreconditionException', stderr)
1273
1274    # Specifiying a generation with -n should fail before the request hits the
1275    # server.
1276    stderr = self.RunGsUtil(
1277        ['-h', gen_match_header, 'cp', '-n', fpath1,
1278         suri(k1_uri)],
1279        return_stderr=True,
1280        expected_status=1)
1281
1282    self.assertIn('ArgumentException', stderr)
1283    self.assertIn(
1284        'Specifying x-goog-if-generation-match is not supported '
1285        'with cp -n', stderr)
1286
1287  @SequentialAndParallelTransfer
1288  def test_cp_nv(self):
1289    """Tests that cp -nv works when skipping existing file."""
1290    bucket_uri = self.CreateVersionedBucket()
1291    k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data1')
1292
1293    tmpdir = self.CreateTempDir()
1294    fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data2')
1295
1296    # First copy should succeed.
1297    self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)])
1298
1299    # Second copy should skip copying.
1300    stderr = self.RunGsUtil(
1301        ['cp', '-nv', fpath1, suri(k1_uri)], return_stderr=True)
1302    self.assertIn('Skipping existing item:', stderr)
1303
1304  @SequentialAndParallelTransfer
1305  @SkipForS3('S3 lists versioned objects in reverse timestamp order.')
1306  def test_cp_v_option(self):
1307    """"Tests that cp -v returns the created object's version-specific URI."""
1308    bucket_uri = self.CreateVersionedBucket()
1309    k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data1')
1310    k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'data2')
1311
1312    # Case 1: Upload file to object using one-shot PUT.
1313    tmpdir = self.CreateTempDir()
1314    fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data1')
1315    self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri)
1316
1317    # Case 2: Upload file to object using resumable upload.
1318    size_threshold = ONE_KIB
1319    boto_config_for_test = ('GSUtil', 'resumable_threshold',
1320                            str(size_threshold))
1321    with SetBotoConfigForTest([boto_config_for_test]):
1322      file_as_string = os.urandom(size_threshold)
1323      tmpdir = self.CreateTempDir()
1324      fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=file_as_string)
1325      self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri)
1326
1327    # Case 3: Upload stream to object.
1328    self._run_cp_minus_v_test('-v', '-', k2_uri.uri)
1329
1330    # Case 4: Download object to file. For this case we just expect output of
1331    # gsutil cp -v to be the URI of the file.
1332    tmpdir = self.CreateTempDir()
1333    fpath1 = self.CreateTempFile(tmpdir=tmpdir)
1334    dst_uri = storage_uri(fpath1)
1335    stderr = self.RunGsUtil(
1336        ['cp', '-v', suri(k1_uri), suri(dst_uri)], return_stderr=True)
1337    # TODO: Add ordering assertion (should be in stderr.split('\n)[-2]) back
1338    # once both the creation and status messages are handled by the UI thread.
1339    self.assertIn('Created: %s\n' % dst_uri.uri, stderr)
1340
1341    # Case 5: Daisy-chain from object to object.
1342    self._run_cp_minus_v_test('-Dv', k1_uri.uri, k2_uri.uri)
1343
1344    # Case 6: Copy object to object in-the-cloud.
1345    self._run_cp_minus_v_test('-v', k1_uri.uri, k2_uri.uri)
1346
1347  def _run_cp_minus_v_test(self, opt, src_str, dst_str):
1348    """Runs cp -v with the options and validates the results."""
1349    stderr = self.RunGsUtil(['cp', opt, src_str, dst_str], return_stderr=True)
1350    match = re.search(r'Created: (.*)\n', stderr)
1351    self.assertIsNotNone(match)
1352    created_uri = match.group(1)
1353
1354    # Use @Retry as hedge against bucket listing eventual consistency.
1355    @Retry(AssertionError, tries=3, timeout_secs=1)
1356    def _Check1():
1357      stdout = self.RunGsUtil(['ls', '-a', dst_str], return_stdout=True)
1358      lines = stdout.split('\n')
1359      # Final (most recent) object should match the "Created:" URI. This is
1360      # in second-to-last line (last line is '\n').
1361      self.assertGreater(len(lines), 2)
1362      self.assertEqual(created_uri, lines[-2])
1363
1364    _Check1()
1365
1366  @SequentialAndParallelTransfer
1367  def test_stdin_args(self):
1368    """Tests cp with the -I option."""
1369    tmpdir = self.CreateTempDir()
1370    fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data1')
1371    fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data2')
1372    bucket_uri = self.CreateBucket()
1373    self.RunGsUtil(['cp', '-I', suri(bucket_uri)],
1374                   stdin='\n'.join((fpath1, fpath2)))
1375
1376    # Use @Retry as hedge against bucket listing eventual consistency.
1377    @Retry(AssertionError, tries=3, timeout_secs=1)
1378    def _Check1():
1379      stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)
1380      self.assertIn(os.path.basename(fpath1), stdout)
1381      self.assertIn(os.path.basename(fpath2), stdout)
1382      self.assertNumLines(stdout, 2)
1383
1384    _Check1()
1385
1386  def test_cross_storage_class_cloud_cp(self):
1387    bucket1_uri = self.CreateBucket(storage_class='standard')
1388    bucket2_uri = self.CreateBucket(
1389        storage_class='durable_reduced_availability')
1390    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents=b'foo')
1391    # Server now allows copy-in-the-cloud across storage classes.
1392    self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)])
1393
1394  @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
1395  def test_cross_provider_cp(self):
1396    s3_bucket = self.CreateBucket(provider='s3')
1397    gs_bucket = self.CreateBucket(provider='gs')
1398    s3_key = self.CreateObject(bucket_uri=s3_bucket, contents=b'foo')
1399    gs_key = self.CreateObject(bucket_uri=gs_bucket, contents=b'bar')
1400    self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
1401    self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)])
1402
1403  @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
1404  @unittest.skip('This test performs a large copy but remains here for '
1405                 'debugging purposes.')
1406  def test_cross_provider_large_cp(self):
1407    s3_bucket = self.CreateBucket(provider='s3')
1408    gs_bucket = self.CreateBucket(provider='gs')
1409    s3_key = self.CreateObject(bucket_uri=s3_bucket,
1410                               contents=b'f' * 1024 * 1024)
1411    gs_key = self.CreateObject(bucket_uri=gs_bucket,
1412                               contents=b'b' * 1024 * 1024)
1413    self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
1414    self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)])
1415    with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', str(ONE_KIB)),
1416                               ('GSUtil', 'json_resumable_chunk_size',
1417                                str(ONE_KIB * 256))]):
1418      # Ensure copy also works across json upload chunk boundaries.
1419      self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
1420
1421  @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
1422  def test_gs_to_s3_multipart_cp(self):
1423    """Ensure daisy_chain works for an object that is downloaded in 2 parts."""
1424    s3_bucket = self.CreateBucket(provider='s3')
1425    gs_bucket = self.CreateBucket(provider='gs', prefer_json_api=True)
1426    num_bytes = int(_DEFAULT_DOWNLOAD_CHUNK_SIZE * 1.1)
1427    gs_key = self.CreateObject(bucket_uri=gs_bucket,
1428                               contents=b'b' * num_bytes,
1429                               prefer_json_api=True)
1430    self.RunGsUtil([
1431        '-o', 's3:use-sigv4=True', '-o', 's3:host=s3.amazonaws.com', 'cp',
1432        suri(gs_key),
1433        suri(s3_bucket)
1434    ])
1435
1436  @unittest.skip('This test is slow due to creating many objects, '
1437                 'but remains here for debugging purposes.')
1438  def test_daisy_chain_cp_file_sizes(self):
1439    """Ensure daisy chain cp works with a wide of file sizes."""
1440    bucket_uri = self.CreateBucket()
1441    bucket2_uri = self.CreateBucket()
1442    exponent_cap = 28  # Up to 256 MiB in size.
1443    for i in range(exponent_cap):
1444      one_byte_smaller = 2**i - 1
1445      normal = 2**i
1446      one_byte_larger = 2**i + 1
1447      self.CreateObject(bucket_uri=bucket_uri, contents=b'a' * one_byte_smaller)
1448      self.CreateObject(bucket_uri=bucket_uri, contents=b'b' * normal)
1449      self.CreateObject(bucket_uri=bucket_uri, contents=b'c' * one_byte_larger)
1450
1451    self.AssertNObjectsInBucket(bucket_uri, exponent_cap * 3)
1452    self.RunGsUtil(
1453        ['-m', 'cp', '-D',
1454         suri(bucket_uri, '**'),
1455         suri(bucket2_uri)])
1456
1457    self.AssertNObjectsInBucket(bucket2_uri, exponent_cap * 3)
1458
1459  def test_daisy_chain_cp(self):
1460    """Tests cp with the -D option."""
1461    bucket1_uri = self.CreateBucket(storage_class='standard')
1462    bucket2_uri = self.CreateBucket(
1463        storage_class='durable_reduced_availability')
1464    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents=b'foo')
1465    # Set some headers on source object so we can verify that headers are
1466    # presereved by daisy-chain copy.
1467    self.RunGsUtil([
1468        'setmeta', '-h', 'Cache-Control:public,max-age=12', '-h',
1469        'Content-Type:image/gif', '-h',
1470        'x-%s-meta-1:abcd' % self.provider_custom_meta,
1471        suri(key_uri)
1472    ])
1473    # Set public-read (non-default) ACL so we can verify that cp -D -p works.
1474    self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)])
1475    acl_json = self.RunGsUtil(['acl', 'get', suri(key_uri)], return_stdout=True)
1476    # Perform daisy-chain copy and verify that source object headers and ACL
1477    # were preserved. Also specify -n option to test that gsutil correctly
1478    # removes the x-goog-if-generation-match:0 header that was set at uploading
1479    # time when updating the ACL.
1480    stderr = self.RunGsUtil(
1481        ['cp', '-Dpn', suri(key_uri),
1482         suri(bucket2_uri)], return_stderr=True)
1483    self.assertNotIn('Copy-in-the-cloud disallowed', stderr)
1484
1485    @Retry(AssertionError, tries=3, timeout_secs=1)
1486    def _Check():
1487      uri = suri(bucket2_uri, key_uri.object_name)
1488      stdout = self.RunGsUtil(['ls', '-L', uri], return_stdout=True)
1489      self.assertRegex(stdout, r'Cache-Control:\s+public,max-age=12')
1490      self.assertRegex(stdout, r'Content-Type:\s+image/gif')
1491      self.assertRegex(stdout, r'Metadata:\s+1:\s+abcd')
1492      new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True)
1493      self.assertEqual(acl_json, new_acl_json)
1494
1495    _Check()
1496
1497  @unittest.skipUnless(
1498      not HAS_GS_PORT, 'gs_port is defined in config which can cause '
1499      'problems when uploading and downloading to the same local host port')
1500  def test_daisy_chain_cp_download_failure(self):
1501    """Tests cp with the -D option when the download thread dies."""
1502    bucket1_uri = self.CreateBucket()
1503    bucket2_uri = self.CreateBucket()
1504    key_uri = self.CreateObject(bucket_uri=bucket1_uri,
1505                                contents=b'a' * self.halt_size)
1506    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
1507    test_callback_file = self.CreateTempFile(
1508        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
1509    with SetBotoConfigForTest([boto_config_for_test]):
1510      stderr = self.RunGsUtil([
1511          'cp', '--testcallbackfile', test_callback_file, '-D',
1512          suri(key_uri),
1513          suri(bucket2_uri)
1514      ],
1515                              expected_status=1,
1516                              return_stderr=True)
1517      # Should have three exception traces; one from the download thread and
1518      # two from the upload thread (expection message is repeated in main's
1519      # _OutputAndExit).
1520      self.assertEqual(
1521          stderr.count(
1522              'ResumableDownloadException: Artifically halting download'), 3)
1523
1524  def test_streaming_gzip_upload(self):
1525    """Tests error when compression flag is requested on a streaming source."""
1526    bucket_uri = self.CreateBucket()
1527    stderr = self.RunGsUtil(
1528        ['cp', '-Z', '-', suri(bucket_uri, 'foo')],
1529        return_stderr=True,
1530        expected_status=1,
1531        stdin='streaming data')
1532    self.assertIn(
1533        'gzip compression is not currently supported on streaming uploads',
1534        stderr)
1535
1536  def test_seek_ahead_upload_cp(self):
1537    """Tests that the seek-ahead iterator estimates total upload work."""
1538    tmpdir = self.CreateTempDir(test_files=3)
1539    bucket_uri = self.CreateBucket()
1540
1541    with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '1'),
1542                               ('GSUtil', 'task_estimation_force', 'True')]):
1543      stderr = self.RunGsUtil(
1544          ['-m', 'cp', '-r', tmpdir, suri(bucket_uri)], return_stderr=True)
1545      self.assertIn(
1546          'Estimated work for this command: objects: 3, total size: 18', stderr)
1547
1548    with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '0'),
1549                               ('GSUtil', 'task_estimation_force', 'True')]):
1550      stderr = self.RunGsUtil(
1551          ['-m', 'cp', '-r', tmpdir, suri(bucket_uri)], return_stderr=True)
1552      self.assertNotIn('Estimated work', stderr)
1553
1554  def test_seek_ahead_download_cp(self):
1555    tmpdir = self.CreateTempDir()
1556    bucket_uri = self.CreateBucket(test_objects=3)
1557    self.AssertNObjectsInBucket(bucket_uri, 3)
1558
1559    with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '1'),
1560                               ('GSUtil', 'task_estimation_force', 'True')]):
1561      stderr = self.RunGsUtil(
1562          ['-m', 'cp', '-r', suri(bucket_uri), tmpdir], return_stderr=True)
1563      self.assertIn(
1564          'Estimated work for this command: objects: 3, total size: 18', stderr)
1565
1566    with SetBotoConfigForTest([('GSUtil', 'task_estimation_threshold', '0'),
1567                               ('GSUtil', 'task_estimation_force', 'True')]):
1568      stderr = self.RunGsUtil(
1569          ['-m', 'cp', '-r', suri(bucket_uri), tmpdir], return_stderr=True)
1570      self.assertNotIn('Estimated work', stderr)
1571
1572  def test_canned_acl_cp(self):
1573    """Tests copying with a canned ACL."""
1574    bucket1_uri = self.CreateBucket()
1575    bucket2_uri = self.CreateBucket()
1576    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents=b'foo')
1577    self.RunGsUtil(
1578        ['cp', '-a', 'public-read',
1579         suri(key_uri),
1580         suri(bucket2_uri)])
1581    # Set public-read on the original key after the copy so we can compare
1582    # the ACLs.
1583    self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)])
1584    public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)],
1585                                     return_stdout=True)
1586
1587    @Retry(AssertionError, tries=3, timeout_secs=1)
1588    def _Check():
1589      uri = suri(bucket2_uri, key_uri.object_name)
1590      new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True)
1591      self.assertEqual(public_read_acl, new_acl_json)
1592
1593    _Check()
1594
1595  @SequentialAndParallelTransfer
1596  def test_canned_acl_upload(self):
1597    """Tests uploading a file with a canned ACL."""
1598    bucket1_uri = self.CreateBucket()
1599    key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents=b'foo')
1600    # Set public-read on the object so we can compare the ACLs.
1601    self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)])
1602    public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)],
1603                                     return_stdout=True)
1604
1605    file_name = 'bar'
1606    fpath = self.CreateTempFile(file_name=file_name, contents=b'foo')
1607    self.RunGsUtil(['cp', '-a', 'public-read', fpath, suri(bucket1_uri)])
1608    new_acl_json = self.RunGsUtil(
1609        ['acl', 'get', suri(bucket1_uri, file_name)], return_stdout=True)
1610    self.assertEqual(public_read_acl, new_acl_json)
1611
1612    resumable_size = ONE_KIB
1613    boto_config_for_test = ('GSUtil', 'resumable_threshold',
1614                            str(resumable_size))
1615    with SetBotoConfigForTest([boto_config_for_test]):
1616      resumable_file_name = 'resumable_bar'
1617      resumable_contents = os.urandom(resumable_size)
1618      resumable_fpath = self.CreateTempFile(file_name=resumable_file_name,
1619                                            contents=resumable_contents)
1620      self.RunGsUtil(
1621          ['cp', '-a', 'public-read', resumable_fpath,
1622           suri(bucket1_uri)])
1623      new_resumable_acl_json = self.RunGsUtil(
1624          ['acl', 'get', suri(bucket1_uri, resumable_file_name)],
1625          return_stdout=True)
1626      self.assertEqual(public_read_acl, new_resumable_acl_json)
1627
1628  def test_cp_key_to_local_stream(self):
1629    bucket_uri = self.CreateBucket()
1630    contents = b'foo'
1631    key_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents)
1632    stdout = self.RunGsUtil(['cp', suri(key_uri), '-'], return_stdout=True)
1633    self.assertIn(contents, stdout.encode('ascii'))
1634
1635  def test_cp_local_file_to_local_stream(self):
1636    contents = b'content'
1637    fpath = self.CreateTempFile(contents=contents)
1638    stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True)
1639    self.assertIn(contents, stdout.encode(UTF8))
1640
1641  @SequentialAndParallelTransfer
1642  def test_cp_zero_byte_file(self):
1643    dst_bucket_uri = self.CreateBucket()
1644    src_dir = self.CreateTempDir()
1645    fpath = os.path.join(src_dir, 'zero_byte')
1646    with open(fpath, 'w') as unused_out_file:
1647      pass  # Write a zero byte file
1648    self.RunGsUtil(['cp', fpath, suri(dst_bucket_uri)])
1649
1650    @Retry(AssertionError, tries=3, timeout_secs=1)
1651    def _Check1():
1652      stdout = self.RunGsUtil(['ls', suri(dst_bucket_uri)], return_stdout=True)
1653      self.assertIn(os.path.basename(fpath), stdout)
1654
1655    _Check1()
1656
1657    download_path = os.path.join(src_dir, 'zero_byte_download')
1658    self.RunGsUtil(['cp', suri(dst_bucket_uri, 'zero_byte'), download_path])
1659    self.assertTrue(os.stat(download_path))
1660
1661  def test_copy_bucket_to_bucket(self):
1662    """Tests recursively copying from bucket to bucket.
1663
1664    This should produce identically named objects (and not, in particular,
1665    destination objects named by the version-specific URI from source objects).
1666    """
1667    src_bucket_uri = self.CreateVersionedBucket()
1668    dst_bucket_uri = self.CreateVersionedBucket()
1669    self.CreateObject(bucket_uri=src_bucket_uri,
1670                      object_name='obj0',
1671                      contents=b'abc')
1672    self.CreateObject(bucket_uri=src_bucket_uri,
1673                      object_name='obj1',
1674                      contents=b'def')
1675
1676    # Use @Retry as hedge against bucket listing eventual consistency.
1677    @Retry(AssertionError, tries=3, timeout_secs=1)
1678    def _CopyAndCheck():
1679      self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), suri(dst_bucket_uri)])
1680      stdout = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri],
1681                              return_stdout=True)
1682      self.assertIn(
1683          '%s%s/obj0\n' % (dst_bucket_uri, src_bucket_uri.bucket_name), stdout)
1684      self.assertIn(
1685          '%s%s/obj1\n' % (dst_bucket_uri, src_bucket_uri.bucket_name), stdout)
1686
1687    _CopyAndCheck()
1688
1689  @SkipForGS('Only s3 V4 signatures error on location mismatches.')
1690  def test_copy_bucket_to_bucket_with_location_redirect(self):
1691    # cp uses a sender function that raises an exception on location mismatches,
1692    # instead of returning a response. This integration test ensures retries
1693    # from exceptions work correctly.
1694
1695    src_bucket_region = 'ap-east-1'
1696    dest_bucket_region = 'us-east-2'
1697    src_bucket_host = 's3.%s.amazonaws.com' % src_bucket_region
1698    dest_bucket_host = 's3.%s.amazonaws.com' % dest_bucket_region
1699    client_host = 's3.eu-west-1.amazonaws.com'
1700
1701    with SetBotoConfigForTest([('s3', 'host', src_bucket_host)]):
1702      src_bucket_uri = self.CreateBucket(location=src_bucket_region)
1703      self.CreateObject(bucket_uri=src_bucket_uri,
1704                        object_name='obj0',
1705                        contents=b'abc')
1706      self.CreateObject(bucket_uri=src_bucket_uri,
1707                        object_name='obj1',
1708                        contents=b'def')
1709
1710    with SetBotoConfigForTest([('s3', 'host', dest_bucket_host)]):
1711      dst_bucket_uri = self.CreateBucket(location=dest_bucket_region)
1712
1713    # Use @Retry as hedge against bucket listing eventual consistency.
1714    @Retry(AssertionError, tries=3, timeout_secs=1)
1715    def _CopyAndCheck():
1716      self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), suri(dst_bucket_uri)])
1717      stdout = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri],
1718                              return_stdout=True)
1719      self.assertIn(
1720          '%s%s/obj0\n' % (dst_bucket_uri, src_bucket_uri.bucket_name), stdout)
1721      self.assertIn(
1722          '%s%s/obj1\n' % (dst_bucket_uri, src_bucket_uri.bucket_name), stdout)
1723
1724    with SetBotoConfigForTest([('s3', 'host', client_host)]):
1725      _CopyAndCheck()
1726
1727  def test_copy_bucket_to_dir(self):
1728    """Tests recursively copying from bucket to a directory.
1729
1730    This should produce identically named objects (and not, in particular,
1731    destination objects named by the version- specific URI from source objects).
1732    """
1733    src_bucket_uri = self.CreateBucket()
1734    dst_dir = self.CreateTempDir()
1735    self.CreateObject(bucket_uri=src_bucket_uri,
1736                      object_name='obj0',
1737                      contents=b'abc')
1738    self.CreateObject(bucket_uri=src_bucket_uri,
1739                      object_name='obj1',
1740                      contents=b'def')
1741
1742    # Use @Retry as hedge against bucket listing eventual consistency.
1743    @Retry(AssertionError, tries=3, timeout_secs=1)
1744    def _CopyAndCheck():
1745      """Copies the bucket recursively and validates the results."""
1746      self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir])
1747      dir_list = []
1748      for dirname, _, filenames in os.walk(dst_dir):
1749        for filename in filenames:
1750          dir_list.append(os.path.join(dirname, filename))
1751      dir_list = sorted(dir_list)
1752      self.assertEqual(len(dir_list), 2)
1753      self.assertEqual(
1754          os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj0'),
1755          dir_list[0])
1756      self.assertEqual(
1757          os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj1'),
1758          dir_list[1])
1759
1760    _CopyAndCheck()
1761
1762  @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
1763  def test_copy_object_to_dir_s3_v4(self):
1764    """Tests copying object from s3 to local dir with v4 signature.
1765
1766    Regions like us-east2 accept only V4 signature, hence we will create
1767    the bucket in us-east2 region to enforce testing with V4 signature.
1768    """
1769    src_bucket_uri = self.CreateBucket(provider='s3', location='us-east-2')
1770    dst_dir = self.CreateTempDir()
1771    self.CreateObject(bucket_uri=src_bucket_uri,
1772                      object_name='obj0',
1773                      contents=b'abc')
1774    self.CreateObject(bucket_uri=src_bucket_uri,
1775                      object_name='obj1',
1776                      contents=b'def')
1777
1778    # Use @Retry as hedge against bucket listing eventual consistency.
1779    @Retry(AssertionError, tries=3, timeout_secs=1)
1780    def _CopyAndCheck():
1781      """Copies the bucket recursively and validates the results."""
1782      self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir])
1783      dir_list = []
1784      for dirname, _, filenames in os.walk(dst_dir):
1785        for filename in filenames:
1786          dir_list.append(os.path.join(dirname, filename))
1787      dir_list = sorted(dir_list)
1788      self.assertEqual(len(dir_list), 2)
1789      self.assertEqual(
1790          os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj0'),
1791          dir_list[0])
1792      self.assertEqual(
1793          os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj1'),
1794          dir_list[1])
1795
1796    _CopyAndCheck()
1797
1798  @SkipForS3('The boto lib used for S3 does not handle objects '
1799             'starting with slashes if we use V4 signature')
1800  def test_recursive_download_with_leftover_slash_only_dir_placeholder(self):
1801    """Tests that we correctly handle leftover dir placeholders."""
1802    src_bucket_uri = self.CreateBucket()
1803    dst_dir = self.CreateTempDir()
1804    self.CreateObject(bucket_uri=src_bucket_uri,
1805                      object_name='obj0',
1806                      contents=b'abc')
1807    self.CreateObject(bucket_uri=src_bucket_uri,
1808                      object_name='obj1',
1809                      contents=b'def')
1810
1811    # Create a placeholder like what can be left over by web GUI tools.
1812    key_uri = src_bucket_uri.clone_replace_name('/')
1813    key_uri.set_contents_from_string('')
1814    self.AssertNObjectsInBucket(src_bucket_uri, 3)
1815
1816    self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir])
1817    dir_list = []
1818    for dirname, _, filenames in os.walk(dst_dir):
1819      for filename in filenames:
1820        dir_list.append(os.path.join(dirname, filename))
1821    dir_list = sorted(dir_list)
1822    self.assertEqual(len(dir_list), 2)
1823    self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj0'),
1824                     dir_list[0])
1825    self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj1'),
1826                     dir_list[1])
1827
1828  def test_recursive_download_with_leftover_dir_placeholder(self):
1829    """Tests that we correctly handle leftover dir placeholders."""
1830    src_bucket_uri = self.CreateBucket()
1831    dst_dir = self.CreateTempDir()
1832    self.CreateObject(bucket_uri=src_bucket_uri,
1833                      object_name='obj0',
1834                      contents=b'abc')
1835    self.CreateObject(bucket_uri=src_bucket_uri,
1836                      object_name='obj1',
1837                      contents=b'def')
1838
1839    # Create a placeholder like what can be left over by web GUI tools.
1840    key_uri = src_bucket_uri.clone_replace_name('foo/')
1841    key_uri.set_contents_from_string('')
1842    self.AssertNObjectsInBucket(src_bucket_uri, 3)
1843
1844    self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir])
1845    dir_list = []
1846    for dirname, _, filenames in os.walk(dst_dir):
1847      for filename in filenames:
1848        dir_list.append(os.path.join(dirname, filename))
1849    dir_list = sorted(dir_list)
1850    self.assertEqual(len(dir_list), 2)
1851    self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj0'),
1852                     dir_list[0])
1853    self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, 'obj1'),
1854                     dir_list[1])
1855
1856  def test_copy_quiet(self):
1857    bucket_uri = self.CreateBucket()
1858    key_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'foo')
1859    stderr = self.RunGsUtil(
1860        ['-q', 'cp',
1861         suri(key_uri),
1862         suri(bucket_uri.clone_replace_name('o2'))],
1863        return_stderr=True)
1864    self.assertEqual(stderr.count('Copying '), 0)
1865
1866  def test_cp_md5_match(self):
1867    """Tests that the uploaded object has the expected MD5.
1868
1869    Note that while this does perform a file to object upload, MD5's are
1870    not supported for composite objects so we don't use the decorator in this
1871    case.
1872    """
1873    bucket_uri = self.CreateBucket()
1874    fpath = self.CreateTempFile(contents=b'bar')
1875    with open(fpath, 'rb') as f_in:
1876      md5 = binascii.unhexlify(CalculateMd5FromContents(f_in))
1877      try:
1878        encoded_bytes = base64.encodebytes(md5)
1879      except AttributeError:
1880        # For Python 2 compatability.
1881        encoded_bytes = base64.encodestring(md5)
1882      file_md5 = encoded_bytes.rstrip(b'\n')
1883    self.RunGsUtil(['cp', fpath, suri(bucket_uri)])
1884
1885    # Use @Retry as hedge against bucket listing eventual consistency.
1886    @Retry(AssertionError, tries=3, timeout_secs=1)
1887    def _Check1():
1888      stdout = self.RunGsUtil(['ls', '-L', suri(bucket_uri)],
1889                              return_stdout=True)
1890      self.assertRegex(
1891          stdout, r'Hash\s+\(md5\):\s+%s' % re.escape(file_md5.decode('ascii')))
1892
1893    _Check1()
1894
1895  @unittest.skipIf(IS_WINDOWS,
1896                   'Unicode handling on Windows requires mods to site-packages')
1897  @SequentialAndParallelTransfer
1898  def test_cp_manifest_upload_unicode(self):
1899    return self._ManifestUpload('foo-unicöde'.encode(UTF8),
1900                                'bar-unicöde'.encode(UTF8),
1901                                'manifest-unicöde'.encode(UTF8))
1902
1903  @SequentialAndParallelTransfer
1904  def test_cp_manifest_upload(self):
1905    """Tests uploading with a mnifest file."""
1906    return self._ManifestUpload('foo', 'bar', 'manifest')
1907
1908  def _ManifestUpload(self, file_name, object_name, manifest_name):
1909    """Tests uploading with a manifest file."""
1910    bucket_uri = self.CreateBucket()
1911    dsturi = suri(bucket_uri, object_name)
1912
1913    fpath = self.CreateTempFile(file_name=file_name, contents=b'bar')
1914    logpath = self.CreateTempFile(file_name=manifest_name, contents=b'')
1915    # Ensure the file is empty.
1916    open(logpath, 'w').close()
1917    self.RunGsUtil(['cp', '-L', logpath, fpath, dsturi])
1918
1919    with open(logpath, 'r') as f:
1920      lines = f.readlines()
1921    if six.PY2:
1922      lines = [six.text_type(line, UTF8) for line in lines]
1923
1924    self.assertEqual(len(lines), 2)
1925
1926    expected_headers = [
1927        'Source', 'Destination', 'Start', 'End', 'Md5', 'UploadId',
1928        'Source Size', 'Bytes Transferred', 'Result', 'Description'
1929    ]
1930    self.assertEqual(expected_headers, lines[0].strip().split(','))
1931    results = lines[1].strip().split(',')
1932
1933    results = dict(zip(expected_headers, results))
1934
1935    self.assertEqual(
1936        results['Source'],
1937        'file://' + fpath,
1938    )
1939    self.assertEqual(
1940        results['Destination'],
1941        dsturi,
1942    )
1943
1944    date_format = '%Y-%m-%dT%H:%M:%S.%fZ'
1945    start_date = datetime.datetime.strptime(results['Start'], date_format)
1946    end_date = datetime.datetime.strptime(results['End'], date_format)
1947    self.assertEqual(end_date > start_date, True)
1948
1949    if self.RunGsUtil == testcase.GsUtilIntegrationTestCase.RunGsUtil:
1950      # Check that we didn't do automatic parallel uploads - compose doesn't
1951      # calculate the MD5 hash. Since RunGsUtil is overriden in
1952      # TestCpParallelUploads to force parallel uploads, we can check which
1953      # method was used.
1954      self.assertEqual(results['Md5'], 'rL0Y20zC+Fzt72VPzMSk2A==')
1955
1956    self.assertEqual(int(results['Source Size']), 3)
1957    self.assertEqual(int(results['Bytes Transferred']), 3)
1958    self.assertEqual(results['Result'], 'OK')
1959
1960  @SequentialAndParallelTransfer
1961  def test_cp_manifest_download(self):
1962    """Tests downloading with a manifest file."""
1963    key_uri = self.CreateObject(contents=b'foo')
1964    fpath = self.CreateTempFile(contents=b'')
1965    logpath = self.CreateTempFile(contents=b'')
1966    # Ensure the file is empty.
1967    open(logpath, 'w').close()
1968    self.RunGsUtil(
1969        ['cp', '-L', logpath, suri(key_uri), fpath], return_stdout=True)
1970    with open(logpath, 'r') as f:
1971      lines = f.readlines()
1972    if six.PY3:
1973      decode_lines = []
1974      for line in lines:
1975        if line.startswith("b'"):
1976          some_strs = line.split(',')
1977          line_parts = []
1978          for some_str in some_strs:
1979            if some_str.startswith("b'"):
1980              line_parts.append(ast.literal_eval(some_str).decode(UTF8))
1981            else:
1982              line_parts.append(some_str)
1983          decode_lines.append(','.join(line_parts))
1984        else:
1985          decode_lines.append(line)
1986      lines = decode_lines
1987    self.assertEqual(len(lines), 2)
1988
1989    expected_headers = [
1990        'Source', 'Destination', 'Start', 'End', 'Md5', 'UploadId',
1991        'Source Size', 'Bytes Transferred', 'Result', 'Description'
1992    ]
1993    self.assertEqual(expected_headers, lines[0].strip().split(','))
1994    results = lines[1].strip().split(',')
1995    self.assertEqual(results[0][:5], '%s://' % self.default_provider)  # source
1996    self.assertEqual(results[1][:7], 'file://')  # destination
1997    date_format = '%Y-%m-%dT%H:%M:%S.%fZ'
1998    start_date = datetime.datetime.strptime(results[2], date_format)
1999    end_date = datetime.datetime.strptime(results[3], date_format)
2000    self.assertEqual(end_date > start_date, True)
2001    self.assertEqual(int(results[6]), 3)  # Source Size
2002    # Bytes transferred might be more than 3 if the file was gzipped, since
2003    # the minimum gzip header is 10 bytes.
2004    self.assertGreaterEqual(int(results[7]), 3)  # Bytes Transferred
2005    self.assertEqual(results[8], 'OK')  # Result
2006
2007  @SequentialAndParallelTransfer
2008  def test_copy_unicode_non_ascii_filename(self):
2009    key_uri = self.CreateObject()
2010    # Try with and without resumable upload threshold, to ensure that each
2011    # scenario works. In particular, resumable uploads have tracker filename
2012    # logic.
2013    file_contents = b'x' * START_CALLBACK_PER_BYTES * 2
2014    fpath = self.CreateTempFile(file_name='Аудиоархив', contents=file_contents)
2015    with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', '1')]):
2016      # fpath_bytes = fpath.encode(UTF8)
2017      self.RunGsUtil(['cp', fpath, suri(key_uri)], return_stderr=True)
2018      stdout = self.RunGsUtil(['cat', suri(key_uri)], return_stdout=True)
2019      self.assertEquals(stdout.encode('ascii'), file_contents)
2020    with SetBotoConfigForTest([('GSUtil', 'resumable_threshold',
2021                                str(START_CALLBACK_PER_BYTES * 3))]):
2022      self.RunGsUtil(['cp', fpath, suri(key_uri)], return_stderr=True)
2023      stdout = self.RunGsUtil(['cat', suri(key_uri)], return_stdout=True)
2024      self.assertEquals(stdout.encode('ascii'), file_contents)
2025
2026  # Note: We originally one time implemented a test
2027  # (test_copy_invalid_unicode_filename) that invalid unicode filenames were
2028  # skipped, but it turns out os.walk() on macOS doesn't have problems with
2029  # such files (so, failed that test). Given that, we decided to remove the
2030  # test.
2031
2032  @SequentialAndParallelTransfer
2033  def test_gzip_upload_and_download(self):
2034    bucket_uri = self.CreateBucket()
2035    contents = b'x' * 10000
2036    tmpdir = self.CreateTempDir()
2037    self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents)
2038    self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents)
2039    self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents)
2040    # Test that copying specifying only 2 of the 3 prefixes gzips the correct
2041    # files, and test that including whitespace in the extension list works.
2042    self.RunGsUtil([
2043        'cp', '-z', 'js, html',
2044        os.path.join(tmpdir, 'test.*'),
2045        suri(bucket_uri)
2046    ])
2047    self.AssertNObjectsInBucket(bucket_uri, 3)
2048    uri1 = suri(bucket_uri, 'test.html')
2049    uri2 = suri(bucket_uri, 'test.js')
2050    uri3 = suri(bucket_uri, 'test.txt')
2051    stdout = self.RunGsUtil(['stat', uri1], return_stdout=True)
2052    self.assertRegex(stdout, r'Content-Encoding:\s+gzip')
2053    stdout = self.RunGsUtil(['stat', uri2], return_stdout=True)
2054    self.assertRegex(stdout, r'Content-Encoding:\s+gzip')
2055    stdout = self.RunGsUtil(['stat', uri3], return_stdout=True)
2056    self.assertNotRegex(stdout, r'Content-Encoding:\s+gzip')
2057    fpath4 = self.CreateTempFile()
2058    for uri in (uri1, uri2, uri3):
2059      self.RunGsUtil(['cp', uri, suri(fpath4)])
2060      with open(fpath4, 'rb') as f:
2061        self.assertEqual(f.read(), contents)
2062
2063  @SkipForS3('No compressed transport encoding support for S3.')
2064  @SkipForXML('No compressed transport encoding support for the XML API.')
2065  @SequentialAndParallelTransfer
2066  def test_gzip_transport_encoded_upload_and_download(self):
2067    """Test gzip encoded files upload correctly.
2068
2069    This checks that files are not tagged with a gzip content encoding and
2070    that the contents of the files are uncompressed in GCS. This test uses the
2071    -j flag to target specific extensions.
2072    """
2073
2074    def _create_test_data():  # pylint: disable=invalid-name
2075      """Setup the bucket and local data to test with.
2076
2077      Returns:
2078        Triplet containing the following values:
2079          bucket_uri: String URI of cloud storage bucket to upload mock data
2080                      to.
2081          tmpdir: String, path of a temporary directory to write mock data to.
2082          local_uris: Tuple of three strings; each is the file path to a file
2083                      containing mock data.
2084      """
2085      bucket_uri = self.CreateBucket()
2086      contents = b'x' * 10000
2087      tmpdir = self.CreateTempDir()
2088
2089      local_uris = []
2090      for filename in ('test.html', 'test.js', 'test.txt'):
2091        local_uris.append(
2092            self.CreateTempFile(file_name=filename,
2093                                tmpdir=tmpdir,
2094                                contents=contents))
2095
2096      return (bucket_uri, tmpdir, local_uris)
2097
2098    def _upload_test_data(tmpdir, bucket_uri):  # pylint: disable=invalid-name
2099      """Upload local test data.
2100
2101      Args:
2102        tmpdir: String, path of a temporary directory to write mock data to.
2103        bucket_uri: String URI of cloud storage bucket to upload mock data to.
2104
2105      Returns:
2106        stderr: String output from running the gsutil command to upload mock
2107                  data.
2108      """
2109      stderr = self.RunGsUtil([
2110          '-D', 'cp', '-j', 'js, html',
2111          os.path.join(tmpdir, 'test*'),
2112          suri(bucket_uri)
2113      ],
2114                              return_stderr=True)
2115      self.AssertNObjectsInBucket(bucket_uri, 3)
2116      return stderr
2117
2118    def _assert_sent_compressed(local_uris, stderr):  # pylint: disable=invalid-name
2119      """Ensure the correct files were marked for compression.
2120
2121      Args:
2122        local_uris: Tuple of three strings; each is the file path to a file
2123                    containing mock data.
2124        stderr: String output from running the gsutil command to upload mock
2125                data.
2126      """
2127      local_uri_html, local_uri_js, local_uri_txt = local_uris
2128      assert_base_string = 'Using compressed transport encoding for file://{}.'
2129      self.assertIn(assert_base_string.format(local_uri_html), stderr)
2130      self.assertIn(assert_base_string.format(local_uri_js), stderr)
2131      self.assertNotIn(assert_base_string.format(local_uri_txt), stderr)
2132
2133    def _assert_stored_uncompressed(bucket_uri, contents=b'x' * 10000):  # pylint: disable=invalid-name
2134      """Ensure the files are not compressed when they are stored in the bucket.
2135
2136      Args:
2137        bucket_uri: String with URI for bucket containing uploaded test data.
2138        contents: Byte string that are stored in each file in the bucket.
2139      """
2140      local_uri_html = suri(bucket_uri, 'test.html')
2141      local_uri_js = suri(bucket_uri, 'test.js')
2142      local_uri_txt = suri(bucket_uri, 'test.txt')
2143      fpath4 = self.CreateTempFile()
2144
2145      for uri in (local_uri_html, local_uri_js, local_uri_txt):
2146        stdout = self.RunGsUtil(['stat', uri], return_stdout=True)
2147        self.assertNotRegex(stdout, r'Content-Encoding:\s+gzip')
2148        self.RunGsUtil(['cp', uri, suri(fpath4)])
2149        with open(fpath4, 'rb') as f:
2150          self.assertEqual(f.read(), contents)
2151
2152    # Get mock data, run tests
2153    bucket_uri, tmpdir, local_uris = _create_test_data()
2154    stderr = _upload_test_data(tmpdir, bucket_uri)
2155    _assert_sent_compressed(local_uris, stderr)
2156    _assert_stored_uncompressed(bucket_uri)
2157
2158  @SkipForS3('No compressed transport encoding support for S3.')
2159  @SkipForXML('No compressed transport encoding support for the XML API.')
2160  @SequentialAndParallelTransfer
2161  def test_gzip_transport_encoded_parallel_upload_non_resumable(self):
2162    """Test non resumable, gzip encoded files upload correctly in parallel.
2163
2164    This test generates a small amount of data (e.g. 100 chars) to upload.
2165    Due to the small size, it will be below the resumable threshold,
2166    and test the behavior of non-resumable uploads.
2167    """
2168    # Setup the bucket and local data.
2169    bucket_uri = self.CreateBucket()
2170    contents = b'x' * 100
2171    tmpdir = self.CreateTempDir(test_files=10, contents=contents)
2172    # Upload the data.
2173    with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', str(ONE_KIB))
2174                              ]):
2175      stderr = self.RunGsUtil(
2176          ['-D', '-m', 'cp', '-J', '-r', tmpdir,
2177           suri(bucket_uri)],
2178          return_stderr=True)
2179      # Ensure all objects are uploaded.
2180      self.AssertNObjectsInBucket(bucket_uri, 10)
2181      # Ensure the progress logger sees a gzip encoding.
2182      self.assertIn('send: Using gzip transport encoding for the request.',
2183                    stderr)
2184
2185  @SkipForS3('No compressed transport encoding support for S3.')
2186  @SkipForXML('No compressed transport encoding support for the XML API.')
2187  @SequentialAndParallelTransfer
2188  def test_gzip_transport_encoded_parallel_upload_resumable(self):
2189    """Test resumable, gzip encoded files upload correctly in parallel.
2190
2191    This test generates a large amount of data (e.g. halt_size amount of chars)
2192    to upload. Due to the large size, it will be above the resumable threshold,
2193    and test the behavior of resumable uploads.
2194    """
2195    # Setup the bucket and local data.
2196    bucket_uri = self.CreateBucket()
2197    contents = get_random_ascii_chars(size=self.halt_size)
2198    tmpdir = self.CreateTempDir(test_files=10, contents=contents)
2199    # Upload the data.
2200    with SetBotoConfigForTest([('GSUtil', 'resumable_threshold', str(ONE_KIB))
2201                              ]):
2202      stderr = self.RunGsUtil(
2203          ['-D', '-m', 'cp', '-J', '-r', tmpdir,
2204           suri(bucket_uri)],
2205          return_stderr=True)
2206      # Ensure all objects are uploaded.
2207      self.AssertNObjectsInBucket(bucket_uri, 10)
2208      # Ensure the progress logger sees a gzip encoding.
2209      self.assertIn('send: Using gzip transport encoding for the request.',
2210                    stderr)
2211
2212  @SequentialAndParallelTransfer
2213  def test_gzip_all_upload_and_download(self):
2214    bucket_uri = self.CreateBucket()
2215    contents = b'x' * 10000
2216    tmpdir = self.CreateTempDir()
2217    self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents)
2218    self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents)
2219    self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents)
2220    self.CreateTempFile(file_name='test', tmpdir=tmpdir, contents=contents)
2221    # Test that all files are compressed.
2222    self.RunGsUtil(
2223        ['cp', '-Z',
2224         os.path.join(tmpdir, 'test*'),
2225         suri(bucket_uri)])
2226    self.AssertNObjectsInBucket(bucket_uri, 4)
2227    uri1 = suri(bucket_uri, 'test.html')
2228    uri2 = suri(bucket_uri, 'test.js')
2229    uri3 = suri(bucket_uri, 'test.txt')
2230    uri4 = suri(bucket_uri, 'test')
2231    stdout = self.RunGsUtil(['stat', uri1], return_stdout=True)
2232    self.assertRegex(stdout, r'Content-Encoding:\s+gzip')
2233    stdout = self.RunGsUtil(['stat', uri2], return_stdout=True)
2234    self.assertRegex(stdout, r'Content-Encoding:\s+gzip')
2235    stdout = self.RunGsUtil(['stat', uri3], return_stdout=True)
2236    self.assertRegex(stdout, r'Content-Encoding:\s+gzip')
2237    stdout = self.RunGsUtil(['stat', uri4], return_stdout=True)
2238    self.assertRegex(stdout, r'Content-Encoding:\s+gzip')
2239    fpath4 = self.CreateTempFile()
2240    for uri in (uri1, uri2, uri3, uri4):
2241      self.RunGsUtil(['cp', uri, suri(fpath4)])
2242      with open(fpath4, 'rb') as f:
2243        self.assertEqual(f.read(), contents)
2244
2245  @SkipForS3('No compressed transport encoding support for S3.')
2246  @SkipForXML('No compressed transport encoding support for the XML API.')
2247  @SequentialAndParallelTransfer
2248  def test_gzip_transport_encoded_all_upload_and_download(self):
2249    """Test gzip encoded files upload correctly.
2250
2251    This checks that files are not tagged with a gzip content encoding and
2252    that the contents of the files are uncompressed in GCS. This test uses the
2253    -J flag to target all files.
2254    """
2255    # Setup the bucket and local data.
2256    bucket_uri = self.CreateBucket()
2257    contents = b'x' * 10000
2258    tmpdir = self.CreateTempDir()
2259    local_uri1 = self.CreateTempFile(file_name='test.txt',
2260                                     tmpdir=tmpdir,
2261                                     contents=contents)
2262    local_uri2 = self.CreateTempFile(file_name='test',
2263                                     tmpdir=tmpdir,
2264                                     contents=contents)
2265    # Upload the data.
2266    stderr = self.RunGsUtil(
2267        ['-D', 'cp', '-J',
2268         os.path.join(tmpdir, 'test*'),
2269         suri(bucket_uri)],
2270        return_stderr=True)
2271    self.AssertNObjectsInBucket(bucket_uri, 2)
2272    # Ensure the correct files were marked for compression.
2273    self.assertIn(
2274        'Using compressed transport encoding for file://%s.' % (local_uri1),
2275        stderr)
2276    self.assertIn(
2277        'Using compressed transport encoding for file://%s.' % (local_uri2),
2278        stderr)
2279    # Ensure the progress logger sees a gzip encoding.
2280    self.assertIn('send: Using gzip transport encoding for the request.',
2281                  stderr)
2282    # Ensure the files do not have a stored encoding of gzip and are stored
2283    # uncompressed.
2284    remote_uri1 = suri(bucket_uri, 'test.txt')
2285    remote_uri2 = suri(bucket_uri, 'test')
2286    fpath4 = self.CreateTempFile()
2287    for uri in (remote_uri1, remote_uri2):
2288      stdout = self.RunGsUtil(['stat', uri], return_stdout=True)
2289      self.assertNotRegex(stdout, r'Content-Encoding:\s+gzip')
2290      self.RunGsUtil(['cp', uri, suri(fpath4)])
2291      with open(fpath4, 'rb') as f:
2292        self.assertEqual(f.read(), contents)
2293
2294  def test_both_gzip_options_error(self):
2295    """Test that mixing compression flags error."""
2296    cases = (
2297        # Test with -Z and -z
2298        ['cp', '-Z', '-z', 'html, js', 'a.js', 'b.js'],
2299        # Same test, but with arguments in the opposite order.
2300        ['cp', '-z', 'html, js', '-Z', 'a.js', 'b.js'])
2301
2302    for case in cases:
2303      stderr = self.RunGsUtil(case, return_stderr=True, expected_status=1)
2304      self.assertIn('CommandException', stderr)
2305      self.assertIn(
2306          'Specifying both the -z and -Z options together is invalid.', stderr)
2307
2308  def test_both_gzip_transport_encoding_options_error(self):
2309    """Test that mixing transport encoding flags error."""
2310    cases = (
2311        # Test with -J and -j
2312        ['cp', '-J', '-j', 'html, js', 'a.js', 'b.js'],
2313        # Same test, but with arguments in the opposite order.
2314        ['cp', '-j', 'html, js', '-J', 'a.js', 'b.js'])
2315
2316    for case in cases:
2317      stderr = self.RunGsUtil(case, return_stderr=True, expected_status=1)
2318      self.assertIn('CommandException', stderr)
2319      self.assertIn(
2320          'Specifying both the -j and -J options together is invalid.', stderr)
2321
2322  def test_combined_gzip_options_error(self):
2323    """Test that mixing transport encoding and compression flags error."""
2324    cases = (['cp', '-Z', '-j', 'html, js', 'a.js',
2325              'b.js'], ['cp', '-J', '-z', 'html, js', 'a.js',
2326                        'b.js'], ['cp', '-j', 'html, js', '-Z', 'a.js', 'b.js'],
2327             ['cp', '-z', 'html, js', '-J', 'a.js', 'b.js'])
2328
2329    for case in cases:
2330      stderr = self.RunGsUtil(case, return_stderr=True, expected_status=1)
2331      self.assertIn('CommandException', stderr)
2332      self.assertIn(
2333          'Specifying both the -j/-J and -z/-Z options together is invalid.',
2334          stderr)
2335
2336  def test_upload_with_subdir_and_unexpanded_wildcard(self):
2337    fpath1 = self.CreateTempFile(file_name=('tmp', 'x', 'y', 'z'))
2338    bucket_uri = self.CreateBucket()
2339    wildcard_uri = '%s*' % fpath1[:-5]
2340    stderr = self.RunGsUtil(
2341        ['cp', '-R', wildcard_uri, suri(bucket_uri)], return_stderr=True)
2342    self.assertIn('Copying file:', stderr)
2343    self.AssertNObjectsInBucket(bucket_uri, 1)
2344
2345  @SequentialAndParallelTransfer
2346  def test_cp_object_ending_with_slash(self):
2347    """Tests that cp works with object names ending with slash."""
2348    tmpdir = self.CreateTempDir()
2349    bucket_uri = self.CreateBucket()
2350    self.CreateObject(bucket_uri=bucket_uri,
2351                      object_name='abc/',
2352                      contents=b'dir')
2353    self.CreateObject(bucket_uri=bucket_uri,
2354                      object_name='abc/def',
2355                      contents=b'def')
2356    self.AssertNObjectsInBucket(bucket_uri, 2)
2357    self.RunGsUtil(['cp', '-R', suri(bucket_uri), tmpdir])
2358    # Check that files in the subdir got copied even though subdir object
2359    # download was skipped.
2360    with open(os.path.join(tmpdir, bucket_uri.bucket_name, 'abc', 'def')) as f:
2361      self.assertEquals('def', '\n'.join(f.readlines()))
2362
2363  def test_cp_without_read_access(self):
2364    """Tests that cp fails without read access to the object."""
2365    # TODO: With 401's triggering retries in apitools, this test will take
2366    # a long time.  Ideally, make apitools accept a num_retries config for this
2367    # until we stop retrying the 401's.
2368    bucket_uri = self.CreateBucket()
2369    object_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'foo')
2370
2371    # Use @Retry as hedge against bucket listing eventual consistency.
2372    self.AssertNObjectsInBucket(bucket_uri, 1)
2373
2374    if self.default_provider == 's3':
2375      expected_error_regex = r'AccessDenied'
2376    else:
2377      expected_error_regex = r'Anonymous \S+ do(es)? not have'
2378
2379    with self.SetAnonymousBotoCreds():
2380      stderr = self.RunGsUtil(['cp', suri(object_uri), 'foo'],
2381                              return_stderr=True,
2382                              expected_status=1)
2383    self.assertRegex(stderr, expected_error_regex)
2384
2385  @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.')
2386  def test_cp_minus_r_minus_e(self):
2387    """Tests that cp -e -r ignores symlinks when recursing."""
2388    bucket_uri = self.CreateBucket()
2389    tmpdir = self.CreateTempDir()
2390    # Create a valid file, since cp expects to copy at least one source URL
2391    # successfully.
2392    self.CreateTempFile(tmpdir=tmpdir, contents=b'foo')
2393    subdir = os.path.join(tmpdir, 'subdir')
2394    os.mkdir(subdir)
2395    os.mkdir(os.path.join(tmpdir, 'missing'))
2396    # Create a blank directory that is a broken symlink to ensure that we
2397    # don't fail recursive enumeration with a bad symlink.
2398    os.symlink(os.path.join(tmpdir, 'missing'), os.path.join(subdir, 'missing'))
2399    os.rmdir(os.path.join(tmpdir, 'missing'))
2400    self.RunGsUtil(['cp', '-r', '-e', tmpdir, suri(bucket_uri)])
2401
2402  @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.')
2403  def test_cp_minus_e(self):
2404    fpath_dir = self.CreateTempDir()
2405    fpath1 = self.CreateTempFile(tmpdir=fpath_dir)
2406    fpath2 = os.path.join(fpath_dir, 'cp_minus_e')
2407    bucket_uri = self.CreateBucket()
2408    os.symlink(fpath1, fpath2)
2409    # We also use -c to continue on errors. One of the expanded glob entries
2410    # should be the symlinked file, which should throw a CommandException since
2411    # no valid (non-symlinked) files could be found at that path; we don't want
2412    # the command to terminate if that's the first file we attempt to copy.
2413    stderr = self.RunGsUtil([
2414        'cp', '-e', '-c',
2415        '%s%s*' % (fpath_dir, os.path.sep),
2416        suri(bucket_uri, 'files')
2417    ],
2418                            return_stderr=True)
2419    self.assertIn('Copying file', stderr)
2420    self.assertIn('Skipping symbolic link', stderr)
2421
2422    # Ensure that top-level arguments are ignored if they are symlinks. The file
2423    # at fpath1 should be successfully copied, then copying the symlink at
2424    # fpath2 should fail.
2425    stderr = self.RunGsUtil(
2426        ['cp', '-e', '-r', fpath1, fpath2,
2427         suri(bucket_uri, 'files')],
2428        return_stderr=True,
2429        expected_status=1)
2430    self.assertIn('Copying file', stderr)
2431    self.assertIn('Skipping symbolic link', stderr)
2432    self.assertIn('CommandException: No URLs matched: %s' % fpath2, stderr)
2433
2434  def test_cp_multithreaded_wildcard(self):
2435    """Tests that cp -m works with a wildcard."""
2436    num_test_files = 5
2437    tmp_dir = self.CreateTempDir(test_files=num_test_files)
2438    bucket_uri = self.CreateBucket()
2439    wildcard_uri = '%s%s*' % (tmp_dir, os.sep)
2440    self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)])
2441    self.AssertNObjectsInBucket(bucket_uri, num_test_files)
2442
2443  @SequentialAndParallelTransfer
2444  def test_cp_duplicate_source_args(self):
2445    """Tests that cp -m works when a source argument is provided twice."""
2446    object_contents = b'edge'
2447    object_uri = self.CreateObject(object_name='foo', contents=object_contents)
2448    tmp_dir = self.CreateTempDir()
2449    self.RunGsUtil(['-m', 'cp', suri(object_uri), suri(object_uri), tmp_dir])
2450    with open(os.path.join(tmp_dir, 'foo'), 'rb') as in_fp:
2451      contents = in_fp.read()
2452      # Contents should be not duplicated.
2453      self.assertEqual(contents, object_contents)
2454
2455  @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.')
2456  @SequentialAndParallelTransfer
2457  def test_cp_download_encrypted_object(self):
2458    """Tests downloading an encrypted object."""
2459    if self.test_api == ApiSelector.XML:
2460      return unittest.skip(
2461          'gsutil does not support encryption with the XML API')
2462    object_contents = b'bar'
2463    object_uri = self.CreateObject(object_name='foo',
2464                                   contents=object_contents,
2465                                   encryption_key=TEST_ENCRYPTION_KEY1)
2466    fpath = self.CreateTempFile()
2467    boto_config_for_test = [('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
2468
2469    with SetBotoConfigForTest(boto_config_for_test):
2470      self.RunGsUtil(['cp', suri(object_uri), suri(fpath)])
2471    with open(fpath, 'rb') as f:
2472      self.assertEqual(f.read(), object_contents)
2473
2474    # If multiple keys are supplied and one is correct, download should succeed.
2475    fpath2 = self.CreateTempFile()
2476    boto_config_for_test2 = [
2477        ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY3),
2478        ('GSUtil', 'decryption_key1', TEST_ENCRYPTION_KEY2),
2479        ('GSUtil', 'decryption_key2', TEST_ENCRYPTION_KEY1)
2480    ]
2481
2482    with SetBotoConfigForTest(boto_config_for_test2):
2483      self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)])
2484    with open(fpath2, 'rb') as f:
2485      self.assertEqual(f.read(), object_contents)
2486
2487  @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.')
2488  @SequentialAndParallelTransfer
2489  def test_cp_download_encrypted_object_without_key(self):
2490    """Tests downloading an encrypted object without the necessary key."""
2491    if self.test_api == ApiSelector.XML:
2492      return unittest.skip(
2493          'gsutil does not support encryption with the XML API')
2494    object_contents = b'bar'
2495    object_uri = self.CreateObject(object_name='foo',
2496                                   contents=object_contents,
2497                                   encryption_key=TEST_ENCRYPTION_KEY1)
2498    fpath = self.CreateTempFile()
2499
2500    stderr = self.RunGsUtil(
2501        ['cp', suri(object_uri), suri(fpath)],
2502        expected_status=1,
2503        return_stderr=True)
2504    self.assertIn(
2505        'Missing decryption key with SHA256 hash %s' %
2506        TEST_ENCRYPTION_KEY1_SHA256_B64, stderr)
2507
2508  @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.')
2509  @SequentialAndParallelTransfer
2510  def test_cp_upload_encrypted_object(self):
2511    """Tests uploading an encrypted object."""
2512    if self.test_api == ApiSelector.XML:
2513      return unittest.skip(
2514          'gsutil does not support encryption with the XML API')
2515    bucket_uri = self.CreateBucket()
2516    object_uri = suri(bucket_uri, 'foo')
2517    file_contents = b'bar'
2518    fpath = self.CreateTempFile(contents=file_contents, file_name='foo')
2519
2520    boto_config_for_test = [('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
2521
2522    # Uploading the object should succeed.
2523    with SetBotoConfigForTest(boto_config_for_test):
2524      self.RunGsUtil(['cp', suri(fpath), suri(bucket_uri)])
2525
2526    self.AssertObjectUsesCSEK(object_uri, TEST_ENCRYPTION_KEY1)
2527
2528    with SetBotoConfigForTest(boto_config_for_test):
2529      # Reading the object back should succeed.
2530      fpath2 = self.CreateTempFile()
2531      self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), suri(fpath2)])
2532      with open(fpath2, 'rb') as f:
2533        self.assertEqual(f.read(), file_contents)
2534
2535  @SkipForS3('No resumable upload or encryption support for S3.')
2536  def test_cp_resumable_upload_encrypted_object_break(self):
2537    """Tests that an encrypted upload resumes after a connection break."""
2538    if self.test_api == ApiSelector.XML:
2539      return unittest.skip(
2540          'gsutil does not support encryption with the XML API')
2541    bucket_uri = self.CreateBucket()
2542    object_uri_str = suri(bucket_uri, 'foo')
2543    fpath = self.CreateTempFile(contents=b'a' * self.halt_size)
2544    boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2545                            ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
2546    test_callback_file = self.CreateTempFile(
2547        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
2548
2549    with SetBotoConfigForTest(boto_config_for_test):
2550      stderr = self.RunGsUtil([
2551          'cp', '--testcallbackfile', test_callback_file, fpath, object_uri_str
2552      ],
2553                              expected_status=1,
2554                              return_stderr=True)
2555      self.assertIn('Artifically halting upload', stderr)
2556      stderr = self.RunGsUtil(['cp', fpath, object_uri_str], return_stderr=True)
2557      self.assertIn('Resuming upload', stderr)
2558      stdout = self.RunGsUtil(['stat', object_uri_str], return_stdout=True)
2559      with open(fpath, 'rb') as fp:
2560        self.assertIn(CalculateB64EncodedMd5FromContents(fp), stdout)
2561
2562    self.AssertObjectUsesCSEK(object_uri_str, TEST_ENCRYPTION_KEY1)
2563
2564  @SkipForS3('No resumable upload or encryption support for S3.')
2565  def test_cp_resumable_upload_encrypted_object_different_key(self):
2566    """Tests that an encrypted upload resume uses original encryption key."""
2567    if self.test_api == ApiSelector.XML:
2568      return unittest.skip(
2569          'gsutil does not support encryption with the XML API')
2570    bucket_uri = self.CreateBucket()
2571    object_uri_str = suri(bucket_uri, 'foo')
2572    file_contents = b'a' * self.halt_size
2573    fpath = self.CreateTempFile(contents=file_contents)
2574    boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2575                            ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
2576    test_callback_file = self.CreateTempFile(
2577        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
2578
2579    with SetBotoConfigForTest(boto_config_for_test):
2580      stderr = self.RunGsUtil([
2581          'cp', '--testcallbackfile', test_callback_file, fpath, object_uri_str
2582      ],
2583                              expected_status=1,
2584                              return_stderr=True)
2585      self.assertIn('Artifically halting upload', stderr)
2586
2587    # Resume the upload with multiple keys, including the original.
2588    boto_config_for_test2 = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2589                             ('GSUtil', 'decryption_key1',
2590                              TEST_ENCRYPTION_KEY2),
2591                             ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
2592
2593    with SetBotoConfigForTest(boto_config_for_test2):
2594      stderr = self.RunGsUtil(['cp', fpath, object_uri_str], return_stderr=True)
2595      self.assertIn('Resuming upload', stderr)
2596
2597    # Object should have the original key.
2598    self.AssertObjectUsesCSEK(object_uri_str, TEST_ENCRYPTION_KEY1)
2599
2600  @SkipForS3('No resumable upload or encryption support for S3.')
2601  def test_cp_resumable_upload_encrypted_object_missing_key(self):
2602    """Tests that an encrypted upload does not resume without original key."""
2603    if self.test_api == ApiSelector.XML:
2604      return unittest.skip(
2605          'gsutil does not support encryption with the XML API')
2606    bucket_uri = self.CreateBucket()
2607    object_uri_str = suri(bucket_uri, 'foo')
2608    file_contents = b'a' * self.halt_size
2609    fpath = self.CreateTempFile(contents=file_contents)
2610    boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2611                            ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
2612    test_callback_file = self.CreateTempFile(
2613        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
2614
2615    with SetBotoConfigForTest(boto_config_for_test):
2616      stderr = self.RunGsUtil([
2617          'cp', '--testcallbackfile', test_callback_file, fpath, object_uri_str
2618      ],
2619                              expected_status=1,
2620                              return_stderr=True)
2621      self.assertIn('Artifically halting upload', stderr)
2622
2623    # Resume the upload without the original key.
2624    boto_config_for_test2 = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
2625                             ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)]
2626
2627    with SetBotoConfigForTest(boto_config_for_test2):
2628      stderr = self.RunGsUtil(['cp', fpath, object_uri_str], return_stderr=True)
2629      self.assertNotIn('Resuming upload', stderr)
2630      self.assertIn('does not match current encryption key', stderr)
2631      self.assertIn('Restarting upload from scratch', stderr)
2632
2633      # Object should have the new key.
2634      self.AssertObjectUsesCSEK(object_uri_str, TEST_ENCRYPTION_KEY2)
2635
2636  def _ensure_object_unencrypted(self, object_uri_str):
2637    """Strongly consistent check that the object is unencrypted."""
2638    stdout = self.RunGsUtil(['stat', object_uri_str], return_stdout=True)
2639    self.assertNotIn('Encryption Key', stdout)
2640
2641  @SkipForS3('No resumable upload support for S3.')
2642  def test_cp_resumable_upload_break(self):
2643    """Tests that an upload can be resumed after a connection break."""
2644    bucket_uri = self.CreateBucket()
2645    fpath = self.CreateTempFile(contents=b'a' * self.halt_size)
2646    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2647    test_callback_file = self.CreateTempFile(
2648        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
2649
2650    with SetBotoConfigForTest([boto_config_for_test]):
2651      stderr = self.RunGsUtil([
2652          'cp', '--testcallbackfile', test_callback_file, fpath,
2653          suri(bucket_uri)
2654      ],
2655                              expected_status=1,
2656                              return_stderr=True)
2657      self.assertIn('Artifically halting upload', stderr)
2658      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2659                              return_stderr=True)
2660      self.assertIn('Resuming upload', stderr)
2661
2662  @SkipForS3('No compressed transport encoding support for S3.')
2663  @SkipForXML('No compressed transport encoding support for the XML API.')
2664  @SequentialAndParallelTransfer
2665  def test_cp_resumable_upload_gzip_encoded_break(self):
2666    """Tests that a gzip encoded upload can be resumed."""
2667    # Setup the bucket and local data. File contents are randomized to prevent
2668    # them from compressing below the resumable-threshold and failing the test.
2669    bucket_uri = self.CreateBucket()
2670    contents = get_random_ascii_chars(size=self.halt_size)
2671    local_uri = self.CreateTempFile(file_name='test.txt', contents=contents)
2672    # Configure boto
2673    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2674    test_callback_file = self.CreateTempFile(
2675        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
2676
2677    with SetBotoConfigForTest([boto_config_for_test]):
2678      stderr = self.RunGsUtil([
2679          '-D', 'cp', '-J', '--testcallbackfile', test_callback_file, local_uri,
2680          suri(bucket_uri)
2681      ],
2682                              expected_status=1,
2683                              return_stderr=True)
2684      # Ensure the progress logger sees a gzip encoding.
2685      self.assertIn('send: Using gzip transport encoding for the request.',
2686                    stderr)
2687      self.assertIn('Artifically halting upload', stderr)
2688      stderr = self.RunGsUtil(['-D', 'cp', '-J', local_uri,
2689                               suri(bucket_uri)],
2690                              return_stderr=True)
2691      self.assertIn('Resuming upload', stderr)
2692      # Ensure the progress logger is still seeing a gzip encoding.
2693      self.assertIn('send: Using gzip transport encoding for the request.',
2694                    stderr)
2695    # Ensure the files do not have a stored encoding of gzip and are stored
2696    # uncompressed.
2697    temp_uri = self.CreateTempFile()
2698    remote_uri = suri(bucket_uri, 'test.txt')
2699    stdout = self.RunGsUtil(['stat', remote_uri], return_stdout=True)
2700    self.assertNotRegex(stdout, r'Content-Encoding:\s+gzip')
2701    self.RunGsUtil(['cp', remote_uri, suri(temp_uri)])
2702    with open(temp_uri, 'rb') as f:
2703      self.assertEqual(f.read(), contents)
2704
2705  @SkipForS3('No resumable upload support for S3.')
2706  def test_cp_resumable_upload_retry(self):
2707    """Tests that a resumable upload completes with one retry."""
2708    bucket_uri = self.CreateBucket()
2709    fpath = self.CreateTempFile(contents=b'a' * self.halt_size)
2710    # TODO: Raising an httplib or socket error blocks bucket teardown
2711    # in JSON for 60-120s on a multiprocessing lock acquire. Figure out why;
2712    # until then, raise an apitools retryable exception.
2713    if self.test_api == ApiSelector.XML:
2714      test_callback_file = self.CreateTempFile(contents=pickle.dumps(
2715          _ResumableUploadRetryHandler(5, http_client.BadStatusLine, (
2716              'unused',))))
2717    else:
2718      test_callback_file = self.CreateTempFile(contents=pickle.dumps(
2719          _ResumableUploadRetryHandler(
2720              5, apitools_exceptions.BadStatusCodeError, ('unused', 'unused',
2721                                                          'unused'))))
2722    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2723    with SetBotoConfigForTest([boto_config_for_test]):
2724      stderr = self.RunGsUtil([
2725          '-D', 'cp', '--testcallbackfile', test_callback_file, fpath,
2726          suri(bucket_uri)
2727      ],
2728                              return_stderr=1)
2729      if self.test_api == ApiSelector.XML:
2730        self.assertIn('Got retryable failure', stderr)
2731      else:
2732        self.assertIn('Retrying', stderr)
2733
2734  @SkipForS3('No resumable upload support for S3.')
2735  def test_cp_resumable_streaming_upload_retry(self):
2736    """Tests that a streaming resumable upload completes with one retry."""
2737    if self.test_api == ApiSelector.XML:
2738      return unittest.skip('XML does not support resumable streaming uploads.')
2739    bucket_uri = self.CreateBucket()
2740
2741    test_callback_file = self.CreateTempFile(contents=pickle.dumps(
2742        _ResumableUploadRetryHandler(5, apitools_exceptions.BadStatusCodeError,
2743                                     ('unused', 'unused', 'unused'))))
2744    # Need to reduce the JSON chunk size since streaming uploads buffer a
2745    # full chunk.
2746    boto_configs_for_test = [('GSUtil', 'json_resumable_chunk_size',
2747                              str(256 * ONE_KIB)), ('Boto', 'num_retries', '2')]
2748    with SetBotoConfigForTest(boto_configs_for_test):
2749      stderr = self.RunGsUtil([
2750          '-D', 'cp', '--testcallbackfile', test_callback_file, '-',
2751          suri(bucket_uri, 'foo')
2752      ],
2753                              stdin='a' * 512 * ONE_KIB,
2754                              return_stderr=1)
2755      self.assertIn('Retrying', stderr)
2756
2757  @SkipForS3('preserve_acl flag not supported for S3.')
2758  def test_cp_preserve_no_owner(self):
2759    bucket_uri = self.CreateBucket()
2760    object_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'foo')
2761    # Anonymous user can read the object and write to the bucket, but does
2762    # not own the object.
2763    self.RunGsUtil(['acl', 'ch', '-u', 'AllUsers:R', suri(object_uri)])
2764    self.RunGsUtil(['acl', 'ch', '-u', 'AllUsers:W', suri(bucket_uri)])
2765    with self.SetAnonymousBotoCreds():
2766      stderr = self.RunGsUtil(
2767          ['cp', '-p', suri(object_uri),
2768           suri(bucket_uri, 'foo')],
2769          return_stderr=True,
2770          expected_status=1)
2771      self.assertIn('OWNER permission is required for preserving ACLs', stderr)
2772
2773  @SkipForS3('No resumable upload support for S3.')
2774  def test_cp_progress_callbacks(self):
2775    bucket_uri = self.CreateBucket()
2776    final_size_string = BytesToFixedWidthString(1024**2)
2777    final_progress_callback = final_size_string + '/' + final_size_string
2778    fpath = self.CreateTempFile(contents=b'a' * ONE_MIB, file_name='foo')
2779    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2780    with SetBotoConfigForTest([boto_config_for_test]):
2781      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2782                              return_stderr=True)
2783      self.assertEquals(1, stderr.count(final_progress_callback))
2784    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(2 * ONE_MIB))
2785    with SetBotoConfigForTest([boto_config_for_test]):
2786      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2787                              return_stderr=True)
2788      self.assertEquals(1, stderr.count(final_progress_callback))
2789    stderr = self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), fpath],
2790                            return_stderr=True)
2791    self.assertEquals(1, stderr.count(final_progress_callback))
2792
2793  @SkipForS3('No resumable upload support for S3.')
2794  def test_cp_resumable_upload(self):
2795    """Tests that a basic resumable upload completes successfully."""
2796    bucket_uri = self.CreateBucket()
2797    fpath = self.CreateTempFile(contents=b'a' * self.halt_size)
2798    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2799    with SetBotoConfigForTest([boto_config_for_test]):
2800      self.RunGsUtil(['cp', fpath, suri(bucket_uri)])
2801
2802  @SkipForS3('No resumable upload support for S3.')
2803  def test_resumable_upload_break_leaves_tracker(self):
2804    """Tests that a tracker file is created with a resumable upload."""
2805    bucket_uri = self.CreateBucket()
2806    fpath = self.CreateTempFile(file_name='foo', contents=b'a' * self.halt_size)
2807    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2808    with SetBotoConfigForTest([boto_config_for_test]):
2809      tracker_filename = GetTrackerFilePath(
2810          StorageUrlFromString(suri(bucket_uri, 'foo')), TrackerFileType.UPLOAD,
2811          self.test_api)
2812      test_callback_file = self.CreateTempFile(
2813          contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
2814      try:
2815        stderr = self.RunGsUtil([
2816            'cp', '--testcallbackfile', test_callback_file, fpath,
2817            suri(bucket_uri, 'foo')
2818        ],
2819                                expected_status=1,
2820                                return_stderr=True)
2821        self.assertIn('Artifically halting upload', stderr)
2822        self.assertTrue(os.path.exists(tracker_filename),
2823                        'Tracker file %s not present.' % tracker_filename)
2824        # Test the permissions
2825        if os.name == 'posix':
2826          mode = oct(stat.S_IMODE(os.stat(tracker_filename).st_mode))
2827          # Assert that only user has read/write permission
2828          self.assertEqual(oct(0o600), mode)
2829      finally:
2830        DeleteTrackerFile(tracker_filename)
2831
2832  @SkipForS3('No resumable upload support for S3.')
2833  def test_cp_resumable_upload_break_file_size_change(self):
2834    """Tests a resumable upload where the uploaded file changes size.
2835
2836    This should fail when we read the tracker data.
2837    """
2838    bucket_uri = self.CreateBucket()
2839    tmp_dir = self.CreateTempDir()
2840    fpath = self.CreateTempFile(file_name='foo',
2841                                tmpdir=tmp_dir,
2842                                contents=b'a' * self.halt_size)
2843    test_callback_file = self.CreateTempFile(
2844        contents=pickle.dumps(HaltingCopyCallbackHandler(True, 5)))
2845
2846    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
2847    with SetBotoConfigForTest([boto_config_for_test]):
2848      stderr = self.RunGsUtil([
2849          'cp', '--testcallbackfile', test_callback_file, fpath,
2850          suri(bucket_uri)
2851      ],
2852                              expected_status=1,
2853                              return_stderr=True)
2854      self.assertIn('Artifically halting upload', stderr)
2855      fpath = self.CreateTempFile(file_name='foo',
2856                                  tmpdir=tmp_dir,
2857                                  contents=b'a' * self.halt_size * 2)
2858      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2859                              expected_status=1,
2860                              return_stderr=True)
2861      self.assertIn('ResumableUploadAbortException', stderr)
2862
2863  @SkipForS3('No resumable upload support for S3.')
2864  def test_cp_resumable_upload_break_file_content_change(self):
2865    """Tests a resumable upload where the uploaded file changes content."""
2866    if self.test_api == ApiSelector.XML:
2867      return unittest.skip(
2868          'XML doesn\'t make separate HTTP calls at fixed-size boundaries for '
2869          'resumable uploads, so we can\'t guarantee that the server saves a '
2870          'specific part of the upload.')
2871    bucket_uri = self.CreateBucket()
2872    tmp_dir = self.CreateTempDir()
2873    fpath = self.CreateTempFile(file_name='foo',
2874                                tmpdir=tmp_dir,
2875                                contents=b'a' * ONE_KIB * 512)
2876    test_callback_file = self.CreateTempFile(contents=pickle.dumps(
2877        HaltingCopyCallbackHandler(True,
2878                                   int(ONE_KIB) * 384)))
2879    resumable_threshold_for_test = ('GSUtil', 'resumable_threshold',
2880                                    str(ONE_KIB))
2881    resumable_chunk_size_for_test = ('GSUtil', 'json_resumable_chunk_size',
2882                                     str(ONE_KIB * 256))
2883    with SetBotoConfigForTest(
2884        [resumable_threshold_for_test, resumable_chunk_size_for_test]):
2885      stderr = self.RunGsUtil([
2886          'cp', '--testcallbackfile', test_callback_file, fpath,
2887          suri(bucket_uri)
2888      ],
2889                              expected_status=1,
2890                              return_stderr=True)
2891      self.assertIn('Artifically halting upload', stderr)
2892      fpath = self.CreateTempFile(file_name='foo',
2893                                  tmpdir=tmp_dir,
2894                                  contents=b'b' * ONE_KIB * 512)
2895      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2896                              expected_status=1,
2897                              return_stderr=True)
2898      self.assertIn('doesn\'t match cloud-supplied digest', stderr)
2899
2900  @SkipForS3('No resumable upload support for S3.')
2901  def test_cp_resumable_upload_break_file_smaller_size(self):
2902    """Tests a resumable upload where the uploaded file changes content.
2903
2904    This should fail hash validation.
2905    """
2906    bucket_uri = self.CreateBucket()
2907    tmp_dir = self.CreateTempDir()
2908    fpath = self.CreateTempFile(file_name='foo',
2909                                tmpdir=tmp_dir,
2910                                contents=b'a' * ONE_KIB * 512)
2911    test_callback_file = self.CreateTempFile(contents=pickle.dumps(
2912        HaltingCopyCallbackHandler(True,
2913                                   int(ONE_KIB) * 384)))
2914    resumable_threshold_for_test = ('GSUtil', 'resumable_threshold',
2915                                    str(ONE_KIB))
2916    resumable_chunk_size_for_test = ('GSUtil', 'json_resumable_chunk_size',
2917                                     str(ONE_KIB * 256))
2918    with SetBotoConfigForTest(
2919        [resumable_threshold_for_test, resumable_chunk_size_for_test]):
2920      stderr = self.RunGsUtil([
2921          'cp', '--testcallbackfile', test_callback_file, fpath,
2922          suri(bucket_uri)
2923      ],
2924                              expected_status=1,
2925                              return_stderr=True)
2926      self.assertIn('Artifically halting upload', stderr)
2927      fpath = self.CreateTempFile(file_name='foo',
2928                                  tmpdir=tmp_dir,
2929                                  contents=b'a' * ONE_KIB)
2930      stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
2931                              expected_status=1,
2932                              return_stderr=True)
2933      self.assertIn('ResumableUploadAbortException', stderr)
2934
2935  @SkipForS3('No resumable upload support for S3.')
2936  def test_cp_composite_encrypted_upload_resume(self):
2937    """Tests that an encrypted composite upload resumes successfully."""
2938    if self.test_api == ApiSelector.XML:
2939      return unittest.skip(
2940          'gsutil does not support encryption with the XML API')
2941    bucket_uri = self.CreateBucket()
2942    dst_url = StorageUrlFromString(suri(bucket_uri, 'foo'))
2943
2944    file_contents = b'foobar'
2945    file_name = 'foobar'
2946    source_file = self.CreateTempFile(contents=file_contents,
2947                                      file_name=file_name)
2948    src_url = StorageUrlFromString(source_file)
2949
2950    # Simulate an upload that had occurred by writing a tracker file
2951    # that points to a previously uploaded component.
2952    tracker_file_name = GetTrackerFilePath(dst_url,
2953                                           TrackerFileType.PARALLEL_UPLOAD,
2954                                           self.test_api, src_url)
2955    tracker_prefix = '123'
2956
2957    # Create component 0 to be used in the resume; it must match the name
2958    # that will be generated in copy_helper, so we use the same scheme.
2959    encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + source_file).encode(UTF8)
2960    content_md5 = hashlib.md5()
2961    content_md5.update(encoded_name)
2962    digest = content_md5.hexdigest()
2963    component_object_name = (tracker_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
2964                             digest + '_0')
2965
2966    component_size = 3
2967    object_uri = self.CreateObject(bucket_uri=bucket_uri,
2968                                   object_name=component_object_name,
2969                                   contents=file_contents[:component_size],
2970                                   encryption_key=TEST_ENCRYPTION_KEY1)
2971    existing_component = ObjectFromTracker(component_object_name,
2972                                           str(object_uri.generation))
2973    existing_components = [existing_component]
2974    enc_key_sha256 = TEST_ENCRYPTION_KEY1_SHA256_B64
2975
2976    WriteParallelUploadTrackerFile(tracker_file_name,
2977                                   tracker_prefix,
2978                                   existing_components,
2979                                   encryption_key_sha256=enc_key_sha256)
2980
2981    try:
2982      # Now "resume" the upload using the original encryption key.
2983      with SetBotoConfigForTest([
2984          ('GSUtil', 'parallel_composite_upload_threshold', '1'),
2985          ('GSUtil', 'parallel_composite_upload_component_size',
2986           str(component_size)),
2987          ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)
2988      ]):
2989        stderr = self.RunGsUtil(
2990            ['cp', source_file, suri(bucket_uri, 'foo')], return_stderr=True)
2991        self.assertIn('Found 1 existing temporary components to reuse.', stderr)
2992        self.assertFalse(
2993            os.path.exists(tracker_file_name),
2994            'Tracker file %s should have been deleted.' % tracker_file_name)
2995        read_contents = self.RunGsUtil(['cat', suri(bucket_uri, 'foo')],
2996                                       return_stdout=True)
2997        self.assertEqual(read_contents.encode('ascii'), file_contents)
2998    finally:
2999      # Clean up if something went wrong.
3000      DeleteTrackerFile(tracker_file_name)
3001
3002  @SkipForS3('No resumable upload support for S3.')
3003  def test_cp_composite_encrypted_upload_restart(self):
3004    """Tests that encrypted composite upload restarts given a different key."""
3005    if self.test_api == ApiSelector.XML:
3006      return unittest.skip(
3007          'gsutil does not support encryption with the XML API')
3008    bucket_uri = self.CreateBucket()
3009    dst_url = StorageUrlFromString(suri(bucket_uri, 'foo'))
3010
3011    file_contents = b'foobar'
3012    source_file = self.CreateTempFile(contents=file_contents, file_name='foo')
3013    src_url = StorageUrlFromString(source_file)
3014
3015    # Simulate an upload that had occurred by writing a tracker file.
3016    tracker_file_name = GetTrackerFilePath(dst_url,
3017                                           TrackerFileType.PARALLEL_UPLOAD,
3018                                           self.test_api, src_url)
3019    tracker_prefix = '123'
3020    existing_component_name = 'foo_1'
3021    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3022                                   object_name='foo_1',
3023                                   contents=b'foo',
3024                                   encryption_key=TEST_ENCRYPTION_KEY1)
3025    existing_component = ObjectFromTracker(existing_component_name,
3026                                           str(object_uri.generation))
3027    existing_components = [existing_component]
3028    enc_key_sha256 = TEST_ENCRYPTION_KEY1_SHA256_B64
3029    WriteParallelUploadTrackerFile(tracker_file_name, tracker_prefix,
3030                                   existing_components,
3031                                   enc_key_sha256.decode('ascii'))
3032
3033    try:
3034      # Now "resume" the upload using the original encryption key.
3035      with SetBotoConfigForTest([
3036          ('GSUtil', 'parallel_composite_upload_threshold', '1'),
3037          ('GSUtil', 'parallel_composite_upload_component_size', '3'),
3038          ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)
3039      ]):
3040        stderr = self.RunGsUtil(
3041            ['cp', source_file, suri(bucket_uri, 'foo')], return_stderr=True)
3042        self.assertIn(
3043            'does not match current encryption key. '
3044            'Deleting old components and restarting upload', stderr)
3045        self.assertNotIn('existing temporary components to reuse.', stderr)
3046        self.assertFalse(
3047            os.path.exists(tracker_file_name),
3048            'Tracker file %s should have been deleted.' % tracker_file_name)
3049        read_contents = self.RunGsUtil(['cat', suri(bucket_uri, 'foo')],
3050                                       return_stdout=True)
3051        self.assertEqual(read_contents.encode('ascii'), file_contents)
3052    finally:
3053      # Clean up if something went wrong.
3054      DeleteTrackerFile(tracker_file_name)
3055
3056  # This temporarily changes the tracker directory to unwritable which
3057  # interferes with any parallel running tests that use the tracker directory.
3058  @NotParallelizable
3059  @SkipForS3('No resumable upload support for S3.')
3060  @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
3061  @SequentialAndParallelTransfer
3062  def test_cp_unwritable_tracker_file(self):
3063    """Tests a resumable upload with an unwritable tracker file."""
3064    bucket_uri = self.CreateBucket()
3065    tracker_filename = GetTrackerFilePath(
3066        StorageUrlFromString(suri(bucket_uri, 'foo')), TrackerFileType.UPLOAD,
3067        self.test_api)
3068    tracker_dir = os.path.dirname(tracker_filename)
3069    fpath = self.CreateTempFile(file_name='foo', contents=b'a' * ONE_KIB)
3070    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3071    save_mod = os.stat(tracker_dir).st_mode
3072
3073    try:
3074      os.chmod(tracker_dir, 0)
3075      with SetBotoConfigForTest([boto_config_for_test]):
3076        stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
3077                                expected_status=1,
3078                                return_stderr=True)
3079        self.assertIn('Couldn\'t write tracker file', stderr)
3080    finally:
3081      os.chmod(tracker_dir, save_mod)
3082      if os.path.exists(tracker_filename):
3083        os.unlink(tracker_filename)
3084
3085  # This temporarily changes the tracker directory to unwritable which
3086  # interferes with any parallel running tests that use the tracker directory.
3087  @NotParallelizable
3088  @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
3089  @SequentialAndParallelTransfer
3090  def test_cp_unwritable_tracker_file_download(self):
3091    """Tests downloads with an unwritable tracker file."""
3092    object_uri = self.CreateObject(contents=b'foo' * ONE_KIB)
3093    tracker_filename = GetTrackerFilePath(
3094        StorageUrlFromString(suri(object_uri)), TrackerFileType.DOWNLOAD,
3095        self.test_api)
3096    tracker_dir = os.path.dirname(tracker_filename)
3097    fpath = self.CreateTempFile()
3098    save_mod = os.stat(tracker_dir).st_mode
3099
3100    try:
3101      os.chmod(tracker_dir, 0)
3102      boto_config_for_test = ('GSUtil', 'resumable_threshold', str(EIGHT_MIB))
3103      with SetBotoConfigForTest([boto_config_for_test]):
3104        # Should succeed because we are below the threshold.
3105        self.RunGsUtil(['cp', suri(object_uri), fpath])
3106      boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3107      with SetBotoConfigForTest([boto_config_for_test]):
3108        stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3109                                expected_status=1,
3110                                return_stderr=True)
3111        self.assertIn('Couldn\'t write tracker file', stderr)
3112    finally:
3113      os.chmod(tracker_dir, save_mod)
3114      if os.path.exists(tracker_filename):
3115        os.unlink(tracker_filename)
3116
3117  def _test_cp_resumable_download_break_helper(self,
3118                                               boto_config,
3119                                               encryption_key=None):
3120    """Helper function for different modes of resumable download break.
3121
3122    Args:
3123      boto_config: List of boto configuration tuples for use with
3124          SetBotoConfigForTest.
3125      encryption_key: Base64 encryption key for object encryption (if any).
3126    """
3127    bucket_uri = self.CreateBucket()
3128    file_contents = b'a' * self.halt_size
3129    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3130                                   object_name='foo',
3131                                   contents=file_contents,
3132                                   encryption_key=encryption_key)
3133    fpath = self.CreateTempFile()
3134    test_callback_file = self.CreateTempFile(
3135        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3136
3137    with SetBotoConfigForTest(boto_config):
3138      stderr = self.RunGsUtil([
3139          'cp', '--testcallbackfile', test_callback_file,
3140          suri(object_uri), fpath
3141      ],
3142                              expected_status=1,
3143                              return_stderr=True)
3144      self.assertIn('Artifically halting download.', stderr)
3145      tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath),
3146                                            TrackerFileType.DOWNLOAD,
3147                                            self.test_api)
3148      self.assertTrue(os.path.isfile(tracker_filename))
3149      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3150                              return_stderr=True)
3151      self.assertIn('Resuming download', stderr)
3152    with open(fpath, 'rb') as f:
3153      self.assertEqual(f.read(), file_contents, 'File contents differ')
3154
3155  def test_cp_resumable_download_break(self):
3156    """Tests that a download can be resumed after a connection break."""
3157    self._test_cp_resumable_download_break_helper([
3158        ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3159    ])
3160
3161  @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.')
3162  def test_cp_resumable_encrypted_download_break(self):
3163    """Tests that an encrypted download resumes after a connection break."""
3164    if self.test_api == ApiSelector.XML:
3165      return unittest.skip(
3166          'gsutil does not support encryption with the XML API')
3167    self._test_cp_resumable_download_break_helper(
3168        [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
3169         ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)],
3170        encryption_key=TEST_ENCRYPTION_KEY1)
3171
3172  @SkipForS3('gsutil doesn\'t support S3 customer-supplied encryption keys.')
3173  def test_cp_resumable_encrypted_download_key_rotation(self):
3174    """Tests that a download restarts with a rotated encryption key."""
3175    if self.test_api == ApiSelector.XML:
3176      return unittest.skip(
3177          'gsutil does not support encryption with the XML API')
3178    bucket_uri = self.CreateBucket()
3179    file_contents = b'a' * self.halt_size
3180    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3181                                   object_name='foo',
3182                                   contents=file_contents,
3183                                   encryption_key=TEST_ENCRYPTION_KEY1)
3184    fpath = self.CreateTempFile()
3185    test_callback_file = self.CreateTempFile(
3186        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3187
3188    boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
3189                            ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY1)]
3190
3191    with SetBotoConfigForTest(boto_config_for_test):
3192      stderr = self.RunGsUtil([
3193          'cp', '--testcallbackfile', test_callback_file,
3194          suri(object_uri), fpath
3195      ],
3196                              expected_status=1,
3197                              return_stderr=True)
3198      self.assertIn('Artifically halting download.', stderr)
3199      tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath),
3200                                            TrackerFileType.DOWNLOAD,
3201                                            self.test_api)
3202      self.assertTrue(os.path.isfile(tracker_filename))
3203
3204    # After simulated connection break, rotate the key on the object.
3205    boto_config_for_test2 = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
3206                             ('GSUtil', 'decryption_key1',
3207                              TEST_ENCRYPTION_KEY1),
3208                             ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)]
3209    with SetBotoConfigForTest(boto_config_for_test2):
3210      self.RunGsUtil(['rewrite', '-k', suri(object_uri)])
3211
3212    # Now resume the download using only the new encryption key. Since its
3213    # generation changed, we must restart it.
3214    boto_config_for_test3 = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
3215                             ('GSUtil', 'encryption_key', TEST_ENCRYPTION_KEY2)]
3216    with SetBotoConfigForTest(boto_config_for_test3):
3217      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3218                              return_stderr=True)
3219      self.assertIn('Restarting download', stderr)
3220    with open(fpath, 'rb') as f:
3221      self.assertEqual(f.read(), file_contents, 'File contents differ')
3222
3223  @SequentialAndParallelTransfer
3224  def test_cp_resumable_download_etag_differs(self):
3225    """Tests that download restarts the file when the source object changes.
3226
3227    This causes the etag not to match.
3228    """
3229    bucket_uri = self.CreateBucket()
3230    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3231                                   object_name='foo',
3232                                   contents=b'abc' * self.halt_size)
3233    fpath = self.CreateTempFile()
3234    test_callback_file = self.CreateTempFile(
3235        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3236    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3237    with SetBotoConfigForTest([boto_config_for_test]):
3238      # This will create a tracker file with an ETag.
3239      stderr = self.RunGsUtil([
3240          'cp', '--testcallbackfile', test_callback_file,
3241          suri(object_uri), fpath
3242      ],
3243                              expected_status=1,
3244                              return_stderr=True)
3245      self.assertIn('Artifically halting download.', stderr)
3246      # Create a new object with different contents - it should have a
3247      # different ETag since the content has changed.
3248      object_uri = self.CreateObject(
3249          bucket_uri=bucket_uri,
3250          object_name='foo',
3251          contents=b'b' * self.halt_size,
3252          gs_idempotent_generation=object_uri.generation)
3253      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3254                              return_stderr=True)
3255      self.assertNotIn('Resuming download', stderr)
3256
3257  # TODO: Enable this test for sequential downloads when their tracker files are
3258  # modified to contain the source object generation.
3259  @unittest.skipUnless(UsingCrcmodExtension(),
3260                       'Sliced download requires fast crcmod.')
3261  @SkipForS3('No sliced download support for S3.')
3262  def test_cp_resumable_download_generation_differs(self):
3263    """Tests that a resumable download restarts if the generation differs."""
3264    bucket_uri = self.CreateBucket()
3265    file_contents = b'abcd' * self.halt_size
3266    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3267                                   object_name='foo',
3268                                   contents=file_contents)
3269    fpath = self.CreateTempFile()
3270
3271    test_callback_file = self.CreateTempFile(
3272        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3273
3274    boto_config_for_test = [
3275        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
3276        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
3277        ('GSUtil', 'sliced_object_download_max_components', '3')
3278    ]
3279
3280    with SetBotoConfigForTest(boto_config_for_test):
3281      stderr = self.RunGsUtil([
3282          'cp', '--testcallbackfile', test_callback_file,
3283          suri(object_uri),
3284          suri(fpath)
3285      ],
3286                              return_stderr=True,
3287                              expected_status=1)
3288      self.assertIn('Artifically halting download.', stderr)
3289
3290      # Overwrite the object with an identical object, increasing
3291      # the generation but leaving other metadata the same.
3292      identical_file = self.CreateTempFile(contents=file_contents)
3293      self.RunGsUtil(['cp', suri(identical_file), suri(object_uri)])
3294
3295      stderr = self.RunGsUtil(
3296          ['cp', suri(object_uri), suri(fpath)], return_stderr=True)
3297      self.assertIn('Restarting download from scratch', stderr)
3298      with open(fpath, 'rb') as f:
3299        self.assertEqual(f.read(), file_contents, 'File contents differ')
3300
3301  def test_cp_resumable_download_file_larger(self):
3302    """Tests download deletes the tracker file when existing file is larger."""
3303    bucket_uri = self.CreateBucket()
3304    fpath = self.CreateTempFile()
3305    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3306                                   object_name='foo',
3307                                   contents=b'a' * self.halt_size)
3308    test_callback_file = self.CreateTempFile(
3309        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3310    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3311    with SetBotoConfigForTest([boto_config_for_test]):
3312      stderr = self.RunGsUtil([
3313          'cp', '--testcallbackfile', test_callback_file,
3314          suri(object_uri), fpath
3315      ],
3316                              expected_status=1,
3317                              return_stderr=True)
3318      self.assertIn('Artifically halting download.', stderr)
3319      with open(fpath + '_.gstmp', 'w') as larger_file:
3320        for _ in range(self.halt_size * 2):
3321          larger_file.write('a')
3322      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3323                              expected_status=1,
3324                              return_stderr=True)
3325      self.assertNotIn('Resuming download', stderr)
3326      self.assertIn('Deleting tracker file', stderr)
3327
3328  def test_cp_resumable_download_content_differs(self):
3329    """Tests that we do not re-download when tracker file matches existing file.
3330
3331    We only compare size, not contents, so re-download should not occur even
3332    though the contents are technically different. However, hash validation on
3333    the file should still occur and we will delete the file then because
3334    the hashes differ.
3335    """
3336    bucket_uri = self.CreateBucket()
3337    tmp_dir = self.CreateTempDir()
3338    fpath = self.CreateTempFile(tmpdir=tmp_dir)
3339    temp_download_file = fpath + '_.gstmp'
3340    with open(temp_download_file, 'w') as fp:
3341      fp.write('abcd' * ONE_KIB)
3342
3343    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3344                                   object_name='foo',
3345                                   contents=b'efgh' * ONE_KIB)
3346    stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
3347    etag_match = re.search(r'\s*ETag:\s*(.*)', stdout)
3348    self.assertIsNotNone(etag_match, 'Could not get object ETag')
3349    self.assertEqual(len(etag_match.groups()), 1,
3350                     'Did not match expected single ETag')
3351    etag = etag_match.group(1)
3352
3353    tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath),
3354                                          TrackerFileType.DOWNLOAD,
3355                                          self.test_api)
3356    try:
3357      with open(tracker_filename, 'w') as tracker_fp:
3358        tracker_fp.write(etag)
3359      boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3360      with SetBotoConfigForTest([boto_config_for_test]):
3361        stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3362                                return_stderr=True,
3363                                expected_status=1)
3364        self.assertIn('Download already complete', stderr)
3365        self.assertIn('doesn\'t match cloud-supplied digest', stderr)
3366        # File and tracker file should be deleted.
3367        self.assertFalse(os.path.isfile(temp_download_file))
3368        self.assertFalse(os.path.isfile(tracker_filename))
3369        # Permanent file should not have been created.
3370        self.assertFalse(os.path.isfile(fpath))
3371    finally:
3372      if os.path.exists(tracker_filename):
3373        os.unlink(tracker_filename)
3374
3375  def test_cp_resumable_download_content_matches(self):
3376    """Tests download no-ops when tracker file matches existing file."""
3377    bucket_uri = self.CreateBucket()
3378    tmp_dir = self.CreateTempDir()
3379    fpath = self.CreateTempFile(tmpdir=tmp_dir)
3380    matching_contents = b'abcd' * ONE_KIB
3381    temp_download_file = fpath + '_.gstmp'
3382    with open(temp_download_file, 'wb') as fp:
3383      fp.write(matching_contents)
3384
3385    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3386                                   object_name='foo',
3387                                   contents=matching_contents)
3388    stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
3389    etag_match = re.search(r'\s*ETag:\s*(.*)', stdout)
3390    self.assertIsNotNone(etag_match, 'Could not get object ETag')
3391    self.assertEqual(len(etag_match.groups()), 1,
3392                     'Did not match expected single ETag')
3393    etag = etag_match.group(1)
3394    tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath),
3395                                          TrackerFileType.DOWNLOAD,
3396                                          self.test_api)
3397    with open(tracker_filename, 'w') as tracker_fp:
3398      tracker_fp.write(etag)
3399    try:
3400      boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3401      with SetBotoConfigForTest([boto_config_for_test]):
3402        stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3403                                return_stderr=True)
3404        self.assertIn('Download already complete', stderr)
3405        # Tracker file should be removed after successful hash validation.
3406        self.assertFalse(os.path.isfile(tracker_filename))
3407    finally:
3408      if os.path.exists(tracker_filename):
3409        os.unlink(tracker_filename)
3410
3411  def test_cp_resumable_download_tracker_file_not_matches(self):
3412    """Tests that download overwrites when tracker file etag does not match."""
3413    bucket_uri = self.CreateBucket()
3414    tmp_dir = self.CreateTempDir()
3415    fpath = self.CreateTempFile(tmpdir=tmp_dir, contents=b'abcd' * ONE_KIB)
3416    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3417                                   object_name='foo',
3418                                   contents=b'efgh' * ONE_KIB)
3419    stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
3420    etag_match = re.search(r'\s*ETag:\s*(.*)', stdout)
3421    self.assertIsNotNone(etag_match, 'Could not get object ETag')
3422    self.assertEqual(len(etag_match.groups()), 1,
3423                     'Did not match regex for exactly one object ETag')
3424    etag = etag_match.group(1)
3425    etag += 'nonmatching'
3426    tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath),
3427                                          TrackerFileType.DOWNLOAD,
3428                                          self.test_api)
3429    with open(tracker_filename, 'w') as tracker_fp:
3430      tracker_fp.write(etag)
3431    try:
3432      boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3433      with SetBotoConfigForTest([boto_config_for_test]):
3434        stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3435                                return_stderr=True)
3436        self.assertNotIn('Resuming download', stderr)
3437        # Ensure the file was overwritten.
3438        with open(fpath, 'r') as in_fp:
3439          contents = in_fp.read()
3440          self.assertEqual(
3441              contents, 'efgh' * ONE_KIB,
3442              'File not overwritten when it should have been '
3443              'due to a non-matching tracker file.')
3444        self.assertFalse(os.path.isfile(tracker_filename))
3445    finally:
3446      if os.path.exists(tracker_filename):
3447        os.unlink(tracker_filename)
3448
3449  def test_cp_double_gzip(self):
3450    """Tests that upload and download of a doubly-gzipped file succeeds."""
3451    bucket_uri = self.CreateBucket()
3452    fpath = self.CreateTempFile(file_name='looks-zipped.gz', contents=b'foo')
3453    self.RunGsUtil([
3454        '-h', 'content-type:application/gzip', 'cp', '-Z',
3455        suri(fpath),
3456        suri(bucket_uri, 'foo')
3457    ])
3458    self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), fpath])
3459
3460  @SkipForS3('No compressed transport encoding support for S3.')
3461  @SkipForXML('No compressed transport encoding support for the XML API.')
3462  @SequentialAndParallelTransfer
3463  def test_cp_double_gzip_transport_encoded(self):
3464    """Tests that upload and download of a doubly-gzipped file succeeds."""
3465    bucket_uri = self.CreateBucket()
3466    fpath = self.CreateTempFile(file_name='looks-zipped.gz', contents=b'foo')
3467    stderr = self.RunGsUtil([
3468        '-D', '-h', 'content-type:application/gzip', 'cp', '-J',
3469        suri(fpath),
3470        suri(bucket_uri, 'foo')
3471    ],
3472                            return_stderr=True)
3473    # Ensure the progress logger sees a gzip encoding.
3474    self.assertIn('send: Using gzip transport encoding for the request.',
3475                  stderr)
3476    self.RunGsUtil(['cp', suri(bucket_uri, 'foo'), fpath])
3477
3478  @SequentialAndParallelTransfer
3479  def test_cp_resumable_download_gzip(self):
3480    """Tests that download can be resumed successfully with a gzipped file."""
3481    # Generate some reasonably incompressible data.  This compresses to a bit
3482    # around 128K in practice, but we assert specifically below that it is
3483    # larger than self.halt_size to guarantee that we can halt the download
3484    # partway through.
3485    object_uri = self.CreateObject()
3486    random.seed(0)
3487    contents = str([
3488        random.choice(string.ascii_letters) for _ in xrange(self.halt_size)
3489    ]).encode('ascii')
3490    random.seed()  # Reset the seed for any other tests.
3491    fpath1 = self.CreateTempFile(file_name='unzipped.txt', contents=contents)
3492    self.RunGsUtil(['cp', '-z', 'txt', suri(fpath1), suri(object_uri)])
3493
3494    # Use @Retry as hedge against bucket listing eventual consistency.
3495    @Retry(AssertionError, tries=3, timeout_secs=1)
3496    def _GetObjectSize():
3497      stdout = self.RunGsUtil(['du', suri(object_uri)], return_stdout=True)
3498      size_match = re.search(r'(\d+)\s+.*', stdout)
3499      self.assertIsNotNone(size_match, 'Could not get object size')
3500      self.assertEqual(len(size_match.groups()), 1,
3501                       'Did not match regex for exactly one object size.')
3502      return long(size_match.group(1))
3503
3504    object_size = _GetObjectSize()
3505    self.assertGreaterEqual(
3506        object_size, self.halt_size,
3507        'Compresed object size was not large enough to '
3508        'allow for a halted download, so the test results '
3509        'would be invalid. Please increase the compressed '
3510        'object size in the test.')
3511    fpath2 = self.CreateTempFile()
3512    test_callback_file = self.CreateTempFile(
3513        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3514
3515    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3516    with SetBotoConfigForTest([boto_config_for_test]):
3517      stderr = self.RunGsUtil([
3518          'cp', '--testcallbackfile', test_callback_file,
3519          suri(object_uri),
3520          suri(fpath2)
3521      ],
3522                              return_stderr=True,
3523                              expected_status=1)
3524      self.assertIn('Artifically halting download.', stderr)
3525      self.assertIn('Downloading to temp gzip filename', stderr)
3526
3527      # Tracker files will have different names depending on if we are
3528      # downloading sequentially or in parallel.
3529      sliced_download_threshold = HumanReadableToBytes(
3530          boto.config.get('GSUtil', 'sliced_object_download_threshold',
3531                          DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))
3532      sliced_download = (len(contents) > sliced_download_threshold and
3533                         sliced_download_threshold > 0 and
3534                         UsingCrcmodExtension())
3535      if sliced_download:
3536        trackerfile_type = TrackerFileType.SLICED_DOWNLOAD
3537      else:
3538        trackerfile_type = TrackerFileType.DOWNLOAD
3539      tracker_filename = GetTrackerFilePath(StorageUrlFromString(fpath2),
3540                                            trackerfile_type, self.test_api)
3541
3542      # We should have a temporary gzipped file, a tracker file, and no
3543      # final file yet.
3544      self.assertTrue(os.path.isfile(tracker_filename))
3545      self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2))
3546      stderr = self.RunGsUtil(
3547          ['cp', suri(object_uri), suri(fpath2)], return_stderr=True)
3548      self.assertIn('Resuming download', stderr)
3549      with open(fpath2, 'rb') as f:
3550        self.assertEqual(f.read(), contents, 'File contents did not match.')
3551      self.assertFalse(os.path.isfile(tracker_filename))
3552      self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2))
3553
3554  def _GetFaviconFile(self):
3555    # Make a temp file from favicon.ico.gz. Finding the location of our test
3556    # data varies depending on how/where gsutil was installed, so we get the
3557    # data via pkgutil and use this workaround.
3558    if not hasattr(self, 'test_data_favicon_file'):
3559      contents = pkgutil.get_data('gslib', 'tests/test_data/favicon.ico.gz')
3560      self.test_data_favicon_file = self.CreateTempFile(contents=contents)
3561    return self.test_data_favicon_file
3562
3563  def test_cp_download_transfer_encoded(self):
3564    """Tests chunked transfer encoded download handling.
3565
3566    Tests that download works correctly with a gzipped chunked transfer-encoded
3567    object (which therefore lacks Content-Length) of a size that gets fetched
3568    in a single chunk (exercising downloading of objects lacking a length
3569    response header).
3570    """
3571    # Upload a file / content-encoding / content-type that triggers this flow.
3572    # Note: We need to use the file with pre-zipped format and manually set the
3573    # content-encoding and content-type because the Python gzip module (used by
3574    # gsutil cp -Z) won't reproduce the bytes that trigger this problem.
3575    bucket_uri = self.CreateBucket()
3576    object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo')
3577    input_filename = self._GetFaviconFile()
3578    self.RunGsUtil([
3579        '-h', 'Content-Encoding:gzip', '-h', 'Content-Type:image/x-icon', 'cp',
3580        suri(input_filename),
3581        suri(object_uri)
3582    ])
3583    # Compute the MD5 of the uncompressed bytes.
3584    with gzip.open(input_filename) as fp:
3585      hash_dict = {'md5': hashlib.md5()}
3586      hashing_helper.CalculateHashesFromContents(fp, hash_dict)
3587      in_file_md5 = hash_dict['md5'].digest()
3588
3589    # Downloading this file triggers the flow.
3590    fpath2 = self.CreateTempFile()
3591    self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)])
3592    # Compute MD5 of the downloaded (uncompressed) file, and validate it.
3593    with open(fpath2, 'rb') as fp:
3594      hash_dict = {'md5': hashlib.md5()}
3595      hashing_helper.CalculateHashesFromContents(fp, hash_dict)
3596      out_file_md5 = hash_dict['md5'].digest()
3597    self.assertEqual(in_file_md5, out_file_md5)
3598
3599  @SequentialAndParallelTransfer
3600  def test_cp_resumable_download_check_hashes_never(self):
3601    """Tests that resumble downloads work with check_hashes = never."""
3602    bucket_uri = self.CreateBucket()
3603    contents = b'abcd' * self.halt_size
3604    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3605                                   object_name='foo',
3606                                   contents=contents)
3607    fpath = self.CreateTempFile()
3608    test_callback_file = self.CreateTempFile(
3609        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3610
3611    boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
3612                            ('GSUtil', 'check_hashes', 'never')]
3613    with SetBotoConfigForTest(boto_config_for_test):
3614      stderr = self.RunGsUtil([
3615          'cp', '--testcallbackfile', test_callback_file,
3616          suri(object_uri), fpath
3617      ],
3618                              expected_status=1,
3619                              return_stderr=True)
3620      self.assertIn('Artifically halting download.', stderr)
3621      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3622                              return_stderr=True)
3623      self.assertIn('Resuming download', stderr)
3624      self.assertIn('Found no hashes to validate object downloaded', stderr)
3625      with open(fpath, 'rb') as f:
3626        self.assertEqual(f.read(), contents, 'File contents did not match.')
3627
3628  @SkipForS3('No resumable upload support for S3.')
3629  def test_cp_resumable_upload_bucket_deleted(self):
3630    """Tests that a not found exception is raised if bucket no longer exists."""
3631    bucket_uri = self.CreateBucket()
3632    fpath = self.CreateTempFile(contents=b'a' * 2 * ONE_KIB)
3633    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
3634    test_callback_file = self.CreateTempFile(contents=pickle.dumps(
3635        _DeleteBucketThenStartOverCopyCallbackHandler(5, bucket_uri)))
3636
3637    with SetBotoConfigForTest([boto_config_for_test]):
3638      stderr = self.RunGsUtil([
3639          'cp', '--testcallbackfile', test_callback_file, fpath,
3640          suri(bucket_uri)
3641      ],
3642                              return_stderr=True,
3643                              expected_status=1)
3644    self.assertIn('Deleting bucket', stderr)
3645    self.assertIn('bucket does not exist', stderr)
3646
3647  @SkipForS3('No sliced download support for S3.')
3648  def test_cp_sliced_download(self):
3649    """Tests that sliced object download works in the general case."""
3650    bucket_uri = self.CreateBucket()
3651    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3652                                   object_name='foo',
3653                                   contents=b'abc' * ONE_KIB)
3654    fpath = self.CreateTempFile()
3655
3656    # Force fast crcmod to return True to test the basic sliced download
3657    # scenario, ensuring that if the user installs crcmod, it will work.
3658    boto_config_for_test = [
3659        ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
3660        ('GSUtil', 'test_assume_fast_crcmod', 'True'),
3661        ('GSUtil', 'sliced_object_download_threshold', str(ONE_KIB)),
3662        ('GSUtil', 'sliced_object_download_max_components', '3')
3663    ]
3664
3665    with SetBotoConfigForTest(boto_config_for_test):
3666      self.RunGsUtil(['cp', suri(object_uri), fpath])
3667
3668      # Each tracker file should have been deleted.
3669      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
3670          StorageUrlFromString(fpath), self.test_api)
3671      for tracker_filename in tracker_filenames:
3672        self.assertFalse(os.path.isfile(tracker_filename))
3673
3674      with open(fpath, 'rb') as f:
3675        self.assertEqual(f.read(), b'abc' * ONE_KIB, 'File contents differ')
3676
3677  @unittest.skipUnless(UsingCrcmodExtension(),
3678                       'Sliced download requires fast crcmod.')
3679  @SkipForS3('No sliced download support for S3.')
3680  def test_cp_unresumable_sliced_download(self):
3681    """Tests sliced download works when resumability is disabled."""
3682    bucket_uri = self.CreateBucket()
3683    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3684                                   object_name='foo',
3685                                   contents=b'abcd' * self.halt_size)
3686    fpath = self.CreateTempFile()
3687    test_callback_file = self.CreateTempFile(
3688        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3689
3690    boto_config_for_test = [
3691        ('GSUtil', 'resumable_threshold', str(self.halt_size * 5)),
3692        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
3693        ('GSUtil', 'sliced_object_download_max_components', '4')
3694    ]
3695
3696    with SetBotoConfigForTest(boto_config_for_test):
3697      stderr = self.RunGsUtil([
3698          'cp', '--testcallbackfile', test_callback_file,
3699          suri(object_uri),
3700          suri(fpath)
3701      ],
3702                              return_stderr=True,
3703                              expected_status=1)
3704      self.assertIn('not downloaded successfully', stderr)
3705      # Temporary download file should exist.
3706      self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
3707
3708      # No tracker files should exist.
3709      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
3710          StorageUrlFromString(fpath), self.test_api)
3711      for tracker_filename in tracker_filenames:
3712        self.assertFalse(os.path.isfile(tracker_filename))
3713
3714    # Perform the entire download, without resuming.
3715    with SetBotoConfigForTest(boto_config_for_test):
3716      stderr = self.RunGsUtil(
3717          ['cp', suri(object_uri), suri(fpath)], return_stderr=True)
3718      self.assertNotIn('Resuming download', stderr)
3719      # Temporary download file should have been deleted.
3720      self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
3721      with open(fpath, 'rb') as f:
3722        self.assertEqual(f.read(), b'abcd' * self.halt_size,
3723                         'File contents differ')
3724
3725  @unittest.skipUnless(UsingCrcmodExtension(),
3726                       'Sliced download requires fast crcmod.')
3727  @SkipForS3('No sliced download support for S3.')
3728  def test_cp_sliced_download_resume(self):
3729    """Tests that sliced object download is resumable."""
3730    bucket_uri = self.CreateBucket()
3731    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3732                                   object_name='foo',
3733                                   contents=b'abc' * self.halt_size)
3734    fpath = self.CreateTempFile()
3735    test_callback_file = self.CreateTempFile(
3736        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3737
3738    boto_config_for_test = [
3739        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
3740        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
3741        ('GSUtil', 'sliced_object_download_max_components', '3')
3742    ]
3743
3744    with SetBotoConfigForTest(boto_config_for_test):
3745      stderr = self.RunGsUtil([
3746          'cp', '--testcallbackfile', test_callback_file,
3747          suri(object_uri),
3748          suri(fpath)
3749      ],
3750                              return_stderr=True,
3751                              expected_status=1)
3752      self.assertIn('not downloaded successfully', stderr)
3753
3754      # Each tracker file should exist.
3755      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
3756          StorageUrlFromString(fpath), self.test_api)
3757      for tracker_filename in tracker_filenames:
3758        self.assertTrue(os.path.isfile(tracker_filename))
3759
3760      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3761                              return_stderr=True)
3762      self.assertIn('Resuming download', stderr)
3763
3764      # Each tracker file should have been deleted.
3765      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
3766          StorageUrlFromString(fpath), self.test_api)
3767      for tracker_filename in tracker_filenames:
3768        self.assertFalse(os.path.isfile(tracker_filename))
3769
3770      with open(fpath, 'rb') as f:
3771        self.assertEqual(f.read(), b'abc' * self.halt_size,
3772                         'File contents differ')
3773
3774  @unittest.skipUnless(UsingCrcmodExtension(),
3775                       'Sliced download requires fast crcmod.')
3776  @SkipForS3('No sliced download support for S3.')
3777  def test_cp_sliced_download_partial_resume(self):
3778    """Test sliced download resumability when some components are finished."""
3779    bucket_uri = self.CreateBucket()
3780    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3781                                   object_name='foo',
3782                                   contents=b'abc' * self.halt_size)
3783    fpath = self.CreateTempFile()
3784    test_callback_file = self.CreateTempFile(
3785        contents=pickle.dumps(HaltOneComponentCopyCallbackHandler(5)))
3786
3787    boto_config_for_test = [
3788        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
3789        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
3790        ('GSUtil', 'sliced_object_download_max_components', '3')
3791    ]
3792
3793    with SetBotoConfigForTest(boto_config_for_test):
3794      stderr = self.RunGsUtil([
3795          'cp', '--testcallbackfile', test_callback_file,
3796          suri(object_uri),
3797          suri(fpath)
3798      ],
3799                              return_stderr=True,
3800                              expected_status=1)
3801      self.assertIn('not downloaded successfully', stderr)
3802
3803      # Each tracker file should exist.
3804      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
3805          StorageUrlFromString(fpath), self.test_api)
3806      for tracker_filename in tracker_filenames:
3807        self.assertTrue(os.path.isfile(tracker_filename))
3808
3809      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3810                              return_stderr=True)
3811      self.assertIn('Resuming download', stderr)
3812      self.assertIn('Download already complete', stderr)
3813
3814      # Each tracker file should have been deleted.
3815      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
3816          StorageUrlFromString(fpath), self.test_api)
3817      for tracker_filename in tracker_filenames:
3818        self.assertFalse(os.path.isfile(tracker_filename))
3819
3820      with open(fpath, 'rb') as f:
3821        self.assertEqual(f.read(), b'abc' * self.halt_size,
3822                         'File contents differ')
3823
3824  @unittest.skipUnless(UsingCrcmodExtension(),
3825                       'Sliced download requires fast crcmod.')
3826  @SkipForS3('No sliced download support for S3.')
3827  def test_cp_sliced_download_resume_content_differs(self):
3828    """Tests differing file contents are detected by sliced downloads."""
3829    bucket_uri = self.CreateBucket()
3830    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3831                                   object_name='foo',
3832                                   contents=b'abc' * self.halt_size)
3833    fpath = self.CreateTempFile(contents=b'')
3834    test_callback_file = self.CreateTempFile(
3835        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3836
3837    boto_config_for_test = [
3838        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
3839        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
3840        ('GSUtil', 'sliced_object_download_max_components', '3')
3841    ]
3842
3843    with SetBotoConfigForTest(boto_config_for_test):
3844      stderr = self.RunGsUtil([
3845          'cp', '--testcallbackfile', test_callback_file,
3846          suri(object_uri),
3847          suri(fpath)
3848      ],
3849                              return_stderr=True,
3850                              expected_status=1)
3851      self.assertIn('not downloaded successfully', stderr)
3852
3853      # Temporary download file should exist.
3854      self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
3855
3856      # Each tracker file should exist.
3857      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
3858          StorageUrlFromString(fpath), self.test_api)
3859      for tracker_filename in tracker_filenames:
3860        self.assertTrue(os.path.isfile(tracker_filename))
3861
3862      with open(fpath + '_.gstmp', 'r+b') as f:
3863        f.write(b'altered file contents')
3864
3865      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3866                              return_stderr=True,
3867                              expected_status=1)
3868      self.assertIn('Resuming download', stderr)
3869      self.assertIn('doesn\'t match cloud-supplied digest', stderr)
3870      self.assertIn('HashMismatchException: crc32c', stderr)
3871
3872      # Each tracker file should have been deleted.
3873      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
3874          StorageUrlFromString(fpath), self.test_api)
3875      for tracker_filename in tracker_filenames:
3876        self.assertFalse(os.path.isfile(tracker_filename))
3877
3878      # Temporary file should have been deleted due to hash mismatch.
3879      self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
3880      # Final file should not exist.
3881      self.assertFalse(os.path.isfile(fpath))
3882
3883  @unittest.skipUnless(UsingCrcmodExtension(),
3884                       'Sliced download requires fast crcmod.')
3885  @SkipForS3('No sliced download support for S3.')
3886  def test_cp_sliced_download_component_size_changed(self):
3887    """Tests sliced download doesn't break when the boto config changes.
3888
3889    If the number of components used changes cross-process, the download should
3890    be restarted.
3891    """
3892    bucket_uri = self.CreateBucket()
3893    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3894                                   object_name='foo',
3895                                   contents=b'abcd' * self.halt_size)
3896    fpath = self.CreateTempFile()
3897    test_callback_file = self.CreateTempFile(
3898        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3899
3900    boto_config_for_test = [
3901        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
3902        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
3903        ('GSUtil', 'sliced_object_download_component_size',
3904         str(self.halt_size // 4)),
3905        ('GSUtil', 'sliced_object_download_max_components', '4')
3906    ]
3907
3908    with SetBotoConfigForTest(boto_config_for_test):
3909      stderr = self.RunGsUtil([
3910          'cp', '--testcallbackfile', test_callback_file,
3911          suri(object_uri),
3912          suri(fpath)
3913      ],
3914                              return_stderr=True,
3915                              expected_status=1)
3916      self.assertIn('not downloaded successfully', stderr)
3917
3918    boto_config_for_test = [
3919        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
3920        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
3921        ('GSUtil', 'sliced_object_download_component_size',
3922         str(self.halt_size // 2)),
3923        ('GSUtil', 'sliced_object_download_max_components', '2')
3924    ]
3925
3926    with SetBotoConfigForTest(boto_config_for_test):
3927      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3928                              return_stderr=True)
3929      self.assertIn('Sliced download tracker file doesn\'t match ', stderr)
3930      self.assertIn('Restarting download from scratch', stderr)
3931      self.assertNotIn('Resuming download', stderr)
3932
3933  @unittest.skipUnless(UsingCrcmodExtension(),
3934                       'Sliced download requires fast crcmod.')
3935  @SkipForS3('No sliced download support for S3.')
3936  def test_cp_sliced_download_disabled_cross_process(self):
3937    """Tests temporary files are not orphaned if sliced download is disabled.
3938
3939    Specifically, temporary files should be deleted when the corresponding
3940    non-sliced download is completed.
3941    """
3942    bucket_uri = self.CreateBucket()
3943    object_uri = self.CreateObject(bucket_uri=bucket_uri,
3944                                   object_name='foo',
3945                                   contents=b'abcd' * self.halt_size)
3946    fpath = self.CreateTempFile()
3947    test_callback_file = self.CreateTempFile(
3948        contents=pickle.dumps(HaltingCopyCallbackHandler(False, 5)))
3949
3950    boto_config_for_test = [
3951        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
3952        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
3953        ('GSUtil', 'sliced_object_download_max_components', '4')
3954    ]
3955
3956    with SetBotoConfigForTest(boto_config_for_test):
3957      stderr = self.RunGsUtil([
3958          'cp', '--testcallbackfile', test_callback_file,
3959          suri(object_uri),
3960          suri(fpath)
3961      ],
3962                              return_stderr=True,
3963                              expected_status=1)
3964      self.assertIn('not downloaded successfully', stderr)
3965      # Temporary download file should exist.
3966      self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
3967
3968      # Each tracker file should exist.
3969      tracker_filenames = GetSlicedDownloadTrackerFilePaths(
3970          StorageUrlFromString(fpath), self.test_api)
3971      for tracker_filename in tracker_filenames:
3972        self.assertTrue(os.path.isfile(tracker_filename))
3973
3974    # Disable sliced downloads by increasing the threshold
3975    boto_config_for_test = [
3976        ('GSUtil', 'resumable_threshold', str(self.halt_size)),
3977        ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size * 5)),
3978        ('GSUtil', 'sliced_object_download_max_components', '4')
3979    ]
3980
3981    with SetBotoConfigForTest(boto_config_for_test):
3982      stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
3983                              return_stderr=True)
3984      self.assertNotIn('Resuming download', stderr)
3985      # Temporary download file should have been deleted.
3986      self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
3987
3988      # Each tracker file should have been deleted.
3989      for tracker_filename in tracker_filenames:
3990        self.assertFalse(os.path.isfile(tracker_filename))
3991      with open(fpath, 'rb') as f:
3992        self.assertEqual(f.read(), b'abcd' * self.halt_size)
3993
3994  @SkipForS3('No resumable upload support for S3.')
3995  def test_cp_resumable_upload_start_over_http_error(self):
3996    for start_over_error in (
3997        403,  # If user doesn't have storage.buckets.get access to dest bucket.
3998        404,  # If the dest bucket exists, but the dest object does not.
3999        410):  # If the service tells us to restart the upload from scratch.
4000      self.start_over_error_test_helper(start_over_error)
4001
4002  def start_over_error_test_helper(self, http_error_num):
4003    bucket_uri = self.CreateBucket()
4004    # The object contents need to be fairly large to avoid the race condition
4005    # where the contents finish uploading before we artifically halt the copy.
4006    rand_chars = get_random_ascii_chars(size=(ONE_MIB * 4))
4007    fpath = self.CreateTempFile(contents=rand_chars)
4008    boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
4009    if self.test_api == ApiSelector.JSON:
4010      test_callback_file = self.CreateTempFile(
4011          contents=pickle.dumps(_JSONForceHTTPErrorCopyCallbackHandler(5, 404)))
4012    elif self.test_api == ApiSelector.XML:
4013      test_callback_file = self.CreateTempFile(contents=pickle.dumps(
4014          _XMLResumableUploadStartOverCopyCallbackHandler(5)))
4015
4016    with SetBotoConfigForTest([boto_config_for_test]):
4017      stderr = self.RunGsUtil([
4018          'cp', '--testcallbackfile', test_callback_file, fpath,
4019          suri(bucket_uri)
4020      ],
4021                              return_stderr=True)
4022      self.assertIn('Restarting upload of', stderr)
4023
4024  def test_cp_minus_c(self):
4025    bucket_uri = self.CreateBucket()
4026    object_uri = self.CreateObject(bucket_uri=bucket_uri,
4027                                   object_name='foo',
4028                                   contents=b'foo')
4029    self.RunGsUtil([
4030        'cp', '-c',
4031        suri(bucket_uri) + '/foo2',
4032        suri(object_uri),
4033        suri(bucket_uri) + '/dir/'
4034    ],
4035                   expected_status=1)
4036    self.RunGsUtil(['stat', '%s/dir/foo' % suri(bucket_uri)])
4037
4038  def test_rewrite_cp(self):
4039    """Tests the JSON Rewrite API."""
4040    if self.test_api == ApiSelector.XML:
4041      return unittest.skip('Rewrite API is only supported in JSON.')
4042    bucket_uri = self.CreateBucket()
4043    object_uri = self.CreateObject(bucket_uri=bucket_uri,
4044                                   object_name='foo',
4045                                   contents=b'bar')
4046    gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
4047                            DiscardMessagesQueue(), self.default_provider)
4048    key = object_uri.get_key()
4049    src_obj_metadata = apitools_messages.Object(name=key.name,
4050                                                bucket=key.bucket.name,
4051                                                contentType=key.content_type)
4052    dst_obj_metadata = apitools_messages.Object(
4053        bucket=src_obj_metadata.bucket,
4054        name=self.MakeTempName('object'),
4055        contentType=src_obj_metadata.contentType)
4056    gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata)
4057    self.assertEqual(
4058        gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
4059                                     src_obj_metadata.name,
4060                                     fields=['customerEncryption',
4061                                             'md5Hash']).md5Hash,
4062        gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
4063                                     dst_obj_metadata.name,
4064                                     fields=['customerEncryption',
4065                                             'md5Hash']).md5Hash,
4066        'Error: Rewritten object\'s hash doesn\'t match source object.')
4067
4068  def test_rewrite_cp_resume(self):
4069    """Tests the JSON Rewrite API, breaking and resuming via a tracker file."""
4070    if self.test_api == ApiSelector.XML:
4071      return unittest.skip('Rewrite API is only supported in JSON.')
4072    bucket_uri = self.CreateBucket()
4073    # Second bucket needs to be a different storage class so the service
4074    # actually rewrites the bytes.
4075    bucket_uri2 = self.CreateBucket(
4076        storage_class='durable_reduced_availability')
4077    # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
4078    # need 2 response from the service: 1 success, 1 failure prior to
4079    # completion.
4080    object_uri = self.CreateObject(bucket_uri=bucket_uri,
4081                                   object_name='foo',
4082                                   contents=(b'12' * ONE_MIB) + b'bar',
4083                                   prefer_json_api=True)
4084    gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
4085                            DiscardMessagesQueue(), self.default_provider)
4086    key = object_uri.get_key()
4087    src_obj_metadata = apitools_messages.Object(name=key.name,
4088                                                bucket=key.bucket.name,
4089                                                contentType=key.content_type,
4090                                                etag=key.etag.strip('"\''))
4091    dst_obj_name = self.MakeTempName('object')
4092    dst_obj_metadata = apitools_messages.Object(
4093        bucket=bucket_uri2.bucket_name,
4094        name=dst_obj_name,
4095        contentType=src_obj_metadata.contentType)
4096    tracker_file_name = GetRewriteTrackerFilePath(src_obj_metadata.bucket,
4097                                                  src_obj_metadata.name,
4098                                                  dst_obj_metadata.bucket,
4099                                                  dst_obj_metadata.name,
4100                                                  self.test_api)
4101    try:
4102      try:
4103        gsutil_api.CopyObject(src_obj_metadata,
4104                              dst_obj_metadata,
4105                              progress_callback=HaltingRewriteCallbackHandler(
4106                                  ONE_MIB * 2).call,
4107                              max_bytes_per_call=ONE_MIB)
4108        self.fail('Expected RewriteHaltException.')
4109      except RewriteHaltException:
4110        pass
4111
4112      # Tracker file should be left over.
4113      self.assertTrue(os.path.exists(tracker_file_name))
4114
4115      # Now resume. Callback ensures we didn't start over.
4116      gsutil_api.CopyObject(
4117          src_obj_metadata,
4118          dst_obj_metadata,
4119          progress_callback=EnsureRewriteResumeCallbackHandler(ONE_MIB *
4120                                                               2).call,
4121          max_bytes_per_call=ONE_MIB)
4122
4123      # Copy completed; tracker file should be deleted.
4124      self.assertFalse(os.path.exists(tracker_file_name))
4125
4126      self.assertEqual(
4127          gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
4128                                       src_obj_metadata.name,
4129                                       fields=['customerEncryption',
4130                                               'md5Hash']).md5Hash,
4131          gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
4132                                       dst_obj_metadata.name,
4133                                       fields=['customerEncryption',
4134                                               'md5Hash']).md5Hash,
4135          'Error: Rewritten object\'s hash doesn\'t match source object.')
4136    finally:
4137      # Clean up if something went wrong.
4138      DeleteTrackerFile(tracker_file_name)
4139
4140  def test_rewrite_cp_resume_source_changed(self):
4141    """Tests that Rewrite starts over when the source object has changed."""
4142    if self.test_api == ApiSelector.XML:
4143      return unittest.skip('Rewrite API is only supported in JSON.')
4144    bucket_uri = self.CreateBucket()
4145    # Second bucket needs to be a different storage class so the service
4146    # actually rewrites the bytes.
4147    bucket_uri2 = self.CreateBucket(
4148        storage_class='durable_reduced_availability')
4149    # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
4150    # need 2 response from the service: 1 success, 1 failure prior to
4151    # completion.
4152    object_uri = self.CreateObject(bucket_uri=bucket_uri,
4153                                   object_name='foo',
4154                                   contents=(b'12' * ONE_MIB) + b'bar',
4155                                   prefer_json_api=True)
4156    gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
4157                            DiscardMessagesQueue(), self.default_provider)
4158    key = object_uri.get_key()
4159    src_obj_metadata = apitools_messages.Object(name=key.name,
4160                                                bucket=key.bucket.name,
4161                                                contentType=key.content_type,
4162                                                etag=key.etag.strip('"\''))
4163    dst_obj_name = self.MakeTempName('object')
4164    dst_obj_metadata = apitools_messages.Object(
4165        bucket=bucket_uri2.bucket_name,
4166        name=dst_obj_name,
4167        contentType=src_obj_metadata.contentType)
4168    tracker_file_name = GetRewriteTrackerFilePath(src_obj_metadata.bucket,
4169                                                  src_obj_metadata.name,
4170                                                  dst_obj_metadata.bucket,
4171                                                  dst_obj_metadata.name,
4172                                                  self.test_api)
4173    try:
4174      try:
4175        gsutil_api.CopyObject(src_obj_metadata,
4176                              dst_obj_metadata,
4177                              progress_callback=HaltingRewriteCallbackHandler(
4178                                  ONE_MIB * 2).call,
4179                              max_bytes_per_call=ONE_MIB)
4180        self.fail('Expected RewriteHaltException.')
4181      except RewriteHaltException:
4182        pass
4183      # Overwrite the original object.
4184      object_uri2 = self.CreateObject(bucket_uri=bucket_uri,
4185                                      object_name='foo',
4186                                      contents=b'bar',
4187                                      prefer_json_api=True)
4188      key2 = object_uri2.get_key()
4189      src_obj_metadata2 = apitools_messages.Object(
4190          name=key2.name,
4191          bucket=key2.bucket.name,
4192          contentType=key2.content_type,
4193          etag=key2.etag.strip('"\''))
4194
4195      # Tracker file for original object should still exist.
4196      self.assertTrue(os.path.exists(tracker_file_name))
4197
4198      # Copy the new object.
4199      gsutil_api.CopyObject(src_obj_metadata2,
4200                            dst_obj_metadata,
4201                            max_bytes_per_call=ONE_MIB)
4202
4203      # Copy completed; original tracker file should be deleted.
4204      self.assertFalse(os.path.exists(tracker_file_name))
4205
4206      self.assertEqual(
4207          gsutil_api.GetObjectMetadata(src_obj_metadata2.bucket,
4208                                       src_obj_metadata2.name,
4209                                       fields=['customerEncryption',
4210                                               'md5Hash']).md5Hash,
4211          gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
4212                                       dst_obj_metadata.name,
4213                                       fields=['customerEncryption',
4214                                               'md5Hash']).md5Hash,
4215          'Error: Rewritten object\'s hash doesn\'t match source object.')
4216    finally:
4217      # Clean up if something went wrong.
4218      DeleteTrackerFile(tracker_file_name)
4219
4220  def test_rewrite_cp_resume_command_changed(self):
4221    """Tests that Rewrite starts over when the arguments changed."""
4222    if self.test_api == ApiSelector.XML:
4223      return unittest.skip('Rewrite API is only supported in JSON.')
4224    bucket_uri = self.CreateBucket()
4225    # Second bucket needs to be a different storage class so the service
4226    # actually rewrites the bytes.
4227    bucket_uri2 = self.CreateBucket(
4228        storage_class='durable_reduced_availability')
4229    # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
4230    # need 2 response from the service: 1 success, 1 failure prior to
4231    # completion.
4232    object_uri = self.CreateObject(bucket_uri=bucket_uri,
4233                                   object_name='foo',
4234                                   contents=(b'12' * ONE_MIB) + b'bar',
4235                                   prefer_json_api=True)
4236    gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
4237                            DiscardMessagesQueue(), self.default_provider)
4238    key = object_uri.get_key()
4239    src_obj_metadata = apitools_messages.Object(name=key.name,
4240                                                bucket=key.bucket.name,
4241                                                contentType=key.content_type,
4242                                                etag=key.etag.strip('"\''))
4243    dst_obj_name = self.MakeTempName('object')
4244    dst_obj_metadata = apitools_messages.Object(
4245        bucket=bucket_uri2.bucket_name,
4246        name=dst_obj_name,
4247        contentType=src_obj_metadata.contentType)
4248    tracker_file_name = GetRewriteTrackerFilePath(src_obj_metadata.bucket,
4249                                                  src_obj_metadata.name,
4250                                                  dst_obj_metadata.bucket,
4251                                                  dst_obj_metadata.name,
4252                                                  self.test_api)
4253    try:
4254      try:
4255        gsutil_api.CopyObject(src_obj_metadata,
4256                              dst_obj_metadata,
4257                              canned_acl='private',
4258                              progress_callback=HaltingRewriteCallbackHandler(
4259                                  ONE_MIB * 2).call,
4260                              max_bytes_per_call=ONE_MIB)
4261        self.fail('Expected RewriteHaltException.')
4262      except RewriteHaltException:
4263        pass
4264
4265      # Tracker file for original object should still exist.
4266      self.assertTrue(os.path.exists(tracker_file_name))
4267
4268      # Copy the same object but with different call parameters.
4269      gsutil_api.CopyObject(src_obj_metadata,
4270                            dst_obj_metadata,
4271                            canned_acl='public-read',
4272                            max_bytes_per_call=ONE_MIB)
4273
4274      # Copy completed; original tracker file should be deleted.
4275      self.assertFalse(os.path.exists(tracker_file_name))
4276
4277      new_obj_metadata = gsutil_api.GetObjectMetadata(
4278          dst_obj_metadata.bucket,
4279          dst_obj_metadata.name,
4280          fields=['acl', 'customerEncryption', 'md5Hash'])
4281      self.assertEqual(
4282          gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
4283                                       src_obj_metadata.name,
4284                                       fields=['customerEncryption',
4285                                               'md5Hash']).md5Hash,
4286          new_obj_metadata.md5Hash,
4287          'Error: Rewritten object\'s hash doesn\'t match source object.')
4288      # New object should have a public-read ACL from the second command.
4289      found_public_acl = False
4290      for acl_entry in new_obj_metadata.acl:
4291        if acl_entry.entity == 'allUsers':
4292          found_public_acl = True
4293      self.assertTrue(found_public_acl,
4294                      'New object was not written with a public ACL.')
4295    finally:
4296      # Clean up if something went wrong.
4297      DeleteTrackerFile(tracker_file_name)
4298
4299  @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.')
4300  @unittest.skipUnless(UsingCrcmodExtension(), 'Test requires fast crcmod.')
4301  def test_cp_preserve_posix_bucket_to_dir_no_errors(self):
4302    """Tests use of the -P flag with cp from a bucket to a local dir.
4303
4304    Specifically tests combinations of POSIX attributes in metadata that will
4305    pass validation.
4306    """
4307    bucket_uri = self.CreateBucket()
4308    tmpdir = self.CreateTempDir()
4309    TestCpMvPOSIXBucketToLocalNoErrors(self, bucket_uri, tmpdir, is_cp=True)
4310
4311  @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.')
4312  def test_cp_preserve_posix_bucket_to_dir_errors(self):
4313    """Tests use of the -P flag with cp from a bucket to a local dir.
4314
4315    Specifically, combinations of POSIX attributes in metadata that will fail
4316    validation.
4317    """
4318    bucket_uri = self.CreateBucket()
4319    tmpdir = self.CreateTempDir()
4320
4321    obj = self.CreateObject(bucket_uri=bucket_uri,
4322                            object_name='obj',
4323                            contents=b'obj')
4324    TestCpMvPOSIXBucketToLocalErrors(self, bucket_uri, obj, tmpdir, is_cp=True)
4325
4326  @unittest.skipIf(IS_WINDOWS, 'POSIX attributes not available on Windows.')
4327  def test_cp_preseve_posix_dir_to_bucket_no_errors(self):
4328    """Tests use of the -P flag with cp from a local dir to a bucket."""
4329    bucket_uri = self.CreateBucket()
4330    TestCpMvPOSIXLocalToBucketNoErrors(self, bucket_uri, is_cp=True)
4331
4332  def test_cp_minus_s_to_non_cloud_dest_fails(self):
4333    """Test that cp -s operations to a non-cloud destination are prevented."""
4334    local_file = self.CreateTempFile(contents=b'foo')
4335    dest_dir = self.CreateTempDir()
4336    stderr = self.RunGsUtil(['cp', '-s', 'standard', local_file, dest_dir],
4337                            expected_status=1,
4338                            return_stderr=True)
4339    self.assertIn('Cannot specify storage class for a non-cloud destination:',
4340                  stderr)
4341
4342  # TODO: Remove @skip annotation from this test once we upgrade to the Boto
4343  # version that parses the storage class header for HEAD Object responses.
4344  @SkipForXML('Need Boto version > 2.46.1')
4345  def test_cp_specify_nondefault_storage_class(self):
4346    bucket_uri = self.CreateBucket()
4347    object_uri = self.CreateObject(bucket_uri=bucket_uri,
4348                                   object_name='foo',
4349                                   contents=b'foo')
4350    object2_suri = suri(object_uri) + 'bar'
4351    # Specify storage class name as mixed case here to ensure that it
4352    # gets normalized to uppercase (S3 would return an error otherwise), and
4353    # that using the normalized case is accepted by each API.
4354    nondefault_storage_class = {
4355        's3': 'Standard_iA',
4356        'gs': 'durable_REDUCED_availability'
4357    }
4358    storage_class = nondefault_storage_class[self.default_provider]
4359    self.RunGsUtil(['cp', '-s', storage_class, suri(object_uri), object2_suri])
4360    stdout = self.RunGsUtil(['stat', object2_suri], return_stdout=True)
4361    self.assertRegexpMatchesWithFlags(stdout,
4362                                      r'Storage class:\s+%s' % storage_class,
4363                                      flags=re.IGNORECASE)
4364
4365  @SkipForS3('Test uses gs-specific storage classes.')
4366  def test_cp_sets_correct_dest_storage_class(self):
4367    """Tests that object storage class is set correctly with and without -s."""
4368    # Use a non-default storage class as the default for the bucket.
4369    bucket_uri = self.CreateBucket(storage_class='nearline')
4370    # Ensure storage class is set correctly for a local-to-cloud copy.
4371    local_fname = 'foo-orig'
4372    local_fpath = self.CreateTempFile(contents=b'foo', file_name=local_fname)
4373    foo_cloud_suri = suri(bucket_uri) + '/' + local_fname
4374    self.RunGsUtil(['cp', '-s', 'standard', local_fpath, foo_cloud_suri])
4375    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
4376      stdout = self.RunGsUtil(['stat', foo_cloud_suri], return_stdout=True)
4377    self.assertRegexpMatchesWithFlags(stdout,
4378                                      r'Storage class:\s+STANDARD',
4379                                      flags=re.IGNORECASE)
4380
4381    # Ensure storage class is set correctly for a cloud-to-cloud copy when no
4382    # destination storage class is specified.
4383    foo_nl_suri = suri(bucket_uri) + '/foo-nl'
4384    self.RunGsUtil(['cp', foo_cloud_suri, foo_nl_suri])
4385    # TODO: Remove with-clause after adding storage class parsing in Boto.
4386    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
4387      stdout = self.RunGsUtil(['stat', foo_nl_suri], return_stdout=True)
4388    self.assertRegexpMatchesWithFlags(stdout,
4389                                      r'Storage class:\s+NEARLINE',
4390                                      flags=re.IGNORECASE)
4391
4392    # Ensure storage class is set correctly for a cloud-to-cloud copy when a
4393    # non-bucket-default storage class is specified.
4394    foo_std_suri = suri(bucket_uri) + '/foo-std'
4395    self.RunGsUtil(['cp', '-s', 'standard', foo_nl_suri, foo_std_suri])
4396    # TODO: Remove with-clause after adding storage class parsing in Boto.
4397    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
4398      stdout = self.RunGsUtil(['stat', foo_std_suri], return_stdout=True)
4399    self.assertRegexpMatchesWithFlags(stdout,
4400                                      r'Storage class:\s+STANDARD',
4401                                      flags=re.IGNORECASE)
4402
4403  def authorize_project_to_use_testing_kms_key(
4404      self, key_name=testcase.KmsTestingResources.CONSTANT_KEY_NAME):
4405    # Make sure our keyRing and cryptoKey exist.
4406    keyring_fqn = self.kms_api.CreateKeyRing(
4407        PopulateProjectId(None),
4408        testcase.KmsTestingResources.KEYRING_NAME,
4409        location=testcase.KmsTestingResources.KEYRING_LOCATION)
4410    key_fqn = self.kms_api.CreateCryptoKey(keyring_fqn, key_name)
4411    # Make sure that the service account for our default project is authorized
4412    # to use our test KMS key.
4413    self.RunGsUtil(['kms', 'authorize', '-k', key_fqn])
4414    return key_fqn
4415
4416  @SkipForS3('Test uses gs-specific KMS encryption')
4417  def test_kms_key_correctly_applied_to_dst_obj_from_src_with_no_key(self):
4418    bucket_uri = self.CreateBucket()
4419    obj1_name = 'foo'
4420    obj2_name = 'bar'
4421    key_fqn = self.authorize_project_to_use_testing_kms_key()
4422
4423    # Create the unencrypted object, then copy it, specifying a KMS key for the
4424    # new object.
4425    obj_uri = self.CreateObject(bucket_uri=bucket_uri,
4426                                object_name=obj1_name,
4427                                contents=b'foo')
4428    with SetBotoConfigForTest([('GSUtil', 'encryption_key', key_fqn)]):
4429      self.RunGsUtil(
4430          ['cp', suri(obj_uri),
4431           '%s/%s' % (suri(bucket_uri), obj2_name)])
4432
4433    # Make sure the new object is encrypted with the specified KMS key.
4434    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
4435      self.AssertObjectUsesCMEK('%s/%s' % (suri(bucket_uri), obj2_name),
4436                                key_fqn)
4437
4438  @SkipForS3('Test uses gs-specific KMS encryption')
4439  def test_kms_key_correctly_applied_to_dst_obj_from_local_file(self):
4440    bucket_uri = self.CreateBucket()
4441    fpath = self.CreateTempFile(contents=b'abcd')
4442    obj_name = 'foo'
4443    obj_suri = suri(bucket_uri) + '/' + obj_name
4444    key_fqn = self.authorize_project_to_use_testing_kms_key()
4445
4446    with SetBotoConfigForTest([('GSUtil', 'encryption_key', key_fqn)]):
4447      self.RunGsUtil(['cp', fpath, obj_suri])
4448
4449    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
4450      self.AssertObjectUsesCMEK(obj_suri, key_fqn)
4451
4452  @SkipForS3('Test uses gs-specific KMS encryption')
4453  def test_kms_key_works_with_resumable_upload(self):
4454    resumable_threshold = 1024 * 1024  # 1M
4455    bucket_uri = self.CreateBucket()
4456    fpath = self.CreateTempFile(contents=b'a' * resumable_threshold)
4457    obj_name = 'foo'
4458    obj_suri = suri(bucket_uri) + '/' + obj_name
4459    key_fqn = self.authorize_project_to_use_testing_kms_key()
4460
4461    with SetBotoConfigForTest([('GSUtil', 'encryption_key', key_fqn),
4462                               ('GSUtil', 'resumable_threshold',
4463                                str(resumable_threshold))]):
4464      self.RunGsUtil(['cp', fpath, obj_suri])
4465
4466    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
4467      self.AssertObjectUsesCMEK(obj_suri, key_fqn)
4468
4469  @SkipForS3('Test uses gs-specific KMS encryption')
4470  def test_kms_key_correctly_applied_to_dst_obj_from_src_with_diff_key(self):
4471    bucket_uri = self.CreateBucket()
4472    obj1_name = 'foo'
4473    obj2_name = 'bar'
4474    key1_fqn = self.authorize_project_to_use_testing_kms_key()
4475    key2_fqn = self.authorize_project_to_use_testing_kms_key(
4476        key_name=testcase.KmsTestingResources.CONSTANT_KEY_NAME2)
4477    obj1_suri = suri(
4478        self.CreateObject(bucket_uri=bucket_uri,
4479                          object_name=obj1_name,
4480                          contents=b'foo',
4481                          kms_key_name=key1_fqn))
4482
4483    # Copy the object to the same bucket, specifying a different key to be used.
4484    obj2_suri = '%s/%s' % (suri(bucket_uri), obj2_name)
4485    with SetBotoConfigForTest([('GSUtil', 'encryption_key', key2_fqn)]):
4486      self.RunGsUtil(['cp', obj1_suri, obj2_suri])
4487
4488    # Ensure the new object has the different key.
4489    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
4490      self.AssertObjectUsesCMEK(obj2_suri, key2_fqn)
4491
4492  @SkipForS3('Test uses gs-specific KMS encryption')
4493  @SkipForXML('Copying KMS-encrypted objects prohibited with XML API')
4494  def test_kms_key_not_applied_to_nonkms_dst_obj_from_src_with_kms_key(self):
4495    bucket_uri = self.CreateBucket()
4496    obj1_name = 'foo'
4497    obj2_name = 'bar'
4498    key1_fqn = self.authorize_project_to_use_testing_kms_key()
4499    obj1_suri = suri(
4500        self.CreateObject(bucket_uri=bucket_uri,
4501                          object_name=obj1_name,
4502                          contents=b'foo',
4503                          kms_key_name=key1_fqn))
4504
4505    # Copy the object to the same bucket, not specifying any KMS key.
4506    obj2_suri = '%s/%s' % (suri(bucket_uri), obj2_name)
4507    self.RunGsUtil(['cp', obj1_suri, obj2_suri])
4508
4509    # Ensure the new object has no KMS key.
4510    with SetBotoConfigForTest([('GSUtil', 'prefer_api', 'json')]):
4511      self.AssertObjectUnencrypted(obj2_suri)
4512
4513  @unittest.skipUnless(
4514      IS_WINDOWS,
4515      'Only Windows paths need to be normalized to use backslashes instead of '
4516      'forward slashes.')
4517  def test_windows_path_with_back_and_forward_slash_is_normalized(self):
4518    # Prior to this test and its corresponding fix, running
4519    # `gsutil cp dir/./file gs://bucket` would result in an object whose name
4520    # was "dir/./file", rather than just "file", as Windows tried to split on
4521    # the path component separator "\" intead of "/".
4522    tmp_dir = self.CreateTempDir()
4523    self.CreateTempFile(tmpdir=tmp_dir, file_name='obj1', contents=b'foo')
4524    bucket_uri = self.CreateBucket()
4525    self.RunGsUtil(['cp', '%s\\./obj1' % tmp_dir, suri(bucket_uri)])
4526    # If the destination path was not created correctly, this stat call should
4527    # fail with a non-zero exit code because the specified object won't exist.
4528    self.RunGsUtil(['stat', '%s/obj1' % suri(bucket_uri)])
4529
4530  def test_cp_minus_m_streaming_upload(self):
4531    """Tests that cp -m - anything is disallowed."""
4532    stderr = self.RunGsUtil(['-m', 'cp', '-', 'file'],
4533                            return_stderr=True,
4534                            expected_status=1)
4535    self.assertIn(
4536        'CommandException: Cannot upload from a stream when using gsutil -m',
4537        stderr)
4538
4539
4540class TestCpUnitTests(testcase.GsUtilUnitTestCase):
4541  """Unit tests for gsutil cp."""
4542
4543  def testDownloadWithNoHashAvailable(self):
4544    """Tests a download with no valid server-supplied hash."""
4545    # S3 should have a special message for non-MD5 etags.
4546    bucket_uri = self.CreateBucket(provider='s3')
4547    object_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'foo')
4548    object_uri.get_key().etag = '12345'  # Not an MD5
4549    dst_dir = self.CreateTempDir()
4550
4551    log_handler = self.RunCommand('cp', [suri(object_uri), dst_dir],
4552                                  return_log_handler=True)
4553    warning_messages = log_handler.messages['warning']
4554    self.assertEquals(2, len(warning_messages))
4555    self.assertRegex(
4556        warning_messages[0], r'Non-MD5 etag \(12345\) present for key .*, '
4557        r'data integrity checks are not possible')
4558    self.assertIn('Integrity cannot be assured', warning_messages[1])
4559
4560  def test_object_and_prefix_same_name(self):
4561    bucket_uri = self.CreateBucket()
4562    object_uri = self.CreateObject(bucket_uri=bucket_uri,
4563                                   object_name='foo',
4564                                   contents=b'foo')
4565    self.CreateObject(bucket_uri=bucket_uri,
4566                      object_name='foo/bar',
4567                      contents=b'bar')
4568    fpath = self.CreateTempFile()
4569    # MockKey doesn't support hash_algs, so the MD5 will not match.
4570    with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]):
4571      self.RunCommand('cp', [suri(object_uri), fpath])
4572    with open(fpath, 'rb') as f:
4573      self.assertEqual(f.read(), b'foo')
4574
4575  def test_cp_upload_respects_no_hashes(self):
4576    bucket_uri = self.CreateBucket()
4577    fpath = self.CreateTempFile(contents=b'abcd')
4578    with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]):
4579      log_handler = self.RunCommand('cp', [fpath, suri(bucket_uri)],
4580                                    return_log_handler=True)
4581    warning_messages = log_handler.messages['warning']
4582    self.assertEquals(1, len(warning_messages))
4583    self.assertIn('Found no hashes to validate object upload',
4584                  warning_messages[0])
4585