1# -*- coding: utf-8 -*-
2# Copyright 2014 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""JSON gsutil Cloud API implementation for Google Cloud Storage."""
16
17from __future__ import absolute_import
18
19from contextlib import contextmanager
20import httplib
21import json
22import logging
23import socket
24import ssl
25import time
26import traceback
27
28from apitools.base.py import encoding
29from apitools.base.py import exceptions as apitools_exceptions
30from apitools.base.py import http_wrapper as apitools_http_wrapper
31from apitools.base.py import transfer as apitools_transfer
32from apitools.base.py.util import CalculateWaitForRetry
33
34from boto import config
35
36from gslib.cloud_api import AccessDeniedException
37from gslib.cloud_api import ArgumentException
38from gslib.cloud_api import BadRequestException
39from gslib.cloud_api import CloudApi
40from gslib.cloud_api import CryptoTuple
41from gslib.cloud_api import EncryptionException
42from gslib.cloud_api import NotEmptyException
43from gslib.cloud_api import NotFoundException
44from gslib.cloud_api import PreconditionException
45from gslib.cloud_api import Preconditions
46from gslib.cloud_api import PublishPermissionDeniedException
47from gslib.cloud_api import ResumableDownloadException
48from gslib.cloud_api import ResumableUploadAbortException
49from gslib.cloud_api import ResumableUploadException
50from gslib.cloud_api import ResumableUploadStartOverException
51from gslib.cloud_api import ServiceException
52from gslib.cloud_api_helper import ListToGetFields
53from gslib.cloud_api_helper import ValidateDstObjectMetadata
54from gslib.encryption_helper import Base64Sha256FromBase64EncryptionKey
55from gslib.encryption_helper import FindMatchingCryptoKey
56from gslib.gcs_json_credentials import CheckAndGetCredentials
57from gslib.gcs_json_credentials import GetCredentialStoreKeyDict
58from gslib.gcs_json_media import BytesTransferredContainer
59from gslib.gcs_json_media import DownloadCallbackConnectionClassFactory
60from gslib.gcs_json_media import HttpWithDownloadStream
61from gslib.gcs_json_media import HttpWithNoRetries
62from gslib.gcs_json_media import UploadCallbackConnectionClassFactory
63from gslib.gcs_json_media import WrapDownloadHttpRequest
64from gslib.gcs_json_media import WrapUploadHttpRequest
65from gslib.no_op_credentials import NoOpCredentials
66from gslib.progress_callback import ProgressCallbackWithTimeout
67from gslib.project_id import PopulateProjectId
68from gslib.third_party.storage_apitools import storage_v1_client as apitools_client
69from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
70from gslib.tracker_file import DeleteTrackerFile
71from gslib.tracker_file import GetRewriteTrackerFilePath
72from gslib.tracker_file import HashRewriteParameters
73from gslib.tracker_file import ReadRewriteTrackerFile
74from gslib.tracker_file import WriteRewriteTrackerFile
75from gslib.translation_helper import CreateBucketNotFoundException
76from gslib.translation_helper import CreateNotFoundExceptionForObjectWrite
77from gslib.translation_helper import CreateObjectNotFoundException
78from gslib.translation_helper import DEFAULT_CONTENT_TYPE
79from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
80from gslib.translation_helper import REMOVE_CORS_CONFIG
81from gslib.util import AddAcceptEncodingGzipIfNeeded
82from gslib.util import GetCertsFile
83from gslib.util import GetCredentialStoreFilename
84from gslib.util import GetJsonResumableChunkSize
85from gslib.util import GetMaxRetryDelay
86from gslib.util import GetNewHttp
87from gslib.util import GetNumRetries
88from gslib.util import GetPrintableExceptionString
89from gslib.util import JsonResumableChunkSizeDefined
90from gslib.util import LogAndHandleRetries
91from gslib.util import NUM_OBJECTS_PER_LIST_PAGE
92
93import httplib2
94import oauth2client
95from oauth2client.contrib import multistore_file
96
97
98# pylint: disable=invalid-name
99Notification = apitools_messages.Notification
100NotificationCustomAttributesValue = Notification.CustomAttributesValue
101NotificationAdditionalProperty = (NotificationCustomAttributesValue
102                                  .AdditionalProperty)
103
104
105# Implementation supports only 'gs' URLs, so provider is unused.
106# pylint: disable=unused-argument
107
108DEFAULT_GCS_JSON_VERSION = 'v1'
109
110NUM_BUCKETS_PER_LIST_PAGE = 1000
111
112TRANSLATABLE_APITOOLS_EXCEPTIONS = (apitools_exceptions.HttpError,
113                                    apitools_exceptions.StreamExhausted,
114                                    apitools_exceptions.TransferError,
115                                    apitools_exceptions.TransferInvalidError)
116
117# TODO: Distribute these exceptions better through apitools and here.
118# Right now, apitools is configured not to handle any exceptions on
119# uploads/downloads.
120# oauth2_client tries to JSON-decode the response, which can result
121# in a ValueError if the response was invalid. Until that is fixed in
122# oauth2_client, need to handle it here.
123HTTP_TRANSFER_EXCEPTIONS = (apitools_exceptions.TransferRetryError,
124                            apitools_exceptions.BadStatusCodeError,
125                            # TODO: Honor retry-after headers.
126                            apitools_exceptions.RetryAfterError,
127                            apitools_exceptions.RequestError,
128                            httplib.BadStatusLine,
129                            httplib.IncompleteRead,
130                            httplib.ResponseNotReady,
131                            httplib2.ServerNotFoundError,
132                            oauth2client.client.HttpAccessTokenRefreshError,
133                            socket.error,
134                            socket.gaierror,
135                            socket.timeout,
136                            ssl.SSLError,
137                            ValueError)
138
139# Fields requiring projection=full across all API calls.
140_ACL_FIELDS_SET = set(['acl', 'defaultObjectAcl', 'items/acl',
141                       'items/defaultObjectAcl', 'items/owner', 'owner'])
142
143# Fields that may be encrypted.
144_ENCRYPTED_HASHES_SET = set(['crc32c', 'md5Hash'])
145
146
147# During listing, if we attempt to decrypt an object but it has been removed,
148# we skip including the object in the listing.
149_SKIP_LISTING_OBJECT = 'skip'
150
151
152_INSUFFICIENT_OAUTH2_SCOPE_MESSAGE = (
153    'Insufficient OAuth2 scope to perform this operation.')
154
155
156class GcsJsonApi(CloudApi):
157  """Google Cloud Storage JSON implementation of gsutil Cloud API."""
158
159  def __init__(self, bucket_storage_uri_class, logger, status_queue,
160               provider=None, credentials=None, debug=0, trace_token=None,
161               perf_trace_token=None, user_project=None):
162    """Performs necessary setup for interacting with Google Cloud Storage.
163
164    Args:
165      bucket_storage_uri_class: Unused.
166      logger: logging.logger for outputting log messages.
167      status_queue: Queue for relaying status to UI.
168      provider: Unused.  This implementation supports only Google Cloud Storage.
169      credentials: Credentials to be used for interacting with Google Cloud
170                   Storage.
171      debug: Debug level for the API implementation (0..3).
172      trace_token: Trace token to pass to the API implementation.
173      perf_trace_token: Performance trace token to use when making API calls.
174      user_project: Project to be billed for this request.
175    """
176    # TODO: Plumb host_header for perfdiag / test_perfdiag.
177    # TODO: Add jitter to apitools' http_wrapper retry mechanism.
178    super(GcsJsonApi, self).__init__(
179        bucket_storage_uri_class, logger, status_queue, provider='gs',
180        debug=debug, trace_token=trace_token, perf_trace_token=perf_trace_token,
181        user_project=user_project)
182    no_op_credentials = False
183    if not credentials:
184      loaded_credentials = CheckAndGetCredentials(logger)
185
186      if not loaded_credentials:
187        loaded_credentials = NoOpCredentials()
188        no_op_credentials = True
189    else:
190      if isinstance(credentials, NoOpCredentials):
191        no_op_credentials = True
192
193    self.credentials = credentials or loaded_credentials
194
195    self.certs_file = GetCertsFile()
196
197    self.http = GetNewHttp()
198
199    # Re-use download and upload connections. This class is only called
200    # sequentially, but we can share TCP warmed-up connections across calls.
201    self.download_http = self._GetNewDownloadHttp()
202    self.upload_http = self._GetNewUploadHttp()
203    if self.credentials:
204      self.authorized_download_http = self.credentials.authorize(
205          self.download_http)
206      self.authorized_upload_http = self.credentials.authorize(self.upload_http)
207    else:
208      self.authorized_download_http = self.download_http
209      self.authorized_upload_http = self.upload_http
210
211    WrapDownloadHttpRequest(self.authorized_download_http)
212    WrapUploadHttpRequest(self.authorized_upload_http)
213
214    self.http_base = 'https://'
215    gs_json_host = config.get('Credentials', 'gs_json_host', None)
216    self.host_base = gs_json_host or 'www.googleapis.com'
217
218    if not gs_json_host:
219      gs_host = config.get('Credentials', 'gs_host', None)
220      if gs_host:
221        raise ArgumentException(
222            'JSON API is selected but gs_json_host is not configured, '
223            'while gs_host is configured to %s. Please also configure '
224            'gs_json_host and gs_json_port to match your desired endpoint.'
225            % gs_host)
226
227    gs_json_port = config.get('Credentials', 'gs_json_port', None)
228
229    if not gs_json_port:
230      gs_port = config.get('Credentials', 'gs_port', None)
231      if gs_port:
232        raise ArgumentException(
233            'JSON API is selected but gs_json_port is not configured, '
234            'while gs_port is configured to %s. Please also configure '
235            'gs_json_host and gs_json_port to match your desired endpoint.'
236            % gs_port)
237      self.host_port = ''
238    else:
239      self.host_port = ':' + config.get('Credentials', 'gs_json_port')
240
241    self.api_version = config.get('GSUtil', 'json_api_version',
242                                  DEFAULT_GCS_JSON_VERSION)
243    self.url_base = (self.http_base + self.host_base + self.host_port + '/' +
244                     'storage/' + self.api_version + '/')
245
246    credential_store_key_dict = GetCredentialStoreKeyDict(self.credentials,
247                                                          self.api_version)
248
249    self.credentials.set_store(
250        multistore_file.get_credential_storage_custom_key(
251            GetCredentialStoreFilename(), credential_store_key_dict))
252
253    self.num_retries = GetNumRetries()
254    self.max_retry_wait = GetMaxRetryDelay()
255
256    log_request = (debug >= 3)
257    log_response = (debug >= 3)
258
259    self.global_params = apitools_messages.StandardQueryParameters(
260        trace='token:%s' % trace_token) if trace_token else None
261    additional_http_headers = {}
262    self._AddPerfTraceTokenToHeaders(additional_http_headers)
263
264    self.api_client = apitools_client.StorageV1(
265        url=self.url_base, http=self.http, log_request=log_request,
266        log_response=log_response, credentials=self.credentials,
267        version=self.api_version, default_global_params=self.global_params,
268        additional_http_headers=additional_http_headers)
269
270    self.api_client.max_retry_wait = self.max_retry_wait
271    self.api_client.num_retries = self.num_retries
272    self.api_client.retry_func = LogAndHandleRetries(
273        status_queue=self.status_queue)
274
275    if no_op_credentials:
276      # This API key is not secret and is used to identify gsutil during
277      # anonymous requests.
278      self.api_client.AddGlobalParam('key',
279                                     u'AIzaSyDnacJHrKma0048b13sh8cgxNUwulubmJM')
280
281  def _AddPerfTraceTokenToHeaders(self, headers):
282    if self.perf_trace_token:
283      headers['cookie'] = self.perf_trace_token
284
285  def _GetNewDownloadHttp(self):
286    return GetNewHttp(http_class=HttpWithDownloadStream)
287
288  def _GetNewUploadHttp(self):
289    """Returns an upload-safe Http object (by disabling httplib2 retries)."""
290    return GetNewHttp(http_class=HttpWithNoRetries)
291
292  def _FieldsContainsAclField(self, fields=None):
293    """Checks Returns true if ACL related values are in fields set.
294
295    Args:
296      fields: list or set of fields. May be in GET ['acl'] or List
297          ['items/acl'] call format.
298
299    Returns:
300      True if an ACL value is requested in the input fields, False otherwise.
301    """
302    return fields is None or _ACL_FIELDS_SET.intersection(set(fields))
303
304  def _ValidateEncryptionFields(self, fields=None):
305    """Ensures customerEncryption field is included if hashes are requested."""
306    if (fields
307        and _ENCRYPTED_HASHES_SET.intersection(set(fields))
308        and 'customerEncryption' not in fields):
309      raise ArgumentException(
310          'gsutil client error: customerEncryption must be included when '
311          'requesting potentially encrypted fields %s' % _ENCRYPTED_HASHES_SET)
312
313  def GetBucketIamPolicy(self, bucket_name, provider=None, fields=None):
314    apitools_request = apitools_messages.StorageBucketsGetIamPolicyRequest(
315        bucket=bucket_name, userProject=self.user_project)
316    global_params = apitools_messages.StandardQueryParameters()
317    if fields:
318      global_params.fields = ','.join(set(fields))
319    try:
320      return self.api_client.buckets.GetIamPolicy(
321          apitools_request,
322          global_params=global_params)
323    except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
324      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)
325
326  def GetObjectIamPolicy(self, bucket_name, object_name,
327                         generation=None, provider=None, fields=None):
328    if generation is not None:
329      generation = long(generation)
330    apitools_request = apitools_messages.StorageObjectsGetIamPolicyRequest(
331        bucket=bucket_name, object=object_name, generation=generation,
332        userProject=self.user_project)
333    global_params = apitools_messages.StandardQueryParameters()
334    if fields:
335      global_params.fields = ','.join(set(fields))
336    try:
337      return self.api_client.objects.GetIamPolicy(
338          apitools_request,
339          global_params=global_params)
340    except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
341      self._TranslateExceptionAndRaise(
342          e, bucket_name=bucket_name, object_name=object_name,
343          generation=generation)
344
345  def SetObjectIamPolicy(self, bucket_name, object_name, policy,
346                         generation=None, provider=None):
347    if generation is not None:
348      generation = long(generation)
349    api_request = apitools_messages.StorageObjectsSetIamPolicyRequest(
350        bucket=bucket_name, object=object_name, generation=generation,
351        policy=policy, userProject=self.user_project)
352    global_params = apitools_messages.StandardQueryParameters()
353    try:
354      return self.api_client.objects.SetIamPolicy(
355          api_request,
356          global_params=global_params)
357    except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
358      self._TranslateExceptionAndRaise(
359          e, bucket_name=bucket_name, object_name=object_name)
360
361  def SetBucketIamPolicy(self, bucket_name, policy, provider=None):
362    apitools_request = apitools_messages.StorageBucketsSetIamPolicyRequest(
363        bucket=bucket_name,
364        policy=policy,
365        userProject=self.user_project)
366    global_params = apitools_messages.StandardQueryParameters()
367    try:
368      return self.api_client.buckets.SetIamPolicy(
369          apitools_request,
370          global_params=global_params)
371    except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
372      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)
373
374  def GetBucket(self, bucket_name, provider=None, fields=None):
375    """See CloudApi class for function doc strings."""
376    projection = (apitools_messages.StorageBucketsGetRequest
377                  .ProjectionValueValuesEnum.noAcl)
378    if self._FieldsContainsAclField(fields):
379      projection = (apitools_messages.StorageBucketsGetRequest
380                    .ProjectionValueValuesEnum.full)
381    apitools_request = apitools_messages.StorageBucketsGetRequest(
382        bucket=bucket_name, projection=projection,
383        userProject=self.user_project)
384    global_params = apitools_messages.StandardQueryParameters()
385    if fields:
386      global_params.fields = ','.join(set(fields))
387
388    # Here and in list buckets, we have no way of knowing
389    # whether we requested a field and didn't get it because it didn't exist
390    # or because we didn't have permission to access it.
391    try:
392      return self.api_client.buckets.Get(apitools_request,
393                                         global_params=global_params)
394    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
395      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)
396
397  def PatchBucket(self, bucket_name, metadata, canned_acl=None,
398                  canned_def_acl=None, preconditions=None, provider=None,
399                  fields=None):
400    """See CloudApi class for function doc strings."""
401    projection = (apitools_messages.StorageBucketsPatchRequest
402                  .ProjectionValueValuesEnum.noAcl)
403    if self._FieldsContainsAclField(fields):
404      projection = (apitools_messages.StorageBucketsPatchRequest
405                    .ProjectionValueValuesEnum.full)
406    bucket_metadata = metadata
407
408    if not preconditions:
409      preconditions = Preconditions()
410
411    # For blank metadata objects, we need to explicitly call
412    # them out to apitools so it will send/erase them.
413    apitools_include_fields = []
414    for metadata_field in ('billing',
415                           'lifecycle',
416                           'logging',
417                           'metadata',
418                           'versioning',
419                           'website'):
420      attr = getattr(bucket_metadata, metadata_field, None)
421      if attr and not encoding.MessageToDict(attr):
422        setattr(bucket_metadata, metadata_field, None)
423        apitools_include_fields.append(metadata_field)
424
425    if bucket_metadata.cors and bucket_metadata.cors == REMOVE_CORS_CONFIG:
426      bucket_metadata.cors = []
427      apitools_include_fields.append('cors')
428
429    if (bucket_metadata.defaultObjectAcl and
430        bucket_metadata.defaultObjectAcl[0] == PRIVATE_DEFAULT_OBJ_ACL):
431      bucket_metadata.defaultObjectAcl = []
432      apitools_include_fields.append('defaultObjectAcl')
433
434    predefined_acl = None
435    if canned_acl:
436      # Must null out existing ACLs to apply a canned ACL.
437      apitools_include_fields.append('acl')
438      predefined_acl = (
439          apitools_messages.StorageBucketsPatchRequest.
440          PredefinedAclValueValuesEnum(
441              self._BucketCannedAclToPredefinedAcl(canned_acl)))
442
443    predefined_def_acl = None
444    if canned_def_acl:
445      # Must null out existing default object ACLs to apply a canned ACL.
446      apitools_include_fields.append('defaultObjectAcl')
447      predefined_def_acl = (
448          apitools_messages.StorageBucketsPatchRequest.
449          PredefinedDefaultObjectAclValueValuesEnum(
450              self._ObjectCannedAclToPredefinedAcl(canned_def_acl)))
451
452    apitools_request = apitools_messages.StorageBucketsPatchRequest(
453        bucket=bucket_name, bucketResource=bucket_metadata,
454        projection=projection,
455        ifMetagenerationMatch=preconditions.meta_gen_match,
456        predefinedAcl=predefined_acl,
457        predefinedDefaultObjectAcl=predefined_def_acl,
458        userProject=self.user_project)
459    global_params = apitools_messages.StandardQueryParameters()
460    if fields:
461      global_params.fields = ','.join(set(fields))
462    with self.api_client.IncludeFields(apitools_include_fields):
463      try:
464        return self.api_client.buckets.Patch(apitools_request,
465                                             global_params=global_params)
466      except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
467        self._TranslateExceptionAndRaise(e)
468
469  def CreateBucket(self, bucket_name, project_id=None, metadata=None,
470                   provider=None, fields=None):
471    """See CloudApi class for function doc strings."""
472    projection = (apitools_messages.StorageBucketsInsertRequest
473                  .ProjectionValueValuesEnum.noAcl)
474    if self._FieldsContainsAclField(fields):
475      projection = (apitools_messages.StorageBucketsInsertRequest
476                    .ProjectionValueValuesEnum.full)
477    if not metadata:
478      metadata = apitools_messages.Bucket()
479    metadata.name = bucket_name
480
481    if metadata.location:
482      metadata.location = metadata.location.lower()
483    if metadata.storageClass:
484      metadata.storageClass = metadata.storageClass.lower()
485
486    project_id = PopulateProjectId(project_id)
487
488    apitools_request = apitools_messages.StorageBucketsInsertRequest(
489        bucket=metadata, project=project_id, projection=projection,
490        userProject=self.user_project)
491    global_params = apitools_messages.StandardQueryParameters()
492    if fields:
493      global_params.fields = ','.join(set(fields))
494    try:
495      return self.api_client.buckets.Insert(apitools_request,
496                                            global_params=global_params)
497    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
498      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)
499
500  def DeleteBucket(self, bucket_name, preconditions=None, provider=None):
501    """See CloudApi class for function doc strings."""
502    if not preconditions:
503      preconditions = Preconditions()
504
505    apitools_request = apitools_messages.StorageBucketsDeleteRequest(
506        bucket=bucket_name, ifMetagenerationMatch=preconditions.meta_gen_match,
507        userProject=self.user_project)
508
509    try:
510      self.api_client.buckets.Delete(apitools_request)
511    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
512      if isinstance(
513          self._TranslateApitoolsException(e, bucket_name=bucket_name),
514          NotEmptyException):
515        # If bucket is not empty, check to see if versioning is enabled and
516        # signal that in the exception if it is.
517        bucket_metadata = self.GetBucket(bucket_name,
518                                         fields=['versioning'])
519        if bucket_metadata.versioning and bucket_metadata.versioning.enabled:
520          raise NotEmptyException('VersionedBucketNotEmpty',
521                                  status=e.status_code)
522      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)
523
524  def ListBuckets(self, project_id=None, provider=None, fields=None):
525    """See CloudApi class for function doc strings."""
526    projection = (apitools_messages.StorageBucketsListRequest
527                  .ProjectionValueValuesEnum.noAcl)
528    if self._FieldsContainsAclField(fields):
529      projection = (apitools_messages.StorageBucketsListRequest
530                    .ProjectionValueValuesEnum.full)
531    project_id = PopulateProjectId(project_id)
532
533    apitools_request = apitools_messages.StorageBucketsListRequest(
534        project=project_id, maxResults=NUM_BUCKETS_PER_LIST_PAGE,
535        projection=projection, userProject=self.user_project)
536    global_params = apitools_messages.StandardQueryParameters()
537    if fields:
538      if 'nextPageToken' not in fields:
539        fields.add('nextPageToken')
540      global_params.fields = ','.join(set(fields))
541    try:
542      bucket_list = self.api_client.buckets.List(apitools_request,
543                                                 global_params=global_params)
544    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
545      self._TranslateExceptionAndRaise(e)
546
547    for bucket in self._YieldBuckets(bucket_list):
548      yield bucket
549
550    while bucket_list.nextPageToken:
551      apitools_request = apitools_messages.StorageBucketsListRequest(
552          project=project_id, pageToken=bucket_list.nextPageToken,
553          maxResults=NUM_BUCKETS_PER_LIST_PAGE, projection=projection,
554          userProject=self.user_project)
555      try:
556        bucket_list = self.api_client.buckets.List(apitools_request,
557                                                   global_params=global_params)
558      except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
559        self._TranslateExceptionAndRaise(e)
560
561      for bucket in self._YieldBuckets(bucket_list):
562        yield bucket
563
564  def _YieldBuckets(self, bucket_list):
565    """Yields buckets from a list returned by apitools."""
566    if bucket_list.items:
567      for bucket in bucket_list.items:
568        yield bucket
569
570  def ListObjects(self, bucket_name, prefix=None, delimiter=None,
571                  all_versions=None, provider=None, fields=None):
572    """See CloudApi class for function doc strings."""
573    projection = (apitools_messages.StorageObjectsListRequest
574                  .ProjectionValueValuesEnum.noAcl)
575    if self._FieldsContainsAclField(fields):
576      projection = (apitools_messages.StorageObjectsListRequest
577                    .ProjectionValueValuesEnum.full)
578    apitools_request = apitools_messages.StorageObjectsListRequest(
579        bucket=bucket_name, prefix=prefix, delimiter=delimiter,
580        versions=all_versions, projection=projection,
581        maxResults=NUM_OBJECTS_PER_LIST_PAGE, userProject=self.user_project)
582    global_params = apitools_messages.StandardQueryParameters()
583
584    if fields:
585      fields = set(fields)
586      if 'nextPageToken' not in fields:
587        fields.add('nextPageToken')
588      if ListToGetFields(list_fields=fields).intersection(
589          _ENCRYPTED_HASHES_SET):
590        # We may need to make a follow-up request to decrypt the hashes,
591        # so ensure the required fields are present.
592        fields.add('items/customerEncryption')
593        fields.add('items/generation')
594        fields.add('items/name')
595      global_params.fields = ','.join(fields)
596
597    list_page = True
598    next_page_token = None
599
600    while list_page:
601      list_page = False
602      apitools_request = apitools_messages.StorageObjectsListRequest(
603          bucket=bucket_name, prefix=prefix, delimiter=delimiter,
604          versions=all_versions, projection=projection,
605          pageToken=next_page_token, maxResults=NUM_OBJECTS_PER_LIST_PAGE,
606          userProject=self.user_project)
607      try:
608        object_list = self.api_client.objects.List(apitools_request,
609                                                   global_params=global_params)
610      except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
611        self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)
612
613      if object_list.nextPageToken:
614        list_page = True
615        next_page_token = object_list.nextPageToken
616
617      for object_or_prefix in self._YieldObjectsAndPrefixes(object_list):
618        if object_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT:
619          decrypted_metadata = self._DecryptHashesIfPossible(
620              bucket_name, object_or_prefix.data,
621              fields=ListToGetFields(fields))
622          if decrypted_metadata == _SKIP_LISTING_OBJECT:
623            continue
624          elif decrypted_metadata:
625            object_or_prefix.data = decrypted_metadata
626          else:
627            # Object metadata was unencrypted or we did not have a matching
628            # key; yield what we received in the initial listing.
629            pass
630
631        yield object_or_prefix
632
633  def _DecryptHashesIfPossible(self, bucket_name, object_metadata,
634                               fields=None):
635    """Attempts to decrypt object metadata.
636
637    This function issues a GetObjectMetadata request with a decryption key if
638    a matching one is found.
639
640    Args:
641      bucket_name: String of bucket containing the object.
642      object_metadata: apitools Object describing the object. Must include
643          name, generation, and customerEncryption fields.
644      fields: ListObjects-format fields to return.
645
646    Returns:
647      apitools Object with decrypted hashes if decryption was performed,
648      None if the object was not encrypted or a matching decryption key was not
649      found, or _SKIP_LISTING_OBJECT if the object no longer exists.
650    """
651    get_fields = ListToGetFields(list_fields=fields)
652    if self._ShouldTryWithEncryptionKey(object_metadata, fields=get_fields):
653      decryption_key = FindMatchingCryptoKey(
654          object_metadata.customerEncryption.keySha256)
655      if decryption_key:
656        # Issue a get request per-object with the encryption key.
657        # This requires an API call per-object and may be slow.
658        try:
659          return self._GetObjectMetadataWithDecryptionKey(
660              bucket_name, object_metadata.name, CryptoTuple(decryption_key),
661              object_metadata.generation, fields=get_fields)
662        except NotFoundException:
663          # If the object was deleted between the listing request and
664          # the get request, do not include it in the list.
665          return _SKIP_LISTING_OBJECT
666
667  def _YieldObjectsAndPrefixes(self, object_list):
668    # ls depends on iterating objects before prefixes for proper display.
669    if object_list.items:
670      for cloud_obj in object_list.items:
671        yield CloudApi.CsObjectOrPrefix(cloud_obj,
672                                        CloudApi.CsObjectOrPrefixType.OBJECT)
673    if object_list.prefixes:
674      for prefix in object_list.prefixes:
675        yield CloudApi.CsObjectOrPrefix(prefix,
676                                        CloudApi.CsObjectOrPrefixType.PREFIX)
677
678  @contextmanager
679  def _ApitoolsRequestHeaders(self, headers):
680    """Wrapped code makes apitools requests with the specified headers."""
681    if headers:
682      old_headers = self.api_client.additional_http_headers.copy()
683      self.api_client.additional_http_headers.update(headers)
684    yield
685    if headers:
686      self.api_client.additional_http_headers = old_headers
687
688  def _EncryptionHeadersFromTuple(self, crypto_tuple=None):
689    """Returns headers dict matching crypto_tuple, or empty dict."""
690    headers = {}
691    if crypto_tuple:
692      headers['x-goog-encryption-algorithm'] = crypto_tuple.crypto_alg
693      headers['x-goog-encryption-key'] = crypto_tuple.crypto_key
694      headers['x-goog-encryption-key-sha256'] = (
695          Base64Sha256FromBase64EncryptionKey(crypto_tuple.crypto_key))
696    return headers
697
698  def _RewriteCryptoHeadersFromTuples(self, decryption_tuple=None,
699                                      encryption_tuple=None):
700    """Returns headers dict matching the provided tuples, or empty dict."""
701    headers = {}
702    if decryption_tuple:
703      headers['x-goog-copy-source-encryption-algorithm'] = (
704          decryption_tuple.crypto_alg)
705      headers['x-goog-copy-source-encryption-key'] = (
706          decryption_tuple.crypto_key)
707      headers['x-goog-copy-source-encryption-key-sha256'] = (
708          Base64Sha256FromBase64EncryptionKey(decryption_tuple.crypto_key))
709
710    if encryption_tuple:
711      headers['x-goog-encryption-algorithm'] = encryption_tuple.crypto_alg
712      headers['x-goog-encryption-key'] = encryption_tuple.crypto_key
713      headers['x-goog-encryption-key-sha256'] = (
714          Base64Sha256FromBase64EncryptionKey(encryption_tuple.crypto_key))
715
716    return headers
717
718  def _GetObjectMetadataWithDecryptionKey(
719      self, bucket_name, object_name, decryption_tuple, generation=None,
720      fields=None):
721    """Helper function to get encrypted object metadata.
722
723    Args:
724      bucket_name: Bucket containing the object.
725      object_name: Object name.
726      decryption_tuple: CryptoTuple with decryption key.
727      generation: Generation of the object to retrieve.
728      fields: If present, return only these Object metadata fields, for
729              example, ['acl', 'updated'].
730
731    Raises:
732      ArgumentException for errors during input validation.
733      ServiceException for errors interacting with cloud storage providers.
734
735    Returns:
736      Object object.
737    """
738    apitools_request = self._GetApitoolsObjectMetadataRequest(
739        bucket_name, object_name, generation=generation, fields=fields)
740    global_params = self._GetApitoolsObjectMetadataGlobalParams(fields=fields)
741
742    try:
743      with self._ApitoolsRequestHeaders(
744          self._EncryptionHeadersFromTuple(crypto_tuple=decryption_tuple)):
745        return self.api_client.objects.Get(
746            apitools_request, global_params=global_params)
747    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
748      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
749                                       object_name=object_name,
750                                       generation=generation)
751
752  def _GetApitoolsObjectMetadataRequest(self, bucket_name, object_name,
753                                        generation=None, fields=None):
754    self._ValidateEncryptionFields(fields=fields)
755    projection = (apitools_messages.StorageObjectsGetRequest
756                  .ProjectionValueValuesEnum.noAcl)
757    if self._FieldsContainsAclField(fields):
758      projection = (apitools_messages.StorageObjectsGetRequest
759                    .ProjectionValueValuesEnum.full)
760
761    if generation:
762      generation = long(generation)
763
764    return apitools_messages.StorageObjectsGetRequest(
765        bucket=bucket_name, object=object_name, projection=projection,
766        generation=generation, userProject=self.user_project)
767
768  def _GetApitoolsObjectMetadataGlobalParams(self, fields=None):
769    global_params = apitools_messages.StandardQueryParameters()
770    if fields:
771      global_params.fields = ','.join(set(fields))
772    return global_params
773
774  def GetObjectMetadata(self, bucket_name, object_name, generation=None,
775                        provider=None, fields=None):
776    """See CloudApi class for function doc strings."""
777    apitools_request = self._GetApitoolsObjectMetadataRequest(
778        bucket_name, object_name, generation=generation, fields=fields)
779    global_params = self._GetApitoolsObjectMetadataGlobalParams(fields=fields)
780
781    try:
782      object_metadata = self.api_client.objects.Get(apitools_request,
783                                                    global_params=global_params)
784      if self._ShouldTryWithEncryptionKey(object_metadata, fields=fields):
785        key_sha256 = object_metadata.customerEncryption.keySha256
786        decryption_key = FindMatchingCryptoKey(key_sha256)
787        if not decryption_key:
788          raise EncryptionException(
789              'Missing decryption key with SHA256 hash %s. No decryption key '
790              'matches object %s://%s/%s' % (key_sha256, self.provider,
791                                             bucket_name, object_name))
792        return self._GetObjectMetadataWithDecryptionKey(
793            bucket_name, object_name, CryptoTuple(decryption_key),
794            generation=generation, fields=fields)
795      else:
796        return object_metadata
797    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
798      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
799                                       object_name=object_name,
800                                       generation=generation)
801
802  def _ShouldTryWithEncryptionKey(self, object_metadata, fields=None):
803    """Checks if a metadata request should be re-issued with a decryption key.
804
805    Args:
806      object_metadata: apitools Object metadata from a request without a
807          decryption key; must include customerEncryption field if hashes
808          are requested.
809      fields: Requested fields.
810
811    Returns:
812      True if request requires a decryption key and should be re-issued, False
813      otherwise.
814
815    Raises:
816      ServiceException if service did not provide at least one of the requested
817      hashes or the customerEncryption field.
818    """
819    if fields is not None and not _ENCRYPTED_HASHES_SET.intersection(fields):
820      # No potentially encrypted metadata requested.
821      return False
822
823    if object_metadata.customerEncryption:
824      # Object is encrypted.
825      return True
826
827    need_hash = fields is None or 'md5Hash' in fields or 'crc32c' in fields
828    has_hash = object_metadata.md5Hash or object_metadata.crc32c
829    if need_hash and not has_hash:
830      raise ServiceException('Service did not provide requested hashes, '
831                             'but customerEncryption field is missing.')
832
833    return False
834
835  def GetObjectMedia(
836      self, bucket_name, object_name, download_stream,
837      provider=None, generation=None, object_size=None,
838      compressed_encoding=False,
839      download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, start_byte=0,
840      end_byte=None, progress_callback=None, serialization_data=None,
841      digesters=None, decryption_tuple=None):
842    """See CloudApi class for function doc strings."""
843    # This implementation will get the object metadata first if we don't pass it
844    # in via serialization_data.
845    if generation:
846      generation = long(generation)
847
848    # 'outer_total_size' is only used for formatting user output, and is
849    # expected to be one higher than the last byte that should be downloaded.
850    # TODO: Change DownloadCallbackConnectionClassFactory and progress callbacks
851    # to more elegantly handle total size for components of files.
852    outer_total_size = object_size
853    if end_byte:
854      outer_total_size = end_byte + 1
855    elif serialization_data:
856      outer_total_size = json.loads(serialization_data)['total_size']
857
858    if progress_callback:
859      if outer_total_size is None:
860        raise ArgumentException('Download size is required when callbacks are '
861                                'requested for a download, but no size was '
862                                'provided.')
863      progress_callback(start_byte, outer_total_size)
864
865    bytes_downloaded_container = BytesTransferredContainer()
866    bytes_downloaded_container.bytes_transferred = start_byte
867
868    callback_class_factory = DownloadCallbackConnectionClassFactory(
869        bytes_downloaded_container, total_size=outer_total_size,
870        progress_callback=progress_callback, digesters=digesters)
871    download_http_class = callback_class_factory.GetConnectionClass()
872
873    # Point our download HTTP at our download stream.
874    self.download_http.stream = download_stream
875    self.download_http.connections = {'https': download_http_class}
876
877    if serialization_data:
878      # If we have an apiary trace token, add it to the URL.
879      # TODO: Add query parameter support to apitools downloads so there is
880      # a well-defined way to express query parameters. Currently, we assume
881      # the URL ends in ?alt=media, and this will break if that changes.
882      if self.trace_token:
883        serialization_dict = json.loads(serialization_data)
884        serialization_dict['url'] += '&trace=token%%3A%s' % self.trace_token
885        serialization_data = json.dumps(serialization_dict)
886
887      apitools_download = apitools_transfer.Download.FromData(
888          download_stream, serialization_data, self.api_client.http,
889          num_retries=self.num_retries)
890    else:
891      apitools_download = apitools_transfer.Download.FromStream(
892          download_stream, auto_transfer=False, total_size=object_size,
893          num_retries=self.num_retries)
894
895    apitools_download.bytes_http = self.authorized_download_http
896    apitools_request = apitools_messages.StorageObjectsGetRequest(
897        bucket=bucket_name, object=object_name, generation=generation,
898        userProject=self.user_project)
899
900    # Disable retries in apitools. We will handle them explicitly for
901    # resumable downloads; one-shot downloads are not retryable as we do
902    # not track how many bytes were written to the stream.
903    apitools_download.retry_func = LogAndHandleRetries(
904        is_data_transfer=True,
905        status_queue=self.status_queue)
906
907    try:
908      if download_strategy == CloudApi.DownloadStrategy.RESUMABLE:
909        return self._PerformResumableDownload(
910            bucket_name, object_name, download_stream, apitools_request,
911            apitools_download, bytes_downloaded_container,
912            compressed_encoding=compressed_encoding,
913            generation=generation, start_byte=start_byte, end_byte=end_byte,
914            serialization_data=serialization_data,
915            decryption_tuple=decryption_tuple)
916      else:
917        return self._PerformDownload(
918            bucket_name, object_name, download_stream, apitools_request,
919            apitools_download, generation=generation,
920            compressed_encoding=compressed_encoding,
921            start_byte=start_byte, end_byte=end_byte,
922            serialization_data=serialization_data,
923            decryption_tuple=decryption_tuple)
924    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
925      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
926                                       object_name=object_name,
927                                       generation=generation)
928
929  def _PerformResumableDownload(
930      self, bucket_name, object_name, download_stream, apitools_request,
931      apitools_download, bytes_downloaded_container, generation=None,
932      compressed_encoding=False, start_byte=0, end_byte=None,
933      serialization_data=None, decryption_tuple=None):
934    retries = 0
935    last_progress_byte = start_byte
936    while retries <= self.num_retries:
937      try:
938        return self._PerformDownload(
939            bucket_name, object_name, download_stream, apitools_request,
940            apitools_download, generation=generation,
941            compressed_encoding=compressed_encoding, start_byte=start_byte,
942            end_byte=end_byte, serialization_data=serialization_data,
943            decryption_tuple=decryption_tuple)
944      except HTTP_TRANSFER_EXCEPTIONS, e:
945        self._ValidateHttpAccessTokenRefreshError(e)
946        start_byte = download_stream.tell()
947        bytes_downloaded_container.bytes_transferred = start_byte
948        if start_byte > last_progress_byte:
949          # We've made progress, so allow a fresh set of retries.
950          last_progress_byte = start_byte
951          retries = 0
952        retries += 1
953        if retries > self.num_retries:
954          raise ResumableDownloadException(
955              'Transfer failed after %d retries. Final exception: %s' %
956              (self.num_retries, GetPrintableExceptionString(e)))
957        time.sleep(CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
958        if self.logger.isEnabledFor(logging.DEBUG):
959          self.logger.debug(
960              'Retrying download from byte %s after exception: %s. Trace: %s',
961              start_byte, GetPrintableExceptionString(e),
962              traceback.format_exc())
963        apitools_http_wrapper.RebuildHttpConnections(
964            apitools_download.bytes_http)
965
966  def _PerformDownload(
967      self, bucket_name, object_name, download_stream, apitools_request,
968      apitools_download, generation=None, compressed_encoding=False,
969      start_byte=0, end_byte=None, serialization_data=None,
970      decryption_tuple=None):
971    if not serialization_data:
972      try:
973        self.api_client.objects.Get(apitools_request,
974                                    download=apitools_download)
975      except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
976        self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
977                                         object_name=object_name,
978                                         generation=generation)
979
980    # Disable apitools' default print callbacks.
981    def _NoOpCallback(unused_response, unused_download_object):
982      pass
983
984    # TODO: If we have a resumable download with accept-encoding:gzip
985    # on a object that is compressible but not in gzip form in the cloud,
986    # on-the-fly compression may gzip the object.  In this case if our
987    # download breaks, future requests will ignore the range header and just
988    # return the object (gzipped) in its entirety.  Ideally, we would unzip
989    # the bytes that we have locally and send a range request without
990    # accept-encoding:gzip so that we can download only the (uncompressed) bytes
991    # that we don't yet have.
992
993    # Since bytes_http is created in this function, we don't get the
994    # user-agent header from api_client's http automatically.
995    additional_headers = {
996        'user-agent': self.api_client.user_agent,
997    }
998    AddAcceptEncodingGzipIfNeeded(additional_headers,
999                                  compressed_encoding=compressed_encoding)
1000
1001    self._AddPerfTraceTokenToHeaders(additional_headers)
1002    additional_headers.update(
1003        self._EncryptionHeadersFromTuple(decryption_tuple))
1004
1005    if start_byte or end_byte is not None:
1006      apitools_download.GetRange(additional_headers=additional_headers,
1007                                 start=start_byte, end=end_byte,
1008                                 use_chunks=False)
1009    else:
1010      apitools_download.StreamMedia(
1011          callback=_NoOpCallback, finish_callback=_NoOpCallback,
1012          additional_headers=additional_headers, use_chunks=False)
1013    return apitools_download.encoding
1014
1015  def PatchObjectMetadata(self, bucket_name, object_name, metadata,
1016                          canned_acl=None, generation=None, preconditions=None,
1017                          provider=None, fields=None, user_project=None):
1018    """See CloudApi class for function doc strings."""
1019    projection = (apitools_messages.StorageObjectsPatchRequest
1020                  .ProjectionValueValuesEnum.noAcl)
1021    if self._FieldsContainsAclField(fields):
1022      projection = (apitools_messages.StorageObjectsPatchRequest
1023                    .ProjectionValueValuesEnum.full)
1024
1025    if not preconditions:
1026      preconditions = Preconditions()
1027
1028    if generation:
1029      generation = long(generation)
1030
1031    predefined_acl = None
1032    apitools_include_fields = []
1033    if canned_acl:
1034      # Must null out existing ACLs to apply a canned ACL.
1035      apitools_include_fields.append('acl')
1036      predefined_acl = (
1037          apitools_messages.StorageObjectsPatchRequest.
1038          PredefinedAclValueValuesEnum(
1039              self._ObjectCannedAclToPredefinedAcl(canned_acl)))
1040
1041    apitools_request = apitools_messages.StorageObjectsPatchRequest(
1042        bucket=bucket_name, object=object_name, objectResource=metadata,
1043        generation=generation, projection=projection,
1044        ifGenerationMatch=preconditions.gen_match,
1045        ifMetagenerationMatch=preconditions.meta_gen_match,
1046        predefinedAcl=predefined_acl, userProject=self.user_project)
1047    global_params = apitools_messages.StandardQueryParameters()
1048    if fields:
1049      global_params.fields = ','.join(set(fields))
1050
1051    try:
1052      with self.api_client.IncludeFields(apitools_include_fields):
1053        return self.api_client.objects.Patch(apitools_request,
1054                                             global_params=global_params)
1055    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1056      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
1057                                       object_name=object_name,
1058                                       generation=generation)
1059
1060  def _UploadObject(self, upload_stream, object_metadata, canned_acl=None,
1061                    size=None, preconditions=None, encryption_tuple=None,
1062                    provider=None, fields=None, serialization_data=None,
1063                    tracker_callback=None, progress_callback=None,
1064                    apitools_strategy=apitools_transfer.SIMPLE_UPLOAD,
1065                    total_size=0):
1066    # pylint: disable=g-doc-args
1067    """Upload implementation. Cloud API arguments, plus two more.
1068
1069    Additional args:
1070      apitools_strategy: SIMPLE_UPLOAD or RESUMABLE_UPLOAD.
1071      total_size: Total size of the upload; None if it is unknown (streaming).
1072
1073    Returns:
1074      Uploaded object metadata.
1075    """
1076    # pylint: enable=g-doc-args
1077    ValidateDstObjectMetadata(object_metadata)
1078    predefined_acl = None
1079    if canned_acl:
1080      predefined_acl = (
1081          apitools_messages.StorageObjectsInsertRequest.
1082          PredefinedAclValueValuesEnum(
1083              self._ObjectCannedAclToPredefinedAcl(canned_acl)))
1084
1085    bytes_uploaded_container = BytesTransferredContainer()
1086
1087    if progress_callback and size:
1088      total_size = size
1089      progress_callback(0, size)
1090
1091    callback_class_factory = UploadCallbackConnectionClassFactory(
1092        bytes_uploaded_container, total_size=total_size,
1093        progress_callback=progress_callback, logger=self.logger,
1094        debug=self.debug)
1095
1096    upload_http_class = callback_class_factory.GetConnectionClass()
1097    self.upload_http.connections = {'http': upload_http_class,
1098                                    'https': upload_http_class}
1099
1100    # Since bytes_http is created in this function, we don't get the
1101    # user-agent header from api_client's http automatically.
1102    additional_headers = {
1103        'user-agent': self.api_client.user_agent
1104    }
1105    self._AddPerfTraceTokenToHeaders(additional_headers)
1106
1107    try:
1108      content_type = None
1109      apitools_request = None
1110      global_params = None
1111
1112      if not serialization_data:
1113        # This is a new upload, set up initial upload state.
1114        content_type = object_metadata.contentType
1115        if not content_type:
1116          content_type = DEFAULT_CONTENT_TYPE
1117
1118        if not preconditions:
1119          preconditions = Preconditions()
1120
1121        apitools_request = apitools_messages.StorageObjectsInsertRequest(
1122            bucket=object_metadata.bucket, object=object_metadata,
1123            ifGenerationMatch=preconditions.gen_match,
1124            ifMetagenerationMatch=preconditions.meta_gen_match,
1125            predefinedAcl=predefined_acl, userProject=self.user_project)
1126        global_params = apitools_messages.StandardQueryParameters()
1127        if fields:
1128          global_params.fields = ','.join(set(fields))
1129
1130      encryption_headers = self._EncryptionHeadersFromTuple(
1131          crypto_tuple=encryption_tuple)
1132
1133      if apitools_strategy == apitools_transfer.SIMPLE_UPLOAD:
1134        # One-shot upload.
1135        apitools_upload = apitools_transfer.Upload(
1136            upload_stream, content_type, total_size=size, auto_transfer=True,
1137            num_retries=self.num_retries)
1138        apitools_upload.strategy = apitools_strategy
1139        apitools_upload.bytes_http = self.authorized_upload_http
1140
1141        with self._ApitoolsRequestHeaders(encryption_headers):
1142          return self.api_client.objects.Insert(
1143              apitools_request,
1144              upload=apitools_upload,
1145              global_params=global_params)
1146      else:  # Resumable upload.
1147        # Since bytes_http is created in this function, we don't get the
1148        # user-agent header from api_client's http automatically.
1149        additional_headers = {
1150            'user-agent': self.api_client.user_agent
1151        }
1152        additional_headers.update(encryption_headers)
1153
1154        return self._PerformResumableUpload(
1155            upload_stream, self.authorized_upload_http, content_type, size,
1156            serialization_data, apitools_strategy, apitools_request,
1157            global_params, bytes_uploaded_container, tracker_callback,
1158            additional_headers, progress_callback)
1159    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1160      not_found_exception = CreateNotFoundExceptionForObjectWrite(
1161          self.provider, object_metadata.bucket)
1162      self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
1163                                       object_name=object_metadata.name,
1164                                       not_found_exception=not_found_exception)
1165
1166  def _PerformResumableUpload(
1167      self, upload_stream, authorized_upload_http, content_type, size,
1168      serialization_data, apitools_strategy, apitools_request, global_params,
1169      bytes_uploaded_container, tracker_callback, addl_headers,
1170      progress_callback):
1171    try:
1172      if serialization_data:
1173        # Resuming an existing upload.
1174        apitools_upload = apitools_transfer.Upload.FromData(
1175            upload_stream, serialization_data, self.api_client.http,
1176            num_retries=self.num_retries)
1177        apitools_upload.chunksize = GetJsonResumableChunkSize()
1178        apitools_upload.bytes_http = authorized_upload_http
1179      else:
1180        # New resumable upload.
1181        apitools_upload = apitools_transfer.Upload(
1182            upload_stream, content_type, total_size=size,
1183            chunksize=GetJsonResumableChunkSize(), auto_transfer=False,
1184            num_retries=self.num_retries)
1185        apitools_upload.strategy = apitools_strategy
1186        apitools_upload.bytes_http = authorized_upload_http
1187        with self._ApitoolsRequestHeaders(addl_headers):
1188          self.api_client.objects.Insert(
1189              apitools_request,
1190              upload=apitools_upload,
1191              global_params=global_params)
1192      # Disable retries in apitools. We will handle them explicitly here.
1193      apitools_upload.retry_func = LogAndHandleRetries(
1194          is_data_transfer=True,
1195          status_queue=self.status_queue)
1196
1197      # Disable apitools' default print callbacks.
1198      def _NoOpCallback(unused_response, unused_upload_object):
1199        pass
1200
1201      # If we're resuming an upload, apitools has at this point received
1202      # from the server how many bytes it already has. Update our
1203      # callback class with this information.
1204      bytes_uploaded_container.bytes_transferred = apitools_upload.progress
1205
1206      if tracker_callback:
1207        tracker_callback(json.dumps(apitools_upload.serialization_data))
1208
1209      retries = 0
1210      last_progress_byte = apitools_upload.progress
1211      while retries <= self.num_retries:
1212        try:
1213          # TODO: On retry, this will seek to the bytes that the server has,
1214          # causing the hash to be recalculated. Make HashingFileUploadWrapper
1215          # save a digest according to json_resumable_chunk_size.
1216          if size and not JsonResumableChunkSizeDefined():
1217            # If size is known, we can send it all in one request and avoid
1218            # making a round-trip per chunk.
1219            http_response = apitools_upload.StreamMedia(
1220                callback=_NoOpCallback, finish_callback=_NoOpCallback,
1221                additional_headers=addl_headers)
1222          else:
1223            # Otherwise it's a streaming request and we need to ensure that we
1224            # send the bytes in chunks so that we can guarantee that we never
1225            # need to seek backwards more than our buffer (and also that the
1226            # chunks are aligned to 256KB).
1227            http_response = apitools_upload.StreamInChunks(
1228                callback=_NoOpCallback, finish_callback=_NoOpCallback,
1229                additional_headers=addl_headers)
1230          processed_response = self.api_client.objects.ProcessHttpResponse(
1231              self.api_client.objects.GetMethodConfig('Insert'), http_response)
1232          if size is None and progress_callback:
1233            # Make final progress callback; total size should now be known.
1234            # This works around the fact the send function counts header bytes.
1235            # However, this will make the progress appear to go slightly
1236            # backwards at the end.
1237            progress_callback(apitools_upload.total_size,
1238                              apitools_upload.total_size)
1239          return processed_response
1240        except HTTP_TRANSFER_EXCEPTIONS, e:
1241          self._ValidateHttpAccessTokenRefreshError(e)
1242          apitools_http_wrapper.RebuildHttpConnections(
1243              apitools_upload.bytes_http)
1244          while retries <= self.num_retries:
1245            try:
1246              # TODO: Simulate the refresh case in tests. Right now, our
1247              # mocks are not complex enough to simulate a failure.
1248              apitools_upload.RefreshResumableUploadState()
1249              start_byte = apitools_upload.progress
1250              bytes_uploaded_container.bytes_transferred = start_byte
1251              break
1252            except HTTP_TRANSFER_EXCEPTIONS, e2:
1253              self._ValidateHttpAccessTokenRefreshError(e2)
1254              apitools_http_wrapper.RebuildHttpConnections(
1255                  apitools_upload.bytes_http)
1256              retries += 1
1257              if retries > self.num_retries:
1258                raise ResumableUploadException(
1259                    'Transfer failed after %d retries. Final exception: %s' %
1260                    (self.num_retries, e2))
1261              time.sleep(
1262                  CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
1263          if start_byte > last_progress_byte:
1264            # We've made progress, so allow a fresh set of retries.
1265            last_progress_byte = start_byte
1266            retries = 0
1267          else:
1268            retries += 1
1269            if retries > self.num_retries:
1270              raise ResumableUploadException(
1271                  'Transfer failed after %d retries. Final exception: %s' %
1272                  (self.num_retries, GetPrintableExceptionString(e)))
1273            time.sleep(
1274                CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
1275          if self.logger.isEnabledFor(logging.DEBUG):
1276            self.logger.debug(
1277                'Retrying upload from byte %s after exception: %s. Trace: %s',
1278                start_byte, GetPrintableExceptionString(e),
1279                traceback.format_exc())
1280    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1281      resumable_ex = self._TranslateApitoolsResumableUploadException(e)
1282      if resumable_ex:
1283        raise resumable_ex
1284      else:
1285        raise
1286
1287  def UploadObject(self, upload_stream, object_metadata, canned_acl=None,
1288                   size=None, preconditions=None, progress_callback=None,
1289                   encryption_tuple=None, provider=None, fields=None):
1290    """See CloudApi class for function doc strings."""
1291    return self._UploadObject(
1292        upload_stream, object_metadata, canned_acl=canned_acl,
1293        size=size, preconditions=preconditions,
1294        progress_callback=progress_callback, encryption_tuple=encryption_tuple,
1295        fields=fields, apitools_strategy=apitools_transfer.SIMPLE_UPLOAD)
1296
1297  def UploadObjectStreaming(
1298      self, upload_stream, object_metadata, canned_acl=None, preconditions=None,
1299      progress_callback=None, encryption_tuple=None, provider=None,
1300      fields=None):
1301    """See CloudApi class for function doc strings."""
1302    # Streaming indicated by not passing a size.
1303    # Resumable capabilities are present up to the resumable chunk size using
1304    # a buffered stream.
1305    return self._UploadObject(
1306        upload_stream, object_metadata, canned_acl=canned_acl,
1307        preconditions=preconditions, progress_callback=progress_callback,
1308        encryption_tuple=encryption_tuple, fields=fields,
1309        apitools_strategy=apitools_transfer.RESUMABLE_UPLOAD, total_size=None)
1310
1311  def UploadObjectResumable(
1312      self, upload_stream, object_metadata, canned_acl=None, preconditions=None,
1313      size=None, serialization_data=None, tracker_callback=None,
1314      progress_callback=None, encryption_tuple=None, provider=None,
1315      fields=None):
1316    """See CloudApi class for function doc strings."""
1317    return self._UploadObject(
1318        upload_stream, object_metadata, canned_acl=canned_acl,
1319        preconditions=preconditions, fields=fields, size=size,
1320        serialization_data=serialization_data,
1321        tracker_callback=tracker_callback, progress_callback=progress_callback,
1322        encryption_tuple=encryption_tuple,
1323        apitools_strategy=apitools_transfer.RESUMABLE_UPLOAD)
1324
1325  def CopyObject(self, src_obj_metadata, dst_obj_metadata, src_generation=None,
1326                 canned_acl=None, preconditions=None, progress_callback=None,
1327                 max_bytes_per_call=None, encryption_tuple=None,
1328                 decryption_tuple=None, provider=None, fields=None):
1329    """See CloudApi class for function doc strings."""
1330    ValidateDstObjectMetadata(dst_obj_metadata)
1331    predefined_acl = None
1332    if canned_acl:
1333      predefined_acl = (
1334          apitools_messages.StorageObjectsRewriteRequest.
1335          DestinationPredefinedAclValueValuesEnum(
1336              self._ObjectCannedAclToPredefinedAcl(canned_acl)))
1337
1338    if src_generation:
1339      src_generation = long(src_generation)
1340
1341    if not preconditions:
1342      preconditions = Preconditions()
1343
1344    projection = (apitools_messages.StorageObjectsRewriteRequest.
1345                  ProjectionValueValuesEnum.noAcl)
1346    if self._FieldsContainsAclField(fields):
1347      projection = (apitools_messages.StorageObjectsRewriteRequest.
1348                    ProjectionValueValuesEnum.full)
1349    global_params = apitools_messages.StandardQueryParameters()
1350    if fields:
1351      # Rewrite returns the resultant object under the 'resource' field.
1352      new_fields = set(['done', 'objectSize', 'rewriteToken',
1353                        'totalBytesRewritten'])
1354      for field in fields:
1355        new_fields.add('resource/' + field)
1356      global_params.fields = ','.join(set(new_fields))
1357
1358    dec_key_sha256 = (
1359        Base64Sha256FromBase64EncryptionKey(decryption_tuple.crypto_key)
1360        if decryption_tuple else None)
1361    enc_key_sha256 = (
1362        Base64Sha256FromBase64EncryptionKey(encryption_tuple.crypto_key)
1363        if encryption_tuple else None)
1364
1365    # Check to see if we are resuming a rewrite.
1366    tracker_file_name = GetRewriteTrackerFilePath(
1367        src_obj_metadata.bucket, src_obj_metadata.name, dst_obj_metadata.bucket,
1368        dst_obj_metadata.name, 'JSON')
1369    rewrite_params_hash = HashRewriteParameters(
1370        src_obj_metadata, dst_obj_metadata, projection,
1371        src_generation=src_generation, gen_match=preconditions.gen_match,
1372        meta_gen_match=preconditions.meta_gen_match,
1373        canned_acl=predefined_acl, max_bytes_per_call=max_bytes_per_call,
1374        src_dec_key_sha256=dec_key_sha256, dst_enc_key_sha256=enc_key_sha256,
1375        fields=global_params.fields)
1376    resume_rewrite_token = ReadRewriteTrackerFile(tracker_file_name,
1377                                                  rewrite_params_hash)
1378    crypto_headers = self._RewriteCryptoHeadersFromTuples(
1379        decryption_tuple=decryption_tuple, encryption_tuple=encryption_tuple)
1380
1381    progress_cb_with_timeout = None
1382    try:
1383      last_bytes_written = 0L
1384      while True:
1385        with self._ApitoolsRequestHeaders(crypto_headers):
1386          apitools_request = apitools_messages.StorageObjectsRewriteRequest(
1387              sourceBucket=src_obj_metadata.bucket,
1388              sourceObject=src_obj_metadata.name,
1389              destinationBucket=dst_obj_metadata.bucket,
1390              destinationObject=dst_obj_metadata.name,
1391              projection=projection, object=dst_obj_metadata,
1392              sourceGeneration=src_generation,
1393              ifGenerationMatch=preconditions.gen_match,
1394              ifMetagenerationMatch=preconditions.meta_gen_match,
1395              destinationPredefinedAcl=predefined_acl,
1396              rewriteToken=resume_rewrite_token,
1397              maxBytesRewrittenPerCall=max_bytes_per_call,
1398              userProject=self.user_project)
1399          rewrite_response = self.api_client.objects.Rewrite(
1400              apitools_request, global_params=global_params)
1401        bytes_written = long(rewrite_response.totalBytesRewritten)
1402        if progress_callback and not progress_cb_with_timeout:
1403          progress_cb_with_timeout = ProgressCallbackWithTimeout(
1404              long(rewrite_response.objectSize), progress_callback)
1405        if progress_cb_with_timeout:
1406          progress_cb_with_timeout.Progress(
1407              bytes_written - last_bytes_written)
1408
1409        if rewrite_response.done:
1410          break
1411        elif not resume_rewrite_token:
1412          # Save the token and make a tracker file if they don't already exist.
1413          resume_rewrite_token = rewrite_response.rewriteToken
1414          WriteRewriteTrackerFile(tracker_file_name, rewrite_params_hash,
1415                                  rewrite_response.rewriteToken)
1416        last_bytes_written = bytes_written
1417
1418      DeleteTrackerFile(tracker_file_name)
1419      return rewrite_response.resource
1420    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1421      not_found_exception = CreateNotFoundExceptionForObjectWrite(
1422          self.provider, dst_obj_metadata.bucket, src_provider=self.provider,
1423          src_bucket_name=src_obj_metadata.bucket,
1424          src_object_name=src_obj_metadata.name, src_generation=src_generation)
1425      self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket,
1426                                       object_name=dst_obj_metadata.name,
1427                                       not_found_exception=not_found_exception)
1428
1429  def DeleteObject(self, bucket_name, object_name, preconditions=None,
1430                   generation=None, provider=None):
1431    """See CloudApi class for function doc strings."""
1432    if not preconditions:
1433      preconditions = Preconditions()
1434
1435    if generation:
1436      generation = long(generation)
1437
1438    apitools_request = apitools_messages.StorageObjectsDeleteRequest(
1439        bucket=bucket_name, object=object_name, generation=generation,
1440        ifGenerationMatch=preconditions.gen_match,
1441        ifMetagenerationMatch=preconditions.meta_gen_match,
1442        userProject=self.user_project)
1443    try:
1444      return self.api_client.objects.Delete(apitools_request)
1445    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1446      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
1447                                       object_name=object_name,
1448                                       generation=generation)
1449
1450  def ComposeObject(self, src_objs_metadata, dst_obj_metadata,
1451                    preconditions=None, encryption_tuple=None,
1452                    provider=None, fields=None):
1453    """See CloudApi class for function doc strings."""
1454    ValidateDstObjectMetadata(dst_obj_metadata)
1455
1456    dst_obj_name = dst_obj_metadata.name
1457    dst_obj_metadata.name = None
1458    dst_bucket_name = dst_obj_metadata.bucket
1459    dst_obj_metadata.bucket = None
1460    if not dst_obj_metadata.contentType:
1461      dst_obj_metadata.contentType = DEFAULT_CONTENT_TYPE
1462
1463    if not preconditions:
1464      preconditions = Preconditions()
1465
1466    global_params = apitools_messages.StandardQueryParameters()
1467    if fields:
1468      global_params.fields = ','.join(set(fields))
1469
1470    src_objs_compose_request = apitools_messages.ComposeRequest(
1471        sourceObjects=src_objs_metadata, destination=dst_obj_metadata)
1472
1473    encryption_headers = self._EncryptionHeadersFromTuple(
1474        crypto_tuple=encryption_tuple)
1475
1476    with self._ApitoolsRequestHeaders(encryption_headers):
1477      apitools_request = apitools_messages.StorageObjectsComposeRequest(
1478          composeRequest=src_objs_compose_request,
1479          destinationBucket=dst_bucket_name,
1480          destinationObject=dst_obj_name,
1481          ifGenerationMatch=preconditions.gen_match,
1482          ifMetagenerationMatch=preconditions.meta_gen_match,
1483          userProject=self.user_project)
1484      try:
1485        return self.api_client.objects.Compose(apitools_request,
1486                                               global_params=global_params)
1487      except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1488        # We can't be sure which object was missing in the 404 case.
1489        if (isinstance(e, apitools_exceptions.HttpError)
1490            and e.status_code == 404):
1491          raise NotFoundException('One of the source objects does not exist.')
1492        else:
1493          self._TranslateExceptionAndRaise(e)
1494
1495  def WatchBucket(self, bucket_name, address, channel_id, token=None,
1496                  provider=None, fields=None):
1497    """See CloudApi class for function doc strings."""
1498    projection = (apitools_messages.StorageObjectsWatchAllRequest
1499                  .ProjectionValueValuesEnum.full)
1500
1501    channel = apitools_messages.Channel(address=address, id=channel_id,
1502                                        token=token, type='WEB_HOOK')
1503
1504    apitools_request = apitools_messages.StorageObjectsWatchAllRequest(
1505        bucket=bucket_name, channel=channel, projection=projection,
1506        userProject=self.user_project)
1507
1508    global_params = apitools_messages.StandardQueryParameters()
1509    if fields:
1510      global_params.fields = ','.join(set(fields))
1511
1512    try:
1513      return self.api_client.objects.WatchAll(apitools_request,
1514                                              global_params=global_params)
1515    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1516      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)
1517
1518  def StopChannel(self, channel_id, resource_id, provider=None):
1519    """See CloudApi class for function doc strings."""
1520    channel = apitools_messages.Channel(id=channel_id, resourceId=resource_id)
1521    try:
1522      self.api_client.channels.Stop(channel)
1523    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1524      self._TranslateExceptionAndRaise(e)
1525
1526  def GetProjectServiceAccount(self, project_number):
1527    """See CloudApi class for function doc strings."""
1528    try:
1529      request = apitools_messages.StorageProjectsServiceAccountGetRequest(
1530          projectId=unicode(project_number))
1531      return self.api_client.projects_serviceAccount.Get(request)
1532    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1533      self._TranslateExceptionAndRaise(e)
1534
1535  def CreateNotificationConfig(
1536      self,
1537      bucket_name,
1538      pubsub_topic,
1539      payload_format,
1540      event_types=None,
1541      custom_attributes=None,
1542      object_name_prefix=None):
1543    """See CloudApi class for function doc strings."""
1544    try:
1545      notification = apitools_messages.Notification(
1546          topic=pubsub_topic,
1547          payload_format=payload_format)
1548      if event_types:
1549        notification.event_types = event_types
1550      if custom_attributes:
1551        additional_properties = [
1552            NotificationAdditionalProperty(key=key, value=value)
1553            for key, value in custom_attributes.items()]
1554        notification.custom_attributes = NotificationCustomAttributesValue(
1555            additionalProperties=additional_properties)
1556      if object_name_prefix:
1557        notification.object_name_prefix = object_name_prefix
1558
1559      request = apitools_messages.StorageNotificationsInsertRequest(
1560          bucket=bucket_name,
1561          notification=notification,
1562          userProject=self.user_project)
1563      return self.api_client.notifications.Insert(request)
1564    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1565      self._TranslateExceptionAndRaise(e)
1566
1567  def DeleteNotificationConfig(
1568      self,
1569      bucket_name,
1570      notification):
1571    """See CloudApi class for function doc strings."""
1572    try:
1573      request = apitools_messages.StorageNotificationsDeleteRequest(
1574          bucket=bucket_name,
1575          notification=notification,
1576          userProject=self.user_project)
1577      return self.api_client.notifications.Delete(request)
1578    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1579      self._TranslateExceptionAndRaise(e)
1580
1581  def ListNotificationConfigs(self, bucket_name):
1582    """See CloudApi class for function doc strings."""
1583    try:
1584      request = apitools_messages.StorageNotificationsListRequest(
1585          bucket=bucket_name, userProject=self.user_project)
1586      response = self.api_client.notifications.List(request)
1587      for notification in response.items:
1588        yield notification
1589    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
1590      self._TranslateExceptionAndRaise(e)
1591
1592  def _BucketCannedAclToPredefinedAcl(self, canned_acl_string):
1593    """Translates the input string to a bucket PredefinedAcl string.
1594
1595    Args:
1596      canned_acl_string: Canned ACL string.
1597
1598    Returns:
1599      String that can be used as a query parameter with the JSON API. This
1600      corresponds to a flavor of *PredefinedAclValueValuesEnum and can be
1601      used as input to apitools requests that affect bucket access controls.
1602    """
1603    # XML : JSON
1604    translation_dict = {
1605        None: None,
1606        'authenticated-read': 'authenticatedRead',
1607        'private': 'private',
1608        'project-private': 'projectPrivate',
1609        'public-read': 'publicRead',
1610        'public-read-write': 'publicReadWrite'
1611    }
1612    if canned_acl_string in translation_dict:
1613      return translation_dict[canned_acl_string]
1614    raise ArgumentException('Invalid canned ACL %s' % canned_acl_string)
1615
1616  def _ObjectCannedAclToPredefinedAcl(self, canned_acl_string):
1617    """Translates the input string to an object PredefinedAcl string.
1618
1619    Args:
1620      canned_acl_string: Canned ACL string.
1621
1622    Returns:
1623      String that can be used as a query parameter with the JSON API. This
1624      corresponds to a flavor of *PredefinedAclValueValuesEnum and can be
1625      used as input to apitools requests that affect object access controls.
1626    """
1627    # XML : JSON
1628    translation_dict = {
1629        None: None,
1630        'authenticated-read': 'authenticatedRead',
1631        'bucket-owner-read': 'bucketOwnerRead',
1632        'bucket-owner-full-control': 'bucketOwnerFullControl',
1633        'private': 'private',
1634        'project-private': 'projectPrivate',
1635        'public-read': 'publicRead'
1636    }
1637    if canned_acl_string in translation_dict:
1638      return translation_dict[canned_acl_string]
1639    raise ArgumentException('Invalid canned ACL %s' % canned_acl_string)
1640
1641  def _ValidateHttpAccessTokenRefreshError(self, e):
1642    if (isinstance(e, oauth2client.client.HttpAccessTokenRefreshError)
1643        and not (e.status == 429 or e.status >= 500)):
1644      raise
1645
1646  def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None,
1647                                  generation=None, not_found_exception=None):
1648    """Translates an HTTP exception and raises the translated or original value.
1649
1650    Args:
1651      e: Any Exception.
1652      bucket_name: Optional bucket name in request that caused the exception.
1653      object_name: Optional object name in request that caused the exception.
1654      generation: Optional generation in request that caused the exception.
1655      not_found_exception: Optional exception to raise in the not-found case.
1656
1657    Raises:
1658      Translated CloudApi exception, or the original exception if it was not
1659      translatable.
1660    """
1661    if self.logger.isEnabledFor(logging.DEBUG):
1662      self.logger.debug(
1663          'TranslateExceptionAndRaise: %s', traceback.format_exc())
1664    translated_exception = self._TranslateApitoolsException(
1665        e, bucket_name=bucket_name, object_name=object_name,
1666        generation=generation, not_found_exception=not_found_exception)
1667    if translated_exception:
1668      raise translated_exception
1669    else:
1670      raise
1671
1672  def _GetMessageFromHttpError(self, http_error):
1673    if isinstance(http_error, apitools_exceptions.HttpError):
1674      if getattr(http_error, 'content', None):
1675        try:
1676          json_obj = json.loads(http_error.content)
1677          if 'error' in json_obj and 'message' in json_obj['error']:
1678            return json_obj['error']['message']
1679        except Exception:  # pylint: disable=broad-except
1680          # If we couldn't decode anything, just leave the message as None.
1681          pass
1682
1683  def _GetAcceptableScopesFromHttpError(self, http_error):
1684    try:
1685      www_authenticate = http_error.response['www-authenticate']
1686      # In the event of a scope error, the www-authenticate field of the HTTP
1687      # response should contain text of the form
1688      #
1689      # 'Bearer realm="https://accounts.google.com/", error=insufficient_scope,
1690      # scope="${space separated list of acceptable scopes}"'
1691      #
1692      # Here we use a quick string search to find the scope list, just looking
1693      # for a substring with the form 'scope="${scopes}"'.
1694      scope_idx = www_authenticate.find('scope="')
1695      if scope_idx >= 0:
1696        scopes = www_authenticate[scope_idx:].split('"')[1]
1697        return 'Acceptable scopes: %s' % scopes
1698    except Exception:  # pylint: disable=broad-except
1699      # Return None if we have any trouble parsing out the acceptable scopes.
1700      pass
1701
1702  def _TranslateApitoolsResumableUploadException(self, e):
1703    if isinstance(e, apitools_exceptions.HttpError):
1704      message = self._GetMessageFromHttpError(e)
1705      if e.status_code >= 500:
1706        return ResumableUploadException(
1707            message or 'Server Error', status=e.status_code)
1708      elif e.status_code == 429:
1709        return ResumableUploadException(
1710            message or 'Too Many Requests', status=e.status_code)
1711      elif e.status_code == 410:
1712        return ResumableUploadStartOverException(
1713            message or 'Bad Request', status=e.status_code)
1714      elif e.status_code == 404:
1715        return ResumableUploadStartOverException(
1716            message or 'Bad Request', status=e.status_code)
1717      elif e.status_code >= 400:
1718        return ResumableUploadAbortException(
1719            message or 'Bad Request', status=e.status_code)
1720    if isinstance(e, apitools_exceptions.StreamExhausted):
1721      return ResumableUploadAbortException(e.message)
1722    if isinstance(e, apitools_exceptions.TransferError):
1723      if ('Aborting transfer' in e.message or
1724          'Not enough bytes in stream' in e.message):
1725        return ResumableUploadAbortException(e.message)
1726      elif 'additional bytes left in stream' in e.message:
1727        return ResumableUploadAbortException(
1728            '%s; this can happen if a file changes size while being uploaded' %
1729            e.message)
1730
1731  def _TranslateApitoolsException(self, e, bucket_name=None, object_name=None,
1732                                  generation=None, not_found_exception=None):
1733    """Translates apitools exceptions into their gsutil Cloud Api equivalents.
1734
1735    Args:
1736      e: Any exception in TRANSLATABLE_APITOOLS_EXCEPTIONS.
1737      bucket_name: Optional bucket name in request that caused the exception.
1738      object_name: Optional object name in request that caused the exception.
1739      generation: Optional generation in request that caused the exception.
1740      not_found_exception: Optional exception to raise in the not-found case.
1741
1742    Returns:
1743      CloudStorageApiServiceException for translatable exceptions, None
1744      otherwise.
1745    """
1746    if isinstance(e, apitools_exceptions.HttpError):
1747      message = self._GetMessageFromHttpError(e)
1748      if e.status_code == 400:
1749        # It is possible that the Project ID is incorrect.  Unfortunately the
1750        # JSON API does not give us much information about what part of the
1751        # request was bad.
1752        return BadRequestException(message or 'Bad Request',
1753                                   status=e.status_code)
1754      elif e.status_code == 401:
1755        if 'Login Required' in str(e):
1756          return AccessDeniedException(
1757              message or 'Access denied: login required.',
1758              status=e.status_code)
1759        elif 'insufficient_scope' in str(e):
1760          # If the service includes insufficient scope error detail in the
1761          # response body, this check can be removed.
1762          return AccessDeniedException(
1763              _INSUFFICIENT_OAUTH2_SCOPE_MESSAGE, status=e.status_code,
1764              body=self._GetAcceptableScopesFromHttpError(e))
1765      elif e.status_code == 403:
1766        if 'The account for the specified project has been disabled' in str(e):
1767          return AccessDeniedException(message or 'Account disabled.',
1768                                       status=e.status_code)
1769        elif 'Daily Limit for Unauthenticated Use Exceeded' in str(e):
1770          return AccessDeniedException(
1771              message or 'Access denied: quota exceeded. '
1772              'Is your project ID valid?',
1773              status=e.status_code)
1774        elif 'The bucket you tried to delete was not empty.' in str(e):
1775          return NotEmptyException('BucketNotEmpty (%s)' % bucket_name,
1776                                   status=e.status_code)
1777        elif ('The bucket you tried to create requires domain ownership '
1778              'verification.' in str(e)):
1779          return AccessDeniedException(
1780              'The bucket you tried to create requires domain ownership '
1781              'verification. Please see '
1782              'https://cloud.google.com/storage/docs/naming'
1783              '?hl=en#verification for more details.', status=e.status_code)
1784        elif 'User Rate Limit Exceeded' in str(e):
1785          return AccessDeniedException('Rate limit exceeded. Please retry this '
1786                                       'request later.', status=e.status_code)
1787        elif 'Access Not Configured' in str(e):
1788          return AccessDeniedException(
1789              'Access Not Configured. Please go to the Google Cloud Platform '
1790              'Console (https://cloud.google.com/console#/project) for your '
1791              'project, select APIs and Auth and enable the '
1792              'Google Cloud Storage JSON API.',
1793              status=e.status_code)
1794        elif 'insufficient_scope' in str(e):
1795          # If the service includes insufficient scope error detail in the
1796          # response body, this check can be removed.
1797          return AccessDeniedException(
1798              _INSUFFICIENT_OAUTH2_SCOPE_MESSAGE, status=e.status_code,
1799              body=self._GetAcceptableScopesFromHttpError(e))
1800        elif 'does not have permission to publish messages' in str(e):
1801          return PublishPermissionDeniedException(message, status=e.status_code)
1802        else:
1803          return AccessDeniedException(message or e.message,
1804                                       status=e.status_code)
1805      elif e.status_code == 404:
1806        if not_found_exception:
1807          # The exception is pre-constructed prior to translation; the HTTP
1808          # status code isn't available at that time.
1809          setattr(not_found_exception, 'status', e.status_code)
1810          return not_found_exception
1811        elif bucket_name:
1812          if object_name:
1813            return CreateObjectNotFoundException(e.status_code, self.provider,
1814                                                 bucket_name, object_name,
1815                                                 generation=generation)
1816          return CreateBucketNotFoundException(e.status_code, self.provider,
1817                                               bucket_name)
1818        return NotFoundException(e.message, status=e.status_code)
1819
1820      elif e.status_code == 409 and bucket_name:
1821        if 'The bucket you tried to delete was not empty.' in str(e):
1822          return NotEmptyException('BucketNotEmpty (%s)' % bucket_name,
1823                                   status=e.status_code)
1824        return ServiceException(
1825            'Bucket %s already exists.' % bucket_name, status=e.status_code)
1826      elif e.status_code == 412:
1827        return PreconditionException(message, status=e.status_code)
1828      return ServiceException(message, status=e.status_code)
1829    elif isinstance(e, apitools_exceptions.TransferInvalidError):
1830      return ServiceException('Transfer invalid (possible encoding error: %s)'
1831                              % str(e))
1832