diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index aaa1318..543f745 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -25,12 +25,18 @@ jobs: path: blue-quickjs submodules: recursive - - name: Set up JDK + - name: Set up Java 8 test runtime uses: actions/setup-java@v3 with: java-version: '8' distribution: 'corretto' + - name: Set up JDK 25 + uses: actions/setup-java@v3 + with: + java-version: '25' + distribution: 'corretto' + - name: Set up pnpm uses: pnpm/action-setup@v4 with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 659f033..6fa2f78 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -29,10 +29,16 @@ jobs: path: blue-quickjs submodules: recursive - - name: Set up JDK + - name: Set up Java 8 test runtime uses: actions/setup-java@v3 with: - java-version: '17' + java-version: '8' + distribution: 'corretto' + + - name: Set up JDK 25 + uses: actions/setup-java@v3 + with: + java-version: '25' distribution: 'corretto' - name: Set up pnpm diff --git a/README.md b/README.md index ffea53e..31b2d18 100644 --- a/README.md +++ b/README.md @@ -25,11 +25,12 @@ dependencies { } ``` -The project targets Java 8 bytecode and depends on: +The project targets Java 8-compatible bytecode, builds with JDK 25, runs tests +on Java 8, and depends on: ```groovy api "blue.language:blue-language-java:3.0.0" -api "blue.repo:blue-repo-java:2.0.1" +api "blue.repo:blue-repo-java:3.0.0-rc.1" api "blue.bex:blue-bex-java:1.0.0" ``` @@ -64,7 +65,9 @@ DocumentProcessor processor = `CoordinationProcessors` intentionally does not register a concrete processor for `Coordination/Timeline Channel`. Applications should provide their own timeline provider channel processor or register a small local test processor -for fixtures that use `Coordination/Timeline Entry`. +for fixtures that use `Coordination/Timeline Entry`. `Coordination/All Timelines +Channel` delegates to those registered timeline channel processors when deciding +which declared timelines can invoke a shared operation. ## Counter Document @@ -79,14 +82,10 @@ contracts: timelineId: counter-demo increment: - type: Coordination/Operation + type: Coordination/Sequential Workflow Operation channel: ownerChannel request: type: Integer - - incrementImpl: - type: Coordination/Sequential Workflow Operation - operation: increment steps: - name: IncrementAndEmit type: Coordination/Compute @@ -158,14 +157,18 @@ processing from the same resolved state. This library provides executable behavior for: +- `Coordination/All Timelines Channel`; - `Coordination/Composite Timeline Channel`; -- `Coordination/Operation`; +- `Coordination/Chat Workflow Operation`; - `Coordination/Sequential Workflow`; - `Coordination/Sequential Workflow Operation`; - `Coordination/Compute`; - `Coordination/Update Document`; - `Coordination/Trigger Event`. +It also registers `Coordination/Operation` as a non-executable declaration +type for operation-shaped contracts. + The underlying `blue-language-java` runtime provides base behavior used by Coordination documents: @@ -198,6 +201,10 @@ Common workflow bindings: ## Build And Test +Gradle runs on JDK 25 and uses a Java 8 toolchain for tests. If Java 8 is not +installed locally, Gradle can provision it through the configured Foojay +toolchain resolver. + Run tests: ```bash @@ -240,7 +247,9 @@ src/main/java/blue/coordination/processor CoordinationProcessors.java CoordinationProcessorOptions.java CoordinationBexIntrinsics.java + AllTimelinesChannelProcessor.java CompositeTimelineChannelProcessor.java + ChatWorkflowOperationProcessor.java OperationProcessor.java SequentialWorkflowProcessor.java SequentialWorkflowOperationProcessor.java diff --git a/build.gradle b/build.gradle index e474a22..defd609 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { id 'java-library' id 'maven-publish' id 'signing' - id 'org.jreleaser' version '1.13.1' + id 'org.jreleaser' version '1.24.0' } group = 'blue.coordination' @@ -28,21 +28,18 @@ repositories { java { withSourcesJar() withJavadocJar() + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 } -sourceCompatibility = JavaVersion.VERSION_1_8 -targetCompatibility = JavaVersion.VERSION_1_8 - tasks.withType(JavaCompile).configureEach { options.encoding = 'UTF-8' - if (JavaVersion.current().isJava9Compatible()) { - options.release = 8 - } + options.release = 8 } dependencies { api 'blue.language:blue-language-java:3.0.0' - api 'blue.repo:blue-repo-java:2.0.1' + api 'blue.repo:blue-repo-java:3.0.0-rc.1' api 'blue.bex:blue-bex-java:1.0.0' implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2' @@ -59,6 +56,9 @@ compileTestJava { } test { + javaLauncher = javaToolchains.launcherFor { + languageVersion = JavaLanguageVersion.of(8) + } useJUnitPlatform() reports { junitXml.required = false diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 0000000..4ac81a4 --- /dev/null +++ b/gradle.properties @@ -0,0 +1,2 @@ +org.gradle.java.installations.auto-download=true +org.gradle.java.installations.fromEnv=JAVA_HOME_8_X64,JAVA_HOME_8_ARM64,JAVA8_HOME diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 1b33c55..b1b8ef5 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ca025c8..eb84db6 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,7 +1,9 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-9.6.0-bin.zip networkTimeout=10000 +retries=0 +retryBackOffMs=500 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 23d15a9..249efbb 100755 --- a/gradlew +++ b/gradlew @@ -1,7 +1,7 @@ #!/bin/sh # -# Copyright © 2015-2021 the original authors. +# Copyright © 2015 the original authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ ############################################################################## # -# Gradle start up script for POSIX generated by Gradle. +# gradlew start up script for POSIX generated by Gradle. # # Important for running: # @@ -29,7 +29,7 @@ # bash, then to run this script, type that shell name before the whole # command line, like: # -# ksh Gradle +# ksh gradlew # # Busybox and similar reduced shells will NOT work, because this script # requires all of these POSIX shell features: @@ -57,7 +57,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/3d91ce3b8caaf77ad09f381f43615b715b53f72c/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -114,7 +114,6 @@ case "$( uname )" in #( NONSTOP* ) nonstop=true ;; esac -CLASSPATH="\\\"\\\"" # Determine the Java command to use to start the JVM. @@ -172,7 +171,6 @@ fi # For Cygwin or MSYS, switch paths to Windows format before running java if "$cygwin" || "$msys" ; then APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) - CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) JAVACMD=$( cygpath --unix "$JAVACMD" ) @@ -212,7 +210,6 @@ DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ - -classpath "$CLASSPATH" \ -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \ "$@" diff --git a/gradlew.bat b/gradlew.bat index db3a6ac..8508ef6 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,94 +1,82 @@ -@rem -@rem Copyright 2015 the original author or authors. -@rem -@rem Licensed under the Apache License, Version 2.0 (the "License"); -@rem you may not use this file except in compliance with the License. -@rem You may obtain a copy of the License at -@rem -@rem https://www.apache.org/licenses/LICENSE-2.0 -@rem -@rem Unless required by applicable law or agreed to in writing, software -@rem distributed under the License is distributed on an "AS IS" BASIS, -@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -@rem See the License for the specific language governing permissions and -@rem limitations under the License. -@rem -@rem SPDX-License-Identifier: Apache-2.0 -@rem - -@if "%DEBUG%"=="" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%"=="" set DIRNAME=. -@rem This is normally unused -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Resolve any "." and ".." in APP_HOME to make it shorter. -for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if %ERRORLEVEL% equ 0 goto execute - -echo. 1>&2 -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 -echo. 1>&2 -echo Please set the JAVA_HOME variable in your environment to match the 1>&2 -echo location of your Java installation. 1>&2 - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto execute - -echo. 1>&2 -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 -echo. 1>&2 -echo Please set the JAVA_HOME variable in your environment to match the 1>&2 -echo location of your Java installation. 1>&2 - -goto fail - -:execute -@rem Setup the command line - -set CLASSPATH= - - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* - -:end -@rem End local scope for the variables with windows NT shell -if %ERRORLEVEL% equ 0 goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -set EXIT_CODE=%ERRORLEVEL% -if %EXIT_CODE% equ 0 set EXIT_CODE=1 -if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% -exit /b %EXIT_CODE% - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem gradlew startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables, and ensure extensions are enabled +setlocal EnableExtensions + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +"%COMSPEC%" /c exit 1 + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +"%COMSPEC%" /c exit 1 + +:execute +@rem Setup the command line + + + +@rem Execute gradlew +@rem endlocal doesn't take effect until after the line is parsed and variables are expanded +@rem which allows us to clear the local environment before executing the java command +endlocal & "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* & call :exitWithErrorLevel + +:exitWithErrorLevel +@rem Use "%COMSPEC%" /c exit to allow operators to work properly in scripts +"%COMSPEC%" /c exit %ERRORLEVEL% diff --git a/settings.gradle b/settings.gradle index aede013..58270ff 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1,15 @@ +plugins { + id 'org.gradle.toolchains.foojay-resolver-convention' version '1.0.0' +} + rootProject.name = 'blue-coordination-java' + +def localBlueRepository = file('../blue-repository-java') +if (providers.gradleProperty('useLocalBlueRepository').map { it.toBoolean() }.getOrElse(false) + && localBlueRepository.isDirectory()) { + includeBuild(localBlueRepository) { + dependencySubstitution { + substitute module('blue.repo:blue-repo-java') using project(':') + } + } +} diff --git a/src/main/java/blue/coordination/processor/AllTimelinesChannelProcessor.java b/src/main/java/blue/coordination/processor/AllTimelinesChannelProcessor.java new file mode 100644 index 0000000..9228e33 --- /dev/null +++ b/src/main/java/blue/coordination/processor/AllTimelinesChannelProcessor.java @@ -0,0 +1,102 @@ +package blue.coordination.processor; + +import blue.language.model.Node; +import blue.language.processor.ChannelCheckpointContext; +import blue.language.processor.ChannelEvaluation; +import blue.language.processor.ChannelEvaluationContext; +import blue.language.processor.ChannelProcessor; +import blue.language.processor.model.ChannelEventCheckpoint; +import blue.language.processor.model.ChannelContract; +import blue.language.processor.model.MarkerContract; +import blue.repo.coordination.AllTimelinesChannel; +import blue.repo.coordination.TimelineChannel; +import java.util.Map; + +public final class AllTimelinesChannelProcessor implements ChannelProcessor { + @Override + public Class contractType() { + return AllTimelinesChannel.class; + } + + @Override + public ChannelEvaluation evaluate(AllTimelinesChannel contract, ChannelEvaluationContext context) { + Node event = context.event(); + if (!CoordinationEventNodes.isTimelineEntry(event)) { + return ChannelEvaluation.noMatch(); + } + MatchingTimeline matching = matchingTimeline(context); + if (matching == null) { + return ChannelEvaluation.noMatch(); + } + Node deliveryEvent = matching.evaluation.event() != null + ? matching.evaluation.event() + : event; + return ChannelEvaluation.match(withAllTimelinesMetadata(deliveryEvent, matching.channelKey), + matching.evaluation.eventId()); + } + + private MatchingTimeline matchingTimeline(ChannelEvaluationContext context) { + for (Map.Entry entry : context.channels().entrySet()) { + String key = entry.getKey(); + ChannelContract channel = entry.getValue(); + if (key == null || key.equals(context.bindingKey()) || !(channel instanceof TimelineChannel)) { + continue; + } + ChannelProcessor processor = context.channelProcessor(key); + if (processor == null) { + throw new IllegalStateException("No processor registered for All Timelines Channel child '" + + key + "'"); + } + ChannelEvaluation evaluation = evaluateChild(processor, channel, context.forBindingKey(key)); + if (evaluation != null && evaluation.matches() && childCheckpointAllows(key, context)) { + return new MatchingTimeline(key, evaluation); + } + } + return null; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private ChannelEvaluation evaluateChild(ChannelProcessor processor, + ChannelContract child, + ChannelEvaluationContext context) { + return processor.evaluate(child, context); + } + + private boolean childCheckpointAllows(String channelKey, + ChannelEvaluationContext context) { + ChannelEventCheckpoint checkpoint = checkpoint(context); + Node lastEvent = checkpoint != null ? checkpoint.lastEvent(channelKey) : null; + return lastEvent == null || !TimelineProviderSupport.isOlderSameTimelineEvent(context.event(), lastEvent); + } + + private ChannelEventCheckpoint checkpoint(ChannelEvaluationContext context) { + MarkerContract marker = context.markers().get("checkpoint"); + return marker instanceof ChannelEventCheckpoint ? (ChannelEventCheckpoint) marker : null; + } + + @Override + public boolean isNewerEvent(AllTimelinesChannel contract, ChannelCheckpointContext context) { + return TimelineProviderSupport.isNewerOrDifferentTimelineEvent(context); + } + + private Node withAllTimelinesMetadata(Node event, String sourceChannelKey) { + Node copy = event.clone(); + Node meta = TimelineProviderSupport.property(copy, "meta"); + if (meta == null) { + meta = new Node(); + copy.properties("meta", meta); + } + meta.properties("allTimelinesSourceChannelKey", new Node().value(sourceChannelKey)); + return copy; + } + + private static final class MatchingTimeline { + private final String channelKey; + private final ChannelEvaluation evaluation; + + private MatchingTimeline(String channelKey, ChannelEvaluation evaluation) { + this.channelKey = channelKey; + this.evaluation = evaluation; + } + } +} diff --git a/src/main/java/blue/coordination/processor/ChatWorkflowOperationProcessor.java b/src/main/java/blue/coordination/processor/ChatWorkflowOperationProcessor.java new file mode 100644 index 0000000..a302de2 --- /dev/null +++ b/src/main/java/blue/coordination/processor/ChatWorkflowOperationProcessor.java @@ -0,0 +1,58 @@ +package blue.coordination.processor; + +import blue.coordination.processor.workflow.SequentialWorkflowRunner; +import blue.language.processor.HandlerMatchContext; +import blue.language.processor.HandlerProcessor; +import blue.language.processor.HandlerRegistrationContext; +import blue.language.processor.ProcessorExecutionContext; +import blue.repo.coordination.ChatWorkflowOperation; +import blue.repo.coordination.SequentialWorkflow; + +public final class ChatWorkflowOperationProcessor implements HandlerProcessor { + private final SequentialWorkflowRunner runner; + private final OperationRequestMatcher matcher = new OperationRequestMatcher(); + + public ChatWorkflowOperationProcessor() { + this(new SequentialWorkflowRunner()); + } + + public ChatWorkflowOperationProcessor(SequentialWorkflowRunner runner) { + if (runner == null) { + throw new IllegalArgumentException("runner must not be null"); + } + this.runner = runner; + } + + @Override + public Class contractType() { + return ChatWorkflowOperation.class; + } + + @Override + public String deriveChannel(ChatWorkflowOperation contract, HandlerRegistrationContext context) { + String channel = trimToNull(contract.getChannel()); + if (channel != null && !context.hasContract(channel)) { + throw new IllegalStateException("Chat workflow operation '" + context.handlerKey() + + "' references unknown channel '" + channel + "'"); + } + return channel; + } + + @Override + public boolean matches(ChatWorkflowOperation contract, HandlerMatchContext context) { + return matcher.matches(contract, context); + } + + @Override + public void execute(ChatWorkflowOperation contract, ProcessorExecutionContext context) { + runner.execute(new SequentialWorkflow().steps(contract.getSteps()), context); + } + + private static String trimToNull(String value) { + if (value == null) { + return null; + } + String trimmed = value.trim(); + return trimmed.isEmpty() ? null : trimmed; + } +} diff --git a/src/main/java/blue/coordination/processor/CompositeTimelineChannelProcessor.java b/src/main/java/blue/coordination/processor/CompositeTimelineChannelProcessor.java index 63a9829..2e274fb 100644 --- a/src/main/java/blue/coordination/processor/CompositeTimelineChannelProcessor.java +++ b/src/main/java/blue/coordination/processor/CompositeTimelineChannelProcessor.java @@ -2,7 +2,6 @@ import blue.language.model.Node; import blue.language.processor.ChannelCheckpointContext; -import blue.language.processor.ChannelDelivery; import blue.language.processor.ChannelEvaluation; import blue.language.processor.ChannelEvaluationContext; import blue.language.processor.ChannelProcessor; @@ -10,7 +9,6 @@ import blue.language.processor.model.ChannelContract; import blue.language.processor.model.MarkerContract; import blue.repo.coordination.CompositeTimelineChannel; -import java.util.ArrayList; import java.util.List; public final class CompositeTimelineChannelProcessor implements ChannelProcessor { @@ -25,7 +23,7 @@ public ChannelEvaluation evaluate(CompositeTimelineChannel contract, ChannelEval if (channels == null || channels.isEmpty()) { return ChannelEvaluation.noMatch(); } - List deliveries = new ArrayList(); + MatchingChild matching = null; for (String childKey : channels) { String key = trimToNull(childKey); if (key == null) { @@ -55,18 +53,18 @@ public ChannelEvaluation evaluate(CompositeTimelineChannel contract, ChannelEval if (deliveryEvent == null) { continue; } - String checkpointKey = compositeCheckpointKey(context.bindingKey(), key); - Boolean shouldProcess = shouldProcessChild(processor, child, context, checkpointKey, childEvaluation); - deliveries.add(ChannelDelivery.of(withCompositeMetadata(deliveryEvent, key), - childEvaluation.eventId(), - checkpointKey, - shouldProcess)); + if (!childCheckpointAllows(key, context)) { + continue; + } + if (matching == null) { + matching = new MatchingChild(key, deliveryEvent, childEvaluation.eventId()); + } } - return ChannelEvaluation.matchDeliveries(deliveries); - } - - static String compositeCheckpointKey(String compositeKey, String childKey) { - return compositeKey + "::" + childKey; + if (matching == null) { + return ChannelEvaluation.noMatch(); + } + return ChannelEvaluation.match(withCompositeMetadata(matching.event, matching.channelKey), + matching.eventId); } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -76,26 +74,21 @@ private ChannelEvaluation evaluateChild(ChannelProcessor processor, return processor.evaluate(child, context); } - @SuppressWarnings({"rawtypes", "unchecked"}) - private Boolean shouldProcessChild(ChannelProcessor processor, - ChannelContract child, - ChannelEvaluationContext context, - String checkpointKey, - ChannelEvaluation childEvaluation) { + private boolean childCheckpointAllows(String channelKey, + ChannelEvaluationContext context) { + ChannelEventCheckpoint checkpoint = checkpoint(context); + Node lastEvent = checkpoint != null ? checkpoint.lastEvent(channelKey) : null; + return lastEvent == null || !TimelineProviderSupport.isOlderSameTimelineEvent(context.event(), lastEvent); + } + + private ChannelEventCheckpoint checkpoint(ChannelEvaluationContext context) { MarkerContract marker = context.markers().get("checkpoint"); - ChannelEventCheckpoint checkpoint = marker instanceof ChannelEventCheckpoint - ? (ChannelEventCheckpoint) marker - : null; - Node lastEvent = checkpoint != null ? checkpoint.lastEvent(checkpointKey) : null; - ChannelCheckpointContext checkpointContext = ChannelCheckpointContext.of( - context.scopePath(), - checkpointKey, - context.event(), - childEvaluation.eventId(), - lastEvent, - null, - context.markers()); - return processor.isNewerEvent(child, checkpointContext); + return marker instanceof ChannelEventCheckpoint ? (ChannelEventCheckpoint) marker : null; + } + + @Override + public boolean isNewerEvent(CompositeTimelineChannel contract, ChannelCheckpointContext context) { + return TimelineProviderSupport.isNewerOrDifferentTimelineEvent(context); } private Node withCompositeMetadata(Node event, String childKey) { @@ -123,4 +116,16 @@ private String trimToNull(String value) { String trimmed = value.trim(); return trimmed.isEmpty() ? null : trimmed; } + + private static final class MatchingChild { + private final String channelKey; + private final Node event; + private final String eventId; + + private MatchingChild(String channelKey, Node event, String eventId) { + this.channelKey = channelKey; + this.event = event; + this.eventId = eventId; + } + } } diff --git a/src/main/java/blue/coordination/processor/CoordinationProcessors.java b/src/main/java/blue/coordination/processor/CoordinationProcessors.java index 4a878c2..ef643ca 100644 --- a/src/main/java/blue/coordination/processor/CoordinationProcessors.java +++ b/src/main/java/blue/coordination/processor/CoordinationProcessors.java @@ -22,8 +22,12 @@ public static Blue registerWith(Blue blue, CoordinationProcessorOptions options) } SequentialWorkflowRunner runner = workflowRunner(options); BlueRepositoryModels.registerAll(blue.getDocumentProcessor().getContractTypeResolver()); + blue.registerContractProcessor(new AllTimelinesChannelProcessor()); blue.registerContractProcessor(new CompositeTimelineChannelProcessor()); blue.registerContractProcessor(new OperationProcessor()); + blue.registerContractProcessor(runner != null + ? new ChatWorkflowOperationProcessor(runner) + : new ChatWorkflowOperationProcessor()); blue.registerContractProcessor(runner != null ? new SequentialWorkflowProcessor(runner) : new SequentialWorkflowProcessor()); @@ -48,8 +52,12 @@ public static DocumentProcessor.Builder configure(DocumentProcessor.Builder buil new TypeClassResolver("blue.language.processor.model")); return builder .withContractTypeResolver(resolver) + .registerContractProcessor(new AllTimelinesChannelProcessor()) .registerContractProcessor(new CompositeTimelineChannelProcessor()) .registerContractProcessor(new OperationProcessor()) + .registerContractProcessor(runner != null + ? new ChatWorkflowOperationProcessor(runner) + : new ChatWorkflowOperationProcessor()) .registerContractProcessor(runner != null ? new SequentialWorkflowProcessor(runner) : new SequentialWorkflowProcessor()) diff --git a/src/main/java/blue/coordination/processor/OperationProcessor.java b/src/main/java/blue/coordination/processor/OperationProcessor.java index 52128d5..3a9b709 100644 --- a/src/main/java/blue/coordination/processor/OperationProcessor.java +++ b/src/main/java/blue/coordination/processor/OperationProcessor.java @@ -1,11 +1,29 @@ package blue.coordination.processor; -import blue.language.processor.ContractProcessor; +import blue.language.processor.HandlerMatchContext; +import blue.language.processor.HandlerProcessor; +import blue.language.processor.HandlerRegistrationContext; +import blue.language.processor.ProcessorExecutionContext; import blue.repo.coordination.Operation; -public final class OperationProcessor implements ContractProcessor { +public final class OperationProcessor implements HandlerProcessor { @Override public Class contractType() { return Operation.class; } + + @Override + public String deriveChannel(Operation contract, HandlerRegistrationContext context) { + return contract != null ? contract.getChannel() : null; + } + + @Override + public boolean matches(Operation contract, HandlerMatchContext context) { + return false; + } + + @Override + public void execute(Operation contract, ProcessorExecutionContext context) { + throw new IllegalStateException("Operation contracts declare operation shape but are not executable"); + } } diff --git a/src/main/java/blue/coordination/processor/OperationRequestMatcher.java b/src/main/java/blue/coordination/processor/OperationRequestMatcher.java index 6da314b..c57a2e6 100644 --- a/src/main/java/blue/coordination/processor/OperationRequestMatcher.java +++ b/src/main/java/blue/coordination/processor/OperationRequestMatcher.java @@ -5,7 +5,6 @@ import blue.language.processor.model.InitializationMarker; import blue.language.processor.model.MarkerContract; import blue.language.utils.BlueIdCalculator; -import blue.repo.coordination.Operation; import blue.repo.coordination.OperationRequest; import blue.repo.coordination.SequentialWorkflowOperation; import java.util.Map; @@ -23,21 +22,17 @@ boolean matches(SequentialWorkflowOperation contract, HandlerMatchContext contex if (requestEvent == null) { return false; } - String operationKey = trimToNull(contract.getOperation()); + String operationKey = trimToNull(contract.getKey()); if (operationKey == null || !operationKey.equals(requestEvent.operation())) { return false; } - Operation operation = operationMarker(context.markers(), operationKey); - if (operation == null) { - return false; - } - if (!channelsCompatible(contract, operation)) { + if (!channelsCompatible(contract)) { return false; } if (!pinnedDocumentCompatible(requestEvent, context.markers())) { return false; } - return requestMatches(operation.getRequest(), requestEvent, context); + return requestMatches(contract.getRequest(), requestEvent, context); } private boolean requestMatches(Node requestPattern, @@ -67,8 +62,8 @@ private boolean isEmptyRequestPattern(Node requestPattern) { && requestPattern.getSchema() == null; } - private boolean channelsCompatible(SequentialWorkflowOperation contract, Operation operation) { - String operationChannel = trimToNull(operation.getChannel()); + private boolean channelsCompatible(SequentialWorkflowOperation contract) { + String operationChannel = trimToNull(contract.getChannel()); if (operationChannel == null) { return true; } @@ -102,14 +97,6 @@ private InitializationMarker initializationMarker(Map ma return marker instanceof InitializationMarker ? (InitializationMarker) marker : null; } - private Operation operationMarker(Map markers, String key) { - if (markers == null) { - return null; - } - MarkerContract marker = markers.get(key); - return marker instanceof Operation ? (Operation) marker : null; - } - private static String trimToNull(String value) { if (value == null) { return null; diff --git a/src/main/java/blue/coordination/processor/SequentialWorkflowOperationProcessor.java b/src/main/java/blue/coordination/processor/SequentialWorkflowOperationProcessor.java index 1a5cf33..45739d0 100644 --- a/src/main/java/blue/coordination/processor/SequentialWorkflowOperationProcessor.java +++ b/src/main/java/blue/coordination/processor/SequentialWorkflowOperationProcessor.java @@ -5,7 +5,7 @@ import blue.language.processor.HandlerProcessor; import blue.language.processor.HandlerRegistrationContext; import blue.language.processor.ProcessorExecutionContext; -import blue.repo.coordination.Operation; +import blue.repo.coordination.SequentialWorkflow; import blue.repo.coordination.SequentialWorkflowOperation; public final class SequentialWorkflowOperationProcessor implements HandlerProcessor { @@ -30,8 +30,7 @@ public Class contractType() { @Override public String deriveChannel(SequentialWorkflowOperation contract, HandlerRegistrationContext context) { - Operation operation = context.contractAs(contract.getOperation(), Operation.class); - String channel = operation != null ? trimToNull(operation.getChannel()) : null; + String channel = trimToNull(contract.getChannel()); if (channel != null && !context.hasContract(channel)) { throw new IllegalStateException("Sequential workflow operation '" + context.handlerKey() + "' references unknown channel '" + channel + "'"); @@ -46,7 +45,7 @@ public boolean matches(SequentialWorkflowOperation contract, HandlerMatchContext @Override public void execute(SequentialWorkflowOperation contract, ProcessorExecutionContext context) { - runner.execute(contract, context); + runner.execute(new SequentialWorkflow().steps(contract.getSteps()), context); } private static String trimToNull(String value) { diff --git a/src/main/java/blue/coordination/processor/TimelineProviderSupport.java b/src/main/java/blue/coordination/processor/TimelineProviderSupport.java index 329b13d..8166ac1 100644 --- a/src/main/java/blue/coordination/processor/TimelineProviderSupport.java +++ b/src/main/java/blue/coordination/processor/TimelineProviderSupport.java @@ -55,6 +55,30 @@ public static boolean isNewerOrSameTimelineEvent(ChannelCheckpointContext contex return currentTimestamp.compareTo(previousTimestamp) >= 0; } + public static boolean isNewerOrDifferentTimelineEvent(ChannelCheckpointContext context) { + Node currentEvent = context.event(); + Node previousEvent = context.lastEvent(); + String currentTimeline = CoordinationEventNodes.timelineId(currentEvent); + String previousTimeline = CoordinationEventNodes.timelineId(previousEvent); + if (currentTimeline != null && previousTimeline != null && !currentTimeline.equals(previousTimeline)) { + return true; + } + return isNewerOrSameTimelineEvent(context); + } + + public static boolean isOlderSameTimelineEvent(Node currentEvent, Node previousEvent) { + String currentTimeline = CoordinationEventNodes.timelineId(currentEvent); + String previousTimeline = CoordinationEventNodes.timelineId(previousEvent); + if (currentTimeline == null || previousTimeline == null || !currentTimeline.equals(previousTimeline)) { + return false; + } + BigInteger currentTimestamp = CoordinationEventNodes.timestamp(currentEvent); + BigInteger previousTimestamp = CoordinationEventNodes.timestamp(previousEvent); + return currentTimestamp != null + && previousTimestamp != null + && currentTimestamp.compareTo(previousTimestamp) < 0; + } + public static Node property(Node node, String key) { if (node == null || node.getProperties() == null) { return null; diff --git a/src/test/java/blue/coordination/processor/AllTimelinesChannelProcessorTest.java b/src/test/java/blue/coordination/processor/AllTimelinesChannelProcessorTest.java new file mode 100644 index 0000000..74880c0 --- /dev/null +++ b/src/test/java/blue/coordination/processor/AllTimelinesChannelProcessorTest.java @@ -0,0 +1,201 @@ +package blue.coordination.processor; + +import blue.language.Blue; +import blue.language.model.Node; +import blue.language.processor.DocumentProcessingResult; +import blue.repo.BlueRepository; +import java.math.BigInteger; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +class AllTimelinesChannelProcessorTest { + + @Test + void allTimelinesChannelLetsDeclaredTimelineParticipantsCallChatOperation() { + Fixture fixture = configuredFixture(); + Node initialized = initializedDocument(fixture, chatDocument(fixture.repository)); + + DocumentProcessingResult alice = processChat(fixture, initialized, "alice", 1, "bob", "hello bob"); + DocumentProcessingResult bob = processChat(fixture, alice.document(), "bob", 1, "alice", "hello alice"); + DocumentProcessingResult charlie = processChat(fixture, bob.document(), "charlie", 1, "alice", "not allowed"); + + assertSuccessful(alice); + assertSuccessful(bob); + assertSuccessful(charlie); + assertChatMessage(alice, "bob", "hello bob"); + assertChatMessage(bob, "alice", "hello alice"); + assertEquals(0, charlie.triggeredEvents().size()); + assertEquals("alice", alice.document() + .getAsText("/contracts/checkpoint/lastEvents/aliceTimeline/timeline/timelineId")); + assertEquals("alice", alice.document() + .getAsText("/contracts/checkpoint/lastEvents/allTimelines/timeline/timelineId")); + assertEquals("bob", bob.document() + .getAsText("/contracts/checkpoint/lastEvents/bobTimeline/timeline/timelineId")); + assertEquals("bob", bob.document() + .getAsText("/contracts/checkpoint/lastEvents/allTimelines/timeline/timelineId")); + assertNull(nodeAt(bob.document(), "/contracts/checkpoint/lastEvents/allTimelines::alice")); + assertNull(nodeAt(bob.document(), "/contracts/checkpoint/lastEvents/allTimelines::bob")); + } + + @Test + void allTimelinesChannelLetsOneParticipantRespondToAnother() { + Fixture fixture = configuredFixture(); + Node initialized = initializedDocument(fixture, chatDocument(fixture.repository)); + + DocumentProcessingResult request = processChat(fixture, initialized, "alice", 1, "bob", "question"); + DocumentProcessingResult response = processChat(fixture, request.document(), "bob", 2, "alice", "answer", "question"); + + assertSuccessful(request); + assertSuccessful(response); + assertChatMessage(request, "bob", "question"); + assertChatMessage(response, "alice", "answer"); + assertEquals("question", onlyChatEvent(response).getAsText("/inReplyTo")); + assertEquals("alice", response.document() + .getAsText("/contracts/checkpoint/lastEvents/aliceTimeline/timeline/timelineId")); + assertEquals("bob", response.document() + .getAsText("/contracts/checkpoint/lastEvents/bobTimeline/timeline/timelineId")); + assertEquals("bob", response.document() + .getAsText("/contracts/checkpoint/lastEvents/allTimelines/timeline/timelineId")); + assertNull(nodeAt(response.document(), "/contracts/checkpoint/lastEvents/allTimelines::alice")); + assertNull(nodeAt(response.document(), "/contracts/checkpoint/lastEvents/allTimelines::bob")); + } + + @Test + void allTimelinesChannelUsesDeclaredTimelineCheckpointsForIndependentRecency() { + Fixture fixture = configuredFixture(); + Node initialized = initializedDocument(fixture, chatDocument(fixture.repository)); + + DocumentProcessingResult alice = processChat(fixture, initialized, "alice", 2, "bob", "newer alice"); + DocumentProcessingResult bob = processChat(fixture, alice.document(), "bob", 1, "alice", "independent bob"); + DocumentProcessingResult staleAlice = processChat(fixture, bob.document(), "alice", 1, "bob", "stale alice"); + + assertSuccessful(alice); + assertSuccessful(bob); + assertSuccessful(staleAlice); + assertChatMessage(alice, "bob", "newer alice"); + assertChatMessage(bob, "alice", "independent bob"); + assertEquals(0, staleAlice.triggeredEvents().size()); + assertEquals(BigInteger.valueOf(2), + staleAlice.document().get("/contracts/checkpoint/lastEvents/aliceTimeline/timestamp")); + assertEquals(BigInteger.valueOf(1), + staleAlice.document().get("/contracts/checkpoint/lastEvents/bobTimeline/timestamp")); + assertEquals("bob", staleAlice.document() + .getAsText("/contracts/checkpoint/lastEvents/allTimelines/timeline/timelineId")); + assertNull(nodeAt(staleAlice.document(), "/contracts/checkpoint/lastEvents/allTimelines::alice")); + assertNull(nodeAt(staleAlice.document(), "/contracts/checkpoint/lastEvents/allTimelines::bob")); + } + + private static Node chatDocument(BlueRepository repository) { + Map contracts = new LinkedHashMap(); + contracts.put("allTimelines", new Node() + .type("Coordination/All Timelines Channel")); + contracts.put("aliceTimeline", TestTimelineProvider.channel("alice")); + contracts.put("bobTimeline", TestTimelineProvider.channel("bob")); + contracts.put("chat", chatWorkflowOperation()); + return new Node() + .blue(repository.typeAliasBlue()) + .name("All Timelines Chat") + .properties("contracts", new Node().properties(contracts)); + } + + private static Node chatWorkflowOperation() { + return new Node() + .type("Coordination/Chat Workflow Operation") + .properties("channel", new Node().value("allTimelines")); + } + + private static Node chatRequest(String to, String message) { + return new Node() + .type("Coordination/Chat Message") + .properties("to", new Node().value(to)) + .properties("message", new Node().value(message)); + } + + private static Node chatRequest(String to, String message, String inReplyTo) { + Node request = chatRequest(to, message); + if (inReplyTo != null) { + request.properties("inReplyTo", new Node().value(inReplyTo)); + } + return request; + } + + private static DocumentProcessingResult processChat(Fixture fixture, + Node document, + String timelineId, + int timestamp, + String to, + String message) { + return processChat(fixture, document, timelineId, timestamp, to, message, null); + } + + private static DocumentProcessingResult processChat(Fixture fixture, + Node document, + String timelineId, + int timestamp, + String to, + String message, + String inReplyTo) { + return fixture.blue.processDocument(document, + TestTimelineProvider.timelineEntry(fixture.blue, + fixture.repository, + timelineId, + timestamp, + CoordinationTestResources.operationRequest("chat", chatRequest(to, message, inReplyTo)))); + } + + private static Node initializedDocument(Fixture fixture, Node document) { + DocumentProcessingResult result = fixture.blue.initializeDocument(fixture.blue.preprocess(document)); + assertSuccessful(result); + return result.document(); + } + + private static Fixture configuredFixture() { + BlueRepository repository = BlueRepository.v1_3_0(); + Blue blue = CoordinationTestResources.configuredBlue(repository); + CoordinationProcessors.registerWith(blue); + TestTimelineProvider.registerWith(blue); + return new Fixture(repository, blue); + } + + private static void assertSuccessful(DocumentProcessingResult result) { + assertFalse(result.capabilityFailure(), result.failureReason()); + } + + private static void assertChatMessage(DocumentProcessingResult result, String expectedTo, String expectedMessage) { + Node event = onlyChatEvent(result); + assertEquals(expectedTo, event.getAsText("/to")); + assertEquals(expectedMessage, event.getAsText("/message")); + } + + private static Node onlyChatEvent(DocumentProcessingResult result) { + List events = result.triggeredEvents(); + assertEquals(1, events.size()); + return events.get(0); + } + + private static Node nodeAt(Node node, String path) { + try { + Object value = node.get(path); + return value instanceof Node ? (Node) value : null; + } catch (IllegalArgumentException ex) { + return null; + } + } + + private static final class Fixture { + private final BlueRepository repository; + private final Blue blue; + + private Fixture(BlueRepository repository, Blue blue) { + this.repository = repository; + this.blue = blue; + } + } +} diff --git a/src/test/java/blue/coordination/processor/CompositeTimelineChannelProcessorTest.java b/src/test/java/blue/coordination/processor/CompositeTimelineChannelProcessorTest.java index 1c882c8..5faf447 100644 --- a/src/test/java/blue/coordination/processor/CompositeTimelineChannelProcessorTest.java +++ b/src/test/java/blue/coordination/processor/CompositeTimelineChannelProcessorTest.java @@ -31,7 +31,8 @@ void deliversMatchingChildEvent() { DocumentProcessingResult result = processChat(fixture, initialized, "owner", 1, "hello"); assertChatCount(result.triggeredEvents(), "composite saw owner", 1); - assertNotNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox::owner")); + assertNotNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox")); + assertNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox::owner")); } @Test @@ -46,12 +47,11 @@ void doesNotDeliverNonMatchingChild() { assertChatCount(result.triggeredEvents(), "composite saw owner", 0); assertChatCount(result.triggeredEvents(), "composite saw support", 0); - assertNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox::owner")); - assertNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox::support")); + assertNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox")); } @Test - void deliversMultipleMatchingChildrenIndependently() { + void deliversMultipleMatchingChildrenOnceThroughCompositeChannel() { Fixture fixture = configuredFixture(); Node initialized = initializedDocument(fixture, compositeDocument(fixture.repository, Arrays.asList("childA", "childB"), @@ -61,13 +61,14 @@ void deliversMultipleMatchingChildrenIndependently() { DocumentProcessingResult result = processChat(fixture, initialized, "owner", 1, "hello"); assertChatCount(result.triggeredEvents(), "composite saw childA", 1); - assertChatCount(result.triggeredEvents(), "composite saw childB", 1); - assertNotNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox::childA")); - assertNotNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox::childB")); + assertChatCount(result.triggeredEvents(), "composite saw childB", 0); + assertNotNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox")); + assertNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox::childA")); + assertNull(nodeAt(result.document(), "/contracts/checkpoint/lastEvents/inbox::childB")); } @Test - void duplicateEventIsSkippedPerChildCheckpoint() { + void duplicateEventIsSkippedByCompositeChannelCheckpoint() { Fixture fixture = configuredFixture(); Node initialized = initializedDocument(fixture, compositeDocument(fixture.repository, Arrays.asList("childA", "childB"), @@ -76,22 +77,21 @@ void duplicateEventIsSkippedPerChildCheckpoint() { Node event = chatTimelineEntry(fixture, "owner", 1, "hello"); DocumentProcessingResult first = fixture.blue.processDocument(initialized, event); - Node firstA = nodeAt(first.document(), "/contracts/checkpoint/lastEvents/inbox::childA"); - Node firstB = nodeAt(first.document(), "/contracts/checkpoint/lastEvents/inbox::childB"); + Node firstCheckpoint = nodeAt(first.document(), "/contracts/checkpoint/lastEvents/inbox"); DocumentProcessingResult second = fixture.blue.processDocument(first.document(), event); assertChatCount(first.triggeredEvents(), "composite saw childA", 1); - assertChatCount(first.triggeredEvents(), "composite saw childB", 1); + assertChatCount(first.triggeredEvents(), "composite saw childB", 0); assertChatCount(second.triggeredEvents(), "composite saw childA", 0); assertChatCount(second.triggeredEvents(), "composite saw childB", 0); - assertEquals(firstA.get("/timestamp"), - nodeAt(second.document(), "/contracts/checkpoint/lastEvents/inbox::childA/timestamp").getValue()); - assertEquals(firstB.get("/timestamp"), - nodeAt(second.document(), "/contracts/checkpoint/lastEvents/inbox::childB/timestamp").getValue()); + assertEquals(firstCheckpoint.get("/timestamp"), + nodeAt(second.document(), "/contracts/checkpoint/lastEvents/inbox/timestamp").getValue()); + assertNull(nodeAt(second.document(), "/contracts/checkpoint/lastEvents/inbox::childA")); + assertNull(nodeAt(second.document(), "/contracts/checkpoint/lastEvents/inbox::childB")); } @Test - void childTimelineRecencyIsRespectedPerCompositeCheckpoint() { + void timelineRecencyIsRespectedPerCompositeChannelCheckpoint() { Fixture fixture = configuredFixture(); Node initialized = initializedDocument(fixture, compositeDocument(fixture.repository, Arrays.asList("owner"), @@ -106,9 +106,34 @@ void childTimelineRecencyIsRespectedPerCompositeCheckpoint() { assertChatCount(stale.triggeredEvents(), "composite saw owner", 0); assertChatCount(newer.triggeredEvents(), "composite saw owner", 1); assertEquals(BigInteger.valueOf(10), - stale.document().get("/contracts/checkpoint/lastEvents/inbox::owner/timestamp")); + stale.document().get("/contracts/checkpoint/lastEvents/inbox/timestamp")); assertEquals(BigInteger.valueOf(11), - newer.document().get("/contracts/checkpoint/lastEvents/inbox::owner/timestamp")); + newer.document().get("/contracts/checkpoint/lastEvents/inbox/timestamp")); + } + + @Test + void compositeChannelUsesChildCheckpointsForIndependentRecency() { + Fixture fixture = configuredFixture(); + Node initialized = initializedDocument(fixture, compositeDocument(fixture.repository, + Arrays.asList("owner", "support"), + timelineChannel("owner"), + timelineChannel("support"))); + + DocumentProcessingResult owner = processChat(fixture, initialized, "owner", 10, "owner latest"); + DocumentProcessingResult support = processChat(fixture, owner.document(), "support", 1, "support first"); + DocumentProcessingResult staleOwner = processChat(fixture, support.document(), "owner", 9, "owner stale"); + + assertChatCount(owner.triggeredEvents(), "composite saw owner", 1); + assertChatCount(support.triggeredEvents(), "composite saw support", 1); + assertChatCount(staleOwner.triggeredEvents(), "composite saw owner", 0); + assertEquals(BigInteger.valueOf(10), + staleOwner.document().get("/contracts/checkpoint/lastEvents/owner/timestamp")); + assertEquals(BigInteger.valueOf(1), + staleOwner.document().get("/contracts/checkpoint/lastEvents/support/timestamp")); + assertEquals("support", + staleOwner.document().getAsText("/contracts/checkpoint/lastEvents/inbox/timeline/timelineId")); + assertNull(nodeAt(staleOwner.document(), "/contracts/checkpoint/lastEvents/inbox::owner")); + assertNull(nodeAt(staleOwner.document(), "/contracts/checkpoint/lastEvents/inbox::support")); } @Test diff --git a/src/test/java/blue/coordination/processor/CoordinationProcessorsTest.java b/src/test/java/blue/coordination/processor/CoordinationProcessorsTest.java index 633efe6..9bc924d 100644 --- a/src/test/java/blue/coordination/processor/CoordinationProcessorsTest.java +++ b/src/test/java/blue/coordination/processor/CoordinationProcessorsTest.java @@ -10,7 +10,9 @@ import blue.language.processor.model.MarkerContract; import blue.language.utils.TypeClassResolver; import blue.repo.BlueRepository; +import blue.repo.coordination.AllTimelinesChannel; import blue.repo.coordination.ChatMessage; +import blue.repo.coordination.ChatWorkflowOperation; import blue.repo.coordination.CompositeTimelineChannel; import blue.repo.coordination.Operation; import blue.repo.coordination.OperationRequest; @@ -27,7 +29,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class CoordinationProcessorsTest { @@ -56,18 +57,18 @@ void realRepositoryCoordinationContractsLoadAndInitialize() { Map contracts = contracts(preprocessed); assertEquals(TimelineChannel.blueId(), contracts.get("ownerChannel").getType().getBlueId()); - assertEquals(Operation.blueId(), contracts.get("increment").getType().getBlueId()); assertEquals(SequentialWorkflowOperation.blueId(), - contracts.get("incrementImpl").getType().getBlueId()); + contracts.get("increment").getType().getBlueId()); Object convertedOperation = fixture.blue.nodeToObject(contracts.get("increment"), Object.class); - assertTrue(convertedOperation instanceof Operation); - assertEquals("ownerChannel", ((Operation) convertedOperation).getChannel()); + assertTrue(convertedOperation instanceof SequentialWorkflowOperation); + assertEquals("ownerChannel", ((SequentialWorkflowOperation) convertedOperation).getChannel()); - Object convertedHandler = fixture.blue.nodeToObject(contracts.get("incrementImpl"), Object.class); + Object convertedHandler = fixture.blue.nodeToObject(contracts.get("increment"), Object.class); assertTrue(convertedHandler instanceof SequentialWorkflowOperation); SequentialWorkflowOperation handler = (SequentialWorkflowOperation) convertedHandler; - assertEquals("increment", handler.getOperation()); + assertEquals("ownerChannel", handler.getChannel()); + assertNotNull(handler.getRequest()); assertNotNull(handler.getSteps()); assertTrue(handler.getSteps().isEmpty()); @@ -80,35 +81,44 @@ void realRepositoryCoordinationContractsLoadAndInitialize() { } @Test - void initializationFailsWhenSequentialWorkflowOperationDerivesMissingChannel() { + void sequentialWorkflowOperationWithMissingChannelDoesNotRun() { Fixture fixture = configuredFixture(); TestTimelineProvider.registerWith(fixture.blue); Node document = counterDocument(fixture.repository, "missingChannel"); Node preprocessed = fixture.blue.preprocess(document.clone()); - IllegalStateException ex = assertThrows(IllegalStateException.class, - () -> fixture.blue.initializeDocument(preprocessed)); + DocumentProcessingResult initialized = fixture.blue.initializeDocument(preprocessed); + DocumentProcessingResult processed = fixture.blue.processDocument(initialized.document(), + TestTimelineProvider.timelineEntry(fixture.blue, + fixture.repository, + "owner", + 1, + CoordinationTestResources.operationRequest("increment", new Node().value(7)))); - assertTrue(ex.getMessage().contains("unknown channel")); - assertTrue(ex.getMessage().contains("missingChannel")); + assertFalse(processed.capabilityFailure(), processed.failureReason()); + assertEquals(BigInteger.ZERO, processed.document().getProperties().get("counter").getValue()); } @Test void generatedRepositoryContractsProvideProcessorModelBaseTypes() { + assertTrue(ChannelContract.class.isAssignableFrom(AllTimelinesChannel.class)); assertTrue(ChannelContract.class.isAssignableFrom(TimelineChannel.class)); assertTrue(ChannelContract.class.isAssignableFrom(CompositeTimelineChannel.class)); + assertTrue(HandlerContract.class.isAssignableFrom(ChatWorkflowOperation.class)); assertTrue(HandlerContract.class.isAssignableFrom(SequentialWorkflow.class)); + assertTrue(HandlerContract.class.isAssignableFrom(Operation.class)); assertTrue(HandlerContract.class.isAssignableFrom(SequentialWorkflowOperation.class)); - assertTrue(MarkerContract.class.isAssignableFrom(Operation.class)); } @Test void generatedCoordinationTypesResolveToRepositoryClasses() { TypeClassResolver resolver = BlueRepository.v1_3_0().typeClassResolver(); + assertEquals(AllTimelinesChannel.class, resolver.resolveClass(AllTimelinesChannel.blueId())); assertEquals(TimelineChannel.class, resolver.resolveClass(TimelineChannel.blueId())); assertEquals(CompositeTimelineChannel.class, resolver.resolveClass(CompositeTimelineChannel.blueId())); + assertEquals(ChatWorkflowOperation.class, resolver.resolveClass(ChatWorkflowOperation.blueId())); assertEquals(Operation.class, resolver.resolveClass(Operation.blueId())); assertEquals(SequentialWorkflow.class, resolver.resolveClass(SequentialWorkflow.blueId())); assertEquals(SequentialWorkflowOperation.class, @@ -121,9 +131,12 @@ void generatedCoordinationTypesResolveToRepositoryClasses() { private static void assertCoordinationProcessorsRegistered(DocumentProcessor processor) { ContractProcessorRegistry registry = processor.getContractRegistry(); + assertTrue(registry.lookupChannel(AllTimelinesChannel.blueId()).isPresent()); assertFalse(registry.lookupChannel(TimelineChannel.blueId()).isPresent()); assertTrue(registry.lookupChannel(CompositeTimelineChannel.blueId()).isPresent()); - assertTrue(registry.lookupMarker(Operation.blueId()).isPresent()); + assertFalse(registry.lookupMarker(Operation.blueId()).isPresent()); + assertTrue(registry.lookupHandler(ChatWorkflowOperation.blueId()).isPresent()); + assertTrue(registry.lookupHandler(Operation.blueId()).isPresent()); assertTrue(registry.lookupHandler(SequentialWorkflow.blueId()).isPresent()); assertTrue(registry.lookupHandler(SequentialWorkflowOperation.blueId()).isPresent()); } @@ -141,12 +154,9 @@ private static Node counterDocument(BlueRepository repository, String operationC .type("Coordination/Timeline Channel") .properties("timelineId", new Node().value("owner"))); contracts.put("increment", new Node() - .type("Coordination/Operation") - .properties("channel", new Node().value(operationChannel)) - .properties("request", new Node().type("Integer"))); - contracts.put("incrementImpl", new Node() .type("Coordination/Sequential Workflow Operation") - .properties("operation", new Node().value("increment")) + .properties("channel", new Node().value(operationChannel)) + .properties("request", new Node().type("Integer")) .properties("steps", new Node().items(Collections.emptyList()))); return new Node() diff --git a/src/test/java/blue/coordination/processor/OperationRequestMatchingTest.java b/src/test/java/blue/coordination/processor/OperationRequestMatchingTest.java index 4c1e8fb..1fc480f 100644 --- a/src/test/java/blue/coordination/processor/OperationRequestMatchingTest.java +++ b/src/test/java/blue/coordination/processor/OperationRequestMatchingTest.java @@ -20,8 +20,7 @@ void directOperationRequestRunsThroughTriggeredChannel() { Fixture fixture = configuredFixture(); Map contracts = ownerContracts(); contracts.put("triggered", triggeredChannel()); - contracts.put("increment", operation("triggered", integerPattern())); - contracts.put("incrementImpl", sequentialWorkflowOperation("increment", + contracts.put("increment", operation("triggered", integerPattern(), updateDocumentStep("replace", "/counter", directOperationIncrementValue()))); contracts.put("producer", directWorkflow("owner", triggerEventStep(operationRequestEventNode("increment", new Node().value(7))))); @@ -36,8 +35,7 @@ void directOperationRequestRunsThroughTriggeredChannel() { void timelineEntryOperationRequestStillRuns() { Fixture fixture = configuredFixture(); Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation("owner", integerPattern()), - sequentialWorkflowOperation("increment", + operation("owner", integerPattern(), updateDocumentStep("replace", "/counter", timelineIncrementValue())))); Node processed = processOperationRequest(fixture, initialized, "owner", 1, "increment", new Node().value(7)); @@ -45,17 +43,61 @@ void timelineEntryOperationRequestStillRuns() { assertCounter(processed, 7); } + @Test + void directSequentialWorkflowOperationDeclaresChannelRequestAndSteps() { + Fixture fixture = configuredFixture(); + Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, + operation("owner", integerPattern(), + updateDocumentStep("replace", "/counter", timelineIncrementValue())))); + + Node processed = processOperationRequest(fixture, initialized, "owner", 1, "increment", new Node().value(7)); + + assertCounter(processed, 7); + } + + @Test + void operationDeclarationCanCoexistWithConcreteSequentialWorkflowOperation() { + Fixture fixture = configuredFixture(); + Map contracts = ownerContracts(); + contracts.put("incrementShape", operationDeclaration("owner", integerPattern())); + contracts.put("increment", operation("owner", integerPattern(), + updateDocumentStep("replace", "/counter", timelineIncrementValue()))); + Node initialized = initializedDocument(fixture, document(fixture.repository, 0, contracts)); + + Node processed = processOperationRequest(fixture, initialized, "owner", 1, "increment", new Node().value(7)); + + assertCounter(processed, 7); + } + + @Test + void operationDeclarationCanBeSpecializedBeforeConcreteSequentialWorkflowOperation() { + Fixture fixture = configuredFixture(); + Map contracts = ownerContracts(); + contracts.put("incrementShape", operationDeclaration("owner", null)); + contracts.put("incrementAmountShape", operationDeclaration("owner", objectAmountPattern())); + contracts.put("increment", operation("owner", objectAmountPattern(), + updateDocumentStep("replace", "/counter", timelineAmountIncrementValue()))); + Node initialized = initializedDocument(fixture, document(fixture.repository, 0, contracts)); + + Node accepted = processOperationRequest(fixture, initialized, "owner", 1, "increment", + new Node().properties("amount", new Node().value(7))); + Node rejected = processOperationRequest(fixture, accepted, "owner", 2, "increment", + new Node().properties("ignored", new Node().value(7))); + + assertCounter(accepted, 7); + assertCounter(rejected, 7); + } + @Test void sequentialWorkflowOperationEventPatternAllowsMatchingEvent() { Fixture fixture = configuredFixture(); - Node workflow = sequentialWorkflowOperation("increment", + Node workflow = operation("owner", integerPattern(), updateDocumentStep("replace", "/counter", timelineIncrementValue())); workflow.properties("event", new Node() .type("Coordination/Timeline Entry") .properties("source", new Node() .properties("value", new Node().value("web")))); Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation("owner", integerPattern()), workflow)); Node processed = processOperationRequest(fixture, initialized, "owner", 1, "increment", new Node().value(7), "web"); @@ -66,14 +108,13 @@ void sequentialWorkflowOperationEventPatternAllowsMatchingEvent() { @Test void sequentialWorkflowOperationEventPatternRejectsDifferentEvent() { Fixture fixture = configuredFixture(); - Node workflow = sequentialWorkflowOperation("increment", + Node workflow = operation("owner", integerPattern(), updateDocumentStep("replace", "/counter", timelineIncrementValue())); workflow.properties("event", new Node() .type("Coordination/Timeline Entry") .properties("source", new Node() .properties("value", new Node().value("web")))); Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation("owner", integerPattern()), workflow)); Node processed = processOperationRequest(fixture, initialized, "owner", 1, "increment", new Node().value(7), "api"); @@ -82,11 +123,10 @@ void sequentialWorkflowOperationEventPatternRejectsDifferentEvent() { } @Test - void operationHandlerCanDeriveChannelFromOperation() { + void sequentialWorkflowOperationUsesDeclaredChannel() { Fixture fixture = configuredFixture(); Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation("owner", integerPattern()), - sequentialWorkflowOperation("increment", + operation("owner", integerPattern(), updateDocumentStep("replace", "/counter", timelineIncrementValue())))); Node processed = processOperationRequest(fixture, initialized, "owner", 1, "increment", new Node().value(7)); @@ -95,45 +135,12 @@ void operationHandlerCanDeriveChannelFromOperation() { } @Test - void operationHandlerCanDeclareSameChannelAsOperation() { - Fixture fixture = configuredFixture(); - Node workflow = sequentialWorkflowOperation("increment", - updateDocumentStep("replace", "/counter", timelineIncrementValue())); - workflow.properties("channel", new Node().value("owner")); - Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation("owner", integerPattern()), - workflow)); - - Node processed = processOperationRequest(fixture, initialized, "owner", 1, "increment", new Node().value(7)); - - assertCounter(processed, 7); - } - - @Test - void operationWithoutChannelCanUseExplicitHandlerChannel() { - Fixture fixture = configuredFixture(); - Node workflow = sequentialWorkflowOperation("increment", - updateDocumentStep("replace", "/counter", timelineIncrementValue())); - workflow.properties("channel", new Node().value("owner")); - Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation(null, integerPattern()), - workflow)); - - Node processed = processOperationRequest(fixture, initialized, "owner", 1, "increment", new Node().value(7)); - - assertCounter(processed, 7); - } - - @Test - void conflictingOperationAndHandlerChannelsDoNotRun() { + void operationRequestMustArriveThroughDeclaredChannel() { Fixture fixture = configuredFixture(); Map contracts = ownerContracts(); contracts.put("other", timelineChannel("other")); - contracts.put("increment", operation("owner", integerPattern())); - Node workflow = sequentialWorkflowOperation("increment", - updateDocumentStep("replace", "/counter", timelineIncrementValue())); - workflow.properties("channel", new Node().value("other")); - contracts.put("incrementImpl", workflow); + contracts.put("increment", operation("owner", integerPattern(), + updateDocumentStep("replace", "/counter", timelineIncrementValue()))); Node initialized = initializedDocument(fixture, document(fixture.repository, 0, contracts)); Node processed = processOperationRequest(fixture, initialized, "other", 1, "increment", new Node().value(7)); @@ -145,8 +152,7 @@ void conflictingOperationAndHandlerChannelsDoNotRun() { void integerRequestPatternAcceptsIntegerAndRejectsText() { Fixture fixture = configuredFixture(); Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation("owner", integerPattern()), - sequentialWorkflowOperation("increment", + operation("owner", integerPattern(), updateDocumentStep("replace", "/counter", timelineIncrementValue())))); Node afterInteger = processOperationRequest(fixture, initialized, "owner", 1, "increment", new Node().value(7)); @@ -160,8 +166,7 @@ void integerRequestPatternAcceptsIntegerAndRejectsText() { void objectRequestPatternAcceptsRequiredNestedProperty() { Fixture fixture = configuredFixture(); Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation("owner", objectAmountPattern()), - sequentialWorkflowOperation("increment", + operation("owner", objectAmountPattern(), updateDocumentStep("replace", "/counter", timelineAmountIncrementValue())))); Node processed = processOperationRequest(fixture, initialized, "owner", 1, "increment", @@ -174,8 +179,7 @@ void objectRequestPatternAcceptsRequiredNestedProperty() { void objectRequestPatternRejectsMissingRequiredNestedProperty() { Fixture fixture = configuredFixture(); Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation("owner", objectAmountPattern()), - sequentialWorkflowOperation("increment", + operation("owner", objectAmountPattern(), updateDocumentStep("replace", "/counter", timelineAmountIncrementValue())))); Node processed = processOperationRequest(fixture, initialized, "owner", 1, "increment", @@ -188,8 +192,7 @@ void objectRequestPatternRejectsMissingRequiredNestedProperty() { void requestPatternIgnoresIrrelevantLargePayloadBranches() { Fixture fixture = configuredFixture(); Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation("owner", objectAmountPattern()), - sequentialWorkflowOperation("increment", + operation("owner", objectAmountPattern(), updateDocumentStep("replace", "/counter", timelineAmountIncrementValue())))); Node irrelevant = new Node().properties("nested", largePayloadBranch()); Node request = new Node() @@ -207,8 +210,7 @@ void requestPatternIgnoresIrrelevantLargePayloadBranches() { void pinnedMatchingInitialDocumentRunsWhenNewerVersionIsNotAllowed() { Fixture fixture = configuredFixture(); Node original = timelineCounterDocument(fixture.repository, - operation("owner", integerPattern()), - sequentialWorkflowOperation("increment", + operation("owner", integerPattern(), updateDocumentStep("replace", "/counter", timelineIncrementValue()))); Node initialized = initializedDocument(fixture, original); Node pinned = new Node().blueId((String) initialized.get("/contracts/initialized/documentId")); @@ -225,8 +227,7 @@ void pinnedMatchingInitialDocumentRunsWhenNewerVersionIsNotAllowed() { void pinnedStaleDocumentDoesNotRunWhenNewerVersionIsNotAllowed() { Fixture fixture = configuredFixture(); Node original = timelineCounterDocument(fixture.repository, - operation("owner", integerPattern()), - sequentialWorkflowOperation("increment", + operation("owner", integerPattern(), updateDocumentStep("replace", "/counter", timelineIncrementValue()))); Node initialized = initializedDocument(fixture, original); Node stale = new Node().blueId("2vz831ZwzhpUefTb5XkodBRANKpFMbj1F4CN33kf38Hw"); @@ -243,8 +244,7 @@ void pinnedStaleDocumentDoesNotRunWhenNewerVersionIsNotAllowed() { void allowNewerVersionTrueRunsWithStalePinnedDocument() { Fixture fixture = configuredFixture(); Node original = timelineCounterDocument(fixture.repository, - operation("owner", integerPattern()), - sequentialWorkflowOperation("increment", + operation("owner", integerPattern(), updateDocumentStep("replace", "/counter", timelineIncrementValue()))); Node initialized = initializedDocument(fixture, original); Node stale = new Node().blueId("2vz831ZwzhpUefTb5XkodBRANKpFMbj1F4CN33kf38Hw"); @@ -261,8 +261,7 @@ void allowNewerVersionTrueRunsWithStalePinnedDocument() { void missingPinnedDocumentRunsWhenNewerVersionIsNotAllowed() { Fixture fixture = configuredFixture(); Node initialized = initializedDocument(fixture, timelineCounterDocument(fixture.repository, - operation("owner", integerPattern()), - sequentialWorkflowOperation("increment", + operation("owner", integerPattern(), updateDocumentStep("replace", "/counter", timelineIncrementValue())))); Node processed = processOperationRequest(fixture, initialized, "owner", 1, @@ -272,14 +271,13 @@ void missingPinnedDocumentRunsWhenNewerVersionIsNotAllowed() { assertCounter(processed, 7); } - private static Node timelineCounterDocument(BlueRepository repository, Node operation, Node handler) { - return timelineCounterDocument(repository, 0, operation, handler); + private static Node timelineCounterDocument(BlueRepository repository, Node operation) { + return timelineCounterDocument(repository, 0, operation); } - private static Node timelineCounterDocument(BlueRepository repository, int counter, Node operation, Node handler) { + private static Node timelineCounterDocument(BlueRepository repository, int counter, Node operation) { Map contracts = ownerContracts(); contracts.put("increment", operation); - contracts.put("incrementImpl", handler); return document(repository, counter, contracts); } @@ -297,16 +295,29 @@ private static Node triggeredChannel() { return new Node().type("Triggered Event Channel"); } - private static Node operation(String channel, Node requestPattern) { + private static Node operation(String channel, Node requestPattern, Node... steps) { Node operation = new Node() - .type("Coordination/Operation") - .properties("request", requestPattern); + .type("Coordination/Sequential Workflow Operation") + .properties("request", requestPattern) + .properties("steps", new Node().items(steps)); if (channel != null) { operation.properties("channel", new Node().value(channel)); } return operation; } + private static Node operationDeclaration(String channel, Node requestPattern) { + Node operation = new Node() + .type("Coordination/Operation"); + if (channel != null) { + operation.properties("channel", new Node().value(channel)); + } + if (requestPattern != null) { + operation.properties("request", requestPattern); + } + return operation; + } + private static Node integerPattern() { return new Node().type("Integer"); } @@ -318,13 +329,6 @@ private static Node objectAmountPattern() { .schema(new Schema().required(new Node().value(true)))); } - private static Node sequentialWorkflowOperation(String operation, Node... steps) { - return new Node() - .type("Coordination/Sequential Workflow Operation") - .properties("operation", new Node().value(operation)) - .properties("steps", new Node().items(steps)); - } - private static Node directWorkflow(String channel, Node... steps) { return new Node() .type("Coordination/Sequential Workflow") diff --git a/src/test/java/blue/coordination/processor/RepositoryStyleCounterDocumentTest.java b/src/test/java/blue/coordination/processor/RepositoryStyleCounterDocumentTest.java index 67943d1..3243639 100644 --- a/src/test/java/blue/coordination/processor/RepositoryStyleCounterDocumentTest.java +++ b/src/test/java/blue/coordination/processor/RepositoryStyleCounterDocumentTest.java @@ -88,7 +88,7 @@ private static String richCounterDocumentYaml() { " value: " + TIMELINE_ID, " increment:", " description: Increment the counter by the given number", - " type: Coordination/Operation", + " type: Coordination/Sequential Workflow Operation", " order:", " description: Deterministic sort key within a scope; missing == 0.", " type: Integer", @@ -99,14 +99,6 @@ private static String richCounterDocumentYaml() { " request:", " description: Represents a value by which counter will be incremented", " type: Integer", - " incrementImpl:", - " type: Coordination/Sequential Workflow Operation", - " order:", - " description: Deterministic sort key within a scope; missing == 0.", - " type: Integer", - " channel:", - " description: The contracts-map key of the channel this handler binds to (same scope).", - " type: Text", " event:", " description: Optional matcher payload used by the handler's processor to further restrict events.", " steps:", @@ -144,13 +136,9 @@ private static String richCounterDocumentYaml() { " - $text:", " $document: /counter", " - $return: {}", - " operation:", - " description: The name of the Operation this handler implements. Must reference an Operation defined in the same scope.", - " type: Text", - " value: increment", " decrement:", " description: Decrement the counter by the given number", - " type: Coordination/Operation", + " type: Coordination/Sequential Workflow Operation", " order:", " description: Deterministic sort key within a scope; missing == 0.", " type: Integer", @@ -161,14 +149,6 @@ private static String richCounterDocumentYaml() { " request:", " description: Value to subtract", " type: Integer", - " decrementImpl:", - " type: Coordination/Sequential Workflow Operation", - " order:", - " description: Deterministic sort key within a scope; missing == 0.", - " type: Integer", - " channel:", - " description: The contracts-map key of the channel this handler binds to (same scope).", - " type: Text", " event:", " description: Optional matcher payload used by the handler's processor to further restrict events.", " steps:", @@ -205,11 +185,7 @@ private static String richCounterDocumentYaml() { " - \" and is now \"", " - $text:", " $document: /counter", - " - $return: {}", - " operation:", - " description: The name of the Operation this handler implements. Must reference an Operation defined in the same scope.", - " type: Text", - " value: decrement"); + " - $return: {}"); } private static Node operationRequest(String operation, int request) { diff --git a/src/test/java/blue/coordination/processor/RuntimeChannelsTest.java b/src/test/java/blue/coordination/processor/RuntimeChannelsTest.java index db7c4be..d4238fd 100644 --- a/src/test/java/blue/coordination/processor/RuntimeChannelsTest.java +++ b/src/test/java/blue/coordination/processor/RuntimeChannelsTest.java @@ -229,8 +229,7 @@ void multipleCheckpointMarkersInOneScopeFail() { private static Node embeddedOperationDocument(BlueRepository repository) { Map childContracts = ownerChannelContracts(); - childContracts.put("increment", operation("owner")); - childContracts.put("incrementImpl", sequentialWorkflowOperation("increment", + childContracts.put("increment", sequentialWorkflowOperation("owner", computePatchStep("replace", "/counter", bexAdd(bexBinding("event", "/message/request"), bexDocument("/counter"))))); @@ -289,17 +288,11 @@ private static Node processEmbedded(String path) { .properties("paths", new Node().items(new Node().value(path))); } - private static Node operation(String channel) { - return new Node() - .type("Coordination/Operation") - .properties("channel", new Node().value(channel)) - .properties("request", new Node().type("Integer")); - } - - private static Node sequentialWorkflowOperation(String operation, Node... steps) { + private static Node sequentialWorkflowOperation(String channel, Node... steps) { return new Node() .type("Coordination/Sequential Workflow Operation") - .properties("operation", new Node().value(operation)) + .properties("channel", new Node().value(channel)) + .properties("request", new Node().type("Integer")) .properties("steps", new Node().items(steps)); } diff --git a/src/test/java/blue/coordination/processor/SequentialWorkflowExecutionTest.java b/src/test/java/blue/coordination/processor/SequentialWorkflowExecutionTest.java index 6cc54aa..6f75b10 100644 --- a/src/test/java/blue/coordination/processor/SequentialWorkflowExecutionTest.java +++ b/src/test/java/blue/coordination/processor/SequentialWorkflowExecutionTest.java @@ -373,12 +373,10 @@ private static DocumentProcessingResult processChat(Fixture fixture, private static Node counterDocument(BlueRepository repository, int counter, boolean includeDecrement) { Map contracts = baseOperationContracts(); - contracts.put("increment", operation("ownerChannel")); - contracts.put("incrementImpl", sequentialWorkflowOperation("increment", + contracts.put("increment", sequentialWorkflowOperation("ownerChannel", computeReplaceCounterStep(incrementValue()))); if (includeDecrement) { - contracts.put("decrement", operation("ownerChannel")); - contracts.put("decrementImpl", sequentialWorkflowOperation("decrement", + contracts.put("decrement", sequentialWorkflowOperation("ownerChannel", computeReplaceCounterStep(decrementValue()))); } return document(repository, counter, contracts); @@ -386,8 +384,7 @@ private static Node counterDocument(BlueRepository repository, int counter, bool private static Node counterWorkflowDocument(BlueRepository repository, int counter, Node... steps) { Map contracts = baseOperationContracts(); - contracts.put("increment", operation("ownerChannel")); - contracts.put("incrementImpl", sequentialWorkflowOperation("increment", steps)); + contracts.put("increment", sequentialWorkflowOperation("ownerChannel", steps)); return document(repository, counter, contracts); } @@ -397,16 +394,14 @@ private static Node staticUpdateDocument(BlueRepository repository, int counter, private static Node staticUpdateDocument(BlueRepository repository, Node counter, Node value) { Map contracts = baseOperationContracts(); - contracts.put("increment", operation("ownerChannel")); - contracts.put("incrementImpl", sequentialWorkflowOperation("increment", + contracts.put("increment", sequentialWorkflowOperation("ownerChannel", updateDocumentStep("replace", "/counter", value))); return document(repository, counter, contracts); } private static Node doubleIncrementDocument(BlueRepository repository) { Map contracts = baseOperationContracts(); - contracts.put("increment", operation("ownerChannel")); - contracts.put("incrementImpl", sequentialWorkflowOperation("increment", + contracts.put("increment", sequentialWorkflowOperation("ownerChannel", computeReplaceCounterStep(incrementValue()), computeReplaceCounterStep(incrementValue()))); return document(repository, 0, contracts); @@ -468,8 +463,7 @@ private static Node stepResultsDocument(BlueRepository repository) { private static Node embeddedScopeDocument(BlueRepository repository) { Map childContracts = baseOperationContracts(); - childContracts.put("increment", operation("ownerChannel")); - childContracts.put("incrementImpl", sequentialWorkflowOperation("increment", + childContracts.put("increment", sequentialWorkflowOperation("ownerChannel", computeReplaceCounterStep(incrementValue()))); Map rootContracts = new LinkedHashMap<>(); @@ -494,17 +488,11 @@ private static Map baseOperationContracts() { return contracts; } - private static Node operation(String channel) { - return new Node() - .type("Coordination/Operation") - .properties("channel", new Node().value(channel)) - .properties("request", new Node().type("Integer")); - } - - private static Node sequentialWorkflowOperation(String operation, Node... steps) { + private static Node sequentialWorkflowOperation(String channel, Node... steps) { return new Node() .type("Coordination/Sequential Workflow Operation") - .properties("operation", new Node().value(operation)) + .properties("channel", new Node().value(channel)) + .properties("request", new Node().type("Integer")) .properties("steps", new Node().items(steps)); } diff --git a/src/test/java/blue/coordination/processor/compute/ComputeWorkflowExecutionTest.java b/src/test/java/blue/coordination/processor/compute/ComputeWorkflowExecutionTest.java index 79873f2..3d48c6e 100644 --- a/src/test/java/blue/coordination/processor/compute/ComputeWorkflowExecutionTest.java +++ b/src/test/java/blue/coordination/processor/compute/ComputeWorkflowExecutionTest.java @@ -355,11 +355,7 @@ void currentContractChannelBindingPreservesAuthoredChannel() { "contracts:", CoordinationTestResources.simpleTimelineChannelYaml("manualChannel", "owner", 2), " run:", - " type: Coordination/Operation", - " channel: manualChannel", - " runImpl:", " type: Coordination/Sequential Workflow Operation", - " operation: run", " channel: manualChannel", " steps:", " - name: Build", diff --git a/src/test/java/blue/coordination/processor/compute/ComputeWorkflowTestSupport.java b/src/test/java/blue/coordination/processor/compute/ComputeWorkflowTestSupport.java index 2c20be4..bb0b7bb 100644 --- a/src/test/java/blue/coordination/processor/compute/ComputeWorkflowTestSupport.java +++ b/src/test/java/blue/coordination/processor/compute/ComputeWorkflowTestSupport.java @@ -86,11 +86,8 @@ String operationWorkflowDocumentWithStatus(String rootFields, String body) { "contracts:", CoordinationTestResources.simpleTimelineChannelYaml("ownerChannel", "owner", 2), " run:", - " type: Coordination/Operation", - " channel: ownerChannel", - " runImpl:", " type: Coordination/Sequential Workflow Operation", - " operation: run", + " channel: ownerChannel", body); } @@ -101,13 +98,10 @@ String operationWorkflowDocumentWithContracts(String extraContracts, String body "contracts:", CoordinationTestResources.simpleTimelineChannelYaml("ownerChannel", "owner", 2), " run:", - " type: Coordination/Operation", - " channel: ownerChannel", - extraContracts, - " runImpl:", " type: Coordination/Sequential Workflow Operation", - " operation: run", - body); + " channel: ownerChannel", + body, + extraContracts); } Node initializedOperationWorkflow(String body) { diff --git a/src/test/java/blue/coordination/processor/compute/CustomerPaynoteLatestBexFixtureTest.java b/src/test/java/blue/coordination/processor/compute/CustomerPaynoteLatestBexFixtureTest.java index e691565..a754419 100644 --- a/src/test/java/blue/coordination/processor/compute/CustomerPaynoteLatestBexFixtureTest.java +++ b/src/test/java/blue/coordination/processor/compute/CustomerPaynoteLatestBexFixtureTest.java @@ -218,14 +218,12 @@ private static void retainAdminUpdateContracts(Node document) { Map all = contracts.getProperties(); Node channel = all.get("sampleAdminChannel"); Node operation = all.get("sampleAdminUpdate"); - Node implementation = all.get("sampleAdminUpdateImpl"); operation.getProperties().remove("request"); - implementation.getProperties().remove("event"); - implementation.properties("channel", new Node().value("sampleAdminChannel")); + operation.getProperties().remove("event"); + operation.properties("channel", new Node().value("sampleAdminChannel")); all.clear(); all.put("sampleAdminChannel", channel); all.put("sampleAdminUpdate", operation); - all.put("sampleAdminUpdateImpl", implementation); } private static final class Fixture { diff --git a/src/test/java/blue/coordination/processor/compute/DynamicEmbeddedParticipantsWorkflowTest.java b/src/test/java/blue/coordination/processor/compute/DynamicEmbeddedParticipantsWorkflowTest.java index 0a35755..2f3f384 100644 --- a/src/test/java/blue/coordination/processor/compute/DynamicEmbeddedParticipantsWorkflowTest.java +++ b/src/test/java/blue/coordination/processor/compute/DynamicEmbeddedParticipantsWorkflowTest.java @@ -113,7 +113,7 @@ private static void assertEmbeddedParticipant(Node document, int number) { assertEquals("embedded-" + number, document.get(prefix + "/contracts/participantChannel/timelineId")); assertNotNull(document.getAsNode(prefix + "/contracts/say")); - assertNotNull(document.getAsNode(prefix + "/contracts/sayImpl")); + assertNotNull(document.getAsNode(prefix + "/contracts/say")); } private static Node operationEvent(ComputeWorkflowTestSupport support, diff --git a/src/test/java/blue/coordination/processor/compute/OfferPaynoteEmbeddedOrdersWorkflowTest.java b/src/test/java/blue/coordination/processor/compute/OfferPaynoteEmbeddedOrdersWorkflowTest.java index 4df2b5c..9948090 100644 --- a/src/test/java/blue/coordination/processor/compute/OfferPaynoteEmbeddedOrdersWorkflowTest.java +++ b/src/test/java/blue/coordination/processor/compute/OfferPaynoteEmbeddedOrdersWorkflowTest.java @@ -376,11 +376,8 @@ private static Node packagePaynote(ComputeWorkflowTestSupport support) { " type: Coordination/Timeline Channel", " timelineId: card-processor", " confirmAuthorization:", - " type: Coordination/Operation", - " channel: cardProcessorChannel", - " confirmAuthorizationImpl:", " type: Coordination/Sequential Workflow Operation", - " operation: confirmAuthorization", + " channel: cardProcessorChannel", " steps:", " - name: BuildAuthorizationPatch", " type: Coordination/Compute", @@ -409,7 +406,7 @@ private static Node packagePaynote(ComputeWorkflowTestSupport support) { " events:", " $events: true", " provideRestaurantOrder:", - " type: Coordination/Operation", + " type: Coordination/Sequential Workflow Operation", " channel: travelAgencyChannel", " request:", " name: Restaurant Order", @@ -425,11 +422,6 @@ private static Node packagePaynote(ComputeWorkflowTestSupport support) { " timelineId: restaurant", " confirm:", " channel: restaurantChannel", - " confirmImpl:", - " operation: confirm", - " provideRestaurantOrderImpl:", - " type: Coordination/Sequential Workflow Operation", - " operation: provideRestaurantOrder", " steps:", " - name: BuildRestaurantOrderPatch", " type: Coordination/Compute", @@ -467,7 +459,7 @@ private static Node packagePaynote(ComputeWorkflowTestSupport support) { " events:", " $events: true", " provideHotelOrder:", - " type: Coordination/Operation", + " type: Coordination/Sequential Workflow Operation", " channel: travelAgencyChannel", " request:", " name: Hotel Order", @@ -484,11 +476,6 @@ private static Node packagePaynote(ComputeWorkflowTestSupport support) { " timelineId: hotel", " confirm:", " channel: hotelChannel", - " confirmImpl:", - " operation: confirm", - " provideHotelOrderImpl:", - " type: Coordination/Sequential Workflow Operation", - " operation: provideHotelOrder", " steps:", " - name: BuildHotelOrderPatch", " type: Coordination/Compute", @@ -611,11 +598,8 @@ private static Node packagePaynote(ComputeWorkflowTestSupport support) { " events:", " $events: true", " confirmCapture:", - " type: Coordination/Operation", - " channel: cardProcessorChannel", - " confirmCaptureImpl:", " type: Coordination/Sequential Workflow Operation", - " operation: confirmCapture", + " channel: cardProcessorChannel", " steps:", " - name: BuildCapturePatch", " type: Coordination/Compute", @@ -663,11 +647,8 @@ private static Node restaurantOrder(ComputeWorkflowTestSupport support) { " type: Coordination/Timeline Channel", " timelineId: restaurant", " confirm:", - " type: Coordination/Operation", - " channel: restaurantChannel", - " confirmImpl:", " type: Coordination/Sequential Workflow Operation", - " operation: confirm", + " channel: restaurantChannel", " steps:", " - name: BuildConfirmation", " type: Coordination/Compute", @@ -710,11 +691,8 @@ private static Node hotelOrder(ComputeWorkflowTestSupport support) { " type: Coordination/Timeline Channel", " timelineId: hotel", " confirm:", - " type: Coordination/Operation", - " channel: hotelChannel", - " confirmImpl:", " type: Coordination/Sequential Workflow Operation", - " operation: confirm", + " channel: hotelChannel", " steps:", " - name: BuildConfirmation", " type: Coordination/Compute", diff --git a/src/test/resources/coordination/compute/bex-counter-persistence.yaml b/src/test/resources/coordination/compute/bex-counter-persistence.yaml index a37de23..76b6eaf 100644 --- a/src/test/resources/coordination/compute/bex-counter-persistence.yaml +++ b/src/test/resources/coordination/compute/bex-counter-persistence.yaml @@ -5,11 +5,8 @@ contracts: type: Coordination/Timeline Channel timelineId: owner increment: - type: Coordination/Operation - channel: ownerChannel - incrementImpl: type: Coordination/Sequential Workflow Operation - operation: increment + channel: ownerChannel steps: - name: BuildPatch type: Coordination/Compute diff --git a/src/test/resources/coordination/compute/dynamic-embedded-participants-bex.yaml b/src/test/resources/coordination/compute/dynamic-embedded-participants-bex.yaml index 19a1d00..27ae9fb 100644 --- a/src/test/resources/coordination/compute/dynamic-embedded-participants-bex.yaml +++ b/src/test/resources/coordination/compute/dynamic-embedded-participants-bex.yaml @@ -15,11 +15,8 @@ embeddedTemplate: type: Coordination/Timeline Channel timelineId: embedded say: - type: Coordination/Operation - channel: participantChannel - sayImpl: type: Coordination/Sequential Workflow Operation - operation: say + channel: participantChannel steps: - type: Coordination/Trigger Event event: @@ -66,11 +63,8 @@ contracts: type: Coordination/Timeline Channel timelineId: bob createEmbedded: - type: Coordination/Operation - channel: aliceChannel - createEmbeddedImpl: type: Coordination/Sequential Workflow Operation - operation: createEmbedded + channel: aliceChannel steps: - name: BuildEmbedded type: Coordination/Compute @@ -133,7 +127,7 @@ contracts: $concat: - /embedded_ - $var: suffix - - /contracts/sayImpl/steps/0/event/message + - /contracts/say/steps/0/event/message val: $concat: - Chat from embedded @@ -249,11 +243,8 @@ contracts: events: $events: true checkChatCount: - type: Coordination/Operation - channel: bobChannel - checkChatCountImpl: type: Coordination/Sequential Workflow Operation - operation: checkChatCount + channel: bobChannel steps: - name: BuildCheck type: Coordination/Compute diff --git a/src/test/resources/coordination/compute/ed25519-hotel-access.yaml b/src/test/resources/coordination/compute/ed25519-hotel-access.yaml index 6b3a9d7..6e3bbfb 100644 --- a/src/test/resources/coordination/compute/ed25519-hotel-access.yaml +++ b/src/test/resources/coordination/compute/ed25519-hotel-access.yaml @@ -7,11 +7,8 @@ contracts: type: Coordination/Timeline Channel timelineId: hotel checkIn: - type: Coordination/Operation - channel: hotelChannel - checkInImpl: type: Coordination/Sequential Workflow Operation - operation: checkIn + channel: hotelChannel steps: - name: VerifyCheckIn type: Coordination/Compute diff --git a/src/test/resources/coordination/compute/ed25519-threshold-approval.yaml b/src/test/resources/coordination/compute/ed25519-threshold-approval.yaml index ec31340..79d17fc 100644 --- a/src/test/resources/coordination/compute/ed25519-threshold-approval.yaml +++ b/src/test/resources/coordination/compute/ed25519-threshold-approval.yaml @@ -19,11 +19,8 @@ contracts: type: Coordination/Timeline Channel timelineId: admin approveAction: - type: Coordination/Operation - channel: adminChannel - approveActionImpl: type: Coordination/Sequential Workflow Operation - operation: approveAction + channel: adminChannel steps: - name: VerifyApproval type: Coordination/Compute diff --git a/src/test/resources/coordination/compute/offer-paynote-embedded-orders-bex.yaml b/src/test/resources/coordination/compute/offer-paynote-embedded-orders-bex.yaml index bf8b7bf..18c493a 100644 --- a/src/test/resources/coordination/compute/offer-paynote-embedded-orders-bex.yaml +++ b/src/test/resources/coordination/compute/offer-paynote-embedded-orders-bex.yaml @@ -39,7 +39,7 @@ contracts: # - A request for another package id does not match this operation. # - A second PayNote reaches the workflow and fails because /paynote is already present. deliverPaynote: - type: Coordination/Operation + type: Coordination/Sequential Workflow Operation channel: packageParticipants request: name: Package PayNote @@ -65,16 +65,10 @@ contracts: timelineId: card-processor confirmAuthorization: channel: cardProcessorChannel - confirmAuthorizationImpl: - operation: confirmAuthorization provideRestaurantOrder: channel: travelAgencyChannel - provideRestaurantOrderImpl: - operation: provideRestaurantOrder provideHotelOrder: channel: travelAgencyChannel - provideHotelOrderImpl: - operation: provideHotelOrder restaurantOrderEvents: childPath: /restaurantOrder hotelOrderEvents: @@ -85,11 +79,6 @@ contracts: channel: hotelOrderEvents confirmCapture: channel: cardProcessorChannel - confirmCaptureImpl: - operation: confirmCapture - deliverPaynoteImpl: - type: Coordination/Sequential Workflow Operation - operation: deliverPaynote steps: - name: BuildDeliverPaynotePatch type: Coordination/Compute diff --git a/src/test/resources/coordination/counter-bex.yaml b/src/test/resources/coordination/counter-bex.yaml index 88a6506..4a6b2c1 100644 --- a/src/test/resources/coordination/counter-bex.yaml +++ b/src/test/resources/coordination/counter-bex.yaml @@ -6,14 +6,11 @@ contracts: timelineId: counter-timeline increment: description: Increment the counter by the given number - type: Coordination/Operation + type: Coordination/Sequential Workflow Operation channel: ownerChannel request: description: Represents a value by which counter will be incremented type: Integer - incrementImpl: - type: Coordination/Sequential Workflow Operation - operation: increment steps: - name: IncrementAndEmit type: Coordination/Compute diff --git a/src/test/resources/processor-delay/customer-paynote-snapshot.document.compute.latest-bex.yaml b/src/test/resources/processor-delay/customer-paynote-snapshot.document.compute.latest-bex.yaml index 541a10f..6a49ce3 100644 --- a/src/test/resources/processor-delay/customer-paynote-snapshot.document.compute.latest-bex.yaml +++ b/src/test/resources/processor-delay/customer-paynote-snapshot.document.compute.latest-bex.yaml @@ -22,7 +22,7 @@ }, "sampleAdminUpdate": { "description": "The standard, required operation for Sample Admin to deliver events.", - "type": "Coordination/Operation", + "type": "Coordination/Sequential Workflow Operation", "order": { "description": "Deterministic sort key within a scope; missing ≡ 0.", "type": "Integer" @@ -30,18 +30,6 @@ "channel": "sampleAdminChannel", "request": { "description": "The request schema for this operation (any Blue node). Invocation payloads MUST conform to this shape.\n" - } - }, - "sampleAdminUpdateImpl": { - "description": "Implementation that re-emits the provided events", - "type": "Coordination/Sequential Workflow Operation", - "order": { - "description": "Deterministic sort key within a scope; missing ≡ 0.", - "type": "Integer" - }, - "channel": { - "description": "The contracts-map key of the channel this handler binds to (same scope).", - "type": "Text" }, "event": { "description": "Optional matcher payload used by the handler's processor to further restrict events." @@ -65,8 +53,7 @@ } ] } - ], - "operation": "sampleAdminUpdate" + ] }, "investorChannel": { "type": "Coordination/Timeline Channel", @@ -5298,7 +5285,7 @@ ] }, "attachComponentOrder": { - "type": "Coordination/Operation", + "type": "Coordination/Sequential Workflow Operation", "description": "Attaches an included merchant order snapshot so package payment can complete after both confirmations.", "channel": "payeeChannel", "request": { @@ -5308,11 +5295,7 @@ "initialSnapshot": { "type": "Common/Record" } - } - }, - "attachComponentOrderImpl": { - "type": "Coordination/Sequential Workflow Operation", - "operation": "attachComponentOrder", + }, "steps": [ { "name": "BuildComponentAttachment", diff --git a/src/test/resources/processor-delay/customer-paynote-snapshot.event.yaml b/src/test/resources/processor-delay/customer-paynote-snapshot.event.yaml index 5d89da1..1621989 100644 --- a/src/test/resources/processor-delay/customer-paynote-snapshot.event.yaml +++ b/src/test/resources/processor-delay/customer-paynote-snapshot.event.yaml @@ -71,7 +71,7 @@ message: value: "zzimVxhnKLL5SwMkS9kmF8p5g7pyxPWBu664HxGbszB" attachComponentOrder: description: "Attaches an included merchant order snapshot so package payment can complete after both confirmations." - type: { blueId: "BoAiqVUZv9Fum3wFqaX2JnQMBHJLxJSo2V9U2UBmCfsC" } + type: { blueId: "39HJEYVHX6RoRhdVx2mpLQ1GQ3RP5CSvDCSiWF6Mhdpq" } channel: type: { blueId: "GX7CFUmSDrE2MzptunLCCdZwnuwwrenRQqEnHL4x3uoC" } value: "payeeChannel" @@ -80,8 +80,6 @@ message: type: { blueId: "GX7CFUmSDrE2MzptunLCCdZwnuwwrenRQqEnHL4x3uoC" } initialSnapshot: type: { blueId: "J18rFf6VX3ADe5gTnqmL4wXtivLkzrRXLPPhnoghnjzB" } - attachComponentOrderImpl: - type: { blueId: "CGdxkNjPcsdescqLPz6SNLsMyak6demQQr7RoKNHbCyv" } steps: items: - name: "BuildComponentAttachment" @@ -95,9 +93,6 @@ message: $binding: name: "steps" path: "/BuildComponentAttachment/changeset" - operation: - type: { blueId: "GX7CFUmSDrE2MzptunLCCdZwnuwwrenRQqEnHL4x3uoC" } - value: "attachComponentOrder" embeddedHotelOrderEvents: type: { blueId: "Fjbu3QpnUaTruDTcTidETCX2N5STyv7KYxT42PCzGHxm" } childPath: diff --git a/src/test/resources/processor-delay/paynote-resale-reduced-bex.yaml b/src/test/resources/processor-delay/paynote-resale-reduced-bex.yaml index 5b67d1e..3afd8ec 100644 --- a/src/test/resources/processor-delay/paynote-resale-reduced-bex.yaml +++ b/src/test/resources/processor-delay/paynote-resale-reduced-bex.yaml @@ -78,14 +78,8 @@ contracts: triggeredEventChannel: type: Triggered Event Channel hotelResaleOrderPlaced: - type: Coordination/Operation - channel: hotelParticipantChannel - restaurantResaleOrderPlaced: - type: Coordination/Operation - channel: restaurantParticipantChannel - hotelResaleOrderPlacedImpl: type: Coordination/Sequential Workflow Operation - operation: hotelResaleOrderPlaced + channel: hotelParticipantChannel steps: - name: ForwardHotelResaleOrderPlaced type: Coordination/Compute @@ -97,9 +91,9 @@ contracts: - $return: events: $events: true - restaurantResaleOrderPlacedImpl: + restaurantResaleOrderPlaced: type: Coordination/Sequential Workflow Operation - operation: restaurantResaleOrderPlaced + channel: restaurantParticipantChannel steps: - name: ForwardRestaurantResaleOrderPlaced type: Coordination/Compute