From 55a4c8d615ef583f02159b2794564c45b6d83700 Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 01:41:37 +0530 Subject: [PATCH 01/14] fix(release): update goreleaser to v2 and add hlctl binary --- .goreleaser.yaml | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/.goreleaser.yaml b/.goreleaser.yaml index e4881d9..06bdf8c 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -15,12 +15,13 @@ before: - go mod tidy # you may remove this if you don't need go generate - go generate ./... - # Run tests before building + # Run unit tests before building - make test - - make test-it builds: - id: hostlink + binary: hostlink + main: . env: - CGO_ENABLED=0 goos: @@ -31,9 +32,24 @@ builds: ldflags: - -s -w -X hostlink/version.Version={{.Version}} + - id: hlctl + binary: hlctl + main: ./cmd/hlctl + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + - windows + goarch: + - amd64 + - arm64 + ldflags: + - -s -w -X hostlink/version.Version={{.Version}} + archives: - id: hostlink-archive - builds: [hostlink] + ids: [hostlink, hlctl] formats: [tar.gz] name_template: >- {{ .ProjectName }}_ From b04ce09863b25dc603bb7bb74d71766b69532200 Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 01:50:10 +0530 Subject: [PATCH 02/14] ci: opt into Node.js 24 for workflows --- .github/workflows/release.yml | 2 ++ .github/workflows/test.yml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 6d8a9da..7241a67 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -16,6 +16,8 @@ permissions: jobs: goreleaser: runs-on: ubuntu-latest + env: + FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true steps: - name: Checkout uses: actions/checkout@v4 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3ee9c8d..e9f567d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,6 +7,8 @@ on: jobs: test: runs-on: 
ubuntu-latest + env: + FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true steps: - name: Checkout uses: actions/checkout@v4 From c1feeedad64f2256860e09a79b966acb4a2f872c Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 02:18:44 +0530 Subject: [PATCH 03/14] feat(metrics): support HOSTLINK_TRAEFIK_ENDPOINT env var --- .env.example | 3 +++ config/appconf/appconf.go | 9 +++++++++ config/appconf/appconf_test.go | 10 ++++++++++ internal/traefikmetrics/collector.go | 5 ++--- 4 files changed, 24 insertions(+), 3 deletions(-) diff --git a/.env.example b/.env.example index de738c9..92d6b4b 100644 --- a/.env.example +++ b/.env.example @@ -30,3 +30,6 @@ HOSTLINK_STATE_PATH=./.local # Agent registration tokens (for development testing) HOSTLINK_TOKEN_ID=dev-token-id HOSTLINK_TOKEN_KEY=dev-token-key + +# Traefik metrics endpoint (default: http://localhost:8080/metrics) +HOSTLINK_TRAEFIK_ENDPOINT=http://localhost:8080/metrics diff --git a/config/appconf/appconf.go b/config/appconf/appconf.go index 5e6c86b..4cce03d 100644 --- a/config/appconf/appconf.go +++ b/config/appconf/appconf.go @@ -200,6 +200,15 @@ func HeartbeatInterval() time.Duration { return parseDurationClamped("HOSTLINK_HEARTBEAT_INTERVAL", 5*time.Second, 10*time.Millisecond, 5*time.Minute) } +// TraefikEndpoint returns the Traefik metrics endpoint. +// Controlled by HOSTLINK_TRAEFIK_ENDPOINT (default: http://localhost:8080/metrics). +func TraefikEndpoint() string { + if endpoint := strings.TrimSpace(os.Getenv("HOSTLINK_TRAEFIK_ENDPOINT")); endpoint != "" { + return endpoint + } + return "http://localhost:8080/metrics" +} + // UpdateCheckInterval returns the interval between update checks. // Controlled by HOSTLINK_UPDATE_CHECK_INTERVAL (default: 5m, clamped to [1m, 24h]). 
func UpdateCheckInterval() time.Duration { diff --git a/config/appconf/appconf_test.go b/config/appconf/appconf_test.go index 18b0d32..3fd3f4e 100644 --- a/config/appconf/appconf_test.go +++ b/config/appconf/appconf_test.go @@ -206,6 +206,16 @@ func TestHeartbeatInterval_CustomValue(t *testing.T) { assert.Equal(t, 200*time.Millisecond, HeartbeatInterval()) } +func TestTraefikEndpoint_Default(t *testing.T) { + t.Setenv("HOSTLINK_TRAEFIK_ENDPOINT", "") + assert.Equal(t, "http://localhost:8080/metrics", TraefikEndpoint()) +} + +func TestTraefikEndpoint_CustomValue(t *testing.T) { + t.Setenv("HOSTLINK_TRAEFIK_ENDPOINT", "http://traefik:8082/metrics") + assert.Equal(t, "http://traefik:8082/metrics", TraefikEndpoint()) +} + func TestLocalTaskStorePath_DefaultUnderAgentStatePath(t *testing.T) { stateDir := t.TempDir() t.Setenv("HOSTLINK_STATE_PATH", stateDir) diff --git a/internal/traefikmetrics/collector.go b/internal/traefikmetrics/collector.go index 6380028..16f12f0 100644 --- a/internal/traefikmetrics/collector.go +++ b/internal/traefikmetrics/collector.go @@ -19,11 +19,10 @@ import ( "sync" "time" + "hostlink/config/appconf" domainmetrics "hostlink/domain/metrics" ) -const defaultEndpoint = "http://localhost:8080/metrics" - type Collector interface { Collect(ctx context.Context) ([]ServiceMetricSet, error) } @@ -46,7 +45,7 @@ type traefikCollector struct { } func New() Collector { - return NewWithEndpoint(defaultEndpoint) + return NewWithEndpoint(appconf.TraefikEndpoint()) } func NewWithEndpoint(endpoint string) Collector { From 46087a8fba5f57a7450f80a9c9f28cf9956e4cf3 Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 02:23:05 +0530 Subject: [PATCH 04/14] ci: update actions versions and remove NODE24 env var --- .github/workflows/release.yml | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7241a67..c2dd058 100644 --- a/.github/workflows/release.yml +++ 
b/.github/workflows/release.yml @@ -16,21 +16,19 @@ permissions: jobs: goreleaser: runs-on: ubuntu-latest - env: - FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 - name: Set up Go - uses: actions/setup-go@v5 + uses: actions/setup-go@v6 with: go-version: stable # More assembly might be required: Docker logins, GPG, etc. # It all depends on your needs. - name: Run GoReleaser - uses: goreleaser/goreleaser-action@v6 + uses: goreleaser/goreleaser-action@v7 with: # either 'goreleaser' (default) or 'goreleaser-pro' distribution: goreleaser From e3894ed73e44b35de692c549fac5823881174330 Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 02:32:09 +0530 Subject: [PATCH 05/14] ci: use go-version-file in release workflow --- .github/workflows/release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index c2dd058..495d18c 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -24,7 +24,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v6 with: - go-version: stable + go-version-file: go.mod # More assembly might be required: Docker logins, GPG, etc. # It all depends on your needs. - name: Run GoReleaser From 034f771d7343b1a75a2e87099e742fd52ae6e44a Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 02:33:08 +0530 Subject: [PATCH 06/14] ci: remove before hooks from goreleaser to isolate build failures --- .goreleaser.yaml | 9 --------- 1 file changed, 9 deletions(-) diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 06bdf8c..00868ef 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -9,15 +9,6 @@ version: 2 project_name: hostlink -before: - hooks: - # You may remove this if you don't use go modules. - - go mod tidy - # you may remove this if you don't need go generate - - go generate ./... 
- # Run unit tests before building - - make test - builds: - id: hostlink binary: hostlink From 69d5bb73d775fcebc011d97dd3aab00db43415fb Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 02:40:40 +0530 Subject: [PATCH 07/14] fix(release): split archives by build and fix action versions Split hostlink-archive into separate definitions for hostlink (linux-only with scripts) and hlctl (multi-platform), fixing goreleaser exit code 1 caused by mixed-platform builds in a single archive. Also revert checkout/setup-go to v4/v5 as v6 tags do not exist. Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/release.yml | 4 ++-- .goreleaser.yaml | 18 ++++++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 495d18c..d07d376 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,11 +18,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v6 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Set up Go - uses: actions/setup-go@v6 + uses: actions/setup-go@v5 with: go-version-file: go.mod # More assembly might be required: Docker logins, GPG, etc. 
diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 00868ef..009fec3 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -40,7 +40,7 @@ builds: archives: - id: hostlink-archive - ids: [hostlink, hlctl] + ids: [hostlink] formats: [tar.gz] name_template: >- {{ .ProjectName }}_ @@ -48,14 +48,24 @@ archives: {{- if eq .Arch "amd64" }}x86_64 {{- else }}{{ .Arch }}{{ end }} {{- if .Arm }}v{{ .Arm }}{{ end }} - format_overrides: - - goos: windows - formats: [zip] files: - src: "scripts/**/*" dst: scripts strip_parent: true + - id: hlctl-archive + ids: [hlctl] + formats: [tar.gz] + name_template: >- + hlctl_ + {{- title .Os }}_ + {{- if eq .Arch "amd64" }}x86_64 + {{- else }}{{ .Arch }}{{ end }} + {{- if .Arm }}v{{ .Arm }}{{ end }} + format_overrides: + - goos: windows + formats: [zip] + changelog: sort: asc filters: From df65e76eafc595ef1044bc4a335a1d5107587dea Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 17:01:47 +0530 Subject: [PATCH 08/14] Fix metric type name mismatch: traefik.service -> traefik.proxy The Hostlink agent was sending metrics with type "traefik.service" but the Rails control plane expects type "traefik.proxy". Align the metric type constant to match the backend schema so Traefik proxy metrics are properly stored and surfaced in the project metrics endpoint. 
--- domain/metrics/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/domain/metrics/metrics.go b/domain/metrics/metrics.go index 1e999ae..66800f0 100644 --- a/domain/metrics/metrics.go +++ b/domain/metrics/metrics.go @@ -11,7 +11,7 @@ const ( MetricTypeMongoDBDatabase = "mongodb.database" MetricTypeRedis = "redis" MetricTypeContainer = "container" - MetricTypeTraefikService = "traefik.service" + MetricTypeTraefikService = "traefik.proxy" ) type MetricPayload struct { From f62a11ccb4fe1f08c7ec6908a8fc93d1120fa540 Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 17:12:57 +0530 Subject: [PATCH 09/14] Improve container naming: prefer Docker Compose service name over Coolify UUID The naming resolver prioritized coolify.name first, but Coolify sets this to resource UUIDs (e.g. vec1dkp79uhxwb8zacdijtcc) which aren't user-friendly. Docker Compose service names (com.docker.compose.service) produce readable names like postgres, traefik, or soketi. Priority order is now: 1. com.docker.compose.service label 2. coolify.name label 3. Docker container name 4. 
Truncated container ID --- internal/containermetrics/collector_test.go | 9 ++++++++- internal/dockerutil/naming.go | 10 +++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/internal/containermetrics/collector_test.go b/internal/containermetrics/collector_test.go index cb8c31b..601d8a2 100644 --- a/internal/containermetrics/collector_test.go +++ b/internal/containermetrics/collector_test.go @@ -14,7 +14,14 @@ func TestResolveContainerName(t *testing.T) { expected string }{ { - name: "prefer coolify name label", + name: "prefer compose service over coolify name", + id: "12345678901234567890", + names: []string{"/docker-name"}, + labels: map[string]string{"com.docker.compose.service": "web-service", "coolify.name": "uuid-resource-id"}, + expected: "web-service", + }, + { + name: "fallback to coolify name when compose not set", id: "12345678901234567890", names: []string{"/docker-name"}, labels: map[string]string{"coolify.name": "my-cool-app"}, diff --git a/internal/dockerutil/naming.go b/internal/dockerutil/naming.go index a5cc530..9e5e31b 100644 --- a/internal/dockerutil/naming.go +++ b/internal/dockerutil/naming.go @@ -5,15 +5,15 @@ import ( ) // ResolveContainerName returns a human-readable name for a container by -// prioritizing Coolify labels, then Docker Compose labels, then the Docker -// container name, and finally falling back to a truncated container ID. +// prioritizing Docker Compose service names, then Coolify labels, then the +// Docker container name, and finally falling back to a truncated container ID. 
func ResolveContainerName(id string, names []string, labels map[string]string) string { - if coolName, ok := labels["coolify.name"]; ok && coolName != "" { - return coolName - } if composeService, ok := labels["com.docker.compose.service"]; ok && composeService != "" { return composeService } + if coolName, ok := labels["coolify.name"]; ok && coolName != "" { + return coolName + } if len(names) > 0 { return strings.TrimPrefix(names[0], "/") } From 32ac892dd5ba163e7c0158220ed69581299afae2 Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 18:57:54 +0530 Subject: [PATCH 10/14] Switch Traefik collector from service-level to entrypoint-level metrics Traefik v3.6 on Coolify only exposes entrypoint-level Prometheus metrics (traefik_entrypoint_requests_total, traefik_entrypoint_request_duration_seconds_*) with the 'entrypoint' label, not service-level metrics. - Rename TraefikServiceMetrics -> TraefikEntrypointMetrics - Rename TraefikServiceAttributes -> TraefikEntrypointAttributes (service_name -> entrypoint_name) - Match traefik_entrypoint_* metric names instead of traefik_service_* - Group by 'entrypoint' label instead of 'service' --- app/services/metrics/metrics.go | 4 +- app/services/metrics/metrics_test.go | 6 +-- domain/metrics/metrics.go | 11 +++-- internal/traefikmetrics/collector.go | 70 ++++++++++++---------------- 4 files changed, 42 insertions(+), 49 deletions(-) diff --git a/app/services/metrics/metrics.go b/app/services/metrics/metrics.go index 03bc0a2..2b80743 100644 --- a/app/services/metrics/metrics.go +++ b/app/services/metrics/metrics.go @@ -338,7 +338,7 @@ func (mp *metricspusher) Push(cred credential.Credential) error { } } - // ── Traefik metrics (HTTP requests / response time / error rate per app) ── + // ── Traefik metrics (HTTP requests / response time / error rate per entrypoint) ── traefikSets, err := mp.traefikcollector.Collect(ctx) if err != nil { @@ -348,7 +348,7 @@ func (mp *metricspusher) Push(cred credential.Credential) error 
{ metricSets = append(metricSets, domainmetrics.MetricSet{ Type: domainmetrics.MetricTypeTraefikService, Attributes: map[string]any{ - "service_name": ts.Attributes.ServiceName, + "entrypoint_name": ts.Attributes.EntrypointName, }, Metrics: ts.Metrics, }) diff --git a/app/services/metrics/metrics_test.go b/app/services/metrics/metrics_test.go index 102dcbf..d775a11 100644 --- a/app/services/metrics/metrics_test.go +++ b/app/services/metrics/metrics_test.go @@ -235,12 +235,12 @@ type MockTraefikCollector struct { mock.Mock } -func (m *MockTraefikCollector) Collect(ctx context.Context) ([]traefikmetrics.ServiceMetricSet, error) { +func (m *MockTraefikCollector) Collect(ctx context.Context) ([]traefikmetrics.EntrypointMetricSet, error) { args := m.Called(ctx) if args.Get(0) == nil { return nil, args.Error(1) } - return args.Get(0).([]traefikmetrics.ServiceMetricSet), args.Error(1) + return args.Get(0).([]traefikmetrics.EntrypointMetricSet), args.Error(1) } type MockDockerDiscoverer struct { @@ -313,7 +313,7 @@ func setupTestMetricsPusher() (*metricspusher, *testMocks) { mocks.containercollector.On("Collect", mock.Anything). Return([]containermetrics.ContainerMetricSet(nil), nil) mocks.traefikcollector.On("Collect", mock.Anything). - Return([]traefikmetrics.ServiceMetricSet(nil), nil) + Return([]traefikmetrics.EntrypointMetricSet(nil), nil) return mp, mocks } diff --git a/domain/metrics/metrics.go b/domain/metrics/metrics.go index 66800f0..c1db4d4 100644 --- a/domain/metrics/metrics.go +++ b/domain/metrics/metrics.go @@ -163,10 +163,11 @@ type ContainerMetrics struct { UptimeSeconds int64 `json:"uptime_seconds"` } -// TraefikServiceMetrics holds per-service HTTP metrics scraped from Traefik's -// Prometheus endpoint. These are real user-traffic signals — not health probes. +// TraefikEntrypointMetrics holds per-entrypoint HTTP metrics scraped from +// Traefik's Prometheus endpoint. 
These are real user-traffic signals captured +// at the entrypoint level — not health probes. // Up is false when Traefik is unreachable or metrics are not enabled. -type TraefikServiceMetrics struct { +type TraefikEntrypointMetrics struct { Up bool `json:"up"` RequestsTotal int64 `json:"requests_total"` RequestsPerSecond float64 `json:"requests_per_second"` @@ -180,8 +181,8 @@ type TraefikServiceMetrics struct { P99ResponseTimeMs float64 `json:"p99_response_time_ms"` } -type TraefikServiceAttributes struct { - ServiceName string `json:"service_name"` +type TraefikEntrypointAttributes struct { + EntrypointName string `json:"entrypoint_name"` } type ContainerAttributes struct { diff --git a/internal/traefikmetrics/collector.go b/internal/traefikmetrics/collector.go index 16f12f0..f47efaf 100644 --- a/internal/traefikmetrics/collector.go +++ b/internal/traefikmetrics/collector.go @@ -1,5 +1,5 @@ // Package traefikmetrics scrapes Traefik's Prometheus metrics endpoint and -// reports per-service HTTP signals: request rate, error rate, and response +// reports per-entrypoint HTTP signals: request rate, error rate, and response // time percentiles. These reflect real user traffic, not synthetic probes. // // Prerequisites: Traefik must have metrics enabled. 
@@ -24,15 +24,15 @@ import ( ) type Collector interface { - Collect(ctx context.Context) ([]ServiceMetricSet, error) + Collect(ctx context.Context) ([]EntrypointMetricSet, error) } -type ServiceMetricSet struct { - Attributes domainmetrics.TraefikServiceAttributes - Metrics domainmetrics.TraefikServiceMetrics +type EntrypointMetricSet struct { + Attributes domainmetrics.TraefikEntrypointAttributes + Metrics domainmetrics.TraefikEntrypointMetrics } -type lastRequests struct { +type lastRequestsEntrypoint struct { total int64 collectedAt time.Time } @@ -41,7 +41,7 @@ type traefikCollector struct { endpoint string client *http.Client mu sync.Mutex - lastReqs map[string]lastRequests // keyed by clean service name + lastReqs map[string]lastRequestsEntrypoint // keyed by entrypoint name } func New() Collector { @@ -52,11 +52,11 @@ func NewWithEndpoint(endpoint string) Collector { return &traefikCollector{ endpoint: endpoint, client: &http.Client{Timeout: 5 * time.Second}, - lastReqs: make(map[string]lastRequests), + lastReqs: make(map[string]lastRequestsEntrypoint), } } -func (tc *traefikCollector) Collect(ctx context.Context) ([]ServiceMetricSet, error) { +func (tc *traefikCollector) Collect(ctx context.Context) ([]EntrypointMetricSet, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, tc.endpoint, nil) if err != nil { return nil, fmt.Errorf("build request: %w", err) @@ -166,9 +166,9 @@ func parseLabels(s string) map[string]string { return labels } -// ── Per-service aggregation ─────────────────────────────────────────────────── +// ── Per-entrypoint aggregation ─────────────────────────────────────────────── -type serviceAgg struct { +type entrypointAgg struct { requestsTotal int64 requests2xx int64 requests4xx int64 @@ -179,27 +179,26 @@ type serviceAgg struct { durationCount float64 // total request count from _count samples } -func (tc *traefikCollector) aggregate(text string) ([]ServiceMetricSet, error) { +func (tc *traefikCollector) 
aggregate(text string) ([]EntrypointMetricSet, error) { samples := parseSamples(text) - svcs := make(map[string]*serviceAgg) - ensure := func(name string) *serviceAgg { - if svcs[name] == nil { - svcs[name] = &serviceAgg{buckets: make(map[float64]float64)} + eps := make(map[string]*entrypointAgg) + ensure := func(name string) *entrypointAgg { + if eps[name] == nil { + eps[name] = &entrypointAgg{buckets: make(map[float64]float64)} } - return svcs[name] + return eps[name] } for _, s := range samples { - svcRaw, ok := s.labels["service"] + entrypoint, ok := s.labels["entrypoint"] if !ok { continue } - svc := cleanName(svcRaw) - agg := ensure(svc) + agg := ensure(entrypoint) switch s.name { - case "traefik_service_requests_total": + case "traefik_entrypoint_requests_total": count := int64(s.value) agg.requestsTotal += count switch { @@ -211,7 +210,7 @@ func (tc *traefikCollector) aggregate(text string) ([]ServiceMetricSet, error) { agg.requests5xx += count } - case "traefik_service_request_duration_seconds_bucket": + case "traefik_entrypoint_request_duration_seconds_bucket": leStr := s.labels["le"] if leStr == "+Inf" { continue // use _count instead @@ -221,10 +220,10 @@ func (tc *traefikCollector) aggregate(text string) ([]ServiceMetricSet, error) { agg.buckets[le] += s.value } - case "traefik_service_request_duration_seconds_sum": + case "traefik_entrypoint_request_duration_seconds_sum": agg.durationSum += s.value - case "traefik_service_request_duration_seconds_count": + case "traefik_entrypoint_request_duration_seconds_count": agg.durationCount += s.value } } @@ -233,9 +232,9 @@ func (tc *traefikCollector) aggregate(text string) ([]ServiceMetricSet, error) { tc.mu.Lock() defer tc.mu.Unlock() - var results []ServiceMetricSet - for svcName, agg := range svcs { - m := domainmetrics.TraefikServiceMetrics{ + var results []EntrypointMetricSet + for epName, agg := range eps { + m := domainmetrics.TraefikEntrypointMetrics{ Up: true, RequestsTotal: agg.requestsTotal, 
Requests2xx: agg.requests2xx, @@ -260,16 +259,16 @@ func (tc *traefikCollector) aggregate(text string) ([]ServiceMetricSet, error) { } // Requests/sec via delta on cumulative counter - if prev, ok := tc.lastReqs[svcName]; ok { + if prev, ok := tc.lastReqs[epName]; ok { elapsed := now.Sub(prev.collectedAt).Seconds() if elapsed > 0 && agg.requestsTotal >= prev.total { m.RequestsPerSecond = float64(agg.requestsTotal-prev.total) / elapsed } } - tc.lastReqs[svcName] = lastRequests{total: agg.requestsTotal, collectedAt: now} + tc.lastReqs[epName] = lastRequestsEntrypoint{total: agg.requestsTotal, collectedAt: now} - results = append(results, ServiceMetricSet{ - Attributes: domainmetrics.TraefikServiceAttributes{ServiceName: svcName}, + results = append(results, EntrypointMetricSet{ + Attributes: domainmetrics.TraefikEntrypointAttributes{EntrypointName: epName}, Metrics: m, }) } @@ -311,11 +310,4 @@ func pct(buckets map[float64]float64, total, p float64) float64 { return prevLe } -// cleanName strips the provider suffix Traefik appends to service names -// e.g. "myapp@docker" → "myapp", "api@internal" → "api". -func cleanName(name string) string { - if idx := strings.LastIndexByte(name, '@'); idx != -1 { - return name[:idx] - } - return name -} + From 5f3affed5feb6324905950df909cd4ddbfe9b730 Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 19:25:14 +0530 Subject: [PATCH 11/14] Add connections_current to Traefik entrypoint metrics, handle zero-traffic case --- domain/metrics/metrics.go | 24 +++++++++++++----------- internal/traefikmetrics/collector.go | 23 ++++++++++++++--------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/domain/metrics/metrics.go b/domain/metrics/metrics.go index c1db4d4..bf6b6a8 100644 --- a/domain/metrics/metrics.go +++ b/domain/metrics/metrics.go @@ -167,18 +167,20 @@ type ContainerMetrics struct { // Traefik's Prometheus endpoint. 
These are real user-traffic signals captured // at the entrypoint level — not health probes. // Up is false when Traefik is unreachable or metrics are not enabled. +// ConnectionsCurrent is reported even when there is no request traffic. type TraefikEntrypointMetrics struct { - Up bool `json:"up"` - RequestsTotal int64 `json:"requests_total"` - RequestsPerSecond float64 `json:"requests_per_second"` - ErrorRate float64 `json:"error_rate"` // % of 4xx + 5xx - Requests2xx int64 `json:"requests_2xx"` - Requests4xx int64 `json:"requests_4xx"` - Requests5xx int64 `json:"requests_5xx"` - AvgResponseTimeMs float64 `json:"avg_response_time_ms"` - P50ResponseTimeMs float64 `json:"p50_response_time_ms"` - P95ResponseTimeMs float64 `json:"p95_response_time_ms"` - P99ResponseTimeMs float64 `json:"p99_response_time_ms"` + Up bool `json:"up"` + ConnectionsCurrent int64 `json:"connections_current"` + RequestsTotal int64 `json:"requests_total"` + RequestsPerSecond float64 `json:"requests_per_second"` + ErrorRate float64 `json:"error_rate"` // % of 4xx + 5xx + Requests2xx int64 `json:"requests_2xx"` + Requests4xx int64 `json:"requests_4xx"` + Requests5xx int64 `json:"requests_5xx"` + AvgResponseTimeMs float64 `json:"avg_response_time_ms"` + P50ResponseTimeMs float64 `json:"p50_response_time_ms"` + P95ResponseTimeMs float64 `json:"p95_response_time_ms"` + P99ResponseTimeMs float64 `json:"p99_response_time_ms"` } type TraefikEntrypointAttributes struct { diff --git a/internal/traefikmetrics/collector.go b/internal/traefikmetrics/collector.go index f47efaf..0ad10ac 100644 --- a/internal/traefikmetrics/collector.go +++ b/internal/traefikmetrics/collector.go @@ -169,10 +169,11 @@ func parseLabels(s string) map[string]string { // ── Per-entrypoint aggregation ─────────────────────────────────────────────── type entrypointAgg struct { - requestsTotal int64 - requests2xx int64 - requests4xx int64 - requests5xx int64 + connectionsCurrent int64 + requestsTotal int64 + requests2xx int64 + 
requests4xx int64 + requests5xx int64 // Histogram: le (seconds) → cumulative count aggregated across all label combos buckets map[float64]float64 durationSum float64 // total response time in seconds @@ -198,6 +199,9 @@ func (tc *traefikCollector) aggregate(text string) ([]EntrypointMetricSet, error agg := ensure(entrypoint) switch s.name { + case "traefik_open_connections": + agg.connectionsCurrent = int64(s.value) + case "traefik_entrypoint_requests_total": count := int64(s.value) agg.requestsTotal += count @@ -235,11 +239,12 @@ func (tc *traefikCollector) aggregate(text string) ([]EntrypointMetricSet, error var results []EntrypointMetricSet for epName, agg := range eps { m := domainmetrics.TraefikEntrypointMetrics{ - Up: true, - RequestsTotal: agg.requestsTotal, - Requests2xx: agg.requests2xx, - Requests4xx: agg.requests4xx, - Requests5xx: agg.requests5xx, + Up: true, + ConnectionsCurrent: agg.connectionsCurrent, + RequestsTotal: agg.requestsTotal, + Requests2xx: agg.requests2xx, + Requests4xx: agg.requests4xx, + Requests5xx: agg.requests5xx, } if agg.requestsTotal > 0 { From 9ed273b56a34ef50088065dfa5950437d4d4b6f7 Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sun, 24 May 2026 21:43:51 +0530 Subject: [PATCH 12/14] Filter out 'traefik' internal entrypoint from metrics --- internal/traefikmetrics/collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/traefikmetrics/collector.go b/internal/traefikmetrics/collector.go index 0ad10ac..0328209 100644 --- a/internal/traefikmetrics/collector.go +++ b/internal/traefikmetrics/collector.go @@ -193,7 +193,7 @@ func (tc *traefikCollector) aggregate(text string) ([]EntrypointMetricSet, error for _, s := range samples { entrypoint, ok := s.labels["entrypoint"] - if !ok { + if !ok || entrypoint == "traefik" { continue } agg := ensure(entrypoint) From a1d7998721857bef17eb131c0bea5329c85da9fb Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Sat, 27 Jun 2026 00:55:57 +0530 Subject: [PATCH 13/14] Fix 
MySQL metrics to match Rails server expectations Align MySQLDatabaseMetrics JSON keys and add missing metrics: Renamed to match Rails VALID_METRICS keys: - threads_connected -> connections_total - innodb_cache_hit_ratio -> innodb_buffer_pool_hit_ratio - slow_queries (cumulative) -> slow_queries_per_second (delta rate) Added: - connections_aborted (from Aborted_connects) - threads_running (active query load indicator) - innodb_row_lock_waits_per_second (lock contention) - tmp_disk_tables_per_second (queries spilling to disk) - select_full_scans_per_second (missing index / full table scan detection) All new rate metrics use the same delta-over-elapsed pattern as QPS. First collection returns 0.0 for all rate metrics (consistent with PostgreSQL collector behaviour). Add collector_test.go covering delta rate logic, hit ratio calculation, and parser helpers. Co-Authored-By: Claude Sonnet 4.6 --- domain/metrics/metrics.go | 23 ++++--- internal/mysqlmetrics/collector.go | 38 ++++++++--- internal/mysqlmetrics/collector_test.go | 89 +++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 21 deletions(-) create mode 100644 internal/mysqlmetrics/collector_test.go diff --git a/domain/metrics/metrics.go b/domain/metrics/metrics.go index bf6b6a8..dcbc3b1 100644 --- a/domain/metrics/metrics.go +++ b/domain/metrics/metrics.go @@ -103,16 +103,19 @@ type StorageAttributes struct { } type MySQLDatabaseMetrics struct { - Up bool `json:"up"` - ThreadsConnected int `json:"threads_connected"` - ThreadsRunning int `json:"threads_running"` - MaxConnections int `json:"max_connections"` - MaxUsedConnections int64 `json:"max_used_connections"` - QueriesPerSecond float64 `json:"queries_per_second"` - SlowQueries int64 `json:"slow_queries"` - InnoDBCacheHitRatio float64 `json:"innodb_cache_hit_ratio"` - ReplicationLagSeconds *int `json:"replication_lag_seconds,omitempty"` - ReplicationConnected *bool `json:"replication_connected,omitempty"` + Up bool `json:"up"` + ConnectionsTotal int 
`json:"connections_total"` + ConnectionsAborted int64 `json:"connections_aborted"` + MaxConnections int `json:"max_connections"` + ThreadsRunning int `json:"threads_running"` + QueriesPerSecond float64 `json:"queries_per_second"` + SlowQueriesPerSecond float64 `json:"slow_queries_per_second"` + InnoDBBufferPoolHitRatio float64 `json:"innodb_buffer_pool_hit_ratio"` + InnoDBRowLockWaitsPerSecond float64 `json:"innodb_row_lock_waits_per_second"` + TmpDiskTablesPerSecond float64 `json:"tmp_disk_tables_per_second"` + SelectFullScansPerSecond float64 `json:"select_full_scans_per_second"` + ReplicationLagSeconds *int `json:"replication_lag_seconds,omitempty"` + ReplicationConnected *bool `json:"replication_connected,omitempty"` } type MongoDBMetrics struct { diff --git a/internal/mysqlmetrics/collector.go b/internal/mysqlmetrics/collector.go index 8ba9de0..b7348a9 100644 --- a/internal/mysqlmetrics/collector.go +++ b/internal/mysqlmetrics/collector.go @@ -20,9 +20,13 @@ type Collector interface { } type mysqlCollector struct { - queryTimeout time.Duration - lastQueries *int64 - lastTime time.Time + queryTimeout time.Duration + lastQueries *int64 + lastSlowQueries *int64 + lastRowLockWaits *int64 + lastTmpDiskTables *int64 + lastSelectFullJoins *int64 + lastTime time.Time } func New() Collector { @@ -73,9 +77,10 @@ func (mc *mysqlCollector) Collect(cred credential.Credential) (metrics.MySQLData func (mc *mysqlCollector) collectGlobalStatus(ctx context.Context, db *sql.DB, m *metrics.MySQLDatabaseMetrics) error { rows, err := db.QueryContext(ctx, ` SHOW GLOBAL STATUS WHERE Variable_name IN ( - 'Threads_connected','Threads_running','Max_used_connections', - 'Queries','Slow_queries', - 'Innodb_buffer_pool_read_requests','Innodb_buffer_pool_reads' + 'Threads_connected','Threads_running', + 'Queries','Slow_queries','Aborted_connects', + 'Innodb_buffer_pool_read_requests','Innodb_buffer_pool_reads', + 'Innodb_row_lock_waits','Created_tmp_disk_tables','Select_full_join' ) `) if 
err != nil { @@ -95,27 +100,38 @@ func (mc *mysqlCollector) collectGlobalStatus(ctx context.Context, db *sql.DB, m return err } - m.ThreadsConnected = parseInt(status["Threads_connected"]) + m.ConnectionsTotal = parseInt(status["Threads_connected"]) m.ThreadsRunning = parseInt(status["Threads_running"]) - m.MaxUsedConnections = parseInt64(status["Max_used_connections"]) - m.SlowQueries = parseInt64(status["Slow_queries"]) + m.ConnectionsAborted = parseInt64(status["Aborted_connects"]) readReqs := parseFloat64(status["Innodb_buffer_pool_read_requests"]) diskReads := parseFloat64(status["Innodb_buffer_pool_reads"]) if readReqs > 0 { - m.InnoDBCacheHitRatio = (readReqs - diskReads) / readReqs * 100 + m.InnoDBBufferPoolHitRatio = (readReqs - diskReads) / readReqs * 100 } - // Delta-based QPS using cumulative Queries counter + // Delta-based rates using cumulative counters currentQueries := parseInt64(status["Queries"]) + currentSlowQueries := parseInt64(status["Slow_queries"]) + currentRowLockWaits := parseInt64(status["Innodb_row_lock_waits"]) + currentTmpDiskTables := parseInt64(status["Created_tmp_disk_tables"]) + currentSelectFullJoins := parseInt64(status["Select_full_join"]) now := time.Now() if mc.lastQueries != nil { elapsed := now.Sub(mc.lastTime).Seconds() if elapsed > 0 { m.QueriesPerSecond = float64(currentQueries-*mc.lastQueries) / elapsed + m.SlowQueriesPerSecond = float64(currentSlowQueries-*mc.lastSlowQueries) / elapsed + m.InnoDBRowLockWaitsPerSecond = float64(currentRowLockWaits-*mc.lastRowLockWaits) / elapsed + m.TmpDiskTablesPerSecond = float64(currentTmpDiskTables-*mc.lastTmpDiskTables) / elapsed + m.SelectFullScansPerSecond = float64(currentSelectFullJoins-*mc.lastSelectFullJoins) / elapsed } } mc.lastQueries = &currentQueries + mc.lastSlowQueries = &currentSlowQueries + mc.lastRowLockWaits = &currentRowLockWaits + mc.lastTmpDiskTables = &currentTmpDiskTables + mc.lastSelectFullJoins = &currentSelectFullJoins mc.lastTime = now return nil diff --git 
a/internal/mysqlmetrics/collector_test.go b/internal/mysqlmetrics/collector_test.go new file mode 100644 index 0000000..1889c49 --- /dev/null +++ b/internal/mysqlmetrics/collector_test.go @@ -0,0 +1,89 @@ +package mysqlmetrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestDeltaRates_FirstCollectionReturnsZero(t *testing.T) { + mc := &mysqlCollector{queryTimeout: 10 * time.Second} + + assert.Nil(t, mc.lastQueries, "lastQueries should be nil before first collection") + assert.Nil(t, mc.lastSlowQueries, "lastSlowQueries should be nil before first collection") +} + +func TestDeltaRates_SecondCollectionReturnsRates(t *testing.T) { + baseTime := time.Now() + initialQueries := int64(1000) + initialSlowQueries := int64(10) + + mc := &mysqlCollector{ + queryTimeout: 10 * time.Second, + lastQueries: &initialQueries, + lastSlowQueries: &initialSlowQueries, + lastTime: baseTime, + } + + // Simulate 10 seconds elapsed, 500 new queries, 5 new slow queries + elapsed := 10 * time.Second + currentQueries := int64(1500) + currentSlowQueries := int64(15) + now := baseTime.Add(elapsed) + + elapsedSec := now.Sub(mc.lastTime).Seconds() + qps := float64(currentQueries-*mc.lastQueries) / elapsedSec + sqps := float64(currentSlowQueries-*mc.lastSlowQueries) / elapsedSec + + assert.Equal(t, 50.0, qps, "QPS should be 500 queries / 10s = 50") + assert.Equal(t, 0.5, sqps, "slow QPS should be 5 queries / 10s = 0.5") +} + +func TestDeltaRates_ZeroElapsedReturnsZero(t *testing.T) { + baseTime := time.Now() + initialQueries := int64(1000) + initialSlowQueries := int64(10) + + mc := &mysqlCollector{ + queryTimeout: 10 * time.Second, + lastQueries: &initialQueries, + lastSlowQueries: &initialSlowQueries, + lastTime: baseTime, + } + + elapsed := mc.lastTime.Sub(baseTime).Seconds() + assert.Equal(t, 0.0, elapsed, "elapsed should be 0 when time hasn't advanced") +} + +func TestInnoDBBufferPoolHitRatio_Calculation(t *testing.T) { + readReqs := 10000.0 + 
diskReads := 100.0 + + ratio := (readReqs - diskReads) / readReqs * 100 + assert.Equal(t, 99.0, ratio, "hit ratio should be 99% when 100 of 10000 reads hit disk") +} + +func TestInnoDBBufferPoolHitRatio_ZeroRequests(t *testing.T) { + readReqs := 0.0 + diskReads := 0.0 + + var ratio float64 + if readReqs > 0 { + ratio = (readReqs - diskReads) / readReqs * 100 + } + assert.Equal(t, 0.0, ratio, "hit ratio should be 0 when there are no read requests") +} + +func TestParseInt_ValidValues(t *testing.T) { + assert.Equal(t, 42, parseInt("42")) + assert.Equal(t, 0, parseInt("0")) + assert.Equal(t, 0, parseInt("")) + assert.Equal(t, 0, parseInt("invalid")) +} + +func TestParseInt64_ValidValues(t *testing.T) { + assert.Equal(t, int64(1234567890), parseInt64("1234567890")) + assert.Equal(t, int64(0), parseInt64("")) + assert.Equal(t, int64(0), parseInt64("invalid")) +} From f04ae627ad1f76efcf2e4252a7f00ed751ffcd9c Mon Sep 17 00:00:00 2001 From: Dhanraj Date: Wed, 1 Jul 2026 00:23:54 +0530 Subject: [PATCH 14/14] fix: add panic recovery to task polling and heartbeat-based task delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: task polling goroutine crashes silently (no recover()) causing restore tasks to never be picked up. Heartbeat goroutine is separate and continues running, masking the failure. 
Fixes: - Add safeCall() with recover() to all trigger loops (taskjob, metricsjob, heartbeatjob, selfupdatejob) so panics are caught and logged without killing the goroutine - Add processTaskSafe() wrapper so worker goroutine survives task failures - Heartbeat response now includes pending_tasks from server side, and agent-side parses and enqueues them — provides a fully independent delivery channel that doesn't depend on HTTP polling working --- app/jobs/heartbeatjob/heartbeatjob.go | 34 ++++++++++- app/jobs/heartbeatjob/heartbeatjob_test.go | 14 +++-- app/jobs/heartbeatjob/trigger.go | 13 ++++- app/jobs/metricsjob/trigger.go | 13 ++++- app/jobs/selfupdatejob/trigger.go | 13 ++++- app/jobs/taskjob/taskjob.go | 16 +++++- app/jobs/taskjob/trigger.go | 15 ++++- app/services/heartbeat/heartbeat.go | 13 +++-- app/services/heartbeat/heartbeat_test.go | 67 ++++++++++++++++------ internal/apiserver/operations.go | 17 +++++- internal/apiserver/operations_test.go | 48 ++++++++++++---- main.go | 2 +- 12 files changed, 214 insertions(+), 51 deletions(-) diff --git a/app/jobs/heartbeatjob/heartbeatjob.go b/app/jobs/heartbeatjob/heartbeatjob.go index c9fc85b..8510733 100644 --- a/app/jobs/heartbeatjob/heartbeatjob.go +++ b/app/jobs/heartbeatjob/heartbeatjob.go @@ -6,10 +6,18 @@ import ( "sync" "hostlink/app/services/heartbeat" + "hostlink/domain/task" + "hostlink/internal/telemetry" + + "github.com/labstack/gommon/log" ) type TriggerFunc func(context.Context, func() error) +type TaskEnqueuer interface { + Enqueue(ctx context.Context, t task.Task) error +} + type HeartbeatJobConfig struct { Trigger TriggerFunc } @@ -36,7 +44,7 @@ func NewWithConfig(cfg HeartbeatJobConfig) HeartbeatJob { } } -func (hj *HeartbeatJob) Register(ctx context.Context, svc heartbeat.Service) context.CancelFunc { +func (hj *HeartbeatJob) Register(ctx context.Context, svc heartbeat.Service, enqueuers ...TaskEnqueuer) context.CancelFunc { ctx, cancel := context.WithCancel(ctx) hj.cancel = cancel @@ -44,13 
+52,35 @@ func (hj *HeartbeatJob) Register(ctx context.Context, svc heartbeat.Service) con go func() { defer hj.wg.Done() hj.config.Trigger(ctx, func() error { - return svc.Send() + pendingTasks, err := svc.Send() + if err != nil { + return err + } + hj.enqueueTasks(ctx, pendingTasks, enqueuers...) + return nil }) }() return cancel } +func (hj *HeartbeatJob) enqueueTasks(ctx context.Context, tasks []task.Task, enqueuers ...TaskEnqueuer) { + if len(tasks) == 0 || len(enqueuers) == 0 { + return + } + for _, t := range tasks { + if t.Status == "completed" || t.Status == "failed" || t.Status == "cancelled" { + continue + } + for _, enq := range enqueuers { + if err := enq.Enqueue(ctx, t); err != nil { + log.Errorf("failed to enqueue task %s from heartbeat: %v", t.ID, err) + } + } + } + telemetry.Metric("hostlink.heartbeat.tasks_delivered", len(tasks), map[string]any{}) +} + func (hj *HeartbeatJob) Shutdown() { if hj.cancel != nil { hj.cancel() diff --git a/app/jobs/heartbeatjob/heartbeatjob_test.go b/app/jobs/heartbeatjob/heartbeatjob_test.go index 226e3ab..99b1927 100644 --- a/app/jobs/heartbeatjob/heartbeatjob_test.go +++ b/app/jobs/heartbeatjob/heartbeatjob_test.go @@ -10,15 +10,19 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "hostlink/domain/task" ) type MockHeartbeatService struct { mock.Mock } -func (m *MockHeartbeatService) Send() error { +func (m *MockHeartbeatService) Send() ([]task.Task, error) { args := m.Called() - return args.Error(0) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]task.Task), args.Error(1) } func immediateTrigger(callCount int, done chan struct{}) TriggerFunc { @@ -71,7 +75,7 @@ func TestNewWithConfig_DefaultsNilTrigger(t *testing.T) { // TestRegister_CallsServiceSend - trigger calls heartbeat.Service.Send() func TestRegister_CallsServiceSend(t *testing.T) { svc := new(MockHeartbeatService) - svc.On("Send").Return(nil).Times(3) + svc.On("Send").Return(nil, 
nil).Times(3) done := make(chan struct{}) job := NewWithConfig(HeartbeatJobConfig{ @@ -90,7 +94,7 @@ func TestRegister_CallsServiceSend(t *testing.T) { // TestRegister_ContinuesOnError - job continues running after Send() error func TestRegister_ContinuesOnError(t *testing.T) { svc := new(MockHeartbeatService) - svc.On("Send").Return(errors.New("connection refused")).Times(3) + svc.On("Send").Return(nil, errors.New("connection refused")).Times(3) done := make(chan struct{}) job := NewWithConfig(HeartbeatJobConfig{ @@ -128,7 +132,7 @@ func TestRegister_ReturnsCancel(t *testing.T) { func TestShutdown_StopsJob(t *testing.T) { var callCount atomic.Int32 svc := new(MockHeartbeatService) - svc.On("Send").Return(nil).Run(func(args mock.Arguments) { + svc.On("Send").Return(nil, nil).Run(func(args mock.Arguments) { callCount.Add(1) }) diff --git a/app/jobs/heartbeatjob/trigger.go b/app/jobs/heartbeatjob/trigger.go index 4a5487e..0a74d2f 100644 --- a/app/jobs/heartbeatjob/trigger.go +++ b/app/jobs/heartbeatjob/trigger.go @@ -2,6 +2,7 @@ package heartbeatjob import ( "context" + "fmt" "time" "github.com/labstack/gommon/log" @@ -23,13 +24,23 @@ func TriggerWithConfig(ctx context.Context, fn func() error, config TriggerConfi case <-ctx.Done(): return case <-time.After(config.Interval): - if err := fn(); err != nil { + if err := safeCall(fn); err != nil { log.Errorf("heartbeat failed: %s", err) } } } } +func safeCall(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic recovered in heartbeat: %v", r) + err = fmt.Errorf("panic: %v", r) + } + }() + return fn() +} + func Trigger(ctx context.Context, fn func() error) { TriggerWithConfig(ctx, fn, DefaultTriggerConfig()) } diff --git a/app/jobs/metricsjob/trigger.go b/app/jobs/metricsjob/trigger.go index 48df6a3..91074f9 100644 --- a/app/jobs/metricsjob/trigger.go +++ b/app/jobs/metricsjob/trigger.go @@ -2,6 +2,7 @@ package metricsjob import ( "context" + "fmt" "time" 
"github.com/labstack/gommon/log" @@ -27,13 +28,23 @@ func triggerWithConfig(ctx context.Context, fn func() error, config TriggerConfi case <-ctx.Done(): return case <-time.After(config.InitialDelay): - if err := fn(); err != nil { + if err := safeCall(fn); err != nil { log.Errorf("Failed while running metrics job: %s", err) } } } } +func safeCall(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic recovered in metrics: %v", r) + err = fmt.Errorf("panic: %v", r) + } + }() + return fn() +} + func TriggerWithConfig(ctx context.Context, fn func() error, config TriggerConfig) { triggerWithConfig(ctx, fn, config) } diff --git a/app/jobs/selfupdatejob/trigger.go b/app/jobs/selfupdatejob/trigger.go index e9f955d..1e8e348 100644 --- a/app/jobs/selfupdatejob/trigger.go +++ b/app/jobs/selfupdatejob/trigger.go @@ -2,6 +2,7 @@ package selfupdatejob import ( "context" + "fmt" "time" log "github.com/sirupsen/logrus" @@ -26,13 +27,23 @@ func TriggerWithConfig(ctx context.Context, fn func() error, config TriggerConfi case <-ctx.Done(): return case <-time.After(config.Interval): - if err := fn(); err != nil { + if err := safeCall(fn); err != nil { log.Errorf("self-update check failed: %s", err) } } } } +func safeCall(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic recovered in self-update: %v", r) + err = fmt.Errorf("panic: %v", r) + } + }() + return fn() +} + // Trigger runs fn with the default configuration. 
func Trigger(ctx context.Context, fn func() error) { TriggerWithConfig(ctx, fn, DefaultTriggerConfig()) diff --git a/app/jobs/taskjob/taskjob.go b/app/jobs/taskjob/taskjob.go index 2c80c97..b61b3de 100644 --- a/app/jobs/taskjob/taskjob.go +++ b/app/jobs/taskjob/taskjob.go @@ -85,7 +85,7 @@ func (tj *TaskJob) Register(ctx context.Context, tf taskfetcher.TaskFetcher, tr for { select { case queued := <-tj.enqueueCh: - tj.processTask(ctx, queued, tr, channel) + tj.processTaskSafe(ctx, queued, tr, channel) case <-ctx.Done(): return } @@ -109,7 +109,7 @@ func (tj *TaskJob) Register(ctx context.Context, tf taskfetcher.TaskFetcher, tr } } for _, t := range incompleteTasks { - tj.processTask(ctx, t, tr, channel) + tj.processTaskSafe(ctx, t, tr, channel) } return nil }) @@ -130,6 +130,18 @@ func (tj *TaskJob) Enqueue(ctx context.Context, t task.Task) error { } } +func (tj *TaskJob) processTaskSafe(ctx context.Context, t task.Task, tr taskreporter.TaskReporter, channel ResultChannel) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic recovered in processTask: %v", r) + telemetry.Metric("hostlink.task_runner.panic", 1, map[string]any{ + "task_id": t.ID, + }) + } + }() + tj.processTask(ctx, t, tr, channel) +} + func (tj *TaskJob) processTask(ctx context.Context, t task.Task, tr taskreporter.TaskReporter, channel ResultChannel) { tempFile, err := os.CreateTemp("", "*_script.sh") if err != nil { diff --git a/app/jobs/taskjob/trigger.go b/app/jobs/taskjob/trigger.go index 23ce098..1cdbfce 100644 --- a/app/jobs/taskjob/trigger.go +++ b/app/jobs/taskjob/trigger.go @@ -2,6 +2,7 @@ package taskjob import ( "context" + "fmt" "time" "github.com/labstack/gommon/log" @@ -27,13 +28,23 @@ func triggerWithConfig(ctx context.Context, fn func() error, config TriggerConfi case <-ctx.Done(): return case <-time.After(config.InitialDelay): - if err := fn(); err != nil { - log.Errorf("Failed while running metrics job: %s", err) + if err := safeCall(fn); err != nil { + 
log.Errorf("Failed while running task poller: %s", err) } } } } +func safeCall(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic recovered: %v", r) + err = fmt.Errorf("panic: %v", r) + } + }() + return fn() +} + func TriggerWithConfig(ctx context.Context, fn func() error, config TriggerConfig) { triggerWithConfig(ctx, fn, config) } diff --git a/app/services/heartbeat/heartbeat.go b/app/services/heartbeat/heartbeat.go index e8ca833..c355b31 100644 --- a/app/services/heartbeat/heartbeat.go +++ b/app/services/heartbeat/heartbeat.go @@ -6,11 +6,12 @@ import ( "hostlink/app/services/agentstate" "hostlink/config/appconf" + "hostlink/domain/task" "hostlink/internal/apiserver" ) type Service interface { - Send() error + Send() ([]task.Task, error) } type heartbeatService struct { @@ -49,11 +50,15 @@ func NewWithDependencies( } } -func (s *heartbeatService) Send() error { +func (s *heartbeatService) Send() ([]task.Task, error) { agentID := s.agentstate.GetAgentID() if agentID == "" { - return fmt.Errorf("agent not registered: missing agent ID") + return nil, fmt.Errorf("agent not registered: missing agent ID") } - return s.apiserver.Heartbeat(context.Background(), agentID) + resp, err := s.apiserver.Heartbeat(context.Background(), agentID) + if err != nil { + return nil, err + } + return resp.PendingTasks, nil } diff --git a/app/services/heartbeat/heartbeat_test.go b/app/services/heartbeat/heartbeat_test.go index b96bcf8..dc1ae59 100644 --- a/app/services/heartbeat/heartbeat_test.go +++ b/app/services/heartbeat/heartbeat_test.go @@ -7,15 +7,20 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "hostlink/domain/task" + "hostlink/internal/apiserver" ) type MockAPIServer struct { mock.Mock } -func (m *MockAPIServer) Heartbeat(ctx context.Context, agentID string) error { +func (m *MockAPIServer) Heartbeat(ctx context.Context, agentID string) (*apiserver.HeartbeatResponse, error) { args := 
m.Called(ctx, agentID) - return args.Error(0) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*apiserver.HeartbeatResponse), args.Error(1) } type MockAgentState struct { @@ -48,10 +53,10 @@ func (m *MockAgentState) Clear() error { } func setupTestService() (*heartbeatService, *MockAPIServer, *MockAgentState) { - apiserver := new(MockAPIServer) + mockSvr := new(MockAPIServer) agentstate := new(MockAgentState) - service := NewWithDependencies(apiserver, agentstate) - return service, apiserver, agentstate + service := NewWithDependencies(mockSvr, agentstate) + return service, mockSvr, agentstate } // TestSend_NoAgentID - returns error when agent is not registered @@ -60,7 +65,7 @@ func TestSend_NoAgentID(t *testing.T) { agentstate.On("GetAgentID").Return("") - err := service.Send() + _, err := service.Send() assert.EqualError(t, err, "agent not registered: missing agent ID") agentstate.AssertExpectations(t) @@ -68,45 +73,69 @@ func TestSend_NoAgentID(t *testing.T) { // TestSend_Success - sends heartbeat successfully func TestSend_Success(t *testing.T) { - service, apiserver, agentstate := setupTestService() + service, mockSvr, agentstate := setupTestService() agentstate.On("GetAgentID").Return("agent-123") - apiserver.On("Heartbeat", mock.Anything, "agent-123").Return(nil) + mockSvr.On("Heartbeat", mock.Anything, "agent-123").Return(&apiserver.HeartbeatResponse{}, nil) - err := service.Send() + tasks, err := service.Send() assert.NoError(t, err) + assert.Empty(t, tasks) agentstate.AssertExpectations(t) - apiserver.AssertExpectations(t) + mockSvr.AssertExpectations(t) } // TestSend_APIError - returns error when API call fails func TestSend_APIError(t *testing.T) { - service, apiserver, agentstate := setupTestService() + service, mockSvr, agentstate := setupTestService() expectedErr := errors.New("connection refused") agentstate.On("GetAgentID").Return("agent-123") - apiserver.On("Heartbeat", mock.Anything, 
"agent-123").Return(expectedErr) + mockSvr.On("Heartbeat", mock.Anything, "agent-123").Return(nil, expectedErr) - err := service.Send() + tasks, err := service.Send() + assert.Nil(t, tasks) assert.Equal(t, expectedErr, err) agentstate.AssertExpectations(t) - apiserver.AssertExpectations(t) + mockSvr.AssertExpectations(t) } // TestSend_UsesBackgroundContext - verifies context.Background is used func TestSend_UsesBackgroundContext(t *testing.T) { - service, apiserver, agentstate := setupTestService() + service, mockSvr, agentstate := setupTestService() agentstate.On("GetAgentID").Return("agent-123") - apiserver.On("Heartbeat", mock.MatchedBy(func(ctx context.Context) bool { + mockSvr.On("Heartbeat", mock.MatchedBy(func(ctx context.Context) bool { _, hasDeadline := ctx.Deadline() return !hasDeadline && ctx.Err() == nil - }), "agent-123").Return(nil) + }), "agent-123").Return(&apiserver.HeartbeatResponse{}, nil) + + tasks, err := service.Send() + + assert.NoError(t, err) + assert.Empty(t, tasks) + mockSvr.AssertExpectations(t) +} + +// TestSend_WithPendingTasks - returns pending tasks from heartbeat response +func TestSend_WithPendingTasks(t *testing.T) { + service, mockSvr, agentstate := setupTestService() + + agentstate.On("GetAgentID").Return("agent-123") + resp := &apiserver.HeartbeatResponse{ + Message: "ok", + PendingTasks: []task.Task{ + {ID: "tsk_test", Command: "echo hello", Status: "pending"}, + }, + } + mockSvr.On("Heartbeat", mock.Anything, "agent-123").Return(resp, nil) - err := service.Send() + pendingTasks, err := service.Send() assert.NoError(t, err) - apiserver.AssertExpectations(t) + assert.Len(t, pendingTasks, 1) + assert.Equal(t, "tsk_test", pendingTasks[0].ID) + assert.Equal(t, "echo hello", pendingTasks[0].Command) } diff --git a/internal/apiserver/operations.go b/internal/apiserver/operations.go index beae600..6587ae5 100644 --- a/internal/apiserver/operations.go +++ b/internal/apiserver/operations.go @@ -5,6 +5,7 @@ import ( "fmt" 
"hostlink/domain/credential" "hostlink/domain/metrics" + "hostlink/domain/task" ) type MetricsOperations interface { @@ -12,8 +13,13 @@ type MetricsOperations interface { PushMetrics(ctx context.Context, payload metrics.MetricPayload) error } +type HeartbeatResponse struct { + Message string `json:"message"` + PendingTasks []task.Task `json:"pending_tasks"` +} + type HeartbeatOperations interface { - Heartbeat(ctx context.Context, agentID string) error + Heartbeat(ctx context.Context, agentID string) (*HeartbeatResponse, error) } func (c *client) GetMetricsCreds(ctx context.Context, agentID string) ([]credential.Credential, error) { @@ -27,6 +33,11 @@ func (c *client) PushMetrics(ctx context.Context, payload metrics.MetricPayload) return c.Post(ctx, fmt.Sprintf("/api/v1/agents/%s/metrics", agentID), payload, nil) } -func (c *client) Heartbeat(ctx context.Context, agentID string) error { - return c.Post(ctx, fmt.Sprintf("/api/v1/agents/%s/heartbeat", agentID), nil, nil) +func (c *client) Heartbeat(ctx context.Context, agentID string) (*HeartbeatResponse, error) { + var result HeartbeatResponse + err := c.Post(ctx, fmt.Sprintf("/api/v1/agents/%s/heartbeat", agentID), nil, &result) + if err != nil { + return nil, err + } + return &result, nil } diff --git a/internal/apiserver/operations_test.go b/internal/apiserver/operations_test.go index 4a334bf..717076a 100644 --- a/internal/apiserver/operations_test.go +++ b/internal/apiserver/operations_test.go @@ -43,14 +43,16 @@ func setupTestClient(t *testing.T, serverURL string) *client { // TestHeartbeat_Success - sends POST to /api/v1/agents/{agentID}/heartbeat with empty body func TestHeartbeat_Success(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"message":"ok"}`)) })) defer server.Close() c := setupTestClient(t, server.URL) - err := 
c.Heartbeat(context.Background(), "test-agent-123") + resp, err := c.Heartbeat(context.Background(), "test-agent-123") assert.NoError(t, err) + assert.NotNil(t, resp) } // TestHeartbeat_SendsToCorrectEndpoint - verifies correct URL path @@ -60,14 +62,16 @@ func TestHeartbeat_SendsToCorrectEndpoint(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { capturedPath = r.URL.Path capturedMethod = r.Method - w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"message":"ok"}`)) })) defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "agent-xyz") + resp, err := c.Heartbeat(context.Background(), "agent-xyz") require.NoError(t, err) + assert.NotNil(t, resp) assert.Equal(t, "/api/v1/agents/agent-xyz/heartbeat", capturedPath) assert.Equal(t, "POST", capturedMethod) } @@ -77,14 +81,16 @@ func TestHeartbeat_SendsEmptyBody(t *testing.T) { var bodySize int64 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { bodySize = r.ContentLength - w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"message":"ok"}`)) })) defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "test-agent-123") + resp, err := c.Heartbeat(context.Background(), "test-agent-123") require.NoError(t, err) + assert.NotNil(t, resp) assert.Equal(t, int64(0), bodySize) } @@ -93,14 +99,16 @@ func TestHeartbeat_AuthenticationHeadersIncluded(t *testing.T) { var headers http.Header server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { headers = r.Header - w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"message":"ok"}`)) })) defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "test-agent-123") + resp, 
err := c.Heartbeat(context.Background(), "test-agent-123") require.NoError(t, err) + assert.NotNil(t, resp) assert.NotEmpty(t, headers.Get("X-Agent-ID")) assert.NotEmpty(t, headers.Get("X-Timestamp")) assert.NotEmpty(t, headers.Get("X-Nonce")) @@ -116,9 +124,10 @@ func TestHeartbeat_ServerError(t *testing.T) { defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "test-agent-123") + resp, err := c.Heartbeat(context.Background(), "test-agent-123") assert.Error(t, err) + assert.Nil(t, resp) } // TestHeartbeat_Unauthorized - returns error on 401 response @@ -130,7 +139,26 @@ func TestHeartbeat_Unauthorized(t *testing.T) { defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "test-agent-123") + resp, err := c.Heartbeat(context.Background(), "test-agent-123") assert.Error(t, err) + assert.Nil(t, resp) +} + +// TestHeartbeat_PendingTasks - decodes pending_tasks from response +func TestHeartbeat_PendingTasks(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"message":"Heartbeat received","pending_tasks":[{"id":"tsk_test","command":"echo hello","status":"pending","priority":1}]}`)) + })) + defer server.Close() + + c := setupTestClient(t, server.URL) + resp, err := c.Heartbeat(context.Background(), "test-agent-123") + + require.NoError(t, err) + require.NotNil(t, resp) + assert.Len(t, resp.PendingTasks, 1) + assert.Equal(t, "tsk_test", resp.PendingTasks[0].ID) + assert.Equal(t, "echo hello", resp.PendingTasks[0].Command) } diff --git a/main.go b/main.go index 528fd5a..31e54a5 100644 --- a/main.go +++ b/main.go @@ -332,7 +332,7 @@ func runServer(ctx context.Context, cmd *cli.Command) error { heartbeatjob.TriggerWithConfig(ctx, fn, heartbeatjob.TriggerConfig{Interval: appconf.HeartbeatInterval()}) }, }) - heartbeatJob.Register(jobCtx, heartbeatSvc) + 
heartbeatJob.Register(jobCtx, heartbeatSvc, taskJob) // Self-update job (gated by config) if appconf.SelfUpdateEnabled() {