Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 174 additions & 10 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/hallelx2/vectorless-engine/pkg/queue"
"github.com/hallelx2/vectorless-engine/pkg/retrieval"
"github.com/hallelx2/vectorless-engine/pkg/storage"
"github.com/hallelx2/vectorless-engine/pkg/tree"

"github.com/hallelx2/vectorless-engine/internal/config"
"github.com/hallelx2/vectorless-engine/internal/handler"
Expand Down Expand Up @@ -133,7 +134,7 @@ func run() error {
if err != nil {
return fmt.Errorf("init llm: %w", err)
}
strategy := buildStrategy(cfg.Engine.Retrieval, llmClient, store)
strategy := buildStrategy(cfg.Engine.Retrieval, llmClient, store, pool)

// Wrap with caching if enabled in engine config.
if cfg.Engine.Retrieval.Cache.Enabled {
Expand All @@ -154,6 +155,44 @@ func run() error {
// Multi-document query dispatcher.
multiDoc := retrieval.NewMultiDoc(strategy, pool.LoadTree)

// Pre-built set of selectable strategies, keyed by config name.
// Backs the per-request "strategy" override on /v1/query so the
// benchmark can A/B chunked-tree vs pageindex against this same
// running engine without a redeploy. Built from the raw client so
// each override behaves identically to booting with that strategy
// as the default (no shared cache wrapper across overrides).
strategies := buildStrategySet(cfg.Engine.Retrieval, llmClient, store, pool)

// Replay store: every /v1/answer and /v1/answer/pageindex response
// is stamped with a deterministic trace_token and its body bytes
// persisted here so /v1/replay can return them verbatim. On by
// default; operators opt out via retrieval.replay.enabled=false.
var replayStore retrieval.ReplayStore
if cfg.Engine.Retrieval.Replay.Enabled {
replayStore = retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{
MaxEntries: cfg.Engine.Retrieval.Replay.MaxEntries,
TTL: time.Duration(cfg.Engine.Retrieval.Replay.TTLSeconds) * time.Second,
})
logger.Info("retrieval: replay store enabled",
"max_entries", cfg.Engine.Retrieval.Replay.MaxEntries,
"ttl_seconds", cfg.Engine.Retrieval.Replay.TTLSeconds,
)
}

// /v1/answer/pageindex gets its OWN PageIndexStrategy instance,
// independent of whatever selection strategy retrieval.strategy
// chose, so the endpoint is always available (gated by
// retrieval.pageindex.enabled) even on a chunked-tree deployment.
var pageIndexStrategy *retrieval.PageIndexStrategy
if cfg.Engine.Retrieval.PageIndex.Enabled && llmClient != nil {
pageIndexStrategy = buildPageIndexStrategy(cfg.Engine.Retrieval, llmClient, store, pool)
logger.Info("retrieval: pageindex answer endpoint enabled",
"max_hops", pageIndexStrategy.MaxHops,
"page_content_limit", pageIndexStrategy.PageContentLimit,
"model_override", cfg.Engine.Retrieval.PageIndex.Model,
)
}

// ── Ingest pipeline ───────────────────────────────────────────
pipeline := ingest.NewPipeline(ingest.Pipeline{
DB: pool,
Expand Down Expand Up @@ -201,14 +240,22 @@ func run() error {
// Only start the HTTP server in "server" role.
if *role == "server" {
deps := handler.Deps{
Logger: logger,
DB: pool,
Storage: store,
Queue: q,
Strategy: strategy,
MultiDoc: multiDoc,
Version: version,
Config: cfg,
Logger: logger,
DB: pool,
Storage: store,
Queue: q,
Strategy: strategy,
MultiDoc: multiDoc,
Version: version,
Config: cfg,
Strategies: strategies,
LLM: llmClient,
LLMModel: modelFor(cfg.Engine.LLM),
AnswerSpan: cfg.Engine.Retrieval.AnswerSpan,
Answer: cfg.Engine.Retrieval.Answer,
Replay: replayStore,
PageIndexStrategy: pageIndexStrategy,
PageIndex: cfg.Engine.Retrieval.PageIndex,
}

srv := &http.Server{
Expand Down Expand Up @@ -324,6 +371,21 @@ func buildQueue(c enginecfg.QueueConfig, dbURL string) (queue.Queue, error) {
}
}

// modelFor returns the configured chat/general-purpose model name for
// the selected LLM driver. Used as the engine-default fallback when an
// API request omits an explicit model (answer + answer/pageindex).
func modelFor(c enginecfg.LLMConfig) string {
switch c.Driver {
case "anthropic":
return c.Anthropic.Model
case "openai":
return c.OpenAI.Model
case "gemini":
return c.Gemini.Model
}
return ""
}

func buildLLM(c enginecfg.LLMConfig) (llmgate.Client, error) {
switch c.Driver {
case "anthropic":
Expand All @@ -349,7 +411,11 @@ func buildLLM(c enginecfg.LLMConfig) (llmgate.Client, error) {
}
}

func buildStrategy(c enginecfg.RetrievalConfig, client llmgate.Client, store storage.Storage) retrieval.Strategy {
// buildStrategy constructs the retrieval strategy named by
// retrieval.strategy. The DB pool is threaded through so the
// pageindex strategy can wire a TOC provider that reads
// documents.toc_tree (the other strategies ignore it).
func buildStrategy(c enginecfg.RetrievalConfig, client llmgate.Client, store storage.Storage, pool *db.Pool) retrieval.Strategy {
switch c.Strategy {
case "single-pass":
return retrieval.NewSinglePass(client)
Expand All @@ -362,11 +428,109 @@ func buildStrategy(c enginecfg.RetrievalConfig, client llmgate.Client, store sto
}
a.ModelOverride = c.Agentic.Model
return a
case "pageindex":
return buildPageIndexStrategy(c, client, store, pool)
default:
return retrieval.NewChunkedTree(client)
}
}

// buildStrategySet pre-builds one instance of every selectable
// strategy, keyed by its config name. The deployed /v1/query handler
// uses this map to honour a per-request "strategy" override without
// rebuilding a strategy on the hot path: selection is a map lookup.
//
// This is what lets the benchmark A/B chunked-tree vs pageindex
// against the SAME running engine — no redeploy, no config flip. The
// caps (agentic max-hops, pageindex page limits, model overrides) come
// from the same config blocks the default builder reads, so an
// override behaves identically to booting with that strategy as the
// default.
func buildStrategySet(c enginecfg.RetrievalConfig, client llmgate.Client, store storage.Storage, pool *db.Pool) map[string]retrieval.Strategy {
agentic := retrieval.NewAgentic(client, storageFetcher{s: store})
if c.Agentic.MaxHops > 0 {
agentic.MaxHops = c.Agentic.MaxHops
}
agentic.ModelOverride = c.Agentic.Model

return map[string]retrieval.Strategy{
"single-pass": retrieval.NewSinglePass(client),
"chunked-tree": retrieval.NewChunkedTree(client),
"agentic": agentic,
"pageindex": buildPageIndexStrategy(c, client, store, pool),
}
}

// buildPageIndexStrategy constructs the page-based agentic strategy
// with the storage-backed PageLoader, a DB-backed TOC provider, and
// the configured caps. Ported from cmd/engine so the DEPLOYED
// cmd/server binary can serve retrieval.strategy=pageindex AND the
// /v1/answer/pageindex endpoint.
//
// The TOC provider reads documents.toc_tree via the worker-scoped
// document lookup. The strategy degrades to its synthesised view
// (built from the loaded section tree) whenever the column is NULL or
// the read errors, so a document ingested before the TOC builder ran
// still navigates cleanly.
func buildPageIndexStrategy(c enginecfg.RetrievalConfig, client llmgate.Client, store storage.Storage, pool *db.Pool) *retrieval.PageIndexStrategy {
p := retrieval.NewPageIndexStrategy(client)
p.PageLoader = storagePageLoader{s: store}
if pool != nil {
p.TOC = dbTOCProvider{db: pool}
}
if c.PageIndex.MaxHops > 0 {
p.MaxHops = c.PageIndex.MaxHops
}
if c.PageIndex.PageContentLimit > 0 {
p.PageContentLimit = c.PageIndex.PageContentLimit
}
p.ModelOverride = c.PageIndex.Model
return p
}

// storagePageLoader adapts a storage.Storage to
// retrieval.PageContentLoader. Mirrors storageFetcher but lives behind
// a separate interface so the two callers (agentic / pageindex) can be
// wired independently. The PageIndex strategy materialises section
// bodies once per get_pages observation, so reading the full reader
// into a []byte is the right shape.
type storagePageLoader struct{ s storage.Storage }

func (l storagePageLoader) Load(ctx context.Context, ref string) ([]byte, error) {
rc, _, err := l.s.Get(ctx, ref)
if err != nil {
return nil, err
}
defer rc.Close()
return io.ReadAll(rc)
}

// dbTOCProvider adapts the DB pool to retrieval.TOCProvider. It reads
// the persisted documents.toc_tree JSONB and returns it verbatim for
// the get_document_structure tool. A NULL column (the "not yet
// generated" state) surfaces as retrieval.ErrNoTOC, which the strategy
// treats as a graceful-degrade signal: it synthesises the TOC view
// from the section tree instead of failing the request.
//
// GetTOC carries only a document ID (the TOCProvider contract), so the
// lookup uses the worker-scoped accessor. That is safe here: the
// caller has already resolved + authorised the tree for this document
// via the org-scoped LoadTree before the strategy ever calls GetTOC,
// and the TOC tree is the same structural metadata (titles + page
// ranges, no bodies) already present on that authorised tree.
type dbTOCProvider struct{ db *db.Pool }

func (p dbTOCProvider) GetTOC(ctx context.Context, docID tree.DocumentID) ([]byte, error) {
doc, err := p.db.GetDocumentForWorker(ctx, docID)
if err != nil {
return nil, err
}
if len(doc.TOCTree) == 0 {
return nil, retrieval.ErrNoTOC
}
return doc.TOCTree, nil
}

// storageFetcher adapts a storage.Storage to retrieval.ContentFetcher.
// The agentic strategy reads section bodies one at a time, so we
// materialize the full reader contents into a []byte here rather than
Expand Down
Loading
Loading