diff --git a/cmd/server/main.go b/cmd/server/main.go index 4c9b284..697ecb9 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -37,6 +37,7 @@ import ( "github.com/hallelx2/vectorless-engine/pkg/queue" "github.com/hallelx2/vectorless-engine/pkg/retrieval" "github.com/hallelx2/vectorless-engine/pkg/storage" + "github.com/hallelx2/vectorless-engine/pkg/tree" "github.com/hallelx2/vectorless-engine/internal/config" "github.com/hallelx2/vectorless-engine/internal/handler" @@ -133,7 +134,7 @@ func run() error { if err != nil { return fmt.Errorf("init llm: %w", err) } - strategy := buildStrategy(cfg.Engine.Retrieval, llmClient, store) + strategy := buildStrategy(cfg.Engine.Retrieval, llmClient, store, pool) // Wrap with caching if enabled in engine config. if cfg.Engine.Retrieval.Cache.Enabled { @@ -154,6 +155,44 @@ func run() error { // Multi-document query dispatcher. multiDoc := retrieval.NewMultiDoc(strategy, pool.LoadTree) + // Pre-built set of selectable strategies, keyed by config name. + // Backs the per-request "strategy" override on /v1/query so the + // benchmark can A/B chunked-tree vs pageindex against this same + // running engine without a redeploy. Built from the raw client so + // each override behaves identically to booting with that strategy + // as the default (no shared cache wrapper across overrides). + strategies := buildStrategySet(cfg.Engine.Retrieval, llmClient, store, pool) + + // Replay store: every /v1/answer and /v1/answer/pageindex response + // is stamped with a deterministic trace_token and its body bytes + // persisted here so /v1/replay can return them verbatim. On by + // default; operators opt out via retrieval.replay.enabled=false. 
+ var replayStore retrieval.ReplayStore + if cfg.Engine.Retrieval.Replay.Enabled { + replayStore = retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{ + MaxEntries: cfg.Engine.Retrieval.Replay.MaxEntries, + TTL: time.Duration(cfg.Engine.Retrieval.Replay.TTLSeconds) * time.Second, + }) + logger.Info("retrieval: replay store enabled", + "max_entries", cfg.Engine.Retrieval.Replay.MaxEntries, + "ttl_seconds", cfg.Engine.Retrieval.Replay.TTLSeconds, + ) + } + + // /v1/answer/pageindex gets its OWN PageIndexStrategy instance, + // independent of whatever selection strategy retrieval.strategy + // chose, so the endpoint is always available (gated by + // retrieval.pageindex.enabled) even on a chunked-tree deployment. + var pageIndexStrategy *retrieval.PageIndexStrategy + if cfg.Engine.Retrieval.PageIndex.Enabled && llmClient != nil { + pageIndexStrategy = buildPageIndexStrategy(cfg.Engine.Retrieval, llmClient, store, pool) + logger.Info("retrieval: pageindex answer endpoint enabled", + "max_hops", pageIndexStrategy.MaxHops, + "page_content_limit", pageIndexStrategy.PageContentLimit, + "model_override", cfg.Engine.Retrieval.PageIndex.Model, + ) + } + // ── Ingest pipeline ─────────────────────────────────────────── pipeline := ingest.NewPipeline(ingest.Pipeline{ DB: pool, @@ -201,14 +240,22 @@ func run() error { // Only start the HTTP server in "server" role. 
if *role == "server" { deps := handler.Deps{ - Logger: logger, - DB: pool, - Storage: store, - Queue: q, - Strategy: strategy, - MultiDoc: multiDoc, - Version: version, - Config: cfg, + Logger: logger, + DB: pool, + Storage: store, + Queue: q, + Strategy: strategy, + MultiDoc: multiDoc, + Version: version, + Config: cfg, + Strategies: strategies, + LLM: llmClient, + LLMModel: modelFor(cfg.Engine.LLM), + AnswerSpan: cfg.Engine.Retrieval.AnswerSpan, + Answer: cfg.Engine.Retrieval.Answer, + Replay: replayStore, + PageIndexStrategy: pageIndexStrategy, + PageIndex: cfg.Engine.Retrieval.PageIndex, } srv := &http.Server{ @@ -324,6 +371,21 @@ func buildQueue(c enginecfg.QueueConfig, dbURL string) (queue.Queue, error) { } } +// modelFor returns the configured chat/general-purpose model name for +// the selected LLM driver. Used as the engine-default fallback when an +// API request omits an explicit model (answer + answer/pageindex). +func modelFor(c enginecfg.LLMConfig) string { + switch c.Driver { + case "anthropic": + return c.Anthropic.Model + case "openai": + return c.OpenAI.Model + case "gemini": + return c.Gemini.Model + } + return "" +} + func buildLLM(c enginecfg.LLMConfig) (llmgate.Client, error) { switch c.Driver { case "anthropic": @@ -349,7 +411,11 @@ func buildLLM(c enginecfg.LLMConfig) (llmgate.Client, error) { } } -func buildStrategy(c enginecfg.RetrievalConfig, client llmgate.Client, store storage.Storage) retrieval.Strategy { +// buildStrategy constructs the retrieval strategy named by +// retrieval.strategy. The DB pool is threaded through so the +// pageindex strategy can wire a TOC provider that reads +// documents.toc_tree (the other strategies ignore it). 
+func buildStrategy(c enginecfg.RetrievalConfig, client llmgate.Client, store storage.Storage, pool *db.Pool) retrieval.Strategy { switch c.Strategy { case "single-pass": return retrieval.NewSinglePass(client) @@ -362,11 +428,109 @@ func buildStrategy(c enginecfg.RetrievalConfig, client llmgate.Client, store sto } a.ModelOverride = c.Agentic.Model return a + case "pageindex": + return buildPageIndexStrategy(c, client, store, pool) default: return retrieval.NewChunkedTree(client) } } +// buildStrategySet pre-builds one instance of every selectable +// strategy, keyed by its config name. The deployed /v1/query handler +// uses this map to honour a per-request "strategy" override without +// rebuilding a strategy on the hot path: selection is a map lookup. +// +// This is what lets the benchmark A/B chunked-tree vs pageindex +// against the SAME running engine — no redeploy, no config flip. The +// caps (agentic max-hops, pageindex page limits, model overrides) come +// from the same config blocks the default builder reads, so an +// override behaves identically to booting with that strategy as the +// default. +func buildStrategySet(c enginecfg.RetrievalConfig, client llmgate.Client, store storage.Storage, pool *db.Pool) map[string]retrieval.Strategy { + agentic := retrieval.NewAgentic(client, storageFetcher{s: store}) + if c.Agentic.MaxHops > 0 { + agentic.MaxHops = c.Agentic.MaxHops + } + agentic.ModelOverride = c.Agentic.Model + + return map[string]retrieval.Strategy{ + "single-pass": retrieval.NewSinglePass(client), + "chunked-tree": retrieval.NewChunkedTree(client), + "agentic": agentic, + "pageindex": buildPageIndexStrategy(c, client, store, pool), + } +} + +// buildPageIndexStrategy constructs the page-based agentic strategy +// with the storage-backed PageLoader, a DB-backed TOC provider, and +// the configured caps. Ported from cmd/engine so the DEPLOYED +// cmd/server binary can serve retrieval.strategy=pageindex AND the +// /v1/answer/pageindex endpoint. 
+// +// The TOC provider reads documents.toc_tree via the worker-scoped +// document lookup. The strategy degrades to its synthesised view +// (built from the loaded section tree) whenever the column is NULL or +// the read errors, so a document ingested before the TOC builder ran +// still navigates cleanly. +func buildPageIndexStrategy(c enginecfg.RetrievalConfig, client llmgate.Client, store storage.Storage, pool *db.Pool) *retrieval.PageIndexStrategy { + p := retrieval.NewPageIndexStrategy(client) + p.PageLoader = storagePageLoader{s: store} + if pool != nil { + p.TOC = dbTOCProvider{db: pool} + } + if c.PageIndex.MaxHops > 0 { + p.MaxHops = c.PageIndex.MaxHops + } + if c.PageIndex.PageContentLimit > 0 { + p.PageContentLimit = c.PageIndex.PageContentLimit + } + p.ModelOverride = c.PageIndex.Model + return p +} + +// storagePageLoader adapts a storage.Storage to +// retrieval.PageContentLoader. Mirrors storageFetcher but lives behind +// a separate interface so the two callers (agentic / pageindex) can be +// wired independently. The PageIndex strategy materialises section +// bodies once per get_pages observation, so reading the full reader +// into a []byte is the right shape. +type storagePageLoader struct{ s storage.Storage } + +func (l storagePageLoader) Load(ctx context.Context, ref string) ([]byte, error) { + rc, _, err := l.s.Get(ctx, ref) + if err != nil { + return nil, err + } + defer rc.Close() + return io.ReadAll(rc) +} + +// dbTOCProvider adapts the DB pool to retrieval.TOCProvider. It reads +// the persisted documents.toc_tree JSONB and returns it verbatim for +// the get_document_structure tool. A NULL column (the "not yet +// generated" state) surfaces as retrieval.ErrNoTOC, which the strategy +// treats as a graceful-degrade signal: it synthesises the TOC view +// from the section tree instead of failing the request. +// +// GetTOC carries only a document ID (the TOCProvider contract), so the +// lookup uses the worker-scoped accessor. 
That is safe here: the +// caller has already resolved + authorised the tree for this document +// via the org-scoped LoadTree before the strategy ever calls GetTOC, +// and the TOC tree is the same structural metadata (titles + page +// ranges, no bodies) already present on that authorised tree. +type dbTOCProvider struct{ db *db.Pool } + +func (p dbTOCProvider) GetTOC(ctx context.Context, docID tree.DocumentID) ([]byte, error) { + doc, err := p.db.GetDocumentForWorker(ctx, docID) + if err != nil { + return nil, err + } + if len(doc.TOCTree) == 0 { + return nil, retrieval.ErrNoTOC + } + return doc.TOCTree, nil +} + // storageFetcher adapts a storage.Storage to retrieval.ContentFetcher. // The agentic strategy reads section bodies one at a time, so we // materialize the full reader contents into a []byte here rather than diff --git a/internal/handler/answer.go b/internal/handler/answer.go new file mode 100644 index 0000000..b9f1626 --- /dev/null +++ b/internal/handler/answer.go @@ -0,0 +1,403 @@ +package handler + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "strings" + "sync" + "time" + + "github.com/hallelx2/llmgate" + + enginecfg "github.com/hallelx2/vectorless-engine/pkg/config" + "github.com/hallelx2/vectorless-engine/pkg/db" + "github.com/hallelx2/vectorless-engine/pkg/retrieval" + "github.com/hallelx2/vectorless-engine/pkg/storage" + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// AnswerHandler implements POST /v1/answer: retrieval + per-section +// answer-span extraction + a synthesis LLM call, returning a +// quote-grounded answer plus citations in one round-trip. Every +// citation carries a section ID, page range (when known), and the +// verbatim quote the answer relies on. 
//
// Ported from cmd/engine's internal/api.handleAnswer, adapted to the
// deployed server's multi-tenant model: the org + store come from the
// X-Vectorless-Org / X-Vectorless-Store headers rather than the
// standalone nil-UUID org.
type AnswerHandler struct {
	// logger receives the handler's error and warning logs.
	logger *slog.Logger
	// db resolves the org/store-scoped document tree via LoadTree.
	db *db.Pool
	// storage serves section bodies by their ContentRef.
	storage storage.Storage
	// strategy selects section IDs for a query; nil makes the endpoint
	// answer 503.
	strategy retrieval.Strategy
	// llm runs span extraction and synthesis; nil makes the endpoint
	// answer 501.
	llm llmgate.Client
	// llmModel is the last-resort model name, used when neither the
	// config block nor the request names one.
	llmModel string
	// answerSpan supplies the span-extraction model override, the
	// maximum quote length and the extraction concurrency.
	answerSpan enginecfg.AnswerSpanBlock
	// answer supplies the synthesis model override and the configured
	// defaults for max sections / max answer tokens.
	answer enginecfg.AnswerBlock
	// replay is handed to writeJSONWithReplay together with the
	// response bytes and trace token.
	replay retrieval.ReplayStore
}

// NewAnswerHandler creates an AnswerHandler. llm may be nil, in which
// case the endpoint returns 501; replay may be nil, which skips
// replay capture.
//
// The arguments are stored as-is on the handler; nothing is validated
// or copied here.
func NewAnswerHandler(
	logger *slog.Logger,
	pool *db.Pool,
	store storage.Storage,
	strategy retrieval.Strategy,
	llm llmgate.Client,
	llmModel string,
	answerSpan enginecfg.AnswerSpanBlock,
	answer enginecfg.AnswerBlock,
	replay retrieval.ReplayStore,
) *AnswerHandler {
	return &AnswerHandler{
		logger:     logger,
		db:         pool,
		storage:    store,
		strategy:   strategy,
		llm:        llm,
		llmModel:   llmModel,
		answerSpan: answerSpan,
		answer:     answer,
		replay:     replay,
	}
}

// answerRequest is the JSON body for POST /v1/answer.
type answerRequest struct {
	// DocumentID and Query are required; an empty value is rejected
	// with 400.
	DocumentID tree.DocumentID `json:"document_id"`
	Query      string          `json:"query"`
	// Model is optional. It is passed to retrieval as the budget's
	// model name and is the fallback for span extraction and synthesis
	// when the corresponding config block names no model.
	Model string `json:"model"`
	// MaxTokens, ReservedForPrompt and MaxParallelCalls shape the
	// retrieval context budget; a zero value selects the handler
	// default (100000, 4000 and 8 respectively).
	MaxTokens         int `json:"max_tokens"`
	ReservedForPrompt int `json:"reserved_for_prompt"`
	MaxParallelCalls  int `json:"max_parallel_calls"`
	// MaxSections caps how many selected sections are used; a
	// non-positive value falls back to the configured value, then 5.
	MaxSections int `json:"max_sections"`
	// MaxAnswerTokens caps the synthesis output; a non-positive value
	// falls back to the configured value, then 1024.
	MaxAnswerTokens int `json:"max_answer_tokens"`
}

// HandleAnswer runs retrieval, extracts a grounding quote per
// returned section, synthesises a final answer, and returns it with
// per-section citations.
//
// Status codes written by the handler itself: 501 when no LLM client
// is configured, 503 when no retrieval strategy is configured, 400 for
// an undecodable body or a missing document_id/query, 404 when the
// document is not found, and 500 when tree loading, retrieval or
// synthesis fails.
+func (h *AnswerHandler) HandleAnswer(w http.ResponseWriter, r *http.Request) { + orgID, ok := requireOrgID(w, r) + if !ok { + return + } + if h.llm == nil { + writeErr(w, http.StatusNotImplemented, "answer endpoint requires an LLM client") + return + } + if h.strategy == nil { + writeErr(w, http.StatusServiceUnavailable, "no retrieval strategy configured") + return + } + + var body answerRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if body.DocumentID == "" || body.Query == "" { + writeErr(w, http.StatusBadRequest, "document_id and query are required") + return + } + + t, err := h.db.LoadTree(r.Context(), body.DocumentID, orgID, storeID(r)) + if err != nil { + if errors.Is(err, db.ErrNotFound) { + writeErr(w, http.StatusNotFound, "document not found") + return + } + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + + budget := retrieval.ContextBudget{ + ModelName: body.Model, + MaxTokens: body.MaxTokens, + ReservedForPrompt: body.ReservedForPrompt, + MaxParallelCalls: body.MaxParallelCalls, + } + if budget.MaxTokens == 0 { + budget.MaxTokens = 100000 + } + if budget.ReservedForPrompt == 0 { + budget.ReservedForPrompt = 4000 + } + if budget.MaxParallelCalls == 0 { + budget.MaxParallelCalls = 8 + } + + started := time.Now() + totalUsage := retrieval.Usage{} + + ids, retrievalUsage, err := h.runSelection(r.Context(), t, body.Query, budget) + if err != nil { + h.logger.Error("answer: strategy failed", "err", err, "document_id", body.DocumentID) + writeErr(w, http.StatusInternalServerError, "retrieval failed: "+err.Error()) + return + } + totalUsage.Add(retrievalUsage) + + maxSections := body.MaxSections + if maxSections <= 0 { + maxSections = h.answer.MaxSections + } + if maxSections <= 0 { + maxSections = 5 + } + if len(ids) > maxSections { + ids = ids[:maxSections] + } + + // Load each section's content. 
+ enriched := make([]answerSection, 0, len(ids)) + for _, id := range ids { + sec := t.FindByID(id) + if sec == nil { + continue + } + var content string + if sec.ContentRef != "" { + rc, _, getErr := h.storage.Get(r.Context(), sec.ContentRef) + if getErr == nil { + raw, _ := io.ReadAll(rc) + rc.Close() + content = string(raw) + } + } + enriched = append(enriched, answerSection{sec: sec, content: content}) + } + + // Always extract spans for /v1/answer — they ground each citation. + spanExtractor := h.spanExtractor(body.Model) + runAnswerSpansConcurrent(r.Context(), spanExtractor, body.Query, enriched, h.answerSpan.MaxConcurrency, h.logger) + + // Synthesise the final answer from the retrieved evidence. + synthModel := h.answer.Model + if synthModel == "" { + synthModel = body.Model + } + if synthModel == "" { + synthModel = h.llmModel + } + maxAnswerTokens := body.MaxAnswerTokens + if maxAnswerTokens <= 0 { + maxAnswerTokens = h.answer.MaxAnswerTokens + } + if maxAnswerTokens <= 0 { + maxAnswerTokens = 1024 + } + + answerText, synthUsage, err := synthesiseAnswer(r.Context(), h.llm, synthModel, body.Query, enriched, maxAnswerTokens) + if err != nil { + writeErr(w, http.StatusInternalServerError, "synthesis failed: "+err.Error()) + return + } + totalUsage.Add(synthUsage) + + citations := make([]map[string]any, 0, len(enriched)) + finalIDs := make([]tree.SectionID, 0, len(enriched)) + for _, e := range enriched { + finalIDs = append(finalIDs, e.sec.ID) + c := map[string]any{ + "section_id": e.sec.ID, + "title": e.sec.Title, + } + if e.sec.PageStart > 0 { + c["page_start"] = e.sec.PageStart + } + if e.sec.PageEnd > 0 { + c["page_end"] = e.sec.PageEnd + } + if e.span != nil && e.span.Text != "" { + c["quote"] = e.span.Text + if e.span.Start >= 0 && e.span.End > e.span.Start { + c["quote_start"] = e.span.Start + c["quote_end"] = e.span.End + } + } + citations = append(citations, c) + } + + // Trace token hashes over the final IDs that ground the answer + + // the 
synthesis model. Different synth models for the same + // retrieval set produce different answers and therefore different + // tokens. + traceToken := retrieval.ComputeTraceToken(body.DocumentID, "1", synthModel, finalIDs) + + resp := map[string]any{ + "document_id": body.DocumentID, + "query": body.Query, + "answer": answerText, + "citations": citations, + "strategy": h.strategy.Name(), + "model": synthModel, + "usage": map[string]any{ + "input_tokens": totalUsage.InputTokens, + "output_tokens": totalUsage.OutputTokens, + "total_tokens": totalUsage.TotalTokens, + "cost_usd": totalUsage.CostUSD, + "llm_calls": totalUsage.LLMCalls, + }, + "elapsed_ms": time.Since(started).Milliseconds(), + "trace_token": traceToken, + } + + raw, err := marshalJSONForReplay(resp) + if err != nil { + writeJSON(w, http.StatusOK, resp) + return + } + writeJSONWithReplay(w, h.replay, http.StatusOK, raw, traceToken, retrieval.ReplayEntry{ + DocumentID: body.DocumentID, + Query: body.Query, + Model: synthModel, + SelectedIDs: finalIDs, + }) +} + +// runSelection picks section IDs for the query, surfacing cost when +// the strategy implements CostStrategy. +func (h *AnswerHandler) runSelection(ctx context.Context, t *tree.Tree, query string, budget retrieval.ContextBudget) ([]tree.SectionID, retrieval.Usage, error) { + if cs, ok := h.strategy.(retrieval.CostStrategy); ok { + res, err := cs.SelectWithCost(ctx, t, query, budget) + if err != nil { + return nil, retrieval.Usage{}, err + } + if res == nil { + return nil, retrieval.Usage{}, nil + } + return res.SelectedIDs, res.Usage, nil + } + ids, err := h.strategy.Select(ctx, t, query, budget) + if err != nil { + return nil, retrieval.Usage{}, err + } + return ids, retrieval.Usage{}, nil +} + +// spanExtractor builds a SpanExtractor honouring the configured model +// override, with a fall-through to the request's model then the +// engine default. 
+func (h *AnswerHandler) spanExtractor(requestModel string) *retrieval.SpanExtractor { + model := h.answerSpan.Model + if model == "" { + model = requestModel + } + if model == "" { + model = h.llmModel + } + ext := retrieval.NewSpanExtractor(h.llm, model) + if h.answerSpan.MaxQuoteLen > 0 { + ext.MaxQuoteLen = h.answerSpan.MaxQuoteLen + } + return ext +} + +// answerSection bundles a tree section with its loaded content and +// the extracted answer span. Shared by /v1/answer. +type answerSection struct { + sec *tree.Section + content string + span *retrieval.AnswerSpan +} + +// runAnswerSpansConcurrent fans out span extraction across secs with a +// max-concurrency semaphore. Each extraction's outcome is written back +// into the matching slot's span field. Errors are logged and dropped — +// span extraction is best-effort. +func runAnswerSpansConcurrent(ctx context.Context, extractor *retrieval.SpanExtractor, query string, secs []answerSection, maxConcurrency int, logger *slog.Logger) { + if maxConcurrency <= 0 { + maxConcurrency = 4 + } + sem := make(chan struct{}, maxConcurrency) + var wg sync.WaitGroup + for i := range secs { + i := i + if strings.TrimSpace(secs[i].content) == "" { + continue + } + wg.Add(1) + go func() { + defer wg.Done() + select { + case sem <- struct{}{}: + defer func() { <-sem }() + case <-ctx.Done(): + return + } + span, _, err := extractor.Extract(ctx, secs[i].content, query) + if err != nil { + if logger != nil { + logger.Warn("answer-span: extract failed", "section_id", secs[i].sec.ID, "err", err) + } + return + } + secs[i].span = span + }() + } + wg.Wait() +} + +// synthesiseAnswer runs one LLM call producing the final answer from +// retrieved sections + their extracted spans. The model is told to +// cite by section ID. 
+func synthesiseAnswer(ctx context.Context, client llmgate.Client, model, query string, secs []answerSection, maxAnswerTokens int) (string, retrieval.Usage, error) { + var b strings.Builder + b.WriteString("You are answering a user's question using ONLY the evidence below.\n\n") + b.WriteString("User query:\n") + b.WriteString(query) + b.WriteString("\n\nRetrieved evidence (each block is a section of the document):\n") + for i, e := range secs { + fmt.Fprintf(&b, "\n[%d] section_id=%s, title=%q", i+1, e.sec.ID, e.sec.Title) + if e.sec.PageStart > 0 { + fmt.Fprintf(&b, ", pages=%d-%d", e.sec.PageStart, e.sec.PageEnd) + } + b.WriteString("\n") + if e.span != nil && e.span.Text != "" { + fmt.Fprintf(&b, "Most relevant quote: %q\n", e.span.Text) + } + // Always include some content so the model isn't blind when the + // span extractor returned nothing. + if e.content != "" { + snippet := e.content + if len(snippet) > 4000 { + snippet = snippet[:4000] + } + fmt.Fprintf(&b, "Section content:\n%s\n", snippet) + } + } + b.WriteString("\nWrite a concise answer to the user's query. ") + b.WriteString("If the evidence does not contain an answer, say so. ") + b.WriteString("Inline citations should reference the section_id values shown above. ") + b.WriteString("Output plain prose; no JSON.") + + req := llmgate.Request{ + Model: model, + Messages: []llmgate.Message{ + {Role: llmgate.RoleSystem, Content: "You synthesise grounded answers from retrieved document sections. 
Never invent facts; only cite what the evidence shows."}, + {Role: llmgate.RoleUser, Content: b.String()}, + }, + MaxTokens: maxAnswerTokens, + Temperature: 0, + } + resp, err := client.Complete(ctx, req) + if err != nil { + return "", retrieval.Usage{}, err + } + return strings.TrimSpace(resp.Content), retrieval.Usage{ + InputTokens: resp.Usage.InputTokens, + OutputTokens: resp.Usage.OutputTokens, + TotalTokens: resp.Usage.TotalTokens, + CostUSD: resp.Usage.CostUSD, + LLMCalls: 1, + }, nil +} diff --git a/internal/handler/answer_pageindex.go b/internal/handler/answer_pageindex.go new file mode 100644 index 0000000..fb207f0 --- /dev/null +++ b/internal/handler/answer_pageindex.go @@ -0,0 +1,471 @@ +package handler + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "sort" + "strings" + "sync" + "time" + + "github.com/hallelx2/llmgate" + + enginecfg "github.com/hallelx2/vectorless-engine/pkg/config" + "github.com/hallelx2/vectorless-engine/pkg/db" + "github.com/hallelx2/vectorless-engine/pkg/retrieval" + "github.com/hallelx2/vectorless-engine/pkg/storage" + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// AnswerPageIndexHandler implements POST /v1/answer/pageindex: it runs +// the PageIndex agentic loop end-to-end and returns the model's answer +// plus page-grounded citations in one round-trip. Unlike /v1/answer, +// the loop owns the answer — there is no separate synthesis call. +// +// Ported from cmd/engine's internal/api.handleAnswerPageIndex, adapted +// to the deployed server's multi-tenant model (org + store from +// headers). +type AnswerPageIndexHandler struct { + logger *slog.Logger + db *db.Pool + storage storage.Storage + llm llmgate.Client + llmModel string + answerSpan enginecfg.AnswerSpanBlock + replay retrieval.ReplayStore + strategy *retrieval.PageIndexStrategy + pageIndex enginecfg.PageIndexBlock + + // treeLoader is a test seam overriding how the handler resolves + // the document tree. 
Nil routes through the org-scoped DB lookup + // (the production path). Tests set it to a deterministic in-memory + // function so the handler can run end-to-end via httptest without + // a real Postgres backend. + treeLoader func(ctx context.Context, orgID, storeID string, docID tree.DocumentID) (*tree.Tree, error) +} + +// NewAnswerPageIndexHandler creates an AnswerPageIndexHandler. llm, +// replay, and strategy may be nil; a nil llm or strategy (or +// PageIndex.Enabled=false) makes the endpoint return 501. +func NewAnswerPageIndexHandler( + logger *slog.Logger, + pool *db.Pool, + store storage.Storage, + llm llmgate.Client, + llmModel string, + answerSpan enginecfg.AnswerSpanBlock, + replay retrieval.ReplayStore, + strategy *retrieval.PageIndexStrategy, + pageIndex enginecfg.PageIndexBlock, +) *AnswerPageIndexHandler { + return &AnswerPageIndexHandler{ + logger: logger, + db: pool, + storage: store, + llm: llm, + llmModel: llmModel, + answerSpan: answerSpan, + replay: replay, + strategy: strategy, + pageIndex: pageIndex, + } +} + +// loadTree resolves the document tree for the pageindex answer +// endpoint, routing through the test seam when set. +func (h *AnswerPageIndexHandler) loadTree(ctx context.Context, orgID, storeID string, docID tree.DocumentID) (*tree.Tree, error) { + if h.treeLoader != nil { + return h.treeLoader(ctx, orgID, storeID, docID) + } + return h.db.LoadTree(ctx, docID, orgID, storeID) +} + +// pageIndexAnswerRequest is the body shape for /v1/answer/pageindex. +type pageIndexAnswerRequest struct { + DocumentID tree.DocumentID `json:"document_id"` + Query string `json:"query"` + Model string `json:"model"` + MaxHops int `json:"max_hops"` + MaxPagesPerFetch int `json:"max_pages_per_fetch"` // chars cap; named per the spec + Stream bool `json:"stream"` + IncludeReasoning bool `json:"reasoning"` +} + +// HandleAnswerPageIndex runs the PageIndex agentic loop and returns +// the answer + page-grounded citations. 
Supports an SSE streaming +// variant (stream=true) and an opt-in reasoning trace +// (reasoning=true, or ?reasoning=true). +func (h *AnswerPageIndexHandler) HandleAnswerPageIndex(w http.ResponseWriter, r *http.Request) { + orgID, ok := requireOrgID(w, r) + if !ok { + return + } + if h.llm == nil { + writeErr(w, http.StatusNotImplemented, "answer/pageindex endpoint requires an LLM client") + return + } + if h.strategy == nil || !h.pageIndex.Enabled { + writeErr(w, http.StatusNotImplemented, "pageindex strategy not configured on this server (retrieval.pageindex.enabled=false)") + return + } + + var body pageIndexAnswerRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if body.DocumentID == "" || body.Query == "" { + writeErr(w, http.StatusBadRequest, "document_id and query are required") + return + } + // Allow ?reasoning=true as an alternative to the body field. + if r.URL.Query().Get("reasoning") == "true" { + body.IncludeReasoning = true + } + + t, err := h.loadTree(r.Context(), orgID, storeID(r), body.DocumentID) + if err != nil { + if errors.Is(err, db.ErrNotFound) { + writeErr(w, http.StatusNotFound, "document not found") + return + } + writeErr(w, http.StatusInternalServerError, err.Error()) + return + } + + // Build a per-request copy of the shared strategy so per-request + // overrides (max_hops, max_pages_per_fetch, OnEvent, and the + // org-scoped TOC provider) never mutate the shared instance that + // other goroutines read concurrently. + perReq := *h.strategy + if body.MaxHops > 0 { + perReq.MaxHops = body.MaxHops + } + if body.MaxPagesPerFetch > 0 { + perReq.PageContentLimit = body.MaxPagesPerFetch + } + // Scope the TOC provider to the requesting org/store so the + // get_document_structure tool reads only this tenant's + // documents.toc_tree. 
Without a DB handle (tests) the strategy + // keeps whatever TOC it was constructed with (often nil → + // synthesised view). + if h.db != nil { + perReq.TOC = scopedTOCProvider{db: h.db, orgID: orgID, storeID: storeID(r)} + } + + budget := retrieval.ContextBudget{ModelName: body.Model} + if budget.ModelName == "" { + budget.ModelName = h.llmModel + } + + started := time.Now() + + // Stream variant: hijack the response writer for SSE. + if body.Stream { + h.serveStream(w, r, &perReq, t, body, budget, started) + return + } + + // Non-streaming: optionally capture the reasoning trace. + var ( + traceMu sync.Mutex + trace []map[string]any + ) + if body.IncludeReasoning { + perReq.OnEvent = func(ev retrieval.PageIndexEvent) { + traceMu.Lock() + defer traceMu.Unlock() + trace = append(trace, pageIndexEventToTraceMap(ev)) + } + } + + res, err := perReq.SelectWithCost(r.Context(), t, body.Query, budget) + if err != nil { + h.logger.Error("answer/pageindex: strategy failed", "err", err, "document_id", body.DocumentID) + writeErr(w, http.StatusInternalServerError, "pageindex strategy failed: "+err.Error()) + return + } + + citations := h.buildCitations(r.Context(), t, res, body.Query, body.Model) + + resp := map[string]any{ + "document_id": body.DocumentID, + "query": body.Query, + "answer": res.Reasoning, // strategy stores the agent's answer here + "citations": citations, + "strategy": perReq.Name(), + "model": budget.ModelName, + "hops_taken": res.HopsTaken, + "usage": map[string]any{ + "input_tokens": res.Usage.InputTokens, + "output_tokens": res.Usage.OutputTokens, + "total_tokens": res.Usage.TotalTokens, + "cost_usd": res.Usage.CostUSD, + "llm_calls": res.Usage.LLMCalls, + }, + "elapsed_ms": time.Since(started).Milliseconds(), + "trace_token": res.TraceToken, + "pages_read": res.PagesRead, + } + if body.IncludeReasoning && len(trace) > 0 { + resp["reasoning_trace"] = trace + } + + finalIDs := append([]tree.SectionID(nil), res.SelectedIDs...) 
+ raw, err := marshalJSONForReplay(resp) + if err != nil { + writeJSON(w, http.StatusOK, resp) + return + } + writeJSONWithReplay(w, h.replay, http.StatusOK, raw, res.TraceToken, retrieval.ReplayEntry{ + DocumentID: body.DocumentID, + Query: body.Query, + Model: budget.ModelName, + SelectedIDs: finalIDs, + }) +} + +// serveStream handles the stream=true SSE variant. Each tool call +// emits one event so the caller can watch navigation in real time; +// the final "answer" event carries the full JSON response. +func (h *AnswerPageIndexHandler) serveStream(w http.ResponseWriter, r *http.Request, strat *retrieval.PageIndexStrategy, t *tree.Tree, body pageIndexAnswerRequest, budget retrieval.ContextBudget, started time.Time) { + flusher, ok := w.(http.Flusher) + if !ok { + writeErr(w, http.StatusInternalServerError, "streaming requires http.Flusher; response writer does not support it") + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.WriteHeader(http.StatusOK) + + var writeMu sync.Mutex + emitSSE := func(eventType string, payload any) { + raw, err := json.Marshal(payload) + if err != nil { + return + } + writeMu.Lock() + defer writeMu.Unlock() + fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, raw) + flusher.Flush() + } + + strat.OnEvent = func(ev retrieval.PageIndexEvent) { + emitSSE(ev.Type, ev) + } + + emitSSE("started", map[string]any{ + "document_id": body.DocumentID, + "query": body.Query, + "strategy": strat.Name(), + "model": budget.ModelName, + }) + + res, err := strat.SelectWithCost(r.Context(), t, body.Query, budget) + if err != nil { + emitSSE("error", map[string]string{"error": err.Error()}) + return + } + + citations := h.buildCitations(r.Context(), t, res, body.Query, body.Model) + final := map[string]any{ + "document_id": body.DocumentID, + "query": body.Query, + "answer": res.Reasoning, + "citations": citations, + "strategy": 
strat.Name(), + "model": budget.ModelName, + "hops_taken": res.HopsTaken, + "usage": map[string]any{ + "input_tokens": res.Usage.InputTokens, + "output_tokens": res.Usage.OutputTokens, + "total_tokens": res.Usage.TotalTokens, + "cost_usd": res.Usage.CostUSD, + "llm_calls": res.Usage.LLMCalls, + }, + "elapsed_ms": time.Since(started).Milliseconds(), + "trace_token": res.TraceToken, + "pages_read": res.PagesRead, + } + emitSSE("answer", final) +} + +// buildCitations transforms the strategy's PagesRead + the section +// tree into the response's citations array: one citation per unique +// cited page range, each carrying the overlapping section IDs and a +// best-effort grounding quote extracted from the cited content. +func (h *AnswerPageIndexHandler) buildCitations(ctx context.Context, t *tree.Tree, res *retrieval.Result, query, requestModel string) []map[string]any { + if res == nil { + return nil + } + seen := make(map[[2]int]struct{}, len(res.PagesRead)) + citations := make([]map[string]any, 0, len(res.PagesRead)) + + for _, pr := range res.PagesRead { + key := [2]int{pr.StartPage, pr.EndPage} + if _, dup := seen[key]; dup { + continue + } + seen[key] = struct{}{} + + c := map[string]any{ + "start_page": pr.StartPage, + "end_page": pr.EndPage, + "section_ids": pr.SectionIDs, + } + + if h.llm != nil { + content := h.materialiseCitedContent(ctx, t, pr.SectionIDs) + if strings.TrimSpace(content) != "" { + ext := h.spanExtractor(requestModel) + span, _, err := ext.Extract(ctx, content, query) + if err == nil && span != nil && span.Text != "" { + c["quote"] = span.Text + if span.Start >= 0 && span.End > span.Start { + c["quote_start"] = span.Start + c["quote_end"] = span.End + } + } + } + } + + citations = append(citations, c) + } + + // Stable sort by start_page so output ordering is deterministic + // across runs that fetch the same pages in different orders. 
+ sort.SliceStable(citations, func(i, j int) bool { + return citations[i]["start_page"].(int) < citations[j]["start_page"].(int) + }) + + return citations +} + +// materialiseCitedContent loads + concatenates every cited section's +// content (capped at 16K chars), used for answer-span extraction over +// the pages the model relied on. +func (h *AnswerPageIndexHandler) materialiseCitedContent(ctx context.Context, t *tree.Tree, sectionIDs []tree.SectionID) string { + if len(sectionIDs) == 0 { + return "" + } + var ( + b strings.Builder + budget = 16000 + ) + for _, id := range sectionIDs { + if b.Len() >= budget { + break + } + sec := t.FindByID(id) + if sec == nil || sec.ContentRef == "" { + continue + } + rc, _, err := h.storage.Get(ctx, sec.ContentRef) + if err != nil { + continue + } + raw, err := io.ReadAll(rc) + rc.Close() + if err != nil { + continue + } + text := strings.TrimSpace(string(raw)) + remaining := budget - b.Len() + if remaining <= 0 { + break + } + if len(text) > remaining { + text = text[:remaining] + } + b.WriteString(text) + b.WriteString("\n\n") + } + return b.String() +} + +// spanExtractor builds a SpanExtractor for citation quoting, using the +// same model fall-through as the /v1/answer handler. +func (h *AnswerPageIndexHandler) spanExtractor(requestModel string) *retrieval.SpanExtractor { + model := h.answerSpan.Model + if model == "" { + model = requestModel + } + if model == "" { + model = h.llmModel + } + ext := retrieval.NewSpanExtractor(h.llm, model) + if h.answerSpan.MaxQuoteLen > 0 { + ext.MaxQuoteLen = h.answerSpan.MaxQuoteLen + } + return ext +} + +// pageIndexEventToTraceMap converts a PageIndexEvent into the +// reasoning_trace entry shape. Only documented fields ship. 
+func pageIndexEventToTraceMap(ev retrieval.PageIndexEvent) map[string]any { + args := map[string]any{} + switch ev.Type { + case "get_pages": + if ev.StartPage > 0 { + args["start_page"] = ev.StartPage + } + if ev.EndPage > 0 { + args["end_page"] = ev.EndPage + } + case "done": + if len(ev.CitedPages) > 0 { + args["cited_pages"] = ev.CitedPages + } + } + entry := map[string]any{ + "hop": ev.Hop, + "tool": ev.Type, + } + if len(args) > 0 { + entry["args"] = args + } + if ev.Reasoning != "" { + entry["reasoning"] = ev.Reasoning + } + if ev.Note != "" { + entry["note"] = ev.Note + } + if ev.CharCount > 0 { + entry["result_chars"] = ev.CharCount + } + if len(ev.SectionIDs) > 0 { + entry["sections_touched"] = ev.SectionIDs + } + if ev.Answer != "" { + entry["answer"] = ev.Answer + } + return entry +} + +// scopedTOCProvider adapts the DB pool to retrieval.TOCProvider with a +// fixed org/store scope captured per request. It reads the persisted +// documents.toc_tree for the requesting tenant and returns it verbatim +// for the get_document_structure tool; a NULL column surfaces as +// retrieval.ErrNoTOC so the strategy degrades to its synthesised view. +type scopedTOCProvider struct { + db *db.Pool + orgID string + storeID string +} + +func (p scopedTOCProvider) GetTOC(ctx context.Context, docID tree.DocumentID) ([]byte, error) { + doc, err := p.db.GetDocument(ctx, docID, p.orgID, p.storeID) + if err != nil { + return nil, err + } + if len(doc.TOCTree) == 0 { + return nil, retrieval.ErrNoTOC + } + return doc.TOCTree, nil +} diff --git a/internal/handler/helpers.go b/internal/handler/helpers.go index 77e6d02..b8693ad 100644 --- a/internal/handler/helpers.go +++ b/internal/handler/helpers.go @@ -4,6 +4,9 @@ import ( "encoding/json" "net/http" "strings" + "time" + + "github.com/hallelx2/vectorless-engine/pkg/retrieval" ) // writeJSON encodes v as JSON and writes it with the given status code. 
@@ -18,6 +21,36 @@ func writeErr(w http.ResponseWriter, status int, msg string) { writeJSON(w, status, map[string]string{"error": msg}) } +// marshalJSONForReplay marshals v to JSON exactly as it would be sent +// on the wire so the bytes can be both stored in the replay log AND +// returned to the caller in lock-step. A trailing newline is appended +// to match encoding/json.Encoder.Encode's behaviour so the replay +// path returns byte-identical responses. +func marshalJSONForReplay(v any) ([]byte, error) { + raw, err := json.Marshal(v) + if err != nil { + return nil, err + } + raw = append(raw, '\n') + return raw, nil +} + +// writeJSONWithReplay writes pre-marshalled JSON bytes verbatim and +// stores them in the replay store under the given token. Both writes +// MUST see the same bytes; this is the single point where that +// invariant is enforced. When store is nil or token is empty the +// replay write is skipped silently. +func writeJSONWithReplay(w http.ResponseWriter, store retrieval.ReplayStore, status int, raw []byte, token string, entry retrieval.ReplayEntry) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _, _ = w.Write(raw) + if store != nil && token != "" { + entry.ResponseJSON = raw + entry.CreatedAt = time.Now() + store.Put(token, entry) + } +} + // guessContentType infers a MIME type from the file extension. func guessContentType(filename string) string { name := strings.ToLower(filename) diff --git a/internal/handler/query.go b/internal/handler/query.go index dbfb3f5..73621b8 100644 --- a/internal/handler/query.go +++ b/internal/handler/query.go @@ -1,6 +1,7 @@ package handler import ( + "context" "encoding/json" "errors" "io" @@ -20,20 +21,48 @@ type QueryHandler struct { db *db.Pool storage storage.Storage strategy retrieval.Strategy + // strategies is the pre-built set of selectable strategies keyed + // by config name (chunked-tree, pageindex, agentic, single-pass). 
+ // A per-request "strategy" field selects one of these; an absent + // or empty field falls back to the configured default (strategy). + // Nil/empty disables the override entirely — every request uses + // the default. + strategies map[string]retrieval.Strategy + + // treeLoader is a test seam overriding how the handler resolves + // the document tree. Nil routes through the org-scoped DB lookup + // (the production path); tests set it to a deterministic in-memory + // function so the handler runs end-to-end without a real Postgres + // backend. + treeLoader func(ctx context.Context, orgID, storeID string, docID tree.DocumentID) (*tree.Tree, error) +} + +// loadTree resolves the document tree, routing through the test seam +// when set. +func (h *QueryHandler) loadTree(ctx context.Context, orgID, storeID string, docID tree.DocumentID) (*tree.Tree, error) { + if h.treeLoader != nil { + return h.treeLoader(ctx, orgID, storeID, docID) + } + return h.db.LoadTree(ctx, docID, orgID, storeID) } -// NewQueryHandler creates a QueryHandler. +// NewQueryHandler creates a QueryHandler. strategies is the optional +// pre-built map that backs the per-request "strategy" override; pass +// nil to disable the override (every request uses the default +// strategy). func NewQueryHandler( logger *slog.Logger, pool *db.Pool, store storage.Storage, strategy retrieval.Strategy, + strategies map[string]retrieval.Strategy, ) *QueryHandler { return &QueryHandler{ - logger: logger, - db: pool, - storage: store, - strategy: strategy, + logger: logger, + db: pool, + storage: store, + strategy: strategy, + strategies: strategies, } } @@ -46,6 +75,28 @@ type queryRequest struct { ReservedForPrompt int `json:"reserved_for_prompt"` MaxParallelCalls int `json:"max_parallel_calls"` MaxSections int `json:"max_sections"` + // Strategy optionally overrides the configured retrieval strategy + // for THIS request only. One of: chunked-tree, pageindex, agentic, + // single-pass. 
Empty uses the server default. This lets a caller + // (e.g. the benchmark harness) A/B strategies against the same + // running engine without a redeploy. Unknown values return 400. + Strategy string `json:"strategy"` +} + +// resolveStrategy picks the strategy for one request. An empty +// override yields the configured default. A non-empty override is +// looked up in the pre-built set; an unknown name (or an override on a +// handler with no strategy set wired) returns ok=false so the caller +// can reply 400 rather than silently falling back. +func (h *QueryHandler) resolveStrategy(override string) (retrieval.Strategy, bool) { + if override == "" { + return h.strategy, true + } + if h.strategies == nil { + return nil, false + } + s, ok := h.strategies[override] + return s, ok } // HandleQuery accepts a query, loads the document tree, runs the @@ -65,12 +116,18 @@ func (h *QueryHandler) HandleQuery(w http.ResponseWriter, r *http.Request) { writeErr(w, http.StatusBadRequest, "document_id and query are required") return } - if h.strategy == nil { + + strategy, ok := h.resolveStrategy(body.Strategy) + if !ok { + writeErr(w, http.StatusBadRequest, "unknown strategy: "+body.Strategy) + return + } + if strategy == nil { writeErr(w, http.StatusServiceUnavailable, "no retrieval strategy configured") return } - t, err := h.db.LoadTree(r.Context(), body.DocumentID, orgID, storeID(r)) + t, err := h.loadTree(r.Context(), orgID, storeID(r), body.DocumentID) if err != nil { if errors.Is(err, db.ErrNotFound) { writeErr(w, http.StatusNotFound, "document not found") @@ -103,7 +160,7 @@ func (h *QueryHandler) HandleQuery(w http.ResponseWriter, r *http.Request) { ids []tree.SectionID usage *retrieval.Usage ) - if cs, ok := h.strategy.(retrieval.CostStrategy); ok { + if cs, ok := strategy.(retrieval.CostStrategy); ok { result, err := cs.SelectWithCost(r.Context(), t, body.Query, budget) if err != nil { h.logger.Error("query: strategy failed", @@ -117,7 +174,7 @@ func (h 
*QueryHandler) HandleQuery(w http.ResponseWriter, r *http.Request) { usage = &result.Usage } else { var err error - ids, err = h.strategy.Select(r.Context(), t, body.Query, budget) + ids, err = strategy.Select(r.Context(), t, body.Query, budget) if err != nil { h.logger.Error("query: strategy failed", "err", err, @@ -160,7 +217,7 @@ func (h *QueryHandler) HandleQuery(w http.ResponseWriter, r *http.Request) { resp := map[string]any{ "document_id": body.DocumentID, "query": body.Query, - "strategy": h.strategy.Name(), + "strategy": strategy.Name(), "model": body.Model, "sections": sections, "elapsed_ms": time.Since(started).Milliseconds(), diff --git a/internal/handler/query_strategy_test.go b/internal/handler/query_strategy_test.go new file mode 100644 index 0000000..1bce66e --- /dev/null +++ b/internal/handler/query_strategy_test.go @@ -0,0 +1,210 @@ +package handler + +import ( + "bytes" + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/hallelx2/vectorless-engine/pkg/retrieval" + "github.com/hallelx2/vectorless-engine/pkg/storage" + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// labeledStrategy is a mock retrieval.Strategy that records whether it +// was invoked and reports a caller-supplied Name. The per-request +// strategy-override test uses it to prove WHICH strategy a /v1/query +// request actually routed to. 
+type labeledStrategy struct { + name string + picks []tree.SectionID + called int32 +} + +func (s *labeledStrategy) Name() string { return s.name } + +func (s *labeledStrategy) Select(ctx context.Context, t *tree.Tree, query string, budget retrieval.ContextBudget) ([]tree.SectionID, error) { + atomic.AddInt32(&s.called, 1) + return s.picks, nil +} + +func (s *labeledStrategy) wasCalled() bool { return atomic.LoadInt32(&s.called) > 0 } + +// memStorage is a minimal in-memory storage.Storage; only Get matters +// for the query handler (it loads section content by ContentRef). +type memStorage struct{ data map[string][]byte } + +func (m *memStorage) Put(ctx context.Context, key string, r io.Reader, meta storage.Metadata) error { + b, err := io.ReadAll(r) + if err != nil { + return err + } + if m.data == nil { + m.data = map[string][]byte{} + } + m.data[key] = b + return nil +} + +func (m *memStorage) Get(ctx context.Context, key string) (io.ReadCloser, storage.Metadata, error) { + b, ok := m.data[key] + if !ok { + return nil, storage.Metadata{}, storage.ErrNotFound + } + return io.NopCloser(bytes.NewReader(b)), storage.Metadata{Key: key, Size: int64(len(b))}, nil +} + +func (m *memStorage) Delete(ctx context.Context, key string) error { return nil } +func (m *memStorage) Exists(ctx context.Context, key string) (bool, error) { + _, ok := m.data[key] + return ok, nil +} +func (m *memStorage) SignedURL(ctx context.Context, key string, expiry time.Duration) (string, error) { + return "", nil +} + +func queryStrategyTestTree() *tree.Tree { + a := &tree.Section{ID: "sec_a", Title: "A", ContentRef: "a_ref", PageStart: 1, PageEnd: 2} + b := &tree.Section{ID: "sec_b", Title: "B", ContentRef: "b_ref", PageStart: 3, PageEnd: 4} + root := &tree.Section{ID: "sec_root", Title: "Doc", Children: []*tree.Section{a, b}} + return &tree.Tree{DocumentID: "doc_x", Title: "Doc", Root: root} +} + +// newQueryStrategyHandler builds a QueryHandler wired with the default +// strategy + an 
override set, an in-memory tree loader, and storage, +// so HandleQuery runs end-to-end via httptest without a real DB. +func newQueryStrategyHandler(def retrieval.Strategy, set map[string]retrieval.Strategy) *QueryHandler { + store := &memStorage{data: map[string][]byte{ + "a_ref": []byte("section A content"), + "b_ref": []byte("section B content"), + }} + h := NewQueryHandler(slog.Default(), nil, store, def, set) + h.treeLoader = func(ctx context.Context, orgID, storeID string, docID tree.DocumentID) (*tree.Tree, error) { + return queryStrategyTestTree(), nil + } + return h +} + +func doQuery(t *testing.T, h *QueryHandler, jsonBody string) (*httptest.ResponseRecorder, map[string]any) { + t.Helper() + req := httptest.NewRequest(http.MethodPost, "/v1/query", strings.NewReader(jsonBody)) + req.Header.Set("X-Vectorless-Org", "org_test") + rec := httptest.NewRecorder() + h.HandleQuery(rec, req) + var resp map[string]any + if rec.Body.Len() > 0 { + _ = json.Unmarshal(rec.Body.Bytes(), &resp) + } + return rec, resp +} + +// TestHandleQueryStrategyOverrideRoutesToPageIndex is the Task-2 +// acceptance gate: a /v1/query body carrying {"strategy":"pageindex"} +// must route to the page-based strategy, NOT the configured default. 
+func TestHandleQueryStrategyOverrideRoutesToPageIndex(t *testing.T) { + t.Parallel() + + def := &labeledStrategy{name: "chunked-tree", picks: []tree.SectionID{"sec_a"}} + page := &labeledStrategy{name: "pageindex", picks: []tree.SectionID{"sec_b"}} + set := map[string]retrieval.Strategy{ + "chunked-tree": def, + "pageindex": page, + } + h := newQueryStrategyHandler(def, set) + + rec, resp := doQuery(t, h, `{"document_id":"doc_x","query":"q","strategy":"pageindex"}`) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String()) + } + if page.wasCalled() == false { + t.Error("pageindex strategy was NOT called for {\"strategy\":\"pageindex\"}") + } + if def.wasCalled() { + t.Error("default (chunked-tree) strategy was called despite a pageindex override") + } + if got := resp["strategy"]; got != "pageindex" { + t.Errorf("response strategy = %v, want pageindex", got) + } + // The pageindex mock picks sec_b; prove the override's result (not + // the default's sec_a) is what surfaced. + secs, _ := resp["sections"].([]any) + if len(secs) != 1 { + t.Fatalf("sections = %v, want 1 (sec_b from pageindex)", resp["sections"]) + } + if id := secs[0].(map[string]any)["id"]; id != "sec_b" { + t.Errorf("section id = %v, want sec_b (pageindex's pick)", id) + } +} + +// TestHandleQueryDefaultStrategyWhenAbsent: no "strategy" field uses +// the configured default and never touches the override set. 
+func TestHandleQueryDefaultStrategyWhenAbsent(t *testing.T) { + t.Parallel() + + def := &labeledStrategy{name: "chunked-tree", picks: []tree.SectionID{"sec_a"}} + page := &labeledStrategy{name: "pageindex", picks: []tree.SectionID{"sec_b"}} + set := map[string]retrieval.Strategy{"chunked-tree": def, "pageindex": page} + h := newQueryStrategyHandler(def, set) + + rec, resp := doQuery(t, h, `{"document_id":"doc_x","query":"q"}`) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String()) + } + if !def.wasCalled() { + t.Error("default strategy was NOT called when no override was set") + } + if page.wasCalled() { + t.Error("override strategy was called despite an absent strategy field") + } + if got := resp["strategy"]; got != "chunked-tree" { + t.Errorf("response strategy = %v, want chunked-tree", got) + } +} + +// TestHandleQueryUnknownStrategy: an override naming a strategy not in +// the set returns 400 rather than silently falling back. +func TestHandleQueryUnknownStrategy(t *testing.T) { + t.Parallel() + + def := &labeledStrategy{name: "chunked-tree", picks: []tree.SectionID{"sec_a"}} + set := map[string]retrieval.Strategy{"chunked-tree": def} + h := newQueryStrategyHandler(def, set) + + rec, _ := doQuery(t, h, `{"document_id":"doc_x","query":"q","strategy":"does-not-exist"}`) + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want 400 for unknown strategy", rec.Code) + } + if def.wasCalled() { + t.Error("default strategy must not run when an unknown override is rejected") + } +} + +// TestHandleQueryOverrideWithNilSet: a handler wired with no override +// set rejects any non-empty strategy field (400) but still serves +// requests that omit it. 
+func TestHandleQueryOverrideWithNilSet(t *testing.T) { + t.Parallel() + + def := &labeledStrategy{name: "chunked-tree", picks: []tree.SectionID{"sec_a"}} + h := newQueryStrategyHandler(def, nil) + + rec, _ := doQuery(t, h, `{"document_id":"doc_x","query":"q","strategy":"pageindex"}`) + if rec.Code != http.StatusBadRequest { + t.Errorf("nil set + override: status = %d, want 400", rec.Code) + } + + rec2, resp2 := doQuery(t, h, `{"document_id":"doc_x","query":"q"}`) + if rec2.Code != http.StatusOK { + t.Fatalf("nil set + no override: status = %d, body = %s", rec2.Code, rec2.Body.String()) + } + if got := resp2["strategy"]; got != "chunked-tree" { + t.Errorf("response strategy = %v, want chunked-tree", got) + } +} diff --git a/internal/handler/router.go b/internal/handler/router.go index 13e5012..b14fca6 100644 --- a/internal/handler/router.go +++ b/internal/handler/router.go @@ -5,8 +5,10 @@ import ( "net/http" "github.com/go-chi/chi/v5" + "github.com/hallelx2/llmgate" "github.com/prometheus/client_golang/prometheus/promhttp" + enginecfg "github.com/hallelx2/vectorless-engine/pkg/config" "github.com/hallelx2/vectorless-engine/pkg/db" "github.com/hallelx2/vectorless-engine/pkg/queue" "github.com/hallelx2/vectorless-engine/pkg/retrieval" @@ -28,6 +30,41 @@ type Deps struct { MultiDoc *retrieval.MultiDoc Version string Config config.Config + + // Strategies is the pre-built set of selectable retrieval + // strategies keyed by config name. It backs the per-request + // "strategy" override on /v1/query (the benchmark uses it to A/B + // chunked-tree vs pageindex against one running engine). Nil + // disables the override — every /v1/query uses Strategy. + Strategies map[string]retrieval.Strategy + + // LLM is the shared llmgate client used by the answer endpoints + // (/v1/answer, /v1/answer/pageindex) for span extraction and + // synthesis. Nil makes those endpoints return 501. + LLM llmgate.Client + + // LLMModel is the default model name. 
Per-request model overrides + // win over it. + LLMModel string + + // AnswerSpan / Answer hold the answer-endpoint config blocks. + AnswerSpan enginecfg.AnswerSpanBlock + Answer enginecfg.AnswerBlock + + // Replay is the replay-trace store. Every /v1/answer and + // /v1/answer/pageindex response is stamped with a trace_token and + // persisted here. Nil skips replay capture for those endpoints. + Replay retrieval.ReplayStore + + // PageIndexStrategy is the dedicated page-based agentic strategy + // instance used by /v1/answer/pageindex, independent of whichever + // selection strategy retrieval.strategy chose. Nil (or + // PageIndex.Enabled=false) makes the endpoint return 501. + PageIndexStrategy *retrieval.PageIndexStrategy + + // PageIndex carries the page-based answer endpoint's config. The + // per-request max_hops / max_pages_per_fetch fields override it. + PageIndex enginecfg.PageIndexBlock } // Router builds the chi router with all v1 routes and the full @@ -104,10 +141,12 @@ func Router(d Deps) http.Handler { // ── REST Handlers (hand-written, chi) ───────────────────────── health := NewHealthHandler(d.Version) docs := NewDocumentsHandler(d.Logger, d.DB, d.Storage, d.Queue) - query := NewQueryHandler(d.Logger, d.DB, d.Storage, d.Strategy) + query := NewQueryHandler(d.Logger, d.DB, d.Storage, d.Strategy, d.Strategies) queryStream := NewQueryStreamHandler(d.Logger, d.DB, d.Storage, d.Strategy) queryMulti := NewQueryMultiHandler(d.Logger, d.Storage, d.Strategy, d.MultiDoc) queryStreamMulti := NewQueryStreamMultiHandler(d.Logger, d.Storage, d.MultiDoc) + answer := NewAnswerHandler(d.Logger, d.DB, d.Storage, d.Strategy, d.LLM, d.LLMModel, d.AnswerSpan, d.Answer, d.Replay) + answerPageIndex := NewAnswerPageIndexHandler(d.Logger, d.DB, d.Storage, d.LLM, d.LLMModel, d.AnswerSpan, d.Replay, d.PageIndexStrategy, d.PageIndex) webhook := NewWebhookHandler(d.Logger, d.Queue) // ── Connect-RPC Handlers (generated stubs, three-transport) ─── @@ -159,6 +198,12 @@ func 
Router(d Deps) http.Handler { r.Post("/multi", queryMulti.HandleQueryMulti) r.Post("/multi/stream", queryStreamMulti.HandleQueryStreamMulti) }) + + // Answer: retrieval + synthesis in one round-trip. /answer + // uses the configured selection strategy; /answer/pageindex + // runs the page-based agentic loop end-to-end. + r.Post("/answer", answer.HandleAnswer) + r.Post("/answer/pageindex", answerPageIndex.HandleAnswerPageIndex) }) // Internal: queue webhook (QStash). diff --git a/internal/handler/router_parity_test.go b/internal/handler/router_parity_test.go new file mode 100644 index 0000000..d920b34 --- /dev/null +++ b/internal/handler/router_parity_test.go @@ -0,0 +1,96 @@ +package handler + +import ( + "net/http" + "testing" + + "github.com/go-chi/chi/v5" +) + +// TestRouterParity is a divergence guard. The deployed cmd/server +// binary and the standalone cmd/engine binary serve overlapping route +// sets from two different routers (internal/handler vs internal/api). +// They have silently diverged before — the PageIndex redesign landed +// only on cmd/engine, leaving /v1/answer and /v1/answer/pageindex +// unreachable in production. +// +// This test walks the mounted chi router and asserts that the routes +// the deployed binary is contractually required to expose are present. +// If a future refactor drops one of the answer endpoints (or the +// per-request strategy override's /v1/query mount), this fails loudly +// instead of shipping a binary that's missing half the API. +func TestRouterParity(t *testing.T) { + t.Parallel() + + // A zero Deps is enough to construct the router: handlers store + // their dependencies and only dereference them at request time, so + // Walk sees the full route table without any live backend. 
+ h := Router(Deps{}) + + routes, ok := h.(chi.Routes) + if !ok { + t.Fatalf("Router did not return a chi.Routes (got %T); cannot walk routes", h) + } + + got := map[string]bool{} + walk := func(method, route string, handler http.Handler, middlewares ...func(http.Handler) http.Handler) error { + got[method+" "+route] = true + return nil + } + if err := chi.Walk(routes, walk); err != nil { + t.Fatalf("chi.Walk: %v", err) + } + + // The route set the deployed binary MUST expose. Anything added + // here becomes a hard contract; drop a route and this test fails. + want := []string{ + "POST /v1/query/", + "POST /v1/answer", + "POST /v1/answer/pageindex", + } + for _, route := range want { + if !got[route] { + t.Errorf("router is missing required route %q\nmounted routes: %v", route, sortedKeys(got)) + } + } +} + +// TestRouterMountsAnswerEndpoints is the focused assertion the task +// calls out explicitly: /v1/answer and /v1/answer/pageindex must be +// mounted on the deployed router. Kept separate from the broad parity +// set so a failure points straight at the answer-endpoint regression. +func TestRouterMountsAnswerEndpoints(t *testing.T) { + t.Parallel() + + routes, ok := Router(Deps{}).(chi.Routes) + if !ok { + t.Fatal("Router did not return a chi.Routes") + } + + found := map[string]bool{} + _ = chi.Walk(routes, func(method, route string, _ http.Handler, _ ...func(http.Handler) http.Handler) error { + found[route] = true + return nil + }) + + for _, route := range []string{"/v1/answer", "/v1/answer/pageindex"} { + if !found[route] { + t.Errorf("deployed router must mount %q but does not", route) + } + } +} + +func sortedKeys(m map[string]bool) []string { + out := make([]string, 0, len(m)) + for k := range m { + out = append(out, k) + } + // Simple insertion sort to avoid pulling in sort just for a test + // diagnostic; the route set is tiny. 
+ for i := 1; i < len(out); i++ { + for j := i; j > 0 && out[j-1] > out[j]; j-- { + out[j-1], out[j] = out[j], out[j-1] + } + } + return out +}