diff --git a/app/jobs/heartbeatjob/heartbeatjob.go b/app/jobs/heartbeatjob/heartbeatjob.go index c9fc85b..8510733 100644 --- a/app/jobs/heartbeatjob/heartbeatjob.go +++ b/app/jobs/heartbeatjob/heartbeatjob.go @@ -6,10 +6,18 @@ import ( "sync" "hostlink/app/services/heartbeat" + "hostlink/domain/task" + "hostlink/internal/telemetry" + + "github.com/labstack/gommon/log" ) type TriggerFunc func(context.Context, func() error) +type TaskEnqueuer interface { + Enqueue(ctx context.Context, t task.Task) error +} + type HeartbeatJobConfig struct { Trigger TriggerFunc } @@ -36,7 +44,7 @@ func NewWithConfig(cfg HeartbeatJobConfig) HeartbeatJob { } } -func (hj *HeartbeatJob) Register(ctx context.Context, svc heartbeat.Service) context.CancelFunc { +func (hj *HeartbeatJob) Register(ctx context.Context, svc heartbeat.Service, enqueuers ...TaskEnqueuer) context.CancelFunc { ctx, cancel := context.WithCancel(ctx) hj.cancel = cancel @@ -44,13 +52,35 @@ func (hj *HeartbeatJob) Register(ctx context.Context, svc heartbeat.Service) con go func() { defer hj.wg.Done() hj.config.Trigger(ctx, func() error { - return svc.Send() + pendingTasks, err := svc.Send() + if err != nil { + return err + } + hj.enqueueTasks(ctx, pendingTasks, enqueuers...) + return nil }) }() return cancel } +func (hj *HeartbeatJob) enqueueTasks(ctx context.Context, tasks []task.Task, enqueuers ...TaskEnqueuer) { + if len(tasks) == 0 || len(enqueuers) == 0 { + return + } + for _, t := range tasks { + if t.Status == "completed" || t.Status == "failed" || t.Status == "cancelled" { + continue + } + for _, enq := range enqueuers { + if err := enq.Enqueue(ctx, t); err != nil { + log.Errorf("failed to enqueue task %s from heartbeat: %v", t.ID, err) + } + } + } + telemetry.Metric("hostlink.heartbeat.tasks_delivered", len(tasks), map[string]any{}) +} + func (hj *HeartbeatJob) Shutdown() { if hj.cancel != nil { hj.cancel() diff --git a/app/jobs/heartbeatjob/heartbeatjob_test.go b/app/jobs/heartbeatjob/heartbeatjob_test.go index 226e3ab..99b1927 100644 --- a/app/jobs/heartbeatjob/heartbeatjob_test.go +++ b/app/jobs/heartbeatjob/heartbeatjob_test.go @@ -10,15 +10,19 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "hostlink/domain/task" ) type MockHeartbeatService struct { mock.Mock } -func (m *MockHeartbeatService) Send() error { +func (m *MockHeartbeatService) Send() ([]task.Task, error) { args := m.Called() - return args.Error(0) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]task.Task), args.Error(1) } func immediateTrigger(callCount int, done chan struct{}) TriggerFunc { @@ -71,7 +75,7 @@ func TestNewWithConfig_DefaultsNilTrigger(t *testing.T) { // TestRegister_CallsServiceSend - trigger calls heartbeat.Service.Send() func TestRegister_CallsServiceSend(t *testing.T) { svc := new(MockHeartbeatService) - svc.On("Send").Return(nil).Times(3) + svc.On("Send").Return(nil, nil).Times(3) done := make(chan struct{}) job := NewWithConfig(HeartbeatJobConfig{ @@ -90,7 +94,7 @@ func TestRegister_CallsServiceSend(t *testing.T) { // TestRegister_ContinuesOnError - job continues running after Send() error func TestRegister_ContinuesOnError(t *testing.T) { svc := new(MockHeartbeatService) - svc.On("Send").Return(errors.New("connection refused")).Times(3) + svc.On("Send").Return(nil, errors.New("connection refused")).Times(3) done := make(chan struct{}) job := NewWithConfig(HeartbeatJobConfig{ @@ -128,7 +132,7 @@ func TestRegister_ReturnsCancel(t *testing.T) { func TestShutdown_StopsJob(t *testing.T) { var callCount atomic.Int32 svc := new(MockHeartbeatService) - svc.On("Send").Return(nil).Run(func(args mock.Arguments) { + svc.On("Send").Return(nil, nil).Run(func(args mock.Arguments) { callCount.Add(1) }) diff --git a/app/jobs/heartbeatjob/trigger.go b/app/jobs/heartbeatjob/trigger.go index 4a5487e..0a74d2f 100644 --- a/app/jobs/heartbeatjob/trigger.go +++ b/app/jobs/heartbeatjob/trigger.go @@ -2,6 +2,7 @@ package heartbeatjob import ( "context" + "fmt" "time" "github.com/labstack/gommon/log" @@ -23,13 +24,23 @@ func TriggerWithConfig(ctx context.Context, fn func() error, config TriggerConfi case <-ctx.Done(): return case <-time.After(config.Interval): - if err := fn(); err != nil { + if err := safeCall(fn); err != nil { log.Errorf("heartbeat failed: %s", err) } } } } +func safeCall(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic recovered in heartbeat: %v", r) + err = fmt.Errorf("panic: %v", r) + } + }() + return fn() +} + func Trigger(ctx context.Context, fn func() error) { TriggerWithConfig(ctx, fn, DefaultTriggerConfig()) } diff --git a/app/jobs/metricsjob/trigger.go b/app/jobs/metricsjob/trigger.go index 48df6a3..91074f9 100644 --- a/app/jobs/metricsjob/trigger.go +++ b/app/jobs/metricsjob/trigger.go @@ -2,6 +2,7 @@ package metricsjob import ( "context" + "fmt" "time" "github.com/labstack/gommon/log" @@ -27,13 +28,23 @@ func triggerWithConfig(ctx context.Context, fn func() error, config TriggerConfi case <-ctx.Done(): return case <-time.After(config.InitialDelay): - if err := fn(); err != nil { + if err := safeCall(fn); err != nil { log.Errorf("Failed while running metrics job: %s", err) } } } } +func safeCall(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic recovered in metrics: %v", r) + err = fmt.Errorf("panic: %v", r) + } + }() + return fn() +} + func TriggerWithConfig(ctx context.Context, fn func() error, config TriggerConfig) { triggerWithConfig(ctx, fn, config) } diff --git a/app/jobs/selfupdatejob/trigger.go b/app/jobs/selfupdatejob/trigger.go index e9f955d..1e8e348 100644 --- a/app/jobs/selfupdatejob/trigger.go +++ b/app/jobs/selfupdatejob/trigger.go @@ -2,6 +2,7 @@ package selfupdatejob import ( "context" + "fmt" "time" log "github.com/sirupsen/logrus" @@ -26,13 +27,23 @@ func TriggerWithConfig(ctx context.Context, fn func() error, config TriggerConfi case <-ctx.Done(): return case <-time.After(config.Interval): - if err := fn(); err != nil { + if err := safeCall(fn); err != nil { log.Errorf("self-update check failed: %s", err) } } } } +func safeCall(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic recovered in self-update: %v", r) + err = fmt.Errorf("panic: %v", r) + } + }() + return fn() +} + // Trigger runs fn with the default configuration. func Trigger(ctx context.Context, fn func() error) { TriggerWithConfig(ctx, fn, DefaultTriggerConfig()) diff --git a/app/jobs/taskjob/taskjob.go b/app/jobs/taskjob/taskjob.go index 2c80c97..b61b3de 100644 --- a/app/jobs/taskjob/taskjob.go +++ b/app/jobs/taskjob/taskjob.go @@ -85,7 +85,7 @@ func (tj *TaskJob) Register(ctx context.Context, tf taskfetcher.TaskFetcher, tr for { select { case queued := <-tj.enqueueCh: - tj.processTask(ctx, queued, tr, channel) + tj.processTaskSafe(ctx, queued, tr, channel) case <-ctx.Done(): return } @@ -109,7 +109,7 @@ func (tj *TaskJob) Register(ctx context.Context, tf taskfetcher.TaskFetcher, tr } } for _, t := range incompleteTasks { - tj.processTask(ctx, t, tr, channel) + tj.processTaskSafe(ctx, t, tr, channel) } return nil }) @@ -130,6 +130,18 @@ func (tj *TaskJob) Enqueue(ctx context.Context, t task.Task) error { } } +func (tj *TaskJob) processTaskSafe(ctx context.Context, t task.Task, tr taskreporter.TaskReporter, channel ResultChannel) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic recovered in processTask: %v", r) + telemetry.Metric("hostlink.task_runner.panic", 1, map[string]any{ + "task_id": t.ID, + }) + } + }() + tj.processTask(ctx, t, tr, channel) +} + func (tj *TaskJob) processTask(ctx context.Context, t task.Task, tr taskreporter.TaskReporter, channel ResultChannel) { tempFile, err := os.CreateTemp("", "*_script.sh") if err != nil { diff --git a/app/jobs/taskjob/trigger.go b/app/jobs/taskjob/trigger.go index 23ce098..1cdbfce 100644 --- a/app/jobs/taskjob/trigger.go +++ b/app/jobs/taskjob/trigger.go @@ -2,6 +2,7 @@ package taskjob import ( "context" + "fmt" "time" "github.com/labstack/gommon/log" @@ -27,13 +28,23 @@ func triggerWithConfig(ctx context.Context, fn func() error, config TriggerConfi case <-ctx.Done(): return case <-time.After(config.InitialDelay): - if err := fn(); err != nil { - log.Errorf("Failed while running metrics job: %s", err) + if err := safeCall(fn); err != nil { + log.Errorf("Failed while running task poller: %s", err) } } } } +func safeCall(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic recovered: %v", r) + err = fmt.Errorf("panic: %v", r) + } + }() + return fn() +} + func TriggerWithConfig(ctx context.Context, fn func() error, config TriggerConfig) { triggerWithConfig(ctx, fn, config) } diff --git a/app/services/heartbeat/heartbeat.go b/app/services/heartbeat/heartbeat.go index e8ca833..c355b31 100644 --- a/app/services/heartbeat/heartbeat.go +++ b/app/services/heartbeat/heartbeat.go @@ -6,11 +6,12 @@ import ( "hostlink/app/services/agentstate" "hostlink/config/appconf" + "hostlink/domain/task" "hostlink/internal/apiserver" ) type Service interface { - Send() error + Send() ([]task.Task, error) } type heartbeatService struct { @@ -49,11 +50,15 @@ func NewWithDependencies( } } -func (s *heartbeatService) Send() error { +func (s *heartbeatService) Send() ([]task.Task, error) { agentID := s.agentstate.GetAgentID() if agentID == "" { - return fmt.Errorf("agent not registered: missing agent ID") + return nil, fmt.Errorf("agent not registered: missing agent ID") } - return s.apiserver.Heartbeat(context.Background(), agentID) + resp, err := s.apiserver.Heartbeat(context.Background(), agentID) + if err != nil { + return nil, err + } + return resp.PendingTasks, nil } diff --git a/app/services/heartbeat/heartbeat_test.go b/app/services/heartbeat/heartbeat_test.go index b96bcf8..dc1ae59 100644 --- a/app/services/heartbeat/heartbeat_test.go +++ b/app/services/heartbeat/heartbeat_test.go @@ -7,15 +7,20 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "hostlink/domain/task" + "hostlink/internal/apiserver" ) type MockAPIServer struct { mock.Mock } -func (m *MockAPIServer) Heartbeat(ctx context.Context, agentID string) error { +func (m *MockAPIServer) Heartbeat(ctx context.Context, agentID string) (*apiserver.HeartbeatResponse, error) { args := m.Called(ctx, agentID) - return args.Error(0) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*apiserver.HeartbeatResponse), args.Error(1) } type MockAgentState struct { @@ -48,10 +53,10 @@ func (m *MockAgentState) Clear() error { } func setupTestService() (*heartbeatService, *MockAPIServer, *MockAgentState) { - apiserver := new(MockAPIServer) + mockSvr := new(MockAPIServer) agentstate := new(MockAgentState) - service := NewWithDependencies(apiserver, agentstate) - return service, apiserver, agentstate + service := NewWithDependencies(mockSvr, agentstate) + return service, mockSvr, agentstate } // TestSend_NoAgentID - returns error when agent is not registered @@ -60,7 +65,7 @@ func TestSend_NoAgentID(t *testing.T) { agentstate.On("GetAgentID").Return("") - err := service.Send() + _, err := service.Send() assert.EqualError(t, err, "agent not registered: missing agent ID") agentstate.AssertExpectations(t) @@ -68,45 +73,69 @@ func TestSend_NoAgentID(t *testing.T) { // TestSend_Success - sends heartbeat successfully func TestSend_Success(t *testing.T) { - service, apiserver, agentstate := setupTestService() + service, mockSvr, agentstate := setupTestService() agentstate.On("GetAgentID").Return("agent-123") - apiserver.On("Heartbeat", mock.Anything, "agent-123").Return(nil) + mockSvr.On("Heartbeat", mock.Anything, "agent-123").Return(&apiserver.HeartbeatResponse{}, nil) - err := service.Send() + tasks, err := service.Send() assert.NoError(t, err) + assert.Empty(t, tasks) agentstate.AssertExpectations(t) - apiserver.AssertExpectations(t) + mockSvr.AssertExpectations(t) } // TestSend_APIError - returns error when API call fails func TestSend_APIError(t *testing.T) { - service, apiserver, agentstate := setupTestService() + service, mockSvr, agentstate := setupTestService() expectedErr := errors.New("connection refused") agentstate.On("GetAgentID").Return("agent-123") - apiserver.On("Heartbeat", mock.Anything, "agent-123").Return(expectedErr) + mockSvr.On("Heartbeat", mock.Anything, "agent-123").Return(nil, expectedErr) - err := service.Send() + tasks, err := service.Send() + assert.Nil(t, tasks) assert.Equal(t, expectedErr, err) agentstate.AssertExpectations(t) - apiserver.AssertExpectations(t) + mockSvr.AssertExpectations(t) } // TestSend_UsesBackgroundContext - verifies context.Background is used func TestSend_UsesBackgroundContext(t *testing.T) { - service, apiserver, agentstate := setupTestService() + service, mockSvr, agentstate := setupTestService() agentstate.On("GetAgentID").Return("agent-123") - apiserver.On("Heartbeat", mock.MatchedBy(func(ctx context.Context) bool { + mockSvr.On("Heartbeat", mock.MatchedBy(func(ctx context.Context) bool { _, hasDeadline := ctx.Deadline() return !hasDeadline && ctx.Err() == nil - }), "agent-123").Return(nil) + }), "agent-123").Return(&apiserver.HeartbeatResponse{}, nil) + + tasks, err := service.Send() + + assert.NoError(t, err) + assert.Empty(t, tasks) + mockSvr.AssertExpectations(t) +} + +// TestSend_WithPendingTasks - returns pending tasks from heartbeat response +func TestSend_WithPendingTasks(t *testing.T) { + service, mockSvr, agentstate := setupTestService() + + agentstate.On("GetAgentID").Return("agent-123") + resp := &apiserver.HeartbeatResponse{ + Message: "ok", + PendingTasks: []task.Task{ + {ID: "tsk_test", Command: "echo hello", Status: "pending"}, + }, + } + mockSvr.On("Heartbeat", mock.Anything, "agent-123").Return(resp, nil) - err := service.Send() + pendingTasks, err := service.Send() assert.NoError(t, err) - apiserver.AssertExpectations(t) + assert.Len(t, pendingTasks, 1) + assert.Equal(t, "tsk_test", pendingTasks[0].ID) + assert.Equal(t, "echo hello", pendingTasks[0].Command) } diff --git a/app/services/metrics/metrics.go b/app/services/metrics/metrics.go index 116ed4f..d281be0 100644 --- a/app/services/metrics/metrics.go +++ b/app/services/metrics/metrics.go @@ -341,7 +341,7 @@ func (mp *metricspusher) Push(cred credential.Credential) error { } } - // ── Traefik metrics (aggregate per entrypoint) ────────────────────────────── + // ── Traefik metrics (HTTP requests / response time / error rate per entrypoint) ── traefikSets, err := mp.traefikcollector.Collect(ctx) if err != nil { diff --git a/domain/metrics/metrics.go b/domain/metrics/metrics.go index 4dfe69b..7d0f4f2 100644 --- a/domain/metrics/metrics.go +++ b/domain/metrics/metrics.go @@ -104,16 +104,19 @@ type StorageAttributes struct { } type MySQLDatabaseMetrics struct { - Up bool `json:"up"` - ThreadsConnected int `json:"threads_connected"` - ThreadsRunning int `json:"threads_running"` - MaxConnections int `json:"max_connections"` - MaxUsedConnections int64 `json:"max_used_connections"` - QueriesPerSecond float64 `json:"queries_per_second"` - SlowQueries int64 `json:"slow_queries"` - InnoDBCacheHitRatio float64 `json:"innodb_cache_hit_ratio"` - ReplicationLagSeconds *int `json:"replication_lag_seconds,omitempty"` - ReplicationConnected *bool `json:"replication_connected,omitempty"` + Up bool `json:"up"` + ConnectionsTotal int `json:"connections_total"` + ConnectionsAborted int64 `json:"connections_aborted"` + MaxConnections int `json:"max_connections"` + ThreadsRunning int `json:"threads_running"` + QueriesPerSecond float64 `json:"queries_per_second"` + SlowQueriesPerSecond float64 `json:"slow_queries_per_second"` + InnoDBBufferPoolHitRatio float64 `json:"innodb_buffer_pool_hit_ratio"` + InnoDBRowLockWaitsPerSecond float64 `json:"innodb_row_lock_waits_per_second"` + TmpDiskTablesPerSecond float64 `json:"tmp_disk_tables_per_second"` + SelectFullScansPerSecond float64 `json:"select_full_scans_per_second"` + ReplicationLagSeconds *int `json:"replication_lag_seconds,omitempty"` + ReplicationConnected *bool `json:"replication_connected,omitempty"` } type MongoDBMetrics struct { @@ -204,9 +207,9 @@ type TraefikRouterMetrics struct { } type TraefikRouterAttributes struct { - RouterName string `json:"router_name"` + RouterName string `json:"router_name"` EntrypointName string `json:"entrypoint_name"` - Service string `json:"service,omitempty"` + Service string `json:"service,omitempty"` } type ContainerAttributes struct { diff --git a/internal/apiserver/operations.go b/internal/apiserver/operations.go index beae600..6587ae5 100644 --- a/internal/apiserver/operations.go +++ b/internal/apiserver/operations.go @@ -5,6 +5,7 @@ import ( "fmt" "hostlink/domain/credential" "hostlink/domain/metrics" + "hostlink/domain/task" ) type MetricsOperations interface { @@ -12,8 +13,13 @@ type MetricsOperations interface { PushMetrics(ctx context.Context, payload metrics.MetricPayload) error } +type HeartbeatResponse struct { + Message string `json:"message"` + PendingTasks []task.Task `json:"pending_tasks"` +} + type HeartbeatOperations interface { - Heartbeat(ctx context.Context, agentID string) error + Heartbeat(ctx context.Context, agentID string) (*HeartbeatResponse, error) } func (c *client) GetMetricsCreds(ctx context.Context, agentID string) ([]credential.Credential, error) { @@ -27,6 +33,11 @@ func (c *client) PushMetrics(ctx context.Context, payload metrics.MetricPayload) return c.Post(ctx, fmt.Sprintf("/api/v1/agents/%s/metrics", agentID), payload, nil) } -func (c *client) Heartbeat(ctx context.Context, agentID string) error { - return c.Post(ctx, fmt.Sprintf("/api/v1/agents/%s/heartbeat", agentID), nil, nil) +func (c *client) Heartbeat(ctx context.Context, agentID string) (*HeartbeatResponse, error) { + var result HeartbeatResponse + err := c.Post(ctx, fmt.Sprintf("/api/v1/agents/%s/heartbeat", agentID), nil, &result) + if err != nil { + return nil, err + } + return &result, nil } diff --git a/internal/apiserver/operations_test.go b/internal/apiserver/operations_test.go index 4a334bf..717076a 100644 --- a/internal/apiserver/operations_test.go +++ b/internal/apiserver/operations_test.go @@ -43,14 +43,16 @@ func setupTestClient(t *testing.T, serverURL string) *client { // TestHeartbeat_Success - sends POST to /api/v1/agents/{agentID}/heartbeat with empty body func TestHeartbeat_Success(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"message":"ok"}`)) })) defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "test-agent-123") + resp, err := c.Heartbeat(context.Background(), "test-agent-123") assert.NoError(t, err) + assert.NotNil(t, resp) } // TestHeartbeat_SendsToCorrectEndpoint - verifies correct URL path @@ -60,14 +62,16 @@ func TestHeartbeat_SendsToCorrectEndpoint(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { capturedPath = r.URL.Path capturedMethod = r.Method - w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"message":"ok"}`)) })) defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "agent-xyz") + resp, err := c.Heartbeat(context.Background(), "agent-xyz") require.NoError(t, err) + assert.NotNil(t, resp) assert.Equal(t, "/api/v1/agents/agent-xyz/heartbeat", capturedPath) assert.Equal(t, "POST", capturedMethod) } @@ -77,14 +81,16 @@ func TestHeartbeat_SendsEmptyBody(t *testing.T) { var bodySize int64 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { bodySize = r.ContentLength - w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"message":"ok"}`)) })) defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "test-agent-123") + resp, err := c.Heartbeat(context.Background(), "test-agent-123") require.NoError(t, err) + assert.NotNil(t, resp) assert.Equal(t, int64(0), bodySize) } @@ -93,14 +99,16 @@ func TestHeartbeat_AuthenticationHeadersIncluded(t *testing.T) { var headers http.Header server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { headers = r.Header - w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"message":"ok"}`)) })) defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "test-agent-123") + resp, err := c.Heartbeat(context.Background(), "test-agent-123") require.NoError(t, err) + assert.NotNil(t, resp) assert.NotEmpty(t, headers.Get("X-Agent-ID")) assert.NotEmpty(t, headers.Get("X-Timestamp")) assert.NotEmpty(t, headers.Get("X-Nonce")) @@ -116,9 +124,10 @@ func TestHeartbeat_ServerError(t *testing.T) { defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "test-agent-123") + resp, err := c.Heartbeat(context.Background(), "test-agent-123") assert.Error(t, err) + assert.Nil(t, resp) } // TestHeartbeat_Unauthorized - returns error on 401 response @@ -130,7 +139,26 @@ func TestHeartbeat_Unauthorized(t *testing.T) { defer server.Close() c := setupTestClient(t, server.URL) - err := c.Heartbeat(context.Background(), "test-agent-123") + resp, err := c.Heartbeat(context.Background(), "test-agent-123") assert.Error(t, err) + assert.Nil(t, resp) +} + +// TestHeartbeat_PendingTasks - decodes pending_tasks from response +func TestHeartbeat_PendingTasks(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"message":"Heartbeat received","pending_tasks":[{"id":"tsk_test","command":"echo hello","status":"pending","priority":1}]}`)) + })) + defer server.Close() + + c := setupTestClient(t, server.URL) + resp, err := c.Heartbeat(context.Background(), "test-agent-123") + + require.NoError(t, err) + require.NotNil(t, resp) + assert.Len(t, resp.PendingTasks, 1) + assert.Equal(t, "tsk_test", resp.PendingTasks[0].ID) + assert.Equal(t, "echo hello", resp.PendingTasks[0].Command) } diff --git a/internal/mysqlmetrics/collector.go b/internal/mysqlmetrics/collector.go index 8ba9de0..b7348a9 100644 --- a/internal/mysqlmetrics/collector.go +++ b/internal/mysqlmetrics/collector.go @@ -20,9 +20,13 @@ type Collector interface { } type mysqlCollector struct { - queryTimeout time.Duration - lastQueries *int64 - lastTime time.Time + queryTimeout time.Duration + lastQueries *int64 + lastSlowQueries *int64 + lastRowLockWaits *int64 + lastTmpDiskTables *int64 + lastSelectFullJoins *int64 + lastTime time.Time } func New() Collector { @@ -73,9 +77,10 @@ func (mc *mysqlCollector) Collect(cred credential.Credential) (metrics.MySQLData func (mc *mysqlCollector) collectGlobalStatus(ctx context.Context, db *sql.DB, m *metrics.MySQLDatabaseMetrics) error { rows, err := db.QueryContext(ctx, ` SHOW GLOBAL STATUS WHERE Variable_name IN ( - 'Threads_connected','Threads_running','Max_used_connections', - 'Queries','Slow_queries', - 'Innodb_buffer_pool_read_requests','Innodb_buffer_pool_reads' + 'Threads_connected','Threads_running', + 'Queries','Slow_queries','Aborted_connects', + 'Innodb_buffer_pool_read_requests','Innodb_buffer_pool_reads', + 'Innodb_row_lock_waits','Created_tmp_disk_tables','Select_full_join' ) `) if err != nil { @@ -95,27 +100,38 @@ func (mc *mysqlCollector) collectGlobalStatus(ctx context.Context, db *sql.DB, m return err } - m.ThreadsConnected = parseInt(status["Threads_connected"]) + m.ConnectionsTotal = parseInt(status["Threads_connected"]) m.ThreadsRunning = parseInt(status["Threads_running"]) - m.MaxUsedConnections = parseInt64(status["Max_used_connections"]) - m.SlowQueries = parseInt64(status["Slow_queries"]) + m.ConnectionsAborted = parseInt64(status["Aborted_connects"]) readReqs := parseFloat64(status["Innodb_buffer_pool_read_requests"]) diskReads := parseFloat64(status["Innodb_buffer_pool_reads"]) if readReqs > 0 { - m.InnoDBCacheHitRatio = (readReqs - diskReads) / readReqs * 100 + m.InnoDBBufferPoolHitRatio = (readReqs - diskReads) / readReqs * 100 } - // Delta-based QPS using cumulative Queries counter + // Delta-based rates using cumulative counters currentQueries := parseInt64(status["Queries"]) + currentSlowQueries := parseInt64(status["Slow_queries"]) + currentRowLockWaits := parseInt64(status["Innodb_row_lock_waits"]) + currentTmpDiskTables := parseInt64(status["Created_tmp_disk_tables"]) + currentSelectFullJoins := parseInt64(status["Select_full_join"]) now := time.Now() if mc.lastQueries != nil { elapsed := now.Sub(mc.lastTime).Seconds() if elapsed > 0 { m.QueriesPerSecond = float64(currentQueries-*mc.lastQueries) / elapsed + m.SlowQueriesPerSecond = float64(currentSlowQueries-*mc.lastSlowQueries) / elapsed + m.InnoDBRowLockWaitsPerSecond = float64(currentRowLockWaits-*mc.lastRowLockWaits) / elapsed + m.TmpDiskTablesPerSecond = float64(currentTmpDiskTables-*mc.lastTmpDiskTables) / elapsed + m.SelectFullScansPerSecond = float64(currentSelectFullJoins-*mc.lastSelectFullJoins) / elapsed } } mc.lastQueries = ¤tQueries + mc.lastSlowQueries = ¤tSlowQueries + mc.lastRowLockWaits = ¤tRowLockWaits + mc.lastTmpDiskTables = ¤tTmpDiskTables + mc.lastSelectFullJoins = ¤tSelectFullJoins mc.lastTime = now return nil diff --git a/internal/mysqlmetrics/collector_test.go b/internal/mysqlmetrics/collector_test.go new file mode 100644 index 0000000..1889c49 --- /dev/null +++ b/internal/mysqlmetrics/collector_test.go @@ -0,0 +1,89 @@ +package mysqlmetrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestDeltaRates_FirstCollectionReturnsZero(t *testing.T) { + mc := &mysqlCollector{queryTimeout: 10 * time.Second} + + assert.Nil(t, mc.lastQueries, "lastQueries should be nil before first collection") + assert.Nil(t, mc.lastSlowQueries, "lastSlowQueries should be nil before first collection") +} + +func TestDeltaRates_SecondCollectionReturnsRates(t *testing.T) { + baseTime := time.Now() + initialQueries := int64(1000) + initialSlowQueries := int64(10) + + mc := &mysqlCollector{ + queryTimeout: 10 * time.Second, + lastQueries: &initialQueries, + lastSlowQueries: &initialSlowQueries, + lastTime: baseTime, + } + + // Simulate 10 seconds elapsed, 500 new queries, 5 new slow queries + elapsed := 10 * time.Second + currentQueries := int64(1500) + currentSlowQueries := int64(15) + now := baseTime.Add(elapsed) + + elapsedSec := now.Sub(mc.lastTime).Seconds() + qps := float64(currentQueries-*mc.lastQueries) / elapsedSec + sqps := float64(currentSlowQueries-*mc.lastSlowQueries) / elapsedSec + + assert.Equal(t, 50.0, qps, "QPS should be 500 queries / 10s = 50") + assert.Equal(t, 0.5, sqps, "slow QPS should be 5 queries / 10s = 0.5") +} + +func TestDeltaRates_ZeroElapsedReturnsZero(t *testing.T) { + baseTime := time.Now() + initialQueries := int64(1000) + initialSlowQueries := int64(10) + + mc := &mysqlCollector{ + queryTimeout: 10 * time.Second, + lastQueries: &initialQueries, + lastSlowQueries: &initialSlowQueries, + lastTime: baseTime, + } + + elapsed := mc.lastTime.Sub(baseTime).Seconds() + assert.Equal(t, 0.0, elapsed, "elapsed should be 0 when time hasn't advanced") +} + +func TestInnoDBBufferPoolHitRatio_Calculation(t *testing.T) { + readReqs := 10000.0 + diskReads := 100.0 + + ratio := (readReqs - diskReads) / readReqs * 100 + assert.Equal(t, 99.0, ratio, "hit ratio should be 99% when 100 of 10000 reads hit disk") +} + +func TestInnoDBBufferPoolHitRatio_ZeroRequests(t *testing.T) { + readReqs := 0.0 + diskReads := 0.0 + + var ratio float64 + if readReqs > 0 { + ratio = (readReqs - diskReads) / readReqs * 100 + } + assert.Equal(t, 0.0, ratio, "hit ratio should be 0 when there are no read requests") +} + +func TestParseInt_ValidValues(t *testing.T) { + assert.Equal(t, 42, parseInt("42")) + assert.Equal(t, 0, parseInt("0")) + assert.Equal(t, 0, parseInt("")) + assert.Equal(t, 0, parseInt("invalid")) +} + +func TestParseInt64_ValidValues(t *testing.T) { + assert.Equal(t, int64(1234567890), parseInt64("1234567890")) + assert.Equal(t, int64(0), parseInt64("")) + assert.Equal(t, int64(0), parseInt64("invalid")) +} diff --git a/main.go b/main.go index 528fd5a..31e54a5 100644 --- a/main.go +++ b/main.go @@ -332,7 +332,7 @@ func runServer(ctx context.Context, cmd *cli.Command) error { heartbeatjob.TriggerWithConfig(ctx, fn, heartbeatjob.TriggerConfig{Interval: appconf.HeartbeatInterval()}) }, }) - heartbeatJob.Register(jobCtx, heartbeatSvc) + heartbeatJob.Register(jobCtx, heartbeatSvc, taskJob) // Self-update job (gated by config) if appconf.SelfUpdateEnabled() {