Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP #902

Closed
wants to merge 3 commits into from
Closed

WIP #902

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,6 @@ public void reentrantSuspend(String eventName, Map<String, Serializable> taskSta
}

state.peekFrame(currentThreadId)
.push(new TaskSuspendCommand(correlationId, LogUtils.getContext(), eventName, (TaskCall) step, taskState));
.push(new TaskSuspendCommand(correlationId, eventName, (TaskCall) step, taskState));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public interface RunnerLogger {

void withContext(LogContext context, Runnable runnable);
void withLogContext(LogContext context, Runnable runnable);

@Nullable
Long createSegment(String segmentName, UUID correlationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Long createSegment(String segmentName, UUID correlationId) {
}

@Override
public void withContext(LogContext context, Runnable runnable) {
public void withLogContext(LogContext context, Runnable runnable) {
ThreadGroup threadGroup = new LogContextThreadGroup(context);
executeInThreadGroup(threadGroup, "thread-" + context.segmentName(), () -> {
// make sure the redirection is enabled in the current thread
Expand Down Expand Up @@ -99,7 +99,7 @@ private static Map<String, Serializable> meta(AbstractStep<?> step) {

/**
* Executes the {@link Callable} in the specified {@link ThreadGroup}.
* A bit expensive as it is creates a new thread.
* A bit expensive as it is creating a new thread.
*/
private static void executeInThreadGroup(ThreadGroup group, String threadName, Runnable runnable) {
ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadGroupAwareThreadFactory(group, threadName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public Long createSegment(String segmentName, UUID correlationId) {
}

@Override
public void withContext(LogContext context, Runnable runnable) {
public void withLogContext(LogContext context, Runnable runnable) {
try {
runnable.run();
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import javax.el.ELException;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -93,23 +94,19 @@ public void eval(Runtime runtime, State state, ThreadId threadId) {
UUID correlationId = getCorrelationId();
Context ctx = contextFactory.create(runtime, state, threadId, step, correlationId);

logContext = getLogContext(runtime, ctx, correlationId);
logContext = initLogContext(runtime, ctx, correlationId);
if (logContext == null) {
executeWithContext(ctx, runtime, state, threadId);
ContextProvider.withContext(ctx, () -> execute(runtime, state, threadId));
} else {
runtime.getService(RunnerLogger.class).withContext(logContext,
() -> executeWithContext(ctx, runtime, state, threadId));
runtime.getService(RunnerLogger.class).withLogContext(logContext,
() -> ContextProvider.withContext(ctx, () -> execute(runtime, state, threadId)));
}
}

public UUID getCorrelationId() {
return correlationId;
}

private void executeWithContext(Context ctx, Runtime runtime, State state, ThreadId threadId) {
ContextProvider.withContext(ctx, () -> execute(runtime, state, threadId));
}

@Override
public void onException(Runtime runtime, Exception e, State state, ThreadId threadId) {
if (step.getLocation() == null) {
Expand All @@ -119,7 +116,7 @@ public void onException(Runtime runtime, Exception e, State state, ThreadId thre
if (logContext == null) {
logException(e, state, threadId);
} else {
runtime.getService(RunnerLogger.class).withContext(logContext,
runtime.getService(RunnerLogger.class).withLogContext(logContext,
() -> logException(e, state, threadId));
}
}
Expand All @@ -135,11 +132,18 @@ private void logException(Exception e, State state, ThreadId threadId) {
protected abstract void execute(Runtime runtime, State state, ThreadId threadId);

protected LogContext getLogContext() {
if (logContext == null) {
log.warn("getLogContext -> not initialized yet");
}
return logContext;
}

private LogContext getLogContext(Runtime runtime, Context ctx, UUID correlationId) {
// TODO create logContext outside of StepCommand
private synchronized LogContext initLogContext(Runtime runtime, Context ctx, UUID correlationId) {
if (logContext != null) {
if (!Objects.equals(correlationId, this.logContext.correlationId())) {
log.warn("initLogContext -> correlationId mismatch, requested: {}, current: {}", correlationId, this.logContext.correlationId());
}
return logContext;
}

Expand All @@ -148,14 +152,10 @@ private LogContext getLogContext(Runtime runtime, Context ctx, UUID correlationI
return null;
}

return buildLogContext(runtime, segmentName, correlationId);
}

private LogContext buildLogContext(Runtime runtime, String segmentName, UUID correlationId) {
RunnerConfiguration runnerCfg = runtime.getService(RunnerConfiguration.class);
boolean redirectSystemOutAndErr = runnerCfg.logging().sendSystemOutAndErrToSLF4J();

return LogContext.builder()
return logContext = LogContext.builder()
.segmentName(segmentName)
.correlationId(correlationId)
.redirectSystemOutAndErr(redirectSystemOutAndErr)
Expand Down Expand Up @@ -196,9 +196,9 @@ private static String getExceptionMessage(Exception e) {

if (e instanceof ELException) {
return
ExceptionUtils.getExceptionList(e).stream()
.map(Throwable::getMessage)
.collect(Collectors.joining(". "));
ExceptionUtils.getExceptionList(e).stream()
.map(Throwable::getMessage)
.collect(Collectors.joining(". "));
}

List<Throwable> exceptions = ExceptionUtils.getExceptionList(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.walmartlabs.concord.runtime.v2.model.TaskCall;
import com.walmartlabs.concord.runtime.v2.runner.context.ResumeEventImpl;
import com.walmartlabs.concord.runtime.v2.runner.logging.LogContext;
import com.walmartlabs.concord.runtime.v2.runner.logging.LogUtils;
import com.walmartlabs.concord.svm.Runtime;
import com.walmartlabs.concord.svm.*;

Expand All @@ -35,14 +36,12 @@ public class TaskSuspendCommand implements Command {
private static final long serialVersionUID = 1L;

private final UUID correlationId;
private final LogContext logContext;
private final String eventName;
private final TaskCall step;
private final Map<String, Serializable> taskState;

public TaskSuspendCommand(UUID correlationId, LogContext logContext, String eventName, TaskCall step, Map<String, Serializable> taskState) {
public TaskSuspendCommand(UUID correlationId, String eventName, TaskCall step, Map<String, Serializable> taskState) {
this.correlationId = correlationId;
this.logContext = logContext;
this.eventName = eventName;
this.step = step;
this.taskState = taskState;
Expand All @@ -53,6 +52,7 @@ public void eval(Runtime runtime, State state, ThreadId threadId) {
Frame frame = state.peekFrame(threadId);
frame.pop();

LogContext logContext = LogUtils.getContext();
frame.push(new TaskResumeCommand(correlationId, logContext, step, new ResumeEventImpl(eventName, taskState)));
frame.push(new SuspendCommand(eventName));
}
Expand Down
12 changes: 7 additions & 5 deletions runtime/v2/vm/src/main/java/com/walmartlabs/concord/svm/VM.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serial;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -91,12 +92,12 @@ public void resume(State state, Set<String> eventRefs) throws Exception {
public void run(State state, Command cmd) throws Exception {
log.debug("run ['{}'] -> start", cmd);

Runtime rt = runtimeFactory.create(this);
Runtime runtime = runtimeFactory.create(this);
ThreadId threadId = state.getRootThreadId();
try {
cmd.eval(rt, state, threadId);
cmd.eval(runtime, state, threadId);
} catch (Exception e) {
cmd.onException(rt, e, state, threadId);
cmd.onException(runtime, e, state, threadId);
throw e;
}

Expand Down Expand Up @@ -166,7 +167,7 @@ private EvalResult execute(Runtime runtime, State state) throws Exception {
EvalResult result;

while (true) {
// if we're restoring from a previously saved state or we had new threads created
// if we're restoring from a previously saved state, or we had new threads created
// on the previous iteration we need to spawn all READY threads
for (Map.Entry<ThreadId, ThreadStatus> e : state.threadStatus().entrySet()) {
if (e.getKey() != state.getRootThreadId() && e.getValue() == ThreadStatus.READY) {
Expand Down Expand Up @@ -232,8 +233,9 @@ private static void wakeSuspended(State state) {
}
}

private static class EvalResult implements Serializable {
public static class EvalResult implements Serializable {

@Serial
private static final long serialVersionUID = 1L;

private final Frame lastFrame;
Expand Down
Loading