Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func run() error {
SummaryAxesMaxTopics: cfg.Engine.Ingest.SummaryAxes.MaxTopics,
SummaryAxesMaxEntities: cfg.Engine.Ingest.SummaryAxes.MaxEntities,
SummaryAxesMaxNumbers: cfg.Engine.Ingest.SummaryAxes.MaxNumbers,
TOCEnabled: cfg.Engine.Ingest.TOC.Enabled,
TOCModel: cfg.Engine.Ingest.TOC.Model,
TOCConcurrency: cfg.Engine.Ingest.TOC.Concurrency,
TOCCheckPages: cfg.Engine.Ingest.TOC.TOCCheckPages,
GlobalLLMConcurrency: cfg.Engine.Ingest.GlobalLLMConcurrency,
})
if cfg.Engine.Ingest.Tables.Enabled {
Expand Down
26 changes: 26 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,32 @@ ingest:
max_entities: 8
max_numbers: 6

# LLM-built table-of-contents tree (PageIndex-style). Runs after
# summarize+HyDE on PDF inputs and persists a hierarchical TOC on
# documents.toc_tree (JSONB). The tree is small (a few KB even
# for 300-page filings) and is intended as a higher-level map
# retrieval strategies can reason over before drilling into the
# parser-derived sections tree.
#
# ENABLED BY DEFAULT for PDFs. Non-PDF documents skip the stage
# unconditionally. Builder failures are non-fatal — the document
# remains fully retrievable via the existing sections tree.
toc:
enabled: true
# Override the LLM model used by the builder; empty inherits
# the summary model. Point this at a reasoning-capable model —
# the no-TOC generator has to find hierarchy in raw body text,
# which a small/fast model often botches.
model: ""
# Cap on parallel LLM calls during the verification phase
# (one call per leaf node).
concurrency: 4
# The detector scans the first N pages for a table of
# contents. PageIndex defaults this to 20 — financial filings
# put their TOC inside the first dozen pages and a document
# without one by page 20 almost never has one further in.
toc_check_pages: 20

log:
level: "info" # debug | info | warn | error
format: "json" # json | console
76 changes: 76 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ type IngestConfig struct {
// populate it).
SummaryAxes SummaryAxesBlock `yaml:"summary_axes"`

// TOC configures the PageIndex-style LLM-built table-of-contents
// tree stage. Enabled by default for PDF inputs; the resulting
// tree is persisted on documents.toc_tree (JSONB). Failures are
// non-fatal — they leave the column NULL and the document fully
// retrievable via the existing sections tree.
TOC TOCBlock `yaml:"toc"`

// GlobalLLMConcurrency caps the total number of LLM calls in flight
// across the summarize and HyDE stages combined, which now run
// concurrently. Each stage still respects its own per-stage cap
Expand All @@ -60,6 +67,39 @@ type IngestConfig struct {
GlobalLLMConcurrency int `yaml:"global_llm_concurrency"`
}

// TOCBlock configures the LLM-driven table-of-contents tree
// builder. The builder reads page-by-page text from a freshly-
// ingested PDF and emits a hierarchical TOC (PageIndex-style),
// persisted on documents.toc_tree (JSONB).
//
// Enabled by default for PDF inputs; non-PDF documents skip the
// stage unconditionally. Builder failures never break ingest —
// the document remains fully retrievable via the existing
// sections tree.
//
// Every field can be set from YAML (ingest.toc.*) and overridden
// through a VLE_INGEST_TOC_* environment variable.
type TOCBlock struct {
// Enabled toggles the stage. Default: true. Flip to false to
// skip the extra LLM round-trip when ingest budget matters
// more than having a TOC tree for retrieval to reason over.
//
// Env: VLE_INGEST_TOC_ENABLED. Accepts 1/true/yes/on and
// 0/false/no/off (case-insensitive, surrounding whitespace
// trimmed); any other value leaves the setting untouched.
Enabled bool `yaml:"enabled"`

// Model overrides the LLM model used by the builder. Empty
// means "no override". Point this at a reasoning-capable
// model — the no-TOC generator has to find hierarchy in raw
// body text, which a small/fast model often botches.
//
// NOTE(review): this comment used to say an empty value
// inherits the engine's configured default, while
// config.example.yaml says it inherits the summary model —
// confirm which fallback the builder actually applies.
//
// Env: VLE_INGEST_TOC_MODEL (any non-empty value).
Model string `yaml:"model"`

// Concurrency caps parallel LLM calls during the verification
// phase (one call per leaf node). Default: 4.
//
// Validate rejects negative values. The env override
// VLE_INGEST_TOC_CONCURRENCY is applied only when it parses
// as a positive integer, so 0 can only come from YAML.
// NOTE(review): what 0 means to the builder is not defined
// here — confirm against the consumer.
Concurrency int `yaml:"concurrency"`

// TOCCheckPages bounds the leading prefix the detector scans
// for a table of contents. Default: 20 — financial filings
// put their TOC inside the first dozen pages and a document
// without one by page 20 almost never has one further in.
//
// Validate rejects negative values. The env override
// VLE_INGEST_TOC_TOC_CHECK_PAGES is applied only when it
// parses as a positive integer, so 0 can only come from YAML.
// NOTE(review): what 0 means to the detector is not defined
// here — confirm against the consumer.
TOCCheckPages int `yaml:"toc_check_pages"`
}

// SummaryAxesBlock configures the Phase 2.5 structured summarizer.
//
// When enabled, the summarize stage runs in JSON mode and produces
Expand Down Expand Up @@ -584,6 +624,11 @@ func Default() Config {
MaxEntities: 8,
MaxNumbers: 6,
},
TOC: TOCBlock{
Enabled: true,
Concurrency: 4,
TOCCheckPages: 20,
},
},
Log: LogConfig{Level: "info", Format: "json"},
}
Expand Down Expand Up @@ -767,6 +812,30 @@ func applyEnvOverrides(c *Config) {
c.Ingest.SummaryAxes.MaxNumbers = n
}
}
// LLM-built TOC tree (PageIndex-style). Same truthy-string set
// as the other ingest toggles; numeric overrides require a
// positive int so a typo doesn't silently flip the default.
if v := os.Getenv("VLE_INGEST_TOC_ENABLED"); v != "" {
switch strings.ToLower(strings.TrimSpace(v)) {
case "1", "true", "yes", "on":
c.Ingest.TOC.Enabled = true
case "0", "false", "no", "off":
c.Ingest.TOC.Enabled = false
}
}
if v := os.Getenv("VLE_INGEST_TOC_MODEL"); v != "" {
c.Ingest.TOC.Model = v
}
if v := os.Getenv("VLE_INGEST_TOC_CONCURRENCY"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
c.Ingest.TOC.Concurrency = n
}
}
if v := os.Getenv("VLE_INGEST_TOC_TOC_CHECK_PAGES"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
c.Ingest.TOC.TOCCheckPages = n
}
}
if v := os.Getenv("VLE_RETRIEVAL_ANSWER_SPAN_ENABLED"); v != "" {
switch strings.ToLower(strings.TrimSpace(v)) {
case "1", "true", "yes", "on":
Expand Down Expand Up @@ -978,6 +1047,13 @@ func (c Config) Validate() error {
return fmt.Errorf("ingest.summary_axes.max_numbers must be >= 0, got %d", c.Ingest.SummaryAxes.MaxNumbers)
}

if c.Ingest.TOC.Concurrency < 0 {
return fmt.Errorf("ingest.toc.concurrency must be >= 0, got %d", c.Ingest.TOC.Concurrency)
}
if c.Ingest.TOC.TOCCheckPages < 0 {
return fmt.Errorf("ingest.toc.toc_check_pages must be >= 0, got %d", c.Ingest.TOC.TOCCheckPages)
}

if c.Retrieval.Planning.CacheSize < 0 {
return fmt.Errorf("retrieval.planning.cache_size must be >= 0, got %d", c.Retrieval.Planning.CacheSize)
}
Expand Down
49 changes: 49 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,55 @@ func TestDefaultValues(t *testing.T) {
if cfg.Log.Level != "info" {
t.Errorf("log.level = %q, want info", cfg.Log.Level)
}
if !cfg.Ingest.TOC.Enabled {
t.Error("ingest.toc.enabled should default to true (opt-out)")
}
if cfg.Ingest.TOC.Concurrency != 4 {
t.Errorf("ingest.toc.concurrency = %d, want 4", cfg.Ingest.TOC.Concurrency)
}
if cfg.Ingest.TOC.TOCCheckPages != 20 {
t.Errorf("ingest.toc.toc_check_pages = %d, want 20", cfg.Ingest.TOC.TOCCheckPages)
}
}

func TestTOCEnvOverride(t *testing.T) {
// Mutates env — restore on exit. Not parallel.
keys := []string{
"VLE_INGEST_TOC_ENABLED",
"VLE_INGEST_TOC_MODEL",
"VLE_INGEST_TOC_CONCURRENCY",
"VLE_INGEST_TOC_TOC_CHECK_PAGES",
}
prev := make(map[string]string, len(keys))
for _, k := range keys {
prev[k] = os.Getenv(k)
}
defer func() {
for k, v := range prev {
os.Setenv(k, v)
}
}()

os.Setenv("VLE_INGEST_TOC_ENABLED", "false")
os.Setenv("VLE_INGEST_TOC_MODEL", "gemini-2.5-pro")
os.Setenv("VLE_INGEST_TOC_CONCURRENCY", "12")
os.Setenv("VLE_INGEST_TOC_TOC_CHECK_PAGES", "30")

cfg := Default()
applyEnvOverrides(&cfg)

if cfg.Ingest.TOC.Enabled {
t.Error("VLE_INGEST_TOC_ENABLED=false should disable the stage")
}
if cfg.Ingest.TOC.Model != "gemini-2.5-pro" {
t.Errorf("VLE_INGEST_TOC_MODEL not applied, got %q", cfg.Ingest.TOC.Model)
}
if cfg.Ingest.TOC.Concurrency != 12 {
t.Errorf("VLE_INGEST_TOC_CONCURRENCY=12 not applied, got %d", cfg.Ingest.TOC.Concurrency)
}
if cfg.Ingest.TOC.TOCCheckPages != 30 {
t.Errorf("VLE_INGEST_TOC_TOC_CHECK_PAGES=30 not applied, got %d", cfg.Ingest.TOC.TOCCheckPages)
}
}

func TestAbstainEnvOverride(t *testing.T) {
Expand Down
51 changes: 42 additions & 9 deletions pkg/db/documents.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ type Document struct {
Metadata map[string]string
CreatedAt time.Time
UpdatedAt time.Time

// TOCTree is the JSONB blob persisted by the ingest pipeline's
// LLM-driven TOC builder ([]tree.TOCNode marshalled). nil
// (NULL in DB) means "not yet generated" — the expected state
// for non-PDF documents, for documents ingested before the
// 0006 migration, and when the builder failed (builder
// failures are non-fatal and leave this column NULL).
//
// Kept as raw bytes so this package needs no knowledge of the
// typed shape. PostgreSQL JSONB normalizes whitespace and object
// key order, so the bytes read back may differ from those written.
// Callers that need the typed shape unmarshal at read time.
TOCTree []byte
}

// NewDocument inserts a fresh document row in the "pending" state.
Expand Down Expand Up @@ -83,7 +95,7 @@ func (p *Pool) GetDocument(ctx context.Context, id tree.DocumentID, orgID, store
}
q := `
SELECT id, org_id, store_id, title, content_type, source_ref, status, error_message,
byte_size, metadata, created_at, updated_at
byte_size, metadata, created_at, updated_at, toc_tree
FROM documents WHERE id = $1 AND org_id = $2`
args := []any{string(id), orgID}
if storeID != "" {
Expand All @@ -94,13 +106,14 @@ func (p *Pool) GetDocument(ctx context.Context, id tree.DocumentID, orgID, store

var d Document
var status string
var rawMeta []byte
var rawMeta, rawTOC []byte
if err := row.Scan(&d.ID, &d.OrgID, &d.StoreID, &d.Title, &d.ContentType, &d.SourceRef, &status,
&d.ErrorMessage, &d.ByteSize, &rawMeta, &d.CreatedAt, &d.UpdatedAt); err != nil {
&d.ErrorMessage, &d.ByteSize, &rawMeta, &d.CreatedAt, &d.UpdatedAt, &rawTOC); err != nil {
return nil, mapErr(err)
}
d.Status = DocumentStatus(status)
d.Metadata = unmarshalMeta(rawMeta)
d.TOCTree = rawTOC
return &d, nil
}

Expand All @@ -111,18 +124,19 @@ func (p *Pool) GetDocument(ctx context.Context, id tree.DocumentID, orgID, store
func (p *Pool) GetDocumentForWorker(ctx context.Context, id tree.DocumentID) (*Document, error) {
row := p.QueryRow(ctx, `
SELECT id, org_id, store_id, title, content_type, source_ref, status, error_message,
byte_size, metadata, created_at, updated_at
byte_size, metadata, created_at, updated_at, toc_tree
FROM documents WHERE id = $1`, string(id))

var d Document
var status string
var rawMeta []byte
var rawMeta, rawTOC []byte
if err := row.Scan(&d.ID, &d.OrgID, &d.StoreID, &d.Title, &d.ContentType, &d.SourceRef, &status,
&d.ErrorMessage, &d.ByteSize, &rawMeta, &d.CreatedAt, &d.UpdatedAt); err != nil {
&d.ErrorMessage, &d.ByteSize, &rawMeta, &d.CreatedAt, &d.UpdatedAt, &rawTOC); err != nil {
return nil, mapErr(err)
}
d.Status = DocumentStatus(status)
d.Metadata = unmarshalMeta(rawMeta)
d.TOCTree = rawTOC
return &d, nil
}

Expand All @@ -143,6 +157,24 @@ func (p *Pool) SetDocumentTitle(ctx context.Context, id tree.DocumentID, title s
return mapErr(err)
}

// UpdateDocumentTOCTree stores the LLM-built table-of-contents tree in
// documents.toc_tree for the row identified by id, and bumps
// updated_at to now() in the same statement.
//
// treeJSON is expected to be an already JSON-marshalled
// []tree.TOCNode. A nil or zero-length slice clears the column: it is
// written as SQL NULL, the "not yet generated" state. Only this column
// and updated_at are touched, so the tree can be patched independently
// of the rest of the document row.
//
// Any error from Exec is passed through mapErr before being returned.
func (p *Pool) UpdateDocumentTOCTree(ctx context.Context, id tree.DocumentID, treeJSON []byte) error {
	const stmt = `
UPDATE documents
SET toc_tree = $2, updated_at = now()
WHERE id = $1`

	// Leave the parameter as an untyped nil unless there are bytes to
	// write, so an empty input becomes NULL rather than an empty value.
	var tocParam any
	if len(treeJSON) != 0 {
		tocParam = treeJSON
	}

	_, execErr := p.Exec(ctx, stmt, string(id), tocParam)
	return mapErr(execErr)
}

// ListDocumentsOpts controls pagination + filtering for ListDocuments.
type ListDocumentsOpts struct {
// OrgID restricts the listing to a single tenant. Required.
Expand Down Expand Up @@ -197,7 +229,7 @@ func (p *Pool) ListDocuments(ctx context.Context, o ListDocumentsOpts) ([]Docume

q := `
SELECT id, org_id, store_id, title, content_type, source_ref, status, error_message,
byte_size, metadata, created_at, updated_at
byte_size, metadata, created_at, updated_at, toc_tree
FROM documents ` + where + `
ORDER BY created_at DESC
LIMIT $` + itoa(next)
Expand All @@ -212,13 +244,14 @@ func (p *Pool) ListDocuments(ctx context.Context, o ListDocumentsOpts) ([]Docume
for rows.Next() {
var d Document
var status string
var rawMeta []byte
var rawMeta, rawTOC []byte
if err := rows.Scan(&d.ID, &d.OrgID, &d.StoreID, &d.Title, &d.ContentType, &d.SourceRef, &status,
&d.ErrorMessage, &d.ByteSize, &rawMeta, &d.CreatedAt, &d.UpdatedAt); err != nil {
&d.ErrorMessage, &d.ByteSize, &rawMeta, &d.CreatedAt, &d.UpdatedAt, &rawTOC); err != nil {
return nil, time.Time{}, err
}
d.Status = DocumentStatus(status)
d.Metadata = unmarshalMeta(rawMeta)
d.TOCTree = rawTOC
out = append(out, d)
}
if err := rows.Err(); err != nil {
Expand Down
Loading
Loading