Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1702,6 +1702,11 @@ func prefixesToDropVectorIndexEdges(ctx context.Context, rb *IndexRebuild) [][]b
prefixes := append([][]byte{}, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecEntry)))
prefixes = append(prefixes, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecDead)))
prefixes = append(prefixes, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecKeyword)))
// VecQuant ("__vector_q") is a distinct predicate from VecKeyword
// ("__vector_"), so its keys are not covered by the VecKeyword prefix and
// must be dropped explicitly on rebuild to avoid leaving stale quantized
// blobs behind.
prefixes = append(prefixes, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecQuant)))

for i := range hnsw.VectorIndexMaxLevels {
prefixes = append(prefixes, x.PredicatePrefix(hnsw.ConcatStrings(rb.Attr, hnsw.VecKeyword, fmt.Sprint(i))))
Expand Down
23 changes: 23 additions & 0 deletions schema/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,26 @@ func TestMain(m *testing.M) {
Init(ps)
m.Run()
}

// TestParseVectorQuantizeOption verifies the int8 quantization option is
// accepted end-to-end through schema parsing (registered as an HNSW factory
// option) and that an invalid value is rejected at parse time.
func TestParseVectorQuantizeOption(t *testing.T) {
require.NoError(t, ParseBytes([]byte(
`vqi: float32vector @index(hnsw(metric:"euclidean", quantize:"int8")) .`+"\n"), 1))
su, ok := State().predicate[x.AttrInRootNamespace("vqi")]
require.True(t, ok)
require.Len(t, su.IndexSpecs, 1)
found := false
for _, op := range su.IndexSpecs[0].Options {
if op.Key == "quantize" {
require.Equal(t, "int8", op.Value)
found = true
}
}
require.True(t, found, "quantize option must be captured in the vector index spec")

// An unsupported quantize value must be rejected when the schema is parsed.
require.Error(t, ParseBytes([]byte(
`vqbad: float32vector @index(hnsw(quantize:"int4")) .`+"\n"), 1))
}
1 change: 1 addition & 0 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func (s *state) PredicatesToDelete(pred string) []string {
preds = append(preds, pred+hnsw.VecEntry)
preds = append(preds, pred+hnsw.VecKeyword)
preds = append(preds, pred+hnsw.VecDead)
preds = append(preds, pred+hnsw.VecQuant)
}
}
return preds
Expand Down
69 changes: 69 additions & 0 deletions tok/hnsw/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
DotProd = "dotproduct"
EmptyHNSWTreeError = "HNSW tree has no elements"
VecKeyword = "__vector_"
VecQuant = "__vector_q" // per-node int8-quantized vector (opt-in)
visitedVectorsLevel = "visited_vectors_level_"
distanceComputations = "vector_distance_computations"
searchTime = "vector_search_time"
Expand Down Expand Up @@ -365,6 +366,32 @@ var emptyVec = []byte{}
// adds the data corresponding to a uid to the given vec variable in the form of []T
// this does not allocate memory for vec, so it must be allocated before calling this function
func (ph *persistentHNSW[T]) getVecFromUid(uid uint64, c index.CacheType, vec *[]T) error {
// Quantized index: read the int8 blob from vecQKey and dequantize into the
// caller's reused buffer. On a missing/undecodable blob, fall back to the
// raw vector (graceful degradation; the build writes vecQKey for every node
// so misses are rare and indicate a partial/corrupt index).
if ph.quantize {
data, err := getDataFromKeyWithCacheType(ph.vecQKey, uid, c)
if err != nil && !errors.Is(err, errFetchingPostingList) {
return err
}
if len(data) > 0 {
if derr := index.DequantizeInto(data, vec); derr == nil {
// Accept only when the decoded length matches the index
// dimension. ph.dim is established solely from full-precision
// vectors (the raw path below), so a corrupt blob can never
// poison it, and a wrong-length slice never reaches the SIMD
// distance kernels. When dim is not yet known (d == 0) we fall
// back to raw, which both returns a correct-length vector and
// sets dim from trusted data.
if d := ph.dim.Load(); d != 0 && int(d) == len(*vec) {
return nil
}
}
// fall through to the raw vector on decode failure or dim mismatch.
}
}

data, err := getDataFromKeyWithCacheType(ph.pred, uid, c)
if err != nil {
if errors.Is(err, errFetchingPostingList) {
Expand All @@ -376,13 +403,55 @@ func (ph *persistentHNSW[T]) getVecFromUid(uid uint64, c index.CacheType, vec *[
}
if data != nil {
index.BytesAsFloatArray(data, vec, ph.floatBits)
ph.noteDim(len(*vec))
return nil
} else {
index.BytesAsFloatArray(emptyVec, vec, ph.floatBits)
return errNilVector
}
}

// noteDim stores n as the index's vector dimension if no dimension has been
// stored yet. Non-positive values are ignored. The write is an atomic
// compare-and-swap from 0, so once a dimension is set, later calls leave it
// unchanged.
func (ph *persistentHNSW[T]) noteDim(n int) {
	if n <= 0 {
		return
	}
	// Only the transition 0 -> n succeeds; 0 is the "not yet known" marker.
	ph.dim.CompareAndSwap(0, int32(n))
}

// writeQuantizedVec records a quantized copy of inVec under the vecQKey
// predicate for uid, as a mutation on the transaction held by tc.
//
// It returns nil without writing anything when the index is not quantized,
// when inVec is empty, or when index.QuantizeFloat32 yields a nil blob.
// Otherwise it returns whatever AddMutationWithLockHeld returns.
//
// insertHelper calls this before doing any other work for the node being
// inserted.
func (ph *persistentHNSW[T]) writeQuantizedVec(
	ctx context.Context, tc *TxnCache, uid uint64, inVec []T) error {
	if !(ph.quantize && len(inVec) > 0) {
		return nil
	}

	// Reuse the caller's slice when T is float32; otherwise build a float32
	// copy element by element so the quantizer always sees []float32.
	var asFloat32 []float32
	if direct, isFloat32 := any(inVec).([]float32); isFloat32 {
		asFloat32 = direct
	} else {
		asFloat32 = make([]float32, len(inVec))
		for idx := range inVec {
			asFloat32[idx] = float32(inVec[idx])
		}
	}

	quantized := index.QuantizeFloat32(asFloat32)
	if quantized == nil {
		return nil
	}

	qKey := DataKey(ph.vecQKey, uid)
	mutation := &index.KeyValue{
		Entity: uid,
		Attr:   ph.vecQKey,
		Value:  quantized,
	}

	// The key lock is held for the duration of the mutation and released on
	// return.
	tc.txn.LockKey(qKey)
	defer tc.txn.UnlockKey(qKey)
	return tc.txn.AddMutationWithLockHeld(ctx, qKey, mutation)
}

// chooses whether to create the entry and start nodes based on whether it
// already exists, and if it hasn't been created yet, it adds the startNode to
// all levels.
Expand Down
14 changes: 13 additions & 1 deletion tok/hnsw/persistent_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
EfConstructionOpt string = "efConstruction"
EfSearchOpt string = "efSearch"
MetricOpt string = "metric"
QuantizeOpt string = "quantize"
Hnsw string = "hnsw"
)

Expand Down Expand Up @@ -73,8 +74,18 @@ func (hf *persistentIndexFactory[T]) AllowedOptions() opt.AllowedOptions {
}
return GetSimType[T](optValue, hf.floatBits), nil
}

retVal.AddCustomOption(MetricOpt, getSimFunc)

// quantize is validated at the option layer so a bad value (e.g. "int4")
// is rejected when the schema is altered, not at first index build.
getQuantFunc := func(optValue string) (any, error) {
if optValue != "int8" {
return nil, fmt.Errorf("unsupported %q value %q (only \"int8\" is supported)",
QuantizeOpt, optValue)
}
return optValue, nil
}
retVal.AddCustomOption(QuantizeOpt, getQuantFunc)
return retVal
}

