Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,25 @@ OPENAI_API_KEY=
# in production so secrets decrypt across restarts and machines. Generate one:
# python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
LANG2SQL_SECRET_KEY=

# ── Database connection ──────────────────────────────────────────────────
# The default explorer is chosen from these (precedence: LANG2SQL_DB_URL, then
# Cloudflare D1, else an offline canned stub). Install the matching driver
# extra, e.g. uv sync --extra postgres (or bigquery/snowflake/mysql/duckdb).
#
# Any SQLAlchemy URL works (one adapter, many engines):
# postgresql+psycopg://user:pass@host:5432/dbname
# bigquery://project/dataset
# snowflake://user:pass@account/db/schema?warehouse=wh
# mysql+pymysql://user:pass@host/dbname
# duckdb:////absolute/path/to/file.duckdb
LANG2SQL_DB_URL=
# Optional default schema for the SQLAlchemy explorer.
LANG2SQL_DB_SCHEMA=

# Cloudflare D1 (used when LANG2SQL_DB_URL is unset). D1 is SQLite over an HTTP
# API — no driver needed. Find IDs in the Cloudflare dashboard → D1.
CLOUDFLARE_D1_ACCOUNT_ID=
CLOUDFLARE_D1_DATABASE_ID=
# API token with D1 read access (Account → API Tokens). Required for D1 queries.
CLOUDFLARE_API_TOKEN=
17 changes: 15 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,24 @@ authors = [
dependencies = [
"discord.py>=2.3,<3.0", # Phase 1 frontend transport
"cryptography>=42.0", # EncryptedSecrets at-rest encryption
"sqlalchemy>=2.0", # generic DB explorer (one adapter, many engines)
]

[project.optional-dependencies]
# Real outbound adapters (V1 ships urllib OpenAI + stub PG; these enable v1.5 swaps)
postgres = ["psycopg[binary]>=3.2,<4.0"]
# DB driver extras. The SQLAlchemyExplorer is dialect-agnostic; install only the
# drivers you connect to. Cloudflare D1 needs no driver (HTTP API via stdlib).
postgres = ["psycopg[binary]>=3.2,<4.0"]
bigquery = ["sqlalchemy-bigquery>=1.11"]
snowflake = ["snowflake-sqlalchemy>=1.6"]
mysql = ["pymysql>=1.1"]
duckdb = ["duckdb-engine>=0.13"]
all-db = [
"psycopg[binary]>=3.2,<4.0",
"sqlalchemy-bigquery>=1.11",
"snowflake-sqlalchemy>=1.6",
"pymysql>=1.1",
"duckdb-engine>=0.13",
]

[project.scripts]
lang2sql = "lang2sql.frontends.cli.app:main"
Expand Down
17 changes: 15 additions & 2 deletions src/lang2sql/adapters/db/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
"""DB adapters — :class:`ExplorerPort` impls."""
"""DB adapters — :class:`ExplorerPort` impls + the connection factory.

``build_explorer`` routes a connection string to the right adapter:
Cloudflare D1 over its HTTP API, everything else over generic SQLAlchemy.
"""

from __future__ import annotations

from .d1_explorer import D1Explorer
from .factory import build_explorer, explorer_from_env
from .postgres_explorer import PostgresExplorer
from .sqlalchemy_explorer import SqlAlchemyExplorer

__all__ = ["PostgresExplorer"]
__all__ = [
"build_explorer",
"explorer_from_env",
"D1Explorer",
"SqlAlchemyExplorer",
"PostgresExplorer",
]
116 changes: 116 additions & 0 deletions src/lang2sql/adapters/db/d1_explorer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Cloudflare D1 explorer — read-only introspection over the D1 HTTP API.

D1 is SQLite that lives on Cloudflare's edge and is only reachable from a Worker
or over the REST query endpoint. A Python process (our Discord bot) uses the
**HTTP Query API**:

POST /client/v4/accounts/{account_id}/d1/database/{database_id}/query
Authorization: Bearer <token>
{"sql": "...", "params": [...]}

Since D1 *is* SQLite, schema introspection uses ``sqlite_master`` / ``PRAGMA``.
The HTTP call is injectable (``transport``) so the adapter is unit-testable with
no network.
"""

from __future__ import annotations

import asyncio
import json
import os
import urllib.request
from typing import Any, Callable

from ...core.ports.explorer import Column, Table

_API_ROOT = "https://api.cloudflare.com/client/v4"

# A transport takes (sql, params) and returns the parsed D1 JSON response.
Transport = Callable[[str, list], dict]


class D1Explorer:
"""ExplorerPort backed by Cloudflare D1's HTTP query API."""

def __init__(
self,
account_id: str,
database_id: str,
token: str | None = None,
*,
transport: Transport | None = None,
timeout: float = 30.0,
) -> None:
self.account_id = account_id
self.database_id = database_id
self._token = token if token is not None else os.environ.get("CLOUDFLARE_API_TOKEN")
self._timeout = timeout
self._transport = transport or self._http_transport

# --- ExplorerPort ----------------------------------------------------

async def list_tables(self) -> list[Table]:
rows = await self._query(
"SELECT name FROM sqlite_master WHERE type='table' "
"AND name NOT LIKE 'sqlite_%' AND name NOT LIKE '_cf_%' ORDER BY name"
)
return [Table(name=r["name"], schema="") for r in rows]

async def describe_table(self, name: str) -> Table:
rows = await self._query(f"PRAGMA table_info({_ident(name)})")
cols = [
Column(name=r["name"], type=r["type"] or "", nullable=not bool(r["notnull"]))
for r in rows
]
return Table(name=name, schema="", columns=cols)

async def sample_rows(self, name: str, limit: int = 5) -> list[dict]:
return await self._query(f"SELECT * FROM {_ident(name)} LIMIT {int(limit)}")

async def execute(self, sql: str, limit: int = 1000) -> list[dict]:
rows = await self._query(sql)
return rows[: int(limit)]

# --- internals -------------------------------------------------------

async def _query(self, sql: str, params: list | None = None) -> list[dict]:
resp = await asyncio.to_thread(self._transport, sql, params or [])
if not resp.get("success", False):
errors = resp.get("errors") or resp.get("messages") or "unknown D1 error"
raise RuntimeError(f"D1 query failed: {errors}")
result = resp.get("result") or []
if not result:
return []
# The query endpoint returns one result object per statement.
return result[0].get("results", []) or []

def _http_transport(self, sql: str, params: list) -> dict:
if not self._token:
raise RuntimeError("CLOUDFLARE_API_TOKEN not set (D1 requires an API token)")
url = (
f"{_API_ROOT}/accounts/{self.account_id}"
f"/d1/database/{self.database_id}/query"
)
body = json.dumps({"sql": sql, "params": params}).encode("utf-8")
req = urllib.request.Request(
url,
data=body,
method="POST",
headers={
"Authorization": f"Bearer {self._token}",
"Content-Type": "application/json",
},
)
with urllib.request.urlopen(req, timeout=self._timeout) as resp:
return json.loads(resp.read().decode("utf-8"))


def _ident(name: str) -> str:
"""Quote a SQLite identifier, rejecting anything that isn't a plain name.

introspection helpers interpolate the table name into PRAGMA/SELECT where
binds aren't allowed, so we hard-validate to avoid injection.
"""
if not name.replace("_", "").isalnum():
raise ValueError(f"unsafe table identifier: {name!r}")
return f'"{name}"'
64 changes: 64 additions & 0 deletions src/lang2sql/adapters/db/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""build_explorer — turn a connection string into the right ExplorerPort.

This is what makes ``/connect`` trivial: the user (or env) gives one URL and the
factory routes it. Cloudflare D1 has its own HTTP adapter; everything else with
a normal SQLAlchemy URL goes through the generic SQLAlchemy explorer.

d1://<account_id>/<database_id> → D1Explorer (token from env)
postgresql+psycopg://user:…/db → SqlAlchemyExplorer
bigquery://project/dataset → SqlAlchemyExplorer
snowflake://user:…@account/db → SqlAlchemyExplorer
mysql+pymysql://… / duckdb:///… → SqlAlchemyExplorer
"""

from __future__ import annotations

import os
from urllib.parse import urlsplit

from ...core.ports.explorer import ExplorerPort
from .d1_explorer import D1Explorer
from .sqlalchemy_explorer import SqlAlchemyExplorer


def build_explorer(connection: str, *, schema: str | None = None) -> ExplorerPort:
"""Route a connection string to the matching explorer adapter.

``schema`` is forwarded to the SQLAlchemy explorer (ignored by D1, which is
schema-less SQLite). Raises ``ValueError`` on an empty/unparseable string.
"""
if not connection or not connection.strip():
raise ValueError("empty connection string")

scheme = urlsplit(connection).scheme.lower()
if not scheme:
raise ValueError(f"connection string has no scheme: {connection!r}")

if scheme == "d1":
parts = urlsplit(connection)
account_id = parts.netloc
database_id = parts.path.lstrip("/")
if not account_id or not database_id:
raise ValueError("d1 URL must be d1://<account_id>/<database_id>")
return D1Explorer(account_id=account_id, database_id=database_id)

# Anything else is assumed to be a SQLAlchemy URL (driver loaded lazily).
return SqlAlchemyExplorer(connection, schema=schema)


def explorer_from_env() -> ExplorerPort | None:
"""Build an explorer from environment, or ``None`` if nothing is configured.

Precedence: an explicit ``LANG2SQL_DB_URL`` wins; otherwise a pair of
``CLOUDFLARE_D1_ACCOUNT_ID`` + ``CLOUDFLARE_D1_DATABASE_ID`` selects D1.
"""
url = os.environ.get("LANG2SQL_DB_URL")
if url:
return build_explorer(url, schema=os.environ.get("LANG2SQL_DB_SCHEMA"))

account = os.environ.get("CLOUDFLARE_D1_ACCOUNT_ID")
database = os.environ.get("CLOUDFLARE_D1_DATABASE_ID")
if account and database:
return build_explorer(f"d1://{account}/{database}")

return None
88 changes: 88 additions & 0 deletions src/lang2sql/adapters/db/sqlalchemy_explorer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Generic SQLAlchemy explorer — one adapter, many engines.

A single :class:`ExplorerPort` implementation that connects to anything
SQLAlchemy speaks (PostgreSQL, MySQL, Snowflake, BigQuery, DuckDB, SQLite, …)
purely from a connection URL. This is the "사용성" win: adding a new warehouse is
``pip install <driver>`` + a DSN, not a new adapter class.

The engine is created lazily on first use so constructing the explorer (and
routing to it in the factory) never imports a driver that isn't installed.
Blocking DB calls run in a worker thread to keep the async event loop free.
"""

from __future__ import annotations

import asyncio
from typing import Any

from ...core.ports.explorer import Column, Table


class SqlAlchemyExplorer:
"""ExplorerPort over a SQLAlchemy Engine, built from a connection URL."""

def __init__(self, url: str, *, schema: str | None = None) -> None:
self.url = url
self._schema = schema
self._engine: Any = None # created lazily

def _get_engine(self) -> Any:
if self._engine is None:
from sqlalchemy import create_engine # imported here = lazy driver load

self._engine = create_engine(self.url)
return self._engine

# --- ExplorerPort ----------------------------------------------------

async def list_tables(self) -> list[Table]:
return await asyncio.to_thread(self._list_tables_sync)

async def describe_table(self, name: str) -> Table:
return await asyncio.to_thread(self._describe_table_sync, name)

async def sample_rows(self, name: str, limit: int = 5) -> list[dict]:
# Bind the limit; quote the identifier via the dialect's preparer.
eng = self._get_engine()
qname = eng.dialect.identifier_preparer.quote(name)
return await self.execute(f"SELECT * FROM {qname}", limit=limit)

async def execute(self, sql: str, limit: int = 1000) -> list[dict]:
return await asyncio.to_thread(self._execute_sync, sql, int(limit))

# --- sync workers ----------------------------------------------------

def _list_tables_sync(self) -> list[Table]:
from sqlalchemy import inspect

insp = inspect(self._get_engine())
schema = self._schema or insp.default_schema_name
return [
Table(name=t, schema=schema or "")
for t in insp.get_table_names(schema=self._schema)
]

def _describe_table_sync(self, name: str) -> Table:
from sqlalchemy import inspect

insp = inspect(self._get_engine())
cols = [
Column(
name=c["name"],
type=str(c["type"]),
nullable=bool(c.get("nullable", True)),
description=c.get("comment") or "",
)
for c in insp.get_columns(name, schema=self._schema)
]
return Table(name=name, schema=self._schema or "", columns=cols)

def _execute_sync(self, sql: str, limit: int) -> list[dict]:
from sqlalchemy import text

with self._get_engine().connect() as conn:
result = conn.execute(text(sql))
if not result.returns_rows:
return []
rows = result.mappings().fetchmany(limit)
return [dict(r) for r in rows]
5 changes: 4 additions & 1 deletion src/lang2sql/tenancy/concierge.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import os

from ..adapters.db.factory import explorer_from_env
from ..adapters.db.postgres_explorer import PostgresExplorer
from ..adapters.llm.fake import FakeLLM
from ..adapters.llm.openai_ import OpenAILLM
Expand Down Expand Up @@ -61,7 +62,9 @@ def __init__(
self._store = store if store is not None else SqliteStore(path)
# Audit + session persistence both ride the one sqlite store by default.
self._llm = llm if llm is not None else _default_llm()
self._explorer = explorer if explorer is not None else PostgresExplorer(_DEFAULT_DSN)
# Explorer precedence: explicit injection → env-configured real DB
# (LANG2SQL_DB_URL / Cloudflare D1) → the canned stub for offline dev.
self._explorer = explorer or explorer_from_env() or PostgresExplorer(_DEFAULT_DSN)
self._safety = safety if safety is not None else SafetyPipeline()
# Persistent semantic store by default so definitions survive restart.
self._scope_resolver = (
Expand Down
Loading
Loading