From 22bad213b14bf087e7be6746d77f89411de37f00 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Thu, 12 Feb 2026 13:09:24 +0000 Subject: [PATCH 1/2] Add clientId header to all GRPC calls While implementing the license work for rate limiting and connection limiting, we found it's necessary to uniquely identify an instance of an application. There are, however two problems in the current codebase: 1. Some GRPC calls have a clientId as body in the message, but not all of them. For example, appending events does not have a clientId. 2. Users can override the clientId in the AxonServer properties of the framework, making multiple applications count as the same. This PR introduces the `AxonIQ-ClientId` header on every GRPC call, so Axon Server can always uniquely identify connections. In addition, the AxonServerConnectionFactory now postfixes the `clientId` with 8 random characters to enforce uniqueness. Axon Framework applications have only one factory per Axon Server cluster. This should make the limits enforceable. --- pom.xml | 2 +- .../AxonServerConnectionFactory.java | 50 +++++++++---------- .../connector/impl/ControlChannelImpl.java | 2 +- .../axonserver/connector/impl/Headers.java | 6 +++ .../impl/ControlChannelIntegrationTest.java | 2 +- 5 files changed, 32 insertions(+), 30 deletions(-) diff --git a/pom.xml b/pom.xml index 6919fb34..a25a25de 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.axoniq axonserver-connector-java - 2025.2.2-SNAPSHOT + 2026.0.0-SNAPSHOT AxonServer Connector diff --git a/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java b/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java index 81a3d329..ab77d3b2 100644 --- a/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java +++ b/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java @@ -90,13 +90,14 @@ public class AxonServerConnectionFactory { private volatile boolean shutdown; /** - * Instantiates an {@link AxonServerConnectionFactory} with the given {@code builder}. + * Instantiates an {@link AxonServerConnectionFactory} with the given {@code builder}. The clientInstanceId is + * postfixed by a random hex string to ensure uniqueness between different instances of the application. * * @param builder the {@link Builder} used to set all the specifics of an {@link AxonServerConnectionFactory} */ protected AxonServerConnectionFactory(Builder builder) { this.componentName = builder.componentName; - this.clientInstanceId = builder.clientInstanceId; + this.clientInstanceId = builder.clientInstanceId + "-" + randomHex(8); this.token = builder.token; this.tags.putAll(builder.tags); this.executorService = builder.executorService; @@ -121,7 +122,6 @@ protected AxonServerConnectionFactory(Builder builder) { * information and should be the same only for instances of the same application or component. * * @param componentName The name of the component connecting to AxonServer - * * @return a builder instance for further configuration of the connector * @see #forClient(String, String) */ @@ -131,8 +131,8 @@ public static Builder forClient(String componentName) { /** - * Returns a builder to configure a ConnectionFactory instance for the given {@code componentName} and {@code - * clientInstanceId}. + * Returns a builder to configure a ConnectionFactory instance for the given {@code componentName} and + * {@code clientInstanceId}. *

