diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java index c5b26e0dc..540e20ff8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java @@ -61,8 +61,16 @@ public static TaskExecutor createExecutorList( WorkflowMutablePosition position, List taskItems, WorkflowDefinition workflowDefinition) { + return createExecutorList(position, taskItems, workflowDefinition, "do"); + } + + public static TaskExecutor createExecutorList( + WorkflowMutablePosition position, + List taskItems, + WorkflowDefinition workflowDefinition, + String positionPrefix) { Map> executors = - createExecutorBuilderList(position, taskItems, workflowDefinition, "do"); + createExecutorBuilderList(position, taskItems, workflowDefinition, positionPrefix); executors.values().forEach(t -> t.connect(executors)); Iterator> iter = executors.values().iterator(); TaskExecutor first = iter.next().build(); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java index 35807e690..00e765d5e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java @@ -39,6 +39,7 @@ import io.serverlessworkflow.impl.executors.retry.RetryExecutor; import io.serverlessworkflow.impl.executors.retry.RetryIntervalFunction; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -67,28 +68,23 @@ public static class TryExecutorBuilder extends RegularTaskExecutorBuilder catchTaskDo = catchInfo.getDo(); + this.catchTaskExecutor = + catchTaskDo != null && !catchTaskDo.isEmpty() + ? Optional.of( + TaskExecutorHelper.createExecutorList( + position.copy().addProperty("catch"), catchTaskDo, definition)) + : Optional.empty(); + Retry retry = catchInfo.getRetry(); + this.retryIntervalExecutor = retry != null ? buildRetryInterval(retry) : Optional.empty(); this.taskExecutor = - TaskExecutorHelper.createExecutorList(position, task.getTry(), definition); - TryTaskCatch catchTask = task.getCatch(); - if (catchTask != null) { - this.errorVariable = catchTask.getAs(); - List catchTaskDo = catchTask.getDo(); - this.catchTaskExecutor = - catchTaskDo != null && !catchTaskDo.isEmpty() - ? Optional.of( - TaskExecutorHelper.createExecutorList(position, catchTaskDo, definition)) - : Optional.empty(); - - Retry retry = catchTask.getRetry(); - this.retryIntervalExecutor = retry != null ? buildRetryInterval(retry) : Optional.empty(); - } else { - this.catchTaskExecutor = Optional.empty(); - this.retryIntervalExecutor = Optional.empty(); - } + TaskExecutorHelper.createExecutorList(position, task.getTry(), definition, "try"); } private Optional buildRetryInterval(Retry retry) { diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java index d298e32c3..909e4af61 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java @@ -20,19 +20,18 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.fasterxml.jackson.databind.JsonNode; -import io.serverlessworkflow.api.types.TryTask; +import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.jackson.JsonUtils; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskEvent; import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; -import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; import java.io.IOException; import java.time.Duration; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import okhttp3.mockwebserver.MockResponse; @@ -55,11 +54,7 @@ void setUp() throws IOException { apiServer = new MockWebServer(); apiServer.start(9797); retryListener = new RetryListener(); - app = - WorkflowApplication.builder() - .withListener(retryListener) - .withListener(new TraceExecutionListener()) - .build(); + app = WorkflowApplication.builder().withListener(retryListener).build(); } @AfterEach @@ -71,16 +66,21 @@ void tearDown() throws IOException { private class RetryListener implements WorkflowExecutionListener { private Map taskRetried = new ConcurrentHashMap<>(); - private Set contexts = ConcurrentHashMap.newKeySet(); + private Map taskCompleted = new ConcurrentHashMap<>(); + @Override public void onTaskRetried(TaskRetriedEvent ev) { - taskRetried.put(ev.taskContext().position().jsonPointer(), ev.taskContext().retryAttempt()); + add2Map(taskRetried, ev); } + @Override public void onTaskCompleted(TaskCompletedEvent ev) { - if (ev.taskContext().task() instanceof TryTask) { - contexts.add(ev.taskContext().retryAttempt()); - } + add2Map(taskCompleted, ev); + } + + private static void add2Map(Map map, TaskEvent ev) { + TaskContextData taskContext = ev.taskContext(); + map.put(taskContext.position().jsonPointer(), taskContext.retryAttempt()); } } @@ -107,8 +107,7 @@ void testRetry(String path) throws IOException { .atMost(Duration.ofSeconds(1)) .until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result)); assertThat(retryListener.taskRetried).hasSize(1); - assertThat(retryListener.taskRetried.get("do/0/tryGetPet/do/0/getPet")).isEqualTo((short) 2); - assertThat(retryListener.contexts).containsOnly((short) 0); + assertThat(retryListener.taskRetried.get("do/0/tryGetPet/try/0/getPet")).isEqualTo((short) 2); } @Test @@ -135,8 +134,37 @@ void testNestedRetry() throws IOException { .atMost(Duration.ofSeconds(1)) .until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result)); assertThat(retryListener.taskRetried).hasSize(2); - assertThat(retryListener.taskRetried.values()).containsExactlyInAnyOrder((short) 5, (short) 2); - assertThat(retryListener.contexts).containsExactlyInAnyOrder((short) 0, (short) 2); + assertThat(retryListener.taskRetried.get("do/0/tryServerError/try/0/tryCommunication/try")) + .isEqualTo((short) 2); + assertThat( + retryListener.taskRetried.get( + "do/0/tryServerError/try/0/tryCommunication/try/0/getPet")) + .isEqualTo((short) 5); + assertThat(retryListener.taskCompleted.get("do/0/tryServerError/try/0/tryCommunication/try")) + .isEqualTo((short) 2); + assertThat(retryListener.taskCompleted.get("do/0/tryServerError/try")).isEqualTo((short) 0); + } + + @Test + void testRetryDo() throws IOException { + CompletableFuture future = + app.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/try-catch-with-do.yaml")) + .instance(Map.of("delay", 0.01)) + .start(); + Awaitility.await() + .atMost(Duration.ofSeconds(1)) + .until( + () -> + future + .join() + .asMap() + .orElseThrow() + .equals(Map.of("setAfterFailingTask", "No problem"))); + + assertThat(retryListener.taskCompleted.get("do/0/attemptTask/try")).isEqualTo((short) 0); + assertThat(retryListener.taskCompleted) + .containsKey("do/0/attemptTask/catch/do/0/executeAfterFailingTask"); } @Test diff --git a/impl/test/src/test/resources/workflows-samples/try-catch-with-do.yaml b/impl/test/src/test/resources/workflows-samples/try-catch-with-do.yaml new file mode 100644 index 000000000..ab68a0a56 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/try-catch-with-do.yaml @@ -0,0 +1,22 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: try-catch-with-do + version: '0.1.0' +do: + - attemptTask: + try: + - failingTask: + raise: + error: + type: https://example.com/errors/runtime + status: 500 + catch: + errors: + with: + type: https://example.com/errors/runtime + status: 500 + do: + - executeAfterFailingTask: + set: + setAfterFailingTask: "No problem" \ No newline at end of file