diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java similarity index 68% rename from impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceInstanceWriter.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java index 3226eb8b..f94393d2 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java @@ -16,30 +16,26 @@ package io.serverlessworkflow.impl.persistence; import io.serverlessworkflow.impl.WorkflowContextData; -import io.serverlessworkflow.impl.WorkflowDefinitionData; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AsyncPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter { +public abstract class AbstractAsyncPersistenceExecutor implements PersistenceExecutor { private static final Logger logger = - LoggerFactory.getLogger(AsyncPersistenceInstanceWriter.class); + LoggerFactory.getLogger(AbstractAsyncPersistenceExecutor.class); private final Map> futuresMap = new ConcurrentHashMap<>(); @Override - protected CompletableFuture doTransaction( - Consumer operation, WorkflowContextData context) { + public CompletableFuture execute(Runnable runnable, WorkflowContextData context) { final ExecutorService service = executorService().orElse(context.definition().application().executorService()); - final Runnable runnable = () -> doTransaction(operation, context.definition()); return futuresMap.compute( context.instanceData().id(), (k, v) -> @@ -49,16 +45,20 @@ protected CompletableFuture doTransaction( } @Override - protected CompletableFuture removeProcessInstance(WorkflowContextData workflowContext) { - return super.removeProcessInstance(workflowContext) - .thenRun(() -> futuresMap.remove(workflowContext.instanceData().id())); + public CompletableFuture startInstance(Runnable runnable, WorkflowContextData context) { + return SyncPersistenceExecutor.execute(runnable); } - protected abstract void doTransaction( - Consumer operation, WorkflowDefinitionData definition); - - protected Optional executorService() { - return Optional.empty(); + @Override + public CompletableFuture deleteInstance(Runnable runnable, WorkflowContextData context) { + CompletableFuture completable = futuresMap.remove(context.instanceData().id()); + if (completable != null) { + CompletableFuture result = completable.whenComplete((__, ___) -> runnable.run()); + completable.cancel(true); + return result; + } else { + return CompletableFuture.completedFuture(null); + } } @Override @@ -75,4 +75,6 @@ public void close() { } futuresMap.clear(); } + + protected abstract Optional executorService(); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java index aa8cd029..17b76034 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java @@ -25,7 +25,7 @@ public abstract class AbstractPersistenceInstanceWriter implements PersistenceIn @Override public CompletableFuture started(WorkflowContextData workflowContext) { - return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext); + return doStartInstance(t -> t.writeInstanceData(workflowContext), workflowContext); } @Override @@ -44,7 +44,7 @@ public CompletableFuture aborted(WorkflowContextData workflowContext) { } protected CompletableFuture removeProcessInstance(WorkflowContextData workflowContext) { - return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext); + return doCompleteInstance(t -> t.removeProcessInstance(workflowContext), workflowContext); } @Override @@ -81,4 +81,14 @@ public void close() throws Exception {} protected abstract CompletableFuture doTransaction( Consumer operation, WorkflowContextData context); + + protected CompletableFuture doCompleteInstance( + Consumer operation, WorkflowContextData workflowContext) { + return doTransaction(operation, workflowContext); + } + + protected CompletableFuture doStartInstance( + Consumer operation, WorkflowContextData workflowContext) { + return doTransaction(operation, workflowContext); + } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceExecutor.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceExecutor.java new file mode 100644 index 00000000..3cb79403 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceExecutor.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +public class AsyncPersistenceExecutor extends AbstractAsyncPersistenceExecutor { + + private final Optional service; + + protected AsyncPersistenceExecutor(ExecutorService service) { + this.service = Optional.ofNullable(service); + } + + @Override + protected Optional executorService() { + return service; + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java index 4e426bee..95311104 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java @@ -17,7 +17,6 @@ import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; -import java.util.Optional; import java.util.concurrent.ExecutorService; public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers { @@ -25,20 +24,26 @@ public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandl public static class Builder { private final PersistenceInstanceStore store; - private ExecutorService executorService; + private PersistenceExecutor executor; private Builder(PersistenceInstanceStore store) { this.store = store; } public Builder withExecutorService(ExecutorService executorService) { - this.executorService = executorService; + this.executor = new AsyncPersistenceExecutor(executorService); + return this; + } + + public Builder withPersistenceExecutor(PersistenceExecutor executor) { + this.executor = executor; return this; } public PersistenceInstanceHandlers build() { return new DefaultPersistenceInstanceHandlers( - new DefaultPersistenceInstanceWriter(store, Optional.ofNullable(executorService)), + new DefaultPersistenceInstanceWriter( + store, executor == null ? new SyncPersistenceExecutor() : executor), new DefaultPersistenceInstanceReader(store), store); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java index 8765b79b..a3a1953f 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java @@ -16,24 +16,23 @@ package io.serverlessworkflow.impl.persistence; import io.serverlessworkflow.impl.WorkflowDefinitionData; -import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DefaultPersistenceInstanceWriter extends AsyncPersistenceInstanceWriter { +public class DefaultPersistenceInstanceWriter extends TransactedPersistenceInstanceWriter { private final PersistenceInstanceStore store; - private final Optional executorService; + + private final PersistenceExecutor persistenceExecutor; private static final Logger logger = LoggerFactory.getLogger(DefaultPersistenceInstanceWriter.class); protected DefaultPersistenceInstanceWriter( - PersistenceInstanceStore store, Optional executorService) { - this.executorService = executorService; + PersistenceInstanceStore store, PersistenceExecutor persistenceExecutor) { this.store = store; + this.persistenceExecutor = persistenceExecutor; } @Override @@ -54,7 +53,7 @@ protected void doTransaction( } @Override - protected Optional executorService() { - return executorService; + protected PersistenceExecutor persistenceExecutor() { + return persistenceExecutor; } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java new file mode 100644 index 00000000..4520ed5b --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowContextData; +import java.util.concurrent.CompletableFuture; + +public interface PersistenceExecutor extends AutoCloseable { + CompletableFuture execute(Runnable runnable, WorkflowContextData context); + + default CompletableFuture startInstance(Runnable runnable, WorkflowContextData context) { + return execute(runnable, context); + } + + default CompletableFuture deleteInstance(Runnable runnable, WorkflowContextData context) { + return execute(runnable, context); + } + + default void close() {} +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/SyncPersistenceExecutor.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/SyncPersistenceExecutor.java new file mode 100644 index 00000000..701d6005 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/SyncPersistenceExecutor.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowContextData; +import java.util.concurrent.CompletableFuture; + +public class SyncPersistenceExecutor implements PersistenceExecutor { + + @Override + public CompletableFuture execute(Runnable runnable, WorkflowContextData context) { + return execute(runnable); + } + + public static CompletableFuture execute(Runnable runnable) { + try { + runnable.run(); + return CompletableFuture.completedFuture(null); + } catch (Exception ex) { + return CompletableFuture.failedFuture(ex); + } + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java new file mode 100644 index 00000000..0e95cd56 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java @@ -0,0 +1,55 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinitionData; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +public abstract class TransactedPersistenceInstanceWriter + extends AbstractPersistenceInstanceWriter { + + @Override + protected CompletableFuture doTransaction( + Consumer operation, WorkflowContextData context) { + return persistenceExecutor() + .execute(() -> doTransaction(operation, context.definition()), context); + } + + @Override + protected CompletableFuture doStartInstance( + Consumer operation, WorkflowContextData context) { + return persistenceExecutor() + .startInstance(() -> doTransaction(operation, context.definition()), context); + } + + @Override + protected CompletableFuture doCompleteInstance( + Consumer operation, WorkflowContextData context) { + return persistenceExecutor() + .deleteInstance(() -> doTransaction(operation, context.definition()), context); + } + + protected abstract void doTransaction( + Consumer operation, WorkflowDefinitionData definition); + + public void close() { + persistenceExecutor().close(); + } + + protected abstract PersistenceExecutor persistenceExecutor(); +}