From de0a41a4f828a578506e64ecb911cab7a27c77c8 Mon Sep 17 00:00:00 2001 From: brig Date: Mon, 13 May 2024 21:00:48 -0400 Subject: [PATCH] server: fix metadata values after resume --- .../concord/it/runtime/v2/ProcessIT.java | 13 ++++ .../runtime/v2/metaAfterSuspend/concord.yml | 18 ++++++ .../v2/metaAfterSuspend/payload/concord.yml | 6 ++ .../process/pipelines/ResumePipeline.java | 2 +- .../pipelines/processors/ResumeProcessor.java | 62 +++++++++++++++++++ 5 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 it/runtime-v2/src/test/resources/com/walmartlabs/concord/it/runtime/v2/metaAfterSuspend/concord.yml create mode 100644 it/runtime-v2/src/test/resources/com/walmartlabs/concord/it/runtime/v2/metaAfterSuspend/payload/concord.yml create mode 100644 server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/processors/ResumeProcessor.java diff --git a/it/runtime-v2/src/test/java/com/walmartlabs/concord/it/runtime/v2/ProcessIT.java b/it/runtime-v2/src/test/java/com/walmartlabs/concord/it/runtime/v2/ProcessIT.java index b567e9265b..ad6ad466d0 100644 --- a/it/runtime-v2/src/test/java/com/walmartlabs/concord/it/runtime/v2/ProcessIT.java +++ b/it/runtime-v2/src/test/java/com/walmartlabs/concord/it/runtime/v2/ProcessIT.java @@ -546,4 +546,17 @@ public void testRestart() throws Exception { assertFalse(waitConditions.getIsWaiting()); assertNull(waitConditions.getWaits()); } + + @Test + public void metaAfterSuspend() throws Exception { + Payload payload = new Payload() + .archive(resource("metaAfterSuspend")); + + ConcordProcess proc = concord.processes().start(payload); + ProcessEntry pe = expectStatus(proc, ProcessEntry.StatusEnum.FAILED); + + // --- + Object myMetaValue = pe.getMeta().get("myMetaVar"); + assertEquals("myMetaVarValue", myMetaValue); + } } diff --git a/it/runtime-v2/src/test/resources/com/walmartlabs/concord/it/runtime/v2/metaAfterSuspend/concord.yml b/it/runtime-v2/src/test/resources/com/walmartlabs/concord/it/runtime/v2/metaAfterSuspend/concord.yml new file mode 100644 index 0000000000..bd2ff82861 --- /dev/null +++ b/it/runtime-v2/src/test/resources/com/walmartlabs/concord/it/runtime/v2/metaAfterSuspend/concord.yml @@ -0,0 +1,18 @@ +configuration: + runtime: "concord-v2" + meta: + myMetaVar: "n/a" + +flows: + default: + - set: + myMetaVar: "myMetaVarValue" + + - task: concord + in: + action: start + payload: payload + sync: true + suspend: true + arguments: + name: "Concord" diff --git a/it/runtime-v2/src/test/resources/com/walmartlabs/concord/it/runtime/v2/metaAfterSuspend/payload/concord.yml b/it/runtime-v2/src/test/resources/com/walmartlabs/concord/it/runtime/v2/metaAfterSuspend/payload/concord.yml new file mode 100644 index 0000000000..04d399ebbe --- /dev/null +++ b/it/runtime-v2/src/test/resources/com/walmartlabs/concord/it/runtime/v2/metaAfterSuspend/payload/concord.yml @@ -0,0 +1,6 @@ +configuration: + runtime: "concord-v2" + +flows: + default: + - throw: "BOOM" diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/ResumePipeline.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/ResumePipeline.java index 223610ec6a..ceb406e305 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/ResumePipeline.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/ResumePipeline.java @@ -51,7 +51,7 @@ public ResumePipeline(Injector injector) { DependencyVersionsExportProcessor.class, PolicyExportProcessor.class, StateImportingProcessor.class, - EnqueueingProcessor.class); + ResumeProcessor.class); this.exceptionProcessor = injector.getInstance(FailProcessor.class); this.finalizerProcessor = injector.getInstance(CleanupProcessor.class); diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/processors/ResumeProcessor.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/processors/ResumeProcessor.java new file mode 100644 index 0000000000..e1b3180061 --- /dev/null +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/processors/ResumeProcessor.java @@ -0,0 +1,62 @@ +package com.walmartlabs.concord.server.process.pipelines.processors; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2023 Walmart Inc. + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.walmartlabs.concord.server.process.Payload; +import com.walmartlabs.concord.server.process.queue.ProcessQueueManager; +import com.walmartlabs.concord.server.sdk.ProcessKey; +import com.walmartlabs.concord.server.sdk.ProcessStatus; +import com.walmartlabs.concord.server.sdk.metrics.WithTimer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.inject.Named; + +/** + * Moves the process into ENQUEUED status if current process eq RESUMING. + */ +@Named +public class ResumeProcessor implements PayloadProcessor { + + private static final Logger log = LoggerFactory.getLogger(ResumeProcessor.class); + + private final ProcessQueueManager queueManager; + + @Inject + public ResumeProcessor(ProcessQueueManager queueManager) { + this.queueManager = queueManager; + } + + @Override + @WithTimer + public Payload process(Chain chain, Payload payload) { + ProcessKey processKey = payload.getProcessKey(); + + boolean updated = queueManager.updateExpectedStatus(processKey, ProcessStatus.RESUMING, ProcessStatus.ENQUEUED); + if (updated) { + return chain.process(payload); + } + + log.warn("process ['{}'] -> process is not suspended, can't resume", processKey); + throw new InvalidProcessStateException("Process is not suspended, can't resume"); + } +}