Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 3 additions & 72 deletions src/psrt_ghsa_bot/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""GitHub application which applies the PSRT process for GitHub Security Advisories."""
"""GitHub application that applies the PSRT process for GitHub Security Advisories."""

import base64
import csv
import datetime
import json
import os
Expand All @@ -10,7 +9,6 @@
import typing
import urllib.parse

import urllib3
from cvelib.cve_api import CveApi
from dotenv import load_dotenv
from githubkit import AppAuthStrategy, GitHub
Expand All @@ -27,7 +25,6 @@

load_dotenv()

PSRT_GITHUB_TEAM_ORG = "python"
PSRT_GITHUB_TEAM_SLUG = "psrt"
COMPLETION_TAGS = (
"CLOSE",
Expand All @@ -40,34 +37,6 @@
)


def load_psrt_members_from_devguide() -> set[str]:
"""The PSRT GitHub team only supports adding users that are
already within the 'python' GitHub org. This allows users
that aren't in the org team to be added automatically to
GHSA advisories.
"""
psrt_csv_url = "https://raw.githubusercontent.com/python/devguide/refs/heads/main/security/psrt.csv"
resp = urllib3.request(
"GET", psrt_csv_url, timeout=10, redirect=False, retries=urllib3.Retry(total=5, backoff_factor=2)
)
if resp.status != 200:
raise RuntimeError(
f"Couldn't resolve PSRT members from python/devguide (status={resp.status} data={resp.data[:500]})"
)
rows = csv.reader(resp.data.decode().splitlines())
return {github_login.lower() for _, github_login, *_ in rows}


def load_psrt_members_from_github(github: GitHub) -> set[str]:
"""Loads the GitHub usernames from the PSRT team on GitHub"""
team_members = github.rest.teams.list_members_in_org(
org=PSRT_GITHUB_TEAM_ORG,
team_slug=PSRT_GITHUB_TEAM_SLUG,
per_page=100,
)
return {member.login.lower() for member in team_members.parsed_data}


def get_repository_advisories(
github: GitHub,
owner: str,
Expand Down Expand Up @@ -146,9 +115,7 @@ def reserve_one_cve(cve_api: CveApi) -> str:
return cve_ids[0]


def apply_to_repo(
github: GitHub, owner: str, repo: str, cve_api: CveApi, *, collaborating_users: set[str] | None = None
) -> None:
def apply_to_repo(github: GitHub, owner: str, repo: str, cve_api: CveApi) -> None:
"""Applies the PSRT GitHub Security Advisory process to the repository."""
security_advisories = get_repository_advisories(github, owner, repo)
advisory_count = 0
Expand Down Expand Up @@ -211,18 +178,6 @@ def apply_to_repo(
patch_data["collaborating_teams"] = sorted(collaborating_teams)
print(f" ➕ Will ensure team present: {PSRT_GITHUB_TEAM_SLUG}")

if collaborating_users:
# Determine if we set the 'collaborating_users' field
# at all by seeing if there are missing users on the
# advisory. Preserve the old list, as this is edited
# manually by coordinators.
prev_collaborating_users = {user["login"].lower() for user in security_advisory["collaborating_users"]}
if collaborating_users - prev_collaborating_users:
# Sorting is only done for consistency's sake in testing.
new_collaborating_users = sorted(collaborating_users | prev_collaborating_users)
patch_data["collaborating_users"] = new_collaborating_users
print(f" ➕ Will ensure users are present: {new_collaborating_users}")

# Apply updates, if any, to the security advisory.
if patch_data:
try:
Expand Down Expand Up @@ -256,27 +211,6 @@ def run() -> None:
env=os.environ.get("CVE_ENV", "prod"),
)

psrt_members_devguide: set[str] | None = None
psrt_members_github: set[str] | None = None

def fetch_collaborating_users(installation_github: GitHub) -> set[str]:
nonlocal psrt_members_github, psrt_members_devguide

# Only run the fetching step once.
if psrt_members_github is None or psrt_members_devguide is None:
print("Fetching PSRT members from Developer Guide...")
psrt_members_devguide = load_psrt_members_from_devguide()

print("Fetching PSRT members from GitHub Team...")
psrt_members_github = load_psrt_members_from_github(installation_github)

# Determine which PSRT members need to be added as
# 'collaborating_users' to advisories due to not being
# in the GitHub Team and Organization.
psrt_members_not_in_github = set(psrt_members_devguide - psrt_members_github)
print(f"PSRT members not in GitHub Team: {', '.join(sorted(psrt_members_not_in_github))}")
return psrt_members_not_in_github

print("Fetching installations...")
# Apply to all repositories for each installation.
installations = github.rest.paginate(
Expand All @@ -291,16 +225,13 @@ def fetch_collaborating_users(installation_github: GitHub) -> set[str]:
github.auth.as_installation(installation_data.id),
)

collaborating_users = fetch_collaborating_users(installation_github)
repos = installation_github.rest.paginate(
installation_github.rest.apps.list_repos_accessible_to_installation,
map_func=lambda r: r.parsed_data.repositories,
)
for repo in repos:
print(f" Checking repo: {repo.owner.login}/{repo.name}")
apply_to_repo(
installation_github, repo.owner.login, repo.name, cve_api, collaborating_users=collaborating_users
)
apply_to_repo(installation_github, repo.owner.login, repo.name, cve_api)

print(f"\nDone! Processed {installation_count} installation(s).")

Expand Down
40 changes: 0 additions & 40 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,27 +156,6 @@ def test_does_not_reserve_cve_id_for_triage_security_advisories(state) -> None:
github.rest.security_advisories.update_repository_advisory.assert_not_called()


def test_update_collaborating_users() -> None:
github = mock.Mock()
cve_api = mock.Mock()

with (
mock.patch("psrt_ghsa_bot.app.get_repository_advisories") as get_repo_advs,
):
security_advisory = _create_advisory_dict("draft", "CVE-2026-0001", ["psrt"])
get_repo_advs.return_value = [security_advisory]

# 'alice' isn't in the GitHub Team, but is in the Devguide list.
app.apply_to_repo(github, "owner", "repo", cve_api, collaborating_users={"alice"})

github.rest.security_advisories.update_repository_advisory.assert_called_once_with(
owner="owner",
repo="repo",
ghsa_id="GHSA-xxxx-xxxx-xxxx",
data={"collaborating_users": ["alice", "octocat"]},
)


def test_create_private_fork() -> None:
github = mock.Mock()
cve_api = mock.Mock()
Expand Down Expand Up @@ -258,25 +237,6 @@ def test_accepts_advisory_with_accept_tag(summary, cve_id, cve_reserve_response)
)


def test_load_psrt_members_from_devguide() -> None:
with mock.patch("psrt_ghsa_bot.app.urllib3.request") as urllib3_request:
resp = mock.Mock()
resp.status = 200
resp.data = b"""Barry Warsaw,warsaw,Admin
Benjamin Peterson,benjaminp,
Donald Stufft,dstufft,
Dustin Ingram,di,
Ee Durbin,ewdurbin,Admin
Glyph Lefkowitz,glyph,
Gregory P. Smith,gpshead,"""

urllib3_request.return_value = resp

members = app.load_psrt_members_from_devguide()

assert members == {"warsaw", "benjaminp", "dstufft", "di", "ewdurbin", "glyph", "gpshead"}


def test_reserve_one_cve_id(cve_reserve_response, cve_id, year) -> None:
cve_api = mock.Mock()
cve_api.reserve.return_value = cve_reserve_response
Expand Down