Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions api/v1alpha1/discoverypolicy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,61 @@ type PrometheusSource struct {
// Default: 5m. Example: "1m", "15m"
// +optional
Step *metav1.Duration `json:"step,omitempty"`
// ScoringStrategy applies a weighting function to data points before aggregation.
// Use this to give higher importance to data points that occur during specific time windows
// (e.g., working hours). Only used when queryType is "range".
// +optional
ScoringStrategy *ScoringStrategy `json:"scoringStrategy,omitempty"`
}

// ScoringStrategy configures how data point values are weighted before aggregation.
type ScoringStrategy struct {
// Type identifies the scoring strategy. Must be "worktime".
// +kubebuilder:validation:Enum=worktime
Type ScoringStrategyType `json:"type"`
// Worktime contains the configuration when type=worktime.
// +optional
Worktime *WorktimeStrategy `json:"worktime,omitempty"`
}

// ScoringStrategyType identifies the scoring strategy.
// +kubebuilder:validation:Enum=worktime
type ScoringStrategyType string

const (
// ScoringStrategyWorktime weights data points based on the time of day they occurred.
ScoringStrategyWorktime ScoringStrategyType = "worktime"
)

// WorktimeStrategy weights data points based on configurable time-of-day windows.
// Each window defines an hour range and a weight multiplier. Data points outside
// all defined windows receive zero weight by default.
type WorktimeStrategy struct {
// Windows defines the time-of-day windows and their weight multipliers.
// Hours are in 24h format (0-23) interpreted in the configured timezone.
// Windows must not overlap. Data points outside all windows have weight 0.
// +kubebuilder:validation:MinItems=1
Windows []WorktimeWindow `json:"windows"`
// Timezone is the IANA timezone name used to interpret window hours.
// Default: "UTC". Example: "Europe/Berlin", "America/New_York"
// +kubebuilder:default="UTC"
// +optional
Timezone string `json:"timezone,omitempty"`
}

// WorktimeWindow defines a single time-of-day window with a weight multiplier.
type WorktimeWindow struct {
// StartHour is the beginning of the window (inclusive, 0-23).
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=23
StartHour int32 `json:"startHour"`
// EndHour is the end of the window (exclusive, 1-24). Must be greater than startHour.
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=24
EndHour int32 `json:"endHour"`
// Weight is the multiplier applied to data point values within this window.
// Example: "1.0" (full weight), "0.3" (reduced), "0.0" (ignored)
Weight string `json:"weight"`
}

