From ef62114d8baa88b1879f4f60235de13a93ce1a83 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 6 May 2026 17:22:20 -0700 Subject: [PATCH 1/8] Initial port --- crates/core/src/host/module_host.rs | 307 ++++++++++++++++++---------- crates/core/src/util/jobs.rs | 20 ++ 2 files changed, 218 insertions(+), 109 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 5a645b2bcb0..043a5f38a2f 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -355,9 +355,16 @@ enum ModuleHostInner { Js(Box), } +/// Wasm uses one instance manager for reducers/views and one for procedures. +/// +/// Both managers share the compiled module via `Arc` so either manager can +/// create replacement instances. Main-lane instance checkout happens after the +/// job is queued on the [`SingleCoreExecutor`], so concurrent websocket callers +/// enqueue first instead of racing to allocate multiple main instances. struct WasmtimeModuleHost { executor: SingleCoreExecutor, - instance_manager: ModuleInstanceManager, + main_instance: Arc>>, + procedure_instances: Arc>>, } struct V8ModuleHost { @@ -366,6 +373,12 @@ struct V8ModuleHost { procedure_instances: ModuleInstanceManager, } +#[derive(Clone, Copy)] +enum InstanceKind { + Main, + Procedure, +} + /// A module; used as a bound on `InstanceManager`. trait GenericModule { type Instance: GenericModuleInstance; @@ -407,6 +420,16 @@ impl GenericModule for super::wasmtime::Module { } } +impl GenericModule for Arc { + type Instance = Box; + async fn create_instance(&self) -> Self::Instance { + Box::new((**self).create_instance()) + } + fn host_type(&self) -> HostType { + HostType::Wasm + } +} + impl GenericModule for super::v8::JsModule { type Instance = super::v8::JsProcedureInstance; async fn create_instance(&self) -> Self::Instance { @@ -801,6 +824,7 @@ impl OneOffQueryV2Params { } } +#[derive(Clone)] pub(crate) struct ProcedureResultTarget { sender: Arc, request_id: RequestId, @@ -1011,11 +1035,6 @@ impl CreateInstanceTimeMetric { } impl ModuleInstanceManager { - fn new(module: M, init_inst: Option, database_identity: Identity) -> Self { - let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); - Self::new_with_metrics(module, init_inst, metrics) - } - fn new_with_metrics(module: M, init_inst: Option, metrics: InstanceManagerMetrics) -> Self { Self::new_inner(module, init_inst, metrics, None) } @@ -1321,10 +1340,18 @@ impl ModuleHost { init_inst, } => { info = module.info(); - let instance_manager = ModuleInstanceManager::new(module, Some(init_inst), database_identity); + let module = Arc::new(module); + let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); + let main_instance = Arc::new(ModuleInstanceManager::new_with_metrics( + module.clone(), + Some(init_inst), + metrics.clone(), + )); + let procedure_instances = Arc::new(ModuleInstanceManager::new_with_metrics(module, None, metrics)); Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost { executor, - instance_manager, + main_instance, + procedure_instances, }))) } ModuleWithInstance::Js { module, init_inst } => { @@ -1411,45 +1438,14 @@ impl ModuleHost { }) } - /// Run a function for this module which has access to the module instance. - async fn with_instance( - &self, - label: &str, - arg: A, - timer: impl FnOnce(&str) -> Guard, - work_wasm: impl AsyncFnOnce(Guard, &SingleCoreExecutor, Box, A) -> (R, Box), - work_js: impl AsyncFnOnce(Guard, &JsMainInstance, A) -> R, - ) -> Result { - self.guard_closed()?; - let timer_guard = timer(label); - - // Operations on module instances (e.g. calling reducers) is blocking, - // partially because the computation can potentially take a long time - // and partially because interacting with the database requires taking - // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. - // This will bubble up any panic that may occur. - - // If a function call panics, we **must** ensure to call `self.on_panic` - // so that the module is discarded by the host controller. - scopeguard::defer_on_unwind!({ - log::warn!("module operation {label} panicked"); - (self.on_panic)(); - }); - - Ok(match &*self.inner { - ModuleHostInner::Wasm(wasm) => { - let executor = &wasm.executor; - let instance_manager = &wasm.instance_manager; - instance_manager - .with_instance(async |inst| work_wasm(timer_guard, executor, inst, arg).await) - .await - } - ModuleHostInner::Js(js) => { - js.main_instance - .with_instance(async |inst| work_js(timer_guard, &inst, arg).await) - .await - } - }) + fn wasm_instance_manager( + host: &WasmtimeModuleHost, + kind: InstanceKind, + ) -> Arc>> { + match kind { + InstanceKind::Main => host.main_instance.clone(), + InstanceKind::Procedure => host.procedure_instances.clone(), + } } /// Run a function for this module which has access to the module instance. @@ -1467,36 +1463,43 @@ impl ModuleHost { R: Send + 'static, A: Send + 'static, { - self.with_instance( - label, - arg, - |l| self.start_call_timer(l), - // Operations on module instances (e.g. calling reducers) is blocking, - // partially because the computation can potentially take a long time - // and partially because interacting with the database requires taking a blocking lock. - // So, we run `work` on a dedicated thread with `self.executor`. - // This will bubble up any panic that may occur. - async move |timer_guard, executor, mut inst, arg| { + self.guard_closed()?; + let timer_guard = self.start_call_timer(label); + + scopeguard::defer_on_unwind!({ + log::warn!("module operation {label} panicked"); + (self.on_panic)(); + }); + + Ok(match &*self.inner { + ModuleHostInner::Wasm(host) => { + let executor = host.executor.clone(); + let instance_manager = Self::wasm_instance_manager(host, InstanceKind::Main); executor .run_job(async move || { drop(timer_guard); - (wasm(arg, &mut inst).await, inst) + instance_manager + .with_instance(async move |mut inst| { + let res = wasm(arg, &mut inst).await; + (res, inst) + }) + .await }) .await - }, - async move |timer_guard, inst, arg| { + } + ModuleHostInner::Js(host) => { drop(timer_guard); - js(arg, inst).await - }, - ) - .await + host.main_instance + .with_instance(async |inst| js(arg, &inst).await) + .await + } + }) } /// Run a function for this module using pooled instances. /// - /// For WASM, this is identical to [`Self::call`]. - /// For V8/JS, this uses the pooled procedure instances instead of the - /// shared main instance. + /// For both WASM and V8/JS, this uses the pooled procedure instances + /// instead of the shared main instance. async fn call_pooled( &self, label: &str, @@ -1518,12 +1521,15 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { - host.instance_manager - .with_instance(async |mut inst| { - host.executor - .run_job(async move || { - drop(timer_guard); - (wasm(arg, &mut inst).await, inst) + let executor = host.executor.clone(); + let instance_manager = Self::wasm_instance_manager(host, InstanceKind::Procedure); + executor + .run_job(async move || { + drop(timer_guard); + instance_manager + .with_instance(async move |mut inst| { + let res = wasm(arg, &mut inst).await; + (res, inst) }) .await }) @@ -1541,15 +1547,60 @@ impl ModuleHost { }) } - async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result { - self.call( - label, - cmd, - async |cmd, inst| inst.call_view(cmd), - async |cmd, inst| inst.call_view(cmd).await, - ) - .await - .map_err(Into::into) + fn enqueue_wasm_job(&self, kind: &str, label: &str, f: F) -> Result<(), NoSuchModule> + where + F: AsyncFnOnce() + Send + 'static, + { + self.guard_closed()?; + + let timer_guard = self.start_call_timer(label); + let kind = kind.to_owned(); + let label = label.to_owned(); + let on_panic = self.on_panic.clone(); + + match &*self.inner { + ModuleHostInner::Wasm(host) => { + let executor = host.executor.clone(); + executor.enqueue_job(async move || { + scopeguard::defer_on_unwind!({ + log::warn!("{kind} {label} panicked"); + on_panic(); + }); + + drop(timer_guard); + f().await; + }); + Ok(()) + } + ModuleHostInner::Js(_) => unreachable!("enqueue_wasm_job should only be used for wasm"), + } + } + + fn enqueue_wasm_instance( + &self, + kind: &str, + label: &str, + instance_kind: InstanceKind, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) + Send + 'static, + ) -> Result<(), NoSuchModule> + where + A: Send + 'static, + { + match &*self.inner { + ModuleHostInner::Wasm(host) => { + let instance_manager = Self::wasm_instance_manager(host, instance_kind); + self.enqueue_wasm_job(kind, label, async move || { + instance_manager + .with_instance(async move |mut inst| { + wasm(arg, &mut inst).await; + ((), inst) + }) + .await; + }) + } + ModuleHostInner::Js(_) => unreachable!("enqueue_wasm_instance should only be used for wasm"), + } } async fn call_view_command_for_websocket( @@ -1575,12 +1626,22 @@ impl ModuleHost { Ok(None) } ModuleHostInner::Wasm(_) => { - let result = self - .call_view_command(label, cmd) - .await - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; - Self::record_view_command_round_trip(&self.info, metric); - result + let info = self.info.clone(); + self.enqueue_wasm_instance( + "main-instance operation", + label, + InstanceKind::Main, + cmd, + async move |cmd, inst| { + let result = inst.call_view(cmd); + Self::record_view_command_round_trip(&info, metric); + if let Err(err) = result { + log::warn!("websocket view operation failed: {err:#}"); + } + }, + ) + .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + Ok(None) } } } @@ -1983,7 +2044,17 @@ impl ModuleHost { .await; Ok(()) } - ModuleHostInner::Wasm(_) => self.call_reducer_with_params(reducer_def, params).await.map(drop), + ModuleHostInner::Wasm(_) => self + .enqueue_wasm_instance( + "main-instance operation", + &reducer_def.name, + InstanceKind::Main, + params, + async |params, inst| { + let _ = inst.call_reducer(params); + }, + ) + .map_err(Into::into), } } .await; @@ -2238,15 +2309,34 @@ impl ModuleHost { Ok(()) } ModuleHostInner::Wasm(_) => { - let ret = match self.call_procedure_with_params(&procedure_name, params).await { - Ok(ret) => ret, - Err(err) => CallProcedureReturn { - result: Err(err.into()), - tx_offset: None, + let module = self.clone(); + let procedure_name_for_job = procedure_name.clone(); + let target_for_job = target.clone(); + match self.enqueue_wasm_instance( + "pooled operation", + &procedure_name, + InstanceKind::Procedure, + params, + async move |params, inst| { + let ret = inst.call_procedure(params).await; + module.log_procedure_validation_result(&procedure_name_for_job, &ret); + if let Err(err) = + module.send_procedure_result(&procedure_name_for_job, timer, target_for_job, ret) + { + log::warn!("Procedure call failed: {err:#}"); + } }, - }; - self.log_procedure_validation_result(&procedure_name, &ret); - self.send_procedure_result(&procedure_name, timer, target, ret) + ) { + Ok(()) => Ok(()), + Err(err) => { + let ret = CallProcedureReturn { + result: Err(err.into()), + tx_offset: None, + }; + self.log_procedure_validation_result(&procedure_name, &ret); + self.send_procedure_result(&procedure_name, timer, target, ret) + } + } } } } @@ -2747,16 +2837,15 @@ impl ModuleHost { Ok(()) } ModuleHostInner::Wasm(_) => { - let result = self - .call( - label, - params, - async move |params, _inst| run_inner(params), - async |_, _| unreachable!("one-off query JS path is handled before Self::call"), - ) - .await?; - Self::record_one_off_query_round_trip(&self.info, timer); - result.map(|_| ()) + let info = self.info.clone(); + self.enqueue_wasm_job("module-thread operation", label, async move || { + let result = run_inner(params); + Self::record_one_off_query_round_trip(&info, timer); + if let Err(err) = result { + log::warn!("One-off query failed: {err:#}"); + } + })?; + Ok(()) } } } @@ -3131,14 +3220,14 @@ impl ModuleHost { pub(crate) fn replica_ctx(&self) -> &ReplicaContext { match &*self.inner { - ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.replica_ctx(), + ModuleHostInner::Wasm(wasm) => wasm.main_instance.module.replica_ctx(), ModuleHostInner::Js(js) => js.module.replica_ctx(), } } fn scheduler(&self) -> &Scheduler { match &*self.inner { - ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.scheduler(), + ModuleHostInner::Wasm(wasm) => wasm.main_instance.module.scheduler(), ModuleHostInner::Js(js) => js.module.scheduler(), } } diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index cf5e7997fdd..015bc1dc1ef 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -375,6 +375,26 @@ impl SingleCoreExecutor { } } + /// Enqueue a job for this database executor without waiting for its result. + pub fn enqueue_job(&self, f: F) + where + F: AsyncFnOnce() + Send + 'static, + { + let span = tracing::Span::current(); + + self.inner + .job_tx + .send(Box::new(move || { + async move { + if AssertUnwindSafe(f().instrument(span)).catch_unwind().await.is_err() { + tracing::warn!("uncaught panic on `SingleCoreExecutor`") + } + } + .boxed_local() + })) + .unwrap_or_else(|_| panic!("job thread exited")); + } + /// Run `f` on this database executor and return its result. pub async fn run_sync_job(&self, f: F) -> R where From 1f824439c59e2718078d78cc4ce1fdc14df9d8de Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 6 May 2026 17:43:44 -0700 Subject: [PATCH 2/8] Move procedure instance pool in front of job queue --- crates/core/src/host/module_host.rs | 94 ++++++++++++++++++++++------- 1 file changed, 73 insertions(+), 21 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 043a5f38a2f..7fab9be8bf4 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -379,6 +379,10 @@ enum InstanceKind { Procedure, } +fn default_wasm_procedure_instance_pool_size() -> NonZeroUsize { + std::thread::available_parallelism().unwrap_or_else(|_| NonZeroUsize::new(1).unwrap()) +} + /// A module; used as a bound on `InstanceManager`. trait GenericModule { type Instance: GenericModuleInstance; @@ -1347,7 +1351,12 @@ impl ModuleHost { Some(init_inst), metrics.clone(), )); - let procedure_instances = Arc::new(ModuleInstanceManager::new_with_metrics(module, None, metrics)); + let procedure_instances = Arc::new(ModuleInstanceManager::new_bounded_with_metrics( + module, + None, + metrics, + default_wasm_procedure_instance_pool_size(), + )); Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost { executor, main_instance, @@ -1523,11 +1532,11 @@ impl ModuleHost { ModuleHostInner::Wasm(host) => { let executor = host.executor.clone(); let instance_manager = Self::wasm_instance_manager(host, InstanceKind::Procedure); - executor - .run_job(async move || { - drop(timer_guard); - instance_manager - .with_instance(async move |mut inst| { + instance_manager + .with_instance(async move |mut inst| { + executor + .run_job(async move || { + drop(timer_guard); let res = wasm(arg, &mut inst).await; (res, inst) }) @@ -1603,6 +1612,47 @@ impl ModuleHost { } } + async fn enqueue_wasm_procedure_instance( + &self, + kind: &str, + label: &str, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) + Send + 'static, + ) -> Result<(), NoSuchModule> + where + A: Send + 'static, + { + self.guard_closed()?; + + let timer_guard = self.start_call_timer(label); + let kind = kind.to_owned(); + let label = label.to_owned(); + let on_panic = self.on_panic.clone(); + + match &*self.inner { + ModuleHostInner::Wasm(host) => { + let executor = host.executor.clone(); + let instance_manager = Self::wasm_instance_manager(host, InstanceKind::Procedure); + let ModuleInstanceLease { instance, slot } = instance_manager.get_instance().await; + executor.enqueue_job(async move || { + scopeguard::defer_on_unwind!({ + log::warn!("{kind} {label} panicked"); + on_panic(); + }); + + let mut inst = instance; + drop(timer_guard); + wasm(arg, &mut inst).await; + instance_manager + .return_instance(ModuleInstanceLease { instance: inst, slot }) + .await; + }); + Ok(()) + } + ModuleHostInner::Js(_) => unreachable!("enqueue_wasm_procedure_instance should only be used for wasm"), + } + } + async fn call_view_command_for_websocket( &self, label: &'static str, @@ -2312,21 +2362,23 @@ impl ModuleHost { let module = self.clone(); let procedure_name_for_job = procedure_name.clone(); let target_for_job = target.clone(); - match self.enqueue_wasm_instance( - "pooled operation", - &procedure_name, - InstanceKind::Procedure, - params, - async move |params, inst| { - let ret = inst.call_procedure(params).await; - module.log_procedure_validation_result(&procedure_name_for_job, &ret); - if let Err(err) = - module.send_procedure_result(&procedure_name_for_job, timer, target_for_job, ret) - { - log::warn!("Procedure call failed: {err:#}"); - } - }, - ) { + match self + .enqueue_wasm_procedure_instance( + "pooled operation", + &procedure_name, + params, + async move |params, inst| { + let ret = inst.call_procedure(params).await; + module.log_procedure_validation_result(&procedure_name_for_job, &ret); + if let Err(err) = + module.send_procedure_result(&procedure_name_for_job, timer, target_for_job, ret) + { + log::warn!("Procedure call failed: {err:#}"); + } + }, + ) + .await + { Ok(()) => Ok(()), Err(err) => { let ret = CallProcedureReturn { From 32bcd72272e4258962da22120fc3c2ce657a7a89 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 6 May 2026 18:16:30 -0700 Subject: [PATCH 3/8] Make procedure instance pool configurable --- crates/core/src/config.rs | 49 ++++++++++++++++++++++ crates/core/src/host/host_controller.rs | 26 ++++++++---- crates/core/src/host/mod.rs | 2 +- crates/core/src/host/module_host.rs | 8 ++-- crates/core/src/host/wasmtime/mod.rs | 8 +++- crates/standalone/config.toml | 5 +++ crates/standalone/src/lib.rs | 8 ++-- crates/standalone/src/subcommands/start.rs | 5 +++ crates/testing/src/modules.rs | 1 + 9 files changed, 94 insertions(+), 18 deletions(-) diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index 4a1406ebf41..7821f61f5d9 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -137,6 +137,7 @@ impl fmt::Display for MetadataFile { pub struct ConfigFile { pub certificate_authority: Option, pub logs: LogConfig, + pub wasm: WasmConfig, pub v8: V8Config, } @@ -148,6 +149,8 @@ struct ConfigFileToml { #[serde(default)] logs: LogConfig, #[serde(default)] + wasm: WasmConfigToml, + #[serde(default)] v8: V8ConfigToml, #[serde(default)] v8_heap_policy: V8HeapPolicyConfig, @@ -162,6 +165,9 @@ impl<'de> serde::Deserialize<'de> for ConfigFile { Ok(Self { certificate_authority: config.certificate_authority, logs: config.logs, + wasm: WasmConfig { + procedure_instance_pool_size: config.wasm.procedure_instance_pool_size, + }, v8: V8Config { procedure_instance_pool_size: config.v8.procedure_instance_pool_size, heap_policy: config.v8_heap_policy, @@ -206,6 +212,37 @@ pub struct LogConfig { pub directives: Vec, } +#[derive(Clone, Copy, Debug)] +pub struct WasmConfig { + pub procedure_instance_pool_size: NonZeroUsize, +} + +impl Default for WasmConfig { + fn default() -> Self { + Self { + procedure_instance_pool_size: default_wasm_procedure_instance_pool_size(), + } + } +} + +#[derive(Clone, Copy, Debug, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +struct WasmConfigToml { + #[serde( + default = "default_wasm_procedure_instance_pool_size", + deserialize_with = "de_nz_usize" + )] + pub procedure_instance_pool_size: NonZeroUsize, +} + +impl Default for WasmConfigToml { + fn default() -> Self { + Self { + procedure_instance_pool_size: default_wasm_procedure_instance_pool_size(), + } + } +} + #[derive(Clone, Copy, Debug)] pub struct V8Config { pub procedure_instance_pool_size: NonZeroUsize, @@ -323,6 +360,10 @@ fn default_v8_procedure_instance_pool_size() -> NonZeroUsize { std::thread::available_parallelism().unwrap_or_else(|_| NonZeroUsize::new(1).unwrap()) } +fn default_wasm_procedure_instance_pool_size() -> NonZeroUsize { + std::thread::available_parallelism().unwrap_or_else(|_| NonZeroUsize::new(1).unwrap()) +} + fn de_nz_usize<'de, D>(deserializer: D) -> Result where D: serde::Deserializer<'de>, @@ -521,6 +562,10 @@ mod tests { fn v8_heap_policy_defaults_when_omitted() { let config: ConfigFile = toml::from_str("").unwrap(); + assert_eq!( + config.wasm.procedure_instance_pool_size, + default_wasm_procedure_instance_pool_size() + ); assert_eq!( config.v8.procedure_instance_pool_size, default_v8_procedure_instance_pool_size() @@ -538,6 +583,9 @@ mod tests { #[test] fn v8_heap_policy_parses_from_toml() { let toml = r#" + [wasm] + procedure-instance-pool-size = 4 + [v8] procedure-instance-pool-size = 3 @@ -551,6 +599,7 @@ mod tests { let config: ConfigFile = toml::from_str(toml).unwrap(); + assert_eq!(config.wasm.procedure_instance_pool_size.get(), 4); assert_eq!(config.v8.procedure_instance_pool_size.get(), 3); assert_eq!(config.v8.heap_policy.heap_check_request_interval, None); assert_eq!( diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 84fac36c9de..fbed6f33ebb 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -3,7 +3,7 @@ use super::scheduler::SchedulerStarter; use super::wasmtime::WasmtimeRuntime; use super::{Scheduler, UpdateDatabaseResult}; use crate::client::{ClientActorId, ClientName}; -use crate::config::V8Config; +use crate::config::{V8Config, WasmConfig}; use crate::database_logger::DatabaseLogger; use crate::db::persistence::PersistenceProvider; use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata}; @@ -124,10 +124,22 @@ pub(crate) struct HostRuntimes { v8: V8Runtime, } +#[derive(Clone, Copy, Debug, Default)] +pub struct HostRuntimeConfig { + pub wasm: WasmConfig, + pub v8: V8Config, +} + +impl HostRuntimeConfig { + pub fn new(wasm: WasmConfig, v8: V8Config) -> Self { + Self { wasm, v8 } + } +} + impl HostRuntimes { - fn new(data_dir: Option<&ServerDataDir>, v8_config: V8Config) -> Arc { - let wasmtime = WasmtimeRuntime::new(data_dir); - let v8 = V8Runtime::new(v8_config); + fn new(data_dir: Option<&ServerDataDir>, config: HostRuntimeConfig) -> Arc { + let wasmtime = WasmtimeRuntime::new(data_dir, config.wasm); + let v8 = V8Runtime::new(config.v8); Arc::new(Self { wasmtime, v8 }) } } @@ -211,7 +223,7 @@ impl HostController { pub fn new( data_dir: Arc, default_config: db::Config, - v8_config: V8Config, + runtime_config: HostRuntimeConfig, program_storage: ProgramStorage, energy_monitor: Arc, persistence: Arc, @@ -223,7 +235,7 @@ impl HostController { program_storage, energy_monitor, persistence, - runtimes: HostRuntimes::new(Some(&data_dir), v8_config), + runtimes: HostRuntimes::new(Some(&data_dir), runtime_config), data_dir, page_pool: PagePool::new(default_config.page_pool_max_size), bsatn_rlb_pool: BsatnRowListBuilderPool::new(), @@ -1365,7 +1377,7 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an extract_schema_with_pools( PagePool::new(None), BsatnRowListBuilderPool::new(), - &HostRuntimes::new(None, V8Config::default()), + &HostRuntimes::new(None, HostRuntimeConfig::default()), program_bytes, host_type, ) diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index ae5e1fcafc4..daa506a8cf5 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -26,7 +26,7 @@ mod wasm_common; pub use disk_storage::DiskStorage; pub use host_controller::{ extract_schema, CallProcedureReturn, CallResult, ExternalDurability, ExternalStorage, HostController, - MigratePlanResult, ProcedureCallResult, ProgramStorage, ReducerCallResult, ReducerOutcome, + HostRuntimeConfig, MigratePlanResult, ProcedureCallResult, ProgramStorage, ReducerCallResult, ReducerOutcome, }; pub use module_host::{ModuleHost, NoSuchModule, ProcedureCallError, ReducerCallError, UpdateDatabaseResult}; pub use scheduler::Scheduler; diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 7fab9be8bf4..a5de9baa1f0 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -343,6 +343,7 @@ pub enum ModuleWithInstance { module: super::wasmtime::Module, executor: SingleCoreExecutor, init_inst: Box, + procedure_instance_pool_size: NonZeroUsize, }, Js { module: super::v8::JsModule, @@ -379,10 +380,6 @@ enum InstanceKind { Procedure, } -fn default_wasm_procedure_instance_pool_size() -> NonZeroUsize { - std::thread::available_parallelism().unwrap_or_else(|_| NonZeroUsize::new(1).unwrap()) -} - /// A module; used as a bound on `InstanceManager`. trait GenericModule { type Instance: GenericModuleInstance; @@ -1342,6 +1339,7 @@ impl ModuleHost { module, executor, init_inst, + procedure_instance_pool_size, } => { info = module.info(); let module = Arc::new(module); @@ -1355,7 +1353,7 @@ impl ModuleHost { module, None, metrics, - default_wasm_procedure_instance_pool_size(), + procedure_instance_pool_size, )); Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost { executor, diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index 45960b48ae7..e9396fc6d7e 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -1,6 +1,7 @@ use self::wasm_instance_env::WasmInstanceEnv; use super::wasm_common::module_host_actor::{InitializationError, WasmModuleHostActor, WasmModuleInstance}; use super::wasm_common::{abi, ModuleCreationError}; +use crate::config::WasmConfig; use crate::energy::{EnergyQuanta, FunctionBudget}; use crate::error::NodesError; use crate::module_host_context::ModuleCreationContext; @@ -20,6 +21,7 @@ mod wasmtime_module; pub struct WasmtimeRuntime { engine: Engine, linker: Box>, + config: WasmConfig, } const EPOCH_TICK_LENGTH: Duration = Duration::from_millis(10); @@ -43,7 +45,7 @@ pub(crate) fn epoch_ticker(mut on_tick: impl 'static + Send + FnMut() -> Option< } impl WasmtimeRuntime { - pub fn new(data_dir: Option<&ServerDataDir>) -> Self { + pub fn new(data_dir: Option<&ServerDataDir>, runtime_config: WasmConfig) -> Self { let mut config = wasmtime::Config::new(); config .cranelift_opt_level(wasmtime::OptLevel::Speed) @@ -99,7 +101,8 @@ impl WasmtimeRuntime { let mut linker = Box::new(Linker::new(&engine)); WasmtimeModule::link_imports(&mut linker).unwrap(); - WasmtimeRuntime { engine, linker } + let config = runtime_config; + WasmtimeRuntime { engine, linker, config } } } @@ -145,6 +148,7 @@ impl WasmtimeRuntime { module, executor: core.spawn_named_async_executor(executor_thread_name), init_inst: Box::new(init_inst), + procedure_instance_pool_size: self.config.procedure_instance_pool_size, }) } } diff --git a/crates/standalone/config.toml b/crates/standalone/config.toml index 307bc808a02..29a4bf3556e 100644 --- a/crates/standalone/config.toml +++ b/crates/standalone/config.toml @@ -18,6 +18,11 @@ directives = [ "axum::rejection=trace", ] +[wasm] +# Maximum number of WASM procedure instances per database. Omit to use the +# number of cores reported by the OS. +# procedure-instance-pool-size = 8 + [v8] # Maximum number of JS procedure isolates per database. Omit to use the number # of cores reported by the OS. diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 3b6a01a11a6..de4b80ce78c 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -10,11 +10,11 @@ use async_trait::async_trait; use clap::{ArgMatches, Command}; use http::StatusCode; use spacetimedb::client::ClientActorIndex; -use spacetimedb::config::{CertificateAuthority, MetadataFile, V8Config}; +use spacetimedb::config::{CertificateAuthority, MetadataFile, V8Config, WasmConfig}; use spacetimedb::db; use spacetimedb::db::persistence::LocalPersistenceProvider; use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor}; -use spacetimedb::host::{DiskStorage, HostController, MigratePlanResult, UpdateDatabaseResult}; +use spacetimedb::host::{DiskStorage, HostController, HostRuntimeConfig, MigratePlanResult, UpdateDatabaseResult}; use spacetimedb::identity::{AuthCtx, Identity}; use spacetimedb::messages::control_db::{Database, Node, Replica}; use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool; @@ -42,6 +42,7 @@ pub use spacetimedb_client_api::routes::subscribe::{BIN_PROTOCOL, TEXT_PROTOCOL} pub struct StandaloneOptions { pub db_config: db::Config, pub websocket: WebSocketOptions, + pub wasm: WasmConfig, pub v8: V8Config, } @@ -79,7 +80,7 @@ impl StandaloneEnv { let host_controller = HostController::new( data_dir, config.db_config, - config.v8, + HostRuntimeConfig::new(config.wasm, config.v8), program_store.clone(), energy_monitor, persistence_provider, @@ -650,6 +651,7 @@ mod tests { page_pool_max_size: None, }, websocket: WebSocketOptions::default(), + wasm: WasmConfig::default(), v8: V8Config::default(), }; diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index b407372aa34..50f6db19257 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -182,6 +182,7 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { StandaloneOptions { db_config, websocket: config.websocket, + wasm: config.common.wasm, v8: config.common.v8, }, &certs, @@ -512,6 +513,9 @@ mod tests { idle-timeout = "1min" close-handshake-timeout = "500ms" + [wasm] + procedure-instance-pool-size = 4 + [v8] procedure-instance-pool-size = 3 @@ -529,6 +533,7 @@ mod tests { // so check `common` in a pedestrian way. assert_eq!(&config.common.logs.directives, &["banana_shake=strawberry"]); assert!(config.common.certificate_authority.is_none()); + assert_eq!(config.common.wasm.procedure_instance_pool_size.get(), 4); assert_eq!(config.common.v8.procedure_instance_pool_size.get(), 3); assert_eq!(config.common.v8.heap_policy.heap_check_request_interval, None); assert_eq!( diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 4dc81857a4a..0b1a1f16d66 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -236,6 +236,7 @@ impl CompiledModule { spacetimedb_standalone::StandaloneOptions { db_config: config, websocket: WebSocketOptions::default(), + wasm: Default::default(), v8: Default::default(), }, &certs, From b13778feb3a77dd8f9cdb7b87650ca79c1881419 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 6 May 2026 19:14:44 -0700 Subject: [PATCH 4/8] Remove boilerplate and duplication module host code --- crates/core/src/host/module_host.rs | 946 ++++++++++++++-------------- crates/core/src/host/v8/mod.rs | 314 +++------ 2 files changed, 565 insertions(+), 695 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index a5de9baa1f0..7ef1d850588 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -5,7 +5,7 @@ use super::{ use crate::client::messages::{OneOffQueryResponseMessage, ProcedureResultMessage, SerializableMessage}; use crate::client::{ClientActorId, ClientConnectionSender, WsVersion}; use crate::database_logger::{DatabaseLogger, LogLevel, Record}; -use crate::db::relational_db::RelationalDB; +use crate::db::relational_db::{RelationalDB, Tx}; use crate::energy::EnergyQuanta; use crate::error::DBError; use crate::estimation::{check_row_limit, estimate_rows_scanned}; @@ -411,16 +411,6 @@ impl GenericModuleInstance for Box { } } -impl GenericModule for super::wasmtime::Module { - type Instance = Box; - async fn create_instance(&self) -> Self::Instance { - Box::new(self.create_instance()) - } - fn host_type(&self) -> HostType { - HostType::Wasm - } -} - impl GenericModule for Arc { type Instance = Box; async fn create_instance(&self) -> Self::Instance { @@ -688,6 +678,16 @@ impl CallReducerParams { } } +struct PreparedReducerCall { + name: ReducerName, + params: CallReducerParams, +} + +struct PreparedProcedureCall { + name: String, + params: CallProcedureParams, +} + pub enum ViewCommand { AddSingleSubscription { sender: Arc, @@ -785,12 +785,6 @@ pub(in crate::host) struct OneOffQueryJsonParams { timer: Instant, } -impl OneOffQueryJsonParams { - pub(in crate::host) fn timer(&self) -> Instant { - self.timer - } -} - pub(in crate::host) struct OneOffQueryBsatnParams { db: Arc, subscriptions: ModuleSubscriptions, @@ -802,12 +796,6 @@ pub(in crate::host) struct OneOffQueryBsatnParams { rlb_pool: BsatnRowListBuilderPool, } -impl OneOffQueryBsatnParams { - pub(in crate::host) fn timer(&self) -> Instant { - self.timer - } -} - pub(in crate::host) struct OneOffQueryV2Params { db: Arc, subscriptions: ModuleSubscriptions, @@ -819,9 +807,35 @@ pub(in crate::host) struct OneOffQueryV2Params { rlb_pool: BsatnRowListBuilderPool, } -impl OneOffQueryV2Params { +pub(in crate::host) enum OneOffQueryRequest { + Json(OneOffQueryJsonParams), + Bsatn(OneOffQueryBsatnParams), + V2(OneOffQueryV2Params), +} + +impl OneOffQueryRequest { + pub(in crate::host) fn label(&self) -> &'static str { + match self { + Self::Json(_) => "one_off_query_json", + Self::Bsatn(_) => "one_off_query_bsatn", + Self::V2(_) => "one_off_query_v2", + } + } + pub(in crate::host) fn timer(&self) -> Instant { - self.timer + match self { + Self::Json(params) => params.timer, + Self::Bsatn(params) => params.timer, + Self::V2(params) => params.timer, + } + } + + pub(in crate::host) fn run(self) -> OneOffQueryResult { + match self { + Self::Json(params) => ModuleHost::one_off_query_json_inner(params), + Self::Bsatn(params) => ModuleHost::one_off_query_bsatn_inner(params), + Self::V2(params) => ModuleHost::one_off_query_v2_inner(params), + } } } @@ -1327,6 +1341,66 @@ pub struct RefInstance<'a, I: WasmInstance> { pub instance: &'a mut I, } +macro_rules! call_view_command_method { + ( + $(#[$meta:meta])* + pub async fn $method:ident( + &self, + $($arg:ident: $arg_ty:ty),* $(,)? + ) -> $label:literal => $variant:ident $body:tt + ) => { + $(#[$meta])* + pub async fn $method( + &self, + $($arg: $arg_ty),* + ) -> Result, DBError> { + self.call_view_command_for_websocket( + $label, + ViewCommand::$variant $body, + ) + .await + } + }; +} + +macro_rules! call_instance { + ( + $self:expr, + $label:expr, + $arg:expr, + |$wasm_arg:pat_param, $wasm_inst:ident| $wasm:expr, + |$js_arg:pat_param, $js_inst:ident| $js:expr $(,)? + ) => { + $self + .call( + $label, + $arg, + async |$wasm_arg, $wasm_inst| $wasm, + async |$js_arg, $js_inst| $js, + ) + .await + }; +} + +macro_rules! call_pooled_instance { + ( + $self:expr, + $label:expr, + $arg:expr, + |$wasm_arg:pat_param, $wasm_inst:ident| $wasm:expr, + |$js_arg:pat_param, $js_inst:ident| $js:expr $(,)? + ) => { + $self + .call_pooled( + $label, + $arg, + async |$wasm_arg, $wasm_inst| $wasm, + async |$js_arg, $js_inst| $js, + ) + .await + }; +} + impl ModuleHost { pub(super) fn new( module: ModuleWithInstance, @@ -1651,31 +1725,54 @@ impl ModuleHost { } } - async fn call_view_command_for_websocket( + async fn enqueue_main_operation( &self, - label: &'static str, - cmd: ViewCommand, - ) -> Result, DBError> { - let metric = cmd.metric(); - + panic_kind: &'static str, + label: &str, + arg: A, + js: impl FnOnce(A, JsMainInstance, JsFatalHook) -> JsFut, + wasm: impl FnOnce(&Self, A) -> Result<(), NoSuchModule>, + ) -> Result<(), NoSuchModule> + where + A: Send + 'static, + JsFut: Future + Send + 'static, + { + let panic_label = label.to_owned(); scopeguard::defer_on_unwind!({ - log::warn!("websocket view operation {label} panicked"); + log::warn!("{panic_kind} {panic_label} panicked"); (self.on_panic)(); }); match &*self.inner { - ModuleHostInner::Js(js) => { - self.guard_closed() - .map_err(|err| DBError::Other(anyhow::anyhow!(err)))?; + ModuleHostInner::Js(js_host) => { + self.guard_closed()?; let on_panic = self.on_panic.clone(); - js.main_instance - .with_instance(async |inst| inst.enqueue_call_view(cmd, metric, on_panic).await) + js_host + .main_instance + .with_instance(async |inst| js(arg, inst, on_panic).await) .await; - Ok(None) + Ok(()) } - ModuleHostInner::Wasm(_) => { - let info = self.info.clone(); - self.enqueue_wasm_instance( + ModuleHostInner::Wasm(_) => wasm(self, arg), + } + } + + async fn call_view_command_for_websocket( + &self, + label: &'static str, + cmd: ViewCommand, + ) -> Result, DBError> { + let metric = cmd.metric(); + + let info = self.info.clone(); + self.enqueue_main_operation( + "websocket view operation", + label, + (cmd, metric), + |(cmd, metric), inst, on_panic| async move { inst.enqueue_call_view(cmd, metric, on_panic).await }, + move |module, (cmd, metric)| { + let info = info.clone(); + module.enqueue_wasm_instance( "main-instance operation", label, InstanceKind::Main, @@ -1688,10 +1785,11 @@ impl ModuleHost { } }, ) - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; - Ok(None) - } - } + }, + ) + .await + .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + Ok(None) } pub(in crate::host) fn record_view_command_round_trip(info: &ModuleInfo, metric: ViewCommandMetric) { @@ -1709,27 +1807,21 @@ impl ModuleHost { } async fn call_sql_command(&self, cmd: SqlCommand) -> Result { - self.call( - "call_sql", - cmd, - async |cmd, inst| inst.call_sql(cmd), - async |cmd, inst| inst.call_sql(cmd).await, - ) - .await + call_instance!(self, "call_sql", cmd, |cmd, inst| inst.call_sql(cmd), |cmd, inst| inst + .call_sql(cmd) + .await,) .map_err(|_| DBError::Other(anyhow::anyhow!("no such module"))) } pub async fn disconnect_client(&self, client_id: ClientActorId) { log::trace!("disconnecting client {client_id}"); - if let Err(e) = self - .call( - "disconnect_client", - client_id, - async |client_id, inst| inst.disconnect_client(client_id), - async |client_id, inst| inst.disconnect_client(client_id).await, - ) - .await - { + if let Err(e) = call_instance!( + self, + "disconnect_client", + client_id, + |client_id, inst| inst.disconnect_client(client_id), + |client_id, inst| inst.disconnect_client(client_id).await, + ) { log::error!("Error from client_disconnected transaction: {e}"); } } @@ -1770,13 +1862,13 @@ impl ModuleHost { caller_auth: ConnectionAuthCtx, caller_connection_id: ConnectionId, ) -> Result<(), ClientConnectedError> { - self.call( + call_instance!( + self, "call_identity_connected", (caller_auth, caller_connection_id), - async |(a, b), inst| inst.call_identity_connected(a, b), - async |(a, b), inst| inst.call_identity_connected(a, b).await, + |(a, b), inst| inst.call_identity_connected(a, b), + |(a, b), inst| inst.call_identity_connected(a, b).await, ) - .await .map_err(ReducerCallError::from)? } @@ -1913,24 +2005,24 @@ impl ModuleHost { caller_identity: Identity, caller_connection_id: ConnectionId, ) -> Result<(), ReducerCallError> { - self.call( + call_instance!( + self, "call_identity_disconnected", (caller_identity, caller_connection_id), - async |(a, b), inst| inst.call_identity_disconnected(a, b), - async |(a, b), inst| inst.call_identity_disconnected(a, b).await, - ) - .await? + |(a, b), inst| inst.call_identity_disconnected(a, b), + |(a, b), inst| inst.call_identity_disconnected(a, b).await, + )? } /// Empty the system tables tracking clients without running any lifecycle reducers. pub async fn clear_all_clients(&self) -> anyhow::Result<()> { - self.call( + call_instance!( + self, "clear_all_clients", (), - async |_, inst| inst.clear_all_clients(), - async |_, inst| inst.clear_all_clients().await, - ) - .await? + |_, inst| inst.clear_all_clients(), + |_, inst| inst.clear_all_clients().await, + )? } fn call_reducer_params( @@ -2001,16 +2093,12 @@ impl ModuleHost { async fn call_reducer_with_params( &self, - reducer_def: &ReducerDef, + reducer_name: &ReducerName, params: CallReducerParams, ) -> Result { - self.call( - &reducer_def.name, - params, - async |p, inst| inst.call_reducer(p), - async |p, inst| inst.call_reducer(p).await, - ) - .await + call_instance!(self, reducer_name, params, |p, inst| inst.call_reducer(p), |p, inst| { + inst.call_reducer(p).await + },) .map_err(Into::into) } @@ -2026,7 +2114,7 @@ impl ModuleHost { } } - pub async fn call_reducer( + async fn with_reducer_call( &self, caller_identity: Identity, caller_connection_id: Option, @@ -2035,7 +2123,8 @@ impl ModuleHost { timer: Option, reducer_name: &str, args: FunctionArgs, - ) -> Result { + f: impl AsyncFnOnce(PreparedReducerCall) -> Result, + ) -> Result { let res = async { let (reducer_def, params) = self.reducer_call_params( caller_identity, @@ -2046,7 +2135,11 @@ impl ModuleHost { reducer_name, args, )?; - self.call_reducer_with_params(reducer_def, params).await + f(PreparedReducerCall { + name: reducer_def.name.clone(), + params, + }) + .await } .await; @@ -2057,6 +2150,29 @@ impl ModuleHost { res } + pub async fn call_reducer( + &self, + caller_identity: Identity, + caller_connection_id: Option, + client: Option>, + request_id: Option, + timer: Option, + reducer_name: &str, + args: FunctionArgs, + ) -> Result { + self.with_reducer_call( + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_name, + args, + async |call| self.call_reducer_with_params(&call.name, call.params).await, + ) + .await + } + pub async fn enqueue_reducer( &self, caller_identity: Identity, @@ -2067,177 +2183,143 @@ impl ModuleHost { reducer_name: &str, args: FunctionArgs, ) -> Result<(), ReducerCallError> { - let res = async { - let (reducer_def, params) = self.reducer_call_params( - caller_identity, - caller_connection_id, - client, - request_id, - timer, - reducer_name, - args, - )?; - - scopeguard::defer_on_unwind!({ - log::warn!("websocket reducer operation {reducer_name} panicked"); - (self.on_panic)(); - }); - - match &*self.inner { - ModuleHostInner::Js(js) => { - self.guard_closed()?; - let on_panic = self.on_panic.clone(); - js.main_instance - .with_instance(async |inst| inst.enqueue_reducer(params, on_panic).await) - .await; - Ok(()) - } - ModuleHostInner::Wasm(_) => self - .enqueue_wasm_instance( - "main-instance operation", - &reducer_def.name, - InstanceKind::Main, - params, - async |params, inst| { - let _ = inst.call_reducer(params); - }, - ) - .map_err(Into::into), - } - } - .await; - - if let Err(err) = &res { - self.log_reducer_submit_error(reducer_name, err); - } - - res + self.with_reducer_call( + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_name, + args, + async |call| { + let reducer_label = call.name; + self.enqueue_main_operation( + "websocket reducer operation", + reducer_name, + call.params, + |params, inst, on_panic| async move { inst.enqueue_reducer(params, on_panic).await }, + move |module, params| { + module.enqueue_wasm_instance( + "main-instance operation", + &reducer_label, + InstanceKind::Main, + params, + async |params, inst| { + let _ = inst.call_reducer(params); + }, + ) + }, + ) + .await + .map_err(Into::into) + }, + ) + .await } - pub async fn call_view_add_single_subscription( - &self, - sender: Arc, - auth: AuthCtx, - request: ws_v1::SubscribeSingle, - timer: Instant, - ) -> Result, DBError> { - let cmd = ViewCommand::AddSingleSubscription { + call_view_command_method! { + pub async fn call_view_add_single_subscription( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::SubscribeSingle, + timer: Instant, + ) -> "call_view_add_single_subscription" => AddSingleSubscription { sender, auth, request, _timer: timer, - }; - - self.call_view_command_for_websocket("call_view_add_single_subscription", cmd) - .await + } } - pub async fn call_view_add_v2_subscription( - &self, - sender: Arc, - auth: AuthCtx, - request: ws_v2::Subscribe, - timer: Instant, - ) -> Result, DBError> { - let cmd = ViewCommand::AddSubscriptionV2 { + call_view_command_method! { + pub async fn call_view_add_v2_subscription( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v2::Subscribe, + timer: Instant, + ) -> "call_view_add_multi_subscription" => AddSubscriptionV2 { sender, auth, request, _timer: timer, - }; - - self.call_view_command_for_websocket("call_view_add_multi_subscription", cmd) - .await + } } - pub async fn call_view_remove_single_subscription( - &self, - sender: Arc, - auth: AuthCtx, - request: ws_v1::Unsubscribe, - timer: Instant, - ) -> Result, DBError> { - let cmd = ViewCommand::RemoveSingleSubscription { + call_view_command_method! { + pub async fn call_view_remove_single_subscription( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::Unsubscribe, + timer: Instant, + ) -> "call_view_remove_single_subscription" => RemoveSingleSubscription { sender, auth, request, timer, - }; - - self.call_view_command_for_websocket("call_view_remove_single_subscription", cmd) - .await + } } - pub async fn call_view_remove_v2_subscription( - &self, - sender: Arc, - auth: AuthCtx, - request: ws_v2::Unsubscribe, - timer: Instant, - ) -> Result, DBError> { - let cmd = ViewCommand::RemoveSubscriptionV2 { + call_view_command_method! { + pub async fn call_view_remove_v2_subscription( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v2::Unsubscribe, + timer: Instant, + ) -> "call_view_remove_v2_subscription" => RemoveSubscriptionV2 { sender, auth, request, timer, - }; - - self.call_view_command_for_websocket("call_view_remove_v2_subscription", cmd) - .await + } } - pub async fn call_view_remove_multi_subscription( - &self, - sender: Arc, - auth: AuthCtx, - request: ws_v1::UnsubscribeMulti, - timer: Instant, - ) -> Result, DBError> { - let cmd = ViewCommand::RemoveMultiSubscription { + call_view_command_method! { + pub async fn call_view_remove_multi_subscription( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::UnsubscribeMulti, + timer: Instant, + ) -> "call_view_remove_multi_subscription" => RemoveMultiSubscription { sender, auth, request, timer, - }; - - self.call_view_command_for_websocket("call_view_remove_multi_subscription", cmd) - .await + } } - pub async fn call_view_add_multi_subscription( - &self, - sender: Arc, - auth: AuthCtx, - request: ws_v1::SubscribeMulti, - timer: Instant, - ) -> Result, DBError> { - let cmd = ViewCommand::AddMultiSubscription { + call_view_command_method! { + pub async fn call_view_add_multi_subscription( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::SubscribeMulti, + timer: Instant, + ) -> "call_view_add_multi_subscription" => AddMultiSubscription { sender, auth, request, _timer: timer, - }; - - self.call_view_command_for_websocket("call_view_add_multi_subscription", cmd) - .await + } } - pub async fn call_view_add_legacy_subscription( - &self, - sender: Arc, - auth: AuthCtx, - subscribe: ws_v1::Subscribe, - timer: Instant, - ) -> Result, DBError> { - let cmd = ViewCommand::AddLegacySubscription { + call_view_command_method! { + pub async fn call_view_add_legacy_subscription( + &self, + sender: Arc, + auth: AuthCtx, + subscribe: ws_v1::Subscribe, + timer: Instant, + ) -> "call_view_add_legacy_subscription" => AddLegacySubscription { sender, auth, subscribe, _timer: timer, - }; - - self.call_view_command_for_websocket("call_view_add_legacy_subscription", cmd) - .await + } } pub async fn call_sql( @@ -2270,21 +2352,15 @@ impl ModuleHost { args: FunctionArgs, ) -> CallProcedureReturn { let res = async { - let (procedure_def, params) = - self.procedure_call_params(caller_identity, caller_connection_id, timer, procedure_name, args)?; - self.call_procedure_with_params(&procedure_def.name, params) + let call = + self.prepare_procedure_call(caller_identity, caller_connection_id, timer, procedure_name, args)?; + self.call_procedure_with_params(&call.name, call.params) .await .map_err(Into::into) } .await; - let ret = match res { - Ok(ret) => ret, - Err(err) => CallProcedureReturn { - result: Err(err), - tx_offset: None, - }, - }; + let ret = res.unwrap_or_else(Self::procedure_error_return); self.log_procedure_validation_result(procedure_name, &ret); @@ -2300,24 +2376,14 @@ impl ModuleHost { args: FunctionArgs, target: ProcedureResultTarget, ) -> Result<(), BroadcastError> { - let res = async { - let (procedure_def, params) = - self.procedure_call_params(caller_identity, caller_connection_id, timer, procedure_name, args)?; - Ok((procedure_def.name.to_string(), params)) - } - .await; - - let (procedure_name, params) = match res { - Ok(value) => value, - Err(err) => { - let ret = CallProcedureReturn { - result: Err(err), - tx_offset: None, - }; - self.log_procedure_validation_result(procedure_name, &ret); - return self.send_procedure_result(procedure_name, timer, target, ret); - } - }; + let PreparedProcedureCall { name, params } = + match self.prepare_procedure_call(caller_identity, caller_connection_id, timer, procedure_name, args) { + Ok(value) => value, + Err(err) => { + return self.send_procedure_error(procedure_name, timer, target, err); + } + }; + let procedure_name = name; let guard_procedure_name = procedure_name.clone(); scopeguard::defer_on_unwind!({ @@ -2326,12 +2392,7 @@ impl ModuleHost { }); if let Err(err) = self.guard_closed() { - let ret = CallProcedureReturn { - result: Err(err.into()), - tx_offset: None, - }; - self.log_procedure_validation_result(&procedure_name, &ret); - return self.send_procedure_result(&procedure_name, timer, target, ret); + return self.send_procedure_error(&procedure_name, timer, target, err.into()); } match &*self.inner { @@ -2342,8 +2403,8 @@ impl ModuleHost { tokio::spawn(async move { match call.receive().await { JsProcedureCallCompletion::Completed(ret) => { - module.log_procedure_validation_result(&procedure_name, &ret); - if let Err(err) = module.send_procedure_result(&procedure_name, timer, target, ret) { + if let Err(err) = module.log_and_send_procedure_result(&procedure_name, timer, target, ret) + { log::warn!("failed to send procedure result: {err:#}"); } } @@ -2367,10 +2428,12 @@ impl ModuleHost { params, async move |params, inst| { let ret = inst.call_procedure(params).await; - module.log_procedure_validation_result(&procedure_name_for_job, &ret); - if let Err(err) = - module.send_procedure_result(&procedure_name_for_job, timer, target_for_job, ret) - { + if let Err(err) = module.log_and_send_procedure_result( + &procedure_name_for_job, + timer, + target_for_job, + ret, + ) { log::warn!("Procedure call failed: {err:#}"); } }, @@ -2378,14 +2441,7 @@ impl ModuleHost { .await { Ok(()) => Ok(()), - Err(err) => { - let ret = CallProcedureReturn { - result: Err(err.into()), - tx_offset: None, - }; - self.log_procedure_validation_result(&procedure_name, &ret); - self.send_procedure_result(&procedure_name, timer, target, ret) - } + Err(err) => self.send_procedure_error(&procedure_name, timer, target, err.into()), } } } @@ -2398,6 +2454,34 @@ impl ModuleHost { host.procedure_instances.return_instance(lease).await; } + fn procedure_error_return(err: ProcedureCallError) -> CallProcedureReturn { + CallProcedureReturn { + result: Err(err), + tx_offset: None, + } + } + + fn send_procedure_error( + &self, + procedure_name: &str, + timer: Option, + target: ProcedureResultTarget, + err: ProcedureCallError, + ) -> Result<(), BroadcastError> { + self.log_and_send_procedure_result(procedure_name, timer, target, Self::procedure_error_return(err)) + } + + fn log_and_send_procedure_result( + &self, + procedure_name: &str, + timer: Option, + target: ProcedureResultTarget, + ret: CallProcedureReturn, + ) -> Result<(), BroadcastError> { + self.log_procedure_validation_result(procedure_name, &ret); + self.send_procedure_result(procedure_name, timer, target, ret) + } + fn log_procedure_validation_result(&self, procedure_name: &str, ret: &CallProcedureReturn) { let log_message = match &ret.result { Err(ProcedureCallError::NoSuchProcedure) => Some(no_such_function_log_message("procedure", procedure_name)), @@ -2466,6 +2550,22 @@ impl ModuleHost { } } + fn prepare_procedure_call( + &self, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_name: &str, + args: FunctionArgs, + ) -> Result { + let (procedure_def, params) = + self.procedure_call_params(caller_identity, caller_connection_id, timer, procedure_name, args)?; + Ok(PreparedProcedureCall { + name: procedure_def.name.to_string(), + params, + }) + } + fn procedure_call_params<'a>( &'a self, caller_identity: Identity, @@ -2512,26 +2612,26 @@ impl ModuleHost { name: &str, params: CallProcedureParams, ) -> Result { - self.call_pooled( + call_pooled_instance!( + self, name, params, - async move |params, inst| inst.call_procedure(params).await, - async move |params, inst| inst.call_procedure(params).await, + |params, inst| inst.call_procedure(params).await, + |params, inst| inst.call_procedure(params).await, ) - .await } pub(super) async fn call_scheduled_reducer( &self, params: ScheduledFunctionParams, ) -> Result { - self.call( + call_instance!( + self, "scheduled reducer", params, - async move |params, inst| inst.call_scheduled_function(params).await, - async move |params, inst| inst.call_scheduled_reducer(params).await, + |params, inst| inst.call_scheduled_function(params).await, + |params, inst| inst.call_scheduled_reducer(params).await, ) - .await .map_err(Into::into) } @@ -2539,13 +2639,13 @@ impl ModuleHost { &self, params: ScheduledFunctionParams, ) -> Result { - self.call_pooled( + call_pooled_instance!( + self, "scheduled procedure", params, - async move |params, inst| inst.call_scheduled_function(params).await, - async move |params, inst| inst.call_scheduled_procedure(params).await, + |params, inst| inst.call_scheduled_function(params).await, + |params, inst| inst.call_scheduled_procedure(params).await, ) - .await .map_err(Into::into) } @@ -2761,13 +2861,13 @@ impl ModuleHost { } pub async fn init_database(&self, program: Program) -> Result, InitDatabaseError> { - self.call( + call_instance!( + self, "", program, - async |p, inst| inst.init_database(p), - async |p, inst| inst.init_database(p).await, - ) - .await? + |p, inst| inst.init_database(p), + |p, inst| inst.init_database(p).await, + )? .map_err(InitDatabaseError::Other) } @@ -2777,13 +2877,13 @@ impl ModuleHost { old_module_info: Arc, policy: MigrationPolicy, ) -> Result { - self.call( + call_instance!( + self, "", (program, old_module_info, policy), - async |(a, b, c), inst| inst.update_database(a, b, c), - async |(a, b, c), inst| inst.update_database(a, b, c).await, - ) - .await? + |(a, b, c), inst| inst.update_database(a, b, c), + |(a, b, c), inst| inst.update_database(a, b, c).await, + )? } pub async fn exit(&self) { @@ -2830,7 +2930,7 @@ impl ModuleHost { timer, }; log::debug!("One-off query: {}", params.query); - self.one_off_query_json_with_params(params).await + self.one_off_query_with_params(OneOffQueryRequest::Json(params)).await } /// Execute a BSATN one-off query and send the results to the given client. @@ -2857,71 +2957,31 @@ impl ModuleHost { rlb_pool, }; log::debug!("One-off query: {}", params.query); - self.one_off_query_bsatn_with_params(params).await + self.one_off_query_with_params(OneOffQueryRequest::Bsatn(params)).await } - async fn one_off_query_for_websocket( - &self, - label: &'static str, - params: P, - timer: Instant, - run_inner: impl FnOnce(P) -> OneOffQueryResult + Send + 'static, - enqueue_js: impl FnOnce(P, JsMainInstance, JsFatalHook) -> Fut + Send + 'static, - ) -> Result<(), anyhow::Error> - where - P: Send + 'static, - Fut: Future + Send + 'static, - { - scopeguard::defer_on_unwind!({ - log::warn!("websocket one-off query operation {label} panicked"); - (self.on_panic)(); - }); - - match &*self.inner { - ModuleHostInner::Js(js) => { - self.guard_closed()?; - let on_panic = self.on_panic.clone(); - js.main_instance - .with_instance(async |inst| enqueue_js(params, inst, on_panic).await) - .await; - Ok(()) - } - ModuleHostInner::Wasm(_) => { - let info = self.info.clone(); - self.enqueue_wasm_job("module-thread operation", label, async move || { - let result = run_inner(params); + async fn one_off_query_with_params(&self, request: OneOffQueryRequest) -> Result<(), anyhow::Error> { + let label = request.label(); + let timer = request.timer(); + let info = self.info.clone(); + self.enqueue_main_operation( + "websocket one-off query operation", + label, + request, + |request, inst, on_panic| async move { inst.enqueue_one_off_query(request, on_panic).await }, + move |module, request| { + let info = info.clone(); + module.enqueue_wasm_job("module-thread operation", label, async move || { + let result = request.run(); Self::record_one_off_query_round_trip(&info, timer); if let Err(err) = result { log::warn!("One-off query failed: {err:#}"); } - })?; - Ok(()) - } - } - } - - async fn one_off_query_json_with_params(&self, params: OneOffQueryJsonParams) -> Result<(), anyhow::Error> { - let timer = params.timer; - self.one_off_query_for_websocket( - "one_off_query_json", - params, - timer, - Self::one_off_query_json_inner, - |params, inst, on_panic| async move { inst.enqueue_one_off_query_json(params, on_panic).await }, - ) - .await - } - - async fn one_off_query_bsatn_with_params(&self, params: OneOffQueryBsatnParams) -> Result<(), anyhow::Error> { - let timer = params.timer; - self.one_off_query_for_websocket( - "one_off_query_bsatn", - params, - timer, - Self::one_off_query_bsatn_inner, - |params, inst, on_panic| async move { inst.enqueue_one_off_query_bsatn(params, on_panic).await }, + }) + }, ) - .await + .await?; + Ok(()) } pub(in crate::host) fn one_off_query_json_inner(params: OneOffQueryJsonParams) -> OneOffQueryResult { @@ -2971,6 +3031,71 @@ impl ModuleHost { ) } + fn execute_one_off_query( + db: &RelationalDB, + tx: &Tx, + auth: &AuthCtx, + query: &str, + rlb_pool: &impl RowListBuilderSource, + into_rows: impl FnOnce(RawIdentifier, F::List) -> R, + ) -> Result<(R, ExecutionMetrics), anyhow::Error> { + let schema_tx = SchemaViewer::new(tx, auth); + + let ( + // A query may compile down to several plans. + // This happens when there are multiple RLS rules per table. + // The original query is the union of these plans. + plans, + _, + table_name, + _, + ) = compile_subscription(query, &schema_tx, auth)?; + + // Optimize each fragment. + let optimized = plans + .into_iter() + .map(|plan| plan.optimize(auth)) + .collect::, _>>()?; + + check_row_limit( + &optimized, + db, + tx, + // Estimate the number of rows this query will scan. + |plan, tx| estimate_rows_scanned(tx, plan), + auth, + )?; + + let return_table = || optimized.first().and_then(|plan| plan.return_table()); + + let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table()); + let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default(); + let num_private_cols = return_table() + .map(|schema| schema.num_private_cols()) + .unwrap_or_default(); + + let optimized = optimized + .into_iter() + // Convert into something we can execute. + .map(PipelinedProject::from) + .collect::>(); + + let table_name = table_name.into(); + let delta_tx = DeltaTx::from(tx); + let (rows, _, metrics) = if returns_view_table && num_private_cols > 0 { + let optimized = optimized + .into_iter() + .map(|plan| ViewProject::new(plan, num_cols, num_private_cols)) + .collect::>(); + execute_plan_for_view::(&optimized, &delta_tx, rlb_pool) + } else { + execute_plan::(&optimized, &delta_tx, rlb_pool) + } + .context("One-off queries are not allowed to modify the database")?; + + Ok((into_rows(table_name, rows), metrics)) + } + fn one_off_query_v1_inner( db: Arc, subscriptions: ModuleSubscriptions, @@ -2990,68 +3115,9 @@ impl ModuleHost { db.report_read_tx_metrics(reducer, tx_metrics); }); - // We wrap the actual query in a closure so we can use ? to handle errors without making - // the entire transaction abort with an error. - let result: Result<(ws_v1::OneOffTable, ExecutionMetrics), anyhow::Error> = (|| { - let tx = SchemaViewer::new(&*tx, &auth); - - let ( - // A query may compile down to several plans. - // This happens when there are multiple RLS rules per table. - // The original query is the union of these plans. - plans, - _, - table_name, - _, - ) = compile_subscription(&query, &tx, &auth)?; - - // Optimize each fragment - let optimized = plans - .into_iter() - .map(|plan| plan.optimize(&auth)) - .collect::, _>>()?; - - check_row_limit( - &optimized, - &db, - &tx, - // Estimate the number of rows this query will scan - |plan, tx| estimate_rows_scanned(tx, plan), - &auth, - )?; - - let return_table = || optimized.first().and_then(|plan| plan.return_table()); - - let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table()); - let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default(); - let num_private_cols = return_table() - .map(|schema| schema.num_private_cols()) - .unwrap_or_default(); - - let optimized = optimized - .into_iter() - // Convert into something we can execute - .map(PipelinedProject::from) - .collect::>(); - - let table_name = table_name.into(); - - if returns_view_table && num_private_cols > 0 { - let optimized = optimized - .into_iter() - .map(|plan| ViewProject::new(plan, num_cols, num_private_cols)) - .collect::>(); - // Execute the union and return the results - return execute_plan_for_view::(&optimized, &DeltaTx::from(&*tx), &rlb_pool) - .map(|(rows, _, metrics)| (ws_v1::OneOffTable { table_name, rows }, metrics)) - .context("One-off queries are not allowed to modify the database"); - } - - // Execute the union and return the results - execute_plan::(&optimized, &DeltaTx::from(&*tx), &rlb_pool) - .map(|(rows, _, metrics)| (ws_v1::OneOffTable { table_name, rows }, metrics)) - .context("One-off queries are not allowed to modify the database") - })(); + let result = Self::execute_one_off_query(&db, &tx, &auth, &query, &rlb_pool, |table_name, rows| { + ws_v1::OneOffTable { table_name, rows } + }); let total_host_execution_duration = timer.elapsed().into(); let (message, metrics): (SerializableMessage, Option) = match result { @@ -3107,15 +3173,7 @@ impl ModuleHost { rlb_pool, }; log::debug!("One-off query: {}", params.query); - let timer = params.timer; - self.one_off_query_for_websocket( - "one_off_query_v2", - params, - timer, - Self::one_off_query_v2_inner, - |params, inst, on_panic| async move { inst.enqueue_one_off_query_v2(params, on_panic).await }, - ) - .await + self.one_off_query_with_params(OneOffQueryRequest::V2(params)).await } pub(in crate::host) fn one_off_query_v2_inner(params: OneOffQueryV2Params) -> OneOffQueryResult { @@ -3136,60 +3194,10 @@ impl ModuleHost { db.report_read_tx_metrics(reducer, tx_metrics); }); - let result: Result<(ws_v2::SingleTableRows, ExecutionMetrics), anyhow::Error> = (|| { - let tx = SchemaViewer::new(&*tx, &auth); - - let (plans, _, table_name, _) = compile_subscription(&query, &tx, &auth)?; - - let optimized = plans - .into_iter() - .map(|plan| plan.optimize(&auth)) - .collect::, _>>()?; - - check_row_limit(&optimized, &db, &tx, |plan, tx| estimate_rows_scanned(tx, plan), &auth)?; - - let return_table = || optimized.first().and_then(|plan| plan.return_table()); - - let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table()); - let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default(); - let num_private_cols = return_table() - .map(|schema| schema.num_private_cols()) - .unwrap_or_default(); - - let optimized = optimized.into_iter().map(PipelinedProject::from).collect::>(); - - let table_name = table_name.into(); - - if returns_view_table && num_private_cols > 0 { - let optimized = optimized - .into_iter() - .map(|plan| ViewProject::new(plan, num_cols, num_private_cols)) - .collect::>(); - return execute_plan_for_view::(&optimized, &DeltaTx::from(&*tx), &rlb_pool) - .map(|(rows, _, metrics)| { - ( - ws_v2::SingleTableRows { - table: table_name, - rows, - }, - metrics, - ) - }) - .context("One-off queries are not allowed to modify the database"); - } - - execute_plan::(&optimized, &DeltaTx::from(&*tx), &rlb_pool) - .map(|(rows, _, metrics)| { - ( - ws_v2::SingleTableRows { - table: table_name, - rows, - }, - metrics, - ) - }) - .context("One-off queries are not allowed to modify the database") - })(); + let result = + Self::execute_one_off_query::(&db, &tx, &auth, &query, &rlb_pool, |table, rows| { + ws_v2::SingleTableRows { table, rows } + }); let (message, metrics) = match result { Ok((rows, metrics)) => { diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 37e4c27704f..db5b64bf5fc 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -74,8 +74,8 @@ use crate::config::{V8Config, V8HeapPolicyConfig}; use crate::host::host_controller::CallProcedureReturn; use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot}; use crate::host::module_host::{ - call_identity_connected, init_database, ClientConnectedError, OneOffQueryBsatnParams, OneOffQueryJsonParams, - OneOffQueryV2Params, SqlCommand, SqlCommandResult, ViewCommand, ViewCommandMetric, ViewCommandResult, + call_identity_connected, init_database, ClientConnectedError, OneOffQueryRequest, SqlCommand, SqlCommandResult, + ViewCommand, ViewCommandMetric, ViewCommandResult, }; use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams}; use crate::host::wasm_common::instrumentation::CallTimes; @@ -564,36 +564,10 @@ impl JsMainInstance { self.request(CallSqlRequest { cmd }).await } - pub(in crate::host) async fn enqueue_one_off_query_json( - &self, - params: OneOffQueryJsonParams, - on_panic: JsFatalHook, - ) { - self.send_detached_request( - "one_off_query_json", - JsMainWorkerRequest::OneOffQueryJsonDetached { params, on_panic }, - ) - .await - } - - pub(in crate::host) async fn enqueue_one_off_query_bsatn( - &self, - params: OneOffQueryBsatnParams, - on_panic: JsFatalHook, - ) { - self.send_detached_request( - "one_off_query_bsatn", - JsMainWorkerRequest::OneOffQueryBsatnDetached { params, on_panic }, - ) - .await - } - - pub(in crate::host) async fn enqueue_one_off_query_v2(&self, params: OneOffQueryV2Params, on_panic: JsFatalHook) { - self.send_detached_request( - "one_off_query_v2", - JsMainWorkerRequest::OneOffQueryV2Detached { params, on_panic }, - ) - .await + pub(in crate::host) async fn enqueue_one_off_query(&self, request: OneOffQueryRequest, on_panic: JsFatalHook) { + let ctx = request.label(); + self.send_detached_request(ctx, JsMainWorkerRequest::OneOffQueryDetached { request, on_panic }) + .await } } @@ -605,177 +579,106 @@ trait JsMainRequest { fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest; } -struct UpdateDatabaseRequest { - program: Program, - old_module_info: Arc, - policy: MigrationPolicy, -} - -impl JsMainRequest for UpdateDatabaseRequest { - type Response = anyhow::Result; - - const CTX: &'static str = "update_database"; - - fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { - JsMainWorkerRequest::UpdateDatabase { - reply_tx, - program: self.program, - old_module_info: self.old_module_info, - policy: self.policy, - } - } -} - -struct CallReducerRequest { - params: CallReducerParams, -} - -impl JsMainRequest for CallReducerRequest { - type Response = ReducerCallResult; - - const CTX: &'static str = "call_reducer"; - - fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { - JsMainWorkerRequest::CallReducer { - reply_tx, - params: self.params, +macro_rules! js_main_request { + ( + $request:ident { + $($field:ident: $field_ty:ty),* $(,)? + } => $ctx:literal, $response:ty, $variant:ident + ) => { + struct $request { + $($field: $field_ty),* } - } -} -struct ScheduledReducerRequest { - params: ScheduledFunctionParams, -} - -impl JsMainRequest for ScheduledReducerRequest { - type Response = CallScheduledFunctionResult; + impl JsMainRequest for $request { + type Response = $response; - const CTX: &'static str = "scheduled_reducer"; + const CTX: &'static str = $ctx; - fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { - JsMainWorkerRequest::ScheduledReducer { - reply_tx, - params: self.params, + fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { + JsMainWorkerRequest::$variant { + reply_tx, + $($field: self.$field),* + } + } } - } -} - -struct ClearAllClientsRequest; - -impl JsMainRequest for ClearAllClientsRequest { - type Response = anyhow::Result<()>; - - const CTX: &'static str = "clear_all_clients"; - - fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { - JsMainWorkerRequest::ClearAllClients(reply_tx) - } -} - -struct CallIdentityConnectedRequest { - caller_auth: ConnectionAuthCtx, - caller_connection_id: ConnectionId, -} + }; + ( + $request:ident => $ctx:literal, $response:ty, $variant:ident + ) => { + struct $request; -impl JsMainRequest for CallIdentityConnectedRequest { - type Response = Result<(), ClientConnectedError>; + impl JsMainRequest for $request { + type Response = $response; - const CTX: &'static str = "call_identity_connected"; + const CTX: &'static str = $ctx; - fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { - JsMainWorkerRequest::CallIdentityConnected { - reply_tx, - caller_auth: self.caller_auth, - caller_connection_id: self.caller_connection_id, + fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { + JsMainWorkerRequest::$variant(reply_tx) + } } - } + }; } -struct CallIdentityDisconnectedRequest { - caller_identity: Identity, - caller_connection_id: ConnectionId, +js_main_request! { + UpdateDatabaseRequest { + program: Program, + old_module_info: Arc, + policy: MigrationPolicy, + } => "update_database", anyhow::Result, UpdateDatabase } -impl JsMainRequest for CallIdentityDisconnectedRequest { - type Response = Result<(), ReducerCallError>; - - const CTX: &'static str = "call_identity_disconnected"; - - fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { - JsMainWorkerRequest::CallIdentityDisconnected { - reply_tx, - caller_identity: self.caller_identity, - caller_connection_id: self.caller_connection_id, - } - } +js_main_request! { + CallReducerRequest { + params: CallReducerParams, + } => "call_reducer", ReducerCallResult, CallReducer } -struct DisconnectClientRequest { - client_id: ClientActorId, +js_main_request! { + ScheduledReducerRequest { + params: ScheduledFunctionParams, + } => "scheduled_reducer", CallScheduledFunctionResult, ScheduledReducer } -impl JsMainRequest for DisconnectClientRequest { - type Response = Result<(), ReducerCallError>; - - const CTX: &'static str = "disconnect_client"; - - fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { - JsMainWorkerRequest::DisconnectClient { - reply_tx, - client_id: self.client_id, - } - } +js_main_request! { + ClearAllClientsRequest => "clear_all_clients", anyhow::Result<()>, ClearAllClients } -struct InitDatabaseRequest { - program: Program, +js_main_request! { + CallIdentityConnectedRequest { + caller_auth: ConnectionAuthCtx, + caller_connection_id: ConnectionId, + } => "call_identity_connected", Result<(), ClientConnectedError>, CallIdentityConnected } -impl JsMainRequest for InitDatabaseRequest { - type Response = anyhow::Result>; - - const CTX: &'static str = "init_database"; - - fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { - JsMainWorkerRequest::InitDatabase { - reply_tx, - program: self.program, - } - } +js_main_request! { + CallIdentityDisconnectedRequest { + caller_identity: Identity, + caller_connection_id: ConnectionId, + } => "call_identity_disconnected", Result<(), ReducerCallError>, CallIdentityDisconnected } -struct CallViewRequest { - cmd: ViewCommand, +js_main_request! { + DisconnectClientRequest { + client_id: ClientActorId, + } => "disconnect_client", Result<(), ReducerCallError>, DisconnectClient } -impl JsMainRequest for CallViewRequest { - type Response = ViewCommandResult; - - const CTX: &'static str = "call_view"; - - fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { - JsMainWorkerRequest::CallView { - reply_tx, - cmd: self.cmd, - } - } +js_main_request! { + InitDatabaseRequest { + program: Program, + } => "init_database", anyhow::Result>, InitDatabase } -struct CallSqlRequest { - cmd: SqlCommand, +js_main_request! { + CallViewRequest { + cmd: ViewCommand, + } => "call_view", ViewCommandResult, CallView } -impl JsMainRequest for CallSqlRequest { - type Response = SqlCommandResult; - - const CTX: &'static str = "call_sql"; - - fn into_worker_request(self, reply_tx: JsReplyTx) -> JsMainWorkerRequest { - JsMainWorkerRequest::CallSql { - reply_tx, - cmd: self.cmd, - } - } +js_main_request! { + CallSqlRequest { + cmd: SqlCommand, + } => "call_sql", SqlCommandResult, CallSql } impl JsProcedureInstance { @@ -927,19 +830,9 @@ enum JsMainWorkerRequest { reply_tx: JsReplyTx, cmd: SqlCommand, }, - /// See [`JsMainInstance::enqueue_one_off_query_json`]. - OneOffQueryJsonDetached { - params: OneOffQueryJsonParams, - on_panic: JsFatalHook, - }, - /// See [`JsMainInstance::enqueue_one_off_query_bsatn`]. - OneOffQueryBsatnDetached { - params: OneOffQueryBsatnParams, - on_panic: JsFatalHook, - }, - /// See [`JsMainInstance::enqueue_one_off_query_v2`]. - OneOffQueryV2Detached { - params: OneOffQueryV2Params, + /// See [`JsMainInstance::enqueue_one_off_query`]. + OneOffQueryDetached { + request: OneOffQueryRequest, on_panic: JsFatalHook, }, /// See [`JsMainInstance::clear_all_clients`]. @@ -1358,7 +1251,6 @@ trait JsWorkerSpec { fn handle_request( request: Self::Request, - rt: &tokio::runtime::Handle, instance_common: &mut InstanceCommon, inst: &mut V8Instance<'_, '_, '_>, module_common: &ModuleCommon, @@ -1395,13 +1287,12 @@ impl JsWorkerSpec for MainJsWorker { fn handle_request( request: Self::Request, - rt: &tokio::runtime::Handle, instance_common: &mut InstanceCommon, inst: &mut V8Instance<'_, '_, '_>, module_common: &ModuleCommon, replica_ctx: &Arc, ) -> WorkerRequestOutcome { - handle_main_worker_request(request, rt, instance_common, inst, module_common, replica_ctx) + handle_main_worker_request(request, instance_common, inst, module_common, replica_ctx) } } @@ -1427,7 +1318,6 @@ impl JsWorkerSpec for ProcedureJsWorker { fn handle_request( request: Self::Request, - _rt: &tokio::runtime::Handle, instance_common: &mut InstanceCommon, inst: &mut V8Instance<'_, '_, '_>, _module_common: &ModuleCommon, @@ -1439,7 +1329,6 @@ impl JsWorkerSpec for ProcedureJsWorker { fn handle_main_worker_request( request: JsMainWorkerRequest, - _rt: &tokio::runtime::Handle, instance_common: &mut InstanceCommon, inst: &mut V8Instance<'_, '_, '_>, module_common: &ModuleCommon, @@ -1495,32 +1384,11 @@ fn handle_main_worker_request( let (res, trapped) = instance_common.handle_sql_cmd(cmd, inst); (res, trapped) }), - JsMainWorkerRequest::OneOffQueryJsonDetached { params, on_panic } => { - handle_detached_worker_request("one_off_query_json", on_panic, || { - let timer = params.timer(); - let res = ModuleHost::one_off_query_json_inner(params); - if let Err(err) = &res { - log::warn!("detached one-off query failed: {err:#}"); - } - ModuleHost::record_one_off_query_round_trip(&module_common.info(), timer); - false - }) - } - JsMainWorkerRequest::OneOffQueryBsatnDetached { params, on_panic } => { - handle_detached_worker_request("one_off_query_bsatn", on_panic, || { - let timer = params.timer(); - let res = ModuleHost::one_off_query_bsatn_inner(params); - if let Err(err) = &res { - log::warn!("detached one-off query failed: {err:#}"); - } - ModuleHost::record_one_off_query_round_trip(&module_common.info(), timer); - false - }) - } - JsMainWorkerRequest::OneOffQueryV2Detached { params, on_panic } => { - handle_detached_worker_request("one_off_query_v2", on_panic, || { - let timer = params.timer(); - let res = ModuleHost::one_off_query_v2_inner(params); + JsMainWorkerRequest::OneOffQueryDetached { request, on_panic } => { + let label = request.label(); + handle_detached_worker_request(label, on_panic, || { + let timer = request.timer(); + let res = request.run(); if let Err(err) = &res { log::warn!("detached one-off query failed: {err:#}"); } @@ -1784,14 +1652,8 @@ where while let Some(request) = W::blocking_recv(&mut request_rx) { core_pinner.pin_if_changed(); - let mut outcome = W::handle_request( - request, - &rt, - &mut instance_common, - &mut inst, - &module_common, - replica_ctx, - ); + let mut outcome = + W::handle_request(request, &mut instance_common, &mut inst, &module_common, replica_ctx); if let WorkerRequestOutcome::Continue = outcome && let Some(heap_metrics) = heap_metrics.as_mut() From a15f6918391e16261786c1f24fed42c00c0b7cb2 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 6 May 2026 21:10:45 -0700 Subject: [PATCH 5/8] fix test --- crates/testing/src/modules.rs | 9 +++++++++ .../tests/standalone_integration_test.rs | 18 +++++++----------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 0b1a1f16d66..750d9ca7606 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -91,6 +91,15 @@ impl ModuleHandle { self.client.handle_message(message, timer).await.map_err(Into::into) } + pub async fn send_reducer_and_recv_update( + &mut self, + message: impl Into, + request_id: RequestId, + ) -> anyhow::Result<()> { + self.send(message).await?; + self.recv_reducer_update(request_id).await + } + pub async fn recv_message(&mut self) -> Option { self.receiver.recv().await } diff --git a/crates/testing/tests/standalone_integration_test.rs b/crates/testing/tests/standalone_integration_test.rs index d9752553c02..db5b9bedc3a 100644 --- a/crates/testing/tests/standalone_integration_test.rs +++ b/crates/testing/tests/standalone_integration_test.rs @@ -50,29 +50,25 @@ fn test_calling_a_reducer_in_module(module_name: &'static str) { let json = r#"{"CallReducer": {"reducer": "add", "args": "[\"Tyrion\", 24]", "request_id": 0, "flags": 0 }}"# .to_string(); - module.send(json).await.unwrap(); + module.send_reducer_and_recv_update(json, 0).await.unwrap(); let json = r#"{"CallReducer": {"reducer": "add", "args": "[\"Cersei\", 31]", "request_id": 1, "flags": 0 }}"# .to_string(); - module.send(json).await.unwrap(); + module.send_reducer_and_recv_update(json, 1).await.unwrap(); let json = r#"{"CallReducer": {"reducer": "say_hello", "args": "[]", "request_id": 2, "flags": 0 }}"#.to_string(); - module.send(json).await.unwrap(); + module.send_reducer_and_recv_update(json, 2).await.unwrap(); let json = r#"{"CallReducer": {"reducer": "list_over_age", "args": "[30]", "request_id": 3, "flags": 0 }}"# .to_string(); - module.send(json).await.unwrap(); + module.send_reducer_and_recv_update(json, 3).await.unwrap(); let json = r#"{"CallReducer": {"reducer": "log_module_identity", "args": "[]", "request_id": 4, "flags": 0 }}"# .to_string(); - module.send(json).await.unwrap(); - - for request_id in 0..=4 { - module.recv_reducer_update(request_id).await.unwrap(); - } + module.send_reducer_and_recv_update(json, 4).await.unwrap(); assert_eq!( read_logs(&module).await, @@ -325,7 +321,7 @@ fn test_call_query_macro_with_caller>(caller: i #[serial] fn test_call_query_macro() { // Hand-written JSON. This will fail if the JSON encoding of `ClientMessage` changes. - test_call_query_macro_with_caller(|module| async move { + test_call_query_macro_with_caller(|mut module| async move { // Note that JSON doesn't allow multiline strings, so the encoded args string must be on one line! let json = r#" { "CallReducer": { @@ -336,7 +332,7 @@ fn test_call_query_macro() { "flags": 0 } }"# .to_string(); - module.send(json).await.unwrap(); + module.send_reducer_and_recv_update(json, 0).await.unwrap(); module }); From f562ab93d09f2c04bbb175aeafec36039b8cd28b Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 7 May 2026 09:01:51 -0700 Subject: [PATCH 6/8] fix: send initial subscribe errors --- crates/cli/src/subcommands/subscribe.rs | 8 ++ crates/core/src/host/module_host.rs | 110 ++++++++++++++++++ .../src/host/wasm_common/module_host_actor.rs | 92 +++++++-------- .../subscription/module_subscription_actor.rs | 77 ++++++++++-- 4 files changed, 229 insertions(+), 58 deletions(-) diff --git a/crates/cli/src/subcommands/subscribe.rs b/crates/cli/src/subcommands/subscribe.rs index 1f966fe10b0..5b22b4cd8a5 100644 --- a/crates/cli/src/subcommands/subscribe.rs +++ b/crates/cli/src/subcommands/subscribe.rs @@ -240,6 +240,8 @@ enum Error { }, #[error("encountered failed transaction: {reason}")] TransactionFailure { reason: Box }, + #[error("encountered error in initial subscribe: {reason}")] + SubscribeFailure { reason: Box }, #[error("error formatting response: {source:#}")] Reformat { #[source] @@ -295,6 +297,9 @@ where } break; } + ws_v1::ServerMessage::SubscriptionError(error) => { + return Err(Error::SubscribeFailure { reason: error.error }); + } ws_v1::ServerMessage::TransactionUpdate(ws_v1::TransactionUpdate { status, .. }) => { return Err(match status { ws_v1::UpdateStatus::Failed(msg) => Error::TransactionFailure { reason: msg }, @@ -341,6 +346,9 @@ where details: "received a second initial subscription update", }) } + ws_v1::ServerMessage::SubscriptionError(error) => { + return Err(Error::SubscribeFailure { reason: error.error }); + } ws_v1::ServerMessage::TransactionUpdateLight(ws_v1::TransactionUpdateLight { update, .. }) | ws_v1::ServerMessage::TransactionUpdate(ws_v1::TransactionUpdate { status: ws_v1::UpdateStatus::Committed(update), diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 7ef1d850588..c0f86bb3f64 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -741,6 +741,20 @@ pub(in crate::host) struct ViewCommandMetric { timer: Instant, } +pub(in crate::host) enum ViewCommandErrorTarget { + V1 { + sender: Arc, + request_id: Option, + query_id: Option, + timer: Option, + }, + V2 { + sender: Arc, + request_id: Option, + query_set_id: ws_v2::QuerySetId, + }, +} + impl ViewCommand { pub(in crate::host) fn metric(&self) -> ViewCommandMetric { match self { @@ -759,6 +773,102 @@ impl ViewCommand { }, } } + + pub(in crate::host) fn error_target(&self) -> ViewCommandErrorTarget { + match self { + Self::AddSingleSubscription { + sender, + request, + _timer, + .. + } => ViewCommandErrorTarget::V1 { + sender: sender.clone(), + request_id: Some(request.request_id), + query_id: Some(request.query_id), + timer: Some(*_timer), + }, + Self::AddMultiSubscription { + sender, + request, + _timer, + .. + } => ViewCommandErrorTarget::V1 { + sender: sender.clone(), + request_id: Some(request.request_id), + query_id: Some(request.query_id), + timer: Some(*_timer), + }, + Self::AddLegacySubscription { + sender, + subscribe, + _timer, + .. + } => ViewCommandErrorTarget::V1 { + sender: sender.clone(), + request_id: Some(subscribe.request_id), + query_id: None, + timer: Some(*_timer), + }, + Self::RemoveSingleSubscription { + sender, request, timer, .. + } => ViewCommandErrorTarget::V1 { + sender: sender.clone(), + request_id: Some(request.request_id), + query_id: Some(request.query_id), + timer: Some(*timer), + }, + Self::RemoveMultiSubscription { + sender, request, timer, .. + } => ViewCommandErrorTarget::V1 { + sender: sender.clone(), + request_id: Some(request.request_id), + query_id: Some(request.query_id), + timer: Some(*timer), + }, + Self::AddSubscriptionV2 { sender, request, .. } => ViewCommandErrorTarget::V2 { + sender: sender.clone(), + request_id: Some(request.request_id), + query_set_id: request.query_set_id, + }, + Self::RemoveSubscriptionV2 { sender, request, .. } => ViewCommandErrorTarget::V2 { + sender: sender.clone(), + request_id: Some(request.request_id), + query_set_id: request.query_set_id, + }, + } + } +} + +impl ViewCommandErrorTarget { + pub(in crate::host) fn send(&self, subscriptions: &ModuleSubscriptions, err: &DBError) { + let res = match self { + Self::V1 { + sender, + request_id, + query_id, + timer, + } => subscriptions.send_subscription_error_v1( + sender.clone(), + *request_id, + *query_id, + *timer, + err.to_string().into(), + ), + Self::V2 { + sender, + request_id, + query_set_id, + } => subscriptions.send_subscription_error_v2( + sender.clone(), + *request_id, + *query_set_id, + err.to_string().into(), + ), + }; + if let Err(send_err) = res { + log::warn!("failed to send subscription error: {send_err:#}"); + } + } } pub(in crate::host) struct SqlCommand { diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index c96b86d4a53..b269bbb92e4 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1042,56 +1042,48 @@ impl InstanceCommon { pub(crate) fn handle_cmd(&mut self, cmds: ViewCommand, inst: &mut I) -> (ViewCommandResult, bool) { let info = self.info.clone(); + let error_target = cmds.error_target(); let mut inst = RefInstance { instance: inst, common: self, }; - match cmds { + let (res, trapped) = match cmds { ViewCommand::AddSingleSubscription { sender, auth, request, _timer: timer, - } => { - let res = info - .subscriptions - .add_single_subscription_with_instance(&mut inst, sender, auth, request, timer, None); - - match res { - Ok((metrics, trapped)) => (Ok(metrics), trapped), - Err(err) => (Err(err), false), - } - } + } => match info + .subscriptions + .add_single_subscription_with_instance(&mut inst, sender, auth, request, timer, None) + { + Ok((metrics, trapped)) => (Ok(metrics), trapped), + Err(err) => (Err(err), false), + }, ViewCommand::AddLegacySubscription { sender, auth, subscribe, _timer: timer, - } => { - let res = info - .subscriptions - .add_legacy_subscriber_with_instance(&mut inst, sender, auth, subscribe, timer, None); - - match res { - Ok((metrics, trapped)) => (Ok(Some(metrics)), trapped), - Err(err) => (Err(err), false), - } - } + } => match info + .subscriptions + .add_legacy_subscriber_with_instance(&mut inst, sender, auth, subscribe, timer, None) + { + Ok((metrics, trapped)) => (Ok(Some(metrics)), trapped), + Err(err) => (Err(err), false), + }, ViewCommand::AddSubscriptionV2 { sender, auth, request, _timer: timer, - } => { - let res = info - .subscriptions - .add_v2_subscription_with_instance(&mut inst, sender, auth, request, timer, None); - - match res { - Ok((metrics, trapped)) => (Ok(metrics), trapped), - Err(err) => (Err(err), false), - } - } + } => match info + .subscriptions + .add_v2_subscription_with_instance(&mut inst, sender, auth, request, timer, None) + { + Ok((metrics, trapped)) => (Ok(metrics), trapped), + Err(err) => (Err(err), false), + }, ViewCommand::RemoveSingleSubscription { sender, auth, @@ -1107,16 +1099,13 @@ impl InstanceCommon { auth, request, timer, - } => { - let res = info - .subscriptions - .remove_v2_subscription_with_instance(&mut inst, sender, auth, request, timer, None); - - match res { - Ok((metrics, trapped)) => (Ok(metrics), trapped), - Err(err) => (Err(err), false), - } - } + } => match info + .subscriptions + .remove_v2_subscription_with_instance(&mut inst, sender, auth, request, timer, None) + { + Ok((metrics, trapped)) => (Ok(metrics), trapped), + Err(err) => (Err(err), false), + }, ViewCommand::RemoveMultiSubscription { sender, auth, @@ -1132,17 +1121,18 @@ impl InstanceCommon { auth, request, _timer: timer, - } => { - let res = info - .subscriptions - .add_multi_subscription_with_instance(&mut inst, sender, auth, request, timer, None); - - match res { - Ok((metrics, trapped)) => (Ok(metrics), trapped), - Err(err) => (Err(err), false), - } - } + } => match info + .subscriptions + .add_multi_subscription_with_instance(&mut inst, sender, auth, request, timer, None) + { + Ok((metrics, trapped)) => (Ok(metrics), trapped), + Err(err) => (Err(err), false), + }, + }; + if let Err(err) = &res { + error_target.send(&info.subscriptions, err); } + (res, trapped) } pub(in crate::host) fn handle_sql_cmd( diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 83760252a5e..ce541a43228 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -37,6 +37,7 @@ use spacetimedb_datastore::traits::{IsolationLevel, TxData}; use spacetimedb_durability::TxOffset; use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; use spacetimedb_expr::expr::CollectViews; +use spacetimedb_lib::identity::RequestId; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; use spacetimedb_lib::{bsatn, identity::AuthCtx}; @@ -1136,6 +1137,47 @@ impl ModuleSubscriptions { .send_client_message_v2(recipient, Some(tx_offset), message) } + pub fn send_subscription_error_v1( + &self, + recipient: Arc, + request_id: Option, + query_id: Option, + timer: Option, + message: Box, + ) -> Result<(), BroadcastError> { + self.broadcast_queue.send_client_message_v1( + recipient, + None, + SubscriptionMessage { + request_id, + query_id, + timer, + result: SubscriptionResult::Error(SubscriptionError { + table_id: None, + message, + }), + }, + ) + } + + pub fn send_subscription_error_v2( + &self, + recipient: Arc, + request_id: Option, + query_set_id: ws_v2::QuerySetId, + message: Box, + ) -> Result<(), BroadcastError> { + self.broadcast_queue.send_client_message_v2( + recipient, + None, + ws_v2::SubscriptionError { + request_id, + query_set_id, + error: message, + }, + ) + } + /// Add a subscription consisting of multiple queries. /// /// Read more in [`Self::add_single_subscription`]. @@ -1474,18 +1516,39 @@ impl ModuleSubscriptions { timer: Instant, _assert: Option, ) -> Result<(ExecutionMetrics, bool), DBError> { + // Send an error message to the client. + let send_err_msg = |message| { + let _ = self.broadcast_queue.send_client_message_v1( + sender.clone(), + None, + SubscriptionMessage { + request_id: Some(subscription.request_id), + query_id: None, + timer: Some(timer), + result: SubscriptionResult::Error(SubscriptionError { + table_id: None, + message, + }), + }, + ); + }; + // How many queries make up this subscription? let subscription_metrics = &self.metrics.subscribe; let num_queries = subscription.query_strings.len(); subscription_metrics.num_queries_subscribed.inc_by(num_queries as _); - let (queries, auth, mut_tx, compile_timer) = self.compile_queries( - sender.id.identity, - auth, - &subscription.query_strings, - num_queries, - subscription_metrics, - )?; + let (queries, auth, mut_tx, compile_timer) = return_on_err!( + self.compile_queries( + sender.id.identity, + auth, + &subscription.query_strings, + num_queries, + subscription_metrics, + ), + send_err_msg, + (ExecutionMetrics::default(), false) + ); let (mut tx, tx_offset, trapped) = self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?; From 5ac8131221fb5dca22b7c3ba3982fee1f33386b7 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 7 May 2026 09:52:17 -0700 Subject: [PATCH 7/8] fix test --- .../subscription/module_subscription_actor.rs | 35 ++++--------------- 1 file changed, 7 insertions(+), 28 deletions(-) diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index ce541a43228..d748fdd09ab 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1516,39 +1516,18 @@ impl ModuleSubscriptions { timer: Instant, _assert: Option, ) -> Result<(ExecutionMetrics, bool), DBError> { - // Send an error message to the client. - let send_err_msg = |message| { - let _ = self.broadcast_queue.send_client_message_v1( - sender.clone(), - None, - SubscriptionMessage { - request_id: Some(subscription.request_id), - query_id: None, - timer: Some(timer), - result: SubscriptionResult::Error(SubscriptionError { - table_id: None, - message, - }), - }, - ); - }; - // How many queries make up this subscription? let subscription_metrics = &self.metrics.subscribe; let num_queries = subscription.query_strings.len(); subscription_metrics.num_queries_subscribed.inc_by(num_queries as _); - let (queries, auth, mut_tx, compile_timer) = return_on_err!( - self.compile_queries( - sender.id.identity, - auth, - &subscription.query_strings, - num_queries, - subscription_metrics, - ), - send_err_msg, - (ExecutionMetrics::default(), false) - ); + let (queries, auth, mut_tx, compile_timer) = self.compile_queries( + sender.id.identity, + auth, + &subscription.query_strings, + num_queries, + subscription_metrics, + )?; let (mut tx, tx_offset, trapped) = self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?; From dc989ebae29762c47557321fd63a220c1b1f2ebe Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 7 May 2026 10:58:34 -0700 Subject: [PATCH 8/8] changes from review --- crates/core/src/host/module_host.rs | 273 ++++++++++++++-------------- 1 file changed, 132 insertions(+), 141 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index c0f86bb3f64..553c7ff685c 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -37,7 +37,7 @@ use derive_more::From; use futures::lock::Mutex; use indexmap::IndexSet; use itertools::Itertools; -use prometheus::{Histogram, IntGauge}; +use prometheus::{Histogram, HistogramTimer, IntGauge}; use scopeguard::ScopeGuard; use smallvec::SmallVec; use spacetimedb_auth::identity::ConnectionAuthCtx; @@ -356,6 +356,22 @@ enum ModuleHostInner { Js(Box), } +struct CallTimerGuard { + queue_timer: Option, + queue_length_gauge: IntGauge, +} + +impl Drop for CallTimerGuard { + fn drop(&mut self) { + self.queue_length_gauge.dec(); + if let Some(queue_timer) = self.queue_timer.take() { + queue_timer.stop_and_record(); + } + } +} + +type WasmtimeInstanceManager = ModuleInstanceManager>; + /// Wasm uses one instance manager for reducers/views and one for procedures. /// /// Both managers share the compiled module via `Arc` so either manager can @@ -364,8 +380,83 @@ enum ModuleHostInner { /// enqueue first instead of racing to allocate multiple main instances. struct WasmtimeModuleHost { executor: SingleCoreExecutor, - main_instance: Arc>>, - procedure_instances: Arc>>, + main_instance: Arc, + procedure_instances: Arc, +} + +impl WasmtimeModuleHost { + fn instance_manager(&self, kind: InstanceKind) -> Arc { + match kind { + InstanceKind::Main => self.main_instance.clone(), + InstanceKind::Procedure => self.procedure_instances.clone(), + } + } + + fn enqueue_job(&self, label: &str, on_panic: Arc, timer_guard: CallTimerGuard, f: F) + where + F: AsyncFnOnce() + Send + 'static, + { + let label = label.to_owned(); + self.executor.enqueue_job(async move || { + scopeguard::defer_on_unwind!({ + log::warn!("wasm job {label} panicked"); + on_panic(); + }); + + drop(timer_guard); + f().await; + }); + } + + fn enqueue_with_instance( + &self, + label: &str, + on_panic: Arc, + timer_guard: CallTimerGuard, + instance_kind: InstanceKind, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) + Send + 'static, + ) where + A: Send + 'static, + { + let instance_manager = self.instance_manager(instance_kind); + self.enqueue_job(label, on_panic, timer_guard, async move || { + instance_manager + .with_instance(async move |mut inst| { + wasm(arg, &mut inst).await; + ((), inst) + }) + .await; + }); + } + + async fn enqueue_with_procedure_instance( + &self, + label: &str, + on_panic: Arc, + timer_guard: CallTimerGuard, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) + Send + 'static, + ) where + A: Send + 'static, + { + let instance_manager = self.instance_manager(InstanceKind::Procedure); + let ModuleInstanceLease { instance, slot } = instance_manager.get_instance().await; + let label = label.to_owned(); + self.executor.enqueue_job(async move || { + scopeguard::defer_on_unwind!({ + log::warn!("wasm procedure {label} panicked"); + on_panic(); + }); + + let mut inst = instance; + drop(timer_guard); + wasm(arg, &mut inst).await; + instance_manager + .return_instance(ModuleInstanceLease { instance: inst, slot }) + .await; + }); + } } struct V8ModuleHost { @@ -1603,7 +1694,7 @@ impl ModuleHost { } } - fn start_call_timer(&self, label: &str) -> ScopeGuard<(), impl FnOnce(()) + use<>> { + fn start_call_timer(&self, label: &str) -> CallTimerGuard { // Record the time until our function starts running. let queue_timer = WORKER_METRICS .reducer_wait_time @@ -1620,22 +1711,9 @@ impl ModuleHost { .with_label_values(&self.info.database_identity) .observe(queue_length as f64); } - // Ensure that we always decrement the gauge. - scopeguard::guard((), move |_| { - // Decrement the queue length gauge when we're done. - // This is done in a defer so that it happens even if the reducer call panics. - queue_length_gauge.dec(); - queue_timer.stop_and_record(); - }) - } - - fn wasm_instance_manager( - host: &WasmtimeModuleHost, - kind: InstanceKind, - ) -> Arc>> { - match kind { - InstanceKind::Main => host.main_instance.clone(), - InstanceKind::Procedure => host.procedure_instances.clone(), + CallTimerGuard { + queue_timer: Some(queue_timer), + queue_length_gauge, } } @@ -1665,7 +1743,7 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { let executor = host.executor.clone(); - let instance_manager = Self::wasm_instance_manager(host, InstanceKind::Main); + let instance_manager = host.instance_manager(InstanceKind::Main); executor .run_job(async move || { drop(timer_guard); @@ -1713,7 +1791,7 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { let executor = host.executor.clone(); - let instance_manager = Self::wasm_instance_manager(host, InstanceKind::Procedure); + let instance_manager = host.instance_manager(InstanceKind::Procedure); instance_manager .with_instance(async move |mut inst| { executor @@ -1738,110 +1816,13 @@ impl ModuleHost { }) } - fn enqueue_wasm_job(&self, kind: &str, label: &str, f: F) -> Result<(), NoSuchModule> - where - F: AsyncFnOnce() + Send + 'static, - { - self.guard_closed()?; - - let timer_guard = self.start_call_timer(label); - let kind = kind.to_owned(); - let label = label.to_owned(); - let on_panic = self.on_panic.clone(); - - match &*self.inner { - ModuleHostInner::Wasm(host) => { - let executor = host.executor.clone(); - executor.enqueue_job(async move || { - scopeguard::defer_on_unwind!({ - log::warn!("{kind} {label} panicked"); - on_panic(); - }); - - drop(timer_guard); - f().await; - }); - Ok(()) - } - ModuleHostInner::Js(_) => unreachable!("enqueue_wasm_job should only be used for wasm"), - } - } - - fn enqueue_wasm_instance( - &self, - kind: &str, - label: &str, - instance_kind: InstanceKind, - arg: A, - wasm: impl AsyncFnOnce(A, &mut ModuleInstance) + Send + 'static, - ) -> Result<(), NoSuchModule> - where - A: Send + 'static, - { - match &*self.inner { - ModuleHostInner::Wasm(host) => { - let instance_manager = Self::wasm_instance_manager(host, instance_kind); - self.enqueue_wasm_job(kind, label, async move || { - instance_manager - .with_instance(async move |mut inst| { - wasm(arg, &mut inst).await; - ((), inst) - }) - .await; - }) - } - ModuleHostInner::Js(_) => unreachable!("enqueue_wasm_instance should only be used for wasm"), - } - } - - async fn enqueue_wasm_procedure_instance( - &self, - kind: &str, - label: &str, - arg: A, - wasm: impl AsyncFnOnce(A, &mut ModuleInstance) + Send + 'static, - ) -> Result<(), NoSuchModule> - where - A: Send + 'static, - { - self.guard_closed()?; - - let timer_guard = self.start_call_timer(label); - let kind = kind.to_owned(); - let label = label.to_owned(); - let on_panic = self.on_panic.clone(); - - match &*self.inner { - ModuleHostInner::Wasm(host) => { - let executor = host.executor.clone(); - let instance_manager = Self::wasm_instance_manager(host, InstanceKind::Procedure); - let ModuleInstanceLease { instance, slot } = instance_manager.get_instance().await; - executor.enqueue_job(async move || { - scopeguard::defer_on_unwind!({ - log::warn!("{kind} {label} panicked"); - on_panic(); - }); - - let mut inst = instance; - drop(timer_guard); - wasm(arg, &mut inst).await; - instance_manager - .return_instance(ModuleInstanceLease { instance: inst, slot }) - .await; - }); - Ok(()) - } - ModuleHostInner::Js(_) => unreachable!("enqueue_wasm_procedure_instance should only be used for wasm"), - } - } - async fn enqueue_main_operation( &self, panic_kind: &'static str, label: &str, arg: A, js: impl FnOnce(A, JsMainInstance, JsFatalHook) -> JsFut, - wasm: impl FnOnce(&Self, A) -> Result<(), NoSuchModule>, + wasm: impl FnOnce(A, &WasmtimeModuleHost, JsFatalHook, CallTimerGuard) -> Result<(), NoSuchModule>, ) -> Result<(), NoSuchModule> where A: Send + 'static, @@ -1863,7 +1844,12 @@ impl ModuleHost { .await; Ok(()) } - ModuleHostInner::Wasm(_) => wasm(self, arg), + ModuleHostInner::Wasm(wasm_host) => { + self.guard_closed()?; + let timer_guard = self.start_call_timer(label); + let on_panic = self.on_panic.clone(); + wasm(arg, wasm_host, on_panic, timer_guard) + } } } @@ -1880,11 +1866,12 @@ impl ModuleHost { label, (cmd, metric), |(cmd, metric), inst, on_panic| async move { inst.enqueue_call_view(cmd, metric, on_panic).await }, - move |module, (cmd, metric)| { + move |(cmd, metric), wasm_host, on_panic, timer_guard| { let info = info.clone(); - module.enqueue_wasm_instance( - "main-instance operation", + wasm_host.enqueue_with_instance( label, + on_panic, + timer_guard, InstanceKind::Main, cmd, async move |cmd, inst| { @@ -1894,7 +1881,8 @@ impl ModuleHost { log::warn!("websocket view operation failed: {err:#}"); } }, - ) + ); + Ok(()) }, ) .await @@ -2308,16 +2296,18 @@ impl ModuleHost { reducer_name, call.params, |params, inst, on_panic| async move { inst.enqueue_reducer(params, on_panic).await }, - move |module, params| { - module.enqueue_wasm_instance( - "main-instance operation", + move |params, wasm_host, on_panic, timer_guard| { + wasm_host.enqueue_with_instance( &reducer_label, + on_panic, + timer_guard, InstanceKind::Main, params, async |params, inst| { let _ = inst.call_reducer(params); }, - ) + ); + Ok(()) }, ) .await @@ -2527,14 +2517,17 @@ impl ModuleHost { }); Ok(()) } - ModuleHostInner::Wasm(_) => { + ModuleHostInner::Wasm(wasm_host) => { let module = self.clone(); let procedure_name_for_job = procedure_name.clone(); let target_for_job = target.clone(); - match self - .enqueue_wasm_procedure_instance( - "pooled operation", + let timer_guard = self.start_call_timer(&procedure_name); + let on_panic = self.on_panic.clone(); + wasm_host + .enqueue_with_procedure_instance( &procedure_name, + on_panic, + timer_guard, params, async move |params, inst| { let ret = inst.call_procedure(params).await; @@ -2548,11 +2541,8 @@ impl ModuleHost { } }, ) - .await - { - Ok(()) => Ok(()), - Err(err) => self.send_procedure_error(&procedure_name, timer, target, err.into()), - } + .await; + Ok(()) } } } @@ -3079,15 +3069,16 @@ impl ModuleHost { label, request, |request, inst, on_panic| async move { inst.enqueue_one_off_query(request, on_panic).await }, - move |module, request| { + move |request, wasm_host, on_panic, timer_guard| { let info = info.clone(); - module.enqueue_wasm_job("module-thread operation", label, async move || { + wasm_host.enqueue_job(label, on_panic, timer_guard, async move || { let result = request.run(); Self::record_one_off_query_round_trip(&info, timer); if let Err(err) = result { log::warn!("One-off query failed: {err:#}"); } - }) + }); + Ok(()) }, ) .await?;