diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f1d73043da..878a4689d2 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -352,6 +352,28 @@ jobs: ./target/release/turso_whopper ${{ matrix.profile.args }} || exit 1 done + concurrent-simulator-windows: + runs-on: windows-latest + timeout-minutes: 30 + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + with: + prefix-key: "v1-rust" + cache-on-failure: true + - uses: "./.github/shared/setup-sccache" + - name: Run Windows concurrent simulator multiprocess regressions + run: cargo test -q -p turso_whopper --test regression_tests + - name: Run Windows concurrent simulator cross-platform regression + run: cargo test -q -p turso_whopper --test regression_tests_cross_platform + - name: Run Windows CLI multiprocess VFS regression + run: cargo test -q -p turso_cli input::tests::experimental_win_iocp_backend_is_available_for_path_databases -- --exact + - name: Run Windows core multiprocess regressions + run: cargo test -q -p turso_core --features "fs experimental_win_iocp" multiprocess_tests:: -- --nocapture + - name: Run Windows multiprocess startup smoke + run: cargo run -q -p turso_whopper -- --mode fast --multiprocess --connections-per-process 1 --processes 2 --max-steps 0 + test-sqlite: runs-on: blacksmith-4vcpu-ubuntu-2404 timeout-minutes: 30 diff --git a/cli/Cargo.toml b/cli/Cargo.toml index da52e9bf72..8bb47aaadf 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -65,7 +65,7 @@ bytes = "1.11.1" itertools.workspace = true [features] -default = ["io_uring", "mimalloc"] +default = ["io_uring", "mimalloc", "experimental_win_iocp"] io_uring = ["turso_core/io_uring"] experimental_win_iocp = ["turso_core/experimental_win_iocp"] tracing_release = ["turso_core/tracing_release"] diff --git a/cli/app.rs b/cli/app.rs index 796a4e7956..1c6fd440f6 100644 --- a/cli/app.rs +++ b/cli/app.rs @@ -93,7 +93,10 @@ pub struct Opts { pub experimental_attach: bool, #[clap(long, help = "Enable experimental generated columns feature")] pub experimental_generated_columns: bool, - #[clap(long, help = "Enable experimental multiprocess WAL coordination")] + #[clap( + long, + help = "Enable experimental multiprocess WAL coordination (on Windows, use --vfs experimental_win_iocp)" + )] pub experimental_multiprocess_wal: bool, #[cfg(feature = "mvcc_repl")] #[clap(long, help = "Start MVCC concurrent transaction harness")] diff --git a/cli/input.rs b/cli/input.rs index f1ee8b44a1..d9849bcee6 100644 --- a/cli/input.rs +++ b/cli/input.rs @@ -192,6 +192,18 @@ pub fn get_io(db_location: DbLocation, io_choice: &str) -> anyhow::Result { target: &'a Arc, // accumulate raw bytes to support non-utf8 BLOB types diff --git a/core/Cargo.toml b/core/Cargo.toml index 831668ba2b..f8f4aa91c1 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -45,6 +45,9 @@ optimizer_params = ["serde", "dep:serde_json"] [target.'cfg(target_os = "windows")'.dependencies] windows-sys = { version = "0.61.2", features = [ "Win32_System_IO", + "Win32_System_Memory", + "Win32_System_SystemInformation", + "Win32_System_Threading", "Win32_Storage_FileSystem", "Win32_Security", "Win32_System_Diagnostics_Debug", diff --git a/core/build.rs b/core/build.rs index 0d164dbf76..d304999c45 100644 --- a/core/build.rs +++ b/core/build.rs @@ -6,6 +6,7 @@ use std::{env, fs}; fn main() { cfg_aliases! { injected_yields: { any(feature = "test_helper", feature = "simulator") }, + host_shared_wal: { all(any(unix, target_os = "windows"), target_pointer_width = "64") }, } // Ensure Cargo reruns when this script or the reproducibility seed changes. diff --git a/core/connection.rs b/core/connection.rs index 42c1aac5d2..ce87c2bd08 100644 --- a/core/connection.rs +++ b/core/connection.rs @@ -1596,7 +1596,7 @@ impl Connection { .block(|| pager.checkpoint(mode, SyncMode::Full, true)) } - #[cfg(all(feature = "simulator", target_pointer_width = "64", unix))] + #[cfg(all(feature = "simulator", target_pointer_width = "64", host_shared_wal))] pub fn install_unpublished_backfill_proof_for_testing( &self, upper_bound_inclusive: u64, diff --git a/core/io/win_iocp.rs b/core/io/win_iocp.rs index a2f01bc6be..6f8b111289 100644 --- a/core/io/win_iocp.rs +++ b/core/io/win_iocp.rs @@ -61,8 +61,10 @@ use tracing::{debug, instrument, trace, warn, Level}; use super::FileSyncType; use crate::io::completions::CompletionInner; +use crate::io::{SharedWalLockKind, SharedWalMappedRegion}; use windows_sys::Win32::Foundation::{ CloseHandle, GetLastError, LocalFree, ERROR_HANDLE_EOF, ERROR_IO_PENDING, + ERROR_LOCK_VIOLATION, ERROR_NOT_LOCKED, ERROR_OPERATION_ABORTED, FALSE, GENERIC_READ, GENERIC_WRITE, HANDLE, INVALID_HANDLE_VALUE, TRUE, WAIT_TIMEOUT, }; @@ -77,14 +79,19 @@ use windows_sys::Win32::System::IO::{ CancelIoEx, CreateIoCompletionPort, GetOverlappedResult, GetQueuedCompletionStatus, OVERLAPPED, OVERLAPPED_0, OVERLAPPED_0_0, }; +use windows_sys::Win32::System::Memory::{ + CreateFileMappingW, MapViewOfFile, UnmapViewOfFile, FILE_MAP_READ, FILE_MAP_WRITE, + PAGE_READWRITE, +}; +use windows_sys::Win32::System::SystemInformation::{GetSystemInfo, SYSTEM_INFO}; +use windows_sys::Win32::System::Threading::CreateEventW; // Constants const CACHING_CAPACITY: usize = 128; //TODO: enable this or remove when direct IO stabilized const ENABLE_DIRECT_IO: bool = false; -//TODO: enable this or remove when windows locking stabilized -const ENABLE_LOCK_ON_OPEN: bool = false; +const ENABLE_LOCK_ON_OPEN: bool = true; // Types @@ -106,8 +113,6 @@ enum GetIOCPPacketError { enum IoKind { Write(Arc), Read, - Lock, - Unlock, Unknown, } @@ -207,6 +212,48 @@ pub struct WindowsIOCP { instance: Arc, } +struct WindowsSharedWalMapping { + mapping_handle: HANDLE, + view_ptr: NonNull, + ptr: NonNull, + len: usize, +} + +unsafe impl Send for WindowsSharedWalMapping {} +unsafe impl Sync for WindowsSharedWalMapping {} + +impl SharedWalMappedRegion for WindowsSharedWalMapping { + fn ptr(&self) -> NonNull { + self.ptr + } + + fn len(&self) -> usize { + self.len + } +} + +impl Drop for WindowsSharedWalMapping { + fn drop(&mut self) { + unsafe { + if UnmapViewOfFile(windows_sys::Win32::System::Memory::MEMORY_MAPPED_VIEW_ADDRESS { + Value: self.view_ptr.as_ptr().cast(), + }) == FALSE + { + tracing::error!( + "UnmapViewOfFile failed for shared WAL coordination region: {}", + io::Error::last_os_error() + ); + } + if CloseHandle(self.mapping_handle) == FALSE { + tracing::error!( + "CloseHandle failed for shared WAL mapping: {}", + io::Error::last_os_error() + ); + } + } + } +} + impl WindowsIOCP { pub fn new() -> Result { debug!("Using IO backend 'win_iocp'"); @@ -227,6 +274,10 @@ unsafe impl Sync for WindowsIOCP {} crate::assert::assert_send_sync!(WindowsIOCP); impl IO for WindowsIOCP { + fn supports_shared_wal_coordination(&self) -> bool { + true + } + #[instrument(skip_all, level = Level::TRACE)] fn open_file( &self, @@ -282,7 +333,7 @@ impl IO for WindowsIOCP { ); if file_handle == INVALID_HANDLE_VALUE { - return Err(get_generic_limboerror_from_last_os_err()); + return Err(io_error(io::Error::last_os_error(), "open")); }; let windows_file = Arc::new(WindowsFile { @@ -294,12 +345,15 @@ impl IO for WindowsIOCP { let result = CreateIoCompletionPort(file_handle, self.instance.iocp_queue_handle, 0, 0); if result.is_null() { - return Err(get_generic_limboerror_from_last_os_err()); + return Err(io_error( + io::Error::last_os_error(), + "associate file with iocp", + )); }; if ENABLE_LOCK_ON_OPEN - && (std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() - || !open_flags.contains(OpenFlags::ReadOnly)) + && !open_flags.intersects(OpenFlags::ReadOnly | OpenFlags::NoLock) + && std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { windows_file.lock_file(true)?; } @@ -480,13 +534,16 @@ impl InnerWindowsIOCP { fn forget_io_packet(&self, mut io_packet: IoPacket) -> Option<(Option, IoKind)> { trace!("forget packet and completion"); - if let Some(completion) = io_packet.completion.as_ref() { - // this may be removed earlier in cancel - // so this operation is optional if the record exists - let _ = self.pop_io_context_from_completion(completion); - }; + if let Some(completion) = io_packet.completion.as_ref().cloned() { + // Prefer the tracked packet when it still exists: the raw IOCP alias is + // an extra Arc clone and cannot be recycled in place until we drop it. + if let Some(context) = self.pop_io_context_from_completion(&completion) { + drop(io_packet); + io_packet = context.io_packet; + } + } - let internals = Arc::get_mut(&mut io_packet).unwrap(); + let internals = Arc::get_mut(&mut io_packet)?; let completion = internals.completion.take(); let kind = mem::replace(&mut internals.kind, IoKind::Unknown); @@ -629,35 +686,143 @@ pub struct WindowsFile { } impl WindowsFile { - fn sync_iocp_operation( + fn overlapped_for_position(position: u64) -> OVERLAPPED { + unsafe { + let mut overlapped: OVERLAPPED = mem::zeroed(); + overlapped.Anonymous = OVERLAPPED_0 { + Anonymous: OVERLAPPED_0_0 { + Offset: position as u32, + OffsetHigh: (position >> 32) as u32, + }, + }; + overlapped + } + } + + fn suppressed_iocp_overlapped_for_position(position: u64) -> Result<(OVERLAPPED, HANDLE)> { + let event = unsafe { CreateEventW(ptr::null(), TRUE, FALSE, ptr::null()) }; + if event.is_null() { + return Err(get_generic_limboerror_from_last_os_err()); + } + + let mut overlapped = Self::overlapped_for_position(position); + overlapped.hEvent = ((event as usize) | 1) as HANDLE; + Ok((overlapped, event)) + } + + fn lock_range( &self, - kind: IoKind, - io_function: impl Fn(*mut OVERLAPPED) -> BOOL, - ) -> Result<(), u32> { - let mut bytes = 0; - let packet_io = self.parent_io.build_io_packet(None, 0, kind); - let overlapped_ptr = Arc::into_raw(packet_io) as *mut OVERLAPPED; + offset: u64, + len: u64, + exclusive: bool, + fail_immediately: bool, + ) -> Result { + let (mut overlapped, event) = Self::suppressed_iocp_overlapped_for_position(offset)?; + let flags = (if exclusive { + LOCKFILE_EXCLUSIVE_LOCK + } else { + 0 + }) | if fail_immediately { + LOCKFILE_FAIL_IMMEDIATELY + } else { + 0 + }; + let low = len as u32; + let high = (len >> 32) as u32; + let result = (|| { + unsafe { + if LockFileEx(self.file_handle, flags, 0, low, high, &raw mut overlapped) == TRUE { + return Ok(true); + } + } + + let initial_error = unsafe { GetLastError() }; + if initial_error == ERROR_LOCK_VIOLATION { + return Ok(false); + } + if initial_error != ERROR_IO_PENDING { + return Err(LimboError::LockingError( + io::Error::from_raw_os_error(initial_error as i32).to_string(), + )); + } + + let mut bytes = 0; + unsafe { + if GetOverlappedResult( + self.file_handle, + &raw mut overlapped, + &raw mut bytes, + TRUE, + ) == TRUE + { + return Ok(true); + } + } + + let completion_error = unsafe { GetLastError() }; + if completion_error == ERROR_LOCK_VIOLATION { + return Ok(false); + } + Err(LimboError::LockingError( + io::Error::from_raw_os_error(completion_error as i32).to_string(), + )) + })(); + unsafe { - let result = io_function(overlapped_ptr); - let error = GetLastError(); - // the io function fails - if result == FALSE && error != ERROR_IO_PENDING { - let restored_io_packet = Arc::from_raw(overlapped_ptr as *mut IoOverlappedPacket); - let _ = self.parent_io.forget_io_packet(restored_io_packet); - return Err(GetLastError()); + CloseHandle(event); + } + + result + } + + fn unlock_range(&self, offset: u64, len: u64) -> Result<()> { + let (mut overlapped, event) = Self::suppressed_iocp_overlapped_for_position(offset)?; + let low = len as u32; + let high = (len >> 32) as u32; + let result = (|| { + unsafe { + if UnlockFileEx(self.file_handle, 0, low, high, &raw mut overlapped) == TRUE { + return Ok(()); + } } - // if it is async wait for it - if result == FALSE - // && error == ERROR_IO_PENDING (just to remember) - && GetOverlappedResult(self.file_handle, overlapped_ptr, &raw mut bytes, TRUE) - == FALSE - { - return Err(GetLastError()); + let initial_error = unsafe { GetLastError() }; + if initial_error == ERROR_NOT_LOCKED { + return Ok(()); + } + if initial_error != ERROR_IO_PENDING { + return Err(LimboError::LockingError( + io::Error::from_raw_os_error(initial_error as i32).to_string(), + )); } + + let mut bytes = 0; + unsafe { + if GetOverlappedResult( + self.file_handle, + &raw mut overlapped, + &raw mut bytes, + TRUE, + ) == TRUE + { + return Ok(()); + } + } + + let completion_error = unsafe { GetLastError() }; + if completion_error == ERROR_NOT_LOCKED { + return Ok(()); + } + Err(LimboError::LockingError( + io::Error::from_raw_os_error(completion_error as i32).to_string(), + )) + })(); + + unsafe { + CloseHandle(event); } - Ok(()) + result } fn async_iocp_operation( @@ -684,7 +849,9 @@ impl WindowsFile { } unsafe { - if io_function(overlapped_ptr) == FALSE && GetLastError() != ERROR_IO_PENDING { + let result = io_function(overlapped_ptr); + let error = GetLastError(); + if result == FALSE && error != ERROR_IO_PENDING { let io_packet = Arc::from_raw(overlapped_ptr as *mut IoOverlappedPacket); let _ = self.parent_io.forget_io_packet(io_packet); return Err(get_generic_limboerror_from_last_os_err()); @@ -706,38 +873,20 @@ impl File for WindowsFile { self.file_handle.addr() ); - let locking_flags = if exclusive_access { - LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY - } else { - LOCKFILE_FAIL_IMMEDIATELY - }; - - self.sync_iocp_operation(IoKind::Lock, |overlapped| unsafe { - LockFileEx( - self.file_handle, - locking_flags, - 0, - u32::MAX, - u32::MAX, - overlapped, - ) - }) - .map_err(|err| { - let error = io::Error::from_raw_os_error(err as i32); - LimboError::LockingError(error.to_string()) - }) + match self.lock_range(0, u64::MAX, exclusive_access, true) { + Ok(true) => Ok(()), + Ok(false) => Err(LimboError::LockingError( + "The process cannot access the file because another process has locked a portion of the file." + .into(), + )), + Err(err) => Err(err), + } } #[instrument(err, skip_all, level = Level::TRACE)] fn unlock_file(&self) -> Result<()> { trace!("Unlocking file {:08X}", self.file_handle.addr()); - self.sync_iocp_operation(IoKind::Unlock, |overlapped| unsafe { - UnlockFileEx(self.file_handle, 0, u32::MAX, u32::MAX, overlapped) - }) - .map_err(|err| { - let error = io::Error::from_raw_os_error(err as i32); - LimboError::LockingError(error.to_string()) - }) + self.unlock_range(0, u64::MAX) } #[instrument(skip(self, completion), level = Level::TRACE)] @@ -862,6 +1011,122 @@ impl File for WindowsFile { filesize.try_into().map_err(get_limboerror_from_std_error) } + + fn shared_wal_lock_byte( + &self, + offset: u64, + exclusive: bool, + _kind: SharedWalLockKind, + ) -> Result<()> { + match self.lock_range(offset, 1, exclusive, false) { + Ok(true) => Ok(()), + Ok(false) => Err(LimboError::LockingError( + "Failed locking shared WAL coordination file. File is locked by another process" + .into(), + )), + Err(err) => Err(err), + } + } + + fn shared_wal_try_lock_byte( + &self, + offset: u64, + exclusive: bool, + _kind: SharedWalLockKind, + ) -> Result { + self.lock_range(offset, 1, exclusive, true) + } + + fn shared_wal_unlock_byte(&self, offset: u64, _kind: SharedWalLockKind) -> Result<()> { + self.unlock_range(offset, 1) + } + + fn shared_wal_set_len(&self, len: u64) -> Result<()> { + unsafe { + let file_info = FILE_END_OF_FILE_INFO { + EndOfFile: len.try_into().map_err(get_limboerror_from_std_error)?, + }; + + if SetFileInformationByHandle( + self.file_handle, + FileEndOfFileInfo, + (&raw const file_info).cast(), + size_of_val(&file_info) + .try_into() + .map_err(get_limboerror_from_std_error)?, + ) == FALSE + { + return Err(io_error( + io::Error::last_os_error(), + "resize shared WAL coordination file", + )); + } + } + + Ok(()) + } + + fn shared_wal_map(&self, offset: u64, len: usize) -> Result> { + if len == 0 { + return Err(LimboError::InternalError( + "cannot map shared WAL coordination region with zero length".into(), + )); + } + + let mut system_info = unsafe { mem::zeroed::() }; + unsafe { GetSystemInfo(&raw mut system_info) }; + let granularity = u64::from(system_info.dwAllocationGranularity); + if granularity == 0 { + return Err(LimboError::LockingError( + "failed to determine shared WAL mapping allocation granularity".into(), + )); + } + + let aligned_offset = offset / granularity * granularity; + let prefix_len = (offset - aligned_offset) as usize; + let view_len = prefix_len + .checked_add(len) + .ok_or_else(|| LimboError::InternalError("shared WAL map length overflow".into()))?; + + let mapping_handle = + unsafe { CreateFileMappingW(self.file_handle, ptr::null(), PAGE_READWRITE, 0, 0, ptr::null()) }; + if mapping_handle.is_null() { + return Err(io_error( + io::Error::last_os_error(), + "create shared WAL file mapping", + )); + } + + let offset_high = (aligned_offset >> 32) as u32; + let offset_low = aligned_offset as u32; + let mapped_ptr = unsafe { + MapViewOfFile( + mapping_handle, + FILE_MAP_READ | FILE_MAP_WRITE, + offset_high, + offset_low, + view_len, + ) + }; + if mapped_ptr.Value.is_null() { + unsafe { + CloseHandle(mapping_handle); + } + return Err(io_error(io::Error::last_os_error(), "map shared WAL coordination file")); + } + + let view_ptr = NonNull::new(mapped_ptr.Value.cast::()) + .expect("MapViewOfFile returned null for shared WAL map"); + let ptr = NonNull::new(unsafe { view_ptr.as_ptr().add(prefix_len) }) + .expect("mapped base plus prefix_len returned null"); + + Ok(Box::new(WindowsSharedWalMapping { + mapping_handle, + view_ptr, + ptr, + len, + })) + } } impl Drop for WindowsFile { @@ -954,7 +1219,7 @@ mod tests { buffer.as_mut_slice().copy_from_slice(write); - let _ = file.pwrite(0, buffer, comp).unwrap(); + drop(file.pwrite(0, buffer, comp).unwrap()); drop(iocp); drop(file); } diff --git a/core/lib.rs b/core/lib.rs index 829c83afef..9800202759 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -7,7 +7,12 @@ pub mod index_method; pub mod io; #[cfg(all(feature = "json", any(feature = "fuzz", feature = "bench")))] pub mod json; -#[cfg(all(test, feature = "fs", unix, target_pointer_width = "64"))] +#[cfg(all( + test, + feature = "fs", + host_shared_wal, + any(not(target_os = "windows"), feature = "experimental_win_iocp") +))] mod multiprocess_tests; pub mod mvcc; #[cfg(any(feature = "fuzz", feature = "bench"))] @@ -91,9 +96,9 @@ use arc_swap::{ArcSwap, ArcSwapOption}; use core::str; use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet}; use schema::Schema; -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] use std::path::Path; -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] use std::sync::OnceLock; use std::{ fmt::{self}, @@ -102,7 +107,7 @@ use std::{ }; #[cfg(feature = "fs")] use storage::database::DatabaseFile; -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] use storage::shared_wal_coordination::MappedSharedWalCoordination; use storage::{page_cache::PageCache, sqlite3_ondisk::PageSize}; use tracing::{instrument, Level}; @@ -465,7 +470,7 @@ pub struct Database { /// (commit, sync, checkpoint thresholds, etc.) instead of the built-in storage. durable_storage: Option>, shared_wal: Arc>, - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] shared_wal_coordination: OnceLock>, init_lock: Arc>, open_flags: OpenFlags, @@ -602,7 +607,7 @@ impl Database { }))), _shared_page_cache: shared_page_cache, shared_wal, - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] shared_wal_coordination: OnceLock::new(), db_file, builtin_syms: parking_lot::RwLock::new(syms), @@ -662,7 +667,7 @@ impl Database { } #[cfg(feature = "fs")] - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] fn effective_open_flags_for_path( io: &Arc, path: &str, @@ -697,7 +702,7 @@ impl Database { } #[cfg(feature = "fs")] - #[cfg(not(all(unix, target_pointer_width = "64")))] + #[cfg(not(host_shared_wal))] fn effective_open_flags_for_path( _io: &Arc, _path: &str, @@ -711,7 +716,7 @@ impl Database { } #[cfg(feature = "fs")] - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] fn reject_live_multiprocess_wal_for_legacy_open( io: &Arc, path: &str, @@ -746,7 +751,7 @@ impl Database { } #[cfg(feature = "fs")] - #[cfg(not(all(unix, target_pointer_width = "64")))] + #[cfg(not(host_shared_wal))] fn reject_live_multiprocess_wal_for_legacy_open( _io: &Arc, _path: &str, @@ -756,7 +761,7 @@ impl Database { } #[cfg(feature = "fs")] - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] fn reject_live_legacy_wal_for_multiprocess_open( io: &Arc, path: &str, @@ -778,7 +783,7 @@ impl Database { } #[cfg(feature = "fs")] - #[cfg(not(all(unix, target_pointer_width = "64")))] + #[cfg(not(host_shared_wal))] fn reject_live_legacy_wal_for_multiprocess_open( _io: &Arc, _path: &str, @@ -1560,13 +1565,13 @@ impl Database { // Always Open shared wal and set it in the Database and Pager. // MVCC currently requires a WAL open to function - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] let shared_authority = self.open_shared_wal_coordination_for_open()?; - #[cfg(not(all(unix, target_pointer_width = "64")))] + #[cfg(not(host_shared_wal))] let shared_authority: Option<()> = None; let shared_wal = { - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] { if let Some(authority) = shared_authority.as_ref() { // The no-scan open path only works if the shared frame index @@ -1587,7 +1592,7 @@ impl Database { WalFileShared::open_shared_if_exists(&self.io, &self.wal_path, flags)? } } - #[cfg(not(all(unix, target_pointer_width = "64")))] + #[cfg(not(host_shared_wal))] { WalFileShared::open_shared_if_exists(&self.io, &self.wal_path, flags)? } @@ -1937,7 +1942,60 @@ impl Database { Ok(Self::filesystem_type_allows_shared_wal(fs_type)) } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(all(target_os = "windows", target_pointer_width = "64"))] + fn path_allows_shared_wal_coordination(path: &Path) -> Result { + use std::iter::once; + use std::os::windows::ffi::OsStrExt; + use windows_sys::Win32::Storage::FileSystem::{GetDriveTypeW, GetVolumePathNameW}; + + const DRIVE_REMOVABLE: u32 = 2; + const DRIVE_FIXED: u32 = 3; + const DRIVE_REMOTE: u32 = 4; + const DRIVE_RAMDISK: u32 = 6; + + let probe_path = if path.exists() { + path.to_path_buf() + } else { + path.parent() + .filter(|parent| !parent.as_os_str().is_empty()) + .unwrap_or_else(|| Path::new(".")) + .to_path_buf() + }; + let probe_path = if probe_path.is_absolute() { + probe_path + } else { + std::env::current_dir() + .map_err(|err| io_error(err, "resolve shared WAL coordination path"))? + .join(probe_path) + }; + let probe_path_wide: Vec = probe_path + .as_os_str() + .encode_wide() + .chain(once(0)) + .collect(); + let mut volume_path = vec![0u16; 261]; + let result = unsafe { + GetVolumePathNameW( + probe_path_wide.as_ptr(), + volume_path.as_mut_ptr(), + volume_path.len() as u32, + ) + }; + if result == 0 { + return Err(io_error( + std::io::Error::last_os_error(), + "GetVolumePathNameW shared WAL coordination path", + )); + } + + let drive_type = unsafe { GetDriveTypeW(volume_path.as_ptr()) }; + Ok( + matches!(drive_type, DRIVE_FIXED | DRIVE_RAMDISK | DRIVE_REMOVABLE) + && drive_type != DRIVE_REMOTE, + ) + } + + #[cfg(host_shared_wal)] pub(crate) fn shared_wal_coordination( &self, ) -> Result>> { @@ -1949,19 +2007,19 @@ impl Database { self.open_shared_wal_coordination_inner() } - #[cfg(not(all(unix, target_pointer_width = "64")))] + #[cfg(not(host_shared_wal))] pub(crate) fn shared_wal_coordination(&self) -> Result> { Ok(None) } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] pub(crate) fn open_shared_wal_coordination_for_open( &self, ) -> Result>> { self.open_shared_wal_coordination_inner() } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] fn open_shared_wal_coordination_inner( &self, ) -> Result>> { @@ -2033,7 +2091,7 @@ impl Database { let reopened_checkpoint_seq = shared_wal.metadata.wal_header.lock().checkpoint_seq; drop(shared_wal); - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] let (coordination_open_mode, sanitized_backfill_proof_on_open) = if let Some(authority) = self.shared_wal_coordination()? { let mode = match authority.open_mode() { @@ -2048,7 +2106,7 @@ impl Database { } else { (None, false) }; - #[cfg(not(all(unix, target_pointer_width = "64")))] + #[cfg(not(host_shared_wal))] let (coordination_open_mode, sanitized_backfill_proof_on_open) = (None, false); Ok(SharedWalOpenTelemetry { @@ -2063,7 +2121,7 @@ impl Database { #[cfg(feature = "simulator")] pub fn shared_wal_snapshot_for_testing(&self) -> Result> { - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] if let Some(authority) = self.shared_wal_coordination()? { let snapshot = authority.snapshot(); return Ok(Some(SharedWalTestingSnapshot { @@ -2079,7 +2137,7 @@ impl Database { #[cfg(feature = "simulator")] pub fn shared_wal_find_frame_for_testing(&self, page_id: u64) -> Result> { - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] if let Some(authority) = self.shared_wal_coordination()? { let snapshot = authority.snapshot(); return Ok(authority.find_frame(page_id, 0, snapshot.max_frame, None)); @@ -2113,7 +2171,7 @@ impl Database { #[cfg(feature = "simulator")] pub fn clear_backfill_proof_for_testing(&self) -> Result<()> { - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] { let authority = self.shared_wal_coordination()?.ok_or_else(|| { LimboError::InternalError("shared WAL authority is unavailable".into()) @@ -2122,7 +2180,7 @@ impl Database { Ok(()) } - #[cfg(not(all(unix, target_pointer_width = "64")))] + #[cfg(not(host_shared_wal))] { Err(LimboError::InternalError( "shared WAL authority is unavailable on this platform".into(), @@ -2135,7 +2193,7 @@ impl Database { last_checksum_and_max_frame: ((u32, u32), u64), buffer_pool: Arc, ) -> Result> { - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] if let Some(authority) = self.shared_wal_coordination()? { return Ok(Arc::new(WalFile::new_with_shared_coordination( self.io.clone(), diff --git a/core/multiprocess_tests.rs b/core/multiprocess_tests.rs index 10a15f1b85..e912b07fce 100644 --- a/core/multiprocess_tests.rs +++ b/core/multiprocess_tests.rs @@ -1,7 +1,11 @@ use super::*; -use std::os::unix::process::ExitStatusExt; use std::process::Command; +#[cfg(all(target_os = "windows", feature = "experimental_win_iocp"))] +use crate::WindowsIOCP; +#[cfg(unix)] +use std::os::unix::process::ExitStatusExt; + const MULTIPROCESS_SHM_INSERT_CHILD_TEST: &str = "multiprocess_tests::multiprocess_shm_insert_child_process"; const MULTIPROCESS_SHM_COUNT_CHILD_TEST: &str = @@ -16,6 +20,18 @@ const MULTIPROCESS_SHM_EXPECT_OPEN_FAILURE_CHILD_TEST: &str = "multiprocess_tests::multiprocess_shm_expect_open_failure_child_process"; const DEFAULT_LOCKED_DB_CHILD_TEST: &str = "multiprocess_tests::default_locked_db_child_process"; +fn multiprocess_test_io() -> Arc { + #[cfg(all(target_os = "windows", feature = "experimental_win_iocp"))] + { + Arc::new(WindowsIOCP::new().unwrap()) + } + + #[cfg(not(all(target_os = "windows", feature = "experimental_win_iocp")))] + { + Arc::new(PlatformIO::new().unwrap()) + } +} + fn count_test_rows(conn: &Arc) -> i64 { let mut stmt = conn.prepare("select count(*) from test").unwrap(); let mut count = 0; @@ -185,7 +201,7 @@ fn shared_wal_coordination_rejects_remote_filesystem_magic_values() { } #[test] -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] fn shared_wal_coordination_path_probe_accepts_nonexistent_relative_paths() { let result = Database::path_allows_shared_wal_coordination(std::path::Path::new( "nonexistent-relative-multiprocess-wal.db", @@ -201,7 +217,7 @@ fn database_open_without_experimental_multiprocess_wal_uses_in_process_backend() let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("coordination-default-off.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = Database::open_file(io, db_path_str).unwrap(); let last_checksum_and_max_frame = db.shared_wal.read().last_checksum_and_max_frame(); @@ -221,7 +237,7 @@ fn database_open_without_experimental_multiprocess_wal_rejects_second_process() let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("coordination-default-locked.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let _db = Database::open_file(io, db_path_str).unwrap(); let current_exe = std::env::current_exe().unwrap(); @@ -262,7 +278,7 @@ fn readonly_open_with_experimental_multiprocess_wal_allows_missing_coordination_ let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("readonly-multiprocess-no-tshm.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); { let db = Database::open_file(io.clone(), db_path_str).unwrap(); @@ -283,7 +299,7 @@ fn database_open_with_experimental_multiprocess_wal_rejects_second_default_proce .path() .join("coordination-multiprocess-parent-default-child.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let _db = open_multiprocess_db(io, db_path_str).unwrap(); let current_exe = std::env::current_exe().unwrap(); @@ -309,7 +325,7 @@ fn database_open_without_experimental_multiprocess_wal_rejects_second_multiproce .path() .join("coordination-default-parent-multiprocess-child.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let _db = Database::open_file(io, db_path_str).unwrap(); let current_exe = std::env::current_exe().unwrap(); @@ -333,7 +349,7 @@ fn database_open_selects_shm_wal_backend() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("coordination.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path_str).unwrap(); let last_checksum_and_max_frame = db.shared_wal.read().last_checksum_and_max_frame(); @@ -352,7 +368,7 @@ fn database_open_rebuilds_from_disk_scan_when_exclusive_shm_snapshot_is_stale() let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("coordination-stale-reopen.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db_a = open_multiprocess_db(io.clone(), db_path_str).unwrap(); let conn_a = db_a.connect().unwrap(); @@ -402,7 +418,7 @@ fn database_open_reuses_trusted_tshm_snapshot_without_disk_scan_when_no_backfill let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("coordination-trusted-tail-reopen.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io.clone(), db_path_str).unwrap(); let conn = db.connect().unwrap(); @@ -469,7 +485,7 @@ fn database_open_rebuilds_from_disk_scan_after_partial_checkpoint_without_backfi .path() .join("coordination-partial-checkpoint-reopen-no-proof.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io.clone(), db_path_str).unwrap(); let conn = db.connect().unwrap(); @@ -547,7 +563,7 @@ fn database_open_rebuilds_from_disk_scan_after_wal_append_invalidates_backfill_p .path() .join("coordination-partial-checkpoint-reopen-stale-proof.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io.clone(), db_path_str).unwrap(); let conn = db.connect().unwrap(); @@ -619,7 +635,7 @@ fn database_open_rebuilds_from_disk_scan_after_db_header_mismatch_invalidates_ba .path() .join("coordination-partial-checkpoint-reopen-db-header-mismatch.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io.clone(), db_path_str).unwrap(); let conn = db.connect().unwrap(); @@ -692,7 +708,7 @@ fn default_locked_db_child_process() { return; }; - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let err = Database::open_file(io, db_path.to_str().unwrap()) .expect_err("default non-multiprocess open should stay DB-file locked across processes"); assert!( @@ -707,7 +723,7 @@ fn multiprocess_shm_expect_open_failure_child_process() { return; }; - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let err = open_multiprocess_db(io, db_path.to_str().unwrap()) .expect_err("multiprocess open should fail when a legacy opener already owns the DB"); assert!( @@ -722,7 +738,7 @@ fn multiprocess_shm_insert_child_process() { return; }; - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path.to_str().unwrap()).unwrap(); let last_checksum_and_max_frame = db.shared_wal.read().last_checksum_and_max_frame(); let wal = db @@ -743,7 +759,7 @@ fn multiprocess_shm_insert_and_close_child_process() { return; }; - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path.to_str().unwrap()).unwrap(); let last_checksum_and_max_frame = db.shared_wal.read().last_checksum_and_max_frame(); let wal = db @@ -769,7 +785,7 @@ fn multiprocess_shm_count_child_process() { .parse::() .unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path.to_str().unwrap()).unwrap(); let last_checksum_and_max_frame = db.shared_wal.read().last_checksum_and_max_frame(); let wal = db @@ -797,7 +813,7 @@ fn multiprocess_shm_hold_read_tx_child_process() { let readonly = std::env::var_os("TURSO_MULTIPROCESS_READONLY").is_some(); let expect_disk_scan = std::env::var_os("TURSO_MULTIPROCESS_EXPECT_DISK_SCAN").is_some(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = if readonly { open_multiprocess_db_with_flags(io, db_path.to_str().unwrap(), OpenFlags::ReadOnly).unwrap() } else { @@ -835,7 +851,7 @@ fn multiprocess_shm_schema_child_process() { return; }; - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path.to_str().unwrap()).unwrap(); let last_checksum_and_max_frame = db.shared_wal.read().last_checksum_and_max_frame(); let wal = db @@ -857,7 +873,7 @@ fn subprocess_database_open_selects_multiprocess_shm_backend() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("coordination-subprocess.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path_str).unwrap(); let last_checksum_and_max_frame = db.shared_wal.read().last_checksum_and_max_frame(); let wal = db @@ -909,7 +925,7 @@ fn subprocess_child_close_skips_shutdown_checkpoint_in_multiprocess_wal_mode() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("close-skip-shutdown-checkpoint.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path_str).unwrap(); let conn = db.connect().unwrap(); @@ -958,7 +974,7 @@ fn subprocess_database_open_survives_truncate_rewrite_cycles() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("coordination-truncate-rewrite.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path_str).unwrap(); let conn = db.connect().unwrap(); conn.execute("create table test(id integer primary key, value text)") @@ -1021,7 +1037,7 @@ fn subprocess_database_open_peer_refreshes_remote_schema_without_reopen() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("coordination-peer-schema-refresh.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path_str).unwrap(); let conn = db.connect().unwrap(); conn.execute("create table t(value integer, next_value integer)") @@ -1084,7 +1100,7 @@ fn subprocess_database_open_parent_directly_uses_child_created_table() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("coordination-direct-child-table-use.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path_str).unwrap(); let conn = db.connect().unwrap(); conn.execute("create table t(value integer)").unwrap(); @@ -1121,7 +1137,7 @@ fn subprocess_database_open_parent_execute_uses_child_created_table() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("coordination-execute-child-table-use.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path_str).unwrap(); let conn = db.connect().unwrap(); conn.execute("create table t(value integer)").unwrap(); @@ -1160,7 +1176,7 @@ fn subprocess_readonly_child_reader_blocks_restart_and_truncate_checkpoints() { let ready_file = dir.path().join("child-ready"); let release_file = dir.path().join("child-release"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path_str).unwrap(); let conn = db.connect().unwrap(); @@ -1254,7 +1270,7 @@ fn subprocess_readonly_disk_scan_child_reader_stays_in_shared_coordination() { let ready_file = dir.path().join("child-ready"); let release_file = dir.path().join("child-release"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io.clone(), db_path_str).unwrap(); let conn = db.connect().unwrap(); @@ -1361,7 +1377,7 @@ fn subprocess_readonly_disk_scan_child_reader_stays_in_shared_coordination() { "disk-scan read-only child should exit cleanly after releasing its read transaction: {child_status:?}" ); - let reopened = open_multiprocess_db(Arc::new(PlatformIO::new().unwrap()), db_path_str).unwrap(); + let reopened = open_multiprocess_db(multiprocess_test_io(), db_path_str).unwrap(); let reopened_conn = reopened.connect().unwrap(); let checkpoint = reopened_conn .checkpoint(CheckpointMode::Truncate { @@ -1386,7 +1402,7 @@ fn subprocess_database_truncate_checkpoint_reclaims_dead_child_reader_slot() { let ready_file = dir.path().join("child-ready"); let release_file = dir.path().join("child-release"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io, db_path_str).unwrap(); let conn = db.connect().unwrap(); conn.execute("create table test(id integer primary key, value text)") @@ -1436,11 +1452,17 @@ fn subprocess_database_truncate_checkpoint_reclaims_dead_child_reader_slot() { child.kill().unwrap(); let child_status = child.wait().unwrap(); + #[cfg(unix)] assert_eq!( child_status.signal(), Some(libc::SIGKILL), "expected killed child process to exit via SIGKILL, got {child_status:?}" ); + #[cfg(windows)] + assert!( + !child_status.success(), + "expected killed child process to exit unsuccessfully on Windows, got {child_status:?}" + ); let checkpoint = conn .checkpoint(CheckpointMode::Truncate { @@ -1468,7 +1490,7 @@ fn subprocess_database_truncate_checkpoint_reclaims_dead_child_reader_slot() { manager.clear(); drop(manager); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let reopened = open_multiprocess_db(io, db_path_str).unwrap(); let reopened_conn = reopened.connect().unwrap(); assert_eq!( @@ -1485,7 +1507,7 @@ fn database_open_reopen_with_live_child_reader_does_not_clobber_authority() { let ready_file = dir.path().join("child-ready"); let release_file = dir.path().join("child-release"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db = open_multiprocess_db(io.clone(), db_path_str).unwrap(); let conn = db.connect().unwrap(); @@ -1584,7 +1606,7 @@ fn database_open_rebuilds_from_disk_scan_when_shared_frame_index_overflowed() { .path() .join("coordination-overflowed-frame-index-reopen.db"); let db_path_str = db_path.to_str().unwrap(); - let io: Arc = Arc::new(PlatformIO::new().unwrap()); + let io: Arc = multiprocess_test_io(); let db_a = open_multiprocess_db(io.clone(), db_path_str).unwrap(); let conn_a = db_a.connect().unwrap(); diff --git a/core/storage/mod.rs b/core/storage/mod.rs index d128bb2ea6..6a8195c891 100644 --- a/core/storage/mod.rs +++ b/core/storage/mod.rs @@ -19,7 +19,7 @@ pub(crate) mod journal_mode; pub(crate) mod page_cache; #[allow(clippy::arc_with_non_send_sync)] pub(crate) mod pager; -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] #[allow(dead_code)] pub(crate) mod shared_wal_coordination; #[allow(dead_code)] diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 42aa7c74b2..47d92549ec 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -5504,7 +5504,7 @@ mod ptrmap_tests { } } -#[cfg(all(test, feature = "fs", unix, target_pointer_width = "64"))] +#[cfg(all(test, feature = "fs", host_shared_wal))] mod checkpoint_phase_tests { use super::*; use crate::io::{PlatformIO, IO}; diff --git a/core/storage/shared_wal_coordination.rs b/core/storage/shared_wal_coordination.rs index 161329fdd3..217cf39607 100644 --- a/core/storage/shared_wal_coordination.rs +++ b/core/storage/shared_wal_coordination.rs @@ -335,12 +335,16 @@ fn next_shared_owner_instance_id() -> u32 { } } -/// Check whether a process is still running via `kill(pid, 0)`. -/// Signal 0 is a no-op probe: the kernel checks permissions and existence -/// without delivering a signal. Returns true if the process exists (or we -/// lack permission to signal it — EPERM means it's alive but owned by -/// another user). False-negatives are impossible; false-positives can occur -/// if the PID has been recycled by an unrelated process. +/// Check whether a process is still running. +/// +/// On Unix this uses `kill(pid, 0)`, which is a no-op probe that checks +/// permissions and existence without delivering a signal. On Windows this +/// uses `OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION)` and +/// `GetExitCodeProcess`. +/// +/// False-negatives are avoided; false-positives are still possible if a PID +/// has been recycled by an unrelated process. +#[cfg(unix)] fn pid_is_alive(pid: u32) -> bool { if pid == 0 || pid > i32::MAX as u32 { return false; @@ -352,6 +356,32 @@ fn pid_is_alive(pid: u32) -> bool { std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM) } +#[cfg(target_os = "windows")] +fn pid_is_alive(pid: u32) -> bool { + use windows_sys::Win32::Foundation::{CloseHandle, ERROR_ACCESS_DENIED, FALSE, STILL_ACTIVE}; + use windows_sys::Win32::System::Threading::{ + GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION, + }; + + if pid == 0 { + return false; + } + + let process = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, FALSE, pid) }; + if process.is_null() { + return std::io::Error::last_os_error().raw_os_error() == Some(ERROR_ACCESS_DENIED as i32); + } + + let mut exit_code = 0u32; + let result = unsafe { GetExitCodeProcess(process, &raw mut exit_code) }; + let close_result = unsafe { CloseHandle(process) }; + turso_assert!( + close_result != FALSE, + "failed to close Windows process handle" + ); + result != FALSE && exit_code == STILL_ACTIVE as u32 +} + /// Serializable snapshot of the authoritative shared WAL coordination state. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct SharedWalCoordinationHeader { @@ -2901,7 +2931,9 @@ impl MappedSharedWalCoordination { #[cfg(test)] mod tests { use super::*; - use crate::io::{PlatformIO, IO}; + #[cfg(not(all(target_os = "windows", feature = "experimental_win_iocp")))] + use crate::io::PlatformIO; + use crate::io::IO; use std::sync::Arc; fn colliding_page_ids(count: usize) -> Vec { @@ -2919,7 +2951,15 @@ mod tests { } fn test_shared_wal_io() -> Arc { - Arc::new(PlatformIO::new().unwrap()) + #[cfg(all(target_os = "windows", feature = "experimental_win_iocp"))] + { + Arc::new(crate::WindowsIOCP::new().unwrap()) + } + + #[cfg(not(all(target_os = "windows", feature = "experimental_win_iocp")))] + { + Arc::new(PlatformIO::new().unwrap()) + } } fn create_mapping(path: &Path) -> MappedSharedWalCoordination { @@ -2936,16 +2976,31 @@ mod tests { } fn exited_child_pid() -> u32 { - let child = unsafe { libc::fork() }; - assert!(child >= 0, "fork failed"); - if child == 0 { - unsafe { libc::_exit(0) }; + #[cfg(unix)] + { + let child = unsafe { libc::fork() }; + assert!(child >= 0, "fork failed"); + if child == 0 { + unsafe { libc::_exit(0) }; + } + let mut status: libc::c_int = 0; + let waited = unsafe { libc::waitpid(child, &mut status, 0) }; + assert_eq!(waited, child, "waitpid failed"); + assert!(libc::WIFEXITED(status), "child did not exit cleanly"); + return child as u32; + } + + #[cfg(windows)] + { + let mut child = std::process::Command::new("cmd") + .args(["/C", "exit", "0"]) + .spawn() + .expect("spawn exited child"); + let pid = child.id(); + let status = child.wait().expect("wait exited child"); + assert!(status.success(), "child did not exit cleanly"); + pid } - let mut status: libc::c_int = 0; - let waited = unsafe { libc::waitpid(child, &mut status, 0) }; - assert_eq!(waited, child, "waitpid failed"); - assert!(libc::WIFEXITED(status), "child did not exit cleanly"); - child as u32 } #[test] diff --git a/core/storage/wal.rs b/core/storage/wal.rs index f823c6add6..0942336e40 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -28,9 +28,9 @@ use crate::io::clock::MonotonicInstant; use crate::io::CompletionGroup; use crate::io::{File, IO}; use crate::storage::database::{DatabaseStorage, EncryptionOrChecksum}; -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] use crate::storage::shared_wal_coordination::SharedWalCoordinationOpenMode; -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] use crate::storage::shared_wal_coordination::{ MappedSharedWalCoordination, SharedOwnerRecord, SharedReaderSlot, SharedWalCoordinationHeader, }; @@ -108,7 +108,7 @@ impl CheckpointResult { } } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] pub(crate) fn coordination_path_for_wal_path(wal_path: &str) -> String { if let Some(db_path) = wal_path.strip_suffix("-wal") { format!("{db_path}-tshm") @@ -1162,7 +1162,7 @@ impl WalCoordination for InProcessWalCoordination { /// cross-process shared state (reader slots, frame index, snapshot metadata). /// Both are consulted: the fallback serializes same-process connections, the /// authority serializes across processes. -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] #[derive(Debug)] struct ShmWalCoordination { shared: Arc>, @@ -1175,7 +1175,7 @@ struct ShmWalCoordination { owner: SharedOwnerRecord, } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] impl ShmWalCoordination { fn overflow_fallback_covers( &self, @@ -1643,7 +1643,7 @@ impl ShmWalCoordination { } } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] impl WalCoordination for ShmWalCoordination { fn load_snapshot(&self) -> WalSnapshot { let snapshot = self.authority.snapshot(); @@ -2558,7 +2558,7 @@ impl OverflowFallbackCoverage { self.valid = true; } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] pub(crate) fn record_snapshot( &mut self, snapshot: SharedWalCoordinationHeader, @@ -2572,7 +2572,7 @@ impl OverflowFallbackCoverage { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] pub(crate) fn same_generation(&self, snapshot: SharedWalCoordinationHeader) -> bool { self.valid && self.checkpoint_seq == snapshot.checkpoint_seq @@ -2580,7 +2580,7 @@ impl OverflowFallbackCoverage { && self.salt_2 == snapshot.salt_2 } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] pub(crate) fn covers(&self, snapshot: SharedWalCoordinationHeader, max_frame: u64) -> bool { self.same_generation(snapshot) && self.max_frame >= max_frame } @@ -3725,7 +3725,7 @@ impl Wal for WalFile { } impl WalFile { - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] pub(crate) fn new_with_shared_coordination( io: Arc, shared: Arc>, @@ -4378,7 +4378,7 @@ impl WalFile { } } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] fn read_exact_bytes_from_file( io: &Arc, file: &Arc, @@ -4406,7 +4406,7 @@ fn read_exact_bytes_from_file( Ok(Some(read_buf.as_slice()[..len].to_vec())) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] fn read_validated_wal_header_from_file( io: &Arc, file: &Arc, @@ -4443,7 +4443,7 @@ fn read_validated_wal_header_from_file( Ok(Some(header)) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] fn wal_header_matches_authority_snapshot( wal_header: WalHeader, snapshot: SharedWalCoordinationHeader, @@ -4494,7 +4494,7 @@ fn read_database_identity_from_storage( )?)) } -#[cfg(all(test, unix, target_pointer_width = "64"))] +#[cfg(all(test, host_shared_wal))] fn read_database_identity_from_file_path( io: &Arc, wal_path: &str, @@ -4517,14 +4517,14 @@ fn read_database_identity_from_file_path( Ok(Some(database_identity_from_header_bytes(&bytes)?)) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum AuthoritySnapshotValidation { Trusted, RebuildFromDisk(AuthoritySnapshotRebuildReason), } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum AuthoritySnapshotRebuildReason { WalHeaderUnreadable, @@ -4537,7 +4537,7 @@ enum AuthoritySnapshotRebuildReason { LastFrameChecksumMismatch, } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(host_shared_wal)] fn classify_authority_snapshot_against_wal( io: &Arc, file: &Arc, @@ -4631,7 +4631,7 @@ impl WalFileShared { ) } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] pub(crate) fn open_shared_from_authority_if_exists( io: &Arc, path: &str, @@ -4908,7 +4908,7 @@ impl WalFileShared { #[cfg(test)] pub mod test { - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] use super::{ classify_authority_snapshot_against_wal, AuthoritySnapshotRebuildReason, AuthoritySnapshotValidation, ShmWalCoordination, @@ -4917,7 +4917,7 @@ pub mod test { InProcessWalCoordination, ReadGuardKind, Wal, WalCommitState, WalConnectionState, WalCoordination, WalFile, WalSnapshot, }; - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] use crate::storage::shared_wal_coordination::{ MappedSharedWalCoordination, SharedWalCoordinationHeader, SharedWalCoordinationOpenMode, }; @@ -5242,7 +5242,7 @@ pub mod test { InProcessWalCoordination::new(shared.clone()) } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] fn make_test_shm_coordination( shared: &Arc>, path: &std::path::Path, @@ -5254,7 +5254,7 @@ pub mod test { (authority, coordination) } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] fn active_shared_reader_slot_count(authority: &MappedSharedWalCoordination) -> usize { let reader_slot_count = authority.snapshot().reader_slot_count; (0..reader_slot_count) @@ -5262,7 +5262,7 @@ pub mod test { .count() } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] fn write_test_wal_with_single_commit_frame( io: &Arc, wal_path: &std::path::Path, @@ -5343,7 +5343,7 @@ pub mod test { } } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] fn open_test_db_file_for_wal( io: &Arc, wal_path: &std::path::Path, @@ -5356,7 +5356,7 @@ pub mod test { } #[test] - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] fn test_read_frame_keeps_epoch_from_issue_time() { let dir = tempfile::tempdir().unwrap(); let wal_path = dir.path().join("epoch-race.db-wal"); @@ -5652,7 +5652,7 @@ pub mod test { coordination.end_write_tx(); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_uses_shared_authority() { let dir = tempfile::tempdir().unwrap(); @@ -5741,7 +5741,7 @@ pub mod test { assert!(shm_path.exists()); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_many_same_snapshot_readers_share_one_published_slot() { let dir = tempfile::tempdir().unwrap(); @@ -5798,7 +5798,7 @@ pub mod test { assert_eq!(active_shared_reader_slot_count(&authority), 0); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_uses_one_published_slot_per_active_snapshot_generation() { let dir = tempfile::tempdir().unwrap(); @@ -5882,7 +5882,7 @@ pub mod test { assert_eq!(active_shared_reader_slot_count(&authority), 0); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_shared_index_grows_past_old_fixed_limit() { const OLD_FIXED_LIMIT: u64 = 65_536; @@ -5939,7 +5939,7 @@ pub mod test { .contains(&(7, OLD_FIXED_LIMIT + 2))); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_restart_uses_authority_snapshot() { let dir = tempfile::tempdir().unwrap(); @@ -6050,7 +6050,7 @@ pub mod test { assert!(shared.runtime.frame_cache.lock().is_empty()); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_exclusive_reopen_reuses_persisted_authority() { let dir = tempfile::tempdir().unwrap(); @@ -6120,7 +6120,7 @@ pub mod test { assert_eq!(reopened_authority.min_active_reader_frame(), None); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_open_shared_from_authority_reuses_trusted_snapshot_after_exclusive_reopen() { let dir = tempfile::tempdir().unwrap(); @@ -6173,7 +6173,7 @@ pub mod test { assert!(shared.runtime.frame_cache.lock().is_empty()); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_live_overflow_returns_busy_without_runtime_disk_scan() { let dir = tempfile::tempdir().unwrap(); @@ -6227,7 +6227,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_open_shared_from_authority_exclusive_rebuilds_positive_snapshot_from_disk() { let dir = tempfile::tempdir().unwrap(); @@ -6278,7 +6278,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shared_coordination_open_uses_reconciled_snapshot_for_local_wal_state() { let dir = tempfile::tempdir().unwrap(); @@ -6319,7 +6319,7 @@ pub mod test { assert_eq!(wal.get_last_checksum(), (31, 37)); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_open_shared_from_authority_rebuilds_from_disk_when_snapshot_is_stale() { let dir = tempfile::tempdir().unwrap(); @@ -6369,7 +6369,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_open_shared_from_authority_rebuilt_authority_persists_across_exclusive_reopen() { let dir = tempfile::tempdir().unwrap(); @@ -6430,7 +6430,7 @@ pub mod test { assert_eq!(reopened_coordination.find_frame(7, 0, 1, None), Some(1)); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_open_shared_from_authority_exclusive_disk_scan_does_not_downgrade_newer_zero_frame_generation( ) { @@ -6490,7 +6490,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_open_shared_from_authority_ignores_unpublished_backfill_proof_after_exclusive_reopen() { let dir = tempfile::tempdir().unwrap(); @@ -6536,7 +6536,7 @@ pub mod test { .load(Ordering::Acquire)); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_restart_checkpoint_clears_backfill_proof_and_later_replaces_it() { let (db, path) = get_database(); @@ -6624,7 +6624,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_truncate_checkpoint_clears_backfill_proof_and_later_replaces_it() { let (db, path) = get_database(); @@ -6722,7 +6722,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_classify_authority_snapshot_marks_truncated_wal_for_rebuild() { let dir = tempfile::tempdir().unwrap(); @@ -6752,7 +6752,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_classify_authority_snapshot_marks_corrupt_header_for_rebuild() { let dir = tempfile::tempdir().unwrap(); @@ -6789,7 +6789,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_open_shared_from_authority_keeps_zero_length_wal_uninitialized_after_exclusive_reopen() { @@ -6844,7 +6844,7 @@ pub mod test { .load(Ordering::Acquire)); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_secondary_disk_scan_does_not_reseed_authority_while_writer_active() { let dir = tempfile::tempdir().unwrap(); @@ -6917,7 +6917,7 @@ pub mod test { authority.release_writer(authority.owner_record()); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_disk_scan_matching_authority_keeps_frame_index() { let dir = tempfile::tempdir().unwrap(); @@ -6991,7 +6991,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_disk_scan_matching_snapshot_rebuilds_stale_frame_index() { let dir = tempfile::tempdir().unwrap(); @@ -7070,7 +7070,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_empty_disk_scan_keeps_zero_frame_authority_metadata() { let dir = tempfile::tempdir().unwrap(); @@ -7118,7 +7118,7 @@ pub mod test { assert_eq!(header.salt_2, authoritative.salt_2); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_coordination_empty_disk_scan_does_not_clobber_positive_authority() { let dir = tempfile::tempdir().unwrap(); @@ -7185,7 +7185,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_zero_frame_authority_invalidates_stale_local_initialized_state() { let dir = tempfile::tempdir().unwrap(); @@ -7266,7 +7266,7 @@ pub mod test { ); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_prepare_wal_header_seeds_uninitialized_authority_from_prepared_header() { let dir = tempfile::tempdir().unwrap(); @@ -7299,7 +7299,7 @@ pub mod test { assert_eq!(snapshot.salt_2, prepared.salt_2); } - #[cfg(all(unix, target_pointer_width = "64"))] + #[cfg(host_shared_wal)] #[test] fn test_shm_prepare_wal_header_does_not_clobber_zero_frame_authority_snapshot() { let dir = tempfile::tempdir().unwrap(); diff --git a/docs/manual.md b/docs/manual.md index bdd62bd0cd..3d29fea5e7 100644 --- a/docs/manual.md +++ b/docs/manual.md @@ -156,7 +156,7 @@ The SQL shell supports the following command line options: | `--mcp` | Start a MCP server instead of the interactive shell | | `--experimental-encryption` | Enable experimental encryption at rest feature. **Note:** the feature is not production ready so do not use it for critical data right now. | | `--experimental-views` | Enable experimental views feature. **Note**: the feature is not production ready so do not use it for critical data right now. | -| `--experimental-multiprocess-wal` | Enable experimental multi-process WAL coordination via the `.tshm` sidecar. **Note:** the feature is not production ready so do not use it for critical data right now. | +| `--experimental-multiprocess-wal` | Enable experimental multi-process WAL coordination via the `.tshm` sidecar. Supported on 64-bit Unix, and on 64-bit Windows when used with `--vfs experimental_win_iocp`. **Note:** the feature is not production ready so do not use it for critical data right now. | ## Transactions diff --git a/testing/concurrent-simulator/Cargo.toml b/testing/concurrent-simulator/Cargo.toml index 42a445d731..8deac3b995 100644 --- a/testing/concurrent-simulator/Cargo.toml +++ b/testing/concurrent-simulator/Cargo.toml @@ -22,6 +22,10 @@ path = "main.rs" name = "regression_tests" path = "regression_tests.rs" +[[test]] +name = "regression_tests_cross_platform" +path = "regression_tests_cross_platform.rs" + [dependencies] anyhow.workspace = true clap = { workspace = true, features = ["derive"] } @@ -31,7 +35,7 @@ rand_chacha = { workspace = true } sql_generation = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } -turso_core = { workspace = true, features = ["simulator", "fs"]} +turso_core = { workspace = true, features = ["simulator", "fs", "experimental_win_iocp"]} turso_parser = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } diff --git a/testing/concurrent-simulator/lib.rs b/testing/concurrent-simulator/lib.rs index f2c3671c4e..1e7dabce78 100644 --- a/testing/concurrent-simulator/lib.rs +++ b/testing/concurrent-simulator/lib.rs @@ -13,6 +13,8 @@ use std::collections::{BTreeMap, HashMap}; use std::ops::Bound; use std::sync::Arc; use tracing::{debug, error, trace}; +#[cfg(target_os = "windows")] +use turso_core::WindowsIOCP; use turso_core::{ CipherMode, Connection, Database, DatabaseOpts, EncryptionOpts, IO, OpenFlags, Statement, Value, }; @@ -21,13 +23,13 @@ use turso_parser::ast::{ColumnConstraint, SortOrder}; pub mod chaotic_elle; pub mod elle; mod io; -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] pub mod multiprocess; pub mod operations; pub mod properties; -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] pub mod protocol; -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] pub mod worker; pub mod workloads; mod yield_injection; @@ -38,6 +40,19 @@ use crate::{ properties::Property, workloads::{Workload, WorkloadContext}, }; + +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] +pub fn multiprocess_platform_io() -> anyhow::Result> { + #[cfg(target_os = "windows")] + { + return Ok(Arc::new(WindowsIOCP::new()?)); + } + + #[cfg(unix)] + { + Ok(Arc::new(turso_core::PlatformIO::new()?)) + } +} pub use io::{IOFaultConfig, SimulatorIO}; pub use operations::{FiberState, OpContext, OpResult, Operation, TxMode}; use yield_injection::{SimulatorYieldInjector, fiber_yield_seed}; diff --git a/testing/concurrent-simulator/main.rs b/testing/concurrent-simulator/main.rs index 153ecde8d5..386851fc76 100644 --- a/testing/concurrent-simulator/main.rs +++ b/testing/concurrent-simulator/main.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use clap::{Parser, Subcommand, ValueEnum}; use rand::{Rng, RngCore}; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] use turso_whopper::multiprocess::{MultiprocessOpts, MultiprocessWhopper}; use turso_whopper::{ StepResult, Whopper, WhopperOpts, @@ -106,8 +106,20 @@ fn main() -> anyhow::Result<()> { connections_per_process, }) = &args.subcommand { - #[cfg(all(unix, target_pointer_width = "64"))] - return turso_whopper::worker::run_worker(db_path, *enable_mvcc, *connections_per_process); + #[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] + { + return turso_whopper::worker::run_worker( + db_path, + *enable_mvcc, + *connections_per_process, + ); + } + #[cfg(not(all(any(unix, target_os = "windows"), target_pointer_width = "64")))] + { + return Err(anyhow::anyhow!( + "worker mode is only supported on 64-bit Unix and Windows hosts" + )); + } } init_logger(); @@ -125,14 +137,13 @@ fn main() -> anyhow::Result<()> { println!("seed = {seed}"); if args.multiprocess { - #[cfg(all(unix, target_pointer_width = "64"))] return run_multiprocess(&args, seed); } run_inprocess(&args, seed) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn run_multiprocess(args: &Args, seed: u64) -> anyhow::Result<()> { if args.enable_mvcc { eprintln!("MVCC mode not yet supported with multiprocess mode"); @@ -218,6 +229,13 @@ fn run_multiprocess(args: &Args, seed: u64) -> anyhow::Result<()> { Ok(()) } +#[cfg(not(all(any(unix, target_os = "windows"), target_pointer_width = "64")))] +fn run_multiprocess(_args: &Args, _seed: u64) -> anyhow::Result<()> { + Err(anyhow::anyhow!( + "multiprocess mode is only supported on 64-bit Unix and Windows hosts" + )) +} + fn run_inprocess(args: &Args, seed: u64) -> anyhow::Result<()> { let opts = build_inprocess_opts(args, seed)?; diff --git a/testing/concurrent-simulator/multiprocess.rs b/testing/concurrent-simulator/multiprocess.rs index b46f39af50..0c219aefdb 100644 --- a/testing/concurrent-simulator/multiprocess.rs +++ b/testing/concurrent-simulator/multiprocess.rs @@ -3,14 +3,14 @@ //! Spawns N worker processes, each opening the same on-disk database with real //! filesystem I/O. Reuses the existing workload generation and property //! validation infrastructure while exercising the full multiprocess coordination -//! stack (OFD locks, .tshm shared memory, WAL reader slots, MVCC tx slots). +//! stack (shared byte-range locks, .tshm shared memory, WAL reader slots, MVCC +//! tx slots). use serde::Serialize; use std::fs::{File, create_dir_all}; use std::io::{BufRead, BufReader, BufWriter, Write}; use std::path::{Path, PathBuf}; use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; -use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; use std::sync::mpsc::{self, Receiver, RecvTimeoutError}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -19,7 +19,7 @@ use rand::{Rng, RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; use sql_generation::generation::Opts; use tracing::{debug, error, info}; -use turso_core::{Database, DatabaseOpts, LimboError, OpenFlags, UnixIO}; +use turso_core::{Database, DatabaseOpts, LimboError, OpenFlags}; use crate::chaotic_elle::{ChaoticWorkload, ChaoticWorkloadProfile}; use crate::operations::{FiberState, OpResult, Operation, TxMode}; @@ -28,7 +28,10 @@ use crate::protocol::{ WorkerCommand, WorkerResponse, WorkerSharedWalSnapshot, WorkerStartupTelemetry, }; use crate::workloads::{Workload, WorkloadContext}; -use crate::{SimulatorState, Stats, StepResult, create_initial_indexes, create_initial_schema}; +use crate::{ + SimulatorState, Stats, StepResult, create_initial_indexes, create_initial_schema, + multiprocess_platform_io, +}; /// Configuration for the multiprocess simulator. pub struct MultiprocessOpts { @@ -249,8 +252,8 @@ impl MultiprocessWhopper { .expect("system clock is before UNIX_EPOCH") .as_nanos(); let counter = PATH_COUNTER.fetch_add(1, AtomicOrdering::Relaxed); - let db_path = PathBuf::from(format!( - "/tmp/whopper-mp-{}-{}-{}-{}.db", + let db_path = std::env::temp_dir().join(format!( + "whopper-mp-{}-{}-{}-{}.db", seed, std::process::id(), unique_suffix, @@ -260,7 +263,7 @@ impl MultiprocessWhopper { // Bootstrap schema using real I/O { - let io = Arc::new(UnixIO::new()?); + let io = multiprocess_platform_io()?; let db = Database::open_file_with_flags( io, db_path.to_str().unwrap(), @@ -1255,7 +1258,19 @@ fn send_command(process: &mut WorkerProcessHandle, cmd: &WorkerCommand) -> anyho /// Receive a response from a worker. fn recv_response(process: &mut WorkerProcessHandle) -> anyhow::Result { - recv_response_timeout(process, Duration::from_secs(10)) + recv_response_timeout(process, worker_response_timeout()) +} + +fn worker_response_timeout() -> Duration { + #[cfg(target_os = "windows")] + { + Duration::from_secs(30) + } + + #[cfg(not(target_os = "windows"))] + { + Duration::from_secs(10) + } } fn recv_response_timeout( @@ -1342,6 +1357,22 @@ mod tests { use super::WorkerProcessHandle; + fn placeholder_child_command() -> Command { + #[cfg(windows)] + { + let mut cmd = Command::new("cmd"); + cmd.args(["/C", "timeout", "/T", "60", "/NOBREAK"]); + cmd + } + + #[cfg(not(windows))] + { + let mut cmd = Command::new("sleep"); + cmd.arg("60"); + cmd + } + } + fn history_output_path(label: &str) -> std::path::PathBuf { std::env::temp_dir().join(format!( "turso-whopper-history-{label}-{}-{}.jsonl", @@ -1360,8 +1391,7 @@ mod tests { #[test] fn recv_response_times_out_when_worker_stops_responding() { let (_tx, rx) = mpsc::channel(); - let mut child = Command::new("sleep") - .arg("60") + let mut child = placeholder_child_command() .stdin(Stdio::piped()) .spawn() .expect("failed to spawn placeholder child"); diff --git a/testing/concurrent-simulator/regression_tests.rs b/testing/concurrent-simulator/regression_tests.rs index f9ddff6ce8..8f5f9a06d9 100644 --- a/testing/concurrent-simulator/regression_tests.rs +++ b/testing/concurrent-simulator/regression_tests.rs @@ -1,14 +1,11 @@ -use rand_chacha::ChaCha8Rng; -use rand_chacha::rand_core::SeedableRng; +#![cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] + use std::path::Path; use std::sync::Arc; -use turso_core::{ - Connection, Database, DatabaseOpts, IO, LimboError, OpenFlags, PlatformIO, Statement, -}; -#[cfg(all(unix, target_pointer_width = "64"))] +use turso_core::{Connection, Database, DatabaseOpts, IO, LimboError, OpenFlags}; use turso_whopper::multiprocess::{MultiprocessOpts, MultiprocessWhopper}; use turso_whopper::{ - IOFaultConfig, SimulatorIO, + multiprocess_platform_io, workloads::{ BeginWorkload, CommitWorkload, CreateIndexWorkload, CreateSimpleTableWorkload, DeleteWorkload, DropIndexWorkload, IntegrityCheckWorkload, RollbackWorkload, @@ -16,17 +13,6 @@ use turso_whopper::{ }, }; -/// Helper: run a SQL statement to completion with round-robin IO stepping. -fn run_to_done(stmt: &mut Statement, io: &SimulatorIO) { - loop { - match stmt.step().expect("step") { - turso_core::StepResult::Done => return, - turso_core::StepResult::IO => io.step().expect("io step"), - _ => {} - } - } -} - fn wait_for_file(path: &Path) { let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10); while std::time::Instant::now() < deadline { @@ -38,12 +24,17 @@ fn wait_for_file(path: &Path) { panic!("timed out waiting for {}", path.display()); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] +fn multiprocess_test_io() -> Arc { + multiprocess_platform_io().expect("multiprocess io") +} + +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn multiprocess_wal_db_opts() -> DatabaseOpts { DatabaseOpts::new().with_multiprocess_wal(true) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn flip_db_header_reserved_byte(path: &Path) { use std::io::{Read, Seek, SeekFrom, Write}; @@ -83,7 +74,7 @@ fn flip_db_header_reserved_byte(path: &Path) { file.sync_all().expect("sync db header mutation"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn configure_worker_exe() { unsafe { std::env::set_var( @@ -93,12 +84,12 @@ fn configure_worker_exe() { } } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn create_multiprocess_whopper(max_connections: usize) -> MultiprocessWhopper { create_multiprocess_whopper_with_keep(max_connections, false) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn create_multiprocess_whopper_with_shape( process_count: usize, connections_per_process: usize, @@ -106,7 +97,7 @@ fn create_multiprocess_whopper_with_shape( create_multiprocess_whopper_with_shape_and_keep(process_count, connections_per_process, false) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn create_multiprocess_whopper_with_keep( max_connections: usize, keep_files: bool, @@ -114,7 +105,7 @@ fn create_multiprocess_whopper_with_keep( create_multiprocess_whopper_with_shape_and_keep(max_connections, 1, keep_files) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn create_multiprocess_whopper_with_shape_and_keep( process_count: usize, connections_per_process: usize, @@ -139,7 +130,7 @@ fn create_multiprocess_whopper_with_shape_and_keep( .expect("create multiprocess whopper") } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn populate_blob_test_rows(whopper: &mut MultiprocessWhopper) { whopper .disable_auto_checkpoint_direct(0) @@ -164,7 +155,7 @@ fn populate_blob_test_rows(whopper: &mut MultiprocessWhopper) { .expect("commit should succeed"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn create_partial_checkpoint_state(whopper: &mut MultiprocessWhopper) -> (u64, u64) { populate_blob_test_rows(whopper); @@ -200,7 +191,7 @@ fn create_partial_checkpoint_state(whopper: &mut MultiprocessWhopper) -> (u64, u ) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn count_test_rows(whopper: &mut MultiprocessWhopper, worker_idx: usize) -> i64 { for _ in 0..32 { let rows = whopper @@ -221,7 +212,7 @@ fn count_test_rows(whopper: &mut MultiprocessWhopper, worker_idx: usize) -> i64 panic!("count rows did not stabilize after transient multiprocess errors"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn truncate_checkpoint_until_stable(whopper: &mut MultiprocessWhopper, connection_idx: usize) { for _ in 0..32 { let result = whopper @@ -242,7 +233,7 @@ fn truncate_checkpoint_until_stable(whopper: &mut MultiprocessWhopper, connectio panic!("TRUNCATE checkpoint did not stabilize after transient multiprocess errors"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn assert_integrity_check_ok(whopper: &mut MultiprocessWhopper, worker_idx: usize) { let rows = whopper .execute_sql_direct(worker_idx, "PRAGMA integrity_check") @@ -261,7 +252,7 @@ fn assert_integrity_check_ok(whopper: &mut MultiprocessWhopper, worker_idx: usiz ); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_same_process_sibling_reader_keeps_shared_snapshot_live_until_last_release() { let mut whopper = create_multiprocess_whopper_with_shape(2, 2); @@ -376,7 +367,7 @@ fn multiprocess_same_process_sibling_reader_keeps_shared_snapshot_live_until_las whopper.finalize().expect("finalize multiprocess whopper"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn count_rows_in_table(conn: &Arc, table_name: &str) -> i64 { let mut stmt = conn .prepare(format!("select count(*) from {table_name}")) @@ -390,9 +381,9 @@ fn count_rows_in_table(conn: &Arc, table_name: &str) -> i64 { count } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn read_simple_kv_length(db_path: &Path, table_name: &str, key: &str) -> Option { - let io: Arc = Arc::new(PlatformIO::new().expect("platform io")); + let io: Arc = multiprocess_test_io(); let reopened = Database::open_file_with_flags( io, db_path.to_str().expect("db path utf8"), @@ -437,7 +428,7 @@ fn read_simple_kv_length(db_path: &Path, table_name: &str, key: &str) -> Option< panic!("observer query did not stabilize after transient multiprocess errors"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn probe_optional_int_via_fresh_worker(whopper: &MultiprocessWhopper, sql: String) -> Option { for _ in 0..8 { let (_startup, result) = whopper @@ -464,7 +455,7 @@ fn probe_optional_int_via_fresh_worker(whopper: &MultiprocessWhopper, sql: Strin panic!("fresh-worker probe did not stabilize after transient multiprocess errors"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn probe_table_rootpage_via_fresh_worker( whopper: &MultiprocessWhopper, table_name: &str, @@ -475,7 +466,7 @@ fn probe_table_rootpage_via_fresh_worker( ) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn probe_simple_kv_length_via_fresh_worker( whopper: &MultiprocessWhopper, table_name: &str, @@ -487,130 +478,14 @@ fn probe_simple_kv_length_via_fresh_worker( ) } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] fn advance_seeded_whopper_to_step(whopper: &mut MultiprocessWhopper, step_after_execution: usize) { while whopper.current_step < step_after_execution { whopper.step().expect("seeded whopper step"); } } -/// Regression test for MVCC concurrent commit yield-spin deadlock. -/// -/// Under round-robin cooperative scheduling, when two BEGIN CONCURRENT -/// transactions commit simultaneously, the VDBE must yield (return -/// StepResult::IO) when pager_commit_lock is held by the other connection. -/// -/// Before the fix in core/vdbe/mod.rs, Completion::new_yield() had -/// finished()==true, so the VDBE inner loop retried without ever returning -/// — starving the first connection and deadlocking both. -#[test] -fn test_concurrent_commit_no_yield_spin() { - let io_rng = ChaCha8Rng::seed_from_u64(42); - let fault_config = IOFaultConfig { - cosmic_ray_probability: 0.0, - }; - let io = Arc::new(SimulatorIO::new(false, io_rng, fault_config)); - - let db_path = format!("test-yield-spin-{}.db", std::process::id()); - let db = Database::open_file_with_flags( - io.clone(), - &db_path, - OpenFlags::default(), - DatabaseOpts::new(), - None, - ) - .expect("open db"); - - let setup = db.connect().expect("setup conn"); - setup - .execute("PRAGMA journal_mode = 'mvcc'") - .expect("enable mvcc"); - setup - .execute("CREATE TABLE t(id INTEGER PRIMARY KEY, v TEXT)") - .expect("create table"); - setup.close().expect("close setup"); - - let conn1 = db.connect().expect("conn1"); - let conn2 = db.connect().expect("conn2"); - - // Both connections start concurrent transactions with non-conflicting writes - let mut s = conn1.prepare("BEGIN CONCURRENT").expect("prepare"); - run_to_done(&mut s, &io); - let mut s = conn2.prepare("BEGIN CONCURRENT").expect("prepare"); - run_to_done(&mut s, &io); - - let mut s = conn1 - .prepare("INSERT INTO t VALUES (1, 'a')") - .expect("prepare"); - run_to_done(&mut s, &io); - let mut s = conn2 - .prepare("INSERT INTO t VALUES (2, 'b')") - .expect("prepare"); - run_to_done(&mut s, &io); - - // Commit both using round-robin stepping — the pattern that triggers - // the bug. Each connection gets one step() call, then IO is advanced. - let mut commit1 = conn1.prepare("COMMIT").expect("prepare commit1"); - let mut commit2 = conn2.prepare("COMMIT").expect("prepare commit2"); - - let mut done1 = false; - let mut done2 = false; - let max_steps = 10_000; - - for i in 0..max_steps { - if done1 && done2 { - break; - } - - // Round-robin: step each connection once, then advance IO - if !done1 { - match commit1.step().expect("commit1 step") { - turso_core::StepResult::Done => done1 = true, - turso_core::StepResult::IO => {} - _ => {} - } - } - io.step().expect("io step"); - - if !done2 { - match commit2.step().expect("commit2 step") { - turso_core::StepResult::Done => done2 = true, - turso_core::StepResult::IO => {} - _ => {} - } - } - io.step().expect("io step"); - - assert!( - i < max_steps - 1, - "concurrent commits did not complete within {max_steps} steps — \ - likely stuck in yield-spin loop (done1={done1}, done2={done2})" - ); - } - - assert!(done1, "commit1 should have completed"); - assert!(done2, "commit2 should have completed"); - - // Verify both rows are visible - let verify = db.connect().expect("verify conn"); - let mut stmt = verify.prepare("SELECT COUNT(*) FROM t").expect("prepare"); - let mut count = 0i64; - loop { - match stmt.step().expect("step") { - turso_core::StepResult::Row => { - if let Some(row) = stmt.row() { - count = row.get_values().next().unwrap().as_int().unwrap(); - } - } - turso_core::StepResult::Done => break, - turso_core::StepResult::IO => io.step().expect("io"), - _ => {} - } - } - assert_eq!(count, 2, "both inserts should be visible"); -} - -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_restart_reuses_persisted_tshm_without_disk_scan() { let mut whopper = create_multiprocess_whopper(2); @@ -678,7 +553,7 @@ fn multiprocess_restart_reuses_persisted_tshm_without_disk_scan() { whopper.finalize().expect("finalize multiprocess whopper"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_finalize_after_restart_preserves_simple_kv_rows() { let mut whopper = create_multiprocess_whopper_with_keep(16, true); @@ -729,7 +604,7 @@ fn multiprocess_finalize_after_restart_preserves_simple_kv_rows() { .expect("restart after post-restart insert"); whopper.finalize().expect("finalize multiprocess whopper"); - let io: Arc = Arc::new(PlatformIO::new().expect("platform io")); + let io: Arc = multiprocess_test_io(); let reopened = Database::open_file(io, db_path.to_str().expect("db path utf8")) .expect("reopen finalized database"); let conn = reopened.connect().expect("connect reopened db"); @@ -745,7 +620,7 @@ fn multiprocess_finalize_after_restart_preserves_simple_kv_rows() { let _ = std::fs::remove_file(format!("{db_str}-tshm")); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_seed_5724542806254236599_restart_then_finalize_preserves_key_684() { configure_worker_exe(); @@ -840,7 +715,7 @@ fn multiprocess_seed_5724542806254236599_restart_then_finalize_preserves_key_684 .finalize() .expect("finalize seeded multiprocess whopper"); - let io: Arc = Arc::new(PlatformIO::new().expect("platform io")); + let io: Arc = multiprocess_test_io(); let reopened = Database::open_file(io, db_path.to_str().expect("db path utf8")) .expect("reopen finalized seeded database"); let conn = reopened.connect().expect("connect reopened seeded db"); @@ -865,7 +740,7 @@ fn multiprocess_seed_5724542806254236599_restart_then_finalize_preserves_key_684 let _ = std::fs::remove_file(format!("{db_str}-tshm")); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_seed_5724542806254236599_localizes_key_4994_loss() { configure_worker_exe(); @@ -918,7 +793,7 @@ fn multiprocess_seed_5724542806254236599_localizes_key_4994_loss() { .shared_wal_snapshot_direct(10) .expect("read shared WAL snapshot after insert") ); - let observer_io: Arc = Arc::new(PlatformIO::new().expect("platform io")); + let observer_io: Arc = multiprocess_test_io(); let observer_db = Database::open_file_with_flags( observer_io, db_path.to_str().expect("db path utf8"), @@ -963,7 +838,7 @@ fn multiprocess_seed_5724542806254236599_localizes_key_4994_loss() { ); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_seed_8849519299024683634_localizes_schema_loss_boundary() { configure_worker_exe(); @@ -1020,7 +895,7 @@ fn multiprocess_seed_8849519299024683634_localizes_schema_loss_boundary() { .expect("finalize seeded multiprocess whopper"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_truncate_generation_survives_foreign_first_append_and_restart() { let mut whopper = create_multiprocess_whopper(2); @@ -1107,7 +982,7 @@ fn multiprocess_truncate_generation_survives_foreign_first_append_and_restart() whopper.finalize().expect("finalize multiprocess whopper"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_restart_rebuilds_from_disk_when_partial_checkpoint_publishes_positive_nbackfills() { let mut whopper = create_multiprocess_whopper(1); @@ -1149,7 +1024,7 @@ fn multiprocess_restart_rebuilds_from_disk_when_partial_checkpoint_publishes_pos whopper.finalize().expect("finalize multiprocess whopper"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_restart_rebuilds_from_disk_after_wal_append_invalidates_partial_checkpoint_proof() { let mut whopper = create_multiprocess_whopper(1); @@ -1197,7 +1072,7 @@ fn multiprocess_restart_rebuilds_from_disk_after_wal_append_invalidates_partial_ whopper.finalize().expect("finalize multiprocess whopper"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_restart_rebuilds_from_disk_after_partial_checkpoint_proof_is_cleared() { let mut whopper = create_multiprocess_whopper(1); @@ -1228,7 +1103,7 @@ fn multiprocess_restart_rebuilds_from_disk_after_partial_checkpoint_proof_is_cle whopper.finalize().expect("finalize multiprocess whopper"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_restart_rebuilds_from_disk_after_db_header_mismatch_invalidates_partial_checkpoint_proof() { @@ -1260,7 +1135,7 @@ fn multiprocess_restart_rebuilds_from_disk_after_db_header_mismatch_invalidates_ whopper.finalize().expect("finalize multiprocess whopper"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_restart_stays_conservative_after_unpublished_backfill_proof_install() { let mut whopper = create_multiprocess_whopper(1); @@ -1305,7 +1180,7 @@ fn multiprocess_restart_stays_conservative_after_unpublished_backfill_proof_inst whopper.finalize().expect("finalize multiprocess whopper"); } -#[cfg(all(unix, target_pointer_width = "64"))] +#[cfg(all(any(unix, target_os = "windows"), target_pointer_width = "64"))] #[test] fn multiprocess_restart_rebuilds_from_disk_after_restart_checkpoint_changes_generation() { let mut whopper = create_multiprocess_whopper(1); diff --git a/testing/concurrent-simulator/regression_tests_cross_platform.rs b/testing/concurrent-simulator/regression_tests_cross_platform.rs new file mode 100644 index 0000000000..d77f933e06 --- /dev/null +++ b/testing/concurrent-simulator/regression_tests_cross_platform.rs @@ -0,0 +1,125 @@ +use rand_chacha::ChaCha8Rng; +use rand_chacha::rand_core::SeedableRng; +use std::sync::Arc; +use turso_core::{Database, DatabaseOpts, IO, OpenFlags, Statement}; +use turso_whopper::{IOFaultConfig, SimulatorIO}; + +fn run_to_done(stmt: &mut Statement, io: &SimulatorIO) { + loop { + match stmt.step().expect("step") { + turso_core::StepResult::Done => return, + turso_core::StepResult::IO => io.step().expect("io step"), + _ => {} + } + } +} + +/// Regression test for MVCC concurrent commit yield-spin deadlock. +/// +/// Under round-robin cooperative scheduling, when two BEGIN CONCURRENT +/// transactions commit simultaneously, the VDBE must yield (return +/// StepResult::IO) when pager_commit_lock is held by the other connection. +/// +/// Before the fix in core/vdbe/mod.rs, Completion::new_yield() had +/// finished()==true, so the VDBE inner loop retried without ever returning +/// and both commits could starve. +#[test] +fn test_concurrent_commit_no_yield_spin() { + let io_rng = ChaCha8Rng::seed_from_u64(42); + let fault_config = IOFaultConfig { + cosmic_ray_probability: 0.0, + }; + let io = Arc::new(SimulatorIO::new(false, io_rng, fault_config)); + + let db_path = format!("test-yield-spin-{}.db", std::process::id()); + let db = Database::open_file_with_flags( + io.clone(), + &db_path, + OpenFlags::default(), + DatabaseOpts::new(), + None, + ) + .expect("open db"); + + let setup = db.connect().expect("setup conn"); + setup + .execute("PRAGMA journal_mode = 'mvcc'") + .expect("enable mvcc"); + setup + .execute("CREATE TABLE t(id INTEGER PRIMARY KEY, v TEXT)") + .expect("create table"); + setup.close().expect("close setup"); + + let conn1 = db.connect().expect("conn1"); + let conn2 = db.connect().expect("conn2"); + + let mut stmt = conn1.prepare("BEGIN CONCURRENT").expect("prepare"); + run_to_done(&mut stmt, &io); + let mut stmt = conn2.prepare("BEGIN CONCURRENT").expect("prepare"); + run_to_done(&mut stmt, &io); + + let mut stmt = conn1 + .prepare("INSERT INTO t VALUES (1, 'a')") + .expect("prepare"); + run_to_done(&mut stmt, &io); + let mut stmt = conn2 + .prepare("INSERT INTO t VALUES (2, 'b')") + .expect("prepare"); + run_to_done(&mut stmt, &io); + + let mut commit1 = conn1.prepare("COMMIT").expect("prepare commit1"); + let mut commit2 = conn2.prepare("COMMIT").expect("prepare commit2"); + + let mut done1 = false; + let mut done2 = false; + let max_steps = 10_000; + + for step in 0..max_steps { + if done1 && done2 { + break; + } + + if !done1 { + match commit1.step().expect("commit1 step") { + turso_core::StepResult::Done => done1 = true, + turso_core::StepResult::IO => {} + _ => {} + } + } + io.step().expect("io step"); + + if !done2 { + match commit2.step().expect("commit2 step") { + turso_core::StepResult::Done => done2 = true, + turso_core::StepResult::IO => {} + _ => {} + } + } + io.step().expect("io step"); + + assert!( + step < max_steps - 1, + "concurrent commits did not complete within {max_steps} steps: done1={done1}, done2={done2}" + ); + } + + assert!(done1, "commit1 should have completed"); + assert!(done2, "commit2 should have completed"); + + let verify = db.connect().expect("verify conn"); + let mut stmt = verify.prepare("SELECT COUNT(*) FROM t").expect("prepare"); + let mut count = 0i64; + loop { + match stmt.step().expect("step") { + turso_core::StepResult::Row => { + if let Some(row) = stmt.row() { + count = row.get_values().next().expect("count value").as_int().expect("count int"); + } + } + turso_core::StepResult::Done => break, + turso_core::StepResult::IO => io.step().expect("io"), + _ => {} + } + } + assert_eq!(count, 2, "both inserts should be visible"); +} \ No newline at end of file diff --git a/testing/concurrent-simulator/worker.rs b/testing/concurrent-simulator/worker.rs index f2a76d93ee..8348916794 100644 --- a/testing/concurrent-simulator/worker.rs +++ b/testing/concurrent-simulator/worker.rs @@ -1,8 +1,9 @@ //! Worker process for multiprocess mode. //! -//! Each worker opens the database with real filesystem I/O (UnixIO) and -//! executes SQL commands received from the coordinator over stdin, returning -//! results over stdout using the JSON-line protocol. +//! Each worker opens the database with the host's real multiprocess-capable +//! filesystem I/O backend and executes SQL commands received from the +//! coordinator over stdin, returning results over stdout using the JSON-line +//! protocol. use std::io::{BufRead, BufReader, Write}; use std::sync::Arc; @@ -10,9 +11,10 @@ use std::time::{Duration, Instant}; use turso_core::{ CheckpointMode, Connection, Database, DatabaseOpts, LimboError, OpenFlags, - SharedWalCoordinationOpenTelemetryMode, SharedWalTestingSnapshot, StepResult, UnixIO, Value, + SharedWalCoordinationOpenTelemetryMode, SharedWalTestingSnapshot, StepResult, Value, }; +use crate::multiprocess_platform_io; use crate::protocol::{ self, WorkerCommand, WorkerCoordinationOpenMode, WorkerResponse, WorkerSharedWalSnapshot, WorkerStartupTelemetry, @@ -49,7 +51,7 @@ pub fn run_worker( .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn"))) .try_init(); - let io = Arc::new(UnixIO::new()?); + let io = multiprocess_platform_io()?; let db_opts = DatabaseOpts::new().with_multiprocess_wal(true); let db = Database::open_file_with_flags( io, @@ -362,7 +364,10 @@ fn execute_sql_inner(conn: &Arc, sql: &str) -> WorkerResponse { } Ok(StepResult::IO) => { io_count += 1; - // Real I/O: the operation needs more I/O, keep stepping + stmt.get_pager() + .io + .step() + .expect("worker should advance statement IO"); continue; } Err(turso_core::LimboError::SchemaUpdated) => {