Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ GH_AUTH_TOKEN="ghp_123456"
CVE_USERNAME="user@example.org"
CVE_API_KEY="123456"
CVE_ENV="testproddev"
SENTRY_DSN="{PROTOCOL}://{PUBLIC_KEY}:{SECRET_KEY}@{HOST}{PATH}/{PROJECT_ID}"
1 change: 1 addition & 0 deletions .github/workflows/cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ jobs:
CVE_USERNAME: ${{ vars.CVE_USERNAME }}
CVE_API_KEY: ${{ secrets.CVE_API_KEY }}
CVE_ENV: ${{ vars.CVE_ENV }}
SENTRY_DSN: ${{ secrets.SENTRY_DSN }}
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies = [
"cvelib>=1.4.0",
"githubkit[auth-app]>=0.13.5",
"python-dotenv>=1.0.0",
"sentry-sdk>=2.22.0",
]

[dependency-groups]
Expand Down
40 changes: 40 additions & 0 deletions src/psrt_ghsa_bot/_sentry_monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Sentry cron monitoring integration for the PSRT GHSA bot."""

import os

import sentry_sdk
from sentry_sdk import crons

MONITOR_SLUG_GHSA = "psrt-ghsa-cron"

STATUS_IN_PROGRESS = "in_progress"
STATUS_OK = "ok"
STATUS_ERROR = "error"


def init_sentry() -> None:
"""Initialize the Sentry SDK with the DSN from the env"""
dsn = os.environ.get("SENTRY_DSN")
if not dsn:
return

sentry_sdk.init(
dsn=dsn,
enable_tracing=False,
)


def capture_checkin(monitor_slug, status, duration=None, check_in_id=None):
"""Capture a Sentry cron check-in."""
if not os.environ.get("SENTRY_DSN"):
return None

try:
return crons.capture_checkin(
monitor_slug=monitor_slug,
status=status,
duration=duration,
check_in_id=check_in_id,
)
except ImportError, AttributeError:
return None
27 changes: 26 additions & 1 deletion src/psrt_ghsa_bot/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import os
import re
import time
import typing
import urllib.parse

Expand All @@ -15,6 +16,15 @@
from githubkit import AppAuthStrategy, GitHub
from githubkit.exception import RequestFailed, RequestError

from psrt_ghsa_bot._sentry_monitoring import (
MONITOR_SLUG_GHSA,
STATUS_ERROR,
STATUS_IN_PROGRESS,
STATUS_OK,
capture_checkin,
init_sentry,
)

load_dotenv()

PSRT_GITHUB_TEAM_ORG = "python"
Expand Down Expand Up @@ -233,7 +243,7 @@ def apply_to_repo(
print(" ℹ️ No security advisories found")


def main() -> None:
def run() -> None:
print("Starting PSRT GitHub Security Advisory bot...")
gh_client_private_key = base64.b64decode(os.environ["GH_CLIENT_PRIVATE_KEY"]).decode().strip()
github = GitHub(
Expand Down Expand Up @@ -295,5 +305,20 @@ def fetch_collaborating_users(installation_github: GitHub) -> set[str]:
print(f"\nDone! Processed {installation_count} installation(s).")


def main() -> None:
# Report the cron run to Sentry so we're alerted if it fails or stops running.
init_sentry()
check_in_id = capture_checkin(MONITOR_SLUG_GHSA, STATUS_IN_PROGRESS)
start_time = time.monotonic()
try:
run()
except Exception:
capture_checkin(
MONITOR_SLUG_GHSA, STATUS_ERROR, duration=time.monotonic() - start_time, check_in_id=check_in_id
)
raise
capture_checkin(MONITOR_SLUG_GHSA, STATUS_OK, duration=time.monotonic() - start_time, check_in_id=check_in_id)


if __name__ == "__main__":
main()
74 changes: 74 additions & 0 deletions tests/test_sentry_monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from unittest import mock

import pytest

from psrt_ghsa_bot import _sentry_monitoring as sm
from psrt_ghsa_bot import app

DSN = "https://public@o0.ingest.sentry.io/0"


def test_noop_without_dsn(monkeypatch) -> None:
monkeypatch.delenv("SENTRY_DSN", raising=False)
with mock.patch.object(sm.sentry_sdk, "init") as init:
sm.init_sentry()
init.assert_not_called()

with mock.patch.object(sm.crons, "capture_checkin") as capture:
result = sm.capture_checkin(sm.MONITOR_SLUG_GHSA, sm.STATUS_OK)
assert result is None
capture.assert_not_called()


def test_init_with_dsn(monkeypatch) -> None:
monkeypatch.setenv("SENTRY_DSN", DSN)
with mock.patch.object(sm.sentry_sdk, "init") as init:
sm.init_sentry()
init.assert_called_once()
assert init.call_args.kwargs["dsn"] == DSN


def test_capture_checkin_with_dsn(monkeypatch) -> None:
monkeypatch.setenv("SENTRY_DSN", DSN)
with mock.patch.object(sm.crons, "capture_checkin", return_value="stanstan") as capture:
result = sm.capture_checkin(sm.MONITOR_SLUG_GHSA, sm.STATUS_OK, duration=1.5, check_in_id="stanstan")
assert result == "stanstan"
capture.assert_called_once_with(
monitor_slug=sm.MONITOR_SLUG_GHSA,
status=sm.STATUS_OK,
duration=1.5,
check_in_id="stanstan",
)


def test_capture_checkin_swallow_sdk_errors(monkeypatch) -> None:
monkeypatch.setenv("SENTRY_DSN", DSN)
with mock.patch.object(sm.crons, "capture_checkin", side_effect=AttributeError):
assert sm.capture_checkin(sm.MONITOR_SLUG_GHSA, sm.STATUS_OK) is None


def test_main_reports_ok_on_success() -> None:
with (
mock.patch("psrt_ghsa_bot.app.init_sentry") as init,
mock.patch("psrt_ghsa_bot.app.run") as run,
mock.patch("psrt_ghsa_bot.app.capture_checkin", return_value="cid") as capture,
):
app.main()

init.assert_called_once()
run.assert_called_once()
assert [c.args[1] for c in capture.call_args_list] == [app.STATUS_IN_PROGRESS, app.STATUS_OK]
assert capture.call_args_list[-1].kwargs["check_in_id"] == "cid"


def test_main_reports_error_and_reraises() -> None:
with (
mock.patch("psrt_ghsa_bot.app.init_sentry"),
mock.patch("psrt_ghsa_bot.app.run", side_effect=RuntimeError("uh oh")),
mock.patch("psrt_ghsa_bot.app.capture_checkin", return_value="cid") as capture,
):
with pytest.raises(RuntimeError, match="uh oh"):
app.main()

assert [c.args[1] for c in capture.call_args_list] == [app.STATUS_IN_PROGRESS, app.STATUS_ERROR]
assert capture.call_args_list[-1].kwargs["check_in_id"] == "cid"
15 changes: 15 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.