diff --git a/docs/_index.md b/docs/_index.md index d30fc30..4e6d1be 100644 --- a/docs/_index.md +++ b/docs/_index.md @@ -54,7 +54,7 @@ As of May 2026, SQLRite has: - Full-text search + hybrid retrieval (Phase 8 complete): FTS5-style inverted index with BM25 ranking + `fts_match` / `bm25_score` scalar functions + `try_fts_probe` optimizer hook + on-disk persistence with on-demand v4 → v5 file-format bump (8a-8c), a worked hybrid-retrieval example combining BM25 with vector cosine via raw arithmetic (8d), and a `bm25_search` MCP tool symmetric with `vector_search` (8e). See [`docs/fts.md`](fts.md). - SQL surface + DX follow-ups (Phase 9 complete, v0.2.0 → v0.9.1): DDL completeness — `DEFAULT`, `DROP TABLE` / `DROP INDEX`, `ALTER TABLE` (9a); free-list + manual `VACUUM` (9b) + auto-VACUUM (9c); `IS NULL` / `IS NOT NULL` (9d); `GROUP BY` + aggregates + `DISTINCT` + `LIKE` + `IN` (9e); four flavors of `JOIN` — INNER, LEFT, RIGHT, FULL OUTER (9f); prepared statements + `?` parameter binding with a per-connection LRU plan cache (9g); HNSW probe widened to cosine + dot via `WITH (metric = …)` (9h); `PRAGMA` dispatcher with the `auto_vacuum` knob (9i) - Benchmarks against SQLite + DuckDB (Phase 10 complete, SQLR-4 / SQLR-16): twelve-workload bench harness with a pluggable `Driver` trait, criterion-driven, pinned-host runs published. See [`docs/benchmarks.md`](benchmarks.md). -- Phase 11 (concurrent writes via MVCC + `BEGIN CONCURRENT`, SQLR-22) is in flight. **11.1 multi-connection foundation: shipped.** `Connection` is now `Send + Sync` and `Connection::connect()` mints a sibling handle that shares the same backing `Database`. **11.2 logical clock + active-tx registry: shipped on this branch.** New `sqlrite::mvcc` module exposes `MvccClock` (AtomicU64-backed) and `ActiveTxRegistry` (`min_active_begin_ts` for GC). The WAL header bumps v1 → v2 to persist the clock high-water mark; v1 WALs upgrade transparently. Snapshot-isolation reads + `BEGIN CONCURRENT` writes follow in 11.3 / 11.4. Plan: [`docs/concurrent-writes-plan.md`](concurrent-writes-plan.md). +- Phase 11 (concurrent writes via MVCC + `BEGIN CONCURRENT`, SQLR-22) is in flight. **11.1 + 11.2: shipped.** `Connection` is `Send + Sync`; `Connection::connect()` mints sibling handles. `sqlrite::mvcc` exposes `MvccClock` and `ActiveTxRegistry`. WAL header v1 → v2 persists the clock high-water mark. **11.3 `MvStore` + `PRAGMA journal_mode`: shipped on this branch.** New `MvStore` (the in-memory version index keyed by `RowID`, with the snapshot-isolation visibility rule `begin <= T < end`) and the `JournalMode { Wal, Mvcc }` per-database toggle reachable via `PRAGMA journal_mode = mvcc;`. The executor doesn't consult `MvStore` yet — that wiring lands in 11.4 alongside `BEGIN CONCURRENT` writes (read-side and write-side are coupled). Plan: [`docs/concurrent-writes-plan.md`](concurrent-writes-plan.md). - A fully-automated release pipeline that ships every product to its registry on every release with one human action — Rust engine + `sqlrite-ask` + `sqlrite-mcp` to crates.io, Python wheels to PyPI (`sqlrite`), Node.js + WASM to npm (`@joaoh82/sqlrite` + `@joaoh82/sqlrite-wasm`), Go module via `sdk/go/v*` git tag, plus C FFI tarballs, MCP binary tarballs, and unsigned desktop installers as GitHub Release assets (Phase 6 complete) See the [Roadmap](roadmap.md) for the full phase plan. diff --git a/docs/design-decisions.md b/docs/design-decisions.md index c76fdc9..364a864 100644 --- a/docs/design-decisions.md +++ b/docs/design-decisions.md @@ -172,6 +172,24 @@ Decisions are grouped by the engine layer they concern: parser, storage, concurr --- +### 12c. `MvStore` data structure + `JournalMode` toggle land before the read path uses them (Phase 11.3) + +**Decision.** [`MvStore`](../src/mvcc/store.rs) (the in-memory version index) and the [`JournalMode`](../src/mvcc/mod.rs) enum (with the `PRAGMA journal_mode = wal | mvcc` SQL surface) ship together in Phase 11.3, but the executor's read path **does not consult `MvStore`** until 11.4. `Database` grows two new fields (`mvcc_clock: Arc`, `mv_store: MvStore`); both are allocated on every `Database::new`, even when the journal mode is `Wal`. + +**Why ship the data structures before the read-side wiring.** The snapshot-isolation contract requires that the read path see versions the write path produced. In v0 our writes happen via the legacy `Database.tables` mutation followed by a per-page WAL commit; those don't push into `MvStore`. So if 11.3 wired reads through `MvStore`, every read would see an empty store and return wrong rows. Routing reads through `MvStore` only makes sense once the *commit path* is mirroring writes into it — and that's a non-trivial change (the commit timestamp must come from `MvccClock`, the cap rule has to fire on the previous version, schema changes must invalidate the store). 11.4 ships both halves together because they're coupled. 11.3 ships the parts that *aren't* coupled (the data structure + the toggle) so the diffs stay reviewable. + +**Why allocate `mvcc_clock` + `mv_store` even in `Wal` mode.** Two reasons: +- `PRAGMA journal_mode = mvcc;` shouldn't have to lazy-construct anything mid-statement. Constructing `MvccClock` is cheap (one `AtomicU64`); `MvStore` is a `Mutex` (zero-allocation when empty). +- Sibling `Connection::connect` handles can outlive the moment when MVCC was enabled. If the clock were lazy, a sibling connecting before MVCC was first enabled wouldn't observe the same clock as one connecting after — a confusing footgun. Allocating eagerly on `Database::new` means every sibling shares the same `Arc` from day one. + +**Why `Mvcc → Wal` is rejected when the store has committed versions.** The `MvStore` is the only durable record of those versions until 11.5's checkpoint integration drains them into the pager. Switching back to `Wal` mode would either silently strand them (correctness bug) or quietly discard them (data loss). v0 fails the PRAGMA with a typed error and lets the caller decide what to do. When 11.5 lands, "drain to pager then switch" becomes legal. + +**Why per-database, not per-connection.** [`concurrent-writes-plan.md`](concurrent-writes-plan.md) §8 flags this as an open question. Per-connection is more flexible (a maintenance connection can stay in WAL mode while app connections use MVCC); per-database is closer to user expectation and matches SQLite's `PRAGMA journal_mode` semantic. For 11.3 we picked per-database for simplicity — the journal-mode field lives on `Database`, every `Connection::connect` sibling sees the same value. If the per-connection trade-off becomes load-bearing later, the dispatch lives behind `Connection::journal_mode()` already, so callers don't need to change. + +**Plan-doc reference.** [`concurrent-writes-plan.md`](concurrent-writes-plan.md) §4.2 (version index), §6 (SQL surface), §8 (open questions on per-connection vs per-database journal mode). + +--- + ## Query execution ### 13. `NULL`-as-false in `WHERE` clauses diff --git a/docs/roadmap.md b/docs/roadmap.md index 7b45d50..56622b9 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -589,19 +589,25 @@ Lift SQLRite past SQLite's single-writer ceiling with multi-version concurrency `Connection` is a thin handle backed by `Arc>`. Call [`Connection::connect`] to mint a sibling that shares the same engine state — typically one per worker thread. The headline contract: `Connection` is `Send + Sync`, and the engine no longer requires the caller to wrap the public API in their own `Mutex`. Today every operation still serializes through the per-database mutex (and the pager's existing process-level flock), so the behaviour change is *capability*, not throughput; concurrent throughput arrives with `BEGIN CONCURRENT` in 11.4. -### 🚧 Phase 11.2 — Logical clock + active-tx registry *(in progress, plan-doc "Phase 10.2")* +### ✅ Phase 11.2 — Logical clock + active-tx registry *(plan-doc "Phase 10.2")* -New [`sqlrite::mvcc`](../src/mvcc/) module: +[`sqlrite::mvcc`](../src/mvcc/) module: - `MvccClock` — process-wide monotonic `u64` over `AtomicU64`. `tick()` hands out begin- / commit-timestamps; `now()` reads the high-water without advancing it; `observe(value)` advances the clock to `value` if greater (used at WAL replay). - `ActiveTxRegistry` — `Mutex` over in-flight transactions. `register(&clock)` allocates a `TxId`, snapshots `begin_ts`, and returns a RAII `TxHandle`; `min_active_begin_ts()` answers Phase 11.6 GC's "what's still possibly visible" question. - `TxId` newtype + `TxTimestampOrId` tagged union — defined now so 11.4 can plug in without re-litigating the type shape. -WAL format bumps **v1 → v2**: bytes 24..32 of the WAL header (previously reserved-zero) now carry the persisted `clock_high_water` `u64`. v1 WALs open cleanly — those zero bytes read as "clock never advanced" — and the next checkpoint rewrites the header at v2. No offline upgrade step. `Wal::set_clock_high_water` / `Wal::clock_high_water` accessors expose the field; the setter rejects regressions with a typed error. The clock isn't wired into the executor yet (that's 11.3); the persistence + restore plumbing is in place so 11.3 just reads the high-water at open and seeds the in-memory clock. +WAL format bumps **v1 → v2**: bytes 24..32 of the WAL header (previously reserved-zero) now carry the persisted `clock_high_water` `u64`. v1 WALs open cleanly — those zero bytes read as "clock never advanced" — and the next checkpoint rewrites the header at v2. No offline upgrade step. `Wal::set_clock_high_water` / `Wal::clock_high_water` accessors expose the field; the setter rejects regressions with a typed error. -### Phase 11.3 — `MvStore` skeleton + snapshot-isolation reads *(planned)* +### 🚧 Phase 11.3 — `MvStore` skeleton + `PRAGMA journal_mode` opt-in *(in progress, plan-doc "Phase 10.3")* -In-memory version index + `PRAGMA journal_mode = mvcc` opt-in. Lazy-loads versions from the pager on first touch. Writes still go through the legacy path — only reads change. +Standalone version-index data structure + the per-database journal-mode toggle. + +- New [`MvStore`](../src/mvcc/store.rs): `Mutex>>>>`. `RowID = (table, rowid)`; each `RowVersion` carries `begin: TxTimestampOrId`, `end: Option`, `payload: VersionPayload` (`Present(cols)` or `Tombstone`). `MvStore::read(row, begin_ts)` implements the textbook snapshot-isolation visibility rule (`begin <= T < end`). `push_committed` validates monotonicity + caps the previous latest version's `end`; `push_in_flight` adds a placeholder version that's invisible to other readers until commit rewrites its `begin`. +- New [`JournalMode`](../src/mvcc/mod.rs) enum (`Wal` default, `Mvcc`); per-database setting on `Database`. `PRAGMA journal_mode = wal | mvcc;` toggles; `PRAGMA journal_mode;` returns the current value as a single-row, single-column result. `Connection::journal_mode()` reads the value through the public API. Switching `Mvcc → Wal` is rejected if the store carries committed versions (would silently strand them); v0 is intentionally strict. +- `Database` grows `mvcc_clock: Arc` and `mv_store: MvStore` fields, allocated on every `Database::new` so the toggle to MVCC mode doesn't require a re-init step. Both are shared across every `Connection::connect` sibling. + +The executor doesn't consult `MvStore` yet — that wiring lives in 11.4 alongside `BEGIN CONCURRENT` writes (the read-side and write-side are coupled: snapshot reads make sense only once the commit path is mirroring versions into the store). 11.3's contract is *the data structure + the toggle exist and round-trip*; 11.4 will turn the dial. ### Phase 11.4 — `BEGIN CONCURRENT` writes + commit-time validation *(planned, the meat)* diff --git a/docs/supported-sql.md b/docs/supported-sql.md index a63b73e..62e1bce 100644 --- a/docs/supported-sql.md +++ b/docs/supported-sql.md @@ -562,6 +562,23 @@ PRAGMA auto_vacuum = OFF; -- disable; equivalent: NONE, 'OFF', 'NONE' Out-of-range values (anything outside `0.0..=1.0`, `NaN`, `±∞`) and unknown identifiers like `WAL` / `FULL` are rejected with a typed error — the trigger never silently saturates or falls back to a default. The setting is per-`Connection` runtime state — it's not persisted in the file header, so every reopen starts at the default `Some(0.25)`. +### `PRAGMA journal_mode` (Phase 11.3, SQLR-22) + +Selects the per-database concurrency model. `wal` (default) is the legacy WAL-backed pager every pre-Phase-11 build used; `mvcc` opts the database into multi-version concurrency control (Phase 11 — concurrent writes via `BEGIN CONCURRENT`). + +```sql +PRAGMA journal_mode; -- read; renders a single-row "wal" or "mvcc" +PRAGMA journal_mode = mvcc; -- opt into MVCC for this database +PRAGMA journal_mode = wal; -- switch back (rejected if the MvStore + -- already carries committed versions) +``` + +Case-insensitive on both the pragma name and the value. Quoted values (`'mvcc'`) work; numeric values are rejected (the field is enum-shaped). Unknown modes return a typed error and don't disturb the existing setting. + +The setting is **per-database** — every `Connection::connect` sibling sees the same value (the [open-question](concurrent-writes-plan.md) on per-connection vs per-database journal mode resolved to per-database for v0; revisit if a workload requires the per-connection variant). Reachable through the public API as `Connection::journal_mode() -> JournalMode`. + +**What 11.3 changes:** the toggle is observable. The data structures backing MVCC (`MvccClock`, `MvStore`, the active-transaction registry) are allocated and round-trip through `PRAGMA`. **What 11.3 does *not* change yet:** the executor's read path. SELECTs still go through the legacy `tables → pager` path regardless of journal mode. End-to-end snapshot-isolation reads + `BEGIN CONCURRENT` writes land together in 11.4 — the read-side and write-side are coupled, and shipping one without the other would surface as wrong rows. + --- ## Read-only databases @@ -638,7 +655,7 @@ For context when you hit `NotImplemented`. See [Roadmap](roadmap.md) for when th ### Session / schema - Multiple attached databases (`ATTACH DATABASE`, `DETACH DATABASE`) -- `PRAGMA` statements other than `auto_vacuum` (SQLR-13). The dispatcher is in place — adding a pragma is a single arm in `execute_pragma`. `journal_mode`, `synchronous`, `cache_size`, etc. are not yet wired up +- `PRAGMA` statements other than `auto_vacuum` (SQLR-13) and `journal_mode` (SQLR-22 / Phase 11.3). The dispatcher is in place — adding a pragma is a single arm in `execute_pragma`. `synchronous`, `cache_size`, etc. are not yet wired up - `REPLACE INTO`, `INSERT OR IGNORE`, `INSERT OR REPLACE` (conflict-resolution clauses) --- diff --git a/src/connection.rs b/src/connection.rs index 1cc94a0..c3bf64c 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -347,6 +347,17 @@ impl Connection { self.lock().is_read_only() } + /// Phase 11.3 — current journal mode. `Wal` (default) keeps every + /// pre-Phase-11 caller's behaviour. `Mvcc` is opt-in via + /// `PRAGMA journal_mode = mvcc;`. Per-database — every + /// [`Connection::connect`] sibling sees the same value. + /// + /// Today this is observable but doesn't change query behaviour; + /// 11.4 wires `Mvcc` mode into the read/write paths. + pub fn journal_mode(&self) -> crate::mvcc::JournalMode { + self.lock().journal_mode() + } + /// Escape hatch for advanced callers — locks the shared `Database` /// and hands back the guard. Not part of the stable API; will move /// or change as Phase 10's MVCC sub-phases land. @@ -1512,6 +1523,90 @@ mod tests { assert_sync::(); } + // ----------------------------------------------------------------- + // Phase 11.3 — `PRAGMA journal_mode` round-trip + // ----------------------------------------------------------------- + + /// Fresh connections default to `wal` mode. The PRAGMA read form + /// renders the current value as a single-row, single-column table + /// the REPL can print. + #[test] + fn journal_mode_defaults_to_wal_and_renders_through_pragma() { + let mut conn = Connection::open_in_memory().unwrap(); + assert_eq!(conn.journal_mode(), crate::mvcc::JournalMode::Wal); + + // Read form returns "1 row returned." status (matching + // `auto_vacuum`'s shape). + let status = conn.execute("PRAGMA journal_mode;").unwrap(); + assert!( + status.contains("1 row returned"), + "unexpected status: {status}" + ); + } + + /// `PRAGMA journal_mode = mvcc;` flips the per-database mode and + /// is observable through every sibling handle. The headline + /// per-database contract for Phase 11.3. + #[test] + fn journal_mode_set_to_mvcc_propagates_to_siblings() { + let mut primary = Connection::open_in_memory().unwrap(); + let sibling = primary.connect(); + assert_eq!(sibling.journal_mode(), crate::mvcc::JournalMode::Wal); + + primary.execute("PRAGMA journal_mode = mvcc;").unwrap(); + assert_eq!(primary.journal_mode(), crate::mvcc::JournalMode::Mvcc); + // Sibling sees the same value — proves the setting lives on + // the shared `Database`, not on the per-handle Connection. + assert_eq!(sibling.journal_mode(), crate::mvcc::JournalMode::Mvcc); + + // Switch back is allowed because no MVCC versions exist yet + // (11.4 will populate the store). + primary.execute("PRAGMA journal_mode = wal;").unwrap(); + assert_eq!(primary.journal_mode(), crate::mvcc::JournalMode::Wal); + assert_eq!(sibling.journal_mode(), crate::mvcc::JournalMode::Wal); + } + + /// The set form is case-insensitive on both the pragma name and + /// the value (matching SQLite). Quoted values work too. + #[test] + fn journal_mode_pragma_is_case_insensitive() { + let mut conn = Connection::open_in_memory().unwrap(); + conn.execute("PRAGMA JOURNAL_MODE = MVCC;").unwrap(); + assert_eq!(conn.journal_mode(), crate::mvcc::JournalMode::Mvcc); + conn.execute("pragma journal_mode = 'wal';").unwrap(); + assert_eq!(conn.journal_mode(), crate::mvcc::JournalMode::Wal); + } + + /// Unknown modes return a typed error and don't disturb the + /// existing setting. + #[test] + fn journal_mode_rejects_unknown_value() { + let mut conn = Connection::open_in_memory().unwrap(); + let err = conn + .execute("PRAGMA journal_mode = delete;") + .expect_err("unknown mode must error"); + let msg = format!("{err}"); + assert!( + msg.contains("unknown mode 'delete'"), + "unexpected error: {msg}" + ); + // Setting wasn't disturbed. + assert_eq!(conn.journal_mode(), crate::mvcc::JournalMode::Wal); + } + + /// Numeric values are rejected — `journal_mode` is enum-shaped. + /// SQLite accepts e.g. `journal_mode = 0` for OFF historically; + /// SQLRite stays explicit. + #[test] + fn journal_mode_rejects_numeric_value() { + let mut conn = Connection::open_in_memory().unwrap(); + let err = conn + .execute("PRAGMA journal_mode = 0;") + .expect_err("numeric mode must error"); + let msg = format!("{err}"); + assert!(msg.contains("numeric"), "unexpected error: {msg}"); + } + #[test] fn prepare_cached_executes_the_same_as_prepare() { let mut conn = Connection::open_in_memory().unwrap(); diff --git a/src/mvcc/mod.rs b/src/mvcc/mod.rs index 45c1da3..4284d54 100644 --- a/src/mvcc/mod.rs +++ b/src/mvcc/mod.rs @@ -1,30 +1,130 @@ //! Multi-version concurrency control primitives (Phase 11). //! -//! This module is the foundation for SQLRite's `BEGIN CONCURRENT` story -//! — see [`docs/concurrent-writes-plan.md`](../../docs/concurrent-writes-plan.md) -//! for the full sequenced design. As of **Phase 11.2** it carries the -//! standalone primitives that the rest of the work hangs off: +//! This module is the foundation for SQLRite's `BEGIN CONCURRENT` +//! story — see [`docs/concurrent-writes-plan.md`](../../docs/concurrent-writes-plan.md) +//! for the full sequenced design. //! -//! - [`MvccClock`] — a process-wide monotonic `u64` counter that hands -//! out begin- and commit-timestamps. Persisted to the WAL header so -//! timestamps don't reuse the same value across reopens. +//! Surface as of Phase 11.3: +//! +//! - [`MvccClock`] — process-wide monotonic `u64` counter that hands +//! out begin- and commit-timestamps. Persisted to the WAL header +//! so timestamps don't reuse the same value across reopens. //! - [`ActiveTxRegistry`] — tracks the begin-timestamps of in-flight -//! transactions. Garbage collection (Phase 11.6) needs -//! [`ActiveTxRegistry::min_active_begin_ts`] to know which versions -//! are still possibly visible to a live reader. -//! - [`TxId`] — opaque newtype around a `u64`, allocated by the clock -//! while a transaction is in flight. After commit the same value is -//! reused as the row version's `begin` timestamp; the discriminator -//! between "in-flight transaction id" and "committed timestamp" -//! lives in [`TxTimestampOrId`]. +//! transactions; [`ActiveTxRegistry::min_active_begin_ts`] is the +//! GC watermark. +//! - [`TxId`] / [`TxTimestampOrId`] — types the version chains +//! carry. +//! - [`MvStore`] — the in-memory version index. Holds row chains +//! keyed by [`RowID`]; `read(row, begin_ts)` implements the +//! snapshot-isolation visibility rule (`begin <= T < end`). +//! - [`JournalMode`] — per-database setting toggled by +//! `PRAGMA journal_mode = …`. `Wal` (default) keeps every +//! pre-Phase-11 read path in place; `Mvcc` is the opt-in that +//! 11.4 will wire reads through. //! -//! Nothing in the executor reads from these yet — Phase 11.3 wires -//! them into a new `MvStore` in front of the pager. Keeping the -//! plumbing standalone in 11.2 means the Phase 11.4 `BEGIN CONCURRENT` -//! work can pull them in without re-litigating the foundation. +//! The executor doesn't consult `MvStore` yet — that wiring lives +//! in 11.4 alongside `BEGIN CONCURRENT` writes. Decoupling the +//! data structure (this PR) from the read/write integration (next +//! PR) keeps the diffs reviewable. pub mod clock; pub mod registry; +pub mod store; pub use clock::MvccClock; pub use registry::{ActiveTxRegistry, TxHandle, TxId, TxTimestampOrId}; +pub use store::{MvStore, MvStoreError, RowID, RowVersion, RowVersionChain, VersionPayload}; + +/// Selects the durability + concurrency story a database operates +/// under. Toggled by `PRAGMA journal_mode = …` (see +/// [`crate::sql::pragma::execute_pragma`]). +/// +/// - [`JournalMode::Wal`] (default) — every read goes through the +/// legacy table → pager path; every write fsyncs a per-page +/// commit frame. This is the only mode pre-Phase-11 builds knew +/// about, and it's what file-format-v5 + WAL-format-v2 files +/// produce by default. +/// - [`JournalMode::Mvcc`] — opts the database into Phase 11's +/// multi-version concurrency control. Enables snapshot-isolated +/// reads (consult `MvStore` first, fall back to the pager) and +/// `BEGIN CONCURRENT` writes (Phase 11.4). On-disk format is +/// unchanged; the WAL header's `clock_high_water` byte range +/// carries the persisted clock value either way. +/// +/// Phase 11.3 ships the parser surface and the per-database +/// setting; the read path doesn't change behaviour yet. The +/// `Mvcc` value is observable via the PRAGMA read form so callers +/// can confirm the toggle landed. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum JournalMode { + /// Default — legacy WAL-backed pager. Every commit fsyncs a + /// page-level frame; every read consults `staged → wal_cache + /// → on_disk`. + #[default] + Wal, + /// Phase 11 MVCC + `BEGIN CONCURRENT`. Same on-disk format as + /// `Wal`; the in-memory `MvStore` sits in front of the pager + /// for reads, and writes go through commit-time validation. + Mvcc, +} + +impl JournalMode { + /// Parses a PRAGMA value (case-insensitive). Returns `None` for + /// unrecognized inputs so the caller can surface a typed + /// `unknown journal_mode` error with the bad string. + pub fn from_str_lossless(s: &str) -> Option { + match s.to_ascii_lowercase().as_str() { + "wal" => Some(Self::Wal), + "mvcc" => Some(Self::Mvcc), + _ => None, + } + } + + /// The lowercase string form the PRAGMA read renders. + pub fn as_str(&self) -> &'static str { + match self { + Self::Wal => "wal", + Self::Mvcc => "mvcc", + } + } +} + +impl std::fmt::Display for JournalMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn journal_mode_default_is_wal() { + assert_eq!(JournalMode::default(), JournalMode::Wal); + } + + #[test] + fn journal_mode_round_trips_through_str() { + assert_eq!( + JournalMode::from_str_lossless("wal"), + Some(JournalMode::Wal) + ); + assert_eq!( + JournalMode::from_str_lossless("WAL"), + Some(JournalMode::Wal) + ); + assert_eq!( + JournalMode::from_str_lossless("Mvcc"), + Some(JournalMode::Mvcc) + ); + assert_eq!(JournalMode::from_str_lossless("delete"), None); + assert_eq!(JournalMode::from_str_lossless(""), None); + } + + #[test] + fn journal_mode_displays_lowercase() { + assert_eq!(format!("{}", JournalMode::Wal), "wal"); + assert_eq!(format!("{}", JournalMode::Mvcc), "mvcc"); + } +} diff --git a/src/mvcc/store.rs b/src/mvcc/store.rs new file mode 100644 index 0000000..cbe2452 --- /dev/null +++ b/src/mvcc/store.rs @@ -0,0 +1,681 @@ +//! [`MvStore`] — the in-memory version index sitting in front of +//! the pager (Phase 11.3 skeleton). +//! +//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md): +//! +//! > The MVCC store keeps an in-memory map keyed by `RowID +//! > { table_id, row_key }` whose value is a chain of `RowVersion` +//! > records. Each version carries `begin`/`end` timestamps and the +//! > row payload itself. Visibility for a reader transaction with +//! > begin-timestamp `T` is the textbook snapshot-isolation rule: +//! > pick the version whose `begin <= T < end`. +//! +//! Phase 11.3 lands the standalone data structures + visibility +//! logic so 11.4 can plug them into: +//! +//! - the **executor's read path** when the connection is in MVCC +//! journal mode (the [`super::JournalMode`] enum); +//! - the **commit path**, which mirrors successful writes from the +//! legacy `Database::tables` map into the MvStore at the assigned +//! `commit_ts` and ends the previous latest version at the same +//! timestamp. +//! +//! Today nothing in the executor calls into this module. The +//! `PRAGMA journal_mode = mvcc` switch parses but doesn't change +//! query behaviour. That's intentional — committing to a half-wired +//! read path before the write side exists would force 11.4's +//! commit-validation work into this PR. The two are coupled and +//! ship together. +//! +//! ## Why one big mutex per chain rather than a per-row lock +//! +//! v0 stores each row's version chain inside an +//! `Arc>>`. The outer map is a +//! `Mutex>`. Two reasons not to over-engineer: +//! +//! 1. The plan-doc explicitly calls this out: +//! > One chain per row, behind `RwLock` (or `parking_lot::RwLock`). +//! > The wait-free chain is a known follow-up; it's not on the v0 +//! > critical path. +//! 2. The hot path is `MvStore::read`, which takes the outer lock to +//! fetch the `Arc>`, drops it, then takes the chain's +//! `RwLock` in read mode for the visibility scan. The outer lock +//! is held only long enough to clone an `Arc`. +//! +//! When the commit path lands (11.4) and we observe contention, a +//! sharded outer map (e.g. `dashmap`) becomes the obvious upgrade — +//! same `RowID → chain` shape, just multiple shards. None of +//! `MvStore`'s public surface assumes the inner storage shape, so +//! the swap is local. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex, RwLock}; + +use crate::sql::db::table::Value; + +use super::clock::MvccClock; +use super::registry::{ActiveTxRegistry, TxTimestampOrId}; + +/// Identifies a row across the MvStore. v0 keys by table name + +/// rowid because the engine doesn't yet have a stable numeric +/// `table_id` (the schema catalog is keyed by name). When 11.5 +/// lands a numeric table id (likely as part of the checkpoint +/// integration so the index doesn't carry a `String` per row), +/// flip this to `(u32, i64)` — every consumer of `RowID` only +/// uses it for hashing / equality, so the rename is local. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RowID { + pub table: String, + pub rowid: i64, +} + +impl RowID { + pub fn new(table: impl Into, rowid: i64) -> Self { + Self { + table: table.into(), + rowid, + } + } +} + +/// What a [`RowVersion`] records. `Present` carries the row's +/// column values at the moment of commit; `Tombstone` records that +/// the row was deleted at this version's `begin` timestamp. +/// +/// Storing column-value pairs as a `Vec<(String, Value)>` rather +/// than `BTreeMap` because: +/// - The vector preserves declaration order (stable for tests + +/// diagnostics). +/// - Lookups by column are rare on this path — the executor walks +/// the row by projection order. +#[derive(Debug, Clone, PartialEq)] +pub enum VersionPayload { + /// Row exists with the given column-value pairs. + Present(Vec<(String, Value)>), + /// Row was deleted at this version's `begin` timestamp. Visible + /// readers see "no such row"; readers older than `begin` still + /// see whatever the previous version held. + Tombstone, +} + +/// One link in a row's version chain. +/// +/// Visibility under snapshot isolation is the textbook rule the +/// Hekaton paper formalises and Turso's MVCC implements: +/// +/// - `begin <= T`: the version was committed at or before the +/// reader's begin-timestamp. (For an in-flight version +/// `begin = Id(tx)`, only the producing transaction can see it.) +/// - `end > T` or `end is None`: the version hasn't been superseded +/// yet from the reader's point of view. +/// +/// Both conditions must hold. See [`MvStore::visible_at`]. +#[derive(Debug, Clone)] +pub struct RowVersion { + pub begin: TxTimestampOrId, + pub end: Option, + pub payload: VersionPayload, +} + +impl RowVersion { + /// Builds a freshly-committed version at `commit_ts` with no + /// `end` (i.e. currently latest). This is the shape the legacy + /// commit path produces in 11.4 when it mirrors a row write. + pub fn committed(commit_ts: u64, payload: VersionPayload) -> Self { + Self { + begin: TxTimestampOrId::Timestamp(commit_ts), + end: None, + payload, + } + } + + /// Builds an in-flight version owned by `tx_id`. v0 tests use + /// this to construct chains by hand; the production write path + /// (11.4) will own it. + pub fn in_flight(tx_id: super::TxId, payload: VersionPayload) -> Self { + Self { + begin: TxTimestampOrId::Id(tx_id), + end: None, + payload, + } + } +} + +/// A row's version chain. Newest version at the back — easy +/// `push_version` semantics; reads scan from the back since that's +/// where most queries' `begin_ts` lands. +pub type RowVersionChain = Vec; + +/// In-memory MVCC version index. Cheap to clone — the heavy state +/// is behind `Arc`s. +#[derive(Clone, Debug)] +pub struct MvStore { + inner: Arc, +} + +#[derive(Debug)] +struct MvStoreInner { + /// `RowID → version chain`. Outer `Mutex` guards the map's + /// shape (insert / lookup); the per-chain `RwLock` guards the + /// `Vec` (so two readers walking different chains don't fight, + /// and the writer that ends the latest version doesn't block + /// readers on other chains). + versions: Mutex>>>, + clock: Arc, + active: ActiveTxRegistry, +} + +impl MvStore { + /// Builds an empty store wired to a shared clock + registry. + /// Phase 11.3 wires this into `Database` so every connection + /// observes the same version index; 11.2's `Wal::clock_high_water` + /// seeds the clock at open time. + pub fn new(clock: Arc) -> Self { + Self { + inner: Arc::new(MvStoreInner { + versions: Mutex::new(HashMap::new()), + clock, + active: ActiveTxRegistry::new(), + }), + } + } + + /// Convenience for tests + standalone callers — builds a store + /// over a freshly-allocated clock seeded at 0. The clock is + /// returned so the caller can `tick()` it to allocate + /// timestamps for hand-built versions. + pub fn fresh() -> (Self, Arc) { + let clock = Arc::new(MvccClock::new(0)); + let store = Self::new(Arc::clone(&clock)); + (store, clock) + } + + /// Returns the shared clock. The same `Arc` every consumer + /// (commit path, read path, GC) holds. + pub fn clock(&self) -> &Arc { + &self.inner.clock + } + + /// Returns the active-transaction registry. Phase 11.4 will + /// register `BEGIN CONCURRENT` transactions here; Phase 11.6 + /// reads `min_active_begin_ts()` to set the GC watermark. + pub fn active_registry(&self) -> &ActiveTxRegistry { + &self.inner.active + } + + /// Number of rows the store holds at least one version for. + /// Cheap diagnostic — locks only the outer map briefly. + pub fn tracked_rows(&self) -> usize { + self.lock_map().len() + } + + /// Total versions across every chain. Linear in row count; + /// intended for tests + assertions, not the hot path. + pub fn total_versions(&self) -> usize { + let map = self.lock_map(); + map.values() + .map(|chain| chain.read().expect("chain RwLock poisoned").len()) + .sum() + } + + /// Returns the version of `row_id` that's visible to a reader + /// transaction whose begin-timestamp is `begin_ts`, or `None` + /// if no version satisfies the snapshot-isolation rule. + /// + /// Snapshot-isolation visibility: + /// - the version's `begin` is a committed timestamp `<= begin_ts`, + /// and + /// - the version's `end` is `None` (still latest) or a committed + /// timestamp `> begin_ts`. + /// + /// In-flight versions (`begin = Id(_)`) are never visible to + /// other readers — they're a placeholder until the producing + /// transaction either commits (the version's `begin` is rewritten + /// to a `Timestamp`) or aborts (the version is dropped). The + /// producing transaction itself reads its own writes through a + /// separate path (Phase 11.4); it doesn't go through this + /// function. + /// + /// The chain is scanned **front to back**: in v0 we don't trust + /// any insertion order, so the loop must not exit early. When + /// the chain becomes ordered-by-`begin` (a natural property of + /// the commit path's append-only writes in 11.4), this can + /// short-circuit on the first visible version. + pub fn read(&self, row_id: &RowID, begin_ts: u64) -> Option { + let chain = { + let map = self.lock_map(); + Arc::clone(map.get(row_id)?) + }; + let chain = chain.read().expect("chain RwLock poisoned"); + for v in chain.iter() { + if Self::visible_at(v, begin_ts) { + return Some(v.payload.clone()); + } + } + None + } + + /// Returns true if `version` is visible to a reader whose + /// begin-timestamp is `begin_ts`. Pure function — exposed for + /// tests + future GC code. + pub fn visible_at(version: &RowVersion, begin_ts: u64) -> bool { + // begin must be a committed timestamp <= begin_ts. + let begin_ok = match version.begin { + TxTimestampOrId::Timestamp(t) => t <= begin_ts, + TxTimestampOrId::Id(_) => false, + }; + if !begin_ok { + return false; + } + // end must be None (still latest) OR a committed timestamp + // strictly > begin_ts. An in-flight `Id(_)` cap means some + // other transaction is in the process of superseding this + // version but hasn't committed yet — from the reader's + // perspective the version is still latest. + match version.end { + None => true, + Some(TxTimestampOrId::Timestamp(t)) => t > begin_ts, + Some(TxTimestampOrId::Id(_)) => true, + } + } + + /// Pushes a new version onto the chain for `row_id`. Caps the + /// chain's previous latest version (if any) at `version.begin` + /// — the canonical write-side bookkeeping the commit path will + /// use in 11.4. + /// + /// `version.begin` must be a `Timestamp` (committed) — pushing + /// an in-flight version through this entry point would break + /// the cap rule. Use [`MvStore::push_in_flight`] for in-flight + /// versions; commit will rewrite their `begin` later. + /// + /// Errors if the new `begin` is `<= the previous latest's + /// begin` (violates monotonicity — the commit path must always + /// hand out increasing timestamps via the `MvccClock`). + pub fn push_committed(&self, row_id: RowID, version: RowVersion) -> Result<(), MvStoreError> { + let begin_ts = match version.begin { + TxTimestampOrId::Timestamp(t) => t, + TxTimestampOrId::Id(_) => return Err(MvStoreError::NotCommitted), + }; + let chain_arc = self.get_or_create_chain(row_id); + let mut chain = chain_arc.write().expect("chain RwLock poisoned"); + if let Some(prev) = chain.last() { + // Validate before mutating — a failed validation must + // not leave the chain in a half-capped state. (Earlier + // drafts mutated `prev.end` first, then ran these + // checks; equal-begin retries then surfaced as + // `PreviousAlreadyCapped` instead of the + // `NonMonotonicBegin` callers expect.) + let prev_begin = match prev.begin { + TxTimestampOrId::Timestamp(t) => t, + TxTimestampOrId::Id(_) => 0, + }; + if begin_ts <= prev_begin { + return Err(MvStoreError::NonMonotonicBegin { + prev: prev_begin, + new: begin_ts, + }); + } + match prev.end { + None => {} + Some(TxTimestampOrId::Timestamp(existing)) if existing == begin_ts => { + // Idempotent replay — already capped at exactly + // this timestamp (recovery path will hit this). + } + Some(TxTimestampOrId::Timestamp(existing)) => { + return Err(MvStoreError::PreviousAlreadyCapped { existing }); + } + Some(TxTimestampOrId::Id(_)) => { + // An in-flight cap means another transaction + // owns the supersession; the commit path + // shouldn't hit this in 11.4 (validation runs + // first). v0 returns a typed error rather than + // silently overwriting. + return Err(MvStoreError::PreviousCappedByInFlight); + } + } + } + // Validation passed — apply the cap (if any) and push. + if let Some(prev) = chain.last_mut() { + if prev.end.is_none() { + prev.end = Some(TxTimestampOrId::Timestamp(begin_ts)); + } + } + chain.push(version); + Ok(()) + } + + /// Pushes an in-flight version onto the chain. Used by the + /// 11.4 write path while a `BEGIN CONCURRENT` transaction is + /// open; the version's `begin` is rewritten from `Id(tx)` to + /// `Timestamp(commit_ts)` on commit, and the previous latest + /// gets capped at the same timestamp (via [`Self::push_committed`] + /// at commit time, after the in-flight version is removed). + /// + /// 11.3 ships this as standalone API for tests; 11.4 wires it + /// into the executor. + pub fn push_in_flight(&self, row_id: RowID, version: RowVersion) { + let chain_arc = self.get_or_create_chain(row_id); + let mut chain = chain_arc.write().expect("chain RwLock poisoned"); + chain.push(version); + } + + fn get_or_create_chain(&self, row_id: RowID) -> Arc> { + let mut map = self.lock_map(); + Arc::clone( + map.entry(row_id) + .or_insert_with(|| Arc::new(RwLock::new(Vec::new()))), + ) + } + + fn lock_map(&self) -> std::sync::MutexGuard<'_, HashMap>>> { + self.inner + .versions + .lock() + .unwrap_or_else(|e| panic!("sqlrite: MvStore versions mutex poisoned: {e}")) + } +} + +/// Errors returned by mutating MvStore operations. Read-side calls +/// (`read`, `visible_at`) don't error. +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum MvStoreError { + /// `push_committed` got a version whose `begin` is an in-flight + /// `TxId` rather than a committed `Timestamp`. + #[error("push_committed expects a committed Timestamp, not an in-flight TxId")] + NotCommitted, + + /// The previous latest version is already capped at a different + /// timestamp. Either the caller is double-committing, or the + /// commit path is racing with itself (which 11.4's commit-validation + /// loop is supposed to prevent). + #[error("previous latest version already capped at end_ts={existing}")] + PreviousAlreadyCapped { existing: u64 }, + + /// The previous latest's `end` is set to an in-flight cap. v0 + /// rejects rather than silently overwriting; 11.4's commit + /// validation runs first so this shouldn't fire in production. + #[error("previous latest version is being capped by an in-flight transaction")] + PreviousCappedByInFlight, + + /// New version's `begin` is not strictly greater than the + /// previous latest's `begin`. The clock should always hand out + /// monotonically increasing timestamps; this is a corruption / + /// bug indicator. + #[error("non-monotonic begin: previous={prev}, new={new}")] + NonMonotonicBegin { prev: u64, new: u64 }, +} + +#[cfg(test)] +mod tests { + use super::*; + + fn payload(value: i64) -> VersionPayload { + VersionPayload::Present(vec![("v".to_string(), Value::Integer(value))]) + } + + #[test] + fn empty_store_returns_none() { + let (store, _clock) = MvStore::fresh(); + assert!(store.read(&RowID::new("t", 1), 100).is_none()); + assert_eq!(store.tracked_rows(), 0); + assert_eq!(store.total_versions(), 0); + } + + /// Snapshot isolation visibility — the headline rule. One row + /// gets two committed versions at different timestamps; readers + /// at varying `begin_ts` see exactly the version that satisfies + /// `begin <= T < end`. + #[test] + fn visibility_picks_the_right_version_for_each_begin_ts() { + let (store, clock) = MvStore::fresh(); + let row = RowID::new("accounts", 1); + + // V1 committed at ts=5, V2 committed at ts=10. + clock.observe(5); + store + .push_committed(row.clone(), RowVersion::committed(5, payload(100))) + .unwrap(); + clock.observe(10); + store + .push_committed(row.clone(), RowVersion::committed(10, payload(200))) + .unwrap(); + + // Reader before V1 — nothing visible. + assert_eq!(store.read(&row, 4), None); + + // Reader at exactly V1's begin — sees V1. + assert_eq!(store.read(&row, 5), Some(payload(100))); + + // Reader between V1 and V2 — still sees V1 (V2's begin > T). + assert_eq!(store.read(&row, 9), Some(payload(100))); + + // Reader at exactly V2's begin — sees V2. + assert_eq!(store.read(&row, 10), Some(payload(200))); + + // Reader past V2 — sees V2. + assert_eq!(store.read(&row, 1_000), Some(payload(200))); + } + + /// `push_committed` caps the previous latest version's `end` at + /// the new version's `begin`. Without this, every version's + /// `end` would stay None and the visibility rule would return + /// the oldest committed version for every reader. + #[test] + fn push_committed_caps_previous_latest() { + let (store, _clock) = MvStore::fresh(); + let row = RowID::new("t", 7); + store + .push_committed(row.clone(), RowVersion::committed(2, payload(1))) + .unwrap(); + store + .push_committed(row.clone(), RowVersion::committed(5, payload(2))) + .unwrap(); + // Inspect the chain through the public API. A reader at + // exactly ts=4 should see V1 — that's only correct if V1's + // end was set to Some(Timestamp(5)). + assert_eq!(store.read(&row, 4), Some(payload(1))); + } + + /// The visibility helper is pure; test it independently of + /// the chain to lock down the rule. + #[test] + fn visible_at_handles_each_combination() { + // Committed begin, no end — visible iff T >= begin. + let v = RowVersion { + begin: TxTimestampOrId::Timestamp(10), + end: None, + payload: payload(0), + }; + assert!(!MvStore::visible_at(&v, 9)); + assert!(MvStore::visible_at(&v, 10)); + assert!(MvStore::visible_at(&v, 1_000)); + + // Committed begin + committed end — visible iff begin <= T < end. + let v = RowVersion { + begin: TxTimestampOrId::Timestamp(10), + end: Some(TxTimestampOrId::Timestamp(20)), + payload: payload(0), + }; + assert!(!MvStore::visible_at(&v, 9)); + assert!(MvStore::visible_at(&v, 10)); + assert!(MvStore::visible_at(&v, 19)); + assert!(!MvStore::visible_at(&v, 20)); + + // In-flight begin — invisible to outside readers regardless + // of `end`. + let v = RowVersion { + begin: TxTimestampOrId::Id(super::super::TxId(42)), + end: None, + payload: payload(0), + }; + assert!(!MvStore::visible_at(&v, 0)); + assert!(!MvStore::visible_at(&v, 1_000)); + + // In-flight cap on an otherwise-visible version — still + // visible (the supersession isn't committed yet). + let v = RowVersion { + begin: TxTimestampOrId::Timestamp(5), + end: Some(TxTimestampOrId::Id(super::super::TxId(42))), + payload: payload(0), + }; + assert!(MvStore::visible_at(&v, 10)); + assert!(!MvStore::visible_at(&v, 4)); // begin > T + } + + /// Tombstone semantics: deleting the row creates a Tombstone + /// version. Readers older than the delete still see the value + /// from the previous version; readers at or after the delete + /// see "no row" (the tombstone payload). + #[test] + fn tombstone_versions_capture_the_delete() { + let (store, _clock) = MvStore::fresh(); + let row = RowID::new("t", 1); + store + .push_committed(row.clone(), RowVersion::committed(1, payload(42))) + .unwrap(); + store + .push_committed( + row.clone(), + RowVersion::committed(5, VersionPayload::Tombstone), + ) + .unwrap(); + + assert_eq!(store.read(&row, 1), Some(payload(42))); + assert_eq!(store.read(&row, 4), Some(payload(42))); + assert_eq!(store.read(&row, 5), Some(VersionPayload::Tombstone)); + assert_eq!(store.read(&row, 100), Some(VersionPayload::Tombstone)); + } + + #[test] + fn push_committed_rejects_in_flight_begin() { + let (store, _clock) = MvStore::fresh(); + let v = RowVersion::in_flight(super::super::TxId(7), payload(0)); + let err = store + .push_committed(RowID::new("t", 1), v) + .expect_err("in-flight begin must be rejected"); + assert_eq!(err, MvStoreError::NotCommitted); + } + + #[test] + fn push_committed_rejects_non_monotonic_begin() { + let (store, _clock) = MvStore::fresh(); + let row = RowID::new("t", 1); + store + .push_committed(row.clone(), RowVersion::committed(10, payload(1))) + .unwrap(); + let err = store + .push_committed(row.clone(), RowVersion::committed(10, payload(2))) + .expect_err("equal begin should be rejected"); + assert!(matches!(err, MvStoreError::NonMonotonicBegin { .. })); + let err = store + .push_committed(row.clone(), RowVersion::committed(5, payload(2))) + .expect_err("backward begin should be rejected"); + assert!(matches!(err, MvStoreError::NonMonotonicBegin { .. })); + } + + /// In-flight versions don't appear to other readers — the + /// snapshot-isolation contract Phase 11.4 relies on. Other + /// readers see the previously-committed version (or None if + /// the chain is empty otherwise). + #[test] + fn in_flight_versions_are_invisible_to_other_readers() { + let (store, _clock) = MvStore::fresh(); + let row = RowID::new("t", 1); + store + .push_committed(row.clone(), RowVersion::committed(5, payload(100))) + .unwrap(); + // Simulate an in-flight write at a higher (uncommitted) + // timestamp via a fresh TxId. Reader at any begin_ts must + // still see V1. + store.push_in_flight( + row.clone(), + RowVersion::in_flight(super::super::TxId(99), payload(200)), + ); + assert_eq!(store.read(&row, 5), Some(payload(100))); + assert_eq!(store.read(&row, 1_000), Some(payload(100))); + } + + /// Tracked-row + version counters reflect the chain shape. + /// Cheap sanity test that 11.6's GC will rely on once it lands. + #[test] + fn tracked_rows_and_total_versions_are_accurate() { + let (store, _clock) = MvStore::fresh(); + store + .push_committed(RowID::new("a", 1), RowVersion::committed(1, payload(0))) + .unwrap(); + store + .push_committed(RowID::new("a", 1), RowVersion::committed(2, payload(0))) + .unwrap(); + store + .push_committed(RowID::new("a", 2), RowVersion::committed(1, payload(0))) + .unwrap(); + store + .push_committed(RowID::new("b", 1), RowVersion::committed(1, payload(0))) + .unwrap(); + assert_eq!(store.tracked_rows(), 3); + assert_eq!(store.total_versions(), 4); + } + + #[test] + fn store_is_send_and_sync() { + fn assert_send() {} + fn assert_sync() {} + assert_send::(); + assert_sync::(); + } + + /// Concurrent readers walking different chains must not block + /// each other — that's the reason for the per-chain `RwLock` + /// rather than one big `Mutex`. Smoke test: many + /// threads read concurrently and must all see the right + /// versions. + #[test] + fn concurrent_reads_see_consistent_snapshots() { + use std::thread; + + let (store, _clock) = MvStore::fresh(); + for rid in 0..32 { + let row = RowID::new("t", rid); + store + .push_committed(row.clone(), RowVersion::committed(1, payload(rid))) + .unwrap(); + store + .push_committed(row, RowVersion::committed(10, payload(rid * 100))) + .unwrap(); + } + + let store_arc = Arc::new(store); + let handles: Vec<_> = (0..8) + .map(|_| { + let s = Arc::clone(&store_arc); + thread::spawn(move || { + for _ in 0..500 { + for rid in 0..32 { + let row = RowID::new("t", rid); + // Pre-supersession: V1 visible. + assert_eq!(s.read(&row, 5), Some(payload(rid))); + // Post-supersession: V2 visible. + assert_eq!(s.read(&row, 100), Some(payload(rid * 100))); + } + } + }) + }) + .collect(); + + for h in handles { + h.join().unwrap(); + } + } + + /// The store's clock is the same `Arc` callers handed in — a + /// later 11.3 wiring change in `Database` relies on this. + #[test] + fn store_shares_caller_clock() { + let clock = Arc::new(MvccClock::new(42)); + let store = MvStore::new(Arc::clone(&clock)); + assert_eq!(store.clock().now(), 42); + clock.tick(); // clock.tick now == 43 + assert_eq!(store.clock().now(), 43); + } +} diff --git a/src/sql/db/database.rs b/src/sql/db/database.rs index f5db61d..ee179dc 100644 --- a/src/sql/db/database.rs +++ b/src/sql/db/database.rs @@ -1,8 +1,10 @@ use crate::error::{Result, SQLRiteError}; +use crate::mvcc::{JournalMode, MvStore, MvccClock}; use crate::sql::db::table::Table; use crate::sql::pager::pager::{AccessMode, Pager}; use std::collections::HashMap; use std::path::PathBuf; +use std::sync::Arc; /// Snapshot of the mutable in-memory state taken at `BEGIN` time so /// `ROLLBACK` can restore it. See `begin_transaction`, `rollback_transaction`. @@ -51,6 +53,28 @@ pub struct Database { /// (SQLite parity at 25%). Per-connection runtime state — not /// persisted across reopens. pub auto_vacuum_threshold: Option, + /// Phase 11.3 — current journal mode for the database. Default + /// is [`JournalMode::Wal`] (every pre-Phase-11 caller). Toggled + /// by `PRAGMA journal_mode = mvcc | wal`. The setting is + /// per-database (every `Connection` to this `Database` observes + /// the same value) — see the open question in + /// [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md) + /// §8 for the per-connection vs. per-database trade-off; v0 + /// picked per-database for simplicity. + pub journal_mode: JournalMode, + /// Phase 11.3 — process-wide MVCC clock. Shared between every + /// `Connection` to this `Database` (and 11.4's `MvStore`). + /// Seeded from the WAL header's `clock_high_water` at open + /// time so timestamps don't repeat across reopens. Allocated + /// here even in `JournalMode::Wal` so `PRAGMA journal_mode = + /// mvcc` doesn't require lazy-creating the clock. + pub mvcc_clock: Arc, + /// Phase 11.3 — in-memory version index. Allocated on every + /// `Database::new` so the toggle to MVCC mode doesn't require + /// a re-init step. Empty until 11.4 wires the commit path to + /// publish row versions; reads still go through the legacy + /// path until then. + pub mv_store: MvStore, } impl Database { @@ -63,6 +87,8 @@ impl Database { /// let mut db = Database::new("my_db".to_string()); /// ``` pub fn new(db_name: String) -> Self { + let mvcc_clock = Arc::new(MvccClock::new(0)); + let mv_store = MvStore::new(Arc::clone(&mvcc_clock)); Database { db_name, tables: HashMap::new(), @@ -70,9 +96,60 @@ impl Database { pager: None, txn: None, auto_vacuum_threshold: Some(DEFAULT_AUTO_VACUUM_THRESHOLD), + journal_mode: JournalMode::default(), + mvcc_clock, + mv_store, } } + /// Phase 11.3 — current journal mode. Toggled by `PRAGMA + /// journal_mode = mvcc | wal`. `Wal` (the default) keeps every + /// pre-Phase-11 caller's behaviour; `Mvcc` opts the database + /// into MVCC + `BEGIN CONCURRENT` (Phase 11.4 wires this end-to- + /// end; today the toggle is observable but the read/write + /// paths don't change). + pub fn journal_mode(&self) -> JournalMode { + self.journal_mode + } + + /// Phase 11.3 — switch the database's journal mode. `Wal → Mvcc` + /// is unconditional in v0 (no in-flight transactions to drain + /// because nothing publishes versions yet). `Mvcc → Wal` is + /// rejected if `mv_store` carries any committed versions — + /// switching back would silently strand them. v0 keeps this + /// strict; the loosening (and the discard-versions path) lands + /// when 11.4 starts populating the store. + pub fn set_journal_mode(&mut self, mode: JournalMode) -> Result<()> { + if self.journal_mode == mode { + return Ok(()); + } + if mode == JournalMode::Wal && self.mv_store.total_versions() > 0 { + return Err(SQLRiteError::General( + "PRAGMA journal_mode: cannot switch back to 'wal' while \ + the MVCC store holds committed versions" + .to_string(), + )); + } + self.journal_mode = mode; + Ok(()) + } + + /// Phase 11.3 — the shared MVCC logical clock. Returned by + /// reference (not cloned) because callers typically just read + /// `now()` / `tick()` against the same `Arc` `Database` already + /// holds. + pub fn mvcc_clock(&self) -> &Arc { + &self.mvcc_clock + } + + /// Phase 11.3 — the in-memory version index. Read-only access + /// is enough for 11.3's tests; 11.4 grows the commit-path + /// helpers into typed methods on `Database` rather than mutating + /// this directly. + pub fn mv_store(&self) -> &MvStore { + &self.mv_store + } + /// Returns the current auto-VACUUM threshold, or `None` if disabled. /// See [`Database::set_auto_vacuum_threshold`] for semantics. pub fn auto_vacuum_threshold(&self) -> Option { diff --git a/src/sql/pager/mod.rs b/src/sql/pager/mod.rs index ae6eea1..2b4e9fd 100644 --- a/src/sql/pager/mod.rs +++ b/src/sql/pager/mod.rs @@ -3970,10 +3970,12 @@ mod tests { /// Pragmas SQLRite doesn't know about return `NotImplemented` — /// not a generic parser error. Future pragmas plug in here. + /// (Phase 11.3 made `journal_mode` a recognised pragma; this + /// test uses a name that's still unsupported.) #[test] fn pragma_unknown_returns_not_implemented() { let mut db = Database::new("t".to_string()); - let err = process_command("PRAGMA journal_mode = WAL;", &mut db).unwrap_err(); + let err = process_command("PRAGMA synchronous = NORMAL;", &mut db).unwrap_err(); assert!( matches!(err, SQLRiteError::NotImplemented(_)), "unknown pragma must surface NotImplemented, got: {err:?}" diff --git a/src/sql/pragma.rs b/src/sql/pragma.rs index 957708a..3a1b69d 100644 --- a/src/sql/pragma.rs +++ b/src/sql/pragma.rs @@ -22,6 +22,7 @@ use sqlparser::keywords::Keyword; use sqlparser::tokenizer::{Token, Tokenizer}; use crate::error::{Result, SQLRiteError}; +use crate::mvcc::JournalMode; use crate::sql::CommandOutput; use crate::sql::db::database::Database; @@ -193,12 +194,57 @@ where pub fn execute_pragma(stmt: PragmaStatement, db: &mut Database) -> Result { match stmt.name.to_ascii_lowercase().as_str() { "auto_vacuum" => pragma_auto_vacuum(stmt.value, db), + "journal_mode" => pragma_journal_mode(stmt.value, db), other => Err(SQLRiteError::NotImplemented(format!( "PRAGMA '{other}' is not supported" ))), } } +/// `PRAGMA journal_mode;` (read) or `PRAGMA journal_mode = wal | mvcc;` +/// (write). Phase 11.3 — the toggle is observable but doesn't change +/// query behaviour yet; 11.4 wires `Mvcc` mode into the read/write +/// paths. The set form returns the new mode (SQLite parity); the +/// read form returns the current mode. +fn pragma_journal_mode(value: Option, db: &mut Database) -> Result { + match value { + None => render_journal_mode(db.journal_mode()), + Some(v) => { + let target = parse_journal_mode_target(&v)?; + db.set_journal_mode(target)?; + // SQLite renders the post-set mode as a result row; + // mirror that so callers can confirm the toggle landed. + render_journal_mode(db.journal_mode()) + } + } +} + +fn render_journal_mode(mode: JournalMode) -> Result { + let mut t = PrintTable::new(); + t.add_row(PrintRow::new(vec![PrintCell::new("journal_mode")])); + t.add_row(PrintRow::new(vec![PrintCell::new(mode.as_str())])); + Ok(CommandOutput { + status: "PRAGMA journal_mode executed. 1 row returned.".to_string(), + rendered: Some(t.to_string()), + }) +} + +fn parse_journal_mode_target(value: &PragmaValue) -> Result { + let s = match value { + PragmaValue::Identifier(s) | PragmaValue::String(s) => s.as_str(), + PragmaValue::Number(s) => { + return Err(SQLRiteError::General(format!( + "PRAGMA journal_mode: expected 'wal' or 'mvcc', got numeric '{s}'" + ))); + } + }; + JournalMode::from_str_lossless(s).ok_or_else(|| { + SQLRiteError::General(format!( + "PRAGMA journal_mode: unknown mode '{s}' (supported: 'wal', 'mvcc')" + )) + }) +} + /// `PRAGMA auto_vacuum;` (read) or `PRAGMA auto_vacuum = N | OFF | NONE;` /// (write). Reuses [`Database::set_auto_vacuum_threshold`] so the range /// validation lives in exactly one place. @@ -417,10 +463,12 @@ mod tests { #[test] fn execute_pragma_unknown_returns_not_implemented() { + // `journal_mode` was the canary unknown pragma here before + // Phase 11.3 added it. Use a name that's still unsupported. let mut db = Database::new("t".to_string()); let err = execute_pragma( PragmaStatement { - name: "journal_mode".to_string(), + name: "synchronous".to_string(), value: None, }, &mut db,