diff --git a/crates/taurus/src/app/mod.rs b/crates/taurus/src/app/mod.rs index 3bf3587..c794a89 100644 --- a/crates/taurus/src/app/mod.rs +++ b/crates/taurus/src/app/mod.rs @@ -1,5 +1,6 @@ mod worker; +use std::sync::Arc; use std::time::Duration; use code0_flow::flow_config::load_env_file; @@ -27,7 +28,7 @@ pub async fn run() { let client = connect_nats(&config).await; let mut health_task = spawn_health_task(&config); - let (runtime_status_service, runtime_usage_service) = + let (runtime_status_service, runtime_usage_service, mut runtime_status_heartbeat_task) = setup_dynamic_services_if_needed(&config).await; let nats_remote = NATSRemoteRuntime::new(client.clone()); @@ -41,6 +42,14 @@ pub async fn run() { ); wait_for_shutdown(&mut worker_task, &mut health_task).await; + if let Some(handle) = runtime_status_heartbeat_task.take() { + handle.abort(); + if let Err(err) = handle.await { + if !err.is_cancelled() { + log::warn!("Runtime status heartbeat task ended unexpectedly: {}", err); + } + } + } update_stopped_status(runtime_status_service.as_ref()).await; log::info!("Taurus shutdown complete"); @@ -95,11 +104,12 @@ fn spawn_health_task(config: &Config) -> Option> { async fn setup_dynamic_services_if_needed( config: &Config, ) -> ( - Option, + Option>, Option, + Option>, ) { if config.mode != DYNAMIC { - return (None, None); + return (None, None, None); } push_definitions_until_success(config).await; @@ -109,7 +119,7 @@ async fn setup_dynamic_services_if_needed( .await, ); - let runtime_status_service = Some( + let runtime_status_service = Some(Arc::new( TaurusRuntimeStatusService::from_url( config.aquila_url.clone(), config.aquila_token.clone(), @@ -117,7 +127,7 @@ async fn setup_dynamic_services_if_needed( runtime_features(), ) .await, - ); + )); if let Some(status_service) = runtime_status_service.as_ref() { status_service @@ -125,7 +135,43 @@ async fn setup_dynamic_services_if_needed( .await; } - (runtime_status_service, runtime_usage_service) + let runtime_status_heartbeat_task = if config.adapter_status_update_interval_seconds > 0 { + let status_service = runtime_status_service + .as_ref() + .expect("runtime status service should exist in dynamic mode") + .clone(); + let update_interval_seconds = config.adapter_status_update_interval_seconds; + + let handle = tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(update_interval_seconds)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // First tick is immediate; consume it so heartbeats start after the interval. + interval.tick().await; + + loop { + interval.tick().await; + status_service + .update_runtime_status(tucana::shared::execution_runtime_status::Status::Running) + .await; + } + }); + + log::info!( + "Runtime status heartbeat started (interval={}s)", + update_interval_seconds + ); + Some(handle) + } else { + log::info!("Runtime status heartbeat is disabled"); + None + }; + + ( + runtime_status_service, + runtime_usage_service, + runtime_status_heartbeat_task, + ) } async fn push_definitions_until_success(config: &Config) { @@ -165,7 +211,7 @@ fn runtime_features() -> Vec { }] } -async fn update_stopped_status(runtime_status_service: Option<&TaurusRuntimeStatusService>) { +async fn update_stopped_status(runtime_status_service: Option<&Arc>) { if let Some(status_service) = runtime_status_service { status_service .update_runtime_status(tucana::shared::execution_runtime_status::Status::Stopped) diff --git a/crates/taurus/src/config/mod.rs b/crates/taurus/src/config/mod.rs index dfc2a39..dd9e550 100644 --- a/crates/taurus/src/config/mod.rs +++ b/crates/taurus/src/config/mod.rs @@ -23,6 +23,10 @@ pub struct Config { pub grpc_port: u16, pub definitions: String, + + /// Runtime status heartbeat interval in seconds while Taurus is running. + /// Set to 0 to disable periodic heartbeat updates. + pub adapter_status_update_interval_seconds: u64, } /// Implementation for all relevant `Aquila` startup configurations @@ -40,6 +44,10 @@ impl Config { grpc_host: env_with_default("GRPC_HOST", "127.0.0.1".to_string()), grpc_port: env_with_default("GRPC_PORT", 50051), definitions: env_with_default("DEFINITIONS", String::from("./definitions")), + adapter_status_update_interval_seconds: env_with_default( + "ADAPTER_STATUS_UPDATE_INTERVAL_SECONDS", + 30_u64, + ), } } }