Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ The repository runs an automated maintainer agent that may close PRs in the foll
- Unresolved merge conflicts for 12+ hours with no resolution push
- Requested changes from a maintainer for 12+ hours with no follow-up commits

## Automatic Closures

The maintainer bot enforces these rules without manual review. Contributions that violate them are closed automatically.

### Open item limits

Each contributor may have at most **2 open PRs** and **2 open issues** in this repository at any time. Submitting a 3rd of either type while at the cap closes the new one on submission. The limits apply independently — you can have 2 open PRs and 2 open issues at the same time.

## PR Labels

Apply appropriate labels to help categorize and track your contribution:
Expand Down
5 changes: 4 additions & 1 deletion packages/das/src/api/health.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ApiOperation, ApiTags } from "@nestjs/swagger";
import { InjectQueue } from "@nestjs/bullmq";
import { InjectRepository } from "@nestjs/typeorm";
import { Queue } from "bullmq";
import { DataSource, Repository } from "typeorm";
import { DataSource, Not, IsNull, Repository } from "typeorm";
import { NoCache } from "../cache";
import { Repo } from "../entities";
import { FETCH_QUEUE } from "../queue/constants";
Expand Down Expand Up @@ -106,7 +106,10 @@ export class HealthController {
}

private async listRepoHealth(): Promise<RepoHealth[]> {
// Soft-cleared rows (installationId=null after uninstall/remove) are kept
// for historical scoring evidence but are no longer tracked.
const repos = await this.repoRepo.find({
where: { installationId: Not(IsNull()) },
select: ["repoFullName", "lastEventAt"],
});

Expand Down
7 changes: 7 additions & 0 deletions packages/das/src/api/miners/miners.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ const ISSUE_SELECT_COLUMNS = `
'head_sha', sp.head_sha,
'base_sha', sp.base_sha,
'merge_base_sha', sp.merge_base_sha,
'base_ref', sp.base_ref,
'head_ref', sp.head_ref,
'head_repo_full_name', LOWER(sp.head_repo_full_name),
'default_branch', sr.default_branch,
'labels', COALESCE((
SELECT json_agg(json_build_object(
'name', plt.label_name,
Expand All @@ -152,10 +156,13 @@ const ISSUE_SELECT_COLUMNS = `
LEFT JOIN pr_review_summary rs
ON rs.repo_full_name = sp.repo_full_name
AND rs.pr_number = sp.pr_number
LEFT JOIN repos sr
ON sr.repo_full_name = sp.repo_full_name
WHERE sp.repo_full_name = i.repo_full_name
AND sp.pr_number = i.solved_by_pr
-- Skip null-author solving PRs (no one to credit)
AND sp.author_github_id IS NOT NULL
AND BTRIM(sp.author_github_id) <> ''
-- Skip corrupted MERGED-without-merged_at shape
AND NOT (sp.state = 'MERGED' AND sp.merged_at IS NULL)
) AS solving_pr`;
Expand Down
1 change: 1 addition & 0 deletions packages/das/src/queue/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export const FETCH_JOBS = {
PR_METADATA: "fetch-pr-metadata",
PR_FILES: "fetch-pr-files",
BACKFILL_REPO: "backfill-repo",
ISSUE_CLOSURE: "fetch-issue-closure",
} as const;

export const DEFAULT_BACKFILL_DAYS = 40;
Expand Down
82 changes: 64 additions & 18 deletions packages/das/src/queue/fetch.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,21 @@ export interface BackfillRepoJobData {
days?: number;
}

export interface IssueClosureJobData {
repoFullName: string;
issueNumber: number;
}

interface PrFilesGeneration {
headSha: string | null;
baseSha: string | null;
}

type JobData = PrMetadataJobData | PrFilesJobData | BackfillRepoJobData;
type JobData =
| PrMetadataJobData
| PrFilesJobData
| BackfillRepoJobData
| IssueClosureJobData;

@Processor(FETCH_QUEUE, { concurrency: 5 })
export class FetchProcessor extends WorkerHost {
Expand Down Expand Up @@ -68,11 +77,46 @@ export class FetchProcessor extends WorkerHost {
await this.handleBackfill(repoFullName, days ?? DEFAULT_BACKFILL_DAYS);
break;
}
case FETCH_JOBS.ISSUE_CLOSURE: {
const { repoFullName, issueNumber } = job.data as IssueClosureJobData;
await this.handleIssueClosure(repoFullName, issueNumber);
break;
}
default:
this.logger.warn(`Unknown job name: ${job.name}`);
}
}

private async handleIssueClosure(
repoFullName: string,
issueNumber: number,
): Promise<void> {
this.logger.log(`Resolving closer for ${repoFullName}#${issueNumber}`);

