Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions crates/base/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl DracoRuntimeStatusService {
&self,
status: tucana::shared::adapter_runtime_status::Status,
) {
log::info!("Updating the current Runtime Status!");
log::info!("Updating the current runtime status!");
let mut client = RuntimeStatusServiceClient::new(self.channel.clone());
Comment thread
raphael-goetz marked this conversation as resolved.

let now = SystemTime::now();
Expand All @@ -128,12 +128,10 @@ impl DracoRuntimeStatusService {
);

match client.update(request).await {
Ok(response) => {
log::info!(
"Was the update of the RuntimeStatus accepted by Sagittarius? {}",
response.into_inner().success
);
}
Ok(response) => match response.into_inner().success {
true => log::info!("Successful update of the runtime status."),
false => log::warn!("Failed to update runtime status."),
},
Comment thread
raphael-goetz marked this conversation as resolved.
Err(err) => {
log::error!("Failed to update RuntimeStatus: {:?}", err);
}
Expand Down
11 changes: 11 additions & 0 deletions crates/base/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ pub struct AdapterConfig {
///
/// The Variant of Draco. E.g. Http, Cron...
pub draco_variant: String,

/// Adapter Status Update Interval Seconds
///
/// Interval for runtime status heartbeat updates while the adapter is running.
/// Set to 0 to disable periodic heartbeat updates.
pub adapter_status_update_interval_seconds: u64,
}

impl AdapterConfig {
Expand Down Expand Up @@ -93,6 +99,10 @@ impl AdapterConfig {

let draco_variant =
code0_flow::flow_config::env_with_default("DRACO_VARIANT", String::from("None"));
let adapter_status_update_interval_seconds = code0_flow::flow_config::env_with_default(
"ADAPTER_STATUS_UPDATE_INTERVAL_SECONDS",
30_u64,
);
Self {
environment,
nats_bucket,
Expand All @@ -105,6 +115,7 @@ impl AdapterConfig {
definition_path,
with_health_service,
draco_variant,
adapter_status_update_interval_seconds,
}
}

Expand Down
47 changes: 43 additions & 4 deletions crates/base/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
};
use code0_flow::flow_service::FlowUpdateService;
use std::{sync::Arc, time::Duration};
use tokio::{signal, time::sleep};
use tokio::{signal, task::JoinHandle, time::sleep};
use tonic::transport::Server;
use tonic_health::pb::health_server::HealthServer;
use tucana::shared::{AdapterConfiguration, RuntimeFeature};
Expand Down Expand Up @@ -62,11 +62,12 @@ impl<C: LoadConfig> ServerRunner<C> {
runtime_config: Vec<AdapterConfiguration>,
) -> anyhow::Result<()> {
let config = self.context.adapter_config.clone();
let mut runtime_status_service: Option<DracoRuntimeStatusService> = None;
let mut runtime_status_service: Option<Arc<DracoRuntimeStatusService>> = None;
let mut runtime_status_heartbeat_task: Option<JoinHandle<()>> = None;
log::info!("Starting Draco Variant: {}", config.draco_variant);

if !config.is_static() {
runtime_status_service = Some(
runtime_status_service = Some(Arc::new(
DracoRuntimeStatusService::from_url(
config.aquila_url.clone(),
config.aquila_token.clone(),
Expand All @@ -75,7 +76,7 @@ impl<C: LoadConfig> ServerRunner<C> {
runtime_config,
)
.await,
);
));

if let Some(ser) = &runtime_status_service {
ser.update_runtime_status_by_status(
Expand Down Expand Up @@ -148,6 +149,35 @@ impl<C: LoadConfig> ServerRunner<C> {
tucana::shared::adapter_runtime_status::Status::Running,
)
.await;

if config.adapter_status_update_interval_seconds > 0 {
let status_service = Arc::clone(ser);
let update_interval_seconds = config.adapter_status_update_interval_seconds;
runtime_status_heartbeat_task = Some(tokio::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_secs(update_interval_seconds));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

// First tick is immediate; consume it so heartbeats start after the interval.
interval.tick().await;

loop {
interval.tick().await;
status_service
.update_runtime_status_by_status(
tucana::shared::adapter_runtime_status::Status::Running,
)
Comment thread
raphael-goetz marked this conversation as resolved.
.await;
}
}));

log::info!(
"Runtime status heartbeat started (interval={}s)",
update_interval_seconds
);
} else {
log::info!("Runtime status heartbeat is disabled");
}
};
log::info!("Draco successfully initialized.");

Expand Down Expand Up @@ -212,6 +242,15 @@ impl<C: LoadConfig> ServerRunner<C> {
}
}
}
if let Some(handle) = runtime_status_heartbeat_task.take() {
handle.abort();
if let Err(err) = handle.await {
if !err.is_cancelled() {
log::warn!("Runtime status heartbeat task ended unexpectedly: {}", err);
}
}
}

if let Some(ser) = &runtime_status_service {
ser.update_runtime_status_by_status(
tucana::shared::adapter_runtime_status::Status::Stopped,
Expand Down