diff --git a/backend/scripts/migrate_sqlite_to_nedb.py b/backend/scripts/migrate_sqlite_to_nedb.py new file mode 100644 index 0000000..2167be2 --- /dev/null +++ b/backend/scripts/migrate_sqlite_to_nedb.py @@ -0,0 +1,487 @@ +#!/usr/bin/env python3 +"""migrate_sqlite_to_nedb.py — optimized, resumable SQLite → nedbd migrator. + +Streams SQLite rows via LIMIT/OFFSET so peak memory is proportional to +--chunk, not total rows. Sends nedbd batches concurrently (asyncio semaphore). +Persists a state file after every chunk so interrupted runs resume within one +chunk of where they stopped. Verifies actual nedbd collection counts at +startup and advances past data already present (survives partial prior runs). + +Usage +----- + # Full migration (auto-resumes if interrupted): + python migrate_sqlite_to_nedb.py --sqlite ../data/vision.db + + # Skip the ~1.2M block-cache rows — only migrate live state: + python migrate_sqlite_to_nedb.py --sqlite ../data/vision.db --skip-block-cache + + # Dry run (counts rows, no writes): + python migrate_sqlite_to_nedb.py --sqlite ../data/vision.db --dry-run + + # Reset saved progress and start fresh: + python migrate_sqlite_to_nedb.py --sqlite ../data/vision.db --reset + + # Tune for a slow/encrypted nedbd: + python migrate_sqlite_to_nedb.py --concurrency 2 --batch-size 50 + +Environment variables (override CLI defaults): + NEDB_URL, NEDB_DB_NAME, NEDBD_TOKEN +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import sys +import time +from pathlib import Path +from typing import Any, Callable, Optional + +try: + import aiosqlite + import httpx +except ImportError: + print("ERROR: aiosqlite and httpx are required.") + print(" pip install aiosqlite httpx") + sys.exit(1) + +__version__ = "2.0.0" + +# --------------------------------------------------------------------------- +# State file (resume) +# --------------------------------------------------------------------------- + +def 
async def _ensure_db(
    client: httpx.AsyncClient,
    base: str,
    db: str,
    *,
    attempts: int = 3,
    retry_delay: float = 5.0,
) -> None:
    """Make sure database *db* exists on the nedbd server at *base*.

    Issues ``GET {base}/v1/databases/{db}``. A 200 means the database is
    already there; a 404 triggers ``POST {base}/v1/databases`` to create it.
    Timeouts, connection errors and any other HTTP status are treated as a
    failed attempt and retried after *retry_delay* seconds.

    Args:
        client: HTTP client used for both requests.
        base: nedbd base URL without a trailing slash.
        db: Name of the database to look up / create.
        attempts: Total number of tries before giving up.
        retry_delay: Seconds to wait between tries.

    Raises:
        httpx.TimeoutException, httpx.ConnectError: the last attempt failed
            at the transport level.
        httpx.HTTPStatusError: the create request was rejected.
        RuntimeError: every attempt returned an unexpected HTTP status.
    """
    last_err = ""
    for attempt in range(1, attempts + 1):
        try:
            r = await client.get(f"{base}/v1/databases/{db}")
            if r.status_code == 200:
                return
            if r.status_code == 404:
                cr = await client.post(f"{base}/v1/databases", json={"name": db})
                cr.raise_for_status()
                return
            # Anything else (401, 500, 503, ...) used to fall through and the
            # function returned as if the database existed. Record it so the
            # attempt counts as a failure instead.
            last_err = f"HTTP {r.status_code}"
        except (httpx.TimeoutException, httpx.ConnectError) as e:
            if attempt >= attempts:
                raise
            last_err = str(e)

        if attempt < attempts:
            print(
                f" ensure_db attempt {attempt}/{attempts} failed ({last_err}), "
                f"retrying in {retry_delay:g}s…"
            )
            await asyncio.sleep(retry_delay)

    raise RuntimeError(f"ensure_db failed after {attempts} attempts: {last_err}")