* The clientInstanceId MUST be a unique value across all instances that connect to AxonServer. The componentName is * used in monitoring information and should be the same only for instances of the same application or component. @@ -142,7 +142,6 @@ public static Builder forClient(String componentName) { * * @param componentName The name of the component connecting to AxonServer * @param clientInstanceId The unique instance identifier for this instance of the component - * * @return a builder instance for further configuration of the connector * @see #forClient(String) */ @@ -154,7 +153,6 @@ public static Builder forClient(String componentName, String clientInstanceId) { * Connects to the given {@code context} using the settings defined in this ConnectionFactory. * * @param context The name of the context to connect to - * * @return a Connection allowing interaction with the mentioned context */ public AxonServerConnection connect(String context) { @@ -206,7 +204,8 @@ private ManagedChannel createChannel(ServerAddress address, String context) { return builder.intercept( new GrpcBufferingInterceptor(50), new HeaderAttachingInterceptor<>(Headers.CONTEXT, context), - new HeaderAttachingInterceptor<>(Headers.ACCESS_TOKEN, token) + new HeaderAttachingInterceptor<>(Headers.ACCESS_TOKEN, token), + new HeaderAttachingInterceptor<>(Headers.CLIENT_ID, clientInstanceId) ).build(); } @@ -234,12 +233,24 @@ public void shutdown() { } } + /** + * Returns the unique client instance identifier used by this ConnectionFactory. This is the value configured as + * {@code clientInstanceId} on the Builder, postfixed with a random hex string to ensure uniqueness between + * different instances of the application. + * + * @return the unique client instance identifier used by this ConnectionFactory + */ + public String getClientInstanceId() { + return clientInstanceId; + } + /** * Builder for AxonServerConnectionFactory instances. The methods on this class allow for configuration of the * {@link AxonServerConnectionFactory} instance used to connect to an AxonServer (cluster). *

- * This class is not intended to be instantiated directly, but rather through {@link - * AxonServerConnectionFactory#forClient(String)} or {@link AxonServerConnectionFactory#forClient(String, String)}. + * This class is not intended to be instantiated directly, but rather through + * {@link AxonServerConnectionFactory#forClient(String)} or + * {@link AxonServerConnectionFactory#forClient(String, String)}. */ public static class Builder { @@ -281,7 +292,6 @@ protected Builder(String componentName, String clientInstanceId) { * Defaults to "localhost:8024". * * @param serverAddresses The addresses to try to set up the initial connection with. - * * @return this builder for further configuration */ public Builder routingServers(ServerAddress... serverAddresses) { @@ -297,7 +307,6 @@ public Builder routingServers(ServerAddress... serverAddresses) { * * @param interval The amount of time to wait in between connection attempts * @param timeUnit The unit in which the interval is expressed - * * @return this builder for further configuration */ public Builder reconnectInterval(long interval, TimeUnit timeUnit) { @@ -312,7 +321,6 @@ public Builder reconnectInterval(long interval, TimeUnit timeUnit) { * * @param timeout The amount of time to wait for a connection to be established * @param timeUnit The unit in which the timout is expressed - * * @return this builder for further configuration */ public Builder connectTimeout(long timeout, TimeUnit timeUnit) { @@ -330,7 +338,6 @@ public Builder connectTimeout(long timeout, TimeUnit timeUnit) { * By default, no tags are defined. * * @param additionalClientTags additional tags that define this client component - * * @return this builder for further configuration */ public Builder clientTags(Map additionalClientTags) { @@ -349,7 +356,6 @@ public Builder clientTags(Map additionalClientTags) { * * @param key the key of the Tag to configure * @param value the value of the Tag to configure - * * @return this builder for further configuration */ public Builder clientTag(String key, String value) { @@ -364,7 +370,6 @@ public Builder clientTag(String key, String value) { * AxonServer. * * @param token The token to which the required authorizations have been assigned. - * * @return this builder for further configuration */ public Builder token(String token) { @@ -391,7 +396,6 @@ public Builder useTransportSecurity() { * Defaults to not using TLS. * * @param sslContext The context defining TLS parameters - * * @return this builder for further configuration * @see SslContextBuilder#forClient() */ @@ -403,14 +407,13 @@ public Builder useTransportSecurity(SslContext sslContext) { /** * Indicates whether the connector should always reconnect via the Routing Servers. When {@code true} (default), * the connector will contact the Routing Servers for a new destination each time a connection is dropped. When - * {@code false}, the connector will first attempt to re-establish a connection to the node is was - * previously connected to. When that fails, only then will it contact the Routing Servers. + * {@code false}, the connector will first attempt to re-establish a connection to the node is was previously + * connected to. When that fails, only then will it contact the Routing Servers. *

* Default to {@code true}, forcing the failed connection to be abandoned and a new one to be requested via the * routing servers. * * @param forceReconnectViaRoutingServers whether to force a reconnect to the Cluster via the RoutingServers. - * * @return this builder for further configuration */ public Builder forceReconnectViaRoutingServers(boolean forceReconnectViaRoutingServers) { @@ -426,7 +429,6 @@ public Builder forceReconnectViaRoutingServers(boolean forceReconnectViaRoutingS * Defaults to 2. * * @param poolSize The number of threads to assign to Connection related activities. - * * @return this builder for further configuration */ public Builder threadPoolSize(int poolSize) { @@ -449,7 +451,6 @@ public Builder threadPoolSize(int poolSize) { * @param interval time without read activity before sending a keepalive ping * @param timeout the time waiting for read activity after sending a keepalive ping * @param timeUnit the unit in which the interval and timeout are expressed - * * @return this builder for further configuration */ public Builder usingKeepAlive(long interval, long timeout, TimeUnit timeUnit, boolean keepAliveWithoutCalls) { @@ -466,7 +467,6 @@ public Builder usingKeepAlive(long interval, long timeout, TimeUnit timeUnit, bo * Default to 4 MiB. * * @param bytes The number of bytes to limit inbound message to - * * @return this builder for further configuration */ public Builder maxInboundMessageSize(int bytes) { @@ -482,7 +482,6 @@ public Builder maxInboundMessageSize(int bytes) { * feature. * * @param customization A function defining the customization to make on the ManagedChannelBuilder - * * @return this builder for further configuration */ public Builder customize(UnaryOperator> customization) { @@ -496,7 +495,6 @@ public Builder customize(UnaryOperator> customization) * * @param interval The interval in which to send status updates * @param unit The unit of time in which the interval is expressed - * * @return this builder for further configuration */ public Builder processorInfoUpdateFrequency(long interval, TimeUnit unit) { @@ -511,7 +509,6 @@ public Builder processorInfoUpdateFrequency(long interval, TimeUnit unit) { * Values lower than 16 will be replaced with 16. * * @param permits The number of initial permits - * * @return this builder for further configuration */ public Builder queryPermits(int permits) { @@ -526,7 +523,6 @@ public Builder queryPermits(int permits) { * Values lower than 16 will be replaced with 16. * * @param permits The number of initial permits - * * @return this builder for further configuration */ public Builder commandPermits(int permits) { diff --git a/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java b/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java index eb5a36e1..26407014 100644 --- a/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java +++ b/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java @@ -193,7 +193,7 @@ public synchronized void connect() { responseObserver.getInstructionsForPlatform(); try { - logger.info("Connected instruction stream for context '{}'. Sending client identification", context); + logger.info("Connected instruction stream for context '{}'. Sending client identification with clientId {}", context, clientIdentification.getClientId()); instructionsForPlatform.onNext(PlatformInboundInstruction.newBuilder() .setRegister(clientIdentification) .build()); diff --git a/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java b/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java index 9d79e045..64598a24 100644 --- a/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java +++ b/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java @@ -35,6 +35,12 @@ public abstract class Headers { public static final Metadata.Key ACCESS_TOKEN = Metadata.Key.of("AxonIQ-Access-Token", Metadata.ASCII_STRING_MARSHALLER); + /** + * A {@link Metadata.Key} defining the client ID of the application sending this message. + */ + public static final Metadata.Key CLIENT_ID = + Metadata.Key.of("AxonIQ-ClientId", Metadata.ASCII_STRING_MARSHALLER); + private Headers() { // Utility class } diff --git a/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java b/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java index 20dba825..dca6bd08 100644 --- a/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java +++ b/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java @@ -284,7 +284,7 @@ void moveSegmentInstructionIsPickedUpByHandler() throws Exception { String segmentToMove = "0"; String segmentsPath = "/v1/components/" + getClass().getSimpleName() + "/processors/testProcessor/segments/" + - segmentToMove + "/move?tokenStoreIdentifier=TokenStoreId&context=default&target=foo"; + segmentToMove + "/move?tokenStoreIdentifier=TokenStoreId&context=default&target=" + clientToMoveTo.getClientInstanceId(); assertWithin(2, TimeUnit.SECONDS, () -> sendToAxonServer(HttpPatch::new, segmentsPath)); assertWithin(2, TimeUnit.SECONDS, () -> assertTrue(instructionHandler.instructions.contains("release" + segmentToMove))); From 368c9473d6be9cb17e3cbfe0b0babfe9fa005c3a Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Thu, 12 Feb 2026 15:12:36 +0000 Subject: [PATCH 2/2] Don't fail on javadoc --- pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index a25a25de..7cb5917c 100644 --- a/pom.xml +++ b/pom.xml @@ -422,6 +422,8 @@ UTF-8 + false + false apiNote