Skip to content

Commit

Permalink
Merge branch 'master' into ib/mask-sensitive-data-in-task-params
Browse files Browse the repository at this point in the history
  • Loading branch information
brig authored Dec 30, 2024
2 parents 11d0e72 + 4376458 commit 68f565f
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -60,7 +60,7 @@ public static ConcordRule configure() {
.useLocalMavenRepository(true)
.extraConfigurationSupplier(() -> """
concord-agent {
dependencyResolveTimeout = "5 seconds"
dependencyResolveTimeout = "20 seconds"
logMaxDelay = "250 milliseconds"
pollInterval = "250 milliseconds"
prefork {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static com.walmartlabs.concord.it.common.ITUtils.randomString;
import static com.walmartlabs.concord.it.runtime.v2.Utils.resourceToString;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class ProcessIT extends AbstractTest {

Expand Down Expand Up @@ -729,6 +730,23 @@ public void testDryRunModeNotSupportedByScript() throws Exception {
proc.assertLog(".*Error @ line: 6, col: 7. Dry-run mode is not supported for this 'script' step.*");
}

@Test
public void testThrowParallelWithPayload() throws Exception {
Payload payload = new Payload()
.archive(resource("parallelExceptionPayload"));

ConcordProcess proc = concord.processes().start(payload);
expectStatus(proc, ProcessEntry.StatusEnum.FINISHED);

// ---
Map<String, Object> data = proc.getOutVariables();
List<Map<String, Object>> exceptions = (List<Map<String, Object>>) ConfigurationUtils.get(data, "exceptions");

assertNotNull(exceptions);
assertEquals(List.of("BOOM1", "BOOM2"), exceptions.stream().map(e -> e.get("message")).toList());
assertEquals(List.of(Map.of("key", 1), Map.of("key", 2)), exceptions.stream().map(e -> e.get("payload")).toList());
}

private List<ProcessEventEntry> getProcessElementEvents(ConcordProcess proc) throws Exception {
ProcessEventsApi processEventsApi = new ProcessEventsApi(concord.apiClient());
return processEventsApi.listProcessEvents(proc.instanceId(), "ELEMENT", null, null, null, "pre", null, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
configuration:
runtime: "concord-v2"
out:
- exceptions

flows:
default:
- try:
- task: "throw"
in:
exception: "BOOM${item}"
payload:
key: "${item}"
loop:
items:
- 1
- 2
mode: parallel
parallelism: 2
error:
- set:
exceptions: ${lastError.cause.exceptions.stream().map(e -> e.cause).toList()}
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ public void callWithRetryTest() throws Exception {
"(concord.yaml) @ line: 3, col: 7, thread: 0, flow: inner");
assertSegmentStatusError(log, 1);

assertSystemSegment(log, "[WARN ] Last error: com.walmartlabs.concord.runtime.v2.sdk.UserDefinedException: FAIL. Waiting for 1000ms before retry (attempt #0)");
assertSystemSegment(log, "[WARN ] Last error: FAIL. Waiting for 1000ms before retry (attempt #0)");

assertSegmentLog(log, 2, "[INFO ] in inner flow");
assertSegmentStatusOk(log, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public String toString() {
public String getMessage() {
return getCause().getMessage();
}

@Override
public StackTraceElement[] getStackTrace() {
return new StackTraceElement[0];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,24 @@
* =====
*/

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.walmartlabs.concord.runtime.v2.runner.PersistenceService;
import com.walmartlabs.concord.runtime.v2.sdk.UserDefinedException;
import com.walmartlabs.concord.sdk.Constants;
import com.walmartlabs.concord.svm.Runtime;
import com.walmartlabs.concord.svm.*;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Saves the current unhandled exception as a process metadata variable.
Expand All @@ -44,9 +47,11 @@ public class SaveLastErrorCommand implements Command {
// for backward compatibility (java8 concord 1.92.0 version)
private static final long serialVersionUID = 5759484819869224819L;

private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<Map<String, Object>>() {
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {
};

private static final AtomicInteger idGenerator = new AtomicInteger(1);

@Override
public void eval(Runtime runtime, State state, ThreadId threadId) throws Exception {
Frame frame = state.peekFrame(threadId);
Expand All @@ -68,10 +73,6 @@ public void eval(Runtime runtime, State state, ThreadId threadId) throws Excepti

private static Map<String, Object> serialize(Exception e) {
try {
if (e instanceof LoggedException le) {
e = le.getCause();
}

return createMapper().convertValue(e, MAP_TYPE);
} catch (Exception ex) {
// ignore ex
Expand All @@ -80,26 +81,67 @@ private static Map<String, Object> serialize(Exception e) {
}

private static ObjectMapper createMapper() {
ObjectMapper om = new ObjectMapper();
var module = new SimpleModule();
module.addSerializer(ParallelExecutionException.class, new ParallelExceptionSerializer());
module.addSerializer(LoggedException.class, new LoggedExceptionSerializer());
module.addSerializer(UserDefinedException.class, new UserDefinedExceptionSerializer());
module.addSerializer(Exception.class, new ExceptionSerializer());

var om = new ObjectMapper();
om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
om.addMixIn(Throwable.class, ExceptionMixIn.class);
om.registerModule(module);
return om;
}

@SuppressWarnings("unused")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonIdentityInfo(generator= ObjectIdGenerators.IntSequenceGenerator.class)
abstract static class ExceptionMixIn {
@JsonIgnore
abstract StackTraceElement[] getStackTrace();
private static class ParallelExceptionSerializer extends JsonSerializer<ParallelExecutionException> {

@Override
public void serialize(ParallelExecutionException exception, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeStartObject();
gen.writeNumberField("@id", idGenerator.getAndIncrement());
gen.writeStringField("message", exception.getMessage());

@JsonIgnore
abstract String getLocalizedMessage();
gen.writeArrayFieldStart("exceptions");
for (var e : exception.getExceptions()) {
gen.writeObject(e);
}
gen.writeEndArray();

@JsonIgnore
abstract Throwable[] getSuppressed();
gen.writeEndObject();
}
}

@JsonIgnore
abstract Throwable getCause();
private static class LoggedExceptionSerializer extends JsonSerializer<LoggedException> {

@Override
public void serialize(LoggedException exception, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeObject(exception.getCause());
}
}

private static class UserDefinedExceptionSerializer extends JsonSerializer<UserDefinedException> {

@Override
public void serialize(UserDefinedException exception, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeStartObject();
gen.writeNumberField("@id", idGenerator.getAndIncrement());
gen.writeStringField("message", exception.getMessage());
if (exception.getPayload() != null && !exception.getPayload().isEmpty()) {
gen.writeObjectField("payload", exception.getPayload());
}
gen.writeEndObject();
}
}

private static class ExceptionSerializer extends JsonSerializer<Exception> {

@Override
public void serialize(Exception exception, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeStartObject();
gen.writeNumberField("@id", idGenerator.getAndIncrement());
gen.writeStringField("message", exception.getMessage());
gen.writeStringField("type", exception.getClass().getName());
gen.writeEndObject();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void test() throws Exception {
writerCaptor.getValue().write(bos);

assertThat(bos.toString(), containsString("\"message\":\"BOOM1\""));
assertThat(bos.toString(), containsString("\"someCyclicField\":1"));
assertThat(bos.toString(), containsString("\"@id\":1"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,13 @@ public void printStackTrace(PrintWriter s) {
public Map<String, Object> getPayload() {
return payload;
}

@Override
public String toString() {
var m = getLocalizedMessage();
if (payload != null && !payload.isEmpty()) {
m = m + ": " + payload;
}
return m;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

/**
Expand All @@ -34,9 +36,15 @@ public class ParallelExecutionException extends RuntimeException {

private static final long serialVersionUID = 1L;
private static final int MAX_STACK_TRACE_ELEMENTS = 3;
private final List<Exception> exceptions;

public ParallelExecutionException(Collection<Exception> causes) {
super("Parallel execution errors: \n" + toMessage(causes));
this.exceptions = new ArrayList<>(causes);
}

public List<Exception> getExceptions() {
return exceptions;
}

private static String toMessage(Collection<Exception> causes) {
Expand Down

0 comments on commit 68f565f

Please sign in to comment.