1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import datetime 16import json 17import os 18 19import mock 20import pytest 21from six.moves import http_client 22from six.moves import urllib 23 24from google.auth import _helpers 25from google.auth import exceptions 26from google.auth import identity_pool 27from google.auth import transport 28 29 30CLIENT_ID = "username" 31CLIENT_SECRET = "password" 32# Base64 encoding of "username:password". 33BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" 34SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com" 35SERVICE_ACCOUNT_IMPERSONATION_URL = ( 36 "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" 37 + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) 38) 39QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" 40SCOPES = ["scope1", "scope2"] 41DATA_DIR = os.path.join(os.path.dirname(__file__), "data") 42SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt") 43SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json") 44SUBJECT_TOKEN_FIELD_NAME = "access_token" 45 46with open(SUBJECT_TOKEN_TEXT_FILE) as fh: 47 TEXT_FILE_SUBJECT_TOKEN = fh.read() 48 49with open(SUBJECT_TOKEN_JSON_FILE) as fh: 50 JSON_FILE_CONTENT = json.load(fh) 51 JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME) 52 53TOKEN_URL = "https://sts.googleapis.com/v1/token" 54SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" 55AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" 56WORKFORCE_AUDIENCE = ( 57 "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID" 58) 59WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token" 60WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER" 61 62 63class TestCredentials(object): 64 CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE} 65 CREDENTIAL_SOURCE_JSON = { 66 "file": SUBJECT_TOKEN_JSON_FILE, 67 "format": {"type": "json", "subject_token_field_name": "access_token"}, 68 } 69 CREDENTIAL_URL = "http://fakeurl.com" 70 CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL} 71 CREDENTIAL_SOURCE_JSON_URL = { 72 "url": CREDENTIAL_URL, 73 "format": {"type": "json", "subject_token_field_name": "access_token"}, 74 } 75 SUCCESS_RESPONSE = { 76 "access_token": "ACCESS_TOKEN", 77 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", 78 "token_type": "Bearer", 79 "expires_in": 3600, 80 "scope": " ".join(SCOPES), 81 } 82 83 @classmethod 84 def make_mock_response(cls, status, data): 85 response = mock.create_autospec(transport.Response, instance=True) 86 response.status = status 87 if isinstance(data, dict): 88 response.data = json.dumps(data).encode("utf-8") 89 else: 90 response.data = data 91 return response 92 93 @classmethod 94 def make_mock_request( 95 cls, token_status=http_client.OK, token_data=None, *extra_requests 96 ): 97 responses = [] 98 responses.append(cls.make_mock_response(token_status, token_data)) 99 100 while len(extra_requests) > 0: 101 # If service account impersonation is requested, mock the expected response. 102 status, data, extra_requests = ( 103 extra_requests[0], 104 extra_requests[1], 105 extra_requests[2:], 106 ) 107 responses.append(cls.make_mock_response(status, data)) 108 109 request = mock.create_autospec(transport.Request) 110 request.side_effect = responses 111 112 return request 113 114 @classmethod 115 def assert_credential_request_kwargs( 116 cls, request_kwargs, headers, url=CREDENTIAL_URL 117 ): 118 assert request_kwargs["url"] == url 119 assert request_kwargs["method"] == "GET" 120 assert request_kwargs["headers"] == headers 121 assert request_kwargs.get("body", None) is None 122 123 @classmethod 124 def assert_token_request_kwargs( 125 cls, request_kwargs, headers, request_data, token_url=TOKEN_URL 126 ): 127 assert request_kwargs["url"] == token_url 128 assert request_kwargs["method"] == "POST" 129 assert request_kwargs["headers"] == headers 130 assert request_kwargs["body"] is not None 131 body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) 132 assert len(body_tuples) == len(request_data.keys()) 133 for (k, v) in body_tuples: 134 assert v.decode("utf-8") == request_data[k.decode("utf-8")] 135 136 @classmethod 137 def assert_impersonation_request_kwargs( 138 cls, 139 request_kwargs, 140 headers, 141 request_data, 142 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 143 ): 144 assert request_kwargs["url"] == service_account_impersonation_url 145 assert request_kwargs["method"] == "POST" 146 assert request_kwargs["headers"] == headers 147 assert request_kwargs["body"] is not None 148 body_json = json.loads(request_kwargs["body"].decode("utf-8")) 149 assert body_json == request_data 150 151 @classmethod 152 def assert_underlying_credentials_refresh( 153 cls, 154 credentials, 155 audience, 156 subject_token, 157 subject_token_type, 158 token_url, 159 service_account_impersonation_url=None, 160 basic_auth_encoding=None, 161 quota_project_id=None, 162 used_scopes=None, 163 credential_data=None, 164 scopes=None, 165 default_scopes=None, 166 workforce_pool_user_project=None, 167 ): 168 """Utility to assert that a credentials are initialized with the expected 169 attributes by calling refresh functionality and confirming response matches 170 expected one and that the underlying requests were populated with the 171 expected parameters. 172 """ 173 # STS token exchange request/response. 174 token_response = cls.SUCCESS_RESPONSE.copy() 175 token_headers = {"Content-Type": "application/x-www-form-urlencoded"} 176 if basic_auth_encoding: 177 token_headers["Authorization"] = "Basic " + basic_auth_encoding 178 179 if service_account_impersonation_url: 180 token_scopes = "https://www.googleapis.com/auth/iam" 181 else: 182 token_scopes = " ".join(used_scopes or []) 183 184 token_request_data = { 185 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", 186 "audience": audience, 187 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", 188 "scope": token_scopes, 189 "subject_token": subject_token, 190 "subject_token_type": subject_token_type, 191 } 192 if workforce_pool_user_project: 193 token_request_data["options"] = urllib.parse.quote( 194 json.dumps({"userProject": workforce_pool_user_project}) 195 ) 196 197 if service_account_impersonation_url: 198 # Service account impersonation request/response. 199 expire_time = ( 200 _helpers.utcnow().replace(microsecond=0) 201 + datetime.timedelta(seconds=3600) 202 ).isoformat("T") + "Z" 203 impersonation_response = { 204 "accessToken": "SA_ACCESS_TOKEN", 205 "expireTime": expire_time, 206 } 207 impersonation_headers = { 208 "Content-Type": "application/json", 209 "authorization": "Bearer {}".format(token_response["access_token"]), 210 } 211 impersonation_request_data = { 212 "delegates": None, 213 "scope": used_scopes, 214 "lifetime": "3600s", 215 } 216 217 # Initialize mock request to handle token retrieval, token exchange and 218 # service account impersonation request. 219 requests = [] 220 if credential_data: 221 requests.append((http_client.OK, credential_data)) 222 223 token_request_index = len(requests) 224 requests.append((http_client.OK, token_response)) 225 226 if service_account_impersonation_url: 227 impersonation_request_index = len(requests) 228 requests.append((http_client.OK, impersonation_response)) 229 230 request = cls.make_mock_request(*[el for req in requests for el in req]) 231 232 credentials.refresh(request) 233 234 assert len(request.call_args_list) == len(requests) 235 if credential_data: 236 cls.assert_credential_request_kwargs(request.call_args_list[0][1], None) 237 # Verify token exchange request parameters. 238 cls.assert_token_request_kwargs( 239 request.call_args_list[token_request_index][1], 240 token_headers, 241 token_request_data, 242 token_url, 243 ) 244 # Verify service account impersonation request parameters if the request 245 # is processed. 246 if service_account_impersonation_url: 247 cls.assert_impersonation_request_kwargs( 248 request.call_args_list[impersonation_request_index][1], 249 impersonation_headers, 250 impersonation_request_data, 251 service_account_impersonation_url, 252 ) 253 assert credentials.token == impersonation_response["accessToken"] 254 else: 255 assert credentials.token == token_response["access_token"] 256 assert credentials.quota_project_id == quota_project_id 257 assert credentials.scopes == scopes 258 assert credentials.default_scopes == default_scopes 259 260 @classmethod 261 def make_credentials( 262 cls, 263 audience=AUDIENCE, 264 subject_token_type=SUBJECT_TOKEN_TYPE, 265 client_id=None, 266 client_secret=None, 267 quota_project_id=None, 268 scopes=None, 269 default_scopes=None, 270 service_account_impersonation_url=None, 271 credential_source=None, 272 workforce_pool_user_project=None, 273 ): 274 return identity_pool.Credentials( 275 audience=audience, 276 subject_token_type=subject_token_type, 277 token_url=TOKEN_URL, 278 service_account_impersonation_url=service_account_impersonation_url, 279 credential_source=credential_source, 280 client_id=client_id, 281 client_secret=client_secret, 282 quota_project_id=quota_project_id, 283 scopes=scopes, 284 default_scopes=default_scopes, 285 workforce_pool_user_project=workforce_pool_user_project, 286 ) 287 288 @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) 289 def test_from_info_full_options(self, mock_init): 290 credentials = identity_pool.Credentials.from_info( 291 { 292 "audience": AUDIENCE, 293 "subject_token_type": SUBJECT_TOKEN_TYPE, 294 "token_url": TOKEN_URL, 295 "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, 296 "client_id": CLIENT_ID, 297 "client_secret": CLIENT_SECRET, 298 "quota_project_id": QUOTA_PROJECT_ID, 299 "credential_source": self.CREDENTIAL_SOURCE_TEXT, 300 } 301 ) 302 303 # Confirm identity_pool.Credentials instantiated with expected attributes. 304 assert isinstance(credentials, identity_pool.Credentials) 305 mock_init.assert_called_once_with( 306 audience=AUDIENCE, 307 subject_token_type=SUBJECT_TOKEN_TYPE, 308 token_url=TOKEN_URL, 309 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 310 client_id=CLIENT_ID, 311 client_secret=CLIENT_SECRET, 312 credential_source=self.CREDENTIAL_SOURCE_TEXT, 313 quota_project_id=QUOTA_PROJECT_ID, 314 workforce_pool_user_project=None, 315 ) 316 317 @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) 318 def test_from_info_required_options_only(self, mock_init): 319 credentials = identity_pool.Credentials.from_info( 320 { 321 "audience": AUDIENCE, 322 "subject_token_type": SUBJECT_TOKEN_TYPE, 323 "token_url": TOKEN_URL, 324 "credential_source": self.CREDENTIAL_SOURCE_TEXT, 325 } 326 ) 327 328 # Confirm identity_pool.Credentials instantiated with expected attributes. 329 assert isinstance(credentials, identity_pool.Credentials) 330 mock_init.assert_called_once_with( 331 audience=AUDIENCE, 332 subject_token_type=SUBJECT_TOKEN_TYPE, 333 token_url=TOKEN_URL, 334 service_account_impersonation_url=None, 335 client_id=None, 336 client_secret=None, 337 credential_source=self.CREDENTIAL_SOURCE_TEXT, 338 quota_project_id=None, 339 workforce_pool_user_project=None, 340 ) 341 342 @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) 343 def test_from_info_workforce_pool(self, mock_init): 344 credentials = identity_pool.Credentials.from_info( 345 { 346 "audience": WORKFORCE_AUDIENCE, 347 "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, 348 "token_url": TOKEN_URL, 349 "credential_source": self.CREDENTIAL_SOURCE_TEXT, 350 "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, 351 } 352 ) 353 354 # Confirm identity_pool.Credentials instantiated with expected attributes. 355 assert isinstance(credentials, identity_pool.Credentials) 356 mock_init.assert_called_once_with( 357 audience=WORKFORCE_AUDIENCE, 358 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 359 token_url=TOKEN_URL, 360 service_account_impersonation_url=None, 361 client_id=None, 362 client_secret=None, 363 credential_source=self.CREDENTIAL_SOURCE_TEXT, 364 quota_project_id=None, 365 workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, 366 ) 367 368 @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) 369 def test_from_file_full_options(self, mock_init, tmpdir): 370 info = { 371 "audience": AUDIENCE, 372 "subject_token_type": SUBJECT_TOKEN_TYPE, 373 "token_url": TOKEN_URL, 374 "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, 375 "client_id": CLIENT_ID, 376 "client_secret": CLIENT_SECRET, 377 "quota_project_id": QUOTA_PROJECT_ID, 378 "credential_source": self.CREDENTIAL_SOURCE_TEXT, 379 } 380 config_file = tmpdir.join("config.json") 381 config_file.write(json.dumps(info)) 382 credentials = identity_pool.Credentials.from_file(str(config_file)) 383 384 # Confirm identity_pool.Credentials instantiated with expected attributes. 385 assert isinstance(credentials, identity_pool.Credentials) 386 mock_init.assert_called_once_with( 387 audience=AUDIENCE, 388 subject_token_type=SUBJECT_TOKEN_TYPE, 389 token_url=TOKEN_URL, 390 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 391 client_id=CLIENT_ID, 392 client_secret=CLIENT_SECRET, 393 credential_source=self.CREDENTIAL_SOURCE_TEXT, 394 quota_project_id=QUOTA_PROJECT_ID, 395 workforce_pool_user_project=None, 396 ) 397 398 @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) 399 def test_from_file_required_options_only(self, mock_init, tmpdir): 400 info = { 401 "audience": AUDIENCE, 402 "subject_token_type": SUBJECT_TOKEN_TYPE, 403 "token_url": TOKEN_URL, 404 "credential_source": self.CREDENTIAL_SOURCE_TEXT, 405 } 406 config_file = tmpdir.join("config.json") 407 config_file.write(json.dumps(info)) 408 credentials = identity_pool.Credentials.from_file(str(config_file)) 409 410 # Confirm identity_pool.Credentials instantiated with expected attributes. 411 assert isinstance(credentials, identity_pool.Credentials) 412 mock_init.assert_called_once_with( 413 audience=AUDIENCE, 414 subject_token_type=SUBJECT_TOKEN_TYPE, 415 token_url=TOKEN_URL, 416 service_account_impersonation_url=None, 417 client_id=None, 418 client_secret=None, 419 credential_source=self.CREDENTIAL_SOURCE_TEXT, 420 quota_project_id=None, 421 workforce_pool_user_project=None, 422 ) 423 424 @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) 425 def test_from_file_workforce_pool(self, mock_init, tmpdir): 426 info = { 427 "audience": WORKFORCE_AUDIENCE, 428 "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, 429 "token_url": TOKEN_URL, 430 "credential_source": self.CREDENTIAL_SOURCE_TEXT, 431 "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, 432 } 433 config_file = tmpdir.join("config.json") 434 config_file.write(json.dumps(info)) 435 credentials = identity_pool.Credentials.from_file(str(config_file)) 436 437 # Confirm identity_pool.Credentials instantiated with expected attributes. 438 assert isinstance(credentials, identity_pool.Credentials) 439 mock_init.assert_called_once_with( 440 audience=WORKFORCE_AUDIENCE, 441 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 442 token_url=TOKEN_URL, 443 service_account_impersonation_url=None, 444 client_id=None, 445 client_secret=None, 446 credential_source=self.CREDENTIAL_SOURCE_TEXT, 447 quota_project_id=None, 448 workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, 449 ) 450 451 def test_constructor_nonworkforce_with_workforce_pool_user_project(self): 452 with pytest.raises(ValueError) as excinfo: 453 self.make_credentials( 454 audience=AUDIENCE, 455 workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, 456 ) 457 458 assert excinfo.match( 459 "workforce_pool_user_project should not be set for non-workforce " 460 "pool credentials" 461 ) 462 463 def test_constructor_invalid_options(self): 464 credential_source = {"unsupported": "value"} 465 466 with pytest.raises(ValueError) as excinfo: 467 self.make_credentials(credential_source=credential_source) 468 469 assert excinfo.match(r"Missing credential_source") 470 471 def test_constructor_invalid_options_url_and_file(self): 472 credential_source = { 473 "url": self.CREDENTIAL_URL, 474 "file": SUBJECT_TOKEN_TEXT_FILE, 475 } 476 477 with pytest.raises(ValueError) as excinfo: 478 self.make_credentials(credential_source=credential_source) 479 480 assert excinfo.match(r"Ambiguous credential_source") 481 482 def test_constructor_invalid_options_environment_id(self): 483 credential_source = {"url": self.CREDENTIAL_URL, "environment_id": "aws1"} 484 485 with pytest.raises(ValueError) as excinfo: 486 self.make_credentials(credential_source=credential_source) 487 488 assert excinfo.match( 489 r"Invalid Identity Pool credential_source field 'environment_id'" 490 ) 491 492 def test_constructor_invalid_credential_source(self): 493 with pytest.raises(ValueError) as excinfo: 494 self.make_credentials(credential_source="non-dict") 495 496 assert excinfo.match(r"Missing credential_source") 497 498 def test_constructor_invalid_credential_source_format_type(self): 499 credential_source = {"format": {"type": "xml"}} 500 501 with pytest.raises(ValueError) as excinfo: 502 self.make_credentials(credential_source=credential_source) 503 504 assert excinfo.match(r"Invalid credential_source format 'xml'") 505 506 def test_constructor_missing_subject_token_field_name(self): 507 credential_source = {"format": {"type": "json"}} 508 509 with pytest.raises(ValueError) as excinfo: 510 self.make_credentials(credential_source=credential_source) 511 512 assert excinfo.match( 513 r"Missing subject_token_field_name for JSON credential_source format" 514 ) 515 516 def test_info_with_workforce_pool_user_project(self): 517 credentials = self.make_credentials( 518 audience=WORKFORCE_AUDIENCE, 519 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 520 credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(), 521 workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, 522 ) 523 524 assert credentials.info == { 525 "type": "external_account", 526 "audience": WORKFORCE_AUDIENCE, 527 "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, 528 "token_url": TOKEN_URL, 529 "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL, 530 "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, 531 } 532 533 def test_info_with_file_credential_source(self): 534 credentials = self.make_credentials( 535 credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy() 536 ) 537 538 assert credentials.info == { 539 "type": "external_account", 540 "audience": AUDIENCE, 541 "subject_token_type": SUBJECT_TOKEN_TYPE, 542 "token_url": TOKEN_URL, 543 "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL, 544 } 545 546 def test_info_with_url_credential_source(self): 547 credentials = self.make_credentials( 548 credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy() 549 ) 550 551 assert credentials.info == { 552 "type": "external_account", 553 "audience": AUDIENCE, 554 "subject_token_type": SUBJECT_TOKEN_TYPE, 555 "token_url": TOKEN_URL, 556 "credential_source": self.CREDENTIAL_SOURCE_JSON_URL, 557 } 558 559 def test_retrieve_subject_token_missing_subject_token(self, tmpdir): 560 # Provide empty text file. 561 empty_file = tmpdir.join("empty.txt") 562 empty_file.write("") 563 credential_source = {"file": str(empty_file)} 564 credentials = self.make_credentials(credential_source=credential_source) 565 566 with pytest.raises(exceptions.RefreshError) as excinfo: 567 credentials.retrieve_subject_token(None) 568 569 assert excinfo.match(r"Missing subject_token in the credential_source file") 570 571 def test_retrieve_subject_token_text_file(self): 572 credentials = self.make_credentials( 573 credential_source=self.CREDENTIAL_SOURCE_TEXT 574 ) 575 576 subject_token = credentials.retrieve_subject_token(None) 577 578 assert subject_token == TEXT_FILE_SUBJECT_TOKEN 579 580 def test_retrieve_subject_token_json_file(self): 581 credentials = self.make_credentials( 582 credential_source=self.CREDENTIAL_SOURCE_JSON 583 ) 584 585 subject_token = credentials.retrieve_subject_token(None) 586 587 assert subject_token == JSON_FILE_SUBJECT_TOKEN 588 589 def test_retrieve_subject_token_json_file_invalid_field_name(self): 590 credential_source = { 591 "file": SUBJECT_TOKEN_JSON_FILE, 592 "format": {"type": "json", "subject_token_field_name": "not_found"}, 593 } 594 credentials = self.make_credentials(credential_source=credential_source) 595 596 with pytest.raises(exceptions.RefreshError) as excinfo: 597 credentials.retrieve_subject_token(None) 598 599 assert excinfo.match( 600 "Unable to parse subject_token from JSON file '{}' using key '{}'".format( 601 SUBJECT_TOKEN_JSON_FILE, "not_found" 602 ) 603 ) 604 605 def test_retrieve_subject_token_invalid_json(self, tmpdir): 606 # Provide JSON file. This should result in JSON parsing error. 607 invalid_json_file = tmpdir.join("invalid.json") 608 invalid_json_file.write("{") 609 credential_source = { 610 "file": str(invalid_json_file), 611 "format": {"type": "json", "subject_token_field_name": "access_token"}, 612 } 613 credentials = self.make_credentials(credential_source=credential_source) 614 615 with pytest.raises(exceptions.RefreshError) as excinfo: 616 credentials.retrieve_subject_token(None) 617 618 assert excinfo.match( 619 "Unable to parse subject_token from JSON file '{}' using key '{}'".format( 620 str(invalid_json_file), "access_token" 621 ) 622 ) 623 624 def test_retrieve_subject_token_file_not_found(self): 625 credential_source = {"file": "./not_found.txt"} 626 credentials = self.make_credentials(credential_source=credential_source) 627 628 with pytest.raises(exceptions.RefreshError) as excinfo: 629 credentials.retrieve_subject_token(None) 630 631 assert excinfo.match(r"File './not_found.txt' was not found") 632 633 def test_refresh_text_file_success_without_impersonation_ignore_default_scopes( 634 self, 635 ): 636 credentials = self.make_credentials( 637 client_id=CLIENT_ID, 638 client_secret=CLIENT_SECRET, 639 # Test with text format type. 640 credential_source=self.CREDENTIAL_SOURCE_TEXT, 641 scopes=SCOPES, 642 # Default scopes should be ignored. 643 default_scopes=["ignored"], 644 ) 645 646 self.assert_underlying_credentials_refresh( 647 credentials=credentials, 648 audience=AUDIENCE, 649 subject_token=TEXT_FILE_SUBJECT_TOKEN, 650 subject_token_type=SUBJECT_TOKEN_TYPE, 651 token_url=TOKEN_URL, 652 service_account_impersonation_url=None, 653 basic_auth_encoding=BASIC_AUTH_ENCODING, 654 quota_project_id=None, 655 used_scopes=SCOPES, 656 scopes=SCOPES, 657 default_scopes=["ignored"], 658 ) 659 660 def test_refresh_workforce_success_with_client_auth_without_impersonation(self): 661 credentials = self.make_credentials( 662 audience=WORKFORCE_AUDIENCE, 663 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 664 client_id=CLIENT_ID, 665 client_secret=CLIENT_SECRET, 666 # Test with text format type. 667 credential_source=self.CREDENTIAL_SOURCE_TEXT, 668 scopes=SCOPES, 669 # This will be ignored in favor of client auth. 670 workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, 671 ) 672 673 self.assert_underlying_credentials_refresh( 674 credentials=credentials, 675 audience=WORKFORCE_AUDIENCE, 676 subject_token=TEXT_FILE_SUBJECT_TOKEN, 677 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 678 token_url=TOKEN_URL, 679 service_account_impersonation_url=None, 680 basic_auth_encoding=BASIC_AUTH_ENCODING, 681 quota_project_id=None, 682 used_scopes=SCOPES, 683 scopes=SCOPES, 684 workforce_pool_user_project=None, 685 ) 686 687 def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self): 688 credentials = self.make_credentials( 689 audience=WORKFORCE_AUDIENCE, 690 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 691 client_id=CLIENT_ID, 692 client_secret=CLIENT_SECRET, 693 # Test with text format type. 694 credential_source=self.CREDENTIAL_SOURCE_TEXT, 695 scopes=SCOPES, 696 # This is not needed when client Auth is used. 697 workforce_pool_user_project=None, 698 ) 699 700 self.assert_underlying_credentials_refresh( 701 credentials=credentials, 702 audience=WORKFORCE_AUDIENCE, 703 subject_token=TEXT_FILE_SUBJECT_TOKEN, 704 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 705 token_url=TOKEN_URL, 706 service_account_impersonation_url=None, 707 basic_auth_encoding=BASIC_AUTH_ENCODING, 708 quota_project_id=None, 709 used_scopes=SCOPES, 710 scopes=SCOPES, 711 workforce_pool_user_project=None, 712 ) 713 714 def test_refresh_workforce_success_without_client_auth_without_impersonation(self): 715 credentials = self.make_credentials( 716 audience=WORKFORCE_AUDIENCE, 717 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 718 client_id=None, 719 client_secret=None, 720 # Test with text format type. 721 credential_source=self.CREDENTIAL_SOURCE_TEXT, 722 scopes=SCOPES, 723 # This will not be ignored as client auth is not used. 724 workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, 725 ) 726 727 self.assert_underlying_credentials_refresh( 728 credentials=credentials, 729 audience=WORKFORCE_AUDIENCE, 730 subject_token=TEXT_FILE_SUBJECT_TOKEN, 731 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 732 token_url=TOKEN_URL, 733 service_account_impersonation_url=None, 734 basic_auth_encoding=None, 735 quota_project_id=None, 736 used_scopes=SCOPES, 737 scopes=SCOPES, 738 workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, 739 ) 740 741 def test_refresh_workforce_success_without_client_auth_with_impersonation(self): 742 credentials = self.make_credentials( 743 audience=WORKFORCE_AUDIENCE, 744 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 745 client_id=None, 746 client_secret=None, 747 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 748 # Test with text format type. 749 credential_source=self.CREDENTIAL_SOURCE_TEXT, 750 scopes=SCOPES, 751 # This will not be ignored as client auth is not used. 752 workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, 753 ) 754 755 self.assert_underlying_credentials_refresh( 756 credentials=credentials, 757 audience=WORKFORCE_AUDIENCE, 758 subject_token=TEXT_FILE_SUBJECT_TOKEN, 759 subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, 760 token_url=TOKEN_URL, 761 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 762 basic_auth_encoding=None, 763 quota_project_id=None, 764 used_scopes=SCOPES, 765 scopes=SCOPES, 766 workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, 767 ) 768 769 def test_refresh_text_file_success_without_impersonation_use_default_scopes(self): 770 credentials = self.make_credentials( 771 client_id=CLIENT_ID, 772 client_secret=CLIENT_SECRET, 773 # Test with text format type. 774 credential_source=self.CREDENTIAL_SOURCE_TEXT, 775 scopes=None, 776 # Default scopes should be used since user specified scopes are none. 777 default_scopes=SCOPES, 778 ) 779 780 self.assert_underlying_credentials_refresh( 781 credentials=credentials, 782 audience=AUDIENCE, 783 subject_token=TEXT_FILE_SUBJECT_TOKEN, 784 subject_token_type=SUBJECT_TOKEN_TYPE, 785 token_url=TOKEN_URL, 786 service_account_impersonation_url=None, 787 basic_auth_encoding=BASIC_AUTH_ENCODING, 788 quota_project_id=None, 789 used_scopes=SCOPES, 790 scopes=None, 791 default_scopes=SCOPES, 792 ) 793 794 def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self): 795 # Initialize credentials with service account impersonation and basic auth. 796 credentials = self.make_credentials( 797 # Test with text format type. 798 credential_source=self.CREDENTIAL_SOURCE_TEXT, 799 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 800 scopes=SCOPES, 801 # Default scopes should be ignored. 802 default_scopes=["ignored"], 803 ) 804 805 self.assert_underlying_credentials_refresh( 806 credentials=credentials, 807 audience=AUDIENCE, 808 subject_token=TEXT_FILE_SUBJECT_TOKEN, 809 subject_token_type=SUBJECT_TOKEN_TYPE, 810 token_url=TOKEN_URL, 811 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 812 basic_auth_encoding=None, 813 quota_project_id=None, 814 used_scopes=SCOPES, 815 scopes=SCOPES, 816 default_scopes=["ignored"], 817 ) 818 819 def test_refresh_text_file_success_with_impersonation_use_default_scopes(self): 820 # Initialize credentials with service account impersonation, basic auth 821 # and default scopes (no user scopes). 822 credentials = self.make_credentials( 823 # Test with text format type. 824 credential_source=self.CREDENTIAL_SOURCE_TEXT, 825 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 826 scopes=None, 827 # Default scopes should be used since user specified scopes are none. 828 default_scopes=SCOPES, 829 ) 830 831 self.assert_underlying_credentials_refresh( 832 credentials=credentials, 833 audience=AUDIENCE, 834 subject_token=TEXT_FILE_SUBJECT_TOKEN, 835 subject_token_type=SUBJECT_TOKEN_TYPE, 836 token_url=TOKEN_URL, 837 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 838 basic_auth_encoding=None, 839 quota_project_id=None, 840 used_scopes=SCOPES, 841 scopes=None, 842 default_scopes=SCOPES, 843 ) 844 845 def test_refresh_json_file_success_without_impersonation(self): 846 credentials = self.make_credentials( 847 client_id=CLIENT_ID, 848 client_secret=CLIENT_SECRET, 849 # Test with JSON format type. 850 credential_source=self.CREDENTIAL_SOURCE_JSON, 851 scopes=SCOPES, 852 ) 853 854 self.assert_underlying_credentials_refresh( 855 credentials=credentials, 856 audience=AUDIENCE, 857 subject_token=JSON_FILE_SUBJECT_TOKEN, 858 subject_token_type=SUBJECT_TOKEN_TYPE, 859 token_url=TOKEN_URL, 860 service_account_impersonation_url=None, 861 basic_auth_encoding=BASIC_AUTH_ENCODING, 862 quota_project_id=None, 863 used_scopes=SCOPES, 864 scopes=SCOPES, 865 default_scopes=None, 866 ) 867 868 def test_refresh_json_file_success_with_impersonation(self): 869 # Initialize credentials with service account impersonation and basic auth. 870 credentials = self.make_credentials( 871 # Test with JSON format type. 872 credential_source=self.CREDENTIAL_SOURCE_JSON, 873 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 874 scopes=SCOPES, 875 ) 876 877 self.assert_underlying_credentials_refresh( 878 credentials=credentials, 879 audience=AUDIENCE, 880 subject_token=JSON_FILE_SUBJECT_TOKEN, 881 subject_token_type=SUBJECT_TOKEN_TYPE, 882 token_url=TOKEN_URL, 883 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 884 basic_auth_encoding=None, 885 quota_project_id=None, 886 used_scopes=SCOPES, 887 scopes=SCOPES, 888 default_scopes=None, 889 ) 890 891 def test_refresh_with_retrieve_subject_token_error(self): 892 credential_source = { 893 "file": SUBJECT_TOKEN_JSON_FILE, 894 "format": {"type": "json", "subject_token_field_name": "not_found"}, 895 } 896 credentials = self.make_credentials(credential_source=credential_source) 897 898 with pytest.raises(exceptions.RefreshError) as excinfo: 899 credentials.refresh(None) 900 901 assert excinfo.match( 902 "Unable to parse subject_token from JSON file '{}' using key '{}'".format( 903 SUBJECT_TOKEN_JSON_FILE, "not_found" 904 ) 905 ) 906 907 def test_retrieve_subject_token_from_url(self): 908 credentials = self.make_credentials( 909 credential_source=self.CREDENTIAL_SOURCE_TEXT_URL 910 ) 911 request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN) 912 subject_token = credentials.retrieve_subject_token(request) 913 914 assert subject_token == TEXT_FILE_SUBJECT_TOKEN 915 self.assert_credential_request_kwargs(request.call_args_list[0][1], None) 916 917 def test_retrieve_subject_token_from_url_with_headers(self): 918 credentials = self.make_credentials( 919 credential_source={"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}} 920 ) 921 request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN) 922 subject_token = credentials.retrieve_subject_token(request) 923 924 assert subject_token == TEXT_FILE_SUBJECT_TOKEN 925 self.assert_credential_request_kwargs( 926 request.call_args_list[0][1], {"foo": "bar"} 927 ) 928 929 def test_retrieve_subject_token_from_url_json(self): 930 credentials = self.make_credentials( 931 credential_source=self.CREDENTIAL_SOURCE_JSON_URL 932 ) 933 request = self.make_mock_request(token_data=JSON_FILE_CONTENT) 934 subject_token = credentials.retrieve_subject_token(request) 935 936 assert subject_token == JSON_FILE_SUBJECT_TOKEN 937 self.assert_credential_request_kwargs(request.call_args_list[0][1], None) 938 939 def test_retrieve_subject_token_from_url_json_with_headers(self): 940 credentials = self.make_credentials( 941 credential_source={ 942 "url": self.CREDENTIAL_URL, 943 "format": {"type": "json", "subject_token_field_name": "access_token"}, 944 "headers": {"foo": "bar"}, 945 } 946 ) 947 request = self.make_mock_request(token_data=JSON_FILE_CONTENT) 948 subject_token = credentials.retrieve_subject_token(request) 949 950 assert subject_token == JSON_FILE_SUBJECT_TOKEN 951 self.assert_credential_request_kwargs( 952 request.call_args_list[0][1], {"foo": "bar"} 953 ) 954 955 def test_retrieve_subject_token_from_url_not_found(self): 956 credentials = self.make_credentials( 957 credential_source=self.CREDENTIAL_SOURCE_TEXT_URL 958 ) 959 with pytest.raises(exceptions.RefreshError) as excinfo: 960 credentials.retrieve_subject_token( 961 self.make_mock_request(token_status=404, token_data=JSON_FILE_CONTENT) 962 ) 963 964 assert excinfo.match("Unable to retrieve Identity Pool subject token") 965 966 def test_retrieve_subject_token_from_url_json_invalid_field(self): 967 credential_source = { 968 "url": self.CREDENTIAL_URL, 969 "format": {"type": "json", "subject_token_field_name": "not_found"}, 970 } 971 credentials = self.make_credentials(credential_source=credential_source) 972 973 with pytest.raises(exceptions.RefreshError) as excinfo: 974 credentials.retrieve_subject_token( 975 self.make_mock_request(token_data=JSON_FILE_CONTENT) 976 ) 977 978 assert excinfo.match( 979 "Unable to parse subject_token from JSON file '{}' using key '{}'".format( 980 self.CREDENTIAL_URL, "not_found" 981 ) 982 ) 983 984 def test_retrieve_subject_token_from_url_json_invalid_format(self): 985 credentials = self.make_credentials( 986 credential_source=self.CREDENTIAL_SOURCE_JSON_URL 987 ) 988 989 with pytest.raises(exceptions.RefreshError) as excinfo: 990 credentials.retrieve_subject_token(self.make_mock_request(token_data="{")) 991 992 assert excinfo.match( 993 "Unable to parse subject_token from JSON file '{}' using key '{}'".format( 994 self.CREDENTIAL_URL, "access_token" 995 ) 996 ) 997 998 def test_refresh_text_file_success_without_impersonation_url(self): 999 credentials = self.make_credentials( 1000 client_id=CLIENT_ID, 1001 client_secret=CLIENT_SECRET, 1002 # Test with text format type. 1003 credential_source=self.CREDENTIAL_SOURCE_TEXT_URL, 1004 scopes=SCOPES, 1005 ) 1006 1007 self.assert_underlying_credentials_refresh( 1008 credentials=credentials, 1009 audience=AUDIENCE, 1010 subject_token=TEXT_FILE_SUBJECT_TOKEN, 1011 subject_token_type=SUBJECT_TOKEN_TYPE, 1012 token_url=TOKEN_URL, 1013 service_account_impersonation_url=None, 1014 basic_auth_encoding=BASIC_AUTH_ENCODING, 1015 quota_project_id=None, 1016 used_scopes=SCOPES, 1017 scopes=SCOPES, 1018 default_scopes=None, 1019 credential_data=TEXT_FILE_SUBJECT_TOKEN, 1020 ) 1021 1022 def test_refresh_text_file_success_with_impersonation_url(self): 1023 # Initialize credentials with service account impersonation and basic auth. 1024 credentials = self.make_credentials( 1025 # Test with text format type. 1026 credential_source=self.CREDENTIAL_SOURCE_TEXT_URL, 1027 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 1028 scopes=SCOPES, 1029 ) 1030 1031 self.assert_underlying_credentials_refresh( 1032 credentials=credentials, 1033 audience=AUDIENCE, 1034 subject_token=TEXT_FILE_SUBJECT_TOKEN, 1035 subject_token_type=SUBJECT_TOKEN_TYPE, 1036 token_url=TOKEN_URL, 1037 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 1038 basic_auth_encoding=None, 1039 quota_project_id=None, 1040 used_scopes=SCOPES, 1041 scopes=SCOPES, 1042 default_scopes=None, 1043 credential_data=TEXT_FILE_SUBJECT_TOKEN, 1044 ) 1045 1046 def test_refresh_json_file_success_without_impersonation_url(self): 1047 credentials = self.make_credentials( 1048 client_id=CLIENT_ID, 1049 client_secret=CLIENT_SECRET, 1050 # Test with JSON format type. 1051 credential_source=self.CREDENTIAL_SOURCE_JSON_URL, 1052 scopes=SCOPES, 1053 ) 1054 1055 self.assert_underlying_credentials_refresh( 1056 credentials=credentials, 1057 audience=AUDIENCE, 1058 subject_token=JSON_FILE_SUBJECT_TOKEN, 1059 subject_token_type=SUBJECT_TOKEN_TYPE, 1060 token_url=TOKEN_URL, 1061 service_account_impersonation_url=None, 1062 basic_auth_encoding=BASIC_AUTH_ENCODING, 1063 quota_project_id=None, 1064 used_scopes=SCOPES, 1065 scopes=SCOPES, 1066 default_scopes=None, 1067 credential_data=JSON_FILE_CONTENT, 1068 ) 1069 1070 def test_refresh_json_file_success_with_impersonation_url(self): 1071 # Initialize credentials with service account impersonation and basic auth. 1072 credentials = self.make_credentials( 1073 # Test with JSON format type. 1074 credential_source=self.CREDENTIAL_SOURCE_JSON_URL, 1075 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 1076 scopes=SCOPES, 1077 ) 1078 1079 self.assert_underlying_credentials_refresh( 1080 credentials=credentials, 1081 audience=AUDIENCE, 1082 subject_token=JSON_FILE_SUBJECT_TOKEN, 1083 subject_token_type=SUBJECT_TOKEN_TYPE, 1084 token_url=TOKEN_URL, 1085 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, 1086 basic_auth_encoding=None, 1087 quota_project_id=None, 1088 used_scopes=SCOPES, 1089 scopes=SCOPES, 1090 default_scopes=None, 1091 credential_data=JSON_FILE_CONTENT, 1092 ) 1093 1094 def test_refresh_with_retrieve_subject_token_error_url(self): 1095 credential_source = { 1096 "url": self.CREDENTIAL_URL, 1097 "format": {"type": "json", "subject_token_field_name": "not_found"}, 1098 } 1099 credentials = self.make_credentials(credential_source=credential_source) 1100 1101 with pytest.raises(exceptions.RefreshError) as excinfo: 1102 credentials.refresh(self.make_mock_request(token_data=JSON_FILE_CONTENT)) 1103 1104 assert excinfo.match( 1105 "Unable to parse subject_token from JSON file '{}' using key '{}'".format( 1106 self.CREDENTIAL_URL, "not_found" 1107 ) 1108 ) 1109