1# -*- coding: utf-8 -*-
2# Copyright 2017 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
"""JSON gsutil Cloud API implementation for Google Cloud Pub/Sub."""
16
17from __future__ import absolute_import
18from __future__ import print_function
19from __future__ import division
20from __future__ import unicode_literals
21
22import json
23import logging
24import traceback
25
26from apitools.base.py import exceptions as apitools_exceptions
27from boto import config
28from gslib.cloud_api import AccessDeniedException
29from gslib.cloud_api import BadRequestException
30from gslib.cloud_api import NotFoundException
31from gslib.cloud_api import PreconditionException
32from gslib.cloud_api import ServiceException
33from gslib.gcs_json_credentials import SetUpJsonCredentialsAndCache
34from gslib.no_op_credentials import NoOpCredentials
35from gslib.third_party.pubsub_apitools import pubsub_v1_client as apitools_client
36from gslib.third_party.pubsub_apitools import pubsub_v1_messages as apitools_messages
37from gslib.utils import system_util
38from gslib.utils.boto_util import GetCertsFile
39from gslib.utils.boto_util import GetMaxRetryDelay
40from gslib.utils.boto_util import GetNewHttp
41from gslib.utils.boto_util import GetNumRetries
42from gslib.utils.constants import UTF8
43
# Exception types that the wrappers below catch and hand to the translation
# helpers. The trailing comma makes this a real one-element tuple (plain
# parentheses would leave it as the bare class), so further exception types
# can be appended without changing how `except` / `isinstance` consume it.
TRANSLATABLE_APITOOLS_EXCEPTIONS = (apitools_exceptions.HttpError,)
45
# Message used for insufficient-scope errors. The suggested remedy depends on
# whether gsutil was invoked via the Cloud SDK.
_INSUFFICIENT_OAUTH2_SCOPE_MESSAGE = (
    ('Insufficient OAuth2 scope to perform this operation. '
     'Please re-run `gcloud auth login`')
    if system_util.InvokedViaCloudSdk() else
    ('Insufficient OAuth2 scope to perform this operation. '
     'Please re-run `gsutil config`'))
