From 05553f343e243cc5d309d74acbdd990521b35390 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Fri, 15 May 2026 19:26:13 +0200 Subject: [PATCH] feat: add EndPoint.resolveAll() for multi-address DNS expansion (DRIVER-201) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the endpoint-API aspect of DRIVER-201. Problem: EndPoint.resolve() returns a single SocketAddress. When a hostname maps to multiple IPs, the driver can only try the first one and fails with AllNodesFailedException if it is unreachable — the remaining IPs are invisible to the connection layer. Solution (per @dkropachev's architectural direction): - Deprecate EndPoint.resolve(). Add EndPoint.resolveAll() with a default implementation that wraps resolve() in a single-element array for backward compatibility with third-party implementations. - DefaultEndPoint.resolveAll(): if the stored InetSocketAddress is unresolved, calls InetAddress.getAllByName() to expand the hostname to all known IPs, returning one InetSocketAddress per IP. Falls back to the single-element unresolved address if DNS fails, so the connect attempt surfaces a descriptive error rather than returning empty. - SniEndPoint.resolveAll(): re-resolves the proxy hostname on each call and returns all A-records sorted by IP, enabling the caller to try each proxy address in sequence. - ClientRoutesEndPoint.resolveAll(): delegates to resolve() (single- address topology-monitor lookup) and wraps in a one-element array. - ChannelFactory.connect(): replaced endPoint.resolve() with endPoint.resolveAll(). Iterates through the returned candidates via tryNextCandidate(); on per-address failure logs and tries the next; only fails the overall resultFuture when all candidates are exhausted. Protocol-version negotiation (downgrade retries) is scoped to the same address via connectToAddress(), which is semantically correct. 
Tests: - DefaultEndPointTest: 3 new cases — already-resolved passthrough, unresolved hostname expansion, unresolvable hostname fallback. - SniEndPointTest: new class with cases for resolveAll() happy path, unresolvable host exception, and resolve() sanity check. - All 13 existing ChannelFactory tests continue to pass (LocalEndPoint uses the default single-element resolveAll() via the interface default). --- .../driver/api/core/metadata/EndPoint.java | 29 ++++- .../internal/core/channel/ChannelFactory.java | 113 ++++++++++++++++-- .../core/metadata/ClientRoutesEndPoint.java | 14 +++ .../core/metadata/DefaultEndPoint.java | 35 ++++++ .../internal/core/metadata/SniEndPoint.java | 28 +++++ .../core/metadata/DefaultEndPointTest.java | 37 ++++++ .../core/metadata/SniEndPointTest.java | 62 ++++++++++ 7 files changed, 307 insertions(+), 11 deletions(-) create mode 100644 core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SniEndPointTest.java diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java index 530f2ad38ac..e5c229294ec 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java @@ -18,28 +18,51 @@ package com.datastax.oss.driver.api.core.metadata; import edu.umd.cs.findbugs.annotations.NonNull; -import java.net.InetSocketAddress; import java.net.SocketAddress; /** * Encapsulates the information needed to open connections to a node. * *

By default, the driver assumes plain TCP connections, and this is just a wrapper around an - * {@link InetSocketAddress}. However, more complex deployment scenarios might use a custom + * {@link java.net.InetSocketAddress}. However, more complex deployment scenarios might use a custom * implementation that contains additional information; for example, if the nodes are accessed * through a proxy with SNI routing, an SNI server name is needed in addition to the proxy address. */ public interface EndPoint { /** - * Resolves this instance to a socket address. + * Resolves this instance to a single socket address. * *

This will be called each time the driver opens a new connection to the node. The returned * address cannot be null. + * + * @deprecated Use {@link #resolveAll()} instead. When a hostname maps to multiple IPs (e.g. in + * dynamic DNS environments) only one address is returned here, causing the driver to miss + * fallback IPs when the first one is unreachable. {@code resolveAll()} returns the full set. */ + @Deprecated @NonNull SocketAddress resolve(); + /** + * Resolves this instance to all known socket addresses. + * + *

This is called each time the driver opens a new connection to the node. For endpoints backed + * by a plain IP address the array contains exactly one element. For endpoints whose hostname + * resolves to multiple IPs (e.g. a DNS round-robin entry) all addresses are returned so that the + * driver can try each one in sequence and fall back gracefully when individual IPs are + * unreachable. + * + *

The default implementation wraps {@link #resolve()} and returns a single-element array. + * Implementations that can supply multiple addresses should override this method. + * + *

The returned array must not be null and must contain at least one element. + */ + @NonNull + default SocketAddress[] resolveAll() { + return new SocketAddress[] {resolve()}; + } + /** * Returns an alternate string representation for use in node-level metric names. * diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java index 5ad8c111262..360f67fa70e 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java @@ -219,14 +219,108 @@ private void connect( List attemptedVersions, CompletableFuture resultFuture) { - SocketAddress resolvedAddress; + SocketAddress[] candidates; try { - resolvedAddress = endPoint.resolve(); + candidates = endPoint.resolveAll(); } catch (Exception e) { resultFuture.completeExceptionally(e); return; } + tryNextCandidate( + endPoint, + shardingInfo, + shardId, + options, + nodeMetricUpdater, + currentVersion, + isNegotiating, + attemptedVersions, + resultFuture, + candidates, + 0); + } + + /** + * Iterates through the candidate addresses from {@link EndPoint#resolveAll()}. Tries each one in + * sequence; if an address fails for any reason (including exhausted protocol-version + * negotiation), the next candidate is tried. Only when all candidates are exhausted is the + * overall {@code resultFuture} failed, with the error from the last candidate. 
+ */ + private void tryNextCandidate( + EndPoint endPoint, + NodeShardingInfo shardingInfo, + Integer shardId, + DriverChannelOptions options, + NodeMetricUpdater nodeMetricUpdater, + ProtocolVersion currentVersion, + boolean isNegotiating, + List attemptedVersions, + CompletableFuture resultFuture, + SocketAddress[] candidates, + int index) { + + SocketAddress candidate = candidates[index]; + CompletableFuture perAddressFuture = new CompletableFuture<>(); + connectToAddress( + endPoint, + shardingInfo, + shardId, + options, + nodeMetricUpdater, + currentVersion, + isNegotiating, + attemptedVersions, + perAddressFuture, + candidate); + + perAddressFuture.whenComplete( + (channel, error) -> { + if (error == null) { + resultFuture.complete(channel); + } else if (index + 1 < candidates.length) { + LOG.debug( + "[{}] Failed to connect to {} ({}), trying next address", + logPrefix, + candidate, + error.getMessage()); + tryNextCandidate( + endPoint, + shardingInfo, + shardId, + options, + nodeMetricUpdater, + currentVersion, + isNegotiating, + attemptedVersions, + resultFuture, + candidates, + index + 1); + } else { + // Last candidate failed too: propagate its error as the overall connection failure + resultFuture.completeExceptionally(error); + } + }); + } + + /** + * Performs a Netty bootstrap connect to a single, already-resolved address. Handles + * protocol-version negotiation (downgrade retries) internally, staying on the same address. Uses + * {@code perAddressFuture} so {@link #tryNextCandidate} can distinguish a per-address TCP failure + * (try the next IP) from a successful protocol handshake. 
+ */ + private void connectToAddress( + EndPoint endPoint, + NodeShardingInfo shardingInfo, + Integer shardId, + DriverChannelOptions options, + NodeMetricUpdater nodeMetricUpdater, + ProtocolVersion currentVersion, + boolean isNegotiating, + List attemptedVersions, + CompletableFuture perAddressFuture, + SocketAddress resolvedAddress) { + NettyOptions nettyOptions = context.getNettyOptions(); Bootstrap bootstrap = @@ -235,7 +329,8 @@ private void connect( .channel(nettyOptions.channelClass()) .option(ChannelOption.ALLOCATOR, nettyOptions.allocator()) .handler( - initializer(endPoint, currentVersion, options, nodeMetricUpdater, resultFuture)); + initializer( + endPoint, currentVersion, options, nodeMetricUpdater, perAddressFuture)); nettyOptions.afterBootstrapInitialized(bootstrap); @@ -294,7 +389,7 @@ private void connect( ConsistencyLevel.LOCAL_QUORUM.name())); } } - resultFuture.complete(driverChannel); + perAddressFuture.complete(driverChannel); } else { Throwable error = connectFuture.cause(); if (error instanceof UnsupportedProtocolVersionException && isNegotiating) { @@ -307,7 +402,8 @@ private void connect( logPrefix, currentVersion, downgraded.get()); - connect( + // Stay on the same address for protocol-version downgrade retries. 
+ connectToAddress( endPoint, shardingInfo, shardId, @@ -316,16 +412,17 @@ private void connect( downgraded.get(), true, attemptedVersions, - resultFuture); + perAddressFuture, + resolvedAddress); } else { - resultFuture.completeExceptionally( + perAddressFuture.completeExceptionally( UnsupportedProtocolVersionException.forNegotiation( endPoint, attemptedVersions)); } } else { // Note: might be completed already if the failure happened in initializer(), this is // fine - resultFuture.completeExceptionally(error); + perAddressFuture.completeExceptionally(error); } } }); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java index 15d825b2efc..246ae562379 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java @@ -76,6 +76,20 @@ public SocketAddress resolve() { return fallbackEndPoint.resolve(); } + /** + * Returns all socket addresses for this endpoint. + * + *

Delegates to {@link #resolve()} to obtain the single address provided by the topology + * monitor (or the fallback endpoint), then returns it as a one-element array. The topology + * monitor resolves each node to exactly one address by design (via a per-host-id lookup), so + * multi-address expansion is not applicable here. + */ + @NonNull + @Override + public SocketAddress[] resolveAll() { + return new SocketAddress[] {resolve()}; + } + @Override public boolean equals(Object other) { if (other == this) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java index 7ffbee8e4bb..93b1c458ea4 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java @@ -20,7 +20,10 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.Serializable; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; import java.util.Objects; public class DefaultEndPoint implements EndPoint, Serializable { @@ -41,6 +44,38 @@ public InetSocketAddress resolve() { return address; } + /** + * Returns all socket addresses for this endpoint. + * + *

If the stored address is unresolved (i.e. the driver was configured with {@code + * RESOLVE_CONTACT_POINTS=false} and the hostname has not been looked up yet), this method calls + * {@link InetAddress#getAllByName(String)} to expand the hostname to every IP it resolves to. + * Each resolved IP is returned as an {@link InetSocketAddress} with the same port as the + * original. If the hostname resolves to only one IP, or if the address is already resolved, a + * single-element array is returned. + * + *

If DNS resolution fails, falls back to a single-element array containing {@link #resolve()}. + */ + @NonNull + @Override + public SocketAddress[] resolveAll() { + if (!address.isUnresolved()) { + return new SocketAddress[] {address}; + } + try { + InetAddress[] all = InetAddress.getAllByName(address.getHostString()); + SocketAddress[] result = new SocketAddress[all.length]; + for (int i = 0; i < all.length; i++) { + result[i] = new InetSocketAddress(all[i], address.getPort()); + } + return result; + } catch (UnknownHostException e) { + // Fallback: return the single unresolved address; the connect attempt will fail with a + // descriptive error rather than silently returning an empty array. + return new SocketAddress[] {address}; + } + } + @Override public boolean equals(Object other) { if (other == this) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java index ace4e82617d..01d0db9dad6 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java @@ -22,6 +22,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.Comparator; @@ -72,6 +73,33 @@ public InetSocketAddress resolve() { } } + /** + * Returns all socket addresses for this SNI proxy endpoint. + * + *

Re-resolves the proxy hostname on each call and returns one {@link InetSocketAddress} per + * A-record, so that the driver can try every proxy IP in sequence if one is unreachable. + */ + @NonNull + @Override + public SocketAddress[] resolveAll() { + try { + InetAddress[] aRecords = InetAddress.getAllByName(proxyAddress.getHostName()); + if (aRecords.length == 0) { + throw new IllegalArgumentException( + "Could not resolve proxy address " + proxyAddress.getHostName()); + } + Arrays.sort(aRecords, IP_COMPARATOR); + SocketAddress[] result = new SocketAddress[aRecords.length]; + for (int i = 0; i < aRecords.length; i++) { + result[i] = new InetSocketAddress(aRecords[i], proxyAddress.getPort()); + } + return result; + } catch (UnknownHostException e) { + throw new IllegalArgumentException( + "Could not resolve proxy address " + proxyAddress.getHostName(), e); + } + } + @Override public boolean equals(Object other) { if (other == this) { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPointTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPointTest.java index 7da8fb39415..8b7c60ff763 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPointTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPointTest.java @@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.net.InetSocketAddress; +import java.net.SocketAddress; import org.junit.Test; public class DefaultEndPointTest { @@ -57,4 +58,40 @@ public void should_reject_null_address() { .isInstanceOf(NullPointerException.class) .hasMessage("address can't be null"); } + + @Test + public void resolve_all_returns_single_element_for_already_resolved_address() { + DefaultEndPoint endPoint = new DefaultEndPoint(new InetSocketAddress("127.0.0.1", 9042)); + SocketAddress[] all = endPoint.resolveAll(); + assertThat(all).hasSize(1); + 
assertThat(((InetSocketAddress) all[0]).isUnresolved()).isFalse(); + assertThat(((InetSocketAddress) all[0]).getHostString()).isEqualTo("127.0.0.1"); + } + + @Test + public void resolve_all_expands_unresolved_hostname_to_at_least_one_address() { + // localhost reliably resolves to at least 127.0.0.1 + DefaultEndPoint endPoint = + new DefaultEndPoint(InetSocketAddress.createUnresolved("localhost", 9042)); + SocketAddress[] all = endPoint.resolveAll(); + assertThat(all).isNotEmpty(); + for (SocketAddress addr : all) { + InetSocketAddress inet = (InetSocketAddress) addr; + assertThat(inet.isUnresolved()).isFalse(); + assertThat(inet.getPort()).isEqualTo(9042); + } + } + + @Test + public void resolve_all_falls_back_to_single_element_when_hostname_is_unresolvable() { + // Unresolvable hostname: resolveAll() must not throw; it returns the unresolved address. + DefaultEndPoint endPoint = + new DefaultEndPoint( + InetSocketAddress.createUnresolved("this-host-does-not-exist.invalid", 9042)); + SocketAddress[] all = endPoint.resolveAll(); + assertThat(all).hasSize(1); + // The fallback address is the original unresolved one. + assertThat(((InetSocketAddress) all[0]).getHostString()) + .isEqualTo("this-host-does-not-exist.invalid"); + } } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SniEndPointTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SniEndPointTest.java new file mode 100644 index 00000000000..e2aecf06970 --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SniEndPointTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.metadata; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import org.junit.Test; + +public class SniEndPointTest { + + @Test + public void resolve_all_returns_all_proxy_addresses_for_resolvable_hostname() { + // localhost reliably resolves to at least one address + SniEndPoint endPoint = + new SniEndPoint(new InetSocketAddress("localhost", 9042), "test-server-name"); + SocketAddress[] all = endPoint.resolveAll(); + assertThat(all).isNotEmpty(); + for (SocketAddress addr : all) { + InetSocketAddress inet = (InetSocketAddress) addr; + assertThat(inet.isUnresolved()).isFalse(); + assertThat(inet.getPort()).isEqualTo(9042); + } + } + + @Test + public void resolve_all_throws_for_unresolvable_hostname() { + SniEndPoint endPoint = + new SniEndPoint( + new InetSocketAddress("this-host-does-not-exist.invalid", 9042), "test-server-name"); + assertThatThrownBy(endPoint::resolveAll) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Could not resolve proxy address"); + } + + @Test + public void resolve_returns_single_address_from_round_robin() { + // Sanity check: resolve() still works and returns a single address + SniEndPoint endPoint = + new SniEndPoint(new 
InetSocketAddress("localhost", 9042), "test-server-name"); + InetSocketAddress addr = endPoint.resolve(); + assertThat(addr.isUnresolved()).isFalse(); + assertThat(addr.getPort()).isEqualTo(9042); + } +}