diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f67cc4..c12e88c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,34 @@ # Change Log (v2.8.1+) +## v4.6.0 [2026-05-27] + +__What's New:__ + +* Added AWS STS JWT federation provider via `GetWebIdentityToken`. +* Added broker scan and scheduled scan support for access broker resource types. +* Added secret rotation support for secrets manager. +* Added policy prioritization support for access broker profiles. + +__Enhancements:__ + +* Added `federation_providers.aws_sts_jwt` for OIDC-based AWS federation using STS web identity tokens. +* Added `access_broker.resources.types.[scan|get_scan_settings]` for on-demand broker scans. +* Added `access_broker.resources.types.[get_scheduled_scan_service|create_scheduled_scan|list_scheduled_scans|enable_scheduled_scans|disable_scheduled_scans|delete_scheduled_scan]` for scheduled scan management. +* Added `secrets_manager.secrets.[metadata|rotation_details|update_rotation|rotate|rotation_history|versions]` for secret rotation and version management. +* Added `access_broker.profiles.policy_prioritization.[enable|disable|reorder]` for profile policy evaluation ordering. + +__Bug Fixes:__ + +* None + +__Dependencies:__ + +* None + +__Other:__ + +* None + ## v4.5.0 [2026-01-26] __What's New:__ diff --git a/src/britive/__init__.py b/src/britive/__init__.py index 330025d..52fde38 100644 --- a/src/britive/__init__.py +++ b/src/britive/__init__.py @@ -1 +1 @@ -__version__ = '4.5.0' +__version__ = '4.6.0' diff --git a/src/britive/access_broker/profiles/__init__.py b/src/britive/access_broker/profiles/__init__.py index bad81b8..28bdf73 100644 --- a/src/britive/access_broker/profiles/__init__.py +++ b/src/britive/access_broker/profiles/__init__.py @@ -2,6 +2,7 @@ from .permissions import Permissions from .policies import Policies +from .policy_prioritization import PolicyPrioritization class Profiles: @@ -11,6 +12,7 @@ def __init__(self, britive) -> None: self.advanced_settings = AdvancedSettings(britive, base_url='/resource-manager/profile/{}/advanced-settings') self.permissions = Permissions(britive) self.policies = Policies(britive) + self.policy_prioritization = PolicyPrioritization(britive) def create( self, name: str, description: str = '', expiration_duration: int = 900000, impersonation: bool = False diff --git a/src/britive/access_broker/profiles/policy_prioritization.py b/src/britive/access_broker/profiles/policy_prioritization.py new file mode 100644 index 0000000..d121446 --- /dev/null +++ b/src/britive/access_broker/profiles/policy_prioritization.py @@ -0,0 +1,46 @@ +class PolicyPrioritization: + def __init__(self, britive) -> None: + self.britive = britive + self.base_url = f'{self.britive.base_url}/resource-manager/profiles' + + def enable(self, profile_id: str) -> None: + """ + Enable policy prioritization for a profile. + + When enabled, policies will be evaluated in sequential order. Prioritizing policies may change + existing access and/or approval conditions for affected identities. + + :param profile_id: ID of the profile. + :return: None + """ + + return self.britive.patch(f'{self.base_url}/{profile_id}', json={'policyOrderingEnabled': True}) + + def disable(self, profile_id: str) -> None: + """ + Disable policy prioritization for a profile. + + When disabled, the system default policy processing will be used. This may change existing access + and/or approval conditions for affected identities. Existing prioritization will be saved and may + be restored later. + + :param profile_id: ID of the profile. + :return: None + """ + + return self.britive.patch(f'{self.base_url}/{profile_id}', json={'policyOrderingEnabled': False}) + + def reorder(self, profile_id: str, policy_ids: list) -> None: + """ + Set the evaluation order of policies for a profile. + + Policy prioritization must be enabled for the profile before reordering. + + :param profile_id: ID of the profile. + :param policy_ids: Ordered list of policy IDs. The first ID in the list will be evaluated first + (order 0), the second will be order 1, and so on. + :return: None + """ + + ordering = [{'id': policy_id, 'order': i} for i, policy_id in enumerate(policy_ids)] + return self.britive.post(f'{self.base_url}/{profile_id}/policies/order', json=ordering) diff --git a/src/britive/access_broker/resources/types.py b/src/britive/access_broker/resources/types.py index 7d291c9..d5c8700 100644 --- a/src/britive/access_broker/resources/types.py +++ b/src/britive/access_broker/resources/types.py @@ -2,6 +2,7 @@ class Types: def __init__(self, britive) -> None: self.britive = britive self.base_url = f'{self.britive.base_url}/resource-manager/resource-types' + self.scheduled_scan_base_url = f'{self.britive.base_url}/tasks/services/resource-scan' def create(self, name: str, description: str = '', fields: list = None) -> dict: """ @@ -82,3 +83,124 @@ def delete(self, resource_type_id: str) -> None: :return: None """ return self.britive.delete(f'{self.base_url}/{resource_type_id}') + + def scan(self, resource_type_id: str, resource_labels: list = None) -> dict: + """ + Trigger a broker scan for resources of the given resource type. + + When called without `resource_labels`, all resources of the resource type are scanned. + When called with `resource_labels`, only resources matching the provided labels are scanned. + + :param resource_type_id: ID of the resource type. + :param resource_labels: Optional list of resource label dicts to filter which resources to scan. + Example: [{'key': 'env', 'label-values': ['prod', 'staging']}] + :return: Details of the scan that was initiated. + """ + + params = {} + if resource_labels: + params['resourceLabels'] = resource_labels + return self.britive.post(f'{self.base_url}/{resource_type_id}/scan', json=params) + + def get_scan_settings(self, resource_type_id: str) -> dict: + """ + Retrieve scan settings for a resource type. + + :param resource_type_id: ID of the resource type. + :return: Scan settings for the resource type. + """ + + return self.britive.get(f'{self.base_url}/{resource_type_id}/scan-settings') + + def get_scheduled_scan_service(self, resource_type_id: str) -> dict: + """ + Retrieve the scheduled scan service for a resource type. + + The scheduled scan service manages all scheduled scan tasks for a given resource type. + Use the returned service ID with `list_scheduled_scans`, `enable_scheduled_scans`, + `disable_scheduled_scans`, and `delete_scheduled_scan`. + + :param resource_type_id: ID of the resource type. + :return: Scheduled scan service details. + """ + + return self.britive.get(f'{self.scheduled_scan_base_url}/resource-types/{resource_type_id}') + + def create_scheduled_scan(self, resource_type_id: str, name: str, description: str = '', + frequency: str = 'Daily', frequency_interval: int = None, + start_time: str = '12:00', resource_labels: list = None) -> dict: + """ + Create a new scheduled scan task for a resource type. + + :param resource_type_id: ID of the resource type. + :param name: Name of the scheduled scan. + :param description: Optional description for the scheduled scan. + :param frequency: Frequency of the scan. One of 'Daily', 'Weekly', or 'Monthly'. Defaults to 'Daily'. + :param frequency_interval: Required for Weekly (0=Sunday..6=Saturday) and Monthly (1-31 day of month). + Not used for Daily. + :param start_time: Time of day to run the scan in HH:MM format in UTC. Defaults to '12:00'. + :param resource_labels: Optional list of resource label dicts to filter which resources to scan. + Example: [{'key': 'env', 'label-values': ['prod', 'staging']}] + :return: Details of the created scheduled scan task. + """ + + task = { + 'name': name, + 'description': description, + 'properties': {}, + 'frequencyType': frequency, + 'frequencyInterval': frequency_interval, + 'startTime': start_time, + } + if resource_labels: + task['properties']['resourceLabels'] = resource_labels + params = { + 'taskService': { + 'name': 'ResourceScanner', + 'enabled': False, + 'queueId': 'resourceScannerQueue', + }, + 'task': task, + } + return self.britive.post(f'{self.scheduled_scan_base_url}/resource-types/{resource_type_id}', json=params) + + def list_scheduled_scans(self, service_id: str) -> list: + """ + List all scheduled scan tasks for a scheduled scan service. + + :param service_id: ID of the scheduled scan service (from `get_scheduled_scan_service`). + :return: List of scheduled scan tasks. + """ + + return self.britive.get(f'{self.scheduled_scan_base_url}/{service_id}/tasks') + + def enable_scheduled_scans(self, service_id: str) -> dict: + """ + Enable scheduled scans for a scheduled scan service. + + :param service_id: ID of the scheduled scan service (from `get_scheduled_scan_service`). + :return: Details of the enabled service. + """ + + return self.britive.post(f'{self.scheduled_scan_base_url}/{service_id}/enabled-statuses') + + def disable_scheduled_scans(self, service_id: str) -> dict: + """ + Disable scheduled scans for a scheduled scan service. + + :param service_id: ID of the scheduled scan service (from `get_scheduled_scan_service`). + :return: Details of the disabled service. + """ + + return self.britive.post(f'{self.scheduled_scan_base_url}/{service_id}/disabled-statuses') + + def delete_scheduled_scan(self, service_id: str, task_id: str) -> None: + """ + Delete a scheduled scan task. + + :param service_id: ID of the scheduled scan service (from `get_scheduled_scan_service`). + :param task_id: ID of the scheduled scan task to delete. + :return: None + """ + + return self.britive.delete(f'{self.scheduled_scan_base_url}/{service_id}/tasks/{task_id}') diff --git a/src/britive/federation_providers/__init__.py b/src/britive/federation_providers/__init__.py index ba95794..04129f0 100644 --- a/src/britive/federation_providers/__init__.py +++ b/src/britive/federation_providers/__init__.py @@ -1,4 +1,5 @@ from .aws import AwsFederationProvider +from .aws_sts_jwt import AwsStsJwtFederationProvider from .azure_system_assigned_managed_identity import AzureSystemAssignedManagedIdentityFederationProvider from .azure_user_assigned_managed_identity import AzureUserAssignedManagedIdentityFederationProvider from .bitbucket import BitbucketFederationProvider @@ -12,6 +13,7 @@ class FederationProviders: def __init__(self, britive) -> None: self.aws = AwsFederationProvider(britive) + self.aws_sts_jwt = AwsStsJwtFederationProvider(britive) self.azure_system_assigned_managed_identity = AzureSystemAssignedManagedIdentityFederationProvider(britive) self.azure_user_assigned_managed_identity = AzureUserAssignedManagedIdentityFederationProvider(britive) self.bitbucket = BitbucketFederationProvider(britive) diff --git a/src/britive/federation_providers/aws_sts_jwt.py b/src/britive/federation_providers/aws_sts_jwt.py new file mode 100644 index 0000000..412e514 --- /dev/null +++ b/src/britive/federation_providers/aws_sts_jwt.py @@ -0,0 +1,48 @@ +from .federation_provider import FederationProvider + + +class AwsStsJwtFederationProvider(FederationProvider): + """Federation provider that obtains an OIDC JWT from AWS STS via GetWebIdentityToken. + + :param profile: AWS profile name to use for the boto3 session. + :param audience: Audience claim for the JWT. Defaults to 'britive'. + :param duration_seconds: Token validity in seconds (clamped to 60-3600). Defaults to 300. + :param signing_algorithm: JWT signing algorithm. Defaults to 'ES384'. + """ + + def __init__( + self, + profile: str = None, + audience: str = None, + duration_seconds: int = 300, + signing_algorithm: str = 'ES384', + ) -> None: + self.profile = profile + self.audience = audience or 'britive' + self.duration_seconds = max(60, min(3600, duration_seconds)) + self.signing_algorithm = signing_algorithm + super().__init__() + + def get_token(self) -> str: + try: + import boto3 + import botocore.exceptions as botoexceptions + except ImportError as e: + raise Exception( + 'boto3 required - please install boto3 package to use the aws-sts-jwt federation provider' + ) from e + + try: + session = boto3.Session(profile_name=self.profile) + except botoexceptions.ProfileNotFound as e: + raise Exception(f'Error: {e!s}') from e + + sts_client = session.client('sts') + + response = sts_client.get_web_identity_token( + Audience=[self.audience], + DurationSeconds=self.duration_seconds, + SigningAlgorithm=self.signing_algorithm, + ) + + return f'OIDC::{response["WebIdentityToken"]}' diff --git a/src/britive/helpers/utils.py b/src/britive/helpers/utils.py index be9cbce..1346dfa 100644 --- a/src/britive/helpers/utils.py +++ b/src/britive/helpers/utils.py @@ -10,6 +10,7 @@ from britive.exceptions.unauthorized import InvalidTenantError, unauthorized_code_map from britive.federation_providers import ( AwsFederationProvider, + AwsStsJwtFederationProvider, AzureSystemAssignedManagedIdentityFederationProvider, AzureUserAssignedManagedIdentityFederationProvider, BitbucketFederationProvider, @@ -102,9 +103,10 @@ def source_federation_token(provider: str, tenant: Optional[str] = None, duratio sourced outside of this SDK and provided as input via the standard token presentation options. - Six federation providers are currently supported by this method. + The following federation providers are currently supported by this method. * AWS IAM/STS, with optional profile specified - (aws) + * AWS STS JWT via GetWebIdentityToken - (awsstsjwt) * Azure System Assigned Managed Identities (azuresmi) * Azure User Assigned Managed Identities (azureumi) * Bitbucket Pipelines (bitbucket) @@ -116,14 +118,21 @@ def source_federation_token(provider: str, tenant: Optional[str] = None, duratio Any other OIDC federation provider can be used and tokens can be provided to this class for authentication to a Britive tenant. Details of how to construct these tokens can be found at https://docs.britive.com. - :param provider: The name of the federation provider. Valid options are `aws`, `azuresmi`, `azureumi`, `bitbucket`, - `gcp`, `github`, `gitlab`, and `spacelift`. + :param provider: The name of the federation provider. Valid options are `aws`, `awsstsjwt`, `azuresmi`, `azureumi`, + `bitbucket`, `gcp`, `github`, `gitlab`, and `spacelift`. For the AWS provider it is possible to provide a profile via value `aws-profile`. If no profile is provided then the boto3 `Session.get_credentials()` method will be used to obtain AWS credentials, which follows the order provided here: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials + For the AWS STS JWT provider (awsstsjwt) it is possible to provide optional parameters via pipe-delimited + values: `awsstsjwt-|||`. All parameters are optional. + Use an empty string for profile to skip it (e.g. `awsstsjwt-|myaudience`). Defaults: audience=`britive`, + signing_algorithm=`ES384`, duration_seconds=`300`. Valid signing algorithms are `ES384` and `RS256`. + Duration must be between 60 and 3600 seconds. This provider requires the AWS account to have IAM outbound + identity federation enabled. + For Azure User Assigned Managed Identities (azureumi) a client id is required. It must be provided in the form `azureumi-`. From the Azure documentation...a user-assigned identity's client ID or, when using Pod Identity, the client ID of an Azure AD app registration. This argument @@ -168,6 +177,16 @@ def source_federation_token(provider: str, tenant: Optional[str] = None, duratio if provider_name in federation_providers: return federation_providers[provider_name]() + if provider_name == 'awsstsjwt': + parts = helper[1].split('|') if len(helper) > 1 else [] + profile = safe_list_get(parts, 0) or None + audience = safe_list_get(parts, 1) or None + signing_algorithm = safe_list_get(parts, 2) or 'ES384' + duration = int(safe_list_get(parts, 3) or 300) + return AwsStsJwtFederationProvider( + profile=profile, audience=audience, duration_seconds=duration, signing_algorithm=signing_algorithm + ).get_token() + if provider_name == 'azuresmi': return AzureSystemAssignedManagedIdentityFederationProvider(audience=safe_list_get(helper, 1)).get_token() diff --git a/src/britive/secrets_manager/secrets.py b/src/britive/secrets_manager/secrets.py index c6e3a16..08f668f 100644 --- a/src/britive/secrets_manager/secrets.py +++ b/src/britive/secrets_manager/secrets.py @@ -164,3 +164,86 @@ def access(self, vault_id: str, path: str, get_metadata: bool = False) -> dict: params = {'getmetadata': get_metadata} return self.britive.get(f'{self.base_url}/{vault_id}/secrets?path={path}', params=params) + + def metadata(self, vault_id: str, path: str) -> dict: + """ + Retrieve metadata for a secret, including rotation configuration. + + :param vault_id: ID of the vault. + :param path: path of the secret, include the / at the beginning. + :return: Secret metadata including rotation interval, last/next rotation timestamps. + """ + + return self.britive.get(f'{self.base_url}/{vault_id}/secret-metadata?path={path}') + + def rotation_details(self, vault_id: str, path: str) -> dict: + """ + Retrieve admin-level secret details including rotation targets and configuration. + + :param vault_id: ID of the vault. + :param path: path of the secret, include the / at the beginning. + :return: Secret rotation details including resource, account, and rotation template mappings. + """ + + return self.britive.get(f'{self.base_url}/{vault_id}/admin/accesssecrets?path={path}') + + def update_rotation(self, vault_id: str, path: str, rotation_config: dict = None, **kwargs) -> None: + """ + Update rotation configuration for a secret. + + The rotation config is sent as part of the secret PATCH body. This can include + resource/account mapping, rotation template, notification settings, etc. + + :param vault_id: ID of the vault. + :param path: path of the secret, include the / at the beginning. + :param rotation_config: dict of rotation configuration fields to set on the secret. + :param kwargs: additional fields to include in the PATCH body. + :return: None + """ + + params = {} + if rotation_config: + params.update(rotation_config) + params.update(kwargs) + return self.britive.patch(f'{self.base_url}/{vault_id}/secrets?path={path}', json=params) + + def rotate(self, vault_id: str, path: str, value: dict = None, sync_to_target: bool = True) -> dict: + """ + Trigger rotation for a secret (update password and sync to target). + + Updates the secret value and optionally syncs the new value to the mapped target resource. + Requires that the secret has a resource and account mapped in its rotation details. + + :param vault_id: ID of the vault. + :param path: path of the secret, include the / at the beginning. + :param value: new secret value dict (e.g. {'Password': 'newpass'}). If None, only sync is triggered. + :param sync_to_target: whether to sync the updated value to the target resource. Defaults to True. + :return: Details of the rotation operation. + """ + + params = {'syncToTarget': sync_to_target} + if value: + params['value'] = value + return self.britive.patch(f'{self.base_url}/{vault_id}/secrets?path={path}', json=params) + + def rotation_history(self, vault_id: str, secret_id: str) -> list: + """ + Retrieve rotation history for a secret. + + :param vault_id: ID of the vault. + :param secret_id: ID of the secret. + :return: List of rotation history entries with date, status, executed by, and type. + """ + + return self.britive.get(f'{self.base_url}/{vault_id}/secrets/{secret_id}/rotate/history') + + def versions(self, vault_id: str, secret_id: str) -> list: + """ + List all versions of a secret. + + :param vault_id: ID of the vault. + :param secret_id: ID of the secret. + :return: List of secret versions with version number, creation date, and created by. + """ + + return self.britive.get(f'{self.base_url}/{vault_id}/secrets/{secret_id}/versions')