diff --git a/packages/google-auth/google/auth/_regional_access_boundary_utils.py b/packages/google-auth/google/auth/_regional_access_boundary_utils.py index c97bf8f484df..8fd8c2fed5a2 100644 --- a/packages/google-auth/google/auth/_regional_access_boundary_utils.py +++ b/packages/google-auth/google/auth/_regional_access_boundary_utils.py @@ -17,15 +17,12 @@ import asyncio import copy import datetime -import functools import inspect import logging -import os import threading from typing import NamedTuple, Optional, TYPE_CHECKING from google.auth import _helpers -from google.auth import environment_vars if TYPE_CHECKING: import google.auth.credentials @@ -34,25 +31,6 @@ _LOGGER = logging.getLogger(__name__) -@functools.lru_cache() -def is_regional_access_boundary_enabled(): - """Checks if Regional Access Boundary is enabled via environment variable. - - The environment variable is interpreted as a boolean with the following - (case-insensitive) rules: - - "true", "1" are considered true. - - Any other value (or unset) is considered false. - - Returns: - bool: True if Regional Access Boundary is enabled, False otherwise. - """ - value = os.environ.get(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED) - if value is None: - return False - - return value.lower() in ("true", "1") - - # The default lifetime for a cached Regional Access Boundary. DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6) diff --git a/packages/google-auth/google/auth/aws.py b/packages/google-auth/google/auth/aws.py index c640568b80e9..46c913a7a96f 100644 --- a/packages/google-auth/google/auth/aws.py +++ b/packages/google-auth/google/auth/aws.py @@ -841,11 +841,9 @@ def from_info(cls, info, **kwargs): Raises: ValueError: For invalid parameters. """ - aws_security_credentials_supplier = info.get( - "aws_security_credentials_supplier" - ) - kwargs.update( - {"aws_security_credentials_supplier": aws_security_credentials_supplier} + kwargs.setdefault( + "aws_security_credentials_supplier", + info.get("aws_security_credentials_supplier"), ) return super(Credentials, cls).from_info(info, **kwargs) diff --git a/packages/google-auth/google/auth/credentials.py b/packages/google-auth/google/auth/credentials.py index f0ce4f41e0ac..95aa2cf3503f 100644 --- a/packages/google-auth/google/auth/credentials.py +++ b/packages/google-auth/google/auth/credentials.py @@ -446,9 +446,13 @@ def _is_regional_endpoint(self, url): try: # Do not perform a lookup if the request is for a regional endpoint. hostname = urlparse(url).hostname - if hostname and ( - hostname.endswith(".rep.googleapis.com") - or hostname.endswith(".rep.sandbox.googleapis.com") + if hostname and hostname.endswith( + ( + ".rep.googleapis.com", + ".rep.sandbox.googleapis.com", + ".rep.mtls.googleapis.com", + ".rep.mtls.sandbox.googleapis.com", + ) ): return True except (ValueError, TypeError, AttributeError): @@ -484,16 +488,11 @@ def _maybe_start_regional_access_boundary_refresh(self, request, url): def _is_regional_access_boundary_lookup_required(self): """Checks if a Regional Access Boundary lookup is required. - A lookup is required if the feature is enabled via an environment - variable and the universe domain is supported. + A lookup is required if the universe domain is supported. Returns: bool: True if a Regional Access Boundary lookup is required, False otherwise. """ - # Check if the feature is enabled. - if not _regional_access_boundary_utils.is_regional_access_boundary_enabled(): - return False - # Skip for non-default universe domains. if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN: return False diff --git a/packages/google-auth/google/auth/environment_vars.py b/packages/google-auth/google/auth/environment_vars.py index c7d706467ed4..04c1c61c7d34 100644 --- a/packages/google-auth/google/auth/environment_vars.py +++ b/packages/google-auth/google/auth/environment_vars.py @@ -105,9 +105,6 @@ AWS_REGION = "AWS_REGION" AWS_DEFAULT_REGION = "AWS_DEFAULT_REGION" -GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED = "GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED" -"""Environment variable controlling whether to enable trust boundary feature. -The default value is false. Users have to explicitly set this value to true.""" GOOGLE_API_CERTIFICATE_CONFIG = "GOOGLE_API_CERTIFICATE_CONFIG" """Environment variable defining the location of Google API certificate config diff --git a/packages/google-auth/google/auth/external_account.py b/packages/google-auth/google/auth/external_account.py index eee6d1194031..95525768fe8e 100644 --- a/packages/google-auth/google/auth/external_account.py +++ b/packages/google-auth/google/auth/external_account.py @@ -444,6 +444,17 @@ def _maybe_start_regional_access_boundary_refresh(self, request, url): HTTP requests. url (str): The URL of the request. """ + if ( + self._should_initialize_impersonated_credentials() + and self.service_account_email + ): + self._impersonated_credentials = self._initialize_impersonated_credentials() + if getattr(self, "token", None): + self._impersonated_credentials.token = self.token + if getattr(self, "expiry", None): + self._impersonated_credentials.expiry = self.expiry + self._rab_manager = self._impersonated_credentials._rab_manager + if getattr(self, "_impersonated_credentials", None): self._impersonated_credentials._maybe_start_regional_access_boundary_refresh( request, url diff --git a/packages/google-auth/google/auth/identity_pool.py b/packages/google-auth/google/auth/identity_pool.py index 30819ef0485a..333f7bdf53ea 100644 --- a/packages/google-auth/google/auth/identity_pool.py +++ b/packages/google-auth/google/auth/identity_pool.py @@ -526,8 +526,7 @@ def from_info(cls, info, **kwargs): Raises: ValueError: For invalid parameters. """ - subject_token_supplier = info.get("subject_token_supplier") - kwargs.update({"subject_token_supplier": subject_token_supplier}) + kwargs.setdefault("subject_token_supplier", info.get("subject_token_supplier")) return super(Credentials, cls).from_info(info, **kwargs) @classmethod diff --git a/packages/google-auth/tests/compute_engine/test_credentials.py b/packages/google-auth/tests/compute_engine/test_credentials.py index 7fb2b8b504fc..1db0810d4048 100644 --- a/packages/google-auth/tests/compute_engine/test_credentials.py +++ b/packages/google-auth/tests/compute_engine/test_credentials.py @@ -13,6 +13,7 @@ # limitations under the License. import base64 import datetime +import re from unittest import mock import pytest # type: ignore @@ -206,6 +207,7 @@ def test_before_request_refreshes(self, get): "access_token": "token", "expires_in": 500, }, + "googleapis.com", ] # Credentials should start as invalid @@ -252,7 +254,11 @@ def test_with_universe_domain(self): assert creds.universe_domain == "universe_domain" assert creds._universe_domain_cached - def test_token_usage_metrics(self): + @mock.patch( + "google.auth.compute_engine._metadata.get_universe_domain", + return_value="googleapis.com", + ) + def test_token_usage_metrics(self, mock_get_universe_domain): self.credentials.token = "token" self.credentials.expiry = None @@ -410,11 +416,7 @@ def test_build_regional_access_boundary_lookup_url_no_email( url = creds._build_regional_access_boundary_lookup_url() assert url is None - @mock.patch( - "google.auth._regional_access_boundary_utils.is_regional_access_boundary_enabled", - return_value=True, - ) - def test_is_regional_access_boundary_lookup_required(self, mock_enabled): + def test_is_regional_access_boundary_lookup_required(self): creds = self.credentials creds._universe_domain_cached = True @@ -442,15 +444,11 @@ def test_build_regional_access_boundary_lookup_url_with_invalid_email(self): url = creds._build_regional_access_boundary_lookup_url() assert url is None - @mock.patch( - "google.auth._regional_access_boundary_utils.is_regional_access_boundary_enabled", - return_value=True, - ) @mock.patch( "google.auth.compute_engine._metadata.get_service_account_info", autospec=True ) def test_regional_access_boundary_disabled_state_transitions( - self, mock_get_service_account_info, mock_enabled + self, mock_get_service_account_info ): mock_get_service_account_info.return_value = { "email": "spiffe://trust-domain/ns/ns/sa/sa", @@ -769,6 +767,15 @@ def test_with_target_audience_integration(self): json={}, ) + # mock allowedLocations for Regional Access Boundary + responses.add( + responses.GET, + re.compile(r".*/allowedLocations$"), + status=200, + content_type="application/json", + json={"encodedLocations": "0xABC"}, + ) + # mock token for credentials responses.add( responses.GET, @@ -787,8 +794,10 @@ def test_with_target_audience_integration(self): signature = base64.b64encode(b"some-signature").decode("utf-8") responses.add( responses.POST, - "https://iamcredentials.googleapis.com/v1/projects/-/" - "serviceAccounts/service-account@example.com:signBlob", + re.compile( + r"https://iamcredentials\.(mtls\.)?googleapis\.com/v1/projects/-/" + r"serviceAccounts/service-account@example\.com:signBlob" + ), status=200, content_type="application/json", json={"keyId": "some-key-id", "signedBlob": signature}, @@ -951,12 +960,23 @@ def test_with_quota_project_integration(self): json={}, ) + # mock allowedLocations for Regional Access Boundary + responses.add( + responses.GET, + re.compile(r".*/allowedLocations$"), + status=200, + content_type="application/json", + json={"encodedLocations": "0xABC"}, + ) + # mock sign blob endpoint signature = base64.b64encode(b"some-signature").decode("utf-8") responses.add( responses.POST, - "https://iamcredentials.googleapis.com/v1/projects/-/" - "serviceAccounts/service-account@example.com:signBlob", + re.compile( + r"https://iamcredentials\.(mtls\.)?googleapis\.com/v1/projects/-/" + r"serviceAccounts/service-account@example\.com:signBlob" + ), status=200, content_type="application/json", json={"keyId": "some-key-id", "signedBlob": signature}, diff --git a/packages/google-auth/tests/test__regional_access_boundary_utils.py b/packages/google-auth/tests/test__regional_access_boundary_utils.py index c612b60b8ed2..5e4243f94347 100644 --- a/packages/google-auth/tests/test__regional_access_boundary_utils.py +++ b/packages/google-auth/tests/test__regional_access_boundary_utils.py @@ -13,7 +13,6 @@ # limitations under the License. import datetime -import os from unittest import mock import pytest # type: ignore @@ -22,7 +21,6 @@ from google.auth import _helpers from google.auth import _regional_access_boundary_utils from google.auth import credentials -from google.auth import environment_vars from google.oauth2 import credentials as oauth2_credentials @@ -53,48 +51,7 @@ def _make_copy(self): return new_credentials -@pytest.fixture(autouse=True) -def clear_rab_cache(): - """Clears the Regional Access Boundary enablement cache before every test.""" - _regional_access_boundary_utils.is_regional_access_boundary_enabled.cache_clear() - - class TestCredentialsWithRegionalAccessBoundary(object): - def test_is_regional_access_boundary_enabled_cached(self, monkeypatch): - # Set to true - monkeypatch.setenv(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, "true") - assert ( - _regional_access_boundary_utils.is_regional_access_boundary_enabled() - is True - ) - - # Change env var to false, but it should still return True due to caching - monkeypatch.setenv(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, "false") - assert ( - _regional_access_boundary_utils.is_regional_access_boundary_enabled() - is True - ) - - # Clear cache and it should now reflect the new value - _regional_access_boundary_utils.is_regional_access_boundary_enabled.cache_clear() - assert ( - _regional_access_boundary_utils.is_regional_access_boundary_enabled() - is False - ) - - @mock.patch( - "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" - ) - def test_maybe_start_refresh_is_skipped_if_env_var_not_set( - self, mock_start_refresh - ): - creds = CredentialsImpl() - with mock.patch.dict(os.environ, clear=True): - creds._maybe_start_regional_access_boundary_refresh( - mock.Mock(), "http://example.com" - ) - mock_start_refresh.assert_not_called() - @mock.patch( "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" ) @@ -106,13 +63,9 @@ def test_maybe_start_refresh_is_skipped_if_not_expired(self, mock_start_refresh) cooldown_expiry=None, cooldown_duration=_regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, ) - with mock.patch.dict( - os.environ, - {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, - ): - creds._maybe_start_regional_access_boundary_refresh( - mock.Mock(), "http://example.com" - ) + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) mock_start_refresh.assert_not_called() @mock.patch( @@ -127,13 +80,9 @@ def test_maybe_start_refresh_triggered_if_soft_expired(self, mock_start_refresh) cooldown_duration=_regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, ) request = mock.Mock() - with mock.patch.dict( - os.environ, - {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, - ): - creds._maybe_start_regional_access_boundary_refresh( - request, "http://example.com" - ) + creds._maybe_start_regional_access_boundary_refresh( + request, "http://example.com" + ) mock_start_refresh.assert_called_once_with(creds, request, creds._rab_manager) @mock.patch( @@ -149,29 +98,28 @@ def test_maybe_start_refresh_is_skipped_if_cooldown_active( cooldown_expiry=_helpers.utcnow() + datetime.timedelta(minutes=5), cooldown_duration=_regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, ) - with mock.patch.dict( - os.environ, - {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, - ): - creds._maybe_start_regional_access_boundary_refresh( - mock.Mock(), "http://example.com" - ) + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) mock_start_refresh.assert_not_called() @mock.patch( "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" ) + @pytest.mark.parametrize( + "url", + [ + "https://my-service.us-east1.rep.googleapis.com", + "https://my-service.us-east1.rep.sandbox.googleapis.com", + "https://my-service.us-east1.rep.mtls.googleapis.com", + "https://my-service.us-east1.rep.mtls.sandbox.googleapis.com", + ], + ) def test_maybe_start_refresh_is_skipped_for_regional_endpoint( - self, mock_start_refresh + self, mock_start_refresh, url ): creds = CredentialsImpl() - with mock.patch.dict( - os.environ, - {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, - ): - creds._maybe_start_regional_access_boundary_refresh( - mock.Mock(), "https://my-service.us-east1.rep.googleapis.com" - ) + creds._maybe_start_regional_access_boundary_refresh(mock.Mock(), url) mock_start_refresh.assert_not_called() @mock.patch( @@ -180,13 +128,9 @@ def test_maybe_start_refresh_is_skipped_for_regional_endpoint( def test_maybe_start_refresh_is_triggered(self, mock_start_refresh): creds = CredentialsImpl() request = mock.Mock() - with mock.patch.dict( - os.environ, - {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, - ): - creds._maybe_start_regional_access_boundary_refresh( - request, "http://example.com" - ) + creds._maybe_start_regional_access_boundary_refresh( + request, "http://example.com" + ) mock_start_refresh.assert_called_once_with(creds, request, creds._rab_manager) def test_apply_headers_success(self): @@ -327,13 +271,9 @@ def test_maybe_start_refresh_is_skipped_if_non_default_universe_domain( self, mock_start_refresh ): creds = CredentialsImpl(universe_domain="not.googleapis.com") - with mock.patch.dict( - os.environ, - {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, - ): - creds._maybe_start_regional_access_boundary_refresh( - mock.Mock(), "http://example.com" - ) + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) mock_start_refresh.assert_not_called() @mock.patch( @@ -346,13 +286,9 @@ def test_maybe_start_refresh_handles_url_parse_errors( mock_urlparse.side_effect = ValueError("Malformed URL") creds = CredentialsImpl() request = mock.Mock() - with mock.patch.dict( - os.environ, - {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, - ): - creds._maybe_start_regional_access_boundary_refresh( - request, "http://malformed-url" - ) + creds._maybe_start_regional_access_boundary_refresh( + request, "http://malformed-url" + ) mock_start_refresh.assert_called_once_with(creds, request, creds._rab_manager) @mock.patch( @@ -362,13 +298,9 @@ def test_maybe_start_refresh_blocking(self, mock_start_blocking_refresh): creds = CredentialsImpl() creds._rab_manager._use_blocking_regional_access_boundary_lookup = True request = mock.Mock() - with mock.patch.dict( - os.environ, - {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, - ): - creds._maybe_start_regional_access_boundary_refresh( - request, "http://example.com" - ) + creds._maybe_start_regional_access_boundary_refresh( + request, "http://example.com" + ) mock_start_blocking_refresh.assert_called_once_with(creds, request) def test_start_blocking_refresh_success(self): @@ -622,19 +554,15 @@ async def test_maybe_start_refresh_async_blocking(self): creds._rab_manager._use_blocking_regional_access_boundary_lookup = True request = mock.Mock() - with mock.patch.dict( - os.environ, - {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, - ): - with mock.patch.object( - creds._rab_manager, - "start_blocking_refresh_async", - new_callable=mock.AsyncMock, - ) as mock_start_blocking: - await creds._maybe_start_regional_access_boundary_refresh_async( - request, "http://example.com" - ) - mock_start_blocking.assert_called_once_with(creds, request) + with mock.patch.object( + creds._rab_manager, + "start_blocking_refresh_async", + new_callable=mock.AsyncMock, + ) as mock_start_blocking: + await creds._maybe_start_regional_access_boundary_refresh_async( + request, "http://example.com" + ) + mock_start_blocking.assert_called_once_with(creds, request) @pytest.mark.asyncio async def test_start_blocking_refresh_async_success(self): diff --git a/packages/google-auth/tests/test_aws.py b/packages/google-auth/tests/test_aws.py index b6b1ca2319ed..e0eba29737b0 100644 --- a/packages/google-auth/tests/test_aws.py +++ b/packages/google-auth/tests/test_aws.py @@ -1038,6 +1038,21 @@ def test_from_info_supplier(self, mock_init): trust_boundary=None, ) + @mock.patch.object(aws.Credentials, "__init__", return_value=None) + def test_from_info_programmatic_supplier_keyword(self, mock_init): + supplier = TestAwsSecurityCredentialsSupplier() + info = { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + } + credentials = aws.Credentials.from_info( + info, aws_security_credentials_supplier=supplier + ) + + assert isinstance(credentials, aws.Credentials) + assert mock_init.call_args[1]["aws_security_credentials_supplier"] == supplier + @mock.patch.object(aws.Credentials, "__init__", return_value=None) def test_from_file_full_options(self, mock_init, tmpdir): info = { diff --git a/packages/google-auth/tests/test_credentials.py b/packages/google-auth/tests/test_credentials.py index 5c7e39d59e84..24cbb98afd94 100644 --- a/packages/google-auth/tests/test_credentials.py +++ b/packages/google-auth/tests/test_credentials.py @@ -407,38 +407,31 @@ def _build_trust_boundary_lookup_url(self): def test_before_request_triggers_rab_refresh(): - with mock.patch( - "google.auth._regional_access_boundary_utils." - "is_regional_access_boundary_enabled", - return_value=True, - ): - with mock.patch( - "google.oauth2._client._lookup_regional_access_boundary" - ) as lookup: - lookup.return_value = {"encodedLocations": "0xA30"} - - creds = CredentialsImpl() - creds = creds._set_blocking_regional_access_boundary_lookup() - - request = mock.Mock() - headers = {} - - # Initial state: no token - assert creds.token is None - - # before_request should trigger token refresh and THEN RAB refresh. - # We verify this by checking that the RAB lookup was called with - # the URL containing the refreshed token. - creds.before_request(request, "GET", "http://example.com", headers) - - assert creds.token == "refreshed-token" - assert headers["authorization"] == "Bearer refreshed-token" - assert headers["x-allowed-locations"] == "0xA30" - - # Verify lookup was called with the refreshed token's URL - lookup.assert_called_once() - args, kwargs = lookup.call_args - assert args[1] == "http://mock.url/lookup_for_refreshed-token" + with mock.patch("google.oauth2._client._lookup_regional_access_boundary") as lookup: + lookup.return_value = {"encodedLocations": "0xA30"} + + creds = CredentialsImpl() + creds = creds._set_blocking_regional_access_boundary_lookup() + + request = mock.Mock() + headers = {} + + # Initial state: no token + assert creds.token is None + + # before_request should trigger token refresh and THEN RAB refresh. + # We verify this by checking that the RAB lookup was called with + # the URL containing the refreshed token. + creds.before_request(request, "GET", "http://example.com", headers) + + assert creds.token == "refreshed-token" + assert headers["authorization"] == "Bearer refreshed-token" + assert headers["x-allowed-locations"] == "0xA30" + + # Verify lookup was called with the refreshed token's URL + lookup.assert_called_once() + args, kwargs = lookup.call_args + assert args[1] == "http://mock.url/lookup_for_refreshed-token" def test_maybe_start_regional_access_boundary_refresh_invalid_url(): diff --git a/packages/google-auth/tests/test_external_account.py b/packages/google-auth/tests/test_external_account.py index 870b07d47b6e..e887a45db208 100644 --- a/packages/google-auth/tests/test_external_account.py +++ b/packages/google-auth/tests/test_external_account.py @@ -1028,6 +1028,37 @@ def test_refresh_impersonation_propagates_rab_config( is credentials._impersonated_credentials._rab_manager ) + def test_cached_token_initializes_impersonated_credentials(self): + # Initialize credentials with impersonation. + credentials = self.make_credentials( + service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, + scopes=self.SCOPES, + ) + + assert credentials._impersonated_credentials is None + + # Simulate cached token by setting it directly. + credentials.token = "CACHED_SA_TOKEN" + credentials.expiry = _helpers.utcnow() + datetime.timedelta(seconds=3600) + + assert credentials.token == "CACHED_SA_TOKEN" + + request = self.make_mock_request(status=http_client.OK, data={}) + + # Mock RAB refresh on ImpersonatedCredentials to verify delegation. + with mock.patch( + "google.auth.impersonated_credentials.Credentials._maybe_start_regional_access_boundary_refresh" + ) as mock_rab_refresh: + headers = {} + credentials.before_request(request, "GET", "https://example.com", headers) + + assert credentials._impersonated_credentials is not None + assert credentials._impersonated_credentials.token == "CACHED_SA_TOKEN" + assert credentials._impersonated_credentials.expiry == credentials.expiry + + # Verify delegation occurred. + mock_rab_refresh.assert_called_once_with(request, "https://example.com") + @mock.patch( "google.auth.metrics.token_request_access_token_impersonate", return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, diff --git a/packages/google-auth/tests/test_identity_pool.py b/packages/google-auth/tests/test_identity_pool.py index c68fac64708d..b326bbaae13f 100644 --- a/packages/google-auth/tests/test_identity_pool.py +++ b/packages/google-auth/tests/test_identity_pool.py @@ -604,6 +604,21 @@ def test_from_info_workforce_pool(self, mock_init): trust_boundary=None, ) + @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) + def test_from_info_programmatic_supplier_keyword(self, mock_init): + supplier = TestSubjectTokenSupplier() + info = { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + } + credentials = identity_pool.Credentials.from_info( + info, subject_token_supplier=supplier + ) + + assert isinstance(credentials, identity_pool.Credentials) + assert mock_init.call_args[1]["subject_token_supplier"] == supplier + @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) def test_from_file_full_options(self, mock_init, tmpdir): info = {