diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 48cf627..f10e6a5 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -67,6 +67,14 @@ The repository runs an automated maintainer agent that may close PRs in the foll - Unresolved merge conflicts for 12+ hours with no resolution push - Requested changes from a maintainer for 12+ hours with no follow-up commits +## Automatic Closures + +The maintainer bot enforces these rules without manual review. Contributions that violate them are closed automatically. + +### Open item limits + +Each contributor may have at most **2 open PRs** and **2 open issues** in this repository at any time. Submitting a 3rd of either type while at the cap closes the new one on submission. The limits apply independently — you can have 2 open PRs and 2 open issues at the same time. + ## PR Labels Apply appropriate labels to help categorize and track your contribution: diff --git a/packages/das/src/api/health.controller.ts b/packages/das/src/api/health.controller.ts index d2c61e7..e38545e 100644 --- a/packages/das/src/api/health.controller.ts +++ b/packages/das/src/api/health.controller.ts @@ -3,7 +3,7 @@ import { ApiOperation, ApiTags } from "@nestjs/swagger"; import { InjectQueue } from "@nestjs/bullmq"; import { InjectRepository } from "@nestjs/typeorm"; import { Queue } from "bullmq"; -import { DataSource, Repository } from "typeorm"; +import { DataSource, Not, IsNull, Repository } from "typeorm"; import { NoCache } from "../cache"; import { Repo } from "../entities"; import { FETCH_QUEUE } from "../queue/constants"; @@ -106,7 +106,10 @@ export class HealthController { } private async listRepoHealth(): Promise { + // Soft-cleared rows (installationId=null after uninstall/remove) are kept + // for historical scoring evidence but are no longer tracked. 
const repos = await this.repoRepo.find({ + where: { installationId: Not(IsNull()) }, select: ["repoFullName", "lastEventAt"], }); diff --git a/packages/das/src/api/miners/miners.service.ts b/packages/das/src/api/miners/miners.service.ts index 113a502..2469d98 100644 --- a/packages/das/src/api/miners/miners.service.ts +++ b/packages/das/src/api/miners/miners.service.ts @@ -131,6 +131,10 @@ const ISSUE_SELECT_COLUMNS = ` 'head_sha', sp.head_sha, 'base_sha', sp.base_sha, 'merge_base_sha', sp.merge_base_sha, + 'base_ref', sp.base_ref, + 'head_ref', sp.head_ref, + 'head_repo_full_name', LOWER(sp.head_repo_full_name), + 'default_branch', sr.default_branch, 'labels', COALESCE(( SELECT json_agg(json_build_object( 'name', plt.label_name, @@ -152,10 +156,13 @@ const ISSUE_SELECT_COLUMNS = ` LEFT JOIN pr_review_summary rs ON rs.repo_full_name = sp.repo_full_name AND rs.pr_number = sp.pr_number + LEFT JOIN repos sr + ON sr.repo_full_name = sp.repo_full_name WHERE sp.repo_full_name = i.repo_full_name AND sp.pr_number = i.solved_by_pr -- Skip null-author solving PRs (no one to credit) AND sp.author_github_id IS NOT NULL + AND BTRIM(sp.author_github_id) <> '' -- Skip corrupted MERGED-without-merged_at shape AND NOT (sp.state = 'MERGED' AND sp.merged_at IS NULL) ) AS solving_pr`; diff --git a/packages/das/src/queue/constants.ts b/packages/das/src/queue/constants.ts index fd77f4e..04046ea 100644 --- a/packages/das/src/queue/constants.ts +++ b/packages/das/src/queue/constants.ts @@ -4,6 +4,7 @@ export const FETCH_JOBS = { PR_METADATA: "fetch-pr-metadata", PR_FILES: "fetch-pr-files", BACKFILL_REPO: "backfill-repo", + ISSUE_CLOSURE: "fetch-issue-closure", } as const; export const DEFAULT_BACKFILL_DAYS = 40; diff --git a/packages/das/src/queue/fetch.processor.ts b/packages/das/src/queue/fetch.processor.ts index 27860d2..9600559 100644 --- a/packages/das/src/queue/fetch.processor.ts +++ b/packages/das/src/queue/fetch.processor.ts @@ -29,12 +29,21 @@ export interface BackfillRepoJobData 
{ days?: number; } +export interface IssueClosureJobData { + repoFullName: string; + issueNumber: number; +} + interface PrFilesGeneration { headSha: string | null; baseSha: string | null; } -type JobData = PrMetadataJobData | PrFilesJobData | BackfillRepoJobData; +type JobData = + | PrMetadataJobData + | PrFilesJobData + | BackfillRepoJobData + | IssueClosureJobData; @Processor(FETCH_QUEUE, { concurrency: 5 }) export class FetchProcessor extends WorkerHost { @@ -68,11 +77,46 @@ export class FetchProcessor extends WorkerHost { await this.handleBackfill(repoFullName, days ?? DEFAULT_BACKFILL_DAYS); break; } + case FETCH_JOBS.ISSUE_CLOSURE: { + const { repoFullName, issueNumber } = job.data as IssueClosureJobData; + await this.handleIssueClosure(repoFullName, issueNumber); + break; + } default: this.logger.warn(`Unknown job name: ${job.name}`); } } + private async handleIssueClosure( + repoFullName: string, + issueNumber: number, + ): Promise { + this.logger.log(`Resolving closer for ${repoFullName}#${issueNumber}`); + + const issue = await this.issueRepo.findOneBy({ + repoFullName, + issueNumber, + }); + if (!issue) return; + + // Reopens already null out solvedByPr in the webhook handler; never + // re-fetch for an open issue. 
+ if (issue.state !== "CLOSED") { + await this.issueRepo.update( + { repoFullName, issueNumber }, + { solvedByPr: null }, + ); + return; + } + + const solvedByPr = await this.fetcher.fetchIssueClosingPr( + repoFullName, + issueNumber, + ); + + await this.issueRepo.update({ repoFullName, issueNumber }, { solvedByPr }); + } + private async handlePrMetadata( repoFullName: string, prNumber: number, @@ -81,26 +125,22 @@ export class FetchProcessor extends WorkerHost { const { closingIssueNumbers, body, lastEditedAt } = await this.fetcher.fetchPrMetadata(repoFullName, prNumber); + const currentClosingIssueNumbers = + this.uniqueIssueNumbers(closingIssueNumbers); await this.prRepo.update( { repoFullName, prNumber }, { - closingIssueNumbers, + closingIssueNumbers: currentClosingIssueNumbers, body, lastEditedAt, }, ); - // If this PR is merged, mark each linked issue as solved_by_pr - const pr = await this.prRepo.findOneBy({ repoFullName, prNumber }); - if (pr?.state === "MERGED" && closingIssueNumbers.length > 0) { - for (const issueNumber of closingIssueNumbers) { - await this.issueRepo.update( - { repoFullName, issueNumber }, - { solvedByPr: prNumber }, - ); - } - } + // Issue solver attribution is closure-driven (ISSUE_CLOSURE jobs read + // ClosedEvent.closer). PR metadata only refreshes the PR-side text view + // of which issues this PR claims to close — it never writes + // issues.solved_by_pr. } private async handlePrFiles(data: PrFilesJobData): Promise { @@ -139,6 +179,10 @@ export class FetchProcessor extends WorkerHost { ); this.logger.log(`Backfilled ${prs.length} PRs from ${repoFullName}`); + // Backfill issues first; solved_by_pr is derived there from the issue close timeline. + await this.fetcher.backfillIssues(repoFullName, sinceDate); + this.logger.log(`Backfilled issues from ${repoFullName}`); + // Enqueue follow-up jobs (metadata + files for every PR). 
for (const { prNumber, headSha, baseSha } of prs) { await this.fetchQueue.add( @@ -147,7 +191,9 @@ export class FetchProcessor extends WorkerHost { { jobId: `meta-${repoFullName}-${prNumber}`, removeOnComplete: true, - removeOnFail: 50, + // Match the webhook handler — failed metadata jobs must not squat + // on the stable per-PR jobId (#75). + removeOnFail: true, attempts: 3, backoff: { type: "exponential", delay: 5000 }, }, @@ -160,10 +206,6 @@ export class FetchProcessor extends WorkerHost { baseSha ?? null, ); } - - // Fetch and upsert issues - await this.fetcher.backfillIssues(repoFullName, sinceDate); - this.logger.log(`Backfilled issues from ${repoFullName}`); } private async handleStalePrFilesJob( @@ -203,7 +245,7 @@ export class FetchProcessor extends WorkerHost { expectedBaseSha, ), removeOnComplete: true, - removeOnFail: 50, + removeOnFail: true, attempts: 3, backoff: { type: "exponential", delay: 5000 }, }, @@ -222,4 +264,8 @@ export class FetchProcessor extends WorkerHost { baseSha: generation.baseSha ?? IsNull(), }; } + + private uniqueIssueNumbers(issueNumbers: number[]): number[] { + return [...new Set(issueNumbers)]; + } } diff --git a/packages/das/src/webhook/github-fetcher.service.ts b/packages/das/src/webhook/github-fetcher.service.ts index c50e067..ba8a666 100644 --- a/packages/das/src/webhook/github-fetcher.service.ts +++ b/packages/das/src/webhook/github-fetcher.service.ts @@ -150,6 +150,14 @@ export class GitHubFetcherService implements OnModuleInit { return 60_000; } + private assertNoGraphQLErrors(body: any, context: string): void { + if (!body?.errors) return; + + throw new Error( + `${context} GraphQL errors: ${JSON.stringify(body.errors)}`, + ); + } + // --- Authentication --- private createAppJwt(): string { @@ -319,7 +327,13 @@ export class GitHubFetcherService implements OnModuleInit { } const body: any = await res.json(); - const pr = body.data?.repository?.pullRequest ?? 
{}; + this.assertNoGraphQLErrors(body, "PR metadata fetch"); + + const pr = body.data?.repository?.pullRequest; + if (!pr) { + throw new Error(`GraphQL PR metadata fetch returned no PR data`); + } + const nodes = pr.closingIssuesReferences?.nodes ?? []; return { @@ -346,6 +360,126 @@ export class GitHubFetcherService implements OnModuleInit { .filter((number): number is number => typeof number === "number"); } + // --- GraphQL: issue closure (which PR caused the current close) --- + + /** + * Resolve the PR responsible for an issue's current closed state. + * + * Reads `ClosedEvent.closer` from the issue timeline and anchors to the + * issue's current `closedAt`, so reopen-then-reclose cycles attribute to + * the latest closer, not whichever PR first declared `Closes #N` in its + * body. Returns the PR number when the closer is a merged same-repo PR; + * `null` for manual closes, non-PR closers (commits, projects), or + * `NOT_PLANNED` closures. + * + * Source of truth for `issues.solved_by_pr`. Issue discovery and the + * issue-bounty solver lookup both read from this field, so they stay 1:1. + */ + async fetchIssueClosingPr( + repoFullName: string, + issueNumber: number, + ): Promise { + const [owner, repo] = repoFullName.split("/"); + const token = await this.getTokenForRepo(repoFullName); + + const query = ` + query($owner: String!, $repo: String!, $issue: Int!) { + repository(owner: $owner, name: $repo) { + issue(number: $issue) { + closedAt + timelineItems(itemTypes: [CLOSED_EVENT], last: 20) { + nodes { + ... on ClosedEvent { + createdAt + stateReason + closer { + __typename + ... 
on PullRequest { + number + merged + state + baseRepository { nameWithOwner } + } + } + } + } + } + } + } + } + `; + + const res = await this.githubFetch("https://api.github.com/graphql", { + method: "POST", + headers: { + Authorization: `Bearer ${token}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + query, + variables: { owner, repo, issue: issueNumber }, + }), + }); + + if (!res.ok) { + throw new Error( + `GraphQL issue closure fetch failed: ${res.status} ${await res.text()}`, + ); + } + + const body: any = await res.json(); + this.assertNoGraphQLErrors(body, "Issue closure fetch"); + + const issue = body.data?.repository?.issue; + if (!issue) return null; + + return this.selectClosingPrFromTimeline(repoFullName, issue); + } + + private selectClosingPrFromTimeline( + repoFullName: string, + issue: { + closedAt: string | null; + timelineItems?: { nodes?: any[] }; + }, + ): number | null { + const closedAt = issue.closedAt; + if (!closedAt) return null; + + const expectedRepo = repoFullName.toLowerCase(); + const nodes = issue.timelineItems?.nodes ?? []; + + // Walk newest to oldest, find the close event matching the issue's + // current closedAt. NOT_PLANNED closures (and anything else non-COMPLETED) + // don't attribute a solver. + for (let i = nodes.length - 1; i >= 0; i--) { + const ev = nodes[i]; + if (!ev) continue; + const stateReason = ev.stateReason; + if ( + stateReason != null && + String(stateReason).toUpperCase() !== "COMPLETED" + ) { + continue; + } + if (ev.createdAt !== closedAt) continue; + const closer = ev.closer; + if (!closer || closer.__typename !== "PullRequest") return null; + if ( + (closer.baseRepository?.nameWithOwner ?? "").toLowerCase() !== + expectedRepo + ) { + return null; + } + const merged = + closer.merged === true || + String(closer.state ?? "").toUpperCase() === "MERGED"; + if (!merged) return null; + return typeof closer.number === "number" ? 
closer.number : null; + } + return null; + } + // --- PR files + contents (REST for list, batched GraphQL for contents) --- /** @@ -606,11 +740,7 @@ export class GitHubFetcherService implements OnModuleInit { } const body: any = await res.json(); - if (body.errors) { - throw new Error( - `GraphQL content fetch errors: ${JSON.stringify(body.errors)}`, - ); - } + this.assertNoGraphQLErrors(body, "Content fetch"); const repoData = body.data?.repository ?? {}; @@ -786,9 +916,13 @@ export class GitHubFetcherService implements OnModuleInit { } const body: any = await res.json(); + this.assertNoGraphQLErrors(body, "Backfill PR fetch"); + const repoData: any = body.data?.repository; const page: any = repoData?.pullRequests; - if (!page) break; + if (!page) { + throw new Error(`Backfill PR GraphQL returned no pullRequests page`); + } // defaultBranchRef is the same across every page — write once. if (!defaultBranchWritten) { @@ -938,6 +1072,26 @@ export class GitHubFetcherService implements OnModuleInit { } } } + closureTimeline: timelineItems( + itemTypes: [CLOSED_EVENT] + last: 20 + ) { + nodes { + ... on ClosedEvent { + createdAt + stateReason + closer { + __typename + ... on PullRequest { + number + merged + state + baseRepository { nameWithOwner } + } + } + } + } + } } } } @@ -969,8 +1123,12 @@ export class GitHubFetcherService implements OnModuleInit { } const body: any = await res.json(); + this.assertNoGraphQLErrors(body, "Backfill issue fetch"); + const page: any = body.data?.repository?.issues; - if (!page) break; + if (!page) { + throw new Error(`Backfill issue GraphQL returned no issues page`); + } let shouldStop = false; for (const issue of page.nodes) { @@ -1006,9 +1164,13 @@ export class GitHubFetcherService implements OnModuleInit { issueData.isTransferred = true; } - if (issue.state === "OPEN") { - issueData.solvedByPr = null; - } + issueData.solvedByPr = + issue.state === "CLOSED" + ? 
this.selectClosingPrFromTimeline(repoFullName, { + closedAt: issue.closedAt ?? null, + timelineItems: { nodes: issue.closureTimeline?.nodes ?? [] }, + }) + : null; await this.issueRepo.upsert(issueData, ["repoFullName", "issueNumber"]); diff --git a/packages/das/src/webhook/handlers/installation.handler.ts b/packages/das/src/webhook/handlers/installation.handler.ts index 2638652..7eaf630 100644 --- a/packages/das/src/webhook/handlers/installation.handler.ts +++ b/packages/das/src/webhook/handlers/installation.handler.ts @@ -38,22 +38,19 @@ export class InstallationHandler { payload.repositories ?? payload.repositories_added ?? []; for (const repo of repos) { - // Check existence first so we only set added_at on insert, not on every - // re-fire of installation.created / installation_repositories.added. - const existing = await this.repoRepo.findOneBy({ - repoFullName: repo.full_name, - }); - if (existing) { - await this.repoRepo.update(repo.full_name, { - installationId: String(installationId), - }); - } else { - await this.repoRepo.insert({ + // Atomic upsert: insert with addedAt on first encounter; on conflict only + // update installationId so addedAt is never overwritten on re-fires. 
+ await this.repoRepo + .createQueryBuilder() + .insert() + .into(Repo) + .values({ repoFullName: repo.full_name, installationId: String(installationId), addedAt: new Date().toISOString(), - }); - } + }) + .orUpdate(["installationId"], ["repoFullName"]) + .execute(); this.logger.log(`Tracking repo: ${repo.full_name}`); } diff --git a/packages/das/src/webhook/handlers/issue.handler.ts b/packages/das/src/webhook/handlers/issue.handler.ts index ac54368..a25ba8b 100644 --- a/packages/das/src/webhook/handlers/issue.handler.ts +++ b/packages/das/src/webhook/handlers/issue.handler.ts @@ -1,8 +1,11 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ import { Injectable } from "@nestjs/common"; import { InjectRepository } from "@nestjs/typeorm"; +import { InjectQueue } from "@nestjs/bullmq"; import { Repository } from "typeorm"; +import { Queue } from "bullmq"; import { Issue, Repo } from "../../entities"; +import { FETCH_QUEUE, FETCH_JOBS } from "../../queue/constants"; @Injectable() export class IssueHandler { @@ -11,6 +14,8 @@ export class IssueHandler { private readonly issueRepo: Repository, @InjectRepository(Repo) private readonly repoRepo: Repository, + @InjectQueue(FETCH_QUEUE) + private readonly fetchQueue: Queue, ) {} async handle(payload: Record): Promise { @@ -54,8 +59,31 @@ export class IssueHandler { await this.issueRepo.upsert(data, ["repoFullName", "issueNumber"]); - await this.repoRepo.update(repoFullName, { + // Resolve solver attribution from the issue's ClosedEvent timeline. + // The same primitive feeds issue discovery and the issue-bounty solver + // lookup so the two paths stay 1:1. 
+ if (payload.action === "closed") { + await this.fetchQueue.add( + FETCH_JOBS.ISSUE_CLOSURE, + { repoFullName, issueNumber: issue.number }, + { + jobId: `closure-${repoFullName}-${issue.number}`, + removeOnComplete: true, + removeOnFail: true, + attempts: 3, + backoff: { type: "exponential", delay: 5000 }, + }, + ); + } + + const repoUpdate: Partial = { lastEventAt: new Date().toISOString(), - }); + }; + const defaultBranch: string | null = + payload.repository?.default_branch ?? null; + if (defaultBranch) { + repoUpdate.defaultBranch = defaultBranch; + } + await this.repoRepo.update(repoFullName, repoUpdate); } } diff --git a/packages/das/src/webhook/handlers/pull-request.handler.ts b/packages/das/src/webhook/handlers/pull-request.handler.ts index 7213834..e99a531 100644 --- a/packages/das/src/webhook/handlers/pull-request.handler.ts +++ b/packages/das/src/webhook/handlers/pull-request.handler.ts @@ -53,9 +53,15 @@ export class PullRequestHandler { await this.prRepo.upsert(data, ["repoFullName", "prNumber"]); - await this.repoRepo.update(repoFullName, { + const repoUpdate: Partial = { lastEventAt: new Date().toISOString(), - }); + }; + const defaultBranch: string | null = + payload.repository?.default_branch ?? null; + if (defaultBranch) { + repoUpdate.defaultBranch = defaultBranch; + } + await this.repoRepo.update(repoFullName, repoUpdate); // Enqueue metadata fetch (closing issues + body + lastEditedAt) on relevant actions. // Also run on `edited` so post-merge body edits are captured. @@ -73,23 +79,30 @@ export class PullRequestHandler { { repoFullName, prNumber }, { jobId, - // Replace any pending job for the same PR (e.g. rapid pushes) + // Pending/active jobs for the same PR still dedupe by jobId. + // Don't retain failed jobs — they'd block future enqueues for this + // PR until the failed-set cap evicts them (#75). 
removeOnComplete: true, - removeOnFail: 50, + removeOnFail: true, attempts: 3, backoff: { type: "exponential", delay: 5000 }, }, ); } - // Enqueue diff fetch on open, push, or merge + // Enqueue diff fetch on open, push, merge, or base-branch retarget. + // GitHub sends `pull_request.edited` with `changes.base` when the base ref + // changes; stored pr_files were resolved against the old base and need a + // fresh fetch even when head_sha is unchanged. const diffActions = ["opened", "synchronize", "closed"]; + const isBaseRetarget = action === "edited" && payload.changes?.base != null; const shouldFetchDiff = - diffActions.includes(action) && (action !== "closed" || pr.merged); + (diffActions.includes(action) && (action !== "closed" || pr.merged)) || + isBaseRetarget; if (shouldFetchDiff) { - // Reset scoring flag on new pushes - if (action === "synchronize") { + // Reset scoring flag on new pushes or base retargets + if (action === "synchronize" || isBaseRetarget) { await this.prRepo.update( { repoFullName, prNumber }, { scoringDataStored: false }, @@ -110,7 +123,7 @@ export class PullRequestHandler { { jobId, removeOnComplete: true, - removeOnFail: 50, + removeOnFail: true, attempts: 3, backoff: { type: "exponential", delay: 5000 }, }, diff --git a/packages/das/src/webhook/webhook.service.ts b/packages/das/src/webhook/webhook.service.ts index ab4d707..aafa29d 100644 --- a/packages/das/src/webhook/webhook.service.ts +++ b/packages/das/src/webhook/webhook.service.ts @@ -91,6 +91,9 @@ export class WebhookService { } switch (event) { + case "repository": + await this.handleRepositoryEvent(payload); + break; case "pull_request": await this.pullRequestHandler.handle(payload); if (payload.action === "labeled" || payload.action === "unlabeled") { @@ -119,4 +122,22 @@ export class WebhookService { this.logger.debug(`Unhandled event type: ${event}`); } } + + private async handleRepositoryEvent( + payload: Record, + ): Promise { + const repoFullName: string | undefined = 
payload.repository?.full_name; + if (!repoFullName) return; + + const repoUpdate: Partial = { + lastEventAt: new Date().toISOString(), + }; + const defaultBranch: string | null = + payload.repository?.default_branch ?? null; + if (defaultBranch) { + repoUpdate.defaultBranch = defaultBranch; + } + + await this.repoRepo.update(repoFullName, repoUpdate); + } }