class _Bar:
    """Single-line terminal progress bar built on ``print`` and ``\\r``.

    ``start`` is the number of rows already completed by an earlier run. It is
    shown in the position counter but excluded from the throughput and ETA
    figures, which describe only the work done since this bar was created.
    """

    _WIDTH = 38  # number of cells in the drawn bar

    def __init__(self, label: str, total: int, start: int = 0) -> None:
        self.label = label
        self.total = max(total, 1)  # avoid division by zero for empty tables
        self.done = start
        self._start = start
        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        self._t0 = time.monotonic()
        self._drawn = 0

    def update(self, n: int = 0) -> None:
        """Advance the bar by *n* rows and redraw it in place."""
        self.done += n
        elapsed = max(time.monotonic() - self._t0, 0.001)
        # Only rows handled in this run count towards the rate; including the
        # resume offset would report an inflated rate and a near-zero ETA.
        rate = (self.done - self._start) / elapsed
        remain = max(self.total - self.done, 0)
        eta = f"{remain / rate:.0f}s" if rate > 0 else "?"
        # Clamp so a count beyond ``total`` cannot widen the bar.
        filled = min(max(int(self._WIDTH * self.done / self.total), 0), self._WIDTH)
        bar = "█" * filled + "░" * (self._WIDTH - filled)
        line = (
            f"\r {self.label:<6} [{bar}] "
            f"{self.done:>9,}/{self.total:>9,} "
            f"{rate:>8,.0f}/s eta {eta} "
        )
        print(line, end="", flush=True)
        self._drawn += 1

    def finish(self) -> None:
        """Redraw one last time and move to the next terminal line."""
        self.update(0)
        print(flush=True)
def _make_kv_op(skip_block_cache: bool) -> Callable:
    """Build the row → nedbd ``put`` converter for the ``kv`` table.

    The expiry cutoff is captured once, when the converter is built, so every
    row of a run is judged against the same instant.

    Args:
        skip_block_cache: When true, keys starting with
            ``vision:block:height:`` or ``vision:block:hash:`` are dropped.

    Returns:
        A callable mapping a row (indexable by ``"key"``, ``"value"`` and
        ``"expires_at"``) to a batch op dict, or ``None`` when the row is
        expired or filtered out.
    """
    cutoff = time.time()
    block_prefixes = ("vision:block:height:", "vision:block:hash:")

    def _convert(row: aiosqlite.Row) -> Optional[dict]:
        key = row["key"]
        payload = row["value"]
        expiry = row["expires_at"]

        already_expired = expiry is not None and expiry < cutoff
        if already_expired:
            return None
        if skip_block_cache and key.startswith(block_prefixes):
            return None

        document = {"_id": key, "value": payload, "expires_at": expiry}
        return {"op": "put", "coll": "kv", "id": key, "doc": document}

    return _convert
