Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
import io.cloudevents.CloudEventData;
import io.serverlessworkflow.api.types.func.ContextPredicate;
import io.serverlessworkflow.api.types.func.EventDataPredicate;
import io.serverlessworkflow.api.types.func.EventPropertiesPredicate;
import io.serverlessworkflow.api.types.func.FilterPredicate;
import io.serverlessworkflow.api.types.func.TypedContextPredicate;
import io.serverlessworkflow.api.types.func.TypedFilterPredicate;
import io.serverlessworkflow.api.types.func.TypedPredicate;
import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder;
import io.serverlessworkflow.impl.events.DefaultCloudEventPredicate;
import java.util.function.Predicate;

public class FuncEventFilterPropertiesBuilder
extends AbstractEventPropertiesBuilder<FuncEventFilterPropertiesBuilder> {

public FuncEventFilterPropertiesBuilder() {
super(new EventPropertiesPredicate());
}

@Override
protected FuncEventFilterPropertiesBuilder self() {
return this;
Expand All @@ -54,23 +55,17 @@ public FuncEventFilterPropertiesBuilder data(FilterPredicate<CloudEventData> pre
}

public FuncEventFilterPropertiesBuilder envelope(Predicate<CloudEvent> predicate) {
this.eventProperties.setAdditionalProperty(
DefaultCloudEventPredicate.ENVELOPE_PREDICATE,
new TypedPredicate<>(predicate, CloudEvent.class));
((EventPropertiesPredicate) eventProperties).withPredicate(predicate, CloudEvent.class);
return this;
}

public FuncEventFilterPropertiesBuilder envelope(ContextPredicate<CloudEvent> predicate) {
this.eventProperties.setAdditionalProperty(
DefaultCloudEventPredicate.ENVELOPE_PREDICATE,
new TypedContextPredicate<>(predicate, CloudEvent.class));
((EventPropertiesPredicate) eventProperties).withPredicate(predicate, CloudEvent.class);
return this;
}

public FuncEventFilterPropertiesBuilder envelope(FilterPredicate<CloudEvent> predicate) {
this.eventProperties.setAdditionalProperty(
DefaultCloudEventPredicate.ENVELOPE_PREDICATE,
new TypedFilterPredicate<>(predicate, CloudEvent.class));
((EventPropertiesPredicate) eventProperties).withPredicate(predicate, CloudEvent.class);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.api.types.func;

import io.serverlessworkflow.api.types.EventProperties;
import java.util.Objects;
import java.util.function.Predicate;

public class EventPropertiesPredicate extends EventProperties {

private Object filterPredicate;

public Object getFilterPredicate() {
return filterPredicate;
}

public <T> EventPropertiesPredicate withPredicate(Predicate<T> predicate) {
this.filterPredicate = predicate;
return this;
}

public <T> EventPropertiesPredicate withPredicate(Predicate<T> predicate, Class<T> clazz) {
Objects.requireNonNull(clazz);
this.filterPredicate = new TypedPredicate<>(predicate, clazz);
return this;
}

public <T> EventPropertiesPredicate withPredicate(ContextPredicate<T> predicate) {
this.filterPredicate = predicate;
return this;
}

public <T> EventPropertiesPredicate withPredicate(ContextPredicate<T> predicate, Class<T> clazz) {
Objects.requireNonNull(clazz);
this.filterPredicate = new TypedContextPredicate<>(predicate, clazz);
return this;
}

public <T> EventPropertiesPredicate withPredicate(FilterPredicate<T> predicate) {
this.filterPredicate = predicate;
return this;
}

public <T> EventPropertiesPredicate withPredicate(FilterPredicate<T> predicate, Class<T> clazz) {
Objects.requireNonNull(clazz);
this.filterPredicate = new TypedFilterPredicate<>(predicate, clazz);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.api.types.func;

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import io.serverlessworkflow.impl.WorkflowPredicate;
import io.serverlessworkflow.impl.events.CloudEventAttrPredicate;
import io.serverlessworkflow.impl.events.DefaultCloudEventPredicate;
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;

public class FuncCloudEventPredicate extends DefaultCloudEventPredicate {

private final CloudEventAttrPredicate<CloudEvent> envelopePredicate;

public FuncCloudEventPredicate(EventPropertiesPredicate properties, WorkflowApplication app) {
super(properties, app);
Object envelopePredObj = properties.getFilterPredicate();
this.envelopePredicate =
envelopePredObj == null
? isTrue()
: fromCloudEvent(
app.modelFactory(),
app.expressionFactory()
.buildPredicate(new ExpressionDescriptor(null, envelopePredObj)));
}

private CloudEventAttrPredicate<CloudEvent> fromCloudEvent(
WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) {
return (e, w, t) -> filter.test(w, t, workflowModelFactory.from(e));
}

@Override
public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task) {
return envelopePredicate.test(event, workflow, task) && super.test(event, workflow, task);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.api.types.func;

import io.serverlessworkflow.api.types.EventProperties;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.events.CloudEventPredicate;
import io.serverlessworkflow.impl.events.CloudEventPredicateFactory;
import io.serverlessworkflow.impl.events.DefaultCloudEventPredicate;

public class FuncCloudEventPredicateFactory implements CloudEventPredicateFactory {

@Override
public CloudEventPredicate build(WorkflowApplication appl, EventProperties props) {
return props instanceof EventPropertiesPredicate funcProps
? new FuncCloudEventPredicate(funcProps, appl)
: new DefaultCloudEventPredicate(props, appl);
}
}
Comment thread
fjtirado marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.serverlessworkflow.api.types.func.FuncCloudEventPredicateFactory
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@
public abstract class AbstractEventPropertiesBuilder<
SELF extends AbstractEventPropertiesBuilder<SELF>> {

protected final EventProperties eventProperties = new EventProperties();
protected final EventProperties eventProperties;

protected AbstractEventPropertiesBuilder() {
this.eventProperties = new EventProperties();
}

protected AbstractEventPropertiesBuilder(EventProperties eventProperties) {
this.eventProperties = eventProperties;
}

protected abstract SELF self();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.serverlessworkflow.impl.config.ConfigSecretManager;
import io.serverlessworkflow.impl.config.SecretManager;
import io.serverlessworkflow.impl.config.SystemPropertyConfigManager;
import io.serverlessworkflow.impl.events.CloudEventPredicateFactory;
import io.serverlessworkflow.impl.events.DefaultCloudEventPredicateFactory;
import io.serverlessworkflow.impl.events.EventConsumer;
import io.serverlessworkflow.impl.events.EventPublisher;
import io.serverlessworkflow.impl.events.InMemoryEvents;
Expand Down Expand Up @@ -90,6 +92,7 @@ public class WorkflowApplication implements AutoCloseable {
private final Optional<FunctionReader> functionReader;
private final URI defaultCatalogURI;
private final Collection<CallableTaskProxyBuilder> callableProxyBuilders;
private final CloudEventPredicateFactory cloudEventPredicateFactory;

private WorkflowApplication(Builder builder) {
this.taskFactory = builder.taskFactory;
Expand Down Expand Up @@ -118,6 +121,7 @@ private WorkflowApplication(Builder builder) {
this.defaultCatalogURI = builder.defaultCatalogURI;
this.id = builder.id;
this.callableProxyBuilders = builder.callableProxyBuilders;
this.cloudEventPredicateFactory = builder.cloudEventPredicateFactory;
}

public TaskExecutorFactory taskFactory() {
Expand Down Expand Up @@ -235,6 +239,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
private Optional<URITemplateResolver> templateResolver;
private Optional<FunctionReader> functionReader;
private URI defaultCatalogURI;
private CloudEventPredicateFactory cloudEventPredicateFactory;

private Builder() {
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
Expand Down Expand Up @@ -361,6 +366,12 @@ public Builder withDefaultCatalogURI(URI defaultCatalogURI) {
return this;
}

public Builder withCloudEventPredicateFactory(
CloudEventPredicateFactory cloudEventPredicateFactory) {
this.cloudEventPredicateFactory = cloudEventPredicateFactory;
return this;
}

public WorkflowApplication build() {

if (modelFactory == null) {
Expand Down Expand Up @@ -416,6 +427,11 @@ public WorkflowApplication build() {
}
templateResolver = loadFirst(URITemplateResolver.class);
functionReader = loadFirst(FunctionReader.class);
if (cloudEventPredicateFactory == null) {
cloudEventPredicateFactory =
loadFirst(CloudEventPredicateFactory.class)
.orElseGet(() -> new DefaultCloudEventPredicateFactory());
}
if (defaultCatalogURI == null) {
defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog");
}
Expand Down Expand Up @@ -522,6 +538,10 @@ public Optional<FunctionReader> functionReader() {
return functionReader;
}

public CloudEventPredicateFactory cloudEventPredicateFactory() {
return cloudEventPredicateFactory;
}

public URI defaultCatalogURI() {
return defaultCatalogURI;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public TypeEventRegistrationBuilder listen(
EventProperties properties = register.getWith();
String type = properties.getType();
return new TypeEventRegistrationBuilder(
type, new DefaultCloudEventPredicate(properties, application));
type, application.cloudEventPredicateFactory().build(application, properties));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.events;

import io.serverlessworkflow.api.types.EventProperties;
import io.serverlessworkflow.impl.ServicePriority;
import io.serverlessworkflow.impl.WorkflowApplication;

public interface CloudEventPredicateFactory extends ServicePriority {
CloudEventPredicate build(WorkflowApplication appl, EventProperties props);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@

public class DefaultCloudEventPredicate implements CloudEventPredicate {

/**
* {@link EventProperties#getAdditionalProperties()} property for custom {@link CloudEvent} {@link
* java.util.function.Predicate} evaluation.
*/
public static final String ENVELOPE_PREDICATE = "envelopePredicate";

private final CloudEventAttrPredicate<String> idFilter;
private final CloudEventAttrPredicate<URI> sourceFilter;
private final CloudEventAttrPredicate<String> subjectFilter;
Expand All @@ -50,10 +44,9 @@ public class DefaultCloudEventPredicate implements CloudEventPredicate {
private final CloudEventAttrPredicate<URI> dataSchemaFilter;
private final CloudEventAttrPredicate<OffsetDateTime> timeFilter;
private final CloudEventAttrPredicate<CloudEventData> dataFilter;
private final CloudEventAttrPredicate<CloudEvent> envelopeFilter;
private final CloudEventAttrPredicate<Map<String, Object>> additionalFilter;

private static <T> CloudEventAttrPredicate<T> isTrue() {
protected static <T> CloudEventAttrPredicate<T> isTrue() {
return (x, w, t) -> true;
}

Expand All @@ -66,25 +59,9 @@ public DefaultCloudEventPredicate(EventProperties properties, WorkflowApplicatio
dataSchemaFilter = dataSchemaFilter(properties.getDataschema(), app);
timeFilter = offsetTimeFilter(properties.getTime(), app);
dataFilter = dataFilter(properties.getData(), app);
envelopeFilter = envelopeFilter(properties, app);
additionalFilter = additionalFilter(properties.getAdditionalProperties(), app);
}
Comment thread
fjtirado marked this conversation as resolved.

private CloudEventAttrPredicate<CloudEvent> envelopeFilter(
EventProperties properties, WorkflowApplication app) {
Object envelopePredObj = null;
if (properties.getAdditionalProperties() != null) {
envelopePredObj = properties.getAdditionalProperties().remove(ENVELOPE_PREDICATE);
}

return envelopePredObj == null
? isTrue()
: fromCloudEvent(
app.modelFactory(),
app.expressionFactory()
.buildPredicate(new ExpressionDescriptor(null, envelopePredObj)));
}

private CloudEventAttrPredicate<Map<String, Object>> additionalFilter(
Map<String, Object> additionalProperties, WorkflowApplication app) {
return additionalProperties != null && !additionalProperties.isEmpty()
Expand All @@ -100,11 +77,6 @@ private CloudEventAttrPredicate<CloudEventData> fromCloudEventData(
return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d));
}

private CloudEventAttrPredicate<CloudEvent> fromCloudEvent(
WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) {
return (e, w, t) -> filter.test(w, t, workflowModelFactory.from(e));
}

private CloudEventAttrPredicate<Map<String, Object>> fromMap(
WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) {
return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d));
Expand Down Expand Up @@ -207,7 +179,6 @@ public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task
&& typeFilter.test(event.getType(), workflow, task)
&& dataSchemaFilter.test(event.getDataSchema(), workflow, task)
&& timeFilter.test(event.getTime(), workflow, task)
&& envelopeFilter.test(event, workflow, task)
&& dataFilter.test(event.getData(), workflow, task)
&& additionalFilter.test(CloudEventUtils.extensions(event), workflow, task);
}
Expand Down
Loading
Loading