diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java index 3c5703a93..5bb8e64f7 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java @@ -19,17 +19,18 @@ import io.cloudevents.CloudEventData; import io.serverlessworkflow.api.types.func.ContextPredicate; import io.serverlessworkflow.api.types.func.EventDataPredicate; +import io.serverlessworkflow.api.types.func.EventPropertiesPredicate; import io.serverlessworkflow.api.types.func.FilterPredicate; -import io.serverlessworkflow.api.types.func.TypedContextPredicate; -import io.serverlessworkflow.api.types.func.TypedFilterPredicate; -import io.serverlessworkflow.api.types.func.TypedPredicate; import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; -import io.serverlessworkflow.impl.events.DefaultCloudEventPredicate; import java.util.function.Predicate; public class FuncEventFilterPropertiesBuilder extends AbstractEventPropertiesBuilder { + public FuncEventFilterPropertiesBuilder() { + super(new EventPropertiesPredicate()); + } + @Override protected FuncEventFilterPropertiesBuilder self() { return this; @@ -54,23 +55,17 @@ public FuncEventFilterPropertiesBuilder data(FilterPredicate pre } public FuncEventFilterPropertiesBuilder envelope(Predicate predicate) { - this.eventProperties.setAdditionalProperty( - DefaultCloudEventPredicate.ENVELOPE_PREDICATE, - new TypedPredicate<>(predicate, CloudEvent.class)); + ((EventPropertiesPredicate) eventProperties).withPredicate(predicate, CloudEvent.class); return this; } public FuncEventFilterPropertiesBuilder envelope(ContextPredicate predicate) { - this.eventProperties.setAdditionalProperty( - DefaultCloudEventPredicate.ENVELOPE_PREDICATE, - new TypedContextPredicate<>(predicate, CloudEvent.class)); + ((EventPropertiesPredicate) eventProperties).withPredicate(predicate, CloudEvent.class); return this; } public FuncEventFilterPropertiesBuilder envelope(FilterPredicate predicate) { - this.eventProperties.setAdditionalProperty( - DefaultCloudEventPredicate.ENVELOPE_PREDICATE, - new TypedFilterPredicate<>(predicate, CloudEvent.class)); + ((EventPropertiesPredicate) eventProperties).withPredicate(predicate, CloudEvent.class); return this; } } diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventPropertiesPredicate.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventPropertiesPredicate.java new file mode 100644 index 000000000..4c1b774d3 --- /dev/null +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventPropertiesPredicate.java @@ -0,0 +1,62 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.api.types.func; + +import io.serverlessworkflow.api.types.EventProperties; +import java.util.Objects; +import java.util.function.Predicate; + +public class EventPropertiesPredicate extends EventProperties { + + private Object filterPredicate; + + public Object getFilterPredicate() { + return filterPredicate; + } + + public EventPropertiesPredicate withPredicate(Predicate predicate) { + this.filterPredicate = predicate; + return this; + } + + public EventPropertiesPredicate withPredicate(Predicate predicate, Class clazz) { + Objects.requireNonNull(clazz); + this.filterPredicate = new TypedPredicate<>(predicate, clazz); + return this; + } + + public EventPropertiesPredicate withPredicate(ContextPredicate predicate) { + this.filterPredicate = predicate; + return this; + } + + public EventPropertiesPredicate withPredicate(ContextPredicate predicate, Class clazz) { + Objects.requireNonNull(clazz); + this.filterPredicate = new TypedContextPredicate<>(predicate, clazz); + return this; + } + + public EventPropertiesPredicate withPredicate(FilterPredicate predicate) { + this.filterPredicate = predicate; + return this; + } + + public EventPropertiesPredicate withPredicate(FilterPredicate predicate, Class clazz) { + Objects.requireNonNull(clazz); + this.filterPredicate = new TypedFilterPredicate<>(predicate, clazz); + return this; + } +} diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/FuncCloudEventPredicate.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/FuncCloudEventPredicate.java new file mode 100644 index 000000000..995129350 --- /dev/null +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/FuncCloudEventPredicate.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.api.types.func; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.WorkflowPredicate; +import io.serverlessworkflow.impl.events.CloudEventAttrPredicate; +import io.serverlessworkflow.impl.events.DefaultCloudEventPredicate; +import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; + +public class FuncCloudEventPredicate extends DefaultCloudEventPredicate { + + private final CloudEventAttrPredicate envelopePredicate; + + public FuncCloudEventPredicate(EventPropertiesPredicate properties, WorkflowApplication app) { + super(properties, app); + Object envelopePredObj = properties.getFilterPredicate(); + this.envelopePredicate = + envelopePredObj == null + ? isTrue() + : fromCloudEvent( + app.modelFactory(), + app.expressionFactory() + .buildPredicate(new ExpressionDescriptor(null, envelopePredObj))); + } + + private CloudEventAttrPredicate fromCloudEvent( + WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { + return (e, w, t) -> filter.test(w, t, workflowModelFactory.from(e)); + } + + @Override + public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task) { + return envelopePredicate.test(event, workflow, task) && super.test(event, workflow, task); + } +} diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/FuncCloudEventPredicateFactory.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/FuncCloudEventPredicateFactory.java new file mode 100644 index 000000000..13ec465c9 --- /dev/null +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/FuncCloudEventPredicateFactory.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.api.types.func; + +import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.events.CloudEventPredicate; +import io.serverlessworkflow.impl.events.CloudEventPredicateFactory; +import io.serverlessworkflow.impl.events.DefaultCloudEventPredicate; + +public class FuncCloudEventPredicateFactory implements CloudEventPredicateFactory { + + @Override + public CloudEventPredicate build(WorkflowApplication appl, EventProperties props) { + return props instanceof EventPropertiesPredicate funcProps + ? new FuncCloudEventPredicate(funcProps, appl) + : new DefaultCloudEventPredicate(props, appl); + } +} diff --git a/experimental/types/src/main/resources/META-INF/services/io.serverlessworkflow.impl.events.CloudEventPredicateFactory b/experimental/types/src/main/resources/META-INF/services/io.serverlessworkflow.impl.events.CloudEventPredicateFactory new file mode 100644 index 000000000..4e2a7f991 --- /dev/null +++ b/experimental/types/src/main/resources/META-INF/services/io.serverlessworkflow.impl.events.CloudEventPredicateFactory @@ -0,0 +1 @@ +io.serverlessworkflow.api.types.func.FuncCloudEventPredicateFactory \ No newline at end of file diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java index dfacd8176..62f50e481 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java @@ -27,7 +27,15 @@ public abstract class AbstractEventPropertiesBuilder< SELF extends AbstractEventPropertiesBuilder> { - protected final EventProperties eventProperties = new EventProperties(); + protected final EventProperties eventProperties; + + protected AbstractEventPropertiesBuilder() { + this.eventProperties = new EventProperties(); + } + + protected AbstractEventPropertiesBuilder(EventProperties eventProperties) { + this.eventProperties = eventProperties; + } protected abstract SELF self(); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 703e0397f..08b22cdbf 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -26,6 +26,8 @@ import io.serverlessworkflow.impl.config.ConfigSecretManager; import io.serverlessworkflow.impl.config.SecretManager; import io.serverlessworkflow.impl.config.SystemPropertyConfigManager; +import io.serverlessworkflow.impl.events.CloudEventPredicateFactory; +import io.serverlessworkflow.impl.events.DefaultCloudEventPredicateFactory; import io.serverlessworkflow.impl.events.EventConsumer; import io.serverlessworkflow.impl.events.EventPublisher; import io.serverlessworkflow.impl.events.InMemoryEvents; @@ -90,6 +92,7 @@ public class WorkflowApplication implements AutoCloseable { private final Optional functionReader; private final URI defaultCatalogURI; private final Collection callableProxyBuilders; + private final CloudEventPredicateFactory cloudEventPredicateFactory; private WorkflowApplication(Builder builder) { this.taskFactory = builder.taskFactory; @@ -118,6 +121,7 @@ private WorkflowApplication(Builder builder) { this.defaultCatalogURI = builder.defaultCatalogURI; this.id = builder.id; this.callableProxyBuilders = builder.callableProxyBuilders; + this.cloudEventPredicateFactory = builder.cloudEventPredicateFactory; } public TaskExecutorFactory taskFactory() { @@ -235,6 +239,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private Optional templateResolver; private Optional functionReader; private URI defaultCatalogURI; + private CloudEventPredicateFactory cloudEventPredicateFactory; private Builder() { ServiceLoader.load(NamedWorkflowAdditionalObject.class) @@ -361,6 +366,12 @@ public Builder withDefaultCatalogURI(URI defaultCatalogURI) { return this; } + public Builder withCloudEventPredicateFactory( + CloudEventPredicateFactory cloudEventPredicateFactory) { + this.cloudEventPredicateFactory = cloudEventPredicateFactory; + return this; + } + public WorkflowApplication build() { if (modelFactory == null) { @@ -416,6 +427,11 @@ public WorkflowApplication build() { } templateResolver = loadFirst(URITemplateResolver.class); functionReader = loadFirst(FunctionReader.class); + if (cloudEventPredicateFactory == null) { + cloudEventPredicateFactory = + loadFirst(CloudEventPredicateFactory.class) + .orElseGet(() -> new DefaultCloudEventPredicateFactory()); + } if (defaultCatalogURI == null) { defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog"); } @@ -522,6 +538,10 @@ public Optional functionReader() { return functionReader; } + public CloudEventPredicateFactory cloudEventPredicateFactory() { + return cloudEventPredicateFactory; + } + public URI defaultCatalogURI() { return defaultCatalogURI; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java index f589055df..d7c6ac1eb 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java @@ -53,7 +53,7 @@ public TypeEventRegistrationBuilder listen( EventProperties properties = register.getWith(); String type = properties.getType(); return new TypeEventRegistrationBuilder( - type, new DefaultCloudEventPredicate(properties, application)); + type, application.cloudEventPredicateFactory().build(application, properties)); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicateFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicateFactory.java new file mode 100644 index 000000000..d1e8a2a53 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicateFactory.java @@ -0,0 +1,24 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.impl.ServicePriority; +import io.serverlessworkflow.impl.WorkflowApplication; + +public interface CloudEventPredicateFactory extends ServicePriority { + CloudEventPredicate build(WorkflowApplication appl, EventProperties props); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java index 3d5863035..94adfff45 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java @@ -36,12 +36,6 @@ public class DefaultCloudEventPredicate implements CloudEventPredicate { - /** - * {@link EventProperties#getAdditionalProperties()} property for custom {@link CloudEvent} {@link - * java.util.function.Predicate} evaluation. - */ - public static final String ENVELOPE_PREDICATE = "envelopePredicate"; - private final CloudEventAttrPredicate idFilter; private final CloudEventAttrPredicate sourceFilter; private final CloudEventAttrPredicate subjectFilter; @@ -50,10 +44,9 @@ public class DefaultCloudEventPredicate implements CloudEventPredicate { private final CloudEventAttrPredicate dataSchemaFilter; private final CloudEventAttrPredicate timeFilter; private final CloudEventAttrPredicate dataFilter; - private final CloudEventAttrPredicate envelopeFilter; private final CloudEventAttrPredicate> additionalFilter; - private static CloudEventAttrPredicate isTrue() { + protected static CloudEventAttrPredicate isTrue() { return (x, w, t) -> true; } @@ -66,25 +59,9 @@ public DefaultCloudEventPredicate(EventProperties properties, WorkflowApplicatio dataSchemaFilter = dataSchemaFilter(properties.getDataschema(), app); timeFilter = offsetTimeFilter(properties.getTime(), app); dataFilter = dataFilter(properties.getData(), app); - envelopeFilter = envelopeFilter(properties, app); additionalFilter = additionalFilter(properties.getAdditionalProperties(), app); } - private CloudEventAttrPredicate envelopeFilter( - EventProperties properties, WorkflowApplication app) { - Object envelopePredObj = null; - if (properties.getAdditionalProperties() != null) { - envelopePredObj = properties.getAdditionalProperties().remove(ENVELOPE_PREDICATE); - } - - return envelopePredObj == null - ? isTrue() - : fromCloudEvent( - app.modelFactory(), - app.expressionFactory() - .buildPredicate(new ExpressionDescriptor(null, envelopePredObj))); - } - private CloudEventAttrPredicate> additionalFilter( Map additionalProperties, WorkflowApplication app) { return additionalProperties != null && !additionalProperties.isEmpty() @@ -100,11 +77,6 @@ private CloudEventAttrPredicate fromCloudEventData( return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d)); } - private CloudEventAttrPredicate fromCloudEvent( - WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { - return (e, w, t) -> filter.test(w, t, workflowModelFactory.from(e)); - } - private CloudEventAttrPredicate> fromMap( WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d)); @@ -207,7 +179,6 @@ public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task && typeFilter.test(event.getType(), workflow, task) && dataSchemaFilter.test(event.getDataSchema(), workflow, task) && timeFilter.test(event.getTime(), workflow, task) - && envelopeFilter.test(event, workflow, task) && dataFilter.test(event.getData(), workflow, task) && additionalFilter.test(CloudEventUtils.extensions(event), workflow, task); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicateFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicateFactory.java new file mode 100644 index 000000000..82a9ceedb --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicateFactory.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.impl.WorkflowApplication; + +public class DefaultCloudEventPredicateFactory implements CloudEventPredicateFactory { + + @Override + public CloudEventPredicate build(WorkflowApplication application, EventProperties props) { + return new DefaultCloudEventPredicate(props, application); + } +}