Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ description = "GitHub bot for PSRT activities on GHSAs"
readme = "README.md"
requires-python = ">=3.14.0"
dependencies = [
"codeowners>=0.8.0",
"cvelib>=1.4.0",
"githubkit[auth-app]>=0.13.5",
"python-dotenv>=1.0.0",
Expand Down
60 changes: 60 additions & 0 deletions src/psrt_ghsa_bot/_codeowners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Resolving CODEOWNERS for the files changed by a GHSA's fix."""

import json
import typing

from codeowners import CodeOwners
from githubkit import GitHub
from githubkit.exception import RequestFailed


def load_codeowners(github: GitHub, owner: str, repo: str) -> CodeOwners | None:
"""Fetch and parse a repo's '.github/CODEOWNERS' on the 'main' branch."""
try:
response = github.rest.repos.get_content(
owner=owner,
repo=repo,
path=".github/CODEOWNERS",
ref="main",
headers={"Accept": "application/vnd.github.raw+json"},
)
except RequestFailed as e:
if e.response.status_code == 404:
return None # No such file
raise
return CodeOwners(response.content.decode())


def code_owners_for_files(code_owners: CodeOwners, filenames: typing.Iterable[str]) -> tuple[set[str], set[str]]:
"""Resolve the changed files to the CODEOWNERS (users/teams) for them."""
users = set()
teams = set()
for filename in filenames:
for kind, owner in code_owners.of(filename):
if kind == "USERNAME":
users.add(owner.lstrip("@").lower())
elif kind == "TEAM":
teams.add(owner.lstrip("@").lower())
return users, teams


def get_advisory_changed_files(github: GitHub, private_fork: dict) -> set[str]:
"""List files changed by the open fix pull request (against 'main') in a private fork."""
try:
pulls_response = github.rest.pulls.list(
owner=private_fork["owner"]["login"], repo=private_fork["name"], state="open", base="main", per_page=100
)
except RequestFailed as e:
if e.response.status_code == 404:
return set()
raise
# Parse JSON directly to bypass Pydantic validation
pulls = json.loads(pulls_response.content)
if not pulls:
return set()

pull = pulls[0]
files_response = github.rest.pulls.list_files(
owner=private_fork["owner"]["login"], repo=private_fork["name"], pull_number=pull["number"], per_page=100
)
return {file["filename"] for file in json.loads(files_response.content)}
51 changes: 42 additions & 9 deletions src/psrt_ghsa_bot/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@
import urllib.parse

import urllib3
from codeowners import CodeOwners
from cvelib.cve_api import CveApi
from dotenv import load_dotenv
from githubkit import AppAuthStrategy, GitHub
from githubkit.exception import RequestFailed, RequestError

from psrt_ghsa_bot._codeowners import (
code_owners_for_files,
get_advisory_changed_files,
load_codeowners,
)
from psrt_ghsa_bot._sentry_monitoring import (
MONITOR_SLUG_GHSA,
STATUS_ERROR,
Expand Down Expand Up @@ -147,7 +153,13 @@ def reserve_one_cve(cve_api: CveApi) -> str:


def apply_to_repo(
github: GitHub, owner: str, repo: str, cve_api: CveApi, *, collaborating_users: set[str] | None = None
github: GitHub,
owner: str,
repo: str,
cve_api: CveApi,
*,
collaborating_users: set[str] | None = None,
code_owners: CodeOwners | None = None,
) -> None:
"""Applies the PSRT GitHub Security Advisory process to the repository."""
security_advisories = get_repository_advisories(github, owner, repo)
Expand Down Expand Up @@ -205,21 +217,36 @@ def apply_to_repo(
patch_data["cve_id"] = cve_id
print(f" ✅ Will reserve CVE ID: {cve_id}")

# Resolve the CODEOWNERS for the files changed in the private fork
# so they can be added as collaborators on the advisory.
codeowner_users = set()
codeowner_teams = set()
private_fork = security_advisory.get("private_fork")
if (code_owners is not None) and (private_fork is not None):
changed_files = get_advisory_changed_files(github, private_fork)
codeowner_users, codeowner_teams = code_owners_for_files(code_owners, changed_files)
if codeowner_users or codeowner_teams:
print(
f" 👥 CODEOWNERS for changed files: "
f"users={sorted(codeowner_users)} teams={sorted(codeowner_teams)}"
)

teams_to_add = {PSRT_GITHUB_TEAM_SLUG} | codeowner_teams
collaborating_teams = {team["slug"] for team in security_advisory["collaborating_teams"]}
if PSRT_GITHUB_TEAM_SLUG not in collaborating_teams:
collaborating_teams.add(PSRT_GITHUB_TEAM_SLUG)
patch_data["collaborating_teams"] = sorted(collaborating_teams)
print(f" ➕ Will ensure team present: {PSRT_GITHUB_TEAM_SLUG}")
if teams_to_add - collaborating_teams:
patch_data["collaborating_teams"] = sorted(collaborating_teams | teams_to_add)
print(f" ➕ Will ensure teams present: {sorted(teams_to_add)}")

if collaborating_users:
users_to_add = (collaborating_users or set()) | codeowner_users
if users_to_add:
# Determine if we set the 'collaborating_users' field
# at all by seeing if there are missing users on the
# advisory. Preserve the old list, as this is edited
# manually by coordinators.
prev_collaborating_users = {user["login"].lower() for user in security_advisory["collaborating_users"]}
if collaborating_users - prev_collaborating_users:
if users_to_add - prev_collaborating_users:
# Sorting is only done for consistency's sake in testing.
new_collaborating_users = sorted(collaborating_users | prev_collaborating_users)
new_collaborating_users = sorted(users_to_add | prev_collaborating_users)
patch_data["collaborating_users"] = new_collaborating_users
print(f" ➕ Will ensure users are present: {new_collaborating_users}")

Expand Down Expand Up @@ -298,8 +325,14 @@ def fetch_collaborating_users(installation_github: GitHub) -> set[str]:
)
for repo in repos:
print(f" Checking repo: {repo.owner.login}/{repo.name}")
code_owners = load_codeowners(installation_github, repo.owner.login, repo.name)
apply_to_repo(
installation_github, repo.owner.login, repo.name, cve_api, collaborating_users=collaborating_users
installation_github,
repo.owner.login,
repo.name,
cve_api,
collaborating_users=collaborating_users,
code_owners=code_owners,
)

print(f"\nDone! Processed {installation_count} installation(s).")
Expand Down
85 changes: 85 additions & 0 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest import mock

import pytest
from codeowners import CodeOwners

from psrt_ghsa_bot import app

Expand Down Expand Up @@ -284,3 +285,87 @@ def test_reserve_one_cve_id(cve_reserve_response, cve_id, year) -> None:
assert app.reserve_one_cve(cve_api) == cve_id

cve_api.reserve.assert_called_with(count=1, year=year, random=True)


def test_adds_codeowners_from_changed_files() -> None:
code_owners = CodeOwners("""
Lib/ @StanFromIreland @hugovk
.github/ @python/psrt
""")
security_advisory = _create_advisory_dict("draft", "CVE-2026-0001", ["psrt"])
github = mock.Mock()
cve_api = mock.Mock()

with (
mock.patch("psrt_ghsa_bot.app.get_repository_advisories") as get_repo_advs,
mock.patch("psrt_ghsa_bot.app.get_advisory_changed_files") as changed_files,
):
get_repo_advs.return_value = [security_advisory]
changed_files.return_value = ["Lib/foo.py", ".github/workflows/ci.yml"]

app.apply_to_repo(github, "owner", "repo", cve_api, code_owners=code_owners)

github.rest.security_advisories.update_repository_advisory.assert_called_once_with(
owner="owner",
repo="repo",
ghsa_id="GHSA-xxxx-xxxx-xxxx",
data={
"collaborating_teams": ["psrt", "python/psrt"],
"collaborating_users": ["hugovk", "octocat", "stanfromireland"],
},
)


def test_combines_codeowners_and_collaborating_users() -> None:
code_owners = CodeOwners("""
Lib/ @StanFromIreland @hugovk
.github/ @python/psrt
""")
security_advisory = _create_advisory_dict("draft", "CVE-2026-0001", ["psrt"])
github = mock.Mock()
cve_api = mock.Mock()

with (
mock.patch("psrt_ghsa_bot.app.get_repository_advisories") as get_repo_advs,
mock.patch("psrt_ghsa_bot.app.get_advisory_changed_files") as changed_files,
):
get_repo_advs.return_value = [security_advisory]
changed_files.return_value = ["Lib/foo.py"]

app.apply_to_repo(
github,
"owner",
"repo",
cve_api,
collaborating_users={"sethmlarson", "hugovk"},
code_owners=code_owners,
)

github.rest.security_advisories.update_repository_advisory.assert_called_once_with(
owner="owner",
repo="repo",
ghsa_id="GHSA-xxxx-xxxx-xxxx",
data={"collaborating_users": ["hugovk", "octocat", "sethmlarson", "stanfromireland"]},
)


def test_skips_codeowners_lookup_without_private_fork() -> None:
code_owners = CodeOwners("""
Lib/ @StanFromIreland @hugovk
.github/ @python/psrt
""")
security_advisory = _create_advisory_dict("draft", "CVE-2026-0001", ["psrt"])
security_advisory.pop("private_fork", None)
github = mock.Mock()
cve_api = mock.Mock()

with (
mock.patch("psrt_ghsa_bot.app.get_repository_advisories") as get_repo_advs,
mock.patch("psrt_ghsa_bot.app.get_advisory_changed_files") as changed_files,
):
get_repo_advs.return_value = [security_advisory]

app.apply_to_repo(github, "owner", "repo", cve_api, code_owners=code_owners)

changed_files.assert_not_called()
github.rest.security_advisories.update_repository_advisory.assert_not_called()
85 changes: 85 additions & 0 deletions tests/test_codeowners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import json
from unittest import mock

from codeowners import CodeOwners

from psrt_ghsa_bot import _codeowners


def _request_failed(status_code: int) -> _codeowners.RequestFailed:
"""Build a githubkit RequestFailed exception w/ the given status code."""
response = mock.Mock()
response.status_code = status_code
return _codeowners.RequestFailed(response)


def test_code_owners_for_files() -> None:
code_owners = CodeOwners("""
*.py @StanFromIreland
Doc/ @python/docs
""")

users, teams = _codeowners.code_owners_for_files(code_owners, ["x.py", "Doc/index.rst", "README.md"])
assert users == {"stanfromireland"}
assert teams == {"python/docs"}

users, teams = _codeowners.code_owners_for_files(code_owners, ["Modules/spam.c"])
assert users == set()
assert teams == set()


def test_load_codeowners() -> None:
github = mock.Mock()
response = mock.Mock()
response.content = b"*.py @StanFromIreland\n"
github.rest.repos.get_content.return_value = response

# CODEOWNERS are available
code_owners = _codeowners.load_codeowners(github, "owner", "repo")
github.rest.repos.get_content.assert_called_once_with(
owner="owner",
repo="repo",
path=".github/CODEOWNERS",
ref="main",
headers={"Accept": "application/vnd.github.raw+json"},
)
assert code_owners.of("x.py") == [("USERNAME", "@StanFromIreland")]

# CODEOWNERS are missing
github.rest.repos.get_content.side_effect = _request_failed(404)
assert _codeowners.load_codeowners(github, "owner", "repo") is None


def test_get_advisory_changed_files() -> None:
github = mock.Mock()
pulls_response = mock.Mock()
pulls_response.content = json.dumps([{"number": 7}]).encode()
github.rest.pulls.list.return_value = pulls_response
files_response = mock.Mock()
files_response.content = json.dumps([{"filename": "Lib/foo.py"}, {"filename": "Doc/bar.rst"}]).encode()
github.rest.pulls.list_files.return_value = files_response

private_fork = {"owner": {"login": "fork-owner"}, "name": "fork"}
files = _codeowners.get_advisory_changed_files(github, private_fork)

# Only open PRs against the 'main' branch are considered
github.rest.pulls.list.assert_called_once_with(
owner="fork-owner", repo="fork", state="open", base="main", per_page=100
)
github.rest.pulls.list_files.assert_called_once_with(owner="fork-owner", repo="fork", pull_number=7, per_page=100)
assert files == {"Doc/bar.rst", "Lib/foo.py"}


def test_get_advisory_changed_files_no_open_pr() -> None:
github = mock.Mock()
pulls_response = mock.Mock()
pulls_response.content = json.dumps([]).encode()
github.rest.pulls.list.return_value = pulls_response

files = _codeowners.get_advisory_changed_files(github, {"owner": {"login": "fork-owner"}, "name": "fork"})
assert files == set()
github.rest.pulls.list_files.assert_not_called()

github.rest.pulls.list.side_effect = _request_failed(404)
files = _codeowners.get_advisory_changed_files(github, {"owner": {"login": "fork-owner"}, "name": "fork"})
assert files == set()
14 changes: 14 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.