const issue = await this.issueRepo.findOneBy({
repoFullName,
issueNumber,
});
if (!issue) return;

// Reopens already null out solvedByPr in the webhook handler; never
// re-fetch for an open issue.
if (issue.state !== "CLOSED") {
await this.issueRepo.update(
{ repoFullName, issueNumber },
{ solvedByPr: null },
);
return;
}

const solvedByPr = await this.fetcher.fetchIssueClosingPr(
repoFullName,
issueNumber,
);

await this.issueRepo.update({ repoFullName, issueNumber }, { solvedByPr });
}

private async handlePrMetadata(
repoFullName: string,
prNumber: number,
Expand All @@ -81,26 +125,22 @@ export class FetchProcessor extends WorkerHost {

const { closingIssueNumbers, body, lastEditedAt } =
await this.fetcher.fetchPrMetadata(repoFullName, prNumber);
const currentClosingIssueNumbers =
this.uniqueIssueNumbers(closingIssueNumbers);

await this.prRepo.update(
{ repoFullName, prNumber },
{
closingIssueNumbers,
closingIssueNumbers: currentClosingIssueNumbers,
body,
lastEditedAt,
},
);

// If this PR is merged, mark each linked issue as solved_by_pr
const pr = await this.prRepo.findOneBy({ repoFullName, prNumber });
if (pr?.state === "MERGED" && closingIssueNumbers.length > 0) {
for (const issueNumber of closingIssueNumbers) {
await this.issueRepo.update(
{ repoFullName, issueNumber },
{ solvedByPr: prNumber },
);
}
}
// Issue solver attribution is closure-driven (ISSUE_CLOSURE jobs read
// ClosedEvent.closer). PR metadata only refreshes the PR-side text view
// of which issues this PR claims to close — it never writes
// issues.solved_by_pr.
}

private async handlePrFiles(data: PrFilesJobData): Promise<void> {
Expand Down Expand Up @@ -139,6 +179,10 @@ export class FetchProcessor extends WorkerHost {
);
this.logger.log(`Backfilled ${prs.length} PRs from ${repoFullName}`);

// Fetch and upsert issues before PR metadata jobs can link solved_by_pr.
await this.fetcher.backfillIssues(repoFullName, sinceDate);
this.logger.log(`Backfilled issues from ${repoFullName}`);

// Enqueue follow-up jobs (metadata + files for every PR).
for (const { prNumber, headSha, baseSha } of prs) {
await this.fetchQueue.add(
Expand All @@ -147,7 +191,9 @@ export class FetchProcessor extends WorkerHost {
{
jobId: `meta-${repoFullName}-${prNumber}`,
removeOnComplete: true,
removeOnFail: 50,
// Match the webhook handler — failed metadata jobs must not squat
// on the stable per-PR jobId (#75).
removeOnFail: true,
attempts: 3,
backoff: { type: "exponential", delay: 5000 },
},
Expand All @@ -160,10 +206,6 @@ export class FetchProcessor extends WorkerHost {
baseSha ?? null,
);
}

// Fetch and upsert issues
await this.fetcher.backfillIssues(repoFullName, sinceDate);
this.logger.log(`Backfilled issues from ${repoFullName}`);
}

private async handleStalePrFilesJob(
Expand Down Expand Up @@ -203,7 +245,7 @@ export class FetchProcessor extends WorkerHost {
expectedBaseSha,
),
removeOnComplete: true,
removeOnFail: 50,
removeOnFail: true,
attempts: 3,
backoff: { type: "exponential", delay: 5000 },
},
Expand All @@ -222,4 +264,8 @@ export class FetchProcessor extends WorkerHost {
baseSha: generation.baseSha ?? IsNull(),
};
}

private uniqueIssueNumbers(issueNumbers: number[]): number[] {
return [...new Set(issueNumbers)];
}
}
Loading
Loading