Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
* [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
Expand All @@ -10,11 +11,11 @@
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
* [ENHANCEMENT] Update build image and Go version to 1.26. #7437
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
* [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355
* [ENHANCEMENT] Compactor: Prevent partition compaction to compact any blocks marked for deletion. #7391
* [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392
* [ENHANCEMENT] Distributor: Optimize memory allocations by pooling PreallocWriteRequestV2 and preserving the capacity of the Symbols slice during resets. #7404
* [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372
* [BUGFIX] Fix nil when ingester_query_max_attempts > 1. #7369
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
Expand Down
8 changes: 8 additions & 0 deletions pkg/cortexpb/timeseriesv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 {
return writeRequestPoolV2.Get().(*PreallocWriteRequestV2)
}

// Reset implements proto.Message. Unlike the generated Reset, it keeps the
// backing array of the Symbols slice (length 0, capacity preserved) so a
// pooled request can reuse it on the next Unmarshal, and clears the cached
// unmarshal buffer.
func (p *PreallocWriteRequestV2) Reset() {
	symbols := p.Symbols[:0]
	p.WriteRequestV2.Reset()
	p.Symbols = symbols
	p.data = nil
}

Comment on lines +133 to +140
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Override a Reset() to preserve the capacity of the Symbols.

// PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool.
// ReuseSliceV2 should be called once done.
func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 {
Expand Down
65 changes: 65 additions & 0 deletions pkg/cortexpb/timeseriesv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,71 @@ func TestReuseWriteRequestV2(t *testing.T) {
})
}

// TestPreallocWriteRequestV2Reset verifies the custom Reset override: it must
// keep the Symbols backing array (so pooled requests avoid re-allocating it)
// while clearing every other field of the embedded WriteRequestV2.
func TestPreallocWriteRequestV2Reset(t *testing.T) {
	t.Run("preserves Symbols capacity", func(t *testing.T) {
		const symbolsCap = 100
		req := &PreallocWriteRequestV2{
			WriteRequestV2: WriteRequestV2{
				Symbols: make([]string, 0, symbolsCap),
			},
		}
		req.Symbols = append(req.Symbols, "a", "b", "c")

		// Address of the first element of the full backing array, taken before
		// Reset so we can prove the array itself (not just the capacity) is reused.
		ptrBefore := &req.Symbols[:cap(req.Symbols)][0]

		req.Reset()

		assert.Equal(t, 0, len(req.Symbols), "Symbols length should be 0 after Reset")
		assert.Equal(t, symbolsCap, cap(req.Symbols), "Symbols capacity should be preserved after Reset")
		assert.Same(t, ptrBefore, &req.Symbols[:cap(req.Symbols)][0], "Symbols backing array should be reused after Reset")
	})

	t.Run("clears non-Symbols WriteRequestV2 fields", func(t *testing.T) {
		b := []byte{1, 2, 3}
		req := &PreallocWriteRequestV2{
			WriteRequestV2: WriteRequestV2{
				Source:                  RULE,
				SkipLabelNameValidation: true,
				Timeseries:              []PreallocTimeseriesV2{{TimeSeriesV2: &TimeSeriesV2{}}},
			},
			data: &b,
		}

		req.Reset()

		// Everything except Symbols must be back to the zero value.
		assert.Equal(t, SourceEnum(0), req.Source)
		assert.False(t, req.SkipLabelNameValidation)
		assert.Nil(t, req.Timeseries)
		assert.Nil(t, req.data)
	})

	t.Run("Unmarshal after Reset reuses Symbols backing array", func(t *testing.T) {
		const symbolsCount = 50
		symbols := make([]string, symbolsCount)
		for i := range symbols {
			symbols[i] = fmt.Sprintf("symbol_%04d", i)
		}
		data, err := (&WriteRequestV2{Symbols: symbols}).Marshal()
		require.NoError(t, err)

		// Capacity is double what the payload needs, so a well-behaved
		// Unmarshal should append into the existing array without growing it.
		req := &PreallocWriteRequestV2{
			WriteRequestV2: WriteRequestV2{
				Symbols: make([]string, 0, symbolsCount*2),
			},
		}

		// Simulate Reset in util.ParseProtoReader()
		req.Reset()
		ptrAfterReset := &req.Symbols[:cap(req.Symbols)][0]
		capAfterReset := cap(req.Symbols)

		require.NoError(t, req.WriteRequestV2.Unmarshal(data))
		assert.Equal(t, symbolsCount, len(req.Symbols))
		assert.Equal(t, capAfterReset, cap(req.Symbols), "capacity should not change: Unmarshal reused the existing backing array")
		assert.Same(t, ptrAfterReset, &req.Symbols[:cap(req.Symbols)][0], "backing array pointer should be identical: no new allocation occurred")
	})
}

func BenchmarkMarshallWriteRequestV2(b *testing.B) {
ts := PreallocTimeseriesV2SliceFromPool()

Expand Down
11 changes: 4 additions & 7 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,11 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool,
return
}

var req cortexpb.PreallocWriteRequestV2
req := cortexpb.PreallocWriteRequestV2FromPool()
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a req from the pool to reuse the capacity of the Symbols.

// v1 request is put back into the pool by the Distributor.
defer func() {
cortexpb.ReuseWriteRequestV2(&req)
req.Free()
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete the req.Free() since this path is a http.

}()
defer cortexpb.ReuseWriteRequestV2(req)

err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, req, util.RawSnappy)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -113,7 +110,7 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool,
req.Source = cortexpb.API
}

v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
v1Req, err := convertV2RequestToV1(req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
119 changes: 117 additions & 2 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ import (
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// benchMaxRecvMsgSize is the max message size used in benchmarks.
const benchMaxRecvMsgSize = 100 * 1024 * 1024

var (
testHistogram = histogram.Histogram{
Schema: 2,
Expand All @@ -42,6 +46,51 @@ var (
}
)

// makeV2ReqWithSeriesAndSymbols builds a PRW2 request with seriesNum series and
// roughly symbolCount entries in the symbols table. The first five symbols are
// fixed; the remainder are generated label name/value pairs referenced by every
// series. symbolCount is clamped to the fixed minimum.
func makeV2ReqWithSeriesAndSymbols(seriesNum, symbolCount int) *cortexpb.PreallocWriteRequestV2 {
	const baseSymbols = 5 // "", "__name__", "bench_metric", "help text", "unit"
	if symbolCount < baseSymbols {
		symbolCount = baseSymbols
	}

	syms := make([]string, 0, symbolCount)
	syms = append(syms, "", "__name__", "bench_metric", "help text", "unit")

	refs := []uint32{1, 2} // __name__ = "bench_metric"
	pairCount := (symbolCount - baseSymbols) / 2
	for p := 0; p < pairCount; p++ {
		syms = append(syms, fmt.Sprintf("lbl_%d", p), fmt.Sprintf("val_%d", p))
		nameRef := uint32(baseSymbols + p*2)
		refs = append(refs, nameRef, nameRef+1)
	}

	series := make([]cortexpb.PreallocTimeseriesV2, seriesNum)
	for i := range series {
		series[i] = cortexpb.PreallocTimeseriesV2{
			TimeSeriesV2: &cortexpb.TimeSeriesV2{
				LabelsRefs: refs,
				Metadata: cortexpb.MetadataV2{
					Type:    cortexpb.METRIC_TYPE_GAUGE,
					HelpRef: 3,
					UnitRef: 4,
				},
				Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 10}},
			},
		}
	}

	return &cortexpb.PreallocWriteRequestV2{
		WriteRequestV2: cortexpb.WriteRequestV2{
			Symbols:    syms,
			Timeseries: series,
		},
	}
}

func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 {
ts := make([]cortexpb.PreallocTimeseriesV2, 0, num)
symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}
Expand All @@ -50,8 +99,7 @@ func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 {
TimeSeriesV2: &cortexpb.TimeSeriesV2{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
Metadata: cortexpb.MetadataV2{
Type: cortexpb.METRIC_TYPE_GAUGE,

Type: cortexpb.METRIC_TYPE_GAUGE,
HelpRef: 15,
UnitRef: 16,
},
Expand Down Expand Up @@ -178,6 +226,73 @@ func Benchmark_convertV2RequestToV1(b *testing.B) {
}
}

// makeEncodedPRW2Body marshals a generated PRW2 request and snappy-compresses
// it, returning the compressed body and its length (used as Content-Length).
func makeEncodedPRW2Body(b *testing.B, seriesNum, symbolCount int) (body []byte, contentLength int) {
	b.Helper()

	raw, err := makeV2ReqWithSeriesAndSymbols(seriesNum, symbolCount).Marshal()
	if err != nil {
		b.Fatal(err)
	}

	compressed := snappy.Encode(nil, raw)
	return compressed, len(compressed)
}

// runPRW2HandleFromPool simulates handlePRW2 with a request taken from the
// sync.Pool (and returned to it on exit), mirroring the production path.
func runPRW2HandleFromPool(ctx context.Context, body []byte, contentLength int, overrides *validation.Overrides, userID string) error {
	req := cortexpb.PreallocWriteRequestV2FromPool()
	defer cortexpb.ReuseWriteRequestV2(req)

	err := util.ParseProtoReader(ctx, bytes.NewReader(body), contentLength, benchMaxRecvMsgSize, req, util.RawSnappy)
	if err != nil {
		return err
	}

	_, err = convertV2RequestToV1(req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
	return err
}

// runPRW2HandleFromScratch simulates handlePRW2 with a fresh zero-value request
// per call, i.e. without the sync.Pool.
func runPRW2HandleFromScratch(ctx context.Context, body []byte, contentLength int, overrides *validation.Overrides, userID string) error {
	var req cortexpb.PreallocWriteRequestV2
	defer cortexpb.ReuseWriteRequestV2(&req)

	err := util.ParseProtoReader(ctx, bytes.NewReader(body), contentLength, benchMaxRecvMsgSize, &req, util.RawSnappy)
	if err != nil {
		return err
	}

	_, err = convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
	return err
}

// Benchmark_HandlePRW2_PoolVsScratch compares two allocation strategies for the
// PRW2 parse path across a range of symbol-table sizes:
//   - pool: request obtained via cortexpb.PreallocWriteRequestV2FromPool and
//     returned with ReuseWriteRequestV2
//   - scratch: zero-value cortexpb.PreallocWriteRequestV2 per iteration
func Benchmark_HandlePRW2_PoolVsScratch(b *testing.B) {
	var limits validation.Limits
	flagext.DefaultValues(&limits)
	overrides := validation.NewOverrides(limits, nil)

	const (
		userID    = "bench-user"
		seriesNum = 100
	)
	ctx := user.InjectOrgID(context.Background(), userID)

	for _, symbolCount := range []int{32, 128, 512, 2048, 4096} {
		body, contentLength := makeEncodedPRW2Body(b, seriesNum, symbolCount)
		suffix := fmt.Sprintf("symbols=%d", symbolCount)

		b.Run("pool/"+suffix, func(b *testing.B) {
			b.ReportAllocs()
			for b.Loop() {
				require.NoError(b, runPRW2HandleFromPool(ctx, body, contentLength, overrides, userID))
			}
		})

		b.Run("scratch/"+suffix, func(b *testing.B) {
			b.ReportAllocs()
			for b.Loop() {
				require.NoError(b, runPRW2HandleFromScratch(ctx, body, contentLength, overrides, userID))
			}
		})
	}
}

func Test_convertV2RequestToV1_WithEnableTypeAndUnitLabels(t *testing.T) {
symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes", "__type__", "exist type", "__unit__", "exist unit"}
samples := []cortexpb.Sample{
Expand Down
Loading