diff --git a/crates/key-value-store/src/redis_impl.rs b/crates/key-value-store/src/redis_impl.rs index 9feb851..c0800eb 100644 --- a/crates/key-value-store/src/redis_impl.rs +++ b/crates/key-value-store/src/redis_impl.rs @@ -1,25 +1,28 @@ use crate::Store; use reactor::gcore::fastedge::key_value::{Error, Value}; +use redis::aio::ConnectionManager; use redis::{AsyncCommands, AsyncIter}; #[derive(Clone)] pub struct RedisStore { - inner: redis::aio::MultiplexedConnection, + inner: ConnectionManager, } impl RedisStore { + /// Open a store backed by `ConnectionManager`, which holds a multiplexed + /// connection and transparently reconnects with exponential backoff when + /// the underlying socket dies (e.g. broken pipe on Redis restart). The + /// command that hits the dead socket still surfaces as an error, but + /// follow-up calls land on the freshly re-established connection. pub async fn open(params: &str) -> Result { - let conn = ::redis::Client::open(params) - .map_err(|error| { - tracing::warn!(error=?error, "redis open"); - Error::InternalError - })? - .get_multiplexed_async_connection() - .await - .map_err(|error| { - tracing::warn!(error=?error, "redis open"); - Error::InternalError - })?; + let client = ::redis::Client::open(params).map_err(|error| { + tracing::warn!(error=?error, "redis open"); + Error::InternalError + })?; + let conn = ConnectionManager::new(client).await.map_err(|error| { + tracing::warn!(error=?error, "redis open"); + Error::InternalError + })?; Ok(Self { inner: conn }) } }