54
55
56class PubsubApi(object):
57  """Wraps calls to the Cloud Pub/Sub v1 interface via apitools."""
58
59  def __init__(self, logger=None, credentials=None, debug=0):
60    """Performs necessary setup for interacting with Google Cloud Pub/Sub.
61
62    Args:
63      logger: logging.logger for outputting log messages.
64      credentials: Credentials to be used for interacting with Google Cloud
65          Pub/Sub
66      debug: Debug level for the API implementation (0..3).
67    """
68    super(PubsubApi, self).__init__()
69    self.logger = logger
70
71    self.certs_file = GetCertsFile()
72    self.http = GetNewHttp()
73    self.http_base = 'https://'
74    self.host_base = config.get('Credentials', 'gs_pubsub_host',
75                                'pubsub.googleapis.com')
76    gs_pubsub_port = config.get('Credentials', 'gs_pubsub_port', None)
77    self.host_port = (':' + gs_pubsub_port) if gs_pubsub_port else ''
78    self.url_base = (self.http_base + self.host_base + self.host_port)
79
80    SetUpJsonCredentialsAndCache(self, logger, credentials=credentials)
81
82    log_request = (debug >= 3)
83    log_response = (debug >= 3)
84
85    self.api_client = apitools_client.PubsubV1(url=self.url_base,
86                                               http=self.http,
87                                               log_request=log_request,
88                                               log_response=log_response,
89                                               credentials=self.credentials)
90
91    self.num_retries = GetNumRetries()
92    self.api_client.num_retries = self.num_retries
93
94    self.max_retry_wait = GetMaxRetryDelay()
95    self.api_client.max_retry_wait = self.max_retry_wait
96
97    if isinstance(self.credentials, NoOpCredentials):
98      # This API key is not secret and is used to identify gsutil during
99      # anonymous requests.
100      self.api_client.AddGlobalParam('key',
101                                     'AIzaSyDnacJHrKma0048b13sh8cgxNUwulubmJM')
102
103  def GetTopic(self, topic_name):
104    request = apitools_messages.PubsubProjectsTopicsGetRequest(topic=topic_name)
105    try:
106      return self.api_client.projects_topics.Get(request)
107    except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
108      self._TranslateExceptionAndRaise(e, topic_name=topic_name)
109
110  def CreateTopic(self, topic_name):
111    topic = apitools_messages.Topic(name=topic_name)
112    try:
113      return self.api_client.projects_topics.Create(topic)
114    except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
115      self._TranslateExceptionAndRaise(e, topic_name=topic_name)
116
117  def DeleteTopic(self, topic_name):
118    """Only used in system tests."""
119    request = apitools_messages.PubsubProjectsTopicsDeleteRequest(
120        topic=topic_name)
121    try:
122      return self.api_client.projects_topics.Delete(request)
123    except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
124      self._TranslateExceptionAndRaise(e, topic_name=topic_name)
125
126  def GetTopicIamPolicy(self, topic_name):
127    request = apitools_messages.PubsubProjectsTopicsGetIamPolicyRequest(
128        resource=topic_name)
129    try:
130      return self.api_client.projects_topics.GetIamPolicy(request)
131    except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
132      self._TranslateExceptionAndRaise(e, topic_name=topic_name)
133
134  def SetTopicIamPolicy(self, topic_name, policy):
135    policy_request = apitools_messages.SetIamPolicyRequest(policy=policy)
136    request = apitools_messages.PubsubProjectsTopicsSetIamPolicyRequest(
137        resource=topic_name, setIamPolicyRequest=policy_request)
138    try:
139      return self.api_client.projects_topics.SetIamPolicy(request)
140    except TRANSLATABLE_APITOOLS_EXCEPTIONS as e:
141      self._TranslateExceptionAndRaise(e, topic_name=topic_name)
142
143  def _TranslateExceptionAndRaise(self, e, topic_name=None):
144    """Translates an HTTP exception and raises the translated or original value.
145
146    Args:
147      e: Any Exception.
148      topic_name: Optional topic name in request that caused the exception.
149
150    Raises:
151      Translated CloudApi exception, or the original exception if it was not
152      translatable.
153    """
154    if self.logger.isEnabledFor(logging.DEBUG):
155      self.logger.debug('TranslateExceptionAndRaise: %s',
156                        traceback.format_exc())
157    translated_exception = self._TranslateApitoolsException(
158        e, topic_name=topic_name)
159    if translated_exception:
160      raise translated_exception
161    else:
162      raise
163
164  def _GetMessageFromHttpError(self, http_error):
165    if isinstance(http_error, apitools_exceptions.HttpError):
166      if getattr(http_error, 'content', None):
167        try:
168          json_obj = json.loads(http_error.content.decode(UTF8))
169          if 'error' in json_obj and 'message' in json_obj['error']:
170            return json_obj['error']['message']
171        except Exception:  # pylint: disable=broad-except
172          # If we couldn't decode anything, just leave the message as None.
173          pass
174
175  def _GetAcceptableScopesFromHttpError(self, http_error):
176    try:
177      www_authenticate = http_error.response['www-authenticate']
178      # In the event of a scope error, the www-authenticate field of the HTTP
179      # response should contain text of the form
180      #
181      # 'Bearer realm="https://oauth2.googleapis.com/",
182      # error=insufficient_scope,
183      # scope="${space separated list of acceptable scopes}"'
184      #
185      # Here we use a quick string search to find the scope list, just looking
186      # for a substring with the form 'scope="${scopes}"'.
187      scope_idx = www_authenticate.find('scope="')
188      if scope_idx >= 0:
189        scopes = www_authenticate[scope_idx:].split('"')[1]
190        return 'Acceptable scopes: %s' % scopes
191    except Exception:  # pylint: disable=broad-except
192      # Return None if we have any trouble parsing out the acceptable scopes.
193      pass
194
195  def _TranslateApitoolsException(self, e, topic_name=None):
196    """Translates apitools exceptions into their gsutil equivalents.
197
198    Args:
199      e: Any exception in TRANSLATABLE_APITOOLS_EXCEPTIONS.
200      topic_name: Optional topic name in request that caused the exception.
201
202    Returns:
203      ServiceException for translatable exceptions, None
204      otherwise.
205    """
206
207    if isinstance(e, apitools_exceptions.HttpError):
208      message = self._GetMessageFromHttpError(e)
209      if e.status_code == 400:
210        # It is possible that the Project ID is incorrect.  Unfortunately the
211        # JSON API does not give us much information about what part of the
212        # request was bad.
213        return BadRequestException(message or 'Bad Request',
214                                   status=e.status_code)
215      elif e.status_code == 401:
216        if 'Login Required' in str(e):
217          return AccessDeniedException(message or
218                                       'Access denied: login required.',
219                                       status=e.status_code)
220        elif 'insufficient_scope' in str(e):
221          # If the service includes insufficient scope error detail in the
222          # response body, this check can be removed.
223          return AccessDeniedException(
224              _INSUFFICIENT_OAUTH2_SCOPE_MESSAGE,
225              status=e.status_code,
226              body=self._GetAcceptableScopesFromHttpError(e))
227      elif e.status_code == 403:
228        if 'The account for the specified project has been disabled' in str(e):
229          return AccessDeniedException(message or 'Account disabled.',
230                                       status=e.status_code)
231        elif 'Daily Limit for Unauthenticated Use Exceeded' in str(e):
232          return AccessDeniedException(message or
233                                       'Access denied: quota exceeded. '
234                                       'Is your project ID valid?',
235                                       status=e.status_code)
236        elif 'User Rate Limit Exceeded' in str(e):
237          return AccessDeniedException(
238              'Rate limit exceeded. Please retry this '
239              'request later.',
240              status=e.status_code)
241        elif 'Access Not Configured' in str(e):
242          return AccessDeniedException(
243              'Access Not Configured. Please go to the Google Cloud Platform '
244              'Console (https://cloud.google.com/console#/project) for your '
245              'project, select APIs and Auth and enable the '
246              'Google Cloud Pub/Sub API.',
247              status=e.status_code)
248        elif 'insufficient_scope' in str(e):
249          # If the service includes insufficient scope error detail in the
250          # response body, this check can be removed.
251          return AccessDeniedException(
252              _INSUFFICIENT_OAUTH2_SCOPE_MESSAGE,
253              status=e.status_code,
254              body=self._GetAcceptableScopesFromHttpError(e))
255        else:
256          return AccessDeniedException(message or e.message,
257                                       status=e.status_code)
258      elif e.status_code == 404:
259        return NotFoundException(message, status=e.status_code)
260
261      elif e.status_code == 409 and topic_name:
262        return ServiceException('The topic %s already exists.' % topic_name,
263                                status=e.status_code)
264      elif e.status_code == 412:
265        return PreconditionException(message, status=e.status_code)
266      return ServiceException(message, status=e.status_code)
267