diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java
index 80bfba154..051cd02bf 100644
--- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java
+++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java
@@ -37,8 +37,11 @@ public SELF with(Consumer
c) {
}
public SELF correlate(String key, Consumer c) {
- throw new UnsupportedOperationException(
- "correlate is not supported in the engine level: https://github.com/serverlessworkflow/sdk-java/issues/1206");
+ ListenTaskBuilder.CorrelatePropertyBuilder cb =
+ new ListenTaskBuilder.CorrelatePropertyBuilder();
+ c.accept(cb);
+ correlate.setAdditionalProperty(key, cb.build());
+ return self();
}
public EventFilter build() {
diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java
index 29799c801..61ee5b814 100644
--- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java
+++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java
@@ -17,6 +17,7 @@
import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder;
import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder;
+import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
@@ -41,13 +42,11 @@ protected List> getFilterSteps() {
return filterSteps;
}
- // TODO: "correlate is not supported in the engine level:
- // https://github.com/serverlessworkflow/sdk-java/issues/1206". Keeping the code for a future
- // reference.
- // public SELF correlate(String key, Consumer c) {
- // filterSteps.add(f -> f.correlate(key, c));
- // return self();
- // }
+ public SELF correlate(
+ String key, Consumer c) {
+ addFilterStep(f -> f.correlate(key, c));
+ return self();
+ }
@Override
public void accept(EVENT_FILTER filterBuilder) {
diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java
index 32243f28d..c350392da 100644
--- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java
+++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java
@@ -37,6 +37,7 @@
import io.serverlessworkflow.api.types.AuthenticationPolicyUnion;
import io.serverlessworkflow.api.types.CallHTTP;
import io.serverlessworkflow.api.types.CatchErrors;
+import io.serverlessworkflow.api.types.CorrelateProperty;
import io.serverlessworkflow.api.types.Document;
import io.serverlessworkflow.api.types.EmitEventDefinition;
import io.serverlessworkflow.api.types.EmitTask;
@@ -310,8 +311,12 @@ void testDoTaskListenOne() {
to ->
to.one(
f ->
- f.with(
- p -> p.type("com.fake.pet").source("mySource"))))))
+ f.with(p -> p.type("com.fake.pet").source("mySource"))
+ .correlate(
+ "orderId",
+ c ->
+ c.from("$.data.orderId")
+ .expect("$.input.orderId"))))))
.build();
List items = wf.getDo();
@@ -327,6 +332,10 @@ void testDoTaskListenOne() {
EventFilter filter = one.getOne();
assertNotNull(filter, "EventFilter should be present");
assertEquals("com.fake.pet", filter.getWith().getType(), "Filter type should match");
+ CorrelateProperty correlate = filter.getCorrelate().getAdditionalProperties().get("orderId");
+ assertNotNull(correlate, "Correlate property should be present");
+ assertEquals("$.data.orderId", correlate.getFrom(), "Correlate from should match");
+ assertEquals("$.input.orderId", correlate.getExpect(), "Correlate expect should match");
}
@Test
diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java
index eba88fe7f..b78ce6da6 100644
--- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java
+++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java
@@ -30,6 +30,7 @@
import static io.serverlessworkflow.fluent.spec.dsl.DSL.workflow;
import static org.assertj.core.api.Assertions.assertThat;
+import io.serverlessworkflow.api.types.CorrelateProperty;
import io.serverlessworkflow.api.types.HTTPArguments;
import io.serverlessworkflow.api.types.ListenTaskConfiguration;
import io.serverlessworkflow.api.types.RunTaskConfiguration;
@@ -166,7 +167,15 @@ public void when_listen_any_with_until() {
public void when_listen_one() {
Workflow wf =
WorkflowBuilder.workflow("f", "ns", "1")
- .tasks(t -> t.listen(to().one(event().type("only-once"))))
+ .tasks(
+ t ->
+ t.listen(
+ to().one(
+ event()
+ .type("only-once")
+ .correlate(
+ "workflowInstanceId",
+ c -> c.from("$.metadata.instanceId")))))
.build();
var to = wf.getDo().get(0).getTask().getListenTask().getListen().getTo();
@@ -178,6 +187,10 @@ public void when_listen_one() {
var one = to.getOneEventConsumptionStrategy().getOne();
assertThat(one.getWith()).isNotNull();
assertThat(one.getWith().getType()).isEqualTo("only-once");
+ CorrelateProperty correlate =
+ one.getCorrelate().getAdditionalProperties().get("workflowInstanceId");
+ assertThat(correlate).isNotNull();
+ assertThat(correlate.getFrom()).isEqualTo("$.metadata.instanceId");
}
@Test
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/CompositeExpressionFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/CompositeExpressionFactory.java
index c8ea51e15..a1e39e42f 100644
--- a/impl/core/src/main/java/io/serverlessworkflow/impl/CompositeExpressionFactory.java
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/CompositeExpressionFactory.java
@@ -49,6 +49,11 @@ public WorkflowValueResolver resolveString(ExpressionDescriptor desc) {
return processFactories(desc, f -> f.resolveString(desc));
}
+ @Override
+ public WorkflowValueResolver