diff --git a/crates/base/src/client/mod.rs b/crates/base/src/client/mod.rs index 0bc8c43..ea9c5a5 100644 --- a/crates/base/src/client/mod.rs +++ b/crates/base/src/client/mod.rs @@ -101,7 +101,7 @@ impl DracoRuntimeStatusService { &self, status: tucana::shared::adapter_runtime_status::Status, ) { - log::info!("Updating the current Runtime Status!"); + log::info!("Updating the current runtime status!"); let mut client = RuntimeStatusServiceClient::new(self.channel.clone()); let now = SystemTime::now(); @@ -128,12 +128,10 @@ impl DracoRuntimeStatusService { ); match client.update(request).await { - Ok(response) => { - log::info!( - "Was the update of the RuntimeStatus accepted by Sagittarius? {}", - response.into_inner().success - ); - } + Ok(response) => match response.into_inner().success { + true => log::info!("Successful update of the runtime status."), + false => log::warn!("Failed to update runtime status."), + }, Err(err) => { log::error!("Failed to update RuntimeStatus: {:?}", err); } diff --git a/crates/base/src/config.rs b/crates/base/src/config.rs index ca139aa..bccb8be 100644 --- a/crates/base/src/config.rs +++ b/crates/base/src/config.rs @@ -61,6 +61,12 @@ pub struct AdapterConfig { /// /// The Variant of Draco. E.g. Http, Cron... pub draco_variant: String, + + /// Adapter Status Update Interval Seconds + /// + /// Interval for runtime status heartbeat updates while the adapter is running. + /// Set to 0 to disable periodic heartbeat updates. + pub adapter_status_update_interval_seconds: u64, } impl AdapterConfig { @@ -93,6 +99,10 @@ impl AdapterConfig { let draco_variant = code0_flow::flow_config::env_with_default("DRACO_VARIANT", String::from("None")); + let adapter_status_update_interval_seconds = code0_flow::flow_config::env_with_default( + "ADAPTER_STATUS_UPDATE_INTERVAL_SECONDS", + 30_u64, + ); Self { environment, nats_bucket, @@ -105,6 +115,7 @@ impl AdapterConfig { definition_path, with_health_service, draco_variant, + adapter_status_update_interval_seconds, } } diff --git a/crates/base/src/runner.rs b/crates/base/src/runner.rs index 52eaf3d..385057b 100644 --- a/crates/base/src/runner.rs +++ b/crates/base/src/runner.rs @@ -6,7 +6,7 @@ use crate::{ }; use code0_flow::flow_service::FlowUpdateService; use std::{sync::Arc, time::Duration}; -use tokio::{signal, time::sleep}; +use tokio::{signal, task::JoinHandle, time::sleep}; use tonic::transport::Server; use tonic_health::pb::health_server::HealthServer; use tucana::shared::{AdapterConfiguration, RuntimeFeature}; @@ -62,11 +62,12 @@ impl ServerRunner { runtime_config: Vec, ) -> anyhow::Result<()> { let config = self.context.adapter_config.clone(); - let mut runtime_status_service: Option = None; + let mut runtime_status_service: Option> = None; + let mut runtime_status_heartbeat_task: Option> = None; log::info!("Starting Draco Variant: {}", config.draco_variant); if !config.is_static() { - runtime_status_service = Some( + runtime_status_service = Some(Arc::new( DracoRuntimeStatusService::from_url( config.aquila_url.clone(), config.aquila_token.clone(), @@ -75,7 +76,7 @@ impl ServerRunner { runtime_config, ) .await, - ); + )); if let Some(ser) = &runtime_status_service { ser.update_runtime_status_by_status( @@ -148,6 +149,35 @@ impl ServerRunner { tucana::shared::adapter_runtime_status::Status::Running, ) .await; + + if config.adapter_status_update_interval_seconds > 0 { + let status_service = Arc::clone(ser); + let update_interval_seconds = config.adapter_status_update_interval_seconds; + runtime_status_heartbeat_task = Some(tokio::spawn(async move { + let mut interval = + tokio::time::interval(Duration::from_secs(update_interval_seconds)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // First tick is immediate; consume it so heartbeats start after the interval. + interval.tick().await; + + loop { + interval.tick().await; + status_service + .update_runtime_status_by_status( + tucana::shared::adapter_runtime_status::Status::Running, + ) + .await; + } + })); + + log::info!( + "Runtime status heartbeat started (interval={}s)", + update_interval_seconds + ); + } else { + log::info!("Runtime status heartbeat is disabled"); + } }; log::info!("Draco successfully initialized."); @@ -212,6 +242,15 @@ impl ServerRunner { } } } + if let Some(handle) = runtime_status_heartbeat_task.take() { + handle.abort(); + if let Err(err) = handle.await { + if !err.is_cancelled() { + log::warn!("Runtime status heartbeat task ended unexpectedly: {}", err); + } + } + } + if let Some(ser) = &runtime_status_service { ser.update_runtime_status_by_status( tucana::shared::adapter_runtime_status::Status::Stopped,