From 741a99cb94268e2a5e1baaa50b15ddbbf411364d Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Wed, 8 Apr 2026 19:54:34 +0900 Subject: [PATCH 1/4] preserving the capacity of the Symbols slice during resets Signed-off-by: SungJin1212 --- CHANGELOG.md | 3 +- pkg/cortexpb/timeseriesv2.go | 8 ++++ pkg/cortexpb/timeseriesv2_test.go | 65 +++++++++++++++++++++++++++++++ pkg/util/push/push.go | 8 ++-- 4 files changed, 79 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c59e1966bc1..95d688766ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## master / unreleased +* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160 * [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 @@ -10,11 +11,11 @@ * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 * [ENHANCEMENT] Update build image and Go version to 1.26. #7437 -* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160 * [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363 * [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355 * [ENHANCEMENT] Compactor: Prevent partition compaction to compact any blocks marked for deletion. #7391 * [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392 +* [ENHANCEMENT] Distributor: Optimize memory allocations by pooling PreallocWriteRequestV2 and preserving the capacity of the Symbols slice during resets. #7404 * [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372 * [BUGFIX] Fix nil when ingester_query_max_attempts > 1. #7369 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 diff --git a/pkg/cortexpb/timeseriesv2.go b/pkg/cortexpb/timeseriesv2.go index ed8d9ba5696..2a903bcb8ee 100644 --- a/pkg/cortexpb/timeseriesv2.go +++ b/pkg/cortexpb/timeseriesv2.go @@ -130,6 +130,14 @@ func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 { return writeRequestPoolV2.Get().(*PreallocWriteRequestV2) } +// Reset implements proto.Message and preserves the capacity of the Symbols slice. +func (p *PreallocWriteRequestV2) Reset() { + savedSymbols := p.Symbols + p.WriteRequestV2.Reset() + p.Symbols = savedSymbols[:0] + p.data = nil +} + // PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool. // ReuseSliceV2 should be called once done. func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 { diff --git a/pkg/cortexpb/timeseriesv2_test.go b/pkg/cortexpb/timeseriesv2_test.go index e71f6b9f155..89fd525036e 100644 --- a/pkg/cortexpb/timeseriesv2_test.go +++ b/pkg/cortexpb/timeseriesv2_test.go @@ -122,6 +122,71 @@ func TestReuseWriteRequestV2(t *testing.T) { }) } +func TestPreallocWriteRequestV2Reset(t *testing.T) { + t.Run("preserves Symbols capacity", func(t *testing.T) { + const symbolsCap = 100 + req := &PreallocWriteRequestV2{ + WriteRequestV2: WriteRequestV2{ + Symbols: make([]string, 0, symbolsCap), + }, + } + req.Symbols = append(req.Symbols, "a", "b", "c") + + ptrBefore := &req.Symbols[:cap(req.Symbols)][0] + + req.Reset() + + assert.Equal(t, 0, len(req.Symbols), "Symbols length should be 0 after Reset") + assert.Equal(t, symbolsCap, cap(req.Symbols), "Symbols capacity should be preserved after Reset") + assert.Same(t, ptrBefore, &req.Symbols[:cap(req.Symbols)][0], "Symbols backing array should be reused after Reset") + }) + + t.Run("clears non-Symbols WriteRequestV2 fields", func(t *testing.T) { + b := []byte{1, 2, 3} + req := &PreallocWriteRequestV2{ + WriteRequestV2: WriteRequestV2{ + Source: RULE, + SkipLabelNameValidation: true, + Timeseries: []PreallocTimeseriesV2{{TimeSeriesV2: &TimeSeriesV2{}}}, + }, + data: &b, + } + + req.Reset() + + assert.Equal(t, SourceEnum(0), req.Source) + assert.False(t, req.SkipLabelNameValidation) + assert.Nil(t, req.Timeseries) + assert.Nil(t, req.data) + }) + + t.Run("Unmarshal after Reset reuses Symbols backing array", func(t *testing.T) { + const symbolsCount = 50 + symbols := make([]string, symbolsCount) + for i := range symbols { + symbols[i] = fmt.Sprintf("symbol_%04d", i) + } + data, err := (&WriteRequestV2{Symbols: symbols}).Marshal() + require.NoError(t, err) + + req := &PreallocWriteRequestV2{ + WriteRequestV2: WriteRequestV2{ + Symbols: make([]string, 0, symbolsCount*2), + }, + } + + // Simulate Reset in util.ParseProtoReader() + req.Reset() + ptrAfterReset := &req.Symbols[:cap(req.Symbols)][0] + capAfterReset := cap(req.Symbols) + + require.NoError(t, req.WriteRequestV2.Unmarshal(data)) + assert.Equal(t, symbolsCount, len(req.Symbols)) + assert.Equal(t, capAfterReset, cap(req.Symbols), "capacity should not change: Unmarshal reused the existing backing array") + assert.Same(t, ptrAfterReset, &req.Symbols[:cap(req.Symbols)][0], "backing array pointer should be identical: no new allocation occurred") + }) +} + func BenchmarkMarshallWriteRequestV2(b *testing.B) { ts := PreallocTimeseriesV2SliceFromPool() diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 675785c6230..aa5b085380b 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -94,14 +94,14 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool, return } - var req cortexpb.PreallocWriteRequestV2 + req := cortexpb.PreallocWriteRequestV2FromPool() // v1 request is put back into the pool by the Distributor. defer func() { - cortexpb.ReuseWriteRequestV2(&req) + cortexpb.ReuseWriteRequestV2(req) req.Free() }() - err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) + err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, req, util.RawSnappy) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) @@ -113,7 +113,7 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool, req.Source = cortexpb.API } - v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID)) + v1Req, err := convertV2RequestToV1(req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID)) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) From cc33c3ae6aa2e3447d69bef4a42bde34afc920d6 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 9 Apr 2026 15:49:26 +0900 Subject: [PATCH 2/4] delete req.Free() due to http path Signed-off-by: SungJin1212 --- pkg/util/push/push.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index aa5b085380b..fe8efe53e94 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -96,10 +96,7 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool, req := cortexpb.PreallocWriteRequestV2FromPool() // v1 request is put back into the pool by the Distributor. - defer func() { - cortexpb.ReuseWriteRequestV2(req) - req.Free() - }() + defer cortexpb.ReuseWriteRequestV2(req) err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, req, util.RawSnappy) if err != nil { From 6054deecb75ca316df0a882af4a0ba0f10257b97 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sat, 25 Apr 2026 11:30:25 +0900 Subject: [PATCH 3/4] Add benchmark Signed-off-by: SungJin1212 --- pkg/util/push/push_test.go | 119 ++++++++++++++++++++++++++++++++++++- 1 file changed, 117 insertions(+), 2 deletions(-) diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 3021449c228..2f2de6840d8 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -24,10 +24,14 @@ import ( "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" ) +// benchMaxRecvMsgSize is the max message size used in benchmarks. +const benchMaxRecvMsgSize = 100 * 1024 * 1024 + var ( testHistogram = histogram.Histogram{ Schema: 2, @@ -42,6 +46,51 @@ var ( } ) +// makeV2ReqWithSeriesAndSymbols builds a PRW2 request with the given number of +// series and symbols +func makeV2ReqWithSeriesAndSymbols(seriesNum, symbolCount int) *cortexpb.PreallocWriteRequestV2 { + const baseSymbols = 5 // "", "__name__", "bench_metric", "help text", "unit" + if symbolCount < baseSymbols { + symbolCount = baseSymbols + } + + symbols := make([]string, 0, symbolCount) + symbols = append(symbols, "", "__name__", "bench_metric", "help text", "unit") + + extraPairs := (symbolCount - baseSymbols) / 2 + for i := 0; i < extraPairs; i++ { + symbols = append(symbols, fmt.Sprintf("lbl_%d", i), fmt.Sprintf("val_%d", i)) + } + + labelsRefs := []uint32{1, 2} // __name__ = "bench_metric" + for i := 0; i < extraPairs; i++ { + nameIdx := uint32(baseSymbols + i*2) + labelsRefs = append(labelsRefs, nameIdx, nameIdx+1) + } + + ts := make([]cortexpb.PreallocTimeseriesV2, 0, seriesNum) + for range seriesNum { + ts = append(ts, cortexpb.PreallocTimeseriesV2{ + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: labelsRefs, + Metadata: cortexpb.MetadataV2{ + Type: cortexpb.METRIC_TYPE_GAUGE, + HelpRef: 3, + UnitRef: 4, + }, + Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 10}}, + }, + }) + } + + return &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: symbols, + Timeseries: ts, + }, + } +} + func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 { ts := make([]cortexpb.PreallocTimeseriesV2, 0, num) symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"} @@ -50,8 +99,7 @@ func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 { TimeSeriesV2: &cortexpb.TimeSeriesV2{ LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, Metadata: cortexpb.MetadataV2{ - Type: cortexpb.METRIC_TYPE_GAUGE, - + Type: cortexpb.METRIC_TYPE_GAUGE, HelpRef: 15, UnitRef: 16, }, @@ -178,6 +226,73 @@ func Benchmark_convertV2RequestToV1(b *testing.B) { } } +func makeEncodedPRW2Body(b *testing.B, seriesNum, symbolCount int) (body []byte, contentLength int) { + b.Helper() + series := makeV2ReqWithSeriesAndSymbols(seriesNum, symbolCount) + protobuf, err := series.Marshal() + if err != nil { + b.Fatal(err) + } + encoded := snappy.Encode(nil, protobuf) + return encoded, len(encoded) +} + +// runPRW2HandleFromPool simulates handlePRW2 using the sync.Pool +func runPRW2HandleFromPool(ctx context.Context, body []byte, contentLength int, overrides *validation.Overrides, userID string) error { + req := cortexpb.PreallocWriteRequestV2FromPool() + defer cortexpb.ReuseWriteRequestV2(req) + + if err := util.ParseProtoReader(ctx, bytes.NewReader(body), contentLength, benchMaxRecvMsgSize, req, util.RawSnappy); err != nil { + return err + } + _, err := convertV2RequestToV1(req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID)) + return err +} + +// runPRW2HandleFromScratch simulates handlePRW2 without using the sync.Pool. +func runPRW2HandleFromScratch(ctx context.Context, body []byte, contentLength int, overrides *validation.Overrides, userID string) error { + var req cortexpb.PreallocWriteRequestV2 + defer cortexpb.ReuseWriteRequestV2(&req) + + if err := util.ParseProtoReader(ctx, bytes.NewReader(body), contentLength, benchMaxRecvMsgSize, &req, util.RawSnappy); err != nil { + return err + } + _, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID)) + return err +} + +// Benchmark_HandlePRW2_PoolVsScratch compares two allocation strategies for the PRW2 parse path. +// - pool: req := cortexpb.PreallocWriteRequestV2FromPool() + defer ReuseWriteRequestV2(req) +// - scratch: var req cortexpb.PreallocWriteRequestV2 + defer ReuseWriteRequestV2(&req) +func Benchmark_HandlePRW2_PoolVsScratch(b *testing.B) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + + userID := "bench-user" + seriesNum := 100 + ctx := user.InjectOrgID(context.Background(), userID) + + for _, symCount := range []int{32, 128, 512, 2048, 4096} { + body, contentLength := makeEncodedPRW2Body(b, seriesNum, symCount) + name := fmt.Sprintf("symbols=%d", symCount) + + b.Run("pool/"+name, func(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + require.NoError(b, runPRW2HandleFromPool(ctx, body, contentLength, overrides, userID)) + } + }) + + b.Run("scratch/"+name, func(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + require.NoError(b, runPRW2HandleFromScratch(ctx, body, contentLength, overrides, userID)) + } + }) + } +} + func Test_convertV2RequestToV1_WithEnableTypeAndUnitLabels(t *testing.T) { symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes", "__type__", "exist type", "__unit__", "exist unit"} samples := []cortexpb.Sample{ From 8dc0555fbbcca28e32b48b16011f6687338aa2e1 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sat, 25 Apr 2026 11:41:28 +0900 Subject: [PATCH 4/4] fix lint Signed-off-by: SungJin1212 --- pkg/util/push/push_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 2f2de6840d8..fb89bc3d640 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -58,12 +58,12 @@ func makeV2ReqWithSeriesAndSymbols(seriesNum, symbolCount int) *cortexpb.Preallo symbols = append(symbols, "", "__name__", "bench_metric", "help text", "unit") extraPairs := (symbolCount - baseSymbols) / 2 - for i := 0; i < extraPairs; i++ { + for i := range extraPairs { symbols = append(symbols, fmt.Sprintf("lbl_%d", i), fmt.Sprintf("val_%d", i)) } labelsRefs := []uint32{1, 2} // __name__ = "bench_metric" - for i := 0; i < extraPairs; i++ { + for i := range extraPairs { nameIdx := uint32(baseSymbols + i*2) labelsRefs = append(labelsRefs, nameIdx, nameIdx+1) }