1# -*- coding: utf-8 -*- 2# Copyright 2017 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""JSON gsutil Cloud API implementation for Google Cloud Storage.""" 16 17from __future__ import absolute_import 18from __future__ import print_function 19from __future__ import division 20from __future__ import unicode_literals 21 22import json 23import logging 24import traceback 25 26from apitools.base.py import exceptions as apitools_exceptions 27from boto import config 28from gslib.cloud_api import AccessDeniedException 29from gslib.cloud_api import BadRequestException 30from gslib.cloud_api import NotFoundException 31from gslib.cloud_api import PreconditionException 32from gslib.cloud_api import ServiceException 33from gslib.gcs_json_credentials import SetUpJsonCredentialsAndCache 34from gslib.no_op_credentials import NoOpCredentials 35from gslib.third_party.pubsub_apitools import pubsub_v1_client as apitools_client 36from gslib.third_party.pubsub_apitools import pubsub_v1_messages as apitools_messages 37from gslib.utils import system_util 38from gslib.utils.boto_util import GetCertsFile 39from gslib.utils.boto_util import GetMaxRetryDelay 40from gslib.utils.boto_util import GetNewHttp 41from gslib.utils.boto_util import GetNumRetries 42from gslib.utils.constants import UTF8 43 44TRANSLATABLE_APITOOLS_EXCEPTIONS = (apitools_exceptions.HttpError) 45 46if system_util.InvokedViaCloudSdk(): 47 _INSUFFICIENT_OAUTH2_SCOPE_MESSAGE = ( 48 'Insufficient OAuth2 scope to perform this operation. ' 49 'Please re-run `gcloud auth login`') 50else: 51 _INSUFFICIENT_OAUTH2_SCOPE_MESSAGE = ( 52 'Insufficient OAuth2 scope to perform this operation. ' 53 'Please re-run `gsutil config`') 54 55 56class PubsubApi(object): 57 """Wraps calls to the Cloud Pub/Sub v1 interface via apitools.""" 58 59 def __init__(self, logger=None, credentials=None, debug=0): 60 """Performs necessary setup for interacting with Google Cloud Pub/Sub. 61 62 Args: 63 logger: logging.logger for outputting log messages. 64 credentials: Credentials to be used for interacting with Google Cloud 65 Pub/Sub 66 debug: Debug level for the API implementation (0..3). 67 """ 68 super(PubsubApi, self).__init__() 69 self.logger = logger 70 71 self.certs_file = GetCertsFile() 72 self.http = GetNewHttp() 73 self.http_base = 'https://' 74 self.host_base = config.get('Credentials', 'gs_pubsub_host', 75 'pubsub.googleapis.com') 76 gs_pubsub_port = config.get('Credentials', 'gs_pubsub_port', None) 77 self.host_port = (':' + gs_pubsub_port) if gs_pubsub_port else '' 78 self.url_base = (self.http_base + self.host_base + self.host_port) 79 80 SetUpJsonCredentialsAndCache(self, logger, credentials=credentials) 81 82 log_request = (debug >= 3) 83 log_response = (debug >= 3) 84 85 self.api_client = apitools_client.PubsubV1(url=self.url_base, 86 http=self.http, 87 log_request=log_request, 88 log_response=log_response, 89 credentials=self.credentials) 90 91 self.num_retries = GetNumRetries() 92 self.api_client.num_retries = self.num_retries 93 94 self.max_retry_wait = GetMaxRetryDelay() 95 self.api_client.max_retry_wait = self.max_retry_wait 96 97 if isinstance(self.credentials, NoOpCredentials): 98 # This API key is not secret and is used to identify gsutil during 99 # anonymous requests. 100 self.api_client.AddGlobalParam('key', 101 'AIzaSyDnacJHrKma0048b13sh8cgxNUwulubmJM') 102 103 def GetTopic(self, topic_name): 104 request = apitools_messages.PubsubProjectsTopicsGetRequest(topic=topic_name) 105 try: 106 return self.api_client.projects_topics.Get(request) 107 except TRANSLATABLE_APITOOLS_EXCEPTIONS as e: 108 self._TranslateExceptionAndRaise(e, topic_name=topic_name) 109 110 def CreateTopic(self, topic_name): 111 topic = apitools_messages.Topic(name=topic_name) 112 try: 113 return self.api_client.projects_topics.Create(topic) 114 except TRANSLATABLE_APITOOLS_EXCEPTIONS as e: 115 self._TranslateExceptionAndRaise(e, topic_name=topic_name) 116 117 def DeleteTopic(self, topic_name): 118 """Only used in system tests.""" 119 request = apitools_messages.PubsubProjectsTopicsDeleteRequest( 120 topic=topic_name) 121 try: 122 return self.api_client.projects_topics.Delete(request) 123 except TRANSLATABLE_APITOOLS_EXCEPTIONS as e: 124 self._TranslateExceptionAndRaise(e, topic_name=topic_name) 125 126 def GetTopicIamPolicy(self, topic_name): 127 request = apitools_messages.PubsubProjectsTopicsGetIamPolicyRequest( 128 resource=topic_name) 129 try: 130 return self.api_client.projects_topics.GetIamPolicy(request) 131 except TRANSLATABLE_APITOOLS_EXCEPTIONS as e: 132 self._TranslateExceptionAndRaise(e, topic_name=topic_name) 133 134 def SetTopicIamPolicy(self, topic_name, policy): 135 policy_request = apitools_messages.SetIamPolicyRequest(policy=policy) 136 request = apitools_messages.PubsubProjectsTopicsSetIamPolicyRequest( 137 resource=topic_name, setIamPolicyRequest=policy_request) 138 try: 139 return self.api_client.projects_topics.SetIamPolicy(request) 140 except TRANSLATABLE_APITOOLS_EXCEPTIONS as e: 141 self._TranslateExceptionAndRaise(e, topic_name=topic_name) 142 143 def _TranslateExceptionAndRaise(self, e, topic_name=None): 144 """Translates an HTTP exception and raises the translated or original value. 145 146 Args: 147 e: Any Exception. 148 topic_name: Optional topic name in request that caused the exception. 149 150 Raises: 151 Translated CloudApi exception, or the original exception if it was not 152 translatable. 153 """ 154 if self.logger.isEnabledFor(logging.DEBUG): 155 self.logger.debug('TranslateExceptionAndRaise: %s', 156 traceback.format_exc()) 157 translated_exception = self._TranslateApitoolsException( 158 e, topic_name=topic_name) 159 if translated_exception: 160 raise translated_exception 161 else: 162 raise 163 164 def _GetMessageFromHttpError(self, http_error): 165 if isinstance(http_error, apitools_exceptions.HttpError): 166 if getattr(http_error, 'content', None): 167 try: 168 json_obj = json.loads(http_error.content.decode(UTF8)) 169 if 'error' in json_obj and 'message' in json_obj['error']: 170 return json_obj['error']['message'] 171 except Exception: # pylint: disable=broad-except 172 # If we couldn't decode anything, just leave the message as None. 173 pass 174 175 def _GetAcceptableScopesFromHttpError(self, http_error): 176 try: 177 www_authenticate = http_error.response['www-authenticate'] 178 # In the event of a scope error, the www-authenticate field of the HTTP 179 # response should contain text of the form 180 # 181 # 'Bearer realm="https://oauth2.googleapis.com/", 182 # error=insufficient_scope, 183 # scope="${space separated list of acceptable scopes}"' 184 # 185 # Here we use a quick string search to find the scope list, just looking 186 # for a substring with the form 'scope="${scopes}"'. 187 scope_idx = www_authenticate.find('scope="') 188 if scope_idx >= 0: 189 scopes = www_authenticate[scope_idx:].split('"')[1] 190 return 'Acceptable scopes: %s' % scopes 191 except Exception: # pylint: disable=broad-except 192 # Return None if we have any trouble parsing out the acceptable scopes. 193 pass 194 195 def _TranslateApitoolsException(self, e, topic_name=None): 196 """Translates apitools exceptions into their gsutil equivalents. 197 198 Args: 199 e: Any exception in TRANSLATABLE_APITOOLS_EXCEPTIONS. 200 topic_name: Optional topic name in request that caused the exception. 201 202 Returns: 203 ServiceException for translatable exceptions, None 204 otherwise. 205 """ 206 207 if isinstance(e, apitools_exceptions.HttpError): 208 message = self._GetMessageFromHttpError(e) 209 if e.status_code == 400: 210 # It is possible that the Project ID is incorrect. Unfortunately the 211 # JSON API does not give us much information about what part of the 212 # request was bad. 213 return BadRequestException(message or 'Bad Request', 214 status=e.status_code) 215 elif e.status_code == 401: 216 if 'Login Required' in str(e): 217 return AccessDeniedException(message or 218 'Access denied: login required.', 219 status=e.status_code) 220 elif 'insufficient_scope' in str(e): 221 # If the service includes insufficient scope error detail in the 222 # response body, this check can be removed. 223 return AccessDeniedException( 224 _INSUFFICIENT_OAUTH2_SCOPE_MESSAGE, 225 status=e.status_code, 226 body=self._GetAcceptableScopesFromHttpError(e)) 227 elif e.status_code == 403: 228 if 'The account for the specified project has been disabled' in str(e): 229 return AccessDeniedException(message or 'Account disabled.', 230 status=e.status_code) 231 elif 'Daily Limit for Unauthenticated Use Exceeded' in str(e): 232 return AccessDeniedException(message or 233 'Access denied: quota exceeded. ' 234 'Is your project ID valid?', 235 status=e.status_code) 236 elif 'User Rate Limit Exceeded' in str(e): 237 return AccessDeniedException( 238 'Rate limit exceeded. Please retry this ' 239 'request later.', 240 status=e.status_code) 241 elif 'Access Not Configured' in str(e): 242 return AccessDeniedException( 243 'Access Not Configured. Please go to the Google Cloud Platform ' 244 'Console (https://cloud.google.com/console#/project) for your ' 245 'project, select APIs and Auth and enable the ' 246 'Google Cloud Pub/Sub API.', 247 status=e.status_code) 248 elif 'insufficient_scope' in str(e): 249 # If the service includes insufficient scope error detail in the 250 # response body, this check can be removed. 251 return AccessDeniedException( 252 _INSUFFICIENT_OAUTH2_SCOPE_MESSAGE, 253 status=e.status_code, 254 body=self._GetAcceptableScopesFromHttpError(e)) 255 else: 256 return AccessDeniedException(message or e.message, 257 status=e.status_code) 258 elif e.status_code == 404: 259 return NotFoundException(message, status=e.status_code) 260 261 elif e.status_code == 409 and topic_name: 262 return ServiceException('The topic %s already exists.' % topic_name, 263 status=e.status_code) 264 elif e.status_code == 412: 265 return PreconditionException(message, status=e.status_code) 266 return ServiceException(message, status=e.status_code) 267