From 7956093b1143912fcbace73614dc99d4a664ff24 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Fri, 8 May 2026 19:16:45 +0300 Subject: [PATCH] refactor: switch to `ConnectionManager` for RedisStore - Replaced `MultiplexedConnection` with `ConnectionManager` to improve connection handling. - Enhanced resilience with transparent reconnections and exponential backoff on failures. --- crates/key-value-store/src/redis_impl.rs | 27 +++++++++++++----------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/crates/key-value-store/src/redis_impl.rs b/crates/key-value-store/src/redis_impl.rs index 9feb851..c0800eb 100644 --- a/crates/key-value-store/src/redis_impl.rs +++ b/crates/key-value-store/src/redis_impl.rs @@ -1,25 +1,28 @@ use crate::Store; use reactor::gcore::fastedge::key_value::{Error, Value}; +use redis::aio::ConnectionManager; use redis::{AsyncCommands, AsyncIter}; #[derive(Clone)] pub struct RedisStore { - inner: redis::aio::MultiplexedConnection, + inner: ConnectionManager, } impl RedisStore { + /// Open a store backed by `ConnectionManager`, which holds a multiplexed + /// connection and transparently reconnects with exponential backoff when + /// the underlying socket dies (e.g. broken pipe on Redis restart). The + /// command that hits the dead socket still surfaces as an error, but + /// follow-up calls land on the freshly re-established connection. pub async fn open(params: &str) -> Result { - let conn = ::redis::Client::open(params) - .map_err(|error| { - tracing::warn!(error=?error, "redis open"); - Error::InternalError - })? - .get_multiplexed_async_connection() - .await - .map_err(|error| { - tracing::warn!(error=?error, "redis open"); - Error::InternalError - })?; + let client = ::redis::Client::open(params).map_err(|error| { + tracing::warn!(error=?error, "redis open"); + Error::InternalError + })?; + let conn = ConnectionManager::new(client).await.map_err(|error| { + tracing::warn!(error=?error, "redis open"); + Error::InternalError + })?; Ok(Self { inner: conn }) } }