1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import json
17import os
18
19import mock
20import pytest
21from six.moves import http_client
22from six.moves import urllib
23
24from google.auth import _helpers
25from google.auth import exceptions
26from google.auth import identity_pool
27from google.auth import transport
28
29
30CLIENT_ID = "username"
31CLIENT_SECRET = "password"
32# Base64 encoding of "username:password".
33BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
34SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
35SERVICE_ACCOUNT_IMPERSONATION_URL = (
36    "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
37    + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
38)
39QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
40SCOPES = ["scope1", "scope2"]
41DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
42SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
43SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json")
44SUBJECT_TOKEN_FIELD_NAME = "access_token"
45
46with open(SUBJECT_TOKEN_TEXT_FILE) as fh:
47    TEXT_FILE_SUBJECT_TOKEN = fh.read()
48
49with open(SUBJECT_TOKEN_JSON_FILE) as fh:
50    JSON_FILE_CONTENT = json.load(fh)
51    JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME)
52
53TOKEN_URL = "https://sts.googleapis.com/v1/token"
54SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
55AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
56WORKFORCE_AUDIENCE = (
57    "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID"
58)
59WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
60WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
61
62
63class TestCredentials(object):
64    CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE}
65    CREDENTIAL_SOURCE_JSON = {
66        "file": SUBJECT_TOKEN_JSON_FILE,
67        "format": {"type": "json", "subject_token_field_name": "access_token"},
68    }
69    CREDENTIAL_URL = "http://fakeurl.com"
70    CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL}
71    CREDENTIAL_SOURCE_JSON_URL = {
72        "url": CREDENTIAL_URL,
73        "format": {"type": "json", "subject_token_field_name": "access_token"},
74    }
75    SUCCESS_RESPONSE = {
76        "access_token": "ACCESS_TOKEN",
77        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
78        "token_type": "Bearer",
79        "expires_in": 3600,
80        "scope": " ".join(SCOPES),
81    }
82
83    @classmethod
84    def make_mock_response(cls, status, data):
85        response = mock.create_autospec(transport.Response, instance=True)
86        response.status = status
87        if isinstance(data, dict):
88            response.data = json.dumps(data).encode("utf-8")
89        else:
90            response.data = data
91        return response
92
93    @classmethod
94    def make_mock_request(
95        cls, token_status=http_client.OK, token_data=None, *extra_requests
96    ):
97        responses = []
98        responses.append(cls.make_mock_response(token_status, token_data))
99
100        while len(extra_requests) > 0:
101            # If service account impersonation is requested, mock the expected response.
102            status, data, extra_requests = (
103                extra_requests[0],
104                extra_requests[1],
105                extra_requests[2:],
106            )
107            responses.append(cls.make_mock_response(status, data))
108
109        request = mock.create_autospec(transport.Request)
110        request.side_effect = responses
111
112        return request
113
114    @classmethod
115    def assert_credential_request_kwargs(
116        cls, request_kwargs, headers, url=CREDENTIAL_URL
117    ):
118        assert request_kwargs["url"] == url
119        assert request_kwargs["method"] == "GET"
120        assert request_kwargs["headers"] == headers
121        assert request_kwargs.get("body", None) is None
122
123    @classmethod
124    def assert_token_request_kwargs(
125        cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
126    ):
127        assert request_kwargs["url"] == token_url
128        assert request_kwargs["method"] == "POST"
129        assert request_kwargs["headers"] == headers
130        assert request_kwargs["body"] is not None
131        body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
132        assert len(body_tuples) == len(request_data.keys())
133        for (k, v) in body_tuples:
134            assert v.decode("utf-8") == request_data[k.decode("utf-8")]
135
136    @classmethod
137    def assert_impersonation_request_kwargs(
138        cls,
139        request_kwargs,
140        headers,
141        request_data,
142        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
143    ):
144        assert request_kwargs["url"] == service_account_impersonation_url
145        assert request_kwargs["method"] == "POST"
146        assert request_kwargs["headers"] == headers
147        assert request_kwargs["body"] is not None
148        body_json = json.loads(request_kwargs["body"].decode("utf-8"))
149        assert body_json == request_data
150
151    @classmethod
152    def assert_underlying_credentials_refresh(
153        cls,
154        credentials,
155        audience,
156        subject_token,
157        subject_token_type,
158        token_url,
159        service_account_impersonation_url=None,
160        basic_auth_encoding=None,
161        quota_project_id=None,
162        used_scopes=None,
163        credential_data=None,
164        scopes=None,
165        default_scopes=None,
166        workforce_pool_user_project=None,
167    ):
168        """Utility to assert that a credentials are initialized with the expected
169        attributes by calling refresh functionality and confirming response matches
170        expected one and that the underlying requests were populated with the
171        expected parameters.
172        """
173        # STS token exchange request/response.
174        token_response = cls.SUCCESS_RESPONSE.copy()
175        token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
176        if basic_auth_encoding:
177            token_headers["Authorization"] = "Basic " + basic_auth_encoding
178
179        if service_account_impersonation_url:
180            token_scopes = "https://www.googleapis.com/auth/iam"
181        else:
182            token_scopes = " ".join(used_scopes or [])
183
184        token_request_data = {
185            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
186            "audience": audience,
187            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
188            "scope": token_scopes,
189            "subject_token": subject_token,
190            "subject_token_type": subject_token_type,
191        }
192        if workforce_pool_user_project:
193            token_request_data["options"] = urllib.parse.quote(
194                json.dumps({"userProject": workforce_pool_user_project})
195            )
196
197        if service_account_impersonation_url:
198            # Service account impersonation request/response.
199            expire_time = (
200                _helpers.utcnow().replace(microsecond=0)
201                + datetime.timedelta(seconds=3600)
202            ).isoformat("T") + "Z"
203            impersonation_response = {
204                "accessToken": "SA_ACCESS_TOKEN",
205                "expireTime": expire_time,
206            }
207            impersonation_headers = {
208                "Content-Type": "application/json",
209                "authorization": "Bearer {}".format(token_response["access_token"]),
210            }
211            impersonation_request_data = {
212                "delegates": None,
213                "scope": used_scopes,
214                "lifetime": "3600s",
215            }
216
217        # Initialize mock request to handle token retrieval, token exchange and
218        # service account impersonation request.
219        requests = []
220        if credential_data:
221            requests.append((http_client.OK, credential_data))
222
223        token_request_index = len(requests)
224        requests.append((http_client.OK, token_response))
225
226        if service_account_impersonation_url:
227            impersonation_request_index = len(requests)
228            requests.append((http_client.OK, impersonation_response))
229
230        request = cls.make_mock_request(*[el for req in requests for el in req])
231
232        credentials.refresh(request)
233
234        assert len(request.call_args_list) == len(requests)
235        if credential_data:
236            cls.assert_credential_request_kwargs(request.call_args_list[0][1], None)
237        # Verify token exchange request parameters.
238        cls.assert_token_request_kwargs(
239            request.call_args_list[token_request_index][1],
240            token_headers,
241            token_request_data,
242            token_url,
243        )
244        # Verify service account impersonation request parameters if the request
245        # is processed.
246        if service_account_impersonation_url:
247            cls.assert_impersonation_request_kwargs(
248                request.call_args_list[impersonation_request_index][1],
249                impersonation_headers,
250                impersonation_request_data,
251                service_account_impersonation_url,
252            )
253            assert credentials.token == impersonation_response["accessToken"]
254        else:
255            assert credentials.token == token_response["access_token"]
256        assert credentials.quota_project_id == quota_project_id
257        assert credentials.scopes == scopes
258        assert credentials.default_scopes == default_scopes
259
260    @classmethod
261    def make_credentials(
262        cls,
263        audience=AUDIENCE,
264        subject_token_type=SUBJECT_TOKEN_TYPE,
265        client_id=None,
266        client_secret=None,
267        quota_project_id=None,
268        scopes=None,
269        default_scopes=None,
270        service_account_impersonation_url=None,
271        credential_source=None,
272        workforce_pool_user_project=None,
273    ):
274        return identity_pool.Credentials(
275            audience=audience,
276            subject_token_type=subject_token_type,
277            token_url=TOKEN_URL,
278            service_account_impersonation_url=service_account_impersonation_url,
279            credential_source=credential_source,
280            client_id=client_id,
281            client_secret=client_secret,
282            quota_project_id=quota_project_id,
283            scopes=scopes,
284            default_scopes=default_scopes,
285            workforce_pool_user_project=workforce_pool_user_project,
286        )
287
288    @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
289    def test_from_info_full_options(self, mock_init):
290        credentials = identity_pool.Credentials.from_info(
291            {
292                "audience": AUDIENCE,
293                "subject_token_type": SUBJECT_TOKEN_TYPE,
294                "token_url": TOKEN_URL,
295                "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
296                "client_id": CLIENT_ID,
297                "client_secret": CLIENT_SECRET,
298                "quota_project_id": QUOTA_PROJECT_ID,
299                "credential_source": self.CREDENTIAL_SOURCE_TEXT,
300            }
301        )
302
303        # Confirm identity_pool.Credentials instantiated with expected attributes.
304        assert isinstance(credentials, identity_pool.Credentials)
305        mock_init.assert_called_once_with(
306            audience=AUDIENCE,
307            subject_token_type=SUBJECT_TOKEN_TYPE,
308            token_url=TOKEN_URL,
309            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
310            client_id=CLIENT_ID,
311            client_secret=CLIENT_SECRET,
312            credential_source=self.CREDENTIAL_SOURCE_TEXT,
313            quota_project_id=QUOTA_PROJECT_ID,
314            workforce_pool_user_project=None,
315        )
316
317    @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
318    def test_from_info_required_options_only(self, mock_init):
319        credentials = identity_pool.Credentials.from_info(
320            {
321                "audience": AUDIENCE,
322                "subject_token_type": SUBJECT_TOKEN_TYPE,
323                "token_url": TOKEN_URL,
324                "credential_source": self.CREDENTIAL_SOURCE_TEXT,
325            }
326        )
327
328        # Confirm identity_pool.Credentials instantiated with expected attributes.
329        assert isinstance(credentials, identity_pool.Credentials)
330        mock_init.assert_called_once_with(
331            audience=AUDIENCE,
332            subject_token_type=SUBJECT_TOKEN_TYPE,
333            token_url=TOKEN_URL,
334            service_account_impersonation_url=None,
335            client_id=None,
336            client_secret=None,
337            credential_source=self.CREDENTIAL_SOURCE_TEXT,
338            quota_project_id=None,
339            workforce_pool_user_project=None,
340        )
341
342    @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
343    def test_from_info_workforce_pool(self, mock_init):
344        credentials = identity_pool.Credentials.from_info(
345            {
346                "audience": WORKFORCE_AUDIENCE,
347                "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
348                "token_url": TOKEN_URL,
349                "credential_source": self.CREDENTIAL_SOURCE_TEXT,
350                "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
351            }
352        )
353
354        # Confirm identity_pool.Credentials instantiated with expected attributes.
355        assert isinstance(credentials, identity_pool.Credentials)
356        mock_init.assert_called_once_with(
357            audience=WORKFORCE_AUDIENCE,
358            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
359            token_url=TOKEN_URL,
360            service_account_impersonation_url=None,
361            client_id=None,
362            client_secret=None,
363            credential_source=self.CREDENTIAL_SOURCE_TEXT,
364            quota_project_id=None,
365            workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
366        )
367
368    @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
369    def test_from_file_full_options(self, mock_init, tmpdir):
370        info = {
371            "audience": AUDIENCE,
372            "subject_token_type": SUBJECT_TOKEN_TYPE,
373            "token_url": TOKEN_URL,
374            "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
375            "client_id": CLIENT_ID,
376            "client_secret": CLIENT_SECRET,
377            "quota_project_id": QUOTA_PROJECT_ID,
378            "credential_source": self.CREDENTIAL_SOURCE_TEXT,
379        }
380        config_file = tmpdir.join("config.json")
381        config_file.write(json.dumps(info))
382        credentials = identity_pool.Credentials.from_file(str(config_file))
383
384        # Confirm identity_pool.Credentials instantiated with expected attributes.
385        assert isinstance(credentials, identity_pool.Credentials)
386        mock_init.assert_called_once_with(
387            audience=AUDIENCE,
388            subject_token_type=SUBJECT_TOKEN_TYPE,
389            token_url=TOKEN_URL,
390            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
391            client_id=CLIENT_ID,
392            client_secret=CLIENT_SECRET,
393            credential_source=self.CREDENTIAL_SOURCE_TEXT,
394            quota_project_id=QUOTA_PROJECT_ID,
395            workforce_pool_user_project=None,
396        )
397
398    @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
399    def test_from_file_required_options_only(self, mock_init, tmpdir):
400        info = {
401            "audience": AUDIENCE,
402            "subject_token_type": SUBJECT_TOKEN_TYPE,
403            "token_url": TOKEN_URL,
404            "credential_source": self.CREDENTIAL_SOURCE_TEXT,
405        }
406        config_file = tmpdir.join("config.json")
407        config_file.write(json.dumps(info))
408        credentials = identity_pool.Credentials.from_file(str(config_file))
409
410        # Confirm identity_pool.Credentials instantiated with expected attributes.
411        assert isinstance(credentials, identity_pool.Credentials)
412        mock_init.assert_called_once_with(
413            audience=AUDIENCE,
414            subject_token_type=SUBJECT_TOKEN_TYPE,
415            token_url=TOKEN_URL,
416            service_account_impersonation_url=None,
417            client_id=None,
418            client_secret=None,
419            credential_source=self.CREDENTIAL_SOURCE_TEXT,
420            quota_project_id=None,
421            workforce_pool_user_project=None,
422        )
423
424    @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
425    def test_from_file_workforce_pool(self, mock_init, tmpdir):
426        info = {
427            "audience": WORKFORCE_AUDIENCE,
428            "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
429            "token_url": TOKEN_URL,
430            "credential_source": self.CREDENTIAL_SOURCE_TEXT,
431            "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
432        }
433        config_file = tmpdir.join("config.json")
434        config_file.write(json.dumps(info))
435        credentials = identity_pool.Credentials.from_file(str(config_file))
436
437        # Confirm identity_pool.Credentials instantiated with expected attributes.
438        assert isinstance(credentials, identity_pool.Credentials)
439        mock_init.assert_called_once_with(
440            audience=WORKFORCE_AUDIENCE,
441            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
442            token_url=TOKEN_URL,
443            service_account_impersonation_url=None,
444            client_id=None,
445            client_secret=None,
446            credential_source=self.CREDENTIAL_SOURCE_TEXT,
447            quota_project_id=None,
448            workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
449        )
450
451    def test_constructor_nonworkforce_with_workforce_pool_user_project(self):
452        with pytest.raises(ValueError) as excinfo:
453            self.make_credentials(
454                audience=AUDIENCE,
455                workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
456            )
457
458        assert excinfo.match(
459            "workforce_pool_user_project should not be set for non-workforce "
460            "pool credentials"
461        )
462
463    def test_constructor_invalid_options(self):
464        credential_source = {"unsupported": "value"}
465
466        with pytest.raises(ValueError) as excinfo:
467            self.make_credentials(credential_source=credential_source)
468
469        assert excinfo.match(r"Missing credential_source")
470
471    def test_constructor_invalid_options_url_and_file(self):
472        credential_source = {
473            "url": self.CREDENTIAL_URL,
474            "file": SUBJECT_TOKEN_TEXT_FILE,
475        }
476
477        with pytest.raises(ValueError) as excinfo:
478            self.make_credentials(credential_source=credential_source)
479
480        assert excinfo.match(r"Ambiguous credential_source")
481
482    def test_constructor_invalid_options_environment_id(self):
483        credential_source = {"url": self.CREDENTIAL_URL, "environment_id": "aws1"}
484
485        with pytest.raises(ValueError) as excinfo:
486            self.make_credentials(credential_source=credential_source)
487
488        assert excinfo.match(
489            r"Invalid Identity Pool credential_source field 'environment_id'"
490        )
491
492    def test_constructor_invalid_credential_source(self):
493        with pytest.raises(ValueError) as excinfo:
494            self.make_credentials(credential_source="non-dict")
495
496        assert excinfo.match(r"Missing credential_source")
497
498    def test_constructor_invalid_credential_source_format_type(self):
499        credential_source = {"format": {"type": "xml"}}
500
501        with pytest.raises(ValueError) as excinfo:
502            self.make_credentials(credential_source=credential_source)
503
504        assert excinfo.match(r"Invalid credential_source format 'xml'")
505
506    def test_constructor_missing_subject_token_field_name(self):
507        credential_source = {"format": {"type": "json"}}
508
509        with pytest.raises(ValueError) as excinfo:
510            self.make_credentials(credential_source=credential_source)
511
512        assert excinfo.match(
513            r"Missing subject_token_field_name for JSON credential_source format"
514        )
515
516    def test_info_with_workforce_pool_user_project(self):
517        credentials = self.make_credentials(
518            audience=WORKFORCE_AUDIENCE,
519            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
520            credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
521            workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
522        )
523
524        assert credentials.info == {
525            "type": "external_account",
526            "audience": WORKFORCE_AUDIENCE,
527            "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
528            "token_url": TOKEN_URL,
529            "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
530            "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
531        }
532
533    def test_info_with_file_credential_source(self):
534        credentials = self.make_credentials(
535            credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy()
536        )
537
538        assert credentials.info == {
539            "type": "external_account",
540            "audience": AUDIENCE,
541            "subject_token_type": SUBJECT_TOKEN_TYPE,
542            "token_url": TOKEN_URL,
543            "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
544        }
545
546    def test_info_with_url_credential_source(self):
547        credentials = self.make_credentials(
548            credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy()
549        )
550
551        assert credentials.info == {
552            "type": "external_account",
553            "audience": AUDIENCE,
554            "subject_token_type": SUBJECT_TOKEN_TYPE,
555            "token_url": TOKEN_URL,
556            "credential_source": self.CREDENTIAL_SOURCE_JSON_URL,
557        }
558
559    def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
560        # Provide empty text file.
561        empty_file = tmpdir.join("empty.txt")
562        empty_file.write("")
563        credential_source = {"file": str(empty_file)}
564        credentials = self.make_credentials(credential_source=credential_source)
565
566        with pytest.raises(exceptions.RefreshError) as excinfo:
567            credentials.retrieve_subject_token(None)
568
569        assert excinfo.match(r"Missing subject_token in the credential_source file")
570
571    def test_retrieve_subject_token_text_file(self):
572        credentials = self.make_credentials(
573            credential_source=self.CREDENTIAL_SOURCE_TEXT
574        )
575
576        subject_token = credentials.retrieve_subject_token(None)
577
578        assert subject_token == TEXT_FILE_SUBJECT_TOKEN
579
580    def test_retrieve_subject_token_json_file(self):
581        credentials = self.make_credentials(
582            credential_source=self.CREDENTIAL_SOURCE_JSON
583        )
584
585        subject_token = credentials.retrieve_subject_token(None)
586
587        assert subject_token == JSON_FILE_SUBJECT_TOKEN
588
589    def test_retrieve_subject_token_json_file_invalid_field_name(self):
590        credential_source = {
591            "file": SUBJECT_TOKEN_JSON_FILE,
592            "format": {"type": "json", "subject_token_field_name": "not_found"},
593        }
594        credentials = self.make_credentials(credential_source=credential_source)
595
596        with pytest.raises(exceptions.RefreshError) as excinfo:
597            credentials.retrieve_subject_token(None)
598
599        assert excinfo.match(
600            "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
601                SUBJECT_TOKEN_JSON_FILE, "not_found"
602            )
603        )
604
605    def test_retrieve_subject_token_invalid_json(self, tmpdir):
606        # Provide JSON file. This should result in JSON parsing error.
607        invalid_json_file = tmpdir.join("invalid.json")
608        invalid_json_file.write("{")
609        credential_source = {
610            "file": str(invalid_json_file),
611            "format": {"type": "json", "subject_token_field_name": "access_token"},
612        }
613        credentials = self.make_credentials(credential_source=credential_source)
614
615        with pytest.raises(exceptions.RefreshError) as excinfo:
616            credentials.retrieve_subject_token(None)
617
618        assert excinfo.match(
619            "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
620                str(invalid_json_file), "access_token"
621            )
622        )
623
624    def test_retrieve_subject_token_file_not_found(self):
625        credential_source = {"file": "./not_found.txt"}
626        credentials = self.make_credentials(credential_source=credential_source)
627
628        with pytest.raises(exceptions.RefreshError) as excinfo:
629            credentials.retrieve_subject_token(None)
630
631        assert excinfo.match(r"File './not_found.txt' was not found")
632
633    def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
634        self,
635    ):
636        credentials = self.make_credentials(
637            client_id=CLIENT_ID,
638            client_secret=CLIENT_SECRET,
639            # Test with text format type.
640            credential_source=self.CREDENTIAL_SOURCE_TEXT,
641            scopes=SCOPES,
642            # Default scopes should be ignored.
643            default_scopes=["ignored"],
644        )
645
646        self.assert_underlying_credentials_refresh(
647            credentials=credentials,
648            audience=AUDIENCE,
649            subject_token=TEXT_FILE_SUBJECT_TOKEN,
650            subject_token_type=SUBJECT_TOKEN_TYPE,
651            token_url=TOKEN_URL,
652            service_account_impersonation_url=None,
653            basic_auth_encoding=BASIC_AUTH_ENCODING,
654            quota_project_id=None,
655            used_scopes=SCOPES,
656            scopes=SCOPES,
657            default_scopes=["ignored"],
658        )
659
660    def test_refresh_workforce_success_with_client_auth_without_impersonation(self):
661        credentials = self.make_credentials(
662            audience=WORKFORCE_AUDIENCE,
663            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
664            client_id=CLIENT_ID,
665            client_secret=CLIENT_SECRET,
666            # Test with text format type.
667            credential_source=self.CREDENTIAL_SOURCE_TEXT,
668            scopes=SCOPES,
669            # This will be ignored in favor of client auth.
670            workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
671        )
672
673        self.assert_underlying_credentials_refresh(
674            credentials=credentials,
675            audience=WORKFORCE_AUDIENCE,
676            subject_token=TEXT_FILE_SUBJECT_TOKEN,
677            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
678            token_url=TOKEN_URL,
679            service_account_impersonation_url=None,
680            basic_auth_encoding=BASIC_AUTH_ENCODING,
681            quota_project_id=None,
682            used_scopes=SCOPES,
683            scopes=SCOPES,
684            workforce_pool_user_project=None,
685        )
686
687    def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self):
688        credentials = self.make_credentials(
689            audience=WORKFORCE_AUDIENCE,
690            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
691            client_id=CLIENT_ID,
692            client_secret=CLIENT_SECRET,
693            # Test with text format type.
694            credential_source=self.CREDENTIAL_SOURCE_TEXT,
695            scopes=SCOPES,
696            # This is not needed when client Auth is used.
697            workforce_pool_user_project=None,
698        )
699
700        self.assert_underlying_credentials_refresh(
701            credentials=credentials,
702            audience=WORKFORCE_AUDIENCE,
703            subject_token=TEXT_FILE_SUBJECT_TOKEN,
704            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
705            token_url=TOKEN_URL,
706            service_account_impersonation_url=None,
707            basic_auth_encoding=BASIC_AUTH_ENCODING,
708            quota_project_id=None,
709            used_scopes=SCOPES,
710            scopes=SCOPES,
711            workforce_pool_user_project=None,
712        )
713
714    def test_refresh_workforce_success_without_client_auth_without_impersonation(self):
715        credentials = self.make_credentials(
716            audience=WORKFORCE_AUDIENCE,
717            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
718            client_id=None,
719            client_secret=None,
720            # Test with text format type.
721            credential_source=self.CREDENTIAL_SOURCE_TEXT,
722            scopes=SCOPES,
723            # This will not be ignored as client auth is not used.
724            workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
725        )
726
727        self.assert_underlying_credentials_refresh(
728            credentials=credentials,
729            audience=WORKFORCE_AUDIENCE,
730            subject_token=TEXT_FILE_SUBJECT_TOKEN,
731            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
732            token_url=TOKEN_URL,
733            service_account_impersonation_url=None,
734            basic_auth_encoding=None,
735            quota_project_id=None,
736            used_scopes=SCOPES,
737            scopes=SCOPES,
738            workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
739        )
740
741    def test_refresh_workforce_success_without_client_auth_with_impersonation(self):
742        credentials = self.make_credentials(
743            audience=WORKFORCE_AUDIENCE,
744            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
745            client_id=None,
746            client_secret=None,
747            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
748            # Test with text format type.
749            credential_source=self.CREDENTIAL_SOURCE_TEXT,
750            scopes=SCOPES,
751            # This will not be ignored as client auth is not used.
752            workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
753        )
754
755        self.assert_underlying_credentials_refresh(
756            credentials=credentials,
757            audience=WORKFORCE_AUDIENCE,
758            subject_token=TEXT_FILE_SUBJECT_TOKEN,
759            subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
760            token_url=TOKEN_URL,
761            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
762            basic_auth_encoding=None,
763            quota_project_id=None,
764            used_scopes=SCOPES,
765            scopes=SCOPES,
766            workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
767        )
768
769    def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
770        credentials = self.make_credentials(
771            client_id=CLIENT_ID,
772            client_secret=CLIENT_SECRET,
773            # Test with text format type.
774            credential_source=self.CREDENTIAL_SOURCE_TEXT,
775            scopes=None,
776            # Default scopes should be used since user specified scopes are none.
777            default_scopes=SCOPES,
778        )
779
780        self.assert_underlying_credentials_refresh(
781            credentials=credentials,
782            audience=AUDIENCE,
783            subject_token=TEXT_FILE_SUBJECT_TOKEN,
784            subject_token_type=SUBJECT_TOKEN_TYPE,
785            token_url=TOKEN_URL,
786            service_account_impersonation_url=None,
787            basic_auth_encoding=BASIC_AUTH_ENCODING,
788            quota_project_id=None,
789            used_scopes=SCOPES,
790            scopes=None,
791            default_scopes=SCOPES,
792        )
793
794    def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self):
795        # Initialize credentials with service account impersonation and basic auth.
796        credentials = self.make_credentials(
797            # Test with text format type.
798            credential_source=self.CREDENTIAL_SOURCE_TEXT,
799            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
800            scopes=SCOPES,
801            # Default scopes should be ignored.
802            default_scopes=["ignored"],
803        )
804
805        self.assert_underlying_credentials_refresh(
806            credentials=credentials,
807            audience=AUDIENCE,
808            subject_token=TEXT_FILE_SUBJECT_TOKEN,
809            subject_token_type=SUBJECT_TOKEN_TYPE,
810            token_url=TOKEN_URL,
811            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
812            basic_auth_encoding=None,
813            quota_project_id=None,
814            used_scopes=SCOPES,
815            scopes=SCOPES,
816            default_scopes=["ignored"],
817        )
818
819    def test_refresh_text_file_success_with_impersonation_use_default_scopes(self):
820        # Initialize credentials with service account impersonation, basic auth
821        # and default scopes (no user scopes).
822        credentials = self.make_credentials(
823            # Test with text format type.
824            credential_source=self.CREDENTIAL_SOURCE_TEXT,
825            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
826            scopes=None,
827            # Default scopes should be used since user specified scopes are none.
828            default_scopes=SCOPES,
829        )
830
831        self.assert_underlying_credentials_refresh(
832            credentials=credentials,
833            audience=AUDIENCE,
834            subject_token=TEXT_FILE_SUBJECT_TOKEN,
835            subject_token_type=SUBJECT_TOKEN_TYPE,
836            token_url=TOKEN_URL,
837            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
838            basic_auth_encoding=None,
839            quota_project_id=None,
840            used_scopes=SCOPES,
841            scopes=None,
842            default_scopes=SCOPES,
843        )
844
845    def test_refresh_json_file_success_without_impersonation(self):
846        credentials = self.make_credentials(
847            client_id=CLIENT_ID,
848            client_secret=CLIENT_SECRET,
849            # Test with JSON format type.
850            credential_source=self.CREDENTIAL_SOURCE_JSON,
851            scopes=SCOPES,
852        )
853
854        self.assert_underlying_credentials_refresh(
855            credentials=credentials,
856            audience=AUDIENCE,
857            subject_token=JSON_FILE_SUBJECT_TOKEN,
858            subject_token_type=SUBJECT_TOKEN_TYPE,
859            token_url=TOKEN_URL,
860            service_account_impersonation_url=None,
861            basic_auth_encoding=BASIC_AUTH_ENCODING,
862            quota_project_id=None,
863            used_scopes=SCOPES,
864            scopes=SCOPES,
865            default_scopes=None,
866        )
867
868    def test_refresh_json_file_success_with_impersonation(self):
869        # Initialize credentials with service account impersonation and basic auth.
870        credentials = self.make_credentials(
871            # Test with JSON format type.
872            credential_source=self.CREDENTIAL_SOURCE_JSON,
873            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
874            scopes=SCOPES,
875        )
876
877        self.assert_underlying_credentials_refresh(
878            credentials=credentials,
879            audience=AUDIENCE,
880            subject_token=JSON_FILE_SUBJECT_TOKEN,
881            subject_token_type=SUBJECT_TOKEN_TYPE,
882            token_url=TOKEN_URL,
883            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
884            basic_auth_encoding=None,
885            quota_project_id=None,
886            used_scopes=SCOPES,
887            scopes=SCOPES,
888            default_scopes=None,
889        )
890
891    def test_refresh_with_retrieve_subject_token_error(self):
892        credential_source = {
893            "file": SUBJECT_TOKEN_JSON_FILE,
894            "format": {"type": "json", "subject_token_field_name": "not_found"},
895        }
896        credentials = self.make_credentials(credential_source=credential_source)
897
898        with pytest.raises(exceptions.RefreshError) as excinfo:
899            credentials.refresh(None)
900
901        assert excinfo.match(
902            "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
903                SUBJECT_TOKEN_JSON_FILE, "not_found"
904            )
905        )
906
907    def test_retrieve_subject_token_from_url(self):
908        credentials = self.make_credentials(
909            credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
910        )
911        request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)
912        subject_token = credentials.retrieve_subject_token(request)
913
914        assert subject_token == TEXT_FILE_SUBJECT_TOKEN
915        self.assert_credential_request_kwargs(request.call_args_list[0][1], None)
916
917    def test_retrieve_subject_token_from_url_with_headers(self):
918        credentials = self.make_credentials(
919            credential_source={"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}}
920        )
921        request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)
922        subject_token = credentials.retrieve_subject_token(request)
923
924        assert subject_token == TEXT_FILE_SUBJECT_TOKEN
925        self.assert_credential_request_kwargs(
926            request.call_args_list[0][1], {"foo": "bar"}
927        )
928
929    def test_retrieve_subject_token_from_url_json(self):
930        credentials = self.make_credentials(
931            credential_source=self.CREDENTIAL_SOURCE_JSON_URL
932        )
933        request = self.make_mock_request(token_data=JSON_FILE_CONTENT)
934        subject_token = credentials.retrieve_subject_token(request)
935
936        assert subject_token == JSON_FILE_SUBJECT_TOKEN
937        self.assert_credential_request_kwargs(request.call_args_list[0][1], None)
938
939    def test_retrieve_subject_token_from_url_json_with_headers(self):
940        credentials = self.make_credentials(
941            credential_source={
942                "url": self.CREDENTIAL_URL,
943                "format": {"type": "json", "subject_token_field_name": "access_token"},
944                "headers": {"foo": "bar"},
945            }
946        )
947        request = self.make_mock_request(token_data=JSON_FILE_CONTENT)
948        subject_token = credentials.retrieve_subject_token(request)
949
950        assert subject_token == JSON_FILE_SUBJECT_TOKEN
951        self.assert_credential_request_kwargs(
952            request.call_args_list[0][1], {"foo": "bar"}
953        )
954
955    def test_retrieve_subject_token_from_url_not_found(self):
956        credentials = self.make_credentials(
957            credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
958        )
959        with pytest.raises(exceptions.RefreshError) as excinfo:
960            credentials.retrieve_subject_token(
961                self.make_mock_request(token_status=404, token_data=JSON_FILE_CONTENT)
962            )
963
964        assert excinfo.match("Unable to retrieve Identity Pool subject token")
965
966    def test_retrieve_subject_token_from_url_json_invalid_field(self):
967        credential_source = {
968            "url": self.CREDENTIAL_URL,
969            "format": {"type": "json", "subject_token_field_name": "not_found"},
970        }
971        credentials = self.make_credentials(credential_source=credential_source)
972
973        with pytest.raises(exceptions.RefreshError) as excinfo:
974            credentials.retrieve_subject_token(
975                self.make_mock_request(token_data=JSON_FILE_CONTENT)
976            )
977
978        assert excinfo.match(
979            "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
980                self.CREDENTIAL_URL, "not_found"
981            )
982        )
983
984    def test_retrieve_subject_token_from_url_json_invalid_format(self):
985        credentials = self.make_credentials(
986            credential_source=self.CREDENTIAL_SOURCE_JSON_URL
987        )
988
989        with pytest.raises(exceptions.RefreshError) as excinfo:
990            credentials.retrieve_subject_token(self.make_mock_request(token_data="{"))
991
992        assert excinfo.match(
993            "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
994                self.CREDENTIAL_URL, "access_token"
995            )
996        )
997
998    def test_refresh_text_file_success_without_impersonation_url(self):
999        credentials = self.make_credentials(
1000            client_id=CLIENT_ID,
1001            client_secret=CLIENT_SECRET,
1002            # Test with text format type.
1003            credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
1004            scopes=SCOPES,
1005        )
1006
1007        self.assert_underlying_credentials_refresh(
1008            credentials=credentials,
1009            audience=AUDIENCE,
1010            subject_token=TEXT_FILE_SUBJECT_TOKEN,
1011            subject_token_type=SUBJECT_TOKEN_TYPE,
1012            token_url=TOKEN_URL,
1013            service_account_impersonation_url=None,
1014            basic_auth_encoding=BASIC_AUTH_ENCODING,
1015            quota_project_id=None,
1016            used_scopes=SCOPES,
1017            scopes=SCOPES,
1018            default_scopes=None,
1019            credential_data=TEXT_FILE_SUBJECT_TOKEN,
1020        )
1021
1022    def test_refresh_text_file_success_with_impersonation_url(self):
1023        # Initialize credentials with service account impersonation and basic auth.
1024        credentials = self.make_credentials(
1025            # Test with text format type.
1026            credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
1027            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
1028            scopes=SCOPES,
1029        )
1030
1031        self.assert_underlying_credentials_refresh(
1032            credentials=credentials,
1033            audience=AUDIENCE,
1034            subject_token=TEXT_FILE_SUBJECT_TOKEN,
1035            subject_token_type=SUBJECT_TOKEN_TYPE,
1036            token_url=TOKEN_URL,
1037            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
1038            basic_auth_encoding=None,
1039            quota_project_id=None,
1040            used_scopes=SCOPES,
1041            scopes=SCOPES,
1042            default_scopes=None,
1043            credential_data=TEXT_FILE_SUBJECT_TOKEN,
1044        )
1045
1046    def test_refresh_json_file_success_without_impersonation_url(self):
1047        credentials = self.make_credentials(
1048            client_id=CLIENT_ID,
1049            client_secret=CLIENT_SECRET,
1050            # Test with JSON format type.
1051            credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
1052            scopes=SCOPES,
1053        )
1054
1055        self.assert_underlying_credentials_refresh(
1056            credentials=credentials,
1057            audience=AUDIENCE,
1058            subject_token=JSON_FILE_SUBJECT_TOKEN,
1059            subject_token_type=SUBJECT_TOKEN_TYPE,
1060            token_url=TOKEN_URL,
1061            service_account_impersonation_url=None,
1062            basic_auth_encoding=BASIC_AUTH_ENCODING,
1063            quota_project_id=None,
1064            used_scopes=SCOPES,
1065            scopes=SCOPES,
1066            default_scopes=None,
1067            credential_data=JSON_FILE_CONTENT,
1068        )
1069
1070    def test_refresh_json_file_success_with_impersonation_url(self):
1071        # Initialize credentials with service account impersonation and basic auth.
1072        credentials = self.make_credentials(
1073            # Test with JSON format type.
1074            credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
1075            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
1076            scopes=SCOPES,
1077        )
1078
1079        self.assert_underlying_credentials_refresh(
1080            credentials=credentials,
1081            audience=AUDIENCE,
1082            subject_token=JSON_FILE_SUBJECT_TOKEN,
1083            subject_token_type=SUBJECT_TOKEN_TYPE,
1084            token_url=TOKEN_URL,
1085            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
1086            basic_auth_encoding=None,
1087            quota_project_id=None,
1088            used_scopes=SCOPES,
1089            scopes=SCOPES,
1090            default_scopes=None,
1091            credential_data=JSON_FILE_CONTENT,
1092        )
1093
1094    def test_refresh_with_retrieve_subject_token_error_url(self):
1095        credential_source = {
1096            "url": self.CREDENTIAL_URL,
1097            "format": {"type": "json", "subject_token_field_name": "not_found"},
1098        }
1099        credentials = self.make_credentials(credential_source=credential_source)
1100
1101        with pytest.raises(exceptions.RefreshError) as excinfo:
1102            credentials.refresh(self.make_mock_request(token_data=JSON_FILE_CONTENT))
1103
1104        assert excinfo.match(
1105            "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
1106                self.CREDENTIAL_URL, "not_found"
1107            )
1108        )
1109