apiNote
diff --git a/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java b/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java
index 81a3d329..ab77d3b2 100644
--- a/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java
+++ b/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java
@@ -90,13 +90,14 @@ public class AxonServerConnectionFactory {
private volatile boolean shutdown;
/**
- * Instantiates an {@link AxonServerConnectionFactory} with the given {@code builder}.
+ * Instantiates an {@link AxonServerConnectionFactory} with the given {@code builder}. The clientInstanceId is
+ * postfixed by a random hex string to ensure uniqueness between different instances of the application.
*
* @param builder the {@link Builder} used to set all the specifics of an {@link AxonServerConnectionFactory}
*/
protected AxonServerConnectionFactory(Builder builder) {
this.componentName = builder.componentName;
- this.clientInstanceId = builder.clientInstanceId;
+ this.clientInstanceId = builder.clientInstanceId + "-" + randomHex(8);
this.token = builder.token;
this.tags.putAll(builder.tags);
this.executorService = builder.executorService;
@@ -121,7 +122,6 @@ protected AxonServerConnectionFactory(Builder builder) {
* information and should be the same only for instances of the same application or component.
*
* @param componentName The name of the component connecting to AxonServer
- *
* @return a builder instance for further configuration of the connector
* @see #forClient(String, String)
*/
@@ -131,8 +131,8 @@ public static Builder forClient(String componentName) {
/**
- * Returns a builder to configure a ConnectionFactory instance for the given {@code componentName} and {@code
- * clientInstanceId}.
+ * Returns a builder to configure a ConnectionFactory instance for the given {@code componentName} and
+ * {@code clientInstanceId}.
*
* The clientInstanceId MUST be a unique value across all instances that connect to AxonServer. The componentName is
* used in monitoring information and should be the same only for instances of the same application or component.
@@ -142,7 +142,6 @@ public static Builder forClient(String componentName) {
*
* @param componentName The name of the component connecting to AxonServer
* @param clientInstanceId The unique instance identifier for this instance of the component
- *
* @return a builder instance for further configuration of the connector
* @see #forClient(String)
*/
@@ -154,7 +153,6 @@ public static Builder forClient(String componentName, String clientInstanceId) {
* Connects to the given {@code context} using the settings defined in this ConnectionFactory.
*
* @param context The name of the context to connect to
- *
* @return a Connection allowing interaction with the mentioned context
*/
public AxonServerConnection connect(String context) {
@@ -206,7 +204,8 @@ private ManagedChannel createChannel(ServerAddress address, String context) {
return builder.intercept(
new GrpcBufferingInterceptor(50),
new HeaderAttachingInterceptor<>(Headers.CONTEXT, context),
- new HeaderAttachingInterceptor<>(Headers.ACCESS_TOKEN, token)
+ new HeaderAttachingInterceptor<>(Headers.ACCESS_TOKEN, token),
+ new HeaderAttachingInterceptor<>(Headers.CLIENT_ID, clientInstanceId)
).build();
}
@@ -234,12 +233,24 @@ public void shutdown() {
}
}
+ /**
+ * Returns the unique client instance identifier used by this ConnectionFactory. This is the value configured as
+ * {@code clientInstanceId} on the Builder, postfixed with a random hex string to ensure uniqueness between
+ * different instances of the application.
+ *
+ * @return the unique client instance identifier used by this ConnectionFactory
+ */
+ public String getClientInstanceId() {
+ return clientInstanceId;
+ }
+
/**
* Builder for AxonServerConnectionFactory instances. The methods on this class allow for configuration of the
* {@link AxonServerConnectionFactory} instance used to connect to an AxonServer (cluster).
*
- * This class is not intended to be instantiated directly, but rather through {@link
- * AxonServerConnectionFactory#forClient(String)} or {@link AxonServerConnectionFactory#forClient(String, String)}.
+ * This class is not intended to be instantiated directly, but rather through
+ * {@link AxonServerConnectionFactory#forClient(String)} or
+ * {@link AxonServerConnectionFactory#forClient(String, String)}.
*/
public static class Builder {
@@ -281,7 +292,6 @@ protected Builder(String componentName, String clientInstanceId) {
* Defaults to "localhost:8024".
*
* @param serverAddresses The addresses to try to set up the initial connection with.
- *
* @return this builder for further configuration
*/
public Builder routingServers(ServerAddress... serverAddresses) {
@@ -297,7 +307,6 @@ public Builder routingServers(ServerAddress... serverAddresses) {
*
* @param interval The amount of time to wait in between connection attempts
* @param timeUnit The unit in which the interval is expressed
- *
* @return this builder for further configuration
*/
public Builder reconnectInterval(long interval, TimeUnit timeUnit) {
@@ -312,7 +321,6 @@ public Builder reconnectInterval(long interval, TimeUnit timeUnit) {
*
* @param timeout The amount of time to wait for a connection to be established
* @param timeUnit The unit in which the timout is expressed
- *
* @return this builder for further configuration
*/
public Builder connectTimeout(long timeout, TimeUnit timeUnit) {
@@ -330,7 +338,6 @@ public Builder connectTimeout(long timeout, TimeUnit timeUnit) {
* By default, no tags are defined.
*
* @param additionalClientTags additional tags that define this client component
- *
* @return this builder for further configuration
*/
public Builder clientTags(Map additionalClientTags) {
@@ -349,7 +356,6 @@ public Builder clientTags(Map additionalClientTags) {
*
* @param key the key of the Tag to configure
* @param value the value of the Tag to configure
- *
* @return this builder for further configuration
*/
public Builder clientTag(String key, String value) {
@@ -364,7 +370,6 @@ public Builder clientTag(String key, String value) {
* AxonServer.
*
* @param token The token to which the required authorizations have been assigned.
- *
* @return this builder for further configuration
*/
public Builder token(String token) {
@@ -391,7 +396,6 @@ public Builder useTransportSecurity() {
* Defaults to not using TLS.
*
* @param sslContext The context defining TLS parameters
- *
* @return this builder for further configuration
* @see SslContextBuilder#forClient()
*/
@@ -403,14 +407,13 @@ public Builder useTransportSecurity(SslContext sslContext) {
/**
* Indicates whether the connector should always reconnect via the Routing Servers. When {@code true} (default),
* the connector will contact the Routing Servers for a new destination each time a connection is dropped. When
- * {@code false}, the connector will first attempt to re-establish a connection to the node is was
- * previously connected to. When that fails, only then will it contact the Routing Servers.
+ * {@code false}, the connector will first attempt to re-establish a connection to the node is was previously
+ * connected to. When that fails, only then will it contact the Routing Servers.
*
* Default to {@code true}, forcing the failed connection to be abandoned and a new one to be requested via the
* routing servers.
*
* @param forceReconnectViaRoutingServers whether to force a reconnect to the Cluster via the RoutingServers.
- *
* @return this builder for further configuration
*/
public Builder forceReconnectViaRoutingServers(boolean forceReconnectViaRoutingServers) {
@@ -426,7 +429,6 @@ public Builder forceReconnectViaRoutingServers(boolean forceReconnectViaRoutingS
* Defaults to 2.
*
* @param poolSize The number of threads to assign to Connection related activities.
- *
* @return this builder for further configuration
*/
public Builder threadPoolSize(int poolSize) {
@@ -449,7 +451,6 @@ public Builder threadPoolSize(int poolSize) {
* @param interval time without read activity before sending a keepalive ping
* @param timeout the time waiting for read activity after sending a keepalive ping
* @param timeUnit the unit in which the interval and timeout are expressed
- *
* @return this builder for further configuration
*/
public Builder usingKeepAlive(long interval, long timeout, TimeUnit timeUnit, boolean keepAliveWithoutCalls) {
@@ -466,7 +467,6 @@ public Builder usingKeepAlive(long interval, long timeout, TimeUnit timeUnit, bo
* Default to 4 MiB.
*
* @param bytes The number of bytes to limit inbound message to
- *
* @return this builder for further configuration
*/
public Builder maxInboundMessageSize(int bytes) {
@@ -482,7 +482,6 @@ public Builder maxInboundMessageSize(int bytes) {
* feature.
*
* @param customization A function defining the customization to make on the ManagedChannelBuilder
- *
* @return this builder for further configuration
*/
public Builder customize(UnaryOperator> customization) {
@@ -496,7 +495,6 @@ public Builder customize(UnaryOperator> customization)
*
* @param interval The interval in which to send status updates
* @param unit The unit of time in which the interval is expressed
- *
* @return this builder for further configuration
*/
public Builder processorInfoUpdateFrequency(long interval, TimeUnit unit) {
@@ -511,7 +509,6 @@ public Builder processorInfoUpdateFrequency(long interval, TimeUnit unit) {
* Values lower than 16 will be replaced with 16.
*
* @param permits The number of initial permits
- *
* @return this builder for further configuration
*/
public Builder queryPermits(int permits) {
@@ -526,7 +523,6 @@ public Builder queryPermits(int permits) {
* Values lower than 16 will be replaced with 16.
*
* @param permits The number of initial permits
- *
* @return this builder for further configuration
*/
public Builder commandPermits(int permits) {
diff --git a/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java b/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java
index eb5a36e1..26407014 100644
--- a/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java
+++ b/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java
@@ -193,7 +193,7 @@ public synchronized void connect() {
responseObserver.getInstructionsForPlatform();
try {
- logger.info("Connected instruction stream for context '{}'. Sending client identification", context);
+ logger.info("Connected instruction stream for context '{}'. Sending client identification with clientId {}", context, clientIdentification.getClientId());
instructionsForPlatform.onNext(PlatformInboundInstruction.newBuilder()
.setRegister(clientIdentification)
.build());
diff --git a/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java b/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java
index 9d79e045..64598a24 100644
--- a/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java
+++ b/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java
@@ -35,6 +35,12 @@ public abstract class Headers {
public static final Metadata.Key ACCESS_TOKEN =
Metadata.Key.of("AxonIQ-Access-Token", Metadata.ASCII_STRING_MARSHALLER);
+ /**
+ * A {@link Metadata.Key} defining the client ID of the application sending this message.
+ */
+ public static final Metadata.Key CLIENT_ID =
+ Metadata.Key.of("AxonIQ-ClientId", Metadata.ASCII_STRING_MARSHALLER);
+
private Headers() {
// Utility class
}
diff --git a/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java b/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java
index 20dba825..dca6bd08 100644
--- a/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java
+++ b/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java
@@ -284,7 +284,7 @@ void moveSegmentInstructionIsPickedUpByHandler() throws Exception {
String segmentToMove = "0";
String segmentsPath = "/v1/components/" + getClass().getSimpleName() + "/processors/testProcessor/segments/" +
- segmentToMove + "/move?tokenStoreIdentifier=TokenStoreId&context=default&target=foo";
+ segmentToMove + "/move?tokenStoreIdentifier=TokenStoreId&context=default&target=" + clientToMoveTo.getClientInstanceId();
assertWithin(2, TimeUnit.SECONDS, () -> sendToAxonServer(HttpPatch::new, segmentsPath));
assertWithin(2, TimeUnit.SECONDS,
() -> assertTrue(instructionHandler.instructions.contains("release" + segmentToMove)));