_set_op(row: aiosqlite.Row) -> dict: + name, member = row["name"], row["member"] + doc_id = f"{name}::{member}" + return { + "op": "put", "coll": "set", "id": doc_id, + "doc": {"_id": doc_id, "_name": name, "_member": member}, + } + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +async def _run(args: argparse.Namespace) -> int: + base = args.nedb_url.rstrip("/") + db = args.db + + print(f"\nnedb-migrator v{__version__} — SQLite → nedbd (streaming async)\n") + print(f" sqlite {args.sqlite}") + print(f" nedbd {base}") + print(f" database {db}") + print(f" chunk {args.chunk:,}") + print(f" concurrency {args.concurrency}") + print(f" batch-size {args.batch_size}") + print(f" skip-block-cache {args.skip_block_cache}") + print(f" dry-run {args.dry_run}") + print(f" state file {args.state_file}\n") + + # ── State ──────────────────────────────────────────────────────────────── + state_file = Path(args.state_file) + if args.reset and state_file.exists(): + state_file.unlink() + print("↺ State reset.\n") + + state = _load_state(state_file) + if any(state.get(k, 0) for k in ("kv_done", "zsets_done", "sets_done")): + print(f"→ Resuming — kv={state['kv_done']:,} " + f"zsets={state['zsets_done']:,} sets={state['sets_done']:,}\n") + + # ── Count rows ─────────────────────────────────────────────────────────── + sqlite_path = Path(args.sqlite) + if not sqlite_path.exists(): + print(f"ERROR: SQLite not found: {sqlite_path}") + return 1 + + async with aiosqlite.connect(str(sqlite_path)) as conn: + async with conn.execute("SELECT COUNT(*) FROM kv") as c: kv_total = (await c.fetchone())[0] + async with conn.execute("SELECT COUNT(*) FROM zsets") as c: zsets_total = (await c.fetchone())[0] + async with conn.execute("SELECT COUNT(*) FROM sets") as c: sets_total = (await c.fetchone())[0] + + print(f"◉ Rows in SQLite — kv={kv_total:,} zsets={zsets_total:,} 
sets={sets_total:,}\n") + + # ── nedbd ──────────────────────────────────────────────────────────────── + headers: dict = {} + if args.token: + headers["Authorization"] = f"Bearer {args.token}" + + async with httpx.AsyncClient(headers=headers, timeout=120.0) as client: + if not args.dry_run: + try: + h = await _nedb_health(client, base) + print(f"✓ nedbd OK version={h.get('version')} " + f"encrypted={h.get('encrypted')}\n") + except Exception as e: + print(f"ERROR: Cannot reach nedbd at {base}: {e}") + return 1 + + if not args.no_verify: + await _ensure_db(client, base, db) + print("◉ Verifying against nedbd…", end=" ", flush=True) + kv_n = await _count_collection(client, base, db, "kv") + zset_n = await _count_collection(client, base, db, "zset") + set_n = await _count_collection(client, base, db, "set") + print(f"kv={kv_n:,} zset={zset_n:,} set={set_n:,}") + + advanced = False + for key, nedb_n, total, lbl in [ + ("kv_done", kv_n, kv_total, "kv"), + ("zsets_done", zset_n, zsets_total, "zsets"), + ("sets_done", set_n, sets_total, "sets"), + ]: + cur = state.get(key, 0) + if nedb_n >= total: + print(f" ✓ {lbl}: all {total:,} rows already in nedbd") + state[key] = total + advanced = True + elif nedb_n > cur: + print(f" ↑ {lbl}: state={cur:,} → nedbd={nedb_n:,} (advancing)") + state[key] = nedb_n + advanced = True + + if advanced: + _save_state(state_file, state) + print(" ✓ State synced from nedbd.\n") + else: + print(" ✓ Consistent.\n") + else: + print("⚠ Dry-run — skipping nedbd check\n") + + t0 = time.time() + skip = args.skip_block_cache + + kv_start = state.get("kv_done", 0) + zsets_start = state.get("zsets_done", 0) + sets_start = state.get("sets_done", 0) + + kv_sent = await stream_table( + label="kv", + db_path=sqlite_path, + fetch_sql="SELECT key, value, expires_at FROM kv ORDER BY rowid LIMIT ? 
OFFSET ?", + total=kv_total, + start_offset=kv_start, + to_op=_make_kv_op(skip), + state=state, state_key="kv_done", state_file=state_file, + client=client, base=base, db=db, + chunk=args.chunk, batch_size=args.batch_size, + concurrency=args.concurrency, dry_run=args.dry_run, + ) + + zsets_sent = await stream_table( + label="zsets", + db_path=sqlite_path, + fetch_sql="SELECT name, member, score FROM zsets ORDER BY rowid LIMIT ? OFFSET ?", + total=zsets_total, + start_offset=zsets_start, + to_op=_zset_op, + state=state, state_key="zsets_done", state_file=state_file, + client=client, base=base, db=db, + chunk=args.chunk, batch_size=args.batch_size, + concurrency=args.concurrency, dry_run=args.dry_run, + ) + + sets_sent = await stream_table( + label="sets", + db_path=sqlite_path, + fetch_sql="SELECT name, member FROM sets ORDER BY rowid LIMIT ? OFFSET ?", + total=sets_total, + start_offset=sets_start, + to_op=_set_op, + state=state, state_key="sets_done", state_file=state_file, + client=client, base=base, db=db, + chunk=args.chunk, batch_size=args.batch_size, + concurrency=args.concurrency, dry_run=args.dry_run, + ) + + elapsed = time.time() - t0 + total = kv_sent + zsets_sent + sets_sent + rps = total / max(elapsed, 0.001) + + print("\n" + "─" * 52) + print(" Migration complete " if not args.dry_run else " DRY-RUN summary ") + print("─" * 52) + print(f" kv sent: {kv_sent:>10,}") + if kv_start: + print(f" kv skipped: {kv_start:>10,} (already in nedbd)") + print(f" zsets sent: {zsets_sent:>10,}") + print(f" sets sent: {sets_sent:>10,}") + print(f" total: {total:>10,}") + print(f" elapsed: {elapsed:>9.1f}s ({rps:,.0f} rows/s)") + if not args.dry_run and total > 0: + print(f"\n✓ State → {state_file}") + if total == 0: + print("\n✓ Nothing new — already migrated. 
def _positive_int(text: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def main(argv: Optional[list] = None) -> None:
    """Parse the command line, run the migration and exit with its status.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``; ``None``
            keeps the normal command-line behaviour.

    ``--chunk``, ``--concurrency`` and ``--batch-size`` must be positive.
    A chunk of 0 never advances the SQLite offset, a batch size of 0 is an
    invalid ``range()`` step, and a concurrency of 0 creates a semaphore no
    batch can ever acquire, so all three are rejected up front.
    """
    p = argparse.ArgumentParser(
        description="Optimized, resumable SQLite → nedbd migrator (v2)",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    p.add_argument("--sqlite", default=os.getenv("SQLITE_PATH", "../data/vision.db"))
    p.add_argument("--nedb-url", default=os.getenv("NEDB_URL", "http://127.0.0.1:7070"))
    p.add_argument("--db", default=os.getenv("NEDB_DB_NAME", "vision"))
    p.add_argument("--token", default=os.getenv("NEDBD_TOKEN", ""))
    p.add_argument("--chunk", type=_positive_int, default=2000,
                   help="rows fetched from SQLite per pass (controls peak memory)")
    p.add_argument("--concurrency", type=_positive_int, default=4,
                   help="concurrent nedbd batch requests (lower for encrypted DBs)")
    p.add_argument("--batch-size", type=_positive_int, default=50,
                   help="rows per nedbd batch request")
    p.add_argument("--skip-block-cache", action="store_true",
                   help="skip vision:block:height:* and vision:block:hash:* rows")
    p.add_argument("--state-file", default=".nedb-migrator-state.json")
    p.add_argument("--reset", action="store_true",
                   help="delete state file and start from scratch")
    p.add_argument("--no-verify", action="store_true",
                   help="skip nedbd verification pass at startup")
    p.add_argument("--dry-run", action="store_true",
                   help="count rows, print plan, no writes")
    args = p.parse_args(argv)
    sys.exit(asyncio.run(_run(args)))
+# SQLite — bundled feature so no system libsqlite3 required +rusqlite = { version = "0.31", features = ["bundled"] } + +# Async HTTP — used for concurrent nedbd batch calls +reqwest = { version = "0.12", features = ["json"] } +tokio = { version = "1", features = ["full"] } + +# Serialisation +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +# CLI +clap = { version = "4", features = ["derive", "env"] } + +# Progress bars +indicatif = "0.17" + +# Error handling +anyhow = "1" + +# Coloured output +colored = "2" + +[profile.release] +opt-level = 3 +lto = "fat" +codegen-units = 1 +strip = true diff --git a/tools/nedb-migrator/src/main.rs b/tools/nedb-migrator/src/main.rs new file mode 100644 index 0000000..6cc4342 --- /dev/null +++ b/tools/nedb-migrator/src/main.rs @@ -0,0 +1,557 @@ +//! nedb-migrator — fast, resumable, low-memory SQLite → nedbd migration tool +//! +//! Streams SQLite rows in chunks so memory usage stays constant regardless of +//! table size. 1.2M rows uses the same RAM as 1k rows. +//! +//! # Usage +//! +//! ```bash +//! nedb-migrator --sqlite ../data/vision.db +//! nedb-migrator --sqlite ../data/vision.db --skip-block-cache +//! nedb-migrator --sqlite ../data/vision.db --reset +//! nedb-migrator --sqlite ../data/vision.db --dry-run +//! nedb-migrator --sqlite ../data/vision.db --chunk 5000 --concurrency 32 +//! 
``` + +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Instant; + +use anyhow::{Context, Result}; +use clap::Parser; +use colored::*; +use indicatif::{ProgressBar, ProgressStyle}; +use reqwest::Client; +use rusqlite::Connection; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tokio::sync::Semaphore; + +// --------------------------------------------------------------------------- +// CLI +// --------------------------------------------------------------------------- + +#[derive(Parser, Debug)] +#[command(name = "nedb-migrator", version = "1.0.0", + about = "Fast, resumable, streaming SQLite → nedbd migration")] +struct Cli { + #[arg(long, default_value = "../data/vision.db")] + sqlite: PathBuf, + + #[arg(long, default_value = "http://127.0.0.1:7070")] + nedb_url: String, + + #[arg(long, default_value = "vision")] + db: String, + + #[arg(long, default_value = "")] + token: String, + + /// Rows fetched from SQLite per streaming chunk (controls peak memory) + #[arg(long, default_value_t = 2000)] + chunk: usize, + + /// Concurrent nedbd batch requests per chunk. + /// Reduce to 2-4 for encrypted databases — nedbd's Sequencer serialises + /// writes and high concurrency causes queue buildup + timeouts. 
+ #[arg(long, default_value_t = 4)] + concurrency: usize, + + /// Rows per nedbd batch request + #[arg(long, default_value_t = 50)] + batch_size: usize, + + /// Skip vision:block:height:* and vision:block:hash:* kv rows + #[arg(long)] + skip_block_cache: bool, + + #[arg(long, default_value = ".nedb-migrator-state.json")] + state_file: PathBuf, + + #[arg(long)] + reset: bool, + + #[arg(long)] + no_verify: bool, + + #[arg(long)] + dry_run: bool, + + #[arg(long, short)] + verbose: bool, +} + +// --------------------------------------------------------------------------- +// Resume state +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct State { + kv_done: usize, + zsets_done: usize, + sets_done: usize, +} + +fn load_state(path: &Path) -> State { + fs::read_to_string(path) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_default() +} + +fn save_state(path: &Path, state: &State) -> Result<()> { + let tmp = path.with_extension("tmp"); + fs::write(&tmp, serde_json::to_string_pretty(state)?)?; + fs::rename(&tmp, path).context("atomic rename failed")?; + Ok(()) +} + +// --------------------------------------------------------------------------- +// SQLite helpers — streaming via LIMIT/OFFSET, never loads full table +// --------------------------------------------------------------------------- + +fn count_table(conn: &Connection, table: &str, extra_where: &str) -> Result { + let sql = if extra_where.is_empty() { + format!("SELECT COUNT(*) FROM {table}") + } else { + format!("SELECT COUNT(*) FROM {table} WHERE {extra_where}") + }; + Ok(conn.query_row(&sql, [], |r| r.get::<_, i64>(0))? as usize) +} + +/// Fetch one chunk of kv rows starting at `offset`, up to `limit` rows. 
+fn fetch_kv_chunk( + conn: &Connection, + offset: usize, + limit: usize, + skip_block_cache: bool, +) -> Result> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + + let sql = "SELECT key, value, expires_at FROM kv ORDER BY rowid LIMIT ?1 OFFSET ?2"; + let mut stmt = conn.prepare(sql)?; + let rows: Vec = stmt + .query_map([limit as i64, offset as i64], |r| { + Ok(( + r.get::<_, String>(0)?, + r.get::<_, String>(1)?, + r.get::<_, Option>(2)?, + )) + })? + .filter_map(|r| r.ok()) + .filter(|(key, _, expires_at)| { + if let Some(exp) = expires_at { + if *exp < now { return false; } + } + if skip_block_cache + && (key.starts_with("vision:block:height:") + || key.starts_with("vision:block:hash:")) + { + return false; + } + true + }) + .map(|(key, value, expires_at)| json!({ + "op": "put", "coll": "kv", "id": &key, + "doc": { "_id": &key, "value": value, "expires_at": expires_at } + })) + .collect(); + Ok(rows) +} + +fn fetch_zset_chunk(conn: &Connection, offset: usize, limit: usize) -> Result> { + let sql = "SELECT name, member, score FROM zsets ORDER BY rowid LIMIT ?1 OFFSET ?2"; + let mut stmt = conn.prepare(sql)?; + let rows: Vec = stmt + .query_map([limit as i64, offset as i64], |r| { + Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?, r.get::<_, f64>(2)?)) + })? + .filter_map(|r| r.ok()) + .map(|(name, member, score)| { + let id = format!("{name}::{member}"); + json!({ + "op": "put", "coll": "zset", "id": &id, + "doc": { "_id": &id, "_name": name, "_member": member, "score": score } + }) + }) + .collect(); + Ok(rows) +} + +fn fetch_set_chunk(conn: &Connection, offset: usize, limit: usize) -> Result> { + let sql = "SELECT name, member FROM sets ORDER BY rowid LIMIT ?1 OFFSET ?2"; + let mut stmt = conn.prepare(sql)?; + let rows: Vec = stmt + .query_map([limit as i64, offset as i64], |r| { + Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)) + })? 
+ .filter_map(|r| r.ok()) + .map(|(name, member)| { + let id = format!("{name}::{member}"); + json!({ + "op": "put", "coll": "set", "id": &id, + "doc": { "_id": &id, "_name": name, "_member": member } + }) + }) + .collect(); + Ok(rows) +} + +// --------------------------------------------------------------------------- +// nedbd HTTP +// --------------------------------------------------------------------------- + +trait MaybeBearer { fn maybe_bearer(self, token: &str) -> Self; } +impl MaybeBearer for reqwest::RequestBuilder { + fn maybe_bearer(self, t: &str) -> Self { + if t.is_empty() { self } else { self.bearer_auth(t) } + } +} + +async fn nedb_health(client: &Client, base: &str, token: &str) -> Result { + Ok(client.get(format!("{base}/health")) + .maybe_bearer(token).send().await?.json().await?) +} + +async fn ensure_db(client: &Client, base: &str, db: &str, token: &str) -> Result<()> { + let mut last_err = String::new(); + for attempt in 1u8..=3 { + match client.get(format!("{base}/v1/databases/{db}")) + .maybe_bearer(token).send().await + { + Ok(r) if r.status().is_success() => return Ok(()), + Ok(r) if r.status().as_u16() == 404 => { + client.post(format!("{base}/v1/databases")) + .maybe_bearer(token) + .json(&json!({"name": db})) + .send().await?.error_for_status()?; + return Ok(()); + } + Ok(r) => last_err = format!("HTTP {}", r.status()), + Err(e) => { + last_err = e.to_string(); + if attempt < 3 { + eprintln!(" ensure_db attempt {attempt}/3 failed, retrying in 5s…"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } + } + } + anyhow::bail!("ensure_db failed after 3 attempts: {last_err}") +} + +async fn send_batch(client: &Client, base: &str, db: &str, token: &str, ops: Vec) -> Result { + // Retry up to 4 times with exponential backoff. + // Encrypted nedbd under write pressure can timeout transiently — this + // ensures a single slow Sequencer flush doesn't abort the migration. 
+ let mut delay_ms = 500u64; + let mut last_err = String::new(); + for attempt in 1u8..=4 { + match client.post(format!("{base}/v1/databases/{db}/batch")) + .maybe_bearer(token) + .json(&json!({"ops": &ops})) + .send().await + { + Ok(r) if r.status().is_success() => { + let body: Value = r.json().await?; + return Ok(body["count"].as_u64().unwrap_or(0) as usize); + } + Ok(r) => last_err = format!("HTTP {}", r.status()), + Err(e) => last_err = e.to_string(), + } + if attempt < 4 { + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + delay_ms = (delay_ms * 2).min(8_000); + } + } + anyhow::bail!("batch failed after 4 attempts: {last_err}") +} + +// --------------------------------------------------------------------------- +// Streaming table sender — processes one chunk at a time, constant memory +// --------------------------------------------------------------------------- + +async fn stream_table( + label: &str, + total: usize, + cli: &Cli, + state: &mut State, + get_done: fn(&State) -> usize, + set_done: fn(&mut State, usize), + fetch_chunk: impl Fn(usize, usize) -> Result>, + client: Arc, + pb: &ProgressBar, +) -> Result { + let start = get_done(state); + let mut offset = start; + let mut sent = 0usize; + + while offset < total { + // 1. Fetch one chunk from SQLite — only `chunk` rows in memory at once + let ops = fetch_chunk(offset, cli.chunk)?; + if ops.is_empty() { + // All remaining rows were filtered (expired / skip-block-cache) + offset += cli.chunk; + continue; + } + let chunk_len = ops.len(); + + // 2. 
Split into batches and fire concurrently + let sem = Arc::new(Semaphore::new(cli.concurrency)); + let mut handles = Vec::new(); + for batch in ops.chunks(cli.batch_size) { + let batch_ops = batch.to_vec(); + let batch_len = batch_ops.len(); + let c2 = Arc::clone(&client); + let sem2 = Arc::clone(&sem); + let base = cli.nedb_url.clone(); + let db = cli.db.clone(); + let tok = cli.token.clone(); + let dry = cli.dry_run; + let h: tokio::task::JoinHandle> = tokio::spawn(async move { + let _p = sem2.acquire_owned().await.unwrap(); + if dry { return Ok(batch_len); } + send_batch(&c2, &base, &db, &tok, batch_ops).await + }); + handles.push((h, batch_len)); + } + + // 3. Collect in order + let mut chunk_sent = 0usize; + for (h, batch_len) in handles { + chunk_sent += h.await.context("task panicked")??; + pb.inc(batch_len as u64); + } + + sent += chunk_sent; + offset += chunk_len; + + // 4. Save cursor atomically after every chunk + set_done(state, offset); + if !cli.dry_run { + save_state(&cli.state_file, state)?; + } + if cli.verbose { + eprintln!(" {label}: {offset}/{total} chunk_sent={chunk_sent}"); + } + } + + pb.finish_with_message(format!("{} rows", start + sent)); + Ok(sent) +} + +// --------------------------------------------------------------------------- +// nedbd-side verification +// --------------------------------------------------------------------------- + +async fn count_collection(client: &Client, base: &str, db: &str, token: &str, coll: &str) -> usize { + let res = client.post(format!("{base}/v1/databases/{db}/query")) + .maybe_bearer(token) + .json(&json!({"nql": format!("FROM {coll} LIMIT 9999999")})) + .send().await; + match res { + Ok(r) if r.status().is_success() => { + r.json::().await.ok() + .and_then(|v| v["count"].as_u64()) + .unwrap_or(0) as usize + } + _ => 0, + } +} + +async fn verify_state( + client: &Client, base: &str, db: &str, token: &str, + state: &mut State, state_file: &Path, + kv_total: usize, zsets_total: usize, sets_total: usize, 
+) -> Result<()> { + print!("{} Checking nedbd collections… ", "◉".blue()); + std::io::Write::flush(&mut std::io::stdout()).ok(); + + let kv_n = count_collection(client, base, db, token, "kv").await; + let zset_n = count_collection(client, base, db, token, "zset").await; + let set_n = count_collection(client, base, db, token, "set").await; + println!("kv={kv_n} zset={zset_n} set={set_n}"); + + let mut advanced = false; + macro_rules! sync_f { + ($f:expr, $n:expr, $total:expr, $lbl:expr) => { + if $n > $f && $n <= $total { + println!(" {} {}: {} → {} (advancing)", "↑".yellow(), $lbl, $f, $n); + $f = $n; advanced = true; + } else if $n >= $total { + println!(" {} {}: all {} rows already in nedbd", "✓".green(), $lbl, $total); + $f = $total; advanced = true; + } + }; + } + sync_f!(state.kv_done, kv_n, kv_total, "kv"); + sync_f!(state.zsets_done, zset_n, zsets_total, "zsets"); + sync_f!(state.sets_done, set_n, sets_total, "sets"); + + if advanced { save_state(state_file, state)?; println!(" {} State synced.\n", "✓".green()); } + else { println!(" {} Consistent.\n", "✓".green()); } + Ok(()) +} + +// --------------------------------------------------------------------------- +// Entry point +// --------------------------------------------------------------------------- + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + + println!("\n{} {} — SQLite → nedbd (streaming)\n", + "nedb-migrator".bold().cyan(), "v1.0.0".dimmed()); + println!(" sqlite {}", cli.sqlite.display()); + println!(" nedbd {}", cli.nedb_url); + println!(" database {}", cli.db); + println!(" chunk size {} rows (peak memory: ~{} MB)", + cli.chunk, + cli.chunk * 300 / 1_000_000 + 1); // rough estimate + println!(" concurrency {}", cli.concurrency); + println!(" batch size {}", cli.batch_size); + println!(" skip-block-cache {}", cli.skip_block_cache); + println!(" dry-run {}", cli.dry_run); + println!(" state file {}", cli.state_file.display()); + println!(); + + // ── Resume state 
──────────────────────────────────────────────────────── + if cli.reset && cli.state_file.exists() { + fs::remove_file(&cli.state_file).ok(); + println!("{} State reset.\n", "↺".yellow()); + } + let mut state = if cli.reset { State::default() } else { load_state(&cli.state_file) }; + + if state.kv_done + state.zsets_done + state.sets_done > 0 { + println!("{} Resuming — kv={} zsets={} sets={}\n", + "→".green(), state.kv_done, state.zsets_done, state.sets_done); + } + + // ── Open SQLite (read-only) ────────────────────────────────────────────── + let canon = cli.sqlite.canonicalize() + .with_context(|| format!("SQLite not found: {}", cli.sqlite.display()))?; + let conn = Connection::open_with_flags(&canon, + rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX)?; + + // Count rows — cheap, doesn't load data + print!("{} Counting rows… ", "◉".blue()); + std::io::Write::flush(&mut std::io::stdout()).ok(); + let kv_total = count_table(&conn, "kv", "")?; + let zsets_total = count_table(&conn, "zsets", "")?; + let sets_total = count_table(&conn, "sets", "")?; + println!("kv={} zsets={} sets={}\n", + kv_total.to_string().yellow(), + zsets_total.to_string().yellow(), + sets_total.to_string().yellow()); + + // ── Connectivity ──────────────────────────────────────────────────────── + let client = Arc::new(Client::builder() + .timeout(std::time::Duration::from_secs(120)) + .build()?); + + if !cli.dry_run { + let h = nedb_health(&client, &cli.nedb_url, &cli.token) + .await.context("Cannot reach nedbd")?; + println!("{} nedbd {} version={} encrypted={}\n", + "✓".green(), "OK".green(), + h["version"].as_str().unwrap_or("?"), + h["encrypted"].as_bool().unwrap_or(false)); + + if !cli.no_verify { + ensure_db(&client, &cli.nedb_url, &cli.db, &cli.token).await?; + verify_state(&client, &cli.nedb_url, &cli.db, &cli.token, + &mut state, &cli.state_file, + kv_total, zsets_total, sets_total).await?; + } else { + println!("{} Skipping nedbd check 
(--no-verify)\n", "⚠".yellow()); + } + } else { + println!("{} Dry-run\n", "⚠".yellow()); + } + + // ── Progress bars ──────────────────────────────────────────────────────── + let style = ProgressStyle::with_template( + "{prefix:.bold} [{bar:42.cyan/blue}] {pos:>9}/{len:>9} {per_sec:>12} eta {eta}" + ).unwrap().progress_chars("█▉▊▋▌▍▎▏ "); + + let pb_kv = ProgressBar::new(kv_total as u64); + pb_kv.set_style(style.clone()); pb_kv.set_prefix("kv "); + pb_kv.set_position(state.kv_done as u64); + + let pb_zset = ProgressBar::new(zsets_total as u64); + pb_zset.set_style(style.clone()); pb_zset.set_prefix("zset "); + pb_zset.set_position(state.zsets_done as u64); + + let pb_set = ProgressBar::new(sets_total as u64); + pb_set.set_style(style.clone()); pb_set.set_prefix("set "); + pb_set.set_position(state.sets_done as u64); + + let t0 = Instant::now(); + + // ── kv ─────────────────────────────────────────────────────────────────── + let kv_start = state.kv_done; + { + let skip = cli.skip_block_cache; + let c = Arc::clone(&client); + stream_table( + "kv", kv_total, &cli, &mut state, + |s| s.kv_done, |s, v| s.kv_done = v, + |off, lim| fetch_kv_chunk(&conn, off, lim, skip), + c, &pb_kv, + ).await?; + } + + // ── zsets ──────────────────────────────────────────────────────────────── + let zsets_start = state.zsets_done; + { + let c = Arc::clone(&client); + stream_table( + "zset", zsets_total, &cli, &mut state, + |s| s.zsets_done, |s, v| s.zsets_done = v, + |off, lim| fetch_zset_chunk(&conn, off, lim), + c, &pb_zset, + ).await?; + } + + // ── sets ───────────────────────────────────────────────────────────────── + let sets_start = state.sets_done; + { + let c = Arc::clone(&client); + stream_table( + "set", sets_total, &cli, &mut state, + |s| s.sets_done, |s, v| s.sets_done = v, + |off, lim| fetch_set_chunk(&conn, off, lim), + c, &pb_set, + ).await?; + } + + // ── Summary ─────────────────────────────────────────────────────────────── + let elapsed = 
t0.elapsed().as_secs_f64(); + let total = (state.kv_done - kv_start) + + (state.zsets_done - zsets_start) + + (state.sets_done - sets_start); + let rps = if elapsed > 0.0 { total as f64 / elapsed } else { 0.0 }; + + println!("\n{}", "─".repeat(52)); + println!("{}", if cli.dry_run { " DRY-RUN summary " } else { " Migration complete " }.bold()); + println!("{}", "─".repeat(52)); + println!(" kv sent: {}", (state.kv_done - kv_start).to_string().green()); + println!(" zsets sent: {}", (state.zsets_done - zsets_start).to_string().green()); + println!(" sets sent: {}", (state.sets_done - sets_start).to_string().green()); + println!(" total: {}", total.to_string().bold().green()); + println!(" elapsed: {:.1}s ({:.0} rows/s)", elapsed, rps); + + if !cli.dry_run && total > 0 { + println!("\n{} State → {}", "✓".green(), cli.state_file.display()); + } + if total == 0 { + println!("\n{} Nothing new — already migrated. Use --reset to start over.", "✓".green()); + } + println!(); + Ok(()) +}