Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 53 additions & 7 deletions crates/taurus/src/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod worker;

use std::sync::Arc;
use std::time::Duration;

use code0_flow::flow_config::load_env_file;
Expand Down Expand Up @@ -27,7 +28,7 @@ pub async fn run() {
let client = connect_nats(&config).await;

let mut health_task = spawn_health_task(&config);
let (runtime_status_service, runtime_usage_service) =
let (runtime_status_service, runtime_usage_service, mut runtime_status_heartbeat_task) =
setup_dynamic_services_if_needed(&config).await;

let nats_remote = NATSRemoteRuntime::new(client.clone());
Expand All @@ -41,6 +42,14 @@ pub async fn run() {
);

wait_for_shutdown(&mut worker_task, &mut health_task).await;
if let Some(handle) = runtime_status_heartbeat_task.take() {
handle.abort();
if let Err(err) = handle.await {
if !err.is_cancelled() {
log::warn!("Runtime status heartbeat task ended unexpectedly: {}", err);
}
}
}
update_stopped_status(runtime_status_service.as_ref()).await;

log::info!("Taurus shutdown complete");
Expand Down Expand Up @@ -95,11 +104,12 @@ fn spawn_health_task(config: &Config) -> Option<JoinHandle<()>> {
async fn setup_dynamic_services_if_needed(
config: &Config,
) -> (
Option<TaurusRuntimeStatusService>,
Option<Arc<TaurusRuntimeStatusService>>,
Option<TaurusRuntimeUsageService>,
Option<JoinHandle<()>>,
) {
if config.mode != DYNAMIC {
return (None, None);
return (None, None, None);
}

push_definitions_until_success(config).await;
Expand All @@ -109,23 +119,59 @@ async fn setup_dynamic_services_if_needed(
.await,
);

let runtime_status_service = Some(
let runtime_status_service = Some(Arc::new(
TaurusRuntimeStatusService::from_url(
config.aquila_url.clone(),
config.aquila_token.clone(),
"taurus".into(),
runtime_features(),
)
.await,
);
));

if let Some(status_service) = runtime_status_service.as_ref() {
status_service
.update_runtime_status(tucana::shared::execution_runtime_status::Status::Running)
.await;
}

(runtime_status_service, runtime_usage_service)
let runtime_status_heartbeat_task = if config.adapter_status_update_interval_seconds > 0 {
let status_service = runtime_status_service
.as_ref()
.expect("runtime status service should exist in dynamic mode")
.clone();
let update_interval_seconds = config.adapter_status_update_interval_seconds;

let handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(update_interval_seconds));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

// First tick is immediate; consume it so heartbeats start after the interval.
interval.tick().await;

loop {
interval.tick().await;
status_service
.update_runtime_status(tucana::shared::execution_runtime_status::Status::Running)
.await;
Comment on lines +152 to +156
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The heartbeat loop calls update_runtime_status on every tick; that method logs at INFO level on each call and performs a network request. With the default 30s interval this can create persistent log noise and steady load on Aquila/Sagittarius. Consider lowering the log level for periodic heartbeats (or adding a lightweight/quiet update path) so normal operation doesn’t spam logs.

Copilot uses AI. Check for mistakes.
}
});

log::info!(
"Runtime status heartbeat started (interval={}s)",
update_interval_seconds
);
Some(handle)
} else {
log::info!("Runtime status heartbeat is disabled");
None
};

(
runtime_status_service,
runtime_usage_service,
runtime_status_heartbeat_task,
)
}

async fn push_definitions_until_success(config: &Config) {
Expand Down Expand Up @@ -165,7 +211,7 @@ fn runtime_features() -> Vec<RuntimeFeature> {
}]
}

async fn update_stopped_status(runtime_status_service: Option<&TaurusRuntimeStatusService>) {
async fn update_stopped_status(runtime_status_service: Option<&Arc<TaurusRuntimeStatusService>>) {
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update_stopped_status takes Option<&Arc<TaurusRuntimeStatusService>>, which propagates the Arc into the helper’s signature. To keep the helper independent of the ownership strategy, consider taking Option<&TaurusRuntimeStatusService> and calling it with runtime_status_service.as_deref() (and similarly for other call sites).

Suggested change
async fn update_stopped_status(runtime_status_service: Option<&Arc<TaurusRuntimeStatusService>>) {
async fn update_stopped_status(
runtime_status_service: Option<&TaurusRuntimeStatusService>,
) {

Copilot uses AI. Check for mistakes.
if let Some(status_service) = runtime_status_service {
status_service
.update_runtime_status(tucana::shared::execution_runtime_status::Status::Stopped)
Expand Down
8 changes: 8 additions & 0 deletions crates/taurus/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ pub struct Config {
pub grpc_port: u16,

pub definitions: String,

/// Runtime status heartbeat interval in seconds while Taurus is running.
/// Set to 0 to disable periodic heartbeat updates.
pub adapter_status_update_interval_seconds: u64,
Comment on lines +27 to +29
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new config field name/env var (adapter_status_update_interval_seconds / ADAPTER_STATUS_UPDATE_INTERVAL_SECONDS) doesn’t match the docstring (“Runtime status heartbeat”). Consider renaming the field and env var to refer to runtime-status/heartbeat (and update the doc comment to mention it only applies in dynamic mode, since the status service is only initialized there).

Copilot uses AI. Check for mistakes.
}

/// Implementation for all relevant `Aquila` startup configurations
Expand All @@ -40,6 +44,10 @@ impl Config {
grpc_host: env_with_default("GRPC_HOST", "127.0.0.1".to_string()),
grpc_port: env_with_default("GRPC_PORT", 50051),
definitions: env_with_default("DEFINITIONS", String::from("./definitions")),
adapter_status_update_interval_seconds: env_with_default(
"ADAPTER_STATUS_UPDATE_INTERVAL_SECONDS",
30_u64,
),
}
}
}