diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index 78238e78b..1b8f6385f 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -31,6 +31,17 @@ "ban_timeout": 300000, "broadcast_address": null, "broadcast_port": 6433, + "cache": { + "backend": "redis", + "enabled": false, + "max_result_size": 0, + "policy": "no_cache", + "redis": { + "cache_key_prefix": "pgdog:", + "url": "redis://localhost:6379" + }, + "ttl": 300 + }, "checkout_timeout": 5000, "client_connection_recovery": "drop", "client_idle_in_transaction_timeout": 9223372036854775807, @@ -275,6 +286,75 @@ } ] }, + "Cache": { + "description": "Cache configuration.", + "type": "object", + "properties": { + "backend": { + "description": "Which storage backend to use.\n\n_Default:_ `redis`", + "$ref": "#/$defs/CacheBackend", + "default": "redis" + }, + "enabled": { + "description": "Whether to enable caching.\n\n_Default:_ `false`", + "type": "boolean", + "default": false + }, + "max_result_size": { + "description": "Maximum result size in bytes to cache (0 = unlimited).\n\n_Default:_ `0`", + "type": "integer", + "format": "uint", + "default": 0, + "minimum": 0 + }, + "policy": { + "description": "Cache policy: `no_cache` or `cache`.\n\n_Default:_ `no_cache`", + "$ref": "#/$defs/CachePolicy", + "default": "no_cache" + }, + "redis": { + "description": "Redis backend configuration.\n\nOnly read when `backend = \"redis\"`.", + "$ref": "#/$defs/RedisConfig", + "default": { + "cache_key_prefix": "pgdog:", + "url": "redis://localhost:6379" + } + }, + "ttl": { + "description": "Default TTL in seconds for cached queries.\n\n_Default:_ `300`", + "type": "integer", + "format": "uint64", + "default": 300, + "minimum": 0 + } + }, + "additionalProperties": false + }, + "CacheBackend": { + "description": "Cache storage backend discriminator.", + "oneOf": [ + { + "description": "Redis backend (default).", + "type": "string", + "const": "redis" + } + ] + }, + "CachePolicy": { + "description": "Cache policy.", + "oneOf": [ + { + "description": "Never cache queries for this database.", + "type": "string", + "const": "no_cache" + }, + { + "description": "Always cache read queries.", + "type": "string", + "const": "cache" + } + ] + }, "ConnectionRecovery": { "description": "controls if server connections are recovered or dropped if a client abruptly disconnects.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#connection_recovery", "oneOf": [ @@ -574,6 +654,21 @@ "maximum": 65535, "minimum": 0 }, + "cache": { + "description": "Redis cache configuration for this database.", + "$ref": "#/$defs/Cache", + "default": { + "backend": "redis", + "enabled": false, + "max_result_size": 0, + "policy": "no_cache", + "redis": { + "cache_key_prefix": "pgdog:", + "url": "redis://localhost:6379" + }, + "ttl": 300 + } + }, "checkout_timeout": { "description": "Maximum amount of time a client is allowed to wait for a connection from the pool.\n\n_Default:_ `5000`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#checkout_timeout", "type": "integer", @@ -1441,6 +1536,23 @@ } ] }, + "RedisConfig": { + "description": "Redis-specific cache backend configuration.\n\nCorresponds to the `[general.cache.redis]` TOML section.", + "type": "object", + "properties": { + "cache_key_prefix": { + "description": "Key prefix prepended to every cache key stored in Redis.\n\n_Default:_ `pgdog:`", + "type": "string", + "default": "pgdog:" + }, + "url": { + "description": "Redis connection URL.\n\n_Default:_ `redis://localhost:6379`", + "type": "string", + "default": "redis://localhost:6379" + } + }, + "additionalProperties": false + }, "ReplicaLag": { "description": "Replica lag banning configuration. When a replica's replication lag exceeds the threshold, it is banned from serving read queries.", "type": "object", diff --git a/Cargo.lock b/Cargo.lock index 98f6c842c..b445a1d5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -663,7 +663,7 @@ dependencies = [ "bitflags 2.9.1", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -804,6 +804,15 @@ dependencies = [ "either", ] +[[package]] +name = "cache" +version = "0.1.0" +dependencies = [ + "serial_test", + "sqlx", + "tokio", +] + [[package]] name = "castaway" version = "0.2.3" @@ -973,6 +982,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.4" @@ -1023,6 +1038,12 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "critical-section" version = "1.2.0" @@ -1450,6 +1471,15 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "flume" version = "0.11.1" @@ -1497,6 +1527,47 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fred" +version = "9.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cdd5378252ea124b712e0ac55147d26ae3af575883b34b8423091a4c719606b" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "bytes-utils", + "crossbeam-queue", + "float-cmp", + "fred-macros", + "futures", + "log", + "parking_lot", + "rand 0.8.5", + "redis-protocol", + "rustls 0.23.27", + "rustls-native-certs 0.7.3", + "semver", + "socket2", + "tokio", + "tokio-rustls 0.26.2", + "tokio-stream", + "tokio-util", + "url", + "urlencoding", +] + +[[package]] +name = "fred-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1458c6e22d36d61507034d5afecc64f105c1d39712b7ac6ec3b352c423f715cc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -2871,6 +2942,7 @@ dependencies = [ "dashmap", "derive_builder", "fnv", + "fred", "futures", "hickory-resolver", "http-body-util", @@ -2914,6 +2986,7 @@ dependencies = [ "tracing-subscriber", "url", "uuid", + "xxhash-rust", ] [[package]] @@ -3458,6 +3531,20 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "redis-protocol" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65deb7c9501fbb2b6f812a30d59c0253779480853545153a51d8e9e444ddc99f" +dependencies = [ + "bytes", + "bytes-utils", + "cookie-factory", + "crc16", + "log", + "nom", +] + [[package]] name = "redox_syscall" version = "0.5.12" @@ -3826,6 +3913,19 @@ dependencies = [ "security-framework 2.11.1", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.1" @@ -5859,6 +5959,12 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "xxhash-rust" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" + [[package]] name = "yoke" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index d217b8652..d6c4153b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ resolver = "2" members = [ "examples/demo", "integration/rust", + "integration/cache", "pgdog", "pgdog-config", "pgdog-macros", diff --git a/docs/CACHE.md b/docs/CACHE.md new file mode 100644 index 000000000..9fa79baef --- /dev/null +++ b/docs/CACHE.md @@ -0,0 +1,324 @@ +# Cache for pgdog — State of Implementation + +## Architecture + +Cache SELECT queries in Redis, bypass PostgreSQL on cache hit, populate cache on cache miss. Two-tier policy resolution: SQL comment/connection parameter → pgdog's config. + +--- + +## Implementation + +### Configuration (`pgdog-config`) + +**`cache.rs`** — Cache configuration types: + +**CachePolicy enum:** `NoCache` (default), `Cache`. Implements `FromStr`, `Display`, `Serialize`, `Deserialize`, `Copy`, `JsonSchema`. + +**CacheBackend enum:** `Redis` (default). Discriminator for selecting the storage backend and for hotswap detection when the backend type changes in config. + +**RedisConfig struct** (`[general.cache.redis]`): +- `url: String` — Redis connection URL (default `redis://localhost:6379`) +- `cache_key_prefix: String` — prefix prepended to every Redis key (default `pgdog:`) + +**Cache struct** (`[general.cache]`): +- `enabled: bool` — is caching on? (default `false`) +- `policy: CachePolicy` — which policy? (default `no_cache`) +- `ttl: u64` — default TTL seconds (default `300`) +- `backend: CacheBackend` — which storage backend (default `redis`) +- `redis: RedisConfig` — Redis-specific settings +- `max_result_size: usize` — max cached result bytes (default `0` = unlimited) + +Example TOML: +```toml +[general.cache] +enabled = true +policy = "cache" +ttl = 300 + +[general.cache.redis] +url = "redis://localhost:6379" +cache_key_prefix = "pgdog:" +``` + +**`general.rs`** — `General` struct holds `cache: Cache` field. **Cache config is global.** + +**`lib.rs`** — Exports `pub use cache::{CacheBackend, CachePolicy, Cache, RedisConfig as CacheRedisConfig};`. + +### Cache Module (`pgdog/src/frontend/cache/`) + +**`mod.rs`** — Module exports, global singleton, and main `Cache` struct: +```rust +pub mod context; +pub mod integration; +pub mod policy; +pub mod storage; + +pub use context::CacheContext; +pub use integration::CacheCheckResult; +pub use policy::CacheDecision; +pub use storage::{CacheStorage, RedisCacheStorage}; +``` + +`Cache` struct wraps `RwLock>>` (tokio `RwLock`). + +**Global singleton:** Cache is global-scoped, not connection-scoped. Accessed via `cache()` function which returns `Arc` from a `Lazy>` static. `Cache::new()` reads config internally — no parameters needed. + +**Config hotswap:** `hotswap_if_needed()` is called at the top of `try_read_cache` and `save_response_in_cache`. It fast-paths with a read-lock; acquires write-lock only if the URL or backend type has changed, then rebuilds the storage. + +Key methods: +- `new()` — creates storage from current config (or `None` if disabled) +- `hotswap_if_needed()` — compares live config against the active storage's one with `has_config_changed()`; swaps if `true` returned +- `try_read_cache(cache_context, in_transaction, client_request, params)` — hotswaps, calls `cache_check()`, returns `Ok(Some(Vec))` on HIT (caller replays through pipeline), `Ok(None)` on MISS/PASSTHROUGH +- `save_response_in_cache(cache_context)` — hotswaps, finalizes by storing the captured response + +**`storage/mod.rs`** — Abstract storage trait and error type: +- `CacheStorage` trait: `get`, `set`, `is_enabled`, `has_config_changed` — implemented by all cache backends +- `Error` enum shared across all backends: `RedisError`, `ConnectionFailed`, `CacheMiss` + +**`storage/redis.rs`** — Redis storage backend (`RedisCacheStorage`) implementing `CacheStorage`: +- `RedisCacheStorage::new(config)` — builds client from given URL; immediately spawns a background connection task; returns `None` if URL is invalid +- Background connect task: retries `init()` in a loop (5ms to 5s exponential backoff); sets `reconnecting = false` on success; CAS-guarded so only one task runs at a time +- `get(&self, key)` — returns `Result, Error>`; returns `Err(Error::ConnectionFailed)` immediately (triggering cache miss) if not yet connected; marks `reconnecting` and spawns reconnect on Redis errors +- `set(&self, key, value, ttl)` — stores bytes with EX expiration; returns immediately on disconnect; respects `max_result_size` from live config +- `reconnect()` — spawns reconnect if not already running (CAS-guarded) +- `has_config_changed()` — returns `true` if cache config has changed (used for hotswap detection) +- `is_enabled()` — reads live `config().config.general.cache.enabled` +- Key prefix comes from `config().config.general.cache.redis.cache_key_prefix` +- `reconnecting: Arc` — prevents multiple concurrent reconnect tasks +- All Redis operations wrapped in `tokio::time::timeout(REDIS_OPERATION_TIMEOUT)` (2s) + +**`policy.rs`** — 2-tier policy resolution: +- `CacheDirective` enum: `Cache { ttl_seconds }`, `ForceCache { ttl_seconds }`, `NoCache` (default) +- `CacheDecision` enum: `Skip`, `Cache(u64)`, `ForceCache(u64)` +- `resolve(client_request, params, is_read)` — main resolver function, chains all tiers +- `get_cache_directive(client_request, params)` — comment hint (from AST) has priority over connection parameter (`pgdog.cache`) +- `extract_parameter_directive(params)` — parses `pgdog.cache` parameter: `no_cache`, `cache`, `cache ttl=N`, `force_cache`, `force_cache ttl=N` +- Tier 1: Extractor directive (`CacheDirective::Cache { ttl }`, `CacheDirective::ForceCache { ttl }`, or `CacheDirective::NoCache`) +- Tier 2: Global config `CachePolicy` (`NoCache` / `Cache`) + +**`context.rs`** — Cache context held in `QueryEngineContext`: +- `CacheContext` with `cache_miss: Option`, `response_buffer: Vec`, and `had_error: bool` +- `capture_response(message)` — stores message in buffer when cache miss is tracked; sets `had_error = true` on `E` messages +- `reset()` — clears all state for per-query isolation + +**`integration.rs`** — Integration methods on `impl Cache`: +- `cache_check()` — main entry point, checks route, calls `policy::resolve()`, checks Redis +- `deserialize_cached(Vec) -> Vec` — parses a flat blob of concatenated PostgreSQL wire messages into individual `Message` values. Wire format: `[1B code][4B length (incl. itself)][payload]`. Named constants `HEADER_CODE_LEN`, `HEADER_LEN_SIZE`, `HEADER_TOTAL` replace the former magic numbers. Not Redis-specific — usable with any cache backend that stores raw bytes. +- `cache_response()` — serializes `Vec` into wire bytes and stores in Redis +- Cache key: XXH3 hash of `database_name + comment-stripped query string + bind params` + +### Query Engine Integration + +**`pgdog/src/frontend/client/query_engine/mod.rs`** +- Imports global `cache()` from `frontend::cache` +- `handle()` flow: after `route_query()` and before `before_execution()`, calls `cache().try_read_cache(context)`. If HIT: replays each cached `Message` through `process_server_message()` (same pipeline as live backend responses — stats, transaction state, hooks all fire correctly), then returns. On MISS: stores state in `context.cache_context`. +- After `match command`, calls `cache().save_response_in_cache(context)` to finalize caching. + +**`pgdog/src/frontend/client/query_engine/query.rs`** +- `process_server_message()` calls `context.cache_context.capture_response(message.clone())`. + +**`pgdog/src/frontend/client/query_engine/context.rs`** +- `QueryEngineContext` holds `cache_context: CacheContext` field. + +### Backend and Config Integration + +**`pgdog/src/backend/pool/cluster.rs`** +- `ClusterConfig` and `Cluster` hold `cache_enabled: bool` field +- Query parser requirement check includes `|| self.cache_enabled()` — when caching is on, the query parser is forced on. + +**`pgdog-config/src/core.rs`** +- Startup warning emitted when `cache.is_enabled()` and parser is `Off` or `SessionControl`. + +### Dependencies + +**`pgdog/Cargo.toml`** +fred = { version = "9", features = ["enable-rustls"] } +xxhash-rust = { version = "0.8", features = ["xxh3"]} + +--- + +## Key Design Decisions + +| Decision | Choice | +|----------|--------| +| Interception point | Between `route_query()` and `before_execution()` in `handle()` | +| Cache config scope | **Global** (`config.general.cache`) | +| Redis client | `fred` crate v9 (async-native, tokio integration) | +| Cacheable queries | Only reads (`route.is_read()`) | +| Cache policy resolution | 2-tier: SQL comment/param → DB policy | +| Cache HIT flow | Deserialize wire bytes → `Vec` → replay each through `process_server_message()` | +| Cache MISS flow | Normal execute → capture response via `CacheContext` → store in Redis → respond | +| Cache key | XXH3 hash of `database_name + comment-stripped query string + bind params` | +| Wire format | Full PostgreSQL wire messages stored as raw bytes (one concatenated buffer) | + +--- + +## How to Control Cache + +### SQL Comments + +Add a C-style comment before your query. The first matching directive wins: + +```sql +-- Force bypass cache for this query +/* pgdog_cache: no_cache */ +SELECT * FROM users WHERE id = 1; + +-- Cache with database default TTL +/* pgdog_cache: cache */ +SELECT * FROM products WHERE category = 'electronics'; + +-- Cache with custom TTL in seconds +/* pgdog_cache: cache ttl=300 */ +SELECT * FROM orders; + +-- Force cache with database default TTL +/* pgdog_cache: force_cache */ +SELECT * FROM products WHERE category = 'electronics'; + +-- Force cache with custom TTL in seconds +/* pgdog_cache: force_cache ttl=300 */ +SELECT * FROM orders; +``` + +> **Hash independence from comments:** All SQL comments (block `/* */` and line `--`) are stripped +> before computing the cache key hash. This means a query sent with a `/* pgdog_cache: cache */` +> comment produces **exactly the same cache key** as the same query sent without any comment but +> with the directive supplied via a connection parameter (`SET pgdog.cache = 'cache'`). There is +> no longer a need for special normalization of `force_cache` vs `cache` hints — both result in +> the same hash because comments are removed entirely. + +### Connection Parameter + +Set `pgdog.cache` at connection time (via DSN options) or with `SET` after connecting: + +```sql +-- Session-wide: all queries in this connection bypass cache +SET pgdog.cache = 'no_cache'; + +-- Session-wide: cache all queries with default TTL +SET pgdog.cache = 'cache'; + +-- Session-wide: cache all queries with 5-minute TTL +SET pgdog.cache = 'cache ttl=300'; + +-- Session-wide: force cache all queries with default TTL +SET pgdog.cache = 'force_cache'; + +-- Session-wide: force cache all queries with 5-minute TTL +SET pgdog.cache = 'force_cache ttl=300'; +``` + +```sh +# Session-wide: all queries in this connection bypass cache +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dno_cache + +# Session-wide: cache all queries with default TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dcache + +# Session-wide: cache all queries with 5-minute TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dcache%20ttl%3D300 + +# Session-wide: force cache all queries with default TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dforce_cache + +# Session-wide: force cache all queries with 5-minute TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dforce_cache%20ttl%3D300 +``` + +### Priority Order + +Sources are checked in order — first non-None result wins, then falls through to global config: + +``` +SQL comment → pgdog.cache parameter → DB policy config +(highest) (lowest) +``` + +--- + +## Completed + +1. **Redis client never connects** - Problem: CacheClient::new() built the client but never called init(). Fred requires explicit connection initialization. Fix: Added lazy `ensure_connected()` using `client.init().await`, guarded by `AtomicBool` so it runs exactly once on first get()/set(). Changed CacheClient from `#[derive(Debug)]` to manual Debug impl (contains `Arc`). + +2. **Redis GET fails on NULL / cache miss** - Problem: `client.get::()` throws `Parse Error: Cannot parse into bytes` when the key doesn't exist. Fix: Use `client.get::()` and check `val.is_null()` before extracting bytes. Later refined: `get()` now returns `Result, Error>` instead of `Result>>` — a missing key yields `Err(Error::CacheMiss)`, which is matched explicitly in `cache_check()` and converted to `CacheCheckResult::Miss`. Other errors propagate as `Passthrough`. + +3. **Wire format deserialization wrong in send_cached_response** - Problem: PostgreSQL wire message structure is `[1B code][4B length]` where length includes the 4B itself. I calculated `offset + 5 + msg_len` (treating length as payload-only), causing incorrect byte slicing. Fix: Corrected to `offset + 1 + msg_len`, then replaced magic numbers with named constants `HEADER_CODE_LEN`, `HEADER_LEN_SIZE`, `HEADER_TOTAL`. + +4. **Route incorrectly reports read-only as write when parser is disabled** - Problem: `query_parser_bypass()` conservatively returns `Route::write()` for all SQL when the query parser is disabled. Since pgdog doesn't enable the parser by default for simple queries, `route.is_read()` was false for `SELECT 1`. Fix: When any database has `cache.enabled = true`, the query parser level is auto-upgraded to `On` in the cluster config. The `|| self.cache_enabled()` check in `cluster.rs:475` forces the parser on. Cache also emits a startup warning if parser is `Off` or `SessionControl`. The old `is_likely_read()` string-prefix heuristic has been removed entirely. + +5. **DB cache config defaults** - Observation: `Cache.policy` defaults to `CachePolicy::NoCache`. Even with `enabled = true`, caching is skipped unless policy is explicitly set. User action taken: Add `policy = "cache"` to pgdog.toml. + +6. **Query parser auto-upgrade for caching** — When caching is enabled and parser is `Auto`/`Off`/`SessionControl`, the parser is forced to `On` via `|| self.cache_enabled()` check in `cluster.rs`. A startup warning is emitted in `core.rs` if parser remains incompatible. + +7. **Decoupled cache policy extraction** — Cache directives extracted via standalone regex in `cache/policy.rs`, works regardless of parser state. Supports `/* pgdog_cache: ... */` format with optional `ttl=` parameter. Unified with sharding hints via `comment()` function in `comment.rs`. + +8. **Error handling / Reconnection** — Automatic reconnection with background task, CAS-guarded single reconnect, 2s operation timeout on all Redis calls, PING-based connection verification. + +9. **Cache key collision across databases sharing one Redis** — Database name and query string (with all SQL comments stripped) are combined via a single XXH3 hash call, producing deterministic, collision-resistant per-database keys even on shared Redis. Different literal values in queries produce different cache keys. Because all comments are stripped before hashing, the cache key is identical whether the cache directive arrives via a SQL comment or a connection parameter. + +10. **Wire format serialization/deserialization** — PostgreSQL wire messages stored as raw bytes. Correct byte slice calculation expressed via named constants (`HEADER_CODE_LEN = 1`, `HEADER_LEN_SIZE = 4`, `HEADER_TOTAL = 5`). Deserialization extracted into `deserialize_cached()` with inline comments explaining each boundary check. + +11. **Do not cache error responses**. + +12. **Setting pgdog.cache via connection url doesn't work** — now works. + +13. **Moved all cache-related structs from QueryEngine to Client** — now all cache structs including redis client are creating for whole pgdog's lifetime. + +14. **Use built-in query comment hints** — Cache hints (`pgdog_cache:`) are now extracted alongside sharding hints (`pgdog_shard:`, `pgdog_sharding_key:`, `pgdog_role:`) via the unified `comment()` function in `comment.rs`. The `comment_cache` field is stored in `AstInner` and accessed during cache checking via `client_request.ast.comment_cache`. Policy resolution simplified: trait-based extractors replaced with free functions (`resolve()`, `get_cache_directive()`, `extract_parameter_directive()`). Comment hint (from AST) has priority over connection parameter `pgdog.cache`. `Cache` struct no longer needs `policy_dispatcher` field. Parameter format unified to `no_cache` (underscore, not dash). + +15. **Add cache config to .schema**. + +16. **Force-cache hint support** — `/* pgdog_cache: force_cache */` and `/* pgdog_cache: force_cache ttl=N */` directives always attempt to cache. Because all comments are stripped before hashing, `force_cache` and `cache` directives produce the same cache key as the bare query with no comment at all. + +17. **Cache HIT replays through the server-message pipeline** — Previously, cache hits sent responses directly to the stream, bypassing `process_server_message()`. Now `try_read_cache()` returns `Option>` and the caller (`handle()`) feeds each message through `process_server_message()` — giving correct stats accounting, transaction state updates from `ReadyForQuery`, and hook invocations on every cache hit. + +18. **CacheClient error types refined** — `get()` now returns `Result, Error>` (no more `Option`). `Error::CacheMiss(u64)` is a dedicated variant for key-not-found; `Error::RedisError` is now a struct variant carrying `cmd: &'static str`, `key: u64`, and the underlying error for richer diagnostics. `Error::ConnectionFailed` uses `&'static str` instead of `String` to avoid heap allocation on the hot path. + +19. **Config hotswap** — `Cache` singleton holds `Arc>>>`. `hotswap_if_needed()` runs at the start of every `try_read_cache` and `save_response_in_cache` call: read-locks to compare the active backend's URL against `config().config.general.cache.redis.url`; if they differ (or the backend type changes) it write-locks and rebuilds the storage. Fast path is a read-lock-only check with no allocation. + +20. **CacheClient rewritten as `RedisCacheStorage`** — Replaced `CacheClient` with `RedisCacheStorage` implementing the `CacheStorage` trait. Key improvements: background connect task is spawned immediately in `new()` so the first query never blocks on init; `get`/`set` check only one atomic flag (`reconnecting`) and return immediately if `true` returned instead of running `ensure_connected`; the `Option` field and the three-condition guard at the top of every operation are gone; `reconnect` is the single place that sets the flag and CAS-guards the reconnect spawn. + +21. **Abstract storage backend** — `storage/mod.rs` defines the `CacheStorage` trait (`get`, `set`, `is_enabled`, `has_config_changed`) and the shared `Error` enum. `storage/redis.rs` is the Redis implementation. `Cache` holds `Box` behind a tokio `RwLock` so any backend (e.g. Memcached) can be plugged in by adding a sub-module under `storage/` and a variant to `CacheBackend`. `deserialize_cached()` remains backend-agnostic in `integration.rs`. + +22. **Nested backend config** — Backend-specific settings live in their own TOML subtable (`[general.cache.redis]`) rather than flat fields on `[general.cache]`. `RedisConfig` holds `url` and `cache_key_prefix`. When a new backend is added, it gets its own subtable (e.g. `[general.cache.memcached]`) without polluting the top-level cache section. `client.rs` renamed to `storage/redis.rs`. + +23. **Cache key must include Bind parameters for extended protocol** — For simple `Query` messages, parameter values are embedded in the SQL string, so the XXH3 hash of `database + query_text` is naturally unique per value. For extended protocol (Parse/Bind/Execute), the SQL contains `$1`/`$2` placeholders and the actual values arrive in the `Bind` message separately. The current hash ignores them, so `SELECT * FROM users WHERE id = $1` with `id = 1` and `id = 2` produce the same cache key — wrong rows are returned on the second call. Fix: hash `param.len` (the `i32` field, not the `len()` method which returns wire size) and `param.data` for each entry in `bind.params_raw()` into the hasher in `cache_check()` in `integration.rs`. This affects all production drivers that use extended protocol by default: psycopg3, asyncpg, JDBC, npgsql. Note: pgdog's built-in prepared statement cache (`PreparedStatements` / `GlobalCache`) is a proxy-level plan cache only — it deduplicates backend `Parse` round-trips. It does not cache result rows and is orthogonal to the Redis result cache. + +24. **Comments stripped from query before hashing** — All SQL block comments (`/* … */`, including nested) and line comments (`-- …`) are removed from the query string before computing the XXH3 cache key. This makes the cache key independent of whether the cache directive was supplied via a SQL comment or a connection parameter. `compute_cache_key_hash` is a standalone public function in `integration.rs` so it can be unit-tested directly. `strip_sql_comments` returns `Cow<'_, str>`: when no comment markers are present the original string slice is returned without any allocation; only queries that actually contain `/*` or `--` incur a heap allocation. The `FORCE_CACHE_RE` regex normalization that previously converted `force_cache` to `cache` in the hash input has been removed — stripping all comments achieves the same result in a more general way. + +--- + +## What's Left To Do + +1. **Redis disconnect/reconnect under heavy load** — The reconnection logic works, but timing edge cases under rapid disconnect/reconnect cycles still need stress-testing. + +2. **Integration tests**. + +3. **Set redis query timeout from config** + +4. **Add hint for query hash key** + +5. **Add flag for required cache storage available** — query will fall with error if redis (or another cache storage) unavaliable. And subtask: first query inits cache client, but connection is established later, which is why the cache storage is unavailable for the first query — so need to wait for established connection. + +6. **Hash query without comments on the fly instead of normalizing it first** — with this no `String` will be allocated. But must deal somehow with getting same hash for "SELECT 1;" and "/* pgdog_cache: cache */ SELECT 1;" because the second one transforms to " SELECT 1;" (with space at the start). + +# Tests + +## Running the tests + +Unit tests (no PostgreSQL or Redis needed) +```sh +cargo nextest run -p pgdog frontend::cache +``` + +## Integration tests (PostgreSQL + Redis + pgdog required) + +```sh +bash integration/cache/run.sh +``` + +Or if you already have pgdog running on port 6432 with that config: +```sh +bash integration/cache/dev.sh +``` \ No newline at end of file diff --git a/integration/cache/Cargo.toml b/integration/cache/Cargo.toml new file mode 100644 index 000000000..c90004800 --- /dev/null +++ b/integration/cache/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "cache" +version = "0.1.0" +edition = "2024" + +[lib] +test = true + +[dependencies] +sqlx = { version = "*", features = ["postgres", "runtime-tokio"]} +tokio = { version = "1", features = ["full"]} +serial_test = "3" diff --git a/integration/cache/dev.sh b/integration/cache/dev.sh new file mode 100755 index 000000000..e1943d893 --- /dev/null +++ b/integration/cache/dev.sh @@ -0,0 +1,8 @@ +#!/bin/bash +# Run only the cache integration tests. +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +pushd "${SCRIPT_DIR}" +cargo nextest run --nff -j 1 integration +popd diff --git a/integration/cache/pgdog.toml b/integration/cache/pgdog.toml new file mode 100644 index 000000000..08c87d054 --- /dev/null +++ b/integration/cache/pgdog.toml @@ -0,0 +1,31 @@ +[general] +query_timeout = 2_000 +checkout_timeout = 2_000 +connect_timeout = 2_000 + +[general.cache] +enabled = true +policy = "cache" +ttl = 5 + +[general.cache.redis] +url = "redis://127.0.0.1:6379" + +# ------------------------------------------------------------------------------ +# ----- Admin ------------------------------------------------------------------ + +[admin] +password = "pgdog" + +# ------------------------------------------------------------------------------ +# ----- Database :: pgdog ------------------------------------------------------ + +[[databases]] +name = "pgdog" +host = "127.0.0.1" + +[[databases]] +name = "pgdog" +host = "127.0.0.1" +role = "replica" +read_only = true \ No newline at end of file diff --git a/integration/cache/run.sh b/integration/cache/run.sh new file mode 100755 index 000000000..4efde3003 --- /dev/null +++ b/integration/cache/run.sh @@ -0,0 +1,14 @@ +#!/bin/bash +# Run cache integration tests with the dedicated cache pgdog config. +# PostgreSQL must be running on 127.0.0.1:5432 and Redis on 127.0.0.1:6379. +# Run integration/setup.sh first if you haven't already. +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source "${SCRIPT_DIR}"/../common.sh + +run_pgdog "integration/cache" +wait_for_pgdog + +bash "${SCRIPT_DIR}"/dev.sh + +stop_pgdog diff --git a/integration/cache/src/lib.rs b/integration/cache/src/lib.rs new file mode 100644 index 000000000..6ec5655e6 --- /dev/null +++ b/integration/cache/src/lib.rs @@ -0,0 +1,47 @@ +use sqlx::{Postgres, pool::Pool, postgres::PgPoolOptions}; + + +pub async fn connection() -> Pool { + PgPoolOptions::new() + .max_connections(1) + .connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog?application_name=sqlx") + .await + .unwrap() +} + +pub async fn connection_direct() -> Pool { + PgPoolOptions::new() + .max_connections(1) + .connect("postgres://pgdog:pgdog@127.0.0.1:5432/pgdog?application_name=sqlx_direct") + .await + .unwrap() +} + +/// `options` should be a space-separated list of `-c key=value` pairs using +/// the exact percent-encoded form that PostgreSQL DSNs require (space - `%20`, +/// `=` - `%3D`). +pub async fn connection_with_options(options: &str) -> Pool { + PgPoolOptions::new() + .max_connections(1) + .connect(&format!( + "postgres://pgdog:pgdog@127.0.0.1:6432/pgdog?application_name=sqlx&options={}", + options, + )) + .await + .unwrap() +} + +pub async fn redis_available() -> bool { + tokio::net::TcpStream::connect("127.0.0.1:6379") + .await + .is_ok() +} + +#[macro_export] +macro_rules! require_redis { + () => { + if !redis_available().await { + panic!("Redis required at 127.0.0.1:6379 — start it before running cache tests"); + } + }; +} diff --git a/integration/cache/tests/integration/mod.rs b/integration/cache/tests/integration/mod.rs new file mode 100644 index 000000000..13d544be1 --- /dev/null +++ b/integration/cache/tests/integration/mod.rs @@ -0,0 +1,447 @@ +use std::time::Duration; + +use cache::*; +use serial_test::serial; +use sqlx::Executor; +use tokio::time::sleep; + +/// Verifies that a second identical SELECT is served from Redis instead of PostgreSQL. +/// +/// Strategy: +/// 1. Create and populate a test table. +/// 2. `no_cache` SELECT to verify the row exists in PG (does not warm Redis). +/// 3. Normal SELECT through pgdog (cache miss → response stored in Redis). +/// 4. Delete the row *directly* on PostgreSQL port 5432 so Redis is not invalidated. +/// 5. Normal SELECT again — must return the cached row even though PG has none. +#[tokio::test] +#[serial] +async fn test_cache_hit() { + require_redis!(); + let pool = connection().await; + + pool.execute("CREATE TABLE IF NOT EXISTS cache_test_hit (id BIGINT PRIMARY KEY, val TEXT)") + .await + .unwrap(); + pool.execute("TRUNCATE cache_test_hit").await.unwrap(); + pool.execute("INSERT INTO cache_test_hit VALUES (1, 'hello')") + .await + .unwrap(); + + // Confirm the row is there without touching the Redis cache. + let rows: Vec<(i64, String)> = sqlx::query_as( + "/* pgdog_cache: no_cache */ SELECT id, val FROM cache_test_hit WHERE id = 1", + ) + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(rows.len(), 1, "row must exist in PG before cache warm-up"); + + // Warm up the Redis cache with a regular (cacheable) SELECT. + let first: Vec<(i64, String)> = + sqlx::query_as("SELECT id, val FROM cache_test_hit WHERE id = 1") + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(first.len(), 1, "first SELECT must return the row"); + + // Delete the row directly in Postgres, bypassing pgdog so Redis is not invalidated. + let direct = connection_direct().await; + direct + .execute("DELETE FROM cache_test_hit WHERE id = 1") + .await + .unwrap(); + direct.close().await; + + // Confirm the row is actually gone in PG (no_cache hint bypasses Redis). + let gone: Vec<(i64, String)> = sqlx::query_as( + "/* pgdog_cache: no_cache */ SELECT id, val FROM cache_test_hit WHERE id = 1", + ) + .fetch_all(&pool) + .await + .unwrap(); + assert!( + gone.is_empty(), + "row must be gone in PG after direct delete" + ); + + // Now the same query through pgdog without a hint: must be served from Redis. + let cached: Vec<(i64, String)> = + sqlx::query_as("SELECT id, val FROM cache_test_hit WHERE id = 1") + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!( + cached.len(), + 1, + "cached SELECT must still return the row served from Redis" + ); + assert_eq!(cached[0].1, "hello"); + + pool.execute("DROP TABLE IF EXISTS cache_test_hit") + .await + .unwrap(); + pool.close().await; +} + +/// Verifies that queries inside an explicit transaction are never served from Redis. +/// +/// The cache must be bypassed for in-transaction queries so that the client +/// always sees the latest database state as part of its own transaction. +#[tokio::test] +#[serial] +async fn test_cache_bypassed_in_transaction() { + require_redis!(); + let pool = connection().await; + + pool.execute("CREATE TABLE IF NOT EXISTS cache_test_txn (id BIGINT PRIMARY KEY, val TEXT)") + .await + .unwrap(); + pool.execute("TRUNCATE cache_test_txn").await.unwrap(); + pool.execute("INSERT INTO cache_test_txn VALUES (1, 'original')") + .await + .unwrap(); + + // Warm up the cache. + let _: Vec<(i64, String)> = sqlx::query_as("SELECT id, val FROM cache_test_txn WHERE id = 1") + .fetch_all(&pool) + .await + .unwrap(); + + // Inside a transaction, update the row and then SELECT — must see the updated value, + // not the stale cached one. + let mut tx = pool.begin().await.unwrap(); + sqlx::query("UPDATE cache_test_txn SET val = 'updated' WHERE id = 1") + .execute(&mut *tx) + .await + .unwrap(); + + let in_tx: Vec<(i64, String)> = + sqlx::query_as("SELECT id, val FROM cache_test_txn WHERE id = 1") + .fetch_all(&mut *tx) + .await + .unwrap(); + assert_eq!( + in_tx[0].1, "updated", + "SELECT inside a transaction must see the transaction's own write, not the Redis cache" + ); + + tx.commit().await.unwrap(); + + pool.execute("DROP TABLE IF EXISTS cache_test_txn") + .await + .unwrap(); + pool.close().await; +} + +/// Verifies that cached results expire after the configured TTL. +#[tokio::test] +#[serial] +async fn test_cache_ttl_expiry() { + require_redis!(); + let pool = connection().await; + + pool.execute("CREATE TABLE IF NOT EXISTS cache_test_ttl (id BIGINT PRIMARY KEY, val TEXT)") + .await + .unwrap(); + pool.execute("TRUNCATE cache_test_ttl").await.unwrap(); + pool.execute("INSERT INTO cache_test_ttl VALUES (1, 'original')") + .await + .unwrap(); + + // Warm up Redis. + let _: Vec<(i64, String)> = sqlx::query_as( + "/* pgdog_cache: cache ttl=1 */ SELECT id, val FROM cache_test_ttl WHERE id = 1", + ) + .fetch_all(&pool) + .await + .unwrap(); + + // Remove the row directly from PG so Redis is stale. + let direct = connection_direct().await; + direct + .execute("DELETE FROM cache_test_ttl WHERE id = 1") + .await + .unwrap(); + direct.close().await; + + // Wait for the Redis entry to expire + sleep(Duration::from_secs(2)).await; + + // After expiry pgdog must query PG and return no rows. + let rows: Vec<(i64, String)> = sqlx::query_as( + "/* pgdog_cache: cache ttl=1 */ SELECT id, val FROM cache_test_ttl WHERE id = 1", + ) + .fetch_all(&pool) + .await + .unwrap(); + assert!( + rows.is_empty(), + "after TTL expiry the cached row must no longer be returned" + ); + + pool.execute("DROP TABLE IF EXISTS cache_test_ttl") + .await + .unwrap(); + pool.close().await; +} + +/// Verifies that the extended protocol (parameterized `$1` queries) uses the +/// bind parameter values in the cache key. +#[tokio::test] +#[serial] +async fn test_extended_protocol_different_params_have_different_cache_keys() { + require_redis!(); + let pool = connection().await; + + pool.execute("CREATE TABLE IF NOT EXISTS cache_test_ext (id BIGINT PRIMARY KEY, val TEXT)") + .await + .unwrap(); + pool.execute("TRUNCATE cache_test_ext").await.unwrap(); + pool.execute("INSERT INTO cache_test_ext VALUES (1, 'one'), (2, 'two')") + .await + .unwrap(); + + // Warm cache for id=1. + let r1: Vec<(i64, String)> = sqlx::query_as("SELECT id, val FROM cache_test_ext WHERE id = $1") + .bind(1i64) + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(r1.len(), 1); + assert_eq!(r1[0].1, "one"); + + // Warm cache for id=2. + let r2: Vec<(i64, String)> = sqlx::query_as("SELECT id, val FROM cache_test_ext WHERE id = $1") + .bind(2i64) + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(r2.len(), 1); + assert_eq!(r2[0].1, "two"); + + // Delete both rows directly in PG so that any result must come from Redis. + let direct = connection_direct().await; + direct.execute("DELETE FROM cache_test_ext").await.unwrap(); + direct.close().await; + + let cached1: Vec<(i64, String)> = + sqlx::query_as("SELECT id, val FROM cache_test_ext WHERE id = $1") + .bind(1i64) + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(cached1.len(), 1, "id=1 entry must be served from cache"); + assert_eq!(cached1[0].1, "one"); + + let cached2: Vec<(i64, String)> = + sqlx::query_as("SELECT id, val FROM cache_test_ext WHERE id = $1") + .bind(2i64) + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(cached2.len(), 1, "id=2 entry must be served from cache"); + assert_eq!(cached2[0].1, "two"); + + pool.execute("DROP TABLE IF EXISTS cache_test_ext") + .await + .unwrap(); + pool.close().await; +} + +/// Verifies that `/* pgdog_cache: force_cache */` updates cache. +#[tokio::test] +#[serial] +async fn test_force_cache() { + require_redis!(); + let pool = connection().await; + + pool.execute("CREATE TABLE IF NOT EXISTS cache_test_force (id BIGINT PRIMARY KEY, val TEXT)") + .await + .unwrap(); + pool.execute("TRUNCATE cache_test_force").await.unwrap(); + pool.execute("INSERT INTO cache_test_force VALUES (1, 'not_forced')") + .await + .unwrap(); + + // Warm cache + let r1: Vec<(i64, String)> = + sqlx::query_as("/* pgdog_cache: cache ttl=2 */ SELECT id, val FROM cache_test_force WHERE id = 1") + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(r1.len(), 1); + assert_eq!(r1[0].1, "not_forced"); + + let direct = connection_direct().await; + direct.execute("UPDATE cache_test_force SET val = 'forced' WHERE id = 1") + .await + .unwrap(); + direct.close().await; + + let r: Vec<(i64, String)> = sqlx::query_as( + "/* pgdog_cache: force_cache ttl=3 */ SELECT id, val FROM cache_test_force WHERE id = 1", + ) + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(r.len(), 1); + assert_eq!(r[0].1, "forced"); + + let cached: Vec<(i64, String)> = + sqlx::query_as("SELECT id, val FROM cache_test_force WHERE id = 1") + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(cached.len(), 1); + assert_eq!(cached[0].1, "forced"); + + pool.execute("DROP TABLE IF EXISTS cache_test_force") + .await + .unwrap(); + pool.close().await; +} + +/// Verifies that `/* pgdog_cache: no_cache */` prevents the response from being +/// stored in Redis, so a subsequent plain SELECT actually hits PostgreSQL. +#[tokio::test] +#[serial] +async fn test_no_cache_hint_does_not_warm_redis() { + require_redis!(); + let pool = connection().await; + + pool.execute("CREATE TABLE IF NOT EXISTS cache_test_no_warm (id BIGINT PRIMARY KEY, val TEXT)") + .await + .unwrap(); + pool.execute("TRUNCATE cache_test_no_warm").await.unwrap(); + pool.execute("INSERT INTO cache_test_no_warm VALUES (1, 'original')") + .await + .unwrap(); + + // Fetch with no_cache — must NOT warm Redis. + let r: Vec<(i64, String)> = sqlx::query_as( + "/* pgdog_cache: no_cache */ SELECT id, val FROM cache_test_no_warm WHERE id = 1", + ) + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(r.len(), 1); + + let direct = connection_direct().await; + direct + .execute("DELETE FROM cache_test_no_warm WHERE id = 1") + .await + .unwrap(); + direct.close().await; + + // A plain SELECT must reach PG (no cache entry) and return 0 rows. + let after: Vec<(i64, String)> = + sqlx::query_as("SELECT id, val FROM cache_test_no_warm WHERE id = 1") + .fetch_all(&pool) + .await + .unwrap(); + assert!( + after.is_empty(), + "no_cache hint must not warm Redis, so PG miss returns 0 rows" + ); + + pool.execute("DROP TABLE IF EXISTS cache_test_no_warm") + .await + .unwrap(); + pool.close().await; +} + +/// Verifies that passing `pgdog.cache=no_cache` in the connection DSN options +/// bypasses the cache for all queries on that connection. +#[tokio::test] +#[serial] +async fn test_connection_option_no_cache_bypasses_redis() { + require_redis!(); + let pool = connection().await; + + pool.execute("CREATE TABLE IF NOT EXISTS cache_test_param (id BIGINT PRIMARY KEY, val TEXT)") + .await + .unwrap(); + pool.execute("TRUNCATE cache_test_param").await.unwrap(); + pool.execute("INSERT INTO cache_test_param VALUES (1, 'cached_val')") + .await + .unwrap(); + + // Warm the cache via a normal connection. + let _: Vec<(i64, String)> = sqlx::query_as("SELECT id, val FROM cache_test_param WHERE id = 1") + .fetch_all(&pool) + .await + .unwrap(); + + let direct = connection_direct().await; + direct + .execute("DELETE FROM cache_test_param WHERE id = 1") + .await + .unwrap(); + direct.close().await; + + // A connection with pgdog.cache=no_cache must bypass Redis and hit PG, + // returning 0 rows because the row was deleted. + let no_cache_conn = connection_with_options("-c%20pgdog.cache%3Dno_cache").await; + let rows: Vec<(i64, String)> = + sqlx::query_as("SELECT id, val FROM cache_test_param WHERE id = 1") + .fetch_all(&no_cache_conn) + .await + .unwrap(); + assert!( + rows.is_empty(), + "connection-level no_cache must bypass Redis and see the deleted row" + ); + no_cache_conn.close().await; + + pool.execute("DROP TABLE IF EXISTS cache_test_param") + .await + .unwrap(); + pool.close().await; +} + +/// Verifies that error responses are never stored in Redis. +/// +/// A query that initially errors must not poison the cache: after the error +/// is fixed (table created), the same query must reach PG and return live data. +#[tokio::test] +#[serial] +async fn test_error_response_not_cached() { + require_redis!(); + let pool = connection().await; + + pool.execute("DROP TABLE IF EXISTS cache_test_error") + .await + .unwrap(); + + // This SELECT will produce an error (table does not exist). + let err = sqlx::query("SELECT id, val FROM cache_test_error WHERE id = 1") + .fetch_all(&pool) + .await; + assert!(err.is_err(), "query on missing table must return an error"); + + // Now create the table and insert a row. + pool.execute("CREATE TABLE cache_test_error (id BIGINT PRIMARY KEY, val TEXT)") + .await + .unwrap(); + pool.execute("INSERT INTO cache_test_error VALUES (1, 'live')") + .await + .unwrap(); + + // The same query must now hit PG (the previous error was not cached). + let rows: Vec<(i64, String)> = + sqlx::query_as("SELECT id, val FROM cache_test_error WHERE id = 1") + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!( + rows.len(), + 1, + "error must not be cached; must return live row" + ); + assert_eq!(rows[0].1, "live"); + + pool.execute("DROP TABLE IF EXISTS cache_test_error") + .await + .unwrap(); + pool.close().await; +} diff --git a/integration/cache/tests/mod.rs b/integration/cache/tests/mod.rs new file mode 100644 index 000000000..7221f6b15 --- /dev/null +++ b/integration/cache/tests/mod.rs @@ -0,0 +1 @@ +pub mod integration; \ No newline at end of file diff --git a/integration/cache/users.toml b/integration/cache/users.toml new file mode 100644 index 000000000..77fa26a15 --- /dev/null +++ b/integration/cache/users.toml @@ -0,0 +1,4 @@ +[[users]] +name = "pgdog" +database = "pgdog" +password = "pgdog" \ No newline at end of file diff --git a/pgdog-config/src/cache.rs b/pgdog-config/src/cache.rs new file mode 100644 index 000000000..0a7ae9021 --- /dev/null +++ b/pgdog-config/src/cache.rs @@ -0,0 +1,162 @@ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Cache policy. +#[derive( + Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Copy, JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum CachePolicy { + /// Never cache queries for this database. + #[default] + NoCache, + /// Always cache read queries. + Cache, +} + +impl std::str::FromStr for CachePolicy { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "no_cache" => Ok(Self::NoCache), + "cache" => Ok(Self::Cache), + _ => Err(format!("Invalid cache policy: {}", s)), + } + } +} + +impl std::fmt::Display for CachePolicy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let display = match self { + Self::NoCache => "no_cache", + Self::Cache => "cache", + }; + write!(f, "{}", display) + } +} + +/// Cache storage backend discriminator. +#[derive( + Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Copy, JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum CacheBackend { + /// Redis backend (default). + #[default] + Redis, +} + +/// Redis-specific cache backend configuration. +/// +/// Corresponds to the `[general.cache.redis]` TOML section. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct RedisConfig { + /// Redis connection URL. + /// + /// _Default:_ `redis://localhost:6379` + #[serde(default = "RedisConfig::url")] + pub url: String, + + /// Key prefix prepended to every cache key stored in Redis. + /// + /// _Default:_ `pgdog:` + #[serde(default = "RedisConfig::cache_key_prefix")] + pub cache_key_prefix: String, +} + +impl Default for RedisConfig { + fn default() -> Self { + Self { + url: Self::url(), + cache_key_prefix: Self::cache_key_prefix(), + } + } +} + +impl RedisConfig { + fn url() -> String { + "redis://localhost:6379".to_string() + } + + fn cache_key_prefix() -> String { + "pgdog:".to_string() + } +} + +/// Cache configuration. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct Cache { + /// Whether to enable caching. + /// + /// _Default:_ `false` + #[serde(default = "Cache::enabled")] + pub enabled: bool, + + /// Cache policy: `no_cache` or `cache`. + /// + /// _Default:_ `no_cache` + #[serde(default = "Cache::policy")] + pub policy: CachePolicy, + + /// Default TTL in seconds for cached queries. + /// + /// _Default:_ `300` + #[serde(default = "Cache::ttl")] + pub ttl: u64, + + /// Which storage backend to use. + /// + /// _Default:_ `redis` + #[serde(default = "Cache::backend")] + pub backend: CacheBackend, + + /// Redis backend configuration. + /// + /// Only read when `backend = "redis"`. + #[serde(default)] + pub redis: RedisConfig, + + /// Maximum result size in bytes to cache (0 = unlimited). + /// + /// _Default:_ `0` + #[serde(default = "Cache::max_result_size")] + pub max_result_size: usize, +} + +impl Default for Cache { + fn default() -> Self { + Self { + enabled: Self::enabled(), + policy: Self::policy(), + ttl: Self::ttl(), + backend: Self::backend(), + redis: RedisConfig::default(), + max_result_size: Self::max_result_size(), + } + } +} + +impl Cache { + fn enabled() -> bool { + false + } + + fn policy() -> CachePolicy { + CachePolicy::default() + } + + fn ttl() -> u64 { + 300 + } + + fn backend() -> CacheBackend { + CacheBackend::default() + } + + fn max_result_size() -> usize { + 0 + } +} diff --git a/pgdog-config/src/core.rs b/pgdog-config/src/core.rs index 856518a89..52e0187fb 100644 --- a/pgdog-config/src/core.rs +++ b/pgdog-config/src/core.rs @@ -564,6 +564,11 @@ impl Config { r#""pg_query_raw" parser engine requires a large thread stack, setting it to 32MiB for each Tokio worker"# ); } + + if self.general.cache.enabled + && matches!(self.general.query_parser, QueryParserLevel::Off | QueryParserLevel::SessionControl) { + warn!("cache requires enabled query parser but it's disabled or session controlled"); + } } /// Multi-tenancy is enabled. diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index aa0f636f6..59d76aafa 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; +use crate::cache::Cache; use crate::pooling::ConnectionRecovery; use crate::UniqueIdFunction; use crate::{ @@ -643,6 +644,10 @@ pub struct General { /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_save_config #[serde(default)] pub cutover_save_config: bool, + + /// Redis cache configuration for this database. + #[serde(default)] + pub cache: Cache, } impl Default for General { @@ -729,6 +734,7 @@ impl Default for General { cutover_timeout_action: Self::cutover_timeout_action(), cutover_save_config: bool::default(), unique_id_function: Self::unique_id_function(), + cache: Cache::default(), } } } diff --git a/pgdog-config/src/lib.rs b/pgdog-config/src/lib.rs index 1a106a295..22ab53404 100644 --- a/pgdog-config/src/lib.rs +++ b/pgdog-config/src/lib.rs @@ -1,5 +1,6 @@ // Submodules pub mod auth; +pub mod cache; pub mod core; pub mod data_types; pub mod database; @@ -18,6 +19,7 @@ pub mod users; pub mod util; pub use auth::{AuthType, PassthroughAuth}; +pub use cache::{CacheBackend, CachePolicy, Cache, RedisConfig as CacheRedisConfig}; pub use core::{Config, ConfigAndUsers}; pub use data_types::*; pub use database::{ diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 7c62c28c4..461daec45 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -69,6 +69,8 @@ pgdog-config = { path = "../pgdog-config" } pgdog-vector = { path = "../pgdog-vector" } pgdog-stats = { path = "../pgdog-stats" } pgdog-postgres-types = { path = "../pgdog-postgres-types"} +fred = { version = "9", features = ["enable-rustls"] } +xxhash-rust = { version = "0.8", features = ["xxh3"]} [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 9dbd038d0..1dff017a5 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -81,6 +81,7 @@ pub struct Cluster { reload_schema_on_ddl: bool, load_schema: LoadSchema, resharding_parallel_copies: usize, + cache_enabled: bool, } /// Sharding configuration from the cluster. @@ -157,6 +158,7 @@ pub struct ClusterConfig<'a> { pub reload_schema_on_ddl: bool, pub load_schema: LoadSchema, pub resharding_parallel_copies: usize, + pub cache_enabled: bool, } impl<'a> ClusterConfig<'a> { @@ -210,6 +212,7 @@ impl<'a> ClusterConfig<'a> { reload_schema_on_ddl: general.reload_schema_on_ddl, load_schema: general.load_schema, resharding_parallel_copies: general.resharding_parallel_copies, + cache_enabled: general.cache.enabled, } } } @@ -247,6 +250,7 @@ impl Cluster { reload_schema_on_ddl, load_schema, resharding_parallel_copies, + cache_enabled, } = config; let identifier = Arc::new(DatabaseUser { @@ -296,6 +300,7 @@ impl Cluster { reload_schema_on_ddl, load_schema, resharding_parallel_copies, + cache_enabled, } } @@ -470,6 +475,7 @@ impl Cluster { || self.dry_run() || self.prepared_statements() == &PreparedStatements::Full || self.pub_sub_enabled() + || self.cache_enabled() || RegexParser::use_parser(request) } } @@ -545,6 +551,11 @@ impl Cluster { self.resharding_parallel_copies } + /// Redis cache enabled. + pub fn cache_enabled(&self) -> bool { + self.cache_enabled + } + /// Launch the connection pools. pub(crate) fn launch(&self) { for shard in self.shards() { diff --git a/pgdog/src/config/cache.rs b/pgdog/src/config/cache.rs new file mode 100644 index 000000000..ece03acb6 --- /dev/null +++ b/pgdog/src/config/cache.rs @@ -0,0 +1 @@ +pub use pgdog_config::cache::*; diff --git a/pgdog/src/config/mod.rs b/pgdog/src/config/mod.rs index 835a0f10e..6ecd3785d 100644 --- a/pgdog/src/config/mod.rs +++ b/pgdog/src/config/mod.rs @@ -1,6 +1,7 @@ //! Configuration. // Submodules +pub mod cache; pub mod convert; pub mod core; pub mod database; @@ -15,6 +16,7 @@ pub mod rewrite; pub mod sharding; pub mod users; +pub use cache::*; pub use core::{Config, ConfigAndUsers}; pub use database::{Database, Role}; pub use error::Error; diff --git a/pgdog/src/frontend/cache/context.rs b/pgdog/src/frontend/cache/context.rs new file mode 100644 index 000000000..aeeab7613 --- /dev/null +++ b/pgdog/src/frontend/cache/context.rs @@ -0,0 +1,31 @@ +use crate::{ + frontend::cache::integration::CacheMiss, + net::{messages::Protocol, Message}, +}; + +/// Cache context to use in QueryEngineContext. +#[derive(Default)] +pub struct CacheContext { + pub cache_miss: Option, + pub response_buffer: Vec, + pub had_error: bool, +} + +impl CacheContext { + /// Capture a response message for caching. + pub fn capture_response(&mut self, message: Message) { + if self.cache_miss.is_some() { + if message.code() == 'E' { + self.had_error = true; + } + self.response_buffer.push(message); + } + } + + /// Reset the cache context for a new query. + pub fn reset(&mut self) { + self.cache_miss = None; + self.response_buffer.clear(); + self.had_error = false; + } +} diff --git a/pgdog/src/frontend/cache/integration.rs b/pgdog/src/frontend/cache/integration.rs new file mode 100644 index 000000000..d314e9040 --- /dev/null +++ b/pgdog/src/frontend/cache/integration.rs @@ -0,0 +1,566 @@ +use std::borrow::Cow; +use std::hash::{Hash, Hasher}; + +use crate::{ + frontend::{ + cache::{storage::Error as CacheStorageError, CacheDecision}, + ClientRequest, + }, + net::{bind::Bind, FromBytes, Message, Parameters, ToBytes}, +}; + +use tracing::{debug, warn}; + +use super::{policy, Cache}; + +pub struct CacheMiss { + pub cache_key_hash: u64, + pub ttl: u64, +} + +pub enum CacheCheckResult { + Hit { cached: Vec }, + Miss(CacheMiss), + Passthrough, +} + +/// Strip SQL block comments (`/* ... */`, including nested) and line comments (`-- ...`) +/// from `query`, preserving string literals (`'...'`). +/// +/// Returns `Cow::Borrowed(query)` without any allocation when no comment markers +/// are found. Only allocates and builds a new `String` when a `/*` or `--` +/// sequence is actually present in the input. +pub fn strip_sql_comments(query: &str) -> Cow<'_, str> { + // Fast path: scan bytes for comment markers before doing any allocation. + let bytes = query.as_bytes(); + let has_comment = bytes.windows(2).any(|w| w == b"/*" || w == b"--"); + if !has_comment { + return Cow::Borrowed(query); + } + + let mut result = String::with_capacity(query.len()); + let mut chars = query.chars().peekable(); + + while let Some(c) = chars.next() { + match c { + // Block comment — supports PostgreSQL nested `/* */`. + '/' if chars.peek() == Some(&'*') => { + chars.next(); // consume '*' + let mut depth = 1u32; + while depth > 0 { + match chars.next() { + Some('/') if chars.peek() == Some(&'*') => { + chars.next(); + depth += 1; + } + Some('*') if chars.peek() == Some(&'/') => { + chars.next(); + depth -= 1; + } + None => break, // malformed input + _ => {} + } + } + // Replace the entire comment with a single space to avoid + // accidentally merging adjacent tokens (e.g. `SELECT/*c*/1`). + result.push(' '); + } + // Line comment. + '-' if chars.peek() == Some(&'-') => { + for ch in chars.by_ref() { + if ch == '\n' { + result.push('\n'); + break; + } + } + } + // String literal — pass through unchanged so we don't mistake `--` + // or `/*` inside a string for a comment. + '\'' => { + result.push(c); + while let Some(ch) = chars.next() { + result.push(ch); + if ch == '\'' { + // Standard SQL escaped quote: two consecutive single-quotes. + if chars.peek() == Some(&'\'') { + result.push(chars.next().unwrap()); + } else { + break; + } + } + } + } + _ => result.push(c), + } + } + + Cow::Owned(result) +} + +/// Compute the XXH3 cache key hash for a query. +/// +/// All SQL comments are stripped from `query` before hashing so the hash is identical +/// regardless of whether the cache directive was supplied via a comment or a connection +/// parameter. +pub fn compute_cache_key_hash(database: &str, query: &str, bind: Option<&Bind>) -> u64 { + let mut hasher = xxhash_rust::xxh3::Xxh3Default::new(); + database.hash(&mut hasher); + let stripped = strip_sql_comments(query); + stripped.trim().hash(&mut hasher); + if let Some(bind) = bind { + for param in bind.params_raw() { + param.len.hash(&mut hasher); + param.data.hash(&mut hasher); + } + } + hasher.finish() +} + +const HEADER_CODE_LEN: usize = 1; +const HEADER_LEN_SIZE: usize = 4; +const HEADER_TOTAL: usize = HEADER_CODE_LEN + HEADER_LEN_SIZE; + +impl Cache { + pub(super) async fn cache_check( + &self, + in_transaction: bool, + client_request: &ClientRequest, + params: &Parameters, + ) -> Result { + if in_transaction || !client_request.is_executable() { + return Ok(CacheCheckResult::Passthrough); + } + + let route = match client_request.route.as_ref() { + Some(r) => r, + None => return Ok(CacheCheckResult::Passthrough), + }; + + // Detect read-only status via the AST parser's route classification. + // When caching is enabled, the query parser is auto-enabled. + let is_read = route.is_read(); + if !is_read { + return Ok(CacheCheckResult::Passthrough); + } + + let query = match client_request.query() { + Ok(Some(q)) => q, + _ => return Ok(CacheCheckResult::Passthrough), + }; + + let user = params.get_required("user")?; + let database = params.get_default("database", user); + let bind = client_request.parameters()?; + + let decision = policy::resolve(client_request, params, is_read).await; + match decision { + CacheDecision::Skip => Ok(CacheCheckResult::Passthrough), + CacheDecision::ForceCache(ttl) => Ok(CacheCheckResult::Miss(CacheMiss { + cache_key_hash: compute_cache_key_hash(database, query.query(), bind), + ttl, + })), + CacheDecision::Cache(ttl) => { + let cache_key_hash = compute_cache_key_hash(database, query.query(), bind); + let guard = self.storage.read().await; + match guard.as_ref() { + None => Ok(CacheCheckResult::Passthrough), + Some(storage) => match storage.get(cache_key_hash).await { + Ok(cached) => Ok(CacheCheckResult::Hit { cached }), + Err(CacheStorageError::CacheMiss(_)) => { + Ok(CacheCheckResult::Miss(CacheMiss { + cache_key_hash, + ttl, + })) + } + Err(e) => { + warn!("{}", e); + Ok(CacheCheckResult::Passthrough) + } + }, + } + } + } + } + + /// Deserializes a flat byte blob (N concatenated PostgreSQL wire messages) into `Vec`. + /// + /// Redis stores cache responses as raw wire-format bytes concatenated together without framing. + /// We walk through the blob reading each message boundary, then slice out the individual message. + /// + /// ### PostgreSQL wire protocol message layout: + /// + /// [Source](https://www.postgresql.org/docs/current/protocol-overview.html) + /// + /// ```text + /// +----------+--------------------------+-------------------+ + /// | 1 byte | 4 bytes (big-endian) | N bytes (payload) | + /// | code | length (incl. 4B itself) | data | + /// +----------+--------------------------+-------------------+ + /// ``` + /// + /// Constants for parsing: + /// - `HEADER_CODE_LEN` = 1 byte (message type code, e.g. 'T' = RowDescription) + /// - `HEADER_LEN_SIZE` = 4 bytes (message length, includes itself but NOT the code byte) + /// - `HEADER_TOTAL` = 5 bytes (minimum bytes needed to read the length field) + pub(super) fn deserialize_cached(cached: Vec) -> Vec { + let mut messages = Vec::new(); + let mut offset = 0; + let len = cached.len(); + + while offset < len { + // Need at least a full header (code + length) to proceed. + if offset + HEADER_TOTAL > len { + debug!( + "deserializing cached response: not enough bytes for message header (offset={}, len={})", + offset, len + ); + break; + } + + // Read the message length field (4 bytes, big-endian). + // This length includes the 4-byte length field itself but NOT the code byte. + let msg_len = u32::from_be_bytes([ + cached[offset + 1], + cached[offset + 2], + cached[offset + 3], + cached[offset + 4], + ]) as usize; + + // Sanity checks: + // 1. Length must be at least 4 (the length field itself): if < 4 the data is corrupt. + // 2. Must not read past the end of the blob. + if msg_len < 4 || offset + HEADER_CODE_LEN + msg_len > len { + debug!( + "deserializing cached response: invalid msg length {} (offset={}, len={})", + msg_len, offset, len + ); + break; + } + + // Full message spans: 1 byte (code) + msg_len (length field + payload) + let end = offset + HEADER_CODE_LEN + msg_len; + + let msg_bytes: bytes::Bytes = cached[offset..end].to_vec().into(); + if let Ok(msg) = Message::from_bytes(msg_bytes) { + messages.push(msg); + } + offset = end; + } + + messages + } + + pub(super) async fn cache_response( + &self, + cache_key_hash: u64, + messages: Vec, + ttl: u64, + ) { + let guard = self.storage.read().await; + let storage = match guard.as_ref() { + Some(s) if s.is_enabled() => s, + _ => return, + }; + + if messages.is_empty() { + return; + } + + let mut buffer = Vec::new(); + for msg in &messages { + match msg.to_bytes() { + Ok(bytes) => buffer.extend_from_slice(&bytes), + Err(e) => { + warn!("Failed to serialize message for caching: {}", e); + return; + } + } + } + + if buffer.is_empty() { + return; + } + + if let Err(e) = storage.set(cache_key_hash, &buffer, ttl).await { + warn!("{}", e); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::net::messages::{CommandComplete, Protocol, ReadyForQuery, ToBytes}; + + /// Build a raw wire-format blob from a list of typed protocol messages. + fn wire_bytes(msgs: &[&dyn ToBytes]) -> Vec { + let mut buf = Vec::new(); + for msg in msgs { + buf.extend_from_slice(&msg.to_bytes().unwrap()); + } + buf + } + + #[test] + fn deserialize_empty_input() { + let messages = Cache::deserialize_cached(vec![]); + assert!(messages.is_empty()); + } + + #[test] + fn deserialize_single_message() { + let rfq = ReadyForQuery::idle(); + let blob = wire_bytes(&[&rfq]); + let messages = Cache::deserialize_cached(blob); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].code(), 'Z'); + } + + #[test] + fn deserialize_multiple_messages_roundtrip() { + let cc = CommandComplete::new("SELECT 1"); + let rfq = ReadyForQuery::idle(); + let blob = wire_bytes(&[&cc, &rfq]); + + let messages = Cache::deserialize_cached(blob); + assert_eq!(messages.len(), 2); + assert_eq!(messages[0].code(), 'C'); + assert_eq!(messages[1].code(), 'Z'); + } + + #[test] + fn deserialize_roundtrip_payload_matches() { + let cc = CommandComplete::new("SELECT 42"); + let rfq = ReadyForQuery::idle(); + let original: Vec = vec![ + Message::new(cc.to_bytes().unwrap()), + Message::new(rfq.to_bytes().unwrap()), + ]; + + // Serialize to flat blob exactly as cache_response does. + let mut blob = Vec::new(); + for msg in &original { + blob.extend_from_slice(&msg.to_bytes().unwrap()); + } + + let deserialized = Cache::deserialize_cached(blob); + assert_eq!(deserialized.len(), original.len()); + for (d, o) in deserialized.iter().zip(original.iter()) { + assert_eq!(d.payload(), o.payload()); + } + } + + #[test] + fn deserialize_truncated_header_no_panic() { + // Only 3 bytes — not enough for a full 5-byte header. + let truncated = vec![b'Z', 0x00, 0x00]; + let messages = Cache::deserialize_cached(truncated); + assert!(messages.is_empty()); + } + + #[test] + fn deserialize_truncated_payload_no_panic() { + // Valid header claiming length 8 (4-byte len field + 4-byte payload), + // but we only provide the header and 2 payload bytes instead of 4. + let mut blob = Vec::new(); + blob.push(b'C'); // code byte + blob.extend_from_slice(&8u32.to_be_bytes()); // length = 8 (includes itself) + blob.extend_from_slice(&[0u8, 0]); // only 2 of the expected 4 payload bytes + let messages = Cache::deserialize_cached(blob); + assert!(messages.is_empty()); + } + + #[test] + fn deserialize_corrupt_length_no_panic() { + // Length field set to 0 — invalid (must be >= 4). + let mut blob = Vec::new(); + blob.push(b'Z'); + blob.extend_from_slice(&0u32.to_be_bytes()); + let messages = Cache::deserialize_cached(blob); + assert!(messages.is_empty()); + } + + #[test] + fn deserialize_length_of_three_no_panic() { + // Length field = 3 — below minimum of 4, should be rejected. + let mut blob = Vec::new(); + blob.push(b'Z'); + blob.extend_from_slice(&3u32.to_be_bytes()); + blob.extend_from_slice(&[0u8; 3]); + let messages = Cache::deserialize_cached(blob); + assert!(messages.is_empty()); + } + + #[test] + fn deserialize_many_messages() { + // Round-trip 10 CommandComplete messages. + let n = 10usize; + let mut blob = Vec::new(); + for i in 0..n { + let cc = CommandComplete::new(format!("SELECT {}", i)); + blob.extend_from_slice(&cc.to_bytes().unwrap()); + } + + let messages = Cache::deserialize_cached(blob); + assert_eq!(messages.len(), n); + for msg in &messages { + assert_eq!(msg.code(), 'C'); + } + } + + // ------------------------------------------------------------------------- + // strip_sql_comments tests + // ------------------------------------------------------------------------- + + #[test] + fn strip_no_comments() { + let q = "SELECT 1"; + assert_eq!(strip_sql_comments(q), "SELECT 1"); + } + + #[test] + fn strip_no_comments_returns_borrowed() { + // When there are no comment markers the original slice must be returned + // without any allocation (Cow::Borrowed). + let q = "SELECT 1 FROM t WHERE id = 42"; + assert!(matches!( + strip_sql_comments(q), + std::borrow::Cow::Borrowed(_) + )); + } + + #[test] + fn strip_with_comment_returns_owned() { + let q = "/* hint */ SELECT 1"; + assert!(matches!(strip_sql_comments(q), std::borrow::Cow::Owned(_))); + } + + #[test] + fn strip_block_comment() { + let q = "/* pgdog_cache: cache */ SELECT 1"; + let stripped = strip_sql_comments(q); + assert!(!stripped.contains("pgdog_cache")); + assert!(stripped.contains("SELECT 1")); + } + + #[test] + fn strip_line_comment() { + let q = "-- pgdog_cache: cache\nSELECT 1"; + let stripped = strip_sql_comments(q); + assert!(!stripped.contains("pgdog_cache")); + assert!(stripped.contains("SELECT 1")); + } + + #[test] + fn strip_nested_block_comments() { + let q = "/* outer /* inner */ still outer */ SELECT 2"; + let stripped = strip_sql_comments(q); + assert!(!stripped.contains("outer")); + assert!(!stripped.contains("inner")); + assert!(stripped.contains("SELECT 2")); + } + + #[test] + fn strip_does_not_remove_string_literal_contents() { + let q = "SELECT '/* not a comment */' FROM t"; + let stripped = strip_sql_comments(q); + // The string literal must be preserved verbatim. + assert!(stripped.contains("'/* not a comment */'")); + } + + #[test] + fn strip_preserves_escaped_quotes_in_literal() { + let q = "SELECT 'it''s fine' FROM t"; + let stripped = strip_sql_comments(q); + assert_eq!(stripped, "SELECT 'it''s fine' FROM t"); + } + + #[test] + fn strip_multiple_block_comments() { + let q = "/* a */ SELECT /* b */ 1"; + let stripped = strip_sql_comments(q); + assert!(!stripped.contains("/* a */")); + assert!(!stripped.contains("/* b */")); + assert!(stripped.contains("SELECT")); + assert!(stripped.contains("1")); + } + + // ------------------------------------------------------------------------- + // compute_cache_key_hash tests + // ------------------------------------------------------------------------- + + #[test] + fn hash_is_stable() { + let h1 = compute_cache_key_hash("mydb", "SELECT 1", None); + let h2 = compute_cache_key_hash("mydb", "SELECT 1", None); + assert_eq!(h1, h2); + } + + #[test] + fn hash_differs_by_database() { + let h1 = compute_cache_key_hash("db1", "SELECT 1", None); + let h2 = compute_cache_key_hash("db2", "SELECT 1", None); + assert_ne!(h1, h2); + } + + #[test] + fn hash_differs_by_query() { + let h1 = compute_cache_key_hash("db", "SELECT 1", None); + let h2 = compute_cache_key_hash("db", "SELECT 2", None); + assert_ne!(h1, h2); + } + + #[test] + fn hash_same_with_and_without_cache_comment() { + // A block comment containing the cache directive must be stripped so + // the hash is the same whether the directive was in a comment or a + // connection parameter. + let h_with_comment = + compute_cache_key_hash("db", "/* pgdog_cache: cache */ SELECT 1", None); + let h_without_comment = compute_cache_key_hash("db", "SELECT 1", None); + assert_eq!(h_with_comment, h_without_comment); + } + + #[test] + fn hash_same_for_force_cache_and_regular_comment() { + // force_cache and cache hints should produce the same hash (both are + // stripped before hashing, so the underlying query is identical). + let h_force = compute_cache_key_hash("db", "/* pgdog_cache: force_cache */ SELECT 1", None); + let h_cache = compute_cache_key_hash("db", "/* pgdog_cache: cache */ SELECT 1", None); + let h_plain = compute_cache_key_hash("db", "SELECT 1", None); + assert_eq!(h_force, h_cache); + assert_eq!(h_force, h_plain); + } + + #[test] + fn hash_same_for_line_comment_cache_directive() { + let h_with_line = compute_cache_key_hash("db", "-- pgdog_cache: cache\nSELECT 1", None); + let h_plain = compute_cache_key_hash("db", "SELECT 1", None); + assert_eq!(h_with_line, h_plain); + } + + #[test] + fn hash_differs_by_bind_params() { + use crate::net::messages::bind::{Bind, Parameter}; + use bytes::Bytes; + use pgdog_postgres_types::Format; + + let make_bind = |val: &'static [u8]| { + let mut b = Bind::default(); + b.push_param( + Parameter { + len: val.len() as i32, + data: Bytes::from_static(val), + }, + Format::Text, + ); + b + }; + + let b1 = make_bind(b"1"); + let b2 = make_bind(b"2"); + let h1 = compute_cache_key_hash("db", "SELECT $1", Some(&b1)); + let h2 = compute_cache_key_hash("db", "SELECT $1", Some(&b2)); + assert_ne!(h1, h2); + } +} diff --git a/pgdog/src/frontend/cache/mod.rs b/pgdog/src/frontend/cache/mod.rs new file mode 100644 index 000000000..6c0023b2d --- /dev/null +++ b/pgdog/src/frontend/cache/mod.rs @@ -0,0 +1,143 @@ +pub mod context; +pub mod integration; +pub mod policy; +pub mod storage; + +pub use context::CacheContext; +pub use integration::CacheCheckResult; +pub use policy::CacheDecision; +pub use storage::{CacheStorage, RedisCacheStorage}; + +use once_cell::sync::Lazy; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::debug; + +use crate::{ + config::config, + frontend::{ + cache::{integration::CacheMiss, storage::build_storage}, + ClientRequest, + }, + net::{Message, Parameters}, +}; + +/// Wraps the active storage backend behind a tokio `RwLock` so it can be +/// hotswapped without restarting pgdog. +pub struct Cache { + storage: RwLock>>, +} + +impl std::fmt::Debug for Cache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Cache").field("storage", &"...").finish() + } +} + +static CACHE: Lazy> = Lazy::new(|| Arc::new(Cache::new())); + +pub fn cache() -> Arc { + CACHE.clone() +} + +impl Cache { + fn new() -> Self { + let storage = build_storage(); + Cache { + storage: RwLock::new(storage), + } + } + + /// Replace the storage backend if the config has changed (URL or backend type). + /// + /// Acquires the write lock only when a change is detected; otherwise the + /// read-lock path is zero-allocation and very fast. + async fn hotswap_if_needed(&self) { + let cfg = &config().config.general.cache; + + // Fast path: read-lock to check whether anything has changed. + { + let guard = self.storage.read().await; + let needs_swap = match guard.as_ref() { + Some(s) => s.has_config_changed(cfg), + None => cfg.enabled, + }; + if !needs_swap { + return; + } + } + + // Slow path: write-lock and rebuild. + let mut guard = self.storage.write().await; + // Re-check under the write lock (another task may have already swapped). + let needs_swap = match guard.as_ref() { + Some(s) => s.has_config_changed(cfg), + None => cfg.enabled, + }; + + if needs_swap { + debug!("Cache storage config changed — rebuilding backend"); + *guard = build_storage(); + } + } + + // ── public API ─────────────────────────────────────────────────────────── + + /// Check the cache for a query response. + /// + /// On HIT returns `Ok(Some(messages))` — the caller is responsible for + /// replaying these messages through the normal server-message pipeline. + /// + /// On MISS or PASSTHROUGH returns `Ok(None)` and updates `cache_context` + /// so that the response can later be captured and stored via + /// `save_response_in_cache`. + pub async fn try_read_cache( + &self, + cache_context: &mut CacheContext, + in_transaction: bool, + client_request: &ClientRequest, + params: &Parameters, + ) -> Result>, crate::frontend::Error> { + self.hotswap_if_needed().await; + + let cache_result = self + .cache_check(in_transaction, client_request, params) + .await?; + + match cache_result { + CacheCheckResult::Hit { cached } => { + debug!("Cache hit, serving from cache"); + let messages = Self::deserialize_cached(cached); + cache_context.reset(); + Ok(Some(messages)) + } + CacheCheckResult::Miss(cache_miss) => { + debug!("Cache miss for key hash: {}", cache_miss.cache_key_hash); + cache_context.cache_miss = Some(cache_miss); + cache_context.response_buffer.clear(); + cache_context.had_error = false; + Ok(None) + } + CacheCheckResult::Passthrough => { + cache_context.reset(); + Ok(None) + } + } + } + + /// Finalize caching by storing the response in the active backend. + pub async fn save_response_in_cache(&self, cache_context: &mut CacheContext) { + self.hotswap_if_needed().await; + + if let Some(CacheMiss { + cache_key_hash, + ttl, + }) = cache_context.cache_miss.take() + { + if !cache_context.had_error && !cache_context.response_buffer.is_empty() { + let messages = std::mem::take(&mut cache_context.response_buffer); + self.cache_response(cache_key_hash, messages, ttl).await; + } + } + } +} diff --git a/pgdog/src/frontend/cache/policy.rs b/pgdog/src/frontend/cache/policy.rs new file mode 100644 index 000000000..862fbc922 --- /dev/null +++ b/pgdog/src/frontend/cache/policy.rs @@ -0,0 +1,212 @@ +use crate::config::{config, CachePolicy}; +use crate::frontend::ClientRequest; +use crate::net::parameter::ParameterValue; +use crate::net::Parameters; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum CacheDirective { + Cache { + ttl_seconds: Option, + }, + ForceCache { + ttl_seconds: Option, + }, + #[default] + NoCache, +} + +pub enum CacheDecision { + Skip, + Cache(u64), + ForceCache(u64), +} + +const KEY: &str = "pgdog.cache"; + +pub async fn resolve( + client_request: &ClientRequest, + params: &Parameters, + is_read: bool, +) -> CacheDecision { + let cache_config = &config().config.general.cache; + + if !is_read { + return CacheDecision::Skip; + } + + let cache_directive = get_cache_directive(client_request, params); + match cache_directive { + Some(CacheDirective::NoCache) => return CacheDecision::Skip, + Some(CacheDirective::Cache { ttl_seconds }) => { + return CacheDecision::Cache(ttl_seconds.unwrap_or(cache_config.ttl)) + } + Some(CacheDirective::ForceCache { ttl_seconds }) => { + return CacheDecision::ForceCache(ttl_seconds.unwrap_or(cache_config.ttl)) + } + _ => (), + } + + match cache_config.policy { + CachePolicy::NoCache => CacheDecision::Skip, + CachePolicy::Cache => CacheDecision::Cache(cache_config.ttl), + } +} + +// Comment hint has priority over connection parameter +fn get_cache_directive( + client_request: &ClientRequest, + params: &Parameters, +) -> Option { + client_request + .ast + .as_ref() + .and_then(|ast| ast.comment_cache) + .or_else(|| extract_parameter_directive(params)) +} + +fn extract_parameter_directive(params: &Parameters) -> Option { + let value = params.get(KEY)?; + let s = match value { + ParameterValue::String(v) => v.as_str().trim(), + _ => return None, + }; + + match s { + "no_cache" => return Some(CacheDirective::NoCache), + "force_cache" => return Some(CacheDirective::ForceCache { ttl_seconds: None }), + "cache" => return Some(CacheDirective::Cache { ttl_seconds: None }), + _ => (), + } + + if let Some(ttl) = s + .strip_prefix("force_cache") + .or_else(|| s.strip_prefix("cache")) + .map(|s| s.trim_start()) + .and_then(|s| s.strip_prefix("ttl=")) + .and_then(|t| t.trim().parse::().ok()) + { + let ttl_seconds = Some(ttl); + if s.starts_with("force_cache") { + return Some(CacheDirective::ForceCache { ttl_seconds }); + } else { + return Some(CacheDirective::Cache { ttl_seconds }); + } + } + + None +} + +#[cfg(test)] +mod tests { + use super::*; + + fn extract(s: &str) -> Option { + let mut params = Parameters::default(); + params.insert("pgdog.cache", s); + extract_parameter_directive(¶ms) + } + + #[test] + fn no_cache_directive() { + assert_eq!(extract("no_cache"), Some(CacheDirective::NoCache)); + } + + #[test] + fn cache_directive_no_ttl() { + assert_eq!( + extract("cache"), + Some(CacheDirective::Cache { ttl_seconds: None }) + ); + } + + #[test] + fn cache_directive_with_ttl() { + assert_eq!( + extract("cache ttl=60"), + Some(CacheDirective::Cache { + ttl_seconds: Some(60) + }) + ); + } + + #[test] + fn cache_directive_with_large_ttl() { + assert_eq!( + extract("cache ttl=86400"), + Some(CacheDirective::Cache { + ttl_seconds: Some(86400) + }) + ); + } + + #[test] + fn force_cache_no_ttl() { + assert_eq!( + extract("force_cache"), + Some(CacheDirective::ForceCache { ttl_seconds: None }) + ); + } + + #[test] + fn force_cache_with_ttl() { + assert_eq!( + extract("force_cache ttl=120"), + Some(CacheDirective::ForceCache { + ttl_seconds: Some(120) + }) + ); + } + + #[test] + fn garbage_input_returns_none() { + assert_eq!(extract("garbage"), None); + } + + #[test] + fn invalid_ttl_letters_returns_none() { + assert_eq!(extract("cache ttl=abc"), None); + } + + #[test] + fn empty_ttl_returns_none() { + assert_eq!(extract("cache ttl="), None); + } + + #[test] + fn ttl_zero_is_valid() { + // 0 is a valid u64, even if semantically it means "expire immediately" + assert_eq!( + extract("cache ttl=0"), + Some(CacheDirective::Cache { + ttl_seconds: Some(0) + }) + ); + } + + #[test] + fn missing_key_returns_none() { + let params = Parameters::default(); + assert_eq!(extract_parameter_directive(¶ms), None); + } + + #[test] + fn force_cache_invalid_ttl_returns_none() { + assert_eq!(extract("force_cache ttl=bad"), None); + } + + #[test] + fn force_cache_empty_ttl_returns_none() { + assert_eq!(extract("force_cache ttl="), None); + } + + #[test] + fn whitespace_trimmed_around_value() { + // The value stored in the param is retrieved with .trim() in extract_parameter_directive + let mut params = Parameters::default(); + params.insert("pgdog.cache", " no_cache "); + assert_eq!( + extract_parameter_directive(¶ms), + Some(CacheDirective::NoCache) + ); + } +} diff --git a/pgdog/src/frontend/cache/storage/mod.rs b/pgdog/src/frontend/cache/storage/mod.rs new file mode 100644 index 000000000..91a7b377d --- /dev/null +++ b/pgdog/src/frontend/cache/storage/mod.rs @@ -0,0 +1,58 @@ +pub mod redis; + +pub use redis::RedisCacheStorage; + +use async_trait::async_trait; + +use crate::config::{ + cache::{Cache as CacheConfig, CacheBackend}, + config, +}; + +/// Errors returned by cache storage backends. +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Redis {cmd} error for key {key}: {err}")] + RedisError { + cmd: &'static str, + key: u64, + err: fred::error::RedisError, + }, + #[error("Connection failed: {0}")] + ConnectionFailed(&'static str), + #[error("Cache miss for key {0}")] + CacheMiss(u64), +} + +/// Abstract cache storage backend. +/// +/// Implementations must be `Send + Sync` so they can be held behind +/// something like `Arc>` and shared across async tasks. +#[async_trait] +pub trait CacheStorage: Send + Sync { + /// Fetch cached bytes for `key`. Returns [`Error::CacheMiss`] when the + /// key is absent (not an error condition — used for control flow). + async fn get(&self, key: u64) -> Result, Error>; + + /// Store `value` under `key` with a `ttl` in seconds. + async fn set(&self, key: u64, value: &[u8], ttl: u64) -> Result<(), Error>; + + /// Returns `true` when the backend is configured and enabled. + fn is_enabled(&self) -> bool; + + /// Returns `true` if cache config has changed (used for hotswap detection). + fn has_config_changed(&self, new_config: &CacheConfig) -> bool; +} + +/// Construct the appropriate storage backend from the current config. +pub fn build_storage() -> Option> { + let cfg = &config().config.general.cache; + if !cfg.enabled { + return None; + } + match cfg.backend { + CacheBackend::Redis => { + RedisCacheStorage::new(cfg).map(|s| Box::new(s) as Box) + } + } +} diff --git a/pgdog/src/frontend/cache/storage/redis.rs b/pgdog/src/frontend/cache/storage/redis.rs new file mode 100644 index 000000000..e0aec87a6 --- /dev/null +++ b/pgdog/src/frontend/cache/storage/redis.rs @@ -0,0 +1,246 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use fred::prelude::*; +use pgdog_config::CacheBackend; +use tracing::{debug, error, info}; + +use crate::config::{cache::Cache as CacheConfig, config}; + +use super::{CacheStorage, Error}; + +/// Timeout for individual Redis operations (GET/SET/ping). +const REDIS_OPERATION_TIMEOUT: Duration = Duration::from_secs(2); +/// Max time between reconnection attempts +const MAX_REDIS_RECONNECTION_PERIOD: Duration = Duration::from_secs(5); + +/// Redis implementation of [`CacheStorage`]. +/// +/// Connection is established in a background task spawned from [`RedisCacheStorage::new`]. +/// All operations return immediately if the connection is not yet ready — `get` returns +/// [`Error::ConnectionFailed`] (triggering a cache-miss path) and `set` is silently dropped. +/// +/// At most one reconnect task runs at any time, enforced by a CAS on `reconnecting`. +pub struct RedisCacheStorage { + client: RedisClient, + /// Cache config. + config: CacheConfig, + /// Guards against spawning multiple concurrent reconnect tasks. + reconnecting: Arc, +} + +impl std::fmt::Debug for RedisCacheStorage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RedisCacheStorage") + .field("config", &self.config) + .field("reconnecting", &self.reconnecting.load(Ordering::Relaxed)) + .finish() + } +} + +impl RedisCacheStorage { + /// Build a new storage instance for `url` and immediately start a background + /// connection task. Returns `None` when the URL cannot be parsed. + pub fn new(config: &CacheConfig) -> Option { + let client_config = match RedisConfig::from_url(&config.redis.url) { + Ok(c) => c, + Err(e) => { + error!("Failed to parse Redis URL '{}': {}", config.redis.url, e); + return None; + } + }; + + let client = match Builder::from_config(client_config).build() { + Ok(c) => c, + Err(e) => { + error!("Failed to build Redis client: {}", e); + return None; + } + }; + + let reconnecting = Arc::new(AtomicBool::new(true)); // treat initial connect as "reconnecting" + + let storage = Self { + client, + config: config.clone(), + reconnecting, + }; + + // Fire-and-forget initial connection. + storage.spawn_connect_task(); + + Some(storage) + } + + // ── internal helpers ──────────────────────────────────────────────────── + + /// Spawn the (re)connect background loop. Uses a CAS to ensure only one + /// task is ever running at a time. + fn spawn_connect_task(&self) { + let client = self.client.clone(); + let reconnecting = self.reconnecting.clone(); + + tokio::spawn(async move { + info!("Redis connect task started"); + let mut attempt = 0u32; + + loop { + attempt += 1; + debug!("Redis connect attempt #{}", attempt); + + let init_ok = + match tokio::time::timeout(REDIS_OPERATION_TIMEOUT, client.init()).await { + Ok(Ok(_)) => true, + Ok(Err(e)) => { + debug!("Redis init error: {}", e); + false + } + Err(_) => { + debug!("Redis init timed out"); + false + } + }; + + if init_ok { + reconnecting.store(false, Ordering::Release); + info!("Redis connected (attempt #{})", attempt); + return; + } + + // Exponential backoff + tokio::time::sleep( + const { Duration::from_millis(5) } + .saturating_mul(1u32 << attempt.min(10)) + .min(MAX_REDIS_RECONNECTION_PERIOD), + ) + .await; + } + }); + } + + /// Mark the reconnecting as true and spawn a reconnect task if one is not + /// already running. + fn reconnect(&self) { + if self + .reconnecting + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + self.spawn_connect_task(); + } else { + debug!("Redis reconnect task already running"); + } + } +} + +#[async_trait] +impl CacheStorage for RedisCacheStorage { + async fn get(&self, key: u64) -> Result, Error> { + if self.reconnecting.load(Ordering::Acquire) { + return Err(Error::ConnectionFailed("Redis not connected")); + } + + let full_key = format!("{}{}", self.config.redis.cache_key_prefix, key); + + let redis_result = tokio::time::timeout( + REDIS_OPERATION_TIMEOUT, + self.client.get::(full_key), + ) + .await; + let val = match redis_result { + Ok(Ok(v)) => v, + Ok(Err(err)) => { + self.reconnect(); + return Err(Error::RedisError { + cmd: "GET", + key, + err, + }); + } + Err(_) => { + self.reconnect(); + return Err(Error::ConnectionFailed("Redis GET timed out")); + } + }; + + match val.into_bytes() { + Some(bytes) => { + debug!("Cache hit for key {}", key); + Ok(bytes.to_vec()) + } + None => Err(Error::CacheMiss(key)), + } + } + + async fn set(&self, key: u64, value: &[u8], ttl: u64) -> Result<(), Error> { + if self.reconnecting.load(Ordering::Acquire) { + return Err(Error::ConnectionFailed("Redis not connected")); + } + + let max_result_size = config().config.general.cache.max_result_size; + if max_result_size != 0 && value.len() > max_result_size { + debug!( + "Skipping cache for key {}: size {} exceeds max {}", + key, + value.len(), + max_result_size + ); + return Ok(()); + } + + let full_key = format!("{}{}", self.config.redis.cache_key_prefix, key); + let ttl_seconds = ttl as i64; + + match tokio::time::timeout( + REDIS_OPERATION_TIMEOUT, + self.client.set::<(), _, _>( + full_key, + value, + Some(Expiration::EX(ttl_seconds)), + None, + false, + ), + ) + .await + { + Ok(Ok(_)) => { + debug!("Cached key {} with TTL {}s", key, ttl_seconds); + Ok(()) + } + Ok(Err(err)) => { + self.reconnect(); + Err(Error::RedisError { + cmd: "SET", + key, + err, + }) + } + Err(_) => { + self.reconnect(); + Err(Error::ConnectionFailed("Redis SET timed out")) + } + } + } + + fn is_enabled(&self) -> bool { + config().config.general.cache.enabled + } + + fn has_config_changed(&self, new_config: &CacheConfig) -> bool { + new_config.backend != CacheBackend::Redis + || self.config.redis.cmp(&new_config.redis).is_ne() + } +} + +// Avoid shallow copy +impl Clone for RedisCacheStorage { + fn clone(&self) -> Self { + Self { + client: self.client.clone_new(), + config: self.config.clone(), + reconnecting: Arc::new(AtomicBool::new(false)), + } + } +} diff --git a/pgdog/src/frontend/client/query_engine/context.rs b/pgdog/src/frontend/client/query_engine/context.rs index b54751a35..6a1fe3c38 100644 --- a/pgdog/src/frontend/client/query_engine/context.rs +++ b/pgdog/src/frontend/client/query_engine/context.rs @@ -1,6 +1,7 @@ use crate::{ backend::pool::{connection::mirror::Mirror, stats::MemoryStats}, frontend::{ + cache::context::CacheContext, client::{timeouts::Timeouts, Sticky, TransactionType}, router::parser::rewrite::statement::plan::RewriteResult, Client, ClientRequest, PreparedStatements, @@ -39,6 +40,8 @@ pub struct QueryEngineContext<'a> { pub(super) sticky: Sticky, /// Rewrite result. pub(super) rewrite_result: Option, + /// Cache context. + pub(super) cache_context: CacheContext, } impl<'a> QueryEngineContext<'a> { @@ -60,6 +63,7 @@ impl<'a> QueryEngineContext<'a> { rollback: false, sticky: client.sticky, rewrite_result: None, + cache_context: CacheContext::default(), } } @@ -86,6 +90,7 @@ impl<'a> QueryEngineContext<'a> { rollback: false, sticky: Sticky::new(), rewrite_result: None, + cache_context: CacheContext::default(), } } diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index f0dc8979b..9223f5b86 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -2,6 +2,7 @@ use crate::{ backend::pool::{Connection, Request}, config::config, frontend::{ + cache::cache, client::query_engine::{hooks::QueryEngineHooks, route_query::ClusterCheck}, router::{parser::Shard, Route}, BufferedQuery, Client, ClientComms, Command, Error, Router, RouterContext, Stats, @@ -129,6 +130,23 @@ impl QueryEngine { return Ok(()); } + let in_transaction = context.in_transaction(); + if let Some(cached_messages) = cache() + .try_read_cache( + &mut context.cache_context, + in_transaction, + context.client_request, + context.params, + ) + .await? + { + for msg in cached_messages { + self.process_server_message(context, msg).await?; + } + self.update_stats(context); + return Ok(()); + } + self.hooks.before_execution(context)?; // Queue up request to mirrors, if any. @@ -228,6 +246,10 @@ impl QueryEngine { command => self.unknown_command(context, command.clone()).await?, } + cache() + .save_response_in_cache(&mut context.cache_context) + .await; + self.hooks.after_execution(context)?; if context.in_error() { diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 231d936cd..0775b4682 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -120,6 +120,8 @@ impl QueryEngine { context: &mut QueryEngineContext<'_>, mut message: Message, ) -> Result<(), Error> { + context.cache_context.capture_response(message.clone()); + self.streaming = message.streaming(); let code = message.code(); diff --git a/pgdog/src/frontend/mod.rs b/pgdog/src/frontend/mod.rs index 284b777b0..aa1bbe523 100644 --- a/pgdog/src/frontend/mod.rs +++ b/pgdog/src/frontend/mod.rs @@ -1,6 +1,7 @@ //! pgDog frontend manages connections to clients. pub mod buffered_query; +pub mod cache; pub mod client; pub mod client_request; pub mod comms; diff --git a/pgdog/src/frontend/router/parser/cache/ast.rs b/pgdog/src/frontend/router/parser/cache/ast.rs index c34d865dc..855792f96 100644 --- a/pgdog/src/frontend/router/parser/cache/ast.rs +++ b/pgdog/src/frontend/router/parser/cache/ast.rs @@ -12,6 +12,7 @@ use super::super::{ }; use super::{Fingerprint, Stats}; use crate::backend::schema::Schema; +use crate::frontend::cache::policy::CacheDirective; use crate::frontend::router::parser::rewrite::statement::RewritePlan; use crate::frontend::{BufferedQuery, PreparedStatements}; use crate::net::parameter::ParameterValue; @@ -37,6 +38,8 @@ pub struct AstInner { pub comment_shard: Option, /// Role. pub comment_role: Option, + /// Cache. + pub comment_cache: Option, /// Rewrite plan. pub rewrite_plan: RewritePlan, /// Fingerprint. @@ -51,6 +54,7 @@ impl AstInner { stats: Mutex::new(Stats::new()), comment_role: None, comment_shard: None, + comment_cache: None, rewrite_plan: RewritePlan::default(), fingerprint: Fingerprint::default(), } @@ -81,7 +85,7 @@ impl Ast { QueryParserEngine::PgQueryRaw => parse_raw(query), } .map_err(Error::PgQuery)?; - let (comment_shard, comment_role) = comment(query, schema)?; + let (comment_shard, comment_role, comment_cache) = comment(query, schema)?; let fingerprint = Fingerprint::new(query, schema.query_parser_engine).map_err(Error::PgQuery)?; @@ -113,6 +117,7 @@ impl Ast { stats: Mutex::new(stats), comment_shard, comment_role, + comment_cache, ast, rewrite_plan, fingerprint, diff --git a/pgdog/src/frontend/router/parser/comment.rs b/pgdog/src/frontend/router/parser/comment.rs index a87883adb..29494c287 100644 --- a/pgdog/src/frontend/router/parser/comment.rs +++ b/pgdog/src/frontend/router/parser/comment.rs @@ -6,6 +6,7 @@ use regex::Regex; use crate::backend::ShardingSchema; use crate::config::database::Role; +use crate::frontend::cache::policy::CacheDirective; use crate::frontend::router::sharding::ContextBuilder; use super::super::parser::Shard; @@ -16,6 +17,9 @@ static SHARDING_KEY: Lazy = Lazy::new(|| { Regex::new(r#"pgdog_sharding_key: *(?:"([^"]*)"|'([^']*)'|([0-9a-zA-Z-]+))"#).unwrap() }); static ROLE: Lazy = Lazy::new(|| Regex::new(r#"pgdog_role: *(primary|replica)"#).unwrap()); +static CACHE: Lazy = Lazy::new(|| { + Regex::new(r#"pgdog_cache: *(no_cache|force_cache(?:\s+ttl\s*=\s*([0-9]+))?|cache(?:\s+ttl\s*=\s*([0-9]+))?)?"#).unwrap() +}); fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> { caps.get(1) @@ -24,23 +28,24 @@ fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> { .map(|m| m.as_str()) } -/// Extract shard number from a comment. +/// Extract shard number, role and cache directive from a comment. /// /// Comment style uses the C-style comments (not SQL comments!) /// as to allow the comment to appear anywhere in the query. /// -/// See [`SHARD`] and [`SHARDING_KEY`] for the style of comment we expect. +/// See [`SHARD`], [`SHARDING_KEY`], [`ROLE`] and [`CACHE`] for the style of comment we expect. /// pub fn comment( query: &str, schema: &ShardingSchema, -) -> Result<(Option, Option), Error> { +) -> Result<(Option, Option, Option), Error> { let tokens = match schema.query_parser_engine { QueryParserEngine::PgQueryProtobuf => scan(query), QueryParserEngine::PgQueryRaw => scan_raw(query), } .map_err(Error::PgQuery)?; let mut role = None; + let mut cache = None; for token in tokens.tokens.iter() { if token.token == Token::CComment as i32 { @@ -54,15 +59,29 @@ pub fn comment( } } } + if let Some(cap) = CACHE.captures(comment) { + if let Some(action) = cap.get(1) { + let action = action.as_str(); + if action == "no_cache" { + cache = Some(CacheDirective::NoCache); + } else if action.starts_with("force_cache") { + let ttl = cap.get(2).and_then(|m| m.as_str().parse::().ok()); + cache = Some(CacheDirective::ForceCache { ttl_seconds: ttl }); + } else { + let ttl = cap.get(3).and_then(|m| m.as_str().parse::().ok()); + cache = Some(CacheDirective::Cache { ttl_seconds: ttl }); + } + } + } if let Some(cap) = SHARDING_KEY.captures(comment) { if let Some(sharding_key) = get_matched_value(&cap) { if let Some(schema) = schema.schemas.get(Some(sharding_key.into())) { - return Ok((Some(schema.shard().into()), role)); + return Ok((Some(schema.shard().into()), role, cache)); } let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)? .shards(schema.shards) .build()?; - return Ok((Some(ctx.apply()?), role)); + return Ok((Some(ctx.apply()?), role, cache)); } } if let Some(cap) = SHARD.captures(comment) { @@ -77,13 +96,14 @@ pub fn comment( .unwrap_or(Shard::All), ), role, + cache, )); } } } } - Ok((None, role)) + Ok((None, role, cache)) } #[cfg(test)] @@ -255,4 +275,132 @@ mod tests { let result = comment(query, &schema).unwrap(); assert_eq!(result.0, Some(Shard::Direct(1))); } + + #[test] + fn test_cache_hint_no_cache() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: no_cache */"; + let result = comment(query, &schema).unwrap(); + assert!(matches!(result.2, Some(CacheDirective::NoCache))); + } + + #[test] + fn test_cache_hint_cache_default_ttl() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: cache */"; + let result = comment(query, &schema).unwrap(); + assert!(matches!( + result.2, + Some(CacheDirective::Cache { ttl_seconds: None }) + )); + } + + #[test] + fn test_cache_hint_cache_with_ttl() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: cache ttl=60 */"; + let result = comment(query, &schema).unwrap(); + assert!(matches!( + result.2, + Some(CacheDirective::Cache { + ttl_seconds: Some(60) + }) + )); + } + + #[test] + fn test_cache_hint_no_directive() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users"; + let result = comment(query, &schema).unwrap(); + assert!(matches!(result.2, None)); + } + + #[test] + fn test_combined_shard_and_cache_hints() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_role: replica pgdog_shard: 1 pgdog_cache: cache ttl=300 */"; + let result = comment(query, &schema).unwrap(); + assert_eq!(result.1, Some(Role::Replica)); + assert_eq!(result.0, Some(Shard::Direct(1))); + assert!(matches!( + result.2, + Some(CacheDirective::Cache { + ttl_seconds: Some(300) + }) + )); + } + + #[test] + fn test_cache_hint_force_cache() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: force_cache */"; + let result = comment(query, &schema).unwrap(); + assert!(matches!( + result.2, + Some(CacheDirective::ForceCache { ttl_seconds: None }) + )); + } + + #[test] + fn test_cache_hint_force_cache_with_ttl() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: force_cache ttl=60 */"; + let result = comment(query, &schema).unwrap(); + assert!(matches!( + result.2, + Some(CacheDirective::ForceCache { + ttl_seconds: Some(60) + }) + )); + } } diff --git a/pgdog/src/net/messages/hello.rs b/pgdog/src/net/messages/hello.rs index 84f901b06..5c989221b 100644 --- a/pgdog/src/net/messages/hello.rs +++ b/pgdog/src/net/messages/hello.rs @@ -60,7 +60,7 @@ impl Startup { } else if name == "options" { let kvs = value.split("-c"); for kv in kvs { - let mut nvs = kv.split("="); + let mut nvs = kv.splitn(2, "="); let name = nvs.next(); let value = nvs.next(); diff --git a/pgdog/src/net/parameter.rs b/pgdog/src/net/parameter.rs index 1502d0397..4dd0c6114 100644 --- a/pgdog/src/net/parameter.rs +++ b/pgdog/src/net/parameter.rs @@ -33,6 +33,7 @@ static UNTRACKED_PARAMS: Lazy> = Lazy::new(|| { String::from("pgdog.role"), String::from("pgdog.shard"), String::from("pgdog.sharding_key"), + String::from("pgdog.cache"), ]) });