From 5a1720b1f4e7d75ff13608bae9b1683821a1af09 Mon Sep 17 00:00:00 2001 From: Bartosz Blizniak Date: Thu, 30 Apr 2026 15:44:41 +0100 Subject: [PATCH 1/5] add metadata API client and client test --- cloudsmith_cli/core/api/metadata.py | 247 +++++++++++++ cloudsmith_cli/core/tests/test_metadata.py | 397 +++++++++++++++++++++ 2 files changed, 644 insertions(+) create mode 100644 cloudsmith_cli/core/api/metadata.py create mode 100644 cloudsmith_cli/core/tests/test_metadata.py diff --git a/cloudsmith_cli/core/api/metadata.py b/cloudsmith_cli/core/api/metadata.py new file mode 100644 index 00000000..ce884b69 --- /dev/null +++ b/cloudsmith_cli/core/api/metadata.py @@ -0,0 +1,247 @@ +"""API - Package metadata (v2) endpoints.""" + +import json +from typing import Any, Optional, Union + +import cloudsmith_api + +from .. import ratelimits, utils +from ..pagination import PageInfo +from ..rest import RestClient +from .exceptions import catch_raise_api_exception + +SOURCE_KIND_VALUES = { + "unknown": 0, + "system": 1, + "ecosystem": 2, + "customer": 3, + "third_party": 4, +} + +CLASSIFICATION_VALUES = { + "unknown": 0, + "intrinsic": 1, + "upstream": 2, + "security": 3, + "provenance": 4, + "sbom": 5, + "generic": 6, +} + + +def _normalise_enum(value, mapping, name): + if value is None: + return None + if isinstance(value, bool): + raise ValueError(f"Invalid {name} value: {value!r}") + if isinstance(value, int): + return value + if isinstance(value, str): + text = value.strip() + if not text: + raise ValueError(f"Invalid {name} value: {value!r}") + try: + return int(text) + except ValueError: + pass + key = text.lower().replace("-", "_") + try: + return mapping[key] + except KeyError: + valid = ", ".join(sorted(mapping)) + raise ValueError( + f"Invalid {name} {value!r}. Expected an integer or one of: {valid}." + ) + raise ValueError(f"Invalid {name} type: {type(value).__name__}") + + +def normalise_source_kind(value): + """Coerce a MetadataSourceKind name or integer to its integer value.""" + return _normalise_enum(value, SOURCE_KIND_VALUES, "source_kind") + + +def normalise_classification(value): + """Coerce a MetadataClassification name or integer to its integer value.""" + return _normalise_enum(value, CLASSIFICATION_VALUES, "classification") + + +class _MetadataApi: + """Small client for metadata endpoints not yet present in cloudsmith_api.""" + + def __init__(self): + self.config = cloudsmith_api.Configuration() + self.rest_client = RestClient( + error_retry_cb=getattr(self.config, "error_retry_cb", None), + respect_retry_after_header=getattr(self.config, "rate_limit", True), + ) + + +def get_metadata_api(): + """Get the metadata API client.""" + return _MetadataApi() + + +def _build_url(config, *parts): + host = (config.host or "").rstrip("/") + suffix = "/".join(p.strip("/") for p in parts if p) + return f"{host}/v2/{suffix}/" + + +def _build_headers(config): + """Build request headers from the configured cloudsmith_api.Configuration. + + Mirrors the auth resolution performed by core/api/init.py: an Authorization + header (SSO bearer or Basic) takes precedence; otherwise we fall back to + the X-Api-Key header. + """ + headers = {"Accept": "application/json", "Content-Type": "application/json"} + + user_agent = getattr(config, "user_agent", None) + if user_agent: + headers["User-Agent"] = user_agent + + extra = getattr(config, "headers", None) or {} + auth_header = extra.get("Authorization") + if auth_header: + headers["Authorization"] = auth_header + else: + api_key = (config.api_key or {}).get("X-Api-Key") + if api_key: + headers["X-Api-Key"] = api_key + + return headers + + +def _request(client, method, *path_parts, query_params=None, body=None): + url = _build_url(client.config, *path_parts) + + with catch_raise_api_exception(): + response = client.rest_client.request( + method, + url, + query_params=query_params, + headers=_build_headers(client.config), + body=body, + ) + + ratelimits.maybe_rate_limit(client, response.getheaders()) + return response + + +def _response_json(response): + if not response.data: + return {} + return json.loads(response.data) + + +def list_metadata( + package_slug_perm: str, + *, + source_kind: Optional[Union[int, str]] = None, + classification: Optional[Union[int, str]] = None, + page: Optional[int] = None, + page_size: Optional[int] = None, +): + """List metadata entries attached to a package. + + `source_kind` and `classification` may be supplied as either an integer + or the matching enum name (case-insensitive); both are converted to the + integer the v2 API expects before the request is issued. + + Returns a (results, PageInfo) tuple. + """ + client = get_metadata_api() + api_kwargs = {} + + source_kind_value = normalise_source_kind(source_kind) + if source_kind_value is not None: + api_kwargs["source_kind"] = source_kind_value + + classification_value = normalise_classification(classification) + if classification_value is not None: + api_kwargs["classification"] = classification_value + + api_kwargs.update(utils.get_page_kwargs(page=page, page_size=page_size)) + + response = _request( + client, + "GET", + "packages", + package_slug_perm, + "metadata", + query_params=api_kwargs or None, + ) + + payload = _response_json(response) + results = payload.get("results", payload) if isinstance(payload, dict) else payload + page_info = PageInfo.from_headers(response.getheaders()) + return results, page_info + + +def create_metadata( + package_slug_perm: str, + *, + content: Any, + content_type: str, + source_identity: str, +): + """Attach a new metadata entry to a package.""" + client = get_metadata_api() + body = { + "content": content, + "content_type": content_type, + "source_identity": source_identity, + } + response = _request( + client, "POST", "packages", package_slug_perm, "metadata", body=body + ) + return _response_json(response) + + +def update_metadata( + package_slug_perm: str, + metadata_slug_perm: str, + *, + content: Any = None, + source_identity: Optional[str] = None, +): + """Patch an existing customer-owned metadata entry. + + Only `content` and `source_identity` are mutable; the v2 API rejects + attempts to change `content_type`. Fields left as None are omitted from + the patch body so existing values are preserved. + """ + client = get_metadata_api() + body = {} + if content is not None: + body["content"] = content + if source_identity is not None: + body["source_identity"] = source_identity + if not body: + raise ValueError( + "update_metadata requires at least one of content or source_identity" + ) + + response = _request( + client, + "PATCH", + "packages", + package_slug_perm, + "metadata", + metadata_slug_perm, + body=body, + ) + return _response_json(response) + + +def delete_metadata(package_slug_perm: str, metadata_slug_perm: str): + """Remove a customer-owned metadata entry from a package.""" + client = get_metadata_api() + _request( + client, + "DELETE", + "packages", + package_slug_perm, + "metadata", + metadata_slug_perm, + ) diff --git a/cloudsmith_cli/core/tests/test_metadata.py b/cloudsmith_cli/core/tests/test_metadata.py new file mode 100644 index 00000000..e35c159d --- /dev/null +++ b/cloudsmith_cli/core/tests/test_metadata.py @@ -0,0 +1,397 @@ +"""Tests for the v2 package metadata API client.""" + +import json + +import httpretty +import httpretty.core +import pytest + +from .. import keyring +from ..api import metadata +from ..api.exceptions import ApiException +from ..api.init import initialise_api + +API_HOST = "https://api.cloudsmith.io" +PKG = "pkg-slug" +META = "meta-slug" +LIST_URL = f"{API_HOST}/v2/packages/{PKG}/metadata/" +DETAIL_URL = f"{API_HOST}/v2/packages/{PKG}/metadata/{META}/" + + +@pytest.fixture(autouse=True) +def _setup_api(monkeypatch): + """Initialise the SDK Configuration and stub keyring lookups. + + initialise_api() registers custom retry attributes on cloudsmith_api.Configuration + that create_requests_session expects, and the metadata module reads host/auth + off the same Configuration singleton. Keyring is stubbed so we never touch the + user's real SSO tokens during a test run. + """ + monkeypatch.setattr(keyring, "get_access_token", lambda host: None) + monkeypatch.setattr(keyring, "get_refresh_token", lambda host: None) + monkeypatch.setattr(keyring, "should_refresh_access_token", lambda host: False) + monkeypatch.setattr( + httpretty.core.fakesock.socket, + "shutdown", + lambda self, how: None, + raising=False, + ) + initialise_api(host=API_HOST, key="test-api-key") + + +def _last_request(): + return httpretty.last_request() + + +class TestNormalisers: + @pytest.mark.parametrize( + "value, expected", + [ + (None, None), + (3, 3), + ("3", 3), + ("customer", 3), + ("CUSTOMER", 3), + ("Third-Party", 4), + ("third_party", 4), + ], + ) + def test_source_kind(self, value, expected): + assert metadata.normalise_source_kind(value) == expected + + @pytest.mark.parametrize( + "value, expected", + [ + (None, None), + (6, 6), + ("generic", 6), + ("GENERIC", 6), + ("PROVENANCE", 4), + ], + ) + def test_classification(self, value, expected): + assert metadata.normalise_classification(value) == expected + + def test_invalid_source_kind_name(self): + with pytest.raises(ValueError, match="Invalid source_kind"): + metadata.normalise_source_kind("not-a-kind") + + def test_invalid_classification_name(self): + with pytest.raises(ValueError, match="Invalid classification"): + metadata.normalise_classification("nope") + + def test_invalid_type(self): + with pytest.raises(ValueError): + metadata.normalise_source_kind(3.14) + + def test_bool_rejected(self): + with pytest.raises(ValueError): + metadata.normalise_source_kind(True) + + +class TestListMetadata: + @httpretty.activate(allow_net_connect=False) + def test_success_returns_results_and_page_info(self): + body = {"results": [{"slug_perm": "abc", "content_type": "application/json"}]} + httpretty.register_uri( + httpretty.GET, + LIST_URL, + body=json.dumps(body), + status=200, + content_type="application/json", + adding_headers={ + "X-Pagination-Count": "1", + "X-Pagination-Page": "1", + "X-Pagination-PageSize": "30", + "X-Pagination-PageTotal": "1", + }, + ) + + results, page_info = metadata.list_metadata(PKG) + + assert results == body["results"] + assert page_info.is_valid + assert page_info.count == 1 + + sent = _last_request() + assert sent.headers.get("X-Api-Key") == "test-api-key" + assert sent.headers.get("Accept") == "application/json" + + @httpretty.activate(allow_net_connect=False) + def test_filters_normalised_to_integers(self): + httpretty.register_uri( + httpretty.GET, + LIST_URL, + body=json.dumps({"results": []}), + status=200, + content_type="application/json", + ) + + metadata.list_metadata( + PKG, source_kind="customer", classification="GENERIC", page=2, page_size=50 + ) + + qs = _last_request().querystring # pylint: disable=no-member + assert qs["source_kind"] == ["3"] + assert qs["classification"] == ["6"] + assert qs["page"] == ["2"] + assert qs["page_size"] == ["50"] + + @httpretty.activate(allow_net_connect=False) + def test_non_positive_page_options_omitted(self): + httpretty.register_uri( + httpretty.GET, + LIST_URL, + body=json.dumps({"results": []}), + status=200, + content_type="application/json", + ) + + metadata.list_metadata(PKG, page=0, page_size=0) + + qs = _last_request().querystring # pylint: disable=no-member + assert "page" not in qs + assert "page_size" not in qs + + @httpretty.activate(allow_net_connect=False) + def test_404_raises_api_exception(self): + httpretty.register_uri( + httpretty.GET, + LIST_URL, + body=json.dumps({"detail": "Not found."}), + status=404, + content_type="application/json", + ) + + with pytest.raises(ApiException) as exc_info: + metadata.list_metadata(PKG) + + assert exc_info.value.status == 404 + assert exc_info.value.detail == "Not found." + + @httpretty.activate(allow_net_connect=False) + def test_422_raises_with_fields(self): + body = { + "detail": "Invalid query parameters.", + "fields": {"source_kind": ["Not a valid choice."]}, + } + httpretty.register_uri( + httpretty.GET, + LIST_URL, + body=json.dumps(body), + status=422, + content_type="application/json", + ) + + with pytest.raises(ApiException) as exc_info: + metadata.list_metadata(PKG, source_kind=3) + + assert exc_info.value.status == 422 + assert exc_info.value.fields == {"source_kind": ["Not a valid choice."]} + + +class TestCreateMetadata: + @httpretty.activate(allow_net_connect=False) + def test_success_posts_required_fields(self): + created = { + "slug_perm": "new-slug", + "content_type": "application/json", + "source_identity": "cloudsmith-cli@1.16.0", + } + httpretty.register_uri( + httpretty.POST, + LIST_URL, + body=json.dumps(created), + status=201, + content_type="application/json", + ) + + result = metadata.create_metadata( + PKG, + content={"foo": "bar"}, + content_type="application/json", + source_identity="cloudsmith-cli@1.16.0", + ) + + assert result == created + sent_body = json.loads(_last_request().body) + assert sent_body == { + "content": {"foo": "bar"}, + "content_type": "application/json", + "source_identity": "cloudsmith-cli@1.16.0", + } + + @httpretty.activate(allow_net_connect=False) + def test_404_when_package_unknown(self): + httpretty.register_uri( + httpretty.POST, + LIST_URL, + body=json.dumps({"detail": "Not found."}), + status=404, + content_type="application/json", + ) + + with pytest.raises(ApiException) as exc_info: + metadata.create_metadata( + PKG, + content={"x": 1}, + content_type="application/json", + source_identity="customer:test", + ) + assert exc_info.value.status == 404 + + @httpretty.activate(allow_net_connect=False) + def test_422_carries_field_errors(self): + body = { + "detail": "Validation failed.", + "fields": {"content": ["Content must be a JSON object."]}, + } + httpretty.register_uri( + httpretty.POST, + LIST_URL, + body=json.dumps(body), + status=422, + content_type="application/json", + ) + + with pytest.raises(ApiException) as exc_info: + metadata.create_metadata( + PKG, + content="not-an-object", + content_type="application/json", + source_identity="customer:test", + ) + + assert exc_info.value.status == 422 + assert exc_info.value.detail == "Validation failed." + assert exc_info.value.fields == {"content": ["Content must be a JSON object."]} + + +class TestUpdateMetadata: + @httpretty.activate(allow_net_connect=False) + def test_success_sends_only_provided_fields(self): + updated = {"slug_perm": META, "source_identity": "customer:new"} + httpretty.register_uri( + httpretty.PATCH, + DETAIL_URL, + body=json.dumps(updated), + status=200, + content_type="application/json", + ) + + result = metadata.update_metadata(PKG, META, source_identity="customer:new") + + assert result == updated + assert json.loads(_last_request().body) == {"source_identity": "customer:new"} + + @httpretty.activate(allow_net_connect=False) + def test_rejects_empty_patch(self): + with pytest.raises(ValueError): + metadata.update_metadata(PKG, META) + + @httpretty.activate(allow_net_connect=False) + def test_404_on_unknown_metadata(self): + httpretty.register_uri( + httpretty.PATCH, + DETAIL_URL, + body=json.dumps({"detail": "Not found."}), + status=404, + content_type="application/json", + ) + + with pytest.raises(ApiException) as exc_info: + metadata.update_metadata(PKG, META, content={"k": "v"}) + assert exc_info.value.status == 404 + + @httpretty.activate(allow_net_connect=False) + def test_422_when_changing_content_type(self): + body = { + "detail": "Validation failed.", + "fields": { + "content_type": "content_type cannot be changed after creation." + }, + } + httpretty.register_uri( + httpretty.PATCH, + DETAIL_URL, + body=json.dumps(body), + status=422, + content_type="application/json", + ) + + with pytest.raises(ApiException) as exc_info: + metadata.update_metadata(PKG, META, content={"k": "v"}) + + assert exc_info.value.status == 422 + assert exc_info.value.fields == { + "content_type": "content_type cannot be changed after creation." + } + + +class TestDeleteMetadata: + @httpretty.activate(allow_net_connect=False) + def test_success_returns_none(self): + httpretty.register_uri(httpretty.DELETE, DETAIL_URL, status=204) + + assert metadata.delete_metadata(PKG, META) is None + assert _last_request().method == "DELETE" + + @httpretty.activate(allow_net_connect=False) + def test_404_raises(self): + httpretty.register_uri( + httpretty.DELETE, + DETAIL_URL, + body=json.dumps({"detail": "Not found."}), + status=404, + content_type="application/json", + ) + + with pytest.raises(ApiException) as exc_info: + metadata.delete_metadata(PKG, META) + assert exc_info.value.status == 404 + + @httpretty.activate(allow_net_connect=False) + def test_422_raises(self): + body = { + "detail": "Cannot delete.", + "fields": {"non_field_errors": ["Metadata is read-only."]}, + } + httpretty.register_uri( + httpretty.DELETE, + DETAIL_URL, + body=json.dumps(body), + status=422, + content_type="application/json", + ) + + with pytest.raises(ApiException) as exc_info: + metadata.delete_metadata(PKG, META) + + assert exc_info.value.status == 422 + assert exc_info.value.fields == {"non_field_errors": ["Metadata is read-only."]} + + +class TestAuthHeaders: + @httpretty.activate(allow_net_connect=False) + def test_sso_authorization_header_takes_precedence(self): + # initialise_api with no key, then mutate Configuration to mimic post-SSO state + import cloudsmith_api + + cfg = cloudsmith_api.Configuration() + cfg.api_key = {} + cfg.headers = {"Authorization": "Bearer sso-token"} + cloudsmith_api.Configuration.set_default(cfg) + + httpretty.register_uri( + httpretty.GET, + LIST_URL, + body=json.dumps({"results": []}), + status=200, + content_type="application/json", + ) + + metadata.list_metadata(PKG) + + sent = _last_request() + assert sent.headers.get("Authorization") == "Bearer sso-token" + assert sent.headers.get("X-Api-Key") is None From 76bc280a0221a7e78a0560f7fb8e8adf650ad507 Mon Sep 17 00:00:00 2001 From: Bartosz Blizniak Date: Thu, 30 Apr 2026 16:14:19 +0100 Subject: [PATCH 2/5] copilot feedback --- cloudsmith_cli/core/api/metadata.py | 7 ++-- cloudsmith_cli/core/tests/test_metadata.py | 41 +++++++++++++++++----- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/cloudsmith_cli/core/api/metadata.py b/cloudsmith_cli/core/api/metadata.py index ce884b69..3e06b73d 100644 --- a/cloudsmith_cli/core/api/metadata.py +++ b/cloudsmith_cli/core/api/metadata.py @@ -95,15 +95,14 @@ def _build_headers(config): the X-Api-Key header. """ headers = {"Accept": "application/json", "Content-Type": "application/json"} + headers.update(getattr(config, "headers", None) or {}) user_agent = getattr(config, "user_agent", None) if user_agent: headers["User-Agent"] = user_agent - extra = getattr(config, "headers", None) or {} - auth_header = extra.get("Authorization") - if auth_header: - headers["Authorization"] = auth_header + if headers.get("Authorization"): + headers.pop("X-Api-Key", None) else: api_key = (config.api_key or {}).get("X-Api-Key") if api_key: diff --git a/cloudsmith_cli/core/tests/test_metadata.py b/cloudsmith_cli/core/tests/test_metadata.py index e35c159d..479c1a86 100644 --- a/cloudsmith_cli/core/tests/test_metadata.py +++ b/cloudsmith_cli/core/tests/test_metadata.py @@ -2,6 +2,7 @@ import json +import cloudsmith_api import httpretty import httpretty.core import pytest @@ -372,16 +373,20 @@ def test_422_raises(self): class TestAuthHeaders: - @httpretty.activate(allow_net_connect=False) - def test_sso_authorization_header_takes_precedence(self): - # initialise_api with no key, then mutate Configuration to mimic post-SSO state - import cloudsmith_api - + @staticmethod + def _override_config(monkeypatch, *, api_key=None, headers=None): cfg = cloudsmith_api.Configuration() - cfg.api_key = {} - cfg.headers = {"Authorization": "Bearer sso-token"} - cloudsmith_api.Configuration.set_default(cfg) + cfg.api_key = api_key if api_key is not None else cfg.api_key + cfg.headers = headers if headers is not None else cfg.headers + monkeypatch.setattr(cloudsmith_api.Configuration, "_default", cfg) + @httpretty.activate(allow_net_connect=False) + def test_sso_authorization_header_takes_precedence(self, monkeypatch): + self._override_config( + monkeypatch, + api_key={"X-Api-Key": "test-api-key"}, + headers={"Authorization": "Bearer sso-token"}, + ) httpretty.register_uri( httpretty.GET, LIST_URL, @@ -395,3 +400,23 @@ def test_sso_authorization_header_takes_precedence(self): sent = _last_request() assert sent.headers.get("Authorization") == "Bearer sso-token" assert sent.headers.get("X-Api-Key") is None + + @httpretty.activate(allow_net_connect=False) + def test_extra_config_headers_are_preserved(self, monkeypatch): + self._override_config( + monkeypatch, + headers={"X-Custom-Header": "custom-value"}, + ) + httpretty.register_uri( + httpretty.GET, + LIST_URL, + body=json.dumps({"results": []}), + status=200, + content_type="application/json", + ) + + metadata.list_metadata(PKG) + + sent = _last_request() + assert sent.headers.get("X-Custom-Header") == "custom-value" + assert sent.headers.get("X-Api-Key") == "test-api-key" From c77930a0b955008673c22d528b5cdd7c06d3e1d8 Mon Sep 17 00:00:00 2001 From: Bartosz Blizniak Date: Fri, 1 May 2026 12:39:51 +0100 Subject: [PATCH 3/5] Add metadata CRUD commands to the CLI --- cloudsmith_cli/cli/commands/__init__.py | 1 + cloudsmith_cli/cli/commands/metadata.py | 533 ++++++++++++++ .../cli/tests/commands/test_metadata.py | 650 ++++++++++++++++++ cloudsmith_cli/core/api/packages.py | 18 + 4 files changed, 1202 insertions(+) create mode 100644 cloudsmith_cli/cli/commands/metadata.py create mode 100644 cloudsmith_cli/cli/tests/commands/test_metadata.py diff --git a/cloudsmith_cli/cli/commands/__init__.py b/cloudsmith_cli/cli/commands/__init__.py index af10c90e..ed80eac5 100644 --- a/cloudsmith_cli/cli/commands/__init__.py +++ b/cloudsmith_cli/cli/commands/__init__.py @@ -14,6 +14,7 @@ login, logout, mcp, + metadata, metrics, move, policy, diff --git a/cloudsmith_cli/cli/commands/metadata.py b/cloudsmith_cli/cli/commands/metadata.py new file mode 100644 index 00000000..87f7b385 --- /dev/null +++ b/cloudsmith_cli/cli/commands/metadata.py @@ -0,0 +1,533 @@ +"""CLI/Commands - Manage metadata attached to packages.""" + +import json + +import click + +from ...core.api.metadata import ( + create_metadata as api_create_metadata, + delete_metadata as api_delete_metadata, + get_metadata as api_get_metadata, + list_metadata as api_list_metadata, + normalise_classification, + normalise_source_kind, + update_metadata as api_update_metadata, +) +from ...core.api.packages import get_package_slug_perm as api_get_package_slug_perm +from ...core.pagination import paginate_results +from ...core.version import get_version as get_cli_version +from .. import command, decorators, utils, validators +from ..exceptions import handle_api_exceptions +from ..utils import maybe_spinner +from .main import main + +_METADATA_HEADERS = [ + "Slug", + "Content Type", + "Classification", + "Source Kind", + "Source Identity", +] + + +def _default_source_identity(): + """Return the default value for --source-identity.""" + return f"cloudsmith-cli@{get_cli_version()}" + + +def _format_metadata_row(entry): + return [ + click.style(entry.get("slug_perm") or "", fg="cyan"), + click.style(entry.get("content_type") or "", fg="yellow"), + click.style(str(entry.get("classification", "")), fg="magenta"), + click.style(str(entry.get("source_kind", "")), fg="blue"), + click.style(entry.get("source_identity") or "", fg="green"), + ] + + +def _echo_action(message, use_stderr): + """Print an in-progress status message.""" + click.echo(message, nl=False, err=use_stderr) + + +def _print_metadata_table(opts, entries, page_info=None, page_all=False): + """Print a list of metadata entries as a table or JSON.""" + if utils.maybe_print_as_json(opts, list(entries), page_info=page_info): + return + + rows = [ + _format_metadata_row(e) + for e in sorted(entries, key=lambda e: e.get("slug_perm") or "") + ] + + if rows: + click.echo() + utils.pretty_print_table(_METADATA_HEADERS, rows) + + click.echo() + + num_results = len(rows) + list_suffix = "metadata entr%s" % ("ies" if num_results != 1 else "y") + utils.pretty_print_list_info( + num_results=num_results, + page_info=None if page_all else page_info, + suffix=f"{list_suffix} retrieved" if page_all else f"{list_suffix} visible", + page_all=page_all, + ) + + +def _print_metadata_entry(opts, entry): + """Print a single metadata entry as a table + indented JSON content.""" + if utils.maybe_print_as_json(opts, entry): + return + + click.echo() + utils.pretty_print_table(_METADATA_HEADERS, [_format_metadata_row(entry)]) + click.echo() + + content = entry.get("content") + if content is not None: + click.secho("Content:", bold=True) + click.echo(json.dumps(content, indent=2, sort_keys=True)) + + +def _load_content(content_file, inline_content, *, required): + """Resolve --file / --content into a parsed object. + + Enforces the XOR between the two sources. When `required` is True (used + by `add`), at least one source must be provided. When False (used by + `update`), a missing source means "do not change content". + """ + if content_file is not None and inline_content is not None: + raise click.UsageError("--file and --content are mutually exclusive.") + + if content_file is not None: + if content_file == "-": + raw, source = click.get_text_stream("stdin").read(), "stdin" + else: + with open(content_file, encoding="utf-8") as fh: + raw, source = fh.read(), "--file" + elif inline_content is not None: + raw, source = inline_content, "--content" + elif required: + raise click.UsageError("One of --file or --content is required.") + else: + return None + + try: + return json.loads(raw) + except ValueError as exc: + raise click.UsageError(f"Invalid JSON in {source}: {exc}") from exc + + +@main.group(name="metadata", cls=command.AliasGroup) +@decorators.common_cli_config_options +@decorators.common_cli_output_options +@decorators.common_api_auth_options +@decorators.initialise_api +@click.pass_context +def metadata_(ctx, opts): # pylint: disable=unused-argument + """ + Manage metadata attached to packages in a repository. + + See the help for subcommands for more information on each. + """ + + +@metadata_.command(name="list", aliases=["ls"]) +@decorators.common_cli_config_options +@decorators.common_cli_output_options +@decorators.common_cli_list_options +@decorators.common_api_auth_options +@decorators.initialise_api +@click.argument( + "owner_repo_package", + metavar="OWNER/REPO/PACKAGE", + callback=validators.validate_owner_repo_package, +) +@click.argument("metadata_slug_perm", required=False, default=None) +@click.option( + "--source-kind", + "source_kind", + default=None, + help=( + "Filter by metadata source kind. Accepts an integer or a name " + "(e.g. 'customer', 'third_party'). Ignored when METADATA_SLUG_PERM is given." + ), +) +@click.option( + "--classification", + "classification", + default=None, + help=( + "Filter by metadata classification. Accepts an integer or a name " + "(e.g. 'provenance', 'sbom'). Ignored when METADATA_SLUG_PERM is given." + ), +) +@click.pass_context +def list_metadata( + ctx, + opts, + owner_repo_package, + metadata_slug_perm, + page, + page_size, + page_all, + source_kind, + classification, +): + """ + List metadata entries attached to a package. + + OWNER/REPO/PACKAGE: identifies the package whose metadata you want to list. + + METADATA_SLUG_PERM (optional): if given, fetch and display only that single + metadata entry. Pagination and filter flags are ignored in this case. + + \b + Examples: + $ cloudsmith metadata list your-org/awesome-repo/better-pkg + $ cloudsmith metadata list your-org/awesome-repo/better-pkg --classification provenance + $ cloudsmith metadata list your-org/awesome-repo/better-pkg meta-slug-perm + """ + owner, repo, package = owner_repo_package + use_stderr = utils.should_use_stderr(opts) + + if metadata_slug_perm: + _echo_action( + "Fetching metadata entry %(metadata)s for the '%(package)s' package ... " + % { + "metadata": click.style(metadata_slug_perm, bold=True), + "package": click.style(package, bold=True), + }, + use_stderr, + ) + + context_msg = "Failed to fetch metadata for the package!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + slug_perm = api_get_package_slug_perm( + owner=owner, repo=repo, identifier=package + ) + entry = api_get_metadata(slug_perm, metadata_slug_perm) + + click.secho("OK", fg="green", err=use_stderr) + _print_metadata_entry(opts, entry) + return + + # Validate filter values up-front for a friendlier error than what the + # API would return (the normalisers raise ValueError on invalid values). + try: + normalise_source_kind(source_kind) + normalise_classification(classification) + except ValueError as exc: + raise click.UsageError(str(exc)) from exc + + _echo_action( + "Listing metadata for the '%(package)s' package ... " + % {"package": click.style(package, bold=True)}, + use_stderr, + ) + + context_msg = "Failed to list metadata for the package!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + slug_perm = api_get_package_slug_perm( + owner=owner, repo=repo, identifier=package + ) + entries, page_info = paginate_results( + api_list_metadata, + page_all=page_all, + page=page, + page_size=page_size, + package_slug_perm=slug_perm, + source_kind=source_kind, + classification=classification, + ) + + click.secho("OK", fg="green", err=use_stderr) + _print_metadata_table(opts, entries, page_info=page_info, page_all=page_all) + + +@metadata_.command(name="add") +@decorators.common_cli_config_options +@decorators.common_cli_output_options +@decorators.common_api_auth_options +@decorators.initialise_api +@click.argument( + "owner_repo_package", + metavar="OWNER/REPO/PACKAGE", + callback=validators.validate_owner_repo_package, +) +@click.option( + "--content-type", + "content_type", + required=True, + help=( + "The content type of the metadata payload (e.g. 'application/json'). " + "Content type is immutable after creation." + ), +) +@click.option( + "--file", + "content_file", + type=click.Path( + exists=True, + dir_okay=False, + readable=True, + resolve_path=True, + allow_dash=True, + ), + default=None, + help="Path to a JSON file containing the metadata content. Use '-' for stdin.", +) +@click.option( + "--content", + "inline_content", + default=None, + help=("Inline JSON content for the metadata. Mutually exclusive with --file."), +) +@click.option( + "--source-identity", + "source_identity", + default=None, + help=( + "Free-text identifier indicating where this metadata originated. " + "Defaults to 'cloudsmith-cli@'." + ), +) +@click.pass_context +def add_metadata( + ctx, + opts, + owner_repo_package, + content_type, + content_file, + inline_content, + source_identity, +): + """ + Attach a new metadata entry to a package. + + OWNER/REPO/PACKAGE: the package the metadata should be attached to. + + Exactly one of --file or --content must be supplied. + Content type is set on creation and cannot be changed later. + + \b + Examples: + $ cloudsmith metadata add your-org/awesome-repo/better-pkg \\ + --content-type application/json \\ + --content '{"foo": "bar"}' + $ cat payload.json | cloudsmith metadata add your-org/awesome-repo/better-pkg \\ + --content-type application/json \\ + --file - + $ cloudsmith metadata add your-org/awesome-repo/better-pkg \\ + --content-type application/vnd.jfrog.buildinfo+json \\ + --file buildinfo.json + """ + owner, repo, package = owner_repo_package + use_stderr = utils.should_use_stderr(opts) + + content = _load_content(content_file, inline_content, required=True) + source_identity = source_identity or _default_source_identity() + + _echo_action( + "Attaching metadata to the '%(package)s' package ... " + % {"package": click.style(package, bold=True)}, + use_stderr, + ) + + context_msg = "Failed to attach metadata to the package!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + slug_perm = api_get_package_slug_perm( + owner=owner, repo=repo, identifier=package + ) + entry = api_create_metadata( + slug_perm, + content=content, + content_type=content_type, + source_identity=source_identity, + ) + + click.secho("OK", fg="green", err=use_stderr) + _print_metadata_entry(opts, entry) + + +@metadata_.command(name="update") +@decorators.common_cli_config_options +@decorators.common_cli_output_options +@decorators.common_api_auth_options +@decorators.initialise_api +@click.argument( + "owner_repo_package", + metavar="OWNER/REPO/PACKAGE", + callback=validators.validate_owner_repo_package, +) +@click.argument("metadata_slug_perm") +@click.option( + "--file", + "content_file", + type=click.Path( + exists=True, + dir_okay=False, + readable=True, + resolve_path=True, + allow_dash=True, + ), + default=None, + help=( + "Path to a JSON file containing replacement metadata content. " + "Use '-' for stdin." + ), +) +@click.option( + "--content", + "inline_content", + default=None, + help=( + "Inline JSON replacement content for the metadata. Mutually exclusive " + "with --file." + ), +) +@click.option( + "--source-identity", + "source_identity", + default=None, + help="Update the free-text source identity for this metadata entry.", +) +@click.pass_context +def update_metadata( + ctx, + opts, + owner_repo_package, + metadata_slug_perm, + content_file, + inline_content, + source_identity, +): + """ + Patch an existing metadata entry on a package. + + OWNER/REPO/PACKAGE: the package the metadata is attached to. + METADATA_SLUG_PERM: the permanent slug of the metadata entry to update. + + Content type cannot be changed after creation. + + \b + Examples: + $ cloudsmith metadata update your-org/awesome-repo/better-pkg meta-slug \\ + --content '{"foo": "baz"}' + $ cat payload.json | cloudsmith metadata update your-org/awesome-repo/better-pkg meta-slug \\ + --file - + """ + owner, repo, package = owner_repo_package + use_stderr = utils.should_use_stderr(opts) + + content = _load_content(content_file, inline_content, required=False) + + patch_kwargs = { + key: value + for key, value in ( + ("content", content), + ("source_identity", source_identity), + ) + if value is not None + } + if not patch_kwargs: + raise click.UsageError( + "Nothing to update. Provide --file, --content, or --source-identity." + ) + + _echo_action( + "Updating metadata entry %(metadata)s on the '%(package)s' package ... " + % { + "metadata": click.style(metadata_slug_perm, bold=True), + "package": click.style(package, bold=True), + }, + use_stderr, + ) + + context_msg = "Failed to update metadata on the package!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + slug_perm = api_get_package_slug_perm( + owner=owner, repo=repo, identifier=package + ) + entry = api_update_metadata(slug_perm, metadata_slug_perm, **patch_kwargs) + + click.secho("OK", fg="green", err=use_stderr) + _print_metadata_entry(opts, entry) + + +@metadata_.command(name="remove", aliases=["rm"]) +@decorators.common_cli_config_options +@decorators.common_cli_output_options +@decorators.common_api_auth_options +@decorators.initialise_api +@click.argument( + "owner_repo_package", + metavar="OWNER/REPO/PACKAGE", + callback=validators.validate_owner_repo_package, +) +@click.argument("metadata_slug_perm") +@click.option( + "-y", + "--yes", + default=False, + is_flag=True, + help="Assume yes as default answer to questions (this is dangerous!)", +) +@click.pass_context +def remove_metadata(ctx, opts, owner_repo_package, metadata_slug_perm, yes): + """ + Remove a metadata entry from a package. + + OWNER/REPO/PACKAGE: the package the metadata is attached to. + METADATA_SLUG_PERM: the permanent slug of the metadata entry to delete. + + \b + Example: + $ cloudsmith metadata remove your-org/awesome-repo/better-pkg meta-slug + """ + owner, repo, package = owner_repo_package + use_stderr = utils.should_use_stderr(opts) + + remove_args = { + "metadata": click.style(metadata_slug_perm, bold=True), + "package": click.style(package, bold=True), + } + + prompt = ( + "remove the %(metadata)s metadata entry from the %(package)s package" + % remove_args + ) + if not utils.confirm_operation(prompt, assume_yes=yes, err=use_stderr): + return + + _echo_action( + "Removing metadata entry %(metadata)s from the '%(package)s' package ... " + % remove_args, + use_stderr, + ) + + context_msg = "Failed to remove metadata from the package!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + slug_perm = api_get_package_slug_perm( + owner=owner, repo=repo, identifier=package + ) + api_delete_metadata(slug_perm, metadata_slug_perm) + + click.secho("OK", fg="green", err=use_stderr) + + result_payload = {"deleted": True, "slug_perm": metadata_slug_perm} + if utils.maybe_print_as_json(opts, result_payload): + return + + click.echo() + click.secho( + "Removed metadata entry %(slug)s." + % {"slug": click.style(metadata_slug_perm, bold=True)} + ) diff --git a/cloudsmith_cli/cli/tests/commands/test_metadata.py b/cloudsmith_cli/cli/tests/commands/test_metadata.py new file mode 100644 index 00000000..f55b7cb7 --- /dev/null +++ b/cloudsmith_cli/cli/tests/commands/test_metadata.py @@ -0,0 +1,650 @@ +"""CLI tests for the `cloudsmith metadata` command group.""" + +import json +import unittest +from unittest.mock import patch + +from click.testing import CliRunner + +from cloudsmith_cli.cli.commands.metadata import metadata_ +from cloudsmith_cli.core.pagination import MAX_PAGE_SIZE, PageInfo + + +def _empty_page_info(): + """Return an invalid PageInfo, matching current v2 API responses.""" + return PageInfo() + + +def _page_info(*, page, page_total, count, page_size=MAX_PAGE_SIZE): + info = PageInfo() + info.count = count + info.page = page + info.page_size = page_size + info.page_total = page_total + return info + + +class TestMetadataGroupSmoke(unittest.TestCase): + def setUp(self): + self.runner = CliRunner() + + def test_help_lists_subcommands(self): + result = self.runner.invoke(metadata_, ["--help"]) + self.assertEqual(result.exit_code, 0, msg=result.output) + self.assertIn("list", result.output) + self.assertIn("add", result.output) + self.assertIn("update", result.output) + self.assertIn("remove", result.output) + + def test_help_preserves_example_lines(self): + result = self.runner.invoke(metadata_, ["list", "--help"]) + + self.assertEqual(result.exit_code, 0, msg=result.output) + self.assertIn( + "$ cloudsmith metadata list your-org/awesome-repo/better-pkg\n", + result.output, + ) + self.assertIn( + "$ cloudsmith metadata list your-org/awesome-repo/better-pkg " + "--classification provenance\n", + result.output, + ) + self.assertIn( + "$ cloudsmith metadata list your-org/awesome-repo/better-pkg meta-slug-perm\n", + result.output, + ) + + def test_add_help_preserves_multiline_example(self): + result = self.runner.invoke(metadata_, ["add", "--help"]) + + self.assertEqual(result.exit_code, 0, msg=result.output) + self.assertIn( + "$ cloudsmith metadata add your-org/awesome-repo/better-pkg \\\n", + result.output, + ) + self.assertIn("--content-type application/json \\\n", result.output) + self.assertIn('--content \'{"foo": "bar"}\'', result.output) + self.assertIn("cat payload.json | cloudsmith metadata add", result.output) + self.assertIn("--file -", result.output) + self.assertIn("application/vnd.jfrog.buildinfo+json", result.output) + self.assertIn("--file buildinfo.json", result.output) + + def test_update_help_preserves_stdin_example(self): + result = self.runner.invoke(metadata_, ["update", "--help"]) + + self.assertEqual(result.exit_code, 0, msg=result.output) + self.assertIn("cat payload.json | cloudsmith metadata update", result.output) + self.assertIn("--file -", result.output) + + +class TestMetadataList(unittest.TestCase): + def setUp(self): + self.runner = CliRunner() + + @patch("cloudsmith_cli.cli.commands.metadata.api_list_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_list_resolves_slug_perm_and_calls_list(self, mock_resolve, mock_list): + mock_resolve.return_value = "pkg-slug-perm" + mock_list.return_value = ([], _empty_page_info()) + + result = self.runner.invoke(metadata_, ["list", "myorg/myrepo/mypkg"]) + + self.assertEqual(result.exit_code, 0, msg=result.output) + mock_resolve.assert_called_once_with( + owner="myorg", repo="myrepo", identifier="mypkg" + ) + mock_list.assert_called_once() + kwargs = mock_list.call_args.kwargs + self.assertEqual(kwargs["package_slug_perm"], "pkg-slug-perm") + self.assertIsNone(kwargs.get("source_kind")) + self.assertIsNone(kwargs.get("classification")) + + @patch("cloudsmith_cli.cli.commands.metadata.api_list_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_list_passes_filters(self, mock_resolve, mock_list): + mock_resolve.return_value = "pkg-slug-perm" + mock_list.return_value = ([], _empty_page_info()) + + result = self.runner.invoke( + metadata_, + [ + "list", + "myorg/myrepo/mypkg", + "--source-kind", + "customer", + "--classification", + "4", + ], + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + kwargs = mock_list.call_args.kwargs + self.assertEqual(kwargs["source_kind"], "customer") + self.assertEqual(kwargs["classification"], "4") + + @patch("cloudsmith_cli.cli.commands.metadata.api_list_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_list_json_output(self, mock_resolve, mock_list): + mock_resolve.return_value = "pkg-slug-perm" + mock_list.return_value = ( + [ + { + "slug_perm": "abc", + "content_type": "application/json", + "classification": "GENERIC", + "source_kind": "CUSTOMER", + "source_identity": "cloudsmith-cli@1.16.0", + } + ], + _empty_page_info(), + ) + + result = self.runner.invoke( + metadata_, ["list", "-F", "json", "myorg/myrepo/mypkg"] + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + payload = json.loads(result.stdout) + self.assertEqual(len(payload["data"]), 1) + self.assertEqual(payload["data"][0]["slug_perm"], "abc") + + @patch("cloudsmith_cli.cli.commands.metadata.api_list_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_list_invalid_filter_value_is_usage_error(self, mock_resolve, mock_list): + mock_resolve.return_value = "pkg-slug-perm" + + result = self.runner.invoke( + metadata_, + ["list", "myorg/myrepo/mypkg", "--source-kind", "not-a-kind"], + ) + + self.assertNotEqual(result.exit_code, 0) + self.assertIn("source_kind", result.output.lower()) + mock_list.assert_not_called() + + @patch("cloudsmith_cli.cli.commands.metadata.api_list_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_list_page_all_aggregates_all_pages(self, mock_resolve, mock_list): + mock_resolve.return_value = "pkg-slug-perm" + mock_list.side_effect = [ + ( + [ + { + "slug_perm": "first", + "content_type": "application/json", + } + ], + _page_info(page=1, page_total=2, count=2), + ), + ( + [ + { + "slug_perm": "second", + "content_type": "application/json", + } + ], + _page_info(page=2, page_total=2, count=2), + ), + ] + + result = self.runner.invoke( + metadata_, ["list", "-F", "json", "myorg/myrepo/mypkg", "--page-all"] + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + payload = json.loads(result.stdout) + self.assertEqual( + [item["slug_perm"] for item in payload["data"]], ["first", "second"] + ) + self.assertEqual(mock_list.call_count, 2) + self.assertEqual( + [call.kwargs["page"] for call in mock_list.call_args_list], [1, 2] + ) + self.assertTrue( + all( + call.kwargs["page_size"] == MAX_PAGE_SIZE + for call in mock_list.call_args_list + ) + ) + + @patch("cloudsmith_cli.cli.commands.metadata.api_list_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_list_ls_alias(self, mock_resolve, mock_list): + mock_resolve.return_value = "pkg-slug-perm" + mock_list.return_value = ([], _empty_page_info()) + + result = self.runner.invoke(metadata_, ["ls", "myorg/myrepo/mypkg"]) + + self.assertEqual(result.exit_code, 0, msg=result.output) + mock_list.assert_called_once() + + +class TestMetadataListSingle(unittest.TestCase): + def setUp(self): + self.runner = CliRunner() + + @patch("cloudsmith_cli.cli.commands.metadata.api_get_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_list_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_single_fetch_calls_get_and_skips_list( + self, mock_resolve, mock_list, mock_get + ): + mock_resolve.return_value = "pkg-slug-perm" + mock_get.return_value = { + "slug_perm": "meta-slug", + "content_type": "application/json", + "content": {"hello": "world"}, + } + + result = self.runner.invoke( + metadata_, ["list", "myorg/myrepo/mypkg", "meta-slug"] + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + mock_get.assert_called_once_with("pkg-slug-perm", "meta-slug") + mock_list.assert_not_called() + + @patch("cloudsmith_cli.cli.commands.metadata.api_get_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_single_fetch_json_output(self, mock_resolve, mock_get): + mock_resolve.return_value = "pkg-slug-perm" + mock_get.return_value = { + "slug_perm": "meta-slug", + "content_type": "application/json", + "content": {"hello": "world"}, + } + + result = self.runner.invoke( + metadata_, + ["list", "-F", "json", "myorg/myrepo/mypkg", "meta-slug"], + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + payload = json.loads(result.stdout) + self.assertEqual(payload["data"]["slug_perm"], "meta-slug") + self.assertEqual(payload["data"]["content"], {"hello": "world"}) + + +class TestMetadataAdd(unittest.TestCase): + def setUp(self): + self.runner = CliRunner() + + @patch("cloudsmith_cli.cli.commands.metadata.api_create_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_add_with_inline_content(self, mock_resolve, mock_create): + mock_resolve.return_value = "pkg-slug-perm" + mock_create.return_value = {"slug_perm": "new-slug"} + + result = self.runner.invoke( + metadata_, + [ + "add", + "myorg/myrepo/mypkg", + "--content-type", + "application/json", + "--content", + '{"foo": "bar"}', + ], + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + mock_create.assert_called_once() + kwargs = mock_create.call_args.kwargs + self.assertEqual(kwargs["content"], {"foo": "bar"}) + self.assertEqual(kwargs["content_type"], "application/json") + self.assertTrue(kwargs["source_identity"].startswith("cloudsmith-cli@")) + # First positional arg is the resolved slug_perm. + self.assertEqual(mock_create.call_args.args[0], "pkg-slug-perm") + + @patch("cloudsmith_cli.cli.commands.metadata.api_create_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_add_with_file(self, mock_resolve, mock_create): + mock_resolve.return_value = "pkg-slug-perm" + mock_create.return_value = {"slug_perm": "new-slug"} + + with self.runner.isolated_filesystem(): + with open("payload.json", "w", encoding="utf-8") as fh: + fh.write('{"hello": "world"}') + + result = self.runner.invoke( + metadata_, + [ + "add", + "myorg/myrepo/mypkg", + "--content-type", + "application/json", + "--file", + "payload.json", + ], + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + kwargs = mock_create.call_args.kwargs + self.assertEqual(kwargs["content"], {"hello": "world"}) + + @patch("cloudsmith_cli.cli.commands.metadata.api_create_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_add_with_stdin_file(self, mock_resolve, mock_create): + mock_resolve.return_value = "pkg-slug-perm" + mock_create.return_value = {"slug_perm": "new-slug"} + + result = self.runner.invoke( + metadata_, + [ + "add", + "myorg/myrepo/mypkg", + "--content-type", + "application/json", + "--file", + "-", + ], + input='{"from": "stdin"}', + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + kwargs = mock_create.call_args.kwargs + self.assertEqual(kwargs["content"], {"from": "stdin"}) + + @patch("cloudsmith_cli.cli.commands.metadata.api_create_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_add_rejects_both_sources(self, mock_resolve, mock_create): + mock_resolve.return_value = "pkg-slug-perm" + + with self.runner.isolated_filesystem(): + with open("payload.json", "w", encoding="utf-8") as fh: + fh.write("{}") + + result = self.runner.invoke( + metadata_, + [ + "add", + "myorg/myrepo/mypkg", + "--content-type", + "application/json", + "--file", + "payload.json", + "--content", + "{}", + ], + ) + + self.assertNotEqual(result.exit_code, 0) + self.assertIn("mutually exclusive", result.output.lower()) + mock_create.assert_not_called() + + @patch("cloudsmith_cli.cli.commands.metadata.api_create_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_add_requires_one_source(self, mock_resolve, mock_create): + mock_resolve.return_value = "pkg-slug-perm" + + result = self.runner.invoke( + metadata_, + [ + "add", + "myorg/myrepo/mypkg", + "--content-type", + "application/json", + ], + ) + + self.assertNotEqual(result.exit_code, 0) + self.assertIn("--file", result.output) + mock_create.assert_not_called() + + @patch("cloudsmith_cli.cli.commands.metadata.api_create_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_add_invalid_json_is_usage_error(self, mock_resolve, mock_create): + mock_resolve.return_value = "pkg-slug-perm" + + result = self.runner.invoke( + metadata_, + [ + "add", + "myorg/myrepo/mypkg", + "--content-type", + "application/json", + "--content", + "{not json", + ], + ) + + self.assertNotEqual(result.exit_code, 0) + self.assertIn("invalid", result.output.lower()) + mock_create.assert_not_called() + + @patch("cloudsmith_cli.cli.commands.metadata.api_create_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_add_invalid_stdin_json_is_usage_error(self, mock_resolve, mock_create): + mock_resolve.return_value = "pkg-slug-perm" + + result = self.runner.invoke( + metadata_, + [ + "add", + "myorg/myrepo/mypkg", + "--content-type", + "application/json", + "--file", + "-", + ], + input="{not json", + ) + + self.assertNotEqual(result.exit_code, 0) + self.assertIn("invalid json in stdin", result.output.lower()) + mock_create.assert_not_called() + + @patch("cloudsmith_cli.cli.commands.metadata.api_create_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_add_uses_explicit_source_identity(self, mock_resolve, mock_create): + mock_resolve.return_value = "pkg-slug-perm" + mock_create.return_value = {"slug_perm": "new-slug"} + + result = self.runner.invoke( + metadata_, + [ + "add", + "myorg/myrepo/mypkg", + "--content-type", + "application/json", + "--content", + "{}", + "--source-identity", + "ci-pipeline:42", + ], + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + kwargs = mock_create.call_args.kwargs + self.assertEqual(kwargs["source_identity"], "ci-pipeline:42") + + +class TestMetadataUpdate(unittest.TestCase): + def setUp(self): + self.runner = CliRunner() + + @patch("cloudsmith_cli.cli.commands.metadata.api_update_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_update_patches_content(self, mock_resolve, mock_update): + mock_resolve.return_value = "pkg-slug-perm" + mock_update.return_value = {"slug_perm": "meta-slug"} + + result = self.runner.invoke( + metadata_, + [ + "update", + "myorg/myrepo/mypkg", + "meta-slug", + "--content", + '{"foo": "baz"}', + ], + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + mock_update.assert_called_once() + args = mock_update.call_args.args + kwargs = mock_update.call_args.kwargs + self.assertEqual(args, ("pkg-slug-perm", "meta-slug")) + self.assertEqual(kwargs["content"], {"foo": "baz"}) + self.assertNotIn("source_identity", kwargs) + + @patch("cloudsmith_cli.cli.commands.metadata.api_update_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_update_patches_stdin_file(self, mock_resolve, mock_update): + mock_resolve.return_value = "pkg-slug-perm" + mock_update.return_value = {"slug_perm": "meta-slug"} + + result = self.runner.invoke( + metadata_, + [ + "update", + "myorg/myrepo/mypkg", + "meta-slug", + "--file", + "-", + ], + input='{"foo": "from-stdin"}', + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + kwargs = mock_update.call_args.kwargs + self.assertEqual(kwargs["content"], {"foo": "from-stdin"}) + self.assertNotIn("source_identity", kwargs) + + @patch("cloudsmith_cli.cli.commands.metadata.api_update_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_update_patches_source_identity_only(self, mock_resolve, mock_update): + mock_resolve.return_value = "pkg-slug-perm" + mock_update.return_value = {"slug_perm": "meta-slug"} + + result = self.runner.invoke( + metadata_, + [ + "update", + "myorg/myrepo/mypkg", + "meta-slug", + "--source-identity", + "ci-pipeline:99", + ], + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + kwargs = mock_update.call_args.kwargs + self.assertEqual(kwargs["source_identity"], "ci-pipeline:99") + self.assertNotIn("content", kwargs) + + @patch("cloudsmith_cli.cli.commands.metadata.api_update_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_update_rejects_both_content_sources(self, mock_resolve, mock_update): + mock_resolve.return_value = "pkg-slug-perm" + + with self.runner.isolated_filesystem(): + with open("payload.json", "w", encoding="utf-8") as fh: + fh.write("{}") + + result = self.runner.invoke( + metadata_, + [ + "update", + "myorg/myrepo/mypkg", + "meta-slug", + "--file", + "payload.json", + "--content", + "{}", + ], + ) + + self.assertNotEqual(result.exit_code, 0) + self.assertIn("mutually exclusive", result.output.lower()) + mock_update.assert_not_called() + + @patch("cloudsmith_cli.cli.commands.metadata.api_update_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_update_requires_some_field(self, mock_resolve, mock_update): + mock_resolve.return_value = "pkg-slug-perm" + + result = self.runner.invoke( + metadata_, + ["update", "myorg/myrepo/mypkg", "meta-slug"], + ) + + self.assertNotEqual(result.exit_code, 0) + self.assertIn("nothing to update", result.output.lower()) + mock_update.assert_not_called() + + def test_update_rejects_content_type_flag(self): + result = self.runner.invoke( + metadata_, + [ + "update", + "myorg/myrepo/mypkg", + "meta-slug", + "--content-type", + "application/json", + ], + ) + + self.assertNotEqual(result.exit_code, 0) + # Click's default is "no such option" + self.assertIn("--content-type", result.output) + + +class TestMetadataRemove(unittest.TestCase): + def setUp(self): + self.runner = CliRunner() + + @patch("cloudsmith_cli.cli.commands.metadata.api_delete_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_remove_calls_delete(self, mock_resolve, mock_delete): + mock_resolve.return_value = "pkg-slug-perm" + + result = self.runner.invoke( + metadata_, ["remove", "-y", "myorg/myrepo/mypkg", "meta-slug"] + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + mock_delete.assert_called_once_with("pkg-slug-perm", "meta-slug") + + @patch("cloudsmith_cli.cli.commands.metadata.api_delete_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_remove_prompts_and_aborts(self, mock_resolve, mock_delete): + result = self.runner.invoke( + metadata_, ["remove", "myorg/myrepo/mypkg", "meta-slug"], input="N\n" + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + self.assertIn("Are you absolutely certain", result.output) + mock_resolve.assert_not_called() + mock_delete.assert_not_called() + + @patch("cloudsmith_cli.cli.commands.metadata.api_delete_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_remove_alias_rm(self, mock_resolve, mock_delete): + mock_resolve.return_value = "pkg-slug-perm" + + result = self.runner.invoke( + metadata_, ["rm", "-y", "myorg/myrepo/mypkg", "meta-slug"] + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + mock_delete.assert_called_once_with("pkg-slug-perm", "meta-slug") + + @patch("cloudsmith_cli.cli.commands.metadata.api_delete_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_remove_json_output(self, mock_resolve, mock_delete): + mock_resolve.return_value = "pkg-slug-perm" + + result = self.runner.invoke( + metadata_, + ["remove", "-F", "json", "-y", "myorg/myrepo/mypkg", "meta-slug"], + ) + + self.assertEqual(result.exit_code, 0, msg=result.output) + payload = json.loads(result.stdout) + self.assertTrue(payload["data"]["deleted"]) + self.assertEqual(payload["data"]["slug_perm"], "meta-slug") + + +if __name__ == "__main__": + unittest.main() diff --git a/cloudsmith_cli/core/api/packages.py b/cloudsmith_cli/core/api/packages.py index cdd237ab..94879046 100644 --- a/cloudsmith_cli/core/api/packages.py +++ b/cloudsmith_cli/core/api/packages.py @@ -215,6 +215,24 @@ def get_package_tags(owner, repo, identifier): return (data.tags, data.tags_immutable) +def get_package_slug_perm(owner, repo, identifier): + """Resolve a package's permanent slug from owner/repo/identifier. + + Used by metadata commands that address packages by slug_perm. + """ + client = get_packages_api() + + with catch_raise_api_exception(): + data, _, headers = client.packages_read_with_http_info( + owner=owner, repo=repo, identifier=identifier + ) + + ratelimits.maybe_rate_limit(client, headers) + + # pylint: disable=no-member + return data.slug_perm + + def list_packages(owner, repo, **kwargs): """List packages for a repository.""" client = get_packages_api() From e21eca423c9d96f01c7f340bf3a9319e20a40508 Mon Sep 17 00:00:00 2001 From: Bartosz Blizniak Date: Fri, 1 May 2026 14:24:03 +0100 Subject: [PATCH 4/5] copilot feedback --- cloudsmith_cli/cli/commands/metadata.py | 4 +++- cloudsmith_cli/cli/tests/commands/test_metadata.py | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cloudsmith_cli/cli/commands/metadata.py b/cloudsmith_cli/cli/commands/metadata.py index 87f7b385..3ad10c89 100644 --- a/cloudsmith_cli/cli/commands/metadata.py +++ b/cloudsmith_cli/cli/commands/metadata.py @@ -52,7 +52,9 @@ def _echo_action(message, use_stderr): def _print_metadata_table(opts, entries, page_info=None, page_all=False): """Print a list of metadata entries as a table or JSON.""" - if utils.maybe_print_as_json(opts, list(entries), page_info=page_info): + if utils.maybe_print_as_json( + opts, list(entries), page_info=None if page_all else page_info + ): return rows = [ diff --git a/cloudsmith_cli/cli/tests/commands/test_metadata.py b/cloudsmith_cli/cli/tests/commands/test_metadata.py index f55b7cb7..afb5d316 100644 --- a/cloudsmith_cli/cli/tests/commands/test_metadata.py +++ b/cloudsmith_cli/cli/tests/commands/test_metadata.py @@ -196,6 +196,7 @@ def test_list_page_all_aggregates_all_pages(self, mock_resolve, mock_list): self.assertEqual( [item["slug_perm"] for item in payload["data"]], ["first", "second"] ) + self.assertNotIn("meta", payload) self.assertEqual(mock_list.call_count, 2) self.assertEqual( [call.kwargs["page"] for call in mock_list.call_args_list], [1, 2] From 3f58faddab5288ba3f63705b3321602fa298de42 Mon Sep 17 00:00:00 2001 From: Bartosz Blizniak Date: Fri, 1 May 2026 16:22:37 +0100 Subject: [PATCH 5/5] format to fstring --- cloudsmith_cli/cli/commands/metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudsmith_cli/cli/commands/metadata.py b/cloudsmith_cli/cli/commands/metadata.py index 3ad10c89..7574ebb9 100644 --- a/cloudsmith_cli/cli/commands/metadata.py +++ b/cloudsmith_cli/cli/commands/metadata.py @@ -69,7 +69,7 @@ def _print_metadata_table(opts, entries, page_info=None, page_all=False): click.echo() num_results = len(rows) - list_suffix = "metadata entr%s" % ("ies" if num_results != 1 else "y") + list_suffix = f"metadata entr{'ies' if num_results != 1 else 'y'}" utils.pretty_print_list_info( num_results=num_results, page_info=None if page_all else page_info,