Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,26 @@
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.impl.WorkflowContextData;
import io.serverlessworkflow.impl.WorkflowDefinitionData;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AsyncPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter {
public abstract class AbstractAsyncPersistenceExecutor implements PersistenceExecutor {

private static final Logger logger =
LoggerFactory.getLogger(AsyncPersistenceInstanceWriter.class);
LoggerFactory.getLogger(AbstractAsyncPersistenceExecutor.class);

private final Map<String, CompletableFuture<Void>> futuresMap = new ConcurrentHashMap<>();

@Override
protected CompletableFuture<Void> doTransaction(
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
public CompletableFuture<Void> execute(Runnable runnable, WorkflowContextData context) {
final ExecutorService service =
executorService().orElse(context.definition().application().executorService());
final Runnable runnable = () -> doTransaction(operation, context.definition());
return futuresMap.compute(
context.instanceData().id(),
(k, v) ->
Expand All @@ -49,16 +45,20 @@ protected CompletableFuture<Void> doTransaction(
}

@Override
protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
return super.removeProcessInstance(workflowContext)
.thenRun(() -> futuresMap.remove(workflowContext.instanceData().id()));
public CompletableFuture<Void> startInstance(Runnable runnable, WorkflowContextData context) {
return SyncPersistenceExecutor.execute(runnable);
}

protected abstract void doTransaction(
Consumer<PersistenceInstanceOperations> operation, WorkflowDefinitionData definition);

protected Optional<ExecutorService> executorService() {
return Optional.empty();
@Override
public CompletableFuture<Void> deleteInstance(Runnable runnable, WorkflowContextData context) {
CompletableFuture<Void> completable = futuresMap.remove(context.instanceData().id());
if (completable != null) {
CompletableFuture<Void> result = completable.whenComplete((__, ___) -> runnable.run());
completable.cancel(true);
return result;
} else {
return CompletableFuture.completedFuture(null);
}
}

@Override
Expand All @@ -75,4 +75,6 @@ public void close() {
}
futuresMap.clear();
}

protected abstract Optional<ExecutorService> executorService();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public abstract class AbstractPersistenceInstanceWriter implements PersistenceIn

@Override
public CompletableFuture<Void> started(WorkflowContextData workflowContext) {
return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext);
return doStartInstance(t -> t.writeInstanceData(workflowContext), workflowContext);
}

@Override
Expand All @@ -44,7 +44,7 @@ public CompletableFuture<Void> aborted(WorkflowContextData workflowContext) {
}

protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext);
return doCompleteInstance(t -> t.removeProcessInstance(workflowContext), workflowContext);
}

@Override
Expand Down Expand Up @@ -81,4 +81,14 @@ public void close() throws Exception {}

protected abstract CompletableFuture<Void> doTransaction(
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context);

protected CompletableFuture<Void> doCompleteInstance(
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData workflowContext) {
return doTransaction(operation, workflowContext);
}

protected CompletableFuture<Void> doStartInstance(
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData workflowContext) {
return doTransaction(operation, workflowContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence;

import java.util.Optional;
import java.util.concurrent.ExecutorService;

public class AsyncPersistenceExecutor extends AbstractAsyncPersistenceExecutor {

private final Optional<ExecutorService> service;

protected AsyncPersistenceExecutor(ExecutorService service) {
this.service = Optional.ofNullable(service);
}

@Override
protected Optional<ExecutorService> executorService() {
return service;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,33 @@

import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;

import java.util.Optional;
import java.util.concurrent.ExecutorService;

public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers {

public static class Builder {

private final PersistenceInstanceStore store;
private ExecutorService executorService;
private PersistenceExecutor executor;

private Builder(PersistenceInstanceStore store) {
this.store = store;
}

public Builder withExecutorService(ExecutorService executorService) {
this.executorService = executorService;
this.executor = new AsyncPersistenceExecutor(executorService);
return this;
}

public Builder withPersistenceExecutor(PersistenceExecutor executor) {
this.executor = executor;
return this;
}

public PersistenceInstanceHandlers build() {
return new DefaultPersistenceInstanceHandlers(
new DefaultPersistenceInstanceWriter(store, Optional.ofNullable(executorService)),
new DefaultPersistenceInstanceWriter(
store, executor == null ? new SyncPersistenceExecutor() : executor),
new DefaultPersistenceInstanceReader(store),
store);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,23 @@
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.impl.WorkflowDefinitionData;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultPersistenceInstanceWriter extends AsyncPersistenceInstanceWriter {
public class DefaultPersistenceInstanceWriter extends TransactedPersistenceInstanceWriter {

private final PersistenceInstanceStore store;
private final Optional<ExecutorService> executorService;

private final PersistenceExecutor persistenceExecutor;

private static final Logger logger =
LoggerFactory.getLogger(DefaultPersistenceInstanceWriter.class);

protected DefaultPersistenceInstanceWriter(
PersistenceInstanceStore store, Optional<ExecutorService> executorService) {
this.executorService = executorService;
PersistenceInstanceStore store, PersistenceExecutor persistenceExecutor) {
this.store = store;
this.persistenceExecutor = persistenceExecutor;
}

@Override
Expand All @@ -54,7 +53,7 @@ protected void doTransaction(
}

@Override
protected Optional<ExecutorService> executorService() {
return executorService;
protected PersistenceExecutor persistenceExecutor() {
return persistenceExecutor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.impl.WorkflowContextData;
import java.util.concurrent.CompletableFuture;

public interface PersistenceExecutor extends AutoCloseable {
CompletableFuture<Void> execute(Runnable runnable, WorkflowContextData context);

default CompletableFuture<Void> startInstance(Runnable runnable, WorkflowContextData context) {
return execute(runnable, context);
}

default CompletableFuture<Void> deleteInstance(Runnable runnable, WorkflowContextData context) {
return execute(runnable, context);
}

default void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.impl.WorkflowContextData;
import java.util.concurrent.CompletableFuture;

public class SyncPersistenceExecutor implements PersistenceExecutor {

@Override
public CompletableFuture<Void> execute(Runnable runnable, WorkflowContextData context) {
return execute(runnable);
}

public static CompletableFuture<Void> execute(Runnable runnable) {
try {
runnable.run();
return CompletableFuture.completedFuture(null);
} catch (Exception ex) {
return CompletableFuture.failedFuture(ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.impl.WorkflowContextData;
import io.serverlessworkflow.impl.WorkflowDefinitionData;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public abstract class TransactedPersistenceInstanceWriter
extends AbstractPersistenceInstanceWriter {

@Override
protected CompletableFuture<Void> doTransaction(
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
return persistenceExecutor()
.execute(() -> doTransaction(operation, context.definition()), context);
}

@Override
protected CompletableFuture<Void> doStartInstance(
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
return persistenceExecutor()
.startInstance(() -> doTransaction(operation, context.definition()), context);
}

@Override
protected CompletableFuture<Void> doCompleteInstance(
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
return persistenceExecutor()
.deleteInstance(() -> doTransaction(operation, context.definition()), context);
}

protected abstract void doTransaction(
Consumer<PersistenceInstanceOperations> operation, WorkflowDefinitionData definition);

public void close() {
persistenceExecutor().close();
}

protected abstract PersistenceExecutor persistenceExecutor();
}
Loading