From e0ee685b680cfb2c9c7e3787521d8912aa79f792 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Wed, 22 Apr 2026 18:41:01 +0200 Subject: [PATCH 1/3] [core] add metrics to the transition --- common/monitoring/metric.go | 11 +++++++++++ core/environment/transition_configure.go | 6 ++++++ core/environment/transition_deploy.go | 8 ++++++++ core/environment/transition_goerror.go | 4 ++++ core/environment/transition_reset.go | 6 ++++++ core/environment/transition_startactivity.go | 6 ++++++ core/environment/transition_stopactivity.go | 6 ++++++ core/environment/utils.go | 10 +++++++++- core/workflow/callable/call.go | 6 +++--- 9 files changed, 59 insertions(+), 4 deletions(-) diff --git a/common/monitoring/metric.go b/common/monitoring/metric.go index 41f8b78fd..fe0ab8c9a 100644 --- a/common/monitoring/metric.go +++ b/common/monitoring/metric.go @@ -71,6 +71,17 @@ func (metric *Metric) AddTag(tagName string, value string) { metric.tags = append(metric.tags, Tag{name: tagName, value: value}) } +const ( + ERROR = "error" + SUCCESS = "success" + CANCELLED = "cancelled" + TIMEOUT = "timeout" +) + +func (metric *Metric) AddResult(result string) { + metric.AddTag("result", result) +} + func (metric *Metric) setField(fieldName string, field any) { if metric.fields == nil { metric.fields = make(FieldsType) diff --git a/core/environment/transition_configure.go b/core/environment/transition_configure.go index aab902bb2..4ff0a0072 100644 --- a/core/environment/transition_configure.go +++ b/core/environment/transition_configure.go @@ -30,6 +30,7 @@ import ( "github.com/AliceO2Group/Control/core/workflow" "github.com/AliceO2Group/Control/common/event" + "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/taskop" ) @@ -52,6 +53,9 @@ func (t ConfigureTransition) do(env *Environment) (err error) { return errors.New("cannot transition in NIL environment") } + metric := transitionMetric("configure", env) + defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + wf := env.Workflow() activeTasks := workflow.GetActiveTasks(wf) @@ -64,9 +68,11 @@ func (t ConfigureTransition) do(env *Environment) (err error) { incomingEv := <-env.stateChangedCh // If some tasks failed to transition if tasksStateErrors := incomingEv.GetTasksStateChangedError(); tasksStateErrors != nil { + metric.AddResult(monitoring.ERROR) return tasksStateErrors } env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), State: "CONFIGURED"}) + metric.AddResult(monitoring.SUCCESS) return } diff --git a/core/environment/transition_deploy.go b/core/environment/transition_deploy.go index 34436be04..a25ee15a8 100644 --- a/core/environment/transition_deploy.go +++ b/core/environment/transition_deploy.go @@ -36,6 +36,7 @@ import ( "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/logger/infologger" + "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" "github.com/AliceO2Group/Control/core/task/taskop" @@ -66,6 +67,9 @@ func (t DeployTransition) do(env *Environment) (err error) { return errors.New("cannot transition in NIL environment") } + metric := transitionMetric("deploy", env) + defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + wf := env.Workflow() // Skip cleanup for anything other than readout-dataflow @@ -347,10 +351,14 @@ func (t DeployTransition) do(env *Environment) (err error) { log.WithField("level", infologger.IL_Ops). WithField("partition", env.Id().String()). Error(err) + metric.AddResult(monitoring.ERROR) return } env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), State: "DEPLOYED"}) + + metric.AddResult(monitoring.SUCCESS) + return } diff --git a/core/environment/transition_goerror.go b/core/environment/transition_goerror.go index c8adb14e2..378441e87 100644 --- a/core/environment/transition_goerror.go +++ b/core/environment/transition_goerror.go @@ -25,6 +25,7 @@ package environment import ( + "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/core/controlcommands" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" @@ -44,6 +45,8 @@ type GoErrorTransition struct { } func (t GoErrorTransition) do(env *Environment) (err error) { + metric := transitionMetric("goerror", env) + defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() // we stop all tasks which are in RUNNING toStop := env.Workflow().GetTasks().Filtered(func(t *task.Task) bool { @@ -72,5 +75,6 @@ func (t GoErrorTransition) do(env *Environment) (err error) { <-env.stateChangedCh } + metric.AddResult(monitoring.SUCCESS) return } diff --git a/core/environment/transition_reset.go b/core/environment/transition_reset.go index fd9642dfa..50795dff0 100644 --- a/core/environment/transition_reset.go +++ b/core/environment/transition_reset.go @@ -28,6 +28,7 @@ import ( "errors" "github.com/AliceO2Group/Control/common/event" + "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" "github.com/AliceO2Group/Control/core/workflow" @@ -51,6 +52,9 @@ func (t ResetTransition) do(env *Environment) (err error) { return errors.New("cannot transition in NIL environment") } + metric := transitionMetric("reset", env) + defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + taskmanMessage := task.NewTransitionTaskMessage( workflow.GetActiveTasks(env.Workflow()), sm.CONFIGURED.String(), @@ -64,9 +68,11 @@ func (t ResetTransition) do(env *Environment) (err error) { incomingEv := <-env.stateChangedCh // If some tasks failed to transition if tasksStateErrors := incomingEv.GetTasksStateChangedError(); tasksStateErrors != nil { + metric.AddResult(monitoring.ERROR) return tasksStateErrors } env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), State: "RESET"}) + metric.AddResult(monitoring.SUCCESS) return } diff --git a/core/environment/transition_startactivity.go b/core/environment/transition_startactivity.go index be7ec03c6..334ffb789 100644 --- a/core/environment/transition_startactivity.go +++ b/core/environment/transition_startactivity.go @@ -33,6 +33,7 @@ import ( "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/logger/infologger" + "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/core/controlcommands" "github.com/AliceO2Group/Control/core/task" "github.com/iancoleman/strcase" @@ -72,6 +73,9 @@ func (t StartActivityTransition) do(env *Environment) (err error) { return errors.New("cannot transition in NIL environment") } + metric := transitionMetric("startactivity", env) + defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + runNumber := env.currentRunNumber log.WithField(infologger.Run, runNumber). @@ -120,6 +124,7 @@ func (t StartActivityTransition) do(env *Environment) (err error) { incomingEv := <-env.stateChangedCh // If some tasks failed to transition if tasksStateErrors := incomingEv.GetTasksStateChangedError(); tasksStateErrors != nil { + metric.AddResult(monitoring.ERROR) return tasksStateErrors } @@ -133,5 +138,6 @@ func (t StartActivityTransition) do(env *Environment) (err error) { Run: env.currentRunNumber, }) + metric.AddResult(monitoring.SUCCESS) return } diff --git a/core/environment/transition_stopactivity.go b/core/environment/transition_stopactivity.go index 8653302e1..c40e97454 100644 --- a/core/environment/transition_stopactivity.go +++ b/core/environment/transition_stopactivity.go @@ -29,6 +29,7 @@ import ( "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/logger/infologger" + "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/core/controlcommands" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" @@ -63,6 +64,9 @@ func (t StopActivityTransition) do(env *Environment) (err error) { return errors.New("cannot transition in NIL environment") } + metric := transitionMetric("stopactivity", env) + defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + log.WithField(infologger.Run, env.currentRunNumber). WithField("partition", env.Id().String()). WithField(infologger.Level, infologger.IL_Support). @@ -98,6 +102,7 @@ func (t StopActivityTransition) do(env *Environment) (err error) { incomingEv := <-env.stateChangedCh // If some tasks failed to transition if tasksStateErrors := incomingEv.GetTasksStateChangedError(); tasksStateErrors != nil { + metric.AddResult(monitoring.ERROR) return tasksStateErrors } env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), State: "CONFIGURED"}) @@ -107,5 +112,6 @@ func (t StopActivityTransition) do(env *Environment) (err error) { WithField(infologger.Level, infologger.IL_Support). Info("run stopped") + metric.AddResult(monitoring.SUCCESS) return } diff --git a/core/environment/utils.go b/core/environment/utils.go index 1ca0dcad3..135b3c44c 100644 --- a/core/environment/utils.go +++ b/core/environment/utils.go @@ -33,6 +33,7 @@ import ( "sort" "github.com/AliceO2Group/Control/common/logger/infologger" + "github.com/AliceO2Group/Control/common/monitoring" pb "github.com/AliceO2Group/Control/common/protos" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" @@ -52,7 +53,7 @@ type WorkflowPublicInfo struct { func parseWorkflowPublicInfo(workflowExpr string) (WorkflowPublicInfo, error) { repoManager := the.RepoManager() - resolvedWorkflowPath, _, err := repoManager.GetWorkflow(workflowExpr) //Will fail if repo unknown + resolvedWorkflowPath, _, err := repoManager.GetWorkflow(workflowExpr) // Will fail if repo unknown if err != nil { return WorkflowPublicInfo{}, err } @@ -166,3 +167,10 @@ func HandleFailedGoError(err error, env *Environment) { env.setState("ERROR") } } + +func transitionMetric(transition string, env *Environment) monitoring.Metric { + metric := monitoring.NewMetric("transition_do") + metric.AddTag("transition", transition) + metric.AddTag("envId", env.Id().String()) + return metric +} diff --git a/core/workflow/callable/call.go b/core/workflow/callable/call.go index 502688c1e..fdb130646 100644 --- a/core/workflow/callable/call.go +++ b/core/workflow/callable/call.go @@ -117,7 +117,7 @@ func (c *Call) Call() error { WithField("level", infologger.IL_Devel). Debugf("calling hook function %s", c.Func) - metric := c.newMetric("callablecall") + metric := c.callableMetric("callablecall") defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() the.EventWriterWithTopic(topic.Call).WriteEvent(&evpb.Ev_CallEvent{ @@ -227,7 +227,7 @@ func (c *Call) Call() error { return nil } -func (c *Call) newMetric(name string) monitoring.Metric { +func (c *Call) callableMetric(name string) monitoring.Metric { metric := monitoring.NewMetric(name) metric.AddTag("runtype", c.getRunTypeTag()) metric.AddTag("name", c.GetName()) @@ -241,7 +241,7 @@ func (c *Call) Start() { ctx, cancel := context.WithCancel(context.Background()) c.awaitCancel = cancel go func() { - metric := c.newMetric("callablewrapped") + metric := c.callableMetric("callablewrapped") defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() callId := fmt.Sprintf("hook:%s:%s", c.GetTraits().Trigger, c.GetName()) From ab691abfc8172b99b1f3bf67bbe95515726c5686 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Thu, 23 Apr 2026 11:06:10 +0200 Subject: [PATCH 2/3] [core] OCTRL-1092 add call results to metric in call.go --- core/environment/transition_deploy.go | 2 -- core/workflow/callable/call.go | 30 ++++++++++++++++++--------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/core/environment/transition_deploy.go b/core/environment/transition_deploy.go index a25ee15a8..0cbc05059 100644 --- a/core/environment/transition_deploy.go +++ b/core/environment/transition_deploy.go @@ -356,9 +356,7 @@ func (t DeployTransition) do(env *Environment) (err error) { } env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), State: "DEPLOYED"}) - metric.AddResult(monitoring.SUCCESS) - return } diff --git a/core/workflow/callable/call.go b/core/workflow/callable/call.go index fdb130646..1a83c100f 100644 --- a/core/workflow/callable/call.go +++ b/core/workflow/callable/call.go @@ -110,6 +110,15 @@ func (s Calls) AwaitAll() map[*Call]error { return errs } +func (c *Call) callableMetric(name string) monitoring.Metric { + metric := monitoring.NewMetric(name) + metric.AddTag("runtype", c.getRunTypeTag()) + metric.AddTag("name", c.GetName()) + metric.AddTag("trigger", c.GetTraits().Trigger) + metric.AddTag("envId", c.parentRole.GetEnvironmentId().String()) + return metric +} + func (c *Call) Call() error { log.WithField("trigger", c.Traits.Trigger). WithField("await", c.Traits.Await). @@ -178,6 +187,7 @@ func (c *Call) Call() error { EnvironmentId: c.parentRole.GetEnvironmentId().String(), }) + metric.AddResult(monitoring.ERROR) return err } if len(returnVar) > 0 { @@ -206,6 +216,7 @@ func (c *Call) Call() error { EnvironmentId: c.parentRole.GetEnvironmentId().String(), }) + metric.AddResult(monitoring.ERROR) return errors.New(errMsg) } @@ -224,18 +235,10 @@ func (c *Call) Call() error { EnvironmentId: c.parentRole.GetEnvironmentId().String(), }) + metric.AddResult(monitoring.SUCCESS) return nil } -func (c *Call) callableMetric(name string) monitoring.Metric { - metric := monitoring.NewMetric(name) - metric.AddTag("runtype", c.getRunTypeTag()) - metric.AddTag("name", c.GetName()) - metric.AddTag("trigger", c.GetTraits().Trigger) - metric.AddTag("envId", c.parentRole.GetEnvironmentId().String()) - return metric -} - func (c *Call) Start() { c.await = make(chan error) ctx, cancel := context.WithCancel(context.Background()) @@ -247,9 +250,16 @@ func (c *Call) Start() { callId := fmt.Sprintf("hook:%s:%s", c.GetTraits().Trigger, c.GetName()) log.Debugf("%s started", callId) defer utils.TimeTrack(time.Now(), callId, log.WithPrefix("callable")) + err := c.Call() select { - case c.await <- c.Call(): + case c.await <- err: + if err == nil { + metric.AddResult(monitoring.SUCCESS) + } else { + metric.AddResult(monitoring.ERROR) + } case <-ctx.Done(): + metric.AddResult(monitoring.CANCELLED) log.Debugf("%s cancelled", callId) } close(c.await) From 45c8f7132e60b53a37b99e7dac1b115649ca4e46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Fri, 24 Apr 2026 10:42:11 +0200 Subject: [PATCH 3/3] [core] use transtion name as a tag --- core/environment/transition.go | 10 +++++++++- core/environment/transition_configure.go | 2 +- core/environment/transition_deploy.go | 2 +- core/environment/transition_goerror.go | 2 +- core/environment/transition_reset.go | 2 +- core/environment/transition_startactivity.go | 2 +- core/environment/transition_stopactivity.go | 2 +- core/environment/utils.go | 8 -------- 8 files changed, 15 insertions(+), 15 deletions(-) diff --git a/core/environment/transition.go b/core/environment/transition.go index a821e3dfa..ecb563bf8 100644 --- a/core/environment/transition.go +++ b/core/environment/transition.go @@ -27,7 +27,8 @@ package environment import ( "errors" - "github.com/AliceO2Group/Control/core/protos" + "github.com/AliceO2Group/Control/common/monitoring" + pb "github.com/AliceO2Group/Control/core/protos" "github.com/AliceO2Group/Control/core/task" ) @@ -74,3 +75,10 @@ func (t baseTransition) check() (err error) { func (t baseTransition) eventName() string { return t.name } + +func (t baseTransition) transitionDoMetric(env *Environment) monitoring.Metric { + metric := monitoring.NewMetric("transition_do") + metric.AddTag("transition", t.name) + metric.AddTag("envId", env.Id().String()) + return metric +} diff --git a/core/environment/transition_configure.go b/core/environment/transition_configure.go index 4ff0a0072..09d93eca9 100644 --- a/core/environment/transition_configure.go +++ b/core/environment/transition_configure.go @@ -53,7 +53,7 @@ func (t ConfigureTransition) do(env *Environment) (err error) { return errors.New("cannot transition in NIL environment") } - metric := transitionMetric("configure", env) + metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() wf := env.Workflow() diff --git a/core/environment/transition_deploy.go b/core/environment/transition_deploy.go index 0cbc05059..9c0e49b5b 100644 --- a/core/environment/transition_deploy.go +++ b/core/environment/transition_deploy.go @@ -67,7 +67,7 @@ func (t DeployTransition) do(env *Environment) (err error) { return errors.New("cannot transition in NIL environment") } - metric := transitionMetric("deploy", env) + metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() wf := env.Workflow() diff --git a/core/environment/transition_goerror.go b/core/environment/transition_goerror.go index 378441e87..e0844f6d5 100644 --- a/core/environment/transition_goerror.go +++ b/core/environment/transition_goerror.go @@ -45,7 +45,7 @@ type GoErrorTransition struct { } func (t GoErrorTransition) do(env *Environment) (err error) { - metric := transitionMetric("goerror", env) + metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() // we stop all tasks which are in RUNNING diff --git a/core/environment/transition_reset.go b/core/environment/transition_reset.go index 50795dff0..84bfd44da 100644 --- a/core/environment/transition_reset.go +++ b/core/environment/transition_reset.go @@ -52,7 +52,7 @@ func (t ResetTransition) do(env *Environment) (err error) { return errors.New("cannot transition in NIL environment") } - metric := transitionMetric("reset", env) + metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() taskmanMessage := task.NewTransitionTaskMessage( diff --git a/core/environment/transition_startactivity.go b/core/environment/transition_startactivity.go index 334ffb789..47a547c1b 100644 --- a/core/environment/transition_startactivity.go +++ b/core/environment/transition_startactivity.go @@ -73,7 +73,7 @@ func (t StartActivityTransition) do(env *Environment) (err error) { return errors.New("cannot transition in NIL environment") } - metric := transitionMetric("startactivity", env) + metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() runNumber := env.currentRunNumber diff --git a/core/environment/transition_stopactivity.go b/core/environment/transition_stopactivity.go index c40e97454..7e55fbe62 100644 --- a/core/environment/transition_stopactivity.go +++ b/core/environment/transition_stopactivity.go @@ -64,7 +64,7 @@ func (t StopActivityTransition) do(env *Environment) (err error) { return errors.New("cannot transition in NIL environment") } - metric := transitionMetric("stopactivity", env) + metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() log.WithField(infologger.Run, env.currentRunNumber). diff --git a/core/environment/utils.go b/core/environment/utils.go index 135b3c44c..7bc15c4f7 100644 --- a/core/environment/utils.go +++ b/core/environment/utils.go @@ -33,7 +33,6 @@ import ( "sort" "github.com/AliceO2Group/Control/common/logger/infologger" - "github.com/AliceO2Group/Control/common/monitoring" pb "github.com/AliceO2Group/Control/common/protos" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" @@ -167,10 +166,3 @@ func HandleFailedGoError(err error, env *Environment) { env.setState("ERROR") } } - -func transitionMetric(transition string, env *Environment) monitoring.Metric { - metric := monitoring.NewMetric("transition_do") - metric.AddTag("transition", transition) - metric.AddTag("envId", env.Id().String()) - return metric -}