From 52e067a26aeae0c6342be1a0aa8e4971adc01a11 Mon Sep 17 00:00:00 2001 From: Eth-Interchained <117552056+Eth-Interchained@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:29:20 -0400 Subject: [PATCH 1/8] =?UTF-8?q?feat(tools):=20Rust=20SQLite=20=E2=86=92=20?= =?UTF-8?q?nedbd=20migrator=20(fast=20+=20resumable)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces backend/scripts/migrate_sqlite_to_nedb.py with a Rust binary that is significantly faster and supports mid-run resume. tools/nedb-migrator/ Cargo.toml rusqlite (bundled) + reqwest + tokio + clap + indicatif src/main.rs ~540 lines Key features: - Reads all kv/zsets/sets rows in a single SQLite pass (read-only, no lock) - Sends nedbd batch HTTP requests with up to 16 concurrent tokio workers Expected speedup: 20-50x over sequential Python for 93k-row migrations - Resumable: state file (.nedb-migrator-state.json) tracks row offsets per table; atomic write (temp + rename) after each batch is confirmed, in order, so the saved cursor never runs ahead of what nedbd has acknowledged. After a kill, a restart resumes from the last saved offset; batches that were in flight or had finished out of order are re-sent. 
- --skip-block-cache: skips vision:block:height:* and vision:block:hash:* rows (~90k rows) and migrates only live operational state (~20 rows) - --reset: wipe state and start fresh - --dry-run: print what would be sent without touching nedbd - --concurrency N / --batch-size N for hardware tuning - indicatif progress bars per table with rows/s and ETA - Release profile: LTO=fat, strip=true for a small fast binary Build: cd tools/nedb-migrator cargo build --release ./target/release/nedb-migrator --sqlite ../../data/vision.db Co-Authored-By: NEDB Maintainer (Claude Sonnet 4.6) --- tools/nedb-migrator/Cargo.toml | 42 +++ tools/nedb-migrator/src/main.rs | 543 ++++++++++++++++++++++++++++++++ 2 files changed, 585 insertions(+) create mode 100644 tools/nedb-migrator/Cargo.toml create mode 100644 tools/nedb-migrator/src/main.rs diff --git a/tools/nedb-migrator/Cargo.toml b/tools/nedb-migrator/Cargo.toml new file mode 100644 index 0000000..2b16d7d --- /dev/null +++ b/tools/nedb-migrator/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "nedb-migrator" +version = "1.0.0" +edition = "2021" +authors = ["Interchained LLC"] +description = "Fast, resumable SQLite → nedbd migrator for Interchained Vision" +license = "MIT" +repository = "https://github.com/Eth-Interchained/vision" + +[[bin]] +name = "nedb-migrator" +path = "src/main.rs" + +[dependencies] +# SQLite — bundled feature so no system libsqlite3 required +rusqlite = { version = "0.31", features = ["bundled"] } + +# Async HTTP — used for concurrent nedbd batch calls +reqwest = { version = "0.12", features = ["json"] } +tokio = { version = "1", features = ["full"] } + +# Serialisation +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +# CLI +clap = { version = "4", features = ["derive"] } + +# Progress bars +indicatif = "0.17" + +# Error handling +anyhow = "1" + +# Coloured output +colored = "2" + +[profile.release] +opt-level = 3 +lto = "fat" +codegen-units = 1 +strip = true diff --git 
a/tools/nedb-migrator/src/main.rs b/tools/nedb-migrator/src/main.rs new file mode 100644 index 0000000..9fbbd6f --- /dev/null +++ b/tools/nedb-migrator/src/main.rs @@ -0,0 +1,543 @@ +//! nedb-migrator — fast, resumable SQLite → nedbd migration tool +//! +//! Reads all kv / zsets / sets rows from a Vision SQLite database and writes +//! them to a running nedbd instance using concurrent batch HTTP requests. +//! +//! # Usage +//! +//! ```bash +//! # Dry run — see what would be migrated +//! nedb-migrator --sqlite ../data/vision.db --dry-run +//! +//! # Full migration (resumes automatically if interrupted) +//! nedb-migrator --sqlite ../data/vision.db +//! +//! # Skip the ~90k block cache rows, only migrate live state (~20 rows) +//! nedb-migrator --sqlite ../data/vision.db --skip-block-cache +//! +//! # Reset progress and start from scratch +//! nedb-migrator --sqlite ../data/vision.db --reset +//! +//! # Tune concurrency and batch size for faster hardware +//! nedb-migrator --sqlite ../data/vision.db --concurrency 32 --batch-size 200 +//! 
``` + +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Instant; + +use anyhow::{Context, Result}; +use clap::Parser; +use colored::*; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use reqwest::Client; +use rusqlite::Connection; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tokio::sync::Semaphore; + +// --------------------------------------------------------------------------- +// CLI +// --------------------------------------------------------------------------- + +#[derive(Parser, Debug)] +#[command( + name = "nedb-migrator", + version = "1.0.0", + about = "Fast, resumable SQLite → nedbd migration for Interchained Vision", + long_about = None, +)] +struct Cli { + /// Path to the Vision SQLite database + #[arg(long, default_value = "../data/vision.db")] + sqlite: PathBuf, + + /// nedbd base URL + #[arg(long, env = "NEDB_URL", default_value = "http://127.0.0.1:7070")] + nedb_url: String, + + /// nedbd database name + #[arg(long, env = "NEDB_DB_NAME", default_value = "vision")] + db: String, + + /// nedbd bearer token (leave blank if not set) + #[arg(long, env = "NEDBD_TOKEN", default_value = "")] + token: String, + + /// Number of ops per batch request sent to nedbd + #[arg(long, default_value_t = 100)] + batch_size: usize, + + /// Maximum concurrent batch requests in flight + #[arg(long, default_value_t = 16)] + concurrency: usize, + + /// Skip vision:block:height:* and vision:block:hash:* rows (~90k rows) + #[arg(long)] + skip_block_cache: bool, + + /// Path to the resume state file + #[arg(long, default_value = ".nedb-migrator-state.json")] + state_file: PathBuf, + + /// Delete saved progress and start from scratch + #[arg(long)] + reset: bool, + + /// Print what would be migrated without writing to nedbd + #[arg(long)] + dry_run: bool, + + /// Verbose: print each batch + #[arg(long, short)] + verbose: bool, +} + +// 
--------------------------------------------------------------------------- +// Resume state +// --------------------------------------------------------------------------- + +/// Tracks how many rows of each table have already been successfully sent. +/// Written atomically after every batch via temp-file-then-rename. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct State { + kv_done: usize, + zsets_done: usize, + sets_done: usize, +} + +fn load_state(path: &Path) -> State { + fs::read_to_string(path) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_default() +} + +fn save_state(path: &Path, state: &State) -> Result<()> { + let tmp = path.with_extension("tmp"); + fs::write(&tmp, serde_json::to_string_pretty(state)?)?; + fs::rename(&tmp, path).context("atomic rename failed")?; + Ok(()) +} + +// --------------------------------------------------------------------------- +// SQLite row types +// --------------------------------------------------------------------------- + +#[derive(Clone)] +struct KvRow { + key: String, + value: String, + expires_at: Option, +} + +#[derive(Clone)] +struct ZsetRow { + name: String, + member: String, + score: f64, +} + +#[derive(Clone)] +struct SetRow { + name: String, + member: String, +} + +// --------------------------------------------------------------------------- +// SQLite readers (all read-only, one pass each) +// --------------------------------------------------------------------------- + +fn read_kv(conn: &Connection, skip_block_cache: bool) -> Result> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + + let mut stmt = conn.prepare("SELECT key, value, expires_at FROM kv ORDER BY rowid")?; + let rows: Vec = stmt + .query_map([], |r| { + Ok(KvRow { + key: r.get(0)?, + value: r.get(1)?, + expires_at: r.get(2)?, + }) + })? 
+ .filter_map(|r| r.ok()) + .filter(|r| { + // Drop expired rows + if let Some(exp) = r.expires_at { + if exp < now { + return false; + } + } + // Optionally skip the per-block cache (~90k rows) + if skip_block_cache + && (r.key.starts_with("vision:block:height:") + || r.key.starts_with("vision:block:hash:")) + { + return false; + } + true + }) + .collect(); + Ok(rows) +} + +fn read_zsets(conn: &Connection) -> Result> { + let mut stmt = + conn.prepare("SELECT name, member, score FROM zsets ORDER BY rowid")?; + Ok(stmt + .query_map([], |r| { + Ok(ZsetRow { + name: r.get(0)?, + member: r.get(1)?, + score: r.get(2)?, + }) + })? + .filter_map(|r| r.ok()) + .collect()) +} + +fn read_sets(conn: &Connection) -> Result> { + let mut stmt = conn.prepare("SELECT name, member FROM sets ORDER BY rowid")?; + Ok(stmt + .query_map([], |r| { + Ok(SetRow { + name: r.get(0)?, + member: r.get(1)?, + }) + })? + .filter_map(|r| r.ok()) + .collect()) +} + +// --------------------------------------------------------------------------- +// nedbd batch op builders +// --------------------------------------------------------------------------- + +fn kv_op(r: &KvRow) -> Value { + json!({ + "op": "put", "coll": "kv", "id": r.key, + "doc": { "_id": r.key, "value": r.value, "expires_at": r.expires_at } + }) +} + +fn zset_op(r: &ZsetRow) -> Value { + let id = format!("{}::{}", r.name, r.member); + json!({ + "op": "put", "coll": "zset", "id": &id, + "doc": { "_id": &id, "_name": r.name, "_member": r.member, "score": r.score } + }) +} + +fn set_op(r: &SetRow) -> Value { + let id = format!("{}::{}", r.name, r.member); + json!({ + "op": "put", "coll": "set", "id": &id, + "doc": { "_id": &id, "_name": r.name, "_member": r.member } + }) +} + +// --------------------------------------------------------------------------- +// nedbd HTTP helpers +// --------------------------------------------------------------------------- + +async fn nedb_health(client: &Client, base: &str, token: &str) -> Result { + 
Ok(client + .get(format!("{}/health", base)) + .maybe_bearer(token) + .send() + .await? + .json::() + .await?) +} + +async fn ensure_db(client: &Client, base: &str, db: &str, token: &str) -> Result<()> { + let check = client + .get(format!("{}/v1/databases/{}", base, db)) + .maybe_bearer(token) + .send() + .await?; + if check.status().is_success() { + return Ok(()); + } + client + .post(format!("{}/v1/databases", base)) + .maybe_bearer(token) + .json(&json!({"name": db})) + .send() + .await? + .error_for_status() + .context("Failed to create nedbd database")?; + Ok(()) +} + +async fn send_batch_http( + client: &Client, + base: &str, + db: &str, + token: &str, + ops: Vec, +) -> Result { + let resp = client + .post(format!("{}/v1/databases/{}/batch", base, db)) + .maybe_bearer(token) + .json(&json!({"ops": ops})) + .send() + .await? + .error_for_status() + .context("nedbd batch failed")?; + let body: Value = resp.json().await?; + Ok(body["count"].as_u64().unwrap_or(0) as usize) +} + +/// Trait for optional bearer auth — keeps call sites clean. +trait MaybeBearer { + fn maybe_bearer(self, token: &str) -> Self; +} +impl MaybeBearer for reqwest::RequestBuilder { + fn maybe_bearer(self, token: &str) -> Self { + if token.is_empty() { self } else { self.bearer_auth(token) } + } +} + +// --------------------------------------------------------------------------- +// Core: send one table's ops with resume + concurrent batches +// --------------------------------------------------------------------------- + +/// Send all ops for a single table, resuming from `already_done`. +/// +/// Spawns up to `concurrency` tokio tasks simultaneously (semaphore-limited). +/// After each batch completes **in order**, the cursor is advanced and the +/// state file is saved atomically — so a kill at any point loses at most +/// one batch worth of work. 
+async fn send_table_ops( + ops: Vec, + already_done: usize, + label: &str, + cli: &Cli, + state: &mut State, + state_field: fn(&mut State) -> &mut usize, + pb: &ProgressBar, +) -> Result { + let remaining = if already_done < ops.len() { + &ops[already_done..] + } else { + pb.finish_with_message("already done"); + return Ok(0); + }; + + let client = Arc::new(Client::builder() + .timeout(std::time::Duration::from_secs(60)) + .build()?); + let sem = Arc::new(Semaphore::new(cli.concurrency)); + + // Pre-chunk so we can spawn all tasks up front, then await in order. + let chunks: Vec> = remaining + .chunks(cli.batch_size) + .map(|c| c.to_vec()) + .collect(); + + let mut handles = Vec::with_capacity(chunks.len()); + for chunk in chunks { + let chunk_len = chunk.len(); + let client2 = Arc::clone(&client); + let sem2 = Arc::clone(&sem); + let base = cli.nedb_url.clone(); + let db = cli.db.clone(); + let token = cli.token.clone(); + let dry = cli.dry_run; + + let h: tokio::task::JoinHandle> = tokio::spawn(async move { + let _permit = sem2.acquire_owned().await.unwrap(); + if dry { return Ok(chunk_len); } + send_batch_http(&client2, &base, &db, &token, chunk).await + }); + handles.push((h, chunk_len)); + } + + let mut total_sent = 0usize; + for (handle, chunk_len) in handles { + let written = handle.await.context("task panicked")??; + total_sent += written; + *state_field(state) += chunk_len; + if !cli.dry_run { + save_state(&cli.state_file, state)?; + } + pb.inc(chunk_len as u64); + if cli.verbose { + eprintln!(" {} +{} (total {})", label, chunk_len, *state_field(state)); + } + } + + pb.finish_with_message(format!("{} rows", already_done + total_sent)); + Ok(total_sent) +} + +// --------------------------------------------------------------------------- +// Entry point +// --------------------------------------------------------------------------- + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + + println!( + "\n{} {} — SQLite → nedbd\n", + 
"nedb-migrator".bold().cyan(), + "v1.0.0".dimmed() + ); + println!(" sqlite {}", cli.sqlite.display()); + println!(" nedbd {}", cli.nedb_url); + println!(" database {}", cli.db); + println!(" batch-size {}", cli.batch_size); + println!(" concurrency {}", cli.concurrency); + println!(" skip-block-cache {}", cli.skip_block_cache); + println!(" dry-run {}", cli.dry_run); + println!(" state file {}", cli.state_file.display()); + println!(); + + // ── Resume state ───────────────────────────────────────────────────── + if cli.reset && cli.state_file.exists() { + fs::remove_file(&cli.state_file).ok(); + println!("{} State reset.\n", "↺".yellow()); + } + let mut state = if cli.reset { State::default() } else { load_state(&cli.state_file) }; + + if state.kv_done + state.zsets_done + state.sets_done > 0 { + println!( + "{} Resuming — kv={} zsets={} sets={}\n", + "→".green(), state.kv_done, state.zsets_done, state.sets_done + ); + } + + // ── Open SQLite ─────────────────────────────────────────────────────── + let canon = cli.sqlite.canonicalize() + .with_context(|| format!("SQLite not found: {}", cli.sqlite.display()))?; + let conn = Connection::open_with_flags( + &canon, + rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX, + ) + .context("Failed to open SQLite")?; + + print!("{} Reading SQLite… ", "◉".blue()); + let t0 = Instant::now(); + let kv_rows = read_kv(&conn, cli.skip_block_cache)?; + let zset_rows = read_zsets(&conn)?; + let set_rows = read_sets(&conn)?; + drop(conn); // release the file handle + println!( + "kv={} zsets={} sets={} ({} ms)\n", + kv_rows.len().to_string().yellow(), + zset_rows.len().to_string().yellow(), + set_rows.len().to_string().yellow(), + t0.elapsed().as_millis() + ); + + // ── Connectivity check ──────────────────────────────────────────────── + if !cli.dry_run { + let probe = Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build()?; + let h = nedb_health(&probe, &cli.nedb_url, 
&cli.token) + .await + .context("Cannot reach nedbd — is it running?")?; + println!( + "{} nedbd {} version={} encrypted={}\n", + "✓".green(), "OK".green(), + h["version"].as_str().unwrap_or("?"), + h["encrypted"].as_bool().unwrap_or(false) + ); + ensure_db(&probe, &cli.nedb_url, &cli.db, &cli.token).await?; + } else { + println!("{} Dry-run — skipping nedbd check\n", "⚠".yellow()); + } + + // ── Progress bars ───────────────────────────────────────────────────── + let style = ProgressStyle::with_template( + "{prefix:.bold} [{bar:42.cyan/blue}] {pos:>7}/{len:>7} {per_sec:>10} eta {eta}", + ) + .unwrap() + .progress_chars("█▉▊▋▌▍▎▏ "); + + let mp = MultiProgress::new(); + + let pb_kv = mp.add(ProgressBar::new(kv_rows.len() as u64)); + pb_kv.set_style(style.clone()); + pb_kv.set_prefix("kv "); + pb_kv.set_position(state.kv_done as u64); + + let pb_zset = mp.add(ProgressBar::new(zset_rows.len() as u64)); + pb_zset.set_style(style.clone()); + pb_zset.set_prefix("zset "); + pb_zset.set_position(state.zsets_done as u64); + + let pb_set = mp.add(ProgressBar::new(set_rows.len() as u64)); + pb_set.set_style(style.clone()); + pb_set.set_prefix("set "); + pb_set.set_position(state.sets_done as u64); + + // ── Build op vecs ───────────────────────────────────────────────────── + let kv_ops: Vec = kv_rows.iter().map(kv_op).collect(); + let zset_ops: Vec = zset_rows.iter().map(zset_op).collect(); + let set_ops: Vec = set_rows.iter().map(set_op).collect(); + + let t_migrate = Instant::now(); + + // ── Send kv ─────────────────────────────────────────────────────────── + let kv_skip = state.kv_done; + let kv_sent = send_table_ops( + kv_ops, kv_skip, "kv", &cli, &mut state, + |s| &mut s.kv_done, &pb_kv, + ).await?; + + // ── Send zsets ──────────────────────────────────────────────────────── + let zsets_skip = state.zsets_done; + let zsets_sent = send_table_ops( + zset_ops, zsets_skip, "zset", &cli, &mut state, + |s| &mut s.zsets_done, &pb_zset, + ).await?; + + // ── Send sets 
───────────────────────────────────────────────────────── + let sets_skip = state.sets_done; + let sets_sent = send_table_ops( + set_ops, sets_skip, "set", &cli, &mut state, + |s| &mut s.sets_done, &pb_set, + ).await?; + + // ── Summary ─────────────────────────────────────────────────────────── + let elapsed = t_migrate.elapsed().as_secs_f64(); + let total = kv_sent + zsets_sent + sets_sent; + let rps = if elapsed > 0.0 { total as f64 / elapsed } else { 0.0 }; + + println!(); + println!("{}", "─".repeat(52)); + println!("{}", if cli.dry_run { " DRY-RUN summary " } else { " Migration complete " }.bold()); + println!("{}", "─".repeat(52)); + + let tag = if cli.dry_run { "[DRY] ".yellow().to_string() } else { String::new() }; + println!(" {}kv sent: {}", tag, kv_sent.to_string().green()); + if kv_skip > 0 { + println!(" kv skipped: {} (already done)", kv_skip.to_string().dimmed()); + } + println!(" {}zsets sent: {}", tag, zsets_sent.to_string().green()); + println!(" {}sets sent: {}", tag, sets_sent.to_string().green()); + println!(" {}total: {}", tag, total.to_string().bold().green()); + println!(" elapsed: {:.2}s ({:.0} rows/s)", elapsed, rps); + + if !cli.dry_run && total > 0 { + println!(); + println!("{} State → {}", "✓".green(), cli.state_file.display()); + } + + if total == 0 && (kv_skip + zsets_skip + sets_skip) > 0 { + println!(); + println!("{} All rows already migrated. Run with {} to start over.", + "✓".green(), "--reset".bold()); + } + + println!(); + Ok(()) +} From 81da07c6275a5e14817b71b4f3eb490bc832a9d7 Mon Sep 17 00:00:00 2001 From: Eth-Interchained <117552056+Eth-Interchained@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:31:15 -0400 Subject: [PATCH 2/8] feat(migrator): nedbd-side verification pass on startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit At startup, query nedbd for actual collection counts and advance the resume state to max(state_file, nedbd_count). 
Detects rows inserted by the Python migrator, a previous run on another machine, or a lost state file. - count_collection(): queries FROM {coll} LIMIT 9999999 → count - verify_against_nedb(): syncs state.{kv,zsets,sets}_done to max of state file and actual nedbd count; saves state atomically if advanced - --no-verify flag to skip for speed (default: always verify) Co-Authored-By: NEDB Maintainer (Claude Sonnet 4.6) --- tools/nedb-migrator/src/main.rs | 127 ++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/tools/nedb-migrator/src/main.rs b/tools/nedb-migrator/src/main.rs index 9fbbd6f..1b465ea 100644 --- a/tools/nedb-migrator/src/main.rs +++ b/tools/nedb-migrator/src/main.rs @@ -89,6 +89,10 @@ struct Cli { #[arg(long)] dry_run: bool, + /// Skip the nedbd verification pass at startup (use state file as-is) + #[arg(long)] + no_verify: bool, + /// Verbose: print each batch #[arg(long, short)] verbose: bool, @@ -293,6 +297,111 @@ async fn send_batch_http( Ok(body["count"].as_u64().unwrap_or(0) as usize) } +/// Query how many documents a collection currently holds in nedbd. +/// Uses a high LIMIT — nedbd returns `count` in the response. +async fn count_collection( + client: &Client, + base: &str, + db: &str, + token: &str, + coll: &str, +) -> Result { + let resp = client + .post(format!("{}/v1/databases/{}/query", base, db)) + .maybe_bearer(token) + .json(&json!({ "nql": format!("FROM {} LIMIT 1", coll) })) + .send() + .await; + + // If the database or collection doesn't exist yet, treat count as 0. 
+ match resp { + Err(_) => Ok(0), + Ok(r) => { + if !r.status().is_success() { + return Ok(0); + } + // Re-run with no limit to get actual count + let body: Value = r.json().await?; + // First check "count" field — nedbd returns it + if let Some(c) = body["count"].as_u64() { + if c < 2 { + // Could be 0 or 1 — do a full count + let r2 = client + .post(format!("{}/v1/databases/{}/query", base, db)) + .maybe_bearer(token) + .json(&json!({ "nql": format!("FROM {} LIMIT 9999999", coll) })) + .send() + .await?; + let b2: Value = r2.json().await?; + return Ok(b2["count"].as_u64().unwrap_or(0) as usize); + } + return Ok(c as usize); + } + Ok(0) + } + } +} + +/// Check nedbd collection counts and advance the resume state if nedbd is +/// ahead of the state file (e.g. data inserted by the Python migrator or a +/// previous run on another machine). +async fn verify_against_nedb( + client: &Client, + base: &str, + db: &str, + token: &str, + state: &mut State, + kv_total: usize, + zsets_total: usize, + sets_total: usize, + state_file: &Path, +) -> Result<()> { + print!("{} Verifying against nedbd… ", "◉".blue()); + std::io::Write::flush(&mut std::io::stdout()).ok(); + + let kv_n = count_collection(client, base, db, token, "kv").await.unwrap_or(0); + let zset_n = count_collection(client, base, db, token, "zset").await.unwrap_or(0); + let set_n = count_collection(client, base, db, token, "set").await.unwrap_or(0); + + println!("kv={kv_n} zset={zset_n} set={set_n}"); + + let mut advanced = false; + + macro_rules! 
sync_field { + ($field:expr, $nedb_count:expr, $total:expr, $label:expr) => { + if $nedb_count > $field && $nedb_count <= $total { + println!( + " {} {}: state file={} → nedbd={} (advancing)", + "↑".yellow(), $label, $field, $nedb_count + ); + $field = $nedb_count; + advanced = true; + } else if $nedb_count >= $total && $field < $total { + // nedbd already has all rows for this table + println!( + " {} {}: nedbd has all {} rows — skipping table", + "✓".green(), $label, $total + ); + $field = $total; + advanced = true; + } + }; + } + + sync_field!(state.kv_done, kv_n, kv_total, "kv"); + sync_field!(state.zsets_done, zset_n, zsets_total, "zsets"); + sync_field!(state.sets_done, set_n, sets_total, "sets"); + + if advanced { + save_state(state_file, state)?; + println!(" {} State synced from nedbd.\n", "✓".green()); + } else { + println!(" {} State is consistent with nedbd.\n", "✓".green()); + } + + Ok(()) +} + /// Trait for optional bearer auth — keeps call sites clean. trait MaybeBearer { fn maybe_bearer(self, token: &str) -> Self; @@ -451,6 +560,24 @@ async fn main() -> Result<()> { h["encrypted"].as_bool().unwrap_or(false) ); ensure_db(&probe, &cli.nedb_url, &cli.db, &cli.token).await?; + + // ── Verify state against what nedbd actually holds ──────────────── + if !cli.no_verify { + verify_against_nedb( + &probe, + &cli.nedb_url, + &cli.db, + &cli.token, + &mut state, + kv_rows.len(), + zset_rows.len(), + set_rows.len(), + &cli.state_file, + ) + .await?; + } else { + println!("{} Skipping nedbd verification (--no-verify)\n", "⚠".yellow()); + } } else { println!("{} Dry-run — skipping nedbd check\n", "⚠".yellow()); } From aaca22ba2e81cba73d61360fb067421b23bd15ef Mon Sep 17 00:00:00 2001 From: Eth-Interchained <117552056+Eth-Interchained@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:44:27 -0400 Subject: [PATCH 3/8] fix(migrator): clap env feature + rusqlite stmt lifetime - Cargo.toml: add `env` to clap features (needed for #[arg(env=...)]) - read_zsets / 
read_sets: collect into Vec before returning so stmt outlives the iterator (borrow checker fix) Co-Authored-By: NEDB Maintainer (Claude Sonnet 4.6) --- tools/nedb-migrator/Cargo.toml | 2 +- tools/nedb-migrator/src/main.rs | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tools/nedb-migrator/Cargo.toml b/tools/nedb-migrator/Cargo.toml index 2b16d7d..f42533c 100644 --- a/tools/nedb-migrator/Cargo.toml +++ b/tools/nedb-migrator/Cargo.toml @@ -24,7 +24,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" # CLI -clap = { version = "4", features = ["derive"] } +clap = { version = "4", features = ["derive", "env"] } # Progress bars indicatif = "0.17" diff --git a/tools/nedb-migrator/src/main.rs b/tools/nedb-migrator/src/main.rs index 1b465ea..eb176f1 100644 --- a/tools/nedb-migrator/src/main.rs +++ b/tools/nedb-migrator/src/main.rs @@ -192,7 +192,7 @@ fn read_kv(conn: &Connection, skip_block_cache: bool) -> Result> { fn read_zsets(conn: &Connection) -> Result> { let mut stmt = conn.prepare("SELECT name, member, score FROM zsets ORDER BY rowid")?; - Ok(stmt + let rows: Vec = stmt .query_map([], |r| { Ok(ZsetRow { name: r.get(0)?, @@ -201,12 +201,13 @@ fn read_zsets(conn: &Connection) -> Result> { }) })? .filter_map(|r| r.ok()) - .collect()) + .collect(); + Ok(rows) } fn read_sets(conn: &Connection) -> Result> { let mut stmt = conn.prepare("SELECT name, member FROM sets ORDER BY rowid")?; - Ok(stmt + let rows: Vec = stmt .query_map([], |r| { Ok(SetRow { name: r.get(0)?, @@ -214,7 +215,8 @@ fn read_sets(conn: &Connection) -> Result> { }) })? 
.filter_map(|r| r.ok()) - .collect()) + .collect(); + Ok(rows) } // --------------------------------------------------------------------------- From 66e866c6c091f6247778964064dda6c6ba39b5fc Mon Sep 17 00:00:00 2001 From: Eth-Interchained <117552056+Eth-Interchained@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:52:06 -0400 Subject: [PATCH 4/8] fix(migrator): 120s timeout + 3-retry ensure_db for large encrypted DBs GET /v1/databases/{name} can be slow on first access after heavy writes (nedbd replays the AOF log for large encrypted databases). Increase probe timeout from 10s to 120s and add 3-attempt retry with 5s backoff. Co-Authored-By: NEDB Maintainer (Claude Sonnet 4.6) --- tools/nedb-migrator/src/main.rs | 53 ++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/tools/nedb-migrator/src/main.rs b/tools/nedb-migrator/src/main.rs index eb176f1..87abeba 100644 --- a/tools/nedb-migrator/src/main.rs +++ b/tools/nedb-migrator/src/main.rs @@ -261,23 +261,40 @@ async fn nedb_health(client: &Client, base: &str, token: &str) -> Result } async fn ensure_db(client: &Client, base: &str, db: &str, token: &str) -> Result<()> { - let check = client - .get(format!("{}/v1/databases/{}", base, db)) - .maybe_bearer(token) - .send() - .await?; - if check.status().is_success() { - return Ok(()); + // Retry up to 3 times — first access after heavy writes can be slow + // while nedbd replays the AOF log for a large encrypted database. + let mut last_err = String::new(); + for attempt in 1..=3u8 { + match client + .get(format!("{}/v1/databases/{}", base, db)) + .maybe_bearer(token) + .send() + .await + { + Ok(r) if r.status().is_success() => return Ok(()), + Ok(r) if r.status().as_u16() == 404 => { + // DB doesn't exist yet — create it + client + .post(format!("{}/v1/databases", base)) + .maybe_bearer(token) + .json(&json!({"name": db})) + .send() + .await? 
+ .error_for_status() + .context("Failed to create nedbd database")?; + return Ok(()); + } + Ok(r) => last_err = format!("HTTP {}", r.status()), + Err(e) => { + last_err = e.to_string(); + if attempt < 3 { + eprintln!(" ensure_db attempt {attempt}/3 failed ({last_err}), retrying…"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } + } } - client - .post(format!("{}/v1/databases", base)) - .maybe_bearer(token) - .json(&json!({"name": db})) - .send() - .await? - .error_for_status() - .context("Failed to create nedbd database")?; - Ok(()) + anyhow::bail!("ensure_db failed after 3 attempts: {last_err}") } async fn send_batch_http( @@ -549,8 +566,10 @@ async fn main() -> Result<()> { // ── Connectivity check ──────────────────────────────────────────────── if !cli.dry_run { + // Use a longer timeout here: opening a large encrypted nedbd database + // (AOF replay) can take 30-60s on first access after heavy writes. let probe = Client::builder() - .timeout(std::time::Duration::from_secs(10)) + .timeout(std::time::Duration::from_secs(120)) .build()?; let h = nedb_health(&probe, &cli.nedb_url, &cli.token) .await From c6c561f8f71a21cc4b84a844a546fe7c8b487e8f Mon Sep 17 00:00:00 2001 From: Eth-Interchained <117552056+Eth-Interchained@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:57:40 -0400 Subject: [PATCH 5/8] =?UTF-8?q?fix(migrator):=20stream=20SQLite=20via=20LI?= =?UTF-8?q?MIT/OFFSET=20=E2=80=94=20prevents=20OOM=20kill?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1.2M rows at once = OOM on low-RAM VPS. 
New approach: - SELECT COUNT(*) upfront for progress bars (no data in memory) - fetch_*_chunk: LIMIT/OFFSET streaming, only --chunk rows at a time - stream_table: chunk -> concurrent batches -> save cursor -> next chunk - Peak memory = chunk_size * ~300 bytes (default 2000 rows = ~0.6 MB) - Added --chunk N flag for tuning (default 2000) Co-Authored-By: NEDB Maintainer (Claude Sonnet 4.6) --- tools/nedb-migrator/src/main.rs | 779 +++++++++++++------------------- 1 file changed, 314 insertions(+), 465 deletions(-) diff --git a/tools/nedb-migrator/src/main.rs b/tools/nedb-migrator/src/main.rs index 87abeba..cb8eb5f 100644 --- a/tools/nedb-migrator/src/main.rs +++ b/tools/nedb-migrator/src/main.rs @@ -1,25 +1,16 @@ -//! nedb-migrator — fast, resumable SQLite → nedbd migration tool +//! nedb-migrator — fast, resumable, low-memory SQLite → nedbd migration tool //! -//! Reads all kv / zsets / sets rows from a Vision SQLite database and writes -//! them to a running nedbd instance using concurrent batch HTTP requests. +//! Streams SQLite rows in chunks so memory usage stays constant regardless of +//! table size. 1.2M rows uses the same RAM as 1k rows. //! //! # Usage //! //! ```bash -//! # Dry run — see what would be migrated -//! nedb-migrator --sqlite ../data/vision.db --dry-run -//! -//! # Full migration (resumes automatically if interrupted) //! nedb-migrator --sqlite ../data/vision.db -//! -//! # Skip the ~90k block cache rows, only migrate live state (~20 rows) //! nedb-migrator --sqlite ../data/vision.db --skip-block-cache -//! -//! # Reset progress and start from scratch //! nedb-migrator --sqlite ../data/vision.db --reset -//! -//! # Tune concurrency and batch size for faster hardware -//! nedb-migrator --sqlite ../data/vision.db --concurrency 32 --batch-size 200 +//! nedb-migrator --sqlite ../data/vision.db --dry-run +//! nedb-migrator --sqlite ../data/vision.db --chunk 5000 --concurrency 32 //! 
``` use std::fs; @@ -30,7 +21,7 @@ use std::time::Instant; use anyhow::{Context, Result}; use clap::Parser; use colored::*; -use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use indicatif::{ProgressBar, ProgressStyle}; use reqwest::Client; use rusqlite::Connection; use serde::{Deserialize, Serialize}; @@ -42,58 +33,49 @@ use tokio::sync::Semaphore; // --------------------------------------------------------------------------- #[derive(Parser, Debug)] -#[command( - name = "nedb-migrator", - version = "1.0.0", - about = "Fast, resumable SQLite → nedbd migration for Interchained Vision", - long_about = None, -)] +#[command(name = "nedb-migrator", version = "1.0.0", + about = "Fast, resumable, streaming SQLite → nedbd migration")] struct Cli { - /// Path to the Vision SQLite database #[arg(long, default_value = "../data/vision.db")] sqlite: PathBuf, - /// nedbd base URL - #[arg(long, env = "NEDB_URL", default_value = "http://127.0.0.1:7070")] + #[arg(long, default_value = "http://127.0.0.1:7070")] nedb_url: String, - /// nedbd database name - #[arg(long, env = "NEDB_DB_NAME", default_value = "vision")] + #[arg(long, default_value = "vision")] db: String, - /// nedbd bearer token (leave blank if not set) - #[arg(long, env = "NEDBD_TOKEN", default_value = "")] + #[arg(long, default_value = "")] token: String, - /// Number of ops per batch request sent to nedbd - #[arg(long, default_value_t = 100)] - batch_size: usize, + /// Rows fetched from SQLite per streaming chunk (controls peak memory) + #[arg(long, default_value_t = 2000)] + chunk: usize, - /// Maximum concurrent batch requests in flight + /// Concurrent nedbd batch requests per chunk #[arg(long, default_value_t = 16)] concurrency: usize, - /// Skip vision:block:height:* and vision:block:hash:* rows (~90k rows) + /// Rows per nedbd batch request + #[arg(long, default_value_t = 100)] + batch_size: usize, + + /// Skip vision:block:height:* and vision:block:hash:* kv rows #[arg(long)] skip_block_cache: 
bool, - /// Path to the resume state file #[arg(long, default_value = ".nedb-migrator-state.json")] state_file: PathBuf, - /// Delete saved progress and start from scratch #[arg(long)] reset: bool, - /// Print what would be migrated without writing to nedbd #[arg(long)] - dry_run: bool, + no_verify: bool, - /// Skip the nedbd verification pass at startup (use state file as-is) #[arg(long)] - no_verify: bool, + dry_run: bool, - /// Verbose: print each batch #[arg(long, short)] verbose: bool, } @@ -102,8 +84,6 @@ struct Cli { // Resume state // --------------------------------------------------------------------------- -/// Tracks how many rows of each table have already been successfully sent. -/// Written atomically after every batch via temp-file-then-rename. #[derive(Debug, Clone, Serialize, Deserialize, Default)] struct State { kv_done: usize, @@ -126,169 +106,134 @@ fn save_state(path: &Path, state: &State) -> Result<()> { } // --------------------------------------------------------------------------- -// SQLite row types +// SQLite helpers — streaming via LIMIT/OFFSET, never loads full table // --------------------------------------------------------------------------- -#[derive(Clone)] -struct KvRow { - key: String, - value: String, - expires_at: Option, -} - -#[derive(Clone)] -struct ZsetRow { - name: String, - member: String, - score: f64, -} - -#[derive(Clone)] -struct SetRow { - name: String, - member: String, +fn count_table(conn: &Connection, table: &str, extra_where: &str) -> Result { + let sql = if extra_where.is_empty() { + format!("SELECT COUNT(*) FROM {table}") + } else { + format!("SELECT COUNT(*) FROM {table} WHERE {extra_where}") + }; + Ok(conn.query_row(&sql, [], |r| r.get::<_, i64>(0))? 
as usize) } -// --------------------------------------------------------------------------- -// SQLite readers (all read-only, one pass each) -// --------------------------------------------------------------------------- - -fn read_kv(conn: &Connection, skip_block_cache: bool) -> Result> { +/// Fetch one chunk of kv rows starting at `offset`, up to `limit` rows. +fn fetch_kv_chunk( + conn: &Connection, + offset: usize, + limit: usize, + skip_block_cache: bool, +) -> Result> { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64(); - let mut stmt = conn.prepare("SELECT key, value, expires_at FROM kv ORDER BY rowid")?; - let rows: Vec = stmt - .query_map([], |r| { - Ok(KvRow { - key: r.get(0)?, - value: r.get(1)?, - expires_at: r.get(2)?, - }) + let sql = "SELECT key, value, expires_at FROM kv ORDER BY rowid LIMIT ?1 OFFSET ?2"; + let mut stmt = conn.prepare(sql)?; + let rows: Vec = stmt + .query_map([limit as i64, offset as i64], |r| { + Ok(( + r.get::<_, String>(0)?, + r.get::<_, String>(1)?, + r.get::<_, Option>(2)?, + )) })? 
.filter_map(|r| r.ok()) - .filter(|r| { - // Drop expired rows - if let Some(exp) = r.expires_at { - if exp < now { - return false; - } + .filter(|(key, _, expires_at)| { + if let Some(exp) = expires_at { + if *exp < now { return false; } } - // Optionally skip the per-block cache (~90k rows) if skip_block_cache - && (r.key.starts_with("vision:block:height:") - || r.key.starts_with("vision:block:hash:")) + && (key.starts_with("vision:block:height:") + || key.starts_with("vision:block:hash:")) { return false; } true }) + .map(|(key, value, expires_at)| json!({ + "op": "put", "coll": "kv", "id": &key, + "doc": { "_id": &key, "value": value, "expires_at": expires_at } + })) .collect(); Ok(rows) } -fn read_zsets(conn: &Connection) -> Result> { - let mut stmt = - conn.prepare("SELECT name, member, score FROM zsets ORDER BY rowid")?; - let rows: Vec = stmt - .query_map([], |r| { - Ok(ZsetRow { - name: r.get(0)?, - member: r.get(1)?, - score: r.get(2)?, - }) +fn fetch_zset_chunk(conn: &Connection, offset: usize, limit: usize) -> Result> { + let sql = "SELECT name, member, score FROM zsets ORDER BY rowid LIMIT ?1 OFFSET ?2"; + let mut stmt = conn.prepare(sql)?; + let rows: Vec = stmt + .query_map([limit as i64, offset as i64], |r| { + Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?, r.get::<_, f64>(2)?)) })? 
.filter_map(|r| r.ok()) + .map(|(name, member, score)| { + let id = format!("{name}::{member}"); + json!({ + "op": "put", "coll": "zset", "id": &id, + "doc": { "_id": &id, "_name": name, "_member": member, "score": score } + }) + }) .collect(); Ok(rows) } -fn read_sets(conn: &Connection) -> Result> { - let mut stmt = conn.prepare("SELECT name, member FROM sets ORDER BY rowid")?; - let rows: Vec = stmt - .query_map([], |r| { - Ok(SetRow { - name: r.get(0)?, - member: r.get(1)?, - }) +fn fetch_set_chunk(conn: &Connection, offset: usize, limit: usize) -> Result> { + let sql = "SELECT name, member FROM sets ORDER BY rowid LIMIT ?1 OFFSET ?2"; + let mut stmt = conn.prepare(sql)?; + let rows: Vec = stmt + .query_map([limit as i64, offset as i64], |r| { + Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)) })? .filter_map(|r| r.ok()) + .map(|(name, member)| { + let id = format!("{name}::{member}"); + json!({ + "op": "put", "coll": "set", "id": &id, + "doc": { "_id": &id, "_name": name, "_member": member } + }) + }) .collect(); Ok(rows) } // --------------------------------------------------------------------------- -// nedbd batch op builders +// nedbd HTTP // --------------------------------------------------------------------------- -fn kv_op(r: &KvRow) -> Value { - json!({ - "op": "put", "coll": "kv", "id": r.key, - "doc": { "_id": r.key, "value": r.value, "expires_at": r.expires_at } - }) -} - -fn zset_op(r: &ZsetRow) -> Value { - let id = format!("{}::{}", r.name, r.member); - json!({ - "op": "put", "coll": "zset", "id": &id, - "doc": { "_id": &id, "_name": r.name, "_member": r.member, "score": r.score } - }) -} - -fn set_op(r: &SetRow) -> Value { - let id = format!("{}::{}", r.name, r.member); - json!({ - "op": "put", "coll": "set", "id": &id, - "doc": { "_id": &id, "_name": r.name, "_member": r.member } - }) +trait MaybeBearer { fn maybe_bearer(self, token: &str) -> Self; } +impl MaybeBearer for reqwest::RequestBuilder { + fn maybe_bearer(self, t: &str) -> Self { 
+ if t.is_empty() { self } else { self.bearer_auth(t) } + } } -// --------------------------------------------------------------------------- -// nedbd HTTP helpers -// --------------------------------------------------------------------------- - async fn nedb_health(client: &Client, base: &str, token: &str) -> Result { - Ok(client - .get(format!("{}/health", base)) - .maybe_bearer(token) - .send() - .await? - .json::() - .await?) + Ok(client.get(format!("{base}/health")) + .maybe_bearer(token).send().await?.json().await?) } async fn ensure_db(client: &Client, base: &str, db: &str, token: &str) -> Result<()> { - // Retry up to 3 times — first access after heavy writes can be slow - // while nedbd replays the AOF log for a large encrypted database. let mut last_err = String::new(); - for attempt in 1..=3u8 { - match client - .get(format!("{}/v1/databases/{}", base, db)) - .maybe_bearer(token) - .send() - .await + for attempt in 1u8..=3 { + match client.get(format!("{base}/v1/databases/{db}")) + .maybe_bearer(token).send().await { Ok(r) if r.status().is_success() => return Ok(()), Ok(r) if r.status().as_u16() == 404 => { - // DB doesn't exist yet — create it - client - .post(format!("{}/v1/databases", base)) + client.post(format!("{base}/v1/databases")) .maybe_bearer(token) .json(&json!({"name": db})) - .send() - .await? 
- .error_for_status() - .context("Failed to create nedbd database")?; + .send().await?.error_for_status()?; return Ok(()); } - Ok(r) => last_err = format!("HTTP {}", r.status()), + Ok(r) => last_err = format!("HTTP {}", r.status()), Err(e) => { last_err = e.to_string(); if attempt < 3 { - eprintln!(" ensure_db attempt {attempt}/3 failed ({last_err}), retrying…"); + eprintln!(" ensure_db attempt {attempt}/3 failed, retrying in 5s…"); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } @@ -297,211 +242,141 @@ async fn ensure_db(client: &Client, base: &str, db: &str, token: &str) -> Result anyhow::bail!("ensure_db failed after 3 attempts: {last_err}") } -async fn send_batch_http( - client: &Client, - base: &str, - db: &str, - token: &str, - ops: Vec, -) -> Result { - let resp = client - .post(format!("{}/v1/databases/{}/batch", base, db)) +async fn send_batch(client: &Client, base: &str, db: &str, token: &str, ops: Vec) -> Result { + let resp = client.post(format!("{base}/v1/databases/{db}/batch")) .maybe_bearer(token) .json(&json!({"ops": ops})) - .send() - .await? - .error_for_status() - .context("nedbd batch failed")?; + .send().await? + .error_for_status()?; let body: Value = resp.json().await?; Ok(body["count"].as_u64().unwrap_or(0) as usize) } -/// Query how many documents a collection currently holds in nedbd. -/// Uses a high LIMIT — nedbd returns `count` in the response. 
-async fn count_collection( - client: &Client, - base: &str, - db: &str, - token: &str, - coll: &str, +// --------------------------------------------------------------------------- +// Streaming table sender — processes one chunk at a time, constant memory +// --------------------------------------------------------------------------- + +async fn stream_table( + label: &str, + total: usize, + start_offset: usize, + cli: &Cli, + state_field: &mut usize, + state: &mut State, + fetch_chunk: impl Fn(usize, usize) -> Result>, + client: Arc, + pb: &ProgressBar, ) -> Result { - let resp = client - .post(format!("{}/v1/databases/{}/query", base, db)) - .maybe_bearer(token) - .json(&json!({ "nql": format!("FROM {} LIMIT 1", coll) })) - .send() - .await; - - // If the database or collection doesn't exist yet, treat count as 0. - match resp { - Err(_) => Ok(0), - Ok(r) => { - if !r.status().is_success() { - return Ok(0); - } - // Re-run with no limit to get actual count - let body: Value = r.json().await?; - // First check "count" field — nedbd returns it - if let Some(c) = body["count"].as_u64() { - if c < 2 { - // Could be 0 or 1 — do a full count - let r2 = client - .post(format!("{}/v1/databases/{}/query", base, db)) - .maybe_bearer(token) - .json(&json!({ "nql": format!("FROM {} LIMIT 9999999", coll) })) - .send() - .await?; - let b2: Value = r2.json().await?; - return Ok(b2["count"].as_u64().unwrap_or(0) as usize); - } - return Ok(c as usize); - } - Ok(0) + let mut offset = start_offset; + let mut sent = 0usize; + + while offset < total { + // 1. Fetch one chunk from SQLite (low memory — only `chunk` rows at a time) + let ops = fetch_chunk(offset, cli.chunk)?; + if ops.is_empty() { + break; // expired/filtered rows mean chunk may be empty — advance } - } -} - -/// Check nedbd collection counts and advance the resume state if nedbd is -/// ahead of the state file (e.g. data inserted by the Python migrator or a -/// previous run on another machine). 
-async fn verify_against_nedb( - client: &Client, - base: &str, - db: &str, - token: &str, - state: &mut State, - kv_total: usize, - zsets_total: usize, - sets_total: usize, - state_file: &Path, -) -> Result<()> { - print!("{} Verifying against nedbd… ", "◉".blue()); - std::io::Write::flush(&mut std::io::stdout()).ok(); - let kv_n = count_collection(client, base, db, token, "kv").await.unwrap_or(0); - let zset_n = count_collection(client, base, db, token, "zset").await.unwrap_or(0); - let set_n = count_collection(client, base, db, token, "set").await.unwrap_or(0); - - println!("kv={kv_n} zset={zset_n} set={set_n}"); + let chunk_len = ops.len(); + + // 2. Split chunk into batches and send concurrently + let sem = Arc::new(Semaphore::new(cli.concurrency)); + let mut handles = Vec::new(); + + for batch in ops.chunks(cli.batch_size) { + let batch_ops = batch.to_vec(); + let batch_len = batch_ops.len(); + let c2 = Arc::clone(&client); + let sem2 = Arc::clone(&sem); + let base = cli.nedb_url.clone(); + let db = cli.db.clone(); + let tok = cli.token.clone(); + let dry = cli.dry_run; + + let h: tokio::task::JoinHandle> = tokio::spawn(async move { + let _p = sem2.acquire_owned().await.unwrap(); + if dry { return Ok(batch_len); } + send_batch(&c2, &base, &db, &tok, batch_ops).await + }); + handles.push((h, batch_len)); + } - let mut advanced = false; + // 3. Collect results in order + let mut chunk_sent = 0usize; + for (h, batch_len) in handles { + chunk_sent += h.await.context("task panicked")??; + pb.inc(batch_len as u64); + } - macro_rules! 
sync_field { - ($field:expr, $nedb_count:expr, $total:expr, $label:expr) => { - if $nedb_count > $field && $nedb_count <= $total { - println!( - " {} {}: state file={} → nedbd={} (advancing)", - "↑".yellow(), $label, $field, $nedb_count - ); - $field = $nedb_count; - advanced = true; - } else if $nedb_count >= $total && $field < $total { - // nedbd already has all rows for this table - println!( - " {} {}: nedbd has all {} rows — skipping table", - "✓".green(), $label, $total - ); - $field = $total; - advanced = true; - } - }; - } + sent += chunk_sent; + offset += chunk_len; // advance by raw chunk size (pre-filter) - sync_field!(state.kv_done, kv_n, kv_total, "kv"); - sync_field!(state.zsets_done, zset_n, zsets_total, "zsets"); - sync_field!(state.sets_done, set_n, sets_total, "sets"); + // 4. Persist cursor after every chunk — losing a chunk is the worst case + *state_field = offset; + if !cli.dry_run { + save_state(&cli.state_file, state)?; + } - if advanced { - save_state(state_file, state)?; - println!(" {} State synced from nedbd.\n", "✓".green()); - } else { - println!(" {} State is consistent with nedbd.\n", "✓".green()); + if cli.verbose { + eprintln!(" {label}: offset={offset}/{total} sent_this_chunk={chunk_sent}"); + } } - Ok(()) -} - -/// Trait for optional bearer auth — keeps call sites clean. -trait MaybeBearer { - fn maybe_bearer(self, token: &str) -> Self; -} -impl MaybeBearer for reqwest::RequestBuilder { - fn maybe_bearer(self, token: &str) -> Self { - if token.is_empty() { self } else { self.bearer_auth(token) } - } + pb.finish_with_message(format!("{} rows", start_offset + sent)); + Ok(sent) } // --------------------------------------------------------------------------- -// Core: send one table's ops with resume + concurrent batches +// nedbd-side verification // --------------------------------------------------------------------------- -/// Send all ops for a single table, resuming from `already_done`. 
-/// -/// Spawns up to `concurrency` tokio tasks simultaneously (semaphore-limited). -/// After each batch completes **in order**, the cursor is advanced and the -/// state file is saved atomically — so a kill at any point loses at most -/// one batch worth of work. -async fn send_table_ops( - ops: Vec, - already_done: usize, - label: &str, - cli: &Cli, - state: &mut State, - state_field: fn(&mut State) -> &mut usize, - pb: &ProgressBar, -) -> Result { - let remaining = if already_done < ops.len() { - &ops[already_done..] - } else { - pb.finish_with_message("already done"); - return Ok(0); - }; - - let client = Arc::new(Client::builder() - .timeout(std::time::Duration::from_secs(60)) - .build()?); - let sem = Arc::new(Semaphore::new(cli.concurrency)); +async fn count_collection(client: &Client, base: &str, db: &str, token: &str, coll: &str) -> usize { + let res = client.post(format!("{base}/v1/databases/{db}/query")) + .maybe_bearer(token) + .json(&json!({"nql": format!("FROM {coll} LIMIT 9999999")})) + .send().await; + match res { + Ok(r) if r.status().is_success() => { + r.json::().await.ok() + .and_then(|v| v["count"].as_u64()) + .unwrap_or(0) as usize + } + _ => 0, + } +} - // Pre-chunk so we can spawn all tasks up front, then await in order. 
- let chunks: Vec> = remaining - .chunks(cli.batch_size) - .map(|c| c.to_vec()) - .collect(); +async fn verify_state( + client: &Client, base: &str, db: &str, token: &str, + state: &mut State, state_file: &Path, + kv_total: usize, zsets_total: usize, sets_total: usize, +) -> Result<()> { + print!("{} Checking nedbd collections… ", "◉".blue()); + std::io::Write::flush(&mut std::io::stdout()).ok(); - let mut handles = Vec::with_capacity(chunks.len()); - for chunk in chunks { - let chunk_len = chunk.len(); - let client2 = Arc::clone(&client); - let sem2 = Arc::clone(&sem); - let base = cli.nedb_url.clone(); - let db = cli.db.clone(); - let token = cli.token.clone(); - let dry = cli.dry_run; - - let h: tokio::task::JoinHandle> = tokio::spawn(async move { - let _permit = sem2.acquire_owned().await.unwrap(); - if dry { return Ok(chunk_len); } - send_batch_http(&client2, &base, &db, &token, chunk).await - }); - handles.push((h, chunk_len)); - } + let kv_n = count_collection(client, base, db, token, "kv").await; + let zset_n = count_collection(client, base, db, token, "zset").await; + let set_n = count_collection(client, base, db, token, "set").await; + println!("kv={kv_n} zset={zset_n} set={set_n}"); - let mut total_sent = 0usize; - for (handle, chunk_len) in handles { - let written = handle.await.context("task panicked")??; - total_sent += written; - *state_field(state) += chunk_len; - if !cli.dry_run { - save_state(&cli.state_file, state)?; - } - pb.inc(chunk_len as u64); - if cli.verbose { - eprintln!(" {} +{} (total {})", label, chunk_len, *state_field(state)); - } + let mut advanced = false; + macro_rules! 
sync_f { + ($f:expr, $n:expr, $total:expr, $lbl:expr) => { + if $n > $f && $n <= $total { + println!(" {} {}: {} → {} (advancing)", "↑".yellow(), $lbl, $f, $n); + $f = $n; advanced = true; + } else if $n >= $total { + println!(" {} {}: all {} rows already in nedbd", "✓".green(), $lbl, $total); + $f = $total; advanced = true; + } + }; } + sync_f!(state.kv_done, kv_n, kv_total, "kv"); + sync_f!(state.zsets_done, zset_n, zsets_total, "zsets"); + sync_f!(state.sets_done, set_n, sets_total, "sets"); - pb.finish_with_message(format!("{} rows", already_done + total_sent)); - Ok(total_sent) + if advanced { save_state(state_file, state)?; println!(" {} State synced.\n", "✓".green()); } + else { println!(" {} Consistent.\n", "✓".green()); } + Ok(()) } // --------------------------------------------------------------------------- @@ -512,22 +387,22 @@ async fn send_table_ops( async fn main() -> Result<()> { let cli = Cli::parse(); - println!( - "\n{} {} — SQLite → nedbd\n", - "nedb-migrator".bold().cyan(), - "v1.0.0".dimmed() - ); - println!(" sqlite {}", cli.sqlite.display()); - println!(" nedbd {}", cli.nedb_url); - println!(" database {}", cli.db); - println!(" batch-size {}", cli.batch_size); - println!(" concurrency {}", cli.concurrency); + println!("\n{} {} — SQLite → nedbd (streaming)\n", + "nedb-migrator".bold().cyan(), "v1.0.0".dimmed()); + println!(" sqlite {}", cli.sqlite.display()); + println!(" nedbd {}", cli.nedb_url); + println!(" database {}", cli.db); + println!(" chunk size {} rows (peak memory: ~{} MB)", + cli.chunk, + cli.chunk * 300 / 1_000_000 + 1); // rough estimate + println!(" concurrency {}", cli.concurrency); + println!(" batch size {}", cli.batch_size); println!(" skip-block-cache {}", cli.skip_block_cache); - println!(" dry-run {}", cli.dry_run); - println!(" state file {}", cli.state_file.display()); + println!(" dry-run {}", cli.dry_run); + println!(" state file {}", cli.state_file.display()); println!(); - // ── Resume state 
───────────────────────────────────────────────────── + // ── Resume state ──────────────────────────────────────────────────────── if cli.reset && cli.state_file.exists() { fs::remove_file(&cli.state_file).ok(); println!("{} State reset.\n", "↺".yellow()); @@ -535,157 +410,131 @@ async fn main() -> Result<()> { let mut state = if cli.reset { State::default() } else { load_state(&cli.state_file) }; if state.kv_done + state.zsets_done + state.sets_done > 0 { - println!( - "{} Resuming — kv={} zsets={} sets={}\n", - "→".green(), state.kv_done, state.zsets_done, state.sets_done - ); + println!("{} Resuming — kv={} zsets={} sets={}\n", + "→".green(), state.kv_done, state.zsets_done, state.sets_done); } - // ── Open SQLite ─────────────────────────────────────────────────────── + // ── Open SQLite (read-only) ────────────────────────────────────────────── let canon = cli.sqlite.canonicalize() .with_context(|| format!("SQLite not found: {}", cli.sqlite.display()))?; - let conn = Connection::open_with_flags( - &canon, - rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX, - ) - .context("Failed to open SQLite")?; + let conn = Connection::open_with_flags(&canon, + rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX)?; + + // Count rows — cheap, doesn't load data + print!("{} Counting rows… ", "◉".blue()); + std::io::Write::flush(&mut std::io::stdout()).ok(); + let kv_total = count_table(&conn, "kv", "")?; + let zsets_total = count_table(&conn, "zsets", "")?; + let sets_total = count_table(&conn, "sets", "")?; + println!("kv={} zsets={} sets={}\n", + kv_total.to_string().yellow(), + zsets_total.to_string().yellow(), + sets_total.to_string().yellow()); + + // ── Connectivity ──────────────────────────────────────────────────────── + let client = Arc::new(Client::builder() + .timeout(std::time::Duration::from_secs(120)) + .build()?); - print!("{} Reading SQLite… ", "◉".blue()); - let t0 = Instant::now(); - 
let kv_rows = read_kv(&conn, cli.skip_block_cache)?; - let zset_rows = read_zsets(&conn)?; - let set_rows = read_sets(&conn)?; - drop(conn); // release the file handle - println!( - "kv={} zsets={} sets={} ({} ms)\n", - kv_rows.len().to_string().yellow(), - zset_rows.len().to_string().yellow(), - set_rows.len().to_string().yellow(), - t0.elapsed().as_millis() - ); - - // ── Connectivity check ──────────────────────────────────────────────── if !cli.dry_run { - // Use a longer timeout here: opening a large encrypted nedbd database - // (AOF replay) can take 30-60s on first access after heavy writes. - let probe = Client::builder() - .timeout(std::time::Duration::from_secs(120)) - .build()?; - let h = nedb_health(&probe, &cli.nedb_url, &cli.token) - .await - .context("Cannot reach nedbd — is it running?")?; - println!( - "{} nedbd {} version={} encrypted={}\n", + let h = nedb_health(&client, &cli.nedb_url, &cli.token) + .await.context("Cannot reach nedbd")?; + println!("{} nedbd {} version={} encrypted={}\n", "✓".green(), "OK".green(), h["version"].as_str().unwrap_or("?"), - h["encrypted"].as_bool().unwrap_or(false) - ); - ensure_db(&probe, &cli.nedb_url, &cli.db, &cli.token).await?; + h["encrypted"].as_bool().unwrap_or(false)); - // ── Verify state against what nedbd actually holds ──────────────── if !cli.no_verify { - verify_against_nedb( - &probe, - &cli.nedb_url, - &cli.db, - &cli.token, - &mut state, - kv_rows.len(), - zset_rows.len(), - set_rows.len(), - &cli.state_file, - ) - .await?; + ensure_db(&client, &cli.nedb_url, &cli.db, &cli.token).await?; + verify_state(&client, &cli.nedb_url, &cli.db, &cli.token, + &mut state, &cli.state_file, + kv_total, zsets_total, sets_total).await?; } else { - println!("{} Skipping nedbd verification (--no-verify)\n", "⚠".yellow()); + println!("{} Skipping nedbd check (--no-verify)\n", "⚠".yellow()); } } else { - println!("{} Dry-run — skipping nedbd check\n", "⚠".yellow()); + println!("{} Dry-run\n", "⚠".yellow()); } - // ── 
Progress bars ───────────────────────────────────────────────────── + // ── Progress bars ──────────────────────────────────────────────────────── let style = ProgressStyle::with_template( - "{prefix:.bold} [{bar:42.cyan/blue}] {pos:>7}/{len:>7} {per_sec:>10} eta {eta}", - ) - .unwrap() - .progress_chars("█▉▊▋▌▍▎▏ "); - - let mp = MultiProgress::new(); + "{prefix:.bold} [{bar:42.cyan/blue}] {pos:>9}/{len:>9} {per_sec:>12} eta {eta}" + ).unwrap().progress_chars("█▉▊▋▌▍▎▏ "); - let pb_kv = mp.add(ProgressBar::new(kv_rows.len() as u64)); - pb_kv.set_style(style.clone()); - pb_kv.set_prefix("kv "); + let pb_kv = ProgressBar::new(kv_total as u64); + pb_kv.set_style(style.clone()); pb_kv.set_prefix("kv "); pb_kv.set_position(state.kv_done as u64); - let pb_zset = mp.add(ProgressBar::new(zset_rows.len() as u64)); - pb_zset.set_style(style.clone()); - pb_zset.set_prefix("zset "); + let pb_zset = ProgressBar::new(zsets_total as u64); + pb_zset.set_style(style.clone()); pb_zset.set_prefix("zset "); pb_zset.set_position(state.zsets_done as u64); - let pb_set = mp.add(ProgressBar::new(set_rows.len() as u64)); - pb_set.set_style(style.clone()); - pb_set.set_prefix("set "); + let pb_set = ProgressBar::new(sets_total as u64); + pb_set.set_style(style.clone()); pb_set.set_prefix("set "); pb_set.set_position(state.sets_done as u64); - // ── Build op vecs ───────────────────────────────────────────────────── - let kv_ops: Vec = kv_rows.iter().map(kv_op).collect(); - let zset_ops: Vec = zset_rows.iter().map(zset_op).collect(); - let set_ops: Vec = set_rows.iter().map(set_op).collect(); - - let t_migrate = Instant::now(); - - // ── Send kv ─────────────────────────────────────────────────────────── - let kv_skip = state.kv_done; - let kv_sent = send_table_ops( - kv_ops, kv_skip, "kv", &cli, &mut state, - |s| &mut s.kv_done, &pb_kv, - ).await?; - - // ── Send zsets ──────────────────────────────────────────────────────── - let zsets_skip = state.zsets_done; - let zsets_sent = 
send_table_ops( - zset_ops, zsets_skip, "zset", &cli, &mut state, - |s| &mut s.zsets_done, &pb_zset, - ).await?; - - // ── Send sets ───────────────────────────────────────────────────────── - let sets_skip = state.sets_done; - let sets_sent = send_table_ops( - set_ops, sets_skip, "set", &cli, &mut state, - |s| &mut s.sets_done, &pb_set, - ).await?; - - // ── Summary ─────────────────────────────────────────────────────────── - let elapsed = t_migrate.elapsed().as_secs_f64(); - let total = kv_sent + zsets_sent + sets_sent; - let rps = if elapsed > 0.0 { total as f64 / elapsed } else { 0.0 }; - - println!(); - println!("{}", "─".repeat(52)); - println!("{}", if cli.dry_run { " DRY-RUN summary " } else { " Migration complete " }.bold()); - println!("{}", "─".repeat(52)); + let t0 = Instant::now(); - let tag = if cli.dry_run { "[DRY] ".yellow().to_string() } else { String::new() }; - println!(" {}kv sent: {}", tag, kv_sent.to_string().green()); - if kv_skip > 0 { - println!(" kv skipped: {} (already done)", kv_skip.to_string().dimmed()); + // ── kv ─────────────────────────────────────────────────────────────────── + let kv_start = state.kv_done; + { + let skip = cli.skip_block_cache; + let c = Arc::clone(&client); + let kv_sent = stream_table( + "kv", kv_total, kv_start, &cli, + &mut state.kv_done, &mut state, + |off, lim| fetch_kv_chunk(&conn, off, lim, skip), + c, &pb_kv, + ).await?; + let _ = kv_sent; // progress bar handles display } - println!(" {}zsets sent: {}", tag, zsets_sent.to_string().green()); - println!(" {}sets sent: {}", tag, sets_sent.to_string().green()); - println!(" {}total: {}", tag, total.to_string().bold().green()); - println!(" elapsed: {:.2}s ({:.0} rows/s)", elapsed, rps); - if !cli.dry_run && total > 0 { - println!(); - println!("{} State → {}", "✓".green(), cli.state_file.display()); + // ── zsets ──────────────────────────────────────────────────────────────── + let zsets_start = state.zsets_done; + { + let c = Arc::clone(&client); + 
stream_table( + "zset", zsets_total, zsets_start, &cli, + &mut state.zsets_done, &mut state, + |off, lim| fetch_zset_chunk(&conn, off, lim), + c, &pb_zset, + ).await?; } - if total == 0 && (kv_skip + zsets_skip + sets_skip) > 0 { - println!(); - println!("{} All rows already migrated. Run with {} to start over.", - "✓".green(), "--reset".bold()); + // ── sets ───────────────────────────────────────────────────────────────── + let sets_start = state.sets_done; + { + let c = Arc::clone(&client); + stream_table( + "set", sets_total, sets_start, &cli, + &mut state.sets_done, &mut state, + |off, lim| fetch_set_chunk(&conn, off, lim), + c, &pb_set, + ).await?; } + // ── Summary ─────────────────────────────────────────────────────────────── + let elapsed = t0.elapsed().as_secs_f64(); + let total = (state.kv_done - kv_start) + + (state.zsets_done - zsets_start) + + (state.sets_done - sets_start); + let rps = if elapsed > 0.0 { total as f64 / elapsed } else { 0.0 }; + + println!("\n{}", "─".repeat(52)); + println!("{}", if cli.dry_run { " DRY-RUN summary " } else { " Migration complete " }.bold()); + println!("{}", "─".repeat(52)); + println!(" kv sent: {}", (state.kv_done - kv_start).to_string().green()); + println!(" zsets sent: {}", (state.zsets_done - zsets_start).to_string().green()); + println!(" sets sent: {}", (state.sets_done - sets_start).to_string().green()); + println!(" total: {}", total.to_string().bold().green()); + println!(" elapsed: {:.1}s ({:.0} rows/s)", elapsed, rps); + + if !cli.dry_run && total > 0 { + println!("\n{} State → {}", "✓".green(), cli.state_file.display()); + } + if total == 0 { + println!("\n{} Nothing new — already migrated. 
Use --reset to start over.", "✓".green()); + } println!(); Ok(()) } From 5caea4a4259d538a7ce99919bb3e1d78de81ed41 Mon Sep 17 00:00:00 2001 From: Eth-Interchained <117552056+Eth-Interchained@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:58:48 -0400 Subject: [PATCH 6/8] fix(migrator): lower default concurrency + retry-with-backoff on timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit nedbd Sequencer serialises writes internally. High concurrency on an encrypted database causes the request queue to back up and later batches to timeout. Changes: - Default concurrency: 16 -> 4 (encrypted DB safe) - Default batch size: 100 -> 50 rows - send_batch: retry up to 4x with exponential backoff (500ms→1s→2s→4s) so a single slow Sequencer flush does not abort the migration Co-Authored-By: NEDB Maintainer (Claude Sonnet 4.6) --- tools/nedb-migrator/src/main.rs | 39 ++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/tools/nedb-migrator/src/main.rs b/tools/nedb-migrator/src/main.rs index cb8eb5f..4bf8a5e 100644 --- a/tools/nedb-migrator/src/main.rs +++ b/tools/nedb-migrator/src/main.rs @@ -52,12 +52,14 @@ struct Cli { #[arg(long, default_value_t = 2000)] chunk: usize, - /// Concurrent nedbd batch requests per chunk - #[arg(long, default_value_t = 16)] + /// Concurrent nedbd batch requests per chunk. + /// Reduce to 2-4 for encrypted databases — nedbd's Sequencer serialises + /// writes and high concurrency causes queue buildup + timeouts. 
+ #[arg(long, default_value_t = 4)] concurrency: usize, /// Rows per nedbd batch request - #[arg(long, default_value_t = 100)] + #[arg(long, default_value_t = 50)] batch_size: usize, /// Skip vision:block:height:* and vision:block:hash:* kv rows @@ -243,13 +245,30 @@ async fn ensure_db(client: &Client, base: &str, db: &str, token: &str) -> Result } async fn send_batch(client: &Client, base: &str, db: &str, token: &str, ops: Vec) -> Result { - let resp = client.post(format!("{base}/v1/databases/{db}/batch")) - .maybe_bearer(token) - .json(&json!({"ops": ops})) - .send().await? - .error_for_status()?; - let body: Value = resp.json().await?; - Ok(body["count"].as_u64().unwrap_or(0) as usize) + // Retry up to 4 times with exponential backoff. + // Encrypted nedbd under write pressure can timeout transiently — this + // ensures a single slow Sequencer flush doesn't abort the migration. + let mut delay_ms = 500u64; + let mut last_err = String::new(); + for attempt in 1u8..=4 { + match client.post(format!("{base}/v1/databases/{db}/batch")) + .maybe_bearer(token) + .json(&json!({"ops": &ops})) + .send().await + { + Ok(r) if r.status().is_success() => { + let body: Value = r.json().await?; + return Ok(body["count"].as_u64().unwrap_or(0) as usize); + } + Ok(r) => last_err = format!("HTTP {}", r.status()), + Err(e) => last_err = e.to_string(), + } + if attempt < 4 { + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + delay_ms = (delay_ms * 2).min(8_000); + } + } + anyhow::bail!("batch failed after 4 attempts: {last_err}") } // --------------------------------------------------------------------------- From 0ab78de7932925a9ee30e49425347e21a1b5bf41 Mon Sep 17 00:00:00 2001 From: Eth-Interchained <117552056+Eth-Interchained@users.noreply.github.com> Date: Mon, 15 Jun 2026 18:00:12 -0400 Subject: [PATCH 7/8] fix(migrator): resolve double-borrow via getter/setter fn pointers Cannot borrow &mut state.field and &mut state simultaneously. 
Fix: stream_table takes &mut State + (fn get, fn set) instead of &mut field + &mut state. Single borrow, no conflict. Co-Authored-By: NEDB Maintainer (Claude Sonnet 4.6) --- tools/nedb-migrator/src/main.rs | 80 ++++++++++++++++----------------- 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/tools/nedb-migrator/src/main.rs b/tools/nedb-migrator/src/main.rs index 4bf8a5e..6cc4342 100644 --- a/tools/nedb-migrator/src/main.rs +++ b/tools/nedb-migrator/src/main.rs @@ -276,42 +276,42 @@ async fn send_batch(client: &Client, base: &str, db: &str, token: &str, ops: Vec // --------------------------------------------------------------------------- async fn stream_table( - label: &str, - total: usize, - start_offset: usize, - cli: &Cli, - state_field: &mut usize, - state: &mut State, - fetch_chunk: impl Fn(usize, usize) -> Result<Vec<Value>>, - client: Arc<Client>, - pb: &ProgressBar, + label: &str, + total: usize, + cli: &Cli, + state: &mut State, + get_done: fn(&State) -> usize, + set_done: fn(&mut State, usize), + fetch_chunk: impl Fn(usize, usize) -> Result<Vec<Value>>, + client: Arc<Client>, + pb: &ProgressBar, ) -> Result<usize> { - let mut offset = start_offset; - let mut sent = 0usize; + let start = get_done(state); + let mut offset = start; + let mut sent = 0usize; while offset < total { - // 1. Fetch one chunk from SQLite (low memory — only `chunk` rows at a time) + // 1. Fetch one chunk from SQLite — only `chunk` rows in memory at once let ops = fetch_chunk(offset, cli.chunk)?; if ops.is_empty() { - break; // expired/filtered rows mean chunk may be empty — advance + // All remaining rows were filtered (expired / skip-block-cache) + offset += cli.chunk; + continue; } - let chunk_len = ops.len(); - // 2. Split chunk into batches and send concurrently + // 2.
Split into batches and fire concurrently let sem = Arc::new(Semaphore::new(cli.concurrency)); let mut handles = Vec::new(); - for batch in ops.chunks(cli.batch_size) { - let batch_ops = batch.to_vec(); - let batch_len = batch_ops.len(); - let c2 = Arc::clone(&client); - let sem2 = Arc::clone(&sem); - let base = cli.nedb_url.clone(); - let db = cli.db.clone(); - let tok = cli.token.clone(); - let dry = cli.dry_run; - + let batch_ops = batch.to_vec(); + let batch_len = batch_ops.len(); + let c2 = Arc::clone(&client); + let sem2 = Arc::clone(&sem); + let base = cli.nedb_url.clone(); + let db = cli.db.clone(); + let tok = cli.token.clone(); + let dry = cli.dry_run; let h: tokio::task::JoinHandle<Result<usize>> = tokio::spawn(async move { let _p = sem2.acquire_owned().await.unwrap(); if dry { return Ok(batch_len); } @@ -320,28 +320,27 @@ async fn stream_table( handles.push((h, batch_len)); } - // 3. Collect results in order + // 3. Collect in order let mut chunk_sent = 0usize; for (h, batch_len) in handles { chunk_sent += h.await.context("task panicked")??; pb.inc(batch_len as u64); } - sent += chunk_sent; - offset += chunk_len; // advance by raw chunk size (pre-filter) + sent += chunk_sent; + offset += chunk_len; - // 4.
Save cursor atomically after every chunk + set_done(state, offset); if !cli.dry_run { save_state(&cli.state_file, state)?; } - if cli.verbose { - eprintln!(" {label}: offset={offset}/{total} sent_this_chunk={chunk_sent}"); + eprintln!(" {label}: {offset}/{total} chunk_sent={chunk_sent}"); } } - pb.finish_with_message(format!("{} rows", start_offset + sent)); + pb.finish_with_message(format!("{} rows", start + sent)); Ok(sent) } @@ -499,13 +498,12 @@ async fn main() -> Result<()> { { let skip = cli.skip_block_cache; let c = Arc::clone(&client); - let kv_sent = stream_table( - "kv", kv_total, kv_start, &cli, - &mut state.kv_done, &mut state, + stream_table( + "kv", kv_total, &cli, &mut state, + |s| s.kv_done, |s, v| s.kv_done = v, |off, lim| fetch_kv_chunk(&conn, off, lim, skip), c, &pb_kv, ).await?; - let _ = kv_sent; // progress bar handles display } // ── zsets ──────────────────────────────────────────────────────────────── @@ -513,8 +511,8 @@ async fn main() -> Result<()> { { let c = Arc::clone(&client); stream_table( - "zset", zsets_total, zsets_start, &cli, - &mut state.zsets_done, &mut state, + "zset", zsets_total, &cli, &mut state, + |s| s.zsets_done, |s, v| s.zsets_done = v, |off, lim| fetch_zset_chunk(&conn, off, lim), c, &pb_zset, ).await?; @@ -525,8 +523,8 @@ async fn main() -> Result<()> { { let c = Arc::clone(&client); stream_table( - "set", sets_total, sets_start, &cli, - &mut state.sets_done, &mut state, + "set", sets_total, &cli, &mut state, + |s| s.sets_done, |s, v| s.sets_done = v, |off, lim| fetch_set_chunk(&conn, off, lim), c, &pb_set, ).await?; From 69c98a8cc35a49c74f6ceb1bb661071abd956d1c Mon Sep 17 00:00:00 2001 From: Eth-Interchained <117552056+Eth-Interchained@users.noreply.github.com> Date: Mon, 15 Jun 2026 18:15:26 -0400 Subject: [PATCH 8/8] =?UTF-8?q?feat(migrator):=20rewrite=20Python=20migrat?= =?UTF-8?q?or=20v2=20=E2=80=94=20streaming=20async=20+=20resumable?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit - LIMIT/OFFSET streaming (2000 rows/chunk), constant peak memory - asyncio concurrent batch sends (default concurrency=4) - 4-attempt retry with exponential backoff on timeouts/errors - State file saved after every chunk (atomic tmp+rename) - nedbd verification at startup: max(state_file, nedbd_count) - --skip-block-cache, --reset, --no-verify, --dry-run, --chunk flags - stdlib progress bar with rows/s and ETA Co-Authored-By: NEDB Maintainer (Claude Sonnet 4.6) --- backend/scripts/migrate_sqlite_to_nedb.py | 487 ++++++++++++++++++++++ 1 file changed, 487 insertions(+) create mode 100644 backend/scripts/migrate_sqlite_to_nedb.py diff --git a/backend/scripts/migrate_sqlite_to_nedb.py b/backend/scripts/migrate_sqlite_to_nedb.py new file mode 100644 index 0000000..2167be2 --- /dev/null +++ b/backend/scripts/migrate_sqlite_to_nedb.py @@ -0,0 +1,487 @@ +#!/usr/bin/env python3 +"""migrate_sqlite_to_nedb.py — optimized, resumable SQLite → nedbd migrator. + +Streams SQLite rows via LIMIT/OFFSET so peak memory is proportional to +--chunk, not total rows. Sends nedbd batches concurrently (asyncio semaphore). +Persists a state file after every chunk so interrupted runs resume within one +chunk of where they stopped. Verifies actual nedbd collection counts at +startup and advances past data already present (survives partial prior runs). 
+ +Usage +----- + # Full migration (auto-resumes if interrupted): + python migrate_sqlite_to_nedb.py --sqlite ../data/vision.db + + # Skip the ~1.2M block-cache rows — only migrate live state: + python migrate_sqlite_to_nedb.py --sqlite ../data/vision.db --skip-block-cache + + # Dry run (counts rows, no writes): + python migrate_sqlite_to_nedb.py --sqlite ../data/vision.db --dry-run + + # Reset saved progress and start fresh: + python migrate_sqlite_to_nedb.py --sqlite ../data/vision.db --reset + + # Tune for a slow/encrypted nedbd: + python migrate_sqlite_to_nedb.py --concurrency 2 --batch-size 50 + +Environment variables (override CLI defaults): + NEDB_URL, NEDB_DB_NAME, NEDBD_TOKEN +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import sys +import time +from pathlib import Path +from typing import Any, Callable, Optional + +try: + import aiosqlite + import httpx +except ImportError: + print("ERROR: aiosqlite and httpx are required.") + print(" pip install aiosqlite httpx") + sys.exit(1) + +__version__ = "2.0.0" + +# --------------------------------------------------------------------------- +# State file (resume) +# --------------------------------------------------------------------------- + +def _load_state(path: Path) -> dict: + try: + return json.loads(path.read_text()) + except Exception: + return {"kv_done": 0, "zsets_done": 0, "sets_done": 0} + + +def _save_state(path: Path, state: dict) -> None: + """Atomic write via temp file + rename.""" + tmp = path.with_suffix(".tmp") + tmp.write_text(json.dumps(state, indent=2)) + tmp.rename(path) + + +# --------------------------------------------------------------------------- +# nedbd helpers +# --------------------------------------------------------------------------- + +async def _nedb_health(client: httpx.AsyncClient, base: str) -> dict: + r = await client.get(f"{base}/health") + r.raise_for_status() + return r.json() + + +async def _ensure_db(client: 
httpx.AsyncClient, base: str, db: str) -> None: + for attempt in range(1, 4): + try: + r = await client.get(f"{base}/v1/databases/{db}") + if r.status_code == 200: + return + if r.status_code == 404: + cr = await client.post(f"{base}/v1/databases", json={"name": db}) + cr.raise_for_status() + return + except (httpx.TimeoutException, httpx.ConnectError) as e: + if attempt < 3: + print(f" ensure_db attempt {attempt}/3 failed ({e}), retrying in 5s…") + await asyncio.sleep(5) + else: + raise + + +async def _count_collection( + client: httpx.AsyncClient, base: str, db: str, coll: str +) -> int: + try: + r = await client.post( + f"{base}/v1/databases/{db}/query", + json={"nql": f"FROM {coll} LIMIT 9999999"}, + timeout=120.0, + ) + if r.status_code == 200: + return int(r.json().get("count", 0)) + except Exception: + pass + return 0 + + +async def _send_batch( + client: httpx.AsyncClient, + base: str, + db: str, + ops: list, + dry_run: bool = False, +) -> int: + if dry_run: + return len(ops) + + delay = 0.5 + last_err = "" + for attempt in range(1, 5): + try: + r = await client.post( + f"{base}/v1/databases/{db}/batch", + json={"ops": ops}, + timeout=120.0, + ) + if r.status_code == 200: + return int(r.json().get("count", 0)) + last_err = f"HTTP {r.status_code}" + except (httpx.TimeoutException, httpx.ConnectError) as e: + last_err = str(e) + + if attempt < 4: + print(f"\n batch retry {attempt}/4 ({last_err}), waiting {delay:.1f}s…", + flush=True) + await asyncio.sleep(delay) + delay = min(delay * 2, 8.0) + + raise RuntimeError(f"batch failed after 4 attempts: {last_err}") + + +# --------------------------------------------------------------------------- +# Progress bar (stdlib only) +# --------------------------------------------------------------------------- + +class _Bar: + def __init__(self, label: str, total: int, start: int = 0) -> None: + self.label = label + self.total = max(total, 1) + self.done = start + self._t0 = time.time() + self._drawn = 0 + + def 
update(self, n: int = 0) -> None: + self.done += n + elapsed = max(time.time() - self._t0, 0.001) + rate = self.done / elapsed + pct = self.done / self.total * 100 + remain = max(self.total - self.done, 0) + eta = f"{remain / max(rate, 1):.0f}s" if self.done > 0 else "?" + bar_w = 38 + filled = int(bar_w * self.done / self.total) + bar = "█" * filled + "░" * (bar_w - filled) + line = ( + f"\r {self.label:<6} [{bar}] " + f"{self.done:>9,}/{self.total:>9,} " + f"{rate:>8,.0f}/s eta {eta} " + ) + print(line, end="", flush=True) + self._drawn += 1 + + def finish(self) -> None: + self.update(0) + print(flush=True) + + +# --------------------------------------------------------------------------- +# Core: stream one table +# --------------------------------------------------------------------------- + +async def stream_table( + *, + label: str, + db_path: Path, + fetch_sql: str, + total: int, + start_offset: int, + to_op: Callable, + state: dict, + state_key: str, + state_file: Path, + client: httpx.AsyncClient, + base: str, + db: str, + chunk: int, + batch_size: int, + concurrency: int, + dry_run: bool, +) -> int: + if start_offset >= total: + print(f" {label}: already done ({start_offset:,}/{total:,})") + return 0 + + sem = asyncio.Semaphore(concurrency) + bar = _Bar(label, total, start_offset) + sent = 0 + offset = start_offset + + async with aiosqlite.connect(str(db_path)) as conn: + conn.row_factory = aiosqlite.Row + + while offset < total: + async with conn.execute(fetch_sql, (chunk, offset)) as cur: + raw_rows = await cur.fetchall() + + if not raw_rows: + offset += chunk + continue + + ops = [op for row in raw_rows if (op := to_op(row)) is not None] + chunk_len = len(raw_rows) + + if ops: + tasks = [] + for i in range(0, len(ops), batch_size): + batch = ops[i : i + batch_size] + + async def _send(b: list = batch) -> int: + async with sem: + return await _send_batch(client, base, db, b, dry_run) + + tasks.append(asyncio.create_task(_send())) + + results = await 
asyncio.gather(*tasks, return_exceptions=True) + for r in results: + if isinstance(r, Exception): + raise r + sent += r + bar.update(r) + + offset += chunk_len + state[state_key] = offset + if not dry_run: + _save_state(state_file, state) + + bar.finish() + return sent + + +# --------------------------------------------------------------------------- +# Row → nedbd op converters +# --------------------------------------------------------------------------- + +def _make_kv_op(skip_block_cache: bool) -> Callable: + now = time.time() + + def _op(row: aiosqlite.Row) -> Optional[dict]: + key = row["key"] + value = row["value"] + expires_at = row["expires_at"] + if expires_at is not None and expires_at < now: + return None + if skip_block_cache and ( + key.startswith("vision:block:height:") + or key.startswith("vision:block:hash:") + ): + return None + return { + "op": "put", "coll": "kv", "id": key, + "doc": {"_id": key, "value": value, "expires_at": expires_at}, + } + + return _op + + +def _zset_op(row: aiosqlite.Row) -> dict: + name, member, score = row["name"], row["member"], row["score"] + doc_id = f"{name}::{member}" + return { + "op": "put", "coll": "zset", "id": doc_id, + "doc": {"_id": doc_id, "_name": name, "_member": member, "score": score}, + } + + +def _set_op(row: aiosqlite.Row) -> dict: + name, member = row["name"], row["member"] + doc_id = f"{name}::{member}" + return { + "op": "put", "coll": "set", "id": doc_id, + "doc": {"_id": doc_id, "_name": name, "_member": member}, + } + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +async def _run(args: argparse.Namespace) -> int: + base = args.nedb_url.rstrip("/") + db = args.db + + print(f"\nnedb-migrator v{__version__} — SQLite → nedbd (streaming async)\n") + print(f" sqlite {args.sqlite}") + print(f" nedbd {base}") + print(f" database {db}") + print(f" chunk {args.chunk:,}") + print(f" 
concurrency {args.concurrency}") + print(f" batch-size {args.batch_size}") + print(f" skip-block-cache {args.skip_block_cache}") + print(f" dry-run {args.dry_run}") + print(f" state file {args.state_file}\n") + + # ── State ──────────────────────────────────────────────────────────────── + state_file = Path(args.state_file) + if args.reset and state_file.exists(): + state_file.unlink() + print("↺ State reset.\n") + + state = _load_state(state_file) + if any(state.get(k, 0) for k in ("kv_done", "zsets_done", "sets_done")): + print(f"→ Resuming — kv={state['kv_done']:,} " + f"zsets={state['zsets_done']:,} sets={state['sets_done']:,}\n") + + # ── Count rows ─────────────────────────────────────────────────────────── + sqlite_path = Path(args.sqlite) + if not sqlite_path.exists(): + print(f"ERROR: SQLite not found: {sqlite_path}") + return 1 + + async with aiosqlite.connect(str(sqlite_path)) as conn: + async with conn.execute("SELECT COUNT(*) FROM kv") as c: kv_total = (await c.fetchone())[0] + async with conn.execute("SELECT COUNT(*) FROM zsets") as c: zsets_total = (await c.fetchone())[0] + async with conn.execute("SELECT COUNT(*) FROM sets") as c: sets_total = (await c.fetchone())[0] + + print(f"◉ Rows in SQLite — kv={kv_total:,} zsets={zsets_total:,} sets={sets_total:,}\n") + + # ── nedbd ──────────────────────────────────────────────────────────────── + headers: dict = {} + if args.token: + headers["Authorization"] = f"Bearer {args.token}" + + async with httpx.AsyncClient(headers=headers, timeout=120.0) as client: + if not args.dry_run: + try: + h = await _nedb_health(client, base) + print(f"✓ nedbd OK version={h.get('version')} " + f"encrypted={h.get('encrypted')}\n") + except Exception as e: + print(f"ERROR: Cannot reach nedbd at {base}: {e}") + return 1 + + if not args.no_verify: + await _ensure_db(client, base, db) + print("◉ Verifying against nedbd…", end=" ", flush=True) + kv_n = await _count_collection(client, base, db, "kv") + zset_n = await 
_count_collection(client, base, db, "zset") + set_n = await _count_collection(client, base, db, "set") + print(f"kv={kv_n:,} zset={zset_n:,} set={set_n:,}") + + advanced = False + for key, nedb_n, total, lbl in [ + ("kv_done", kv_n, kv_total, "kv"), + ("zsets_done", zset_n, zsets_total, "zsets"), + ("sets_done", set_n, sets_total, "sets"), + ]: + cur = state.get(key, 0) + if nedb_n >= total: + print(f" ✓ {lbl}: all {total:,} rows already in nedbd") + state[key] = total + advanced = True + elif nedb_n > cur: + print(f" ↑ {lbl}: state={cur:,} → nedbd={nedb_n:,} (advancing)") + state[key] = nedb_n + advanced = True + + if advanced: + _save_state(state_file, state) + print(" ✓ State synced from nedbd.\n") + else: + print(" ✓ Consistent.\n") + else: + print("⚠ Dry-run — skipping nedbd check\n") + + t0 = time.time() + skip = args.skip_block_cache + + kv_start = state.get("kv_done", 0) + zsets_start = state.get("zsets_done", 0) + sets_start = state.get("sets_done", 0) + + kv_sent = await stream_table( + label="kv", + db_path=sqlite_path, + fetch_sql="SELECT key, value, expires_at FROM kv ORDER BY rowid LIMIT ? OFFSET ?", + total=kv_total, + start_offset=kv_start, + to_op=_make_kv_op(skip), + state=state, state_key="kv_done", state_file=state_file, + client=client, base=base, db=db, + chunk=args.chunk, batch_size=args.batch_size, + concurrency=args.concurrency, dry_run=args.dry_run, + ) + + zsets_sent = await stream_table( + label="zsets", + db_path=sqlite_path, + fetch_sql="SELECT name, member, score FROM zsets ORDER BY rowid LIMIT ? OFFSET ?", + total=zsets_total, + start_offset=zsets_start, + to_op=_zset_op, + state=state, state_key="zsets_done", state_file=state_file, + client=client, base=base, db=db, + chunk=args.chunk, batch_size=args.batch_size, + concurrency=args.concurrency, dry_run=args.dry_run, + ) + + sets_sent = await stream_table( + label="sets", + db_path=sqlite_path, + fetch_sql="SELECT name, member FROM sets ORDER BY rowid LIMIT ? 
OFFSET ?", + total=sets_total, + start_offset=sets_start, + to_op=_set_op, + state=state, state_key="sets_done", state_file=state_file, + client=client, base=base, db=db, + chunk=args.chunk, batch_size=args.batch_size, + concurrency=args.concurrency, dry_run=args.dry_run, + ) + + elapsed = time.time() - t0 + total = kv_sent + zsets_sent + sets_sent + rps = total / max(elapsed, 0.001) + + print("\n" + "─" * 52) + print(" Migration complete " if not args.dry_run else " DRY-RUN summary ") + print("─" * 52) + print(f" kv sent: {kv_sent:>10,}") + if kv_start: + print(f" kv skipped: {kv_start:>10,} (already in nedbd)") + print(f" zsets sent: {zsets_sent:>10,}") + print(f" sets sent: {sets_sent:>10,}") + print(f" total: {total:>10,}") + print(f" elapsed: {elapsed:>9.1f}s ({rps:,.0f} rows/s)") + if not args.dry_run and total > 0: + print(f"\n✓ State → {state_file}") + if total == 0: + print("\n✓ Nothing new — already migrated. Use --reset to start over.") + print() + return 0 + + +def main() -> None: + p = argparse.ArgumentParser( + description="Optimized, resumable SQLite → nedbd migrator (v2)", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + p.add_argument("--sqlite", default=os.getenv("SQLITE_PATH", "../data/vision.db")) + p.add_argument("--nedb-url", default=os.getenv("NEDB_URL", "http://127.0.0.1:7070")) + p.add_argument("--db", default=os.getenv("NEDB_DB_NAME", "vision")) + p.add_argument("--token", default=os.getenv("NEDBD_TOKEN", "")) + p.add_argument("--chunk", type=int, default=2000, + help="rows fetched from SQLite per pass (controls peak memory)") + p.add_argument("--concurrency", type=int, default=4, + help="concurrent nedbd batch requests (lower for encrypted DBs)") + p.add_argument("--batch-size", type=int, default=50, + help="rows per nedbd batch request") + p.add_argument("--skip-block-cache", action="store_true", + help="skip vision:block:height:* and vision:block:hash:* rows") + p.add_argument("--state-file", 
default=".nedb-migrator-state.json") + p.add_argument("--reset", action="store_true", + help="delete state file and start from scratch") + p.add_argument("--no-verify", action="store_true", + help="skip nedbd verification pass at startup") + p.add_argument("--dry-run", action="store_true", + help="count rows, print plan, no writes") + args = p.parse_args() + sys.exit(asyncio.run(_run(args))) + + +if __name__ == "__main__": + main()