Skip to content

Commit

Permalink
server: fix metadata values after resume
Browse files Browse the repository at this point in the history
  • Loading branch information
brig committed May 14, 2024
1 parent 577b0ec commit de0a41a
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -546,4 +546,17 @@ public void testRestart() throws Exception {
assertFalse(waitConditions.getIsWaiting());
assertNull(waitConditions.getWaits());
}

@Test
public void metaAfterSuspend() throws Exception {
Payload payload = new Payload()
.archive(resource("metaAfterSuspend"));

ConcordProcess proc = concord.processes().start(payload);
ProcessEntry pe = expectStatus(proc, ProcessEntry.StatusEnum.FAILED);

// ---
Object myMetaValue = pe.getMeta().get("myMetaVar");
assertEquals("myMetaVarValue", myMetaValue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
configuration:
runtime: "concord-v2"
meta:
myMetaVar: "n/a"

flows:
default:
- set:
myMetaVar: "myMetaVarValue"

- task: concord
in:
action: start
payload: payload
sync: true
suspend: true
arguments:
name: "Concord"
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
configuration:
runtime: "concord-v2"

flows:
default:
- throw: "BOOM"
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public ResumePipeline(Injector injector) {
DependencyVersionsExportProcessor.class,
PolicyExportProcessor.class,
StateImportingProcessor.class,
EnqueueingProcessor.class);
ResumeProcessor.class);

this.exceptionProcessor = injector.getInstance(FailProcessor.class);
this.finalizerProcessor = injector.getInstance(CleanupProcessor.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.walmartlabs.concord.server.process.pipelines.processors;

/*-
* *****
* Concord
* -----
* Copyright (C) 2017 - 2023 Walmart Inc.
* -----
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =====
*/

import com.walmartlabs.concord.server.process.Payload;
import com.walmartlabs.concord.server.process.queue.ProcessQueueManager;
import com.walmartlabs.concord.server.sdk.ProcessKey;
import com.walmartlabs.concord.server.sdk.ProcessStatus;
import com.walmartlabs.concord.server.sdk.metrics.WithTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Named;

/**
* Moves the process into ENQUEUED status if current process eq RESUMING.
*/
@Named
public class ResumeProcessor implements PayloadProcessor {

private static final Logger log = LoggerFactory.getLogger(ResumeProcessor.class);

private final ProcessQueueManager queueManager;

@Inject
public ResumeProcessor(ProcessQueueManager queueManager) {
this.queueManager = queueManager;
}

@Override
@WithTimer
public Payload process(Chain chain, Payload payload) {
ProcessKey processKey = payload.getProcessKey();

boolean updated = queueManager.updateExpectedStatus(processKey, ProcessStatus.RESUMING, ProcessStatus.ENQUEUED);
if (updated) {
return chain.process(payload);
}

log.warn("process ['{}'] -> process is not suspended, can't resume", processKey);
throw new InvalidProcessStateException("Process is not suspended, can't resume");
}
}

0 comments on commit de0a41a

Please sign in to comment.