Expand Down Expand Up @@ -106,6 +117,7 @@ func (hf *persistentIndexFactory[T]) createWithLock(
vecEntryKey: ConcatStrings(name, VecEntry),
vecKey: ConcatStrings(name, VecKeyword),
vecDead: ConcatStrings(name, VecDead),
vecQKey: ConcatStrings(name, VecQuant),
floatBits: floatBits,
nodeAllEdges: map[uint64][][]uint64{},
}
Expand Down
37 changes: 37 additions & 0 deletions tok/hnsw/persistent_hnsw.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"

c "github.com/dgraph-io/dgraph/v25/tok/constraints"
Expand All @@ -26,8 +27,19 @@ type persistentHNSW[T c.Float] struct {
vecEntryKey string
vecKey string
vecDead string
vecQKey string
simType SimilarityType[T]
floatBits int
// quantize is true when this index stores an int8-quantized copy of each
// vector in vecQKey and computes distances against the dequantized copy
// (opt-in via the "quantize":"int8" index option). The raw float vectors
// in pred are left untouched.
quantize bool
// dim is the vector dimension, learned lazily from the first materialized
// vector. Used to reject a quantized blob whose dimension disagrees
// (corruption / stale schema) before its wrong-length slice reaches the
// SIMD distance kernels. 0 means "not yet known".
dim atomic.Int32
// nodeAllEdges[65443][1][3] indicates the 3rd neighbor in the first
// layer for UUID 65443. The result will be a neighboring UUID.
nodeAllEdges map[uint64][][]uint64
Expand Down Expand Up @@ -58,6 +70,10 @@ func GetPersistantOptions[T c.Float](o opt.Options) string {
sb.WriteString(fmt.Sprintf(`"%s":"%s",`, MetricOpt, sim.indexType))
}

if val, ok, _ := opt.GetOpt(o, QuantizeOpt, ""); ok && val != "" {
sb.WriteString(fmt.Sprintf(`"%s":"%s",`, QuantizeOpt, val))
}

final := sb.String()
if len(final) > 0 {
// Remove last , and cover with brackets
Expand Down Expand Up @@ -109,6 +125,21 @@ func (ph *persistentHNSW[T]) applyOptions(o opt.Options) error {
insortHeap: insortPersistentHeapAscending[T], isBetterScore: isBetterScoreForDistance[T],
isSimilarityMetric: false}
}

qval, _, err := opt.GetOpt(o, QuantizeOpt, "")
if err != nil {
return err
}
if qval != "" {
if qval != "int8" {
return fmt.Errorf("unsupported %q value %q (only \"int8\" is supported)", QuantizeOpt, qval)
}
// int8 scalar quantization currently targets 32-bit float vectors.
if ph.floatBits != 32 {
return fmt.Errorf("%q=int8 requires 32-bit float vectors, got %d-bit", QuantizeOpt, ph.floatBits)
}
ph.quantize = true
}
return nil
}

Expand Down Expand Up @@ -572,6 +603,12 @@ func (ph *persistentHNSW[T]) Insert(ctx context.Context, c index.CacheType,
func (ph *persistentHNSW[T]) insertHelper(ctx context.Context, tc *TxnCache,
inUuid uint64, inVec []T) ([]persistentHeapElement[T], []*index.KeyValue, error) {

// Persist the quantized copy of this node's vector first (no-op unless the
// index is quantized), so later insertions can read it as a neighbor.
if err := ph.writeQuantizedVec(ctx, tc, inUuid, inVec); err != nil {
return []persistentHeapElement[T]{}, []*index.KeyValue{}, err
}

// return all the new edges created at all HNSW levels
var startVec []T
entry, edges, err := ph.createEntryAndStartNodes(ctx, tc, inUuid, &startVec)
Expand Down
Loading
Loading