diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/context/ContextImpl.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/context/ContextImpl.java index da16a47293..c57a7de254 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/context/ContextImpl.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/context/ContextImpl.java @@ -215,6 +215,6 @@ public void reentrantSuspend(String eventName, Map taskSta } state.peekFrame(currentThreadId) - .push(new TaskSuspendCommand(correlationId, LogUtils.getContext(), eventName, (TaskCall) step, taskState)); + .push(new TaskSuspendCommand(correlationId, eventName, (TaskCall) step, taskState)); } } diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/RunnerLogger.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/RunnerLogger.java index 30f0f5bf77..6b55ff8701 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/RunnerLogger.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/RunnerLogger.java @@ -25,7 +25,7 @@ public interface RunnerLogger { - void withContext(LogContext context, Runnable runnable); + void withLogContext(LogContext context, Runnable runnable); @Nullable Long createSegment(String segmentName, UUID correlationId); diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SegmentedLogger.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SegmentedLogger.java index f5582a7628..45fcf7642e 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SegmentedLogger.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SegmentedLogger.java @@ -53,7 +53,7 @@ public Long createSegment(String segmentName, UUID correlationId) { } @Override - public void withContext(LogContext context, Runnable runnable) { + public void withLogContext(LogContext context, Runnable runnable) { ThreadGroup threadGroup = new LogContextThreadGroup(context); executeInThreadGroup(threadGroup, "thread-" + context.segmentName(), () -> { // make sure the redirection is enabled in the current thread @@ -99,7 +99,7 @@ private static Map meta(AbstractStep step) { /** * Executes the {@link Callable} in the specified {@link ThreadGroup}. - * A bit expensive as it is creates a new thread. + * A bit expensive as it is creating a new thread. */ private static void executeInThreadGroup(ThreadGroup group, String threadName, Runnable runnable) { ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadGroupAwareThreadFactory(group, threadName)); diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SimpleLogger.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SimpleLogger.java index 7234afd4da..1464b1a9b3 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SimpleLogger.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SimpleLogger.java @@ -30,7 +30,7 @@ public Long createSegment(String segmentName, UUID correlationId) { } @Override - public void withContext(LogContext context, Runnable runnable) { + public void withLogContext(LogContext context, Runnable runnable) { try { runnable.run(); } catch (RuntimeException e) { diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java index 52b0247d60..fa788fe89f 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java @@ -40,6 +40,7 @@ import javax.el.ELException; import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; @@ -93,12 +94,12 @@ public void eval(Runtime runtime, State state, ThreadId threadId) { UUID correlationId = getCorrelationId(); Context ctx = contextFactory.create(runtime, state, threadId, step, correlationId); - logContext = getLogContext(runtime, ctx, correlationId); + logContext = initLogContext(runtime, ctx, correlationId); if (logContext == null) { - executeWithContext(ctx, runtime, state, threadId); + ContextProvider.withContext(ctx, () -> execute(runtime, state, threadId)); } else { - runtime.getService(RunnerLogger.class).withContext(logContext, - () -> executeWithContext(ctx, runtime, state, threadId)); + runtime.getService(RunnerLogger.class).withLogContext(logContext, + () -> ContextProvider.withContext(ctx, () -> execute(runtime, state, threadId))); } } @@ -106,10 +107,6 @@ public UUID getCorrelationId() { return correlationId; } - private void executeWithContext(Context ctx, Runtime runtime, State state, ThreadId threadId) { - ContextProvider.withContext(ctx, () -> execute(runtime, state, threadId)); - } - @Override public void onException(Runtime runtime, Exception e, State state, ThreadId threadId) { if (step.getLocation() == null) { @@ -119,7 +116,7 @@ public void onException(Runtime runtime, Exception e, State state, ThreadId thre if (logContext == null) { logException(e, state, threadId); } else { - runtime.getService(RunnerLogger.class).withContext(logContext, + runtime.getService(RunnerLogger.class).withLogContext(logContext, () -> logException(e, state, threadId)); } } @@ -135,11 +132,18 @@ private void logException(Exception e, State state, ThreadId threadId) { protected abstract void execute(Runtime runtime, State state, ThreadId threadId); protected LogContext getLogContext() { + if (logContext == null) { + log.warn("getLogContext -> not initialized yet"); + } return logContext; } - private LogContext getLogContext(Runtime runtime, Context ctx, UUID correlationId) { + // TODO create logContext outside of StepCommand + private synchronized LogContext initLogContext(Runtime runtime, Context ctx, UUID correlationId) { if (logContext != null) { + if (!Objects.equals(correlationId, this.logContext.correlationId())) { + log.warn("initLogContext -> correlationId mismatch, requested: {}, current: {}", correlationId, this.logContext.correlationId()); + } return logContext; } @@ -148,14 +152,10 @@ private LogContext getLogContext(Runtime runtime, Context ctx, UUID correlationI return null; } - return buildLogContext(runtime, segmentName, correlationId); - } - - private LogContext buildLogContext(Runtime runtime, String segmentName, UUID correlationId) { RunnerConfiguration runnerCfg = runtime.getService(RunnerConfiguration.class); boolean redirectSystemOutAndErr = runnerCfg.logging().sendSystemOutAndErrToSLF4J(); - return LogContext.builder() + return logContext = LogContext.builder() .segmentName(segmentName) .correlationId(correlationId) .redirectSystemOutAndErr(redirectSystemOutAndErr) @@ -196,9 +196,9 @@ private static String getExceptionMessage(Exception e) { if (e instanceof ELException) { return - ExceptionUtils.getExceptionList(e).stream() - .map(Throwable::getMessage) - .collect(Collectors.joining(". ")); + ExceptionUtils.getExceptionList(e).stream() + .map(Throwable::getMessage) + .collect(Collectors.joining(". ")); } List exceptions = ExceptionUtils.getExceptionList(e); diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/TaskSuspendCommand.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/TaskSuspendCommand.java index f1abcd6d83..97f6a4910c 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/TaskSuspendCommand.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/TaskSuspendCommand.java @@ -23,6 +23,7 @@ import com.walmartlabs.concord.runtime.v2.model.TaskCall; import com.walmartlabs.concord.runtime.v2.runner.context.ResumeEventImpl; import com.walmartlabs.concord.runtime.v2.runner.logging.LogContext; +import com.walmartlabs.concord.runtime.v2.runner.logging.LogUtils; import com.walmartlabs.concord.svm.Runtime; import com.walmartlabs.concord.svm.*; @@ -35,14 +36,12 @@ public class TaskSuspendCommand implements Command { private static final long serialVersionUID = 1L; private final UUID correlationId; - private final LogContext logContext; private final String eventName; private final TaskCall step; private final Map taskState; - public TaskSuspendCommand(UUID correlationId, LogContext logContext, String eventName, TaskCall step, Map taskState) { + public TaskSuspendCommand(UUID correlationId, String eventName, TaskCall step, Map taskState) { this.correlationId = correlationId; - this.logContext = logContext; this.eventName = eventName; this.step = step; this.taskState = taskState; @@ -53,6 +52,7 @@ public void eval(Runtime runtime, State state, ThreadId threadId) { Frame frame = state.peekFrame(threadId); frame.pop(); + LogContext logContext = LogUtils.getContext(); frame.push(new TaskResumeCommand(correlationId, logContext, step, new ResumeEventImpl(eventName, taskState))); frame.push(new SuspendCommand(eventName)); } diff --git a/runtime/v2/vm/src/main/java/com/walmartlabs/concord/svm/VM.java b/runtime/v2/vm/src/main/java/com/walmartlabs/concord/svm/VM.java index 0048d7a378..a8dc68aaf7 100644 --- a/runtime/v2/vm/src/main/java/com/walmartlabs/concord/svm/VM.java +++ b/runtime/v2/vm/src/main/java/com/walmartlabs/concord/svm/VM.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serial; import java.io.Serializable; import java.util.Collection; import java.util.Map; @@ -91,12 +92,12 @@ public void resume(State state, Set eventRefs) throws Exception { public void run(State state, Command cmd) throws Exception { log.debug("run ['{}'] -> start", cmd); - Runtime rt = runtimeFactory.create(this); + Runtime runtime = runtimeFactory.create(this); ThreadId threadId = state.getRootThreadId(); try { - cmd.eval(rt, state, threadId); + cmd.eval(runtime, state, threadId); } catch (Exception e) { - cmd.onException(rt, e, state, threadId); + cmd.onException(runtime, e, state, threadId); throw e; } @@ -166,7 +167,7 @@ private EvalResult execute(Runtime runtime, State state) throws Exception { EvalResult result; while (true) { - // if we're restoring from a previously saved state or we had new threads created + // if we're restoring from a previously saved state, or we had new threads created // on the previous iteration we need to spawn all READY threads for (Map.Entry e : state.threadStatus().entrySet()) { if (e.getKey() != state.getRootThreadId() && e.getValue() == ThreadStatus.READY) { @@ -232,8 +233,9 @@ private static void wakeSuspended(State state) { } } - private static class EvalResult implements Serializable { + public static class EvalResult implements Serializable { + @Serial private static final long serialVersionUID = 1L; private final Frame lastFrame;