From 85b1de82357a04182276407b0ed7058d33e8d8f8 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 19 Mar 2026 23:57:13 -0300 Subject: [PATCH] Enable workflow restoration for WAITING status Workflows in WAITING status now properly restore after restart by re-registering event listeners. Modified WorkflowPersistenceInstance to skip internalSuspend() for WAITING status, allowing listen tasks to re-execute and register handlers. Signed-off-by: Matheus Cruz --- .../persistence/AbstractPersistenceInstanceWriter.java | 6 ++++++ .../impl/persistence/PersistenceInstanceWriter.java | 6 ++++++ .../impl/persistence/WorkflowPersistenceInstance.java | 2 ++ .../impl/persistence/WorkflowPersistenceListener.java | 10 ++++++++++ 4 files changed, 24 insertions(+) diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java index aa8cd0298..f1b21b075 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java @@ -76,6 +76,12 @@ public CompletableFuture resumed(WorkflowContextData workflowContext) { return doTransaction(t -> t.clearStatus(workflowContext), workflowContext); } + @Override + public CompletableFuture statusChanged( + WorkflowContextData workflowContext, WorkflowStatus status) { + return doTransaction(t -> t.writeStatus(workflowContext, status), workflowContext); + } + @Override public void close() throws Exception {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java index 31ce235a5..56d067223 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java @@ -17,6 +17,7 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowStatus; import java.util.concurrent.CompletableFuture; public interface PersistenceInstanceWriter extends AutoCloseable { @@ -33,6 +34,11 @@ public interface PersistenceInstanceWriter extends AutoCloseable { CompletableFuture resumed(WorkflowContextData workflowContext); + default CompletableFuture statusChanged( + WorkflowContextData workflowContext, WorkflowStatus status) { + return CompletableFuture.completedFuture(null); + } + CompletableFuture taskRetried( WorkflowContextData workflowContext, TaskContextData taskContext); diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index a245c113d..629f18a1a 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -40,6 +40,8 @@ public CompletableFuture start() { () -> { if (info.status() == WorkflowStatus.SUSPENDED) { internalSuspend(); + } else if (info.status() == WorkflowStatus.WAITING) { + status(WorkflowStatus.WAITING); } }); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java index 781b8c12a..abfac34c8 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.impl.persistence; +import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; @@ -24,6 +25,7 @@ import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; public class WorkflowPersistenceListener implements WorkflowExecutionListener { @@ -78,4 +80,12 @@ public void onTaskCompleted(TaskCompletedEvent ev) { public void onTaskRetried(TaskRetriedEvent ev) { persistenceWriter.taskRetried(ev.workflowContext(), ev.taskContext()); } + + @Override + public void onWorkflowStatusChanged(WorkflowStatusEvent ev) { + if (ev.status() == WorkflowStatus.WAITING) { + // Only persist WAITING for now + persistenceWriter.statusChanged(ev.workflowContext(), ev.status()); + } + } }