Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4dcb5ed
feat(search): add BM25 ranked text search
shaunpatterson Mar 4, 2026
ed720da
fix(bm25): store TF/doclen in facets and fix query pipeline integration
shaunpatterson Mar 4, 2026
1ee7473
perf(bm25): replace facet storage with compact direct Badger KV encoding
shaunpatterson Mar 5, 2026
79d5295
test(bm25): add 15 integration tests for mutation scenarios and edge …
shaunpatterson Mar 5, 2026
c9273f6
test(bm25): add exact score verification, BM15 variant, and single-do…
shaunpatterson Mar 5, 2026
ffb7f2f
feat(bm25): add block storage infrastructure for segmented column stores
shaunpatterson Mar 5, 2026
59f18cc
feat(bm25): segmented block writes and WAND/Block-Max WAND query path
shaunpatterson Mar 5, 2026
1a3ada3
feat(bm25): add legacy format fallback for migration and WAND unit tests
shaunpatterson Mar 5, 2026
fc6a212
fix(bm25): address GPT-5 code review findings in WAND implementation
shaunpatterson Mar 5, 2026
1073cba
fix(bm25): prevent stats double-counting on updates and fix BMW other…
shaunpatterson Mar 5, 2026
81b18c1
fix(bm25): clamp startPos in skipTo to prevent negative sort.Search l…
shaunpatterson Mar 5, 2026
edf466d
fix(bm25): address Gemini/GPT-5 code review findings
shaunpatterson Mar 19, 2026
6fd041e
feat(bm25): rework BM25 onto standard posting lists
shaunpatterson Jun 4, 2026
1ec5cc1
fix(bm25): accumulate corpus stats across transactions
shaunpatterson Jun 4, 2026
235c5eb
fix(bm25): reject @index(bm25) on list predicates
shaunpatterson Jun 4, 2026
7b063f0
chore(bm25): drop working design notes
shaunpatterson Jun 4, 2026
ca19759
fix(bm25): use tagged switch on err (staticcheck QF1002)
shaunpatterson Jun 7, 2026
77ee561
fix(bm25): clear corpus stats on index drop/rebuild
shaunpatterson Jun 8, 2026
6ddc583
fix(bm25): aggregate corpus stats during index rebuild
shaunpatterson Jun 8, 2026
c724ddc
fix(bm25): bound query memory on the offset/filter paths with WAND top-k
shaunpatterson Jun 8, 2026
3a40bfa
fix(bm25): build a correct BM25 index in the bulk loader
shaunpatterson Jun 8, 2026
a1b1c74
fix(bm25): bind score var from uid-keyed snapshot; drop dead helper
shaunpatterson Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/dgraph-io/dgraph/v25/enc"
"github.com/dgraph-io/dgraph/v25/filestore"
gqlSchema "github.com/dgraph-io/dgraph/v25/graphql/schema"
"github.com/dgraph-io/dgraph/v25/posting"
"github.com/dgraph-io/dgraph/v25/protos/pb"
"github.com/dgraph-io/dgraph/v25/schema"
"github.com/dgraph-io/dgraph/v25/x"
Expand Down Expand Up @@ -433,6 +434,12 @@ func (ld *loader) mapStage() {
close(ld.readerChunkCh)
mapperWg.Wait()

// Flush BM25 corpus statistics accumulated across all mappers as one posting per
// bucket, before the mappers are released. Stats must be summed (not unioned like
// postings), so this single merge-and-write avoids the per-mapper double counting a
// union would produce.
ld.flushBM25Stats()

// Allow memory to GC before the reduce phase.
for i := range ld.mappers {
ld.mappers[i] = nil
Expand All @@ -444,6 +451,75 @@ func (ld *loader) mapStage() {
ld.xids = nil
}

// mergeBM25Stats combines every mapper's per-predicate corpus-statistics partials into
// per-predicate bucket totals. Summing across mappers is what makes the final per-bucket
// counts correct; emitting each mapper's partial as its own posting would be unioned (or
// collapsed last-write-wins) at reduce time and undercount.
func mergeBM25Stats(mappers []*mapper) map[string]*bm25StatEntry {
merged := make(map[string]*bm25StatEntry)
for _, m := range mappers {
if m == nil {
continue
}
for attr, e := range m.bm25Stats {
me := merged[attr]
if me == nil {
me = &bm25StatEntry{}
merged[attr] = me
}
for i := 0; i < posting.NumBM25StatsBuckets; i++ {
me.count[i] += e.count[i]
me.terms[i] += e.terms[i]
}
}
}
return merged
}

// flushBM25Stats writes the merged BM25 corpus statistics as one value posting per
// non-empty bucket into the same map shard as the predicate's term postings (keyed by
// shardFor(attr)), so they co-locate through shard merge and land in the predicate's
// output DB. Exactly one posting is written per bucket, so the reduce produces a single
// value posting per bucket — the same format the live and rebuild paths store.
func (ld *loader) flushBM25Stats() {
merged := mergeBM25Stats(ld.mappers)
if len(merged) == 0 {
return
}

// A fresh mapper supplies clean shard buffers; the running mappers have already
// flushed and released theirs.
writer := newMapper(ld.state)
for attr, e := range merged {
shard := ld.shards.shardFor(attr)
for b := 0; b < posting.NumBM25StatsBuckets; b++ {
if e.count[b] == 0 && e.terms[b] == 0 {
continue
}
writer.addMapEntry(
x.BM25StatsKey(attr, b),
&pb.Posting{
Uid: math.MaxUint64,
PostingType: pb.Posting_VALUE,
ValType: pb.Posting_BINARY,
Value: posting.EncodeBM25Stats(uint64(e.count[b]), uint64(e.terms[b])),
},
shard,
)
}
}

for i := range writer.shards {
sh := &writer.shards[i]
if sh.cbuf.LenNoPadding() > 0 {
sh.mu.Lock() // writeMapEntriesToFile unlocks and releases the buffer.
writer.writeMapEntriesToFile(sh.cbuf, i)
} else if err := sh.cbuf.Release(); err != nil {
glog.Warningf("error releasing bm25 stats buffer: %v", err)
}
}
}

func parseGqlSchema(s string) map[uint64]string {
var schemas []x.ExportedGQLSchema
if err := json.Unmarshal([]byte(s), &schemas); err != nil {
Expand Down
79 changes: 74 additions & 5 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ var (
type mapper struct {
*state
shards []shardState // shard is based on predicate

// bm25Stats accumulates per-predicate corpus statistics (document count and total
// term count, bucketed by uid) as documents are mapped. BM25 stats must be summed,
// not unioned like postings, so each mapper accumulates locally and the loader
// merges all mappers and flushes one stats posting per bucket after the map phase
// (see loader.flushBM25Stats). Keyed by namespaced predicate.
bm25Stats map[string]*bm25StatEntry
}

// bm25StatEntry holds one predicate's bucketed corpus-statistics partials for a mapper.
type bm25StatEntry struct {
count [posting.NumBM25StatsBuckets]int64
terms [posting.NumBM25StatsBuckets]int64
}

type shardState struct {
Expand All @@ -66,8 +79,9 @@ func newMapper(st *state) *mapper {
shards[i].cbuf = newMapperBuffer(st.opt)
}
return &mapper{
state: st,
shards: shards,
state: st,
shards: shards,
bm25Stats: make(map[string]*bm25StatEntry),
}
}

Expand Down Expand Up @@ -295,8 +309,11 @@ func (m *mapper) addMapEntry(key []byte, p *pb.Posting, shard int) {
atomic.AddInt64(&m.prog.mapEdgeCount, 1)

uid := p.Uid
if p.PostingType != pb.Posting_REF || len(p.Facets) > 0 {
// Keep p
if p.PostingType != pb.Posting_REF || len(p.Facets) > 0 || len(p.Value) > 0 {
// Keep p. A REF posting that carries a Value (e.g. a BM25 term posting packing
// term frequency and document length) must retain that payload — mirroring the
// len(p.Value) > 0 retention clause in List.encode — or it would be reduced to a
// bare UID and the value silently lost.
} else {
// We only needed the UID.
p = nil
Expand Down Expand Up @@ -456,11 +473,21 @@ func (m *mapper) addIndexMapEntries(nq dql.NQuad, de *pb.DirectedEdge) {
// doing edge postings. So okay to be fatal.
x.Check(err)

attr := x.NamespaceAttr(nq.Namespace, nq.Predicate)

// BM25 postings pack (term frequency, document length) into each posting's value
// and require corpus statistics; the generic token path would write bare,
// valueless postings and no stats, leaving bulk-loaded data unsearchable. Handle
// it separately.
if _, isBM25 := toker.(tok.BM25Tokenizer); isBM25 {
m.addBM25IndexMapEntries(attr, nq.Lang, de, schemaVal)
continue
}

// Extract tokens.
toks, err := tok.BuildTokens(schemaVal.Value, tok.GetTokenizerForLang(toker, nq.Lang))
x.Check(err)

attr := x.NamespaceAttr(nq.Namespace, nq.Predicate)
// Store index posting.
for _, t := range toks {
m.addMapEntry(
Expand All @@ -474,3 +501,45 @@ func (m *mapper) addIndexMapEntries(nq dql.NQuad, de *pb.DirectedEdge) {
}
}
}

// addBM25IndexMapEntries writes the BM25 term postings for one document — one posting
// per distinct term, packing (term frequency, document length) into the value exactly
// as the live index path does — and accumulates the document's contribution to this
// mapper's corpus statistics. The accumulated stats are flushed once per bucket after
// the map phase (loader.flushBM25Stats), because corpus statistics must be summed
// across documents rather than unioned like postings.
func (m *mapper) addBM25IndexMapEntries(attr string, lang string, de *pb.DirectedEdge,
schemaVal types.Val) {
termFreqs, docLen, err := tok.BM25Tokenizer{}.TokensWithFrequency(schemaVal.Value, lang)
x.Check(err)
if docLen == 0 {
// Document tokenizes to zero terms (e.g. all stopwords); it contributes no
// postings and no corpus statistics.
return
}

shard := m.state.shards.shardFor(attr)
uid := de.GetEntity()
for term, tf := range termFreqs {
encodedTerm := string([]byte{tok.IdentBM25}) + term
m.addMapEntry(
x.IndexKey(attr, encodedTerm),
&pb.Posting{
Uid: uid,
PostingType: pb.Posting_REF,
ValType: pb.Posting_BINARY,
Value: posting.EncodeBM25Value(tf, docLen),
},
shard,
)
}

entry := m.bm25Stats[attr]
if entry == nil {
entry = &bm25StatEntry{}
m.bm25Stats[attr] = entry
}
bucket := uid % posting.NumBM25StatsBuckets
entry.count[bucket]++
entry.terms[bucket] += int64(docLen)
}
59 changes: 59 additions & 0 deletions dgraph/cmd/bulk/mapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package bulk

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/dgraph/v25/posting"
)

// TestMergeBM25Stats verifies that per-mapper corpus-statistics partials are summed
// across mappers per (predicate, bucket). This is the linchpin of correct bulk BM25
// stats: each mapper sees a disjoint subset of documents, so the final doc count and
// total term count for a bucket must be the sum of every mapper's partial — never just
// one mapper's (which a unioned/last-write-wins posting would produce).
func TestMergeBM25Stats(t *testing.T) {
mk := func(attr string, bucket int, count, terms int64) *mapper {
m := &mapper{bm25Stats: map[string]*bm25StatEntry{}}
e := &bm25StatEntry{}
e.count[bucket] = count
e.terms[bucket] = terms
m.bm25Stats[attr] = e
return m
}

mappers := []*mapper{
mk("name", 1, 3, 30),
mk("name", 1, 2, 25), // same predicate+bucket as above -> must sum
mk("name", 5, 4, 40), // same predicate, different bucket
mk("bio", 1, 7, 70), // different predicate
nil, // released mappers are skipped
}

merged := mergeBM25Stats(mappers)
require.Len(t, merged, 2)

require.Equal(t, int64(5), merged["name"].count[1], "bucket 1 doc count must sum across mappers")
require.Equal(t, int64(55), merged["name"].terms[1], "bucket 1 term count must sum across mappers")
require.Equal(t, int64(4), merged["name"].count[5])
require.Equal(t, int64(40), merged["name"].terms[5])
require.Equal(t, int64(7), merged["bio"].count[1])
require.Equal(t, int64(70), merged["bio"].terms[1])

// Untouched buckets stay zero.
require.Equal(t, int64(0), merged["name"].count[0])
require.Equal(t, int64(0), merged["bio"].count[5])

// Total doc count across the predicate equals the sum of all contributing mappers.
var nameDocs int64
for b := 0; b < posting.NumBM25StatsBuckets; b++ {
nameDocs += merged["name"].count[b]
}
require.Equal(t, int64(9), nameDocs)
}
2 changes: 1 addition & 1 deletion dql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -1701,7 +1701,7 @@ func validFuncName(name string) bool {

switch name {
case "regexp", "anyofterms", "allofterms", "alloftext", "anyoftext", "ngram",
"has", "uid", "uid_in", "anyof", "allof", "type", "match", "similar_to":
"has", "uid", "uid_in", "anyof", "allof", "type", "match", "similar_to", "bm25":
return true
}
return false
Expand Down
Loading
Loading