Graceful shutdown for the ethlambda node.

On the first ctrl+c, main stops the blockchain and p2p actors, cancels a
CancellationToken shared with the metrics and API servers, and awaits all of
them before returning. Three further ctrl+c presses force
std::process::exit(1). A failure to listen for ctrl+c in that escalation task
is logged and is not counted as a key press.

diff --git a/Cargo.lock b/Cargo.lock
index ba59e2a..26718db 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2056,6 +2056,7 @@ dependencies = [
  "thiserror 2.0.18",
  "tikv-jemallocator",
  "tokio",
+ "tokio-util",
  "tracing",
  "tracing-subscriber",
  "vergen-git2",
@@ -2165,6 +2166,7 @@ dependencies = [
  "serde",
  "serde_json",
  "tokio",
+ "tokio-util",
  "tower",
  "tracing",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index d0a415e..c407f5e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,6 +49,7 @@ hex = "0.4"
 spawned-concurrency = "0.5.0"
 spawned-rt = "0.5.0"
 tokio = "1.0"
+tokio-util = "0.7"
 
 prometheus = "0.14"
 
diff --git a/bin/ethlambda/Cargo.toml b/bin/ethlambda/Cargo.toml
index 799a27b..e5e22ee 100644
--- a/bin/ethlambda/Cargo.toml
+++ b/bin/ethlambda/Cargo.toml
@@ -17,6 +17,7 @@ libssz.workspace = true
 libssz-types.workspace = true
 
 tokio.workspace = true
+tokio-util.workspace = true
 
 tracing.workspace = true
 tracing-subscriber = "0.3"
diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs
index 3c3f816..2821489 100644
--- a/bin/ethlambda/src/main.rs
+++ b/bin/ethlambda/src/main.rs
@@ -16,6 +16,7 @@ use std::{
     path::{Path, PathBuf},
     sync::Arc,
 };
+use tokio_util::sync::CancellationToken;
 
 use clap::Parser;
 use ethlambda_blockchain::key_manager::ValidatorKeyPair;
@@ -29,7 +30,7 @@ use ethlambda_types::{
     state::{State, ValidatorPubkeyBytes},
 };
 use serde::Deserialize;
-use tracing::{error, info};
+use tracing::{error, info, warn};
 use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt};
 
 use ethlambda_blockchain::BlockChain;
@@ -204,21 +205,60 @@ async fn main() -> eyre::Result<()> {
         })
         .inspect_err(|err| error!(%err, "Failed to send InitBlockChain — actors not wired"))?;
 
-    tokio::spawn(async move {
-        let _ = ethlambda_rpc::start_metrics_server(metrics_socket)
+    let shutdown_token = CancellationToken::new();
+    let metrics_shutdown = shutdown_token.clone();
+    let api_shutdown = shutdown_token.clone();
+
+    let metrics_handle = tokio::spawn(async move {
+        let _ = ethlambda_rpc::start_metrics_server(metrics_socket, metrics_shutdown)
             .await
             .inspect_err(|err| error!(%err, "Metrics server failed"));
     });
-    tokio::spawn(async move {
-        let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator)
+    let api_handle = tokio::spawn(async move {
+        let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, api_shutdown)
             .await
             .inspect_err(|err| error!(%err, "API server failed"));
     });
 
     info!("Node initialized");
 
+    // 1st ctrl+c: start graceful shutdown
     tokio::signal::ctrl_c().await.ok();
-    println!("Shutting down...");
+
+    info!("Shutdown signal received, stopping actors and servers...");
+
+    // Escalation path: the 3rd ctrl+c after shutdown began forces an exit.
+    tokio::spawn(async move {
+        for remaining in (0..3u8).rev() {
+            if let Err(err) = tokio::signal::ctrl_c().await {
+                // An error means the handler is unusable, not that a key was pressed.
+                error!(%err, "Failed to listen for ctrl+C, forced shutdown unavailable");
+                return;
+            }
+            if remaining > 0 {
+                let unit = if remaining == 1 { "time" } else { "times" };
+                warn!(
+                    "Graceful shutdown in progress. Press ctrl+C {} more {} to force ungraceful shutdown",
+                    remaining, unit
+                );
+            }
+        }
+        warn!("Forced ungraceful shutdown...");
+        std::process::exit(1);
+    });
+
+    let blockchain_ref = blockchain.actor_ref().clone();
+    let p2p_ref = p2p.actor_ref().clone();
+    blockchain_ref.context().stop();
+    p2p_ref.context().stop();
+    shutdown_token.cancel();
+
+    blockchain_ref.join().await;
+    p2p_ref.join().await;
+    let _ = api_handle.await;
+    let _ = metrics_handle.await;
+
+    info!("Shutdown complete");
 
     Ok(())
 }
diff --git a/crates/net/rpc/Cargo.toml b/crates/net/rpc/Cargo.toml
index c05e9a2..df7e23c 100644
--- a/crates/net/rpc/Cargo.toml
+++ b/crates/net/rpc/Cargo.toml
@@ -12,6 +12,7 @@ version.workspace = true
 [dependencies]
 axum = "0.8.1"
 tokio.workspace = true
+tokio-util.workspace = true
 ethlambda-fork-choice.workspace = true
 ethlambda-metrics.workspace = true
 tracing.workspace = true
diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs
index acec7fa..e756e2f 100644
--- a/crates/net/rpc/src/lib.rs
+++ b/crates/net/rpc/src/lib.rs
@@ -7,6 +7,7 @@ use ethlambda_storage::Store;
 use ethlambda_types::aggregator::AggregatorController;
 use ethlambda_types::primitives::H256;
 use libssz::SszEncode;
+use tokio_util::sync::CancellationToken;
 
 pub(crate) const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";
 pub(crate) const SSZ_CONTENT_TYPE: &str = "application/octet-stream";
@@ -20,23 +21,37 @@ pub async fn start_api_server(
     address: SocketAddr,
     store: Store,
     aggregator: AggregatorController,
+    shutdown: CancellationToken,
 ) -> Result<(), std::io::Error> {
     let api_router = build_api_router(store).layer(Extension(aggregator));
 
     let listener = tokio::net::TcpListener::bind(address).await?;
-    axum::serve(listener, api_router).await?;
+    // Keep serving until `shutdown` is cancelled, then let axum shut down gracefully.
+    axum::serve(listener, api_router)
+        .with_graceful_shutdown(async move {
+            shutdown.cancelled().await;
+        })
+        .await?;
 
     Ok(())
 }
 
-pub async fn start_metrics_server(address: SocketAddr) -> Result<(), std::io::Error> {
+pub async fn start_metrics_server(
+    address: SocketAddr,
+    shutdown: CancellationToken,
+) -> Result<(), std::io::Error> {
     let metrics_router = metrics::start_prometheus_metrics_api();
     let debug_router = build_debug_router();
 
     let app = Router::new().merge(metrics_router).merge(debug_router);
 
     let listener = tokio::net::TcpListener::bind(address).await?;
-    axum::serve(listener, app).await?;
+    // Keep serving until `shutdown` is cancelled, then let axum shut down gracefully.
+    axum::serve(listener, app)
+        .with_graceful_shutdown(async move {
+            shutdown.cancelled().await;
+        })
+        .await?;
 
     Ok(())
 }