Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime-v2: error location for loop, call, parallel, retry commands (v2) #865

Merged
merged 21 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public final class ExceptionUtils {

Expand All @@ -34,6 +35,15 @@ public static List<Throwable> getExceptionList(Throwable e) {
return list;
}

@SuppressWarnings("unchecked")
public static <T> T filterException(Exception e, Class<T> clazz) {
return getExceptionList(e).stream()
.filter(clazz::isInstance)
.map(c -> (T)c)
.findAny()
.orElse(null);
}

private ExceptionUtils() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
@Serial.Version(-8269676324702677451L)
public interface SimpleOptions extends StepOptions {

long serialVersionUID = -8269676324702677451L;

static SimpleOptions of(Map<String, Serializable> meta) {
return ImmutableSimpleOptions.builder()
.meta(meta)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import com.walmartlabs.concord.common.ExceptionUtils;
import com.walmartlabs.concord.runtime.v2.runner.el.functions.*;
import com.walmartlabs.concord.runtime.v2.runner.el.resolvers.BeanELResolver;
import com.walmartlabs.concord.runtime.v2.runner.el.resolvers.*;
import com.walmartlabs.concord.runtime.v2.runner.el.resolvers.MapELResolver;
import com.walmartlabs.concord.runtime.v2.runner.el.resolvers.*;
import com.walmartlabs.concord.runtime.v2.runner.tasks.TaskProviders;
import com.walmartlabs.concord.runtime.v2.sdk.EvalContext;
import com.walmartlabs.concord.runtime.v2.sdk.ExpressionEvaluator;
Expand Down Expand Up @@ -157,16 +157,12 @@ public FunctionMapper getFunctionMapper() {
}

throw new UserDefinedException(errorMessage);
} catch (ELException e) {
throw ExceptionUtils.getExceptionList(e).stream()
.filter(i -> i instanceof UserDefinedException)
.findAny()
.map(i -> (RuntimeException)i)
.orElse(e);
} catch (UserDefinedException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("while evaluating expression '" + expr + "'", e);
UserDefinedException u = ExceptionUtils.filterException(e, UserDefinedException.class);
if (u != null) {
throw u;
}
throw new RuntimeException("while evaluating expression '" + expr + "': " + e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.walmartlabs.concord.runtime.v2.model.Step;
import com.walmartlabs.concord.runtime.v2.runner.el.MethodNotFoundException;
import com.walmartlabs.concord.runtime.v2.runner.tasks.TaskCallInterceptor;
import com.walmartlabs.concord.runtime.v2.runner.tasks.TaskException;
import com.walmartlabs.concord.runtime.v2.sdk.Context;
import com.walmartlabs.concord.runtime.v2.sdk.Task;

Expand Down Expand Up @@ -77,6 +78,8 @@ public Object invoke(ELContext elContext, Object base, Object method, Class<?>[]
throw e;
} catch (RuntimeException e) {
throw e;
} catch (TaskException e) {
throw new RuntimeException(e.getCause());
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@
* =====
*/

import javax.annotation.Nullable;
import java.util.UUID;

public interface RunnerLogger {

void withContext(LogContext context, Runnable runnable);

@Nullable
Long createSegment(String segmentName, UUID correlationId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;

import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
Expand All @@ -47,27 +48,31 @@ public SegmentedLogger(LoggingClient loggingClient) {
}

@Override
public void withContext(LogContext ctx, Runnable runnable) {
Long segmentId = ctx.segmentId();
if (segmentId == null) {
segmentId = loggingClient.createSegment(ctx.correlationId(), ctx.segmentName());
}

LogContext context = LogContext.builder().from(ctx)
.segmentId(segmentId)
.build();
public Long createSegment(String segmentName, UUID correlationId) {
return loggingClient.createSegment(correlationId, segmentName);
}

@Override
public void withContext(LogContext context, Runnable runnable) {
ThreadGroup threadGroup = new LogContextThreadGroup(context);
executeInThreadGroup(threadGroup, "thread-" + context.segmentName(), () -> {
// make sure the redirection is enabled in the current thread
if (context.redirectSystemOutAndErr() && !SysOutOverSLF4J.systemOutputsAreSLF4JPrintStreams()) {
SysOutOverSLF4J.sendSystemOutAndErrToSLF4J(LogLevel.INFO, LogLevel.ERROR);
}

boolean exceptionOccurred = false;
try {
runnable.run();
} catch (Exception e) {
exceptionOccurred = true;
throw e;
} finally {
log.info(FINALIZE_SESSION_MARKER, "<<finalize>>");
if (exceptionOccurred) {
log.error(FINALIZE_SESSION_MARKER, "<<finalize>>");
} else {
log.info(FINALIZE_SESSION_MARKER, "<<finalize>>");
}
}
});
}
Expand Down Expand Up @@ -117,15 +122,7 @@ private static void executeInThreadGroup(ThreadGroup group, String threadName, R
}
}

private static final class ThreadGroupAwareThreadFactory implements ThreadFactory {

private final ThreadGroup group;
private final String threadName;

private ThreadGroupAwareThreadFactory(ThreadGroup group, String threadName) {
this.group = group;
this.threadName = threadName;
}
private record ThreadGroupAwareThreadFactory(ThreadGroup group, String threadName) implements ThreadFactory {

@Override
public Thread newThread(Runnable r) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@
* =====
*/

import java.util.UUID;

public class SimpleLogger implements RunnerLogger {

@Override
public Long createSegment(String segmentName, UUID correlationId) {
return null;
}

@Override
public void withContext(LogContext context, Runnable runnable) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,10 @@
import com.walmartlabs.concord.runtime.v2.model.FlowCall;
import com.walmartlabs.concord.runtime.v2.model.FlowCallOptions;
import com.walmartlabs.concord.runtime.v2.model.ProcessDefinition;
import com.walmartlabs.concord.runtime.v2.model.Step;
import com.walmartlabs.concord.runtime.v2.runner.compiler.CompilerUtils;
import com.walmartlabs.concord.runtime.v2.runner.context.ContextFactory;
import com.walmartlabs.concord.runtime.v2.sdk.EvalContext;
import com.walmartlabs.concord.runtime.v2.sdk.EvalContextFactory;
import com.walmartlabs.concord.runtime.v2.sdk.ExpressionEvaluator;
import com.walmartlabs.concord.runtime.v2.runner.logging.LogContext;
import com.walmartlabs.concord.runtime.v2.sdk.Compiler;
import com.walmartlabs.concord.runtime.v2.sdk.Context;
import com.walmartlabs.concord.runtime.v2.sdk.ProcessConfiguration;
import com.walmartlabs.concord.runtime.v2.sdk.*;
import com.walmartlabs.concord.svm.Runtime;
import com.walmartlabs.concord.svm.*;

Expand Down Expand Up @@ -87,7 +82,7 @@ protected void execute(Runtime runtime, State state, ThreadId threadId) {
// and put it into the callee's frame
Command processOutVars;
if (!opts.outExpr().isEmpty()) {
processOutVars = new EvalVariablesCommand(getStep(), opts.outExpr(), innerFrame);
processOutVars = new EvalVariablesCommand(getStep(), opts.outExpr(), innerFrame, getLogContext());
} else {
processOutVars = new CopyVariablesCommand(opts.out(), innerFrame, VMUtils::assertNearestRoot);
}
Expand All @@ -102,36 +97,43 @@ public static String getFlowName(State state, ThreadId threadId) {
return VMUtils.getLocal(state, threadId, FLOW_NAME_VARIABLE);
}

private static class EvalVariablesCommand implements Command {
private static class EvalVariablesCommand extends StepCommand<FlowCall> {

// for backward compatibility (java8 concord 1.92.0 version)
private static final long serialVersionUID = -7294220776008029488L;

private final Step step;
// TODO: only for backward compatibility
private final FlowCall step;

private final Map<String, Serializable> variables;
private final Frame variablesFrame;

private EvalVariablesCommand(FlowCall step, Map<String, Serializable> variables, Frame variablesFrame) {
private EvalVariablesCommand(FlowCall step, Map<String, Serializable> variables, Frame variablesFrame, LogContext logContext) {
super(step, logContext);
this.step = step;
this.variables = variables;
this.variablesFrame = variablesFrame;
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void eval(Runtime runtime, State state, ThreadId threadId) {
protected void execute(Runtime runtime, State state, ThreadId threadId) {
Frame frame = state.peekFrame(threadId);
frame.pop();

ContextFactory contextFactory = runtime.getService(ContextFactory.class);
Context ctx = contextFactory.create(runtime, state, threadId, step);
Context ctx = runtime.getService(Context.class);

EvalContextFactory ecf = runtime.getService(EvalContextFactory.class);
ExpressionEvaluator expressionEvaluator = runtime.getService(ExpressionEvaluator.class);
Map<String, Object> vars = (Map)variablesFrame.getLocals();
Map<String, Serializable> out = expressionEvaluator.evalAsMap(ecf.global(ctx, vars), variables);
out.forEach((k, v) -> ctx.variables().set(k, v));
}

// TODO: only for backward compatibility
@Override
public FlowCall getStep() {
return step;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ protected LoopWrapper(Command cmd, Serializable items, Collection<String> outVar

@Override
public void eval(Runtime runtime, State state, ThreadId threadId) {
execute(runtime, state, threadId);
}

@Override
public void onException(Runtime runtime, Exception e, State state, ThreadId threadId) {
cmd.onException(runtime, e, state, threadId);
}

private void execute(Runtime runtime, State state, ThreadId threadId) {
Frame frame = state.peekFrame(threadId);
frame.pop();

Expand All @@ -86,14 +95,9 @@ public void eval(Runtime runtime, State state, ThreadId threadId) {
return;
}

Step currentStep = null;
if (cmd instanceof StepCommand) {
currentStep = ((StepCommand<?>) cmd).getStep();
}

// create the context explicitly
ContextFactory contextFactory = runtime.getService(ContextFactory.class);
Context ctx = contextFactory.create(runtime, state, threadId, currentStep);
Context ctx = contextFactory.create(runtime, state, threadId, getCurrentStep());

EvalContextFactory ecf = runtime.getService(EvalContextFactory.class);
ExpressionEvaluator ee = runtime.getService(ExpressionEvaluator.class);
Expand All @@ -111,6 +115,13 @@ public void eval(Runtime runtime, State state, ThreadId threadId) {

protected abstract void eval(Runtime runtime, State state, ThreadId threadId, Context ctx, ArrayList<Serializable> items);

private Step getCurrentStep() {
if (cmd instanceof StepCommand) {
return ((StepCommand<?>) cmd).getStep();
}
return null;
}

static class ParallelWithItems extends LoopWrapper {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

import com.walmartlabs.concord.runtime.v2.model.ParallelBlock;
import com.walmartlabs.concord.runtime.v2.model.ParallelBlockOptions;
import com.walmartlabs.concord.runtime.v2.runner.context.ContextFactory;
import com.walmartlabs.concord.runtime.v2.runner.logging.LogContext;
import com.walmartlabs.concord.runtime.v2.sdk.Context;
import com.walmartlabs.concord.runtime.v2.sdk.EvalContextFactory;
import com.walmartlabs.concord.runtime.v2.sdk.ExpressionEvaluator;
import com.walmartlabs.concord.runtime.v2.sdk.Context;
import com.walmartlabs.concord.svm.Runtime;
import com.walmartlabs.concord.svm.*;

Expand Down Expand Up @@ -64,7 +64,7 @@ protected void execute(Runtime runtime, State state, ThreadId threadId) {
Map<String, Object> accumulator = new ConcurrentHashMap<>();
outVarsCommand = new CollectVariablesCommand(accumulator);

frame.push(new EvalVariablesCommand(accumulator, opts.outExpr(), frame));
frame.push(new EvalVariablesCommand(getStep(), accumulator, opts.outExpr(), frame, getLogContext()));
} else {
outVarsCommand = new CopyVariablesCommand(opts.out(), State::peekFrame, frame);
}
Expand Down Expand Up @@ -99,7 +99,7 @@ public void eval(Runtime runtime, State state, ThreadId threadId) {
}
}

static class EvalVariablesCommand implements Command {
static class EvalVariablesCommand extends StepCommand<ParallelBlock> {

// for backward compatibility (java8 concord 1.92.0 version)
private static final long serialVersionUID = 1370076263447141826L;
Expand All @@ -108,20 +108,20 @@ static class EvalVariablesCommand implements Command {
private final Map<String, Serializable> variables;
private final Frame target;

public EvalVariablesCommand(Map<String, Object> allVars, Map<String, Serializable> variables, Frame target) {
public EvalVariablesCommand(ParallelBlock step, Map<String, Object> allVars, Map<String, Serializable> variables, Frame target, LogContext logContext) {
super(step, logContext);

this.allVars = allVars;
this.variables = variables;
this.target = target;
}

@Override
public void eval(Runtime runtime, State state, ThreadId threadId) {
protected void execute(Runtime runtime, State state, ThreadId threadId) {
Frame frame = state.peekFrame(threadId);
frame.pop();

ContextFactory contextFactory = runtime.getService(ContextFactory.class);
Context ctx = contextFactory.create(runtime, state, threadId, null);

Context ctx = runtime.getService(Context.class);
EvalContextFactory ecf = runtime.getService(EvalContextFactory.class);
ExpressionEvaluator expressionEvaluator = runtime.getService(ExpressionEvaluator.class);
Map<String, Serializable> out = expressionEvaluator.evalAsMap(ecf.global(ctx, allVars), variables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ public RetryWrapper(Command cmd, Retry retry) {

@Override
public void eval(Runtime runtime, State state, ThreadId threadId) {
execute(runtime, state, threadId);
}

@Override
public void onException(Runtime runtime, Exception e, State state, ThreadId threadId) {
cmd.onException(runtime, e, state, threadId);
}

private void execute(Runtime runtime, State state, ThreadId threadId) {
Frame frame = state.peekFrame(threadId);
frame.pop();

Expand Down
Loading