// RegistrySource defines OCI registry tag listing configuration for image discovery.
Expand Down
60 changes: 60 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 67 additions & 0 deletions config/crd/bases/drop.corewire.io_discoverypolicies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,73 @@ spec:
- range
- instant
type: string
scoringStrategy:
description: |-
ScoringStrategy applies a weighting function to data points before aggregation.
Use this to give higher importance to data points that occur during specific time windows
(e.g., working hours). Only used when queryType is "range".
properties:
type:
allOf:
- enum:
- worktime
- enum:
- worktime
description: Type identifies the scoring strategy. Must
be "worktime".
type: string
worktime:
description: Worktime contains the configuration when
type=worktime.
properties:
timezone:
default: UTC
description: |-
Timezone is the IANA timezone name used to interpret window hours.
Default: "UTC". Example: "Europe/Berlin", "America/New_York"
type: string
windows:
description: |-
Windows defines the time-of-day windows and their weight multipliers.
Hours are in 24h format (0-23) interpreted in the configured timezone.
Windows must not overlap. Data points outside all windows have weight 0.
items:
description: WorktimeWindow defines a single time-of-day
window with a weight multiplier.
properties:
endHour:
description: EndHour is the end of the window
(exclusive, 1-24). Must be greater than
startHour.
format: int32
maximum: 24
minimum: 1
type: integer
startHour:
description: StartHour is the beginning of
the window (inclusive, 0-23).
format: int32
maximum: 23
minimum: 0
type: integer
weight:
description: |-
Weight is the multiplier applied to data point values within this window.
Example: "1.0" (full weight), "0.3" (reduced), "0.0" (ignored)
type: string
required:
- endHour
- startHour
- weight
type: object
minItems: 1
type: array
required:
- windows
type: object
required:
- type
type: object
step:
description: |-
Step is the resolution step for range queries (only used when lookback is set).
Expand Down
6 changes: 5 additions & 1 deletion internal/controller/discoverypolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,11 @@ func (r *DiscoveryPolicyReconciler) buildSource(ctx context.Context, src dropv1a
if src.Prometheus.Step != nil {
step = src.Prometheus.Step.Duration
}
return discovery.NewPrometheusSource(src.Prometheus.Endpoint, src.Prometheus.Query, src.Prometheus.QueryType, lookback, src.Prometheus.AggregationMethod, step, httpClient), nil
weighter, err := discovery.NewScoreWeighter(src.Prometheus.ScoringStrategy)
if err != nil {
return nil, fmt.Errorf("building scoring strategy: %w", err)
}
return discovery.NewPrometheusSource(src.Prometheus.Endpoint, src.Prometheus.Query, src.Prometheus.QueryType, lookback, src.Prometheus.AggregationMethod, step, weighter, httpClient), nil
case "registry":
if src.Registry == nil {
return nil, fmt.Errorf("registry config is required when type=registry")
Expand Down
33 changes: 30 additions & 3 deletions internal/discovery/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ type PrometheusSource struct {
Lookback time.Duration // time window for range queries
AggregationMethod *dropv1alpha1.AggregationMethod // nil = use last value; sum, count, avg, max
Step time.Duration // resolution step for range queries (default 5m)
Weighter ScoreWeighter // optional time-based weighting strategy
HTTPClient *http.Client
}

// NewPrometheusSource creates a new Prometheus discovery source.
func NewPrometheusSource(endpoint, query string, queryType dropv1alpha1.QueryType, lookback time.Duration, aggregationMethod *dropv1alpha1.AggregationMethod, step time.Duration, httpClient *http.Client) *PrometheusSource {
func NewPrometheusSource(endpoint, query string, queryType dropv1alpha1.QueryType, lookback time.Duration, aggregationMethod *dropv1alpha1.AggregationMethod, step time.Duration, weighter ScoreWeighter, httpClient *http.Client) *PrometheusSource {
if httpClient == nil {
httpClient = &http.Client{Timeout: 30 * time.Second}
}
Expand All @@ -44,6 +45,7 @@ func NewPrometheusSource(endpoint, query string, queryType dropv1alpha1.QueryTyp
Lookback: lookback,
AggregationMethod: aggregationMethod,
Step: step,
Weighter: weighter,
HTTPClient: httpClient,
}
}
Expand Down Expand Up @@ -121,7 +123,7 @@ func (p *PrometheusSource) Fetch(ctx context.Context) ([]ImageResult, error) {
var score int64
if p.QueryType == dropv1alpha1.QueryTypeRange {
// Range query: aggregate values according to configured method (nil = last value)
score = aggregateRangeValues(r.Values, p.AggregationMethod)
score = aggregateRangeValues(r.Values, p.AggregationMethod, p.Weighter)
} else {
// Instant query: use single value
score = extractScore(r.Value)
Expand Down Expand Up @@ -159,7 +161,8 @@ func extractScore(value []interface{}) int64 {

// aggregateRangeValues aggregates all values from a query_range result using the specified method.
// When method is nil, the last data-point value is used directly (no aggregation).
func aggregateRangeValues(values [][]interface{}, method *dropv1alpha1.AggregationMethod) int64 {
// When weighter is non-nil, each data point value is multiplied by the weight for its timestamp.
func aggregateRangeValues(values [][]interface{}, method *dropv1alpha1.AggregationMethod, weighter ScoreWeighter) int64 {
// nil = no aggregation, use last data-point value directly
if method == nil {
if len(values) == 0 {
Expand All @@ -177,6 +180,10 @@ func aggregateRangeValues(values [][]interface{}, method *dropv1alpha1.Aggregati
if _, err := fmt.Sscanf(strVal, "%f", &v); err != nil {
return 0
}
if weighter != nil {
ts := extractTimestamp(lastPair[0])
v *= weighter.Weight(ts)
}
return int64(v)
}

Expand All @@ -197,6 +204,10 @@ func aggregateRangeValues(values [][]interface{}, method *dropv1alpha1.Aggregati
if _, err := fmt.Sscanf(strVal, "%f", &v); err != nil {
continue
}
if weighter != nil {
ts := extractTimestamp(pair[0])
v *= weighter.Weight(ts)
}
total += v
count++
if !maxSet || v > max {
Expand All @@ -219,3 +230,19 @@ func aggregateRangeValues(values [][]interface{}, method *dropv1alpha1.Aggregati
return int64(total)
}
}

// extractTimestamp parses a Unix timestamp from a Prometheus data point.
func extractTimestamp(raw interface{}) time.Time {
switch v := raw.(type) {
case float64:
return time.Unix(int64(v), 0).UTC()
case json.Number:
f, err := v.Float64()
if err != nil {
return time.Time{}
}
return time.Unix(int64(f), 0).UTC()
default:
return time.Time{}
}
}
6 changes: 3 additions & 3 deletions internal/discovery/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestPrometheusSource_Fetch_Instant(t *testing.T) {
}))
defer server.Close()

source := NewPrometheusSource(server.URL, "test_query", dropv1alpha1.QueryTypeInstant, 0, nil, 0, server.Client())
source := NewPrometheusSource(server.URL, "test_query", dropv1alpha1.QueryTypeInstant, 0, nil, 0, nil, server.Client())
results, err := source.Fetch(context.Background())

if tt.wantErr {
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestPrometheusSource_Fetch_Range(t *testing.T) {
}))
defer server.Close()

source := NewPrometheusSource(server.URL, "test_query", dropv1alpha1.QueryTypeRange, time.Hour, tt.aggregationMethod, 5*time.Minute, server.Client())
source := NewPrometheusSource(server.URL, "test_query", dropv1alpha1.QueryTypeRange, time.Hour, tt.aggregationMethod, 5*time.Minute, nil, server.Client())
results, err := source.Fetch(context.Background())
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestPrometheusSource_DefaultQueryType(t *testing.T) {
defer server.Close()

// Empty queryType should default to range
source := NewPrometheusSource(server.URL, "test_query", "", time.Hour, nil, 0, server.Client())
source := NewPrometheusSource(server.URL, "test_query", "", time.Hour, nil, 0, nil, server.Client())
if source.QueryType != dropv1alpha1.QueryTypeRange {
t.Errorf("default QueryType = %q, want %q", source.QueryType, dropv1alpha1.QueryTypeRange)
}
Expand Down
Loading