1# Copyright 2016 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Google App Engine standard environment support. 16 17This module provides authentication and signing for applications running on App 18Engine in the standard environment using the `App Identity API`_. 19 20 21.. _App Identity API: 22 https://cloud.google.com/appengine/docs/python/appidentity/ 23""" 24 25import datetime 26 27from google.auth import _helpers 28from google.auth import credentials 29from google.auth import crypt 30 31# pytype: disable=import-error 32try: 33 from google.appengine.api import app_identity 34except ImportError: 35 app_identity = None 36# pytype: enable=import-error 37 38 39class Signer(crypt.Signer): 40 """Signs messages using the App Engine App Identity service. 41 42 This can be used in place of :class:`google.auth.crypt.Signer` when 43 running in the App Engine standard environment. 44 """ 45 46 @property 47 def key_id(self): 48 """Optional[str]: The key ID used to identify this private key. 49 50 .. warning:: 51 This is always ``None``. The key ID used by App Engine can not 52 be reliably determined ahead of time. 53 """ 54 return None 55 56 @_helpers.copy_docstring(crypt.Signer) 57 def sign(self, message): 58 message = _helpers.to_bytes(message) 59 _, signature = app_identity.sign_blob(message) 60 return signature 61 62 63def get_project_id(): 64 """Gets the project ID for the current App Engine application. 65 66 Returns: 67 str: The project ID 68 69 Raises: 70 EnvironmentError: If the App Engine APIs are unavailable. 71 """ 72 # pylint: disable=missing-raises-doc 73 # Pylint rightfully thinks EnvironmentError is OSError, but doesn't 74 # realize it's a valid alias. 75 if app_identity is None: 76 raise EnvironmentError("The App Engine APIs are not available.") 77 return app_identity.get_application_id() 78 79 80class Credentials( 81 credentials.Scoped, credentials.Signing, credentials.CredentialsWithQuotaProject 82): 83 """App Engine standard environment credentials. 84 85 These credentials use the App Engine App Identity API to obtain access 86 tokens. 87 """ 88 89 def __init__( 90 self, 91 scopes=None, 92 default_scopes=None, 93 service_account_id=None, 94 quota_project_id=None, 95 ): 96 """ 97 Args: 98 scopes (Sequence[str]): Scopes to request from the App Identity 99 API. 100 default_scopes (Sequence[str]): Default scopes passed by a 101 Google client library. Use 'scopes' for user-defined scopes. 102 service_account_id (str): The service account ID passed into 103 :func:`google.appengine.api.app_identity.get_access_token`. 104 If not specified, the default application service account 105 ID will be used. 106 quota_project_id (Optional[str]): The project ID used for quota 107 and billing. 108 109 Raises: 110 EnvironmentError: If the App Engine APIs are unavailable. 111 """ 112 # pylint: disable=missing-raises-doc 113 # Pylint rightfully thinks EnvironmentError is OSError, but doesn't 114 # realize it's a valid alias. 115 if app_identity is None: 116 raise EnvironmentError("The App Engine APIs are not available.") 117 118 super(Credentials, self).__init__() 119 self._scopes = scopes 120 self._default_scopes = default_scopes 121 self._service_account_id = service_account_id 122 self._signer = Signer() 123 self._quota_project_id = quota_project_id 124 125 @_helpers.copy_docstring(credentials.Credentials) 126 def refresh(self, request): 127 scopes = self._scopes if self._scopes is not None else self._default_scopes 128 # pylint: disable=unused-argument 129 token, ttl = app_identity.get_access_token(scopes, self._service_account_id) 130 expiry = datetime.datetime.utcfromtimestamp(ttl) 131 132 self.token, self.expiry = token, expiry 133 134 @property 135 def service_account_email(self): 136 """The service account email.""" 137 if self._service_account_id is None: 138 self._service_account_id = app_identity.get_service_account_name() 139 return self._service_account_id 140 141 @property 142 def requires_scopes(self): 143 """Checks if the credentials requires scopes. 144 145 Returns: 146 bool: True if there are no scopes set otherwise False. 147 """ 148 return not self._scopes and not self._default_scopes 149 150 @_helpers.copy_docstring(credentials.Scoped) 151 def with_scopes(self, scopes, default_scopes=None): 152 return self.__class__( 153 scopes=scopes, 154 default_scopes=default_scopes, 155 service_account_id=self._service_account_id, 156 quota_project_id=self.quota_project_id, 157 ) 158 159 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 160 def with_quota_project(self, quota_project_id): 161 return self.__class__( 162 scopes=self._scopes, 163 service_account_id=self._service_account_id, 164 quota_project_id=quota_project_id, 165 ) 166 167 @_helpers.copy_docstring(credentials.Signing) 168 def sign_bytes(self, message): 169 return self._signer.sign(message) 170 171 @property 172 @_helpers.copy_docstring(credentials.Signing) 173 def signer_email(self): 174 return self.service_account_email 175 176 @property 177 @_helpers.copy_docstring(credentials.Signing) 178 def signer(self): 179 return self._signer 180