Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,17 +320,14 @@ impl TraceExporterBuilder {
let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION"));

// On native, `C::new_client()` may capture `tokio::runtime::Handle::current()`
// internally (e.g. `NativeCapabilities`). Enter the SharedRuntime's tokio context
// so that handle is available. On wasm this is a no-op — the JS event loop is
// always the implicit executor.
#[cfg(not(target_arch = "wasm32"))]
let _guard = shared_runtime
.runtime_handle()
// internally (e.g. `NativeCapabilities`). Run it inside the SharedRuntime's
// tokio context so that handle is available. On wasm this is a no-op — the
// JS event loop is always the implicit executor.
let capabilities = shared_runtime
.with_runtime_context(C::new_client)
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string()))
})?
.enter();
let capabilities = C::new_client();
})?;

// --- Platform-specific worker setup ---
// The blocks below spawn background workers via `SharedRuntime`. On
Expand Down
3 changes: 3 additions & 0 deletions libdd-shared-runtime-ffi/src/shared_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub enum SharedRuntimeErrorCode {
/// An unexpected panic occurred inside the FFI call.
#[cfg(feature = "catch_panic")]
Panic,
/// Operation rejected because the runtime has already been shut down.
AlreadyShutdown,
}

/// Error returned by SharedRuntime FFI functions.
Expand All @@ -50,6 +52,7 @@ impl From<SharedRuntimeError> for SharedRuntimeFFIError {
fn from(err: SharedRuntimeError) -> Self {
let code = match &err {
SharedRuntimeError::RuntimeUnavailable => SharedRuntimeErrorCode::RuntimeUnavailable,
SharedRuntimeError::AlreadyShutdown => SharedRuntimeErrorCode::AlreadyShutdown,
SharedRuntimeError::LockFailed(_) => SharedRuntimeErrorCode::LockFailed,
SharedRuntimeError::WorkerError(_) => SharedRuntimeErrorCode::WorkerError,
SharedRuntimeError::RuntimeCreation(_) => SharedRuntimeErrorCode::RuntimeCreation,
Expand Down
141 changes: 130 additions & 11 deletions libdd-shared-runtime/src/shared_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::worker::Worker;
use futures::stream::{FuturesUnordered, StreamExt};
use libdd_common::MutexExt;
use pausable_worker::{PausableWorker, PausableWorkerError};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::{fmt, io};
use tracing::{debug, error};
Expand All @@ -26,7 +26,6 @@ use tracing::{debug, error};
mod native {
use super::*;
use pausable_worker::tokio_spawn_fn;
use std::sync::atomic::Ordering;
use tokio::runtime::{Builder, Runtime};

fn build_runtime() -> Result<Runtime, io::Error> {
Expand All @@ -42,21 +41,39 @@ mod native {
runtime: Arc::new(Mutex::new(Some(Arc::new(build_runtime()?)))),
workers: Arc::new(Mutex::new(Vec::new())),
next_worker_id: AtomicU64::new(1),
shutdown: AtomicBool::new(false),
})
}

/// Returns a clone of the tokio runtime handle managed by this SharedRuntime.
/// Run `f` with the shared tokio runtime entered as the current context.
///
/// Useful for synchronous initialization that calls into
/// [`tokio::runtime::Handle::current()`] (e.g., constructing an HTTP
/// client that captures the current handle internally).
///
/// # Fork safety
/// Tasks spawned via `tokio::spawn` / `Handle::current().spawn(...)`
/// inside `f` are NOT tracked by `SharedRuntime`: they will not be
/// paused before fork, restarted after fork, or shut down by
/// [`Self::shutdown`]. For background work, register a
/// [`crate::Worker`] via [`Self::spawn_worker`] instead.
///
/// # Errors
/// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime has been shut down.
pub fn runtime_handle(&self) -> Result<tokio::runtime::Handle, SharedRuntimeError> {
Ok(self
/// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime
/// has been shut down or is in a fork window.
pub fn with_runtime_context<F, T>(&self, f: F) -> Result<T, SharedRuntimeError>
where
Comment on lines -51 to +65
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this is any safer than before 🤔
Maybe a tiny bit nicer because it moves the wasm cfg to the runtime

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

runtime_handle() handed callers an owned, long-lived capability that lets them bypass every safety guarantee. A task spawned from that handle is not in the workers list, is not paused by before_fork, etc. Couldn't that lead to confusion?

F: FnOnce() -> T,
{
let handle = self
.runtime
.lock_or_panic()
.as_ref()
.ok_or(SharedRuntimeError::RuntimeUnavailable)?
.handle()
.clone())
.clone();
let _guard = handle.enter();
Ok(f())
}

/// Spawn a PausableWorker on this runtime.
Expand Down Expand Up @@ -89,6 +106,10 @@ mod native {
let runtime_guard = self.runtime.lock_or_panic();
let mut workers_guard = self.workers.lock_or_panic();

if self.shutdown.load(Ordering::Acquire) {
return Err(SharedRuntimeError::AlreadyShutdown);
}

if let Some(rt) = runtime_guard.as_ref() {
if let Err(e) = pausable_worker.start(tokio_spawn_fn(rt.handle())) {
return Err(e.into());
Expand Down Expand Up @@ -166,7 +187,12 @@ mod native {
let mut workers_lock = self.workers.lock_or_panic();

for worker_entry in workers_lock.iter_mut() {
worker_entry.worker.start(tokio_spawn_fn(&handle))?;
if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) {
error!(
worker_id = worker_entry.id,
"Worker failed to restart after fork in parent: {:?}", e
);
}
}

Ok(())
Expand Down Expand Up @@ -198,7 +224,12 @@ mod native {

for worker_entry in workers_lock.iter_mut() {
worker_entry.worker.reset();
worker_entry.worker.start(tokio_spawn_fn(&handle))?;
if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) {
error!(
worker_id = worker_entry.id,
"Worker failed to restart after fork in child: {:?}", e
);
}
}

Ok(())
Expand Down Expand Up @@ -234,6 +265,7 @@ mod native {
timeout: Option<std::time::Duration>,
) -> Result<(), SharedRuntimeError> {
debug!(?timeout, "Shutting down SharedRuntime");
self.shutdown.store(true, Ordering::Release);
match self.runtime.lock_or_panic().take() {
Some(runtime) => {
if let Some(timeout) = timeout {
Expand Down Expand Up @@ -337,6 +369,8 @@ impl WorkerHandle {
pub enum SharedRuntimeError {
/// The runtime is not available or in an invalid state.
RuntimeUnavailable,
/// Operation rejected because the runtime has already been shut down.
AlreadyShutdown,
/// Failed to acquire a lock on internal state.
LockFailed(String),
/// A worker operation failed.
Expand All @@ -353,6 +387,7 @@ impl fmt::Display for SharedRuntimeError {
Self::RuntimeUnavailable => {
write!(f, "Runtime is not available or in an invalid state")
}
Self::AlreadyShutdown => write!(f, "Runtime has already been shut down"),
Self::LockFailed(msg) => write!(f, "Failed to acquire lock: {}", msg),
Self::WorkerError(err) => write!(f, "Worker error: {}", err),
Self::RuntimeCreation(err) => {
Expand Down Expand Up @@ -397,6 +432,8 @@ pub struct SharedRuntime {
runtime: Arc<Mutex<Option<Arc<tokio::runtime::Runtime>>>>,
workers: Arc<Mutex<Vec<WorkerEntry>>>,
next_worker_id: AtomicU64,
/// Set once `shutdown` / `shutdown_async` is called
shutdown: AtomicBool,
}

impl SharedRuntime {
Expand All @@ -419,6 +456,7 @@ impl SharedRuntime {
Ok(Self {
workers: Arc::new(Mutex::new(Vec::new())),
next_worker_id: AtomicU64::new(1),
shutdown: AtomicBool::new(false),
})
}
}
Expand All @@ -430,14 +468,16 @@ impl SharedRuntime {
worker: T,
restart_on_fork: bool,
) -> Result<WorkerHandle, SharedRuntimeError> {
use std::sync::atomic::Ordering;

let boxed_worker: BoxedWorker = Box::new(worker);
debug!(?boxed_worker, "Spawning worker on SharedRuntime");
let mut pausable_worker = PausableWorker::new(boxed_worker);

let mut workers_guard = self.workers.lock_or_panic();

if self.shutdown.load(Ordering::Acquire) {
return Err(SharedRuntimeError::AlreadyShutdown);
}

if let Err(e) = pausable_worker.start(|future| {
use futures_util::FutureExt;
let (remote, handle) = future.remote_handle();
Expand All @@ -461,6 +501,17 @@ impl SharedRuntime {
})
}

/// Wasm32 variant of [`Self::with_runtime_context`].
///
/// On this target the JS event loop is the implicit executor, so there is
/// no tokio context to enter. The closure `f` is called as-is and its
/// return value is wrapped in `Ok`, which lets callers stay
/// platform-agnostic.
///
/// # Errors
/// This variant always returns `Ok`.
#[cfg(target_arch = "wasm32")]
pub fn with_runtime_context<F, T>(&self, f: F) -> Result<T, SharedRuntimeError>
where
    F: FnOnce() -> T,
{
    let output = f();
    Ok(output)
}

/// Shutdown all workers asynchronously.
///
/// This should be called during application shutdown to cleanly stop all
Expand All @@ -469,6 +520,7 @@ impl SharedRuntime {
/// Worker errors are logged but do not cause the function to fail.
pub async fn shutdown_async(&self) {
debug!("Shutting down all workers asynchronously");
self.shutdown.store(true, Ordering::Release);
let workers = {
let mut workers_lock = self.workers.lock_or_panic();
std::mem::take(&mut *workers_lock)
Expand Down Expand Up @@ -696,4 +748,71 @@ mod tests {
"worker should not run or shut down after fork in child when restart_on_fork is false"
);
}

/// One `PausableWorker` sitting in `InvalidState` must not abort the
/// restart loop in `after_fork_parent` / `after_fork_child`. The broken
/// entry is logged and skipped, while every other worker still resumes
/// after the fork.
#[test]
fn after_fork_parent_skips_invalid_state_workers() {
    let runtime = SharedRuntime::new().unwrap();

    // First worker stays healthy for the whole test.
    let (healthy, healthy_rx) = make_test_worker();
    let _ = runtime.spawn_worker(healthy, true).unwrap();

    // Second worker: its entry is forced into InvalidState further down,
    // simulating a previously-aborted task.
    let (corrupted, _corrupted_rx) = make_test_worker();
    let _ = runtime.spawn_worker(corrupted, true).unwrap();

    // Make sure the healthy worker has run at least once before forking.
    healthy_rx
        .recv_timeout(Duration::from_secs(1))
        .expect("good worker did not run before fork");

    // Corrupt the second entry, releasing the lock before `before_fork`.
    let mut entries = runtime.workers.lock_or_panic();
    entries[1].worker = PausableWorker::InvalidState;
    drop(entries);

    runtime.before_fork();

    // Discard anything still queued on the channel so the final check can
    // only be satisfied by a message sent after the restart.
    loop {
        if healthy_rx.try_recv().is_err() {
            break;
        }
    }

    let restart_outcome = runtime.after_fork_parent();
    assert!(
        restart_outcome.is_ok(),
        "after_fork_parent should not bail on a single InvalidState worker"
    );

    let resumed = healthy_rx.recv_timeout(Duration::from_secs(1)).is_ok();
    assert!(
        resumed,
        "good worker should resume after fork even if a peer is InvalidState"
    );
}

/// Calling `spawn_worker` once `shutdown` has run must fail with
/// `AlreadyShutdown` instead of silently registering a worker that will
/// never run. The shutdown state is observed under the workers lock, so
/// the same guarantee holds against the during-shutdown race.
#[test]
fn spawn_worker_after_shutdown_should_be_rejected() {
    let runtime = SharedRuntime::new().unwrap();
    runtime.shutdown(None).unwrap();

    let (late_worker, late_rx) = make_test_worker();
    let result = runtime.spawn_worker(late_worker, true);

    // The spawn attempt itself must be rejected...
    let rejected = matches!(result, Err(SharedRuntimeError::AlreadyShutdown));
    assert!(
        rejected,
        "spawn_worker after shutdown should return AlreadyShutdown, got {result:?}"
    );

    // ...nothing may have been added to the worker list...
    let registered = runtime.workers.lock_or_panic().len();
    assert_eq!(registered, 0, "no dead worker should be registered");

    // ...and the worker's channel must stay silent.
    let nothing_received = late_rx.recv_timeout(Duration::from_millis(200)).is_err();
    assert!(nothing_received, "no worker should be running after shutdown");
}
}
Loading