From 57118f388b716e025e78100bf879ceee12f3fb9a Mon Sep 17 00:00:00 2001 From: Ivan Bodrov Date: Mon, 6 May 2024 12:52:58 -0400 Subject: [PATCH 1/3] runtime-v2: tidy up --- .../runtime/v2/runner/logging/RunnerLogger.java | 2 +- .../runtime/v2/runner/logging/SegmentedLogger.java | 4 ++-- .../runtime/v2/runner/logging/SimpleLogger.java | 2 +- .../concord/runtime/v2/runner/vm/StepCommand.java | 12 ++++-------- .../main/java/com/walmartlabs/concord/svm/VM.java | 12 +++++++----- 5 files changed, 15 insertions(+), 17 deletions(-) diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/RunnerLogger.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/RunnerLogger.java index 30f0f5bf77..6b55ff8701 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/RunnerLogger.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/RunnerLogger.java @@ -25,7 +25,7 @@ public interface RunnerLogger { - void withContext(LogContext context, Runnable runnable); + void withLogContext(LogContext context, Runnable runnable); @Nullable Long createSegment(String segmentName, UUID correlationId); diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SegmentedLogger.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SegmentedLogger.java index f5582a7628..45fcf7642e 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SegmentedLogger.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SegmentedLogger.java @@ -53,7 +53,7 @@ public Long createSegment(String segmentName, UUID correlationId) { } @Override - public void withContext(LogContext context, Runnable runnable) { + public void withLogContext(LogContext context, Runnable runnable) { ThreadGroup threadGroup = new LogContextThreadGroup(context); executeInThreadGroup(threadGroup, "thread-" + context.segmentName(), () -> { // make sure the redirection is enabled in the current thread @@ -99,7 +99,7 @@ private static Map meta(AbstractStep step) { /** * Executes the {@link Callable} in the specified {@link ThreadGroup}. - * A bit expensive as it is creates a new thread. + * A bit expensive as it is creating a new thread. */ private static void executeInThreadGroup(ThreadGroup group, String threadName, Runnable runnable) { ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadGroupAwareThreadFactory(group, threadName)); diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SimpleLogger.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SimpleLogger.java index 7234afd4da..1464b1a9b3 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SimpleLogger.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/logging/SimpleLogger.java @@ -30,7 +30,7 @@ public Long createSegment(String segmentName, UUID correlationId) { } @Override - public void withContext(LogContext context, Runnable runnable) { + public void withLogContext(LogContext context, Runnable runnable) { try { runnable.run(); } catch (RuntimeException e) { diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java index 52b0247d60..58259e25c3 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java @@ -95,10 +95,10 @@ public void eval(Runtime runtime, State state, ThreadId threadId) { logContext = getLogContext(runtime, ctx, correlationId); if (logContext == null) { - executeWithContext(ctx, runtime, state, threadId); + ContextProvider.withContext(ctx, () -> execute(runtime, state, threadId)); } else { - runtime.getService(RunnerLogger.class).withContext(logContext, - () -> executeWithContext(ctx, runtime, state, threadId)); + runtime.getService(RunnerLogger.class).withLogContext(logContext, + () -> ContextProvider.withContext(ctx, () -> execute(runtime, state, threadId))); } } @@ -106,10 +106,6 @@ public UUID getCorrelationId() { return correlationId; } - private void executeWithContext(Context ctx, Runtime runtime, State state, ThreadId threadId) { - ContextProvider.withContext(ctx, () -> execute(runtime, state, threadId)); - } - @Override public void onException(Runtime runtime, Exception e, State state, ThreadId threadId) { if (step.getLocation() == null) { @@ -119,7 +115,7 @@ public void onException(Runtime runtime, Exception e, State state, ThreadId thre if (logContext == null) { logException(e, state, threadId); } else { - runtime.getService(RunnerLogger.class).withContext(logContext, + runtime.getService(RunnerLogger.class).withLogContext(logContext, () -> logException(e, state, threadId)); } } diff --git a/runtime/v2/vm/src/main/java/com/walmartlabs/concord/svm/VM.java b/runtime/v2/vm/src/main/java/com/walmartlabs/concord/svm/VM.java index 0048d7a378..a8dc68aaf7 100644 --- a/runtime/v2/vm/src/main/java/com/walmartlabs/concord/svm/VM.java +++ b/runtime/v2/vm/src/main/java/com/walmartlabs/concord/svm/VM.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serial; import java.io.Serializable; import java.util.Collection; import java.util.Map; @@ -91,12 +92,12 @@ public void resume(State state, Set eventRefs) throws Exception { public void run(State state, Command cmd) throws Exception { log.debug("run ['{}'] -> start", cmd); - Runtime rt = runtimeFactory.create(this); + Runtime runtime = runtimeFactory.create(this); ThreadId threadId = state.getRootThreadId(); try { - cmd.eval(rt, state, threadId); + cmd.eval(runtime, state, threadId); } catch (Exception e) { - cmd.onException(rt, e, state, threadId); + cmd.onException(runtime, e, state, threadId); throw e; } @@ -166,7 +167,7 @@ private EvalResult execute(Runtime runtime, State state) throws Exception { EvalResult result; while (true) { - // if we're restoring from a previously saved state or we had new threads created + // if we're restoring from a previously saved state, or we had new threads created // on the previous iteration we need to spawn all READY threads for (Map.Entry e : state.threadStatus().entrySet()) { if (e.getKey() != state.getRootThreadId() && e.getValue() == ThreadStatus.READY) { @@ -232,8 +233,9 @@ private static void wakeSuspended(State state) { } } - private static class EvalResult implements Serializable { + public static class EvalResult implements Serializable { + @Serial private static final long serialVersionUID = 1L; private final Frame lastFrame; From 8c604ed0e5bcb1640582ecf22c7d3db53cfac9bb Mon Sep 17 00:00:00 2001 From: Ivan Bodrov Date: Mon, 6 May 2024 13:44:36 -0400 Subject: [PATCH 2/3] runtime-v2: remove constructor argument --- .../concord/runtime/v2/runner/context/ContextImpl.java | 2 +- .../concord/runtime/v2/runner/vm/TaskSuspendCommand.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/context/ContextImpl.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/context/ContextImpl.java index da16a47293..c57a7de254 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/context/ContextImpl.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/context/ContextImpl.java @@ -215,6 +215,6 @@ public void reentrantSuspend(String eventName, Map taskSta } state.peekFrame(currentThreadId) - .push(new TaskSuspendCommand(correlationId, LogUtils.getContext(), eventName, (TaskCall) step, taskState)); + .push(new TaskSuspendCommand(correlationId, eventName, (TaskCall) step, taskState)); } } diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/TaskSuspendCommand.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/TaskSuspendCommand.java index f1abcd6d83..97f6a4910c 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/TaskSuspendCommand.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/TaskSuspendCommand.java @@ -23,6 +23,7 @@ import com.walmartlabs.concord.runtime.v2.model.TaskCall; import com.walmartlabs.concord.runtime.v2.runner.context.ResumeEventImpl; import com.walmartlabs.concord.runtime.v2.runner.logging.LogContext; +import com.walmartlabs.concord.runtime.v2.runner.logging.LogUtils; import com.walmartlabs.concord.svm.Runtime; import com.walmartlabs.concord.svm.*; @@ -35,14 +36,12 @@ public class TaskSuspendCommand implements Command { private static final long serialVersionUID = 1L; private final UUID correlationId; - private final LogContext logContext; private final String eventName; private final TaskCall step; private final Map taskState; - public TaskSuspendCommand(UUID correlationId, LogContext logContext, String eventName, TaskCall step, Map taskState) { + public TaskSuspendCommand(UUID correlationId, String eventName, TaskCall step, Map taskState) { this.correlationId = correlationId; - this.logContext = logContext; this.eventName = eventName; this.step = step; this.taskState = taskState; @@ -53,6 +52,7 @@ public void eval(Runtime runtime, State state, ThreadId threadId) { Frame frame = state.peekFrame(threadId); frame.pop(); + LogContext logContext = LogUtils.getContext(); frame.push(new TaskResumeCommand(correlationId, logContext, step, new ResumeEventImpl(eventName, taskState))); frame.push(new SuspendCommand(eventName)); } From 60bc653947f4644205457fe4f2e8e0e5d5cf82f7 Mon Sep 17 00:00:00 2001 From: Ivan Bodrov Date: Tue, 7 May 2024 08:50:34 -0400 Subject: [PATCH 3/3] runtime-v2: synchronize LogContext creation Trying an easy fix first. --- .../runtime/v2/runner/vm/StepCommand.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java index 58259e25c3..fa788fe89f 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/vm/StepCommand.java @@ -40,6 +40,7 @@ import javax.el.ELException; import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; @@ -93,7 +94,7 @@ public void eval(Runtime runtime, State state, ThreadId threadId) { UUID correlationId = getCorrelationId(); Context ctx = contextFactory.create(runtime, state, threadId, step, correlationId); - logContext = getLogContext(runtime, ctx, correlationId); + logContext = initLogContext(runtime, ctx, correlationId); if (logContext == null) { ContextProvider.withContext(ctx, () -> execute(runtime, state, threadId)); } else { @@ -131,11 +132,18 @@ private void logException(Exception e, State state, ThreadId threadId) { protected abstract void execute(Runtime runtime, State state, ThreadId threadId); protected LogContext getLogContext() { + if (logContext == null) { + log.warn("getLogContext -> not initialized yet"); + } return logContext; } - private LogContext getLogContext(Runtime runtime, Context ctx, UUID correlationId) { + // TODO create logContext outside of StepCommand + private synchronized LogContext initLogContext(Runtime runtime, Context ctx, UUID correlationId) { if (logContext != null) { + if (!Objects.equals(correlationId, this.logContext.correlationId())) { + log.warn("initLogContext -> correlationId mismatch, requested: {}, current: {}", correlationId, this.logContext.correlationId()); + } return logContext; } @@ -144,14 +152,10 @@ private LogContext getLogContext(Runtime runtime, Context ctx, UUID correlationI return null; } - return buildLogContext(runtime, segmentName, correlationId); - } - - private LogContext buildLogContext(Runtime runtime, String segmentName, UUID correlationId) { RunnerConfiguration runnerCfg = runtime.getService(RunnerConfiguration.class); boolean redirectSystemOutAndErr = runnerCfg.logging().sendSystemOutAndErrToSLF4J(); - return LogContext.builder() + return logContext = LogContext.builder() .segmentName(segmentName) .correlationId(correlationId) .redirectSystemOutAndErr(redirectSystemOutAndErr) @@ -192,9 +196,9 @@ private static String getExceptionMessage(Exception e) { if (e instanceof ELException) { return - ExceptionUtils.getExceptionList(e).stream() - .map(Throwable::getMessage) - .collect(Collectors.joining(". ")); + ExceptionUtils.getExceptionList(e).stream() + .map(Throwable::getMessage) + .collect(Collectors.joining(". ")); } List exceptions = ExceptionUtils.getExceptionList(e);