diff --git a/pyproject.toml b/pyproject.toml index 463aeb0..a4dcfd6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,6 +5,7 @@ description = "GitHub bot for PSRT activities on GHSAs" readme = "README.md" requires-python = ">=3.14.0" dependencies = [ + "codeowners>=0.8.0", "cvelib>=1.4.0", "githubkit[auth-app]>=0.13.5", "python-dotenv>=1.0.0", diff --git a/src/psrt_ghsa_bot/_codeowners.py b/src/psrt_ghsa_bot/_codeowners.py new file mode 100644 index 0000000..165d2c6 --- /dev/null +++ b/src/psrt_ghsa_bot/_codeowners.py @@ -0,0 +1,60 @@ +"""Resolving CODEOWNERS for the files changed by a GHSA's fix.""" + +import json +import typing + +from codeowners import CodeOwners +from githubkit import GitHub +from githubkit.exception import RequestFailed + + +def load_codeowners(github: GitHub, owner: str, repo: str) -> CodeOwners | None: + """Fetch and parse a repo's '.github/CODEOWNERS' on the 'main' branch.""" + try: + response = github.rest.repos.get_content( + owner=owner, + repo=repo, + path=".github/CODEOWNERS", + ref="main", + headers={"Accept": "application/vnd.github.raw+json"}, + ) + except RequestFailed as e: + if e.response.status_code == 404: + return None # No such file + raise + return CodeOwners(response.content.decode()) + + +def code_owners_for_files(code_owners: CodeOwners, filenames: typing.Iterable[str]) -> tuple[set[str], set[str]]: + """Resolve the changed files to the CODEOWNERS (users/teams) for them.""" + users = set() + teams = set() + for filename in filenames: + for kind, owner in code_owners.of(filename): + if kind == "USERNAME": + users.add(owner.lstrip("@").lower()) + elif kind == "TEAM": + teams.add(owner.lstrip("@").lower()) + return users, teams + + +def get_advisory_changed_files(github: GitHub, private_fork: dict) -> set[str]: + """List files changed by the open fix pull request (against 'main') in a private fork.""" + try: + pulls_response = github.rest.pulls.list( + owner=private_fork["owner"]["login"], repo=private_fork["name"], state="open", base="main", per_page=100 + ) + except RequestFailed as e: + if e.response.status_code == 404: + return set() + raise + # Parse JSON directly to bypass Pydantic validation + pulls = json.loads(pulls_response.content) + if not pulls: + return set() + + pull = pulls[0] + files_response = github.rest.pulls.list_files( + owner=private_fork["owner"]["login"], repo=private_fork["name"], pull_number=pull["number"], per_page=100 + ) + return {file["filename"] for file in json.loads(files_response.content)} diff --git a/src/psrt_ghsa_bot/app.py b/src/psrt_ghsa_bot/app.py index 40e0907..2b53f02 100644 --- a/src/psrt_ghsa_bot/app.py +++ b/src/psrt_ghsa_bot/app.py @@ -11,11 +11,17 @@ import urllib.parse import urllib3 +from codeowners import CodeOwners from cvelib.cve_api import CveApi from dotenv import load_dotenv from githubkit import AppAuthStrategy, GitHub from githubkit.exception import RequestFailed, RequestError +from psrt_ghsa_bot._codeowners import ( + code_owners_for_files, + get_advisory_changed_files, + load_codeowners, +) from psrt_ghsa_bot._sentry_monitoring import ( MONITOR_SLUG_GHSA, STATUS_ERROR, @@ -147,7 +153,13 @@ def reserve_one_cve(cve_api: CveApi) -> str: def apply_to_repo( - github: GitHub, owner: str, repo: str, cve_api: CveApi, *, collaborating_users: set[str] | None = None + github: GitHub, + owner: str, + repo: str, + cve_api: CveApi, + *, + collaborating_users: set[str] | None = None, + code_owners: CodeOwners | None = None, ) -> None: """Applies the PSRT GitHub Security Advisory process to the repository.""" security_advisories = get_repository_advisories(github, owner, repo) @@ -205,21 +217,36 @@ def apply_to_repo( patch_data["cve_id"] = cve_id print(f" ✅ Will reserve CVE ID: {cve_id}") + # Resolve the CODEOWNERS for the files changed in the private fork + # so they can be added as collaborators on the advisory. + codeowner_users = set() + codeowner_teams = set() + private_fork = security_advisory.get("private_fork") + if (code_owners is not None) and (private_fork is not None): + changed_files = get_advisory_changed_files(github, private_fork) + codeowner_users, codeowner_teams = code_owners_for_files(code_owners, changed_files) + if codeowner_users or codeowner_teams: + print( + f" 👥 CODEOWNERS for changed files: " + f"users={sorted(codeowner_users)} teams={sorted(codeowner_teams)}" + ) + + teams_to_add = {PSRT_GITHUB_TEAM_SLUG} | codeowner_teams collaborating_teams = {team["slug"] for team in security_advisory["collaborating_teams"]} - if PSRT_GITHUB_TEAM_SLUG not in collaborating_teams: - collaborating_teams.add(PSRT_GITHUB_TEAM_SLUG) - patch_data["collaborating_teams"] = sorted(collaborating_teams) - print(f" ➕ Will ensure team present: {PSRT_GITHUB_TEAM_SLUG}") + if teams_to_add - collaborating_teams: + patch_data["collaborating_teams"] = sorted(collaborating_teams | teams_to_add) + print(f" ➕ Will ensure teams present: {sorted(teams_to_add)}") - if collaborating_users: + users_to_add = (collaborating_users or set()) | codeowner_users + if users_to_add: # Determine if we set the 'collaborating_users' field # at all by seeing if there are missing users on the # advisory. Preserve the old list, as this is edited # manually by coordinators. prev_collaborating_users = {user["login"].lower() for user in security_advisory["collaborating_users"]} - if collaborating_users - prev_collaborating_users: + if users_to_add - prev_collaborating_users: # Sorting is only done for consistency's sake in testing. - new_collaborating_users = sorted(collaborating_users | prev_collaborating_users) + new_collaborating_users = sorted(users_to_add | prev_collaborating_users) patch_data["collaborating_users"] = new_collaborating_users print(f" ➕ Will ensure users are present: {new_collaborating_users}") @@ -298,8 +325,14 @@ def fetch_collaborating_users(installation_github: GitHub) -> set[str]: ) for repo in repos: print(f" Checking repo: {repo.owner.login}/{repo.name}") + code_owners = load_codeowners(installation_github, repo.owner.login, repo.name) apply_to_repo( - installation_github, repo.owner.login, repo.name, cve_api, collaborating_users=collaborating_users + installation_github, + repo.owner.login, + repo.name, + cve_api, + collaborating_users=collaborating_users, + code_owners=code_owners, ) print(f"\nDone! Processed {installation_count} installation(s).") diff --git a/tests/test_app.py b/tests/test_app.py index 08cb344..fc93e94 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -2,6 +2,7 @@ from unittest import mock import pytest +from codeowners import CodeOwners from psrt_ghsa_bot import app @@ -284,3 +285,87 @@ def test_reserve_one_cve_id(cve_reserve_response, cve_id, year) -> None: assert app.reserve_one_cve(cve_api) == cve_id cve_api.reserve.assert_called_with(count=1, year=year, random=True) + + +def test_adds_codeowners_from_changed_files() -> None: + code_owners = CodeOwners(""" +Lib/ @StanFromIreland @hugovk +.github/ @python/psrt +""") + security_advisory = _create_advisory_dict("draft", "CVE-2026-0001", ["psrt"]) + github = mock.Mock() + cve_api = mock.Mock() + + with ( + mock.patch("psrt_ghsa_bot.app.get_repository_advisories") as get_repo_advs, + mock.patch("psrt_ghsa_bot.app.get_advisory_changed_files") as changed_files, + ): + get_repo_advs.return_value = [security_advisory] + changed_files.return_value = ["Lib/foo.py", ".github/workflows/ci.yml"] + + app.apply_to_repo(github, "owner", "repo", cve_api, code_owners=code_owners) + + github.rest.security_advisories.update_repository_advisory.assert_called_once_with( + owner="owner", + repo="repo", + ghsa_id="GHSA-xxxx-xxxx-xxxx", + data={ + "collaborating_teams": ["psrt", "python/psrt"], + "collaborating_users": ["hugovk", "octocat", "stanfromireland"], + }, + ) + + +def test_combines_codeowners_and_collaborating_users() -> None: + code_owners = CodeOwners(""" +Lib/ @StanFromIreland @hugovk +.github/ @python/psrt +""") + security_advisory = _create_advisory_dict("draft", "CVE-2026-0001", ["psrt"]) + github = mock.Mock() + cve_api = mock.Mock() + + with ( + mock.patch("psrt_ghsa_bot.app.get_repository_advisories") as get_repo_advs, + mock.patch("psrt_ghsa_bot.app.get_advisory_changed_files") as changed_files, + ): + get_repo_advs.return_value = [security_advisory] + changed_files.return_value = ["Lib/foo.py"] + + app.apply_to_repo( + github, + "owner", + "repo", + cve_api, + collaborating_users={"sethmlarson", "hugovk"}, + code_owners=code_owners, + ) + + github.rest.security_advisories.update_repository_advisory.assert_called_once_with( + owner="owner", + repo="repo", + ghsa_id="GHSA-xxxx-xxxx-xxxx", + data={"collaborating_users": ["hugovk", "octocat", "sethmlarson", "stanfromireland"]}, + ) + + +def test_skips_codeowners_lookup_without_private_fork() -> None: + code_owners = CodeOwners(""" +Lib/ @StanFromIreland @hugovk +.github/ @python/psrt +""") + security_advisory = _create_advisory_dict("draft", "CVE-2026-0001", ["psrt"]) + security_advisory.pop("private_fork", None) + github = mock.Mock() + cve_api = mock.Mock() + + with ( + mock.patch("psrt_ghsa_bot.app.get_repository_advisories") as get_repo_advs, + mock.patch("psrt_ghsa_bot.app.get_advisory_changed_files") as changed_files, + ): + get_repo_advs.return_value = [security_advisory] + + app.apply_to_repo(github, "owner", "repo", cve_api, code_owners=code_owners) + + changed_files.assert_not_called() + github.rest.security_advisories.update_repository_advisory.assert_not_called() diff --git a/tests/test_codeowners.py b/tests/test_codeowners.py new file mode 100644 index 0000000..ec97fde --- /dev/null +++ b/tests/test_codeowners.py @@ -0,0 +1,85 @@ +import json +from unittest import mock + +from codeowners import CodeOwners + +from psrt_ghsa_bot import _codeowners + + +def _request_failed(status_code: int) -> _codeowners.RequestFailed: + """Build a githubkit RequestFailed exception w/ the given status code.""" + response = mock.Mock() + response.status_code = status_code + return _codeowners.RequestFailed(response) + + +def test_code_owners_for_files() -> None: + code_owners = CodeOwners(""" +*.py @StanFromIreland +Doc/ @python/docs +""") + + users, teams = _codeowners.code_owners_for_files(code_owners, ["x.py", "Doc/index.rst", "README.md"]) + assert users == {"stanfromireland"} + assert teams == {"python/docs"} + + users, teams = _codeowners.code_owners_for_files(code_owners, ["Modules/spam.c"]) + assert users == set() + assert teams == set() + + +def test_load_codeowners() -> None: + github = mock.Mock() + response = mock.Mock() + response.content = b"*.py @StanFromIreland\n" + github.rest.repos.get_content.return_value = response + + # CODEOWNERS are available + code_owners = _codeowners.load_codeowners(github, "owner", "repo") + github.rest.repos.get_content.assert_called_once_with( + owner="owner", + repo="repo", + path=".github/CODEOWNERS", + ref="main", + headers={"Accept": "application/vnd.github.raw+json"}, + ) + assert code_owners.of("x.py") == [("USERNAME", "@StanFromIreland")] + + # CODEOWNERS are missing + github.rest.repos.get_content.side_effect = _request_failed(404) + assert _codeowners.load_codeowners(github, "owner", "repo") is None + + +def test_get_advisory_changed_files() -> None: + github = mock.Mock() + pulls_response = mock.Mock() + pulls_response.content = json.dumps([{"number": 7}]).encode() + github.rest.pulls.list.return_value = pulls_response + files_response = mock.Mock() + files_response.content = json.dumps([{"filename": "Lib/foo.py"}, {"filename": "Doc/bar.rst"}]).encode() + github.rest.pulls.list_files.return_value = files_response + + private_fork = {"owner": {"login": "fork-owner"}, "name": "fork"} + files = _codeowners.get_advisory_changed_files(github, private_fork) + + # Only open PRs against the 'main' branch are considered + github.rest.pulls.list.assert_called_once_with( + owner="fork-owner", repo="fork", state="open", base="main", per_page=100 + ) + github.rest.pulls.list_files.assert_called_once_with(owner="fork-owner", repo="fork", pull_number=7, per_page=100) + assert files == {"Doc/bar.rst", "Lib/foo.py"} + + +def test_get_advisory_changed_files_no_open_pr() -> None: + github = mock.Mock() + pulls_response = mock.Mock() + pulls_response.content = json.dumps([]).encode() + github.rest.pulls.list.return_value = pulls_response + + files = _codeowners.get_advisory_changed_files(github, {"owner": {"login": "fork-owner"}, "name": "fork"}) + assert files == set() + github.rest.pulls.list_files.assert_not_called() + + github.rest.pulls.list.side_effect = _request_failed(404) + files = _codeowners.get_advisory_changed_files(github, {"owner": {"login": "fork-owner"}, "name": "fork"}) + assert files == set() diff --git a/uv.lock b/uv.lock index f260c99..7002719 100644 --- a/uv.lock +++ b/uv.lock @@ -139,6 +139,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c7/0d/67e5b4109ea4a837e80daa87c2c696711955e40449a97e8926672534def2/click-8.4.1-py3-none-any.whl", hash = "sha256:482be17c6991b8c19c5429a1e995d9b0efdbb63172824c41f99965dc0ade8ec2", size = 116639, upload-time = "2026-05-22T04:08:35.26Z" }, ] +[[package]] +name = "codeowners" +version = "0.8.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/80/98/590929c83f921a75a3153bff3328e143ae900f757dffc722656b58003ba1/codeowners-0.8.0.tar.gz", hash = "sha256:6505727574b67db4832a3d0d5e41c0f7eb80f8858ee27996e85b1e046f9266f8", size = 7749, upload-time = "2025-04-26T02:20:37.743Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/75/e5/ae08304853f5c6184292e9c679458623c937a2e33d98730ce30fa8f4a8b5/codeowners-0.8.0-py3-none-any.whl", hash = "sha256:b92ce6f6c36dd9de5295c4dc285b2a3b14c81cae8b5eb3b05f0cfdbc4d93995a", size = 8786, upload-time = "2025-04-26T02:20:36.214Z" }, +] + [[package]] name = "colorama" version = "0.4.6" @@ -431,6 +443,7 @@ name = "psrt-ghsa-bot" version = "0.1.0" source = { editable = "." } dependencies = [ + { name = "codeowners" }, { name = "cvelib" }, { name = "githubkit", extra = ["auth-app"] }, { name = "python-dotenv" }, @@ -449,6 +462,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "codeowners", specifier = ">=0.8.0" }, { name = "cvelib", specifier = ">=1.4.0" }, { name = "githubkit", extras = ["auth-app"], specifier = ">=0.13.5" }, { name = "python-dotenv", specifier = ">=1.0.0" },