
feat(api): add sequential batch ingestion endpoint (Phase 1) #143

Merged

ishaanxgupta merged 1 commit into XortexAI:main from anirudhaacharyap:feat/batch-ingest on May 4, 2026

Conversation

@anirudhaacharyap
Contributor

Description

Closes #132
This PR implements Phase 1 of the dedicated batch ingestion endpoint, allowing clients to send multiple conversation turns in a single HTTP request to significantly reduce network overhead.

As per the Phase 1 scope, this implementation focuses strictly on the API contract and sequential processing. Concurrency (asyncio.gather), semaphores, and partial success aggregation are deferred to a subsequent PR to keep this implementation minimal and easy to review.

Changes Made

  • Added BatchIngestRequest and BatchIngestResponse schemas to strictly validate batch payloads.
  • Implemented the POST /v1/memory/batch-ingest endpoint in src/api/routes/memory.py (a sketch of the contract is shown below).
  • Reused the existing get_ingest_pipeline() to ensure domain extraction and storage logic remains completely uniform.
  • Added comprehensive unit tests in tests/test_batch_ingest.py to verify standard success workflows and schema validation.

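For reference, a minimal sketch of the contract (field names, the item shape, and where get_ingest_pipeline comes from are illustrative here, not necessarily the exact implementation):

from fastapi import APIRouter
from pydantic import BaseModel, Field

class BatchIngestItem(BaseModel):
    user_message: str
    assistant_message: str

class BatchIngestRequest(BaseModel):
    user_id: str
    items: list[BatchIngestItem] = Field(min_length=1)

class BatchIngestResponse(BaseModel):
    processed: int

router = APIRouter()

@router.post("/v1/memory/batch-ingest", response_model=BatchIngestResponse)
async def batch_ingest(req: BatchIngestRequest) -> BatchIngestResponse:
    pipeline = get_ingest_pipeline()  # existing helper reused from the single-turn path
    for item in req.items:
        # Phase 1: plain sequential loop, no try/except aggregation.
        await pipeline.run(item, user_id=req.user_id)
    return BatchIngestResponse(processed=len(req.items))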

Verification

  • All existing tests pass.
  • New tests for batch-ingest pass.
  • Implementation uses a simple for loop with no try/except aggregation (strict Phase 1 alignment).

@ved015 ved015 self-assigned this May 2, 2026
@ishaanxgupta
Member

ishaanxgupta commented May 2, 2026

@cursor review

@ishaanxgupta ishaanxgupta requested a review from ved015 May 2, 2026 09:34
@ishaanxgupta
Member

ishaanxgupta commented May 2, 2026

@anirudhaacharyap Thanks for the PR, most of it looks good to me. Just a small question: how do you think we could handle the UPDATE step as memories get ingested in parallel in batch mode?

@anirudhaacharyap
Contributor Author

Great question! The UPDATE step is exactly why I chose sequential processing for Phase 1 — it completely sidesteps the race condition since each item passes through the Judge agent one at a time, always seeing the latest state of memory.

For Phase 2 (parallel mode), I think we can handle this with a per-user locking strategy: use a keyed asyncio.Lock per user_id so that items for the same user are still processed sequentially (preserving safe Judge UPDATEs), while items across different users run in parallel via asyncio.gather. This covers the primary batch use case — bulk-importing a single user's chat history — without any risk of stale overwrites, and keeps the implementation straightforward.

import asyncio
from collections import defaultdict

# One lock per user_id; defaultdict creates the lock on first access.
_user_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

async def _process_item(item, user_id, pipeline):
    # Items for the same user are serialized here; other users proceed in parallel.
    async with _user_locks[user_id]:
        return await pipeline.run(...)

# Phase 2 batch endpoint would then do:
results = await asyncio.gather(*[_process_item(item, user_id, pipeline) for item in req.items])
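One note on the sketch above: because the event loop is single-threaded and there is no await between the defaultdict lookup and entering the lock, two coroutines for the same user_id can't race to create two different locks. The dict does grow with distinct user_ids, though, so a long-running server might eventually want to evict idle locks.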

@anirudhaacharyap
Contributor Author

anirudhaacharyap commented May 2, 2026

I also had another idea:

Queue System

The core challenge with parallel batch ingestion is ensuring the Judge agent always sees consistent memory state when deciding ADD/UPDATE/DELETE/NOOP. A queue naturally serializes writes per user while still allowing the API to respond immediately — giving us both correctness and responsiveness.

Benefits

  • Non-blocking API: The endpoint returns immediately with a job_id, so clients aren't waiting for 100 items to process synchronously.
  • Per-user ordering: By routing items to queues keyed by user_id, the Judge agent always evaluates against up-to-date memory — no race conditions.
  • Cross-user parallelism: Multiple Celery workers can process different users' batches simultaneously.
  • Retry & resilience: Failed items can be retried automatically without affecting the rest of the batch.
  • Scalability: Worker count can scale independently of the API server.

Implementation Plan

  1. Phase 1 (this PR): Sequential processing — already merged/ready.
  2. Phase 2: Add Redis + Celery as optional dependencies. Enqueue batch items into a per-user queue and return 202 Accepted with a job_id (enqueue side sketched below).
  3. Phase 3: Add a GET /v1/memory/batch-ingest/{job_id}/status polling endpoint so clients can track progress and retrieve results.
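To make Phase 2 concrete, here is a rough sketch of the enqueue side. The broker URLs, queue layout, and task options are assumptions for illustration, and the job_id-to-task bookkeeping needed for the Phase 3 status endpoint is omitted:

import hashlib
from uuid import uuid4

from celery import Celery

N_QUEUES = 8  # fixed pool of ordered queues, each consumed by a concurrency-1 worker

celery_app = Celery("xmem", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

def _queue_for(user_id: str) -> str:
    # Stable hash so every item for a given user lands on the same queue,
    # preserving per-user ordering for the Judge while other queues run in parallel.
    bucket = int(hashlib.sha1(user_id.encode()).hexdigest(), 16) % N_QUEUES
    return f"ingest.{bucket}"

@celery_app.task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def ingest_item(item: dict, user_id: str) -> None:
    # Would call the existing ingestion pipeline for a single conversation turn.
    ...

def enqueue_batch(items: list[dict], user_id: str) -> str:
    job_id = str(uuid4())
    for item in items:
        ingest_item.apply_async(args=[item, user_id], queue=_queue_for(user_id))
    return job_id  # the endpoint returns this with 202 Accepted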

Trade-offs

I want to be transparent about the downsides:

  • Introduces new infrastructure dependencies (Redis at minimum).
  • Adds operational complexity (running Celery workers alongside the API).
  • Requires a status polling mechanism since the response is no longer synchronous.

That said, XMem already uses MongoDB and Neo4j in its stack, so adding Redis isn't a huge leap — and the Docker Compose setup could bundle it cleanly.

Happy to hear your thoughts on whether this aligns with the project's direction, or if you'd prefer a lighter-weight approach for now!

@ishaanxgupta
Member

Hi @anirudhaacharyap, great research! I am happy with the user-locking strategy; it solves the issue for the case where memory is being ingested for multiple users, which is great.
For a single user, how could we ensure some sort of parallel behaviour? One idea I had: run the ingestion phase (i.e. the classification phase) in parallel, keep the Judge phase sequential, and then run the Weaver phase in parallel again. This could save some time. What are your thoughts on this one?
That said, a global queue would still be needed, since we could get ingestion requests from the MCP, the SDK, and the extension for the same user.

Please let me know your thoughts!

@anirudhaacharyap
Contributor Author

Hi @ishaanxgupta, that's a really great suggestion. I like the idea of splitting the pipeline into stages to balance performance and consistency.

The “fan-out → fan-in → fan-out” approach makes a lot of sense:

  • Classification / extraction (parallel): since this operates only on input text and doesn’t depend on shared memory state, we can safely parallelize this using something like asyncio.gather.
  • Judge phase (sequential): this becomes the critical section where ordering matters, so processing items one-by-one ensures we avoid race conditions when updating memory.
  • Weaver phase (parallel): once the judge has made decisions, we can parallelize the final writes as long as they don’t conflict on shared state.

For a single user, this staged approach seems like a clean way to introduce parallelism while still keeping updates deterministic.
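A rough sketch of what that staged flow could look like, assuming hypothetical classify/judge/weave stage methods on the pipeline (the real stage interfaces may differ):

import asyncio

async def ingest_batch_staged(items, user_id, pipeline):
    # Fan-out: classification only reads the input text, so run all items concurrently.
    classified = await asyncio.gather(*(pipeline.classify(item) for item in items))

    # Fan-in: the Judge reads and mutates shared memory state, so keep it sequential;
    # each decision sees the result of the previous one.
    decisions = []
    for c in classified:
        decisions.append(await pipeline.judge(c, user_id=user_id))

    # Fan-out: Weaver writes are independent once decisions are fixed, so parallelize again.
    return await asyncio.gather(*(pipeline.weave(d) for d in decisions))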

Regarding the global queue, I agree it would still be important to coordinate ingestion requests coming from different sources (MCP, SDK, extension), especially to avoid overlapping updates for the same user.

My thought would be:

  • Keep this PR focused on the sequential baseline (Phase 1)
  • In the next iteration, introduce staged parallelism within the pipeline (starting with parallel classification using asyncio.gather)

Happy to take this up in a follow-up PR once this is merged.

@ishaanxgupta
Member

Sure, thanks for validating the approach; let's merge this one first.
Also, please take a look at approximately how much time we would save when ingesting a batch of 10 pairs.
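(Roughly: if per-pair classification, Judge, and Weaver latencies are c, j, and w, sequential ingestion of 10 pairs costs about 10(c + j + w), while the staged approach costs about c + 10j + w, so the saving should be on the order of 9(c + w). Actual numbers would need measuring.)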

@ishaanxgupta ishaanxgupta merged commit 1dad58b into XortexAI:main May 4, 2026
