Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/qdrant_client/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ pub struct QdrantConfig {
/// Whether to check compatibility between the client and server versions
pub check_compatibility: bool,

/// Whether to verify connectivity to the Qdrant server when building the client.
///
/// gRPC connections are established lazily, so by default building a client
/// succeeds even when the server is unreachable; connection errors only surface
/// on the first request. When enabled, [`build`](Self::build) performs a health
/// check and returns an error if the server cannot be reached.
pub check_connection: bool,

/// Amount of concurrent connections.
/// If set to 0 or 1, connection pools will be disabled.
pub pool_size: usize,
Expand Down Expand Up @@ -207,11 +215,37 @@ impl QdrantConfig {
self
}

/// Verify connectivity to the Qdrant server when building the client.
///
/// gRPC connections are established lazily, so [`build`](Self::build) normally
/// succeeds even when the server is unreachable; connection failures would only
/// be observed on the first API call. Enabling this makes `build` perform a
/// health check and return an error if the server cannot be reached.
///
/// ```rust,no_run
/// use qdrant_client::Qdrant;
///
/// let client = Qdrant::from_url("http://localhost:6334")
/// .check_connection()
/// .build();
/// ```
pub fn check_connection(mut self) -> Self {
self.check_connection = true;
self
}

/// Set the pool size of concurrent connections.
/// If set to 0 or 1, connection pools will be disabled.
pub fn set_pool_size(&mut self, pool_size: usize) {
self.pool_size = pool_size;
}

/// Set whether to verify connectivity to the server when building the client.
///
/// Also see [`check_connection()`](fn@Self::check_connection).
pub fn set_check_connection(&mut self, check_connection: bool) {
self.check_connection = check_connection;
}
}

/// Default Qdrant client configuration.
Expand All @@ -227,6 +261,7 @@ impl Default for QdrantConfig {
api_key: None,
compression: None,
check_compatibility: true,
check_connection: false,
pool_size: 3,
custom_headers: Vec::new(),
}
Expand Down
86 changes: 65 additions & 21 deletions src/qdrant_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,22 @@ impl Qdrant {
///
/// Constructs the client and connects based on the given [`QdrantConfig`](config::QdrantConfig).
pub fn new(config: QdrantConfig) -> QdrantResult<Self> {
if config.check_compatibility {
// create a temporary client to check compatibility
if config.check_compatibility || config.check_connection {
// create a temporary client to check connectivity and/or compatibility
let channel = ChannelPool::new(
config.uri.parse::<Uri>()?,
config.timeout,
config.connect_timeout,
config.keep_alive_while_idle,
1, // No need to create a pool for the compatibility check.
1, // No need to create a pool for the health check.
);
let client = Self {
channel: Arc::new(channel),
config: config.clone(),
};

// We're in sync context, spawn temporary runtime in thread to do async health check
let server_version = thread::scope(|s| {
let health_check = thread::scope(|s| {
s.spawn(|| {
tokio::runtime::Builder::new_current_thread()
.enable_io()
Expand All @@ -125,24 +125,36 @@ impl Qdrant {
})
.join()
.expect("Failed to join health check thread")
})
.ok()
.map(|info| info.version);

let client_version = env!("CARGO_PKG_VERSION").to_string();
if let Some(server_version) = server_version {
let is_compatible = is_compatible(Some(&client_version), Some(&server_version));
if !is_compatible {
println!("Client version {client_version} is not compatible with server version {server_version}. \
Major versions should match and minor version difference must not exceed 1. \
Set check_compatibility=false to skip version check.");
});

// When connection checking is requested, surface connection errors eagerly
// instead of silently deferring them to the first API call.
let server_version = match health_check {
Ok(info) => Some(info.version),
Err(err) => {
if config.check_connection {
return Err(err);
}
None
}
};

if config.check_compatibility {
let client_version = env!("CARGO_PKG_VERSION").to_string();
if let Some(server_version) = server_version {
let is_compatible = is_compatible(Some(&client_version), Some(&server_version));
if !is_compatible {
println!("Client version {client_version} is not compatible with server version {server_version}. \
Major versions should match and minor version difference must not exceed 1. \
Set check_compatibility=false to skip version check.");
}
} else {
println!(
"Failed to obtain server version. \
Unable to check client-server compatibility. \
Set check_compatibility=false to skip version check."
);
}
} else {
println!(
"Failed to obtain server version. \
Unable to check client-server compatibility. \
Set check_compatibility=false to skip version check."
);
}
}

Expand Down Expand Up @@ -253,3 +265,35 @@ impl Qdrant {
.await
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;

// Nothing listens on this port, so a connection attempt is refused quickly;
// these tests do not require a running Qdrant server.
const UNREACHABLE_URL: &str = "http://127.0.0.1:6999";

#[test]
fn build_without_connection_check_succeeds_when_server_is_down() {
// gRPC connects lazily; without a connection check, building the client
// must succeed even when no server is reachable.
let client = Qdrant::from_url(UNREACHABLE_URL)
.skip_compatibility_check()
.build();
assert!(client.is_ok());
}

#[test]
fn build_with_connection_check_fails_when_server_is_down() {
// With connection checking enabled, building must fail eagerly when the
// server is unreachable (issue #258).
let result = Qdrant::from_url(UNREACHABLE_URL)
.check_connection()
.connect_timeout(Duration::from_secs(1))
.build();
assert!(result.is_err());
}
}