Context
src/queue.ts is under the 400-line module cap, but processUpload is a 170-line transaction script that concentrates ingestion concerns inside one nested callback.
processUpload currently handles:
- upload progress loading and idempotency (src/queue.ts:116)
- R2 object/body retrieval (src/queue.ts:125)
- parser selection for JSON vs NDJSON (src/queue.ts:143)
- resume checkpoint math (src/queue.ts:136)
- per-batch validation and text extraction (src/queue.ts:157)
- dedupe checks against KV (src/queue.ts:169)
- embedding generation (src/queue.ts:194)
- ConversationRecord construction (src/queue.ts:198)
- Vectorize/KV upsert and processed markers (src/queue.ts:228)
- progress checkpointing and final status (src/queue.ts:237, src/queue.ts:255)
- parse error accounting/logging (src/queue.ts:246)
This is cohesive enough to tolerate today, but it is the backend ingestion path most likely to become fragile as formats, retries, v2 notebook behavior, or observability expand.
Recommendation
Extract small helpers while keeping the queue consumer behavior unchanged.
Suggested helpers:
- loadUploadStream(env, r2Key)
- selectConversationParser(fileName)
- collectValidConversationItems(env, items)
- buildConversationUpserts(validItems, embeddings, uploadId, context)
- checkpointUploadProgress(env, uploadId, progress, updates)
- finalizeUploadProgress(...)
Consider moving v1 upload ingestion into src/lib/ingestion-v1.ts, leaving src/queue.ts as message routing between v1 and v2 ingestion.
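As an illustration of the kind of small, pure helper meant here, selectConversationParser could look roughly like the sketch below. This is not the current code in src/queue.ts: the ConversationParserKind type, the extension list (.ndjson, .jsonl), and the fallback to JSON are all assumptions made for the example, and the real helper may return a parser function rather than a tag.

```typescript
// Hypothetical tag for the two parser families named in this issue.
// The real module may return a parser function instead of a string tag.
type ConversationParserKind = "json" | "ndjson";

// Assumption: the upload format is inferred from the file extension,
// and anything that is not recognisably NDJSON falls back to JSON.
function selectConversationParser(fileName: string): ConversationParserKind {
  const lower = fileName.trim().toLowerCase();
  const isNdjson = lower.endsWith(".ndjson") || lower.endsWith(".jsonl");
  return isNdjson ? "ndjson" : "json";
}
```

Keeping the decision in one function with no env access means it can be unit tested without R2, KV, or Vectorize bindings.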
Acceptance criteria
- handleIngestion remains the queue entrypoint.
- V1 upload ingestion behavior is unchanged for JSON arrays, JSON object maps, and NDJSON.
- Resume/checkpoint behavior remains unchanged.
- Existing stream-parser tests still pass.
- Add targeted tests for any extracted pure helpers, especially parser selection and record construction.
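For record construction, a pure helper is straightforward to cover with a targeted test. The sketch below is only a possible shape for buildConversationUpserts: the ValidConversationItem, UpsertContext, and ConversationUpsert types and all of their fields are hypothetical stand-ins, since the actual ConversationRecord layout is not shown in this issue.

```typescript
// Hypothetical shapes; the real ConversationRecord fields may differ.
interface ValidConversationItem {
  id: string;
  text: string;
}

interface UpsertContext {
  userId: string;
  createdAt: string;
}

interface ConversationUpsert {
  id: string;
  values: number[];
  metadata: { uploadId: string; userId: string; createdAt: string; text: string };
}

// Pairs each validated item with its embedding by index.
// Throws instead of silently dropping items when the inputs disagree in length.
function buildConversationUpserts(
  validItems: readonly ValidConversationItem[],
  embeddings: readonly (readonly number[])[],
  uploadId: string,
  context: UpsertContext,
): ConversationUpsert[] {
  if (validItems.length !== embeddings.length) {
    throw new Error(
      `expected ${validItems.length} embeddings, got ${embeddings.length}`,
    );
  }
  return validItems.map((item, index) => {
    const values = embeddings[index];
    if (values === undefined) {
      throw new Error(`missing embedding for item ${item.id}`);
    }
    return {
      id: item.id,
      values: [...values],
      metadata: {
        uploadId,
        userId: context.userId,
        createdAt: context.createdAt,
        text: item.text,
      },
    };
  });
}
```

A test for a helper like this only needs plain arrays as input, so it can sit next to the existing stream-parser tests without mocking any bindings.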
Verification baseline
Current baseline from the audit:
- npm test passes: 12 tests across 2 files.
- npm run type-check passes.